Русский
Русский
English
Статистика
Реклама

Celery

Отложенные задачи в рамках микро-сервисной архитектуры

13.02.2021 20:21:59 | Автор: admin

Часто в проектах возникает необходимость выполнения отложенных задач, таких как отправка email, push и других специфических задач, свойственных доменной области вашего приложения. Сложности начинаются, когда обычного crontab уже недостаточно, когда пакетная обработка не подходит и когда у каждой единицы задачи свое время выполнения или оно назначается динамически.

Для решения такой задачи было создано очередное решение под названием Trigger Hook. Принципиальная схема работы показана на рисунке 1. На схеме показано, что происходит с заданиями в течения всего их жизненного цикла. Смена цвета означает смену статуса задачи.

Рисунок 1 - Принципиальная схема работы Trigger HookРисунок 1 - Принципиальная схема работы Trigger Hook

задача, время запуска которой наступит не скоро

задача, время запуска которой скоро наступит

задача, время запуска которой наступило

обработанное задание

неподтвержденный статус задания в базе данных

команда на удаление

Жизненный цикл задачи:

  • При создании задачи она попадает в базу данных (квадратный блок) (красные и желтые).

  • В оперативную память подгружаются задачи (треугольный блок), если их время запуска скоро наступит (переход красный->желтый). Данная структура реализована в виде приоритезированной очереди (кучи).

  • При наступлении времени выполнения задачи, она посылается на выполнение (переход желтый->зеленый). Используется промежуточный буфер перед обработкой для компенсации пиковых нагрузок.

  • В случае успешной отправки задачи, она удаляется из базы данных (переход зеленый->голубой->удаление). Используется промежуточный буфер перед удалением, также для компенсации пиковых нагрузок.

Дальше постараюсь подробнее описать некоторые особенности и привести аргументы в пользу выбора данного решения.

Простота API

Id принимается в формате UUIDv4. Если не передать, то будет сгенерирован самостоятельно. Возможность передачи id задачи со стороны внешнего сервиса будет полезна при использовании асинхронного канала. Время запуска указывается в формате UNIX.

Создание:

task := &domain.Task{Id:     id,ExecTime: time,}triggerHook.Create(task)

Удаление:

triggerHook.Delete(task.Id)

Получение событий наступления времени запуска:

for {result := triggerHook.Consume()if err != send(result.Task()) {result.Rollback()}result.Confirm()}

Стойкость к сбоям

При обработке задач может произойти сбой, например, если потеряно соединение с брокером сообщений. В таком случае выполнение задачи подтверждено не будет, а будет проведена повторная попытка отправки. Задача отметится как обработанная только при вызове метода подтверждения. Внезапная остановка приложения не приведет к несогласованности в базе данных.

Кроме того, с учетом общей тенденции рынка к переносу приложений в облако в виде микро-сервисов формируются новые требования к приложению. По крайней мере то, что выходило на задний план ранее, сейчас становится более важным. При этом подходе контейнеризированные приложения имеют временную природу. Механизм Trigger Hook делает возможным сворачивание микро-сервиса на одном сервере и разворачивание на другом в производственной среде без мягкой остановки.

В случае аварийного завершения приложения, есть вероятность, что выполнение некоторых задач может быть не подтверждено в базе данных. При повторном запуске приложения эти задачи будут отправлены на выполнение повторно. Такое поведение является компромиссом в пользу обеспечения стойкости к сбоям. Получив сообщение от Trigger Hook, Ваше приложение должно выполнять задачу только один раз, а при повторном получении - игнорировать. Такие ситуации обычное явление в событийно ориентированных архитектурах и они не должны нарушать внутреннее состояние и генерировать большое количество ошибок.

Точность и производительность

Для избежания высокой частоты запросов в базу данных предусмотрен механизм периодической предзагрузки наборов задач, время выполнения которых находится в заданном диапазоне. Другими словами, делаются редкие запросы наборов задач вместо частых запросов задач по одной. Такая схема хорошо подходит, если например, на одно время назначено выполнение нескольких сотен тысяч задач. После загрузки задач они сортируются в порядке приоритета. Когда время таймера для самой приоритетной задачи истекает, то она сразу же поступает на обработку клиентскому коду. Это позволяет добиться большой пиковой производительности и обработки задач с секундной точностью.

Также, большая производительность отправки задач на выполнение достигается за счет простой схемы хранения задач, индексирования и много-поточного доступа к базе данных.

Были измерены основные показатели скорости обработки задач.

Сервер приложения:

  • AWS EC2 Ubuntu 20

  • t2.micro

  • 1 vCPUs 2.5 GHz

  • 1 GiB RAM

Сервер базы данных:

  • AWS RDS MySQL 8.0

  • db.t3.micro

  • 2 vCPUs

  • 1 GiB RAM

  • Network: 2085 Mbps

Тест

Длительность теста

Средняя скорость (задач/сек)

Количество задач

Создание задач

1 минута 11 сек

1396

100000

Удаление задач

52 сек

1920

100000

Отправка задач (состояние задачи от красной до голубой)

498 милисекунд

200668

100000

Подтверждение задач (состояние задачи от голубой до удаления)

2 сек

49905

100000

Мониторинг

Для быстрой проверки корректной работы Trigger Hook предоставляет возможность подключить time-series базу данных. На этапе инициализации есть возможность определить периодичность измерений и выбрать интересующие метрики. Полный список доступных метрик есть тут.

Также есть возможность подключить систему логирования через адаптер. Доступны:

  • фатальные ошибки - приводящие к полной остановке приложения

  • ошибки на которые стоит обратить внимание, но которые не приводят к остановке

  • дебаг сообщения

Далее в примере Вы можете увидеть пример подключения к InfluxDB+Grafana

Trigger Hook в составе микро-сервисной архитектуры

Асинхронное взаимодействие

При использовании микро-сервисной архитектуры, обычно, предпочтение отдают асинхронному взаимодействию. Trigger Hook хорошо вписывается в микро-сервисную, событийно ориентированную архитектуру. Но в любом случае, входящие (создание, удаление) и исходящее (событие наступления времени запуска задачи) каналы могут быть как асинхронными, так и синхронными в зависимости от требований.

Ниже, на рисунке 2 приведен один из возможных вариантов схемы коммуникации через асинхронный канал. В качестве брокера сообщений может выступать какая-нибудь очередь, например, RabbitMQ. Эта схема исключает блокировку вызываемого микро-сервиса вызывающим, как при синхронном запросе посредством, например HTTP. Брокер принимает неограниченное количество задач (условно неограниченное), а обработчик этих задач берется за них по мере освобождения. Как только команда на создание будет обработана, отправляется событие об успешном создании задачи. Так же через брокер, клиентский сервис получает это событие и реагирует на него соответствующим образом - меняет статус сущности, использующей отложенное задание. В качестве сущности может выступать, например Push уведомление на мобильные устройства с рекламой.

Существенным недостатком данной схемы является усложнение инфраструктуры обслуживающей подобное взаимодействие. По сути, введение статусов ожидания ответа от других микро-сервисов это есть распределенные транзакции.

Рисунок 2 - Схема коммуникации через асинхронный каналРисунок 2 - Схема коммуникации через асинхронный канал

На рисунке 3 показаны процессы создания сущности имеющий отложенное выполнение и на рисунке4 выполнение при наступлении времени.

Рисунок 3 - Процесс создания сущности с отложенным выполнениемРисунок 3 - Процесс создания сущности с отложенным выполнениемРисунок 4 - Выполнение задания сущностиРисунок 4 - Выполнение задания сущности

Совместный доступ

Отсутствие возможности передачи некоторой полезной нагрузки при создании задачи может некоторых разочаровать. Но уверяю, в этом нет необходимости. Trigger Hook содержит достаточный функционал для построения менеджера задач. Относитесь к Trigger Hook как к слою абстракции, находящегося на инфраструктурном уровне. Полная информация о задаче, например, тип, статус, время исполнения, количество попыток выполнения, полезная нагрузка и тп, будут содержаться в слое абстракции выше Trigger Hook.

Верхний слой будет обладать доменным знанием. Другими словами, менеджер задач будет иметь определенный набор типов задач, определенный набор событий, относящихся к тем или иным типам задач. Например, обращение к интерфейсу будет звучать как создай отложенную задачу на отправку email сообщения или создай отложенную задачу на списание платы по подписке на YouTube, а уже сам менеджер задач будет обращаться к Trigger Hook с запросом создай отложенную задачу. Когда придет время запустить задачу, Trigger Hook создаст событие время выполнения задания наступило. Это событие перехватит менеджер задач, обработает его, выдав, например, событие время списания платы по подписке наступило. На рисунках 5 и 6 показан этот процесс.

Рисунок 5 - Создание задания с использованием промежуточного слояРисунок 5 - Создание задания с использованием промежуточного слояРисунок 6 - Обработка события с использованием промежуточного слояРисунок 6 - Обработка события с использованием промежуточного слоя

Связь между компонентами приложения должна быть очень слабой. Это касается и микро-сервисов в целом. На практике, одной из причин усиления связи, является перенос части ответственности одного сервиса в другой. Поэтому, одной из самых сложных задач, является поиск границы раздела (монолитного, например) приложения на микро-сервисы. Что бы это сделать удачно, нужно учитывать специфику доменной области знаний и текущей реализации приложения. Теперь вопрос - в какой микро-сервис поместить слой менеджер задач?

Рисунок 7 - Менеджер задач в одном м/с с Trigger HookРисунок 7 - Менеджер задач в одном м/с с Trigger Hook

На рисунке 7 показана схема, где менеджер задач является отдельным, микро-сервисом, содержащий доменное знание о типах задач, событиях относящихся к этим задачам. Как видно из схемы, предполагается совместное использование одного микро-сервиса менеджера заданий для разных клиентских микро-сервисов. У каждого микро-сервиса свой канал для получения событий. В RabbitMq такой канал событий легко реализовать в виде схемы direct.

Рисунок 8 - Менеджер задач как часть клиентского м/сРисунок 8 - Менеджер задач как часть клиентского м/с

На рисунке 8 показана иная схема, где менеджер задач является частью клиентского микро-сервиса и используется только для своих внутренних нужд. Такая схема подойдет если нет других микро-сервисов использующих отложенные задания или же каждый микро-сервис имеет свой менеджер задач с Trigger Hook микро-сервисом.

Масштабирование

Некоторые приложения сложнее масштабировать, чем другие. Все намного проще, если состояние приложения хранится только во внешнем хранилище с поддержкой конкурентного доступа, например, классическая связка PHP + MySQL. В этом случае несколько экземпляров приложения PHP разворачиваются на разных серверах, а Nginx балансирует нагрузку между ними, при этом, MySQL ресурс остается один на все экземпляры PHP приложений. Если MySQL не справляется, то уже независимо от PHP приложения, могут быть добавлены реплики.

Все несколько сложнее, когда приложение хранит собственное состояние. Его сложнее масштабировать горизонтально. Trigger Hook хранит свое состояние в оперативной памяти. Оно подгружает задачи, время запуска которых скоро наступит. Допустим, Вы создали задачу, время выполнения которой наступит примерно через 5 секунд. Это означает, что Trigger Hook уже погрузил ее для выполнения. Но Вы захотели отменить эту задачу. Для этого нужно вызвать метод API delete. Важно вызвать этот метод у того экземпляра приложения, который взял задачу на обработку. Это первая сложность.

Вторая сложность заключается в том, что каждый экземпляр Trigger Hook должен иметь собственную схему в БД. Это связано с обеспечением согласованности базы данных при сбоях. В общем, с точки зрения производительности нет смысла использовать экземпляры Trigger Hook для одной базы данных, во первых, потому что Trigger Hook работает в много-поточном режиме, а во вторых, при прочих равных БД является узким горлышком.

На рисунке 9 показан пример масштабирования нагрузки. У каждого экземпляра Trigger Hook своя БД, на разных серверах (иначе особого смысла нет). Перед экземплярами Trigger Hook имеется балансировщик нагрузки. Кроме балансировки, он пишет в какую-нибудь hash map базу данных, например, Redis, пару ключ-значение:

task_id:instance_host
Рисунок 9 - Схема горизонтального масштабированияРисунок 9 - Схема горизонтального масштабирования

Это нужно для обеспечения функции удаления задачи. Если в Вашем приложении не предусмотрено удаление, то достаточно балансера без базы данных. События, генерируемые экземплярами Trigger Hook можно пересылать по одному каналу через брокер. Генерирование id будет происходить на стороне клиентского сервиса (при асинхронном взаимодействии) или на стороне trigger hook (при асинхронном или синхронном взаимодействии). Для клиентских сервисов интерфейс не изменится.

Приложение для демонстрации Trigger Hook

Приложение состоит из пяти микро-сервисов. Каждый использует Docker контейнер. Все работает на Kubernetes. Приложение легко можно развернуть в minikube. Тут описана подробная инструкция.

Рисунок 10 - Упрощенная схема взаимодействия микро-сервисовРисунок 10 - Упрощенная схема взаимодействия микро-сервисов

Message service - сервис (рисунок 11), который предоставляет API для создания email сообщений и назначения отправки на определенное время или отмены. Также позволяет просмотреть полный список сообщений и их статусы.

Некоторые особенности:

  • Находится на уровне домена.

  • Состоит из менеджера сообщений и менеджера заданий.

  • Написан на PHP, фреймворк Symfony 5.

  • Работает в двух экземплярах. Первый обслуживает API запросы при помощи Nginx. Второй - запускает демон через supervisor для прослушивания события из очереди RabbitMQ. Имеет вспомогательные экземпляры для запуска миграций.

  • Использует схему с рисунка 8 для управления заданиями.

Рисунок 11 - Message serviceРисунок 11 - Message service

Message Dashboard - интерфейс для Message service (рисунок 12).

Рисунок 12 - Интерфейс демо-приложенияРисунок 12 - Интерфейс демо-приложения

Сервис Mailer находится на уровне инфраструктуры. Должен непосредственно делать рассылку. Не реализован, так как не важен в рамках демо.

Trigger service - сервис уровня инфраструктуры. Использует GRPC канал для получения команд на создание и удаление заданий, AMQP для рассылки события наступления времени выполнения задания (триггер).

Рисунок 13 - Trigger serviceРисунок 13 - Trigger service

Monitoring - также находится на инфраструктурном уровне, так как показывает технические метрики без привязки к бизнес событиям. На рисунке 14 показано как выглядит панель. Используется Grafana и InfluxDB. Полное описание метрик есть тут.

Рисунок 14 - Технические метрики Trigger HookРисунок 14 - Технические метрики Trigger Hook

Надеюсь, приложение и статья будут Вам полезны! Следите за моим github, следите за проектом, ставьте звездочки). Спасибо!

Подробнее..

Перевод Как настроить мультинодовый кластер Airflow с помощью Celery и RabbitMQ

25.01.2021 10:05:20 | Автор: admin

Что такое Airflow?


Apache Airflow это продвинутый workflow менеджер и незаменимый инструмент в арсенале современного дата инженера.


Airflow позволяет создавать рабочие процессы в виде направленных ациклических графов (DAG) задач. Разнообразные служебные программы командной строки выполняют сложные операции на DAG. Пользовательский интерфейс легко визуализирует конвейеры, работающие в производственной среде, отслеживает ход выполнения и при необходимости устраняет неполадки.


Программно создавайте, планируйте и контролируйте рабочий процесс. Он предоставляет функциональную абстракцию в виде идемпотентного DAG (направленного ациклического графа). Функция как служба абстракции для выполнения задач с заданными интервалами.


Кластер с одним узлом Airflow


В одноузловом кластере Airflow все компоненты (рабочий, планировщик, веб-сервер) установлены на одном узле, известном как "Master нода". Чтобы масштабировать кластер с одним узлом, Airflow должен быть настроен в режиме LocalExecutor. Worker берет (pull) задачу из очереди IPC (межпроцессное взаимодействие), это очень хорошо масштабируется до тех пор, пока ресурсы доступны на Master нода. Чтобы масштабировать Airflow на много нод, необходимо включить Celery Executor.



Архитектура с одной нодой Airflow


Мультинодовый кластер Airflow


В мультинодовой архитектуре Airflow его демоны распределены по всем рабочим нодам. Поскольку веб-сервер и планировщик будут установлены на главной ноде, а рабочие будут установлены на каждом отдельном рабочей ноде, поэтому он может хорошо масштабироваться как по горизонтали, так и по вертикали. Чтобы использовать этот режим архитектуры, необходимо настроить Airflow с помощью CeleryExecutor.


Серверную часть Celery необходимо настроить для включения режима CeleryExecutor в архитектуре Airflow. Популярными фреймворками / приложениями для бэкэнда Celery являются Redis и RabbitMQ. RabbitMQ это брокер сообщений. Его задача управлять обменом данными между несколькими службами задач путем управления очередями сообщений. Вместо канала связи IPC, который был бы в архитектуре с одной нодой, RabbitMQ предоставляет модель механизма публикации подписки для обмена сообщениями в разных очередях. Каждая очередь в RabbitMQ опубликована с событиями / сообщениями в виде команд задач, работники Celery будут извлекать команды задач из каждой очереди и выполнять их как действительно распределенные и параллельные способы. Что действительно может ускорить действительно мощное одновременное и параллельное выполнение задач в кластере.



Мультинодовая архитектура Airflow


Celery:


Celery это асинхронная очередь задач, основанная на распределенной передаче сообщений. Он ориентирован на работу в реальном времени, но также поддерживает планирование. Airflow использует его для выполнения нескольких параллельных операций на уровне задач на нескольких рабочих узлах с использованием многопроцессорности и многозадачности. Мультинодовая архитектура Airflow позволяет масштабировать Airflow, легко добавляя новые воркеры.


Устновка мультинодового кластера Airflow и настройка Celery:


Примечание. Мы используем операционную систему CentOS 7 Linux.


  1. Установка RabbitMQ

yum install epel-releaseyum install rabbitmq-server

  1. Включение и запуск RabbitMQ Server

systemctl enable rabbitmq-server.servicesystemctl start rabbitmq-server.service

  1. Включение интерфейса веб-консоли управления RabbitMQ

rabbitmq-plugins enable rabbitmq_management


Номер порта сервера rabbitmq по умолчанию 15672, имя пользователя и пароль по умолчанию для веб-консоли управления admin/admin.



  1. Установка протокола транспорта pyamqp для RabbitMQ и адаптера PostGreSQL

pip install pyamqp

amqp:// это псевдоним, который использует librabbitmq, если он доступен, или py-amqp, если его нет.


Вы должны использовать pyamqp:// или librabbitmq://, если хотите точно указать, какой протокол передачи данных использовать. Протокол pyamqp:// использует библиотеку amqp (http://github.com/celery/py-amqp)


Установка адаптера PostGreSQL: psycopg2


Psycopg это адаптер PostgreSQL для языка программирования Python.


pip install psycopg2

  1. Установка Airflow.

pip install 'apache-airflow[all]'

Проверьте версию airflow


airflow version


Мы используем версию Airflow v1.10.0, рекомендованную и стабильную в настоящее время.


  1. Инициализация базы данных

airflow initdb

После установки и настройки вам необходимо инициализировать базу данных, прежде чем вы сможете запустить группы обеспечения доступности баз данных и ее задачу. Поэтому последние изменения будут отражены в метаданных Airflow из конфигурации.


  1. Установка Celery

Celery должен быть установлен на главной ноде и на всех рабочих нодах.


pip install celery==4.3.0

Проверка версии Celery


celery --version4.3.0 (rhubarb)

  1. Изменение файла airflow.cfg для Celery Executor.

executor = CeleryExecutorsql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow dags_are_paused_at_creation = Trueload_examples = False

После внесения этих изменений в файл конфигурации airflow.cfg необходимо обновить метаданные airflow с помощью команды airflow initdb, а затем перезапустить airflow.


Теперь вы можете запустить веб-сервер airflow с помощью следующей команды


# default port is 8080airflow webserver -p 8000

Вы можете запустить планировщик


# start the schedulerairflow scheduler

Вы также должны запустить airflow на каждом рабочем узле.


airflow worker

Как только вы закончите запускать различные службы airflow, вы можете проверить фантастический интерфейс airflow при помощи команды:


http://<IP-ADDRESS/HOSTNAME>:8000

поскольку мы указали порт 8000 в нашей команде запуска службы веб-сервера, в противном случае номер порта по умолчанию 8080.


Да! Мы закончили создание кластера с мультинодовый архитектурой Airflow. :)

Подробнее..

Перевод Настройка мультинодового кластера Airflow с HDP Ambari и Celery для конвейеров данных

26.01.2021 10:19:42 | Автор: admin

Airflow идеальный выбор для конвейеров данных, то есть для оркестрации и планирования ETL. Он широко применяется и популярен для создания конвейеров передачи данных будущего. Он обеспечивает обратное заполнение, управление версиями и происхождение с помощью функциональной абстракции.



Функциональное программирование это будущее.


Оператор определяет единицу в рабочем процессе, DAG это набор Задач. Операторы обычно работают независимо, на самом деле они могут работать на совершенно двух разных машинах. Если вы инженер данных и работали с Apache Spark или Apache Drill, вы, вероятно, знаете, что такое DAG! Такая же концепция и у Airflow.


Создавайте конвейеры данных, которые:


  1. Идемпотентны.


  2. Детерминированы.


  3. Не имеют побочных эффектов.


  4. Используют неизменные источники и направления.


  5. Не обновляются, не добавляются.



Модульность и масштабируемость основная цель функциональных конвейеров данных.


Если вы работали с функциональным программированием с помощью Haskell, Scala, Erlang или Kotlin, вы удивитесь, что это то, что мы делаем в функциональном программировании, и вышеперечисленные пункты относятся к функциональному программированию, да! Вы правы. Функциональное программирование это мощный инструмент будущего.


Если у вас есть ETL / Data Lake / Streaming Infrastructure как часть платформы разработки данных, у вас должен быть кластер Hadoop / Spark с некоторым дистрибутивом, таким как Hortonworks, MapR, Cloudera и т. д. Поэтому я собираюсь рассказать о том, как вы можете использовать ту же инфраструктуру, где у вас есть Apache Hadoop / Apache Spark Cluster, и вы используете его для создания Airflow Cluster и его масштабирования.


Если у вас много заданий ETL и вы хотите организовать и составить расписание с помощью некоторых инструментов планирования, у вас есть несколько вариантов, например Oozie, Luigi и Airflow. Oozie основан на XML, и мы взяли его 2019 году! :), Luigi чуть не выбросили после того, как Airflow родился на Airbnb.


Почему не используем Luigi с Airflow?


  1. У Airflow есть собственный планировщик, в котором Luigi требует синхронизировать задачи в задании cron.


  2. С Luigi навигация по пользовательскому интерфейсу становится сложной задачей.


  3. В Luigi сложно создавать задачи.


  4. Luigi не масштабируется из-за тесной связи с заданиями Cron.


  5. Повторный запуск процесса в Luigi невозможен.


  6. Luigi не поддерживает распределенное выполнение, так как оно плохо масштабируется.



До появления Airflow я использовал Luigi для поддержки рабочего процесса моделей машинного обучения с помощью Scikit-learn, Numpy, Pandas, Theano и т. д.


В последнем сообщении блога мы обсудили, как настроить мультинодовый кластер Airflow с Celery и RabbitMQ без использования Ambari.


Ага, переходим к главному.


Как настроить мультинодовый кластер Airflow в Hadoop Spark Cluster, чтобы Airflow мог запускать задания Spark/Hive/Hadoop Map Reduce, а также выполнять координацию и планирование.


Давайте сделаем это!


Вы должны использовать airflow-ambari-mpack (пакет управления Apache Airflow для Apache Ambari), я использовал реализацию с открытым исходным кодом от FOSS Contributor https://github.com/miho120/ambari-airflow-mpack, Спасибо за ваш вклад.


Следующие шаги:


Из предыдущего сообщения в блоге вы должны выполнить шаги с 1 по 4, чтобы установить RabbitMQ и другие пакеты.


Ref. Как настроить мультинодовый кластер Airflow с помощью Celery и RabbitMQ


  1. Устанавливаем Apache MPack для Airflow

a. git clone https://github.com/miho120/ambari-mpack.gitb. stop ambari serverc. install the apache mpack for airflow on ambari serverd. start ambari server

  1. Добавляем Airflow Service в Ambari

После успешного выполнения вышеуказанных шагов вы можете открыть интерфейс Ambari


http://<HOST_NAME>:8080

Откройте пользовательский интерфейс Ambari (Ambari UI), нажмите Действия -> Добавить службу. (Actions -> Add Service)



HDP Ambari Dashboard


Если шаг 1 выполнен успешно, вы сможете увидеть Airflow как часть службы Ambari.



Сервис Airflow в Ambari


Вы должны выбрать, на каком узле вы хотите установить веб-сервер, планировщик и воркер. Я бы порекомендовал установить Airflow на веб-сервер, планировщик на master ноде, то есть на узле имени, и на Install Worker на data-нодах.



Ambari Master нода / Name нода для Airflow


Как вы можете видеть на изображении выше, веб-сервер Airflow и планировщик Airflow установлены на Name ноде кластера Hadoop / Spark.



Как вы можете видеть на скриншоте выше, служба Airflow Worker установлена на Data ноде кластера.


В итоге, у меня есть 3 воркер (worker) ноды на трех data нодах.



Сервис Airflow в Ambari



Ambari UI: 3 воркера в Airflow


Вы можете добавлять рабочие нода столько, сколько хотите, вы можете добавлять / удалять воркеров, когда захотите. Эта стратегия может масштабироваться по горизонтали + вертикали.


Конфигурация Airflow в Ambari:


Нажмите на Airflow Service, а затем на Config в пользовательском интерфейсе Ambari.



Конфигурация Airflow в Ambari


  1. Смените Executor

executor = CeleryExecutor


В разделе Advanced airflow-core-site укажите Executor как CeleryExecutor.


  1. SQL Alchemy Connection

sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow


SQL Alchemy Connection


Измените соединение SQL Alchemy на соединение postgresql, пример приведен выше.


  1. URL-адрес брокера

broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow


URL-адрес брокера и Celery result backend для Airflow


  1. Прочие

dags_are_paused_at_creation = Trueload_examples = False


Конфигурация Airflow-core-site.


После того, как все эти изменения будут внесены в конфигурацию Ambari Airflow, Ambari попросит вас перезапустить все затронутые службы, перезапустите службы и нажмите Service Actions -> InitDB.



Airflow Initdb из Ambari


А затем запустите службу airflow. Теперь у вас должен получиться мультинодовый кластер Airflow.


Кое-что из Чек-листа для проверки служб для мультинодового кластера Airflow:


  1. Очереди RabbitMQ должны быть запущены:


  1. Подключения RabbitMQ должны быть активными:


  1. Каналы RabbitMQ должны быть запущены:


  1. Отображение Celery Flower

Celery Flower это веб-инструмент для мониторинга и управления кластерами Celery. Номер порта по умолчанию 5555.



Вы также можете видеть здесь, что 3 рабочих находятся в сети, и вы можете отслеживать одну единицу задачи Celery здесь.


Подробнее о Celery Flower: https://flower.readthedocs.io/en/latest/


Обратите внимание, что вы также можете запустить Celery Flower, веб-интерфейс, созданный поверх Celery, для наблюдения за своими рабочими. Вы можете использовать команду быстрого доступа airflow flower, чтобы запустить веб-сервер Flower.


nohup airflow flower >> /var/local/airflow/logs/flower.logs &

Мы закончили установку и настройку мультинодовый кластер Airflow на Ambari HDP Hadoop / Spark Cluster.


Я столкнулся с некоторыми проблемами, о которых я расскажу в следующей статье в блоге.

Подробнее..

Собираем данные AlphaVantage с Faust. Часть 1. Подготовка и введение

20.09.2020 14:22:27 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть I Введение

20.09.2020 16:05:00 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть II Агенты и Команды

23.09.2020 04:06:02 | Автор: admin

Оглавление

  1. Часть I: Введение

  2. Часть II: Агенты и Команды

Что мы тут делаем?

Итак-итак, вторая часть. Как и писалось ранее, в ней мы сделаем следующее:

  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.

  2. Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.

Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи - для ручного пуша сообщения в топик, за которым следит агент.

Подготовка

Клиент AlphaVantage

Для начала, напишем небольшой aiohttp клиентик для запросов на alphavantage.

alphavantage.py

Spoiler
import urllib.parse as urlparsefrom io import StringIOfrom typing import Any, Dict, List, Unionimport aiohttpimport pandas as pdimport stringcasefrom loguru import loggerfrom horton.config import API_ENDPOINTclass AlphaVantageClient:    def __init__(        self,        session: aiohttp.ClientSession,        api_key: str,        api_endpoint: str = API_ENDPOINT,    ):        self._query_params = {"datatype": "json", "apikey": api_key}        self._api_endpoint = api_endpoint        self._session = session    @logger.catch    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:        formatted_data = {}        for field, item in data.items():            formatted_data[stringcase.snakecase(field)] = item        return formatted_data    @logger.catch    async def _construct_query(        self, function: str, to_json: bool = True, **kwargs    ) -> Union[Dict[str, Any], str]:        path = "query/"        async with self._session.get(            urlparse.urljoin(self._api_endpoint, path),            params={"function": function, **kwargs, **self._query_params},        ) as response:            data = (await response.json()) if to_json else (await response.text())            if to_json:                data = self._format_fields(data)        return data    @logger.catch    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)        data = pd.read_csv(StringIO(data))        securities = data.to_dict("records")        for index, security in enumerate(securities):            security = self._format_fields(security)            security["_type"] = "physical"            securities[index] = security        return securities    @logger.catch    async def get_security_overview(self, symbol: str) -> Dict[str, str]:        return await self._construct_query("OVERVIEW", symbol=symbol)    @logger.catch    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query(            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"        )    @logger.catch    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)    @logger.catch    async def get_indicator_data(        self, symbol: str, indicator: str, **indicator_options    ) -> Dict[str, Any]:        return await self._construct_query(            indicator, symbol=symbol, **indicator_options        )

Собственно по нему всё ясно:

  1. API AlphaVantage достаточно просто и красиво спроектирована, поэтому все запросы я решил проводить через метод construct_query где в свою очередь идёт http вызов.

  2. Все поля я привожу к snake_case для удобства.

  3. Ну и декорация logger.catch для красивого и информативного вывода трейсбека.

P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен тут.

CRUD-класс

У нас будет коллекция securities для хранения мета информации о ценных бумагах.

database/security.py

Тут по-моему ничего пояснять не нужно, а базовый класс сам по себе достаточно прост.

get_app()

Добавим функцию создания объекта приложения в app.py

Spoiler
import faustfrom horton.config import KAFKA_BROKERSdef get_app():    return faust.App("horton", broker=KAFKA_BROKERS)

Пока у нас будет самое простое создание приложения, чуть позже мы его расширим, однако, чтобы не заставлять вас ждать, вот референсы на App-класс. На класс settings тоже советую взглянуть, так как именно он отвечает за большую часть настроек.

Основная часть

Агент сбора и сохранения списка ценных бумаг

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Так, сначала получаем объект faust-приложения - это достаточно просто. Далее, мы явно объявляем топик для нашего агента... Тут стоит упомянуть, что это такое, что за параметр internal и как это можно устроить по-другому.

  1. Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать офф. доку, либо можно прочитать конспект на хабре на русском, где так же всё достаточно точно отражено :)

  2. Параметр internal, достаточно хорошо описанный в доке faust, позволяет нам настраивать топик прямо в коде, естественно, имеются ввиду параметры, предусмотренные разработчиками faust, например: retention, retention policy (по-умолчанию delete, но можно установить и compact), кол-во партиций на топик (partitions, чтобы сделать, например, меньшее чем глобальное значение приложения faust).

  3. Вообще, агент может создавать сам управляемый топик с глобальными значениями, однако, я люблю объявлять всё явно. К тому же, некоторые параметры (например, кол-во партиций или retention policy) топика в объявлении агента настроить нельзя.

    Вот как это могло было выглядеть без ручного определения топика:

app = get_app()@app.agent()async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Ну а теперь, опишем, что будет делать наш агент :)

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for _ in stream:            logger.info("Start collect securities")            client = AlphaVantageClient(session, API_KEY)            securities = await client.get_securities()            for security in securities:                await SecurityCRUD.update_one(                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True                )            yield True

Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия - одна, на всё время работы воркера (или несколько, если изменить параметр concurrency у агента с дефолтной единички).

Далее, мы идём по стриму (сообщение мы помещаем в _, так как нам, в данном агенте, безразлично содержание) сообщений из нашего топика, если они есть при текущем сдвиге (offset), иначе, наш цикл будет ожидать их поступления. Ну а внутри нашего цикла, мы логируем поступление сообщения, получаем список активных (get_securities возвращает по-умолчания только active, см. код клиента) ценных бумаг и сохраняем его в базу, проверяя при этом, есть ли бумага с таким тикером и биржей в БД, если есть, то она (бумага) просто обновится.

Запустим наше творение!

> docker-compose up -d... Запуск контейнеров ...> faust -A horton.agents worker --without-web -l info

P.S. Возможности веб-компонента faust я рассматривать в статьях не буду, поэтому выставляем соответствующий флаг.

В нашей команде запуска мы указали faust'у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:

Spoiler
aS v1.10.4 id           horton                                             transport    [URL('kafka://localhost:9092')]                    store        memory:                                            log          -stderr- (info)                                    pid          1271262                                            hostname     host-name                                          platform     CPython 3.8.2 (Linux x86_64)                       drivers                                                           transport  aiokafka=1.1.6                                       web        aiohttp=3.6.2                                      datadir      /path/to/project/horton-data                       appdir       /path/to/project/horton-data/v1                   ... логи, логи, логи ...Topic Partition Set topic                       partitions  collect_securities          {0-7}       horton-__assignor-__leader  {0}         

Оно живое!!!

Посмотрим на partition set. Как мы видим, был создан топик с именем, которое мы обозначили в коде, кол-во партиций дефолтное (8, взятое из topic_partitions - параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.

Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:

> faust -A horton.agents send @collect_securities{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. с помощью @ мы показываем, что посылаем сообщение в топик с именем "collect_securities".

В данном случае, сообщение ушло в 6 партицию - это можно проверить, зайдя в kafdrop на localhost:9000

Перейдя в окно терминала с нашим воркером, мы увидим радостное сообщение, посланное с помощью loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:

Я не миллиардер, а потому, довольствуемся первым вариантом просмотра.

Счастье и радость - первый агент готов :)

Агент готов, да здравствует новый агент!

Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.

Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[?],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            ...

Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records - классы, декларирующие схему сообщения в топике агента.

В таком случае перейдём в records.py и опишем, как должно выглядеть сообщение у этого топика:

import faustclass CollectSecurityOverview(faust.Record):    symbol: str    exchange: str

Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой - 3.6.

Вернёмся к агенту, установим типы и допишем его:

collect_security_overview_topic = app.topic(    "collect_security_overview", internal=True, value_type=CollectSecurityOverview)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[CollectSecurityOverview],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            logger.info(                "Start collect security [{symbol}] overview", symbol=event.symbol            )            client = AlphaVantageClient(session, API_KEY)            security_overview = await client.get_security_overview(event.symbol)            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)            yield True

Как видите, мы передаём в метод инициализации топика новый параметр со схемой - value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё - смысла не вижу.

Ну что же, последний штрих - добавим в collect_securitites вызов агента сбора мета информации:

....for security in securities:    await SecurityCRUD.update_one({            "symbol": security["symbol"],            "exchange": security["exchange"]        },        security,        upsert = True,    )    await collect_security_overview.cast(        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])    )....

Используем ранее объявлению схему для сообщения. В данном случае, я использовал метод .cast, так как нам не нужно ожидать результат от агента, но стоит упомянуть, что способов послать сообщение в топик:

  1. cast - не блокирует, так как не ожидает результата. Нельзя послать результат в другой топик сообщением.

  2. send - не блокирует, так как не ожидает результата. Можно указать агента в топик которого уйдёт результат.

  3. ask - ожидает результата. Можно указать агента в топик которого уйдёт результат.

Итак, на этом с агентами на сегодня всё!

Команда мечты

Последнее, что я обещал написать в этой части - команды. Как уже говорилось ранее, команды в faust - это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A

После объявленных агентов в agents.py добавим функцию с декоратором app.command, вызывающую метод cast у collect_securitites:

@app.command()async def start_collect_securities():    """Collect securities and overview."""    await collect_securities.cast()

Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:

> faust -A horton.agents --help....Commands:  agents                    List agents.  clean-versions            Delete old version directories.  completion                Output shell completion to be evaluated by the...  livecheck                 Manage LiveCheck instances.  model                     Show model detail.  models                    List all available models as a tabulated list.  reset                     Delete local table state.  send                      Send message to agent/topic.  start-collect-securities  Collect securities and overview.  tables                    List available tables.  worker                    Start worker instance for given app.

Ею мы можем воспользоваться, как любой другой, поэтому перезапустим faust воркер и начнём полноценный сбор ценных бумаг:

> faust -A horton.agents start-collect-securities

Что будет дальше?

В следующей части мы, на примере оставшихся агентов, рассмотрим, механизм sink для поиска экстремум в ценах закрытия торгов за год и cron-запуск агентов.

На сегодня всё! Спасибо за прочтение :)

Код этой части

P.S. Под прошлой частью меня спросили про faust и confluent kafka (какие есть у confluent фичи). Кажется, что confluent во многом функциональнее, но дело в том, что faust не имеет полноценной поддержки клиента для confluent - это следует из описания ограничений клиентов в доке.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru