目录
1.安装kafka
2.安装kafkamanager可视化工具
3.springboot整合kafka
1.pom导包
2.启动类和yml配置
3.代码演示
编写生产者:
消费者:
1.安装kafka
进入kafka官网下载对应版本kafka
kafka官网地址:Apache Kafka
kafka是使用Scala开发,所以版本号是由 Scala的版本号和Kafka版本号组成的,如:kafka_2.12-3.2.0 , 2.12是scala版本, 3.2.0是kafka版本,下载完成解压得到kafka,目录结构如下:
结构介绍:
bin :kafka的执行脚本 ,其中包括启动kafka的脚本:kafka-server-start.bat 和 zookeeper-server-start.bat 启动zookeeper的脚本(kafka内置有zookeeper) ,bin/windows 目录中的脚本是针对windows平台。
config : 配置文件目录 ,包括server.properties :kafka的配置 ; zookeeper.properties :zookeeper的配置, producer.properties:生产者的配置 ; consumer.properties 消费者的配置等等。
libs : 依赖的三方jar包
可以进入config文件夹,修改kafka和zookeeper配置文件:
zookeeper.properties是作为zookeeper的配置文件,dataDir为数据目录,clientPort为启动端口,比如你想修改zookeeper的默认端口通过配置文件修 clientPort=2181项即可 ,如下:
server.properties作为kafka的配置文件,我们关注下面几个配置,你也可以根据情况进行修改
broker.id =0 : 如果是做个多个kafka主机集群,那么brocker.id不能重复,0 ;1 ;2 增长
zookeeper.connect : zookeeper的地址 ,如果有多个zk就用逗号隔开配置多个地址
num.partions = 1 : 默认partions 数量默认为1
log.dirs : 日志目录,不建议放到tmp临时目录,一定要修改,如:log.dirs=d:/kafka-logs
在安装目录下运行cmd,使用命令:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
启动zookeeper
使用命令:
.\bin\windows\kafka-server-start.bat .\config\server.properties
启动kafka
2.安装kafkamanager可视化工具
进入git官网下载:kafka-manager 项目地址:https://github.com/yahoo/kafka-manager
可以直接下载release版本
下载好后需要解压然后对原始文件进行编译,编译完成后会的得到一个kafka-manager-1.3.3.23.zip文件,解压这个文件之后才能启动manager
这里建议大家直接下载编译好的mangaer,地址:
链接:https://pan.baidu.com/s/1oEC2XlPtlSZmOotPYGjpOQ?pwd=jne1
提取码:jne1
解压好后得到如下结构:
使用bin\kafka-manager命令启动kafka-manager:
启动完成之后访问:http://localhost:9000/ 可以看到kafkaManager主页:
第一次进入需要新建 Cluster
输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.8.1.1)
注意:如果没有在 Kafka 中配置过 JMX_PORT,千万不要选择第一个复选框。
Enable JMX Polling
如果选择了该复选框,Kafka-manager 可能会无法启动。
以下全使用默认设置:
点击进入刚刚创建的集群即可看到如下结构:
点击topics可以看到所有创建的topic主题;brokers则代表所有集群内的kafka服务,有几个服务就会显示几个broker;点击topics可以进入查看topic
进入test_topic:
相关参数和使用教程文档可以参考这个大佬的文章:Kafka可视化管理工具kafka-manager部署安装和使用_kafka manager-CSDN博客
3.springboot整合kafka
1.pom导包
创建一个maven结构的springboot项目,首先在pom中导入如下依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.28</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- swagger -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
2.启动类和yml配置
导入依赖之后需要为SpringBoot创建启动类,在启动类中我们通过注解的方式创建一个Topic,如下:
@SpringBootApplication
public class KafKaApplication {
private static final String TOPIC_NAME = "kafka_test_topic";
public static void main(String[] args) {
SpringApplication.run(KafKaApplication.class);
}
//通过定义Bean的方式创建Topic
@Bean
public NewTopic topicHello(){
//创建Topic : topic名字, partition数量 , replicas副本数量
return TopicBuilder.name(TOPIC_NAME).build();
}
}
在yml中对kafka做一些常规配置,如下:
server:
port: 12012
spring:
application:
name: application-kafka
kafka:
bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer #键序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer #值序列化
retries: 1 # 消息发送重试次数
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
batch-size: 16384 #批量大小
properties:
linger:
ms: 0 #提交延迟
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #键序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #值序列化
group-id: test-consumer-group #消费者的ID,这个对应 config/consumer.properties中的group.id
更详细的全局配置以及说明:
spring :kafka :bootstrap-servers : 192.168.10.70 : 9092producer :# 发生错误后,消息重发的次数。retries : 0# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size : 16384# 设置生产者内存缓冲区的大小。buffer-memory : 33554432# 键的序列化方式key-serializer : org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer : org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks : 1consumer :# 自动提交的时间间隔 在 spring boot 2.X 版本中这里采用的是值的类型为 Duration 需要符合特定的格式,如 1S,1M,2H,5Dauto-commit-interval : 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest (默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset : earliest# 是否自动提交偏移量,默认值是 true, 为了避免出现重复数据和数据丢失,可以把它设置为 false,然后手动提交偏移量enable-auto-commit : false# 键的反序列化方式key-deserializer : org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer :org.apache.kafka.common.serialization.StringDeserializer# 配置使用默认的消费组 IDgroup-id : defaultConsumerGrouplistener :# 在侦听器容器中运行的线程数。concurrency : 5#listner 负责 ack ,每调用一次,就立即 commitack-mode : manual_immediatemissing-topics-fatal : false
3.代码演示
编写生产者:
编写生产者案例 ,Kafka提供了 KafkaTemplate
用来向Kafka发送消息,直接在查询中注入即可使用。KafkaTemplate提供了很多个重载的send方法,方法返回ListenableFuture对象,即发送的结果对象。
同步阻塞
需要特别注意的是: future.get()方法会阻塞,他会一直尝试获取发送结果,如果Kafka迟迟没有返回发送结果那么程序会阻塞到这里。所以这种发送方式是同步的。
当然如果你的消息不重要允许丢失你也可以直接执行 : kafkaTemplate.send ,不调用get()方法获取发送结果,程序就不会阻塞,当然你也就不知道消息到底有没有发送成功。
异步非阻塞
幸好Kafka为 ListenableFuture 提供了Callback异步回调,我们可以通过异步回调来接收发送结果
@RestController("/producer")
@Api(tags = "生产者示例接口", description = "生产者示例接口 | 消息发送测试接口", hidden = false)
public class ProducerContrller {
private static final String TOPIC_NAME = "kafka_test_topic";
@Autowired
private KafkaTemplate<Object,Object> kafkaTemplate;
/**
* 同步,阻塞消息队列
* @param msg
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
@PostMapping("/sendSyncMsg/{msg}")
@ApiOperation(value = "生产者生成数据", notes = "同步,阻塞消息队列")
@ApiImplicitParams({
@ApiImplicitParam(name = "msg", value = "需要发送的数据", required = true, dataType = "String"),
})
public String sendSyncMsg(@PathVariable("msg")String msg) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(TOPIC_NAME, msg);
System.out.println("发送结果:"+future.get().toString());
return "发送成功";
}
/**
* 异步,非阻塞消息队列
* @param msg
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
@PostMapping("/sendAsyncMsg/{msg}")
@ApiOperation(value = "生产者生成数据", notes = "异步,非阻塞消息队列")
@ApiImplicitParams({
@ApiImplicitParam(name = "msg", value = "需要发送的数据", required = true, dataType = "String"),
})
public String sendAsyncMsg(@PathVariable("msg")String msg) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(TOPIC_NAME, msg);
future.addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable ex) {
ex.getStackTrace();
}
@Override
public void onSuccess(SendResult<Object, Object> result) {
System.out.println("发送结果:"+result);
}
});
return "发送成功";
}
}
也有原生的使用KafkaProducer的方式创建生产者发送消息,这样的好处是可以灵活配置,不需要每次对kafka配置修改后就要重启服务
以下是代码示例:
/**
* 原生构建KafkaProducer的生产者方法接口
*
* @param msg
* @return
*/
@PostMapping("/sendMsgByProducer/{msg}")
@ApiOperation(value = "生产者生成数据", notes = "原生构建KafkaProducer的生产者方法接口")
@ApiImplicitParams({
@ApiImplicitParam(name = "msg", value = "需要发送的数据", required = true, dataType = "String"),
})
public String sendMsgByProducer(@PathVariable("msg") String msg){
// 创建一个 Map或Properties 对象,用于构建 Kafka 生产者的配置信息
// Properties map = new Properties();
Map map = new HashMap();
// 这个是kafka的地址,对应你server.properties中配置的
map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 键序列化
map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 值序列化
map.put(ProducerConfig.RETRIES_CONFIG, 1); // 设置重试次数
map.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 设置重试间隔
if (false) {
// 添加ssl认证
String userName = "";
String passWord = "";
map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
map.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
map.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + userName + "\" password=\"" + passWord + "\";");
}
// 创建 KafkaProducer 对象 kafkaProducer,并传入配置信息 map
KafkaProducer kafkaProducer = new KafkaProducer<>(map);
// 创建要发送的消息 ProducerRecord,同时订阅主题和需要发送的内容
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, msg);
// 1.不需要回调的消息发送方式
kafkaProducer.send(record);
// 创建一个 CompletableFuture 对象 future 用于异步处理发送消息的结果
CompletableFuture<Object> future = new CompletableFuture<>();
try {
// 2.需要回调的消息发送方式
kafkaProducer.send(record, (data, exception) -> {
if (exception == null) {
System.out.println(String.format("Message sent successfully! Topic: {} Partition: {} Offset: {}", data.topic(), data.partition(), data.offset()));
future.complete("消息投递成功,无异常"); // 成功时完成future
} else {
System.out.println(String.format("Error sending message: " + exception.getMessage(), exception));
future.completeExceptionally(exception); // 错误时传递异常
}
});
}catch (Exception e){
e.printStackTrace();
} finally {
// 关闭生产者通道,释放资源
kafkaProducer.flush();
kafkaProducer.close();
}
return "发送成功";
}
消费者:
使用@KafkaListener
注释来接收消息,用法比较简单,实例如下:
@Component
public class HelloConsumer {
@KafkaListener(topics = "kafka_test_topic")
public void handle(ConsumerRecord consumerRecord) {
System.out.println("消费者消费消息:" + consumerRecord);
System.out.println(String.format("消费者收到消息,topic:%s,partition:%s", consumerRecord.topic(), consumerRecord.partition()));
System.out.println("消费内容:" + consumerRecord.value());
}
//消费消息的时候,给方法添加 Acknowledgment 参数用来签收消息
@KafkaListener(topics = "kafka_test_topic", containerFactory = "kafkaManualAckListenerContainerFactory")
public void handler(String message, Acknowledgment ack){
System.out.println("收到消息:"+message);
//确认收到消息
ack.acknowledge();
}
}
也有原生的使用KafkaProducer的方式创建生产者发送消息的示例:
使用while循环来保证达到与注解的方式相同的实时接收消息的相同的功能,这样的好处是可以灵活配置,可以每次订阅多个不同的topic,不使用的topic可以直接释放掉
@RestController
@Api(tags = "消费者示例接口", description = "消费者示例接口 | 消费消息测试接口", hidden = false)
public class ConsumerContrller {
@PostMapping("/useMsg")
@ApiOperation(value = "消费者消费数据", notes = "消费者消费消息数据")
public String useMsg(){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("kafka_test_topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} catch (WakeupException e) {
// Ignore exception for shutdown
} finally {
consumer.close();
}
return "消费成功";
}
}