Русский
Русский
English
Статистика
Реклама

Как подружить RxJava с VIPER в Android, подходы применения и о структуре планировщиков

image

Привет, Хабровчане. Сегодня мы с вами поговорим о RxJava. Я знаю, что о ней написано материала вагон и маленькая тележка, но, как мне кажется, у меня есть пара интересных моментов, которыми стоит поделиться. Сначала расскажу, как мы используем RxJava вместе с архитектурой VIPER для Android приложений, заодно посмотрим на классический способ применения. После этого пробежимся по главным особенностям RxJava и остановимся подробнее на том, как устроены планировщики. Если вы уже запаслись вкусняшками, то добро пожаловать под кат.

Архитектура, которая подойдет всем


RxJava это реализация концепции ReactiveX, а создала эту реализацию компания Netflix. В их блоге есть цикл статей о том, зачем они это сделали и какие проблемы они решили. Ссылки (1, 2) вы найдете в конце статьи. Netflix использовали RxJava на стороне сервера (backend), чтобы распараллелить обработку одного большого запроса. Хотя они предложили способ применения RxJava на backend, такая архитектура подойдет для написания разных типов приложений (мобильные, desktop, backend и многих других). Разработчики Netflix использовали RxJava в сервисном слое таким образом, чтобы каждый метод сервисного слоя возвращал Observable.

Дело в том, что элементы в Observable могу доставляться синхронно и асинхронно. Это дает возможность методу самому решать, возвращать значение сразу синхронно, (например, если оно доступно в кэш) или сначала получить эти значения (например, из базы данных или удаленного сервиса) и вернуть их асинхронно. В любом случае, управление будет возвращаться сразу после вызова метода (либо с данными, либо без).

/** * Метод, который сразу возвращает значение, если оно * доступно, или использует  другой поток исполнения, * чтобы получить значение и передать его через callback `onNext()` */public Observable<T> getProduct(String name) {    if (productInCache(name)) {        // Если данные доступны, возвращаем их сразу        return Observable.create(observer -> {           observer.onNext(getProductFromCache(name));           observer.onComplete();        });    } else {        // Иначе задействуем другой поток исполнения        return Observable.<T>create(observer -> {            try {                // Выполняем работу в отдельном потоке                T product = getProductFromRemoteService(name);                // вовращаем значение                observer.onNext(product);                observer.onComplete();            } catch (Exception e) {                observer.onError(e);            }        })        // Говорим Observable использовать планировщик IO        // для создания/получения данных        .subscribeOn(Schedulers.io());    }}

При таком подходе мы получаем один неизменный API для клиента (в нашем случае контроллер) и разные реализации. Клиент всегда взаимодействует с Observable одинаково. Ему абсолютно не важно, получены ли значения синхронно или нет. При этом реализации API могут меняться от синхронных к асинхронным, никак не затрагивая взаимодействие с клиентом. С помощью такого подхода можно совершенно не задумываться о том, как организовать многопоточность, и сосредоточиться на реализации бизнес задач.

Подход применим не только в сервисном слое на backend, но и в архитектурах MVC, MVP, MVVM и др. Например, для MVP мы можем сделать класс Interactor, который будет ответственным за получение и сохранение данных в различные источники, и сделать так, чтобы все его методы возвращали Observable. Они будут являться контрактом взаимодействия с Model. Это также даст возможность использовать в Presenter всю мощь операторов, имеющихся в RxJava.

image

Мы можем пойти дальше и сделать реактивным API уже у Presenter, но для этого нам понадобится правильно реализовать механизм отписки, позволяющий всем View одновременно отписаться от Presenter.

Дальше посмотрим на пример применения такого подхода для архитектуры VIPER, которая является усовершенствованным MVP. Также стоит помнить, что нельзя делать Observable singleton объектами, потому что подписки к таким Observable будут порождать утечки памяти.

Опыт применения в Android и VIPER


В большинстве текущих и новых Android проектов мы используем архитектуру VIPER. Я познакомился с ней, когда присоединился к одному из проектов, в котором она уже использовалась. Помню, как удивился, когда у меня спросили, не смотрел ли я в сторону iOS. iOS в Android проекте?, подумал я. А между тем, VIPER пришел к нам из мира iOS и по сути является более структурированной и модульной версией MVP. О VIPER очень хорошо написано в этой статье (3).

Поначалу все казалось хорошо: правильно разделенные, не перегруженные слои, у каждого слоя своя область ответственности, понятная логика. Но через какое-то время стал проявляться один недостаток, а по мере роста и изменения проекта он стал очень даже мешать.

Дело в том, что мы использовали Interactor так же, как и коллеги в своей статье. Interactor реализует небольшой use case, например, скачать продукты из сети или взять продукт из БД по id, и выполняет действия в рабочем потоке. Внутри себя Interactor совершает операции, используя Observable. Чтобы запустить Interactor и получить результат, пользователь реализует интерфейс ObserverEntity вместе с его методами onNext, onError и onComplete и передает его вместе с параметрами в метод execute(params, ObserverEntity).

Вы, наверное, уже заметили проблему структура интерфейса. На практике нам редко нужны все три метода, часто используются один или два из них. Из-за этого в коде могут встречаться пустые методы. Конечно, мы можем пометить все методы интерфейса default, но такие методы скорее нужны для добавления новой функциональности в интерфейсы. К тому же, странно иметь интерфейс, все методы которого опциональны. Мы также можем, например, создать абстрактный класс, который наследует интерфейс, и переопределять нужные нам методы. Или, наконец, создать перегруженные версии метода execute(params, ObserverEntity), которые принимают от одного до трех функциональных интерфейсов. Эта проблема плохо сказывается на читаемости кода, но, к счастью, довольно просто решается. Однако, она не единственная.

saveProductInteractor.execute(product, new ObserverEntity<Void>() {    @Override    public void onNext(Void aVoid) {        // Сейчас этот метод нам не нужен,        // но мы обязана его реализовать    }    @Override    public void onError(Throwable throwable) {        // Сейчас этот метод используется        // Какой-то код    }    @Override    public void onComplete() {        // И этот метод тоже используется        // Какой-то код    }});

Кроме пустых методов, есть и более неприятная проблема. Мы используем Interactor, чтобы выполнить какое-то действие, но почти всегда это действие не является единственным. Например, мы можем взять продукт из базы данных, затем получить о нем отзывы и картинку, затем сохранить все это в другое место и в конце перейти на другой экран. Тут у нас каждое действие зависит от предыдущего и при использовании Interactor-ов мы получаем огромную цепочку из колбэков, прослеживать которую бывает очень утомительно.

private void checkProduct(int id, Locale locale) {    getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale), new ObserverEntity<Product>() {        @Override        public void onNext(Product product) {            getProductInfo(product);        }        @Override        public void onError(Throwable throwable) {            // Какой-то код        }        @Override        public void onComplete() {        }    });}private void getProductInfo(Product product) {    getReviewsByProductIdInteractor.execute(product.getId(), new ObserverEntity<List<Review>>() {        @Override        public void onNext(List<Review> reviews) {            product.setReviews(reviews);            saveProduct(productInfo);        }        @Override        public void onError(Throwable throwable) {            // Какой-то код        }        @Override        public void onComplete() {            // Какой-то код        }    });    getImageForProductInteractor.execute(product.getId(), new ObserverEntity<Image>() {        @Override        public void onNext(Image image) {            product.setImage(image);            saveProduct(product);        }        @Override        public void onError(Throwable throwable) {            // Какой-то код        }        @Override        public void onComplete() {        }    });}private void saveProduct(Product product) {    saveProductInteractor.execute(product, new ObserverEntity<Void>() {        @Override        public void onNext(Void aVoid) {        }        @Override        public void onError(Throwable throwable) {            // Какой-то код        }        @Override        public void onComplete() {            goToSomeScreen();        }    });}

Ну, как вам такая макаронина? При этом у нас простая бизнес логика и одинарная вложенность, а представьте, что было бы с более сложным кодом. Это также затрудняет повторное использование метода и применение разных планировщиков для Interactor.

Решение на удивление простое. Вы чувствуете, что этот подход пытается повторить поведение Observable, но делает это неправильно и сам создает непонятные ограничения? Как я уже рассказывал раньше, этот код достался нам из уже существующего проекта. При исправлении этого legacy-кода будем использовать подход, который завещали нам ребята из Netflix. Вместо того, чтобы каждый раз реализовывать ObserverEntity, заставим Interactor просто возвращать Observable.

private Observable<Product> getProductById(int id, Locale locale) {    return getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale));}private Observable<Product> getProductInfo(Product product) {    return getReviewsByProductIdInteractor.execute(product.getId())    .map(reviews -> {        product.set(reviews);        return product;    })    .flatMap(product -> {        getImageForProductInteractor.execute(product.getId())        .map(image -> {            product.set(image);            return product;        })    });}private Observable<Product> saveProduct(Product product) {    return saveProductInteractor.execute(product);}private doAll(int id, Locale locale) {    // Берем продукт из хранилища    getProductById (id, locale)    // Добавляем информацию    .flatMap(product -> getProductInfo(product))    // Сохраняем все в другое хранилище    .flatMap(product -> saveProduct(product))    // После сохранения продукты в потоке больше не нужны    .ignoreElements()    // Устанавливаем планировщики    .subscribeOn(Schedulers.io())    .observeOn(AndroidSchedulers.mainThread())    // Переходим на другой экран    .subscribe(() -> goToSomeScreen(), throwable -> handleError());}

Вуаля! Так мы не только избавились от того громоздкого и неповоротливого ужаса, но и привнесли мощь RxJava в Presenter.

Концепции в основе


Я довольно часто встречал, как с помощью функционального реактивного программирования (далее ФРП) пытались объяснить концепцию RxJava. На самом деле, оно никак не связано с этой библиотекой. ФРП больше о непрерывных динамически изменяемых значениях (поведениях), непрерывном времени и денотационной семантике. В конце статьи вы сможете найти пару интересных ссылок (4, 5, 6, 7).

В качестве основных концепций RxJava использует реактивное программирование и функциональное программирование. Реактивное программирование можно описать как последовательную передачу информации от наблюдаемого объекта объекту-наблюдателю таким образом, что объект-наблюдатель получает ее автоматически (асинхронно) по мере возникновения этой информации.

Функциональное программирование использует понятие чистых функций, то есть тех, которые не используют и не изменяют внешнее состояние; они полностью зависят от своих входных данных для получения своих выходных данных. Отсутствие у чистых функций побочных эффектов дает возможность использовать результаты одной функции в качестве входных параметров другой. Это дает возможность составлять неограниченную цепочку функций.

Соединение этих двух концепций вместе наряду с использованием GoF шаблонов Observer и Iterator позволяет создавать асинхронные потоки данных и обрабатывать их с помощью огромного арсенала очень удобных функций. Это также дает возможность очень просто, а главное безопасно, использовать многопоточность, не думая о таких ее проблемах как синхронизация, неконсистентность памяти, наложение потоков и т.д.

image

Три кита RxJava


Основные три компонента, на которых строится RxJava Observable, операторы и планировщики.
Observable в RxJava отвечает за реализацию реактивной парадигмы. Observable часто называют потоками, так как они реализуют как концепцию потоков данных, так и распространение изменений. Observable это тип, который достигает реализации реактивной парадигмы за счет объединения в себе двух шаблонов из книги Gang of Four: Observer и Iterator. Observable добавляет в Observer две отсутствующие семантики, которые есть в Iterable:

  • Возможность для производителя сигнализировать потребителю о том, что больше нет доступных данных (цикл foreach в Iterable завершается и просто возвращается; Observable в этом случае вызывает метод onCompleate).
  • Возможность для производителя сообщать потребителю, что произошла ошибка и Observable больше не может испускать элементы (Iterable бросает исключение, если во время итерации возникает ошибка; Observable вызывает метод onError у своего наблюдателя и завершается).

Если Iterable использует pull подход, то есть потребитель запрашивает значение у производителя, и поток исполнения блокируется до тех пор, пока это значение не прибудет, то Observable является его push эквивалентом. Это значит, что производитель отправляет значения потребителю, только когда они становятся доступны.

Observable это только начало RxJava. Он позволяет асинхронно получать значения, но настоящая мощь приходит с реактивными расширениями (отсюда ReactiveX) операторами, которые позволяют преобразовывать, комбинировать и создавать последовательности элементов, испускаемых Observable. Именно тут и выходит на передний план функциональная парадигма со своими чистыми функциями. Операторы используют эту концепцию в полной мере. Они позволяют безопасно работать с последовательностями элементов, которые испускает Observable, не боясь побочных эффектов, если, конечно, не создать их самостоятельно. Операторы позволяют применять многопоточность, не заботясь о таких проблемах как потокобезопасность, низкоуровневое управление потоками, синхронизация, ошибки некосистентности памяти, наложение потоков и т.д. Имея большой арсенал функций, можно легко оперировать различными данными. Это дает нам очень мощный инструмент. Главное помнить, что операторы модифицируют элементы, испускаемые Observable, а не сами Observable. Observable никогда не изменяются с момента их создания. Размышляя о потоках и операторах, лучше всего думать диаграммами. Если вы не знаете, как решить задачу, то подумайте, посмотрите на весь список доступных операторов и подумайте еще.

Хотя сама по себе концепция реактивного программирования является асинхронной (не путайте с многопоточностью), по умолчанию все элементы в Observable доставляются подписчику синхронно, в том же потоке, в котором был вызван метод subscribe(). Чтобы привнести ту самую асинхронность, нужно либо самостоятельно вызывать методы onNext(T), onError(Throwable), onComplete() в другом потоке исполнения, либо использовать планировщики. Обычно все разбирают их поведение, так что давайте посмотрим на их устройство.

Планировщики абстрагируют пользователя от источника параллелизма за собственным API. Они гарантируют, что будут предоставлять определенные свойства, независимо от лежащего в основе механизма параллельности (реализации), например, Threads, event loop или Executor. Планировщики используют daemon потоки. Это означает, что программа завершится вместе с завершением основного потока исполнения, даже если происходят какие-то вычисления внутри оператора Observable.

В RxJava есть несколько стандартных планировщиков, которые подходят для определенных целей. Все они расширяют абстрактный класс Scheduler и реализуют собственную логику управлением workers (рабочими). Например, планировщик ComputationScheduler во время своего создания формирует пул рабочих, количество которых равно количеству процессорных потоков. После этого ComputationScheduler использует рабочих для выполнения Runnable задач. Вы можете передать Runnable планировщику с помощью методов scheduleDirect() и schedulePeriodicallyDirect(). Для обоих методов планировщик берет очередного рабочего из пула и передает ему Runnable.

Рабочий находится внутри планировщика и представляет собой сущность, которая выполняет Runnable объекты (задачи), используя одну из нескольких схем параллельности. Другими словами, планировщик получает Runnable и передает ее рабочему для выполнения. Также можно самостоятельно получить рабочего у планировщика и передать ему один или несколько Runnable, независимо от других рабочих и самого планировщика. Когда рабочий получает задачу, он помещает ее в очередь. Рабочий гарантирует последовательное выполнение задач в том порядке, в котором они были переданы, однако порядок может нарушаться отложенными задачами.

Например, в планировщике ComputationScheduler рабочий реализован с помощью ScheduledExecutorService размером в один поток.

image

Таким образом, мы имеем абстрактных рабочих, которые могут реализовывать любую схему параллельности. Такой подход дает много плюсов: модульность, гибкость, один API, различные реализации. Похожий подход мы видели в ExecutorService. К тому же, мы можем использовать планировщики отдельно от Observable.

Заключение


RxJava очень мощная библиотека, которую можно использовать самыми разными способами во множестве архитектур. Способы ее применения не ограничиваются уже существующими, поэтому всегда пробуйте адаптировать ее под себя. Однако помните о SOLID, DRY и других принципах разработки, а также не забывайте делиться с коллегами своим опытом применения. Надеюсь, что вы смогли почерпнуть из статьи что-то новое и интересное, до встречи!

  1. Причины, почему Netflix начали использовать ReactiveX
  2. Презентация RxJava интернет-сообществу
  3. Объяснение архитектуры VIPER и пример применения
  4. Объяснение ФРП от его создателя
  5. Разница между ФРП и реактивным программированием
  6. Рассуждение о ФРП
  7. Блог Conal Elliot о ФРП
Источник: habr.com
К списку статей
Опубликовано: 23.07.2020 18:11:16
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Блог компании auriga

Java

Разработка мобильных приложений

Разработка под android

Rxjava

Android

Viper

Backend

Mobile developement

Mobile design

Reactive programming

Functional reactive programming

Mvp

Observable

Категории

Последние комментарии

© 2006-2020, personeltest.ru