通过rocketmq-spring-boot-starte
r可以快速的搭建RocketMQ
生产者和消费者服务。
pom.xml
引入组件rocketmq-spring-boot-starter
依赖
1 2 3 4 5 6
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
|
- 修改
application.yml
,添加RocketMQ
相关配置
1 2 3 4 5
| rocketmq: name-server: 192.168.2.100:9876 producer: group: test-group
|
- 发送消息与消费消息
使用RocketMQTemplate
实现消息的发送;
使用实现RocketMQListener
接口,并添加@RocketMQMessageListener
注解,声明消费主题,消费者分组等,且默认消费模式是集群消费。
1. 普通消息
发送消息测试接口:http://localhost:8080/send/common
1 2 3 4 5 6 7 8 9 10 11
| @RestController @RequiredArgsConstructor public class RocketMqController {
private final RocketMQTemplate rocketMQTemplate;
@GetMapping("send/common") public Object sendCommon() { return rocketMQTemplate.syncSend("common_topic", "普通消息"); } }
|
普通消息监听消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Slf4j @Component @RocketMQMessageListener( topic = "common_topic", consumerGroup = "test_group") public class CommonListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("{}收到消息:{}", this.getClass().getSimpleName(), message); } }
|
2. 带Tag的消息
发送消息测试接口:http://localhost:8080/send/tag
1 2 3 4
| @GetMapping("send/tag") public Object sendWithTag() { return rocketMQTemplate.syncSend("tag_topic"+ ":" + "tag", "tag消息,tag:tag"); }
|
监听消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Slf4j @Component @RocketMQMessageListener( topic = "tag_topic", selectorType = SelectorType.TAG, selectorExpression = "tag",//指定了tag后,发送的消息如果不带tag,将会消费不到 consumerGroup = "tag_group") public class TagMsgListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("{}收到消息:{}", this.getClass().getSimpleName(), message); } }
|
3. 消费模式为广播消费
发送消息测试接口:http://localhost:8080/send/broadcast
1 2 3 4
| @GetMapping("send/broadcast") public Object sendWithManyTag() { return rocketMQTemplate.syncSend("broadcast_topic", "广播消息"); }
|
监听消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Slf4j @Component @RocketMQMessageListener( topic = "broadcast_topic", messageModel = MessageModel.BROADCASTING,//指定为广播消费 consumerGroup = "broadcast_group") public class BroadcastListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("{}收到消息:{}", this.getClass().getSimpleName(), message); } }
|
4. 顺序发送的消息,随机消费
发送消息测试接口:http://localhost:8080/send/random
1 2 3 4 5 6 7 8 9
| @GetMapping("send/random") public Object sendRandom() { List<SendResult> results = new ArrayList<>(); for (int i = 0; i <= 3; i++) { SendResult sendResult = rocketMQTemplate.syncSend("random_topic", "无序消息" + i); results.add(sendResult); } return results; }
|
监听消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Slf4j @Component @RocketMQMessageListener( topic = "random_topic", consumerGroup = "random_group") public class RandomListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("{}收到消息:{}", this.getClass().getSimpleName(), message); } }
|
5. 顺序消费
发送消息测试接口:http://localhost:8080/send/order
1 2 3 4 5 6 7 8 9
| @GetMapping("send/order") public Object sendOrder() { List<SendResult> results = new ArrayList<>(); for (int i = 0; i <= 3; i++) { SendResult sendResult = rocketMQTemplate.syncSendOrderly("order_topic", "有序消息" + i, "hashkey"); results.add(sendResult); } return results; }
|
监听消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Slf4j @Component @RocketMQMessageListener( topic = "order_topic", consumeMode = ConsumeMode.ORDERLY,//指定为顺序消费 consumerGroup = "order_group") public class OrderListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("{}收到消息:{}", this.getClass().getSimpleName(), message); } }
|
6. 异步消息
producer
向broker
发送消息时指定消息发送成功及发送异常的回调方法,调用API
后立即返回,producer
发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行。
发送消息测试接口:http://localhost:8080/send/async
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @GetMapping("send/async") public Object sendAsync() { rocketMQTemplate.asyncSend("async_topic", "异步消息", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("发送成功:{}", JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { } }); return null; }
|
监听消费
1 2 3 4 5 6 7 8 9 10 11
| @Slf4j @Component @RocketMQMessageListener( topic = "async_topic", consumerGroup = "async_group") public class AsyncListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("{}收到消息:{}", this.getClass().getSimpleName(), message); } }
|
7. 单向发送消息
单向发送消息这种方式主要用在不特别关心发送结果的场景,例如日志发送。
发送消息测试接口:http://localhost:8080/send/oneway
1 2 3 4
| @GetMapping("send/oneway") public void sendOneway() { rocketMQTemplate.sendOneWay("oneway_topic", "单向消息"); }
|
监听消费
1 2 3 4 5 6 7 8 9 10 11
| @Slf4j @Component @RocketMQMessageListener( topic = "oneway_topic", consumerGroup = "oneway_group") public class OnewayListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("{}收到消息:{}", this.getClass().getSimpleName(), message); } }
|
8. 延时消息
发送消息测试接口:http://localhost:8080/send/delay
1 2 3 4 5 6 7 8 9
| @GetMapping("send/delay") public Object sendDelay() { String txt = "延时消息:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Message<String> message = MessageBuilder.withPayload(txt).build(); return rocketMQTemplate.syncSend("delay_topic", message, 2000, 4); }
|
监听消费
1 2 3 4 5 6 7 8 9 10 11
| @Slf4j @Component @RocketMQMessageListener( topic = "delay_topic", consumerGroup = "delay_group") public class DelayListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("{}于{}收到消息:{}", this.getClass().getSimpleName(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), message); } }
|
9. 事务消息(半消息)
- 事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
- 事务消息仅仅只是保证本地事务和MQ消息发送形成整体的 原子性,而投递到MQ服务器后,并无法保证消费者一定能消费成功。
发送消息测试接口:http://localhost:8080/send/tx
1 2 3 4 5 6 7 8 9 10
| @GetMapping("send/tx") public Object sendTransaction() { int i = new Random().nextInt(1000); Map<String, String> txtMap = new HashMap<>(2); txtMap.put("key", "key" + i); txtMap.put("name", "事务消息"); txtMap.put("desc", "事务消息" + i); Message<Map<String, String>> message = MessageBuilder.withPayload(txtMap).setHeader("key", txtMap.get("key")).build(); return rocketMQTemplate.sendMessageInTransaction("tx_topic", message , i); }
|
生产者端需要实现RocketMQLocalTransactionListener
接口,重写执行本地事务的方法和检查本地事务方法;
@RocketMQTransactionListener
注解表明这个一个生产端的消息监听器,需要配置监听的事务消息生产者组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @Slf4j @Service @RocketMQTransactionListener public class TxProducerListener implements RocketMQLocalTransactionListener {
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { log.info("开始执行本地事务"); RocketMQLocalTransactionState state; try{ Integer i = (Integer) arg; if (i % 2 == 0) { int a = i / 0; } state = RocketMQLocalTransactionState.COMMIT; log.info("本地事务已提交。{}",message.getHeaders().get("key").toString()); }catch (Exception e){ log.info("执行本地事务失败。{}",e); state = RocketMQLocalTransactionState.ROLLBACK; } return state; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("开始执行回查"); boolean flag = false; if ( flag ) { log.info("回滚半消息"); return RocketMQLocalTransactionState.ROLLBACK; } log.info("提交半消息"); return RocketMQLocalTransactionState.COMMIT; } }
|
监听消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Slf4j @Component @RocketMQMessageListener( topic = "tx_topic", consumerGroup = "tx_group") public class TxConsumerListener implements RocketMQListener<Map<String, String>> { @Override public void onMessage(Map<String, String> message) { log.info("{}收到消息:{}", this.getClass().getSimpleName(), message); } }
|
10. 部分测试日志打印
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| 2021-06-11 17:25:02.861 INFO 13904 --- [MessageThread_3] com.demo.CommonListener : CommonListener收到消息:普通消息 2021-06-11 17:25:10.296 INFO 13904 --- [MessageThread_2] com.demo.TagMsgListener : TagMsgListener收到消息:tag消息,tag:tag 2021-06-11 17:26:05.070 INFO 13904 --- [MessageThread_1] com.demo.BroadcastListener : BroadcastListener收到消息:广播消息 2021-06-11 17:27:19.969 INFO 13904 --- [MessageThread_2] com.demo.RandomListener : RandomListener收到消息:无序消息1 2021-06-11 17:27:19.969 INFO 13904 --- [MessageThread_1] com.demo.RandomListener : RandomListener收到消息:无序消息0 2021-06-11 17:27:19.970 INFO 13904 --- [MessageThread_4] com.demo.RandomListener : RandomListener收到消息:无序消息3 2021-06-11 17:27:19.970 INFO 13904 --- [MessageThread_3] com.demo.RandomListener : RandomListener收到消息:无序消息2 2021-06-11 17:28:15.530 INFO 13904 --- [MessageThread_2] com.demo.OrderListener : OrderListener收到消息:有序消息0 2021-06-11 17:28:15.531 INFO 13904 --- [MessageThread_3] com.demo.OrderListener : OrderListener收到消息:有序消息1 2021-06-11 17:28:15.533 INFO 13904 --- [MessageThread_4] com.demo.OrderListener : OrderListener收到消息:有序消息2 2021-06-11 17:28:15.540 INFO 13904 --- [MessageThread_5] com.demo.OrderListener : OrderListener收到消息:有序消息3 2021-06-11 17:29:24.630 INFO 13904 --- [MessageThread_1] com.demo.AsyncListener : AsyncListener收到消息:异步消息 2021-06-11 17:29:24.644 INFO 13904 --- [ublicExecutor_1] o.example.controller.RocketMqController : 发送成功:{"messageQueue":{"brokerName":"localhost.localdomain","queueId":0,"topic":"async_topic"},"msgId":"7F000001365018B4AAC237405B920066","offsetMsgId":"C0A8026400002A9F000000000004FF02","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true} 2021-06-11 17:30:12.790 INFO 13904 --- [MessageThread_1] com.demo.OnewayListener : OnewayListener收到消息:单向消息 2021-06-11 17:31:06.185 INFO 13904 --- [MessageThread_1] com.demo.DelayListener : DelayListener于2021-06-11 17:31:06收到消息:延时消息:2021-06-11 17:30:36 2021-06-11 17:31:22.216 INFO 13904 --- [nio-8080-exec-5] com.demo.TxProducerListener : 开始执行本地事务 2021-06-11 17:31:22.217 INFO 13904 --- [nio-8080-exec-5] com.demo.TxProducerListener : 本地事务已提交。key5 2021-06-11 17:31:22.233 INFO 13904 --- [MessageThread_3] com.demo.TxConsumerListener : TxConsumerListener收到消息:{name=事务消息, key=key5, desc=事务消息5} 2021-06-11 17:31:27.872 INFO 13904 --- [nio-8080-exec-4] com.demo.TxProducerListener : 开始执行本地事务 2021-06-11 17:31:27.880 INFO 13904 --- [nio-8080-exec-4] com.demo.TxProducerListener : 执行本地事务失败。{} java.lang.ArithmeticException: / by zero at com.demo.TxProducerListener.executeLocalTransaction(TxProducerListener.java:42) ~[classes/:na] ...
|
源码地址:https://gitee.com/chaoo/mq-test.git
原文链接: http://chaooo.github.io/2021/06/11/spring-boot-rocketmq.html
版权声明: 转载请注明出处.