消息中间件rocketmq_http代理和socket代理

(90) 2024-06-13 23:01:01

Rocket

第一章 环境准备

1.windows 下安装

安装

  • 下载
    下载地址: http://rocketmq.apache.org/release_notes/release-notes-4.2.0/

安装参考1

安装参考2

安装控制台

启动运行

如果broker突然好好的启动不了,试试这个帖子:
参考

2.测试

=代码测试=

1.创建topic

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第1张

2.创建生产者

/** * 生产者 * * @author byChen * @date 2022/8/2 */ public class Producer { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); Message msgA = new Message("chenchong", "这是chenchong的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgA); Message msgB = new Message("chenchong", "这是chenchong的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgB); //4.2 可以选择指定tag,key来进行细分message Message msgC = new Message("chenchong", "tag-a", "这是chenchong的消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("chenchong", "tag-a", "key1", "这是chenchong的消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送 for (Message message : messageList) { 
    SendResult send = producer.send(message); System.out.println("消息发送成功:id:" + send.getMsgId() + ", msgId:" + send.getMsgId() + ", offsetMsgId:" + send.getOffsetMsgId() + ", result:" + send.getSendStatus() ); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } } 

3.创建消费者

/** * 消费者 * * @author byChen * @date 2022/8/2 */ public class Consumer { 
    public static void main(String[] args) throws MQClientException { 
    //1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("chenchong", ""); //4.指定消费者拉取消息方式,这里设置从头开始(也就是历史数据也拉取) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() { 
    @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
    MessageExt messageExt = msgs.get(0); try { 
    System.out.println("-收到消息:id-" + messageExt.getMsgId() + "," + new String(messageExt.getBody(), "UTF-8") + "," + "keys: " + messageExt.getKeys() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) { 
    e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } } 

4.运行测试

生产者结果:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第2张
字段解释:

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第3张

消费者结果:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第4张

=页面测试=

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第5张

①写入消息内容:

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第6张

②发送,并获取返回结果:

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第7张
页面发送消息,控制台会乱码,需要另行配置

③在页面查看历史消息记录

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第8张
内容:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第9张

3.集群搭建

windows本机集群搭建

1.数据复制与刷盘策略

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第10张
复制:是指从broker在主broker上面复制下来数据,是纵向的
刷盘:是指将数据从内存保存到磁盘上,是横向的

复制策略

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第11张
同步数据成功:指的就是数据被刷盘成功

刷盘策略

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第12张

2.broker集群模式

① 单master

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第13张

② 多master

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第14张

③ 多master+多selve模式 - 异步复制

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第15张

④ 多master+多selve模式 - 同步双写

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第16张

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第17张
-本节完-

第二章. 基础 api 使用

4.生产者-发送各种消息

=同步异步=

1.同步消息

消息发送成功后,才返回结果

 /** * 生产者-发送同步消息 * * @author byChen * @date 2022/8/2 */ public class Producer { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); Message msgA = new Message("chenchong", "这是chenchong的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgA); Message msgB = new Message("chenchong", "这是chenchong的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgB); //4.2 可以选择指定tag,key来进行细分message Message msgC = new Message("chenchong", "tag-a", "这是chenchong的消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("chenchong", "tag-a", "key1", "这是chenchong的消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送 for (Message message : messageList) { 
    SendResult send = producer.send(message); System.out.println("消息发送成功:id:" + send.getMsgId() + " result:" + send.getSendStatus()); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } } 

2.异步消息

消息发送后,不等待MQ返回结果,就直接返回客户端。后续通过回调来获取MQ的给的结果

