RocketMQ安装

starlin 686 2018-05-25

开始安装RocketMQ之前,需要自行安装Maven,JDK,并配置好环境变量,具体的安装过程这里就省略了
本文中使用的环境为:
JDK: 1.8.0_91
Maven: 3.5.3(印象中RocketMQ是对Maven版本有要求的)
RocketMQ: 4.2.0
CentOS: 6.6

下载

这个不用多说,点击下载即可
RocketMQ下载

下载后解压到相应的目录即可,分别执行以下命令即可

unzip rocketmq-all-4.2.0-source-release.zip
cd rocketmq-all-4.2.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq

启动Name Server

建议先看下后面的采坑记录第一条
这里必须要说明下,一定要在该目录下distribution/target/apache-rocketmq执行下面的命令

nohup sh bin/mqnamesrv &    //启动name server
tail -f ~/logs/rocketmqlogs/namesrv.log

出现一下提示,则启动成功

2018-05-22 17:32:44 INFO main - The Name Server boot success. serializeType=JSON

启动broker

建议先看下后面的采坑记录第一条

nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true & //启动broker
tail -f ~/logs/rocketmqlogs/broker.log

出现一下提示,则表示启动成功:

2018-05-23 14:20:52 INFO main - register broker to name server localhost:9876 OK
2018-05-23 14:20:52 INFO main - The broker[Master, 192.168.235.134:10911] boot success. serializeType=JSON and name server is localhost:9876

停止服务

关闭namesrv服务:

sh bin/mqshutdown namesrv

关闭broker服务 :

sh bin/mqshutdown broker

管理控制台

这里顺便提下,RocketMQ有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫“rocketmq-console”,这个便是管理控制台项目了。

下载好了以后,,通过命令进入到rocketmq-console子目录,通过maven对其进行编译打包
(windows用户请在管理员运行cmd)

mvn clean package -Dmaven.test.skip=true

打包成功后命令行如下图所示:(注意win10下面用PowerShell运行此命令会报错,LZ也不知道什么鬼)

得到rocketmq-console-ng-1.0.0-sources.jar之后,找到application.properties修改rocketmq.config.namesrvAddr的值。

rocketmq.config.namesrvAddr=192.168.235.134:9876

运行命令,启动控制台:

java -jar rocketmq-console-ng-1.0.0.jar

启动成功后如下显示:

[2018-05-25 15:55:33.508]  INFO No TaskScheduler/ScheduledExecutorService bean found for scheduled processing
[2018-05-25 15:55:33.567]  INFO Initializing ProtocolHandler ["http-nio-8080"]
[2018-05-25 15:55:33.639]  INFO Starting ProtocolHandler [http-nio-8080]
[2018-05-25 15:55:33.697]  INFO Using a shared selector for servlet write/read
[2018-05-25 15:55:33.766]  INFO Tomcat started on port(s): 8080 (http)
[2018-05-25 15:55:33.795]  INFO Started App in 19.84 seconds (JVM running for 21.989)

我们就可以通过浏览器访问localhost:8080 进入控制台界面了,如下图:

后面的大家自己慢慢玩吧,就不多说了

Demo

下面建立一个demo演示一下RocketMQ

  1. 首先新建一个Maven工程,在pom文件里面加入rocketmq
 <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.2.0</version>
</dependency>
  1. 建立生产者demo
public class ProducerDemo {
    // 在生产者范例中,我们制定了消费组为"mengzhidu-user",
    // 并且我们制定了命名服务器的地址,然后我们就开始了生产者,
    // 然后我们在一个for循环中发送了五次消息,然后我们在生产者中获取了消息的状态。
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("starlin-user");
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr("192.168.235.134:9876");
        /**
        * Producer对象在使用之前必须要调用start初始化,初始化一次即可
        * 注意:切记不可以在每次发送消息时,都调用start方法
        */
        producer.start();

        for (int i = 0; i < 5; i++) {
            Message message = new Message("user", "push", String.valueOf(i), new String("starlin-" + i).getBytes());
            SendResult result = producer.send(message);
            System.out.println("消息id为:" + result.getMsgId() + "发送状态为:" + result.getSendStatus());
        }
    }
}

在生产者范例中,我们制定了消费组为"starlin-user",并且我们制定了命名服务器的地址,
然后我们就开始了生产者,然后我们在一个for循环中发送了五次消息,然后我们在生产者中获取了消息的状态。

  1. 建立消费demo
public class ConsumerDemo {
    // 我们的消费者也制定了主题为"starlin-user",
    // 而且它也指定了命名服务器的地址,然后设置了在收到消息后的处理方式,然后就启动了消费者。
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("starlin-user");
        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr("192.168.235.134:9876");
        consumer.subscribe("user","push");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently(){

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                Message message = list.get(0);
                System.out.println("消费者收到的消息内容:" + new String(message.getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

我们的消费者也制定了主题为"starlin-user",而且它也指定了命名服务器的地址,然后设置了在收到消息后的处理方式,然后就启动了消费者。

先启动消费者Consumer,我们会看到消费者端打印如下:

消费者收到的消息内容:starlin-0
消费者收到的消息内容:starlin-1
消费者收到的消息内容:starlin-4
消费者收到的消息内容:starlin-3
消费者收到的消息内容:starlin-2

然后我们启动生产者,我们会看到生产者端打印如下:

消息id为:0A320FB739B014DAD5DC7475E8100000发送状态为:SEND_OK
消息id为:0A320FB739B014DAD5DC7475E82D0001发送状态为:SEND_OK
消息id为:0A320FB739B014DAD5DC7475E8310002发送状态为:SEND_OK
消息id为:0A320FB739B014DAD5DC7475E83A0003发送状态为:SEND_OK
消息id为:0A320FB739B014DAD5DC7475E8410004发送状态为:SEND_OK

踩坑记录

  1. 启动NameServer或broker是提示找不到JAVA_HOME

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

解决办法:

在该目录中distribution/target/apache-rocketmq/bin找到runserver.sh文件

vim runserver.sh

#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

JAVA_HOME=/usr/java/jdk1.8.0_91 //这是我的java目录
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

同样的后面还需要启动broker,没有配置java环境也是会报错的,后面就不详述了

  1. 无法分配内存Cannot allocate memory

错误信息如下:

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000006c0000000, 2147483648, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 2147483648 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketmq-all-4.2.0/distribution/bin/hs_err_pid13462.log

主要原因是RocketMQ默认分配的内存有点大,我们在虚拟机环境下,不需要配置那么大的内存,所以需要修改下默认配置,NameServer和broker都需要改的
NameServer路径:/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/bin/runserver.sh
Borker路径:/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/bin/runbroker.sh

#===========================================================================================
# JVM Configuration
#===========================================================================================
#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms512M -Xmx512M -Xmn256M -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" //改这里的就好了
....省略
  1. No route info of this topic
    出现这个错误有点不好搞,大概分以下几步:
    • 检查各配置是否正确
    • 启动 broker时加上参数atuoCreateTopicEnable=true
    • 多看日志,总是能看出点端倪

# RocketMQ