RabbitMQ基础
# RabbitMQ基础
卡夫卡吞吐能力较强,适合用于海量数据。RocketMQ双十一
RabbitMQ几个概念:
- channel:操作MQ的工具,只有通过通道才能控制消息进出
- exchange:路由消息到队列中,类似于交换机
- queue:缓存消息
- virtual host:虚拟主机
# 1.SpringAMQP
普通模型:建立连接connection->创建channel->利用通道channel声明队列->向队列发送消息(消费者定义消费行为)
- 父工程配置maven文件
- 发送者和消费者配置springboot.yaml文件,主要是配置MQ的地址
- 在publisher中直接通过rabbitTemplate.convertAndSend(queueName,message),指定消息队列的名称(String类型),发送消息。注意这里消息队列要预先创建
- consumer消费者则通过注解监听队列名,获取消息处理业务逻辑
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "Simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收的消息"+msg);
}
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 2.Work Queue
当发送者发送的消息过多时,需要定义多个监听的消费者来处理消息,否则会出现消息堆积丢弃的情况。
消息预取:当有多条消息时,在不考虑处理能力的情况下,消费者们会先平均分配取出消息,但是处理时因为消费者之间处理能力有别,就会导致处理总时间超出。期望应该是处理的快的消费者取出处理更多的消息。
可以设置application.yml的preFetch的值,控制预取消息的上限
spring:
rabbitmq:
listener:
simple:
prefetch: 1
1
2
3
4
5
2
3
4
5
# 3.发布订阅模式
引入exchange交换机允许将同一消息发送给多个消费者
- FanoutExchange广播群发:在配置类声明exchange和queue实现类以及将它们绑定在一起的binding类。发送者仅发送一个,然后声明两个消费者分别监听两个队列。最终交换机会把消息发送给和他绑定的所有队列。
@Bean
public FanoutExchange fanoutExchange1() {
return new FanoutExchange("itcast.fanout1");
}
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
@Bean
public Binding fanoutBingding(Queue fanoutQueue1,FanoutExchange fanoutExchange1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
DirectExchange路由模式:将收到的消息根据路由规则路由到指定的queue队列中。发布者发送消息需要指定消息的RoutingKey。
定义消费者的队列,交换机,以及交换机发送给当前队列的key。注解声明binding:
@RabbitListener(bindings =@QueueBinding(value=@Queue(name="direct.queue1"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key={"red","blue"}))
public void listendirectQueue(String msg) throws InterruptedException {
System.err.println("消费者接收directqueue的消息"+msg);
}
1
2
3
4
5
6
2
3
4
5
6
- TopicExchange:支持话题key和通配符。
@RabbitListener(bindings =@QueueBinding(value=@Queue(name="topic.queue1"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key="china.#"))
public void listendirectQueue(String msg) throws InterruptedException {
System.err.println("消费者接收directqueue的消息"+msg);
}
1
2
3
4
5
6
2
3
4
5
6
# 4.消息转换器
SpringAMQP发布消息时,会基于Jdk完成序列化。发布时推荐使用JSON进行序列化,然后消费者接收也需要更改反序列化器。发送方和接收方必须使用相同的MessageConverter
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
1
2
3
4
2
3
4
编辑 (opens new window)
上次更新: 2023/12/15, 15:49:57