好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

详解Reactor中Context的用法

在响应式编程中,多线程异步性成为天然的内在,多线程之间的切换也成为 原生 的,在处理一个数据流Flux/Mono时,基本无法知道是运行在哪个线程上或哪个线程池里,可以说,每一个操作符operator以及内部的函数都可能运行在不同的线程上。这就意味着,以前用ThreadLocal来作为方法间透明传递共享变量的方式不再行得通。为此,Reactor提供了Context来替代ThreadLocal实现 一个跨线程的共享变量的透明方式 。

本文会从以下几个方面来介绍Context的相关知识:

context的基本用法 从源码上解读context的用法 用log的MDC案例介绍如何用context实现与threadlocal的桥接 总结下context以及目前的一些局限性

一、使用介绍

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

static String KEY = "TEST_CONTEXT_KEY" ;

static String KEY2 = "TEST_CONTEXT_KEY2" ;

 

public static void main(String[] args) {

     Flux<String> flux = convert( "hello" , Flux.just( 1 , 2 , 3 ));

     flux

         .subscriberContext(Context.of(KEY, "Outside" ))

         .subscribe(v -> System.out.println(v));

}

 

public static Flux<String> convert(String prefix, Flux<Integer> publisher) {

     return publisher.map(v -> prefix + " " + v)

         .subscriberContext(Context.of(KEY, "NotUsed" ))

         .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v))

         .subscriberContext(context -> context.put(KEY2, "Inside" ))

         .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + v));

}

上面是context的使用方案介绍,其输出如下:

Outside Outside Inside hello 1
Outside Outside Inside hello 2
Outside Outside Inside hello 3

上面的使用案例展示了一个使用context的常见例子。通过在外部方法里传入context,如 flux.subscriberContext(Context.of(KEY, "Outside")) ,使得内部方法convert能够获取外界环境的context,同时内部方法还可以增加自己的context数据,如 subscriberContext(context -> context.put(KEY2, "Inside")) ,结合之后,在让内部的方法(flatMap里的方法)感知到整个上下文context的数据内容。

对于context的使用,主要分为几个部分: 1. context的创建 2. context的写入(传入)与读取 3. 执行顺序

1. context —— 不可变对象

由于reactor天然是跨线程的,所以context设计为了不可变的对象 ,即每次的更新都是创建一个新的对象。每次的put/putAll操作,都是先把旧对象的值复制到新对象,然后再进行put/putAll等更新操作。

2. context的写入与读取

context写入 是使用subscriberContext方法,其入参有两种形式:传值方式subscriberContext(ctx)与lambda函数方式 —— subscriberContext(ctx -> ctx.put(key,value))。

context的读取 是利用Mono的静态方法subscriberContext()来获取,由于其返回的是一个Mono, 所以通常与flatMap结合使用。

3. 执行顺序

context的传入是发生在subscribe()订阅阶段的, 所以其写入的顺序是从下往上的 ,即在示例中,先执行 subscriberContext(Context.of(KEY, "Outside")) ,再执行 subscriberContext(context -> context.put(KEY2, "Inside")) , 最后执行 subscriberContext(Context.of(KEY, "NotUsed")) 。 在订阅阶段执行完后,进入运行阶段,数据流从上往下执行,每次读取context的时候 Mono.subscriberContext() 都是读取下一个的context。所以"NotUsed"的context并没有生效。

此外,context.put()操作是复制旧的再update新的对象,所以 Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v) 这个阶段仍能读取前一个context关于KEY的内容。

总结

context是不可变对象,每次更新都是新的context context是存在于subscriber的内部的,一个context是绑定在当前subscriber上的,如 FluxContextStart 的对象 context的写入顺序是从下而上的,读取的时候是从上而下的,只能读取之后的subscriber里的context。 每个subscriber中的context都是独有的,运行阶段的时候,无法改变其他subscriber的context。

注意

subscriberContext(Context.of("Outside") 与 subscriberContext(context -> Context.of("Outside")) 是有区别,前者是会结合复用前面的context,而后者是直接返回一个新的context并不会复用前面的context。 其原因是, subscriberContext(Context.of("Outside")) 其实内部调用的是 subscriberContext(context -> context.putAll(Context.of("Outside")) ,其入参的context就是前面的context,putAll方法会复用前面的context。而 subscriberContext(context -> Context.of("Outside"))不复用的原因就是因为放弃了入参的context。所以,可以利用这种方式来放弃之前的context,当然不鼓励这么做,因为你不清楚之前context会不会影响后续的程序。

本文章的代码用的事reactor 3.3的版本,自3.5之后,subscriberContext方法改为 contextWrite ,读取的方法改为 deferContextual 。

二、源码解读

现在我们从源代码上看看,context写入为什么是自下而上的,读取的时候又是依附于下一个subscriber并且自上而下的。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) {

     return new FluxContextStart<>( this , doOnContext);

}

FluxContextStart(Flux<? extends T> source, Function<Context, Context> doOnContext) {

     super (source);

     this .doOnContext = Objects.requireNonNull(doOnContext, "doOnContext" );

}

 

@Override

public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {

     Context c = doOnContext.apply(actual.currentContext());

     return new ContextStartSubscriber<>(actual, c);

}

ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) {

     this .actual = actual;

     this .context = context;

     if (actual instanceof ConditionalSubscriber) {

         this .actualConditional = (ConditionalSubscriber<? super T>) actual;

     }

     else {

         this .actualConditional = null ;

     }

}

@Override

public Context currentContext() {

     return this .context;

}

上面截取了subscriberContext方法的源代码,可以看到subscriberContext方法最终会创建ContextStartSubscriber的对象,并将生成的context赋值 Context c = doOnContext.apply(actual.currentContext()) ,所以context是伴随subscriberContext方法对应的subscriber里的。

由于context赋值操作 Context c = doOnContext.apply(actual.currentContext()) 是发生在subscribeOrReturn方法里, 即发生在subscribe()订阅阶段,所以整个执行的顺序是自下而上的(沿着整个flow自下而上至源头的publisher) 。

那读取context的时候为什么是自上而下的呢?我们来看下读取操作Mono.subscribeContext()的源码。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

public static Mono<Context> subscriberContext() {

     return onAssembly(MonoCurrentContext.INSTANCE);

}

final class MonoCurrentContext extends Mono<Context>

         implements Fuseable, Scannable {

     static final MonoCurrentContext INSTANCE = new MonoCurrentContext();

     public void subscribe(CoreSubscriber<? super Context> actual) {

         Context ctx = actual.currentContext();

         actual.onSubscribe(Operators.scalarSubscription(actual, ctx));

     }

}

interface InnerOperator<I, O>

         extends InnerConsumer<I>, InnerProducer<O> {

     @Override

     default Context currentContext() {

         return actual().currentContext();

     }

}

Mono.subscribeContext() 方法返回的是一个MonoCurrentContext的静态对象,在订阅subscribe时期,就会去读取当前的context,即 Context ctx = actual.currentContext() 。而对于一个InnerOperator的接口而言,其currentContext()方法会不断寻找下一个subscriber的context,即 actual().currentContext() ,直到有哪个subscriber覆写了currentContext方法,如先前的ContextStartSubscriber对象。对于InnerOperator接口,是大多数subscriber都会实现的接口,例如map、filter、flatmap这些,都会实现这个接口。

在找到context之后,通过 Operators.scalarSubscription(actual, ctx) 写入,这个方法其实也是Mono.just()的实现,所以相当于把context当做value,生成了一个Mono.just(ctx)来完成了context读取。

所以,context读取的是从当前操作operator之后的那个最接近的subscriber的context。这也解释了前面使用案例中, subscriberContext(Context.of(KEY, "NotUsed")) ,没有作用的缘故。

三、如何桥接现有的ThreadLocal系统

虽然reactor提供了context来替代ThreadLocal的使用,但目前大多数的代码库仍然是命令式编程的,使用的方式仍然是基于ThreadLocal的,如Logger里的MDC。本小节以Logger中的MDC来介绍,如何利用context实现与旧系统中的基于ThreadLocal方式的打通。

我们假设有这样的一个场景,每一次的Http请求都有一个trace id,我们称为request id,并通过Http Header "X-Request-Id"来命名,打印日志的时候,希望每条日志里都包含请求id,这样方便跟踪整个请求链路的情况。

为此,我们把日志配置里的pattern设置为: [%X{X-Request-Id}] [%thread] %-5level - %msg %n 。

可以在 SpringBoot 的 application.yml 里设置,如:

?

1

logging.pattern.level: "[%X{X-Request-Id}] [%thread] %-5level - %msg %n"

因此,要使得每条日志里有request id,那就必须要MDC里有key为 X-Request-Id 的内容。下面来看下,reactor中是如何实现的。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

@SpringBootApplication

@Slf4j

