RocketMQ架构设计

在RocketMQ上主要分为4部分,Producer、Consumer、NameServer、Broker
- Producer:消息生产者,Producer通过NameServer拉取所有Broker集群,通过负载均衡择相对应的Broker进行消息投递。
- Consumer:消息消费者,Consumer通过NameServer拉取所有Broker集群,通过pull和push2种模式对消息进行消费。
- NameServer:NameServer是一个简化版的注册中心,无状态节点,主要主持Broker的动态注册与发现,功能主要包括对Broker的管理。接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。每个NameServer之间不进行通信,他们都存储了整个Broker集群的数据
- Broker:消息中转角色,负责存储消息,转发消息;为生产者,消费者提供服务。
基本概念
生产者组
同一类Producer组成一个集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
消息生产者
消息生产者会把业务应用系统里产生的消息发送,到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
消费者组
同一类Consumer组成一个集合,这类Consumer通常消费同一类消息且消费逻辑一致。
注:消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费和广播消费。
- 集群消费:相同消费者组的每个Consumer实例平均分摊消息。
- 广播消费:相同消费者组的每个Consumer实例都接收全量的消息。
消息消费者
负责消费消息,一般是后台系统负责异步消费。消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。消息消费者提供了两种消费形式:拉取式消费、推动式消费。
- 拉取式消费:通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
- 推动式消费:Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。推动式消费其底层就是对拉取式消费进行了一次封装,是长连接。
主题(Topic)
一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。MessageQueue是生产者发送消息与消费者消费消息的最小单位。
服务器(Broker)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
NameServer
NameServer充当路由消息的提供者。Broker Server会在启动时向所有的NameServer注册自己的服务信息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过NameServer查找各主题相应的Broker列表。
多个NameServer实例组成无状态集群,相互独立,没有信息交换。这种特性也就意味着NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会有影响。
消息
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
Broker集群模式
集群在RocketMQ中分为2种,普通集群和Dledger高可用集群。Dledger高可用集群是在4.5版本后引入的高可用集群。
普通集群
普通集群会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同步和异步同步。
这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用了。
Dledger高可用集群
普通集群有个致命的缺点就是不能高可用。就是master一挂,集群就不可用了。Dledger集群为了解决高可用问题引入的一种技术。
在这个集群模式下如果master挂了会选举一个新的master提供服务。Dledger都是通过Raft协议解决的选举和副本同步问题。
Dledger的职责
- 接管Broker的CommitLog消息存储
- 从集群中选举出master节点
- 完成master节点往slave节点的消息同步
Dledger选举
Dledger是使用Raft算法来进行节点选举
每个节点有三个状态,leader,follower和candidate。正常运行的情况下,集群中会有一个leader,其他都是follower,而客户端的请求全部由leader处理,即使有客户端请求到了一个follower,也会将请求转发到leader。
集群启动阶段:
集群刚启动时,每个节点都是follower状态,之后集群内部会发送一个timeout信号,所有follower就转成candidate去拉取选票,获得大多数选票的节点选为leader,其他候选人转为follower。如果一个timeout信号发出时,没有选出leader,将会重新开始一次新的选举。而Leader节点会往其他节点发送心跳信号,确认他的leader状态
集群运行阶段:
集群启动后每个节点都会启动一个定时器,一段时间没有收到leader心跳,就转化为candidate状态。然后向其他成员发起投票请求,如果收到半数以上成员的投票,则Candidate会晋升为leader。然后leader也有可能会退化成follower。
Dledger多副本消息同步
数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。
Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。
Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。
Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。
这样,就基于Raft协议完成了两阶段的数据同步。
消息存储
分布式消息中间件为了保证性能,读写操作基本都是通过内存来操作的,内存操作在服务器宕机的情况下就存在消息丢失的情况。这时候我们肯定是对消息会进行持久化的。而持久化MQ一般都是采用的文件存储。
磁盘文件读写(顺序写,零拷贝)
文件存储必然带来的问题就是性能问题,文件存储性能一般而言都不高。但是RocketMQ大家都知道性能是很好的,是可以支持到百万级QPS的。它的优良性能是顺序写和零拷贝带来的。
顺序写:目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。RocketMQ会提前分配1个G的磁盘空间给commitLog.之后生产者发送的消息直接分配到这个commitLog上去。这就是顺序写。
零拷贝:Linux操作系统分为用户态和内核态。IO操作不可避免的都会涉及到这2种状态的切换。
假设一台服务器发送了一条数据给客户端,通常会进行4次数据复制。
- 从磁盘文件复制数据到内核态内存
- 从内核态内存复制数据到用户态客户端的内存上,如果是Java就是JVM虚拟内存
- 从用户态客户端内存复制数据到内核态内存
- 最后从网络驱动的内核态内存复制到网卡中进行传输。

而RocketMQ中用来mmap技术,不在将文件复制到用户内存中去。直接将内核内存进行映射。而这种零拷贝技术有一个限制,一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了

消息结构
RocketMQ消息的存储分为三个部分:commitlog、consumerqueue、index
- commitlog:生产者发送的消息存入到commitlog,commitlog有多个文件,每个文件固定大小1G。以第一条消息的偏移量为文件名。
- consumerqueue:consumerqueue文件夹下是按Topic来分的,一个Topic一个文件夹。topic文件夹下是按队列来分的,一个队列一个文件夹。队列文件记录当前队列被哪些消费者组消费到了哪一条commitlog,记录的commitlog中的索引。
- index:为了消息查询提供了一种通过key或时间区间来查询消息的方法

消息主从复制
Broker以集群启动必然包含一个功能便是主从复制方式。RocketMQ消息复制得方式有2个,同步复制和异步复制。
同步复制
同步复制就是等Master和Slave全部写入消息成功,反馈给客户端消息写入成功。
优势:数据更加完整,Slave有全部的数据备份,即使Master挂了也没事。
劣势:会增加数据写入延迟,降低系统吞吐量。
异步复制
异步复制只要Master复制成功直接返回客户端消息写入成功,然后异步复制消息给Slave
优势:数据写入延迟低,系统吞吐量高。
劣势:消息丢失可能性高。Master节点故障,可能会有数据没有同步到Slave,导致消息丢失。
配置
消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个
消息刷盘机制
上面有副本同步方式,但只涉及到消息到服务端,此时消息并没有写入磁盘,写磁盘与另一个刷盘机制有关。
同步刷盘
在返回写成功状态时,消息已经被写入磁盘。
具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
优势:数据更加完整,Slave有全部的数据备份,即使Master挂了也没事。
劣势:会增加数据写入延迟,降低系统吞吐量。
异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
优势:数据写入延迟低,系统吞吐量高。
劣势:消息丢失可能性高。Master节点故障,可能会有数据没有同步到Slave,导致消息丢失。
配置
消息复制方式是通过Broker配置文件里的flushDiskType参数进行设置的,这个参数可以被设置成ASYNC_FLUSH:异步刷盘
SYNC_FLUSH:同步刷盘。
生产者发送方式
同步发送(等待消息返回继续操作)
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("sync_msg_simple_group");
producer.setNamesrvAddr("xx.xx.xx.xx:9876");
producer.start();
String message = "Hello dm";
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicSync","TagS",(message+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
异步发送(消息发完直接进行后续操作,但会提供一个消息发送完成的回调方法)
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async_msg_simple_group");
producer.setNamesrvAddr("xx.xx.xx.xx:9876");
producer.start();
//设置发送失败重试机制
producer.setRetryTimesWhenSendAsyncFailed(5);
String message = "Hello dm async ";
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
Message msg = new Message("TopicTest","TagSendOne","OrderID188", (message + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息发送成功后,执行回调函数
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
单向发送(消息发完就不管了)
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("oneway_msg_simple_group");
producer.setNamesrvAddr("xx.xx.xx.xx:9876");
producer.start();
String message = "Hello dm oneway ";
for (int i = 0; i < 1; i++) {
Message msg = new Message("TopicTest","TagSendOne","OrderID188", (message + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
}
producer.shutdown();
}
消费者消费方式
推模式(消费者等待Broker把消息推送过来)
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("msg_simple_group");
consumer.setNamesrvAddr("xx.xx.xx.xx:9876");
consumer.subscribe("TopicStudent", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
拉模式(消费者主动去Broker上拉取消息)
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("async_msg_simple_group");
consumer.setNamesrvAddr("xx.Xx.Xx.Xx:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicSync");
for (MessageQueue mq : mqs) {
System.err.println("Consume from the queue: " + mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
顺序消息
生产者往topic上面发送消息,消息会分配到不同的queue上,而且消费者消费消息也是一个队列取一个进行消费,这时候消费者消费消息就不是顺序的了,要想顺序消费就让它你想顺序的放在同一个队列上。消费者再注册MessageListenerOrderly,按队列消费,一个队列消费完在换下一个队列,这样消费就是顺序的。但是这样是局部有序,要想全局有序就让所有消息放在一个队列消费,但这样性能瓶颈会较大,一般局部有序就可以满足需求了。
生产者
这里的逻辑就是通过订单编号对mqs(队列个数)进行取模,分配到队列上去。这样就是相同订单编号进入相同队列
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");
producer.setNamesrvAddr("xx.xx.xx.xx:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
body.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 生成模拟订单数据
*/
private static List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("购物车");
orderList.add(orderDemo);
return orderList;
}
@ToString
@Data
private static class OrderStep {
private long orderId;
private String desc;
}
消费者
消费者这里主要改动了监听的方法,消费者注册了监听-MessageListenerOrderly,可以顺序消费每一条队列
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");
consumer.setNamesrvAddr("xx.xx.xx.xx:9876");
/**
* 设置消费位置
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true);
Random random = new Random();
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume来消费, 订单对每个queue(分区)有序
try {
System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
延迟消息
在其他的消息中间件中,对延迟消息都没有支持,只能通过其他的方法实现延迟队列,比如RabbitMQ一般通过TTL+死信或者延迟插件实现延迟队列。而RocketMQ原生支持延时队列,就是调用producer.send,消息不会直接发送出去而是会等待一段时间。不过开源版只支持18个级别;messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m
6m 7m 8m 9m 10m 20m 30m 1h 2h。商业版本是支持任意时刻的设置。
生产者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");
producer.setNamesrvAddr("xx.xx.xx.xx:9876");
producer.start();
int totalMessagesToSend = 3;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//延时消费-设置延迟级别为6 2min
message.setDelayTimeLevel(6);
producer.send(message);
}
System.out.printf("message send is completed .%n");
producer.shutdown();
}
广播消息
集群消费模式下,每一条消息只会被一个消费组的一个消费者消费,而广播模式,是把消息发送给所有订阅了该Topic的消费者,不管消费者是不是同一个消费组。
消费者
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setNamesrvAddr("xx.xx.xx.xx:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 设置为广播消息
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
批量消息
批量消息是将多个消息组合成一条批量消息发送出去,是为了减少网络IO,提升系统吞吐量。
注意:官方对于批量消息大小的限制是1M,实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
生产者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("batch_group");
producer.setNamesrvAddr("xx.xx.xx.xx:9876");
producer.start();
String topic = "BatchTest";
// 模拟消息
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
ListSplitter splitter = new ListSplitter(messages);
// 对批量消息进行拆分
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
消息拆分
public class ListSplitter implements Iterator<List<Message>> {
// 一个批量消息大小
private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MB
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
//遍历消息准备拆分
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
过滤消息
RocketMQ设计了很多种消息过滤方式,简单的一般可以用Tag进行过滤。复杂一点的可以使用参数过滤加SQL92.
消息过滤一般是在broker端进行过滤,性能上也是比较高的。
注意:推模式的消费者才可以使用SQL过滤,拉模式不可以
SQL92语法
- 数值比较:比如:>、>=、<、<=、BETWEEN、=
- 字符比较:比如:=、<>、IN
- 空串判断:IS NULL 、 IS NOT NULL
- 逻辑符号:AND,OR,NOT;
- 常量支持类型为:
数值,比如:123,3.1415;
字符,比如:’abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
生产者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("xx.xx.xx.xx:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
消费者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("xx.xx.xx.xx:9876");
// SQL 过滤
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
事务消息
事务消息和延时消息一样都是RocketMQ特有的。事务消息官方解释是分布式系统中保持最终一致性的2阶段提交的消息实现。简单来说就是保证本地事务执行与发送消息到broker2个操作的原子性。
注:事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的只涉及到消息发送者。
事务消息使用限制
- 事务消息不支持延迟消息和批量消息
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数
- 事务性消息可能不止一次被检查或消费
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者
实现机制

步骤:
- 发送方将half事务消息发送到broker
- broker把消息持久化成功,向发送方返回ACK确认收到消息。
- 发送方开始执行本地事务
- 发送方根据本地事务执行结果向broker发送二次确认
- broker收到发送方commit状态则将half事务消息标记为可投递,订阅方可以收到消息。。发送方rollback状态,则删除half事务消息。。如果步骤4的二次确认没有发送到broker则会过一段时间broker会对发送方进行消息回查,发送方收到消息回查,检查对应消息的本地事务执行结果。发送方根据回查结果继续步骤5。
生产者代码
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("xx.xx.xx.Xx:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
public class TransactionListenerImpl implements TransactionListener {
/**
* 在提交完事务消息后执行。
* 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
* 返回ROLLBACK_MESSAGE状态的消息会被丢弃。
* 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
//TagA的消息会立即被消费者消费到
if (StringUtils.contains(tags, "TagA")) {
return LocalTransactionState.COMMIT_MESSAGE;
//TagB的消息会被丢弃
} else if (StringUtils.contains(tags, "TagB")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
//其他消息会等待Broker进行事务状态回查。
} else {
return LocalTransactionState.UNKNOW;
}
}
/**
* 在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags();
//TagC的消息过一段时间会被消费者消费到
if (StringUtils.contains(tags, "TagC")) {
return LocalTransactionState.COMMIT_MESSAGE;
//TagD的消息也会在状态回查时被丢弃掉
} else if (StringUtils.contains(tags, "TagD")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
//剩下TagE的消息会在多次状态回查后最终丢弃
} else {
return LocalTransactionState.UNKNOW;
}
}
}