下载 Apache Kafka
演示window 安装
编写启动脚本,脚本的路径根据自己实际的来
启动说明
先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper
巧记: 铲屎官(zookeeper)总是第一个到,最后一个走
启动zookeeper
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
启动kafka
call bin/windows/kafka-server-start.bat config/server.properties
测试脚本,主要用于创建主题 ‘test-topic’
# 创建主题(窗口1)
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --create
# 查看主题
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --list
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe
# 修改某主题的分区
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2
# 生产消息(窗口2)向test-topic主题发送消息
bin/window> kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic
>hello kafka
# 消费消息(窗口3)消费test-topic主题的消息
bin/window> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
package com.ldj.kafka.admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* User: ldj
* Date: 2024/6/13
* Time: 0:00
* Description: 创建主题
*/
public class AdminTopic {
public static void main(String[] args) {
Map<String, Object> adminConfigMap = new HashMap<>();
adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
NewTopic topic1 = new NewTopic("topic-01", 1, (short) 1);
NewTopic topic2 = new NewTopic("topic-02", 2, (short) 1);
AdminClient adminClient = AdminClient.create(adminConfigMap);
CreateTopicsResult addResult = adminClient.createTopics(Arrays.asList(topic1, topic2));
//DeleteTopicsResult delResult = adminClient.deleteTopics(Arrays.asList("topic-02"));
adminClient.close();
}
}
package com.ldj.kafka.producer;
import com.alibaba.fastjson.JSON;
import com.ldj.kafka.model.UserEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* User: ldj
* Date: 2024/6/12
* Time: 21:08
* Description: 生产者
*/
public class KfkProducer {
public static void main(String[] args) {
//生产者配置
Map<String, Object> producerConfigMap = new HashMap<>();
producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigMap);
//构建消息 ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
try {
for (int i = 0; i < 10; i++) {
UserEntity userEntity = new UserEntity()
.setUserId(2436687942335620L + i)
.setUsername("lisi")
.setGender(1)
.setAge(18);
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic",
userEntity.getUserId().toString(),
JSON.toJSONString(userEntity));
//发送数据到Broker
producer.send(record, (RecordMetadata var1, Exception var2) -> {
if (Objects.isNull(var2)) {
System.out.printf("[%s]消息发送成功!", userEntity.getUserId());
} else {
System.out.printf("[%s]消息发送失败!err:%s", userEntity.getUserId(), var2.getCause());
}
});
}
} finally {
//关闭通道
producer.close();
}
}
}
package com.ldj.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* User: ldj
* Date: 2024/6/12
* Time: 21:10
* Description: 消费者
*/
public class KfkConsumer {
public static void main(String[] args) {
//消费者配置
Map<String, Object> consumerConfigMap = new HashMap<>();
consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//所属消费组
consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test123456");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigMap);
//消费主题的消息 ConsumerRebalanceListener
consumer.subscribe(Collections.singletonList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
//数据存储结构:Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
} finally {
//关闭消费者
consumer.close();
}
}
}