springboot整合kafka多数据源

整合kafka多数据源

  • 项目背景
  • 依赖
  • 配置
  • 生产者
  • 消费者
  • 消息体

项目背景

在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。

依赖

    implementation 'org.springframework.kafka:spring-kafka:2.8.2'

配置

单机的情况
如果是单机的kafka我们直接通过springboot自动配置的就可以使用,例如在yml里面直接引用

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    bootstrap-servers: server001.bbd:9092

在使用的时候直接注入,然后就可以使用里面的方法了

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

在这里插入图片描述

多数据源情况下

本篇文章主要讲的是在多数据源下的使用,和单机的有所不同,我也看了网上的一些博客,但是当我去按照网上的配置的时候,总是会报错 kafakTemplate这个bean找不到,所以没办法只有按照springboot自动配置里面的来改
在这里插入图片描述

package com.ddb.zggz.config;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.io.IOException;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {

    private final KafkaProperties properties;

    private final KafkaSecondProperties kafkaSecondProperties;



    public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {
        this.properties = properties;
        this.kafkaSecondProperties = kafkaSecondProperties;
    }

    @Bean("kafkaTemplate")
    @Primary
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


    @Bean("kafkaSecondTemplate")
    public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                                   @Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,
                                                   ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }


    @Bean("kafkaProducerListener")
    @Primary
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }


    @Bean("kafkaSecondProducerListener")
    public ProducerListener<Object, Object> kafkaSecondProducerListener() {
        return new LoggingProducerListener<>();
    }

    @Bean("kafkaConsumerFactory")
    @Primary
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
                this.properties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean("kafkaSecondConsumerFactory")
    public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(
            ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
                this.kafkaSecondProperties.buildConsumerProperties());
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }


    @Bean("zwKafkaContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaSecondConsumerFactory);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }


    @Bean("kafkaProducerFactory")
    @Primary
    public ProducerFactory<Object, Object> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean("kafkaSecondProducerFactory")
    public ProducerFactory<Object, Object> kafkaSecondProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                this.kafkaSecondProperties.buildProducerProperties());
        String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        KafkaProperties.Jaas jaasProperties = this.properties.getJaas();
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }
        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }
        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    @Bean("kafkaAdmin")
    @Primary
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }

}


生产者


package com.ddb.zggz.event;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

@Component
@Slf4j
public class KafkaPushEvent {


    @Resource
    private KafkaTemplate<String, String> kafkaSecondTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ApplicationConfiguration configuration;


    public void pushEvent(PushParam param) {
        ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;
        if ("zw".equals(configuration.getEnvironment())){
            sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if ("net".equals(configuration.getEnvironment())){
            sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));
        }
        if (sendResultListenableFuture == null){
            throw new IllegalArgumentException("kakfa发送消息失败");
        }
        sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("kafka发送的message报错,发送数据:{}", param);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("kafka发送的message成功,发送数据:{}", param);
            }
        });


    }


}

消费者

package com.ddb.zggz.event;

import com.alibaba.fastjson.JSONObject;

import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;


@Component
@Slf4j
public class SendMessageListener {

    @Autowired
    private GzApprovalService gzApprovalService;

    @Autowired
    private GzServiceService gzServiceService;

    @KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")
    @RetryableTopic(include = {Exception.class},
            backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000)
    )
    public void listen(ConsumerRecord<?, ?> consumerRecord) {
        String value = (String) consumerRecord.value();
        PushParam pushParam = JSONObject.parseObject(value, PushParam.class);

        //版本提审
        if ("version-approval".equals(pushParam.getEvent())) {
            ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);
            gzApprovalService.approval(approvalDTO);
        }

        //服务下架
        if (pushParam.getEvent().equals("server-OffShelf-gzt")) {
            OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);
            gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());

        }

    }
    @DltHandler
    public void processMessage(String message) {

    }
}

消息体

package com.ddb.zggz.event;

import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * @author bbd
 */
@Data
public class PushParam implements Serializable {

    /**
     * 发送的消息数据
     */
    private Object data;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime = LocalDateTime.now();

    /**
     * 事件名称,用于消费者处理相关业务
     */
    private String event;


    /**
     * 保存版本参数
     */
    public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzH5VersionManage);
        pushParam.setEvent("save-version");
        return pushParam;
    }

    /**
     * 保存服务参数
     */
    public static PushParam toKafkaServer(GzService gzService) {
        PushParam pushParam = new PushParam();
        pushParam.setData(gzService);
        pushParam.setEvent("save-server");
        return pushParam;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/74758.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Golang通过alibabaCanal订阅MySQLbinlog

最近在做redis和MySQL的缓存一致性&#xff0c;一个方式是订阅MySQL的BinLog文件&#xff0c;我们使用阿里巴巴的Canal的中间件来做。 Canal是服务端和客户端两部分构成&#xff0c;我们需要先启动Canal的服务端&#xff0c;然后在Go程序里面连接Canal服务端&#xff0c;即可监…

P8642 [蓝桥杯 2016 国 AC] 路径之谜

[蓝桥杯 2016 国 AC] 路径之谜 题目描述 小明冒充 X X X 星球的骑士&#xff0c;进入了一个奇怪的城堡。 城堡里边什么都没有&#xff0c;只有方形石头铺成的地面。 假设城堡地面是 n n n\times n nn 个方格。如图所示。 按习俗&#xff0c;骑士要从西北角走到东南角。 …

​比特丛林用量子纠缠对抗高智商犯罪

世界上没有绝对完美的犯罪&#xff0c;但是预谋和统筹良久的高智商犯罪都几乎接近于完美和无比烧脑。 警局的洽谈室&#xff0c;只有我和嫌疑人两个人。 各自坐在桌子两边&#xff0c;门已关。在这个封闭的空间里&#xff0c;我一手拿着筷子吃着盒饭&#xff0c;一边撇了一下…

面向云思考安全

Gartner最近的一项研究表明&#xff0c;到 2025 年&#xff0c;85% 的企业会采用云战略&#xff0c;虽然这一数字是面向全球的&#xff0c;但可以看到在中国的环境中&#xff0c;基于云所带来的优势&#xff0c;越来越多的企业也同样开始积极向云转型。 但同时&#xff0c;有报…

CSS自学框架之表单

首先我们看一下表单样式&#xff0c;下面共有5张截图 一、CSS代码 /*表单*/fieldset{border: none;margin-bottom: 2em;}fieldset > *{ margin-bottom: 1em }fieldset:last-child{ margin-bottom: 0 }fieldset legend{ margin: 0 0 1em }/* legend标签是CSS中用于定义…

【数据结构】树和二叉树

一、树的概念及结构 1、树的概念 树 是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因 为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。 有一个特殊的结点&a…

机器学习笔记值优化算法(十四)梯度下降法在凸函数上的收敛性

机器学习笔记之优化算法——梯度下降法在凸函数上的收敛性 引言回顾&#xff1a;收敛速度&#xff1a;次线性收敛二次上界引理 梯度下降法在凸函数上的收敛性收敛性定理介绍证明过程 引言 本节将介绍梯度下降法在凸函数上的收敛性。 回顾&#xff1a; 收敛速度&#xff1a;次…

Jay17 2023.8.14日报 即 留校集训阶段性总结

8.14 打了moeCTF&#xff0c;还剩一题ak Web。 Jay17-集训结束阶段性总结&#xff1a; 集训产出&#xff1a; 自集训开始以来一个半月&#xff0c;最主要做的事情有三。 一是跟课程&#xff0c;复习学过的知识&#xff0c;学习新的知识&#xff1b;目前课程已大体听完&…

问道管理:缩量小幅上涨说明什么?

股市里面&#xff0c;股票价格上涨或跌落都是常见现象。可是关于那些在商场上寻求收益的出资者来说&#xff0c;他们需要对每一个股市中的价格动摇有深化的了解&#xff0c;以便做出更正确的出资决策。最近&#xff0c;出资者们发现商场缩量小幅上涨的现象时有发生&#xff0c;…

word 应用 打不开 显示一直是正在启动中

word打开来显示一直正在启动中&#xff0c;其他调用word的应用也打不开&#xff0c;网上查了下以后进程关闭spoolsv.exe,就可以正常打开word了

redis的三种集群方式

redis有三种集群方式&#xff1a;主从复制&#xff0c;哨兵模式和集群。 1.主从复制 主从复制原理&#xff1a; 从服务器连接主服务器&#xff0c;发送SYNC命令&#xff1b; 主服务器接收到SYNC命名后&#xff0c;开始执行BGSAVE命令生成RDB文件并使用缓冲区记录此后执行的所…

C++入门篇8---vector

vecctor是动态顺序表 一、了解vector的相关接口及其功能 1.构造函数相关接口 函数声明功能介绍vector()无参构造vector(size_type n,const value_type& valvalue_type())构造并初始化n个valvector(const value& x)拷贝构造vector(InputIterator first, InputIterato…

【Git】本地搭建Gitee、Github环境

本地 &#xff08;Local&#xff09; 1、使用命令生成公钥&#xff08;pub文件&#xff09; 1. $ ssh-keygen -t rsa -C "xxxxxxxemail.com" -f "github_id_rsa" 2. $ ssh-keygen -t rsa -C "xxxxxxxemail.com" -f "gitee_id_rsa" …

解读spring中@Value 如何将配置转自定义的bean

实现方式 着急寻求解决方式的猿友先看这块 定义配置转化类 public class UserConverter implements Converter<String, List<User>> {Overridepublic List<User> convert(String config) {if (StringUtils.isEmpty(config)) {return Collections.emptyLis…

数组的详述(1)

1、一维数组的创建和初始化&#xff1a; 数组是一组相同类型元素的集合。 &#xff08;1)创建方式&#xff1a; type_t 是指数组的元素类型 arr_name 数组名 [const_n] 一个常量表达式&#xff0c;用来指定数组的大小 //在c99标准之前&#xff0c;数组的大小必须是常…

普通上班族学Python有用吗?

普通上班族学Python有用吗&#xff1f;对于广大上班族而言&#xff0c;时间和精力主要问题&#xff0c;学习Python编程语言为了能提高工作效率。学Python不是单纯的为了增加知识储备&#xff0c;Python本质上是一个工具和手段&#xff0c;最终目的是要通过它来帮我们解决实际工…

Falco操作系统安全威胁监测利器

原理简介 Falco是一个开源的云原生安全工具&#xff0c;用于检测和防御容器和云原生环境中的安全威胁。它基于Linux内核的eBPF技术&#xff0c;通过监控系统调用和内核事件来实现安全检测和响应。 具体来说&#xff0c;Falco的实现原理如下&#xff1a; 1. 内核模块&#xf…

并发编程系列-Semaphore

Semaphore&#xff0c;如今通常被翻译为"信号量"&#xff0c;过去也曾被翻译为"信号灯"&#xff0c;因为类似于现实生活中的红绿灯&#xff0c;车辆是否能通行取决于是否是绿灯。同样&#xff0c;在编程世界中&#xff0c;线程是否能执行取决于信号量是否允…

添加vue devtools扩展工具+添加后F12不显示Vue图标

前言&#xff1a;在开启Vue学习之旅时&#xff0c;遇到问题两个问题&#xff0c;第一添加不上vue devtools扩展工具&#xff0c;第二添加完成后&#xff0c;F12不显示Vue图标。查阅了很多博客&#xff0c;自己解决了问题&#xff0c;故写此博客记录。如果你遇到和我一样的问题&…

item_get_sales-获取商品销量详情

一、接口参数说明&#xff1a; item_get_sales-获取商品销量详情&#xff0c;点击更多API调试&#xff0c;请移步注册API账号点击获取测试key和secret 公共参数 请求地址: https://api-gw.onebound.cn/taobao/item_get_sales 名称类型必须描述keyString是调用key&#xff08…