Русский
Русский
English
Статистика
Реклама

Concurrency

Concurrent Mode в React адаптируем веб-приложения под устройства и скорость интернета

06.08.2020 10:11:52 | Автор: admin
В этой статье я расскажу о конкурентном режиме в React. Разберёмся, что это: какие есть особенности, какие новые инструменты появились и как с их помощью оптимизировать работу веб-приложений, чтобы у пользователей всё летало. Конкурентный режим новая фишка в React. Его задача адаптировать приложение к разным устройствам и скорости сети. Пока что Concurrent Mode эксперимент, который может быть изменён разработчиками библиотеки, а значит, новых инструментов нет в стейбле. Я вас предупредил, а теперь поехали.

Сейчас для отрисовки компонентов есть два ограничения: мощность процессора и скорость передачи данных по сети. Когда требуется что-то показать пользователю, текущая версия React пытается отрисовать каждый компонент от начала и до конца. Неважно, что интерфейс может зависнуть на несколько секунд. Такая же история с передачей данных. React будет ждать абсолютно все необходимые компоненту данные, вместо того чтобы рисовать его по частям.



Конкурентный режим решает перечисленные проблемы. С ним React может приостанавливать, приоритизировать и даже отменять операции, которые раньше были блокирующими, поэтому в конкурентном режиме можно начинать отрисовывать компоненты независимо от того, были ли получены все данные или только часть.

Concurrent Mode это Fiber-архитектура


Конкурентный режим это не новая штука, которую разработчики внезапно решили добавить, и всё тут же заработало. К его выходу готовились заранее. В 16-й версии движок React переключили на Fiber-архитектуру, которая по принципу работы напоминает планировщик задач в операционной системе. Планировщик распределяет вычислительные ресурсы между процессами. Он способен переключаться в любой момент времени, поэтому у пользователя возникает иллюзия, что процессы работают параллельно.

Fiber-архитектура делает то же самое, но с компонентами. Несмотря на то, что она уже есть в React, Fiber-архитектура будто находится в анабиозе и не использует свои возможности по максимуму. Конкурентный режим включит её на полную мощность.

При обновлении компонента в обычном режиме приходится целиком рисовать новый кадр на экране. Пока обновление не завершится, пользователь ничего не увидит. В этом случае React работает синхронно. Fiber использует другую концепцию. Каждые 16 мс происходит прерывание и проверка: изменилось ли виртуальное дерево, появились ли новые данные? Если да, пользователь увидит их сразу.

Почему 16 мс? Разработчики React стремятся, чтобы перерисовка экрана происходила на скорости, близкой к 60 кадрам в секунду. Чтобы уложить 60 обновлений в 1000 мс, нужно осуществлять их примерно каждые 16 мс. Отсюда и цифра. Конкурентный режим включается из коробки и добавляет новые инструменты, которые делают жизнь фронтендера лучше. Расскажу о каждом подробно.

Suspense


Suspense появился в React 16.6 в качестве механизма динамической загрузки компонентов. В конкурентном режиме эта логика сохраняется, но появляются дополнительные возможности. Suspense превращается в механизм, который работает в связке с библиотекой загрузки данных. Мы запрашиваем специальный ресурс через библиотеку и читаем из него данные.

Suspense в конкурентном режиме читает данные, которые ещё не готовы. Как? Запрашиваем данные, а пока они не пришли целиком, уже начинаем читать их по небольшим кусочкам. Самая крутая фишка для разработчиков это управление порядком отображения загруженных данных. Suspense позволяет отображать компоненты страницы как одновременно, так и независимо друг от друга. Он делает код понятным: достаточно взглянуть на структуру Suspense, чтобы узнать, в каком порядке запрашиваются данные.

Типичное решение для загрузки страниц в старом React Fetch-On-Render. В этом случае мы запрашиваем данные после render внутри useEffect или componentDidMount. Это стандартная логика в случае, когда нет Redux или другого слоя, работающего с данными. Например, мы хотим нарисовать 2 компонента, каждому из которых нужны данные:

  • Запрос компонента 1
  • Ожидание
  • Получение данных отрисовка компонента 1
  • Запрос компонента 2
  • Ожидание
  • Получение данных отрисовка компонента 2

В таком подходе запрос следующего компонента происходит только после отрисовки первого. Это долго и неудобно.

Рассмотрим другой способ, Fetch-Then-Render: сначала запрашиваем все данные, потом отрисовываем страничку.

  • Запрос компонента 1
  • Запрос компонента 2
  • Ожидание
  • Получение компонента 1
  • Получение компонента 2
  • Отрисовка компонентов

В этом случае мы выносим состояние запроса куда-то наверх делегируем библиотеке для работы с данными. Способ работает здорово, но есть нюанс. Если один из компонентов грузится гораздо дольше, чем другой, пользователь ничего не увидит, хотя мы могли бы ему уже что-то показать. Рассмотрим пример кода из демки с 2 компонентами: User и Posts. Оборачиваем компоненты в Suspense:

const resource = fetchData() // где-то выше в дереве Reactfunction Page({ resource }) {    return (        <Suspense fallback={<h1>Loading user...</h1>}>            <User resource={resource} />            <Suspense fallback={<h1>Loading posts...</h1>}>                <Posts resource={resource} />            </Suspense>        </Suspense>    )}

Может показаться, что такой подход близок к Fetch-On-Render, когда мы запрашивали данные после отрисовки первого компонента. Но на самом деле с использованием Suspense данные придут гораздо быстрее. Всё благодаря тому, что оба запроса отправляются параллельно.

В Suspense можно указать fallback, компонент, который хотим отобразить, и внутрь компонента передать ресурс, реализуемый библиотекой получения данных. Мы используем его as is. Внутри компонентов запрашиваем данные с ресурса и вызываем метод чтения. Это promise, который делает за нас библиотека. Suspense поймёт, загрузились ли данные, и если да покажет их.

Обратите внимание, что компоненты пробуют читать данные, которые ещё находятся в процессе получения:

function User() {    const user = resource.user.read()    return <h1>{user.name}</h1>}function Posts() {    const posts = resource.posts.read()    return // список постов}

В текущих демках Дэна Абрамова в качестве заглушки для ресурса используется такая штука.

read() {    if (status === 'pending') {        throw suspender    } else if (status === 'error') {        throw result    } else if (status === 'success') {        return result    }}


Если ресурс ещё загружается, мы выбрасываем объект Promise в качестве исключения. Suspense ловит это исключение, понимает, что это Promise, и продолжает грузиться. Если же вместо Promise прилетит исключение с любым другим объектом, станет понятно, что запрос завершился ошибкой. Когда же вернётся готовый результат, Suspense его отобразит. Нам важно получить ресурс и вызвать у него метод. Как это реализовано внутри решение разработчиков библиотеки, главное, чтобы их реализацию понимал Suspense.

Когда запрашивать данные? Запрашивать наверху дерева не лучшая идея, потому что они могут никогда не потребоваться. Более подходящий вариант делать это сразу при переходе внутри обработчиков событий. Например, получить начальное состояние через хук, а дальше делать запрос за ресурсами, как только пользователь нажмёт на кнопку.

Вот как это будет выглядеть в коде:

function App() {    const [resource, setResource] = useState(initialResource)    return (        <>            <Button text='Далее' onClick={() => {                setResource(fetchData())            }}>            <Page resource={resource} />        </>    );}

Suspense невероятно гибкая штука. С его помощью можно отображать компоненты друг за другом.

return (    <Suspense fallback={<h1>Loading user...</h1>}>        <User />        <Suspense fallback={<h1>Loading posts...</h1>}>            <Posts />        </Suspense>    </Suspense>)

Или одновременно, тогда оба компонента нужно обернуть в один Suspense.

return (    <Suspense fallback={<h1>Loading user and posts...</h1>}>        <User />        <Posts />    </Suspense>)

Или же грузить компоненты отдельно друг от друга, обернув их в независимые Suspense. Ресурс будет загружаться через библиотеку. Это очень здорово и удобно.

return (    <>        <Suspense fallback={<h1>Loading user...</h1>}>            <User />        </Suspense>        <Suspense fallback={<h1>Loading posts...</h1>}>            <Posts />        </Suspense>    </>)

Кроме того, компоненты Error Boundary поймают ошибки внутри Suspense. Если что-то пошло не так, мы сможем показать, что пользователь загрузился, а посты нет, и выдать ошибку.

return (    <Suspense fallback={<h1>Loading user...</h1>}>        <User resource={resource} />        <ErrorBoundary fallback={<h2>Could not fetch posts</h2>}>            <Suspense fallback={<h1>Loading posts...</h1>}>                <Posts resource={resource} />            </Suspense>        </ErrorBoundary>    </Suspense>)

А теперь давайте взглянем на другие инструменты, благодаря которым преимущества конкурентного режима раскрываются полностью.

SuspenseList


SuspenseList в конкурентном режиме помогает управлять порядком загрузки Suspense. Если бы нам потребовалось загрузить несколько Suspense строго друг за другом без него, их бы пришлось вложить друг в друга:

return (    <Suspense fallback={<h1>Loading user...</h1>}>        <User />        <Suspense fallback={<h1>Loading posts...</h1>}>            <Posts />            <Suspense fallback={<h1>Loading facts...</h1>}>                <Facts />            </Suspense>        </Suspense>    </Suspense>)

SuspenseList позволяет сделать это гораздо проще:

return (    <SuspenseList revealOrder="forwards" tail="collapsed">        <Suspense fallback={<h1>Loading posts...</h1>}>            <Posts />        </Suspense>        <Suspense fallback={<h1>Loading facts...</h1>}>            <Facts />        </Suspense>    </Suspense>)

Гибкость SuspenseList поражает. Можно как угодно вкладывать SuspenseList друг в друга и настраивать порядок загрузки внутри так, как будет удобно для отображения виджетов и любых других компонентов.

useTransition


Специальный хук, который откладывает обновление компонента до полной готовности и убирает промежуточное состояние загрузки. Для чего это нужно? React при изменении состояния стремится сделать переход как можно быстрее. Но иногда важно не торопиться. Если на действие пользователя подгружается часть данных, то обычно в момент загрузки мы показываем лоадер или скелетон. Если данные придут очень быстро, то лоадер не успеет совершить даже пол-оборота. Он моргнёт, затем исчезнет, и мы нарисуем обновлённый компонент. В таких случаях разумнее вообще не показывать лоадер.

Здесь на помощь придёт useTransition. Как он работает в коде? Вызываем хук useTransition и указываем тайм-аут в миллисекундах. Если данные не придут за указанное время, то мы всё равно покажем лоадер. Но если получим их быстрее, произойдёт моментальный переход.

function App() {    const [resource, setResource] = useState(initialResource)    const [startTransition, isPending] = useTransition({ timeoutMs: 2000 })    return <>        <Button text='Далее' disabled={isPending} onClick={() => {            startTransition(() => {                setResource(fetchData())            })        }}>        <Page resource={resource} />    </>}

Иногда при переходе на страницу мы не хотим показывать лоадер, но всё равно нужно что-то изменить в интерфейсе. Например, на время перехода заблокировать кнопку. Тогда придётся кстати свойство isPending оно сообщит, что мы находимся в стадии перехода. Для пользователя обновление будет моментальным, но здесь важно отметить, что магия useTransition действует только на компоненты, обёрнутые в Suspense. Сам по себе useTransition не заработает.

Переходы часто встречаются в интерфейсах. Логику, отвечающую за переход, было бы здорово зашить в кнопку и интегрировать в библиотеку. Если есть компонент, ответственный переходы между страницами, можно обернуть в handleClick тот onClick, который передаётся через пропсы в кнопку, и показать состояние isDisabled.

function Button({ text, onClick }) {    const [startTransition, isPending] = useTransition({ timeoutMs: 2000 })    function handleClick() {        startTransition(() => {            onClick()        })    }    return <button onClick={handleClick} disabled={isPending}>text</button>}

useDeferredValue


Итак, есть компонент, с помощью которого мы делаем переходы. Иногда возникает такая ситуация: пользователь хочет уйти на другую страницу, часть данных мы получили и готовы показать. При этом страницы между собой отличаются незначительно. В этом случае было бы логично показывать пользователю устаревшие данные, пока не подгрузится всё остальное.

Сейчас React так не умеет: в текущей версии на экране пользователя могут отображаться только данные из актуального состояния. Но useDeferredValue в конкурентном режиме умеет возвращать отложенную версию значения, показывать устаревшие данные вместо мигающего лоадера или fallback во время загрузки. Этот хук принимает значение, для которого мы хотим получить отложенную версию, и задержку в миллисекундах.

Интерфейс становится супер текучим. Производить обновления можно с минимальным количеством данных, а всё остальное грузится постепенно. У пользователя создаётся впечатление, что приложение работает быстро и плавно. В действии useDeferredValue выглядит так:

function Page({ resource }) {    const deferredResource = useDeferredValue(resource, { timeoutMs: 1000 })    const isDeferred = resource !== deferredResource;    return (        <Suspense fallback={<h1>Loading user...</h1>}>            <User resource={resource} />            <Suspense fallback={<h1>Loading posts...</h1>}>                <Posts resource={deferredResource} isDeferred={isDeferred}/>            </Suspense>        </Suspense>    )}

Можно сравнить значение из пропсов с тем, что получено через useDeferredValue. Если они отличаются, значит страница ещё находится в состоянии загрузки.

Интересно, что useDeferredValue позволит повторить фокус с отложенной загрузкой не только для данных, которые передаются по сети, но и для того, чтобы убрать фриз интерфейса из-за больших вычислений.

Почему это здорово? Разные устройства работают по-разному. Если запустить приложение, использующее useDeferredValue, на новом iPhone, переход со страницы на страницу будет моментальным, даже если страницы тяжёлые. Но при использовании debounced задержка появится даже на мощном устройстве. UseDeferredValue и конкурентный режим адаптируются к железу: если оно работает медленно, инпут всё равно будет летать, а сама страничка обновляться так, как позволяет устройство.

Как переключить проект в Concurrent Mode?


Конкурентный режим это именно режим, поэтому его потребуется включить. Как тумблер, который заставляет Fiber работать на полную мощность. С чего же начать?

Убираем легаси. Избавляемся от всех устаревших методов в коде и убеждаемся, что их нет в библиотеках. Если приложение без проблем работает в React.StrictMode, то всё в порядке переезд будет простым. Потенциальная сложность проблемы внутри библиотек. В этом случае нужно либо перейти на новую версию, либо сменить библиотеку. Или же отказаться от конкурентного режима. После избавления от легаси останется только переключить root.

С приходом Concurrent Mode будет доступно три режима подключения root:

  • Старый режим
    ReactDOM.render(<App />, rootNode)
    Рендер после выхода конкурентного режима устареет.
  • Блокирующий режим
    ReactDOM.createBlockingRoot(rootNode).render(<App />)
    В качестве промежуточного этапа будет добавлен блокирующий режим, который даёт доступ к части возможностей конкурентного режима на проектах, где есть легаси или другие трудности с переездом.
  • Конкурентный режим
    ReactDOM.createRoot(rootNode).render(<App />)
    Если всё хорошо, нет легаси, и проект можно сразу переключить, замените в проекте рендер на createRoot и вперёд в светлое будущее.

Выводы


Блокирующие операции внутри React превращаются в асинхронные за счёт переключения на Fiber. Появляются новые инструменты, с которыми легко адаптировать приложение и к возможностям устройства, и к скорости сети:

  • Suspense, благодаря которому можно указывать порядок загрузки данных.
  • SuspenseList, с которым это ещё удобнее.
  • useTransition для создания плавных переходов между компонентами, обёрнутыми в Suspense.
  • useDeferredValue чтобы показывать устаревшие данные во время операций ввода-вывода и обновления компонентов

Попробуйте поэкспериментировать с конкурентным режимом, пока он ещё не вышел. Concurrent Mode позволяет добиться впечатляющих результатов: быстрой и плавной загрузки компонентов в любом удобном порядке, сверхтекучести интерфейса. Подробности описаны в документации, там же лежат демки с примерами, которые стоит поизучать самостоятельно. А если вам любопытно, как работает Fiber-архитектура, вот ссылка на интересный доклад.

Оцените свои проекты что можно улучшить, используя новые инструменты? А когда конкурентный режим выйдет, смело переезжайте. Всё будет супер!
Подробнее..

Joker 2020 продолжение сезона онлайн-конференций

29.11.2020 22:13:53 | Автор: admin
Только что, c 25 по 28 ноября 2020 года, прошла Java-конференция Joker 2020. Это уже второй сезон конференций, проводимых JUG Ru Group в формате онлайн.

В онлайн-формате конференция стала лучше или хуже? Что нового организаторами было придумано? Кого из спикеров с какими докладами можно было увидеть и услышать? Что полезного и интересного было в докладах? Имеет ли смысл посещать конференцию в следующем году?



Второй сезон онлайн-конференций: казалось бы, что ещё можно придумать, кроме возможности просмотра докладов при вынужденных ограничениях, наложенных пандемией? Теперь часть потенциальных посетителей (стремящихся преимущественно к получению новых знаний и свежей информации в области профессиональных интересов) конференцию по-прежнему посетит и билет купит, а другая часть (которую в большей степени привлекает личное общение) увы, больше нет. Выхода из создавшейся ситуации нет или он всё же есть? Можно ли конференции стать по-прежнему привлекательной и для второй части участников конференций, которые жаждут социальности?

В замечательной статье из блога компании JUG Ru Group на Хабре Руслан ARG89 Ахметзянов постарался проанализировать ситуацию (попробуйте оценить, Вы в большей степени персонаж Саша или Женя в отношении конференций). Далее там же анонсируются дополнительные механики, добавленные в стриминговую платформу конференций для того, чтобы удовлетворить вкусы как можно большего числа участников. Удалось или нет достигнуть этим поставленных целей, постараемся разобраться далее.

В преддверии конференции также вышло 8 выпусков шоу Вторая чашка кофе с Joker, в которых в эфире ведущие успели взять интервью с Алексеем Фёдоровым, Дмитрием Чуйко, Александром Белокрыловым, Дмитрием Александровым, Олегом Шелаевым, Сергеем Егоровым, Евгением Борисовым и Тагиром Валеевым.

Так что же, собственно, сама-то конференция?

Открытие


В проведение, открытие, закрытие каждой конференции организаторы раз за разом стараются привнести что-то новое. В данном случае открытие началось с импровизаций Алексея Фёдорова и Глеба Смирнова. На правом фото Алексей Фёдоров демонстрирует возможности игрового вида конференции (о нём рассказывается далее в отдельном разделе обзора).



Все виды прошедших активностей, связанных с донесением технической информации посредством слайдов и видео, можно было бы условно разделить на

  • интервью;
  • большие доклады;
  • мини-доклады партнёров;
  • воркшопы.

Интервью


В интервью Эволюция Java и Kotlin. Что нас ждет?, взятом у Романа Елизарова, можно было узнать о пути развития языка программирования Kotlin. Накануне конференции официально было объявлено о передаче управления и координации работ по проекту Kotlin от Андрея Бреслава к Роману Елизарову. По этой причине особенно интересно было узнать мнение Романа и про его изменившийся круг обязанностей, и о возможных изменениях развития языка и платформы.



Зачем нужно знание многопоточной разработки в enterprise мини-интервью Евгения phillennium Трифонова с Юрием Бабаком, представителем компании-партнёра конференции. Любопытными показались разнообразные примеры из собственной практики, про которые Юрий живо и интересно рассказал в ответ на очень уместные вопросы Евгения.

Адская кухня: Как приготовить новую версию Java и не отравить пользователей LTS релизов? мини-интервью с Александром Белокрыловым из компании BellSoft, хорошо известной, вероятно, большинству по дистрибутиву Liberica JDK. Новостью стала информация о вхождении представителей компании в исполнительный комитет JCP.



Доклады


Доклад Кирилла Тимофеева под названием JVM-профайлер, который смог (стать кроссплатформенным) был про добавление поддержки Windows в async-profiler при его использовании из среды разработки IntelliJ IDEA. Андрей Паньгин (поздравляем его с присвоением звания Java Champion за неделю до конференции!) выступил в качестве приглашённого эксперта доклада. Отличный докладчик (автор Windows-порта), хороший доклад с глубоким пониманием темы, идеальный эксперт (автор оригинального продукта), полезная информация о скором появлении предмета обсуждения в составе IntelliJ IDEA.



Предполагаю, что аболютное большинство видевших доклад и читающих данный обзор использует Spring Boot как-никак это промышленный стандарт Java-разработки сегодня. Толстый (fat) JAR при использовании Spring Boot также абсолютно распространённая практика. Рискну предположить, что Владимир Плизга со своим докладом Spring Boot fat JAR: Тонкие части толстого артефакта представил информацию, которая наиболее практически применима и востребована. Неплохо дополнили доклад три Андрея Беляев, Когунь и Зарубин.



Доклад Thread Safety with Phaser, StampedLock and VarHandle от легендарного Heinz Kabutz (ведущий известнейшей рассылки JavaSpecialists) и его коллеги John Green. Просмотр данного доклада может быть полезно тем, что в нём акцентируется внимание о менее известных concurrency-классах Phaser, StampedLock и VarHandle (в отличие от, например, многим знакомых классов CountDownLatch и CyclicBarrier).



Spring Patterns для взрослых в исполнении (сначала буквально даже в сопровождении гитары) Евгения Борисова. Ничто не помешало в итоге Жене (ни разряжающаяся батарея ноутбука, ни стремительно заканчивающееся время) успешно завершить доклад. В отличие от привычных докладчику стиля и тем технической жести, хардкора и расчленёнки (только библиотек и фреймворков. естественно) речь в этот раз шла как раз о наиболее типовом использовании Spring Framework. Ещё один абсолютно практически применимый доклад в блестящем исполнении с рекомендацией к просмотру.



Доклад Заменят ли роботы программистов? от Тагира Валеева расстроил меня вслед за докладчиком я тоже осознал, что роботы (библиотеки, сервисы, плагины) в значительной части уже заменили программистов. Частично успокаивает то, что ими автоматизируется наиболее неинтересная и рутинная часть работы программиста. Полезной и приятной частью в подобных докладах является информация о каких-то сервисах, которые можно будет попробовать после конференции. В случае доклада Тагира это информация о сервисах Mergify (есть приложение для GitHub) для автоматизации принятия pull request и сервис Diffblue (есть плагин для IntelliJ IDEA) для автоматизации создания unit-тестов (выглядит впечатляюще, надо попробовать). Полезный, интересный и даже неожиданно, не побоюсь этого слова, философский доклад.



Мини-доклады партнёров


На мой взгляд, мини-доклады партнёров очень удачная форма докладов, относительно коротких и информативных одновременно. Подводные камни загрузчиков классов в Java и как они могут повлиять на скорость работы с XML от Ильи Ермолина (слева) и Как сказать нет архитектору? Советы по выбору размера микросервиса в исполнении Андрея Даминцева (справа) являются примерами таких докладов.



В мини-докладе Java Licensing Tips от Юрия Милютина была, например, представлены волнующие многих теперь (после изменении компанией Oracle модели лицензирования) данные о сокращении (или даже исключении) расходов на лицензирование используемой в промышленной эксплуатации Java.

После мини-доклада Самое время попробовать машинное обучение на Java у Артёма Селезнева (фото справа) взял интервью Евгений Трифонов. В какой-то степени были развенчаны мифы (или хотя бы изменено мнение) о слабой применимости Java для машинного обучения.



Воркшопы


Ещё одна замечательная форма донесения технической информации и ещё одная известная личность воркшоп (мастер-класс) Хватит писать тесты, пора писать спецификации! от Алексея Нестерова. Алексей в настоящий момент один из соведущих популярнейшего подкаста Радио-Т (т.н. Алексей второй и Алексей добрый, в отличие от Алексея Абашева).

Воркшоп на конференциях JUG Ru Group обычно разбит на две части и суммарно занимает один конференционный день, к чему надо быть готовым. Для демонстрации написания тестов использовался проект в репозитории (если используете Windows, то дополнительно придётся изменить две строчки в файле frontend/package.json). Высококвалифицированный приятный инструктор-докладчик, возможность спокойно покопаться в проекте на своём удобном привычном рабочем месте, рекомендую.



Сайт


Сайт онлайн-платформы для стриминга конференций, как уже говорилось ранее, претерпел существенные изменения. Имеющаяся функциональность была значительно расширена. Кроме добавления удобства существующим возможностям (например, изменение скорости воспроизведения и возможность догнать текущее время доклада) появился совершенно новый игровой вид конференции. Участник конференции может выбрать персонажа и совершать виртуальную прогулку по разным локациям.

Слева представлен общий план возможных локаций, справа локация Холл.



Далее локация Улица слева, Партнёрская выставка справа. При приближении к персонажам других людей внизу появляется видео рядом расположенных субъектов с возможностью диалога. Наверное, следующим шагом могут стать VR-шлемы и трёхмерное пространство выставки с видом от первого лица.



Просмотр информации о конференциях и игра


В обзорах предыдущих конференций уже рассказывалось про приложение, доступное на сайте jugspeakers.info (репозиторий с кодом находится на GitHub). Приложение состоит из двух частей:

  1. Просмотр информации о конференциях JUG Ru Group и JUG-митапах (с поиском данных о конференциях, спикерах, докладах, просмотром видео докладов и презентаций);
  2. Игра Угадай спикера.

Первая часть приложения была дополнена возможностью просмотра статистики по компании, к которым относятся спикеры на момент последнего доклада или в настоящий момент. По количеству сделанных докладов (так сказать, в командном зачёте) уверенно побеждает компания JetBrains. То есть в настоящий момент в данной компании работают спикеры, сделавшие в сумме наибольшее количество докладов в конференциях JUG Ru Group (с учётом и без учёта Java-митапов).



Во второй части приложения (в игре Угадай спикера) было добавлено 2 новых режима: угадывание компании по спикеру и спикера по компании. Можно проверить, так ли хорошо Вы знаете принадлежность спикеров к их компаниям. В режиме угадать спикера по компании правильным ответом может быть множественный выбор спикеры, относящиеся к одной компании.



Java-код программы на 100% процентов покрыт тестами, для сбора информации о покрытии кода используется библиотека JaCoCo, для контроля покрытия тестами и качества кода сервисы Codecov и SonarCloud.

На конференции Heisenbug две недели назад Евгений Мандриков, ведущий разработчик проектов JaCoCo и SonarQube, проводил воркшоп Покрытие кода в JVM. Посмотреть видео воркшопа могут обладатели билета на конференцию Heisenbug или единого билета.

Закрытие


Открывавшие три дня назад Алексей Фёдоров и Глеб Смирнов и закрыли конференцию. Традиционный выход на сцену спикеров, участников программного комитета и организаторов вынужденно пока заменён демонстрацией слайдов с фото.



В качестве итогов можно сказать, что конференция стала ещё интереснее и разнообразнее (по крайней мере, насколько ей это позволяют рамки онлайн-конференции). Любители социальности тоже получили в этом сезоне желаемое в виде возможности пообщаться, пусть виртуально.

Всем до следующих Java-конференций!

Осенне-зимний сезон онлайн-конференций JUG Ru Group продолжится конференциями DotNext, DevOops (2-5 декабря 2020 года) и SmartData (9-12 декабря 2020 года). Можно посетить любую из конференций отдельно или купить единый билет на все восемь конференций сезона (пять уже прошедших и три оставшихся), видео докладов при этом доступны сразу же после завершения конференций.
Подробнее..

Параллельные запросы в Kotlin для автоматизации сборки данных

10.02.2021 18:20:17 | Автор: admin

Всем привет! В своей работе я часто использую Kotlin для автоматизации. Деятельность моя не связана напрямую с программированием, но Котлин здорово упрощает рабочие задачи.

Недавно нужно было собрать данные немаленького размера, дабы сделать анализ, поэтому решил написать небольшой скрипт, для получения данных и сохранения их в Excel. С последним пунктом проблем не возникло - почитал про Apache POI, взял пару примеров из официальной документации, доработав под себя. Чего не скажешь про запросы в Сеть.

Источник отдавал пачками json и надо было как-то быстро эти "пачки" собирать, преобразовывая в текст и записывая в файл таблицу.

Асинхронный метод

Начать решил с простой асинхронщины. Немного поковыряв HttpUrlConnection, отправил туда, где ему и место, заменив на HttpClient из Java.

Для тестов взял сервис https://jsonplaceholder.typicode.com/, который мне подсказал один знакомый разработчик. Сохранил ссылку, которая выдает Json с комментариями в переменную, дабы не дублировать и начал тесты.

const val URL = "https://jsonplaceholder.typicode.com/comments"

Функция была готова и даже работала. Данные приходили.

fun getDataAsync(url: String): String? {    val httpClient = HttpClient.newBuilder()        .build()    val httpRequest = HttpRequest.newBuilder()        .uri(URI.create(link)).build()    return httpClient.sendAsync(httpRequest, BodyHandlers.ofString())        .join().body()}

Теперь надо было проверить скорость работы. Вооружившись measureTimeMillis я запустил код.

val asyncTime = measureTimeMillis {     val res = (1..10)        .toList()        .map {getDataAsync("$URL/$it")}    res.forEach { println(it) }}println("Асинхронный запрос время $asyncTime мс")

Все работало как надо, но хотелось быстрее. Немного покопавшись в Интернете, я набрел на решение, в котором задачи выполняются параллельно.

Parallel Map

Автор в своем блоге пишет, что код ниже выполняется параллельно с использованием корутин. Ну что, я давно хотел их попробовать, а тут представилась возможность.

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> =    coroutineScope {        map { async { f(it) } }.awaitAll()    }

Если я все верно понял, то здесь расширяется стандартная коллекция (класс Iterable) функцией pmap, в которую передается лямбда. В лямбду поочередно приходит параметр A. Затем после окончания прохода по списку async дожидается выполнения всех элементов списка, и с помощью .awaitAll() выдает результат в виде списка. Причем для каждого элемента функция с модификатором suspend, то есть блокироваться она не будет.

Пришло время тестов, и сказать, что я был разочарован - значит не сказать ничего.

val parmapTime = measureTimeMillis {    runBlocking {        val res = (1..10)            .toList()            .pmap { getDataAsync("$URL/$it") }        println(mapResult)    }}println("Время pmap $parmapTime мс")

Средний результат был в районе - 1523мс, что не сильно то отличалось по скорости от первого решения. Задачи может и работали параллельно благодаря map и async, но уж очень медленно.

Parallel Map v 2.0

После работы, вооружившись малиновым чаем, я сел читать документацию по корутинам и через некоторое время переписал реализацию автора.

suspend fun <T, V> Iterable<T>.parMap(func: suspend (T) -> V): Iterable<V> =    coroutineScope {        map { element ->             async(Dispatchers.IO) { func(element) }         }.awaitAll()     }val parMapTime = measureTimeMillis {    runBlocking {        val res = (1..10)            .toList()            .parMap { getDataAsync("$URL/$it") }    }    println(res)}println("Параллельная map время $parMapTime мс")

После добавления контекста Dispatchers.IO задача выполнялась в 2 раза быстрее ~ 610 мс. Другое дело! Остановившись на этом варианте и дописав все до полноценного рабочего скрипта (проверка ошибок, запись в excel и т.д.) я успокоился. Но мысль в голове о том, что можно еще что-то улучшить не покидала меня.

Java ParallelStream

Через несколько дней, в одном из постов на stackowerflow прочитал о parallelStream. Не откладывая дело в долгий ящик, после работы вновь запустил IDEA.

val javaParallelTime = measureTimeMillis {     val res = (1..10).toList()        .parallelStream()        .map { getDataAsync("$URL/$it") }    res.forEach { println(it) }}println("Java parallelSrtream время $javaParallelTime мс")

Код выполнялся даже чуть быстрее, чем моя реализация. Но радость длилась ровно до того момента, когда пришло время обрабатывать ошибки. Точки останова насколько я понял в stream нет. Иногда, у меня получалось так, что все считалось до конца, вываливалась ошибка и в виде результата "прилетал" то неполный, то пустой Json.

Может, я делал что-то не так, но с async таких проблем не возникло. Там можно контролировать данные на каждом шаге итерации и удобно обрабатывать ошибки.

Выводы

Результаты можно посмотреть в таблице ниже. Для себя я однозначно решил оставить async await. В основном конечно из-за более простой обработки ошибок. Да и за пределы корутин тут выходить не надо.

Метод

Время (ms)

Асинхронный метод

1487

Реализация pmap из Сети

1523

Мой вариант - parallelMap

610

Java.parallelStream

578

В дальнейшем, есть мысли оформить это в небольшую библиотеку и использовать в личных целях, и конечно переписать все это с "индусского кода" на человеческий, на сколько хватит возможностей. А потом залить все это на vds.

Надеюсь мой опыт кому-нибудь пригодится. Буду рад конструктивной критике и советам! Всем спасибо

Подробнее..

Разгоняем REACTOR

12.06.2021 18:20:44 | Автор: admin

Кому будет интересно?

Реактор сегодня - это стильно, модно, молодежно. Почему многие из нас практикуют реактивное программирование? Мало кто может ответить однозначно на этот вопрос. Хорошо - если Вы понимаете свой выигрыш, плохо - если реактор навязан организацией как данность. Большинство аргументов "ЗА" - это использование микросервисной архитектуры, которая в свою очередь обязывает микросервисы часто и много коммуницировать между собой. Для коммуникации в большинстве случаев выбирают HTTP взаимодействие. Для HTTP нужен легковесный веб-сервер, а что первое приходит на ум? Tomcat. Тут появляются проблемы с лимитом на максимальное количество сессий, при превышении которого веб-сервер начинает реджектить запросы (хотя лимита этого не так уж и легко достичь). Здесь на подмогу приходит реактор, который подобными лимитами не ограничен, и, например, Netty в качестве веб-сервера, который работает с реактивностью из коробки. Раз есть реактивный веб-сервер, нужен реактивный веб-клиент (Spring WebClient или Reactive Feign), а раз клиент реактивный, то вся эта жуть просачивается в бизнес логику, Mono и Flux становятся Вашими лучшими друзьями (хотя по началу есть только ненависть :))

Среди бизнес задач, очень часто встречаются серьезные процедуры, которые обрабатывают большие массивы данных, и нам приходится применять реактор и для них. Тут начинаются сюрпризы, если реактор не уметь готовить, можно и проблем схлопотать очень много. Превышение лимита файловых дескрипторов на сервере, OutOfMemory из-за неконтролируемой скорости работы неблокирующего кода и многое многое другое, о чем мы сегодня поговорим. Мы с коллегами испытали очень много трудностей из-за проблем с пониманием как держать реактор под контролем, но всё что нас не убивает - делает нас умнее!

Блокирующий и неблокирующий код

Вы ничего не поймете дальше, если не будете понимать разницу между блокирующим и неблокирующим кодом. Поэтому, остановимся и внимательно разберемся в чем разница. Вы уже знаете, блокирующий код реактору - враг, неблокирующий - бро. Проблема лишь в том, что в настоящий момент времени, не все взаимодействия имеют неблокирующие аналоги.

Лидер здесь - HTTP взаимодействие, вариантов масса, выбирай любой. Я предпочитаю Reactive Feign от Playtika, в комбинации со Spring Boot + WebFlux + Eureka мы получаем очень годную сборку для микросервисной архитектуры.

Давайте по-простому: НЕблокирующий код, это обычно всё, в названии чего есть reactive, а блокирующий - все оставшееся :) Hibernate + PostgreSQL - блокирующий, отправить почту через JavaMail - блокирующий, скинуть сообщение в очередь IBMMQ - блокирующий. Но есть, например, реактивный драйвер для MongoDB - неблокирующий. Отличительной особенностью блокирующего кода, является то, что глубоко внутри произойдет вызов метода, который заставит Ваш поток ждать (Thread.sleep() / Socket.read() и многие подобные), что для реактора - как нож в спину. Что же делать? Большинство бизнес логики завязано на базу данных, без нее никуда. На самом деле достаточно знать и уметь делать 2 вещи:

  • Необходимо понимать где блокирующий код. В этом может помочь проект BlockHound или его аналоги (тут тема для отдельной статьи)

  • Исполнение блокирующего кода необходимо переключать на пулы, готовые его выполнять, например: Schedulers.boundedElastic(). Делается это при помощи операторов publishOn & subscribeOn

Разгоняемся сами

Перед тем, как продолжить, необходимо немного размяться!

Уровень 1

    @Test    fun testLevel1() {        val result = Mono.just("")            .map { "123" }            .block()        assertEquals("123", result)    }

Начнем с простого, такой код обычно пишут начинающие reactor программисты. Как начать цепочку? Mono.just и ты на коне :) Оператор map трансформирует пустую строку в "123" и оператор block делает subscribe.

Обращаю особенное внимание на оператор block, не поддавайтесь соблазну использовать его в Вашем коде, исключение составляют тесты, где это очень удобно. При вызове block внутри метода Вашего RestController, Вы сразу получите исключение в рантайме.

Уровень 2

    fun nonBlockingMethod1sec(data: String)     = data.toMono().delayElement(Duration.ofMillis(1000))    @Test    fun testLevel2() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { nonBlockingMethod1sec(it) }            .block()        assertEquals("Hello world", result)    }

Усложняем наш код, добавляем неблокирующий метод nonBlockingMethod1sec, все что он делает - ожидает одну секунду. Все что делает данный код - дважды, по очереди, запускает неблокирующий метод.

Уровень 3

    fun collectTasks() = (0..99)    @Test    fun testLevel3() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { businessContext ->                collectTasks()                    .toFlux()                    .map {                        businessContext + it                    }                    .collectList()            }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Начинаем добавлять самое интересное - Flux! У нас появляется метод collectTasks, который собирает массив из сотни чисел, и далее мы делаем из него Flux - это будет наш список задач. К каждой задаче мы применяем трансформацию через оператор map. Оператор collectList собирает все результаты в итоговый список для дальнейшего использования.

Здесь наш код начинает превращаться в рабочий паттерн, который можно использовать для массового выполнения задач. Сначала мы собираем некий "бизнес контекст", который мы используем в дальнейшем для выполнения задач.

Уровень 4

    fun collectTasks() = (0..100)        @Test    fun testLevel4() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { businessContext ->                collectTasks().toFlux()                    .flatMap {                        Mono.deferContextual { reactiveContext ->                            val hash = businessContext + it + reactiveContext["requestId"]                            hash.toMono()                        }                    }.collectList()            }            .contextWrite { it.put("requestId", UUID.randomUUID().toString()) }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Добавляем немного плюшек. Появилась запись данных (15) в реактивный контекст, а также чтение (10) из него. Мы почти у цели. Постепенно переходим к итоговому варианту

Уровень 5

    fun collectTasks() = (0..1000)        fun doSomethingNonBlocking(data: String)        = data.toMono().delayElement(Duration.ofMillis(1000))        fun doSomethingBlocking(data: String): String {        Thread.sleep(1000); return data    }    val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")    private val logger = getLogger()    @Test    fun testLevel5() {        val counter = AtomicInteger(0)        val result = nonBlockingMethod1sec("Hello world")            .flatMap { _ ->                collectTasks().toFlux()                    .parallel()                    .runOn(pool)                    .flatMap {                        Mono.deferContextual { _ ->                            doSomethingNonBlocking(it.toString())                                .doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }                                .doOnNext { logger.info("Non blocking code finished ${counter.get()}") }                                .map { doSomethingBlocking(it) }                                .doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }                        }                    }.sequential()                    .collectList()            }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Вот мы и добрались до итогового варианта! Часть с реактивным контекстом была опущена для более наглядной демонстрации того, зачем мы здесь собрались. У нас появились два новых метода: doSomethingNonBlocking (3) & doSomethingBlocking (6) - один с неблокирующим ожиданием в секунду, второй с блокирующим. Мы создали пул потоков для обработки задач (10), добавили счетчик активных задач в реакторе (15). У нас появился оператор parallel (19) и обратный ему sequential (29). Задачи мы назначили на свежесозданный пул (20). Для понимания, что же происходит внутри, добавили логирование внутри операторов doOnRequest (вызывается перед исполнением метода), doOnNext (вызывается после исполнения метода). Основная задумка - на примере, определить сколько задач одновременно выполняется в реакторе и за какое время цепочка завершит свою работу.

Такой "паттерн", мы с коллегами очень часто применяем для выполнения сложных задач, таких как отправка отчетов или массовая обработка транзакций. Первым делом собирается бизнес контекст - это некая структура, содержащая в себе информацию, полученную в результате вызовов других микросервисов. Бизнес контекст необходим нам для выполнения самих задач, и собирается он заранее, чтобы не тратить время в процессе обработки. Далее мы собираем список задач, превращаем их во Flux и скармливаем реактору на параллельную обработку.

И вот здесь начинается самое интересное. Попробуйте ответить на несколько вопросов. Как Вы считаете, сколько времени будет выполнятся данная цепочка? В ней 100 задач, в каждой задаче неблокирующее ожидание в 1 секунду, блокирующее ожидание в 1 секунду, и у нас в наличии пул из 10 потоков? (Вполне годная задачка на собеседование senior reactor developer :))

Правильный ответ

Около 12 секунд. Рассуждаем от блокирующего :) Блокирующее ожидание никуда не деть, и тут имеем 100 блокирующих секунд на 10 потоков, итого 10 секунд. Неблокирующее ожидание заметно нам лишь в первый раз, далее оно незаметно запускается в передышках между блокирующим. Не забываем про одну секунду сбора "бизнес контекста" перед запуском задач.

А теперь уберем строку (26) .map { doSomethingBlocking(it) } . Освободим наш реактор от блокирующего кода, интересно, сколько теперь времени займет выполнение цепочки?

Правильный ответ

2 секунды! 1 на сбор "бизнес контекста" и 1 на выполнение всех задач. Реактор запустит 100 задач одновременно. Но ведь у нас пул из 10 потоков? Как так? Первый разрыв шаблона.

Мы идем до конца и увеличиваем количество задач в методе collectTasks() до ... 1000? а может быть сразу до 15000? Как долго реактор будет выполнять столько задач?

Правильный ответ

2 секунды! 1 на сбор "бизнес контекста" и 1 на выполнение всех задач. Реактор запустит ВСЕ задачи одновременно. Второй разрыв шаблона. Где предел?

А это вообще легально?

Как же так и как это контролировать? Почему это опасно? Что если внутри параллельной обработки Вы решите вызвать другой микросервис? Если у вас 30000 задач, и по завершению каждой, Вам нужно отправлять запрос соседнему микросервису, Вы с удивлением можете обнаружить, что реактор непременно постарается выполнить все вызовы одновременно (Вы ведь используете реактивный web-client или реактивный feign, верно?) Открытие такого большого количества сокетов повлечет за собой превышение лимита открытых файловых дескрипторов в системе, что как минимум создаст проблемы с невозможностью создания новых сокетов в системе и помешает другим сервисам, а как максимум повалит Вам на сервере SSH и Вы потеряете доступ к серверу. Сомневаюсь, что в этот момент, программист будет кричать "зато смотри как быстро работает".

Разрыв шаблона. Thread Pool & Reactor

Основная проблема начинающего реактор программиста - это образ мышления, если есть медленный процесс - добавь X потоков, будет быстрее в X раз, а если слишком быстро - сократи количество потоков. Как всё просто было раньше? :) С реактором это не работает.

Классический thread pool - двери. Больше дверей - больше пропускная способность, все работает быстрее.

Теперь встречайте reactor! Вы видите двери? Нет никаких дверей

Реактор это большой мешок с подарками, или воздушная труба, задачи в которую валятся и летают там пока не выполнятся. А кто эти люди в желтом? Это наши epoll реактивные потоки, которые ни в коем случае нельзя нагружать блокирующими задачами. Можно провести аналогию с прорабами или инженерами. Они здесь, чтобы управлять процессом, а не чтобы выполнять тяжелую работу. Займите одного инженера тяжелой задачей, и когда к нему придет следующий рабочий с вопросом "что делать дальше?", он не сможет ответить, потому что был занят. Вот так и появляются таймауты в реактивном коде. Казалось бы микросервис стоит без нагрузки, выполняет какие-то задачки, а один из 500 запросов к нему падает с тайм-аутом, и непонятно почему. Велика вероятность что инженер был занят блокирующей задачей! Заботьтесь о своих инженерах и поручайте тяжелую работу специально обученным рабочим, например, Schedulers.boundedElastic().

Как контролировать эту "трубу", в которую валится всё без контроля? Вот мы и подошли к кульминации

Конфигурируем реактор!

В своей дефолтной конфигурации, параллельная обработка в реакторе зависит от количества ядер процессора сервера, на котором запускается код, поэтому, к своему удивлению, Вы получите разные результаты, проверяя работу реактора в тесте на локальной машине с 4-8 ядрами и production сервере с 32 ядрами.

Парад настроек открывает parallel с его аргументом parallelism

Меняя parallelism, мы можем регулировать количество запускаемых rails (это местное понятие реактора, которое похоже на корутины, но по сути является количеством одновременно выполняемых неблокирующих задач). Prefetch мы рассмотрим более подробно в следующем разделе.

Но одного parallelism недостаточно, реактор все еще будет нагребать задач как не в себя.

Мало кто обращал внимание что у оператора flatMap (только того что запускается на Flux) есть перегрузки с интересными аргументами, а именно maxConcurrency

maxConcurrency очень важен, по дефолту значение стоит Integer.MAX_VALUE (определяет сколько неблокирующих задач может выполняться одновременно на одной рельсе. Понимаете теперь откуда аппетит у реактора?

Также, не стоит забывать, что если цепочка будет запущена несколько раз (вызов одного http метода контроллера несколько раз), то все помножится! Никакой пул не спасет.

Количество запусков цепочки напрямую влияет на количество одновременно выполняемых задач.

Подведем небольшой итог:

  • parallel (parallelism)

  • flatMap (maxConcurrency)

  • Количество запусков цепочки

Эти три параметра являются множителями, для расчета количества одновременных задач.

По дефолту это Кол-во ядер * Integer.MAX_VALUE * Количество запусков цепочки

Напротив же, запустив данный код для 5 задач длительностью в секунду мы получим цепочку работающую 5 секунд. Теперь всё под контролем!

        val result = nonBlockingMethod1sec("Hello world")            .flatMap { _ ->                collectTasks().toFlux()                    .parallel(1)                    .runOn(pool, 1)                    .flatMap({                        Mono.deferContextual { _ ->                            doSomethingNonBlocking(it.toString())                        }                    }, false, 1, 1)                    .sequential()                    .collectList()            }            .block()!!

Стоп, или не всё?

Thread Pool

Зачем же нужен пул потоков в реакторе? Думайте о нем как о двигателе для Вашего автомобиля. Чем пул мощнее - тем блокирующие задачи будут разбираться быстрее, а если потоков мало, то и блокирующие задачи задержатся у вас надолго! А куда же мы без блокирующих вызовов? На количество одновременно выполняемых задач в реакторе он не влияет, вот это поворот :)

Надеюсь, Вы не пробовали использовать Schedulers.parallel() для исполнения Вашего блокирующего кода? =) Несмотря на свое подходящее название (ну называется он parallel, значит и нужен для параллельной обработки) использовать этот пул можно только для неблокирующего кода, в доке указано что он живет с одним воркером, и содержит в себе только особенные, реактивные потоки.

Распределение задач по рельсам

Не коснулись мы еще одной важной темы. Обычно, мы пытаемся закончить обработку большого массива данных в кратчайший срок, с чем нам определенно поможет изложенный выше материал, но это еще не все. В тестах мы часто используем синтетические данные, которые генерируем одинаковыми порциями, исключая погрешности production среды. Задачи обычно выполняются разное время и это создает проблемы с равномерным распределением задач.

Зеленые прямоугольники это наши задачи, которые распределяются в реакторе по алгоритму round-robin, что в случае с синтетическими данными дает красивую картинку.

Хорошо загруженный реактор (задачи равномерно распределены). 54 блокирующих задачи (каждая по 1сек), round-robin распределение по 6 рельсамХорошо загруженный реактор (задачи равномерно распределены). 54 блокирующих задачи (каждая по 1сек), round-robin распределение по 6 рельсам

Но запуская код в production среде, мы можем встретиться с долгим запросом в базу, сетевыми задержками, плохим настроением микросервиса да и чего только не бывает.

Плохо загруженный пул (задачи распределены не равномерно)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамПлохо загруженный пул (задачи распределены не равномерно)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсам

Оператор collectList() вернет нам результат только после завершения последней задачи, и как мы видим, наш пул будет простаивать пока 1 поток трудится разгребая очередь накопившихся задач. Это создает неприятные задержки, когда Вы знаете что можно быстрее, но быстрее не происходит.

Бороться с этим можно несколькими способами

  • concatMap вместо flatMap (посмотрите в профилировщик на ваш пул, передумаете)

  • правильно планировать задачи, чтобы исключить аномалии (почти невозможно)

  • дробить каждую задачу на много мелких, и также запускать их в параллельную обработку чтобы нивелировать проблемы с распределением (вполне рабочий вариант)

  • prefetch (наш выбор!)

Параметр prefetch у flatMap & runOn позволяет определить, сколько задач будет взято на одну рельсу на старте, а затем при достижении некоторого порога выполнения задач, реквесты будут повторяться с этим количеством. Значение по умолчанию - 256. Сменив значение на 1, можно заставить реактор использовать механизм "work stealing", при котором, рельсы и потоки, которые освободились, будут забирать задачи себе на выполнение и картина получится гораздо более приятная.

Хорошо загруженный пул (задачи равномерно распределены)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамPrefetch !Хорошо загруженный пул (задачи равномерно распределены)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамPrefetch !

На этом у меня всё. Будет интересно прочесть Ваши замечания и комментарии, на 100% истину не претендую, но все результаты подкреплены практическими примерами, на Spring Boot + Project Reactor 3.4. Всем спасибо!

Подробнее..
Категории: Kotlin , Java , Concurrency , Parallel , Reactor , Parallelism , Mono , Threads , Flux , Pool , Prefetch

Релиз акторного фреймворка rotor v0.09 (c)

10.10.2020 20:06:06 | Автор: admin

actor system


rotor ненавязчивый С++ акторный микрофремворк, похожий на своих старших братьев caf и sobjectizer. В новом релизе внутреннее ядро полностью было переделано с помощью механизмов плагинов, так что это затронуло жизненный цикл акторов.


Связывание акторов


Всякая система акторов базируется на взаимодействии между ними, т.е. в отправлении сообщений друг другу (а также в возможных побочных эффектах в качестве реакции на эти сообщения или в создании новых сообщений, появляющихся в качестве реакции на события внешнего мира). Однако, чтобы сообщение было доставлено целевому актору, он должен оставаться активным (1); другими словами, если актор A собирается отправить сообщение М актору B, он должен быть уверен, что актор B онлайн и не будет выключен в процессе пересылки сообщения M.


До версии v0.09 подобная гарантия была только для отношений родитель/потомок, между супервайзером и дочерним актором, т. к. для последнего выполняется гарантия того, сообщение будет доставлено до его супервайзера в силу того, что супервайзер владеет дочерним актором, и его время жизни покрывает времена жизни всех своих дочерних акторов. Начиная с версии v0.09 появилась возможность связывания двух произвольных акторов A и B, так что после подтверждения связи (link), можно быть уверенным, что все последующие сообщения будут доставлены.


Для связывания акторов можно использовать такой код:


namespace r = rotor;void some_actor_t::on_start() noexcept override {    request<payload::link_request_t>(b_address).send(timeout);}void some_actor_t::on_link_response(r::message::link_response_t &response) noexcept {    auto& ec = message.payload.ec;    if (!ec) {        // успех связывания    }}

Однако, данный код не рекомендуется использовать напрямую потому что он не очень удобен. Это становится очевидным при попытке связать актор A с двумя и более акторами, т.к. some_actor_t должен будет вести внутренний счётчик успешных связываний. В данном случае помогает система плагинов, и код будет уже выглядеть так:


namespace r = rotor;void some_actor_t::configure(r::plugin::plugin_base_t &plugin) noexcept override {    plugin.with_casted<r::plugin::link_client_plugin_t>(        [&](auto &p) {            p.link(B1_address);            p.link(B2_address);        }    );}

Это более удобно в виду того, что плагин link_client_plugin_t поставляется в базовом классе всех акторов actor_base_t. Тем не менее, это скорей всего не всё, что хотелось бы иметь, т.к. остаются не отвеченными важные вопросы: 1) Когда происходит связывание акторов (и обратный вопрос когда происходит их разъединение)? 2) Что случится, если целевой актор ("сервер") не существует или откажет в связывании? 3) Что случится если целевой актор решит выключиться, в то время как есть связанные с ним акторы-клиенты?


Чтобы ответить на этот вопрос нужно рассмотреть жизненный цикл актора.


Асинхронная инициализация и выключение актора


Упрощённо жизненный цикл актора (состояние, state) выглядит следующим образом: new (ctor) -> initializing -> initialized -> operational -> shutting down -> shut down.



Основная работа осуществляется актором, когда он находится в состоянии operational, и здесь собственно пользователь фреймворка должен решить, чем именно будет заниматься актор.


Во время фазы инициализации (I-фазы, т.е. initializing -> initialized), актор подготавливает себя для будущей работы: находит и связывается с другими акторами, устанавливает соединение с БД, получает необходимые ему ресурсы для полноценной работы. Ключевая особенность rotor'а, что I-фаза асинхронна, т. е. актор сообщает супервайзеру, когда он готов (2).


Фаза выключения (S-фаза, т.е. shutting down -> shut down ) комплиментарна I-фазе, т.е. актора просят выключится, а когда он готов, он сообщает об этом своему супервайзеру.


Несмотря на кажущуюся простоту, основная сложность лежит здесь в масштабируемости (composability) подхода, при котором акторы формируют эрланго-подобные иерархии ответственностей (см. мою статью Trees of Supervisors). Перефразируя, можно сказать, что любой актор может дать сбой во время I- или S-фазы, что может повлечь за собой безопасный и ожидаемый коллапс всей иерархии независимо от местоположения актора в ней. Конечная цель в итоге это либо вся иерархия приходит в рабочее состояние (operational), либо она в конце концов становится выключенной (shut down).



(Пояснение к картинке. Сплошная линия обозначение отношение владения, пунктирная отношения связи).


rotor уникален в этом отношении. Ничего подобного нет в caf. Может создаться ошибочное представление, что в sobjectizer'е присутствует shutdown helper, предназначение которого аналогично S-фазе выше; однако, после публичных дискуссий с автором в англоязчыной версии статьи, выяснилось, что данные вспомогательные классы нужны для "длительного" (гарантированного) выключения акторов, даже если в Environment'е был вызван метод stop. С точки зрения sobjectizer'а отложенная инициализация (и выключение), аналогичные I- и S-фазам rotor'a, могут быть смоделированы с помощью встроенного механизма поддержки состояний и это обязанность пользователя фрейворка, если такова потребность его бизнес-логики. В rotor'е же это встроено в сам фреймворк.


При использовании rotor'а было обнаружено, что во время I-фазы (S-фазы) потенциально множество ресурсов должны быть получены (освобождены) асинхронно, что обозначает, что нет единого компонента, способного решить, что текущая фаза завершена. Вместо этого это решение есть плод совместных усилий, приложенных в определённом порядке. И здесь в игру вступают плагины, представляющие из себя атомарные кусочки, каждый из которых ответственен за собственную работу по инициализации или выключению.


Что же такое плагин в контексте rotor'а? Плагин это некий аспект поведения актора, определяющий реакцию актора на некоторые сообщения или группу сообщений. Лучше пояснить на примерах. Плагин init_shutdown, ответственен за инициализацию (выключение) актора, т. е. после опроса о готовности всех плагины, генерируется ответ на запрос о готовности инициализации (выключения); или, например, плагин child_manager, доступный только для супервайзеров, и ответственный за порождение дочерних акторов и всю машинерию связанную с этим, как то генерация запросов дочерним акторам на инициализацию, выключение и т. п. Несмотря на то, что существует возможность свои плагины, на текущий момент я не вижу необходимости в этом, поэтому она остаётся недокументированной.


Таким образом, обещанные ответы, относящиеся к link_client_plugin_t:


  • В: когда происходит связывание (отвязывание) актров? О: когда актор в состоянии initializing (shutting down).


  • В: что случится, если целевой актор не существует или отказывает в связывании? О: т. к. это случается во время инициализации актора, то плагин обнаружит это условие и начнёт выключение актора-клиента; также, возможно, это вызовет каскадный эффект, т.е. его супервайзер тоже выключится и так далее вверх по иерархии владения.


  • В: что случится, если целевой актор решит выключиться, при том, что с ним связаны активные акторы-клиенты? О: актор-сервер попросит клиентов отвязаться, и только когда все связанные клиенты подтвердят это, актор-сервер продолжит процедуру выключения (3).



Упрощённый пример


Будем предполагать, что имеется драйвер базы данных с асинхронным интерфейсом для одного из движков событий (event loop), доступных для rotor'а, а также что имеются TCP-клиенты, подключающиеся к нашему сервису. За обслуживание базы данных будет отвечать актор db_actor_t, а принимать клиентов будет acceptor_t. Начнём с первого:


namespace r = rotor;struct db_actor_t: r::actor_base_t {    struct resource {        static const constexpr r::plugin::resource_id_t db_connection = 0;    }    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::registry_plugin_t>([this](auto &p) {            p.register_name("service::database", this->get_address())        });        plugin.with_casted<r::plugin::resources_plugin_t>([this](auto &) {            resources->acquire(resource::db_connection);            // инициировать асинхронное соединение с базой данных        });    }    void on_db_connection_success() {        resources->release(resource::db_connection);        ...    }    void on_db_disconnected() {        resources->release(resource::db_connection);    }    void shutdown_start() noexcept override {        r::actor_base_t::shutdown_start();        resources->acquire(resource::db_connection);        // асинхронное закрытие соединения с базой данных и сброс данных    }};

Внутреннее пространство имён resource используется для идентификации соединения с БД как ресурсом. Это общепринятая практика, чтобы не использовать в коде магические цифры вроде 0. Во время конфигурации, которая является частью инициализации, когда плагин registry_plugin_t готов, он асинхронно зарегистрирует адрес актора в регистре (о нём будет рассказано позже). Затем с помощью resources_plugin_t захватывается "ресурс" подключения к БД, чтобы блокировать дальнейшую инициализацию актора и начинается соединение с БД. Когда будет подтверждено соединение с БД, ресурс будет освобождён и актор db_actor_t перейдёт в рабочее состояние. S-фаза аналогична: блокируется выключение до тех пор, пока все данные не будут сброшены в БД и пока соединение не будет закрыто; после этого процедура выключения актора завершается (4).


Код актора, который будет принимать клиентов, выглядит приблизительно так:


namespace r = rotor;struct acceptor_actor_t: r::actor_base_t {    r::address_ptr_t db_addr;    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::registry_plugin_t>([](auto &p) {            p.discover_name("service::database", db_addr, true).link();        });    }    void on_start() noexcept override {        r::actor_base_t::on_start();        // начать приём клиентов, например:        // asio::ip::tcp::acceptor.async_accept(...);    }    void on_new_client(client_t& client) {        // send<message::log_client_t>(db_addr, client)    }};

Основное в данном случае, это метод configure. Когда плагин registry_plugin_t готов, он будет сконфигурирован на обнаружение сервиса service::database, а когда адрес db_actor_t будет найден и сохранён в члене класса db_addr, то тогда с ним будет произведено связывание. Если же адрес актора service::database не будет обнаружен, то актор acceptor_actor_t начнёт выключаться (т. е. on_start не будет вызван). Если всё будет успешно проинициализировано, то актор начнёт принимать новых клиентов.


Собственно, сама операционная часть была опущена ради краткости, т. к. она не изменилась в новой версии rotor'а. Как обычно нужно определить полезную нагрузку (payload), сообщения, методы для работы с этими сообщениями и подписаться на них.


Скомпонуем всё вместе в файле main.cpp; будем считать, что используется boost::asio в качестве цикла событий.


namespace asio = boost::asio;namespace r = rotor;...asio::io_context io_context;auto system_context = rotor::asio::system_context_asio_t(io_context);auto strand = std::make_shared<asio::io_context::strand>(io_context);auto timeout = r::pt::milliseconds(100);auto sup = system_context->create_supervisor<r::asio::supervisor_asio_t>()               .timeout(timeout)               .strand(strand)               .create_registry()               .finish();sup->create_actor<db_actor_t>().timeout(timeout).finish();sup->create_actor<acceptor_actor_t>().timeout(timeout).finish();sup->start();io_context.run();

Как видно, в новом rotor'е активно используется шаблон builder. С помощью него создаётся корневой супервайзер sup, а уже он в свою очередь порождает 3 актора: два пользовательских (db_actor_t и acceptor_actor_t) и неявно созданный актор-регистр. Как обычно для акторных систем, все акторы слабо связаны друг с другом, т.к. они разделяют только общий интерфейс сообщений (опущено в статье).


Акторы "просто создаются" в данном месте, без знания о том, как они связаны между собой. Это следствие слабой связанности между акторами, которые с версии v0.09 стали более автономными.


Конфигурация выполнения могла бы быть полностью другой: акторы могли бы создаваться на разных потоках, на разных супервайзерах, и даже на различных движках, но их реализация оставалась бы по сути одной и той же (5). В этих случаях было бы несколько корневых супервайзеров, однако актор-регистр должен быть создан один, и его адрес разделён между всеми супервайзерами, чтобы акторы смогли найти друг друга. Для этого существует метод get_registry_address() в базовом супервайзере.


Итоги


Наиболее существенным изменением в новой версии rotor'а является разбиение на плагины его ядра. Наиболее важными для пользователя фрейморка являются плагины: link_client_plugin_t, позволяющий установить виртуальное соединение между акторами, плагин registry_plugin_t, дающий возможность регистрации и обнаружения адресов акторов по символическим именам, а также плагин resources_plugin_t, с помощью которого можно приостановить процедуру инициализации (выключения) до появления некоторого асинхронного внешнего события.


Так же присутствуют менее важные изменения в новом релизе, такие как система доступа к непубличным
свойствам и шаблон builder для интанцирования акторов.


Любая обратная связь приветствуется.


P.S. Я хотел бы поблагодарить Crazy Panda за поддержку в моих начинаниях по развитию данного проекта, а также автора sobjectizer'а за высказанные им критические замечания.


Примечания


(1) В настоящее время попытка доставки сообщения актору, супервайзер которого уже отработал и был удалён, ведёт к краху программы (UB).


(2) Если актор не подтвердит успешную инициализацию супервайзеру, сработает таймер инициализации, и супервайзер сделает запрос на выключение актора, т.е. состояние operational будет пропущено.


(3) Может возникнуть вопрос, что произойдёт, если актор не подтвердит отвязывание вовремя? Это нарушение контракта, и будет вызван метод system_context_t::on_error(const std::error_code&), который распечатает ошибку на консоль и вызовет std::terminate(). Для избегания нарушений контрактов, нужно настраивать таймеры, чтобы позволить акторам-клиентам во время отвязаться.


(4) Во время процедуры выключения плагин registry_plugin_t проинструктирует регистр, чтобы все зарегистрированные имена текущего актора были удалены из регистра.


(5) Исключение составляет, когда используются различные циклы событий. Если актор использует API цикла событий, то, очевидно, что смена цикла событий повлечёт переписывание внутренностей актора. Тем не менее, это никак не затронет использование API rotor'а.

Подробнее..

Не хочется ждать в очереди? Напишем свой диспетчер для SObjectizer с приоритетной доставкой

07.12.2020 12:09:16 | Автор: admin


SObjectizer это небольшой фреймворк для C++, который дает возможность разработчику использовать такие подходы, как Actor Model, Communicating Sequential Processes и Publish/Subscribe.


Одной из ключевых концепций в SObjectizer являются диспетчеры. Диспетчеры определяют где и как акторы (агенты в терминологии SObjectizer-а) обрабатывают свои события. Диспетчеры в SObjectizer бывают разных типов и пользователь может создавать в своем приложении столько разнообразных диспетчеров, сколько ему потребуется.


При этом у пользователя есть возможность написать свой собственный тип диспетчера, если ничего из стандартных диспетчеров ему не подходит. Два с половиной года назад уже рассказывалось о том, как можно сделать диспетчер для SObjectizer-а со специфическими свойствами.


Сегодня мы еще раз поговорим об этом. На примере уже другой задачи. Да и реализация будет отличаться, поскольку за прошедшее время SObjectizer успел обновиться сперва до версии 5.6, а затем и 5.7. И в этих версиях много отличий от версии 5.5, про которую в основном и рассказывалось в прошлом. В том числе и в механизме диспетчеров.


О решаемой задаче в двух словах


Предположим, что у нас есть агент, который ожидает два сообщения: msg_result с финальным результатом ранее начатой агентом операции и msg_status с промежуточной информацией о том, как протекает начатая операция.


Сообщения msg_status могут идти большим потоком. Например, на одно msg_result может приходиться до 1000 msg_status. И нам бы хотелось, чтобы когда в очереди уже стоит 900 сообщений msg_status новое сообщение msg_result вставало не в конец очереди, а в самое ее начало. Чтобы msg_result не ждало пока разгребутся 900 старых msg_status.



Вроде бы простая задача. Но с ее реализацией могут возникнуть неожиданные сложности. И чтобы понять, что это за сложности и как с ними справится, нужно посмотреть на SObjectizer-овские диспетчеры и понять, что это за звери такие. После чего можно будет обсудить возможные способы реализации доставки сообщений с учетом их приоритетов. Воплотить один из вариантов в жизнь и обсудить его реализацию.


Как диспетчеры связаны с политикой доставки сообщений до агентов?


Диспетчер в SObjectizer-е это сущность, которая определяет где и когда агенты будут обрабатывать свои сообщения.


Допустим, у нас есть агент Bob, который подписывается на сообщение msg_result из почтового ящика mbox. Когда в mbox отсылается msg_result, то это сообщение должно встать в очередь Bob-а, откуда оно будет рано или поздно извлечено и обработано. И вот здесь уже в полный рост встают диспетчеры, поскольку у агентов нет никаких очередей, тогда как у диспетчеров есть.


Когда агент регистрируется в SObjectizer-е, то агент привязывается к какому-то диспетчеру. И в момент привязки агенту дается указатель на сущность event_queue. Это интерфейс, за которым скрыта некая машинерия по передаче сообщения, адресованного агенту, именно тому диспетчеру, к которому агент привязан.


Когда сообщение передается диспетчеру, тот формирует заявку (demand) на обработку сообщения агентом и сохраняет заявку где-то у себя (очередь заявок диспетчера называется demand_queue).


Процесс доставки сообщения до агента в SObjectizer выглядит следующим образом:



Когда сообщение отсылается в mbox, то mbox видит Bob-а в подписчиках и говорит Bob-у: вот тебе новое сообщение. Bob берет это сообщение и передает его в свой event_queue. И уже этот event_queue формирует для диспетчера заявку (demand) на обработку сообщения для агента Bob. Заявка сохраняется в demand_queue диспетчера.


Диспетчер владеет одной или несколькими рабочими нитями, которые занимаются тем, что извлекают заявки из demand_queue и отправляют их на обработку. Эти рабочие нити диспетчера называются рабочим контекстом на котором будут работать агенты.


В общем, получается, что диспетчеры и предоставляют рабочий контекст для агентов, и хранят в своих demand_queue адресованные агентам сообщения.


А это означает, что когда нам нужна приоритетная доставка сообщений (т.е. msg_result вперед msg_status), то нам потребуется диспетчер, который эту самую приоритетную доставку реализует.


А приоритетов для сообщений в SObjectizer-5 и нет :(


Совсем.


Вот так вот.


Приоритеты для сообщений были в SObjectizer-4. Но они на практике использовались всего раз или два. Зато хлопот с их существованием и поддержкой было много.


Поэтому при разработке SObjectizer-5 от приоритетов для сообщений агента было решено отказаться. И за 10 лет развития и использования SObjectizer-5 пожалеть об этом пока что не пришлось.


Однако, время от времени задачи, в которых требуется какое-либо приоритетное обслуживание, все-таки встречаются. Поэтому некоторое время назад в SObjectizer-5 было добавлено понятие приоритета для агента. Именно для агента, а не для отдельного сообщения. Для поддержки приоритетов агентов в SObjectizer-5 появились три диспетчера, которые реализуют разные политики обслуживания приоритетов.


Так как можно решить задачу с msg_result и msg_status?


Использование двух агентов с разными приоритетами и диспетчера one_thread::strictly_ordered


Самое "простое" и "искаробочное" решение.


Логика агента A размазывается на двух агентов A_result и A_status. Агенты A_result и A_status получают разные приоритеты и привязываются к одному и тому же диспетчеру типа one_thread::strictly_ordered. Агент A_result подписывается на msg_result, тогда как агент A_status подписывается на msg_status.


Общие для агентов данные выносятся в какой-то общий объект, которыми они совместно владеют. Через shared_ptr, например. Получится что-то вроде:


struct A_data { ... };class A_result final : public so_5::agent_t {   std::shared_ptr<A_data> m_data;public:   A_result(context_t ctx, std::shared_ptr<A_data> data) {...}   void so_define_agent() override {      so_subscribe(m_data->m_mbox, &A_result::on_result);   }private:   void on_result(const msg_result & msg) {...}};class A_status final : public so_5::agent_t {   std::shared_ptr<A_data> m_data;public:   A_status(context_t ctx, std::shared_ptr<A_data> data) {...}   void so_define_agent() override {      so_subscribe(m_data->m_mbox, &A_status::on_status);   }private:   void on_status(const msg_status & msg) {...}};

Разделять общие данные между агентами A_result и A_status безопасно до тех пор, пока они работают на one_thread::strictly_ordered диспетчере.


Собственно говоря, это тот подход, который я бы и рекомендовал использовать, если вам потребовалась приоритетная обработка событий.


Но у этого способа есть очевидный недостаток: трудоемкость и размазывание логики по нескольким сущностям. Эта трудоемкость будет увеличиваться по мере увеличения количества сообщений, которые вам потребуется обрабатывать с разными приоритетами. Так что в какой-то момент может показаться, что двигаться в этом направлении слишком дорого и что нужно искать другое решение.


Собственный диспетчер с приоритетами для сообщений


Другой подход состоит в том, чтобы сделать собственный диспетчер, который будет доставлять сообщения до агентов-получателей с учетом приоритетов сообщений. Благо, если не заморачиваться вопросами мониторинга работы диспетчера и сбора статистики, то это делается не так уж и сложно.


Если такой диспетчер будет написан, то можно избавить себя от недостатков подхода с несколькими разноприоритетными агентами: вся логика будет находится в одном месте, поэтому добавлять новые обрабатываемые сообщения или избавляться от каких-то старых сообщений будет гораздо проще.


Именно по этому пути мы и пойдем в данной статье.


Что именно мы попытаемся сделать?


Мы попытаемся посмотреть на проблему немного шире (как это и положено русским программистам, которые ищут универсальные решения там, где можно обойтись методом грубой силы).


Начнем с того, что нам нужно собственный диспетчер. Который сможет упорядочивать сообщения в своей очереди согласно их приоритетов.


И тут возникает вопрос, а как должны определяться эти приоритеты?


А это зависит от задачи.


Где-то приоритеты будут определяться типом сообщения и эти приоритеты можно зафиксировать прямо в compile-time. Скажем, приоритет у msg_result единичка, а у msg_status нолик.


Где-то приоритеты могут зависеть от типа сообщения и агента-получателя. Скажем, для агента A сообщение msg_result будет иметь приоритет 1, а сообщение msg_status 0. Тогда как для агента L оба эти сообщения будут иметь приоритет 0.


Где-то приоритеты могут зависеть только от почтового ящика, из которого они были получены. Скажем все сообщения из mbox_A должны быть приоритетнее, чем сообщения из mbox_B.


А где-то приоритеты почтовых ящиков должны быть дополнены еще и приоритетами сообщений. Т.к. msg_result из mbox_A имеет приоритет 2, msg_status из mbox_A 1, msg_result из mbox_B 1, msg_status из mbox_B 0.


Ну и другие сочетания параметров так же возможны.


Значит ли это, что под каждое такое сочетание нужно делать свой диспетчер?


Скорее всего нет.


Возможно, диспетчер должен быть один, просто он должен уметь настраиваться на любой способ вычисления приоритета сообщений. Например, пользователь параметризует диспетчер объектом priority_detector который и определяет приоритет очередного сообщения: при получении очередной заявки диспетчер дергает priority_detector и располагает заявку в очереди согласно вычисленного priority_detector-ом приоритета.


Уже хорошо.


Но можно попробовать пойти еще дальше.


Диспетчеры были придуманы для того, чтобы дать возможность разработчику распределять своих агентов по тем рабочим контекстам, которые нужны разработчику. Скажем, захотел программист, чтобы агенты Alice и Bob работали на одной общей рабочей нити, значит зачем-то ему это нужно. Не суть важно зачем: для экономии ресурсов или для того, чтобы облегчить Alice и Bob совместную работу над общими разделяемыми данным. Разработчик хочет привязать Alice и Bob к общему контексту, значит он должен суметь это сделать.


А теперь представим себе, что Alice и Bob должны работать на общем рабочем контексте, но приоритеты доставки сообщений до Alice и Bob должны вычисляться по-разному. Скажем, приоритет сообщений для Alice зависит только от их типа. Тогда как приоритет для Bob-а зависит и от типа, и от почтового ящика.


Тут можно пойти, как минимум, двумя путями.


Первый путь. Диспетчер владеет одной приоритетной очередью. А мы пишем сложный priority_detector-а, который разбирается кто получатель сообщения, Alice или Bob, затем для каждого получателя определяет приоритет по тем или иным критериям.


Вполне понятное направление для движения. Но его трудоемкость будет расти по мере увеличения количества разнотипных агентов, которые нам нужно будет привязать к одному диспетчеру.


Второй путь. А что, если у каждого из агентов будет своя собственная приоритетная очередь? И в каждой очереди будет собственный priority_detector со своими правилами? Тогда мы сможем привязывать к одном диспетчеру агентов с разными политиками доставки сообщений. И в этом случае нам нужно научить диспетчера работать не с одной единственной очередью, в которую сваливается вообще все, а с набором отдельных очередей.


Мы пойдем вторым путем. Поскольку он открывает интересные возможности. Ведь если мы можем научить диспетчер обслуживать агентов с собственными очередями, то не обязательно это будут очереди с приоритетом. Например, это могут быть очереди фиксированного размера с автоматическим выбрасыванием самых старых (или самых новых) элементов при попытке добавления заявки в уже полную очередь. Или же очереди с контролем времени пребывания: скажем, если заявка простояла в очереди дольше 250ms, то она уже не актуальна и должна быть проигнорирована.


Демо проект so5_custom_queue_disps


Для иллюстрации предлагаемого решения на GitHub-е создан демо-проект so5_custom_queue_disps, который содержит исходные тексты обсуждаемого ниже диспетчера.


На данный момент в нем есть реализация только одного типа диспетчера one_thread, который привязывает всех агентов к одной единственной рабочей нити. Если данная тема кого-нибудь заинтересует, то туда же можно будет добавить и реализации thread_pool и/или adv_thread_pool диспетчеров. Для иллюстрации. Ну или для того, чтобы их можно было скопипастить, если вдруг кому-нибудь понадобятся.


Общая идея: список из непустых очередей


Идея, положенная в основу so5_custom_queue_disps, состоит в том, что есть N независимых друг от друга очередей заявок. В пределе у каждого агента может быть своя очередь заявок. Эти очереди живут до тех пор, пока кто-то их использует, как только очередь стала никому не нужна она уничтожается.


Диспетчер хранит у себя список указателей на очереди заявок. Если этот список пуст, значит все очереди заявок пусты. Как только в какую-то из них попадает заявка, эта очередь добавляется в конец списка диспетчера.


Диспетчер спит пока в его списке ничего нет. Но как только какая-то из очередей добавляется в список, то диспетчер просыпается и начинает обслуживать очереди из своего списка.


Диспетчер изымает из списка первую очередь. Извлекает заявку из очереди. Если после извлечения заявки очередь оказывается пустой, то диспетчер забывает про эту очередь. Если же в очереди что-то остается, то диспетчер возвращает эту очень в свой список, в самый его конец.


После чего диспетчер запускает обработку извлеченной заявки. А после завершения обработки проверяет свой список: если список не пуст, то обрабатывается следующий его элемент. Если пуст, то диспетчер засыпает.



Несколько агентов с одной очередью заявок и FIFO для агентов


Для большей гибкости в so5_custom_queue_disps есть возможность связать нескольких агентов с одной общей очередью заявок. Например, если агенты должны использовать общую политику доставки сообщений (скажем, у них общие приоритеты).


Но можно к one_thread диспетчеру привязать и агентов, у каждого из которых будет своя собственная очередь заявок. Получится, что агенты исполняются на одном рабочем контексте, но заявки для них диспетчер будет брать из разных очередей.


И тут возможны сюрпризы с обеспечением FIFO для агентов.


Предположим, что мы используем простые очереди заявок. Без приоритетов, без ограничения на размер, без выбрасывания чего-либо. Просто очереди.


Предположим, что агенты Alice и Bob разделяют одну общую очередь. И мы отсылаем сообщение M1 агенту Alice, а затем отсылаем сообщение M2 агенту Bob. Доставка сообщений до агентов произойдет в таком же порядке: сперва M1, затем M2.


Но вот если агенты Alice и Bob имеют независимые очереди заявок, то при отсылке вначале M1 агенту Alice, а затем M2 агенту Bob, порядок доставки может быть любым. Это зависит и от наполнения очередей заявок самих агентов, и от того, где эти очереди находились в списке непустых очередей диспетчера. Так что может случится и так, что M2 будет обработано агентом Bob еще до того, как M1 дойдет до Alice.


Эту особенность нужно иметь в виду при работе с таким диспетчером.


Как описанный выше подход выглядит для пользователя на практике?


Прежде чем перейти к рассмотрению деталей реализации custom_queue_disps::one_thread-диспетчера посмотрим на то, как выглядит использование этого диспетчера в коде:


so_5::launch( [](so_5::environment_t & env) {   env.introduce_coop( [](so_5::coop_t & coop) {      // (1)      auto queue = std::make_shared<dynamic_per_agent_priorities_t>();      // (2)      auto binder = custom_queue_disps::one_thread::make_dispatcher(            coop.environment() ).binder( queue );      // (3)      auto * alice = coop.make_agent_with_binder<demo_agent_t>(            binder, "Alice" );      auto * bob = coop.make_agent_with_binder<demo_agent_t>(            binder, "Bob" );      // (4)      queue->define_priority( alice,            typeid(demo_agent_t::hello),            dynamic_per_agent_priorities_t::low );      queue->define_priority( alice,            typeid(demo_agent_t::bye),            dynamic_per_agent_priorities_t::high );      queue->define_priority( bob,            typeid(demo_agent_t::hello),            dynamic_per_agent_priorities_t::high );      queue->define_priority( bob,            typeid(demo_agent_t::bye),            dynamic_per_agent_priorities_t::low );   } );} );

Здесь в точке (1) мы создаем отдельную очередь заявок, которая реализует доставку сообщений с учетом получателя сообщения и приоритета этого сообщения именно для этого получателя (класс dynamic_per_agent_priorities_t в деталях рассматривается ниже).


В точке (2) мы делаем сразу два действия:


  • во-первых, создаем новый экземпляр диспетчера;
  • во-вторых, получаем от этого диспетчера объект disp_binder, который нам нужен чтобы привязать агентов к этому диспетчеру. Этот disp_binder знает, что агенты, которых он привязывает к диспетчеру, будут совместно использовать очередь заявок, созданную в точке (1).

В точке (3) мы создаем двух агентов, каждый из которых будет привязан к one_thread-диспетчеру, созданному в точке (2).


В точке (4) определяются приоритеты для сообщений, которые обрабатываются агентами Alice и Bob. Можно увидеть, что одним и тем же сообщениям назначаются разные приоритеты. Соответственно, даже если сообщения отсылаются агентам единовременно, обрабатываться сообщения будут в разном порядке. В соответствии со своими приоритетами. А это как раз то, что нам и нужно.


Некоторые пояснения касательно деталей реализации so5_custom_queue_disps


Базовый класс demand_queue_t и его наследники


Итак, нам нужно иметь возможность привязать к диспетчеру агентов, использующих совершенно разные очереди заявок. Поэтому, с одной стороны, диспетчер должен уметь работать с непохожими друг на друга очередями. И, с другой стороны, пользователь должен иметь возможность без проблем подсунуть диспетчеру свою реализацию очереди.


Для этого предназначен интерфейс demand_queue_t.


Публичная часть demand_queue_t


Публичная часть demand_queue_t выглядит так:


      [[nodiscard]]      virtual bool      empty() const noexcept = 0;      [[nodiscard]]      virtual std::optional<so_5::execution_demand_t>      try_extract() noexcept = 0;      virtual void      push( so_5::execution_demand_t demand ) = 0;

Это как раз те методы, которые пользователь должен переопределить в своем классе очереди заявок для того, чтобы диспетчер с этой очередью заявок смог работать.


Надеюсь, что смысл методов empty() и push() очевиден, поэтому на них останавливаться не буду (если что, то отвечу на вопросы в комментариях). А вот по поводу возврата std::optional из try_extract() можно сказать пару слов.


Может показаться, что если диспетчер обращается к очереди заявок только когда очередь не пуста, то достаточно иметь только один метод extract(), который возвращает первую заявку из очереди (конструктуры/операторы копирования/перемещения для execution_demand_t не бросают исключений, поэтому можно обойтись одним extract() вместо пары front()+pop()).


Но в таком случае мы теряем некоторую долю гибкости. Скажем, возможность игнорировать заявки, которые ждали в очереди слишком долго. Тогда как возврат std::optional дает нам такую возможность. Поэтому диспетчер ожидает следующего поведения от очереди заявок:


  • если empty() возвращает false, то диспетчер может безопасно вызывать try_extract();
  • try_extract() может вернуть пустой std::optional даже если очередь была непустой. Это всего лишь означает, что актуальной заявки для обработки на самом деле не оказалось. Поэтому диспетчер просто идет дальше.

thread-safety для demand_queue_t


Диспетчер вызывает методы empty/try_extract/push только под своим собственным mutex-ом. Поэтому, если очередь заявок модифицирует свое состояние только в этих методах, то об обеспечении thread-safety можно не беспокоится, она обеспечивается диспетчером автоматически.


Однако, если внутри empty/try_extract/push требуется доступ к информации, которая каким-то образом может модифицироваться еще какими-то методами, то тогда ответственность за обеспечение thread-safety для этой дополнительной информации ложится на разработчика очереди заявок. Пример этого мы увидим ниже, когда будем говорить о классе dynamic_per_agent_priorities_t.


Специальные приоритеты для заявок evt_start и evt_finish


Если реализация очереди заявок выполняет переупорядочивание заявок согласно приоритетам или если реализация очереди заявок выбрасывает какие-то заявки из очереди, то следует учесть факт существования двух типов заявок: evt_start и evt_finish.


Заявка evt_start ставится в очередь заявок самой первой, прямо во время привязки агента к диспетчеру. Именно благодаря этой заявке у агента вызывается виртуальный метод so_evt_start.


Заявка evt_finish является самой последней заявкой для агента. Происходит это в процессе дерегистрации агента. И evt_finish должна быть самой последней заявкой, которая для агента будет выполнена. Никакие другие заявки после обработки evt_finish для агента обрабатываться не должны.


Так вот, поскольку это архиважные заявки для жизненного цикла агента, то выбрасывать их категорически нельзя. Это должно быть учтено при реализации очереди заявок.


Если заявки упорядочиваются в очереди согласно приоритетам, то у заявки evt_start должен быть наивысший приоритет. А у заявки evt_finish самый низкий.


В SObjectizer 5.5, 5.6 и 5.7 заявки evt_start и evt_finish можно различить не по execution_demand_t::m_msg_type, как для обычных заявок. А по execution_demand_t::m_demand_handler. Ниже будет показано, как именно это происходит. Возможно, в SObjectizer-5.8 будет применен единообразный подход и все будет идентифицироваться посредством execution_demand_t::m_msg_type. Но в ближайших планах ветки 5.8 нет от слова совсем :)


Написание собственного demand_queue_t


В so5_custom_queue_disps_demo есть три реализации собственных очередей заявок, начиная от самой тривиальной до более-менее сложной. В этой статье мы рассмотрим два крайних случая.


Простейший случай: simple_fifo_t


Простейшая реализация использует обычную FIFO очередь без всяких изысков. Поэтому ее реализация тривиальна и компактна:


class simple_fifo_t final : public custom_queue_disps::demand_queue_t   {      std::queue< so_5::execution_demand_t > m_queue;   public:      simple_fifo_t() = default;      [[nodiscard]]      bool      empty() const noexcept override { return m_queue.empty(); }      [[nodiscard]]      std::optional<so_5::execution_demand_t>      try_extract() noexcept override         {            std::optional<so_5::execution_demand_t> result{               std::move(m_queue.front())            };            m_queue.pop();            return result;         }      void      push( so_5::execution_demand_t demand ) override         {            m_queue.push( std::move(demand) );         }   };

Поскольку мы обеспечиваем строгий FIFO и ничего не выбрасываем, то нам здесь даже не нужно задумываться о существовании заявок evt_start/evt_finish.


Наиболее сложный случай: dynamic_per_agent_priorities_t


Наиболее сложная реализация использует динамически задаваемые приоритеты, которые назначаются агенту в зависимости от типа получаемого сообщения.


Для того, чтобы хранить описания заданных пользователем приоритетов потребуются следующие типы данных и члены класса dynamic_per_agent_priorities_t:


using type_to_prio_map_t = std::map< std::type_index, priority_t >;using agent_to_prio_map_t =            std::map< so_5::agent_t *, type_to_prio_map_t >;std::mutex m_prio_map_lock;agent_to_prio_map_t m_agent_prios;

Можно увидеть std::mutex который и будет задействован для обеспечения thread-safety при работе с m_agent_prios.


Здесь нам нужно использовать очередь с приоритетами. И, дабы воспользоваться std::priority_queue из стандартной библиотеки, мы будем хранить в очереди не so_5::execution_demand_t, а собственный класс:


struct actual_demand_t   {      so_5::execution_demand_t m_demand;      priority_t m_priority;      actual_demand_t(         so_5::execution_demand_t demand,         priority_t priority )         :  m_demand{ std::move(demand) }         ,  m_priority{ priority }         {}      [[nodiscard]]      bool      operator<( const actual_demand_t & o ) const noexcept         {            return m_priority < o.m_priority;         }   };

Далее, для реализации метода push(), в котором новая заявка должна встать в очередь согласно своему приоритету, нам потребуется определить тип заявки, кому она адресуется и какой из всего этого получается приоритет:


[[nodiscard]]priority_thandle_new_demand_priority( const so_5::execution_demand_t & d ) noexcept   {      if( so_5::agent_t::get_demand_handler_on_start_ptr()            == d.m_demand_handler )         return highest;      if( so_5::agent_t::get_demand_handler_on_finish_ptr()            == d.m_demand_handler )         {            std::lock_guard< std::mutex > lock{ m_prio_map_lock };            m_agent_prios.erase( d.m_receiver );            return lowest;         }      {         std::lock_guard< std::mutex > lock{ m_prio_map_lock };         auto it_agent = m_agent_prios.find( d.m_receiver );         if( it_agent != m_agent_prios.end() )            {               auto it_msg = it_agent->second.find( d.m_msg_type );               if( it_msg != it_agent->second.end() )                  return it_msg->second;            }      }      return normal;   }

Сперва мы проверяем, является ли заявка заявкой типа evt_start. Если да, то ей присваивается наивысший приоритет.


Далее такая же проверка делается для заявки типа evt_finish. Но если пришла заявка evt_finish, то кроме назначения ей наименьшего приоритета мы делаем дополнительное действие: удаляем информацию о приоритетах для этого агента. Т.к. эта информация больше не потребуется, других заявок уже не будет.


Также в коде метода handle_new_demand_priority() можно обратить внимание на захват mutex-а в тех местах, где нам требуется модифицировать информацию о приоритетах. Это необходимо делать, т.к. эта информация модифицируется/используется не только при работе push(), но и при работе метода define_priority() о котором диспетчер не знает.


Вот, собственно, и все особенности. Остальная часть тривиальна:


voiddefine_priority(   so_5::agent_t * receiver,   std::type_index msg_type,   priority_t priority )   {      std::lock_guard< std::mutex > lock{ m_prio_map_lock };      m_agent_prios[ receiver ][ msg_type ] = priority;   }[[nodiscard]]boolempty() const noexcept override { return m_queue.empty(); }[[nodiscard]]std::optional<so_5::execution_demand_t>try_extract() noexcept override   {      std::optional<so_5::execution_demand_t> result{         m_queue.top().m_demand      };      m_queue.pop();      return result;   }voidpush( so_5::execution_demand_t demand ) override   {      const auto prio = handle_new_demand_priority( demand );      m_queue.emplace( std::move(demand), prio );   }

Приватная часть demand_queue_t


Кроме описанной выше публичной части demand_queue_t есть еще и "приватная" часть, которая предназначена для использования только со стороны диспетчера:


class demand_queue_t   {      demand_queue_t * m_next{ nullptr };   public:      [[nodiscard]]      demand_queue_t *      next() const noexcept { return m_next; }      void      set_next( demand_queue_t * q ) noexcept { m_next = q; }      void      drop_next() noexcept { set_next( nullptr ); }

Эта часть нужна для того, чтобы диспетчер мог провязывать непустые очереди заявок в интрузивный односвязный список.


dispatcher_handle. Что это и зачем?


После того, как с очередями разобрались, можно перейти к самому диспетчеру. И первое, с чем мы сталкиваемся, так это с тем, что в SObjectizer 5.6 и 5.7 нет никакого явного интерфейса для диспетчера, как это было в SObjectizer 5.5 и более ранних версиях. Т.е. реализовать диспетчер можно как угодно и в виде чего угодно (в SO-5.5 же диспетчер должен был наследоваться от специального класса dispatcher_t).


В SO-5.6/5.7 пользователь взаимодействует с диспетчерами посредством двух сущностей: dispatcher_handle и disp_binder. При disp_binder мы поговорим ниже, а пока рассмотрим dispatcher_handle.


За создание экземпляра диспетчера в SO-5.6/5.7 обычно отвечает функция-фабрика make_dispatcher(). Эта функция должна что-то возвратить, но что, если для диспетчера в современном SObjectizer-е нет никакого C++ного интерфейса?


А вот некий дескриптор/хэндл и возвращается. Именно это и называется dispatcher_handle.


Dispatcher_handle может рассматриваться как shared_ptr для какого-то неизвестного пользователю типа. И работать dispatcher_handle должен именно как shared_ptr: пока есть хотя бы один непустой dispatcher_handle, ссылающийся на экземпляр диспетчера, этот экземпляр будет существовать и будет работать.


Обычно у dispacher_handle есть публичный метод binder() который создает экземпляр disp_binder-а для этого диспетчера. Возможно, у dispatcher_handle есть и другие методы, специфические для конкретного типа диспетчера. Но обычно это binder() и несколько методов, делающих dispatcher_handle похожим на shared_ptr.


В рассматриваемой реализации у dispatcher_handler минималистичный интерфейс:


namespace impl{class dispatcher_t;using dispatcher_shptr_t = std::shared_ptr< dispatcher_t >;class dispatcher_handle_maker_t;} /* namespace impl */class [[nodiscard]] dispatcher_handle_t   {      friend class impl::dispatcher_handle_maker_t;      impl::dispatcher_shptr_t m_disp;      dispatcher_handle_t( impl::dispatcher_shptr_t disp );      [[nodiscard]]      bool      empty() const noexcept;   public :      dispatcher_handle_t() noexcept = default;      [[nodiscard]]      so_5::disp_binder_shptr_t      binder( demand_queue_shptr_t demand_queue ) const;      [[nodiscard]]      operator bool() const noexcept { return !empty(); }      [[nodiscard]]      bool      operator!() const noexcept { return empty(); }      void      reset() noexcept;   };

Функция-фабрика make_dispatcher() для нашего one_thread-диспетчера будет возвращать экземпляр dispatcher_handler именно этого типа.


disp_binder. Что это и что нам нужно от disp_binder для one_thread-диспетчера?


Выше мы уже несколько раз упоминали привязку агентов к диспетчерам. Теперь же настало время поговорить об это в подробностях.


И вот тут в дело вступают специальные объекты под названием disp_binder-ы. Они служат как раз для того, чтобы привязать агента к диспетчеру при регистрации кооперации с агентом. А также для того, чтобы отвязать агента от диспетчера при дерегистрации кооперации.



В SObjectizer определен интерфейс, который должны поддерживать все disp_binder-ы. Конкретные же реализации disp_binder-ов зависят от конкретного типа диспетчера. И каждый диспетчер реализует свои собственные disp_binder-ы.


Начиная с версии 5.6 интерфейс этот выглядит следующим образом:


class disp_binder_t   : private std::enable_shared_from_this< disp_binder_t >{   public:      disp_binder_t() = default;      virtual ~disp_binder_t() noexcept = default;      virtual void      preallocate_resources( agent_t & agent ) = 0;      virtual void      undo_preallocation( agent_t & agent ) noexcept = 0;      virtual void      bind( agent_t & agent ) noexcept = 0;      virtual void      unbind( agent_t & agent ) noexcept = 0;};

Три первых метода, preallocate_resources(), undo_preallocation() и bind() используются при привязке агента к диспетчеру.


Их три поскольку регистрация агента это достаточно сложный процесс, который SObjectizer пытается провести в транзакционной манере. Т.е. либо все агенты кооперации успешно регистрируются, либо не регистрируется не один из них. Для обеспечения этой транзакционности процесс регистрации разбит на несколько стадий.


На первой стадии SObjectizer пытается выделить все ресурсы, необходимые для агентов из новой кооперации. Например, какие-то диспетчеры должны создать для новых агентов новые рабочие нити. Как раз на этой стадии у disp_binder-а вызывается preallocate_resources(). В этом методе disp_binder должен создать все, что агенту потребуется для работы внутри SObjectizer (например, новая рабочая нить, очередь заявок и т.д.). Если все это создалось нормально, то disp_binder должен сохранить это у себя до тех пор, пока у него не вызовут метод bind() для этого же агента.


На стадии резервирования ресурсов могут возникнуть проблемы. Например, не запустится новая рабочая нить. Или не хватит памяти для создания еще одной очереди заявок.


При возникновении подобных проблем все, что было сделано до этого момента, нужно откатить. Для чего и предназначен метод undo_preallocation(). Если у disp_binder-а вызывается метод undo_preallocation(), то disp_binder должен освободить все ресурсы для агента, которые ранее были зарезервированы в preallocate_resources().


Если же стадия резервирования ресурсов завершилась успешно, то выполняется стадия собственно привязки агентов к диспетчерам. И вот здесь уже у disp_binder-а вызывается метод bind(). В этом методе disp_binder обязательно должен вызывать у привязываемого агента метод so_bind_to_dispatcher().


Метод bind() не случайно помечен как noexcept, т.к. на этой стадии исключений SObjectizer не ожидает (а если таковое возникнет, то восстановиться уже не получится).


Метод unbind(), очевидно, используется уже когда кооперация дерегистрируется и агент завершил все свои активности на рабочем контексте (включая и обработку evt_finish). Так что в unbind() disp_binder должен освободить все ресурсы, которые были выделены для агента в preallocate_resources().


Возможно звучит все это сложновато. Но в данном случае реализация disp_binder-а оказывается очень простой:


class actual_disp_binder_t final : public so_5::disp_binder_t   {      actual_event_queue_t m_event_queue;   public:      actual_disp_binder_t(         demand_queue_shptr_t demand_queue,         dispatcher_data_shptr_t disp_data ) noexcept         :  m_event_queue{ std::move(demand_queue), std::move(disp_data) }         {}      void      preallocate_resources(         so_5::agent_t & /*agent*/ ) override         {}      void      undo_preallocation(         so_5::agent_t & /*agent*/ ) noexcept override         {}      void      bind(         so_5::agent_t & agent ) noexcept override         {            agent.so_bind_to_dispatcher( m_event_queue );         }      void      unbind(         so_5::agent_t & /*agent*/ ) noexcept override         {}   };

Все, что данный disp_binder должен сделать это вызывать so_bind_to_dispatcher(). Никакого резервирования ресурсов выполнять не нужно. Т.к. единственный ресурс это экземпляр actual_event_queue_t, который автоматически создается вместе с disp_binder-ом.


one_thread-диспетчер


Рассматриваемый нами one_thread-диспетчер состоит из трех частей.


Во-первых, это структура dispatcher_data_t, которая хранит необходимые диспетчеру данные:


struct dispatcher_data_t   {      std::mutex m_lock;      std::condition_variable m_wakeup_cv;      bool m_shutdown{ false };      demand_queue_t * m_head{ nullptr };      demand_queue_t * m_tail{ nullptr };   };using dispatcher_data_shptr_t =      std::shared_ptr< dispatcher_data_t >;

Эти данные выделены в отдельную структуру для того, чтобы к ним можно было напрямую обращаться из event_queue, которая будет отвечать за сохранение заявок агентов в очереди заявок и за добавление ссылки на непустую очередь заявок в список диспетчерах.


Так что вторая часть диспетчера это реализация интерфейса event_queue_t для one_thread-диспетчера:


class actual_event_queue_t final : public so_5::event_queue_t   {      demand_queue_shptr_t m_demand_queue;      dispatcher_data_shptr_t m_disp_data;   public:      actual_event_queue_t(         demand_queue_shptr_t demand_queue,         dispatcher_data_shptr_t disp_data ) noexcept         :  m_demand_queue{ std::move(demand_queue) }         ,  m_disp_data{ std::move(disp_data) }         {}      void      push( so_5::execution_demand_t demand ) override         {            std::lock_guard< std::mutex > lock{ m_disp_data->m_lock };            auto & q = *m_demand_queue;            const bool queue_was_empty = q.empty();            q.push( std::move(demand) );            if( queue_was_empty )               {                  // В этом блоке кода исключений быть не должно.                  [&]() noexcept {                     const bool disp_was_sleeping =                           (nullptr == m_disp_data->m_head);                     if( disp_was_sleeping )                        {                           m_disp_data->m_head = m_disp_data->m_tail = &q;                           m_disp_data->m_wakeup_cv.notify_one();                        }                     else                        {                           m_disp_data->m_tail->set_next( &q );                           m_disp_data->m_tail = &q;                        }                  }();               }         }   };

Именно экземпляр такой event_queue и хранится внутри описанного выше disp_binder-а.


Тут нужно отметить, что в actual_event_queue_t хранится два умных указателя. Один на demand_queue, второй на экземпляр dispatcher_data_t. Тем самым контролируется время жизни этих сущностей. Т.е. и demand_queue, и dispatcher_data_t живут до тех пор, пока есть хотя бы один actual_event_queue_t. А поскольку actual_event_queue_t является частью disp_binder-а, то время жизни demand_queue и dispatcher_data_t определяется временем жизни disp_binder-ов. Когда все disp_binder-у исчезнут, пропадет и надобность в demand_queue и dispatcher_data_t.


Третья часть диспетчера это собственно dispatcher_t, который запускает единственную рабочую нить и использует ее для обработки заявок. Полный код класса dispatcher_t можно увидеть в репозитории. Здесь же мы посмотрим лишь на два фрагмента.


Первый фрагмент это основная функция рабочей нити и обслуживание заявок из непустых demand_queue:


class dispatcher_t final   :  public std::enable_shared_from_this< dispatcher_t >   {      dispatcher_data_t m_disp_data;      std::thread m_worker_thread;      void      thread_body() noexcept         {            const auto thread_id = so_5::query_current_thread_id();            bool shutdown_initiated{ false };            while( !shutdown_initiated )               {                  std::unique_lock< std::mutex > lock{ m_disp_data.m_lock };                  shutdown_initiated = try_extract_and_execute_one_demand(                        thread_id,                        std::move(lock) );               }         }      [[nodiscard]]      bool      try_extract_and_execute_one_demand(         so_5::current_thread_id_t thread_id,         std::unique_lock< std::mutex > unique_lock ) noexcept         {            do               {                  auto [demand, has_non_empty_queues] =                        try_extract_demand_to_execute();                  if( demand )                     {                        unique_lock.unlock();                        demand->call_handler( thread_id );                        break;                     }                  else if( !has_non_empty_queues )                     {                        m_disp_data.m_wakeup_cv.wait( unique_lock );                     }               }            while( !m_disp_data.m_shutdown );            return m_disp_data.m_shutdown;         }      [[nodiscard]]      std::tuple< std::optional< so_5::execution_demand_t >, bool >      try_extract_demand_to_execute() noexcept         {            std::optional< so_5::execution_demand_t > result;            bool has_non_empty_queues{ false };            if( !m_disp_data.m_head )               return { result, has_non_empty_queues };            auto * dq = m_disp_data.m_head;            m_disp_data.m_head = dq->next();            dq->drop_next();            if( !m_disp_data.m_head )               m_disp_data.m_tail = nullptr;            else               has_non_empty_queues = true;            result = dq->try_extract();            if( !dq->empty() )               {                  if( m_disp_data.m_tail )                     m_disp_data.m_tail->set_next( dq );                  else                     m_disp_data.m_head = m_disp_data.m_tail = dq;               }            return { result, has_non_empty_queues };         }

Второй фрагмент это реализация метода make_disp_binder:


[[nodiscard]]so_5::disp_binder_shptr_tmake_disp_binder(   demand_queue_shptr_t demand_queue )   {      return std::make_shared< actual_disp_binder_t >(            std::move(demand_queue),            dispatcher_data_shptr_t{ shared_from_this(), &m_disp_data } );   }

Важный момент, который стоит здесь пояснить это факт того, что экземпляр dispatcher_data_t хранится в dispatcher_t по значению. Но в методе make_disp_binder в конструктор actual_disp_binder_t передается shared_ptr<dispatcher_data_t>. Тут всего лишь используется трюк c aliasing constructor для std::shared_ptr: хранить shared_ptr будет указатель на объект T, но вот счетчик ссылок будет использоваться от объекта Y. В нашем случае счетчик ссылок от dispatcher_t.


Вот, собственно, и все. Если кто-то хочет взглянуть на реализацию dispatcher_handler и make_dispatcher, то сделать это можно здесь.


Заключение


Данная статья преследует две цели:


Во-первых, хочется показать, что хоть в самом SObjectizer-е приоритетной доставки сообщений нет, но если это кому-то нужно, то это можно сделать самостоятельно. Например, показанным выше способом.


Во-вторых, хочется понять, насколько вообще может быть востребованна подобная функциональность. Если у тех, кто пробовал SObjectizer или же присматривался к SObjectizer-у, время от времени надобность в приоритетной доставке сообщений возникает, то ее можно добавить в so5extra. Или даже в сам SObjectizer. Мы вполне можем это сделать. Но только если такая функциональность действительно востребована.


Так что если у кого-то из читателей есть мнение на этот счет (скажем, есть желание иметь подобный диспетчер в SObjectizer-е прямо "искаропки"), то можно высказаться в комментариях. Обязательно прислушаемся.


Кстати, SObjectizer-5 уже 10 лет


Разработка SObjectizer-5 началась осенью 2010-го года и продолжается до сих пор. Радует и удивляет. А если кому-то интересно что лично я думаю по этому поводу, то можно заглянуть сюда.


Но еще больше меня поражает то, что кто-то берет и использует наш SObjectizer для реализации серьезных проектов несмотря на то, что его делают никому неизвестные люди из белорусской глубинки и за спиной SObjectizer-а нет больших компаний с громкими именами и толстыми кошельками. Вот это удивляет по-настоящему. А еще вселяет в нас уверенность в том, что все это не зря. Так что большое спасибо всем, кто рискнул и выбрал SObjectizer. Если бы не вы, этого маленького юбилея не было бы.

Подробнее..

Sobjectizer Можно ли написать один обработчик сразу для нескольких типов сообщений? И если нет, то как быть?

16.02.2021 14:15:09 | Автор: admin

Статья написана по следам недавнего вопроса, который можно сформулировать следующим образом: "Можно ли в SObjectizer написать обработчик, который бы обрабатывал сразу нескольких типов сообщений?"

Вопрос интересный.

Автор вопроса любезно описал свой сценарий: ему нужно собирать изображения с множества промышленных камер, а затем эти изображения должны проходить по цепочке блоков обработки изображений. Используются разные типы камер, соответственно, каждое изображение имеет собственный формат и может иметь кучу сопутствующей специфической информации. Какие-то блоки обработки рассчитаны на работу только с изображениями определенного формата. Какие-то могут работать сразу с несколькими форматами. Какие-то блоки вообще безразличны к типу изображения (например, блок считает общее количество прошедших изображений).

Если изображения передаются в виде SObjectizer-овских сообщений, а блоками обработки являются SObjectizer-овские агенты, то можно ли сделать как-то так:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image_vendor_A & cmd) {}) // Изображение типа A.    .event([this](const image_vendor_B & cmd) {}) // Изображение типа B.    .event([this](any_other_image_type) {      // Отказываемся обрабатывать другие типы.      throw unsupported_image_type{};    });}void image_counter::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](any_image_type) { // Тип изображения не важен.      ++captured_images_;    });}

Итак, сценарий понятен. Давайте поговорим насколько он реализуем в SObjectizer.

Так можно ли в SObjectizer повесить один разработчик сразу на несколько типов сообщений?

Нет. Написать что-то вроде:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image_vendor_A & cmd) {...}) // Изображение типа A.    .event([this](const image_vendor_B & cmd) {...}) // Изображение типа B.    .event([this](any_other_image_type) {      // Отказываемся обрабатывать другие типы.      throw unsupported_image_type{};    });}

в текущем SObjectizer-5 нельзя. В принципе.

Во-первых, в SObjectizer-5 ключем для поиска обработчика сообщения является триплет из состояния агента, идентификатора почтового ящика (mbox-а) и типа сообщения.

Грубо говоря, когда есть вот такой агент:

class demo final : public so_5::agent_t {  so_5::state_t st_free{this};  so_5::state_t st_busy{this};    const so_5::mbox_t command_board_;  ...  void so_define_agent() override {    st_free // Подписки для состояния st_free.      .event([this](const some_msg &) {...})      .event(command_board_, [this](const report_status &) {...});        st_busy // Подписки для состояния st_busy.      .event(command_board_, [this](const report_status &) {...});  }  ...};

то информация о сделанных вso_define_agentподписках может быть представлена приблизительно такой картинкой (на самом деле там все несколько хитрее, но общая схема именно такая):

Таблица подписок агентов с ссылками на обработчики сообщенийТаблица подписок агентов с ссылками на обработчики сообщений

Соответственно, когда для агента приходит сообщение, то формируется триплет из текущего состояния агента, идентификатора mbox-а из которого сообщение пришло и типа самого сообщения. После чего в таблице подписок агента ищется вхождение этого триплета.

Если в таблице подписок триплет найден, то сообщение обрабатывается. Если нет, то отбрасывается (не совсем так, но для простоты будем считать что просто отбрасывается).

Поиск в большой таблице подписок по составному ключу хорош тем, что он позволяет эффективно обрабатывать большое количество подписок. Скажем, нужно агенту подписаться на одно сообщение из 100500 mbox-ов? Нет проблем. Будет большая таблица подписок с эффективным поиском обработчиков в ней.

Необработанные сообщения в SObjectizer-е просто выбрасываются. Нет специальных обработчиков для подобных сообщений, нет никаких mbox-ов, в которые бы подобные сообщения пересылались бы... Ничего подобного нет.

Корни этого решения уходят на десятилетия назад в буквальном смысле. Подобная логика была использована еще в предтече SObjectizer-а, проекте SCADA Objectizer, который создавался под нужды АСУТП в середине 1990-х. И в котором именно такая логика и нужна была: если агент не заинтересован в каком-то сообщении в своем текущем состоянии, то это сообщение безжалостно выбрасывается.

Эта логка отлично работала на протяжении 25 лет. И ситуаций, когда хотелось бы как-то обрабатывать проигнорированные сообщения за эти годы встречалось не очень много. А для случаев, когда в этом был смысл, в SObjectizer-5 были добавлены т.н. deadletter handler-ы.

Так что можно спорить о том, оправдан ли такой жесткий подход к проигнорированным сообщениям или нет. Но сейчас дела обстоят именно так. Так что если на конкретный тип сообщение нет подписки, то это сообщение просто выбрасывается.

Если нельзя, но очень нужно, то как?

Итак, SObjectizer не позволяет сделать так, чтобы какой-то агент из множества сообщений типа image_vendor_A, image_vendor_B,image_vendor_C,image_vendor_D и т.д. мог бы подписаться лишь на image_vendor_Aи image_vendor_B, а все остальные сообщения image_vendor_* обрабатывать каким-то одним обработчиком.

Но если нам нужна именно такая логика, то как же нам быть?

Пожалуй, единственный выход -- это иметь один тип сообщения для всех изображений.

Самый простой вариант, который лично мне сразу приходит в голову -- это использовать ООП-иерархии, т.е. ввести базовый тип для изображения:

class image_base {public:virtual ~image_base() = default;  ... // Какой-то набор общих для всех изображений методов.};

от которого уже будут наследоваться конкретные типы изображений:

class image_vendor_A : public image_base {...};class image_vendor_B : public image_base {...};class image_vendor_C : public image_base {...};

Затем вводится тип сообщения image в котором конкретный экземпляр сообщения передается по указателю (для простоты приведем в примере shared_ptr):

struct image {std::shared_ptr<image_base> image_;};

Соответственно, агенты будут подписываться на сообщение image, а уже с конкретным типом изображения они будут разбираться внутри обработчика тем или иным способом:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image & cmd) {      if(auto * p = dynamic_cast<image_vendor_A*>(cmd.image_.get())) {        ... // Изображение типа A.      }      else if(auto * p = dynamic_cast<image_vendor_B*>(cmd.image_.get())) {        ... // Изображение типа B.      }      else {        // Отказываемся обрабатывать другие типы.      throw unsupported_image_type{};      }    });}...void image_counter::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image &) { // Тип изображения не важен.      ++captured_images_;    });}

Конечно, вариант с ручными dynamic_cast-ами выглядит криво и в реальном коде, как по мне, лучше было бы использовать паттерн visitor. Но для иллюстрации мысли вполне сгодится.

Парочка вариаций на эту тему

Если кому-то не нравится иметь сообщение image, внутри которого будет лежать shared_ptr на экземпляр изображения, то можно воспользоваться одной из нижеописанных вариаций.

Использование std::variant

Когда список типов изображений конечен и зафиксирован на этапе компиляции, то в качестве типа сообщения можно задействовать std::variant:

class image_vendor_A {...};class image_vendor_B {...};class image_vendor_C {...};...class image_vendor_Last {...};using image = std::variant<  image_vendor_A,image_vendor_B,image_vendor_C,...  image_vendor_Last>;void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image & cmd) {      ... // Какой-то способ работы с std::variant.    });}...void image_counter::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image &) { // Тип изображения не важен.      ++captured_images_;    });}

Отсылка сообщений по базовому типу

Если же список типов изображений на этапе компиляции не зафиксирован (например, может расширяться со временем по мере реализации новых модулей), то можно использовать такой трюк, как отсылку сообщения по его базовому типу:

class image : public so_5::message_t { // Наследование от message_t важно.public:... // Какие-то общие для всех изображений свойства.};class image_vendor_A : public image {...};class image_vendor_B : public image {...};...// Сперва создаем экземпляр конкретного типа...so_5::message_holder_t<image_vendor_A> msg{  std::make_unique<image_vendor_A>(...)  };// ...а затем отсылаем его так, как будто его тип -- image.so_5::send<image>(std::move(msg));

Фокус здесь в том, что при отсылке в качестве типа сообщения фиксируется именно тот тип, который был задан в виде первого шаблонного параметра для so_5::send. Так, если мы пишем so_5::send<image>, то типом сообщения внутри SObjectizer-а будет считаться именно image, а не image_vendor_A.

Вот здесь можно найти полный код примера, который иллюстрирует этот трюк.

А есть ли надежда на появление такой фичи?

С одной стороны, ответ на этот вопрос прост и уже традиционен в последние несколько лет: в SObjectizer сейчас попадает лишь то, в чем возникла необходимость. Наполнение SObjectizer-а фичами просто "шоб було" уже давно закончилось.

Так что если это кому-то нужно, то следует нам об этом сказать. Крайне желательно с описанием сценария, в котором вам такая функциональность была бы полезна.

Но, с другой стороны, добавить подобную функциональность в SObjectizer не так-то просто. По крайней мере первые прикидки пока не подсказали каких-то хороших способов достижения цели.

Не сработало: явная отметка возможности сделать upcast для типа сообщений

Был сделан неудачный подход вот с такой идеей: пусть пользователя будет указывать для некоторых типов сообщений возможность upcasting-а к базовому типу.

Т.е. для примера с изображениями это могло бы выглядеть, например, так:

class image_base : public so_5::upcastable_message_root_t<image_base>{... // Какие-то общие для всех изображений свойства.};class image_vendor_A  : public so_5::upcastable_message_t<image_vendor_A, image_base>{...};class image_vendor_B  : public so_5::upcastable_message_t<image_vendor_B, image_base>{...};...

Когда к агенту прилетает сообщение, у которого в базовых классах есть so_5::upcastable_message<T>, то поиск обработчика выполняется по более сложной процедуре:

  1. Сперва ищется обработчик для актуального типа сообщения. Если найден, то он вызывается и цикл поиска обработчика прерывается.

  2. Если обработчик не найден, то берется тип, к которому можно сделать upcasting. Если такой тип есть, то идем к шагу 1. Если же иерархия upcastable-сообщений закончилась, но обработчик так и не найден, то цикл поиска обработчика прерывается.

При таком алгоритме поиска обработчиков если агент делает подписку вот так:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image_vendor_A & cmd) {... /* (1) */})    .event([this](const image_base &) {... /* (2) */});}

то для сообщения image_vendor_A будет найден обработчик (1), а для сообщения image_vendor_B будет найден обработчик сообщения (2).

Данный подход выглядит привлекательным и вроде бы он даже не несет очень уж больших накладных расходов при диспетчеризации сообщений (фактически, добавляется один дополнительный if в начале процедуры поиска обработчика). Более того, этот подход интересен еще и тем, что он (вроде бы) может использоваться и при работе с mchain-ами.

Но засада встретилась там, где не ждали.

На самом деле есть несколько таблиц подписки

Выше показывалась таблика подписок для агента. Но это не единственная таблица подписок, которая участвует в диспетчеризации сообщений.

Еще одна таблица, но более простая, хранится в mbox-е. Поэтому полная картина подписок выглядит как-то так:

Т.е. обычный mbox знает на какие типы сообщений у него есть подписчики. Например, на сообщение типа M1 подписаны агенты Alice и Bob, а на сообщение M2 -- только Bob.

Благодаря такой информации mbox знает, что когда в него отсылают сообщение M1, то это сообщение должно быть доставлено и Alice, и Bob-у. А вот когда отсылают сообщение M2, то оно доставляется только Bob-у.

Так вот, засада в том, что когда агент подписывается вот таким образом:

void some_image_processor::so_define_agent() {  so_subscribe(image_mbox_)    .event([this](const image_vendor_A & cmd) {... /* (1) */})    .event([this](const image_base &) {... /* (2) */});}

то у mbox-а image_mbox_ есть информация только о подписке на сообщение типа image_vendor_Aи на сообщение типа image_base. Подписок на сообщения других типов для этого агента у image_mbox_ нет.

Соответственно, если в image_mbox_ отсылается image_vendor_B, то это сообщение агенту вообще отправлено не будет.

И это более чем серьезная засада. Особенно с учетом того, что mbox-ы в SObjectizer-е предназначены в том числе и для того, чтобы разработчики могли создавать свои специфические mbox-ы под свои собственные нужды (примеры специализированных mbox-ов можно найти в so5extra). И заставить разработчиков принимать во внимание наличие сообщений, которые могут быть приведены к базовому типу... Как-то уж это все слишком уж. Слишком уж геморрно, что ли.

Что еще можно было бы попробовать: принудительный upcasting к базовому типу

Есть еще одна мысль, которая выглядит вполне себе реализуемой, но которая еще не проверялась на практике (и вряд ли будет в ближайшее время). Здесь так же используется наследование от специальных типов для того, чтобы SObjectizer знал откуда и куда можно делать upcasting:

class image_base : public so_5::upcastable_message_root_t<image_base>{... // Какие-то общие для всех изображений свойства.};class image_vendor_A  : public so_5::upcastable_message_t<image_vendor_A, image_base>{...};class image_vendor_B  : public so_5::upcastable_message_t<image_vendor_B, image_base>{...};...

При отсылке любого сообщения SObjectizer смотрит на то, можно ли для сообщения сделать upcasting. Так, если отсылается сообщение image_vendor_A, то SObjectizer уже в компайл-тайм понимает, что здесь возможен upcasting до типа image_base. Поэтому внутри send-а SObjectizer выполняет приведение к базовому типу и отсылает сообщение как имеющее тип image_base, а не image_vendor_A.

Далее, когда агент подписывается на сообщение какого-то типа, то SObjectizer опять смотрит на то, можно ли для этого сообщения делать upcasting. Если можно, то SObjectizer делает хитрую подписку: в таблицы подписки добавляется самый верхний тип, к которому можно делать upcasting.

Допустим, что агент infrared_image_processor подписывается на сообщение image_vendor_A иimage_vendor_Bиз почтового ящика incoming_images. SObjectizer понимает, что здесь возможен upcasting до image_base. Поэтому в почтовый ящик добавляется подписка для сообщения image_base, а не image_vendor_A или image_vendor_B. В таблицу подписок агента так же добавляется подписка только на image_base, но в этом случае подписывается не простой обработчик сообщения, а специальный:

Таблицы подписок в случае с принудительным upcasting-ом к базовому типуТаблицы подписок в случае с принудительным upcasting-ом к базовому типу

Этот специальный обработчик берет экземпляр поступившего сообщения и смотрит, относится ли оно к типу image_vendor_A (или производному от него). Если относится, то вызывает обработчик для image_vendor_A. Если же сообщение относится к типу image_vendor_B,то вызывается обработчик для image_vendor_B. Если же ни одно из условий не выполнилось, то сообщение игнорируется.

Причем все эти фокусы с upcasting-ом SObjectizer делает только в том случае, если сообщение это допускает (т.е. наследуется от so_5::upcastable_message_t<T>). Если же используются обычные сообщение, то никакой хитрой магии SObjectizer не делает.

Вместо заключения

Так уж получилось, что я занимаюсь SObjectizer-ом уже очень давно. А когда работаешь над одним и тем же долгое время, то рано или поздно ты оказываешься в ситуации, когда твои же проектные решения, но принятые много-много лет назад, обнаруживают свои последствия вот прямо здесь и сейчас.

Описанный в статье сценарий обработки сообщений невозможен в текущем варианте SObjectizer-а как раз из-за проектных решений, принятых мной сперва в 2002-ом году в SObjectizer-4, а затем и в 2010-ом году в SObjectizer-5...

А теперь важный вопрос, ради которого эта статья и была написана: нужна ли подобная функциональность кому-нибудь из попробовавших SObjectizer читателей? Следует ли внести ее в список хотелок для будущих версий SObjectizer-а?

PS. Возможно читатели, которые интересуются нашими открытыми проектами RESTinio и SObjectizer (+so5extra), уже знают, что мы вынуждены приостановить их развитие. К сожалению, целевое финансирование для этих открытых проектов найти не удалось. Поэтому мы постараемся поднакопить средства на заказных разработках, чтобы затем вернуться к работам над RESTinio/SObjectizer/so5extra. И если кому-то нужна помощь опытных разработчиков, то у нас как раз есть пара свободных рук. Не самых плохих ;)

Подробнее..

Project Loom Современная маcштабируемая многопоточность для платформы Java

19.02.2021 18:05:42 | Автор: admin


Эффективное использование многочисленных ядер современных процессоров сложная, но всё более важная задача. Java была одним из первых языков программирования со встроенной поддержкой concurrency. Ее concurrency-модель, основанная на нативных тредах, хорошо масштабируется для тысяч параллельно выполняющихся стримов, но оказывается слишком тяжеловесной для современного реактивного программирования с сотнями тысяч параллельных потоков.


Ответ на эту проблему Project Loom. Он определяет и реализует в Java новые легковесные параллельные примитивы.


Алан Бейтман, руководитель проекта OpenJDK Core Libraries Project, потратил большую часть последних лет на проектирование Loom таким образом, чтобы он естественно и органично вписывался в богатый набор существующих библиотек Java и парадигм программирования. Об этом он и рассказал на Joker 2020. Под катом запись с английскими и русскими субтитрами и перевод его доклада.



Меня зовут Алан Бейтман, я работаю в группе Java Platform в Oracle, преимущественно над OpenJDK. Сегодня я буду говорить о Project Loom.


Мы занялись этим проектом в конце 2017 года (точнее, технически в начале 2018-го). Он появился как проект в OpenJDK для того, чтобы упростить написание масштабируемых многопоточных приложений. Цель в том, чтобы позволить разработчикам писать масштабируемые многопоточные приложения в так называемом синхронном стиле. Это достигается путем доведения базовой единицы многопоточности потока до такой легковесности, чтобы им можно было представлять любую параллельную задачу. Даже задачи, которые блокируются или выполняются в течение длительного времени.


Кто-то говорит: чем больше несвязанных запросов поступает в серверное приложение, тем большей степени многопоточности можно достичь.


План выступления такой:


  1. Начну с пары слов о мотивации этого проекта.
  2. Поговорю о том, как мы имплементировали эти так называемые легкие потоки.
  3. Переключусь на IDE и покажу несколько демо, напишу немного кода.
  4. Наконец, рассмотрю другие аспекты проекта.

Потоки


Платформа Java (и язык, и JVM) во многом построена на концепции потоков:


  • Если вы сталкиваетесь с исключением, то получаете трассировку стека определенного потока.
  • Вы можете связать некоторые данные с потоками, используя ThreadLocal.
  • Если вы находитесь в отладчике и выполняете пошаговое выполнение кода, вы шагаете по выполнению потока. Когда вы нажимаете step over, это означает переход к следующей инструкции в потоке, с которым вы работаете.
  • А когда вы находитесь в профайлере, профайлеры обычно группируют данные по потокам, сообщают вам, какие потоки выполняются и что они делают.

В общем, всё, что касается платформы и инструментов, связано с потоками.


В Java API поток означает java.lang.Thread. В реализации JDK есть только одна реализация потока, которая фактически основана на потоке операционной системы. Между java.lang.Thread и потоком ОС существует связь один-к-одному. Те из вас, кто уже давно работает с платформой Java, могут вспомнить зелёные потоки в ранних выпусках JDK. Я немного расскажу об этом позже. Но по меньшей мере последние 20 лет, когда мы говорим о java.lang.Thread, мы говорим о тонкой оболочке вокруг потока ОС.


Cами потоки ОС ничего не знают о Java. Они очень общие. Обычно они должны поддерживать множество разных языков и сред выполнения. Они ничего не знают о том, как эти языки и программы используют стек, и им обычно приходится выделять очень большой фиксированный стек, чтобы поддерживать универсальность
программ, которые должны выполняться.


Другой аспект потоков операционной системы заключается в том, что они проходят через ядро ОС для переключения контекста, что обычно требует
значительное количество времени. В течение многих лет это было около микросекунды или больше.


Еще одна особенность потоков операционной системы заключается в том, что ядро ОС должно иметь некоторую форму планирования. Ему нужно выбрать, какой поток
запускать на каждом ядре процессора. Веб-серверы ведут себя совсем иначе, чем, например, поток с вычислениями для воспроизводения видео. Планировщик ОС должен быть очень общим и в некотором роде компромиссным, чтобы поддерживать все различные варианты использования.


Давайте немного поговорим об использовании потоков. Можно писать многопоточные приложения в стиле, который мы называем синхронным. Простой, императивный блокирующий синхронный код. Задачи выполняются в одном потоке от начала до конца. Код обычно очень легко читать и писать. Именно так большинство из нас, я думаю, учились его писать. Мы научились работать так с одним потоком до того, как узнали, что такое поток.


Этот синхронный стиль очень хорошо сочетается с дизайном языка Java. Он очень хорошо сочетается с инструментами. И в целом, как любит говорить мой коллега Рон Пресслер, гармонично сочетается с платформой. Но поскольку поток по сути представляет собой тонкую оболочку вокруг потока ОС, это ограниченный ресурс.


Если каждая транзакция или запрос использует поток, то максимальное количество потоков это максимальное количество транзакций, которые система может обрабатывать за раз. По сути, это наш уровень многопоточности.


А с современным сервером теоретически вы можете иметь миллионы сетевых подключений. Я видел, как Хайнц Кабуц делает демо с Project Loom, где он фактически использовал два миллиона соединений. И серверы могут поддерживать подобное, если у них достаточно памяти.


Итак, если у вас миллионы сетевых подключений, но только тысячи активных потоков, потоки становятся вашим ограничивающим фактором, если на каждое соединение приходится один поток. Так что они ограничивают наш уровень многопоточности. И это, безусловно, может сильно повлиять на пропускную способность.


Ладно, что нам с этим делать?


Если потоки дорогой ресурс, почему бы нам ими не делиться? Это означает пулы потоков. Вместо создания потока для каждого запроса или транзакции мы заимствуем поток из пула, выполняем транзакцию, а затем возвращаем его в пул.


Но у пулов много проблем. Думаю, большинство тех, кто их использовал, знает о проблеме того, как они засоряют ThreadLocals. Эта так называемая утечка памяти с пулами потоков существует долгое время. Также проблематична отмена: если я хочу отменить транзакцию, но она почти завершена, я, возможно, прерываю поток после того, как он был взят для другой транзакции.


И даже если бы мы решили приведенные проблемы, этого все равно недостаточно, потому что мы все еще выполняем транзакцию от начала до конца в одном потоке. Это занимает его на протяжении всей операции, а уровень многопоточности по-прежнему ограничен количеством потоков, которые может обработать ОС.


Мы можем заметить, что для большого количества запросов и транзакций,
которые не упираются в CPU, поток проводит большую часть своего времени в ожидании, в блокировке, ожидая базу данных, IO, что-то еще. А нам, на самом деле, не нужен поток, когда он ожидает.


Предположим, что мы использовали подход, при котором поток во время ожидания возвращается в пул. Это позволяет одновременно выполнять гораздо больше транзакций. Проблема в том, что многие API, блокирующие API, удерживают поток на протяжении какой-либо операции в ожидании блокировки, ожидании данных в сокете.


Это приводит нас к созданию новых API, по существу, несовместимых со старыми. Или в итоге у нас есть синхронные и асинхронные версии API.


Другой вопрос, что это вынуждает нас разделять транзакцию или запрос на мелкие кусочки, где разные части работают на разных потоках. Большая проблема с этим мы теряем всякую связь с потоками, контекст становится очень и очень трудно отслеживать. Например, если появилось исключение, которое было брошено для определенной части или определенного этапа транзакции, у меня нет общего контекста, где оно на самом деле было брошено.


Когда мы отлаживаем, это будет отладка лишь этапа транзакции. Если мы профилируем, то не увидим многого о том, что на самом деле происходит. Потому что многие транзакции выглядят ничего не делающими, они просто ждут IO. Профайлер, который смотрит на незанятый пул потоков, не особо много видит.


Я говорю здесь о том, что мы называем асинхронным стилем. Его преимущество в том, что он очень масштабируемый. Количество транзакций больше не ограничено количеством потоков. Но его трудно читать и иногда трудно поддерживать, потому что мы потеряли контекст.


Таким образом, в целом это позволяет нам лучше использовать аппаратные ресурсы, но наше приложение сложнее писать и поддерживать.


Что приводит нас к дилемме.



Разработчики могут написать простой синхронный код и потратить больше денег на оборудование. Или, если вы хотите эффективно использовать оборудование и управлять асинхронным приложением, тогда мы больше платим разработчикам.


Итак, как нам решить эту дилемму? Что, если бы мы могли снизить стоимость потоков и иметь их неограниченное количество? Тогда мы могли бы написать простой синхронный код, который гармонирует с платформой, полностью использует оборудование и масштабируется как асинхронный код. Project Loom именно об этом.


API


Давайте пойдем дальше и поговорим немного об API.


Если Project Loom снижает стоимость потоков, то как это будет отражаться на разработчиках и на API? Эта проблема сложнее, чем кажется на первый взгляд, и мы потратили более двух лет на борьбу с ней.


Один из вариантов, с которого мы начали и к которому в итоге вернулись, это
использование для легких форм потоков java.lang.Thread. Это старый API, который существует с JDK 1.0. Проблема в том, что у него много багажа. Там есть такие вещи, как группы потоков, загрузчик классов контекстов потоков. Есть множество полей и других API, которые связаны с потоками, которые просто не интересны.


Другой вариант начать все сначала и ввести совершенно новую
конструкцию или новый API. Если вы с самого начала интересовались Project Loom, возможно, вы видели некоторые из ранних прототипов, где мы представили для дешевых легких потоков совершенно новый API под названием fiber.


Помимо изучения API, мы также много изучали, как люди используют потоки. Оказалось, что одни их части используются очень широко, другие в меньшей степени. Thread.currentThread() используется и прямо и косвенно везде, например, для блокировки.


Вопрос, который часто возникает в викторинах: Сколько раз Thread.currentThread() используется при первом использовании популярной библиотеки логирования? Люди, не знающие ответа на этот вопрос, могут ответить 2 или 5. Правильный ответ 113.


Другой широко используемый аспект потока это ThreadLocals. Они используются везде, что иногда не радует. Если сломать Thread.currentThread() или ThreadLocals, то в контексте этих новых более дешевых потоков будет не запустить много уже существующего кода. Поэтому вначале, когда у нас был fiber API, нам пришлось эмулировать Thread API, чтобы существующий код запускался в контексте того, что называлось в то время fiber. Таким образом, мы могли уйти от кода, использующего Thread, не повредив нарыв.


Итак, .currentThread() и Threadlocals очень широко используются. Но в потоках есть и редко используемый багаж. И здесь нам немного помогает расширенная политика депрекации. Если некоторые из этих старых областей со временем могли бы исчезнуть, подвергувшись депрекации, окончательной депрекации и, в конечном итоге, удалению тогда, может быть, удастся жить с java.lang.Thread.


Два года исследований, около пяти прототипов и мы пришли к выводу, что избежать
гравитационного притяжения 25 лет существующего кода невозможно. Эти новые дешевые потоки будут представлены с существующим API java.lang.Thread. То есть java.lang.Thread будет представлять и потоки ОС, и новые дешевые потоки.


Мы также решили дать этим новым потокам имя. Оно появилось благодаря Брайану Гетцу, он придумал название виртуальный поток (virtual thread).


Использование привычного Thread хорошая новость для разработчиков. Нет новой модели программирования, нет новых концепций для изучения, вместо этого вам фактически придется отучиться от некоторых старых привычек. Когда я говорю отучиться, я имею в виду такие вещи, как пулы потоков, ThreadLocals и так далее.


Как реализованы эти виртуальные потоки?



Они мультиплексируются поверх небольшого пула потоков операционной системы. Я сказал потоки во множественном числе, и вот тут уместно вспомнить уже упомянутые green threads. Ранние выпуски JDK, особенно 1.0.1.1 с классической виртуальной машиной, поддерживали модель, где потоки мультиплексировались в один-единственный поток ОС. То, что мы делаем теперь, перекликается с этим, но сейчас речь о более чем одном потоке ОС.


Итак, у нас есть набор потоков, на которые эти виртуальные потоки мультиплексируются. Под капотом виртуальная машина HotSpot была обновлена для поддержки новой конструкции: scoped stackful one-shot delimited continuations. Виртуальные потоки объединяют континуации в HotSpot с планировщиками в библиотеке Java. Когда код, выполняющийся в виртуальном потоке, блокируется, скажем, в операции блокировки или в блокирующей IO-операции, соответствующая континуация приостанавливается, стек потока, на концептуальном уровне, вымещается в кучу Java, а планировщик выберет и возобновит другой виртуальный поток в этом же потоке ОС. Исходный виртуальный поток может быть возобновлен в том же потоке ОС или в другом.


Таким образом, код, в котором есть остановка и возобновление, может делать это в разных потоках ОС с течением времени. Есть небольшой набор потоков операционной системы, который обычно как минимум соответствует количеству ядер в системе. Они поддерживают выполнение многих виртуальных потоков, осуществляя прогресс кусочек за кусочком.


Пользовательский код, использующий API Java, не знает о распределении, которое
происходит под капотом, а yield и resume происходит глубоко в библиотеках JDK, поэтому мы говорим, что планирование является вытесняющим и не требует сотрудничества со стороны кода пользователя.


С точки зрения стоимости, поток ОС слева, виртуальный поток справа.



Обычно операционная система резервирует около мегабайта стека для потока операционной системы. Некоторые ядра выделяют дополнительные данные ядра, и 16КБ не редкость. Это то, что операционная система имеет на поток ОС. Кроме того, виртуальная машина HotSpot добавляет к этому пару КБ метаданных.


Виртуальные потоки намного дешевле, текущий прототип составляет около 256 байт на виртуальный поток. Еще есть стек, он уменьшается и увеличивается по мере необходимости и обычно составляет пару КБ в этом главное преимущество
виртуальных потоков перед потоками ОС.


Переключение задач также немного лучше, в типичных ОС оно составляет около микросекунды, в некоторых случаях может быть хуже. Лучший вариант с виртуальными потоками на данный момент работает лучше, около пары сотен наносекунд. Меньший размер и лучшее время переключения контекста означают, что мы можем иметь столько потоков, сколько захотим, и уровень многопоточности может
расти без ограничений.


Самое время перейти от слайдов к IDE и показать вам несколько примеров в коде.


Демо


У меня открыта IDE с пустым методом, и мы начнем с самого начала.


import ...public class Demo {    public static void main(String[] args) throws Exception {...}    void run() throws Exception {    }}

Я упомянул, что мы ввели новый фабричный метод, и начну с использования фабричного метода Thread.startVirtualThread().


import ...public class Demo {    public static void main(String[] args) throws Exception {...}    void run() throws Exception {        Thread thread = Thread.startVirtualThread(() -> System.out.println("hello"));        thread.join();    }}

Вывел сообщение hello, ничего особенного. Это немного отличается от использования конструкторов и метода start(), здесь всего лишь один фабричный метод.


Я изменю тело лямбда-выражения, просто выведу трассировку стека, чтобы вы могли видеть, что на самом деле происходит.


void run() throws Exception {    Thread thread = Thread.startVirtualThread(Thread::dumpStack);    thread.join();}

Этот референс-метод просто вызывает дамп стека в контексте виртуального потока.



Возможно, это выглядит немного иначе, чем то, что вы видели бы с обычным java.lang.Thread, потому что фреймы, которые вы видите здесь, не те, что вы видите в обычном JDK. Это своего рода эквивалент запуска потока, потому что виртуальный поток запускает континуацию. Это дает представление о том, в чем вы можете увидеть разницу.


Давайте рассмотрим еще один из аспектов API. Что делает этот startVirtualThread()?


В дополнение к введению фабричных методов для создания виртуальных потоков, текущий прототип имеет новый билдер для создания потоков. Создадим билдер с помощью метода Thread.builder(). Мы можем при этом вызвать несколько методов, позволяющих настроить поток: является ли он потоком-демоном, какое у него имя и некоторые другие аспекты.



В числе этих методов есть virtual(). Создание виртуального потока cо startVirtualThread(), было, по сути, тем же самым. Вот длинная форма того, что я сделал минуту назад:


void run() throws Exception {    Thread thread = Thread.builder().virtual().task(() -> {        System.out.println("hello");    }).start();    thread.join();    }}

Мы снова сделали то же самое многословнее, но теперь использовали билдер потоков. А он избавляет нас от того, чтобы сначала использовать конструктор для создания потоков, а затем вызывать setDaemon() или setName(). Это очень полезно.


Это хорошее улучшение API для тех, кто в конечном итоге использует Thread API напрямую. Запускаем и получаем то же, что и в случае с startVirtualThread().


Еще мы можем создать ThreadFactory.


void run() throws Exception {    ThreadFactory factory = Thread.builder().name(prefix:"worker-", start:0).factory();}

Это создает фабрику потоков она создает потоки, которые называют себя worker-0, worker-1, worker-2 и так далее. На самом деле worker это только начальный аффикс, который добавляется к префиксу. Это еще один полезный способ создания фабрик потоков.


Покажу вам простой пример того, что вы можете делать с билдером потоков, используя возможности, которые дают нам дешёвые виртуальные потоки.


Большинство людей фактически не используют Thread API напрямую. Начиная с JDK 5, они перешли на использование ThreadExecutor и других API из java.util.concurrent.


Я хочу показать вам использование одного из этих ThreadExecutor. Мы создадим множество потоков и покажем вам, что на самом деле происходит.


Я собираюсь создать ExecutorService executor:


try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {}

Этот фабричный метод для Executors создает виртуальные потоки. Обратите внимание, что здесь я использую try-with-resources. Одна из вещей, которые мы сделали в Loom, мы модернизировали ExecutorService для расширения AutoCloseable, чтобы вы могли использовать их с конструкцией try-with-resources.


Приятная вещь в том, что когда он закрыт, как на примере сверху, он гарантирует, что все задачи, которые являются потоками в этом случае, будет завершены до того, как завершится закрытие. На самом деле закрытие будет ждать, пока все задачи или потоки не будут завершены, что очень полезно. Этот тип Executor'а создает поток для каждой задачи, поэтому он сильно отличается от пулов потоков, которые вы обычно создаете с помощью фабричного класса Executors.


Давайте создадим здесь миллион потоков.


import ...  public class Demo {      public static void main(String[] args) throws Exception {...}      void run() throws Exception {          try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {              IntStream.range(0, 1_000_000).forEach(i -> {                  executor.submit(() -> { });              });          }      }      String fetch(String url) throws IOException {...}      void sleep(Duration duration) {...}  }

Я использую IntStream.range(), вместо цикла for. Это вызовет метод executor.submit() один миллион раз, он создаст миллион потоков, которые ничего не делают. Если это запустить, ничего интересного не произойдет Process finished with exit code 0.


Для наглядности давайте добавим счетчик, который будет обновляться каждым из потоков.


import ...public class Demo {    public static void main(String[] args) throws Exception {...}    void run() throws Exception {        AtomicInteger counter = new AtomicInteger();        try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {            IntStream.range(0, 1_000_000).forEach(i -> {                executor.submit(counter::incrementAndGet);            });        }        System.out.println(counter.get());    }    String fetch(String url) throws IOException {...}    void sleep(Duration duration) {...}  }

Мы создаем Executor, отправляем миллион задач, каждая из этих задач будет увеличивать счетчик, и когда все закончится, выводится значение счетчика. Если все работает правильно, как у нас, должен быть выведен миллион.


Отрабатывает быстро как видите, эти потоки очень дешевы в создании.


Давайте покажу вам, что еще мы можем делать с Executor'ами. У меня есть метод, который просто принимает байты из определенного URL-адреса, создает из него строку. Это не очень интересно разве что то, что это блокирующая операция.


String fetch(String url) throws IOExpection {    try (InputStream in = URI.create(url).toURL().openStream()) {        byte[] bytes = in.readAllBytes();        return new String(bytes, charsetName:"ISO-8859-1");    }}

Он создает сетевые подключения, HTTP-соединение будет ожидать результатов от сервера, мы просто воспользуемся этим как частью примера.


Давайте посмотрим вот на что:


void run() throws Exception {       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/");           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/en");           String first = executor.invokeAny(List.of(task1, task2));           System.out.println(first.length());       }     }

Вот одна задача, которая получает HTML-страницу с jokerconf.com. Я собираюсь создать вторую задачу, которая будет делать то же самое, но будет получать английскую версию страницы. Если кто-то говорит на двух языках, то он сможет читать и русскую и английскую страницу, это не имеет значения.


Мы используем executor.invokeAny() и даем ему две задачи.
ExecutorService имеет несколько комбинаторов, invokeAny(), invokeAll(), они существуют уже давно. Мы можем использовать их с виртуальными потоками.


В данном случае мы подставим в first результат, в зависимости от того, какая из этих задач будет выполнена первой.


Я запущу два виртуальных потока. Один из них получит первую страницу, другой вторую, в зависимости от того, что вернется первым, я получу результат в String first. Другой будет отменен (прерван). Запускаем и получаем результат: 200160, то есть одна из страниц размером 200 КБ.


Итак, что произошло: были созданы два потока, один выполнял блокирующую операцию получения данных с первого URL-адреса, другой со второго URL-адреса, и я получил то, что пришло первое. Если запущу еще пару раз, буду получать разные значения: одна из страниц всего 178 КБ, другая 200 КБ.


Это один из комбинаторов. На самом деле, я бы мог хотеть обе страницы и что-то с ними сделать, в этом случае я мог бы использовать invokeAll().


void run() throws Exception {       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/");           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/en");           executor.invokeAll(List.of(task1, task2)); List>Future>String>>                   .stream() Stream<Future<String>>                   .map(Future::join) Stream<String>                   .map(String::length) Stream<integer>                   .forEach(System.out.println);       }     }

Как видите, это не слишком интересно всё, что мы здесь делаем, это invokeAll(). Мы выполним обе задачи, они выполняются в разных потоках. InvokeAll() блокируется до тех пор, пока не будет доступен результат всех задач, потому что вы получаете здесь Future, которые гарантированно будут выполнены. Создаем поток, получаем результат, получаем длины, а затем просто выводим их. Получаем 200 КБ и 178 КБ. Вот что вы можете делать с ExecutorService.


Покажу вам еще кое-что. В рамках Loom мы немного поработали над CompletableFuture, чтобы вы могли делать подобное. Добавить задачу и получить CompletableFuture, а не Future, и тогда я смогу написать такой код:


void run() throws Exception {       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/");           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/en");           CompletableFuture<String> future1 = executor.submitTask(task1);           CompletableFuture<String> future2 = executor.submitTask(task2);           CompletableFuture.completed(future1, future2) Stream<CompletableFuture<String>>                   .map(Future::join) Stream<String>                   .map(String::length) Stream<integer>                   .forEach(System.out.println);       }     }

Я вызываю в CompletableFuture-метод под названием completed(). Это возвращает мне стрим, который заполняется Future в ленивом режиме по мере их завершения. Это намного интереснее, чем invokeAll(), который я показал ранее, поскольку метод не блокируется, пока не будут выполнены все задачи. Вместо этого поток заполняется результатом в ленивом режиме. Это похоже на стримо-подобную форму CompletionService, если вы когда-нибудь такое видели.


Я запущу несколько раз, порядок, вероятно, будет случайным. В любом случае, это дает вам представление о других вещах, которые вы можете делать с CompletableFuture, стримами и виртуальными потоками.


Еще одна вещь, которую я хочу сделать, забегая вперед. Мы еще поговорим об этом подробнее после демо. В прототипе есть ограничение. Виртуальные потоки делают то, что мы называем закреплением потока ОС, когда мы пытаемся выполнить IO-операции, удерживая монитор. Я объясню это лучше после демо, но пока у меня открыта IDE, покажу вам это на практике и объясню, на что это влияет.


import ...public class Demo {    public static void main(String[] args) throws Exception {...}    void run() throws Exception {        Thread.startVirtualThread(() ->            sleep(Duration.ofSeconds(2));        }).join();    }    String fetch(String url) throws IOException {...}    void sleep(Duration duration) {...}  }

Виртуальный выполняет блокирующую операцию, спит две секунды, завершается. Пока не слишком интересно. Теперь предположим, что он должен спать, пока держит монитор.


void run() throws Exception {        Thread.startVirtualThread(() -> {            Object lock = new Object();            synchronized (lock) {                sleep(Duration.ofSeconds(2));            }        }).join();}

Я запускаю это с диагностическим свойством, которое даст мне трассировку стека, когда поток закреплен.



Мы видим трассировку стека, которая говорит мне, что поток был закреплен. Она снабжена примечаниями, которые говорят мне, где удерживается монитор. Это ограничение в нашем текущем прототипе. По сути, потоки могут блокироваться, удерживая монитор. Поток закрепляет соответствующий поток ОС, эта скорее качество сервиса текущей реализации, чем ошибка. Я вернусь к тому, что мы делаем c этим, через несколько минут. Это своего рода базовое демо.


У меня есть более полная демонстрация, переключусь для этого на немного другой проект.


package demo;import ...@Path("/")public class SleepService {    @GET    @Path("sleep")    @Producers(MediaType.APPLICATION_JSON)    public String sleep(@QueryParam("millis") long millis) throws Exception {        Thread.sleep(millis);        return "{ \"millis\": \"" + millis + "\" };    }}

Уже существуют несколько серверов, которые работают с виртуальными потоками.
Есть сервер Helidon MP. Я думаю, MP означает MicroProfile. Helidon настроен, они недавно внесли некоторые изменения, теперь вы можете запустить его со свойством, при котором он будет запускать каждый запрос в отдельном виртуальном потоке. Мой код может выполнять операции блокировки, и они не будут закреплять базовый поток ОС. У меня может быть намного больше запросов, чем потоков, работающих одновременно и выполняющих блокирующие операции, это действительно очень полезно.


Первый сервис, который я вам покажу, что-то вроде эквивалента hello world при использовании подобных служб. Запускаем код из примера выше, переходим в окно терминала и вводим curl-команду.



Curl-команда кодирует параметр миллисекунд обратно в JSON, который возвращается.
Не слишком интересно, потому что все, что было сделано, это сон. Остановлю сервер и вставлю Thread.dumpStack():


public String sleep(@QueryParam("millis") long millis) throws Exception {    Thread.dumpStack();    Thread.sleep(millis);    return "{ \"millis\": \"" + millis + "\" };}

Снова запущу сервер. Я снова выполняю команду curl, которая устанавливает HTTP-соединение с сервером, она подключается к эндпоинту сна, параметр millis=100.


curl http://localhost:8081/sleep?millis=100


Посмотрим на вывод: печатается трассировка стека, созданная Thread.dumpStack() в сервисе.



Огромная трассировка стека, мы видим здесь кучу всего: код Helidon, код Weld, JAX-RS Довольно интересно просто увидеть это всё. Это сервер, который создает виртуальный поток для каждого запроса, что довольно интересно.


Теперь посмотрим на более сложный сервис. Я показал вам комбинаторы
invokeAny и involeAll в простом демо в самом начале, когда показывал новый ExecutorService.


import ...@Path("/")public class AggregatorServices {    @GET    @Path("anyOf")    @Produces(MediaType.APPLICATION_JSON)    public String anyOf(@QueryParam("left") String left,                        @QueryParam("right") String right) throws Exception {        if (left == null || right == null) {            throw new WebApplicationException(Response.Status.BAD_REQUEST);        }        try (var executor :ExecutorService = Executors.newVirtualThreadExecutor()) {            Callable<String> task1 = () -> query(left);            Callable<String> task2 = () -> query(right);            // return the first to succeed, cancel the other            return executor.invokeAny(List.of(task1, task2));        }    }    @GET    @Path("allOf")    @Produces(MediaType.APPLICATION_JSON)    public String allOf(@QueryParam("left") String left,                        @QueryParam("right") String right) throws Exception {        if (left == null || right == null) {            throw new WebApplicationException(Response.Status.BAD_REQUEST)        }        try (var executor :ExecutorService = Executors.newVirtualThreadExecutor()) {            Callable<String> task1 = () -> query(left);            Callable<String> task2 = () -> query(right);            // if one falls, the other is cancelled            return executor.invokeAll(List.of(task1, task2), cancelOnException: true) List<Future<String>>                    .stream() Stream<Future<String>>                    .map(Future::join) Stream<String>                    .collect(Collectors.joining(delimiter:", ", prefix:"{", suffix:" }"));        }    }    private String query(String endpoint) {...}}

Здесь у нас несколько сервисов, они находятся в этом исходном файле под названием AggregatorServices. Здесь есть две службы, два метода я бы сказал: anyOf и allOf. anyOf выполняет левый и правый запросы и выбирает тот, который возвращается первым, а другой отменяет.


Начнем с anyOf. Я вызвал curl-команду:


curl http://localhost:8081/anyOf?left=/greeting\&right=/sleep?millis=200

localhost:8081 это текущий порт, эндпоинт anyOf, и я дал ей два параметра left и right. Я выполняю это и получаю hello world:


{"message":"Hello World!"}$


Причина в том, что сервис приветствия просто выводит hello world, а сервис сна спит 200 мс. Я предполагаю, что большую часть времени hello world будет быстрее, чем 200 мс, и всегда будет возвращаться hello world.


Если я уменьшу сон до 1 мс, то, возможно, сервис сна завершится раньше, чем другой сервис.


Теперь давайте изменим запрос на allOf, который объединит два результата:


curl http://localhost:8081/allOf?left=/greeting\&right=/sleep?millis=1

Запускаю и получаю два результата.


{ {"message":"Hello World!"}, { "millis": "1" } }$


Что интересно в allOf, он делает два запроса параллельно.


private String query(String endpoint) {        URI uri = URI.create("http://localhost:8081").resolve(endpoint);        return ClientBuilder.newClient() Client                    .target(uri) WebTarget                    .request(MediaType.APPLICATION_JSON) Invocation.Builder                    .get(String.class);    }

Кстати, это блокирующий код. Он использует клиентский API JAX-RS для подключения к этому эндпойнту. Он использует вызов invokeAll(), а затем .stream (), .map для получения результата, а затем Collectors.joining(), для объединения в JSON.


Это простой пример разветвления. Интересно то, что тут invokeAll() это вариант, в котором есть параметр cancelOnException. Если вы хотите вызвать несколько задач одновременно, но если одна из них не работает, вы отменяете все остальные. Это важно сделать, чтобы не застрять в ожидании завершения всех остальных задач.


В этих примерах я использую сборку для раннего доступа Loom. Мы очень близки к первому рампдауну JDK 16, поэтому код, над которым я работаю это JDK 16, каков он сейчас, плюс вся реализация Loom.


Ограничения


Поговорим об ограничениях.


В настоящее время существует два сценария, в которых виртуальные потоки, пытающиеся уступить, не освобождают базовый поток ОС. В примере я показал, как можно получить трассировку стека, когда поток закреплен, что на самом деле очень полезно для диагностики проблем такого типа.


Первый сценарий возникает, когда вы используете нативные фреймы. Если у вас есть код JNI, вы вызываете код JNI, он обращается обратно в код Java, а затем этот код Java пытается заблокироваться или совершить IO-операцию. На континуации есть нативный фрейм, мы мало что можем сделать. Это потенциально постоянное ограничение, но подобное должно происходить очень и очень редко.


Второй случай более проблематичен, и это то, что я показал в демонстрации: попытка
парковки, удерживая монитор. Скорее всего, первая версия будет с этим ограничением, это особенность реализации, влияющая на качество сервиса. В настоящее время ведутся исследования по реимплементации мониторов, чтобы преодолеть это ограничение. Но это серьезный объем работы, на это потребуется время.


На самом деле это не очень критично. По той простой причине, что всё, что сегодня использует мониторы Java, можно механически преобразовать из использования synchronized и wait-notify в использование блокировок из java.util.concurrent. Так что существуют эквиваленты мониторов в блокировках java.util.concurrent и различные формы блокировок, самый простой из которых ReentrantLock, они очень хорошо работают с виртуальными потоками.


Что вы можете сделать при подготовке к Loom?


Если вам нравится Loom и вы заинтересованы в том, чтобы подготовить код для работы с Loom, есть пара вещей, о которых стоит подумать.


Предположим, у вас миллион потоков и код, который использует много ThreadLocals. Хотя виртуальные потоки поддерживают ThreadLocals, при их большом количестве требуется много памяти. Тут есть над чем подумать. Мы уже довольно давно работаем в JDK над устранением многих из ThreadLocals, которые использовались в различных местах.


Распространенным является кэширование объектов SimpleDateFormat.
SimpleDateFormat может быть дорогостоящим в создании, они не являются потокобезопасными, поэтому люди повсеместно кэшируют их в ThreadLocals.


В JDK мы заменили кэширование SimpleDateFormats на новый неизменяемый формат даты java.date dateformatter. Он неизменяем, вы можете сохранить его
в static final поле, это достаточно хорошо. Мы удалили ThreadLocals и из некоторых других мест.


Другая сложность сводится к масштабированию приложения и обработке десятков тысяч запросов. Если у вас много данных на запрос или на транзакцию, это может занимать много места. Если у вас миллион TCP-соединений, это миллион буферов сокетов. Если вы оборачиваете каждый из них в BufferedOutputStream, PrintStream или что-то в этом роде, это много памяти. Мы работали над подобными вещами и в JDK, но я уверен, что дальше по стеку у людей будет много данных на запрос или на транзакцию.


Третье, как я уже говорил, переход от мониторов к java.util.concurrent позволяет избежать краткосрочных проблем.


Я говорил в основном о виртуальном потоке как о потоке в коде, но давайте поговорим о нескольких других вещах.


Расскажу немного об отладчике.



При отладке действительно важно, чтобы при движении по шагам, вы
работали в каком-то контексте. Обычно отладчики Java (в IntelliJ, NetBeans, Eclipse) используют интерфейс отладчика под названием JDI, где под капотом находится wire protocol, а в виртуальной машине есть интерфейс инструментов, называемый JVM Tool Interface или JVM TI, как мы его иногда называем. Это все необходимо обновить, чтобы иметь возможность поддерживать виртуальные потоки.


Оказывается, это значительный объем работы. Отладчики занимаются приостановкой и возобновлением, они фактически перечисляют потоки. Тут возникают сложности с масштабируемостью при переходе от тысяч потоков к сотням тысяч или
миллионам потоков.


Кроме того, наш подход к поддержке подобного в отладчиках оказался сбивающим с толку. Отладчик видит два потока: основной носитель (поток нашей ОС) и виртуальный поток. С этим было связано с множеством проблем, поэтому мы решили остановиться и пересмотреть все это. Так что это область сейчас работает не очень хорошо, но мы близки к тому, чтобы решить эти проблемы. Так что в это время ведется гигантский объем работы для повторной реализации и перестройки частей JVM TI для лучшей поддержки виртуальных потоков.


Итак, мы приближаемся к цели, большая часть деталей уже реализована, и мы надеемся, что очень скоро у нас будет гораздо лучшая ситуация с отладкой.


И последнее, что нужно сказать об отладчиках: кто-то спрашивает, когда мы все это доделаем, будут ли имеющиеся отладчики просто работать? Мы ожидаем, что в отладчики и инструментарий потребуется внести хотя бы некоторые небольшие изменения, чтобы они могли работать с виртуальными потоками. В основном на фронте масштабируемости, не имеет смысла пытаться визуализировать миллион потоков в отладчике. Такие вещи, как группы потоков это уже легаси, и попытки визуализировать что-либо в группах потоков, вероятно, не будут работать хорошо.


Перейдем к виртуальным потокам в профилировщике.


Это тоже очень важная область. Java Flight Recorder был обновлен в сборках Loom для поддержки виртуальных потоков. Я не был уверен, что во время доклада успею продемонстрировать использование JFR с виртуальными потоками, поэтому вместо этого я просто зафиксировал вывод команды print в JFR, просто чтобы показать вам, на что он способен.


В данном случае я сделал запись с JFR.



Он просто называется server.jfr, это имя файла записи. Эта конкретная запись была сделана при запуске сервера Jetty, настроенного для использования виртуальных потоков. Выходные данные показывают одно событие, чтение сокета. И оно произошло в виртуальном потоке. JFR по умолчанию имеет порог, кажется, около 200 мс, он может захватывать медленную операцию чтения, которая занимает больше времени, чем время порога.


Давайте расскажу, что именно здесь запечатлено. virtual = true указывает на то, что это виртуальный поток. Я распечатал всю трассировку стека, поэтому вы можете увидеть, что это действительно работает в виртуальном потоке, мы видим все фреймы, тут используются java.net.url и HTTP для чтения сокета, и это блокирует более чем на 200 мс. Это записано здесь в этой трассировке стека. Это то, что вы можете делать с JFR, что весьма полезно.


Помимо Flight Recorder, поддерживающего виртуальные потоки, существует множество других инструментов и профилировщиков, использующих JVM TI, поэтому нам приходится работать над множеством вещей, чтобы иметь возможность поддерживать профилировщики на основе JVM TI, работающие с виртуальными потоками.


В этой области есть проблемы, о которых я упоминал в контексте отладчиков, попытки визуализировать тысячи и тысячи потоков.


То же самое и с профилировщиками. Если их используют для приложения с миллионом потоков, не думаю, что следует пытаться визуализировать их все, это будет неудобно для пользователя. Это область, которая, вероятно, не будет масштабироваться, по крайней мере, с точки зрения пользовательского интерфейса. Это тип проблем, с которыми мы сталкиваемся при работе с некоторыми инструментами.


Serviceability


Чтобы иметь возможность использовать виртуальные потоки в продакшне, мы должны решить множество других задач по устранению неполадок и диагностике.


Я показал вам довольно простую распечатку трассировки стека, когда потоки закреплены. Будут и другие сценарии, значимые для разработчиков. Они не смогут идентифицировать, например, запущенные виртуальные потоки, выполняющие вычислительные задачи (упирающиеся в CPU), они никогда не блокируются. Было бы полезно идентифицировать их.


Дамп потоков это то, с чем мы боролись в течение некоторого времени. Люди привыкли использовать дампы потоков для устранения неполадок, но что это значит, если у вас миллион потоков? Вы хотите просто увидеть все потоки, но дедуплицировать их? Вы хотите иметь другие формы? Это те вещи, которые нам еще предстоит изучить.


Текущий статус того, где мы находимся с Loom


Цель состоит в том, чтобы получить первую демо-версию с виртуальными потоками. У нас был ряд проблем со стабильностью, неприятные сбои, но мы думаем, что уже решили большинство из этих проблем.


Было много тонких проблем с взаимодействием с многопоточными сборщиками мусора, которые беспокоили довольно долгое время. Мы очень надеемся, что скоро получим сборку для раннего доступа, в которой будут решены эти проблемы.


Производительность также высока, был достигнут значительный прогресс в производительности за последний год, потому что это критически важно.


Поверхность API небольшая, но, как и во всех API, очень важно получить как можно больше обратной связи. API всегда проходят доработку, не бывает так, чтобы всё получилось как надо на первой итерации, мы пройдем через другие итерации, прежде чем все будет сделано.


Поддержка отладчика, как я уже упоминал, была проблемой, мы думаем, что сейчас на правильном пути, и как только мы доделаем последние штрихи, я действительно надеюсь, что у людей, которые обслуживают IDE и отладчики, будет время поработать с нами и создать условия хорошего опыта отладки.


Что еще нужно сделать для нашего первого Preview: нам необходимо выполнить перенос на ARM64 или Aarch64, мы были сосредоточены на 64-разрядной версии Intel на сегодняшний день; и нам нужно что-то сделать с дампом потоков.


Направления для будущего развития


Я должен упомянуть еще несколько аспектов этого проекта. Это такой список из будущего: мы не ожидаем, что перечисленное в нём будет в первой версии.


Во многих случаях для запуска задачи создается виртуальный поток, который выполняет одно действие и дает один результат. В других случаях вам может понадобиться, чтобы виртуальные потоки производили стрим результатов или общались через сообщения той или иной формы.


Как вы видите в других моделях программирования, CSP или Actors. У других языков есть каналы, у Erlang есть почтовые ящики. В Java есть вещи, близкие к этому: есть BlockingQueues, SynchronousQueue, у которой нет емкости, LinkedTransferQueue, у которой есть емкость.


Профессор Даг Ли работал с нами над этим проектом, и он обновил реализации блокирующих очередей в java.util.concurrent, так что они дружелюбны к виртуальным потокам. Он также изучает то, что ближе к каналам. Текущее рабочее название этого проекта conduits, а не каналы, потому что у нас есть каналы в пакете java.nio.channels. Посмотрим, как это пойдет.


Другая область это структурированная многопоточность. При структурированном программировании последовательное выполнение ограничено каким-либо четко
определенным блоком кода. Структурированная многопоточность заключается в распространении контроля на многопоточную среду. По сути, если задача управления разделяется на несколько задач или потоков в некоторой области, то им необходимо снова объединиться, прежде чем выйти из этой области.


В ранних прототипах Project Loom у нас действительно были первые прототипы для исследований в этой области. И мы вернемся к этому, есть проблемные области, связанные с этим. Распространение исключений и ошибок, отмена выполнения и так далее. На данный момент мы сделали, и я показывал это в демонстрациях, модифицировали ExecutorService, чтоб он расширял AutoCloseable, чтобы, по крайней мере, мы могли иметь некоторую конструкцию, которая гарантирует, что все потоки завершатся до продолжения основного потока.


Со структурированной многопоточностью связано то, что мы называем структурированным serviceability или структурированным observability. Я упоминал о проблемах профилировщиков, отладчиков и других инструментов, пытающихся
визуализировать миллионы потоков. Что, если бы эти инструменты каким-то образом исследовали бы структуру или отношения между потоками. Тогда, возможно, нам лучше бы удалось это визуализировать. На это мы готовы потратить больше времени.


Последний пункт в этом списке отмена. Мы сделали несколько прототипов в этой области, несколько прототипов кооперативной отмены. В Java есть механизм прерывания, это устаревший механизм, но на самом деле он очень хорошо работает с виртуальными потоками. Для большинства разработчиков этот механизм находится на слишком низком уровне. Мы хотим понять, сможем ли мы сделать что-то лучше. У нас были механизмы отмены в ранних прототипах. Я полагаю, что основная проблема заключается в том, что наличие двух механизмов одновременно может сбивать с толку, поэтому необходимо подумать над этим немного времени, прежде чем принимать какие-то решения по этому поводу.


Главные выводы


Основные выводы из этого доклада:


Виртуальный поток это поток в коде, во время выполнения, в отладчике, профилировщике и других инструментах.


Виртуальный поток это не оболочка вокруг потока ОС, а, по сути, просто объект Java.


Создание виртуального потока действительно очень дешево, у вас их могут быть миллионы, их не надо объединять в пулы.


Блокировка в виртуальном потоке стоит мало, что позволяет использовать синхронный стиль.


Немного дополнительной информации


Один из замечательных способов внести свой вклад в такой проект, как Project Loom, это загружать его сборки (или делать их самостоятельно из кода) и пробовать его с реальными приложениями. Отправляйте отзывы о производительности, сообщениях об ошибках, проблемах и опыте. Такие вещи очень полезны для такого проекта.


Вот ссылки на сборки раннего доступа: https://jdk.java.net/loom
Список рассылки: loom-dev@openjdk.java.net
И вики-страница: https://wiki.openjdk.java.net/display/loom/Main


У нас не очень получается поддерживать вики-страницу, поэтому список рассылки лучшее место для поиска чего-либо.


Это все, что я хотел рассказать.


Напоследок традиционный слайд Safe harbor: не верьте ничему, что я говорю.



Как можно понять по этому докладу, на наших Java-конференциях хватает хардкора: тут про Java-платформу порой рассказывают те люди, которые её и делают. В апреле мы проведём JPoint, и там тоже будет интересный состав спикеров (многие знают, например, Джоша Лонга из VMware). Часть имён уже названа на сайте, а другие позже появятся там же.
Подробнее..

Обзор последних изменений в rotorе (v0.10 v0.14)

20.02.2021 08:20:29 | Автор: admin

actor system


rotor ненавязчивый С++ акторный микрофремворк с возможностью создания иерархий супервайзеров, похожий на своих старших братьев caf и sobjectizer. В последних релизах с момента последнего анонса накопились существенные улучшения, которые хотелось бы осветить.


Общий интерфейс для таймеров (v0.10)


Таймеры сами по себе вездесущи во всех акторных фрейморках, т. к. они делают программы более надёжными. До v0.10 не было API для того, чтобы взвести таймер; это можно было сделать только посредством доступа к низлежащему движку событий (event loop) и использованию соответствующего API, разного для разных движков. Это было не очень удобно и ломало абстракции: кроме доступа к API движка, нужно было в обработчике таймера использовать низкоуровневый API rotor'а, чтобы работала доставка сообщений. Кроме того, отмена таймера также имеет свои особенности в каждом цикле событий, что захламляло ненужными деталями логику работы актора.


Начиная с v0.10 в rotor'е, можно делать что-то вроде:


namespace r = rotor;struct some_actor_t: r::actor_base_t {    void on_start() noexcept {        timer_request = start_timer(timeout, *this, &some_actor_t::on_timer);    }    void on_timer(r::request_id_t, bool cancelled) noexcept {        ...;    }    void some_method() noexcept {        ...        cancel_timer(timer_id);    }    r::request_id_t timer_id;};

Надо отметить, что к моменту завершения работы актора (shutdown_finish), все взведённые таймеры должны сработать или быть отменены, иначе это ведёт к неопределённому поведению (undefined behavior).


Поддержка отмены запросов (v0.10)


По-моему мнению, у всех сообщений в caf семантика "запрос-ответ", в то время как в sobjectizer'е все сообщения имеют обычную "отправил-и-забыл" ("fire-and-forget") семантику. rotor поддерживает оба типа, причём по-умолчанию сообщения являются "отправил-и-забыл", а "запрос-ответ" можно сделать поверх обычных сообщений.


В обоих фреймворках, caf и sobjectizer, у каждого актора есть управляемая очередь сообщений, что обозначает, что фреймворк не доставляет новое сообщение, пока предыдущее не было обработано. В противоположность этим фреймворкам, в rotor'е нет управляемой очереди сообщений, что обозначает, что актор сам должен создать свою собственную очередь и заботиться о перегрузках при необходимости. Для мгновенно обрабатываемых сообщений типа "пинг-понг" это не имеет особого значений, однако для "тяжёлых" запросов, которые в процессе своей обработки делают ввод-вывод (I/O), разница может быть существенна. Например, если актор опрашивает удалённую сторону через HTTP-запросы, нежелательно начинать новый запрос, пока предыдущий ещё не закончился. Ещё раз: сообщение не доставлено, пока предыдущее не обработано, и не важно, что за типа сообщения.


Это также обозначает, что для управляемых очередей сообщений отмена запросов в общем случае невозможна, т.к. сообщение-отмена находится в очереди, и у него нет шансов быть обработанным до тех пор, пока предыдущее сообщение-запрос не будет обработано.


В rotor'е, при необходимости, надо создавать собственную очередь запросов, и, если сообщение-отмена поступает, то актор должен либо поискать в очереди и отменить его, либо, если сообщение уже обрабатывается, отменить соответствующие операции ввода-вывода, и, в конечном итоге, ответить на исходный запрос со статусом "отменено". Нужно заметить, что только сообщения-запросы могут быт отменены, т. к. у них есть внутренний идентификатор.


namespace r = rotor;namespace payload {struct pong_t {};struct ping_t {    using response_t = pong_t;};} // namespace payloadnamespace message {using ping_request_t = r::request_traits_t<payload::ping_t>::request::message_t;using ping_response_t = r::request_traits_t<payload::ping_t>::response::message_t;using ping_cancel_t = r::request_traits_t<payload::ping_t>::cancel::message_t;} // namespace messagestruct some_actor_t: r::actor_base_t {    using ping_ptr_t = r::intrusive_ptr_t<message::ping_request_t>;    void on_ping(ping_request_t& req) noexcept {        // just store request for further processing        ping_req.reset(&req);    }    void on_cancel(ping_cancel_t&) noexcept {        if (req) {            // make_error is v0.14 feature            make_response(*req, make_error(r::make_error_code(r::error_code_t::cancelled)));            req.reset();        }    }    // простейшая "очередь" из одного сообщения.    ping_ptr_t ping_req;};

Вообще говоря, отмена запросов может быть сделана и в sobjectizer'е, однако, там, во-первых, нужно сделать собственный механизм "запрос-ответ", и, во-вторых, свою собственную очередь поверх той, что уже предоставляется sobjectizer'ом, т. е. ценой дополнительных накладных расходов. Впрочем, по-моему мнению, парадигма "запрос-ответ" несколько инородна для sobjectizer'а, поскольку он скорее ориентирован на блокирующие операции в обработчиках сообщений, которые, конечно, не могут быть отменены, вне зависимости от используемого фреймворка.


std::thread backend/supervisor (v0.12)


Это давно ожидаемая возможность, которая делает rotor похожим на sobjectizer: в случае, когда актору нужно осуществить блокирующие операции и ему не нужен никакой цикл событий. Например, обработчик сообщений должен выполнить интенсивные вычисления на центральном процессоре.


Очевидно, что во время блокирующих операций нет возможности сработать таймерам или другим сообщениям быть доставленными. Другими словами, во время блокирующих операций актор теряет свою реактивность, так как он не может реагировать на входящие сообщения. Чтобы преодолеть это, блокирующие операции должны быть разбиты на более мелкие итеративные куски; а когда актор закончит обрабатывать текущий кусок работы, он посылает себе сообщение с номером следующего куска для обработки, и так далее, пока все части не будут обработаны. Это даст текущему потоку выполнения немного воздуха, чтобы доставить другие сообщения, выполнить код, связанный с истекшими таймерами и т.п. Например, вмето того, чтобы вычислять свёртку sha512 для всего файла размером 1TB, задание может быть разделено на вычисление свёрток помегабайтово, что сделает поток вычисления достаточно реактивным. Данный подход универсален и применим для любого актороного фреймворка.


Само собой разумеется, что целая иерархия акторов может быть запущена на std::thread бэкенде, не только один актор. Следующий нюанс, на который следует обратить внимание, это то, что rotor'у надо подсказать, какие обработчики сообщений "тяжёлые" (блокирующие), чтобы обновить таймеры после этих обработчиков. Это делается во время подписки, т.е.:


struct sha_actor_t : public r::actor_base_t {    ...    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        r::actor_base_t::configure(plugin);        plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {            p.subscribe_actor(&sha_actor_t::on_process)->tag_io(); // важно        });    }

Полный исходный код реактивного актора, который вычисляет свёртку sha512, который реагирует на CTRL+c, доступен по ссылке.


Идентификация Акторов (v0.14)


Канонический способ идентификации актора по основной адресу (в rotor'е у актора может быть несколько адресов, аналогично как в sobjectizer'е агент может быть подписан на несколько mbox'ов). Однако, иногда желательно вывести в лог имя актора, который завершил работу, в соответствующем колбеке в его супервайзере:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << "actor " << (void*) actor->get_address().get() << " died \n";    }}

Адрес актора динамичен и меняется при каждом запуске программы, так что эта информация почти бесполезна. Чтобы она имела смысл, актор должен напечатать свой основной адрес, где, например, в перекрытом методе on_start(). Однако, это решение не очень удобно, поэтому было решено ввести свойство std::string identity непосредственно в базовый класс actor_base_t. Таким образом, идентичность актора может быть выставлена в конструкторе или во время конфигурации плагина address_maker_plugin_t:


struct some_actor_t : public t::actor_baset_t {    void configure(r::plugin::plugin_base_t &plugin) noexcept override {        plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {            p.set_identity("my-actor-name", false);        });        ...    }};

Теперь можно выводить идентичность актора в его супервайзере:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << actor->get_identity() << " died \n";    }}

Иногда акторы уникальны в одной программе, а иногда сосуществуют несколько экземпляров одного и того же класса акторов. Чтобы различать между их, адрес каждого может быть добавлен к имени актора. В этом и смысл второго параметра bool в методе set_identity(name, append_addr) выше.


По умолчанию идентичность актора равна чему-то вроде actor 0x7fd918016d70 или supervisor 0x7fd218016d70. Эта возможность появилась в rotor'е начиная с версии v0.14.


Extended Error вместо std::error_code, shutdown reason (v0.14)


Когда случается что-то непредвиденное в процессе обработки запроса, до версии v0.14, ответ содержал код ошибки в виде std::error_code. Такой подход хорошо служил своей цели, но, ввиду, иерархичной природы rotor'а, этого оказалось не достаточно. Представим себе случай: супервыйзер запускает двух акторов, и у одного из них произошёл сбой в инициализации. Супервайзер экскалирует проблему, т. е. выключается сам и шлёт запрос на выключение второму актору. Однако, на момент, когда второй актор выключился, исходный контекст уже был потерян, и совершенно неясно, почему он выключился. А причина кроется в том, что std::error_code не содержит в себе достаточно информации.


Поэтому было решено ввести собственный класс расширенной ошибки rotor::extended_error_t, который содержит std::error_code, std::string в качестве контекста (обычно это идентичность актора), а также умный указатель на следующую расширенную ошибку, что вызвала текущую. Теперь схлопывание иерархии акторов может быть представлено цепочкой расширенных ошибок, где причина выключения каждого актора может быть отслежена:


struct my_supervisor_t : public r::supervisor_t {    void on_child_shutdown(actor_base_t *actor) noexcept override {        std::cout << actor->get_identity() << " died, reason :: " << actor->get_shutdown_reason()->message();    }}

выведет что-то вроде:


actor-2 due to supervisor shutdown has been requested by supervisor <- actor-1 due initialization failed

Вместе с идентификацией акторов данная возможность предоставляет больше диагностических инструментов для разработчиков, которые используют rotor.

Подробнее..

Перевод Основы Cat Concurrency с Ref и Deferred

10.03.2021 18:12:48 | Автор: admin

Параллельный доступ и ссылочная прозрачность

Для будущих учащихся на курсе Scala-разработчик приготовили перевод материала.

Приглашаем также на вебинар по теме
Эффекты в Scala. На занятии рассмотрим понятие эффекта и сложности, которые могут возникать при их наличии. Также введем понятие функционального эффекта, рассмотрим его свойства и реализуем свой небольшой функциональный эффект. Присоединяйтесь.


*Concurrency конкурентность, допускающая одновременное выполнение нескольких вычислительных процессов.

Ref и Deferred являются основными строительными блоками в FP, используемыми параллельно, в манере concurrent. Особенно при использовании c tagless final (неразмеченной конечной) абстракцией, эти два блока, при построении бизнес-логики, могут дать нам и то, и другое: параллельный доступ (concurrent access) и ссылочную прозрачность (referential transparency), и мы можем использовать их для построения более продвинутых структур, таких как counters (счетчики) и state machines (конечные автоматы).

Перед тем, как мы углубимся в Ref и Deferred, нам полезно узнать, что concurrency в Cats строится на Java AtomicReference, и здесь мы и начнем наше путешествие.

Atomic Reference

AtomicReference это один из элементов пакета java.util.concurrent.atomic. В Oracle docs мы можем прочитать, чтоjava.util.concurrent.atomic это:

Небольшой инструментарий классов, поддерживающих потокобезопасное программирование без блоков с одиночными переменными. По сути, классы в данном пакете расширяют понятие volatile значений, полей и элементов массива до тех, которые также обеспечивают условную операцию atomic обновления

Экземпляры классов AtomicBoolean, AtomicInteger, AtomicLong, и AtomicReference обеспечивают доступ и обновление от одиночных переменных к соответствующему типу (функционального блока).

AtomicReference с нами начиная с Java 1.5 и используется для получения лучшей производительности, чем синхронизации (хотя это не всегда так).

Когда вам приходится совместно использовать некоторые данные между нитями (threads), вы должны защитить доступ к этой части данных. Самым простым примером будет увеличение некоторого количества int: i = i + 1. Наш пример состоит из фактически 3 операций, сначала мы читаем значение i , затем добавляем 1 к этому значению, а в конце снова присваиваем вычисленное значение i . В отношении многопоточных приложений, мы можем столкнуться с ситуацией, когда каждый thread будет выполнять эти 3 шага между шагами другого thread, а конечное значение i предсказать не удастся.

Обычно в вашей голове появляется слово synchronised или механизм класса lock, но с atomic.* вам больше не нужно беспокоиться о явной синхронизации, и вы можете перейти на предоставленные atomic (атомарные) типы утилит, где проверка выполнения операции в один шаг включается автоматически.

Давайте, возьмем для примера AtomicInteger.incrementAndGet:

/**     * Atomically increments by one the current value.     *     * @return the updated value     */    public final int incrementAndGet() {        for (;;) {            int current = get();            int next = current + 1;            if (compareAndSet(current, next))                return next;        }    }

С помощью операции compareAndSet мы либо обновляем наши данные, либо терпим неудачу, но никогда не заставляем thread ждать. Таким образом, если операция compareAndSet в incrementAndGet не удаётся, мы просто пытаемся повторить всю операцию заново, извлекая текущее значение наших данных с помощью функции get() в начале. С другой стороны, при использовании синхронизированных механизмов нет ограничений на количество операторов (statement), которые вы хотите выполнить во время блокировки, но этот блок никогда не выйдет из строя и может заставить вызывающий thread ждать, предоставляя возможность заблокировать или снизить производительность.

Теперь, зная определенные основы, давайте перейдем к нашей первой мега-звезде concurrency.

Ref

Ref в Cats очень похож на упомянутую выше atomic (атомарную) ссылку Java. Основные отличия заключаются в том, что Ref используется с tagless final абстракцией F . Он всегда содержит значение, а значение, содержащееся в Ref типа A, всегда является неизменным (immutable).

abstract class Ref[F[_], A] {  def get: F[A]  def set(a: A): F[Unit]  def modify[B](f: A => (A, B)): F[B]  // ... and more}

Ref[F[_], A] это функциональная изменяемая (mutable) ссылка:

  • Concurrent ( конкурентная)

  • Lock free ( без блоков)

  • Всегда содержит значение

Она создается путем предоставления начального значения, и каждая операция осуществляется в
F, например, cats.effect.IO.

Если мы внимательно посмотрим на сопутствующий объект для Cats Ref, мы увидим, что наша F должна соответствовать некому требованию, а именно быть Sync.

def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))

Вышеприведенный метод является лишь примером многих операций, доступных на нашем Ref; он используется для построения Ref с исходным значением.

Sync дает нам возможность приостанавливать любые побочные эффекты с помощью метода
delayдля каждой операции на Ref.

Ref довольно простая конструкция, мы можем сосредоточиться в основном на ее get, set и of чтобы понять, как она работает.

Метод get and set

Допустим, у нас есть объект (для этого блога мы назовем его Shared), который нужно обновить несколькими threads, и мы используем для этого наши методы get и set , создавая утилитный метод, который поможет нам в дальнейшем:

def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] = {for {sh <- trace.get()_ <- trace.set(Shared(sh, msg))} yield ()}

Наш Shared объект может быть построен путем использования его предыдущего состояния и нового значения для создания нового экземпляра Shared, который может быть на самом деле всем, что мы хотим простым списком, картой или чем угодно, к чему мы хотим получить одновременный безопасный доступ.

Я только что создал Shared(prev: Shared, msg: String) для данной статьи.

В нашем примере выше F был заменён конкретным IO из Cats Effect, но имейте в виду, что Ref является полиморфным в F и может быть использован с другими библиотеками.

С помощьюmonadic (монадический) IO мы применяем функцию flatMap на каждом шаге и устанавливаем значение, сохраненное в нашем Ref на желаемое значение или... подождите, может быть, мы этого не делаем.

При таком подходе, когда modifyShared будет вызываться одновременно, и мы можем потерять обновления! Это происходит потому, что мы можем столкнуться с ситуацией, когда, например, двое threads могут прочитать значение с помощью get и каждый из них будет выполнять set одновременно. Методы get и set не вызываются атомарно (atomically) вместе.

Atomic (атомарный) update

Конечно, мы можем улучшить приведенный выше пример и использовать другие доступные методы из Ref. Для совместной реализации get и set мы можем использовать update.

def update(f: A => A): F[Unit] 

Это решит нашу проблему с обновлением значения, однако update имеет свои недостатки. Если мы захотим обратиться к переменной сразу после обновления, аналогично тому, как мы использовали get и set , мы можем в итоге получить устаревшие данные, допустим, наш Ref будет содержать ссылку на Int:

for {_ <- someRef.update(_ + 1)curr <- someRef.get_ <- IO { println(s"current value is $curr")}} yield ()

Нас спасет modify

Мы можем немного улучшить вышеупомянутую ситуацию, используя modify , которая будет делать то же самое, что и update , но тем не менее, modify вернет нам обновленное значение для дальнейшего использования.

def modify[B](f: A => (A, B)): F[B] = {      @tailrec      def spin: B = {        val c = ar.get        val (u, b) = f(c)        if (!ar.compareAndSet(c, u)) spin        else b      }      F.delay(spin)    }

Как видите, это практически та же имплементация, что и в примере с AtomicInteger.incrementAndGet, который я показывал в начале, но только в Scala. Нам четко видно, что для выполнения своей работы Ref также работает на основе AtomicReference .

Ref ограничения

Вы, вероятно, уже заметили, что в случае неудачи при обновлении значения функция, переданная update/ modify, должна быть запущена недетерминированно (nondeterministically) и, возможно, должна быть запущена несколько раз. Хорошая новость заключается в том, что это решение в целом оказывается намного быстрее, чем стандартный механизм блокировки и синхронизации, и гораздо безопаснее, так как это решение не может быть заблокировано.

Как только мы узнаем, как работает простой Ref, мы можем перейти к другому классу Cats Concurrent: Deferred (Отложенный вызов).

Deferred

В отличие от Ref, Deferred:

  • создается пустым (отложенный результат выполнения)

  • может быть выполнен один раз

  • и после установки его нельзя изменить или снова сделать пустым.

Эти свойства делают Deferred простым и в то же время довольно интересным.

abstract class Deferred[F[_], A] {  def get: F[A]  def complete(a: A): F[Unit]}

Deferred используется для явной функциональной синхронизации. Когда мы вызываем get в пустой Deferred мы устанавливаем блокировку до того момента, как значение станет вновь доступно. В соответствии с документацией из самого класса:

  • Блокировка указана только семантическая, никакие реальные threads (нити) не блокируются имплементацией

Тот же вызов get непустого Deferred немедленно вернет сохраненное значение.

Другой метод complete заполнит значение, если экземпляр пуст и при вызове непустого Deferred приведет к сбою (неудачная попытка IO).

Здесь важно отметить, что Deferred требует, чтобы F было Concurrent, что означает, что его можно отменить.

Хорошим примером использования Deferred является ситуация, когда одна часть вашего приложения должна ждать другую.

Пример ниже взят из великолепного выступления Фабио Лабеллы на выставке Scala Italy 2019 Composable Concurrency with Ref + Deferred available at Vimeo

def consumer(done: Deferred[IO, Unit]) = for {c <- Consumer.setup_ <- done.complete(())msg <- c.read_ <- IO(println(s"Received $msg"))} yield ()def producer(done: Deferred[IO, Unit]) = for {p <- Producer.setup()_ <- done.getmsg = "Msg A"_ <- p.write(msg)_ <- IO(println(s"Sent $msg"))} yield ()def prog = for {  d <- Deferred[IO, Unit]  _ <- consumer(d).start  _ <- producer(d).start} yield ()

В приведенном выше примере у нас есть producer (производитель) и consumer (потребитель), и мы хотим, чтобы producer ждал, пока consumer setup закончится, прежде чем писать сообщения, в противном случае все, что бы мы ни написали в producer, будет потеряно. Для преодоления этой проблемы мы можем использовать общий экземпляр Deferred и блокировать get до тех пор, пока не будет заполнен экземпляр done Deferred со стороны consumer (значение в данном случае простая Unit () ).

Конечно, вышеуказанное решение не обошлось без проблем, когда consumer setup никогда не прекращался, мы застревали в ожидании, а producer не мог отправлять сообщения. Чтобы преодолеть это, мы можем использовать таймаут с get , а также использовать Either[Throwable, Unit] или какую-либо другую конструкцию вместо простой Unit внутри нашего объекта Deferred.

Deferred довольно прост, но в сочетании с Ref он может быть использован для построения более сложных структур данных, таких как semaphores (семафоры).

Для получения более подробной информации я рекомендую вам ознакомиться с самой документацией о Cats, где вы можете узнать больше о Cats concurrency и структуре данных, которые она предоставляет.


Узнать подробнее о курсе Scala-разработчик.

Смотреть открытый вебинар по теме Эффекты в Scala.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru