kafka-消费者服务搭建配置简单消费(SpringBoot整合Kafka)

文章目录

  • 1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
  • 2、创建生产者发送消息
  • 3、application.yml配置
  • 4、创建消费者监听器
  • 5、创建SpringBoot启动类
  • 6、屏蔽 kafka debug 日志 logback.xml
  • 7、引入spring-kafka依赖

1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

2、创建生产者发送消息

[root@localhost ~]# kafka-console-producer.sh --bootstrap-server 192.168.74.148:9095,192.168.748:9096,192.168.74.148:9097 --topic my_topic1
>1
>2
>3
>

在这里插入图片描述

[
  [
    {
      "partition": 1,
      "offset": 0,
      "msg": "1",
      "timespan": 1717592203289,
      "date": "2024-06-05 12:56:43"
    },
    {
      "partition": 1,
      "offset": 1,
      "msg": "2",
      "timespan": 1717592204046,
      "date": "2024-06-05 12:56:44"
    },
    {
      "partition": 1,
      "offset": 2,
      "msg": "3",
      "timespan": 1717592204473,
      "date": "2024-06-05 12:56:44"
    }
  ]
]

3、application.yml配置

server:
  port: 8120

# v1
spring:
  Kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      enable-auto-commit: true # ??????offset
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4、创建消费者监听器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {

    @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("消费者获取到消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
    }

}

5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }

}

6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

此时启动SpringKafkaConsumerApplication,控制台会打印数据

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = null,value = 1
消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = null,value = 2
消费者获取到消息:topic = my_topic1,partition:1,offset = 2,key = null,value = 3

如果此时重新启动SpringKafkaConsumerApplication,控制台将不会打印数据,因为已经消费过数据

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/681993.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

awfawfaw

c语言中的小小白-CSDN博客c语言中的小小白关注算法,c,c语言,贪心算法,链表,mysql,动态规划,后端,线性回归,数据结构,排序算法领域.https://blog.csdn.net/bhbcdxb123?spm1001.2014.3001.5343 给大家分享一句我很喜欢我话&#xff1a; 知不足而奋进&#xff0c;望远山而前行&am…

12- Redis 中的 链表 数据结构

Redis 的 List 对象的底层实现之一就是链表。C 语言本身没有链表这个数据结构&#xff0c;所以 Redis 自己设计了一个链表数据结构。 1. 链表节点结构设计 先来看看【链表节点】结构的样子&#xff1a; typedef struct listNode {//前置节点struct listNode *prev;//后置节点…

【AI大模型】Transformers大模型库(二):AutoModelForCausalLM

目录​​​​​​​ 一、引言 二、AutoModelForCausalLM 2.1 概述 2.2 主要功能 2.3 代码示例 三、总结 一、引言 这里的Transformers指的是huggingface开发的大模型库&#xff0c;为huggingface上数以万计的预训练大模型提供预测、训练等服务。 &#x1f917; Transfo…

K8s service 底层逻辑

文章目录 K8s service 底层逻辑Kube-proxy 代理模式Service 请求情况Service-Iptables 模式iptables 规则介绍ClusterIP 模式分析NodePort 模式分析 Service- IPVS 模式 服务发现环境变量CoreDNSCoreDNS 策略ClusterFirst&#xff08;默认DNS策略&#xff09;CluterFirstWithHo…

Python学习从0开始——Kaggle机器学习004总结2

Python学习从0开始——Kaggle机器学习004总结2 一、缺失值二、分类变量2.1介绍2.2实现1.获取训练数据中所有分类变量的列表。2.比较每种方法方法1(删除分类变量)方法2(序数编码)方法3独热编码 三、管道3.1介绍3.2实现步骤1:定义预处理步骤步骤2:定义模型步骤3:创建和评估管道 四…

网络安全快速入门(十五)(中)用户的文件属性及用户相关文件详解

15.4 序言 我们之前已经了解了关于用户管理的一些基础命令&#xff0c;本章节我们就来了解一下关于文件权限的一些小知识以及基于某些文件来手动创建一个用户&#xff0c;话不多说&#xff0c;我们开始吧&#xff01; 15.5 文件权限 在linux中&#xff0c;文件都是通过查看属主…

深入解析ArrayList是如何实现自动扩容的?【源码深度解析】

一 、分析ArrayList扩容源码 通过在 &#xff08;JDK 6 及更低版本中&#xff09;API我们知道&#xff0c;调用无参构造创建的ArrayList集合&#xff0c;初始容量为10 接下来我们深入源码&#xff0c;探究当时作者是怎么构思的 借助Debug工具&#xff0c;一步一步进入 上图看到…

修复Windows上“发生意外错误”问题的5种方法,总有一种适合你

在尝试启动网络适配器的设置菜单时,是否收到“发生意外错误”消息?不用担心,因为在大多数情况下解决这个问题很容易。我们将向你展示在Windows 11或Windows 10计算机上解决此问题的多种方法。 为什么我收到“发生意外错误”的消息 当网络适配器出现问题时,Windows会显示一…

2023年计算机图形学课程知识总结

去年就该写的&#xff0c;但是去年这个时候太忙了。 就写来自己看看。留个记录留个念 文章目录 1. 图形&#xff0c;图像的定义2. 点阵、矢量3. 走样&#xff0c;反走样4. 字符裁剪精度&#xff08;1&#xff09; 串精度&#xff08;2&#xff09; 字符精度&#xff08;3&…

记录:linux桌面管理基础-X11协议(X window system)

1、认识X11 X11是X协议&#xff0c;版本号为11。X协议是专门被设计为linux桌面管理服务的&#xff0c;而linux桌面环境不像windows那样作为系统内核的一部分&#xff0c;作为一个普通程序运行在用户态上。该协议的设计初衷是为了linux的图形界面满足跨平台、跨网络、与具体硬件…

C/C++图形库Easyx的使用教学

绘制简单的图形窗口 学会创建图形化窗口 包含头文件 graphics.h包含已被淘汰的函数easyx.h包含最新的函数 两个函数就可以创建窗口 Initgraph&#xff08;&#xff09;函数的定义 图形窗口的创建 #include<graphics.h>int main() {initgraph(800, 600);while (1);…

【数据库】MySQL概述(初阶)

文章目录 一、mysql概述1、数据库基本概念&#xff1a;2. 数据模型2.1 关系型数据库2.2 理解数据模型 更多数据库MySQL系统内容就在以下专栏&#xff1a; 专栏链接&#xff1a;数据库MySQL 一、mysql概述 1、数据库基本概念&#xff1a; 数据库&#xff1a; 数据存储的仓库。数…

雨课堂课件快速自动刷完

文章目录 背景f12检查 查看源代码脚本脚本使用方法总结 背景 有时候老师让我们在雨课堂里刷完这个课件。这个课件呢有时候它有三百多页&#xff0c;每一页需要停留3秒左右才可以算看过课件&#xff0c;你如果一页一页的去点的话非常的折磨人。因为课件太多页了&#xff0c;我就…

什么是Spark RDD?(RDD的介绍与创建)

什么是Spark RDD&#xff1f;(RDD的介绍与创建) 一、RDD介绍 1、特点2、RDD的存储和指向3、RDD与DAG4、RDD的特性5、RDD分区6、RDD操作类型 二、RDD创建 1、引入必要的 Spark 库2、配置 Spark3、RDD创建4、示例代码 一、RDD介绍 RDD: 弹性分布式数据集&#xff08;Resilient…

Ant Design Vue Table组件全单元格编辑实现方案

在ant上的table常见用法是一行的元素可编辑&#xff0c;如下&#xff1a; 但是现在有一个需求是全部单元格均可编辑&#xff0c;如何实现呢&#xff1f; 表格组件 <a-tablev-if"query.personnel_type 0"size"middle"row-key"id":scroll&qu…

【深度学习】安全帽检测,目标检测,Faster RCNN训练

文章目录 资料环境尝试训练安全帽数据训练测试预测全部数据、代码、训练完的权重等资料见&#xff1a; 资料 依据这个进行训练&#xff1a; https://github.com/WZMIAOMIAO/deep-learning-for-image-processing/tree/master/pytorch_object_detection/faster_rcnn ├── bac…

人工智能机器学习系统技术要求

一 术语和定义 1.1机器学习系统 machinelearningsystem 能运行或用于开发机器学习模型、算法和相关应用的软件系统。 1.2机器学习框架 machinelearningframework 利用预先构建和优化好的组件集合定义模型,实现对机器学习算法封装、数据调用处理和计算资源使用的软件库。 1…

实时监控与报警:人员跌倒检测算法的实践

在全球范围内&#xff0c;跌倒事件对老年人和儿童的健康与安全构成了重大威胁。据统计&#xff0c;跌倒是老年人意外伤害和死亡的主要原因之一。开发人员跌倒检测算法的目的是通过技术手段及时发现和响应跌倒事件&#xff0c;减少因延迟救助而造成的严重后果。这不仅对老年人群…

特征交叉系列:FM和深度神经网络的结合,DeepFM原理简述和实践

从FM&#xff0c;FFM到DeepFM 在上两节中介绍了FM和FFM 这两种算法是推荐算法中经典的特征交叉结构&#xff0c;FM将特征交叉分解到底层属性&#xff0c;通过底层属性的点乘来刻画特征交叉的计算&#xff0c;而FFM引入特征域的概念&#xff0c;对不同的特征对所引用的底层属性…

Redis 单线程问题 BigKey问题

前言 简单的redis基础类型以及常用操作我们都也已经介绍过了 现在今天我们来谈谈redis对于单线程是需要怎么理解的 以及redis假设遇见大key我们需要怎么去查询和删除呢??? redis单线程 假设有个人现在问你一个问题:redis是单线程的还是多线程的 这个问题本身就不严谨 就像问…