
Давайте посмотрим, какие решения есть у этой задачи.
Распределение объектов по мощности
Чтобы не углубляться в неинтересные абстракции, будем рассматривать на примере конкретной задачи мониторинга. Соотнести предлагаемые методики на свои конкретные задачи, уверен, вы сможете самостоятельно.
Равномощные объекты мониторинга
В качестве примера можно привести наши коллекторы метрик для Zabbix, которые исторически имеют с коллекторами логов PostgreSQL общую архитектуру.
И правда, каждый объект мониторинга (хост) генерирует для zabbix практически стабильно один и тот набор метрик с одной и той же частотой все время:

Как видно на графике, разница между min-max значениями количества генерируемых метрик не превышает 15%. Поэтому мы можем считать все объекты равными в одинаковых попугаях.
Сильный дисбаланс между объектами
В отличие от предыдущей модели, для коллекторов логов наблюдаемые хосты совсем не являются однородными.
Например, база рабочего один хост может генерировать в лог миллион планов за сутки, другой десятки тысяч, а какой-то и вовсе единицы. Да и сами эти планы по объему и сложности и по распределению во времени суток сильно отличаются. Так и получается, что нагрузку сильно качает, в разы:

Ну, а раз нагрузка может меняться настолько сильно, то надо учиться ей управлять
Координатор
Сразу понимаем, что нам явно понадобится масштабирование системы коллекторов, поскольку один отдельный узел со всей нагрузкой когда-то точно перестанет справляться. А для этого нам потребуется координатор тот, кто будет управлять всем зоопарком.
Получается примерно такая схема:

Каждый worker свою нагрузку в попугаях и в процентах CPU периодически сбрасывает master'у, те коллектору. А он, на основании этих данных, может выдать команду типа новый хост посадить на ненагруженный worker#4 или hostA надо пересадить на worker#3.
Тут еще надо помнить, что, в отличие от объектов мониторинга, сами коллекторы имеют вовсе не равную мощность например, на одном у вас может оказаться 8 ядер CPU, а на другом только 4, да еще и меньшей частоты. И если нагрузить их задачами поровну, то второй начнет затыкаться, а первый простаивать. Отсюда и вытекают
Задачи координатора
По сути, задача всего одна обеспечивать максимально равномерное распределение всей нагрузки (в %cpu) по всем доступным worker'ам. Если мы сможем решить ее идеально, то и равномерность распределения %cpu-нагрузки по коллекторам получим автоматом.
Понятно, что, даже если каждый объект генерирует одинаковую нагрузку, со временем какие-то из них могут отмирать, а какие-то возникать новые. Поэтому управлять всей ситуацией надо уметь динамически и поддерживать баланс постоянно.
Динамическая балансировка
Простую задачу (zabbix) мы можем решить достаточно банально:
- вычисляем относительную мощность каждого коллектора в задачах
- делим все задачи между ними пропорционально
- между worker'ами распределяем равномерно

Но что делать в случае сильно неравных объектов, как для коллектора логов?..
Оценка равномерности
Выше мы все время употребляли термин "максимально равномерное распределение", а как вообще можно формально сравнить два распределения, какое из них равномернее?
Для оценки равномерности в математике давно существует такая вещь как среднеквадратичное отклонение. Кому лениво вчитываться:
S[X] = sqrt( sum[ ( x -
avg[X] ) ^ 2 of X ] / count[X] )
Поскольку количество worker'ов на каждом из коллекторов у нас тоже может отличаться, то нормировать разброс по нагрузке надо не только между ними, но и между коллекторами в целом.
То есть распределение нагрузки по worker'ам двух коллекторов
[ (10%, 10%, 10%, 10%, 10%, 10%) ; (20%) ]
это тоже не
очень хорошо, поскольку на первом получается 10%, а на
втором 20%, что как бы вдвое больше в относительных
величинах.Поэтому введем единую метрику-расстояние для общей оценки равномерности:
d([%wrk], [%col]) = sqrt(
S[%wrk] ^ 2 + S[%col] ^ 2 )
То есть величины среднеквадратичного отклонения для наборов величин
нагрузки по всем worker'ам и по всем коллекторам воспринимаем как
координаты вектора, длину которого будем стараться
минимизировать.Моделирование
Если бы объектов у нас было немного, то мы могли бы полным перебором разложить их между worker'ами так, чтобы метрика оказалась минимальной. Но объектов у нас тысячи, поэтому такой способ не подойдет. Зато мы знаем, что коллектор умеет перемещать объект с одного worker'а на другой давайте этот вариант и смоделируем, используя метод градиентного спуска.
Понятно, что идеальный минимум метрики мы так можем и не найти, но локальный точно. Да и сама нагрузка может изменяться во времени настолько сильно, что искать за бесконечное время идеал абсолютно незачем.
То есть нам осталось всего лишь определить, какой объект и на какой worker эффективнее всего переместить. И сделаем это банальным переборным моделированием:
- для каждой пары (целевой host, worker) моделируем перенос нагрузки
-
нагрузку от host внутри исходного worker'а считаем
пропорционально попугаям
В нашем случае за попугая оказалось вполне разумно взять объем получаемого потока логов в байтах. - относительную мощность между коллекторами считаем пропорциональной суммарным попугаям
- вычисляем метрику d для перенесенного состояния
Выстраиваем все пары по возрастанию метрики. В идеале, нам всегда стоит реализовать перенос именно первой пары, как дающий минимальную целевую метрику. К сожалению, в реальности сам процесс переноса стоит ресурсов, поэтому не стоит запускать его для одного и того же объекта чаще определенного интервала охлаждения.
В этом случае мы можем взять вторую, третью, по рангу пару лишь бы целевая метрика уменьшалась относительно текущего значения.
Если же уменьшать некуда вот он локальный минимум!
Пример на картинке:

Запускать итерации до упора при этом вовсе не обязательно. Например, можно делать усредненный анализ нагрузки на интервале 1 мин, и по его завершению делать единственный перенос.
Микро-оптимизации
Понятно, что алгоритм со сложностью
T(целей) x
W(процессов)
это не очень хорошо. Но в нем стоит не забыть
применить некоторые более-менее очевидные оптимизации, которые его
могут ускорить в разы.Нулевые попугаи
Если на замеренном интервале объект/задача/хост сгенерировал нагрузку 0 штук, то его не то что перемещать куда-то его даже рассматривать и анализировать не надо.
Самоперенос
При генерации пар нет необходимости оценивать эффективность переноса объекта на тот же самый worker, где он и так находится. Все-таки уже будет
T x (W - 1)
мелочь, а
приятно!Неразличимая нагрузка
Поскольку мы моделируем все-таки перенос именно нагрузки, а объект всего лишь инструмент, то пробовать переносить одинаковый %cpu нет смысла значения метрик останутся точно те же, хоть и для другого распределения объектов.
То есть достаточно оценить единственную модель для кортежа (wrkSrc, wrkDst, %cpu). Ну, а одинаковыми вы можете считать, например, значения %cpu, совпадающие до 1 знака после запятой.
var col = { 'c1' : { 'wrk' : { 'w1' : { 'hst' : { 'h1' : 5 , 'h2' : 1 , 'h3' : 1 } , 'cpu' : 80.0 } , 'w2' : { 'hst' : { 'h4' : 1 , 'h5' : 1 , 'h6' : 1 } , 'cpu' : 20.0 } } }, 'c2' : { 'wrk' : { 'w1' : { 'hst' : { 'h7' : 1 , 'h8' : 2 } , 'cpu' : 100.0 } , 'w2' : { 'hst' : { 'h9' : 1 , 'hA' : 1 , 'hB' : 1 } , 'cpu' : 50.0 } } }};// вычисляем опорные метрики и нормализуем по "мощности"let $iv = (obj, fn) => Object.values(obj).forEach(fn);let $mv = (obj, fn) => Object.values(obj).map(fn);// initial reparsefor (const [cid, c] of Object.entries(col)) { $iv(c.wrk, w => { w.hst = Object.keys(w.hst).reduce((rv, hid) => { if (typeof w.hst[hid] == 'object') { rv[hid] = w.hst[hid]; return rv; } // нулевые значения ничего не решают, поэтому сразу отбрасываем if (w.hst[hid]) { rv[hid] = {'qty' : w.hst[hid]}; } return rv; }, {}); }); c.wrk = Object.keys(c.wrk).reduce((rv, wid) => { // ID воркеров должны быть глобально-уникальны rv[cid + ':' + wid] = c.wrk[wid]; return rv; }, {});}// среднеквадратичное отклонениеlet S = col => { let wsum = 0 , wavg = 0 , wqty = 0 , csum = 0 , cavg = 0 , cqty = 0; $iv(col, c => { $iv(c.wrk, w => { wsum += w.cpu; wqty++; }); csum += c.cpu; cqty++; }); wavg = wsum/wqty; wsum = 0; cavg = csum/cqty; csum = 0; $iv(col, c => { $iv(c.wrk, w => { wsum += (w.cpu - wavg) ** 2; }); csum += (c.cpu - cavg) ** 2; }); return [Math.sqrt(wsum/wqty), Math.sqrt(csum/cqty)];};// метрика-расстояниеlet distS = S => Math.sqrt(S[0] ** 2 + S[1] ** 2);// выбираем оптимальный перенос и моделируем егоlet iterReOrder = col => { let qty = 0 , max = 0; $iv(col, c => { c.qty = 0; c.cpu = 0; $iv(c.wrk, w => { w.qty = 0; $iv(w.hst, h => { w.qty += h.qty; }); w.max = w.qty * (100/w.cpu); c.qty += w.qty; c.cpu += w.cpu; }); c.cpu = c.cpu/Object.keys(c.wrk).length; c.max = c.qty * (100/c.cpu); qty += c.qty; max += c.max; }); $iv(col, c => { c.nrm = c.max/max; $iv(c.wrk, w => { $iv(w.hst, h => { h.cpu = h.qty/w.qty * w.cpu; h.nrm = h.cpu * c.nrm; }); }); }); // "текущее" среднеквадратичное отклонение console.log(S(col), distS(S(col))); // формируем набор хостов и воркеров let wrk = {}; let hst = {}; for (const [cid, c] of Object.entries(col)) { for (const [wid, w] of Object.entries(c.wrk)) { wrk[wid] = { wid , cid , 'wrk' : w , 'col' : c }; for (const [hid, h] of Object.entries(w.hst)) { hst[hid] = { hid , wid , cid , 'hst' : h , 'wrk' : w , 'col' : c }; } } } // реализация переноса нагрузки на целевой worker let move = (col, hid, wid) => { let w = wrk[wid] , h = hst[hid]; let wsrc = col[h.cid].wrk[h.wid] , wdst = col[w.cid].wrk[w.wid]; wsrc.cpu -= h.hst.cpu; wsrc.qty -= h.hst.qty; wdst.qty += h.hst.qty; // перенос на другой коллектор с "процентованием" нагрузки на CPU if (h.cid != w.cid) { let csrc = col[h.cid] , cdst = col[w.cid]; csrc.qty -= h.hst.qty; csrc.cpu -= h.hst.cpu/Object.keys(csrc.wrk).length; wsrc.hst[hid].cpu = h.hst.cpu * csrc.nrm/cdst.nrm; cdst.qty += h.hst.qty; cdst.cpu += h.hst.cpu/Object.keys(cdst.wrk).length; } wdst.cpu += wsrc.hst[hid].cpu; wdst.hst[hid] = wsrc.hst[hid]; delete wsrc.hst[hid]; }; // моделирование и оценка переноса для пары (host, worker) let moveCheck = (orig, hid, wid) => { let w = wrk[wid] , h = hst[hid]; // тот же воркер - ничего не делаем if (h.wid == w.wid) { return; } let col = JSON.parse(JSON.stringify(orig)); move(col, hid, wid); return S(col); }; // хэш уже проверенных переносов (hsrc,hdst,%cpu) let checked = {}; // перебираем все возможные пары (какой хост -> на какой воркер) let moveRanker = col => { let currS = S(col); let order = []; for (hid in hst) { for (wid in wrk) { // нет смысла пробовать повторно перемещать одну и ту же (с точностью до 0.1%) "мощность" между одной парой воркеров let widsrc = hst[hid].wid; let idx = widsrc + '|' + wid + '|' + hst[hid].hst.cpu.toFixed(1); if (idx in checked) { continue; } let _S = moveCheck(col, hid, wid); if (_S === undefined) { _S = currS; } checked[idx] = { hid , wid , S : _S }; order.push(checked[idx]); } } order.sort((x, y) => distS(x.S) - distS(y.S)); return order; }; let currS = S(col); let order = moveRanker(col); let opt = order[0]; console.log('best move', opt); // реализуем перенос if (distS(opt.S) < distS(currS)) { console.log('move!', opt.hid, opt.wid); move(col, opt.hid, opt.wid); console.log('after move', JSON.parse(JSON.stringify(col))); return true; } else { console.log('none!'); } return false;};// пока есть что-куда переноситьwhile(iterReOrder(col));
В результате, нагрузка по нашим коллекторам распределяется практически одинаково в каждый момент времени, оперативно нивелируя возникающие пики:
