高并发MQ
# 高并发MQ
异步调用解决同步调用的问题:
- 耦合度降低
- 吞吐量提高(不需要长时间等待占用资源)
- 故障隔离,避免了级联失败,一个服务挂了不会影响到消息发起者
- 流量削峰
# 1.消息可靠性问题
消息确认机制:消息没有到达交换机,队列都会发送ack/nack给发送者。
确认机制发送消息时,需要给每个消息设置一个全局唯一id。
- spring开启异步回调correlated
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
2
3
4
5
6
- 项目启动时设置returncallback(消息到达队列失败)
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
- 在发送消息中添加confirmcallback(到达交换机失败)
public void testSendMessage2SimpleQueue() throws InterruptedException {
String message = "hello, spring amqp!";
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
log.error("没有到达交换机, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 2.消费者确认机制
消费者处理消息后,向队列发送ack回执,队列收到后才会删除该条消息。如果消费者没处理完宕机,队列收到nack会重新发送消息。
消费者acknowledge-mode一共有三种模式:
none:MQ在消息投递给消费者后立即删除,不管消费者后面处理业务是否成功。
auto:spring监测消费者listener代码是否出现异常,若有则返回nack给MQ,此时MQ会重新投递给消费者(无限循环)。采用AOP实现,环绕增强,根据异常执行不同操作。
改进策略:可以在spring设置成本地重试,设置重试次数,RepublishMesageRecoverer:可以将失败的消息重新发布到新的交换机中
# 3.死信交换机—消息延迟发送
死信:①过期消息,超时(消息设置的TTL)无人消费②要投递的队列消息堆积满了③被消费者拒绝或者声明失败的消息
如果队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个死信交换机中—>死信队列。(消费者需要监听的是死信队列)

消息超时,实现延迟队列的方案:
- 消息队列设置TTL
- 给发送的消息设置TTL
# 4.消息堆积
解决方案:
- 增加更多消费者
- 在消费者内开启线程池,加快消息处理速度
- 扩大队列容积,提高堆积上限。使用惰性队列(接收到消息直接存入磁盘而非内存;消费者需要消费时才将消息从磁盘加载到内存中),配置时指定x-queue-mode属性为lazy。
# 5.实战应用
- 中奖后异步消息发奖
在异步发奖消费场景中,中奖落库后(user_strategy_export表插入订单数据,但授奖位还为0),生产者通过MQ推送发奖通知,消费者收到后修改授奖状态位。从而实现解耦削峰,用户只关注抽奖结果,而后续其它过程都可以交给MQ异步处理。
数据库表设计上给export订单表设置了MQ消息发送状态,根据生产者发送消息后的回调状态来修改消息发送状态,对于发送失败消息的采用定时任务补偿(这里如果消息状态修改失败也不会有影响,最终只需要判断成功code码),保证了生产者到broker之间的高可用。
消费者消费失败,可以通过offset偏移量机制(只有消费成功才提交)+指定auto-offset-reset+手动提交偏移量(执行ack.acknowledge())诸多机制来保证消费者与broker之间的高可用。其中某些机制可能导致重复消费(earliest),因此需要保证消费幂等性。
- 报名活动成功后异步扣减活动库存
原本活动库存扣减使用数据库行级锁(乐观锁)处理扣减,但是存在并发问题,如果库存为1时,两个用户同时都查出库存都大于0,那么它们都可以执行更新操作,导致库存为负数。
UPDATE activity SET stock_surplus_count = stock_surplus_count - 1
WHERE activity_id = #{activityId} AND stock_surplus_count > 0
2
通过redis活动库存报名完活动后,使用MQ发送消息异步更新数据库活动库存,做数据最终一致性处理。