Русский
Русский
English
Статистика
Реклама

Redis

Перевод Использование Redis в инфраструктурных микросервисах

19.02.2021 16:11:01 | Автор: admin
В 2019 году я писал о том, как создать хранилище событий, основанное на Redis. Я рассказывал о том, что потоки Redis хорошо подходят для организации хранения событий, так как они позволяют хранить события с использованием иммутабельного механизма, напоминающего журнал транзакций, поддерживающего только присоединение новых данных к уже имеющимся, но не изменение существующих данных. Теперь же, используя обновлённое приложение OrderShop, речь о котором шла в вышеупомянутом материале, я хочу продемонстрировать пример использования Redis для организации работы очереди событий, продолжая рассказывать о возможностях применения Redis Enterprise, выходящих за пределы кеширования.



Обзор микросервисов, инфраструктурных сервисов и распределённых систем


Redis это отлично решение, применимое для создания инфраструктурных сервисов наподобие очередей сообщений и хранилищ событий. Но при использовании микросервисных архитектур при разработке распределённых систем нужно принимать во внимание несколько важных моментов. Реляционные базы данных часто хорошо показывали себя в монолитных приложениях, но лишь NoSQL-базы данных, вроде Redis, способны удовлетворить требованиям по масштабируемости и доступности, выдвигаемым микросервисными архитектурами.

В распределённых системах используется и распределённое состояние приложений. В соответствии с теоремой CAP в любой реализации распределённых вычислений возможно обеспечить не более двух из трёх следующих свойств: согласованность данных (consistency), доступность информации (availability) и устойчивость к разделению (partition tolerance) (первые буквы английских наименований этих свойств и сформировали акроним CAP). Получается, что для того чтобы сделать свою реализацию распределённой системы устойчивой к отказам, необходимо выбирать между доступностью информации и согласованностью данных. Если мы выберем доступность информации то, в итоге, в нашем распоряжении окажется то, что называют согласованностью в конечном счёте (eventual consistency). То есть данные будут согласованными, но лишь через некоторое время после последнего обновления. А если выбрать согласованность данных это окажет воздействие на производительность, так как возникнет необходимость в синхронизации и изоляции операций записи данных во всей распределённой системе.

Порождение событий (event sourcing) это концепция, в соответствии с которой состояние бизнес-сущностей, таких, как заказ или клиент, хранится в виде последовательности событий, изменяющих состояние. Это обеспечивает доступность информации вместо согласованности данных. В результате можно применять очень просто устроенные операции записи, но выполнение операций чтения при этом усложняется, так как в том случае, если такие операции распространяются на несколько сервисов, для их выполнения может потребоваться использование особой модели чтения данных.

Для организации коммуникаций в распределённой системе может использоваться брокер, но можно обойтись и без него. Системы, в которых нет брокера, широко известны. Самая известная из них это, пожалуй, система обмена данными, реализованная в рамках протокола HTTP. Системы, в которых используется брокер, как можно судить из их названия, обладают особой сущностью-брокером, находящейся между отправителем и получателем сообщения. Брокер разделяет отправителя и получателя, позволяя организовывать синхронный и асинхронный обмен данными. Это улучшает отказоустойчивость системы, так как потребитель сообщения не обязательно должен быть доступен в момент отправки сообщения. Коммуникации, основанные на использовании брокера, кроме того, позволяют независимо масштабировать системы, занимающиеся отправкой и получением сообщений. Вот материал, содержащий некоторые сведения по вопросу выбора технологии для организации синхронного и асинхронного обмена данными в распределённых системах.

Проект OrderShop: пример реализации распределённой системы электронной коммерции


OrderShop это своего рода Hello World из сферы микросервисных архитектур, простая реализация системы электронной коммерции, в которой используется подход, основанный на событиях. Этот демонстрационный проект использует простую модель домена, но данная модель удовлетворяет нуждам приложения. Здесь мы будем рассматривать вторую версию OrderShop.

Оркестровка OrderShop выполняется с использованием Docker Compose. Все сетевые коммуникации основаны на gRPC. Центральными компонентами системы являются Event store (хранилище событий) и Message queue (очередь событий). Абсолютно все сервисы подключены только к ним по gRPC. Код OrderShop написан на Python. Вот GitHub-репозиторий проекта. Обратите внимание на то, что этот проект не рассчитан на продакшн-использование. Это лишь учебный пример распределённой системы.

Работа с OrderShop v2


Сначала клонируйте GitHub-репозиторий проекта. После этого вы сможете выполнять различные действия. А именно:

  1. Для запуска приложения используйте команду docker-compose up.
  2. Если после запуска приложения открыть браузер и перейти по адресу http://localhost:5000/, можно будет наблюдать за событиями и анализировать состояние проекта.
  3. Запустить клиент можно командой python -m unittest tests/unit.py.
  4. Если открыть другую вкладку браузера и перейти по адресу http://localhost:8001/ можно будет, использовав redis:6379, подключиться к тестовой базе данных.
  5. Для остановки приложения воспользуйтесь командой docker-compose down.

Архитектура OrderShop v2


В нашем случае серверная архитектура проекта состоит из нескольких сервисов. Состояние приложения распределено по нескольким доменным сервисам, но хранится в едином хранилище событий. В компоненте Read model (модель чтения) сосредоточена логика чтения и кеширования данных.


Архитектура и потоки данных приложения OrderShop v2

Команды и запросы передаются через компонент Message queue. А передача событий осуществляется через компонент Event store, который, кроме прочего, играет роль шины событий.

Инфраструктурные сервисы


В OrderShop v2 все одноадресные передачи данных выполняются через компонент Message queue. Для реализации этого механизма используются списки Redis (Redis List), в частности два списка, объединённые в так называемую надёжную очередь (reliable queue). Система обрабатывает простые команды (например операции, затрагивающие единственную сущность) синхронно, а команды, на выполнение которых нужно длительное время (например операции пакетной обработки данных, операции, выполняемые при работе с почтой) асинхронно и, в исходном виде, умеет реагировать на синхронные сообщения.

Компонент Event store основан на потоках Redis (Redis stream). Доменные сервисы (в нашем случае это просто макеты реальных сервисов, используемые для демонстрации функционала OrderShop) подписаны на потоки событий, имена которых соответствуют именам тем событий (то есть именам сущностей) и отправляют события в эти потоки. Каждое событие это элемент потока, которому назначена временная метка события, играющая роль ID. Совокупность всех событий, отправленных в потоки, представляет собой состояние всей системы.

Сервисы приложения


Компонент Read model кеширует полученные из Event store сущности в Redis, используя модель домена. Несмотря на использование кеша это система, не хранящая состояние.

Компонент API gateway (шлюз API) тоже не хранит состояние и обслуживает REST-API на порте 5000. Он перехватывает HTTP-соединения и перенаправляет их либо к компоненту Read model для чтения данных из состояния приложения (для выполнения запросов), либо к выделенным доменным сервисам для записи данных в состояние (для выполнения команд). Это концептуальное разделение между операциями чтения и записи данных представляет собой паттерн, называемый Command Query Responsibility Segregation (CQRS). При применении этого паттерна код, изменяющий состояние приложения, отделяется от кода, читающего это состояние.

Доменные сервисы


Доменные сервисы получают команды на запись данных от компонента API gateway через компонент Message queue. После успешного выполнения команды они отправляют соответствующее событие в компонент Event store. А операции чтения, в свою очередь, обрабатываются компонентом Read model, который получает сведения о состоянии от компонента Event store.

CRM service (Customer Relation Management service, сервис системы управления взаимоотношениями с клиентами) это компонент, не хранящий состояние. Он подписан на доменные события из хранилища событий и отправляет клиентам электронные письма, используя Mail service (почтовый сервис).

Центральная доменная сущность называется Order (заказ). У неё есть поле, называемое status, указывающее на состояние заказа. Переходы между состояниями осуществляются с использованием конечного автомата. Это показано на следующем рисунке.


Состояния, в котором может пребывать заказ

Эти переходы выполняются в нескольких обработчиках событий, которые подписаны на доменные события (тут используется паттерн SAGA).

Клиенты


Работа клиентов имитируется с использованием фреймворка для проведения модульных тестов из Python. В настоящий момент реализовано 10 модульных тестов. Узнать подробности об этом можно, заглянув в файл tests/units.py.

Если обратиться к порту 5000 можно увидеть простой интерфейс, используемый для наблюдения за событиями и для просмотра состояния приложения (с использованием WebSockets).

Для взаимодействия с экземпляром Redis можно использовать контейнер RedisInsight, обратившись к нему по адресу http://localhost:8001/ и воспользовавшись redis:6379 для подключения к тестовой базе данных.


RedisInsight удобное средство для работы с базами данных Redis

Итоги


Redis это не только мощный инструмент для построения доменного слоя (например для хранения каталога товаров и для организации поиска по нему) и слоя приложения проектов (например для хранения HTTP-сессий). Redis может применяться и в инфраструктурном слое (например в хранилище событий и в очереди сообщений). Использование Redis в этих слоях проектов способствует снижению накладных расходов на разработку и поддержку приложений и позволяет программистам пользоваться технологиями, с которыми они уже знакомы.

Если вас заинтересовало то, о чём вы узнали из этого материала взгляните на код демонстрационного приложения и попробуйте реализовать что-то подобное сами. Надеюсь, это поможет вам ощутить универсальность и гибкость Redis при создании доменных и инфраструктурных сервисов, а так же покажет то, что Redis может оказаться полезным не только в деле кеширования данных.

Как вы используете Redis?

Подробнее..

Web Security by Bugbounty

06.11.2020 14:12:14 | Автор: admin

Александр Колесников (вирусный аналитик в международной компании) приглашает на мастер-класс Основы технологии, необходимые для понимания уязвимостеи. Классификация OWASP TOP 10, который пройдёт в рамках профессионального курса . А также Александр поделился статьёй для начинающих bug hunter-ов, где рассматривает TOP 10 Уязвимостей 2020 года, которые были найдены платформой HackerOne.

Идея заключается в следующем подготовить лабораторный стенд для изучения уязвимостей, находящихся в данном топе. О том, как запустить и найти пример приложения с уязвимостью, можно почитать вот здесь. Задания будут решены до момента использования уязвимости, которая заложена в приложение, последний шаг оставим за читателями.

TOP 10 уязвимостей by HackerOne

Информацию по топу уязвимостей можно найти здесь. Приведу ниже статистику по выплатам для каждой из уязвимостей:

Напрашивается сравнение с очень популярным списком OWASP TOP 10. Но надо учитывать, что последний раз обновление в OWASP TOP 10 вносились в 2017 году. На данный момент ведется сбор информации для обновления списка. Итак, проведем сравнительный анализ данных рейтингов:

В списке OWASP представлены множества уязвимостей, тогда как в списке HackerOne содержатся конкретные уязвимости. Объединим все уязвимости из списка HackerOne в типы:

  • Injection

  • Broken Authentication

  • Sensitive Data Exposure

  • Security Misconfiguration

Получается, что для создания подходящей лаборатории для исследований достаточно найти 3 уязвимых приложения.

Injection

Injection метод эксплуатации уязвимостей, позволяющий добавить в обрабатываемые приложением данные свои команды для исполнения. Это может привести к различным последствиям от получения доступа ко всем ресурсам приложения до выполнения кода на стороне сервера. Тестовое приложение текущего раздела будет содержать уязвимость, которая позволяет инжектить собственный python код в код приложения.

В качестве тестового рассмотрим приложение, которое было использовано на соревновании Real World CTF. Приложение предоставляет пользователю информацию о книгах.

Попробуем выяснить, какая структура у данного приложения. Отправим любой HTTP запрос с помощью BurpSuite Community, получим следующее:

Форма логина и страница admin представляет собой одну и ту же страницу Login.

Стандартные тесты на типичные уязвимости для этой формы не принесли результата, также не подошли стандартные пароли. Вероятно, это не является вектором для атаки. Обратимся к подсказке, которую оставили создатели уязвимого приложения. Подсказка заключается в исходном коде приложения. Скачать его можно по адресу source.zip. Таким образом, у нас есть полностью работоспособное приложение, которое можно использовать для локального исследования и поиска уязвимостей/недочетов, которые допущены при его написании.

Нас интересуют только файлы, которые содержат код:

После анализа исходного кода становится ясно, что приложение использует фреймворк для создания веб-приложений Flask. Известно, что этот фреймворк может запускать приложение в 2х режимах - debug и release. В debug версии стандартный порт - 5000. Проверим, используется ли этот порт на сервере:

Похоже, что запущен еще один вариант приложения. Возможно, это может быть полезно для проверки версий атак на приложение. Поищем уязвимые части исходного кода.

Файл, который представляет наибольший интерес:

.\views\user.py

Уязвимость находится в отрезке кода, который представлен выше на скриншоте. Проблема данного кода заключается в том, что декоратор @login_required указан в неверной последовательности, из-за чего его использование бессмысленно. Любой пользователь приложения может использовать код, который вызывается через обращение к /admin/system/change_name/. Так же есть кусок кода, который так же может быть интересен:

В исходнике используется часть Lua кода для работы с Redis. Данные инициализируются прямо из токена, который отправляет на сервер пользователь. Атака на приложение может быть проведена через Redis. Данные из токена в дальнейшем будут отправлены на обработку модулю python pickle. Это можно использовать для инжекта кода.

Security Misconfiguration

Уязвимость заключается в том, что в технологическом стеке приложения может быть использована неверная конфигурация или может содержаться общеизвестная уязвимость, для которой не был применен патч. Security misconfiguration может быть использована для получения чувствительной информации из веб приложения конфиги, исходники и т.д.

Снова будем использовать задание из соревнования Real World CTF. Приложение представляет собой простой загрузчик файлов и может быть использовано для загрузки картинок из любого источника:

Проверим интерфейс на возможность чтения локальных файлов, к примеру, file:///etc/passwd. Этот прием нам поможет выяснить есть ли уязвимость из заголовка в этом задании. Результат можно увидеть ниже:

Задание подразумевает возможность заставить сервер выполнять команды, которые мы ему сообщаем, благодаря добытой с помощью эксплуатации уязвимости информации. Поэтому следующим шагом нам будет полезно узнать, как именно приложение запущено на сервере. С помощью команды file:///proc/self/cmdline можно получить строку запуска приложения:

Приложение работает с использованием uwsgi-сервера и может использовать соответствующий протокол для трансляции данных от сервера к приложению и наоборот. Сервер работает через сокет 8000. Рабочая директория приложения - /usr/src/rwctf. Для завершения атаки достаточно научиться создавать команды для uwsgi, с помощью которых можно будет управлять сервером.

Broken Authentication, Sensitive Data Exposure

Broken Authentication - уязвимость, которая может быть использована для доступа к ресурсам приложения в обход системы разграничения доступа. Sensitive Data Exposure - уязвимость, которая может быть использована для доступа к чувствительным данным приложения. Это могут быть конфигурации приложения, логины и пароли от сторонних сервисов и т.д.

Следующее приложение, которое будет изучено, собрало максимальное количество уязвимостей из искомого списка. Данное Приложение было использовано на соревновании 35с3 CTF. Оно предоставляет минимальный интерфейс для регистрации пользователей и получения доступа к закрытой части. Это выглядит следующим образом:

Поля ввода не принимают ничего, кроме корректной последовательности данных. Попробуем проверить, насколько безошибочно настроен сервер для обработки данных. Запустим инструмент dirbuster, чтобы найти список директорий, которые относятся к приложению. В итоге была найдена директория uploads, которая возвращает HTTP код 403. Модифицируем путь c помощью специальных символов файловой системы. Для этого просто добавим символ перехода в директорию на уровень выше: /uploads../:

Похоже, что мы одновременно нашли все уязвимости. Нам не нужно вводить логин и пароль для доступа к данным приложения. Мы имеем полный доступ к исходному приложения. Теперь необходимо найти способ выполнить свои команды на сервере. Здесь можно использовать подход из первого задания.

Вместо вывода

Содержимое данной статьи может быть использовано в качестве лабораторной работы для тех, кто только начинает путь поиска уязвимостей в веб-приложениях. С их помощью можно получить базовые практические навыки тестирования веб-приложений и приобрести первый самостоятельный опыт в данной сфере.

Disclamer: Все использованные приложения принадлежат их авторам.


Информацию по топу уязвимостей можно найти здесь. Интересно развиваться в данном направлении? Узнайте больше о профессиональной программе Безопасность веб-приложений, приходите на День Открытых Дверей и запишитесь на бесплатный демо-урок Основы технологии, необходимые для понимания уязвимостеи. Классификация OWASP TOP 10.

Подробнее..

Демократия в Telegram-группах

07.05.2021 14:19:42 | Автор: admin

Многие из нас проводят время в профильных телеграм-группах. Власть над общением здесь принадлежит случайным людям со своими недостатками. Нередко встречаются конфликты и злоупотребления. Это побуждает задуматься, а можно ли поддерживать порядок иначе, так, чтобы не расцветал мошеннический спам и одновременно ни у кого не было абсолютной власти над адекватным собеседником?
И в моем случае эти размышления вылились в разработанную и протестированную систему которую уже сегодня можно подключить в ваш Telegram.

Прежде всего, нужно определить что является ключевым аспектом, валютой системы. Не карма и не баллы, не лайки и не рейтинг. Им стало само время. Что из этого следует?

  • У каждого пользователя копится время с момента написания первого сообщения.
    Это аналог безусловного базового дохода. Минута за минутой, час за часом. Он необходим чтобы эмиссия системной валюты происходила без злоупотреблений, независимо от каких-либо центров управления.

  • Абсолютно каждый может использовать часть накопленного времени, чтобы забанить другого пользователя в чате на это время.
    Это электронный аналог второй поправки, необходимый для того чтобы власть не сосредоточилась в группе в одних руках.

  • Каждый также может использовать накопленное время чтобы разбанить другого пользователя.
    Обратимость бана защищает от злоупотреблений свободного оборота банхаммеров. Одновременно эта функция является обратным к эмисии процессом. Время потраченное на выкуп из бана сгорает безвозвратно.

  • Накопленное время можно передать другим людям, полностью или по частям.
    Эта функция в перспективе обеспечивает подключенным группам свободный рынок обмена чем угодно.

Итак, как это работает в теории разобрались, а что необходимо для практического функционирования такой системы?

  • Простой способ подключения чата к системе.
    - Реализована в виде бота

  • Максимально удобный для Telegram команд UX
    - Есть распознавание речи основанное на нормализованных семантических представлениях

Где примеры использования?

Бот понимает в запросах и русский и английский язык в свободной форме. Используются сокращения: d - дни, h - часы, m - минуты, s - секунды. Все уведомления публичны, но исчезают через 15 секунд, чтобы не засорять общий чат.

Вот как можно посмотреть сколько времени накопилось. Вот как можно посмотреть сколько времени накопилось. Забанить токсичного человека, спамера или мошенника можно указав в ответе в ему, в свободной форме, на сколько времени он получает бан (и за что по желанию). Забанить токсичного человека, спамера или мошенника можно указав в ответе в ему, в свободной форме, на сколько времени он получает бан (и за что по желанию). Если это было недопонимание и человек все же хороший, можно таким же образом, просто написать в ответе что его нужно разблокироватьЕсли это было недопонимание и человек все же хороший, можно таким же образом, просто написать в ответе что его нужно разблокироватьЧтобы перевести время другому пользователю указажите в ответе сколько времени нужно передать.Чтобы перевести время другому пользователю указажите в ответе сколько времени нужно передать.

Довольно слов, покажите мне код!

Для бекенда использовался язык Kotlin + JVM, в качестве базы данных используется Redis-кластер. Весь код продокументирован и доступен на GitHub: demidko/timecobot
Чтобы начать использовать бота в вашей телеграм-группе просто добавьте его с правами администратора: @timecbobot

Напишите пожалуйста свое мнение в комментариях, нужное ли это для сообщества дело и стоит ли развивать систему дальше? Что можно улучшить или поменять? Интересна ли подробная статья с кодом на тему как это реализовано внутри?

Всем удачного дня!

Подробнее..

О переезде с Redis на Redis-cluster

18.08.2020 14:15:47 | Автор: admin


Приходя в продукт, который развивается больше десятка лет, совершенно не удивительно встретить в нем устаревшие технологии. Но что если через полгода вы должны держать нагрузку в 10 раз выше, а цена падений увеличится в сотни раз? В этом случае вам необходим крутой Highload Engineer. Но за неимением горничной такового, решать проблему доверили мне. В первой части статьи я расскажу, как мы переезжали с Redis на Redis-cluster, а во второй части дам советы, как начать пользоваться кластером и на что обратить внимание при эксплуатации.


Выбор технологии


Так ли плох отдельный Redis (standalone redis) в конфигурации 1 мастер и N слейвов? Почему я называю его устаревшей технологией?


Нет, Redis не так плох Однако, есть некоторые недочеты которые нельзя игнорировать.

  • Во-первых, Redis не поддерживает механизмы аварийного восстановления после падения мастера. Для решения этой проблемы мы использовали конфигурацию с автоматическим перекидыванием VIP-ов на новый мастер, сменой роли одного из слейвов и переключением остальных. Этот механизм работал, но назвать его надежным решением было нельзя. Во первых, случались ложные срабатывания, а во вторых, он был одноразовым, и после срабатывания требовались ручные действия по взводу пружины.


  • Во-вторых, наличие только одного мастера приводило к проблеме шардирования. Приходилось создавать несколько независимых кластеров 1 мастер и N слейвов, затем вручную разносить базы по этим машинам и надеяться, что завтра одна из баз не распухнет настолько, что её придется выносить на отдельный инстанс.



Какие есть варианты?


  • Самое дорогое и богатое решение Redis-Enterprise. Это коробочное решение с полной технической поддержкой. Несмотря на то, что оно выглядит идеальным с технической точки зрения, нам не подошло по идеологическим соображениям.
  • Redis-cluster. Из коробки есть поддержка аварийного переключения мастера и шардирования. Интерфейс почти не отличается от обычной версии. Выглядит многообещающе, про подводные камни поговорим далее.
  • Tarantool, Memcache, Aerospike и другие. Все эти инструменты делают примерно то же самое. Но у каждого есть свои недостатки. Мы решили не класть все яйца в одну корзину. Memcache и Tarantool мы используем для других задач, и, забегая вперед, скажу, что на нашей практике проблем с ними было больше.

Специфика использования


Давайте взглянем, какие задачи мы исторически решали Redisом и какую функциональность использовали:


  • Кеш перед запросами к удаленным сервисам вроде 2GIS | Golang
    GET SET MGET MSET "SELECT DB"
  • Кеш перед MYSQL | PHP
    GET SET MGET MSET SCAN "KEY BY PATTERN" "SELECT DB"
  • Основное хранилище для сервиса работы с сессиями и координатами водителей | Golang
    GET SET MGET MSET "SELECT DB" "ADD GEO KEY" "GET GEO KEY" SCAN

Как видите, никакой высшей математики. В чём же тогда сложность? Давайте разберем отдельно по каждому методу.


Метод Описание Особенности Redis-cluster Решение
GET SET Записать/прочитать ключ
MGET MSET Записать/прочитать несколько ключей Ключи будут лежать на разных нодах. Готовые библиотеки умеют делать Multi-операции только в рамках одной ноды Заменить MGET на pipeline из N GET операций
SELECT DB Выбрать базу, с которой будем работать Не поддерживает несколько баз данных Складывать всё в одну базу. Добавить к ключам префиксы
SCAN Пройти по всем ключам в базе Поскольку у нас одна база, проходить по всем ключам в кластере слишком затратно Поддерживать инвариант внутри одного ключа и делать HSCAN по этому ключу. Или отказаться совсем
GEO Операции работы с геоключом Геоключ не шардируется
KEY BY PATTERN Поиск ключа по паттерну Поскольку у нас одна база, будем искать по всем ключам в кластере. Слишком затратно Отказаться или поддерживать инвариант, как и в случае со SCAN-ом

Redis vs Redis-cluster


Что мы теряем и что получаем при переходе на кластер?


  • Недостатки: теряем функциональность нескольких баз.
    • Если мы хотим хранить в одном кластере логически не связанные данные, придется делать костыли в виде префиксов.
    • Теряем все операции по базе, такие как SCAN, DBSIZE, CLEAR DB и т.п.
    • Multi-операции стали значительно сложнее в реализации, потому что может требоваться обращение к нескольким нодам.
  • Достоинства:
    • Отказоустойчивость в виде аварийного переключения мастера.
    • Шардирования на стороне Redis.
    • Перенос данных между нодами атомарно и без простоев.
    • Добавление и перераспределение мощностей и нагрузок без простоев.

Я бы сделал вывод, что если вам не нужно обеспечивать высокий уровень отказоустойчивости, то переезд на кластер того не стоит, т.к это может быть нетривиальной задачей. Но если изначально выбирать между отдельной версией и кластерной, то стоит выбирать кластер, так как он ничем не хуже и в дополнение снимет с вас часть головной боли


Подготовка к переезду


Начнем с требований к переезду:


  • Он должен быть бесшовным. Полная остановка сервиса на 5 минут нас не устраивает.
  • Он должен быть максимально безопасным и постепенным. Хочется иметь какой-то контроль над ситуацией. Бухнуть сразу всё и молиться над кнопкой отката мы не желаем.
  • Минимальные потери данных при переезде. Мы понимаем, что переехать атомарно будет очень сложно, поэтому допускаем некоторую рассинхронизацию между данными в обычном и кластерном Redis.

Обслуживание кластера


Перед самым переездом следует задуматься о том, а можем ли мы поддерживать кластер:


  • Графики. Мы используем Prometheus и Grafana для графиков загрузки процессоров, занятой памяти, количества клиентов, количества операций GET, SET, AUTH и т.п.
  • Экспертиза. Представьте, что завтра под вашей ответственностью будет огромный кластер. Если он сломается, никто, кроме вас, починить его не сможет. Если он начнет тормозить все побегут к вам. Если нужно добавить ресурсы или перераспределить нагрузку снова к вам. Чтобы не поседеть в 25, желательно предусмотреть эти случаи и проверить заранее, как технология поведет себя при тех или иных действиях. Поговорим об этом подробнее в разделе Экспертиза.
  • Мониторинги и оповещения. Когда кластер ломается, об этом хочется узнать первым. Тут мы ограничились оповещением о том, что все ноды возвращают одинаковую информацию о состоянии кластера (да, бывает и по-другому). А остальные проблемы быстрее заметить по оповещениям сервисов-клиентов Redis.

Переезд


Как будем переезжать:


  • В первую очередь, нужно подготовить библиотеку для работы с кластером. В качестве основы для версии на Gо мы взяли go-redis и немного изменили под себя. Реализовали Multi-методы через pipeline-ы, а также немного поправили правила повторения запросов. С версией для PHP возникло больше проблем, но в конечном счете мы остановились на php-redis. Недавно они внедрили поддержку кластера, и на наш взгляд она выглядит хорошо.
  • Далее нужно развернуть сам кластер. Делается это буквально в две команды на основе конфигурационного файла. Подробнее настройку обсудим ниже.
  • Для постепенного переезда мы используем dry-mode. Так как у нас есть две версии библиотеки с одинаковым интерфейсом (одна для обычной версии, другая для кластера), ничего не стоит сделать обертку, которая будет работать с отдельной версией и параллельно дублировать все запросы в кластер, сравнивать ответы и писать расхождения в логи (в нашем случае в NewRelic). Таким образом, даже если при выкатке кластерная версия сломается, наш production это не затронет.
  • Выкатив кластер в dry-режиме, мы можем спокойно смотреть на график расхождений ответов. Если доля ошибок медленно, но верно движется к некоторой небольшой константе, значит, всё хорошо. Почему расхождения всё равно есть? Потому что запись в отдельной версии происходит несколько раньше, чем в кластере, и за счет микролага данные могут расходиться. Осталось только посмотреть на логи расхождений, и если все они объяснимы неатомарностью записи, то можно идти дальше.
  • Теперь можно переключить dry-mode в обратную сторону. Писать и читать будем из кластера, а дублировать в отдельную версию. Зачем? В течение следующей недели хочется понаблюдать за работой кластера. Если вдруг выяснится, что в пике нагрузки есть проблемы, или мы что-то не учли, у нас всегда есть аварийный откат на старый код и актуальные данные в благодаря dry-mode.
  • Осталось отключить dry-mode и демонтировать отдельную версию.

Экспертиза


Сначала кратко об устройстве кластера.


В первую очередь, Redis key-value хранилище. В качестве ключа используются произвольные строки. В качестве значений могут использоваться числа, строки и целые структуры. Последних великое множество, но для понимания общего устройства нам это не важно.
Следующий после ключей уровень абстракции слоты (SLOTS). Каждый ключ принадлежит одному из 16 383 слотов. Внутри каждого слота может быть сколько-угодно ключей. Таким образом, все ключи распадаются на 16 383 непересекающихся множеств.


Далее, в кластере должно быть N мастер-нод. Каждую ноду можно представлять как отдельный инстанс Redis, который знает всё о других нодах внутри кластера. Каждая мастер-нода содержит некоторое количество слотов. Каждый слот принадлежит только одной мастер-ноде. Все слоты нужно распределить между нодами. Если какие-то слоты не распределены, то хранящиеся в них ключи будут недоступны. Каждую мастер-ноду имеет смысл запускать на отдельной логической или физической машине. Также стоит помнить, что каждая нода работает только на одном ядре, и если вы хотите запустить на одной логической машине несколько экземпляров Redis, то убедитесь, что они будут работать на разных ядрах (мы не пробовали так делать, но в теории все должно работать). По сути, мастер-ноды обеспечивают обычное шардирование, и большее количество мастер-нод позволяет масштабировать запросы на запись и чтение.


После того, как все ключи распределены по слотам, а слоты раскиданы по мастер-нодам, к каждой мастер-ноде можно добавить произвольное количество слейв-нод. Внутри каждой такой связки мастер-слейв будет работать обычная репликация. Cлейвы нужны для масштабирования запросов на чтение и для аварийного переключения в случае выхода из строя мастера.


Теперь поговорим об операциях, которые лучше бы уметь делать.


Обращаться к системе мы будем через Redis-CLI. Поскольку у Redis нет единой точки входа, выполнять следующие операции можно на любой из нод. В каждом пункте отдельно обращаю внимание на возможность выполнения операции под нагрузкой.


  • Первое и самое главное, что нам понадобится: операция cluster nodes. Она возвращает состояние кластера, показывает список нод, их роли, распределение слотов и т.п. Дополнительные сведения можно получить с помощью cluster info и cluster slots.
  • Хорошо бы уметь добавлять и удалять ноды. Для этого есть операции cluster meet и cluster forget. Обратите внимание, что cluster forget необходимо применить к КАЖДОЙ ноде, как к мастерам, так и к репликам. А cluster meet достаточно вызвать лишь на одной ноде. Такое различие может обескураживать, так что лучше узнать о нем до того, как запустили кластер в эксплуатацию. Добавление ноды безопасно выполняется в бою и никак не затрагивает работу кластера (что логично). Если же вы собираетесь удалить ноду из кластера, то следует убедиться, что на ней не осталось слотов (иначе вы рискуете потерять доступ ко всем ключам на этой ноде). Также не удаляйте мастер, у которого есть слейвы, иначе будет выполняться ненужное голосование за нового мастера. Если на нодах уже нет слотов, то это небольшая проблема, но зачем нам лишние выборе, если можно сначала удалить слейвы.
  • Если нужно насильно поменять местами мастер и слейв, то подойдет команда cluster failover. Вызывая её в бою, нужно понимать, что в течение выполнения операции мастер будет недоступен. Обычно переключение происходит менее, чем за секунду, но не атомарно. Можете рассчитывать, что часть запросов к мастеру в это время завершится с ошибкой.
  • Перед удалением ноды из кластера на ней не должно оставаться слотов. Перераспределять их лучше с помощью команды cluster reshard. Слоты будут перенесены с одного мастера, на другой. Вся операция может занимать несколько минут, это зависит от объема переносимых данных, однако процесс переноса безопасный и на работе кластера никак не сказывается. Таким образом, все данные можно перенести с одной ноды на другую прямо под нагрузкой, и не беспокоиться об их доступности. Однако есть и тонкости. Во-первых, перенос данных сопряжен с определенной нагрузкой на ноду получателя и отправителя. Если нода получателя уже сильно загружена по процессору, то не стоит нагружать её ещё и приемом новых данных. Во-вторых, как только на мастере-отправителе не останется ни одного слота, все его слейвы тут же перейдут к мастеру, на который эти слоты были перенесены. И проблема в том, что все эти слейвы разом захотят синхронизировать данные. И вам еще повезет, если это будет частичная, а не полная синхронизация. Учитывайте это, и сочетайте операции переноса слотов и отключения/переноса слейвов. Или же надейтесь, что у вас достаточный запас прочности.
  • Что делать, если при переносе вы обнаружили, что куда-то потеряли слоты? Надеюсь, эта проблема вас не коснется, но если что, есть операция cluster fix. Она худо-бедно раскидает слоты по нодам в случайном порядке. Рекомендую проверить её работу, предварительно удалив из кластера ноду с распределенными слотами. Поскольку данные в нераспределенных слотам и так недоступны, беспокоиться о проблемах с доступностью этих слотов уже поздно. В свою очередь на распределенные слоты операция не повлияет.
  • Еще одна полезная операция monitor. Она позволяет в реальном времени видеть весь список запросов, идущих на ноду. Более того, по ней можно сделать grep и узнать, есть ли нужный трафик.

Также стоит упомянуть о процедуре аварийного переключения мастера. Если коротко, то она есть, и, на мой взгляд, прекрасно работает. Однако не стоит думать, что если выдернуть шнур из розетки на машине с мастер-нодой, Redis тут же переключится и клиенты не заметят потери. На моей практике переключение происходит несколько секунд. В течение этого времени часть данных будет недоступна: обнаруживается недоступность мастера, ноды голосуют за нового, слейвы переключаются, данные синхронизируются. Лучший способ убедиться самостоятельно в том, что схема рабочая, это провести локальные учения. Поднимите кластер на своем ноутбуке, дайте минимальную нагрузку, сымитируйте падение (например, заблокировав порты), оцените скорость переключения. На мой взгляд, только поиграв таким образом день-два, можно быть уверенным в работе технологии. Ну, или понадеяться, что софт, которым пользуется половина интернета, наверняка работает.


Конфигурация


Зачастую, конфигурация первое, что нужно для начала работы с инструментом., А когда всё заработало, конфиг и трогать не хочется. Необходимы определенные усилия, чтобы заставить себя вернуться к настройкам и тщательно их прошерстить. На моей памяти у нас было как минимум два серьезных факапа из-за невнимательности к конфигурации. Обратите особое внимание на следующие пункты:


  • timeout 0
    Время, через которое закрываются неактивные соединения (в секундах). 0 не закрываются
    Не каждая наша библиотека умела корректно закрывать соединения. Отключив эту настройку, мы рискуем упереться в лимит по количеству клиентов. С другой стороны, если такая проблема есть, то автоматический разрыв потерянных соединений замаскирует её, и мы можем не заметить. Кроме того, не стоит включать эту настройку при использовании persist-соединений.
  • Save x y & appendonly yes
    Сохранение RDB-снепшота.
    Проблемы RDB/AOF мы подробно обсудим ниже.
  • stop-writes-on-bgsave-error no & slave-serve-stale-data yes
    Если включено, то при поломке RDB-снепшота мастер перестанет принимать запросы на изменение. Если соединение с мастером потеряно, то слейв может продолжать отвечать на запросы (yes). Или прекратит отвечать (no)
    Нас не устраивает ситуация, при которой Redis превращается в тыкву.
  • repl-ping-slave-period 5
    Через этот промежуток времени мы начнем беспокоиться о том, что мастер сломался и пора бы провести процедуру failovera.
    Придется вручную находить баланс между ложными срабатываниями и запуском failoverа. На нашей практике это 5 секунд.
  • repl-backlog-size 1024mb & epl-backlog-ttl 0
    Ровно столько данных мы можем хранить в буфере для отвалившейся реплики. Если буфер кончится, то придется полностью синхронизироваться.
    Практика подсказывает, что лучше поставить значение побольше. Причин, по которым реплика может начать отставать, предостаточно. Если она отстает, то, скорей всего, ваш мастер уже с трудом справляется, а полная синхронизация станет последней каплей.
  • maxclients 10000
    Максимальное количество единовременных клиентов.
    По нашему опыту, лучше поставить значение побольше. Redis прекрасно справляется с 10 тыс. соединений. Только убедитесь, что в системе достаточно сокетов.
  • maxmemory-policy volatile-ttl
    Правило, по которому удаляются ключи при достижения лимита доступной памяти.
    Тут важно не само правило, а понимание, как это будет происходить. Redis можно похвалить за умение штатно работать при достижении лимита памяти.

Проблемы RDB и AOF


Хотя сам Redis хранит всю информацию в оперативной памяти, также есть механизм сохранения данных на диск. А точнее, три механизма:


  • RDB-snapshot полный слепок всех данных. Устанавливается с помощью конфигурации SAVE X Y и читается как Сохранять полный снепшот всех данных каждые X секунд, если изменилось хотя бы Y ключей.
  • Append-only file список операций в порядке их выполнения. Добавляет новые пришедшие операции в файл каждые Х секунд или каждые Y операций.
  • RDB and AOF комбинация двух предыдущих.

У всех методов есть свои достоинства и недостатки, я не буду перечислять их все, лишь обращу внимания на не очевидные, на мой взгляд, моменты.


Во-первых, для сохранения RDB-снепшота требуется вызывать FORK. Если данных много, это может повесить весь Redis на период от нескольких миллисекунд до секунды. Кроме того, системе требуется выделить память под такой снепшот, что приводит к необходимости держать на логической машине двойной запас по оперативной памяти: если для Redis выделено 8 Гб, то на виртуалке с ним должно быть доступно 16.


Во-вторых, есть проблемы с частичной синхронизацией. В режиме AOF при переподключении слейва вместо частичной синхронизации может выполняться полная. Почему так происходит, я не смог понять. Но помнить об этом стоит.


Эти два пункта уже заставляют задуматься о том, а так ли нам нужны эти данные на диске, если и так всё дублируется слейвами. Потерять данные можно только при выходе из строя всех слейвов, а это проблема уровня пожар в ДЦ. В качестве компромисса можно предложить сохранять данные только на слейвах, но в этом случае нужно убедиться, что эти слейвы никогда не станут мастером при аварийном восстановлении (для этого есть настройка приоритета слейвов в их конфиге). Для себя мы в каждом конкретном случае думаем над тем, надо ли сохранять данные на диск, и чаще всего отвечаем нет.


Заключение


В заключении буду надеяться, что смог дать общее представление о работе redis-cluster-а тем, кто совсем о нем не слышал, а также обратил внимание на какие-то неочевидные моменты для тех, кто уже давно им пользуется.
Спасибо за уделенное время и, как обычно, комментарии по теме приветствуются.

Подробнее..

Из песочницы Serverless и полтора программиста

05.08.2020 18:06:10 | Автор: admin

В повседневной продуктовой разработке, запертой в корпоративных технологических ограничениях, редко выпадает случай шагнуть за грань добра и зла в самое пекло хипстерских технологий. Но, когда ты сам несешь все риски и каждый день разработки вынимает деньги из твоего собственного кармана очень хочется срезать путь. В один из таких моментов я решил шагнуть в такой темный серверлес, о котором раньше думать было как-то стыдно. Под впечатлением от того, что получилось, я даже хотел написать статью Конец гегемонии программистов, но спустя полгода эксплуатации и развития проекта понял, что ну не совсем еще конец и остались еще места в этом самом serverless-бэкенде, где надо использовать знания и опыт.


Архитектура


Первое что сделал, вычеркнул из списка ограничений страх вендорлока. Надо еще дожить до масштаба, чтобы это стало проблемой.


Второе решение было не хочу ничего менеджить собственными силами, цена devopsов сейчас слишком высока, одной зарплатой можно оплатить год managed-решений в облаках.


Третье, пожалуй самое безрассудное и на грани слабоумия и отваги, рискнуть пойти на новорожденное решение от MongoDB, которое в тот момент называлось Stitch, а сейчас называется Realm (но это не совсем тот самый Realm, а ядреная смесь из Stitch и Realm, которая получилась после приобретения последнего MongoDB, Inc в конце 2019 года)


Backend


В результате серверная сторона вышла вот такой:



Node и Redis в ней появились только для реализации Server Side Rendering и кэширования (ну и для того, чтобы не кормить Atlas лишними платными запросами), для тех кто не решает задачи CEO-оптимизации их можно легко выкинуть.


В результате бэк масштабируется в пару кликов до практически любого размера. И самое главное стоит копейки, за счет того, что платный computed-runtime по большей части потребляется один раз на измененные данные и сохраняется в кеше.


Frontend


Клиентская часть классическая: React + Redux + Redux-Saga + TypeScript



Здесь не стал экспериментировать, потому что клиент планировался сложный и довольно толстый, чтобы его можно было развивать и поддерживать нужно было что-то более-менее вменяемое. Ну и еще один ключевой момент, эти технологии знают почти все, поэтому легче найти разработчиков.


Аутентификация и авторизация


Ну а теперь самое интересно, для чего вообще нужно было все это мракобесие с Mongo.Realm. Вместе с лямбдами мы получаем полноценную интеграцию из коробки с ворохом способов аутентификации (Google, Apple, Facebook, Email/Password и прочими) и механизмом авторизации операций до уровня полей в коллекциях:



Кроме этого решение о доступе можно принимать по результатам выполнения функции и таким образом вполне реализуем полноценный механизм цепочки голосующих (пользователь, роль, группа, замещения, суперпользователь и т.д.).


Прочие радости


Также облачная монга нам из коробки дает sync наборов данных с клиентом, push-уведомления, хранилище секретов, триггеры (на события базы, аутентификация, работу по расписанию), вебхуки, хорошие логи и еще много чего. То есть снаружи оно обслуживается и воспринимается как полноценный сервис, при этом не приходится решать вопросы масштабирования и переноса между датацентрами, бекапов, мониторинга и кучи других задач.


Ну, а для самых безбашенных, есть возможность работы через GraphQL.


Как все это работает



В случае если попали в кэш, при 100 RPS на один экземпляр (в конфигурации по одному ядру и одному гигабайту на один экземпляр Node.js под управлением PM2), время ответа укладывается в 200 мс, в противном случае вместе со всеми запросами к Mongo серверный рендер отрабатывает до 500 мс.


В работе с Mongo.Realm есть нюансы, которые никак не отражены в документации, но проявляются во всех недорогих инстансах с разделяемой памятью (M1, M2, M5): если запросы выполняются от имени клиента, то, видимо в качестве защиты от перегрузок, периодически время ответа на какой-нибудь aggregation-pipeline может резко вырасти до 5-10 секунд на запрос. При этом, если вызывается серверная функция (с тем же самым aggregation-pipeline), которая выполняется от имени системного пользователя, то таких трюков не наблюдается.
Возможно дело именно в типе кластера, и со временем это исправят или все решится переходом на М10 и выше, но сейчас для некоторых сложных запросов пришлось пойти на рискованный шаг и для чтения данных анонимными пользователями сделать несколько функций исполняемых от имени системного пользователя, в этом случае правила авторизации для доступа к данным игнорируются, и за безопасностью надо следить уже самим в коде.



В случае аутентифицированного доступа Server Side Rendering не нужен, все работает прямо на клиенте.


Выводы


В заголовок вынесен ресурс потраченный на разработку, а именно полтора программиста (1 фронтендер и бэкендера). Ровно столько и 5 месяцев работы, понадобилось для вывода в прод довольно развесистого портала с собственной системой управления контентом, интеграцией с несколькими поставщиками данных включая нечеткие сопоставления, оптимизированного под самые суровые требования по SEO и c поддержкой мобильный браузеров как first class citizen.


Весь мой предыдущий, более чем 16-летний опыт управления разработкой говорил, что ресурсоемкость этого проекта будет раза в 4 выше.


Дальнейшая эксплуатация и развитие безусловно покажут насколько опрометчивы были эти решения, но на текущий момент сэкономлено примерно 1,5 миллиона и все работает как надо.

Подробнее..

Реактивный масштабируемый чат на Kotlin Spring WebSockets

13.04.2021 18:11:50 | Автор: admin

Содержание

  1. Конфигурация проекта

    1. Логгер

    2. Домен

    3. Маппер

  2. Настройка Spring Security

  3. Конфигурация веб-сокетов

  4. Архитектура решения

  5. Реализация

    1. Интеграция с Redis

    2. Импелементация сервиса

  6. Заключение

Предисловие

В данном туториале будет рассмотрено создание масштабируемого приложения, подключение и общение с котором происходит по веб-сокетам. Рассмотрим и мужественно преодолеем проблему передачи сообщений между инстансами с помощью месседж брокера. В качестве месседж брокера будет использован Redis.

Конфигурация проекта

Начнём с самого важного, конфигурации логгера!

Конфигурация логгера состоит из того, что нам нужно создать prototype bean, который будет конфигурировать логгер для класса, в которой этот бин инжектится.

@Configurationclass LoggingConfig {    @Bean    @Scope("prototype")    fun logger(injectionPoint: InjectionPoint): Logger {        return LoggerFactory.getLogger(                injectionPoint.methodParameter?.containingClass                        ?: injectionPoint.field?.declaringClass        )    }}

Теперь, чтобы получить сконфигурированный логгер, нам достаточно внедрить его.

@Componentclass ChatWebSocketHandlerService(    private val logger: Logger) 

Далее создадим доменку и сконфигурируем маппер для неё

Класс чата содержит базовую информацию, включая участников чата.

data class Chat(    val chatId: UUID,    val chatMembers: List<ChatMember>,    @JsonSerialize(using = LocalDateTimeSerializer::class)    @JsonDeserialize(using = LocalDateTimeDeserializer::class)    val createdDate: LocalDateTime,    var lastMessage: CommonMessage?)

Класс ChatMember описывает участника чата. Из интересного тут - это флаг deletedChat. Его назначение - убрать чат из выборки списка чатов для пользователя с userId.

data class ChatMember(        val userId: UUID,        var fullName: String,        var avatar: String,        var deletedChat: Boolean)

Ниже представлен базовый класс для всех сообщений в чате. Аннотация @JsonTypeInfo тут нужна для того, чтобы классам-наследникам при заворачивании в JSON проставлялось поле @type с указанием типа сообщения, а при разворачивании были проставлены поля базового класса.

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY)open class CommonMessage(    val messageId: UUID,    val chatId: UUID,    val sender: ChatMember,    @field:JsonSerialize(using = LocalDateTimeSerializer::class) @field:JsonDeserialize(using = LocalDateTimeDeserializer::class)    val messageDate: LocalDateTime,    var seen: Boolean)

Пример конкретного класса сообщения TextMessage - текстового сообщения

class TextMessage(    messageId: UUID,    chatId: UUID,    sender: ChatMember,    var content: String,    messageDate: LocalDateTime,    seen: Boolean) : CommonMessage(messageId, chatId, sender, messageDate, messageType, seen)

Сконфигурируем ObjectMapper

В registerSubtypes добавляются классы-наследники, которые будут сериализоваться и десериализоваться в JSON. Это необходимо для того, чтобы после десериализации определить тип конкретный тип и передать на обработку в нужный метод

@Configurationclass ObjectMapperConfig {    @Bean    fun objectMapper(): ObjectMapper = ObjectMapper()        .registerModule(JavaTimeModule())        .registerModule(Jdk8Module())        .registerModule(ParameterNamesModule())        .registerModule(KotlinModule())        .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)        .apply {            registerSubtypes(                NamedType(NewMessageEvent::class.java, "NewMessageEvent"),                NamedType(MarkMessageAsRead::class.java, "MarkMessageAsRead"),                NamedType(TextMessage::class.java, "TextMessage"),                NamedType(ImageMessage::class.java, "ImageMessage")            )        }}

Конфигурация Spring Security

Для начала нам понадобится ReactiveAuthenticationManager и SecurityContextRepository. Для аутентификации будем использовать JWT, поэтому создаем класс JwtAuthenticationManager со следующим содержанием:

@Componentclass JwtAuthenticationManager(val jwtUtil: JwtUtil) : ReactiveAuthenticationManager {    override fun authenticate(authentication: Authentication): Mono<Authentication> {        val token = authentication.credentials.toString()        val validateToken = jwtUtil.validateToken(token)        var username: String?        try {            username = jwtUtil.extractUsername(token)        } catch (e: Exception) {            username = null            println(e)        }        return if (username != null && validateToken) {            val claims = jwtUtil.getClaimsFromToken(token)            val role: List<String> = claims["roles"] as List<String>            val authorities = role.stream()                    .map { role: String? -> SimpleGrantedAuthority(role) }                    .collect(Collectors.toList())            val authenticationToken = UsernamePasswordAuthenticationToken(                    username,                    null,                    authorities            )            authenticationToken.details = claims            Mono.just(authenticationToken)        } else {            Mono.empty()        }    }}

Чтобы везде, где необходимо, иметь возможность извлечь информацию из seucirty context, заносим claims в details токена (строка 25).

Для извлечения токена из запроса создаем класс SecurityContextRepository. Извлекать токен будем двумя способами:

  1. Заголовок Authorization: Bearer ${JWT_TOKEN}

  2. GET параметр ?access_token=${JWT_TOKEN}

@Componentclass SecurityContextRepository(val authenticationManager: ReactiveAuthenticationManager) : ServerSecurityContextRepository {    override fun save(exchange: ServerWebExchange, context: SecurityContext): Mono<Void> {        return Mono.error { IllegalStateException("Save method not supported") }    }    override fun load(exchange: ServerWebExchange): Mono<SecurityContext> {        val authHeader = exchange.request            .headers            .getFirst(HttpHeaders.AUTHORIZATION)        val accessToken: String = if (authHeader != null && authHeader.startsWith("Bearer ")) {            authHeader.substring(7)        } else exchange.request            .queryParams            .getFirst("access_token") ?: return Mono.empty()        val auth = UsernamePasswordAuthenticationToken(accessToken, accessToken)        return authenticationManager            .authenticate(auth)            .map { authentication: Authentication -> SecurityContextImpl(authentication) }    }}

Теперь имея два необходимых класса мы можем сконфигурировать сам Spring Security.

@EnableWebFluxSecurity@EnableReactiveMethodSecurityclass SecurityConfig(    val reactiveAuthenticationManager: ReactiveAuthenticationManager,    val securityContextRepository: SecurityContextRepository) {    @Bean    fun securityWebFilterChain(httpSecurity: ServerHttpSecurity): SecurityWebFilterChain {        return httpSecurity            .exceptionHandling()            .authenticationEntryPoint { swe: ServerWebExchange, e: AuthenticationException ->                Mono.fromRunnable { swe.response.statusCode = HttpStatus.UNAUTHORIZED }            }            .accessDeniedHandler { swe: ServerWebExchange, e: AccessDeniedException ->                Mono.fromRunnable { swe.response.statusCode = HttpStatus.FORBIDDEN }            }            .and()            .csrf().disable()            .cors().disable()            .formLogin().disable()            .httpBasic().disable()            .authenticationManager(reactiveAuthenticationManager)            .securityContextRepository(securityContextRepository)            .authorizeExchange()            .pathMatchers("/actuator/**").permitAll()            .pathMatchers(HttpMethod.GET, "/ws/**").hasAuthority("ROLE_USER")            .anyExchange().authenticated()            .and()            .build()    }}

Здесь из интересного: конфигурация позволяет подключиться по путям начинающимся с /ws только аутентифицированным пользователям, у которых есть роль ROLE_USER.

С конфигурацией Security закончили, теперь необходимо сконфигурировать подключение по сокетам.

Конфигурация веб-сокетов

В первую очередь нам необходимо задать маппинг между запросом и обработчиком. Чтобы добавить обработчик сокетов по определенному адресу, мы делаем следующее:

  1. Создаем мапу, где ключ - uri, а значение - обработчик. В этом конкретном случае WebSocketHandler.

  2. Создаем обработчик для ранее определенного маппинга и cors.

@Configurationclass ReactiveWebSocketConfig {    @Bean    fun webSocketHandlerMapping(chatWebSocketHandler: ChatWebSocketHandler): HandlerMapping {        val map: MutableMap<String, WebSocketHandler> = HashMap()        map["/ws/chat"] = chatWebSocketHandler        val handlerMapping = SimpleUrlHandlerMapping()        handlerMapping.setCorsConfigurations(Collections.singletonMap("*", CorsConfiguration().applyPermitDefaultValues()))        handlerMapping.order = 1        handlerMapping.urlMap = map        return handlerMapping    }    @Bean    fun handlerAdapter(): WebSocketHandlerAdapter {        return WebSocketHandlerAdapter()    }}

Здесь в качестве обработчика для uri /ws/chat указываем chatWebSocketHandler, его вид представлен ниже, имплементацией займемся позднее. Этот класс реализует интерфейс WebSocketHandler, содержащий один метод handle(session: WebSocketSession): Mono<Void>

@Componentclass ChatWebSocketHandler : WebSocketHandler {    override fun handle(session: WebSocketSession): Mono<Void> {        TODO("Not yet implemented")    }}

С базовой конфигурацией закончили.

Поговорим об архитектуре решения

На данным момент наш чат нельзя масштабировать. Так как подключение по веб-сокету дуплексное и, установив соединение с одним инстансом, все запросы по этому сокету будут направляться на этот инстанс, так при запуске двух и более инстансов у нас не будет доступа до всех имеющихся сессий. Становится возможной ситуация, когда собеседники из одного чата подключены к разным инстансам и нет возможности сразу отправить сообщение по сокетам всем участникам. Для решения необходим Message Broker, который будет получать на вход сообщение для определенного чата и рассылать его на все инстансы. Инстансы посмотрят на сообщение, возьмут информацию об участниках чата из БД, и будут искать этих участников среди своих подключенных пользователей.

Представим, что участники одного чата User 1 и User 2 подключены к разным инстансам чата. User 1 подключен к Chat-Instance-0, а User 2 к Chat-Instance-1. Тогда, когда User 1 отправит сообщение в Chat-Instance-0 (зеленая пунктирная линия), это сообщение попадёт в чат и будет отправлено в Message broker, оттуда разослано по всем инстансам. Chat-Instance-1 получит это сообщение и увидит, что у него есть User 2, который относится к этому чату и ему необходимо отправить это сообщение.

Реализация

Теперь займемся имплементацией нашего обработчика ChatWebSocketHandler

Нам понадобится мапа userId => session, для того, чтобы хранить открытые сессии и иметь возможность достать их по userId. Для поддержки одновременной работы с несколькими сессиями из под одного userId интерфейс мапы будет следующим: MutableMap<UUID, LinkedList<WebSocketSession>>.

Добавлять в мапу запись мы будем при подписке на стрим session.receive, а подчищать будем в doFinally.

В методе getReceiverStream создается стрим-обработчик сообщений, пришедших от клиента. Мы получаем payload как строку и преобразуем его к базовому WebSocketEvent, после чего в зависимости от типа event'a передаем его на обработку в слой сервисов.

В методе getSenderStream происходит конфигурация стрима, который занимается отправкой сообщений по сокету клиенту

@Componentclass ChatWebSocketHandler(    val objectMapper: ObjectMapper,    val logger: Logger,    val chatService: ChatService,    val objectStringConverter: ObjectStringConverter,    val sinkWrapper: SinkWrapper) : WebSocketHandler {    private val userIdToSession: MutableMap<UUID, LinkedList<WebSocketSession>> = ConcurrentHashMap()    override fun handle(session: WebSocketSession): Mono<Void> {        return ReactiveSecurityContextHolder.getContext()            .flatMap { ctx ->                val userId = UUID.fromString((ctx.authentication.details as Claims)["id"].toString())                val sender = getSenderStream(session, userId)                val receiver = getReceiverStream(session, userId)                return@flatMap Mono.zip(sender, receiver).then()            }    }    private fun getReceiverStream(session: WebSocketSession, userId: UUID): Mono<Void> {        return session.receive()            .filter { it.type == WebSocketMessage.Type.TEXT }            .map(WebSocketMessage::getPayloadAsText)            .flatMap {                objectStringConverter.stringToObject(it, WebSocketEvent::class.java)            }            .flatMap { convertedEvent ->                when (convertedEvent) {                    is NewMessageEvent -> chatService.handleNewMessageEvent(userId, convertedEvent)                    is MarkMessageAsRead -> chatService.markPreviousMessagesAsRead(convertedEvent.messageId)                    else -> Mono.error(RuntimeException())                }            }            .onErrorContinue { t, _ -> logger.error("Error occurred with receiver stream", t) }            .doOnSubscribe {                val userSession = userIdToSession[userId]                if (userSession == null) {                    val newUserSessions = LinkedList<WebSocketSession>()                    userIdToSession[userId] = newUserSessions                }                userIdToSession[userId]?.add(session)            }            .doFinally {                val userSessions = userIdToSession[userId]                userSessions?.remove(session)            }            .then()    }    private fun getSenderStream(session: WebSocketSession, userId: UUID): Mono<Void> {        val sendMessage = sinkWrapper.sinks.asFlux()            .filter { sendTo -> sendTo.userId == userId }            .map { sendTo -> objectMapper.writeValueAsString(sendTo.event) }            .map { stringObject -> session.textMessage(stringObject) }            .doOnError { logger.error("", it) }        return session.send(sendMessage)    }}

Для того чтобы писать в websocket нам необходимо создать поток данных, в который мы сможем добавлять данные. С reactora 3.4 для этого рекомендуется использовать Sinks.Many. Создадим такой поток в классе SinkWrapper.

@Componentclass SinkWrapper {    val sinks: Sinks.Many<SendTo> = Sinks.many().multicast().onBackpressureBuffer()}

Теперь, отправив данные в этот поток, они будут обработаны в потоке, сформированном в getSenderStream.

Интеграция с Redis

У Redis есть PUB/SUB модель общения, которая прекрасно решает задачу транслирования сообщений между инстансами.

Итак, для приготовления данного блюда нам понадобится:

  1. RedisChatMessageListener - подписка на топики и перенаправление сообщение в слой сервисов

  2. RedisChatMessagePublisher - публикация сообщений в топики

  3. RedisConfig - конфигурация редиса

  4. RedisListenerStarter - старт листенеров при старте инстанса

Реализация:

RedisConfig стандартный, ничего особенного

@Configurationclass RedisConfig {    @Bean    fun reactiveRedisConnectionFactory(redisProperties: RedisProperties): ReactiveRedisConnectionFactory {        val redisStandaloneConfiguration = RedisStandaloneConfiguration(redisProperties.host, redisProperties.port)        redisStandaloneConfiguration.setPassword(redisProperties.password)        return LettuceConnectionFactory(redisStandaloneConfiguration)    }    @Bean    fun template(reactiveRedisConnectionFactory: ReactiveRedisConnectionFactory): ReactiveStringRedisTemplate {        return ReactiveStringRedisTemplate(reactiveRedisConnectionFactory)    }}

RedisChatMessageListener

Здесь мы создаем подписку на топик по имени базового класса (обычно название топиков выносят в проперти). Получив сообщение из канала преобразуем его в объект (строка 13) и дальше передаем в sendMessage, который достанет участников чата и попробует разослать им это сообщение, если таковы имеются среди подключенных к инстансу.

@Componentclass RedisChatMessageListener(    private val logger: Logger,    private val reactiveStringRedisTemplate: ReactiveStringRedisTemplate,    private val objectStringConverter: ObjectStringConverter,    private val chatService: ChatService) {    fun subscribeOnCommonMessageTopic(): Mono<Void> {        return reactiveStringRedisTemplate.listenTo(PatternTopic(CommonMessage::class.java.name))            .map { message -> message.message }            .doOnNext { logger.info("Receive new message: $it") }            .flatMap { objectStringConverter.stringToObject(it, CommonMessage::class.java) }            .flatMap { message ->                when (message) {                    is TextMessage -> chatService.sendMessage(message)                    is ImageMessage -> chatService.sendMessage(message)                    else -> Mono.error(RuntimeException())                }            }            .then()    }}

RedisChatMessagePublisher

Паблишер имеет один метод для транслирования CommonMessage на все инстансы. Объект сообщения приводится к строке и публикуется в топик по имени базового класса.

@Componentclass RedisChatMessagePublisher(    val logger: Logger,    val reactiveStringRedisTemplate: ReactiveStringRedisTemplate,    val objectStringConverter: ObjectStringConverter) {    fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> {        return objectStringConverter.objectToString(commonMessage)            .flatMap {                logger.info("Broadcast message $it to channel ${CommonMessage::class.java.name}")                reactiveStringRedisTemplate.convertAndSend(CommonMessage::class.java.name, it)            }            .then()    }}

RedisListenerStarter

В этом классе стартуются все листенеры из RedisChatMessageListener. В нашем случае - единственный листенер subscribeOnCommonMessageTopic

@Componentclass RedisListenerStarter(    val logger: Logger,    val redisChatMessageListener: RedisChatMessageListener) {    @Bean    fun newMessageEventChannelListenerStarter(): ApplicationRunner {        return ApplicationRunner { args: ApplicationArguments ->            redisChatMessageListener.subscribeOnCommonMessageTopic()                .doOnSubscribe { logger.info("Start NewMessageEvent channel listener") }                .onErrorContinue { throwable, _ -> logger.error("Error occurred while listening NewMessageEvent channel", throwable) }                .subscribe()        }    }}

Импелементация сервиса

Упрощенная реализация, без сохранения сообщений в БД с замоканным chatRepository. В виду того, что статья выходит и так больше, чем я рассчитывал.

Метод handleNewMessageEvent вызывается из WebSocketHandler и получает на вход userId отправителя и NewMessageEvent - простое текстовое сообщение. В методе происходит проверка на то, что отправитель действительно является участником чата и дальше это сообщение транслируется между инстансами.

@Serviceclass DefaultChatService(    val logger: Logger,    val sinkWrapper: SinkWrapper,    val chatRepository: ChatRepository,    val redisChatPublisher: RedisChatMessagePublisher) : ChatService {    override fun handleNewMessageEvent(senderId: UUID, newMessageEvent: NewMessageEvent): Mono<Void> {        logger.info("Receive NewMessageEvent from $senderId: $newMessageEvent")        return chatRepository.findById(newMessageEvent.chatId)            .filter { it.chatMembers.map(ChatMember::userId).contains(senderId) }            .flatMap { chat ->                val textMessage = TextMessage(UUID.randomUUID(), chat.chatId, chat.chatMembers.first { it.userId == senderId }, newMessageEvent.content, LocalDateTime.now(), false)                chat.lastMessage = textMessage                return@flatMap Mono.zip(chatRepository.save(chat), Mono.just(textMessage))            }            .flatMap { broadcastMessage(it.t2) }    }    /**     * Broadcast the message between instances     */    override fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> {        return redisChatPublisher.broadcastMessage(commonMessage)    }    /**     * Send the message to all of chatMembers of message chat direct     */    override fun sendMessage(message: CommonMessage): Mono<Void> {        return chatRepository.findById(message.chatId)            .map { it.chatMembers }            .flatMapMany { Flux.fromIterable(it) }            .flatMap { member -> sendEventToUserId(member.userId, ChatMessageEvent(message.chatId, message)) }            .then()    }    override fun sendEventToUserId(userId: UUID, webSocketEvent: WebSocketEvent): Mono<Void> {        return Mono.fromCallable { sinkWrapper.sinks.emitNext(SendTo(userId, webSocketEvent), Sinks.EmitFailureHandler.FAIL_FAST) }            .then()    }}

Заключение

В качестве дальнейших доработок можно произвести разделение получаемых и отправляемых ивентов на отдельные классы. Также в месте, где происходит получение сообщения по сокетам от клиента, его приведение к WebSocketEvent и передача в обработчик, можно попробовать избавиться от хардкодного маппинка event => handler. Пока не думал, как это можно сделать красивее, но уверен, что решение есть.

Проект на GitHub

Подробнее..
Категории: Kotlin , Redis , Java , Chat , Spring , Microservice

Redis на практических примерах

18.06.2020 10:17:09 | Автор: admin
Redis достаточно популярный инструмент, который из коробки поддерживает большое количество различных типов данных и методов работы с ними. Во многих проектах он используется в качестве кэшируещего слоя, но его возможности намного шире. Мы в ManyChat очень любим Redis и активно используем его в нашем продукте для решения огромного количества задач. Про некоторые интересные кейсы использования этой in-memory key-value базы данных я расскажу на примерах. Надеюсь, вам они будут полезны, и вы сможете применить что-то в своих проектах.

Рассмотрим следующие кейсы:
  • Кэширование данных (да, банально и скучно, но это классный инструмент для кэширования и обойти стороной этот кейс, кажется будет не правильно)
  • Работа с очередями на базе redis
  • Организация блокировок (mutex)
  • Делаем систему rate-limit
  • Pubsub делаем рассылки сообщений на клиенты

Буду работать с сырыми redis командами, чтобы не завязываться на какую-либо конкретную библиотеку, предоставляющую обертку над этими командами. Код буду писать на PHP с использованием ext-redis, но он здесь для наглядности, использовать представленные подходы можно в связке с любым другим языком программирования.



Кэширование данных


Давайте начнем с самого простого, один из самых популярных кейсов использования Redis кэширование данных. Будет полезно для тех, кто не работал с Redis. Для тех, кто уже давно пользуется этим инструментом можно смело переходить к следующему кейсу. Для того, чтобы снизить нагрузку на БД, иметь возможность запрашивать часто используемые данные максимально быстро, используется кэш. Redis это in-memory хранилище, то есть данные хранятся в оперативной памяти. Ещё это key-value хранилище, где доступ к данным по их ключу имеет сложность O(1) поэтому данные мы получаем очень быстро.

Получение данных из хранилища выглядит следующим образом:

public function getValueFromCache(string $key){    return $this->getRedis()->rawCommand('GET', $key);}

Но для того, чтобы данные из кэша получить, их нужно сначала туда положить. Простой пример записи:

public function setValueToCache(string $key, $value){    $this->getRedis()->rawCommand('SET', $key, $value);} 

Таким образом, мы запишем данные в Redis и сможем их считать по тому же самому ключу в любой нужный нам момент. Но если мы будем все время писать в Redis, данные в нем будут занимать все больше и больше места в оперативной памяти. Нам нужно удалять нерелевантные данные, контролировать это вручную достаточно проблематично, поэтому пускай redis занимается этим самостоятельно. Добавим к нашему ключу TTL (время жизни ключа):

public function setValueToCache(string $key, $value, int $ttl = 3600){    $this->getRedis()->rawCommand('SET', $key, $value, 'EX', $ttl);}

По истечении времени ttl (в секундах) данные по этому ключу будут автоматически удалены.

Как говорят, в программировании существует две самых сложных вещи: придумывание названий переменных и инвалидация кэша. Для того, чтобы принудительно удалить значение из Redis по ключу, достаточно выполнить следующую команду:

public function dropValueFromCache(string $key){    $this->getRedis()->rawCommand('DEL', $key);}


Также редис позволяет получить массив значений по списку ключей:

public function getValuesFromCache(array $keys){    return $this->getRedis()->rawCommand('MGET', ...$keys);}

И соответственно массовое удаление данных по массиву ключей:

public function dropValuesFromCache(array $keys){    $this->getRedis()->rawCommand('MDEL', ...$keys);}


Очереди


Используя имеющиеся в Redis структуры данных, мы можем запросто реализовать стандартные очереди FIFO или LIFO. Для этого используем структуру List и методы по работе с ней. Работа с очередями состоит из двух основных действий: отправить задачу в очередь, и взять задачу из очереди. Отправлять задачи в очередь мы можем из любой части системы. Получением задачи из очереди и ее обработкой обычно занимается выделенный процесс, который называется консьюмером (consumer).

Итак, для того, чтобы отправить нашу задачу в очередь, нам достаточно использовать следующий метод:

public function pushToQueue(string $queueName, $payload){    $this->getRedis()->rawCommand('RPUSH', $queueName, serialize($payload));}

Тем самым мы добавим в конец листа с названием $queueName некий $payload, который может представлять из себя JSON для инициализации нужной нам бизнес логики (например данные по денежной транзакции, данные для инициализации отправки письма пользователю, etc.). Если же в нашем хранилище не существует листа с именем $queueName, он будет автоматически создан, и туда попадет первый элемент $payload.

Со стороны консьюмера нам необходимо обеспечить получение задач из очереди, это реализуется простой командой чтения из листа. Для реализации FIFO очереди мы используем чтение с обратной записи стороны (в нашем случае мы писали через RPUSH), то есть читать будем через LPOP:

public function popFromQueue(string $queueName){    return $this->getRedis()->rawCommand('LPOP', $queueName);}

Для реализации LIFO очереди, нам нужно будет читать лист с той же стороны, с которой мы в него пишем, то есть через RPOP.



Тем самым мы вычитываем по одному сообщению из очереди. В случае если листа не существует (он пустой), то мы получим NULL. Каркас консьюмера мог бы выглядеть так:

class Consumer {    private string $queueName;    public function __construct(string $queueName)    {        $this->queueName = $queueName;    }    public function run()    {        while (true) { //Вычитываем в бесконечном цикле нашу очередь            $payload = $this->popFromQueue();            if ($payload === null) { //Если мы получили NULL, значит очередь пустая, сделаем небольшую паузу в ожидании новых сообщений                sleep(1);                continue;            }            //Если очередь не пустая и мы получили $payload, то запускаем обработку этого $payload            $this->process($payload);        }    }    private function popFromQueue()    {        return $this->getRedis()->rawCommand('LPOP', $this->queueName);    }}

Для того, чтобы получить информацию о глубине очереди (сколько значений хранится в нашем листе), можем воспользоваться следующей командой:

public function getQueueLength(string $queueName){    return $this->getRedis()->rawCommand('LLEN', $queueName);}

Мы рассмотрели базовую реализацию простых очередей, но Redis позволяет строить более сложные очереди. Например, мы хотим знать о времени последней активности наших пользователей на сайте. Нам не важно знать это с точностью вплоть до секунды, приемлемая погрешность 3 минуты. Мы можем обновлять поле last_visit пользователя при каждом запросе на наш бэкенд от этого пользователя. Но если этих пользователей большое количество в онлайне 10,000 или 100,000? А если у нас еще и SPA, которое отправляет много асинхронных запросов? Если на каждый такой запрос обновлять поле в бд, мы получим большое количество тупых запросов к нашей БД. Эту задачу можно решать разными способами, один из вариантов это сделать некую отложенную очередь, в рамках которой мы будем схлопывать одинаковые задачи в одну в определенном промежутке времени. Здесь на помощь нам придет такая структура, как Sorted SET. Это взвешенное множество, каждый элемент которого имеет свой вес (score). А что если в качестве score мы будем использовать timestamp добавления элемента в этот sorted set? Тогда мы сможем организовать очередь, в которой можно будет откладывать некоторые события на определенное время. Для этого используем следующую функцию:

public function pushToDelayedQueue(string $queueName, $payload, int $delay = 180){    $this->getRedis()->rawCommand('ZADD', $queueName, 'NX', time() + $delay, serialize($payload))}

В такой схеме идентификатор пользователя, зашедшего на сайт, попадет в очередь $queueName и будет висеть там в течение 180 секунд. Все другие запросы в рамках этого времени будут также отправляться в эту очередь, но они не будут туда добавлены, так как идентификатор этого пользователя уже существует в этой очереди и продублирован он не будет (за это отвечает параметр 'NX'). Так мы отсекаем всю лишнюю нагрузку и каждый пользователь будет генерить не более одного запроса в 3 минуты на обновление поля last_visit.

Теперь возникает вопрос о том, как читать эту очередь. Если методы LPOP и RPOP для листа читают значение и удаляют его из листа атомарно (это значит, что одно и тоже значение не может быть взято несколькими консьюмерами), то sorted set такого метода из коробки не имеет. Мы можем сделать чтение и удаление элемента только двумя последовательными командами. Но мы можем выполнить эти команды атомарно, используя простой LUA скрипт!

public function popFromDelayedQueue(string $queueName){    $command = 'eval "        local val = redis.call(\'ZRANGEBYSCORE\', KEYS[1], 0, ARGV[1], \'LIMIT\', 0, 1)[1]        if val then            redis.call(\'ZREM\', KEYS[1], val)        end        return val"';    return $this->getRedis()->rawCommand($command, 1, $queueName, time());}

В этом LUA скрипте мы пытаемся получить первое значение с весом в диапазоне от 0 до текущего timestamp в переменную val с помощью команды ZRANGEBYSCORE, если нам удалось получить это значение, то удаляем его из sorted set командой ZREM и возвращаем само значение val. Все эти операции выполняются атомарно. Таким образом мы можем вычитывать нашу очередь в консьюмере, аналогично с примером очереди построенной на структуре LIST.

Я рассказал про несколько базовых паттернов очередей, реализованных в нашей системе. На текущий момент у нас в продакшене существуют более сложные механизмы построения очередей линейных, составных, шардированных. При этом Redis позволяет все это делать при помощи смекалки и готовых круто работающих структур из коробки, без сложного программирования.

Блокировки (Mutex)


Mutex (блокировка) это механизм синхронизации доступа к shared ресурсу нескольких процессов, тем самым гарантируя, что только один процесс будет взаимодействовать с этим ресурсом в единицу времени. Этот механизм часто применяется в биллинге и других системах, где важно соблюдать потоковую безопасность (thread safety).

Для реализации mutex на базе Redis прекрасно подойдет стандартный метод SET с дополнительными параметрами:

public function lock(string $key, string $hash, int $ttl = 10): bool{    return (bool)$this->getRedis()->rawCommand('SET', $key, $hash, 'NX', 'EX', $ttl);}

где параметрами для установки mutex являются:
  • $key ключ идентифицирующий mutex;
  • $hash генерируем некую подпись, которая идентифицирует того, кто поставил mutex. Мы же не хотим, чтобы кто-то в другом месте случайно снял блокировку и вся наша логика рассыпалась.
  • $ttl время в секундах, которое мы отводим на блокировку (на тот случай, если что-то пойдет не так, например процесс, поставивший блокировку, по какой-то причине умер и не снял ее, чтобы это блокировка не висела бесконечно).


Основное отличие от метода SET, используемого в механизме кэширования это параметр NX, который говорит Redis о том, что значение, которое уже хранится в Redis по ключу $key, не будет записано повторно. В результате, если в Redis нет значения по ключу $key, туда произведется запись и в ответе мы получим 'OK', если значение по ключу уже есть в Redis, оно не будет туда добавлено (обновлено) и в ответе мы получим NULL. Результат метода lock(): bool, где true блокировка поставлена, false уже есть активная блокировка, создать новую невозможно.

Чаще всего, когда мы пишем код, который пытается работать с shared ресурсом, который заблокирован, мы хотим дождаться его разблокировки и продолжить работу с этим ресурсом. Для этого можем реализовать простой метод для ожидания освободившегося ресурса:

public function tryLock(string $key, string $hash, int $timeout, int $ttl = 10): bool{    $startTime = microtime(true);    while (!this->lock($key, $hash, $ttl)) {        if ((microtime(true) - $startTime) > $timeout) {            return false; // не удалось взять shared ресурс под блокировку за указанный $timeout}usleep(500 * 1000) //ждем 500 миллисекунд до следующей попытки поставить блокировку    }    return true; //блокировка успешно поставлена}

Мы разобрались как ставить блокировку, теперь нам нужно научиться ее снимать. Для того, чтобы гарантировать снятие блокировки тем процессом, который ее установил, нам понадобится перед удалением значения из хранилища Redis, сверить хранимый хэш по этому ключу. Для того, чтобы сделать это атомарно, воспользуемся LUA скриптом:

public function releaseLock(string $key, string $hash): bool{    $command = 'eval "        if redis.call("GET",KEYS[1])==ARGV[1] then            return redis.call("DEL",KEYS[1])        else            return 0        end"';    return (bool) $this->getRedis()->rawCommand($command, 1, $key, $hash);}

Здесь мы пытаемся найти с помощью команды GET значение по ключу $key, если оно равно значению $hash, то удаляем его при помощи команды DEL, которая вернет нам количество удаленных ключей, если же значения по ключу $key не существует, или оно не равно значению $hash, то мы возвращаем 0, что значит блокировку снять не удалось. Базовый пример использования mutex:

class Billing {    public function charge(int $userId, int $amount){        $mutexName = sprintf('billing_%d', $userId);        $hash = sha1(sprintf('billing_%d_%d'), $userId, mt_rand()); //генерим некий хэш запущенного потока        if (!$this->tryLock($mutexName, $hash, 10)) { //пытаемся поставить блокировку в течение 10 секунд            throw new Exception('Не получилось поставить lock, shared ресурс занят');}        //lock получен, процессим бизнес-логику        $this->doSomeLogick();        //освобождаем shared ресурс, снимаем блокировку        $this->releaseLock($mutexName, $hash);}}


Rate limiter


Достаточно частая задача, когда мы хотим ограничить количество запросов к нашему апи. Например на один API endpoint от одного аккаунта мы хотим принимать не более 100 запросов в минуту. Эта задача легко решается с помощью нашего любимого Redis:

public function isLimitReached(string $method, int $userId, int $limit): bool{    $currentTime = time();    $timeWindow = $currentTime - ($currentTime % 60); //Так как наш rate limit имеет ограничение 100 запросов в минуту, //то округляем текущий timestamp до начала минуты  это будет частью нашего ключа,//по которому мы будем считать количество запросов    $key = sprintf('api_%s_%d_%d', $method, $userId, $timeWindow); //генерируем ключ для счетчика, соответственно каждую минуту он будет меняться исходя из $timeWindow    $count = $this->getRedis()->rawCommand('INCR', $key); //метод INCR увеличивает значение по указанному ключу, и возвращает новое значение. //Если ключа не существует, он будут инициализирован со значением 0 и после этого увеличен    if ($count > $limit) { //limit достигнут        return true;    }    return false;} 

Таким простым методом мы можем лимитировать количество запросов к нашему API, базовый каркас нашего контроллера мог бы выглядеть следующим образом:

class FooController {    public function actionBar()    {        if ($this->isLimitReached(__METHOD__, $this->getUserId(), 100)) {            throw new Exception('API method max limit reached');        }        $this->doSomeLogick();    }}


Pub/sub


Pub/sub интересный механизм, который позволяет, с одной стороны, подписаться на канал и получать сообщения из него, с другой стороны отправлять в этот канал сообщение, которое будет получено всеми подписчиками. Наверное у многих, кто работал с вебсокетами, возникла аналогия с этим механизмом, они действительно очень похожи. Механизм pub/sub не гарантирует доставки сообщений, он не гарантирует консистентности, поэтому не стоит его использовать в системах, для которых важны эти критерии. Однако рассмотрим этот механизм на практическом примере. Предположим, что у нас есть большое количество демонизированных команд, которыми мы хотим централизованно управлять. При инициализации нашей команды мы подписываемся на канал, через который будем получать сообщения с инструкциями. С другой стороны у нас есть управляющий скрипт, который отправляет сообщения с инструкциям в указанный канал. К сожалению, стандартный PHP работает в одном блокирующем потоке; для того, чтобы реализовать задуманное, используем ReactPHP и реализованный под него клиент Redis.

Подписка на канал:
class FooDaemon {    private $throttleParam = 10;    public function run()    {        $loop = React\EventLoop\Factory::create(); //инициализируем event-loop ReactPHP        $redisClient = $this->getRedis($loop); //инициализируем клиента Redis для ReactPHP        $redisClient->subscribe(__CLASS__); // подписываемся на нужный нам канал в Redis, в нашем примере название канала соответствует названию класса        $redisClient->on('message', static function($channel, $payload) { //слушаем события message, при возникновении такого события, получаем channel и payload            switch (true) { // Здесь может быть любая логика обработки сообщений, в качестве примера пускай будет так:                case \is_int($payload): //Если к нам пришло число  обновим параметр $throttleParam на полученное значение                    $this->throttleParam = $payload;                    break;                case $payload === 'exit': //Если к нам пришла команда 'exit'  завершим выполнение скрипта                    exit;                default: //Если пришло что-то другое, то просто залогируем это                    $this->log($payload);                    break;            }        });        $loop->addPeriodicTimer(0, function() {            $this->doSomeLogick(); // Здесь в бесконечном цикле может выполняться какая-то логика, например чтение задач из очереди и их процессинг        });        $loop->run(); //Запускаем наш event-loop    }}

Отправка сообщения в канал более простое действие, мы можем сделать это абсолютно из любого места системы одной командой:

public function publishMessage($channel, $message){    $this->getRedis()->publish($channel, $message);}

В результате такой отправки сообщения в канал, все клиенты, которые подписаны на данный канал, получат это сообщение.



Итог


Мы рассмотрели 5 примеров использования Redis на практике, надеюсь что каждый найдет для себя что-то интересное. В нашем стэке технологий Redis занимает важное место, мы любим этот инструмент за его скорость и гибкость. Мы используем Redis в продакшене уже много лет, и он зарекомендовал себя как очень крутой и надежный инструмент, который лежит в основе многих частей нашего продукта. Наш небольшой кластер Redis серверов обрабатывает около 1 миллиона запросов в секунду. А как вы используете Redis в своем проекте? Делитесь опытом в комментариях!
Подробнее..

Перевод Введение в асинхронное программирование на Python

03.07.2020 14:20:11 | Автор: admin
Всем привет. Подготовили перевод интересной статьи в преддверии старта базового курса Разработчик Python.



Введение


Асинхронное программирование это вид параллельного программирования, в котором какая-либо единица работы может выполняться отдельно от основного потока выполнения приложения. Когда работа завершается, основной поток получает уведомление о завершении рабочего потока или произошедшей ошибке. У такого подхода есть множество преимуществ, таких как повышение производительности приложений и повышение скорости отклика.



В последние несколько лет асинхронное программирование привлекло к себе пристальное внимание, и на то есть причины. Несмотря на то, что этот вид программирования может быть сложнее традиционного последовательного выполнения, он гораздо более эффективен.

Например, вместо того, что ждать завершения HTTP-запроса перед продолжением выполнения, вы можете отправить запрос и выполнить другую работу, которая ждет своей очереди, с помощью асинхронных корутин в Python.

Асинхронность это одна из основных причин популярности выбора Node.js для реализации бэкенда. Большое количество кода, который мы пишем, особенно в приложениях с тяжелым вводом-выводом, таком как на веб-сайтах, зависит от внешних ресурсов. В нем может оказаться все, что угодно, от удаленного вызова базы данных до POST-запросов в REST-сервис. Как только вы отправите запрос в один из этих ресурсов, ваш код будет просто ожидать ответа. С асинхронным программированием вы позволяете своему коду обрабатывать другие задачи, пока ждете ответа от ресурсов.

Как Python умудряется делать несколько вещей одновременно?




1. Множественные процессы

Самый очевидный способ это использование нескольких процессов. Из терминала вы можете запустить свой скрипт два, три, четыре, десять раз, и все скрипты будут выполняться независимо и одновременно. Операционная система сама позаботится о распределении ресурсов процессора между всеми экземплярами. В качестве альтернативы вы можете воспользоваться библиотекой multiprocessing, которая умеет порождать несколько процессов, как показано в примере ниже.

from multiprocessing import Processdef print_func(continent='Asia'):    print('The name of continent is : ', continent)if __name__ == "__main__":  # confirms that the code is under main function    names = ['America', 'Europe', 'Africa']    procs = []    proc = Process(target=print_func)  # instantiating without any argument    procs.append(proc)    proc.start()    # instantiating process with arguments    for name in names:        # print(name)        proc = Process(target=print_func, args=(name,))        procs.append(proc)        proc.start()    # complete the processes    for proc in procs:        proc.join()


Вывод:

The name of continent is :  AsiaThe name of continent is :  AmericaThe name of continent is :  EuropeThe name of continent is :  Africa


2. Множественные потоки

Еще один способ запустить несколько работ параллельно это использовать потоки. Поток это очередь выполнения, которая очень похожа на процесс, однако в одном процессе вы можете иметь несколько потоков, и у всех них будет общий доступ к ресурсам. Однако из-за этого написать код потока будет сложно. Аналогично, все тяжелую работу по выделению памяти процессора сделает операционная система, но глобальная блокировка интерпретатора (GIL) позволит только одному потоку Python запускаться в одну единицу времени, даже если у вас есть многопоточный код. Так GIL на CPython предотвращает многоядерную конкурентность. То есть вы насильно можете запуститься только на одном ядре, даже если у вас их два, четыре или больше.

import threading def print_cube(num):    """    function to print cube of given num    """    print("Cube: {}".format(num * num * num)) def print_square(num):    """    function to print square of given num    """    print("Square: {}".format(num * num)) if __name__ == "__main__":    # creating thread    t1 = threading.Thread(target=print_square, args=(10,))    t2 = threading.Thread(target=print_cube, args=(10,))     # starting thread 1    t1.start()    # starting thread 2    t2.start()     # wait until thread 1 is completely executed    t1.join()    # wait until thread 2 is completely executed    t2.join()     # both threads completely executed    print("Done!")


Вывод:

Square: 100Cube: 1000Done!


3. Корутины и yield:

Корутины это обобщение подпрограмм. Они используются для кооперативной многозадачности, когда процесс добровольно отдает контроль (yield) с какой-то периодичностью или в периоды ожидания, чтобы позволить нескольким приложениям работать одновременно. Корутины похожи на генераторы, но с дополнительными методами и небольшими изменениями в том, как мы используем оператор yield. Генераторы производят данные для итерации, в то время как корутины могут еще и потреблять данные.

def print_name(prefix):    print("Searching prefix:{}".format(prefix))    try :         while True:                # yeild used to create coroutine                name = (yield)                if prefix in name:                    print(name)    except GeneratorExit:            print("Closing coroutine!!") corou = print_name("Dear")corou.__next__()corou.send("James")corou.send("Dear James")corou.close()


Вывод:

Searching prefix:DearDear JamesClosing coroutine!!


4. Асинхронное программирование

Четвертый способ это асинхронное программирование, в котором не участвует операционная система. Со стороны операционной системы у вас останется один процесс, в котором будет всего один поток, но вы все еще сможете выполнять одновременно несколько задач. Так в чем тут фокус?

Ответ: asyncio

Asyncio модуль асинхронного программирования, который был представлен в Python 3.4. Он предназначен для использования корутин и future для упрощения написания асинхронного кода и делает его почти таким же читаемым, как синхронный код, из-за отсутствия callback-ов.

Asyncio использует разные конструкции: event loop, корутины и future.

  • event loop управляет и распределяет выполнение различных задач. Он регистрирует их и обрабатывает распределение потока управления между ними.
  • Корутины (о которых мы говорили выше) это специальные функции, работа которых схожа с работой генераторов в Python, с помощью await они возвращают поток управления обратно в event loop. Запуск корутины должен быть запланирован в event loop. Запланированные корутины будут обернуты в Tasks, что является типом Future.
  • Future отражает результат таска, который может или не может быть выполнен. Результатом может быть exception.


С помощью asyncio вы можете структурировать свой код так, чтобы подзадачи определялись как корутины и позволяли планировать их запуск так, как вам заблагорассудится, в том числе и одновременно. Корутины содержат точки yield, в которых мы определяем возможные точки переключения контекста. В случае, если в очереди ожидания есть задачи, то контекст будет переключен, в противном случае нет.

Переключение контекста в asyncio представляет собой event loop, который передает поток управления от одной корутины к другой.

В следующем примере, мы запускаем 3 асинхронных таска, которые по-отдельности делают запросы к Reddit, извлекают и выводят содержимое JSON. Мы используем aiohttp клиентскую библиотеку http, которая гарантирует, что даже HTTP-запрос будет выполнен асинхронно.

import signal  import sys  import asyncio  import aiohttp  import jsonloop = asyncio.get_event_loop()  client = aiohttp.ClientSession(loop=loop)async def get_json(client, url):      async with client.get(url) as response:        assert response.status == 200        return await response.read()async def get_reddit_top(subreddit, client):      data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')    j = json.loads(data1.decode('utf-8'))    for i in j['data']['children']:        score = i['data']['score']        title = i['data']['title']        link = i['data']['url']        print(str(score) + ': ' + title + ' (' + link + ')')    print('DONE:', subreddit + '\n')def signal_handler(signal, frame):      loop.stop()    client.close()    sys.exit(0)signal.signal(signal.SIGINT, signal_handler)asyncio.ensure_future(get_reddit_top('python', client))  asyncio.ensure_future(get_reddit_top('programming', client))  asyncio.ensure_future(get_reddit_top('compsci', client))  loop.run_forever()


Вывод:

50: Undershoot: Parsing theory in 1965 (http://personeltest.ru/away/jeffreykegler.github.io/Ocean-of-Awareness-blog/individual/2018/07/knuth_1965_2.html)12: Question about best-prefix/failure function/primal match table in kmp algorithm (http://personeltest.ru/aways/www.reddit.com/r/compsci/comments/8xd3m2/question_about_bestprefixfailure_functionprimal/)1: Question regarding calculating the probability of failure of a RAID system (http://personeltest.ru/aways/www.reddit.com/r/compsci/comments/8xbkk2/question_regarding_calculating_the_probability_of/)DONE: compsci336: /r/thanosdidnothingwrong -- banning people with python (http://personeltest.ru/aways/clips.twitch.tv/AstutePluckyCocoaLitty)175: PythonRobotics: Python sample codes for robotics algorithms (http://personeltest.ru/aways/atsushisakai.github.io/PythonRobotics/)23: Python and Flask Tutorial in VS Code (http://personeltest.ru/aways/code.visualstudio.com/docs/python/tutorial-flask)17: Started a new blog on Celery - what would you like to read about? (http://personeltest.ru/aways/www.python-celery.com)14: A Simple Anomaly Detection Algorithm in Python (http://personeltest.ru/aways/medium.com/@mathmare_/pyng-a-simple-anomaly-detection-algorithm-2f355d7dc054)DONE: python1360: git bundle (http://personeltest.ru/aways/dev.to/gabeguz/git-bundle-2l5o)1191: Which hashing algorithm is best for uniqueness and speed? Ian Boyd's answer (top voted) is one of the best comments I've seen on Stackexchange. (http://personeltest.ru/aways/softwareengineering.stackexchange.com/questions/49550/which-hashing-algorithm-is-best-for-uniqueness-and-speed)430: ARM launches Facts campaign against RISC-V (http://personeltest.ru/aways/riscv-basics.com/)244: Choice of search engine on Android nuked by Anonymous Coward (2009) (http://personeltest.ru/aways/android.googlesource.com/platform/packages/apps/GlobalSearch/+/592150ac00086400415afe936d96f04d3be3ba0c)209: Exploiting freely accessible WhatsApp data or Why does WhatsApp web know my phones battery level? (http://personeltest.ru/aways/medium.com/@juan_cortes/exploiting-freely-accessible-whatsapp-data-or-why-does-whatsapp-know-my-battery-level-ddac224041b4)DONE: programming


Использование Redis и Redis Queue RQ


Использование asyncio и aiohttp не всегда хорошая идея, особенно если вы пользуетесь более старыми версиями Python. К тому же, бывают моменты, когда вам нужно распределить задачи по разным серверам. В этом случае можно использовать RQ (Redis Queue). Это обычная библиотека Python для добавления работ в очередь и обработки их воркерами в фоновом режиме. Для организации очереди используется Redis база данных ключей/значений.

В примере ниже мы добавили в очередь простую функцию count_words_at_url с помощью Redis.

from mymodule import count_words_at_urlfrom redis import Redisfrom rq import Queueq = Queue(connection=Redis())job = q.enqueue(count_words_at_url, 'http://nvie.com')******mymodule.py******import requestsdef count_words_at_url(url):    """Just an example function that's called async."""    resp = requests.get(url)    print( len(resp.text.split()))    return( len(resp.text.split()))


Вывод:

15:10:45 RQ worker 'rq:worker:EMPID18030.9865' started, version 0.11.015:10:45 *** Listening on default...15:10:45 Cleaning registries for queue: default15:10:50 default: mymodule.count_words_at_url('http://nvie.com') (a2b7451e-731f-4f31-9232-2b7e3549051f)32215:10:51 default: Job OK (a2b7451e-731f-4f31-9232-2b7e3549051f)15:10:51 Result is kept for 500 seconds


Заключение


В качестве примера возьмем шахматную выставку, где один из лучших шахматистов соревнуется с большим количеством людей. У нас есть 24 игры и 24 человека, с которыми можно сыграть, и, если шахматист будет играть с ними синхронно, это займет не менее 12 часов (при условии, что средняя игра занимает 30 ходов, шахматист продумывает ход в течение 5 секунд, а противник примерно 55 секунд.) Однако в асинхронном режиме шахматист сможет делать ход и оставлять противнику время на раздумья, тем временем переходя к следующему противнику и деля ход. Таким образом, сделать ход во всех 24 играх можно за 2 минуты, и выиграны они все могут быть всего за один час.

Это и подразумевается, когда говорят о том, что асинхронность ускоряет работу. О такой быстроте идет речь. Хороший шахматист не начинает играть в шахматы быстрее, просто время более оптимизировано, и оно не тратится впустую на ожидание. Так это работает.

По этой аналогии шахматист будет процессором, а основная идея будет заключаться в том, чтобы процессор простаивал как можно меньше времени. Речь о том, чтобы у него всегда было занятие.

На практике асинхронность определяется как стиль параллельного программирования, в котором одни задачи освобождают процессор в периоды ожидания, чтобы другие задачи могли им воспользоваться. В Python есть несколько способов достижения параллелизма, отвечающих вашим требованиям, потоку кода, обработке данных, архитектуре и вариантам использования, и вы можете выбрать любой из них.



Узнать о курсе подробнее.


Подробнее..

Как перестать DDoS-ить чужой API и начать жить

25.04.2021 12:10:20 | Автор: admin

Поговорим о способах ограничить число исходящих запросов враспределенном приложении. Это нужно, если внешний API непозволяет обращаться кнему тогда, когда вам вздумается.

Вводные

Для начала немного вводных. Есть наше приложение иесть некий внешний сервис. Например, какое-то банковское ПО, API для отслеживания почтовых отправлений, что угодно. При этом наше приложение непросто использует его, там куча очень важной для нас информации. Прибыль компании напрямую зависит отобъема выгруженных оттуда данных. Мыпонимаем, один сервер это слишком мало изаводим себе пару десятков машин. Чтобы приложение масштабировалось лучше, делаем так: разбиваем весь объем намаленькие задачи иотправляем ихвочередь. Каждый сервер извлекает ихоттуда поодной. Втаком сообщении указан, например, IDпользователя. Затем приложение скачивает данные для него исохраняет ихвбазе. Большая ибыстрая машина обработает много задач, маленькая имедленная поменьше.

Создатели внешнего сервиса понимают, что, если неограничивать наше взаимодействие, они начнут отказывать вобслуживании другим клиентам. Поэтому вводится лимит назапросы: теперь разрешено неболее 1000 одновременных обращений кним.

Немного подумав над условиями задачи находим решение семафор. Ондопускает, чтобы одновременно вкритической секции было не болееN потоков выполнения. Когда внутри скапливается предельное ихколичество новые желающие ожидают навходе. Кто-то выходит значит кто-то ещё может войти.

Один семафор на машину

Делим лимит запросов начисло доступных серверов (1000/20) иполучаем по50конкурентных обращений намашину.

Простой семафор в .NET
private const int RequestsLimit = 50;private static readonly SemaphoreSlim Throttler =   new SemaphoreSlim(RequestsLimit);async Task<HttpResponseMessage> InvokeServiceAsync(HttpClient client){try{await Throttler.WaitAsync().ConfigureAwait(false);return await client.GetAsync("todos/1").ConfigureAwait(false);}finally{Throttler.Release();}}

В .NET Core можно сделать типизированный HttpClient, получится очень вдухе новых веяний, янебуду останавливаться наэтом подробнее, новыможете посмотреть сюда. Там ивцелом такой подход раскрывается детальнее, чем яделаю это здесь.

Попробуем проанализировать то, что получилось.

Вэтой конфигурации будут возникать ситуации, когда одна машина свободна, адругие перегружены. Этого можно избежать, беря задачу вработу при условии, что наконкретном сервере число сообщений вобработке недостигло предела. Тогда возникнет другая проблема как только задача выполнится занять еёместо будет некому, сперва потребуется получить новое сообщение изочереди. Можно постоянно держать систему слегка перегруженной это приведет ктому, что некоторые задачи-неудачники долго будут висеть навходе вкритическую секцию. Вцелом, утакого подхода есть потенциал, сним можно достигнуть хорошей производительности снебольшим тюнингом ипри определенных условиях.

Подведем ему некий итог:

Плюсы:

  1. Простой код

  2. Ресурсы машины используются эффективно

Минусы:

  1. Не полностью утилизируется канал во внешний сервис

Один семафор на всех

Подумаем, вкакую сторону двинуться теперь. Попробуем сделать семафор общим ресурсом для всех серверов. Так тоже без проблем необойдется будет нужно тратить время перед каждым запросом вовнешнюю систему наобращение ксервису-throttler-у. Ноадминистрировать такое, наверное, проще засостоянием одного семафора легче следить, ненадо подбирать лимит под каждую машину вотдельности. Какже сделать общий троттлер? Ну, конечноже вRedis.

Сточки зрения пользователя Redis однопоточный (онтак выглядит). Это круто, большая часть проблем сконкурентным доступом кнему сразу снимается.

ВRedis нет готового семафора, номожно построить его насортированных множествах.

Писать код для него будем наLua. Интерпретатор Lua встроен вRedis, такие скрипты выполняются одним махом, атомарно. Мымоглибы без этого обойтись, нотак код получится ещё сложнее, придется учитывать конкуренцию между серверами заотдельные команды.

Теперь подумаем, что писать. Учтём, что машина, взявшая блокировку, может никогда еёнеотпустить. Вдруг кто-то решит выключить рубильник или что-то пойдет непоплану вмомент обновления нашего приложения. Учтём также, что унас может быть несколько таких семафоров. Это пригодится, например, для разных внешних сервисов, API-ключей или если мызахотим выделить отдельный канал под приоритетные запросы.

Скрипт для Redis
--[[  KEYS[1] - Имя семафора ARGV[1] - Время жизни блокировки ARGV[2] - Идентификатор блокировки, чтобы её можно было возвратить ARGV[3] - Доступный объем семафора ]]--   -- Будем использовать команды с недетерминированным результатом,  -- Redis-у важно знать заранее redis.replicate_commands()local unix_time = redis.call('TIME')[1]   -- Удаляем блокировки с истёкшим TTL redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', unix_time - ARGV[1])   -- Получаем число элементов в множестве local count = redis.call('zcard', KEYS[1])   if count < tonumber(ARGV[3]) then-- добавляем блокировку в множество, если есть место  -- время будет являться ключем сортировки (для последующий чистки записей) redis.call('ZADD', KEYS[1], unix_time, ARGV[2])       -- Возвращаем число взятых блокировок (например, для логирования)    return count end   return nil

Чтобы освободить блокировку будем удалять элемент изсортированного множества. Это одна команда. Обойдемся без отдельного скрипта, многие клиенты вт.ч.идля .NET это умеют.

Подробнее о вариантах реализации блокировок с Redis и семафоров в частности можно посмотреть здесь.

Иногда внешний сервис ограничивает число обращений другим образом, например, разрешает делать неболее 1000 запросов вминуту. Вэтом случае вRedis можно завести счётчик сфиксированным временем жизни.

Вкоде нашего приложения сделаем оболочку вокруг написанного скрипта, некий класс, экземпляр которого заменит семафор изпервого варианта.

Код для приложения
public sealed class RedisSemaphore{private static readonly string AcquireScript = "...";private static readonly int TimeToLiveInSeconds = 300;private readonly Func<ConnectionMultiplexer> _redisFactory;public RedisSemaphore(Func<ConnectionMultiplexer> redisFactory){_redisFactory = redisFactory;}public async Task<LockHandler> AcquireAsync(string name, int limit){var handler = new LockHandler(this, name);do{var redisDb = _redisFactory().GetDatabase();var rawResult = await redisDb.ScriptEvaluateAsync(AcquireScript, new RedisKey[] { name },new RedisValue[] { TimeToLiveInSeconds, handler.Id, limit }).ConfigureAwait(false);var acquired = !rawResult.IsNull;if (acquired)break;await Task.Delay(10).ConfigureAwait(false);} while (true);return handler;}public async Task ReleaseAsync(LockHandler handler, string name){var redis = _redisFactory().GetDatabase();await redis.SortedSetRemoveAsync(name, handler.Id).ConfigureAwait(false);}}public sealed class LockHandler : IAsyncDisposable{private readonly RedisSemaphore _semaphore;private readonly string _name;public LockHandler(RedisSemaphore semaphore, string name){_semaphore = semaphore;_name = name;Id = Guid.NewGuid().ToString();}public string Id { get; }public async ValueTask DisposeAsync(){await _semaphore.ReleaseAsync(this, _name).ConfigureAwait(false);}}

Посмотрим, что получилось.

Плюсы:

  1. Просто конфигурировать лимит

  2. Канал используется эффективно

  3. Легко наблюдать за утилизацией канала

Минусы:

  1. Дополнительный элемент инфраструктуры

  2. Ещё одна точка отказа

  3. Накладные расходы на обращение к Redis-у

  4. Нетривиальный код

Если выиспользуете Redis впроекте, топочемубы несделать нанём иблокировки. Тащитьже его ксебе только ради этого уже нетак весело. Что-то подобное можно реализовать, используя базу данных, носкорее всего это добавит немало хлопот инакладных расходов. Второй минус лично мне не кажется очень существенным. Что ни говори, а отказывает Redis обычно не так уж часто, особенно, если это SaaS.

Легкость настройки такой блокировки это существенный плюс. Не надо все перенастраивать при каждом масштабировании, подбирать лимиты под машины. Можно следить за утилизацией канала в графане, это удобно.

Думаю, можно реализовать троттлинг исходящих запросов инауровне инфраструктуры, номне кажется удобным иметь вкоде контроль над состоянием блокировки. Кроме того, настроить это в чужих облаках наверняка будет непросто. Авам приходилось когда-нибудь ограничивать число исходящих запросов? Как делаете этовы?

Подробнее..

Как подружить Redis Cluster c Testcontainers?

20.06.2021 12:09:43 | Автор: admin
В 26-м выпуске NP-полного подкаста я рассказывал, что начал переводить один из своих сервисов из Redis Sentinel на Redis Cluster. На этой неделе я захотел потестировать данный код, и, конечно же, выбрал Testcontainers для этого. К сожалению, Redis Cluster в тестовых контейнерах не завелся из коробки, и мне пришлось вставить несколько костылей. О них и пойдет речь далее.



Вводные


Сначала я бы хотел описать все вводные, а потом рассказать про костыли. Мой проект построен на Spring Boot. Для взаимодействия с редисом используется Lettuce клиент. Для тестирования testcontainers-java с JUnit. Версия обоих редисов 6. В общем, всё типичное, нет ничего особенного с точки зрения стека.

Если кто-то еще не знаком с testcontainers, то пара слов о них. Это библиотека для интеграционного тестирования. Она построена на другой библиотеке https://github.com/docker-java/docker-java. Тестконтейнеры, по сути говоря, помогают быстро и просто запускать контейнеры с разными зависимостями в ваших интеграционных тестах. Обычно это базы данных, очереди и другие сложные системы. Некоторые люди используют testcontainers и для запуска своих сервисов, от которых зависит тестируемое приложение (чтобы тестировать микросервисное взаимодействие).

Про Redis Cluster


Redis Cluster это одна из нескольких реализаций распределнного режима Редиса. К сожалению, в Редисе нет единого правильного способа, как масштабировать базу. Есть Sentinel, есть Redis Cluster, а еще ребята активно разрабатывают RedisRaft распредеделенный редис на базе протокола консенсуса Raft (у них там своя реализация, которая, как они сами заявляют, не совсем каноничный Рафт, но конкретно для Redis самое то).

В целом, про Redis Cluster есть две замечательных статьи на официальном сайте https://redis.io/topics/cluster-tutorial и https://redis.io/topics/cluster-spec. Большинство деталей описано там.

Для использования Redis Cluster в testcontainers важно знать несколько вещей из документации. Во-первых, Redis Cluster использует gossip протокол поэтому каждый узел кластера имеет TCP-соединение со всеми другими узлами. Поэтому, между нодами должна быть сетевая связность, даже в тестах.

Вторая важная штука, которую надо знать при тестировании это наличие в Redis Cluster bootstrap узлов для конфигурации. То есть, вы в настройках можете задать лишь подмножество узлов, которые будут использоваться для старта приложения. В последствие, Redis клиент сам получит Топологию кластера через взаимодействие с Редисом. Исходя из этого, получается вторая особенность тестируемое приложение должно иметь сетевую связность с теми Redis URI, которые будут аннонсированы со стороны редис кластера (кстати, эти адреса можно сконфигурировать через cluster-announce-port и cluster-announce-ip).

Про костыли с Redis Cluster и testcontainers


Для тестирования я выбрал довольно популярный docker-образ https://github.com/Grokzen/docker-redis-cluster. Он не подходит для продакшена, но очень прост в использовании в тестах. Особенность этого образа все Редисы (а их 6 штук, по умолчанию 3 мастера и 3 слейва) будут подняты в рамках одного контейнера. Поэтому, мы автоматически получаем сетевую связность между узлами кластера из коробки. Осталось решить вторую из двух проблем, связанную с получением приложением топологии кластера.

Я не хотел собирать свой docker-образ, а выбранный мной image не предоставляет возможности задавать настройки cluster-announce-port и cluster-announce-ip. Поэтому, если ничего не делать дополнительно, при запуске тестов вы увидите примерно такие ошибки:

Unable to connect to [172.17.0.3/<unresolved>:7003]: connection timed out: /172.17.0.3:7003


Ошибка означает, что мы со стороны приложения пытаеся приконнектится к Узлу редис кластера, используя IP докер контейнера и внутренний порт (порт 7003 используется данным узлом, но наружу он отображается на какой-то случайный порт, который мы и должны использовать в нашем приложении; внутренний порт, по понятным причинам, не доступен из вне). Что касается данного IP-адреса он доступен для приложения, если это Linux, и он не доступен для приложения, если это MacOs/Windows (из-за особенностей реализации докера на этих ОС).

Решение проблемы (а-ка костыль) я собрал по частичкам из разных статей. А давайте сделаем NAT RedisURI на стороне приложения. Ведь это нужно именно для тестов, и тут не так страшно вставлять такой ужас. Решение, на самом деле, состоит из пары строк (огромное спасибо Спрингу и Lettuce, где можно сконфигурировать практически всё, только и успевай, как переопределять бины).

public SocketAddress resolve(RedisURI redisURI) {    Integer mappedPort = redisClusterNatPortMapping.get(redisURI.getPort());    if (mappedPort != null) {        SocketAddress socketAddress = redisClusterSocketAddresses.get(mappedPort);        if (socketAddress != null) {            return socketAddress;        }        redisURI.setPort(mappedPort);    }    redisURI.setHost(DockerClientFactory.instance().dockerHostIpAddress());    SocketAddress socketAddress = super.resolve(redisURI);    redisClusterSocketAddresses.putIfAbsent(redisURI.getPort(), socketAddress);    return socketAddress;}


Полный код выложен на гитхаб https://github.com/Hixon10/spring-redis-cluster-testcontainers.

Идея кода супер простая. Будем хранить две Map. В первой маппинг между внутренними портами редиса (7000..7005) и теми, что доступны для приложения (они могут быть чем-то типа 51343, 51344 и тд). Во-второй внешние порты (типа, 51343) и SocketAddress, полученный для них. Теперь, когда мы получаем от Редиса при обновлении топологии что-то типа 172.17.0.3:7003, мы сможем легко найти нужный внешний порт, по которому сможем найти SocketAddress и переиспользовать его. То есть, с портами проблема решена. А что с IP?

С IP-адресом всё просто. Тут нам на помощь приходят Тест контейнеры в которых есть утилитный метод DockerClientFactory.instance().dockerHostIpAddress(). Для MacOs/Windows он будет отдавать localhost, а для linux IP-адрес контейнера.

Выводы


Программирование это супер интересно, но это вы и без меня знали. А ещё, порой приходится вспомить, что было на первой лекции по сетям в универе, чтобы написать на своей любимой джавке пару новых интеграционных тестов. Приятно, когда знания из института пригождаются в самый неожиданный момент.
Подробнее..

Профилирование. Отслеживаем состояние боевого окружения с помощью Redis, ClickHouse и Grafana

14.07.2020 10:11:05 | Автор: admin

прим. latency/time.

Наверное перед каждым возникает задача профилирования кода в продакшене. С этой задачей хорошо справляется xhprof от Facebook. Вы профилируете, к примеру, 1/1000 запросов и видите картину на текущий момент. После каждого релиза прибегает продакт и говорит до релиза было лучше и быстрее. Исторических данных у вас нет и доказать вы ничего не можете. А что если бы могли?

Не так давно, переписывали проблемный участок кода и ожидали сильный прирост производительности. Написали юнит-тесты, провели нагрузочное тестирование, но как код поведет себя под живой нагрузкой? Ведь мы знаем, нагрузочное тестирование не всегда отображает реальные данные, а после деплоя необходимо быстро получить обратную связь от вашего кода. Если вы собираете данные, то, после релиза, вам достаточно 10-15 минут чтобы понять обстановку в боевом окружении.


прим. latency/time. (1) деплой, (2) откат

Стек


Для своей задачи мы взяли колоночную базу данных ClickHouse (сокр. кх). Скорость, линейная масштабируемость, сжатие данных и отсутствие deadlock стали главными причинами такого выбора. Сейчас это одна из основных баз в проекте.

В первой версии мы писали сообщения в очередь, а уже консьюмерами записывали в ClickHouse. Задержка достигала 3-4 часа (да, ClickHouse медленный на вставку по одной записи). Время шло и надо было что-то менять. Реагировать на оповещения с такой задержкой не было смысла. Тогда мы написали крон-команду, которая выбирала из очереди необходимое количество сообщений и отправляла пачку в базу, после, помечала их обработанными в очереди. Первые пару месяцев все было хорошо, пока и тут не начались в проблемы. Событий стало слишком много, начали появляться дубли данных в базе, очереди использовались не по-прямому назначению (стали базой данных), а крон-команда перестала справляться с записью в ClickHouse. За это время в проект добавилось ещё пара десятков таблиц, которые необходимо было писать пачками в кх. Скорость обработки упала. Необходимо было максимально простое и быстрое решение. Это подтолкнуло нас к написанию кода с помощью списков на redis. Идея такая: записываем сообщения в конец списка, крон-командой формируем пачку и отправляем её в очередь. Дальше консьюмеры разбирают очередь и записывают пачку сообщений в кх.

Имеем: ClickHouse, Redis и очередь (любую rabbitmq, kafka, beanstalkd)

Redis и списки


До определенного времени, Redis использовался как кэш, но всё меняется. База имеет огромный функционал, а для нашей задачи необходимы всего 3 команды: rpush, lrange и ltrim.

С помощью команды rpush будем записывать данные в конец списка. В крон-команде читать данные с помощью lrange и отправлять в очередь, если нам удалось отправить в очередь, то необходимо удалить выбранные данные с помощью ltrim.

От теории к практике. Создаем простой список.



У нас есть список из трех сообщений, добавим ещё немного



Новые сообщения добавляются в конец списка. С помощью команды lrange выбираем пачку (пусть будет =5 сообщений).



Далее пачку отправляем в очередь. Теперь необходимо удалить эту пачку из Redis, чтобы не отправить её повторно.



Алгоритм есть, приступим к реализации.

Реализация


Начнем с таблицы ClickHouse. Не стал сильно заморачиваться и определил всё в тип String.

create table profile_logs(    hostname   String, // хост бэкэнда, отправляющего событие    project    String, // название проекта    version    String, // версия фреймворка    userId     Nullable(String),    sessionId  Nullable(String),    requestId  String, // уникальная строка для всего запроса от клиента    requestIp  String, // ip клиента    eventName  String, // имя события    target     String, // URL    latency    Float32, // время выполнения (latency=endTime - beginTime)    memoryPeak Int32,    date       Date,    created    DateTime)    engine = MergeTree(date, (date, project, eventName), 8192);


Событие будет таким:
{  "hostname": "debian-fsn1-2",  "project": "habr",  "version": "7.19.1",  "userId": null,  "sessionId": "Vv6ahLm0ZMrpOIMCZeJKEU0CTukTGM3bz0XVrM70",  "requestId": "9c73b19b973ca460",  "requestIp": "46.229.168.146",  "eventName": "app:init",  "target": "/",  "latency": 0.01384348869323730,  "memoryPeak": 2097152,  "date": "2020-07-13",  "created": "2020-07-13 13:59:02"}


Структура определена. Чтобы посчитать latency нам нужен временной промежуток. Засекаем с помощью функции microtime:
$beginTime = microtime(true);// код который необходимо отслеживать$latency = microtime(true) - $beginTime;


Для упрощения реализации, будем использовать фреймворк laravel и библиотеку laravel-entry. Добавим модель (таблица profile_logs):
class ProfileLog extends \Bavix\Entry\Models\Entry{    protected $fillable = [        'hostname',        'project',        'version',        'userId',        'sessionId',        'requestId',        'requestIp',        'eventName',        'target',        'latency',        'memoryPeak',        'date',        'created',    ];    protected $casts = [        'date' => 'date:Y-m-d',        'created' => 'datetime:Y-m-d H:i:s',    ];}


Напишем метод tick (я сделал сервис ProfileLogService), который будет записывать сообщения в Redis. Получаем текущее время (наш beginTime) и записываем его в переменную $currentTime:
$currentTime = \microtime(true);


Если тик по событию вызван впервые, то записываем его в массив тиков и завершаем метод:
 if (empty($this->ticks[$eventName])) {    $this->ticks[$eventName] = $currentTime;    return;}


Если тик вызывается повторно, то мы записываем сообщение в Redis, с помощью метода rpush:
$tickTime = $this->ticks[$eventName];unset($this->ticks[$eventName]);Redis::rpush('events:profile_logs', \json_encode([    'hostname' => \gethostname(),    'project' => 'habr',    'version' => \app()->version(),    'userId' => Auth::id(),    'sessionId' => \session()->getId(),    'requestId' => \bin2hex(\random_bytes(8)),    'requestIp' => \request()->getClientIp(),    'eventName' => $eventName,    'target' => \request()->getRequestUri(),    'latency' => $currentTime - $tickTime,    'memoryPeak' => \memory_get_usage(true),    'date' => $tickTime,    'created' => $tickTime,]));


Переменая $this->ticks не статическая. Необходимо зарегистрировать сервис как singleton.

$this->app->singleton(ProfileLogService::class);


Размер пачки ($batchSize) можно сконфигурировать, рекомендуется указывать небольшое значние (например, 10,000 элементов). При возникновении проблем (к примеру, не доступен ClickHouse), очередь начнет уходить в failed, и вам необходимо отлаживать данные.

Напишем крон-команду:
$batchSize = 10000;$key = 'events:profile_logs'do {    $bulkData = Redis::lrange($key, 0, \max($batchSize - 1, 0));    $count = \count($bulkData);    if ($count) {        // все данные храним в json, необходимо применить decode        foreach ($bulkData as $itemKey => $itemValue) {            $bulkData[$itemKey] = \json_decode($itemValue, true);        }        // отправляем в очередь для записи в ch        \dispatch(new BulkWriter($bulkData));        // удаляем пачку из redis        Redis::ltrim($key, $count, -1);    }} while ($count >= $batchSize);


Можно сразу записывать данные в ClickHouse, но, проблема кроется в том, что крон работает в однопоточном режиме. Поэтому мы пойдем другим путем командой сформируем пачки и отправим их в очередь, для последующей многопоточной записи в ClickHouse. Количество консьюмеров можно регулировать это ускорит отправку сообщений.

Перейдем к написанию консьюмера:
class BulkWriter implements ShouldQueue{    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;    protected $bulkData;    public function __construct(array $bulkData)    {        $this->bulkData = $bulkData;    }    public function handle(): void    {            ProfileLog::insert($this->bulkData);        }    }}


Итак, формирование пачек, отправка в очередь и консьюмер разработаны можно приступать к профилированию:
app(ProfileLogService::class)->tick('post::paginate');$posts = Post::query()->paginate();$response = view('posts', \compact('posts'));app(ProfileLogService::class)->tick('post::paginate');return $response;


Если все сделано верно, то данные должны находиться в Redis. Запутим крон-команду и отправим пачки в очередь, а уже консьюмер вставит их в базу.



Данные в базе. Можно строить графики.

Grafana


Теперь перейдем к графическому представлению данных, что является ключевым элементом этой статьи. Необходимо установить grafana. Опустим процесс установки для debain-подобных сборок, можно воспользоваться ссылкой на документацию. Обычно, этап установки сводится к apt install grafana.

На ArchLinux установка выглядит следующим образом:
yaourt -S grafanasudo systemctl start grafana


Сервис запустился. URL: http://localhost:3000

Теперь необходимо установить ClickHouse datasource plugin:
sudo grafana-cli plugins install vertamedia-clickhouse-datasource


Если установили grafana 7+, то ClickHouse работать не будет. Нужно внести изменения в конфигурацию:
sudo vi /etc/grafana.ini


Найдем строку:
;allow_loading_unsigned_plugins =


Заменим её на эту:
allow_loading_unsigned_plugins=vertamedia-clickhouse-datasource


Сохраним и перезапустим сервис:
sudo systemctl restart grafana


Готово. Теперь можем перейти в grafana.
Логин: admin / пароль: admin по умолчанию.



После успешной авторизации, нажмем на шестеренку. В открывшемся popup-окне выберем на Data Sources, добавим соединение с ClickHouse.



Заполняем конфигурацию кх. Нажимаем на кнопку Save & Test, получаем сообщение об успешном соединении.

Теперь добавим новый dashboard:


Добавим панель:


Выберем базу и соответствующие колонки для работы с датами:


Перейдем к запросу:


Получили график, но хочется конкретики. Давайте выведем средний latency с округлением даты-с-временем вниз до начала пятиминутного интервала:


Теперь на графике отображаются выбранные данные, можем ориентироваться на них. Для оповещений настроить триггеры, группировать по события и многое другое.


Профилировщик, ни в коем случае, не является заменой инструментам: xhprof (facebook), xhprof (tideways), liveprof от (Badoo). А только дополняет их.

Весь исходный код находится на github модель профилировщика, сервис, BulkWriteCommand, BulkWriterJob и middleware (1, 2).

Установка пакета:
composer req bavix/laravel-prof


Настройка соединений (config/database.php), добавляем clickhouse:
'bavix::clickhouse' => [    'driver' => 'bavix::clickhouse',    'host' => env('CH_HOST'),    'port' => env('CH_PORT'),    'database' => env('CH_DATABASE'),    'username' => env('CH_USERNAME'),    'password' => env('CH_PASSWORD'),],


Начало работы:
use Bavix\Prof\Services\ProfileLogService;// ...app(ProfileLogService::class)->tick('event-name');// кодapp(ProfileLogService::class)->tick('event-name');


Для отправки пачки в очередь нужно добавить команду в cron:
* * * * * php /var/www/site.com/artisan entry:bulk


Также необходимо запустить консьюмер:
php artisan queue:work --sleep=3 --tries=3

Рекомендуется настроить supervisor. Конфиг (5 консьюмеров):
[program:bulk_write]process_name=%(program_name)s_%(process_num)02dcommand=php /var/www/site.com/artisan queue:work --sleep=3 --tries=3autostart=trueautorestart=trueuser=www-datanumprocs=5redirect_stderr=truestopwaitsecs=3600


UPD:
1. ClickHouse нативно умеет тянуть данные из очереди kafka. Спасибо, sdm
Подробнее..

Паспортный контроль, или Как сжать полтора гигабайта до 42 мегабайт

05.02.2021 14:16:53 | Автор: admin

Однажды, в качестве тестового задания на позицию PHP разработчика была предложена задача реализации сервиса проверки номеров паспортов граждан РФ на предмет нахождения в списке недействительных. Текст задания был лаконичным: Пользовательская база 10 миллионов, время ответа 1 миллисекунда, аптайм 99%.

Входные данные

Для начала посмотрим, в каком виде представлены записи в списке недействительных паспортов. На сайте МВД РФ можно скачать bzip2-архив размером около 460 МБ, внутри которого CSV-файл с двумя колонками PASSP_SERIES,PASSP_NUMBER. Размер распакованного файла примерно 1.5 ГБ. Всего в списке около 130 миллионов записей. Стоит отметить, что не все записи в файле имеют правильный формат номер серии из 4 цифр и номер паспорта из 9 цифр. Встречаются буквенные серии, номера из 5 и меньше цифр, либо номера с символами 3,+,] артефакты распознавания. Итого около 10 тыс. записей имеют неправильный формат. Их можно игнорировать при условии проверки входных данных будущего сервиса не пытаться искать в списке заведомо неправильные номера.

Способ хранения

Требуемое время ответа сервиса в 1 мс накладывает ограничение на возможные способы хранения и поиска по списку. Данные нужно проиндексировать, а индекс хранить в оперативной памяти.

Первое очевидное решение создать таблицу в SQL базе данных с индексом по двум колонкам. В качестве индекса под условия задачи больше подойдет Hash Table со средней сложностью поиска O(1) против O(log n) для B-Tree индекса. Но у такого подхода есть существенный минус избыточность хранимых данных. Например, MEMORY таблица в MySQL занимает 5 ГБ (2 ГБ данные и 3 ГБ индекс).

Для решения исходной задачи необходим только факт наличия или отсутствия записи в списке и не обязательно хранить саму запись. Закодируем серию и номер бинарными значениями: 1 паспорт присутствует в списке, 0 отсутствует. Для всего множества возможных серий и номеров потребуется 9999 х 999999 ~ 10^10 бит ~ 1.25ГБ. Это сопоставимо с размером исходного файла, но уже с поиском за O(1). Но всё множество хранить не обязательно. Заметим, что в исходном списке около 3 тысяч уникальных серий, их можно сделать ключом для секционирования записей хранить номера паспортов с одинаковой серией в одном битовом массиве. Номер паспорта будет соответствовать отступу в массиве. Длина массива будет зависеть от максимального номера в серии. Другими словами, если в серии 3382 встречается только один паспорт с номером 000032, то для всей серии потребуется 4 байта. Однако, если в этой же серии будет ещё паспорт с номером 524288, то размер вырастет до 64 килобайт, при этом почти весь массив будет состоять из нулей. Проанализировав распределение максимальных номеров в сериях, можно приблизительно рассчитать требуемый объем памяти. 3389 серий, среднее максимальное значение номера паспорта 567750. Что даст около 229 мегабайт (в Redis полный список занял 236 МБ). Таким образом мы получим возможность за константное время проверить, присутствует ли конкретный паспорт в списке недействительных, используя объем памяти, в два раза меньший исходного bzip2-архива.

Еще большей экономии можно добиться, воспользовавшись методом сжатия разреженных битовых массивов, например, библиотекой Roaring Bitmaps. Рассчитать занимаемый объем в таком случае уже сложнее, поэтому воспользуемся эмпирическими данными, загрузив список в сервер Pilosa. В итоге получим 42 мегабайта.

Реализация

Соблюдая баланс между эффективным и простым, выберем в качестве хранилища Redis. Используем команды SETBIT/GETBIT для работы с бинарными строками, в качестве имени ключа возьмем серию паспорта, номер паспорта отступ в строке. Чтобы упростить процесс обновления, новый список будем загружать во временную логическую базу Redis-а, а после окончания поменяем местами с активной (команда SWAPDB).

Архив со списком на сервере МВД РФ обновляется раз в сутки. С помощью HTTP запроса HEAD и заголовка ответа Last-Modified можно узнать время последнего обновления и не загружать большой файл без необходимости. Сам файл можно распаковывать и загружать в Redis в потоковом режиме, не сохраняя на диск и используя фильтр потока 'bzip2.decompress'.

Проверку паспортов на вхождение в список недействительных будем осуществлять в пакетном режиме, принимая до 500 номеров в одном запросе. Это позволит проверить всю базу 10 миллионов пользователей при 8 параллельных потоках меньше чем за 5 секунд.

Развёртывание

Осталось выполнить последнее требование задания аптайм 99%. Это означает, что сервис может быть недоступен 3,5 дня в течение года, либо по 14 минут каждый день. Такой доступности можно добиться, разместив сервис у провайдера с соответствующим SLA, добавив репликацию и балансировку.

Следуя современным методикам развёртывание приложений, упакуем сервис в контейнер Docker.

Исходный код

Сервис реализован на PHP 8.0 с использованием библиотек Guzzle, PHP-DI, Workerman.
Исходный код доступен в репозитории https://github.com/maurokouti/passport-control/.

Подробнее..

Red Hat Flatpak, DevNation Day, шпаргалка по программированию на Cи и пять вебинаров на русском

27.08.2020 16:05:58 | Автор: admin


Полезные ссылки на живые мероприятия, видео, митапы, техтолки и книги ниже в нашем еженедельном посте.

Начни новое:



Качай:


  • Шпаргалка по языку программирования C
    C это классика компилируемых языков программирования, концептуальный предок Lua, C++, Java, Go и многих других современных языков, ну и просто отличный выбор, чтобы начать учиться программированию. Эта шпаргалка содержит полезную выжимку по синтаксису C.

Строй:



Событие сентября присоединяйтесь!




15 сентября состоится DevNation Day абсолютно бесплатная виртуальная конференция по новым компьютерным технологиям и защите компьютерным программ (тм) ну то есть вопросам разработки и технологий. В это году в центре внимания 4 темы: Kubernetes/OpenShift, JavaScript, Python и Java.

Помимо экспертов Red Hat выступят представители Google, MongoDB, Redis, Snyk, Tail, Auth0, Ionic и многих других ведущих компаний. Никуда ехать не надо сидите (или лежите) там, где вам удобно, смотрите-слушайте и общайтесь с докладчиками через онлайновые опросы и чаты.

Пообщаться:



По-русски:


Мы начинаем серию пятничных вебинаров про нативный опыт использования Red Hat OpenShift Container Platform и Kubernetes. Регистрируйтесь и приходите:

Подробнее..

Tarantool vs Redis что умеют in-memory технологии

01.04.2021 18:23:17 | Автор: admin

В этой статье я хочу сравнить Redis и Tarantool. У меня нет цели сделать громогласный вывод Tarantool лучше! или Redis круче!. Я хочу понять их сходства и отличия, разобраться, для каких задач какую технологию выбрать. Потому что это очень близкие на первый взгляд вещи, и вопросы про их отличия я вижу часто.

Для этого мы посмотрим на технологии в трёх частях:

  • Вначале посмотрим глазами новичка. Что такое БД в памяти? Какие задачи они решают лучше дисковых БД?
  • Потом посмотрим архитектурно. Как обстоит вопрос с производительностью, надёжностью, масштабированием?
  • В третьей части лезем в технические вещи поглубже. Типы данных, итераторы, индексы, транзакции, ЯП, репликация, коннекторы.

Смело переходите сразу к наиболее интересной вам части. Или даже сразу к итоговой табличке сравнения, которую я прикладываю в заключении.

Поехали!

Содержание


  1. Вводная часть
    • Что такое БД в памяти
    • Зачем нужны решения в памяти
    • Что такое Redis
    • Что такое Tarantool
  2. Архитектурная часть
    • Производительность
    • Надёжность
    • Масштабируемость
    • Валидация схемы данных
  3. Технические особенности
    • Какие типы данных можно хранить
    • Вытеснение данных
    • Итерация по ключам
    • Вторичные индексы
    • Транзакции
    • Персистентность
    • Язык программирования для хранимых процедур
    • Репликация
    • Коннекторы из других языков программирования
    • Под какие задачи плохо подходят
    • Экосистема
    • Чем Redis лучше
    • Чем Tarantool лучше
  4. Вывод
  5. Ссылки

1. Вводная часть


Что такое БД в памяти


Redis и Tarantool это in-memory технологии. Их ещё называют резидентными БД, но я буду писать короче в памяти или in-memory. Так что такое БД в памяти?

Это база, которая весь объём данных хранит целиком в оперативной памяти. Размер такой базы лимитирован ёмкостью оперативной памяти узла, что может ограничивать нас в количестве данных, но увеличивает скорость на порядок.

Если данных слишком много, БД в памяти способны сохранять данные на диск. Можно перезагрузить узел и не потерять информацию. Стереотип про ненадёжность БД в памяти сильно устарел, их можно использовать как основное хранилище в production. Например, Mail.ru Cloud Solutions использует Tarantool как основную БД для хранения метаинформации в своём объектном хранилище [1].

БД в памяти нужны для высокой скорости доступа к данным, условно от 10 000 запросов в секунду. Например, запросы к ленте новостей Кинопоиска в день релиза Снайдерката Лиги Справедливости, Яндекс.Маркет перед Новым Годом или Delivery Club вечером в пятницу.

Зачем нужны решения в памяти


Кеш. БД в памяти традиционно используют как кеш для более медленных баз данных. Это логично, память быстрее диска (даже SSD). Но в жизненном цикле любого кеша случаются перезагрузки, падения, сетевая недоступность, нехватка памяти и прочие инфраструктурные беды.

С течением времени кеши стали уметь в персистентность, резервирование и шардирование.

  • Персистентность кеши сохраняют данные на диск. После перезагрузки восстанавливаем своё состояние без обращений к основному хранилищу. Если этого не делать, то обращение к холодному кешу будет очень долгим или даже положит основную БД.
  • Резервирование кеши содержат функции репликации данных. Если упал один узел, то второй возьмёт на себя запросы. Основное хранилище не упадёт от перегруза, нас спасает резервная нода.
  • Шардирование если горячие данные не помещаются в оперативную память одного узла, мы используем несколько узлов параллельно. Это горизонтальное масштабирование.

Шардирование это большая система. Резервирование это надёжная система. Вместе с персистентностью получается кластерное хранилище данных. Можно положить туда терабайты информации и крутить их на скорости 1 000 000 RPS.


OLTP. Расшифровывается как Online Transaction Processing, обработка транзакций в реальном времени. In-memory решения подходят для такого типа задач благодаря своей архитектуре. OLTP это большое количество коротких on-line транзакций (INSERT, UPDATE, DELETE). Главное в OLTP-системах быстро обработать запросы и обеспечить целостность данных. Эффективность чаще всего определяется количеством RPS.

Что такое Redis


  • Redis называет себя in-memory хранилище структур данных.
  • Redis это key-value.
  • Больше всего известен по кешированию дисковых баз. Если вы будете искать по ключевым словам кеширование баз данных, то в каждой статье найдёте упоминания Redis.
  • Redis поддерживает первичные индексы, не поддерживает вторичные.
  • Redis содержит в себе механизм хранимых процедур на Lua.

Что такое Tarantool


  • Tarantool называет себя платформа для in-memory вычислений.
  • Tarantool умеет в key-value. А еще в документы и реляционную модель данных.
  • Создан для горячих данных кеширования MySQL в соцсети. С течением времени развился в полноценную базу данных.
  • Tarantool может строить произвольное количество индексов по данным.
  • В Tarantool тоже можно написать хранимую процедуру и тоже на Lua.

Разобрались с основами, давайте переходить на следующий уровень.

2. Архитектурная часть


Производительность


Это самый любимый запрос про БД в памяти а насколько вы быстрые? Сколько миллионов РПС можно снять с одного ядра? Проведём простой синтетический тест, в нём максимально приблизим настройки баз данных. Скрипт на Go наполняет хранилище случайными ключами со случайными значениями.

MacBook Pro 2,9 GHz Quad-Core Intel Core i7Redis version=6.0.9, bits=64Tarantool 2.6.2

Redis

redis_test.gopackage mainimport (    "context"    "fmt"    "log"    "math/rand"    "testing"    "github.com/go-redis/redis")func BenchmarkSetRandomRedisParallel(b *testing.B) {    client2 := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379", Password: "", DB: 0})    if _, err := client2.Ping(context.Background()).Result(); err != nil {        log.Fatal(err)    }    b.RunParallel(func(pb *testing.PB) {        for pb.Next() {            key := fmt.Sprintf("bench-%d", rand.Int31())            _, err := client2.Set(context.Background(), key, rand.Int31(), 0).Result()            if err != nil {                b.Fatal(err)            }        }    })}

Tarantool

tarantool>box.cfg{listen='127.0.0.1:3301', wal_mode='none', memtx_memory=2*1024*1024*1024}box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists=true,})box.schema.space.create('kv', {if_not_exists=true,})box.space.kv:create_index('pkey', {type='TREE', parts={{field=1, type='str'}},                                   if_not_exists=true,})

tarantool_test.gopackage mainimport (    "fmt"    "math/rand"    "testing"    "github.com/tarantool/go-tarantool")type Tuple struct {    _msgpack struct{} `msgpack:",asArray"`    Key      string    Value    int32}func BenchmarkSetRandomTntParallel(b *testing.B) {    opts := tarantool.Opts{        User: "guest",    }    pconn2, err := tarantool.Connect("127.0.0.1:3301", opts)    if err != nil {        b.Fatal(err)    }    b.RunParallel(func(pb *testing.PB) {        var tuple Tuple        for pb.Next() {            tuple.Key = fmt.Sprintf("bench-%d", rand.Int31())            tuple.Value = rand.Int31()            _, err := pconn2.Replace("kv", tuple)            if err != nil {                b.Fatal(err)            }        }    })}

Запуск Чтобы полностью прогрузить базы данных, используем больше потоков.

go test -cpu 12 -test.bench . -test.benchtime 10sgoos: darwingoarch: amd64BenchmarkSetRandomRedisParallel-12          929368         15839 ns/opBenchmarkSetRandomTntParallel-12            972978         12749 ns/op

Результаты. Среднее время запроса к Redis составило 15 микросекунд, к Tarantool 12 микросекунд. Это даёт Redis 63 135 RPS, Tarantool 78 437 RPS.

Тест нужен, чтобы показать уровень производительности БД в памяти, а не для замера, кто быстрее. Каждый из вас может измерить так, что быстрее окажется нужный вариант, я это тоже понимаю.


Надёжность


Для надёжности хранения данных используют две основные техники:

  • Персистентность. При перезагрузке БД загрузит свои данные с диска, не будет запросов в сторонние системы.
  • Репликация. Если упал один узел, то есть копия на втором. Бывает асинхронная и синхронная.

И Redis, и Tarantool содержат эти функции. Технические подробности мы рассмотрим далее.

Масштабируемость


Масштабирование может рассматриваться для двух задач:

  • Зарезервировать дополнительные узлы, которые могут заменять друг друга в случае, если сосед вышел из строя.
  • Данные не помещаются на один узел, и их необходимо распределить на несколько.

Redis

Узлы Redis можно соединить друг с другом асинхронной репликацией. Такие узлы будем называть репликационной группой, или replica set. Управлением такой репликационной группой занимается Redis Sentinel.

Redis Sentinel это один или несколько объединенных в кластер специальных процессов, которые следят за узлами Redis. Они выполняют четыре основные задачи:

  • Мониторинг узлов в группе: живой или мертвый.
  • Уведомление администратора или какой-то системы, если что-то случилось в группе.
  • Автоматическое переключение мастера.
  • Провайдер конфигурации для внешних клиентов, чтобы они знали, к кому подключиться.

В случае, когда данные необходимо расшардировать на несколько узлов, Redis предлагает open source-версию Redis Cluster. Она позволяет построить кластер, состоящий из нескольких репликационных групп. Данные в кластере шардируются по 16 384 слотам. Диапазоны слотов распределяются между узлами Redis.

Узлы в кластере общаются по отдельному открытому порту, чтобы понимать состояния соседей. Приложение при работе с Redis Cluster должно использовать специальный коннектор.

Tarantool

Tarantool также содержит в себе оба механизма масштабирования: репликацию и шардирование. Основной инструмент управления масштабированием Tarantool Cartridge. Он объединяет узлы в репликационные группы. В этой ситуации вы можете построить одну такую репликационную группу и использовать её аналогично Redis Sentinel. Tarantool Cartridge может управлять несколькими репликационными группами и шардировать данные между ними. Шардирование выполняется с помощью библиотеки vshard.

Различия

Администрирование

  • Администрирование Redis Cluster с помощью скриптов и команд.
  • В Tarantool Cartridge администрирование с помощью web-интерфейса или через API.

Корзины шардирования

  • Количество корзин шардирования в Redis фиксированное, 16 тыс.
  • Количество корзин шардирования Tarantool Cartridge (vshard) произвольное. Указывается один раз при создании кластера.

Ребалансировка корзин (решардинг)

  • В Redis Cluster настройка и запуск вручную.
  • В Tarantool Cartridge (vshard) автоматически.

Маршрутизация запросов

  • Маршрутизация запросов в Redis Cluster происходит на стороне клиентского приложения.
  • В Tarantool Cartridge маршрутизация запросов происходит на узлах-роутерах кластера.

Инфраструктура

  • Tarantool Cartridge также содержит:

    • механизм map/reduce запросов;
    • утилиту по упаковке приложения в пакеты rpm, dep и tar.gz;
    • Аnsible-роль для автоматического развёртывания приложения;
    • экспорт параметров мониторинга кластера.


Валидация схемы данных


В Redis основная схема данных ключ-значение. Но в значениях могут быть разные структуры. На стороне сервера нет механизма для задания правил. Мы не можем указать, в каком ключе какой тип данных должен использоваться и какая именно структура должна быть у значения. Валидацией схемы должен заниматься или коннектор, или клиентское приложение.

В Tarantool на стороне сервера можно использовать валидацию по схеме данных:

  • с помощью встроенной валидации box.space.format, которая затрагивает только верхний уровень полей;
  • с помощью установленного расширения avro-schema.

3. Технические особенности


Какие типы данных можно хранить


В Redis ключом может быть только строка. В Redis можно хранить и манипулировать следующими типами данных:

  • строки;
  • списки строк;
  • неупорядоченные множества строк;
  • хешмапы или просто строковые пары ключ-значение;
  • упорядоченные множества строк;
  • Bitmap и HyperLogLog.

В Tarantool можно хранить и манипулировать следующими типами данных:

  • Атомарными:
    • строки;
    • логический тип (истина, ложь);
    • целочисленный;
    • с плавающей запятой;
    • с десятичной плавающей запятой;
    • UUID.

  • Комплексными:
    • массивы;
    • хешмапы.


Типы данных Redis лучше подходят для счётчиков событий, в том числе уникальных, для хранения небольших готовых витрин данных. А типы данных Tarantool лучше подходят для хранения объектов и/или документов, как в SQL и NoSQL СУБД.

Вытеснение данных


Redis и Tarantool содержат в себе механизм ограничения занимаемой памяти. Когда клиент попытается добавить ещё данные, когда лимит уже был исчерпан, базы ответят ошибкой. И Redis, и Tarantool в этой ситуации продолжат выполнять запросы на чтение.

Перейдём к другому механизму, когда мы можем настроить алгоритм удаления больше ненужных данных. Redis содержит в себе несколько механизмов вытеснения:

  • TTL вытеснение объектов по завершении срока жизни;
  • LRU вытеснение давно использованных данных;
  • RANDOM вытеснение случайно попавшихся под руку объектов;
  • LFU вытеснение редко используемых данных.

Все механизмы могут быть настроены либо на весь объем данных, либо только на те объекты, которые помечены как вытесняемые.

В Tarantool для вытеснения данных можно использовать расширения expirationd или indexpiration, или создать собственную фоновую процедуру, которая будет проходить по вторичному индексу (с таймштампом) и удалять ненужные данные.

Итерация по ключам


В Redis можно это сделать с помощью операторов:

  • SCAN;
  • итерация по ключам.

Операции возвращают страницы с результатами. Для получения каждой новой страницы, необходимо передать идентификатор предыдущей. Операции поддерживают фильтрацию по шаблону. Для этого используется параметр MATCH. Фильтрация происходит на момент выдачи страницы, поэтому некоторые страницы могут оказаться пустыми. Это не будет означать, что страниц больше не осталось.

В Tarantool доступна гибкая схема итерации по ключам. Можно итерироваться в прямом и обратном направлении. В процессе можно дополнительно фильтровать значения. Можно сместиться на определённое значение ключа, затем проходить по следующим ключам в сторону возрастания или убывания. Направление прохода на лету менять нельзя.

Например:

results = {}for _, tuple in box.space.pairs('key', 'GE') do    if tuple['value'] > 10 then        table.insert(results, tuple)  endendreturn results

Вторичные индексы


Redis

У Redis нет вторичных индексов. Есть некоторые трюки, чтобы их имитировать:

  • В упорядоченных множествах можно использовать порядковый номер элемента как вторичный ключ.
  • Использовать хешмапы, ключ которых является, в некотором смысле, индексом данных.

Tarantool

В Tarantool можно строить произвольное количество вторичных индексов для данных:

  • Вторичные ключи могут состоять из нескольких полей.
  • Для вторичных индексов можно использовать типы HASH, TREE, RTREE, BITSET.
  • Вторичные индексы могут содержать уникальные и не уникальные ключи.
  • У любых индексов можно использовать настройки локали, например, для регистронезависимых строковых значений.
  • Вторичные индексы могут строиться по полям с массивом значений (иногда их называют мультииндексы).

Вывод

Вторичные ключи и удобные итераторы позволяют строить в Tarantool реляционные модели хранения данных. В Redis такую модель построить невозможно.

Транзакции


Механизм транзакций позволяет выполнить несколько операций атомарно. И Redis, и Tarantool поддерживают транзакции. Пример транзакции в Redis:

> MULTIOK> INCR fooQUEUED> INCR barQUEUED> EXEC1) (integer) 12) (integer) 1

Пример транзакции в Tarantool:

do  box.begin()  box.space.kv:update('foo', {{'+', 'value', 1}})  box.space.kv:update('bar', {{'+', 'value', 1}})  box.commit()end

Персистентность


Персистентность данных обеспечивается двумя механизмами:

  • периодическим сбросом in-memory данных на диск snapshoting;
  • последовательной упреждающей записью всех приходящих операций в файл transaction journal.

И Redis, и Tarantool содержат оба механизма персистентности.

Redis

Redis периодически сбрасывает все данные из памяти на диск. Происходит это по-умолчанию каждые 60 секунд (настраивается). Redis использует механизм ОС fork для копирования текущих данных в памяти, затем информация сохраняется на диск. Если происходит аварийное завершение, то Redis восстановит состояние из последнего сохранения. Если последний снапшот был сделан давно, то данные, пришедшие после снапшота, будут утеряны.

Журнал операций используется для сохранения всей приходящей в базу информации. Каждая операция сохраняется в журнал на диске. Так, при запуске Redis восстанавливает своё состояние из снапшота и затем донакатывает оставшиеся транзакции из журнала.

  • Снапшот в Redis называется RDB (redis database).
  • Журнал операций в Redis называется AOF (append only file).

Tarantool

  • Механизм персистентности взят из архитектур баз данных.
  • Он является целостным снапшоты и журналирование.
  • Этот же механизм позволяет существовать надежной WAL-based репликации.

Tarantool периодически сохраняет текущие in-memory данные на диск и записывает каждую операцию в журнал.

  • Снапшот в Tarantool называется snap (snapshot). Можно делать с произвольной частотой.
  • Журнал транзакций в Tarantool называется WAL (write ahead log).

И в Redis, и в Tarantool каждый из механизмов может быть выключен. Для надёжного хранения данных оба механизма надо включить. Для максимального быстродействия можно отключить снапшотинг и журналирование, заплатив персистентностью. Слабоумие и отвага!

Различия

Для снапшотинга в Redis используется механизм ОС fork. Tarantool использует внутренний readview всех данных, это работает быстрее чем fork.

В Redis по умолчанию включён только снапшотинг. В Tarantool включён снапшотинг и журнал.

Redis хранит и использует только по одному файлу для снапшотов и журнала операций. Tarantool по-умолчанию хранит два файла снапшотов (можно настроить и больше) и консистентно дополняющее неограниченное количество журналов операций. При повреждении снапшот-файла Tarantool сможет загрузиться из предыдущего. Для Redis необходимо наладить механизм бекапов.

В Tarantool, в отличие от Redis, снапшоты и журналы образуют единый механизм отображения данных в файловой системе. Это значит, что в Tarantool и в файлах снапшотов и в журналах хранится полная метаинформация о транзакции, кто её сделал и когда. Она одного формата и взаимодополняющая.

Troubleshooting

Если повреждён файл журнала в Redis:

redis-check-aof --fix

Если повреждён файл журнала в Tarantool:

tarantool> box.cfg{force_recovery=true}

Язык программирования для хранимых процедур


Хранимые процедуры это код, выполняющийся рядом с данными. И Redis, и Tarantool предлагают Lua для создания хранимок. С точки зрения пользователя это очень простой язык. Он создавался для людей, для которых программирование будет инструментом решения задач в предметной области.

C точки зрения разработчика базы данных:

  • Lua это язык, который легко встраивается в существующее приложение.
  • Он просто интегрируется с объектами и процессами приложения.
  • Lua имеет динамическую типизацию и автоматическое управление памятью.
  • Язык имеет сборщик мусора incremental Mark&Sweep.

Различия

Реализация

  • В Redis используется ванильная реализация PUC-Rio.
  • В Tarantool используется LuaJIT.

Таймаут задач

  • В Redis можно задать таймаут, после которого выполнение хранимой процедуры прервётся.
  • В Tarantool хранимые процедуры компилируются и выполняются быстрее, но в этом механизме нет возможности выставить таймаут. Для прерывания хранимой процедуры пользователь должен предусмотреть механизм проверки флага прерывания.

Runtime

  • В Redis используется однозадачность: задачи выполняются по одной и целиком.
  • В Tarantool используется кооперативная многозадачность. Задачи выполняются по одной, но при этом задача отдаёт управление на операциях ввода-вывода или явно с помощью yield.

Вывод

  • В Redis Lua это просто хранимые процедуры.
  • В Tarantool это кооперативный runtime, в котором можно взаимодействовать со внешними системами.

Репликация


Репликация это механизм копирования объектов с одного узла на другой. Бывает асинхронная и синхронная.

  • Асинхронная репликация: при вставке объекта на один узел мы не дожидаемся, когда этот же объект будет отреплицирован на второй узел.
  • Синхронная репликация: при вставке объекта мы дожидаемся, когда он будет сохранён на первом и втором узлах.

И Redis, и Tarantool поддерживают асинхронную репликацию. Только Tarantool умеет в синхронную репликацию.

На практике бывают ситуации, когда мы хотим дождаться репликации объекта. И в Redis, и в Tarantool есть способы для этого:

  • В Redis это команда wait. Она принимает два параметра:
    • сколько реплик должны получить объект;
    • сколько ждать, пока это произойдёт.

  • В Tarantool это можно сделать фрагментом кода:

псевдокод:

local netbox = require('net.box')local replica = netbox.connect(...)local replica_vclock, err = replica.eval([[    return box.info().vclock]])while not vclock_compare(box.info().vclock, replica_vclock) do    fiber.sleep(0.1)end

Синхронная репликация

В Redis нет синхронной репликации. Начиная с Tarantool 2.6 синхронная репликация доступна [2].

Коннекторы из других языков программирования


И Redis, и Tarantool поддерживают коннекторы для популярных языков программирования:

  • Go;
  • Python;
  • NodeJS;
  • Java.

Полные списки:


Под какие задачи плохо подходят


И Redis, и Tarantool плохо подходят для решения OLAP-задач. Online analytical processing имеет дело с историческими или архивными данными. OLAP характеризуется относительно низким объёмом транзакций. Запросы часто очень сложны и включают агрегацию.

В обоих случаях данные хранятся построчно, и это снижает эффективность алгоритмов агрегации в сравнении с базами с колоночным хранением.

Redis и Tarantool однопоточные базы данных, что не позволяет распараллелить аналитические запросы.

Экосистема


Redis

Модули Redis представлены в трёх категориях:

  • Enterprise;
  • проверенные и сертифицированные для Enterprise и Open source;
  • непроверенные.

Enterprise-модули:

  • полнотекстовый поиск;
  • хранение и поиск по bloom-фильтрам;
  • хранение временных рядов.

Сертифицированные:

  • хранение графов и запросы к ним;
  • хранение JSON и запросы к нему;
  • хранение и работа с моделями машинного обучения.

Все модули, отсортированные по количеству звёзд на Github: https://redis.io/modules

Tarantool

Модули представлены в двух категориях:


Чем Redis лучше


  • Проще.
  • В интернете представлено больше информации, 20 тыс. вопросов на Stackoverflow (из них 7 тыс. без ответов).
  • Ниже порог входа.
  • Как следствие, проще найти людей, которые умеют работать с Redis.

Чем Tarantool лучше


  • Русскоязычная бесплатная поддержка в Telegram от разработчиков.
  • Есть вторичные индексы.
  • Есть итерация по индексу.
  • Есть UI для администрирования кластера.
  • Предлагает механику сервера приложений с кооперативной многозадачностью. Эта механика похожа на однопоточный Go.

4. Вывод


Redis это классный продвинутый кеш, но его нельзя брать как основное хранилище. Tarantool это мультипарадигменная база данных, можно брать как основное хранилище. Tarantool поддерживает:

  • Реляционную модель хранения с SQL.
  • Распределённое NoSQL-хранилище.
  • Создание продвинутых кешей.
  • Создание брокера очередей.

У Redis ниже порог входа. У Tarantool выше потолок в production.

Сравнение одной таблицей:

Redis Tarantool
Описание Продвинутый кэш в памяти. Мультипарадигменная СУБД с сервером приложений.
Модель данных Key-value Key-value, документы, реляционная
Сайт redis.io www.tarantool.io
Документация redis.io/%C2%ADdocumentation www.tarantool.io/ru/doc/latest
Разработчик Salvatore Sanfilippo, Redis Labs mail.ru Group
Текущий релиз 6.2 2.6
Лицензия The 3-Clause BSD License The 2-Clause BSD License
Язык реализации C C, C++
Поддерживаемые ОС BSD, Linux, MacOS, Win BSD, Linux, MacOS
Схема данных Key-value Гибкая
Вторичные индексы Нет Есть
Поддержка SQL Нет Для одного инстанса, ANSI SQL
Foreign keys Нет Есть, с помощью SQL
Триггеры Нет Есть
Транзакции Оптимистичные блокировки, атомарное выполнение. ACID, read commited
Масштабирование Шардинг по фиксированному диапазону. Шардинг по настраиваемому количеству виртуальных бакетов.
Многозадачность Да, сериализация сервером. Да, кооперативная многозадачность.
Персистентность Снапшоты и журналирование. Снапшоты и журналирование.
Концепция консистентности Eventual ConsistencyStrong eventual consistency with CRDTs Immediate Consistency
API Проприетарный протокол. Свой открытый бинарный протокол.
Язык скриптов Lua Lua
Поддерживаемые языки C
C#, C++, Clojure, Crystal, D, Dart, Elixir, Erlang, Fancy, Go, Haskell, Haxe, Java, JavaScript (Node.js), Lisp, Lua, MatLab, Objective-C, OCaml, Pascal, Perl, PHP, Prolog, Pure Data, Python, R, Rebol, Ruby, Rust, Scala, Scheme, Smalltalk, Swift, Tcl, Visual Basic
C, C#, C++, Erlang, Go, Java, JavaScript, Lua, Perl, PHP, Python, Rust

5. Ссылки


  1. Архитектура S3: три года эволюции Mail.ru Cloud Storage
  2. Синхронная репликация в Tarantool
  3. Скачать Tarantool можно на официальном сайте, а получить помощь в Telegram-чате.
Подробнее..

Как Spring Data работает с Redis

02.09.2020 18:14:51 | Автор: admin

Redis (Remote Dictionary Server) - заслужено считается старичком в мире NoSql решений. Этот пост про то, как Spring Data с ним работает. Идея написания данного поста возникла потому, что Redis не совсем похож на привычную базу, он поддерживает типы данных, которые не удобно использовать для хранения объектов(Кеш не в счет) и выполнять поиск по определенным полям. Здесь на примерах я постараюсь описать как с ним работает Spring Data посредством привычного CrudRepository и QueryDSL. Это не пример HowTo, которых множество. Кому интересны внутренности идем дальше.

Примеры будут основаны на простом проекте. Redis поднимается в докер контейнере, spring-boot приложение, которое тоже в контейнере с ним общается. Приложение содержит простую модель, репозиторий, сервис и контроллер. Потрогать все это можно через swagger на localhost:8080.
Кроме команд, которые сервис выполняет к базе я буду приводить еще небольшой псевдо-код, который более понятно описывает происходящее.

Работать мы будем с сущностью Student:

@Data@AllArgsConstructor@NoArgsConstructor@RedisHash("Student")public class Student {    @Id    private String id;    private String name;    private int age;}

Здесь, необходимо уточнить что аннотация @RedisHash("Student") говорит о том, под каким ключом будут агрегироваться все сущности.

Попробуем сохранить первого студента:

curl -X POST "http://localhost:8080/save" -H  "accept: */*" -H  "Content-Type: application/json" -d "{\"id\":\"1\",\"name\":\"Stephen\",\"age\":12}"

Выполнились 3 команды:

"DEL" "Student:1""HMSET" "Student:1" "_class" "com.odis.redisserviceweb.model.Student" "id" "1" "name" "Stephen" "age" "12""SADD" "Student" "1"

Мы видим, что первая команда - это "DEL" "Student:1", говорит о том что удали запись с ключом "Student:1". Этот ключ был сформирован с помощью значения аннотации @RedisHash + значение поля помеченного аннотацией @Id.

Далее следует "HMSET" "Student:1" "_class" "com.odis.redisserviceweb.model.Student" "id" "1" "name" "Stephen" "age" "12". Эта команда добавляет значения в хеш с именем "Student:1". На псевдо-коде это будет выглядеть как

Map "Student:1";"Student:1".put("_class", "com.odis.redisserviceweb.model.Student");"Student:1".put("id", "1");"Student:1".put("name", "Stephen");"Student:1".put("age", "12");

Ну и завершающая команда - это "SADD" "Student" "1" - добавляет в сет с именем "Student" значение "1".
В итоге что мы получили? Было создано два объекта в Redis. Первый - хеш с именем "Student:1", второй - сет с именем "Student".

Выполнив команду keys * - дай все ключи (Не выполнять на проде под страхом экзекуции) получим:

127.0.0.1:6379> keys *1) "Student"2) "Student:1"

Типы у этих объектов, как и было ранее описано:

127.0.0.1:6379> type "Student"set127.0.0.1:6379> type "Student:1"hash

Собственно - зачем два объекта? Сейчас все станет на свои места.

В поиске по @Id нет ничего необычного:

curl -X GET "http://localhost:8080/get/1" -H  "accept: */*"

Сформировался ключ "Student:1" и выполнилась команда, которая и вернула искомый объект:

"HGETALL" "Student:1"1) "_class"2) "com.odis.redisserviceweb.model.Student"3) "id"4) "1"5) "name"6) "Stephen"7) "age"8) "12"

А теперь попробуем выполнить поиск всего, что мы сохранили, добавив перед этим еще одного студента:

curl -X POST "http://localhost:8080/save" -H  "accept: */*" -H  "Content-Type: application/json" -d "{\"id\":\"2\",\"name\":\"Macaulay\",\"age\":40}"curl -X GET "http://localhost:8080/get" -H  "accept: */*"

Выполнилось 3 команды:

"SMEMBERS" "Student"1) "1"2) "2""HGETALL" "Student:1"1) "_class"2) "com.odis.redisserviceweb.model.Student"3) "id"4) "1"5) "name"6) "Stephen"7) "age"8) "12"127.0.0.1"HGETALL" "Student:2"1) "_class"2) "com.odis.redisserviceweb.model.Student"3) "id"4) "2"5) "name"6) "Macaulay"7) "age"8) "40"

Сначала мы получили список всех ключей и после этого сделали запрос по каждому на получение значения. Именно поэтому было создано несколько объектов - "Student" - хранит все ключи, и по одному объекту на каждого студента с ключом "Student:@Id". Получается, что получение всех студентов имеет сложность O (N) где N - количество объектов в базе.

Удалим студунта:

curl -X DELETE "http://localhost:8080/delete/1" -H  "accept: */*"

Получаем:

"HGETALL" "Student:1"1) "_class"2) "com.odis.redisserviceweb.model.Student"3) "id"4) "1"5) "name"6) "Stephen"7) "age"8) "12""DEL" "Student:1"(integer) 1"SREM" "Student" "1"(integer) 1"SMEMBERS" "Student:1:idx"(empty array)"DEL" "Student:1:idx"(integer) 0

Смотрим, есть ли студент с нужным нам Id Удаляем этот хеш. Удаляем из сета "Student" ключ "1".
А дальше фигурирует объект Student:1:idx. О нем речи не шло ранее. Давайте посмотрим, зачем он необходим. Но для начала попробуем добавить метод в наш репозиторий для поиска студента по имени:

List<Student> findAllByName(String name);

Приложение поднялось сохраняем студента и делаем поиск по имени:

curl -X POST "http://localhost:8080/save" -H  "accept: */*" -H  "Content-Type: application/json" -d "{\"id\":\"1\",\"name\":\"Stephen\",\"age\":12}"curl -X GET "http://localhost:8080/get/filter/Stephen" -H  "accept: */*"

В ответе у нас пустой массив, а в логах запросов к Redis видим:

"SINTER" "Student:name:Stephen"(empty array)

Команда "SINTER" - Возвращает элементы сета, полученные в результате пересечения всех данных сетов, в нашем случае передан только один сет - "Student:name:Stephen" но мы о нем ничего не знаем и он не создавался.
Дело в том, что если мы хотим искать по полю, которое не помечено аннотацией @Id, это поле должно быть помечено аннотацией @Indexed и тогда Spring Data сделает дополнительные манипуляции при сохранении студента, т. к. понятие индекс в Redis отсутствует. Пометим поле name этой аннотацией:

@Data@AllArgsConstructor@NoArgsConstructor@RedisHash("Student")public class Student {    @Id    private String id;    @Indexed    private String name;    private int age;}

Теперь сохраним студента в чистую базу:

curl -X POST "http://localhost:8080/save" -H  "accept: */*" -H  "Content-Type: application/json" -d "{\"id\":\"1\",\"name\":\"Stephen\",\"age\":12}"

И в логах команд видим:

"DEL" "Student:1""HMSET" "Student:1" "_class" "com.odis.redisserviceweb.model.Student" "id" "1" "name" "Stephen" "age" "12""SADD" "Student" "1""SADD" "Student:name:Stephen" "1""SADD" "Student:1:idx" "Student:name:Stephen"

Первые три команды нам знакомы, но добавились еще две: создали сет "Student:name:Stephen" имя которого состоит из ключа, названия поля, помеченного аннотацией @Indexed и значением этого поля. В этот сет был добавлен Id этого студента. Если у нас появится студент с другим Id и именем Stephen его Id так же будет добавлен в этот сет. И был создан сет, который хранит все ключи индексов которые были созданы для этого объекта. Получилось что-то по типу:

Map "Student:1";"Student:1".put("_class", "com.odis.redisserviceweb.model.Student");"Student:1".put("id", "1");"Student:1".put("name", "Stephen");"Student:1".put("age", "12");Set "Student";"Student".add("1");Set "Student:name:Stephen";"Student:name:Stephen".add("1");Set "Student:1:idx";"Student:1:idx".add("Student:name:Stephen");

Теперь должно работать, Выполним поиск по имени и получим наше значение, в логах команд Redis будет:

"SINTER" "Student:name:Stephen""HGETALL" "Student:1"

Получаем Id студентов, и вытаскиваем их из хеша. Опять же количество операций прямо пропорционально количеству данных.

Так же мы можем искать по нескольким полям для этого в качестве аргумента команды SINTER будут передаваться два объекта. Команда вернет пересекающееся id и выполнит по ним поиск.
В примере есть метод поиска по имени и возрасту.

Как видим, Spring Data достаточно неплохо интерпретирует работу с объектами и индексами в Redis. Данный подход можно перенять и не используя решение Spring.
Недостаток - это то, что поля по которым будет проводится поиск должны быть промаркированы аннотацией @Indexed с самого начала. В противном случае, "индексы" будут созданы только для объектов, которые сохраняются после добавления этой аннотации. И да, я понимаю, что Redis это не лучшее решения для таких нужд, но если в силу определенной ситуации его необходимо будет использовать, то SpringData сумеет это сделать достаточно неплохо.

Подробнее..
Категории: Nosql , Redis , Java , Spring , Spring data

Очередь отложенных событий delayedQueue

22.08.2020 10:17:49 | Автор: admin

Пару лет назад в одном из проектов мы столкнулись с необходимостью откладывать выполнение некоего действия на определенный промежуток времени. Например, узнать статус платежа через три часа или повторно отправить уведомление через 45 минут. Однако на тот момент мы не нашли подходящих библиотек, способных "откладывать" и не требующих дополнительного времени на настройку и эксплуатацию. Мы проанализировали возможные варианты и написали собственную маленькую библиотеку delayed queue на Java с использованием Redis в роли хранилища. В этой статье я расскажу про возможности библиотеки, ее альтернативы и те "грабли", на которые мы наткнулись в процессе.


Функциональность


Итак, что же делает delayed queue? Событие, добавленное в отложенную очередь, доставляется обработчику через указанный промежуток времени. Если процесс обработки завершается неудачно, событие будет доставлено снова позднее. При этом максимальное количество попыток ограничено. Redis не дает гарантий сохранности, и к потере событий нужно быть готовым. Однако в кластерном варианте Redis показывает достаточно высокую надежность, и мы ни разу не столкнулись с этим за полтора года эксплуатации.


API


Добавить событие в очередь


eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();

Обратите внимание, что метод возвращает Mono, поэтому для запуска надо выполнить одно из следующих действия:


  • subscribe(...)
  • block()

Более подробные разъяснения приводятся в документации по Project Reactor. Контекст добавляется к событию следующим образом:


eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();

Зарегистрировать обработчик событий


eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);

если вместе с событием необходимо обработать пришедший контекст, то:


eventService.addHandler(        DummyEvent.class,        e -> Mono            .subscriberContext()            .doOnNext(ctx -> {                Map<String, String> eventContext = ctx.get("eventContext");                log.info("context key {}", eventContext.get("key"));            })            .thenReturn(true),        1);

Удалить обработчик событий


eventService.removeHandler(DummyEvent.class);

Создание сервиса


Можно воспользоваться настройками "по-умолчанию":


import static com.github.fred84.queue.DelayedEventService.delayedEventService;var eventService = delayedEventService().client(redisClient).build();

или сконфигурировать всё самому:


import static com.github.fred84.queue.DelayedEventService.delayedEventService;var eventService = delayedEventService()        .client(redisClient)        .mapper(objectMapper)        .handlerScheduler(Schedulers.fromExecutorService(executor))        .schedulingInterval(Duration.ofSeconds(1))        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)        .enableScheduling(false)        .pollingTimeout(POLLING_TIMEOUT)        .eventContextHandler(new DefaultEventContextHandler())        .dataSetPrefix("")        .retryAttempts(10)        .metrics(new NoopMetrics())        .refreshSubscriptionsInterval(Duration.ofMinutes(5))        .build();

Завершить работу сервиса (и всех открытых им соединений в Redis) можно eventService.close() или через фреймворк, поддерживающий аннотацию @javax.annotation.PreDestroy.


Метрики


С любой системой что-то может пойти не так, за ней надо приглядывать. В первую очередь вас должны интересовать:


  • общий размер памяти, используемый Redis;
  • количество событий, готовых к обработке (метрика "delayed.queue.ready.for.handling.count" и тэгом конкретного типа события)

История


Теперь в двух словах о том, как появился и развивался delayed queue. В 2018 году
наш маленький проект был запущен в Amazon Web Services.
Он разрабатывался и поддерживался двумя инженерами, и добавлять в него требующие обслуживания компоненты было накладно с точки зрения времени обслуживания системы. Основным правилом было: "используй подходящие компоненты, обслуживаемые Amazon-ом, если это не стоит очень дорого".


Готовые кандидаты


Мы рассматривали:



Первые два были отсеяны из-за необходимости их настраивать и обслуживать, к тому же с JMS не доставало опыта работы. SQS исключили, поскольку максимальное время задержки не превышает 15 минут.


Первая наивная реализация


На момент старта у нас уже был резервный вариант с обходом "зависших сущностей" в реляционной БД раз в сутки. Почитав статьи про организацию очередей отложенных событий, мы выбрали Redis со следующей структурой:


  • событие добавляется в sorted sets, где весом выступает время ее будущего выполнения
  • по наступлению времени выполнения событие перекладывается из "sorted_set" в "list" (может использоваться в режиме очереди)

Забегая вперед, на тот момент уже полгода существовал проект Netflix dyno-queues
с примерно похожим принципом работы. Однако тогда я его, к сожалению, еще не нашел.


Первая версия диспетчера, который перекладывал "созревшие события" из sorted set в list, выглядела примерно так (здесь и далее приведен упрощенный код):


var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);events.forEach(key -> {  var payload = extractPayload(key);  var listName = extractType(key);  redis.lpush(listName, payload);  redis.zrem("delayed_events", key);});

Обработчики событий были сделаны поверх Spring Integration, который в свою очередь фактически делал:


redis.brpop(listName)

Первые проблемы не заставили себя долго ждать.


Ненадежный диспетчер


При возникновении ошибки при добавлении в "list" (например, отвалилось соединение), событие помещалось в list несколько раз. Поскольку Redis поддерживает транзакции, мы просто обернули эти 2 метода.


events.forEach(key -> {  ...  redis.multi();  redis.zrem("delayed_events", key);  redis.lpush(listName, payload);  redis.exec();});

Ненадежный обработчик


С другой стороны list-a нас поджидала еще одна проблема. Событие пропадало навсегда, если ошибка происходила внутри обработчика. Решением стала замена удаления элемента из "sorted_set" на перезапись его на более позднее время и удаление только после успешного завершения обработки.


events.forEach(key -> {  ...  redis.multi();  redis.zadd("delayed_events", nextAttempt(key))  redis.zrem("delayed_events", key);  redis.lpush(listName, payload);  redis.exec();});

Не уникальное событие


Как я уже упоминал, у нас изначально был запасной механизм, который обходил "зависшие сущности" в БД и добавлял в "delayed queue" еще одно. Внутри "sorted set" ключ выглядел как
metadata;payload, где payload у нас неизменный, а вот metadata у следующей попытки для одного и того-же события отличалась. В итоге мы могли получить дубликат и много ненужных повторных попыток обработки. Эту ситуацию мы решили, вынеся изменяемую metadata и неизменный payload в Redis hset и оставив в "sorted set" только тип и идентификатор события.
В итоге регистрация события превратилась из


var envelope = metadata + SEPARATOR + payload;redis.zadd(envelope, scheduledAt);

в


var envelope = metadata + SEPARATOR + payload;var key = eventType + SEPARATOR + eventId;redis.multi();redis.zadd(key, scheduledAt);redis.hset("metadata", key, envelope)redis.exec();

Последовательный запуск диспетчера


Все наши обработчики были идемпотентными, и мы не беспокоились о дубликатах событий. Тем не менее, на нескольких экземплярах приложения диспечеры иногда запускались одновременно и добавляли в list одно и то же событие. Добавление банальной блокировки с коротким TTL сделало код чуть более эффективным:


redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());

Эволюция в отдельный проект


Когда такую же задачу понадобилось решить в проекте без Spring, функционал был вынесен в самостоятельную библиотеку. "Под нож" пошли зависимости:



Первая была легко заменена на использование Lettuce напрямую, а со второй все оказалось чуть сложнее. К этому моменту у меня был небольшой опыт работы с реактивными стримами в общем и с Project Reactor в частности, поэтому источником событий для обработчика стал "горячий стрим".
Чтобы добиться равномерного распределения событий между обработчиками в разных экземплярах приложения, пришлось реализовать свой собственный Subscriber


redis  .reactive()  .brpop(timeout, queue)  .map(e -> deserialize(e))  .subscribe(new InnerSubscriber<>(handler, ... params ..))

и


class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {    @Override    protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {        Mono<Boolean> promise = handler.apply(envelope.getPayload());        promise.subscribe(r -> request(1));    }}

В итоге мы получили библиотеку, которая сама доставляет события в зарегистрированные обработчики (в отличии от Netflix dyno queue, гда надо самому poll-ить события).


Что планируем дальше?


  • добавить Kotlin DSL. Новые проекты я все чаще начинаю на Kotlin и использовать suspend fun вместо API Project Reactor будет удобнее
  • сделать настраиваемыми интервалы между повторными попытками

Ccылки


Подробнее..
Категории: Redis , Java , Queue , Reactor

Перевод Асинхронное выполнение задач с использованием Redis и Spring Boot

02.01.2021 16:06:38 | Автор: admin

В этой статье мы рассмотрим, как использовать Spring Boot 2.x и Redis для выполнения асинхронных задач, а полный код продемонстрирует шаги, описанные в этом посте.

Spring/Spring Boot

Spring самый популярный фреймворк для разработки Java приложений.Таким образом, Spring имеет одно из крупнейших сообществ с открытым исходным кодом.Кроме того, Spring предоставляет обширную и актуальную документацию, которая охватывает внутреннюю работу фреймворка и примеры проектов в своем блоге, а наStackOverflowболее 100 тысяч вопросови ответов.

Вначале Spring поддерживал только конфигурацию на основе XML, и из-за этого был подвержен множеству критических замечаний.Позже Spring представила конфигурацию на основе аннотаций, которая изменила все.Spring 3.0 была первой версией, которая поддерживала конфигурацию на основе аннотаций.В 2014 годубыла выпущенаSpring Boot1.0, полностью изменившая наш взгляд на экосистему фреймворка Spring.Более подробное описание истории Spring можно найтиздесь.

Redis

Redis одна из самых популярных NoSQL баз данных в памяти.Redis поддерживает разные типы структур данных. Redis поддерживает различные типы структур данных, например Set, Hash table, List, простую пару ключ-значение это лишь некоторые из них.Задержка вызова Redis составляет менее миллисекунд, поддержка набора реплик и т. д. Задержка операции Redis составляет менее миллисекунд, что делает ее еще более привлекательной для сообщества разработчиков.

Почему асинхронное выполнение задачи

Типичный вызов API состоит из пяти этапов:

  1. Выполние одного или нескольких запросов к базе данных (RDBMS / NoSQL)

  2. Одна или несколько операций системы кэширования (In-Memory, Distributed и т. д.)

  3. Некоторые вычисления (это может быть обработка данных при выполнении некоторых математических операций)

  4. Вызов других служб (внутренних / внешних)

  5. Планирование выполнения одной или нескольких задач на более позднее время или немедленно, но вфоновом режиме

Задачу можно запланировать на более позднее время по многим причинам.Например, счет-фактура должен быть создан через 7 дней после создания или отгрузки заказа.Точно так же уведомления по электронной почте не нужно отправлять немедленно, поэтому мы можем отложить их.

Имея в виду эти реальные примеры, иногда нам нужно выполнять задачи асинхронно, чтобы сократить время ответа API.Например, если мы удалим более 1K записей за один раз, и если мы удалим все эти записи в одном вызове API, то время ответа API наверняка увеличится.Чтобы сократить время ответа API, мы можем запустить задачу в фоновом режиме, которая удалит эти записи.

Отложенная очередь

Каждый раз, когда мы планируем запуск задачи в определенное время или через определенный интервал, мы используем задания cron, которые запланированы на определенное время или интервал.Мы можем запускать задачи по расписанию, используя различные инструменты, такие как crontab в стиле UNIX,Chronos, если мы используем фреймворки Spring, тогда речь идетоб аннотацииScheduled.

Большинство заданий cron просматривают записи о том, когда должно быть предпринято определенное действие, например, поиск всех поставок по истечении семи дней, по которым не были созданы счета.Большинство таких механизмов планирования страдаютпроблемами масштабирования, когда мы сканируем базы данных, чтобы найти соответствующие строки/записи.Во многих случаях это приводит кполному сканированию таблицы,которое работает очень медленно.Представьте себе случай, когда одна и та же база данных используется приложением реального времени и этой системой пакетной обработки.Поскольку она не является масштабируемый, нам понадобится какая-то масштабируемая система, которая может выполнять задачи в заданное время или интервал без каких-либо проблем с производительностью.Есть много способов масштабирования таким образом, например, запускать задачи в пакетном режиме или управлять задачами для определенного подмножества пользователей/регионов.Другой способ запустить конкретную задачу в определенное время без зависимости от других задач, например безсерверной функции.Отложенная очередьможет использоваться в тех случаях, как только таймер достигнет запланированного времени работа будет вызвана. Имеется множествосистем/программного обеспечения для организации очередей, но очень немногие из них, например SQS, предоставляют функцию, которая обеспечивает задержку на 15 минут, а не произвольную задержку, такую как 7 часов или 7 дней и т. д.

Rqueue

Rqueue это брокер сообщений, созданный для платформыSpring,который хранит данные в Redis и предоставляет механизм для выполнения задачи с любой указанной задержкой.Rqueue поддерживается Redis, поскольку Redis имеет некоторые преимущества перед широко используемыми системами очередей, такими как Kafka, SQS.В большинстве серверных приложений веб-приложений Redis используется для хранения данных кеша или для других целей.В настоящее время8,4% веб-приложений используют базу данных Redis.

Как правило, для очереди мы используем либо Kafka/SQS, либо некоторые другие системы, эти системы приносят дополнительные накладные расходы в разных измерениях, например, финансовые затраты, которые можно уменьшить до нуля с помощью Rqueue и Redis.

Помимо затрат, если мы используем Kafka, нам необходимо выполнить настройку инфраструктуры, обслуживание, то есть больше операций, так как большинство приложений уже используют Redis, поэтому у нас не будет накладных расходов на операции, на самом деле можно использовать тот же сервер/кластер Redis с Rqueue.Rqueue поддерживает произвольную задержку

Доставка сообщений

Rqueue гарантирует доставку сообщения хотя бы раз, так как длинные данные не теряются в базе данных.Подробнее об этом читайте настранице Введение в Rqueue.

Инструменты, которые нам понадобятся:

  1. Любая IDE

  2. Gradle

  3. Java

  4. Redis

Мы собираемся использоватьSpring Boot для простоты.Мы создадим проект Gradle с помощью инициализатора Spring Boot по адресуhttps://start.spring.io/.

Из зависимостей нам понадобятся:

  1. Spring Data Redis

  2. Spring Web

  3. Lombok и некоторые другие

Структура каталогов/папок показана ниже:

Мы собираемся использоватьбиблиотекуRqueueдля выполнения любых задач с произвольной задержкой.Rqueue это основанный на Spring исполнитель асинхронных задач, который может выполнять задачи с любой задержкой, он построен на библиотеке обмена сообщениями Spring и поддерживается Redis.

Мы добавим зависимость spring boot starter дляRqueue com.github.sonus21:rqueue-spring-boot-starter:2.0.0-RELEASE с помощью кода:

dependencies {    implementation 'org.springframework.boot:spring-boot-starter-data-redis'  implementation 'org.springframework.boot:spring-boot-starter-web'  implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.0-RELEASE'  compileOnly 'org.projectlombok:lombok'     annotationProcessor 'org.projectlombok:lombok'  providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'  testImplementation('org.springframework.boot:spring-boot-starter-test') {    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'    }}

Нам нужно включить функции Redis Spring Boot.В целях тестирования мы также включим WEB MVC.

Обновите файл application как:

@SpringBootApplication@EnableRedisRepositories@EnableWebMvcpublic class AsynchronousTaskExecutorApplication {   public static void main(String[] args) {     SpringApplication.run(AsynchronousTaskExecutorApplication.class, args);  }}

Добавлять задачи с помощью Rqueue очень просто.Нам нужно аннотировать метод с помощьюRqueueListenerRqueuListenerаннотации есть несколько полей, которые можно настроить в зависимости от варианта использования.УстановитеdeadLetterQueueдля отправки задач в другую очередь.В противном случае задача будет отброшена в случае неудачи.Мы также можем установить, сколько раз задача должна быть повторена, используяполе.numRetries

Создайте файл Java с именемMessageListenerи добавьте несколько методов для выполнения задач:

@Component@Slf4jpublic class MessageListener {  @RqueueListener(value = "${email.queue.name}") (1)  public void sendEmail(Email email) {    log.info("Email {}", email);  }  @RqueueListener(value = "${invoice.queue.name}") (2)  public void generateInvoice(Invoice invoice) {    log.info("Invoice {}", invoice);  }}

Нам понадобится классы Email и Invoiceдля хранения данных электронной почты и счетов-фактур соответственно.Для простоты у классов будет только небольшое количество полей.

Invoice.java:

import lombok.Data;@Data@AllArgsConstructor@NoArgsConstructorpublic class Invoice {  private String id;  private String type;}

Email.java:

import lombok.Data;@Data@AllArgsConstructor@NoArgsConstructorpublic class Email {  private String email;  private String subject;  private String content;}

Отправка задач в очередь

Задачу можно отправить в очередь с помощьюRqueueMessageSenderbean-компонента. У которого есть несколько методов для постановки задачи в очередь в зависимости от сценария использования, используйте один из доступных методов.Для простых задач используйте enqueue, для отложенных задач используйте enqueueIn.

Нам нужно автоматически подключитьRqueueMessageSenderили использовать внедрение на основе конструктора для внедрения этих bean-компонентов.

Воткак создать контроллер для тестирования.

Мы планируем создать счет-фактуру, который нужно будет выполнить через 30 секунд.Для этого мы отправим задачу с задержкой 30000 (миллисекунд) в очереди счетов.Кроме того, мы постараемся отправить электронное письмо, которое может выполняться в фоновом режиме.Для этого мы добавим два метода GET,sendEmailи generateInvoice, мы также можем использовать POST.

@RestController@RequiredArgsConstructor(onConstructor = @__(@Autowired))@Slf4jpublic class Controller {  private @NonNull RqueueMessageSender rqueueMessageSender;  @Value("${email.queue.name}")  private String emailQueueName;  @Value("${invoice.queue.name}")  private String invoiceQueueName;  @Value("${invoice.queue.delay}")  private Long invoiceDelay;  @GetMapping("email")  public String sendEmail(      @RequestParam String email, @RequestParam String subject, @RequestParam String content) {    log.info("Sending email");    rqueueMessageSender.enqueu(emailQueueName, new Email(email, subject, content));    return "Please check your inbox!";  }  @GetMapping("invoice")  public String generateInvoice(@RequestParam String id, @RequestParam String type) {    log.info("Generate invoice");    rqueueMessageSender.enqueueIn(invoiceQueueName, new Invoice(id, type), invoiceDelay);    return "Invoice would be generated in " + invoiceDelay + " milliseconds";  }}

Добавим в файл application.properties следующие строки:

email.queue.name=email-queueinvoice.queue.name=invoice-queue# 30 seconds delay for invoiceinvoice.queue.delay=300000

Теперь мы можем запустить приложение.После успешного запуска приложения вы можете просмотреть результат по этойссылке.

В журнале мы видим, что задача электронной почты выполняется в фоновом режиме:

Ниже приведено расписание выставления счетов через 30 секунд:

http://localhost:8080/invoice?id=INV-1234&type=PROFORMA

Заключение

Теперь мы можем планировать задачи с помощью Rqueue без большого объёма вспомогательного кода!Были приведены основные соображения по настройке и использованию библиотеки Rqueue.Следует иметь в виду одну важную вещь: независимо от того, является ли задача отложенной задачей или нет, по умолчанию предполагается, что задачи необходимо выполнить как можно скорее.

Полный код этого поста можно найти в репозиториинаGitHub.

Дополнительноечтение

Spring Boot: Creating Asynchronous Methods Using @Async Annotation

Spring and Threads: Async

Distributed Tasks Execution and Scheduling in Java, Powered by Redis

Подробнее..
Категории: Redis , Java , Spring , Spring boot , Asynchronous task

Перевод Реализация мультиарендности с использованием Spring Boot, MongoDB и Redis

28.02.2021 18:05:07 | Автор: admin

В этом руководстве мы рассмотрим, как реализовать мультиарендность в Spring Boot приложении с использованием MongoDB и Redis.

Используются:

  • Spring Boot 2.4

  • Maven 3.6. +

  • JAVA 8+

  • Монго 4.4

  • Redis 5

Что такое мультиарендность?

Мультиарендность (англ. multitenancy множественная аренда) это программная архитектура, в которой один экземпляр программного приложения обслуживает нескольких клиентов.Все должно быть общим, за исключением данных разных клиентов, которые должны быть должным образом разделены.Несмотря на то, что они совместно используют ресурсы, арендаторы не знают друг друга, и их данные хранятся совершенно отдельно.Каждый покупатель называется арендатором.

Предложение программное обеспечение как услуга (SaaS) является примером мультиарендной архитектуры.Более подробно.

Модели с несколькими арендаторами

Можно выделить три основных архитектурных шаблона для мультиарендной архитектуры (Multitenancy), которые различаются степенью физического разделения данных клиента.

  1. База данных для каждого арендатора: каждый арендатор имеет свою собственную базу данных и изолирован от других арендаторов.

  2. Общая база данных, общая схема:все арендаторы совместно используют базу данных и таблицы.В каждой таблице есть столбец с идентификатором клиента, который показывает владельца строки.

  3. Общая база данных, отдельная схема: все арендаторы совместно используют базу данных, но имеют свои собственные схемы и таблицы базы данных.

Начнем

В этом руководстве мы реализуем мультиарендность на основе базы данных для каждого клиента.

Мы начнем с создания простого проекта Spring Boot наstart.spring.ioсо следующими зависимостями:

<dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-data-mongodb</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-data-redis</artifactId>    </dependency>    <dependency>        <groupId>redis.clients</groupId>        <artifactId>jedis</artifactId>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <optional>true</optional>    </dependency></dependencies>

Определение текущего идентификатора клиента

Идентификатор клиента необходимо определить для каждого клиентского запроса.Для этого мы включим поле идентификатора клиента в заголовок HTTP-запроса.

Давайте добавим перехватчик, который получает идентификатор клиента из http заголовкаX-Tenant.

@Slf4j@Componentpublic class TenantInterceptor implements WebRequestInterceptor {    private static final String TENANT_HEADER = "X-Tenant";    @Override    public void preHandle(WebRequest request) {        String tenantId = request.getHeader(TENANT_HEADER);        if (tenantId != null && !tenantId.isEmpty()) {            TenantContext.setTenantId(tenantId);            log.info("Tenant header get: {}", tenantId);        } else {            log.error("Tenant header not found.");            throw new TenantAliasNotFoundException("Tenant header not found.");        }    }    @Override    public void postHandle(WebRequest webRequest, ModelMap modelMap) {        TenantContext.clear();    }    @Override    public void afterCompletion(WebRequest webRequest, Exception e) {    }}

TenantContextэто хранилище, содержащее переменную ThreadLocal.ThreadLocal можно рассматривать как область доступа (scope of access), такую как область запроса (request scope) или область сеанса (session scope).

Сохраняя tenantId в ThreadLocal, мы можем быть уверены, что каждый поток имеет свою собственную копию этой переменной и что текущий поток не имеет доступа к другому tenantId:

@Slf4jpublic class TenantContext {    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();    public static void setTenantId(String tenantId) {        log.debug("Setting tenantId to " + tenantId);        CONTEXT.set(tenantId);    }    public static String getTenantId() {        return CONTEXT.get();    }    public static void clear() {        CONTEXT.remove();    }}

Настройка источников данных клиента (Tenant Datasources)

В нашей архитектуре у нас есть экземпляр Redis, представляющий базу метаданных (master database), в которой централизована вся информация о базе данных клиента.Таким образом, из каждого предоставленного идентификатора клиента информация о подключении к базе данных извлекается из базы метаданных .

RedisDatasourceService.java это класс, отвечающий за управление всеми взаимодействиями с базой метаданных .

@Servicepublic class RedisDatasourceService {    private final RedisTemplate redisTemplate;    private final ApplicationProperties applicationProperties;    private final DataSourceProperties dataSourceProperties;public RedisDatasourceService(RedisTemplate redisTemplate, ApplicationProperties applicationProperties, DataSourceProperties dataSourceProperties) {        this.redisTemplate = redisTemplate;        this.applicationProperties = applicationProperties;        this.dataSourceProperties = dataSourceProperties;    }        /**     * Save tenant datasource infos     *     * @param tenantDatasource data of datasource     * @return status if true save successfully , false error     */        public boolean save(TenantDatasource tenantDatasource) {        try {            Map ruleHash = new ObjectMapper().convertValue(tenantDatasource, Map.class);            redisTemplate.opsForHash().put(applicationProperties.getServiceKey(), String.format("%s_%s", applicationProperties.getTenantKey(), tenantDatasource.getAlias()), ruleHash);            return true;        } catch (Exception e) {            return false;        }    }        /**     * Get all of keys     *     * @return list of datasource     */         public List findAll() {        return redisTemplate.opsForHash().values(applicationProperties.getServiceKey());    }        /**     * Get datasource     *     * @return map key and datasource infos     */         public Map<String, TenantDatasource> loadServiceDatasources() {        List<Map<String, Object>> datasourceConfigList = findAll();        // Save datasource credentials first time        // In production mode, this part can be skip        if (datasourceConfigList.isEmpty()) {            List<DataSourceProperties.Tenant> tenants = dataSourceProperties.getDatasources();            tenants.forEach(d -> {                TenantDatasource tenant = TenantDatasource.builder()                        .alias(d.getAlias())                        .database(d.getDatabase())                        .host(d.getHost())                        .port(d.getPort())                        .username(d.getUsername())                        .password(d.getPassword())                        .build();                save(tenant);            });        }        return getDataSourceHashMap();    }        /**     * Get all tenant alias     *     * @return list of alias     */         public List<String> getTenantsAlias() {        // get list all datasource for this microservice        List<Map<String, Object>> datasourceConfigList = findAll();        return datasourceConfigList.stream().map(data -> (String) data.get("alias")).collect(Collectors.toList());    }        /**     * Fill the data sources list.     *     * @return Map<String, TenantDatasource>     */         private Map<String, TenantDatasource> getDataSourceHashMap() {        Map<String, TenantDatasource> datasourceMap = new HashMap<>();        // get list all datasource for this microservice        List<Map<String, Object>> datasourceConfigList = findAll();        datasourceConfigList.forEach(data -> datasourceMap.put(String.format("%s_%s", applicationProperties.getTenantKey(), (String) data.get("alias")), new TenantDatasource((String) data.get("alias"), (String) data.get("host"), (int) data.get("port"), (String) data.get("database"), (String) data.get("username"), (String) data.get("password"))));        return datasourceMap;    }}

В этом руководстве мы заполнили информацию о клиенте из yml-файла (tenants.yml).В производственном режиме можно создать конечные точки для сохранения информации о клиенте в базе метаданных.

Чтобы иметь возможность динамически переключаться на подключение к базе данных mongo, мы создаемкласс MultiTenantMongoDBFactory, расширяющий класс SimpleMongoClientDatabaseFactory из org.springframework.data.mongodb.core.Он вернетэкземплярMongoDatabase, связанный с текущим арендатором.

@Configurationpublic class MultiTenantMongoDBFactory extends SimpleMongoClientDatabaseFactory {@Autowired    MongoDataSources mongoDataSources;public MultiTenantMongoDBFactory(@Qualifier("getMongoClient") MongoClient mongoClient, String databaseName) {        super(mongoClient, databaseName);    }    @Override    protected MongoDatabase doGetMongoDatabase(String dbName) {        return mongoDataSources.mongoDatabaseCurrentTenantResolver();    }}

Нам нужно инициализироватьконструктор MongoDBFactoryMultiTenantс параметрами по умолчанию (MongoClientиdatabaseName).

Это реализует прозрачный механизм для получения текущего клиента.

@Component@Slf4jpublic class MongoDataSources {    /**     * Key: String tenant alias     * Value: TenantDatasource     */    private Map<String, TenantDatasource> tenantClients;    private final ApplicationProperties applicationProperties;    private final RedisDatasourceService redisDatasourceService;    public MongoDataSources(ApplicationProperties applicationProperties, RedisDatasourceService redisDatasourceService) {        this.applicationProperties = applicationProperties;        this.redisDatasourceService = redisDatasourceService;    }    /**     * Initialize all mongo datasource     */    @PostConstruct    @Lazy    public void initTenant() {        tenantClients = new HashMap<>();        tenantClients = redisDatasourceService.loadServiceDatasources();    }    /**     * Default Database name for spring initialization. It is used to be injected into the constructor of MultiTenantMongoDBFactory.     *     * @return String of default database.     */    @Bean    public String databaseName() {        return applicationProperties.getDatasourceDefault().getDatabase();    }    /**     * Default Mongo Connection for spring initialization.     * It is used to be injected into the constructor of MultiTenantMongoDBFactory.     */    @Bean    public MongoClient getMongoClient() {        MongoCredential credential = MongoCredential.createCredential(applicationProperties.getDatasourceDefault().getUsername(), applicationProperties.getDatasourceDefault().getDatabase(), applicationProperties.getDatasourceDefault().getPassword().toCharArray());        return MongoClients.create(MongoClientSettings.builder()                .applyToClusterSettings(builder ->                        builder.hosts(Collections.singletonList(new ServerAddress(applicationProperties.getDatasourceDefault().getHost(), Integer.parseInt(applicationProperties.getDatasourceDefault().getPort())))))                .credential(credential)                .build());    }    /**     * This will get called for each DB operations     *     * @return MongoDatabase     */    public MongoDatabase mongoDatabaseCurrentTenantResolver() {        try {            final String tenantId = TenantContext.getTenantId();            // Compose tenant alias. (tenantAlias = key + tenantId)            String tenantAlias = String.format("%s_%s", applicationProperties.getTenantKey(), tenantId);            return tenantClients.get(tenantAlias).getClient().                    getDatabase(tenantClients.get(tenantAlias).getDatabase());        } catch (NullPointerException exception) {            throw new TenantAliasNotFoundException("Tenant Datasource alias not found.");        }    }73}

Тест

Давайте создадим CRUD пример с документом Employee.

@Builder@Data@AllArgsConstructor@NoArgsConstructor@Accessors(chain = true)@Document(collection = "employee")public class Employee  {    @Id    private String id;    private String firstName;    private String lastName;    private String email;}

Также нам нужно создать классы EmployeeRepository, EmployeeServiceиEmployeeController.Для тестирования при запуске приложения мы загружаем фиктивные данные в каждую базу данных клиента.

@Overridepublic void run(String... args) throws Exception {    List<String> aliasList = redisDatasourceService.getTenantsAlias();    if (!aliasList.isEmpty()) {        //perform actions for each tenant        aliasList.forEach(alias -> {            TenantContext.setTenantId(alias);            employeeRepository.deleteAll();            Employee employee = Employee.builder()                    .firstName(alias)                    .lastName(alias)                    .email(String.format("%s%s", alias, "@localhost.com" ))                    .build();            employeeRepository.save(employee);            TenantContext.clear();        });    }}

Теперь мы можем запустить наше приложение и протестировать его.

Итак, мы все сделали. Надеюсь, это руководство поможет вам понять, что такое мультиарендность и как она может быть реализована в Spring Boot проекте с использованием MongoDB и Redis.

Полный исходный код примера можно найти наGitHub.

Подробнее..

Эволюция социального фида в iFunny мобильном приложении с UGC-контентом

10.03.2021 20:17:40 | Автор: admin

Несколько лет назад мы добавили в приложение социальные механики: подписку и фид с мемами, которые запостили друзья. В этом материале обсудим вызовы времени и ответы на них в виде нескольких подходов в технической реализации на бэкенде.

Ежедневно наши пользователи загружают десятки тысяч единиц контента. На таком объёме данных приходится применять ухищрения, чтобы сохранить приемлемое время ответа. Под катом расскажу, что именно мы делали, а в конце статьи поделюсь кодом на GitHub для ознакомления.

Принципиально можно выделить две схемы формирования фида:

  1. Push on change.

  2. Pull on demand.

Также возможны гибридные подходы, например, переход от одного механизма к другому в зависимости от количества подписчиков, кеширование новых данных в отдельном слое и так далее.

1. Push on change. Для каждого пользователя создаем отдельный денормализованный фид. При добавлении мема вставляем его в фиды пользователей, подписанных на автора.

Плюсы:

  • очень быстро читать фид из базы.

Минусы:

  • долгое добавление и удаление: время линейно зависит от количества подписок на автора.

Формирование фида по схеме push on changeФормирование фида по схеме push on change

2. Pull on demand. Формируем фид на лету: отправляем по запросу на каждого пользователя, на которого подписаны.

Плюсы:

  • нет нужды актуализировать денормализованные представления: константное по времени добавление и удаление.

Минусы:

  • формирование фида занимает линейно зависимое от количества подписок время;

  • при формировании необходимо обрабатывать избыточное количество данных: чтобы отдать фид из 20 элементов, для каждой подписки приходится выбирать по 20, сортировать и склеивать, а остальное просто выкидывать.

Формирование фида по схеме pull on demandФормирование фида по схеме pull on demand

Первая итерация: push on change на Cassandra

Мы выбрали механизм push on change, а в качестве БД для хранения денормализованных представлений использовали Cassandra. Она использует подход LSM, что позволяет писать с достаточно внушительной скоростью за счет того, что данные просто последовательно пишутся в память (MemTable), а затем сохраняются на диск и сливаются в многоуровневые отсортированные файлы (SSTables).

Была реализована инфраструктура для обработки асинхронных задач наполнения фидов. Суть этих задач сводится к получению подписчиков автора контента и добавления большими пачками нового мема в фид к каждому из них. Тут все логично и хорошо: кластер Cassandra отлично переваривала здоровенные батчи, и по началу проблем с получением тоже не было.

Но спустя время появились проблемы с доступом к данным. Причин несколько:

  1. Требования сторов удалить определённый контент. Например, нарушение копирайта, 18+ или иногда просто лягушонок Пепе.

  2. Удаление самими пользователями.

  3. Отписка пользователей друг от друга. Иногда они отписывались сразу от всех.

  4. Трим старых данных. Нужно было чистить ооочень длинные фиды, потому что пользователи никогда не прочитают до конца.

Всё это создавало в датасете дырки разного размера, которые вынуждали БД в фоне совершать гигантскую работу. Со временем всё становилось только хуже. Cassandra хранит данные в нескольких слоях, каждый из которых больше предыдущего.

Распределение данных по уровням в CassandraРаспределение данных по уровням в Cassandra

Обычно нужные данные находятся в свежих небольших слоях, но когда нужно что-то получить со дна начинаются большие задержки, таймауты и прочие неприятные моменты.

Cassandra написана на Java, поэтому она могла непредсказуемо и надолго уходить в сбор мусора, особенно когда начинала мержить глубокоуровневые SSTableы. К тому времени в кластере было уже порядка 25 нод, а суммарное количество данных с учетом репликации перевалило за 20 ТБ. Это послужило сигналом к началу второй итерации.

Вторая итерация: pull on demand на Redis с формированием фида на стороне приложения

Провели большое количество экспериментов для улучшения ситуации, например:

  • Тюнинг GC Cassandra.

  • Другие стратегии Cassandra Compaction, рассматривали вариант написания своей стратегии.

  • Другие структуры хранения и БД (например, блобы в PostgreSQL).

Но ничего хорошего не вышло, и решили перейти к схеме pull on demand.

Поставили кластер из Redis, разложили данные в сортированные множества (sorted sets) и начали строить фид прямо в момент запроса слиянием на стороне приложения в отдельном сервисе. Это значительно ускорило появление новых мемов в фиде: больше не надо итерировать по подпискам, вообще не нужны асинхронные задачи.

Но ситуация стала чуть хуже в момент получения фида. Пришлось ужесточить ограничение на количество подписок: если во времена нашего первого решения было не страшно разрешать пользователям много подписок, сейчас это критически сказывалось на времени формирования фида дошли до ограничения в 400. Основной проблемой было путешествие избыточного количества данных между базой и приложением. Какое-то время пожили так, но решили, что нужно делать что-то еще.

Третья и текущая итерация: pull on demand на Redis с формированием фида на стороне базы

У предыдущего решения была пара недостатков:

  • Redis однопоточный, поэтому не было возможности распараллелить выполнение сотен запросов более чем на количество шардов в кластере;

  • нарушался принцип data locality: тратилось время на транспортировку данных от базы к приложению, бльшая часть из которых была избыточной.

Redis 4 позволил писать свои модули. Мы решили, что это хороший способ оптимизировать работу фидов. Был написан модуль на C, который на стороне БД получал нужные данные, формировал из них фид, выполняя сортировку на структуре MaxHeap. Команду назвали ZREVMERGE: как понятно из названия, выполняет слияние нескольких сортированных множеств.

Формирование фида на основе модуля с ZREVMERGEФормирование фида на основе модуля с ZREVMERGE

Появилась возможность распараллелить часть работы. ZREVMERGE добавляет задания в свой пул тредов. Доступ модуля к множествам по-прежнему осуществляется в однопоточном режиме из-за ограничений Redis, но сортировка и слияние не требует блокировки.

В итоге получилось более чем в два раза ускорить формирование фида: раньше медиана была около 20 мс, с переносом работы в модуль стала менее 10 мс. Получилось бы лучше, если бы не шардирование данных в кластере: приходится всё же отправлять несколько запросов, по одному на каждый шард и доделывать часть работы в приложении. Получилось увеличить лимит на подписки пользователям с 400 до 5000.

Но были и сложности. Так как модули пишутся на C, а я последний раз писал на нем в университете, столкнулся с парой утечек памяти. Также был найден баг в работе Redis с модулями, но его быстро пофиксили после репорта.

Тем не менее решение получилось достаточно стабильным: за полтора года в проде проблем не доставляло. Были нюансы с клиентом Redis на стороне приложения, но это другая история.

Код для ознакомления выложил в наш GitHub, вот сюда. Если вы знаете, что с ним не так, буду рад послушать в комментах или личке.

Вместо заключения

Конечно, пока не получилось решить все проблемы на 100%. Всегда есть возможности улучшить что-то, но из начальной точки проделан достаточно большой путь. Хоть писать на C в прод и было страшно, но свои результаты это принесло. Буду рад почитать, как фиды работают у вас. Удачного итерирования!

Подробнее..

Оптимизация хранимых данных на 93 (Redis)

12.03.2021 16:12:58 | Автор: admin

Хотелось бы поделиться опытом оптимизации данных с целью уменьшения расходов на ресурсы.

В системе рано или поздно встает вопрос об оптимизации хранимых данных, особенно если данные хранятся в оперативной памяти, как это БД Redis.

Как временное решение, можно увеличить RAM тем самым можно выиграть время.

Redis это no-sql база данных, профилировать ее можно с помощью встроенной команды redis-cli --bigkeys, которая покажет кол-во ключей и сколько в среднем занимает каждый ключ.

Объемными данными оказались исторические данные типо sorted sets. У них была ротация 10 дней из приложения.

Проект в продакшене, поэтому оптимизация никак не должна была сказаться на пользователях.

Данные из себя представляли события измененияцены / даты доставки у оффера. Офферов было очень много порядка 15000 в каждом фиде (прайслисте).

Рассмотрим следующий пример данных события по офферу:

Сделано с помощью http://json.parser.online.fr/Сделано с помощью http://json.parser.online.fr/

{"EventName":"DELIVERY_CHANGED","DateTime":"2021-02-22T00:04:00.112593982+03:00","OfferId":"109703","OfferFrom":{"Id":"109703","Name":"Саундбар LG SN11R","Url":"https://www.example.ru/saundbar-lg-sn11r/?utm_source=yandex_market&utm_medium=cpc&utm_content=948&utm_campaign=3&utm_term=109703","Price":99990,"DeliveryAvailable":true,"DeliveryCost":0,"DeliveryDate":"2021-02-24T23:49:00+03:00"},"OfferTo":{"Id":"109703","Name":"Саундбар LG SN11R","Url":"https://www.example.ru/saundbar-lg-sn11r/?utm_source=yandex_market&utm_medium=cpc&utm_content=948&utm_campaign=3&utm_term=109703","Price":99990,"DeliveryAvailable":true,"DeliveryCost":0,"DeliveryDate":"2021-02-23T00:04:00.112593982+03:00"}}

Такое событие занимает 706 байт.

Оптимизация

  1. Для начала я уменьшил ротацию до 7 дней, так как использовалась именно последняя неделя. Здесь стоит отметить, что шаг весьма легкий(в исходном коде изменил 10 на 7), сразу сокращает размер RAM на 30%.

  2. Удалил из хранилища все данные, которые записывались, но не использовались во время чтения, такие как name, url, offerId что сократило еще примерно на 50%.

    Cтало:

    {"EventName":"DELIVERY_CHANGED","DateTime":"2021-02-22T00:04:00.112593982+03:00","OfferId":"109703","OfferFrom":{"Price":99990,"DeliveryAvailable":true,"DeliveryCost":0,"DeliveryDate":"2021-02-24T23:49:00+03:00"},"OfferTo":{"Price":99990,"DeliveryAvailable":true,"DeliveryCost":0,"DeliveryDate":"2021-02-23T00:04:00.112593982+03:00"}}

    Теперь событие занимает 334 байта.

    1. Переделал формат хранение с json в бинарный protobuf.

      Об этом шаге хотелось бы рассказать подробнее

      1. Составил схему хранение данных, в случее с protobuf это proto - файл:

        syntax = "proto3";import "google/protobuf/timestamp.proto";message OfferEvent {  enum EventType {    PRICE_CHANGED = 0;    DELIVERY_CHANGED = 1;    DELIVERY_SWITCHED = 2;    APPEARED = 3;    DISAPPEARED = 4;  }  EventType event_name = 1;  google.protobuf.Timestamp date_time = 2;  string offer_id = 3;  message Offer {    int32 price = 1;    bool delivery_available = 2;    int32 delivery_cost = 3;    google.protobuf.Timestamp  delivery_date = 4;  }  Offer offer_from = 4;  Offer offer_to = 5;} 
        
      2. Исходное сообщение в текстовом protobuf формате будет выглядеть так

        event_name: DELIVERY_CHANGEDdate_time {  seconds: 1613941440}offer_id: "109703"offer_from {  price: 99990  delivery_available: true  delivery_date {    seconds: 1614199740  }}offer_to {  price: 99990  delivery_available: true  delivery_date {    seconds: 1614027840  }}
        
      3. Сообщение в итоговом бинарном protobuf формате будет выглядеть так

        echo 'event_name: DELIVERY_CHANGEDdate_time {  seconds: 1613941440}offer_id: "109703"offer_from {  price: 99990  delivery_available: true  delivery_date {    seconds: 1614199740  }}offer_to {  price: 99990  delivery_available: true  delivery_date {    seconds: 1614027840  }}' | protoc --encode=OfferEvent offerevent.proto | xxd -p | tr -d "\n"0801120608c095cb81061a06313039373033220e08968d061001220608bcf7da81062a0e08968d061001220608c0b8d08106
        

      Теперь событие занимает 50 байт. Это сократило потребление памяти на 85%.

      Бинарное сообщение без proto-схемы можно посмотреть с помощью онлайн-сервиса https://protogen.marcgravell.com/

Итого

Оптимизация места более, чем в 14 раз (50 байт против 706 байт изначальных), то есть на 93%.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru