Русский
Русский
English
Статистика
Реклама

Распараллеливание

Немного об ускорении программы распараллеливание (ручное или автоматическое) на базе сверхоптимистичных вычислений

18.08.2020 20:07:52 | Автор: admin
Здравствуйте, уважаемые читатели. В этой публикации речь пойдет о такой (уже ставшей привычной) вещи как ускорение работы программы путем применения параллельных вычислений. Технологии организации таких вычислений известны это и обычное многопоточное программирование, и применение специальных интерфейсов: OpenMP, OpenAcc, MPI, DVM и многих других (при этом распараллеливаются циклы, используется векторизация или конвейеризация, организуются ленивые вычисления, выделяются независимые блоки программы, которые можно запустить в параллель и т.п.).

При этом обычно исходят из той идеи, что распараллеливание не должно каким-то образом влиять на результаты исполнения программы. Это жесткое, но справедливое для многих случаев требование. Однако если мы пытаемся распараллелить программу, ведущую какие-либо расчеты численными методами (обучаем нейронную сеть, моделируем динамику жидкости или молекулярной системы, решаем обыкновенные дифференциальные уравнения или оптимизационные задачи), то результат и так (в любом случае) будет иметь некоторую погрешность. Поэтому, почему бы не применить рискованные технологии распараллеливания, которые могут внести в математическое решение небольшую дополнительную погрешность, но позволят получить еще некоторое дополнительное ускорение? Об одной из таких технологий о расщеплении тел циклов с предсказанием промежуточных результатов и откатом при неудачном предсказании (собственно, это и есть сверхоптимистичные вычисления в частично транзакционной памяти) и пойдет речь.

Идея распараллеливания


Предположим, что мы имеем цикл, тело которого состоит из двух последовательных частей, причем вторая часть зависит от первой. Пусть и отдельные витки цикла зависят друг от друга. Например:

for (int i = 0; i < N; i++) {x = f(y);y = g(x);}

На первый взгляд, распараллелить такой цикл невозможно. Однако мы попробуем. Попытаемся исполнять параллельно первый и второй операторы тела цикла. Проблема состоит в том, что на момент вычисления g(x) должен быть известен x, но он будет рассчитан только в конце первой части. Что же, введем некоторую схему, которая в начале второй части попытается предсказать новое значение x. Можно это сделать, например, с помощью линейной предикции, которая обучится предсказывать новое значение x, опираясь на историю его изменения. Тогда вторую часть можно считать параллельно с первой (это и есть сверхоптимизм), а когда обе будут подсчитаны, сравнить предсказанное значение x с реальным, полученным в конце первой части. Если они примерно равны, то результат вычислений обеих частей можно принять (и перейти к следующему витку цикла). А если они сильно отличаются, то потребуется пересчитать только вторую часть. При такой схеме в какой-то части случаев получим чистое распараллеливание, в остальных фактический последовательный счет. Алгоритм выполнения цикла при этом такой:

for (int i = 0; i < N; i++) {Распараллеливаем на два ядра {На ядре 1  считаем x = f(y). Далее передаем во вторую часть получение значение x;На ядре 2  предсказываем значение x* и считаем y* = g(x*). Получаем значение x из первой части и сравниваем его с x*. Если разница невелика, то y = y* и завершаем итерацию цикла. Если различие большое, повторяем вычисление с новыми данными: y = g(x). }}

Базовый алгоритм ясен. Теоретическое ускорение в два раза, но на практике будет, конечно, меньше, поскольку: а) часть времени тратится на предсказания и согласования; б) не все итерации выполнятся параллельно; в) первая и вторая части тела цикла могут иметь различную трудоемкость (в идеале требуется равная). Перейдем к реализации.

Реализация распараллеливания сверхоптимистичные вычисления


Поскольку в алгоритме распараллеливания идет речь об отмене части расчетов (при неудаче) и их повторном выполнении, здесь явно есть что-то от идеи работы в транзакционной памяти. Лучше в частично транзакционной, где определенные переменные работают по схеме транзакционной памяти, а остальные переменные как обычно. Передачу данных из первой части во вторую можно организовать с помощью некоторого специального канала. Пусть этот канал будет предсказывающим: а) если на момент приема данные в канал уже переданы, то они из него и читаются, б) если на момент приема данные в канал еще не поступили, то он пытается предсказать эти данные и возвращает результат предсказания. Этот канал и будет работать по немного не свойственной обычной транзакционной памяти схеме: если в конце транзакции второй части цикла обнаружится расхождение между поступившими в канал данными и предсказанными им данными, то транзакция второй части цикла отменяется и исполняется повторно, при этом из канала будут читаться уже не предсказания, а реально пришедшие данные. Цикл приобретет вид:

for (int i = 0; i < N; i++) {Распараллеливаем на два ядра, включаем частично транзакционную память {Ядро 1 (транзакция 1):x = f(y);Предсказывающий_Канал.put(x);Ядро 2 (транзакция 2):Предсказывающий_Канал.get(x);y = g(x);}}

Готово. Заботу о предсказании данных взял на себя канал, заботу об отмене расчетов при излишне оптимистичном предсказании взяла на себя частично транзакционная память.

Некоторые полезные применения: нейронные сети, метод частиц в ячейках


Такую схему распараллеливания тела цикла можно применить в ряде математических алгоритмов, например, при моделировании электростатической линзы методом частиц в ячейках, а также при обучении нейронной сети прямого распространения методом обратного распространения ошибки. Первая задача очень специальная, поэтому обсуждать ее здесь я не буду, скажу только, что изложенный подход к распараллеливанию дал ускорение на 10-15%. А вот вторая задача уже более популярная, поэтому о ней несколько слов сказать просто необходимо.

Цикл обучения нейронной сети включает последовательный проход по обучающим парам, причем для каждой пары выполняется прямой ход (расчет выхода сети) и обратный ход (коррекция весов и смещений). Это и есть две части тела цикла по обучающим парам и для их распараллеливания можно применить вышеизложенный подход (кстати, его можно применить и при параллельном проходе по обучающим парам, с незначительными изменениями). В результате на типичной задаче обучения нейронной сети я получил 50% выигрыша по скорости работы.

Автоматизация распараллеливания C-программ


Идея сверхоптимистичных вычислений не очень сложна, поэтому была написана специальная программа-транслятор, которая занимается автоматическим распараллеливанием находит в исходной C-программе циклы, для которых такое распараллеливание может дать положительный результат и расщепляет их тела на две части, вставляя необходимые директивы OpenMP, находя потенциальные переменные для каналов, подключая библиотеку работы с частично транзакционной памятью и предицирующими каналами и, в конечном итоге, порождая выходную распараллеленную программу.

В частности, такой транслятор был применен к программе моделирования электростатической линзы. Приведу обе программы исходную (в которую включена директива-указание на распараллеливание циклов) и полученную после трансляции.

Исходная программа:


#include <stdlib.h>#include <stdio.h>#include <math.h>#pragma auto parallelize#pragma auto pure(malloc,fabs,free,sizeof,omp_get_wtime)#define theta 1.83#define NX 40#define NY 40#define h 0.1#define NP 15000// Собирающая электростатическая линза#define U1 200#define U2 5000#define e -1.5E-13#define m 1E-11#define e0 8.85E-12#define V (h*h)#define tau 0.000015#define T 0.09#define POISSON_EPS 0.01#define TOL_EPS 0.25int main() {        double * U  = (double *)malloc(NY*NX*sizeof(double));        double * UU = (double *)malloc(NY*NX*sizeof(double));        double * EX = (double *)malloc(NY*NX*sizeof(double));        double * EY = (double *)malloc(NY*NX*sizeof(double));double * PX = (double *)malloc(NP*sizeof(double));double * PY = (double *)malloc(NP*sizeof(double));int * X = (int *)malloc(NP*sizeof(int));int * Y = (int *)malloc(NP*sizeof(int));double ro[NY][NX];split_private double t;split_private double tm;split_private int i, j;for (i = 0; i < NY; i++)for (j = 0; j < NX; j++) {UU[i*NX+j] = j == NX-1 ? U2 : j == NX/2 && (i < NY/4 || i > 3*NY/4) ? U1 : 0.0;EX[i*NX+j] = 0.0;EY[i*NX+j] = 0.0;}for (i = 0; i < NP; i++) {int x, y;PX[i] = 0.5*NX*h*rand()/RAND_MAX;PY[i] = NY*h*rand()/RAND_MAX;x = PX[i]/h;y = PY[i]/h;if (x < 0) x = 0;else if (x > NX-1) x = NX-1;if (y < 0) y = 0;else if (y > NY-1) y = NY-1;X[i] = x;Y[i] = y;}tm = omp_get_wtime();for (t = 0.0; t < T; t += tau) {unsigned int n[NY][NX] = { 0 };double err;int ptr = 0;for (i = 0; i < NY; i++)    for (j = 0; j < NX; j++, ptr++)U[ptr] = UU[ptr];for (i = 1; i < NY - 1; i++)for (j = 1; j < NX - 1; j++) {EX[i*NX+j] = -(U[i*NX+j+1]-U[i*NX+j-1])/2.0/h;EY[i*NX+j] = -(U[(i+1)*NX+j]-U[(i-1)*NX+j])/2.0/h;}for (i = 0; i < NP; i++) {PX[i] += tau*e*EX[Y[i]*NX+X[i]]/m;PY[i] += tau*e*EY[Y[i]*NX+X[i]]/m;}for (i = 0; i < NP; i++) {int x = PX[i]/h;int y = PY[i]/h;if (x < 0) x = 0;else if (x > NX-1) x = NX-1;if (y < 0) y = 0;else if (y > NY-1) y = NY-1;Y[i] = y;X[i] = x;n[y][x]++;}for (i = 0; i < NY; i++)for (j = 0; j < NX; j++)ro[i][j] = n[i][j]*e/V;do {err = 0.0;for (i = 1; i < NY - 1; i++)for (j = 1+(i-1)%2; j < NX - 1; j+=2) {  int ptr = i*NX + j;  if (!(j == NX/2 && (i < NY/4 || i > 3*NY/4))) {double _new = (1-theta)*UU[ptr] + theta/4.0*(UU[ptr-1]+UU[ptr+1]+UU[ptr+NX]+UU[ptr-NX]-h*h*ro[i][j]/e0);double loc_err = fabs(UU[ptr] - _new);if (loc_err > err) err = loc_err;UU[ptr] = _new;  }}for (i = 1; i < NY - 1; i++)for (j = 1+i%2; j < NX - 1; j+=2) {  int ptr = i*NX + j;  if (!(j == NX/2 && (i < NY/4 || i > 3*NY/4))) {double _new = (1-theta)*UU[ptr] + theta/4.0*(UU[ptr-1]+UU[ptr+1]+UU[ptr+NX]+UU[ptr-NX]-h*h*ro[i][j]/e0);double loc_err = fabs(UU[ptr] - _new);if (loc_err > err) err = loc_err;UU[ptr] = _new;  }}for (j = 0; j < NX; j++) {UU[j] = UU[NX + j];UU[(NY-1)*NX + j] = UU[(NY-2)*NX + j];}} while (err > POISSON_EPS);}for (i = 0; i < NY; i++) {for (j = 0; j < NX; j++)printf("%lf\t", UU[i*NX+j]);printf("\n");}return 0;}

Автоматически распараллеленная программа


#include "transact.h"#define split_private /* split-private */#include <stdlib.h>#include <stdio.h>#include <math.h>#define theta 1.83#define NX 40#define NY 40#define h 0.1#define NP 15000#define U1 200#define U2 5000#define e -1.5E-13#define m 1E-11#define e0 8.85E-12#define V (h*h)#define tau 0.000015#define T 0.09#define POISSON_EPS 0.01#define TOL_EPS 0.25int  main(  ){  double * U  = (double *)malloc(NY*NX*sizeof(double));  double * UU = (double *)malloc(NY*NX*sizeof(double));  double * EX = (double *)malloc(NY*NX*sizeof(double));  double * EY = (double *)malloc(NY*NX*sizeof(double));  double * PX = (double *)malloc(NP*sizeof(double));  double * PY = (double *)malloc(NP*sizeof(double));  int * X = (int *)malloc(NP*sizeof(int));  int * Y = (int *)malloc(NP*sizeof(int));  double ro[NY][NX];  split_private double t;  split_private double tm;  split_private int i, j;  for ( i = 0; i < NY; i++ )    for ( j = 0; j < NX; j++ )      {        UU[i*NX+j] = j == NX-1 ? U2 : j == NX/2 && (i < NY/4 || i > 3*NY/4) ? U1 : 0.0;        EX[i*NX+j] = 0.0;        EY[i*NX+j] = 0.0;      }  for ( i = 0; i < NP; i++ )    {      int x, y;      PX[i] = 0.5*NX*h*rand()/RAND_MAX;      PY[i] = NY*h*rand()/RAND_MAX;      x = PX[i]/h;      y = PY[i]/h;      if ( x < 0 )        x = 0;      else        if ( x > NX-1 )          x = NX-1;      if ( y < 0 )        y = 0;      else        if ( y > NY-1 )          y = NY-1;      X[i] = x;      Y[i] = y;    }  tm = omp_get_wtime();#pragma omp parallel num_threads(2) private(t,tm,i,j)   {    int __id__ = omp_get_thread_num();    TOut<double > * out_ro = __id__ == 0 ? new TOut<double >("ro63", (NY)*(NX), 2, 0.01, -1, "63") : NULL;    TIn<double > * in_ro = __id__ == 1 ? new TIn<double >("ro63", (NY)*(NX), 2, 0.01, -1, "63") : NULL;    for ( t = 0.0; t < T; t += tau )      {        unsigned int n[NY][NX] = { 0 };        double err;        int ptr = 0;        if ( __id__ == 0 )          {            for ( i = 0; i < NY; i++ )              for ( j = 0; j < NX; j++, ptr++ )                U[ptr] = UU[ptr];          }transaction_atomic("63")        {          if ( __id__ == 0 )            {              for ( i = 1; i < NY - 1; i++ )                for ( j = 1; j < NX - 1; j++ )                  {                    EX[i*NX+j] = -(U[i*NX+j+1]-U[i*NX+j-1])/2.0/h;                    EY[i*NX+j] = -(U[(i+1)*NX+j]-U[(i-1)*NX+j])/2.0/h;                  }              for ( i = 0; i < NP; i++ )                {                  PX[i] += tau*e*EX[Y[i]*NX+X[i]]/m;                  PY[i] += tau*e*EY[Y[i]*NX+X[i]]/m;                }              for ( i = 0; i < NP; i++ )                {                  int x = PX[i]/h;                  int y = PY[i]/h;                  if ( x < 0 )                    x = 0;                  else                    if ( x > NX-1 )                      x = NX-1;                  if ( y < 0 )                    y = 0;                  else                    if ( y > NY-1 )                      y = NY-1;                  Y[i] = y;                  X[i] = x;                  n[y][x]++;                }              for ( i = 0; i < NY; i++ )                for ( j = 0; j < NX; j++ )                  ro[i][j] = n[i][j]*e/V;              out_ro->put((double  *)ro);            }          else            {              double  ro[NY][NX];              in_ro->get((double  *)ro, 0);              do                {                  err = 0.0;                  for ( i = 1; i < NY - 1; i++ )                    for ( j = 1+(i-1)%2; j < NX - 1; j+=2 )                      {                        int ptr = i*NX + j;                        if ( !(j == NX/2 && (i < NY/4 || i > 3*NY/4)) )                          {                            double _new = (1-theta)*UU[ptr] + theta/4.0*(UU[ptr-1]+UU[ptr+1]+UU[ptr+NX]+UU[ptr-NX]-h*h*ro[i][j]/e0);                            double loc_err = fabs(UU[ptr] - _new);                            if ( loc_err > err )                              err = loc_err;                            UU[ptr] = _new;                          }                      }                  for ( i = 1; i < NY - 1; i++ )                    for ( j = 1+i%2; j < NX - 1; j+=2 )                      {                        int ptr = i*NX + j;                        if ( !(j == NX/2 && (i < NY/4 || i > 3*NY/4)) )                          {                            double _new = (1-theta)*UU[ptr] + theta/4.0*(UU[ptr-1]+UU[ptr+1]+UU[ptr+NX]+UU[ptr-NX]-h*h*ro[i][j]/e0);                            double loc_err = fabs(UU[ptr] - _new);                            if ( loc_err > err )                              err = loc_err;                            UU[ptr] = _new;                          }                      }                  for ( j = 0; j < NX; j++ )                    {                      UU[j] = UU[NX + j];                      UU[(NY-1)*NX + j] = UU[(NY-2)*NX + j];                    }                }              while ( err > POISSON_EPS )                ;            }        }      }    delete in_ro;    delete out_ro;  }  for ( i = 0; i < NY; i++ )    {      for ( j = 0; j < NX; j++ )        printf("%lf\t", UU[i*NX+j]);      printf("\n");    }  return 0;}

Итоги


Итак, иногда можно пытаться распараллелить программу даже в случаях, когда она состоит из строго последовательных фрагментов, и даже получать положительные результаты по ускорению (в моих экспериментах прирост ускорения от 15 до 50%). Надеюсь, что эта небольшая статья окажется кому-нибудь полезной.
Подробнее..

Game of Life с битовой магией, многопоточностью и на GPU

03.07.2020 20:16:17 | Автор: admin

Всем привет!


Недавняя статья на Хабре в очередной раз показала неостывающий интерес к игре Жизнь в частности и всевозможным оптимизациям в общем. Статья и комментарии к ней, особенно любопытство к вычислениям на GPU, вдохновили меня на то, чтобы поделиться своими изысканиями на данном поприще и, забегая вперёд, скажу, что повествование пойдёт о расчётах на GPU, битовой магии, многопоточности и огромных полях для игры Жизнь, порядка миллиарда клеток.



О себе


Пару слов о себе и проекте. Вот уже несколько лет моим хобби и pet-проектом является написание игры Жизнь, в ходе которого я разбираюсь в интересующих меня темах. Так, сперва я сделал её с помощью библиотеки glut и прикрутил многопользовательский режим, будучи вдохновлённым архитектурой peer-to-peer. А около двух лет назад решил начать всё с нуля, используя Qt и вычисления на GPU.

Масштабы


Идея нового проекта заключалась в том, чтобы сделать кроссплатформенное приложение, в первую очередь для мобильных устройств, в котором пользователи смогут окунуться в игру Жизнь, познакомиться с разнообразием всевозможных паттернов, наблюдать за причудливыми формами и комбинациями этой игры, выбрав скорость от 1 до 100 мс на шаг и задав размер игрового поля вплоть до 32'768 x 32'768, то есть 1'073'741'824 клетки.

На всякий случай напомню об игре Жизнь. Игра происходит на плоскости, поделённой на множество клеток. Клетки квадратные, соответственно, для каждой клетки есть 8 соседей. Каждая клетка может быть либо живой, либо мёртвой, то есть пустой. В рамках игры пользователь заполняет клетки жизнью, выставляя на поле заинтересовавшие его комбинации клеток паттерны.
Далее процесс шаг за шагом развивается по заданным правилам:
  • в пустой клетке, рядом с которой ровно 3 живые клетки, зарождается жизнь
  • если у живой клетки есть 2 или 3 живых соседки, то эта клетка продолжает жить
  • если у живой клетки меньше 2 или больше 3 живых соседки, клетка умирает

Прозорливый читатель может догадаться, что наивная реализация и близко не справится с таким объёмом вычислений, и поэтому в ход пошли всевозможные оптимизации.

В моей реализации поле зациклено, то есть после 32'767-й клетки вновь следует 0-я. Таким образом, перемещающиеся паттерны способны обогнуть поле и оказаться в точке, где они были размещены изначально. Забавный эффект случается с паттерном планёрное ружьё, когда выпущенные им планёры в конечном итоге врезаются в само ружьё и разрушают его, этакое самоубийство в мире клеточных автоматов:

Gosper glider gun


Архитектура


