Files
notes/resource/java/RxJava.md
T
2026-03-01 01:43:46 +08:00

4.1 KiB
Raw Blame History

Demo

package org.example.test;  
  
import io.reactivex.rxjava3.core.Observable;  
import io.reactivex.rxjava3.schedulers.Schedulers;  
  
public class RxJavaDemo {  
  
    public static void main(String[] args) {  
        // 创建一个简单的Observable,发出三个事件  
        Observable<String> simpleObservable = Observable.create(emitter -> {  
            // 发送三个事件  
            emitter.onNext("RxJava");  
            emitter.onNext("is");  
            emitter.onNext("Powerful!");  
            emitter.onComplete(); // 发送完成事件  
        });  
  
        // 订阅Observable并定义观察者行为  
        simpleObservable  
                // 将发射项转换为大写  
                .map(String::toUpperCase)  
                // 在观察者的操作中添加延时模拟异步操作  
                .delay(1, java.util.concurrent.TimeUnit.SECONDS)  
                // 运行在io调度器上,用于io密集型操作,比如文件操作,网络调用等。  
                .subscribeOn(Schedulers.io())  
                // 观察结果运行在计算调度器上,用于CPU密集型计算任务  
                .observeOn(Schedulers.computation())  
                .subscribe(  
                        // onNext方法处理  
                        item -> System.out.println("Observer received: " + item + " on thread " + Thread.currentThread().getName()),  
                        // onError方法处理  
                        Throwable::printStackTrace,  
                        // onComplete方法处理  
                        () -> System.out.println("Observer got Completed event")  
                );  
  
        // 由于Observable是异步的,我们需要让主线程等待,否则main方法可能在Observable完成之前结束  
        sleep(2000);  
    }  
  
    private static void sleep(long millis) {  
        try {  
            Thread.sleep(millis);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}
<dependency>  
    <groupId>io.reactivex.rxjava3</groupId>  
    <artifactId>rxjava</artifactId>  
    <version>3.0.0</version>  
</dependency>

好处

RxJava 的好处在于其提供了一种强大且灵活的方式来处理异步和基于事件的程序设计。它是响应式编程的一个 Java 实现,响应式编程是一种面向数据流和变化传播的编程范式。

以下是使用 RxJava 的一些优势:

  1. 函数式编程风格:RxJava 鼓励使用不可变值和函数式操作符,这可以减少错误并促进清晰的编程模式。
  2. 链式调用:RxJava 操作符可以被链式调用,这使得创建复杂的数据流变得直观且可读性强。
  3. 组合异步操作:RxJava 强大的组合操作符支持你轻松组合多个异步操作,并以声明式的方式处理它们的输出。
  4. 错误处理:RxJava 提供了全面的错误处理操作符,可以让你控制错误的传播和处理策略。
  5. 背压支持:RxJava 支持背压,即处理在异步流中生产者生成事件速度快于消费者消费的情况。
  6. 线程控制RxJava 通过 Schedulers 类,允许你明确地控制工作在哪个线程运行。
  7. 松耦合:RxJava 可以帮助创建松耦合的系统,系统的不同部分可以易于独立更改和重构。

与传统的异步处理(例如使用 Java 中的 FutureCallable)相比,RxJava 更加灵活和强大。在传统的异步模式中,处理多个异步操作并且需要它们协同工作时可能会非常复杂。而 RxJava 通过提供丰富的操作符集,使得合并、过滤、转换、延迟异步事件流成为可能。

例如,如果你想合并来自不同数据源的数据,使用传统的异步模式可能需要编写复杂的同步代码。而在 RxJava 中,你只需使用 zipmerge 等操作符即可轻松实现。

总的来说,RxJava 提供了一个强大的抽象层,使得开发者能够以声明式的方式处理异步数据流,而不必担心底层的线程管理和复杂的状态同步问题。