Ezio's Blog
Posts Categories Tags Music Mood About
Ezio's Blog· Light
☰ Menu
Posts Categories Tags Music Mood About
Expand all Back to top Go to bottom

消息丢失

Author: Ezio Date: August 7, 2022  23:07:37 Category: RabbitMQ

MQ消息丢失的情况:

  • 发送时丢失消息(生产者发送消息失败)

    • 生成者发送的消息未送到exchange

    • 消息到达exchange后为到达queue

  • MQ宕机,queue将消息丢失 (队列消息丢失)

  • consumer接受到消息后未消费就宕机 (消费者消费消息失败)

生产者确认机制

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未成功投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因。

PS:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。

1.在生产者服务中的application.yml添加配置:

1
2
3
4
5
6
spring:
rabbitmq:
publisher-returns: true
publisher-confirm-type: correlated
template:
mandatory: true

配置说明:

  • publisher-confirm-type:开启publisher-confirm,支持的两种类型:

    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时回调ConfirmCallback
  • publisher-returns:开启publisher-return功能,同样基于callback机制,定义ReturnCallback

  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false,则直接丢弃消息

2.配置ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 配置ReturnCallBack
*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息发生失败,应答码={},原因={},交换机={},路由键={},消息={}",
replyCode, replyText, exchange, routingKey, message.toString());
});
}
}

3.设置ConfirmCallback,指定的消息ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void confirmTest() {
// 交换机名称
String exchangeName = "ezio.topic";
// 消息
String message = "今天天气不错";
//消息确认
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(
result -> {
//判断结果
if (result.isAck()) {
//ack
log.debug("消息成功投递到交换机!消息ID:{}", correlationData.getId());
} else {
//nack
log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
}
},
ex -> log.error("消息发送失败!",ex)
);
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message, correlationData);
}
}

消息持久化

MQ默认时内存存储消息,开启持久化功能可以确保消息缓存在mq中的消息不丢失。

1.交换机持久化:默认true开启

2.队列持久化:在定义队列时设置

1
2
3
4
5
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"), //绑定的队列
exchange = @Exchange(name = "test.direct"), //声明交换机名称
key = {"red", "blue"} //路由key,可配置通配符,red.# or red.*
))

3.消息持久化,SpringAMQP中消息默认持久,可以通过MessageProperties中的DeliveryMode指定:

1
2
3
Message message = MessageBuilder.withBody("今天天气不错".getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();

消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到回执后才会删除该消息,SpringAMQP提供三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用API发送ack。
  • auto:自动ack,有spring检测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,消息投递后立即被删除。
1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ的ip地址
port: 5672 # 端口
username: ezio
password: 123456
virtual-host: /
listener:
simple:
prefetch: 1 # 预取上限,默认250条,每次取一条,谁消费快则消费得多
acknowledge-mode: auto

消费者失败重试

消费者出现异常后,消息会不断requeue(重新入队)到队列,在重新发送给消费者,然后再次异常,再次requeue,导致mq的消息处理飙升,带来不必要的压力。

可以利用spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ的ip地址
port: 5672 # 端口
username: ezio
password: 123456
virtual-host: /
listener:
simple:
prefetch: 1 # 预取上限,默认250条,每次取一条,谁消费快则消费得多
acknowledge-mode: auto
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000 #初始的失败等待时长1秒
multiplier: 1 #下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 #最大重试次数
stateless: true #true无状态,false有状态,有事务用有状态

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果依然失败,则需要有MessageRecoverer接口来处理,它包含三个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认实现方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

实现方式:定义接受失败消息的交换机,队列及其绑定关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 消费者消息重试,接受消费失败的消息
*/
@Configuration
public class ErrorMessageConfig {

@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct", true, false);
}

@Bean
public Queue errorQueue() {
return new Queue("error.queue", true);
}

@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
return BindingBuilder
.bind(errorQueue)
.to(errorExchange).with("error");
}

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

}

Author: Ezio

Permalink: https://ezioy.cn/2022/08/07/%E6%B6%88%E6%81%AF%E4%B8%A2%E5%A4%B1/

License: Copyright (c) 2019 CC-BY-NC-4.0 LICENSE

Slogan: Nothing is true,Everything is permitted

Tag(s): # RabbitMQ
back · home
死信交换机 Spring AMQP
Ezio © 2019 - 2026 | Powered by Hexo & Chic | 访客数量:   浏览次数: | 渝公网安备50011302222043 | 渝ICP备2023013933号-1