Русский
Русский
English
Статистика
Реклама

Async

Зоны в Dart большой брат следит за тобой

16.07.2020 12:18:27 | Автор: admin

Привет! Меня зовут Дима, я frontend-разработчик в компании Wrike. Клиентскую часть проекта мы пишем на Dart, однако работать с асинхронными операциями нам приходится не меньше, чем на других технологиях. Зоны один из удобных инструментов, который Dart для этого предоставляет. Недавно я начал разбирать эту тему, а сегодня планирую показать оставшиеся у меня примеры применения зон и неочевидные особенности их использования. Как обещал посмотрим на AngularDart.


Если хотите разобраться с базовыми возможностями зон, прочитайте мою первую статью.


image


NgZone и оптимизация процесса change detection


Представим рабочую ситуацию: на ревью спринта вы рассказываете про новую фичу, уверенно обходите стороной известные баги и показываете функциональность с лучшей стороны. Но через пару кликов самописный счетчик производительности фреймворка показывает over 9000 попыток перерисовать интерфейс. И это только за секунду!


После окончания ревью возникает непреодолимое желание исправить ситуацию. Скорее всего, первой идеей будет обмануть счетчик. Для этого надо разобраться, как он работает. Идем в код. Вероятно, мы увидим там эти строки:


class StandardPerformanceCounter {  final NgZone _zone;  StandardPerformanceCounter(this._zone) {    _zone.onMicrotaskEmpty.listen(_countPerformance);  }  // ...}

После дальнейшего исследования становится понятно, что это неспроста: Angular использует именно стрим onMicrotaskEmpty в корне каждого приложения для того, чтобы на каждый его event автоматически запускать процесс change detection:


class ApplicationRef extends ChangeDetectionHost {  ApplicationRef._(    this._ngZone, // ...  ) {    // ...    _onMicroSub = _ngZone.onMicrotaskEmpty.listen((_) {      _ngZone.runGuarded(tick);    });  }  // Start change detection  void tick() {    _changeDetectors.forEach((detector) {      detector.detectChanges();    });  }  // ...}

Похоже, надо разбираться с тем, что же такое NgZone, как она работает и фиксить приложение как положено. Смотрим под капот.


NgZone это не зона, а обертка над двумя другими зонами внешней (зона, в которой стартовало Angular приложение) и внутренней (зона, которую создал Angular и внутри которой он автоматически запускает все операции приложения). Обе сохраняются на этапе создания NgZone:


class NgZone {  NgZone._() {    _outerZone = Zone.current; // Save reference to current zone    _innerZone = _createInnerZone(      Zone.current,      handleUncaughtError: _onErrorWithoutLongStackTrace,    );  }  // Create Angular zone  Zone _createInnerZone(    Zone zone, // ...  ) {    return zone.fork(      specification: ZoneSpecification(        scheduleMicrotask: _scheduleMicrotask,        run: _run,        runUnary: _runUnary,        runBinary: _runBinary,        handleUncaughtError: handleUncaughtError,        createTimer: _createTimer,      ),      zoneValues: {_thisZoneKey: true, _anyZoneKey: true},    );  }  // ...}

Внутренняя зона принимает на себя много работы и использует разные особенности зон.


Для начала поверхностно напомню, зачем нужен change detection.



Так может выглядеть простая древовидная структура компонентов


Для построения интерфейса Angular использует компоненты, которые выстраиваются в древовидную структуру. В обычной ситуации после старта приложения компоненты гидрируются данными, выстраивают DOM дерево, ждут и сохраняют изменения в своих данных. При это сам Angular не узнает о существовании этих изменений тотчас: в удобные для него моменты он запускает процесс обхода дерева компонентов от корня по всем потенциально затронутым нодам в поисках измененных данных change detection. Если изменения произошли, то фреймворк запускает обновление соответствующего DOM поддерева.


Источников у этих изменений может быть море пользовательские события, онлайн нотификации, временные отсечки. Любой из них может повлиять на интерфейс, а значит надо проверить, не изменилось ли что-то в данных компонентов и не надо ли обновить интерфейс. Отсюда растет первая задача отследить все события.


Событий может быть очень много за короткий промежуток времени. Если Angular попытается отследить изменения после каждого события, то приложение резко просядет по производительности. Более того, реакции на события могут быть моментальными для пользователя, но асинхронными для потока выполнения.


Вспомним про event loop браузера:



Эту схему я позаимствовал из крутого доклада Джейка Арчибальда об event loop


Слева находится секция, в которой будет выполняться какой-то таск, а справа секция, в которой будут по очереди выполняться сначала запланированные с помощью requestAnimationFrame скрипты, потом произойдет калькуляция стилей, калькуляция лейаута и отрисовка. Скриптовые задачи мы можем выполнять только в желтых секциях.


А теперь попробуем залезть в голову к авторам Angular и понять, когда лучше всего выполнять detectChanges.


Проще всего это сделать после выполнения таски, но она может запланировать микротаски, которые, в свою очередь, могут изменить данные. Это приведет либо к неконсистентности, либо к еще одному пуску detectChanges. Не годится.


Интересно было бы отслеживать изменения в рамках requestAnimationFrame, но тут мне пришел в голову целый набор но:


  • Change detection на большом приложении может быть довольно долгим, из-за этого может вылететь много фреймов.
  • Скрипты, выполняемые в requestAnimationFrame, также могут запланировать микротаски, которые будут запускаться сразу же после выполнения скрипта и перед отрисовкой, последствия мы уже обсуждали.
  • Интерфейс после change detection может быть не полностью стабилен. Есть риск, что пользователь увидит незапланированную анимацию изменения интерфейса вместо ожидаемого конечного результата.

Остается еще один вариант запускать detectChanges после того, как будет выполнен скрипт и все его микротаски, если они есть. Это задача номер два.


Получается, что для магии Angular было бы неплохо:


  • Ловить все возможные пользовательские события.
  • Запускать change detection после того, как закончится выполнение скриптов в стеке и закончат выполнение все запланированные на тот момент микротаски.

Ловим все возможные пользовательские события. С этим прекрасно справляется та самая innerZone.


Посмотрим на нее еще раз:


class NgZone {  // ...  // Create Angular zone  Zone _createInnerZone(    Zone zone, // ...  ) {    return zone.fork(      specification: ZoneSpecification(        scheduleMicrotask: _scheduleMicrotask,        run: _run,        runUnary: _runUnary,        runBinary: _runBinary,        handleUncaughtError: handleUncaughtError,        createTimer: _createTimer,      ),      zoneValues: {_thisZoneKey: true, _anyZoneKey: true},    );  }  // ...}

В предыдущей статье мы уже разбирали, что Future выполняет свой коллбек в той зоне, в которой он был создан. Так как Angular при старте пытается создавать и выполнять все в своей внутренней зоне, то после комплита Future задача выполняется с помощью хендлера _run.


Вот как он выглядит:


class NgZone {  // ...  R _run<R>(Zone self, ZoneDelegate parent, Zone zone, R fn()) {    return parent.run(zone, () {      try {        _nesting++; // Count nested zone calls        if (_isStable) {          _isStable = false; // Set view may change          //         }        return fn();      } finally {        _nesting--;        _checkStable(); // Check we can try to start change detection      }    });  }  // ...}

C помощью семейства методов run* мы и ловим все пользовательские события, потому что после старта приложения изменения в нем, скорее всего, произойдут от асинхронных взаимодействий. Перед выполнением коллбека NgZone запоминает, что сейчас в приложении могут происходить изменения, и считает вложенность коллбеков. После выполнения коллбека зона вызывает метод _checkStable прямо в рамках основного потока, не планируя это на следующую итерацию event loop.


Запускаем change detection после того, как закончится выполнение скриптов в стеке и закончат выполнение все запланированные на тот момент микротаски. Второй важный элемент внутренней зоны scheduleMicrotask:


class NgZone {  // ...  void _scheduleMicrotask(Zone _, ZoneDelegate parent, Zone zone, void fn()) {    _pendingMicrotasks++; // Count scheduled microtasks    parent.scheduleMicrotask(zone, () {      try {        fn();      } finally {        _pendingMicrotasks--;        if (_pendingMicrotasks == 0) {          _checkStable(); // Check we can try to start change detection        }      }    });  }  // ...}

Эта функция отслеживает, когда закончат свою работу все микротаски. Работа похожа на run мы считаем, сколько было запланировано микротасок и сколько уже успело выполниться. Микротасок может быть запланировано сразу много, и они обязательно выполнятся до запуска следующего скрипта. Зона вызывает _checkStable в рамках последней микротаски, не планируя еще одну.


Наконец, посмотрим в тот метод, которым все заканчивается:


class NgZone {  // ...  void _checkStable() {    // Check task and microtasks are done    if (_nesting == 0 && !_hasPendingMicrotasks && !_isStable) {      try {        // ...        _onMicrotaskEmpty.add(null); // Notify change detection      } finally {        if (!_hasPendingMicrotasks) {          try {            runOutsideAngular(() {              _onTurnDone.add(null);            });          } finally {            _isStable = true; // Set view is done with changes          }        }      }    }  }  // ...}

Тут-то мы и добрались до того самого! Этот метод проверяет, есть ли еще вложенность или невыполненные микротаски. Если все завершено, он посылает событие через _onMicrotaskEmpty. Это и есть тот самый стрим, который синхронно запускает detectChanges! Дополнительно в конце проверяется, не создалось ли в момент работы change detection новых микротасок. Если все хорошо, NgZone считает вьюху стабильной и сообщает, что проход закончился.


Подытожим:


Angular старается выполнить все в NgZone. Каждый Future при комплите, каждый Stream при каждом событии и каждый Timer по истечении времени запустит run* или scheduleMicrotask, а значит и detectChanges.


Важно помнить, что это не все. Например, addEventListener на объекте Element также обязательно расскажет текущей зоне о запланированной работе, несмотря на то что это не стрим, не таймер и не фьючер. Еще один похожий пример сам по себе вызов _zone.run() точно также запустит detectChanges, ведь мы напрямую используем NgZone.


Этот процесс оптимизирован. Метод detectChanges запустится только один раз в самом конце того таска, который его триггернул, или в рамках самой последней микротаски, которая была запланирована в прошедшем скрипте. Change detection произойдет не в следующей итерации event loop, а в текущей.


Мы в проекте используем OnPush стратегию для change detection компонентов. Это позволяет нам сильно сэкономить на этой операции. Однако как бы ни был быстр холостой запуск detectChanges, события типа scroll и mouseMove могут запускать его очень часто. Я тестировал: 1000 таких вызовов в секунду могут съесть у пользователя 200мс времени. Зависит от многих условий, но есть над чем задуматься.


И раз уж это погружение в недра Angular началось с желания оптимизировать, то закончим мы парой забавных и не очень очевидных выводов из полученных знаний.


Stream и runOutsideAngular


Основной кейс runOutsideAngular относится как раз к ситуации, когда мы слушаем очень быстрый стрим, который хотим еще и фильтровать. Например, onMouseMove у объекта Element. Быстро посмотреть под капот стрима не получится, поскольку реализаций стримов в Dart уйма. Но в статье Zones написано простое и действенное правило:


Трансформации и другие коллбеки выполняются в той зоне, в которой стрим начали слушать.

Зона зависит от подписки. Где она создана, там и выполняется. Поэтому рекомендуется подписываться и фильтровать быстрый стрим вне зоны Angular:


// Part of AngularDart component classfinal NgZone _zone;final ChangeDetectorRef _detector;final Element _element;void onSomeLifecycleHook() {  _zone.runOutsideAngular(() {    _element.onMouseMove.where(filterEvent).listen((event) {      doWork(event);      _zone.run(_detector.markForCheck);    });  });}

Не очевидно тут вот что зачем же класть стрим вне зоны ангуляра, если он все равно фильтруется? Было бы лаконично и без этого:


// Part of AngularDart component classfinal Element _element;void onSomeLifecycleHook() {  _element.onMouseMove.where(filterEvent).listen(doWork);}

Проблема в том, что здесь мы делаем не одну подписку. Метод where при вызове возвращает стрим. И это не тот же стрим, это новый _WhereStream:


// Part of AngularDart component classfinal Element _element;void onSomeLifecycleHook() {  _element.onMouseMove // _ElementEventStreamImpl      .where(filterEvent) // _WhereStream      .listen(doWork);}

Когда мы подписываемся на _WhereStream, он тут же подписывается на родительский стрим и так далее до самого источника. И все эти подписки будут созданы в текущей зоне, а значит detectChanges будет срабатывать столько раз, сколько срабатывает самый быстрый стрим в цепочке. Даже если мы создали всю цепочку в другой зоне.


Контроль зоны для package:redux_epics


Мы часто используем в наших вьюшках пакет redux_epics. Под капотом он очень активно использует стримы и принуждает и нас их использовать. Бывает, что экшены, которые мы диспатчим, могут не повлиять на наше состояние. К тому же наш change detection запустится в любом случае после того, как action отработает и что-то изменит, не стоит пинать его лишний раз. Поэтому, чтобы избежать ложных срабатываний, нужно выполнять эпики вне зоны ангуляра. Как это сделать?


Раз уж все действия стрима выполняются в зоне, внутри которой мы на него подписались, стоит поискать метод listen в коде redux_epics:


class EpicMiddleware<State> extends MiddlewareClass<State> {  bool _isSubscribed = false;  // ...  @override  void call(Store<State> store, dynamic action, NextDispatcher next) {    // Init on first call    if (!_isSubscribed) {      _epics.stream          .switchMap((epic) => epic(_actions.stream, EpicStore(store)))          .listen(store.dispatch); // Forward all stream actions to dispatch      _isSubscribed = true; // Set middleware is initialized    }    next(action);    // ...  }}

Мы его найдем в методе call. Значит подписка создается в момент вызова мидлвары (в данном случае первого вызова), а это происходит при диспатче экшена.


Отсюда простой вывод первый action нужно диспатчить вне зоны ангуляра. Например, в корневом компоненте после создания стора:


// Part of AngularDart component classfinal NgZone _zone;final AppDispatcher _element;void onInit() {  _zone.runOutsideAngular(() {    // ...    _dispatcher.dispatch(const InitApp());  });}

А если диспатчить нечего, то и null сойдет:


// Part of AngularDart component classfinal NgZone _zone;final AppDispatcher _element;void onInit() {  _zone.runOutsideAngular(() {    // ...    _dispatcher.dispatch(null);  });}

После этого стримы эпиков будут выполняться вне зоны ангуляра, что избавит от части паразитных запусков change detection.


Многократный change detection для нативных событий


Вот теперь совсем забавный трюк. Допустим, у нас есть компонент родитель, в нем есть компонент дочка, в дочке есть элемент button:


<!-- parent-component --><child-component  (click)="handleClick()"></child-component> <!-- child-component --><button  type="button"  (click)="handleClick()">  Click</button>

В каждом из этих компонентов мы слушаем нативное событие click. Оно прорастет в родителя благодаря всплыванию. Подвох в том, что change detection здесь запустится дважды. Дело в том, что в шаблоне event listeners компилируются не как подписка на стрим, а как близкий к нативному addEventListener:


_el_0.addEventListener('click', eventHandler(_handleClick_0));

Так произойдет в обоих компонентах. А значит мы переносим сюда и интересную особенность addEventListener: когда пользователь нажмет на кнопку, то браузер создаст один таск, который в рамках одной итерации event loop породит столько выполнений скриптов, сколько подписок будет затронуто всплыванием события. И после каждого скрипта будут сразу выполняться все порожденные им микротаски, а вместе с ними и detectChanges.


Поэтому в Angular выгоднее будет не рассчитывать на всплытие ивента, а сделать в дочернем компоненте Output:


<!-- parent-component --><child-component  (buttonPress)="handleButtonPress()"></child-component> <!-- child-component --><button  type="button"  (click)="handleClick()">  Click</button>

Такой вариант запустит change detection единожды, поскольку Output это стрим, а даже асинхронный стрим использует микротаски, которые, как мы уже знаем, NgZone хорошо отслеживает.


Это странное поведение всплывающих событий отлично описано в статье о микротасках все того же Джейка Арчибальда.


Как пройти в библиотеку


Зоны это мощный инструмент, который решает специальные задачи и зачастую упрощает интерфейс. Но при этом ни один из показанных выше примеров не является чем-то написанным нами в нашем проекте, все примеры из сторонних библиотек.


Явное лучше, чем неявное. Коду приложения лучше быть легким, четким и понятным. Зона это инструмент, который выглядит как магия, что приемлемо в хорошо протестированных сообществом библиотеках или в собственноручно разработанных и хорошо протестированных утилитах. Но надо быть осторожными, внедряя такие инструменты в код, с которым мы ежедневно работаем.


Закончить хотелось бы на небольшом предостережении. Зоны не очень хорошо задокументированная функциональность, и периодически с их использованием связаны баги, которые просто так не пофиксить. Вот, например, issue, который мы завели по следам одного из них. Вкратце о нем расскажу.


При создании Future сохраняет текущую зону, это дает нам некоторый контроль. Но оказалось, что в Dart SDK есть как минимум два заранее созданных и закомпличеных Future с сохраненной в них root зоной:


abstract class Future<T> {  final Future<Null> _nullFuture = Future<Null>.zoneValue(null, Zone.root);  final Future<bool> _falseFuture = Future<bool>.zoneValue(false, Zone.root);  // ...}

Еще раз напомню, что любой Future обязательно должен выполнять запланированные коллбеки в микротаске. Если мы попытаемся к Future пристыковать задачу через метод then, то он, как минимум, выполнит:


  • zone.scheduleMicrotask;
  • zone.registerUnaryCallback;
  • zone.runUnary.

Мы разбирали, что коллбек гарантированно будет регистрироваться и выполняться в той зоне, в которой его передали в метод then. А вот со scheduleMicrotask все интереснее.


У Future существует оптимизация если на один Future повешено несколько коллбеков, то он постарается выполнить их все в одной микротаске:


// Callbacks doFirstWork and doSecondWork will be called in same microtaskvoid doWork(Future future) {  future.then(doFirstWork).then(doSecondWork);}

На оба коллбека из этого примера придется только один вызов scheduleMicrotask. Круто. Но бывает, что коллбеки были повешены в разных зонах:


void doWork(Future future) {  runZoned(() {    // First zone    future.then(doFirstWork);  }, zoneValues: {#isFirst: true});  runZoned(() {    // Second zone    future.then(doSecondWork);  }, zoneValues: {#isFirst: false});}

В этом случае они все еще будут выполнены в одной микротаске. Вопрос на засыпку какая зона должна запланировать этот микротаск? Первая? Вторая? Ребята из Dart выбрали, что планировать это всегда будет зона, которая записана в изначальный Future:


// Zone that is saved in [future] argument will schedule microtaskvoid doWork(Future future) {  runZoned(() {    // First zone    future.then(doFirstWork);  }, zoneValues: {#isFirst: true});  runZoned(() {    // Second zone    future.then(doSecondWork);  }, zoneValues: {#isFirst: false});}

Это значит, что, если мы запланируем выполнение коллбека для заранее созданного и закомпличенного _nullFuture, то scheduleMicrotask будет вызван не из текущей зоны, а из root зоны:


final future = Future._nullFuture;final currentZone = Zone.current;future.then(doWork);// currentZone.registerUnaryCallback(...);// _rootZone.scheduleMicrotask(...);// currentZone.runUnary(...);

Текущая зона так и не узнает, что была запланирована микротаска. Такое поведение легко может сломать рассмотренный ранее FakeAsync: он не сможет выполнить синхронно то, о чем понятия не имеет.


Можно подумать, что _nullFuture никогда наружу не вылезет, но:


final controller = StreamController<void>(sync: true);final subscription = controller.stream.listen(null);subscription.cancel(); // Returns Future._nullFuture

Не так уж и сложно его достать, причем из совершенно неожиданного места. Отсюда и баги с FakeAsync.


Нам бы пригодилась помощь в дискуссии об этом странном поведении, заходите в issue, вместе победим! К тому же там есть дополнительная информация от контрибьюторов о том, как еще зоны взаимодействуют с Future и Stream, не упустите!


У меня все. Буду рад ответить на вопросы!

Подробнее..

Асинхронное взаимодействие. Брокеры сообщений. Apache Kafka

24.12.2020 18:19:54 | Автор: admin
Данная публикация предназначена для тех, кто интересуется устройством распределенных систем, брокерами сообщений и Apache Kafka.
Здесь вы не найдете эксклюзивного материала или лайфхаков, задача этой статьи заложить фундамент и рассказать о внутреннем устройстве упомянутого брокера. Таким образом, в следующих публикациях мы сможем делать ссылки на данную статью, рассказывая о более узкоспециализированных темах.


Привет! Меня зовут Дмитрий Шеламов и я работаю в Vivid.Money на должности backend-разработчика в отделе Customer Care. Наша компания европейский стартап, который создает и развивает сервис интернет-банкинга для стран Европы. Это амбициозная задача, а значит и ее техническая реализация требует продуманной инфраструктуры, способной выдерживать высокие нагрузки и масштабироваться согласно требованиям бизнеса.

В основе проекта лежит микросервисная архитектура, которая включает в себя десятки сервисов на разных языках. В их числе Scala, Java, Kotlin, Python и Go. На последнем я пишу код, поэтому практические примеры, приведенные в этой серии статей, будут задействовать по большей части Go (и немного docker-compose).

Работа с микросервисами имеет свои особенности, одна из которых организация коммуникаций между сервисами. Модель взаимодействия в этих коммуникациях бывает синхронной или асинхронной и может оказать существенное влияние на производительность и отказоустойчивость системы в целом.

Асинхронное взаимодействие


Итак, представим что у нас есть два микросервиса (А и Б). Будем считать, что коммуникация между ними осуществляется через API и они ничего не знают о внутренней реализации друг друга, как и предписывает микросервисный подход. Формат передаваемых между ними данных заранее оговорен.

image

Задача перед нами стоит следующая: нам нужно организовать передачу данных от одного приложения к другому и, желательно, с минимальными задержками.
В самом простом случае поставленная задача достигается синхронным взаимодействием, когда А отправляет приложению Б запрос, после чего сервис Б его обрабатывает и, в зависимости от того, успешно или не успешно был обработан запрос, отправляет некоторый ответ сервису А, который этот ответ ожидает.
Если же ответ на запрос так и не был получен (например, Б рвет соединение до отправки ответа или А отваливается по таймауту), сервис А может повторить свой запрос к Б.

С одной стороны, такая модель взаимодействия дает определенность статуса доставки данных для каждого запроса, когда отправитель точно знает, были ли получены данные получателем и какие дальнейшие действия ему необходимо делать в зависимости от ответа.
С другой стороны, плата за это ожидание. После отправки запроса сервис А (или поток, в котором выполняется запрос) блокируется до того момента, пока не получит ответ или не сочтет запрос неудавшимся согласно своей внутренней логике, после чего примет дальнейшие действия.

Проблема не только в том, что ожидание и простой имеют место быть, задержки в сетевом взаимодействии неизбежны. Основная проблема заключается в непредсказуемости этой задержки. Участники коммуникации в микросервисном подходе не знают подробностей реализации друг друга, поэтому для запрашивающей стороны не всегда очевидно, обрабатывается ли ее запрос штатно или нужно переотправить данные.

Все, что остается А при такой модели взаимодействия это просто ждать. Может быть наносекунду, а может быть час. И эта цифра вполне реальна в том случае, если Б в процессе обработки данных выполняет какие-либо тяжеловесные операции, вроде обработки видео.

Возможно, вам проблема не показалась существенной одна железка ждет пока другая ответит, велика ли потеря?
Чтобы сделать эту проблему более личной, представим, что сервис А это приложение, запущенное на вашем телефоне, и пока оно ожидает ответ от Б, вы видите на экране анимацию загрузки. Вы не можете продолжить пользоваться приложением до тех пор, пока сервис Б не ответит, и вынуждены ждать. Неизвестное количество времени. При том, что ваше время гораздо ценнее, чем время работы куска кода.

Подобные шероховатости решаются следующим образом вы разделяете участников взаимодействия на два лагеря: одни не могут работать быстрее, как бы вы их ни оптимизировали (обработка видео), а другие не могут ждать дольше определенного времени (интерфейс приложения на вашем телефоне).
Затем вы заменяете cинхронное взаимодействие между ними (когда одна часть вынуждена ждать другую, чтобы удостовериться, что данные были доставлены и обработаны сервисом-получателем) на асинхронное, то есть модель работы по принципу отправил и забыл в этом случае сервис А продолжит свою работу, не дожидаясь ответа от Б.

Но как в этом случае гарантировать то, что передача прошла успешно? Вы же не можете, допустим, загрузив видео на видеохостинг, вывести пользователю сообщение: ваше видео может быть обрабатывается, а может быть и нет, потому что сервис, занимающийся загрузкой видео, не получил от сервиса-обработчика подтверждение, что видео дошло до него без происшествий.

В качестве одного из решений данной проблемы мы можем добавить между сервисами А и Б прослойку, которая будет выступать временным хранилищем и гарантом доставки данных в удобном для отправителя и получателя темпе. Таким образом мы сможем расцепить сервисы, синхронное взаимодействие которых потенциально может быть проблемным:

  • Данные, которые теряются при аварийном завершении сервиса-получателя теперь могут быть снова получены из промежуточного хранилища, в то время как сервис-отправитель продолжает выполнять свою работу. Таким образом мы получаем механизм гарантии доставки;
  • Эта прослойка также защищает получателей от скачков нагрузки, ведь получателю выдаются данные по мере их обработки, а не по мере их поступления;
  • Запросы на выполнение тяжеловесных операций (таких как рендеринг видео) теперь могут быть переданы через эту прослойку, обеспечивая меньшую связность между быстрыми и медленными частями приложения.


Под вышеобозначенные требования вполне подходит и обычная СУБД. Данные в ней можно хранить в течении продолжительного времени, не беспокоясь о потере информации. Также исключена и перегрузка получателей, ведь они вольны сами выбрать темп и объемы чтения предназначенных для них записей. Подтверждение же обработки можно реализовать, помечая прочитанные записи в соответствующих таблицах.

Однако выбор СУБД в качестве инструмента для обмена данными может привести к проблемам с производительностью с ростом нагрузки. Причина в том, что большинство баз данных не предназначены для такого сценария использования. Также во многих СУБД отсутствует возможность разделения подключенных клиентов на получателей и отправителей (Pub/Sub) в этом случае, логика доставки данных должна быть реализована на клиентской стороне.
Вероятно, нам нужно нечто более узкоспециализированное, чем база данных.

Брокеры сообщений


Брокер сообщений (очередь сообщений) это отдельный сервис, который отвечает за хранение и доставку данных от сервисов-отправителей к сервисам-получателям с помощью модели Pub/Sub.
Эта модель предполагает, что асинхронное взаимодействие осуществляется согласно следующей логике двух ролей:

  • Publishers публикуют новую информацию в виде сгруппированных по некоторому атрибуту сообщений;
  • Subscribers подписываются на потоки сообщений с определенными атрибутами и обрабатывают их.

Группирующим сообщения аттрибутом выступает очередь, которая нужна, чтобы разделять потоки данных, таким образом, получатели могут подписываться только на те группы сообщений, которые их интересуют.
По аналогии с подписками на различных контент-платформах подписавшись на определенного автора, вы можете фильтровать контент, выбирая смотреть только тот, который вам интересен.

image

Очередь можно представить как канал связи, натянутый между писателем и читателем. Писатели кладут сообщения в очередь, после чего они проталкиваются (push) читателям, которые подписаны на эту очередь. Один читатель получает одно сообщение за раз, после чего оно становится недоступно другим читателям
Под сообщением же подразумевается единица данных, обычно состоящая из тела сообщения и метаданных брокера.

В общем случае, тело представляет из себя набор байт определенного формата.
Получатель обязательно должен знать этот формат, чтобы после получения сообщения иметь возможность десериализовать его тело для дальнейшей обработки.
Использовать можно любой удобный формат, однако, важно помнить об обратной совместимости, которую поддерживают, например, бинарный Protobuf и фреймворк Apache Avro.

По такому принципу работает большинство брокеров сообщений, построенных на AMQP (Advanced Message Queuing Protocol) протоколе, который описывает стандарт отказоустойчивого обмена сообщениями посредством очередей.
Данный подход обеспечивает нам несколько важных преимуществ:

  • Слабая связанность. Она достигается за счет асинхронной передачи сообщений: то есть, отправитель скидывает данные и продолжает работать, не дожидаясь ответа от получателя, а получатель вычитывает и обрабатывает сообщения, когда удобно ему, а не когда они были отправлены. В данном случае очередь можно сравнить с почтовым ящиком, в который почтальон кладет ваши письма, а вы их забираете, когда удобно вам.
  • Масштабируемость. Если сообщения появляются в очереди быстрее, чем консьюмер успевает их обрабатывать (речь идет не о пиковых нагрузках, а о стабильном разрыве между скоростью записи и обработки), мы можем запустить несколько экземпляров приложения-консьюмера и подписать их на одну очередь.
    Этот подход называется горизонтальным масштабированием, а экземпляры одного сервиса принято называть репликами. Реплики сервиса-консьюмера будут читать сообщения из одной очереди и обрабатывать их независимо друг от друга.
  • Эластичность. Наличие между приложениями такой прослойки, как очередь, помогает справляться с пиковыми нагрузками: в этом случае очередь будет выступать буфером, в котором сообщения будут копиться и по мере возможности считываться консьюмером, вместо того, чтобы ронять приложение-получатель, отправляя данные ему напрямую.
  • Гарантии доставки. Большинство брокеров предоставляют гарантии at least once и at most once.


At most once исключает повторную обработку сообщений, однако допускает их потерю. В этом случае брокер будет доставлять сообщения получателям по принципу отправил и забыл. Если получатель не смог по какой-то причине обработать сообщение с первой попытки, брокер не будет осуществлять переотправку.

At least once, напротив, гарантирует получение сообщения получателем, однако при этом есть вероятность повторной обработки одних и тех же сообщений.
Зачастую эта гарантия достигается с помощью механизма Ack/Nack (acknowledgement/negative acknowledgement), который предписывает совершать переотправку сообщения, если получатель по какой-то причине не смог его обработать.
Таким образом, для каждого отправленного брокером (но еще не обработанного) сообщения существует три итоговых состояния получатель вернул Ack (успешная обработка), вернул Nack (неуспешная обработка) или разорвал соединение.
Последние два сценария приводят в переотправке сообщения и повторной обработке.

Однако брокер может произвести повторную отправку и при успешной обработке сообщения получателем. Например, если получатель обработал сообщение, но завершил свою работу, не отправив сигнал Ack брокеру.
В этом случае брокер снова положит сообщение в очередь, после чего оно будет обработано повторно, что может привести к ошибкам и порче данных, если разработчик не предусмотрел механизм устранения дублей на стороне получателя.

Стоит отметить, что существует еще одна гарантия доставки, которая называется exactly once. Ее трудно достичь в распределенных системах, но при этом она же является наиболее желаемой.
В этом плане, Apache Kafka, о которой мы будем говорить далее, выгодно выделяется на фоне многих доступных на рынке решений. Начиная с версии 0.11, Kafka предоставляет гарантию доставки exactly once в пределах кластера и транзакций, в то время как AMQP-брокеры таких гарантий предоставить не могут.
Транзакции в Кафке тема для отдельной публикации, сегодня же мы начнем со знакомства с Apache Kafka.

Apache Kafka


Мне кажется, что будет полезно для понимания начать рассказ о Кафке со схематичного изображения устройства кластера.

image

Отдельный сервер Кафки именуется брокером. Брокеры образуют собой кластер, в котором один из этих брокеров выступает контроллером, берущим на себя некоторые административные операции (помечен красным).

За выбор брокера-контроллера, в свою очередь, отвечает отдельный сервис ZooKeeper, который также осуществляет service discovery брокеров, хранит конфигурации и принимает участие в распределении новых читателей по брокерам и в большинстве случаев хранит информацию о последнем прочитанном сообщении для каждого из читателей.
Это важный момент, изучение которого требует опуститься на уровень ниже и рассмотреть, как отдельный брокер устроен внутри.

Commit log


Структура данных, лежащая в основе Kafka, называется commit log или журнал фиксации изменений.

image

Новые элементы, добавляемые в commit log, помещаются строго в конец, и их порядок после этого не меняется, благодаря чему в каждом отдельном журнале элементы всегда расположены в порядке их добавления.

Свойство упорядоченности журнала фиксаций позволяет использовать его, например, для репликации по принципу eventual consistency между репликами БД: в них хранят журнал изменений, производимых над данными в мастер-ноде, последовательное применение которых на слейв-нодах позволяет привести данные в них к согласованному с мастером виду.
В Кафке эти журналы называются партициями, а данные, хранимые в них, называются сообщениями.
Что такое сообщение? Это основная единица данных в Kafka, представляющая из себя просто набор байт, в котором вы можете передавать произвольную информацию ее содержимое и структура не имеют значения для Kafka.
Сообщение может содержать в себе ключ, так же представляющий из себя набор байт. Ключ позволяет получить больше контроля над механизмом распределения сообщений по партициям.

Партиции и топики


Почему это может быть важно? Дело в том, что партиция не является аналогом очереди в Кафке, как может показаться на первый взгляд.
Я напомню, что формально очередь сообщений это средство для группирования и управления потоками сообщений, позволяющее определенным читателям подписываться только на определенные потоки данных.

image

Так вот в Кафке функцию очереди выполняет не партиция, а topic. Он нужен для объединения нескольких партиций в общий поток. Сами же партиции, как мы сказали ранее, хранят сообщения в упорядоченном виде согласно структуре данных commit log.
Таким образом, сообщение, относящееся к одному топику, может хранится в двух разных партициях, из которых читатели могут вытаскивать их по запросу.

Следовательно, единицей параллелизма в Кафке выступает не топик (или очередь в AMQP брокерах), а партиция. За счет этого Кафка может обрабатывать разные сообщения, относящиеся к одному топику, на нескольких брокерах одновременно, а также реплицировать не весь топик целиком, а только отдельные партиции, предоставляя дополнительную гибкость и возможности для масштабирования в сравнении с AMQP брокерами.

Pull и Push


Обратите внимание, что я не случайно использовал слово вытаскивает по отношению к читателю.
В описанных ранее брокерах доставка сообщений осуществляется путем их проталкивания (push) получателям через условную трубу в виде очереди.
В Кафке процесса доставки как такового нет: каждый читатель сам ответственен за вытягивание (pull) сообщений из партиций, которые он читает.

image

Производители, формируя сообщения, прикрепляют к нему ключ и номер партиции. Номер партиции может быть выбран рандомно (round-robin), если у сообщения отсутствует ключ.

Если вам нужен больший контроль, к сообщению можно прикрепить ключ, а затем использовать hash-функцию или написать свой алгоритм, по которому будет выбираться партиция для сообщения. После формирования, производитель отправляет сообщение в Кафку, которая сохраняет его на диск, помечая, к какой партиции оно относится.

Каждый получатель закреплен за определенной партицией (или за несколькими партициями) в интересующем его топике, и при появлении нового сообщения получает сигнал на вычитывание следующего элемента в commit log, при этом отмечая, какое последнее сообщение он прочитал. Таким образом при переподключении он будет знать, какое сообщение ему вычитать следующим.

Какие преимущества имеет данный подход?


  • Персистентность. В классических брокерах сообщение хранится в памяти брокера ровно до того момента, как брокер получит сигнал об успешной обработке сообщения читателем, который это сообщение вытащил из очереди. В Кафке же сообщения хранятся столько, сколько нужно (в зависимости от Retention Policy, об этом позднее), а значит из одной партиции одновременно могут читать сообщения несколько получателей.
  • Message Replay. Читатели могут перечитывать сообщения сколько угодно раз, начиная с произвольного места в партиции. Это может быть полезно, например, для восстановления данных на стороне читателя при потере части изменений в БД.
  • Упорядоченность. Она гарантируется в том числе потому, что нет механизма переотправки (в силу ненадобности) в обычных брокерах в процессе доставки переотправлямые сообщения постоянно перетасовываются в очереди, так как они закидываются в нее снова после каждой неудачной попытки их обработать.
  • Чтение и запись пачками. Читатель может читать сообщения пачками (batch) из одной партиции, а не по отдельности, как это происходит с обычными брокерами. Это бывает полезно для уменьшения сетевой задержки: при передаче большого количества сообщений (1кк и выше), гонять по сети каждое сообщение отдельно становится дорого.


Недостатки


К недостаткам данного подхода можно отнести работу с проблемными сообщениями. В отличие от классических брокеров, битые сообщения (которые не удается обработать с учетом существующей логики получателя или из-за проблем с десериализацей) нельзя бесконечно перезакидывать в очередь, пока получатель не научится их корректно обрабатывать.
В Кафке по умолчанию вычитывание сообщений из партиции останавливается, когда получатель доходит до битого сообщения, и до тех пор, пока оно не будет пропущено и закинуто в карантинную очередь (также именуемой dead letter queue) для последующей обработки, чтение партиции продолжить не получится.

Также в Кафке сложнее (в сравнении с AMQP-брокерами) реализовать приоритет сообщений. Это напрямую вытекает из того факта, что сообщения в партициях хранятся и читаются строго в порядке их добавления. Один из способов обойти данное ограничение в Кафке создать нескольких топиков под сообщения с разным приоритетом (отличаться топики будут только названием), например, events_low, events_medium, events_high, а затем реализовать логику приоритетного чтения перечисленных топиков на стороне приложения-консьюмера.

Еще один недостаток данного подхода связан тем, что необходимо вести учет последнего прочитанного сообщения в партиции каждым из читателей.
В силу простоты структуры партиций, эта информация представлена в виде целочисленного значения, именуемого offset (смещение). Оффсет позволяет определить, какое сообщение в данный момент читает каждый из читателей. Ближайшая аналогия оффсета это индекс элемента в массиве, а процесс чтения похож на проход по массиву в цикле с использованием итератора в качестве индекса элемента.
Однако этот недостаток нивелируется за счет того, что Kafka, начиная с версии 0.9, хранит оффсеты по каждому пользователю в специальном топике __consumer_offsets (до версии 0.9 оффсеты хранились в ZooKeeper).
К тому же, вести учет оффсетов можно непосредственно на стороне получателей.

image

Также усложняется и масштабирование: напомню, что в AMQP брокерах для того, чтобы ускорить обработку потока сообщений, нужно просто добавить несколько экземпляров сервиса-читателя и подписать их на одну очередь, при этом не требуется вносить никаких изменений в конфигурации самого брокера.

Однако в Кафке масштабирование происходит несколько сложнее, чем в AMQP брокерах.
Например, если вы добавите еще один экземпляр читателя и натравите его на ту же партицию, вы получите нулевой КПД, так как в этом случае оба экземпляра будут читать один и тот же набор данных.
Поэтому базовое правило масштабирования Кафки количество конкурентных читателей (то бишь группа сервисов, реализующих одинаковую логику обработки (реплик)) топика не должно превышать количество партиций в этом топике, иначе какая-то пара читателей будут обрабатывать одинаковый набор данных.

Consumer Group


Чтобы избежать ситуации с чтением одной партиции конкурентными читателями, в Кафке принято объединять несколько реплик одного сервиса в consumer Group, в рамках которого Zookeeper будет назначать одной партиции не более одного читателя.

Так как читатели привязываются непосредственно к партиции (при этом читатель обычно ничего не знает о количестве партиций в топике), ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, чтобы каждая партиция имела одного и только одного читателя.
Читатель обозначает свою Consumer Group при подключении к Kafka.

image

В то же время ничего не мешает вам повесить на одну партицию несколько читателей с разной логикой обработки. Например вы храните в топике список событий по действиям пользователей и хотите использовать эти события для формирования нескольких представлений одних и тех же данных (например для бизнес-аналитиков, продуктовых-аналитиков, системных-аналитиков и пакета Яровой) и последующей отправкой их в соответствующие хранилища.

Но здесь мы можем столкнуться с другой проблемой, порожденной тем, что Кафка использует структуру из топиков и партиций. Я напомню, что Кафка не гарантирует упорядоченность сообщений в рамках топика, только в рамках партиции, что может оказаться критичным, например, при формировании отчетов о действиях по пользователю и отправке их в хранилище as is.

image

Чтобы решить эту проблему, мы можем пойти от обратного: если все события, относящиеся к одной сущности (например, все действия относящиеся к одному user_id), будут всегда добавляться в одну и ту же партицию, они будут упорядочены в рамках топика просто потому, что находятся в одной партиции, порядок внутри которой гарантирован Кафкой.
Для этого нам и нужен ключ у сообщений: например, если мы будем использовать для выбора партиции, в которую будет добавлено сообщение, алгоритм, вычисляющий хэш от ключа, то сообщения с одинаковым ключом будут гарантированно попадать в одну партицию, а значит и вытаскивать получатель сообщения с одинаковым ключом в порядке их добавления в топик.
В кейсе с потоком событий о действиях пользователей ключом партицирования может выступать user_id.

Retention Policy


Теперь пришло время поговорить о Retention Policy.
Это настройка, которая отвечает за удаление сообщений с диска при превышении пороговых значений даты добавления (Time Based Retention Policy) или занимаемого на диске пространства (Size Based Retention Policy).

  • Если вы настроите TBRP на 7 суток, то все сообщения старше 7 суток будут помечаться для последующего удаления. Иными словами, эта настройка гарантирует, что в каждый момент времени будут доступны для чтения сообщения младше порогового возраста. Можно задавать в часах, минутах и милисекундах.
  • SBRP работает аналогичным образом: при превышении порога занимаемого дискового пространства, сообщения будут помечаться для удаления с конца (более старые). Нужно иметь в виду: так как удаление сообщений происходит не мгновенно, занимаемый объем диска всегда будет чуть больше указанного в настройке. Задается в байтах.


Retention Policy можно настроить как для всего кластера, так и для отдельных топиков: например, сообщения в топике для отслеживания деиствии пользователеи можно хранить несколько днеи, в то время как пуши в течении нескольких часов. Удаляя данные согласно их актуальности, мы экономим место не диске, что может быть важно при выборе SSD в качестве основного дискового хранилища.

Compaction Policy


Еще одним способом оптимизации объема, занимаемого на диске, может быть использование Compaction Policy эта настройка позволяет хранить только последнее сообщение по каждому ключу, удаляя все предыдущие сообщения. Это может быть полезно, когда нас интересует только последнее изменение.

Сценарии использования Kafka


  • Отслеживание действий пользователей на клиентской части. При этом логгируемая информация может быть самой разной: от списка просмотренных страниц до щелчков мыши. Сообщения о действиях публикуются в один или несколько топиков, где потребителем может выступать, например, хранилище аналитических данных (Clickhouse можно подписать непосредственно на топик Кафки!) для дальнейшего построения отчетов или рекомендательных систем.
    В Customer Care отделе Vivid.Money мы используем топик Кафки для доставки в аналитическое хранилище логов о действиях операторов в нашей CRM.
  • Обмен сообщениями. Кафка может выступать этаким единым интерфейсом для отправки различных уведомлений, пушей или электронных писем во всем проекте. Любой сервис может подключиться к Кафке и отправить сообщение в определенный топик для уведомлений, из которого на той стороне сервис-консьюмер (имеющий доступ к контактной информации клиентов) его считает, преобразует в формат пригодный для отправки нотификации непосредственно клиенту, и осуществит фактическую отправку.
    Благодаря этому мы можем отправить пуш буквально из любой части нашей инфраструктуры, без необходимости получения контактных данных пользователя (и его предпочтений по способу связи) в инициирующем отправку нотификации сервисе. В свою свою очередь, успешно получив сообщение, Кафка гарантирует то, что оно будет доставлено клиенту, даже если на стороне сервиса нотификаций возникли неполадки.
  • Мониторинг. Через топики кафки можно организовать сбор и агрегацию логов и метрик для их централизованной обработки, используя ее как транспорт.
  • Журнал фиксации (commit log). Можно дублировать в топик транзакции БД, чтобы сервисы-потребители синхронизировали состояние связанных данных уже в своих базах/сторонних системах.
    Опять же, долгосрочное хранение сообщений позволяет выступать Кафке этаким буфером для изменений, который позволяет переиграть изменения из топика Кафки для приведения данных на стороне получателя к согласованному виду в случае сбоев приложений получателей или повреждению данных в их БД.
    По такому принципу у нас в Customer Care организована синхронизация данных профиля клиента в используемых нами CRM-системах с изменениями данных пользователей в наших внутренних базах.


Подытожим основные преимущества Kafka


  • В один топик может писать один или несколько производителей идеально для агрегирования данных из большого количества источников, что становится особенно полезно при использовании Кафки в качестве системы доставки сообщений в микросервисной архитектуре;
  • Несколько потребителей с учетом особенностей механизма получения сообщений (pull) один и тот же поток сообщений может читать несколько потребителей, не мешая при этом друг другу.
    При этом конкурентных читателей (например, реплики одного сервиса) можно объединить в Consumer Group, а ZooKeeper, в свою очередь, будет следить, чтобы каждая партиция одновременно читалась не более, чем одним участником каждой группы;
  • Хранение данных на диске в течение длительного времени позволяет не беспокоится о потере данных при резком росте нагрузки. Кафка, будучи своего рода буфером, компенсирует отставание потребителей, позволяя накапливать в себе сообщения до нормализации нагрузки или масштабирования консьюмеров. Также обеспечивается гибкая конфигурация, где отдельные потоки данных (топики) хранятся на диске с разным сроком;
  • Хорошо масштабируется, засчет меньшей, в сравнении с AMQP брокерами, единицей параллелизма партицией. Разные партиции могут храниться в разных брокерах, обеспечивая дополнительную гибкость при горизонтальном масштабировании;
  • Быстродействие. В силу простоты механизма, при которой процесса доставки нет как такового, а процесс передачи данных представляет из себя запись-хранение-выдача, Кафка обладает очень большой пропускной способностью она исчисляется миллионами сообщений в секунду.
Подробнее..

Собираем данные AlphaVantage с Faust. Часть 1. Подготовка и введение

20.09.2020 14:22:27 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть I Введение

20.09.2020 16:05:00 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть II Агенты и Команды

23.09.2020 04:06:02 | Автор: admin

Оглавление

  1. Часть I: Введение

  2. Часть II: Агенты и Команды

Что мы тут делаем?

Итак-итак, вторая часть. Как и писалось ранее, в ней мы сделаем следующее:

  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.

  2. Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.

Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи - для ручного пуша сообщения в топик, за которым следит агент.

Подготовка

Клиент AlphaVantage

Для начала, напишем небольшой aiohttp клиентик для запросов на alphavantage.

alphavantage.py

Spoiler
import urllib.parse as urlparsefrom io import StringIOfrom typing import Any, Dict, List, Unionimport aiohttpimport pandas as pdimport stringcasefrom loguru import loggerfrom horton.config import API_ENDPOINTclass AlphaVantageClient:    def __init__(        self,        session: aiohttp.ClientSession,        api_key: str,        api_endpoint: str = API_ENDPOINT,    ):        self._query_params = {"datatype": "json", "apikey": api_key}        self._api_endpoint = api_endpoint        self._session = session    @logger.catch    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:        formatted_data = {}        for field, item in data.items():            formatted_data[stringcase.snakecase(field)] = item        return formatted_data    @logger.catch    async def _construct_query(        self, function: str, to_json: bool = True, **kwargs    ) -> Union[Dict[str, Any], str]:        path = "query/"        async with self._session.get(            urlparse.urljoin(self._api_endpoint, path),            params={"function": function, **kwargs, **self._query_params},        ) as response:            data = (await response.json()) if to_json else (await response.text())            if to_json:                data = self._format_fields(data)        return data    @logger.catch    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)        data = pd.read_csv(StringIO(data))        securities = data.to_dict("records")        for index, security in enumerate(securities):            security = self._format_fields(security)            security["_type"] = "physical"            securities[index] = security        return securities    @logger.catch    async def get_security_overview(self, symbol: str) -> Dict[str, str]:        return await self._construct_query("OVERVIEW", symbol=symbol)    @logger.catch    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query(            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"        )    @logger.catch    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)    @logger.catch    async def get_indicator_data(        self, symbol: str, indicator: str, **indicator_options    ) -> Dict[str, Any]:        return await self._construct_query(            indicator, symbol=symbol, **indicator_options        )

Собственно по нему всё ясно:

  1. API AlphaVantage достаточно просто и красиво спроектирована, поэтому все запросы я решил проводить через метод construct_query где в свою очередь идёт http вызов.

  2. Все поля я привожу к snake_case для удобства.

  3. Ну и декорация logger.catch для красивого и информативного вывода трейсбека.

P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен тут.

CRUD-класс

У нас будет коллекция securities для хранения мета информации о ценных бумагах.

database/security.py

Тут по-моему ничего пояснять не нужно, а базовый класс сам по себе достаточно прост.

get_app()

Добавим функцию создания объекта приложения в app.py

Spoiler
import faustfrom horton.config import KAFKA_BROKERSdef get_app():    return faust.App("horton", broker=KAFKA_BROKERS)

Пока у нас будет самое простое создание приложения, чуть позже мы его расширим, однако, чтобы не заставлять вас ждать, вот референсы на App-класс. На класс settings тоже советую взглянуть, так как именно он отвечает за большую часть настроек.

Основная часть

Агент сбора и сохранения списка ценных бумаг

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Так, сначала получаем объект faust-приложения - это достаточно просто. Далее, мы явно объявляем топик для нашего агента... Тут стоит упомянуть, что это такое, что за параметр internal и как это можно устроить по-другому.

  1. Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать офф. доку, либо можно прочитать конспект на хабре на русском, где так же всё достаточно точно отражено :)

  2. Параметр internal, достаточно хорошо описанный в доке faust, позволяет нам настраивать топик прямо в коде, естественно, имеются ввиду параметры, предусмотренные разработчиками faust, например: retention, retention policy (по-умолчанию delete, но можно установить и compact), кол-во партиций на топик (partitions, чтобы сделать, например, меньшее чем глобальное значение приложения faust).

  3. Вообще, агент может создавать сам управляемый топик с глобальными значениями, однако, я люблю объявлять всё явно. К тому же, некоторые параметры (например, кол-во партиций или retention policy) топика в объявлении агента настроить нельзя.

    Вот как это могло было выглядеть без ручного определения топика:

app = get_app()@app.agent()async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Ну а теперь, опишем, что будет делать наш агент :)

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for _ in stream:            logger.info("Start collect securities")            client = AlphaVantageClient(session, API_KEY)            securities = await client.get_securities()            for security in securities:                await SecurityCRUD.update_one(                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True                )            yield True

Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия - одна, на всё время работы воркера (или несколько, если изменить параметр concurrency у агента с дефолтной единички).

Далее, мы идём по стриму (сообщение мы помещаем в _, так как нам, в данном агенте, безразлично содержание) сообщений из нашего топика, если они есть при текущем сдвиге (offset), иначе, наш цикл будет ожидать их поступления. Ну а внутри нашего цикла, мы логируем поступление сообщения, получаем список активных (get_securities возвращает по-умолчания только active, см. код клиента) ценных бумаг и сохраняем его в базу, проверяя при этом, есть ли бумага с таким тикером и биржей в БД, если есть, то она (бумага) просто обновится.

Запустим наше творение!

> docker-compose up -d... Запуск контейнеров ...> faust -A horton.agents worker --without-web -l info

P.S. Возможности веб-компонента faust я рассматривать в статьях не буду, поэтому выставляем соответствующий флаг.

В нашей команде запуска мы указали faust'у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:

Spoiler
aS v1.10.4 id           horton                                             transport    [URL('kafka://localhost:9092')]                    store        memory:                                            log          -stderr- (info)                                    pid          1271262                                            hostname     host-name                                          platform     CPython 3.8.2 (Linux x86_64)                       drivers                                                           transport  aiokafka=1.1.6                                       web        aiohttp=3.6.2                                      datadir      /path/to/project/horton-data                       appdir       /path/to/project/horton-data/v1                   ... логи, логи, логи ...Topic Partition Set topic                       partitions  collect_securities          {0-7}       horton-__assignor-__leader  {0}         

Оно живое!!!

Посмотрим на partition set. Как мы видим, был создан топик с именем, которое мы обозначили в коде, кол-во партиций дефолтное (8, взятое из topic_partitions - параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.

Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:

> faust -A horton.agents send @collect_securities{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. с помощью @ мы показываем, что посылаем сообщение в топик с именем "collect_securities".

В данном случае, сообщение ушло в 6 партицию - это можно проверить, зайдя в kafdrop на localhost:9000

Перейдя в окно терминала с нашим воркером, мы увидим радостное сообщение, посланное с помощью loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:

Я не миллиардер, а потому, довольствуемся первым вариантом просмотра.

Счастье и радость - первый агент готов :)

Агент готов, да здравствует новый агент!

Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.

Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[?],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            ...

Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records - классы, декларирующие схему сообщения в топике агента.

В таком случае перейдём в records.py и опишем, как должно выглядеть сообщение у этого топика:

import faustclass CollectSecurityOverview(faust.Record):    symbol: str    exchange: str

Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой - 3.6.

Вернёмся к агенту, установим типы и допишем его:

collect_security_overview_topic = app.topic(    "collect_security_overview", internal=True, value_type=CollectSecurityOverview)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[CollectSecurityOverview],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            logger.info(                "Start collect security [{symbol}] overview", symbol=event.symbol            )            client = AlphaVantageClient(session, API_KEY)            security_overview = await client.get_security_overview(event.symbol)            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)            yield True

Как видите, мы передаём в метод инициализации топика новый параметр со схемой - value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё - смысла не вижу.

Ну что же, последний штрих - добавим в collect_securitites вызов агента сбора мета информации:

....for security in securities:    await SecurityCRUD.update_one({            "symbol": security["symbol"],            "exchange": security["exchange"]        },        security,        upsert = True,    )    await collect_security_overview.cast(        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])    )....

Используем ранее объявлению схему для сообщения. В данном случае, я использовал метод .cast, так как нам не нужно ожидать результат от агента, но стоит упомянуть, что способов послать сообщение в топик:

  1. cast - не блокирует, так как не ожидает результата. Нельзя послать результат в другой топик сообщением.

  2. send - не блокирует, так как не ожидает результата. Можно указать агента в топик которого уйдёт результат.

  3. ask - ожидает результата. Можно указать агента в топик которого уйдёт результат.

Итак, на этом с агентами на сегодня всё!

Команда мечты

Последнее, что я обещал написать в этой части - команды. Как уже говорилось ранее, команды в faust - это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A

После объявленных агентов в agents.py добавим функцию с декоратором app.command, вызывающую метод cast у collect_securitites:

@app.command()async def start_collect_securities():    """Collect securities and overview."""    await collect_securities.cast()

Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:

> faust -A horton.agents --help....Commands:  agents                    List agents.  clean-versions            Delete old version directories.  completion                Output shell completion to be evaluated by the...  livecheck                 Manage LiveCheck instances.  model                     Show model detail.  models                    List all available models as a tabulated list.  reset                     Delete local table state.  send                      Send message to agent/topic.  start-collect-securities  Collect securities and overview.  tables                    List available tables.  worker                    Start worker instance for given app.

Ею мы можем воспользоваться, как любой другой, поэтому перезапустим faust воркер и начнём полноценный сбор ценных бумаг:

> faust -A horton.agents start-collect-securities

Что будет дальше?

В следующей части мы, на примере оставшихся агентов, рассмотрим, механизм sink для поиска экстремум в ценах закрытия торгов за год и cron-запуск агентов.

На сегодня всё! Спасибо за прочтение :)

Код этой части

P.S. Под прошлой частью меня спросили про faust и confluent kafka (какие есть у confluent фичи). Кажется, что confluent во многом функциональнее, но дело в том, что faust не имеет полноценной поддержки клиента для confluent - это следует из описания ограничений клиентов в доке.

Подробнее..

Оно живое! Вышла версия Flask 2.0

13.05.2021 18:23:09 | Автор: admin

Незаметно от всех 12 мая 2021 вышла новая версия известного микрофреймворка Flask. Хотя казалось, что во Flask есть уже все, ну или почти все, что нужно для микрофреймворка.
Предвкушая интерес, а что же нового завезли, оставлю ссылку на Change log.

Из приглянувшихся особенностей новой версии:

  • Прекращена поддержка Python версии 2. Минимальная версия Python 3.6

  • Поддержка асинхронных view и других обратных вызовов, таких как обработчики ошибок, определенные с помощью async def. Обычные синхронные view продолжают работать без изменений. Функции ASGI, такие как веб-сокеты, не поддерживаются.

  • Добавьте декораторы роутов для общих методов HTTP API.
    @app.post ("/ login") == @ app.route ("/ login", methods = ["POST"])

  • Новая функция Config.from_file для загрузки конфигурации из файла любого формата.

  • Команда flask shell включает завершение табуляции, как это делает обычная оболочка python.

  • При обслуживании статических файлов браузеры будут кэшировать на основе содержимого, а не на основе 12-часового таймера. Это означает, что изменения статического содержимого, такого как стили CSS, будут немедленно отражены при перезагрузке без необходимости очистки кеша.

Рассмотрим асинхронность

Все бы было хорошо, но в самом начале после установки был не найден модуль asgiref. Доустановим руками.

Для примера напишем самое простое приложение: Ping/Pong. Оно не имеет особого смысла и сложной логики, только имитирует некоторую проверку "жив ли сервис". Также это приложение станет бенчмарком.

from flask import Flaskapp = Flask(__name__)@app.get('/')async def ping():    return {'message': 'pong'}if __name__ == '__main__':    app.run(host='0.0.0.0')

Деплой

Как было сказано в Change log: "Функции ASGI, такие как веб-сокеты, не поддерживаются."
То есть только единственный способ задеплоить приложение используя gunicorn.

Команда: gunicorn -w 8 --bind 0.0.0.0:5000 app:app
-w 8 - 8 запущенных процессов
--bind 0.0.0.0:5000 - адрес приложения

Сверим производительность

Команда для нагрузочного тестирования: wrk -t 8 -c 100 -d 5 http://localhost:5000

Асинхронное приложение Flask 2.0:
Running 5s test @ http://localhost:5000
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 17.80ms 2.37ms 36.95ms 91.92%
Req/Sec 673.44 163.80 3.86k 99.75%
26891 requests in 5.10s, 4.21MB read
Requests/sec: 5273.84
Transfer/sec: 844.69KB

Синхронное приложение Flask 2.0:
Running 5s test @ http://localhost:5000
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.91ms 842.62us 21.56ms 89.86%
Req/Sec 2.38k 410.20 7.64k 93.53%
95301 requests in 5.10s, 14.91MB read
Requests/sec: 18689.25
Transfer/sec: 2.92MB

Синхронное приложение Flask 1.1.2:
Running 5s test @ http://localhost:5000
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.98ms 823.42us 17.40ms 88.55%
Req/Sec 2.37k 505.28 12.23k 98.50%
94522 requests in 5.10s, 14.78MB read
Requests/sec: 18537.84
Transfer/sec: 2.90MB

В качестве вывода

Исходя из результатов бенчмарков можно увидеть, что 1 и 2 версия в синхронном режиме выдают одинаковые результаты(с небольшой погрешностью). Что касается асинхронности в Flask 2.0 можно сделать вывод, что пока она слишком сырая даже в dev режиме запуска асинхронный view отстает от синхронного. Но также не стоит забывать о том что ASGI пока не поддерживается, и нет возможности запустить через uvicorn. Остается только ждать обновления и следить за дальнейшим развитием.

Обновилась именно major версия, а это значит у нас есть надежда новую переосмысленную итерацию фреймворка. Разработчиков стоит похвалить как минимум за то, что проект не заброшен и старается успевать за основными тенденциями в других фреймворках. Лично мне очень нравится Flask, он совершенно не перегружен, как например Django.

Если у вас есть идеи, почему получились именно такие результаты - пожалуйста поделитесь ими в комментариях.

Подробнее..

Flutter. Асинхронность (async) ltgt параллельность (isolate). Совсем

11.02.2021 02:11:06 | Автор: admin

Вступление


Недавно с удивлением обнаружил, что у коллег нет полной ясности, что такое асинхронность во Flutter. Почему-то у них было представление, что если асинхронная функция правильно написана, то она не блокирует интерфейс. Пролистав, пару статей не нашел простого, полного и ясного объяснения всей этой кухни (тут все по принципу выберите 2 из 3-х)). В одной статье даже прочитал, что Dart обладает некоей чудесной асинхронностью, которая позволяет отложить выполнения кода, до тех пор, пока поток не будет посвободнее (что на мой взгляд вводит немного в заблуждение).

Для кого статья


Статья рассчитана на тех, кто только начинает знакомиться с Flutter, поэтому попытаюсь на простом примере в этой небольшой заметке показать, что асинхронность это всего лишь возможность выполнения кода не последовательно. Но, если у вас есть тяжелая функция (пусть она даже будет трижды асинхронной) она все равно заблокирует вам интерфейс. Конечно, в реальном продакте, вряд ли вы столкнетесь с такими явными проявлениями (на текущий момент процессора достаточно мощные), но понимать все-таки стоит как это работает.

Поехали


И так, возьмем для экспериментов пример из документации к библиотеке flutter_bloc. Немного модифицируем функцию "_mapTimerStartedToState" класса timer_bloc закомментируем обновление счетчика чтобы не мешал:

Stream<TimerState> _mapTimerStartedToState(TimerStarted start) async* {  yield TimerRunInProgress(start.duration);  _tickerSubscription?.cancel();  // _tickerSubscription = _ticker  //     .tick(ticks: start.duration)  //     .listen((duration) => add(TimerTicked(duration: duration)));}


Добавим новую статическую (заранее делаем ее такой isolate работает только с ними) функцию:

static Future<void> _heavyComput (SendPort sendPort) async {  await Future.delayed(Duration(seconds: 5));  print('=======================');  print('!!!function finished!!!');  print('=======================');  return null;}


Тут в качестве эмуляции тяжелых вычислений ждем окончания задержки в 5 секунд.
Модифицируем функцию mapEventToState добавляем в конце асинхронный вызов _heavyComput:

@overrideStream<TimerState> mapEventToState(  TimerEvent event,) async* {. . .    _heavyComput(null);}


Для первого теста все готово наша задача наблюдать за волшебными волнами.
Запускаем и видим волны волнуются, интерфейс не блокируется, сообщение об окончании работы функции через 5 секунд выводится.



Вот она чудесная асинхронность паника была ложной. Хм А что если Future.delayed(Duration(seconds: 5)) заменить циклом?

static Future<void> _heavyComput(SendPort sendPort) async {  int pause = 1200000000;  for (int i = 0; i < pause; i++) {}  print('=======================');  print('!!!function finished!!!');  print('=======================');  return null;}

Запускаем и все приехали волны больше не волнуются.



Думаю, тут особых объяснений не требуется: даже асинхронная тяжелая функция блокирует все. По-умолчанию весь код выполняется в одном потоке. Просто в первом случае никаких вычислений не требовалось, а требовалось просто подождать, а во втором нужны были вычисления.

Ну, и чтобы статья не получилась совсем уж микроскопической давайте вызовем эту функцию при помощи isolate. Изменим mapEventToState:

@overrideStream<TimerState> mapEventToState(  TimerEvent event,) async* {. . .  var _receivePort = ReceivePort();  var _isolate = Isolate.spawn(_heavyComput, _receivePort.sendPort);}


Запускаем и видим, что интерфейс не блокируется, сообщение о завершении работы функции получаем с заметной задержкой.



На этом все (как работают async и await есть много статей, думаю, не стоит на этом останавливаться).

Пример можно скачать по ссылке flutter_timer_async_and_parallels
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru