dubbo线程池为什么耗尽

文章概述

大家可能都遇到过DUBBO线程池打满这个问题,报错如下,本文我们就一起分析DUBBO线程池打满这个问题。
cause: org.apache.dubbo.remoting.RemotingException: Server side(10.0.0.100,20881) thread pool is exhausted, detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-10.0.0.100:20881, Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), Task: 50397601 (completed: 50396801), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://10.0.0.100:20881!

1 DUBBO线程模型

先看一张图大概了解
未命名文件 (5).png

** IO线程**

IO线程的工作实际上就是处理字节流的输入输出,对消息的读取,序列化,不涉及业务操作
NettyServer中启动netty服务端,初始化boss和work线程信息

  protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

这里分别看线程数量

    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);

boss线程设置为1
主要看work线程(IO线程)
从url中获取线程数,如果没设置的话,设置当前机器的线程数,最少设置为32个
这个配置是iothreads,如果配置的这样配置。但是线程池耗尽并不是io线程数量不够的原因

provider:
    iothreads: 100
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.received(channel, msg);
    }
  @Override
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(HEARTBEAT_EVENT);
                channel.send(res);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            return;
        }
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
            }
            return;
        }
        handler.received(channel, message);
    }

消息的不同类型有不同的处理方式如果是心跳直接就发送回去了,
如果是业务请求那么交给业务线程池处理

  @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

业务线程池

初始化
不同线程池策略会创建不同特性的线程池:
dubbo提供了不同的线程池类型

fixed
包含固定个数线程

cached
线程空闲一分钟会被回收,当新请求到来时会创建新线程

limited
线程个数随着任务增加而增加,但不会超过最大阈值。空闲线程不会被回收

eager
当所有核心线程数都处于忙碌状态时,优先创建新线程执行任务,而不是立即放入队列

一般实际使用的就是fixed

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

这里主要看两个参数,分别是线程数,和队列长度。默认的线程数是200,queue默认使用SynchronousQueue
SynchronousQueue由于其独有的线程一一配对通信机制,由于内部没有使用AQS,而是直接使用CAS,其并没有存储任务的队列就是将任务与线程进行匹配,如果任务进来,没用可用线程,那么将直接拒绝,这也是我们碰到拒绝策略的原因
如果需要配置

dubbo:
  protocol:
    threads: 800
    queues: 10000

业务线程线程池拒绝

这里就可以看到线程池拒绝AbortPolicyWithReport

 @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
                + "%d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
            e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

也就是开头的那个报错,这里在发生问题会自动dump stack信息

线程池中的 getTaskCount 和 getCompletedTaskCount 是两个重要的方法,它们用于获取线程池的任务和已完成任务的统计信息。

  1. getTaskCount: 这个方法返回线程池中的当前任务数。它包括正在执行的任务和等待执行的任务。换句话说,它返回的是线程池中所有任务的总数,包括那些尚未开始执行的任务。
  2. getCompletedTaskCount: 这个方法返回线程池已完成的任务数量。它只计算那些已经完成执行的任务,而不包括正在执行或等待执行的任务。

再回头我们的那个报错。
Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), Task: 50397601 (completed: 50396801)

2、估算合适的线程数,寻找慢业务

我们知道DUBBO会选择线程池策略进行业务处理,那么如何估算可能产生的线程数呢?我们首先分析一个问题:一个公司有7200名员工,每天上班打卡时间是早上8点到8点30分,每次打卡时间系统耗时5秒。请问RT、QPS、并发量分别是多少?
RT表示响应时间,问题已经告诉了我们答案:
RT = 5

QPS表示每秒查询量,假设签到行为平均分布:
QPS = 7200 / (30 * 60) = 4

并发量表示系统同时处理的请求数量:
并发量 = QPS x RT = 4 x 5 = 20

根据上述实例引出如下公式:
并发量 = QPS x RT

如果系统为每一个请求分配一个处理线程,那么并发量可以近似等于线程数。基于上述公式不难看出并发量受QPS和RT影响,这两个指标任意一个上升就会导致并发量上升。
但是这只是理想情况,因为并发量受限于系统能力而不可能持续上升,例如DUBBO线程池就对线程数做了限制,超出最大线程数限制则会执行拒绝策略,而拒绝策略会提示线程池已满,这就是DUBBO线程池打满问题的根源。下面我们分别分析RT上升和QPS上升这两个原因。
注意上面仅仅是一个例子,实际上一个服务远比例子复杂,实践往往需要不断的调参数。才能找到合理的值
线程池耗尽,往往是因为某个业务慢导致,我们应该寻找执行缓慢的堆栈,例如使用arthas来监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/267100.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python中json模块的使用与pyecharts绘图的基本介绍

文章目录 json模块json与Python数据的相互转化 pyecharts模块pyecharts基本操作基础折线图配置选项全局配置选项 json模块的数据处理折线图示例示例代码 json模块 json实际上是一种数据存储格式&#xff0c;是一种轻量级的数据交互格式&#xff0c;可以把他理解成一个特定格式…

模型微调入门介绍一

备注&#xff1a;模型微调系列的博客部分内容来源于极客时间大模型微调训练营素材&#xff0c;撰写模型微调一系列博客&#xff0c;主要是期望把训练营的内容内化成自己的知识&#xff0c;我自己写的这一系列博客除了采纳部分训练营的内容外&#xff0c;还会扩展细化某些具体细…

【贪心】单源最短路径Python实现

文章目录 [toc]问题描述Dijkstra算法Dijkstra算法应用示例时间复杂性Python实现 个人主页&#xff1a;丷从心 系列专栏&#xff1a;贪心算法 问题描述 给定一个带权有向图 G ( V , E ) G (V , E) G(V,E)&#xff0c;其中每条边的权是非负实数&#xff0c;给定 V V V中的一个…

Packet Tracer -使用 Ping 和 Traceroute测试 网络的连通性

地址分配表 目标 第 1 部分&#xff1a;测试和恢复 IPv4 连通性 第 2 部分&#xff1a;测试和恢复 IPv6 连通性 场景 本练习中存在连通性方面的问题。除了 收集和记录有关网络的信息&#xff0c;您还需要找出 问题&#xff0c;并实施可行的解决方案来恢复网络的连通性。 注意…

c语言:求1/2+2/3+3/4+……n-1/n的和|练习题

一、题目 求1/22/33/4……n-1/n的和 如图&#xff1a; 二、思路分析 1、1/2、2/3、3/4……可以用(i/i1) 2、设置一个函数&#xff0c;求数的相加之和 三、代码截图【带注释】 四、源代码【带注释】 #include <stdio.h> int main() { int num; printf("输入…

【RabbitMQ】RabbitMQ详解(二)

RabbitMQ详解 死信队列死信来源消息TTL过期队列达到最大长度消息被拒绝 RabbitMQ延迟队列TTL的两种设置队列设置TTL消息设置TTL 整合SrpingBoot队列TTL延时队列TTL优化Rabbtimq插件实现延迟队列 死信队列 先从概念解释上搞清楚这个定义&#xff0c;死信&#xff0c;顾名思义就…

S7项目EMS输送线操作

C型钩装置是支撑轨道的挂件,通过和轨道配合可以组成寓任意输送网络。并且可以拆卸和调整。 轨道是承载重物并供载物车行走的部件,它是通过连接装置(支撑件)悬于辅梁或房架上。它分直轨和弯轨两种形式,与道岔配合,能组合成生产工艺所需的任意输送网络。 道岔是载物车沿 轨…

域内定位个人PC的三种方式(1)

会话搜集 在cmd下调用query session命令可以获得当前环境下的windows会话 NetSessionEnum 这个函数不允许直接查询是谁登陆&#xff0c;但是它允许查询是谁在访问此工作站的网络资源时所创建的网络会话&#xff0c;从而知道来自何处&#xff0c;此函数不需要高权限即可查询 第…

UE和Android互相调用

ue和android互调 这两种方式都是在UE打包的Android工程之上进行的。 一、首先是UE打包Android&#xff0c;勾选下面这项 如果有多个场景需要添加场景 工程文件在这个路径下 然后可以通过Android Studio打开&#xff0c;选择gradle打开 先运行一下&#xff0c;看看是否可以发布…

简述用C++实现SIP协议栈

SIP&#xff08;Session Initiation Protocol&#xff0c;会话初始协议&#xff09;是一个基于文本的应用层协议&#xff0c;用于创建、修改和终止多媒体会话&#xff08;如语音、视频、聊天、游戏等&#xff09;中的通信。SIP协议栈是实现SIP协议的一组软件模块&#xff0c;它…

【数据库系统概论】第3章-关系数据库标准语言SQL(1)

文章目录 3.1 SQL概述3.2 学生-课程数据库3.3 数据定义3.3.1 数据库定义3.3.2 模式的定义3.3.3 基本表的定义3.3.4 索引的建立与删除3.3.5 数据字典 3.1 SQL概述 动词 分类 三级模式 3.2 学生-课程数据库 3.3 数据定义 3.3.1 数据库定义 创建数据库 tips&#xff1a;[ ]表…

【数据结构入门精讲 | 第十七篇】一文讲清图及各类图算法

在上一篇中我们进行了的并查集相关练习&#xff0c;在这一篇中我们将学习图的知识点。 目录 概念深度优先DFS伪代码 广度优先BFS伪代码 最短路径算法&#xff08;Dijkstra&#xff09;伪代码 Floyd算法拓扑排序逆拓扑排序 概念 下面介绍几种在对图操作时常用的算法。 深度优先D…

uniapp自定义头部导航怎么实现?

一、在pages.json文件里边写上自定义属性 "navigationStyle": "custom" 二、在对应的index页面写上以下&#xff1a; <view :style"{ height: headheight px, backgroundColor: #24B7FF, zIndex: 99, position: fixed, top: 0px, width: 100% …

一起玩儿物联网人工智能小车(ESP32)——14. 用ESP32的GPIO控制智能小车运动起来(二)

摘要&#xff1a;本文主要讲解如何使用Mixly实现对单一车轮的运动控制。 下面就该用程序控制我们的小车轮子转起来了。打开Mixly软件&#xff0c;然后单击顶部“文件”菜单中的“新建”功能&#xff0c;我们来开启一个新程序的开发工作。 我们的工作同样是先从最简单的开始&am…

设计模式分类

不同设计模式的复杂程度、 细节层次以及在整个系统中的应用范围等方面各不相同。 我喜欢将其类比于道路的建造&#xff1a; 如果你希望让十字路口更加安全&#xff0c; 那么可以安装一些交通信号灯&#xff0c; 或者修建包含行人地下通道在内的多层互通式立交桥。 最基础的、 底…

视频编码码率控制

什么是码率控制 码率控制是编码器的一个重要模块&#xff0c;主要的作用就是用算法来控制编码器输出码流的大小。虽然它是编码器的一个非常重要的部分&#xff0c;但是它并不是编码标准的一部分&#xff0c;也就是说&#xff0c;标准并没有给码控设定规则。我们平时用的编码器…

50 个具有挑战性的概率问题 [04/50]:尝试直至首次成功

一、说明 你好&#xff0c;我最近对与概率相关的问题产生了兴趣。我偶然发现了 Frederick Mosteller 所著的《五十个具有挑战性的概率问题及其解决方案》这本书。我认为创建一个系列来讨论这些可能作为面试问题出现的迷人问题会很有趣。每篇文章仅包含 1 个问题&#xff0c;使其…

基于python的excel检查和读写软件

软件版本&#xff1a;python3.6 窗口和界面gui代码&#xff1a; class mygui:def _init_(self):passdef run(self):root Tkinter.Tk()root.title(ExcelRun)max_w, max_h root.maxsize()root.geometry(f500x500{int((max_w - 500) / 2)}{int((max_h - 300) / 2)}) # 居中显示…

Python学习路线 - Python语言基础入门 - Python基础综合案例 - 数据可视化 - 动态柱状图

Python学习路线 - Python语言基础入门 - Python基础综合案例 - 数据可视化 - 动态柱状图 基础柱状图构建案例效果通过Bar构建基础柱状图反转x和y轴数值标签在右侧 基础时间线柱状图绘制创建时间线创建时间线自动播放时间线设置主题 动态GDP柱状图绘制需求分析列表的sort方法带名…

分巧克力c语言

分析&#xff1a;分巧克力&#xff0c;把每一种大小列举出来&#xff0c;在对巧克力分解&#xff0c;在加上所以的分解块数&#xff0c;在和人数比较&#xff0c;如果够分&#xff0c;就保存这一次的结果&#xff0c;在增大巧克力&#xff0c;如果不够分了&#xff0c;就打印上…