RxJS 是一个响应式的库,它接收从事件源发出的一个个事件,经过处理管道的层层处理之后,传入最终的接收者,这个处理管道是由操作符组成的,开发者只需要选择和组合操作符就能完成各种异步逻辑,极大简化了异步编程。除此以外,RxJS 的设计还遵循了函数式、流的理念。
直接讲概念比较难理解,不如我们实现一个简易的 RxJS 再来看这些。
RxJS 的使用RxJS 会对事件源做一层封装,叫做 Observable,由它发出一个个事件。
比如这样:
const source = new Observable ( ( observer ) => { let i = 0 ; setInterval ( ( ) => { observer .next ( ++ i ) ; } , 1000 ) ; } ) ;
在回调函数里面设置一个定时器,不断通过 next 传入事件。
这些事件会被接受者监听,叫做 Observer。
const subscription = source .subscribe ( { next : ( v ) => console .log ( v ) , error : ( err ) => console .error ( err ) , complete : ( ) => console .log ( 'complete' ) , } ) ;
observer 可以接收 next 传过来的事件,传输过程中可能有 error,也可以在这里处理 error,还可以处理传输完成的事件。
这样的一个监听或者说订阅,叫做 Subscription。
可以订阅当然也可以取消订阅:
subscription .unsubscribe ( ) ;
取消订阅时的回调函数是在 Observable 里返回的:
const source = new Observable ( ( observer ) => { let i = 0 ; const timer = setInterval ( ( ) => { observer .next ( ++ i ) ; } , 1000 ) ; return function unsubscribe ( ) { clearInterval ( timer ) ; } ; } ) ;
发送事件、监听事件只是基础,处理事件的过程才是 RxJS 的精髓,它设计了管道的概念,可以用操作符 operator 来组装这个管道:
source .pipe ( map ( ( i ) => ++ i ) , map ( ( i ) => i * 10 ) ) .subscribe ( ( ) => { // ... } )
事件经过管道之后才会传到 Observer,在传输过程中会经过一个个操作符的处理。
比如这里的处理逻辑是,对传过来的数据加 1,然后再乘以 10。
综上,使用 RxJS 的代码就是这样的:
const source = new Observable ( ( observer ) => { let i = 0 ; const timer = setInterval ( ( ) => { observer .next ( ++ i ) ; } , 1000 ) ; return function unsubscribe ( ) { clearInterval ( timer ) ; } ; } ) ; const subscription = source .pipe ( map ( ( i ) => ++ i ) , map ( ( i ) => i * 10 ) ) .subscribe ( { next : ( v ) => console .log ( v ) , error : ( err ) => console .error ( err ) , complete : ( ) => console .log ( 'complete' ) , } ) ; setTimeout ( ( ) => { subscription .unsubscribe ( ) ; } , 4500 ) ;
我们通过 Observable 创建了一个事件源,每秒发出一个事件,这些事件会经过管道的处理再传递给 Observer,管道的组成是两个 map 操作符,对数据做了 + 1 和 * 10 的处理。
Observer 接收到传递过来的数据,做了打印,还对错误和结束时的事件做了处理。此外,Observable 提供了取消订阅时的处理逻辑,当我们在 4.5s 取消订阅时,就可以清除定时器。
使用 RxJS 基本就是这个流程,那它是怎么实现的呢?
80 行代码实现RxJS先从事件源开始,实现 Observable:
观察下它的特点:
它接收一个回调函数,里面可以调用 next 来传输数据。 它有 subscribe 方法可以用来添加 Observer 的订阅,返回 subscription 它可以在回调函数里返回 unsbscribe 时的处理逻辑 它有 pipe 方法可以传入操作符
我们按照这些特点来实现下:
首先,Observable 的构造函数要接收回调函数 _subscribe,但是不是立刻调用,而是在 subscribe 的时候才调用:
class Observable { constructor ( _subscribe ) { this ._subscribe = _subscribe ; } subscribe ( ) { this ._subscribe ( ) ; } }
回调函数的参数是有 next、error、complete 方法的对象,用于传递事件:
class Observable { constructor ( _subscribe ) { this ._subscribe = _subscribe ; } subscribe ( observer ) { const subscriber = new Subscriber ( observer ) ; this ._subscribe ( subscriber ) ; } } class Subscriber { constructor ( observer ) { super ( ) ; this .observer = observer ; this .isStopped = false ; } next ( value ) { if ( this .observer .next && ! this .isStopped ) { this .observer .next ( value ) ; } } error ( value ) { this .isStopped = true ; if ( this .observer .error ) { this .observer .error ( value ) ; } } complete ( ) { this .isStopped = true ; if ( this .observer 测试数据plete ) { this .observer 测试数据plete ( ) ; } if ( this .unsubscribe ) { this .unsubscribe ( ) ; } } }
这样,在回调函数里面就可以调用 next、error、complete 方法了:
此外,回调函数的返回值是 unsbscribe 时的处理逻辑,要收集起来,在取消订阅时调用:
class Subscription { constructor ( ) { this ._teardowns = [ ] ; } unsubscribe ( ) { this ._teardowns .forEach ( ( teardown ) => { typeof teardown === 'function' ? teardown ( ) : teardown .unsubscribe ( ) } ) ; } add ( teardown ) { if ( teardown ) { this ._teardowns .push ( teardown ) ; } } }
提供 unsubscribe 方法用于取消订阅,_teardowns 用于收集所有的取消订阅时的回调,在 unsubscribe 时调用所有 teardown 回调。
这段逻辑比较通用,可以作为 Subscriber 的父类。
然后,在 Observable 里调用 add 来添加 teardown,并且返回 subscription(它有 unsubscribe 方法):
class Observable { constructor ( _subscribe ) { this ._subscribe = _subscribe ; } subscribe ( observer ) { const subscriber = new Subscriber ( observer ) ; subscriber .add ( this ._subscribe ( subscriber ) ) ; return subscriber ; } } class Subscriber extends Subscription { constructor ( observer ) { super ( ) ; this .observer = observer ; this .isStopped = false ; } next ( value ) { if ( this .observer .next && ! this .isStopped ) { this .observer .next ( value ) ; } } error ( value ) { this .isStopped = true ; if ( this .observer .error ) { this .observer .error ( value ) ; } } complete ( ) { this .isStopped = true ; if ( this .observer 测试数据plete ) { this .observer 测试数据plete ( ) ; } if ( this .unsubscribe ) { this .unsubscribe ( ) ; } } } class Subscription { constructor ( ) { this ._teardowns = [ ] ; } unsubscribe ( ) { this ._teardowns .forEach ( ( teardown ) => { typeof teardown === 'function' ? teardown ( ) : teardown .unsubscribe ( ) } ) ; } add ( teardown ) { if ( teardown ) { this ._teardowns .push ( teardown ) ; } } }
这样,我们就实现了 Observable 和 Observer,只写了 50 行代码。先来测试下:
const source = new Observable ( ( observer ) => { let i = 0 ; const timer = setInterval ( ( ) => { observer .next ( ++ i ) ; } , 1000 ) ; return function unsubscribe ( ) { clearInterval ( timer ) ; } ; } ) ; const subscription = source .subscribe ( { next : ( v ) => console .log ( v ) , error : ( err ) => console .error ( err ) , complete : ( ) => console .log ( 'complete' ) , } ) ; setTimeout ( ( ) => { subscription .unsubscribe ( ) ; } , 4500 ) ;
Observer 监听到了 Observable 传递过来的 1、2、3、4 的数据,因为在 4.5s 时取消了订阅,所以后面就不再有数据了。
我们用 50 行实现了基础的 RxJS!
当然,最精髓的 operator 还没有实现,接下来继续完善。
我们给 Observable 添加 pipe 方法,它会调用传入的 operator,并且上个的结果是下个的输入,这样就串起来了,也就是管道的概念:
class Observable { constructor ( _subscribe ) { // ... } subscribe ( observer ) { // ... } pipe ( ...operations ) { return pipeFromArray ( operations ) ( this ) ; } } function pipeFromArray ( fns ) { if ( fns .length === 0 ) { return ( x ) => x ; } if ( fns .length === 1 ) { return fns [ 0 ] ; } return ( input ) => { return fns .reduce ( ( prev , fn ) => fn ( prev ) , input ) ; } ; }
当传入的参数是 0 个的时候,就直接返回之前的 Observable,1 个的时候直接返回,否则就通过 reduce 的方式串联起来,组成管道。
operator 的实现就是监听上一个 Observable,返回一个新的。
比如 map 的实现,就是传入 project 对 value 做处理,把结果用 next 传下去:
function map ( project ) { return ( observable ) => new Observable ( ( subscriber ) => { const subcription = observable .subscribe ( { next ( value ) { return subscriber .next ( project ( value ) ) ; } , error ( err ) { subscriber .error ( err ) ; } , complete ( ) { subscriber 测试数据plete ( ) ; } , } ) ; return subcription ; } ) ; }
这样我们就实现了 operator,来测试下:
我们调用了 pipe 方法,使用两个 map 操作符来组织处理流程,对数据做了 +1 和 *10 的处理。
所以,Observable 传递过来的 1、2、3、4 传递给 Observer 的时候就变成了 20、30、40、50。
至此,我们实现了 RxJS 的 Observable、Observer、Subscription、operator 等概念,是一个简易版 RxJS 了。只用了 80 行代码。
再来看最开始的那些理念:
为什么叫做响应式呢?
因为是对事件源做监听和一系列处理的,这种编程模式就叫做响应式。
为什么叫函数式呢?
因为每一步 operator 都是纯函数,返回一个新的 Observable,这符合函数式的不可变,修改后返回一个新的的理念。
为什么叫流呢?
因为一个个事件是动态产生和传递的,这种数据的动态产生和传递就可以叫做流。
完整代码如下:
function pipeFromArray ( fns ) { if ( fns .length === 0 ) { return ( x ) => x ; } if ( fns .length === 1 ) { return fns [ 0 ] ; } return ( input ) => { return fns .reduce ( ( prev , fn ) => fn ( prev ) , input ) ; } ; } class Subscription { constructor ( ) { this ._teardowns = [ ] ; } unsubscribe ( ) { this ._teardowns .forEach ( ( teardown ) => { typeof teardown === 'function' ? teardown ( ) : teardown .unsubscribe ( ) } ) ; } add ( teardown ) { if ( teardown ) { this ._teardowns .push ( teardown ) ; } } } class Subscriber extends Subscription { constructor ( observer ) { super ( ) ; this .observer = observer ; this .isStopped = false ; } next ( value ) { if ( this .observer .next && ! this .isStopped ) { this .observer .next ( value ) ; } } error ( value ) { this .isStopped = true ; if ( this .observer .error ) { this .observer .error ( value ) ; } } complete ( ) { this .isStopped = true ; if ( this .observer 测试数据plete ) { this .observer 测试数据plete ( ) ; } if ( this .unsubscribe ) { this .unsubscribe ( ) ; } } } class Observable { constructor ( _subscribe ) { this ._subscribe = _subscribe ; } subscribe ( observer ) { const subscriber = new Subscriber ( observer ) ; subscriber .add ( this ._subscribe ( subscriber ) ) ; return subscriber ; } pipe ( ...operations ) { return pipeFromArray ( operations ) ( this ) ; } } function map ( project ) { return ( observable ) => new Observable ( ( subscriber ) => { const subcription = observable .subscribe ( { next ( value ) { return subscriber .next ( project ( value ) ) ; } , error ( err ) { subscriber .error ( err ) ; } , complete ( ) { subscriber 测试数据plete ( ) ; } , } ) ; return subcription ; } ) ; } const source = new Observable ( ( observer ) => { let i = 0 ; const timer = setInterval ( ( ) => { observer .next ( ++ i ) ; } , 1000 ) ; return function unsubscribe ( ) { clearInterval ( timer ) ; } ; } ) ; const subscription = source .pipe ( map ( ( i ) => ++ i ) , map ( ( i ) => i * 10 ) ) .subscribe ( { next : ( v ) => console .log ( v ) , error : ( err ) => console .error ( err ) , complete : ( ) => console .log ( 'complete' ) , } ) ; setTimeout ( ( ) => { subscription .unsubscribe ( ) ; } , 4500 ) ;
总结
为了理解 RxJS 的响应式、函数式、流等理念,我们实现了简易版的 RxJS。
我们实现了 Observable、Observer、Subscription 等概念,完成了事件的产生和订阅以及取消订阅。
接着又实现了 operator 和 pipe,每个 operator 返回一个新的 Observable,对数据做层层处理。
写完以后,我们能更清晰的理解响应式、函数式、流等理念在 RxJS 里是怎么体现的。
实现简易版 RxJS,只需要 80 行代码。
原文地址:https://mp.weixin.qq测试数据/s/G-Af4gDuBuXuHMTV-de-0w
dy("nrwz");
查看更多关于一文带你用80行代码实现简易 RxJS的详细内容...