Русский
Русский
English
Статистика
Реклама

Message-passing

Релиз акторного фреймворка rotor v0.09 (c)

10.10.2020 20:06:06 | Автор: admin

actor system


rotor ненавязчивый С++ акторный микрофремворк, похожий на своих старших братьев caf и sobjectizer. В новом релизе внутреннее ядро полностью было переделано с помощью механизмов плагинов, так что это затронуло жизненный цикл акторов.


Связывание акторов


Всякая система акторов базируется на взаимодействии между ними, т.е. в отправлении сообщений друг другу (а также в возможных побочных эффектах в качестве реакции на эти сообщения или в создании новых сообщений, появляющихся в качестве реакции на события внешнего мира). Однако, чтобы сообщение было доставлено целевому актору, он должен оставаться активным (1); другими словами, если актор A собирается отправить сообщение М актору B, он должен быть уверен, что актор B онлайн и не будет выключен в процессе пересылки сообщения M.


До версии v0.09 подобная гарантия была только для отношений родитель/потомок, между супервайзером и дочерним актором, т. к. для последнего выполняется гарантия того, сообщение будет доставлено до его супервайзера в силу того, что супервайзер владеет дочерним актором, и его время жизни покрывает времена жизни всех своих дочерних акторов. Начиная с версии v0.09 появилась возможность связывания двух произвольных акторов A и B, так что после подтверждения связи (link), можно быть уверенным, что все последующие сообщения будут доставлены.


Для связывания акторов можно использовать такой код:


namespace r = rotor;void some_actor_t::on_start() noexcept override {    request<payload::link_request_t>(b_address).send(timeout);}void some_actor_t::on_link_response(r::message::link_response_t &response) noexcept {    auto& ec = message.payload.ec;    if (!ec) {        // успех связывания    }}

Однако, данный код не рекомендуется использовать напрямую потому что он не очень удобен. Это становится очевидным при попытке связать актор A с двумя и более акторами, т.к. some_actor_t должен будет вести внутренний счётчик успешных связываний. В данном случае помогает система плагинов, и код будет уже выглядеть так:


namespace r = rotor;void some_actor_t::configure(r::plugin::plugin_base_t &plugin) noexcept override {    plugin.with_casted<r::plugin::link_client_plugin_t>(        [&](auto &p) {            p.link(B1_address);            p.link(B2_address);        }    );}

Это более удобно в виду того, что плагин link_client_plugin_t поставляется в базовом классе всех акторов actor_base_t. Тем не менее, это скорей всего не всё, что хотелось бы иметь, т.к. остаются не отвеченными важные вопросы: 1) Когда происходит связывание акторов (и обратный вопрос когда происходит их разъединение)? 2) Что случится, если целевой актор ("сервер") не существует или откажет в связывании? 3) Что случится если целевой актор решит выключиться, в то время как есть связанные с ним акторы-клиенты?


Чтобы ответить на этот вопрос нужно рассмотреть жизненный цикл актора.


Асинхронная инициализация и выключение актора


Упрощённо жизненный цикл актора (состояние, state) выглядит следующим образом: new (ctor) -> initializing -> initialized -> operational -> shutting down -> shut down.



Основная работа осуществляется актором, когда он находится в состоянии operational, и здесь собственно пользователь фреймворка должен решить, чем именно будет заниматься актор.


Во время фазы инициализации (I-фазы, т.е. initializing -> initialized), актор подготавливает себя для будущей работы: находит и связывается с другими акторами, устанавливает соединение с БД, получает необходимые ему ресурсы для полноценной работы. Ключевая особенность rotor'а, что I-фаза асинхронна, т. е. актор сообщает супервайзеру, когда он готов (2).


Фаза выключения (S-фаза, т.е. shutting down -> shut down ) комплиментарна I-фазе, т.е. актора просят выключится, а когда он готов, он сообщает об этом своему супервайзеру.


Несмотря на кажущуюся простоту, основная сложность лежит здесь в масштабируемости (composability) подхода, при котором акторы формируют эрланго-подобные иерархии ответственностей (см. мою статью Trees of Supervisors). Перефразируя, можно сказать, что любой актор может дать сбой во время I- или S-фазы, что может повлечь за собой безопасный и ожидаемый коллапс всей иерархии независимо от местоположения актора в ней. Конечная цель в итоге это либо вся иерархия приходит в рабочее состояние (operational), либо она в конце концов становится выключенной (shut down).



(Пояснение к картинке. Сплошная линия обозначение отношение владения, пунктирная отношения связи).


rotor уникален в этом отношении. Ничего подобного нет в caf. Может создаться ошибочное представление, что в sobjectizer'е присутствует shutdown helper, предназначение которого аналогично S-фазе выше; однако, после публичных дискуссий с автором в англоязчыной версии статьи, выяснилось, что данные вспомогательные классы нужны для "длительного" (гарантированного) выключения акторов, даже если в Environment'е был вызван метод stop. С точки зрения sobjectizer'а отложенная инициализация (и выключение), аналогичные I- и S-фазам rotor'a, могут быть смоделированы с помощью встроенного механизма поддержки состояний и это обязанность пользователя фрейворка, если такова потребность его бизнес-логики. В rotor'е же это встроено в сам фреймворк.


При использовании rotor'а было обнаружено, что во время I-фазы (S-фазы) потенциально множество ресурсов должны быть получены (освобождены) асинхронно, что обозначает, что нет единого компонента, способного решить, что текущая фаза завершена. Вместо этого это решение есть плод совместных усилий, приложенных в определённом порядке. И здесь в игру вступают плагины, представляющие из себя атомарные кусочки, каждый из которых ответственен за собственную работу по инициализации или выключению.


Что же такое плагин в контексте rotor'а? Плагин это некий аспект поведения актора, определяющий реакцию актора на некоторые сообщения или группу сообщений. Лучше пояснить на примерах. Плагин init_shutdown, ответственен за инициализацию (выключение) актора, т. е. после опроса о готовности всех плагины, генерируется ответ на запрос о готовности инициализации (выключения); или, например, плагин child_manager, доступный только для супервайзеров, и ответственный за порождение дочерних акторов и всю машинерию связанную с этим, как то генерация запросов дочерним акторам на инициализацию, выключение и т. п. Несмотря на то, что существует возможность свои плагины, на текущий момент я не вижу необходимости в этом, поэтому она остаётся недокументированной.


Таким образом, обещанные ответы, относящиеся к link_client_plugin_t:


  • В: когда происходит связывание (отвязывание) актров? О: когда актор в состоянии initializing (shutting down).


  • В: что случится, если целевой актор не существует или отказывает в связывании? О: т. к. это случается во время инициализации актора, то плагин обнаружит это условие и начнёт выключение актора-клиента; также, возможно, это вызовет каскадный эффект, т.е. его супервайзер тоже выключится и так далее вверх по иерархии владения.


  • В: что случится, если целевой актор решит выключиться, при том, что с ним связаны активные акторы-клиенты? О: актор-сервер попросит клиентов отвязаться, и только когда все связанные клиенты подтвердят это, актор-сервер продолжит процедуру выключения (3).



Упрощённый пример


Будем предполагать, что имеется драйвер базы данных с асинхронным интерфейсом для одного из движков событий (event loop), доступных для rotor'а, а также что имеются TCP-клиенты, подключающиеся к нашему сервису. За обслуживание базы данных будет отвечать актор db_actor_t, а принимать клиентов будет acceptor_t. Начнём с первого:


namespace r = rotor;struct db_actor_t: r::actor_base_t {    struct resource {        static const constexpr r::plugin::resource_id_t db_connection = 0;    }    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::registry_plugin_t>([this](auto &p) {            p.register_name("service::database", this->get_address())        });        plugin.with_casted<r::plugin::resources_plugin_t>([this](auto &) {            resources->acquire(resource::db_connection);            // инициировать асинхронное соединение с базой данных        });    }    void on_db_connection_success() {        resources->release(resource::db_connection);        ...    }    void on_db_disconnected() {        resources->release(resource::db_connection);    }    void shutdown_start() noexcept override {        r::actor_base_t::shutdown_start();        resources->acquire(resource::db_connection);        // асинхронное закрытие соединения с базой данных и сброс данных    }};

Внутреннее пространство имён resource используется для идентификации соединения с БД как ресурсом. Это общепринятая практика, чтобы не использовать в коде магические цифры вроде 0. Во время конфигурации, которая является частью инициализации, когда плагин registry_plugin_t готов, он асинхронно зарегистрирует адрес актора в регистре (о нём будет рассказано позже). Затем с помощью resources_plugin_t захватывается "ресурс" подключения к БД, чтобы блокировать дальнейшую инициализацию актора и начинается соединение с БД. Когда будет подтверждено соединение с БД, ресурс будет освобождён и актор db_actor_t перейдёт в рабочее состояние. S-фаза аналогична: блокируется выключение до тех пор, пока все данные не будут сброшены в БД и пока соединение не будет закрыто; после этого процедура выключения актора завершается (4).


Код актора, который будет принимать клиентов, выглядит приблизительно так:


namespace r = rotor;struct acceptor_actor_t: r::actor_base_t {    r::address_ptr_t db_addr;    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::registry_plugin_t>([](auto &p) {            p.discover_name("service::database", db_addr, true).link();        });    }    void on_start() noexcept override {        r::actor_base_t::on_start();        // начать приём клиентов, например:        // asio::ip::tcp::acceptor.async_accept(...);    }    void on_new_client(client_t& client) {        // send<message::log_client_t>(db_addr, client)    }};

Основное в данном случае, это метод configure. Когда плагин registry_plugin_t готов, он будет сконфигурирован на обнаружение сервиса service::database, а когда адрес db_actor_t будет найден и сохранён в члене класса db_addr, то тогда с ним будет произведено связывание. Если же адрес актора service::database не будет обнаружен, то актор acceptor_actor_t начнёт выключаться (т. е. on_start не будет вызван). Если всё будет успешно проинициализировано, то актор начнёт принимать новых клиентов.


Собственно, сама операционная часть была опущена ради краткости, т. к. она не изменилась в новой версии rotor'а. Как обычно нужно определить полезную нагрузку (payload), сообщения, методы для работы с этими сообщениями и подписаться на них.


Скомпонуем всё вместе в файле main.cpp; будем считать, что используется boost::asio в качестве цикла событий.


namespace asio = boost::asio;namespace r = rotor;...asio::io_context io_context;auto system_context = rotor::asio::system_context_asio_t(io_context);auto strand = std::make_shared<asio::io_context::strand>(io_context);auto timeout = r::pt::milliseconds(100);auto sup = system_context->create_supervisor<r::asio::supervisor_asio_t>()               .timeout(timeout)               .strand(strand)               .create_registry()               .finish();sup->create_actor<db_actor_t>().timeout(timeout).finish();sup->create_actor<acceptor_actor_t>().timeout(timeout).finish();sup->start();io_context.run();

Как видно, в новом rotor'е активно используется шаблон builder. С помощью него создаётся корневой супервайзер sup, а уже он в свою очередь порождает 3 актора: два пользовательских (db_actor_t и acceptor_actor_t) и неявно созданный актор-регистр. Как обычно для акторных систем, все акторы слабо связаны друг с другом, т.к. они разделяют только общий интерфейс сообщений (опущено в статье).


Акторы "просто создаются" в данном месте, без знания о том, как они связаны между собой. Это следствие слабой связанности между акторами, которые с версии v0.09 стали более автономными.


Конфигурация выполнения могла бы быть полностью другой: акторы могли бы создаваться на разных потоках, на разных супервайзерах, и даже на различных движках, но их реализация оставалась бы по сути одной и той же (5). В этих случаях было бы несколько корневых супервайзеров, однако актор-регистр должен быть создан один, и его адрес разделён между всеми супервайзерами, чтобы акторы смогли найти друг друга. Для этого существует метод get_registry_address() в базовом супервайзере.


Итоги


Наиболее существенным изменением в новой версии rotor'а является разбиение на плагины его ядра. Наиболее важными для пользователя фрейморка являются плагины: link_client_plugin_t, позволяющий установить виртуальное соединение между акторами, плагин registry_plugin_t, дающий возможность регистрации и обнаружения адресов акторов по символическим именам, а также плагин resources_plugin_t, с помощью которого можно приостановить процедуру инициализации (выключения) до появления некоторого асинхронного внешнего события.


Так же присутствуют менее важные изменения в новом релизе, такие как система доступа к непубличным
свойствам и шаблон builder для интанцирования акторов.


Любая обратная связь приветствуется.


P.S. Я хотел бы поблагодарить Crazy Panda за поддержку в моих начинаниях по развитию данного проекта, а также автора sobjectizer'а за высказанные им критические замечания.


Примечания


(1) В настоящее время попытка доставки сообщения актору, супервайзер которого уже отработал и был удалён, ведёт к краху программы (UB).


(2) Если актор не подтвердит успешную инициализацию супервайзеру, сработает таймер инициализации, и супервайзер сделает запрос на выключение актора, т.е. состояние operational будет пропущено.


(3) Может возникнуть вопрос, что произойдёт, если актор не подтвердит отвязывание вовремя? Это нарушение контракта, и будет вызван метод system_context_t::on_error(const std::error_code&), который распечатает ошибку на консоль и вызовет std::terminate(). Для избегания нарушений контрактов, нужно настраивать таймеры, чтобы позволить акторам-клиентам во время отвязаться.


(4) Во время процедуры выключения плагин registry_plugin_t проинструктирует регистр, чтобы все зарегистрированные имена текущего актора были удалены из регистра.


(5) Исключение составляет, когда используются различные циклы событий. Если актор использует API цикла событий, то, очевидно, что смена цикла событий повлечёт переписывание внутренностей актора. Тем не менее, это никак не затронет использование API rotor'а.

Подробнее..

Не хочется ждать в очереди? Напишем свой диспетчер для SObjectizer с приоритетной доставкой

07.12.2020 12:09:16 | Автор: admin


SObjectizer это небольшой фреймворк для C++, который дает возможность разработчику использовать такие подходы, как Actor Model, Communicating Sequential Processes и Publish/Subscribe.


Одной из ключевых концепций в SObjectizer являются диспетчеры. Диспетчеры определяют где и как акторы (агенты в терминологии SObjectizer-а) обрабатывают свои события. Диспетчеры в SObjectizer бывают разных типов и пользователь может создавать в своем приложении столько разнообразных диспетчеров, сколько ему потребуется.


При этом у пользователя есть возможность написать свой собственный тип диспетчера, если ничего из стандартных диспетчеров ему не подходит. Два с половиной года назад уже рассказывалось о том, как можно сделать диспетчер для SObjectizer-а со специфическими свойствами.


Сегодня мы еще раз поговорим об этом. На примере уже другой задачи. Да и реализация будет отличаться, поскольку за прошедшее время SObjectizer успел обновиться сперва до версии 5.6, а затем и 5.7. И в этих версиях много отличий от версии 5.5, про которую в основном и рассказывалось в прошлом. В том числе и в механизме диспетчеров.


О решаемой задаче в двух словах


Предположим, что у нас есть агент, который ожидает два сообщения: msg_result с финальным результатом ранее начатой агентом операции и msg_status с промежуточной информацией о том, как протекает начатая операция.


Сообщения msg_status могут идти большим потоком. Например, на одно msg_result может приходиться до 1000 msg_status. И нам бы хотелось, чтобы когда в очереди уже стоит 900 сообщений msg_status новое сообщение msg_result вставало не в конец очереди, а в самое ее начало. Чтобы msg_result не ждало пока разгребутся 900 старых msg_status.



Вроде бы простая задача. Но с ее реализацией могут возникнуть неожиданные сложности. И чтобы понять, что это за сложности и как с ними справится, нужно посмотреть на SObjectizer-овские диспетчеры и понять, что это за звери такие. После чего можно будет обсудить возможные способы реализации доставки сообщений с учетом их приоритетов. Воплотить один из вариантов в жизнь и обсудить его реализацию.


Как диспетчеры связаны с политикой доставки сообщений до агентов?


Диспетчер в SObjectizer-е это сущность, которая определяет где и когда агенты будут обрабатывать свои сообщения.


Допустим, у нас есть агент Bob, который подписывается на сообщение msg_result из почтового ящика mbox. Когда в mbox отсылается msg_result, то это сообщение должно встать в очередь Bob-а, откуда оно будет рано или поздно извлечено и обработано. И вот здесь уже в полный рост встают диспетчеры, поскольку у агентов нет никаких очередей, тогда как у диспетчеров есть.


Когда агент регистрируется в SObjectizer-е, то агент привязывается к какому-то диспетчеру. И в момент привязки агенту дается указатель на сущность event_queue. Это интерфейс, за которым скрыта некая машинерия по передаче сообщения, адресованного агенту, именно тому диспетчеру, к которому агент привязан.


Когда сообщение передается диспетчеру, тот формирует заявку (demand) на обработку сообщения агентом и сохраняет заявку где-то у себя (очередь заявок диспетчера называется demand_queue).


Процесс доставки сообщения до агента в SObjectizer выглядит следующим образом:



Когда сообщение отсылается в mbox, то mbox видит Bob-а в подписчиках и говорит Bob-у: вот тебе новое сообщение. Bob берет это сообщение и передает его в свой event_queue. И уже этот event_queue формирует для диспетчера заявку (demand) на обработку сообщения для агента Bob. Заявка сохраняется в demand_queue диспетчера.


Диспетчер владеет одной или несколькими рабочими нитями, которые занимаются тем, что извлекают заявки из demand_queue и отправляют их на обработку. Эти рабочие нити диспетчера называются рабочим контекстом на котором будут работать агенты.


В общем, получается, что диспетчеры и предоставляют рабочий контекст для агентов, и хранят в своих demand_queue адресованные агентам сообщения.


А это означает, что когда нам нужна приоритетная доставка сообщений (т.е. msg_result вперед msg_status), то нам потребуется диспетчер, который эту самую приоритетную доставку реализует.


А приоритетов для сообщений в SObjectizer-5 и нет :(


Совсем.


Вот так вот.


Приоритеты для сообщений были в SObjectizer-4. Но они на практике использовались всего раз или два. Зато хлопот с их существованием и поддержкой было много.


Поэтому при разработке SObjectizer-5 от приоритетов для сообщений агента было решено отказаться. И за 10 лет развития и использования SObjectizer-5 пожалеть об этом пока что не пришлось.


Однако, время от времени задачи, в которых требуется какое-либо приоритетное обслуживание, все-таки встречаются. Поэтому некоторое время назад в SObjectizer-5 было добавлено понятие приоритета для агента. Именно для агента, а не для отдельного сообщения. Для поддержки приоритетов агентов в SObjectizer-5 появились три диспетчера, которые реализуют разные политики обслуживания приоритетов.


Так как можно решить задачу с msg_result и msg_status?


Использование двух агентов с разными приоритетами и диспетчера one_thread::strictly_ordered


Самое "простое" и "искаробочное" решение.


Логика агента A размазывается на двух агентов A_result и A_status. Агенты A_result и A_status получают разные приоритеты и привязываются к одному и тому же диспетчеру типа one_thread::strictly_ordered. Агент A_result подписывается на msg_result, тогда как агент A_status подписывается на msg_status.


Общие для агентов данные выносятся в какой-то общий объект, которыми они совместно владеют. Через shared_ptr, например. Получится что-то вроде:


struct A_data { ... };class A_result final : public so_5::agent_t {   std::shared_ptr<A_data> m_data;public:   A_result(context_t ctx, std::shared_ptr<A_data> data) {...}   void so_define_agent() override {      so_subscribe(m_data->m_mbox, &A_result::on_result);   }private:   void on_result(const msg_result & msg) {...}};class A_status final : public so_5::agent_t {   std::shared_ptr<A_data> m_data;public:   A_status(context_t ctx, std::shared_ptr<A_data> data) {...}   void so_define_agent() override {      so_subscribe(m_data->m_mbox, &A_status::on_status);   }private:   void on_status(const msg_status & msg) {...}};

Разделять общие данные между агентами A_result и A_status безопасно до тех пор, пока они работают на one_thread::strictly_ordered диспетчере.


Собственно говоря, это тот подход, который я бы и рекомендовал использовать, если вам потребовалась приоритетная обработка событий.


Но у этого способа есть очевидный недостаток: трудоемкость и размазывание логики по нескольким сущностям. Эта трудоемкость будет увеличиваться по мере увеличения количества сообщений, которые вам потребуется обрабатывать с разными приоритетами. Так что в какой-то момент может показаться, что двигаться в этом направлении слишком дорого и что нужно искать другое решение.


Собственный диспетчер с приоритетами для сообщений


Другой подход состоит в том, чтобы сделать собственный диспетчер, который будет доставлять сообщения до агентов-получателей с учетом приоритетов сообщений. Благо, если не заморачиваться вопросами мониторинга работы диспетчера и сбора статистики, то это делается не так уж и сложно.


Если такой диспетчер будет написан, то можно избавить себя от недостатков подхода с несколькими разноприоритетными агентами: вся логика будет находится в одном месте, поэтому добавлять новые обрабатываемые сообщения или избавляться от каких-то старых сообщений будет гораздо проще.


Именно по этому пути мы и пойдем в данной статье.


Что именно мы попытаемся сделать?


Мы попытаемся посмотреть на проблему немного шире (как это и положено русским программистам, которые ищут универсальные решения там, где можно обойтись методом грубой силы).


Начнем с того, что нам нужно собственный диспетчер. Который сможет упорядочивать сообщения в своей очереди согласно их приоритетов.


И тут возникает вопрос, а как должны определяться эти приоритеты?


А это зависит от задачи.


Где-то приоритеты будут определяться типом сообщения и эти приоритеты можно зафиксировать прямо в compile-time. Скажем, приоритет у msg_result единичка, а у msg_status нолик.


Где-то приоритеты могут зависеть от типа сообщения и агента-получателя. Скажем, для агента A сообщение msg_result будет иметь приоритет 1, а сообщение msg_status 0. Тогда как для агента L оба эти сообщения будут иметь приоритет 0.


Где-то приоритеты могут зависеть только от почтового ящика, из которого они были получены. Скажем все сообщения из mbox_A должны быть приоритетнее, чем сообщения из mbox_B.


А где-то приоритеты почтовых ящиков должны быть дополнены еще и приоритетами сообщений. Т.к. msg_result из mbox_A имеет приоритет 2, msg_status из mbox_A 1, msg_result из mbox_B 1, msg_status из mbox_B 0.


Ну и другие сочетания параметров так же возможны.


Значит ли это, что под каждое такое сочетание нужно делать свой диспетчер?


Скорее всего нет.


Возможно, диспетчер должен быть один, просто он должен уметь настраиваться на любой способ вычисления приоритета сообщений. Например, пользователь параметризует диспетчер объектом priority_detector который и определяет приоритет очередного сообщения: при получении очередной заявки диспетчер дергает priority_detector и располагает заявку в очереди согласно вычисленного priority_detector-ом приоритета.


Уже хорошо.


Но можно попробовать пойти еще дальше.


Диспетчеры были придуманы для того, чтобы дать возможность разработчику распределять своих агентов по тем рабочим контекстам, которые нужны разработчику. Скажем, захотел программист, чтобы агенты Alice и Bob работали на одной общей рабочей нити, значит зачем-то ему это нужно. Не суть важно зачем: для экономии ресурсов или для того, чтобы облегчить Alice и Bob совместную работу над общими разделяемыми данным. Разработчик хочет привязать Alice и Bob к общему контексту, значит он должен суметь это сделать.


А теперь представим себе, что Alice и Bob должны работать на общем рабочем контексте, но приоритеты доставки сообщений до Alice и Bob должны вычисляться по-разному. Скажем, приоритет сообщений для Alice зависит только от их типа. Тогда как приоритет для Bob-а зависит и от типа, и от почтового ящика.


Тут можно пойти, как минимум, двумя путями.


Первый путь. Диспетчер владеет одной приоритетной очередью. А мы пишем сложный priority_detector-а, который разбирается кто получатель сообщения, Alice или Bob, затем для каждого получателя определяет приоритет по тем или иным критериям.


Вполне понятное направление для движения. Но его трудоемкость будет расти по мере увеличения количества разнотипных агентов, которые нам нужно будет привязать к одному диспетчеру.


Второй путь. А что, если у каждого из агентов будет своя собственная приоритетная очередь? И в каждой очереди будет собственный priority_detector со своими правилами? Тогда мы сможем привязывать к одном диспетчеру агентов с разными политиками доставки сообщений. И в этом случае нам нужно научить диспетчера работать не с одной единственной очередью, в которую сваливается вообще все, а с набором отдельных очередей.


Мы пойдем вторым путем. Поскольку он открывает интересные возможности. Ведь если мы можем научить диспетчер обслуживать агентов с собственными очередями, то не обязательно это будут очереди с приоритетом. Например, это могут быть очереди фиксированного размера с автоматическим выбрасыванием самых старых (или самых новых) элементов при попытке добавления заявки в уже полную очередь. Или же очереди с контролем времени пребывания: скажем, если заявка простояла в очереди дольше 250ms, то она уже не актуальна и должна быть проигнорирована.


Демо проект so5_custom_queue_disps


Для иллюстрации предлагаемого решения на GitHub-е создан демо-проект so5_custom_queue_disps, который содержит исходные тексты обсуждаемого ниже диспетчера.


На данный момент в нем есть реализация только одного типа диспетчера one_thread, который привязывает всех агентов к одной единственной рабочей нити. Если данная тема кого-нибудь заинтересует, то туда же можно будет добавить и реализации thread_pool и/или adv_thread_pool диспетчеров. Для иллюстрации. Ну или для того, чтобы их можно было скопипастить, если вдруг кому-нибудь понадобятся.


Общая идея: список из непустых очередей


Идея, положенная в основу so5_custom_queue_disps, состоит в том, что есть N независимых друг от друга очередей заявок. В пределе у каждого агента может быть своя очередь заявок. Эти очереди живут до тех пор, пока кто-то их использует, как только очередь стала никому не нужна она уничтожается.


Диспетчер хранит у себя список указателей на очереди заявок. Если этот список пуст, значит все очереди заявок пусты. Как только в какую-то из них попадает заявка, эта очередь добавляется в конец списка диспетчера.


Диспетчер спит пока в его списке ничего нет. Но как только какая-то из очередей добавляется в список, то диспетчер просыпается и начинает обслуживать очереди из своего списка.


Диспетчер изымает из списка первую очередь. Извлекает заявку из очереди. Если после извлечения заявки очередь оказывается пустой, то диспетчер забывает про эту очередь. Если же в очереди что-то остается, то диспетчер возвращает эту очень в свой список, в самый его конец.


После чего диспетчер запускает обработку извлеченной заявки. А после завершения обработки проверяет свой список: если список не пуст, то обрабатывается следующий его элемент. Если пуст, то диспетчер засыпает.



Несколько агентов с одной очередью заявок и FIFO для агентов


Для большей гибкости в so5_custom_queue_disps есть возможность связать нескольких агентов с одной общей очередью заявок. Например, если агенты должны использовать общую политику доставки сообщений (скажем, у них общие приоритеты).


Но можно к one_thread диспетчеру привязать и агентов, у каждого из которых будет своя собственная очередь заявок. Получится, что агенты исполняются на одном рабочем контексте, но заявки для них диспетчер будет брать из разных очередей.


И тут возможны сюрпризы с обеспечением FIFO для агентов.


Предположим, что мы используем простые очереди заявок. Без приоритетов, без ограничения на размер, без выбрасывания чего-либо. Просто очереди.


Предположим, что агенты Alice и Bob разделяют одну общую очередь. И мы отсылаем сообщение M1 агенту Alice, а затем отсылаем сообщение M2 агенту Bob. Доставка сообщений до агентов произойдет в таком же порядке: сперва M1, затем M2.


Но вот если агенты Alice и Bob имеют независимые очереди заявок, то при отсылке вначале M1 агенту Alice, а затем M2 агенту Bob, порядок доставки может быть любым. Это зависит и от наполнения очередей заявок самих агентов, и от того, где эти очереди находились в списке непустых очередей диспетчера. Так что может случится и так, что M2 будет обработано агентом Bob еще до того, как M1 дойдет до Alice.


Эту особенность нужно иметь в виду при работе с таким диспетчером.


Как описанный выше подход выглядит для пользователя на практике?


Прежде чем перейти к рассмотрению деталей реализации custom_queue_disps::one_thread-диспетчера посмотрим на то, как выглядит использование этого диспетчера в коде:


so_5::launch( [](so_5::environment_t & env) {   env.introduce_coop( [](so_5::coop_t & coop) {      // (1)      auto queue = std::make_shared<dynamic_per_agent_priorities_t>();      // (2)      auto binder = custom_queue_disps::one_thread::make_dispatcher(            coop.environment() ).binder( queue );      // (3)      auto * alice = coop.make_agent_with_binder<demo_agent_t>(            binder, "Alice" );      auto * bob = coop.make_agent_with_binder<demo_agent_t>(            binder, "Bob" );      // (4)      queue->define_priority( alice,            typeid(demo_agent_t::hello),            dynamic_per_agent_priorities_t::low );      queue->define_priority( alice,            typeid(demo_agent_t::bye),            dynamic_per_agent_priorities_t::high );      queue->define_priority( bob,            typeid(demo_agent_t::hello),            dynamic_per_agent_priorities_t::high );      queue->define_priority( bob,            typeid(demo_agent_t::bye),            dynamic_per_agent_priorities_t::low );   } );} );

Здесь в точке (1) мы создаем отдельную очередь заявок, которая реализует доставку сообщений с учетом получателя сообщения и приоритета этого сообщения именно для этого получателя (класс dynamic_per_agent_priorities_t в деталях рассматривается ниже).


В точке (2) мы делаем сразу два действия:


  • во-первых, создаем новый экземпляр диспетчера;
  • во-вторых, получаем от этого диспетчера объект disp_binder, который нам нужен чтобы привязать агентов к этому диспетчеру. Этот disp_binder знает, что агенты, которых он привязывает к диспетчеру, будут совместно использовать очередь заявок, созданную в точке (1).

В точке (3) мы создаем двух агентов, каждый из которых будет привязан к one_thread-диспетчеру, созданному в точке (2).


В точке (4) определяются приоритеты для сообщений, которые обрабатываются агентами Alice и Bob. Можно увидеть, что одним и тем же сообщениям назначаются разные приоритеты. Соответственно, даже если сообщения отсылаются агентам единовременно, обрабатываться сообщения будут в разном порядке. В соответствии со своими приоритетами. А это как раз то, что нам и нужно.


Некоторые пояснения касательно деталей реализации so5_custom_queue_disps


Базовый класс demand_queue_t и его наследники


Итак, нам нужно иметь возможность привязать к диспетчеру агентов, использующих совершенно разные очереди заявок. Поэтому, с одной стороны, диспетчер должен уметь работать с непохожими друг на друга очередями. И, с другой стороны, пользователь должен иметь возможность без проблем подсунуть диспетчеру свою реализацию очереди.


Для этого предназначен интерфейс demand_queue_t.


Публичная часть demand_queue_t


Публичная часть demand_queue_t выглядит так:


      [[nodiscard]]      virtual bool      empty() const noexcept = 0;      [[nodiscard]]      virtual std::optional<so_5::execution_demand_t>      try_extract() noexcept = 0;      virtual void      push( so_5::execution_demand_t demand ) = 0;

Это как раз те методы, которые пользователь должен переопределить в своем классе очереди заявок для того, чтобы диспетчер с этой очередью заявок смог работать.


Надеюсь, что смысл методов empty() и push() очевиден, поэтому на них останавливаться не буду (если что, то отвечу на вопросы в комментариях). А вот по поводу возврата std::optional из try_extract() можно сказать пару слов.


Может показаться, что если диспетчер обращается к очереди заявок только когда очередь не пуста, то достаточно иметь только один метод extract(), который возвращает первую заявку из очереди (конструктуры/операторы копирования/перемещения для execution_demand_t не бросают исключений, поэтому можно обойтись одним extract() вместо пары front()+pop()).


Но в таком случае мы теряем некоторую долю гибкости. Скажем, возможность игнорировать заявки, которые ждали в очереди слишком долго. Тогда как возврат std::optional дает нам такую возможность. Поэтому диспетчер ожидает следующего поведения от очереди заявок:


  • если empty() возвращает false, то диспетчер может безопасно вызывать try_extract();
  • try_extract() может вернуть пустой std::optional даже если очередь была непустой. Это всего лишь означает, что актуальной заявки для обработки на самом деле не оказалось. Поэтому диспетчер просто идет дальше.

thread-safety для demand_queue_t


Диспетчер вызывает методы empty/try_extract/push только под своим собственным mutex-ом. Поэтому, если очередь заявок модифицирует свое состояние только в этих методах, то об обеспечении thread-safety можно не беспокоится, она обеспечивается диспетчером автоматически.


Однако, если внутри empty/try_extract/push требуется доступ к информации, которая каким-то образом может модифицироваться еще какими-то методами, то тогда ответственность за обеспечение thread-safety для этой дополнительной информации ложится на разработчика очереди заявок. Пример этого мы увидим ниже, когда будем говорить о классе dynamic_per_agent_priorities_t.


Специальные приоритеты для заявок evt_start и evt_finish


Если реализация очереди заявок выполняет переупорядочивание заявок согласно приоритетам или если реализация очереди заявок выбрасывает какие-то заявки из очереди, то следует учесть факт существования двух типов заявок: evt_start и evt_finish.


Заявка evt_start ставится в очередь заявок самой первой, прямо во время привязки агента к диспетчеру. Именно благодаря этой заявке у агента вызывается виртуальный метод so_evt_start.


Заявка evt_finish является самой последней заявкой для агента. Происходит это в процессе дерегистрации агента. И evt_finish должна быть самой последней заявкой, которая для агента будет выполнена. Никакие другие заявки после обработки evt_finish для агента обрабатываться не должны.


Так вот, поскольку это архиважные заявки для жизненного цикла агента, то выбрасывать их категорически нельзя. Это должно быть учтено при реализации очереди заявок.


Если заявки упорядочиваются в очереди согласно приоритетам, то у заявки evt_start должен быть наивысший приоритет. А у заявки evt_finish самый низкий.


В SObjectizer 5.5, 5.6 и 5.7 заявки evt_start и evt_finish можно различить не по execution_demand_t::m_msg_type, как для обычных заявок. А по execution_demand_t::m_demand_handler. Ниже будет показано, как именно это происходит. Возможно, в SObjectizer-5.8 будет применен единообразный подход и все будет идентифицироваться посредством execution_demand_t::m_msg_type. Но в ближайших планах ветки 5.8 нет от слова совсем :)


Написание собственного demand_queue_t


В so5_custom_queue_disps_demo есть три реализации собственных очередей заявок, начиная от самой тривиальной до более-менее сложной. В этой статье мы рассмотрим два крайних случая.


Простейший случай: simple_fifo_t


Простейшая реализация использует обычную FIFO очередь без всяких изысков. Поэтому ее реализация тривиальна и компактна:


class simple_fifo_t final : public custom_queue_disps::demand_queue_t   {      std::queue< so_5::execution_demand_t > m_queue;   public:      simple_fifo_t() = default;      [[nodiscard]]      bool      empty() const noexcept override { return m_queue.empty(); }      [[nodiscard]]      std::optional<so_5::execution_demand_t>      try_extract() noexcept override         {            std::optional<so_5::execution_demand_t> result{               std::move(m_queue.front())            };            m_queue.pop();            return result;         }      void      push( so_5::execution_demand_t demand ) override         {            m_queue.push( std::move(demand) );         }   };

Поскольку мы обеспечиваем строгий FIFO и ничего не выбрасываем, то нам здесь даже не нужно задумываться о существовании заявок evt_start/evt_finish.


Наиболее сложный случай: dynamic_per_agent_priorities_t


Наиболее сложная реализация использует динамически задаваемые приоритеты, которые назначаются агенту в зависимости от типа получаемого сообщения.


Для того, чтобы хранить описания заданных пользователем приоритетов потребуются следующие типы данных и члены класса dynamic_per_agent_priorities_t:


using type_to_prio_map_t = std::map< std::type_index, priority_t >;using agent_to_prio_map_t =            std::map< so_5::agent_t *, type_to_prio_map_t >;std::mutex m_prio_map_lock;agent_to_prio_map_t m_agent_prios;

Можно увидеть std::mutex который и будет задействован для обеспечения thread-safety при работе с m_agent_prios.


Здесь нам нужно использовать очередь с приоритетами. И, дабы воспользоваться std::priority_queue из стандартной библиотеки, мы будем хранить в очереди не so_5::execution_demand_t, а собственный класс:


struct actual_demand_t   {      so_5::execution_demand_t m_demand;      priority_t m_priority;      actual_demand_t(         so_5::execution_demand_t demand,         priority_t priority )         :  m_demand{ std::move(demand) }         ,  m_priority{ priority }         {}      [[nodiscard]]      bool      operator<( const actual_demand_t & o ) const noexcept         {            return m_priority < o.m_priority;         }   };

Далее, для реализации метода push(), в котором новая заявка должна встать в очередь согласно своему приоритету, нам потребуется определить тип заявки, кому она адресуется и какой из всего этого получается приоритет:


[[nodiscard]]priority_thandle_new_demand_priority( const so_5::execution_demand_t & d ) noexcept   {      if( so_5::agent_t::get_demand_handler_on_start_ptr()            == d.m_demand_handler )         return highest;      if( so_5::agent_t::get_demand_handler_on_finish_ptr()            == d.m_demand_handler )         {            std::lock_guard< std::mutex > lock{ m_prio_map_lock };            m_agent_prios.erase( d.m_receiver );            return lowest;         }      {         std::lock_guard< std::mutex > lock{ m_prio_map_lock };         auto it_agent = m_agent_prios.find( d.m_receiver );         if( it_agent != m_agent_prios.end() )            {               auto it_msg = it_agent->second.find( d.m_msg_type );               if( it_msg != it_agent->second.end() )                  return it_msg->second;            }      }      return normal;   }

Сперва мы проверяем, является ли заявка заявкой типа evt_start. Если да, то ей присваивается наивысший приоритет.


Далее такая же проверка делается для заявки типа evt_finish. Но если пришла заявка evt_finish, то кроме назначения ей наименьшего приоритета мы делаем дополнительное действие: удаляем информацию о приоритетах для этого агента. Т.к. эта информация больше не потребуется, других заявок уже не будет.


Также в коде метода handle_new_demand_priority() можно обратить внимание на захват mutex-а в тех местах, где нам требуется модифицировать информацию о приоритетах. Это необходимо делать, т.к. эта информация модифицируется/используется не только при работе push(), но и при работе метода define_priority() о котором диспетчер не знает.


Вот, собственно, и все особенности. Остальная часть тривиальна:


voiddefine_priority(   so_5::agent_t * receiver,   std::type_index msg_type,   priority_t priority )   {      std::lock_guard< std::mutex > lock{ m_prio_map_lock };      m_agent_prios[ receiver ][ msg_type ] = priority;   }[[nodiscard]]boolempty() const noexcept override { return m_queue.empty(); }[[nodiscard]]std::optional<so_5::execution_demand_t>try_extract() noexcept override   {      std::optional<so_5::execution_demand_t> result{         m_queue.top().m_demand      };      m_queue.pop();      return result;   }voidpush( so_5::execution_demand_t demand ) override   {      const auto prio = handle_new_demand_priority( demand );      m_queue.emplace( std::move(demand), prio );   }

Приватная часть demand_queue_t


Кроме описанной выше публичной части demand_queue_t есть еще и "приватная" часть, которая предназначена для использования только со стороны диспетчера:


class demand_queue_t   {      demand_queue_t * m_next{ nullptr };   public:      [[nodiscard]]      demand_queue_t *      next() const noexcept { return m_next; }      void      set_next( demand_queue_t * q ) noexcept { m_next = q; }      void      drop_next() noexcept { set_next( nullptr ); }

Эта часть нужна для того, чтобы диспетчер мог провязывать непустые очереди заявок в интрузивный односвязный список.


dispatcher_handle. Что это и зачем?


После того, как с очередями разобрались, можно перейти к самому диспетчеру. И первое, с чем мы сталкиваемся, так это с тем, что в SObjectizer 5.6 и 5.7 нет никакого явного интерфейса для диспетчера, как это было в SObjectizer 5.5 и более ранних версиях. Т.е. реализовать диспетчер можно как угодно и в виде чего угодно (в SO-5.5 же диспетчер должен был наследоваться от специального класса dispatcher_t).


В SO-5.6/5.7 пользователь взаимодействует с диспетчерами посредством двух сущностей: dispatcher_handle и disp_binder. При disp_binder мы поговорим ниже, а пока рассмотрим dispatcher_handle.


За создание экземпляра диспетчера в SO-5.6/5.7 обычно отвечает функция-фабрика make_dispatcher(). Эта функция должна что-то возвратить, но что, если для диспетчера в современном SObjectizer-е нет никакого C++ного интерфейса?


А вот некий дескриптор/хэндл и возвращается. Именно это и называется dispatcher_handle.


Dispatcher_handle может рассматриваться как shared_ptr для какого-то неизвестного пользователю типа. И работать dispatcher_handle должен именно как shared_ptr: пока есть хотя бы один непустой dispatcher_handle, ссылающийся на экземпляр диспетчера, этот экземпляр будет существовать и будет работать.


Обычно у dispacher_handle есть публичный метод binder() который создает экземпляр disp_binder-а для этого диспетчера. Возможно, у dispatcher_handle есть и другие методы, специфические для конкретного типа диспетчера. Но обычно это binder() и несколько методов, делающих dispatcher_handle похожим на shared_ptr.


В рассматриваемой реализации у dispatcher_handler минималистичный интерфейс:


namespace impl{class dispatcher_t;using dispatcher_shptr_t = std::shared_ptr< dispatcher_t >;class dispatcher_handle_maker_t;} /* namespace impl */class [[nodiscard]] dispatcher_handle_t   {      friend class impl::dispatcher_handle_maker_t;      impl::dispatcher_shptr_t m_disp;      dispatcher_handle_t( impl::dispatcher_shptr_t disp );      [[nodiscard]]      bool      empty() const noexcept;   public :      dispatcher_handle_t() noexcept = default;      [[nodiscard]]      so_5::disp_binder_shptr_t      binder( demand_queue_shptr_t demand_queue ) const;      [[nodiscard]]      operator bool() const noexcept { return !empty(); }      [[nodiscard]]      bool      operator!() const noexcept { return empty(); }      void      reset() noexcept;   };

Функция-фабрика make_dispatcher() для нашего one_thread-диспетчера будет возвращать экземпляр dispatcher_handler именно этого типа.


disp_binder. Что это и что нам нужно от disp_binder для one_thread-диспетчера?


Выше мы уже несколько раз упоминали привязку агентов к диспетчерам. Теперь же настало время поговорить об это в подробностях.


И вот тут в дело вступают специальные объекты под названием disp_binder-ы. Они служат как раз для того, чтобы привязать агента к диспетчеру при регистрации кооперации с агентом. А также для того, чтобы отвязать агента от диспетчера при дерегистрации кооперации.



В SObjectizer определен интерфейс, который должны поддерживать все disp_binder-ы. Конкретные же реализации disp_binder-ов зависят от конкретного типа диспетчера. И каждый диспетчер реализует свои собственные disp_binder-ы.


Начиная с версии 5.6 интерфейс этот выглядит следующим образом:


class disp_binder_t   : private std::enable_shared_from_this< disp_binder_t >{   public:      disp_binder_t() = default;      virtual ~disp_binder_t() noexcept = default;      virtual void      preallocate_resources( agent_t & agent ) = 0;      virtual void      undo_preallocation( agent_t & agent ) noexcept = 0;      virtual void      bind( agent_t & agent ) noexcept = 0;      virtual void      unbind( agent_t & agent ) noexcept = 0;};

Три первых метода, preallocate_resources(), undo_preallocation() и bind() используются при привязке агента к диспетчеру.


Их три поскольку регистрация агента это достаточно сложный процесс, который SObjectizer пытается провести в транзакционной манере. Т.е. либо все агенты кооперации успешно регистрируются, либо не регистрируется не один из них. Для обеспечения этой транзакционности процесс регистрации разбит на несколько стадий.


На первой стадии SObjectizer пытается выделить все ресурсы, необходимые для агентов из новой кооперации. Например, какие-то диспетчеры должны создать для новых агентов новые рабочие нити. Как раз на этой стадии у disp_binder-а вызывается preallocate_resources(). В этом методе disp_binder должен создать все, что агенту потребуется для работы внутри SObjectizer (например, новая рабочая нить, очередь заявок и т.д.). Если все это создалось нормально, то disp_binder должен сохранить это у себя до тех пор, пока у него не вызовут метод bind() для этого же агента.


На стадии резервирования ресурсов могут возникнуть проблемы. Например, не запустится новая рабочая нить. Или не хватит памяти для создания еще одной очереди заявок.


При возникновении подобных проблем все, что было сделано до этого момента, нужно откатить. Для чего и предназначен метод undo_preallocation(). Если у disp_binder-а вызывается метод undo_preallocation(), то disp_binder должен освободить все ресурсы для агента, которые ранее были зарезервированы в preallocate_resources().


Если же стадия резервирования ресурсов завершилась успешно, то выполняется стадия собственно привязки агентов к диспетчерам. И вот здесь уже у disp_binder-а вызывается метод bind(). В этом методе disp_binder обязательно должен вызывать у привязываемого агента метод so_bind_to_dispatcher().


Метод bind() не случайно помечен как noexcept, т.к. на этой стадии исключений SObjectizer не ожидает (а если таковое возникнет, то восстановиться уже не получится).


Метод unbind(), очевидно, используется уже когда кооперация дерегистрируется и агент завершил все свои активности на рабочем контексте (включая и обработку evt_finish). Так что в unbind() disp_binder должен освободить все ресурсы, которые были выделены для агента в preallocate_resources().


Возможно звучит все это сложновато. Но в данном случае реализация disp_binder-а оказывается очень простой:


class actual_disp_binder_t final : public so_5::disp_binder_t   {      actual_event_queue_t m_event_queue;   public:      actual_disp_binder_t(         demand_queue_shptr_t demand_queue,         dispatcher_data_shptr_t disp_data ) noexcept         :  m_event_queue{ std::move(demand_queue), std::move(disp_data) }         {}      void      preallocate_resources(         so_5::agent_t & /*agent*/ ) override         {}      void      undo_preallocation(         so_5::agent_t & /*agent*/ ) noexcept override         {}      void      bind(         so_5::agent_t & agent ) noexcept override         {            agent.so_bind_to_dispatcher( m_event_queue );         }      void      unbind(         so_5::agent_t & /*agent*/ ) noexcept override         {}   };

Все, что данный disp_binder должен сделать это вызывать so_bind_to_dispatcher(). Никакого резервирования ресурсов выполнять не нужно. Т.к. единственный ресурс это экземпляр actual_event_queue_t, который автоматически создается вместе с disp_binder-ом.


one_thread-диспетчер


Рассматриваемый нами one_thread-диспетчер состоит из трех частей.


Во-первых, это структура dispatcher_data_t, которая хранит необходимые диспетчеру данные:


struct dispatcher_data_t   {      std::mutex m_lock;      std::condition_variable m_wakeup_cv;      bool m_shutdown{ false };      demand_queue_t * m_head{ nullptr };      demand_queue_t * m_tail{ nullptr };   };using dispatcher_data_shptr_t =      std::shared_ptr< dispatcher_data_t >;

Эти данные выделены в отдельную структуру для того, чтобы к ним можно было напрямую обращаться из event_queue, которая будет отвечать за сохранение заявок агентов в очереди заявок и за добавление ссылки на непустую очередь заявок в список диспетчерах.


Так что вторая часть диспетчера это реализация интерфейса event_queue_t для one_thread-диспетчера:


class actual_event_queue_t final : public so_5::event_queue_t   {      demand_queue_shptr_t m_demand_queue;      dispatcher_data_shptr_t m_disp_data;   public:      actual_event_queue_t(         demand_queue_shptr_t demand_queue,         dispatcher_data_shptr_t disp_data ) noexcept         :  m_demand_queue{ std::move(demand_queue) }         ,  m_disp_data{ std::move(disp_data) }         {}      void      push( so_5::execution_demand_t demand ) override         {            std::lock_guard< std::mutex > lock{ m_disp_data->m_lock };            auto & q = *m_demand_queue;            const bool queue_was_empty = q.empty();            q.push( std::move(demand) );            if( queue_was_empty )               {                  // В этом блоке кода исключений быть не должно.                  [&]() noexcept {                     const bool disp_was_sleeping =                           (nullptr == m_disp_data->m_head);                     if( disp_was_sleeping )                        {                           m_disp_data->m_head = m_disp_data->m_tail = &q;                           m_disp_data->m_wakeup_cv.notify_one();                        }                     else                        {                           m_disp_data->m_tail->set_next( &q );                           m_disp_data->m_tail = &q;                        }                  }();               }         }   };

Именно экземпляр такой event_queue и хранится внутри описанного выше disp_binder-а.


Тут нужно отметить, что в actual_event_queue_t хранится два умных указателя. Один на demand_queue, второй на экземпляр dispatcher_data_t. Тем самым контролируется время жизни этих сущностей. Т.е. и demand_queue, и dispatcher_data_t живут до тех пор, пока есть хотя бы один actual_event_queue_t. А поскольку actual_event_queue_t является частью disp_binder-а, то время жизни demand_queue и dispatcher_data_t определяется временем жизни disp_binder-ов. Когда все disp_binder-у исчезнут, пропадет и надобность в demand_queue и dispatcher_data_t.


Третья часть диспетчера это собственно dispatcher_t, который запускает единственную рабочую нить и использует ее для обработки заявок. Полный код класса dispatcher_t можно увидеть в репозитории. Здесь же мы посмотрим лишь на два фрагмента.


Первый фрагмент это основная функция рабочей нити и обслуживание заявок из непустых demand_queue:


class dispatcher_t final   :  public std::enable_shared_from_this< dispatcher_t >   {      dispatcher_data_t m_disp_data;      std::thread m_worker_thread;      void      thread_body() noexcept         {            const auto thread_id = so_5::query_current_thread_id();            bool shutdown_initiated{ false };            while( !shutdown_initiated )               {                  std::unique_lock< std::mutex > lock{ m_disp_data.m_lock };                  shutdown_initiated = try_extract_and_execute_one_demand(                        thread_id,                        std::move(lock) );               }         }      [[nodiscard]]      bool      try_extract_and_execute_one_demand(         so_5::current_thread_id_t thread_id,         std::unique_lock< std::mutex > unique_lock ) noexcept         {            do               {                  auto [demand, has_non_empty_queues] =                        try_extract_demand_to_execute();                  if( demand )                     {                        unique_lock.unlock();                        demand->call_handler( thread_id );                        break;                     }                  else if( !has_non_empty_queues )                     {                        m_disp_data.m_wakeup_cv.wait( unique_lock );                     }               }            while( !m_disp_data.m_shutdown );            return m_disp_data.m_shutdown;         }      [[nodiscard]]      std::tuple< std::optional< so_5::execution_demand_t >, bool >      try_extract_demand_to_execute() noexcept         {            std::optional< so_5::execution_demand_t > result;            bool has_non_empty_queues{ false };            if( !m_disp_data.m_head )               return { result, has_non_empty_queues };            auto * dq = m_disp_data.m_head;            m_disp_data.m_head = dq->next();            dq->drop_next();            if( !m_disp_data.m_head )               m_disp_data.m_tail = nullptr;            else               has_non_empty_queues = true;            result = dq->try_extract();            if( !dq->empty() )               {                  if( m_disp_data.m_tail )                     m_disp_data.m_tail->set_next( dq );                  else                     m_disp_data.m_head = m_disp_data.m_tail = dq;               }            return { result, has_non_empty_queues };         }

Второй фрагмент это реализация метода make_disp_binder:


[[nodiscard]]so_5::disp_binder_shptr_tmake_disp_binder(   demand_queue_shptr_t demand_queue )   {      return std::make_shared< actual_disp_binder_t >(            std::move(demand_queue),            dispatcher_data_shptr_t{ shared_from_this(), &m_disp_data } );   }

Важный момент, который стоит здесь пояснить это факт того, что экземпляр dispatcher_data_t хранится в dispatcher_t по значению. Но в методе make_disp_binder в конструктор actual_disp_binder_t передается shared_ptr<dispatcher_data_t>. Тут всего лишь используется трюк c aliasing constructor для std::shared_ptr: хранить shared_ptr будет указатель на объект T, но вот счетчик ссылок будет использоваться от объекта Y. В нашем случае счетчик ссылок от dispatcher_t.


Вот, собственно, и все. Если кто-то хочет взглянуть на реализацию dispatcher_handler и make_dispatcher, то сделать это можно здесь.


Заключение


Данная статья преследует две цели:


Во-первых, хочется показать, что хоть в самом SObjectizer-е приоритетной доставки сообщений нет, но если это кому-то нужно, то это можно сделать самостоятельно. Например, показанным выше способом.


Во-вторых, хочется понять, насколько вообще может быть востребованна подобная функциональность. Если у тех, кто пробовал SObjectizer или же присматривался к SObjectizer-у, время от времени надобность в приоритетной доставке сообщений возникает, то ее можно добавить в so5extra. Или даже в сам SObjectizer. Мы вполне можем это сделать. Но только если такая функциональность действительно востребована.


Так что если у кого-то из читателей есть мнение на этот счет (скажем, есть желание иметь подобный диспетчер в SObjectizer-е прямо "искаропки"), то можно высказаться в комментариях. Обязательно прислушаемся.


Кстати, SObjectizer-5 уже 10 лет


Разработка SObjectizer-5 началась осенью 2010-го года и продолжается до сих пор. Радует и удивляет. А если кому-то интересно что лично я думаю по этому поводу, то можно заглянуть сюда.


Но еще больше меня поражает то, что кто-то берет и использует наш SObjectizer для реализации серьезных проектов несмотря на то, что его делают никому неизвестные люди из белорусской глубинки и за спиной SObjectizer-а нет больших компаний с громкими именами и толстыми кошельками. Вот это удивляет по-настоящему. А еще вселяет в нас уверенность в том, что все это не зря. Так что большое спасибо всем, кто рискнул и выбрал SObjectizer. Если бы не вы, этого маленького юбилея не было бы.

Подробнее..

Обзор последних изменений в rotorе (v0.10 v0.14)

20.02.2021 08:20:29 | Автор: admin

actor system


rotor ненавязчивый С++ акторный микрофремворк с возможностью создания иерархий супервайзеров, похожий на своих старших братьев caf и sobjectizer. В последних релизах с момента последнего анонса накопились существенные улучшения, которые хотелось бы осветить.


Общий интерфейс для таймеров (v0.10)


Таймеры сами по себе вездесущи во всех акторных фрейморках, т. к. они делают программы более надёжными. До v0.10 не было API для того, чтобы взвести таймер; это можно было сделать только посредством доступа к низлежащему движку событий (event loop) и использованию соответствующего API, разного для разных движков. Это было не очень удобно и ломало абстракции: кроме доступа к API движка, нужно было в обработчике таймера использовать низкоуровневый API rotor'а, чтобы работала доставка сообщений. Кроме того, отмена таймера также имеет свои особенности в каждом цикле событий, что захламляло ненужными деталями логику работы актора.


Начиная с v0.10 в rotor'е, можно делать что-то вроде:


namespace r = rotor;struct some_actor_t: r::actor_base_t {    void on_start() noexcept {        timer_request = start_timer(timeout, *this, &some_actor_t::on_timer);    }    void on_timer(r::request_id_t, bool cancelled) noexcept {        ...;    }    void some_method() noexcept {        ...        cancel_timer(timer_id);    }    r::request_id_t timer_id;};

Надо отметить, что к моменту завершения работы актора (shutdown_finish), все взведённые таймеры должны сработать или быть отменены, иначе это ведёт к неопределённому поведению (undefined behavior).


Поддержка отмены запросов (v0.10)


По-моему мнению, у всех сообщений в caf семантика "запрос-ответ", в то время как в sobjectizer'е все сообщения имеют обычную "отправил-и-забыл" ("fire-and-forget") семантику. rotor поддерживает оба типа, причём по-умолчанию сообщения являются "отправил-и-забыл", а "запрос-ответ" можно сделать поверх обычных сообщений.


В обоих фреймворках, caf и sobjectizer, у каждого актора есть управляемая очередь сообщений, что обозначает, что фреймворк не доставляет новое сообщение, пока предыдущее не было обработано. В противоположность этим фреймворкам, в rotor'е нет управляемой очереди сообщений, что обозначает, что актор сам должен создать свою собственную очередь и заботиться о перегрузках при необходимости. Для мгновенно обрабатываемых сообщений типа "пинг-понг" это не имеет особого значений, однако для "тяжёлых" запросов, которые в процессе своей обработки делают ввод-вывод (I/O), разница может быть существенна. Например, если актор опрашивает удалённую сторону через HTTP-запросы, нежелательно начинать новый запрос, пока предыдущий ещё не закончился. Ещё раз: сообщение не доставлено, пока предыдущее не обработано, и не важно, что за типа сообщения.


Это также обозначает, что для управляемых очередей сообщений отмена запросов в общем случае невозможна, т.к. сообщение-отмена находится в очереди, и у него нет шансов быть обработанным до тех пор, пока предыдущее сообщение-запрос не будет обработано.


В rotor'е, при необходимости, надо создавать собственную очередь запросов, и, если сообщение-отмена поступает, то актор должен либо поискать в очереди и отменить его, либо, если сообщение уже обрабатывается, отменить соответствующие операции ввода-вывода, и, в конечном итоге, ответить на исходный запрос со статусом "отменено". Нужно заметить, что только сообщения-запросы могут быт отменены, т. к. у них есть внутренний идентификатор.


namespace r = rotor;namespace payload {struct pong_t {};struct ping_t {    using response_t = pong_t;};} // namespace payloadnamespace message {using ping_request_t = r::request_traits_t<payload::ping_t>::request::message_t;using ping_response_t = r::request_traits_t<payload::ping_t>::response::message_t;using ping_cancel_t = r::request_traits_t<payload::ping_t>::cancel::message_t;} // namespace messagestruct some_actor_t: r::actor_base_t {    using ping_ptr_t = r::intrusive_ptr_t<message::ping_request_t>;    void on_ping(ping_request_t& req) noexcept {        // just store request for further processing        ping_req.reset(&req);    }    void on_cancel(ping_cancel_t&) noexcept {        if (req) {            // make_error is v0.14 feature            make_response(*req, make_error(r::make_error_code(r::error_code_t::cancelled)));            req.reset();        }    }    // простейшая "очередь" из одного сообщения.    ping_ptr_t ping_req;};

Вообще говоря, отмена запросов может быть сделана и в sobjectizer'е, однако, там, во-первых, нужно сделать собственный механизм "запрос-ответ", и, во-вторых, свою собственную очередь поверх той, что уже предоставляется sobjectizer'ом, т. е. ценой дополнительных накладных расходов. Впрочем, по-моему мнению, парадигма "запрос-ответ" несколько инородна для sobjectizer'а, поскольку он скорее ориентирован на блокирующие операции в обработчиках сообщений, которые, конечно, не могут быть отменены, вне зависимости от используемого фреймворка.


std::thread backend/supervisor (v0.12)


Это давно ожидаемая возможность, которая делает rotor похожим на sobjectizer: в случае, когда актору нужно осуществить блокирующие операции и ему не нужен никакой цикл событий. Например, обработчик сообщений должен выполнить интенсивные вычисления на центральном процессоре.


Очевидно, что во время блокирующих операций нет возможности сработать таймерам или другим сообщениям быть доставленными. Другими словами, во время блокирующих операций актор теряет свою реактивность, так как он не может реагировать на входящие сообщения. Чтобы преодолеть это, блокирующие операции должны быть разбиты на более мелкие итеративные куски; а когда актор закончит обрабатывать текущий кусок работы, он посылает себе сообщение с номером следующего куска для обработки, и так далее, пока все части не будут обработаны. Это даст текущему потоку выполнения немного воздуха, чтобы доставить другие сообщения, выполнить код, связанный с истекшими таймерами и т.п. Например, вмето того, чтобы вычислять свёртку sha512 для всего файла размером 1TB, задание может быть разделено на вычисление свёрток помегабайтово, что сделает поток вычисления достаточно реактивным. Данный подход универсален и применим для любого актороного фреймворка.


Само собой разумеется, что целая иерархия акторов может быть запущена на std::thread бэкенде, не только один актор. Следующий нюанс, на который следует обратить внимание, это то, что rotor'у надо подсказать, какие обработчики сообщений "тяжёлые" (блокирующие), чтобы обновить таймеры после этих обработчиков. Это делается во время подписки, т.е.:


struct sha_actor_t : public r::actor_base_t {    ...    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        r::actor_base_t::configure(plugin);        plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {            p.subscribe_actor(&sha_actor_t::on_process)->tag_io(); // важно        });    }

Полный исходный код реактивного актора, который вычисляет свёртку sha512, который реагирует на CTRL+c, доступен по ссылке.


Идентификация Акторов (v0.14)


Канонический способ идентификации актора по основной адресу (в rotor'е у актора может быть несколько адресов, аналогично как в sobjectizer'е агент может быть подписан на несколько mbox'ов). Однако, иногда желательно вывести в лог имя актора, который завершил работу, в соответствующем колбеке в его супервайзере:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << "actor " << (void*) actor->get_address().get() << " died \n";    }}

Адрес актора динамичен и меняется при каждом запуске программы, так что эта информация почти бесполезна. Чтобы она имела смысл, актор должен напечатать свой основной адрес, где, например, в перекрытом методе on_start(). Однако, это решение не очень удобно, поэтому было решено ввести свойство std::string identity непосредственно в базовый класс actor_base_t. Таким образом, идентичность актора может быть выставлена в конструкторе или во время конфигурации плагина address_maker_plugin_t:


struct some_actor_t : public t::actor_baset_t {    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {            p.set_identity("my-actor-name", false);        });        ...    }};

Теперь можно выводить идентичность актора в его супервайзере:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << actor->get_identity() << " died \n";    }}

Иногда акторы уникальны в одной программе, а иногда сосуществуют несколько экземпляров одного и того же класса акторов. Чтобы различать между их, адрес каждого может быть добавлен к имени актора. В этом и смысл второго параметра bool в методе set_identity(name, append_addr) выше.


По умолчанию идентичность актора равна чему-то вроде actor 0x7fd918016d70 или supervisor 0x7fd218016d70. Эта возможность появилась в rotor'е начиная с версии v0.14.


Extended Error вместо std::error_code, shutdown reason (v0.14)


Когда случается что-то непредвиденное в процессе обработки запроса, до версии v0.14, ответ содержал код ошибки в виде std::error_code. Такой подход хорошо служил своей цели, но, ввиду, иерархичной природы rotor'а, этого оказалось не достаточно. Представим себе случай: супервыйзер запускает двух акторов, и у одного из них произошёл сбой в инициализации. Супервайзер экскалирует проблему, т. е. выключается сам и шлёт запрос на выключение второму актору. Однако, на момент, когда второй актор выключился, исходный контекст уже был потерян, и совершенно неясно, почему он выключился. А причина кроется в том, что std::error_code не содержит в себе достаточно информации.


Поэтому было решено ввести собственный класс расширенной ошибки rotor::extended_error_t, который содержит std::error_code, std::string в качестве контекста (обычно это идентичность актора), а также умный указатель на следующую расширенную ошибку, что вызвала текущую. Теперь схлопывание иерархии акторов может быть представлено цепочкой расширенных ошибок, где причина выключения каждого актора может быть отслежена:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << actor->get_identity() << " died, reason :: " << actor->get_shutdown_reason()->message();    }}

выведет что-то вроде:


actor-2 due to supervisor shutdown has been requested by supervisor <- actor-1 due initialization failed

Вместе с идентификацией акторов данная возможность предоставляет больше диагностических инструментов для разработчиков, которые используют rotor.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru