Blage's Coding Blage's Coding
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
  • SpringCloudAlibaba

  • MQ消息队列

    • RabbitMQ基础
    • Kafka基础
      • 1.基本指令
      • 2.kafka配置信息
      • 3.生产者消费者注解使用
    • 高并发MQ
    • 消息队列面试
  • Nginx

  • Elasticsearch

  • Gateway

  • Xxl-job

  • Feign

  • Eureka

  • 中间件
  • MQ消息队列
phan
2023-05-15
目录

Kafka基础

# Kafka基础

# 1.基本指令

  • 启动:先启动zk再启动kafka。注意启动zk后要隔几分钟后再启动kafka,否则kafka注册不上zk消息传递失效。-daemon以后台的方式启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
1
2
  • 创建topic
bin/kafka-topics.sh --create --zookeeper 192.168.200.200:2181 --replication-factor 1 --partitions 1 --topic lottery_partake
1
  • 查看所有topic名称列表(指定查看zk地址上的topic)
bin/kafka-topics.sh --list --zookeeper localhost:2181
1
  • 查看指定topic的分区信息
bin/kafka-topics.sh --zookeeper localhost:2181 --topic lottery_invoice --describe
1
  • 查看topic的消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic lottery_invoice --from-beginning
1

# 2.kafka配置信息

replication-factor副本因子:每分数据存在几份副本

listeners:监听端口,指定broker启动时本机的监听端口,用于给服务端(kafka)使用,一般为(配置kafka所在的机器+端口)

advertised.listeners:对外发布的访问ip和端口,注册到zookeeper,给客户端(需要调用kafka发送消息的微服务)使用。默认使用listeners配置。

内外网解析:kafka集群和zookeeper可以通过内网进行通讯,而生产者和消费者走公网IP使用kafka服务。实现内外分流,在listeners和advertised.listeners配置INTERNAL,EXTERNA,kafka的listeners的外网可以设置成0.0.0.0:9093端口,表示监听外网所有网卡的9093端口,而生产消费者的advertised.listeners外网ip需要配置公网ip,也就是kafka所在的云服务器公网ip。

一个分区只能被同一个消费者组的一个消费者消费,可以被不同消费者组的消费者消费。因此给消费者指定消费者组可以保证消息不被重复消费。

kafka通过最新保存偏移量进行消息消费,通过ack.acknowledge()来手动提交偏移量。否则消息就会被拒绝而反复消费。

kafka将offset存储在consumer_offsets这个特殊的topic中。

# 3.生产者消费者注解使用

KafkaTemplate<String, Object>:发送消息通过使用框架注入的该Bean对象实现。

@Header:可以获取消息头和信息,包括Topic名称。

ConsumerRecord:通过该对象获取生产者发送的消息(对象)。

生产者发送完消息,可以通过ListenableFuture对象实现回调函数,通过调用future.addCallback执行消息发送成功和失败的处理逻辑。

@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public static final String TOPIC_INVOICE = "lottery_invoice";

    public ListenableFuture<SendResult<String, Object>> sendLotteryInvoice(InvoiceVO invoiceVO) {
        String invoice = JSON.toJSONString(invoiceVO);
        logger.error("发送MQ消息 topic:{},uid:{},message:{}",TOPIC_INVOICE,invoiceVO.getuId(),invoice);
        return kafkaTemplate.send(TOPIC_INVOICE,invoice);
    }
1
2
3
4
5
6
7
8
9
10
11

消费者在消息处理方法上注解并指明监听的topic和groupId——@KafkaListener,消费方法能接受的类型:ConsumerRecord接收消息体(包含分区信息,消息头),Acknowledgment 用于ack机制回调,@Header获取消息头或者消息体,此处用来获取监听的TopicName。最后消费完消息后通过ack.acknowledge()手动提交偏移。

@Component
public class LotteryInvoiceListener {
    @Autowired
    private DistributionGoodsFactory distributionGoodsFactory;
    @KafkaListener(topics = KafkaProducer.TOPIC_INVOICE, groupId ="lottery")
    public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional<?> message = Optional.ofNullable(record.value());
        if (!message.isPresent()) {
            return;
        }
        try {
            Object msg = message.get();
            InvoiceVO invoiceVO = JSON.parseObject((String) msg, InvoiceVO.class);
            GoodsReq goodsReq=new GoodsReq();
            BeanUtils.copyProperties(invoiceVO,goodsReq);
            DistributionRes distributionRes = distributionGoodsService.doDistribution(goodsReq);
            ack.acknowledge();
        } catch (Exception e) {
            throw e;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
编辑 (opens new window)
#中间件#消息队列
上次更新: 2023/12/15, 15:49:57
RabbitMQ基础
高并发MQ

← RabbitMQ基础 高并发MQ→

Theme by Vdoing | Copyright © 2023-2024 blageCoder
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式