Rx Java 异步编程框架
Rx Java 异步编程框架
- 名词定义
- 举个例子
- 基本概念
- Backpressure
- Upstream, Downstream
- Objects in motion
- Assembly time
- Subscription time
- Runtime
- 特性
- Simple background computation
- Schedulers
- Concurrency within a flow
- Parallel processing
- Dependent sub-flows
- Continuations
- Type conversions
- Operator naming conventions
- R8 and ProGuard settings
- 依赖
- 核心类
- Flowable
- Observable
- Single
- Completable
- Maybe
- 调度器
- 创建操作
- create
- defer
- range
- 变换操作
- flatMap
- take
- filter
- doOnNext
- 错误处理
- 总结
- REFERENCES
在很多软件编程任务中,或多或少你都会期望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。但是在ReactiveX中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象(Observable),观察者(Observer)订阅(Subscribe)它,当数据就绪时,之前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。
这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。
有很多术语可用于描述这种异步编程和设计模式,在在本文里我们使用这些术语:一个观察者订阅一个可观察对象 (An observer subscribes to an Observable)。通过调用观察者的方法,Observable发射数据或通知给它的观察者。
在其它的文档和场景里,有时我们也将Observer叫做Subscriber、Watcher、Reactor。这个模型通常被称作Reactor模式。
名词定义
这里给出一些名词的翻译
- Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式;
- Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念;
- Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者;
- Observer 观察者对象,监听 Observable 发射的数据并做出响应,Subscriber 是它的一个特殊实现;
- emit 直译为发射,发布,发出,含义是 Observable 在数据产生或变化时发送通知给 Observer,调用 Observer 对应的方法,文章里一律译为发射;
- items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项;
举个例子
响应式编程
/**
* Rx 测试
*
* @param args
*/
public static void main(String[] args) {
// Demo1
Flowable.just("Hello world!").subscribe(System.out::println);
// Demo2 数据消费
Observable.just("Hello World!").subscribe(System.out::println);
// Demo3 数据中间处理
Observable.just("Hello World!")
.map(p -> p + " Master!")
.subscribe(System.out::println);
// Demo4 输出流数据转换
Observable.just("Hello World!")
.map(p -> p.hashCode())
.subscribe(System.out::println);
Observable.just("Hello World!")
.map(p -> Base64Utils.encodeToString(p.getBytes()))
.map(p -> new String(Base64Utils.decodeFromString(p)))
.subscribe(System.out::println);
}
输出结果:
Hello world!
Hello World!
Hello World! Master!
-969099747
Hello World!
基本概念
Backpressure
在管道运输中,气流或液流由于管道突然变细、急弯等原因导致由某处出现了下游向上游的逆向压力,这种情况称作「back pressure」。这是一个很直观的词:向后的、往回的压力。 在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。 在 RxJava 中反压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
- 反压现象的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
- 反压(Backpressure)并不是一个像 flatMap 一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。
- 生产速度大于消费速度,所以需要 Buffer;
- 外部条件有限制,所以 Buffer 需要有上限;
- Buffer 达到上限这个现象,有一个简化的等价词叫做 Backpressure;
- Backpressure 的出现其实是一种危险边界,唯一的选择是丢弃新事件。
Backpressure 指的是在 Buffer 有上限的系统中,Buffer 溢出的现象;它的应对措施只有一个:丢弃新事件。
当数据流通过异步步骤运行时,每个步骤可以以不同的速度执行不同的操作。
为了避免压倒性的这些步骤,这些步骤通常表现为由于临时缓冲
或需要跳过/删除数据
而增加的内存使用,所谓的反压
被应用,这是一种流控制形式,其中的步骤可以表示它们准备处理多少项。
这允许限制数据流的内存使用,因为通常没有办法让步骤知道上游将向它发送多少条目。
在 RxJava 中,专用的 Flowable 类被指定用于支持反压,Observable 专用于非反压操作(短序列、 GUI 交互等)。其他类型,Single
, Maybe
和 Completable
不支持反压,也不应该,总是有空间暂时存储一个元素。
Upstream, Downstream
上游、下游: RxJava 中的数据流包括一个源、零个或多个中间步骤,然后是数据消费者或组合子步骤(其中该步骤负责通过某种方式使用数据流) :
source.operator1()
.operator2()
.operator3()
.subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
在这里,如果我们把自己想象成操作者2,向左看向源头,我们称之为上游。向右看向订阅者/使用者称为下游。
Objects in motion
运动中的物体: 在 RxJava 的文档中,emission、 emission、 item、 event、 signal、 data 和 message 被认为是同义词,表示沿着数据流传输的对象。
Assembly time
装配时间:
通过应用各种中间操作符来准备数据流的过程发生在所谓的组装时间:
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);
在这一点上,数据还没有流动,也没有发生副作用。
Subscription time
订阅时间:
这是对在内部建立处理步骤链的流调用 subscribe () 时的临时状态:
flow.subscribe(System.out::println)
这时会触发订阅副作用(请参见 doOnSubscribe)。在这种状态下,某些源会立即阻塞或开始发送项。
Runtime
运行时:
这是当流处于主动发出元素、错误或完成信号时的状态:
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
实际上,这是在执行上面给定示例的主体时触发的。
特性
Simple background computation
简单的背景计算:
RxJava 的一个常见用例是在后台线程上运行一些计算、网络请求,并在 UI 线程上显示结果(或错误) :
import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
这种类型的链接方法称为 fluent API,类似于构建器模式。但是,RxJava 的反应类型是不可变的;每个方法调用都返回一个带有添加行为的新 Flowable。为了说明,这个例子可以重写如下:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常,您可以通过 subscribeOn 将计算或阻塞 IO 移动到其他线程。一旦数据准备就绪,您可以确保通过 observeOn 在前台或 GUI 线程上对它们进行处理。
Schedulers
调度器: RxJava 操作符不直接与线程或 ExecutorServices 一起工作,而是与所谓的
Scheduler
一起工作,这些有用的类来自统一的 API. RxJava 3 并发抽象,其提供了几个标准的调度器。
-
Schedulers.computation()
:在后台固定数量的专用线程上运行计算密集型工作。大多数异步操作符都将此作为默认值Scheduler
。 -
Schedulers.io()
:在一组动态更改的线程上运行类I/O
或阻塞操作。 -
Schedulers.single()
:以顺序和 FIFO 方式在单个线程上运行工作。 -
Schedulers.trampoline()
:在一个参与的线程中,以顺序和 FIFO 的方式运行工作,通常是为了测试目的。
此外,还有一个选项可以通过 Scheduler 将现有的 Executor (及其子类型,如 ExecutorService)封装到 Scheduler 中Schedulers.from(Executor)
。例如,可以使用它来拥有一个更大但仍然固定的线程池(分别与 calculation() 和 io() 不同)。
Observable.just("java", "C++", "python")
//.observeOn(Schedulers.io())
.observeOn(Schedulers.from(Executors.newFixedThreadPool(5)))
.subscribe(System.out::println);
Thread.sleep (2000) ;
最后并非偶然。在 RxJava 中,默认的调度程序运行在守护线程上,这意味着一旦 Java 主线程退出,它们就全部停止,后台计算可能永远不会发生。在这个示例情况中,休眠一段时间可以让您在控制台上看到流的输出,并节省时间。
Concurrency within a flow
流中的并发性:
在 RxJava 中,流本质上是连续的,可以被分割成可以并发运行的处理阶段:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
此示例在计算用的调度器 Scheduler 上将数字从1平方到10,并在“主”线程(更准确地说,blockingSubscribe 的调用方线程)上消费结果。但是,lambda v-> v * v
对于这个流并不是并行运行;它在同一个计算线程上一个接一个地接收值1到10。
Parallel processing
并行处理:
并行处理数字1到10的过程要稍微复杂一些:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
实际上,RxJava 中的并行性意味着运行独立的流并将它们的结果合并回单个流。运算符 flatMap 首先将每个数字从1到10映射到它自己的 Flowable,然后运行它们并合并计算出的平方。
但是请注意,flatMap 并不保证任何顺序,内部流中的项可能最终交叉存取。还有一些替代操作符:
-
concatMap
:它一次映射并运行一个内部流程。 -
concatMapEager
:它“同时”运行所有内部流,但输出流将按照内部流创建的顺序进行。
另外,Flowable.parallel () 操作符和 ParallelFlowable 类型有助于实现相同的并行处理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
Dependent sub-flows
依赖子流:
flatMap 是一个强大的操作符,在很多情况下都有帮助。例如,给定一个返回 Flowable 的服务,我们希望调用另一个服务,其值由第一个服务发出:
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
.subscribe(System.out::println);
Continuations
延续: 有时候,当一个项变得可用时,需要在它上面执行一些依赖的计算。这有时被称为延续,并且根据应该发生什么以及涉及到什么类型,可能需要使用各种操作符来完成。
Dependent
依赖:
最典型的场景是给定一个值,调用另一个服务,等待并继续其结果:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
通常情况下,后面的序列也需要早期映射的值。这可以通过将外部 flatMap 移动到先前 flatMap 的内部来实现,例如:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
在这里,原始值将在内部 flatMap 中可用,由 lambda 变量捕获提供。
Non-dependent
非依赖性的:
在其他场景中,第一个源/数据流的结果是不相关的,人们希望继续使用准独立的另一个源。在这里,flatMap 也可以工作:
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
然而,继续在这种情况下保持 Observable
而不是可能更合适的 Single
。(这是可以理解的,因为从 flatMapSingle 的角度来看,sourceObservable 是一个多值源,因此映射也可能导致多个值)。
不过通常有一种方法可以使用 Completable 作为中介,通过它的操作符 andThen
来继续做一些事情:
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
在 sourceObservable
和 someSingleSource
之间的唯一依赖性是前者应该正常完成,以便后者被使用。
Deferred-dependent
依赖性递延:
有时,在前一个序列和新序列之间存在一个隐式的数据依赖关系,由于某种原因,这个依赖关系没有通过“常规通道”。人们倾向于写下这样的延续:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是,当数据流甚至还没有运行时,因为 Single.just(count.get())
将在assembly time
时计算,所以这个命令将打印 0。我们需要一些东西,推迟这个Single
源的计算,直到主源完成运行时:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
// 或者
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
Type conversions
类型转换: 有时候,源或服务返回的类型与应该与其一起工作的流不同。例如,在上面的库存示例中,
getDemandAsync
可以返回Single<DemandRecord>
。如果代码示例保持不变,将导致编译时错误(然而,通常会出现关于缺少重载的误导性错误消息)。 在这种情况下,通常有两个选项来修复转换: 1) 转换为所需的类型; 2) 查找并使用支持不同类型的特定运算符的重载。
Converting to the desired type
转换为所需的类型 每个反应性基类都包含能够执行此类转换(包括协议转换)以匹配其他类型的操作符。下表显示了现有的转换选项:
Flowable |
Observable |
Single |
Maybe |
Completable |
|
Flowable |
toObservable |
first, firstOrError, single, singleOrError, last, lastOrError1 |
firstElement, singleElement, lastElement |
ignoreElements |
|
Observable |
toFlowable2 |
first, firstOrError, single, singleOrError, last, lastOrError1 |
firstElement, singleElement, lastElement |
ignoreElements |
|
Single |
toFlowable3 |
toObservable |
toMaybe |
ignoreElement |
|
Maybe |
toFlowable3 |
toObservable |
toSingle |
ignoreElement |
|
Completable |
toFlowable |
toObservable |
toSingle |
toMaybe |
- 将多值源转换为单值源时,应该决定将众多源值中的哪一个作为结果。
- 将
Observable
转化为Flowable
需要额外的决定:如何处理Observable
源的潜在无约束流?有几个策略可用(如缓冲,下降,保持最新)通过BackpressureStrategy
参数或通过标准 Flowable 操作符,如onBackpressureBuffer
,onBackpressureDrop
,onBackpressureLatest
,这也允许进一步定制的反压行为。 - 当只有(最多)一个来源项目,这是一个没有问题的反压,因为它可以始终存储,直到下游准备消费。
Using an overload with the desired type
使用所需类型的重载: 许多经常使用的操作符具有可以处理其他类型的重载。它们通常以目标类型的后缀命名:
Operator |
Overloads |
---|---|
flatMap |
flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable |
concatMap |
concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable |
switchMap |
switchMapSingle, switchMapMaybe, switchMapCompletable |
这些运算符使用后缀而不是简单地使用相同的名称和不同的签名的原因是类型消除。Java 并不认为操作符 operator(Function<T, Single<R>>)
和操作符 operator(Function<T, Maybe<R>>)
是不同的(不像 c #) ,而且由于泛型信息的擦除,这两个操作符最终会成为具有相同签名的重复方法。
Operator naming conventions
运算符命名约定: 在编程中,命名是最困难的事情之一,因为名字被认为不应该太长,表达力强,捕捉力强,容易记住。不幸的是,目标语言(以及预先存在的约定)在这方面可能不会提供太多帮助(不可用的关键字、类型擦除、类型歧义等)。
Unusable keywords
无法使用的关键字 在原始的 Rx.NET 中,发出一个条目然后完成的操作符叫做 Return (t)。由于 Java 约定是以小写字母开头的方法名称,所以这将是 return (t) ,它是 Java 中的一个关键字,因此不可用。因此,RxJava 选择将这个操作符命名为
just(T)
。运算符Switch
也存在同样的限制,它必须命名为switchOnNext
。另一个例子是 Catch,它命名为onErrorResumeNext
。
Type erasure
类型擦除:
许多期望用户提供返回反应类型的函数的操作符不能重载,因为围绕 Function<T, X>
的类型擦除将这些方法签名转换为重复类型。选择通过添加类型作为后缀来命名这些操作符:
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
Type ambiguities
类型歧义:
尽管某些运算符在类型擦除方面没有问题,但它们的签名可能会变得模棱两可,特别是如果使用 java8 和 lambdas。例如,concatWith 使用各种其他反应性基类型作为参数(为了在底层实现中提供方便性和性能优势) ,存在几个重载:
Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other);
Publisher 和 SingleSource 都显示为函数接口(带有一个抽象方法的类型) ,并可能鼓励用户尝试提供一个 lambda 表达式:
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);
不幸的是,这种方法不起作用,而且示例根本不会打印2。事实上,自2.1.10版本以来,它甚至不能编译,因为至少存在4个 concatWith 重载,并且编译器会发现上面的代码不明确。
在这种情况下,用户可能希望推迟一些计算,直到 someSource 完成,因此正确的无歧义运算符应该推迟:
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
有时候,添加后缀是为了避免逻辑上的歧义,这样可能会编译,但在流程中产生错误的类型:
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);
当函数接口类型作为类型参数 T
参与时,这也可能会变得模糊不清。
Error handling
错误处理:
数据流可能会失败,此时错误会发送到消费者。不过有时候,多个源可能会失败,在这个时候可以选择是否等待所有源完成或失败。为了表示这个机会,许多操作符名称都以 DelayError
字符作为后缀(而其他操作符的重载中包含 DelayError
或 delayErrors
布尔标志) :
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
当然,各种后缀可以一起出现:
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
Base class vs base type
基类与基类型: 由于基类上的静态方法和实例方法的数量很多,可以认为基类很多。
RxJava 3
的设计受到 Reactive Streams 规范的严重影响,因此,该库为每个 Reactive 类型提供了一个类和一个接口:
Type |
Class |
Interface |
Consumer |
---|---|---|---|
0..N backpressured |
Flowable |
Publisher1 |
Subscriber |
0..N unbounded |
Observable |
ObservableSource2 |
Observer |
1 element or error |
Single |
SingleSource |
SingleObserver |
0..1 element or error |
Maybe |
MaybeSource |
MaybeObserver |
0 element or error |
Completable |
CompletableSource |
CompletableObserver |
-
org.reactivestreams.Publisher
是外部Reactive Streams
库的一部分。它是通过由Reactive Streams
规范管理的标准化机制与其他响应式编程库交互的主要类型。 - 接口的变数命名原则是在半传统的类名后面附加 Source。由于 Publisher 是由 Reactive Streams 库提供的,因此不存在 FlowableSource (子类型化也不会有助于互操作)。但是,这些接口并不是 Reactive Streams 规范意义上的标准接口,目前只是 RxJava 特定的接口。
R8 and ProGuard settings
R8 和 ProGuard 设置
默认情况下,RxJava 本身不需要任何 ProGuard/R8 设置,应该可以毫无问题地工作。不幸的是,Reactive Streams 自1.0.3 版本以来的依赖性已经在其 JAR 中嵌入了 Java 9 类文件,这些文件可能会使用普通的 ProGuard 导致警告:
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher
建议在应用程序的 proguard-ruleset 文件中设置以下-dontwarn
条目:
-dontwarn java.util.concurrent.Flow*
对于 R8,RxJava jar 包含 META-INF/proguard/rxjava3.pro 和相同的 no-warning 子句,这些应该是自动应用的。
依赖
maven
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>x.y.z</version>
</dependency>
Gradle
implementation 'io.reactivex.rxjava3:rxjava:x.y.z'
官方 API Docs (此处以3.0.8版本为例)
http://reactivex.io/RxJava/3.x/javadoc/3.0.8/
核心类
Flowable
io.reactivex.rxjava3.core.Flowable
- 流数目:0~N(发送0~N个的数据)
- 支持反应流和反压
public abstract class Flowable<@NonNull T> implements Publisher<T>
这个 类来实现 Reactive Streams 与 Flowable 扩展的 Publishers 一起运作。因此,许多中间的 操作符直接接受通用 Publishers,并允许与其他 Reactive Streams 实现直接互操作。
Flowable 为操作符提供 128 个元素的默认缓冲区大小,可以通过 bufferSize () 访问,可以通过系统参数 rx3.buffer-size 全局重写。但是,大多数操作符都有允许显式设置其内部缓冲区大小的重载。
/**
* Flowable 基类测试
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
Disposable d = Flowable.just("Hello world!")
.delay(1, TimeUnit.SECONDS)
.subscribeWith(new DisposableSubscriber<String>() {
@Override
public void onStart() {
System.out.println("Start!");
request(1);
}
@Override
public void onNext(String t) {
System.out.println(t);
request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(5000);
// the sequence can now be cancelled via dispose()
d.dispose();
}
输出:
> Task :rx-java-examples:rx-java-chapter-1:FlowableTest.main()
Start!
Hello world!
Done!
- Reactive Streams 规范在定义发布者和订阅者之间的交互时相对严格,以至于由于某些时间要求和需要通过 Subscription.request (long) 准备无效的请求数量而导致严重的性能损失。
- 因此,RxJava 引入了 FlowableSubscriber 接口,它表明消费者可以使用放松的规则来驱动。所有的 RxJava 操作符都是根据这些宽松的规则实现的。如果订阅的
Subscriber
没有实现此接口,例如,由于它来自另一个 Reactive Streams 兼容库,Flowable 将自动在其周围应用一个兼容包装。 - Flowable 是一个抽象类,但是由于要严格遵循大量的 Reactive Streams 规则,不建议通过直接扩展类来实现源和自定义操作符。如果需要这样的自定义实现,请参阅 wiki 获得一些指导。
- 建议使用 create (FlowableOnSubscribe,backpresssurestrategy) factory 方法来创建自定义 Flowables:
/**
* 工厂方法创建 Flowable
* @param args
*/
public static void main(String[] args) {
Flowable<String> source = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// signal an item
emitter.onNext("Hello");
// could be some blocking operation
Thread.sleep(1000);
// the consumer might have cancelled the flow
if (emitter.isCancelled()) {
return;
}
emitter.onNext("World");
Thread.sleep(1000);
// the end-of-sequence has to be signaled, otherwise the
// consumers may never finish
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER); // 指定反压策略
System.out.println("Subscribe!");
source.subscribe(System.out::println);
System.out.println("Done!");
}
输出:
> Task :rx-java-examples:rx-java-chapter-1:FlowableTest2.main()
Subscribe!
Hello
World
Done!
作为 RxJava 响应源,例如 Flowable,通常本质上是同步的和有序的。在 ReactiveX 设计中,操作符运行的位置(线程)与操作符可以处理数据的位置正交。这意味着异步和并行性必须通过 subscribeOn (Scheduler)
、 observeOn (Scheduler)
和 parallel ()
等操作符显式表示。通常,带有 Scheduler
参数的操作符将这种类型的异步引入到流中。
Flowable虽然可以通过create()来创建,但是你必须指定反压的策略,以保证你创建的Flowable是支持反压的。
一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用 request(n) 来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
Flowable.range(0, 10)
.subscribe(new Subscriber<Integer>() {
Subscription sub;
// 当订阅后,会首先调用这个方法,其实就相当于onStart(),
// 传入的 Subscription s 参数可以用于请求数据或者取消订阅
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe START");
sub = s;
s.request(1);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("onSubscribe END");
}
@Override
public void onNext(Integer v) {
System.out.println("onNext: " + v);
sub.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(5000);
// the sequence can now be cancelled via dispose()
}
// 输出
onSubscribe START
onSubscribe END
onNext: 0
onNext: 1
...
onNext: 8
onNext: 9
Done!
根据上面的代码的结果输出中可以看到,当我们调用 subscription.request(n) 方法的时候,会等onSubscribe()中后面的代码执行完成后,才会立刻执行到onNext方法。
尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。
Observable
在RxJava中,一个实现了Observer接口的对象可以订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待Observable发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。 io.reactivex.rxjava3.core.Observable
- 流数目:0~N (发送0~N个的数据)
- 无反压
在这里插入图片描述
ReactiveX真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。 Rx的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。
just
将一个或多个对象转换成发射这个或这些对象的一个 Observable
public static void main(String[] args) {
System.out.println("DEMO 1 ---------------");
Observable.just(1, 2, 3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
d.dispose();
System.out.println("Cancel !");
}
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("Sequence complete.");
}
});
System.out.println("DEMO 2 ---------------");
Observable.just(1, 2, 3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe ! + " + d.isDisposed());
}
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("Sequence complete.");
}
});
}
// 输出
DEMO 1 ---------------
Cancel !
DEMO 2 ---------------
onSubscribe ! + false
Next: 1
Next: 2
Next: 3
Sequence complete
fromArray
将一个数组转换成一个Observable 相似的还有 fromIterable、fromFuture 可以将一个Iterable, 一个Future转换为一个Observable
// 数组
System.out.println("DEMO 3 ---------------");
Integer[] items = {0, 1, 2, 3, 4, 5};
Observable myObservable = Observable.fromArray(items);
myObservable.subscribe(next -> System.out.println(next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
// 异常处理
System.out.println("DEMO 4 ---------------");
Integer[] items2 = {0, 1, null, 3, 4, 5};
myObservable = Observable.fromArray(items2);
myObservable.subscribe(next -> System.out.println(next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
// 迭代器
System.out.println("DEMO 5 ---------------");
Integer[] items3 = {0, 1, 2, 3, 4, 5};
myObservable = Observable.fromIterable(Arrays.asList(items3));
myObservable.subscribe(next -> System.out.println(next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
// Future
System.out.println("DEMO 6 ---------------");
FutureTask<Long> task = new FutureTask<>(() -> System.currentTimeMillis());
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(task);
try {
System.out.println("get:" + task.get());
} catch (Exception e) {
e.printStackTrace();
}
//executorService.shutdown(); 移至最后执行
myObservable = Observable.fromFuture(task);
myObservable.subscribe(next -> System.out.println("RECIEVE: " + next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
// Future 超时
System.out.println("DEMO 7 ---------------");
FutureTask<Long> task2 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(3);
return System.currentTimeMillis();
});
myObservable = Observable.fromFuture(task2, 1, TimeUnit.SECONDS);//等待1s
executorService.submit(task2);
myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 1s: " + next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
System.out.println("DEMO 8 ---------------");
task2 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(3);
return System.currentTimeMillis();
});
myObservable = Observable.fromFuture(task2, 4, TimeUnit.SECONDS);//等待4s
executorService.submit(task2);
myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 4s: " + next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
System.out.println("DEMO 9 ---------------");
task2 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(3);
return System.currentTimeMillis();
});
myObservable = Observable.fromFuture(task2, 1, TimeUnit.SECONDS);//等待1s
executorService.submit(task2);
myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 1s: " + next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
System.out.println("DEMO 10 ---------------");
task2 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(3);
return System.currentTimeMillis();
});
myObservable = Observable.fromFuture(task2, 4, TimeUnit.SECONDS);//等待4s
executorService.submit(task2);
myObservable.subscribe(next -> System.out.println("RECIEVE WITHIN 4s: " + next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
executorService.shutdown();
// 输出
DEMO 3 ---------------
0
1
2
3
4
5
DONE!
DEMO 4 ---------------
0
1
java.lang.NullPointerException: The element at index 2 is null
DEMO 5 ---------------
0
1
2
3
4
5
DONE!
DEMO 6 ---------------
get:1608780273323
RECIEVE: 1608780273323
DONE!
DEMO 7 ---------------
java.util.concurrent.TimeoutException
DEMO 8 ---------------
RECIEVE WITHIN 4s: 1608781164556
DONE!
DEMO 9 ---------------
java.util.concurrent.TimeoutException
DEMO 10 ---------------
RECIEVE WITHIN 4s: 1608781168578
DONE!
repeat
重复发送数据
System.out.println("DEMO 1 ---------------");
Integer[] items3 = {0, 1, 2};
Observable.fromIterable(Arrays.asList(items3))
//.repeat() // 一直重复发送
.repeat(2) // 重复2次
.subscribe(next -> System.out.println(next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
System.out.println("DEMO 2 ---------------");
Observable<Integer> observable = Observable.create(sub -> {
System.out.println("START DEMO 2");
sub.onNext(0);
sub.onNext(1);
sub.onComplete();
});
observable
//.repeat() // 一直重复发送
.map(p -> p * 2)
.repeat(2) // 重复2次
.map(p -> "modified: " + p)
.repeat(2) // 重复2次
.subscribe(next -> System.out.println(next),
error -> System.out.println(error),
() -> System.out.println("DONE!")
);
// 输出
DEMO 1 ---------------
0
1
2
0
1
2
DONE!
DEMO 2 ---------------
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
START DEMO 2
modified: 0
modified: 2
DONE!
-
repeat
操作符在Observable
源序列完成时重新订阅Observable
源(参见 DEMO2)。 -
repeat
操作符重复整个序列重新订阅观察,而不是重复上一个映射操作符,并且在序列重复操作符中使用的位置无关紧要(参见 DEMO2)。 -
repeat
多次执行,最终重复数目等于其重复次数的积
.repeat(2) // 重复2次
.repeat(2) // 重复2次
// 相当于:
.repeat(2*2=4) // 重复4次
Single
RxJava(以及它派生出来的RxGroovy和RxScala)中有一个名为 Single 的Observable变种。 Single类似于Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值。 因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两个方法:
- onSuccess:Single发射单个的值到这个方法
- onError:如果无法发射需要的值,Single发射一个Throwable对象到这个方法
推荐阅读
-
JAVA 并发编程系列 (13) Future、FutureTask 异步小王子
-
SSM三大框架基础面试题-一、Spring篇 什么是Spring框架? Spring是一种轻量级框架,提高开发人员的开发效率以及系统的可维护性。 我们一般说的Spring框架就是Spring Framework,它是很多模块的集合,使用这些模块可以很方便地协助我们进行开发。这些模块是核心容器、数据访问/集成、Web、AOP(面向切面编程)、工具、消息和测试模块。比如Core Container中的Core组件是Spring所有组件的核心,Beans组件和Context组件是实现IOC和DI的基础,AOP组件用来实现面向切面编程。 Spring的6个特征: 核心技术:依赖注入(DI),AOP,事件(Events),资源,i18n,验证,数据绑定,类型转换,SpEL。 测试:模拟对象,TestContext框架,Spring MVC测试,WebTestClient。 数据访问:事务,DAO支持,JDBC,ORM,编组XML。 Web支持:Spring MVC和Spring WebFlux Web框架。 集成:远程处理,JMS,JCA,JMX,电子邮件,任务,调度,缓存。 语言:Kotlin,Groovy,动态语言。 列举一些重要的Spring模块? Spring Core:核心,可以说Spring其他所有的功能都依赖于该类库。主要提供IOC和DI功能。 Spring Aspects:该模块为与AspectJ的集成提供支持。 Spring AOP:提供面向切面的编程实现。 Spring JDBC:Java数据库连接。 Spring JMS:Java消息服务。 Spring ORM:用于支持Hibernate等ORM工具。 Spring Web:为创建Web应用程序提供支持。 Spring Test:提供了对JUnit和TestNG测试的支持。 谈谈自己对于Spring IOC和AOP的理解 IOC(Inversion Of Controll,控制反转)是一种设计思想: 在程序中手动创建对象的控制权,交由给Spring框架来管理。IOC在其他语言中也有应用,并非Spring特有。IOC容器实际上就是一个Map(key, value),Map中存放的是各种对象。 将对象之间的相互依赖关系交给IOC容器来管理,并由IOC容器完成对象的注入。这样可以很大程度上简化应用的开发,把应用从复杂的依赖关系中解放出来。IOC容器就像是一个工厂一样,当我们需要创建一个对象的时候,只需要配置好配置文件/注解即可,完全不用考虑对象是如何被创建出来的。在实际项目中一个Service类可能由几百甚至上千个类作为它的底层,假如我们需要实例化这个Service,可能要每次都搞清楚这个Service所有底层类的构造函数,这可能会把人逼疯。如果利用IOC的话,你只需要配置好,然后在需要的地方引用就行了,大大增加了项目的可维护性且降低了开发难度。 Spring中的bean的作用域有哪些? 1.singleton:该bean实例为单例 2.prototype:每次请求都会创建一个新的bean实例(多例)。 3.request:每一次HTTP请求都会产生一个新的bean,该bean仅在当前HTTP request内有效。 4.session:每一次HTTP请求都会产生一个新的bean,该bean仅在当前HTTP session内有效。 5.global-session:全局session作用域,仅仅在基于Portlet的Web应用中才有意义,Spring5中已经没有了。Portlet是能够生成语义代码(例如HTML)片段的小型Java Web插件。它们基于Portlet容器,可以像Servlet一样处理HTTP请求。但是与Servlet不同,每个Portlet都有不同的会话。 Spring中的单例bean的线程安全问题了解吗? 概念用于理解:大部分时候我们并没有在系统中使用多线程,所以很少有人会关注这个问题。单例bean存在线程问题,主要是因为当多个线程操作同一个对象的时候,对这个对象的非静态成员变量的写操作会存在线程安全问题。 有两种常见的解决方案(用于回答的点): 1.在bean对象中尽量避免定义可变的成员变量(不太现实)。 2.在类中定义一个ThreadLocal成员变量,将需要的可变成员变量保存在ThreadLocal(线程本地化对象)中(推荐的一种方式)。 ThreadLocal解决多线程变量共享问题(参考博客):https://segmentfault.com/a/1190000009236777 Spring中Bean的生命周期: 1.Bean容器找到配置文件中Spring Bean的定义。 2.Bean容器利用Java Reflection API创建一个Bean的实例。 3.如果涉及到一些属性值,利用set方法设置一些属性值。 4.如果Bean实现了BeanNameAware接口,调用setBeanName方法,传入Bean的名字。 5.如果Bean实现了BeanClassLoaderAware接口,调用setBeanClassLoader方法,传入ClassLoader对象的实例。 6.如果Bean实现了BeanFactoryAware接口,调用setBeanClassFacotory方法,传入ClassLoader对象的实例。 7.与上面的类似,如果实现了其他*Aware接口,就调用相应的方法。 8.如果有和加载这个Bean的Spring容器相关的BeanPostProcessor对象,执postProcessBeforeInitialization方法。 9.如果Bean实现了InitializingBean接口,执行afeterPropertiesSet方法。 10.如果Bean在配置文件中的定义包含init-method属性,执行指定的方法。 11.如果有和加载这个Bean的Spring容器相关的BeanPostProcess对象,执行postProcessAfterInitialization方法。 12.当要销毁Bean的时候,如果Bean实现了DisposableBean接口,执行destroy方法。 13.当要销毁Bean的时候,如果Bean在配置文件中的定义包含destroy-method属性,执行指定的方法。 Spring框架中用到了哪些设计模式? 1.工厂设计模式:Spring使用工厂模式通过BeanFactory和ApplicationContext创建bean对象。 2.代理设计模式:Spring AOP功能的实现。 3.单例设计模式:Spring中的bean默认都是单例的。 4.模板方法模式:Spring中的jdbcTemplate、hibernateTemplate等以Template结尾的对数据库操作的类,它们就使用到了模板模式。 5.包装器设计模式:我们的项目需要连接多个数据库,而且不同的客户在每次访问中根据需要会去访问不同的数据库。这种模式让我们可以根据客户的需求能够动态切换不同的数据源。 6.观察者模式:Spring事件驱动模型就是观察者模式很经典的一个应用。 7.适配器模式:Spring AOP的增强或通知(Advice)使用到了适配器模式、Spring MVC中也是用到了适配器模式适配Controller。 还有很多。。。。。。。 @Component和@Bean的区别是什么 1.作用对象不同。@Component注解作用于类,而@Bean注解作用于方法。 2.@Component注解通常是通过类路径扫描来自动侦测以及自动装配到Spring容器中(我们可以使用@ComponentScan注解定义要扫描的路径)。@Bean注解通常是在标有该注解的方法中定义产生这个bean,告诉Spring这是某个类的实例,当我需要用它的时候还给我。 3.@Bean注解比@Component注解的自定义性更强,而且很多地方只能通过@Bean注解来注册bean。比如当引用第三方库的类需要装配到Spring容器的时候,就只能通过@Bean注解来实现。 @Configuration public class AppConfig { @Bean public TransferService transferService { return new TransferServiceImpl; } } <beans> <bean id="transferService" class="com.kk.TransferServiceImpl"/> </beans> @Bean public OneService getService(status) { case (status) { when 1: return new serviceImpl1; when 2: return new serviceImpl2; when 3: return new serviceImpl3; } } 将一个类声明为Spring的bean的注解有哪些? 声明bean的注解: @Component 组件,没有明确的角色 @Service 在业务逻辑层使用(service层) @Repository 在数据访问层使用(dao层) @Controller 在展现层使用,控制器的声明 注入bean的注解: @Autowired:由Spring提供 @Inject:由JSR-330提供 @Resource:由JSR-250提供 *扩:JSR 是 java 规范标准 Spring事务管理的方式有几种? 1.编程式事务:在代码中硬编码(不推荐使用)。 2.声明式事务:在配置文件中配置(推荐使用),分为基于XML的声明式事务和基于注解的声明式事务。 Spring事务中的隔离级别有哪几种? 在TransactionDefinition接口中定义了五个表示隔离级别的常量:ISOLATION_DEFAULT:使用后端数据库默认的隔离级别,Mysql默认采用的REPEATABLE_READ隔离级别;Oracle默认采用的READ_COMMITTED隔离级别。ISOLATION_READ_UNCOMMITTED:最低的隔离级别,允许读取尚未提交的数据变更,可能会导致脏读、幻读或不可重复读。ISOLATION_READ_COMMITTED:允许读取并发事务已经提交的数据,可以阻止脏读,但是幻读或不可重复读仍有可能发生ISOLATION_REPEATABLE_READ:对同一字段的多次读取结果都是一致的,除非数据是被本身事务自己所修改,可以阻止脏读和不可重复读,但幻读仍有可能发生。ISOLATION_SERIALIZABLE:最高的隔离级别,完全服从ACID的隔离级别。所有的事务依次逐个执行,这样事务之间就完全不可能产生干扰,也就是说,该级别可以防止脏读、不可重复读以及幻读。但是这将严重影响程序的性能。通常情况下也不会用到该级别。 Spring事务中有哪几种事务传播行为? 在TransactionDefinition接口中定义了八个表示事务传播行为的常量。 支持当前事务的情况:PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务。PROPAGATION_SUPPORTS: 如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行。PROPAGATION_MANDATORY: 如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常。(mandatory:强制性)。 不支持当前事务的情况:PROPAGATION_REQUIRES_NEW: 创建一个新的事务,如果当前存在事务,则把当前事务挂起。PROPAGATION_NOT_SUPPORTED: 以非事务方式运行,如果当前存在事务,则把当前事务挂起。PROPAGATION_NEVER: 以非事务方式运行,如果当前存在事务,则抛出异常。 其他情况:PROPAGATION_NESTED: 如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行;如果当前没有事务,则该取值等价于PROPAGATION_REQUIRED。 二、SpringMVC篇 什么是Spring MVC ?简单介绍下你对springMVC的理解? Spring MVC是一个基于Java的实现了MVC设计模式的请求驱动类型的轻量级Web框架,通过把Model,View,Controller分离,将web层进行职责解耦,把复杂的web应用分成逻辑清晰的几部分,简化开发,减少出错,方便组内开发人员之间的配合。 Spring MVC的工作原理了解嘛? image.png Springmvc的优点: (1)可以支持各种视图技术,而不仅仅局限于JSP; (2)与Spring框架集成(如IoC容器、AOP等); (3)清晰的角色分配:前端控制器(dispatcherServlet) , 请求到处理器映射(handlerMapping), 处理器适配器(HandlerAdapter), 视图解析器(ViewResolver)。 (4) 支持各种请求资源的映射策略。 Spring MVC的主要组件? (1)前端控制器 DispatcherServlet(不需要程序员开发) 作用:接收请求、响应结果,相当于转发器,有了DispatcherServlet 就减少了其它组件之间的耦合度。 (2)处理器映射器HandlerMapping(不需要程序员开发) 作用:根据请求的URL来查找Handler (3)处理器适配器HandlerAdapter 注意:在编写Handler的时候要按照HandlerAdapter要求的规则去编写,这样适配器HandlerAdapter才可以正确的去执行Handler。 (4)处理器Handler(需要程序员开发) (5)视图解析器 ViewResolver(不需要程序员开发) 作用:进行视图的解析,根据视图逻辑名解析成真正的视图(view) (6)视图View(需要程序员开发jsp) View是一个接口, 它的实现类支持不同的视图类型(jsp,freemarker,pdf等等) springMVC和struts2的区别有哪些? (1)springmvc的入口是一个servlet即前端控制器(DispatchServlet),而struts2入口是一个filter过虑器(StrutsPrepareAndExecuteFilter)。 (2)springmvc是基于方法开发(一个url对应一个方法),请求参数传递到方法的形参,可以设计为单例或多例(建议单例),struts2是基于类开发,传递参数是通过类的属性,只能设计为多例。 (3)Struts采用值栈存储请求和响应的数据,通过OGNL存取数据,springmvc通过参数解析器是将request请求内容解析,并给方法形参赋值,将数据和视图封装成ModelAndView对象,最后又将ModelAndView中的模型数据通过reques域传输到页面。Jsp视图解析器默认使用jstl。 SpringMVC怎么样设定重定向和转发的? (1)转发:在返回值前面加"forward:",譬如"forward:user.do?name=method4" (2)重定向:在返回值前面加"redirect:",譬如"redirect:http://www.baidu.com" SpringMvc怎么和AJAX相互调用的? 通过Jackson框架就可以把Java里面的对象直接转化成Js可以识别的Json对象。具体步骤如下 : (1)加入Jackson.jar (2)在配置文件中配置json的映射 (3)在接受Ajax方法里面可以直接返回Object,List等,但方法前面要加上@ResponseBody注解。 如何解决POST请求中文乱码问题,GET的又如何处理呢? (1)解决post请求乱码问题: 在web.xml中配置一个CharacterEncodingFilter过滤器,设置成utf-8; <filter> <filter-name>CharacterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>utf-8</param-value> </init-param> </filter> <filter-mapping> <filter-name>CharacterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> (2)get请求中文参数出现乱码解决方法有两个: ①修改tomcat配置文件添加编码与工程编码一致,如下: <ConnectorURIEncoding="utf-8" connectionTimeout="20000" port="8080" protocol="HTTP/1.1" redirectPort="8443"/> ②另外一种方法对参数进行重新编码: String userName = new String(request.getParamter("userName").getBytes("ISO8859-1"),"utf-8") ISO8859-1是tomcat默认编码,需要将tomcat编码后的内容按utf-8编码。 Spring MVC的异常处理 ? 统一异常处理: Spring MVC处理异常有3种方式: (1)使用Spring MVC提供的简单异常处理器SimpleMappingExceptionResolver; (2)实现Spring的异常处理接口HandlerExceptionResolver 自定义自己的异常处理器; (3)使用@ExceptionHandler注解实现异常处理; 统一异常处理的博客:https://blog.csdn.net/ctwy291314/article/details/81983103 SpringMVC的控制器是不是单例模式,如果是,有什么问题,怎么解决? 是单例模式,所以在多线程访问的时候有线程安全问题,不要用同步,会影响性能的,解决方案是在控制器里面不能写成员变量。(此题目类似于上面Spring 中 第5题 有两种解决方案) SpringMVC常用的注解有哪些? @RequestMapping:用于处理请求 url 映射的注解,可用于类或方法上。用于类上,则表示类中的所有响应请求的方法都是以该地址作为父路径。 @RequestBody:注解实现接收http请求的json数据,将json转换为java对象。 @ResponseBody:注解实现将conreoller方法返回对象转化为json对象响应给客户。 SpingMvc中的控制器的注解一般用那个,有没有别的注解可以替代? 一般用@Controller注解,也可以使用@RestController,@RestController注解相当于@ResponseBody + @Controller,表示是表现层,除此之外,一般不用别的注解代替。 如果在拦截请求中,我想拦截get方式提交的方法,怎么配置? 可以在@RequestMapping注解里面加上method=RequestMethod.GET。 怎样在方法里面得到Request,或者Session? 直接在方法的形参中声明request,SpringMVC就自动把request对象传入。 如果想在拦截的方法里面得到从前台传入的参数,怎么得到? 直接在形参里面声明这个参数就可以,但必须名字和传过来的参数一样。 如果前台有很多个参数传入,并且这些参数都是一个对象的,那么怎么样快速得到这个对象? 直接在方法中声明这个对象,SpringMVC就自动会把属性赋值到这个对象里面。 SpringMVC中函数的返回值是什么? 返回值可以有很多类型,有String, ModelAndView。ModelAndView类把视图和数据都合并的一起的。 SpringMVC用什么对象从后台向前台传递数据的? 通过ModelMap对象,可以在这个对象里面调用put方法,把对象加到里面,前台就可以拿到数据。 怎么样把ModelMap里面的数据放入Session里面? 可以在类上面加上@SessionAttributes注解,里面包含的字符串就是要放入session里面的key。 SpringMvc里面拦截器是怎么写的: 有两种写法,一种是实现HandlerInterceptor接口,另外一种是继承适配器类,接着在接口方法当中,实现处理逻辑;然后在SpringMvc的配置文件中配置拦截器即可: <!-- 配置SpringMvc的拦截器 --> <mvc:interceptors> <!-- 配置一个拦截器的Bean就可以了 默认是对所有请求都拦截 --> <bean id="myInterceptor" class="com.zwp.action.MyHandlerInterceptor"></bean> <!-- 只针对部分请求拦截 --> <mvc:interceptor> <mvc:mapping path="/modelMap.do" /> <bean class="com.zwp.action.MyHandlerInterceptorAdapter" /> </mvc:interceptor> </mvc:interceptors> 注解原理: 注解本质是一个继承了Annotation的特殊接口,其具体实现类是Java运行时生成的动态代理类。我们通过反射获取注解时,返回的是Java运行时生成的动态代理对象。通过代理对象调用自定义注解的方法,会最终调用AnnotationInvocationHandler的invoke方法。该方法会从memberValues这个Map中索引出对应的值。而memberValues的来源是Java常量池 三、Mybatis篇 什么是MyBatis? MyBatis是一个可以自定义SQL、存储过程和高级映射的持久层框架。 讲下MyBatis的缓存 MyBatis的缓存分为一级缓存和二级缓存,一级缓存放在session里面,默认就有, 二级缓存放在它的命名空间里,默认是不打开的,使用二级缓存属性类需要实现Serializable序列化接口, 可在它的映射文件中配置<cache/> Mybatis是如何进行分页的?分页插件的原理是什么? 1)Mybatis使用RowBounds对象进行分页,也可以直接编写sql实现分页,也可以使用Mybatis的分页插件。 2)分页插件的原理:实现Mybatis提供的接口,实现自定义插件,在插件的拦截方法内拦截待执行的sql,然后重写sql。 举例:select * from student,拦截sql后重写为:select t.* from (select * from student)t limit 0,10 简述Mybatis的插件运行原理,以及如何编写一个插件? 1)Mybatis仅可以编写针对ParameterHandler、ResultSetHandler、StatementHandler、 Executor这4种接口的插件,Mybatis通过动态代理, 为需要拦截的接口生成代理对象以实现接口方法拦截功能, 每当执行这4种接口对象的方法时,就会进入拦截方法, 具体就是InvocationHandler的invoke方法,当然, 只会拦截那些你指定需要拦截的方法。 2)实现Mybatis的Interceptor接口并复写intercept方法, 然后在给插件编写注解,指定要拦截哪一个接口的哪些方法即可, 记住,别忘了在配置文件中配置你编写的插件。 Mybatis动态sql是做什么的?都有哪些动态sql?能简述一下动态sql的执行原理不? 1)Mybatis动态sql可以让我们在Xml映射文件内, 以标签的形式编写动态sql,完成逻辑判断和动态拼接sql的功能。 2)Mybatis提供了9种动态sql标签:trim|where|set|foreach|if|choose|when|otherwise|bind。 3)其执行原理为,使用OGNL从sql参数对象中计算表达式的值, 根据表达式的值动态拼接sql,以此来完成动态sql的功能。 #{}和${}的区别是什么? 1)#{}是预编译处理,${}是字符串替换。 2)Mybatis在处理#{}时,会将sql中的#{}替换为?号,调用PreparedStatement的set方法来赋值(有效的防止SQL注入); 3)Mybatis在处理${}时,就是把${}替换成变量的值。 为什么说Mybatis是半自动ORM映射工具?它与全自动的区别在哪里? Hibernate属于全自动ORM映射工具, 使用Hibernate查询关联对象或者关联集合对象时, 可以根据对象关系模型直接获取,所以它是全自动的。 而Mybatis在查询关联对象或关联集合对象时, 需要手动编写sql来完成,所以,称之为半自动ORM映射工具。 Mybatis是否支持延迟加载?如果支持,它的实现原理是什么? 1)Mybatis仅支持association关联对象和collection关联集合对象的延迟加载, association指的就是一对一,collection指的就是一对多查询。 在Mybatis配置文件中, 可以配置是否启用延迟加载lazyLoadingEnabled=true|false。 2)它的原理是,使用CGLIB创建目标对象的代理对象, 当调用目标方法时,进入拦截器方法, 比如调用a.getB.getName, 拦截器invoke方法发现a.getB是null值, 那么就会单独发送事先保存好的查询关联B对象的sql, 把B查询上来,然后调用a.setB(b), 于是a的对象b属性就有值了, 接着完成a.getB.getName方法的调用。 这就是延迟加载的基本原理。 MyBatis与Hibernate有哪些不同? 1)Mybatis和hibernate不同,它不完全是一个ORM框架, 因为MyBatis需要程序员自己编写Sql语句, 不过mybatis可以通过XML或注解方式灵活配置要运行的sql语句, 并将java对象和sql语句映射生成最终执行的sql, 最后将sql执行的结果再映射生成java对象。 2)Mybatis学习门槛低,简单易学,程序员直接编写原生态sql, 可严格控制sql执行性能,灵活度高,非常适合对关系数据模型要求不高的软件开发, 例如互联网软件、企业运营类软件等,因为这类软件需求变化频繁, 一但需求变化要求成果输出迅速。但是灵活的前提是mybatis无法做到数据库无关性, 如果需要实现支持多种数据库的软件则需要自定义多套sql映射文件,工作量大。 3)Hibernate对象/关系映射能力强,数据库无关性好, 对于关系模型要求高的软件(例如需求固定的定制化软件) 如果用hibernate开发可以节省很多代码,提高效率。 但是Hibernate的缺点是学习门槛高,要精通门槛更高, 而且怎么设计O/R映射,在性能和对象模型之间如何权衡, 以及怎样用好Hibernate需要具有很强的经验和能力才行。 总之,按照用户的需求在有限的资源环境下只要能做出维护性、 扩展性良好的软件架构都是好架构,所以框架只有适合才是最好。 MyBatis的好处是什么? 1)MyBatis把sql语句从Java源程序中独立出来,放在单独的XML文件中编写, 给程序的维护带来了很大便利。 2)MyBatis封装了底层JDBC API的调用细节,并能自动将结果集转换成Java Bean对象, 大大简化了Java数据库编程的重复工作。 3)因为MyBatis需要程序员自己去编写sql语句, 程序员可以结合数据库自身的特点灵活控制sql语句, 因此能够实现比Hibernate等全自动orm框架更高的查询效率,能够完成复杂查询。 简述Mybatis的Xml映射文件和Mybatis内部数据结构之间的映射关系? Mybatis将所有Xml配置信息都封装到All-In-One重量级对象Configuration内部。 在Xml映射文件中,<parameterMap>标签会被解析为ParameterMap对象, 其每个子元素会被解析为ParameterMapping对象。 <resultMap>标签会被解析为ResultMap对象, 其每个子元素会被解析为ResultMapping对象。 每一个<select>、<insert>、<update>、<delete> 标签均会被解析为MappedStatement对象, 标签内的sql会被解析为BoundSql对象。 什么是MyBatis的接口绑定,有什么好处? 接口映射就是在MyBatis中任意定义接口,然后把接口里面的方法和SQL语句绑定, 我们直接调用接口方法就可以,这样比起原来了SqlSession提供的方法我们可以有更加灵活的选择和设置. 接口绑定有几种实现方式,分别是怎么实现的? 接口绑定有两种实现方式,一种是通过注解绑定,就是在接口的方法上面加 上@Select@Update等注解里面包含Sql语句来绑定, 另外一种就是通过xml里面写SQL来绑定,在这种情况下, 要指定xml映射文件里面的namespace必须为接口的全路径名. 什么情况下用注解绑定,什么情况下用xml绑定? 当Sql语句比较简单时候,用注解绑定;当SQL语句比较复杂时候,用xml绑定,一般用xml绑定的比较多 MyBatis实现一对一有几种方式?具体怎么操作的? 有联合查询和嵌套查询,联合查询是几个表联合查询,只查询一次, 通过在resultMap里面配置association节点配置一对一的类就可以完成; 嵌套查询是先查一个表,根据这个表里面的结果的外键id, 去再另外一个表里面查询数据,也是通过association配置, 但另外一个表的查询通过select属性配置。 Mybatis能执行一对一、一对多的关联查询吗?都有哪些实现方式,以及它们之间的区别? 能,Mybatis不仅可以执行一对一、一对多的关联查询, 还可以执行多对一,多对多的关联查询,多对一查询, 其实就是一对一查询,只需要把selectOne修改为selectList即可; 多对多查询,其实就是一对多查询,只需要把selectOne修改为selectList即可。 关联对象查询,有两种实现方式,一种是单独发送一个sql去查询关联对象, 赋给主对象,然后返回主对象。另一种是使用嵌套查询,嵌套查询的含义为使用join查询, 一部分列是A对象的属性值,另外一部分列是关联对象B的属性值, 好处是只发一个sql查询,就可以把主对象和其关联对象查出来。 MyBatis里面的动态Sql是怎么设定的?用什么语法? MyBatis里面的动态Sql一般是通过if节点来实现,通过OGNL语法来实现, 但是如果要写的完整,必须配合where,trim节点,where节点是判断包含节点有 内容就插入where,否则不插入,trim节点是用来判断如果动态语句是以and 或or 开始,那么会自动把这个and或者or取掉。 Mybatis是如何将sql执行结果封装为目标对象并返回的?都有哪些映射形式? 第一种是使用<resultMap>标签,逐一定义列名和对象属性名之间的映射关系。 第二种是使用sql列的别名功能,将列别名书写为对象属性名, 比如T_NAME AS NAME,对象属性名一般是name,小写, 但是列名不区分大小写,Mybatis会忽略列名大小写,
-
Java 异步编程(5 种异步实现详解)
-
Java 实现异步编程的 8 种方法
-
Java 异步编程
-
Java 异步编程最佳实践
-
Java 异步编程入门
-
了解 Java 异步编程 FutureTask 的文章
-
了解 Java 异步编程
-
Java 异步编程 Future, CompletableFuture