Spring Boot 中整合 Kafka

在 Spring Boot 中整合 Kafka 非常简单,Spring Kafka 提供了丰富的支持,使得我们可以轻松地实现 Kafka 的生产者和消费者。下面是一个简单的 Spring Boot 整合 Kafka 的示例。

1. 添加依赖

首先,在 pom.xml 中添加 Spring Kafka 的依赖:

xml

复制

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Spring Boot Starter Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Spring Kafka Test -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

运行 HTML

2. 配置 Kafka

在 application.properties 或 application.yml 中配置 Kafka 的连接信息:

properties

复制

# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

3. 创建 Kafka 生产者

创建一个 Kafka 生产者,用于发送消息到 Kafka 主题:

java

复制

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

4. 创建 Kafka 消费者

创建一个 Kafka 消费者,用于从 Kafka 主题中接收消息:

java

复制

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

5. 创建控制器

创建一个控制器,用于发送消息到 Kafka 主题:

java

复制

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaProducer.sendMessage("my-topic", message);
        return "Message sent: " + message;
    }
}

6. 启动应用程序

创建一个 Spring Boot 应用程序启动类:

java

复制

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}

7. 运行应用程序

启动 Spring Boot 应用程序,然后访问 http://localhost:8080/send?message=Hello,你将看到控制台输出 Received message: Hello

总结

通过以上步骤,我们成功地在 Spring Boot 中整合了 Kafka,并实现了消息的生产和消费。Spring Kafka 提供了简单易用的 API,使得我们可以轻松地与 Kafka 进行交互。你可以根据实际需求进一步扩展和优化这个示例。

在 Kafka 中,消费者消费消息失败后是否重新入队(重回队列)取决于消费者的配置和处理逻辑。Kafka 本身并没有直接提供“消费失败重回队列”的功能,但可以通过一些策略来实现类似的效果。

Kafka 消费者的基本行为

  1. 自动提交偏移量:默认情况下,Kafka 消费者会自动提交偏移量。如果消息处理失败,偏移量仍然会被提交,导致消息丢失。

  2. 手动提交偏移量:通过配置 enable.auto.commit=false,消费者可以手动提交偏移量。这样可以在消息处理成功后再提交偏移量,确保消息不会丢失。

实现消费失败重回队列的策略

1. 手动提交偏移量

通过手动提交偏移量,可以在消息处理成功后再提交偏移量,从而避免消息丢失。如果消息处理失败,可以选择不提交偏移量,让消费者在下一次拉取消息时重新处理该消息。

java

复制

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            // 处理消息
            processMessage(record.value());
            // 处理成功后提交偏移量
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 处理失败,不提交偏移量,让消费者在下一次拉取时重新处理该消息
            System.err.println("Message processing failed: " + e.getMessage());
        }
    }

    private void processMessage(String message) {
        // 模拟消息处理逻辑
        if (message.contains("error")) {
            throw new RuntimeException("Simulated error");
        }
        System.out.println("Processed message: " + message);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/882294.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

论文集搜索网站-dblp 详细使用方法

分享在dblp论文集中的两种论文搜索方式&#xff1a;关键字搜索&#xff0c;指定会议/期刊搜索。 关键字搜索 进入dblp官方网址dblp: computer science bibliography&#xff0c;直接在上方搜索栏&#xff0c;搜索关键字&#xff0c;底下会列出相关论文。 指定会议/期刊搜索 …

三菱FX5U PLC故障处理(各种出错的内容、原因及处理方法进行说明。)

对使用系统时发生的各种出错的内容、原因及处理方法进行说明。 故障排除的步骤 发生故障时&#xff0c;按以下顺序实施故障排除。 1.确认各模块是否正确安装或正确配线。 2、确认CPU模块的LED。 3.确认各智能功能模块的LED。(各模块的用户手册) 4、连接工程工具&#xff0c;启…

从数据仓库到数据中台再到数据飞轮:我了解的数据技术进化史

这里写目录标题 前言数据仓库&#xff1a;数据整合的起点数据中台&#xff1a;数据共享的桥梁数据飞轮&#xff1a;业务与数据的双向驱动结语 前言 在当今这个数据驱动的时代&#xff0c;企业发展离不开对数据的深度挖掘和高效利用。从最初的数据仓库&#xff0c;到后来的数据…

828华为云征文|华为Flexus云服务器搭建Cloudreve私人网盘

一、华为云 Flexus X 实例&#xff1a;开启高效云服务新篇&#x1f31f; 在云计算的广阔领域中&#xff0c;资源的灵活配置与卓越性能犹如璀璨星辰般闪耀。华为云 Flexus X 实例恰似一颗最为耀眼的新星&#xff0c;将云服务器技术推向了崭新的高度。 华为云 Flexus X 实例基于…

使用SpringCloud构建可伸缩的微服务架构

Spring Cloud是一个用于构建分布式系统的开源框架。它基于Spring Boot构建&#xff0c;并提供了一系列的工具和组件&#xff0c;用于简化开发分布式系统的难度。Spring Cloud可以帮助开发人员快速构建可伸缩的微服务架构。 要使用Spring Cloud构建可伸缩的微服务架构&#xff0…

对接阿里asr和Azure asr

1&#xff1a;对接阿里asr 1.1&#xff1a;pom <dependency><groupId>com.alibaba.nls</groupId><artifactId>nls-sdk-recognizer</artifactId><version>2.2.1</version> </dependency>1.2&#xff1a;生成token package c…

C++之STL—vector容器基础篇

头文件 #include <vector> //vector容器 #include <algorithm> //算法 基本用法&&概念 vector<int> v; v.push_back(10); vector<int >::iterator v.begin(); v.end(); 三种遍历方式 #include <vector> #include <algorithm>…

基于区块链的相亲交易系统源码解析

随着区块链技术的成熟与发展&#xff0c;其去中心化、不可篡改的特性逐渐被应用于各行各业。特别是在婚恋市场中&#xff0c;区块链技术的应用为相亲平台带来了新的可能性 。本文将探讨如何利用区块链技术构建一个透明、高效的相亲交易系统&#xff0c;并提供部分源码示例。 区…

大模型的实践应用30-大模型训练和推理中分布式核心技术的应用

大家好,我是微学AI,今天给大家介绍一下大模型的实践应用30-大模型训练和推理中分布式核心技术的应用。本文深入探讨了大模型训练和推理中分布式核心技术的应用。首先介绍了项目背景,阐述了大模型发展对高效技术的需求。接着详细讲解了分布式技术的原理,包括数据并行、模型并…

数据转换器——佛朗哥Chater2

【注:本文基于《数据转换器》一书进行学习、总结编撰,适合新手小白进行学习】 目录 2.1 数据转换器类别 2.2 工作条件 2.3 转换器性能参数 2.3.1 基本特性参数 2.4 静态性能参数 2.5 动态性能参数 2.6 数字和开关性能参数 2.1 数据转换器类别 转换器类型可以被分为两…

英飞凌TC3xx -- Bootstrap Loader分析

目录 1.Bootstrap Loaders作用 2.CAN BSL详解 2.1 CAN BSL的时钟系统 2.2 CAN BSL流程 3.小结 英飞凌TC3xx的Platform Firmware章节里&#xff0c;提供了多种启动模式&#xff1a; Internal start from Flash&#xff1a;b111Alternate Boot Mode&#xff1a;b110Generic …

杀软对抗 ---> Perfect Syscall??

好久没更了&#xff0c;今天想起来更新了&#x1f60b;&#x1f60b;&#x1f60b;&#x1f60b; 目录 1.AV && EDR 2.Perfect Syscall&#xff1f;&#xff1f; 3.Truly Perfect ??? 在开始之前先来展示一下这次的免杀效果 1.AV && EDR 360 天擎EDR …

[c++进阶(九)] STL之deque深度剖析

1.前言 本章重点 本章将会着重的介绍deque底层到底是如何实现它能够双向进出的&#xff0c;并且双向进出的消耗率还特别低&#xff0c;并且讲解deque的优缺点。 2.deque的使用 如果没有看我前面两篇文章的&#xff0c;请先看前面两篇文章再来看这篇文章&#xff0c;可以有助于…

手写Spring第三篇,原来Spring容器是使用反射来初始化对象的

上次是不是你小子和大家说你拿来做登记的样品被我收了&#xff0c;然后取豆子的时候就是这个样品的&#xff1f; 今天我来辟一下谣&#xff0c;真的是这样的。这小子的样品确实被我收了&#xff0c;不过这小子没给真东西给我&#xff0c;只给了一个指针&#xff0c;害我宝贝得存…

Git rebase 的使用(结合图与案例)

目录 Git rebase 的使用Git rebase 概念Git rebase 原理rebase和merge的选择 Git rebase 的使用 在 Git 中整合来自不同分支的修改主要有两种方法&#xff1a;merge 以及 rebase Git rebase 概念 **rebase概念&#xff1a;**用来重新应用提交&#xff08;commits&#xff09…

Llama 3.1 技术研究报告-1

llama3模型 现代⼈⼯智能&#xff08;AI&#xff09;系统由基础模型驱动。本⽂介绍了⼀组新的基础模型&#xff0c;称为Llama 3。它是⼀个语⾔模型群&#xff0c;原⽣⽀持多语⾔性、编码、推理和⼯具使⽤。我们最⼤的模型是⼀个密集变换器&#xff0c;有 405B个参数&#xff0…

oracle 插入date日期类型的数据、插入从表中查出的数据,使用表中的默认数据

date sysdate to_date 插入从表中查出的数据 方式一 方式二 或者指定列名称 下边这个案例的前提是指定列插入&#xff0c;如果不指定&#xff0c;则也是默认的

消息中间件---Kafka

一、什么是Kafka&#xff1f; Kafka是一个分布式流处理平台,类似于消息队列或企业消息传递系统&#xff1b; 流处理事什么呢&#xff1f; 流处理就是数据处理工作流&#xff0c;本质上是一种计算机编程范例。流处理是对接收到的新数据事件的连续处理。‌它涉及对从生产者到消…

HTML+CSS学习笔记

目录 HTML 1.开发环境 2.创建HTML文件 3.HTML元素 3.1HTML文件结构 3.2HTML标签 3.3HTML属性​编辑​编辑 3.4HTML区块 3.4.1块元素 3.4.2行内元素 3.5HTML表单 CSS 1.CSS简介 2.CSS语法​编辑 3.CSS三种导入方式 内联样式 内部样式 外部样式 4.选择器​ 5.C…

9月23日

思维导图 作业 统计家目录下.c文件的个数 #!/bin/bashnum0for file in ~/*.c; doif [ -f "$file" ]; then((num))fi doneecho "家目录下.c文件的个数: $num"