一、查看topic
bin/kafka-topics.sh --list --zookeeper node10:2181,node11:2181,node12:2181
二、查看topic状态
bin/kafka-topics.sh --describe --zookeeper node10:2181,node11:2181,node12:2181 --topic TestTopic
三、KAFKA常用配置
1、主题配置
(1)# 新创建的主题包含1个分区num.partitions=1
写入和读取数据的速度是1G/s,一个消费者处理速度50M/s,需要20个分区分别由20个消费者处理速度(吞吐量)1G/s
(2)消息配置
# 消息可以保留168小时=7天
log.retention.hours=168
# 消息字节数超过1G就删除
og.retention.bytes=1073741824
# 5分钟检查一次消息是否过期
log.retention.check.interval.ms=300000
# 单个消息的最大100M
message.max.bytes=104857600
2、broker配置
(1)broker信息配置
broker配置 broker消息配置
broker.id=0
port=9092
zookeeper.connect=node10:2181,node11:2181,node12:2181
# 消息保存的磁盘目录
log.dirs=/tmp/kafka-logs
(2)broker消息形式配置
# 不自动创建topic:生产者写入消息,消费者读取消息,发送元数据请求
auto.create.topics.enable=false
3、集群需要多少个Broker
每个broker可以存储2T数据,如果需要保存10T,则需要5T
4、主题的分区和副本放置策略
(1)broker数
所有broker依次分配主分区,下一个broker分配副本,注意:第一个分区随机放,每个分区副本数不能超过broker个数
(2)broker分配
n个broker,i分区分配到(i % n)broker, 其j副本分配到((i+j) % n)broker
例如:5个broker,0分区到0号broker,3副本到3号broker
5、KAFKA偏移量
auto.offset.reset
(1)设置为earliest
当一个分区被一个消费者组已经提交了offset时,同一消费者组从提交的offset开始消费;无提交的offset时,从头开始消费一个新的消费者组进行消费,从头开始
(2)设置为latest
当一个分区被一个消费者组已经提交了offset时,同一消费者组从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
(3)设置为none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
6、KAFKA分区消费者关系
一个消费者组的消费者消费所有分区,消费者数=分区数(均分),消费者数<分区数(某消费者消费多个分区),消费者数>分区数(某个消费者不消费分区),不同消费者组互不干扰
7、发送消息push
(1)同步方式发送消息
ProducerRecord<String, String> msg = new ProducerRecord<String, String>("TestTopic4", null, "hello world tomas100");
producer.send(msg).get(); // 同步发送消息,死等broker返回结果
producer.close();
(2)异步发送消息
ProducerRecord<String, String> msg = new ProducerRecord<String, String>("TestTopic4", null, "hello world tomas100");
producer.send(msg); // 异步发送消息
producer.send(msg, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
exception.printStackTrace();
} else {
System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition() + " 偏移量:" + metadata.offset());
}
}
}); // 异步发送消息,回调函数
producer.close();
8、接受消息pull
Consumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe(Collections.singletonList("TestTopic4"));
try {
while (true) {
ConsumerRecords<String, String> msgs = consumer.poll(5000);// 5000毫秒轮询一次
for (ConsumerRecord<String, String> msg : msgs) {
System.out.println(" topic:" + msg.topic() + " partition:" + msg.partition() + " offset:" + msg.offset() + "key:" + msg.key() + " value:" + msg.value());
}
try {
consumer.commitAsync(); // 提交偏移量
} catch (Exception ex) {
ex.printStackTrace();
}
}
} finally {
consumer.close();
}