Русский
Русский
English
Статистика
Реклама

Kafka streams

Перевод Pulsar vs Kafka сравнение и мифы

26.03.2021 08:19:22 | Автор: admin


Pulsar или Kafka что лучше? Здесь мы обсудим плюсы и минусы, распространенные мифы и нетехнические критерии, чтобы найти лучший инструмент для ваших задач.


Обычно я рассказываю об Apache Kafka и ее экосистеме. О Pulsar за последние годы меня спрашивали только коммитеры и авторы Pulsar. Они задавали сложные технические вопросы, чтобы показать, что Kafka не идет ни в какое сравнение с Pulsar. На Reddit и подобных платформах разгораются яростные и очень субъективные споры на эту тему. Я поделюсь своей точкой зрения, основанной на многолетнем опыте работы со стриминговыми опенсорс-платформами.


Сравнение технологий нынче в моде: Kafka vs. Middleware, Event Streaming и API Platforms


Цель сравнения технологий помочь людям подобрать подходящее решение и архитектуру для своих потребностей. Универсального решения не существует. И это не дело вкуса. Просто для каждой задачи есть свой инструмент.


При этом такие сравнения почти всегда субъективны. Даже если автор не работает на вендора и представляется независимым консультантом, скорее всего, у него все равно есть любимчики, даже если он сам себе в этом не признается. Однако полезно посмотреть на инструменты с разных точек зрения. Поскольку Apache Pulsar сейчас обсуждают, я решил поделиться своим мнением и сравнить его с Kafka. Я работаю в Confluent, а мы лучшие эксперты по Apache Kafka и ее экосистеме. И все же я постараюсь быть максимально объективным и оперировать голыми фактами.


Опенсорс-фреймворки и коммерческие программы постоянно сравнивают. Я и сам делал такие сравнения в своем блоге и на других платформах, например InfoQ: Сравнение платформ интеграции, Выбор подходящего ESB для ваших потребностей, Kafka vs. ETL / ESB / MQ, Kafka vs. Mainframe и Apache Kafka и API Management / API Gateway. Просто заказчики хотели понять, какой инструмент поможет решить те или иные задачи.


Если говорить о сравнении Pulsar и Kafka, ситуация немного отличается.


Зачем сравнивать Pulsar и Kafka?


Реальные и потенциальные заказчики редко спрашивают о Pulsar. Хотя, если честно, в последние месяцы чуть чаще где-то на каждой 15-й или 20-й встрече. Потому что фичи и варианты использования у них частично совпадают. Хотя, по-моему, все дело в паре статей о том, что Pulsar якобы в чем-то лучше Kafka. В пользу противоположной точки зрения нет никаких фактов и очень мало материала.


Я не видел ни одной организации, которая бы всерьез задумывалась о Pulsar в продакшене, хотя по всему миру множество пользователей, которым нужна технология распределенной обработки сообщений, вроде Kafka или Pulsar. По-моему, те, кто рекомендует Pulsar, чего-то не договаривают.


Например, их главный пользователь Tencent, крупная китайская технологическая компания, которая при этом очень активно использует Kafka везде, кроме одного проекта, где пригодился Pulsar. Tencent обрабатывает с Kafka десять триллионов сообщений в день (только представьте: 10 000 000 000 000). Если посчитать, Tencent использует Kafka в тысячу раз активнее, чем Pulsar (10 трлн против десятков млрд). Tencent с удовольствием рассказывают о своем деплое Kafka: Как Tencent PCG использует Apache Kafka для обработки 10+ трлн сообщений в день.


Сравнение двух конкурирующих опенсорс-платформ


Apache Kafka и Apache Pulsar две замечательные конкурирующие технологии. Логично будет их сравнить.


У Apache Kafka и Apache Pulsar очень схожий набор функций. Советую оценить обе платформы на предмет доступных функций, зрелости, распространения на рынке, опенсорс-инструментов и проектов, обучающего материала, проведения митапов по регионам, видео, статей и так далее. Примеры вариантов использования в вашей отрасли помогут принять правильное решение.


Confluent уже писал сравнение Kafka vs. Pulsar vs. RabbitMQ: производительность, архитектура и функции. Я тоже поучаствовал. Значит, сравнение уже есть


О чем тогда эта статья?


Я хочу рассмотреть несколько мифов из споров на тему Kafka против Pulsar, которые регулярно возникают в блогах и на форумах. Затем я сравню их не только по техническим аспектам, потому что обычно никто не говорит о Pulsar с этой стороны.



Kafka vs Pulsar мифы


Мне попадалось несколько мифов. С некоторыми я согласен, другие можно легко развенчать фактами. Ваше мнение может отличаться от моего, это нормально. Я излагаю исключительно свою точку зрения.


Миф 1. У Pulsar есть характеристики, которых нет у Kafka.


Правда.


Если сравнивать Apache Kafka и Apache Pulsar, можно найти различия в многоуровневой архитектуре, очередях и мультитенантности.


Но!


У Kafka тоже есть уникальные особенности:


  • В два раза меньше серверов, которыми приходится управлять.
  • Данные сохраняются на диск только один раз.
  • Данные кэшируются в памяти только однажды.
  • Проверенный протокол репликации.
  • Производительность zero-copy.
  • Транзакции.
  • Встроенная обработка потока.
  • Долгосрочное хранилище.
  • В разработке: удаление ZooKeeper (KIP-500), чтобы использовать и деплоить Kafka было еще проще, чем Pulsar с его четырехкомпонентной архитектурой (Pulsar, ZooKeeper, BookKeeper и RocksDB), а еще чтобы повысить масштабируемость, устойчивость и т. д.
  • В разработке: многоуровневое хранилище (KIP-405), чтобы повысить эластичность и экономичность Kafka.

Спросите себя: можно ли сравнивать опенсорс-платформы или продукты и вендоров с комплексным предложением?


Легко добавлять новые функции, если для них не нужно предоставлять критическую поддержку. Не оценивайте характеристики по списку. Узнайте, насколько они проверены в рабочих сценариях. Сколько из этих уникальных особенностей имеют низкое качество и реализованы наспех?


Например. Понадобилось несколько лет, чтобы реализовать и испытать в бою Kafka Streams в качестве нативного движка обработки потоков. Как это можно сравнивать с Pulsar Functions? Последнее позволяет добавлять определяемые пользователем функции (UDF) безо всякой связи с обработкой реальных потоков. Или это скорее похоже на Single Message Transformations (SMT), основную функцию Kafka Connect? Не сравнивайте круглое с квадратным и не забывайте учитывать зрелость. Чем критичнее функция, тем больше зрелости ей требуется.


Сообщество Kafka вкладывает невероятные усилия в работу над основным проектом и его экосистемой. Только в Confluent 200 инженеров занимаются исключительно проектом Kafka, дополнительными компонентами, коммерческими продуктами и предложениями SaaS в основных облачных провайдерах.


Миф 2. У Pulsar есть несколько крупных пользователей, например китайский Tencent.


Правда.


Но! Tencent использует Kafka активнее, чем Pulsar. Отдел выставления счетов, где используется Pulsar, это лишь малая часть Tencent, в то время как остальные используют Kafka. У них есть глобальная архитектура на основе Kafka, где больше тысячи брокеров сгруппированы в единый логический кластер.


Будьте осторожны с опенсорс-проектами. Проверяйте, какого успеха добились с ними обычные компании. Если эту технологию использует один технологический гигант, это еще не значит, что вам она тоже подойдет. Сколько компаний из Fortune 2000 могут похвастаться историями успеха с Pulsar?


Ищите примеры не только среди гигантов!


Примеры обычных компаний позволят лучше понять, как применяется тот или иной инструмент в реальном мире. Если это не истории успеха от самих вендоров. На сайте Kafka можно найти много примеров компаний. Более того, на конференциях Kafka Summit в Сан-Франциско, Нью-Йорке и Лондоне каждый год разные предприятия из разных отраслей делятся своими историями и кейсами. Это компании из Fortune 2000, предприятия среднего бизнеса и стартапы.


Приведу один пример про Kafka. Для репликации данных в реальном времени между отдельными кластерами Kafka существует много разных инструментов, включая MirrorMaker 1 (часть проекта Apache Kafka), MirrorMaker 2 (часть проекта Apache Kafka), Confluent Replicator (от Confluent, доступен только в рамках Confluent Platform и Confluent Cloud), uReplicator (опенсорс от Uber), Mirus (опенсорс от Salesforce), Brooklin (опенсорс от LinkedIn).


На практике разумно использовать только два варианта, если, конечно, вы не хотите обслуживать и улучшать код самостоятельно. Это MirrorMaker 2 (еще не вполне зрелая новинка, но отличный выбор для средне- и долгосрочной перспективы) и Confluent Replicator (проверенный на практике во многих критически важных для бизнеса деплоях, но платный). Остальные варианты тоже нормальные. Но кто обслуживает эти проекты? Кто разбирается с багами и проблемами безопасности? Кому звонить, если в продакшене что-то случилось? Одно дело деплоить критически важные системы в продакшене, другое оценивать и пробовать опенсорс-проект.


Миф 3. Pulsar предлагает очереди и стриминг в одном решении.


Частично.


Очереди используются для обмена сообщениями между двумя точками. Они предоставляют асинхронный протокол взаимодействия, то есть отправитель и получатель сообщения не должны обращаться к очереди одновременно.


Pulsar предлагает только ограниченную поддержку очередей сообщений и стриминга событий. В обеих областях он пока не может составить крепкую конкуренцию по двум причинам:


  1. Pulsar предлагает ограниченную поддержку очередей сообщений, потому что ему не хватает таких популярных функций, как XA-транзакции, маршрутизация, фильтрация сообщений и т. д. Это обычные функции в таких системах сообщений, как IBM MQ, RabbitMQ, и ActiveMQ. Адаптеры Pulsar для систем сообщений тоже ограничены в этом смысле. В теории все выглядит нормально, но на практике...


  2. Pulsar предлагает ограниченную поддержку стриминга. Например, на практике в большинстве случаев он не поддерживает семантику строго однократной (exactly-once) доставки и обработки. Вряд ли кто-то станет использовать Pulsar для платежной системы, потому что платежи могут дублироваться и теряться. Ему не хватает функционала для обработки потоков с соединением, агрегированием, окнами, отказоустойчивым управлением состоянием и обработкой на основе времени событий. Топики в Pulsar отличаются от топиков в Kafka в худшую сторону из-за BookKeeper, который был придуман в 2008 году как лог упреждающей записи для HDFS namenode в Hadoop в расчете на краткосрочное хранение.



Примечание. Адаптер Kafka для Pulsar тоже ограничен. В теории звучит заманчиво, но в реальности не все получается, потому что он поддерживает только небольшую часть функционала Kafka.


Как и Pulsar, Kafka предлагает ограниченную поддержку очереди.


В Kafka можно использовать разные обходные пути, чтобы реализовать нормальное поведение очереди. Если вы хотите использовать отдельные очереди вместо общих топиков Kafka для:


  • Безопасности => используйте Kafka ACL (и дополнительные инструменты, вроде контроля доступа на основе ролей (RBAC) от Confluent).
  • Семантики (отдельных приложений) => используйте группы консюмеров в Kafka.
  • Балансировки нагрузки => используйте партиции Kafka.

Обычно я спрашиваю заказчиков, зачем им нужны очереди. Kafka предлагает готовые решения для разных сценариев, на которые можно просто посмотреть с другой стороны. Редко встречаются случаи, когда действительно требуется высокая пропускная способность с очередями.


Зная обходные пути и ограничения Pulsar и Kafka в плане сообщений, давайте проясним: ни одна из платформ не предлагает решение для обмена сообщениями.


Если вам нужно именно оно, возьмите что-то вроде RabbitMQ или NATS.


Однозначного решения здесь нет. Многие наши заказчики заменяют имеющиеся системы сообщений, вроде IBM MQ, на Kafka (из соображений масштабируемости и затрат). Просто нужно знать имеющиеся варианты и их недостатки, все взвесить и подобрать решение, которое лучше всего подходит именно вам.


Миф 4. Pulsar предоставляет потоковую обработку.


Неправда.


Или, если по-честному, все зависит от того, что вы называете обработкой потоков. Это какая-то простейшая функция или прямо полноценная обработка?


Если в двух словах, обработкой потоков я называю непрерывное потребление, обработку и агрегирование событий из разных источников данных. В реальном времени. В любом масштабе. Естественно, с защитой от сбоев, особенно если речь идет о stateful.



Pulsar предлагает минимальную потоковую обработку через Pulsar Functions. Она подходит для простых обратных вызовов, но не сравнится с функционалом Kafka Streams или ksqlDB для создания стриминговых приложений со stateful-информацией, скользящими окнами и другими функциями. Варианты применения можно найти в разных отраслях. Например, на сайте Kafka Streams есть кейсы New York Times, Pinterest, Trivago, Zalando и других.


Для стриминга аналитики Pulsar обычно используется в сочетании с нормальной платформой обработки потоков, вроде Apache Spark или Apache Flink, но вам придется управлять дополнительным элементами распределенной архитектуры и разбираться в их сложном взаимодействии.


Миф 5. Pulsar предоставляет семантику exactly-once, как и Kafka.


Неправда.


В Pulsar есть функция дедупликации, чтобы одно сообщение не сохранялось в брокере Pulsar дважды, но ничто не мешает консюмеру прочитать это сообщение несколько раз. Этого недостаточно для любого применения обработки потоков, где для входа и выхода используется Pulsar.


В отличие от функции транзакций в Kafka, здесь нельзя привязывать отправленные сообщения к состоянию, записанному в обработчике потоков.


Семантика Exactly-Once Semantics (EOS) доступна с версии Kafka 0.11, которая вышла три года назад, и используется во многих продакшен-деплоях. Kafka EOS поддерживает всю экосистему Kafka, включая Kafka Connect, Kafka Streams, ksqlDB и такие клиенты, как Java, C, C++, Go и Python. На конференции Kafka Summit было несколько выступлений, посвященных функционалу Kafka EOS, включая это превосходное и понятное введение со слайдами и видео.


Миф 6. У Pulsar производительность выше, чем у Kafka.


Неправда.


Я не поклонник разных бенчмарков производительности и пропускной способности. Обычно они субъективны и относятся к конкретной задаче независимо от того, кто их проводит вендор, независимый консультант или исследователь.


Например, GIGAOM опубликовали один бенчмарк, где сравнивается задержка и производительность в Kafka и Pulsar. Автор намеренно замедлил Kafka, настроив параметр flush.messages = 1, в результате которого каждый запрос вызывает fsync и Kafka синхронизируется с диском при каждом сообщении. В этом же бенчмарке консюмер Kafka отправляет подтверждение синхронно, а консюмер Pulsar асинхронно. Неудивительно, что Pulsar вышел явным победителем. Правда, автор забыл упомянуть или объяснить существенные отличия в конфигурации и измерениях. Давайте не будем путать теплое с мягким.


На самом деле архитектура Pulsar сильнее нагружает сеть (из-за брокера, который выступает как прокси перед серверами bookie BookKeeper) и требует в два раза больше пропускной способности (потому что BookKeeper записывает данные в лог упреждающей записи и в главный сегмент).


Confluent тоже делал бенчмарки. Только там сравнивалось сопоставимое. Неудивительно, что результаты получились другими. Но нужно ли вообще встревать в эти бои на бенчмарках между вендорами?


Оцените свои требования к производительности. Сделайте proof-of-concept с Kafka и Pulsar, если надо. Скорее всего, в 99% случаев обе платформы покажут приемлемую производительность для вашего сценария. Не доверяйте посторонним субъективным бенчмаркам! Ваш случай все равно уникален, и производительность это только один из аспектов.


Миф 7. Pulsar проще в использовании, чем Kafka.


Неправда.


Без дополнительных инструментов вам будет сложно и с Kafka, и с Pulsar.


У Kafka две распределенных системы: сама Kafka и Apache ZooKeeper.


Но! У Pulsar три распределенных системы, а еще хранилище: Pulsar, ZooKeeper и Apache BookKeeper. Как и Pulsar, BookKeeper использует ZooKeeper. Для некоторых задач хранения используется RocksDB. Это означает, что Pulsar гораздо сложнее понять и настроить по сравнению с Kafka. Кроме того, у Pulsar больше параметров конфигурации, чем у Kafka.


Kafka движется в противоположном направлении скоро ZooKeeper будет удален (см. KIP-500), так что останется всего одна распределенная система, которую нужно деплоить, обслуживать, масштабировать и мониторить:



ZooKeeper мешает масштабированию в Kafka и усложняет эксплуатацию. Но у Pulsar все еще хуже!


Одна из главных проблем моих заказчиков использование ZooKeeper в критически важных деплоях в большом масштабе. С нетерпением жду упрощения архитектуры Kafka, чтобы остались только брокеры. Кроме того, мы получим единую модель безопасности, потому что больше не придется отдельно настраивать безопасность для ZooKeeper. Это огромное преимущество, особенно для крупных организаций и отраслей со строгим регулированием. Ребята из комплаенса и ИБ скажут вам спасибо за упрощенную архитектуру.


Использование платформы связано НЕ только с архитектурой


У Kafka гораздо более подробная документация, огромное сообщество экспертов и большой набор поддерживающих инструментов, которые упрощают работу.


Кроме того, по Kafka есть много вариантов обучения онлайн-курсы, книги, митапы и конференции. К сожалению, у Pulsar с этим все плохо.


Миф 8. Архитектура с тремя уровнями лучше, чем с двумя.


Зависит от ситуации.


Лично я скептически отношусь к тому, что трехуровневую архитектуру Pulsar (брокеры Pulsar, ZooKeeper и BookKeeper) можно считать преимуществом для большинства проектов. Тут есть издержки.


Twitter рассказал, как отказался от BookKeeper + DistributedLog (DistributedLog похож на Pulsar по архитектуре и дизайну) около года назад, соблазнившись такими преимуществами одноуровневой архитектуры Kafka, как экономичность и улучшенная производительность, которых недоставало двухуровневой архитектуре с отдельным хранилищем.


Как и Pulsar, DistributedLog построен на BookKeeper и добавляет стриминговый функционал, схожий с Pulsar (например, уровень обработки существует отдельно от уровня хранилища). Изначально DistributedLog был отдельным проектом, но потом присоединился к BookKeeper, хотя сегодня им, похоже, мало кто занимается (всего пара коммитов за последний год). Среди основных причин перехода Twitter на Kafka называлась значительная экономия и повышение производительности, а также обширное сообщество и популярность Kafka. Вот какие выводы они сделали: Для одного консюмера экономия ресурсов составила 68%, а для нескольких целых 75%.


Преимущества трехуровневой архитектуры проявляются при создании масштабируемой инфраструктуры. При этом дополнительный уровень увеличивает нагрузку на сеть минимум на 33%, а данные, которые хранятся в брокерах Pulsar, приходится дополнительно кэшировать на обоих уровнях, чтобы добиться аналогичной производительности, а еще дважды записывать на диск, потому что формат хранения в Bookkeeper основан не на логе.


В облаке, где работает большинство деплоев Kafka, лучшим внешним хранилищем выступает не такая узкоспециализированная технология, как BookKeeper, а популярное и проверенное хранилище объектов, вроде AWS S3 и GCP GCS.


Tiered Storage в Confluent Platform на основе AWS S3, GCP GCS и им подобных, дает те же преимущества без дополнительного слоя BookKeeper, как у Pulsar, и без дополнительных затрат и задержек, связанных с передачей данных по сети. У Confluent ушло два года на то, чтобы выпустить GA-версию Tiered Storage for Kafka с круглосуточной поддержкой для самых критичных данных. Tiered Storage пока недоступно для опенсорсной версии Apache Kafka, но Confluent вместе с сообществом Kafka (включая крупные технологические компании, вроде Uber) работает над KIP-405, чтобы добавить Tiered Storage в Kafka с разными вариантами хранения.


У обеих архитектур есть плюсы и минусы. Лично мне кажется, что 95% проектов не нуждаются в сложной трехуровневой архитектуре. Она может пригодиться там, где требуется внешнее недорогое хранилище, но вам придется отвечать за круглосуточное соблюдение SLA, масштабируемость и пропускную способность. А еще за безопасность, управление, поддержку и интеграцию в вашу экосистему. Если вам действительно нужна трехуровневая архитектура, не стану вас останавливать.


Связанный миф: Pulsar лучше подходит для отстающих консюмеров благодаря уровню кэширования и хранения.


Неправда.


Основная проблема отстающих консюмеров в том, что они занимают page cache, то есть последние сообщения уже кэшированы. При чтении из предыдущих сегментов они заменяются, что снижает производительность консюмеров, читающих с начала лога.


Архитектура Pulsar проявляет себя даже хуже в этом отношении. Проблема сброса кэша сохраняется, при этом для чтения нужно сделать лишний прыжок по сети и загрузить сеть вместо того, чтобы просто прочитать сообщение с локального носителя.


Миф 9. Kafka масштабируется хуже, чем Pulsar.


Неправда.


Это один из главных аргументов сообщества Pulsar. Как я уже говорил, это всегда зависит от выбранного бенчмарка. Например, я видел тесты с эквивалентными вычислительными ресурсами, где на высокой пропускной способности у Kafka результат был гораздо лучше, чем у Pulsar. Вот один из таких бенчмарков:



В большинстве случаев масштабируемость не проблема. Kafka можно легко разогнать до нескольких гигабайтов в секунду, как в демонстрации Масштабирование Apache Kafka до 10+ ГБ в секунду в Confluent Cloud:



Честно говоря, этот вопрос должен волновать менее 1% пользователей. Если у вас требования, как у Netflix (петабайты в день) или LinkedIn (триллионы сообщений), есть смысл обсуждать самую подходящую архитектуру, железо и конфигурацию. Остальным можно не беспокоиться.


Связанный миф: сейчас в Kafka может храниться всего 500к партиций на кластер.


Правда.


Пока у Kafka не лучшая архитектура для масштабных деплоев с сотнями тысяч топиков и партиций.


Но! Pulsar тоже не резиновый. Просто у него другие лимиты.


Ограничения в Kafka связаны с Zookeeper. Когда Zookeeper удалят через KIP-500, верхняя граница уйдет вместе с ним.


Примечание: успех зависит от подходящего устройства архитектуры!

Если у заказчика возникли проблемы с количеством партиций и масштабируемостью в Kafka, скорее всего, он ошибся в архитектуре и приложениях (Pulsar бы тут точно не помог).


Kafka это платформа стриминга, а не очередной IBM MQ. Если вы попробуете воссоздать любимое решение и архитектуру MQ с Kafka, у вас вряд ли что-то получится. У меня было несколько заказчиков, которые потерпели крах, но смогли привести все в порядок с нашей помощью.


Скорее всего, у вас не возникнет проблем с количеством партиций и масштабируемостью, даже пока Kafka еще использует ZooKeeper, если вы правильно все спроектируете и разберетесь в базовых принципах Kafka. Такие проблемы часто возникают с любой технологией, вроде Kafka, которая вводит совершенно новый уровень и парадигму (например, при первом появлении облаков немало компаний набили себе шишки при попытке их освоить).


Связанный миф: Pulsar поддерживает практически неограниченное число партиций.


Неправда.


У BookKeeper существуют те же ограничения по одному файлу на ledger, что и у Kafka, но в одной партиции таких ledger несколько. Брокеры Pulsar объединяют партиции в группы, но на уровне хранения, в Bookkeeper, данные хранятся в сегментах, и на каждую партицию приходится много сегментов.


В Kafka метаданные для этих сегментов хранятся в Zookeeper, откуда и возникают эти ограничения по числу. Когда Kafka избавится от этой зависимости, границы возможного раздвинутся. С нетерпением жду реализации KIP-500. Подробности читайте в статье Apache Kafka справится сама: удаление зависимости от Apache ZooKeeper.


Связанный миф: потребности масштабирования в Kafka необходимо определять при создании топика.


Отчасти это правда.


Если требуется очень большой масштаб, в топиках Kafka можно сразу создать больше партиций, чем обычно требуется для этой задачи. (См. Потоки и таблицы в Apache Kafka: топики, партиции и хранение.) Или можно добавить партиции позже. Это не идеальное решение, но так уж устроены распределенные системы стриминга (которые, кстати, масштабируются лучше, чем традиционные системы обработки сообщений, вроде IBM MQ).


Оптимальные методы создания топиков и изменения их конфигурации уже тщательно продуманы за вас, так что не волнуйтесь.


Но! У топиков Pulsar есть то же ограничение!


Пропускная способность для записи зависит от числа партиций, назначенных топику Pulsar, точно так же, как это происходит в топике Kafka, поэтому там тоже придется создавать партиции с запасом. Дело в том, что в каждой партиции записывать можно только в один ledger за раз (а их может быть несколько). Увеличение числа партиций динамически влияет на порядок сообщений, как и в Kafka (порядок нарушается).


У Kafka и Pulsar с масштабированием все отлично. Этого будет более чем достаточно почти в любом сценарии.


Если вам нужен совсем заоблачный масштаб, лучше взять реализацию без ZooKeeper. Так что KIP-500 это самое ожидаемое изменение в Kafka, судя по сообществу и заказчикам Confluent.


Миф 10. В случае аппаратного сбоя Pulsar восстанавливается моментально, а Kafka приходится перегружать данные.


И да, и нет.


Если брокер Pulsar вырубится, ничего не страшного не будет, это правда, но, в отличие от брокера Kafka, он и не хранит данные, а просто выступает в качестве прокси для уровня хранения, то есть BookKeeper. Так что подобные заявления о Pulsar это просто маркетинговый ход. Почему никто не говорит, что бывает, если полетит нода BookKeeper (bookie)?


Если завершить и перезапустить BookKeeper bookie, данные будут точно так же перераспределяться, как и в Kafka. В этом суть распределенных систем с их репликацией и партициями.


Kafka уже предлагает эластичность.


Это важно. Основатель Confluent, Джей Крепс (Jay Kreps) недавно писал об этом: Эластичные кластеры Apache Kafka в Confluent Cloud. В облачном сервисе SaaS, вроде Confluent Cloud, конечный пользователь не думает об аппаратных сбоях. Он ожидает непрерывный аптайм и SLA на уровне 99,xx. С оплатой за потребление пользователь не заботится об управлении брокерами, изменении размеров нод, увеличении или уменьшении кластеров и подобных деталях.


Самоуправляемым кластерам Kafka нужны такие же возможности. Tiered Storage for Kafka это огромное хранилище, с которым данные не хранятся на брокере и восстановление после сбоев происходит почти моментально. Если добавить сюда такие инструменты, как Self-Balancing Kafka (эта фича от Confluent описана в статье по ссылке выше), можно вообще забыть об эластичности в самоуправляемых кластерах.


К сожалению, в Pulsar вы ничего подобного не найдете.


Миф 11. Pulsar справляется с репликацией между кластерами лучше, чем Kafka.


Неправда.


Любая распределенная система должна решать такие проблемы, как CAP-теорема и кворум в распределенных вычислениях. Кворум это минимальное число голосов, которое распределенная транзакция должна получить, чтобы выполнить операцию в распределенной системе. Такой подход позволяет обеспечить согласованность операций.


Задачи по кворуму Kafka поручает ZooKeeper. Даже после реализации KIP-500 и удаления ZooKeeper законы физики не перестанут действовать: проблемы задержки в распределенных системах существуют в таких регионах, как Восточная, Центральная и Западная часть США и даже по всему миру. Скорость света, конечно, впечатляет, но все же имеет ограничения.


Эту проблему можно обойти разными способами, включая использование инструментов репликации в реальном времени, например MirrorMaker 2 в Apache Kafka, Confluent Replicator или Confluent Multi-Region-Clusters. Эти варианты и советы см. в статье Архитектурные шаблоны для распределенных, гибридных, периферийных и глобальных деплоев Apache Kafka.



Вы не найдете универсальное решение, которое обеспечит глобальную репликацию + нулевой простой + нулевую потерю данных. Для самых критически важных приложений Confluent Multi-Region-Clusters предлагает RTO=0 и RPO=0 (нулевой простой и нулевую потерю данных) с автоматическим аварийным восстановлением и отработкой отказа, даже если упадет целый датацентр или облачный регион.


Для таких случаев архитектура Pulsar будет еще сложнее, чем ее базовая версия. Просто для георепликации Pulsar требует дополнительный глобальный кластер Zookeeper, а для больших расстояний это не годится. Обходной путь есть, но от CAP-теоремы и законов физики не уйдешь.


Что с Kafka, что с Pulsar вам нужна проверенная архитектура для борьбы с физикой в глобальных деплоях.


Миф 12. Pulsar совместим с интерфейсом и API Kafka.


Отчасти.


Pulsar предлагает простейшую реализацию с зачаточной совместимостью с протоколом Kafka v2.0.


У Pulsar есть конвертер для базовых элементов протокола Kafka.


В теории совместимость с Kafka выглядит убедительно, но вряд ли это серьезный аргумент для переноса действующей инфраструктуры Kafka в Pulsar. Зачем такой риск?


Мы слышали заявления о совместимости с Kafka и в других примерах, например для гораздо более зрелого сервиса Azure Event Hubs. Почитайте об ограничивающих факторах их Kafka API. Вы удивитесь. Никакой поддержки основных функций Kafka, вроде транзакций (и семантики exactly-once), сжатия или compaction для логов.


Раз, по сути, это не Kafka, существующие приложения Kafka будут вести себя непредсказуемо в таких вот совместимых системах, будь то Azure Event Hubs, Pulsar или другая оболочка.


Kafka vs. Pulsar комплексное сравнение


Выше мы говорили о мифах, которые гуляют по разным статьям и постам. С ними, вроде, все понятно.


Но я обещал, что мы поговорим не только о технических деталях. При выборе технологии важны и другие аспекты.


Рассмотрим три из них: доля на рынке, корпоративная поддержка и облачные предложения.


Доля на рынке у Apache Kafka и Apache Pulsar


Статистика в Google Trends за последние пять лет совпадает с моими личными наблюдениями интерес к Apache Kafka гораздо выше, чем к Apache Pulsar:



Примерно так же выглядит картинка сравнения на Stack Overflow и аналогичных платформах, по числу и размеру поддерживающих вендоров, открытой экосистеме (интеграция инструментов, обертки, вроде Spring Kafka) и подобным характеристикам.


Открытые вакансии еще один индикатор распространения технологии. Для Pulsar их мало, то есть его использует не так много компаний. Убедитесь сами поищите на любом сайте. Если искать по всему миру, для Pulsar вакансий меньше сотни, а для Kafka тысячи. Кроме того, в большинстве вакансий для Pulsar указано, что требуется опыт с Kafka, Pulsar, Kinesis и аналогичными технологиями.


В большинстве случаев такие характеристики важнее для успешного проекта, чем технические детали. В конце концов ваша цель решить определенную бизнес-задачу, правда же?


Раз Pulsar так мало распространен, почему о нем вообще говорят? Во-первых, потому, что независимые консультанты, аналитики и блоггеры (включая меня) должны рассказывать о новых современных технологиях, чтобы привлечь аудиторию. Если честно, люди любят об этом читать.


Корпоративная поддержка для Kafka и Pulsar


Корпоративная поддержка для Kafka и Pulsar существует.


Но все не совсем так, как можно ожидать. Вот несколько вендоров, к которым вы можете обратиться, если решили работать с Pulsar:


  • Streamlio (теперь принадлежит Splunk) компания, которая раньше стояла за Apache Pulsar. Splunk пока не объявили о своей будущей стратегии для поддержки пользователей, которые работают над проектами на основе Pulsar. Splunk славится своими популярными аналитическими платформами. Это их основной бизнес (1,8 млрд долларов в 2019 году). Единственное, на что жалуются пользователи, ценник. Splunk активно используют Kafka, а сейчас интегрируют Pulsar в Splunk Data Stream Processor (DSP). Сильно сомневаюсь, что Splunk вдруг увлечется опенсорсом, чтобы поддержать ваш драгоценный проект на базе Pulsar (возможно, чего-то можно ожидать от DSP). Будущее покажет.
  • StreamNative основана одним из изначальных разработчиков Apache Pulsar и поддерживает платформу стриминга на основе Pulsar. На июнь 2020 года у StreamNative было 13 (целых 13, да!) сотрудников в LinkedIn. Уж не знаю, хватит этого для поддержки вашего критически важного проекта или нет.
  • TIBCO объявили о поддержке Pulsar в декабре 2019 года. За последние годы стратегия у них сместилась с интеграции на аналитику. Пользователи их промежуточного ПО уходят от TIBCO толпами. В отчаянии они приняли стратегическое решение: поддерживать другие платформы, даже если у них нет никакого опыта и никакой связи с ними. Да, звучит как миф, но TIBCO делают то же самое для Kafka. Любопытный факт: TIBCO предлагает Kafka и ZooKeeper в Windows! Такого больше никто не делает. Все знают, это приводит к нестабильности и несогласованности. Но если что, TIBCO помогут вам с Kafka и Pulsar. Зачем вообще сравнивать эти платформы, если можно получить обе у одного вендора, причем даже на Windows, с .exe и bat-скриптами для запуска серверных компонентов:


Вендоров для Kafka больше с каждым днем.


Это очень распространенная на рынке платформа. Лучшее подтверждение поддержка от крупнейших вендоров. IBM, Oracle, Amazon, Microsoft и многие другие поддерживают Kafka и встраивают возможности интеграции с ней в свои продукты.


Окончательное подтверждение этому я получил на конференции Oracle OpenWorld 2019 в Сан-Франциско, где выступал менеджер по GoldenGate (великолепный и очень дорогой инструмент CDC от Oracle). В основном он рассказывал, как GoldenGate станет платформой интеграции данных вообще для всего. Половина выступления была посвящена стримингу, платформе Kafka и тому, что GoldenGate будет обеспечивать интеграцию с разными базами/озерами данных и Kafka в обоих направлениях.


Полностью управляемые облачные сервисы для Kafka и Pulsar


Какие облачные решения доступны для Kafka и Pulsar?


Для Apache Pulsar есть облачный сервис с очень говорящим названием:


Kafkaesque.


Я серьезно. Сами посмотрите [UPD: 17 июня они переименовали KAFKAESQUE в KESQUE. Видимо, поняли, всю комичность ситуации.]


Для Apache Kafka существуют разные предложения на выбор:


  • Confluent Cloud (SaaS) полностью управляемый сервис с оплатой по мере использования, круглосуточной доступностью, эластичностью и бессерверными функциями для Apache Kafka и связанной экосистемы (Schema Registry, коннекторы Kafka Connect и ksqlDB для обработки потоков).
  • Amazon MSK (PaaS) подготавливает ZooKeeper и брокеры Kafka, а конечные пользователи сами обслуживают их, исправляют баги, устанавливают обновления и т. д. Важный факт: AWS не включает проблемы с Kafka в поддержку и SLA 99,95.
  • Azure Event Hubs (SaaS) предоставляет конечную точку Kafka (с проприетарной реализацией) для взаимодействия с приложениями Kafka. Это легко масштабируемое решение с высокой производительностью. Это не вполне Kafka, просто эмуляция, и тут не хватает некоторых важных функций, вроде семантики exactly-once, compaction логов и сжатия. Не говоря уже о дополнительных возможностях, вроде Kafka Connect и Kafka Streams
  • Big Blue (IBM) и Big Red (Oracle) предлагают облачные решения для Kafka и соответствующих API. Не знаю, использует их кто-нибудь или нет. Понятия не имею, насколько они хороши, никогда не видел их в деле.
  • Много компаний поменьше, вроде Aiven, CloudKarafka, Instaclustr и других.

В общем, про распространенность Kafka и Pulsar на рынке все очевидно.


И все же Apache Kafka или Apache Pulsar?


В двух словах: Pulsar пока здорово отстает от Kafka по зрелости в плане надежности для больших масштабов и обширности сообщества.


И вообще не факт, что Pulsar лучше.


Есть смысл сравнивать их, если вы решили использовать только опенсорс-инструменты. Узнайте, что подойдет лучше именно вам. Не забудьте учесть технические характеристики, уровень зрелости, вендоров, сообщество разработчиков и другие важные факторы. Что вписывается в вашу ситуацию?


Если в Kafka и Pulsar есть не все нужные вам функции, выбирайте Kafka: для нее есть много вендоров и облачных решений. Pulsar, к сожалению, сильно отстает и вряд ли догонит в ближайшем будущем.

Подробнее..

Использование Spring Cloud Stream Binding с брокером сообщений Kafka

15.04.2021 16:11:54 | Автор: admin

Всем привет! Меня зовут Виталий, я разработчик в компании Web3Tech. В этом посте я представлю основные концепции и конструкции платформы Spring Cloud Stream для поддержки и работы с брокерами сообщений Kafka, с полным циклом их контекстного unit-тестирования. Мы используем такую схему в своем проекте всероссийского электронного голосования на блокчейн-платформе Waves Enterprise.

Являясь частью группы проектов Spring Cloud, Spring Cloud Stream основан на Spring Boot и использует Spring Integration для обеспечения связи с брокерами сообщений. При этом он легко интегрируется с различными брокерами сообщений и требует минимальной конфигурации для создания event-driven или message-driven микросервисов.

Конфигурация и зависимости

Для начала нам нужно добавить зависимость spring-cloud-starter-stream-kafka в build.gradle:

dependencies {   implementation(kotlin("stdlib"))   implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")   implementation("com.fasterxml.jackson.module:jackson-module-kotlin")   implementation("org.springframework.boot:spring-boot-starter-web")   implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")   testImplementation("org.springframework.boot:spring-boot-starter-test")   testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")   testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")}

В конфигурацию проекта Spring Cloud Stream необходимо включить URL Kafka-брокера, имя очереди (топик) и другие параметры биндинга. Вот пример YAML-конфигурации для сервиса application.yaml:

spring: application:   name: cloud-stream-binding-kafka-app cloud:   stream:     kafka:       binder:         brokers: 0.0.0.0:8080         configuration:           auto-offset-reset: latest     bindings:       customChannel:                   #Channel name         destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)         group: input-group-N         contentType: application/json         consumer:           max-attempts: 1           autoCommitOffset: true           autoCommitOnError: false

Концепция и классы

По сути, мы имеем дело с сервисом, построенным на Spring Cloud Stream, который прослушивает входящую очередь, используя биндинги (SpringCloudStreamBindingKafkaApp.kt):

@EnableBinding(ProducerBinding::class)@SpringBootApplication  class SpringCloudStreamBindingKafkaApp fun main(args: Array<String>) { SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args) }

Аннотация @EnableBinding указывает сервису на биндинг как входящего, так и исходящего канала.

Здесь необходимо уточнить ряд концепций.

Binding интерфейс, в котором описаны входящие и исходящие каналы.
Binder имплементация middleware для сообщений.
Channel представляет канал для передачи сообщений между middleware и приложением.
StreamListeners методы обработки сообщений в виде бинов (beans), которые будут автоматически вызваны после того, как MessageConverter осуществит сериализацию или десериализацию между событиями в middleware и типами объектов в домене DTO.
Message Schema схемы, используемые для сериализации и десериализации сообщений. Могут быть прочитаны из источника или динамически загружены.

Тестирование

Чтобы протестировать сообщение и операции send/receive, нам нужно создать как минимум одного producer и одного consumer. Вот простейший пример того, как это можно сделать в Spring Cloud Stream.

Инстанс бина Producer будет отправлять сообщение в топик Kafka, используя биндер (ProducerBinding.kt):

interface ProducerBinding {   @Output(BINDING_TARGET_NAME)   fun messageChannel(): MessageChannel}

Инстанс бина Сonsumer будет слушать топик Kafka и получать сообщения.

ConsumerBinding.kt:

interface ConsumerBinding {   companion object {       const val BINDING_TARGET_NAME = "customChannel"   }   @Input(BINDING_TARGET_NAME)   fun messageChannel(): MessageChannel}

Consumer.kt:

@EnableBinding(ConsumerBinding::class)class Consumer(val messageService: MessageService) {   @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)   fun process(       @Payload message: Map<String, Any?>,       @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?   ) {       messageService.consume(message)   }}

Мы создали брокер Kafka с топиком. Для тестирования будем использовать встроенную Kafka, доступную нам с зависимостью spring-kafka-test.

Функциональное тестирование с MessageCollector

Мы имеем дело с имплементацией биндера, позволяющей взаимодействовать с каналами и получать сообщения. Отправим сообщение в канал ProducerBinding и затем получим его в виде payloadProducerTest.kt:

@SpringBootTestclass ProducerTest {   @Autowired   lateinit var producerBinding: ProducerBinding   @Autowired   lateinit var messageCollector: MessageCollector   @Test   fun `should produce somePayload to channel`() {       // ARRANGE       val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)       // ACTproducerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())       val payload = messageCollector.forChannel(producerBinding.messageChannel())           .poll()           .payload       // ASSERT       val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)       assertTrue(request.entries.stream().allMatch { re ->           re.value == payloadAsMap[re.key.toString()]       })       messageCollector.forChannel(producerBinding.messageChannel()).clear()   }}

Тестирование с брокером Embedded Kafka

Используем аннотацию @ClassRule для создания брокера. Так мы сможем поднять сервера Kafka и Zookeeper на случайном порте перед началом теста и выключить их, когда тест завершится. Это избавляет нас от необходимости в рабочем инстансе Kafka и Zookeper на всё время проведения теста (ConsumerTest.kt):

@SpringBootTest@ActiveProfiles("test")@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])@EnableBinding(ProducerBinding::class)class ConsumerTest {   @Autowired   lateinit var producerBinding: ProducerBinding   @Autowired   lateinit var objectMapper: ObjectMapper   @MockBean   lateinit var messageService: MessageService   companion object {       @ClassRule @JvmField       var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")   }   @Test   fun `should consume via txConsumer process`() {       // ACT       val request = mapOf(1 to "foo", 2 to "bar")       producerBinding.messageChannel().send(MessageBuilder.withPayload(request)           .setHeader("someHeaderName", "someHeaderValue")           .build())       // ASSERT       val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))       runBlocking {           delay(20)           verify(messageService).consume(requestAsMap)       }   }}

Заключение

В этом посте я продемонстрировал возможности Spring Cloud Stream и использования его с Kafka. Spring Cloud Stream предлагает удобный интерфейс с упрощенными нюансами настройки брокера, быстро внедряется, стабильно работает и поддерживает современные популярные брокеры, такие как Kafka. По итогам я привел ряд примеров с unit-тестированием на основе EmbeddedKafkaRule с использованием MessageCollector.

Все исходники можно найти на Github. Спасибо за прочтение!

Подробнее..

Kafka Streams непростая жизнь в production

14.01.2021 16:09:15 | Автор: admin

Привет, Хабр! Вокруг меня сформировался позитивный информационный фон на тему обработки событий через Kafka Streams. Этот инструмент привлекает множеством видео-докладов и статей на Хабре, подробной документацией, понятным API и красивой архитектурой. Некоторые мои знакомые и коллеги разрабатывают с его помощью свои системы. Но что происходит с в реальной жизни, когда эти системы уходят в production?

В этой статье я опущу введение в Kafka Streams, предполагая, что читатель уже знаком с ней, и расскажу о нашем опыте жизни с этой библиотекой на примере достаточно нагруженной системы.

Коротко о проекте

Внутренней командой вместе с партнерами мы работаем над биржой Ad Exchange, которая помогает перепродавать рекламный трафик. Специфику подобных инструментов мы уже когда-то описывали в статье на Хабре. По мере роста числа партнеров среди SSP и DSP, нагрузка на сервера биржи растет. А для повышения ценности самой биржи мы должны собирать с этого трафика развернутую аналитику. Тут-то мы и попытались применить Kafka Streams.

Внедрить новый инструмент у себя на продакшене - это всегда некоторый риск. Мы его осознавали, но смирились с этим, поскольку в целом Kafka Streams концептуально должен был хорошо подойти для расчета агрегатов. И хотя первое впечатление было отличным, далее я расскажу о проблемах, к которым это привело.

Статья не претендует на объективность и описывает только лишь наш опыт и проблемы, с которыми мы столкнулись, работая с этой технологией. Надеюсь, кому-то она поможет избежать ошибок при использовании Kafka Streams. А может быть даст повод посмотреть по сторонам.

Агрегаты

В нашу систему приходят десятки и сотни тысяч сообщений в секунду. Первое, что нам необходимо было сделать, - агрегации по различным ключам. Например, идёт поток событий "Показ рекламы". Для него надо считать на лету общую стоимость и количество, группируя по различным ключам: стране, партнёру, рекламной площадке и т.д.

Вроде бы стандартный кейс для Kafka Streams: используем функции groupBy и aggregate и получаем нужный результат. Всё работает именно так, причем с отличной скоростью за счёт внутреннего кэша: несколько последовательных изменений по одному и тому же ключу сначала выполняются в кэше и только в определённые моменты отправляются в changelog-топик. Далее Kafka в фоне удаляет устаревшие дублирующиеся ключи через механизм log compaction. Что здесь может пойти не так?

Репартиционирование

Если ваш ключ группировки отличается от ключа, под которым изначально пришло событие, то Kafka Streams создаёт специальный repartition-топик, отправляет в него событие уже под новым ключом, а потом считывает его оттуда и только после этого проводит агрегацию и отправку в changelog-топик. В нашем примере вполне может быть, что событие "Показ рекламы" пришло с ключом в виде UUID. Почему нет? Если вам надо сделать группировку, например по трём другим ключам, то это будет три разных repartition-топика. На каждый топик будет одно дополнительное чтение и одна дополнительная запись в Kafka. Чувствуете, к чему я веду?

Предположим, на входе у вас 100 тысяч показов рекламы в секунду. В нашем примере вы создадите дополнительную нагрузку на брокер сообщений в размере +600 тысяч сообщений в секунду (300 на запись и 300 на чтение). И ведь не только на брокер. Для таких объёмов надо добавлять дополнительные сервера с сервисами Kafka Streams. Можете посчитать, во сколько тысяч долларов обойдется такое решение с учётом цен на железо.

Для читателей, не очень хорошо знакомых с механизмом репартиционирования, я поясню один момент. Это не баг или недоработка Kafka Streams. С учётом её идеологии и архитектуры это единственное возможное поведение - его нельзя просто взять и отключить. Когда у сообщения меняется ключ, этот новый ключ надо равномерно "размазать" по кластеру так, чтобы каждый инстанс Kafka Streams имел собственный набор ключей. Для этого и служат дополнительные запись/чтение в repartition-топик. При этом если инстанс А записал в топик сообщение, то не факт, что он же его и прочитает. Это может сделать инстанс Б, В и т.д. в зависимости от того, кто какую партицию читает. В итоге каждый ключ группировки у вас будет более-менее равномерно распределён по серверам кластера (если вы его хэшируете, конечно).

Запросы к агрегатам

Данные пишут для того, чтобы с ними потом можно было работать. Например делать к ним запросы. И здесь наши возможности оказались серьезно ограничены. Исчерпывающий список того, как мы можем запрашивать данные у Kafka Streams, есть здесь или здесь для оконных агрегатов. Если используется стандартная реализация state-store на основе RocksDB (а скорее всего это так), фактически данные можно получать только по ключам.

Предположу, что первое, что вам хочется сделать с полученными данными, - это фильтрация, сортировка и пагинация. Но здесь таких возможностей нет. Максимум, что вы можете, - это считать всё, что есть, используя метод all(), а потом вручную делать нужные операции. Вам повезет, если ключей не слишком много и данные уместятся в оперативной памяти. Нам не повезло. Пришлось писать дополнительный модуль, который по ночам выгружал данные из RocksDB и отправлял в Postgres.

Ещё надо помнить, что на каждом отдельно взятом сервисе находится только часть ключей. Что если вам необходимо отдавать ваши агрегаты, скажем, по HTTP? Вот кто-то сделал запрос к одному из сервисов Kafka Streams: мол, дай мне такие-то данные. Сервис смотрит у себя локально - данных нет. Или есть, но какая-то часть. Другая часть может быть на другом сервере. Или на всех. Что делать? Документация Kafka Streams говорит, что это наши проблемы.

Стабильность Kafka Streams

У нас бывают проблемы с хостинг провайдером. Иногда выходит из строя оборудование, иногда человеческий фактор, иногда и то, и другое. Если по какой-то причине теряется связь с Kafka, то Kafka Streams переводит все свои потоки в состояние DEAD и ждёт, когда мы проснёмся и сделаем ей рестарт. При этом рядом стоят соседние сервисы, которые работают с той же Kafka через Spring и @KafkaListener. Они восстанавливаются сами, как ни в чём не бывало.

Kafka Streams может умереть и по другой причине: не пойманные исключения в вашем коде. Предположим, вы делаете какой-то внешний вызов внутри потоков Kafka Streams. Пусть это будет обращение к базе данных. Когда база данных падает, у вас два варианта. Первый - вы ловите эту ошибку и продолжаете работать дальше. Но если база будет лежать долго, а работа с ней - критична, вы будете считывать большое количество сообщений из топиков и некорректно их обрабатывать. Второй вариант кажется лучше - совсем не ловить исключение и дать Kafka Streams умереть. Дальше как обычно: ночной подъём, рестарт всех серверов с Kafka Streams. "Из коробки" у вас нет варианта подождать, пока база восстановится, и продолжить работу.

Нам пришлось дописать в каждый сервис Kafka Streams дополнительный модуль, который работает как watchdog: поднимает Kafka Streams, если видит, что она умерла.

Кстати, если вы работаете с Kafka Streams через Spring, то не забудьте переопределить стандартный StreamsBuilderFactoryBean, указав в нём свой CleanupConfig. Иначе будете неприятно удивлены тем, что при каждом рестарте будет удаляться вся локальная база RocksDB. Напомню, что это приведёт к тому, что при каждом рестарте все сервера начнут активно считывать данные из changelog-топика. Поверьте, вам это не нужно.

KStream-KStream Join

Здесь можно было бы обойтись одной фразой: никогда это не используйте. Джоин двух потоков создаёт десятки топиков в Kafka и огромную нагрузку на всю систему. Просто не делайте этого. Ну или хотя бы проверьте все под нагрузкой, прежде чем ставить в production.

Вообще Kafka Streams любит создавать топики под различные свои нужды. Если вы не изучили под лупой документацию и не протестировали, как это работает, то это может стать для вас и ваших DevOps неприятным сюрпризом. Чем больше топиков, тем сложнее их администрировать.

Масштабируемость

Любой человек, знакомый с Kafka, скажет, что масштабируемость - одна из главных её преимуществ: "В чём проблема? Добавляем партиций и радуемся". Всё так. Но не для Kafka Streams.

Если вы используете джоины, то все ваши топики должны быть ко-партиционированы (co-partitioning), что, в числе прочего, означает, что у них должно быть одинаковое количество партиций. Так в чём же проблема?

Представим, что вы сделали несколько топологий: с агрегатами, джоинами, всё как положено. Поставили в production, оно работает, вы радуетесь. Дальше бизнес пошел в гору, нагрузка начала расти, вы добавили серверов и хотите увеличить количество партиций, чтобы загрузить эти новые сервера. Но Kafka Streams наплодил десятки топиков. И у всех надо поднимать количество партиций, иначе следующий рестарт ваших сервисов может стать последним. Если Kafka Streams увидит, что топики не ко-партиционированы, она выдаст ошибку и просто не запустится.

На вопрос, что с этим делать, у меня сегодня ответа нет. Вероятно, можно потушить все рабочие инстансы Kafka Streams, потом поднять число партиций на всех причастных топиках, затем поднять Kafka Streams обратно и молиться. А может быть последовать совету отсюда: Matthias J. Sax пишет, что это нужно делать, создавая новый топик с новым количеством партиций и подключать к нему Kafka Streams с новым application.id. Там же есть совет, что если вы знаете заранее, что нагрузка будет большая, то лучше сделать партиций с запасом.

Заключение

Если у вас в системе нет и не планируется высоких нагрузок, то со всеми этими проблемами вы скорее всего не столкнётесь. Всё будет работать отлично. В противном случае стоит серьёзно задуматься, нужно ли вам это, особенно если планируете использовать следующий уровень абстракции - KSQL.

Автор статьи: Андрей Буров, Максилект.

P.S. Мы публикуем наши статьи на нескольких площадках Рунета. Подписывайтесь на наши страницы в VK, FB, Instagram или Telegram-канал, чтобы узнавать обо всех наших публикациях и других новостях компании Maxilect.

Подробнее..

Собираем данные AlphaVantage с Faust. Часть 1. Подготовка и введение

20.09.2020 14:22:27 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть I Введение

20.09.2020 16:05:00 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть II Агенты и Команды

23.09.2020 04:06:02 | Автор: admin

Оглавление

  1. Часть I: Введение

  2. Часть II: Агенты и Команды

Что мы тут делаем?

Итак-итак, вторая часть. Как и писалось ранее, в ней мы сделаем следующее:

  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.

  2. Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.

Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи - для ручного пуша сообщения в топик, за которым следит агент.

Подготовка

Клиент AlphaVantage

Для начала, напишем небольшой aiohttp клиентик для запросов на alphavantage.

alphavantage.py

Spoiler
import urllib.parse as urlparsefrom io import StringIOfrom typing import Any, Dict, List, Unionimport aiohttpimport pandas as pdimport stringcasefrom loguru import loggerfrom horton.config import API_ENDPOINTclass AlphaVantageClient:    def __init__(        self,        session: aiohttp.ClientSession,        api_key: str,        api_endpoint: str = API_ENDPOINT,    ):        self._query_params = {"datatype": "json", "apikey": api_key}        self._api_endpoint = api_endpoint        self._session = session    @logger.catch    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:        formatted_data = {}        for field, item in data.items():            formatted_data[stringcase.snakecase(field)] = item        return formatted_data    @logger.catch    async def _construct_query(        self, function: str, to_json: bool = True, **kwargs    ) -> Union[Dict[str, Any], str]:        path = "query/"        async with self._session.get(            urlparse.urljoin(self._api_endpoint, path),            params={"function": function, **kwargs, **self._query_params},        ) as response:            data = (await response.json()) if to_json else (await response.text())            if to_json:                data = self._format_fields(data)        return data    @logger.catch    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)        data = pd.read_csv(StringIO(data))        securities = data.to_dict("records")        for index, security in enumerate(securities):            security = self._format_fields(security)            security["_type"] = "physical"            securities[index] = security        return securities    @logger.catch    async def get_security_overview(self, symbol: str) -> Dict[str, str]:        return await self._construct_query("OVERVIEW", symbol=symbol)    @logger.catch    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query(            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"        )    @logger.catch    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)    @logger.catch    async def get_indicator_data(        self, symbol: str, indicator: str, **indicator_options    ) -> Dict[str, Any]:        return await self._construct_query(            indicator, symbol=symbol, **indicator_options        )

Собственно по нему всё ясно:

  1. API AlphaVantage достаточно просто и красиво спроектирована, поэтому все запросы я решил проводить через метод construct_query где в свою очередь идёт http вызов.

  2. Все поля я привожу к snake_case для удобства.

  3. Ну и декорация logger.catch для красивого и информативного вывода трейсбека.

P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен тут.

CRUD-класс

У нас будет коллекция securities для хранения мета информации о ценных бумагах.

database/security.py

Тут по-моему ничего пояснять не нужно, а базовый класс сам по себе достаточно прост.

get_app()

Добавим функцию создания объекта приложения в app.py

Spoiler
import faustfrom horton.config import KAFKA_BROKERSdef get_app():    return faust.App("horton", broker=KAFKA_BROKERS)

Пока у нас будет самое простое создание приложения, чуть позже мы его расширим, однако, чтобы не заставлять вас ждать, вот референсы на App-класс. На класс settings тоже советую взглянуть, так как именно он отвечает за большую часть настроек.

Основная часть

Агент сбора и сохранения списка ценных бумаг

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Так, сначала получаем объект faust-приложения - это достаточно просто. Далее, мы явно объявляем топик для нашего агента... Тут стоит упомянуть, что это такое, что за параметр internal и как это можно устроить по-другому.

  1. Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать офф. доку, либо можно прочитать конспект на хабре на русском, где так же всё достаточно точно отражено :)

  2. Параметр internal, достаточно хорошо описанный в доке faust, позволяет нам настраивать топик прямо в коде, естественно, имеются ввиду параметры, предусмотренные разработчиками faust, например: retention, retention policy (по-умолчанию delete, но можно установить и compact), кол-во партиций на топик (partitions, чтобы сделать, например, меньшее чем глобальное значение приложения faust).

  3. Вообще, агент может создавать сам управляемый топик с глобальными значениями, однако, я люблю объявлять всё явно. К тому же, некоторые параметры (например, кол-во партиций или retention policy) топика в объявлении агента настроить нельзя.

    Вот как это могло было выглядеть без ручного определения топика:

app = get_app()@app.agent()async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Ну а теперь, опишем, что будет делать наш агент :)

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for _ in stream:            logger.info("Start collect securities")            client = AlphaVantageClient(session, API_KEY)            securities = await client.get_securities()            for security in securities:                await SecurityCRUD.update_one(                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True                )            yield True

Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия - одна, на всё время работы воркера (или несколько, если изменить параметр concurrency у агента с дефолтной единички).

Далее, мы идём по стриму (сообщение мы помещаем в _, так как нам, в данном агенте, безразлично содержание) сообщений из нашего топика, если они есть при текущем сдвиге (offset), иначе, наш цикл будет ожидать их поступления. Ну а внутри нашего цикла, мы логируем поступление сообщения, получаем список активных (get_securities возвращает по-умолчания только active, см. код клиента) ценных бумаг и сохраняем его в базу, проверяя при этом, есть ли бумага с таким тикером и биржей в БД, если есть, то она (бумага) просто обновится.

Запустим наше творение!

> docker-compose up -d... Запуск контейнеров ...> faust -A horton.agents worker --without-web -l info

P.S. Возможности веб-компонента faust я рассматривать в статьях не буду, поэтому выставляем соответствующий флаг.

В нашей команде запуска мы указали faust'у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:

Spoiler
aS v1.10.4 id           horton                                             transport    [URL('kafka://localhost:9092')]                    store        memory:                                            log          -stderr- (info)                                    pid          1271262                                            hostname     host-name                                          platform     CPython 3.8.2 (Linux x86_64)                       drivers                                                           transport  aiokafka=1.1.6                                       web        aiohttp=3.6.2                                      datadir      /path/to/project/horton-data                       appdir       /path/to/project/horton-data/v1                   ... логи, логи, логи ...Topic Partition Set topic                       partitions  collect_securities          {0-7}       horton-__assignor-__leader  {0}         

Оно живое!!!

Посмотрим на partition set. Как мы видим, был создан топик с именем, которое мы обозначили в коде, кол-во партиций дефолтное (8, взятое из topic_partitions - параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.

Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:

> faust -A horton.agents send @collect_securities{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. с помощью @ мы показываем, что посылаем сообщение в топик с именем "collect_securities".

В данном случае, сообщение ушло в 6 партицию - это можно проверить, зайдя в kafdrop на localhost:9000

Перейдя в окно терминала с нашим воркером, мы увидим радостное сообщение, посланное с помощью loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:

Я не миллиардер, а потому, довольствуемся первым вариантом просмотра.

Счастье и радость - первый агент готов :)

Агент готов, да здравствует новый агент!

Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.

Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[?],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            ...

Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records - классы, декларирующие схему сообщения в топике агента.

В таком случае перейдём в records.py и опишем, как должно выглядеть сообщение у этого топика:

import faustclass CollectSecurityOverview(faust.Record):    symbol: str    exchange: str

Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой - 3.6.

Вернёмся к агенту, установим типы и допишем его:

collect_security_overview_topic = app.topic(    "collect_security_overview", internal=True, value_type=CollectSecurityOverview)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[CollectSecurityOverview],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            logger.info(                "Start collect security [{symbol}] overview", symbol=event.symbol            )            client = AlphaVantageClient(session, API_KEY)            security_overview = await client.get_security_overview(event.symbol)            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)            yield True

Как видите, мы передаём в метод инициализации топика новый параметр со схемой - value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё - смысла не вижу.

Ну что же, последний штрих - добавим в collect_securitites вызов агента сбора мета информации:

....for security in securities:    await SecurityCRUD.update_one({            "symbol": security["symbol"],            "exchange": security["exchange"]        },        security,        upsert = True,    )    await collect_security_overview.cast(        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])    )....

Используем ранее объявлению схему для сообщения. В данном случае, я использовал метод .cast, так как нам не нужно ожидать результат от агента, но стоит упомянуть, что способов послать сообщение в топик:

  1. cast - не блокирует, так как не ожидает результата. Нельзя послать результат в другой топик сообщением.

  2. send - не блокирует, так как не ожидает результата. Можно указать агента в топик которого уйдёт результат.

  3. ask - ожидает результата. Можно указать агента в топик которого уйдёт результат.

Итак, на этом с агентами на сегодня всё!

Команда мечты

Последнее, что я обещал написать в этой части - команды. Как уже говорилось ранее, команды в faust - это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A

После объявленных агентов в agents.py добавим функцию с декоратором app.command, вызывающую метод cast у collect_securitites:

@app.command()async def start_collect_securities():    """Collect securities and overview."""    await collect_securities.cast()

Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:

> faust -A horton.agents --help....Commands:  agents                    List agents.  clean-versions            Delete old version directories.  completion                Output shell completion to be evaluated by the...  livecheck                 Manage LiveCheck instances.  model                     Show model detail.  models                    List all available models as a tabulated list.  reset                     Delete local table state.  send                      Send message to agent/topic.  start-collect-securities  Collect securities and overview.  tables                    List available tables.  worker                    Start worker instance for given app.

Ею мы можем воспользоваться, как любой другой, поэтому перезапустим faust воркер и начнём полноценный сбор ценных бумаг:

> faust -A horton.agents start-collect-securities

Что будет дальше?

В следующей части мы, на примере оставшихся агентов, рассмотрим, механизм sink для поиска экстремум в ценах закрытия торгов за год и cron-запуск агентов.

На сегодня всё! Спасибо за прочтение :)

Код этой части

P.S. Под прошлой частью меня спросили про faust и confluent kafka (какие есть у confluent фичи). Кажется, что confluent во многом функциональнее, но дело в том, что faust не имеет полноценной поддержки клиента для confluent - это следует из описания ограничений клиентов в доке.

Подробнее..

Управление признаками сущностей в Apache Kafka

20.12.2020 20:21:03 | Автор: admin

Введение


Во время работы над задачами машинного обучения с онлайн-данными есть необходимость собирать различные сущности в одну для дальнейшего анализа и оценки. Процесс сбора должен быть удобным и быстрым. А также часто должен предусматривать бесшовный переход от процесса разработки к промышленному использованию без дополнительных усилий и рутинной работы. Для решения этой проблемы можно воспользоваться подходом с использованием Feature Store. Этот подход со многими деталями описан вот здесь: Meet Michelangelo: Ubers Machine Learning Platform. В этой статье описывается интерпретация указанного решения для управления признаками в виде прототипа.


Feature Store для онлайн потоков


Feature Store можно рассматривать как сервис, который должен выполнять свои функции строго по его спецификации. Прежде чем определить эту спецификацию, следует разобрать простой пример.


Пример


Пусть даны следующие сущности.
Фильм, который обладает идентификатором и заголовком.
Рейтинг фильма, у которого так же есть собственный идентификатор, идентификатор фильма, а также значение рейтинга. Рейтинг меняется во времени.
Источник рейтинга, который так же имеет собственный рейтинг. И меняется во времени.
И нужно эти сущности объединить в одну.
Вот что получается.


image
Диаграмма сущностей.


Как можно понять, объединение происходит по ключам сущностей. Т.е. к фильму ищутся все рейтинги фильма, а к рейтингу фильма все рейтинги источника.


Обобщение примера


Теперь можно обобщить пример и масштабировать его на все сущности, которые могут быть связаны по ключам.


Есть kafka-потоки, которые определяют собой сущности: A, B NN.
Нужно объединять эти потоки для создания новых потоков: AB, BCD NM.
Этим процессом должен управлять сервис: Feature Stream Engine.
Feature Stream Engine умеет объединять сущности в kafka-потоках, используя хранилище метаданных Feature Stream Store и Feature Stream Center, как единую точку входа по управлению объединением.


image
Обобщенная диаграмма сущностей и Feature Stream Engine.


Feature Stream Store


Хранилище метаданных представляет из себя сервис по хранению данных о потоках, сущностях и их связях.


Основная единица хранилища это признак (feature).
Признак имеет свой идентификатор, ссылку на источник, наименование и тип.


Источник группирует признаки и привязывается к определенному потоку.


Feature Stream Center


Центр управления позволяет создавать новые потоки, а также взаимодействовать со службами доставки и развертывания для поддержки работы новых потоков в различных средах исполнения, в том числе и промышленной среде.


Feature Stream Engine


Feature Stream Engine обеспечивает работу с потоками, а так же взаимодействие с внешними сервисами и конечными пользователями.


image
Компоненты Feature Stream Engine.


Архитектура Feature Stream Engine


Feature Stream Engine представляет из себя конструктор, который позволяет собирать признаки из различных потоков и доставлять этот функционал на различные среды.


Feature Stream Engine должен реализовывать следующие функции.


Описывать источники данных.
Привязывать источники данных к потокам kafka.
Описывать признаки и привязывать их к источникам данных.
Создавать новые источники данных на основе имеющихся путем объединения по ключам (особым признакам).
Развертывать функционал работы потоков в различных средах, включая промышленную среду.


image
Архитектура Feature Stream Engine.


Прототип


Для реализации идеи необходимо упросить функциональность.


Будут объединяться несколько потоков по ключам и записываться в один поток.


Предположим, что метаданные описываются файлами со свойствами ("configration.properties").


Эти данные реализуют следующею модель.


Источники данных в виде имен topic-ов для kafka. Перечисляются через ,.
Ключи в этих источниках данных. Перечисляются через ,.
Имя результирующего topic-а.


Конвертация входных параметров в структуру, которая описывает объединение потоков.


public static FeaturesDescriptor createFromProperties(Properties properties) {    String sources = properties.getProperty(FEATURES_DESCRIPTOR_FEATURE_DESCRIPTORS_SOURCES);    String keys = properties.getProperty(FEATURES_DESCRIPTOR_FEATURE_DESCRIPTORS_KEYS);    String sinkSource = properties.getProperty(FEATURES_DESCRIPTOR_SINK_SOURCE);    String[] sourcesArray = sources.split(",");    String[] keysArray = keys.split(",");    List<FeatureDescriptor> featureDescriptors = new ArrayList<>();    for (int i = 0; i < sourcesArray.length; i++) {        FeatureDescriptor featureDescriptor =                new FeatureDescriptor(sourcesArray[i], keysArray[i]);        featureDescriptors.add(featureDescriptor);    }    return new FeaturesDescriptor(featureDescriptors, sinkSource);}

public static class FeatureDescriptor {    public final String source;    public final String key;    public FeatureDescriptor(String source, String key) {        this.source = source;        this.key = key;    }}

public static class FeaturesDescriptor {    public final List<FeatureDescriptor> featureDescriptors;    public final String sinkSource;    public FeaturesDescriptor(List<FeatureDescriptor> featureDescriptors, String sinkSource) {        this.featureDescriptors = featureDescriptors;        this.sinkSource = sinkSource;    }}

Основной метод по объединению.


void buildStreams(StreamsBuilder builder)

Создаются topic-и, в которых ключам являются, те ключи, по которым нужно объединять.


Serde<String> stringSerde = Serdes.String();List<KStream<String, String>> streams = new ArrayList<>();for (FeatureDescriptor featureDescriptor : featuresDescriptor.featureDescriptors) {    KStream<String, String> stream =            builder.stream(featureDescriptor.source, Consumed.with(stringSerde, stringSerde))                    .map(new KeyValueMapperSimple(featureDescriptor.key));    streams.add(stream);}

Само объединение.


KStream<String, String> pref = streams.get(0);for (int i = 1; i < streams.size(); i++) {    KStream<String, String> cur = streams.get(i);    pref = pref.leftJoin(cur,            new ValueJoinerSimple(),            JoinWindows.of(Duration.ofSeconds(1)),            StreamJoined.with(                    Serdes.String(),                    Serdes.String(),                    Serdes.String())    );}

Отправка в конечный topic.


pref.to(featuresDescriptor.sinkSource);

Все вместе.


public void buildStreams(StreamsBuilder builder) {    Serde<String> stringSerde = Serdes.String();    List<KStream<String, String>> streams = new ArrayList<>();    for (FeatureDescriptor featureDescriptor : featuresDescriptor.featureDescriptors) {        KStream<String, String> stream =                builder.stream(featureDescriptor.source, Consumed.with(stringSerde, stringSerde))                        .map(new KeyValueMapperSimple(featureDescriptor.key));        streams.add(stream);    }    if (streams.size() > 0) {        if (streams.size() == 1) {            KStream<String, String> stream = streams.get(0);            stream.to(featuresDescriptor.sinkSource);        } else {            KStream<String, String> pref = streams.get(0);            for (int i = 1; i < streams.size(); i++) {                KStream<String, String> cur = streams.get(i);                pref = pref.leftJoin(cur,                        new ValueJoinerSimple(),                        JoinWindows.of(Duration.ofSeconds(1)),                        StreamJoined.with(                                Serdes.String(),                                Serdes.String(),                                Serdes.String())                );            }            pref.to(featuresDescriptor.sinkSource);        }    }}

Запуск.


void run(Properties config)

Конструируется объект объединения потоков (основной объект).


FeaturesStream featuresStream = new FeaturesStream(config);

Создается обвязка для kafka.


StreamsBuilder builder = new StreamsBuilder();featuresStream.buildStreams(builder);Topology topology = builder.build();KafkaStreams streams = new KafkaStreams(topology, featuresStream.buildStreamsProperties());

Осуществляется запуск.


streams.start();

Все вместе.


public static void run(Properties config) {    StreamsBuilder builder = new StreamsBuilder();    FeaturesStream featuresStream = new FeaturesStream(config);    featuresStream.buildStreams(builder);    Topology topology = builder.build();    KafkaStreams streams = new KafkaStreams(topology, featuresStream.buildStreamsProperties());    CountDownLatch latch = new CountDownLatch(1);    Runtime.getRuntime().addShutdownHook(new Thread(() -> {        streams.close();        latch.countDown();    }));    try {        streams.start();        latch.await();    } catch (Throwable e) {        System.exit(1);    }    System.exit(0);}

Пример запуска приложения.


java -jar features-stream-1.0.0.jar -c plain.properties

Язык: Java 1.8.
Библиотеки: kafka 2.6.0, jsoup 1.13.1.


Заключение


Изложенное решение имеет ряд ограничений и не реализует полный функционал. Но имеет и несколько преимуществ.
Во-первых: позволяет быстро конструировать объединение topic-в.
Во-вторых: позволяет быстро запускать объединение в различных средах.


Стоит отметить, что решение налагает ограничение на структуру входных данных. А именно, topic-и должны иметь табличную структуру. Для преодоления этого ограничения можно ввести дополнительный слой, который будет позволять сводить различные структуры к табличной.


Для промышленной реализации полной функциональности стоит обратить внимание на очень мощный и, самое главное, гибкий функционал: KSQL.


Ссылки и ресурсы


Исходный код;
Meet Michelangelo: Ubers Machine Learning Platform.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru