Русский
Русский
English
Статистика
Реклама

Асинхронное программирование

Опыт написания асинхронного поллинга сетевых устройств

03.06.2021 14:23:54 | Автор: admin

Основная идея асинхронного поллинга

Асинхронность позволяет сильно увеличить парралельность работы. В случае сетевого взаимодействия и съема SNMP времена ожидания ответов большие, что позволяет программе заняться чем-то полезным пока не пришел ответ.

Есть синхронные способы реализации парралельности форки, треды. Это просто с точки зрения написания программы, но это очень дорого с точки зрения ресурсов ОС

Немного математики

Математика будет инженерно-прикидочная, точность 20%

Я хотел опрашивать 100 тысяч устройств по 100 метрикам каждое один раз в 5 минут, с запасом один раз в минуту.

То есть нужно снимать 10 млн метрик в минуту или 150 тысяч в секунду. Округлю до 100 тысяч метрик в секунду.

Опрос одной метрики занимает около 5мс: 4мс отвечает устройство, + 1мс на обработку.

Это значит, что один синхронный поток может обрабатывать 200 метрик в секунду или нужно 500 синхронных потоков. Вполне реалистично.

(Все, конец, больше писать нечего)

На самом деле это только начало. Фишка в не идеальности мира, а именно в потерях ответов на запросы.Потери происходят из-за потерь траффика или недоступности устройств.

В моем случае таймаут ожидания составлял одну секунду и делалось 2 попытки, значит на недоступном устройстве можно потерять 2000мс на ожидание

Посчитаем еще раз

По нашему опыту в инфраструктуре вчегда недоступны 3% устройств. Причины разные работы по модернизации, обрывы сети, отключения света.

Итак, 3% метрик опрашиваются со скоростью 2000мс.

В таком случае один поток cможет опрашивать всего 15,5 метрик в секунду.

Но введем оптимизацию и в случае, если у устройства 5 раз подряд не удается снять метрики, остальные 95 метрик не будем снимать в текущем цикле опроса. Получится, что один поток может обрабатывать 125 метрик в секунду или нам нужно будет 800 потоков. На грани, но можно.

Учтем специфику задачи у нас мониторинг и нам интересно чтобы он быстро работал _особенно_ когда все идет нет так. Плюс есть технические аргументы 800 потоков генерируют высокую конкуренцию за общие ресурсы и в целом, работают плохо в системе, которая по дизайну была рассчитана на десятки потоков.

А что, если недоступно не 3%, а 20% или все 100% устройств?

Ок, посчитаем еще один раз

Чтобы не было скучно вымышленный, но вполне возможный пример из какого ни будь постмортема ошибка конфигурации X привела к недоступности по мониторингу и управлению примерно половины сети. Исправление было сделано сразу же, проверенные вручную 10 устройств стали доступны, но реально проблема решилась только в одном сегменте, поэтому устранение для остальной части сети затянулось на 2 часа.

Хорошо бы, чтобы мониторинг мог локализовывать и такие аварии, когда и 45%, и 99% сети недоступно.

Тогда немного другая задача: 100% устройств не отвечают. Сколько нужно потоков, чтобы увидеть, что они заработали в течении 30 секунд? - Около 7 тысяч.

Такую параллельность разумно обеспечить асинхронностью

Входные данные

Система написана на С, парралельность обеспечена форками, для синхронизации используются мьютексы, активно используется Shared Memory для хранения объектов. Реализация SNMP на основе Net-SNMP

Задача минимальными усилиями сделать так, чтобы поллинг был асинхронным

Реализация1 : все что работает пусть работает, лишнего не трогай

Кажется, нужно просто добавить парралелизма?

Я взял код синхронной версии поллера SNMP, изменил размеры массивов, немного переписал инициацию SNMP, чуть помучался с сильно возросшими требованиями по памяти, но получил псевдо-асинхронный режим работы:

Работал он примерно так:

Первая реализация парралельного поллингаПервая реализация парралельного поллинга

Сначала идет период получения списка задач на поллинг, потом разом инициируются соединения и запросы по всем метриками и поллер ждет прихода ответа от всех. Большинство хостов отвечают очень быстро, но, как правило, приходилось ждать полный таймаут, так как хоть один из элементов данных терялся по тайм-ауту.

Схема сильно упрощена количество одновременных запросов 2 тысячи, и масштаб ожидания тоже другой. В реальности ожидание таймаута выглядит примерно так:

Ожидание таймаутов огромно по сравнению с нормальными ответамиОжидание таймаутов огромно по сравнению с нормальными ответами

Несмотря на простую реализацию, прирост скорости оказался примерно 100- кратным.

С установкой на продуктив появились особенности устройства, обладающие большим количеством элементов, стали опрашиваться нестабильно.

Но схема хорошо работала на большом количестве одинаковых небольших устройств.

Поэтому появилась вторая реализация:

Реализация2 : Синхронно-асинхронный режим работы

Анализ показал, что устройства с большим количеством метрик получают разом от десятков до сотен запросов, на часть из которых не отвечают. То есть нужно вводить сериализацию запросов для хостов. Перед началом цикла опроса запросы сортировать по хосту и для одного хоста задания выполнять одно за другим. Если случится тайм-аут или два подряд, то дальнейшие задания для этого хоста отбросить.

Стало лучше. Но проблемы с хостами с большим количеством метрик остались: все еще иногда фиксировались не ответы или потери элементов.

На количестве метрик больше 2-3 тысяч такая потеря происходила примерно в каждом цикле опроса и приходилось увеличивать количество попыток или отбрасывать элементы только после 3-4 неудачных опросов, что затягивало общий цикл съема.

В итоге, вторая реализация стала работать качественней, но потеряла в скорости. Стало ясно, что малой кровью не обойтись.

Реализация3 : Настоящая асинхронная

Это первая версия, которая могла считаться истинно асинхронной. Для ее реализации пришлось ввести полноценную машину состояний для каждой операции, благо в случае SNMP это протокол UDP и состояний немного.

Введение состояний позволило разсинхронизировать общую работу соединений и получать новые элементы данных по мере того, как появлялись свободные соединения:

Реализация получилась существенно сложнее, но позволила не зависеть от скорости опроса конкретных хостов и их неответов, утилизация соединений резко возросла, но опять полностью от неответов избавить не удалось, хотя они стали очень редки.

Реализация3.5 : еще лучше, еще быстрее

Вины механизма поллинга в неответах не было. Проблема была в механизмах распределения задач. Один хост мог попасть в несколько асинхронных поллеров и в таком случае сразу несколько поллеров его начинали параллельно опрашивать.

Поэтому нужно было оптимизировать механизмы распределения задач на поллинг.

Сначала я сделал так, чтобы один хост гарантированно попадал в один и тот же поллер. Потери пропали, хотя иногда хосты с 3-4 тысячами метрик могли ожидать 2-4 минуты пока устройство отвечало на все запросы и такие метрики фиксировались как опаздывающие.

Появилась возможность для еще одной оптимизации я не закрывал в одном соединении сокет до тех пор, пока обрабатывал метрики одного и того-же хоста. Фаерволам стало немного легче. Но потом пришлось отказаться: некорректно работал библиотечный механизм таймаутов, который пришлось продублировать своим, но самое неприятное, случались утечки памяти в Net:SNMP.

В итоге, такими стараниями асинхронный поллинг смог достигать цифр 60-70 тысяч метрик в секунду. Дальше он выявил ряд проблем и узких мест в общей архитектуре, упираясь в блокировки больше, чем в ограничения поллинга.

Реализация4.0 : до основанья, а затем

Раз уж удалось гарантировать, что хосты из общей очереди задач всегда будут попадать в свои поллеры, то необходимость держать такие хосты в общей очереди задач отпала.

В 4 версии я реализовал получение сразу всего списка во внутреннюю очередь, с сопутствующей оптимизацией резолвинг и кеширование ДНС, макросов и конфигурации элемента данных делались на входе единоразово. Таким образом, сотни тысяч блокировок конфигурации в общей памяти трансформировались в одну две для выполнения массовой операции обновления элементов в течении достаточно долгого периода времени.

Версию 4.0 оказалось проще переписать заново, так как поллер стал сильно самостоятельным благодаря собственной очереди и опала необходимость подгонять его под логику работы классических поллеров. Еще одна диаграмма работы:

Тут снова не в масштабе расстояния между оранжевыми столбцами обновлениями конфигурации огромны, это минуты, нужно картинку удлинить в десять тысяч раз чтобы было пропорционально.

И осталось еще одно ограничение - количество исходящих портов. Так как серверу нужна еще прочая сетевая активность помимо SNMP, общее количество соединений ограничил в 48 тысяч.

Даже это - большая цифра. Поллер с таймаутом в 2 секунды при 100% неответов сможет проверить 15 хостов за 30 секунд в одном соединении или примерно 720 тысяч! хостов за 30 секунд в 48 тысяч соединений.

Эти цифры больше, чем устраивают, но, если нужно больше всегда можно решить с помощью конфигурации с несколькими IP адресами или RAW socket.

Сейчас такой поллинг в тестах упирается в соединения на отметке в 120-140 тысяч метрик в секунду.

На проде снимает около 30-40 тысяч метрик в секунду, работает в один поток, использует при этом 50% одного ядра.

Вывод:

От Реализации1 до Реализации4 прошло почти 2 года. Каждая реализация занимала по 2-3 человеко-недели.

Стоило оно того?

Как простое решение - можно же было просто поднять пачку контейнеров для скейла.

Думаю, что очень даже стоило.

Все, что снималось на 15 вспомогательных серверах, получилось снимать на одном и тот курит.

А мониторинг должен быть максимально простым, а лучше всего - сосредоточенным в одном сервере. Иначе он тоже претендент на выход из строя при инфраструктурных проблемах.

Мониторьте в радость.

Подробнее..

Интеграция синхронное, асинхронное и реактивное взаимодействие, консистентность и транзакции

25.02.2021 10:21:06 | Автор: admin

Продолжаю серию статей про хорошую интеграцию. В первой статье я говорил, что хорошая админка обеспечит быстрое решение инцидентов как ключевой фактор для устойчивости работы всего комплекса систем. Во второй про использование идемпотентных операций для устойчивой работы в условиях асинхронного взаимодействия и при сбоях. В этой статье рассмотрим синхронный, асинхронный и реактивный способы взаимодействия между сервисами и более крупными модулями. А так же способы обеспечить консистентность данных и организовать транзакции.

На схеме вы можете увидеть все три варианта взаимодействия. Важно то, что схема описывает обработку не одного сообщения, а нескольких из входящей очереди. Потому что для анализа производительности и устойчивости важна не просто обработка одного сообщения важно, как экземпляр сервиса обрабатывает поток сообщений из очереди, переходя из одного к другому и откладывая незавершенную обработку.

На самом деле под любыми способами взаимодействия (в том числе синхронным) лежит посылка сообщения. Которое будет получено некоторым обработчиком и обработано сразу с отправкой сообщения-ответа. Либо поставлено в очередь на обработку с квитанцией о приеме, а содержательная обработка произойдет позднее. Однако это не означает, что все способы одинаковы обработка сообщений и очереди организуются по-разному. Конечно, детали реализации часто скрыты протоколом обмена, библиотеками или синтаксическим сахаром языка реализации, однако для разработки эффективных и устойчивых приложений нам необходимо понимать, как именно организована обработка (включая обработку ошибок), где возникают очереди, задержки и как происходит обработка при большом потоке сообщений.

Синхронное взаимодействие

Синхронное взаимодействие самое простое. Оно скрывает все детали удаленного вызова, что для вызывающего сервиса превращается в обычный вызов функции с получением ответа. Для его организации есть множество протоколов например, давно известные RPC и SOAP. Но очевидная проблема синхронности в том, что удаленный сервис может отвечать не очень быстро даже при простой операции на время ответа влияет загруженность сетевой инфраструктуры, а также другие факторы. И все это время вызывающий сервис находится в режиме ожидания, блокируя память и другие ресурсы (хотя и не потребляя процессор). В свою очередь, блокированные ресурсы могут останавливать работу других экземпляров сервиса по обработке сообщений, замедляя тем самым уже весь поток обработки. А если в момент обращения к внешнему сервису у нас есть незавершенная транзакция в базе данных, которая держит блокировки в БД, мы можем получить каскадное распространение блокировок.

Например, при распиле монолита мы выносим сервис хранения товаров отдельно. И в бизнес-логике обработки заказа (где-то в середине) нам надо получить какой-то атрибут для действий в зависимости от него например, узнать вес и объем товара, чтобы решить: курьер довезет или нужна машина (или даже газель). Если раньше мы обращались за атрибутом локально и быстро получали ответ, то теперь мы используем удаленное обращение и пока он идет, этот процесс держит не только свои ресурсы, но и блокировки, связанные с незавершенной транзакцией.

При этом стандартный шаблон работы с базой данных - все изменения по одному запросу пользователя проводить в одной транзакции (разумно их делить), а не завершать транзакции после каждого оператора для обеспечения консистентности данных. Поэтому при выносе хранения товаров в отдельный сервис нам не просто надо переписать процедуры запроса атрибутов на обращения к сервису, а провести реинжиниринг кода: сначала запросить все необходимые данные от других сервисов, а потом начать делать изменения в базе данных.

Казалось бы, это всё очевидно. Но я встречался со случаями, когда синхронные вызовы ставили без необходимости, искренне не понимая, что выполнение будет долгим, а накладные расходы большими. Отдельная засада заключается в том, что современные системы разработки позволяют вынести сервисы на удаленный сервер не меняя исходного кода, и сделать это могут администраторы при конфигурировании системы. В том числе на уровне базы данных я встречался с идеями, когда централизованное хранение логов на полном серьезе предлагали делать просто за счет переноса локальных таблиц на общий сервис так, чтобы прямые вставки в них превратились в ставки по dblink. Да, это простое решение. Только очень неустойчивое по производительности и чувствительное к сбоям сетевой инфраструктуры.

Другая проблема связана с падениями удаленных вызовов. Падение вызываемого сервиса еще укладывается в общую логику обработки всякий вызов процедуры может породить ошибку, но эту ошибку можно перехватить и обработать. Но ситуация становится сложнее, когда вызываемый сервис отработал успешно, но вызывающий за это время был убит в процессе ожидания или не смог корректно обработать результат. Поскольку этот уровень скрыт и не рассматривается разработчиками, то возможны эффекты типа резервов для несуществующих заказов или проведенные клиентом оплаты, о которых интернет-магазин так и не узнал. Думаю, многие сталкивались с подобными ситуациями.

И третья проблема связана с масштабированием. При синхронном взаимодействии один экземпляр вызывающего сервиса вызывает один экземпляр вызываемого, но который, в свою очередь, тоже может вызывать другие сервисы. И нам приходится существенно ограничивать возможность простого масштабирования через увеличение экземпляров запущенных сервисов, при этом мы должны проводить это масштабирование сразу по всей инфраструктуре, поддерживая примерно одинаковое число запущенных сервисов с соответствующей затратой ресурсов, даже если проблема производительности у нас только в одном месте.

Поэтому синхронное взаимодействие между сервисами и системами зло. Оно ест ресурсы, мешает масштабированию, порождает блокировки и взаимное влияние разных серверов.

Я бы рекомендовал избегать его совсем, но, оказывается, есть одно место, в котором протокол поддерживает только синхронное взаимодействие. А именно взаимодействие между сервером приложений и базой данных по JDBC синхронно принципиально. И только некоторые NoSQL базы данных поддерживают реально асинхронное взаимодействие со стороны сервера приложений и вызовы callback по результату обработки. Хотя казалось бы, мы находимся в поле бэкенд-разработки, которая в наше время должна быть ориентирована на асинхронное взаимодействие... Но нет и это печально.

Транзакции и консистентность

Раз уж зашла речь про базы данных, поговорим о транзакционности работы. Там более, что именно она часто является аргументом за синхронное взаимодействие.

Начнем с того, что транзакционность была в свое время громадным преимуществом реляционных баз данных и снимала с разработчиков громадное количество работы по обработке ошибок. Вы могли рассчитывать, что все изменения, которые бизнес-логика выполняет в базе данных при обработке одного запроса пользователя либо будут целиком зафиксированы, либо целиком отменены (если при обработке произошла ошибка), а база данных вернется в прежнее состояние. Это обеспечивал механизм транзакций реляционной базы данных и паттерн UnitOfWork в приложении-клиенте или на сервере приложений.

Как это проявлялось практически? Например, если вы резервировали заказ, и какой-то одной позиции не хватило, то ошибка обработки автоматически снимала все сделанные резервы. Или если вы исполняли сложный документ, и при этом создавалось много проводок по разным счетам, а также изменялись текущие остатки и история, то вы могли быть уверены, что либо все проводки будут созданы и остатки будут им соответствовать, либо ни одной проводки не останется. Поведение по умолчанию было комфортным и при этом обеспечивало консистентность, и лишь для сохранения частичных результатов (например, для частичного резерва заказа) надо было предпринимать специальные сознательные усилия. Но и в этом случае на уровне базы данных за счет механизма триггеров все равно можно было следить за консистентностью внутри транзакций при частичном сохранении результатов , например, обеспечивая жесткое соответствие проводок и остатков по счетам.

Появление трехзвенной архитектуры и сервера приложений принципиально не изменило ситуацию. Если каждый запрос пользователя обрабатывается сервером приложений в одном вызове, то заботу о консистентности вполне можно возложить на транзакции базы данных. Это тоже было типовым шаблоном реализации.

Когда же пришла пора распределенных систем, то это преимущество решили сохранить. Особенно на уровне базы данных потому что возможность вынести часть хранения на другой сервер средствами администратора представлялась крайне желанной. В результате появились распределенные транзакции и сложный протокол двухфазного завершения, который призван обеспечить консистентность данных в случае распределенного хранения.

Призван, но по факту не гарантирует. Оказывается, в случае сбоев даже промышленные системы межсистемных транзакций, такие, как взаимодействие по Oracle dblink, могут привести к тому, что в одной из систем транзакция будет завершена, а в другой нет. Конечно, это тот самый исчезающе маловероятный случай, когда сбой произошел в крайне неудачный момент. Но при большом количестве транзакций это вполне реально.

Это особенно важно, когда вы проектируете систему с требованиями высокой надежности, и рассчитываете использовать базу данных как средство, обеспечивающее резервирование данных при падении сервера, передавая данные на другой сервер и получая, таким образом, копию, на которую рассчитываете оперативно переключиться при падении основного сервера. Так вот, если падение произошло в этот самый крайне неудачный момент, вы не просто получаете транзакции в неопределенном состоянии, вы должны еще разобраться с ними вручную до запуска штатного режима работы.

Вообще, расчет на штатные средства резервирования базы данных в распределенном IT-ландшафте иногда играет злую шутку. Очень печальная история восстановления после аварии произошла, когда основной сервер одной из систем деградировал по производительности. Причина проблем была совершенно неясна, не исключены были даже проблемы на уровне железа, поэтому решили переключиться на standby в конце концов, его именно для таких случаев и заводили. Переключились. После этого выяснилось, что потерялись несколько последних минут работы, но в корпоративной системе это не проблема пользователей оповестили, они работу повторили.

Проблема оказалась в другом: смежные системы, с которыми взаимодействовал сервер, оказались совершенно не готовы к откату его состояния на несколько минут они же уже обращались и получили успешный ответ. Ситуация усугубилась тем, что об этом просто не подумали при переключении, а проблемы начали проявляться не сразу поэтому эффект оказался довольно большим, и его пришлось несколько дней искать и устранять сложными скриптами.

Так вот там, где взаимодействие было асинхронным, с этим получилось разобраться скриптами на основе сравнения журналов обоих серверов. А вот для синхронного взаимодействия оказалось, что никаких журналов не существует в природе, и восстановление консистентности данных потребовало сложных межсистемных сверок. Впрочем, для асинхронных взаимодействий при отсутствии журналов будет тоже самое, поэтому ведите их с обеих сторон.

Таким образом, механизмы межсистемных транзакций часто погребены очень глубоко на системном уровне, и это реальная проблема в случае сбоев. А при временных нарушениях связи проблема станет еще больше из-за больших таймаутов на взаимодействие систем многие из них разворачивались еще в тот период, когда связь была медленной и ненадежной, а управление ими при этом далеко не всегда вынесено на поверхность.

Поэтому использование транзакций базы данных и встроенных механизмов, в общем случае, не будет гарантией консистентности данных и устойчивости работы комплекса систем. А вот деградацию производительности из-за межсистемных блокировок вы точно получите.

Когда же объектно-ориентированный подход сменил процедурную парадигму разработки, мы получили еще одну проблему. Для поддержки работы с персистентными объектами на уровне сервера приложений были разработаны объектно-реляционные мапперы (ORM). Но тут-то и выяснилось, что шаблон UnitOfWork и возможность отката транзакций концептуально противоречат ORM каждый объект инкапсулирует и сложность, и логику работы, и собственные данные. Включая активное кэширование этих данных, в том числе и между сессиями разных пользователей для повышения производительности при работе с общими справочниками. А отката транзакций в памяти на уровне сервера приложений не предусмотрено.

Конечно, на уровне ORM или в надстройке над ним можно вести списки измененных объектов, сбрасывать их при завершении UnitOfWork, а для отката считывать состояние из базы данных. Но это возможно, когда вся работа идет только через ORM, а внутри базы данных нет никакой собственной бизнес-логики, изменяющей данные, например, триггеров.

Может возникнуть вопрос а какое все это имеет отношение к интеграции, это же проблемы разработки приложения как такового? Это, было бы так, если бы многие legacy-системы не выставляли API интеграции именно на уровне базы данных и не реализовывали логику на этом же уровне. А это уже имеет прямое отношение к интеграции в распределенном IT-ландшафте.

Замечу, что взаимодействие между базами данных тоже не обязательно должно быть синхронным. Тот же Oracle имеет различные библиотеки, которые позволяют организовывать асинхронное взаимодействие между узлами распределенной базы данных. И появились они очень давно мы успешно использовали асинхронное взаимодействие в распределенной АБС Банка еще в 1997 году, даже при скорости канала между городами, по которому шло взаимодействие, всего 64К на всех пользователей интернета (а не только нашей системы).

Асинхронное и реактивное взаимодействие

Асинхронное взаимодействие предполагает, что вы посылаете сообщение, которое будет обработано когда-нибудь позднее. И тут возникают вопросы а как получать ответ об обработке? И нужно ли вообще его получать? Потому что некоторые системы оставляют это пользователям, предлагая периодически обновлять таблицы документов в интерфейсе, чтобы узнать статус. Но достаточно часто статус все-таки нужен для того, чтобы продолжить обработку например, после полного завершения резервирования заказа на складе передать его на оплату.

Для получения ответа есть два основных способа:

  • обычное асинхронное взаимодействие, когда передающая система сама периодически опрашивает состояние документа;

  • и реактивное, при котором принимающая система вызывает callback или отправляет сообщение о результате обработки заданному в исходном запросе адресату.

Оба способа вы можете увидеть на схеме вместе с очередями и логикой обработки, которая при этом возникает.

Какой именно способ использовать зависит от способа связи. Когда канал однонаправленный, как при обычном клиент-серверном взаимодействии или http-протоколе, то клиент может запросить сервер, а вот сервер не может обратиться к клиенту взаимодействие получается асинхронным.

Впрочем, такой асинхронный способ легко превращается в синхронный достаточно в методе отправки сообщения поставить таймер с опросом результата. Сложнее превратить его в реактивный, когда внутри метода отправки находится опрос результата и вызов callback. И вот это второе превращение далеко не столь безобидно, как первое, потому что использующие реактивную интеграцию рассчитывают на ее достоинства: пока ответа нет, мы не тратим ресурсы и ждем реакции. А оказывается, что где-то внутри все равно работает процесс опроса по таймеру

Реактивное взаимодействие требует определенной перестройки мышления, которая не столь проста, как кажется, потому что есть желание не просто упростить запись, а скрыть реактивное программирование и писать в традиционном стиле. Впервые я это осознал, когда был в 2014 году на конференции GoToCon в Копенгагене (мой отчет) и там же услышал про Реактивный манифест (The Reactive Manifesto). Там как раз обсуждалось создание различных библиотек, поддерживающих эту парадигму взаимодействия, потому что она позволяет гибко работать с производительностью. Сейчас это встроено в ряд языков через конструкции async/await, а не просто в библиотеки.

Но фишка в том, что такое скрытие усложняет понимание происходящего внутри приложения. Засада происходит в том случае, когда к объектам, обрабатываемым в таком асинхронном коде, обращаются из других мест например, они могут быть возвращены в виде коллекций, запрашивающих объекты. И если не позаботиться специально, то вполне могут быть ситуации одновременного изменения объектов. Вернее, псевдо-одновременного между двумя асинхронными вызовами объект изменяется, хотя с точки зрения разработчика мы как бы находимся внутри потока управления одной процедуры.

Впрочем, шаблоны реактивного программирования это отдельная тема. Я же хочу заострить внимание на том, что в реактивном взаимодействии есть не только переключение потоков, но и скрытые очереди. А скрытые очереди хуже явных, потому что когда возникает дефицит ресурсов и возрастает нагрузка, все тайное становится явным. Это, конечно, не повод усложнять запись, но, как и в случае с автоматическим управлением памятью, эти механизмы надо понимать и учитывать. Особенно в интеграции, когда это используется для взаимодействия между узлами и сервисами, которые потенциально находятся на разных узлах.

Поэтому я рекомендую представить вашу конкретную интеграцию со всеми скрытыми очередями на схеме как в начале статьи,. И с ее помощью продумать, что произойдет, если один из сервисов упадет в середине обработки? Сделать это нужно как минимум трем людям:

  • Проектирующий систему аналитик, должен понимать, какие будут последствия для пользователей, и как именно с ними разбираться что нужно автоматизировать, а что может решить служба поддержки.

  • При этом аналитик плотно взаимодействует с разработчиком, который смотря на соответствие этой схемы и фактической реализации, может указать на разные проблемы и предложить их решение.

  • А третий человек тестировщик. Он должен придумать, как проверить, что в случае сбоев и падений отдельных сервисов система ведет себя именно так, как задумано что не возникает документов в промежуточных состояниях и которые не видны ни на интерфейсах ни службе поддержки; что отсутствует случайная двойная обработка документа и так далее.

Поясню эти задачи на примерах. Пусть один сервис обрабатывает заказы от покупателей, а другой резервирует товары по этим заказам на остатках. Задача в этом случае несмотря на падения серверов или сбои связи, не должно быть ситуации, когда по заказу на 3 единицы они зарезервировались дважды, то есть заблокировалось 6 единиц, потому что первый раз сервис резервирования его выполнил, а квитанцию о резерве не послал или сервис обработки заказов не смог эту квитанцию обработать.

Другой пример в дата-центре установлен сервис отправки чеков в ФНС, которая выполняется через взаимодействие со специализированным оборудованием ККМ. И тут тоже надо обеспечить, чтобы каждый чек был отправлен в налоговую ровно один раз, при том, что сама ККМ может работать ненадежно и со сбоями. А в тех случаях, когда алгоритм не может однозначно выяснить результат обработки чека, о появлении такой ситуации должна быть оповещена служба поддержки для разбора. Это должны спроектировать аналитики с разработчиками, а тестировщики проверить при различных сбойных ситуациях. И лучше это делать автоматически, особенно если в интеграции есть сложные алгоритмы обработки.

Конечно, никакая проверка не дает гарантий. Поэтому при межсистемном взаимодействии важно вести журналы с обеих сторон (хотя при большом объеме приходится думать, как чистить или архивировать логи). Казалось бы, это очевидно. Но нет я встречал много систем, где журналы отсутствовали и диагностировать проблемные ситуации было невозможно. И много раз доказывал при проектировании, что журналирование обмена необходимо.

Консистентность данных

Вернемся к вопросам консистентности данных при асинхронном взаимодействии. Как его обеспечивать, если транзакции недоступны? В прошлой статье я предложил одно из решений шаблон идемпотентных операций. Он позволяет в случае проблем с получением ответа в обработке сообщения просто отправить его повторно, и оно при этом будет обработано корректно. Этот же прием можно применять в ряде ситуативных технических ошибок, например, при отказе из-за блокировок на сервере.

Если же вы получили содержательную ошибку, например, отказ в отгрузке уже зарезервированного и оплаченного заказа, то, скорее всего, с такими проблемами надо разбираться на бизнес-уровне, никакие транзакции тут не помогут. В самом деле, отказ отгрузки может в том числе означать, что зарезервированный товар присутствовал только в остатках информационной системы, а на реальном складе его не было. То есть, его действительно не получится отправить покупателю, а значит, разбираться с этим надо на уровне бизнес-процесса, а не технически. По моему опыту, подобные ситуации достаточно распространены многие разработчики почему-то забывают, что ИТ-системы лишь отражают реальный мир (и отражение может быть неверным) и сосредотачиваются на техническом уровне.

Организация транзакций

Обработка сообщений порождает транзакции, так как системы по обе стороны взаимодействия работают с базами данных. Но при асинхронном взаимодействии нет никакого смысла сохранять ту транзакционность обработки, какая была в исходной системе. Это опять кажется очевидным, но при обсуждении базовых механизмов интеграции эту задачу почему-то часто ставят и тратят силы не ее решение. Это лишняя работа.

В основе транзакционной работы лежит необходимость персистентного сохранения в базу данных, при этом транзакции и кванты обработки стоит настраивать отдельно, исходя из требований производительности. По моему опыту, если у вас идет массовая загрузка или передача данных, то вовсе не обязательно оптимальным будет commit на каждую запись всегда есть некоторое оптимальное количество. При этом внутри процедуры загрузки можно ставить savepoint с перехватом ошибок для каждой отдельной записи, чтобы откатывать не всю порцию загрузки, а лишь обработку ошибочных. Только помните, что откатываются только изменения в таблицах, а некоторые другие вещи могут не откатываться, например, работа с переменными пакетов в Oracle или некоторые способы постановки в очереди или работа с файлами. Но это и при обычном rollback сохраняется.

Еще надо понимать, что при передаче документов со строками (например, больших накладных) принимающая система далеко не всегда работает корректно, если документ был обработан частично (даже если у строк есть отдельный атрибут-статус). При этом, что интересно, передающая система, наоборот, может не обеспечивать целостность обработки документов. И тогда надо принимать специальные меры для обеспечения согласованности.

У меня было два подобных кейса. В одном случае требовалось в принимающей системе снять ограничение, связанное с долгим проведением больших документов, потому что некоторые накладные включали тысячи позиций и обычный режим исполнения (целиком) приводил к долгим блокировкам. В этом случае для построчной обработки мы создали отдельную технологическую таблицу со статусами строк и очень аккуратно работали со статусом самого документа.

В другом случае складская система обрабатывала документы отгрузки наоборот, построчно, выдавая общий поток ответов по всем строкам всех входящих документов. Тогда как передающая система рассчитывала, что на каждый переданный документ придет целостный ответ. В этом случае нам пришлось создать специальный промежуточный сервис, который на входе запоминал, сколько строк имеет каждый передаваемый документ, фиксировал полученные ответы и передавал их обратно в отправляющую систему только когда по всем строкам получал ответ. Он также поднимал тревогу, когда ответ по некоторым строкам слишком долго задерживался. Эвристики для этого алгоритма мы прорабатывали отдельно, так как за формированием ответа стояла реальная обработка позиций документа людьми, которая, естественно, была далеко не мгновенной.

На этом я завершаю третью статью про интеграцию. Надеюсь, она помогла вам разобраться в различных вариантах взаимодействия систем, и это окажется полезным при проектировании, разработке и тестировании.

С 1 февраля стоимость очного участия в DevOpsConf 2021 составит 36000 рублей. Забронируйте билет сейчас, и у вас будет ещё несколько дней на оплату.

На данный момент Программный комитет одобрил уже около 40 докладов, но до 28 февраля ещё принимает заявки. Если вы хотите быть спикером, то подать доклад можно здесь.

Чтобы быть на связи, подписывайтесь на наши соцсети, чтобы не упустить важные новости о конференции и наших онлайн-событиях VK, FB, Twitter, Telegram-канал, Telegram-чат.

Подробнее..

Анатомия backpressure в реактивных потоках

09.08.2020 22:07:12 | Автор: admin
Читая многочисленные статьи по теме реактивных потоков, читатель может прийти к выводу, что:
backpressure это круто
backpressure доступно только в библиотеках, реализующих спецификацию reactive streams
эта спецификация настолько сложна, что не стоит и пытаться ее реализовать самому

В этой статье я попытаюсь показать, что:
backpressure это очень просто
для реализации асинхронного backpressure достаточно сделать асинхронный вариант семафора
при наличии реализации асинхронного семафора, интерфейс org.reactivestreams.Publisher реализуется в несколько десятков строк кода

Backpressure (обратное давление) это обратная связь, регулирующая скорость работы производителя данных для согласования со скоростью работы потребителя. В отсутствие такой связи более быстрый производитель может переполнить буфер потребителя либо, если буфер безразмерный, исчерпать всю оперативную память.

В многопоточном программировании эта проблема была решена Дейкстрой, предложившим новый механизм синхронизации семафор. Семафор можно трактовать как счетчик разрешений. Предполагается, что перед совершением ресурсоемкого действия производитель запрашивает разрешение у семафора. Если семафор пуст, то поток производителя блокируется.

Асинхронные программы не могут блокировать потоки, поэтому не могут обращаться за разрешением к пустому семафору (но все остальные операции с семафором делать могут). Блокировать свое исполнение они должны другим способом. Этот другой способ заключается в том, что они просто покидают рабочий поток, на котором исполнялись, но перед этим организуют свое возвращение к работе, как только семафор наполнится.

Наиболее элегантный способ организовать приостановку и возобновление работы асинхронной программы это структурировать ее как dataflow актор с портами:
A dataflow model actors with ports, the directed connections between their ports, and initial tokens. Взято из: A Structured Description Of Dataflow Actors And Its Application

Порты бывают входными и выходными. Во входные порты поступают токены (сообщения и сигналы) от выходных портов других акторов. Если входной порт содержит токены, а выходной имеет место для размещения токенов, то он считается активным. Если все порты актора активны, он отправляется на исполнение. Таким образом, при возобновлении своей работы программа актора может безопасно считывать токены из входных портов и записывать в выходные. В этом нехитром механизме таится вся мудрость асинхронного программирования. Выделение портов как отдельных подобъектов акторв резко упрощает кодирование асинхронных программ и позволяет увеличить их разнообразие комбинированием портов разных типов.

Классический актор Хьюитта содержит 2 порта один видимый, с буфером для входящих сообщений, другой скрытый бинарный, блокирующийся, когда актор отправляется на исполнение и, таким образом, препятствующий повторному запуску актора до окончания первоначального запуска. Искомый асинхронный семафор нечто среднее между этими двумя портами. Как и буфер для сообщений, он может хранить много токенов, и как у скрытого порта, эти токены черные, то есть неразличимые, как в сетях Петри, и для их хранения достаточно счетчика токенов.

На первом уровне иерархии у нас определен класс AbstractActor c тремя вложенными классами базовый Port и производные AsyncSemaPort и InPort, a также с механизмом запуска актора на исполнение при отсутствии заблокированных портов. Вкратце это выглядит так:
public abstract class AbstractActor {    /** счетчик заблокированных портов */    private int blocked = 0;    protected synchronized void restart() {            controlPort.unBlock();    }    private synchronized void incBlockCount() {        blocked++;    }    private synchronized void decBlockCount() {        blocked--;        if (blocked == 0) {            controlPort.block();            excecutor.execute(this::run);        }    }    protected abstract void turn() throws Throwable;    /** головной метод */    private void run() {        try {            turn();            restart();        } catch (Throwable throwable) {            whenError(throwable);        }    }}

В него вложен минимальный набор классов-портов:
Port базовый класс всех портов
    protected  class Port {        private boolean isBlocked = true;        public Port() {            incBlockCount();        }        protected synchronized void block() {            if (isBlocked) {                return;            }            isBlocked = true;            incBlockCount();        }        protected synchronized void unBlock() {            if (!isBlocked) {                return;            }            isBlocked = false;            decBlockCount();        }    }

Асинхронный семафор:
    public class AsyncSemaPort extends Port {        private long permissions = 0;        public synchronized void release(long n) {            permissions += n;            if (permissions > 0) {                unBlock();            }        }        public synchronized void aquire(long delta) {            permissions -= n;            if (permissions <= 0) {                 // поток исполнения не блокируется                // но актор не зайдет на следующий раунд исполнения,                // пока счетчик разрешений не станет опять положительным                block();            }        }    }

InPort минимальный буфер для одного входящего сообщения:
    public class InPort<T> extends Port implements OutMessagePort<T> {        private T item;        @Override        public void onNext(T item) {            this.item = item;            unBlock();        }        public synchronized T poll() {            T res = item;            item = null;            return res;        }    }

Полную версию класса AbstractActor можно посмотреть здесь.

На следующем уровне иерархии мы имеем три абстрактных актора с определенными портами, но с неопределенной процедурой обработки:
  • класс AbstractProducer это актор с одним портом типа асинхронный семафор (и внутренним контрольным портом, присутствует у всех акторов по умолчанию).
  • класс AbstractTransformer обычный актор Хьюита, со ссылкой на входной порт следующего актора в цепочке, куда он отправляет преобразованные токены.
  • класс AbstractConsumer также обычный актор, но преобразованные токены он никуда не отправляет, при этом он имеет ссылку на семафор производителя, и открывает этот семафор после поглощения входного токена. Таким образом, количество находящихся в обработке токенов поддерживается постоянным, и никакого переполнения буферов не происходит.

На последнем уровне, уже в директории test, определены конкретные акторы, используемые в тестах:
  • класс ProducerActor генерирует конечный поток целых чисел.
  • класс TransformerActor принимает очередное число из потока и отправляет его дальше по цепочке.
  • класс ConsumerActor принимает и печатает полученные числа

Теперь мы можем построить цепочку асинхронных, параллельно работающих обработчиков следующим образом: производитель любое количество трансформеров потребитель



Тем самым мы реализовали backpressure, и даже в более общем виде, чем в спецификации reactive streams обратная связь может охватывать произвольное число каскадов обработки, а не только соседние, как в спецификации.

Чтобы реализовать спецификацию, надо определить выходной порт, чувствительный к количеству переданных ему с помощью метода request() разрешений это будет Publisher, и дополнить уже существующий InPort вызовом этого метода это будет Subscriber. То есть, мы принимаем, что интерфейсы Publisher и Subscriber описывают поведение портов, а не акторов. Но судя по тому, что в списке интерфейсов присутствует также Processor, который никак не может быть интерфейсом порта, авторы спецификации считают свои интерфейсы интерфейсами акторов. Ну что же, мы можем сделать акторы, реализующие все эти интерфейсы, с помощью делегирования исполнения интерфейсных функций соответствующим портам.

Для простоты пусть наш Publisher не имеет собственного буфера и будет писать сразу в буфер Subscriber'а. Для этого нужно, чтобы какой-либо Subscriber подписался и выполнил request(), то есть, у нас есть 2 условия и, соответственно, нам нужно 2 порта InPort<Subscriber> и AsyncSemaPort. Ни один из них не подходит в качестве базового для реализации Publisher'а, так как содержит ненужные методы, поэтому мы сделаем эти порты внутренними переменными:

public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {    protected AbstractActor.InPort<Subscriber<? super T>> subscriber;    protected AbstractActor.AsyncSemaPort sema;    public ReactiveOutPort(AbstractActor actor) {        subscriber = actor.new InPort<>();        sema = actor.new AsyncSemaPort();    }}

На этот раз мы определили класс ReactiveOutPort не как вложенный, поэтому ему понадобился параметр конструктора ссылка на объемлющий актор, чтобы создать экземпляры портов, определенных как вложенные классы.

Метод subscribe(Subscriber subscriber) сводится к сохранению подписчика и вызову subscriber.onSubscribe():
    public synchronized void subscribe(Subscriber<? super T> subscriber) {        if (subscriber == null) {            throw new NullPointerException();        }        if (this.subscriber.isFull()) {            subscriber.onError(new IllegalStateException());            return;        }        this.subscriber.onNext(subscriber);        subscriber.onSubscribe(this);    }

что обычно приводит к вызову Publisher.request(), который сводится к поднятию семафора с помощью вызова AsyncSemaPort.release():
    public synchronized void request(long n) {        if (subscriber.isEmpty()) {            return; // this spec requirement        }        if (n <= 0) {            subscriber.current().onError(new IllegalArgumentException());            return;        }        sema.release(n);    }

И теперь нам осталось не забыть опускать семафор с помощью вызова AsyncSemaPort.aquire() в момент использования ресурса:
    public synchronized void onNext(T item) {        Subscriber<? super T> subscriber = this.subscriber.current();        if (subscriber == null) {            throw  new IllegalStateException();        }        sema.aquire();        subscriber.onNext(item);    }


Проект AsyncSemaphore был специально разработан для этой статьи. Он намеренно сделан максимально компактным, чтобы не утомлять читателя. Как результат, он содержит существенные ограничения:
  • Одновременно к Publisher'у может быть подписано не более одного Subscriber
  • размер входного буфера Subscriber'а равен 1

Кроме того, AsyncSemaPort не является полным аналогом синхронного семафора только один клиент может выполнять операцию aquire() у AsyncSemaPort (имеется в виду объемлющий актор). Но это не является недостатком AsyncSemaPort хорошо выполняет свою роль. В принципе, можно сделать и по другому взять java.util.concurrent.Semaphore и дополнить его асинхронным интерфейсом подписки (см AsyncSemaphore.java из проекта DF4J). Такой семафор может связывать акторы и потоки исполнения в любом порядке.

Вообще, каждый вид синхронного (блокирующего) взаимодействия имеет свой асинхронный (неблокирующий) аналог. Так, в том же проекте DF4J имеется реализация BlockingQueue, дополненная асинхронным интерфейсом. Это открывает возможность поэтапного преобразования многопоточной программы в асинхронную, по частям заменяя потоки на акторы.
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru