服务容错之限流之 Tomcat 限流 Tomcat 线程池的拒绝策略

在文章开头,先和大家抛出两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

JUC 线程池

在谈 Tomcat 的线程池前,先看一下 JUC 中线程池的执行流程,这里使用《Java 并发编程的艺术》中的一张图:
在这里插入图片描述

即执行流程为:

  1. 收到提交任务
  2. 当前线程数小于核心线程数,创建一个新的线程来执行任务
  3. 当前线程数大于等于核心线程数,
    • 如果阻塞队列未满,将任务存储到队列
    • 如果阻塞队列已满
      • 如果当前线程数小于最大线程数,则创建一个线程来执行新提交的任务
      • 如果当前线程数大于等于最大线程数,执行拒绝策略

可以看到设计思想是任务可以等待执行,但要尽量少的创造过多线程。如果队列很大,则很难扩大到最大线程数,同时会有大量的任务等待。

Tomcat 线程池分析

Tomcat 线程池是在 LifeCycle 中创建的。跳过前面繁琐的流程,直接看 org.apache.tomcat.util.net.NioEndpoint#startInternal

    /**
     * Start the NIO endpoint, creating acceptor, poller threads.
     */
    @Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());

            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }

            initializeConnectionLatch();

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }

再看 org.apache.tomcat.util.net.AbstractEndpoint#createExecutor

    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue(); //无界队列
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

要注意这里的 ThreadPoolExecutor 不是 JUC 里面的 java.util.concurrent.ThreadPoolExecutor,而是 Tomcat 的 org.apache.tomcat.util.threads.ThreadPoolExecutor,它继承了 JUC 的 java.util.concurrent.ThreadPoolExecutor

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
  ...
}

查看它的构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
        //提前启动核心线程
        prestartAllCoreThreads();
    }

可以发现它在构造的时候就会启动核心线程,而 java.util.concurrent.ThreadPoolExecutor 则是需要手动启动。而阻塞队列使用是 org.apache.tomcat.util.threads.TaskQueue

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    private volatile ThreadPoolExecutor parent = null;

    // No need to be volatile. This is written and read in a single thread
    // (when stopping a context and firing the  listeners)
    private Integer forcedRemainingCapacity = null;

    public TaskQueue() {
        super();
    }
  ...
}

而在创建 org.apache.tomcat.util.threads.TaskQueue 的时候,并没有传递 capacity,也就是说 Tomcat 的线程池使用的是无界队列。

接下来看一下最核心的org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable)

/**
     * {@inheritDoc}
     */
    @Override
    public void execute(Runnable command) {
        //重载 java.util.concurrent.ThreadPoolExecutor#execute
        execute(command,0,TimeUnit.MILLISECONDS);
    }

    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

本质上还是执行的 java.util.concurrent.ThreadPoolExecutor#execute 方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // Tomcat 中这块逻辑不会执行,因为构造时已经初始化了核心线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    //强制入队
    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

这里的 workQueueorg.apache.tomcat.util.threads.TaskQueueorg.apache.tomcat.util.threads.TaskQueue#offer

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        //当前线程数达到最大,任务入队
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        //如果已提交未执行完的任务数小于当前线程数(来了任务先+1,再入队,执行完才-1,说明还有空闲的worker线程),任务入队
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        // 如果当前线程数小于最大线程数量,则直接返回false,java.util.concurrent.ThreadPoolExecutor#execute 会创建新的线程来执行任务
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        //任务入队(当前线程数大于最大线程数)
        return super.offer(o);
    }

再看下拒绝策略,结合 java.util.concurrent.ThreadPoolExecutor#execute 方法,需要 java.util.concurrent.ThreadPoolExecutor#addWorker 返回 false 才会触发,即达到了最大线程数才会触发,而 org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable) 在触发了拒绝策略后还有一个特殊处理:

					//如果是 TaskQueue
					if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    //强制入队
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else { //非 TaskQueue 直接触发拒绝策略
                submittedCount.decrementAndGet();
                throw rx;
            }

再看 org.apache.tomcat.util.threads.TaskQueue#force(java.lang.Runnable, long, java.util.concurrent.TimeUnit)

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

说白了就是直接入队(无界队列):

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) { //capacity是Integer最大值
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

这么看,Tomcat 的线程池基本上不会触发拒绝策略。可以写个例子试一下:

package blog.dongguabai.others.tomcat_threadpool;

import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author dongguabai
 * @date 2023-11-18 22:04
 */
public class Demo {

    public static void main(String[] args) {
        //无界队列
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory("dongguabai_blog" + "-exec-", false, 2);
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, taskqueue, tf);
        taskqueue.setParent(executor);
        observe(executor);
        while (true) {
            executor.execute(new Runnable() {
                public void run() {
                    excuteForever();
                }
            });
        }

    }

    private static void observe(final ThreadPoolExecutor executor) {
        Runnable task = new Runnable() {
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(new Date().toLocaleString() + "->" + executor.getQueue().size());
                }
            }
        };
        new Thread(task).start();
    }

    public static void excuteForever() {
        while (true) {
        }
    }
}

输出:

2023-11-18 22:18:27->6541506
2023-11-18 22:18:34->14395417
2023-11-18 22:18:37->25708908
2023-11-18 22:18:50->32014458
2023-11-18 22:19:07->47236736
2023-11-18 22:19:10->65616058
2023-11-18 22:19:32->66856933
...

可以看到,队列里的任务都有六千多万了,还没有触发拒绝策略,线程池还是可以继续接收任务。

当然我们也是可以自定义的,只需要重写 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 即可:

    public Executor getExecutor() { return executor; }

org.apache.tomcat.util.net.NioEndpoint#startInternal 会进行判断:

@Override
    public void startInternal() throws Exception {

        if (!running) {
            ...
            if ( getExecutor() == null ) {
                createExecutor(); //如果没有自定义实现,就会使用默认实现
            }
        }
      ...
    }

Tomcat 默认线程池优先创建线程执行任务,达到了最大线程数,不会直接执行拒绝策略,而是尝试返回等待队列,但由于等待队列的容量是 Integer 最大值,所以几乎不会触发拒绝策略。

最后

最后再回过头看文章开头的两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?

    Tomcat 的确可以用来做限流,比如可以控制最大线程数,这样后续的任务均会在队列等待,并不会执行。org.apache.tomcat.util.net.AbstractEndpoint#setMaxConnectionsConnector 的角度设置,这块不在本文探讨范围之内。

    虽然基于 Tomcat 的限流是一种可能的方案,但在实际应用中,我们通常会选择其他的层次来实现服务限流:

    • 可扩展性:基于 Tomcat 的限流方案通常只能在单个服务实例上工作,且只能针对HTTP/HTTPS协议的请。而在微服务或者分布式系统中,我们可能需要分布式限流方案和针对多协议的 限流。
    • 灵活性:在应用层或者分布式系统层实现的限流方案通常可以提供更多的配置选项和更精细的控制。例如,请求的资源、来源或者其他属性来进行限流。
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

    Tomcat 默认无限队列,难以触发拒绝策略,所以会有内存泄漏的风险。可以基于 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 自定义线程池进行控制。

References

  • 《Java 并发编程的艺术》

欢迎关注公众号:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/162438.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[acwing周赛复盘] 第 94 场周赛20230311

[acwing周赛复盘] 第 94 场周赛20231118 总结5295. 三元组1. 题目描述2. 思路分析3. 代码实现 5296. 边的定向1. 题目描述2. 思路分析3. 代码实现 六、参考链接 总结 好久没做acw了&#xff0c;挺难的。T1 模拟T2 前缀和以及优化。T3 贪心 5295. 三元组 链接: 5295. 三元组…

操作系统(存储管理进程管理设备管理)

文章目录 存储管理页式存储管理概念优点缺点页面置换算法快表&#xff08;很快速的页表&#xff09; 段式存储管理概念优点缺点 段页式存储管理概念优点缺点 进程管理概述作用特征功能分类计算机启动基本流程 进程管理进程的组成进程的基础状态前趋图进程资源图同步和互斥信号量…

os.path.join函数用法

os.path.join()是Python中用于拼接文件路径的函数&#xff0c;它可以将多个字符串拼接成一个路径&#xff0c;并且会根据操作系统的规则自动使用合适的路径分隔符。 注&#xff1a;Linux用的是/分隔符&#xff0c;而Windows才用的是\。 该函数属于os.path模块&#xff0c;因此在…

解决Redis分布式锁宕机出现不可靠问题-zookeeper分布式锁

核心思想&#xff1a;当客户端要获取锁&#xff0c;则创建节点&#xff0c;使用完锁&#xff0c;则删除该节点。 客户端获取锁时&#xff0c;在 lock 节点下创建临时顺序节点。然后获取 lock下面的所有子节点&#xff0c;客户端获取到所有的子节点之后&#xff0c;如果发现自己…

DEEP-FRI: Sampling Outside the Box Improves Soundness论文学习笔记

1. 引言 前序博客有&#xff1a; DEEP FRI协议A summary on the FRI low degree test前2页导读RISC Zero的手撕STARKReed-Solomon Codes——RS纠错码Reed-Solomon Codes及其与RISC Zero zkVM的关系 Eli Ben-Sasson等人2019年论文《DEEP-FRI: Sampling Outside the Box Impro…

解决:微软在登录时总是弹出需要家长或监护人同意才能使用该账户并且不断循环?

目录 问题来源&#xff1a; 解决办法&#xff1a; 问题来源&#xff1a; 我的edge浏览器账号登录&#xff0c;一直弹出来需要家长或监护人同意才能使用&#xff0c;然后按照提示操作&#xff0c;会一直循环&#xff0c;是个无穷循环。 解决办法&#xff1a; 参考&#xff1…

【Java 进阶篇】唤醒好运:JQuery 抽奖案例详解

在现代社交网络和电商平台中&#xff0c;抽奖活动成为吸引用户、提升用户参与度的一种常见手段。通过精心设计的抽奖页面&#xff0c;不仅可以增加用户的互动体验&#xff0c;还能在一定程度上提高品牌的知名度。本篇博客将通过详细解析 JQuery 抽奖案例&#xff0c;带领你走进…

Linux:firewalled服务常规操作汇总

一、firewalled防火墙工作原理 firewalled的内部结构&#xff0c;可以简单的看做下图&#xff0c;有两个集合&#xff0c;一个集合管理关闭的端口&#xff0c;另一个集合管理放开的端口。 二、常用操作 1、开启和关闭防火墙 临时性配置&#xff1a; systemctl [start | stop …

【Java 进阶篇】插上翅膀:JQuery 插件机制详解

在前端开发中&#xff0c;JQuery 作为一个广泛应用的 JavaScript 库&#xff0c;为开发者提供了丰富的工具和方法&#xff0c;简化了 DOM 操作、事件处理等繁琐的任务。而在这个庞大的生态系统中&#xff0c;插件机制是 JQuery 的一项重要特性&#xff0c;使得开发者能够轻松地…

电磁场与电磁波part4--时变电磁场

1、采用洛伦兹条件使得矢量位 与标量位 分离在两个独立的方程中&#xff0c;且矢量位 仅与电流密度 有关&#xff0c;而标量位 仅与电荷密度 有关。 2、电磁能量守恒定理&#xff08;坡印廷定理&#xff09; 即减少的电磁能量电磁场所做的功流出的电磁能量 3、设u(r,t)是…

【双指针】复写0

复写0 1089. 复写零 - 力扣&#xff08;LeetCode&#xff09; 给你一个长度固定的整数数组 arr &#xff0c;请你将该数组中出现的每个零都复写一遍&#xff0c;并将其余的元素向右平移。 注意&#xff1a;请不要在超过该数组长度的位置写入元素。请对输入的数组 就地 进行上…

Git命令总结-常用-后续使用频繁的再添加~

Git命令总结-常用 久了不用&#xff0c;有些时候老是会忘记一些命令&#xff0c;多的都记录一下&#xff0c;方便查找 git init 初始化一个Git仓库&#xff0c;执行完git init命令后&#xff0c;会生成一个**.git**目录&#xff0c;该目录包含了资源数据&#xff0c;且只会在…

新增文章分类

pojo.Category package com.lin.springboot01.pojo;import jakarta.validation.constraints.NotEmpty; import lombok.Data;import java.time.LocalDateTime;Data public class Category {private Integer id;//主键NotEmptyprivate String categoryName;//分类名称NotEmptypr…

redis cluster搭建

k8s部署 Redis Insight k8s部署redis集群_mob6454cc6c6291的技术博客_51CTO博客 占用的内存竟然这么小&#xff0c;才200M左右 随便选个节点进去&#xff0c;看能否连接上其他节点 redis-cli -h redis-cluster-v1-0.redis-cluster.project-gulimall.svc.cluster.local 再创建个…

【总结】I/O接口中的数据线,地址线,控制线,状态线传输什么信息?

数据线 方向&#xff1a;双向功能&#xff1a;在内存、寄存器和数据缓冲寄存器进行数据交换&#xff1b;接口和设备的状态信息也通过数据线传给CPU&#xff08;这里的状态指的是设备独有的&#xff0c;和状态线中的忙碌、空闲相区别&#xff09;&#xff1b;CPU对外设的控制命…

tomcat8.5处理get请求时,控制台输出中文乱码问题的解决

问题描述 控制台输出中文乱码 版本信息 我使用的是tomcat8.5 问题解决 配置web.xml 注&#xff1a;SpringMVC中处理编码的过滤器一定要配置到其他过滤器之前&#xff0c;否则无效 <!--配置springMVC的编码过滤器--> <filter><filter-name>CharacterEn…

Lstm+transformer的刀具磨损预测

视频讲解: 基于Lstm+transformer的刀具磨损预测实战_哔哩哔哩_bilibili 结果展示: 数据展示: 主要代码: # pip install openpyxl -i https://pypi.tuna.tsinghua.edu.cn/simple/ # pip install optuna -i https://pypi.tuna.tsinghua.edu.cn/simple/ import numpy as np…

Java之线程的概念及方法的学习

线程创建 方法一 直接使用Thread public class demo {public static void main(String[] args) {new Thread(){Overridepublic void run() {System.out.println(Thread.currentThread().getName());}}.start();System.out.println(Thread.currentThread().getName());} } main…

The ultimate UI kit and design system for Figma 组件库下载

Untitled UI 是世界上最大的 Figma UI 套件和设计系统。可以启动任何项目&#xff0c;为您节省数千小时&#xff0c;并祝您升级为专业设计师。 采用 100% 自动布局 5.0、变量、智能变体和 WCAG 可访问性精心制作。 900全局样式、变量&#xff1a;超级智能的全局颜色、排版和效…

JDBC,Java连接数据库

下载 JDBC https://mvnrepository.com/ 创建项目&#xff0c;然后创建一个目录并将下载好的 jar 包拷贝进去 选择 Add as Library&#xff0c;让这个目录能被项目识别 连接数据库服务器 在 JDBC 里面&#xff0c;使用 DataSource 类来描述数据库的位置 import com.mysql.cj.…