【JUC进阶】14. TransmittableThreadLocal

目录

1、前言

2、TransmittableThreadLocal

2.1、使用场景

2.2、基本使用

3、实现原理

4、小结


1、前言

书接上回《【JUC进阶】13. InheritableThreadLocal》,提到了InheritableThreadLocal虽然能进行父子线程的值传递,但是如果在线程池中,就无法达到预期的效果了。为了更好的解决该问题,TransmittableThreadLocal诞生了。

2、TransmittableThreadLocal

TransmittableThreadLocal 是Alibaba开源的、用于解决 “在使用线程池等会缓存线程的组件情况下传递ThreadLocal” 问题的 InheritableThreadLocal 扩展。既然是扩展,那么自然具备InheritableThreadLocal不同线程间值传递的能力。但是他也是专门为了解决InheritableThreadLocal在线程池中出现的问题的。

官网地址:https://github.com/alibaba/transmittable-thread-local

2.1、使用场景

  1. 分布式跟踪系统 或 全链路压测(即链路打标)
  2. 日志收集记录系统上下文
  3. Session级Cache
  4. 应用容器或上层框架跨应用代码给下层SDK传递信息

2.2、基本使用

我们拿《【JUC进阶】13. InheritableThreadLocal》文中最后的demo进行改造。这里需要配合TtlExecutors一起使用。这里先讲述使用方法,具体为什么下面细说。

首先,我们需要添加依赖:

<!-- https://mvnrepository.com/artifact/com.alibaba/transmittable-thread-local -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.14.2</version>
</dependency>

其次,ThreadLocal的实现改为TransmittableThreadLocal。

static ThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();

最后创建线程池的时候,使用TTL装饰器:

static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());

完整代码如下:

// threadlocal改为TransmittableThreadLocal
static ThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();

// 线程池添加TtlExecutors
static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());

public static void main(String[] args) throws InterruptedException {

    //threadLocal.set("我是主线程的threadlocal变量,变量值为:000000");
    
    // 线程池执行子线程
    executorService.submit(() -> {
        System.out.println("-----> 子线程" + Thread.currentThread() + " <----- 获取threadlocal变量:" + threadLocal.get());
    });

    // 主线程睡眠3s,模拟运行
    Thread.sleep(3000);
    // 将变量修改为11111,在InheritableThreadLocal中修改是无效的
    threadLocal.set("我是主线程的threadlocal变量,变量值为:11111");

    // 这里线程池重新执行线程任务
    executorService.submit(() -> {
        System.out.println("-----> 子线程" + Thread.currentThread() + " <----- 获取threadlocal变量:" + threadLocal.get());
    });

    // 线程池关闭
    executorService.shutdown();
}

执行看下效果:

已经成功获取到threadlocal变量。

该方式也解决了因为线程被重复利用,而threadlocal重新赋值失效的问题。

3、实现原理

首先可以看到TransmittableThreadLocal继承InheritableThreadLocal,同时实现了TtlCopier接口。TtlCopier接口只提供了一个方法copy()。看到这里,可能有人大概猜出来他的实现原理了,既然实现了copy()方法,那么大概率是将父线程的变量复制一份存起来,接着找个地方存起来,然后找个适当的时机再还回去。没错,其实就是这样。

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
}

知道了TransmittableThreadLocal类的定义之后,我们再来看一个重要的属性holder:

// Note about the holder:
// 1. holder self is a InheritableThreadLocal(a *ThreadLocal*).
// 2. The type of value in the holder is WeakHashMap<TransmittableThreadLocal<Object>, ?>.
//    2.1 but the WeakHashMap is used as a *Set*:
//        the value of WeakHashMap is *always* null, and never used.
//    2.2 WeakHashMap support *null* value.
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
        new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                return new WeakHashMap<>();
            }

            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                return new WeakHashMap<>(parentValue);
            }
        };

这里存放的是一个全局的WeakMap(同ThreadLocal一样,weakMap也是为了解决内存泄漏的问题),里面存放了TransmittableThreadLocal对象并且重写了initialValue和childValue方法,尤其是childValue,可以看到在即将异步时父线程的属性是直接作为初始化值赋值给子线程的本地变量对象。引入holder变量后,也就不必对外暴露Thread中的 inheritableThreadLocals,保持ThreadLocal.ThreadLocalMap的封装性。

而TransmittableThreadLocal中的get()和set()方法,都是从该holder中获取或添加该map。

重点来了,前面不是提到了需要借助于TtlExecutors.getTtlExecutorService()包装线程池才能达到效果吗?我们来看看这里做了什么事。

我们从TtlExecutors.getTtlExecutorService()方法跟进可以发现一个线程池的ttl包装类ExecutorServiceTtlWrapper。其中包含了我们执行线程的方法submit()和execute()。我们进入submit()方法:

@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
    return executorService.submit(TtlCallable.get(task, false, idempotent));
}

可以发现在线程池进行任务执行时,对我们提交的任务进行了一层预处理,TtlCallable.get()。TtlCallable也是Callable的装饰类,同样还有TtlRunnable,也是同样道理。我们跟进该方法偷瞄一眼:

@Nullable
@Contract(value = "null, _, _ -> null; !null, _, _ -> !null", pure = true)
public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) {
    if (callable == null) return null;

    if (callable instanceof TtlEnhanced) {
        // avoid redundant decoration, and ensure idempotency
        if (idempotent) return (TtlCallable<T>) callable;
        else throw new IllegalStateException("Already TtlCallable!");
    }
    return new TtlCallable<>(callable, releaseTtlValueReferenceAfterCall);
}

上面判断下当前线程的类型是否已经是TtlEnhanced,如果是直接返回,否则创建一个TtlCallable。接着进入new TtlCallable()方法:

private TtlCallable(@NonNull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {
    this.capturedRef = new AtomicReference<>(capture());
    this.callable = callable;
    this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
}

可以看到在初始化线程的时候,调用了一个capture()方法,并将该方法得到的值存放在capturedRef中。没错,这里就是上面我们提到的将父线程的本地变量复制一份快照,存放起来。跟进capture():

@NonNull
public static Object capture() {
    final HashMap<Transmittee<Object, Object>, Object> transmittee2Value = newHashMap(transmitteeSet.size());
    for (Transmittee<Object, Object> transmittee : transmitteeSet) {
        try {
            transmittee2Value.put(transmittee, transmittee.capture());
        } catch (Throwable t) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "exception when Transmitter.capture for transmittee " + transmittee +
                        "(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);
            }
        }
    }
    return new Snapshot(transmittee2Value);
}

这里的transmitteeSet是一个存放Transmitteedede 集合,在初始化中会将我们 前面提到的holder注册进去:

private static final Set<Transmittee<Object, Object>> transmitteeSet = new CopyOnWriteArraySet<>();

static {
    registerTransmittee(ttlTransmittee);
    registerTransmittee(threadLocalTransmittee);
}

@SuppressWarnings("unchecked")
public static <C, B> boolean registerTransmittee(@NonNull Transmittee<C, B> transmittee) {
    return transmitteeSet.add((Transmittee<Object, Object>) transmittee);
}

跟进transmittee.capture()方法,该方法由静态内部类Transmitter实现并重写,com.alibaba.ttl.TransmittableThreadLocal.Transmitter.Transmittee#capture

private static final Transmittee<HashMap<TransmittableThreadLocal<Object>, Object>, HashMap<TransmittableThreadLocal<Object>, Object>> ttlTransmittee =
        new Transmittee<HashMap<TransmittableThreadLocal<Object>, Object>, HashMap<TransmittableThreadLocal<Object>, Object>>() {
            @NonNull
            @Override
            public HashMap<TransmittableThreadLocal<Object>, Object> capture() {
                final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = newHashMap(holder.get().size());
                for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
                    ttl2Value.put(threadLocal, threadLocal.copyValue());
                }
                return ttl2Value;
            }
}

transmittee.capture()扫描holder里目前存放的k-v里的key,就是需要传给子线程的TTL对象,其中调用的threadLocal.copyValue()便是前面看到的TtlCopier接口提供的方法。

看到这里已经大致符合我们前面的猜想,将变量复制一份存起来。那么不出意外接下来应该就是要找个适当的机会还回去。我们接着看。

接下来我们看真正执行线程的时候,也就是call()方法。由于前面线程被TtlCallable包装过,以为这里的call()方法肯定是TtlCallable.call():

@Override
@SuppressFBWarnings("THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION")
public V call() throws Exception {
    // 获取由之前捕获到的父线程变量集
    final Object captured = capturedRef.get();
    if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after call!");
    }
    // 这里的backup是当前线程原有的变量,这里进行备份,等线程执行完毕后,会将该变量进行恢复
    final Object backup = replay(captured);
    try {
        // 任务执行
        return callable.call();
    } finally {
        // 恢复上述提到的backup原有变量
        restore(backup);
    }
}

果然,在执行线程时,先获取之前存放起来的变量。然后调用replay():

@NonNull
public static Object replay(@NonNull Object captured) {
    final Snapshot capturedSnapshot = (Snapshot) captured;

    final HashMap<Transmittee<Object, Object>, Object> transmittee2Value = newHashMap(capturedSnapshot.transmittee2Value.size());
    for (Map.Entry<Transmittee<Object, Object>, Object> entry : capturedSnapshot.transmittee2Value.entrySet()) {
        Transmittee<Object, Object> transmittee = entry.getKey();
        try {
            Object transmitteeCaptured = entry.getValue();
            transmittee2Value.put(transmittee, transmittee.replay(transmitteeCaptured));
        } catch (Throwable t) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "exception when Transmitter.replay for transmittee " + transmittee +
                        "(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);
            }
        }
    }
    return new Snapshot(transmittee2Value);
}

继续跟进transmittee.replay(transmitteeCaptured):

@NonNull
@Override
public HashMap<TransmittableThreadLocal<Object>, Object> replay(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
    final HashMap<TransmittableThreadLocal<Object>, Object> backup = newHashMap(holder.get().size());

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        TransmittableThreadLocal<Object> threadLocal = iterator.next();
        
        // 这里便是所有原生的本地变量都暂时存储在backup里,用于之后恢复用
        backup.put(threadLocal, threadLocal.get());

        // clear the TTL values that is not in captured
        // avoid the extra TTL values after replay when run task
        // 这里检查如果当前变量不存在于捕获到的线程变量,那么就将他清除掉,对应线程的本地变量也清理掉
        // 为什么要清除?因为从使用这个子线程做异步那里,捕获到的本地变量并不包含原生的变量,当前线程
        // 在做任务时的首要目标,是将父线程里的变量完全传递给任务,如果不清除这个子线程原生的本地变量,
        // 意味着很可能会影响到任务里取值的准确性。这也就是为什么上面需要做备份的原因。
        if (!captured.containsKey(threadLocal)) {
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // set TTL values to captured
    setTtlValuesTo(captured);

    // call beforeExecute callback
    doExecuteCallback(true);

    return backup;
}

继续跟进setTtlValuesTo(captured),这里就是把父线程本地变量赋值给当前线程了:

private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
        TransmittableThreadLocal<Object> threadLocal = entry.getKey();
        threadLocal.set(entry.getValue());
    }
}

到这里基本的实现原理也差不多了,基本和我们前面猜想的一致。但是这里还少了前面提到的backup变量如何恢复的步骤,既然到这里了,一起看一下,跟进restore(backup):

public static void restore(@NonNull Object backup) {
    for (Map.Entry<Transmittee<Object, Object>, Object> entry : ((Snapshot) backup).transmittee2Value.entrySet()) {
        Transmittee<Object, Object> transmittee = entry.getKey();
        try {
            Object transmitteeBackup = entry.getValue();
            transmittee.restore(transmitteeBackup);
        } catch (Throwable t) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "exception when Transmitter.restore for transmittee " + transmittee +
                        "(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);
            }
        }
    }
}

继续看transmittee.restore(transmitteeBackup):

@Override
public void restore(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
    // call afterExecute callback
    doExecuteCallback(false);

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        TransmittableThreadLocal<Object> threadLocal = iterator.next();

        // clear the TTL values that is not in backup
        // avoid the extra TTL values after restore
        if (!backup.containsKey(threadLocal)) {
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // restore TTL values
    setTtlValuesTo(backup);
}

与replay类似,只是重复进行了将backup赋给当前线程的步骤。到此基本结束。附上官网的时序图帮助理解:

4、小结

所以总结下来,TransmittableThreadLocal的实现原理主要就是依赖于TtlRunnable或TtlCallable装饰类的预处理方法,TtlExecutors是将普通线程转换成Ttl包装的线程,而ttl包装的线程会进行本地变量的预处理,也就是capture()拷贝一份快照到内存中,然后通过replay方法将父线程的变量赋值给当前线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/320137.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

e2studio开发三轴加速度计LIS2DW12(4)----测量倾斜度

e2studio开发三轴加速度计LIS2DW12.4--测量倾斜度 概述视频教学样品申请源码下载计算倾斜角度工作原理单轴倾斜检测双轴倾斜检测三轴倾斜检测通信模式管脚定义IIC通信模式速率新建工程工程模板保存工程路径芯片配置工程模板选择时钟设置UART配置UART属性配置设置e2studio堆栈e…

自编C++题目——输入程序

预估难度 简单 题目描述 小明编了一个输入程序&#xff0c;当用户的输入之中有<时&#xff0c;光标移动到最右边&#xff1b;当输入有>时&#xff0c;光标移动到最左边&#xff0c;当输入有^时&#xff0c;光标移动到前一个字符&#xff0c;当输入为#时&#xff0c;清…

大模型实战营Day4 XTuner 大模型单卡低成本微调实战 作业

按照文档操作&#xff1a; 单卡跑完训练&#xff1a; 按照要求更改微调的数据&#xff1a; 完成微调数据的脚本生成&#xff1a; 修改配置文件&#xff1a; 替换好文件后启动&#xff1a; 启动后终端如图&#xff1a; 用于微调的一些数据显示&#xff1a; 训练时间&#x…

记录一次git merge后发现有些文件不对的问题,排查过程

分支进行merge&#xff08;A merge到B&#xff09;之后&#xff0c;发现string.xml中有些字段的值没有merge过来&#xff0c;一开始还以为自己是自己merge错误&#xff0c;检查了一遍自己的merge操作没有问题。 那为啥没有merge过来呢&#xff1f;有一种可能是&#xff0c;merg…

vue2实现日历12个月平铺,显示工作日休息日

参考&#xff1a;https://blog.csdn.net/weixin_40292154/article/details/125312368 1.组件DateCalendar.vue&#xff0c;sass改为less <template><div class"cc-calendar"><div class"calendar-title"><span>{{ year }}年{{ mo…

【Linux Shell】5. 运算符

文章目录 【 1. 算术运算符 】1.1 expr 命令1.2 [ ] 方括号 【 2. 关系运算符 】【 3. 布尔运算符 】【 4. 逻辑运算符 】【 5. 字符串运算符 】【 6. 文件测试运算符 】 【 1. 算术运算符 】 运算符说明举例赋值a$b 把变量 b 的值赋给 a。 1.1 expr 命令 原生 bash 不支持简…

SDRAM小项目——写模块

写模块跟着视频看了一个多星期&#xff0c;一开始始终有点弄不清楚&#xff0c;现在记录一下理解的过程。 阅读文档信息&#xff1a; 首先阅读文档信息&#xff0c;了解SDRAM写过程的状态转换和时序图 SDRAM整体状态流程如图所示&#xff1a; 在SDRAM整体系统中&#xff0c…

数据结构之bool类

bool类 bool 是布尔类。它是最简单的一个类&#xff0c;其取值有两种&#xff0c;1和O&#xff0c;即 True 和 False。可以这样简单地理解&#xff0c;除了1和0以及 True 和 False 的情况之外&#xff0c;但凡有值&#xff08;非空&#xff09;即为真&#xff0c;但凡无值&…

nodemon(自动重启 node 应用程序)的安装与使用

1、安装&#xff0c;在随意一个命令窗口都可以 我们可以执行安装选项 -g 进行全局安装 npm i -g nodemon 全局安装完成之后就可以在命令行的任何位置运行 nodemon 命令 该命令的作用是 自动重启 node 应用程序 2、使用&#xff1a; 可能报错如下 windows 默认不允许 npm …

【数据结构 | 直接插入排序】

直接插入排序 基本思路直接插入排序SelectSort 基本思路 扑克牌是我们几乎每个人都可能玩过的游戏最基本的扑克玩法都是—边模牌,— 边理牌。加入我们拿到如图这样的扑克牌&#xff1a; 我们会按照如下理牌&#xff1a; 将3和4移动至5的左侧&#xff0c;再将2移动到最左侧&a…

顺序表的实现(上)(C语言)

本文章主要对顺序表的介绍以及数据结构的定义,以及几道相关例题,帮助大家更好理解顺序表. 文章目录 前言 一、顺序表的静态实现 二、顺序表的动态实现 三.定义打印顺序表函数 四.定义动态增加顺序表长度函数 五.创建顺序表并初始化 六.顺序表的按位查找 七.顺序表的按值…

网络爬虫丨基于scrapy+mysql爬取博客信息并保存到数据库中

文章目录 写在前面实验描述实验框架实验需求 实验内容1.安装依赖库2.创建Scrapy项目3.配置系统设置4.配置管道文件5.连接数据库6.分析要爬取的内容7.编写爬虫文件 运行结果写在后面 写在前面 本期内容&#xff1a;基于scrapymysql爬取博客信息并保存到数据库中 实验需求 ana…

java并发编程

一、java线程 1.三种创建线程的方式 Integer sum futureTask.get(); 会等待其对应的线程执行完 &#xff0c;即阻塞 再获得结果。 所以我在测试时&#xff0c;出现一个小插曲 Slf4j public class ThreeWays {//1.自定义MyThread进行继承Threadstatic void test001(){Thread t…

HCIA-Datacom题库(自己整理分类的)_09_Telnet协议【14道题】

一、单选 1.某公司网络管理员希望能够远程管理分支机构的网络设备&#xff0c;则下面哪个协议会被用到&#xff1f; RSTP CIDR Telnet VLSM 2.以下哪种远程登录方式最安全&#xff1f; Telnet Stelnet v100 Stelnet v2 Stelnet v1 解析&#xff1a; Telnet 明文传输…

DAY7--learning english

一、积累 1.instinct Bro showed me his primal instinct 老兄给我展示他原始的本能&#xff08;返祖现象&#xff09;. 2. assembly Todays assembly is about part of journey. 今天的集会是讲述关于旅程的一部分。 3.aluminum Aluminum Casting Motocycle Engine Cover. …

【优选算法】专题四:前缀和(一)

文章目录 DP34 【模板】前缀和DP35 【模板】二维前缀和724.寻找数组的中心下标238.除自身以外数组的乘积 DP34 【模板】前缀和 DP34 【模板】前缀和 此方法的时间复杂度是O(Q)O(N); import java.util.Scanner;// 注意类名必须为 Main, 不要有任何 package xxx 信息 public cl…

LeetCode讲解篇之2280. 表示一个折线图的最少线段数

文章目录 题目描述题解思路题解代码 题目描述 题解思路 折线图中如果连续的线段共线&#xff0c;那么我们可以可以将其合并成一条线段 首先将坐标点按照横坐标升序排序 然后遍历数组 我们可以通过计算前一个线段的斜率和当前线段的斜率来判断是否共线 如果二者相等&#x…

[c语言]猜数字游戏

一男子学了分支与循环后&#xff0c;觉得的不够得劲&#xff0c;于是半夜打开浏览器查询了相关的学习资料&#xff0c;发现了猜数字这款游戏&#xff0c;然后他被这游戏深深的吸引毅然决然完成了这道题&#xff0c;玩了几把后并表示这比金铲铲、原神等游戏都要好玩。 猜数字&a…

从Demo理解Thrift Thrift和Dubbo的区别

文章目录 安装demo尝试Thrift协议栈Thrift 与 Dubbo 的区别 字节里的RPC框架都是用的Thrift&#xff0c;我猜这主要原因有2: Thrift是Facebook开源的项目&#xff0c;平台中立Thrift支持跨语言调用&#xff0c;这非常适合字节Java、Go语言都存在的环境&#xff0c;语言中立 但…

苹果电脑清理内存 怎么清理删不掉的软件

苹果电脑是很多人的首选&#xff0c;因为它有着优秀的性能和设计。但是&#xff0c;随着时间的推移&#xff0c;你可能会发现你的苹果电脑变得越来越慢&#xff0c;或者出现一些奇怪的问题。这可能是因为你的电脑内存不足&#xff0c;或者有一些删不掉的软件占用了你的空间和资…