Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器

Spring Boot – 动态启动/停止 Kafka 监听器

当 Spring Boot 应用程序启动时,Kafka Listener 的默认行为是开始监听某个主题。但是,有些情况下我们不想在应用程序启动后立即启动它。

动态启动或停止 Kafka Listener,我们需要三种主要方法,即在需要处理 Kafka 消息时启动/停止、使用@KafkaListener注释、使用 kafkaListenerEndpointRegistry

在本文中,我们将介绍如何动态启动或停止 Kafka 监听器。

启动/停止 Kafka 监听器的不同方法

方法一

  • 当需要处理 Kafka 消息时,启动一个应用程序。
  • 处理成功后停止应用程序。

方法 2:在注册 Kafka Listener 时,我们可以设置以下 id 属性。

@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", 
                   containerFactory = "messageListenerFactory", autoStartup = "false")
    public void consumeMessage(Message message)

方法 3:自动连接KafkaListenerEndpointRegistry bean 来控制 Kafka Listener 的启动或停止。

@Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

开始:

 public boolean startListener(String listenerId) {
        MessageListenerContainer listenerContainer = 
          kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        assert listenerContainer != null : false;
        listenerContainer.start();

停止:

public boolean stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = 
          kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        assert listenerContainer != null : false;
        listenerContainer.stop();
        logger.info("{} Kafka Listener Stopped.", listenerId);

下面我们将以上述句法方法为例进行实现。

启动或停止特定 Kafka Listener的实现

创建一个类,其对象将被 Kafka 侦听器使用。

文件:Message.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String message;
}

配置 Kafka Listener 将使用的消费者。

文件:KakfaConsumerConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    private String kafkaUrl = "localhost:9092";

    @Bean
    public ConsumerFactory<String, Message> messageConsumerFactory() {
        JsonDeserializer<Message> deserializer = 
             new JsonDeserializer<>(Message.class, false);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                   StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                   JsonDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), 
                                                 deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> 
      messageListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> containerFactory = 
          new ConcurrentKafkaListenerContainerFactory();
        containerFactory.setConsumerFactory(messageConsumerFactory());
        return containerFactory;
    }
}

创建一个具有必要参数的 Kafka 监听器。

  • id:此侦听器的容器唯一标识符。如果未指定,则使用自动生成的 ID。
  • groupId:仅为该监听器使用该值覆盖消费者工厂的 group.id 属性。
  • 主题:此侦听器的主题。条目可以是“主题名称”、“属性占位符键”或“表达式”。主题名称必须从表达式解析。这使用组管理,Kafka 将为组成员分配分区。
  • containerFactory:KafkaListenerContainerFactory的 bean 名称,将用于创建为该端点提供服务的消息侦听器容器。
  • autoStartup:设置为 true 或 false 以覆盖容器工厂的默认设置。默认情况下,该值设置为 true,因此,它将在我们的应用程序启动时立即开始使用消息。

文件:KafkaMessageListener.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaMessageListener {

    Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);

    @KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", 
                   containerFactory = "messageListenerFactory", autoStartup = "false")
    public void consumeMessage(Message message) {
        logger.info("Message received : -> {}", message);
    }
}

KafkaListenerEndpointRegistry 类可用于通过 listenerId 获取 Kafka 侦听器容器。这里我们使用了@KafkaListener注释来将 bean 方法声明为 Kafka 侦听器容器的侦听器。现在可以使用此容器启动或停止 Kafka 侦听器。

文件:KafkaListenerAutomation.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class KafkaListenerAutomation {

    private final Logger logger = LoggerFactory.getLogger(KafkaListenerAutomation.class);

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public boolean startListener(String listenerId) {
        MessageListenerContainer listenerContainer = 
           kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        assert listenerContainer != null : false;
        listenerContainer.start();
        logger.info("{} Kafka Listener Started", listenerId);
        return true;
    }

    public boolean stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = 
          kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        assert listenerContainer != null : false;
        listenerContainer.stop();
        logger.info("{} Kafka Listener Stopped.", listenerId);
        return true;
    }
}

使用 API 端点,我们可以通过提供 listenerID 来启动或停止特定的 Kafka 监听器。

文件:StartOrStopListenerController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StartOrStopListenerController {

    @Autowired
    KafkaListenerAutomation kafkaListenerAutomation;

    @GetMapping("/start")
    public void start(@RequestParam("id") String listenerId) {
        kafkaListenerAutomation.startListener(listenerId);
    }

    @GetMapping("/stop")
    public void stop(@RequestParam("id") String listenerId) {
        kafkaListenerAutomation.stopListener(listenerId);
    }
}

输出:

1.Kafka Listener启动:

2.Kafka Listener 收到消息:

3. Kafka Listener 停止:

最后

理想情况下,应用程序应在需要处理 Kafka 消息时启动,并在该过程完成后立即停止。限制 Kafka 侦听器以有效利用它是一种很好的做法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/954147.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker save load 镜像 tag 为 <none>

一、场景分析 我从 docker hub 上拉了这么一个镜像。 docker pull tomcat:8.5-jre8-alpine 我用 docker save 命令想把它导出成 tar 文件以便拷贝到内网机器上使用。 docker save -o tomcat-8.5-jre8-alpine.tar.gz 镜像ID 当我把这个镜像传到别的机器&#xff0c;并用 dock…

计算机网络(三)——局域网和广域网

一、局域网 特点&#xff1a;覆盖较小的地理范围&#xff1b;具有较低的时延和误码率&#xff1b;使用双绞线、同轴电缆、光纤传输&#xff0c;传输效率高&#xff1b;局域网内各节点之间采用以帧为单位的数据传输&#xff1b;支持单播、广播和多播&#xff08;单播指点对点通信…

浅谈云计算01 | 云计算服务的特点

在当今数字化时代&#xff0c;云计算作为一种强大的技术解决方案&#xff0c;正逐渐改变着企业和个人对信息技术的使用方式。本文将详细探讨云计算的五个主要特点&#xff0c;包括按需自助服务、广泛的网络接入、资源池化、快速弹性伸缩以及可计量服务。 一、按需自助服务 云…

【Qt】01-了解QT

踏入QT的殿堂之路 前言一、创建工程文件1.1 步骤介绍1.2 编译介绍方法1、方法2、编译成功 二、了解框架2.1 main.cpp2.2 .Pro文件2.2.1 注释需要打井号。2.2.2 F1带你进入帮助模式2.2.3 build文件 2.3 构造函数 三、编写工程3.1 main代码3.2 结果展示 四、指定父对象4.1 main代…

DDD - 微服务设计与领域驱动设计实战(上)_统一建模语言及事件风暴会议

文章目录 Pre概述业务流程需求分析的困境统一语言建模事件风暴会议什么是事件风暴&#xff08;Event Storming&#xff09;事件风暴会议 总结 Pre DDD - 软件退化原因及案例分析 DDD - 如何运用 DDD 进行软件设计 DDD - 如何运用 DDD 进行数据库设计 DDD - 服务、实体与值对…

ssh2详细使用步骤,以及常用方法介绍

开源地址&#xff1a;https://github.com/mscdex/ssh2 ssh2 是一个功能强大的 Node.js 库&#xff0c;用于通过 SSH 协议与远程服务器交互。它支持命令执行、文件上传下载、端口转发等操作&#xff0c;常用于自动化脚本和远程服务器管理。 下面是 ssh2 的详细使用步骤和常用方…

Leetcode 377. 组合总和 Ⅳ 动态规划

原题链接&#xff1a;Leetcode 377. 组合总和 Ⅳ 可参考官解 class Solution { public:int combinationSum4(vector<int>& nums, int target) {vector<int> dp(target 1);dp[0] 1;// 总和为 i 的元素组合的个数for (int i 1; i < target; i) {// 每次都…

从epoll事件的视角探讨TCP:三次握手、四次挥手、应用层与传输层之间的联系

目录 一、应用层与TCP之间的联系 二、 当通信双方中的一方如客户端主动断开连接时&#xff0c;仅是在客户端的视角下连接已经断开&#xff0c;在服务端的眼中&#xff0c;连接依然存在&#xff0c;为什么&#xff1f;——触发EPOLLRDHUP事件&#xff1a;对端关闭连接或停止写…

dockerfile实现lnmp

dockerfile实现lnmp 自定义镜像实现整个架构 (基础镜像centos7) nginx cd /opt mkdir nginx mysql php vim Dockerfile docker network create --subnet172.111.0.0/16 mynetwork #创建自定义网段 docker run -itd --name nginx -p 80:80 --cpu-quota 20000 -m 512m -v /op…

unity下载newtonsoft-json

Package Manager&#xff0c;输入com.unity.nuget.newtonsoft-json 右键Assets-Reinport All

python学opencv|读取图像(三十一)缩放图像的三种方法

【1】引言 前序学习进程中&#xff0c;我们至少掌握了两种方法&#xff0c;可以实现对图像实现缩放。 第一种方法是调用cv2.resize()函数实现&#xff0c;相关学习链接为&#xff1a; python学opencv|读取图像&#xff08;三&#xff09;放大和缩小图像_python opencv 读取图…

PyCharm 引用其他路径下的文件报错 ModuleNotFound 或报红

PyCharm 中引用其他路径下的文件提示 ModuleNotFound&#xff0c;将被引用目录添加到系统路径&#xff1a; # # 获取当前目录 dir_path os.path.dirname(os.path.realpath(__file__)) # # 获取上级目录 parent_dir_path os.path.abspath(os.path.join(dir_path, os.pardir))…

ClickHouse-CPU、内存参数设置

常见配置 1. CPU资源 1、clickhouse服务端的配置在config.xml文件中 config.xml文件是服务端的配置&#xff0c;在config.xml文件中指向users.xml文件&#xff0c;相关的配置信息实际是在users.xml文件中的。大部分的配置信息在users.xml文件中&#xff0c;如果在users.xml文…

自动连接校园网wifi脚本实践(自动网页认证)

目录 起因执行步骤分析校园网登录逻辑如何判断当前是否处于未登录状态&#xff1f; 书写代码打包设置开机自动启动 起因 我们一般通过远程控制的方式访问实验室电脑&#xff0c;但是最近实验室老是断电&#xff0c;但重启后也不会自动连接校园网账户认证&#xff0c;远程工具&…

iOS 解决两个tableView.嵌套滚动手势冲突

我们有这样一个场景&#xff0c;就是页面上有一个大的tableView&#xff0c; 每一个cell都是和屏幕一样高的&#xff0c;然后cell中还有一个可以 tableView&#xff0c;比如直播间的情形&#xff0c;这个时候如果我们拖动 cell里面的tableView滚动的话&#xff0c;如果滚动到内…

机组存储系统

局部性 理论 程序执行&#xff0c;会不均匀访问主存&#xff0c;有些被频繁访问&#xff0c;有些很少被访问 时间局部性 被用到指令&#xff0c;不久可能又被用到 产生原因是大量循环操作 空间局部性 某个数据和指令被使用&#xff0c;附近数据也可能使用 主要原因是顺序存…

LeetCode热题100-二叉树的中序遍历【JavaScript讲解】

题目&#xff1a; 二叉树&#xff1a; 二叉树的遍历是指按照某种特定的顺序访问二叉树中的每个节点&#xff0c;使得每个节点被访问且仅被访问一次。二叉树的遍历主要分为三种&#xff1a;先序遍历&#xff08;前序遍历&#xff09;、中序遍历和后序遍历。 ‌先序遍历&#x…

【Linux】正则表达式

正则表达式是一种可供Linux工具过滤文本的自定义模板&#xff0c;Linux工具&#xff08;如sed、gawk&#xff09;会在读取数据时使用正则表达式对数据进行模式匹配。 正则表达式使用元字符来描述数据流中的一个或多个字符。它是由正则表达式引擎实现的。正则表达式引擎是一种底…

计算机视觉算法实战——步态识别(主页有源码)

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​ ​​​​​​​​​​​​​​​​​​ 1. 步态识别简介✨✨ 步态识别&#xff08;Gait Recognition&#xff09;是计算机视觉领域中的一个…

【RTSP】使用webrtc播放rtsp视频流

一、简介 rtsp流一般是监控、摄像机的实时视频流,现在的主流浏览器是不支持播放rtsp流文件的,所以需要借助其他方案来播放实时视频,下面介绍下我采用的webrtc方案,实测可行。 二、webrtc-streamer是什么? webrtc-streamer是一个使用简单机制通过 WebRTC 流式传输视频捕获…