RocetMQ笔记 RocetMQ笔记概念对齐消息发送普通消息顺序消息延时消息批量消息过滤消息事务消息消息发送方式消息发送DefaultMQProducer的重要属性消息消费DefaultMQPushConsumer——大多都是用推模式的重要属性消费模式高可用认知单master模式多master模式多master模式多slave异步复制模式多master模式多slave同步复制模式 异步刷盘概念对齐NameServerRocketMQ的服务注册中心。启动RocketMQ时先启动NameServer再启动BrokerBroker用于暂存和传输消息启动时向所有的NameServer注册并与NameServer保持长连接默认每30s检查一次broker是否还存活。如果Broker 宕机NameServer会将其从路由注册表中移除——这样可实现高可用Producer 生产者只认topic不认broker。Customer 消费者消息订阅者负责接收消息并消费消费组负载均衡同一条消息在一个消费组内只会被一个消费者消费。消费位点多个消费组可以都订阅一个topic每个组维护自己的消费进度死信队列重试队列每个消费组可以自动绑定自己专属的内部队列死信队列消息达到最大重试次数默认16进入本消费组的死信队列重试队列消费抛异常或返回失败时进入本消费组的重试队列消费模式集群模式组内多实例分摊消息一个消息只会被一个实例消费广播模式组内的每个实例都消费一次生产组无特别用于主要用于事务消息Topic一个生产者可以将消息推给一个或多个topic一个消费者可以订阅一个或多个topictag发送时可以绑定一个tag消费者订阅时可以指定消费消息的tag那么其他tag的消息不会被投递Message Queue消息队列消息发送普通消息引入依赖dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.8.0/version/dependency配置生产者BeanpublicDefaultMQProducermsgMQProducer()throwsMQClientException{DefaultMQProducerproducernew(producerGroup);producer.setNamesrvAddr(nameServerAddress);producer.setSendMsgTimeout(10000);producer.start();returnproducer;}生产者发送消息publicclassMsgProducer{AutowiredDefaultMQProducermsgMQProducer;publicbooleansend(Stringtopic,Stringtag,Stringmsg){try{byte[]bodymsg.getBytes(RemotingHelper.DEFAULT_CHARSET);MessagemessagenewMessage(topic,tag,body);SendResultsendResultmsgMQProducer.send(message);// 同步发送消息到一个brokerif(SendStatus.SEND_OK.equals(sendResult.getSendStatus())){log.info(消息发送成功消息id: {},sendResult.getMsgId());returntrue;}else{log.warn(消息发送失败状态: {}, 消息id: {},sendResult.getSendStatus(),sendResult.getMsgId());returnfalse;}}catch(Exceptione){log.error(e,e);}returnfalse;}}消费者配置BeanpublicDefaultMQPushConsumerMsgConsumer()throwsMQClientException{DefaultMQPushConsumerconsumernewDefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(nameServerAddress);//订阅一个或多个topic并指定tag过滤条件这里指定*表示接收所有tag的消息consumer.subscribe(myTopic,*);consumer.registerMessageListener(msgListener);// 注入消费处理器回调函数MessageListenerConcurrently的实现consumer.setConsumeThreadMin(10);// 最小消费者线程数returnconsumer;}EventListener(ApplicationReadyEvent.class)publicvoidstartMsgConsumer()throwsMQClientException{DefaultMQPushConsumerconsumerMsgConsumer();consumer.start();}消息的发送者步骤1.创建消息生产者并指定生产者组名2. 指定NameServer地址3. 启动producer4. 创建消息对象指定topic、tag和消息体5. 发送消息消息的消费者步骤1.创建消息消费者并指定消费者组名2. 指定NameServer地址3. 订阅Topic和Tag4. 设置回调函数。消费消息5. 启动消费者顺序消息RocketMQ的顺序是指Queue级别的有序使用顺序消费首先要保证消息有序进入到MQ消息依次发送到同一个队列消费时从该Queue中依次拉取就保证了有序全程只有一个Queue参与还有一种是分区有序。当设置了多个消费队列的场景时假设一个订单有创建和付款流程但都是同一个orderId则可以根据orderId取模运算使同一个订单路由到相同队列中实现生产者有序推送。同样消费者也要保证顺序消费使用MessageListenerOrderly做回调消费逻辑上一条没处理完则不会拉取下一条消息MessageListenerConcurrently则是多线程并发消费不保证消息顺序不会阻塞整个队列此时消费者若消费失败则不能将消息放入重试队列——这样会导致乱序。此时会死循环重试消费该消息若没有死信兜底可能导致消息队列卡死延时消息开源版的RocketMQ只支持固定经度的延时消息不能指定任意时间。比如生产者发送时指定DelayTimeLevel参数为16则含义为30min后将消息投递给消费者批量消息TODO 待定不支持延时若消息体积大于4MB最好切分后分批次发送过滤消息绑定Tag事务消息待定不支持延时和批量消息消息发送方式发送方式发送TPS发送结果可靠性解释同步可靠发送快有不丢失发出消息后等待接收方返回响应后发送下一条消息异步可靠发送快有不丢失发送消息不等待接收方直接发送下一条消息。发送方通过回调接口的方式处理接收方的响应单向发送sendOneway最快无可能丢失发送方只负责发无回调函数。耗时一般在微秒级别消息发送DefaultMQProducer的重要属性producerGroup生产者所属组defaultTopicQueueNums默认主题在每一个Broker队列数量sendMsgTimeout发送消息默认超时时间默认3scompressMsgBodyOverHowmuch消息体超过该值则启用压缩默认4kretryTimesWhenSendFailed同步方式发送消息重试次数默认为2总共执行3次retryTimesWhenSendAsyncFailed异步方法发送消息重试次数默认为2retryAnotherBrokerWhenNotStoreOK消息重试时选择另外一个Broker时是否不等待存储结果就返回默认为falsemaxMessageSize允许发送的最大消息长度默认为4M消息消费DefaultMQPushConsumer——大多都是用推模式的重要属性messageModel默认集群模式consumeFromWhere指定消费开始偏移量最大偏移量、最小偏移量、启动时间戳开始消费consumeThreadMin消费者最小线程数量consumeThreadMax消费者最大线程数量pullInterval推模式下任务间隔时间pullBatchSize推模式下任务拉取的条数,默认32条maxReconsumeTimes消息重试次数,-1代表16次consumeTimeout消息消费超时时间subscribe订阅消息并指定队列选择器unsubscribe取消消息订阅registerMessageListener注册监听器消费模式集群模式默认。消费组内的各个实例分摊消费。假设一个topic绑定了三个Queue一个消费组内有三个消费实例那么每个实例只消费其中一个Queue消息会平均散落到不同的Queue上。可以认为是平均消费消息并且该模式下消费进度会维护到broker中。不保证失败重投的消息路由到同一台机器上广播消费积消费组内的每个实例都会被投递一遍消息因此消费进度会持久化到实例的本地所以重复消费的概率稍大些并且不会失败重投且不支持顺序消费不支持重置消费位点高可用认知broker的配置文件中brokerId设置为0——master大于0——slavebrokerNamebroker-a brokerId0# 异步masterbrokerRoleASYNC_MASTER namesrvAddrns1:9876;ns2:9876brokerNamebroker-a brokerId1 brokerRoleSLAVE namesrvAddrns1:9876;ns2:9876单master模式不算集群节点宕机服务就不可用了多master模式多个master节点组成集群单个 master 节点的宕机或重启对应用没有影响该体系中topic对应的队列会分布到多个master上比如topic对应4个队列双master则会对应8个队列每台master对应4个队列方便横向扩展但若master宕机未被消费的消息在这期间是无法被消费的——影响消息的实时性所以引入了多master多slave模式多master模式多slave异步复制模式每个master读写至少对应一个slave读即便master宕机由于主从之间做数据同步所以依然可以在slave中消费存量消息不会影响实时性但需斟酌考虑宕机导致数据没有同步完出现的消息丢失问题多master模式多slave同步复制模式 异步刷盘主从同步保证消息不丢失异步刷盘flushDiskTypeASYNC_FLUSH保证高吞吐量同步刷盘生产者发送每一条消息都同步保存到磁盘异步刷盘默认生产者发送的消息先缓存起来然后定期或达到某一设定值异步将数据刷盘