SpringBoot日常:集成Kafka

文章目录

      • 1、pom.xml文件
      • 2、application.yml
      • 3、生产者配置类
      • 4、消费者配置类
      • 5、消息订阅
      • 6、生产者发送消息
      • 7、测试发送消息

本章内容主要介绍如何在springboot项目对kafka进行整合,最终能达到的效果就是能够在项目中通过配置相关的kafka配置,就能进行消息的生产和消费。

1、pom.xml文件

原本项目用 Spring Boot 的版本为2.6.X,所以这里用spring-cloud-starter-stream-kafka的版本用的是2.2.1.RELEASE,也可以用其他版本,但是注意兼容性,不然会编译运行报错

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.2</version>  <!-- 确保与 Spring Boot 2.6.x 兼容 -->
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
	<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <version>2.2.1.RELEASE</version>
    </dependency>
</dependencies>

2、application.yml

添加kafka的相关配置

spring:
  kafka:
    bootstrap-servers: 192.168.102.179:9092
    producer:
      acks: 1
      retries: 0
      batch-size: 30720000
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消费者配置
    consumer:
      group-id: test-kafka
      #是否开启手动提交 默认自动提交
      enable-auto-commit: true
      #如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔
      auto-commit-interval: 5000
      #earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #一次调用 poll() 操作时返回的最大记录数 默认为 500 条
      max-poll-records: 2
      #kafka session timeout
      session:
        timeout:
          ms: 300000
    listener:
      #kafka 没有创建指定的 topic 下  项目启动是否报错 true  false
      missing-topics-fatal: false
      #Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
      type: single
      ack-mode: manual_immediate

3、生产者配置类

添加一个生产者配置类KafkaProducerConfig ,主要设置消息的序列化方式等消息处理方式

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 码至终章
 * @Date 2025/1/8 11:33
 * @Version 1.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean("myProducerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(4);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }

}

4、消费者配置类

创建一个消费者配置类KafkaConsumerConfig,主要设置一些消息的接收处理配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 码至终章
 * @Date 2025/1/8 12:09
 * @Version 1.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitIntervalMs;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean("myConsumerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(12);
        //是否自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //kafak 服务器
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        //消费组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //一次调用poll()操作时返回的最大记录数,默认值为500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //自动提交时间间隔 默认 5秒
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        return props;
    }


    /**
     * 消费者工厂
     */
    @Bean("myContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
        // 并发创建的消费者数量
        factory.setConcurrency(3);
        // 开启批处理
        factory.setBatchListener(true);
        //拉取超时时间
        factory.getContainerProperties().setPollTimeout(1500);
        //是否自动提交 ACK kafka 默认是自动提交
        if (!enableAutoCommit) {
            //共有其中方式
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        }
        return factory;
    }
}

5、消息订阅

创建一个消费者监听消息类,里面对主题消息监听,这里的测试主题为testone

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Author 码至终章
 * @Date 2025/1/8 14:19
 * @Version 1.0
 */
@Slf4j
@Component
public class MyKafkaConsumer {

    @KafkaListener(id = "my-kafka-consumer",
            idIsGroup = false, topics = "topicone",
            containerFactory = "myContainerFactory")
    public void listen(String message) {
        log.info("接收到主题消息,消息内容:{}", message);
    }
}

6、生产者发送消息

为了方便调用测试,这里在controller编写一个方法发送消息

@RestController
@Slf4j
public class TestController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping ("/sendMessage")
    public void sendMessage(@RequestParam String message) {
        this.kafkaTemplate.send("topicone", message);
    }
}

7、测试发送消息

这里简单用postman调用接口发送一条消息
在这里插入图片描述
从idea的程序控制台可以看到消费者监听可以正常接收到消息
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/950733.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式IO模块:激光切割机产线高效控制的创新引擎

在智能制造的浪潮中&#xff0c;激光切割技术以其高精度、高效率的特点&#xff0c;成为了现代工业生产中不可或缺的一部分。特别是在汽车制造、航空航天、电子设备及精密零部件加工等领域&#xff0c;激光切割机以其无与伦比的切割精度和灵活性&#xff0c;引领着制造业的转型…

RK3562编译Android13 ROOT固件教程,触觉智能开发板演示

本文介绍编译Android13 ROOT权限固件的方法&#xff0c;触觉智能RK3562开发板演示&#xff0c;搭载4核A53处理器&#xff0c;主频高达2.0GHz&#xff1b;内置独立1Tops算力NPU&#xff0c;可应用于物联网网关、平板电脑、智能家居、教育电子、工业显示与控制等行业。 关闭seli…

wireshark抓包工具新手使用教程

wireshark抓包工具新手入门使用教程 一、Wireshark软件安装二、Wireshark 抓包示范三、Wireshakr抓包界面四、Wireshark过滤器设置五、wireshark过滤器表达式的规则六、Wireshark抓包分析TCP三次握手七、Wireshark分析常用列标签格式 Wireshark是一款开源的网络协议分析工具&am…

如何用Python编程实现自动整理XML发票文件

传统手工整理发票耗时费力且易出错&#xff0c;而 XML 格式发票因其结构化、标准化的特点&#xff0c;为实现发票的自动化整理与保存提供了可能。本文将详细探讨用python来编程实现对 XML 格式的发票进行自动整理。 一、XML 格式发票的特点 结构化数据&#xff1a;XML 格式发票…

【网络安全 | 漏洞挖掘】HubSpot 全账户接管(万字详析)

未经许可,不得转载。 今天我们将分享一个关于在 Bugcrowd 平台的 HubSpot 公共漏洞赏金计划中实现全账户接管的故事。 文章目录 正文SQL 注入主机头污染(Host Header Poisoning)负载均衡器主机头覆盖(Load Balancer Host Header Override)Referer Header 测试ORIGIN Heade…

2025_0105_生活记录

3号去内蒙看了流星雨。还记得上次看流星的时间是2018年&#xff0c;也是冬天&#xff0c;大家在雁栖湖校区的操场上仰望星空。那个时候幸运的看到了一颗流星&#xff0c;便迅速地在心里许愿。这次看到了三颗流星&#xff0c;我也许了愿&#xff0c;希望实现。 24年走过了十多个…

(四)ROS通信编程——服务通信

前言 学完了话题通信其实操作流程基本都已经很熟悉了&#xff0c;因此服务通讯的学习就会流畅许多。 服务通信也是ROS中一种极其常用的通信模式&#xff0c;服务通信是基于请求响应模式的&#xff0c;是一种应答机制。也即: 一个节点A向另一个节点B发送请求&#xff0c;B接收…

《繁星路》V1.8.3(Build16632266)官方中文学习版

《繁星路》官方中文版https://pan.xunlei.com/s/VODae2_2Z3QyMF02I5y321uHA1?pwdqgsh# 作为一款星际模拟游戏&#xff0c;完美融合了硬科幻元素与基地建设玩法&#xff0c;体验改造行星的恢弘与壮阔。化身人工意识AMI&#xff0c;遵照基本指示推进火星改造的各项工作&#xf…

学习threejs,导入wrl格式的模型

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.VRMLLoader wrl模型加…

Element-plus、Element-ui之Tree 树形控件回显Bug问题。

需求&#xff1a;提交时&#xff0c;需要把选中状态和半选中状态 的数据id提交。如图所示&#xff1a; 数据回显时&#xff0c;会出现代码如下&#xff1a; <template><el-tree ref"treeRef" :data"data" show-checkbox node-key"id" …

专家混合(MoE)大语言模型:免费的嵌入模型新宠

专家混合&#xff08;MoE&#xff09;大语言模型&#xff1a;免费的嵌入模型新宠 今天&#xff0c;我们深入探讨一种备受瞩目的架构——专家混合&#xff08;Mixture-of-Experts&#xff0c;MoE&#xff09;大语言模型&#xff0c;它在嵌入模型领域展现出了独特的魅力。 一、M…

cursor试用出现:Too many free trial accounts used on this machine 的解决方法

文章精选推荐 1 JetBrains Ai assistant 编程工具让你的工作效率翻倍 2 Extra Icons&#xff1a;JetBrains IDE的图标增强神器 3 IDEA插件推荐-SequenceDiagram&#xff0c;自动生成时序图 4 BashSupport Pro 这个ides插件主要是用来干嘛的 &#xff1f; 5 IDEA必装的插件&…

若依 ruoyi-vue HandlerInterceptor 拦截器 文件接口自定义权限

文件资源添加自定义权限 package com.huida.framework.config;import com.huida.framework.interceptor.FileInterceptor; import com.huida.framework.interceptor.RequestInterceptor; import org.springframework.beans.factory.annotation.Autowired; import org.springfr…

网络安全-web渗透环境搭建-BWAPP(基础篇)

01--所需系统环境&#xff1a; 虚拟主机系统部署&#xff08;vmware&#xff0c;虚拟主机创建、虚拟主机网络配置&#xff08;桥接&#xff0c;便于网络中多个主机都能访问虚拟主机&#xff09;、虚拟软件功能&#xff0c;快照、克隆、镜像文件加载&#xff0c;ova文件制作&am…

【计算机网络】课程 实验四 配置快速生成树协议(RSTP)

实验四 配置快速生成树协议&#xff08;RSTP&#xff09; 一、实验目的 1&#xff0e;理解快速生成树协议RSTP的工作原理。 2&#xff0e;掌握如何在交换机上配置快速生成树。 二、实验分析与设计 【背景描述】 某学校为了开展计算机教学和网络办公&#xff0c;建立了一个计…

API架构风格的深度解析与选择策略:SOAP、REST、GraphQL与RPC

❃博主首页 &#xff1a; 「码到三十五」 &#xff0c;同名公众号 :「码到三十五」&#xff0c;wx号 : 「liwu0213」 ☠博主专栏 &#xff1a; <mysql高手> <elasticsearch高手> <源码解读> <java核心> <面试攻关> ♝博主的话 &#xff1a…

目标检测文献阅读-DETR:使用Transformer进行端到端目标检测

目录 摘要 Abstract 1 引言 2 DETR结构 2.1 Backbone 2.2 Encoder 2.3 Decoder 2.4 FFN 3 目标检测集合预测损失 3.1 二分图匹配损失 3.2 损失函数 总结 摘要 本周阅读的论文题目是《End-to-End Object Detection with Transformers》(使用Transformer进行端到端目…

服务器双网卡NCCL通过交换机通信

1、NCCL变量设置 export CUDA_DEVICE_MAX_CONNECTIONS1 export NCCL_SOCKET_IFNAMEeno2 export NCCL_IB_DISABLE0 #export NCCL_NETIB export NCCL_IB_HCAmlx5_0,mlx5_1 export NCCL_IB_GID_INDEX3 export NCCL_DEBUGINFOGPUS_PER_NODE4MASTER_ADDR192.168.1.2 MASTER_PORT600…

B树及其Java实现详解

文章目录 B树及其Java实现详解一、引言二、B树的结构与性质1、节点结构2、性质 三、B树的操作1、插入操作1.1、插入过程 2、删除操作2.1、删除过程 3、搜索操作 四、B树的Java实现1、节点类实现2、B树类实现 五、使用示例六、总结 B树及其Java实现详解 一、引言 B树是一种多路…

数据分析思维(八):分析方法——RFM分析方法

数据分析并非只是简单的数据分析工具三板斧——Excel、SQL、Python&#xff0c;更重要的是数据分析思维。没有数据分析思维和业务知识&#xff0c;就算拿到一堆数据&#xff0c;也不知道如何下手。 推荐书本《数据分析思维——分析方法和业务知识》&#xff0c;本文内容就是提取…