Русский
Русский
English
Статистика
Реклама

Алготрейдинг

Торговый робот CryptoShloma

22.01.2021 12:16:53 | Автор: admin

Мы команда творческих энтузиастов, я автор статьи и разработчик программного обеспечения, мой коллега финансовый теоретик.

В качестве предыстории: на этапе бума криптодвижа, мы начали заниматься майнингом криптовалют (и по сей день продолжаем). Тогда мы ничего не знали о торговле на криптобирже и просто намайненые монетки обменивали по рыночному курсу и выводили по чуть-чуть, как в простом обменнике, плюс к этому добавлялся страх скачков курса монет, и поэтому не совались в торговлю. Накопив некоторую сумму, мы размышляли, как можно увеличить накопления пассивным способом, желательно при этом особо ничего не делая. И решили изучить тему торгов на криптобирже. Поскольку я по своей специальности программист-автоматизатор, то нам в голову пришла идея, а почему бы не написать торгового робота, который по определённому алгоритму торговал бы в автоматическом режиме и по возможности был сделан так, чтобы и деньги не терял. В первую очередь была интересна идея, которая позволяла бы математически всегда работать в плюс, без исключения. Мы хотели найти "Святой Грааль, хотя понимали, что, это конечно утопия. В данной идее мы сразу поняли, чтобы это реализовать, придётся пожертвовать временем, и в торгах и временем на разработку.

Перекопав много ресурсов, обнаружили, что в существует полно роботов, которые торгуют на обычных фондовых биржах и почти никаких "алготрейдинг" находок не нашли в области крипто торговли. Основная причина этого была в том, что традиционные алгоритмы торговли не подходят для криптовалюты, виной этому большая волатильность, непредсказуемость любой из монет, пампы, дампы. Всё это неадекватно сказывается на традиционных осцилляторах, сигналах и т.д.

Мы поняли, что годы разработок сверхсложных алгоритмов на текущей стадии крипто индустрии точно не принесут успех, потому что эта сфера очень быстро меняется. Мы захотели создать простой и математически эффективный алгоритм, по которому торговый робот будет работать 24/7, 365 дней в году, к которому надо подходить раз в пол года просто посмотреть на баланс и радоваться накоплениям:) Разработка была начата, мы придумали незатейливый алгоритм, который по нашим соображениям должен был работать. Спустя некоторое время алгоритм был реализован в торговом роботе. Для экспериментов мы выбрали самую большую и популярную биржу Binance.

Алгоритм
  • Шаг 0 : Заранее задаётся таблица усреднения позиции на определённое кол-во шагов и максимальное кол-во докупов на отскоках цены

    Падение (%)

    Отскок (%)

    3

    2

    6

    3

    11

    7

    17

    12

    23

    17

    30

    21

    33

    27

    Первая колонка обозначает % просадки актива, вторая колонка говорит о том, на сколько % актив должен вырасти от % падения, после которого происходит докуп актива и усреднение позиции. Значения произвольные.

  • Шаг 1: Происходит покупка (лонг) выбранного актива на объём Х.

  • Шаг 2: Происходит выставление ордера на продажу по заданному % прибыли.

  • Шаг 3: Если цена пошла выше % прибыли и сработал ордер, то возврат к шагу 1 и постоянно перезаходим.

  • Шаг 4: Если цена коснулась значения падения из таблицы (колонка 1), то фиксируется шаг со значением и далее происходит ожидание значения отскока (колонка 2) от % падения (колонка 1).

  • Шаг 5: Если цена выросла на % отскока для данного шага, то происходит покупка по рынку (получаем в итоге на балансе X*2), и перевыставление ордера на продажу по усреднённой цене.

  • Шаг 6: Если цена упала ниже, и коснулась % падения следующего шага ниже по таблице, то возвращаемся к шагу 4, бот начинает работать со следующей парой цифр из таблицы и так далее, однако не может перейти к работе с предшествующими значениями.

  • Шаг 7: Если цена прошла ниже % последнего шага падения из таблицы, то увы, здесь можно зафиксировать убыток или оставить текущий ордер на продажу.

  • Шаг 8: Если происходило многоразовое усредение позиции на одном цикле (тоесть падение, отскок, падение ниже на след шаг, потом снова отскок), то каждый докуп позиции происходит на весь пердыдущий объём ранее докупленных позиций * 2.

Торговый робот в конечном итоге родился с кучей багов, само собой. К названию мы подошли с чувством юмора. В процессе отладки пришлось подключать живой аккаунт с монетами, что, конечно, пару раз приводило к небольшим потерям из-за какого-нибудь мелкого глюка, но это издержки разработки, что поделать. В последствии была внедрена микро-эмуляция торгов, но только в отладочном режиме. Для production версии оно не планировалось.

Результат получился, торговый робот заработал, но был нюанс, что всё-таки "Святого Грааля" не существует. И связан он с тем, что торгуя одноразовой монетой, которая "пампилась", а потом стремительно уходила на дно рынка и никогда снова не восстанавливала свою цену, откупаться обратно при необходимости придётся только в минус. Это физика рынка, увы.

Поэтому в душе остался осадок от того, что проект не доведён до конца и не придумано, как решить эту проблему. Энтузиазм поутих в этом направлении, плюс добавились жизненные обстоятельства и проект был заброшен.

Спустя пол года мы снова вспомнили о проекте и решили, почему бы не поделиться этим с обществом, вдруг найдётся группа Новых Золотоискателей, которые заинтересуются нашей идеей и захотят поучаствовать, попробовать торгового робота и предложат новые решения или алгоритмы.

Ссылка на гитхаб

Подробнее..

Инструменты для алготрейдинга на Python. Расчет дневного изменения цены

27.05.2021 16:21:56 | Автор: admin

Привет, Хабр! Сегодня я хочу начать свой цикл статей по алготрейдингу.

Первым делом расскажу о самом простом индикаторе ожидаемой доходности ценной бумаги - дневное изменение цены.

Дневное изменение цены - это отношение цены закрытия текущего дня к цене закрытия предыдущего дня. Говоря простым языком, это процент, на который выросла или упала ценная бумага за 1 день.

Сам по себе этот индикатор не сильно полезен - он просто показывает дневное изменение цены. Но, вот, если мы накопим статистику за какой-либо период (например, за месяц), мы можем рассчитать медиану и, тем самым, попытаться предсказать ожидаемую прибыль за 1 день.

Перейдем к практике:

Для проведения расчетов нам понадобится:

  1. Данные об изменениях цен (вполне сойдет API Мосбиржи)

  2. Знание Python и его библиотек Pandas и Matplotlib

  3. Трейдерская чуйка (уверен, если вы читаете эту статью, то она у вас есть)

Весь код я приведу в ноутбуке на google colab

Далее я буду рассказывать о дневном изменении стоимости ценных бумаг за период с 1 января 2021г. по 25 мая 2021г.

Для примера, возьмем акции компании Лукойл (тикер LKOH). Для них распределение дневного изменения цены выглядит следующим образом:

Для акций Лукойла за период с 1 января 2021 года по 25 мая 2021 года мы имеем медиану, равную 0,26%. Это означает, что если завтра не предвидится никаких хороших или плохих новостей, то мы можем ожидать рост стоимости акций на 0,26%

Для понимания разброса значений мы должны рассчитать нижнюю и верхнюю квантили.

ticker

median

q005

q05

q25

q75

q95

q995

LKOH

0.0026

-0.036

-0.026

-0.01

0.011

0.032

0.035

Рассчитав квантили q25 и q75 мы видим, что 50% всех значений дневного изменения цены лежат в диапазоне [-1%; 1,1%]. Т.е., согласно статистике, в 5 из 10 торговых сессиях, цена акции Лукойла упадет не более чем на -1% или вырастет не более чем на 1,1%.

Рассчитав квантили q005 и q995 мы видим, что 99% всех значений дневного изменения цены лежат в диапазоне [-3,6%; 3,5%]. Т.е., согласно статистике, почти во всех торговых сессиях, цена акции Лукойла упадет не более чем на -3,6% или вырастет не более чем на 3,5%.

Рассчитав квантили q05 и q95 мы видим, что 90% всех значений дневного изменения цены лежат в диапазоне [-2,6%; 3,2%]. Т.е., согласно статистике, в 9 из 10 торговых сессиях, цена акции Лукойла упадет не более чем на -2,6% или вырастет не более чем на 3,2%.

Визуализация этого распределения будет выглядеть следующим образом:

Следующим этапом является разделение данных на месячные периоды. В таком случае, основные квантили будут иметь следующие значения:

month

ticker

median

q005

q05

q25

q75

q95

q995

2021-01-01

LKOH

0.0032

-0.022

-0.029

-0.007

0.011

0.033

0.032

2021-02-01

LKOH

0.0041

-0.027

-0.028

-0.010

0.014

0.027

0.027

2021-03-01

LKOH

0.0029

-0.028

-0.027

-0.006

0.014

0.034

0.035

2021-04-01

LKOH

-0.0005

-0.019

-0.019

-0.011

0.005

0.015

0.015

2021-05-01

LKOH

0.0023

-0.022

-0.024

-0.013

0.016

0.027

0.022

Видим, что в апреле 2021г. медиана стала отрицательной, что намекает нам об открытии коротких позиций в этом месяце. Визуализировав эти цифры мы получим следующую картинку:

В следующей статье расскажу про индикатор "Полосы Боллинджера".

Подробнее..

Инструменты для алготрейдинга на Python. SMA Полосы Боллинджера на акциях Северстали код готовой стратегии

31.05.2021 18:12:13 | Автор: admin

Внимание! Если данная статья наберет 1000 положительных голосов, то я организую хакатон по алготрейдингу с ценными призами.

Предыдущая статья о "Расчете дневного изменения цены"

Когда я писал прошлую статью (она была первой из цикла) я не предполагал, что читатели разделятся на 2 категории:
1. Те, кто верят, что в алготрейдинг
2. Те, кто верят, что я шарлатан

Для обоих групп я напоминаю, что цель алготрейдинга - это увеличить вероятность получить прибыль от сделки
Или же, как говорят в "теории игр" - сделать математическое ожидание от игры положительным

Поэтому, предлагаю аудитории договориться о следующем:
1. Если ваш комментарий несет научный смысл, то пишите его под постом в Хабре.
2. Если ваш комментарий несет дискуссионный посыл, то прошу задавать его в специально созданном канале в телеге:

Собственно, здесь я перехожу к сути данной статьи.

SMA (Simple Moving Average, Скользящее среднее) - индикатор, основанный на подсчете среднего значения цены закрытия ценной бумаги.

Для тех, кто не знает что такое SMA, приведу алгоритм его подсчета:
1. Взять цену закрытия "close" ценной бумаги за период от t1 до t2 и отсортировать ее от t1 к t2.
2. Взять таймфрейм из первых N значений цены close.
3. Посчитать среднее арифметическое значение таймфрейма (simple average).
4. Сдвинуть таймфрейм вперед на одно значение (происходит moving) и выполнить пункт 3
5. Пункт 4 проводить до тех пор, пока таймфрейм не дойдет до точки t2

Отрисуем график SMA (N=20) для цены close акций Северсталь (тикер CHMF) за 27 мая 2021г.:

По графику видно, что SMA является сглаженной версией цены Close с временным лагом в 20 периодов.

Полосы Боллинджера (Bollinger Bands)

В 1980х годах Джон Боллинджер предложил рассчитывать не только SMA, но и STD (standart deviation, среднеквадратическое отклонение). Таким образом, мы будем видеть не только график изменения средней цены, но и ее волатильность.

Обычно, значения std устанавливают равным 2. В таком случае, с вероятностью в 95% следующее значение цены close будет лежать внутри полосы Боллинджера и только в 5% случаях оно будет выходить из этой полосы.

В тех местах, где цена close близка к нижней грани полосы Боллинджера, стоимость акций считается низкой. И, наоборот, если цена close близка к верхней грани полосы Боллинджера, стоимость акций считается высокой.

И тут у трейдера срабатывает чуйка: покупаем на низах, продаем на хаях (никак не наоборот).

Весь код с использованием полос Боллинджера привел на Google Colab. Данная стратегия принесла +1,7% за 1 день (но это не точно).

В следующей статье поговорим об RSI

Подробнее..

Оркестратор бесконечных задач

11.01.2021 20:23:30 | Автор: admin

В данной статье мы поговорим отом,как реализовать оркестратор бесконечных задач с использованием очередей. Как конечная цель- нам необходимо реализовать систему, способную управлять задачами с длительным сроком жизни, систему распределённую, где группа задачхостятсяна определенном сервере и в случае отказа этого сервера, задачи автоматически перераспределяютсяна свободные.

В большинстве случаев всяenterpriseразработка сводится к выполнению одних и тех же требований: создается заявка, в зависимости от типа заявки у нее есть какой-то жизненный цикл, по завершению жизни заявки мы получаем (или не получаем) желаемое. Под заявкой мы можем подразумевать все что угодно, начиная с покупки в интернет-магазине товара, денежного перевода или расчета траектории баллистической ракеты. У каждой заявки есть свой жизненный путь и что важно отметить -время жизни, и чем меньше это время, тем лучше. Иными словами, чем быстрее мой банковский перевод осуществится, тем лучше. Требования тоже схожи, побольшеRPCoperationspersecond, поменьшеLatency, система должна быть отказоустойчивой, масштабируемой и должна быть готовавчера. Есть миллион инструментов, сотни баз данных, различные подходы и паттерны. И все уже давно написано, нам остается лишь правильно использовать готовые технологии в наших проектах.

Темаоркестрациизадач не нова, но к моему удивлению, готовых решений по управлению бесконечными задачами (время жизни которых неограниченно велико), с возможностью перераспределения задач по активным серверам, попросту нет. Поэтому реализуем собственное решение . Но обо всем по порядку.


Давайте сначала поймем, что значит бесконечная задача и где в природе такое вообще может встречаться. Бесконечная задача это некий процесс (Job), который выполняет работу до тех пор, пока ему не скажут прекратить это. Аналогию можно провести с бесконечными циклами. В природе же подобное встречается, когда нам нужны наблюдатели, которые следят и реагируют на определённые события. Например: нам необходимо следить за изменениями цен на бирже, повышением или понижением цены актива. Представим нам нужно следить за всеми валютами, всеми активами, на разных биржах, тогда количество наблюдателей может превышать десятки тысяч единиц. Что же из себя может представлять наблюдатель- это может быть отдельноеWebSocketсоединение, которое должно быть постоянноconnected. Этот наблюдатель, может получать данные,денормализовывать, производить расчеты, сохранять и много чего еще. Для удобства, наблюдателем я буду называть неObserverиз известного паттерна, а модуль, который постоянно в работе и бесконечно долго выполняет полезную работу.

Очевидно, нам нужна распределенная система, так как на ноутбуке ее явно не запустишь. Сформулируемтребование длянаших наблюдателей:

  • Наблюдатели должны быть управляемы, то есть мы можем как добавить нового, так и прекратить работу существующего.

  • Наблюдатели должны быть изолированными, работа одного, никак не должна сказываться на работе других.

  • Система должна быть отказоустойчивой, и горизонтально масштабируемой, мы должны иметь возможность распределить всех наблюдателей на разных серверах. Причем при отказе одного сервера, наблюдатели, которые на нем находились, должны перераспределиться на другие, работающие сервера.

  • Сервис/приложение, должно иметь ограничение на количество наблюдателей, которые могут быть в работе и задаваться данное значение должно через файл конфигурации или рассчитываться исходя из мощности сервера (количество ядер, RAM и т.д.), на котором сервис работает.

Упрощенно: имеем N Сервисов, каждый сервис имеет несколько наблюдателей. Пока не будем задаваться вопросом о том, как это все работает и каким образом компоненты взаимодействуют, разберем это чуть позднее.

Статья описана в 3 актах. Все листинги с кодом на С#, но в процессе написания старался уделять меньше внимания примерам с кодом и больше самой идеи. Поэтому листинги должны быть понятны даже тем людям, которые вообще не писали на C# и не знакомы с .Net.

  1. Все естьTask. Тут мы поговорим о теории и некоторых базовых концепциях. Разберем что естьTaskи что общего между таской и наблюдателем.

  2. Schedulers. Возьмем готовое решение, разберем его и проанализируем. Понимание концепции работы планировщика с возможностью запуска задач на удаленных серверах, может нам лучше понять, что будет происходить в третей части.

  3. Очередь, которая думает, что она планировщик. Финальная часть статьи, где мы реализуем системуоркестрациизадач через очереди сообщений. Я использовалRabbitMq, и какFramework-MassTransit, поэтому все примеры будут тесно связаны с данными инструментами. Но принцип будет оставаться тот же.

Всё естьTask

Наш наблюдатель это ни что иное какTask. И что делать если мы хотим запустить и контролировать таску, не дожидаясь получения результата (ведь если наша таска будет работать бесконечно, то она и не даст нам никогда результат).

Рассмотрим на простом примере. Возьмём метод, которыйпишет HelloWord в консольотправляет письмо:

public async Task SendEmailAsync(Email email, CancellationToken token) {    // отправляем письмо }

Чтобы отправить письмо, не дожидаясь получения результата, нам достаточно просто забыть поставитьawaitперед вызовомSendEmailAsync.

foreach (var email in emails {    if(token.IsCancellationRequested)         break;     _emailSender.SendEmailAsync(email, token); //нет await } 

Минусов у данного подхода много:

  • Мы никак не гарантируем выполнение отправки письма.

  • FireAndForgetи как следствие о возникновенииExceptionмы не узнаем.

  • Так же не узнаем и о выполнении.

  • Многие считают, что это грех большой, вообщеантипаттерни я с ними согласен.

Более детально о том почему желательно рано или поздноawait-ить таску, можно почитатьпро async/await антипаттерны.

Наша задача во многом похожа на отправкуemail, только внутри у нас будет подобие бесконечного цикла и метод закончит работу естественным путем только тогда, когда будет вызванCancellationToken. Мы можем, конечно, написать свои костыли, которые позволят нам отслеживать состояние задачи и уведомлять, когда она завершилась. У нее будутRetryPolicyи много чего ещё, но зачем?! Когда есть уже готовые планировщики задач, которые заточены под данные требования.

Schedulers

На .NET есть как минимум два планировщика задач с промежуточным хранением задач в базе данных, поддержкой отложенного выполнения и возможностью распределенного выполнения на разных серверах.

Тутесть неплохое сравнение планировщиков. Больше всего нас интересует возможность иметь неограниченное количествосерверов, (тут может быть недопонимание, сервер это не физическая машина, где выполняется наше приложение, этоinstanceпланировщика)где будут исполнятся наши задачи/Tasks. Лично я отдал предпочтениеHangfire, по большей части из-за хорошо описанной документации и встроенного UI, который позволяет не только отображать метрики по задачам, но и вручную запускать их. Всё это весьма приятные бонусы.

А теперь посмотрим на то, как отправить наше письмо с использованиемHangfire. В этом нам поможет статический методBackgroundJob.Enqueue(Expression<Action>methodCall).

var jobIds = new List<string>(); foreach (var email in emails) {    if(token.IsCancellationRequested)       break;    jobIds.Add(BackgroundJob.Enqueue(       async () => await _emailSender.SendEmailAsync(email, token))); } 

Мы не дожидаемся отправки всех писем, а кладем их в очередь и можем не переживать за выполнение, обо всем позаботиться планировщик. Есть настройкаRetryPolicy, через которую мы можем задать количество повторений вызова метода в случае ошибок. В итоге мы знаем сколько задач было выполнено успешно, сколько с ошибками, сколько времени потребовалось на выполнение каждой.

Но нас же интересует не просто запустить задачу и гарантировать ее исполнение, а иметь возможность запустить ее на другом сервере. Забудем уже про отправку писем и представим, что наш наблюдатель запускается через вызов метода:

_observer.DoWork(observerArg,newCancellationToken())

Мы передаем какие-то аргументы для работы и главное, передаем токен отмены.Для этого нам потребуется указать еще имя очереди в созданномBackgroundJobClient.

var client = new BackgroundJobClient(JobStorage.Current);//задаем имя очереди, где будет хоститься задача.var state = new EnqueuedState(unique-queue-name); client.Create(() =>_observer.DoWork(observerArg,newCancellationToken()), state);

И конечно же мы должны иметь сервис, который займется обработкой данной очереди. В настройках которого будет указано имя той самой очереди-unique-queue-name.

// Настраиваем instance hangfire сервера. _server = new BackgroundJobServer(new BackgroundJobServerOptions() {       WorkerCount = 10,     Queues = new[] { unique-queue-name },     ServerName = _serverOptions.ServerName }); 

WorkerCount- отвечает за то, сколько сервер может одновременно обрабатывать задач. Запомним ее, так как в последствии мы о ней будем много говорить.

Теперь у нас есть возможность запускать любое количество задач на разных серверах, указывая очередь, которую слушает сервис. Пока нам не хватает только одного: возможности мониторинга. Мы должны понимать какой сервер свободен, а какой нет и запускать задачи на свободном. Для этого вHangfireесть статический класс, которой предоставляет все метрики, начиная с того сколько серверов сейчас активно и заканчивая информированием о том сколько раз задача была выпалена с ошибкой.

_monitoringApi = JobStorage.Current.GetMonitoringApi(); 

Наша система с планировщиком теперь будет выглядеть следующим образом:

Observer-service - сервис, который может выполнять одновременно несколько задач, количество задается через конфиг или рассчитывается с учетом количества ядер и мощности сервера (ВHangFilreэтоWorkerCount).

Observer-manager- сервис, который отвечает за... наблюдателей.Валидируетзапросы, решает на каком сервисе будет запущен наблюдатель, а также имеет возможность удалить его. Он знает сколько сейчас доступно сервисов и на сколько каждый из них загружен.

Schedulercommondbпсевдо-очередьи хранилище всей информации по задачам,Hangfireподдерживает какMsSql, такPostgreSqlи дажеRedis.

Отправка задачи в очередь это сохранение в базу данных с предварительнойсериализациейее вместе со значениями входных параметров. Поэтому менеджер и сервис должны иметь доступ к одной сборке с кодом нашего наблюдателя.

Если зависимости, при проектировании системы в общей сборке, имплементированы в сервисе, то их не обязательно копировать со всем функционалом в менеджер, достаточно имплементировать как заглуши, без логики, ведь исполняться они будут на стороне сервиса.

С помощью планировщика мы можем запускать задачи на удаленных сервисах, мониторить состояние задач и останавливать их, когда нам это потребуется. Но поговорим о проблемах, с которым столкнулся и подведем итог с учетом наших изначальных требований.Так же перед использованием планировщика обязательно прочитайте статью оподдержке очередей в Hangfire. Так вот:

1)Общая сборка для менеджера и сервиса. Не могу сказать, что это прям минус, главное помнить это при проектировании системы.

2)Высокая нагрузка на сервер. Каждый сервис опрашивает базу данных на предмет изменений. Можно, конечно, увеличить интервал между запросами, но это ухудшит отклик системы.

3)Добавление задачи в очередь возвращает числовой идентификатор и идентифицировать задачу можно только по нему. Нельзя задать свойcustom-id, например поиск по названию. Поэтому получив идентификатор задачи нужно сохранить связку идентификатор-название в собственное хранилище.

4)В случае ошибки во время исполнения задачи, она автоматически будет перемещена в default очередь. Крайне неприятный момент, о котором узнал уже на этапе тестирования, так как в документации о таком не рассказали. Решается черезjob-filtersили черезатрибуты. Второй вариант делает код более связанным и не подходит, так как значение атрибута не может задаваться динамически.

5)В случае если сервер откажет, задачи, которые на нем исполнялись, не будут перераспределены между работающими. Можно, конечно, реализовать данную логику в менеджере и сделать его ответственным за это, но хотелось бы чтобыframeworkумел это из коробки.

6)Отсутствие транзакционности, ВедьHangfireуниверсален как дляMsSql, так и дляRedis, а в нем транзакции не предусмотрены.

На протяжении всей разработки, меня не покидало ощущение что систему можно реализовать гораздо проще. Некоторые фичи, такие как перераспределение задач, не предусмотрены в планировщике, приходилось обходить ограничение, путем добавления собственных костылей и посему было принято решение реализовать собственное решение.

Очередь, которая думает, что она планировщик

Намучившись, пытаясь использовать верблюда как лошадь, перейдем к описанию того, как это можно сделать через очередь. Но сперва зададимся вопросом. Что мы знаем о планировщиках? Мы знаем, что планировщики это системы, которые выполняют задачи с учетом заданных правил, расписания. А что мы знаем об очередях, шинах данных? Мы используем очереди как транспорт данных, как средство доставки сообщений. Конечно же это все очень абстрактно, и тут можно говорить часами, но пока ограничимся этим. Давай те изменим шаблонное мышление и на время представим, что очередь тоже может быть планировщиком.

Как же сделать из очереди сообщений планировщик задач? Хотя тут корректней был бы термин оркестратор.Весь вышеописанный функционал решается использованием только одной настройки-PrefetchCountи особенностью обработки сообщений.

  • Когда сообщение попадает в очередь оно имеет состояниеReady.

  • КогдаConumerобрабатывает сообщение, оно переходит в состояниеUnacked. И другойConsumerможет взять следующие сообщение из очереди.

  • Если в момент обработки сообщения происходит ошибка, оно помещается в _Errorочередь.

  • Если сообщение после обработки не былоacknowledged, то оно возвращается обратно в очередь и его может прочитать любой другойConsumer.

И теперь главное -PrefetchCountэто количество одновременно обрабатываемых сообщений в очереди, а если сообщение никогда не будет дочитано (бесконечно обрабатывается), то его можно воспринимать какWorkerCount, прям как уHangfire.

Разберем на пальцах:

На данной схеме у нас есть триObserver-services, каждый из них слушает очередь в ожидании поступления сообщения.PrefetchCountу каждого стоит 1. Это значит, что за раз каждый сервис будет обрабатывать одно сообщение. А так как мы знаем, что сообщение это запуск бесконечной задачи, то оно никогда не прочитается и всегда будет в состоянииUnacked.

Дадим команду на создание двух "наблюдателей, таким образом в очереди у нас окажется два сообщения:

Так какObserver-servicesслушают одну и ту же очередь, то сообщения между ними будут распределятся равномерно, черезRound-robin.

  • msg1поступает в очередь. Его начинает обрабатывать один из свободныхконсьюмеров, допустим Observer1. Сообщение переходит в состояниеUnackedи теперь новые сообщения, которые поступят в очередь будут доступны для другихконсьюмеров.

  • msg2поступает в очередь. Observer1у нас уже занят, и поэтому сообщение на обработку достанется всем свободнымконсьюмерам, в данном случае оно достаетсяObserver2.

Давайте теперь представим, что Observer-service1у нас сломался, например он находится на отдельном сервере и сервер вышел из строя (самый популярный контраргумент - а что... если свет вырубили?).

Ошибка произошла не в самомконсьюмере, и поэтому сообщение, по которому не былоacknowledgementпопадает обратно в очередь и переходит из статусаUnackedвReady. Теперь его может считать любой свободныйконсьюмер. Второй у нас занят, первый умер, свободен только третий и поэтому ему задача и достаётся.

Резонным будет замечание - что будет если ошибка произойдет в самомконсьюмере, в процессе обработки сообщения. Оно в таком случае прейдёт в очередь с пометкой _Error, чтобы этого избежать можем настроитьRetryPolicy. И тогда в случае ошибки сообщение не попадет обратно в очередь, аконсьюмерпопытается заново обработать это сообщение.
Правила дляRetryPolicyмогут быть гибкими:

  • Попробовать 1000 раза и положить в очередь ошибок.

  • Попробовать 5 раз с интервалом 1,4,10...минут и потом положить в очередь ошибок.

  • Вообще попробоватьint.MaxValueраз.

Что же мы имеем в итоге? Мы можем иметь абсолютно любое количество наблюдателей, каждый из которых смотрит на одну очередь и каждый обрабатывает свою задачу/сообщение. Мы можем увеличитьPrefetchCount, допустим до 10, и тогда у сервиса будет 10 свободных консьюмеров, которые будут ждать команды на работу. Сервисы можно распределять по разным серверамиесли мы допускаем что кой-то сервер может выйти из строя, нужно просто иметь свободный сервис, который в случае поломки возьмет задачибольного.Например, если у нас есть 10 серверов, мощностей каждого из которых достаточно для обработки 5 наблюдателей, и шанс того, что один процент из них может выйти из строя, нужнозадеплоитьодин 11-ый сервер с той же мощностью, который будет на подстраховку.

А как жеконсистентность? Да и как вообще всем этим управлять? Да, мы можем добавить сообщения в очередь, но как убрать их оттуда... не очищать же очередь вручную?! Тем более, в идеале, наши "наблюдатели" должны закончить свою жизнь естественным образом, то есть через вызовCancellationToken.

И тут нам снова потребуетсяManager. Менеджеру неплохо бы знать об активных сервисах в системе. Это позволит понимать, перед запуском задачи, сколько свободных сервисов и может ли один из них взять в работу новую задачу. Так же это даст возможность отображать сколько их в системе, на сколько каждый из них загружен и какие задачи обрабатывает. Поэтому, когда сервис только поднимается он отправляет сообщение о своем рождении, которое содержит:

  • Id(Идентификатор) -Guidгенерируемый при рождении.

  • Name(Имя), которое мы сами дали ему, когда сервис деплоили, оно уникальное для каждого сервиса.

  • CreatedAt/ModifyAt(Дата создания/Дата изменения).

  • WorkersCount, это будетPrefetchCount- его мощность, сколько он может обрабатывать задач одновременно.

Managerпринимает эти сообщения с делает записи в базу данных о новых активных сервисах.

Id

Name

WorkerCount

CreatedAt

ModifyAt

IsDeleted

{Uniqueid}

Observerservice1

10

{somedate}

null

false

{Uniqueid}

Observerservice2

10

{somedate}

null

false

{Uniqueid}

Observerservice3

10

{somedate}

null

false

И нам не важно работает ли менеджер или мы вообще забыли егозадеплоить. В тот момент, когда он заработает ему сразу придет информация о том, что в системе есть 3 сервиса с такими-то параметрами.

Перед отключением сервисы так же отправят сообщения о том, что они закончили работать и большенедоступны, менеджербудет знать, что теперь на N сервисов у него меньше в строю. Тем самым он сделает пометку в базе данных и проставит каждому удаленному значениеIsDeleted=true.

Есть вероятность с тем, что сервис может не успеть отправить свое последнее сообщение о прекращении работы (Kill9, все тот же свет вырубили). За работоспособность компонентов у нас должна отвечать инфраструктура, напримерDocker. Мы должны быть уверены, если сервис непредвиденно прекратил работу, контейнер пере поднимется и сервис заново начнет работу. В таком случае при рождении, он заново отправит сообщение, но уже с новым идентификатором, но старым именем. Менеджеру достаточно будет данной информации чтобы привести данные в консистентное состояние и понять, что со старым сервисом случилось что-то страшное.

А теперь попробуем создать нового наблюдателя через API. Отправляем команду на создание (Мы должны позаботиться о том, что менеджер в процессе инициализации прочитает все сообщения из Statequeue и будет содержать последние актуальные данные о состоянии сервисов). Менеджер проверяет есть ли наблюдатель с таким именем уже, если нет, он проверяет наличие свободных сервисов, а они пока все свободны, далее он дает команду на создание - кладет сообщение в очередь.

Мы уже знаем, как распределится данное сообщение, оно просто достанется одному из наблюдателей. И тогда, когда он его получит, задача будет запущена и наблюдатель отправит сообщение в очередь состояний о том, что он получил на обработку определенную задачу, указав метаданные задачи и сервиса в рамках которого она работает.

Менеджер при отправке сообщения делает пометку в базу данных, записывая информацию отом,что был создан новый наблюдатель и находится в статусеCreated.

Id

Name

CreatedAt

ModifyAt

ServiceId

Status

{Observerid}

My_new_observer

{created date}

null

null

Created

Менеджер, дождавшись ответа от сервиса, которому досталась задача, изменяет статус наProcessingи связывает задачу с сервисом.

Id

Name

CreatedAt

ModifyAt

ServiceId

Status

{Observerid}

My_new_observer

{created date}

{modifydate}

{Observerservice1id}

Processing

Мы можем получить список всех наблюдателей и узнать кто на каком сервисе работает и в каком статусе находится.

Перечень статусов:

  • Created

  • Processing

  • OnDeleting

  • Deleted

Разберем теперь как удалить "наблюдателя", тут можно пойти двумя путями:

1) направить конкретному сервису сообщение о том, что нужно найти у себя наблюдателя с указанным идентификатором и вызватьCancellationToken.

2) Направить сообщение всем доступным сервисам, черезFanOut.Сервис,у которого есть наблюдатель с нужным идентификатором будет удален, а все остальные сервисы просто проигнорируют это сообщение.

Лично я отдал предпочтение второму варианту, одна из причин это то, что нет необходимости хранить адрес сервиса...но тут как говорится ап ту ю.

КаждыйObserver-serviceимеет свою очередь, где он ожидает получить команду на остановку работы наблюдателя. Когда приходит такая команда, сервис проверяет наличие такого наблюдателя и в случае обнаружения вызываетCancellationToken. Тем самым завершая работу наблюдателя естественным путем.

Пошагово завершение работы Наблюдателя выглядит следующим образом. Пользователь отправляет команду на завершение работы, для этого ему нужно знать лишьidнаблюдателя. Менеджер проверят если ли такой наблюдатель в системе, и его статус.

  • ЕслиCreated, пользователю возвращается ответ что наблюдатель еще не активирован. Из-за гонки условий сообщений об удалении может прийти раньше, чем сервису придет сообщение о запуске наблюдателя.

  • ЕслиOnDeletingилиDeleted, то возвращается ответ - запрос на удаление уже был отправлен или наблюдатель удален, соответственно.

  • ЕслиProcessing, то менеджер переводит наблюдателя в статусOnDeletingи отправляет сообщения на удаление в очередь. Сообщениеброадкаститсявсем сервисам. Сервис, у которого был нужный наблюдатель, вызываетCancellationTokenи оправляет сообщение в statequeue. Менеджер же, получив данное сообщение актуализирует данные и делает пометку переводя изOnDeletingвDeleted.

Id

Name

CreatedAt

ModifyAt

ServiceId

Status

{Observerid}

My_new_observer

{created date}

{modifydate}

{Observerservice1id}

Deleted

Рассмотрим критичные сценарии:

1) Отказала шина данных.

Вся инфраструктура, будь то шина данных или база данных должна находится опосредованно от нашей системы и бытькластеризированной. От себя добавлю следующий тезис, который как бритва Оккама отсечет ряд критичных сценариев -MsSql,RabbitMq,Kafka, дажеKubernetesсюда можно добавить, все это надежные системы, и при соблюдении SLA будут работать без отказа. За спиной у них огромные компании или комьюнити, сотни разработчиков. А вот собственную систему нужно воспринимать как что-то ненадежное, где любой компонент в любой момент времени может выйти из строя.

2) Полныйblackout, везде нет света.

Тогда, когда свет все-таки включат, докер поднимет все контейнеры, а вместе с ними сервисы, каждый сервис сообщает о том, что он заново родился и начинает разбирать из очереди свободные сообщения, запускает наблюдателей, они отправляют метаданные в менеджер. Менеджер актуализирует информацию о наблюдателях, когда они были созданы и на каких сервисах они теперь расположены. (Сообщения записаны на диск, поэтому никуда не делись.)

3) Вылетел один сервер.

Сценарий мы уже описывали выше, "наблюдатели, которые выполнялись на нем просто перераспределятся по всем свободным сервисам. Когда менеджеру придут сообщения от новых "наблюдателей, которые были перераспределены, он запишет на каких сервисах они теперь работают.

4) Отказал менеджер. В процессе того пока он был неактивен, ломались сервера с "наблюдателями.

Сервисы будут класть сообщения в очередь о своем состоянии. Когда менеджер все-таки придет в себя, он примет сразу пачку сообщения, и в конце концов дойдет до консистентного состояния.

5) Попытка удалить конкретного наблюдателя, в том момент, пока он перераспределяется.

Есть маленькое окно, когда пользователь отправляет команду на удаление, наблюдатель может перераспределяться. Тем самым его нет в моменте на конкретном сервисе. И сообщение об удалении может быть проигнорировано сервисами. В таком случае, менеджер, не получив ответа, по истечению времени, отправляет сообщение на удаление, повторно.

Итог

Мы реализовали оркестратор задач, на базе механизма отправки сообщений. Где сообщение это задача, с двумя статусами, в работе -Unacked, и в ожидании работы -Ready. Очередь сама распределяет задачи между исполнителями, делая это событийно, а не черезpollingсостояния, как это делают планировщики. Система масштабируемая - мы можем иметь неограниченно количество "наблюдателей, которые могут быть распределены на разных серверах. Более того мы можем масштабировать как горизонтально, так и вертикально, увеличивая количество одновременно обрабатываемых сервисом задач, просто увеличиваяPrefetchCount. И последнее, время на разработку оказалось меньше, чем время на изучение и внедрение планировщика.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru