Русский
Русский
English
Статистика
Реклама

Rx

Как безболезненно мигрировать с RxJava на Kotlin CoroutinesFlow

11.01.2021 10:24:00 | Автор: admin
Для выполнения асинхронных операций в Android-приложениях, где нужна загрузка и обработка любых данных, долгое время использовали RxJava и о том, как перейти на RxJava 3, мы уже писали в нашем блоге. Сейчас на смену фреймворку постепенно приходят инструменты Kotlin Coroutines+Flow. Актуальность этой связки подтверждается тем, что Google сделал Kotlin приоритетным языком для Android-разработки.

Корутины позволяют тратить меньше системных ресурсов, чем RxJava. Кроме того, поскольку они являются частью Kotlin, Android предоставляет удобные инструменты для работы с ними например, viewModelScope и lifecycleScope. В этой статье мы рассмотрим use cases, распространенные в Rx Java, и то, какие возможности вы получите при переходе на Flow.



Переключение потоков и создание


Для начала сравним, как происходит переключение потоков в RxJava и Flow.

RxJava


Observable.create<Int> { emitter ->emitter.onNext(1)emitter.onNext(2)emitter.onNext(3)emitter.onComplete()}.observeOn(Schedulers.io()).map {printThread(map1 value = $it)it + it}.doOnNext { printThread(after map1 -> $it) }.observeOn(Schedulers.computation()).map {printThread(map2 value = $it)it * it}.doOnNext { printThread(after map2 -> $it) }.observeOn(Schedulers.single()).subscribe ({printThread(On Next $it)},{printThread(On Error)},{printThread(On Complete)})


При этом сложение выполняется в IO шедулере, умножение в computation шедулере, а подписка в single.

Flow


Повторим этот же пример для Flow:

launch {flow {emit(1)emit(2)emit(3)}.map {printThread(map1 value = $it)it + it}.onEach { printThread(after map1 -> $it) }.flowOn(Dispatchers.IO).map {printThread(map2 value = $it)it * it}.onEach { printThread(after map2 -> $it) }.flowOn(Dispatchers.Default).onCompletion { printThread(onCompletion) }.collect { printThread(received value $it) }}


В результате можно отметить следующее:

1) observeOn переключает поток, в котором будут выполняться последующие операторы, а flowOn определяет диспетчер выполнения для предыдущих операторов.

2) Метод collect() будет выполняться в том же диспетчере, что и launch, а emit данных будет происходить в Dispatchers.IO. Метод subscribe() будет выполняться в Schedulers.single(), потому что идет после него.

3) Flow также имеет стандартные методы создания flow:

  • flowOf(): в примере можно было бы использовать Observable.fromArray(1, 2, 3) и flowOf(1, 2, 3)
  • extenstion function asFlow(), который превращает Iterable, Sequence, массивы во flow
  • билдер flow { }

4) В данном примере Flow, как и RxJava, представляет собой cold stream данных: до вызова методов collect() и subscribe() никакой обработки происходить не будет.

5) В RxJava нужно явно вызывать emitter.onComplete(). В Flow метод onCompletion() будет автоматически вызываться после окончания блока flow { }.

6) При попытке сделать эмит данных из другого диспетчера, с помощью withContext, например, приведет к ошибке.

Exception in thread main java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@5df83c81, BlockingEventLoop@3383bcd],
but emission happened in [DispatchedCoroutine{Active}@7fbc37eb, Dispatchers.IO].
Please refer to 'flow' documentation or use 'flowOn' instead

Подписка и отписка на источник данных


В RxJava метод Observable.subscribe() возвращает объект Disposable. Он служит для отписки от источника данных, когда новые порции данных от текущего источника уже не нужны. Важно иметь доступ к этому объекту, чтобы вовремя отписываться и избегать утечек.

Для Flow ситуация схожа: так как метод collect() suspend метод, он может быть запущен только внутри корутины. Следовательно, отписка от flow происходит в момент отмены Job корутины:

val job = scope.launch {flow.collect { }}job.cancel() // тут произойдет отписка от flow

В случае же использования viewModelScope об этом заботиться не нужно: все корутины, запущенные в рамках этого scope, будут отменены, когда ViewModel будет очищена, т.е. вызовется метод ViewModel.onCleared(). Для lifecycleScope ситуация аналогична: запущенные в его рамках корутины будут отменены, когда соответствующий Lifecycle будет уничтожен.

Обработка ошибок


В RxJava есть метод onError(), который будет вызван в случае возникновения какой-либо ошибки и на вход получит данные о ней. В Flow тоже есть такой метод, он называется catch(). Рассмотрим следующий пример.

RxJava


Observable.fromArray(1, 2, 3).map {val divider = Random.Default.nextInt(0, 1)it / divider}.subscribe({ value ->println(value)},{ e ->println(e)})


При возникновении ArithmeticException будет срабатывать onError(), и информация об ошибке будет напечатана в консоль.

Flow


flowOf(1, 2, 3).map {val divider = Random.Default.nextInt(0, 1)it / divider}.catch { e -> println(e) }.collect { println(it) }


Этот же пример, переписанный на flow, можно представить с помощью catch { }, который под капотом имеет вид привычной конструкции try/catch.

Операторы RxJava onErrorResumeNext и onErrorReturn можно представить в виде:

catch { emit(defaultValue) } // onErrorReturn

catch { emitAll(fallbackFlow) } // onErrorResumeNext

В Flow, как и в RxJava, есть операторы retry и retryWhen, позволяющие повторить операции в случае возникновения ошибки.

Операторы


Рассмотрим наиболее распространенные операторы RxJava и найдем их аналоги из Flow.



Подробнее с операторами Flow можно познакомиться здесь.

Некоторые операторы Flow (например, merge) помечены как экспериментальные или отсутствующие. Их api может измениться (как, например, для flatMapMerge), или их могут задепрекейтить, то есть они станут недоступны. Это важно помнить при работе с Flow. При этом отсутствие некоторых операторов компенсируется тем, что flow всегда можно собрать в список и работать уже с ним. В стандартной библиотеке Kotlin есть множество функций для работы со списками.

Также у Flow отсутствуют отдельные операторы троттлинга и другие операторы, которые работают с временными промежутками. Это можно объяснить молодостью библиотеки, а также тем, что, согласно словам разработчика Kotlin Романа Елизарова, команда Jetbrains не планирует раздувать библиотеку множеством операторов, оставляя разработчикам возможность компоновать нужные операторы самостоятельно, предоставляя им удобные блоки для сборки.

Backpressure


Backpressure это ситуация, когда производитель данных выдает элементы подписчику быстрее, чем тот их может обработать. Готовые данные, в ожидании того, как подписчик сможет их обработать, складываются в буфер Observable. Проблема такого подхода в том, что буфер может переполниться, вызвав OutOfMemoryError.

В ситуациях, когда возможен backpressure, для Observable нужно применять различные механизмы для предотвращения ошибки MissingBackpressureException.

После появления в RxJava 2 Flowable произошло разделение на источники данных с поддержкой backpressure (Flowable) и Observable, которые теперь не поддерживают backpressure. При работе с RxJava требуется правильно выбрать тип источника данных для корректной работы с ним.

У Flow backpressure заложена в Kotlin suspending functions. Если сборщик flow не может принимать новые данные в настоящий момент, он приостанавливает источник. Возобновление происходит позднее, когда сборщик flow снова сможет получать данные. Таким образом, в Kotlin нет необходимости выбирать тип источника данных, в отличие от RxJava.

Hot streams


Горячий источник рассылает новые порции данных по мере их появления, вне зависимости от того, есть ли активные подписчики. Новые подписчики получат не всю сгенерированную последовательность данных с самого начала, а только те данные, что были сгенерированы после подписки. В этом отличие горячих и холодных источников: холодные не начинают генерацию данных, пока нет хотя бы одного подписчика, а новые подписчики получают всю последовательность.

Горячие источники данных полезны, например, при подписке на события от View: при этом нужно получать только новые события, нет смысла обрабатывать заново все пользовательские действия. Также мы не можем запретить пользователю нажимать на экран до тех пор, пока мы не будем готовы обрабатывать его действия. Для обработки событий от View в реактивном виде существует библиотека RxBinding, которая имеет поддержку RxJava3.

В Kotlin Flow есть свои возможности для работы с горячим flow, который производит данные вне зависимости от наличия подписчиков и выдает новые данные одновременно всем имеющимся подписчикам. Для этого можно использовать Channel, SharedFlow, чтобы отправлять новые порции данных одновременно всем подписанным сборщикам.

Кстати, для Flow тоже есть отличная библиотека для обработки событий от View Corbind. В ней есть поддержка большинства Android-виджетов.

RxJava Subjects


Subject в RxJava это специальный элемент, который одновременно является источником данных и подписчиком. Он может подписаться на один или несколько источников данных, получать от них порции данных и отдавать их своим подписчикам.

Аналог Subject в Flow это Channel, в частности, BroadcastChannel. Существуют различные варианты их реализации: с буферизацией данных (ArrayBroadcastChannel), с хранением только последнего элемента (ConflatedBroadcastChannel). Но важно помнить, что, так как библиотека Kotlin Flow молода и постоянно развивается, ее части могут меняться. Так получилось и в случае с BroadcastChannel: в своей статье Роман Елизаров сообщил, что, начиная с версии 1.4 будет предложено лучшее решение shared flows, а BroadcastChannel ждет deprecation в ближайшем будущем.

Заключение


В данной статье мы сравнили RxJava и Kotlin Flow, рассмотрели их схожие моменты и аналоги частей RxJava в Flow. При этом Flow хорошо подойдет в качестве инструмента для обработки событий в реактивном стиле в проектах на Kotlin, использующих паттерн MVVM: благодаря viewModelScope и lifecycleScope запускать корутины можно быстро и удобно, не боясь утечек. В связи с тем, что популярность Kotlin и его инструментов растет, а также этот язык является приоритетным для разработки Android-приложений, в ближайшие годы связка Coroutines+Flow может заменить RxJava скорее всего, новые проекты будут написаны именно с помощью нее. На первый взгляд, миграция с RxJava на Flow не представляется болезненной, потому что в обоих случаях есть похожие операторы и разделение общей концепции Reactive streams. Кроме того, Kotlin имеет достаточно большое комьюнити, которое постоянно развивается и помогает разработчикам в изучении новых возможностей.

А вы готовы мигрировать на корутины? Приглашаем поделиться мнениями!
Подробнее..

Инициализация Rx цепочки

31.05.2021 14:10:41 | Автор: admin

Всем привет, меня зовут Иван, я Android-разработчик. Сегодня хочу поделится своим опытом работы с RxJava2 и рассказать, как происходит инициализация цепочки. Почему я вообще решил поднять эту тему? Пообщавшись со знакомыми разработчиками, я понял, что не каждый кто использует этот инструмент понимает, как он работает. И тогда я решил разобраться как же устроены подписки в RxJava2 и в какой последовательности вся работа инициализируется. Я не нашел ни одной статьи, поясняющей это. В свете этого я полез в исходники, чтобы посмотреть, как же все работает и набросал для себя небольшую шпаргалку, которая выросла в данную статью.

В этой статье я не буду описывать что такое Observable, Observer и все остальные сущности, которые используются в RxJava2. Если вы решили прочитать данную статью, то я предполагаю, что вы уже знакомы с этой информацией. А если вы всё же не знакомы с этими понятия, то я рекомендую перед началом чтения ознакомиться с ними.

Вот с чего можно начать:

Грокаем* RxJava

Исследуем RxJava 2 для Android

Давайте посмотрим, как работает простейшая цепочка:

Observable.just (1, 2, 3, 4, 5).map {}.filter {}.subscribe();

По верхам

Сначала опишу вкратце каждый шаг, через который пройдем в этой цепочке (шаги начинаются сверху вниз):

  • Создается объект в операторе just ObservableFromArray.

  • Создается объект в операторе map ObservableMap, который принимает в конструктор ссылку на ранее созданный объект в операторе just.

  • Создается объект в операторе filter ObservableFilter, который принимает в конструктор ссылку на ранее созданный объект в map, в котором уже хранится ссылка на just.

  • После создания всех Observableов у последнего Observable в цепочки вызывается метод subscribe() (в нашем случае это ObservableFilter созданный в операторе filter) в котором создается новый Observer, который и будет обрабатывать все полученные события.

  • В методе ObservableFilter.subscribe() вызывается следующий метод ObservableFilter.subscribeActual(), в котором создается внутренний Observer, в случае с оператором filter, это FilterObserver. В этот внутренний Observer передается ссылка на первый созданный Observer в ObservableFilter.subscribe().

  • Вызывается ObservableMap.subscribe() в котором так же вызывается ObservableMap.subscribeActual()и создается внутренний Observer, в случае с оператором map, это MapObserver, в который передается ссылка на FilterObserver.

  • Вызывается ObservableFromArray.subscribe() и после ObservableFromArray.subscribeActual(), и уже там вызывается метод onSubscribe()у переданного в ObservableFromArray.subscribeActual() Observerа.

  • onSubscribe() вызывается у каждого нижележащего Observerа в цепочке.

  • ObservableFromArray начинает излучать все события в метод onNext() нижележащего Observerа.

Визуальное представление описанной выше схемы.Визуальное представление описанной выше схемы.

Создание источников данных

Теперь давайте рассмотрим описанные выше шаги подробнее, сначала попадаем в метод just() где происходит проверка каждого значения на null, далее идет вызов метода fromArray(), который возвращает Observable.

public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {   ObjectHelper.requireNonNull(item1, "item1 is null");   ObjectHelper.requireNonNull(item2, "item2 is null");   ObjectHelper.requireNonNull(item3, "item3 is null");   ObjectHelper.requireNonNull(item4, "item4 is null");   ObjectHelper.requireNonNull(item5, "item5 is null");   return fromArray(item1, item2, item3, item4, item5);}

В fromArray() проверяется, что метод принимает в себя не пустой массив и имеет больше одного элемента.

public static <T> Observable<T> fromArray(T... items) {   ObjectHelper.requireNonNull(items, "items is null");   if (items.length == 0) {       return empty();   }   if (items.length == 1) {       return just(items[0]);   }   return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));}

После прохода через все эти шаги создается новый экземпляр класса ObservableFromArray, который на вход принимает массив с данными.

После этого новый объект передается в метод onAssembly(), так как нет необходимости как-то еще модифицировать Observable, то возвращается тот же источник, который поступал на вход.

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {   Function<? super Observable, ? extends Observable> f = onObservableAssembly;   if (f != null) {       return apply(f, source);   }   return source;}

onAssembly() проверяет хотим ли перехватить текущий Observable и как-то модифицировать его, например таким образом:

RxJavaPlugins.setOnObservableAssembly(o -> {if (o instanceof ObservableFromArray) {    return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });}return o;}); Observable.just(1, 2, 3).filter(v -> v > 3).test().assertResult(4, 5, 6);
Только что созданный ObservableFromArrayТолько что созданный ObservableFromArray

Вызывается следующий оператор в цепочке map(). Оператор проходит почти через те же шаги, что были описаны выше. Сначала делается проверка на null, потом создается новый экземпляр класса ObservableMap.

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {   ObjectHelper.requireNonNull(mapper, "mapper is null");   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}

Вот тут появляется небольшое отличие, в конструктор ObservableMap передается не только mapper, который будет преобразовывать одно значение в другое, но также принимает в конструктор this (source). В данном случае this это ссылка на экземпляр класса ObservableFromArray созданный на предыдущем шаге. ObservableMap расширяет абстрактный класс AbstractObservableWithUpstream, в котором и храниться source.

AbstractObservableWithUpstream абстрактный класс, который реализуют Observable операторы и в котором хранится ссылка на вышележащий источник данных.

Далее происходит вызов метода onAssembly() и возвращение созданного Observable.

Обновленная схема с созданным ObservableMapОбновленная схема с созданным ObservableMap

Переходим к следующему оператору в цепочки filter(). В нем не происходит ничего нового, за исключением того, что создается объект ObservableFilter и в его конструктор в this передается ссылка на экземпляр ObservableMap (у которого уже есть ссылка на ObservableFromArray, как показано на схеме выше) созданный на предыдущем шаге.

public final Observable<T> filter(Predicate<? super T> predicate) {   ObjectHelper.requireNonNull(predicate, "predicate is null");   return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));}
Обновленная схема с созданным ObservableFilterОбновленная схема с созданным ObservableFilter

Начало подписки

Последний оператор в цепочке subscribe(), который вызывает перегруженную версию метода. В нашем случае обрабатывается только onNext(). Метод subscribe() вызывается у ObservableFilter, который был последним созданным Observable в цепочке.

public final Disposable subscribe(Consumer<? super T> onNext) {   return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());}

В перегруженном методе сначала проверяются все переданные параметры на null, далее создается объект класса LambdaObserver и происходит подписка.

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,       Action onComplete, Consumer<? super Disposable> onSubscribe) {   ObjectHelper.requireNonNull(onNext, "onNext is null");   ObjectHelper.requireNonNull(onError, "onError is null");   ObjectHelper.requireNonNull(onComplete, "onComplete is null");   ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");   LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);   subscribe(ls);   return ls;}

А вот и сам метод в котором и происходит подписка.

public final void subscribe(Observer<? super T> observer) {   ObjectHelper.requireNonNull(observer, "observer is null");   try {       observer = RxJavaPlugins.onSubscribe(this, observer);       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");       subscribeActual(observer);   } catch (NullPointerException e) {      ...... }}

В методе subscribeActual() производится подписка на источник данных и в него же передается созданный ранее LambdaObserver. subscribeActual() вызывается в классе ObservableFilter. И вот что там происходит.

public void subscribeActual(Observer<? super T> observer) {   source.subscribe(new FilterObserver<T>(observer, predicate));}

Создается новый объект класса FilterObserver, который принимает в конструктор LambdaObserver созданный ранее и предикат для фильтрации, которые хранится в ObservableFilter в виде поля класса.

Класс FilterObserver расширяет класс BasicFuseableObserver, в котором уже реализован метод onSubscribe(). BasicFuseableObserver это абстрактный класс, который реализуют промежуточные Observerы. Если посмотреть исходники, то его реализуют только 6 классов, два из которых это FilterObserver и MapObserver. В методе BasicFuseableObserver.onSubscribe() также вызывается метод onSubscribe() у нижележащего Observerа, который передавался в конструктор этого класса. А выглядит это вот так:

public final void onSubscribe(Disposable d) {   if (DisposableHelper.validate(this.upstream, d)) {       this.upstream = d;       if (d instanceof QueueDisposable) {           this.qd = (QueueDisposable<T>)d;       }       if (beforeDownstream()) {           downstream.onSubscribe(this);           afterDownstream();       }   }}

После того как подписались на ObservableFilter и создали объект FilterObserver, он передается в source.subscribe(). Хочу напомнить, что source это объект класса ObservableMap, переданный ранее в цепочке. У объекта ObservableMap вызывается метод subscribe().

public final void subscribe(Observer<? super T> observer) {   ObjectHelper.requireNonNull(observer, "observer is null");   try {       observer = RxJavaPlugins.onSubscribe(this, observer);       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");       subscribeActual(observer);   } catch (NullPointerException e) {      ...... }}

Далее происходит те же шаги, в методе subscribe() вызывается subscribeActual(), оба этих метода вызываются у ObservableMap. В subscribeActual() создается новый MapObserver с переданным в качестве параметра экземпляром FilterObserver и функцией mapperа.

public void subscribeActual(Observer<? super U> t) {   source.subscribe(new MapObserver<T, U>(t, function));}
public void subscribeActual(Observer<? super T> observer) {   FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);   observer.onSubscribe(d);   if (d.fusionMode) {       return;   }   d.run();}

Все рассмотренные Observerы расширяли абстрактный класс BasicFuseableObserver, в котором уже реализован метод onSubscribe() и так же есть ссылка на нижележащий Observer, у которого так же вызывается метод onSubscribe().

В конце метода subscribeActual() вызывается метод run(), в котором и начинается излучение всех данных в нижележащие Observerы.

void run() {   T[] a = array;   int n = a.length;   for (int i = 0; i < n && !isDisposed(); i++) {       T value = a[i];       if (value == null) {           downstream.onError(new NullPointerException("The element at index " + i + " is null"));           return;       }       downstream.onNext(value);   }   if (!isDisposed()) {       downstream.onComplete();   }}

Соответственно вызываются onNext() для передачи значений в нижележащие Observerы и потом onComplete() при завершении излучения данных, либо может произойти ошибка и вызовется onError(), который завершит всю цепочку.

Визуальное представление процесса создания и подписокВизуальное представление процесса создания и подписок

Вывод

Observableы вкладываются друг в друга и вызывают callbackи для создания Observerов, которые и будут обрабатывать получаемые данные и передавать их дальше по цепочки.

Метод onSubscribe() вызывается до начала отправки данных и это надо иметь ввиду если вы пользуетесь такими оператора как doOnSubscribe().

На каждый оператор создается как минимум 3 объекта:

  • Анонимный класс передаваемый в оператор

  • Observable создаваемый внутри оператора

  • Observer обрабатывающий получаемые данные

Потому при использовании операторов стоит иметь ввиду, что каждый оператор аллоцирует память для несколько объектов и не стоит добавлять операторы в цепочку, только потому что можно.

RxJava мощный инструмент, но нужно понимать как он работает и для каких задач его использовать. Если вам нужно просто выполнение сетевого запроса в фоновом потоке и последующее выполнение результата на главном потоке, то это как стрелять из пушки по воробьям, попасть можно, а вот последствия могут быть серьезными.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru