1.新建项目
用的idea是20222.1.3版本,没有Spring Initializr 插件,不能直接创建springboot项目
可以在以下网址创建项目,下载后解压,然后用idea打开项目即可
1.1 在 https://start.spring.io/ 上创建项目
1.2上传到linux,解压
unzip Spring_Boot-kafka.zip
1.3 idea打开项目(文件-打开- Spring-Boot-kafka)
2.添加依赖(pox.xml)
(也可以选择修改对应的jdk版本,创建项目时勾选的是22)
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3.springboot生产者集成Kafka
3.1生产者api
新建controller包ProducerController类
package com.lir.Spring.Boot_kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
//导入注解
@RestController
public class ProducerController {
//自动注入内容到kafka模板
@Autowired
KafkaTemplate<String,String> kafka;
//导入注解,使外部能访问方法
@RequestMapping("/ljr")
//接收外部数据
public String data(String msg){
//通过kafka发送出去
kafka.send("customers",msg);
return "发送成功";
}
}
3.2修改application.properties文件
#连接kafka集群
spring.kafka.bootstrap-servers=node1:9092,node2:9092
# key value 序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
3.3运行SpringBootKafkaApplication
SpringBootKafkaApplication这个程序在创建项目时自动生成
打开网页输入地址node1:8080/ljr?msg="lisiguang" /ljr?msg均是ProducerController指定的
输入后网页返回发送成功
Kafka consumer 消费到信息
可见spring boot生成者集成Kafka成功
4.spring boot消费者集成Kafka
4.1消费者api
package com.lir.Spring.Boot_kafka.controller;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
//指定这个类为configuration类,以让KafkaListener能扫描到生效
@Configuration
public class MyConsumer {
//监听主题
@KafkaListener(topics = "customers")
public void consumerTopic(String msg){
System.out.println("接收信息:" + msg);
}
}
4.2修改application.properties文件
# key value 反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费者组id
spring.kafka.consumer.group-id=sbk
4.3运行SpringBootKafkaApplication
打开网页输入地址node1:8080/ljr?msg="lisi"
kafka生产信息
控制台有输出,spring boot消费者集成Kafka成功。