Blage's Coding Blage's Coding
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
  • SpringCloudAlibaba

  • MQ消息队列

    • RabbitMQ基础
      • 1.SpringAMQP
      • 2.Work Queue
      • 3.发布订阅模式
      • 4.消息转换器
    • Kafka基础
    • 高并发MQ
    • 消息队列面试
  • Nginx

  • Elasticsearch

  • Gateway

  • Xxl-job

  • Feign

  • Eureka

  • 中间件
  • MQ消息队列
phan
2023-05-15
目录

RabbitMQ基础

# RabbitMQ基础

卡夫卡吞吐能力较强,适合用于海量数据。RocketMQ双十一

RabbitMQ几个概念:

  • channel:操作MQ的工具,只有通过通道才能控制消息进出
  • exchange:路由消息到队列中,类似于交换机
  • queue:缓存消息
  • virtual host:虚拟主机

# 1.SpringAMQP

普通模型:建立连接connection->创建channel->利用通道channel声明队列->向队列发送消息(消费者定义消费行为)

  • 父工程配置maven文件
  • 发送者和消费者配置springboot.yaml文件,主要是配置MQ的地址
  • 在publisher中直接通过rabbitTemplate.convertAndSend(queueName,message),指定消息队列的名称(String类型),发送消息。注意这里消息队列要预先创建
  • consumer消费者则通过注解监听队列名,获取消息处理业务逻辑
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "Simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收的消息"+msg);
    }
}
1
2
3
4
5
6
7

# 2.Work Queue

当发送者发送的消息过多时,需要定义多个监听的消费者来处理消息,否则会出现消息堆积丢弃的情况。

消息预取:当有多条消息时,在不考虑处理能力的情况下,消费者们会先平均分配取出消息,但是处理时因为消费者之间处理能力有别,就会导致处理总时间超出。期望应该是处理的快的消费者取出处理更多的消息。

可以设置application.yml的preFetch的值,控制预取消息的上限

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
1
2
3
4
5

# 3.发布订阅模式

引入exchange交换机允许将同一消息发送给多个消费者

  • FanoutExchange广播群发:在配置类声明exchange和queue实现类以及将它们绑定在一起的binding类。发送者仅发送一个,然后声明两个消费者分别监听两个队列。最终交换机会把消息发送给和他绑定的所有队列。
@Bean
public FanoutExchange fanoutExchange1() {
    return new FanoutExchange("itcast.fanout1");
}
@Bean
public Queue fanoutQueue1() {
    return new Queue("fanout.queue1");
}
@Bean
public Binding fanoutBingding(Queue fanoutQueue1,FanoutExchange fanoutExchange1) {
    return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
}
1
2
3
4
5
6
7
8
9
10
11
12
  • DirectExchange路由模式:将收到的消息根据路由规则路由到指定的queue队列中。发布者发送消息需要指定消息的RoutingKey。

    定义消费者的队列,交换机,以及交换机发送给当前队列的key。注解声明binding:

@RabbitListener(bindings =@QueueBinding(value=@Queue(name="direct.queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key={"red","blue"}))
 public void listendirectQueue(String msg) throws InterruptedException {
        System.err.println("消费者接收directqueue的消息"+msg);
    }
1
2
3
4
5
6
  • TopicExchange:支持话题key和通配符。
@RabbitListener(bindings =@QueueBinding(value=@Queue(name="topic.queue1"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key="china.#"))
 public void listendirectQueue(String msg) throws InterruptedException {
        System.err.println("消费者接收directqueue的消息"+msg);
    }
1
2
3
4
5
6

# 4.消息转换器

SpringAMQP发布消息时,会基于Jdk完成序列化。发布时推荐使用JSON进行序列化,然后消费者接收也需要更改反序列化器。发送方和接收方必须使用相同的MessageConverter

@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}
1
2
3
4
编辑 (opens new window)
#中间件#消息队列
上次更新: 2023/12/15, 15:49:57
分布式缓存
Kafka基础

← 分布式缓存 Kafka基础→

Theme by Vdoing | Copyright © 2023-2024 blageCoder
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式