kafka-生产者拦截器(SpringBoot整合Kafka)

文章目录

  • 1、生产者拦截器
    • 1.1、创建生产者拦截器
    • 1.2、KafkaTemplate配置生产者拦截器
    • 1.3、使用Java代码创建主题分区副本
    • 1.4、application.yml配置----v1版
    • 1.5、屏蔽 kafka debug 日志 logback.xml
    • 1.6、引入spring-kafka依赖
    • 1.7、控制台日志

1、生产者拦截器

1.1、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
    //kafka生产者发送消息前执行:拦截发送的消息预处理
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
        +",partition:"+producerRecord.partition()
        +",key = "+producerRecord.key()
        +",value = "+producerRecord.value());
        return null;
    }

    //kafka broker 给出应答后执行
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        //exception为空表示消息发送成功
        if(e == null){
            System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
                    +",partition:"+recordMetadata.partition()
                    +",offset="+recordMetadata.offset()
            +",timestamp="+recordMetadata.timestamp());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

1.2、KafkaTemplate配置生产者拦截器

package com.atguigu.kafka.producer;

import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;

@SpringBootTest
class KafkaProducerApplicationTests {

    //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
    @Resource
    KafkaTemplate kafkaTemplate;

    @Resource
    MyKafkaInterceptor myKafkaInterceptor;

    @PostConstruct
    public void init() {
        kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
    }
    @Test
    void contextLoads() throws IOException {
        kafkaTemplate.send("my_topic1", "spring-kafka-生产者拦截器");

        //回调是等kafka,ack以后才执行,需要阻塞
        System.in.read();
    }
}

1.3、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {
    @Bean
    public NewTopic myTopic1() {
        //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
        return TopicBuilder.name("my_topic1")//主题名称
                .partitions(3)//主题分区
                .replicas(3)//主题分区副本数
                .build();//创建
    }
}

1.4、application.yml配置----v1版

server:
  port: 8110

# v1
spring:
  kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    producer: # producer 生产者
      retries: 0 # 重试次数 0表示不重试
      acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)
      batch-size: 16384 # 批次大小 单位byte
      buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

1.5、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.6、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>kafka-producer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


1.7、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者拦截器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717490776329
[
  [
    {
      "partition": 0,
      "offset": 0,
      "msg": "spring-kafka-生产者拦截器",
      "timespan": 1717490776329,
      "date": "2024-06-04 08:46:16"
    }
  ]
]

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/688952.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SkyWalking之P0核心业务场景输出调用链路应用

延伸扩展&#xff1a;XX核心业务场景 路由标签打标、传播、检索 链路标签染色与传播 SW: SkyWalking的简写 用户请求携带HTTP头信息X-sw8-correlation “X-sw8-correlation: key1value1,key2value2,key3value3” 网关侧读取解析HTTP头信息X-sw8-correlation&#xff0c;然后通…

Dokcer 基础使用 (4) 网络管理

文章目录 Docker 网络管理需求Docker 网络架构认识Docker 常见网络类型1. bridge 网络2. host 网络3. container 网络4. none 网络5. overlay 网络 Docker 网路基础指令Docker 网络管理实操 其他相关链接 Docker 基础使用(0&#xff09;基础认识 Docker 基础使用(1&#xff09;…

【HarmonyOS4学习笔记】《HarmonyOS4+NEXT星河版入门到企业级实战教程》课程学习笔记(十三)

课程地址&#xff1a; 黑马程序员HarmonyOS4NEXT星河版入门到企业级实战教程&#xff0c;一套精通鸿蒙应用开发 &#xff08;本篇笔记对应课程第 20 - 21节&#xff09; P20《19.ArkUI-属性动画和显式动画》 本节先来学习属性动画和显式动画&#xff1a; 在代码中定义动画&am…

使用difflib实现文件差异比较用html显示

1.默认方式&#xff0c;其中加入文本过长&#xff0c;需要换行&#xff0c;因此做 contenthtml_output.replace(</style>,table.diff td {word-wrap: break-word;white-space: pre-wrap;max-width: 100%;}</style>)&#xff0c;添加换行操作 ps&#xff1a;当前te…

BGP汇总+认证

一、BGP 的宣告问题 1、在 BGP 协议中每台运行 BGP 的设备上&#xff0c;宣告本地直连路由 2、在 BGP 协议中运行 BGP 协议的设备来宣告.通过 IGP 学习到的&#xff0c;未运行 BGP 协议设备产2、生的路由&#xff1b; 在 BGP 协议中宣告本地路由表中路由条目时,将携带本地到达这…

PostgreSQL基础(九):PostgreSQL的事务介绍

文章目录 PostgreSQL的事务介绍 一、什么是ACID&#xff08;常识&#xff09; 二、事务的基本使用 三、保存点&#xff08;了解&#xff09; PostgreSQL的事务介绍 一、什么是ACID&#xff08;常识&#xff09; 在日常操作中&#xff0c;对于一组相关操作&#xff0c;通常…

视频大模型 Vidu 支持音视频合成;字节跳动推出语音生成模型 Seed-TTS 丨 RTE 开发者日报 Vol.221

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real-Time Engagement&#xff09; 领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、「…

鸿蒙全栈开发-浅谈鸿蒙~线程模型

前言 如果你现在正巧在找工作&#xff0c;或者琢磨着换个职业跑道&#xff0c;鸿蒙开发绝对值得你考虑一下。 为啥&#xff1f;理由很简单&#xff1a; 市场需求大&#xff1a;鸿蒙生态还在持续扩张&#xff0c;应用开发、系统优化、技术支持等岗位需求旺盛&#xff0c;找工作…

三分搜索峰值

问题 现在有一个数组&#xff0c;显示递增&#xff0c;后是递减&#xff0c;如何找到它的峰值&#xff1f; 思路 可以利用分治的思想&#xff0c;向二分查找一样&#xff0c;每次将要查询的区域分成若干个区域&#xff0c;根据区域的特殊点的值淘汰一些区域&#xff0c;缩小…

基于Python的Selenium详细教程

一、PyCharm安装配置Selenium 本文使用环境&#xff1a;windows11、Python 3.10.5、PyCharm 2022.1.3、Selenium 4.3.0 需要你懂的技术&#xff1a;Python、HTML、CSS、JavaScript 1.Seleium安装&#xff1a; 在PyCharm终端或window命令窗口输入以下命令 #查看已安装的Pytho…

硬件产品经理

边端协调管理平台 主页一&#xff1a;模型管理1.1 边侧模型管理 二&#xff1a;配置管理2.1 终端软件配置管理 三&#xff1a;设备管理3.1 区域位置管理3.2 工控机管理&#xff08;其实就是围绕授权&#xff09;3.3 生产设备管理3.4 设备运行管理 四&#xff1a;数据服务4.1 实…

ISP:企业数字化发展的关键推动力

在当今信息化时代&#xff0c;互联网已成为人们生活和工作中不可或缺的一部分。然而&#xff0c;对于很多人来说&#xff0c;ISP这一概念仍显得有些陌生。ISP&#xff0c;即互联网服务提供商&#xff08;Internet Service Provider&#xff09;&#xff0c;是为用户提供互联网接…

【课程总结】Day6(上):机器学习项目实战--外卖点评情感分析预测

机器学习项目实战&#xff1a;外卖点评情感分析预测 项目目的 基于中文外卖评论数据集&#xff0c;通过机器学习算法&#xff0c;对评论内容进行情感预测。 数据集 地址&#xff1a;http://idatascience.cn/dataset-detail?table_id429数据集字段 字段名称字段类型字段说…

package.json中resolutions的使用场景

文章目录 用途配置示例使用方法注意事项和peerDependencies有什么不同peerDependenciesresolutions 总结 ✍创作者&#xff1a;全栈弄潮儿 &#x1f3e1; 个人主页&#xff1a; 全栈弄潮儿的个人主页 &#x1f3d9;️ 个人社区&#xff0c;欢迎你的加入&#xff1a;全栈弄潮儿的…

谈AI 时代网站的未来趋势

以大语言模型为代表的AI 技术迅速发展&#xff0c;将会影响原有信息网络的方式。其中一个明显的趋势是通过chatGPT 对话代替搜索引擎和浏览器来获取信息。 互联网时代&#xff0c;主要是通过网站&#xff08;website&#xff09;提供信息。网站主要为人类阅读的方式构建的。主要…

阿里 Qwen2 模型开源,教你如何将 Qwen2 扩展到百万级上下文

本次开源的 Qwen2 模型包括 5 个尺寸&#xff0c;分别是 0.5B、1.5B、7B、72B、57B&#xff0c;其中 57B 的属于 MoE 模型&#xff08;激活参数 14B&#xff09;&#xff0c;其余为 Dense 模型&#xff0c;本篇文章会快速介绍下各个尺寸模型的情况&#xff0c;然后重点介绍下如…

西门子PLC学习之数据块的单个实例,多重实例与参数实例间的区别

首先介绍下函数&#xff0c;函数块与数据块这三个概念。 数据块 数据块里可以存储各种类型的参数。有人可能会问&#xff0c;m寄存器不是可以存储布尔值&#xff0c;8位&#xff0c;16位&#xff0c;32位变量吗&#xff0c;为什么要多此一举&#xff1f;因为虽然m寄存器能存储以…

LAMPSECURITY: CTF4 靶机实战

信息收集&#xff1a; 存活扫描&#xff1a; 端口扫描&#xff1a; 服务扫描&#xff1a; web页面&#xff1a; blog页面发现注入点&#xff1a; sql注入&#xff1a; sqlmap一把梭&#xff1a; 多个参数记得打&#xff1a; 哦 ssh登录&#xff1a; 老版本的ssh&#xff0c;…

重回1990短视频全集:成都鼎茂宏升文化传媒公司

重回1990短视频全集&#xff1a;时光之旅的温情回顾 在数字技术的浪潮中&#xff0c;短视频以其独特的魅力迅速崛起&#xff0c;成为我们记录生活、分享故事的新方式。而当我们回望过去&#xff0c;那些充满怀旧情怀的年份总是让人心生感慨。今天&#xff0c;就让我们一起踏上…

Oracle EBS AP发票创建会计科目提示:APP-SQLAP-10710:无法联机创建会计分录

系统版本 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.6 问题症状: 提交“创建会计科目”请求提示错误信息如下: APP-SQLAP-10710:无法联机创建会计分录。 请提交应付款管理系统会计流程,而不要为此事务处理创建会计分录解决方法 数据修复SQL脚本: UPDATE ap_invoi…