1. Installation
Standalone Kafka installation
Kafka has a hard dependency on ZooKeeper, which stores Kafka's node data, so ZooKeeper must be installed before Kafka.
Docker Hub: https://hub.docker.com
- Install ZooKeeper with Docker
Pull the image:
docker pull zookeeper:3.4.14
Create the container:
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
- Install Kafka with Docker
Pull the image:
docker pull wurstmeister/kafka:latest
docker pull bitnami/kafka:3.6.2 (using this image here causes problems, because its container parameters differ from wurstmeister/kafka)
Create the container:
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:latest
- Test
Terminal window A
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181 (create a topic)
Created topic test.
bash-5.1# kafka-console-producer.sh --broker-list localhost:9092 --topic test (start a console producer)
>hello (send messages)
>haha
Terminal window B
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning (start a console consumer)
hello (messages received)
haha
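The same round trip can also be verified from Java with the plain kafka-clients producer. A minimal sketch, assuming the broker address used above (the class name and message text are illustrative):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.200.131:9092"); // broker from the steps above
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; get() blocks until the broker acknowledges the write
            RecordMetadata meta = producer.send(new ProducerRecord<>("test", "hello from java")).get();
            System.out.printf("written to %s-%d@%d%n", meta.topic(), meta.partition(), meta.offset());
        }
    }
}
If this succeeds, the console consumer in window B prints the message as well.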
- Install a Kafka GUI tool (the page cannot be opened after the container starts; I have not found the cause)
docker run -d --name kafka-eagle -p 8048:8048 -e EFAK_CLUSTER_ZK_LIST="192.168.200.131:2181" nickzurich/efak:latest
Cluster installation
- kafka.yml
version: '3.8'
services:
  zookeeper:
    image: zookeeper:3.7.0
    restart: always
    hostname: 192.168.200.131
    container_name: zookeeper
    privileged: true
    ports:
      - 2181:2181
    volumes:
      - /usr/local/server/zookeeper/data/:/data
    build:
      context: .
      network: host
  kafka1:
    container_name: kafka1
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9092:9092
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9092 ## host IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      # With a Docker deployment the externally reachable IP and port must be set;
      # otherwise the address registered in ZooKeeper is unreachable and external clients cannot connect.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_PORT: 9092
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19092"
      JMX_PORT: 19092
    volumes:
      - /etc/localtime:/etc/localtime
    depends_on:
      - zookeeper
  kafka2:
    container_name: kafka2
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9093:9093
      - 19093:19093
    environment:
      KAFKA_BROKER_ID: 2
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9093 ## host IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      # With a Docker deployment the externally reachable IP and port must be set;
      # otherwise the address registered in ZooKeeper is unreachable and external clients cannot connect.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_PORT: 9093
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19093"
      JMX_PORT: 19093
    volumes:
      - /etc/localtime:/etc/localtime
    depends_on:
      - zookeeper
  kafka3:
    container_name: kafka3
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9094:9094
      - 19094:19094
    environment:
      KAFKA_BROKER_ID: 3
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9094 ## host IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      # With a Docker deployment the externally reachable IP and port must be set;
      # otherwise the address registered in ZooKeeper is unreachable and external clients cannot connect.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_PORT: 9094
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19094"
      JMX_PORT: 19094
    volumes:
      - /etc/localtime:/etc/localtime
    depends_on:
      - zookeeper
  eagle:
    image: gui66497/kafka_eagle
    container_name: eagle_monitor
    restart: always
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8048:8048"
    environment:
      ZKSERVER: "192.168.200.131:2181"
- Commands
docker-compose -f kafka.yml up -d
docker-compose -f kafka.yml down
docker-compose -f kafka.yml ps
[root@192 images]# ls
kafka.yml
[root@192 images]# docker-compose -f kafka.yml up -d
[+] Running 6/6
⠿ Network images_default Created 0.1s
⠿ Container kafka2 Started 1.0s
⠿ Container kafka3 Started 1.0s
⠿ Container zookeeper Started 1.0s
⠿ Container kafka1 Started 1.0s
⠿ Container eagle_monitor Started 1.5s
[root@192 images]#
// Eagle still does not work and I have not found the cause; the firewall is already disabled
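Even without Eagle, the cluster itself can be sanity-checked from code: a Kafka AdminClient connected to all three brokers can create a fully replicated topic. A minimal sketch, assuming the broker addresses from kafka.yml (class and topic names are illustrative):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.200.131:9092,192.168.200.131:9093,192.168.200.131:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3: one replica on each broker
            NewTopic topic = new NewTopic("cluster-check", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
            // list the brokers the cluster reports
            System.out.println("brokers: " + admin.describeCluster().nodes().get());
        }
    }
}
If the replication factor 3 topic is created without error, all three brokers are registered and reachable.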
2. Spring Boot integration
The current Spring Boot Kafka integration is configured with StringSerializer as the serializer, so there are two ways to pass an object:
Approach one: write a custom serializer. Since there are many object types, this is not very reusable and is not covered here.
Approach two: convert the object to a JSON string before sending, then convert it back to an object after receiving. This is the approach used here; see the round-trip sketch below.
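For reference, the JSON round trip of approach two looks like this with fastjson (a standalone sketch using the User class defined in 2.2, not part of the project code):
import com.alibaba.fastjson.JSON;

public class JsonRoundTrip {
    public static void main(String[] args) {
        User user = new User();
        user.setName("xiaowang");
        user.setAge(18);
        String payload = JSON.toJSONString(user);               // what the producer sends to Kafka
        User received = JSON.parseObject(payload, User.class);  // what the listener does on receipt
        System.out.println(payload + " -> " + received);
    }
}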
2.1 Create a single-node Kafka and the topics
[root@192 images]# docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
700f01ad38e99df4a8a7979a66cb88e6b629dccc29820c18dd3213ebc60c5814
[root@192 images]# docker run -d --name kafka \
> --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
> --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
> --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
> --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
> --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
> --net=host wurstmeister/kafka:latest
5884d54092ede091c2572e6420158529de29cf8e98da3706a572e1fa1408182e
[root@192 images]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic test.
bash-5.1# kafka-topics.sh --create --topic user-topic --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic user-topic.
2.2 Create the producer
dependencies
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
application.yml
server:
  port: 8080
spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 192.168.200.131:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Controller: send messages
@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello() {
        kafkaTemplate.send("test", "the first message sent from Spring Boot");
        return "ok";
    }

    @GetMapping("/helloUser")
    public String helloUser() {
        User user = new User();
        user.setName("xiaowang");
        user.setAge(18);
        kafkaTemplate.send("user-topic", JSON.toJSONString(user));
        return "ok";
    }
}
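kafkaTemplate.send() is asynchronous, so the controller returns before the broker acknowledges. If delivery needs to be observed, a callback can be attached; a sketch assuming spring-kafka 2.x, where send() returns a ListenableFuture (in 3.x it returns a CompletableFuture and whenComplete() is used instead; the endpoint name is illustrative):
@GetMapping("/helloCallback")
public String helloCallback() {
    kafkaTemplate.send("test", "message with callback")
            .addCallback(
                    // success: record metadata is available
                    result -> System.out.println("sent, offset=" + result.getRecordMetadata().offset()),
                    // failure: all retries exhausted
                    ex -> System.err.println("send failed: " + ex.getMessage()));
    return "ok";
}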
User
public class User {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
2.3 Create the consumer
dependencies
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
application.yml
server:
  port: 8081
spring:
  application:
    name: kafka-consumer
  kafka:
    bootstrap-servers: 192.168.200.131:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
User
public class User {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
Message listener
@Component
public class HelloListener {

    @KafkaListener(topics = "test")
    public void onMessage1(String message) {
        if (!StringUtils.isEmpty(message)) {
            System.out.println(message);
        }
    }

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message) {
        if (!StringUtils.isEmpty(message)) {
            User user = JSON.parseObject(message, User.class);
            System.out.println(user.toString());
        }
    }
}
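If the key, partition, or offset is needed, the listener method can take the full ConsumerRecord instead of a plain String. A variant sketch (it replaces the String listener rather than being added alongside it, since both would join the same consumer group):
@KafkaListener(topics = "user-topic")
public void onRecord(ConsumerRecord<String, String> record) {
    // record carries the payload plus partition/offset metadata
    System.out.printf("partition=%d offset=%d value=%s%n",
            record.partition(), record.offset(), record.value());
}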
Start the producer and consumer projects and open http://127.0.0.1:8080/hello in a browser; the consumer receives the message.
Open http://127.0.0.1:8080/helloUser; the consumer receives that message as well.
Project structure
3. Miscellaneous
Usually the service method is called directly in the listener class:
@Component
@Slf4j
public class ArtilceIsDownListener {

    @Autowired
    private ApArticleConfigService apArticleConfigService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message) {
        if (StringUtils.isNotBlank(message)) {
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article config updated on the article side, articleId={}", map.get("articleId"));
        }
    }
}
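For completeness, the producer side of this message might look like the following in the sending service. This is a hypothetical sketch: only WM_NEWS_UP_OR_DOWN_TOPIC comes from the code above, while the class name, map keys, and the enable flag are assumptions for illustration.
@Component
public class WmNewsUpDownProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void publishUpOrDown(Long articleId, boolean up) {
        Map<String, Object> map = new HashMap<>();
        map.put("articleId", articleId);   // read back by updateByMap in the listener above
        map.put("enable", up ? 1 : 0);     // assumed flag: 1 = published, 0 = taken down
        kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,
                JSON.toJSONString(map));
    }
}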