开始安装RocketMQ之前,需要自行安装Maven,JDK,并配置好环境变量,具体的安装过程这里就省略了
本文中使用的环境为:
JDK: 1.8.0_91
Maven: 3.5.3(印象中RocketMQ是对Maven版本有要求的)
RocketMQ: 4.2.0
CentOS: 6.6
下载
这个不用多说,点击下载即可
RocketMQ下载
下载后解压到相应的目录即可,分别执行以下命令即可
unzip rocketmq-all-4.2.0-source-release.zip
cd rocketmq-all-4.2.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq
启动Name Server
建议先看下后面的采坑记录第一条
这里必须要说明下,一定要在该目录下distribution/target/apache-rocketmq执行下面的命令
nohup sh bin/mqnamesrv & //启动name server
tail -f ~/logs/rocketmqlogs/namesrv.log
出现一下提示,则启动成功
2018-05-22 17:32:44 INFO main - The Name Server boot success. serializeType=JSON
启动broker
建议先看下后面的采坑记录第一条
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true & //启动broker
tail -f ~/logs/rocketmqlogs/broker.log
出现一下提示,则表示启动成功:
2018-05-23 14:20:52 INFO main - register broker to name server localhost:9876 OK
2018-05-23 14:20:52 INFO main - The broker[Master, 192.168.235.134:10911] boot success. serializeType=JSON and name server is localhost:9876
停止服务
关闭namesrv服务:
sh bin/mqshutdown namesrv
关闭broker服务 :
sh bin/mqshutdown broker
管理控制台
这里顺便提下,RocketMQ有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫“rocketmq-console”,这个便是管理控制台项目了。
下载好了以后,,通过命令进入到rocketmq-console子目录,通过maven对其进行编译打包
(windows用户请在管理员运行cmd)
mvn clean package -Dmaven.test.skip=true
打包成功后命令行如下图所示:(注意win10下面用PowerShell运行此命令会报错,LZ也不知道什么鬼)
得到rocketmq-console-ng-1.0.0-sources.jar之后,找到application.properties修改rocketmq.config.namesrvAddr的值。
rocketmq.config.namesrvAddr=192.168.235.134:9876
运行命令,启动控制台:
java -jar rocketmq-console-ng-1.0.0.jar
启动成功后如下显示:
[2018-05-25 15:55:33.508] INFO No TaskScheduler/ScheduledExecutorService bean found for scheduled processing
[2018-05-25 15:55:33.567] INFO Initializing ProtocolHandler ["http-nio-8080"]
[2018-05-25 15:55:33.639] INFO Starting ProtocolHandler [http-nio-8080]
[2018-05-25 15:55:33.697] INFO Using a shared selector for servlet write/read
[2018-05-25 15:55:33.766] INFO Tomcat started on port(s): 8080 (http)
[2018-05-25 15:55:33.795] INFO Started App in 19.84 seconds (JVM running for 21.989)
我们就可以通过浏览器访问localhost:8080 进入控制台界面了,如下图:
后面的大家自己慢慢玩吧,就不多说了
Demo
下面建立一个demo演示一下RocketMQ
- 首先新建一个Maven工程,在pom文件里面加入rocketmq
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
- 建立生产者demo
public class ProducerDemo {
// 在生产者范例中,我们制定了消费组为"mengzhidu-user",
// 并且我们制定了命名服务器的地址,然后我们就开始了生产者,
// 然后我们在一个for循环中发送了五次消息,然后我们在生产者中获取了消息的状态。
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("starlin-user");
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr("192.168.235.134:9876");
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
for (int i = 0; i < 5; i++) {
Message message = new Message("user", "push", String.valueOf(i), new String("starlin-" + i).getBytes());
SendResult result = producer.send(message);
System.out.println("消息id为:" + result.getMsgId() + "发送状态为:" + result.getSendStatus());
}
}
}
在生产者范例中,我们制定了消费组为"starlin-user",并且我们制定了命名服务器的地址,
然后我们就开始了生产者,然后我们在一个for循环中发送了五次消息,然后我们在生产者中获取了消息的状态。
- 建立消费demo
public class ConsumerDemo {
// 我们的消费者也制定了主题为"starlin-user",
// 而且它也指定了命名服务器的地址,然后设置了在收到消息后的处理方式,然后就启动了消费者。
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("starlin-user");
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.235.134:9876");
consumer.subscribe("user","push");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Message message = list.get(0);
System.out.println("消费者收到的消息内容:" + new String(message.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
我们的消费者也制定了主题为"starlin-user",而且它也指定了命名服务器的地址,然后设置了在收到消息后的处理方式,然后就启动了消费者。
先启动消费者Consumer,我们会看到消费者端打印如下:
消费者收到的消息内容:starlin-0
消费者收到的消息内容:starlin-1
消费者收到的消息内容:starlin-4
消费者收到的消息内容:starlin-3
消费者收到的消息内容:starlin-2
然后我们启动生产者,我们会看到生产者端打印如下:
消息id为:0A320FB739B014DAD5DC7475E8100000发送状态为:SEND_OK
消息id为:0A320FB739B014DAD5DC7475E82D0001发送状态为:SEND_OK
消息id为:0A320FB739B014DAD5DC7475E8310002发送状态为:SEND_OK
消息id为:0A320FB739B014DAD5DC7475E83A0003发送状态为:SEND_OK
消息id为:0A320FB739B014DAD5DC7475E8410004发送状态为:SEND_OK
踩坑记录
- 启动NameServer或broker是提示找不到JAVA_HOME
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
解决办法:
在该目录中distribution/target/apache-rocketmq/bin找到runserver.sh文件
vim runserver.sh
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
JAVA_HOME=/usr/java/jdk1.8.0_91 //这是我的java目录
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
同样的后面还需要启动broker,没有配置java环境也是会报错的,后面就不详述了
- 无法分配内存Cannot allocate memory
错误信息如下:
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000006c0000000, 2147483648, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 2147483648 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketmq-all-4.2.0/distribution/bin/hs_err_pid13462.log
主要原因是RocketMQ默认分配的内存有点大,我们在虚拟机环境下,不需要配置那么大的内存,所以需要修改下默认配置,NameServer和broker都需要改的
NameServer路径:/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/bin/runserver.sh
Borker路径:/rocketmq-all-4.2.0/distribution/target/apache-rocketmq/bin/runbroker.sh
#===========================================================================================
# JVM Configuration
#===========================================================================================
#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms512M -Xmx512M -Xmn256M -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" //改这里的就好了
....省略
- No route info of this topic
出现这个错误有点不好搞,大概分以下几步:- 检查各配置是否正确
- 启动 broker时加上参数atuoCreateTopicEnable=true
- 多看日志,总是能看出点端倪