@RestController

public class MdcApplication {

 

   public static void main(String[] args) {

     SpringApplication.run(MdcApplication. class , args);

   }

 

   private final static String X_REQUEST_ID_KEY = "X-Request-Id" ;

 

   @GetMapping ( "/" )

   Flux<String> split( @RequestParam ( "value" ) String value, @RequestHeader (X_REQUEST_ID_KEY) String requestId) {

     return Flux.fromArray(value.split( "_" ))

         .doOnEach(logWithContext(ch -> log.info( "handling one item: {}" , ch)))

         .subscriberContext(Context.of(X_REQUEST_ID_KEY, requestId));

   }

 

   private static <T> Consumer<Signal<T>> logWithContext(Consumer<T> logStatement) {

     return signal -> {

       if (!signal.isOnNext()) {

         return ;

       }

       String requestId = signal.getContext().get(X_REQUEST_ID_KEY);

       try (MDC.MDCCloseable closeable = MDC.putCloseable(X_REQUEST_ID_KEY, requestId)) {

         logStatement.accept(signal.get());

       }

     };

   }

}

这是一个简单的示例程序,对于请求输入的value值通过"-"分割后,再一个个返回给客户端。首先利用subscriberContext方法,将http header里的 X-Request-Id 作为context来传入。然后利用doOnEach的方式获取signal。doOnEach的方法可以工作在onNext、onComplete、onError等所有事件,每一个信号signal里都包含有context,当为onNext则还包含value值,当为onError时,则还包含有exception。因此可以通过signal来获取context。

在从context获取X-Request-Id后,可以利用try-with-resource方式来更新MDC,其效果是在执行完try里面的程序后,将更新的value回退。等价于:

?

1

2

3

4

5

6

try {

     MDC.put(X_REQUEST_ID_KEY, requestId);

     logStatement.accept(signal.get());

} finally {

     MDC.remove(X_REQUEST_ID_KEY);

}

置于为什么需要操作完之后回退掉MDC中的更新, 那是因为reactor中所有的操作都是异步执行在不同线程中的,如果不回退的话,很有可能造成污染 ,其原因还是MDC内部是用ThreadLocal实现的,所以跨线程的时候,如果不把ThreadLocal值清理干净,很容易造成互相污染。

用curl命令发送请求: curl --header "X-Request-Id:12345" localhost:8080?value=a_b_c ,返回的结果是 abc ,打印的日志如下:

[12345] [reactor-http-nio-2] INFO  - handling one item: a 
[12345] [reactor-http-nio-2] INFO  - handling one item: b 
[12345] [reactor-http-nio-2] INFO  - handling one item: c

其中 12345 就是从context里获取到的request id。

如果想要将request id继续贯穿后续请求流程,如请求第三方服务,可以在用webClient发送请求的时候,把request id作为header加入到它的request请求里,如:

?

1

2

3

4

5

Mono.subscriberContext().map(ctx -> {  

     RequestHeadersSpec<?> request = webClient.get().uri(uri);      

     request = request.header( "X-Request-ID" , ctx.get(X_REQUEST_ID_KEY));

     // The rest of your request logic...   

});

四、总结

本文介绍了reactor中context的概念,并用代码示例的方式介绍了如何使用。再然后,通过源码的解读来加深对context使用规则的理解: 自下而上的context写入,以及与subscriber绑定后的自上而下的读取。 在这之后,用以传递并打印日志中包含request id的一个实际例子,来介绍如何使用context与log的MDC一起使用。

虽然reactor自3.1开始提供了context来弥补无法使用ThreadLocal的不足,但与ThreaLocal相比,context仍然有不少局限。比如使用上的不方便,要么利用Mono.subscribeContext().map并搭配flatmap来使用,要么需要将数据流转化成信号signal流来使用,总之远不如ThreadLocal来的简单易用。另外,context的不可变特性,虽然有助于thread safe,但使得不同方法之间无法传递更新,比如方法A内修改后再传递给方法B,因为context是只读的,但这在ThreadLocal上却是轻而易举就能实现。

好消息的是,reactor在3.5开始,提供了新的方法deferContextual来简化context的使用。以及提出了context view的概念来简化context传递问题,感兴趣的可以阅读reactor文档。

到此这篇关于详解Reactor中Context的用法的文章就介绍到这了,更多相关Reactor Context内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://juejin.cn/post/7201084288523452476

查看更多关于详解Reactor中Context的用法的详细内容...

  阅读:23次