SpingBoot集成kafka发送读取消息

SpingBoot集成kafka开发

    • kafka的几个常见概念
  • 1、springboot和kafka对应版本(重要)
  • 2、创建springboot项目,引入kafka依赖
    • 2.1、生产者EventProducer
    • 2.2、消费者EventConsumer
    • 2.3、启动生产者的方法SpringBoot01KafkaBaseApplication
    • 2.4、application.yml
    • 2.5、pom.xml
    • 2.6、启动springboot项目的启动类(Application)报错
  • 3、springboot集成kafka读取最早的消息
    • 3.1、如何设置消费者auto-offset-reset: earliest
    • 3.2、设置消费者auto-offset-reset: earliest后存在的问题
      • 3.2.1、修改消费组ID
      • 3.2.2、手动重置偏移量
          • 3.2.2.1、手动将偏移量设置为最早
          • 3.2.2.2、手动将偏移量设置为最新

kafka的几个常见概念

在这里插入图片描述

1、springboot和kafka对应版本(重要)

https://spring.io/projects/spring-kafka

在这里插入图片描述

2、创建springboot项目,引入kafka依赖

在这里插入图片描述

2.1、生产者EventProducer

package com.power.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void sendEvent(){
        kafkaTemplate.send("hello-topic","hello kafka");
    }
}

2.2、消费者EventConsumer

package com.power.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    //采用监听的方式接收事件(消息,数据)
    @KafkaListener(topics = {"hello-topic"},groupId="hello-group")
    public void onEvent(String event){
        System.out.println("读取到的事件:"+event);
    }
}

2.3、启动生产者的方法SpringBoot01KafkaBaseApplication

执行一次该方法,会调用一次生产者发送一次消息。
即每执行一次,会调用EventProducer类下的sendEvent方法一次。

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void test01(){
        eventProducer.sendEvent();
    }
}

2.4、application.yml

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的服务器ip>:9092
    #配置生产者(有24个配置)
    #producer:
    #配置消费者(有24个配置)
    #consumer:

启动服务后发现报错:
在这里插入图片描述

2.5、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath />
    </parent>

    <groupId>org.powernode</groupId>
    <artifactId>spring-boot-01-kafka-base</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkaSpringBootProject</name>
    <description>kafka project for Spring Boot</description>

    <properties>
        <java.version>8</java.version>
    </properties>


    <repositories>
        <repository>
            <id>central</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <layout>default</layout>
            <!-- 是否开启发布版构件下载 -->
            <releases>
                <enabled>true</enabled>
            </releases>
            <!-- 是否开启快照版构件下载 -->
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.8.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2.6、启动springboot项目的启动类(Application)报错

项目启动类

package com.power;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
		System.out.println("启动成功--------------------------");
	}
}

修改server.properties配置文件:
在这里插入图片描述修改前:

在这里插入图片描述
修改后:
在这里插入图片描述

3、springboot集成kafka读取最早的消息

已经被消费者读取/消费的消息,无法被新启动的消费组消息的,那么新启动的消费组该如何读取最早的消息呢,可以通过设置消费者auto-offset-reset: earliest去实现。
在这里插入图片描述

3.1、如何设置消费者auto-offset-reset: earliest

在这里插入图片描述

1、修改application.yml
在这里插入图片描述

3.2、设置消费者auto-offset-reset: earliest后存在的问题

在这里插入图片描述

3.2.1、修改消费组ID

原消费组ID
在这里插入图片描述
修改后的消费组ID
在这里插入图片描述4、新的消费组ID成功读取到之前的消息
在这里插入图片描述

3.2.2、手动重置偏移量

3.2.2.1、手动将偏移量设置为最早
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute

来到kafka安装目录下:

在这里插入图片描述执行如下命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-earliest --execute

执行后报错

在这里插入图片描述
需要先停掉服务,在去手动重置偏移量,此时重置偏移量成功,偏移量为0

3.2.2.2、手动将偏移量设置为最新
#示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-latest --execute

设置成功,此时偏移量已为最新:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/871564.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

蓝队技能-应急响应篇挖矿病毒系统样本家族威胁情报异常定性排查分析处置封锁

知识点 1、应急响应-挖矿病毒-定性&排查 2、应急响应-挖矿病毒-应急&处置演示案例-蓝队技能-挖矿病毒-样本&定性&排查&应急&处置 挖矿病毒 随着虚拟货币的疯狂炒作&#xff0c;挖矿病毒已经成为不法分子利用最为频繁的攻击方式之一。 可以利用个人电…

[论文笔记]Improving Retrieval Augmented Language Model with Self-Reasoning

引言 今天带来一篇百度提出的关于提升RAG准确率的论文笔记&#xff0c;Improving Retrieval Augmented Language Model with Self-Reasoning。 为了简单&#xff0c;下文中以翻译的口吻记录&#xff0c;比如替换"作者"为"我们"。 检索增强语言模型(Retrie…

你应该停止使用的 7 个已弃用的 Python 库

欢迎来到雲闪世界。升级您的 Python 工具包&#xff1a;发现 7 个应停止使用的过时库以及替代它们的功能。最近&#xff0c;我回顾了 Python 的新特性&#xff0c;发现每个版本都引入了创新&#xff0c;使我们的日常开发工作变得更加轻松。 这让我意识到科技是一门永无止境的艺…

有了这4款工具,你就知道电脑怎么录屏了!

电脑屏幕录屏这个问题很多人都会碰到&#xff0c;比如教学视频录制&#xff0c;游戏技巧分享&#xff0c;软件操作演示等等。因为场景众多&#xff0c;电脑自带的录屏功能不一定能满足&#xff0c;所以借助第三方工具是一个很有效的办法。如果大家不知道如何录屏&#xff0c;可…

I2C学习:上拉电阻选取

一&#xff0e;I2C简介 I2C总线是由Philips公司开发的一种简单、双向二线制同步串行总线。I2C总线在使用时&#xff0c;需要接上拉电阻&#xff0c;这是因为I2C接口是开漏输出&#xff0c;如图1所示。 图1 I2C开漏输出 I2C有5种速度模式&#xff1a;标准&#xff08;100KHz&am…

在亚马逊云科技上安全、合规地创建AI大模型训练基础设施并开发AI应用服务

项目简介&#xff1a; 小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案&#xff0c;帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践&#xff0c;并应用到自己的日常工作里。 本次介绍的是如何在亚马逊云科技利用Servi…

就业c++02 随处可见红黑树

通过key来比较节点插入哪个地方 一种key value 另一种顺序执行 比如查找小于50的数字在左面还是在右面 访问那个资源 他的次数是多少构建了 资源key 次数 value 海量的io 过来访问 知道哪一个io 就是key value查找 根据黑色高度的差异 红色节点和红色节点是不相邻的

Python数据分析:数据可视化(Matplotlib、Seaborn)

数据可视化是数据分析中不可或缺的一部分&#xff0c;通过将数据以图形的方式展示出来&#xff0c;可以更直观地理解数据的分布和趋势。在Python中&#xff0c;Matplotlib和Seaborn是两个非常流行和强大的数据可视化库。本文将详细介绍这两个库的使用方法&#xff0c;并附上一个…

OceanMind海睿思入选《2024中国企业数智化转型升级服务全景图/产业图谱》

近日&#xff0c;国内知名数据智能产业创新服务媒体数据猿携手上海大数据联盟发布了《2024中国企业数智化转型升级服务全景图/产业图谱1.0版》。中新赛克海睿思从数千家企业中脱颖而出&#xff0c;成功入选「底层技术服务 - 大数据」细分领域。 在历经数月的时间里&#xff0c;…

实现Bezier样条曲线

1.给出n1 个控制点pk(xk,yk,zk),这里k可取值0-n,多项式函数公式如下 获取的单个点的代码 void zmBezier::getPoint(float u, double p[3]) {int n m_count - 1;double x 0, y 0, z 0;for(int k 0; k < n; k){x m_ctrlPoints[k][0] * BEZ_k_n(n, k, u);y m_ctrlPoin…

《机器学习》—— 使用过采样方法实现逻辑回归分类问题

文章目录 一、什么是过采样方法&#xff1f;二、使用过采样方法实现逻辑回归分类问题三、过采样的优缺点 本篇内容是 基于Python的scikit-learn库中sklearn.linear_model 类中的 LogisticRegression&#xff08;&#xff09;逻辑回归方法实现的&#xff0c;其内容中只是在处理…

创建、使用、删除 数据库

一、创建数据库 1.1 使用DDL语句创建数据库 CREATE DATABASE 数据库名 CHARACTER SET 字符编码 COLLATE 排序规则; 如果不指定数据库编码&#xff0c;默认是utf8&#xff1b; 如果不指定排序规则&#xff0c;默认是utf8_general_ci&#xff0c;即不区分大小写&#xff0c;区分…

我“开发“了一款大模型应用,AI门槛这么低了吗?

现在国产大模型多如牛毛。虽然可选的大模型产品很多&#xff0c;但普遍存在同质化、高分低能、实用性差、专业性不足的问题&#xff0c;哪怕是诸如ChatGPT、Gemini这样全球顶尖的大模型也会存在这种情况。 还有一点比较重要的是&#xff0c;由于大模型需要算力、算法、数据的基…

vue+ckEditor5 复制粘贴wold文字+图片并保存格式

第一步在vue2项目下安装 npm install --save ckeditor/ckeditor5-build-decoupled-document 第二 项目下新建一个plugins的文件夹将这个包ckeditor5-build-classic放入 &#xff08;包在页面最上方 有个下载按钮 可以下载&#xff09; 刚开始时 ckeditor5-build-classic文件…

「字符串」前缀函数|KMP匹配:规范化next数组 / LeetCode 28(C++)

目录 概述 思路 核心概念&#xff1a;前缀函数 1.前缀函数 2.next数组 1.考研版本 2.竞赛版本 算法过程 构建next数组 匹配过程 复杂度 Code 概述 为什么大家总觉得KMP难&#xff1f;难的根本就不是这个算法本身。 在互联网上你可以见到八十种KMP算法的next数组…

SQL 布尔盲注 (injection 第六关)

简介 SQL注入&#xff08;SQL Injection&#xff09;是一种常见的网络攻击方式&#xff0c;通过向SQL查询中插入恶意的SQL代码&#xff0c;攻击者可以操控数据库&#xff0c;SQL注入是一种代码注入攻击&#xff0c;其中攻击者将恶意的SQL代码插入到应用程序的输入字段中&am…

26.删除有序数组中的重复项---力扣

题目链接&#xff1a; . - 力扣&#xff08;LeetCode&#xff09;. - 备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/remove-duplicates-from-sorted-array/descript…

嵌入式学习——(Linux高级编程——线程)

线程 一、pthread 线程概述 pthread&#xff08;POSIX threads&#xff09;是一种用于在程序中实现多线程的编程接口。它与进程一样&#xff0c;可以用于实现并发执行任务&#xff0c;但与进程相比有一些不同的特点。 二、优点 1. 比多进程节省资源&#xff1a;进程在创建时…

PDPS软件 那智机器人 (丰田版)离线程序导出处理

在PDPS仿真软件中导出的那智机器人离线程序&#xff0c;一般是无法直接给TFD控制装置-那智机器人&#xff08;丰田式样版&#xff09;导入及识别使用。因此要对导出的程序进行转换编译处理&#xff0c;才能给TFD那智机器人&#xff08;丰田式样版&#xff09;导入离线程序。以下…

HarmonyOS 开发

环境 下载IDE 代码 import { hilog } from kit.PerformanceAnalysisKit; import testNapi from libentry.so; import { router } from kit.ArkUI; import { common, Want } from kit.AbilityKit;Entry Component struct Index {State message: string Hello HarmonyOS!;p…