Русский
Русский
English
Статистика
Реклама

Изучаем VoIP-движок Mediastreamer2. Часть 13, заключительная

Материал статьи взят с моего дзен-канала.



В прошлой статье, мы рассмотрели вопросы отладки крафтовых фильтров, связанные с перемещением данных.


Эта статья будет заключительной в цикле и её мы посвятим, как неоднократно обещалось, вопросам оценки нагрузки на тикер и способы борьбы с чррезмерной вычислительной нагрузкой в медиастримере.


Что такое нагрузка на тикер


Нагрузка на тикер вычисляется как отношение времени проведенного в обработке всех фильтров графа, подключенного к этому тикеру к интервалу между тиками. По умолчанию интервал составляет 10мс, но если вы изменили сэмплингрейт, то расчет выполняется для установленного вами значения. Вычисление делается по данным, которые накапливаются тикером во время работы. Медиастример предоставляет функцию, которая возвращает усредненную по нескольким результатам измерений величину, выраженную в процентах:


MS2_PUBLIC float ms_ticker_get_average_load(MSTicker *ticker);

Если возвращаемая величина близка к 100% это означает, что данный тикер еле поспевает сделать свою работу до начала очередного тика. Если у нас приложение, которому не требуется работа в реальном времени (например оно просто пишет звук в файл), то для нас не особо важно на какое время был отложен очередной вызов тикера. Но в реалтайм-приложении задержка обработки может повлиять на момент отправки RTP-пакета, что в свою очередь может сказаться на качестве звука или видео. В некоторых случаях влияние задержки отдельных пакетов можно купировать используя буфер пакетов на приемном конце (так называемый джиттер-буфер). При этом ваш звук будет воспроизводиться без дефектов, но с задержкой, пропорциональной длине буфера. Что может быть не приемлемо в случаях когда звуковой сигнал используется для управления процессами в реальном времени.


На самом деле стоит начинать принимать меры уже тогда, когда нагрузка на тикер перешла границу 80%. При такой нагрузке на отдельных тактах тикер начинает запускать обработку с отставанием. Отставание тикера, это время, на которое был отложен очередной запуск тикера.
Если отставание тикера превысило некоторую величину то генерируется событие:


struct _MSTickerLateEvent{int lateMs; /**<Запаздывание которое было в последний раз, в миллисекундах */uint64_t time; /**< Время возникновения события, в миллисекундах */int current_late_ms; /**< Запаздывание на текущем тике, в миллисекундах */};typedef struct _MSTickerLateEvent MSTickerLateEvent;

По которому в консоль выводится сообщение, которое выглядит примерно так:


ortp-warning-MSTicker: We are late of 164 miliseconds


С помощью функции


void ms_ticker_get_last_late_tick(MSTicker *ticker, MSTickerLateEvent *ev);

можно узнать подробности о последнем таком событии.


Способы снижения загрузки тикера


Здесь у нас есть два варианта действий. Первый это изменить приоритет тикера, второй перенести часть работы тикера в другой тред. Рассмотрим эти варианты.


Изменение приоритета тикера


Приоритет тикера имеет три градации, которые определены в перечислении MSTickerPrio:


enum _MSTickerPrio{MS_TICKER_PRIO_NORMAL, /* Приоритет соответствующий значению по умолчанию для данной ОС. */MS_TICKER_PRIO_HIGH, /* Увеличенный приоритет устанавливается подlinux/MacOS с помощью setpriority() или sched_setschedparams() устанавливается политика SCHED_RR. */MS_TICKER_PRIO_REALTIME /* Наибольший приоритет, для него под Linux используется политика SCHED_FIFO. */};typedef enum _MSTickerPrio MSTickerPrio;

Чтобы поэкспериментировать с нагрузкой тикера, нам требуется схема, которая во время работы будет наращивать нагрузку и завершать работу когда нагрузка достигнет уровня 99%. В качестве нагрузки будем использовать схему:
ticker -> voidsource -> dtmfgen -> voidsink
Нагрузка будет увеличиваться добавлением между dtmfgen и voidsink нового элемента управления уровнем сигнала (тип фильтра MS_VOLUME), с коэффициентом передачи неравным единице, чтобы фильтр не филонил.
Она показана на рисунке

Исходный код приведен ниже, он снабжен комментариями, так что разобраться в нем не составит труда:


Файл mstest13.c Переменная вычислительная нагрузка.
/* Файл mstest13.c Переменная вычислительная нагрузка. */#include <stdio.h>#include <signal.h>#include <mediastreamer2/msfilter.h>#include <mediastreamer2/msticker.h>#include <mediastreamer2/dtmfgen.h>#include <mediastreamer2/mssndcard.h>#include <mediastreamer2/msvolume.h>/*----------------------------------------------------------*/struct _app_vars{    int  step;              /* Количество фильтров добавляемых за раз. */    int  limit;             /* Количество фильтров на котором закончить работу. */    int  ticker_priority;   /* Приоритет тикера. */    char* file_name;        /* Имя выходного файла. */    FILE *file;};typedef struct _app_vars app_vars;/*----------------------------------------------------------*//* Функция преобразования аргументов командной строки в* настройки программы. */void  scan_args(int argc, char *argv[], app_vars *v){    char i;    for (i=0; i<argc; i++)    {        if (!strcmp(argv[i], "--help"))        {            char *p=argv[0]; p=p + 2;            printf("  %s computational load\n\n", p);            printf("--help      List of options.\n");            printf("--version   Version of application.\n");            printf("--step      Filters count per step.\n");            printf("--tprio     Ticker priority:\n"                    "            MS_TICKER_PRIO_NORMAL   0\n"                    "            MS_TICKER_PRIO_HIGH     1\n"                    "            MS_TICKER_PRIO_REALTIME 2\n");            printf("--limit     Filters count limit.\n");            printf("-o          Output file name.\n");            exit(0);        }        if (!strcmp(argv[i], "--version"))        {            printf("0.1\n");            exit(0);        }        if (!strcmp(argv[i], "--step"))        {            v->step = atoi(argv[i+1]);            printf("step: %i\n", v->step);        }        if (!strcmp(argv[i], "--tprio"))        {            int prio = atoi(argv[i+1]);            if ((prio >=MS_TICKER_PRIO_NORMAL) && (prio <= MS_TICKER_PRIO_REALTIME))            {                v->ticker_priority = atoi(argv[i+1]);                printf("ticker priority: %i\n", v->ticker_priority);            }            else            {                printf(" Bad ticker priority: %i\n", prio);                exit(1);            }        }        if (!strcmp(argv[i], "--limit"))        {            v->limit = atoi(argv[i+1]);            printf("limit: %i\n", v->limit);        }        if (!strcmp(argv[i], "-o"))        {            v->file_name=argv[i+1];            printf("file namet: %s\n", v->file_name);        }    }}/*----------------------------------------------------------*//* Структура для хранения настроек программы. */app_vars vars;/*----------------------------------------------------------*/void saveMyData(){    // Закрываем файл.    if (vars.file) fclose(vars.file);    exit(0);}void signalHandler( int signalNumber ){    static pthread_once_t semaphore = PTHREAD_ONCE_INIT;    printf("\nsignal %i received.\n", signalNumber);    pthread_once( & semaphore, saveMyData );}/*----------------------------------------------------------*/int main(int argc, char *argv[]){    /* Устанавливаем настройки по умолчанию. */    app_vars vars={100, 100500, MS_TICKER_PRIO_NORMAL, 0};    // Подключаем обработчик Ctrl-C.    signal( SIGTERM, signalHandler );    signal( SIGINT,  signalHandler );    /* Устанавливаем настройки настройки программы в     * соответствии с аргументами командной строки. */    scan_args(argc, argv, &vars);    if (vars.file_name)    {        vars.file = fopen(vars.file_name, "w");    }    ms_init();    /* Создаем экземпляры фильтров. */    MSFilter  *voidsource=ms_filter_new(MS_VOID_SOURCE_ID);    MSFilter  *dtmfgen=ms_filter_new(MS_DTMF_GEN_ID);    MSSndCard *card_playback=ms_snd_card_manager_get_default_card(ms_snd_card_manager_get());    MSFilter  *snd_card_write=ms_snd_card_create_writer(card_playback);    MSFilter  *voidsink=ms_filter_new(MS_VOID_SINK_ID);    MSDtmfGenCustomTone dtmf_cfg;    /* Устанавливаем имя нашего сигнала, помня о том, что в массиве мы должны     * оставить место для нуля, который обозначает конец строки. */    strncpy(dtmf_cfg.tone_name, "busy", sizeof(dtmf_cfg.tone_name));    dtmf_cfg.duration=1000;    dtmf_cfg.frequencies[0]=440; /* Будем генерировать один тон, частоту второго тона установим в 0.*/    dtmf_cfg.frequencies[1]=0;    dtmf_cfg.amplitude=1.0; /* Такой амплитуде синуса должен соответствовать результат измерения 0.707.*/    dtmf_cfg.interval=0.;    dtmf_cfg.repeat_count=0.;    /* Задаем переменные для хранения результата */    float load=0.;    float latency=0.;    int filter_count=0;    /* Создаем тикер. */    MSTicker *ticker=ms_ticker_new();    ms_ticker_set_priority(ticker, vars.ticker_priority);    /* Соединяем фильтры в цепочку. */    ms_filter_link(voidsource, 0, dtmfgen, 0);    ms_filter_link(dtmfgen, 0, voidsink, 0);    MSFilter* previous_filter=dtmfgen;    int gain=1;    int i;    printf("# filters load\n");    if (vars.file)    {        fprintf(vars.file, "# filters load\n");    }    while ((load <= 99.) && (filter_count < vars.limit))    {        // Временно отключаем  "поглотитель" пакетов от схемы.        ms_filter_unlink(previous_filter, 0, voidsink, 0);        MSFilter  *volume;        for (i=0; i<vars.step; i++)        {            volume=ms_filter_new(MS_VOLUME_ID);            ms_filter_call_method(volume, MS_VOLUME_SET_DB_GAIN, &gain);            ms_filter_link(previous_filter, 0, volume, 0);            previous_filter = volume;        }        // Возвращаем "поглотитель" пакетов в схему.        ms_filter_link(volume, 0, voidsink, 0);        /* Подключаем источник тактов. */        ms_ticker_attach(ticker,voidsource);        /* Включаем звуковой генератор. */        ms_filter_call_method(dtmfgen, MS_DTMF_GEN_PLAY_CUSTOM, (void*)&dtmf_cfg);        /* Даем, время 100 миллисекунд, чтобы были накоплены данные для усреднения. */        ms_usleep(500000);        /* Читаем результат измерения. */        load=ms_ticker_get_average_load(ticker);        filter_count=filter_count + vars.step;        /* Отключаем источник тактов. */        ms_ticker_detach(ticker,voidsource);        printf("%i  %f\n", filter_count, load);        if (vars.file) fprintf(vars.file,"%i  %f\n", filter_count, load);    }    if (vars.file) fclose(vars.file);}

Сохраняем под именем mstest13.c и компилируем командой:


$ gcc mstest13.c -o mstest13 `pkg-config mediastreamer --libs --cflags`

Далее запускаем наш инструмент, чтобы оценить нагрузку тикера работающего с наименьшим приоритетом:


$ ./mstest13 --step 100  --limit 40000 -tprio 0 -o log0.txt

$ ./mstest13 --step 100  --limit 40000 -tprio 1 -o log1.txt

$ ./mstest13 --step 100  --limit 40000 -tprio 2 -o log2.txt

Далее "скармливаем" получившиеся файлы log0.txt, log1.txt, log2.txt великолепной утилите gnuplot:


$ gnuplot -e  "set terminal png; set output 'load.png'; plot 'log0.txt' using 1:2 with lines , 'log1.txt' using 1:2 with lines, 'log2.txt' using 1:2 with lines"

В результате работы программы будет создан файл load.png, в котором будет отрисован график имеющий следующий вид:

По вертикали отложена нагрузка тикера в процентах, по горизонтали количество добавленных фильтров нагрузки.
На этом графике мы видим, что как и ожидалось, для приоритета 2 (голубая линия), первый заметный выброс наблюдается при подключенных 6000 фильтрах, когда как для приоритетов 0 (фиолетовая) и 1(зеленая) выбросы появляются раньше, при 1000 и 3000 фильтров соответственно.


Для получения усредненных результатов нужно выполнить гараздо больше прогонов программы, но уже на этом графике мы видим, что загрузка тикера нарастает линейно, пропорционально количеству фильтров.


Перенос работы в другой тред


Если ваша задача может быть разделена на два и более треда, то тут все просто создаёте один или больше новых тикеров и подключаете на каждый свой граф фильтров. Если задача не может быть распараллелена, то можно её разделить "поперек", разбив цепочку фильтров на несколько сегментов, каждый из которых будет работать в отдельном треде (т.е. со своим тикером). Далее нужно будет выполнить "сшивку" потоков данных, чтобы выходные данные первого сегмента попадали на вход следующего. Такой перенос данных между тредами выполняется с помощью двух специальных фильтров MS_ITC_SINK и MS_ITC_SOURCE, они имеют общее название "интертикеры".


Интертикеры


Фильтр MS_ITC_SINK обеспечивает вывод данных из треда он имеет только один вход, выходов у него нет. Фильтр MS_ITC_SOURCE обеспечивает асинхронный ввод данных в тред, он обладает одним выходом, входов не имеет. В терминах медиастримера, эта пара фильтров дает возможность передавать данные между фильтрами работающими от разных тикеров.


Чтобы началась передача данных, эти фильтры нужно соединить, но не так как мы это делали с обычными фильтрами, т.е. функцией ms_filter_link(). В данном случае, используется метод MS_ITC_SINK_CONNECT фильтра MS_ITC_SINK:


ms_filter_call_method (itc_sink, MS_ITC_SINK_CONNECT, itc_src)

Метод связывает два фильтра с помощью асинхронной очереди. Метода для разъединения интертикеров нет.


Пример использования интертикеров


Ниже показан пример схемы использования. Тройной стрелкой показана асинхронная очередь между двумя тредами.


Теперь модифицируем нашу тестовую программу в соответствии с этой схемой так, чтобы в ней как на рисунке работали два треда связанные интертикерами. В каждом треде будет точка вставки куда будут добавляться новые экземпляров фильтров нагрузки, как показано на рисунке ниже. Добавляемое количество фильтров будет разделено попалам между тредами.

Соответствующий код программы показан ниже.


Файл mstest14.c Переменная вычислительная нагрузка c интертикерами
/* Файл mstest14.c Переменная вычислительная нагрузка c интертикерами. */#include <stdio.h>#include <signal.h>#include <mediastreamer2/msfilter.h>#include <mediastreamer2/msticker.h>#include <mediastreamer2/dtmfgen.h>#include <mediastreamer2/mssndcard.h>#include <mediastreamer2/msvolume.h>#include <mediastreamer2/msitc.h>/*----------------------------------------------------------*/struct _app_vars{    int  step;              /* Количество фильтров добавляемых за раз. */    int  limit;             /* Количество фильтров на котором закончить работу. */    int  ticker_priority;   /* Приоритет тикера. */    char* file_name;        /* Имя выходного файла. */    FILE *file;};typedef struct _app_vars app_vars;/*----------------------------------------------------------*//* Функция преобразования аргументов командной строки в  * настройки программы. */void  scan_args(int argc, char *argv[], app_vars *v){    char i;    for (i=0; i<argc; i++)    {        if (!strcmp(argv[i], "--help"))        {            char *p=argv[0]; p=p + 2;            printf("  %s computational load diveded for two threads.\n\n", p);            printf("--help      List of options.\n");            printf("--version   Version of application.\n");            printf("--step      Filters count per step.\n");            printf("--tprio     Ticker priority:\n"                    "            MS_TICKER_PRIO_NORMAL   0\n"                     "            MS_TICKER_PRIO_HIGH     1\n"                    "            MS_TICKER_PRIO_REALTIME 2\n");            printf("--limit     Filters count limit.\n");            printf("-o          Output file name.\n");            exit(0);        }        if (!strcmp(argv[i], "--version"))        {            printf("0.1\n");            exit(0);        }        if (!strcmp(argv[i], "--step"))        {            v->step = atoi(argv[i+1]);            printf("step: %i\n", v->step);        }        if (!strcmp(argv[i], "--tprio"))        {            int prio = atoi(argv[i+1]);            if ((prio >=MS_TICKER_PRIO_NORMAL) && (prio <= MS_TICKER_PRIO_REALTIME))            {                 v->ticker_priority = atoi(argv[i+1]);                printf("ticker priority: %i\n", v->ticker_priority);            }            else            {                printf(" Bad ticker priority: %i\n", prio);                exit(1);            }        }        if (!strcmp(argv[i], "--limit"))        {            v->limit = atoi(argv[i+1]);            printf("limit: %i\n", v->limit);        }        if (!strcmp(argv[i], "-o"))        {            v->file_name=argv[i+1];            printf("file namet: %s\n", v->file_name);        }    }}/*----------------------------------------------------------*//* Структура для хранения настроек программы. */app_vars vars;/*----------------------------------------------------------*/void saveMyData(){    // Закрываем файл.    if (vars.file) fclose(vars.file);    exit(0);}void signalHandler( int signalNumber ){    static pthread_once_t semaphore = PTHREAD_ONCE_INIT;    printf("\nsignal %i received.\n", signalNumber);    pthread_once( & semaphore, saveMyData );}/*----------------------------------------------------------*/int main(int argc, char *argv[]){    /* Устанавливаем настройки по умолчанию. */    app_vars vars={100, 100500, MS_TICKER_PRIO_NORMAL, 0};    // Подключаем обработчик Ctrl-C.    signal( SIGTERM, signalHandler );    signal( SIGINT,  signalHandler );    /* Устанавливаем настройки настройки программы в      * соответствии с аргументами командной строки. */    scan_args(argc, argv, &vars);    if (vars.file_name)    {        vars.file = fopen(vars.file_name, "w");    }    ms_init();    /* Создаем экземпляры фильтров для первого треда. */    MSFilter  *voidsource = ms_filter_new(MS_VOID_SOURCE_ID);    MSFilter  *dtmfgen    = ms_filter_new(MS_DTMF_GEN_ID);    MSFilter  *itc_sink   = ms_filter_new(MS_ITC_SINK_ID);    MSDtmfGenCustomTone dtmf_cfg;    /* Устанавливаем имя нашего сигнала, помня о том, что в массиве мы должны     * оставить место для нуля, который обозначает конец строки. */    strncpy(dtmf_cfg.tone_name, "busy", sizeof(dtmf_cfg.tone_name));    dtmf_cfg.duration=1000;    dtmf_cfg.frequencies[0]=440; /* Будем генерировать один тон, частоту второго тона установим в 0.*/    dtmf_cfg.frequencies[1]=0;    dtmf_cfg.amplitude=1.0; /* Такой амплитуде синуса должен соответствовать результат измерения 0.707.*/    dtmf_cfg.interval=0.;    dtmf_cfg.repeat_count=0.;    /* Задаем переменные для хранения результата */    float load=0.;    float latency=0.;    int filter_count=0;    /* Создаем тикер. */    MSTicker *ticker1=ms_ticker_new();    ms_ticker_set_priority(ticker1, vars.ticker_priority);    /* Соединяем фильтры в цепочку. */    ms_filter_link(voidsource, 0, dtmfgen, 0);    ms_filter_link(dtmfgen, 0, itc_sink , 0);    /* Создаем экземпляры фильтров для второго треда. */    MSTicker *ticker2=ms_ticker_new();    ms_ticker_set_priority(ticker2, vars.ticker_priority);    MSFilter *itc_src   = ms_filter_new(MS_ITC_SOURCE_ID);    MSFilter *voidsink2 = ms_filter_new(MS_VOID_SINK_ID);    ms_filter_call_method (itc_sink, MS_ITC_SINK_CONNECT, itc_src);    ms_filter_link(itc_src, 0, voidsink2, 0);    MSFilter* previous_filter1=dtmfgen;    MSFilter* previous_filter2=itc_src;    int gain=1;    int i;    printf("# filters load\n");    if (vars.file)    {        fprintf(vars.file, "# filters load\n");    }    while ((load <= 99.) && (filter_count < vars.limit))    {        // Временно отключаем  "поглотители" пакетов от схем.        ms_filter_unlink(previous_filter1, 0, itc_sink, 0);        ms_filter_unlink(previous_filter2, 0, voidsink2, 0);        MSFilter  *volume1, *volume2;        // Делим новые фильтры нагрузки между двумя тредами.        int new_filters = vars.step>>1;        for (i=0; i < new_filters; i++)        {            volume1=ms_filter_new(MS_VOLUME_ID);            ms_filter_call_method(volume1, MS_VOLUME_SET_DB_GAIN, &gain);            ms_filter_link(previous_filter1, 0, volume1, 0);            previous_filter1 = volume1;        }        new_filters = vars.step - new_filters;        for (i=0; i < new_filters; i++)        {            volume2=ms_filter_new(MS_VOLUME_ID);            ms_filter_call_method(volume2, MS_VOLUME_SET_DB_GAIN, &gain);            ms_filter_link(previous_filter2, 0, volume2, 0);            previous_filter2 = volume2;        }        // Возвращаем "поглотители" пакетов в схемы.        ms_filter_link(volume1, 0, itc_sink, 0);        ms_filter_link(volume2, 0, voidsink2, 0);        /* Подключаем источник тактов. */        ms_ticker_attach(ticker2, itc_src);        ms_ticker_attach(ticker1, voidsource);        /* Включаем звуковой генератор. */        ms_filter_call_method(dtmfgen, MS_DTMF_GEN_PLAY_CUSTOM, (void*)&dtmf_cfg);        /* Даем, время, чтобы были накоплены данные для усреднения. */        ms_usleep(500000);        /* Читаем результат измерения. */        load=ms_ticker_get_average_load(ticker1);        filter_count=filter_count + vars.step;        /* Отключаем источник тактов. */        ms_ticker_detach(ticker1, voidsource);        printf("%i  %f\n", filter_count, load);        if (vars.file) fprintf(vars.file,"%i  %f\n", filter_count, load);    }    if (vars.file) fclose(vars.file);}

Далее компилируем и запускаем нашу программу с тикерами, работающими с наименьшим приоритетом:


$ ./mstest14 --step 100  --limit 40000 -tprio 0 -o log4.txt

Результат измерений получится следующий:



Для удобства на график добавлены кривые, полученные для первого варианта программы. Оранжевая кривая показывает результат для "двухтредовой" версии программы. Из графика видно, что скорость нарастания загрузки тикера для "двухтредовой" схемы ниже. Нагрузка на второй тикер не показана.


При необходимости можно соединить треды работающие на разных хостах, только при этом вместо интертикеров использовать RTP-сессию (как это мы делали ранее создавая переговорное устройство), здесь также потребуется учесть, что размер RTP-пакетов ограничен сверху величиной MTU.


По результатам этой статьи можно сделать вывод, что изменение приоритета тикера не влияет на его производительноть, повышение приоритета только снижает вероятность запаздывания тикера. Рецептом к повышению производительности схемы в целом является разделение графа обработки на несколько кластеров со своими индивидуальными тикерами. При этом платой за снижении нагрузки на тикер является увеличение времени прохождения данных со входа на выход схемы.


Заключение


Эта статья завершает цикл о Медиастримере2. Вне всякого сомнения, материал статей освещает лишь часть возможностей и особенностей работы этого движка. Цикл нужно рассматривать как первый шаг к освоению этой технологии.

Источник: habr.com
К списку статей
Опубликовано: 01.07.2020 16:06:10
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

*nix

C

Open source

Звук

Разработка под linux

Медиастример

Медиастример2

Mediastreamer

Mediastreamer2

Voip

Программирование

Обработка звука

Категории

Последние комментарии

© 2006-2020, personeltest.ru