SpringBoot 集成 Kafka 高级实现

1、简介

        之前博客中记录了直接使用Kafka客户端实现生产者和消费者之间的交互,这种方式通过设置各种参数编码繁琐,因此通过SpringBoot集成Kafka成为一种常用的实现,下面就详细介绍 SpringBoot 是如何和Kafka进行集成的,本文主要参考官网进行学习(Messaging)。

2、引入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3、生产者
3.1、生产者配置文件
spring:
  kafka:
    bootstrap-servers: node-1:9092,node-2:9092
    producer:
      # 指定key-value 序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 指定缓冲区一批大小,默认16k
      batch-size: *
      # 指定缓冲区总大小,默认32m
      buffer-memory: *
      # 指定消息确认方式
      acks: -1
      # 指定消息发送压缩方式
      compression-type: snappy
      # 配置额外参数
      properties:
        # 自定义分区
        partitioner.class:
        # 指定发送延迟时间,默认0ms
        linger.ms: 
3.2、java代码实现生产者发送消息
public class KafkaProducerController {
    @Autowired
    private KafkaTemplate<String, String> template;

    public String test(){
// 发送消息,send方法有不同的重载
        template.send("topic1", "hello long!");
        return "ok!";
    }
}
3.3、自定义 KafkaTemplate

         通过自定义的 KafkaTemplate 可以快速指定需要的生产者参数,能够做到高度可控,灵活编码。

@SpringBootConfiguration
@ConfigurationProperties(prefix="kafka")
public class KafkaProducerConfig {
    private String bootstrap_servers_config;
    private String retries_config;
    private String batch_size_config;
    private String linger_ms_config;
    private String buffer_memory_config;
    private String topic;
    @Bean
    public KafkaTemplate kafkaTemplate(){
        HashMap<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap_servers_config);
        configs.put(ProducerConfig.RETRIES_CONFIG,retries_config);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG,batch_size_config);
        configs.put(ProducerConfig.LINGER_MS_CONFIG,linger_ms_config);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG,buffer_memory_config);
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,RoundRobinPartitioner.class);
        //设置序列化
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        //设置自定义分区
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configs);
        return new KafkaTemplate(producerFactory);
    }
}
4、消费者
4.1、消费者配置文件
spring:
  kafka:
    bootstrap-servers: node-1:9092,node-2:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test  # 指定消费者组
4.2、消费者java代码实现
@SpringBootConfiguration
public class KafkaConsumerConfig {
    @KafkaListener(topics = {"topic1"})
    public void consumer(String msg){
        System.out.println("consumer massage from kafka: " + msg);
    }
}
4.3、@KafkaListener参数详解

注:topicPartitions和topics、topicPattern不能同时使用

示例如下:

@KafkaListener(id = "test1", // 监听器ID(唯一)
               groupId = "test", // 设置消费者组
               topicPartitions = { // 配置topic和分区:有两个topic,分别为topic1、topic2,topic1只接收分区0,2的消息;
                    // topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为10
                    @TopicPartition(topic = "topic1", partitions = { "0", "2" }),                      
                    @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = 
                                 @PartitionOffset(partition = "1", initialOffset = "9"))},
                    properties = {"enable.auto.commit:false","max.poll.interval.ms:6000"})

参数详解: 

参数描述

 topic

指定要监听哪些topic(与topicPattern、topicPartitions 三选一)
topicPattern匹配Topic进行监听(与topics、topicPartitions 三选一)
topicPartitions显式分区分配
errorHandler异常处理,填写 beanName
properties配置其他属性

注意:

1)、消费者组使用优先级:groupId > id > 配置文件中指定的消费者组。

2)、如果 groupId 不存在,id 存在,但是在注解中将 idIsGroup 设置为 false,则使用配置文件中的消费者组。

3)、@KafkaListener 注解中和配置文件中的相同配置优先级高于配置文件。

4.4、自定义消费者异常
@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}
5、总结

        本文详细介绍 SpringBoot 集成 kafka,举例说明生产者和消费者的使用方式,以及一些自定义参数如何配置,帮助大家进一步熟悉在 SpringBoot 框架下 kafka的使用。关于kafka 幂等性、事务等更高级用法,将会在公众号分享。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/312741.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

响应式Web开发项目教程(HTML5+CSS3+Bootstrap)第2版 例3-4 CSS 立方体

代码 <!doctype html> <html> <head> <meta charset"utf-8"> <title>CSS 立方体</title> <link href"CSS/style.css" rel"stylesheet" type"text/css"> <style> .box {width: 200px…

解密!神奇代码消除 Vue 中 Mac 电脑左滑右滑页面跳转

想知道如何让Mac电脑左滑右滑不再意外跳转页面吗&#xff1f;本文将揭示一个独家秘籍&#xff0c;通过简单的一行代码&#xff0c;让你的用户体验飞速提升&#xff01;别错过这个让你的Vue表格组件更顺畅的宝贵技巧&#xff01; 最近&#xff0c;我在使用 Vue 开发表格组件时遇…

初识Hadoop-概述与关键技术

一.大数据概述 1.什么是大数据 高速发展的信息时代&#xff0c;新一轮科技革命和变革正在加速推进&#xff0c;技术创新日益成为重塑经济发展模式和促进经济增长的重要驱动力量&#xff0c;而“大数据”无疑是核心推动力。 那么&#xff0c;什么是“大数据”呢&#xff1…

odoo linux环境打印乱码或无内容

在odoo打印中会遇到乱码或者无内容显示&#xff0c;需要安装一些包 sudo apt-get install ttf-wqy-zenhei sudo apt-get install ttf-wqy-microhei安装前 安装后

Oladance、南卡、Cleer开放式耳机怎么样?全方位测评大PK!

​开放式耳机作为新兴的音频设备领域中备受欢迎的选择&#xff0c;但市场上琳琅满目的产品汇集了质量千差万别的耳机&#xff0c;其中存在着一些粗制滥造的产品。身为一位音频设备测评博主&#xff0c;我经常收到有关哪个品牌的开放式耳机质量好的疑问。面对市面上众多选择&…

数据结构(三)堆和哈希表

目录 哈希表和堆什么是哈希表 &#xff1f;什么是堆 &#xff1f;什么是图 &#xff1f;案例一&#xff1a;使用python实现最小堆案例二 &#xff1a; 如何用Python通过哈希表的方式完成商品库存管理闯关题 &#xff08;包含案例三&#xff1a;python实现哈希表&#xff09; 本…

谷歌浏览器安装不在默认安装位置Selenium无法打开解决方法

Selenium之cannot find Chrome binary错误-CSDN博客 上面是我找的解决方案的链接 通过option.setBinary()的方法来指定谷歌浏览器的实际运行文件路径&#xff1b; 下面是结合我这边具体情况下写的代码 option.setBinary()中的路径是谷歌浏览器运行文件的路径&#xff1b;Sy…

SGX Enclave Measurement

文章目录 前言一、简介二、Measuring ECREATE三、Measuring Enclave Attributes四、Measuring EADD五、Measuring EEXTEND六、Measuring EINIT 前言 本文来自 Intel SGX Explained 一、简介 SGX&#xff08;Intel Software Guard Extensions&#xff09;实现了一种软件认证方…

Java多线程并发篇----第八篇

系列文章目录 文章目录 系列文章目录前言一、简述一下你对线程池的理解二、线程生命周期(状态)三、新建状态(NEW)四、就绪状态(RUNNABLE)五、运行状态(RUNNING)前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站…

基于Python的在线考试系统-计算机毕业设计源码78268

摘 要 本论文主要论述了如何使用python语言、Django框架开发一个在线考试系统&#xff0c;本系统将严格按照软件开发流程&#xff0c;进行各个阶段的工作&#xff0c;面向对象编程思想进行项目开发。在引言中&#xff0c;作者将论述该系统的当前背景以及系统开发的目的&#xf…

Memory Wall in Neural Network Inference

Memory Wall in Neural Network Inference 神经网络推理的瓶颈在于访存带宽&#xff0c;通常无法发挥出加速器的全部算力。本文总结了目前常用的推理加速器及其设计&#xff0c;并分析了常用神经网络的访存瓶颈。文章大部分内容参考自Computer Architecture: A Quantitative A…

审稿变慢?还疯狂拒稿?这本毕业神刊如今争议不断,还值得一投吗?

【SciencePub学术】 IEEE ACCESS 期刊评说 网友辣评 评说1&#xff1a;麻了&#xff0c;11月17收到外审&#xff0c;现在意见还没回来啊&#xff0c;神刊肿么了&#xff1f; 评说2&#xff1a;两个审稿人评审的&#xff0c;一个拒绝&#xff08;最终意见大修&#xff09;&…

SpringBoot 把PageHelper分页信息返回给前端

第1步&#xff1a;定义线程容器收纳HttpHeaders和HttpStatus import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus;public class ResponseUtils {private static ThreadLocal<HttpHeaders> ThreadLocalHeaders new InheritableT…

07- OpenCV:模糊图像

目录 一、模糊原理 二、模糊的相关处理方法&#xff1a; 1、均值滤波&#xff08;归一化盒子滤波&#xff09; 2、高斯滤波&#xff08;正态分布的形状&#xff09; 3、中值模糊 4、双边模糊算法&#xff08;美容软件&#xff09; 5、相关代码&#xff1a; 6、几种模糊算法的比…

网页设计达人的首选!这6款顶级工具助你设计完美网页!

即时设计 即时设计是国内为当地设计师量身定制的完全免费的网页设计工具。是集成原型、设计、交互、交付等所有网页设计需求的一站式设计平台。内部集成了大量优秀的插件&#xff0c;包括组件、图标、字体、色板、填充等功能&#xff0c;基本涵盖了网页设计师常用的大部分工具…

Redis常见命令

我们可以通过Redis的中文文档&#xff1a;Redis命令中心&#xff08;Redis commands&#xff09; -- Redis中国用户组&#xff08;CRUG&#xff09;&#xff0c;来学习各种命令。 也可以通过菜鸟教程官网来学习&#xff1a;Redis 键(key) | 菜鸟教程 一、Redis数据结构介绍 Red…

Elasticsearch--Master选举

角色 主节点&#xff08;active master&#xff09;&#xff1a;一般指的是活跃的主节点&#xff0c;避免负载任务&#xff0c;主节点主要用来管理集群&#xff0c;专用master节点仍将充当协调节点 候选节点&#xff08;master-eligible nodes&#xff09;&#xff1a;默认具备…

堆叠线:实现高效连接和数据传输的利器

堆叠线是一种常见的网络连接解决方案&#xff0c;主要应用于数据中心和企业网络等领域。本文将介绍堆叠线的定义、分类、作用以及与光纤线的区别&#xff0c;同时提供详细的堆叠线接法和相关问题的解答。 第一部分&#xff1a;堆叠线是什么 堆叠线是一种用于连接网络设备的高…

一行代码给Button添加一个光标焦点动画:得着焦点按钮放大,失去焦点按钮恢复

当光标进入Button的时候&#xff0c;也就是Button得着焦点时&#xff0c;Button出现放大效果&#xff0c;失去焦点的时候&#xff0c;恢复原来的尺寸。 本例仅供学习交流之用 一、效果 按钮得着焦点&#xff0c;放大 按钮失去焦点&#xff0c;恢复 二、给按钮添加动效 得着…

vbs读取数据库值前端FlexGrid前导0出不来的原因

vbs读取数据库值前端FlexGrid前导0出不来的原因 原因 系统设置问题 解决 修改系统默认数值显示&#xff1a; 1&#xff09;控制面板找到“区域”&#xff0c;点击“更改日期、时间和数字模式”&#xff0c;在弹出窗口点击“其他设置” 2&#xff09;在数字一栏中的“显示前…