手拉手springboot整合kafka发送消息

环境介绍
技术栈springboot+mybatis-plus+mysql+rocketmq
软件版本
mysql8
IDEAIntelliJ IDEA 2022.2.1
JDK17
Spring Boot3.1.7
kafka2.13-3.7.0

创建topic时,若不指定topic的分区(Partition主题分区数)数量使,则默认为1个分区(partition)

springboot加入依赖kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

application.yml配置连接kafka

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

@Component
public class KafkaConsumer {

@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}

}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

@Component
public class KafkaConsumer {

@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}

}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092
consumer:
auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

重置消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

Spring-kafka生产者发送消息

.send与sendDefault()方法都返回CompletableFuture<String<k,v>>;

CompletableFuture类用于异步编程,表示异步计算结果。该特征使得调用者不必等待操作完成就可以继续执行其他任务,从而提高引用的响应速度和吞吐量

@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

发送Message

@Test
void kafkaSendMessageTest1(){
//通过构建器模式创建Message
Message<String> message = MessageBuilder.withPayload("hello kafka send message")
.setHeader(KafkaHeaders.TOPIC,"kafkamsg01")
.build();
kafkaTemplate.send(message);
}

SendProducerRecord

String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers

@Test
void kafkaSendProducerRecordTest1() {
//参数 String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
Headers headers = new RecordHeaders();
headers.add("msg","123".getBytes(StandardCharsets.UTF_8));
ProducerRecord<String,String> record = new ProducerRecord(
"kafkaTopic01",
0,
System.currentTimeMillis(),
"key",
"hello kafka send message");
kafkaTemplate.send(record);
}

默认主题发送消息

yml配置默认主题

template:
default-topic: default-topic

@Test
void kafkaSendDefaultTest01(){
kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"key01","hello ");
}

发送Object消息

序列化默认为String

@Resource
private KafkaTemplate<String,Object> kafkaTemplate1;
@Test
void kafkaSendObject(){
MessageM messageM =MessageM.builder().userID(123).sn("xo1111").desc("测试").build();
//分区是null,kafka自行决定消息发送到哪个分区
kafkaTemplate1.sendDefault(null,System.currentTimeMillis(),"key01",messageM);
}

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/664878.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HTML静态网页成品作业(HTML+CSS)——企业装饰公司介绍网页(4个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有4个页面。 二、作品演示 三、代…

热电子光探测器的电磁场空间分布与FDTD材料折射率的导出

仿真实例 金属薄膜中金纳米孔阵列透射与反射&#xff0c; 并考虑其近场电磁分布 利用脚本进行电磁场及其光学响应的可视化 设置EOT型超表面结构&#xff0c;以及Structure library的使用 结构的参数化扫描与结果可视化 利用脚本计算峰值增强因子 多层平面结构激发T…

Spark 核心编程之 RDD 介绍

一、Spark 分布式计算模拟 Driver 端将数据拆分成 n 个 Task 发送给 Executor&#xff0c;n 为 Executor 个数&#xff0c;Task 包含数据和计算逻辑&#xff0c;Executor 接收到 Task 后进行计算并将计算后的结果返回给 Driver 定义封装整体数据和逻辑的资源类 class Resource …

高性能服务器网络模型详解

1999年Dan Kegel在发表的论文中提出了The C10K problem&#xff0c;这篇论文对传统服务器架构处理大规模并发连接时的挑战进行了详细描述&#xff0c;并提出了一些解决方案和优化技术。这里的C指的是Concurrent(并发)的缩写&#xff0c;C10K问题是指怎么在单台服务器上并发一万…

buidldroot musl uclib库 编译

buildroot 修改 编译工具链 原本编译器相关信息&#xff1a; Incorrect selection of the C library buidroot编译 注意相关选项&#xff0c;后续使用CUSTOM TOOLCHAIN 时对应 UCLIB 能将生成IMAGE 从2.9K变为2.3K MUSL 能将生成IMAGE 从2.9K变为2.7K 变大了 arm-linux-…

【运维项目经历|025】企业高效邮件系统部署与运维项目

目录 项目名称 项目背景 项目目标 项目成果 我的角色与职责 我主要完成的工作内容 本次项目涉及的技术 本次项目遇到的问题与解决方法 本次项目中可能被面试官问到的问题 问题1&#xff1a;项目周期为多长时间&#xff1f; 问题2&#xff1a;服务器部署架构方式及数量…

ubuntu 18.04 ros1学习

总结了一下&#xff0c;学习内容主要有&#xff1a; 1.ubuntu的基础命令 pwd: 获得当前路径 cd: 进入或者退出一个目录 ls:列举该文件夹下的所有文件名称 mv 移动一个文件到另一个目录中 cp 拷贝一个文件到另一个目录中 rm -r 删除文件 gedit sudo 给予管理员权限 sudo apt-…

uniapp实现图片上传——支持APP、微信小程序

uniapp实现图片、视频上传 文章目录 uniapp实现图片、视频上传效果图组件templatejs 使用 相关文档&#xff1a; 结合 uView 插件 uni.uploadFile 实现 u-upload uploadfile 效果图 组件 简单封装&#xff0c;还有很多属性…&#xff0c;自定义样式等…根据个人所需调整 te…

DNF手游攻略:勇士进阶指南!

在即将到来的6月5日&#xff0c;《DNF手游》将迎来一场盛大的更新&#xff0c;此次更新带来了大量新内容和玩法&#xff0c;极大丰富了游戏的体验。本文将为广大玩家详细解析此次更新的亮点&#xff0c;包括新增的组队挑战玩法“罗特斯入门团本”、新星使宠物的推出、宠物进化功…

ADB日常使用命令

【ADB全称 Android Debug Bridge】 是Android SDK中的一个命令行工具adb命令可以直接操作管理Android模拟器或真实的Android设备&#xff08;手机&#xff09; 建立PC和模拟器连接 # 建立连接 adb connect 127.0.1: 模拟器端口号〈逍遥模拟器21503〉 # 验证是否连接成功 adb d…

NFS p.1 服务器的部署以及客户端与服务端的远程挂载

目录 介绍 应用 NFS的工作原理 NFS的使用 步骤 1、两台机子 2、安装 3、配置文件 4、实验 服务端 准备 启动服务&#xff1a; 客户端 准备 步骤 介绍 NFS&#xff08;Network File System&#xff0c;网络文件系统&#xff09;是一种古老的用于在UNIX/Linux主…

使用 Apache Commons Exec 管理外部进程

&#x1f604; 19年之后由于某些原因断更了三年&#xff0c;23年重新扬帆起航&#xff0c;推出更多优质博文&#xff0c;希望大家多多支持&#xff5e; &#x1f337; 古之立大事者&#xff0c;不惟有超世之才&#xff0c;亦必有坚忍不拔之志 &#x1f390; 个人CSND主页——Mi…

基于 Apache Doris 的实时/离线一体化架构,赋能中国联通 5G 全连接工厂解决方案

作者&#xff1a;田向阳&#xff0c;联通西部创新研究院 大数据专家 共创&#xff1a;SelectDB 技术团队 导读&#xff1a; 数据是 5G 全连接工厂的核心要素&#xff0c;为支持全方位的数据收集、存储、分析等工作的高效进行&#xff0c;联通 5G 全连接工厂从典型的 Lambda 架…

使用PNP管控制MCU是否需要复位

这两台用到一款芯片带电池&#xff0c;希望电池还有电芯片在工作的时候插入电源不要给芯片复位&#xff0c;当电池没电&#xff0c;芯片不在工作的时候&#xff0c;插入电源给芯片复位所以使用一个PNP三极管&#xff0c;通过芯片IO控制是否打开复位&#xff0c;当芯片正常工作的…

在长窗口时代,RAG技术是否仍然必要?

自从谷歌推出 Gemini 1.5 Pro&#xff0c;行业内部对于 RAG 的讨论就不绝于耳。 Gemini 1.5 Pro 的性能确实令人瞩目。根据谷歌公布的技术文档&#xff0c;该系统能够稳定处理长达 100 token 的内容&#xff0c;相当于一小时的视频、十一小时的音频、超过三万行的代码或七十万…

Spring Cloud Alibaba-09-Seata分布式事务

Lison <dreamlison163.com>, v1.0.0, 2024.5.03 Spring Cloud Alibaba-09-Seata分布式事务 文章目录 Spring Cloud Alibaba-09-Seata分布式事务分布式事务基础事务本地事务分布式事务分布式事务的场景 分布式事务的解决方案全局事务可靠消息服务最大努力通知TCC事务 Se…

Java实现数据结构---数组

文章目录 概念存储原理数组的操作完整代码 概念 数组是&#xff08;Array&#xff09;是有限个相同类型的变量所组成的有序集合&#xff0c;数组中的每一个变量为称为元素。数组是最简单、最常用的数据结构。 数组下标从零开始。 存储原理 数组用一组连续的内存空间来存储一…

蓝桥杯第17135题 不完整的算式 C++ Java Python

目录 题目 思路和解题方法 步骤 1&#xff1a;识别缺失的部分 步骤 2&#xff1a;根据已知条件计算或推断 步骤 3&#xff1a;处理特殊情况和验证 c 代码 Java 版本 Python 版本&#xff08;仅供参考&#xff09; 代码和解题细节&#xff1a; 题目 题目链接&#xff…

STM32自己从零开始实操03:输出部分原理图

一、继电器电路 1.1指路 延续使用 JZC-33F-012-ZS3 继电器&#xff0c;设计出以小电流撬动大电流的继电器电路。 &#xff08;提示&#xff09;电路需要包含&#xff1a;三极管开关电路、续流二极管、滤波电容、指示灯、输出部分。 1.2数据手册重要信息提炼 联系排列&…

神经网络与深度学习——第3章 线性模型

本文讨论的内容参考自《神经网络与深度学习》https://nndl.github.io/ 第3章 线性模型 线性模型 线性模型&#xff08;Linear Model&#xff09;是机器学习中应用最广泛的模型&#xff0c;指通过样本特征的线性组合来进行预测的模型&#xff0c;给定一个 D D D维样本 x [ x …