springboot 连接 kafka集群(kafka版本 2.13-3.4.0)

springboot 连接 kafka集群

  • 一、环境搭建
    • 1.1 springboot 环境
    • 1.2 kafka 依赖
  • 二、 kafka 配置类
    • 2.1 发布者
      • 2.1.1 配置
        • 2.1.2 构建发布者类
        • 2.1.3 发布消息
    • 2.2 消费者
      • 2.2.1 配置
      • 2.2.2 构建消费者类
      • 2.2.3 进行消息消费

一、环境搭建

1.1 springboot 环境

JDK 11+
Maven 3.8.x+
springboot 2.5.4 +

1.2 kafka 依赖

springboot的pom文件导入

       <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

二、 kafka 配置类

2.1 发布者

2.1.1 配置

发布者我们使用 KafkaTemplate 来进行消息发布,所以需要先对其进行一些必要的配置。

@Configuration
@EnableKafka
public class KafkaConfig {


     /***** 发布者 *****/

    //生产者工厂
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    //生产者配置
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    //生产者模板
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2.1.2 构建发布者类

配置完发布者,下来就是发布消息,我们需要继承 ProducerListener<K, V> 接口,该接口完整信息如下:

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

实现该接口的方法,我们可以获取包含发送结果(成功或失败)的异步回调,也就是可以在这个接口的实现中获取发送结果。

我们简单的实现构建一个发布者类,接收主题和发布消息参数,并打印发布结果。

@Component
public class KafkaProducer implements ProducerListener<Object,Object> {

    private static final Logger producerlog = LoggerFactory.getLogger(KafkaProducer.class);

    private final KafkaTemplate<Integer, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<Integer, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void producer (String msg,String topic){
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic,0, msg);
        future.addCallback(new KafkaSendCallback<Integer, String>() {

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                producerlog.info("发送成功 {}", result);
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
                producerlog.info("发送失败 {}",failed);
            }

        });
    }

}

2.1.3 发布消息

写一个controller类来测试我们构建的发布者类,这个类中打印接收到的消息,来确保信息接收不出问题。

@RestController
public class KafkaTestController {

    private static final Logger kafkaTestLog = LoggerFactory.getLogger(KafkaTestController.class);

    @Resource
    private KafkaProducer kafkaProducer;

    @GetMapping("/kafkaTest")
    public void kafkaTest(String msg,String topic){
        kafkaProducer.producer(msg,topic);
        kafkaTestLog.info("接收到消息 {} {}",msg,topic);
    }
}

一切准备就绪,我们启动程序利用postman来进行简单的测试。

进行消息发布:
在这里插入图片描述

发布结果:
在这里插入图片描述
可以看到消息发送成功。

我们再看看kafka消费者有没有接收到消息:

在这里插入图片描述

看以看到,kakfa的消费者也接收到了消息。

2.2 消费者

2.2.1 配置

消息的接受有多种方式,我们这里选择的是使用 @KafkaListener 注解来进行消息接收。它的使用像下面这样:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

看起来不是太难吧,但使用这个注解,我们需要配置底层 ConcurrentMessageListenerContainer.kafkaListenerContainerFactor。

我们在原来的kafka配置类 KafkaConfig 中,继续配置消费者,大概就像下面这样

@Configuration
@EnableKafka
public class KafkaConfig {


     /***** 发布者 *****/

    //生产者工厂
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    //生产者配置
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    //生产者模板
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /***** 消费者 *****/

    //容器监听工厂
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    //消费者工厂
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    //消费者配置
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);
        return props;
    }
}

注意,要设置容器属性必须使用getContainerProperties()工厂方法。它用作注入容器的实际属性的模板

2.2.2 构建消费者类

配置好后,我们就可以使用这个注解了。这个注解的使用有多种方式:

1、用它来覆盖容器工厂的concurrency和属性

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

2、可以使用显式主题和分区(以及可选的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}


3、将初始偏移应用于所有已分配的分区

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}


4、指定以逗号分隔的分区列表或分区范围

@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}


5、可以向侦听器提供Acknowledgment

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}


6、添加标头

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}


我们这里写一个简单的,只用它来接受指定主题的数据:

@Component
public class KafkaConsumer {

    private static final Logger consumerlog = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topicPartitions  = @TopicPartition(topic = "kafka-topic-test",
            partitions = "0"))
    public void consumer (String data){
        consumerlog.info("消费者接收数据 {}",data);
    }
}

这里解释一下,因为我们进行了手动分配主题/分区,所以 注解中group.id 可以为空。若要指定group.id请在消费者配置中加上props.put(ConsumerConfig.GROUP_ID_CONFIG, “bzt001”); 或在 @TopicPartition 注解后加上 groupId = “组id”

2.2.3 进行消息消费

继续使用postman调用我们写好的发布者发布消息,观察控制台的消费者类是否有相关日志出现。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/28211.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CSDN问答机器人

文章目录 前言一、背景二、总体流程三、构建知识库四、粗排五、精排六、Prompt总结相关博客 前言 先看结果: 已经连续很多周获得了第二名(万年老二), 上周终于拿了一回第一, 希望继续保持. &#x1f601; 这是今天的榜单, 采纳的数量相对较少, 之前基本上维持在100 重点说明…

SpringBoot项目实战:自定义异常和统一参数验证(附源码)

你好&#xff0c;我是田哥 在实际开发过程中&#xff0c;不可避免的是需要处理各种异常&#xff0c;异常处理方法随处可见&#xff0c;所以代码中就会出现大量的try {...} catch {...} finally {...} 代码块&#xff0c;不仅会造成大量的冗余代码&#xff0c;而且还影响代码的可…

母婴商家怎么建立自己的品牌,母婴产品传播渠道总结

随着互联网的发展逐渐深入我们的生活&#xff0c;线上传播的模式也越来越被大家熟知。越来越多的行业开始重视线上传播。那么母婴商家怎么建立自己的品牌&#xff0c;母婴产品传播渠道总结。 其实&#xff0c;母婴产品线上用户群体众多&#xff0c;且母婴产品用户目的明确&…

深入解析IT专业分类、方向及就业前景:高考毕业生如何选择适合自己的IT专业?重点探索近年来人工智能专业发展及人才需求

目录 一、IT专业的就业前景和发展趋势二、了解IT专业的分类和方向三、你对本专业的看法和感想四、本专业对人能力素养的要求五、建议和思考其它资料下载 当今社会&#xff0c;信息技术行业以其迅猛的发展和无限的潜力成为了吸引无数年轻人的热门选择。特别是对于高考毕业生来说…

你的企业还没搭建这个帮助中心网页,那你太落后了!

作为现代企业&#xff0c;拥有一个完善的帮助中心网页已经成为了不可或缺的一部分。帮助中心网页不仅可以提供给用户有关产品或服务的详细信息&#xff0c;还可以解答用户的疑问和提供技术支持&#xff0c;使用户在使用产品或服务时遇到问题可以很快地得到解决。因此&#xff0…

论文阅读和分析:Binary CorNET Accelerator for HR Estimation From Wrist-PPG

主要贡献&#xff1a; 一种完全二值化网络(bCorNET)拓扑结构及其相应的算法-架构映射和高效实现。对CorNET进行量化后&#xff0c;减少计算量&#xff0c;又能实现减轻运动伪影的效果。 该框架在22个IEEE SPC受试者上的MAE为6.675.49 bpm。该设计采用ST65 nm技术框架&#xff…

数据结构--队列2--双端队列--java双端队列

介绍 双端队列&#xff0c;和前面学的队列和栈的区别在于双端队列2端都可以进行增删&#xff0c;其他2个都是只能一端可以增/删。 实现 链表 因为2端都需要可以操作所以我们使用双向链表 我们也需要一共头节点 所以节点设置 static class Node<E>{E value;Node<E…

jetpack compose —— Card

jetpack compose Card 组件提供了一种简单而强大的方式来呈现卡片式的用户界面。 一、什么是 Card 组件 二、基本用法 三、属性和修饰符 四、嵌套使用和复杂布局 一、什么是 Card 组件 Card 是 Jetpack Compose 中的一个常用组件&#xff0c;用于呈现卡片式的用户界面。它…

Javaweb学习路线(3)——SpringBoot入门、HTTP协议与Tomcat服务器

一、SpringBoot入门 &#xff08;一&#xff09;第一个Springboot案例 1、创建Springboot工程&#xff0c;添加依赖。 2、定义类&#xff0c;添加方法并添加注释 3、运行测试。 pom.xml&#xff08;框架自动生成&#xff09; <?xml version"1.0" encoding&quo…

不同等级的Pads工程师,薪资差距有多大?

