Русский
Русский
English
Статистика
Реклама

Threads

Разгоняем REACTOR

12.06.2021 18:20:44 | Автор: admin

Кому будет интересно?

Реактор сегодня - это стильно, модно, молодежно. Почему многие из нас практикуют реактивное программирование? Мало кто может ответить однозначно на этот вопрос. Хорошо - если Вы понимаете свой выигрыш, плохо - если реактор навязан организацией как данность. Большинство аргументов "ЗА" - это использование микросервисной архитектуры, которая в свою очередь обязывает микросервисы часто и много коммуницировать между собой. Для коммуникации в большинстве случаев выбирают HTTP взаимодействие. Для HTTP нужен легковесный веб-сервер, а что первое приходит на ум? Tomcat. Тут появляются проблемы с лимитом на максимальное количество сессий, при превышении которого веб-сервер начинает реджектить запросы (хотя лимита этого не так уж и легко достичь). Здесь на подмогу приходит реактор, который подобными лимитами не ограничен, и, например, Netty в качестве веб-сервера, который работает с реактивностью из коробки. Раз есть реактивный веб-сервер, нужен реактивный веб-клиент (Spring WebClient или Reactive Feign), а раз клиент реактивный, то вся эта жуть просачивается в бизнес логику, Mono и Flux становятся Вашими лучшими друзьями (хотя по началу есть только ненависть :))

Среди бизнес задач, очень часто встречаются серьезные процедуры, которые обрабатывают большие массивы данных, и нам приходится применять реактор и для них. Тут начинаются сюрпризы, если реактор не уметь готовить, можно и проблем схлопотать очень много. Превышение лимита файловых дескрипторов на сервере, OutOfMemory из-за неконтролируемой скорости работы неблокирующего кода и многое многое другое, о чем мы сегодня поговорим. Мы с коллегами испытали очень много трудностей из-за проблем с пониманием как держать реактор под контролем, но всё что нас не убивает - делает нас умнее!

Блокирующий и неблокирующий код

Вы ничего не поймете дальше, если не будете понимать разницу между блокирующим и неблокирующим кодом. Поэтому, остановимся и внимательно разберемся в чем разница. Вы уже знаете, блокирующий код реактору - враг, неблокирующий - бро. Проблема лишь в том, что в настоящий момент времени, не все взаимодействия имеют неблокирующие аналоги.

Лидер здесь - HTTP взаимодействие, вариантов масса, выбирай любой. Я предпочитаю Reactive Feign от Playtika, в комбинации со Spring Boot + WebFlux + Eureka мы получаем очень годную сборку для микросервисной архитектуры.

Давайте по-простому: НЕблокирующий код, это обычно всё, в названии чего есть reactive, а блокирующий - все оставшееся :) Hibernate + PostgreSQL - блокирующий, отправить почту через JavaMail - блокирующий, скинуть сообщение в очередь IBMMQ - блокирующий. Но есть, например, реактивный драйвер для MongoDB - неблокирующий. Отличительной особенностью блокирующего кода, является то, что глубоко внутри произойдет вызов метода, который заставит Ваш поток ждать (Thread.sleep() / Socket.read() и многие подобные), что для реактора - как нож в спину. Что же делать? Большинство бизнес логики завязано на базу данных, без нее никуда. На самом деле достаточно знать и уметь делать 2 вещи:

  • Необходимо понимать где блокирующий код. В этом может помочь проект BlockHound или его аналоги (тут тема для отдельной статьи)

  • Исполнение блокирующего кода необходимо переключать на пулы, готовые его выполнять, например: Schedulers.boundedElastic(). Делается это при помощи операторов publishOn & subscribeOn

Разгоняемся сами

Перед тем, как продолжить, необходимо немного размяться!

Уровень 1

    @Test    fun testLevel1() {        val result = Mono.just("")            .map { "123" }            .block()        assertEquals("123", result)    }

Начнем с простого, такой код обычно пишут начинающие reactor программисты. Как начать цепочку? Mono.just и ты на коне :) Оператор map трансформирует пустую строку в "123" и оператор block делает subscribe.

Обращаю особенное внимание на оператор block, не поддавайтесь соблазну использовать его в Вашем коде, исключение составляют тесты, где это очень удобно. При вызове block внутри метода Вашего RestController, Вы сразу получите исключение в рантайме.

Уровень 2

    fun nonBlockingMethod1sec(data: String)     = data.toMono().delayElement(Duration.ofMillis(1000))    @Test    fun testLevel2() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { nonBlockingMethod1sec(it) }            .block()        assertEquals("Hello world", result)    }

Усложняем наш код, добавляем неблокирующий метод nonBlockingMethod1sec, все что он делает - ожидает одну секунду. Все что делает данный код - дважды, по очереди, запускает неблокирующий метод.

Уровень 3

    fun collectTasks() = (0..99)    @Test    fun testLevel3() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { businessContext ->                collectTasks()                    .toFlux()                    .map {                        businessContext + it                    }                    .collectList()            }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Начинаем добавлять самое интересное - Flux! У нас появляется метод collectTasks, который собирает массив из сотни чисел, и далее мы делаем из него Flux - это будет наш список задач. К каждой задаче мы применяем трансформацию через оператор map. Оператор collectList собирает все результаты в итоговый список для дальнейшего использования.

Здесь наш код начинает превращаться в рабочий паттерн, который можно использовать для массового выполнения задач. Сначала мы собираем некий "бизнес контекст", который мы используем в дальнейшем для выполнения задач.

Уровень 4

    fun collectTasks() = (0..100)        @Test    fun testLevel4() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { businessContext ->                collectTasks().toFlux()                    .flatMap {                        Mono.deferContextual { reactiveContext ->                            val hash = businessContext + it + reactiveContext["requestId"]                            hash.toMono()                        }                    }.collectList()            }            .contextWrite { it.put("requestId", UUID.randomUUID().toString()) }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Добавляем немного плюшек. Появилась запись данных (15) в реактивный контекст, а также чтение (10) из него. Мы почти у цели. Постепенно переходим к итоговому варианту

Уровень 5

    fun collectTasks() = (0..1000)        fun doSomethingNonBlocking(data: String)        = data.toMono().delayElement(Duration.ofMillis(1000))        fun doSomethingBlocking(data: String): String {        Thread.sleep(1000); return data    }    val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")    private val logger = getLogger()    @Test    fun testLevel5() {        val counter = AtomicInteger(0)        val result = nonBlockingMethod1sec("Hello world")            .flatMap { _ ->                collectTasks().toFlux()                    .parallel()                    .runOn(pool)                    .flatMap {                        Mono.deferContextual { _ ->                            doSomethingNonBlocking(it.toString())                                .doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }                                .doOnNext { logger.info("Non blocking code finished ${counter.get()}") }                                .map { doSomethingBlocking(it) }                                .doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }                        }                    }.sequential()                    .collectList()            }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Вот мы и добрались до итогового варианта! Часть с реактивным контекстом была опущена для более наглядной демонстрации того, зачем мы здесь собрались. У нас появились два новых метода: doSomethingNonBlocking (3) & doSomethingBlocking (6) - один с неблокирующим ожиданием в секунду, второй с блокирующим. Мы создали пул потоков для обработки задач (10), добавили счетчик активных задач в реакторе (15). У нас появился оператор parallel (19) и обратный ему sequential (29). Задачи мы назначили на свежесозданный пул (20). Для понимания, что же происходит внутри, добавили логирование внутри операторов doOnRequest (вызывается перед исполнением метода), doOnNext (вызывается после исполнения метода). Основная задумка - на примере, определить сколько задач одновременно выполняется в реакторе и за какое время цепочка завершит свою работу.

Такой "паттерн", мы с коллегами очень часто применяем для выполнения сложных задач, таких как отправка отчетов или массовая обработка транзакций. Первым делом собирается бизнес контекст - это некая структура, содержащая в себе информацию, полученную в результате вызовов других микросервисов. Бизнес контекст необходим нам для выполнения самих задач, и собирается он заранее, чтобы не тратить время в процессе обработки. Далее мы собираем список задач, превращаем их во Flux и скармливаем реактору на параллельную обработку.

И вот здесь начинается самое интересное. Попробуйте ответить на несколько вопросов. Как Вы считаете, сколько времени будет выполнятся данная цепочка? В ней 100 задач, в каждой задаче неблокирующее ожидание в 1 секунду, блокирующее ожидание в 1 секунду, и у нас в наличии пул из 10 потоков? (Вполне годная задачка на собеседование senior reactor developer :))

Правильный ответ

Около 12 секунд. Рассуждаем от блокирующего :) Блокирующее ожидание никуда не деть, и тут имеем 100 блокирующих секунд на 10 потоков, итого 10 секунд. Неблокирующее ожидание заметно нам лишь в первый раз, далее оно незаметно запускается в передышках между блокирующим. Не забываем про одну секунду сбора "бизнес контекста" перед запуском задач.

А теперь уберем строку (26) .map { doSomethingBlocking(it) } . Освободим наш реактор от блокирующего кода, интересно, сколько теперь времени займет выполнение цепочки?

Правильный ответ

2 секунды! 1 на сбор "бизнес контекста" и 1 на выполнение всех задач. Реактор запустит 100 задач одновременно. Но ведь у нас пул из 10 потоков? Как так? Первый разрыв шаблона.

Мы идем до конца и увеличиваем количество задач в методе collectTasks() до ... 1000? а может быть сразу до 15000? Как долго реактор будет выполнять столько задач?

Правильный ответ

2 секунды! 1 на сбор "бизнес контекста" и 1 на выполнение всех задач. Реактор запустит ВСЕ задачи одновременно. Второй разрыв шаблона. Где предел?

А это вообще легально?

Как же так и как это контролировать? Почему это опасно? Что если внутри параллельной обработки Вы решите вызвать другой микросервис? Если у вас 30000 задач, и по завершению каждой, Вам нужно отправлять запрос соседнему микросервису, Вы с удивлением можете обнаружить, что реактор непременно постарается выполнить все вызовы одновременно (Вы ведь используете реактивный web-client или реактивный feign, верно?) Открытие такого большого количества сокетов повлечет за собой превышение лимита открытых файловых дескрипторов в системе, что как минимум создаст проблемы с невозможностью создания новых сокетов в системе и помешает другим сервисам, а как максимум повалит Вам на сервере SSH и Вы потеряете доступ к серверу. Сомневаюсь, что в этот момент, программист будет кричать "зато смотри как быстро работает".

Разрыв шаблона. Thread Pool & Reactor

Основная проблема начинающего реактор программиста - это образ мышления, если есть медленный процесс - добавь X потоков, будет быстрее в X раз, а если слишком быстро - сократи количество потоков. Как всё просто было раньше? :) С реактором это не работает.

Классический thread pool - двери. Больше дверей - больше пропускная способность, все работает быстрее.

Теперь встречайте reactor! Вы видите двери? Нет никаких дверей

Реактор это большой мешок с подарками, или воздушная труба, задачи в которую валятся и летают там пока не выполнятся. А кто эти люди в желтом? Это наши epoll реактивные потоки, которые ни в коем случае нельзя нагружать блокирующими задачами. Можно провести аналогию с прорабами или инженерами. Они здесь, чтобы управлять процессом, а не чтобы выполнять тяжелую работу. Займите одного инженера тяжелой задачей, и когда к нему придет следующий рабочий с вопросом "что делать дальше?", он не сможет ответить, потому что был занят. Вот так и появляются таймауты в реактивном коде. Казалось бы микросервис стоит без нагрузки, выполняет какие-то задачки, а один из 500 запросов к нему падает с тайм-аутом, и непонятно почему. Велика вероятность что инженер был занят блокирующей задачей! Заботьтесь о своих инженерах и поручайте тяжелую работу специально обученным рабочим, например, Schedulers.boundedElastic().

Как контролировать эту "трубу", в которую валится всё без контроля? Вот мы и подошли к кульминации

Конфигурируем реактор!

В своей дефолтной конфигурации, параллельная обработка в реакторе зависит от количества ядер процессора сервера, на котором запускается код, поэтому, к своему удивлению, Вы получите разные результаты, проверяя работу реактора в тесте на локальной машине с 4-8 ядрами и production сервере с 32 ядрами.

Парад настроек открывает parallel с его аргументом parallelism

Меняя parallelism, мы можем регулировать количество запускаемых rails (это местное понятие реактора, которое похоже на корутины, но по сути является количеством одновременно выполняемых неблокирующих задач). Prefetch мы рассмотрим более подробно в следующем разделе.

Но одного parallelism недостаточно, реактор все еще будет нагребать задач как не в себя.

Мало кто обращал внимание что у оператора flatMap (только того что запускается на Flux) есть перегрузки с интересными аргументами, а именно maxConcurrency

maxConcurrency очень важен, по дефолту значение стоит Integer.MAX_VALUE (определяет сколько неблокирующих задач может выполняться одновременно на одной рельсе. Понимаете теперь откуда аппетит у реактора?

Также, не стоит забывать, что если цепочка будет запущена несколько раз (вызов одного http метода контроллера несколько раз), то все помножится! Никакой пул не спасет.

Количество запусков цепочки напрямую влияет на количество одновременно выполняемых задач.

Подведем небольшой итог:

  • parallel (parallelism)

  • flatMap (maxConcurrency)

  • Количество запусков цепочки

Эти три параметра являются множителями, для расчета количества одновременных задач.

По дефолту это Кол-во ядер * Integer.MAX_VALUE * Количество запусков цепочки

Напротив же, запустив данный код для 5 задач длительностью в секунду мы получим цепочку работающую 5 секунд. Теперь всё под контролем!

        val result = nonBlockingMethod1sec("Hello world")            .flatMap { _ ->                collectTasks().toFlux()                    .parallel(1)                    .runOn(pool, 1)                    .flatMap({                        Mono.deferContextual { _ ->                            doSomethingNonBlocking(it.toString())                        }                    }, false, 1, 1)                    .sequential()                    .collectList()            }            .block()!!

Стоп, или не всё?

Thread Pool

Зачем же нужен пул потоков в реакторе? Думайте о нем как о двигателе для Вашего автомобиля. Чем пул мощнее - тем блокирующие задачи будут разбираться быстрее, а если потоков мало, то и блокирующие задачи задержатся у вас надолго! А куда же мы без блокирующих вызовов? На количество одновременно выполняемых задач в реакторе он не влияет, вот это поворот :)

Надеюсь, Вы не пробовали использовать Schedulers.parallel() для исполнения Вашего блокирующего кода? =) Несмотря на свое подходящее название (ну называется он parallel, значит и нужен для параллельной обработки) использовать этот пул можно только для неблокирующего кода, в доке указано что он живет с одним воркером, и содержит в себе только особенные, реактивные потоки.

Распределение задач по рельсам

Не коснулись мы еще одной важной темы. Обычно, мы пытаемся закончить обработку большого массива данных в кратчайший срок, с чем нам определенно поможет изложенный выше материал, но это еще не все. В тестах мы часто используем синтетические данные, которые генерируем одинаковыми порциями, исключая погрешности production среды. Задачи обычно выполняются разное время и это создает проблемы с равномерным распределением задач.

Зеленые прямоугольники это наши задачи, которые распределяются в реакторе по алгоритму round-robin, что в случае с синтетическими данными дает красивую картинку.

Хорошо загруженный реактор (задачи равномерно распределены). 54 блокирующих задачи (каждая по 1сек), round-robin распределение по 6 рельсамХорошо загруженный реактор (задачи равномерно распределены). 54 блокирующих задачи (каждая по 1сек), round-robin распределение по 6 рельсам

Но запуская код в production среде, мы можем встретиться с долгим запросом в базу, сетевыми задержками, плохим настроением микросервиса да и чего только не бывает.

Плохо загруженный пул (задачи распределены не равномерно)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамПлохо загруженный пул (задачи распределены не равномерно)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсам

Оператор collectList() вернет нам результат только после завершения последней задачи, и как мы видим, наш пул будет простаивать пока 1 поток трудится разгребая очередь накопившихся задач. Это создает неприятные задержки, когда Вы знаете что можно быстрее, но быстрее не происходит.

Бороться с этим можно несколькими способами

  • concatMap вместо flatMap (посмотрите в профилировщик на ваш пул, передумаете)

  • правильно планировать задачи, чтобы исключить аномалии (почти невозможно)

  • дробить каждую задачу на много мелких, и также запускать их в параллельную обработку чтобы нивелировать проблемы с распределением (вполне рабочий вариант)

  • prefetch (наш выбор!)

Параметр prefetch у flatMap & runOn позволяет определить, сколько задач будет взято на одну рельсу на старте, а затем при достижении некоторого порога выполнения задач, реквесты будут повторяться с этим количеством. Значение по умолчанию - 256. Сменив значение на 1, можно заставить реактор использовать механизм "work stealing", при котором, рельсы и потоки, которые освободились, будут забирать задачи себе на выполнение и картина получится гораздо более приятная.

Хорошо загруженный пул (задачи равномерно распределены)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамPrefetch !Хорошо загруженный пул (задачи равномерно распределены)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамPrefetch !

На этом у меня всё. Будет интересно прочесть Ваши замечания и комментарии, на 100% истину не претендую, но все результаты подкреплены практическими примерами, на Spring Boot + Project Reactor 3.4. Всем спасибо!

Подробнее..
Категории: Kotlin , Java , Concurrency , Parallel , Reactor , Parallelism , Mono , Threads , Flux , Pool , Prefetch

Project Loom Современная маcштабируемая многопоточность для платформы Java

19.02.2021 18:05:42 | Автор: admin


Эффективное использование многочисленных ядер современных процессоров сложная, но всё более важная задача. Java была одним из первых языков программирования со встроенной поддержкой concurrency. Ее concurrency-модель, основанная на нативных тредах, хорошо масштабируется для тысяч параллельно выполняющихся стримов, но оказывается слишком тяжеловесной для современного реактивного программирования с сотнями тысяч параллельных потоков.


Ответ на эту проблему Project Loom. Он определяет и реализует в Java новые легковесные параллельные примитивы.


Алан Бейтман, руководитель проекта OpenJDK Core Libraries Project, потратил большую часть последних лет на проектирование Loom таким образом, чтобы он естественно и органично вписывался в богатый набор существующих библиотек Java и парадигм программирования. Об этом он и рассказал на Joker 2020. Под катом запись с английскими и русскими субтитрами и перевод его доклада.



Меня зовут Алан Бейтман, я работаю в группе Java Platform в Oracle, преимущественно над OpenJDK. Сегодня я буду говорить о Project Loom.


Мы занялись этим проектом в конце 2017 года (точнее, технически в начале 2018-го). Он появился как проект в OpenJDK для того, чтобы упростить написание масштабируемых многопоточных приложений. Цель в том, чтобы позволить разработчикам писать масштабируемые многопоточные приложения в так называемом синхронном стиле. Это достигается путем доведения базовой единицы многопоточности потока до такой легковесности, чтобы им можно было представлять любую параллельную задачу. Даже задачи, которые блокируются или выполняются в течение длительного времени.


Кто-то говорит: чем больше несвязанных запросов поступает в серверное приложение, тем большей степени многопоточности можно достичь.


План выступления такой:


  1. Начну с пары слов о мотивации этого проекта.
  2. Поговорю о том, как мы имплементировали эти так называемые легкие потоки.
  3. Переключусь на IDE и покажу несколько демо, напишу немного кода.
  4. Наконец, рассмотрю другие аспекты проекта.

Потоки


Платформа Java (и язык, и JVM) во многом построена на концепции потоков:


  • Если вы сталкиваетесь с исключением, то получаете трассировку стека определенного потока.
  • Вы можете связать некоторые данные с потоками, используя ThreadLocal.
  • Если вы находитесь в отладчике и выполняете пошаговое выполнение кода, вы шагаете по выполнению потока. Когда вы нажимаете step over, это означает переход к следующей инструкции в потоке, с которым вы работаете.
  • А когда вы находитесь в профайлере, профайлеры обычно группируют данные по потокам, сообщают вам, какие потоки выполняются и что они делают.

В общем, всё, что касается платформы и инструментов, связано с потоками.


В Java API поток означает java.lang.Thread. В реализации JDK есть только одна реализация потока, которая фактически основана на потоке операционной системы. Между java.lang.Thread и потоком ОС существует связь один-к-одному. Те из вас, кто уже давно работает с платформой Java, могут вспомнить зелёные потоки в ранних выпусках JDK. Я немного расскажу об этом позже. Но по меньшей мере последние 20 лет, когда мы говорим о java.lang.Thread, мы говорим о тонкой оболочке вокруг потока ОС.


Cами потоки ОС ничего не знают о Java. Они очень общие. Обычно они должны поддерживать множество разных языков и сред выполнения. Они ничего не знают о том, как эти языки и программы используют стек, и им обычно приходится выделять очень большой фиксированный стек, чтобы поддерживать универсальность
программ, которые должны выполняться.


Другой аспект потоков операционной системы заключается в том, что они проходят через ядро ОС для переключения контекста, что обычно требует
значительное количество времени. В течение многих лет это было около микросекунды или больше.


Еще одна особенность потоков операционной системы заключается в том, что ядро ОС должно иметь некоторую форму планирования. Ему нужно выбрать, какой поток
запускать на каждом ядре процессора. Веб-серверы ведут себя совсем иначе, чем, например, поток с вычислениями для воспроизводения видео. Планировщик ОС должен быть очень общим и в некотором роде компромиссным, чтобы поддерживать все различные варианты использования.


Давайте немного поговорим об использовании потоков. Можно писать многопоточные приложения в стиле, который мы называем синхронным. Простой, императивный блокирующий синхронный код. Задачи выполняются в одном потоке от начала до конца. Код обычно очень легко читать и писать. Именно так большинство из нас, я думаю, учились его писать. Мы научились работать так с одним потоком до того, как узнали, что такое поток.


Этот синхронный стиль очень хорошо сочетается с дизайном языка Java. Он очень хорошо сочетается с инструментами. И в целом, как любит говорить мой коллега Рон Пресслер, гармонично сочетается с платформой. Но поскольку поток по сути представляет собой тонкую оболочку вокруг потока ОС, это ограниченный ресурс.


Если каждая транзакция или запрос использует поток, то максимальное количество потоков это максимальное количество транзакций, которые система может обрабатывать за раз. По сути, это наш уровень многопоточности.


А с современным сервером теоретически вы можете иметь миллионы сетевых подключений. Я видел, как Хайнц Кабуц делает демо с Project Loom, где он фактически использовал два миллиона соединений. И серверы могут поддерживать подобное, если у них достаточно памяти.


Итак, если у вас миллионы сетевых подключений, но только тысячи активных потоков, потоки становятся вашим ограничивающим фактором, если на каждое соединение приходится один поток. Так что они ограничивают наш уровень многопоточности. И это, безусловно, может сильно повлиять на пропускную способность.


Ладно, что нам с этим делать?


Если потоки дорогой ресурс, почему бы нам ими не делиться? Это означает пулы потоков. Вместо создания потока для каждого запроса или транзакции мы заимствуем поток из пула, выполняем транзакцию, а затем возвращаем его в пул.


