kafka-消费者-指定offset消费(SpringBoot整合Kafka)

文章目录

  • 1、指定offset消费
    • 1.1、创建消费者监听器‘
    • 1.2、application.yml配置
    • 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
    • 1.4、创建生产者发送消息
      • 1.4.1、分区0中的数据
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、指定offset消费

1.1、创建消费者监听器‘

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaPartitionListener {

    //初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用
    @KafkaListener(
            topicPartitions = {
                    @TopicPartition(topic = "my_topic1"
                    ,partitionOffsets = {
                            @PartitionOffset(partition = "0",initialOffset = "2")
                    })}
            , groupId = "my_group1")
    public void onMessage1(ConsumerRecord<String, String> record) {
        System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
    }

}

1.2、application.yml配置

server:
  port: 8120

# v1
spring:
  Kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      enable-auto-commit: true 
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class MyKafkaConfig {
    @Bean
    public NewTopic springTestPartitionTopic() {
        return TopicBuilder.name("my_topic1") //主题名称
                .partitions(3) //分区数量
                .replicas(3) //副本数量
                .build();
    }
}


在这里插入图片描述
在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class SpringKafkaConsumerApplicationTests {
    @Resource
    KafkaTemplate kafkaTemplate;

    @Test
    void contextLoads() {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);
        }
    }

}


在这里插入图片描述
在这里插入图片描述

1.4.1、分区0中的数据

[
  [
    {
      "partition": 0,
      "offset": 0,
      "msg": "指定offset消费0",
      "timespan": 1717660785962,
      "date": "2024-06-06 07:59:45"
    },
    {
      "partition": 0,
      "offset": 1,
      "msg": "指定offset消费3",
      "timespan": 1717660785974,
      "date": "2024-06-06 07:59:45"
    },
    {
      "partition": 0,
      "offset": 2,
      "msg": "指定offset消费6",
      "timespan": 1717660785975,
      "date": "2024-06-06 07:59:45"
    },
    {
      "partition": 0,
      "offset": 3,
      "msg": "指定offset消费9",
      "timespan": 1717660785975,
      "date": "2024-06-06 07:59:45"
    }
  ]
]

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }

}


1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.8、消费者控制台:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/688089.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

算法003:快乐数

这道题采用快慢双指针的方法。 为了弄清楚这个题到底是要我们干嘛&#xff0c;我们把整个过程类比一下&#xff1a; 不管是n19还是n2&#xff0c;我们都把它当成一种判断链表是否有环的方式。 对于n19&#xff0c;题干是这样解释的&#xff1a; 我们把它当成链表&#xff0c…

如何挑选最适合你的渲染工具

随着技术的发展&#xff0c;云渲染平台逐渐成为设计师、动画师、影视制作人员等创意工作者的得力助手。然而&#xff0c;市场上的云渲染平台种类繁多&#xff0c;如何选择最适合自己的渲染工具成为了一个需要认真考虑的问题。 在挑选适合自己的云渲染工具时&#xff0c;我们需…

YOLOv10涨点改进:原创自研 | GhostNet融合 | 从廉价的操作中生成更多的特征图

文章目录 GhostNet理论基础实验部分改进方案新增yolov10s-ghost.yaml文件代码运行GhostNet理论基础 Ghost Module是一种模型压缩的方法,即在保证网络精度的同时减少网络参数和计算量,从而提升计算速度(speed),降低延时(latency)。Ghost 模块可以代替现有卷积网络中的每…

OpenAI模型规范概览

这是OpenAI对外分享的模型规范文档&#xff08;Model Spec&#xff09;&#xff0c;它定义了OpenAI希望在API接口和ChatGPT&#xff08;含GPT系列产品&#xff09;中模型的行为方式&#xff0c;这也是OpenAI超级对齐团队奉行的行为准则&#xff0c;希望能对国内做RLHF的同学有帮…

.Net实现SCrypt Hash加密

方案1 &#xff08;加密后存储“算法设置”、“盐(随机值)”、“Hash值”&#xff0c;以“$”分隔&#xff09;&#xff1a; //Nuget引入SCrypt.NET库 using Org.BouncyCastle.Crypto.Generators; using Scrypt; using System; using System.Security.Cryptography; namespace …

Apache OFBiz 路径遍历导致RCE漏洞复现(CVE-2024-36104)

0x01 产品简介 Apache OFBiz是一个电子商务平台,用于构建大中型企业级、跨平台、跨数据库、跨应用服务器的多层、分布式电子商务类应用系统。是美国阿帕奇(Apache)基金会的一套企业资源计划(ERP)系统。该系统提供了一整套基于Java的Web应用程序组件和工具。 0x02 漏洞概…

90后机器人创业者再获10亿元融资,为精密传动行业注入新动力!

据了解&#xff0c;一位90后机器人创业者再次获得近10亿元人民币的融资&#xff0c;这一消息在精密传动行业引起了广泛关注。 杭州宇树科技有限公司&#xff08;简称“宇树”&#xff09;&#xff0c;2024年春节前完成了B2轮融资&#xff0c;融资近10亿元人民币&#xff0c;本轮…

pyqt5 tablewidget实现excel拖曳填充

代码主要涉及鼠标事件和绘图&#xff0c;selectionModel&#xff0c;selectedIndexes。 import sys from PyQt5.QtCore import QPoint, Qt, QCoreApplication, pyqtSlot from PyQt5.QtGui import QBrush, QPixmap, QColor, QPainter,QIcon,QPolygon from PyQt5.QtWidgets imp…

【Java】解决Java报错:ClassCastException

文章目录 引言1. 错误详解2. 常见的出错场景2.1 错误的类型转换2.2 泛型集合中的类型转换2.3 自定义类和接口转换 3. 解决方案3.1 使用 instanceof 检查类型3.2 使用泛型3.3 避免不必要的类型转换 4. 预防措施4.1 使用泛型和注解4.2 编写防御性代码4.3 使用注解和检查工具 5. 示…

64. UE5 RPG 创建新的双手攻击怪物

在上一篇文章中&#xff0c;我们实现了新的功能&#xff0c;现在可以创建多个普通攻击动画&#xff0c;并且可以根据你所使用的普通攻击动画&#xff0c;设置不同的攻击位置。比如&#xff0c;你使用武器&#xff0c;那么攻击位置需要从武器上获取&#xff0c;如果你没有持有武…

《精通ChatGPT:从入门到大师的Prompt指南》第2章:Prompt的基本概念

2.1 什么是Prompt 在了解和使用ChatGPT的过程中&#xff0c;理解和掌握Prompt的概念是至关重要的。Prompt可以简单地定义为一段指令或请求&#xff0c;它向AI模型提供了生成特定回应或行为所需的初始信息。具体而言&#xff0c;Prompt是用户与AI系统之间的桥梁&#xff0c;通过…

MatrixOne→MatrixOS:矩阵起源的创业史即将用“AI Infra”和“AI Platform”书写新章程

在数字化浪潮的推动下&#xff0c;MatrixOne的故事就像一部科技界的创业史诗&#xff0c;它始于一个简单而宏伟的梦想——构建一个能够支撑起新一代数字世界的操作系统。想象一下&#xff0c;在AIGC时代&#xff0c;数据流动如同“血液”&#xff0c;算法运转如同“心跳”&…

【Neo4j】Windows11使用Neo4j导入CSV数据可视化知识图谱

Windows11使用Neo4j导入CSV数据可视化知识图谱 序1. 安装JDK21&#xff08;1&#xff09;下载&#xff08;2&#xff09;安装&#xff08;3&#xff09;环境配置 2. 安装Neo4j&#xff08;1&#xff09;下载&#xff08;2&#xff09;解压安装&#xff08;3&#xff09;环境配置…

国货美妆品牌站上C位,抖音618大促期间相关产品销量同比增长53%

每年618大促时期是各类美妆产品销售旺季。近两年&#xff0c;爱美人士的囤货清单里出现越来越多国货美妆品牌。 据《2023年中国化妆品年鉴》&#xff0c;去年国内美妆市场总体规模达7972亿元&#xff0c;其中&#xff0c;国货市场份额达到50.4%&#xff0c;首次超越外资品牌。…

Cloudpods 强大的多云管理平台部署

简介 Cloudpods 是一款简单、可靠的企业IaaS资源管理软件。帮助未云化企业全面云化IDC物理资源&#xff0c;提升企业IT管理效率。 Cloudpods 帮助客户在一个地方管理所有云计算资源。统一管理异构IT基础设施资源&#xff0c;极大简化多云架构复杂度和难度&#xff0c;帮助企业…

遗址博物馆ar互动展示软件提供丰富的趣味化体验

在自然博物馆的每一个角落&#xff0c;都隐藏着大自然的奥秘与魅力。为了让每一位参观者都能深入体验、探索这些奥秘&#xff0c;我们引入了前沿的AR技术&#xff0c;为您带来一场前所未有的沉浸式自然之旅。 步入博物馆&#xff0c;您手中的AR相机将成为您的更佳向导。自然博物…

「动态规划」如何求最小路径和?

64. 最小路径和https://leetcode.cn/problems/minimum-path-sum/description/ 给定一个包含非负整数的m x n网格grid&#xff0c;请找出一条从左上角到右下角的路径&#xff0c;使得路径上的数字总和为最小。说明&#xff1a;每次只能向下或者向右移动一步。 输入&#xff1a;…

短视频矩阵源码----如何做正规开发规则分享:

一、什么是SaaS化服务技术开发&#xff1f; &#xff08;短视频矩阵系统是源头开发的应该分为3个端口---- 总后台控制端、总代理端口&#xff0c;总商户后台&#xff09; SaaS是软件即服务&#xff08;Software as a Service&#xff09;的缩写。它是一种通过互联网提供软件应…

jvm学习笔记(二) ----- 垃圾回收

GC 一、判定对象是否是垃圾1.引用计数法2.可达性分析算法 二、垃圾回收算法1.标记清除2.标记整理3. 复制4. 分代垃圾回收1.尝试在伊甸园分配2.大对象直接晋升至老年代3.多次存活的对象4.老年代连续空间不足&#xff0c;触发 Full GC 链接: jvm学习笔记(一) ----- JAVA 内存 链接…

Android存储空间不足?试试这8个快速解决方案!

在当今的科技时代&#xff0c;Android智能手机已成为我们日常生活的重要组成部分&#xff0c;因为它们保存着我们大量的关键数据。然而&#xff0c;随着我们的使用模式不断扩大&#xff0c;手机内部存储的可用性经常变得有限。手机存储空间不足不仅会损害设备的功能和响应能力&…