Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门

参考文章

【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客

Docker搭建kafka+zookeeper

打开我们的docker的镜像源配置

vim /etc/docker/daemon.json

配置

 {
  "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}

 下面的那个insecure是我自己虚拟机的,不用理会

拉取镜像

然后开始拉取我们的zookeeper镜像和我们的kafka镜像

这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本

docker pull zookeeper

kafka镜像 

docker pull wurstmeister/kafka

因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络

让不同容器都加入这个网络

docker network create创建我们的网络

然后那个zookeeper_network是我们自定义的网络名称

docker network create --driver bridge zookeeper_network

kafka是依赖于zookeeper的所以我们要先安装zookeeper

我们先用run来创建一个zookeeper容器

 docker run -d --name zookeeper1  --network zookeeper_network -p 2181:2181   zookeeper

-d 是后台运行

--name 是我们自定义容器的名字  我定义的名字是zookeeper1

--network 

是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络

-p 是指定我们的容器暴露给外部的端口  2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射

最后面的那个zookeeper 是我们的使用的镜像源的名称

一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本

查看我们创建的网络环境的地址

docker inspect zookeeper_network

那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址

我的是12.21.0.2,这个ip地址是要记住方便后面使用的
 

创建一个kafka容器

这段代码有点长,根子自己改吧

 # 启动kafka
docker run -d --name kafka1  --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka

解释 

KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址

不知道本机地址可以输入 ip addr来查看本机地址

这样子就搭建完成了


SpringBoot集成kafka

首先就是springboot和kafka的版本兼容了

Spring for Apache Kafka

然后我们引入两个kafka的依赖 

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
</dependency>

自己对着自己的版本来

自己看b站视频,9分钟就搞定了

63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili

然后开始写我们的application.yml配置文件

下面是配置文件的全部+解析

其实和普通mq差不多

也就是配置生产者和消费者和一些过期时间超时时间

重点在于那个missing-topics-fatal

主题不存在的话,我们是否还要成功启动

我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题

所以我启动的时候,报错然后失败了 

spring:
  kafka:
    bootstrap-servers: 192.168.88.130:9092  #Kafka 集群的地址和端口号
    producer:
      acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入
      batch-size: 16384  #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。
      buffer-memory: 33554432  #生产者用于缓存消息的内存大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  #定义了消息 key 和 value 的序列化方式。
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。
      retries: 0
    consumer:
      group-id: test #消费者组ID
      #消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
      # earliest:无提交记录,表示从最早的消息开始消费
      #latest:无提交记录,从最新的消息的下一条开始消费
      auto-offset-reset: earliest  #当消费者没有提交过 offset 时, 从何处开始消费消息
      enable-auto-commit: true #是否自动提交偏移量offset
      auto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  #定义了消息 key 和 value 的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式
      max-poll-records: 2  #一次 poll 操作最多返回的消息数量
      properties:
        #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        #消费者与 Kafka 服务端的会话超时时间
        session.timeout.ms: 120000

        #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
        #消费者调用 poll 方法的最大间隔时间
        max.poll.interval.ms: 300000

        #消费者发送请求到 Kafka 服务端的超时时间
        #配置控制客户端等待请求响应的最长时间。
        #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        #或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000

        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true

        #消费者向协调器发送心跳的间隔时间。
        #poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
        heartbeat.interval.ms: 40000

        #每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
        #0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制
        #仍然会返回该消息,以确保消费者可以进行

        #每个分区最多拉取的消息字节数。
        #max.partition.fetch.bytes=1048576  #1M

    listener:
      #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
      #ack-mode: manual_immediate   手动 ACK 的方式。

      #如果监听的主题不存在, 是否启动失败。
      missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略

      #消费方式, single 表示单条消费, batch 表示批量消费
      #type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-records
      type: batch

      #并发消费的线程数
      concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲

      #默认的主题名称
    template:
      default-topic: "test"
      #springboot启动的端口号
server:
  port: 9999 #这个是java项目启动的端口

基本案例

这是常量类

指定了一个topic和group

主题和分组id

groupid是消费者组的唯一标识

这个视频9分钟看懂kafka

小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili

生产者

我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入

@Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我们前端传的是json格式

 我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"

消费者

 @KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)

    public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        for (String message : messages) {

            //因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象
            final JSONObject entries = JSONUtil.parseObj(message);
            System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));
            //ack.acknowledge();
        }
    }

我们用List<String>来接收,因为可能一个消费者接收多条消息

指定消费者监听的主题topic

以及指定消费者的唯一标识GROUP_ID

这些其实都是自己在常量类里面自己写好的



@Header(KafkaHeaders.RECEIVED_TOPIC) String topic

 这个是得到我们的主题topic的名字

我用apifox调试之后,成功执行了


kafka的图形化工具

这里介绍一个免费的开源项目KafkaKing

Releases · Bronya0/Kafka-King (github.com)

里面还能指定中文

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/788854.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

iPhone数据恢复篇:iPhone 数据恢复软件有哪些

问题&#xff1a;iPhone 15 最好的免费恢复软件是什么&#xff1f;我一直在寻找一个恢复程序来恢复从iPhone中意外删除的照片&#xff0c;联系人和消息&#xff0c;但是我有很多选择。 谷歌一下&#xff0c;你会发现许多付费或免费的iPhone数据恢复工具&#xff0c;声称它们可…

[C++]——同步异步日志系统(3)

同步异步日志系统 一、日志系统框架设计1.1模块划分1.1.1 日志等级模块1.1.2 日志消息模块1.1.3 日志消息格式化模块1.1.4 日志落地模块&#xff08;日志落地的方向是工厂模式&#xff09;1.1.5 日志器模块&#xff08;日志器的生成是建造者模式&#xff09;1.1.6 异步线程模块…

【HBZ分享】TCP连接完成后又是如何保证数据的可靠性传输

前提 发送发发送数据时&#xff0c;需要给出一个seq编号。第一个数据包的seq编号是一个随机数&#xff0c; 从第二个开始&#xff0c;seq编号就是【第一次的seq数据包大小】&#xff0c; 即接收方响应过来的期待数据包编号 ACK机制 接收方收到数据后&#xff0c;要给发送方回…

Html5前端基本知识整理与回顾下篇

今天我们继续结合发布的Html5基础知识点文档进行复习&#xff0c;希望对大家有所帮助。 目录 列表 无需列表 有序列表 自定义列表 样例 表格 基本属性 ​编辑 相关属性 Border Width Height ​编辑 表格标题 ​编辑 表格单元头 合并单元格 垂直单元格合并 水…

实践致知第12享:如何新建一个Word并设置格式

一、背景需求 小姑电话说&#xff1a;要新建一个Word文档&#xff0c;并将每段的首行设置空2格。 二、解决方案 1、在电脑桌面上空白地方&#xff0c;点击鼠标右键&#xff0c;在下拉的功能框中选择“DOC文档”或“DOCX文档”都可以&#xff0c;如下图所示。 之后&#xff0…

npm install失败,数据源过期

npm install时报错&#xff1a;“Unexpected token &#xff1c; in JSON at position 0 while parsing near ‘&#xff1c;!DOCTYPE html&#xff1e;” 执行Vue2项目安装时&#xff0c;出现报错了&#xff0c;显示ERROr: **npm ERR! Unexpected token < in JSON at posi…

58、基于径向基神经网络的曲线拟合(matlab)

1、基于径向基神经网络的曲线拟合简介及原理 1&#xff09;原理简介 基于径向基神经网络&#xff08;Radial Basis Function Neural Network, RBFNN&#xff09;的曲线拟合是一种常用的非线性拟合方法&#xff0c;通过在输入空间中使用径向基函数对数据进行处理&#xff0c;实…

cesium 雷达扫描

cesium 雷达扫描 (下面附有源码) 实现思路 1、通过改变圆型材质来实现效果, 2、用了模运算和步进函数(step)来创建一个重复的圆形图案 3、当纹理坐标st落在垂直或水平的中心线上时,该代码将改变透明度和颜色,以突出显示这些线 示例代码 <!DOCTYPE html> <ht…

