上一篇介绍了RabbitMq借助RabbitTemplate来发送消息的基本使用姿势,我们知道RabbitMq提供了两种机制,来确保发送端的消息被brocke正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势
I. 配置
首先创建一个SpringBoot项目,用于后续的演示
依赖配置文件pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <relativePath/> </parent>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
<build> <pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </pluginManagement> </build> <repositories> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/libs-snapshot-local</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/libs-milestone-local</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> <repository> <id>spring-releases</id> <name>Spring Releases</name> <url>https://repo.spring.io/libs-release-local</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories>
|
在application.yml
配置文件中,添加rabbitmq的相关属性
1 2 3 4 5 6 7
| spring: rabbitmq: virtual-host: / username: admin password: admin port: 5672 host: 127.0.0.1
|
II. 消息确认机制
本节来看一下消息确认机制的使用姿势,首先有必要了解一下什么是消息确认机制
1. 定义
简单来讲就是消息发送之后,需要接收到RabbitMq的正确反馈,然后才能判断消息是否正确发送成功;
一般来说,RabbitMq的业务逻辑包括以下几点
- 生产者将信道设置成Confirm模式,一旦信道进入Confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(以confirm.select为基础从1开始计数)
- 一旦消息被投递到所有匹配的队列之后,Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了
- 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出
- Broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号(此外Broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理)
2. 基本使用case
从上面的解释,可以知道发送消息端,需要先将信道设置为Confirm模式,RabbitProperties
配置类中,有个属性,正好是用来设置的这个参数的,所以我们可以直接在配置文件application.yml
中,添加下面的配置
1 2 3 4 5 6 7
| spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true
|
上面配置完毕之后,直接使用RabbitTemplate发送消息,表示已经支持Confirm模式了,但实际的使用,会有一点点区别,我们需要接收mq返回的消息,发送失败的回调(以实现重试逻辑等),所以一个典型的发送端代码可以如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Service public class AckPublisher implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("ack send succeed: " + correlationData); } else { System.out.println("ack send failed: " + correlationData + "|" + cause); } }
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ack " + message + " 发送失败"); }
public String publish(String ans) { String msg = "ack msg = " + ans; System.out.println("publish: " + msg);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData); return msg; } }
|
请注意上面的实现,首先需要给RabbitTemplate设置回调,这两个不可或缺
1 2
| rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this);
|
3. 手动配置方式
上面利用的是标准的SpringBoot配置,一般来说是适用于绝大多数的场景的;当不能覆盖的时候,还可以通过手动的方式来定义一个特定的RabbitTemplate(比如一个项目中,只有某一个场景的消息发送需要确认机制,其他的默认即可,所以需要区分RabbitTemplate)
在自动配置类中,可以手动的注册一个RabbitTemplate的bean,来专职消息确认模式的发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private Integer port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost;
@Bean public RabbitTemplate ackRabbitTemplate() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); connectionFactory.setPublisherReturns(true); return new RabbitTemplate(connectionFactory); }
|
至于使用姿势,和前面完全一致,只是将rabbitTemplate
换成ackRabbitTemplate
III. 事务机制
消息确认机制属于异步模式,也就是说一个消息发送完毕之后,不待返回,就可以发送另外一条消息;这里就会有一个问题,publisher先后发送msg1, msg2,但是对RabbitMq而言,接收的顺序可能是msg2, msg1;所以消息的顺序可能会不一致
所以有了更加严格的事务机制,它属于同步模式,发送消息之后,等到接收到确认返回之后,才能发送下一条消息
1. 事务使用方式
首先我们定义一个事务管理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Bean("rabbitTransactionManager") public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); }
@Bean public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); }
|
事务机制的使用姿势,看起来和上面的消息确认差不多,无非是需要添加一个@Transactional
注解罢了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Service public class TransactionPublisher implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate transactionRabbitTemplate;
@PostConstruct public void init() { transactionRabbitTemplate.setChannelTransacted(true); transactionRabbitTemplate.setReturnCallback(this); }
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("事务 " + message + " 发送失败"); }
@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager") public String publish(String ans) { String msg = "transaction msg = " + ans; System.out.println("publish: " + msg);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); transactionRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData); return msg; } }
|
请注意,核心代码设置信道为事务模式必不可少
1 2
| transactionRabbitTemplate.setChannelTransacted(true);
|
IV. 测试
我们这里主要测试一下事务和消息确认机制的性能对比吧,从定义上来看消息确认机制效率更高,我们简单的对比一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @RestController public class PubRest { @Autowired private AckPublisher ackPublisher; @Autowired private TransactionPublisher transactionPublisher;
private AtomicInteger atomicInteger = new AtomicInteger(1);
@GetMapping(path = "judge") public boolean judge(String name) { for (int i = 0; i < 10; i++) { long start = System.currentTimeMillis(); ackPublisher.publish(name + atomicInteger.getAndIncrement()); ackPublisher.publish(name + atomicInteger.getAndIncrement()); ackPublisher.publish(name + atomicInteger.getAndIncrement()); long mid = System.currentTimeMillis(); System.out.println("ack cost: " + (mid - start));
transactionPublisher.publish(name + atomicInteger.getAndIncrement()); transactionPublisher.publish(name + atomicInteger.getAndIncrement()); transactionPublisher.publish(name + atomicInteger.getAndIncrement()); System.out.println("transaction cost: " + (System.currentTimeMillis() - mid)); } return true; } }
|
去掉无关的输出,仅保留耗时,对比如下(差距还是很明显的)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| ack cost: 5 transaction cost: 111
ack cost: 3 transaction cost: 108
ack cost: 2 transaction cost: 101
ack cost: 3 transaction cost: 107
ack cost: 14 transaction cost: 106
ack cost: 2 transaction cost: 140
ack cost: 4 transaction cost: 124
ack cost: 4 transaction cost: 131
ack cost: 4 transaction cost: 129
ack cost: 2 transaction cost: 99
|
V. 其他
系列博文
项目源码
1. 一灰灰Blog
尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激
下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
一灰灰blog
Be the first person to leave a comment!