今天就跟大家聊聊有关RxJava简单源码的示例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
创新互联公司专注于盘州网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供盘州营销型网站建设,盘州网站制作、盘州网页设计、盘州网站官网定制、小程序制作服务,打造盘州网络公司原创品牌,更为您提供盘州网站排名全网营销落地服务。
demo代码如下
public class ObservableTest { public static void main(String[] args) { Observable
先看第一行代码
Observableobservable = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter observer) throws Exception { observer.onNext("处理的数字是:" + Math.random() * 100); observer.onComplete(); } }); //Observable.java //第1560行 public static Observable create(ObservableOnSubscribe source) { ObjectHelper.requireNonNull(source, "source is null"); //RxJavaPlugins里有很多方法可以设置, //有点类似于Spring的ApplicationListener,在对应的生命周期中会被调用 return RxJavaPlugins.onAssembly(new ObservableCreate (source)); } //RxJavaPlugins.java //第1031行 public static Observable onAssembly(@NonNull Observable source) { Function super Observable, ? extends Observable> f = onObservableAssembly; //如果设置了对应的方法,就执行,否则原样返回 if (f != null) { return apply(f, source); } return source; }
可以看到RxJavaPlugins中的方法如果不配置的方法,参数就会原样返回,所以Observable.create最终得到的就是ObservableCreate这个类。
再来看第二行代码
observable.subscribe(new Consumer() { @Override public void accept(Object consumer) throws Exception { System.out.println("我处理的元素是:" + consumer); } }); //Observable.java //第10869行 public final Disposable subscribe(Consumer super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } //Observable.java //第10958行 public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError, Action onComplete, Consumer super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); //这里的onNext就是我们自己写的Consumer类 LambdaObserver ls = new LambdaObserver (onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } //Observable.java //第10974行 public final void subscribe(Observer super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //还记得我们的observable变量是什么类型么?ObservableCreate! subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } //ObservableCreate.java //第35行 protected void subscribeActual(Observer super T> observer) { //这里的observer是LambdaObserver CreateEmitter parent = new CreateEmitter (observer); observer.onSubscribe(parent); //省略部分代码 } //LambdaObserver.java //第47行 public void onSubscribe(Disposable s) { //设置AtomicReference的值(LambdaObserver继承了AtomicReference) //如果之前已经设置过了(AtomicReference的值不为空),则直接返回false if (DisposableHelper.setOnce(this, s)) { try { //在new LambdaObserver()的时候我们设置了onSubscribe = Functions.emptyConsumer() //所以这里什么都不做 onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); onError(ex); } } } //ObservableCreate.java //第35行 protected void subscribeActual(Observer super T> observer) { //省略部分代码 try { //还记得source是啥么,就是你在创建Observable的时候new的ObservableOnSubscribe //于是终于执行到了我们编写的代码中 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } //ObservableOnSubscribe.java //第6行 public static void main(String[] args) { Observable observable = Observable.create(new ObservableOnSubscribe () { //开始执行这个方法 //observer是new CreateEmitter (new LambdaObserver()); @Override public void subscribe(ObservableEmitter observer) throws Exception { observer.onNext("处理的数字是:" + Math.random() * 100); observer.onComplete(); } }); } //ObservableCreate$CreateEmitter //第61行 public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { //这里的observer就是LambdaObserver //t就是《"处理的数字是:" + Math.random() * 100》这段字符串 observer.onNext(t); } } //LambdaObserver.java //第60行 public void onNext(T t) { if (!isDisposed()) { try { onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); get().dispose(); onError(e); } } } //ObservableOnSubscribe.java //第13行 public static void main(String[] args) { //省略部分代码 observable.subscribe(new Consumer () { @Override public void accept(Object consumer) throws Exception { System.out.println("我处理的元素是:" + consumer); } }); } //ObservableOnSubscribe.java //第8行 public static void main(String[] args) { Observable observable = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter observer) throws Exception { //省略部分代码 observer.onComplete(); } }); //省略部分代码 } //ObservableCreate.java //第95行 public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { //取消订阅 dispose(); } } } //LambdaObserver.java //第86行 public void onComplete() { if (!isDisposed()) { lazySet(DisposableHelper.DISPOSED); try { //new LambdaObserver的时候设置了为空,所以不执行操作 onComplete.run(); } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); } } }
至此,调用流程分析完成,可以看到虽然在main方法里我们只写了几行代码,但是内部调用的流程还是很繁杂的
看完上述内容,你们对RxJava简单源码的示例分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。