JAVA(SpringBoot)集成Kafka实现消息发送和接收。

SpringBoot集成Kafka实现消息发送和接收。

  • 一、Kafka 简介
  • 二、Kafka 功能
  • 三、POM依赖
  • 四、配置文件
  • 五、生产者
  • 六、消费者

君子之学贵一,一则明,明则有功

一、Kafka 简介

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - 订阅消息系统,以可持久化、高吞吐、低延迟、高容错等特性而著称。
Kafka 主要由生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件构成。生产者负责将数据发送到 Kafka 集群,消费者从集群中读取数据。主题是一种逻辑上的分类,数据被发送到特定的主题。每个主题又可以划分为多个分区,以实现数据的并行处理和提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。

二、Kafka 功能

消息队列功能:Kafka 可以作为消息队列使用,在应用程序之间传递消息。生产者将消息发送到主题,不同的消费者可以从主题中订阅并消费消息,实现应用程序解耦。例如,在电商系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从该主题消费订单消息,各自独立处理,降低模块间的耦合度。
数据存储功能:Kafka 具有持久化存储能力,它将消息数据存储在磁盘上,并且通过多副本机制保证数据的可靠性。即使某个节点出现故障,数据也不会丢失。这种特性使得 Kafka 不仅可以作为消息队列,还能用于数据的长期存储和备份,例如用于存储系统的操作日志,方便后续的数据分析和故障排查。
流处理功能:Kafka 可以与流处理框架(如 Apache Flink、Spark Streaming 等)集成,对实时数据流进行处理。通过将实时数据发送到 Kafka 主题,流处理框架可以从主题中读取数据并进行实时计算、分析和转换。例如,在实时监控系统中,通过 Kafka 收集服务器的性能指标数据,然后使用流处理框架对这些数据进行实时分析,及时发现性能异常并发出警报。

三、POM依赖

    <!-- kafka-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.11</version>
    </dependency>

四、配置文件

spring:
  # Kafka 配置
  kafka:
    # Kafka 服务器地址和端口 代理地址,可以多个
    bootstrap-servers: IP:9092
    # 生产者配置
    producer:
      # 发送失败时的重试次数
      retries: 3
      # 每次批量发送消息的数量,调整为较小值
      batch-size: 1
      # 生产者缓冲区大小
      buffer-memory: 33554432
      # 消息 key 的序列化器,将 key 序列化为字节数组
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息 value 的序列化器,将消息体序列化为字节数组
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      # 当没有初始偏移量或当前偏移量不存在时,从最早的消息开始消费
      auto-offset-reset: earliest
      # 是否自动提交偏移量
      enable-auto-commit: true
      # 自动提交偏移量的时间间隔(毫秒),延长自动提交时间间隔
      auto-commit-interval: 1000
      # 消息 key 的反序列化器,将字节数组反序列化为 key
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消息 value 的反序列化器,将字节数组反序列化为消息体
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

五、生产者


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 生产者
 *
 * @author chenlei
 */
@Slf4j
@Component
public class KafkaProducer {

    /**
     * KafkaTemplate
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 发送消息到指定的 Kafka 主题,并可指定分组信息
     *
     * @param topic   消息要发送到的 Kafka 主题
     * @param message 要发送的消息内容
     */
    public void sendMessage(String topic, String message) {
        // 使用 KafkaTemplate 发送消息,将消息发送到指定的主题
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // 消息发送成功后的处理逻辑,可根据需要添加
                log.info("已发送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                // 消息发送失败后的处理逻辑,使用日志记录异常
                log.error("发送消息=[" + message + "] 失败", ex);
            }
        });
    }
}

六、消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author 消费者
 * chenlei
 */
@Slf4j
@Component
public class KafkaConsumer {