作为一种广泛应用在PCB设计的EDA工具&#xff0c;Pads软件在中国的电子设计行业中有着重要地位&#xff0c;尤其是不同等级的Pads工程师&#xff0c;在薪资、工作范围等有很大的差异&#xff0c;本文将从中国出发&#xff0c;多方面分析对比不同等级的Pads工程师&#xff0c;希…

24个Jvm面试题总结及答案

1.什么是Java虚拟机&#xff1f;为什么Java被称作是“平台无关的编程语言”&#xff1f; Java虚拟机是一个可以执行Java字节码的虚拟机进程。Java源文件被编译成能被Java虚拟机执行的字节码文件。 Java被设计成允许应用程序可以运行在任意的平台&#xff0c;而不需要程序员为每…

【VMware】虚拟机安装centos7

目录 一、创建虚拟机 1、自定义 2、选择需要安装的操作系统 3、选择虚拟机安装位置 4、选择处理器配置&#xff08;可先默认&#xff09; 5、设置虚拟内存&#xff08;一般4096&#xff09; 6、选择网络连接方式 7、选择I/O控制器 8、选择磁盘类型 9、选择磁盘 10、指定磁盘容…

国内云服务器全面对比

想要领取优惠券购买云服务可以前往我的云服务器领券购买。 经过疫情三年&#xff0c;大多行业开始复苏&#xff0c;企业开始布局以后得发展&#xff0c;云服务器作为企业发展几乎是必须的&#xff0c;一个企业从无到有&#xff0c;要经历很多&#xff0c;比如企业官网搭建&…

解密混沌工程——混沌工程价值

在数字化转型、十四五规划的大背景 下&#xff0c;大规模上云、分布式的核心改造等“云化”逐渐走进企业。 但是&#xff0c;云化的发展&#xff0c;使企业系统的复杂度呈指数级增长&#xff0c;故障越来越多。 企业在数字化转型中拥抱云计算、 信创国产化、分布式核心等新技…

DVWA-Command Injection

大约 命令注入攻击的目的是在易受攻击的应用程序中注入和执行攻击者指定的命令。 在这种情况下&#xff0c;执行不需要的系统命令的应用程序就像一个伪系统外壳&#xff0c;攻击者可能会使用它 作为任何授权的系统用户。但是&#xff0c;命令的执行权限和环境与 Web 服务具有的…

JVM 调优分析 如何进行JVM调优

文章目录 1.为什么需要进行JVM调优&#xff1f;2.什么情况下可能需要JVM调优3.JVM调优参数4.JVM调优参数设置参考5.JVM内部结构1. 类加载器&#xff08;Class Loader&#xff09;2. 运行时数据区&#xff08;Runtime Data Area&#xff09;3. 垃圾收集器&#xff08;Garbage Co…

jmeter如何将上一个请求的结果作为下一个请求的参数

目录 1、简介 2、用途 3、下载、简单应用 4、如何将上一个请求的结果作为下一个请求的参数 1、简介 在JMeter中&#xff0c;可以通过使用变量来将上一个请求的结果作为下一个请求的参数传递。 ApacheJMeter是Apache组织开发的基于Java的压力测试工具。用于对软件做压力测…

adb shell 调试 Android 串口 百度AI也很

在 Android 平台上进行串口调试需要使用 Android Debug Bridge (ADB) 工具。ADB 是一个命令行工具&#xff0c;可以通过 USB 连接 Android 设备&#xff0c;并执行各种命令来调试应用程序。 以下是使用 ADB shell 进行 Android 串口调试的步骤&#xff1a; 连接 Android 设备…

低代码开发平台介绍

低代码开发平台近两年发展迅猛&#xff0c;并迅速渗透到各个细分领域。本文简要介绍低代码开发的概念以及特性&#xff0c;并结合低代码开发的应用场景介绍两个低代码开发平台。 1、低代码开发概念 1.1 低代码开发介绍 低代码开发&#xff08;Low-code Development&#xff0…

3D格式转换工具HOOPS Exchange​助力Zuken打造电子设计自动化产品

行业&#xff1a;电子制造 挑战&#xff1a;对制造商来说&#xff0c;电子设计变得越来越复杂 - 电气和机械设计的融合需要将二维和三维数据结合起来 - 需要提供对多种不同CAD格式的支持 解决方案&#xff1a;HOOPS Exchange是用于快速、准确的CAD数据转换的领先SDK&#xff…