Но у пулов много проблем. Думаю, большинство тех, кто их использовал, знает о проблеме того, как они засоряют ThreadLocals. Эта так называемая утечка памяти с пулами потоков существует долгое время. Также проблематична отмена: если я хочу отменить транзакцию, но она почти завершена, я, возможно, прерываю поток после того, как он был взят для другой транзакции.


И даже если бы мы решили приведенные проблемы, этого все равно недостаточно, потому что мы все еще выполняем транзакцию от начала до конца в одном потоке. Это занимает его на протяжении всей операции, а уровень многопоточности по-прежнему ограничен количеством потоков, которые может обработать ОС.


Мы можем заметить, что для большого количества запросов и транзакций,
которые не упираются в CPU, поток проводит большую часть своего времени в ожидании, в блокировке, ожидая базу данных, IO, что-то еще. А нам, на самом деле, не нужен поток, когда он ожидает.


Предположим, что мы использовали подход, при котором поток во время ожидания возвращается в пул. Это позволяет одновременно выполнять гораздо больше транзакций. Проблема в том, что многие API, блокирующие API, удерживают поток на протяжении какой-либо операции в ожидании блокировки, ожидании данных в сокете.


Это приводит нас к созданию новых API, по существу, несовместимых со старыми. Или в итоге у нас есть синхронные и асинхронные версии API.


Другой вопрос, что это вынуждает нас разделять транзакцию или запрос на мелкие кусочки, где разные части работают на разных потоках. Большая проблема с этим мы теряем всякую связь с потоками, контекст становится очень и очень трудно отслеживать. Например, если появилось исключение, которое было брошено для определенной части или определенного этапа транзакции, у меня нет общего контекста, где оно на самом деле было брошено.


Когда мы отлаживаем, это будет отладка лишь этапа транзакции. Если мы профилируем, то не увидим многого о том, что на самом деле происходит. Потому что многие транзакции выглядят ничего не делающими, они просто ждут IO. Профайлер, который смотрит на незанятый пул потоков, не особо много видит.


Я говорю здесь о том, что мы называем асинхронным стилем. Его преимущество в том, что он очень масштабируемый. Количество транзакций больше не ограничено количеством потоков. Но его трудно читать и иногда трудно поддерживать, потому что мы потеряли контекст.


Таким образом, в целом это позволяет нам лучше использовать аппаратные ресурсы, но наше приложение сложнее писать и поддерживать.


Что приводит нас к дилемме.



Разработчики могут написать простой синхронный код и потратить больше денег на оборудование. Или, если вы хотите эффективно использовать оборудование и управлять асинхронным приложением, тогда мы больше платим разработчикам.


Итак, как нам решить эту дилемму? Что, если бы мы могли снизить стоимость потоков и иметь их неограниченное количество? Тогда мы могли бы написать простой синхронный код, который гармонирует с платформой, полностью использует оборудование и масштабируется как асинхронный код. Project Loom именно об этом.


API


Давайте пойдем дальше и поговорим немного об API.


Если Project Loom снижает стоимость потоков, то как это будет отражаться на разработчиках и на API? Эта проблема сложнее, чем кажется на первый взгляд, и мы потратили более двух лет на борьбу с ней.


Один из вариантов, с которого мы начали и к которому в итоге вернулись, это
использование для легких форм потоков java.lang.Thread. Это старый API, который существует с JDK 1.0. Проблема в том, что у него много багажа. Там есть такие вещи, как группы потоков, загрузчик классов контекстов потоков. Есть множество полей и других API, которые связаны с потоками, которые просто не интересны.


Другой вариант начать все сначала и ввести совершенно новую
конструкцию или новый API. Если вы с самого начала интересовались Project Loom, возможно, вы видели некоторые из ранних прототипов, где мы представили для дешевых легких потоков совершенно новый API под названием fiber.


Помимо изучения API, мы также много изучали, как люди используют потоки. Оказалось, что одни их части используются очень широко, другие в меньшей степени. Thread.currentThread() используется и прямо и косвенно везде, например, для блокировки.


Вопрос, который часто возникает в викторинах: Сколько раз Thread.currentThread() используется при первом использовании популярной библиотеки логирования? Люди, не знающие ответа на этот вопрос, могут ответить 2 или 5. Правильный ответ 113.


Другой широко используемый аспект потока это ThreadLocals. Они используются везде, что иногда не радует. Если сломать Thread.currentThread() или ThreadLocals, то в контексте этих новых более дешевых потоков будет не запустить много уже существующего кода. Поэтому вначале, когда у нас был fiber API, нам пришлось эмулировать Thread API, чтобы существующий код запускался в контексте того, что называлось в то время fiber. Таким образом, мы могли уйти от кода, использующего Thread, не повредив нарыв.


Итак, .currentThread() и Threadlocals очень широко используются. Но в потоках есть и редко используемый багаж. И здесь нам немного помогает расширенная политика депрекации. Если некоторые из этих старых областей со временем могли бы исчезнуть, подвергувшись депрекации, окончательной депрекации и, в конечном итоге, удалению тогда, может быть, удастся жить с java.lang.Thread.


Два года исследований, около пяти прототипов и мы пришли к выводу, что избежать
гравитационного притяжения 25 лет существующего кода невозможно. Эти новые дешевые потоки будут представлены с существующим API java.lang.Thread. То есть java.lang.Thread будет представлять и потоки ОС, и новые дешевые потоки.


Мы также решили дать этим новым потокам имя. Оно появилось благодаря Брайану Гетцу, он придумал название виртуальный поток (virtual thread).


Использование привычного Thread хорошая новость для разработчиков. Нет новой модели программирования, нет новых концепций для изучения, вместо этого вам фактически придется отучиться от некоторых старых привычек. Когда я говорю отучиться, я имею в виду такие вещи, как пулы потоков, ThreadLocals и так далее.


Как реализованы эти виртуальные потоки?



Они мультиплексируются поверх небольшого пула потоков операционной системы. Я сказал потоки во множественном числе, и вот тут уместно вспомнить уже упомянутые green threads. Ранние выпуски JDK, особенно 1.0.1.1 с классической виртуальной машиной, поддерживали модель, где потоки мультиплексировались в один-единственный поток ОС. То, что мы делаем теперь, перекликается с этим, но сейчас речь о более чем одном потоке ОС.


Итак, у нас есть набор потоков, на которые эти виртуальные потоки мультиплексируются. Под капотом виртуальная машина HotSpot была обновлена для поддержки новой конструкции: scoped stackful one-shot delimited continuations. Виртуальные потоки объединяют континуации в HotSpot с планировщиками в библиотеке Java. Когда код, выполняющийся в виртуальном потоке, блокируется, скажем, в операции блокировки или в блокирующей IO-операции, соответствующая континуация приостанавливается, стек потока, на концептуальном уровне, вымещается в кучу Java, а планировщик выберет и возобновит другой виртуальный поток в этом же потоке ОС. Исходный виртуальный поток может быть возобновлен в том же потоке ОС или в другом.


Таким образом, код, в котором есть остановка и возобновление, может делать это в разных потоках ОС с течением времени. Есть небольшой набор потоков операционной системы, который обычно как минимум соответствует количеству ядер в системе. Они поддерживают выполнение многих виртуальных потоков, осуществляя прогресс кусочек за кусочком.


Пользовательский код, использующий API Java, не знает о распределении, которое
происходит под капотом, а yield и resume происходит глубоко в библиотеках JDK, поэтому мы говорим, что планирование является вытесняющим и не требует сотрудничества со стороны кода пользователя.


С точки зрения стоимости, поток ОС слева, виртуальный поток справа.



Обычно операционная система резервирует около мегабайта стека для потока операционной системы. Некоторые ядра выделяют дополнительные данные ядра, и 16КБ не редкость. Это то, что операционная система имеет на поток ОС. Кроме того, виртуальная машина HotSpot добавляет к этому пару КБ метаданных.


Виртуальные потоки намного дешевле, текущий прототип составляет около 256 байт на виртуальный поток. Еще есть стек, он уменьшается и увеличивается по мере необходимости и обычно составляет пару КБ в этом главное преимущество
виртуальных потоков перед потоками ОС.


Переключение задач также немного лучше, в типичных ОС оно составляет около микросекунды, в некоторых случаях может быть хуже. Лучший вариант с виртуальными потоками на данный момент работает лучше, около пары сотен наносекунд. Меньший размер и лучшее время переключения контекста означают, что мы можем иметь столько потоков, сколько захотим, и уровень многопоточности может
расти без ограничений.


Самое время перейти от слайдов к IDE и показать вам несколько примеров в коде.


Демо


У меня открыта IDE с пустым методом, и мы начнем с самого начала.


import ...public class Demo {    public static void main(String[] args) throws Exception {...}    void run() throws Exception {    }}

Я упомянул, что мы ввели новый фабричный метод, и начну с использования фабричного метода Thread.startVirtualThread().


import ...public class Demo {    public static void main(String[] args) throws Exception {...}    void run() throws Exception {        Thread thread = Thread.startVirtualThread(() -> System.out.println("hello"));        thread.join();    }}

Вывел сообщение hello, ничего особенного. Это немного отличается от использования конструкторов и метода start(), здесь всего лишь один фабричный метод.


Я изменю тело лямбда-выражения, просто выведу трассировку стека, чтобы вы могли видеть, что на самом деле происходит.


void run() throws Exception {    Thread thread = Thread.startVirtualThread(Thread::dumpStack);    thread.join();}

Этот референс-метод просто вызывает дамп стека в контексте виртуального потока.



Возможно, это выглядит немного иначе, чем то, что вы видели бы с обычным java.lang.Thread, потому что фреймы, которые вы видите здесь, не те, что вы видите в обычном JDK. Это своего рода эквивалент запуска потока, потому что виртуальный поток запускает континуацию. Это дает представление о том, в чем вы можете увидеть разницу.


Давайте рассмотрим еще один из аспектов API. Что делает этот startVirtualThread()?


В дополнение к введению фабричных методов для создания виртуальных потоков, текущий прототип имеет новый билдер для создания потоков. Создадим билдер с помощью метода Thread.builder(). Мы можем при этом вызвать несколько методов, позволяющих настроить поток: является ли он потоком-демоном, какое у него имя и некоторые другие аспекты.



В числе этих методов есть virtual(). Создание виртуального потока cо startVirtualThread(), было, по сути, тем же самым. Вот длинная форма того, что я сделал минуту назад:


void run() throws Exception {    Thread thread = Thread.builder().virtual().task(() -> {        System.out.println("hello");    }).start();    thread.join();    }}

Мы снова сделали то же самое многословнее, но теперь использовали билдер потоков. А он избавляет нас от того, чтобы сначала использовать конструктор для создания потоков, а затем вызывать setDaemon() или setName(). Это очень полезно.


Это хорошее улучшение API для тех, кто в конечном итоге использует Thread API напрямую. Запускаем и получаем то же, что и в случае с startVirtualThread().


Еще мы можем создать ThreadFactory.


void run() throws Exception {    ThreadFactory factory = Thread.builder().name(prefix:"worker-", start:0).factory();}

Это создает фабрику потоков она создает потоки, которые называют себя worker-0, worker-1, worker-2 и так далее. На самом деле worker это только начальный аффикс, который добавляется к префиксу. Это еще один полезный способ создания фабрик потоков.


Покажу вам простой пример того, что вы можете делать с билдером потоков, используя возможности, которые дают нам дешёвые виртуальные потоки.


Большинство людей фактически не используют Thread API напрямую. Начиная с JDK 5, они перешли на использование ThreadExecutor и других API из java.util.concurrent.


Я хочу показать вам использование одного из этих ThreadExecutor. Мы создадим множество потоков и покажем вам, что на самом деле происходит.


Я собираюсь создать ExecutorService executor:


try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {}

Этот фабричный метод для Executors создает виртуальные потоки. Обратите внимание, что здесь я использую try-with-resources. Одна из вещей, которые мы сделали в Loom, мы модернизировали ExecutorService для расширения AutoCloseable, чтобы вы могли использовать их с конструкцией try-with-resources.


Приятная вещь в том, что когда он закрыт, как на примере сверху, он гарантирует, что все задачи, которые являются потоками в этом случае, будет завершены до того, как завершится закрытие. На самом деле закрытие будет ждать, пока все задачи или потоки не будут завершены, что очень полезно. Этот тип Executor'а создает поток для каждой задачи, поэтому он сильно отличается от пулов потоков, которые вы обычно создаете с помощью фабричного класса Executors.


Давайте создадим здесь миллион потоков.


import ...  public class Demo {      public static void main(String[] args) throws Exception {...}      void run() throws Exception {          try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {              IntStream.range(0, 1_000_000).forEach(i -> {                  executor.submit(() -> { });              });          }      }      String fetch(String url) throws IOException {...}      void sleep(Duration duration) {...}  }

Я использую IntStream.range(), вместо цикла for. Это вызовет метод executor.submit() один миллион раз, он создаст миллион потоков, которые ничего не делают. Если это запустить, ничего интересного не произойдет Process finished with exit code 0.


Для наглядности давайте добавим счетчик, который будет обновляться каждым из потоков.


import ...public class Demo {    public static void main(String[] args) throws Exception {...}    void run() throws Exception {        AtomicInteger counter = new AtomicInteger();        try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {            IntStream.range(0, 1_000_000).forEach(i -> {                executor.submit(counter::incrementAndGet);            });        }        System.out.println(counter.get());    }    String fetch(String url) throws IOException {...}    void sleep(Duration duration) {...}  }

Мы создаем Executor, отправляем миллион задач, каждая из этих задач будет увеличивать счетчик, и когда все закончится, выводится значение счетчика. Если все работает правильно, как у нас, должен быть выведен миллион.


Отрабатывает быстро как видите, эти потоки очень дешевы в создании.


Давайте покажу вам, что еще мы можем делать с Executor'ами. У меня есть метод, который просто принимает байты из определенного URL-адреса, создает из него строку. Это не очень интересно разве что то, что это блокирующая операция.


String fetch(String url) throws IOExpection {    try (InputStream in = URI.create(url).toURL().openStream()) {        byte[] bytes = in.readAllBytes();        return new String(bytes, charsetName:"ISO-8859-1");    }}

Он создает сетевые подключения, HTTP-соединение будет ожидать результатов от сервера, мы просто воспользуемся этим как частью примера.


Давайте посмотрим вот на что:


void run() throws Exception {       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/");           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/en");           String first = executor.invokeAny(List.of(task1, task2));           System.out.println(first.length());       }     }

Вот одна задача, которая получает HTML-страницу с jokerconf.com. Я собираюсь создать вторую задачу, которая будет делать то же самое, но будет получать английскую версию страницы. Если кто-то говорит на двух языках, то он сможет читать и русскую и английскую страницу, это не имеет значения.


Мы используем executor.invokeAny() и даем ему две задачи.
ExecutorService имеет несколько комбинаторов, invokeAny(), invokeAll(), они существуют уже давно. Мы можем использовать их с виртуальными потоками.


В данном случае мы подставим в first результат, в зависимости от того, какая из этих задач будет выполнена первой.


Я запущу два виртуальных потока. Один из них получит первую страницу, другой вторую, в зависимости от того, что вернется первым, я получу результат в String first. Другой будет отменен (прерван). Запускаем и получаем результат: 200160, то есть одна из страниц размером 200 КБ.


Итак, что произошло: были созданы два потока, один выполнял блокирующую операцию получения данных с первого URL-адреса, другой со второго URL-адреса, и я получил то, что пришло первое. Если запущу еще пару раз, буду получать разные значения: одна из страниц всего 178 КБ, другая 200 КБ.


Это один из комбинаторов. На самом деле, я бы мог хотеть обе страницы и что-то с ними сделать, в этом случае я мог бы использовать invokeAll().


void run() throws Exception {       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/");           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/en");           executor.invokeAll(List.of(task1, task2)); List>Future>String>>                   .stream() Stream<Future<String>>                   .map(Future::join) Stream<String>                   .map(String::length) Stream<integer>                   .forEach(System.out.println);       }     }

Как видите, это не слишком интересно всё, что мы здесь делаем, это invokeAll(). Мы выполним обе задачи, они выполняются в разных потоках. InvokeAll() блокируется до тех пор, пока не будет доступен результат всех задач, потому что вы получаете здесь Future, которые гарантированно будут выполнены. Создаем поток, получаем результат, получаем длины, а затем просто выводим их. Получаем 200 КБ и 178 КБ. Вот что вы можете делать с ExecutorService.


Покажу вам еще кое-что. В рамках Loom мы немного поработали над CompletableFuture, чтобы вы могли делать подобное. Добавить задачу и получить CompletableFuture, а не Future, и тогда я смогу написать такой код:


void run() throws Exception {       try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/");           Callable<String> task1 = () -> fetch(url:"https://jokerconf.com/en");           CompletableFuture<String> future1 = executor.submitTask(task1);           CompletableFuture<String> future2 = executor.submitTask(task2);           CompletableFuture.completed(future1, future2) Stream<CompletableFuture<String>>                   .map(Future::join) Stream<String>                   .map(String::length) Stream<integer>                   .forEach(System.out.println);       }     }

Я вызываю в CompletableFuture-метод под названием completed(). Это возвращает мне стрим, который заполняется Future в ленивом режиме по мере их завершения. Это намного интереснее, чем invokeAll(), который я показал ранее, поскольку метод не блокируется, пока не будут выполнены все задачи. Вместо этого поток заполняется результатом в ленивом режиме. Это похоже на стримо-подобную форму CompletionService, если вы когда-нибудь такое видели.


Я запущу несколько раз, порядок, вероятно, будет случайным. В любом случае, это дает вам представление о других вещах, которые вы можете делать с CompletableFuture, стримами и виртуальными потоками.


Еще одна вещь, которую я хочу сделать, забегая вперед. Мы еще поговорим об этом подробнее после демо. В прототипе есть ограничение. Виртуальные потоки делают то, что мы называем закреплением потока ОС, когда мы пытаемся выполнить IO-операции, удерживая монитор. Я объясню это лучше после демо, но пока у меня открыта IDE, покажу вам это на практике и объясню, на что это влияет.


import ...public class Demo {    public static void main(String[] args) throws Exception {...}    void run() throws Exception {        Thread.startVirtualThread(() ->            sleep(Duration.ofSeconds(2));        }).join();    }    String fetch(String url) throws IOException {...}    void sleep(Duration duration) {...}  }

Виртуальный выполняет блокирующую операцию, спит две секунды, завершается. Пока не слишком интересно. Теперь предположим, что он должен спать, пока держит монитор.


void run() throws Exception {        Thread.startVirtualThread(() -> {            Object lock = new Object();            synchronized (lock) {                sleep(Duration.ofSeconds(2));            }        }).join();}

Я запускаю это с диагностическим свойством, которое даст мне трассировку стека, когда поток закреплен.



Мы видим трассировку стека, которая говорит мне, что поток был закреплен. Она снабжена примечаниями, которые говорят мне, где удерживается монитор. Это ограничение в нашем текущем прототипе. По сути, потоки могут блокироваться, удерживая монитор. Поток закрепляет соответствующий поток ОС, эта скорее качество сервиса текущей реализации, чем ошибка. Я вернусь к тому, что мы делаем c этим, через несколько минут. Это своего рода базовое демо.


У меня есть более полная демонстрация, переключусь для этого на немного другой проект.


package demo;import ...@Path("/")public class SleepService {    @GET    @Path("sleep")    @Producers(MediaType.APPLICATION_JSON)    public String sleep(@QueryParam("millis") long millis) throws Exception {        Thread.sleep(millis);        return "{ \"millis\": \"" + millis + "\" };    }}

Уже существуют несколько серверов, которые работают с виртуальными потоками.
Есть сервер Helidon MP. Я думаю, MP означает MicroProfile. Helidon настроен, они недавно внесли некоторые изменения, теперь вы можете запустить его со свойством, при котором он будет запускать каждый запрос в отдельном виртуальном потоке. Мой код может выполнять операции блокировки, и они не будут закреплять базовый поток ОС. У меня может быть намного больше запросов, чем потоков, работающих одновременно и выполняющих блокирующие операции, это действительно очень полезно.


Первый сервис, который я вам покажу, что-то вроде эквивалента hello world при использовании подобных служб. Запускаем код из примера выше, переходим в окно терминала и вводим curl-команду.



Curl-команда кодирует параметр миллисекунд обратно в JSON, который возвращается.
Не слишком интересно, потому что все, что было сделано, это сон. Остановлю сервер и вставлю Thread.dumpStack():


public String sleep(@QueryParam("millis") long millis) throws Exception {    Thread.dumpStack();    Thread.sleep(millis);    return "{ \"millis\": \"" + millis + "\" };}

Снова запущу сервер. Я снова выполняю команду curl, которая устанавливает HTTP-соединение с сервером, она подключается к эндпоинту сна, параметр millis=100.


curl http://localhost:8081/sleep?millis=100


Посмотрим на вывод: печатается трассировка стека, созданная Thread.dumpStack() в сервисе.



Огромная трассировка стека, мы видим здесь кучу всего: код Helidon, код Weld, JAX-RS Довольно интересно просто увидеть это всё. Это сервер, который создает виртуальный поток для каждого запроса, что довольно интересно.


Теперь посмотрим на более сложный сервис. Я показал вам комбинаторы
invokeAny и involeAll в простом демо в самом начале, когда показывал новый ExecutorService.


import ...@Path("/")public class AggregatorServices {    @GET    @Path("anyOf")    @Produces(MediaType.APPLICATION_JSON)    public String anyOf(@QueryParam("left") String left,                        @QueryParam("right") String right) throws Exception {        if (left == null || right == null) {            throw new WebApplicationException(Response.Status.BAD_REQUEST);        }        try (var executor :ExecutorService = Executors.newVirtualThreadExecutor()) {            Callable<String> task1 = () -> query(left);            Callable<String> task2 = () -> query(right);            // return the first to succeed, cancel the other            return executor.invokeAny(List.of(task1, task2));        }    }    @GET    @Path("allOf")    @Produces(MediaType.APPLICATION_JSON)    public String allOf(@QueryParam("left") String left,                        @QueryParam("right") String right) throws Exception {        if (left == null || right == null) {            throw new WebApplicationException(Response.Status.BAD_REQUEST)        }        try (var executor :ExecutorService = Executors.newVirtualThreadExecutor()) {            Callable<String> task1 = () -> query(left);            Callable<String> task2 = () -> query(right);            // if one falls, the other is cancelled            return executor.invokeAll(List.of(task1, task2), cancelOnException: true) List<Future<String>>                    .stream() Stream<Future<String>>                    .map(Future::join) Stream<String>                    .collect(Collectors.joining(delimiter:", ", prefix:"{", suffix:" }"));        }    }    private String query(String endpoint) {...}}

Здесь у нас несколько сервисов, они находятся в этом исходном файле под названием AggregatorServices. Здесь есть две службы, два метода я бы сказал: anyOf и allOf. anyOf выполняет левый и правый запросы и выбирает тот, который возвращается первым, а другой отменяет.


Начнем с anyOf. Я вызвал curl-команду:


curl http://localhost:8081/anyOf?left=/greeting\&right=/sleep?millis=200

localhost:8081 это текущий порт, эндпоинт anyOf, и я дал ей два параметра left и right. Я выполняю это и получаю hello world:


{"message":"Hello World!"}$


Причина в том, что сервис приветствия просто выводит hello world, а сервис сна спит 200 мс. Я предполагаю, что большую часть времени hello world будет быстрее, чем 200 мс, и всегда будет возвращаться hello world.


Если я уменьшу сон до 1 мс, то, возможно, сервис сна завершится раньше, чем другой сервис.


Теперь давайте изменим запрос на allOf, который объединит два результата:


curl http://localhost:8081/allOf?left=/greeting\&right=/sleep?millis=1

Запускаю и получаю два результата.


{ {"message":"Hello World!"}, { "millis": "1" } }$


Что интересно в allOf, он делает два запроса параллельно.


private String query(String endpoint) {        URI uri = URI.create("http://localhost:8081").resolve(endpoint);        return ClientBuilder.newClient() Client                    .target(uri) WebTarget                    .request(MediaType.APPLICATION_JSON) Invocation.Builder                    .get(String.class);    }

Кстати, это блокирующий код. Он использует клиентский API JAX-RS для подключения к этому эндпойнту. Он использует вызов invokeAll(), а затем .stream (), .map для получения результата, а затем Collectors.joining(), для объединения в JSON.


Это простой пример разветвления. Интересно то, что тут invokeAll() это вариант, в котором есть параметр cancelOnException. Если вы хотите вызвать несколько задач одновременно, но если одна из них не работает, вы отменяете все остальные. Это важно сделать, чтобы не застрять в ожидании завершения всех остальных задач.


В этих примерах я использую сборку для раннего доступа Loom. Мы очень близки к первому рампдауну JDK 16, поэтому код, над которым я работаю это JDK 16, каков он сейчас, плюс вся реализация Loom.


Ограничения


Поговорим об ограничениях.


В настоящее время существует два сценария, в которых виртуальные потоки, пытающиеся уступить, не освобождают базовый поток ОС. В примере я показал, как можно получить трассировку стека, когда поток закреплен, что на самом деле очень полезно для диагностики проблем такого типа.


Первый сценарий возникает, когда вы используете нативные фреймы. Если у вас есть код JNI, вы вызываете код JNI, он обращается обратно в код Java, а затем этот код Java пытается заблокироваться или совершить IO-операцию. На континуации есть нативный фрейм, мы мало что можем сделать. Это потенциально постоянное ограничение, но подобное должно происходить очень и очень редко.


Второй случай более проблематичен, и это то, что я показал в демонстрации: попытка
парковки, удерживая монитор. Скорее всего, первая версия будет с этим ограничением, это особенность реализации, влияющая на качество сервиса. В настоящее время ведутся исследования по реимплементации мониторов, чтобы преодолеть это ограничение. Но это серьезный объем работы, на это потребуется время.


На самом деле это не очень критично. По той простой причине, что всё, что сегодня использует мониторы Java, можно механически преобразовать из использования synchronized и wait-notify в использование блокировок из java.util.concurrent. Так что существуют эквиваленты мониторов в блокировках java.util.concurrent и различные формы блокировок, самый простой из которых ReentrantLock, они очень хорошо работают с виртуальными потоками.


Что вы можете сделать при подготовке к Loom?


Если вам нравится Loom и вы заинтересованы в том, чтобы подготовить код для работы с Loom, есть пара вещей, о которых стоит подумать.


Предположим, у вас миллион потоков и код, который использует много ThreadLocals. Хотя виртуальные потоки поддерживают ThreadLocals, при их большом количестве требуется много памяти. Тут есть над чем подумать. Мы уже довольно давно работаем в JDK над устранением многих из ThreadLocals, которые использовались в различных местах.


Распространенным является кэширование объектов SimpleDateFormat.
SimpleDateFormat может быть дорогостоящим в создании, они не являются потокобезопасными, поэтому люди повсеместно кэшируют их в ThreadLocals.


В JDK мы заменили кэширование SimpleDateFormats на новый неизменяемый формат даты java.date dateformatter. Он неизменяем, вы можете сохранить его
в static final поле, это достаточно хорошо. Мы удалили ThreadLocals и из некоторых других мест.


Другая сложность сводится к масштабированию приложения и обработке десятков тысяч запросов. Если у вас много данных на запрос или на транзакцию, это может занимать много места. Если у вас миллион TCP-соединений, это миллион буферов сокетов. Если вы оборачиваете каждый из них в BufferedOutputStream, PrintStream или что-то в этом роде, это много памяти. Мы работали над подобными вещами и в JDK, но я уверен, что дальше по стеку у людей будет много данных на запрос или на транзакцию.


Третье, как я уже говорил, переход от мониторов к java.util.concurrent позволяет избежать краткосрочных проблем.


Я говорил в основном о виртуальном потоке как о потоке в коде, но давайте поговорим о нескольких других вещах.


Расскажу немного об отладчике.



При отладке действительно важно, чтобы при движении по шагам, вы
работали в каком-то контексте. Обычно отладчики Java (в IntelliJ, NetBeans, Eclipse) используют интерфейс отладчика под названием JDI, где под капотом находится wire protocol, а в виртуальной машине есть интерфейс инструментов, называемый JVM Tool Interface или JVM TI, как мы его иногда называем. Это все необходимо обновить, чтобы иметь возможность поддерживать виртуальные потоки.


Оказывается, это значительный объем работы. Отладчики занимаются приостановкой и возобновлением, они фактически перечисляют потоки. Тут возникают сложности с масштабируемостью при переходе от тысяч потоков к сотням тысяч или
миллионам потоков.


Кроме того, наш подход к поддержке подобного в отладчиках оказался сбивающим с толку. Отладчик видит два потока: основной носитель (поток нашей ОС) и виртуальный поток. С этим было связано с множеством проблем, поэтому мы решили остановиться и пересмотреть все это. Так что это область сейчас работает не очень хорошо, но мы близки к тому, чтобы решить эти проблемы. Так что в это время ведется гигантский объем работы для повторной реализации и перестройки частей JVM TI для лучшей поддержки виртуальных потоков.


Итак, мы приближаемся к цели, большая часть деталей уже реализована, и мы надеемся, что очень скоро у нас будет гораздо лучшая ситуация с отладкой.


И последнее, что нужно сказать об отладчиках: кто-то спрашивает, когда мы все это доделаем, будут ли имеющиеся отладчики просто работать? Мы ожидаем, что в отладчики и инструментарий потребуется внести хотя бы некоторые небольшие изменения, чтобы они могли работать с виртуальными потоками. В основном на фронте масштабируемости, не имеет смысла пытаться визуализировать миллион потоков в отладчике. Такие вещи, как группы потоков это уже легаси, и попытки визуализировать что-либо в группах потоков, вероятно, не будут работать хорошо.


Перейдем к виртуальным потокам в профилировщике.


Это тоже очень важная область. Java Flight Recorder был обновлен в сборках Loom для поддержки виртуальных потоков. Я не был уверен, что во время доклада успею продемонстрировать использование JFR с виртуальными потоками, поэтому вместо этого я просто зафиксировал вывод команды print в JFR, просто чтобы показать вам, на что он способен.


В данном случае я сделал запись с JFR.



Он просто называется server.jfr, это имя файла записи. Эта конкретная запись была сделана при запуске сервера Jetty, настроенного для использования виртуальных потоков. Выходные данные показывают одно событие, чтение сокета. И оно произошло в виртуальном потоке. JFR по умолчанию имеет порог, кажется, около 200 мс, он может захватывать медленную операцию чтения, которая занимает больше времени, чем время порога.


Давайте расскажу, что именно здесь запечатлено. virtual = true указывает на то, что это виртуальный поток. Я распечатал всю трассировку стека, поэтому вы можете увидеть, что это действительно работает в виртуальном потоке, мы видим все фреймы, тут используются java.net.url и HTTP для чтения сокета, и это блокирует более чем на 200 мс. Это записано здесь в этой трассировке стека. Это то, что вы можете делать с JFR, что весьма полезно.


Помимо Flight Recorder, поддерживающего виртуальные потоки, существует множество других инструментов и профилировщиков, использующих JVM TI, поэтому нам приходится работать над множеством вещей, чтобы иметь возможность поддерживать профилировщики на основе JVM TI, работающие с виртуальными потоками.


В этой области есть проблемы, о которых я упоминал в контексте отладчиков, попытки визуализировать тысячи и тысячи потоков.


То же самое и с профилировщиками. Если их используют для приложения с миллионом потоков, не думаю, что следует пытаться визуализировать их все, это будет неудобно для пользователя. Это область, которая, вероятно, не будет масштабироваться, по крайней мере, с точки зрения пользовательского интерфейса. Это тип проблем, с которыми мы сталкиваемся при работе с некоторыми инструментами.


Serviceability


Чтобы иметь возможность использовать виртуальные потоки в продакшне, мы должны решить множество других задач по устранению неполадок и диагностике.


Я показал вам довольно простую распечатку трассировки стека, когда потоки закреплены. Будут и другие сценарии, значимые для разработчиков. Они не смогут идентифицировать, например, запущенные виртуальные потоки, выполняющие вычислительные задачи (упирающиеся в CPU), они никогда не блокируются. Было бы полезно идентифицировать их.


Дамп потоков это то, с чем мы боролись в течение некоторого времени. Люди привыкли использовать дампы потоков для устранения неполадок, но что это значит, если у вас миллион потоков? Вы хотите просто увидеть все потоки, но дедуплицировать их? Вы хотите иметь другие формы? Это те вещи, которые нам еще предстоит изучить.


Текущий статус того, где мы находимся с Loom


Цель состоит в том, чтобы получить первую демо-версию с виртуальными потоками. У нас был ряд проблем со стабильностью, неприятные сбои, но мы думаем, что уже решили большинство из этих проблем.


Было много тонких проблем с взаимодействием с многопоточными сборщиками мусора, которые беспокоили довольно долгое время. Мы очень надеемся, что скоро получим сборку для раннего доступа, в которой будут решены эти проблемы.


Производительность также высока, был достигнут значительный прогресс в производительности за последний год, потому что это критически важно.


Поверхность API небольшая, но, как и во всех API, очень важно получить как можно больше обратной связи. API всегда проходят доработку, не бывает так, чтобы всё получилось как надо на первой итерации, мы пройдем через другие итерации, прежде чем все будет сделано.


Поддержка отладчика, как я уже упоминал, была проблемой, мы думаем, что сейчас на правильном пути, и как только мы доделаем последние штрихи, я действительно надеюсь, что у людей, которые обслуживают IDE и отладчики, будет время поработать с нами и создать условия хорошего опыта отладки.


Что еще нужно сделать для нашего первого Preview: нам необходимо выполнить перенос на ARM64 или Aarch64, мы были сосредоточены на 64-разрядной версии Intel на сегодняшний день; и нам нужно что-то сделать с дампом потоков.


Направления для будущего развития


Я должен упомянуть еще несколько аспектов этого проекта. Это такой список из будущего: мы не ожидаем, что перечисленное в нём будет в первой версии.


Во многих случаях для запуска задачи создается виртуальный поток, который выполняет одно действие и дает один результат. В других случаях вам может понадобиться, чтобы виртуальные потоки производили стрим результатов или общались через сообщения той или иной формы.


Как вы видите в других моделях программирования, CSP или Actors. У других языков есть каналы, у Erlang есть почтовые ящики. В Java есть вещи, близкие к этому: есть BlockingQueues, SynchronousQueue, у которой нет емкости, LinkedTransferQueue, у которой есть емкость.


Профессор Даг Ли работал с нами над этим проектом, и он обновил реализации блокирующих очередей в java.util.concurrent, так что они дружелюбны к виртуальным потокам. Он также изучает то, что ближе к каналам. Текущее рабочее название этого проекта conduits, а не каналы, потому что у нас есть каналы в пакете java.nio.channels. Посмотрим, как это пойдет.


Другая область это структурированная многопоточность. При структурированном программировании последовательное выполнение ограничено каким-либо четко
определенным блоком кода. Структурированная многопоточность заключается в распространении контроля на многопоточную среду. По сути, если задача управления разделяется на несколько задач или потоков в некоторой области, то им необходимо снова объединиться, прежде чем выйти из этой области.


В ранних прототипах Project Loom у нас действительно были первые прототипы для исследований в этой области. И мы вернемся к этому, есть проблемные области, связанные с этим. Распространение исключений и ошибок, отмена выполнения и так далее. На данный момент мы сделали, и я показывал это в демонстрациях, модифицировали ExecutorService, чтоб он расширял AutoCloseable, чтобы, по крайней мере, мы могли иметь некоторую конструкцию, которая гарантирует, что все потоки завершатся до продолжения основного потока.


Со структурированной многопоточностью связано то, что мы называем структурированным serviceability или структурированным observability. Я упоминал о проблемах профилировщиков, отладчиков и других инструментов, пытающихся
визуализировать миллионы потоков. Что, если бы эти инструменты каким-то образом исследовали бы структуру или отношения между потоками. Тогда, возможно, нам лучше бы удалось это визуализировать. На это мы готовы потратить больше времени.


Последний пункт в этом списке отмена. Мы сделали несколько прототипов в этой области, несколько прототипов кооперативной отмены. В Java есть механизм прерывания, это устаревший механизм, но на самом деле он очень хорошо работает с виртуальными потоками. Для большинства разработчиков этот механизм находится на слишком низком уровне. Мы хотим понять, сможем ли мы сделать что-то лучше. У нас были механизмы отмены в ранних прототипах. Я полагаю, что основная проблема заключается в том, что наличие двух механизмов одновременно может сбивать с толку, поэтому необходимо подумать над этим немного времени, прежде чем принимать какие-то решения по этому поводу.


Главные выводы


Основные выводы из этого доклада:


Виртуальный поток это поток в коде, во время выполнения, в отладчике, профилировщике и других инструментах.


Виртуальный поток это не оболочка вокруг потока ОС, а, по сути, просто объект Java.


Создание виртуального потока действительно очень дешево, у вас их могут быть миллионы, их не надо объединять в пулы.


Блокировка в виртуальном потоке стоит мало, что позволяет использовать синхронный стиль.


Немного дополнительной информации


Один из замечательных способов внести свой вклад в такой проект, как Project Loom, это загружать его сборки (или делать их самостоятельно из кода) и пробовать его с реальными приложениями. Отправляйте отзывы о производительности, сообщениях об ошибках, проблемах и опыте. Такие вещи очень полезны для такого проекта.


Вот ссылки на сборки раннего доступа: https://jdk.java.net/loom
Список рассылки: loom-dev@openjdk.java.net
И вики-страница: https://wiki.openjdk.java.net/display/loom/Main


У нас не очень получается поддерживать вики-страницу, поэтому список рассылки лучшее место для поиска чего-либо.


Это все, что я хотел рассказать.


Напоследок традиционный слайд Safe harbor: не верьте ничему, что я говорю.



Как можно понять по этому докладу, на наших Java-конференциях хватает хардкора: тут про Java-платформу порой рассказывают те люди, которые её и делают. В апреле мы проведём JPoint, и там тоже будет интересный состав спикеров (многие знают, например, Джоша Лонга из VMware). Часть имён уже названа на сайте, а другие позже появятся там же.
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru