Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.1.6</version>
        </dependency>

2、配置application.yml

加入Kafka的配置

spring
  kafka:
    #Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔
    bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095
 
    producer:
      # 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认
      acks: 1
      # 发送失败时的重试次数,0表示不重试
      retries: 0
      # 批量发送时的批次大小(字节)
      batch-size: 30720000 # 30MB
      # 生产者的内存缓冲区大小(字节)
      buffer-memory: 33554432 # 32MB
      # Key的序列化器类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value的序列化器类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
 
    consumer:
      # 消费者所属的组ID
      group-id: test-kafka
      # 禁用自动提交offset,改为手动提交
      enable-auto-commit: false
      # 偏移量重置策略:
      # earliest:从最早的记录开始消费
      # latest:从最新的记录开始消费
      auto-offset-reset: earliest
      # Key的反序列化器类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value的反序列化器类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 每次poll()调用返回的最大消息条数
      max-poll-records: 2
      session:
        # 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)
        timeout:
          ms: 300000 # 5分钟
 
    listener:
      # 如果指定的主题不存在,是否让应用启动失败,false表示不会报错
      missing-topics-fatal: false
      # 消费模式:single=单条消息,batch=批量消费
      type: single
      # 消费确认模式:
      # manual_immediate:手动确认消息,立即提交offset
      ack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private int id;
    private String name;
}

3.2、Task.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {
    private int id;
    private String description;
    private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
 
@EnableKafka
@Configuration
public class KafkaConfig {
 
    // 单条消费监听器工厂,手动提交offset
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
 
 
}

3.4、KafkaProducer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaProducer {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

    @Bean
    CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            String topic = "task-topic";
            ObjectMapper objectMapper = new ObjectMapper();
            for (int i = 1; i <= 5; i++) {
                // 定义一个对象实例
                User user = User.builder().id(1).name("Alice").build();
                Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();
                     //JAVA对象转化为JSON字符串
                String message =  objectMapper.writeValueAsString(task);

                kafkaTemplate.send(topic, message);
                System.out.println("Sent: " + message);
                Thread.sleep(500); // 模拟消息发送间隔
            }
        };
    }
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;



@Service
public class SingleConsumer {
    @KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {
        String message = record.value();

        ObjectMapper objectMapper = new ObjectMapper();
        Task task = objectMapper.readValue(message,Task.class);
        // 取出
        System.out.println("User - Received: " + task.getAssignedUser());
        // 手动提交offset
        acknowledgment.acknowledge();
    }
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/920129.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【SQL实验】视图操作(菜单操作和命令操作)

完整代码在文章末尾【代码是自己的解答&#xff0c;并非标准答案&#xff0c;也有可能写错&#xff0c;文中可能会有不准确或待完善之处&#xff0c;恳请各位读者不吝批评指正&#xff0c;共同促进学习交流】 &#xff08;一&#xff09;菜单操作 1.建立视图“课程”&#xff…

python基础知识(七)——写入excel

一、写入excel 写入数据到excel wb load_workbook("testcase_api_wuye.xlsx") #打开一个已经存在的excel文件 sh wb["register"] #识别某一个表单 sh.cell(row 2,column 8).value "pass" #写入数据&#xff0c;单元格的值赋值 wb.sav…

MATLAB绘图基础11:3D图形绘制

参考书&#xff1a;《 M A T L A B {\rm MATLAB} MATLAB与学术图表绘制》(关东升)。 11.3D图形绘制 11.1 3D图概述 M A T L A B {\rm MATLAB} MATLAB的 3 D {\rm 3D} 3D图主要有&#xff1a; 3 D {\rm 3D} 3D散点图、 3 D {\rm 3D} 3D线图、 3 D {\rm 3D} 3D曲面图、 3 D {\rm…

ssm148基于Spring MVC框架的在线电影评价系统设计与实现+jsp(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 题目&#xff1a;在线电影评价系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本在线电影评价系…

激光slam学习笔记5---ubuntu2004部署运行fastlivo踩坑记录

背景&#xff1a;看看fastlivo论文&#xff0c;觉得挺有意思的&#xff0c;就本地部署跑跑看看效果。个人环境&#xff0c;ubuntu20.04。 一、概要 由于依赖比较多&#xff0c;个人构建工作空间&#xff0c;使用catkin_make编译 src├── FAST-LIVO├── livox_ros_driver…

12. 利用“文件组织”实现石头剪刀布小游戏

文章目录 概要整体架构流程技术名词解释小结 1. 概要 ~ Jack Qiao对米粒说&#xff1a;“前面咱们了解过“文件组织”&#xff0c;让我们利用“文件组织”来制作一个有趣的“石头、剪刀、布”小游戏。”举个栗子&#xff1a; > 程序随机生成一个选择&#xff08;石头、剪刀…

VRT: 关于视频修复的模型

VRT: 关于视频修复的模型 1. 视频修复的背景与重要性背景介绍&#xff1a;重要性&#xff1a; 2. VRT的重要性和研究背景VRT的背景&#xff1a;VRT的重要性&#xff1a; 3. 视频修复概述3.1 定义与目标3.2 与单图像修复的区别3.3 对时间信息利用的需求 4. VRT模型详解4.1 整体框…

Stable Diffusion经典应用场景

&#x1f33a;系列文章推荐&#x1f33a; 扩散模型系列文章正在持续的更新&#xff0c;更新节奏如下&#xff0c;先更新SD模型讲解&#xff0c;再更新相关的微调方法文章&#xff0c;敬请期待&#xff01;&#xff01;&#xff01;&#xff08;本文及其之前的文章均已更新&…

04 —— Webpack打包CSS代码

加载器css-loader &#xff1a;解析css代码 webpack 中文文档 | webpack中文文档 | webpack中文网 加载器style-loader&#xff1a;把解析后的css代码插入到DOM style-loader | webpack 中文文档 | webpack中文文档 | webpack中文网 准备css代码&#xff0c;放到src/login目…

springboot高校网上缴费综合务系统

摘 要 相比于以前的传统手工管理方式&#xff0c;智能化的管理方式可以大幅降低运营人员成本&#xff0c;实现了高校网上缴费综合务系统的标准化、制度化、程序化的管理&#xff0c;有效地防止了高校网上缴费综合务的随意管理&#xff0c;提高了信息的处理速度和精确度&#x…

IDEA怎么定位java类所用maven依赖版本及引用位置

在实际开发中&#xff0c;我们可能会遇到需要搞清楚代码所用依赖版本号及引用位置的场景&#xff0c;便于排查问题&#xff0c;怎么通过IDEA实现呢&#xff1f; 可以在IDEA中打开项目&#xff0c;右键点击maven的pom.xml文件&#xff0c;或者在maven窗口下选中项目&#xff0c;…

springMVC重点知识

一、springMVC请求流程 二、springMVC环境搭建 Idea 下创建 springmvc01 ⼯程 1、pom.xml 坐标添加 <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>12</maven.compiler.source> …

django基于python 语言的酒店推荐系统

摘 要 酒店推荐系统旨在提供一个全面酒店推荐在线平台&#xff0c;该系统允许用户浏览不同的客房类型&#xff0c;并根据个人偏好和需求推荐合适的酒店客房。用户可以便捷地进行客房预订&#xff0c;并在抵达后简化入住登记流程。为了确保连续的住宿体验&#xff0c;系统还提供…

机器学习笔记——30种常见机器学习算法简要汇总

本笔记介绍机器学习中常见的30种机器学习算法。 文章目录 监督学习算法&#xff08;Supervised Learning&#xff09;回归算法&#xff08;Regression Algorithms&#xff09;分类算法&#xff08;Classification Algorithms&#xff09; 无监督学习算法&#xff08;Unsupervis…

Vue3、Vite5、Primevue、Oxlint、Husky9 简单快速搭建最新的Web项目模板

Vue3、Vite5、Oxlint、Husky9 简单搭建最新的Web项目模板 特色进入正题创建基础模板配置API自动化导入配置组件自动化导入配置UnoCss接入Primevue接入VueRouter4配置项目全局环境变量 封装Axios接入Pinia状态管理接入Prerttier OXLint ESLint接入 husky lint-staged&#xf…

利用RAGflow和LM Studio建立食品法规问答系统

前言 食品企业在管理标准、法规&#xff0c;特别是食品原料、特殊食品法规时&#xff0c;难以通过速查法规得到准确的结果。随着AI技术的发展&#xff0c;互联网上出现很多AI知识库的解决方案。 经过一轮测试&#xff0c;找到问题抓手、打通业务底层逻辑、对齐行业颗粒度、沉…

百度智能云千帆大模型平台引领企业创新增长

本文整理自百度世界大会 2024——「智能跃迁 产业加速」论坛的同名演讲。 更多大会演讲内容&#xff0c;请访问&#xff1a; https://baiduworld.baidu.com 首先&#xff0c;跟大家分享一张图&#xff0c;这个是我们目前大模型应用落地的场景分布。可以看到&#xff0c;大模型…

人形机器人开发、XR仿真训练、影视动画制作,一副手套支持多种应用

近日&#xff0c;动作捕捉数据手套供应商Manus 推出了其最新产品Metagloves Pro。其最大特点是佩戴更加方便简洁且精度更高。Metagloves Pro功能强大且适用于多种应用场景&#xff0c;包括&#xff1a;人形机器人研究、XR仿真训练以及影视动画制作等。 一、人形机器人研究 Man…

sql server怎样用sql profiler捕获带变量值的慢sql

一 新建跟踪 点击工具-SQL Server Profiler&#xff1a; 点击文件-新建跟踪的按钮&#xff1a; 在‘事件选择’选项卡只选择如下两项内容&#xff08;RPC:Completed,SQL:BatchCompleted&#xff09;&#xff0c;把多余的取消勾选&#xff1a; 然后勾选上面截图中右下方的‘显示…

Linux中虚拟内存详解

一、虚拟内存的概念 虚拟内存是现代操作系统为了有效管理内存资源、提高内存利用率以及实现多任务处理等目的而引入的一种重要的内存管理机制。它为每个程序&#xff08;通常对应一个进程&#xff09;提供了一个看似连续且容量较大的地址空间&#xff0c;而这个地址空间并不一…