/** * 生产者-发送异步消息 * * @author byChen * @date 2022/8/2 */ public class ProducerAsync { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body,可以选择指定tag,key来进行细分message List<Message> messageList = new ArrayList<>(); Message msgC = new Message("chenchong", "tag-a", "异步消息".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("chenchong", "tag-a", "key1", "异步消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送异步消息,流程跟同步大致一样,在发送这里有修改 for (Message message : messageList) { 
    //因为发送异步,因此这里不能直接去获取返回值,,应该使用回调函数来接收回调信息 producer.send(message, new SendCallback() { 
    /** * 回调成功的方法 * @param sendResult */ @Override public void onSuccess(SendResult sendResult) { 
    System.out.println("成功:" + sendResult); } /** * 回调失败的函数 * @param throwable */ @Override public void onException(Throwable throwable) { 
    System.out.println("失败:"+throwable); } }); } //6.不再发送消息就关闭掉生产者,因为这里需要等待回调,所以不能关闭 // producer.shutdown(); } } 

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第18张

3.单向消息

用在不需要关心发送结果的场景,例如发送日志信息;因此单向消息没有返回值信息

/** * 生产者-发送单向消息 * * @author byChen * @date 2022/8/2 */ public class ProducerOneWay { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body 可以选择指定tag,key来进行细分message List<Message> messageList = new ArrayList<>(); Message msgC = new Message("chenchong", "tag-a", "这是单向消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("chenchong", "tag-a", "key2", "这是单向消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送 for (Message message : messageList) { 
    //使用单向消息发送方法,该方法没有返回值 producer.sendOneway(message); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } } 

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第19张

=发送模式=

1 顺序消息

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第20张

生产者

 /** * 生产者-发送顺序消息 * <p>需要将统一业务放在同一个队列中,消费者也从同一个队列中获取 * 即可完成顺序消费</p> * * @author byChen * @date 2022/8/2 */ public class ProducerByOrder { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 List<OrderInfo> order = createOrder(); //5.遍历发送 for (int i = 0; i < order.size(); i++) { 
    OrderInfo orderInfo = order.get(i); //创建消息体 Message message = new Message("orderTopic", "order", "i:" + i, orderInfo.toString().getBytes()); /** * @param message 发送的消息体 * @param MessageQueueSelector 消息队列的选择器 * @param orderId 选择队列的业务标识 */ SendResult sendResult = producer.send(message, new MessageQueueSelector() { 
    /** * @param mqs 该主题下的所有队列 * @param msg 发送的消息实体 * @param arg 选择队列的业务标识,对应orderId * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { 
    int orderId = (int) arg; //利用业务id来取模,保证相同的业务id,得到的相同的队列 long index = orderId % mqs.size(); MessageQueue messageQueue = mqs.get((int) index); return messageQueue; } }, orderInfo.getOrderId()); System.out.println("发送消息结果:" + sendResult+" 消息发送到的队列:"+sendResult.getMessageQueue().getQueueId()); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } /** * 创建业务数据而已 * * @return */ public static List<OrderInfo> createOrder() { 
    List<OrderInfo> orderInfoList = new ArrayList<>(); OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(1); orderInfo.setDesc("订单 1 创建"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(1); orderInfo.setDesc("订单 1 付款"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(1); orderInfo.setDesc("订单 1 推送"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(1); orderInfo.setDesc("订单 1 完成"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(2); orderInfo.setDesc("订单 2 创建"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(2); orderInfo.setDesc("订单 2 付款"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(2); orderInfo.setDesc("订单 3 创建"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(2); orderInfo.setDesc("订单 3 付款"); orderInfoList.add(orderInfo); return orderInfoList; } } 

消费者:

/** * 消费者 -顺序消费消息 * * @author byChen * @date 2022/8/2 */ public class ConsumerByOrder { 
    public static void main(String[] args) throws MQClientException { 
    //1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("orderTopic", ""); //4.进行订阅主题,对主题进行监听,内部实现类,保证一个线程只处理一个队列 consumer.registerMessageListener(new MessageListenerOrderly() { 
    /** * * @param msgs 全部的消息 * @param context * @return */ @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { 
    for (MessageExt msg : msgs) { 
    System.out.println("消费消息:" + new String(msg.getBody())+"队列id:"+msg.getQueueId()); } return ConsumeOrderlyStatus.SUCCESS; } }); //6.启动消费者 consumer.start(); } } 

2.延迟消息

生产者发送到MQ之后,不是立即被消费者消费,而是延迟一段时间才被消费

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第21张
生产者:

 /** * 生产者-发送延迟消息 * * @author byChen * @date 2022/8/2 */ public class ProducerDelay { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); Message msgA = new Message("delayTopic", "这是延迟消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); //给消息设定延迟时间,延迟时间是固定的,有几个级别 //:等级 1~18 //:时间 1s 2s ~~~2h msgA.setDelayTimeLevel(2); messageList.add(msgA); //5.发送 for (Message message : messageList) { 
    SendResult send = producer.send(message); System.out.println("消息发送成功:id:" + send.getMsgId() + " result:" + send.getSendStatus()); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } } 

消费者:

/** * 消费者 消费延迟消息 * * @author byChen * @date 2022/8/2 */ public class ConsumerDelay { 
    public static void main(String[] args) throws MQClientException { 
    //1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("delayTopic", ""); //4.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() { 
    @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
    MessageExt messageExt = msgs.get(0); System.out.println("消息id:" + messageExt.getMsgId() + ",延迟时间:" + (System.currentTimeMillis() - messageExt.getStoreTimestamp())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("==消费者启动=="); } } 

结果:
不启用延迟,延迟只有网络波动的延迟
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第22张
启用延迟消息,延迟就是设置的延迟时间
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第23张

3.批量消息

批量消息,就是将原本遍历发送的多条消息,一下子发送过去

生产者:

 /** * 生产者-发送批量消息 * * @author byChen * @date 2022/8/2 */ public class ProducerBatch { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); Message msgA = new Message("batchTopic", "批量消息1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgA); Message msgB = new Message("batchTopic", "批量消息2".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgB); //4.2 可以选择指定tag,key来进行细分message Message msgC = new Message("batchTopic", "tag-a", "批量消息3".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("batchTopic", "tag-a", "key1", "批量消息4".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送批量消息,直接将集合放进去 SendResult send1 = producer.send(messageList); System.out.println("发送结果:" + send1); //6.不再发送消息就关闭掉生产者 producer.shutdown(); } } 

消费者:

 /** * 消费者 - 批量消费(跟正常一样) * * @author byChen * @date 2022/8/2 */ public class ConsumerBatch { 
    public static void main(String[] args) throws MQClientException { 
    //1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("batchTopic", ""); //4.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() { 
    @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
    MessageExt messageExt = msgs.get(0); try { 
    System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",keys: " + messageExt.getKeys() + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) { 
    e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("==消费者启动=="); } } 

结果:批量发送的消息,在消费者方也是同样的一条一条的获取
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第24张

=消息过滤=

tag 过滤

生产者生产的时候指定tag,,只有订阅了该tag的消费者才能消费到
生产者:

 /** * 生产者-发送同步消息 * * @author byChen * @date 2022/8/2 */ public class ProducerTag { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); //4.2 可以选择指定tag,key来进行细分message Message msgC = new Message("tagFilterTopic", "tag-a", "这是tag过滤的消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("tagFilterTopic", "tag-b", "这是tag过滤的消息,指定了tag-b".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送 for (Message message : messageList) { 
    SendResult send = producer.send(message); System.out.println("消息发送成功:id:" + send.getMsgId() + " result:" + send.getSendStatus()); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } } 

消费者:

 /** * 消费者 * * @author byChen * @date 2022/8/2 */ public class ConsumerTag { 
    public static void main(String[] args) throws MQClientException { 
    //1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定或者写 * 就是全部tags,监听多个就用 || 连接 consumer.subscribe("tagFilterTopic", "tag-a"); // consumer.subscribe("tagFilterTopic", "tag-a || tag-b"); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() { 
    @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
    MessageExt messageExt = msgs.get(0); try { 
    System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",keys: " + messageExt.getKeys() + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) { 
    e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } } 

结果:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第25张

sql过滤

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第26张
生产者:

 /** * 生产者-通过sql语句过滤消息 * * @author byChen * @date 2022/8/2 */ public class ProducerSql { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 for (int i = 0; i < 10; i++) { 
    Message message = new Message("sqlFilterTopic", "tag", ("这是第 " + i + "条消息").getBytes()); //给消息添加上自己定义的条件,以供消费者过滤 message.putUserProperty("i", String.valueOf(i)); SendResult send = producer.send(message); System.out.println("消息发送成功:" + send); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } } 

消费者:

 /** * 消费者 * * @author byChen * @date 2022/8/2 */ public class ConsumerSql { 
    public static void main(String[] args) throws MQClientException { 
    //1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.这里不指定监听的主题,而是通过sql来进行过滤 consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("i>5")); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() { 
    @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
    MessageExt messageExt = msgs.get(0); try { 
    System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",keys: " + messageExt.getKeys() + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) { 
    e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } } 

=事务=

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第27张
生产者:

 /** * 生产者-事务 * <p>生产者发送消息到MQ,MQ会回调监听器的executeLocalTransaction方法,执行本地事务</p> * <p>MQ调用executeLocalTransaction方法后,会接收到对应的 提交 回滚 命令,进行相应的操作。成功就提交消息给消费者消费,失败则回滚删除消息</p> * <p>如果方法没有返回给MQ准确的响应,那么MQ会调用checkLocalTransaction方法进行消息事务状态的回查</p> * * @author byChen * @date 2022/8/2 */ public class Producer { 
    public static void main(String[] args) throws Exception { 
    //1.创建一个生产者,指定消费者组名 TransactionMQProducer producer = new TransactionMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.设置事务监听器,处理本地事务+提供给mq来进行回查(事务长时间未提交才会回查) producer.setTransactionListener(new TransactionListener() { 
    /** * MQ收到消息后,在该方法执行本地事务 * @param msg * @param arg * @return */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
    if ("tag1".equals(msg.getTags())) { 
    //响应正常提交 return LocalTransactionState.COMMIT_MESSAGE; } else if ("tag2".equals(msg.getTags())) { 
    //响应回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } else if ("tag3".equals(msg.getTags())) { 
    //不做准确的响应 return LocalTransactionState.UNKNOW; } return LocalTransactionState.UNKNOW; } /** * 该方法提供给MQ进行消息事务状态的回查 * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { 
    System.out.println("消息标签:" + msg.getTags() + "进行回查"); return LocalTransactionState.COMMIT_MESSAGE; } }); //3.启动生产者 producer.start(); String[] tags = { 
   "tag1", "tag2", "tag3"}; System.out.println("===启动了生产者producer==="); for (int i = 0; i < 3; i++) { 
    Message message = new Message("transtionTopic", tags[i], ("事务消息+" + i).getBytes()); //第二个消息是:指定事务是针对那个消息的,这里针对所有的消息,所有设定为null TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null); System.out.println("消息发送结果:" + transactionSendResult); } //6.不再发送消息就关闭掉生产者,因为MQ会回调监听,因此这里不能关闭生产者 // producer.shutdown(); } } 

消费者:

 /** * 消费者 * * @author byChen * @date 2022/8/2 */ public class Consumer { 
    public static void main(String[] args) throws MQClientException { 
    //1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("transtionTopic", "*"); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() { 
    @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
    MessageExt messageExt = msgs.get(0); try { 
    System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) { 
    e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } } 

5.消费者-消费模式

1.负载均衡模式

默认的模式,生产者生产的消息被分配到一个组内每个消费者身上

2.广播模式

生产者生产的消息,不论多少条,都会被每个消费者收到

广播模式需要多添加一行代码,来手动设置一下

 /** * 消费者 * * @author byChen * @date 2022/8/2 */ public class Consumer { 
    public static void main(String[] args) throws MQClientException { 
    //1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("chenchong", ""); //4.指定消费者拉取消息方式,这里设置从头开始(也就是历史数据也拉取) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 设置消费模式 负载均衡|广播模式 这里设置为广播模式 (默认就是负载均衡) consumer.setMessageModel(MessageModel.BROADCASTING); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() { 
    @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
    MessageExt messageExt = msgs.get(0); try { 
    System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",keys: " + messageExt.getKeys() + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) { 
    e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } } 

-本节完-

第三章 工作原理

一、消息的生产

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第28张

二、消息的消费

1.消费获取模式

①.拉取式消费 pull

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第29张
拉取模式也是RocketMq的默认方式

②.推送式消费 push

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第30张
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第31张

2.消息消费模式

① 集群模式
②广播模式

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第32张
同一个消费者组内的所有消费者,都会获取到topic的所有消息
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第33张

3.订阅关系的一致性

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第34张
同一个消费者组里面的全部消费者,最好是订阅同一个topic,同一个tag,同数量的topic
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第35张

4.重新分配 Rebanlance

① 定义

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第36张

② 重新分配的限制与危害

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第37张

③ 重新分配的原因与过程

原因无非就两个:消费者订阅的topic的queue数量发生变化 + 消费者组中消费者的数量发生变化

queue数量变化
  1. broker 进行扩容或者缩容
  2. broker 进行升级运维
  3. broker 与 nameServer 之间网络波动
  4. queue 进行扩容或者缩容
消费者数量的变化
  1. 消费者 进行扩容或者缩容
  2. 消费者 升级运维
  3. 消费者与 nameServer 之间网络波动
④重新分配过程

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第38张

5.offset管理

offset 是用来记录每个queue的不同消费者组的消费进度的;
根据消费进度记录存放位置的不同,分为本地模式远程模式

①本地模式

广播消费使用本地模式消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第39张

②远程模式

集群消费使用远程模式
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第40张

二、消息的存储

存储结构

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第41张
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第42张

消息存储的位置:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第43张

消息存储结构
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第44张
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第45张

1.查找消息过程
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第46张
indexFile
它内部也跟consumerQueue类似,存了一些索引一样的东西,不过它存的是偏移量、时间戳、时间区间之类的,算是对consumerQueue的一种补充,提供多一些查找方法。
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第47张

刷盘机制

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第48张
如何配置:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第49张

三、高可用

集群高可用
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第50张
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第51张
消费高可用-自动切换
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第52张
发送高可用-topic的队列分布在不同的broker

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第53张
消息主从复制
因为从broker不能被生产者写入数据,只能由主broker同步数据,那么主从消息是如何同步的?

也同样分为 同步复制异步复制
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第54张
配置:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第55张

主从复制搭配数据刷盘
建议主从复制使用同步,保证数据不丢失
数据刷盘使用异步,因为刷盘一般不会出现问题

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第56张
-本节完-

四、负载均衡

1.生产者负载均衡

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第57张

2.消费者负载均衡

集群模式

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第58张
有两种算法,默认算法为:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第59张
另外有一种轮询分配算法
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第60张
注意:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第61张
一个queue只能被一个消费者组内的一个消费者所消费;
一个组内的消费者会被分配到多个queue 分区去消费;
queue-消费者 只能多对一,不可以多对多;

如果消费者数量多于分区,那么多出来的消费者不可能被分配到分区,只能闲置;
因此应该保证分区数大于消费者数;

广播模式

因为广播模式本来就是将消息广播给所有消费者,因此不存在负载均衡
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第62张

-本节完-

五、消息重试

=消息发送重试机制=

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第63张
消息发送失败重试针对不同,有三种不同的策略

①同步发送失败策略

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第64张
消息发送失败,会再次尝试指定的次数;会优先选择位于其他broker上面的该topic的分区去投递;如果只有一个broker,那么会优先选择该broker上面的其他分区;
因此,顺序消息发送是没有重试的(因为顺序消息是发在同一个分区的);

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第65张
MQ重试的时候,会优先选择没有发生过异常的broker来投递,避免多次发送到以证明发生异常的broker上面;
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第66张

②异步发送失败策略

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第67张
异步发送,发送完MQ已经通知调用者发送成功的通知,因此不能更改broker了,只能重复在一个broker上面重试;如果一直失败那就消息丢失;

③消息刷盘失败策略

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第68张

=消息消费的重试机制=

消息重试是在消息消费失败后进行的,是为了尽可能保证消息一定会被成功消费

1.顺序消息的重试

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第69张
消费者消费顺序消息失败后,如果没有进行处理,就会不断重试;又因为是顺序消费,本条消息后面的消息一直不会被消费,因此出现消息阻塞的情况;
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第70张

2.无序消息的重试

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第71张
因为广播模式,理论上每个消费者都会拿到所有的消息,如果只因为某个消费者未消费某条消息而去重试,不值得,因此广播模式没有重试机制。

tips.消费重试相关原理

消费重试工作原理介绍:

重试队列

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第72张
消费者消费某条消息失败后,MQ将这条消息放到重试队列中,进行重试消费的时候就直接在这个队列中去消费即可。这个队列所在的topic是一个单独的topic,名为:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第73张

不同消费者组在该topic中使用不同的重试队列,因为每个消费者组消费失败的消息不一致,因此每个消费者组分配不同的队列

重试队列不是一开始就存在,而是当发送需要消费重试的时候才出现

重试次数

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第74张
ps:消费重试的等级,正好对应延迟消息延迟的等级,是因为底层他们两个是一样的实现;

配置方式

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第75张
代码示例:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第76张
如果不想重试的话,就在异常里面正常提交成功的回应即可

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第77张
-本节完-

六、死信队列

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第78张

死信特性

死信消息特性

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第79张

死信队列特性

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第80张
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第81张

死信队列针对的是整个消费者组,组内所有消费者消费失败的死信,都会存入该队列,即使是不同的topic的消息,也都会被存入。
私信队列也都属于一个topic,名称为:
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第82张

死信队列中的消息无法再被消费者正常消费,因此需要查看到死信消息,经过处理,再次重新发送

查看死信消息

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第83张

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第84张
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第85张
-本节完-

七、消费幂等

1.消费幂等定义

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第86张
幂等是需要业务代码来辅助完成的,MQ只提供能够实现幂等的唯一性标识的设置api,以此来避免消息被业务重复处理。
消息重复无法避免,但是可以避免业务重复处理消息

2.为什么会出现重复处理消息

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第87张

3.如何处理

① 通用解决思路

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第88张
前两步是进行判断是否已经确定了是重复性操作,是用来避免重复进行唯一性处理的步骤,如果命中前两步,就返回,不进行第三步的业务唯一性处理;如果没有命中,就进行业务代码,来进行唯一性判断;

②业务实现唯一性判断

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第89张

③ 场景举例

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第90张
-本节完-

八、消息的清理

消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第91张
消息被消费后不会立刻删除,而是存在文件中,到达过期时间才进行文件的清理,才会被删除
消息中间件rocketmq_http代理和socket代理 (https://mushiming.com/)  第92张

第四章 集成springBoot

一、基础集成springboot

1.引入依赖

 <!--RocketMq集成springboot,需要配置yml文件--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency> 

2.配置yml

# MQ配置 rocketmq: name-server: 127.0.0.1:9876 producer: group: produceGroup consumer: group: consumerGroup 

3.生产者代码

@RestController @RequestMapping("/rockerMq") @AllArgsConstructor public class DemoController { 
    @Autowired private RocketMQTemplate rocketMQTemplate; /** * 测试springboot集成Mq发送消息 */ @PostMapping("/send") public void add() { 
    System.out.println("=生产者启动="); System.out.println(rocketMQTemplate.getProducer().getClientIP()); rocketMQTemplate.convertAndSend("springTopic", "hello,spring111"); } } 

4.消费者代码

消费者只需要创建一个监听类,然后指定需要订阅监听的topic,指定消费者组即可

@Component @RocketMQMessageListener(topic = "springTopic",consumerGroup = "${rocketmq.consumer.group}") public class ConsumerListener implements RocketMQListener<String> { 
    /** * 消息监听 * @param message */ @Override public void onMessage(String message) { 
    System.out.println("监听到消息:"+message); } } 

参考视频①
参考视频②

THE END

发表回复