    /**
     * 监听 Kafka 主题方法。
     *
     * @param record 从 Kafka 接收到的 ConsumerRecord,包含消息的键值对
     */
    @KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")
    public void listen(ConsumerRecord<?, ?> record) {
        // 打印接收到的消息的详细信息
        log.info("接收到 Kafka 消息: 主题 = {}, 分区 = {}, 偏移量 = {}, 键 = {}, 值 = {}",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/960106.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Arduino大师练成手册 -- 控制 MH-SD 卡模块

要在 Arduino 上控制 MH-SD 卡模块&#xff0c;你可以按照以下步骤进行&#xff1a; 硬件连接 VCC&#xff1a;连接到 Arduino 的 3.3V 或 5V 引脚&#xff08;根据模块的要求&#xff09;。 GND&#xff1a;连接到 Arduino 的 GND 引脚。 CS&#xff1a;连接到 Arduino 的…

Android OpenGL(六) 纹理

纹理 纹理是一个2D图片&#xff08;甚至也有1D和3D的纹理&#xff09;&#xff0c; 它可以用来添加物体的细节&#xff1b;你可以想象纹理是一张绘有砖块的纸&#xff0c;无缝折叠贴合到你的3D的 房子上&#xff0c;这样你的房子看起来就像有砖墙外表了 纹理环绕方式 纹理坐…

一文大白话讲清楚webpack基本使用——18——HappyPack

文章目录 一文大白话讲清楚webpack基本使用——18——HappyPack1. 建议按文章顺序从头看&#xff0c;一看到底&#xff0c;豁然开朗2. 啥是HappyPack3. 怎么使用HappyPack 一文大白话讲清楚webpack基本使用——18——HappyPack 1. 建议按文章顺序从头看&#xff0c;一看到底&a…

深入 Rollup:从入门到精通(二)Rollup简介

1. 简介 对于将rollup描述为一个打包工具&#xff0c;我觉得并不是很精准&#xff0c;稍有些笼统&#xff0c;未能完全展示它的功能。 我更倾向于将它定义为&#xff1a; 以模块化的方式加载你的 js 文件&#xff08;如果使用插件&#xff0c;也可以支持其他静态资源文件&…

【数据结构】_C语言实现不带头非循环单向链表

目录 1. 链表的概念及结构 2. 链表的分类 3. 单链表的实现 3.1 SList.h头文件 3.2 SList.c源文件 3.3 Test_SList.c测试文件 关于线性表&#xff0c;已介绍顺序表&#xff0c;详见下文&#xff1a; 【数据结构】_顺序表-CSDN博客 本文介绍链表&#xff1b; 基于顺序表…

HTML5 Web Worker 的使用与实践

引言 在现代 Web 开发中&#xff0c;用户体验是至关重要的。如果页面在执行复杂计算或处理大量数据时变得卡顿或无响应&#xff0c;用户很可能会流失。HTML5 引入了 Web Worker&#xff0c;它允许我们在后台运行 JavaScript 代码&#xff0c;从而避免阻塞主线程&#xff0c;保…

Excel 技巧21 - Excel中整理美化数据实例,Ctrl+T 超级表格(★★★)

本文讲Excel中如何整理美化数据的实例&#xff0c;以及CtrlT 超级表格的常用功能。 目录 1&#xff0c;Excel中整理美化数据 1-1&#xff0c;设置间隔行颜色 1-2&#xff0c;给总销量列设置数据条 1-3&#xff0c;根据总销量设置排序 1-4&#xff0c;加一个销售趋势列 2&…

手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion(原理介绍)

手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion&#xff08;原理介绍&#xff09; 目录 手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion&#xff08;原理介绍&#xff09;DDPM 原理图Stable Diffusion 原理Stable Diffusion的原理解释Stable Diffusion 和 Diffus…

倍频增量式编码器--角度插值法输出A,B(Aangular Interpolation)

问题是&#xff1a; 最大速度&#xff0c;周期刻度&#xff0c;最小细分刻度&#xff0c;可以计算得到&#xff1a; 结论&#xff1a; 按照最高速度采样&#xff1b;数字A,B输出间隔时间&#xff1a;按照计算角度 插入细分角度运算算时间&#xff08;最快速度&#xff09;&a…

基于微信小程序的助农扶贫系统设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

嵌入式C语言:结构体对齐

目录 一、对齐的原因 1.1. 硬件访问效率 1.2. 内存管理简化 1.3. 编译器优化 1.4. 代码示例 二、对齐规则 2.1. 基本数据类型对齐 2.2. 结构体成员对齐 2.3. 结构体整体对齐 2.4. 代码示例 三、对齐控制 3.1. 使用 #pragma pack 3.2. 使用 __attribute__((packed)…

网络工程师 (1)数据的表示

一、R进制的表示及互转 R进制是一种数制表示方法&#xff0c;其中R代表基数&#xff0c;采用R个基本符号&#xff08;0, 1, 2, …, R-1&#xff09;表示各位上的数字&#xff0c;并遵循“逢R进一”的运算规则。在R进制中&#xff0c;每一位上的数字均不超过R-1。R进制与十进制之…

coffee销售数据集分析:基于时间趋势分析的实操练习

**文章说明&#xff1a;**对coffee销售数据集的简单分析练习&#xff08;时间趋势分析练习&#xff09;&#xff0c;主要是为了强化利用python进行数据分析的实操能力。属于个人的练习文章。 **注&#xff1a;**这是我第一次使用md格式编辑博客文章&#xff0c;排版上还是不是很…

SQL在DBA手里-改写篇

背景 最近运营需要做月报汇总交易情况&#xff0c;之前一直是他们手工出的数据&#xff0c;他们想做成月初自动发送邮件&#xff0c;从而减轻他们的工作量。于是他们提供SQL我们在邮件服务器配置做定时发送任务。 表介绍&#xff08;表及字段已做脱敏处理&#xff09; trans…

vue3入门基础学习之搭建登录验证功能

环境准备&#xff1a;node.js、Visual Studio Code&#xff08;也可以是其他开发工具&#xff0c;选自己熟悉的就行&#xff09; 下载地址&#xff1a;https://nodejs.p2hp.com/https://code.visualstudio.com/ 新建一个vue3的项目&#xff0c;选一个文件夹执行以下命令 使用…

Scrapy如何设置iP,并实现IP重用, IP代理池重用

前置知识 1/3乐观锁 2/3 Scrapy流程(非全部) 3/3 关于付费代理 我用的"快代理", 1000个ip, 每个ip1min的有效期, 你用的时候, 把你的链接, 用户名填上去就行 设置代理IP &#x1f512; & 帮助文档: ①meta ②meta#proxy$ 语法: ①proxy的设置: Request对象中…

消息队列篇--通信协议篇--网络通信模型(OSI7层参考模型,TCP/IP分层模型)

一、OSI参考模型&#xff08;Open Systems Interconnection Model&#xff09; OSI参考模型是一个用于描述和标准化网络通信功能的七层框架。它由国际标准化组织&#xff08;ISO&#xff09;提出&#xff0c;旨在为不同的网络设备和协议提供一个通用的语言和结构&#xff0c;以…

开源智慧园区管理系统对比五款主流产品探索智能运营新模式

内容概要 在这个数字化迅速发展的时代&#xff0c;园区管理也迎来了全新的机遇和挑战。众所周知&#xff0c;开源智慧园区管理系统作为一种创新解决方案&#xff0c;正逐步打破传统管理的局限性。它的开放性不仅使得系统可以根据具体需求进行灵活调整&#xff0c;也为用户提供…

leetcode——删除链表的倒数第N个节点(java)

给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5] 示例 2&#xff1a; 输入&#xff1a;head [1], n 1 输出&#xff1a;[] 示例 3&#xf…

WPS数据分析000009

一、函数与数据透视表统计数据时效率差异 函数 F4绝对引用 数据透视表 二、数据透视表基础操作 数据透视表&#xff1a;一个快速的生成报表的工具 显示详细信息 方式一; 方式二&#xff1a; 移动数据透视表 删除数据透视表 复制粘贴数据透视表 留足空间&#xff0c;否则拖动字…