Что касается реализации, то в первую очередь хочется отметить, что каждая клетка в игровом поле представляет собой всего лишь 1 бит в памяти. Так, при выборе размера игрового поля в памяти выделяется буфер размером n^2 / 8 байт. Это улучшает локальность данных и снижает объём потребляемой памяти, а главное позволяет применить ещё более хитроумные оптимизации, о которых поговорим ниже. Игровое поле всегда квадратное и его сторона всегда степени 2, в частности затем, чтобы без остатка осуществлять деление на 8.

Архитектурно выделяются два слоя, ответственные за вычисления:
  • низкий уровень платформозависимый; на текущий момент существует реализация на Metal API графическое API от Apple позволяет задействовать GPU на устройствах от Apple, а также fallback-реализация на CPU. Одно время существовала версия на OpenCL. Планируется реализация на Vulkan API для запуска на Android
  • высокий уровень кроссплатформенный; по сути класс-шаблонный метод, делегирующий реализацию нужных методов на низкий уровень

Низкий уровень


Задача низкого уровня заключается непосредственно в расчёте состояния игрового поля и устроена следующим образом. В памяти хранятся два буфера n^2 / 8 байт. Первый буфер служит как input текущее состояние игрового поля. Второй буфер как output, в который записывается новое состояние игрового поля в процессе расчётов. По завершении вычислений достаточно сделать swap буферов и следующий шаг игры готов. Такая архитектура позволяет распараллелить расчёты в силу константности input. Дело в том, что каждый поток может независимо обрабатывать клетки игрового поля.

За счёт многопоточности GPU-подход добивается максимальной эффективности. На CPU также происходит распараллеливание, но, соответственно, с гораздо меньшим числом потоков и меньшей эффективностью, что отражено в секции бенчмарков ниже. Сам алгоритм идентичен для всех реализаций и заключается в следующем.

Назовём атомарной операцией расчёт одного байта буфера, хранящего игровое поле, то есть расчёт восьми клеток. Этот расчёт совершается одним потоком и требует информации о соседних восьми байтах. Попробуем схематично изобразить один байт следующим образом, где точка это бит:

[........]


А участок игрового поля с интересующим нас байтом и его восемью соседними байтами изобразим так:

[........][........][........]
[........][********][........]
[........][........][........]


Исходя из рисунка, можно догадаться, что все соседние клетки можно уложить в один uint64_t (назовём его neighbours), а ещё в одном uint8_t (назовём его self), будет храниться информация о соседях в самом обрабатываемом байте.

Рассмотрим на примере расчёт 0-го бита целевого байта. Звёздочкой отметим интересующий бит, а нулём соседние для него биты:

[.......0][00......][........]
[.......0][*0......][........]
[.......0][00......][........]


Пример посложнее. Здесь звёздочкой отмечены 0-й, 3-й и 7-й биты целевого байта и соответствующими числами соседние биты:

[.......0][00333.77][7.......]
[.......0][*03*3.7*][7.......]
[.......0][00333.77][7.......]


Думаю, кто-то из читателей уже догадался, в чём заключается магия. Имея указанную информацию, остаётся лишь составить битовые маски для каждого бита целевого байта и применить их к neighbours и self соответственно. Результатом станут 2 значения, сумма единичных бит которых будет характеризовать число соседей, что можно интерпретировать как правила игры Жизнь: 2 или 3 бита для поддержания жизни в живой клетке и 3 бита для зарождения новой жизни в пустой клетке. В противном случае клетка остаётся/становится пустой.

Описанные вычисления на нижнем уровне происходят каждый шаг игры, и поле всегда обрабатывается целиком вне зависимости от его заполненности и без трюков с мемоизацией а-ля Hashlife.

После заполнения всего выходного буфера игровое поле считается рассчитанным.
Код Metal-шейдера по обработке 1 байта
// Бросается в глаза C++-подобный синтаксис#include <metal_stdlib>#include <metal_integer>using namespace metal;// Вспомогательные функции для вычисления позиции клеткиushort2 pos(uint id) {   return ushort2(id % WIDTH, id / HEIGHT); }uint idx(ushort2 pos) {   return pos.x + pos.y * HEIGHT; }ushort2 loopPos(short x, short y) {   return ushort2((x + WIDTH) % WIDTH, (y + HEIGHT) % HEIGHT); }// Битовые маски для вычисления соседей интересующего битаtemplate<uint Bit> struct Mask {   constant constexpr static uint c_n_e_s_w = 0x70007 << (Bit - 1);   constant constexpr static uint c_nw_ne_se_sw = 0x0;   constant constexpr static uint c_self = 0x5 << (Bit - 1); };template<> struct Mask<0> {   constant constexpr static uint c_n_e_s_w = 0x80030003;   constant constexpr static uint c_nw_ne_se_sw = 0x80000080;   constant constexpr static uint c_self = 0x2; };template<> struct Mask<7> {   constant constexpr static uint c_n_e_s_w = 0xC001C0;   constant constexpr static uint c_nw_ne_se_sw = 0x10100;   constant constexpr static uint c_self = 0x40; };// Для указанного бита функция вычисляет состояние клетки в зависимости от её соседей, применяя соответствующие биту маскиtemplate<uint Bit>uint isAlive(uint self, uint n_e_s_w, uint nw_ne_se_sw) {   /*  [.......0][00333.77][7.......]  [.......0][*03*3.7*][7.......]  [.......0][00333.77][7.......]  */  // До определённой версии в Metal не было 64-битного целого, поэтому составляются две маски  uint neighbours = popcount(Mask<Bit>::c_n_e_s_w & n_e_s_w)     + popcount(Mask<Bit>::c_nw_ne_se_sw & nw_ne_se_sw)     + popcount(Mask<Bit>::c_self & self);   return static_cast<uint>((self >> Bit & 1) == 0     ? neighbours == 3     : neighbours == 2 || neighbours == 3) << Bit;}// Язык Metal даже умеет в шаблонную магиюtemplate<uint Bit>uint calculateLife(uint self, uint n_e_s_w, uint nw_ne_se_sw) {   return isAlive<Bit>(self, n_e_s_w, nw_ne_se_sw)     | calculateLife<Bit - 1>(self, n_e_s_w, nw_ne_se_sw); }template<>uint calculateLife<0>(uint self, uint n_e_s_w, uint nw_ne_se_sw){  return isAlive<0>(self, n_e_s_w, nw_ne_se_sw); }// Главная функция compute-шейдера. На вход подаются два буфера, о которых речь шла выше - константный input и output, а также id - координата целевого байтаkernel void lifeStep(constant uchar* input [[buffer(0)]],                             device uchar* output [[buffer(1)]],                             uint id [[thread_position_in_grid]]) {   ushort2 gid = pos(id * 8);   // Вычисляем соседние байты  uint nw = idx(loopPos(gid.x - 8, gid.y + 1));   uint n  = idx(loopPos(gid.x,     gid.y + 1));   uint ne = idx(loopPos(gid.x + 8, gid.y + 1));   uint e  = idx(loopPos(gid.x + 8, gid.y    ));   uint se = idx(loopPos(gid.x + 8, gid.y - 1));   uint s  = idx(loopPos(gid.x    , gid.y - 1));   uint sw = idx(loopPos(gid.x - 8, gid.y - 1));   uint w  = idx(loopPos(gid.x - 8, gid.y    ));   // Вычисляем байт с целевым битом  uint self = static_cast<uint>(input[id]);   // Подготавливаем битовые маски с соседями  // north_east_south_west  uint n_e_s_w = static_cast<uint>(input[n >> 3]) << 0 * 8     | static_cast<uint>(input[e >> 3]) << 1 * 8     | static_cast<uint>(input[s >> 3]) << 2 * 8     | static_cast<uint>(input[w >> 3]) << 3 * 8;   // north-west_north-east_south-east_south-west  uint nw_ne_se_sw = static_cast<uint>(input[nw >> 3]) << 0 * 8     | static_cast<uint>(input[ne >> 3]) << 1 * 8     | static_cast<uint>(input[se >> 3]) << 2 * 8     | static_cast<uint>(input[sw >> 3]) << 3 * 8;     // В этой строчке рассчитываются все 8 клеток обрабатываемого байта  output[id] = static_cast<uchar>(calculateLife<7>(self, n_e_s_w, nw_ne_se_sw)); };


Высокий уровень


После получения сигнала с нижнего уровня о том, что расчёт окончен, на высоком уровне происходит копирование всего буфера с целью обеспечения потокобезопасного чтения данных для отрисовки и возможного сохранения состояния игры при выходе.

Отрисовка клеток выполняется средствами Qt, а именно посредством заполнения пикселей в QImage. Интерфейс выполнен в QML. Пиксели заполняются лишь для небольшой области видимого игроку игрового поля. Таким образом удаётся избежать лишних затрат ресурсов на отрисовку.

Процесс вычисления как на нижнем, так и на верхнем уровне происходит в разных потоках. На верхнем уровне обрабатывается ввод новых клеток, составляющих паттерн, который выбрал пользователь. Интересно, что это единственная логика, которая использует мьютекс. Для всех других операций синхронизации как на верхнем, так и на нижнем уровне и между ними используются атомарные переменные.

Бенчмарки


Результаты замеров скорости работы игры для разных машин в зависимости от размеров игрового поля приведены в таблицах. Полученные значения в таблицах указаны в миллисекундах. При проведении тестов игровое поле заполнялось жизнью на 50% с помощью генератора случайных чисел.

В таблицах есть замеры скорости работы нижнего и верхнего уровня, причём берутся минимальные значения. А для более общей картины приведены замеры скорости вычисления 100 шагов игры и среднего времени выполнения шага. Последнее более показательно, так как включает в себя время на отрисовку и прочие сопутствующие операции.

MacBook Pro 2014
Processor 2,6 GHz Dual-Core Intel Core i5
Memory 8 GB 1600 MHz DDR3
Graphics Intel Iris 1536 MB

GPU реализация
1024 2048 4096 8192 16384 32768
Низкий уровень (min) 0 0 2 9 43 170
Высокий уровень (min) 0 0 0 1 12 55
100 шагов 293 446 1271 2700 8603 54287
Время на шаг (avg) 3 4 13 27 86 542

CPU реализация
1024 2048 4096 8192 16384 32768
Низкий уровень (min) 3 25 117 552 2077 21388
Высокий уровень (min) 0 0 0 1 4 51
100 шагов 944 3894 15217 65149 231260 -
Время на шаг (avg) 9 39 152 651 2312 -

MacBook Pro 2017
Processor 2.8 GHz Intel Core i7
Memory 16 GB 2133 MHz LPDDR3
Graphics Intel HD Graphics 630 1536 MB

GPU реализация
1024 2048 4096 8192 16384 32768
Низкий уровень (min) 0 0 0 2 8 38
Высокий уровень (min) 0 0 0 0 3 13
100 шагов 35 67 163 450 1451 5886
Время на шаг (avg) 0 1 2 5 15 59

CPU реализация
1024 2048 4096 8192 16384 32768
Низкий уровень (min) 1 7 33 136 699 2475
Высокий уровень (min) 0 0 0 0 3 18
100 шагов 434 1283 4262 18377 79656 264711
Время на шаг (avg) 4 13 43 184 797 2647

Видно, что на стареньком макбуке процессор совсем не справляется с огромным игровым полем и приложение становится неюзабельным. Даже процессор нового макбука едва вытягивает такой объём вычислений. Зато видеокарта значительно превосходит по производительности процессор как на старом, так и на новом железе. С использованием GPU новый макбук предлагает вполне комфортный пользовательский опыт даже на огромных игровых пространствах.

Итог


На текущий момент игра находится в стадии разработки интерфейсов и в первую очередь затачивается под мобильные интерфейсы, а потому на компьютерах управление хромает.

В отличие от оригинальной статьи я не ставил целью провести бенчмарк различных оптимизаций, поэтому сравнивать две статьи один к одному нельзя, зато материал отлично раскрывает тему сравнения потенциала GPU и CPU со значительным превосходством первого.

В будущем хочется провести бенчмарки на мобильных устройствах. Проект находится на ранней стадии разработки, а потому Apple-Developer аккаунта для проведения таких тестов сейчас у меня нет.

Спасибо за внимание! Буду рад любым комментариям к статье и к проекту:
Код на GitHub.

Код Metal реализации нижнего уровня

Код CPU реализации нижнего уровня

Код верхнего уровня
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru