Что вы знаете о Schedulers в RxJS? Они скрывают от разработчиков работу с контекстом выполнения Observable. Как те эльфы-домовики из Гарри Поттера, которые выполняют всю черную работу в Хогвартсе, а о них никто даже и не слышал. Давайте исправим это и узнаем о них чуть больше.
Что такое Scheduler
Scheduler позволяет определить в каком контексте выполнения Observable будет доставлять нотификации до Observer. (вольный перевод документации)
Другими словами, Scheduler управляет очередностью и временем выполнения операций в Observable. Пример ниже поможет разобраться нам, как это работает.
import { of } from "rxjs";console.log("Start");of("Observable").subscribe(console.log);console.log("End");// Logs:// Start// Observable// End
Два console.log зажали Observable между собой. Код выполняется синхронно. Если мы хотим, чтобы наш Observable выполнялся асинхронно, нужно добавить оператор observeOn и внутрь него прокинуть нужный нам Scheduler.
import { asyncScheduler, of } from "rxjs";import { observeOn } from "rxjs/operators";console.log("Start");of("Observable") .pipe(observeOn(asyncScheduler)) .subscribe(console.log);console.log("End");// Logs:// Start// End// Observable
Теперь наш Observable отдает данные асинхронно. Отлично. Это похоже, если бы мы обернули его в setTimeout(, 0), не правда ли?
Как вы уже успели заметить, мы использовали asyncScheduler в нашем коде. Это один из легендарных Schedulers. Но их гораздо больше.
Типы Schedulers
Для понимания всей мощи Schedulers нужно знать, как работает Event Loop в JavaScript. Освежить знания можно с помощью этого видео или этой статьи на хабре.
Если кратко, то в браузере свой порядок выполнения кода:
-
Сначала выполняется синхронный код (callstack)
-
Дальше очередь микрозадач (Promise)
-
Потом очередь макрозадач (setTimeout, setInterval, XMLHttpRequest и т.д.).
-
Отдельно стоит очередь для задач, которые выполняются сразу перед следующим циклом перерисовки контента. (requestAnimationFrame)
В RxJS есть Scheduler на каждый из этих пунктов:
queueScheduler |
планирование синхронного кода |
asapScheduler |
планирование кода в очередь микрозадач |
asyncScheduler |
планирование кода в очередь макрозадач |
animationFrameScheduler |
планирование кода в очередь перед перерисовкой контента |
Еще существуют VirtualTimeScheduler иTestScheduler, которые используются для тестов. О них читайте здесь.
Взгляните на код ниже.
import { of, merge, asapScheduler,asyncScheduler,queueScheduler, animationFrameScheduler } from "rxjs";import { observeOn } from "rxjs/operators";const async$ = of("asyncScheduler").pipe(observeOn(asyncScheduler));const asap$ = of("asapScheduler").pipe(observeOn(asapScheduler));const queue$ = of("queueScheduler").pipe(observeOn(queueScheduler));const animationFrame$ = of("animationFrameScheduler").pipe( observeOn(animationFrameScheduler));merge(async$, asap$, queue$, animationFrame$).subscribe(console.log);console.log("synchronous code");// Logs:// queueScheduler// synchronous code// asapScheduler// animationFrameScheduler// asyncScheduler
Как вы видите, "queueScheduler" отработал синхронно, так как он перед "synchronous code". А "asapScheduler" раньше "asyncScheduler", потому что в нем используется очередь микрозадач.
Как использовать Schedulers
Scheduler используется с операторами observeOn и subscribeOn. Оба принимают в себя первым аргументом Scheduler, а вторым аргументом delay, который по умолчанию равен нулю.
import { of, asyncScheduler } from "rxjs";import { observeOn, subscribeOn } from "rxjs/operators";of("observeOn") .pipe(observeOn(asyncScheduler, 100)) .subscribe(console.log);of("subscribeOn") .pipe(subscribeOn(asyncScheduler, 50)) .subscribe(console.log);// Logs:// subscribeOn// observeOn
Различие их в том, что observeOn планирует в каком контексте будут выполняться методы observer next, error и complete выполняются в соответствующем с Scheduler контексте. А subscribeOn влияет на subscriber метод subscribe будет выполняться в другом контексте.
Интересный факт, что если задать delay не равный нулю в
observeOn/subscribeOn, то вне зависимости какой используется
Scheduler, будет использоваться asyncScheduler. Бессмысленный код
observeOn(animationFrameScheduler, 100)
.
До версии RxJS 6.5.0 можно было добавить Scheduler вторым аргументом для of, from, merge, range и т.д. В новых версиях RxJS это поведение deprecated, и необходимо использоватьфункцию scheduled для этого.
import { of, scheduled, asapScheduler } from 'rxjs';// DEPRECATED// of(2, asapScheduler).subscribe(console.log);scheduled(of('scheduled'), asapScheduler).subscribe(console.log);
Пример использования Scheduler
Мы не задумываемся о Schedulers при работе с RxJS, так как авторы библиотеки провели великолепную работу по абстрагированию этой логики. Но бывает, когда использование Scheduler будет органично, и вы должны быть к этому готовы. На моей практике был один такой случай. Меня попросили реализовать кэширование запросов отличающихся по индетификатору. Примерно такую функцию я написал:
const cache = new Map<number, any>();function get(id: number): Observable<any> { if (cache.has(id)) { return of(cache.get(id)); } return http.get(some-url\ + id).pipe( tap(data => { cache.set(id, data); }), );}
Код работает. Но есть один нюанс. В первый раз, когда использовали мою функцию, она возвращала значение асинхронно, а во второй раз - синхронно. Когда функция ведет себя так непресказуемо для вызывающего, может случится трагедия. Выходит, что я высвободил Залго.
Добавим scheduled с asyncScheduler в 4 строчке, чтобы исправить это.
return scheduled(of(cache.get(id)), asyncScheduler);
Теперь все работает предсказуемо. Залго низвержен туда, где был.
Заключение
Schedulers влияют на время и порядок выполнения задач. Четверть операторов RxJS использует их под капотом. С большой вероятностью на практике осведомленность о них не нужна. Но никогда не знаешь, в какой момент жизни пригодится Scheduler.
Я буду рад услышать о вашем опыте использования Schedulers в комментариях. Спасибо за внимание!