Installation Environment
For installation, follow the official quickstart guide (https://kafka.apache.org/quickstart): extract the archive, adjust the configuration, then start ZooKeeper and the Kafka server.
One thing to note: if Kafka is running inside a VMware virtual machine, you need to edit the server.properties file and add the following configuration:
advertised.listeners specifies the IP and port that clients use to reach Kafka; set the IP to the address the virtual machine exposes to the outside. Code running on the host machine needs this setting in order to connect to Kafka.
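For example, a minimal entry in server.properties might look like the line below; the 192.168.5.128 address is taken from the code samples that follow, so substitute whatever IP your virtual machine actually exposes:

advertised.listeners=PLAINTEXT://192.168.5.128:9092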
Producer Code Sample
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.alibaba.fastjson.JSON; // JSON.toJSONString comes from fastjson

public class KafkaProducerService {
    private static final String NEO_TOPIC = "elon-topic";

    private KafkaProducer<String, String> producer = null;

    public KafkaProducerService() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.5.128:9092");
        props.put("acks", "0");     // fire-and-forget: do not wait for broker acknowledgement
        props.put("retries", "2");
        // key and value serializers
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        // create the producer instance
        producer = new KafkaProducer<>(props);
    }

    /**
     * Message-sending interface for external callers.
     */
    public void sendMessage() {
        for (int i = 0; i < 10; ++i) {
            int p = i % 2; // alternate between partition 0 and partition 1
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(NEO_TOPIC, p, "neo", JSON.toJSONString(i));
            producer.send(record);
        }
        producer.flush(); // make sure buffered records are actually sent out
    }
}
When sending, the ten messages are distributed across partition 0 and partition 1, so the topic must have at least two partitions.
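Because the producer targets partitions 0 and 1 explicitly, elon-topic needs to exist with at least two partitions before sending. A minimal sketch for creating it through the AdminClient API might look like the following; the replication factor of 1 assumes a single-broker setup:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicInitializer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.5.128:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // two partitions, replication factor 1 (single broker assumed)
            NewTopic topic = new NewTopic("elon-topic", 2, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}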
Consumer Code Sample
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);

    private static final String NEO_TOPIC = "elon-topic";

    private final Properties properties = new Properties();

    private KafkaConsumer<String, String> consumer = null;

    public KafkaConsumerService() {
        properties.put("bootstrap.servers", "192.168.5.128:9092"); // broker address
        properties.put("group.id", "neo1");                        // consumer group ID
        properties.put("max.poll.records", "5");                   // at most 5 records per poll
        properties.put("enable.auto.commit", "false");             // offsets are committed manually
        properties.put("key.deserializer", StringDeserializer.class);   // turn the key bytes into a Java String
        properties.put("value.deserializer", StringDeserializer.class); // turn the value bytes into a Java String

        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(NEO_TOPIC)); // subscribe to elon-topic

        new Thread(this::receiveMessage).start();
    }

    public void receiveMessage() {
        try {
            while (true) {
                synchronized (this) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    LOGGER.info("Fetch record num:{}", records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        LOGGER.info("Received:" + info);
                        Thread.sleep(100);
                    }
                    consumer.commitSync(); // manual commit, matching enable.auto.commit=false
                }
            }
        } catch (Exception e) {
            LOGGER.error("Consume loop terminated", e);
        } finally {
            consumer.close();
        }
    }
}
The consumer subscribes by topic. As the logged output shows, each poll pulls messages from the topic's partitions and the consumer processes them one by one in a loop.
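As a minimal sketch of how the two classes above might be wired together (the DemoMain class name and the five-second sleep are illustrative, not part of the original sample):

public class DemoMain {
    public static void main(String[] args) throws InterruptedException {
        // start the consumer first so its polling thread is running before any messages arrive
        new KafkaConsumerService();

        // then produce the ten test messages
        new KafkaProducerService().sendMessage();

        // give the consumer thread time to receive and log the messages;
        // the consumer loop itself keeps running until the process is stopped
        Thread.sleep(5000);
    }
}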