一个简单的spring+kafka生产者

1. pom

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2. 生产者

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.xxx.npi.module.common.msg.dto.MsgBase;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class MyMessageProducerService {

    @Value("${npi.default-url}")
    private String domain;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyMessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public <T extends MsgBase> void sendMessage(String topicName, T msgObj) {
        List<T> list = new ArrayList<>();
        list.add(msgObj);
        if("https://npi.xxx.com".equals(domain)){
            kafkaTemplate.send(topicName, toJsonString(list));
        }
    }

    public <T extends MsgBase> void sendMessage(String topicName, List<T> list) {
        if("https://npi.xxx.com".equals(domain)){
            kafkaTemplate.send(topicName, toJsonString(list));
        }
    }

    private String toJsonString(Object obj) {
        return JSON.toJSONString(obj,
                SerializerFeature.WriteDateUseDateFormat,
                SerializerFeature.WriteMapNullValue,
                SerializerFeature.WriteNullListAsEmpty,
                SerializerFeature.WriteNullStringAsEmpty,
                SerializerFeature.DisableCircularReferenceDetect);
    }

}

3. 配置

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;


@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.producer.retries}")
    private int retries;

    @Value("${spring.kafka.producer.acks}")
    private String acks;

    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;

    @Value("${spring.kafka.producer.linger-ms}")
    private int lingerMs;

    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.producer.ssl.truststore.location}")
    private Resource sslTruststoreLocationResource;

    @Value("${spring.kafka.producer.ssl.truststore.password}")
    private String sslTruststorePassword;

    @Value("${spring.kafka.producer.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.producer.sasl.jaas.config}")
    private String saslJaasConfig;

    @SuppressWarnings({"unchecked", "rawtypes"})
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }

    @SuppressWarnings("unchecked")
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        @SuppressWarnings("rawtypes")
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        // factory.transactionCapable();
        // factory.setTransactionIdPrefix("transaction-");
        return factory;
    }

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", servers);
        props.put("acks", acks);
        props.put("retries", retries);
        props.put("batch.size", batchSize);
        props.put("linger.ms", lingerMs);
        props.put("buffer.memory", bufferMemory);
        props.put("key.serializer", keySerializer);
        props.put("value.serializer", valueSerializer);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        // 如果需要更低级别的消息丢失防护,可以启用幂等性
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // SSL配置
        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        try {
            // 将类路径资源转换为临时文件路径
            InputStream inputStream = sslTruststoreLocationResource.getInputStream();
            File tempFile = File.createTempFile("client_truststore", ".jks");
            Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tempFile.getAbsolutePath());
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
        } catch (IOException e) {
            throw new RuntimeException("Failed to locate truststore file", e);
        }
        return props;
    }
}

4. application

spring:
  kafka:
    producer:
      bootstrap-servers: n2.ikt.xxx.com:9092, n3.ikt.xxx.com:9092, n4.ikt.xxx.com:9092, n5.ikt.xxx.com:9092, n6.ikt.xxx.com:9092
      acks: all
      retries: 3
      batch-size: 16384
      linger-ms: 1
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      security.protocol: SASL_SSL
      ssl.truststore.location: classpath:client_truststore.jks
      ssl.truststore.password: pwd
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='pwd';
    topic:
      br: mdscinpi.mdscinpi-data.tst
      mem: mdscinpi.msdcinpi-data.tst
      fbr: mdscinpi.inpi-data.tst
      cr: mdscinpi.npi-data.tst

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/777314.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

破解在制品管理不透明难题

在快节奏的现代工业浪潮中&#xff0c;每一个细微的管理环节都直接关系到企业的竞争力与盈利能力。在车间生产中&#xff0c;在制品管理流程不透明是一个常见问题&#xff0c;它可能导致生产效率低下、成本增加、库存积压以及沟通障碍等负面影响。 在制品管理流程不透明&#x…

ETAS工具导入Com Arxml修改步骤

文章目录 前言Confgen之前的更改Confgen之后的修改CANCanIfComComMEcuM修改CanNmCanSMDCMCanTp生成RTE过程报错修改DEXT-诊断文件修改Extract问题总结前言 通讯协议栈开发一般通过导入DBC实现,ETAS工具本身导入DBC也是生成arxml后执行cfggen,本文介绍直接导入客户提供的arxml…

8种数据迁移工具

前言 最近有些小伙伴问我&#xff0c;ETL数据迁移工具该用哪些。 ETL(是Extract-Transform-Load的缩写&#xff0c;即数据抽取、转换、装载的过程)&#xff0c;对于企业应用来说&#xff0c;我们经常会遇到各种数据的处理、转换、迁移的场景。 今天特地给大家汇总了一些目前…

迭代加深——AcWing 170. 加成序列

迭代加深 定义 迭代加深搜索&#xff08;Iterative Deepening Depth-First Search, IDS&#xff09;是一种结合了深度优先搜索&#xff08;DFS&#xff09;和广度优先搜索&#xff08;BFS&#xff09;特点的算法。它通过限制搜索树的深度来控制搜索范围&#xff0c;起初以较小…

CTFShow的RE题(一)

RE2 1.中文字符的显示 2.对文件的读取操作 3.RC4加密 &#xff08;有一点是魔改的&#xff09; 4.enflag.txt文件里面的密文是ASCII编码之后的数据(可以放ida中) 也可以放到 010 里&#xff08;推荐&#xff09; encDH~mqqvqxB^||zllJq~jkwpmvez{ key for i in enc:keychr…

程序员下班为什么不关电脑?难道在偷偷加班?!

不管是周围的程序员朋友还是网上的很多程序员朋友&#xff0c;在下班后都是习惯不关电脑的&#xff0c;关上显示器&#xff0c;拿上手机&#xff0c;快乐下班&#xff01; 那么&#xff0c;为什么程序员下班都不关电脑&#xff1f;难道他们在偷偷加班&#xff1f; 其实&#x…

elasticsearch源码分析-04集群状态发布

集群状态发布 cluster模块封装了在集群层面执行的任务&#xff0c;如集群健康、集群级元信息管理、分片分配给节点、节点管理等。集群任务执行之后可能会产生新的集群状态&#xff0c;如果产生新的集群状态主节点会将集群状态广播给其他节点。 集群状态封装在clusterState中&…

基于Qt实现的PDF阅读、编辑工具

记录一下实现pdf工具功能 语言&#xff1a;c、qt IDE&#xff1a;vs2017 环境&#xff1a;win10 一、功能演示&#xff1a; 二、功能介绍&#xff1a; 1.基于saribbon主体界面框架&#xff0c;该框架主要是为了实现类似word导航项 2.加载PDF放大缩小以及预览功能 3.pdf页面跳转…

Qt 网络编程 网络信息获取操作

学习目标&#xff1a;网络信息获取操作 前置环境 运行环境:qt creator 4.12 学习内容 一、Qt 网络编程基础 Qt 直接提供了网络编程模块,包括基于 TCP/IP 的客户端和服务器相关类,如 QTcpSocket/QTcpServer 和 QUdpSocket,以及实现 HTTP、FTP 等协议的高级类,如 QNetworkRe…

SPIN-Diffusion:自我博弈微调提升文本到图像扩散模型性能

扩散模型作为生成AI的关键实体&#xff0c;已经在多个领域展现出了卓越的能力。然而&#xff0c;现有的扩散模型&#xff0c;如Stable Diffusion和SDXL&#xff0c;通常在预训练阶段后需要进行微调以更好地符合人类偏好。最近&#xff0c;研究者们开始尝试使用强化学习&#xf…

矩阵键盘与密码锁

目录 1.矩阵键盘介绍​编辑 2.扫描的概念 3.代码演示&#xff08;读取矩阵键盘键码&#xff09; 4.矩阵键盘密码锁 1.矩阵键盘介绍 为了减少I/O口的占用&#xff0c;通常将按键排列成矩阵形式&#xff0c;采用逐行或逐列的 “扫描”&#xff0c;就可以读出任何位置按键的状态…

jenkins配置gitee源码地址连接不上

报错信息如下&#xff1a; 网上找了好多都没说具体原因&#xff0c;最后还是看jenkins控制台输出日志发现&#xff1a; ssh命令执行失败&#xff08;git环境有问题&#xff0c;可能插件没安装成功等其他问题&#xff09; 后面发现是jenkins配置git的地方git安装路径错了。新手…

帕金森病患者在选择运动疗法时应该注意哪些事项?

帕金森病患者在选择运动疗法时&#xff0c;应该遵循以下几点注意事项&#xff1a; 个性化运动处方&#xff1a;根据患者的病情、年龄、健康状况、以往运动能力等因素&#xff0c;制定个体化的运动处方。 避免运动负荷过大&#xff1a;运动时间不宜过长&#xff0c;注意控制心率…

机器学习 C++ 的opencv实现SVM图像二分类的测试 (三)【附源码】

机器学习 C 的opencv实现SVM图像二分类的测试 (三) 数据集合下载地址&#xff1a;https://download.csdn.net/download/hgaohr1021/89506900 根据上节得到的svm.xml&#xff0c;测试结果为&#xff1a; #include <stdio.h> #include <time.h> #include <o…

智慧生活新篇章,Vatee万腾平台领航前行

在21世纪的科技浪潮中&#xff0c;智慧生活已不再是一个遥远的梦想&#xff0c;而是正逐步成为我们日常生活的现实。从智能家居的温馨便捷&#xff0c;到智慧城市的高效运转&#xff0c;科技的每一次进步都在为我们的生活增添新的色彩。而在这场智慧生活的变革中&#xff0c;Va…

stm32定时器与pwm波

文章目录 4 TIM4.1 SysTick系统定时器4.2 TIM定时器中断与微秒级延时4.3 TIM使用PWM波4.3.1 PWM介绍4.3.2 无源蜂鸣器实现 4.4 TIM ,PWM常用函数 4 TIM 4.1 SysTick系统定时器 ​ Systick系统滴答&#xff0c;&#xff08;同时他有属于自己的中断&#xff0c;可以利用它来做看…

Star CCM+界面显示字体大小调整

前言 打开界面字体显示大小是默认的&#xff0c;软件内设置调整默认字体的大小是无法实现&#xff0c;需要在图标属性中进行设置&#xff0c;操作方法与中英文切换很类似&#xff0c;具体方法如下&#xff1a; 操作流程 1. 右击Star-CCM快捷⽅式&#xff0c;选择“属性”&…

【Mindspore进阶】-03.ShuffleNet实战

ShuffleNet图像分类 当前案例不支持在GPU设备上静态图模式运行&#xff0c;其他模式运行皆支持。 ShuffleNet网络介绍 ShuffleNetV1是旷视科技提出的一种计算高效的CNN模型&#xff0c;和MobileNet, SqueezeNet等一样主要应用在移动端&#xff0c;所以模型的设计目标就是利用有…

lodash-es 基本使用

中文文档&#xff1a;https://www.lodashjs.com/ cloneDeep方法文档&#xff1a;https://www.lodashjs.com/docs/lodash.cloneDeep#_clonedeepvalue 参考掘金文章&#xff1a;https://juejin.cn/post/7354940462061715497 安装&#xff1a; pnpm install lodash-esnpm地址&a…

Ad-hoc命令和模块简介

华子目录 Ad-hoc命令和模块简介1.概念2.格式3.Ansible命令常用参数4.模块类型4.1 三种模块类型4.2Ansible核心模块和附加模块 示例1示例2 Ad-hoc命令和模块简介 1.概念 Ansible提供两种方式去完成任务&#xff0c;一是ad-hoc命令&#xff0c;一是写Ansible playbook(剧本)Ad-…