Kafka入门二——SpringBoot连接Kafka示例

实现

1.引入maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.9</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-demo</name>
    <description>kafka-demo</description>
    <properties>
        <java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <image>
                        <builder>paketobuildpacks/builder-jammy-base:latest</builder>
                    </image>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2.修改application.properties配置

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.bootstrap-servers=10.20.37.85:9092
spring.kafka.consumer.group-id=demo

通过spring.kafka.consumer.auto-offset-reset配置可以指定Kafka消费者的行为,当遇到没有初始偏移量或当前偏移量不再服务器上存在的情况时的处理策略。这个配置有以下三个可选值:

  • earliest:当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最早的可用消息开始消费。
  • latest(默认):当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最新的可用消息开始消费。
  • none:当服务器上找不到消费者的偏移量时,抛出异常。

通过spring.kafka.bootstrap-servers配置可以指定Kafka的地址。这个配置的值应该是一个或多个Kafka服务器的地址,用逗号分隔。

通过spring.kafka.consumer.group-id配置可以指定Kafka消费者的组ID。这个配置的值应该是一个字符串,用于标识消费者所属的组。

定义Configuer引入kafka Bean

@Component
public class MyConfiguer {
   
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }
}

使用了@Bean注解,表示该方法将返回一个Spring管理的bean对象。在这个方法中,通过调用TopicBuilder类的name方法指定了主题的名称为"topic1",然后使用partitions方法设置了主题的分区数为10,使用replicas方法设置了主题的副本数为1。最后,调用build方法构建并返回了一个NewTopic对象。

定义生产者

@Component
public class KafkaProducer {

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }
}

ApplicationRunner接口是Spring Boot提供的一个回调接口,用于在应用程序启动后执行一些操作。这段代码的作用是在应用程序启动后自动发送一条消息到Kafka的"topic1"主题,消息内容为"test"。

定义消费者

@Component
public class KafkaConsumer {
    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }
}

@KafkaListener注解用于标记方法作为Kafka消息的消费者。这段代码的作用是创建一个Kafka消费者,监听名为"topic1"的主题,并将接收到的消息内容打印到控制台。

总结

生产者

Kafka生产者发送消息的流程如下:

  • 创建ProducerRecord:首先,生产者需要创建一个ProducerRecord对象,该对象包含目标主题和要发送的内容。如果需要,还可以指定消息的键(key)或分区(partition)。

  • 序列化:生产者将ProducerRecord对象中的键和值进行序列化,转换为字节数组,以便能够在网络上传输。

  • 拦截器链:消息会通过一个由一个或多个拦截器组成的链。这些拦截器可以对消息进行预处理,例如添加头信息或实施自定义的逻辑。

  • 元数据加载:生产者在发送消息之前,需要加载Kafka集群的元数据,以确定各个主题的分区和副本分布情况。

  • 计算分区号:根据ProducerRecord中的键和分区策略(如果有的话),生产者会计算出消息应该发送到哪个分区。

  • 累加器缓存:消息被缓存到RecordAccumulator累加器中,这是一个按照批次处理消息的组件。

  • Sender线程发送:Sender线程负责将符合条件的消息批次发送给Kafka broker。除了发送消息外,Sender线程也负责更新元数据。

  • 等待确认:生产者可以选择等待broker的确认,以确保消息已经被成功接收并写入日志。

  • 处理响应:生产者处理来自broker的响应,包括错误处理和重试逻辑。

  • 完成发送:一旦消息被成功发送并且确认,生产者会继续发送下一批消息或者结束发送过程。
    在这里插入图片描述

消费者

Kafka消费者接收消息的流程涉及到多个关键步骤,确保了消息能够从broker高效地传送到消费者手中。以下是该过程的详细描述:

  • 创建消费者实例:需要创建一个Kafka消费者实例,这通常涉及到指定一些关键参数,如Kafka集群的地址、消费者组ID、反序列化器等。

  • 订阅主题:消费者需要订阅一个或多个感兴趣的主题。这意味着消费者希望从这些主题接收消息。

  • 协调消费者组:在ConsumerCoordinator的poll方法中,会有一个ensureActiveGroup的过程,消费者通过这个过程向Coordinator发送加入消费者组的请求,并等待Coordinator的响应。

  • 拉取消息:一旦加入了消费者组,消费者开始从订阅的主题中拉取(pull)消息。Kafka采用pull模式而不是push模式,这意味着消费者需要主动去服务器拉取消息。

  • 处理消息:消费者获取消息后,会进行相应的处理。这个处理过程可能包括数据转换、存储或者其他业务逻辑。

  • 提交消费位移:消费者在处理完消息后,需要提交消费位移。这是告诉Kafka它已经消费到了哪个位置,以便下次从正确的位置开始消费。位移的提交是由消费者自己管理的,Kafka提供了接口来更新这些位移。

  • 控制消费速率:消费者可以通过各种方式控制消息的消费速率,例如通过限制拉取操作的频率或者调整消费者的线程数。

  • 错误处理和重试:在消费过程中可能会遇到错误,比如网络问题或者消息格式错误。消费者需要有相应的错误处理机制来处理这些情况,并在必要时进行重试。

  • 关闭消费者:完成消息消费后,应当关闭消费者实例,释放资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/413400.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

TensorRT及CUDA自学笔记003 NVCC及其命令行参数

TensorRT及CUDA自学笔记003 NVCC及其命令行参数 各位大佬&#xff0c;这是我的自学笔记&#xff0c;如有错误请指正&#xff0c;也欢迎在评论区学习交流&#xff0c;谢谢&#xff01; NVCC是一种编译器&#xff0c;基于一些命令行参数可以将使用PTX或C语言编写的代码编译成可…

Angular升级后运行编译变慢?如何解决?

公司的Angular项目升级后&#xff0c;使用体验感十分不好&#xff0c;运行、编译的时间明显增长&#xff0c;工作效率是十分低下&#xff0c;但奈何公司的大佬们都有自己的事情要忙&#xff0c;结识的大佬也不够多&#xff0c;只能靠自己找度娘了。但是&#xff0c;哎&#xff…

日更【系统架构设计师知识总结2】指令系统(结合真题)

【原创精华】结合老师的讲授、耗费三个小时的精华总结对正在备考的你一定有用&#xff01;&#xff01;自己一点点手打、总结的脑图&#xff0c;把散落在课本以及老师讲授的知识点合并汇总&#xff0c;反复提炼语言&#xff0c;形成知识框架。希望能给同样在学习的伙伴一点帮助…

Python接口自动化之unittest单元测试!

以下主要介绍unittest特性、运行流程及实际案例。 一、单元测试三连问 1、什么是单元测试&#xff1f; 按照阶段来分&#xff0c;一般就是单元测试&#xff0c;集成测试&#xff0c;系统测试&#xff0c;验收测试。单元测试是对单个模块、单个类或者单个函数进行测试。 将访…

【C++进阶】深入了解继承机制

目录 前言 1. 继承的概念 2. 继承的定义 3. 继承中基类与派生类的赋值转换 4. 继承中的作用域 5. 派生类的默认成员函数 6. 继承与友元、静态成员 7. 多继承与菱形继承 7.1 如何解决 前言 继承是面向对象编程中的一个重要概念&#xff0c;也是面向对象编程语言中普遍存在的特…

项目登录方案选型

一.Cookie + Session 登录 大家都知道,HTTP 是一种无状态的协议。无状态是指协议对于事务处理没有记忆能力,服务器不知道客户端是什么状态。即我们给服务器发送 HTTP 请求之后,服务器根据请求返回数据,但不会记录任何信息。为了解决 HTTP 无状态的问题,出现了 Cookie。Co…

计网Lesson15 - TCP可靠传输

文章目录 1. 停止等待ARQ协议2. 连续ARQ协议与滑动窗口协议 1. 停止等待ARQ协议 ARQ&#xff08;Automatic Repeat–reQuest&#xff09;自动重传请求 几种重传情况 发送端丢失 发送方过久没有接收到接收方的确认报&#xff0c;这种情况会触发超时重传机制&#xff0c;发送方…

【Godot4.2】菜单栏生成函数库menuDB

概述 关于Godot的手动菜单栏制作&#xff0c;我已经在之前的文章中有所描述。 但是对于一些场景&#xff0c;手动制作菜单仍然是一个比较低效的做法。所以我将MenuBar以及基于HBoxContainerMenuButton创建菜单栏写成了一个静态函数库。 利用此函数库我们可以用函数形式构造P…

【Oracle】玩转Oracle数据库(六):模式对象管理与安全管理

前言 嘿&#xff0c;数据库大冒险家们&#xff01;准备好迎接数据库管理的新挑战了吗&#xff1f;今天我们要探索的是Oracle数据库中的模式对象管理与安全管理&#xff01;&#x1f6e1;️&#x1f4bb; 在这篇博文【Oracle】玩转Oracle数据库&#xff08;六&#xff09;&#…

CTFHub技能树web之XSS

在XSS系列的题目中&#xff0c;由于需要使用能够接受XSS数据的平台&#xff0c;并且由于使用的是CTFHub的模拟机器人点击我们的虚假URL&#xff0c;因此使用的XSS平台不能是自己本地搭建的&#xff0c;如果是本地的模拟点击的机器人将无法访问我们给的这个URL地址&#xff0c;也…

Maven【1】(命令行操作)

文章目录 一丶创建maven工程二、理解pom.xml三、maven的构建命令1.编译操作2.清理操作3.测试操作4.打包操作5.安装操作 一丶创建maven工程 首先创建这样一个目录&#xff0c;然后从命令行里进入这个目录&#xff1a; 然后接下来就在这个命令行里进行操作了。 这个命令是&…

听劝!年后跳槽需谨慎……

​新年新气象&#xff0c;许多不满需求的开发者都想开展一番新的事业。跳槽找工作是要吃老本行&#xff1f;还是换岗&#xff1f;请三思啊&#xff01;&#xff01; 2024年的移动开发行业岗位还友好吗&#xff1f; 随着互联网的时间发展推移&#xff0c;大部分开发岗已经走向末…

Spring Cloud Gateway官方文档学习

文章目录 推荐写在前面一、熟悉Gateway基本概念与原理1、三大概念2、工作流程 二、基本使用路由断言的两种写法 三、路由断言工厂1、After路由断言工厂2、Before路由断言工厂3、Between路由断言工厂4、Cookie路由断言工厂5、Header路由断言工厂6、Host路由断言工厂7、Method路由…

中间件-Nginx漏洞整改(限制IP访问隐藏nginx版本信息)

中间件-Nginx漏洞整改&#xff08;限制IP访问&隐藏nginx版本信息&#xff09; 一、限制IP访问1.1 配置Nginx的ACL1.2 重载Nginx配置1.3 验证结果 二、隐藏nginx版本信息2.1 打开Nginx配置文件2.2 隐藏Nginx版本信息2.3 保存并重新加载Nginx配置2.4 验证结果2.5 验证隐藏版本…

LabVIEW光伏逆变器低电压穿越能力测试

LabVIEW光伏逆变器低电压穿越能力测试 随着光伏发电技术的迅速发展&#xff0c;光伏逆变器的低电压穿越&#xff08;LVRT&#xff09;能力日益成为影响电网稳定性的关键因素。为了提升光伏逆变器的并网性能&#xff0c;开发了一套基于LabVIEW的光伏逆变器LVRT测试系统。该系统…

【VSCode】SSH Remote 通过跳板机连开发机提示“bash行1 powershell未找到命令”

需求背景 因为需要&#xff0c;在家我需要挂上公司VPN然后SSH连到跳板机&#xff0c;然后再从跳板机SSH进开发机。 问题背景 跳板机进开发机输入完密码显示 bash行1 powershell未找到命令VSCode SSH Remote跳板机配置请自行搜素其他文章config配置 注意其中ssh.exe地址请根据…

用39块钱的全志V851se视觉开发板做了个小相机,还可以物品识别、自动追焦!

用39块钱的V851se视觉开发板做了个小相机。 可以进行物品识别、自动追焦&#xff01; 这个超低成本的小相机是在V851se上移植使用全志在线开源版本的Tina Linux与OpenCV框架开启摄像头拍照捕获视频&#xff0c;并结合NPU实现Mobilenet v2目标分类识别以及运动追踪等功能…并最终…

k8s节点负载使用情况分析命令kubectl describe node [node-name]

1.到任意安装了kubectl节点命令的节点上执行kubectl describe node [node-name] 上面的Requests最小分配 Limits最大分配是所有pod之和&#xff0c;最小分配之和不能超过服务器实际参数&#xff0c;否则新的pod会因为资源不够起不来&#xff0c;最大分配是预设之和&#xff0…

图片数据爬取工具Image-Downloader

图片数据爬取工具Image-Downloader_image downloader-CSDN博客文章浏览阅读1.2k次。既然我们使用 Image-Downloader 这个工具进行图片搜索&#xff0c;相比python我们都非常熟悉&#xff0c;在此不在叙述&#xff0c;可参考其他文章。_image downloaderhttps://blog.csdn.net/w…

有方机器人 STM32智能小车 项目学习笔记1

今天开始学习有方机器人--智能小车项目&#xff0c;正点原子部分的学习先放一放&#xff0c;还是小车更有吸引力哈哈。 新建工程及工程模板搭建 新建工程须知 目前常用的 STM32 的开发方式主要有基于寄存器编程、基于标准库函数编程、基于 HAL 库编程这三种。 寄存器版本--…