SpringBoot整合Kafka
文章目录
- SpringBoot整合Kafka
- 下载与安装
- 创建topic,测试生产消费程序
- SpringBoot整合Kafka
- 导坐标
- 做配置
- 做客户端
下载与安装
下载地址:
https://kafka.apache.org/downloads
下载2的版本,3.的版本会报错
解压安装,记得放在D盘根目录下,要不然启动时会报命令行太长的错误
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties
先启动zookeeper,在启动kafka(这两个在bin下的windows目录下),都需要指定配置文件(配置文件在config目录下)。
创建topic,测试生产消费程序
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic itheima
kafka-topics.bat --delete --zookeeper localhost:2181 --topic itheima
kafka-console-producer.bat --broker-list localhost:9092 --topic itheima2022
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic itheima2022 --from-beginning
SpringBoot整合Kafka
导坐标
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
做配置
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order
做客户端
@Service
public class MessageServiceKafkaImpl implements MessageService {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列 (kafka),id:" + id);
kafkaTemplate.send("itheima2022",id);
}
@Override
public String doMessage() {
return null;
}
}
@Component
public class MessageListener {
@KafkaListener(topics = "itheima2022")
public void onMessage(ConsumerRecord<String,String> record){
System.out.println("已完成短信发送业务(kafka):id:"+record.value());
}
}