Asterisk фи, это же моветон
Здравствуйте уважаемые читатели этого замечательного ресурса. По уже сложившейся традиции являюсь давним читателем habr'а, но только сейчас решил сделать пост. Что, собственно, побудило к написанию? Честно сказать, и сам не знаю. То ли притянутые статьи о производительности FreeSWITCH/Yate/3CX/etc в сравнении с Asterisk, то ли действительные, реальные проблемы архитектуры последнего, а, возможно, желание сделать что-нибудь уникальное.
И что удивительно, в первом случае, как правило, сравнивают мягкое и теплое, так сказать, FreeSWITCH/Yate/etc и FreePBX. Да-да, именно FreePBX. Это не опечатка. Причем интересно, что во всех сравнениях зачастую один Asterisk в дефолтной конфигурации. Ну, вы знаете, эта конфигурация загруженные все имеющиеся модули, кривой диалплан (FreePBX как бы способствует) и куча остальной необъективщины. Что до родовых болячек Asterisk'а да, объективно их вагон и маленькая тележка.
Что со всем этим делать? Разрушать стереотипы и исправлять родовые травмы. Этим и займемся.
Скрещиваем ежа с ужом
Многие из новичков испытывают дискомфорт, глядя на синтаксис описания диалплана в Asterisk'е, а некоторые на полном серьезе обосновывают выбор другого сервера телефонии именно необходимостью писать диалплан в том виде, в котором он есть по дефолту. Типа, перелопачивать многострочный XML это верх комфорта. Да, есть возможность юзать LUA/AEL, и это хорошо. Но лично я отнес бы эту возможность в минусы и в частности то, что касается pbx_lua.
Как уже было сказано, иметь возможность описывать диалплан полноценным языком программирования это хорошо. Проблема в том, что время жизни скрипта и его окружения равно времени жизни канала. Для каждого нового канала запускается свой экземпляр скрипта, следовательно, прощай, разделяемые между каналами переменные, единичная загрузка сторонних модулей, один разделяемый коннект к базе и т.д., и т.п. Строго говоря, от встраиваемого языка описания сценария этого и не нужно, но уж сильно хочется. А если хочется значит, надо.
Итак, от классического Asterisk'а возьмем принципы pbx_lua, от Yate возьмем модель маршрутизации, а от FreeSWITCH ничего брать не будем, ибо "overhead" не нужен. ОК, с тем, что нам нужно родить, определились. Что же будем использовать для генетических экспериментов:
- Asterisk, причем хотелось бы без привязок к версии. Тот же ARI был анонсирован, если мне не изменяет память, с 12-й версии. Если учесть, что до сих пор где то юзаются 1.8/1.6, а возможно и 1.4, то зависимость от версионных плюшек нам не нужна.
- Lua замечательный, гибкий и крайне функциональный скриптовый язык. Сам бог велел, так сказать, без комментариев.
- Lunapark интересный проект на github'е, своего рода сервер voip-приложений.
Про Lunapark стоит рассказать подробнее. Это сервер, реализующий потенциал AMI-протокола в связке с классическим FastAGI, что немаловажно в едином пространстве выполнения. То есть, получаем аналог ARI посредством тесной кооперации AGI и AMI в одном флаконе.
Предвижу логичный вопрос: для чего это все? Есть же Asterisk REST Interface, чей функционал ты тут пытаешься переизобрести! Ответ на этот вопрос неоднозначен. Согласен, ARI декларирует ряд преимуществ: да, он асинхронен, да, позволяет работать с "сырыми" примитивами, WebSockets и да, стильный, модный, молодежный XML/JSON куда ж без него. Но, черт возьми, часть этих так называемых преимуществ крайне сомнительна и добавляет один, а то и более уровней абстракции. Другая же часть вообще не преимущества. Преимущества это когда что-то свойственно только тебе, ниже мы это увидим на примере той же асинхронности.
Как это работает? Стандартными средствами заворачиваем канал в FastAGI-приложение, внутри которого получаем возможность управлять звонком, как будто юзаем pbx_lua с незначительным изменением синтаксиса. Вишенкой на торте является возможность управлять состоянием самого Asteriskа и окружением канала, для этого в распоряжении текущего FastAGI-приложения есть глобальный AMI-объект. Кстати, можно не заворачивать канал в FastAGI-приложение, а создать глобальный обработчик события, допустим, для NewChannel. А это уже преимущество по сравнению с ARI, там как известно, вне stasis'а ARI слеп.
Реализован Lunapark в лучших традициях кооперативной многозадачности, а именно всеми любимая асинхронность на сопрограммах. И как следствие отсутствие проблем с "shared data". То есть плюсы присутствуют, но и проблемы появляются. Одна из них это необходимость описывать логику с оглядкой на асинхронность, но я думаю, это мы как-нибудь переживем.
Что дальше?
Синтаксис описания контекстов а что же с ним не так? Да все с ним нормально, более того, практику написания диалплана нужно прописывать как профилактику для формирования структурированного мышления у новичков. Но, вместе с тем нужно понижать порог вхождения. Поэтому будем упрощать и в то же время добавлять функционала.
Простой пример:
[test]exten => _XXX/102,1,Hangup()exten => _XXX,1,Dial(SIP/${EXTEN})
В этом примере идет дозвон до трехзнака, кроме абонента 102. Вроде бы все логично и лаконично за исключением того, что шаблоны соответствия экстеншена ограничены небольшим набором правил, а так называемая extended маршрутизация возможна только по CallerID звонящего. А хотелось бы, к примеру, по CallerIDName или по текущему состоянию звонящего канала, а возможно по имени самого канала, а если реализовать полноценный regexp, так вообще красота. И да, я знаю, все эти хотелки можно реализовать, расписав контекст в таком виде:
[test]exten => _XXX/102,1,Hangup(); по CallerIDNameexten => _XXX,1,ExecIf($[ "${CALLERID(name)}" == "Vasya" ]?Hangup()); По состоянию каналаexten => _XXX,n,ExecIf($[ "${CHANNEL(state)}" != "Ring" ]?Hangup()); По имени каналаexten => _XXX,n,ExecIf($[ "${CUT(CUT(CHANNEL,-,1),/,2)}" == "333" ]?Hangup())exten => _XXX,n,Dial(SIP/${EXTEN})
Но мой внутренний перфекционист начинает бунтовать при виде такого, а если представить аналогичную выборку по всем пользователям, да еще и действия нужны разные и посложнее Hangup'а, то extensions.conf превращается в длииинную портянку вызовов Goto, GoSub, Macro и, не дай бог, с каналами типа Local.
Выход один прикручивать свои правила маршрутизации с подкидным и дамами с низкой социальной ответственностью.
В качестве примера:
${Exten}:match('%d%d%d') and ( ${CallerIDNum}:match('201') or ${CallerIDName}:match('Vasya') or ${State}:lower() ~= 'ring' or ${Channel}:match('^[^/]+/([^%-]+)') == '333') => Hangup();${Exten}:match('%d%d%d') => Dial {callee = ('SIP/%s'):format(${Exten})};
Хм, вырвиглазненько получилось, но на удивление читается и понимается с первого взгляда. А самое главное, что у нас появился аналог regexp'ов и группировка правил на действие, что, несомненно, упростит составление маршрутов в будущем.
Что тут думать, прыгать надо.
В итоге имеем Lunapark
как замену pbx_lua. Его средствами нам и нужно создать логику
обработки нашей модели маршрутизации. Для начала нужно распарсить
набор правил и заменить все вхождения ${...}
на
соответствующие им значения, то есть привести к виду
('...')
. Значения будут браться из окружения текущего
канала.
Затем приводим каждое правило к виду условного оператора, чтобы
получить нечто похожее:
-- Exten = 123-- Sate = Ring-- CallerIDNum = 100-- CallerIDName = Test-- Channel = SIP/100-00000012cif ('123'):match('%d%d%d') and( ('100'):match('201') or ('Test'):match('Vasya') or ('Ring'):lower() ~= 'ring' or ('SIP/100-00000012c'):match('^[^/]+/([^%-]+)') == '333') then Hangup()endif ('123'):match('%d%d%d') then Dial {callee = ('SIP/%s'):format(('123'))}end
Делать это будут две функции fmt и syntax соответственно:
local fmt = function(str, tab) return (str:gsub('(%${[^}{]+})', function(w) local mark = w:sub(3, -2) return (mark:gsub('(.+)',function(v) local out = tab[v] or v return ("('%s')"):format(out) end)) end))endlocal syntax = function(str) return (str:gsub('([^;]+)=>([^;]+)',function(p,r) return ([[ if %s then %s end ]]):format(p,r) end))end
В принципе ничего сложного, все просто и понятно. Идем дальше считывать наши правила будем из файла в переменную при старте сервиса, а парсить их уже при звонке с актуальным окружением. Считыванием правил займется функция routes.
local routes = function(...) local conf, content = ... local f, err = io.open(conf, "r") if io.type(f) ~= 'file' then log.warn(err) -- Глобальный LOG объект доступный благодаря Lunapark'у return "" else content = f:read('*all') end f:close() return contentend
Осталось сделать две вещи: завернуть звонки в Lunapark и соответственно их обработать с учетом наших маршрутов. Тут стоит немного пояснить такой момент в Lunapark вся логика описывается в handler'е. Это текстовый файл, в котором мы будем определять наши FastAGI-приложения и работать с AMI и нашими маршрутами.
Как уже было сказано, объект AMI глобальный и, помимо роли AMI-клиента, может устанавливать своего рода слушатели для конкретных AMI событий. Этим мы и воспользуемся, но для начала сделаем некоторые приготовления в extensions.conf.
[default]exten => _[hit],1,NoOp()exten => _.,n,Wait(5)exten => _.,1,AGI(agi://127.0.0.1/${EXTEN}${IF($[ "X${PRMS}" != "X" ]?"?${PRMS}")})
Wait(5) в примере выше позволит нам не обрывать канал при завершении FastAGI-приложения, так как в маршрутах может быть описано несколько приложений, а выполнение их осуществляется по средствам Redirect на контекст default по ${EXTEN}.
Таким образом, беря во внимание все выше описанное и помня о кооперативной природе Lunapark'а, попробуем закодить логику обработки маршрутов через FastAGI-приложения.
-- Считываем наши правила в переменную ruleslocal rules = routes('routes.conf')-- Очищаем все обработчики, таким образом очистятся только не именные обработчики-- Это даст возможность не затирать цепочку выполнения при сигналах HUP/QUITami.removeEvents('*')-- Обработчик события создания нового каналаami.addEvents { ['newchannel'] = function(e) -- Условия, только каналы с набором и каналы с контекстом users if (e['Context'] and e['Context']:match('users')) and e['Exten'] then -- Переменная, указывающая на выполнении, какого FastAGI приложения мы находимся local step -- Будущий порядковый номер FatsAGI приложения в цепочке выполнения local count = 0 -- Парсим маршруты для текущего окружения канала local code, err = loadstring(syntax(fmt(rules,e))) -- В описании маршрутов нет ошибок, двигаемся дальше if type(code) == 'function' then -- Проксируем будущие FastAGI приложения setfenv(code,setmetatable({indexes = {}},{__index = function(t,k) -- Вот они последствия кооперативности return coroutine.wrap( function(...) local prms = {} -- Будущие параметры FastAGI приложения local owner = t -- Копия окружения local event = e -- Копия таблицы event local thread = coroutine.running() -- ID текущей сопрограммы -- Парсим параметры и приводим к виду URI for p,v in pairs({...}) do if type(v) == 'table' then for key, val in pairs(v) do table.insert(prms,("%s=%s"):format(key,val)) end else table.insert(prms,("%s=%s"):format(p,v)) end end -- Если это не первое FastAGI приложение в цепочке if step then -- Запоминаем предыдущее перед этим local last = ("%s"):format(step) -- Добавляем ИМЕННЕ обработчики события UserEvent по доп. условиям -- И записываем в таблицу indexes(в окружении) их порядковые номера -- Именные обработчики требуют последующего удаления самостоятельно table.insert(owner['indexes'],ami.addEvent('UserEvent',function(evt) -- Ловим событие AGIStatus указывающее на завершение приложения -- Если это предыдущее перед нами, пробуждаем сопрограмму if (evt['Channel'] and evt['Channel'] == event['Channel']) and (evt['UserEvent'] and evt['UserEvent']:match('AGIStatus')) and (evt['Script'] and evt['Script'] == last) then -- Соответствие порядкового номера нашей сопрограмме -- В цепочке может быть вызов одного приложения несколько раз -- Это позволит выполнять сопрограммы в порядке их определения if owner['indexes'][count] == thread then if coroutine.status(thread) ~= 'dead' then coroutine.resume(thread) end end end end,thread)) -- Устанавливаем маркер текущего FastAGI приложения step = k -- Приостанавливаем сопрограмму coroutine.yield() else -- Здесь обрабатывается первое FastAGI приложение в цепочке local index -- Индекс для обработчика Hangup события -- Устанавливаем маркер текущего FastAGI приложения step = k -- Добавляем ИМЕННОЙ обработчик события Hangup для канала -- В этом месте подчищаем за собой index = ami.addEvent('Hangup',function(evt) if evt['Channel'] and evt['Channel'] == event['Channel'] then -- Удаляем обработчик событие Hangup по ранее запомненному индексу ami.removeEvent('Hangup',index) -- Удаляем все обработчики цепочек выполнения по индексу for _,v in pairs(owner['indexes']) do ami.removeEvent('UserEvent',v) end -- Делаем приятно сборщику мусора owner = nil end end,thread) end -- По средствам AMI выставляем переменную для канала и вызова в цепочке ami.setvar{ Value = table.concat(prms,'&'), Channel = event['Channel'], Variable = 'PRMS' } -- Перенаправляем канал на AGI-приложение через контекст default ami.redirect{ Exten = k, Priority = 1, Channel = event['Channel'], Context = 'default' } -- Выставляем индекс приложения count = count + 1 end) end}))() else -- Если что-то пошло не так log.warn(err) end end end}
Для тех, кто не смотрел или не понял код, поясню простым языком, что же мы там накодили. Для каждого действия в маршруте создается сопрограмма с копированием всего, чего нужно через замыкания и в приостановленном состоянии. Кроме сопрограммы первого действия, она выполняется как есть. Приостанавливаем мы их для эмуляции последовательного выполнения, то есть строго одна за другой, вторая после первой, третья после второй и т.д.
Стоп, так они и так друг за другом выполняются. На самом деле нет если не эмулировать синхронность, то асинхронный redirect пробежится по каждому действию и займет это доли секунды. В нашем же коде мы выполняем каждое действие, по наступлению определенного события, а именно завершении предыдущего FastAGI-приложения. Lunapark заботливо генерирует специальный UserEvent по окончании выполнения каждого FastAGI-приложения с соответствующими параметрами вот на это событие и ориентируемся. Сами же сопрограммы просто редиректят текущий канал в контекст default с экстеншном, равным текущему действию, предварительно установив переменную канала PRMS.
Самое интересное, что звонок после redirect'а придет опять в handler, но уже в контексте выполнения AGI и на соответствующее приложение. В нашем случае это Hangup() и Dial(). Давайте же напишем их для полноты повествования.
function Hangup(...) local app, channel = ... -- В этом отличие от pbx_lua app.verbose(('The Channel %s does not match by routing rules'):format(channel.get('CHANNEL'))) app.hangup()endfunction Dial(...) local app, channel = ... local leg = app.agi.params['callee'] or '' app.verbose(('Trying to make a call from %s to %s'):format( channel.get('CALLERID(num)'), leg:match('^[^/]+/([^%-]+)')) ) app.dial(leg)end
Ну, вот и все допрыгались
Итак, давайте подытожим. Что же мы получили в результате этих генетических экспериментов?
- гибкий, функциональный подход описания маршрутов;
- возможность создания полнофункциональных VoIP-приложений. Теперь нам не нужно приложение Queue, мы можем его сами написать, не заморачиваясь с созданием своего собственного модуля для asterisk'а;
- вынесли логику формирования, управления звонками на сторону сервера VoIP-приложений, тем самым сделав из asterisk'а Mediahub, что позволило повысить производительность VoIP-системы в целом;
- возможность использовать достаточно простой, расширяемый и очень гибкий скриптовый язык для создания VoIP-приложений;
- расширили возможности интеграции с внешними системами из VoIP-приложений.
Кому как, а мне пока все нравится.
local fmt = function(str, tab) return (str:gsub('(%${[^}{]+})', function(w) local mark = w:sub(3, -2) return (mark:gsub('(.+)',function(v) local out = tab[v] or v return ("('%s')"):format(out) end)) end))endlocal syntax = function(str) return (str:gsub('([^;]+)=>([^;]+)',function(p,r) return ([[ if %s then %s end ]]):format(p,r) end))endlocal routes = function(...) local conf, content = ... local f, err = io.open(conf, "r") if io.type(f) ~= 'file' then log.warn(err) -- Глобальный LOG объект доступный благодаря Lunapark'у return "" else content = f:read('*all') end f:close() return contentend-- Считываем наши правила в переменную ruleslocal rules = routes('routes.conf')-- Очищаем все обработчики, причем таким образом очистятся только неименные обработчики событий-- Это даст возможность не затирать цепочку выполнения при сигналах HUP/QUITami.removeEvents('*')-- Обработчик события создания нового каналаami.addEvents { ['newchannel'] = function(e) -- Условия, только каналы с набором и каналы с контекстом users if (e['Context'] and e['Context']:match('users')) and e['Exten'] then local step -- Переменная, указывающая на выполнении, какого FastAGI приложения мы находимся local count = 0 -- Будущий порядковый номер FatsAGI приложения в цепочке выполнения -- Парсим маршруты для текущего окружения канала local code, err = loadstring(syntax(fmt(rules,e))) -- В описании маршрутов нет ошибок, двигаемся дальше if type(code) == 'function' then -- Проксируем будущие FastAGI приложения setfenv(code,setmetatable({indexes = {}},{__index = function(t,k) -- Вот они последствия кооперативности return coroutine.wrap( function(...) local prms = {} -- Будущие параметры FastAGI приложения local owner = t -- Копия окружения local event = e -- Копия таблицы event local thread = coroutine.running() -- ID текущей сопрограммы -- Парсим параметры и приводим к виду URI for p,v in pairs({...}) do if type(v) == 'table' then for key, val in pairs(v) do table.insert(prms,("%s=%s"):format(key,val)) end else table.insert(prms,("%s=%s"):format(p,v)) end end -- Если это не первое FastAGI приложение в цепочке if step then -- Запоминаем предыдущее перед этим local last = ("%s"):format(step) -- Добавляем ИМЕННЕ обработчики события UserEvent по доп. условиям -- И записываем в таблицу indexes(в окружении) их порядковые номера -- Именные обработчики требуют последующего удаления самостоятельно table.insert(owner['indexes'],ami.addEvent('UserEvent',function(evt) -- Ловим событие AGIStatus указывающее на завершение приложения -- Если это предыдущее перед нами, пробуждаем сопрограмму if (evt['Channel'] and evt['Channel'] == event['Channel']) and (evt['UserEvent'] and evt['UserEvent']:match('AGIStatus')) and (evt['Script'] and evt['Script'] == last) then -- Соответствие порядкового номера нашей сопрограмме -- В цепочке может быть вызов одного приложения несколько раз -- Это позволит выполнять сопрограммы в порядке их определения if owner['indexes'][count] == thread then if coroutine.status(thread) ~= 'dead' then coroutine.resume(thread) end end end end,thread)) -- Устанавливаем маркер текущего FastAGI приложения step = k -- Приостанавливаем сопрограмму coroutine.yield() else -- Здесь обрабатывается первое FastAGI приложение в цепочке local index -- Индекс для обработчика Hangup события -- Устанавливаем маркер текущего FastAGI приложения step = k -- Добавляем ИМЕННОЙ обработчик события Hangup для канала -- В этом месте подчищаем за собой index = ami.addEvent('Hangup',function(evt) if evt['Channel'] and evt['Channel'] == event['Channel'] then -- Удаляем обработчик событие Hangup по ранее запомненному индексу ami.removeEvent('Hangup',index) -- Удаляем все обработчики цепочек выполнения по индексу for _,v in pairs(owner['indexes']) do ami.removeEvent('UserEvent',v) end -- Делаем приятно сборщику мусора owner = nil end end,thread) end -- По средствам AMI выставляем переменную для канала и вызова в цепочке ami.setvar{ Value = table.concat(prms,'&'), Channel = event['Channel'], Variable = 'PRMS' } -- Перенаправляем канал на AGI-приложение через контекст default ami.redirect{ Exten = k, Priority = 1, Channel = event['Channel'], Context = 'default' } -- Выставляем индекс приложения count = count + 1 end) end}))() else -- Если что-то пошло не так log.warn(err) end end end}function Hangup(...) local app, channel = ... -- В этом отличие от pbx_lua app.verbose(('The Channel %s does not match by routing rules'):format(channel.get('CHANNEL'))) app.hangup() endfunction Dial(...) local app, channel = ... local leg = app.agi.params['callee'] or '' app.verbose(('Trying to make a call from %s to %s'):format( channel.get('CALLERID(num)'), leg:match('^[^/]+/([^%-]+)')) ) app.dial(leg) end