全链路追踪必备组件之 TransmittableThreadLocal 详解
作者 | 姜日游
杏仁 Java 工程师。今日持续在线的程序玩家。
我们都知道 ThreadLocal 作为一种多线程处理手段,将数据限制在当前线程中,避免多线程情况下出现错误。
一般的使用场景大多会是服务上下文、分布式日志跟踪。
但是在业务代码中,为了提高响应速度,将多个复杂、长时间的计算或调用过程异步进行,让主线程可以先进行其他操作。像我们项目中最常用的就是 CompletableFuture
了,默认会使用预设的 ForkJoin ThreadPool
执行。
这也就引入了一个问题,如果保证 ThreadLocal 的信息能够传递异步线程?通过 ThreadLocal?通过线程池?通过 Runnable 或者 Callable?
有些场景丢了就丢了,比如目前我们的服务上下文传递,一般都没有很严谨的处理 ……
但是,如果是分布式追踪的场景,丢了就要累惨了。
注:以下代码仅保留关键代码,其余无关紧要则忽略
InheritableThreadLocal
InheritableThreadLocal
是 JDK 本身自带的一种线程传递解决方案。顾名思义,由当前线程创建的线程,将会继承当前线程里 ThreadLocal 保存的值。
其本质上是 ThreadLocal 的一个子类,通过覆写父类中创建初始化的相关方法来实现的。我们知道,ThreadLocal 实际上是 Thread 中保存的一个 ThreadLocalMap
类型的属性搭配使用才能让广大 Javaer 直呼真香的,所以 InheritableThreadLocal 也是如此。
public class Thread implements Runnable { // 如果单纯使用 ThreadLocal,则 Thread 使用该属性值保存 ThreadLocalMap ThreadLocal.ThreadLocalMap threadLocals = null; // 否则使用该属性值 ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc) { Thread parent = currentThread(); if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); } }
init
方法作为 Thread 初始化的核心方法,相关 ThreadLocal 代码已经全部摘出。如我们所见,仅仅就只是这一点改动。在创建线程时,如果当前线程的 inheritableThreadLocals 不为空,则根据它创建出新的 InheritableThreadLocals 保存到新线程中。
Ps : ThreadLocal 作为老牌选手,默认都是使用时,直接初始化 Thread 的 threadLocals 属性。
只有像是 InheritableThreadLocal 这样的后辈,需要特殊处理一下。
public class InheritableThreadLocal extends ThreadLocal { protected T childValue(T parentValue) { return parentValue; } ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } // Thread 中 ThreadLocalMap 不存在时的初始化动作,需要改为初始化 inheritableThreadLocals void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }
因此,原先 ThreadLocal 会从 Thread 的 threadLocals 获取 Map,那么 InheritableThreadLocal 就要从 inheritableThreadLocals 拿了。 childValue
方法用作从父线程中获取值,可以看到,这边是直接返回的,如果是复杂对象,就直接传引用了。当然,继承覆写该方法,可以实现浅拷贝、深拷贝等等方式。
缺点
这样的方式解决了创建线程时的 ThreadLocal 传值的问题,但不可能一直创建新的线程,那实在耗费资源。因此通用做法是线程复用,比如线程池呗。但是,递交异步任务是相应的 ThreadLocal 的值就无法传递过去了。
我们希望的是,异步线程执行任务的所使用的 ThreadLocal 值,是将任务提交给线程时主线程持有的。即从任务创建时传递到任务执行时。
想想,如果我们在创建异步任务时,在任务代码外获取当前线程的值临时保存,再传递给执行线程,在真正的任务执行前保存到当前线程即可。对,确实可以,但是麻烦不?每个创建异步任务的地方都要写。
那就把它封装到递交任务的方法中。
RunnableWrapper & CallableWrapper
假设按照服务上下文的场景举例,目前项目中的执行异步操作的方案是定义一个 AsyncExecutor
,并声明执行 Supplier 返回 CompletableFuture 的方法。
既然这样就可以对方法做一些改造,保证上下文的传递。
private static ThreadLocal contextHolder = new ThreadLocal(); public static CompletableFuture invokeToCompletableFuture(Supplier supplier, String errorMessage) { // 第一步 String context = contextHolder.get(); Supplier newSupplier = () -> { // 第二步 String origin = contextHolder.get(); try { contextHolder.set(context); // 第三步 return supplier.get(); } finally { // 第四步 contextHolder.set(origin); log.info(origin); } }; return CompletableFuture.supplyAsync(newSupplier).exceptionally(e -> { throw new ServerErrorException(errorMessage, e); }); } // test code public static void main(String[] args) throws ExecutionException, InterruptedException { contextHolder.set("main"); log.info(contextHolder.get()); CompletableFuture context = invokeToCompletableFuture(() -> test.contextHolder.get(), "error"); log.info(context.get()); }
总得来说,就是在将异步任务派发给线程池时,对其做一下上下文传递的处理。
第一步:主线程获取上下文,传递给任务暂存。
1 之后的操作都将是异步执行线程操作的。
第二步:异步执行线程将原有上下文取出,暂时保存。并将主线程传递过来的上下文设置。
第三步:执行异步任务
第四步:将原有上下文设置回去。
可以看到一般并不会在异步线程执行完任务之后直接进行 remove
。而是一开始取出原上下文(可能为 NULL,也可能是线程创建时 InheritableThreadLocal 继承过来的值。当然后续也会被清除的),并在任务执行结束重新放回。这样的方式可以说是异步 ThreadLocal 传递的标准范式(大佬说的)。
这样子既起到了显式清除主线程带来的上下文,也避免了如果线程池的拒绝策略为 CallerRunsPolicy
,后续处理时上下文丢失的问题。
Supplier 不算是典型例子,更为典型的应该是 Runnable 和 Callable。不过举一推三,都是修饰一下,再丢给线程池。
public final class DelegatingContextRunnable implements Runnable { private final Runnable delegate; private final Optional delegateContext; public DelegatingContextRunnable(Runnable delegate, Optional context) { assert delegate != null; assert context != null; this.delegate = delegate; this.delegateContext = context; } public DelegatingContextRunnable(Runnable delegate) { // 修饰原有的任务,并保存当前线程的值 this(delegate, ContextHolder.get()); } public void run() { Optional originalContext = ContextHolder.get(); try { ContextHolder.set(delegateContext); delegate.run(); } finally { ContextHolder.set(originalContext); } } } public final void execute(Runnable task) { // 递交给真正的执行线程池前,对任务进行修饰 executor.execute(wrap(task)); } protected final Runnable wrap(Runnable task) { return new DelegatingContextRunnable(task); }
后续,使用线程池执行异步任务的时候,事先对任务进行封装代理即可。
不过,还是比较麻烦。自定义的线程池,需要显式处理任务。而且更严谨的做法,不同业务场景之间的线程池应该是隔离的,以免受到影响,就比如 Hystrix
的线程池。
每一个线程池都要处理就麻烦了。所以换个思路,代理线程池。
DelegaingExecutor
这个就不多说了,实际很简单,就照搬我们上下文相关类库。
public class DelegatingContextExecutor implements Executor { private final Executor delegate; public DelegatingContextExecutor(Executor delegateExecutor) { this.delegate = delegateExecutor; } public final void execute(Runnable task) { delegate.execute(wrap(task)); } protected final Runnable wrap(Runnable task) { return new DelegatingContextRunnable(task); } protected final Executor getDelegateExecutor() { return delegate; } } // 自定义的线程池,用于执行项目中的异步任务 public Executor queryExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(); // 封装服务上下文的线程池修饰 return new DelegatingContextExecutorService(threadPoolExecutor); }
问题似乎都解决了,那还有什么?
对,适用场景不够通用。上面的做法只针对于指定的 ThreadLocal,其他场景例如链路追踪、应用容器或上层框架跨应用代码给下层 SDK
传递信息(像是契约包 Feign
的执行线程)。
那么 TransmittableThreadLocal
就是为了解决通用化场景而设计的。
TransmittableThreadLocal
作为一个核心代码不超过一千行的工具框架,实际使用和架构设计都十分简单。
其使用方法本质上与上述提到的 CallableWrapper 和 DelegatingExecutor 是一样的,并且为了方便使用,对外提供了静态工厂方法或工具类。
public final void execute(Runnable task) { executor.execute(TtlCallable.get(task)); } // 或者 public Executor queryExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(); // 封装服务上下文的线程池修饰 return TtlExecutors.getTtlExecutorService(threadPoolExecutor); }
当然,前提是 ThreadLocal 必须使用 TransmittableThreadLocal。至于为什么,我们源码分析时再细细说来。
先看看核心实现类的结构,以 Callable 和 ExecutorService 为例。
整体主要是三个部分:任务( TtlCallable
)、线程池( ExecutorServiceTtlWrapper
)、ThreadLocal( TransmittableThreadLocal
)。其实对应上述讲到的 CallableWrapper、DelegatingExecutor、InheritableThreadLocal。
但是无论是任务和线程池,本身还是依赖于 TransmittableThreadLocal 对于存储值的管理。
用官方的时序图直观展示一下,框架是如何起作用的:
可以看到,从第三步创建完任务,第四步修饰完任务,后续大部分过程都依赖于 TransmittableThreadLocal 或 TransmittableThreadLocal 中声明的静态工具类 Transmitter
。Transmitter 主要负责 ThreadLocal 的管理和值的传递。
首先看看 TtlCallable。
TtlCallable
该类实际上是 JDK Callable 的一个修饰。类比于,上文讲到的 RunnableWrapper,只是为了临时保存父线程 ThreadLocal 的值,以便在执行任务之前,赋值到子线程中。
因此,TtlCallable 和 TtlExecutorService 都实现了 TtlWrapper 接口。也许你以为,该接口是实现修饰的语义,但是它只提供了一个方法,表达了拆修饰的语义:
public interface TtlWrapper extends TtlEnhanced { @NonNull T unwrap(); }
毕竟核心是修饰,所以该类主要为了提供修饰的核心抽象,便于框架对其进行判断和管理。
该方法语义要求,必须返回修饰的源对象或下层对象(毕竟可能修饰了很多层),因此也是空值安全的。null 进来,null 出去。
public final class TtlCallable implements Callable, TtlWrapper<Callable>, TtlEnhanced, TtlAttachments { // 保存父线程的 ThreadLocal 快照 private final AtomicReference capturedRef; // 实际执行任务 private final Callable callable; // 判断是否执行完,清除任务所保存的 ThreadLocal 快照 private final boolean releaseTtlValueReferenceAfterCall; private TtlCallable(@NonNull Callable callable, boolean releaseTtlValueReferenceAfterCall) { // 1.创建时, 从 Transmitter 抓取快照 this.capturedRef = new AtomicReference(capture()); this.callable = callable; this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall; } @Override public V call() throws Exception { Object captured = capturedRef.get(); // 如果 releaseTtlValueReferenceAfterCall 为 true,则在执行线程取出快照后清除。 if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after call!"); } // 2.使用 Transmitter 将快照重做到当前执行线程,并将原来的值取出 Object backup = replay(captured); try { // 3.执行任务 return callable.call(); } finally { // 4.Transmitter 重新将原值放回执行线程 restore(backup); } } }
可以看到,从实例化到任务执行的顺序,和上文讲到的 CallableWrapper 是完全一致的。但是在其之上,提供了更为完整的特性和线程安全性。
-
releaseTtlValueReferenceAfterCall
的可控,保证了任务执行完,依然被业务代码持有的场景下,避免 ThreadLocal 快照继续持有而造成的内存泄漏。毕竟,对于业务方来说,这个东西是我不关心的,无需跟随任务本身的生命周期。 -
快照使用
AtomicReference
保存,保证任务误重用下,清除快照动作的多线程安全性。
上面两者的合用,相当于期望一个任务只能被执行一次,尽量避免任务重用和继续持有。
任务重用的间隔之间,可能出现 ThreadLocal 值被修改的情况,那么后一次任务执行时,快照实际是不准确的。业务场景应该尽量避免这种情况出现才对。
该类提供了静态工厂方法,方便业务方创建。
public static TtlCallable get(@Nullable Callable callable) { return get(callable, false); } @Nullable public static TtlCallable get(@Nullable Callable callable, boolean releaseTtlValueReferenceAfterCall) { return get(callable, releaseTtlValueReferenceAfterCall, false); } @Nullable public static TtlCallable get(@Nullable Callable callable, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) { if (null == callable) return null; if (callable instanceof TtlEnhanced) { // avoid redundant decoration, and ensure idempotency if (idempotent) return (TtlCallable) callable; else throw new IllegalStateException("Already TtlCallable!"); } return new TtlCallable(callable, releaseTtlValueReferenceAfterCall); }
可以看到,默认工厂方法的 releaseTtlValueReferenceAfterCall
是 false。如果想要使用执行完清除,就要注意方法的使用。
其次,这里还有一个幂等的参数控制: idempotent
。如果传入的 Callable 已经是修饰过的,那么根据 idempotent 的值,要么返回原 Callable,要么报错。
我觉得这里有个两难的点。
我们调用静态工厂方法期望得到的是调用该方法时 ThreadLocal 的快照。所以理论上,应该无论传入什么 Callable,都应该返回一个保存当前本地线程值快照的 TtlCallable。
但是,如果这样的逻辑下,传入的是已修饰的类,那么最后结果就是在任务执行时,会造成外层修饰的快照被内层修饰的覆盖。实际使用的是之前保存的快照了。
因此默认情况就只能 FastFail
。
官方并不建议设置 idempotent 为 true,因为直接返回原修饰类,本身也就违反静态工厂方法的语义。所以官方建议: DO NOT set, only when you know why.
ExecutorServiceTtlWrapper
该类并不需要多讲,本身与上文的 DelegatingExecutor 一样。
class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced { private final ExecutorService executorService; ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService) { super(executorService); this.executorService = executorService; } @NonNull @Override public Future submit(@NonNull Callable task) { return executorService.submit(TtlCallable.get(task)); } }
其余方法都是一样的做法。
从上文看到,实际 ThreadLocal 的线程传递的核心在于 TransmittableThreadLocal 和 Transmitter。
TransmittableThreadLocal
TransmittableThreadLocal 只继承了 InheritableThreadLocal 和实现了该框架提供的函数接口 TtlCopier。
因此 TransmittableThreadLocal 自身是一个 InheritableThreadLocal,同样具备了线程创建时传递的特性。
其次,从类体系上看,TransmittableThreadLocal 自身是比较简单的,本质上只是为了让框架能够进行线程传递,做了一些小动作而已。
可以看到提供的方法是十分少的,源码行数总共也才不超过200行。
首先说一下构造函数。
private final boolean disableIgnoreNullValueSemantics; public TransmittableThreadLocal() { this(false); } public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) { this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics; }
一共两个构造函数,有参构造函数允许设置 “是否禁用忽略空值语义”。默认是开启的,表现行为是如果是 null 值,那么 TransmittableThreadLocal 是不会传递这个值,并且如果 set null,同时执行 remove 操作。表达的意思就是,“我不要 null,不归我管。你敢给我,我就再也不管你了“。
这样设计可能是因为一开始设计服务于业务,是希望业务不要通过 NULL 来表达任何含义,同时避免 NPE 和优化 GC。但是后来官方考虑到作为一个基础服务框架,应该尽量保证完整的语义。毕竟这样的特性是 JDK 的 ThreadLocal 不兼容的。因此后来,官方为了保证兼容性,加了控制参数,允许禁用该特性。
TtlCopier
TransmittableThreadLocal 实现了一个类,TtlCopier。顾名思义,该类定义了线程传递时,值复制的抽象语义。
public interface TtlCopier { T copy(T parentValue); }
而 TransmittableThreadLocal 的默认实现是与 InheritableThreadLocal 相同的,返回值的引用。
public T copy(T parentValue) { return parentValue; }
同时,该接口也为业务方留下了扩展点。开发者可以重写该方法,来定义线程传递时,如何进行值的复制。
TransmittableThreadLocal 内部维护了一个非常关键的属性,用来注册项目中维护的 TransmittableThreadLocal,从而保证 Transmitter 去正确传递 ThreadLocal 的值。
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal, ?>>() { @Override protected WeakHashMap<TransmittableThreadLocal, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal, Object>(); } @Override protected WeakHashMap<TransmittableThreadLocal, ?> childValue(WeakHashMap<TransmittableThreadLocal, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal, Object>(parentValue); } };
holder 是一个 InheritableThreadLocal,用来保存所有注册的 TransmittableThreadLocal。父子线程传递时,可以直接将父线程的注册表传递过来。使用 InheritableThreadLocal,主要保证了嵌套线程场景下,注册表的正确传递。官方有个 issue 以及为其 fix 的 release 版本,从 ThreadLocal 改成了 InheritableThreadLocal。嵌入Thread调用的bug
其次,存储的是 WeakHashMap
,value 都是无意义的 null,并且永远不会被使用。这样一来,保证项目使用 TransmittableThreadLocal 的话,不会引入新的内存泄漏问题。其内存泄漏的可能风险,就只完全来自于 InheritableThreadLocal 本身。
@Override public final T get() { T value = super.get(); if (disableIgnoreNullValueSemantics || null != value) addThisToHolder(); return value; } @Override public final void set(T value) { if (!disableIgnoreNullValueSemantics && null == value) { // may set null to remove value remove(); } else { super.set(value); addThisToHolder(); } } @Override public final void remove() { removeThisFromHolder(); super.remove(); } @SuppressWarnings("unchecked") private void addThisToHolder() { if (!holder.get().containsKey(this)) { holder.get().put((TransmittableThreadLocal) this, null); // WeakHashMap supports null value. } } private void removeThisFromHolder() { holder.get().remove(this); }
get & set 会将当前的 TransmittableThreadLocal 注册到 holder 中, remove 时,会删除对应注册。
可以看到,前文说到的 disableIgnoreNullValueSemantics
的值在 get 和 set 时使用到。默认为 false 时,ThreadLocal 不会保存 null,holder 不会注册对应的 TransmittableThreadLocal。
TransmittableThreadLocal 就这样没了,可以看到就很简单。但是,线程传递的内容呢,为什么没有?
这是因为,TransmittableThreadLocal 将线程传递的所有工作全部委托给了其静态内部类 Transmitter。
Transmitter
我们讲到 TransmittableThreadLocal 会将有值的对象,注册到 holder 中,以便 Transmitter 去知道传递哪一些实例的值。但是如果这样,那不是都要修改代码,将项目中的 ThreadLocal 都改掉吗?
这当然不可能,因此 Transmitter 承担了这个任务,允许业务代码将原有的 ThreadLocal 注册进来,以方便 Transmitter 来识别和传递。
// 注册 ThreadLocal 的 threadLocalHolder 依然是 WeakHashMap private static volatile WeakHashMap<ThreadLocal, TtlCopier> threadLocalHolder = new WeakHashMap<ThreadLocal, TtlCopier>(); // ThreadLocal 手动注册时用的锁 private static final Object threadLocalHolderUpdateLock = new Object(); // 标记 ThreadLocal 的值已清除,类似于设置一个 null private static final Object threadLocalClearMark = new Object(); // 传递 TtlCopier,来确定 threadLocal 传递值的方式。默认是 引用传递,与 TransmittableThreadLocal 的 copy 一致。 public static boolean registerThreadLocal(@NonNull ThreadLocal threadLocal, @NonNull TtlCopier copier) { return registerThreadLocal(threadLocal, copier, false); } @SuppressWarnings("unchecked") public static boolean registerThreadLocalWithShadowCopier(@NonNull ThreadLocal threadLocal) { // 默认是内部定义个 shadowCopier return registerThreadLocal(threadLocal, (TtlCopier) shadowCopier, false); } public static boolean registerThreadLocal(@NonNull ThreadLocal threadLocal, @NonNull TtlCopier copier, boolean force) { // 如果是 TransmittableThreadLocal,则没有必要再维护了。默认就实现了其的传递。 if (threadLocal instanceof TransmittableThreadLocal) { logger.warning("register a TransmittableThreadLocal instance, this is unnecessary!"); return true; } synchronized (threadLocalHolderUpdateLock) { // force 为 false,则不会更新对应的 copier if (!force && threadLocalHolder.containsKey(threadLocal)) return false; // copy on write WeakHashMap<ThreadLocal, TtlCopier> newHolder = new WeakHashMap<ThreadLocal, TtlCopier>(threadLocalHolder); newHolder.put((ThreadLocal) threadLocal, (TtlCopier) copier); threadLocalHolder = newHolder; return true; } } public static boolean registerThreadLocalWithShadowCopier(@NonNull ThreadLocal threadLocal, boolean force) { return registerThreadLocal(threadLocal, (TtlCopier) shadowCopier, force); } // 清除 ThreadLocal 的注册 public static boolean unregisterThreadLocal(@NonNull ThreadLocal threadLocal) { if (threadLocal instanceof TransmittableThreadLocal) { logger.warning("unregister a TransmittableThreadLocal instance, this is unnecessary!"); return true; } synchronized (threadLocalHolderUpdateLock) { if (!threadLocalHolder.containsKey(threadLocal)) return false; WeakHashMap<ThreadLocal, TtlCopier> newHolder = new WeakHashMap<ThreadLocal, TtlCopier>(threadLocalHolder); newHolder.remove(threadLocal); threadLocalHolder = newHolder; return true; } } // 默认实现的 TtlCopier,直接引用传递 private static final TtlCopier shadowCopier = new TtlCopier() { @Override public Object copy(Object parentValue) { return parentValue; } };
其实我自己有个想不明白的,既然已经用了 threadLocalHolderUpdateLock 做锁,为什么还要用 copy on write?GC 友好?mark 一下。
剩下的部分,就是 Transmitter 怎么传递 ThreadLocal 的值了。
实际就是三个步骤,capture -> reply -> restore,crr。
1.抓取当前线程的值快照
// 快照类,用来保存当前线程的 TtlThreadLocal 和 ThreadLocal 的快照 private static class Snapshot { final WeakHashMap<TransmittableThreadLocal, Object> ttl2Value; final WeakHashMap<ThreadLocal, Object> threadLocal2Value; private Snapshot(WeakHashMap<TransmittableThreadLocal, Object> ttl2Value, WeakHashMap<ThreadLocal, Object> threadLocal2Value) { this.ttl2Value = ttl2Value; this.threadLocal2Value = threadLocal2Value; } } public static Object capture() { // 抓取快照 return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } // 抓取 TransmittableThreadLocal 的快照 private static WeakHashMap<TransmittableThreadLocal, Object> captureTtlValues() { WeakHashMap<TransmittableThreadLocal, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal, Object>(); // 从 TransmittableThreadLocal 的 holder 中,遍历所有有值的 TransmittableThreadLocal,将 TransmittableThreadLocal 取出和值复制到 Map 中。 for (TransmittableThreadLocal threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } // 抓取注册的 ThreadLocal。 private static WeakHashMap<ThreadLocal, Object> captureThreadLocalValues() { final WeakHashMap<ThreadLocal, Object> threadLocal2Value = new WeakHashMap<ThreadLocal, Object>(); // 从 threadLocalHolder 中,遍历注册的 ThreadLocal,将 ThreadLocal 和 TtlCopier 取出,将值复制到 Map 中。 for (Map.Entry<ThreadLocal, TtlCopier> entry : threadLocalHolder.entrySet()) { final ThreadLocal threadLocal = entry.getKey(); final TtlCopier copier = entry.getValue(); threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get())); } return threadLocal2Value; }
2.将快照重做到执行线程
@NonNull public static Object replay(@NonNull Object captured) { final Snapshot capturedSnapshot = (Snapshot) captured; return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } // 重播 TransmittableThreadLocal,并保存执行线程的原值 @NonNull private static WeakHashMap<TransmittableThreadLocal, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal, Object> captured) { WeakHashMap<TransmittableThreadLocal, Object> backup = new WeakHashMap<TransmittableThreadLocal, Object>(); for (final Iterator<TransmittableThreadLocal> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal threadLocal = iterator.next(); // 遍历 holder,从 父线程继承过来的,或者之前注册进来的 backup.put(threadLocal, threadLocal.get()); // clear the TTL values that is not in captured // avoid the extra TTL values after replay when run task // 清除本次没有传递过来的 ThreadLocal,和对应值。毕竟一是可能会有因为 InheritableThreadLocal 而传递并保留的值。二来保证主线程 set 过的 ThreadLocal,不应该被传递过来。明确,其传递是由业务代码控制的,就是明确 set 过值的。 if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 将 map 中的值,设置到 ThreadLocal 中。 setTtlValuesTo(captured); // TransmittableThreadLocal 的回调方法,在任务执行前执行。 doExecuteCallback(true); return backup; } private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal, Object> entry : ttlValues.entrySet()) { TransmittableThreadLocal threadLocal = entry.getKey(); // set 的同时,也就将 TransmittableThreadLocal 注册到当前线程的注册表了。 threadLocal.set(entry.getValue()); } } private static WeakHashMap<ThreadLocal, Object> replayThreadLocalValues(@NonNull WeakHashMap<ThreadLocal, Object> captured) { final WeakHashMap<ThreadLocal, Object> backup = new WeakHashMap<ThreadLocal, Object>(); for (Map.Entry<ThreadLocal, Object> entry : captured.entrySet()) { final ThreadLocal threadLocal = entry.getKey(); backup.put(threadLocal, threadLocal.get()); final Object value = entry.getValue(); // 如果值是标记已删除,则清除 if (value == threadLocalClearMark) threadLocal.remove(); else threadLocal.set(value); } return backup; }
doExecuteCallback 是 TransmittableThreadLocal 定义的回调方法,保证任务执行前和执行后的回调动作。
isBefore 控制是执行前还是执行后。
内部调用了 beforeExecute 和 afterExecute 方法。默认是不做任何动作。
private static void doExecuteCallback(boolean isBefore) { for (TransmittableThreadLocal threadLocal : holder.get().keySet()) { try { if (isBefore) threadLocal.beforeExecute(); else threadLocal.afterExecute(); } catch (Throwable t) { // 忽略所有异常,保证任务的执行 if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t); } } } } protected void beforeExecute() { } protected void afterExecute() { }
3.恢复备份的原快照
public static void restore(@NonNull Object backup) { final Snapshot backupSnapshot = (Snapshot) backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); } private static void restoreTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal, Object> backup) { // call afterExecute callback 任务执行完回调 doExecuteCallback(false); for (final Iterator<TransmittableThreadLocal> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal threadLocal = iterator.next(); // clear the TTL values that is not in backup // avoid the extra TTL values after restore // 恢复快照时,清除本次传递注册进来,但是原先不存在的 TransmittableThreadLocal if (!backup.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // restore TTL values // 恢复快照中的 value 到 TransmittableThreadLocal 中 setTtlValuesTo(backup); } private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal, Object> entry : ttlValues.entrySet()) { TransmittableThreadLocal threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } } private static void restoreThreadLocalValues(@NonNull WeakHashMap<ThreadLocal, Object> backup) { for (Map.Entry<ThreadLocal, Object> entry : backup.entrySet()) { final ThreadLocal threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } }
对特殊场景以及 Lambda 的支持
Transmitter 定义了几个特殊场景下以及 Java 8 lambda 表达式的使用。
特殊场景就是指,执行前,清除当前执行线程 ThreadLocal 的值,包括 TtlThreadLocal 和注册 ThreadLocal 。
像一开始讲到的业务代码喜欢使用 Supplier,所以也对其做了支持。本质是为了简化工作。
不过,注意的是,快照的捕获则需要业务代码自己完成并传递。
public static R runSupplierWithCaptured(@NonNull Object captured, @NonNull Supplier bizLogic) { Object backup = replay(captured); try { return bizLogic.get(); } finally { restore(backup); } } public static R runSupplierWithClear(@NonNull Supplier bizLogic) { Object backup = clear(); try { return bizLogic.get(); } finally { restore(backup); } } public static R runCallableWithCaptured(@NonNull Object captured, @NonNull Callable bizLogic) throws Exception { Object backup = replay(captured); try { return bizLogic.call(); } finally { restore(backup); } } public static R runCallableWithClear(@NonNull Callable bizLogic) throws Exception { Object backup = clear(); try { return bizLogic.call(); } finally { restore(backup); } }
简化方法,使用起来也就是:
// 线程A Object captured = Transmitter.capture(); // 线程B @Async String result = runSupplierWithCaptured(captured, () -> { System.out.println("Hello"); ... return "World"; });
否则只能按照全套流程了:
// 线程A Object captured = Transmitter.capture(); // 线程B @Async String result = runSupplierWithCaptured(captured, () -> { System.out.println("Hello"); ... return "World"; }); Object backup = Transmitter.replay(captured); // (2) try { System.out.println("Hello"); // ... return "World"; } finally { // restore the TransmittableThreadLocal of thread B when replay Transmitter.restore(backup); (3)
Clear
上面可以看到,一些方法是做了 clear 操作。
就是不依赖快照的捕获,将空值的快照信息,传递给重做方法执行,就能清除当前执行线程的值,并得到返回原值备份。
public static Object clear() { final WeakHashMap<TransmittableThreadLocal, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal, Object>(); final WeakHashMap<ThreadLocal, Object> threadLocal2Value = new WeakHashMap<ThreadLocal, Object>(); for (Map.Entry<ThreadLocal, TtlCopier> entry : threadLocalHolder.entrySet()) { final ThreadLocal threadLocal = entry.getKey(); // threadLocalClearMark 标记为未被传递和注册,更为合适,从而避免和 null 混淆。否则无法区分原有就是 null,还是未被注册 threadLocal2Value.put(threadLocal, threadLocalClearMark); } return replay(new Snapshot(ttl2Value, threadLocal2Value)); }
注意
如果注意到 TransmittableThreadLocal 是继承 InheritableThreadLocal,就应该知道,子线程创建时,值还是会被传递过去。这也就可能带来内存泄漏问题。
所以,同时提供 DisableInheritableThreadFactoryWrapper,以方便业务代码自定义线程池,禁止值的继承传递。
class DisableInheritableThreadFactoryWrapper implements DisableInheritableThreadFactory { private final ThreadFactory threadFactory; DisableInheritableThreadFactoryWrapper(@NonNull ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @Override public Thread newThread(@NonNull Runnable r) { // 调用了 Transmitter 的 clear 方法,在创建子线程前,清除当前线程的值,并保存下来 final Object backup = clear(); try { return threadFactory.newThread(r); } finally { // 创建完,再重新恢复。以此,避免了值的继承传递。 restore(backup); } } @NonNull @Override public ThreadFactory unwrap() { return threadFactory; } }
对于 1.8 特性,还提供了 ForkJoinWorkerThreadFactory 和 TtlForkJoinPoolHelper 等类的支持。
Java Agent 支持
避免代码改动的话,可以使用 Java Agent,来隐式替换 JDK 的相应类。对于 1.8 的 CompletableFuture 和 Stream,在底层通过对 ForkJoinPool 的支持,也做了透明支持。
总结
到此,TransmittableThreadLocal 的源码解析就结束了。核心源码是不是很简单?但是某些思想和考量还是很值得学习的。
ThreadLocal 的使用,本身类似于全局变量,而且是可修改的。一旦中间过程被修改,就无法保证整体流程的前后一致性。它将是一个隐藏的强依赖,一个可能被忽略、意想不到的坑。(我不承认,我在还原大佬的话。)
应该尽量避免在业务代码中使用的。 DO NOT use, only when you know why .
嗯,还有加上一句,让其他人也明白,文档务必齐全。(说实话,我挺想用英文的,想想算了)。
全文完
以下文章您可能也会感兴趣:
我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com 。