Русский
Русский
English
Статистика
Реклама

Mapreduce

Архитектура отказоустойчивого планировщика задач. Доклад Яндекса

03.12.2020 12:07:50 | Автор: admin
В Яндексе десятки тысяч машин, которые постоянно нагружены под завязку разными вычислительными задачами. Большая часть этих вычислений относится к так называемой batch-нагрузке как правило, оформленной в виде операций в парадигме MapReduce. Мы используем собственную систему YT, которая предоставляет распределённый storage и интерфейс запуска распределённых вычислений с произвольным пользовательским кодом. В докладе я рассказал о задачах, возникающих при попытке написать софт, который будет что-то планировать на кластерах из большого количества машин.

Давайте первым делом обсудим, чем вообще занимаются вычислительные кластеры Яндекса.

Они переваривают большое количество данных, постоянно работают десятки тысяч машин. Например, строятся поисковые базы, каждая из которых требует перемолоть десятки петабайт данных, только чтобы мы видели свежую информацию, заходя каждый день на поиск.

Суммарно хранится порядка экзабайта, то есть миллиард гигабайт исторических данных. Все эти данные надо где-то хранить и обрабатывать. Понятно, что чтобы их обрабатывать, нужно большое количество вычислительной мощности. И чтобы это все работало, чтобы можно было это использовать, необходима соответствующая инфраструктура.



Я работаю над продуктом, который называется YT.

Мы не очень много выступали с этой разработкой на конференциях, но исправляем досадную оплошность. Поэтому я сейчас быстро введу в курс дела. YT внутренняя разработка компании, которая объединяет в себе много разных продуктов.

В первую очередь есть часть, которая является распределенным storage, хранилищем этой информации. Эта часть похожа больше всего на такие продукты внешнего мира, как HBase и ZooKeeper из стека Apache.

Дальше есть вычислительный фреймворк, который позволяет производить вычисления над данными, лежащими в распределенном storage, и делать это в парадигме, который отталкивается от MapReduce. Но понятно, что c 2004 года, когда был представлен MapReduce, индустрия шагнула вперед, поэтому у нас не MapReduce в смысле статьи 2004 года, а гораздо более развитая концепция. Близким эквивалентом из внешнего мира является Hadoop.

Дальше у нас есть горизонтально масштабируемый kv-storage, который позволяет держать realtime-нагрузку. Есть возможность запускать в распределенном окружении код наподобие того, как это делает YARN. И возможность делать поверх тех данных, которые лежат в YT, разные аналитические запросы посредством более высокоуровневого интерфейса. Например, языка, который очень близок к SQL. Продукт называется YQL, и про него мы тоже как-то рассказывали.



YT работает на довольно больших кластерах. Например, типичный большой кластер содержит порядка десятков тысяч машин, которые непосредственно хранят и обрабатывают данные. Схематически на слайде можно увидеть изображение того, как устроен наш кластер.

Это все управляется служебными машинами, которые условно разделены на две категории, мастера, непосредственно заведующие распределенным storage, то есть той частью, которая в нашей терминологии называется Cypress, по-русски Кипарис.

Эти машины хранят всю информацию, необходимую, чтобы понимать, где находятся разные части, относящиеся к тем или иным таблицам. И есть машины-планировщики, schedulers. Их тоже порядка десяти штук. О них я сегодня по большей части и буду рассказывать.

Что такое batch-нагрузка, которая фигурирует в названии доклада? Она обладает следующими свойствами.

Во-первых, она характеризуется тем, что для нее типичен большой throughput, то есть большая пропускная способность. С помощью нее можно обрабатывать данные так быстро, насколько позволяют ресурсы.



Если у вас в два раза больше вычислительной квоты, то вы ожидаете, что ваши вычисления будут в два раза быстрее в первом приближении. Это не реалтайм-нагрузка, а нагрузка, которая может работать минуты, часы, дни, возможно, недели.

И разумеется, такая нагрузка не возникает вследствие того, что пользователь заходит на веб-страницу, нажимает на кнопочку, а дальше ждет результат. Вряд ли он будет ждать час, пока считается операция, чтобы получить ответ.

Подобные вычисления хорошо описываются в концепции MapReduce, с которой довольно много людей, я уверен, в прямом приближении знакомы. Поэтому концепция MapReduce и завоевала популярность потому что действительно хорошо позволяет раскладывать примитивные кирпичики в batch-вычисления, которые часто возникают в больших компаниях. И в компаниях поменьше тоже, и даже в маленьких продуктах.

Как же batch-нагрузку планировать? Что нужно сделать, чтобы эффективно утилизировать мощности дата-центра, кластера, в котором у вас есть железо, как их утилизировать batch-нагрузкой?

Я проиллюстрирую, как выглядит типичная batch-нагрузка. Начинается все с того, что есть какое-то количество таблиц, раскиданных по десяткам, сотням, тысячам машин в нашем кластере. Эти таблички преобразовываются, переходят одна в другую с помощью некоторых примитивных кирпичиков таких как операция merge, сортировка.



Не то чтобы типичные, но возможные времена показаны на картинке. Скажем, отдельный кирпичик типа сортировки может занимать, скажем, час реального времени. Поэтому нужно утилизировать сотню CPU-часов, потому что эта операция делается с большой степенью параллельности. Последующая операция может занимать 20 минут времени, тоже десятки CPU-часов, и так далее. Чтобы с большой параллельностью утилизировать CPU-время, необходимо, чтобы все эти вычисления происходили параллельно, содержали в себе параллельность.



Давайте договоримся относительно терминологии. Я буду говорить в терминах, которые свойственны нашей системе. Они немножко отличаются от общепринятых в Hadoop, и, возможно, это будет немножко сбивать с толку. Но я сейчас проговорю всю терминологию.

Начнем с того, что введем понятие операции. Операция это законченный логический блок, который преобразовывает набор таблиц, некоторый другой набор таблиц, согласно некоторому принципу. Например, это может быть операция сортировки по ключу, операция типа map преобразования строчек, или еще какие-нибудь операции, которые есть в нашей модели.

Операция преобразовывает целые таблички и состоит из маленьких отдельных блоков. Такие блоки называются джобами, каждый джоб это одна независимая часть внутри операции, которая обрабатывает свою порцию входных данных и получает свою порцию выходных данных. При этом джоб сущность, которая бежит в виде одного процессе на одной машине в кластере.

Соответственно, выход операции складывается из выхода всех джобов, запущенных в этой операции.



Схематично это выглядит так. У нас есть процесс планировщика. Он знает, что ему надо запустить некую операцию. Запуская операцию, которая преобразовывает некоторую входную таблицу в выходную, планировщик разбивает ее на какое-то количество джобов.

Скажем, на картинке он разбил ее на четыре джоба, каждый из которых независимо обрабатывает свою порцию входных данных. Затем джобы попадают на вычислительные узлы кластера и работают уже независимо друг от друга. Они вычитывают части входной таблицы и получают соответствующие части выходной таблицы.



Давайте поговорим про характерные числа, которые связаны с джобами. Сколько времени работает типичный джоб? Он должен работать порядка минуты. Почему? Почему, скажем, не одну секунду? Почему он на самом деле не может работать одну секунду? Потому что в таком распределенном окружении возникает вопрос как доставить код, который написал пользователь и которым он хочет обрабатывать свои данные, на машину, где код будет исполняться? Его по меньшей мере нужно куда-то распространить.

Могут быть дополнительные накладные расходы, связанные с работой с распределенным storage. На практике, когда речь идет про batch-нагрузку, джобы редко работают очень быстро. Они работают порядка единиц минут, и стараться их ускорять дальше бессмысленно, потому что большая часть времени будет потрачена на накладные расходы, а не на саму полезную работу внутри джоба.

Соответственно, из типичного времени работы, порядка одной минуты, получается, что типичный джоб за время своей жизни успевает обработать порядка гигабайта информации протащить через себя и преобразовать.

Типичный джоб использует только одно ядро, хотя бывают, конечно, и джобы, которые по каким-то причинам полагаются на многопоточность внутри себя и утилизируют больше одного ядра CPU.

Будем считать, что в кластере, про который я сегодня будут говорить, порядка нескольких сотен тысяч ядер. Из этого всего складывается следующая занимательная арифметика.

Если есть, скажем, сотня тысяч ядер, а типичный джоб живет порядка одной минуты, значит, в секунду будет происходить порядка несколько тысяч событий вида: какой-то джоб закончился, а, значит, на этой машине освободились значительные ресурсы.



А нам бы, конечно, хотелось, чтобы в любой момент времени все ядра были чем-то заняты. Предположим, планировщик не успевает реагировать на события, что какие-то ресурсы освободились, и не вовремя сообщает, что дальше нужно делать с этими ядрами. Следовательно, они не заняты, железо в наших дата-центрах не утилизировано, и мы теряем деньги.

То есть можно ввести важное требование, которое мы ожидаем от проектируемого нами планировщика: он должен быть эффективным, эффективно утилизировать ресурсы в дата-центрах.



Большую часть времени планировщик выглядит так. Он оркестрирует дикое количество событий, которые возникают рядом с ним. У него пропадают машины из кластера, отдельные джобы не работают, и все это происходит с такой интенсивностью, что успевать реагировать действительно нетривиальная задача.



Давайте еще немножко поговорим про джобы. Джобы должны удовлетворять следующим свойствам:

Они должны быть stateless в том смысле, что вся логика, которую они преобразовывают в информацию, должна быть зашита во входные данные для этой информации. Она не должна браться еще откуда-то.

Они должны быть детерминированные выдавать один и тот же результат, независимо от того, сколько раз мы их запустим.

У них не должно быть сайд-эффектов.

Совокупность этих требований дает нам возможность запускать джобы столько раз, сколько нам хочется, запускать несколько копий одного и того же джоба параллельно, если по каким-то причинам нам это захотелось. И в целом развязывает нам руки запускать джобы так, как мы хотим.

Давайте поговорим про еще одно важное требование требование отказоустойчивости. Что из себя представляет операция? Это сколько-то джобов, которые запускают примерно один и тот же код и преобразовывают входные таблицы в выходные. Операция может быть долгой. Она, в отличие от джоба, может бежать и часы, и сутки, и даже недели, возможно, даже месяцы.



Конечно, за то время, пока операция бежит, может приключиться какая-нибудь неприятность с процессом планировщика.

Например, он может уйти на обслуживание. Это такой же код, как и любой другой, его периодически нужно обновлять. Или нужно обновлять машину, на которой планировщик работает. С ним может произойти незапланированная неприятность. Например, машина, на которой он работает, может выйти из строя, либо он может упасть из-за бага.

Все эти ситуации случаются, и когда так происходит, не хочется, чтобы операция, которая бежала неделю до этого, улетела в трубу и мы потеряли все, что она делала. Не хочется перезапускать ее с нуля. Требование о том, что планировщик должен перевивать падение, отключение, переключение процесса, которым он заведует, можно назвать отказоустойчивостью.

Вот как выглядит ситуация с точки зрения планировщика. Предположим, запущено две операции, вот они бегут рядом. Сегодня мы будем видеть довольно много таких картинок. Отдельные горизонтальные отрезочки это джобы, которые запущены в рамках операции. Длительность джоба порядка единиц минут, как я уже говорил. Сами операции могут быть довольно длинными. Скажем, верхняя операция добежала, она бежала порядка двух часов.



Вторая продолжает бежать Бац! Происходит переключение планировщика, связанное с тем, что процесс планировщика упал. Такое может случиться. К этому моменту вторая операция бежала целых десять часов. Мы не хотим, чтобы все, что она насчитала, пропало. Нас интересует, как восстановиться из этой ситуации.



Третье важное требование требование честности планировщика. Я не буду сегодня много о нем говорить, но если в общих чертах, планировщик раздает ресурсы потребителям, а, значит, он должен это делать честно. Если у какого-то потребителя больше квоты или больше ресурсов в каких-то терминах, то ему должно доставаться больше машинного времени под работу его кода. Как правило, желающих на ресурсы гораздо больше, чем самих ресурсов. И чтобы раздать эти ресурсы честно, чтобы в моменте примерно соблюдалось желаемое распределение ресурсов между потребителями, необходимо использовать разные сложные техники.

У нас используется алгоритм, который относится к семейству fair share scheduling. Но, опять же, сегодня я про это не буду много говорить. Надеюсь, мои коллеги, которые этим занимаются, тоже выступят с докладом про это, stay tuned.

Картинка про честность:



Разные потребители могут задавать свои пожелания еще и в совершенно разных терминах. И от этого становится окончательно тяжело. Кто-то может говорит хочу себе 40% всего кластера. Кто-то хочу не меньше 50 тысяч ядер. Третий мне 150 видеокарточек. Кто-то еще мне ничего не положено, но я все равно хочу чего-нибудь посчитать, пожалуйста, дайте мне ресурсы. Это не тривиально.

Итак, чего мы хотим от оставшейся части нашего рассказа? Мы хотим построить эффективный отказоустойчивый планировщик.



Начнем обсуждать какую-нибудь модель, в которой будет работать планировщик.

Сначала поймем, что он должен помнить про каждую операцию. Мы введем термин контроллер операции, который описывает все состояние, которое планировщик помнит про операцию.

В первую очередь планировщик должен помнить информацию о том, что, собственно, эта операция делает. Фактически информацию о том, как операция запустилась. Эта информацию планировщику передает пользователь, когда хочет запустить операцию.

В этой информации есть то, какой код нужно запускать, какие таблицы мы хотим обрабатывать, какие таблицы мы хотим получить на выходе, скажем адреса этих таблиц в нашем storage. Есть и настройки, связанные с конкретной операцией: если это сортировка, то по какому ключу мы сортируем нашу таблицу.

Всю совокупность информации, которую пользователь сообщает нам, чтобы запустить операцию, я буду называть спецификацией операции.

Планировщик должен помнить, какие входные данные он уже обработал, какие еще нет, какие только предстоит обработать, какие выходные данные уже порождены пробежавшими джобами, какие джобы в операции сейчас бегут, и если бегут, то на каких машинах и какие данные они обрабатывают.

Вся эта информация часть состояния, относящегося к конкретной операции. И это состояние мы будем называть контроллером. Это структура данных в памяти планировщика. Если планировщик не будет знать всю эту информацию про операцию, то он не будет в состоянии оркестрировать эту операцию, чтобы она дальше успешно работала, чтобы запускались джобы, чтобы происходил прогресс.

Более того, эта информация совершенно не статична, она меняется во времени, и удобно мыслить про совокупность этой информации как про state machine про автомат, реагирующий на определенное воздействие со стороны внешнего мира.



Какого рода бывают воздействия? Например, такое. Приходит к планировщику машина кластера и говорит: У меня образовались свободные ресурсы, пять ядер CPU и 80 гигабайт оперативной памяти. Не хочешь ли ты запустить джоб, который может на мне работать в таких обстоятельствах? Планировщик: Да, хочу. Давай ты запустишь этот бинарник, на вход подашь такие-то части таких-то табличек, и все будет хорошо.

Информация, которую он сообщает в ответ, это спецификация конкретного джоба. Либо он в ответ может сообщить: Нет, знаешь, чего-то у меня нет. Сейчас никакой полезной работы всего лишь на одно ядро и всего лишь на один гигабайт памяти. Поэтому давай, жди дальше.

Может быть событие вида: какой-то джоб закончил работать. Он мог закончить работать, потому что упал или доработал. Во втором случае он еще, наверное, породил выходные данные которые тоже являются частью события.

Также удобно рассмотреть такое событие: мы закончили запускать все необходимые джобы, у нас больше не осталось необработанных входных данных. В таком случае нужно финализировать операцию, собрать выходные таблицы из всего того, что породили джобы.

Отмечу еще, что из этих трех событий, которые я обрисовал, два инициируются внешним миром по отношению к планировщику, приходят со стороны ноды, на которой запускаются джобы.



Последнее событие фактически вытекает из внутреннего состояния планировщика. Когда планировщик понимает, что операция закончила бежать, он инициирует событие о том, что операция заканчивается, подходит к логическому завершению.



Если мы хотим иллюстрацию, то вот она. У нас есть контроллер, в котором вход состоит из двух квадратиков: красного и синего. Дальше возникают события: например, пришло событие, что мы хотим запланировать джоб (дальше будем говорить зашедулить от англ. schedule). И мы говорим: окей, давайте отдадим синий квадратик в джоб А. Дальше приходит событие, что, джоб А закончился, этот джоб преобразовал синий квадратик в синий треугольничек. Дальше точно так же мы запланируем джоб, который будет перерабатывать красный квадратик. Красный квадратик перейдет в красный кружочек. Наконец, операция закончится, а контроллер операции претерпит последовательность из некоторого количества состояний, каждое из которых получается под воздействием очередного события.

В такой модели есть одно очень приятное свойство: контроллеры могут жить в нескольких разных планировщиках, каждый из которых равноправен со всеми остальными. Я могу равномерно раскидать контроллеры операций, скажем, по пяти машинам, на которых работает процесс планировщика.

У меня будет пять планировщиков. Они будут независимо обрабатывать события, приходящие к ним от внешнего мира. И если они по каким-то причинам будет не успевать это делать, скажем, не будут удовлетворять требованию эффективности, то я могу сделать не пять машин, а десять.



Я получаю горизонтальную масштабируемость, это очень приятное свойство, потому что оно позволяет решить проблему эффективности планировщика. Скажем, на машинах кластера могут бежать джобы от разных планировщиков. И эти машины будут ходить со своими событиями к процессам планировщика, которые на них эти джобы поместили.



Окей, попробуем вообразить самую базовую реализацию планировщика. Она может звучать так: давайте держать состояние каждой операции в оперативной памяти и реагировать на эти самые внешние воздействия.

Конечно, такая реализация не выдерживает никакой критики, потому что она никак не решает проблему отказоустойчивости. Предположим, планировщик упадет: оперативная память не самый надежный storage. Тогда я потеряю всю свою информацию о прогрессе. Хуже того, я потеряю даже знания о том, какие операции бежали в предыдущей инкарнации планировщика. Я не смогу даже перезапустить их.

С точки зрения пользователя это будет выглядеть так: он запустил операцию, пришел на следующий день и обнаружил, что планировщик ни сном, ни духом про его операцию не знает, вообще ее не закончил и даже не в курсе, что она на нем когда-то бежала. Это неприятно. Давайте с этой проблемой поборемся.



Нам нужно место. Например, давайте для начала спецификацию операции будем куда-нибудь сохранять скажем, на диск. Диск это чуть более надежный storage, чем оперативная память. Но все равно придется решать проблемы того, что диски тоже выходят из строя, что если ушла машина целиком, на которой лежал планировщик, а не только один процесс, то мы потеряем еще и диск на этой машине.

Чтобы решить эти проблемы, мы используем другую часть нашей системы, с которой я начал. Она называется Кипарис или Cypress. Это и есть надежный распределенный storage, в который мы складываем часть состояния планировщика. Давайте хранить спецификацию операции в Кипарисе. Если мы будем это делать, то сможем реализовать следующий подход.



Когда новая инкарнация планировщика просыпается, она оглядывается, смотрит, какие операции бежали, вычитывает их спецификации из Кипариса и начинает исполнять их с нуля. Это уже кое-что. Мы, по крайней мере, теоретически можем когда-нибудь эту операцию закончить.



На картинке это выглядит так. Начинаем с того, что на старте операции записываем спецификацию этой операции в Кипарис. Начинают бежать джобы Бац! Происходит переключение планировщика.

Новая инкарнация планировщика просыпается, прочитывает спецификацию той операции, которая была. Узнаёт: ага, тут была такая-то операция. К сожалению, я не знаю, что я успел в предыдущей своей инкарнации сделать, но я могу начать делать то же самое.

Дальше он начинает запускать те же самые джобы так же, по порядочку. И в этот раз они доходят до успеха.



Это решение как-то работает, но, конечно, тоже не очень практичное: если каждый раз, когда происходит переключение планировщика, мы будем вынуждены не то что терять, а начинать сначала считать то, на что мы могли уже потратить часы или дни, это неприятно.

Из реальной практики: характерное время, которое живет процесс планировщика, это десяток дней. Если смотреть на перспективу, скажем, месяца, то за это время мы его обязательно либо обновим, либо уроним, либо его уронит кто-нибудь другой.

Нужно что-то предпринимать, потому что некоторые операции и вовсе работают дольше, чем десять дней. Если операция работает сама по себе месяц, а такое тоже бывает, то у нее есть шанс оказаться в ситуации, напоминающей день сурка. А именно: операция бежит, происходит переключение, она сказала: окей, начну сначала. Бежит, снова переключение. Так она никогда она не добежит до успеха, будет вечно бежать.



Нужно решение. И решение в нашем случае использовать snapshotting.



Мы периодически будем сохранять из состояния больше, чем просто спецификацию. Мы будем сохранять состояние контроллера целиком в надежное место.

Общая идея понятна. Есть контроллер структура данных, которую мы с вами описали несколькими слайдами ранее. Давайте все эти данные спецификацию, всё, что сейчас есть из необработанных входных данных и из выходных, сериализовывать. Скажем, просто бинарным дампом. И класть в Кипарис.

Отлично. Если теперь произойдет переключение и проснется новая инкарнация планировщика, то она сможет прочитать этот снепшот, поднять точно такую же копию всех структур данных на момент создания снепшота и начать реагировать из этой копии.

На этом пути есть сложный технический вопрос: как же эти снепшоты будут возникать, в какие моменты они будут писаться? Контроллер постоянно воспринимает события. Если это большая операция, десятки или сотни тысяч джобов, то контроллер будет каждую секунду много раз претерпевать изменения под воздействием событий, что джоб закончился или его нужно зашедулить.



Если параллельно с тем, как эти изменения применяются, пытаться писать снепшоты, то мы будем читать постоянно меняющуюся структуру данных. Если мы будем делать это из другого потока, то мы рискуем много чем. Мы можем в лучшем случае получить неконсистентное состояние, а в худшем вообще получим проезд по памяти и процесс планировщика упадет.

Нужно что-то придумать.

Добавлю, что у больших операций размер этой структуры данных в памяти планировщика тоже может быть довольно большим, порядка гигабайта или десяти гигабайт.

Как же взять и что-то сериализовать такого большого размера, если оно постоянно меняется? Нужна оригинальная идея. Если это делать, просто останавливая на время принятие всех изменений, это будет как-то работать, но не будет очень приятным для пользователя. Почему? Давайте посмотрим на картинке.



Вот линия жизни контроллера. Он периодически обрабатывает события, отвечает на попытки зашедулить или закончить джоб, просто сохраняет знание о том, какие части выходных данных джоб породил, и говорит: окей.

А потом мы такие: хорошо, мы хотим начать писать снепшот. Значит, мы больше не принимаем никаких изменений.

В это время к нам продолжают приходить запросы, и мы вынуждены на них отвечать: извини, сейчас я не работаю, я пишу снепшот, не могу тебе сейчас зашедулить джобы. И нет, джоб, который ты закончил, я тоже не запомню. Приходи потом, завтра.

Эта ситуация может быть довольно долгой: чтобы записать, скажем, десять гигабайт состояния в распределенный storage куда-то по сети, на диск, куда-то далеко, может понадобиться 15-20 минут. В течение всего этого времени контроллер этой операции будет отклонять все попытки что-нибудь с ним делать и не будет шедулить ни один новый джоб. Мы довольно существенное время будем простаивать, это недопустимо. Когда мы наконец отлипнем, то начнем реагировать дальше, как было, но 10-15 минут живого времени потеряются. (...)

Системный вызов fork это такая черная магия, которая позволяет нам расклонировать процесс, причем сохранить состояние памяти родительского процесса в ребенке, причем в неизменном виде. Ребенок, если будет читать свою память, не будет видеть изменения, которые происходят в родительском процессе.



Также приятно, что fork это довольно быстрый системный вызов. Скажем, чтобы форкнуть процесс, который использует 100 гигабайт оперативной памяти, вам нужно не сильно больше, чем 10 секунд реального времени, потому что fork реализован через концепцию copy-on-write.

Если воспользоваться fork умело, можно придумать схему построения снепшотов, которая не предполагает заморозки на долгое время и непринятия запросов в течение долгого времени.

Давайте делать так. Когда мы хотим построить снепшот, давайте заморозим все контроллеры. Мы не будем принимать никакие изменения. Но это мы сделаем на совсем небольшое время. Мы позовем fork, который отработает буквально за десять секунд, после чего оставшийся родительский процесс такой: ага, fork прошел, я снова размораживаю все контроллеры, начинаю принимать изменения, начинаю шедулить новые джобы, реагировать на окончание предыдущих джобов. В общем, все замечательно.

В это время родившийся ребенок такой: ага, у меня есть состояние родителя в какой-то момент времени. Это консистентное состояние, потому что сейчас не идут никакие изменения в контроллерах. Давайте я начну все эти контроллеры обходить, писать их снепшоты куда бы то ни было. И буду делать это столько времени, сколько захочу. Потрачу на все эти большие контроллеры, скажем, 25 минут. Захотел потратил. Он их пишет и потом заканчивает свою работу. Хронологически это выглядит так.



Время простоя относительно основного процесса буквально 10 секунд на fork. Форкнутый процесс пишет что-то в Кипарис 10-15-20 минут.

Чем такое решение чревато? Оно обладает следующими свойствами.



Простой, как я уже сказал, небольшой. Его можно повторять, то есть форкать один и тот же процесс, произвольное количество раз. Например, можно делать следующий fork, когда предыдущий закончил работу. При такой схеме все контроллеры будут получать свои снепшоты примерно одновременно с тем, как мы делаем очередной fork.

Цена такого решения это двукратное потребление по памяти. Потому что в худшем случае copy-on-write, копирующий странички памяти, когда тот или иной процесс их трогает и мы хотим сохранить видимость старого состояния в другом процессе, раздвоит всю память нашего планировщика. Об этом надо помнить.

Окей, мы научились писать снепшоты. Давайте поймем, какая теперь будет логика восстановления. Она не очень сложная, но в ней есть ряд моментов, которые стоит проговорить.



Если у меня есть снепшот, я, наверно, могу из него восстановиться? Есть риск, что я прочитал снепшот, но не могу из него восстановиться, потому что он записан старой версией кода. Неприятно. В такой ситуации я восстановиться действительно не могу. Надо стремиться, чтобы такая ситуация была редкой, стараться сохранять совместимость снепшотов как можно чаще, не ломать ее при минорных обновлениях. Потому что если совместимости нет, то нет выхода, кроме как сделать clean start и начать все с нуля.

Если у операции нет ни одного снепшота, то я тоже не могу сделать ничего, кроме clean start. Но это не очень страшно: значит, операция бежала к текущему моменту не очень долго, не больше, чем те самые 15-20 минут, а это регулярность, с которой появляются снепшоты.



Что же мы теряем, когда просыпаемся из снепшота? Снепшот сделан в какое-то время в прошлом. Мы теряем часть последних событий. Давайте поймем, что мы знаем про джобы.

Джоб, который закончился до момента последнего снепшота, мы точно не потеряем. Это замечательно. Если же джоб закончился после последнего снепшота, то мы точно потеряли информацию о том, какие выходные данные он породил. С этой проблемой мы пока не умеем бороться. Давайте просто такие джобы игнорировать. Будем считать, что их и не было. Нам придется их перезапустить, пересчитать.

В снепшот попадают только завершившиеся джобы, и я утверждаю, что это уже довольно хорошее решение.



На картинке это выглядит так. Есть момент, когда я сделал снепшот. К этому моменту какие-то джобы успели завершиться. Эти джобы мне не нужно перезапускать после переподъема планировщика: я про них все знаю. А джобы, которые не завершились во время последнего снепшота, я, к сожалению, теряю. И неважно, успели они завершиться к моменту падения планировщика или бежали во время его падения.

Но таких джобов не очень много. Получается, если они довольно короткие, то я на этой картинке теряю отрезок порядка 20 минут реального времени прогресса. Если операция длинная, 20 минут прогресса для нее не очень много, их можно и переиграть.



Что происходит, если джобы длинные? Такое иногда бывает нужно скажем, у джоба может быть очень тяжелая установка начала работы, которую невозможно ускорить, раздробив ее ход на две части. Такое аддитивное слагаемое во время работы джоба. Если такой джоб есть, он длинный, работает час или два, то мы, к сожалению, будем терять много. Картинка будет выглядеть так.



Несмотря на то, что мы исправно делали снепшоты каждые 20 минут, мы все равно потеряем порядка продолжительности джоба порядка часа на этой картинке, когда произойдет переключение. Так как я вынужден переиграть все джобы.

Поэтому длинные джобы это плохо, но иногда они возникают. К сожалению, это реальность, с которой приходится сталкиваться. Например, задачи машинного обучения не очень хорошо параллелятся, и периодически в YT запускаются операции, в которых бывают двухчасовые, пятичасовые джобы, просто исполняющие некоторый процесс. YT используется как место, где можно запускать нечто в таком роде. Что же делать?



Давайте пытаться восстанавливать из снепшота что-то и про бегущие джобы тоже.

У нас есть ряд проблем. Предположим, я только что проснулся, поднялся из снепшота и мне необходимо узнать что-то про джобы, которые сейчас бегут. Я могу попросить ноды сообщать мне, что на них бегут какие-то джобы, периодически приходя ко мне с heartbetas. Это половинчатое решение. Если ко мне придет событие о том, что джоб бежит, отлично. Я про него все знаю, я его подцепил. А если не придет, то я оказываюсь в непонятках: этот джоб уже успел закончиться? Или упал, или еще бежит, но просто до меня не дошло это событие? Что же делать?



На картинке, если было три джоба, один из которых упал, второй закончился и породил какой-то выход, а третий на моментупереключения продолжает бежать, то, проснувшись в новой инкарнации, я узнаю что-то про третий джоб. Он придет ко мне и скажет: я здесь. Но про первый и второй джобы я не узнаю ничего, и что самое ужасное, я потеряю вывод, который второй джоб успел породить. Потому что у меня его нет в снепшоте, на ноде его уже тоже нет, откуда мне его взять?

Решение напрашивается само собой. Давайте попросим ноды запоминать ту часть состояния, которую я потенциально могу потерять.



Давайте попросим их придерживать события, что джоб закончился, до тех пор, пока это имеет смысл. То есть они будут придерживать всё то, что я могу потерять. А я как планировщик буду периодически их просить: слушай, я вот только что проснулся и забыл, что было в последние 15 минут. Напомни мне, пожалуйста, все события вида джоб закончился, которые я мог пропустить.



Сколько времени нужно держать такую информацию на ноде? Утверждается, что нужно вспомнить, когда мы надежно запоминаем хоть что-то. Мы надежно запоминаем что-то, когда оно попадает в снепшот. Если в снепшот попало событие, что джоб закончился, тогда можно смело забывать про это событие на ноде, потому что я как планировщик уже никогда про него не забуду.

Чудесно. Введем еще один вызов, который будет идти от планировщика к ноде и говорить, что можно забыть про джоб. С помощью него построим эту конструкцию. Посмотрим на картинке.



У меня есть джоб. Он бежал и закончился. К текущему момент в снепшоте нет информации, что джоб закончился. Поэтому нода запоминает этот джоб и держит событие, которое она отправила планировщику, еще какое-то время. Затем планировщик пишет снепшот. Всё, это событие уже никуда не пропадет. И наконец, планировщик говорит: всё, про этот джоб можно забывать, мы этот джоб выкидываем из ноды.



В плохой ситуации, когда происходит переключение, картинка будет чуть сложнее. Джоб закончился, происходит переключение. Просыпается новая инкарнация планировщика. Что происходит с джобом с ее точки зрения? Он с ее точки зрения бежит, потому что она проснулась в момент, когда он еще бежал.

Планировщик говорит ноде: слушай, а что там было с этим джобом? Она отвечает: да он вообще-то уже закончился. Планировщик такой: окей. Нода продолжает помнить про этот джоб, потому что мало ли, вдруг планировщик еще раз упадет? В конце концов планировщик делает еще один снепшот, и этот джоб вымывается из ноды.

Такое решение у нас сейчас и используется. Давайте я назову числа, которые этот подход оправдывают.



Мы экономим порядка сотен тысяч ядер CPU на больших кластерах в неделю, только потому что мы делаем снепшоты. И это действительно спасает. Но это что касается утилизации. А есть еще один момент, который касается непосредственно того, как это все видит пользователь.

Вернемся к человеку, запустившему задачу машинного обучения, которая типично выглядит как операция из одного джоба на несколько часов. Посмотрим на такие операции. Выберем из них те, которые зацепили собой переключение планировщика. Я утверждаю, что таких операций несколько десятков тысяч в месяц. И такие операции в среднем экономят по семь часов на переключениях, потому что мы умеем обратно подцеплять джобы от этих операций.

Если бы мы этого не делали, то в среднем приходилось бы тратить на семь часов больше на каждую такую операцию, только потому что операция бежала десять часов, и на седьмом часу жизни произошло переключение планировщика.

Конечно, люди, которые сидят и целый день терпят в ожидании, что операция добежит, очень рады, что мы экономим эти часы.

Что можно делать дальше? Есть места, в которых можно улучшить эту схему. Можно пытаться восстанавливать не только джобы, которые бежали во время последнего снепшота, но и джобы, запущенные уже после него.



Это сложно, потому что требует от нас попытки восстановить, а как же менялось состояние планировщика после последнего снепшота. Контроллер довольно сложная структура. Это я на пальцах объяснил, что там всего два воздействия и внутри она держит только список джобов. На самом деле она, конечно, гораздо сложнее. И восстановить это постфактум, зная, что моя предыдущая инкарнация запустила такой-то и такой-то джоб, интересно, в какой последовательности мутации она это сделала, очень сложно. Мы еще такое не умеем, мы над этим размышляем.



Более перспективным нам кажется другой способ: все состояние планировщика можно полностью поселить в некоторый персистентный storage. Не делать снепшоты периодически, а пусть состояние планировщика в любой момент времени живет в персистентном storage. Если вдруг этот планировщик падает, то на его место встает другой, фактически stateless-планировщик, и просто глядя на то состояние, с которым он работал, продолжает его модифицировать.

Что может выступать в роли storage? Кипарис не сможет: он, к сожалению, предназначен не для этого. У нас есть немного другая технология те самые горизонтальные масштабируемые key-value storage, о которых мы тоже не рассказывали. Но я надеюсь, что коллеги тоже разродятся докладом на эту тему. Я покажу последнюю картинку того, как это будет выглядеть в светлом будущем.



И поблагодарю вас за внимание, потому что на этом все. Всем спасибо.
Подробнее..

Рекомендации Друзей ВКонтакте ML на эго-графах

13.04.2021 14:10:30 | Автор: admin

Дружбы одна из важнейших механик любой социальной сети. Подавляющее большинство взаимодействий происходит между пользователями, которые дружат: мы видим и комментируем записи друг друга в лентах, заходим в список друзей, чтобы найти знакомых и написать сообщение. Именно поэтому рост социального графа так важен.

Меня зовут Женя Замятин, я работаю в команде Core ML ВКонтакте. Хочу рассказать, как устроены рекомендации, которые делают ближе пользователей самой крупной социальной сети рунета.

Обзор

Современные рекомендательные системы зачастую состоят из двух уровней, и наша не исключение. Задача первого уровня искать наиболее релевантных кандидатов среди всего множества пользователей (у нас их сотни миллионов). Такая постановка задачи подразумевает высокую скорость работы. Обычно здесь используют простые в применении модели вроде матричных факторизаций или эвристики на базе числа общих друзей. Полученные на первом уровне кандидаты отправляются на второй. Здесь на модель уже не накладываются такие жёсткие ограничения по скорости, её главная задача обеспечить максимальную точность предсказаний и сформировать список, который увидит пользователь. В этой статье мы рассмотрим только первый этап уровень отбора кандидатов.

Прежде всего сформулируем задачу, которую будем решать: для каждого пользователя необходимо найти k кандидатов, которых он с наибольшей вероятностью добавит в друзья. Метрика, на которую будем ориентироваться, recall@k. Она идеально описывает задачу: на первом уровне нам не интересен порядок кандидатов, но важна их релевантность.

Сначала рассмотрим базовые решения, придуманные десятки лет назад, но до сих пор актуальные. Первым приходит на ум одно из самых логичных эвристика на основе числа общих друзей. Для каждого пользователя отбираются кандидаты с наибольшим таким значением. Этот подход просто реализуется и неплох по качеству.

Ещё один важный метод рекомендаций Adamic/Adar. В его основе лежит всё тот же анализ общих друзей, но с модификацией: авторы предлагают учитывать число друзей у общего друга. Чем больше это значение, тем меньше информации о релевантности он несёт.

Кроме методов на основе анализа общих друзей, довольно распространены рекомендации на базе эмбеддингов. В Лаборатории искусственного интеллекта ВКонтакте в МФТИ мы провели исследование: сравнили эффективность разных подходов к задаче предсказания дружб в VK. Результаты совпали с нашим опытом решения на базе графовых эмбеддингов у нас работают плохо. Учитывая это, мы стали развивать систему отбора кандидатов по пути анализа общих друзей.

EGOML

Общая схема нашего метода продолжает идеи числа общих друзей и Adamic/Adar. Финальная мера релевантности E(u, v), с помощью которой мы будем отбирать кандидатов, всё так же раскладывается в сумму по общим друзьям u и v. Ключевое отличие в форме слагаемого под суммой: в нашем случае это мера ez_c(u, v).

Сначала попробуем понять физический смысл меры ez_c(u, v). Представим, что мы взяли пользователя c и спросили у него: Насколько вероятно, что два твоих друга, u и v, подружатся? Чем больше информации для оценки он учтёт, тем точнее будет его предсказание. Например, если c сможет вспомнить только число своих друзей, его рассуждения могут выглядеть следующим образом: Чем больше у меня друзей, тем менее вероятно, что случайные двое из них знакомы. Тогда оценка вероятность дружбы u и v (с точки зрения c) может выглядеть как 1/log(n), где n число друзей. Именно так устроен Adamic/Adar. Но что если c возьмёт больше контекста?

Прежде чем отвечать на этот вопрос, разберёмся, почему ez_c(u, v) важно определять через пользователя c. Дело в том, что в таком виде очень удобно решать задачу распределённо. Представим, что теперь мы разослали всем пользователям платформы анкету с просьбой оценить вероятность дружбы в каждой паре их друзей. Получив все ответы, мы можем подставить значения в формулу E(u, v). Именно так выглядит вычисление E(u, v) с помощью MapReduce:

  • Подготовка. Для каждого c выделяется тот контекст, который он будет учитывать для вынесения оценок. Например, в Adamic/Adar это будет просто список друзей.

  • Map. Спрашиваем у каждого c, что он думает про возможность дружбы в каждой паре его друзей. По сути, вычисляем ez_c(u, v) и сохраняем в виде (u, v) ez_c(u, v) для всех u, v in N(c). В случае Adamic/Adar: (u, v) 1/log|N(c)|.

  • Reduce. Для каждой пары (u, v) суммируем все соответствующие ей значения. Их будет ровно столько, сколько общих друзей у u и v.

Таким образом мы получаем все ненулевые значения E(u, v). Заметим: необходимое условие того, что E(u, v) > 0, существование хотя бы одного общего друга у u и v.

Эго-граф ХоппераЭго-граф Хоппера

Контекстом пользователя c в случае меры ez_c будет тот же список друзей, но дополненный информацией о связях внутри этого списка. Такую структуру в науке называют эго-графом. Если более формально, эго-граф вершины x это такой подграф исходного графа, вершинами которого являются все соседи x и сама x, а рёбрами все рёбра исходного графа между этими вершинами. Коллеги из Одноклассников написали подробную статью об эго-графах и затронули в ней вопрос их эффективного построения.

Ключевая идея меры ez_c в том, что её можно сделать обучаемой. Для каждого пользователя c, его эго-графа и всех пар пользователей u, v внутри него мы можем посчитать много разных признаков, например:

  • число общих друзей u и v внутри эго-графа c;

  • число общих друзей u и c;

  • интенсивность взаимодействий между v и c;

  • время, прошедшее с последней дружбы между u и кем-либо из эго-графа c;

  • плотность эго-графа c;

  • и другие.

Таким образом мы получим датасет с признаками. Но для обучения нужны ещё и метки. Пусть датасет был построен по состоянию графа на момент времени T. Тогда в качестве положительных примеров возьмём те пары пользователей, которые не были друзьями на момент T, но подружились к T + T. А как отрицательные все остальные, не подружившиеся, пары пользователей. Заметим: поскольку мы решаем задачу предсказания новых дружб, те пары пользователей, которые уже дружат на момент T, учитывать не нужно ни на обучении, ни на применении.

В конечном счёте мы получаем датасет следующего вида:

  • для каждой пары пользователей u и v, а также их общего друга c, посчитаны признаки по эго-графу c;

  • пара пользователей u и v встречается в датасете ровно столько раз, сколько у них общих друзей;

  • все пары пользователей в датасете не являются друзьями на момент времени T;

  • для каждой пары u и v проставлена метка подружились ли они в течение определённого промежутка времени начиная с T.

По такому датасету мы и будем обучать нашу меру ez_c. В качестве модели выбрали градиентный бустинг с pairwise функцией потерь, где идентификатором группы выступает пользователь u.
По сути, мера ez_c(u, v) определяется как предсказание описанной выше модели. Но есть один нюанс: при pairwise-обучении распределение предсказаний модели похоже на нормальное. Поэтому, если в качестве определения меры ez_c(u, v) взять сырое предсказание, может возникнуть ситуация, когда мы будем штрафовать финальную меру E(u, v) за общих друзей, так как значения предсказаний бывают отрицательными. Это выглядит не совсем логично хочется, чтобы с ростом числа общих друзей мера E(u, v) не убывала. Так что поверх предсказания модели мы решили взять экспоненту:

Такой подход хорошо себя показывает на небольших графах. Но чтобы применить его на реальных данных, необходимо выполнить ещё одно действие. Суть проблемы такая: мы не можем вычислять признаки и применять модель для каждой пары пользователей всех эго-графов это слишком долго. Для решения мы придумали специальный трюк. Представим, что наш градиентный бустинг обучился таким образом, что каждое дерево использует признаки только одного пользователя: либо u, либо v. Тогда мы могли бы разделить весь ансамбль на две группы: к группе A мы бы отнесли деревья, которые используют только признаки пользователя u, к B пользователя v. Предсказание такой модели можно представить в виде:

Имея такую модель, мы могли бы получить предсказания для всех пар пользователей одного эго-графа быстрее. Достаточно применить модели A и B для каждого пользователя, а затем сложить соответствующие парам предсказания. Таким образом, для эго-графа из n вершин мы могли бы сократить число применений модели с O(n^2) до O(n). Но как получить такую модель, каждое дерево которой зависит только от одного пользователя? Для этого сделаем следующее:

  1. Исключим из датасета все признаки, которые одновременно зависят и от u и от v. Например, от признака число общих друзей u и v внутри эго-графа c придётся отказаться.

  2. Обучим модель A, используя только признаки на базе u, c и эго-графа c.

  3. Для обучения модели B оставим только признаки на базе v, c и эго-графа c. Также в качестве базовых предсказаний передадим предсказания модели A.

Если объединим модели A и B, получим то что нужно: первая часть использует признаки u, вторая признаки v. Совокупность моделей осмысленна, поскольку B была обучена корректировать предсказания A. Эта оптимизация позволяет ускорить вычисления в сотни раз и делает подход применимым на практике. Финальный вид ez_c(u, v) и E(u, v) выглядит так:

Вычисление меры E в онлайне

Заметим, что E(u, v) можно представить в виде:

Эта формула скалярное произведение разреженных векторов, индексами которых являются пользователи, а значениями экспоненты предсказаний модели. Ненулевые значения здесь проставлены только у друзей u по сути это просто списки друзей с дополнительными значениями.

При построении рекомендаций мы уже вычислили предсказания моделей для всех существующих дружб. Поэтому для каждого пользователя мы можем собрать векторы и сложить их в доступное онлайн key-value хранилище. После этого сможем получать значение E(u, v) для любой пары пользователей в онлайне простой операцией перемножения векторов. Это даёт возможность использовать E(u, v) как лёгкую функцию релевантности в нагруженных местах либо как дополнительный признак финальной модели ранжирования.

Итог

В результате система EGOML позволяет:

  1. Распределённо отбирать кандидатов для каждого пользователя в офлайне. Асимптотическая сложность оптимизированного алгоритма составляет O(|E|) вычислений признаков и применений модели, где |E| число связей в графе. На кластере из 250 воркеров время работы алгоритма составляет около двух часов.

  2. Быстро вычислять меру релевантности E(u, v) для любой пары пользователей в онлайне. Асимптотическая сложность операции O(|N(u)| + |N(v)|).

  3. Улучшать качество рекомендаций, расширяя количество учтённых графов (по дружбам, скрытиям рекомендаций, отправленным сообщениям и другим графам) и добавляя всевозможные метки на рёбра и вершины. Например, интенсивность взаимодействий на ребре, дату образования ребра, город, место работы или учёбы пользователя.

В конечном счёте мы перешли со способа отбора кандидатов с использованием Adamic/Adar к системе EGOML и внедрили в модель второй уровень признаков на основе меры E(u, v). И это позволило увеличить количество подтверждённых дружб со всей платформы на несколько десятков процентов.

Благодарность

Хочу сказать спасибо руководителю команды Core ML Андрею Якушеву за помощь в разработке метода и подготовке статьи, а также всей команде Core ML за поддержку на разных этапах этой работы.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru