Blage's Coding Blage's Coding
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
  • SpringCloudAlibaba

  • MQ消息队列

    • RabbitMQ基础
    • Kafka基础
    • 高并发MQ
      • 1.消息可靠性问题
      • 2.消费者确认机制
      • 3.死信交换机—消息延迟发送
      • 4.消息堆积
      • 5.实战应用
    • 消息队列面试
  • Nginx

  • Elasticsearch

  • Gateway

  • Xxl-job

  • Feign

  • Eureka

  • 中间件
  • MQ消息队列
phan
2023-05-15
目录

高并发MQ

# 高并发MQ

异步调用解决同步调用的问题:

  • 耦合度降低
  • 吞吐量提高(不需要长时间等待占用资源)
  • 故障隔离,避免了级联失败,一个服务挂了不会影响到消息发起者
  • 流量削峰

# 1.消息可靠性问题

消息确认机制:消息没有到达交换机,队列都会发送ack/nack给发送者。

确认机制发送消息时,需要给每个消息设置一个全局唯一id。

  • spring开启异步回调correlated
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
1
2
3
4
5
6
  • 项目启动时设置returncallback(消息到达队列失败)
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
  • 在发送消息中添加confirmcallback(到达交换机失败)
public void testSendMessage2SimpleQueue() throws InterruptedException {
    String message = "hello, spring amqp!";
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    correlationData.getFuture().addCallback(
        result -> {
            if(result.isAck()){
                log.debug("消息发送成功, ID:{}", correlationData.getId());
            }else{
                log.error("没有到达交换机, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 2.消费者确认机制

消费者处理消息后,向队列发送ack回执,队列收到后才会删除该条消息。如果消费者没处理完宕机,队列收到nack会重新发送消息。

消费者acknowledge-mode一共有三种模式:

  • none:MQ在消息投递给消费者后立即删除,不管消费者后面处理业务是否成功。

  • auto:spring监测消费者listener代码是否出现异常,若有则返回nack给MQ,此时MQ会重新投递给消费者(无限循环)。采用AOP实现,环绕增强,根据异常执行不同操作。

    改进策略:可以在spring设置成本地重试,设置重试次数,RepublishMesageRecoverer:可以将失败的消息重新发布到新的交换机中

# 3.死信交换机—消息延迟发送

死信:①过期消息,超时(消息设置的TTL)无人消费②要投递的队列消息堆积满了③被消费者拒绝或者声明失败的消息

如果队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个死信交换机中—>死信队列。(消费者需要监听的是死信队列)

image-20230310111535548

消息超时,实现延迟队列的方案:

  • 消息队列设置TTL
  • 给发送的消息设置TTL

# 4.消息堆积

解决方案:

  • 增加更多消费者
  • 在消费者内开启线程池,加快消息处理速度
  • 扩大队列容积,提高堆积上限。使用惰性队列(接收到消息直接存入磁盘而非内存;消费者需要消费时才将消息从磁盘加载到内存中),配置时指定x-queue-mode属性为lazy。

# 5.实战应用

  • 中奖后异步消息发奖

在异步发奖消费场景中,中奖落库后(user_strategy_export表插入订单数据,但授奖位还为0),生产者通过MQ推送发奖通知,消费者收到后修改授奖状态位。从而实现解耦削峰,用户只关注抽奖结果,而后续其它过程都可以交给MQ异步处理。

数据库表设计上给export订单表设置了MQ消息发送状态,根据生产者发送消息后的回调状态来修改消息发送状态,对于发送失败消息的采用定时任务补偿(这里如果消息状态修改失败也不会有影响,最终只需要判断成功code码),保证了生产者到broker之间的高可用。

消费者消费失败,可以通过offset偏移量机制(只有消费成功才提交)+指定auto-offset-reset+手动提交偏移量(执行ack.acknowledge())诸多机制来保证消费者与broker之间的高可用。其中某些机制可能导致重复消费(earliest),因此需要保证消费幂等性。

  • 报名活动成功后异步扣减活动库存

原本活动库存扣减使用数据库行级锁(乐观锁)处理扣减,但是存在并发问题,如果库存为1时,两个用户同时都查出库存都大于0,那么它们都可以执行更新操作,导致库存为负数。

UPDATE activity SET stock_surplus_count = stock_surplus_count - 1
WHERE activity_id = #{activityId} AND stock_surplus_count > 0
1
2

通过redis活动库存报名完活动后,使用MQ发送消息异步更新数据库活动库存,做数据最终一致性处理。

编辑 (opens new window)
#中间件#消息队列#高并发
上次更新: 2023/12/15, 15:49:57
Kafka基础
消息队列面试

← Kafka基础 消息队列面试→

Theme by Vdoing | Copyright © 2023-2024 blageCoder
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式