kafka安装配置及集成springboot

1. 安装

单机安装kafka
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
dockerhub网址: https://hub.docker.com

  • Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:latest
docker pull bitnami/kafka:3.6.2 (用这个会有问题,因为创建容器时参数设置与wurstmeister/kafka不同)

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:latest

  • 测试
    终端窗口A
[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181   (创建主题)
Created topic test.
bash-5.1# kafka-console-producer.sh --broker-list localhost:9092 --topic test   (创建生产者)
>hello    (发送消息)
>haha

终端窗口B

[root@192 ~]# docker exec -it kafka /bin/bash
bash-5.1# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   (创建接收者)
hello    (收到了消息)
haha
  • 安装kafka可视化工具(运行容器后打不开,不知道为啥)
docker run -d --name kafka-eagle -p 8048:8048 -e EFAK_CLUSTER_ZK_LIST="192.168.200.131:2181" nickzurich/efak:latest

集群安装

  1. kafka.yml
version: '3.8'
services:
  zookeeper:
    image: zookeeper:3.7.0
    restart: always
    hostname: 192.168.200.131
    container_name: zookeeper
    privileged: true
    ports:
      - 2181:2181
    volumes:
      - /usr/local/server/zookeeper/data/:/data
    build:
      context: .
      network: host

  kafka1:
    container_name: kafka1
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9092:9092
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9092    ## 宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_PORT: 9092
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19092"
      JMX_PORT: 19092
      volumes:
        /etc/localtime:/etc/localtime
      depends_on:
        zookeeper
  kafka2:
    container_name: kafka2
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9093:9093
      - 19093:19093
    environment:
      KAFKA_BROKER_ID: 2
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9093    ## 宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_PORT: 9093
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19093"
      JMX_PORT: 19093
      volumes:
        /etc/localtime:/etc/localtime
      depends_on:
        zookeeper
  kafka3:
    container_name: kafka3
    restart: always
    image: wurstmeister/kafka:latest
    privileged: true
    ports:
      - 9094:9094
      - 19094:19094
    environment:
      KAFKA_BROKER_ID: 3
      HOST_IP: 192.168.200.131
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.200.131:9094    ## 宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.200.131:2181
      #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
      KAFKA_ADVERTISED_HOST_NAME: 192.168.200.131
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_PORT: 9094
      KAFKA_delete_topic_enable: 'true'
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.200.131 -Dcom.sun.management.jmxremote.rmi.port=19094"
      JMX_PORT: 19094
      volumes:
        /etc/localtime:/etc/localtime
      depends_on:
        zookeeper

  eagle:
    image: gui66497/kafka_eagle
    container_name: eagle_monitor
    restart: always
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8048:8048"
    environment:
      ZKSERVER: "192.168.200.131:2181"
  1. 命令

docker-compose -f kafka.yml up -d
docker-compose -f kafka.yml down
docker-compose -f kafka.yml ps

[root@192 images]#  ls
kafka.yml
[root@192 images]# docker-compose -f kafka.yml up -d
[+] Running 6/6
 ⠿ Network images_default   Created                                                                                        0.1s
 ⠿ Container kafka2         Started                                                                                        1.0s
 ⠿ Container kafka3         Started                                                                                        1.0s
 ⠿ Container zookeeper      Started                                                                                        1.0s
 ⠿ Container kafka1         Started                                                                                        1.0s
 ⠿ Container eagle_monitor  Started                                                                                        1.5s
[root@192 images]# 

// 但是还是用不了eagle,不知道为啥,防火墙是已经关了

2. springboot集成

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,这里采用这种方式

2.1 创建单点kafka和topic

[root@192 images]# docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
700f01ad38e99df4a8a7979a66cb88e6b629dccc29820c18dd3213ebc60c5814
[root@192 images]# docker run -d --name kafka \
> --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.131 \
> --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.131:2181 \
> --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.131:9092 \
> --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
> --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
> --net=host wurstmeister/kafka:latest
5884d54092ede091c2572e6420158529de29cf8e98da3706a572e1fa1408182e
[root@192 images]# docker exec -it kafka /bin/bash
bash-5.1# kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic test.
bash-5.1# kafka-topics.sh --create --topic user-topic --partitions 1 --replication-factor 1 --zookeeper 192.168.200.131:2181
Created topic user-topic.

2.2 创建生产者

dependencies

<!-- kafkfa -->
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <exclusions>
           <exclusion>
               <groupId>org.apache.kafka</groupId>
               <artifactId>kafka-clients</artifactId>
           </exclusion>
       </exclusions>
   </dependency>
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
   </dependency>
   <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>fastjson</artifactId>
       <version>1.2.83</version>
   </dependency>

application.yml

server:
  port: 8080
spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 192.168.200.131:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

controller-发送消息

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("test","springboot发的第一条消息");
        return "ok";
    }

    @GetMapping("/helloUser")
    public String helloUser(){
        User user = new User();
        user.setName("xiaowang");
        user.setAge(18);

        kafkaTemplate.send("user-topic", JSON.toJSONString(user));

        return "ok";
    }
}

User

public class User {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

2.3 创建消费者

dependencies

<!-- kafkfa -->
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <exclusions>
           <exclusion>
               <groupId>org.apache.kafka</groupId>
               <artifactId>kafka-clients</artifactId>
           </exclusion>
       </exclusions>
   </dependency>
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
   </dependency>
   <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>fastjson</artifactId>
       <version>1.2.83</version>
   </dependency>

application.yml

server:
  port: 8081
spring:
  application:
    name: kafka-consumer
  kafka:
    bootstrap-servers: 192.168.200.131:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

User

public class User {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

消息监听器

@Component
public class HelloListener {

    @KafkaListener(topics = "test")
    public void onMessage1(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }

    }

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user.toString());
        }

    }
}

启动生产者和消费者项目,浏览器输入http://127.0.0.1:8080/hello,发现消费者收到消息
在这里插入图片描述
浏览器输入http://127.0.0.1:8080/helloUser,发现消费者收到消息
在这里插入图片描述
项目结构
在这里插入图片描述

3.其它

通常在监听类直接调用service方法

@Component
@Slf4j
public class ArtilceIsDownListener {

    @Autowired
    private ApArticleConfigService apArticleConfigService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message){
        if(StringUtils.isNotBlank(message)){
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article端文章配置修改,articleId={}",map.get("articleId"));
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/619196.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SeetaFace6人脸活体检测C++代码实现Demo

SeetaFace6包含人脸识别的基本能力&#xff1a;人脸检测、关键点定位、人脸识别&#xff0c;同时增加了活体检测、质量评估、年龄性别估计&#xff0c;并且顺应实际应用需求&#xff0c;开放口罩检测以及口罩佩戴场景下的人脸识别模型。 官网地址&#xff1a;https://github.co…

【CSP CCF记录】数组推导

题目 过程 思路 每次输入一个Bi即可确定一个Ai值&#xff0c;用temp记录1~B[i-1]&#xff0c;的最大值分为两种情况&#xff1a; 当temp不等于Bi时&#xff0c;则说明Bi值之前未出现过&#xff0c;Ai必须等于Bi才能满足Bi是Ai前缀最大的定义。当temp等于Bi时&#xff0c;则说…

后端开发之用Mybatis简化JDBC的开发快速入门2024及数据库连接池技术和lombok工具详解

JDBC 简化JDBC的开发 JDBC仅仅是一套接口 是一套规范 Mybatis是持久层框架 用于简化JDBC的开发 使用Java语言操作关系型数据库的一套API 原始的JDBC程序 package com.bigdate.mybatis;import com.bigdate.mybatis.mapper.UserMapper; import com.bigdate.mybatis.pojo.Use…

(二)Jetpack Compose 布局模型

前文回顾 &#xff08;一&#xff09;Jetpack Compose 从入门到会写-CSDN博客 首先让我们回顾一下上一篇文章中里提到过几个问题&#xff1a; ComposeView的层级关系&#xff0c;互相嵌套存在的问题&#xff1f; 为什么Compose可以实现只测量一次&#xff1f; ComposeView和…

【JVM】感觉弗如...类加载机制

【JVM】感觉弗如…类加载机制 在Java开发过程中&#xff0c;从源代码&#xff08;.java文件&#xff09;到字节码&#xff08;.class文件&#xff09;再到运行时的类加载&#xff0c;会经历几个关键步骤&#xff0c;我们先简单过一遍大体的过程。再介绍今天这篇博客的重点内容—…

几个字符串函数的使用和模拟实现(2)

strcop的使用和模拟实现 strcpy函数的使用事项&#xff1a; 源字符串时不需要修改的&#xff0c;在定义前加上const 源字符串被拷贝到目标字符串上时终止字符\0也被拷贝进去 目标数组的大小要相对于源数组的大小足够大&#xff0c;并且不应该在内存中重叠 函数的返回值是一个字…

【Unity】Unity项目转抖音小游戏(二)云数据库和云函数

业务需求&#xff0c;开始接触一下抖音小游戏相关的内容&#xff0c;开发过程中记录一下流程。 抖音云官方文档&#xff1a;https://developer.open-douyin.com/docs/resource/zh-CN/developer/tools/cloud/develop-guide/cloud-function-debug 1.开通抖音云环境 抖音云地址&a…

软件体系结构风格

目录 一、定义 二、.经典软件体系结构风格&#xff1a; 1.管道和过滤器 2.数据抽象和面向对象系统 3.基于事件系统&#xff08;隐式调用&#xff09; 4.分层系统 5.仓库 6.C2风格 7.C/S 8.三层C/S 9.B/S 题&#xff1a; 一、定义 软件体系机构风格是描述某一特定应用…

C#泛型委托

在C#中&#xff0c;delegate 关键字用于声明委托&#xff08;delegates&#xff09;&#xff0c;委托是一种类型安全的函数指针&#xff0c;允许你传递方法作为参数或从方法返回方法。有时我们需要将一个函数作为另一个函数的参数&#xff0c;这时就要用到委托&#xff08;Dele…

java项目之车辆管理系统(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的车辆管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 车辆管理系统的主要使用者分…

Deckset for Mac:让演示文稿制作更轻松

还在为繁琐的演示文稿制作而烦恼吗&#xff1f;Deckset for Mac来帮您解决&#xff01;它支持Markdown语言&#xff0c;让您只需专注于内容的创作&#xff0c;无需在排版和设计上耗费过多精力。丰富的主题和布局选项&#xff0c;让您能够轻松打造出专业级的演示文稿。快来体验D…

云计算第十二课

安装虚拟机 第一步新建虚拟机 选择自定义安装 下一步 选择稍后安装操作系统 选择系统类型和版本 选择虚拟机文件路径&#xff08;建议每台虚拟机单独存放并且路径不要有中文&#xff09;点击下一步 选择bios下一步 选择虚拟机处理器内核数量 默认硬盘或者自行调大硬盘 选择虚…

软件测试的分类

1.用户分类 2.查看代码分类 3.阶段分类

云计算十三课

centos安装 点击左上角文件 点击新建虚拟机 点击下一步 点击稍后安装操作系统&#xff0c;下一步 选择Linux&#xff08;l&#xff09;下一步 设置虚拟机名称 点击浏览选择安装位置 新建文件夹设置名称不能为中文&#xff0c;点击确定 点击下一步 设置磁盘大小点击下一步…

4.1 编写程序,从键盘接收一个小写字母,然后找出他的前导字符和后续字符,再按顺序显示这三个字符

方法一&#xff1a; 运行效果&#xff1a; 输入B&#xff0c;输出显示ABC&#xff1b;输入A&#xff0c;输出显示AB 思路&#xff1a; 1、通过键盘输入接收一个字母。 2、将输入的字母减去1&#xff0c;得到前导字符&#xff0c;然后输出。 3、将输入的字母加上1&#xff0c;得…

【python量化交易】qteasy使用教程07——创建更加复杂的自定义交易策略

创建更加复杂的自定义交易策略 使用交易策略类&#xff0c;创建更复杂的自定义策略开始前的准备工作本节的目标继承Strategy类&#xff0c;创建一个复杂的多因子选股策略策略和回测参数配置&#xff0c;并开始回测 本节回顾 使用交易策略类&#xff0c;创建更复杂的自定义策略 …

(四十)第 6 章 树和二叉树(树的双亲表存储)

1. 背景说明 2. 示例代码 1) errorRecord.h // 记录错误宏定义头文件#ifndef ERROR_RECORD_H #define ERROR_RECORD_H#include <stdio.h> #include <string.h> #include <stdint.h>// 从文件路径中提取文件名 #define FILE_NAME(X) strrchr(X, \\) ? strrch…

基于yolov5+streamlit目标检测演示系统设计

YOLOv5与Streamlit&#xff1a;智能目标检测可视化展示介绍 随着人工智能技术的飞速发展&#xff0c;目标检测技术已成为推动智能化社会进步的关键技术之一。在众多目标检测算法中&#xff0c;YOLOv5以其卓越的性能和实时性&#xff0c;成为了业界的佼佼者。与此同时&#xff…

代码随想录阅读笔记-动态规划【爬楼梯】

题目 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 注意&#xff1a;给定 n 是一个正整数。 示例 1&#xff1a; 输入&#xff1a; 2输出&#xff1a; 2解释&#xff1a; 有两种方法可以爬到楼…

[AutoSar]BSW_Diagnostic_002 DCM模块介绍

目录 关键词平台说明背景一、DCM所处架构位置二、DCM 与其他模块的交互三、DCM 的功能四、DCM的内部子模块4.1 Diagnostic Session Layer (DSL)4.1 DSL 与其他模块的交互 4.2 Diagnostic Service Dispatcher (DSD)4.3 Diagnostic Service Processing (DSP)4.4 小结 关键词 嵌入…