一、构建开发环境
File > New > Project
选择一个最简单的模板
项目和坐标命名
配置maven路径
添加maven依赖
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.2.1</version> </dependency> </dependencies>
加载刚刚添加的依赖
此时发现项目还没有包目录,如果遇到这种情况,点击新建目录就会自动提示了
二、创建一个新的topic
kafka-topics --create --topic kafka-study --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
#查看topic详情
kafka-topics --describe --zookeeper cdh1:2181 --topic kafka-study
#查看 topic 指定分区 offset
kafka-run-class kafka.tools.GetOffsetShell --topic kafka-study --time -1 --broker-list cdh1:9092
三、编写生产者
kafka源码中有生产者和消费者的示例,我们简单修改下就直接用了
package org.example.kafkaStudy;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
public class KafkaProducerDemo {
public static void main(String[] args) {
try{
//topic名称
String topicName = "kafka-study";
//broker列表
String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";
//向topic打多少数据
int numRecords = 10000;
//是否异步推送数据
boolean isAsync = true;
int key = 0;
int sentRecords = 0;
//创建生产者
KafkaProducer<Integer, String> producer = createKafkaProducer(bootstrapServers,-1,null,false);
//判断是否达到生产要求
while (sentRecords < numRecords) {
if (isAsync) {
//异步推送
asyncSend(producer,topicName, key, "test" + key,sentRecords);
} else {
//同步推送
syncSend(producer,topicName, key, "test" + key,sentRecords);
}
key++;
sentRecords++;
}
producer.close();
} catch (Throwable e) {
e.printStackTrace();
}
}
private static RecordMetadata syncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords)
throws ExecutionException, InterruptedException {
try {
// 发送记录,然后调用get,这会阻止等待来自broker的ack
RecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, key, value)).get();
Utils.maybePrintRecord(sentRecords, key, value, metadata);
return metadata;
} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
| OutOfOrderSequenceException | SerializationException e) {
Utils.printErr(e.getMessage());
} catch (KafkaException e) {
Utils.printErr(e.getMessage());
}
return null;
}
private static void asyncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords) {
//异步发送记录,设置一个回调以通知结果。
//请注意,即使使用linger.ms=0设置了一个batch.size 当缓冲区内存已满或元数据不可用时,发送操作仍将被阻止
producer.send(new ProducerRecord<>(topicName, key, value), new ProducerCallback(key, value,sentRecords));
}
private static KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers ,
int transactionTimeoutMs,String transactionalId,boolean enableIdempotency) {
Properties props = new Properties();
// 生产者连接到broker需要引导服务器配置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/port
props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
// 设置序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
if (transactionTimeoutMs > 0) {
// 事务协调器主动中止正在进行的事务之前的最长时间
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
}
if (transactionalId != null) {
// 事务id必须是静态且唯一的,它用于在流程重启过程中标识相同的生产者实例
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
}
// 在分区级别启用重复保护
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
return new KafkaProducer<>(props);
}
static class ProducerCallback implements Callback {
private final int key;
private final int sentRecords;
private final String value;
public ProducerCallback(int key, String value,int sentRecords) {
this.key = key;
this.sentRecords = sentRecords;
this.value = value;
}
/**
* 用户可以实现一种回调方法,以提供请求完成的异步处理。当发送到服务器的记录得到确认时,将调用此方法。当回调中的异常不为null时,
* 元数据将包含除topicPartition之外的所有字段的特殊-1值,该值将有效。
*
* @param metadata 发送的记录的元数据(即分区和偏移量)。如果发生错误,将返回除topicPartition之外的所有字段的值为-1的空元数据。
* @param exception 处理此记录时引发的异常。如果没有发生错误,则为空。
*/
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
Utils.printErr(exception.getMessage());
if (!(exception instanceof RetriableException)) {
// 我们无法从这些异常中恢复过来
}
} else {
Utils.maybePrintRecord(sentRecords, key, value, metadata);
}
}
}
}
四、编写消费者
package org.example.kafkaStudy;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import static java.util.Collections.singleton;
public class KafkaConsumerDemo {
public static void main(String[] args) {
//topic名称
String topicName = "kafka-study";
//组名称
String groupName = "my-group-1";
//broker列表
String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";
//向topci打多少数据
int numRecords = 10000;
int remainingRecords = 10000;
// 消费来自 topic = kafka-study 的数据
KafkaConsumer<Integer, String> consumer = createKafkaConsumer(bootstrapServers,groupName,false);
//订阅主题列表以获取动态分配的分区此类实现了我们在此处传递的再平衡侦听器,以接收此类事件的通知
consumer.subscribe(singleton(topicName));
Utils.printOut("Subscribed to %s", topicName);
while (remainingRecords > 0) {
try {
// 如果需要,轮询会更新分区分配并调用配置的重新平衡侦听器,然后尝试使用上次提交的偏移量或auto.offset.reset按顺序获取记录。
// 如果有记录或超时返回空记录集,则重置策略会立即返回。下一次轮询必须在session.timeout.ms中调用,以避免组重新平衡
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Integer, String> record : records) {
Utils.maybePrintRecord(numRecords, record);
}
remainingRecords -= records.count();
} catch (AuthorizationException | UnsupportedVersionException
e) {
// 我们无法从这些异常中恢复过来
Utils.printErr(e.getMessage());
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
// 在没有auto.reset.policy的情况下,找不到偏移量或偏移量无效
Utils.printOut("Invalid or no offset found, using latest");
consumer.seekToEnd(e.partitions());
consumer.commitSync();
} catch (KafkaException e) {
// 记录异常并尝试继续
Utils.printErr(e.getMessage());
}
}
consumer.close();
Utils.printOut("Fetched %d records", numRecords - remainingRecords);
}
private static KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers,
String groupId , boolean readCommitted) {
Properties props = new Properties();
// 消费者连接到broker需要引导服务器配置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/port
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
// 当我们使用订阅(topic)进行组管理时,需要消费者groupId
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//设置静态成员资格以提高可用性(例如滚动重启)
// instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
//启用EOS时禁用自动提交,因为偏移量与事务一起提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
//读取数据用到的反序列化器,需要和生产者对应
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
if (readCommitted) {
// 跳过正在进行和已中止的事务
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
}
// 在偏移无效或没有偏移的情况下设置重置偏移策略
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
}
五、运行程序
生产者日志打印
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(0, test0), partition(kafka-study-0), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(1, test1), partition(kafka-study-0), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(5, test5), partition(kafka-study-0), offset(2)......
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9940, test9940), partition(kafka-study-0), offset(4979)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9960, test9960), partition(kafka-study-0), offset(4987)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9970, test9970), partition(kafka-study-0), offset(4991)kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(2, test2), partition(kafka-study-1), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(3, test3), partition(kafka-study-1), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(4, test4), partition(kafka-study-1), offset(2).......
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9950, test9950), partition(kafka-study-1), offset(4966)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9980, test9980), partition(kafka-study-1), offset(4986)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9990, test9990), partition(kafka-study-1), offset(4991)
我们再次用命令看下每个分区的offset
消费者日志打印
main - Subscribed to kafka-study
main - Sample: record(0, test0), partition(kafka-study-0), offset(0)
main - Sample: record(1000, test1000), partition(kafka-study-0), offset(506)
main - Sample: record(2000, test2000), partition(kafka-study-0), offset(1020)
main - Sample: record(3000, test3000), partition(kafka-study-0), offset(1554)
main - Sample: record(7000, test7000), partition(kafka-study-0), offset(3550)
main - Sample: record(4000, test4000), partition(kafka-study-1), offset(1929)
main - Sample: record(5000, test5000), partition(kafka-study-1), offset(2422)
main - Sample: record(6000, test6000), partition(kafka-study-1), offset(2932)
main - Sample: record(8000, test8000), partition(kafka-study-1), offset(3963)
main - Sample: record(9000, test9000), partition(kafka-study-1), offset(4467)
main - Fetched 10000 records
六、问题说明
从日志中我们可以看到,在异步生产和消费时offset并不是逐个递增上去的,这是为什么呢?
在前面博客中我们提到,生产者在异步的情况下会启用批处理,即:Kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批处理。批处理可以配置为积累不超过固定数量的消息,并且等待时间不超过一些固定的延迟限制(例如64k或10毫秒)。这允许积累更多的消息来发送,并且在服务器上几乎没有更大的I/O操作。这种缓冲是可配置的,并提供了一种机制来权衡少量额外的延迟以获得更好的吞吐量。当然如果你选择的是同步推送或者异步中单条消息特别大会导致批处理优化使用不到。
消费者也是从brokers一批一批的拉取数据来消费的
我们也可以看下broker的日志中数据的索引情况
kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.log | head -10
kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.index | head -10
从这里我们可以看到,生产者是一批一批往broker推送的,broker以更大的批次往磁盘写,从而降低推送的频次,也降低与磁盘交互的频次。