Русский
Русский
English
Статистика
Реклама

Actor model

Релиз акторного фреймворка rotor v0.09 (c)

10.10.2020 20:06:06 | Автор: admin

actor system


rotor ненавязчивый С++ акторный микрофремворк, похожий на своих старших братьев caf и sobjectizer. В новом релизе внутреннее ядро полностью было переделано с помощью механизмов плагинов, так что это затронуло жизненный цикл акторов.


Связывание акторов


Всякая система акторов базируется на взаимодействии между ними, т.е. в отправлении сообщений друг другу (а также в возможных побочных эффектах в качестве реакции на эти сообщения или в создании новых сообщений, появляющихся в качестве реакции на события внешнего мира). Однако, чтобы сообщение было доставлено целевому актору, он должен оставаться активным (1); другими словами, если актор A собирается отправить сообщение М актору B, он должен быть уверен, что актор B онлайн и не будет выключен в процессе пересылки сообщения M.


До версии v0.09 подобная гарантия была только для отношений родитель/потомок, между супервайзером и дочерним актором, т. к. для последнего выполняется гарантия того, сообщение будет доставлено до его супервайзера в силу того, что супервайзер владеет дочерним актором, и его время жизни покрывает времена жизни всех своих дочерних акторов. Начиная с версии v0.09 появилась возможность связывания двух произвольных акторов A и B, так что после подтверждения связи (link), можно быть уверенным, что все последующие сообщения будут доставлены.


Для связывания акторов можно использовать такой код:


namespace r = rotor;void some_actor_t::on_start() noexcept override {    request<payload::link_request_t>(b_address).send(timeout);}void some_actor_t::on_link_response(r::message::link_response_t &response) noexcept {    auto& ec = message.payload.ec;    if (!ec) {        // успех связывания    }}

Однако, данный код не рекомендуется использовать напрямую потому что он не очень удобен. Это становится очевидным при попытке связать актор A с двумя и более акторами, т.к. some_actor_t должен будет вести внутренний счётчик успешных связываний. В данном случае помогает система плагинов, и код будет уже выглядеть так:


namespace r = rotor;void some_actor_t::configure(r::plugin::plugin_base_t &plugin) noexcept override {    plugin.with_casted<r::plugin::link_client_plugin_t>(        [&](auto &p) {            p.link(B1_address);            p.link(B2_address);        }    );}

Это более удобно в виду того, что плагин link_client_plugin_t поставляется в базовом классе всех акторов actor_base_t. Тем не менее, это скорей всего не всё, что хотелось бы иметь, т.к. остаются не отвеченными важные вопросы: 1) Когда происходит связывание акторов (и обратный вопрос когда происходит их разъединение)? 2) Что случится, если целевой актор ("сервер") не существует или откажет в связывании? 3) Что случится если целевой актор решит выключиться, в то время как есть связанные с ним акторы-клиенты?


Чтобы ответить на этот вопрос нужно рассмотреть жизненный цикл актора.


Асинхронная инициализация и выключение актора


Упрощённо жизненный цикл актора (состояние, state) выглядит следующим образом: new (ctor) -> initializing -> initialized -> operational -> shutting down -> shut down.



Основная работа осуществляется актором, когда он находится в состоянии operational, и здесь собственно пользователь фреймворка должен решить, чем именно будет заниматься актор.


Во время фазы инициализации (I-фазы, т.е. initializing -> initialized), актор подготавливает себя для будущей работы: находит и связывается с другими акторами, устанавливает соединение с БД, получает необходимые ему ресурсы для полноценной работы. Ключевая особенность rotor'а, что I-фаза асинхронна, т. е. актор сообщает супервайзеру, когда он готов (2).


Фаза выключения (S-фаза, т.е. shutting down -> shut down ) комплиментарна I-фазе, т.е. актора просят выключится, а когда он готов, он сообщает об этом своему супервайзеру.


Несмотря на кажущуюся простоту, основная сложность лежит здесь в масштабируемости (composability) подхода, при котором акторы формируют эрланго-подобные иерархии ответственностей (см. мою статью Trees of Supervisors). Перефразируя, можно сказать, что любой актор может дать сбой во время I- или S-фазы, что может повлечь за собой безопасный и ожидаемый коллапс всей иерархии независимо от местоположения актора в ней. Конечная цель в итоге это либо вся иерархия приходит в рабочее состояние (operational), либо она в конце концов становится выключенной (shut down).



(Пояснение к картинке. Сплошная линия обозначение отношение владения, пунктирная отношения связи).


rotor уникален в этом отношении. Ничего подобного нет в caf. Может создаться ошибочное представление, что в sobjectizer'е присутствует shutdown helper, предназначение которого аналогично S-фазе выше; однако, после публичных дискуссий с автором в англоязчыной версии статьи, выяснилось, что данные вспомогательные классы нужны для "длительного" (гарантированного) выключения акторов, даже если в Environment'е был вызван метод stop. С точки зрения sobjectizer'а отложенная инициализация (и выключение), аналогичные I- и S-фазам rotor'a, могут быть смоделированы с помощью встроенного механизма поддержки состояний и это обязанность пользователя фрейворка, если такова потребность его бизнес-логики. В rotor'е же это встроено в сам фреймворк.


При использовании rotor'а было обнаружено, что во время I-фазы (S-фазы) потенциально множество ресурсов должны быть получены (освобождены) асинхронно, что обозначает, что нет единого компонента, способного решить, что текущая фаза завершена. Вместо этого это решение есть плод совместных усилий, приложенных в определённом порядке. И здесь в игру вступают плагины, представляющие из себя атомарные кусочки, каждый из которых ответственен за собственную работу по инициализации или выключению.


Что же такое плагин в контексте rotor'а? Плагин это некий аспект поведения актора, определяющий реакцию актора на некоторые сообщения или группу сообщений. Лучше пояснить на примерах. Плагин init_shutdown, ответственен за инициализацию (выключение) актора, т. е. после опроса о готовности всех плагины, генерируется ответ на запрос о готовности инициализации (выключения); или, например, плагин child_manager, доступный только для супервайзеров, и ответственный за порождение дочерних акторов и всю машинерию связанную с этим, как то генерация запросов дочерним акторам на инициализацию, выключение и т. п. Несмотря на то, что существует возможность свои плагины, на текущий момент я не вижу необходимости в этом, поэтому она остаётся недокументированной.


Таким образом, обещанные ответы, относящиеся к link_client_plugin_t:


  • В: когда происходит связывание (отвязывание) актров? О: когда актор в состоянии initializing (shutting down).


  • В: что случится, если целевой актор не существует или отказывает в связывании? О: т. к. это случается во время инициализации актора, то плагин обнаружит это условие и начнёт выключение актора-клиента; также, возможно, это вызовет каскадный эффект, т.е. его супервайзер тоже выключится и так далее вверх по иерархии владения.


  • В: что случится, если целевой актор решит выключиться, при том, что с ним связаны активные акторы-клиенты? О: актор-сервер попросит клиентов отвязаться, и только когда все связанные клиенты подтвердят это, актор-сервер продолжит процедуру выключения (3).



Упрощённый пример


Будем предполагать, что имеется драйвер базы данных с асинхронным интерфейсом для одного из движков событий (event loop), доступных для rotor'а, а также что имеются TCP-клиенты, подключающиеся к нашему сервису. За обслуживание базы данных будет отвечать актор db_actor_t, а принимать клиентов будет acceptor_t. Начнём с первого:


namespace r = rotor;struct db_actor_t: r::actor_base_t {    struct resource {        static const constexpr r::plugin::resource_id_t db_connection = 0;    }    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::registry_plugin_t>([this](auto &p) {            p.register_name("service::database", this->get_address())        });        plugin.with_casted<r::plugin::resources_plugin_t>([this](auto &) {            resources->acquire(resource::db_connection);            // инициировать асинхронное соединение с базой данных        });    }    void on_db_connection_success() {        resources->release(resource::db_connection);        ...    }    void on_db_disconnected() {        resources->release(resource::db_connection);    }    void shutdown_start() noexcept override {        r::actor_base_t::shutdown_start();        resources->acquire(resource::db_connection);        // асинхронное закрытие соединения с базой данных и сброс данных    }};

Внутреннее пространство имён resource используется для идентификации соединения с БД как ресурсом. Это общепринятая практика, чтобы не использовать в коде магические цифры вроде 0. Во время конфигурации, которая является частью инициализации, когда плагин registry_plugin_t готов, он асинхронно зарегистрирует адрес актора в регистре (о нём будет рассказано позже). Затем с помощью resources_plugin_t захватывается "ресурс" подключения к БД, чтобы блокировать дальнейшую инициализацию актора и начинается соединение с БД. Когда будет подтверждено соединение с БД, ресурс будет освобождён и актор db_actor_t перейдёт в рабочее состояние. S-фаза аналогична: блокируется выключение до тех пор, пока все данные не будут сброшены в БД и пока соединение не будет закрыто; после этого процедура выключения актора завершается (4).


Код актора, который будет принимать клиентов, выглядит приблизительно так:


namespace r = rotor;struct acceptor_actor_t: r::actor_base_t {    r::address_ptr_t db_addr;    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::registry_plugin_t>([](auto &p) {            p.discover_name("service::database", db_addr, true).link();        });    }    void on_start() noexcept override {        r::actor_base_t::on_start();        // начать приём клиентов, например:        // asio::ip::tcp::acceptor.async_accept(...);    }    void on_new_client(client_t& client) {        // send<message::log_client_t>(db_addr, client)    }};

Основное в данном случае, это метод configure. Когда плагин registry_plugin_t готов, он будет сконфигурирован на обнаружение сервиса service::database, а когда адрес db_actor_t будет найден и сохранён в члене класса db_addr, то тогда с ним будет произведено связывание. Если же адрес актора service::database не будет обнаружен, то актор acceptor_actor_t начнёт выключаться (т. е. on_start не будет вызван). Если всё будет успешно проинициализировано, то актор начнёт принимать новых клиентов.


Собственно, сама операционная часть была опущена ради краткости, т. к. она не изменилась в новой версии rotor'а. Как обычно нужно определить полезную нагрузку (payload), сообщения, методы для работы с этими сообщениями и подписаться на них.


Скомпонуем всё вместе в файле main.cpp; будем считать, что используется boost::asio в качестве цикла событий.


namespace asio = boost::asio;namespace r = rotor;...asio::io_context io_context;auto system_context = rotor::asio::system_context_asio_t(io_context);auto strand = std::make_shared<asio::io_context::strand>(io_context);auto timeout = r::pt::milliseconds(100);auto sup = system_context->create_supervisor<r::asio::supervisor_asio_t>()               .timeout(timeout)               .strand(strand)               .create_registry()               .finish();sup->create_actor<db_actor_t>().timeout(timeout).finish();sup->create_actor<acceptor_actor_t>().timeout(timeout).finish();sup->start();io_context.run();

Как видно, в новом rotor'е активно используется шаблон builder. С помощью него создаётся корневой супервайзер sup, а уже он в свою очередь порождает 3 актора: два пользовательских (db_actor_t и acceptor_actor_t) и неявно созданный актор-регистр. Как обычно для акторных систем, все акторы слабо связаны друг с другом, т.к. они разделяют только общий интерфейс сообщений (опущено в статье).


Акторы "просто создаются" в данном месте, без знания о том, как они связаны между собой. Это следствие слабой связанности между акторами, которые с версии v0.09 стали более автономными.


Конфигурация выполнения могла бы быть полностью другой: акторы могли бы создаваться на разных потоках, на разных супервайзерах, и даже на различных движках, но их реализация оставалась бы по сути одной и той же (5). В этих случаях было бы несколько корневых супервайзеров, однако актор-регистр должен быть создан один, и его адрес разделён между всеми супервайзерами, чтобы акторы смогли найти друг друга. Для этого существует метод get_registry_address() в базовом супервайзере.


Итоги


Наиболее существенным изменением в новой версии rotor'а является разбиение на плагины его ядра. Наиболее важными для пользователя фрейморка являются плагины: link_client_plugin_t, позволяющий установить виртуальное соединение между акторами, плагин registry_plugin_t, дающий возможность регистрации и обнаружения адресов акторов по символическим именам, а также плагин resources_plugin_t, с помощью которого можно приостановить процедуру инициализации (выключения) до появления некоторого асинхронного внешнего события.


Так же присутствуют менее важные изменения в новом релизе, такие как система доступа к непубличным
свойствам и шаблон builder для интанцирования акторов.


Любая обратная связь приветствуется.


P.S. Я хотел бы поблагодарить Crazy Panda за поддержку в моих начинаниях по развитию данного проекта, а также автора sobjectizer'а за высказанные им критические замечания.


Примечания


(1) В настоящее время попытка доставки сообщения актору, супервайзер которого уже отработал и был удалён, ведёт к краху программы (UB).


(2) Если актор не подтвердит успешную инициализацию супервайзеру, сработает таймер инициализации, и супервайзер сделает запрос на выключение актора, т.е. состояние operational будет пропущено.


(3) Может возникнуть вопрос, что произойдёт, если актор не подтвердит отвязывание вовремя? Это нарушение контракта, и будет вызван метод system_context_t::on_error(const std::error_code&), который распечатает ошибку на консоль и вызовет std::terminate(). Для избегания нарушений контрактов, нужно настраивать таймеры, чтобы позволить акторам-клиентам во время отвязаться.


(4) Во время процедуры выключения плагин registry_plugin_t проинструктирует регистр, чтобы все зарегистрированные имена текущего актора были удалены из регистра.


(5) Исключение составляет, когда используются различные циклы событий. Если актор использует API цикла событий, то, очевидно, что смена цикла событий повлечёт переписывание внутренностей актора. Тем не менее, это никак не затронет использование API rotor'а.

Подробнее..

Не хочется ждать в очереди? Напишем свой диспетчер для SObjectizer с приоритетной доставкой

07.12.2020 12:09:16 | Автор: admin


SObjectizer это небольшой фреймворк для C++, который дает возможность разработчику использовать такие подходы, как Actor Model, Communicating Sequential Processes и Publish/Subscribe.


Одной из ключевых концепций в SObjectizer являются диспетчеры. Диспетчеры определяют где и как акторы (агенты в терминологии SObjectizer-а) обрабатывают свои события. Диспетчеры в SObjectizer бывают разных типов и пользователь может создавать в своем приложении столько разнообразных диспетчеров, сколько ему потребуется.


При этом у пользователя есть возможность написать свой собственный тип диспетчера, если ничего из стандартных диспетчеров ему не подходит. Два с половиной года назад уже рассказывалось о том, как можно сделать диспетчер для SObjectizer-а со специфическими свойствами.


Сегодня мы еще раз поговорим об этом. На примере уже другой задачи. Да и реализация будет отличаться, поскольку за прошедшее время SObjectizer успел обновиться сперва до версии 5.6, а затем и 5.7. И в этих версиях много отличий от версии 5.5, про которую в основном и рассказывалось в прошлом. В том числе и в механизме диспетчеров.


О решаемой задаче в двух словах


Предположим, что у нас есть агент, который ожидает два сообщения: msg_result с финальным результатом ранее начатой агентом операции и msg_status с промежуточной информацией о том, как протекает начатая операция.


Сообщения msg_status могут идти большим потоком. Например, на одно msg_result может приходиться до 1000 msg_status. И нам бы хотелось, чтобы когда в очереди уже стоит 900 сообщений msg_status новое сообщение msg_result вставало не в конец очереди, а в самое ее начало. Чтобы msg_result не ждало пока разгребутся 900 старых msg_status.



Вроде бы простая задача. Но с ее реализацией могут возникнуть неожиданные сложности. И чтобы понять, что это за сложности и как с ними справится, нужно посмотреть на SObjectizer-овские диспетчеры и понять, что это за звери такие. После чего можно будет обсудить возможные способы реализации доставки сообщений с учетом их приоритетов. Воплотить один из вариантов в жизнь и обсудить его реализацию.


Как диспетчеры связаны с политикой доставки сообщений до агентов?


Диспетчер в SObjectizer-е это сущность, которая определяет где и когда агенты будут обрабатывать свои сообщения.


Допустим, у нас есть агент Bob, который подписывается на сообщение msg_result из почтового ящика mbox. Когда в mbox отсылается msg_result, то это сообщение должно встать в очередь Bob-а, откуда оно будет рано или поздно извлечено и обработано. И вот здесь уже в полный рост встают диспетчеры, поскольку у агентов нет никаких очередей, тогда как у диспетчеров есть.


Когда агент регистрируется в SObjectizer-е, то агент привязывается к какому-то диспетчеру. И в момент привязки агенту дается указатель на сущность event_queue. Это интерфейс, за которым скрыта некая машинерия по передаче сообщения, адресованного агенту, именно тому диспетчеру, к которому агент привязан.


Когда сообщение передается диспетчеру, тот формирует заявку (demand) на обработку сообщения агентом и сохраняет заявку где-то у себя (очередь заявок диспетчера называется demand_queue).


Процесс доставки сообщения до агента в SObjectizer выглядит следующим образом:



Когда сообщение отсылается в mbox, то mbox видит Bob-а в подписчиках и говорит Bob-у: вот тебе новое сообщение. Bob берет это сообщение и передает его в свой event_queue. И уже этот event_queue формирует для диспетчера заявку (demand) на обработку сообщения для агента Bob. Заявка сохраняется в demand_queue диспетчера.


Диспетчер владеет одной или несколькими рабочими нитями, которые занимаются тем, что извлекают заявки из demand_queue и отправляют их на обработку. Эти рабочие нити диспетчера называются рабочим контекстом на котором будут работать агенты.


В общем, получается, что диспетчеры и предоставляют рабочий контекст для агентов, и хранят в своих demand_queue адресованные агентам сообщения.


А это означает, что когда нам нужна приоритетная доставка сообщений (т.е. msg_result вперед msg_status), то нам потребуется диспетчер, который эту самую приоритетную доставку реализует.


А приоритетов для сообщений в SObjectizer-5 и нет :(


Совсем.


Вот так вот.


Приоритеты для сообщений были в SObjectizer-4. Но они на практике использовались всего раз или два. Зато хлопот с их существованием и поддержкой было много.


Поэтому при разработке SObjectizer-5 от приоритетов для сообщений агента было решено отказаться. И за 10 лет развития и использования SObjectizer-5 пожалеть об этом пока что не пришлось.


Однако, время от времени задачи, в которых требуется какое-либо приоритетное обслуживание, все-таки встречаются. Поэтому некоторое время назад в SObjectizer-5 было добавлено понятие приоритета для агента. Именно для агента, а не для отдельного сообщения. Для поддержки приоритетов агентов в SObjectizer-5 появились три диспетчера, которые реализуют разные политики обслуживания приоритетов.


Так как можно решить задачу с msg_result и msg_status?


Использование двух агентов с разными приоритетами и диспетчера one_thread::strictly_ordered


Самое "простое" и "искаробочное" решение.


Логика агента A размазывается на двух агентов A_result и A_status. Агенты A_result и A_status получают разные приоритеты и привязываются к одному и тому же диспетчеру типа one_thread::strictly_ordered. Агент A_result подписывается на msg_result, тогда как агент A_status подписывается на msg_status.


Общие для агентов данные выносятся в какой-то общий объект, которыми они совместно владеют. Через shared_ptr, например. Получится что-то вроде:


struct A_data { ... };class A_result final : public so_5::agent_t {   std::shared_ptr<A_data> m_data;public:   A_result(context_t ctx, std::shared_ptr<A_data> data) {...}   void so_define_agent() override {      so_subscribe(m_data->m_mbox, &A_result::on_result);   }private:   void on_result(const msg_result & msg) {...}};class A_status final : public so_5::agent_t {   std::shared_ptr<A_data> m_data;public:   A_status(context_t ctx, std::shared_ptr<A_data> data) {...}   void so_define_agent() override {      so_subscribe(m_data->m_mbox, &A_status::on_status);   }private:   void on_status(const msg_status & msg) {...}};

Разделять общие данные между агентами A_result и A_status безопасно до тех пор, пока они работают на one_thread::strictly_ordered диспетчере.


Собственно говоря, это тот подход, который я бы и рекомендовал использовать, если вам потребовалась приоритетная обработка событий.


Но у этого способа есть очевидный недостаток: трудоемкость и размазывание логики по нескольким сущностям. Эта трудоемкость будет увеличиваться по мере увеличения количества сообщений, которые вам потребуется обрабатывать с разными приоритетами. Так что в какой-то момент может показаться, что двигаться в этом направлении слишком дорого и что нужно искать другое решение.


Собственный диспетчер с приоритетами для сообщений


Другой подход состоит в том, чтобы сделать собственный диспетчер, который будет доставлять сообщения до агентов-получателей с учетом приоритетов сообщений. Благо, если не заморачиваться вопросами мониторинга работы диспетчера и сбора статистики, то это делается не так уж и сложно.


Если такой диспетчер будет написан, то можно избавить себя от недостатков подхода с несколькими разноприоритетными агентами: вся логика будет находится в одном месте, поэтому добавлять новые обрабатываемые сообщения или избавляться от каких-то старых сообщений будет гораздо проще.


Именно по этому пути мы и пойдем в данной статье.


Что именно мы попытаемся сделать?


Мы попытаемся посмотреть на проблему немного шире (как это и положено русским программистам, которые ищут универсальные решения там, где можно обойтись методом грубой силы).


Начнем с того, что нам нужно собственный диспетчер. Который сможет упорядочивать сообщения в своей очереди согласно их приоритетов.


И тут возникает вопрос, а как должны определяться эти приоритеты?


А это зависит от задачи.


Где-то приоритеты будут определяться типом сообщения и эти приоритеты можно зафиксировать прямо в compile-time. Скажем, приоритет у msg_result единичка, а у msg_status нолик.


Где-то приоритеты могут зависеть от типа сообщения и агента-получателя. Скажем, для агента A сообщение msg_result будет иметь приоритет 1, а сообщение msg_status 0. Тогда как для агента L оба эти сообщения будут иметь приоритет 0.


Где-то приоритеты могут зависеть только от почтового ящика, из которого они были получены. Скажем все сообщения из mbox_A должны быть приоритетнее, чем сообщения из mbox_B.


А где-то приоритеты почтовых ящиков должны быть дополнены еще и приоритетами сообщений. Т.к. msg_result из mbox_A имеет приоритет 2, msg_status из mbox_A 1, msg_result из mbox_B 1, msg_status из mbox_B 0.


Ну и другие сочетания параметров так же возможны.


Значит ли это, что под каждое такое сочетание нужно делать свой диспетчер?


Скорее всего нет.


Возможно, диспетчер должен быть один, просто он должен уметь настраиваться на любой способ вычисления приоритета сообщений. Например, пользователь параметризует диспетчер объектом priority_detector который и определяет приоритет очередного сообщения: при получении очередной заявки диспетчер дергает priority_detector и располагает заявку в очереди согласно вычисленного priority_detector-ом приоритета.


Уже хорошо.


Но можно попробовать пойти еще дальше.


Диспетчеры были придуманы для того, чтобы дать возможность разработчику распределять своих агентов по тем рабочим контекстам, которые нужны разработчику. Скажем, захотел программист, чтобы агенты Alice и Bob работали на одной общей рабочей нити, значит зачем-то ему это нужно. Не суть важно зачем: для экономии ресурсов или для того, чтобы облегчить Alice и Bob совместную работу над общими разделяемыми данным. Разработчик хочет привязать Alice и Bob к общему контексту, значит он должен суметь это сделать.


А теперь представим себе, что Alice и Bob должны работать на общем рабочем контексте, но приоритеты доставки сообщений до Alice и Bob должны вычисляться по-разному. Скажем, приоритет сообщений для Alice зависит только от их типа. Тогда как приоритет для Bob-а зависит и от типа, и от почтового ящика.


Тут можно пойти, как минимум, двумя путями.


Первый путь. Диспетчер владеет одной приоритетной очередью. А мы пишем сложный priority_detector-а, который разбирается кто получатель сообщения, Alice или Bob, затем для каждого получателя определяет приоритет по тем или иным критериям.


Вполне понятное направление для движения. Но его трудоемкость будет расти по мере увеличения количества разнотипных агентов, которые нам нужно будет привязать к одному диспетчеру.


Второй путь. А что, если у каждого из агентов будет своя собственная приоритетная очередь? И в каждой очереди будет собственный priority_detector со своими правилами? Тогда мы сможем привязывать к одном диспетчеру агентов с разными политиками доставки сообщений. И в этом случае нам нужно научить диспетчера работать не с одной единственной очередью, в которую сваливается вообще все, а с набором отдельных очередей.


Мы пойдем вторым путем. Поскольку он открывает интересные возможности. Ведь если мы можем научить диспетчер обслуживать агентов с собственными очередями, то не обязательно это будут очереди с приоритетом. Например, это могут быть очереди фиксированного размера с автоматическим выбрасыванием самых старых (или самых новых) элементов при попытке добавления заявки в уже полную очередь. Или же очереди с контролем времени пребывания: скажем, если заявка простояла в очереди дольше 250ms, то она уже не актуальна и должна быть проигнорирована.


Демо проект so5_custom_queue_disps


Для иллюстрации предлагаемого решения на GitHub-е создан демо-проект so5_custom_queue_disps, который содержит исходные тексты обсуждаемого ниже диспетчера.


На данный момент в нем есть реализация только одного типа диспетчера one_thread, который привязывает всех агентов к одной единственной рабочей нити. Если данная тема кого-нибудь заинтересует, то туда же можно будет добавить и реализации thread_pool и/или adv_thread_pool диспетчеров. Для иллюстрации. Ну или для того, чтобы их можно было скопипастить, если вдруг кому-нибудь понадобятся.


Общая идея: список из непустых очередей


Идея, положенная в основу so5_custom_queue_disps, состоит в том, что есть N независимых друг от друга очередей заявок. В пределе у каждого агента может быть своя очередь заявок. Эти очереди живут до тех пор, пока кто-то их использует, как только очередь стала никому не нужна она уничтожается.


Диспетчер хранит у себя список указателей на очереди заявок. Если этот список пуст, значит все очереди заявок пусты. Как только в какую-то из них попадает заявка, эта очередь добавляется в конец списка диспетчера.


Диспетчер спит пока в его списке ничего нет. Но как только какая-то из очередей добавляется в список, то диспетчер просыпается и начинает обслуживать очереди из своего списка.


Диспетчер изымает из списка первую очередь. Извлекает заявку из очереди. Если после извлечения заявки очередь оказывается пустой, то диспетчер забывает про эту очередь. Если же в очереди что-то остается, то диспетчер возвращает эту очень в свой список, в самый его конец.


После чего диспетчер запускает обработку извлеченной заявки. А после завершения обработки проверяет свой список: если список не пуст, то обрабатывается следующий его элемент. Если пуст, то диспетчер засыпает.



Несколько агентов с одной очередью заявок и FIFO для агентов


Для большей гибкости в so5_custom_queue_disps есть возможность связать нескольких агентов с одной общей очередью заявок. Например, если агенты должны использовать общую политику доставки сообщений (скажем, у них общие приоритеты).


Но можно к one_thread диспетчеру привязать и агентов, у каждого из которых будет своя собственная очередь заявок. Получится, что агенты исполняются на одном рабочем контексте, но заявки для них диспетчер будет брать из разных очередей.


И тут возможны сюрпризы с обеспечением FIFO для агентов.


Предположим, что мы используем простые очереди заявок. Без приоритетов, без ограничения на размер, без выбрасывания чего-либо. Просто очереди.


Предположим, что агенты Alice и Bob разделяют одну общую очередь. И мы отсылаем сообщение M1 агенту Alice, а затем отсылаем сообщение M2 агенту Bob. Доставка сообщений до агентов произойдет в таком же порядке: сперва M1, затем M2.


Но вот если агенты Alice и Bob имеют независимые очереди заявок, то при отсылке вначале M1 агенту Alice, а затем M2 агенту Bob, порядок доставки может быть любым. Это зависит и от наполнения очередей заявок самих агентов, и от того, где эти очереди находились в списке непустых очередей диспетчера. Так что может случится и так, что M2 будет обработано агентом Bob еще до того, как M1 дойдет до Alice.


Эту особенность нужно иметь в виду при работе с таким диспетчером.


Как описанный выше подход выглядит для пользователя на практике?


Прежде чем перейти к рассмотрению деталей реализации custom_queue_disps::one_thread-диспетчера посмотрим на то, как выглядит использование этого диспетчера в коде:


so_5::launch( [](so_5::environment_t & env) {   env.introduce_coop( [](so_5::coop_t & coop) {      // (1)      auto queue = std::make_shared<dynamic_per_agent_priorities_t>();      // (2)      auto binder = custom_queue_disps::one_thread::make_dispatcher(            coop.environment() ).binder( queue );      // (3)      auto * alice = coop.make_agent_with_binder<demo_agent_t>(            binder, "Alice" );      auto * bob = coop.make_agent_with_binder<demo_agent_t>(            binder, "Bob" );      // (4)      queue->define_priority( alice,            typeid(demo_agent_t::hello),            dynamic_per_agent_priorities_t::low );      queue->define_priority( alice,            typeid(demo_agent_t::bye),            dynamic_per_agent_priorities_t::high );      queue->define_priority( bob,            typeid(demo_agent_t::hello),            dynamic_per_agent_priorities_t::high );      queue->define_priority( bob,            typeid(demo_agent_t::bye),            dynamic_per_agent_priorities_t::low );   } );} );

Здесь в точке (1) мы создаем отдельную очередь заявок, которая реализует доставку сообщений с учетом получателя сообщения и приоритета этого сообщения именно для этого получателя (класс dynamic_per_agent_priorities_t в деталях рассматривается ниже).


В точке (2) мы делаем сразу два действия:


  • во-первых, создаем новый экземпляр диспетчера;
  • во-вторых, получаем от этого диспетчера объект disp_binder, который нам нужен чтобы привязать агентов к этому диспетчеру. Этот disp_binder знает, что агенты, которых он привязывает к диспетчеру, будут совместно использовать очередь заявок, созданную в точке (1).

В точке (3) мы создаем двух агентов, каждый из которых будет привязан к one_thread-диспетчеру, созданному в точке (2).


В точке (4) определяются приоритеты для сообщений, которые обрабатываются агентами Alice и Bob. Можно увидеть, что одним и тем же сообщениям назначаются разные приоритеты. Соответственно, даже если сообщения отсылаются агентам единовременно, обрабатываться сообщения будут в разном порядке. В соответствии со своими приоритетами. А это как раз то, что нам и нужно.


Некоторые пояснения касательно деталей реализации so5_custom_queue_disps


Базовый класс demand_queue_t и его наследники


Итак, нам нужно иметь возможность привязать к диспетчеру агентов, использующих совершенно разные очереди заявок. Поэтому, с одной стороны, диспетчер должен уметь работать с непохожими друг на друга очередями. И, с другой стороны, пользователь должен иметь возможность без проблем подсунуть диспетчеру свою реализацию очереди.


Для этого предназначен интерфейс demand_queue_t.


Публичная часть demand_queue_t


Публичная часть demand_queue_t выглядит так:


      [[nodiscard]]      virtual bool      empty() const noexcept = 0;      [[nodiscard]]      virtual std::optional<so_5::execution_demand_t>      try_extract() noexcept = 0;      virtual void      push( so_5::execution_demand_t demand ) = 0;

Это как раз те методы, которые пользователь должен переопределить в своем классе очереди заявок для того, чтобы диспетчер с этой очередью заявок смог работать.


Надеюсь, что смысл методов empty() и push() очевиден, поэтому на них останавливаться не буду (если что, то отвечу на вопросы в комментариях). А вот по поводу возврата std::optional из try_extract() можно сказать пару слов.


Может показаться, что если диспетчер обращается к очереди заявок только когда очередь не пуста, то достаточно иметь только один метод extract(), который возвращает первую заявку из очереди (конструктуры/операторы копирования/перемещения для execution_demand_t не бросают исключений, поэтому можно обойтись одним extract() вместо пары front()+pop()).


Но в таком случае мы теряем некоторую долю гибкости. Скажем, возможность игнорировать заявки, которые ждали в очереди слишком долго. Тогда как возврат std::optional дает нам такую возможность. Поэтому диспетчер ожидает следующего поведения от очереди заявок:


  • если empty() возвращает false, то диспетчер может безопасно вызывать try_extract();
  • try_extract() может вернуть пустой std::optional даже если очередь была непустой. Это всего лишь означает, что актуальной заявки для обработки на самом деле не оказалось. Поэтому диспетчер просто идет дальше.

thread-safety для demand_queue_t


Диспетчер вызывает методы empty/try_extract/push только под своим собственным mutex-ом. Поэтому, если очередь заявок модифицирует свое состояние только в этих методах, то об обеспечении thread-safety можно не беспокоится, она обеспечивается диспетчером автоматически.


Однако, если внутри empty/try_extract/push требуется доступ к информации, которая каким-то образом может модифицироваться еще какими-то методами, то тогда ответственность за обеспечение thread-safety для этой дополнительной информации ложится на разработчика очереди заявок. Пример этого мы увидим ниже, когда будем говорить о классе dynamic_per_agent_priorities_t.


Специальные приоритеты для заявок evt_start и evt_finish


Если реализация очереди заявок выполняет переупорядочивание заявок согласно приоритетам или если реализация очереди заявок выбрасывает какие-то заявки из очереди, то следует учесть факт существования двух типов заявок: evt_start и evt_finish.


Заявка evt_start ставится в очередь заявок самой первой, прямо во время привязки агента к диспетчеру. Именно благодаря этой заявке у агента вызывается виртуальный метод so_evt_start.


Заявка evt_finish является самой последней заявкой для агента. Происходит это в процессе дерегистрации агента. И evt_finish должна быть самой последней заявкой, которая для агента будет выполнена. Никакие другие заявки после обработки evt_finish для агента обрабатываться не должны.


Так вот, поскольку это архиважные заявки для жизненного цикла агента, то выбрасывать их категорически нельзя. Это должно быть учтено при реализации очереди заявок.


Если заявки упорядочиваются в очереди согласно приоритетам, то у заявки evt_start должен быть наивысший приоритет. А у заявки evt_finish самый низкий.


В SObjectizer 5.5, 5.6 и 5.7 заявки evt_start и evt_finish можно различить не по execution_demand_t::m_msg_type, как для обычных заявок. А по execution_demand_t::m_demand_handler. Ниже будет показано, как именно это происходит. Возможно, в SObjectizer-5.8 будет применен единообразный подход и все будет идентифицироваться посредством execution_demand_t::m_msg_type. Но в ближайших планах ветки 5.8 нет от слова совсем :)


Написание собственного demand_queue_t


В so5_custom_queue_disps_demo есть три реализации собственных очередей заявок, начиная от самой тривиальной до более-менее сложной. В этой статье мы рассмотрим два крайних случая.


Простейший случай: simple_fifo_t


Простейшая реализация использует обычную FIFO очередь без всяких изысков. Поэтому ее реализация тривиальна и компактна:


class simple_fifo_t final : public custom_queue_disps::demand_queue_t   {      std::queue< so_5::execution_demand_t > m_queue;   public:      simple_fifo_t() = default;      [[nodiscard]]      bool      empty() const noexcept override { return m_queue.empty(); }      [[nodiscard]]      std::optional<so_5::execution_demand_t>      try_extract() noexcept override         {            std::optional<so_5::execution_demand_t> result{               std::move(m_queue.front())            };            m_queue.pop();            return result;         }      void      push( so_5::execution_demand_t demand ) override         {            m_queue.push( std::move(demand) );         }   };

Поскольку мы обеспечиваем строгий FIFO и ничего не выбрасываем, то нам здесь даже не нужно задумываться о существовании заявок evt_start/evt_finish.


Наиболее сложный случай: dynamic_per_agent_priorities_t


Наиболее сложная реализация использует динамически задаваемые приоритеты, которые назначаются агенту в зависимости от типа получаемого сообщения.


Для того, чтобы хранить описания заданных пользователем приоритетов потребуются следующие типы данных и члены класса dynamic_per_agent_priorities_t:


using type_to_prio_map_t = std::map< std::type_index, priority_t >;using agent_to_prio_map_t =            std::map< so_5::agent_t *, type_to_prio_map_t >;std::mutex m_prio_map_lock;agent_to_prio_map_t m_agent_prios;

Можно увидеть std::mutex который и будет задействован для обеспечения thread-safety при работе с m_agent_prios.


Здесь нам нужно использовать очередь с приоритетами. И, дабы воспользоваться std::priority_queue из стандартной библиотеки, мы будем хранить в очереди не so_5::execution_demand_t, а собственный класс:


struct actual_demand_t   {      so_5::execution_demand_t m_demand;      priority_t m_priority;      actual_demand_t(         so_5::execution_demand_t demand,         priority_t priority )         :  m_demand{ std::move(demand) }         ,  m_priority{ priority }         {}      [[nodiscard]]      bool      operator<( const actual_demand_t & o ) const noexcept         {            return m_priority < o.m_priority;         }   };

Далее, для реализации метода push(), в котором новая заявка должна встать в очередь согласно своему приоритету, нам потребуется определить тип заявки, кому она адресуется и какой из всего этого получается приоритет:


[[nodiscard]]priority_thandle_new_demand_priority( const so_5::execution_demand_t & d ) noexcept   {      if( so_5::agent_t::get_demand_handler_on_start_ptr()            == d.m_demand_handler )         return highest;      if( so_5::agent_t::get_demand_handler_on_finish_ptr()            == d.m_demand_handler )         {            std::lock_guard< std::mutex > lock{ m_prio_map_lock };            m_agent_prios.erase( d.m_receiver );            return lowest;         }      {         std::lock_guard< std::mutex > lock{ m_prio_map_lock };         auto it_agent = m_agent_prios.find( d.m_receiver );         if( it_agent != m_agent_prios.end() )            {               auto it_msg = it_agent->second.find( d.m_msg_type );               if( it_msg != it_agent->second.end() )                  return it_msg->second;            }      }      return normal;   }

Сперва мы проверяем, является ли заявка заявкой типа evt_start. Если да, то ей присваивается наивысший приоритет.


Далее такая же проверка делается для заявки типа evt_finish. Но если пришла заявка evt_finish, то кроме назначения ей наименьшего приоритета мы делаем дополнительное действие: удаляем информацию о приоритетах для этого агента. Т.к. эта информация больше не потребуется, других заявок уже не будет.


Также в коде метода handle_new_demand_priority() можно обратить внимание на захват mutex-а в тех местах, где нам требуется модифицировать информацию о приоритетах. Это необходимо делать, т.к. эта информация модифицируется/используется не только при работе push(), но и при работе метода define_priority() о котором диспетчер не знает.


Вот, собственно, и все особенности. Остальная часть тривиальна:


voiddefine_priority(   so_5::agent_t * receiver,   std::type_index msg_type,   priority_t priority )   {      std::lock_guard< std::mutex > lock{ m_prio_map_lock };      m_agent_prios[ receiver ][ msg_type ] = priority;   }[[nodiscard]]boolempty() const noexcept override { return m_queue.empty(); }[[nodiscard]]std::optional<so_5::execution_demand_t>try_extract() noexcept override   {      std::optional<so_5::execution_demand_t> result{         m_queue.top().m_demand      };      m_queue.pop();      return result;   }voidpush( so_5::execution_demand_t demand ) override   {      const auto prio = handle_new_demand_priority( demand );      m_queue.emplace( std::move(demand), prio );   }

Приватная часть demand_queue_t


Кроме описанной выше публичной части demand_queue_t есть еще и "приватная" часть, которая предназначена для использования только со стороны диспетчера:


class demand_queue_t   {      demand_queue_t * m_next{ nullptr };   public:      [[nodiscard]]      demand_queue_t *      next() const noexcept { return m_next; }      void      set_next( demand_queue_t * q ) noexcept { m_next = q; }      void      drop_next() noexcept { set_next( nullptr ); }

Эта часть нужна для того, чтобы диспетчер мог провязывать непустые очереди заявок в интрузивный односвязный список.


dispatcher_handle. Что это и зачем?


После того, как с очередями разобрались, можно перейти к самому диспетчеру. И первое, с чем мы сталкиваемся, так это с тем, что в SObjectizer 5.6 и 5.7 нет никакого явного интерфейса для диспетчера, как это было в SObjectizer 5.5 и более ранних версиях. Т.е. реализовать диспетчер можно как угодно и в виде чего угодно (в SO-5.5 же диспетчер должен был наследоваться от специального класса dispatcher_t).


В SO-5.6/5.7 пользователь взаимодействует с диспетчерами посредством двух сущностей: dispatcher_handle и disp_binder. При disp_binder мы поговорим ниже, а пока рассмотрим dispatcher_handle.


За создание экземпляра диспетчера в SO-5.6/5.7 обычно отвечает функция-фабрика make_dispatcher(). Эта функция должна что-то возвратить, но что, если для диспетчера в современном SObjectizer-е нет никакого C++ного интерфейса?


А вот некий дескриптор/хэндл и возвращается. Именно это и называется dispatcher_handle.


Dispatcher_handle может рассматриваться как shared_ptr для какого-то неизвестного пользователю типа. И работать dispatcher_handle должен именно как shared_ptr: пока есть хотя бы один непустой dispatcher_handle, ссылающийся на экземпляр диспетчера, этот экземпляр будет существовать и будет работать.


Обычно у dispacher_handle есть публичный метод binder() который создает экземпляр disp_binder-а для этого диспетчера. Возможно, у dispatcher_handle есть и другие методы, специфические для конкретного типа диспетчера. Но обычно это binder() и несколько методов, делающих dispatcher_handle похожим на shared_ptr.


В рассматриваемой реализации у dispatcher_handler минималистичный интерфейс:


namespace impl{class dispatcher_t;using dispatcher_shptr_t = std::shared_ptr< dispatcher_t >;class dispatcher_handle_maker_t;} /* namespace impl */class [[nodiscard]] dispatcher_handle_t   {      friend class impl::dispatcher_handle_maker_t;      impl::dispatcher_shptr_t m_disp;      dispatcher_handle_t( impl::dispatcher_shptr_t disp );      [[nodiscard]]      bool      empty() const noexcept;   public :      dispatcher_handle_t() noexcept = default;      [[nodiscard]]      so_5::disp_binder_shptr_t      binder( demand_queue_shptr_t demand_queue ) const;      [[nodiscard]]      operator bool() const noexcept { return !empty(); }      [[nodiscard]]      bool      operator!() const noexcept { return empty(); }      void      reset() noexcept;   };

Функция-фабрика make_dispatcher() для нашего one_thread-диспетчера будет возвращать экземпляр dispatcher_handler именно этого типа.


disp_binder. Что это и что нам нужно от disp_binder для one_thread-диспетчера?


Выше мы уже несколько раз упоминали привязку агентов к диспетчерам. Теперь же настало время поговорить об это в подробностях.


И вот тут в дело вступают специальные объекты под названием disp_binder-ы. Они служат как раз для того, чтобы привязать агента к диспетчеру при регистрации кооперации с агентом. А также для того, чтобы отвязать агента от диспетчера при дерегистрации кооперации.



В SObjectizer определен интерфейс, который должны поддерживать все disp_binder-ы. Конкретные же реализации disp_binder-ов зависят от конкретного типа диспетчера. И каждый диспетчер реализует свои собственные disp_binder-ы.


Начиная с версии 5.6 интерфейс этот выглядит следующим образом:


class disp_binder_t   : private std::enable_shared_from_this< disp_binder_t >{   public:      disp_binder_t() = default;      virtual ~disp_binder_t() noexcept = default;      virtual void      preallocate_resources( agent_t & agent ) = 0;      virtual void      undo_preallocation( agent_t & agent ) noexcept = 0;      virtual void      bind( agent_t & agent ) noexcept = 0;      virtual void      unbind( agent_t & agent ) noexcept = 0;};

Три первых метода, preallocate_resources(), undo_preallocation() и bind() используются при привязке агента к диспетчеру.


Их три поскольку регистрация агента это достаточно сложный процесс, который SObjectizer пытается провести в транзакционной манере. Т.е. либо все агенты кооперации успешно регистрируются, либо не регистрируется не один из них. Для обеспечения этой транзакционности процесс регистрации разбит на несколько стадий.


На первой стадии SObjectizer пытается выделить все ресурсы, необходимые для агентов из новой кооперации. Например, какие-то диспетчеры должны создать для новых агентов новые рабочие нити. Как раз на этой стадии у disp_binder-а вызывается preallocate_resources(). В этом методе disp_binder должен создать все, что агенту потребуется для работы внутри SObjectizer (например, новая рабочая нить, очередь заявок и т.д.). Если все это создалось нормально, то disp_binder должен сохранить это у себя до тех пор, пока у него не вызовут метод bind() для этого же агента.


На стадии резервирования ресурсов могут возникнуть проблемы. Например, не запустится новая рабочая нить. Или не хватит памяти для создания еще одной очереди заявок.


При возникновении подобных проблем все, что было сделано до этого момента, нужно откатить. Для чего и предназначен метод undo_preallocation(). Если у disp_binder-а вызывается метод undo_preallocation(), то disp_binder должен освободить все ресурсы для агента, которые ранее были зарезервированы в preallocate_resources().


Если же стадия резервирования ресурсов завершилась успешно, то выполняется стадия собственно привязки агентов к диспетчерам. И вот здесь уже у disp_binder-а вызывается метод bind(). В этом методе disp_binder обязательно должен вызывать у привязываемого агента метод so_bind_to_dispatcher().


Метод bind() не случайно помечен как noexcept, т.к. на этой стадии исключений SObjectizer не ожидает (а если таковое возникнет, то восстановиться уже не получится).


Метод unbind(), очевидно, используется уже когда кооперация дерегистрируется и агент завершил все свои активности на рабочем контексте (включая и обработку evt_finish). Так что в unbind() disp_binder должен освободить все ресурсы, которые были выделены для агента в preallocate_resources().


Возможно звучит все это сложновато. Но в данном случае реализация disp_binder-а оказывается очень простой:


class actual_disp_binder_t final : public so_5::disp_binder_t   {      actual_event_queue_t m_event_queue;   public:      actual_disp_binder_t(         demand_queue_shptr_t demand_queue,         dispatcher_data_shptr_t disp_data ) noexcept         :  m_event_queue{ std::move(demand_queue), std::move(disp_data) }         {}      void      preallocate_resources(         so_5::agent_t & /*agent*/ ) override         {}      void      undo_preallocation(         so_5::agent_t & /*agent*/ ) noexcept override         {}      void      bind(         so_5::agent_t & agent ) noexcept override         {            agent.so_bind_to_dispatcher( m_event_queue );         }      void      unbind(         so_5::agent_t & /*agent*/ ) noexcept override         {}   };

Все, что данный disp_binder должен сделать это вызывать so_bind_to_dispatcher(). Никакого резервирования ресурсов выполнять не нужно. Т.к. единственный ресурс это экземпляр actual_event_queue_t, который автоматически создается вместе с disp_binder-ом.


one_thread-диспетчер


Рассматриваемый нами one_thread-диспетчер состоит из трех частей.


Во-первых, это структура dispatcher_data_t, которая хранит необходимые диспетчеру данные:


struct dispatcher_data_t   {      std::mutex m_lock;      std::condition_variable m_wakeup_cv;      bool m_shutdown{ false };      demand_queue_t * m_head{ nullptr };      demand_queue_t * m_tail{ nullptr };   };using dispatcher_data_shptr_t =      std::shared_ptr< dispatcher_data_t >;

Эти данные выделены в отдельную структуру для того, чтобы к ним можно было напрямую обращаться из event_queue, которая будет отвечать за сохранение заявок агентов в очереди заявок и за добавление ссылки на непустую очередь заявок в список диспетчерах.


Так что вторая часть диспетчера это реализация интерфейса event_queue_t для one_thread-диспетчера:


class actual_event_queue_t final : public so_5::event_queue_t   {      demand_queue_shptr_t m_demand_queue;      dispatcher_data_shptr_t m_disp_data;   public:      actual_event_queue_t(         demand_queue_shptr_t demand_queue,         dispatcher_data_shptr_t disp_data ) noexcept         :  m_demand_queue{ std::move(demand_queue) }         ,  m_disp_data{ std::move(disp_data) }         {}      void      push( so_5::execution_demand_t demand ) override         {            std::lock_guard< std::mutex > lock{ m_disp_data->m_lock };            auto & q = *m_demand_queue;            const bool queue_was_empty = q.empty();            q.push( std::move(demand) );            if( queue_was_empty )               {                  // В этом блоке кода исключений быть не должно.                  [&]() noexcept {                     const bool disp_was_sleeping =                           (nullptr == m_disp_data->m_head);                     if( disp_was_sleeping )                        {                           m_disp_data->m_head = m_disp_data->m_tail = &q;                           m_disp_data->m_wakeup_cv.notify_one();                        }                     else                        {                           m_disp_data->m_tail->set_next( &q );                           m_disp_data->m_tail = &q;                        }                  }();               }         }   };

Именно экземпляр такой event_queue и хранится внутри описанного выше disp_binder-а.


Тут нужно отметить, что в actual_event_queue_t хранится два умных указателя. Один на demand_queue, второй на экземпляр dispatcher_data_t. Тем самым контролируется время жизни этих сущностей. Т.е. и demand_queue, и dispatcher_data_t живут до тех пор, пока есть хотя бы один actual_event_queue_t. А поскольку actual_event_queue_t является частью disp_binder-а, то время жизни demand_queue и dispatcher_data_t определяется временем жизни disp_binder-ов. Когда все disp_binder-у исчезнут, пропадет и надобность в demand_queue и dispatcher_data_t.


Третья часть диспетчера это собственно dispatcher_t, который запускает единственную рабочую нить и использует ее для обработки заявок. Полный код класса dispatcher_t можно увидеть в репозитории. Здесь же мы посмотрим лишь на два фрагмента.


Первый фрагмент это основная функция рабочей нити и обслуживание заявок из непустых demand_queue:


class dispatcher_t final   :  public std::enable_shared_from_this< dispatcher_t >   {      dispatcher_data_t m_disp_data;      std::thread m_worker_thread;      void      thread_body() noexcept         {            const auto thread_id = so_5::query_current_thread_id();            bool shutdown_initiated{ false };            while( !shutdown_initiated )               {                  std::unique_lock< std::mutex > lock{ m_disp_data.m_lock };                  shutdown_initiated = try_extract_and_execute_one_demand(                        thread_id,                        std::move(lock) );               }         }      [[nodiscard]]      bool      try_extract_and_execute_one_demand(         so_5::current_thread_id_t thread_id,         std::unique_lock< std::mutex > unique_lock ) noexcept         {            do               {                  auto [demand, has_non_empty_queues] =                        try_extract_demand_to_execute();                  if( demand )                     {                        unique_lock.unlock();                        demand->call_handler( thread_id );                        break;                     }                  else if( !has_non_empty_queues )                     {                        m_disp_data.m_wakeup_cv.wait( unique_lock );                     }               }            while( !m_disp_data.m_shutdown );            return m_disp_data.m_shutdown;         }      [[nodiscard]]      std::tuple< std::optional< so_5::execution_demand_t >, bool >      try_extract_demand_to_execute() noexcept         {            std::optional< so_5::execution_demand_t > result;            bool has_non_empty_queues{ false };            if( !m_disp_data.m_head )               return { result, has_non_empty_queues };            auto * dq = m_disp_data.m_head;            m_disp_data.m_head = dq->next();            dq->drop_next();            if( !m_disp_data.m_head )               m_disp_data.m_tail = nullptr;            else               has_non_empty_queues = true;            result = dq->try_extract();            if( !dq->empty() )               {                  if( m_disp_data.m_tail )                     m_disp_data.m_tail->set_next( dq );                  else                     m_disp_data.m_head = m_disp_data.m_tail = dq;               }            return { result, has_non_empty_queues };         }

Второй фрагмент это реализация метода make_disp_binder:


[[nodiscard]]so_5::disp_binder_shptr_tmake_disp_binder(   demand_queue_shptr_t demand_queue )   {      return std::make_shared< actual_disp_binder_t >(            std::move(demand_queue),            dispatcher_data_shptr_t{ shared_from_this(), &m_disp_data } );   }

Важный момент, который стоит здесь пояснить это факт того, что экземпляр dispatcher_data_t хранится в dispatcher_t по значению. Но в методе make_disp_binder в конструктор actual_disp_binder_t передается shared_ptr<dispatcher_data_t>. Тут всего лишь используется трюк c aliasing constructor для std::shared_ptr: хранить shared_ptr будет указатель на объект T, но вот счетчик ссылок будет использоваться от объекта Y. В нашем случае счетчик ссылок от dispatcher_t.


Вот, собственно, и все. Если кто-то хочет взглянуть на реализацию dispatcher_handler и make_dispatcher, то сделать это можно здесь.


Заключение


Данная статья преследует две цели:


Во-первых, хочется показать, что хоть в самом SObjectizer-е приоритетной доставки сообщений нет, но если это кому-то нужно, то это можно сделать самостоятельно. Например, показанным выше способом.


Во-вторых, хочется понять, насколько вообще может быть востребованна подобная функциональность. Если у тех, кто пробовал SObjectizer или же присматривался к SObjectizer-у, время от времени надобность в приоритетной доставке сообщений возникает, то ее можно добавить в so5extra. Или даже в сам SObjectizer. Мы вполне можем это сделать. Но только если такая функциональность действительно востребована.


Так что если у кого-то из читателей есть мнение на этот счет (скажем, есть желание иметь подобный диспетчер в SObjectizer-е прямо "искаропки"), то можно высказаться в комментариях. Обязательно прислушаемся.


Кстати, SObjectizer-5 уже 10 лет


Разработка SObjectizer-5 началась осенью 2010-го года и продолжается до сих пор. Радует и удивляет. А если кому-то интересно что лично я думаю по этому поводу, то можно заглянуть сюда.


Но еще больше меня поражает то, что кто-то берет и использует наш SObjectizer для реализации серьезных проектов несмотря на то, что его делают никому неизвестные люди из белорусской глубинки и за спиной SObjectizer-а нет больших компаний с громкими именами и толстыми кошельками. Вот это удивляет по-настоящему. А еще вселяет в нас уверенность в том, что все это не зря. Так что большое спасибо всем, кто рискнул и выбрал SObjectizer. Если бы не вы, этого маленького юбилея не было бы.

Подробнее..

Проект arataga реальный пример использования SObjectizer и RESTinio для работы с большим количеством HTTP-соединений

18.01.2021 12:13:17 | Автор: admin

В последние 4.5 года я много рассказывал на Хабре про такие OpenSource проекты, как SObjectizer и RESTinio. Но вот об использовании SObjectizer и/или RESTinio в реальных проектах пока еще ни разу не удавалось поговорить (была лишь одна статья от стороннего автора).

Причина простая: мы не можем обсуждать те проекты, в которых мы сами применяли SObjectizer/RESTinio, ибо NDA. Равно как и не можем рассказывать о тех чужих проектах, о которых узнали в частном общении. Так что с наглядными примерами использования SObjectizer/RESTinio в реальной жизни всегда была напряженка.

Дабы как-то улучшить ситуацию пару лет назад мы даже сделали небольшой демо-проект Shrimp и опубликовали здесь серию статей о нем (раз, два, три). Но все-таки это было не более чем демонстрация.

К счастью или к несчастью, но далеко не самый удачный 2020-й год предоставил нам возможность показать как же выглядит реальный проект, в разработке которого SObjectizer и RESTinio активно используются. И в данной статье я попробую рассказать о том, как и для чего SObjectizer и RESTinio применяются в arataga, исходники которого можно найти на GitHub.

Тем, кто хочет больше узнать об arataga и причинах его появления на GitHub-е, рекомендую прочитать этот блог-пост. Здесь же в двух словах скажу лишь, что arataga -- это socks5+http/1.1 прокси-сервер, который затачивался под использование в следующих условиях (перечислены те из них, которые актуальны с точки зрения внутренней архитектуры arataga):

  • много точек входа, счет идет на тысячи (8 тысяч нужно было поддерживать сразу). У каждой точки входа уникальное сочетание IP+port;

  • подключения на одну точку входа могут идти с темпом в несколько десятков в секунду, параллельно на одной точке входа могут "висеть" сотни подключений;

  • одновременно могут существовать десятки тысяч подключений, общий темп появления новых подключений на всех точках входа может быть от 1500 в секунду;

  • аутентификация клиентов должна укладываться в единицы миллисекунд;

  • обновленная конфигурация и новые списки пользователей, которым разрешена работа с прокси-сервером, периодически доставляется через HTTP POST на специальный административный HTTP-вход.

Далее в статье я сосредоточусь на архитектурных и технических моментах, не затрагивая тему "зачем это все было нужно?"

Вероятно, для того, чтобы лучше понимать изложенное ниже, нужно иметь общие представления о SObjectizer-овских агентах, диспетчерах и почтовых ящиках (mbox-ах). Старая обзорная статья может в этом помочь (хотя сам SObjectizer за это время несколько изменился).

Тезисно о принятых проектных и архитектурных решениях

Многопроцессность или многопоточность?

Прежде всего нам предстояло сделать выбор между реализацией arataga в виде мультипроцессного или мультипоточного приложения.

В случае мультипроцессного приложения потребовалось бы иметь процесс-родитель, который бы запускал N дочерних процессов-исполнителей. Процесс-родитель отвечал бы за управление дочерними процессами и за распределение конфигурации/списков пользователей между дочерними процессами. Дочерние же процессы открывали бы точки входа и обслуживали бы входящие соединения от клиентов и исходящие к целевым хостам.

Каждый процесс при этом мог бы быть простым однопоточным процессом (может быть за исключением родительского, в котором могло бы потребоваться больше одного рабочего потока).

Достаточно простая и надежная схема, почти что в духе unix way. Но связываться с ней не очень хотелось из-за большого количества IPC-взаимодействия между родительским и дочерними процессами: в одну сторону должна идти управляющая информация, в другую -- диагностическая.

Вариант в виде многопоточного приложения выглядел проще, т.к. все взаимодействующие сущности оказывались в рамках одного общего адресного пространства и распространение любой информации в любых направлениях не представляло вообще никакой проблемы.

Плюс к тому, у нас в руках был большой молоток, SObjectizer, с помощью которого разрабатывать многопоточные приложения гораздо проще, чем при ручной работе с std::thread, std::mutex и std::condition_variable и т.п. Если выразить основные сущности приложения в виде SObjectizer-овских агентов, распределить этих агентов должным образом по рабочим нитям и организовать их взаимодействие на базе асинхронных сообщений, то ужасы многопоточности можно спокойно обойти стороной. Что, собственно, в очередной раз и произошло.

Сколько рабочих потоков нужно?

Здесь все было понятно изначально, еще даже до начала разработки arataga, поскольку старый прокси-сервер, на замену которого arataga и разрабатывался, использовал модель thread-per-connection. Когда счет одновременно живущих соединений идет на десятки тысяч, то thread-per-connection не есть хорошо.

Поэтому об использовании в arataga схемы thread-per-connection речь не шла вообще. Сразу же был взят курс на применение чего-то похожего на thread-per-core.

Было решено под обслуживание подключений отводить не все доступные вычислительные ядра, а использовать формулу (nCPU-2), где nCPU -- это количество ядер.

Так, если на машине 8 ядер, то в arataga будет создано 6 рабочих потоков, которые будут отвечать за обслуживание соединений. Т.е. работой с трафиком будут заняты 6 ядер. Два оставшихся ядра будут доступны для других рабочих потоков внутри arataga. Плюс и самой ОС нужно что-то оставить, а то не очень приятно удаленно подключаться к работающему серверу по ssh, когда у него все ядра загружены под 100%

Что будет агентом, а что не будет?

Изначально в работе arataga выделялось несколько типов операций, которые должны были выполняться на разных рабочих контекстах:

  • разбор конфигурации. Обработка конфигурации для нескольких тысяч точек входа может занимать некоторое время (например, 5ms). Сюда входит и парсинг/валидация конфигурации, и сохранение ее локальной копии, и удаление устаревших точек входа, которых нет в новой конфигурации, и создание новых точек входа, добавленных в обновленной конфигурации;

  • разбор списка пользователей и формирование структур для представления этого списка в памяти. Это также может занимать некоторое время, поскольку списки пользователей могут насчитывать несколько десятков тысяч строк;

  • обработка точек входа. Т.е. создание серверных сокетов на заданных IP+port, прием новых подключений с учетом ограничений на максимальное количество параллельных подключений и т.д.;

  • обслуживание принятых подключений. Т.е. вычитывание данных от клиента, определение типа протокола, осуществление проксирования подключения в зависимости от типа протокола и команды, пришедшей от клиента.

Первые три операции явно напрашивались на то, чтобы быть представленными в виде отдельных агентов, которые привязываются к тому или иному диспетчеру.

А вот по поводу последнего, т.е. обслуживания уже принятых подключений, можно было поступить по-разному. Можно было попытаться представить каждое подключение в виде отдельного агента.

Но этот подход меня лично не вдохновлял по нескольким причинам.

Во-первых, дело в том, что обслуживание разных команд в рамках одного протокола (не говоря уже про разные протоколы) может существенным образом отличаться. И если попытаться все эти различия запихнуть в одного агента, то этот агент распухнет до неприличия.

Во-вторых, не очень понятно, в чем смысл использовать здесь агента. Взаимодействие с сетью предполагалось делать с использованием Asio и асинхронных операций. Т.е., если вызывается Asio-шный async_read_some, то когда-то Asio вызовет для этой операции completion_handler. И если мне нужно передать результат async_read_some агенту, то в completion_handler нужно будет отослать агенту сообщение, которое агент обработает. Что, вообще-то говоря, не бесплатно, т.к. сперва на каком-то io_context будет запланирован вызов completion_handler, а затем, когда до completion_handler дойдет очередь, будет запланирован вызов обработчика события у агента. И только затем когда-то будет вызван этот самый обработчик. Что наводит на мысль "а не слишком ли много приходится платить за концептуальную чистоту?" Тем более, что по прошлому опыту я знал, что когда операции чтения/записи сокета доставляются до агента в виде сообщений, то как-то кардинально это работу с сетью не упрощает. Может быть даже наоборот.

Добавим сюда еще и то, что регистрация/дерегистрация агентов в SObjectizer все-таки не самая дешевая операция (это связано с механизмом коопераций и гарантиями транзакционности операции регистрации). Конечно, если принимать новые подключения с темпом в 2000-3000 в секунду, то "тормоза" SObjectizer-а здесь не проявятся, SObjectizer может регистрировать/дерегистрировать кооперации с гораздо более высоким темпом. Но все-таки у частого создания/удаления агентов есть своя цена, и если ее можно не платить, то лучше ее не платить.

В результате было принято решение, что агентом будет точка входа, но вот отдельные подключения агентами не будут. Вместо этого, агент, представляющий точку входа, будет обслуживать все подключения к этой точке.

Для чего планировалось использовать RESTinio?

Изначально предполагалось, что посредством RESTinio будет реализован административный HTTP-вход. Причем не в виде SObjectizer-овского агента, а просто единственная отдельная нить, на которой будет работать RESTinio-сервер.

Библиотека RESTinio, действительно, была использована таким очевидным образом. А вот о паре менее очевидных способов применения RESTinio в arataga речь пойдет ниже.

Погрузимся в подробности

Некоторые детали использования SObjectizer

В этой части бегло пройдемся по некоторым моментам использования SObjectizer в коде arataga. Выбор чисто субъективный, мне показалось, что именно они наиболее показательны и/или не обычны.

В рамках одной статьи невозможно обсудить все аспекты применения SObjectizer в проекте arataga, поэтому что-то неизбежно останется "за кадром". Так что если у кого-то из читателей возникнут вопросы, то постараюсь ответить в комментариях.

Агент startup_manager и "глобальный таймер"

Работа arataga начинается с запуска агента startup_manager. Этот агент отвечает за последовательный запуск основных "компонентов" arataga: сперва стартует агент user_list_processor, затем агент config_processor, затем уже запускается отдельная нить с RESTinio-сервером для административного HTTP-входа.

Каждый из запускаемых startup_manager агентов должен затем подтвердить свой успешный запуск. Если этого не произошло за отведенное время, то startup_manager прерывает работу приложения вообще.

В агенте startup_manager можно обнаружить один непривычный трюк: запуск при старте периодического сообщения one_second_timer_t, которое отсылается раз в секунду на специальный mbox.

В какой-то мере это преждевременная оптимизация.

Дело в том, что всем точкам входа нужно время от времени контролировать тайм-ауты выполняемых операций. Например, нужно принудительно закрывать соединения, в которых долго нет никакой активности.

Обычно это выполняется посредством отложенного сообщения: агент отсылает самому себе отложенное на 1 секунду сообщение, а когда оно приходит, то проверяет времена жизни подключений, времена отсутствия активностей в них и т.д. После чего шлет себе следующее отложенное сообщение. Или же вместо перепосылки отложенных сообщений один раз запускает периодическое сообщение.

Но когда агентов, нуждающихся в подобных периодических проверках, будут тысячи, то это будет означать, что раз в секунду будут порождаться тысячи отложенных/периодических сообщений.

Ну и зачем, спрашивается, это делать, если достаточно всего одного сообщения, которое будет отсылаться в отдельный mbox один раз в секунду? И на которое смогут подписаться столько получателей, сколько потребуется.

Вот как раз для того, чтобы каждый агент, обслуживающий точку входа, не запускал свое собственное периодическое сообщение для контроля тайм-аутов, агент startup_manager делает это посредством one_second_timer_t.

Понятие io_thread и ее реализация на базе asio_one_thread-диспетчера

Выше говорилось, что в реализации arataga было решено придерживаться модели, похожей на thread-per-core, при этом (nCPU-2) рабочих потока будет выделено под выполнение операций с сетью. Такие рабочие нити получили условное название io_thread (далее io_thread == "I/O нить" и поэтому "io_thread" будет иметь женский род).

Т.к. работа с сетью в arataga работа идет посредством Asio, то для следования идее thread-per-core было решено сделать так, чтобы на каждой io_thread работал свой собственный экземпляр asio::io_context.

Т.е. нам нужна была рабочая нить, которая создает экземпляр asio::io_context, а затем вызывает для него run(). После чего на этой нити нужно было бы еще как-то разместить и агентов, реализующих точки входа. И чтобы одна и та же нить обслуживала и операции Asio, и события SObjectizer-овских агентов.

В arataga для этих целей просто используются экземпляры asio_one_thread-диспетчера из so5extra. Каждый такой диспетчер -- это отдельный рабочий поток (т.е. io_thread).

Агент config_processor, о котором речь пойдет ниже, создает N экземпляров asio_one_thread-диспетчеров, а затем просто привязывает к этим экземплярам агентов, обслуживающих точки входа.

Агенты authentificator и dns_resolver, их дублирование на каждой io_thread

Подключающихся к arataga клиентов нужно аутентифицировать. Эта задача была поручена отдельному агенту authentificator. Когда агенту, обслуживающему точку входа, нужно аутентифицировать клиента, то отсылается сообщение агенту authentificator, который выполняет аутентификацию и шлет назад результат аутентификации.

Еще агентам, которые обслуживают точки входа, нужно выполнять резолвинг доменных имен. За эту операцию отвечает еще один специальный агент -- dns_resolver. Тут используется подобная схема: агент-точка-входа отсылает агенту dns_resolver сообщение, а тот присылает назад результат поиска доменного имени.

Мне показалось, что если уж мы пытаемся следовать модели thread-per-core, то логичным было бы сделать отдельную копию агентов dns_resolver и authentificator для каждой из io_thread. Поэтому эти агенты создаются сразу же вслед за очередным asio_one_thread-диспетчером и привязываются к только что созданному диспетчеру.

Распределение агентов по диспетчерам

В результате получилась следующая картинка:

У агентов config_processor и user_list_processor есть собственные рабочие потоки, которые реализуются посредством штатного диспетчера SObjectizer под названием one_thread.

Каждая io_thread представлена отдельным диспетчером asio_one_thread из so5extra. И на каждом таком диспетчере работает сразу несколько (десятков, сотен, тысяч) агентов acl_handler.

Агент config_processor

Агент config_processor отвечает за обработку конфигурации arataga, поддержание списка существующих точек входа, их распределение между io_threads, и за создание/удаление точек входа при изменении конфигурации.

При своем старте config_processor пытается прочитать локальную копию конфигурационного файла. Если это удалось, то config_processor принимает поднятую из конфига информацию за текущую конфигурацию и создает описанные в ней точки входа (регистрирует агентов acl_handler).

Если же локальной копии конфигурации нет или же прочитать ее не удалось, то config_processor ждет, пока придет новая конфигурация от административного HTTP-входа.

Когда от административного HTTP-входа поступает новая конфигурация, что после ее успешной обработки config_processor рассылает уведомления об изменении параметров arataga на отдельный multi-producer/multi-consumer mbox. Так изменения в конфигурации становятся доступны всем, кто в них заинтересован.

В работе config_processor с acl_handler есть важная особенность: сейчас в arataga агент acl_handler намертво привязывается к конкретному asio_one_thread и живет на этом asio_one_thread до тех пор, пока не будет удален.

Эта особенность может привести к ситуации, когда из конфигурации удалили, скажем, 1000 точек входа, и в результате на одной из io_threads осталось заметно меньше работающих агентов, чем на других io_threads.

Эта диспропорция может быть устранена при добавлении новых точек входа. В этом случае config_processor при создании новых агентов acl_handler будет привязывать их к тем диспетчерам asio_one_thread, на которых сейчас меньше всего живых acl_handler.

Но если новые точки входа не создаются, то диспропорция останется и config_processor не будет перераспределять старые acl_handler между диспетчерами asio_one_thread. По двум причинам:

  • во-первых, в SObjectizer нет возможности перепривязать агента с одного диспетчера на другой. Поэтому к какому диспетчеру агента изначально привязали, на таком он и будет продолжать работать;

  • во-вторых, в каждом агенте acl_handler может существовать множество Asio-шных объектов, завязанных на конкретный экземпляр asio::io_context. Если перемещать содержимое acl_handler с одной io_thread на другую, то нужно будет и переконструировать эти Asio-шные объекты.

Чтобы не возиться со всем этим добром было решено смириться с такими диспропорциями в первой версии aragata. Может на практике они вообще не будут столь серьезными, чтобы оказывать влияние на производительность. Ну а если таки будут, вот тогда можно было бы и озадачиться проблемой миграции точки входа с одной io_thread на другую.

Агент user_list_processor

Агент user_list_processor отвечает за работу со списком пользователей.

При своем старте user_list_processor пытается прочитать локальную копию списка пользователей. Если это удалось, то локальная копия становится актуальным списком. Если не удалось, то user_list_processor ждет поступления нового списка от административного HTTP-входа.

Когда от административного HTTP-входа прилетает новый список пользователей, то user_list_processor пытается его обработать. И, если это получается успешно, то обновленный список рассылается на отдельный multi-producer/multi-consumer mbox на которые подписаны агенты authentificator. Что позволяет authentificator-ам получать обновленные списки пользователей сразу после того, как user_list_processor завершит их обработку.

Агент acl_handler

Агент acl_handler является, наверное, самым сложным и объемным агентом в arataga (hpp-файл, cpp-файл). Его задача -- это создать серверный сокет для точки входа, принимать и обслуживать подключения к этому серверному сокету.

В связи с этим в рамках данной статьи можно выделить несколько аспектов в реализации acl_handler.

Во-первых, acl_handler инициирует асинхронные I/O операции с помощью Asio, в частности, вызывает async_accept у Asio-шного asio::ip::tcp::acceptor. В async_accept передается completion-handler в виде лямбды, где напрямую вызываются методы агента. Такие вызовы безопасны потому, что агент привязан к asio_one_thread диспетчеру. Этот диспетчер специально создан для того, чтобы на одном рабочем контексте можно было работать и с агентами, и с Asio. Если бы acl_handler привязывался к какому-то другому диспетчеру (например, к штатному one_thread-диспетчеру), то этого делать было бы нельзя.

Во-вторых, логика у acl_handler достаточно тривиальная, но даже и здесь нашлось место для применения иерархического конечного автомата (которые в SObjectizer-е уже довольно давно поддерживаются):

Иерархический автомат потребовался здесь потому, что acl_handler должен ограничивать количество одновременных подключений к точке входа. Поэтому, когда это количество достигает разрешенного максимума, агент переходит в состояние st_too_many_connections, в котором он делает практически всю свою обычную работу, за исключением новых вызовов async_accept.

В-третьих, агент acl_handler не занимается сам обслуживанием принятых подключений и обработкой протоколов socks5/http. Вместо этого для каждого соединения создается т.н. connection_handler. Это объект, в который отдается принятое подключение и который уже с этим подключением непосредственно работает. Т.е. читает из него данные, пытается определить протокол, пытается обслуживать подключение согласно того или иного протокола.

Т.е. когда в результате async_connect у acl_handler появляется новый asio::ip::tcp::socket, то создается объект, реализующий интерфейс connection_handler_t, и новый socket отдается этому только что созданному connection_handler-у. Больше про новое подключение acl_handler ничего не знает. Далее acl_handler лишь хранит у себя умный указатель на connection_handler и лишь время от времени дергает у connection_handler-а метод on_timer (тот самый "глобальный таймер" о котором речь шла выше). А вот connection_handler дальше живет своей собственной и весьма непростой жизнью.

Вообще говоря, как раз тот кусок arataga, который относится к connection_handler-ам, является самым мудреным в проекте. Вероятно, далеко не самым удачным. И, скорее всего, именно он мог бы стать первым кандидатом на рефакторинг по итогам опытной эксплуатации. Поскольку первая версия arataga создавалась в высоком темпе, в условиях сжатых сроков, то ничего лучше сходу придумать не удалось. Возможно, здесь следовало бы применить stackful-короутины. Но экспериментировать с ними не было времени. Так что, если история с arataga получит продолжение (что вряд ли, но вдруг), к этому куску arataga нужно будет сделать еще один, более вдумчивый подход. Пока что получилось то, что получилось :(

Использование retained_mbox для распространения конфигурации

Примечательным моментом использования SObjectizer-овских mbox-ов в arataga является применение retained_mbox из so5extra для распространения информации о текущей конфигурации и списках пользователей.

Особенностью retained_mbox является то, что retained_mbox хранит у себя последние отосланные сообщения (по одному для каждого типа отосланных сообщений). И когда кто-то делает новую подписку на некий тип сообщения, то retained_mbox сразу же после завершения подписки отсылает этому подписчику последнее сохраненное сообщение этого типа.

В arataga это свойство retained_mbox используется для того, чтобы агенты authentificator сразу же могли получить текущий список пользователей. Сценарий работы такой:

  • стартует агент user_list_processor. Он читает локальную копию списка пользователей и отсылает сообщение с этим списком в retained_mbox. На этом этапе подписчиков у retained_mbox еще нет, но нас это не волнует;

  • затем стартует агент config_processor, который читает локальную копию конфигурации и создает io_threads вместе с authentificator-ами;

  • каждый запущенный authentificator должен получить список пользователей, для чего authentificator-ы подписываются на соответствующее сообщение из retained_mbox-а;

  • поскольку в retained_mbox уже есть сообщение со списком пользователей, то это сообщение сразу же отсылается каждому новому подписчику.

Благодаря этому authentificator-у при старте не нужно ни у кого ничего явно запрашивать.

Правда, платить за это нужно тем, что authentificator вынужден делать подписку в so_evt_start, а не в предназначенном для этого so_define_agent.

Некоторые детали использования RESTinio

Теперь попробуем пробежаться по нескольким аспектам применения RESTinio в arataga. Тем более, что одно из применений было очевидным, тогда как одно совсем не очевидным.

Первое применение RESTinio, самое очевидное: административный HTTP-вход

Очевидное применение, ради которого RESTinio и подтягивался в проект arataga, -- это реализация административного HTTP-входа. Как раз здесь простота асинхронной обработки входящих запросов посредством RESTinio и была нужна с самого начала.

В этом месте RESTinio используется классическим способом. Запускается RESTinio-сервер, этот сервер принимает входящие запросы, выполняет их первичную валидацию. После чего пересылает запрос во внутрь arataga и ждет ответа. Когда ответ сформирован, то RESTinio отсылает его клиенту.

Упомянуть здесь можно разве лишь то, что для взаимодействия RESTinio- и SObjectizer-частей приложения были сделаны вспомогательные интерфейсы (раз, два). Когда startup_manager запускает RESTinio-сервер, то отдает серверу реализацию интерфейса requests_mailbox_t. RESTinio-сервер через этот интерфейс передает в SObjectizer-часть arataga входящие запросы. И каждый входящий запрос сопровождается реализацией интерфейса replier_t. Посредством этого интерфейса реальные обработчики запроса могут отвечать RESTinio-серверу.

Два эти интерфейса скрывают от RESTinio-части существование SObjectizer-части arataga и наоборот. Что мне показалось полезным. Хотя бы плане сокращения времени компиляции отдельных cpp-файлов.

Второе применение RESTinio, не очевидное, но вполне ожидаемое: работа с HTTP-заголовками при проксировании HTTP-соединений

Второе применение стало для меня лично неожиданным, хотя оно было вполне себе логичным. Дело в том, что применить RESTinio в его текущем виде для проксирования HTTP-соединений не представляется возможным в силу нынешних архитектурных особенностей RESTinio.

Поэтому работу с проксируемыми HTTP-соединениями в arataga пришлось делать "с нуля". И одной из составляющей этой работы является разбирательство с HTTP-заголовками в запросах/ответах, а также с содержимым некоторых из них.

И вот когда у меня руки дошли до обработки HTTP-заголовков в проксируемых соединениях, то я внезапно (c) осознал, что в RESTinio уже есть инструментарий для этого. Чем и воспользовался, хотя изначально о таком даже и не думал.

Третье применение RESTinio, совсем не очевидное: синтаксический разбор конфигурации

А вот третье, совсем не очевидное, применение RESTinio не то, чтобы было сразу мной задумано. Но такой вариант начал просматриваться чуть ли не с самого начала.

В arataga нужно было парсить два типа конфигурационных файлов (собственно конфигурация и список пользователя). У каждого из которых был свой, уже устоявшийся к тому времени синтаксис.

Плюс к тому в этот синтаксис хотелось внести небольшие, но важные с точки зрения простоты использования, дополнения. Вроде суффиксов, которые бы обозначали объем данных или времена таймаутов. Например:

acl.io.chunk_size 16kib # Вместо 16384timeout.authentification 1200ms # Вместо 1200timeout.connect_target 7s # Вместо 7000

Ну и для того, чтобы не делать парсинг конфигов вручную только штатными средствами стандартной библиотеки C++ (очень куцей в этом плане), нужно было задействовать какой-то генератор парсеров. А зачем тащить в проект еще что-то, если нечто подобное уже есть в RESTinio?

Так что easy_parser из RESTinio, который изначально появился в RESTinio для упрощения разбора HTTP-заголовков, пригодился в arataga и для разбора конфигурации.

Любопытный, на мой взгляд, пример использования easy_parser-а из RESTinio можно найти здесь. Именно этот transfer_speed_p() используется для реализации команд конфигурации вроде:

bandlim.in 850kibbandlim.out 700kib

Заключение

Реальные проекты, подобные arataga, являются отличным полигоном, на котором разработчик библиотеки может увидеть насколько удобна его библиотека вне тепличных условий уютного манямирка. Как правило, при использовании библиотеки в новом проекте отлично проявляются просчеты в дизайне библиотеки и/или же обнажаются подводные камни, о которых ты пока что даже и не подозревал. Из чего затем вырастают вполне себе конкретные идеи о том, куда следует двигаться дальше.

С RESTinio это сработало на 100%. Опыт применения RESTinio в arataga дал пинка под за толчок в сторону разработки пусть приблизительного, но аналога middleware из ExpressJS. Первый результат уже доступен в 0.6.13. И, если пойти на слом API в ветке 0.7, то можно будет поддержать и цепочки асинхронных обработчиков.

Но еще более важным итогом использования RESTinio в arataga стало изменение лично моих взглядов на позиционирование RESTinio в мире аналогичных фреймворков для C++.

Так что для RESTinio испытание arataga оказалось исключительно полезным.

А вот с SObjectizer-ом получилось скучно и обыденно. Он просто работал. Ничего не пришлось подкручивать или добавлять. Кой-какие мысли о потенциально привлекательных направлениях дальнейшего движения появились. Но пока не вполне оформившиеся и не имеющие однозначно четкого подтверждения того, что это реально необходимо делать.

Я же надеюсь, что данная статья даст читателям, которые с любопытством посматривают на RESTinio и/или SObjectizer(+so5extra), но пока что сами эти проекты не пробовали, возможность взглянуть на реальный код, написанный с применением наших инструментов. И сделать собственные выводы о том, нравится ли увиденное или нет.

Что касается перспектив самих RESTinio и SObjectizer/so5extra, то тут все не так однозначно и без внешней поддержки, боюсь, их развитие приостановится. Впрочем, это уже совсем другая история

Подробнее..

Sobjectizer Можно ли написать один обработчик сразу для нескольких типов сообщений? И если нет, то как быть?

16.02.2021 14:15:09 | Автор: admin

Статья написана по следам недавнего вопроса, который можно сформулировать следующим образом: "Можно ли в SObjectizer написать обработчик, который бы обрабатывал сразу нескольких типов сообщений?"

Вопрос интересный.

Автор вопроса любезно описал свой сценарий: ему нужно собирать изображения с множества промышленных камер, а затем эти изображения должны проходить по цепочке блоков обработки изображений. Используются разные типы камер, соответственно, каждое изображение имеет собственный формат и может иметь кучу сопутствующей специфической информации. Какие-то блоки обработки рассчитаны на работу только с изображениями определенного формата. Какие-то могут работать сразу с несколькими форматами. Какие-то блоки вообще безразличны к типу изображения (например, блок считает общее количество прошедших изображений).

Если изображения передаются в виде SObjectizer-овских сообщений, а блоками обработки являются SObjectizer-овские агенты, то можно ли сделать как-то так:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image_vendor_A & cmd) {}) // Изображение типа A.    .event([this](const image_vendor_B & cmd) {}) // Изображение типа B.    .event([this](any_other_image_type) {      // Отказываемся обрабатывать другие типы.      throw unsupported_image_type{};    });}void image_counter::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](any_image_type) { // Тип изображения не важен.      ++captured_images_;    });}

Итак, сценарий понятен. Давайте поговорим насколько он реализуем в SObjectizer.

Так можно ли в SObjectizer повесить один разработчик сразу на несколько типов сообщений?

Нет. Написать что-то вроде:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image_vendor_A & cmd) {...}) // Изображение типа A.    .event([this](const image_vendor_B & cmd) {...}) // Изображение типа B.    .event([this](any_other_image_type) {      // Отказываемся обрабатывать другие типы.      throw unsupported_image_type{};    });}

в текущем SObjectizer-5 нельзя. В принципе.

Во-первых, в SObjectizer-5 ключем для поиска обработчика сообщения является триплет из состояния агента, идентификатора почтового ящика (mbox-а) и типа сообщения.

Грубо говоря, когда есть вот такой агент:

class demo final : public so_5::agent_t {  so_5::state_t st_free{this};  so_5::state_t st_busy{this};    const so_5::mbox_t command_board_;  ...  void so_define_agent() override {    st_free // Подписки для состояния st_free.      .event([this](const some_msg &) {...})      .event(command_board_, [this](const report_status &) {...});        st_busy // Подписки для состояния st_busy.      .event(command_board_, [this](const report_status &) {...});  }  ...};

то информация о сделанных вso_define_agentподписках может быть представлена приблизительно такой картинкой (на самом деле там все несколько хитрее, но общая схема именно такая):

Таблица подписок агентов с ссылками на обработчики сообщенийТаблица подписок агентов с ссылками на обработчики сообщений

Соответственно, когда для агента приходит сообщение, то формируется триплет из текущего состояния агента, идентификатора mbox-а из которого сообщение пришло и типа самого сообщения. После чего в таблице подписок агента ищется вхождение этого триплета.

Если в таблице подписок триплет найден, то сообщение обрабатывается. Если нет, то отбрасывается (не совсем так, но для простоты будем считать что просто отбрасывается).

Поиск в большой таблице подписок по составному ключу хорош тем, что он позволяет эффективно обрабатывать большое количество подписок. Скажем, нужно агенту подписаться на одно сообщение из 100500 mbox-ов? Нет проблем. Будет большая таблица подписок с эффективным поиском обработчиков в ней.

Необработанные сообщения в SObjectizer-е просто выбрасываются. Нет специальных обработчиков для подобных сообщений, нет никаких mbox-ов, в которые бы подобные сообщения пересылались бы... Ничего подобного нет.

Корни этого решения уходят на десятилетия назад в буквальном смысле. Подобная логика была использована еще в предтече SObjectizer-а, проекте SCADA Objectizer, который создавался под нужды АСУТП в середине 1990-х. И в котором именно такая логика и нужна была: если агент не заинтересован в каком-то сообщении в своем текущем состоянии, то это сообщение безжалостно выбрасывается.

Эта логка отлично работала на протяжении 25 лет. И ситуаций, когда хотелось бы как-то обрабатывать проигнорированные сообщения за эти годы встречалось не очень много. А для случаев, когда в этом был смысл, в SObjectizer-5 были добавлены т.н. deadletter handler-ы.

Так что можно спорить о том, оправдан ли такой жесткий подход к проигнорированным сообщениям или нет. Но сейчас дела обстоят именно так. Так что если на конкретный тип сообщение нет подписки, то это сообщение просто выбрасывается.

Если нельзя, но очень нужно, то как?

Итак, SObjectizer не позволяет сделать так, чтобы какой-то агент из множества сообщений типа image_vendor_A, image_vendor_B,image_vendor_C,image_vendor_D и т.д. мог бы подписаться лишь на image_vendor_Aи image_vendor_B, а все остальные сообщения image_vendor_* обрабатывать каким-то одним обработчиком.

Но если нам нужна именно такая логика, то как же нам быть?

Пожалуй, единственный выход -- это иметь один тип сообщения для всех изображений.

Самый простой вариант, который лично мне сразу приходит в голову -- это использовать ООП-иерархии, т.е. ввести базовый тип для изображения:

class image_base {public:virtual ~image_base() = default;  ... // Какой-то набор общих для всех изображений методов.};

от которого уже будут наследоваться конкретные типы изображений:

class image_vendor_A : public image_base {...};class image_vendor_B : public image_base {...};class image_vendor_C : public image_base {...};

Затем вводится тип сообщения image в котором конкретный экземпляр сообщения передается по указателю (для простоты приведем в примере shared_ptr):

struct image {std::shared_ptr<image_base> image_;};

Соответственно, агенты будут подписываться на сообщение image, а уже с конкретным типом изображения они будут разбираться внутри обработчика тем или иным способом:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image & cmd) {      if(auto * p = dynamic_cast<image_vendor_A*>(cmd.image_.get())) {        ... // Изображение типа A.      }      else if(auto * p = dynamic_cast<image_vendor_B*>(cmd.image_.get())) {        ... // Изображение типа B.      }      else {        // Отказываемся обрабатывать другие типы.      throw unsupported_image_type{};      }    });}...void image_counter::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image &) { // Тип изображения не важен.      ++captured_images_;    });}

Конечно, вариант с ручными dynamic_cast-ами выглядит криво и в реальном коде, как по мне, лучше было бы использовать паттерн visitor. Но для иллюстрации мысли вполне сгодится.

Парочка вариаций на эту тему

Если кому-то не нравится иметь сообщение image, внутри которого будет лежать shared_ptr на экземпляр изображения, то можно воспользоваться одной из нижеописанных вариаций.

Использование std::variant

Когда список типов изображений конечен и зафиксирован на этапе компиляции, то в качестве типа сообщения можно задействовать std::variant:

class image_vendor_A {...};class image_vendor_B {...};class image_vendor_C {...};...class image_vendor_Last {...};using image = std::variant<  image_vendor_A,image_vendor_B,image_vendor_C,...  image_vendor_Last>;void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image & cmd) {      ... // Какой-то способ работы с std::variant.    });}...void image_counter::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image &) { // Тип изображения не важен.      ++captured_images_;    });}

Отсылка сообщений по базовому типу

Если же список типов изображений на этапе компиляции не зафиксирован (например, может расширяться со временем по мере реализации новых модулей), то можно использовать такой трюк, как отсылку сообщения по его базовому типу:

class image : public so_5::message_t { // Наследование от message_t важно.public:... // Какие-то общие для всех изображений свойства.};class image_vendor_A : public image {...};class image_vendor_B : public image {...};...// Сперва создаем экземпляр конкретного типа...so_5::message_holder_t<image_vendor_A> msg{  std::make_unique<image_vendor_A>(...)  };// ...а затем отсылаем его так, как будто его тип -- image.so_5::send<image>(std::move(msg));

Фокус здесь в том, что при отсылке в качестве типа сообщения фиксируется именно тот тип, который был задан в виде первого шаблонного параметра для so_5::send. Так, если мы пишем so_5::send<image>, то типом сообщения внутри SObjectizer-а будет считаться именно image, а не image_vendor_A.

Вот здесь можно найти полный код примера, который иллюстрирует этот трюк.

А есть ли надежда на появление такой фичи?

С одной стороны, ответ на этот вопрос прост и уже традиционен в последние несколько лет: в SObjectizer сейчас попадает лишь то, в чем возникла необходимость. Наполнение SObjectizer-а фичами просто "шоб було" уже давно закончилось.

Так что если это кому-то нужно, то следует нам об этом сказать. Крайне желательно с описанием сценария, в котором вам такая функциональность была бы полезна.

Но, с другой стороны, добавить подобную функциональность в SObjectizer не так-то просто. По крайней мере первые прикидки пока не подсказали каких-то хороших способов достижения цели.

Не сработало: явная отметка возможности сделать upcast для типа сообщений

Был сделан неудачный подход вот с такой идеей: пусть пользователя будет указывать для некоторых типов сообщений возможность upcasting-а к базовому типу.

Т.е. для примера с изображениями это могло бы выглядеть, например, так:

class image_base : public so_5::upcastable_message_root_t<image_base>{... // Какие-то общие для всех изображений свойства.};class image_vendor_A  : public so_5::upcastable_message_t<image_vendor_A, image_base>{...};class image_vendor_B  : public so_5::upcastable_message_t<image_vendor_B, image_base>{...};...

Когда к агенту прилетает сообщение, у которого в базовых классах есть so_5::upcastable_message<T>, то поиск обработчика выполняется по более сложной процедуре:

  1. Сперва ищется обработчик для актуального типа сообщения. Если найден, то он вызывается и цикл поиска обработчика прерывается.

  2. Если обработчик не найден, то берется тип, к которому можно сделать upcasting. Если такой тип есть, то идем к шагу 1. Если же иерархия upcastable-сообщений закончилась, но обработчик так и не найден, то цикл поиска обработчика прерывается.

При таком алгоритме поиска обработчиков если агент делает подписку вот так:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image_vendor_A & cmd) {... /* (1) */})    .event([this](const image_base &) {... /* (2) */});}

то для сообщения image_vendor_A будет найден обработчик (1), а для сообщения image_vendor_B будет найден обработчик сообщения (2).

Данный подход выглядит привлекательным и вроде бы он даже не несет очень уж больших накладных расходов при диспетчеризации сообщений (фактически, добавляется один дополнительный if в начале процедуры поиска обработчика). Более того, этот подход интересен еще и тем, что он (вроде бы) может использоваться и при работе с mchain-ами.

Но засада встретилась там, где не ждали.

На самом деле есть несколько таблиц подписки

Выше показывалась таблика подписок для агента. Но это не единственная таблица подписок, которая участвует в диспетчеризации сообщений.

Еще одна таблица, но более простая, хранится в mbox-е. Поэтому полная картина подписок выглядит как-то так:

Т.е. обычный mbox знает на какие типы сообщений у него есть подписчики. Например, на сообщение типа M1 подписаны агенты Alice и Bob, а на сообщение M2 -- только Bob.

Благодаря такой информации mbox знает, что когда в него отсылают сообщение M1, то это сообщение должно быть доставлено и Alice, и Bob-у. А вот когда отсылают сообщение M2, то оно доставляется только Bob-у.

Так вот, засада в том, что когда агент подписывается вот таким образом:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image_vendor_A & cmd) {... /* (1) */})    .event([this](const image_base &) {... /* (2) */});}

то у mbox-а image_mbox_ есть информация только о подписке на сообщение типа image_vendor_Aи на сообщение типа image_base. Подписок на сообщения других типов для этого агента у image_mbox_ нет.

Соответственно, если в image_mbox_ отсылается image_vendor_B, то это сообщение агенту вообще отправлено не будет.

И это более чем серьезная засада. Особенно с учетом того, что mbox-ы в SObjectizer-е предназначены в том числе и для того, чтобы разработчики могли создавать свои специфические mbox-ы под свои собственные нужды (примеры специализированных mbox-ов можно найти в so5extra). И заставить разработчиков принимать во внимание наличие сообщений, которые могут быть приведены к базовому типу... Как-то уж это все слишком уж. Слишком уж геморрно, что ли.

Что еще можно было бы попробовать: принудительный upcasting к базовому типу

Есть еще одна мысль, которая выглядит вполне себе реализуемой, но которая еще не проверялась на практике (и вряд ли будет в ближайшее время). Здесь так же используется наследование от специальных типов для того, чтобы SObjectizer знал откуда и куда можно делать upcasting:

class image_base : public so_5::upcastable_message_root_t<image_base>{... // Какие-то общие для всех изображений свойства.};class image_vendor_A  : public so_5::upcastable_message_t<image_vendor_A, image_base>{...};class image_vendor_B  : public so_5::upcastable_message_t<image_vendor_B, image_base>{...};...

При отсылке любого сообщения SObjectizer смотрит на то, можно ли для сообщения сделать upcasting. Так, если отсылается сообщение image_vendor_A, то SObjectizer уже в компайл-тайм понимает, что здесь возможен upcasting до типа image_base. Поэтому внутри send-а SObjectizer выполняет приведение к базовому типу и отсылает сообщение как имеющее тип image_base, а не image_vendor_A.

Далее, когда агент подписывается на сообщение какого-то типа, то SObjectizer опять смотрит на то, можно ли для этого сообщения делать upcasting. Если можно, то SObjectizer делает хитрую подписку: в таблицы подписки добавляется самый верхний тип, к которому можно делать upcasting.

Допустим, что агент infrared_image_processor подписывается на сообщение image_vendor_A иimage_vendor_Bиз почтового ящика incoming_images. SObjectizer понимает, что здесь возможен upcasting до image_base. Поэтому в почтовый ящик добавляется подписка для сообщения image_base, а не image_vendor_A или image_vendor_B. В таблицу подписок агента так же добавляется подписка только на image_base, но в этом случае подписывается не простой обработчик сообщения, а специальный:

Таблицы подписок в случае с принудительным upcasting-ом к базовому типуТаблицы подписок в случае с принудительным upcasting-ом к базовому типу

Этот специальный обработчик берет экземпляр поступившего сообщения и смотрит, относится ли оно к типу image_vendor_A (или производному от него). Если относится, то вызывает обработчик для image_vendor_A. Если же сообщение относится к типу image_vendor_B,то вызывается обработчик для image_vendor_B. Если же ни одно из условий не выполнилось, то сообщение игнорируется.

Причем все эти фокусы с upcasting-ом SObjectizer делает только в том случае, если сообщение это допускает (т.е. наследуется от so_5::upcastable_message_t<T>). Если же используются обычные сообщение, то никакой хитрой магии SObjectizer не делает.

Вместо заключения

Так уж получилось, что я занимаюсь SObjectizer-ом уже очень давно. А когда работаешь над одним и тем же долгое время, то рано или поздно ты оказываешься в ситуации, когда твои же проектные решения, но принятые много-много лет назад, обнаруживают свои последствия вот прямо здесь и сейчас.

Описанный в статье сценарий обработки сообщений невозможен в текущем варианте SObjectizer-а как раз из-за проектных решений, принятых мной сперва в 2002-ом году в SObjectizer-4, а затем и в 2010-ом году в SObjectizer-5...

А теперь важный вопрос, ради которого эта статья и была написана: нужна ли подобная функциональность кому-нибудь из попробовавших SObjectizer читателей? Следует ли внести ее в список хотелок для будущих версий SObjectizer-а?

PS. Возможно читатели, которые интересуются нашими открытыми проектами RESTinio и SObjectizer (+so5extra), уже знают, что мы вынуждены приостановить их развитие. К сожалению, целевое финансирование для этих открытых проектов найти не удалось. Поэтому мы постараемся поднакопить средства на заказных разработках, чтобы затем вернуться к работам над RESTinio/SObjectizer/so5extra. И если кому-то нужна помощь опытных разработчиков, то у нас как раз есть пара свободных рук. Не самых плохих ;)

Подробнее..

Обзор последних изменений в rotorе (v0.10 v0.14)

20.02.2021 08:20:29 | Автор: admin

actor system


rotor ненавязчивый С++ акторный микрофремворк с возможностью создания иерархий супервайзеров, похожий на своих старших братьев caf и sobjectizer. В последних релизах с момента последнего анонса накопились существенные улучшения, которые хотелось бы осветить.


Общий интерфейс для таймеров (v0.10)


Таймеры сами по себе вездесущи во всех акторных фрейморках, т. к. они делают программы более надёжными. До v0.10 не было API для того, чтобы взвести таймер; это можно было сделать только посредством доступа к низлежащему движку событий (event loop) и использованию соответствующего API, разного для разных движков. Это было не очень удобно и ломало абстракции: кроме доступа к API движка, нужно было в обработчике таймера использовать низкоуровневый API rotor'а, чтобы работала доставка сообщений. Кроме того, отмена таймера также имеет свои особенности в каждом цикле событий, что захламляло ненужными деталями логику работы актора.


Начиная с v0.10 в rotor'е, можно делать что-то вроде:


namespace r = rotor;struct some_actor_t: r::actor_base_t {    void on_start() noexcept {        timer_request = start_timer(timeout, *this, &some_actor_t::on_timer);    }    void on_timer(r::request_id_t, bool cancelled) noexcept {        ...;    }    void some_method() noexcept {        ...        cancel_timer(timer_id);    }    r::request_id_t timer_id;};

Надо отметить, что к моменту завершения работы актора (shutdown_finish), все взведённые таймеры должны сработать или быть отменены, иначе это ведёт к неопределённому поведению (undefined behavior).


Поддержка отмены запросов (v0.10)


По-моему мнению, у всех сообщений в caf семантика "запрос-ответ", в то время как в sobjectizer'е все сообщения имеют обычную "отправил-и-забыл" ("fire-and-forget") семантику. rotor поддерживает оба типа, причём по-умолчанию сообщения являются "отправил-и-забыл", а "запрос-ответ" можно сделать поверх обычных сообщений.


В обоих фреймворках, caf и sobjectizer, у каждого актора есть управляемая очередь сообщений, что обозначает, что фреймворк не доставляет новое сообщение, пока предыдущее не было обработано. В противоположность этим фреймворкам, в rotor'е нет управляемой очереди сообщений, что обозначает, что актор сам должен создать свою собственную очередь и заботиться о перегрузках при необходимости. Для мгновенно обрабатываемых сообщений типа "пинг-понг" это не имеет особого значений, однако для "тяжёлых" запросов, которые в процессе своей обработки делают ввод-вывод (I/O), разница может быть существенна. Например, если актор опрашивает удалённую сторону через HTTP-запросы, нежелательно начинать новый запрос, пока предыдущий ещё не закончился. Ещё раз: сообщение не доставлено, пока предыдущее не обработано, и не важно, что за типа сообщения.


Это также обозначает, что для управляемых очередей сообщений отмена запросов в общем случае невозможна, т.к. сообщение-отмена находится в очереди, и у него нет шансов быть обработанным до тех пор, пока предыдущее сообщение-запрос не будет обработано.


В rotor'е, при необходимости, надо создавать собственную очередь запросов, и, если сообщение-отмена поступает, то актор должен либо поискать в очереди и отменить его, либо, если сообщение уже обрабатывается, отменить соответствующие операции ввода-вывода, и, в конечном итоге, ответить на исходный запрос со статусом "отменено". Нужно заметить, что только сообщения-запросы могут быт отменены, т. к. у них есть внутренний идентификатор.


namespace r = rotor;namespace payload {struct pong_t {};struct ping_t {    using response_t = pong_t;};} // namespace payloadnamespace message {using ping_request_t = r::request_traits_t<payload::ping_t>::request::message_t;using ping_response_t = r::request_traits_t<payload::ping_t>::response::message_t;using ping_cancel_t = r::request_traits_t<payload::ping_t>::cancel::message_t;} // namespace messagestruct some_actor_t: r::actor_base_t {    using ping_ptr_t = r::intrusive_ptr_t<message::ping_request_t>;    void on_ping(ping_request_t& req) noexcept {        // just store request for further processing        ping_req.reset(&req);    }    void on_cancel(ping_cancel_t&) noexcept {        if (req) {            // make_error is v0.14 feature            make_response(*req, make_error(r::make_error_code(r::error_code_t::cancelled)));            req.reset();        }    }    // простейшая "очередь" из одного сообщения.    ping_ptr_t ping_req;};

Вообще говоря, отмена запросов может быть сделана и в sobjectizer'е, однако, там, во-первых, нужно сделать собственный механизм "запрос-ответ", и, во-вторых, свою собственную очередь поверх той, что уже предоставляется sobjectizer'ом, т. е. ценой дополнительных накладных расходов. Впрочем, по-моему мнению, парадигма "запрос-ответ" несколько инородна для sobjectizer'а, поскольку он скорее ориентирован на блокирующие операции в обработчиках сообщений, которые, конечно, не могут быть отменены, вне зависимости от используемого фреймворка.


std::thread backend/supervisor (v0.12)


Это давно ожидаемая возможность, которая делает rotor похожим на sobjectizer: в случае, когда актору нужно осуществить блокирующие операции и ему не нужен никакой цикл событий. Например, обработчик сообщений должен выполнить интенсивные вычисления на центральном процессоре.


Очевидно, что во время блокирующих операций нет возможности сработать таймерам или другим сообщениям быть доставленными. Другими словами, во время блокирующих операций актор теряет свою реактивность, так как он не может реагировать на входящие сообщения. Чтобы преодолеть это, блокирующие операции должны быть разбиты на более мелкие итеративные куски; а когда актор закончит обрабатывать текущий кусок работы, он посылает себе сообщение с номером следующего куска для обработки, и так далее, пока все части не будут обработаны. Это даст текущему потоку выполнения немного воздуха, чтобы доставить другие сообщения, выполнить код, связанный с истекшими таймерами и т.п. Например, вмето того, чтобы вычислять свёртку sha512 для всего файла размером 1TB, задание может быть разделено на вычисление свёрток помегабайтово, что сделает поток вычисления достаточно реактивным. Данный подход универсален и применим для любого актороного фреймворка.


Само собой разумеется, что целая иерархия акторов может быть запущена на std::thread бэкенде, не только один актор. Следующий нюанс, на который следует обратить внимание, это то, что rotor'у надо подсказать, какие обработчики сообщений "тяжёлые" (блокирующие), чтобы обновить таймеры после этих обработчиков. Это делается во время подписки, т.е.:


struct sha_actor_t : public r::actor_base_t {    ...    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        r::actor_base_t::configure(plugin);        plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {            p.subscribe_actor(&sha_actor_t::on_process)->tag_io(); // важно        });    }

Полный исходный код реактивного актора, который вычисляет свёртку sha512, который реагирует на CTRL+c, доступен по ссылке.


Идентификация Акторов (v0.14)


Канонический способ идентификации актора по основной адресу (в rotor'е у актора может быть несколько адресов, аналогично как в sobjectizer'е агент может быть подписан на несколько mbox'ов). Однако, иногда желательно вывести в лог имя актора, который завершил работу, в соответствующем колбеке в его супервайзере:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << "actor " << (void*) actor->get_address().get() << " died \n";    }}

Адрес актора динамичен и меняется при каждом запуске программы, так что эта информация почти бесполезна. Чтобы она имела смысл, актор должен напечатать свой основной адрес, где, например, в перекрытом методе on_start(). Однако, это решение не очень удобно, поэтому было решено ввести свойство std::string identity непосредственно в базовый класс actor_base_t. Таким образом, идентичность актора может быть выставлена в конструкторе или во время конфигурации плагина address_maker_plugin_t:


struct some_actor_t : public t::actor_baset_t {    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {            p.set_identity("my-actor-name", false);        });        ...    }};

Теперь можно выводить идентичность актора в его супервайзере:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << actor->get_identity() << " died \n";    }}

Иногда акторы уникальны в одной программе, а иногда сосуществуют несколько экземпляров одного и того же класса акторов. Чтобы различать между их, адрес каждого может быть добавлен к имени актора. В этом и смысл второго параметра bool в методе set_identity(name, append_addr) выше.


По умолчанию идентичность актора равна чему-то вроде actor 0x7fd918016d70 или supervisor 0x7fd218016d70. Эта возможность появилась в rotor'е начиная с версии v0.14.


Extended Error вместо std::error_code, shutdown reason (v0.14)


Когда случается что-то непредвиденное в процессе обработки запроса, до версии v0.14, ответ содержал код ошибки в виде std::error_code. Такой подход хорошо служил своей цели, но, ввиду, иерархичной природы rotor'а, этого оказалось не достаточно. Представим себе случай: супервыйзер запускает двух акторов, и у одного из них произошёл сбой в инициализации. Супервайзер экскалирует проблему, т. е. выключается сам и шлёт запрос на выключение второму актору. Однако, на момент, когда второй актор выключился, исходный контекст уже был потерян, и совершенно неясно, почему он выключился. А причина кроется в том, что std::error_code не содержит в себе достаточно информации.


Поэтому было решено ввести собственный класс расширенной ошибки rotor::extended_error_t, который содержит std::error_code, std::string в качестве контекста (обычно это идентичность актора), а также умный указатель на следующую расширенную ошибку, что вызвала текущую. Теперь схлопывание иерархии акторов может быть представлено цепочкой расширенных ошибок, где причина выключения каждого актора может быть отслежена:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << actor->get_identity() << " died, reason :: " << actor->get_shutdown_reason()->message();    }}

выведет что-то вроде:


actor-2 due to supervisor shutdown has been requested by supervisor <- actor-1 due initialization failed

Вместе с идентификацией акторов данная возможность предоставляет больше диагностических инструментов для разработчиков, которые используют rotor.

Подробнее..

Развитие проекта arataga пара рефакторингов по результатам натурных испытаний

27.05.2021 10:19:54 | Автор: admin

OpenSource-проект arataga -- это работающий прототип производительного socks5+http/1.1 прокси-сервера. Реализован arataga на базе Asio, SObjectizer и RESTinio. Об arataga уже рассказывалось несколько месяцев назад именно как о хорошем примере того, как выглядит реальный код на SObjectizer-е. Ведь одно дело повествовать о сильных сторонах SObjectizer-а с иллюстрациями из игрушечных примеров. Совсем другое -- иметь возможность показать почти что продакшен-код.

За время, прошедшее с первой публикации, удалось погонять arataga под более серьезной нагрузкой. Можно сказать, повезло провести первые натурные испытания.

Эти испытания выявили пару узких мест, устранение которых, как мне показалось, можно привести в качестве хороших примеров применения возможностей SObjectizer-а в ситуациях "вот здесь не то, что нам хотелось бы, и это нужно как можно быстрее исправить".

В частности, мы сегодня поговорим о том, что взаимодействие агентов друг с другом только посредством асинхронных сообщений -- это не догма. И что диспетчеры SObjectizer-а влияют на безопасность многопоточного программирования не менее серьезно, чем этот самый обмен сообщениями.

Собственная реализация взаимодействия с DNS

Первым узким местом, которые выявили натурные испытания, стала процедура резолвинга доменных имен.

Отказ от Asio-шного async_resolve

В первой версии arataga ради экономии времени мы не стали делать реализацию взаимодействия с DNS посредством UDP. Вместо этого использовали средства для асинхронного резолвинга, которые доступны в Asio из коробки.

Как я понял, Asio для выполнения async_resolve использует дополнительную рабочую нить, на которой делает обычные синхронные обращения к ОС для преобразования доменного имени в набор IP-адресов. И, если одно такое обращение "притормозит", то будут приостановлены и все последующие обращения к async_resolve. Кроме того, если нам нужно выполнить резолвинг сразу нескольких имен, то параллельно мы этого сделать не можем. Резолвинг будет выполняться последовательно.

В итоге, когда потребовалось обслуживать множество параллельных преобразований доменных имен в IP-адреса Asio-шный async_resolve буквально "вставал колом". Некоторым клиентам результат операции async_resolve приходилось ждать по несколько десятков секунд (максимальные значения, насколько я помню, колебались в районе от 35 до 40 секунд, в зависимости от нагрузки).

Естественно, это никуда не годилось: при наступлении таких тормозов у большинства клиентов просто истекали тайм-ауты и куча соединений тупо отваливалась. Поэтому мы поменяли работу с DNS на свою собственную реализацию.

Работа с DNS на уровне агентов: было и стало

Не думаю, что есть смысл рассказывать о том, как именно было реализовано общение с DNS-серверами посредством UDP-датаграмм, поскольку вся эта работа сосредоточена внутри одного агента a_namesever_interactor_t и никто деталей этой работы не видит.

Гораздо интереснее рассказывать о том, как резолвинг доменных имен представлен на уровне взаимодействия агентов в arataga.

Первоначальная схема была тривиальной: был некий интерфейсный почтовый ящик (mbox) и два сообщения -- resolve_request_t и resolve_reply_t. За этим почтовым ящиком скрывался единственный агент a_dns_resolver_t, который и отвечал за процедуру резолвинга доменных имен: держал кэш уже обработанных имен, организовывал список ждущих своей очереди доменных имен, дергал async_resolve и обрабатывал результаты резолвинга.

Важный момент, на который хотелось бы обратить внимание -- это то, что само существование dns_resolver было неизвестно другим частям arataga. Механизм резолвинга доменных имен был практически в буквальном смысле "черным ящиком". Просто есть некий mbox, в который нужно отсылать сообщения resolve_request. В каждом resolve_request передается mbox, на который затем прилетает resolve_reply. И это все, что требовалось знать.

Поэтому когда старая реализация резолвинга доменных имен была практически полностью выброшена и заменена новой, то этого внутри arataga никто и не заметил.

Новая же реализация потребовала двух разных агентов вместо одного.

Во-первых, это агент interactor::a_nameserver_interactor_t, который непосредственно общается с DNS-серверами по UDP. Агент nameserver_interactor отвечает за преобразование конкретного запроса на резолвинг в соответствующую UDP-датаграмму и обработку ответов от DNS-серверов.

Во-вторых, это агент lookup_conductor::a_conductor_t, который принимает входящие resolve_request, проверяет наличие результатов в кэше, выстраивает в очередь запросы, для которых нужен резолвинг, и отправляет команды агенту nameserver_interactor.

А вот с conductor все не так просто :)

И самое любопытное здесь то, что на самом деле conductor-ов два.

Первоначально планировалось использовать всего один экземпляр conductor. Который бы обрабатывал и IPv4, и IPv6. По аналогии с тем, как это делал dns_resolver из первых версий arataga: dns_resolver в случае успешного результата получал список IP-адресов, в котором присутствовали и IPv4, и IPv6 адреса. И когда приходил resolve_request_t для получения IPv4 адреса, то dns_resolver выбирал из списка IPv4 адрес. А когда приходил resolve_request для IPv6 адреса, то dns_resolver искал в списке IPv6 адрес (либо же конвертировал в IPv6 один из IPv4 адресов).

Такая схема была принята потому что Asio-шный async_resolver мог вернуть в случае успеха список с двумя типами адресов.

И при рефакторинге взаимодействия с DNS-серверами предполагалось, что таким же образом будет работать и nameserver_interactor.

Но в процессе тестирования nameserver_interactor с реальными DNS-серверами выяснилось, что работает либо запрос ресурсной записи типа A, либо запрос ресурсной записи типа AAAA. Но вот если отослать в одной UDP-датаграмме сразу два запроса (и для A, и для AAAA), то ответ придет только на один из них.

Посему имея на руках почти готовые реализации conductor и nameserver_interactor нужно было что-то оперативно решать. Понятно, что при обращении к nameserver_interactor придется явно указывать тип IP-адреса (IPv4 или IPv6). Но не понятно, как быть с кэшированием результатов и выстраиванием запросов в очереди.

Усложнять conductor и делать в нем отдельные кэши/очереди для разных типов IP-адресов? Вести общий кэш, в котором будут хранится адреса обоих типов и инициировать запрос к nameserver_interactor, если адресов нужного типа в кэше нет?

Тратить дополнительное время на рефакторинг conductor не хотелось, поэтому в модном и молодежном стиле "фигак-фигак и в продакшен" был применен метод грубой силы: вместо одного conductor-а запускается сразу два. Один обслуживает запросы для IPv4, второй -- запросы для IPv6. И оба они подписываются на сообщения resolve_request из интерфейсного mbox-а.

Вопрос был лишь в том, чтобы разрулить поток resolve_request между этими двумя агентами. Нужно был как-то сделать так, чтобы один агент получал только запросы для IPv4, а второй -- только запросы для IPv6.

Сделать это оказалось совсем не сложно. Были применены фильтры доставки для сообщений.

Фильтры доставки -- это предикаты, которые агент может "навесить" на сообщения из конкретного multi-consumer mbox-а. Когда сообщение отсылается в mbox, то mbox обращается к предикату с вопросом "можно ли доставлять конкретно этот экземпляр сообщения до твого владельца?" Если предикат говорит, что можно, то сообщение доставляется. Если нет, то игнорируется.

Благодаря фильтрам доставки агента conductor не пришлось переделывать для того, чтобы запускать его в двух экземплярах. Все, что потребовалось сделать -- это добавить установку фильтра перед подпиской:

voida_conductor_t::so_define_agent(){// We want to receive only requests for our IP-version.so_set_delivery_filter(m_incoming_requests_mbox,[ip_ver = m_ip_version]( const resolve_request_t & req ) {return ip_ver == req.m_ip_version;} );so_subscribe( m_incoming_requests_mbox ).event( &a_conductor_t::on_resolve );

Каков результат этих изменений?

Главный результат -- это возможность параллельного выполнения множества операций резолвинга имен без формирования длинных очередей и долгого ожидания в этих очередях. В итоге по тайм-ауту отваливаются только те запросы, на которые не отвечают сами DNS-сервера. А это на практике происходит очень и очень редко. Так что количество соединений, не получивших обслуживания из-за тайм-аутов DNS, сократилось драматически.

Таймеры для acl_handler-ов

Первые запуски arataga под реальной нагрузкой показали, что потребление ресурсов у arataga не настолько низкое, как хотелось бы. Что и не удивительно, т.к. arataga все еще в стадии прототипа и под высокой тестов было совсем мало, так что узких мест там наверняка множество.

Одно из них оказалось прямо на поверхности, не пришлось даже глубоко копать.

Дело в том, что внутри arataga есть специальное сообщение one_second_timer, которое генерируется раз в секунду и которое используется acl_handler-ами для разных целей: и для контроля тайм-аутов текущих операций, и для пересчета лимитов по подключениям.

Дабы не тратить время при выпуске первой версии arataga была использована самая простая схема: каждый acl_handler просто подписывается на one_second_timer. Что означает, что когда это сообщение возникает, то оно доставляется всем подписчикам.

Т.е., если у нас создано 15K acl_handler-ов, то раз в секунду у них у всех запускается обработчик one_second_timer. Что и увеличивает потребление CPU.

Проблема в том, что далеко не у всех из этих 15K acl_handler-ов в настоящий момент есть необходимость реагировать на one_second_timer. У части acl_handler-ов не будет вообще подключений, которые нужно обслуживать. И доставлять one_second_timer до таких acl_handler-ов нет смысла.

Только вот если агент подписался на сообщение, то сообщение к нему будет доставлено. Даже если сейчас агент в нем и не заинтересован.

Поэтому встал вопрос о том, как сделать так, чтобы на таймер реагировали только те acl_handler-ы, которые в таймере сейчас заинтересованы. И сделать это дешево.

Какие подходы нельзя назвать хорошими?

Фильтры доставки

Можно было бы попробовать задействовать фильтры доставки (о которых речь уже шла выше). Так, фильтр доставки при отсылке сообщения проверял бы наличие принятых подключений у acl_handler-а и, если подключений нет, то запрещал бы доставку сообщения.

Тут сразу несколько проблем.

Во-первых, отсылка сообщения one_second_timer идет с контекста специальной нити таймера, которой управляет сам SObjectizer. И на этой же нити в процессе отсылки сообщения запускаются фильтры доставки. Следовательно, если фильтр доставки хочет обратиться к каким-то потрохам acl_handler-а, то эти потроха должны быть как-то защищены с точки зрения thread-safety. Например, посредством mutex-а. Что отнюдь не бесплатно. И затрудняет реализацию самого acl_handler-а, т.к. внутри агента нужно заботиться о thread-safety, хотя агенты как раз и нужны, чтобы такими вещами не заниматься.

Во-вторых, если у нас много acl_handler-ов, то запуск фильтров доставки для каждого них -- это ведь такая же лишняя работа, как и доставка one_second_timer. Да, это может быть чуть дешевле. Но именно что чуть.

Так что фильтры доставки, наверное, могли бы немного снизить актуальность проблемы, но точно не устранили бы ее.

Подписка/отписка на/от one_second_timer

Если нужно доставлять one_second_timer только до агентов, которые обслуживают принятые подключения, то напрашивается простой и логичный подход: пусть acl_handler при принятии первого подключения создает подписку на one_second_timer, а при потере последнего подключения -- отписывается от one_second_timer.

Это нормальный подход. По крайней мере он идеологически правильный и не ведет к каким-либо проблемам с thread-safety.

Но его нельзя назвать совсем уж дешевым. Ведь подписка на mbox -- это захват нескольких объектов синхронизации (в mbox-е и в агенте-подписчике) + несколько аллокаций памяти на создание объектов-подписок.

А хотелось бы, чтобы работа с one_second_timer была еще дешевле.

Использованный подход

Для того, чтобы еще более снизить расходы на доставку one_second_timer только до тех, кто в этом нуждается, был применен следующий трюк.

Введено понятие timer_provider. Это интерфейс объекта, который должен присутствовать на каждой io_thread в единственном числе. Его задачей является периодический вызов метода on_timer у привязанных к этой же io_thread объектов timer_consumer. Но не у всех timer_consumer, а только у тех, кто заявил о себе timer_provider-у. Т.е., когда у timer_consumer появляется потребность в обработке таймера, он вызывает у timer_provider-а метод activate_consumer, а когда такая потребность исчезает -- вызывает метод deactivate_consumer. Таким образом у timer_provider есть список активных timer_consumer-ов.

За интерфейсом timer_provider скрывается простой агент, который подписывается на one_second_timer. Получив это сообщение timer_provider просто бежит по своему списку активных timer_consumer-ов и вызывает у них on_timer.

timer_consumer-ами являются acl_handler-ы. Когда у acl_handler-а (он же timer_consumer) появляется первое принятое подключение, то acl_handler добавляет себя в список к timer_provider-у. А когда подключений не остается acl_handler вычеркивает себя из списка timer_provider-а.

Фокус в том, что все взаимодействующие в рамках одной io_thread timer_provider и timer_consumer-ы привязанны к одному и тому же диспетчеру. Что гарантирует, что они работают на одной и той же рабочей нити. А значит могут обращаться друг к другу не боясь приключений с thread-safety.

Это отличный пример того, как при работе с SObjectizer-ом отказаться от взаимодействия с агентами только посредством асинхронного взаимодействия и выполнять синхронные вызовы методов агентов, но при этом не иметь проблем с thread-safety. Ведь диспетчеры в SObjectizer-е и были введены как раз для того, чтобы программист мог управлять тем где и как работают его агенты. И, если у программиста есть возможность директивно привязать своих агентов к конкретной нити, то почему бы этим не пользоваться. По крайней мере в условиях, когда хочется выжимать максимум.

Защита от повисших указателей

Сейчас timer_provider и timer_consumer хранят указатели друг на друга. Но, при этом агенты, реализующие данные интерфейсы, принадлежат разным кооперациям. Т.е. уничтожаться они могут в разном порядке.

Когда у acl_handler-а (т.е. у timer_consumer-а) вызывается so_evt_finish (это последнее событие для агента перед дерегистрацией), то acl_handler вычеркивает себя из списка timer_provider-а. Так что у timer_provider-а не может остаться повисших указателей на timer_consumer-ы (в теории, по крайней мере).

Но что с самим указателем на timer_provider? Как сделать так, чтобы он оставался валидным до тех пор, пока жив хотя бы один acl_handler?

Очень просто: кооперация с агентом timer_provider выступает в качестве родительской для всех коопераций с acl_handler-aми. В этом случае SObjectizer гарантирует, что родительская кооперация не будет уничтожена пока есть хотя бы одна живая дочерняя кооперация. Т.е. указатель на timer_provider внутри timer_consumer-ов будет оставаться валидным.

Каков результат этих изменений?

В зависимости от конфигурации, количества ACL внутри arataga и входящей нагрузки расход CPU на тестовых прогонах снизился от 1.5 до 4-х раз.

Заключение

Для нас arataga получился интересным полигоном для еще одних натурных испытаний SObjectizer и RESTinio. Да еще и с возможностью открыто показать как выглядит реальный код на SObjectizer-е.

Надеюсь, что читателям, которые следят за SObjectizer, было интересно. Если есть какие-то вопросы или непонятные моменты, то я с удовольствием отвечу на них в комментариях.

В завершение хочу сказать вот что: не смотря на то, что пока у нас нет ресурсов на дальнейшее развитие своего OpenSource, тем не менее проекты не заброшены. Если кто-то находит ошибки, мы их исправляем. Если у кого-то возникают вопросы, то стараемся помочь. Иногда даже выпускаем обновления с мелкими улучшениями.

Так что, если у кого-то есть сомнения о том, брать или не брать SObjectizer/RESTinio/json-dto в работу, то отбросьте их и попробуйте. В случае сложностей или проблем смело обращайтесь к нам, без помощи не оставим.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru