kafka消费者监听消费

1. pom

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2. kafka 监听消费

消费成功调用 ack.acknowledge()方法确认。

import com.xxx.gsc.sci.order.entity.SciMbgPsdHistoryEntity;
import com.xxx.gsc.sci.order.mapper.SciMbgPsdHistoryMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;


/**
 * 同步SCI系统mbg_psd_partial_line_history表。</p>
 * 上游增量推送,PK: VERSION_NUM,SO,SO_ITEM,PSD,PLANNED_QTY
 *
 * @date 2022/08/05 18:06
 * @see org.springframework.kafka.listener.MessageListener
 */
@Component
@Slf4j
public class SyncSciMbgPsdHistory {

    @Autowired
    private SqlSessionTemplate sqlSessionTemplate;

    @KafkaListener(topics = "#{'${customer.kafka.topics}'.split(',')[1]}")
    public void sync(List<SciMbgPsdHistoryEntity> dataList, Acknowledgment ack) {
        SqlSession session = null;
        try {
            log.info("Starting to consume of PSD data ...");
            long startTime = System.currentTimeMillis();
            session = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.BATCH, false);
            SciMbgPsdHistoryMapper mapper = session.getMapper(SciMbgPsdHistoryMapper.class);
            dataList.forEach(v -> mapper.upsert(v));
            session.commit();
            ack.acknowledge();
            long duration = System.currentTimeMillis() - startTime;
            log.info("Finished to consume of PSD data! total count: {}条, total time: {} s", dataList.size(), duration / 1000.0);
        } catch (Throwable e) {
            e.printStackTrace();
            log.error(e.getMessage());
        } finally {
            if (null != session) {
                session.close();
            }
        }
    }
}

3. 配置

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.xxx.gsc.tech.framework.GscTechAutoConfiguration;
import com.xxx.gsc.tech.framework.jackson.deserializer.DateDeserializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.system.ApplicationHome;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Kafka自动载入类。<p/>
 * 实现初始化Kafka配置工厂
 *
 * @author zhengwei16
 * @date 2022/8/12 15:29
 * @version 1.0
 */
@SpringBootConfiguration
@AutoConfigureAfter(GscTechAutoConfiguration.class)
@ConditionalOnClass(KafkaTemplate.class)
@Slf4j
public class KafkaAutoConfiguration {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    private KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) throws IOException {
        Resource trustStoreLocation = properties.getSsl().getTrustStoreLocation();
        log.info("SSL file path:" + (Objects.isNull(trustStoreLocation) ? "" : trustStoreLocation.getURI().toString()));
        if (trustStoreLocation != null && !trustStoreLocation.isFile()) {
            ApplicationHome applicationHome = new ApplicationHome(getClass());
            log.info("Application Home:" + applicationHome.getDir().getPath());
            File sslFile = new File(applicationHome.getSource().getParentFile(), Objects.requireNonNull(trustStoreLocation.getFilename()));
            FileUtils.copyInputStreamToFile(Objects.requireNonNull(trustStoreLocation.getInputStream(), "SSL File Not Exist"), sslFile);
            properties.getSsl().setTrustStoreLocation(new FileSystemResource(sslFile));
        }

        this.properties = properties;
    }

    @Bean
    @Primary
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             RecordMessageConverter messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        if (messageConverter != null) {
            kafkaTemplate.setMessageConverter(messageConverter);
        }
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @Primary
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        Map<String, Object> configs = this.properties.buildConsumerProperties();
        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    @Bean
    @Primary
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        Serializer stringSerializer = new StringSerializer();
        Serializer jsonSerializer = new JsonSerializer(new ObjectMapper() {{
            setSerializationInclusion(JsonInclude.Include.NON_NULL);
            configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        }});
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties(), stringSerializer, jsonSerializer);
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix + "_" + UUID.randomUUID());
        }
        return factory;
    }

    @Bean
    @Primary
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @Bean
    @Primary
    public RecordMessageConverter kafkaMessageConverter(ObjectMapper objectMapper) {
        ObjectMapper om = new ObjectMapper();
        return new StringJsonMessageConverter(om
                .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE)
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                .registerModule(new SimpleModule().addDeserializer(Date.class, new DateDeserializer()))
                .setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")));
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        KafkaTransactionManager<?, ?> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);
        // kafkaTransactionManager.setTransactionSynchronization(KafkaTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        kafkaTransactionManager.setNestedTransactionAllowed(true);
        return kafkaTransactionManager;
    }

    @Bean
    @ConditionalOnBean(KafkaTransactionManager.class)
    public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(DataSourceTransactionManager dataSourceTransactionManager, KafkaTransactionManager<?, ?> kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, dataSourceTransactionManager);
    }

}

4. application配置

spring:
  kafka:
# Tst
    bootstrap-servers: n1-mkt-sy.xxx.com:9092,n2-mkt-sy.xxx.com:9092,n3-mkt-sy.xxx.com:9092
    consumer:
      group-id: mbgcpfr
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 1000
      max-poll-records: 5000
      security:
        protocol: SASL_SSL
      properties:
        max.partition.fetch.bytes: 104857600
        fetch.min.bytes: 2108576
        fetch.max.wait.ms: 10000
        session.timeout.ms: 300000  # default 10000
        request.timeout.ms: 600000 # default 30000
        max.poll.interval.ms: 600000 # default 300000
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='passwd';
    ssl:
      trust-store-location: classpath:client_truststore.jks
      trust-store-password: PASSWD
    listener:
      #concurrency: 2 #容器中的线程数,用于提高并发量
      #      ack-count: # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
      ack-mode: manual_immediate # Listener AckMode. See the spring-kafka documentation.
      #      ack-time: # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
      #poll-timeout: # Timeout to use when polling the consumer.
      type: batch # Listener type.
      missing-topics-fatal: false

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/779277.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

比赛获奖的武林秘籍:03 好的创意选取-获得国奖的最必要前提

比赛获奖的武林秘籍&#xff1a;03 好的创意选取-获得国奖的最必要前提 摘要 本文主要介绍了大学生电子计算机类比赛和创新创业类比赛创意选取的重要性&#xff0c;并列举了好的创意选取和坏的创意选取的例子&#xff0c;同时说明了好的创意选取具有哪些特点&#xff0c;同时…

Tkinter布局助手

免费的功能基本可以满足快速开发布局&#xff0c; https://pytk.net/ iamxcd/tkinter-helper: 为tkinter打造的可视化拖拽布局界面设计小工具 (github.com) 作者也把项目开源了&#xff0c;有兴趣可以玩玩

Java中线程的常用方法(并发编程基础)

Java中线程的常用方法 sleep 调用sleep会让当前线程从Running进入TIMED WAITING状态其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时sleep方法会抛出InterruptedException睡眠结束后的线程未必会立刻得到执行建议用TimeUnit的sleep代替Thread的sleep来获得更好的可读…

如何在PD虚拟机中开启系统的嵌套虚拟化功能?pd虚拟机怎么用 Parallels Desktop 19 for Mac

PD虚拟机是一款可以在Mac电脑中运行Windows系统的应用软件。使用 Parallels Desktop for Mac 体验 macOS 和 Windows 的最优性能&#xff0c;解锁强大性能和无缝交互。 在ParallelsDesktop&#xff08;PD虚拟机&#xff09;中如何开启系统的嵌套虚拟化功能&#xff1f;下面我们…

vulhub-activemq(CVE-2015-5254)

Apache ActiveMQ 5.13.0版本之前到5.x版本的安全漏洞&#xff0c;该程序引起的漏洞不限制代理中可以序列化的类。远程攻击者可以制作一个特殊的序列化 Java 消息服务 (JMS) ObjectMessage 对象&#xff0c;利用该漏洞执行任意代码。 Apache ActiveMQ 5.x ~ Apache ActiveMQ 5.1…

【人工智能】-- 智能机器人

个人主页&#xff1a;欢迎来到 Papicatch的博客 课设专栏 &#xff1a;学生成绩管理系统 专业知识专栏&#xff1a; 专业知识 文章目录 &#x1f349;引言 &#x1f349;机器人介绍 &#x1f348;机器人硬件 &#x1f34d;机械结构 &#x1f34d;传感器 &#x1f34d;控…

基于Android Studio电影购票系统

目录 项目介绍 图片展示 运行环境 获取方式 项目介绍 主要实为了方便用户随时随地进行电影购票。在配色方面选择了一些富有电影元素的颜色。主要能够实现的功能与流程为&#xff1a; 1.用户首先需要注册用户名填写密码。 2.用户可以用之前注册的用户名和密码进行登录。 3.登…

键盘异常的检测与解决方案

今天对象用Word写文档&#xff0c;按下Ctrl的时候&#xff0c;页面不停地上下滑动&#xff0c;导致无法正常编辑文本。 重启之后&#xff0c;仍然无法解决&#xff0c;推断是键盘坏了。 但是当按下Fn或其他功能键&#xff0c;焦点移除&#xff0c;页面就不会再抖动了。 现在…

[CP_AUTOSAR]_分层软件架构_内容详解

目录 1、软件分层内容1.1、Microcontroller Abstraction Layer1.2、ECU Abstraction Layer1.2.1、I/O HW Abstraction1.2.2、Communication Hardware Abstraction1.2.3、Memory Hardware Abstraction1.2.4、Onboard Device Abstraction1.2.5、Crypto Hardware Abstraction 1.3、…

Docker安装遇到问题:curl: (7) Failed to connect to download.docker.com port 443: 拒绝连接

问题描述 首先&#xff0c;完全按照Docker官方文档进行安装&#xff1a; Install Docker Engine on Ubuntu | Docker Docs 在第1步&#xff1a;Set up Dockers apt repository&#xff0c;执行如下指令&#xff1a; sudo curl -fsSL https://download.docker.com/linux/ubu…

超赞的8款生活APP推荐!

AI视频生成&#xff1a;小说文案智能分镜智能识别角色和场景批量Ai绘图自动配音添加音乐一键合成视频https://aitools.jurilu.com/每天都会有几十个应用程序发布&#xff0c;一款用得好的应用程序可以极大地丰富您的生活。对于那些不知道哪个应用程序适合您以及您需要哪个应用程…

将excel表格转换为element table(下)

在‘将excel表格转换为element table(上)’我们把excel 转换后通过数据重构绑定到了element table上&#xff0c;现在要做的就是根据源文件进行行列进行合并操作 先看看最终处理的结果 这里在一步步分析实现步骤。 先分析一下合并的逻辑 大致思路理理如上。 思路有了接下来…

微信小程序的农产品商城-计算机毕业设计源码46732

摘 要 随着社会经济的发展和人们消费观念的升级&#xff0c;农产品电商行业逐渐壮大。但传统的农产品销售模式存在信息不透明、中间环节复杂等问题&#xff0c;而微信小程序作为一种便捷的移动应用平台&#xff0c;为农产品商城的建设提供了新的可能性。通过微信小程序的设计与…

上位机图像处理和嵌入式模块部署(mcu项目1:用户手册)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 一个完整的产品&#xff0c;除了上位机软件、固件、硬件、包装之外&#xff0c;一般还需要一个用户手册。好的用户手册应该能够兼顾到大多数人的认…

开发个人Go-ChatGPT--1 项目介绍

开发个人Go-ChatGPT--1 项目介绍 开发个人Go-ChatGPT--1 项目介绍知识点大纲文章目录项目地址 开发个人Go-ChatGPT–1 项目介绍 本文将以一个使用Ollama部署的ChatGPT为背景&#xff0c;主要还是介绍和学习使用 go-zero 框架&#xff0c;开发个人Go-ChatGPT的服务器后端&#x…

电脑为什么会提示丢失msvcp140.dll?怎么修复msvcp140.dll文件会靠谱点

电脑为什么会提示丢失msvcp140.dll&#xff1f;其实只要你的msvcp140.dll文件一损坏&#xff0c;然而你的电脑程序需要运用到这个msvcp140.dll文件的时候&#xff0c;就回提示你丢失了msvcp140.dll文件&#xff01;因为没有这个文件&#xff0c;你的很多程序都用不了的。今天我…

Redis基础教程(十三):Redis lua脚本

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…

[240706] 史蒂夫·乔布斯近40年前就预言了苹果智能 | Globalping 用于网络诊断和性能测试的命令行工具

目录 史蒂夫.乔布斯近40年前就预言了苹果智能Globalping 用于网络诊断和性能测试的命令行工具功能1. Ping2. Traceroute3. DNS 查询4. HTTP 请求 使用场景1. 网络性能监测2. 故障排除3. 网站性能优化4. 服务可用性监控 优势1. [全球覆盖](https://www.jsdelivr.com/network)2. …

[Vite]Vite插件生命周期了解

[Vite]Vite插件生命周期了解 Chunk和Bundle的概念 Chunk&#xff1a; 在 Vite 中&#xff0c;chunk 通常指的是应用程序中的一个代码片段&#xff0c;它是通过 Rollup 或其他打包工具在构建过程中生成的。每个 chunk 通常包含应用程序的一部分逻辑&#xff0c;可能是一个路由视…

5个实用的文章生成器,高效输出优质文章

在自媒体时代&#xff0c;优质内容的持续输出是吸引读者、提升影响力的关键。然而&#xff0c;对于许多自媒体创作者来说&#xff0c;频繁的创作难免会遭遇灵感枯竭、创作不出文章的困扰。此时&#xff0c;文章生成器便成为了得力的助手。文章生成器的优势能够快速自动生成高质…