欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Rx Java 异步编程框架

最编程 2024-07-16 15:25:22
...

Rx Java 异步编程框架

  • 名词定义
  • 举个例子
  • 基本概念
    • Backpressure
    • Upstream, Downstream
    • Objects in motion
    • Assembly time
    • Subscription time
    • Runtime
  • 特性
    • Simple background computation
    • Schedulers
    • Concurrency within a flow
    • Parallel processing
    • Dependent sub-flows
    • Continuations
    • Type conversions
    • Operator naming conventions
    • R8 and ProGuard settings
  • 依赖
  • 核心类
    • Flowable
    • Observable
    • Single
    • Completable
    • Maybe
  • 调度器
  • 创建操作
    • create
    • defer
    • range
  • 变换操作
    • flatMap
    • take
    • filter
    • doOnNext
  • 错误处理
  • 总结
  • REFERENCES

在很多软件编程任务中,或多或少你都会期望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。但是在ReactiveX中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象(Observable),观察者(Observer)订阅(Subscribe)它,当数据就绪时,之前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。

这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。

有很多术语可用于描述这种异步编程和设计模式,在在本文里我们使用这些术语:一个观察者订阅一个可观察对象 (An observer subscribes to an Observable)。通过调用观察者的方法,Observable发射数据或通知给它的观察者。

在其它的文档和场景里,有时我们也将Observer叫做SubscriberWatcherReactor。这个模型通常被称作Reactor模式

名词定义

这里给出一些名词的翻译

  • Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式;
  • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念;
  • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者;
  • Observer 观察者对象,监听 Observable 发射的数据并做出响应,Subscriber 是它的一个特殊实现;
  • emit 直译为发射,发布,发出,含义是 Observable 在数据产生或变化时发送通知给 Observer,调用 Observer 对应的方法,文章里一律译为发射;
  • items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项;

举个例子

响应式编程

    /**
     * Rx 测试
     *
     * @param args
     */
    public static void main(String[] args) {
        // Demo1
        Flowable.just("Hello world!").subscribe(System.out::println);
        // Demo2 数据消费
        Observable.just("Hello World!").subscribe(System.out::println);
        // Demo3 数据中间处理
        Observable.just("Hello World!")
                .map(p -> p + " Master!")
                .subscribe(System.out::println);
        // Demo4 输出流数据转换
        Observable.just("Hello World!")
                .map(p -> p.hashCode())
                .subscribe(System.out::println);
        Observable.just("Hello World!")
                .map(p -> Base64Utils.encodeToString(p.getBytes()))
                .map(p -> new String(Base64Utils.decodeFromString(p)))
                .subscribe(System.out::println);
    }

输出结果:

Hello world!
Hello World!
Hello World! Master!
-969099747
Hello World!

基本概念

Backpressure

在管道运输中,气流或液流由于管道突然变细、急弯等原因导致由某处出现了下游向上游的逆向压力,这种情况称作「back pressure」。这是一个很直观的词:向后的、往回的压力。 在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。 在 RxJava 中反压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。

  • 反压现象的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
  • 反压(Backpressure)并不是一个像 flatMap 一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。
  • 生产速度大于消费速度,所以需要 Buffer;
  • 外部条件有限制,所以 Buffer 需要有上限;
  • Buffer 达到上限这个现象,有一个简化的等价词叫做 Backpressure;
  • Backpressure 的出现其实是一种危险边界,唯一的选择是丢弃新事件。

Backpressure 指的是在 Buffer 有上限的系统中,Buffer 溢出的现象;它的应对措施只有一个:丢弃新事件。

当数据流通过异步步骤运行时,每个步骤可以以不同的速度执行不同的操作。

为了避免压倒性的这些步骤,这些步骤通常表现为由于临时缓冲需要跳过/删除数据而增加的内存使用,所谓的反压被应用,这是一种流控制形式,其中的步骤可以表示它们准备处理多少项。

这允许限制数据流的内存使用,因为通常没有办法让步骤知道上游将向它发送多少条目。

在 RxJava 中,专用的 Flowable 类被指定用于支持反压,Observable 专用于非反压操作(短序列、 GUI 交互等)。其他类型,Single, MaybeCompletable不支持反压,也不应该,总是有空间暂时存储一个元素。

Upstream, Downstream

上游、下游: RxJava 中的数据流包括一个源、零个或多个中间步骤,然后是数据消费者或组合子步骤(其中该步骤负责通过某种方式使用数据流) :

source.operator1()
    .operator2()
    .operator3()
    .subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());

在这里,如果我们把自己想象成操作者2,向左看向源头,我们称之为上游。向右看向订阅者/使用者称为下游。

Objects in motion

运动中的物体: 在 RxJava 的文档中,emission、 emission、 item、 event、 signal、 data 和 message 被认为是同义词,表示沿着数据流传输的对象。

Assembly time

装配时间:

通过应用各种中间操作符来准备数据流的过程发生在所谓的组装时间:

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);

在这一点上,数据还没有流动,也没有发生副作用。

Subscription time

订阅时间:

这是对在内部建立处理步骤链的流调用 subscribe () 时的临时状态:

flow.subscribe(System.out::println)

这时会触发订阅副作用(请参见 doOnSubscribe)。在这种状态下,某些源会立即阻塞或开始发送项。

Runtime

运行时:

这是当流处于主动发出元素、错误或完成信号时的状态:

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

实际上,这是在执行上面给定示例的主体时触发的。

特性

Simple background computation

简单的背景计算:

RxJava 的一个常见用例是在后台线程上运行一些计算、网络请求,并在 UI 线程上显示结果(或错误) :

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

这种类型的链接方法称为 fluent API,类似于构建器模式。但是,RxJava 的反应类型是不可变的;每个方法调用都返回一个带有添加行为的新 Flowable。为了说明,这个例子可以重写如下:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

通常,您可以通过 subscribeOn 将计算或阻塞 IO 移动到其他线程。一旦数据准备就绪,您可以确保通过 observeOn 在前台或 GUI 线程上对它们进行处理。

Schedulers

调度器: RxJava 操作符不直接与线程或 ExecutorServices 一起工作,而是与所谓的Scheduler 一起工作,这些有用的类来自统一的 API. RxJava 3 并发抽象,其提供了几个标准的调度器。

  • Schedulers.computation():在后台固定数量的专用线程上运行计算密集型工作。大多数异步操作符都将此作为默认值Scheduler
  • Schedulers.io():在一组动态更改的线程上运行类 I/O 或阻塞操作。
  • Schedulers.single():以顺序和 FIFO 方式在单个线程上运行工作。
  • Schedulers.trampoline():在一个参与的线程中,以顺序和 FIFO 的方式运行工作,通常是为了测试目的。

此外,还有一个选项可以通过 Scheduler 将现有的 Executor (及其子类型,如 ExecutorService)封装到 Scheduler 中Schedulers.from(Executor)。例如,可以使用它来拥有一个更大但仍然固定的线程池(分别与 calculation() 和 io() 不同)。

Observable.just("java", "C++", "python")
        //.observeOn(Schedulers.io())
    .observeOn(Schedulers.from(Executors.newFixedThreadPool(5)))
    .subscribe(System.out::println);

Thread.sleep (2000) ;最后并非偶然。在 RxJava 中,默认的调度程序运行在守护线程上,这意味着一旦 Java 主线程退出,它们就全部停止,后台计算可能永远不会发生。在这个示例情况中,休眠一段时间可以让您在控制台上看到流的输出,并节省时间。

Concurrency within a flow

流中的并发性:

在 RxJava 中,流本质上是连续的,可以被分割成可以并发运行的处理阶段:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

此示例在计算用的调度器 Scheduler 上将数字从1平方到10,并在“主”线程(更准确地说,blockingSubscribe 的调用方线程)上消费结果。但是,lambda v-> v * v 对于这个流并不是并行运行;它在同一个计算线程上一个接一个地接收值1到10。

Parallel processing

并行处理:

并行处理数字1到10的过程要稍微复杂一些:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

实际上,RxJava 中的并行性意味着运行独立的流并将它们的结果合并回单个流。运算符 flatMap 首先将每个数字从1到10映射到它自己的 Flowable,然后运行它们并合并计算出的平方。

但是请注意,flatMap 并不保证任何顺序,内部流中的项可能最终交叉存取。还有一些替代操作符:

  • concatMap:它一次映射并运行一个内部流程。
  • concatMapEager:它“同时”运行所有内部流,但输出流将按照内部流创建的顺序进行。

另外,Flowable.parallel () 操作符和 ParallelFlowable 类型有助于实现相同的并行处理模式:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

依赖子流:

flatMap 是一个强大的操作符,在很多情况下都有帮助。例如,给定一个返回 Flowable 的服务,我们希望调用另一个服务,其值由第一个服务发出:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

延续: 有时候,当一个项变得可用时,需要在它上面执行一些依赖的计算。这有时被称为延续,并且根据应该发生什么以及涉及到什么类型,可能需要使用各种操作符来完成。

Dependent

依赖:

最典型的场景是给定一个值,调用另一个服务,等待并继续其结果:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

通常情况下,后面的序列也需要早期映射的值。这可以通过将外部 flatMap 移动到先前 flatMap 的内部来实现,例如:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

在这里,原始值将在内部 flatMap 中可用,由 lambda 变量捕获提供。

Non-dependent

非依赖性的:

在其他场景中,第一个源/数据流的结果是不相关的,人们希望继续使用准独立的另一个源。在这里,flatMap 也可以工作:

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

然而,继续在这种情况下保持 Observable 而不是可能更合适的 Single。(这是可以理解的,因为从 flatMapSingle 的角度来看,sourceObservable 是一个多值源,因此映射也可能导致多个值)。

不过通常有一种方法可以使用 Completable 作为中介,通过它的操作符 andThen 来继续做一些事情:

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

sourceObservablesomeSingleSource 之间的唯一依赖性是前者应该正常完成,以便后者被使用。

Deferred-dependent

依赖性递延:

有时,在前一个序列和新序列之间存在一个隐式的数据依赖关系,由于某种原因,这个依赖关系没有通过“常规通道”。人们倾向于写下这样的延续:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

不幸的是,当数据流甚至还没有运行时,因为 Single.just(count.get()) 将在assembly time时计算,所以这个命令将打印 0。我们需要一些东西,推迟这个Single源的计算,直到主源完成运行时:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

// 或者

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversions

类型转换: 有时候,源或服务返回的类型与应该与其一起工作的流不同。例如,在上面的库存示例中,getDemandAsync 可以返回 Single<DemandRecord> 。如果代码示例保持不变,将导致编译时错误(然而,通常会出现关于缺少重载的误导性错误消息)。 在这种情况下,通常有两个选项来修复转换: 1) 转换为所需的类型; 2) 查找并使用支持不同类型的特定运算符的重载。

Converting to the desired type

转换为所需的类型 每个反应性基类都包含能够执行此类转换(包括协议转换)以匹配其他类型的操作符。下表显示了现有的转换选项:

Flowable

Observable

Single

Maybe

Completable

Flowable

toObservable

first, firstOrError, single, singleOrError, last, lastOrError1

firstElement, singleElement, lastElement

ignoreElements

Observable

toFlowable2

first, firstOrError, single, singleOrError, last, lastOrError1

firstElement, singleElement, lastElement

ignoreElements

Single

toFlowable3

toObservable

toMaybe

ignoreElement

Maybe

toFlowable3

toObservable

toSingle

ignoreElement

Completable

toFlowable

toObservable

toSingle

toMaybe

  • 将多值源转换为单值源时,应该决定将众多源值中的哪一个作为结果。
  • Observable 转化为 Flowable 需要额外的决定:如何处理 Observable 源的潜在无约束流?有几个策略可用(如缓冲,下降,保持最新)通过 BackpressureStrategy 参数或通过标准 Flowable 操作符,如 onBackpressureBufferonBackpressureDroponBackpressureLatest,这也允许进一步定制的反压行为。
  • 当只有(最多)一个来源项目,这是一个没有问题的反压,因为它可以始终存储,直到下游准备消费。
Using an overload with the desired type

使用所需类型的重载: 许多经常使用的操作符具有可以处理其他类型的重载。它们通常以目标类型的后缀命名:

Operator

Overloads

flatMap

flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable

concatMap

concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable

switchMap

switchMapSingle, switchMapMaybe, switchMapCompletable

这些运算符使用后缀而不是简单地使用相同的名称和不同的签名的原因是类型消除。Java 并不认为操作符 operator(Function<T, Single<R>>) 和操作符 operator(Function<T, Maybe<R>>) 是不同的(不像 c #) ,而且由于泛型信息的擦除,这两个操作符最终会成为具有相同签名的重复方法。

Operator naming conventions

运算符命名约定: 在编程中,命名是最困难的事情之一,因为名字被认为不应该太长,表达力强,捕捉力强,容易记住。不幸的是,目标语言(以及预先存在的约定)在这方面可能不会提供太多帮助(不可用的关键字、类型擦除、类型歧义等)。

Unusable keywords

无法使用的关键字 在原始的 Rx.NET 中,发出一个条目然后完成的操作符叫做 Return (t)。由于 Java 约定是以小写字母开头的方法名称,所以这将是 return (t) ,它是 Java 中的一个关键字,因此不可用。因此,RxJava 选择将这个操作符命名为 just(T)。运算符 Switch 也存在同样的限制,它必须命名为 switchOnNext。另一个例子是 Catch,它命名为 onErrorResumeNext

Type erasure

类型擦除:

许多期望用户提供返回反应类型的函数的操作符不能重载,因为围绕 Function<T, X> 的类型擦除将这些方法签名转换为重复类型。选择通过添加类型作为后缀来命名这些操作符:

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
Type ambiguities

类型歧义:

尽管某些运算符在类型擦除方面没有问题,但它们的签名可能会变得模棱两可,特别是如果使用 java8 和 lambdas。例如,concatWith 使用各种其他反应性基类型作为参数(为了在底层实现中提供方便性和性能优势) ,存在几个重载:

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

Publisher 和 SingleSource 都显示为函数接口(带有一个抽象方法的类型) ,并可能鼓励用户尝试提供一个 lambda 表达式:

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

不幸的是,这种方法不起作用,而且示例根本不会打印2。事实上,自2.1.10版本以来,它甚至不能编译,因为至少存在4个 concatWith 重载,并且编译器会发现上面的代码不明确。

在这种情况下,用户可能希望推迟一些计算,直到 someSource 完成,因此正确的无歧义运算符应该推迟:

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有时候,添加后缀是为了避免逻辑上的歧义,这样可能会编译,但在流程中产生错误的类型:

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

当函数接口类型作为类型参数 T 参与时,这也可能会变得模糊不清。

Error handling

错误处理:

数据流可能会失败,此时错误会发送到消费者。不过有时候,多个源可能会失败,在这个时候可以选择是否等待所有源完成或失败。为了表示这个机会,许多操作符名称都以 DelayError 字符作为后缀(而其他操作符的重载中包含 DelayErrordelayErrors 布尔标志) :

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

当然,各种后缀可以一起出现:

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
Base class vs base type

基类与基类型: 由于基类上的静态方法和实例方法的数量很多,可以认为基类很多。RxJava 3 的设计受到 Reactive Streams 规范的严重影响,因此,该库为每个 Reactive 类型提供了一个类和一个接口:

Type

Class

Interface

Consumer

0..N backpressured

Flowable

Publisher1

Subscriber

0..N unbounded

Observable

ObservableSource2

Observer

1 element or error

Single

SingleSource

SingleObserver

0..1 element or error

Maybe

MaybeSource

MaybeObserver

0 element or error

Completable

CompletableSource

CompletableObserver

  • org.reactivestreams.Publisher 是外部 Reactive Streams 库的一部分。它是通过由 Reactive Streams 规范管理的标准化机制与其他响应式编程库交互的主要类型。
  • 接口的变数命名原则是在半传统的类名后面附加 Source。由于 Publisher 是由 Reactive Streams 库提供的,因此不存在 FlowableSource (子类型化也不会有助于互操作)。但是,这些接口并不是 Reactive Streams 规范意义上的标准接口,目前只是 RxJava 特定的接口。

R8 and ProGuard settings

R8 和 ProGuard 设置

默认情况下,RxJava 本身不需要任何 ProGuard/R8 设置,应该可以毫无问题地工作。不幸的是,Reactive Streams 自1.0.3 版本以来的依赖性已经在其 JAR 中嵌入了 Java 9 类文件,这些文件可能会使用普通的 ProGuard 导致警告:

Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher

建议在应用程序的 proguard-ruleset 文件中设置以下-dontwarn 条目:

-dontwarn java.util.concurrent.Flow*

对于 R8,RxJava jar 包含 META-INF/proguard/rxjava3.pro 和相同的 no-warning 子句,这些应该是自动应用的。

依赖

maven

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

Gradle

implementation 'io.reactivex.rxjava3:rxjava:x.y.z'

官方 API Docs (此处以3.0.8版本为例)

http://reactivex.io/RxJava/3.x/javadoc/3.0.8/

核心类

Flowable

io.reactivex.rxjava3.core.Flowable

  • 流数目:0~N(发送0~N个的数据)
  • 支持反应流和反压
public abstract class Flowable<@NonNull T> implements Publisher<T>

这个 类来实现 Reactive Streams 与 Flowable 扩展的 Publishers 一起运作。因此,许多中间的 操作符直接接受通用 Publishers,并允许与其他 Reactive Streams 实现直接互操作。

Flowable 为操作符提供 128 个元素的默认缓冲区大小,可以通过 bufferSize () 访问,可以通过系统参数 rx3.buffer-size 全局重写。但是,大多数操作符都有允许显式设置其内部缓冲区大小的重载。

  /**
     * Flowable 基类测试
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        Disposable d = Flowable.just("Hello world!")
                .delay(1, TimeUnit.SECONDS)
                .subscribeWith(new DisposableSubscriber<String>() {
                    @Override
                    public void onStart() {
                        System.out.println("Start!");
                        request(1);
                    }
                    @Override
                    public void onNext(String t) {
                        System.out.println(t);
                        request(1);
                    }
                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });
        Thread.sleep(5000);
        // the sequence can now be cancelled via dispose()
        d.dispose();
    }

输出:

> Task :rx-java-examples:rx-java-chapter-1:FlowableTest.main()
Start!
Hello world!
Done!
  • Reactive Streams 规范在定义发布者和订阅者之间的交互时相对严格,以至于由于某些时间要求和需要通过 Subscription.request (long) 准备无效的请求数量而导致严重的性能损失。
  • 因此,RxJava 引入了 FlowableSubscriber 接口,它表明消费者可以使用放松的规则来驱动。所有的 RxJava 操作符都是根据这些宽松的规则实现的。如果订阅的 Subscriber 没有实现此接口,例如,由于它来自另一个 Reactive Streams 兼容库,Flowable 将自动在其周围应用一个兼容包装。
  • Flowable 是一个抽象类,但是由于要严格遵循大量的 Reactive Streams 规则,不建议通过直接扩展类来实现源和自定义操作符。如果需要这样的自定义实现,请参阅 wiki 获得一些指导。
  • 建议使用 create (FlowableOnSubscribe,backpresssurestrategy) factory 方法来创建自定义 Flowables:
    /**
     * 工厂方法创建 Flowable
     * @param args
     */
    public static void main(String[] args) {
        Flowable<String> source = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                // signal an item
                emitter.onNext("Hello");

                // could be some blocking operation
                Thread.sleep(1000);

                // the consumer might have cancelled the flow
                if (emitter.isCancelled()) {
                    return;
                }

                emitter.onNext("World");

                Thread.sleep(1000);

                // the end-of-sequence has to be signaled, otherwise the
                // consumers may never finish
                emitter.onComplete();
            }        
        }, BackpressureStrategy.BUFFER); // 指定反压策略

        System.out.println("Subscribe!");

        source.subscribe(System.out::println);

        System.out.println("Done!");
    }

输出:

> Task :rx-java-examples:rx-java-chapter-1:FlowableTest2.main()
Subscribe!
Hello
World
Done!

作为 RxJava 响应源,例如 Flowable,通常本质上是同步的和有序的。在 ReactiveX 设计中,操作符运行的位置(线程)与操作符可以处理数据的位置正交。这意味着异步和并行性必须通过 subscribeOn (Scheduler)observeOn (Scheduler)parallel ()等操作符显式表示。通常,带有 Scheduler 参数的操作符将这种类型的异步引入到流中。

Flowable虽然可以通过create()来创建,但是你必须指定反压的策略,以保证你创建的Flowable是支持反压的。

一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用 request(n) 来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。

Flowable.range(0, 10)
                .subscribe(new Subscriber<Integer>() {

                    Subscription sub;
     // 当订阅后,会首先调用这个方法,其实就相当于onStart(),
              // 传入的 Subscription s 参数可以用于请求数据或者取消订阅
                    @Override
                    public void onSubscribe(Subscription s) {
                        System.out.println("onSubscribe START");
                        sub = s;
                        s.request(1);
                        try {
                            TimeUnit.SECONDS.sleep(3);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("onSubscribe END");
                    }

                    @Override
                    public void onNext(Integer v) {
                        System.out.println("onNext: " + v);
                        sub.request(1);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });

        Thread.sleep(5000);
        // the sequence can now be cancelled via dispose()
    }
// 输出
onSubscribe START
onSubscribe END
onNext: 0
onNext: 1
...
onNext: 8
onNext: 9
Done!

根据上面的代码的结果输出中可以看到,当我们调用 subscription.request(n) 方法的时候,会等onSubscribe()中后面的代码执行完成后,才会立刻执行到onNext方法。

尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。

Observable

在RxJava中,一个实现了Observer接口的对象可以订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待Observable发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。 io.reactivex.rxjava3.core.Observable

  • 流数目:0~N (发送0~N个的数据)
  • 无反压

在这里插入图片描述

ReactiveX真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。 Rx的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。

just

将一个或多个对象转换成发射这个或这些对象的一个 Observable

 public static void main(String[] args) {
        System.out.println("DEMO 1 ---------------");
        Observable.just(1, 2, 3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        d.dispose();
                        System.out.println("Cancel !");
                    }
                    @Override
                    public void onNext(Integer item) {
                        System.out.println("Next: " + item);
                    }
                    @Override
                    public void onError(Throwable error) {
                        System.err.println("Error: " + error.getMessage());
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Sequence complete.");
                    }
                });
        System.out.println("DEMO 2 ---------------");
        Observable.just(1, 2, 3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        System.out.println("onSubscribe ! + " + d.isDisposed());
                    }
                    @Override
                    public void onNext(Integer item) {
                        System.out.println("Next: " + item);
                    }
                    @Override
                    public void onError(Throwable error) {
                        System.err.println("Error: " + error.getMessage());
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Sequence complete.");
                    }
                });
    }
// 输出
DEMO 1 ---------------
Cancel !
DEMO 2 ---------------
onSubscribe ! + false
Next: 1
Next: 2
Next: 3
Sequence complete
fromArray

将一个数组转换成一个Observable 相似的还有 fromIterable、fromFuture 可以将一个Iterable, 一个Future转换为一个Observable

  // 数组
  System.out.println("DEMO 3 ---------------");
        Integer[] items = {0, 1, 2, 3, 4, 5};
        Observable myObservable = Observable.fromArray(items);
        myObservable.subscribe(next -> System.out.println(next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
  // 异常处理
        System.out.println("DEMO 4 ---------------");
        Integer[] items2 = {0, 1, null, 3, 4, 5};
        myObservable = Observable.fromArray(items2);
        myObservable.subscribe(next -> System.out.println(next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
  // 迭代器
  System.out.println("DEMO 5 ---------------");
        Integer[] items3 = {0, 1, 2, 3, 4, 5};
        myObservable = Observable.fromIterable(Arrays.asList(items3));
        myObservable.subscribe(next -> System.out.println(next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
  // Future
  System.out.println("DEMO 6 ---------------");
        FutureTask<Long> task = new FutureTask<>(() -> System.currentTimeMillis());
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(task);
        try {
            System.out.println("get:" + task.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        //executorService.shutdown(); 移至最后执行
        myObservable = Observable.fromFuture(task);
        myObservable.subscribe(next -> System.out.println("RECIEVE: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
  // Future 超时
  System.out.println("DEMO 7 ---------------");
        FutureTask<Long> task2 = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return System.currentTimeMillis();
        });
        myObservable = Observable.fromFuture(task2, 1, TimeUnit.SECONDS);//等待1s
        executorService.submit(task2);
        myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 1s: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
        System.out.println("DEMO 8 ---------------");
        task2 = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return System.currentTimeMillis();
        });
        myObservable = Observable.fromFuture(task2, 4, TimeUnit.SECONDS);//等待4s
        executorService.submit(task2);
        myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 4s: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
        System.out.println("DEMO 9 ---------------");
        task2 = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return System.currentTimeMillis();
        });
        myObservable = Observable.fromFuture(task2, 1, TimeUnit.SECONDS);//等待1s
        executorService.submit(task2);
        myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 1s: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
        System.out.println("DEMO 10 ---------------");
        task2 = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(3);
            return System.currentTimeMillis();
        });
        myObservable = Observable.fromFuture(task2, 4, TimeUnit.SECONDS);//等待4s
        executorService.submit(task2);
        myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 4s: " + next),
                error -> System.out.println(error),
                () -> System.out.println("DONE!")
        );
        executorService.shutdown();
// 输出
DEMO 3 ---------------
0
1
2
3
4
5
DONE!
DEMO 4 ---------------
0
1
java.lang.NullPointerException: The element at index 2 is null
DEMO 5 ---------------
0
1
2
3
4
5
DONE!
DEMO 6 ---------------
get:1608780273323
RECIEVE: 1608780273323
DONE!
DEMO 7 ---------------
java.util.concurrent.TimeoutException
DEMO 8 ---------------
RECIEVE WITHIN 4s: 1608781164556
DONE!
DEMO 9 ---------------
java.util.concurrent.TimeoutException
DEMO 10 ---------------
RECIEVE WITHIN 4s: 1608781168578
DONE!
repeat

重复发送数据

 System.out.println("DEMO 1 ---------------");
        Integer[] items3 = {0, 1, 2};
        Observable.fromIterable(Arrays.asList(items3))
                //.repeat() // 一直重复发送
                .repeat(2) // 重复2次
                .subscribe(next -> System.out.println(next),
                        error -> System.out.println(error),
                        () -> System.out.println("DONE!")
                );
        System.out.println("DEMO 2 ---------------");
        Observable<Integer> observable = Observable.create(sub -> {
            System.out.println("START DEMO 2");
            sub.onNext(0);
            sub.onNext(1);
            sub.onComplete();
        });
        observable
                //.repeat() // 一直重复发送
                .map(p -> p * 2)
                .repeat(2) // 重复2次
                .map(p -> "modified: " + p)
                .repeat(2) // 重复2次
                .subscribe(next -> System.out.println(next),
                        error -> System.out.println(error),
                        () -> System.out.println("DONE!")
                );

// 输出
DEMO 1 ---------------
0
1
2
0
1
2
DONE!
DEMO 2 ---------------
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
DONE!
  • repeat 操作符在 Observable 源序列完成时重新订阅 Observable 源(参见 DEMO2)。
  • repeat 操作符重复整个序列重新订阅观察,而不是重复上一个映射操作符,并且在序列重复操作符中使用的位置无关紧要(参见 DEMO2)。
  • repeat 多次执行,最终重复数目等于其重复次数的积
 .repeat(2) // 重复2次
 .repeat(2) // 重复2次
// 相当于:
 .repeat(2*2=4) // 重复4次

Single

RxJava(以及它派生出来的RxGroovy和RxScala)中有一个名为 Single 的Observable变种。 Single类似于Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值。 因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两个方法:

  • onSuccess:Single发射单个的值到这个方法
  • onError:如果无法发射需要的值,Single发射一个Throwable对象到这个方法

推荐阅读