一、背景介绍
使用线程池ThreadPoolExecutor的过程中你是否有以下痛点呢?
- 代码中创建了一个 ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
- 凭经验设置参数值,上线后发现需要调整,改代码重新发布服务,非常麻烦
- 线程池相对开发人员来说是个黑盒,运行情况不能及时感知到,直到出现问题
如果有以上痛点,动态可监控线程池框架(DynamicTp)或许可以帮助到我们。
如果看过 ThreadPoolExecutor 的源码,大概可以知道它对核心参数基本都有提供 set / get 方法以及一些扩展方法,可以在运行时动态修改、获取相应的值,这些方法有:
public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
public void allowCoreThreadTimeOut(boolean value);
public int getCorePoolSize();
public int getMaximumPoolSize();
public long getKeepAliveTime(TimeUnit unit);
public BlockingQueue<Runnable> getQueue();
public RejectedExecutionHandler getRejectedExecutionHandler();
public boolean allowsCoreThreadTimeOut();
protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
现在大多数的互联网项目都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置,实时生效的角色。
那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?
答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送失败这类问题,而且也能减少研发动态线程池组件本身的难度和工作量。
综上,可以总结出以下的背景:
- 广泛性:在 Java 开发中,想要提高系统性能,线程池已经是一个 90% 以上开发都会选择使用的基础工具
- 不确定性:项目中可能会创建很多线程池,既有IO密集型的,也有CPU密集型的,但线程池的参数并不好确定,需要有套机制在运行过程中动态去调整参数
- 无感知性:线程池运行过程中的各项指标一般感知不到;需要有套监控报警机制在事前、事中就能让开发人员感知到线程池的运行状况,及时处理
- 高可用性:配置变更需要及时推送到客户端,需要有高可用的配置管理推送服务,配置中心是现在大多数互联网系统都会使用的组件,与之结合可以极大提高系统可用性
二、DynamicTp的功能特性
基于以上背景分析,DynamicTp框架对线程池 ThreadPoolExecutor 做一些扩展增强,主要实现以下目标:
- 实现对运行中线程池参数的动态修改,实时生效
- 实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息推送办公平台
- 定时采集线程池指标数据,配合像 Grafana 这种可视化监控平台做大盘监控
DynamicTp框架的官网地址为:
https://dynamictp.cn/
经过多个版本的迭代,目前最新版本 v1.1.7 具有以下特性:
代码零侵入:我们改变了线程池以往的使用姿势,所有配置均放在配置中心,服务启动时会从配置中心拉取配置生成线程池对象放到 Spring 容器中,使用时直接从 Spring 容器中获取,对业务代码零侵入
通知告警:提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝触发报警、任务执行或等待超时报警),已支持企业微信、钉钉、飞书、邮件报警,同时提供 SPI 接口可自定义扩展实现
运行监控:定时采集线程池指标数据,支持通过 MicroMeter、JsonLog 日志输出、Endpoint 三种方式,可通过 SPI 接口自定义扩展实现
任务增强:提供任务包装功能,实现 TaskWrapper 接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper,可以支持线程池上下文信息传递
多配置中心支持:基于主流配置中心实现线程池参数动态调整,实时生效,已支持 Nacos、Apollo、Zookeeper、Consul、Etcd、Polaris,同时也提供 SPI 接口可自定义扩展实现
中间件线程池管理:集成管理常用第三方组件的线程池,已集成 Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars、SofaRpc、RabbitMq 等组件的线程池管理(调参、监控报警)
轻量简单:基于 SpringBoot 实现,引入 starter,接入只需简单 4 步就可完成,顺利 3 分钟搞定
多模式:参考 Tomcat 线程池提供了 IO 密集型场景使用的 EagerDtpExecutor 线程池
兼容性:JUC 普通线程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架监控,@Bean 定义时加 @DynamicTp 注解即可
可靠性:框架提供的线程池实现 Spring 生命周期方法,可以在 Spring 容器关闭前尽可能多的处理队列中的任务
高可扩展:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装等等)
线上大规模应用:参考美团线程池实践,美团内部已经有该理论成熟的应用经验
美团线程池实践:
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
三、架构设计
DynamicTp框架功能大体可以分为以下几个模块:
配置变更监听模块
线程池管理模块
监控模块
通知告警模块
三方组件线程池管理模块
四、接入步骤
4.1 引入依赖
在pom.xml中引入下述依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.bc</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- 整合Spring Boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.1.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- 整合Spring Cloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- 整合Spring Cloud Alibaba -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.0.1.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.dromara.dynamictp</groupId>
<artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
<version>1.1.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
这里有一个可能需要关注的问题,就是曾经在引入了上述依赖后,启动项目会报下述错误:
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/json/JsonMapper
解决方法:就是关于jackson的版本依赖低了,将其版本设定为2.10.0版本就可以解决这个问题
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.10.0</version>
</dependency>
4.2 配置中心配置线程池实例
采用nacos作为配置中心,nacos服务端的版本为v2.1.2(此处我是采用Docker进行搭建服务端),nacos客户端的版本在上述依赖引入中也是2.1.2版本。
第一步: 在nacos中创建一个新的命名空间local
新创建的命名空间local其命名空间ID为:322d6d5e-16bb-4f0d-846a-bbb1012eff92
第二步:在配置管理的local命名空间下新增配置记录
新增配置的各个选项中,其Data ID为“dtp_config”,GROUP为“DTP_SERVER_GROUP”,描述为“DTP动态线程池服务的配置分组”,配置格式为YAML。
其中Data ID的命令规则为:应用名-环境profile名.文件后缀
- 应用名(Data ID前缀):默认为 ${spring.appliction.name}, 也可以使用${spring.cloud.nacos.config.prefix}来配置
- 环境profile名:既${spring.profiles.active}指定的环境。 若不区分环境,则本内容既前面的中划线都可以不用存在。
- 文件后缀:既SpringBoot中配置文件扩展名,也是Nacos中的配置格式。
上述yaml里面的配置内容如下所示:
spring:
dynamic:
tp:
enabled: true # 是否启用dynamictp,默认true
enabledBanner: true # 是否打印启动图标,默认为true表示打印,值为false表示不打印
enabledCollect: true # 是否开启监控指标采集,默认true
collectorTypes: micrometer,logging # 监控数据采集器类型(logging | micrometer | internal_logging | JMX),默认micrometer
logPath: /home/logs/dynamictp/user-center/ # 监控日志数据路径,默认 ${user.home}/logs,采集类型非logging不用配置
monitorInterval: 5 # 监控时间间隔(报警检测、指标采集),默认5s
platforms: # 通知报警平台配置
- platform: wechat
platformId: 1 # 平台id,自定义
urlKey: 3a700-127-4bd-a798-c53d8b69c # webhook 中的 key
receivers: test1,test2 # 接受人企微账号
- platform: ding
platformId: 2 # 平台id,自定义
urlKey: f80dad441fcd655438f4a08dcd6a # webhook 中的 access_token
secret: SECb5441fa6f375d5b9d21 # 安全设置在验签模式下才的秘钥,非验签模式没有此值
receivers: 18888888888 # 钉钉账号手机号
- platform: lark
platformId: 3
urlKey: 0d944ae7-b24a-40 # webhook 中的 token
secret: 3a750012874bdac5c3d8b69c # 安全设置在签名校验模式下才的秘钥,非验签模式没有此值
receivers: test1,test2 # 接受人username / openid
- platform: email
platformId: 4
receivers: 123456789@163.com # 收件人邮箱,多个用逗号隔开
executors: # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量
- threadPoolName: dtpExecutor # 线程池名称,必填
threadPoolAliasName: 测试线程池 # 线程池别名,可选
executorType: common # 线程池类型 common、eager、ordered、scheduled、priority,默认 common
corePoolSize: 6 # 核心线程数,默认为1
maximumPoolSize: 8 # 最大线程数,默认cpu核数
queueCapacity: 2000 # 队列容量,默认1024
queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类,默认VariableLinkedBlockingQueue
rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类,默认AbortPolicy
keepAliveTime: 60 # 空闲线程等待超时时间,默认60
threadNamePrefix: test # 线程名前缀,默认dtp
allowCoreThreadTimeOut: false # 是否允许核心线程池超时,默认false
waitForTasksToCompleteOnShutdown: true # 参考spring线程池设计,优雅关闭线程池,默认true
awaitTerminationSeconds: 5 # 优雅关闭线程池时,阻塞等待线程池中任务执行时间,默认3,单位(s)
preStartAllCoreThreads: false # 是否预热所有核心线程,默认false
runTimeout: 0 # 任务执行超时阈值,单位(ms),默认0(不统计)
queueTimeout: 0 # 任务在队列等待超时阈值,单位(ms),默认0(不统计)
taskWrapperNames: ["ttl", "mdc"] # 任务包装器名称,继承TaskWrapper接口
notifyEnabled: true # 是否开启报警,默认true
platformIds: [4] # 报警平台id,不配置默认拿上层platforms配置的所有平台
notifyItems: # 报警项,不配置自动会按默认值(查看源码NotifyItem类)配置(变更通知、容量报警、活性报警、拒绝报警、任务超时报警)
- type: change
enabled: true
- type: capacity # 队列容量使用率,报警项类型,查看源码 NotifyTypeEnum枚举类
enabled: true
threshold: 80 # 报警阈值,默认70,意思是队列使用率达到70%告警
platformIds: [2] # 可选配置,本配置优先级 > 所属线程池platformIds > 全局配置platforms
interval: 120 # 报警间隔(单位:s),默认120
- type: liveness # 线程池活性
enabled: true
threshold: 80 # 报警阈值,默认 70,意思是活性达到70%告警
- type: reject # 触发任务拒绝告警
enabled: true
threshold: 100 # 默认阈值10
- type: run_timeout # 任务执行超时告警
enabled: true
threshold: 5 # 默认阈值10
- type: queue_timeout # 任务排队超时告警
enabled: true
threshold: 100 # 默认阈值10
点击发布以后,在其配置管理列表中就会新增一条记录:
4.3 客户端的配置文件
在resources目录下新建一个名为application.yml的文件:
server:
port: 10086
spring:
application:
name: dtp-server
接着还需在resources目录下新建一个名为bootstrap.yml的文件(bootstrap.yml文件加载顺序先于application.yml):
spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:28999
namespace: 322d6d5e-16bb-4f0d-846a-bbb1012eff92
config:
server-addr: 127.0.0.1:28999
namespace: 322d6d5e-16bb-4f0d-846a-bbb1012eff92
group: DTP_SERVER_GROUP # 配置组(如果不指定,则默认为DEFAULT_GROUP)
prefix: dtp_config # Data ID的前缀(如果不指定,则默认取 ${spring.appliction.name})
file-extension: yaml # 指定文件后缀(如果不指定,则默认为properties),此处指定为yaml格式
extension-configs:
- dataId: dtp_config.yaml
group: DTP_SERVER_GROUP
refresh: true # 必须配置,负责自动刷新不生效
refresh-enabled: true # 如果在Nacos控制台界面中人工调整配置项的值,SpringBoot会立即自动取得最新值。因为Nacos客户端带自动刷新功能,可以通过配置 spring.cloud.nacos.config.refresh.enabled=false 来关闭自动刷新
至此关于这一步的操作算是已经完成了,接下来将说一下nacos的共享和常规配置文件相关的扩展内容。
spring.cloud.nacod.config.shared-configs:这是一个用于读取共享的配置文件的配置项,里面可以有多组配置。 在源码中有三个属性:data-id、group、refresh。
public static class Config {
private String dataId;
private String group;
private boolean refresh;
...
}
spring.cloud.nacod.config.extension-configs:在shared-configs之后加载,但是优先级大于shared-configs,一般用于单模块配置。shared-configs可以配置为项目共有配置,如redis配置,数据库链接等等。
spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:28999
namespace: 322d6d5e-16bb-4f0d-846a-bbb1012eff92
config:
server-addr: 127.0.0.1:28999
namespace: 322d6d5e-16bb-4f0d-846a-bbb1012eff92
group: DTP_SERVER_GROUP # 配置组(如果不指定,则默认为DEFAULT_GROUP)
prefix: dtp_config # Data ID的前缀(如果不指定,则默认取 ${spring.appliction.name})
file-extension: yaml # 指定文件后缀(如果不指定,则默认为properties),此处指定为yaml格式
# 用于共享的配置文件
shared-configs:
- data-id: common-mysql.yaml
group: SPRING_CLOUD_EXAMPLE_GROUP
- data-id: common-redis.yaml
group: SPRING_CLOUD_EXAMPLE_GROUP
- data-id: common-base.yaml
group: SPRING_CLOUD_EXAMPLE_GROUP
# 常规配置文件,优先级大于 shared-configs,在 shared-configs 之后加载
extension-configs:
- data-id: nacos-config-advanced.yaml
group: SPRING_CLOUD_EXAMPLE_GROUP
refresh: true
- data-id: nacos-config-base.yaml
group: SPRING_CLOUD_EXAMPLE_GROUP
refresh: true
refresh-enabled: true
4.4 启动类添加@EnableDynamicTp注解
package com.example.demo;
import org.dromara.dynamictp.core.spring.EnableDynamicTp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@EnableDynamicTp
@EnableDiscoveryClient
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
4.5 使用DtpExecutor
第一种方式:使用@Resource 或 @Autowired 进行依赖注入
package com.example.demo.controller;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.net.InetAddress;
@Slf4j
@RefreshScope
@RestController
@RequestMapping("/demo")
public class DemoController {
@Value("${spring.dynamic.tp.executors[0].corePoolSize}")
String springDynamicTpExecutor0CorePoolSize;
@Resource
private DtpExecutor dtpExecutor;
@GetMapping("/select")
public String select() throws Exception {
log.info("executors[0]的核心线程数:{}",springDynamicTpExecutor0CorePoolSize);
dtpExecutor.execute(() -> {
log.info("开始执行任务");
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
log.info("结束执行任务");
});
JSONObject jsonObject = new JSONObject(true);
jsonObject.put("IP", InetAddress.getLocalHost().getHostAddress());
jsonObject.put("可用处理器数量", Runtime.getRuntime().availableProcessors());
jsonObject.put("核心线程数", dtpExecutor.getCorePoolSize());
jsonObject.put("最大线程数", dtpExecutor.getMaximumPoolSize());
jsonObject.put("正在工作的线程数", dtpExecutor.getActiveCount());
jsonObject.put("队列中的任务数", dtpExecutor.getQueue().size());
jsonObject.put("已提交的任务总数", dtpExecutor.getTaskCount());
return jsonObject.toString();
}
}
第二种方式:通过 DtpRegistry.getDtpExecutor("name") 获取
package com.example.demo.controller;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.core.DtpRegistry;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.net.InetAddress;
@Slf4j
@RefreshScope
@RestController
@RequestMapping("/demo")
public class DemoController {
@Value("${spring.dynamic.tp.executors[0].corePoolSize}")
String springDynamicTpExecutor0CorePoolSize;
@GetMapping("/select")
public String select() throws Exception {
log.info("executors[0]的核心线程数:{}",springDynamicTpExecutor0CorePoolSize);
DtpExecutor dtpExecutor = DtpRegistry.getDtpExecutor("dtpExecutor");
dtpExecutor.execute(() -> {
log.info("开始执行任务");
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
log.info("结束执行任务");
});
JSONObject jsonObject = new JSONObject(true);
jsonObject.put("IP", InetAddress.getLocalHost().getHostAddress());
jsonObject.put("可用处理器数量", Runtime.getRuntime().availableProcessors());
jsonObject.put("核心线程数", dtpExecutor.getCorePoolSize());
jsonObject.put("最大线程数", dtpExecutor.getMaximumPoolSize());
jsonObject.put("正在工作的线程数", dtpExecutor.getActiveCount());
jsonObject.put("队列中的任务数", dtpExecutor.getQueue().size());
jsonObject.put("已提交的任务总数", dtpExecutor.getTaskCount());
return jsonObject.toString();
}
}
经过测试,不管是才有那种获取方式都可以实现线程池的动态修改和实时更新!
五、通知报警
触发报警阈值会推送相应报警消息(活性、容量、拒绝、任务等待超时、任务执行超时),且会高亮显示相应字段,如下图所示:
配置变更会推送通知消息,且会高亮变更的字段,如下图所示:
特别说明:在引入邮件推送时,这个还需要引入额外的依赖包,具体怎么做可以参考官方技术文档。
六、监控
目前框架提供了四种监控数据采集方式,通过 collectorTypes 属性配置监控指标采集类型,默认 Micrometer:
Logging:线程池指标数据会以 Json 格式输出到指定的日志文件里
Internal_logging:线程池指标数据会以 Json 格式输出到项目日志文件里
Micrometer:采用监控门面,通过引入相关 Micrometer 依赖采集到相应的存储平台里(如 Prometheus,InfluxDb...)
Endpoint:暴露 Endpoint 端点,可以通过 http 方式实时获取指标数据