Русский
Русский
English
Статистика
Реклама

Очереди

Redis на практических примерах

18.06.2020 10:17:09 | Автор: admin
Redis достаточно популярный инструмент, который из коробки поддерживает большое количество различных типов данных и методов работы с ними. Во многих проектах он используется в качестве кэшируещего слоя, но его возможности намного шире. Мы в ManyChat очень любим Redis и активно используем его в нашем продукте для решения огромного количества задач. Про некоторые интересные кейсы использования этой in-memory key-value базы данных я расскажу на примерах. Надеюсь, вам они будут полезны, и вы сможете применить что-то в своих проектах.

Рассмотрим следующие кейсы:
  • Кэширование данных (да, банально и скучно, но это классный инструмент для кэширования и обойти стороной этот кейс, кажется будет не правильно)
  • Работа с очередями на базе redis
  • Организация блокировок (mutex)
  • Делаем систему rate-limit
  • Pubsub делаем рассылки сообщений на клиенты

Буду работать с сырыми redis командами, чтобы не завязываться на какую-либо конкретную библиотеку, предоставляющую обертку над этими командами. Код буду писать на PHP с использованием ext-redis, но он здесь для наглядности, использовать представленные подходы можно в связке с любым другим языком программирования.



Кэширование данных


Давайте начнем с самого простого, один из самых популярных кейсов использования Redis кэширование данных. Будет полезно для тех, кто не работал с Redis. Для тех, кто уже давно пользуется этим инструментом можно смело переходить к следующему кейсу. Для того, чтобы снизить нагрузку на БД, иметь возможность запрашивать часто используемые данные максимально быстро, используется кэш. Redis это in-memory хранилище, то есть данные хранятся в оперативной памяти. Ещё это key-value хранилище, где доступ к данным по их ключу имеет сложность O(1) поэтому данные мы получаем очень быстро.

Получение данных из хранилища выглядит следующим образом:

public function getValueFromCache(string $key){    return $this->getRedis()->rawCommand('GET', $key);}

Но для того, чтобы данные из кэша получить, их нужно сначала туда положить. Простой пример записи:

public function setValueToCache(string $key, $value){    $this->getRedis()->rawCommand('SET', $key, $value);} 

Таким образом, мы запишем данные в Redis и сможем их считать по тому же самому ключу в любой нужный нам момент. Но если мы будем все время писать в Redis, данные в нем будут занимать все больше и больше места в оперативной памяти. Нам нужно удалять нерелевантные данные, контролировать это вручную достаточно проблематично, поэтому пускай redis занимается этим самостоятельно. Добавим к нашему ключу TTL (время жизни ключа):

public function setValueToCache(string $key, $value, int $ttl = 3600){    $this->getRedis()->rawCommand('SET', $key, $value, 'EX', $ttl);}

По истечении времени ttl (в секундах) данные по этому ключу будут автоматически удалены.

Как говорят, в программировании существует две самых сложных вещи: придумывание названий переменных и инвалидация кэша. Для того, чтобы принудительно удалить значение из Redis по ключу, достаточно выполнить следующую команду:

public function dropValueFromCache(string $key){    $this->getRedis()->rawCommand('DEL', $key);}


Также редис позволяет получить массив значений по списку ключей:

public function getValuesFromCache(array $keys){    return $this->getRedis()->rawCommand('MGET', ...$keys);}

И соответственно массовое удаление данных по массиву ключей:

public function dropValuesFromCache(array $keys){    $this->getRedis()->rawCommand('MDEL', ...$keys);}


Очереди


Используя имеющиеся в Redis структуры данных, мы можем запросто реализовать стандартные очереди FIFO или LIFO. Для этого используем структуру List и методы по работе с ней. Работа с очередями состоит из двух основных действий: отправить задачу в очередь, и взять задачу из очереди. Отправлять задачи в очередь мы можем из любой части системы. Получением задачи из очереди и ее обработкой обычно занимается выделенный процесс, который называется консьюмером (consumer).

Итак, для того, чтобы отправить нашу задачу в очередь, нам достаточно использовать следующий метод:

public function pushToQueue(string $queueName, $payload){    $this->getRedis()->rawCommand('RPUSH', $queueName, serialize($payload));}

Тем самым мы добавим в конец листа с названием $queueName некий $payload, который может представлять из себя JSON для инициализации нужной нам бизнес логики (например данные по денежной транзакции, данные для инициализации отправки письма пользователю, etc.). Если же в нашем хранилище не существует листа с именем $queueName, он будет автоматически создан, и туда попадет первый элемент $payload.

Со стороны консьюмера нам необходимо обеспечить получение задач из очереди, это реализуется простой командой чтения из листа. Для реализации FIFO очереди мы используем чтение с обратной записи стороны (в нашем случае мы писали через RPUSH), то есть читать будем через LPOP:

public function popFromQueue(string $queueName){    return $this->getRedis()->rawCommand('LPOP', $queueName);}

Для реализации LIFO очереди, нам нужно будет читать лист с той же стороны, с которой мы в него пишем, то есть через RPOP.



Тем самым мы вычитываем по одному сообщению из очереди. В случае если листа не существует (он пустой), то мы получим NULL. Каркас консьюмера мог бы выглядеть так:

class Consumer {    private string $queueName;    public function __construct(string $queueName)    {        $this->queueName = $queueName;    }    public function run()    {        while (true) { //Вычитываем в бесконечном цикле нашу очередь            $payload = $this->popFromQueue();            if ($payload === null) { //Если мы получили NULL, значит очередь пустая, сделаем небольшую паузу в ожидании новых сообщений                sleep(1);                continue;            }            //Если очередь не пустая и мы получили $payload, то запускаем обработку этого $payload            $this->process($payload);        }    }    private function popFromQueue()    {        return $this->getRedis()->rawCommand('LPOP', $this->queueName);    }}

Для того, чтобы получить информацию о глубине очереди (сколько значений хранится в нашем листе), можем воспользоваться следующей командой:

public function getQueueLength(string $queueName){    return $this->getRedis()->rawCommand('LLEN', $queueName);}

Мы рассмотрели базовую реализацию простых очередей, но Redis позволяет строить более сложные очереди. Например, мы хотим знать о времени последней активности наших пользователей на сайте. Нам не важно знать это с точностью вплоть до секунды, приемлемая погрешность 3 минуты. Мы можем обновлять поле last_visit пользователя при каждом запросе на наш бэкенд от этого пользователя. Но если этих пользователей большое количество в онлайне 10,000 или 100,000? А если у нас еще и SPA, которое отправляет много асинхронных запросов? Если на каждый такой запрос обновлять поле в бд, мы получим большое количество тупых запросов к нашей БД. Эту задачу можно решать разными способами, один из вариантов это сделать некую отложенную очередь, в рамках которой мы будем схлопывать одинаковые задачи в одну в определенном промежутке времени. Здесь на помощь нам придет такая структура, как Sorted SET. Это взвешенное множество, каждый элемент которого имеет свой вес (score). А что если в качестве score мы будем использовать timestamp добавления элемента в этот sorted set? Тогда мы сможем организовать очередь, в которой можно будет откладывать некоторые события на определенное время. Для этого используем следующую функцию:

public function pushToDelayedQueue(string $queueName, $payload, int $delay = 180){    $this->getRedis()->rawCommand('ZADD', $queueName, 'NX', time() + $delay, serialize($payload))}

В такой схеме идентификатор пользователя, зашедшего на сайт, попадет в очередь $queueName и будет висеть там в течение 180 секунд. Все другие запросы в рамках этого времени будут также отправляться в эту очередь, но они не будут туда добавлены, так как идентификатор этого пользователя уже существует в этой очереди и продублирован он не будет (за это отвечает параметр 'NX'). Так мы отсекаем всю лишнюю нагрузку и каждый пользователь будет генерить не более одного запроса в 3 минуты на обновление поля last_visit.

Теперь возникает вопрос о том, как читать эту очередь. Если методы LPOP и RPOP для листа читают значение и удаляют его из листа атомарно (это значит, что одно и тоже значение не может быть взято несколькими консьюмерами), то sorted set такого метода из коробки не имеет. Мы можем сделать чтение и удаление элемента только двумя последовательными командами. Но мы можем выполнить эти команды атомарно, используя простой LUA скрипт!

public function popFromDelayedQueue(string $queueName){    $command = 'eval "        local val = redis.call(\'ZRANGEBYSCORE\', KEYS[1], 0, ARGV[1], \'LIMIT\', 0, 1)[1]        if val then            redis.call(\'ZREM\', KEYS[1], val)        end        return val"';    return $this->getRedis()->rawCommand($command, 1, $queueName, time());}

В этом LUA скрипте мы пытаемся получить первое значение с весом в диапазоне от 0 до текущего timestamp в переменную val с помощью команды ZRANGEBYSCORE, если нам удалось получить это значение, то удаляем его из sorted set командой ZREM и возвращаем само значение val. Все эти операции выполняются атомарно. Таким образом мы можем вычитывать нашу очередь в консьюмере, аналогично с примером очереди построенной на структуре LIST.

Я рассказал про несколько базовых паттернов очередей, реализованных в нашей системе. На текущий момент у нас в продакшене существуют более сложные механизмы построения очередей линейных, составных, шардированных. При этом Redis позволяет все это делать при помощи смекалки и готовых круто работающих структур из коробки, без сложного программирования.

Блокировки (Mutex)


Mutex (блокировка) это механизм синхронизации доступа к shared ресурсу нескольких процессов, тем самым гарантируя, что только один процесс будет взаимодействовать с этим ресурсом в единицу времени. Этот механизм часто применяется в биллинге и других системах, где важно соблюдать потоковую безопасность (thread safety).

Для реализации mutex на базе Redis прекрасно подойдет стандартный метод SET с дополнительными параметрами:

public function lock(string $key, string $hash, int $ttl = 10): bool{    return (bool)$this->getRedis()->rawCommand('SET', $key, $hash, 'NX', 'EX', $ttl);}

где параметрами для установки mutex являются:
  • $key ключ идентифицирующий mutex;
  • $hash генерируем некую подпись, которая идентифицирует того, кто поставил mutex. Мы же не хотим, чтобы кто-то в другом месте случайно снял блокировку и вся наша логика рассыпалась.
  • $ttl время в секундах, которое мы отводим на блокировку (на тот случай, если что-то пойдет не так, например процесс, поставивший блокировку, по какой-то причине умер и не снял ее, чтобы это блокировка не висела бесконечно).


Основное отличие от метода SET, используемого в механизме кэширования это параметр NX, который говорит Redis о том, что значение, которое уже хранится в Redis по ключу $key, не будет записано повторно. В результате, если в Redis нет значения по ключу $key, туда произведется запись и в ответе мы получим 'OK', если значение по ключу уже есть в Redis, оно не будет туда добавлено (обновлено) и в ответе мы получим NULL. Результат метода lock(): bool, где true блокировка поставлена, false уже есть активная блокировка, создать новую невозможно.

Чаще всего, когда мы пишем код, который пытается работать с shared ресурсом, который заблокирован, мы хотим дождаться его разблокировки и продолжить работу с этим ресурсом. Для этого можем реализовать простой метод для ожидания освободившегося ресурса:

public function tryLock(string $key, string $hash, int $timeout, int $ttl = 10): bool{    $startTime = microtime(true);    while (!this->lock($key, $hash, $ttl)) {        if ((microtime(true) - $startTime) > $timeout) {            return false; // не удалось взять shared ресурс под блокировку за указанный $timeout}usleep(500 * 1000) //ждем 500 миллисекунд до следующей попытки поставить блокировку    }    return true; //блокировка успешно поставлена}

Мы разобрались как ставить блокировку, теперь нам нужно научиться ее снимать. Для того, чтобы гарантировать снятие блокировки тем процессом, который ее установил, нам понадобится перед удалением значения из хранилища Redis, сверить хранимый хэш по этому ключу. Для того, чтобы сделать это атомарно, воспользуемся LUA скриптом:

public function releaseLock(string $key, string $hash): bool{    $command = 'eval "        if redis.call("GET",KEYS[1])==ARGV[1] then            return redis.call("DEL",KEYS[1])        else            return 0        end"';    return (bool) $this->getRedis()->rawCommand($command, 1, $key, $hash);}

Здесь мы пытаемся найти с помощью команды GET значение по ключу $key, если оно равно значению $hash, то удаляем его при помощи команды DEL, которая вернет нам количество удаленных ключей, если же значения по ключу $key не существует, или оно не равно значению $hash, то мы возвращаем 0, что значит блокировку снять не удалось. Базовый пример использования mutex:

class Billing {    public function charge(int $userId, int $amount){        $mutexName = sprintf('billing_%d', $userId);        $hash = sha1(sprintf('billing_%d_%d'), $userId, mt_rand()); //генерим некий хэш запущенного потока        if (!$this->tryLock($mutexName, $hash, 10)) { //пытаемся поставить блокировку в течение 10 секунд            throw new Exception('Не получилось поставить lock, shared ресурс занят');}        //lock получен, процессим бизнес-логику        $this->doSomeLogick();        //освобождаем shared ресурс, снимаем блокировку        $this->releaseLock($mutexName, $hash);}}


Rate limiter


Достаточно частая задача, когда мы хотим ограничить количество запросов к нашему апи. Например на один API endpoint от одного аккаунта мы хотим принимать не более 100 запросов в минуту. Эта задача легко решается с помощью нашего любимого Redis:

public function isLimitReached(string $method, int $userId, int $limit): bool{    $currentTime = time();    $timeWindow = $currentTime - ($currentTime % 60); //Так как наш rate limit имеет ограничение 100 запросов в минуту, //то округляем текущий timestamp до начала минуты  это будет частью нашего ключа,//по которому мы будем считать количество запросов    $key = sprintf('api_%s_%d_%d', $method, $userId, $timeWindow); //генерируем ключ для счетчика, соответственно каждую минуту он будет меняться исходя из $timeWindow    $count = $this->getRedis()->rawCommand('INCR', $key); //метод INCR увеличивает значение по указанному ключу, и возвращает новое значение. //Если ключа не существует, он будут инициализирован со значением 0 и после этого увеличен    if ($count > $limit) { //limit достигнут        return true;    }    return false;} 

Таким простым методом мы можем лимитировать количество запросов к нашему API, базовый каркас нашего контроллера мог бы выглядеть следующим образом:

class FooController {    public function actionBar()    {        if ($this->isLimitReached(__METHOD__, $this->getUserId(), 100)) {            throw new Exception('API method max limit reached');        }        $this->doSomeLogick();    }}


Pub/sub


Pub/sub интересный механизм, который позволяет, с одной стороны, подписаться на канал и получать сообщения из него, с другой стороны отправлять в этот канал сообщение, которое будет получено всеми подписчиками. Наверное у многих, кто работал с вебсокетами, возникла аналогия с этим механизмом, они действительно очень похожи. Механизм pub/sub не гарантирует доставки сообщений, он не гарантирует консистентности, поэтому не стоит его использовать в системах, для которых важны эти критерии. Однако рассмотрим этот механизм на практическом примере. Предположим, что у нас есть большое количество демонизированных команд, которыми мы хотим централизованно управлять. При инициализации нашей команды мы подписываемся на канал, через который будем получать сообщения с инструкциями. С другой стороны у нас есть управляющий скрипт, который отправляет сообщения с инструкциям в указанный канал. К сожалению, стандартный PHP работает в одном блокирующем потоке; для того, чтобы реализовать задуманное, используем ReactPHP и реализованный под него клиент Redis.

Подписка на канал:
class FooDaemon {    private $throttleParam = 10;    public function run()    {        $loop = React\EventLoop\Factory::create(); //инициализируем event-loop ReactPHP        $redisClient = $this->getRedis($loop); //инициализируем клиента Redis для ReactPHP        $redisClient->subscribe(__CLASS__); // подписываемся на нужный нам канал в Redis, в нашем примере название канала соответствует названию класса        $redisClient->on('message', static function($channel, $payload) { //слушаем события message, при возникновении такого события, получаем channel и payload            switch (true) { // Здесь может быть любая логика обработки сообщений, в качестве примера пускай будет так:                case \is_int($payload): //Если к нам пришло число  обновим параметр $throttleParam на полученное значение                    $this->throttleParam = $payload;                    break;                case $payload === 'exit': //Если к нам пришла команда 'exit'  завершим выполнение скрипта                    exit;                default: //Если пришло что-то другое, то просто залогируем это                    $this->log($payload);                    break;            }        });        $loop->addPeriodicTimer(0, function() {            $this->doSomeLogick(); // Здесь в бесконечном цикле может выполняться какая-то логика, например чтение задач из очереди и их процессинг        });        $loop->run(); //Запускаем наш event-loop    }}

Отправка сообщения в канал более простое действие, мы можем сделать это абсолютно из любого места системы одной командой:

public function publishMessage($channel, $message){    $this->getRedis()->publish($channel, $message);}

В результате такой отправки сообщения в канал, все клиенты, которые подписаны на данный канал, получат это сообщение.



Итог


Мы рассмотрели 5 примеров использования Redis на практике, надеюсь что каждый найдет для себя что-то интересное. В нашем стэке технологий Redis занимает важное место, мы любим этот инструмент за его скорость и гибкость. Мы используем Redis в продакшене уже много лет, и он зарекомендовал себя как очень крутой и надежный инструмент, который лежит в основе многих частей нашего продукта. Наш небольшой кластер Redis серверов обрабатывает около 1 миллиона запросов в секунду. А как вы используете Redis в своем проекте? Делитесь опытом в комментариях!
Подробнее..

Используем очереди совместно с БД обсуждение проблем, возможные способы решения

30.04.2021 18:12:21 | Автор: admin

Очереди - прекрасный инструмент, который практически идеально масштабируется. Не справляется железо? Просто добавили узлов в кластер. Когда очередь присутствует в проекте, то возникает соблазн всё больше функционала реализовывать с её помощью.

О подводных камнях такого пути поговорим в этой статье.

Рано или поздно, применяя очереди, пользователь сталкивается с вопросом использования их совместно с каким-то сервисом, базой данных и т.п.

  • Заказ выполнен, нужно отправить СМС-уведомление пользователю.

  • Поступил новый заказ, нужно отправить Push-уведомление исполнителям.

  • Работа выполнена, нужно списать деньги со счёта клиента.

Во всех перечисленных примерах изменения в какой-то бизнес-сущности фиксируются в БД (или сервисе с БД), и возникает большой соблазн соблазн отправлять уведомления с использованием очередей.

Что мы имеем в данной ситуации? Первоначальная, простейшая структура кода:

  • Сервис (наша программа) фиксирует изменение данных в БД.

  • Затем сервис кладёт задание в очередь.

Фактически в данном случае требуется реализовать событийный триггер на изменение записи данных.

И в общем случае, получается, здесь мы имеем две записи в две различные БД: сервиса и очереди.

Теперь перенесёмся в реальный мир и рассмотрим, какие ситуации могут возникнуть:

  • Всё в порядке. БД доступна, БД очереди доступна;

  • БД недоступна, БД очереди доступна;

  • БД доступна, БД очереди недоступна;

  • БД недоступна, БД очереди недоступна.

Наиболее опасен для бизнеспроцессов тут третий случай: после того, как данные были зафиксированы в БД, уведомить заинтересованные стороны не получилось.

И вот, если начинать задумываться над этой проблемой, то реализация становится из очень простой... очень сложной. В пределе приходится реализовывать двухфазный коммит.

Но со сложными решениями всё понятно, давайте подумаем о более простых.

Неправильные решения

Очень многие, впервые столкнувшись с этой проблемой (как правило, происходит это, когда система реализована и уже значительное время работает), начинают с того, что предлагают следующее решение:

  1. Открываем транзакцию в БД.

  2. Изменяем запись в БД.

  3. Записываем в очередь.

  4. Закрываем транзакцию в БД.

В этом случае, если одна из БД недоступна, то и в другую запись не запишется.

Однако тут имеются следующие проблемы:

  • Данное решение условно работоспособно только в том случае, если обработчик очереди никогда не будет обращаться к записи в БД. Поскольку обработчик очереди может взять задачу на исполнение между пунктами 3 и 4 (Записи в БД ещё не зафиксированы).

  • Закрытие транзакции также может завершиться с ошибкой. В этом случае задание в очереди останется неотменённым.

Выполнение всей работы в обработчике

Простейшее решение.

Вместо изменения записи кладём в очередь задание о том, что надо это сделать. Обработчик очереди выполняет оба действия: и изменения записи в БД, и триггерное. Это решение сложнее, чем первоначальное, но всё равно довольно простое.

Проблема в том, что инициирующему действие коду может быть нужен результат работы (например, для того, чтобы вернуть http-код успеха/ошибки).

Дополнительная таблица в БД

Более простое решение, нежели двухфазный коммит.

В игре у нас появляется дополнительный процесс/демон, перекладывающий задачи из дополнительной таблицы БД (назовём её queue) в очередь.

В этом случае запросы, модифицирующие БД, пишут записи в две таблицы в той же транзакции (или в том же запросе):

/* было */UPDATE   "orders"SET   "status" = 'complete'WHERE   "order_id" = $1RETURNING   *
/* стало */WITH "o" AS (    UPDATE       "orders"    SET       "status" = 'complete'    WHERE       "order_id" = $1    RETURNING       *),"q" AS (   INSERT INTO        "queue"   (      "key",      "data"   )   SELECT      "o.order_id",      "o.status"   FROM      "o")SELECT   *FROM   "o"

Примечание: Код записи в таблицу queue можно вынести в триггер уровня БД, а также спроектировать так, чтобы сообщения были универсальными и подходили не только для orders.

Если код, перекладывающий записи из таблицы queue в очередь, не использует двухфазный коммит, то к обработчику задания в очереди появляется требование идемпотентности. Это происходит потому, что при сбоях возможно появление дублей заданий в очереди.

Простейший алгоритм цикла демона:

  • Взять задачу из таблицы queue.

  • Положить её в очередь.

  • Удалить задачу в таблице.

Локальный лог/кеш сообщений

Если потеря данных для нас только неприятна, но не критична, то в целом можно ограничиться первоначальным решением. Однако, если частота возникающих проблем всё-таки заставляет обратить на себя внимание (пользователи жалуются на периодически зависающий интерфейс, на неполучение СМС и т.п.), то можно применить следующий подход:

  • резко понизить вероятность неуспеха при отправке сообщения в очередь.

Для этого, как в предыдущем варианте, применяем промежуточное кеширование записей сообщений для очереди, но пишем их не в дополнительную таблицу, а в локальный O_APPEND-файл или локально запущенную очередь.

  • сервис (наша программа) фиксирует изменение данных в БД,

  • затем сервис кладёт задание в локальный лог.

  • Дополнительный демон перекладывает сообщения из этого лога (или локальной очереди) в общую очередь.

Запись в локальный файл коротких сообщений завершается ошибкой в крайне редких ситуациях, и поэтому можно считать, что проблема, возникающая крайне редко, есть решённая проблема.

Итоги

Как видим, вариантов решения проблемы немного. Если мы хотим сохранять систему простой (KISS-принцип), то введение дополнительного демона и кеша/лога сообщений в БД или локальном файле/БД даст небольшое увеличение сложности. При этом очень важно сохранять обработчик идемпотентным, поскольку при сбоях в момент перекладывания заданий из локального кеша в общую очередь возможно появление дублей.

А обобщённое решение предполагает использование двухфазного коммита.

Подробнее..

Брокер очередей Capella Queue

09.05.2021 00:17:30 | Автор: admin

Привет!

Я часто видел заголовки подобные "Apache Kafka vs RabbitMQ vs NATS", но что делать если что-то не устраивает в готовых решениях? Можно подстроиться, а можно изобрести что-то своё. Я пошел вторым путём. В этой статье я хотел бы рассказать про свою реализацию брокера сообщений. Если стало интересно, добро пожаловать под кат.

1.1 Сохранение заказов

Некоторое время назад я работал в очень крупном интернет магазине. Была общая задача: не "пропустить" ни один заказ. То есть сделать так, что бы была возможность сохранить заказ в несколько мест, и при доступности хотя бы одного из них сохранить заказ.

Уточнение 1: обрабатывая заказ нужно однозначно избежать ситуации когда один заказ обрабатывается дважды.

Уточнение 2: было несколько ДЦ и допускалось, что они могут иногда не иметь между собой связь, а еще внутри ДЦ может рушиться сеть.

Уточнение 3: разумеется отправлять на сборку нужно сразу (считаем каждый склад расположенным на своих независимых нескольких ДЦ).

Постановка задачи в таком виде увы не имеет явного решения :( Однако при наличии определённых допущений всё же реализуема.

Как эту задачу реализовали мы, я тут рассказывать не буду. Но сама постановка задачи предлагает использование очередей сообщений, и при этом рассматриваемые готовые решения увы не подходили по разным причинам.

И у меня зародилось желание написать мутанта сочетающего в себе плюсы разных подходов.

1.2 Портативный анализатор

Некоторое время назад мне позвонил друг и пожаловался, что у него не получается организовать передачу данных от "портативного анализатора". Как выяснилось, требуется сделать так, что бы потребителей данных было много, причём часть из них желает обращаться непосредственно к анализатору, а для части нужно, что бы анализатор сам послал им данные. Причём такая необходимость разного способа получения данных обусловлена внешними требованиям, повлиять на которые, увы, нет возможности.

У меня всплыла в голове идея мутанта, и окончательно сформировалась.

2. Цель

Для начала я решил выписать требования, поставить задачу и понять, а нет ли готового чего-то что удовлетворяет целям.

Основная цель - иметь брокер сообщений.

Требование 0: брокер должен хранить сообщения и предоставлять доступ к ним в течении какого-то времени (Блокирующее).

Требование 1: очередь должна работать на слабом железе (Блокирующее). Интересно что это требование было и в "крупном интернет магазине" ибо выделение железа было затруднительным, а выделяемые ресурсы не всегда были стабильными.

Требование 2: если узел брокера умеет принимать сообщение, то нужно иметь возможность послать туда сообщение, даже если у него нет связи с другими узлами (Блокирующее).

Требование 3: если сообщение было послано в один узел за сообщения определённого типа, то оно в конечном счёте должно появиться во всех узлах отвечающих за сообщение этого типа доступным для получения (Желательное).

Требование 4: если одинаковое сообщение было послано в два узла брокера, отвечающего за сообщения одного типа, то сообщения не должны задублироваться в конечном счёте (Желательное).

*`В конечном счёте` - по прошествии времени. Если были разрывы соединений, то после их восстановления.

Начиная подбирать готовые решения мы начали упираться в их ограничения.

Kafka - требует понимание как работает и требует железо и это блокирующий минус. Плюсы в Kafka - концепция журналирования, то есть возможность прочитать повторно сообщения начиная с какого-то момента времени.

RabbitMQ - блокирующая проблема - нет простого механизма перепрочитать сообщения, хотя извратиться можно. Плюс - крутой механизм маршрутизации.

NATS Streaming - он простой в настройке и быстрый это плюс. Однако у NATS Streaming кластеров нет возможности у узлов работать независимо при потере связи. Мы попробовали использовать NATS Streaming для сохранения заказов и разворачивали несколько независимых кластеров для обеспечения отказоустойчивости. Увы, при использовании в качестве хранилища PostgreSQL NATS Streaming вёл себя не стабильно на боевых нагрузках. Я конечно допускаю, что мы его не правильно готовили, но опыт других команд показал, что готовили его неправильно не только мы. Между прочим для передачи событий между сервисами, где не требуется стабильная работа, NATS Streaming вполне подошел.

Других готовых решений я не рассматривал.

Я подумал и решил на досуге реализовать свою версию очередей, хотя бы частично реализовывающую требования описанные выше.

3. Реализация (краткое описание)

3.1 Общие понятия

Cluster - сервис в котором хранятся очереди, обработчики и ссылки на другие кластера.

Queue - очередь. Очередь хранится в кластере. Очередь принимает сообщения, сохраняет их в хранилище и выдаёт по требованию. Что бы получить сообщение нужно передать ID последнего прочитанного сообщения.

Handler - обработчик. Они используются для того, что бы передать данные из очереди в очередь, для того что бы выгрузить из памяти данные очередей, и что бы перенести или удалить старые данные очереди.

ExternalCluster - ссылки на другие кластера. Можно например положить сообщения из своего кластера в другой. Или наоборот забрать из другого кластера сообщения и положить в очереди этого кластера.

3.2 Очередь изнутри

Очередь представляет из себя набор блоков. Запись ведётся в последний блок. Блок сохраняется целиком. Каждый блок содержит ограниченное количество сообщений. Ограничения накладываются на суммарный объём, на время прошедшее от создания блока и на количество сообщений в блоке.

Предполагается 4 вида сохранения:

  • сохранить сразу после добавления сообщения

  • ничего не делать после добавления сообщения

  • помечать очередь, что она изменилась после добавления сообщения

  • дождаться, что сообщение сохранится после добавления сообщения, сохранение происходит периодически

Очереди поддерживают подписку. Однако подписка реализована так, что одна подписка - один читатель и подписки работают независимо.

Идентификатор сообщения в очереди формируется независимо для каждой очереди. В сообщении есть поля для определения источника, и внешнего идентификатора.

3.3 Хранение данных

Для хранения данных сообщения было решено выделить хранение в отдельный слой. То есть хранилище может быть как диком (уже реализовано) так и неким облачным хранилищем (s3) и даже БД (предстоит реализовать).

Блоки от одной очереди могут храниться в разных хранилищах. Для реализации этого механизма у каждого блока есть отметка в каком хранилище хранить. С помощью обработчиков эту отметку можно поменять.

Один из предполагаемых сценариев: сохранять активные записи (свежие) на быстрый диск, а неиспользуемые записи архивировать и сохранять на более простые и дешёвые диски с меньшей пропускной способностью.

Обработчики могут не только перемещать блоки. На данный момент реализованы следующие обработчики:

  • Выгрузка блоков из памяти

  • Перемещение блоков между местами хранения

  • Удаление старых блоков

  • Копирование сообщений между очередями

Копирование сообщений между очередями

Для копирования используется механизм вставки "уникального сообщения". Уникальность определяется по уникальности пары "источник + внешний ID". Благодаря такому подходу можно настроить копирования очереди в двунаправленном режиме. Особенности реализации таковы, что наибольшая производительность достигается если копирование идёт с нарастающим внешним ID и при этом этого сообщения ещё нет в очереди. То есть в однонаправленном копировании. Так же рекомендуется ограничить количество источников разумным количеством (например 1000) в рамках одной очереди.

FIFO

Сообщения в рамках очереди сохраняются в том порядке в котором они были сохранены в очередь. При копировании порядок вставляемых сообщений (новых сообщений, которых не было в очереди) сохраняется

3.4 Примеры сценариев работы

Сохранение события для сервисов в дата центрах (например сохранение заказа)

Подготовка кластеров:

  • Разворачивается N кластеров Capella Queue

  • В каждом кластере создаётся очередь для приёма события

  • Для каждой очереди определяются обработчики, для удаления, выгрузки из памяти, и перемещения старых сообщений

  • Для каждой очереди определяется обработчик копирования сообщений в другие очереди (в каждую или по кругу или по любой другой схеме)

Реализация сохранения:

Для сервиса который сохраняет событие определяется К кластеров Capella Queue из подготовленных и определяется надёжность M (M < K)

Далее при необходимости сохранения события Сервис:

  • Определяет для сообщения глобально уникальную связку источник+внешний ID. Так же рекомендуется указать сегмент

  • Сохраняет сообщение в M Кластеров

  • Если какое-то сохранение прошло не успешно, то подбирается другой кластер из К определённых для сервиса и производится попытка сохранить сообщение туда.

Обработчик событий, может подписаться на одну любую очередь и получать оттуда все сообщения. Чтение по сегментам еще не реализовано но запланировано.

Сохранение события для удалённых устройств (например для анализатора)

Подготовка кластеров:

  • Разворачивается кластер на устройстве

  • Разворачиваются кластеры в датацентрах, куда хочется выгружать данные

  • В каждом кластере создаётся очередь для приёма события

  • Настраиваются обработчики для удаления, выгрузки из памяти и архивирования

  • Настраиваются обработчики для копирования сообщений (настраиваться может как на устройстве так и в ДЦ)

Реализация сохранения:

События сохраняются в очередь на локальном кластере. Нужно определить для сообщения глобально уникальную связку источник+внешний ID. Благодаря обработчикам копирования сообщения будут доступны как на устройстве так и в ДЦ

4. Ближайшие планы

  1. Сделать туториал, с описанием основных кейсов.

  2. Прикрутить безопасность.

  3. Прикрутить использование сервисом SSL сертификатов.

  4. Добавить сегментацию - возможность чтения и переливки данных по сегментам.

  5. Обновление параметров очередей, кластеров и обработчиков.

  6. Функционал для контроля того, что сообщение отреплицировалось в другие кластера.

  7. Метрики.

Код

На github

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru