Spring Boot 整合 Kafka 详解

前言:

上一篇分享了 Kafka 的一些基本概念及应用场景,本篇我们来分享一下在 Spring Boot 项目中如何使用 Kafka。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 集成 Kafka

引入 Kafka 依赖

在项目的 pom.xml 文件中引入 Kafka 依赖,如下:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
	<version>3.1.6</version>
</dependency>

添加 Kafka 配置

在 application.properties 文件中配置 Kafka 相关配置,如下:

#kafka server 地址
spring.kafka.bootstrap-servers=10.xxx.4.xxx:9092,10.xxx.4.xxx::9092,10.xxx.4.xxx::9092
spring.kafka.producer.acks = 1
spring.kafka.producer.retries = 0
spring.kafka.producer.batch-size = 30720000
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#消费者配置
spring.kafka.consumer.group-id = test-kafka
#是否开启手动提交 默认自动提交
spring.kafka.consumer.enable-auto-commit = true
#如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔
spring.kafka.consumer.auto-commit-interval = 5000
#earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
#kafka 没有创建指定的 topic 下  项目启动是否报错 true  false
spring.kafka.listener.missing-topics-fatal = false
#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
spring.kafka.listener.type = single 
#一次调用 poll() 操作时返回的最大记录数 默认为 500 条
spring.kafka.consumer.max-poll-records = 2
#消息 ACK 模式 有7种
spring.kafka.listener.ack-mode = manual_immediate
#kafka session timeout
spring.kafka.consumer.session.timeout.ms = 300000

配置 Kafka Producer

我们创建一个配置类,并配置生产者工厂,配置 KafkaTemplate。

package com.order.service.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean("myProducerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(4);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }



}

配置 Kafka Cousumer

我们创建一个配置类,配置消费者工厂和监听容器。

package com.order.service.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitIntervalMs;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean("myConsumerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(12);
        //是否自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //kafak 服务器
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        //消费组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //一次调用poll()操作时返回的最大记录数,默认值为500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //自动提交时间间隔 默认 5秒
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        return props;
    }


    /**
     * @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer < java.lang.String, java.lang.String>>
     * @date 2024/10/22 19:41
     * @description kafka 消费者工厂
     */
    @Bean("myContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
        // 并发创建的消费者数量
        factory.setConcurrency(3);
        // 开启批处理
        factory.setBatchListener(true);
        //拉取超时时间
        factory.getContainerProperties().setPollTimeout(1500);
        //是否自动提交 ACK kafka 默认是自动提交
        if (!enableAutoCommit) {
            //共有其中方式
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        }
        return factory;
    }


}

Kafka 消息发送

创建一个 Kafka 的 Producer,注入 KafkaTemplate,完成消息发送。

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @ClassName: KafkaProducer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage( String message) {
        this.kafkaTemplate.send("my-topic", message);
    }

}

Kafka 消息消费

创建一个 Kafka 的 Consumer,使用 @KafkaListener 注解完成消息消费。

package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName: KafkaConsumer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaConsumer {

    @KafkaListener(id = "my-kafka-consumer",
            idIsGroup = false, topics = "my-topic",
            containerFactory = "myContainerFactory")
    public void listen(String message) {
        log.info("消息消费成功,消息内容:{}", message);
    }

}

Kafka 消息发送消费测试

触发消息发送后,得到如下结果:

2024-10-22 20:22:43.041  INFO 36496 --- [-consumer-0-C-1] c.o.s.kafka.consumer.MyKafkaConsumer     : 消息消费成功,消息内容:第一条 kafka 消息

结果符合预期。

以上我们简单的分享了使用 Spring Boot 集成 Kafka 的过程,希望帮助到有需要的朋友。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/905080.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ai画质修复工具有哪些?这4款AI照片修复神器建议收藏!

在当今这个科技迅猛发展的时代&#xff0c;人工智能&#xff08;AI&#xff09;正以前所未有的速度重塑我们的日常生活&#xff0c;而照片修复领域正是AI技术大放异彩的舞台。从年代久远、泛黄的老照片到追求极致细节的现代摄影佳作&#xff0c;AI以其非凡的能力&#xff0c;成…

MES管理系统在工艺管理中具备哪些作用

在现代制造业的洪流中&#xff0c;MES管理系统正逐步成为工艺管理领域的一股强大力量&#xff0c;它不仅革新了传统的管理方式&#xff0c;还为企业带来了前所未有的效率提升与成本控制优势。尽管许多企业尚未全面拥抱这一数字化变革&#xff0c;但MES管理系统在工艺管理中的潜…

IM_自定义audio播放消息

做即时通讯&#xff0c;除了文字、图片、表情、还有媒体消息&#xff0c;整理一下制作过程中自定义聊天框中的audio 效果图 tsx完整代码 AzEventBus 是解决点击多个语音播放时候&#xff0c;保证只有一个在播放;没什么特别的&#xff0c;就是自己简单封装了个EvenBusAzEventBus…

tcp shutdown, fin_wait1, fin_wait2, close_wait, last_ack, 谢特!

TCP 作为双向传输协议&#xff0c;如果你想只收不发&#xff0c;可以单向关掉发&#xff0c;shutdown(socket.SHUT_WR)&#xff0c;但不建议这么做。 看以下代码&#xff1a; #!/Users/zhaoya/myenv/bin/python3 # client import socketclient_socket socket.socket(socket.…

怎么知道社媒上用户在讨论品牌什么?评价如何?

现在社交媒体不再仅是人们闲聊和分享生活片段的地方&#xff0c;更是品牌了解市场趋势和消费者需求的重要渠道。所以做号社媒上用户声音的聆听&#xff0c;企业更能抓住客户需求、抢占潜力市场&#xff0c;进一步占据更多市场份额&#xff0c;获得精准客户。 做好用户声音聆听…

