For Kafka environment setup, see "Kafka: Installation and Configuration" (Kafka:安装和配置) on moreCalm's CSDN blog.
1. Add the Kafka dependencies to the Spring Boot project
<dependencies>
    <!-- No versions are declared here; this assumes they are managed by a parent POM -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
2. Configure application.yml
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
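For orientation, the Spring Boot keys in this configuration correspond roughly to the plain Kafka client settings sketched below. The sketch uses only java.util.Properties; the class name KafkaPropsSketch, its methods, and the APP_NAME constant are made up for illustration. Spring Boot builds the real client configuration itself, so none of this code is needed in the project.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original project.
public class KafkaPropsSketch {

    // Value of spring.application.name in application.yml.
    static final String APP_NAME = "kafka-demo";

    // Setting shared by the producer and the consumer.
    static Properties commonProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.200.130:9092");
        return props;
    }

    // Counterpart of the spring.kafka.producer.* keys.
    static Properties producerProps() {
        Properties props = commonProps();
        props.setProperty("retries", "10");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Counterpart of the spring.kafka.consumer.* keys.
    static Properties consumerProps() {
        Properties props = commonProps();
        // ${spring.application.name}-test resolves to "kafka-demo-test".
        props.setProperty("group.id", APP_NAME + "-test");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(consumerProps());
    }
}
```

The point to take away is that the consumer group id ends up as kafka-demo-test, and that both sides use String (de)serializers, which is why the messages in the following steps are sent and received as plain strings.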
Passing String messages
3. Implement the message-sending endpoint in the controller
package com.heima.kafkademo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    // GET /hello publishes one String message to the topic "lakers-topic"
    @GetMapping("/hello")
    public String addMessage() {
        // Message text: "Lakers are the champions!"
        kafkaTemplate.send("lakers-topic", "湖人总冠军!");
        return "ok";
    }
}
4. Implement the receiving class HelloListener in the component package
package com.heima.kafkademo.component;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class HelloListener {

    // Called for each message received from "lakers-topic"
    @KafkaListener(topics = "lakers-topic")
    public void onMessage(String msg) {
        System.out.println(msg);
    }
}
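5. Test
Request the /hello endpoint (port 9991) in a browser and check the application console.
The listener prints the message, so it was received successfully.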
Passing object messages
Approach: convert the object to a JSON string before sending, and parse the string back into an object on receipt.
1. Implement sending in the controller
package com.heima.kafkademo.controller;

import com.alibaba.fastjson.JSON;
import com.heima.kafkademo.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/hello")
    public String addMessage() {
        User user = new User();
        user.setName("勒布朗");   // "LeBron"
        user.setAge(37);
        user.setGender("男");     // "male"
        user.setJob("NBA球员");   // "NBA player"
        // Serialize the object to a JSON string, since the template sends String values
        kafkaTemplate.send("lakers-topic", JSON.toJSONString(user));
        return "ok";
    }
}
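The controller above imports com.heima.kafkademo.model.User, which the post does not show. Below is a minimal sketch of what it might look like, assuming a plain JavaBean with the four properties the controller sets (name, age, gender, job). The field types and the toString format are guesses, and the package declaration is left as a comment so the snippet compiles on its own. fastjson normally reads the public getters when serializing and relies on the no-argument constructor plus the setters when parsing.

```java
// In the project this file would live in the model package:
// package com.heima.kafkademo.model;

// Assumed shape of the User class; the original post does not show it.
public class User {

    private String name;
    private Integer age;
    private String gender;
    private String job;

    // No constructor is declared, so the implicit no-argument constructor is available.

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }

    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }

    // Readable output when a User is passed to System.out.println.
    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age
                + ", gender='" + gender + "', job='" + job + "'}";
    }
}
```

Without a toString override, printing a User would show the default Object form (class name and hash code) rather than the field values.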
2. Implement the receiving class in the component package
package com.heima.kafkademo.component;

import com.alibaba.fastjson.JSON;
import com.heima.kafkademo.model.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class HelloListener {

    @KafkaListener(topics = "lakers-topic")
    public void onMessage(String msg) {
        // Parse the JSON string back into a User and print it
        System.out.println(JSON.parseObject(msg, User.class));
    }
}
3. Print the test result