Kafka java 配置

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {
    // 1. 配置属性参数
    Properties properties = new Properties();

    // 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    // 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    // 设置消费者是否自动提交offset,true表示自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // 设置自动提交offset的时间间隔(单位:毫秒)
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
    // 设置每次poll操作返回的最大记录数
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);

    // 根据配置属性创建Kafka消费者实例
    return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {
    // 创建Kafka消费者实例,通过getCustomer()方法获取
    KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();

    // 订阅要消费的主题,这里是 "test-topic"
    consumer.subscribe(Collections.singletonList("test-topic"));

    // 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息的逻辑
        // 打印消息的offset、key和value
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

        //以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。
        boolean flag = true;
        if (flag){
            // 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息
            // 如果需要手动提交offset,可以取消注释下面的代码
            // consumer.commitAsync();
            // 由于flag为true,这里会跳出循环,不再处理后续的消息
            break;
        }
    }
    // 关闭消费者,释放资源
    consumer.close();
    // 打印结束消费的日志
    System.out.println("结束消费");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/910932.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

解决:ros进行gazebo仿真,rviz没有显示传感器数据

目录 前言解决总结 前言 看了很多urdf、xacro文件的编写&#xff0c;每次看了都觉得自己会了&#xff0c;然后自己写一点&#xff0c;就是废物了。 在我这里的案例是&#xff0c;我在一个大方块上面&#xff0c;添加了两个VLP-16的雷达&#xff0c;然后我想获取雷达扫描的数据…

C语言 | Leetcode C语言题解之第546题移除盒子

题目&#xff1a; 题解&#xff1a; int dp[100][100][100];int calculatePoints(int* boxes, int l, int r, int k) {if (l > r) {return 0;}if (dp[l][r][k] 0) {int r1 r, k1 k;while (r1 > l && boxes[r1] boxes[r1 - 1]) {r1--;k1;}dp[l][r][k] calcu…

基于开源 AI 智能名片、S2B2C 商城小程序的用户获取成本优化分析

摘要&#xff1a;本文围绕用户获取成本&#xff08;CAC&#xff09;这一关键指标展开深入剖析&#xff0c;详细阐述其计算方式&#xff0c;并紧密结合开源 AI 智能名片与 S2B2C 商城小程序的独特性质&#xff0c;从多个维度探讨如何通过挖掘新的获客渠道、巧妙运用私域流量池等…

实践出真知:MVEL表达式中for循环的坑

目录标题 背景MVEL脚本(有问题的)MVEL脚本(正确的)结论分析 背景 需要从一个URL的拼接参数中解析出id的值并输出 比如&#xff1a; 存在URLhttps://xxxxxxxxxx?id999999&type123&name345 然后需要输出id999999 MVEL脚本(有问题的) 入参&#xff1a;parseThisUrlhttp…

【C#】使用.net9在C#中向现有对象动态添加属性

在 C# 中向现有对象动态添加属性并不像在 Python 或 JavaScript 中那样容易&#xff0c;因为 C# 是一种强类型语言。 但是&#xff0c;我们可以通过使用一些技术和库来实现这一点&#xff0c;例如扩展方法、字典等。本文将详细介绍如何在 C# 中实现这一点。ExpandoObject 方法 …

工作流初始错误 泛微提交流程提示_泛微协同办公平台E-cology8.0版本后台维护手册(11)–系统参数设置

工作流初始错误 泛微提交流程提示_泛微协同办公平台E-cology8.0版本后台维护手册(11)–系统参数设置...-CSDN博客 工作流初始错误 泛微提交流程提示_泛微OA 工作流WebService接口使用说明 工作流初始错误 泛微提交流程提示_泛微OA 工作流WebService接口使用说明-CSDN博客 工作…

如何有效销售和应用低代码软件?探索其市场机会与策略

随着技术的进步&#xff0c;企业对自动化和数字化的需求日益增加。低代码开发平台应运而生&#xff0c;成为企业实现快速应用程序开发的重要工具。然而&#xff0c;在市场上推广和应用低代码软件并非易事&#xff0c;需要深入了解客户需求&#xff0c;提供定制化的解决方案&…

【css】overflow: hidden效果

1. 不添加overflow: hidden 1.1 效果 上面无圆角 1.2 代码 <template><view class"parent"><view class"child1">child1</view><view class"child2">child2</view></view></template><…

VB6.0桌面小程序(桌面音乐播放器)

干货源码 Imports System.IO Imports System.Security.Cryptography Public Class Form1 Private Sub Form1_Load(sender As Object, e As EventArgs) Handles MyBase.Load Button1.Text “上一曲” Button4.Text “播放” Button3.Text “下一曲” Button2.Text “顺序播…

Android MVVM demo(使用DataBinding,LiveData,Fresco,RecyclerView,Room,ViewModel 完成)

使用DataBinding&#xff0c;LiveData&#xff0c;Fresco&#xff0c;RecyclerView&#xff0c;Room&#xff0c;ViewModel 完成 玩Android 开放API-玩Android - wanandroid.com 接口使用的是下面的两个&#xff1a; https://www.wanandroid.com/banner/jsonhttps://www.wan…

协程4 --- 一个特殊的栈溢出例子

文章目录 代码运行结果分析 代码 先看下面这个程序流程&#xff1a; 有个长度位24的字符数组buffer&#xff0c;前面16个字符初始化。 把attack函数的地址复制到后面8个字符&#xff08;编译成64位程序&#xff0c;指针大小为8Byte&#xff09;。 打印信息&#xff1a;do Some…

day01 - web开发简介

本课程涉及到的技术&#xff1a; Vue ElementUI/Html Js SpringBoot–Spring SpringMvc MyBatis(Plus) SSM Axios 学习路径&#xff1a; 前端主要&#xff1a; Html5css3JavaScript(JQuery)–>Vue(Node.js也可以学习一 下&#xff0c;服务端js)ElementUi(uni-app) 后端主要…

简单又便宜的实现电脑远程开机唤醒方法

现有的远程开机方案 1&#xff09;使用向日葵开机棒 缺点是比较贵一点&#xff0c;开机棒要一百多&#xff0c;而且查了评论发现挺多差评说不稳定&#xff0c;会有断联和无法唤醒的情况&#xff0c;而且设置也麻烦&#xff0c;还需要网卡支持WOL 2&#xff09;使用远程开机卡 …

自攻螺钉的世纪演变:探索关键设计与应用

自攻螺钉作为现代工业和建筑中的不可或缺的标准部件&#xff0c;经过了超过100年的发展和创新。从1914年最早的铁螺钉设计到今天的自钻自攻螺钉&#xff0c;自攻螺钉的设计不断优化&#xff0c;以适应更复杂的应用需求。本文将回顾自攻螺钉的演变历程&#xff0c;分析其设计原理…

[复健计划][紫书]Chapter 7 暴力求解法

7.1 简单枚举 例7-1 Division uva725 输入正整数n&#xff0c;按从小到大的顺序输出所有形如abcde/fghij n的表达式&#xff0c;其中a&#xff5e;j恰好为数字0&#xff5e;9的一个排列&#xff08;可以有前导0&#xff09;&#xff0c;2≤n≤79。枚举fghij&#xff0c;验证a…

OpenBayes 一周速览丨Ministral-8B革新侧端AI新体验!PsyDTCorpus心理咨询数据集上线,含5k个数字孪生对话数据

公共资源速递 This Weekly Snapshots &#xff01; 5 个数据集&#xff1a; * Labelme 图像标注数据集 * TIMIT 英语方言录音数据集 * Food-101 食品图像数据集 * SVHN 真实世界图像数据集 * PsyDTCorpus 心理咨询师数字孪生数据集 1 个模型&#xff1a; * Allegro 3…

如何修改网络ip地址:一步步指南‌

在当今这个数字化时代&#xff0c;网络已成为我们日常生活与工作中不可或缺的一部分。无论是浏览网页、在线办公还是享受流媒体服务&#xff0c;稳定的网络连接和适当的IP地址管理都是确保良好体验的关键。然而&#xff0c;出于隐私保护、绕过地理限制或测试网络环境等需要&…

论 ONLYOFFICE:开源办公套件的深度探索

公主请阅 引言第一部分&#xff1a;ONLYOFFICE 的历史背景1.1 开源软件的崛起1.2 ONLYOFFICE 的发展历程 第二部分&#xff1a;ONLYOFFICE 的核心功能2.1 文档处理2.2 电子表格2.3 演示文稿 第三部分&#xff1a;技术架构与兼容性3.1 技术架构3.2 兼容性 第四部分&#xff1a;部…

如何将现有VUE项目所有包更新到最新稳定版

更新有风险,Enter要谨慎!!! 要将项目中的所有 npm 包更新到最新稳定版&#xff0c;可以使用 npm-check-updates 工具。以下是具体步骤&#xff1a; 步骤一&#xff1a;安装 npm-check-updates 首先&#xff0c;全局安装 npm-check-updates 工具&#xff1a; npm install -g…

高德 阿里231滑块 分析

声明: 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01; 有相关问题请第一时间头像私信联系我删…