【QT】Qt窗口(上)

个人主页~ Qt窗口 一、菜单栏二、工具栏三、状态栏四、浮动窗口五、对话框1、简介&#xff08;1&#xff09;模态对话框&#xff08;2&#xff09;非模态对话框&#xff08;3&#xff09;混合属性对话框 Qt窗口是通过QMainWindow类来实现的&#xff0c;我们之前的学习是通过QWi…

第二十章 Vue组件通信之父子通信

目录 一、引言 二、组件关系分类 三、组件通信的解决方案 3.1. 父子通信流程图 3.2. 父组件通过 props 将数据传递给子组件 3.2.1. 代码App.vue 3.2.2. 代码MySon.vue 3.3. 子组件利用 $emit 通知父组件修改更新 ​编辑3.3.1. 代码App.vue 3.3.2. 代码MySon.vue 3…

用ChatGPT提升工作效率:从理论到实际应用

伴人工智能技术的迅速演进&#xff0c;像ChatGPT这类语言模型已成为提升工作效率的关键工具。这类模型不仅具备处理海量数据的能力&#xff0c;还能自动化许多日常任务&#xff0c;从而提高决策的准确性。本文将深入探讨如何在工作中利用ChatGPT等AI工具提升效率&#xff0c;涵…

golang 服务注册与服务发现框架 入门与实践

Go语言中服务注册与发现的应用 在Go微服务架构中&#xff0c;服务注册与服务发现是实现服务间通信和解耦的关键。随着服务数量的增长&#xff0c;手动管理服务之间的依赖关系变得异常复杂且容易出错。因此&#xff0c;自动化服务注册与发现机制变得尤为重要。当一个Go微服务启…

1.STM32之定时器TIM---第一部分(基本定时器)(功能最强大结构最复杂的一个外设)(实验基本定时功能)-----定时器定时中断(利用内部时钟72M)

定时器TIM是STM32外设中功能最强大结构最复杂的一个外设&#xff01;Whappy STM32F103C8T6总共由一个高级定时器3个通用定时器 #include "stm32f10x.h" // Device header #include "Delay.h" #include "OLED.h" #include &quo…

无人机救援系统基本组成

无人机救援系统基本组成 1. 源由2. 组成2.1 无人机载具2.1.1 多旋翼2.1.2 垂起固定翼2.1.3 智能避障2.1.4 物资投递 2.2 智能吊舱2.2.1 云台2.2.2 高清摄像2.2.3 红外热成像2.2.4 激光测距2.2.5 目标跟踪 2.3 通讯链路2.3.1 超长距离通信2.3.2 长距离通信2.3.3 中等距离通信 2.…

CSS 复习

复杂选择器可以通过&#xff08;id的个数&#xff0c;class的个数&#xff0c;标签的个数&#xff09;的形式&#xff0c;计算权重。 如果我们需要将某个选择器的某条属性提升权重&#xff0c;可以在属性后面写!important&#xff1b;注意!importent要写在;前面 很多公司不允许…

uniapp一键打包

1.先安装python环境&#xff0c; 2.复制这几个文件到uniapp项目里面 3.修改自己证书路径&#xff0c;配置文件路径什么的 4.在文件夹页面双击buildController.py或者cmd直接输入buildController.py 5.python报错&#xff0c;哪个依赖缺少安装哪个依赖 6.执行不动的话&…

SINAMICS V90 在汽车行业中的应用-天拓四方

随着生活水平的提高&#xff0c;平均每家每户都配有一辆代步用的小汽车&#xff0c;汽车行业也正处于蓬勃的发展中&#xff0c;尤其是新能源汽车&#xff0c;带来了新一轮的汽车生产热潮。生产一辆汽车&#xff0c;从零配件的加工&#xff0c;到整车的组装&#xff0c;基本已经…

C++算法第五天

本篇文章继续和大家一起刷算法题 第一题 题目链接 . - 力扣&#xff08;LeetCode&#xff09; 题目解析 题目要求&#xff1a; 这是一个连续的子数组 计算子数组内元素的和&#xff0c;若数组内元素的和符合 > target的值并且该子数组的长度是最短的&#xff0c;则返回…

【电机控制器】以STC8H1K系列举例——持续更新

【电机控制器】以STC8H1K08 举例——持续更新 文章目录 [TOC](文章目录) 前言一、代填二、参考资料总结 前言 使用工具&#xff1a; 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、代填 二、参考资料 STC8H1K系列数据手册 梁工——BLDC, 三相无…

如何快速给word文件里的文字加拼音?请看详细步骤

怎么快速给word文件里的文字加拼音&#xff1f;在日常的文字处理工作中&#xff0c;很多人可能会遇到一个问题&#xff1a;如何在Word文档中为文字添加注音。尤其是对于一些需要帮助读音的文本&#xff0c;比如中文学习材料、教材或儿童读物&#xff0c;注音可以帮助读者更好地…

AI跟踪报道第62期-本周AI新闻: 微软推出Copilot的AI Agent和Computer Control

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

C++学习:类和对象(一)

一、面向过程与面向对象编程 1. 什么是面向过程编程&#xff1f; 面向过程编程&#xff08;Procedural Programming&#xff09;是一种以过程&#xff08;或函数&#xff09;为中心的编程范式。程序被视为一系列按顺序执行的步骤&#xff0c;主要通过函数对数据进行操作 特点…

mac|maven项目在idea中连接redis

安装maven brew install maven idea-setting导入redis插件 idea新建maven项目 构建系统选择maven 项目右侧数据库图标导入redis 新建一个数据库&#xff0c;名称必须为数字&#xff0c;测试一下是否可以连接&#xff0c;连接成功后选择确定 pom.xml导入redis <depende…