Kafka基础
# Kafka基础
# 1.基本指令
- 启动:先启动zk再启动kafka。注意启动zk后要隔几分钟后再启动kafka,否则kafka注册不上zk消息传递失效。-daemon以后台的方式启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
2
- 创建topic
bin/kafka-topics.sh --create --zookeeper 192.168.200.200:2181 --replication-factor 1 --partitions 1 --topic lottery_partake
- 查看所有topic名称列表(指定查看zk地址上的topic)
bin/kafka-topics.sh --list --zookeeper localhost:2181
- 查看指定topic的分区信息
bin/kafka-topics.sh --zookeeper localhost:2181 --topic lottery_invoice --describe
- 查看topic的消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic lottery_invoice --from-beginning
# 2.kafka配置信息
replication-factor副本因子:每分数据存在几份副本
listeners:监听端口,指定broker启动时本机的监听端口,用于给服务端(kafka)使用,一般为(配置kafka所在的机器+端口)
advertised.listeners:对外发布的访问ip和端口,注册到zookeeper,给客户端(需要调用kafka发送消息的微服务)使用。默认使用listeners配置。
内外网解析:kafka集群和zookeeper可以通过内网进行通讯,而生产者和消费者走公网IP使用kafka服务。实现内外分流,在listeners和advertised.listeners配置INTERNAL,EXTERNA,kafka的listeners的外网可以设置成0.0.0.0:9093端口,表示监听外网所有网卡的9093端口,而生产消费者的advertised.listeners外网ip需要配置公网ip,也就是kafka所在的云服务器公网ip。
一个分区只能被同一个消费者组的一个消费者消费,可以被不同消费者组的消费者消费。因此给消费者指定消费者组可以保证消息不被重复消费。
kafka通过最新保存偏移量进行消息消费,通过ack.acknowledge()来手动提交偏移量。否则消息就会被拒绝而反复消费。
kafka将offset存储在consumer_offsets这个特殊的topic中。
# 3.生产者消费者注解使用
KafkaTemplate<String, Object>:发送消息通过使用框架注入的该Bean对象实现。
@Header:可以获取消息头和信息,包括Topic名称。
ConsumerRecord:通过该对象获取生产者发送的消息(对象)。
生产者发送完消息,可以通过ListenableFuture对象实现回调函数,通过调用future.addCallback执行消息发送成功和失败的处理逻辑。
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public static final String TOPIC_INVOICE = "lottery_invoice";
public ListenableFuture<SendResult<String, Object>> sendLotteryInvoice(InvoiceVO invoiceVO) {
String invoice = JSON.toJSONString(invoiceVO);
logger.error("发送MQ消息 topic:{},uid:{},message:{}",TOPIC_INVOICE,invoiceVO.getuId(),invoice);
return kafkaTemplate.send(TOPIC_INVOICE,invoice);
}
2
3
4
5
6
7
8
9
10
11
消费者在消息处理方法上注解并指明监听的topic和groupId——@KafkaListener,消费方法能接受的类型:ConsumerRecord接收消息体(包含分区信息,消息头),Acknowledgment 用于ack机制回调,@Header获取消息头或者消息体,此处用来获取监听的TopicName。最后消费完消息后通过ack.acknowledge()手动提交偏移。
@Component
public class LotteryInvoiceListener {
@Autowired
private DistributionGoodsFactory distributionGoodsFactory;
@KafkaListener(topics = KafkaProducer.TOPIC_INVOICE, groupId ="lottery")
public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional<?> message = Optional.ofNullable(record.value());
if (!message.isPresent()) {
return;
}
try {
Object msg = message.get();
InvoiceVO invoiceVO = JSON.parseObject((String) msg, InvoiceVO.class);
GoodsReq goodsReq=new GoodsReq();
BeanUtils.copyProperties(invoiceVO,goodsReq);
DistributionRes distributionRes = distributionGoodsService.doDistribution(goodsReq);
ack.acknowledge();
} catch (Exception e) {
throw e;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23