Русский
Русский
English
Статистика
Реклама

Parallel

Параллельные запросы в Kotlin для автоматизации сборки данных

10.02.2021 18:20:17 | Автор: admin

Всем привет! В своей работе я часто использую Kotlin для автоматизации. Деятельность моя не связана напрямую с программированием, но Котлин здорово упрощает рабочие задачи.

Недавно нужно было собрать данные немаленького размера, дабы сделать анализ, поэтому решил написать небольшой скрипт, для получения данных и сохранения их в Excel. С последним пунктом проблем не возникло - почитал про Apache POI, взял пару примеров из официальной документации, доработав под себя. Чего не скажешь про запросы в Сеть.

Источник отдавал пачками json и надо было как-то быстро эти "пачки" собирать, преобразовывая в текст и записывая в файл таблицу.

Асинхронный метод

Начать решил с простой асинхронщины. Немного поковыряв HttpUrlConnection, отправил туда, где ему и место, заменив на HttpClient из Java.

Для тестов взял сервис https://jsonplaceholder.typicode.com/, который мне подсказал один знакомый разработчик. Сохранил ссылку, которая выдает Json с комментариями в переменную, дабы не дублировать и начал тесты.

const val URL = "https://jsonplaceholder.typicode.com/comments"

Функция была готова и даже работала. Данные приходили.

fun getDataAsync(url: String): String? {    val httpClient = HttpClient.newBuilder()        .build()    val httpRequest = HttpRequest.newBuilder()        .uri(URI.create(link)).build()    return httpClient.sendAsync(httpRequest, BodyHandlers.ofString())        .join().body()}

Теперь надо было проверить скорость работы. Вооружившись measureTimeMillis я запустил код.

val asyncTime = measureTimeMillis {     val res = (1..10)        .toList()        .map {getDataAsync("$URL/$it")}    res.forEach { println(it) }}println("Асинхронный запрос время $asyncTime мс")

Все работало как надо, но хотелось быстрее. Немного покопавшись в Интернете, я набрел на решение, в котором задачи выполняются параллельно.

Parallel Map

Автор в своем блоге пишет, что код ниже выполняется параллельно с использованием корутин. Ну что, я давно хотел их попробовать, а тут представилась возможность.

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> =    coroutineScope {        map { async { f(it) } }.awaitAll()    }

Если я все верно понял, то здесь расширяется стандартная коллекция (класс Iterable) функцией pmap, в которую передается лямбда. В лямбду поочередно приходит параметр A. Затем после окончания прохода по списку async дожидается выполнения всех элементов списка, и с помощью .awaitAll() выдает результат в виде списка. Причем для каждого элемента функция с модификатором suspend, то есть блокироваться она не будет.

Пришло время тестов, и сказать, что я был разочарован - значит не сказать ничего.

val parmapTime = measureTimeMillis {    runBlocking {        val res = (1..10)            .toList()            .pmap { getDataAsync("$URL/$it") }        println(mapResult)    }}println("Время pmap $parmapTime мс")

Средний результат был в районе - 1523мс, что не сильно то отличалось по скорости от первого решения. Задачи может и работали параллельно благодаря map и async, но уж очень медленно.

Parallel Map v 2.0

После работы, вооружившись малиновым чаем, я сел читать документацию по корутинам и через некоторое время переписал реализацию автора.

suspend fun <T, V> Iterable<T>.parMap(func: suspend (T) -> V): Iterable<V> =    coroutineScope {        map { element ->             async(Dispatchers.IO) { func(element) }         }.awaitAll()     }val parMapTime = measureTimeMillis {    runBlocking {        val res = (1..10)            .toList()            .parMap { getDataAsync("$URL/$it") }    }    println(res)}println("Параллельная map время $parMapTime мс")

После добавления контекста Dispatchers.IO задача выполнялась в 2 раза быстрее ~ 610 мс. Другое дело! Остановившись на этом варианте и дописав все до полноценного рабочего скрипта (проверка ошибок, запись в excel и т.д.) я успокоился. Но мысль в голове о том, что можно еще что-то улучшить не покидала меня.

Java ParallelStream

Через несколько дней, в одном из постов на stackowerflow прочитал о parallelStream. Не откладывая дело в долгий ящик, после работы вновь запустил IDEA.

val javaParallelTime = measureTimeMillis {     val res = (1..10).toList()        .parallelStream()        .map { getDataAsync("$URL/$it") }    res.forEach { println(it) }}println("Java parallelSrtream время $javaParallelTime мс")

Код выполнялся даже чуть быстрее, чем моя реализация. Но радость длилась ровно до того момента, когда пришло время обрабатывать ошибки. Точки останова насколько я понял в stream нет. Иногда, у меня получалось так, что все считалось до конца, вываливалась ошибка и в виде результата "прилетал" то неполный, то пустой Json.

Может, я делал что-то не так, но с async таких проблем не возникло. Там можно контролировать данные на каждом шаге итерации и удобно обрабатывать ошибки.

Выводы

Результаты можно посмотреть в таблице ниже. Для себя я однозначно решил оставить async await. В основном конечно из-за более простой обработки ошибок. Да и за пределы корутин тут выходить не надо.

Метод

Время (ms)

Асинхронный метод

1487

Реализация pmap из Сети

1523

Мой вариант - parallelMap

610

Java.parallelStream

578

В дальнейшем, есть мысли оформить это в небольшую библиотеку и использовать в личных целях, и конечно переписать все это с "индусского кода" на человеческий, на сколько хватит возможностей. А потом залить все это на vds.

Надеюсь мой опыт кому-нибудь пригодится. Буду рад конструктивной критике и советам! Всем спасибо

Подробнее..

Разгоняем REACTOR

12.06.2021 18:20:44 | Автор: admin

Кому будет интересно?

Реактор сегодня - это стильно, модно, молодежно. Почему многие из нас практикуют реактивное программирование? Мало кто может ответить однозначно на этот вопрос. Хорошо - если Вы понимаете свой выигрыш, плохо - если реактор навязан организацией как данность. Большинство аргументов "ЗА" - это использование микросервисной архитектуры, которая в свою очередь обязывает микросервисы часто и много коммуницировать между собой. Для коммуникации в большинстве случаев выбирают HTTP взаимодействие. Для HTTP нужен легковесный веб-сервер, а что первое приходит на ум? Tomcat. Тут появляются проблемы с лимитом на максимальное количество сессий, при превышении которого веб-сервер начинает реджектить запросы (хотя лимита этого не так уж и легко достичь). Здесь на подмогу приходит реактор, который подобными лимитами не ограничен, и, например, Netty в качестве веб-сервера, который работает с реактивностью из коробки. Раз есть реактивный веб-сервер, нужен реактивный веб-клиент (Spring WebClient или Reactive Feign), а раз клиент реактивный, то вся эта жуть просачивается в бизнес логику, Mono и Flux становятся Вашими лучшими друзьями (хотя по началу есть только ненависть :))

Среди бизнес задач, очень часто встречаются серьезные процедуры, которые обрабатывают большие массивы данных, и нам приходится применять реактор и для них. Тут начинаются сюрпризы, если реактор не уметь готовить, можно и проблем схлопотать очень много. Превышение лимита файловых дескрипторов на сервере, OutOfMemory из-за неконтролируемой скорости работы неблокирующего кода и многое многое другое, о чем мы сегодня поговорим. Мы с коллегами испытали очень много трудностей из-за проблем с пониманием как держать реактор под контролем, но всё что нас не убивает - делает нас умнее!

Блокирующий и неблокирующий код

Вы ничего не поймете дальше, если не будете понимать разницу между блокирующим и неблокирующим кодом. Поэтому, остановимся и внимательно разберемся в чем разница. Вы уже знаете, блокирующий код реактору - враг, неблокирующий - бро. Проблема лишь в том, что в настоящий момент времени, не все взаимодействия имеют неблокирующие аналоги.

Лидер здесь - HTTP взаимодействие, вариантов масса, выбирай любой. Я предпочитаю Reactive Feign от Playtika, в комбинации со Spring Boot + WebFlux + Eureka мы получаем очень годную сборку для микросервисной архитектуры.

Давайте по-простому: НЕблокирующий код, это обычно всё, в названии чего есть reactive, а блокирующий - все оставшееся :) Hibernate + PostgreSQL - блокирующий, отправить почту через JavaMail - блокирующий, скинуть сообщение в очередь IBMMQ - блокирующий. Но есть, например, реактивный драйвер для MongoDB - неблокирующий. Отличительной особенностью блокирующего кода, является то, что глубоко внутри произойдет вызов метода, который заставит Ваш поток ждать (Thread.sleep() / Socket.read() и многие подобные), что для реактора - как нож в спину. Что же делать? Большинство бизнес логики завязано на базу данных, без нее никуда. На самом деле достаточно знать и уметь делать 2 вещи:

  • Необходимо понимать где блокирующий код. В этом может помочь проект BlockHound или его аналоги (тут тема для отдельной статьи)

  • Исполнение блокирующего кода необходимо переключать на пулы, готовые его выполнять, например: Schedulers.boundedElastic(). Делается это при помощи операторов publishOn & subscribeOn

Разгоняемся сами

Перед тем, как продолжить, необходимо немного размяться!

Уровень 1

    @Test    fun testLevel1() {        val result = Mono.just("")            .map { "123" }            .block()        assertEquals("123", result)    }

Начнем с простого, такой код обычно пишут начинающие reactor программисты. Как начать цепочку? Mono.just и ты на коне :) Оператор map трансформирует пустую строку в "123" и оператор block делает subscribe.

Обращаю особенное внимание на оператор block, не поддавайтесь соблазну использовать его в Вашем коде, исключение составляют тесты, где это очень удобно. При вызове block внутри метода Вашего RestController, Вы сразу получите исключение в рантайме.

Уровень 2

    fun nonBlockingMethod1sec(data: String)     = data.toMono().delayElement(Duration.ofMillis(1000))    @Test    fun testLevel2() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { nonBlockingMethod1sec(it) }            .block()        assertEquals("Hello world", result)    }

Усложняем наш код, добавляем неблокирующий метод nonBlockingMethod1sec, все что он делает - ожидает одну секунду. Все что делает данный код - дважды, по очереди, запускает неблокирующий метод.

Уровень 3

    fun collectTasks() = (0..99)    @Test    fun testLevel3() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { businessContext ->                collectTasks()                    .toFlux()                    .map {                        businessContext + it                    }                    .collectList()            }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Начинаем добавлять самое интересное - Flux! У нас появляется метод collectTasks, который собирает массив из сотни чисел, и далее мы делаем из него Flux - это будет наш список задач. К каждой задаче мы применяем трансформацию через оператор map. Оператор collectList собирает все результаты в итоговый список для дальнейшего использования.

Здесь наш код начинает превращаться в рабочий паттерн, который можно использовать для массового выполнения задач. Сначала мы собираем некий "бизнес контекст", который мы используем в дальнейшем для выполнения задач.

Уровень 4

    fun collectTasks() = (0..100)        @Test    fun testLevel4() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { businessContext ->                collectTasks().toFlux()                    .flatMap {                        Mono.deferContextual { reactiveContext ->                            val hash = businessContext + it + reactiveContext["requestId"]                            hash.toMono()                        }                    }.collectList()            }            .contextWrite { it.put("requestId", UUID.randomUUID().toString()) }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Добавляем немного плюшек. Появилась запись данных (15) в реактивный контекст, а также чтение (10) из него. Мы почти у цели. Постепенно переходим к итоговому варианту

Уровень 5

    fun collectTasks() = (0..1000)        fun doSomethingNonBlocking(data: String)        = data.toMono().delayElement(Duration.ofMillis(1000))        fun doSomethingBlocking(data: String): String {        Thread.sleep(1000); return data    }    val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")    private val logger = getLogger()    @Test    fun testLevel5() {        val counter = AtomicInteger(0)        val result = nonBlockingMethod1sec("Hello world")            .flatMap { _ ->                collectTasks().toFlux()                    .parallel()                    .runOn(pool)                    .flatMap {                        Mono.deferContextual { _ ->                            doSomethingNonBlocking(it.toString())                                .doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }                                .doOnNext { logger.info("Non blocking code finished ${counter.get()}") }                                .map { doSomethingBlocking(it) }                                .doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }                        }                    }.sequential()                    .collectList()            }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Вот мы и добрались до итогового варианта! Часть с реактивным контекстом была опущена для более наглядной демонстрации того, зачем мы здесь собрались. У нас появились два новых метода: doSomethingNonBlocking (3) & doSomethingBlocking (6) - один с неблокирующим ожиданием в секунду, второй с блокирующим. Мы создали пул потоков для обработки задач (10), добавили счетчик активных задач в реакторе (15). У нас появился оператор parallel (19) и обратный ему sequential (29). Задачи мы назначили на свежесозданный пул (20). Для понимания, что же происходит внутри, добавили логирование внутри операторов doOnRequest (вызывается перед исполнением метода), doOnNext (вызывается после исполнения метода). Основная задумка - на примере, определить сколько задач одновременно выполняется в реакторе и за какое время цепочка завершит свою работу.

Такой "паттерн", мы с коллегами очень часто применяем для выполнения сложных задач, таких как отправка отчетов или массовая обработка транзакций. Первым делом собирается бизнес контекст - это некая структура, содержащая в себе информацию, полученную в результате вызовов других микросервисов. Бизнес контекст необходим нам для выполнения самих задач, и собирается он заранее, чтобы не тратить время в процессе обработки. Далее мы собираем список задач, превращаем их во Flux и скармливаем реактору на параллельную обработку.

И вот здесь начинается самое интересное. Попробуйте ответить на несколько вопросов. Как Вы считаете, сколько времени будет выполнятся данная цепочка? В ней 100 задач, в каждой задаче неблокирующее ожидание в 1 секунду, блокирующее ожидание в 1 секунду, и у нас в наличии пул из 10 потоков? (Вполне годная задачка на собеседование senior reactor developer :))

Правильный ответ

Около 12 секунд. Рассуждаем от блокирующего :) Блокирующее ожидание никуда не деть, и тут имеем 100 блокирующих секунд на 10 потоков, итого 10 секунд. Неблокирующее ожидание заметно нам лишь в первый раз, далее оно незаметно запускается в передышках между блокирующим. Не забываем про одну секунду сбора "бизнес контекста" перед запуском задач.

А теперь уберем строку (26) .map { doSomethingBlocking(it) } . Освободим наш реактор от блокирующего кода, интересно, сколько теперь времени займет выполнение цепочки?

Правильный ответ

2 секунды! 1 на сбор "бизнес контекста" и 1 на выполнение всех задач. Реактор запустит 100 задач одновременно. Но ведь у нас пул из 10 потоков? Как так? Первый разрыв шаблона.

Мы идем до конца и увеличиваем количество задач в методе collectTasks() до ... 1000? а может быть сразу до 15000? Как долго реактор будет выполнять столько задач?

Правильный ответ

2 секунды! 1 на сбор "бизнес контекста" и 1 на выполнение всех задач. Реактор запустит ВСЕ задачи одновременно. Второй разрыв шаблона. Где предел?

А это вообще легально?

Как же так и как это контролировать? Почему это опасно? Что если внутри параллельной обработки Вы решите вызвать другой микросервис? Если у вас 30000 задач, и по завершению каждой, Вам нужно отправлять запрос соседнему микросервису, Вы с удивлением можете обнаружить, что реактор непременно постарается выполнить все вызовы одновременно (Вы ведь используете реактивный web-client или реактивный feign, верно?) Открытие такого большого количества сокетов повлечет за собой превышение лимита открытых файловых дескрипторов в системе, что как минимум создаст проблемы с невозможностью создания новых сокетов в системе и помешает другим сервисам, а как максимум повалит Вам на сервере SSH и Вы потеряете доступ к серверу. Сомневаюсь, что в этот момент, программист будет кричать "зато смотри как быстро работает".

Разрыв шаблона. Thread Pool & Reactor

Основная проблема начинающего реактор программиста - это образ мышления, если есть медленный процесс - добавь X потоков, будет быстрее в X раз, а если слишком быстро - сократи количество потоков. Как всё просто было раньше? :) С реактором это не работает.

Классический thread pool - двери. Больше дверей - больше пропускная способность, все работает быстрее.

Теперь встречайте reactor! Вы видите двери? Нет никаких дверей

Реактор это большой мешок с подарками, или воздушная труба, задачи в которую валятся и летают там пока не выполнятся. А кто эти люди в желтом? Это наши epoll реактивные потоки, которые ни в коем случае нельзя нагружать блокирующими задачами. Можно провести аналогию с прорабами или инженерами. Они здесь, чтобы управлять процессом, а не чтобы выполнять тяжелую работу. Займите одного инженера тяжелой задачей, и когда к нему придет следующий рабочий с вопросом "что делать дальше?", он не сможет ответить, потому что был занят. Вот так и появляются таймауты в реактивном коде. Казалось бы микросервис стоит без нагрузки, выполняет какие-то задачки, а один из 500 запросов к нему падает с тайм-аутом, и непонятно почему. Велика вероятность что инженер был занят блокирующей задачей! Заботьтесь о своих инженерах и поручайте тяжелую работу специально обученным рабочим, например, Schedulers.boundedElastic().

Как контролировать эту "трубу", в которую валится всё без контроля? Вот мы и подошли к кульминации

Конфигурируем реактор!

В своей дефолтной конфигурации, параллельная обработка в реакторе зависит от количества ядер процессора сервера, на котором запускается код, поэтому, к своему удивлению, Вы получите разные результаты, проверяя работу реактора в тесте на локальной машине с 4-8 ядрами и production сервере с 32 ядрами.

Парад настроек открывает parallel с его аргументом parallelism

Меняя parallelism, мы можем регулировать количество запускаемых rails (это местное понятие реактора, которое похоже на корутины, но по сути является количеством одновременно выполняемых неблокирующих задач). Prefetch мы рассмотрим более подробно в следующем разделе.

Но одного parallelism недостаточно, реактор все еще будет нагребать задач как не в себя.

Мало кто обращал внимание что у оператора flatMap (только того что запускается на Flux) есть перегрузки с интересными аргументами, а именно maxConcurrency

maxConcurrency очень важен, по дефолту значение стоит Integer.MAX_VALUE (определяет сколько неблокирующих задач может выполняться одновременно на одной рельсе. Понимаете теперь откуда аппетит у реактора?

Также, не стоит забывать, что если цепочка будет запущена несколько раз (вызов одного http метода контроллера несколько раз), то все помножится! Никакой пул не спасет.

Количество запусков цепочки напрямую влияет на количество одновременно выполняемых задач.

Подведем небольшой итог:

  • parallel (parallelism)

  • flatMap (maxConcurrency)

  • Количество запусков цепочки

Эти три параметра являются множителями, для расчета количества одновременных задач.

По дефолту это Кол-во ядер * Integer.MAX_VALUE * Количество запусков цепочки

Напротив же, запустив данный код для 5 задач длительностью в секунду мы получим цепочку работающую 5 секунд. Теперь всё под контролем!

        val result = nonBlockingMethod1sec("Hello world")            .flatMap { _ ->                collectTasks().toFlux()                    .parallel(1)                    .runOn(pool, 1)                    .flatMap({                        Mono.deferContextual { _ ->                            doSomethingNonBlocking(it.toString())                        }                    }, false, 1, 1)                    .sequential()                    .collectList()            }            .block()!!

Стоп, или не всё?

Thread Pool

Зачем же нужен пул потоков в реакторе? Думайте о нем как о двигателе для Вашего автомобиля. Чем пул мощнее - тем блокирующие задачи будут разбираться быстрее, а если потоков мало, то и блокирующие задачи задержатся у вас надолго! А куда же мы без блокирующих вызовов? На количество одновременно выполняемых задач в реакторе он не влияет, вот это поворот :)

Надеюсь, Вы не пробовали использовать Schedulers.parallel() для исполнения Вашего блокирующего кода? =) Несмотря на свое подходящее название (ну называется он parallel, значит и нужен для параллельной обработки) использовать этот пул можно только для неблокирующего кода, в доке указано что он живет с одним воркером, и содержит в себе только особенные, реактивные потоки.

Распределение задач по рельсам

Не коснулись мы еще одной важной темы. Обычно, мы пытаемся закончить обработку большого массива данных в кратчайший срок, с чем нам определенно поможет изложенный выше материал, но это еще не все. В тестах мы часто используем синтетические данные, которые генерируем одинаковыми порциями, исключая погрешности production среды. Задачи обычно выполняются разное время и это создает проблемы с равномерным распределением задач.

Зеленые прямоугольники это наши задачи, которые распределяются в реакторе по алгоритму round-robin, что в случае с синтетическими данными дает красивую картинку.

Хорошо загруженный реактор (задачи равномерно распределены). 54 блокирующих задачи (каждая по 1сек), round-robin распределение по 6 рельсамХорошо загруженный реактор (задачи равномерно распределены). 54 блокирующих задачи (каждая по 1сек), round-robin распределение по 6 рельсам

Но запуская код в production среде, мы можем встретиться с долгим запросом в базу, сетевыми задержками, плохим настроением микросервиса да и чего только не бывает.

Плохо загруженный пул (задачи распределены не равномерно)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамПлохо загруженный пул (задачи распределены не равномерно)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсам

Оператор collectList() вернет нам результат только после завершения последней задачи, и как мы видим, наш пул будет простаивать пока 1 поток трудится разгребая очередь накопившихся задач. Это создает неприятные задержки, когда Вы знаете что можно быстрее, но быстрее не происходит.

Бороться с этим можно несколькими способами

  • concatMap вместо flatMap (посмотрите в профилировщик на ваш пул, передумаете)

  • правильно планировать задачи, чтобы исключить аномалии (почти невозможно)

  • дробить каждую задачу на много мелких, и также запускать их в параллельную обработку чтобы нивелировать проблемы с распределением (вполне рабочий вариант)

  • prefetch (наш выбор!)

Параметр prefetch у flatMap & runOn позволяет определить, сколько задач будет взято на одну рельсу на старте, а затем при достижении некоторого порога выполнения задач, реквесты будут повторяться с этим количеством. Значение по умолчанию - 256. Сменив значение на 1, можно заставить реактор использовать механизм "work stealing", при котором, рельсы и потоки, которые освободились, будут забирать задачи себе на выполнение и картина получится гораздо более приятная.

Хорошо загруженный пул (задачи равномерно распределены)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамPrefetch !Хорошо загруженный пул (задачи равномерно распределены)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамPrefetch !

На этом у меня всё. Будет интересно прочесть Ваши замечания и комментарии, на 100% истину не претендую, но все результаты подкреплены практическими примерами, на Spring Boot + Project Reactor 3.4. Всем спасибо!

Подробнее..
Категории: Kotlin , Java , Concurrency , Parallel , Reactor , Parallelism , Mono , Threads , Flux , Pool , Prefetch

Правильно cчитаем параллельные планы PostgreSQL

10.08.2020 12:13:56 | Автор: admin
Исторически, модель работы сервера PostgreSQL выглядит как множество независимых процессов с частично разделяемой памятью. Каждый из них обслуживает только одно клиентское подключение и один запрос в любой момент времени и никакой многопоточности.

Поэтому внутри каждого отдельного процесса нет никаких традиционных странных проблем с параллельным выполнением кода, блокировками, race condition, А разработка самой СУБД приятна и проста.

Но эта же простота накладывает существенное ограничение. Раз внутри процесса всего один рабочий поток, то и использовать он может не более одного ядра CPU для выполнения запроса а, значит, скорость работы сервера впрямую зависит от частоты и архитектуры отдельного ядра.

В наш век закончившейся гонки мегагерцев и победивших многоядерных и многопроцессорных систем такое поведение является непозволительной роскошью и расточительностью. Поэтому, начиная с версии PostgreSQL 9.6, при отработке запроса часть операций может выполняться несколькими процессами одновременно.


Со схемами работы некоторых параллельных узлов можно ознакомиться в статье Parallelism in PostgreSQL by Ibrar Ahmed, откуда взято и это изображение.
Правда, читать планы в этом случае становится нетривиально.

Вкратце хронология внедрения параллельного исполнения операций плана выглядит так:


Поэтому, если вы пользуетесь одной из последних версий PostgreSQL, шансы увидеть в плане Parallel ... весьма велики. А с ним приходят и

Странности со временем


Возьмем план из PostgreSQL 9.6:


[посмотреть на explain.tensor.ru]

Только один Parallel Seq Scan выполнялся 153.621ms внутри поддерева, а Gather вместе со всеми подузлами всего 104.867ms.

Как так? Суммарно времени наверху стало меньше?..

Взглянем чуть подробнее на Gather-узел:

Gather (actual time=0.969..104.867 rows=333333 loops=1)  Workers Planned: 2  Workers Launched: 2  Buffers: shared hit=4425

Workers Launched: 2 говорит нам о том, что дополнительно к основному процессу ниже по дереву были задействованы еще 2 дополнительных итого 3. Поэтому все, что происходило внутри Gather-поддерева является суммарным творчеством всех 3 процессов сразу.

Теперь посмотрим, что там в Parallel Seq Scan:

Parallel Seq Scan on tst (actual time=0.024..51.207 rows=111111 loops=3)  Filter: ((i % 3) = 0)  Rows Removed by Filter: 222222  Buffers: shared hit=4425

Ага! loops=3 это сводная информация по всем 3 процессам. И, в среднем, каждый такой цикл занял по 51.207ms. То есть суммарно для отработки этого узла серверу понадобилось 51.207 x 3 = 153.621 миллисекунды процессорного времени. То есть если мы хотим понять чем был занят сервер именно это число и поможет нам понять.
Замечу, что для понимания реального времени выполнения надо суммарное время разделить на количество worker'ов то есть [actual time] x [loops] / [Workers Launched].

В нашем примере каждый worker выполнил лишь один цикл по узлу, поэтому 153.621 / 3 = 51.207. И да, теперь уже нет ничего странного, что единственный Gather в головном процессе выполнился за как бы меньшее время.

Итого: смотрите на explain.tensor.ru суммарное (по всем процессам) время узла, чтобы понять, какой именно нагрузкой был занят ваш сервер, и оптимизации какой части запроса стоит уделять время.

В этом смысле поведение того же explain.depesz.com, показывающего сразу усредненное реальное время, выглядит менее полезным для целей отладки:



Не согласны? Добро пожаловать в комментарии!

Gather Merge теряет все


Теперь выполним тот же запрос на версии PostgreSQL 10:


[посмотреть на explain.tensor.ru]

Обратим внимание, что в плане у нас вместо узла Gather теперь оказался Gather Merge, а последовательное чтение таблицы превратилось в упорядоченное, по индексу, хоть и параллельное. Вот что говорит по этому поводу мануал:
Когда над параллельной частью плана оказывается узел Gather Merge, а не Gather, это означает, что все процессы, выполняющие части параллельного плана, выдают кортежи в отсортированном порядке, и что ведущий процесс выполняет слияние с сохранением порядка. Узел же Gather, напротив, получает кортежи от подчинённых процессов в произвольном удобном ему порядке, нарушая порядок сортировки, который мог существовать.

Но не все ладно в датском королевстве:

Limit (actual time=110.740..113.138 rows=10000 loops=1)  Buffers: shared hit=888 read=801, temp read=18 written=218  I/O Timings: read=9.709  ->  Gather Merge (actual time=110.739..117.654 rows=10000 loops=1)        Workers Planned: 2        Workers Launched: 2        Buffers: shared hit=2943 read=1578, temp read=24 written=571        I/O Timings: read=17.156

При передаче атрибутов Buffers и I/O Timings вверх по дереву часть данных была безвременно утрачена. Мы можем оценить размер этой потери как раз примерно в 2/3, которые формируются вспомогательными процессами.

Увы, в самом плане эту информацию взять неоткуда отсюда и минусы на вышележащем узле. И если посмотреть дальнейшую эволюцию этого плана в PostgreSQL 12, то он принципиально не меняется, разве что добавляется немного статистики по каждому worker на Sort-узле:

Limit (actual time=77.063..80.480 rows=10000 loops=1)  Buffers: shared hit=1764, temp read=223 written=355  ->  Gather Merge (actual time=77.060..81.892 rows=10000 loops=1)        Workers Planned: 2        Workers Launched: 2        Buffers: shared hit=4519, temp read=575 written=856        ->  Sort (actual time=72.630..73.252 rows=4278 loops=3)              Sort Key: i              Sort Method: external merge  Disk: 1832kB              Worker 0:  Sort Method: external merge  Disk: 1512kB              Worker 1:  Sort Method: external merge  Disk: 1248kB              Buffers: shared hit=4519, temp read=575 written=856              ->  Parallel Seq Scan on tst (actual time=0.014..44.970 rows=111111 loops=3)                    Filter: ((i % 3) = 0)                    Rows Removed by Filter: 222222                    Buffers: shared hit=4425Planning Time: 0.142 msExecution Time: 83.884 ms

Итого: не доверяйте данным узла над Gather Merge.
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru