InheritableThreadLocal 在 TTL 之前, 先谈谈 JDK 自带的 InheritableThreadLocal
InheritableThreadLocal
可以将变量在父子线程中传递。 根据 ThreadLocal
分析, 现成变量是存在 ThreadLocalMap
中的, InheritableThreadLocal
应该要将 ThreadLocalMap
复制一份给子线程。
InheritableThreadLocal 源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class InheritableThreadLocal<T> extends ThreadLocal<T> { protected T childValue(T parentValue) { return parentValue; } ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap (this , firstValue); } }
InheritableThreadLocal
源码非常少, 继承于 ThreadLocal
。 那么 get、set 也是使用的 ThreadLocal
提供的, 即操作的是线程的 t.threadlocals
变量
复制原理 Thread 初始化时会调用 init
, 其中有部分逻辑是:
1 2 3 4 5 // ..... Threadif (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals) ;// ..... Thread
从 Thread
构造函数来看 inheritableThreadLocals
默认是 true, 即父线程 inheritableThreadLocals
不为 null, 就将父线程的 inheritableThreadLocals
复制给子线程, 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private ThreadLocalMap (ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table ; int len = parentTable.length ; setThreshold (len); table = new Entry [len]; for (int j = 0 ; j < len; j++) { Entry e = parentTable[j]; if (e != null ) { @SuppressWarnings ("unchecked" ) ThreadLocal<Object > key = (ThreadLocal<Object >) e.get (); if (key != null ) { Object value = key .childValue (e.value ); Entry c = new Entry (key , value); int h = key .threadLocalHashCode & (len - 1 ); while (table[h] != null ) h = nextIndex (h, len); table[h] = c; size ++; } } } }
childValue 含义 InheritableThreadLocal
中实现了 childValue
方法, 从父线程复制 ThreaLocalMap
到子线程时,值从childValue 函数过了一遍再赋值给 Entry.
这里特殊处理的含义: 这个是 ThreadLocal
留给子类实现的, 有些情况下设置的值是一个自定义的引用类型,那么从父线程复制到多个子线程的值就存在并发问题(值传递,地址值是共享的),所以复制的时候要保证复制给每个子线程的地址值不一样。 需要实现这个 childValue
的深拷贝。(如 TTL 中 holder 的实现)
TTL 概述 JDK ThreadLocal
、InheritableThreadLocal
的最大局限性就是:无法为预先创建好(未投入使用)的线程实例传递变量(准确来说是首次传递某些场景是可行的,而后面由于线程池中的线程是复用的,无法进行更新或者修改变量的传递值),泛线程池Executor体系、TimerTask和ForkJoinPool等一般会预先创建(核心)线程,也就它们都是无法在线程池中由预创建的子线程执行的Runnable任务实例中使用。
InheritableThreadLocal存在的问题 无法在主线程和子线程中透传 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 static InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>() static ExecutorService executorService = Executors.newFixedThreadPool(1 ) /** * ITL 无法再父子线程中透传 */ public static void main(String[] args) throws Exception { ITL.set("parent-set" ) executorService.execute(() -> { System.out.println(ITL.get()) }) TimeUnit.SECONDS.sleep(1 ) ITL.set("parent-new-value" ) executorService.execute(() -> { System.out.println(ITL.get()) }) }= = = = = = = = = = = = = = 输出 parent-set parent-set
可以看到主线程第二次设置的值并没有透传到提交的线程池中。 这是因为ITL只有第一次创建线程的时候会从父线程拿到 inheritableThreadLocals 中的数据,之后父线程对 inheritableThreadLocals 的操作都不会传递给子线程
线程池中线程存在复用的问题, 导致不同子线程之间的值互相影响 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 static InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>() static ExecutorService executorService = Executors.newFixedThreadPool(1 ) /** * ITL 无法再父子线程中透传 */ public static void main(String[] args) throws Exception { ITL.set("parent-set" ) executorService.execute(() -> { System.out.println(ITL.get()) ITL.set("old-set" ) }) TimeUnit.SECONDS.sleep(1 ) ITL.set("new-set" ) executorService.execute(() -> { System.out.printf(ITL.get()) }) }= = = = = = = = = = = = 输出 parent-set old-set
可以看第二次线程池打印出了第一次在线程池中设置的值 “old-set”。
这是因为第二次执行任务的时候复用了第一次执行任务的线程, 导致第一次设置的值传递到了第二次任务
TTL 解决方案和使用 根据上面 ITL 存的局限性, 我们推出: 我们需要的并不是创建线程的那一刻父线程的ThreadLocal值,而是提交任务时父线程的ThreadLocal值。或者说需要把任务提交给线程池时的ThreadLocal值传递到任务执行时。
具体的思路是: 父线程把任务提交给线程池时一同附上此刻自己的ThreadLocalMap,包装在task里,待线程池中某个线程执行到该任务时,用task里的ThreadLocalMap赋盖当前线程ThreadLocalMap,这样就完成了父线程向池化的子线程传递线程私有数据的目标。为了避免数据污染,待任务执行完后,线程归还回线程池之前,还需要还原ThreadLocalMap,如下示:
1. 父子线程传递 与 InheritableThreadLocal
类似:
1 2 3 4 5 6 TransmittableThreadLocal<String > context = new TransmittableThreadLocal <>(); context.set ("value-set-in-parent" );String value = context.get ();
2. 线程池中传递-修饰 Runnable 和 Callable 可以使用 TtlRunnable
和 TtlCallable
来修饰传入线程池的Runnable和Callable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>() // = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // 在父线程中设置 context.set("value-set-in-parent" ) Runnable task = new RunnableTask() // 额外的处理,生成修饰了的对象ttlRunnable Runnable ttlRunnable = TtlRunnable.get(task) // 第一次提交 executorService.submit(ttlRunnable) // = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // Task中可以读取,值是"value-set-in-parent" String value = context.get() // = = = = = = = = = = = = = = = = = = = = = // 业务逻辑代码,并且修改了 TransmittableThreadLocal上下文 ... context.set("value-modified-in-parent" ) // 再次提交,重新执行修饰,以传递修改了的 TransmittableThreadLocal上下文 executorService.submit(TtlRunnable.get(task))
这里需要注意的是即使是同一个Runnable任务多次提交到线程池时,每次提交时都需要通过修饰操作(TtlRunnable.get
)
源码解析 TTL 整体框架结构 TTL 除了提供给用户使用的API,还提供了基于Agent和字节码增强实现了无感知增强泛线程池对应类的功能。 整体代码框架如下:
1 2 3 4 5 6 7 8 9 10 - transmittable-thread-local - com.alibaba.ttl - spi SPI接口和一些实现 TtlAttachments TtlAttachmentsDelegate TtlEnhanced TtlWrapper - threadpool 线程池增强,包括ThreadFactory和线程池的Wrapper 等 - agent 线程池的Agent实现相关 最外层的包有一些Wrapper 的实现和TTL
TTL 时序图:
TTL核心流程和原理是通过 TransmittableThreadLocal.Transmitter
抓取当前线程的所有TTL值并在其他线程进行回放,然后在回放线程执行完业务操作后,再恢复为回放线程原来的TTL值。
TransmittableThreadLocal(核心类) TransmittableThreadLocal
是TTL的核心类
构造函数和关键属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @FunctionalInterfacepublic interface TtlCopier <T > { T copy(T parentValue); }public class TransmittableThreadLocal <T > extends InheritableThreadLocal <T > implements TtlCopier <T > { private static final Logger logger = Logger.getLogger(TransmittableThreadLocal.class.getName()); private final bool ean disableIgnoreNullValueSemantics; public TransmittableThreadLocal() { this (false ); } public TransmittableThreadLocal(bool ean disableIgnoreNullValueSemantics) { this .disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics; } }
针对disableIgnoreNullValueSemantics
属性可以参考: TTL Issue 157 - 对于 set(null) 保持和 InheritableThreadLocal 一致语义
主要方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 public class TransmittableThreadLocal <T> extends InheritableThreadLocal <T> implements TtlCopier <T> { public T copy (T parentValue ) { return parentValue; } protected void beforeExecute ( ) { } protected void afterExecute ( ) { } @Override public final T get ( ) { T value = super .get (); if (disableIgnoreNullValueSemantics || null != value) addThisToHolder (); return value; } @Override public final void set (T value ) { if (!disableIgnoreNullValueSemantics && null == value) { remove (); } else { super .set (value); addThisToHolder (); } } @Override public final void remove ( ) { removeThisFromHolder (); super .remove (); } private void superRemove ( ) { super .remove (); } private T copyValue ( ) { return copy (get ()); } private static InheritableThreadLocal <WeakHashMap <TransmittableThreadLocal <Object >, ?>> holder = new InheritableThreadLocal <WeakHashMap <TransmittableThreadLocal <Object >, ?>>() { @Override protected WeakHashMap <TransmittableThreadLocal <Object >, ?> initialValue ( ) { return new WeakHashMap <TransmittableThreadLocal <Object >, Object >(); } @Override protected WeakHashMap <TransmittableThreadLocal <Object >, ?> childValue (WeakHashMap<TransmittableThreadLocal<Object >, ?> parentValue ) { return new WeakHashMap <TransmittableThreadLocal <Object >, Object >(parentValue); } }; @SuppressWarnings ("unchecked" ) private void addThisToHolder ( ) { if (!holder.get ().containsKey (this )) { holder.get ().put ((TransmittableThreadLocal <Object >) this , null ); } } private void removeThisFromHolder ( ) { holder.get ().remove (this ); } private static void doExecuteCallback (boolean isBefore ) { for (TransmittableThreadLocal <Object > threadLocal : holder.get ().keySet ()) { try { if (isBefore) threadLocal.beforeExecute (); else threadLocal.afterExecute (); } catch (Throwable t) { if (logger.isLoggable (Level .WARNING )) { logger.log (Level .WARNING , "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute" ) + ", cause: " + t.toString (), t); } } } } static void dump (@Nullable String title ) { if (title != null && title.length () > 0 ) { System .out .printf ("Start TransmittableThreadLocal[%s] Dump...%n" , title); } else { System .out .println ("Start TransmittableThreadLocal Dump..." ); } for (TransmittableThreadLocal <Object > threadLocal : holder.get ().keySet ()) { System .out .println (threadLocal.get ()); } System .out .println ("TransmittableThreadLocal Dump end!" ); } static void dump ( ) { dump (null ); } }
holder 从表象上看是一个静态类, 整个 JVM 只有一份变量。 实际上不是的,因为继承于 InheritableThreadLocal
,意味着,每一个线程有且只有一份这个 Holder。 这里体现的设计:
static
修饰: 一个线程中,无论TransmittableThreadLocal
被创建多少次,需要保证维护的是同一个缓存WeakHashMap
: 弱引用(发生GC就回收), 避免内存泄露整体存储结构如下:
这里有一个关键变量, 也是上面提到的 disableIgnoreNullValueSemantics
。 默认情况下disableIgnoreNullValueSemantics=false
,TTL如果设置 NULL 值,会直接从holder 移除对应的 TTL 实例,在TTL#get()
方法被调用的时候,如果原来持有的属性不为NULL,该TTL实例会重新加到holder。
如果设置为true,则set(null)
的语义和ThreadLocal一致。详细说明见上文 ISSUE 地址
Transmitter(发射器) 发射器 Transmitter
是 TransmittableThreadLocal
的一个公有静态类 的核心功能是传输所有的TransmittableThreadLocal实例
和提供静态方法注册当前线程的变量到其他线程
。
构造方法和关键属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 # TransmittableThreadLocal #Transmitter public static class Transmitter { private static volatile WeakHashMap <ThreadLocal <Object >, TtlCopier <Object >> threadLocalHolder = new WeakHashMap <ThreadLocal <Object >, TtlCopier <Object >>(); private static final Object threadLocalHolderUpdateLock = new Object (); private static final Object threadLocalClearMark = new Object (); private static final TtlCopier <Object > shadowCopier = new TtlCopier <Object >() { @Override public Object copy (Object parentValue) { return parentValue; } }; private Transmitter () { throw new InstantiationError ("Must not instantiate this class" ); } private static class Snapshot { final WeakHashMap <TransmittableThreadLocal <Object >, Object > ttl2Value; final WeakHashMap <ThreadLocal <Object >, Object > threadLocal2Value; private Snapshot (WeakHashMap <TransmittableThreadLocal <Object >, Object > ttl2Value, WeakHashMap <ThreadLocal <Object >, Object > threadLocal2Value) { this.ttl2Value = ttl2Value; this.threadLocal2Value = threadLocal2Value; } } }
主要方法 Transmitter在设计上是一个典型的工具类,外部只能调用其公有静态方法。静态方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 public static class Transmitter { @NonNull public static Object capture() { return new Snapshot (captureTtlValues(), captureThreadLocalValues()); } private static WeakHashMap <TransmittableThreadLocal <Object >, Object > captureTtlValues() { WeakHashMap <TransmittableThreadLocal <Object >, Object > ttl2Value = new WeakHashMap <TransmittableThreadLocal <Object >, Object >(); for (TransmittableThreadLocal <Object > threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } private static WeakHashMap <ThreadLocal <Object >, Object > captureThreadLocalValues() { final WeakHashMap <ThreadLocal <Object >, Object > threadLocal2Value = new WeakHashMap <ThreadLocal <Object >, Object >(); for (Map .Entry <ThreadLocal <Object >, TtlCopier <Object >> entry : threadLocalHolder.entrySet()) { final ThreadLocal <Object > threadLocal = entry.getKey(); final TtlCopier <Object > copier = entry.getValue(); threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get())); } return threadLocal2Value; } @NonNull public static Object replay(@NonNull Object captured) { final Snapshot capturedSnapshot = (Snapshot ) captured; return new Snapshot (replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } @NonNull private static WeakHashMap <TransmittableThreadLocal <Object >, Object > replayTtlValues(@NonNull WeakHashMap <TransmittableThreadLocal <Object >, Object > captured) { WeakHashMap <TransmittableThreadLocal <Object >, Object > backup = new WeakHashMap <TransmittableThreadLocal <Object >, Object >(); for (final Iterator <TransmittableThreadLocal <Object >> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal <Object > threadLocal = iterator.next(); backup.put(threadLocal, threadLocal.get()); if (! captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } setTtlValuesTo(captured); doExecuteCallback(true ); return backup; } private static void setTtlValuesTo(@NonNull WeakHashMap <TransmittableThreadLocal <Object >, Object > ttlValues) { for (Map .Entry <TransmittableThreadLocal <Object >, Object > entry : ttlValues.entrySet()) { TransmittableThreadLocal <Object > threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } } private static WeakHashMap <ThreadLocal <Object >, Object > replayThreadLocalValues(@NonNull WeakHashMap <ThreadLocal <Object >, Object > captured) { final WeakHashMap <ThreadLocal <Object >, Object > backup = new WeakHashMap <ThreadLocal <Object >, Object >(); for (Map .Entry <ThreadLocal <Object >, Object > entry : captured.entrySet()) { final ThreadLocal <Object > threadLocal = entry.getKey(); backup.put(threadLocal, threadLocal.get()); final Object value = entry.getValue(); if (value == threadLocalClearMark) threadLocal.remove(); else threadLocal.set(value); } return backup; } public static void restore(@NonNull Object backup) { final Snapshot backupSnapshot = (Snapshot ) backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); } private static void restoreTtlValues(@NonNull WeakHashMap <TransmittableThreadLocal <Object >, Object > backup) { doExecuteCallback(false ); for (final Iterator <TransmittableThreadLocal <Object >> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal <Object > threadLocal = iterator.next(); if (! backup.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } setTtlValuesTo(backup); } private static void restoreThreadLocalValues(@NonNull WeakHashMap <ThreadLocal <Object >, Object > backup) { for (Map .Entry <ThreadLocal <Object >, Object > entry : backup.entrySet()) { final ThreadLocal <Object > threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } } }
capture
:捕获,捕获父线程的TTL和TL值, 快照保存。reply
:重放, 备份子线程的 TTL和TL值, 将父线程的快照覆盖给子线程restore
:复原,任务执行完后将子线程的 ThreadLocalMap 复原TtlRunnable 使用示例 在线程池场景, 采取 TtlRunable
修饰 Runnable
, 如:
1 2 3 4 Runnable ttlRunnable = TtlRunnable.get(() -> { System.out.println(TTL.get()); }) ;EXECUTOR .submit (ttlRunnable) ;
源码流程 TtlRunnable
使用了 Transmitter
的 capture、reply 和 restore 等, 主要关注 run
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public final class TtlRunnable implements Runnable , TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments { private final AtomicReference<Object> capturedRef; private final Runnable runnable; private final boolean releaseTtlValueReferenceAfterRun; private TtlRunnable (@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this .capturedRef = new AtomicReference <Object>(capture()); this .runnable = runnable; this .releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; } @Override public void run () { Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null )) { throw new IllegalStateException ("TTL value reference is released after run!" ); } Object backup = replay(captured); try { runnable.run(); } finally { restore(backup); } } @Nullable public static TtlRunnable get (@Nullable Runnable runnable) { return get(runnable, false , false ); } @Nullable public static TtlRunnable get (@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { if (null == runnable) return null ; if (runnable instanceof TtlEnhanced) { if (idempotent) return (TtlRunnable) runnable; else throw new IllegalStateException ("Already TtlRunnable!" ); } return new TtlRunnable (runnable, releaseTtlValueReferenceAfterRun); }
参考资料