【MQ系列】SprigBoot + RabbitMq发送消息基本使用姿势

文章目录
  1. I. 基本使用姿势
    1. 1. 配置
    2. 2. 配置类
    3. 3. 消息发送
    4. 4. 非序列化对象发送异常case
    5. 5. 自定义MessageConverter
    6. 6. Jackson2JsonMessageConverter
    7. 7. 小结
  2. II. 其他
    1. 0. 系列博文&项目源码
    2. 1. 一灰灰Blog

前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq

本篇内容主要为消息发送,包括以下几点

  • RabbitTemplate 发送消息的基本使用姿势
  • 自定义消息基本属性
  • 自定义消息转换器AbstractMessageConverter
  • 发送Object类型消息失败的case

I. 基本使用姿势

1. 配置

我们借助SpringBoot 2.2.1.RELEASE + rabbitmq 3.7.5来完整项目搭建与测试

项目pom.xml如下

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件application.yml内容如下

1
2
3
4
5
6
7
spring:
rabbitmq:
virtual-host: /
username: admin
password: admin
port: 5672
host: 127.0.0.1

2. 配置类

通过前面rabbitmq的知识点学习,我们可以知道发送端的主要逻辑 “将消息发送给exchange,然后根据不同的策略分发给对应的queue”

本篇博文主要讨论的是消息发送,为了后续的实例演示,我们定义一个topic模式的exchange,并绑定一个的queue;(因为对发送端而言,不同的exchange类型,对发送端的使用姿势影响并不大,有影响的是消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MqConstants {

public static final String exchange = "topic.e";

public static final String routing = "r";

public final static String queue = "topic.a";

}

@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.exchange);
}

@Bean
public Queue queue() {
// 创建一个持久化的队列
return new Queue(MqConstants.queue, true);
}

@Bean
public Binding binding(TopicExchange topicExchange, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}

3. 消息发送

消息发送,主要借助的是RabbitTemplate#convertAndSend方法来实现,通常情况下,我们直接使用即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
public class BasicPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 一般的用法,推送消息
*
* @param ans
* @return
*/
private String publish2mq1(String ans) {
String msg = "Durable msg = " + ans;
System.out.println("publish: " + msg);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg;
}
}

上面的核心点就一行rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);

  • 表示将msg发送给指定的exchange,并设置消息的路由键

请注意

通过上面的方式,发送的消息默认是持久化的,当持久化的消息,分发到持久化的队列时,会有消息的落盘操作;

在某些场景下,我们对消息的完整性要求并没有那么严格,反而更在意mq的性能,丢失一些数据也可以接受;这个时候我们可能需要定制一下发送的消息属性(比如将消息设置为非持久化的)

下面提供两种姿势,推荐第二种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失
*
* @param ans
* @return
*/
private String publish2mq2(String ans) {
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);

rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);

System.out.println("publish: " + message);
return message.toString();
}


private String publish2mq3(String ans) {
String msg = "Define msg = " + ans;
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("ta", "测试");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
}
});

return msg;
}

注意

  • 在实际的项目开发中,推荐使用MessagePostProcessor来定制消息属性
  • 其次不推荐在每次发送消息时都创建一个MessagePostProcessor对象,请定义一个通用的对象,能复用就复用

4. 非序列化对象发送异常case

通过查看rabbitTemplate#convertAndSend的接口定义,我们知道发送的消息可以是Object类型,那么是不是意味着任何对象,都可以推送给mq呢?

下面是一个测试case

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private String publish2mq4(String ans) {
NonSerDO nonSerDO = new NonSerDO(18, ans);
System.out.println("publish: " + nonSerDO);
rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
return nonSerDO.toString();
}


@Data
public static class NonSerDO {
private Integer age;
private String name;

public NonSerDO(int age, String name) {
this.age = age;
this.name = name;
}
}

当我们调用上面的publish2mq4方法时,并不会是想象中的直接成功,相反抛出一个参数类型异常

为什么会出现这个问题呢?从堆栈分析,我们知道RabbitTemplate默认是利用SimpleMessageConverter来实现封装Message逻辑的,核心代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
}
else if (object instanceof String) {
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding(this.defaultCharset);
}
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
throw new IllegalArgumentException(getClass().getSimpleName()
+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

上面逻辑很明确的指出了,只接受byte数组,string字符串,可序列化对象(这里使用的是jdk的序列化方式来实现对象和byte数组之间的互转)

  • 所以我们传递一个非序列化的对象会参数非法的异常

自然而然的,我们会想有没有其他的MessageConverter来友好的支持任何类型的对象

5. 自定义MessageConverter

接下来我们希望通过自定义一个json序列化方式的MessageConverter来解决上面的问题

一个比较简单的实现(利用FastJson来实现序列化/反序列化)

1
2
3
4
5
6
7
8
9
10
11
12
public static class SelfConverter extends AbstractMessageConverter {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
messageProperties.setContentType("application/json");
return new Message(JSON.toJSONBytes(object), messageProperties);
}

@Override
public Object fromMessage(Message message) throws MessageConversionException {
return JSON.parse(message.getBody());
}
}

重新定义一个rabbitTemplate,并设置它的消息转换器为自定义的SelfConverter

1
2
3
4
5
6
@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new SelfConverter());
return rabbitTemplate;
}

然后再次测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class JsonPublisher {
@Autowired
private RabbitTemplate jsonRabbitTemplate;

private String publish1(String ans) {
Map<String, Object> msg = new HashMap<>(8);
msg.put("msg", ans);
msg.put("type", "json");
msg.put("version", 123);
System.out.println("publish: " + msg);
jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg.toString();
}

private String publish2(String ans) {
BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
System.out.println("publish: " + nonSerDO);
jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
return nonSerDO.toString();
}
}

mq内接收到的推送消息如下

6. Jackson2JsonMessageConverter

上面虽然实现了Json格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照Spring全家桶的一贯作风,肯定是有现成可用的,没错,这就是Jackson2JsonMessageConverter

所以我们的使用姿势也可以如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}


// 测试代码
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
Map<String, Object> msg = new HashMap<>(8);
msg.put("msg", ans);
msg.put("type", "jackson");
msg.put("version", 456);
System.out.println("publish: " + msg);
jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
return msg.toString();
}

下面是通过Jackson序列化消息后的内容,与我们自定义的有一些不同,多了headerscontent_encoding

7. 小结

本篇博文主要的知识点如下

  • 通过RabbitTemplate#convertAndSend来实现消息分发
  • 通过MessagePostProcessor来自定义消息的属性(请注意默认投递的消息时持久化的)
  • 默认的消息封装类为SimpleMessageConverter,只支持分发byte数组,字符串和可序列化的对象;不满足上面三个条件的方法调用会抛异常
  • 我们可以通过实现MessageConverter接口,来定义自己的消息封装类,解决上面的问题

在RabbitMq的知识点博文中,明确提到了,为了确保消息被brocker正确接收,提供了消息确认机制和事务机制两种case,那么如果需要使用这两种方式,消息生产者需要怎么做呢?

限于篇幅,下一篇博文将带来在消息确认机制/事务机制下的发送消息使用姿势

II. 其他

0. 系列博文&项目源码

系列博文

项目源码

1. 一灰灰Blog

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

一灰灰blog


打赏 如果觉得我的文章对您有帮助,请随意打赏。
分享到