Русский
Русский
English
Статистика
Реклама

Mongo

Паттерн сага как способ обеспечения консистентности данных

18.09.2020 14:13:41 | Автор: admin
Всем привет. Уже сейчас в OTUS открывает набор в новую группу курса Highload Architect. В связи с этим я продолжаю серию своих публикаций, написанных специально для этого курса, а также приглашаю вас на свой бесплатный демо урок по теме: Индексы в MySQL: best practices и подводные камни. Записаться на вебинар можно тут.



Введение


Как известно, переход от монолита к микросервисной архитектуре вызывает ряд сложностей, связанных как с технической частью проекта, так и с человеческим фактором. Одной из самых сложных технических проблем вызывает обеспечение согласованности в распределенной системе.

В прошлый раз мы обсудили причины возникновения проблем с согласованностью в микросервисной архитектуре, оптимистичный подход в обеспечению согласованности и обеспечение согласованности с применением двухфазного коммита.

Паттерн Сага


Сага это механизм, обеспечивающий согласованность данных в микросервисной архитектуры без применения распределенных транзакций.

Для каждой системной команды, которой надо обновлять данные в нескольких сервисах, создается некоторая сага. Сага представляет из себя некоторый чек-лист, состоящий из последовательных локальных ACID-транзакций, каждая из которых обновляет данные в одном сервисе. Для обработки сбоев применяется компенсирующая транзакция. Такие транзакции выполняются в случае сбоя на всех сервисах, на которых локальные транзакции выполнились успешно.

Типов транзакций в саге несколько, целых четыре:

  • Компенсирующая отменяет изменение, сделанное локальной транзакцией.
  • Компенсируемая это транзакция, которую необходимо компенсировать (отменить) в случае, если последующие транзакции завершаются неудачей.
  • Поворотная транзакция, опеределяющая успешность всей саги. Если она выполняется успешно, то сага гарантированно дойдет до конца.
  • Повторяемая идет после поворотной и гарантированно завершается успехом.

Организовывать сагу можно с помощью хореографии или оркестрации.

В случае с хореографической саги выделенный оркестратор отсутствует. На примере сервиса заказов и пользователей она может выглядеть так: сервис заказов получает запрос и создает заказ в состоянии PENDING, а затем публикует событие Заказ создан. Обработчик событий в сервисе пользователей обрабатывает данное событие, пытается зарезервировать товар и публикует результат в виде события. Сервис заказов обрабывает данное событие, подтверждая или отменяя заказ в зависимости от прочитанного результата.

Сага с оркестрацией выглядит чуть более интересно. На примере указанных выше сервисов может получиться так: сервис заказов получает запрос, создает сагу, которая создает заказ в состоянии PENDING, а затем отправляет команду резервирования товара для сервиса пользователей. Сервис пользователей пытается зарезервировать товар и отправляет ответное сообщение с указанием результата. Сага одобряет или отменяет заказ.

Паттерн сага позволяет приложению поддерживать согласованность данных между нескольких сервисов без использования распределенных транзакций (двухфазных коммитов) и с избежанием проблем, обсужденных в предыдущей статье. Но с другой стороны, сильно осложняется модель программирования: например, разработчик для каждой транзакции должен писать компенсирующую транзакцию, которая отменяет изменения, сделанные внутри саги ранее.

Сага позволяет добиться ACD-модели (Atomicity + Consistency + Durability в терминах ACID), но одну букву мы потеряли. Недостаток буквы I приводит к известным проблемам недостатка изолированности. К ним относятся: потерянные обновления (lost updates) одна сага перезаписывает изменения, внесенные другой, не читая их при этом, грязное чтение (dirty reads) транзакция или сага читают незавершенные обновления другой саги, нечеткое/неповторяемое чтение (fuzzy/nonrepeatable reads) два разных этапа саги читают одни и те же данные, но получают разные результаты, потому что другая сага внесла изменения. Существует ряд паттернов, позволяющих пофиксить те или иные аномалии: семантическая блокировка, коммутативные обновления, пессимистическое представление, повторное чтение значения, файл изменений и по значению. Вопрос обеспечения изоляции остается открытым.

Еще одна интересная проблема заключается в невозможности атомарных обновления базы данных и публикации сообщения в брокер сообщений для запуска дальнейших шагов саги.

Заключение


Мы поговорили о способах организации саги с применением хореографии и оркестрации, а также о проблемах, которые влечет применения данного паттерна. Далее мы поговорим о способах исправления некоторых аномалий и транзакционной отправки сообщений в брокер сообщений.

Подробнее..

Из песочницы Serverless и полтора программиста

05.08.2020 18:06:10 | Автор: admin

В повседневной продуктовой разработке, запертой в корпоративных технологических ограничениях, редко выпадает случай шагнуть за грань добра и зла в самое пекло хипстерских технологий. Но, когда ты сам несешь все риски и каждый день разработки вынимает деньги из твоего собственного кармана очень хочется срезать путь. В один из таких моментов я решил шагнуть в такой темный серверлес, о котором раньше думать было как-то стыдно. Под впечатлением от того, что получилось, я даже хотел написать статью Конец гегемонии программистов, но спустя полгода эксплуатации и развития проекта понял, что ну не совсем еще конец и остались еще места в этом самом serverless-бэкенде, где надо использовать знания и опыт.


Архитектура


Первое что сделал, вычеркнул из списка ограничений страх вендорлока. Надо еще дожить до масштаба, чтобы это стало проблемой.


Второе решение было не хочу ничего менеджить собственными силами, цена devopsов сейчас слишком высока, одной зарплатой можно оплатить год managed-решений в облаках.


Третье, пожалуй самое безрассудное и на грани слабоумия и отваги, рискнуть пойти на новорожденное решение от MongoDB, которое в тот момент называлось Stitch, а сейчас называется Realm (но это не совсем тот самый Realm, а ядреная смесь из Stitch и Realm, которая получилась после приобретения последнего MongoDB, Inc в конце 2019 года)


Backend


В результате серверная сторона вышла вот такой:



Node и Redis в ней появились только для реализации Server Side Rendering и кэширования (ну и для того, чтобы не кормить Atlas лишними платными запросами), для тех кто не решает задачи CEO-оптимизации их можно легко выкинуть.


В результате бэк масштабируется в пару кликов до практически любого размера. И самое главное стоит копейки, за счет того, что платный computed-runtime по большей части потребляется один раз на измененные данные и сохраняется в кеше.


Frontend


Клиентская часть классическая: React + Redux + Redux-Saga + TypeScript



Здесь не стал экспериментировать, потому что клиент планировался сложный и довольно толстый, чтобы его можно было развивать и поддерживать нужно было что-то более-менее вменяемое. Ну и еще один ключевой момент, эти технологии знают почти все, поэтому легче найти разработчиков.


Аутентификация и авторизация


Ну а теперь самое интересно, для чего вообще нужно было все это мракобесие с Mongo.Realm. Вместе с лямбдами мы получаем полноценную интеграцию из коробки с ворохом способов аутентификации (Google, Apple, Facebook, Email/Password и прочими) и механизмом авторизации операций до уровня полей в коллекциях:



Кроме этого решение о доступе можно принимать по результатам выполнения функции и таким образом вполне реализуем полноценный механизм цепочки голосующих (пользователь, роль, группа, замещения, суперпользователь и т.д.).


Прочие радости


Также облачная монга нам из коробки дает sync наборов данных с клиентом, push-уведомления, хранилище секретов, триггеры (на события базы, аутентификация, работу по расписанию), вебхуки, хорошие логи и еще много чего. То есть снаружи оно обслуживается и воспринимается как полноценный сервис, при этом не приходится решать вопросы масштабирования и переноса между датацентрами, бекапов, мониторинга и кучи других задач.


Ну, а для самых безбашенных, есть возможность работы через GraphQL.


Как все это работает



В случае если попали в кэш, при 100 RPS на один экземпляр (в конфигурации по одному ядру и одному гигабайту на один экземпляр Node.js под управлением PM2), время ответа укладывается в 200 мс, в противном случае вместе со всеми запросами к Mongo серверный рендер отрабатывает до 500 мс.


В работе с Mongo.Realm есть нюансы, которые никак не отражены в документации, но проявляются во всех недорогих инстансах с разделяемой памятью (M1, M2, M5): если запросы выполняются от имени клиента, то, видимо в качестве защиты от перегрузок, периодически время ответа на какой-нибудь aggregation-pipeline может резко вырасти до 5-10 секунд на запрос. При этом, если вызывается серверная функция (с тем же самым aggregation-pipeline), которая выполняется от имени системного пользователя, то таких трюков не наблюдается.
Возможно дело именно в типе кластера, и со временем это исправят или все решится переходом на М10 и выше, но сейчас для некоторых сложных запросов пришлось пойти на рискованный шаг и для чтения данных анонимными пользователями сделать несколько функций исполняемых от имени системного пользователя, в этом случае правила авторизации для доступа к данным игнорируются, и за безопасностью надо следить уже самим в коде.



В случае аутентифицированного доступа Server Side Rendering не нужен, все работает прямо на клиенте.


Выводы


В заголовок вынесен ресурс потраченный на разработку, а именно полтора программиста (1 фронтендер и бэкендера). Ровно столько и 5 месяцев работы, понадобилось для вывода в прод довольно развесистого портала с собственной системой управления контентом, интеграцией с несколькими поставщиками данных включая нечеткие сопоставления, оптимизированного под самые суровые требования по SEO и c поддержкой мобильный браузеров как first class citizen.


Весь мой предыдущий, более чем 16-летний опыт управления разработкой говорил, что ресурсоемкость этого проекта будет раза в 4 выше.


Дальнейшая эксплуатация и развитие безусловно покажут насколько опрометчивы были эти решения, но на текущий момент сэкономлено примерно 1,5 миллиона и все работает как надо.

Подробнее..

Проблематика распределенных транзакций в контексте микросервисной архитектуры

27.08.2020 10:09:54 | Автор: admin
Всем привет. Уже в сентябре OTUS открывает набор в новую группу курса Highload Architect. В связи с этим я продолжаю серию своих публикаций, написанных специально для этого курса, а также приглашаю вас на свой бесплатный вебинар, в рамках которого я подробно расскажу о программе курса и формате обучения в OTUS. Записаться на вебинар можно тут.



Введение


Как известно, переход от монолита к микросервисной архитектуре вызывает ряд сложностей, связанных как с технической частью проекта, так и с человеческим фактором. Одной из самых сложных технических проблем вызывает обеспечение согласованности в распределенной системе.

Согласованность


Достаточно тонким моментом является то, что согласованность в контексте распределенных систем отличается от согласованности в контексте баз данных. Далее под согласованностью мы будем понимать именно первое: незавершенная (ошибочная) операция не вносит никаких эффектов и не меняет данные, при конкурентном доступе к данным все операции рассматриваются как атомарные (нельзя увидеть промежуточный результат операции), если у данных имеется несколько копий (репликация), то последовательность применения операций на всех копиях одна и та же. То есть на самом деле мы хотим получить ACID транзакцию, но только распределенную.

Причина проблемы


Почему обеспечение согласованности затруднено именно в микросервисной архитектуре? Дело в том, что данный архитектурный стиль зачастую предполагает применение паттерна database per service. Позволю себе напомнить, что этот паттерн заключается в том, что у каждого микросервиса своя независимая база или базы (базы, потому что помимо первичного источника данных может использоваться, например, кеш). Такой подход позволяет с одной стороны не добавлять неявные связи по формату данных между микросервисами (микросервисы взаимодействуют только явно через API), с другой стороны по максимуму использовать такое преимущество микросервисной архитектуры как technology agnostic (мы можем выбирать подходящую под особую нагрузку на микросервис технологию хранения данных). Но при всем при этом мы потеряли гарантию согласованности данных. Посудите сами, монолит общался с одной большой базой, которая предоставляла возможности по обеспечению ACID транзакций. Теперь баз данных стало много, а вместо одной большой ACID транзакции у нас много небольших ACID транзакций. Нашей задачей будет объединение всех этих транзакций в одну распределенную.

Оптимистичная согласованность


Первое что может прийти в голову это концепция оптимистичной согласованности: мы совершаем столько транзакций сколько хотим на столько механизмов хранения сколько нужно. При этом мы ждем, что все будет хорошо, а если все плохо, то говорим, что все будет хорошо в итоге. Если все плохо в итоге, то говорим: Да, такое случается, но с крайне низкой вероятностью.

Если без шуток, то пренебрежение обеспечением согласованности в случае ее некритичности для бизнеса является хорошим решением, особенно если учесть то, каких усилий нам будет стоить ее обеспечение (в чем, как я надеюсь, вы убедитесь несколько позже).

Варианты обеспечения консистентности


Если для бизнеса все-таки согласованность является критичной, можно попытаться ее обеспечить несколькими способами. Если мы говорим о ситуации, когда данные обновляются одним сервисом (например имеет место репликация базы данных), то можно применить стандартные алгоритмы обеспечения консистентности такие как Paxos или Raft. Такие транзакции называются гомогенными. Если данные обновляются несколькими сервисами (то есть имеет место гетерогенная транзакция), то как тут как раз и начинаются сложности, о которых мы говорили выше.

С одной стороны мы можем все-таки обойти необходимость обеспечения распределенной транзакции путем стремления к service-based архитектуры (объединяем сервисы таким образом, чтобы транзакция была гомогенная). Такое решение не очень каноничное с точки зрения принципов микросервисной архитектуры, но зато оно технически намного проще из-за чего часто применяется на практике. С другой стороны мы можем оставить каноничные микросервисы, но при этом применить один из механизмов обеспечения распределенных транзакций: двухфазный коммит или сагу. В этой статье будет изучен первый вариант, а второй мы обсудим в следующий раз.

Двухфазный коммит


Механизм предельно прост: есть некоторый transaction manager, который собственно оркестрирует транзакцию. На первом этапе (prepare) transaction manager подает соответствующую команду для resource manager'ов, по которой они в свои журналы записывают данные, которые будут закоммичены. Получив подтверждение ото всех resource manager'ов об успешном завершении первого этапа, transaction manager начинает второй этап и подает следующую команду (commit), по которой resource manager'ы применяют принятые ранее изменения.

Несмотря на кажущуюся простоту, такой подход обладает рядом недостатков. Во-первых, если хотя бы один resource manager даст сбой на втором этапе, вся транзакция должна быть отменена. Таким образом, нарушается один из принципов микросервисной архитектуры устойчивость к отказам (когда мы приходили к распределенной системе, мы сразу закладывались на то, что отказ в ней является нормой а не исключительной ситуацией). Более того, если отказов будет много (а их будет много), то процесс отмены транзакций необходимо будет автоматизировать (в том числе и писать транзакции, откатывающие транзакции). Во-вторых, сам transaction manager является единой точкой отказа. Он должен уметь транзакционно выдавать id-шники транзакциям. В-третьих, поскольку хранилищу подаются специальные команды, логично предположить, что хранилище должно уметь это делать, то есть соответствовать стандарту XA, а не все современные технологии ему соответствуют (такие брокеры как Kafka, RabbitMQ и NoSQL решения как MongoDB и Cassandra не поддерживают двухфазные коммиты).

Вывод, напрашивающийся из всех этих факторов, был отлично сформулирован Крисом Ричардсоном: 2PC not an option (двухфазный коммит не вариант).

Вывод


Мы разобрались почему распределенные транзакции являются основной технической болью микросервисной архитектуры и поговорили о различных вариантах решения данной задачи, детально обсудили механизм двухфазного коммита.



Приглашаю всех записаться на свой вебинар о курсе, в рамках которого я подробно расскажу о формате обучения и ознакомлю всех желающих с программой обучения.




Читать ещё:


Подробнее..

Делаем поиск в веб-приложении с нуля

05.11.2020 18:21:17 | Автор: admin
В статье Делаем современное веб-приложение с нуля я рассказал в общих чертах, как выглядит архитектура современных высоконагруженных веб-приложений, и собрал для демонстрации простейшую реализацию такой архитектуры на стеке из нескольких предельно популярных и простых технологий и фреймворков. Мы построили single page application с server side rendering, поддерживающее просмотр неких карточек, набранных в Markdown, и навигацию между ними.

В этой статье я затрону чуть более сложную и интересную (как минимум мне, разработчику команды поиска) тему: полнотекстовый поиск. Мы добавим в наш контейнерный рай ноду Elasticsearch, научимся строить индекс и делать поиск по контенту, взяв в качестве тестовых данных описания пяти тысяч фильмов из TMDB 5000 Movie Dataset. Также мы научимся делать поисковые фильтры и копнём совсем немножко в сторону ранжирования.




Инфраструктура: Elasticsearch


Elasticsearch популярное хранилище документов, умеющее строить полнотекстовые индексы и, как правило, используемое именно как поисковый движок. Elasticsearch добавляет к движку Apache Lucene, на котором он основан, шардирование, репликацию, удобный JSON API и ещё миллион мелочей, которые сделали его одним из самых популярных решений для полнотекстового поиска.

Давайте добавим одну ноду Elasticsearch в наш docker-compose.yml:

services:  ...  elasticsearch:    image: "elasticsearch:7.5.1"    environment:      - discovery.type=single-node    ports:      - "9200:9200"  ...


Переменная окружения discovery.type=single-node подсказывает Elasticsearch, что надо готовиться к работе в одиночку, а не искать другие ноды и объединяться с ними в кластер (таково поведение по умолчанию).

Обратите внимание, что мы публикуем 9200 порт наружу, хотя наше приложение ходит в него внутри сети, создаваемой docker-compose. Это исключительно для отладки: так мы сможем обращаться в Elasticsearch напрямую из терминала (до тех пор, пока не придумаем более умный способ об этом ниже).

Добавить клиент Elasticsearch в наш вайринг не составит труда благо, Elastic предоставляет минималистичный Python-клиент.

Индексация


В прошлой статье мы положили наши основные сущности карточки в коллекцию MongoDB. Из коллекции мы умеем быстро извлекать их содержимое по идентификатору, потому что MongoDB построила для нас прямой индекс в ней для этого используются B-деревья.

Теперь же перед нами стоит обратная задача по содержимому (или его фрагментам) получить идентификаторы карточек. Стало быть, нам нужен обратный индекс. Для него-то нам и пригодится Elasticsearch!

Общая схема построения индекса обычно выглядит как-то так.
  1. Создаём новый пустой индекс с уникальным именем, конфигурируем его как нам нужно.
  2. Обходим все наши сущности в базе и кладём их в новый индекс.
  3. Переключаем продакшн, чтобы все запросы начали ходить в новый индекс.
  4. Удаляем старый индекс. Тут по желанию вы вполне можете захотеть хранить несколько последних индексов, чтобы, например, удобнее было отлаживать какие-то проблемы.


Давайте создадим скелет индексатора и потом разберёмся подробнее с каждым шагом.

import datetimefrom elasticsearch import Elasticsearch, NotFoundErrorfrom backend.storage.card import Card, CardDAOclass Indexer(object):    def __init__(self, elasticsearch_client: Elasticsearch, card_dao: CardDAO, cards_index_alias: str):        self.elasticsearch_client = elasticsearch_client        self.card_dao = card_dao        self.cards_index_alias = cards_index_alias    def build_new_cards_index(self) -> str:        # Построение нового индекса.        # Сначала придумываем для индекса оригинальное название.        index_name = "cards-" + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")        # Создаём пустой индекс.         # Здесь мы укажем настройки и опишем схему данных.        self.create_empty_cards_index(index_name)        # Кладём в индекс все наши карточки одну за другой.        # В настоящем проекте вы очень скоро захотите         # переписать это на работу в пакетном режиме.        for card in self.card_dao.get_all():            self.put_card_into_index(card, index_name)        return index_name    def create_empty_cards_index(self, index_name):        ...     def put_card_into_index(self, card: Card, index_name: str):        ...    def switch_current_cards_index(self, new_index_name: str):        ... 


Индексация: создаём индекс


Индекс в Elasticsearch создаётся простым PUT-запросом в /имя-индекса или, в случае использования Python-клиента (нашем случае), вызовом

elasticsearch_client.indices.create(index_name, {    ...})


Тело запроса может содержать три поля.

  • Описание алиасов ("aliases": ...). Система алиасов позволяет держать знание о том, какой индекс сейчас актуальный, на стороне Elasticsearch; мы поговорим про неё ниже.
  • Настройки ("settings": ...). Когда мы будем большими дядями с настоящим продакшном, мы сможем сконфигурировать здесь репликацию, шардирование и другие радости SRE.
  • Схема данных ("mappings": ...). Здесь мы можем указать, какого типа какие поля в документах, которые мы будем индексировать, для каких из этих полей нужны обратные индексы, по каким должны быть поддержаны агрегации и так далее.


Сейчас нас интересует только схема, и у нас она очень простая:

{    "mappings": {        "properties": {            "name": {                "type": "text",                "analyzer": "english"            },            "text": {                "type": "text",                "analyzer": "english"            },            "tags": {                "type": "keyword",                "fields": {                    "text": {                        "type": "text",                        "analyzer": "english"                    }                }            }        }    }}


Мы пометили поля name и text как текстовые на английском языке. Анализатор это сущность в Elasticsearch, которая обрабатывает текст перед сохранением в индекс. В случае english анализатора текст будет разбит на токены по границам слов (подробности), после чего отдельные токены будут лемматизированы по правилам английского языка (например, слово trees упростится до tree), слишком общие леммы (вроде the) будут удалены и оставшиеся леммы будут положены в обратный индекс.

С полем tags чуть-чуть сложнее. Тип keyword предполагает, что значения этого поля некие строковые константы, которые не надо обрабатывать анализатором; обратный индекс будет построен по их сырым значениям без токенизации и лемматизации. Зато Elasticsearch создаст специальные структуры данных, чтобы по значениям этого поля можно было считать агрегации (например, чтобы одновременно с поиском можно было узнать, какие теги встречались в документах, удовлетворяющих поисковому запросу, и в каком количестве). Это очень удобно для полей, которые по сути enum; мы воспользуемся этой фичей, чтобы сделать клёвые поисковые фильтры.

Но чтобы по тексту тегов можно было искать и текстовым поиском тоже, мы добавляем к нему подполе "text", настроенное по аналогии с name и text выше по существу это означает, что Elasticsearch во всех приходящих ему документах будет создавать ещё одно виртуальное поле под названием tags.text, в которое будет копировать содержимое tags, но индексировать его по другим правилам.

Индексация: наполняем индекс


Для индексации документа достаточно сделать PUT-запрос в /имя-индекса/_create/id-документа или, при использовании Python-клиента, просто вызвать нужный метод. Наша реализация будет выглядеть так:

    def put_card_into_index(self, card: Card, index_name: str):        self.elasticsearch_client.create(index_name, card.id, {            "name": card.name,            "text": card.markdown,            "tags": card.tags,        })


Обратите внимание на поле tags. Хотя мы описали его как содержащее keyword, мы отправляем не одну строку, а список строк. Elasticsearch поддерживает такое; наш документ будет находиться по любому из значений.

Индексация: переключаем индекс


Чтобы реализовать поиск, нам надо знать имя самого свежего полностью достроенного индекса. Механизм алиасов позволяет нам держать эту информацию на стороне Elasticsearch.

Алиас это указатель на ноль или более индексов. API Elasticsearch позволяет использовать имя алиаса вместо имени индекса при поиске (POST /имя-алиаса/_search вместо POST /имя-индекса/_search); в таком случае Elasticsearch будет искать по всем индексам, на которые указывает алиас.

Мы заведём алиас под названием cards, который всегда будет указывать на актуальный индекс. Соответственно, переключение на актуальный индекс после завершения построения будет выглядеть так:

    def switch_current_cards_index(self, new_index_name: str):        try:            # Нужно удалить ссылку на старый индекс, если она есть.            remove_actions = [                {                    "remove": {                        "index": index_name,                         "alias": self.cards_index_alias,                    }                }                for index_name in self.elasticsearch_client.indices.get_alias(name=self.cards_index_alias)            ]        except NotFoundError:            # Ого, старого индекса-то и не существует вовсе.            # Наверное, мы впервые запустили индексацию.            remove_actions = []        # Одним махом удаляем ссылку на старый индекс         # и добавляем ссылку на новый.        self.elasticsearch_client.indices.update_aliases({            "actions": remove_actions + [{                "add": {                    "index": new_index_name,                     "alias": self.cards_index_alias,                }            }]        })


Я не стану подробнее останавливаться на alias API; все подробности можно посмотреть в документации.

Здесь надо сделать ремарку, что в реальном высоконагруженном сервисе такое переключение может быть довольно болезненным и может иметь смысл сделать предварительный прогрев нагрузить новый индекс каким-то пулом сохранённых пользовательских запросов.

Весь код, реализующий индексацию, можно посмотреть в этом коммите.

Индексация: добавляем контент


Для демонстрации в этой статье я использую данные из TMDB 5000 Movie Dataset. Чтобы избежать проблем с авторскими правами, я лишь привожу код утилиты, импортирующей их из CSV-файла, который предлагаю вам скачать самостоятельно с сайта Kaggle. После загрузки достаточно выполнить команду

docker-compose exec -T backend python -m tools.add_movies < ~/Downloads/tmdb-movie-metadata/tmdb_5000_movies.csv


, чтобы создать пять тысяч карточек, посвящённых кино, и команду

docker-compose exec backend python -m tools.build_index


, чтобы построить индекс. Обратите внимание, что последняя команда на самом деле не строит индекс, а только ставит задачу в очередь задач, после чего она выполнится на воркере подробнее об этом подходе я рассказывал в прошлой статье. docker-compose logs worker покажут вам, как воркер старался!

Прежде, чем мы приступим к, собственно, поиску, нам хочется своими глазами увидеть, записалось ли что-нибудь в Elasticsearch, и если да, то как оно выглядит!

Наиболее прямой и быстрый способ это сделать воспользоваться HTTP API Elasticsearch. Сперва проверим, куда указывает алиас:

$ curl -s localhost:9200/_cat/aliasescards                cards-2020-09-20-16-14-18 - - - -


Отлично, индекс существует! Посмотрим на него пристально:

$ curl -s localhost:9200/cards-2020-09-20-16-14-18 | jq{  "cards-2020-09-20-16-14-18": {    "aliases": {      "cards": {}    },    "mappings": {      ...    },    "settings": {      "index": {        "creation_date": "1600618458522",        "number_of_shards": "1",        "number_of_replicas": "1",        "uuid": "iLX7A8WZQuCkRSOd7mjgMg",        "version": {          "created": "7050199"        },        "provided_name": "cards-2020-09-20-16-14-18"      }    }  }}


Ну и, наконец, посмотрим на его содержимое:

$ curl -s localhost:9200/cards-2020-09-20-16-14-18/_search | jq{  "took": 2,  "timed_out": false,  "_shards": {    "total": 1,    "successful": 1,    "skipped": 0,    "failed": 0  },  "hits": {    "total": {      "value": 4704,      "relation": "eq"    },    "max_score": 1,    "hits": [      ...    ]  }}


Итого в нашем индексе 4704 документа, а в поле hits (которое я пропустил, потому что оно слишком большое) можно даже увидеть содержимое некоторых из них. Успех!

Более удобным способом просмотра содержимого индекса и вообще всевозможного баловства с Elasticsearch будет воспользоваться Kibana. Добавим контейнер в docker-compose.yml:

services:  ...  kibana:    image: "kibana:7.5.1"    ports:      - "5601:5601"    depends_on:      - elasticsearch  ...


После повторного docker-compose up мы сможем зайти в Kibana по адресу localhost:5601 (внимание, сервер может стартовать небыстро) и, после короткой настройки, просмотреть содержимое наших индексов в симпатичном веб-интерфейсе.



Очень советую вкладку Dev Tools при разработке вам часто нужно будет делать те или иные запросы в Elasticsearch, и в интерактивном режиме с автодополнением и автоформатированием это гораздо удобнее.

Поиск


После всех невероятно скучных приготовлений пора нам уже добавить функциональность поиска в наше веб-приложение!

Разделим эту нетривиальную задачу на три этапа и обсудим каждый в отдельности.

  1. Добавляем в бэкенд компонент Searcher, отвечающий за логику поиска. Он будет формировать запрос к Elasticsearch и конвертировать результаты в более удобоваримые для нашего бэкенда.
  2. Добавляем в API эндпоинт (ручку/роут/как у вас в компании это называют?) /cards/search, осуществляющий поиск. Он будет вызывать метод компонента Searcher, обрабатывать полученные результаты и возвращать клиенту.
  3. Реализуем интерфейс поиска на фронтенде. Он будет обращаться в /cards/search, когда пользователь определился, что он хочет искать, и отображать результаты (и, возможно, какие-то дополнительные контролы).


Поиск: реализуем


Не так сложно написать менеджер, осуществляющий поиск, как его задизайнить. Давайте опишем результат поиска и интерфейс менеджера и обсудим, почему он такой, а не иной.

# backend/backend/search/searcher.pyimport abcfrom dataclasses import dataclassfrom typing import Iterable, Optional@dataclassclass CardSearchResult:    total_count: int    card_ids: Iterable[str]    next_card_offset: Optional[int]class Searcher(metaclass=abc.ABCMeta):    @abc.abstractmethod    def search_cards(self, query: str = "",                      count: int = 20, offset: int = 0) -> CardSearchResult:        pass


Какие-то вещи очевидны. Например, пагинация. Мы амбициозный молодой убийца IMDB стартап, и результаты поиска никогда не будут вмещаться на одну страницу!

Какие-то менее очевидны. Например, список ID, а не карточек в качестве результата. Elasticsearch по умолчанию хранит наши документы целиком и возвращает их в результатах поиска. Это поведение можно отключить, чтобы сэкономить на размере поискового индекса, но для нас это явно преждевременная оптимизация. Так почему бы не возвращать сразу карточки? Ответ: это нарушит single-responsibility principle. Возможно, когда-нибудь мы накрутим в менеджере карточек сложную логику, переводящую карточки на другие языки в зависимости от настроек пользователя. Ровно в этот момент данные на странице карточки и данные в результатах поиска разъедутся, потому что добавить ту же самую логику в поисковый менеджер мы забудем. И так далее и тому подобное.

Реализация этого интерфейса настолько проста, что мне было лень писать этот раздел :-(

# backend/backend/search/searcher_impl.pyfrom typing import Anyfrom elasticsearch import Elasticsearchfrom backend.search.searcher import CardSearchResult, SearcherElasticsearchQuery = Any  # для аннотаций типовclass ElasticsearchSearcher(Searcher):    def __init__(self, elasticsearch_client: Elasticsearch, cards_index_name: str):        self.elasticsearch_client = elasticsearch_client        self.cards_index_name = cards_index_name    def search_cards(self, query: str = "", count: int = 20, offset: int = 0) -> CardSearchResult:        result = self.elasticsearch_client.search(index=self.cards_index_name, body={            "size": count,            "from": offset,            "query": self._make_text_query(query) if query else self._match_all_query        })        total_count = result["hits"]["total"]["value"]        return CardSearchResult(            total_count=total_count,            card_ids=[hit["_id"] for hit in result["hits"]["hits"]],            next_card_offset=offset + count if offset + count < total_count else None,        )    def _make_text_query(self, query: str) -> ElasticsearchQuery:        return {            # Multi-match query делает текстовый поиск по             # совокупности полей документов (в отличие от match            # query, которая ищет по одному полю).            "multi_match": {                "query": query,                # Число после ^  приоритет. Найти фрагмент текста                # в названии карточки лучше, чем в описании и тегах.                "fields": ["name^3", "tags.text", "text"],            }        }    _match_all_query: ElasticsearchQuery = {"match_all": {}}


По сути мы просто ходим в API Elasticsearch и аккуратно достаём ID найденных карточек из результата.

Реализация эндпоинта тоже довольно тривиальна:

# backend/backend/server.py...    def search_cards(self):        request = flask.request.json        search_result = self.wiring.searcher.search_cards(**request)        cards = self.wiring.card_dao.get_by_ids(search_result.card_ids)        return flask.jsonify({            "totalCount": search_result.total_count,            "cards": [                {                    "id": card.id,                    "slug": card.slug,                    "name": card.name,                    # Здесь не нужны все поля, иначе данных на одной                    # странице поиска будет слишком много, и она будет                    # долго грузиться.                } for card in cards            ],            "nextCardOffset": search_result.next_card_offset,        })...


Реализация фронтенда, пользующегося этим эндпоинтом, хоть и объёмна, но в целом довольно прямолинейна и в этой статье я не хочу заострять на ней внимание. На весь код можно посмотреть в этом коммите.



So far so good, идём дальше.

Поиск: добавляем фильтры


Поиск по тексту это клёво, но если вы когда-нибудь пользовались поиском на серьёзных ресурсах, вы наверняка видели всякие плюшки вроде фильтров.

У наших описаний фильмов из базы TMDB 5000 помимо названий и описаний есть теги, так что давайте для тренировки реализуем фильтры по тегам. Наша цель на скриншоте: при клике на тег в выдаче должны остаться только фильмы с этим тегом (их число указано в скобках рядом с ним).



Чтобы реализовать фильтры, нам нужно решить две проблемы.
  • Научиться по запросу понимать, какой набор фильтров доступен. Мы не хотим показывать все возможные значения фильтра на каждом экране, потому что их очень много и при этом большинство будет приводить к пустому результату; нужно понять, какие теги есть у документов, найденных по запросу, и в идеале оставить N самых популярных.
  • Научиться, собственно, применять фильтр оставить в выдаче только документы с тегами, фильтр по которым выбрал пользователь.


Второе в Elasticsearch элементарно реализуется через API запросов (см. terms query), первое через чуть менее тривиальный механизм агрегаций.

Итак, нам надо знать, какие теги встречаются у найденных карточек, и уметь отфильтровывать карточки с нужными тегами. Сперва обновим дизайн поискового менеджера:

# backend/backend/search/searcher.pyimport abcfrom dataclasses import dataclassfrom typing import Iterable, Optional@dataclassclass TagStats:    tag: str    cards_count: int@dataclassclass CardSearchResult:    total_count: int    card_ids: Iterable[str]    next_card_offset: Optional[int]    tag_stats: Iterable[TagStats]class Searcher(metaclass=abc.ABCMeta):    @abc.abstractmethod    def search_cards(self, query: str = "",                      count: int = 20, offset: int = 0,                     tags: Optional[Iterable[str]] = None) -> CardSearchResult:        pass


Теперь перейдём к реализации. Первое, что нам нужно сделать завести агрегацию по полю tags:

--- a/backend/backend/search/searcher_impl.py+++ b/backend/backend/search/searcher_impl.py@@ -10,6 +10,8 @@ ElasticsearchQuery = Any  class ElasticsearchSearcher(Searcher): +    TAGS_AGGREGATION_NAME = "tags_aggregation"+     def __init__(self, elasticsearch_client: Elasticsearch, cards_index_name: str):         self.elasticsearch_client = elasticsearch_client         self.cards_index_name = cards_index_name@@ -18,7 +20,12 @@ class ElasticsearchSearcher(Searcher):         result = self.elasticsearch_client.search(index=self.cards_index_name, body={             "size": count,             "from": offset,             "query": self._make_text_query(query) if query else self._match_all_query,+            "aggregations": {+                self.TAGS_AGGREGATION_NAME: {+                    "terms": {"field": "tags"}+                }+            }         })


Теперь в поисковом результате от Elasticsearch будет приходить поле aggregations, из которого по ключу TAGS_AGGREGATION_NAME мы сможем достать бакеты, содержащие информацию о том, какие значения лежат в поле tags у найденных документов и как часто они встречаются. Давайте извлечём эти данные и вернём в удобоваримом виде (as designed above):

--- a/backend/backend/search/searcher_impl.py+++ b/backend/backend/search/searcher_impl.py@@ -28,10 +28,15 @@ class ElasticsearchSearcher(Searcher):         total_count = result["hits"]["total"]["value"]+        tag_stats = [+            TagStats(tag=bucket["key"], cards_count=bucket["doc_count"])+            for bucket in result["aggregations"][self.TAGS_AGGREGATION_NAME]["buckets"]+        ]         return CardSearchResult(             total_count=total_count,             card_ids=[hit["_id"] for hit in result["hits"]["hits"]],             next_card_offset=offset + count if offset + count < total_count else None,+            tag_stats=tag_stats,         )


Добавить применение фильтра самая лёгкая часть:

--- a/backend/backend/search/searcher_impl.py+++ b/backend/backend/search/searcher_impl.py@@ -16,11 +16,17 @@ class ElasticsearchSearcher(Searcher):         self.elasticsearch_client = elasticsearch_client         self.cards_index_name = cards_index_name -    def search_cards(self, query: str = "", count: int = 20, offset: int = 0) -> CardSearchResult:+    def search_cards(self, query: str = "", count: int = 20, offset: int = 0,+                     tags: Optional[Iterable[str]] = None) -> CardSearchResult:         result = self.elasticsearch_client.search(index=self.cards_index_name, body={             "size": count,             "from": offset,-            "query": self._make_text_query(query) if query else self._match_all_query,+            "query": {+                "bool": {+                    "must": self._make_text_queries(query),+                    "filter": self._make_filter_queries(tags),+                }+            },             "aggregations": {


Подзапросы, включённые в must-клаузу, обязательны к выполнению, но также будут учитываться при расчёте скоров документов и, соответственно, ранжировании; если мы когда-нибудь будем добавлять ещё какие-то условия на тексты, их лучше добавить сюда. Подзапросы в filter-клаузе только фильтруют, не влияя на скоры и ранжирование.

Осталось реализовать _make_filter_queries():

    def _make_filter_queries(self, tags: Optional[Iterable[str]] = None) -> List[ElasticsearchQuery]:        return [] if tags is None else [{            "term": {                "tags": {                    "value": tag                }            }        } for tag in tags]


На фронтенд-части опять-таки не стану останавливаться; весь код в этом коммите.

Ранжирование


Итак, наш поиск ищет карточки, фильтрует их по заданному списку тегов и выводит в каком-то порядке. Но в каком? Порядок очень важен для практичного поиска, но всё, что мы сделали за время наших разбирательств в плане порядка это намекнули Elasticsearch, что находить слова в заголовке карточки выгоднее, чем в описании или тегах, указав приоритет ^3 в multi-match query.

Несмотря на то, что по умолчанию Elasticsearch ранжирует документы довольно хитрой формулой на основе TF-IDF, для нашего воображаемого амбициозного стартапа этого вряд ли хватит. Если наши документы это товары, нам надо уметь учитывать их продажи; если это user-generated контент уметь учитывать его свежесть, и так далее. Но и просто отсортировать по числу продаж/дате добавления мы не можем, потому что тогда мы никак не учтём релевантность поисковому запросу.

Ранжирование это большое и запутанное царство технологий, которое никак не покрыть за один раздел в конце статьи. Поэтому здесь я перехожу в режим крупных мазков; я попробую рассказать в самых общих словах, как может быть устроено industrial grade ранжирование в поиске, и раскрою немного технических деталей того, как его можно реализовать с Elasticsearch.

Задача ранжирования очень сложна, так что неудивительно, что один из основных современных методов её решения машинное обучение. Приложение технологий машинного обучения к ранжированию собирательно называется learning to rank.

Типичный процесс выглядит так.

Определяемся, что мы хотим ранжировать. Мы кладём интересующие нас сущности в индекс, научаемся для заданного поискового запроса получать какой-то разумный топ (например, какой-нибудь простой сортировкой и отсечением) этих сущностей и теперь хотим научиться его ранжировать более умным способом.

Определяемся, как мы хотим ранжировать. Мы решаем, по какой характеристике надо отранжировать нашу выдачу, в соответствии с бизнес-целями нашего сервиса. Например, если наши сущности это товары, которые мы продаём, мы можем хотеть отсортировать их по убыванию вероятности покупки; если мемы по вероятности лайка или шера и так далее. Эти вероятности мы, конечно, не умеем считать в лучшем случае прикидывать, да и то только для старых сущностей, для которых у нас набрано достаточно статистики, но мы попытаемся научить модель предсказывать их, исходя из косвенных признаков.

Извлекаем признаки. Мы придумываем для наших сущностей какое-то множество признаков, которые могли бы помочь нам оценить релевантность сущностей поисковым запросам. Помимо того же TF-IDF, который уже умеет для нас вычислять Elasticsearch, типичный пример CTR (click-through rate): мы берём логи нашего сервиса за всё время, для каждой пары сущность+поисковый запрос считаем, сколько раз сущность появлялась в выдаче по этому запросу и сколько раз её кликали, делим одно на другое, et voil простейшая оценка условной вероятности клика готова. Мы также можем придумать признаки для пользователя и парные признаки пользователь-сущность, чтобы сделать ранжирование персонализированным. Придумав признаки, мы пишем код, который их вычисляет, кладёт в какое-то хранилище и умеет отдавать в real time для заданного поискового запроса, пользователя и набора сущностей.

Собираем обучающий датасет. Тут много вариантов, но все они, как правило, формируются из логов хороших (например, клик и потом покупка) и плохих (например, клик и возврат на выдачу) событий в нашем сервисе. Когда мы собрали датасет, будь то список утверждений оценка релевантности товара X запросу Q примерно равна P, список пар товар X релевантнее товара Y запросу Q или набор списков для запроса Q товары P1, P2, правильно отранжировать так-то, мы ко всем фигурирующим в нём строкам подтягиваем соответствующие признаки.

Обучаем модель. Тут вся классика ML: train/test, гиперпараметры, переобучение, перфовидеокарты и так далее. Моделей, подходящих (и повсеместно использующихся) для ранжирования, много; упомяну как минимум XGBoost и CatBoost.

Встраиваем модель. Нам остаётся так или иначе прикрутить вычисление модели на лету для всего топа, чтобы до пользователя долетали уже отранжированные результаты. Тут много вариантов; в иллюстративных целях я (опять-таки) остановлюсь на простом Elasticsearch-плагине Learning to Rank.

Ранжирование: плагин Elasticsearch Learning to Rank


Elasticsearch Learning to Rank это плагин, добавляющий в Elasticsearch возможность вычислить ML-модель на выдаче и тут же отранжировать результаты согласно посчитанным ею скорам. Он также поможет нам получить признаки, идентичные используемым в real time, переиспользовав при этом способности Elasticsearch (TF-IDF и тому подобное).

Для начала нам нужно подключить плагин в нашем контейнере с Elasticsearch. Нам потребуется простенький Dockerfile

# elasticsearch/DockerfileFROM elasticsearch:7.5.1RUN ./bin/elasticsearch-plugin install --batch http://es-learn-to-rank.labs.o19s.com/ltr-1.1.2-es7.5.1.zip


и сопутствующие изменения в docker-compose.yml:

--- a/docker-compose.yml+++ b/docker-compose.yml@@ -5,7 +5,8 @@ services:   elasticsearch:-    image: "elasticsearch:7.5.1"+    build:+      context: elasticsearch     environment:       - discovery.type=single-node


Также нам потребуется поддержка плагина в Python-клиенте. С изумлением я обнаружил, что поддержка для Python не идёт в комплекте с плагином, так что специально для этой статьи я её запилил. Добавим elasticsearch_ltr в requirements.txt и проапгрейдим клиент в вайринге:

--- a/backend/backend/wiring.py+++ b/backend/backend/wiring.py@@ -1,5 +1,6 @@ import os +from elasticsearch_ltr import LTRClient from celery import Celery from elasticsearch import Elasticsearch from pymongo import MongoClient@@ -39,5 +40,6 @@ class Wiring(object):         self.task_manager = TaskManager(self.celery_app)          self.elasticsearch_client = Elasticsearch(hosts=self.settings.ELASTICSEARCH_HOSTS)+        LTRClient.infect_client(self.elasticsearch_client)         self.indexer = Indexer(self.elasticsearch_client, self.card_dao, self.settings.CARDS_INDEX_ALIAS)         self.searcher: Searcher = ElasticsearchSearcher(self.elasticsearch_client, self.settings.CARDS_INDEX_ALIAS)


Ранжирование: пилим признаки


Каждый запрос в Elasticsearch возвращает не только список ID документов, которые нашлись, но и некоторые их скоры (как вы бы перевели на русский язык слово score?). Так, если это match или multi-match query, которую мы используем, то скор это результат вычисления той самой хитрой формулы с участием TF-IDF; если bool query комбинация скоров вложенных запросов; если function score query результат вычисления заданной функции (например, значение какого-то числового поля в документе) и так далее. Плагин ELTR предоставляет нам возможность использовать скор любого запроса как признак, позволяя легко скомбинировать данные о том, насколько хорошо документ соответствует запросу (через multi-match query) и какие-то предрассчитанные статистики, которые мы заранее кладём в документ (через function score query).

Поскольку на руках у нас база TMDB 5000, в которой лежат описания фильмов и, помимо прочего, их рейтинги, давайте возьмём рейтинг в качестве образцово-показательного предрассчитанного признака.

В этом коммите я добавил в бэкенд нашего веб-приложения некую базовую инфраструктуру для хранения признаков и поддержал подгрузку рейтинга из файла с фильмами. Дабы не заставлять вас читать очередную кучу кода, опишу самое основное.

  • Признаки мы будем хранить в отдельной коллекции и доставать отдельным менеджером. Сваливать все данные в одну сущность порочная практика.
  • В этот менеджер мы будем обращаться на этапе индексации и класть все имеющиеся признаки в индексируемые документы.
  • Чтобы знать схему индекса, нам надо перед началом построения индекса знать список всех существующих признаков. Этот список мы пока что захардкодим.
  • Поскольку мы не собираемся фильтровать документы по значениям признаков, а собираемся только извлекать их из уже найденных документов для обсчёта модели, мы выключим построение по новым полям обратных индексов опцией index: false в схеме и сэкономим за счёт этого немного места.


Ранжирование: собираем датасет


Поскольку, во-первых, у нас нет продакшна, а во-вторых, поля этой статьи слишком малы для рассказа про телеметрию, Kafka, NiFi, Hadoop, Spark и построение ETL-процессов, я просто сгенерирую случайные просмотры и клики для наших карточек и каких-то поисковых запросов. После этого нужно будет рассчитать признаки для получившихся пар карточка-запрос.

Пришла пора закопаться поглубже в API плагина ELTR. Чтобы рассчитать признаки, нам нужно будет создать сущность feature store (насколько я понимаю, фактически это просто индекс в Elasticsearch, в котором плагин хранит все свои данные), потом создать feature set список признаков с описанием, как вычислять каждый из них. После этого нам достаточно будет сходить в Elasticsearch с запросом специального вида, чтобы получить вектор значений признаков для каждой найденной сущности в результате.

Начнём с создания feature set:

# backend/backend/search/ranking.pyfrom typing import Iterable, List, Mappingfrom elasticsearch import Elasticsearchfrom elasticsearch_ltr import LTRClientfrom backend.search.features import CardFeaturesManagerclass SearchRankingManager:    DEFAULT_FEATURE_SET_NAME = "card_features"    def __init__(self, elasticsearch_client: Elasticsearch,                  card_features_manager: CardFeaturesManager,                 cards_index_name: str):        self.elasticsearch_client = elasticsearch_client        self.card_features_manager = card_features_manager        self.cards_index_name = cards_index_name    def initialize_ranking(self, feature_set_name=DEFAULT_FEATURE_SET_NAME):        ltr: LTRClient = self.elasticsearch_client.ltr        try:            # Создать feature store обязательно для работы,            # но при этом его нельзя создавать дважды \_()_/            ltr.create_feature_store()        except Exception as exc:            if "resource_already_exists_exception" not in str(exc):                raise        # Создаём feature set с невероятными ТРЕМЯ признаками!        ltr.create_feature_set(feature_set_name, {            "featureset": {                "features": [                    # Совпадение поискового запроса с названием                    # карточки может быть более сильным признаком,                     # чем совпадение со всем содержимым, поэтому                     # сделаем отдельный признак про это.                    self._make_feature("name_tf_idf", ["query"], {                        "match": {                            # ELTR позволяет параметризовать                            # запросы, вычисляющие признаки. В данном                            # случае нам, очевидно, нужен текст                             # запроса, чтобы правильно посчитать                             # скор match query.                            "name": "{{query}}"                        }                    }),                    # Скор запроса, которым мы ищем сейчас.                    self._make_feature("combined_tf_idf", ["query"], {                        "multi_match": {                            "query": "{{query}}",                            "fields": ["name^3", "tags.text", "text"]                        }                    }),                    *(                        # Добавляем все имеющиеся предрассчитанные                        # признаки через механизм function score.                        # Если по какой-то причине в документе                         # отсутствует искомое поле, берём 0.                        # (В настоящем проекте вам стоит                        # предусмотреть умолчания получше!)                        self._make_feature(feature_name, [], {                            "function_score": {                                "field_value_factor": {                                    "field": feature_name,                                    "missing": 0                                }                            }                        })                        for feature_name in sorted(self.card_features_manager.get_all_feature_names_set())                    )                ]            }        })    @staticmethod    def _make_feature(name, params, query):        return {            "name": name,            "params": params,            "template_language": "mustache",            "template": query,        }


Теперь функция, вычисляющая признаки для заданного запроса и карточек:

    def compute_cards_features(self, query: str, card_ids: Iterable[str],                                feature_set_name=DEFAULT_FEATURE_SET_NAME) -> Mapping[str, List[float]]:        card_ids = list(card_ids)        result = self.elasticsearch_client.search({            "query": {                "bool": {                    # Нам не нужно проверять, находятся ли карточки                    # на самом деле по такому запросу  если нет,                     # соответствующие признаки просто будут нулевыми.                    # Поэтому оставляем только фильтр по ID.                    "filter": [                        {                            "terms": {                                "_id": card_ids                            }                        },                        # Это  специальный новый тип запроса,                        # вводимый плагином SLTR. Он заставит                        # плагин посчитать все факторы из указанного                        # feature set.                        # (Несмотря на то, что мы всё ещё в разделе                        # filter, этот запрос ничего не фильтрует.)                        {                            "sltr": {                                "_name": "logged_featureset",                                "featureset": feature_set_name,                                "params": {                                    # Та самая параметризация.                                     # Строка, переданная сюда,                                    # подставится в запросах                                    # вместо {{query}}.                                    "query": query                                }                            }                        }                    ]                }            },            # Следующая конструкция заставит плагин запомнить все            # рассчитанные признаки и добавить их в результат поиска.            "ext": {                "ltr_log": {                    "log_specs": {                        "name": "log_entry1",                        "named_query": "logged_featureset"                    }                }            },            "size": len(card_ids),        })        # Осталось достать значения признаков из (несколько        # замысловатого) результата поиска.        # (Чтобы понять, где в недрах результатов нужные мне         # значения, я просто делаю пробные запросы в Kibana.)        return {            hit["_id"]: [feature.get("value", float("nan")) for feature in hit["fields"]["_ltrlog"][0]["log_entry1"]]            for hit in result["hits"]["hits"]        }


Простенький скрипт, принимающий на вход CSV с запросами и ID карточек и выдающий CSV с признаками:

# backend/tools/compute_movie_features.pyimport csvimport itertoolsimport sysimport tqdmfrom backend.wiring import Wiringif __name__ == "__main__":    wiring = Wiring()    reader = iter(csv.reader(sys.stdin))    header = next(reader)    feature_names = wiring.search_ranking_manager.get_feature_names()    writer = csv.writer(sys.stdout)    writer.writerow(["query", "card_id"] + feature_names)    query_index = header.index("query")    card_id_index = header.index("card_id")    chunks = itertools.groupby(reader, lambda row: row[query_index])    for query, rows in tqdm.tqdm(chunks):        card_ids = [row[card_id_index] for row in rows]        features = wiring.search_ranking_manager.compute_cards_features(query, card_ids)        for card_id in card_ids:            writer.writerow((query, card_id, *features[card_id]))


Наконец можно это всё запустить!

# Создаём feature setdocker-compose exec backend python -m tools.initialize_search_ranking# Генерируем событияdocker-compose exec -T backend \    python -m tools.generate_movie_events \    < ~/Downloads/tmdb-movie-metadata/tmdb_5000_movies.csv \    > ~/Downloads/habr-app-demo-dataset-events.csv# Считаем признакиdocker-compose exec -T backend \    python -m tools.compute_features \    < ~/Downloads/habr-app-demo-dataset-events.csv \    > ~/Downloads/habr-app-demo-dataset-features.csv


Теперь у нас есть два файла с событиями и признаками и мы можем приступить к обучению.

Ранжирование: обучаем и внедряем модель


Опустим подробности загрузки датасетов (скрипт полностью можно посмотреть в этом коммите) и перейдём сразу к делу.

# backend/tools/train_model.py... if __name__ == "__main__":    args = parser.parse_args()    feature_names, features = read_features(args.features)    events = read_events(args.events)    # Разделим запросы на train и test в соотношении 4 к 1.    all_queries = set(events.keys())    train_queries = random.sample(all_queries, int(0.8 * len(all_queries)))    test_queries = all_queries - set(train_queries)    # DMatrix  это тип данных, используемый xgboost.    # Фактически это массив значений признаков с названиями     # и лейблами. В качестве лейбла мы берём 1, если был клик,     # и 0, если не было (детали см. в коммите).    train_dmatrix = make_dmatrix(train_queries, events, feature_names, features)    test_dmatrix = make_dmatrix(test_queries, events, feature_names, features)    # Учим модель!    # Поля этой статьи всё ещё крайне малы для долгого разговора     # про ML, так что я возьму минимально модифицированный пример     # из официального туториала к XGBoost.    param = {        "max_depth": 2,        "eta": 0.3,        "objective": "binary:logistic",        "eval_metric": "auc",    }    num_round = 10    booster = xgboost.train(param, train_dmatrix, num_round, evals=((train_dmatrix, "train"), (test_dmatrix, "test")))    # Сохраняем обученную модель в файл.     booster.dump_model(args.output, dump_format="json")     # Санитарный минимум проверки того, как прошло обучение: давайте    # посмотрим на топ признаков по значимости и на ROC-кривую.    xgboost.plot_importance(booster)    plt.figure()    build_roc(test_dmatrix.get_label(), booster.predict(test_dmatrix))    plt.show()


Запускаем

python backend/tools/train_search_ranking_model.py \    --events ~/Downloads/habr-app-demo-dataset-events.csv \    --features ~/Downloads/habr-app-demo-dataset-features.csv \     -o ~/Downloads/habr-app-demo-model.xgb


Обратите внимание, что поскольку мы экспортировали все нужные данные предыдущими скриптами, этот скрипт уже не надо запускать внутри докера его нужно запускать на вашей машине, предварительно установив xgboost и sklearn. Аналогично в настоящем продакшне предыдущие скрипты нужно было бы запускать где-то, где есть доступ в продакшн-окружение, а этот нет.

Если всё сделано правильно, модель успешно обучится, и мы увидим две красивых картинки. Первая график значимости признаков:



Хотя события генерировались случайно, combined_tf_idf оказался сильно значимее других потому что я сделал фокус и искусственно понизил вероятность клика для карточек, которые ниже в выдаче, отранжированной нашим старым способом. То, что модель это заметила добрый знак и признак того, что мы не совершили в процессе обучения каких-то совсем глупых ошибок.

Второй график ROC-кривая:



Синяя линия выше красной, а значит, наша модель предсказывает лейблы чуть-чуть лучше, чем бросок монетки. (Кривая ML-инженера маминой подруги должна почти касаться верхнего левого угла).

Дело совсем за малым добавляем скрипт для заливки модели, заливаем и добавляем маленький новый пункт в поисковый запрос рескоринг:

--- a/backend/backend/search/searcher_impl.py+++ b/backend/backend/search/searcher_impl.py@@ -27,6 +30,19 @@ class ElasticsearchSearcher(Searcher):                     "filter": list(self._make_filter_queries(tags, ids)),                 }             },+            "rescore": {+                "window_size": 1000,+                "query": {+                    "rescore_query": {+                        "sltr": {+                            "params": {+                                "query": query+                            },+                            "model": self.ranking_manager.get_current_model_name()+                        }+                    }+                }+            },             "aggregations": {                 self.TAGS_AGGREGATION_NAME: {                     "terms": {"field": "tags"}


Теперь после того, как Elasticsearch произведёт нужный нам поиск и отранжирует результаты своим (довольно быстрым) алгоритмом, мы возьмём топ-1000 результатов и переранжируем, применив нашу (относительно медленную) машинно-обученную формулу. Успех!

Заключение


Мы взяли наше минималистичное веб-приложение и прошли путь от отсутствия фичи поиска как таковой до масштабируемого решения со множеством продвинутых возможностей. Сделать это было не так уж просто. Но и не так уж сложно! Итоговое приложение лежит в репозитории на Github в ветке со скромным названием feature/search и требует для запуска Docker и Python 3 с библиотеками для машинного обучения.

Чтобы показать, как это в целом работает, какие проблемы встречаются и как их можно решить, я использовал Elasticsearch, но это, конечно, не единственный инструмент, который можно выбрать. Solr, полнотекстовые индексы PostgreSQL и другие движки точно так же заслуживают вашего внимания при выборе, на чём построить свою многомиллиардную корпорацию поисковую систему.

И, конечно, это решение не претендует на законченность и готовность к продакшну, а является исключительно иллюстрацией того, как всё может быть сделано. Улучшать его можно практически бесконечно!
  • Инкрементальная индексация. При модификации наших карточек через CardManager хорошо бы сразу обновлять их в индексе. Чтобы CardManager не знал, что у нас в сервисе есть ещё и поиск, и обошлось без циклических зависимостей, придётся прикрутить dependency inversion в том или ином виде.
  • Для индексации в конкретно нашем случае связки MongoDB с Elasticsearch можно использовать готовые решения вроде mongo-connector.
  • Пока пользователь вводит запрос, мы можем предлагать ему подсказки для этого в Elasticsearch есть специальная функциональность.
  • Когда запрос введён, стоит попытаться исправить в нём опечатки, и это тоже целое дело.
  • Для улучшения ранжирования нужно организовать логирование всех пользовательских событий, связанных с поиском, их агрегацию и расчёт признаков на основе счётчиков. Признаки сущность-запрос, сущность-пользователь, сущность-положение Меркурия тысячи их!
  • Особенно весело пилить агрегации событий не офлайновые (раз в день, раз в неделю), а реалтаймовые (задержка от события до учёта в признаках в пределах пяти минут). Вдвойне весело, когда событий сотни миллионов.
  • Предстоит разобраться с прогревом, нагрузочным тестированием, мониторингами.
  • Оркестрировать кластер нод с шардированием и репликацией это целое отдельное наслаждение.

Но чтобы статья осталась читабельного размера, я остановлюсь на этом и оставлю вас наедине с этими челленджами. Спасибо за внимание!
Подробнее..

Собираем данные AlphaVantage с Faust. Часть 1. Подготовка и введение

20.09.2020 14:22:27 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть I Введение

20.09.2020 16:05:00 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть II Агенты и Команды

23.09.2020 04:06:02 | Автор: admin

Оглавление

  1. Часть I: Введение

  2. Часть II: Агенты и Команды

Что мы тут делаем?

Итак-итак, вторая часть. Как и писалось ранее, в ней мы сделаем следующее:

  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.

  2. Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.

Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи - для ручного пуша сообщения в топик, за которым следит агент.

Подготовка

Клиент AlphaVantage

Для начала, напишем небольшой aiohttp клиентик для запросов на alphavantage.

alphavantage.py

Spoiler
import urllib.parse as urlparsefrom io import StringIOfrom typing import Any, Dict, List, Unionimport aiohttpimport pandas as pdimport stringcasefrom loguru import loggerfrom horton.config import API_ENDPOINTclass AlphaVantageClient:    def __init__(        self,        session: aiohttp.ClientSession,        api_key: str,        api_endpoint: str = API_ENDPOINT,    ):        self._query_params = {"datatype": "json", "apikey": api_key}        self._api_endpoint = api_endpoint        self._session = session    @logger.catch    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:        formatted_data = {}        for field, item in data.items():            formatted_data[stringcase.snakecase(field)] = item        return formatted_data    @logger.catch    async def _construct_query(        self, function: str, to_json: bool = True, **kwargs    ) -> Union[Dict[str, Any], str]:        path = "query/"        async with self._session.get(            urlparse.urljoin(self._api_endpoint, path),            params={"function": function, **kwargs, **self._query_params},        ) as response:            data = (await response.json()) if to_json else (await response.text())            if to_json:                data = self._format_fields(data)        return data    @logger.catch    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)        data = pd.read_csv(StringIO(data))        securities = data.to_dict("records")        for index, security in enumerate(securities):            security = self._format_fields(security)            security["_type"] = "physical"            securities[index] = security        return securities    @logger.catch    async def get_security_overview(self, symbol: str) -> Dict[str, str]:        return await self._construct_query("OVERVIEW", symbol=symbol)    @logger.catch    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query(            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"        )    @logger.catch    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)    @logger.catch    async def get_indicator_data(        self, symbol: str, indicator: str, **indicator_options    ) -> Dict[str, Any]:        return await self._construct_query(            indicator, symbol=symbol, **indicator_options        )

Собственно по нему всё ясно:

  1. API AlphaVantage достаточно просто и красиво спроектирована, поэтому все запросы я решил проводить через метод construct_query где в свою очередь идёт http вызов.

  2. Все поля я привожу к snake_case для удобства.

  3. Ну и декорация logger.catch для красивого и информативного вывода трейсбека.

P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен тут.

CRUD-класс

У нас будет коллекция securities для хранения мета информации о ценных бумагах.

database/security.py

Тут по-моему ничего пояснять не нужно, а базовый класс сам по себе достаточно прост.

get_app()

Добавим функцию создания объекта приложения в app.py

Spoiler
import faustfrom horton.config import KAFKA_BROKERSdef get_app():    return faust.App("horton", broker=KAFKA_BROKERS)

Пока у нас будет самое простое создание приложения, чуть позже мы его расширим, однако, чтобы не заставлять вас ждать, вот референсы на App-класс. На класс settings тоже советую взглянуть, так как именно он отвечает за большую часть настроек.

Основная часть

Агент сбора и сохранения списка ценных бумаг

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Так, сначала получаем объект faust-приложения - это достаточно просто. Далее, мы явно объявляем топик для нашего агента... Тут стоит упомянуть, что это такое, что за параметр internal и как это можно устроить по-другому.

  1. Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать офф. доку, либо можно прочитать конспект на хабре на русском, где так же всё достаточно точно отражено :)

  2. Параметр internal, достаточно хорошо описанный в доке faust, позволяет нам настраивать топик прямо в коде, естественно, имеются ввиду параметры, предусмотренные разработчиками faust, например: retention, retention policy (по-умолчанию delete, но можно установить и compact), кол-во партиций на топик (partitions, чтобы сделать, например, меньшее чем глобальное значение приложения faust).

  3. Вообще, агент может создавать сам управляемый топик с глобальными значениями, однако, я люблю объявлять всё явно. К тому же, некоторые параметры (например, кол-во партиций или retention policy) топика в объявлении агента настроить нельзя.

    Вот как это могло было выглядеть без ручного определения топика:

app = get_app()@app.agent()async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Ну а теперь, опишем, что будет делать наш агент :)

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for _ in stream:            logger.info("Start collect securities")            client = AlphaVantageClient(session, API_KEY)            securities = await client.get_securities()            for security in securities:                await SecurityCRUD.update_one(                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True                )            yield True

Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия - одна, на всё время работы воркера (или несколько, если изменить параметр concurrency у агента с дефолтной единички).

Далее, мы идём по стриму (сообщение мы помещаем в _, так как нам, в данном агенте, безразлично содержание) сообщений из нашего топика, если они есть при текущем сдвиге (offset), иначе, наш цикл будет ожидать их поступления. Ну а внутри нашего цикла, мы логируем поступление сообщения, получаем список активных (get_securities возвращает по-умолчания только active, см. код клиента) ценных бумаг и сохраняем его в базу, проверяя при этом, есть ли бумага с таким тикером и биржей в БД, если есть, то она (бумага) просто обновится.

Запустим наше творение!

> docker-compose up -d... Запуск контейнеров ...> faust -A horton.agents worker --without-web -l info

P.S. Возможности веб-компонента faust я рассматривать в статьях не буду, поэтому выставляем соответствующий флаг.

В нашей команде запуска мы указали faust'у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:

Spoiler
aS v1.10.4 id           horton                                             transport    [URL('kafka://localhost:9092')]                    store        memory:                                            log          -stderr- (info)                                    pid          1271262                                            hostname     host-name                                          platform     CPython 3.8.2 (Linux x86_64)                       drivers                                                           transport  aiokafka=1.1.6                                       web        aiohttp=3.6.2                                      datadir      /path/to/project/horton-data                       appdir       /path/to/project/horton-data/v1                   ... логи, логи, логи ...Topic Partition Set topic                       partitions  collect_securities          {0-7}       horton-__assignor-__leader  {0}         

Оно живое!!!

Посмотрим на partition set. Как мы видим, был создан топик с именем, которое мы обозначили в коде, кол-во партиций дефолтное (8, взятое из topic_partitions - параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.

Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:

> faust -A horton.agents send @collect_securities{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. с помощью @ мы показываем, что посылаем сообщение в топик с именем "collect_securities".

В данном случае, сообщение ушло в 6 партицию - это можно проверить, зайдя в kafdrop на localhost:9000

Перейдя в окно терминала с нашим воркером, мы увидим радостное сообщение, посланное с помощью loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:

Я не миллиардер, а потому, довольствуемся первым вариантом просмотра.

Счастье и радость - первый агент готов :)

Агент готов, да здравствует новый агент!

Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.

Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[?],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            ...

Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records - классы, декларирующие схему сообщения в топике агента.

В таком случае перейдём в records.py и опишем, как должно выглядеть сообщение у этого топика:

import faustclass CollectSecurityOverview(faust.Record):    symbol: str    exchange: str

Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой - 3.6.

Вернёмся к агенту, установим типы и допишем его:

collect_security_overview_topic = app.topic(    "collect_security_overview", internal=True, value_type=CollectSecurityOverview)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[CollectSecurityOverview],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            logger.info(                "Start collect security [{symbol}] overview", symbol=event.symbol            )            client = AlphaVantageClient(session, API_KEY)            security_overview = await client.get_security_overview(event.symbol)            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)            yield True

Как видите, мы передаём в метод инициализации топика новый параметр со схемой - value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё - смысла не вижу.

Ну что же, последний штрих - добавим в collect_securitites вызов агента сбора мета информации:

....for security in securities:    await SecurityCRUD.update_one({            "symbol": security["symbol"],            "exchange": security["exchange"]        },        security,        upsert = True,    )    await collect_security_overview.cast(        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])    )....

Используем ранее объявлению схему для сообщения. В данном случае, я использовал метод .cast, так как нам не нужно ожидать результат от агента, но стоит упомянуть, что способов послать сообщение в топик:

  1. cast - не блокирует, так как не ожидает результата. Нельзя послать результат в другой топик сообщением.

  2. send - не блокирует, так как не ожидает результата. Можно указать агента в топик которого уйдёт результат.

  3. ask - ожидает результата. Можно указать агента в топик которого уйдёт результат.

Итак, на этом с агентами на сегодня всё!

Команда мечты

Последнее, что я обещал написать в этой части - команды. Как уже говорилось ранее, команды в faust - это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A

После объявленных агентов в agents.py добавим функцию с декоратором app.command, вызывающую метод cast у collect_securitites:

@app.command()async def start_collect_securities():    """Collect securities and overview."""    await collect_securities.cast()

Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:

> faust -A horton.agents --help....Commands:  agents                    List agents.  clean-versions            Delete old version directories.  completion                Output shell completion to be evaluated by the...  livecheck                 Manage LiveCheck instances.  model                     Show model detail.  models                    List all available models as a tabulated list.  reset                     Delete local table state.  send                      Send message to agent/topic.  start-collect-securities  Collect securities and overview.  tables                    List available tables.  worker                    Start worker instance for given app.

Ею мы можем воспользоваться, как любой другой, поэтому перезапустим faust воркер и начнём полноценный сбор ценных бумаг:

> faust -A horton.agents start-collect-securities

Что будет дальше?

В следующей части мы, на примере оставшихся агентов, рассмотрим, механизм sink для поиска экстремум в ценах закрытия торгов за год и cron-запуск агентов.

На сегодня всё! Спасибо за прочтение :)

Код этой части

P.S. Под прошлой частью меня спросили про faust и confluent kafka (какие есть у confluent фичи). Кажется, что confluent во многом функциональнее, но дело в том, что faust не имеет полноценной поддержки клиента для confluent - это следует из описания ограничений клиентов в доке.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru