Русский
Русский
English
Статистика
Реклама

Инициализация Rx цепочки

Всем привет, меня зовут Иван, я Android-разработчик. Сегодня хочу поделится своим опытом работы с RxJava2 и рассказать, как происходит инициализация цепочки. Почему я вообще решил поднять эту тему? Пообщавшись со знакомыми разработчиками, я понял, что не каждый кто использует этот инструмент понимает, как он работает. И тогда я решил разобраться как же устроены подписки в RxJava2 и в какой последовательности вся работа инициализируется. Я не нашел ни одной статьи, поясняющей это. В свете этого я полез в исходники, чтобы посмотреть, как же все работает и набросал для себя небольшую шпаргалку, которая выросла в данную статью.

В этой статье я не буду описывать что такое Observable, Observer и все остальные сущности, которые используются в RxJava2. Если вы решили прочитать данную статью, то я предполагаю, что вы уже знакомы с этой информацией. А если вы всё же не знакомы с этими понятия, то я рекомендую перед началом чтения ознакомиться с ними.

Вот с чего можно начать:

Грокаем* RxJava

Исследуем RxJava 2 для Android

Давайте посмотрим, как работает простейшая цепочка:

Observable.just (1, 2, 3, 4, 5).map {}.filter {}.subscribe();

По верхам

Сначала опишу вкратце каждый шаг, через который пройдем в этой цепочке (шаги начинаются сверху вниз):

  • Создается объект в операторе just ObservableFromArray.

  • Создается объект в операторе map ObservableMap, который принимает в конструктор ссылку на ранее созданный объект в операторе just.

  • Создается объект в операторе filter ObservableFilter, который принимает в конструктор ссылку на ранее созданный объект в map, в котором уже хранится ссылка на just.

  • После создания всех Observableов у последнего Observable в цепочки вызывается метод subscribe() (в нашем случае это ObservableFilter созданный в операторе filter) в котором создается новый Observer, который и будет обрабатывать все полученные события.

  • В методе ObservableFilter.subscribe() вызывается следующий метод ObservableFilter.subscribeActual(), в котором создается внутренний Observer, в случае с оператором filter, это FilterObserver. В этот внутренний Observer передается ссылка на первый созданный Observer в ObservableFilter.subscribe().

  • Вызывается ObservableMap.subscribe() в котором так же вызывается ObservableMap.subscribeActual()и создается внутренний Observer, в случае с оператором map, это MapObserver, в который передается ссылка на FilterObserver.

  • Вызывается ObservableFromArray.subscribe() и после ObservableFromArray.subscribeActual(), и уже там вызывается метод onSubscribe()у переданного в ObservableFromArray.subscribeActual() Observerа.

  • onSubscribe() вызывается у каждого нижележащего Observerа в цепочке.

  • ObservableFromArray начинает излучать все события в метод onNext() нижележащего Observerа.

Визуальное представление описанной выше схемы.Визуальное представление описанной выше схемы.

Создание источников данных

Теперь давайте рассмотрим описанные выше шаги подробнее, сначала попадаем в метод just() где происходит проверка каждого значения на null, далее идет вызов метода fromArray(), который возвращает Observable.

public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {   ObjectHelper.requireNonNull(item1, "item1 is null");   ObjectHelper.requireNonNull(item2, "item2 is null");   ObjectHelper.requireNonNull(item3, "item3 is null");   ObjectHelper.requireNonNull(item4, "item4 is null");   ObjectHelper.requireNonNull(item5, "item5 is null");   return fromArray(item1, item2, item3, item4, item5);}

В fromArray() проверяется, что метод принимает в себя не пустой массив и имеет больше одного элемента.

public static <T> Observable<T> fromArray(T... items) {   ObjectHelper.requireNonNull(items, "items is null");   if (items.length == 0) {       return empty();   }   if (items.length == 1) {       return just(items[0]);   }   return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));}

После прохода через все эти шаги создается новый экземпляр класса ObservableFromArray, который на вход принимает массив с данными.

После этого новый объект передается в метод onAssembly(), так как нет необходимости как-то еще модифицировать Observable, то возвращается тот же источник, который поступал на вход.

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {   Function<? super Observable, ? extends Observable> f = onObservableAssembly;   if (f != null) {       return apply(f, source);   }   return source;}

onAssembly() проверяет хотим ли перехватить текущий Observable и как-то модифицировать его, например таким образом:

RxJavaPlugins.setOnObservableAssembly(o -> {if (o instanceof ObservableFromArray) {    return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });}return o;}); Observable.just(1, 2, 3).filter(v -> v > 3).test().assertResult(4, 5, 6);
Только что созданный ObservableFromArrayТолько что созданный ObservableFromArray

Вызывается следующий оператор в цепочке map(). Оператор проходит почти через те же шаги, что были описаны выше. Сначала делается проверка на null, потом создается новый экземпляр класса ObservableMap.

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {   ObjectHelper.requireNonNull(mapper, "mapper is null");   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}

Вот тут появляется небольшое отличие, в конструктор ObservableMap передается не только mapper, который будет преобразовывать одно значение в другое, но также принимает в конструктор this (source). В данном случае this это ссылка на экземпляр класса ObservableFromArray созданный на предыдущем шаге. ObservableMap расширяет абстрактный класс AbstractObservableWithUpstream, в котором и храниться source.

AbstractObservableWithUpstream абстрактный класс, который реализуют Observable операторы и в котором хранится ссылка на вышележащий источник данных.

Далее происходит вызов метода onAssembly() и возвращение созданного Observable.

Обновленная схема с созданным ObservableMapОбновленная схема с созданным ObservableMap

Переходим к следующему оператору в цепочки filter(). В нем не происходит ничего нового, за исключением того, что создается объект ObservableFilter и в его конструктор в this передается ссылка на экземпляр ObservableMap (у которого уже есть ссылка на ObservableFromArray, как показано на схеме выше) созданный на предыдущем шаге.

public final Observable<T> filter(Predicate<? super T> predicate) {   ObjectHelper.requireNonNull(predicate, "predicate is null");   return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));}
Обновленная схема с созданным ObservableFilterОбновленная схема с созданным ObservableFilter

Начало подписки

Последний оператор в цепочке subscribe(), который вызывает перегруженную версию метода. В нашем случае обрабатывается только onNext(). Метод subscribe() вызывается у ObservableFilter, который был последним созданным Observable в цепочке.

public final Disposable subscribe(Consumer<? super T> onNext) {   return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());}

В перегруженном методе сначала проверяются все переданные параметры на null, далее создается объект класса LambdaObserver и происходит подписка.

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,       Action onComplete, Consumer<? super Disposable> onSubscribe) {   ObjectHelper.requireNonNull(onNext, "onNext is null");   ObjectHelper.requireNonNull(onError, "onError is null");   ObjectHelper.requireNonNull(onComplete, "onComplete is null");   ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");   LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);   subscribe(ls);   return ls;}

А вот и сам метод в котором и происходит подписка.

public final void subscribe(Observer<? super T> observer) {   ObjectHelper.requireNonNull(observer, "observer is null");   try {       observer = RxJavaPlugins.onSubscribe(this, observer);       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");       subscribeActual(observer);   } catch (NullPointerException e) {      ...... }}

В методе subscribeActual() производится подписка на источник данных и в него же передается созданный ранее LambdaObserver. subscribeActual() вызывается в классе ObservableFilter. И вот что там происходит.

public void subscribeActual(Observer<? super T> observer) {   source.subscribe(new FilterObserver<T>(observer, predicate));}

Создается новый объект класса FilterObserver, который принимает в конструктор LambdaObserver созданный ранее и предикат для фильтрации, которые хранится в ObservableFilter в виде поля класса.

Класс FilterObserver расширяет класс BasicFuseableObserver, в котором уже реализован метод onSubscribe(). BasicFuseableObserver это абстрактный класс, который реализуют промежуточные Observerы. Если посмотреть исходники, то его реализуют только 6 классов, два из которых это FilterObserver и MapObserver. В методе BasicFuseableObserver.onSubscribe() также вызывается метод onSubscribe() у нижележащего Observerа, который передавался в конструктор этого класса. А выглядит это вот так:

public final void onSubscribe(Disposable d) {   if (DisposableHelper.validate(this.upstream, d)) {       this.upstream = d;       if (d instanceof QueueDisposable) {           this.qd = (QueueDisposable<T>)d;       }       if (beforeDownstream()) {           downstream.onSubscribe(this);           afterDownstream();       }   }}

После того как подписались на ObservableFilter и создали объект FilterObserver, он передается в source.subscribe(). Хочу напомнить, что source это объект класса ObservableMap, переданный ранее в цепочке. У объекта ObservableMap вызывается метод subscribe().

public final void subscribe(Observer<? super T> observer) {   ObjectHelper.requireNonNull(observer, "observer is null");   try {       observer = RxJavaPlugins.onSubscribe(this, observer);       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");       subscribeActual(observer);   } catch (NullPointerException e) {      ...... }}

Далее происходит те же шаги, в методе subscribe() вызывается subscribeActual(), оба этих метода вызываются у ObservableMap. В subscribeActual() создается новый MapObserver с переданным в качестве параметра экземпляром FilterObserver и функцией mapperа.

public void subscribeActual(Observer<? super U> t) {   source.subscribe(new MapObserver<T, U>(t, function));}
public void subscribeActual(Observer<? super T> observer) {   FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);   observer.onSubscribe(d);   if (d.fusionMode) {       return;   }   d.run();}

Все рассмотренные Observerы расширяли абстрактный класс BasicFuseableObserver, в котором уже реализован метод onSubscribe() и так же есть ссылка на нижележащий Observer, у которого так же вызывается метод onSubscribe().

В конце метода subscribeActual() вызывается метод run(), в котором и начинается излучение всех данных в нижележащие Observerы.

void run() {   T[] a = array;   int n = a.length;   for (int i = 0; i < n && !isDisposed(); i++) {       T value = a[i];       if (value == null) {           downstream.onError(new NullPointerException("The element at index " + i + " is null"));           return;       }       downstream.onNext(value);   }   if (!isDisposed()) {       downstream.onComplete();   }}

Соответственно вызываются onNext() для передачи значений в нижележащие Observerы и потом onComplete() при завершении излучения данных, либо может произойти ошибка и вызовется onError(), который завершит всю цепочку.

Визуальное представление процесса создания и подписокВизуальное представление процесса создания и подписок

Вывод

Observableы вкладываются друг в друга и вызывают callbackи для создания Observerов, которые и будут обрабатывать получаемые данные и передавать их дальше по цепочки.

Метод onSubscribe() вызывается до начала отправки данных и это надо иметь ввиду если вы пользуетесь такими оператора как doOnSubscribe().

На каждый оператор создается как минимум 3 объекта:

  • Анонимный класс передаваемый в оператор

  • Observable создаваемый внутри оператора

  • Observer обрабатывающий получаемые данные

Потому при использовании операторов стоит иметь ввиду, что каждый оператор аллоцирует память для несколько объектов и не стоит добавлять операторы в цепочку, только потому что можно.

RxJava мощный инструмент, но нужно понимать как он работает и для каких задач его использовать. Если вам нужно просто выполнение сетевого запроса в фоновом потоке и последующее выполнение результата на главном потоке, то это как стрелять из пушки по воробьям, попасть можно, а вот последствия могут быть серьезными.

Источник: habr.com
К списку статей
Опубликовано: 31.05.2021 14:10:41
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Java

Разработка под android

Rxjava

Rx

Rxjava2

Rxjava 2

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru