Всем привет, меня зовут Иван, я Android-разработчик. Сегодня хочу поделится своим опытом работы с RxJava2 и рассказать, как происходит инициализация цепочки. Почему я вообще решил поднять эту тему? Пообщавшись со знакомыми разработчиками, я понял, что не каждый кто использует этот инструмент понимает, как он работает. И тогда я решил разобраться как же устроены подписки в RxJava2 и в какой последовательности вся работа инициализируется. Я не нашел ни одной статьи, поясняющей это. В свете этого я полез в исходники, чтобы посмотреть, как же все работает и набросал для себя небольшую шпаргалку, которая выросла в данную статью.
В этой статье я не буду описывать что такое
Observable
, Observer
и все остальные
сущности, которые используются в RxJava2. Если вы решили прочитать
данную статью, то я предполагаю, что вы уже знакомы с этой
информацией. А если вы всё же не знакомы с этими понятия, то я
рекомендую перед началом чтения ознакомиться с ними.
Вот с чего можно начать:
Исследуем RxJava 2 для Android
Давайте посмотрим, как работает простейшая цепочка:
Observable.just (1, 2, 3, 4, 5).map {}.filter {}.subscribe();
По верхам
Сначала опишу вкратце каждый шаг, через который пройдем в этой цепочке (шаги начинаются сверху вниз):
-
Создается объект в операторе just
ObservableFromArray
. -
Создается объект в операторе map
ObservableMap
, который принимает в конструктор ссылку на ранее созданный объект в операторе just. -
Создается объект в операторе filter
ObservableFilter
, который принимает в конструктор ссылку на ранее созданный объект в map, в котором уже хранится ссылка на just. -
После создания всех
Observable
ов у последнегоObservable
в цепочки вызывается методsubscribe()
(в нашем случае этоObservableFilter
созданный в операторе filter) в котором создается новыйObserver
, который и будет обрабатывать все полученные события. -
В методе
ObservableFilter.subscribe()
вызывается следующий методObservableFilter.subscribeActual()
, в котором создается внутреннийObserver
, в случае с оператором filter, этоFilterObserver
. В этот внутреннийObserver
передается ссылка на первый созданныйObserver
вObservableFilter.subscribe()
. -
Вызывается
ObservableMap.subscribe()
в котором так же вызываетсяObservableMap.subscribeActual()
и создается внутреннийObserver,
в случае с оператором map, этоMapObserver
, в который передается ссылка наFilterObserver
. -
Вызывается
ObservableFromArray.subscribe()
и послеObservableFromArray.subscribeActual()
, и уже там вызывается методonSubscribe()
у переданного вObservableFromArray.subscribeActual()
Observer
а. -
onSubscribe()
вызывается у каждого нижележащегоObserver
а в цепочке. -
ObservableFromArray
начинает излучать все события в методonNext()
нижележащегоObserver
а.
Создание источников данных
Теперь давайте рассмотрим описанные выше шаги подробнее, сначала
попадаем в метод just()
где происходит проверка
каждого значения на null, далее идет вызов метода
fromArray(),
который возвращает
Observable
.
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) { ObjectHelper.requireNonNull(item1, "item1 is null"); ObjectHelper.requireNonNull(item2, "item2 is null"); ObjectHelper.requireNonNull(item3, "item3 is null"); ObjectHelper.requireNonNull(item4, "item4 is null"); ObjectHelper.requireNonNull(item5, "item5 is null"); return fromArray(item1, item2, item3, item4, item5);}
В fromArray()
проверяется, что метод принимает в
себя не пустой массив и имеет больше одного элемента.
public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));}
После прохода через все эти шаги создается новый экземпляр
класса ObservableFromArray
, который на вход принимает
массив с данными.
После этого новый объект передается в метод
onAssembly()
, так как нет необходимости как-то еще
модифицировать Observable
, то возвращается тот же
источник, который поступал на вход.
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source;}
onAssembly()
проверяет хотим ли перехватить
текущий Observable
и как-то модифицировать
его, например таким образом:
RxJavaPlugins.setOnObservableAssembly(o -> {if (o instanceof ObservableFromArray) { return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });}return o;}); Observable.just(1, 2, 3).filter(v -> v > 3).test().assertResult(4, 5, 6);
Только что созданный ObservableFromArray
Вызывается следующий оператор в цепочке map()
.
Оператор проходит почти через те же шаги, что были описаны выше.
Сначала делается проверка на null, потом создается новый экземпляр
класса ObservableMap
.
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}
Вот тут появляется небольшое отличие, в конструктор
ObservableMap
передается не только mapper, который
будет преобразовывать одно значение в другое, но также принимает в
конструктор this (source). В данном случае
this это ссылка на экземпляр класса
ObservableFromArray
созданный на предыдущем шаге.
ObservableMap
расширяет абстрактный класс
AbstractObservableWithUpstream
, в котором и храниться
source.
AbstractObservableWithUpstream
абстрактный
класс, который реализуют Observable
операторы
и в котором хранится ссылка на вышележащий источник
данных.
Далее происходит вызов метода onAssembly()
и
возвращение созданного Observable
.
Переходим к следующему оператору в цепочки
filter()
. В нем не происходит ничего нового, за
исключением того, что создается объект
ObservableFilter
и в его конструктор в
this передается ссылка на экземпляр
ObservableMap
(у которого уже есть ссылка на
ObservableFromArray
, как показано на схеме выше)
созданный на предыдущем шаге.
public final Observable<T> filter(Predicate<? super T> predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));}
Обновленная схема с созданным ObservableFilter
Начало подписки
Последний оператор в цепочке subscribe()
, который
вызывает перегруженную версию метода. В нашем случае обрабатывается
только onNext()
. Метод subscribe()
вызывается у ObservableFilter
, который был последним
созданным Observable
в цепочке.
public final Disposable subscribe(Consumer<? super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());}
В перегруженном методе сначала проверяются все переданные
параметры на null, далее создается объект класса
LambdaObserver
и происходит подписка.
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls;}
А вот и сам метод в котором и происходит подписка.
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { ...... }}
В методе subscribeActual()
производится подписка на
источник данных и в него же передается созданный ранее
LambdaObserver
. subscribeActual()
вызывается в классе ObservableFilter
. И вот что там
происходит.
public void subscribeActual(Observer<? super T> observer) { source.subscribe(new FilterObserver<T>(observer, predicate));}
Создается новый объект класса FilterObserver
,
который принимает в конструктор LambdaObserver
созданный ранее и предикат для фильтрации, которые хранится в
ObservableFilter
в виде поля класса.
Класс FilterObserver
расширяет
класс BasicFuseableObserver
, в котором уже
реализован метод onSubscribe()
.
BasicFuseableObserver
это абстрактный класс,
который реализуют промежуточные Observer
ы.
Если посмотреть исходники, то его реализуют только 6 классов, два
из которых это FilterObserver
и
MapObserver
. В методе
BasicFuseableObserver.onSubscribe()
также
вызывается метод onSubscribe()
у
нижележащего Observer
а, который передавался в
конструктор этого класса. А выглядит это вот так:
public final void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; if (d instanceof QueueDisposable) { this.qd = (QueueDisposable<T>)d; } if (beforeDownstream()) { downstream.onSubscribe(this); afterDownstream(); } }}
После того как подписались на ObservableFilter
и
создали объект FilterObserver
, он передается в
source.subscribe()
. Хочу напомнить, что
source это объект класса
ObservableMap
, переданный ранее в цепочке. У объекта
ObservableMap
вызывается метод
subscribe()
.
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { ...... }}
Далее происходит те же шаги, в методе subscribe()
вызывается subscribeActual()
, оба этих метода
вызываются у ObservableMap
. В
subscribeActual()
создается новый
MapObserver
с переданным в качестве параметра
экземпляром FilterObserver
и функцией
mapper
а.
public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function));}
public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return; } d.run();}
Все рассмотренные Observer
ы расширяли
абстрактный класс BasicFuseableObserver
, в котором уже
реализован метод onSubscribe()
и так же есть ссылка на
нижележащий Observer
, у которого так же вызывается
метод onSubscribe()
.
В конце метода subscribeActual()
вызывается метод
run()
, в котором и начинается излучение всех данных в
нижележащие Observer
ы.
void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The element at index " + i + " is null")); return; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); }}
Соответственно вызываются onNext()
для передачи
значений в нижележащие Observer
ы и потом
onComplete()
при завершении излучения данных, либо
может произойти ошибка и вызовется onError()
, который
завершит всю цепочку.
Вывод
Observable
ы вкладываются друг в друга и
вызывают callbackи для создания
Observer
ов, которые и будут обрабатывать
получаемые данные и передавать их дальше по цепочки.
Метод onSubscribe()
вызывается до начала отправки
данных и это надо иметь ввиду если вы пользуетесь такими оператора
как doOnSubscribe()
.
На каждый оператор создается как минимум 3 объекта:
-
Анонимный класс передаваемый в оператор
-
Observable
создаваемый внутри оператора -
Observer
обрабатывающий получаемые данные
Потому при использовании операторов стоит иметь ввиду, что каждый оператор аллоцирует память для несколько объектов и не стоит добавлять операторы в цепочку, только потому что можно.
RxJava мощный инструмент, но нужно понимать как он работает и для каких задач его использовать. Если вам нужно просто выполнение сетевого запроса в фоновом потоке и последующее выполнение результата на главном потоке, то это как стрелять из пушки по воробьям, попасть можно, а вот последствия могут быть серьезными.