新聞中心
RocketMQ 是一款開源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時(shí)的、高可靠的消息發(fā)布與訂閱服務(wù)。

成都創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比齊河網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式齊河網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋齊河地區(qū)。費(fèi)用合理售后完善,10多年實(shí)體公司更值得信賴。
一、本篇所需文件下載
鏈接:https://pan.baidu.com/s/17iUB1lBOjv4CBAEQFvn65A 提取碼:v0sn
一、Linux環(huán)境搭建
1、安裝 jdk環(huán)境
RocketMQ java編寫,需要jdk環(huán)境
下載jdk 1.7.0_80 上傳到linux ,必須64位,32位RocketMQ不支持
tar -zxvf jdk-7u80-linux-x64.tar.gz //解壓
修改環(huán)境變量 vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.7.0_80
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
刷新配置
source /etc/profile
或jdk1.8下載安裝教程:https://blog.csdn.net/qq_41463655/article/details/99173682
2、安裝RocketMQ
2.1、上傳alibaba-rocketmq-3.2.6.tar.gz 上傳到linux解壓安裝
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local //解壓到 /usr/local
mv /usr/local/alibaba-rocketmq /usr/local/alibaba-rocketmq-3.2.6 //重命名
ln -s /usr/local/alibaba-rocketmq-3.2.6 rocketmq //安裝
安裝好了 2.2、創(chuàng)建存儲(chǔ)路徑
cd /usr/local/rocketmq
mkdir store
mkdir store/commitlog
mkdir store/consumequeue
mkdir store/index
2.3、日志配置
cd /usr/local/rocketmq
mkdir logs
cd conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
2.4、配置 broker-a.properties / broker-b.properties /usr/local/rocketmq/conf/2m-noslave/ 目錄下
2.4.1、broker-a.properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號(hào)分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù)
defaultTopicQueueNums=4
#是否允許 Broker 自動(dòng)創(chuàng)建Topic,建議線下開啟,線上關(guān)閉
autoCreateTopicEnable=true
#是否允許 Broker 自動(dòng)創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉
autoCreateSubscriptionGroup=true
#Broker 對外服務(wù)的監(jiān)聽端口
listenPort=10911
#刪除文件時(shí)間點(diǎn),默認(rèn)凌晨 4點(diǎn)
deleteWhen=04
#文件保留時(shí)間,默認(rèn) 48 小時(shí)
fileReservedTime=120
#commitLog每個(gè)文件的大小默認(rèn)1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個(gè)文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲(chǔ)路徑
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存儲(chǔ)路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消費(fèi)隊(duì)列存儲(chǔ)路徑存儲(chǔ)路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存儲(chǔ)路徑
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存儲(chǔ)路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存儲(chǔ)路徑
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復(fù)制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發(fā)消息線程池?cái)?shù)量
#sendMessageThreadPoolNums=128
#拉消息線程池?cái)?shù)量
#pullMessageThreadPoolNums=128
2.4.2、broker-b.properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號(hào)分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù)
defaultTopicQueueNums=4
#是否允許 Broker 自動(dòng)創(chuàng)建Topic,建議線下開啟,線上關(guān)閉
autoCreateTopicEnable=true
#是否允許 Broker 自動(dòng)創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉
autoCreateSubscriptionGroup=true
#Broker 對外服務(wù)的監(jiān)聽端口
listenPort=10911
#刪除文件時(shí)間點(diǎn),默認(rèn)凌晨 4點(diǎn)
deleteWhen=04
#文件保留時(shí)間,默認(rèn) 48 小時(shí)
fileReservedTime=120
#commitLog每個(gè)文件的大小默認(rèn)1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個(gè)文件默認(rèn)存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲(chǔ)路徑
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存儲(chǔ)路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消費(fèi)隊(duì)列存儲(chǔ)路徑存儲(chǔ)路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存儲(chǔ)路徑
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存儲(chǔ)路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存儲(chǔ)路徑
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復(fù)制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發(fā)消息線程池?cái)?shù)量
#sendMessageThreadPoolNums=128
#拉消息線程池?cái)?shù)量
#pullMessageThreadPoolNums=128
兩個(gè)配置文件需修改處
brokerName=broker-a|broker-b 集群a服務(wù)器配置修改為 brokerName=broker-a
brokerName=broker-a|broker-b 集群b服務(wù)器配置修改為 brokerName=broker-b
2.5、修改啟動(dòng)參數(shù) /rocketm/bin下 (jvm)
runbroker.sh 的JAVA_OPT runserver.sh 的JAVA_OPT
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
修改為
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"
2.6、啟動(dòng) NameServer 安裝目錄 /usr/local/ /rocketmq/bin 目錄下
nohup sh mqnamesrv &
2.7、啟動(dòng) BrokerServer /rocketmq/bin 目錄下
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
netstat -ntlp
查看啟動(dòng)狀態(tài)
jps
結(jié)果如下啟動(dòng)成功
3.修改linux 服務(wù)器host
本機(jī)ip,配置域名
192.168.177.128 rocketmq-nameserver1
192.168.177.128 rocketmq-master1
192.168.111.129 rocketmq-nameserver2
192.168.111.129 rocketmq-master2
圖片
4.安裝后臺(tái)管理平臺(tái)
解壓安裝 tomcat 7.0到 /usr/local/
tar -zxvf apache-tomcat-7.0.65.tar.gz -C /usr/local
把rocketmq-web-console.war 復(fù)制到apache-tomcat-7.0.65 的webapps 目錄下 啟動(dòng)tomcat 自動(dòng)解壓,然后修改config /rocketmq-web-console/WEB-INF/classes 的 config.properties 配置 修改ip
單服務(wù)器
rocketmq.namesrv.addr=192.168.177.128:9876
多服務(wù)器
rocketmq.namesrv.addr=192.168.177.128:9876;192.168.177.129:9876
關(guān)閉tomcat / 重啟tomcat
關(guān)閉防火墻
systemctl disable firewalld 或 chkconfig iptables off
訪問 —-》 ip:8080/rocketmq-web-console 出現(xiàn)下方界面就ok了
java 操作
1、生產(chǎn)者
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = 0; i "test-topic",
"TagA",
("test-topic-"+i).getBytes()
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
2、消費(fèi)者
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("test-topic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) { System.out.println(msg.getMsgId()+
"---"+new String(msg.getBody())); } //返回成功消費(fèi)狀態(tài)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println(
"Consumer Started."); } }
會(huì)出現(xiàn)冪等問題,使用全局id,或者時(shí)間戳,業(yè)務(wù)的唯一id 進(jìn)行判斷,使用redis等日志記錄判斷是否存在,存在表示已經(jīng)成功消費(fèi)
當(dāng)前名稱:Linux下部署分布式消息系統(tǒng)RocketMQ
轉(zhuǎn)載來于:http://fisionsoft.com.cn/article/ccsjgoo.html


咨詢
建站咨詢
