安装参考1
安装参考2
如果broker突然好好的启动不了,试试这个帖子:
参考
/** * 生产者 * * @author byChen * @date 2022/8/2 */ public class Producer {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); Message msgA = new Message("chenchong", "这是chenchong的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgA); Message msgB = new Message("chenchong", "这是chenchong的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgB); //4.2 可以选择指定tag,key来进行细分message Message msgC = new Message("chenchong", "tag-a", "这是chenchong的消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("chenchong", "tag-a", "key1", "这是chenchong的消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送 for (Message message : messageList) {
SendResult send = producer.send(message); System.out.println("消息发送成功:id:" + send.getMsgId() + ", msgId:" + send.getMsgId() + ", offsetMsgId:" + send.getOffsetMsgId() + ", result:" + send.getSendStatus() ); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } }
/** * 消费者 * * @author byChen * @date 2022/8/2 */ public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("chenchong", ""); //4.指定消费者拉取消息方式,这里设置从头开始(也就是历史数据也拉取) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0); try {
System.out.println("-收到消息:id-" + messageExt.getMsgId() + "," + new String(messageExt.getBody(), "UTF-8") + "," + "keys: " + messageExt.getKeys() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) {
e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } }
生产者结果:
字段解释:
消费者结果:
页面发送消息,控制台会乱码,需要另行配置
内容:
windows本机集群搭建
复制:是指从broker在主broker上面复制下来数据,是纵向的
刷盘:是指将数据从内存保存到磁盘上,是横向的
同步数据成功:指的就是数据被刷盘成功
-本节完-
消息发送成功后,才返回结果
/** * 生产者-发送同步消息 * * @author byChen * @date 2022/8/2 */ public class Producer {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); Message msgA = new Message("chenchong", "这是chenchong的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgA); Message msgB = new Message("chenchong", "这是chenchong的消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgB); //4.2 可以选择指定tag,key来进行细分message Message msgC = new Message("chenchong", "tag-a", "这是chenchong的消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("chenchong", "tag-a", "key1", "这是chenchong的消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送 for (Message message : messageList) {
SendResult send = producer.send(message); System.out.println("消息发送成功:id:" + send.getMsgId() + " result:" + send.getSendStatus()); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } }
消息发送后,不等待MQ返回结果,就直接返回客户端。后续通过回调来获取MQ的给的结果
/** * 生产者-发送异步消息 * * @author byChen * @date 2022/8/2 */ public class ProducerAsync {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body,可以选择指定tag,key来进行细分message List<Message> messageList = new ArrayList<>(); Message msgC = new Message("chenchong", "tag-a", "异步消息".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("chenchong", "tag-a", "key1", "异步消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送异步消息,流程跟同步大致一样,在发送这里有修改 for (Message message : messageList) {
//因为发送异步,因此这里不能直接去获取返回值,,应该使用回调函数来接收回调信息 producer.send(message, new SendCallback() {
/** * 回调成功的方法 * @param sendResult */ @Override public void onSuccess(SendResult sendResult) {
System.out.println("成功:" + sendResult); } /** * 回调失败的函数 * @param throwable */ @Override public void onException(Throwable throwable) {
System.out.println("失败:"+throwable); } }); } //6.不再发送消息就关闭掉生产者,因为这里需要等待回调,所以不能关闭 // producer.shutdown(); } }
用在不需要关心发送结果的场景,例如发送日志信息;因此单向消息没有返回值信息
/** * 生产者-发送单向消息 * * @author byChen * @date 2022/8/2 */ public class ProducerOneWay {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body 可以选择指定tag,key来进行细分message List<Message> messageList = new ArrayList<>(); Message msgC = new Message("chenchong", "tag-a", "这是单向消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("chenchong", "tag-a", "key2", "这是单向消息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送 for (Message message : messageList) {
//使用单向消息发送方法,该方法没有返回值 producer.sendOneway(message); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } }
生产者
/** * 生产者-发送顺序消息 * <p>需要将统一业务放在同一个队列中,消费者也从同一个队列中获取 * 即可完成顺序消费</p> * * @author byChen * @date 2022/8/2 */ public class ProducerByOrder {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 List<OrderInfo> order = createOrder(); //5.遍历发送 for (int i = 0; i < order.size(); i++) {
OrderInfo orderInfo = order.get(i); //创建消息体 Message message = new Message("orderTopic", "order", "i:" + i, orderInfo.toString().getBytes()); /** * @param message 发送的消息体 * @param MessageQueueSelector 消息队列的选择器 * @param orderId 选择队列的业务标识 */ SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/** * @param mqs 该主题下的所有队列 * @param msg 发送的消息实体 * @param arg 选择队列的业务标识,对应orderId * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int orderId = (int) arg; //利用业务id来取模,保证相同的业务id,得到的相同的队列 long index = orderId % mqs.size(); MessageQueue messageQueue = mqs.get((int) index); return messageQueue; } }, orderInfo.getOrderId()); System.out.println("发送消息结果:" + sendResult+" 消息发送到的队列:"+sendResult.getMessageQueue().getQueueId()); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } /** * 创建业务数据而已 * * @return */ public static List<OrderInfo> createOrder() {
List<OrderInfo> orderInfoList = new ArrayList<>(); OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(1); orderInfo.setDesc("订单 1 创建"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(1); orderInfo.setDesc("订单 1 付款"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(1); orderInfo.setDesc("订单 1 推送"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(1); orderInfo.setDesc("订单 1 完成"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(2); orderInfo.setDesc("订单 2 创建"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(2); orderInfo.setDesc("订单 2 付款"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(2); orderInfo.setDesc("订单 3 创建"); orderInfoList.add(orderInfo); orderInfo = new OrderInfo(); orderInfo.setOrderId(2); orderInfo.setDesc("订单 3 付款"); orderInfoList.add(orderInfo); return orderInfoList; } }
消费者:
/** * 消费者 -顺序消费消息 * * @author byChen * @date 2022/8/2 */ public class ConsumerByOrder {
public static void main(String[] args) throws MQClientException {
//1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("orderTopic", ""); //4.进行订阅主题,对主题进行监听,内部实现类,保证一个线程只处理一个队列 consumer.registerMessageListener(new MessageListenerOrderly() {
/** * * @param msgs 全部的消息 * @param context * @return */ @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费消息:" + new String(msg.getBody())+"队列id:"+msg.getQueueId()); } return ConsumeOrderlyStatus.SUCCESS; } }); //6.启动消费者 consumer.start(); } }
生产者发送到MQ之后,不是立即被消费者消费,而是延迟一段时间才被消费
生产者:
/** * 生产者-发送延迟消息 * * @author byChen * @date 2022/8/2 */ public class ProducerDelay {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); Message msgA = new Message("delayTopic", "这是延迟消息,没有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); //给消息设定延迟时间,延迟时间是固定的,有几个级别 //:等级 1~18 //:时间 1s 2s ~~~2h msgA.setDelayTimeLevel(2); messageList.add(msgA); //5.发送 for (Message message : messageList) {
SendResult send = producer.send(message); System.out.println("消息发送成功:id:" + send.getMsgId() + " result:" + send.getSendStatus()); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } }
消费者:
/** * 消费者 消费延迟消息 * * @author byChen * @date 2022/8/2 */ public class ConsumerDelay {
public static void main(String[] args) throws MQClientException {
//1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("delayTopic", ""); //4.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0); System.out.println("消息id:" + messageExt.getMsgId() + ",延迟时间:" + (System.currentTimeMillis() - messageExt.getStoreTimestamp())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("==消费者启动=="); } }
结果:
不启用延迟,延迟只有网络波动的延迟
启用延迟消息,延迟就是设置的延迟时间
批量消息,就是将原本遍历发送的多条消息,一下子发送过去
生产者:
/** * 生产者-发送批量消息 * * @author byChen * @date 2022/8/2 */ public class ProducerBatch {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); Message msgA = new Message("batchTopic", "批量消息1".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgA); Message msgB = new Message("batchTopic", "批量消息2".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgB); //4.2 可以选择指定tag,key来进行细分message Message msgC = new Message("batchTopic", "tag-a", "批量消息3".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("batchTopic", "tag-a", "key1", "批量消息4".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送批量消息,直接将集合放进去 SendResult send1 = producer.send(messageList); System.out.println("发送结果:" + send1); //6.不再发送消息就关闭掉生产者 producer.shutdown(); } }
消费者:
/** * 消费者 - 批量消费(跟正常一样) * * @author byChen * @date 2022/8/2 */ public class ConsumerBatch {
public static void main(String[] args) throws MQClientException {
//1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("batchTopic", ""); //4.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0); try {
System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",keys: " + messageExt.getKeys() + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) {
e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("==消费者启动=="); } }
结果:批量发送的消息,在消费者方也是同样的一条一条的获取
生产者生产的时候指定tag,,只有订阅了该tag的消费者才能消费到
生产者:
/** * 生产者-发送同步消息 * * @author byChen * @date 2022/8/2 */ public class ProducerTag {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 //4.1 message必须指定topic,和消息体body List<Message> messageList = new ArrayList<>(); //4.2 可以选择指定tag,key来进行细分message Message msgC = new Message("tagFilterTopic", "tag-a", "这是tag过滤的消息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgC); Message msgD = new Message("tagFilterTopic", "tag-b", "这是tag过滤的消息,指定了tag-b".getBytes(RemotingHelper.DEFAULT_CHARSET)); messageList.add(msgD); //5.发送 for (Message message : messageList) {
SendResult send = producer.send(message); System.out.println("消息发送成功:id:" + send.getMsgId() + " result:" + send.getSendStatus()); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } }
消费者:
/** * 消费者 * * @author byChen * @date 2022/8/2 */ public class ConsumerTag {
public static void main(String[] args) throws MQClientException {
//1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定或者写 * 就是全部tags,监听多个就用 || 连接 consumer.subscribe("tagFilterTopic", "tag-a"); // consumer.subscribe("tagFilterTopic", "tag-a || tag-b"); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0); try {
System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",keys: " + messageExt.getKeys() + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) {
e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } }
结果:
生产者:
/** * 生产者-通过sql语句过滤消息 * * @author byChen * @date 2022/8/2 */ public class ProducerSql {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 DefaultMQProducer producer = new DefaultMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.启动生产者 producer.start(); System.out.println("===启动了生产者producer==="); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); //4.创建发送消息需要用到的消息实体 集合 for (int i = 0; i < 10; i++) {
Message message = new Message("sqlFilterTopic", "tag", ("这是第 " + i + "条消息").getBytes()); //给消息添加上自己定义的条件,以供消费者过滤 message.putUserProperty("i", String.valueOf(i)); SendResult send = producer.send(message); System.out.println("消息发送成功:" + send); } //6.不再发送消息就关闭掉生产者 producer.shutdown(); } }
消费者:
/** * 消费者 * * @author byChen * @date 2022/8/2 */ public class ConsumerSql {
public static void main(String[] args) throws MQClientException {
//1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.这里不指定监听的主题,而是通过sql来进行过滤 consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("i>5")); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0); try {
System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",keys: " + messageExt.getKeys() + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) {
e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } }
生产者:
/** * 生产者-事务 * <p>生产者发送消息到MQ,MQ会回调监听器的executeLocalTransaction方法,执行本地事务</p> * <p>MQ调用executeLocalTransaction方法后,会接收到对应的 提交 回滚 命令,进行相应的操作。成功就提交消息给消费者消费,失败则回滚删除消息</p> * <p>如果方法没有返回给MQ准确的响应,那么MQ会调用checkLocalTransaction方法进行消息事务状态的回查</p> * * @author byChen * @date 2022/8/2 */ public class Producer {
public static void main(String[] args) throws Exception {
//1.创建一个生产者,指定消费者组名 TransactionMQProducer producer = new TransactionMQProducer("produceGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 producer.setNamesrvAddr("127.0.0.1:9876"); //3.设置事务监听器,处理本地事务+提供给mq来进行回查(事务长时间未提交才会回查) producer.setTransactionListener(new TransactionListener() {
/** * MQ收到消息后,在该方法执行本地事务 * @param msg * @param arg * @return */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
if ("tag1".equals(msg.getTags())) {
//响应正常提交 return LocalTransactionState.COMMIT_MESSAGE; } else if ("tag2".equals(msg.getTags())) {
//响应回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } else if ("tag3".equals(msg.getTags())) {
//不做准确的响应 return LocalTransactionState.UNKNOW; } return LocalTransactionState.UNKNOW; } /** * 该方法提供给MQ进行消息事务状态的回查 * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("消息标签:" + msg.getTags() + "进行回查"); return LocalTransactionState.COMMIT_MESSAGE; } }); //3.启动生产者 producer.start(); String[] tags = {
"tag1", "tag2", "tag3"}; System.out.println("===启动了生产者producer==="); for (int i = 0; i < 3; i++) {
Message message = new Message("transtionTopic", tags[i], ("事务消息+" + i).getBytes()); //第二个消息是:指定事务是针对那个消息的,这里针对所有的消息,所有设定为null TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null); System.out.println("消息发送结果:" + transactionSendResult); } //6.不再发送消息就关闭掉生产者,因为MQ会回调监听,因此这里不能关闭生产者 // producer.shutdown(); } }
消费者:
/** * 消费者 * * @author byChen * @date 2022/8/2 */ public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("transtionTopic", "*"); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0); try {
System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) {
e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } }
默认的模式,生产者生产的消息被分配到一个组内每个消费者身上
生产者生产的消息,不论多少条,都会被每个消费者收到
广播模式需要多添加一行代码,来手动设置一下
/** * 消费者 * * @author byChen * @date 2022/8/2 */ public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建一个消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupOne"); //2.指定消费者消费的 nameServer (相当于kafka的zookeeper)地址,可以集群,多个地址用 :分开 consumer.setNamesrvAddr("127.0.0.1:9876"); //3.指定监听的主题,也可以指定监听主题的那些tags ,不指定就是全部tags consumer.subscribe("chenchong", ""); //4.指定消费者拉取消息方式,这里设置从头开始(也就是历史数据也拉取) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 设置消费模式 负载均衡|广播模式 这里设置为广播模式 (默认就是负载均衡) consumer.setMessageModel(MessageModel.BROADCASTING); //5.进行订阅主题,对主题进行监听 consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0); try {
System.out.println("-收到消息 消息id:" + messageExt.getMsgId() + ",内容" + new String(messageExt.getBody(), "UTF-8") + ",keys: " + messageExt.getKeys() + ",tags: " + messageExt.getTags() ); //System.out.println("msg全部信息:"+ msg.toString()); } catch (UnsupportedEncodingException e) {
e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //6.启动消费者 consumer.start(); System.out.println(consumer.getClientIP()); System.out.println(consumer.getNamesrvAddr()); } }
-本节完-
拉取模式也是RocketMq的默认方式
同一个消费者组内的所有消费者,都会获取到topic的所有消息
同一个消费者组里面的全部消费者,最好是订阅同一个topic,同一个tag,同数量的topic
原因无非就两个:消费者订阅的topic的queue数量发生变化 + 消费者组中消费者的数量发生变化
offset 是用来记录每个queue的不同消费者组的消费进度的;
根据消费进度记录存放位置的不同,分为本地模式和远程模式
广播消费使用本地模式
集群消费使用远程模式
消息存储的位置:
消息存储结构
1.查找消息过程
indexFile
它内部也跟consumerQueue类似,存了一些索引一样的东西,不过它存的是偏移量、时间戳、时间区间之类的,算是对consumerQueue的一种补充,提供多一些查找方法。
如何配置:
集群高可用
消费高可用-自动切换
发送高可用-topic的队列分布在不同的broker
消息主从复制
因为从broker不能被生产者写入数据,只能由主broker同步数据,那么主从消息是如何同步的?
也同样分为 同步复制和异步复制
配置:
主从复制搭配数据刷盘
建议主从复制使用同步,保证数据不丢失
数据刷盘使用异步,因为刷盘一般不会出现问题
-本节完-
有两种算法,默认算法为:
另外有一种轮询分配算法
注意:
一个queue只能被一个消费者组内的一个消费者所消费;
一个组内的消费者会被分配到多个queue 分区去消费;
queue-消费者 只能多对一,不可以多对多;
如果消费者数量多于分区,那么多出来的消费者不可能被分配到分区,只能闲置;
因此应该保证分区数大于消费者数;
因为广播模式本来就是将消息广播给所有消费者,因此不存在负载均衡
-本节完-
消息发送失败重试针对不同,有三种不同的策略
消息发送失败,会再次尝试指定的次数;会优先选择位于其他broker上面的该topic的分区去投递;如果只有一个broker,那么会优先选择该broker上面的其他分区;
因此,顺序消息发送是没有重试的(因为顺序消息是发在同一个分区的);
MQ重试的时候,会优先选择没有发生过异常的broker来投递,避免多次发送到以证明发生异常的broker上面;
异步发送,发送完MQ已经通知调用者发送成功的通知,因此不能更改broker了,只能重复在一个broker上面重试;如果一直失败那就消息丢失;
消息重试是在消息消费失败后进行的,是为了尽可能保证消息一定会被成功消费
消费者消费顺序消息失败后,如果没有进行处理,就会不断重试;又因为是顺序消费,本条消息后面的消息一直不会被消费,因此出现消息阻塞的情况;
因为广播模式,理论上每个消费者都会拿到所有的消息,如果只因为某个消费者未消费某条消息而去重试,不值得,因此广播模式没有重试机制。
消费重试工作原理介绍:
消费者消费某条消息失败后,MQ将这条消息放到重试队列中,进行重试消费的时候就直接在这个队列中去消费即可。这个队列所在的topic是一个单独的topic,名为:
不同消费者组在该topic中使用不同的重试队列,因为每个消费者组消费失败的消息不一致,因此每个消费者组分配不同的队列
重试队列不是一开始就存在,而是当发送需要消费重试的时候才出现
ps:消费重试的等级,正好对应延迟消息延迟的等级,是因为底层他们两个是一样的实现;
代码示例:
如果不想重试的话,就在异常里面正常提交成功的回应即可
-本节完-
死信队列针对的是整个消费者组,组内所有消费者消费失败的死信,都会存入该队列,即使是不同的topic的消息,也都会被存入。
私信队列也都属于一个topic,名称为:
死信队列中的消息无法再被消费者正常消费,因此需要查看到死信消息,经过处理,再次重新发送
-本节完-
幂等是需要业务代码来辅助完成的,MQ只提供能够实现幂等的唯一性标识的设置api,以此来避免消息被业务重复处理。
消息重复无法避免,但是可以避免业务重复处理消息
前两步是进行判断是否已经确定了是重复性操作,是用来避免重复进行唯一性处理的步骤,如果命中前两步,就返回,不进行第三步的业务唯一性处理;如果没有命中,就进行业务代码,来进行唯一性判断;
-本节完-
消息被消费后不会立刻删除,而是存在文件中,到达过期时间才进行文件的清理,才会被删除
<!--RocketMq集成springboot,需要配置yml文件--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
# MQ配置 rocketmq: name-server: 127.0.0.1:9876 producer: group: produceGroup consumer: group: consumerGroup
@RestController @RequestMapping("/rockerMq") @AllArgsConstructor public class DemoController {
@Autowired private RocketMQTemplate rocketMQTemplate; /** * 测试springboot集成Mq发送消息 */ @PostMapping("/send") public void add() {
System.out.println("=生产者启动="); System.out.println(rocketMQTemplate.getProducer().getClientIP()); rocketMQTemplate.convertAndSend("springTopic", "hello,spring111"); } }
消费者只需要创建一个监听类,然后指定需要订阅监听的topic,指定消费者组即可
@Component @RocketMQMessageListener(topic = "springTopic",consumerGroup = "${rocketmq.consumer.group}") public class ConsumerListener implements RocketMQListener<String> {
/** * 消息监听 * @param message */ @Override public void onMessage(String message) {
System.out.println("监听到消息:"+message); } }
参考视频①
参考视频②