完美解决windows开机时,系统提示此windows副本不是正版的正确解决方法,亲测有效!!!

完美解决windows开机时&#xff0c;系统提示此windows副本不是正版的正确解决方法&#xff0c;亲测有效&#xff01;&#xff01;&#xff01; 亲测有效 完美解决windows开机时&#xff0c;系统提示此windows副本不是正版的正确解决方法&#xff0c;亲测有效&#xff01;&#…

二分查找3

1. 有序数组中的单一元素&#xff08;540&#xff09; 题目描述&#xff1a; 算法原理&#xff1a; 二分查找解题关键就在于去找到数组的二段性&#xff0c;这里数组的二段性是从单个数字a开始出现然后分隔出来的&#xff0c;如果mid落入左半部分那么当mid为偶数时nums[mid1]…

来聊聊Redis持久化AOF管道通信的设计

写在文章开头 最近遇到很多烦心事&#xff0c;希望通过技术来得以放松&#xff0c;今天这篇文章笔者希望会通过源码的方式分析一下AOF如何通过Linux父子进程管道通信的方式保证进行AOF异步重写时还能实时接收用户处理的指令生成的AOF字符串&#xff0c;从而保证尽可能的可靠性…

window 安装 openssl

文章目录 前言window 安装 openssl1. 下载2. 安装3. 配置环境变量4. 测试 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff0c;实在白嫖的话…

LVS集群及其它的NAT模式

1.lvs集群作用&#xff1a;是linux的内核层面实现负载均衡的软件&#xff1b;将多个后端服务器组成一个高可用、高性能的服务器的集群&#xff0c;通过负载均衡的算法将客户端的请求分发到后端的服务器上&#xff0c;通过这种方式实现高可用和负载均衡。 2.集群和分布式&#…

Mattermost:一个强大的开源协作平台

Mattermost是一个强大的开源协作平台&#xff0c;基于云原生架构&#xff0c;为企业级用户提供安全、可扩展且自托管的消息传递解决方案。 一、平台特点 开源与定制性&#xff1a;Mattermost是一个开源项目&#xff0c;用户可以根据自身需求定制界面、添加功能或扩展其功能&am…

百川工作手机实现销售管理微信监控系统

在瞬息万变的商业战场中&#xff0c;每一分效率的提升都是企业制胜的关键。传统销售管理模式已难以满足现代企业对精准、高效、合规的迫切需求。今天&#xff0c;让我们一同探索如何利用工作手机这一创新工具&#xff0c;为您的销售团队装上智能翅膀&#xff0c;开启销售管理的…

计算云服务3

第三章 镜像服务 什么是镜像服务(IMS) 镜像服务(lmage ManagementService&#xff0c;IMS)提供镜像的生命周期管理能力。用户可以灵活地使用公共镜像、私有镜像或共享镜像申请弹性云服务器和裸金属服务器。同时&#xff0c;用户还能通过已有的云服务器或使用外部镜像文件创建…

【C++报错已解决】Invalid Use of ‘void’ Expression

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 引言一、问题描述1.1 报错示例1.2 报错分析1.3 解决思路 二、解决方法2.1 方法一&#xff1a;调整函数返回类型方法二…

ArcGIS的智慧与情怀

初识ArcGIS 在这个信息化的时代&#xff0c;ArcGIS如同一位智者&#xff0c;静静地伫立在地理信息系统的巅峰。初识它时&#xff0c;我仿佛走进了一片未知的领域&#xff0c;心中充满了好奇与期待。ArcGIS&#xff0c;这款专业的地理信息系统软件&#xff0c;凭借其强大的功能…

【Mutilism NPN三极管驱动P-MOS】2022-4-2

缘由NPN三极管驱动P-MOS异常导通-硬件开发-CSDN问答 有电感性负载应该接反向吸收泻放二极管才能保证安全&#xff1b; 同时建议修改电路使得工作更安全可靠&#xff0c;取消下拉电阻R2&#xff0c;R3用小于100欧姆左右串联一个发光管&#xff0c;这样既可可靠工作也能观察IO输出…