Русский
Русский
English
Статистика
Реклама

Asyncio

Магия WebPush в Mozilla Firefox. Взгляд изнутри

07.07.2020 10:10:24 | Автор: admin

Безусловно одной из самых популярных технологий доставки оповещений на устройства пользователей являются Push уведомления. Технология такова, что для её работы необходим постоянный доступ к интернету, а именно доступ к серверам, на которых регистрируются устройства пользователя для получения уведомлений. В данной статье мы рассмотрим весь спектр механизмов технологии WebPush уведомлений, спрятанных за словами WebSocket, ServiceWorker, vapid, register, broadcast, message encryption и т.д. Основной причиной побудившей меня к реверсу и изучению механизма, являлась необходимость доставки уведомлений мониторинга на рабочие места техподдержки, находящиеся в закрытом сегменте сети без доступа в интернет. И да, это возможно! Подробности под катом.


Disclaimer


В статье рассматривается режим доставки уведомлений пользователям в рамках использования браузера Mozilla Firefox. Это связано с тем, что на данный момент это единственный продукт позволяющий менять настройки push серверов используемых по умолчанию. Настройки браузеров Google Chrome, Chromium и производных в целях безопасности жёстко "зашиты" производителем в коде продукта.


Статья делится на две части


  • Теоретическая информация
  • Практические заметки для реализации механизма WebPush уведомлений

Используемые технологии и термины


WebSocket


Транспортным ядром системы Push уведомлений является протокол WebSocket, позволяющий в рамках стандартного HTTP/HTTPS подключения к Web серверу установить постоянный двусторонний канал связи между клиентом и сервером. В рамках установленного канала связи могут использоваться любые, в том числе бинарные, протоколы клиент-серверного взаимодействия заложенные разработчиками сервиса.


ServiceWorker


ServiceWorker это внешний автономный механизм обработки и реакции на события, интегрирующийся в браузер путём загрузки логики обработки событий в событийную машину браузера. Перечень событий жёстко зафиксирован в коде браузера и не подлежит быстрому изменению. ServiceWorker по своей сути является частью исполняемого кода, выполняющегося вне контекста пользовательской сессии. Это важное условие, которое обеспечивает безопасность пользовательских данных. Прямое взаимодействие пользователя с ServiceWorker практически невозможно.


VAPID


Механизм формирования авторизационных заголовков для идентификации промежуточных серверов, инициирующих отравку сообщений пользователю.
спецификация VAPID


WebPush


Механизм доставки сообщений до получателя.
Набор документов и спецификаций по WebPush


Workflow


Документации по WebPush довольно много (см. спойлер), но она существует только в парадигме
Client <-> Push Service <-> Application


Подробные спецификации по работе механизма в продуктах Google и Mozilla

Модель взаимодействия предполагает следующую схему.
image


Таким образом создаётся впечатление, что для работы механизма как минимум нужны специализированные сервисы регистрации и приёма/доставки уведомлений, а для отправки сообщений обязателен VAPID, спецзаголовки и ключи. Документация не описывает некоторых внутренних механизмов взаимодействия Push сервера и клиента, которые реально влияют на работу.
Попробуем рассмотреть все процессы подробно.


Фаза обработки сообщения


Меня долгое время интересовало, какие типы сообщений можно отправить браузеру и на что он среагирует.
Вопрос отпал когда я включил отладочный режим для push сообщений в браузере и полез в исходный код Firefox.


Блок кода расставил все точки над И
    try {      reply = JSON.parse(message);    } catch (e) {      console.warn("wsOnMessageAvailable: Invalid JSON", message, e);      return;    }    // If we receive a message, we know the connection succeeded. Reset the    // connection attempt and ping interval counters.    this._retryFailCount = 0;    let doNotHandle = false;    if (      message === "{}" ||      reply.messageType === undefined ||      reply.messageType === "ping" ||      typeof reply.messageType != "string"    ) {      console.debug("wsOnMessageAvailable: Pong received");      doNotHandle = true;    }    // Reset the ping timer.  Note: This path is executed at every step of the    // handshake, so this timer does not need to be set explicitly at startup.    this._startPingTimer();    // If it is a ping, do not handle the message.    if (doNotHandle) {      return;    }    // A whitelist of protocol handlers. Add to these if new messages are added    // in the protocol.    let handlers = [      "Hello",      "Register",      "Unregister",      "Notification",      "Broadcast",    ];    // Build up the handler name to call from messageType.    // e.g. messageType == "register" -> _handleRegisterReply.    let handlerName =      reply.messageType[0].toUpperCase() +      reply.messageType.slice(1).toLowerCase();    if (!handlers.includes(handlerName)) {      console.warn(        "wsOnMessageAvailable: No whitelisted handler",        handlerName,        "for message",        reply.messageType      );      return;    }    let handler = "_handle" + handlerName + "Reply";

Ни одно сообщение отправленное через websocket в сторону браузера не будет обработано, если оно не является системным сообщением проверки доступности конечной стороны "{}" или ответом на запрос от Push сервера. Это означает, что Push сервер не имеет никакого способа воздействия на работу клиентской стороны, кроме проверки её доступности. Аналогично, кроме 5 типов ответных сообщений, ничего обработано не будет.


Фаза инициализации


При запуске браузера Firefox, его внутренний механизм автоматически инициирует соединение с WebSocket(WS) сервером находящимся в системной настройке dom.push.serverURL с сообщением следующего формата.


{  "messageType": "hello",  "broadcasts":    {      "remote-settings/monitor_changes": "v923"    },    "use_webpush": True}

При первичной инициализации соединения(первый запуск браузера после установки/запуск нового профиля), поле "uaid" отсутствует, что является сигналом Push серверу о необходимости регистрации нового идентификатора. Как мы видим в разделе "broadcasts" присутствует некая пара "remote-settings/monitor_changes": "v923". Данная пара используется как буфер для хранения информации, отправляемой в сторону сервера при установлении соединения. В продукте Mozilla autopush, промышленной версии webpush сервера используемого на стороне серверов Mozilla, данная переменная используется как идентификатор последнего полученного пользователем сообщения из глобальной очереди сервера. Об изменении данного идентификатора мы поговорим позже. Итак, после принятия сообщения от клиента, сервер отвечает сообщением следующего вида


{  "messageType": "hello",  "status": 200,  "uaid": "b4ab795089784bbb978e6c894fe753c0",  "use_webpush": True}

Поле uaid заполняется либо присланным со стороны клиента значением, либо новым случайным значением, если uaid был неопределён.
На этом фаза первичной инициализации заканчивается и в принципе между клиентом и сервером ничего больше не происходит до момента регистрации, либо разрыва соедиения.


Фаза регистрации


Под фазой регистрации подразумевается процесс, который предполагает готовность приёма событийной машиной браузера специально сформированных сообщений содержащих данные, транслируемые пользователю.
Фаза регистрации состоит из нескольких шагов:


  • Проверка разрешения пользователя на получение информации
  • Регистрация ServiceWorker
  • Получение параметров подписки
  • Формирование ключей шифрования для обслуживания подписки

Проверка разрешений пользователя на получение информации


На данном этапе браузер перед установкой ServiceWorker, запрашивает пользователя и системные настройки: "готов ли пользователь получать сообщения о подписке?"
В случае одного из отказов, установка ServiceWorker прерывается


Регистрация ServiceWorker


Как мы ранее оговаривали, ServiceWorker это автономная страница с обработчиками событий в отдельном пространстве браузера недоступном пользователю.
На работу с этим компонентом накладываются довольно серъёзные ограничения:


  • Загрузка компонента ServiceWorker должна производиться через защищённое соединение(HTTPS), либо в целях отладки с localhost. Возможно включение флагов на "небезопасное" использование внешних ресурсов, но это не рекомендуется
  • соединение WebSocket должно устанавливаться по защищённому соединению(WSS), либо в целях отладки по обычному WS соединению с localhost
  • если в локальной сети имя сервера(ресурса) с которого происходит регистрация ServiceWorker, отличается от полного fqdn ресурса на котором находится ServiceWorker, то будет вызвано исключение о небезопасном вызове
    Жизненный цикл ServiceWorker от Google
    Жизненный цикл ServiceWorker от Mozilla

Процесс подписки


Процесс подписки подразумевает собой запуск механизма формирования ключей шифрования для создания сообщений и последующего расшифровки данных со стороны Push сервера.
Он состоит из следующих этапов:


  • получение публичного ключа шифрования с Web сервера, являющегося промежуточным звеном между Push сервером и приложением осуществляющем отправку сообщений
  • формирование браузером ключей шифрования для защиты сообщений
  • вызов механизма подписки через канал WebSocket и получение точки для отправки сообщений

Получение публичного ключа


Для того, чтобы приложение могло отправить через Push сервер сообщение клиенту, Push сервер должен удостовериться в том, что отправляющий сообщение промежуточный сервер является доверенным для получателя.
Для получения публичного ключа необходимо обратиться к ресурсу обеспечивающему подготовку сообщения для отправки получателю. Первичное обращение к серверу подразумевает собой выдачу публичного ключа для VAPID идентификации


Запуск процесса генерации ключей шифрования


После получения публичного VAPID ключа вызывается процесс подписки ServiceWorker. Запущенный процесс подписки, используя в качестве идентификатора публичный VAPID ключ, формирует сессионный набор ключей шифрования (приватный ключ, публичный ключ, авторизационный ключ). Сессионный набор публичных ключей является экспортируемым и после окончания подписки может быть получен из пользовательской сессии.


Получение точки для отправки сообщений


После формирования ключей шифрования вызывается процесс внутри браузера называемый register. В сторону Push сервера через WebSocket браузер отправляет запрос вида


{  "channelID": "f9cb8f1c-05e0-403f-a09b-dd7864a03eb7",  "messageType": "register",  "key": "BO_C-Ou.......zKu2U4HZ9XeElUIdRfc6EBbRudAjq4="}

Это запрос на регистрацию соединения. В качестве параметров регистрации передаётся уникальный номер канала, формируемый каждый раз заново при старте процесса регистрации, а также публичный VAPID ключ сервера, через который будет производиться предварительная обработка и шифрование сообщений.
В ответ на запрос регистрации браузер ожидает сообщение вида


{  "messageType": "register",  "channelID": "f9cb8f1c-05e0-403f-a09b-dd7864a03eb7",  "status": 200,  "pushEndpoint": "https://webpush.example.net/wpush/f9cb8f1c-05e0-403f-a09b-dd7864a03eb7/",  "scope": "https://webpush.example.net/"}

В данном сообщении содержится адрес конечной точки сформированный Push(WebSocket) сервером, на которую необходимо отправить зашифрованное сообщение для получения его пользователем. Для передачи сообщения получателю, между WEB сервером, принимающим внешние запросы и WS сервером, отправляющим оповещения, должна быть организована логическая связь.


Итого по окончании процесса регистрации и подписки мы имеем следующий набор данных:


Браузер:


  • приватный ключ шифрования сообщений
  • публичный ключ шифрования сообщений
  • ключ авторизации(DH)
  • конечная точка для доставки сообщений получателю
  • номер канала зарегистрированный на WebSocket сервере
  • идентификатор клиента внутри WS соединения
  • публичный ключ WebPush сервера

WebPush сервер:


  • публичный ключ WebPush сервера
  • приватный ключ WebPush сервера

Push(WebSocket) сервер:


  • публичный ключ WebPush сервера
  • адрес конечной точки клиента
  • номер канала клиента привязанный к конечной точке
  • идентификатор клиента внутри WS соединения

Из всего набора данных самым странным выглядит WebPush сервер. Я долго не мог понять каким образом происходит весь процесс доставки сообщения до пользователя, но после реверса всех механизмов, а также дебага autopush получилась следующая схема:


  • некто хочет отправить сообщение в браузер пользователя
  • для защиты сообщения необходимо извлечь из браузера настройки текущей подписки к Push серверу (конечную точку для отправки сообщения, публичный ключ шифрования сообщения, ключ авторизации)
  • полученные настройки передаются на промежуточный WebPush сервер вместе с текстом сообщения
  • промежуточный WebPush сервер формирует авторизационный JWT токен, содержащий время создания сообщения, адрес администратора WebPush сервера, время действия сообщения и подписывает его при помощи своего приватного ключа
  • промежуточный WebPush сервер производит шифрование сообщения при помощи публичного ключа и ключа авторизации из браузера
  • промежуточный WebPush сервер вызывает конечную точку полученную из браузера, передавая в неё связку JWT токен+публичный ключ для их проверки в заголовке Authorization, а также бинарный массив зашифрованного сообщения в теле запроса
  • Push сервер по вызываемой конечной точке производит привязку запроса к каналу получателя
  • Push сервер проверяет валидность JWT токена
  • Push сервер конвертирует бинарный массив принятых данных в base64, формирует сообщение типа "notification" с каналом получателя, ставит сообщение в очередь, после чего механизм контроля очереди отправляет сообщение по WebSocket каналу в сторону клиента

Здесь мы прервём процесс для описания формата сообщения типа "notification".
Дело в том, что формат сообщения типа "notification" имеет два варианта. От того, что получил браузер и передал в ServiceWorker зависит логика работы по получению и отображению сообщения. Первый вариант, это "пустое" сообщение:


{  "messageType": "notification",  "channelID": "f7dfeed8-f868-47ca-a066-fbe629879fbf",  "version": "bf82eea1-69fd-4be0-b943-da96ff0041fb"}

"Пустое" сообщение как бы говорит браузеру "Эй, тебя тут ждут данные, приходи за ними". Браузер по логике работы должен сделать GET запрос на URL конечной точки и получить первую запись из очереди для отображения её пользователю. Схема конечно хорошая, только совсем небезопасная. В большинстве случаев она не применяется.
Вторым вариантом является передача данных совместно с сообщением.


{  "messageType": "notification",  "channelID": "f7dfeed8-f868-47ca-a066-fbe629879fbf",  "version": "bf82eea1-69fd-4be0-b943-da96ff0041fb",  "data": "I_j8p....eMlYK6jxE2-pHv-TRhqQ",  "headers":  {    "encoding": "aes128gcm"  }}

Браузер реагирует на поле headers в структуре сообщения типа "notification". При наличии этого поля автоматически включается механизм обработки зашифрованных данных из поля "data". На основании номера канала, событийная машина браузера выбирает набор ключей шифрования и пытается расшифровать полученные данные. После расшифровки расшифрованные данные передаются в обработчик "push" сообщений ServiceWorker. Как вы успели заметить, сообщение типа "notification" имеет поле "version", которое представляет собой уникальный номер сообщения. Уникальный номер сообщения используется в системе доставки и отображения сообщений для дедупликации данных.
Она работает следующим образом:


  • любое принятое сообщение имеющее поле "version" заносится во внутренний реестр исключений
  • корректно принятые и обработанные сообщения остаются в реестре исключений
  • некорректно принятые и не обработанные сообщения из рееста исключений удаляются
    Информация о причинах такого поведения будет ниже.

Продолжим разбор процесса.


  • Если сообщение принято и расшифровано, от браузера в сторону Push сервера формируется новое сообщение с типом "ack", включающее в себя номер канала и номер обработанного сообщения. Это является сигналом удаления сообщения из очереди сообщений для данного канала
  • Если сообщение по какой-то причине не может быть обработано, от браузера в сторону Push сервера формируется новое сообщение с типом "noack", включающее в себя номер канала и номер отвергнутого сообщения. Это является сигналом постановки сообщения на повторную доставку через 60 секунд

Вернёмся к сообщениям с типом "broadcast". Продукт "autopush" от Mozilla использует их в качестве хранилища на стороне клиента, для определения последнего отправленного клиенту сообщения. Дело в том, что отправка сообщения типа "broadcast" со сменой значения ключа "remote-settings/monitor_changes", приводит к срабатыванию механизма, сохраняющего полученное значение в хранилище браузера. При потере соединения или каком-то программном сбое, сохранённое значение будет автоматически передано на сторону Push сервера в момент инициализации соединения и будет являться начальной точкой для последующей переотправки пропущенных сообщений из очереди.


Описывать сообщения типа "unregister" смысла не имеет, т.к. оно ни на что, кроме удаления сессии не влияет.


К чему же было приведено подробное описание всех процессов происходящих при Push оповещениях?
Смысл в том, что на основании этих данных можно довольно быстро построить свой Push сервер с необходимым функционалом. Продукт "autopush" от Mozilla является продуктом промышленного масштаба, который рассчитан на многомилионные подключения клиентов. В его составе присутствует TornadoDB, PyPy, CPython. К сожалению движок написан на Python 2.7, который массово выводится из эксплуатации.
Нам же нужен небольшой сервер с простым, желательно асинхронным кодом. А именно, без промежуточного WebPush сервера, VAPID, лишних межсерверных проверок и прочего. Сервер должен уметь привязывать клиентские подключения Push сервера к именам пользователей, а также иметь возможность организации эндпоинтов и webhook'ов для отправки сообщений этим пользователям.


Пишем свой сервер


У нас есть следующие данные:


  • Пользователь с браузером Mozilla Firefox;
  • Точка регистрации пользователя на сервере уведомлений для получения этих самых уведомлений;
  • WebSocket сервер, обслуживающий подключения движка уведомлений, встроенного в браузер;
  • Web сервер, формирующий интерфейс для пользователя и обслуживающий точки для отправки уведомлений;

Шаг 1
Первым делом мы должны подготовить WebSocket сервер, обслуживающий описанную ранее логику работы и подключения к нему клиентов.
В качестве фреймворка для реализации логики сервера используется AsyncIO Python.
Изначально стоит сразу разделить понятие "регистрация" для WebSocket движка браузера и понятие "регистрация" на сервере уведомлений. Разница заключается в том, что "регистрация" WebSocket движка браузера происходит автоматически без участия пользователя, в то время как разрешение на "регистрацию" на сервере уведомлений это осознанное действие со стороны пользователя.
Первичной задачей WebSocket сервера является принятие входящего соединения и его контроль на протяжении всего времени подключения браузера к серверу. Поэтому мы должны принять внешнее соединение, сделать его привязку к каналу и сохранить для дальнейшей работы.


После принятия соединения сервером мы подключаем на него обработчик событий, внутри которого будет содержаться вся необходимая для работы информация и функционал отправки сообщений.
Для удобства работы мы используем два META справочника, один для списка соединений, второй для детальной информации о соединении.


WebSocket Handler
# внешнее имя сервераSERVERNAME='webpush.example.net'# вебсокетыWS = set()# каналыCHANNELS = dict()async def register(websocket):    try:        WS.add(websocket)        websocket.handler = PushConnectionHandler(websocket)    except Exception as ex:        logger.error('Register exception: %s' % ex)async def unregister(websocket):    try:        CHANNELS.remove(websocket.handler.channel_id)        WS.remove(websocket)        logger.debug('UnregisterWebsocket[websocket]: %s'%websocket)    except Exception as ex:        logger.error('Unregister exception: %s' % ex)async def pushserver(websocket, path):    await register(websocket)    try:        await websocket.send(json.dumps({}))        async for message in websocket:            data = json.loads(message)            logger.info('Incoming message[data]: %s => %s '%(message, data))            if message == '{}':                await websocket.send(json.dumps({}))            elif 'messageType' in data:                logger.info('Processing WebSocket Data')                # Подключение к вебсокету из браузера                if data['messageType'] == 'hello':                    # Если это первичное подключение, то нужно задать идентификатор подключения и вернуть его браузеру                    if 'uaid' not in data:                        data['uaid'] = '%s' % uuid.uuid4()                    # Принудительно включить webpush                    if 'use_webpush' not in data:                        data['use_webpush'] = True                    helloreturn = {                        "messageType": "hello",                        "status": 200,                        "uaid": data['uaid'],                        "use_webpush": data['use_webpush']                        }                    websocket.handler.uaid = data['uaid']                    if 'broadcasts' in data:                        websocket.handler.register_broadcasts(data['broadcasts'])                    logger.debug('Hello websocket: %s' % vars(websocket.handler))                    CHANNELS.update({ data['uaid'] : websocket.handler })                    await websocket.send(json.dumps(helloreturn))                elif data['messageType'] == 'register':                    # Регистрация serviceWorker                    logger.debug('Register[data]: %s'%data)                    registerreturn = {                        "messageType": "register",                        "channelID": data['channelID'],                        "status": 200,                        "pushEndpoint": "https://%s/wpush/%s/" % (SERVERNAME,data['channelID']),                        "scope": "https://%s/" % SERVERNAME                    }                    websocket.handler.channel_id = data['channelID']                    if 'key' in data:                        websocket.handler.server_public_key = data['key']                    logger.debug('Register[registerreturn]: %s'%registerreturn)                    CHANNELS.update({ data['channelID'] : websocket.handler })                    await websocket.send(json.dumps(registerreturn))                elif data['messageType'] == 'unregister':                    unregisterreturn = {                        "messageType": "unregister",                        "channelID": data['channelID'],                        "status": 200                    }                    if data['channelID'] in CHANNELS:                        del CHANNELS[data['channelID']]                    logger.debug('Unregister[unregisterreturn]: %s'%unregisterreturn)                    logger.debug('Unregister[CHANNELS]: %s'%CHANNELS)                    await websocket.send(json.dumps(unregisterreturn))                elif data['messageType'] == 'ack':                    logger.debug('Ack: %s' % data)                    for update in data['updates']:                        if CHANNELS[update['channelID']].mqueue.count(update['version']) > 0:                            CHANNELS[update['channelID']].mqueue.remove(update['version'])                    logger.debug('Mqueue for channel %s is %s' % (websocket.handler.channel_id, websocket.handler.mqueue))                    await websocket.send('{}')                elif data['messageType'] == 'nack':                    await websocket.send('{}')            else:                logger.error("unsupported event: {}", data)    finally:        await unregister(websocket)

Как вы видите, никакой большой магии в WebSocket сервисе не заложено. Обрабатывается только список основных команд внутри WS сессии согласно спецификации.


Шаг 2
Следующим шагом нужно обеспечить регистрацию клиентской сессии на сервере оповещений.
Для этого необходимо задействовать механизм регистрации и установки ServiceWorker. В сети довольно много примеров, поэтому я взял готовый пример из сети и изменил его, добавив расширение логики.


main.js
'use strict';let isSubscribed = false;let swRegistration = null;var wait = ms => new Promise((r, j)=>setTimeout(r, ms));function urlB64ToUint8Array(base64String) {    const padding = '='.repeat((4 - base64String.length % 4) % 4);    const base64 = (base64String + padding)        .replace(/\-/g, '+')        .replace(/_/g, '/');    const rawData = window.atob(base64);    const outputArray = new Uint8Array(rawData.length);    for (let i = 0; i < rawData.length; ++i) {        outputArray[i] = rawData.charCodeAt(i);    }    return outputArray;}function subscribeUser() {    const applicationServerPublicKey = localStorage.getItem('applicationServerPublicKey');    const applicationServerKey = urlB64ToUint8Array(applicationServerPublicKey);    swRegistration.pushManager.subscribe({            userVisibleOnly: true,            applicationServerKey: applicationServerKey        })        .then(function(subscription) {            console.log('User is subscribed.', JSON.stringify(subscription));            localStorage.setItem('sub_token',JSON.stringify(subscription));            isSubscribed = true;            fetch(subscription.endpoint, {                method: 'POST',                cache: 'no-cache',                body: JSON.stringify(subscription)            })            .then(function(response) {                console.log('Push keys Update Response: ' + JSON.stringify(response));            })        })        .catch(function(err) {            console.log('Failed to subscribe the user: ', err);        });}function unsubscribeUser() {    swRegistration.pushManager.getSubscription()        .then(function(subscription) {            if (subscription) {                return subscription.unsubscribe();            }        })        .catch(function(error) {            console.log('Error unsubscribing', error);        })        .then(function() {            console.log('User is unsubscribed.');            isSubscribed = false;        });}function initializeUI() {    // Set the initial subscription value    swRegistration.pushManager.getSubscription()        .then(function(subscription) {            isSubscribed = !(subscription === null);            if (isSubscribed) {                console.log('User IS subscribed. Unsubscribing.');                subscription.unsubscribe();            } else {                console.log('User is NOT subscribed. Subscribing.');                subscribeUser();            }        });    (async () => {    await wait(2000);    console.warn('Wait for operation is ok');         swRegistration.pushManager.getSubscription()                .then(function(subscription) {                        isSubscribed = !(subscription === null);                        if (!isSubscribed) {                                console.log('ReSubscribe user');                                subscribeUser();                        }                })    })()}console.log(navigator);console.log(window);if ('serviceWorker' in navigator && 'PushManager' in window) {    console.log('Service Worker and Push is supported');    navigator.serviceWorker.register("/sw.js")        .then(function(swReg) {            console.log('Service Worker is registered', swReg);            swRegistration = swReg;            initializeUI();        })        .catch(function(error) {            console.error('Service Worker Error', error);        });} else {    console.warn('Push messaging application ServerPublicKey is not supported');}$(document).ready(function(){    $.ajax({        type:"GET",        url:'/subscription/',        success:function(response){            console.log("response",response);            localStorage.setItem('applicationServerPublicKey',response.public_key);        }    })});

Основная точка на которую стоит обратить внимание это автоматическая переподписка пользователя при посещении страницы. Работа сервера рассчитана на отправку уведомления по имени пользователя или любому другому идентификатору пользователя, поэтому механизм переподписки всегда будет формировать новую подписку для идентификатора пользователя. Это избавляет нас от проблем с "потерянными" подписками на сервере.


sw.js
'use strict';/* eslint-disable max-len *//* eslint-enable max-len */function urlB64ToUint8Array(base64String) {  const padding = '='.repeat((4 - base64String.length % 4) % 4);  const base64 = (base64String + padding)    .replace(/\-/g, '+')    .replace(/_/g, '/');  const rawData = window.atob(base64);  const outputArray = new Uint8Array(rawData.length);  for (let i = 0; i < rawData.length; ++i) {    outputArray[i] = rawData.charCodeAt(i);  }  return outputArray;}function getEndpoint() {  return self.registration.pushManager.getSubscription()  .then(function(subscription) {    if (subscription) {      return subscription.endpoint;    }    throw new Error('User not subscribed');  });}self.popNotification = function(title, body, tag, icon, url) {  console.debug('Popup data:', tag, body, title, icon, url);  self.registration.showNotification(title, {      body: body,      tag: tag,      icon: icon    });  self.onnotificationclick = function(event){      console.debug('On notification click: ', event.notification.tag);      event.notification.close();      event.waitUntil(        clients.openWindow(url)      );  };}var wait = ms => new Promise((r, j)=>setTimeout(r, ms));self.addEventListener('push', function(event) {   console.log('[Push]', event);  if (event.data) {    var data = event.data.json();    var evtag = data.tag || 'notag';    self.popNotification(data.title || 'Default title', data.body || 'Body is not present', evtag, data.icon || '/static/images/default.svg', data.url || '/getevent?tag='+evtag);  }  else {    event.waitUntil(      getEndpoint().then(function(endpoint) {        return fetch(endpoint);      }).then(function(response) {          return response.json();      }).then(function(payload) {          console.debug('Payload',JSON.stringify(payload), payload.length);          var evtag = payload.tag || 'notag';          self.popNotification(payload.title || 'Default title', payload.body || 'Body is not present', payload.tag || 'notag', payload.icon || '/static/images/default.svg', payload.url || '/getevent?tag='+evtag);      })    );  }});self.addEventListener('pushsubscriptionchange', function(event) {  console.log('[Service Worker]: \'pushsubscriptionchange\' event fired.');  const applicationServerPublicKey = localStorage.getItem('applicationServerPublicKey');  const applicationServerKey = urlB64ToUint8Array(applicationServerPublicKey);  event.waitUntil(    self.registration.pushManager.subscribe({      userVisibleOnly: true,      applicationServerKey: applicationServerKey    })    .then(function(newSubscription) {      // TODO: Send to application server      console.log('[Service Worker] New subscription: ', newSubscription);    })  );});

Согласно представленного кода, Javascript файл main.js инициирует при своём запуске получение публичного VAPID ключа и принудительно вызывает подписку браузера на оповещения.
Для простоты отладки WebSocket сервер во время регистрации подписки отдаёт URL вида: https://webpush.example.net/wpush/ChannelGuid.
Откуда же берётся имя пользователя в сервере уведомлений. Вся суть в том, что инициирование подписки /subscription/ происходит полуавтоматически. Соответственно в зависимости от того, что вы хотите увидеть в качестве идентификатора пользователя, вы можете передать после оформления подписки в момент передачи ключей.
Это происходит путём вызова метода POST по адресу WebPush endpoint присланного сервером из модуля ServiceWorker.


function subscribeUser() {    const applicationServerPublicKey = localStorage.getItem('applicationServerPublicKey');    const applicationServerKey = urlB64ToUint8Array(applicationServerPublicKey);    swRegistration.pushManager.subscribe({            userVisibleOnly: true,            applicationServerKey: applicationServerKey        })        .then(function(subscription) {            console.log('User is subscribed.', JSON.stringify(subscription));            localStorage.setItem('sub_token',JSON.stringify(subscription));            isSubscribed = true;            fetch(subscription.endpoint, {                method: 'POST',                cache: 'no-cache',                body: JSON.stringify(subscription)            })            .then(function(response) {                console.log('Push keys Update Response: ' + JSON.stringify(response));            })        })        .catch(function(err) {            console.log('Failed to subscribe the user: ', err);        });}

Как было написано ранее, в сервере используется обработчик точек подключения. Это отдельная часть кода в скрипте сервера, но обрабатывающая вместо WebSocket, клиентский WEB трафик от браузера.
В качестве обрабатываемого заголовка, содержащего идентификатор пользователя, в базовом варианте сервиса использовался basiclogin полученный при авторизации пользователя в LDAP.


        location ~ /subscription|/pushdata|/getdata|/wpush|/notify {            proxy_pass http://localhost:8090;            proxy_set_header LDAP-AuthUser $remote_user;            proxy_set_header 'X-Remote-Addr' $remote_addr;            add_header "Access-Control-Allow-Origin" "*";            add_header Last-Modified $date_gmt;        proxy_hide_header "Authorization";            add_header Cache-Control 'no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0';            if_modified_since off;            expires off;            etag off;        }

Для упрощения демонстрации я добавил проверку в заголовках внешнего адреса пользователя. Также может использоваться токен авторизации, либо любой другой метод привязки пользователя.


USERIDHEADERNAME='X-Remote-Addr'async def update_channel_keys(request, data):    channel = request.path.replace('wpush','').replace('/','')    logger.debug('update channel keys data: %s'%data)    logger.debug('Update Channel keys Headers: %s' % request.headers)    if USERIDHEADERNAME not in set(request.headers):        return False    basiclogin = request.headers[USERIDHEADERNAME]    logger.debug('Login %s' % basiclogin)    if basiclogin not in LOGINS_IN_CHANNELS:        LOGINS_IN_CHANNELS.update({ '%s'%basiclogin : {} })    LOGINS_IN_CHANNELS['%s'%basiclogin].update({'%s' % channel : {} })    logger.debug('LOGINS_IN_CHANNELS: %s' % LOGINS_IN_CHANNELS)    try:        jdata = json.loads(data)        if 'endpoint' in jdata and 'keys' in jdata:            logger.debug('Saving Keys for Channel: %s => %s' % (channel, jdata))            CHANNELS[channel].register_keys(jdata['keys'])            logger.debug('Registered channel keys %s:' % vars(CHANNELS[channel]))        return True    except Exception as ex:        logger.error('Exception %s'%ex)        return False

Данная функция сохраняет на сервере для текущего канала оповещений пользователя блок ключей и имя пользователя, необходимые для передачи зашифрованного push сообщения в сторону браузера. У пользователя может быть несколько сессий и для каждой из них существует свой набор ключей.


Шаг 3
Сессия зарегистрирована, ключи на сервер переданы, пора отправлять и получать сообщения.
Как я описывал в самом начале статьи, у сервиса оповещений существует два способа доставки сообщений:


  • пустое push уведомление, когда браузер "приходит" в очередь сообщений сам
  • push уведомление содержащее зашифрованные данные.

При корректном формировании сообщения необходимо передать поле tag содержащее уникальный идентификатор сообщения. Если сервер имеет сессионные ключи для клиента в сторону которого передаётся уведомление, то он может зашифровать это уведомление. Если нет, то сервер отправит пустое уведомление и клиент сам придёт за этим уведомлением по полю tag. Логику получения браузером сообщения реализует следущий блок кода в ServiceWorker:


self.addEventListener('push', function(event) {   console.log('[Push]', event);  if (event.data) {    var data = event.data.json();    var evtag = data.tag || 'notag';    self.popNotification(data.title || 'Default title', data.body || 'Body is not present', evtag, data.icon || '/static/images/default.svg', data.url || '/getevent?tag='+evtag);  }  else {    event.waitUntil(      getEndpoint().then(function(endpoint) {        return fetch(endpoint);      }).then(function(response) {          return response.json();      }).then(function(payload) {          console.debug('Payload',JSON.stringify(payload), payload.length);          var evtag = payload.tag || 'notag';          self.popNotification(payload.title || 'Default title', payload.body || 'Body is not present', payload.tag || 'notag', payload.icon || '/static/images/default.svg', payload.url || '/getevent?tag='+evtag);      })    );  }});

В случае отсутствия ключей шифрования для уведомления, необходимо организовать очередь сообщений, через которую ServiceWorker сможет получить все ожидающие его сообщения. Так как мы строим простую реализацию сервера, всё что нам нужно это сохранить сообщения до определённого момента, а потом их удалить. Т.к. сообщения мониторинга ждущие получения больше 10 минут, в большинстве случаев уже не актуальны. При этом можно не хранить в очереди "пустые" сообщения для индикации наличия уведомлений на сервере.


Блок шифрования сообщений передаваемых был взят из кода сервера "autopush", дабы не нарушать совместимости.


Блок шифрования сообщения
    def encrypt_message(self, data, content_encoding="aes128gcm"):        """Encrypt the data.        :param data: A serialized block of byte data (String, JSON, bit array,            etc.) Make sure that whatever you send, your client knows how            to understand it.        :type data: str        :param content_encoding: The content_encoding type to use to encrypt            the data. Defaults to RFC8188 "aes128gcm". The previous draft-01 is            "aesgcm", however this format is now deprecated.        :type content_encoding: enum("aesgcm", "aes128gcm")        """        # Salt is a random 16 byte array.        if not data:            logger.error("PushEncryptMessage: No data found...")            return        if not self.auth_key or not self.receiver_key:            raise WebPushException("No keys specified in subscription info")        logger.debug("PushEncryptMessage: Encoding data...")        salt = None        if content_encoding not in self.valid_encodings:            raise WebPushException("Invalid content encoding specified. "                                   "Select from " +                                   json.dumps(self.valid_encodings))        if content_encoding == "aesgcm":            logger.debug("PushEncryptMessage: Generating salt for aesgcm...")            salt = os.urandom(16)        # The server key is an ephemeral ECDH key used only for this        # transaction        server_key = ec.generate_private_key(ec.SECP256R1, default_backend())        crypto_key = server_key.public_key().public_bytes(            encoding=serialization.Encoding.X962,            format=serialization.PublicFormat.UncompressedPoint        )        if isinstance(data, str):            data = bytes(data.encode('utf8'))        if content_encoding == "aes128gcm":            logger.debug("Encrypting to aes128gcm...")            encrypted = http_ece.encrypt(                data,                salt=salt,                private_key=server_key,                dh=self.receiver_key,                auth_secret=self.auth_key,                version=content_encoding)            reply = CaseInsensitiveDict({                'data': base64.urlsafe_b64encode(encrypted).decode()            })        else:            logger.debug("Encrypting to aesgcm...")            crypto_key = base64.urlsafe_b64encode(crypto_key).strip(b'=')            encrypted = http_ece.encrypt(                data,                salt=salt,                private_key=server_key,                keyid=crypto_key.decode(),                dh=self.receiver_key,                auth_secret=self.auth_key,                version=content_encoding)            reply = CaseInsensitiveDict({                'crypto_key': crypto_key,                'data': base64.urlsafe_b64encode(encrypted).decode()            })            if salt:                reply['salt'] = base64.urlsafe_b64encode(salt).strip(b'=')        reply['headers'] = { 'encoding': content_encoding }        return reply

Самым интересным в данном блоке кода является то, что на каждое сообщение генерируется новая уникальная пара ключей, которая используются при шифровании сообщения. В остальном обычная реализация механизма шифрования данных.


Полноценную логику вновь реализованного сервера описывать в статье можно долго, поэтому по ссылке вы сможете найти готовый webpush сервер, который выполняет всю необходимую работу. Я не стал включать в логику работы webpush сервера блок обработки broadcast запросов и получения данных из очереди, т.к. посчитал это излишним(криптография работает стабильно, поэтому незачем перегружать систему неиспользуемым функционалом). В случае необходимости данный функционал реализуется очень быстро.


WebPush AsyncIO server


Для развёртывания сервера необходимо:


  • Установить необходимые Python модули, а также настроить nginx по примеру приложенного конфигурационного файла.
  • Поместить содержимое директории web в корень ранее настроенного виртуального сервера
  • Перезапустить/перечитать конфиг nginx
  • В браузере через about:config поменять параметр dom.push.serverURL на адрес wss://ваш.сервер/ws
  • Перед сменой адреса push сервера можно очистить поле dom.push.userAgentID, которое автоматически заполнится если ваш Push сервер работает корректно и принимает соединения.
  • Для тестирования оповещений необходимо зайти на страницу https://ваш.сервер/indexpush.html и открыв окно отладки удостовериться в корректной регистрации ServiceWorker
  • Нажать кнопку "Check Push Notify"
  • Если всё правильно настроено, появится всплывающее сообщение

Как говорилось в начале статьи, система проектировалась для оперативного оповещения службы техподдержки. В зависимости от требуемой логики поведения, в Zabbix можно использовать следующий обработчик webhook


var req = new CurlHttpRequest();req.AddHeader('Content-Type: application/x-www-form-urlencoded');var jv = JSON.parse(value);if (jv.recovery_nstatus == '{EVENT.RECOVERY.VALUE}') {jv.icon = '/static/images/problem/' + jv.event_severity + '.svg';}else{jv.icon = '/static/images/recovery/' + jv.event_severity + '.svg';}value = JSON.stringify(jv);Zabbix.Log(2, 'webhook request value='+value);req.Post('https://webpush.server.net/pushdata/',  value);Zabbix.Log(2, 'response code: '+req.Status());return JSON.stringify({  'tags': {    'endpoint': 'webpush'  }});

c параметрами


Ключ Значение
url /zabbix/tr_events.php?triggerid={TRIGGER.ID}&eventid={EVENT.ID}
recipient {ALERT.SENDTO}
title {ALERT.SUBJECT}
body {ALERT.MESSAGE}
event_severity {EVENT.NSEVERITY}
recovery_nstatus {EVENT.RECOVERY.VALUE}

Если добавить красивых картинок из FontAwesome, то получится вот так


WebPush сервер поддерживает следующие вызовы :


  • POST https://webpush.example.net/wpush/channelId сохранение ключей шифрования и имени пользователя
  • GET https://webpush.example.net/wpush/channelId получение тестового сообщения
  • GET https://webpush.example.net/subscription получение публичного VAPID ключа
  • POST https://webpush.example.net/pushdata отправка JSON структуры передаваемой в качестве сообщения в браузер
    {        "url": "http://personeltest.ru/away/habr.com/", // URL на который необходимо перейти при клике        "recipient": login, // Логин или идентификатор пользователя        "title": "Заголовок сообщения",        "body": "Тело сообщения",         "icon": "/static/images/new-notification.png", // путь к иконке сообщения        "version": uuid, // идентификатор сообщения        "tag": uuid, // тег сообщения для получения        "mtime": parseInt(new Date().getTime()/1000) //Время }
    
  • GET https://webpush.example.net/getdata Получение очереди сообщений
  • POST https://webpush.example.net/notify/login Отправка пустого оповещения пользователю
  • POST https://webpush.example.net/notifychannel/channelId Отправка пустого оповещения в канал

Вот в принципе и всё. Надеюсь у вас снялась часть вопросов с тем, как работает WebPush. Спасибо за потраченное время на чтение материала.


Aborche 2020
Aborche

Подробнее..

Что не так с вашей консольной программой?

14.04.2021 12:16:12 | Автор: admin

Мы еще в школе научились вызывать функцию print. Что может пойти не так в консольной разработке? Да, и если бы не растущая сложность программ, проблем бы у нас не было до сих пор. А в реальности то в тексте трудно найти нужную информацию, то он не влезает в экран по ширине и по длине, а от многочисленности цветов рябит в глазах.

Но как часто мы обсуждаем наши повседневные инструменты с точки зрения читабельности, хотя пишем под web и каждый день используем консольные утилиты? Сегодня Андрей Светлов расскажет, что со всем этим делать, и чем он пользуется для консолей. Помимо того, что Андрей CPython Core developer и понемногу развивает Python, в свободное от работы время он эксперт по asyncio, со-автор aiohttp, yarl, multidict и прочим популярным библиотекам.

Помимо всяких мелочей, которые решаются легко, я расскажу и о том, как же нам упростить просмотр. Я не имею в виду режим текстового редактора, у нас интерактивность в основном односторонняя: запустили программу, выбрали команду, поработали. То есть нам должны хорошо, приятно, доходчиво показывать текст, а сами мы обычно мало что вводим во время работы программы.

Информативность

Это самая первая и очевидная проблема. Например, у всем известного Dockera вывод это простыня ровного, скучного и не выделяемого текста. В нем просто трудно ориентироваться:

Для контраста посмотрите на новомодную Githubовскую штучку утилиту для работы с самим Github. Здесь есть стили, цвета, подсветка. Один беглый взгляд на экран, и вы сразу можете выделить, что важно, а что не очень:

Размер шрифта и экрана

Снова тот же Docker. Если экран не очень широкий, если шрифт большой или вертикальный экран, то Docker перестает помещаться и выводится на две строки:

И это еще не худший случай, тут можно о чем-то догадаться. А если у вас широченная таблица, которая начинает схлопываться в 3, 4 и более строчек, то разобраться в ней в таком виде решительно невозможно.

А ведь на консоли уже можно делать то, что давным-давно изобрели в мобильных приложениях responsive design когда размер текста перестраивается в зависимости от размера экрана. Минус только один никто за нас не написал удобные библиотеки-помогайки, всё приходится делать самим.

Scrolling & Pager

Вот хороший пример: на скриншоте Manual page от git супер-популярная, хорошая, и между прочим, очень продуманная программа. Manual page позволяет скроллить вывод вверх-вниз, искать текст с помощью pager, и что-то еще выделять стилями. И если у вас на экране списки файлов, объектов, какие-то большие и длинные таблицы, то это хорошая помощь:

Светлый/темный цвет фона

Было бы здорово, если бы у всех была темная консоль (автомобиль может быть любого цвета, если этот цвет черный). Но жизнь не такая: светлая и тёмная тема это как остроконечники и тупоконечники. Победителя нет, есть бесконечный спор. Но программа должна работать с обоими режимами одинаково хорошо.

Что из этого следует? Возвращаемся к примеру от Github. Вот вывод команды (неважно какой, сейчас это статус моих pull requests) на темном фоне:

И он же со светлой схемой:

Выглядит одинаково хорошо, и к такому выводу нужно стремиться. Но чтобы этого достичь, придется себя ограничивать.

ЦВЕТА И СТИЛИ

Все, что мы можем использовать, чтобы не рябило в глазах пяток цветов и еще несколько стилей:

На этом выбор цветов для нас закончен! Конечно, такая очень узкая палитра не позволяет использовать полноцветные терминалы и точно перенести всё, что ваш UX дизайнер нарисовал в Фотошопе. Но зато у нее есть одно чудесное свойство: если приглядеться, то видно, что, например, желтый это не классический желтый, а немного золотистый, чтобы выглядело хорошо. И все цвета подобраны программой терминала так же. Если меняется тема, то программа снова подскажет, как правильно, хорошо, контрастно и красиво нарисовать этот цвет для терминала.

И пока мы остаемся в этом цветовом диапазоне, с выводом у нас всё будет нормально. Но как только начинаем изобретать что-то свое, начинаются проблемы то, что выглядело хорошо в вашем окружении, у соседа начинает смотреться отвратительно.

Обычно мы, конечно, выбираем понятные всем цвета:

  • Зеленый все хорошо;

  • Красный плохо;

  • Желтый warning.

Но если хотим большего, у нас есть подсказка для команды LS есть соглашение, каким цветом мы будем выводить файлы и папки. Это обеспечивается двумя переменными среды. Первая исторически появилась в виде LSCOLORS и рассказывает, как рисовать файлы и папки: по две буквы на одну позицию. Позиция это папка (нормальный файл, сокет, что-то еще). В документации или в интернете это всё есть. Первая буква отвечает за цвет шрифта (букв), вторая за цвет фона. Я попытался раскрасить так, как у меня закодировано на моей рабочей станции

Второй вариант немножко похитрее: помимо обозначений для папок и символических ссылок, можно еще рассказывать, какие окончания файлов рендерить:

И если у вас вывод, похожий на файлы, и там можно применить эти цвета, стоит написать простенький парсер и самому отформатировать вывод. Достоинство всё то же: эти цвета для LS настроены темой вашего терминала. В терминале они выглядят хорошо, естественно, сбалансировано и ваша программа при их использовании будет выглядеть так же.

Смайлики

Смайлики нельзя недооценивать, это очень хорошая вещь простенький значок, но позволяет быстро сориентироваться на экране:

Но нужно помнить, что смайликов очень и очень много, таблица Unicode также очень большая в результате не все символы отображаются одинаково хорошо. Например, смайлик улыбающийся человечек на Windows-консоли, как правило, не отображается: ?

Поэтому выбирайте простые символы, смотрите, как они рендерятся в разных режимах и проверяйте это на всех платформах (Windows, MAC, Ubuntu). И математические символы в том числе. Везде есть хитрые смайлики, с которыми могут быть проблемы.

Shell

Еще нужно помнить, что терминал существует не сам по себе. Консольные программы, в нем запускаются под разными shell: sh, bash, zsh, fish, cmd.exe, powershell или еще какие-то. Программа должна работать с выбранным shell без проблем, в том числе на Windows. Но на практике мы видим разницу в том, как shell авто-дополняет ввод и как (и в каком терминале) выводятся символы. Поэтому проверяйте и на своем shell, и на тех, которые будут у пользователей.

TTY навсегда?

Помимо shell и интерактивного режима, на который в консолях тратится большая часть усилий по пользовательскому дизайну, программы у нас могут запускаться и без терминала. И когда мы, например, перенаправляем вывод через py в grep, чтобы что-нибудь там поискать, или записываем в файл, или запускаем из-под cron, HTTP-сервера или еще чего-нибудь функция os.isatty() будет возвращать false:

В таких случаях нельзя выводить ни цвет, ни стили, ни размер экрана. Потому что размера экрана нет, а при попытке его спросить вы получите исключение. Но вывод должен работать и без терминала. Поэтому лучше дописать еще один тест и убедиться, что всё хотя бы базово работает даже без терминала.

Windows, любовь моя

К сожалению, мир консольных программ делится не только на темный и светлый фон, а еще на Windows и всех остальных. Если на posix системах (тех же MAC и Linux) всё весьма похоже, то на Windows есть много отличий, например:

  • less more. Стандартная прокрутка less отсутствует, вместо нее есть куда более гадкое и неудобное more.

  • \n \r\n. Возврат каретки другой.

  • dim / gray. Серого цвета нет (но на MAC, кстати, тоже бардак по поводу цвета, поэтому и надо всё проверять).

  • ANSI escape символы, которые как раз делают расцветку и прочие полезные вещи, по умолчанию выключены, но это легко поправить.

  • -\|/. Многие символы, как я говорил, не работают. Например, здесь наш специалист по UX создал дизайнерский спиннер у нас он должен крутиться треугольниками, а не палочками, как у всех остальных. Почему бы и нет? Но на Windows он крутится одинаковыми квадратиками, то есть не работает.

Инструменты

Расскажу теперь, какими чудо-инструментами можно (и нужно) пользоваться при создании консольных программ и их интерфейсов. Некоторые инструменты действительно чудо там и молоток есть, и напильник, иногда и кувалда встречается. Единственный нюанс. Так как консольная утилита вещь маргинальная и нишевая, ее разработчики создают сами для себя, то инструментарий может быть не таким классным, каким бы он мог быть, и не таким доведенным до ума, как для web-программ, например.

CLICK

Я очень рекомендую Click от славного парня Армена Ронахена. Это инструмент со своими особенностями, но он гораздо лучше и мощнее, чем встроенный в Python argparse. Если вы сомневаетесь, используйте Click.

В нем есть набор утилит (функций), чтобы выводить тексты со стилями можно печатать или накладывать стиль, чтобы получилась строка с анти-последовательностями. Можно снимать стили, использовать pager:

Кроме того, у Click есть маленькая, но очень приятная и удобная фича он автоматически убирает стили для не-терминала (non-TTY). Click сам понимает, когда вывод идет не на полноценный терминал, а, например, куда-нибудь в файл он автоматически снимает все стили и делает click.unstyle. Конечно, вы можете сделать unstyling сами, вместо использования click. Но в любом случае избегайте перенаправления в файл покореженного текста с кучей непонятных значков.

PYTHON PROMPT TOOLKIT

Второй инструмент чудесная штука, которая используется, например, в IPython, BPython, в других shell это инструмент для создания полноценных приложений. Но нас сейчас интересуют вопросы ввода-вывода и здесь Prompt Toolkit решает все вопросы.

Сначала мне показалось, что Prompt Toolkit избыточен потому что для работы хватает и Click. Например, если нужен progressbar, есть всем известный tqdm. Великолепная библиотека, которая решает ровно одну задачу, но делает это хорошо. А еще есть и click.progressbar().

Prompt Toolkit же позволяет легко и просто создавать различные варианты ввода-вывода, используя стили, шапки и прочие штуки. Например, есть обновляемый виджет для progressbar. Вроде бы ничего особого, но у Prompt Toolkit это не один виджет для progressbar.

Из Prompt Toolkit можно собирать очень сложные вещи, используя layout, виджеты, компоновку. А если чего-то нет из коробки, это можно написать.

Благодаря слоям, в Python Prompt Toolkit можно легко отрисовать несколько progressbar-ов по одному на слой загружаемого образа таких же, как например, делает Docker pool:

ВСЁ ПРОПАЛО, ШЕФ! ИЛИ "ГДЕ МОЙ КУРСОР?"

Мелочь, которая в свое время попортила мне немало крови.

Распространенная тема: есть консольная программа, которая рисует чудесные виджеты, рассказывает, как Docker Image тянется на много потоков, даже не моргает и отрисовывает всё гладко. Но если ее внезапно закрыть, может, например, пропасть курсор потому что в последнем режиме курсор спрятали, а обратно не вернули. Бывает, что вы в терминале печатаете, а курсор не мигает на экране. Есть и более сложные способы испортить консоль, загнав ее в какой-нибудь режим, который не предназначен для интерактивного вывода.

Чтобы этого не было, основная программа при выходе должна напечатать магическую скрипт-последовательность на экран:

Это так называемый Soft Reset, который сбрасывает режимы. Например, тот же less умеет переключаться для полноэкранного скроллинга в альтернативный режим. Но если ваша программа пытается реализовать функциональность как old less, то без магического скрипта при выходе надо будет переключаться обратно вручную.

ASYNCIO + CLICK

Я не могу не рассказать про asyncio!

Но Click из коробки не работает с asyncio от слова совсем! Он это не умеет, он написан немного раньше, и они совсем не дружат. Поэтому самым простым решением будет написать AsyncioRunner(), который будет не функцией, а классом, а run можно вызывать несколько раз. Это бывает удобно, например, когда запускаем асинхронный код для проверки типов входных параметров и вдобавок что-то еще run запускаются друг за другом в одном и том же контексте:

И что важно AsyncioRunner() работает при этом как асинхронный контекстный менеджер, то есть по завершению работы чистит за собой.

Мы у себя используем простое правило: неблокирующий код (тот, который выполняется мгновенно) может быть синхронным, пока Click не читает файлы, не лезет в интернет или еще что-нибудь такое не делает. Но как только нам нужно запускать асинхронный код, мы пользуемся AsyncioRunner(). Легко создается какой-нибудь декоратор, который внутри async-команд сделает все, что нам надо:

ASYNCIO + PROMPT_TOOLKIT

А вот Asyncio + Prompt_Toolkit работают вместе великолепно даже из коробки. Prompt_Toolkit знает об asyncio, а Prompt_async это стандартная штука Prompt_Toolkit, которая и запускает основную программу. Детали читайте в документации:

WINDOWS не отпускает

Windows из коробки не умеет пользоваться escape-последовательностью у нее свой набор функций, чтобы поменять цвет, сделать жирным, стереть экран и т.д. Это дико неудобно.

Давно известный проект Colorama работает почти со всеми escape-последовательностями, подменяя собой stdout и stderr. Он парсит то, что печатается, находит там escape-последовательности и убирает их. Вместо этого вызываются разные Windows-функции для того, чтобы поменять тот же самый цвет букв или цвет фона. Но Colorama работает только с подмножеством ANSI-символов.

Полного набора escape-последовательностей, между нами говоря, не существует. Есть много разных терминалов. Начиная от древних и заканчивая актуальными (те же MAC- и Linux-терминалы), у которых хоть и есть некоторые разные escape-последовательности, но в целом они хорошо пересекаются.

Но, к счастью, сейчас наступила эпоха Windows 10. К счастью, потому что в ней можно перевести экран в режим, который обрабатывает escape-последовательности (по умолчанию он не включен). Этот режим позволяют включить две простые функции, вызвать их из Python при помощи ctypes это упражнение на пару минут:

АВТОЗАПОЛНЕНИЕ

В Click оно есть из коробки, но не для Windows. Так уж получилось. Может быть, в следующих версиях будет по-другому. Для Unix-мира оно есть, и это уже хорошо.

Здесь декоратор принимает click.argument от autocompletion то есть функция вызывается тогда, когда в процессе вывода мы нажимаем табуляцию, как обычный Bash, а еще лучше Zsh как это делают shell для большого количества команд.

ВАЛИДАЦИЯ

В Click, разумеется, есть стандартные типы чисел, дат и файлов. Но если хочется сделать нестандартный тип, например, URL, то мы можем написать класс для параметра (будем его указывать в аргументе или опции), и Click автоматически вызовет метод convert, давая возможность нашему коду проверить, преобразовать тип и т.д.:

Я показал основы, которые мы используем для работы, в том числе и у себя. Совершенству нет предела вы можете делать ваши консольные программы удобными и читабельными. Не бойтесь экспериментировать, но всегда проверяйте, как выглядит результат на разных операционных системах и в различных цветовых схемах.

Подробнее..

Перевод Введение в асинхронное программирование на Python

03.07.2020 14:20:11 | Автор: admin
Всем привет. Подготовили перевод интересной статьи в преддверии старта базового курса Разработчик Python.



Введение


Асинхронное программирование это вид параллельного программирования, в котором какая-либо единица работы может выполняться отдельно от основного потока выполнения приложения. Когда работа завершается, основной поток получает уведомление о завершении рабочего потока или произошедшей ошибке. У такого подхода есть множество преимуществ, таких как повышение производительности приложений и повышение скорости отклика.



В последние несколько лет асинхронное программирование привлекло к себе пристальное внимание, и на то есть причины. Несмотря на то, что этот вид программирования может быть сложнее традиционного последовательного выполнения, он гораздо более эффективен.

Например, вместо того, что ждать завершения HTTP-запроса перед продолжением выполнения, вы можете отправить запрос и выполнить другую работу, которая ждет своей очереди, с помощью асинхронных корутин в Python.

Асинхронность это одна из основных причин популярности выбора Node.js для реализации бэкенда. Большое количество кода, который мы пишем, особенно в приложениях с тяжелым вводом-выводом, таком как на веб-сайтах, зависит от внешних ресурсов. В нем может оказаться все, что угодно, от удаленного вызова базы данных до POST-запросов в REST-сервис. Как только вы отправите запрос в один из этих ресурсов, ваш код будет просто ожидать ответа. С асинхронным программированием вы позволяете своему коду обрабатывать другие задачи, пока ждете ответа от ресурсов.

Как Python умудряется делать несколько вещей одновременно?




1. Множественные процессы

Самый очевидный способ это использование нескольких процессов. Из терминала вы можете запустить свой скрипт два, три, четыре, десять раз, и все скрипты будут выполняться независимо и одновременно. Операционная система сама позаботится о распределении ресурсов процессора между всеми экземплярами. В качестве альтернативы вы можете воспользоваться библиотекой multiprocessing, которая умеет порождать несколько процессов, как показано в примере ниже.

from multiprocessing import Processdef print_func(continent='Asia'):    print('The name of continent is : ', continent)if __name__ == "__main__":  # confirms that the code is under main function    names = ['America', 'Europe', 'Africa']    procs = []    proc = Process(target=print_func)  # instantiating without any argument    procs.append(proc)    proc.start()    # instantiating process with arguments    for name in names:        # print(name)        proc = Process(target=print_func, args=(name,))        procs.append(proc)        proc.start()    # complete the processes    for proc in procs:        proc.join()


Вывод:

The name of continent is :  AsiaThe name of continent is :  AmericaThe name of continent is :  EuropeThe name of continent is :  Africa


2. Множественные потоки

Еще один способ запустить несколько работ параллельно это использовать потоки. Поток это очередь выполнения, которая очень похожа на процесс, однако в одном процессе вы можете иметь несколько потоков, и у всех них будет общий доступ к ресурсам. Однако из-за этого написать код потока будет сложно. Аналогично, все тяжелую работу по выделению памяти процессора сделает операционная система, но глобальная блокировка интерпретатора (GIL) позволит только одному потоку Python запускаться в одну единицу времени, даже если у вас есть многопоточный код. Так GIL на CPython предотвращает многоядерную конкурентность. То есть вы насильно можете запуститься только на одном ядре, даже если у вас их два, четыре или больше.

import threading def print_cube(num):    """    function to print cube of given num    """    print("Cube: {}".format(num * num * num)) def print_square(num):    """    function to print square of given num    """    print("Square: {}".format(num * num)) if __name__ == "__main__":    # creating thread    t1 = threading.Thread(target=print_square, args=(10,))    t2 = threading.Thread(target=print_cube, args=(10,))     # starting thread 1    t1.start()    # starting thread 2    t2.start()     # wait until thread 1 is completely executed    t1.join()    # wait until thread 2 is completely executed    t2.join()     # both threads completely executed    print("Done!")


Вывод:

Square: 100Cube: 1000Done!


3. Корутины и yield:

Корутины это обобщение подпрограмм. Они используются для кооперативной многозадачности, когда процесс добровольно отдает контроль (yield) с какой-то периодичностью или в периоды ожидания, чтобы позволить нескольким приложениям работать одновременно. Корутины похожи на генераторы, но с дополнительными методами и небольшими изменениями в том, как мы используем оператор yield. Генераторы производят данные для итерации, в то время как корутины могут еще и потреблять данные.

def print_name(prefix):    print("Searching prefix:{}".format(prefix))    try :         while True:                # yeild used to create coroutine                name = (yield)                if prefix in name:                    print(name)    except GeneratorExit:            print("Closing coroutine!!") corou = print_name("Dear")corou.__next__()corou.send("James")corou.send("Dear James")corou.close()


Вывод:

Searching prefix:DearDear JamesClosing coroutine!!


4. Асинхронное программирование

Четвертый способ это асинхронное программирование, в котором не участвует операционная система. Со стороны операционной системы у вас останется один процесс, в котором будет всего один поток, но вы все еще сможете выполнять одновременно несколько задач. Так в чем тут фокус?

Ответ: asyncio

Asyncio модуль асинхронного программирования, который был представлен в Python 3.4. Он предназначен для использования корутин и future для упрощения написания асинхронного кода и делает его почти таким же читаемым, как синхронный код, из-за отсутствия callback-ов.

Asyncio использует разные конструкции: event loop, корутины и future.

  • event loop управляет и распределяет выполнение различных задач. Он регистрирует их и обрабатывает распределение потока управления между ними.
  • Корутины (о которых мы говорили выше) это специальные функции, работа которых схожа с работой генераторов в Python, с помощью await они возвращают поток управления обратно в event loop. Запуск корутины должен быть запланирован в event loop. Запланированные корутины будут обернуты в Tasks, что является типом Future.
  • Future отражает результат таска, который может или не может быть выполнен. Результатом может быть exception.


С помощью asyncio вы можете структурировать свой код так, чтобы подзадачи определялись как корутины и позволяли планировать их запуск так, как вам заблагорассудится, в том числе и одновременно. Корутины содержат точки yield, в которых мы определяем возможные точки переключения контекста. В случае, если в очереди ожидания есть задачи, то контекст будет переключен, в противном случае нет.

Переключение контекста в asyncio представляет собой event loop, который передает поток управления от одной корутины к другой.

В следующем примере, мы запускаем 3 асинхронных таска, которые по-отдельности делают запросы к Reddit, извлекают и выводят содержимое JSON. Мы используем aiohttp клиентскую библиотеку http, которая гарантирует, что даже HTTP-запрос будет выполнен асинхронно.

import signal  import sys  import asyncio  import aiohttp  import jsonloop = asyncio.get_event_loop()  client = aiohttp.ClientSession(loop=loop)async def get_json(client, url):      async with client.get(url) as response:        assert response.status == 200        return await response.read()async def get_reddit_top(subreddit, client):      data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')    j = json.loads(data1.decode('utf-8'))    for i in j['data']['children']:        score = i['data']['score']        title = i['data']['title']        link = i['data']['url']        print(str(score) + ': ' + title + ' (' + link + ')')    print('DONE:', subreddit + '\n')def signal_handler(signal, frame):      loop.stop()    client.close()    sys.exit(0)signal.signal(signal.SIGINT, signal_handler)asyncio.ensure_future(get_reddit_top('python', client))  asyncio.ensure_future(get_reddit_top('programming', client))  asyncio.ensure_future(get_reddit_top('compsci', client))  loop.run_forever()


Вывод:

50: Undershoot: Parsing theory in 1965 (http://personeltest.ru/away/jeffreykegler.github.io/Ocean-of-Awareness-blog/individual/2018/07/knuth_1965_2.html)12: Question about best-prefix/failure function/primal match table in kmp algorithm (http://personeltest.ru/aways/www.reddit.com/r/compsci/comments/8xd3m2/question_about_bestprefixfailure_functionprimal/)1: Question regarding calculating the probability of failure of a RAID system (http://personeltest.ru/aways/www.reddit.com/r/compsci/comments/8xbkk2/question_regarding_calculating_the_probability_of/)DONE: compsci336: /r/thanosdidnothingwrong -- banning people with python (http://personeltest.ru/aways/clips.twitch.tv/AstutePluckyCocoaLitty)175: PythonRobotics: Python sample codes for robotics algorithms (http://personeltest.ru/aways/atsushisakai.github.io/PythonRobotics/)23: Python and Flask Tutorial in VS Code (http://personeltest.ru/aways/code.visualstudio.com/docs/python/tutorial-flask)17: Started a new blog on Celery - what would you like to read about? (http://personeltest.ru/aways/www.python-celery.com)14: A Simple Anomaly Detection Algorithm in Python (http://personeltest.ru/aways/medium.com/@mathmare_/pyng-a-simple-anomaly-detection-algorithm-2f355d7dc054)DONE: python1360: git bundle (http://personeltest.ru/aways/dev.to/gabeguz/git-bundle-2l5o)1191: Which hashing algorithm is best for uniqueness and speed? Ian Boyd's answer (top voted) is one of the best comments I've seen on Stackexchange. (http://personeltest.ru/aways/softwareengineering.stackexchange.com/questions/49550/which-hashing-algorithm-is-best-for-uniqueness-and-speed)430: ARM launches Facts campaign against RISC-V (http://personeltest.ru/aways/riscv-basics.com/)244: Choice of search engine on Android nuked by Anonymous Coward (2009) (http://personeltest.ru/aways/android.googlesource.com/platform/packages/apps/GlobalSearch/+/592150ac00086400415afe936d96f04d3be3ba0c)209: Exploiting freely accessible WhatsApp data or Why does WhatsApp web know my phones battery level? (http://personeltest.ru/aways/medium.com/@juan_cortes/exploiting-freely-accessible-whatsapp-data-or-why-does-whatsapp-know-my-battery-level-ddac224041b4)DONE: programming


Использование Redis и Redis Queue RQ


Использование asyncio и aiohttp не всегда хорошая идея, особенно если вы пользуетесь более старыми версиями Python. К тому же, бывают моменты, когда вам нужно распределить задачи по разным серверам. В этом случае можно использовать RQ (Redis Queue). Это обычная библиотека Python для добавления работ в очередь и обработки их воркерами в фоновом режиме. Для организации очереди используется Redis база данных ключей/значений.

В примере ниже мы добавили в очередь простую функцию count_words_at_url с помощью Redis.

from mymodule import count_words_at_urlfrom redis import Redisfrom rq import Queueq = Queue(connection=Redis())job = q.enqueue(count_words_at_url, 'http://nvie.com')******mymodule.py******import requestsdef count_words_at_url(url):    """Just an example function that's called async."""    resp = requests.get(url)    print( len(resp.text.split()))    return( len(resp.text.split()))


Вывод:

15:10:45 RQ worker 'rq:worker:EMPID18030.9865' started, version 0.11.015:10:45 *** Listening on default...15:10:45 Cleaning registries for queue: default15:10:50 default: mymodule.count_words_at_url('http://nvie.com') (a2b7451e-731f-4f31-9232-2b7e3549051f)32215:10:51 default: Job OK (a2b7451e-731f-4f31-9232-2b7e3549051f)15:10:51 Result is kept for 500 seconds


Заключение


В качестве примера возьмем шахматную выставку, где один из лучших шахматистов соревнуется с большим количеством людей. У нас есть 24 игры и 24 человека, с которыми можно сыграть, и, если шахматист будет играть с ними синхронно, это займет не менее 12 часов (при условии, что средняя игра занимает 30 ходов, шахматист продумывает ход в течение 5 секунд, а противник примерно 55 секунд.) Однако в асинхронном режиме шахматист сможет делать ход и оставлять противнику время на раздумья, тем временем переходя к следующему противнику и деля ход. Таким образом, сделать ход во всех 24 играх можно за 2 минуты, и выиграны они все могут быть всего за один час.

Это и подразумевается, когда говорят о том, что асинхронность ускоряет работу. О такой быстроте идет речь. Хороший шахматист не начинает играть в шахматы быстрее, просто время более оптимизировано, и оно не тратится впустую на ожидание. Так это работает.

По этой аналогии шахматист будет процессором, а основная идея будет заключаться в том, чтобы процессор простаивал как можно меньше времени. Речь о том, чтобы у него всегда было занятие.

На практике асинхронность определяется как стиль параллельного программирования, в котором одни задачи освобождают процессор в периоды ожидания, чтобы другие задачи могли им воспользоваться. В Python есть несколько способов достижения параллелизма, отвечающих вашим требованиям, потоку кода, обработке данных, архитектуре и вариантам использования, и вы можете выбрать любой из них.



Узнать о курсе подробнее.


Подробнее..

Какая асинхронность должна была бы быть в Python

29.07.2020 12:21:28 | Автор: admin
В последние несколько лет ключевое слово async и семантика асинхронного программирования проникла во многие популярные языки программирования: JavaScript, Rust, C#, и многие другие. Конечно, в Python тоже есть async/await, они появились в Python 3.5.

В этой статье хочу обсудить проблемы асинхронного кода, порассуждать об альтернативах и предложить новый подход поддерживать и синхронные, и асинхронные приложения одновременно.

Цвет функций


Когда в язык программирования включают асинхронные функции, он по сути раскалывается надвое. Появляются красные функции (или асинхронные), а некоторые функции остаются синими (синхронными).

Основная проблема в том, что синие функции не могут вызывать красные, но красные потенциально могут вызвать синие. В Python, например, это частично так: асинхронные функции могут вызывать только синхронные неблокирующие функции. Но определить по описанию, блокирующая функция или нет, невозможно. Python же скриптовый язык.

Этот раскол приводит к разделению языка на два подмножества: синхронное и асинхронное. Python 3.5 вышел больше пяти лет назад, но async все еще поддерживается далеко не так хорошо, как синхронные возможности Python.

Больше о цветах функции можно прочитать в этой замечательной статье.

Дублирование кода


Разные цвета функций на практике означают дублирование кода.

Представьте, вы разрабатываете CLI-инструмент для извлечения размера веб-страницы и хотите поддерживать и синхронный, и асинхронный способы его работы. Например, это нужно, если вы пишете библиотеку и не знаете, как будет использоваться ваш код. И речь не только о библиотеках PyPI, но и о собственных библиотеках с общей логикой для разных сервисов, написанных, например, на Django и aiohttp. Хотя, конечно, независимые приложения в основном пишутся или только синхронно, или только асинхронно.

Начнём с синхронного псевдокода:

def fetch_resource_size(url: str) -> int:    response = client_get(url)    return len(response.content)

Выглядит хорошо. Теперь посмотрим на асинхронный аналог:

async def fetch_resource_size(url: str) -> int:    response = await client_get(url)    return len(response.content)

В целом, это тот же самый код, но с добавлением слов async и await. И я это не выдумал сравните примеры кода в туториале по httpx:


Там точно такая же картина.

Абстракция и композиция


Получается, нужно переписать весь синхронный код и расставить тут и там async и await, чтобы программа стала асинхронной.

В решении этой проблемы могут помочь два принципа. Во-первых, перепишем императивный псевдокод в функциональный. Это позволит увидеть картину более ясно.

def fetch_resource_size(url: str) -> Abstraction[int]:    return client_get(url).map(        lambda response: len(response.content),    )

Вы спросите, что это за метод .map, что он делает. Так в функциональном стиле происходит композиция сложных абстракций и чистых функций. Это позволяет создать новую абстракцию с новым состоянием из существующей. Предположим, client_get(url) изначально возвращает Abstraction[Response], а вызов .map(lambda response: len(response.content)) преобразует ответ в требуемый экземпляр Abstraction[int].

Становится понятно, что делать дальше. Обратите внимание, как легко мы перешли от нескольких независимых шагов к последовательному вызову функций. К тому же мы изменили тип ответа: теперь функция возвращает некоторую абстракцию.

Перепишем код для работы с асинхронной версией:

def fetch_resource_size(url: str) -> AsyncAbstraction[int]:    return client_get(url).map(        lambda response: len(response.content),    )

Единственное, что отличается, это тип возвращаемого значения AsyncAbstraction. В остальном код остался точно таким же. Больше не нужно использовать ключевые слова async и await. await не используется вообще (ради этого всё и затевалось), а без него нет смысла и в async.

Последнее, что требуется, это решить, какой клиент нам нужен: асинхронный или синхронный.

def fetch_resource_size(    client_get: Callable[[str], AbstactionType[Response]],    url: str,) -> AbstactionType[int]:    return client_get(url).map(        lambda response: len(response.content),    )

client_get теперь является аргументом вызываемого типа, который получает на вход строку URL-адреса и возвращает некоторый тип AbstractionType над объектом Response. AbstractionType либо Abstraction, либо AsyncAbstraction из предыдущих примеров.

Когда передаем Abstraction, код работает синхронно, когда AsyncAbstraction тот же самый код автоматически начинает работать асинхронно.

IOResult и FutureResult


К счастью, в dry-python/returns уже есть правильные абстракции.

Позвольте представить вам типобезопасный, дружелюбный к mypy, не зависящий от фреймворка, полностью написанный на Python инструмент. В нём есть потрясающие, удобные, замечательные абстракции, которые можно использовать абсолютно в любом проекте.

Синхронный вариант


Сначала поставим зависимости, чтобы получить воспроизводимый пример.

pip install returns httpx anyio

Далее превратим псевдокод в рабочий код на Python. Начнем с синхронного варианта.

from typing import Callable import httpx from returns.io import IOResultE, impure_safe def fetch_resource_size(    client_get: Callable[[str], IOResultE[httpx.Response]],    url: str,) -> IOResultE[int]:    return client_get(url).map(        lambda response: len(response.content),    ) print(fetch_resource_size(    impure_safe(httpx.get),    'https://sobolevn.me',))# => <IOResult: <Success: 27972>>

Потребовалось изменить пару моментов, чтобы получился рабочий код:

  • Использовать IOResultE функциональный способ обработки ошибок синхронного IO (исключения не всегда подходят). Типы, основанные на Result, позволяют имитировать исключения, но с раздельными значениями Failure(). Успешные выходы при этом оборачиваются в тип Success. Обычно никому нет дела до исключений, а нам есть.
  • Использовать httpx, который может работать с синхронными и асинхронными запросами.
  • Использовать функцию impure_safe, чтобы преобразовывать тип, который возвращает httpx.get, в абстракцию IOResultE.

Асинхронный вариант


Попробуем сделать всё то же самое в асинхронном коде.

from typing import Callable import anyioimport httpx from returns.future import FutureResultE, future_safe def fetch_resource_size(    client_get: Callable[[str], FutureResultE[httpx.Response]],    url: str,) -> FutureResultE[int]:    return client_get(url).map(        lambda response: len(response.content),    ) page_size = fetch_resource_size(    future_safe(httpx.AsyncClient().get),    'https://sobolevn.me',)print(page_size)print(anyio.run(page_size.awaitable))# => <FutureResult: <coroutine object async_map at 0x10b17c320>># => <IOResult: <Success: 27972>>

Видите: результат точно такой же, но теперь код работает асинхронно. При этом его основная часть не изменилась. Однако нужно обратить внимание вот на что:

  • Синхронный IOResultE изменился на асинхронный FutureResultE, impure_safe на future_safe. Работает так же, но возвращает другую абстракцию: FutureResultE.
  • Используется AsyncClient из httpx.
  • Результирующее значение FutureResult необходимо запустить, потому что красные функции не могут вызывать сами себя.
  • Утилита anyio используется, чтобы показать, что этот подход работает с любой асинхронной библиотекой: asyncio, trio, curio.

Два в одном


Покажу, как объединить синхронную и асинхронную версию в одном типобезопасном API.

Higher Kinded Types и type-class для работы с IO ещё не вышли в релиз (они появятся в 0.15.0), поэтому проиллюстрирую на обычном @overload:

from typing import Callable, Union, overload import anyioimport httpx from returns.future import FutureResultE, future_safefrom returns.io import IOResultE, impure_safe @overloaddef fetch_resource_size(    client_get: Callable[[str], IOResultE[httpx.Response]],    url: str,) -> IOResultE[int]:    """Sync case.""" @overloaddef fetch_resource_size(    client_get: Callable[[str], FutureResultE[httpx.Response]],    url: str,) -> FutureResultE[int]:    """Async case.""" def fetch_resource_size(    client_get: Union[        Callable[[str], IOResultE[httpx.Response]],        Callable[[str], FutureResultE[httpx.Response]],    ],    url: str,) -> Union[IOResultE[int], FutureResultE[int]]:    return client_get(url).map(        lambda response: len(response.content),    )

С помощью декораторов @overload описываем, какие входные данные разрешены и какой при этом будет тип возвращаемого значения. Прочитать подробнее о декораторе @overload можно в другой моей статье.

Вызов функции с синхронным или асинхронным клиентом выглядит так:

# Sync:print(fetch_resource_size(    impure_safe(httpx.get),    'https://sobolevn.me',))# => <IOResult: <Success: 27972>> # Async:page_size = fetch_resource_size(    future_safe(httpx.AsyncClient().get),    'https://sobolevn.me',)print(page_size)print(anyio.run(page_size.awaitable))# => <FutureResult: <coroutine object async_map at 0x10b17c320>># => <IOResult: <Success: 27972>>

Как видите, fetch_resource_size в синхронном варианте сразу возвращает IOResult и выполняет его. В то время как в асинхронном варианте требуется event-loop, как для обычной корутины. anyio используется для вывода результатов.

У mypy к этому коду никаких замечаний нет:

 mypy async_and_sync.pySuccess: no issues found in 1 source file

Посмотрим, что будет, если что-нибудь испортить.

---lambda response: len(response.content),+++lambda response: response.content,

mypy легко находит новые ошибки:

 mypy async_and_sync.pyasync_and_sync.py:33: error: Argument 1 to "map" of "IOResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"async_and_sync.py:33: error: Argument 1 to "map" of "FutureResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"async_and_sync.py:33: error: Incompatible return value type (got "bytes", expected "int")

Ловкость рук и никакой магии: чтобы написать асинхронный код с правильными абстракциями, нужна только старая добрая композиция. А вот то, что у нас получается один и тот же API для разных типов, по-настоящему здорово. Например, это позволяет абстрагироваться от того, как работают HTTP-запросы: синхронно или асинхронно.

Надеюсь, этот пример наглядно доказал, какими на самом деле классными могут быть асинхронные программы. А если попробуете dry-python/returns, то найдете еще много интересного. В новой версии мы уже сделали необходимые примитивы для работы с Higher Kinded Types и все необходимые интерфейсы. Код выше теперь можно переписать так:

from typing import Callable, TypeVarimport anyioimport httpxfrom returns.future import future_safefrom returns.interfaces.specific.ioresult import IOResultLike2from returns.io import impure_safefrom returns.primitives.hkt import Kind2, kinded_IOKind = TypeVar('_IOKind', bound=IOResultLike2)@kindeddef fetch_resource_size(    client_get: Callable[[str], Kind2[_IOKind, httpx.Response, Exception]],    url: str,) -> Kind2[_IOKind, int, Exception]:    return client_get(url).map(        lambda response: len(response.content),    )# Sync:print(fetch_resource_size(    impure_safe(httpx.get),    'https://sobolevn.me',))# => <IOResult: <Success: 27972>># Async:page_size = fetch_resource_size(    future_safe(httpx.AsyncClient().get),    'https://sobolevn.me',)print(page_size)print(anyio.run(page_size.awaitable))# => <FutureResult: <coroutine object async_map at 0x10b17c320>># => <IOResult: <Success: 27972>>

Смотрите ветку `master`, там это уже работает.

Больше возможностей dry-python


Расскажу о нескольких других полезных фичах dry-python, которыми я больше всего горжусь.


from returns.curry import curry, partial def example(a: int, b: str) -> float:    ... reveal_type(partial(example, 1))# note: Revealed type is 'def (b: builtins.str) -> builtins.float' reveal_type(curry(example))# note: Revealed type is 'Overload(def (a: builtins.int) -> def (b: builtins.str) -> builtins.float, def (a: builtins.int, b: builtins.str) -> builtins.float)'

Это позволяет использовать @curry, например, вот так:

@currydef example(a: int, b: str) -> float:    return float(a + len(b)) assert example(1, 'abc') == 4.0assert example(1)('abc') == 4.0


За счёт кастомного mypy-плагина можно строить функциональные пайплайны, возвращающие типы.

from returns.pipeline import flowassert flow(    [1, 2, 3],    lambda collection: max(collection),    lambda max_number: -max_number,) == -3

Обычно в типизированном коде очень неудобно работать с лямбдами, из-за того что их аргументы всегда типа Any. Вывод mypy решает эту проблему.

С его помощью нам теперь известно, что lambda collection: max(collection) типа Callable[[List[int]], int], а lambda max_number: -max_number просто Callable[[int], int]. Во flow можно передать любое количество аргументов, и все они будут отлично работать. Всё благодаря плагину.


Абстракцию над FutureResult, о которой мы говорили ранее, можно использовать для того, чтобы явно передать зависимости в асинхронные программы в функциональном стиле.

Планы на будущее


Прежде чем наконец-то выпустить версию 1.0, нам предстоит решить несколько важных задач:

  • Реализовать Higher Kinded Types или их эмуляцию (issue).
  • Добавить надлежащие type-классы, чтобы реализовать необходимые абстракции (issue).
  • Возможно, попробовать компилятор mypyc, что потенциально позволит компилировать типизированные аннотированные Python-программы в двоичный файл. Тогда код с dry-python/returns будет работать в несколько раз быстрее (issue).
  • Исследовать новые способы написания функционального кода на Python, например, такие как do-notation.

Выводы


С помощью композиции и абстракции можно решить любую проблему. В этой статье мы рассмотрели, как решить проблему цветов функций и писать простой, читаемый и гибкий код, который работает. И сделать проверку типов.

Пробуйте dry-python/returns и подключайтесь к Russian Python Week: на конференции core-разработчик dry-python Pablo Aguilar проведет воркшоп по использованию dry-python для написания бизнес-логики.
Подробнее..

Пишем веб сервис на Python с помощью FastAPI

04.08.2020 08:14:31 | Автор: admin
image
Знаю, знаю, наверное вы сейчас думаете что опять?!.
Да, на хабре уже неоднократно писали о фреймворке FastAPI. Но я предлагаю рассмотреть этот инструмент немного подробнее и написать API своего собственного мини Хабра без кармы и рейтингов, зато с блэкджеком и с тестами, аутентификацией, миграциями и асинхронной работой с БД.

Схема базы данных и миграции


Прежде всего, с помощью SQLAlchemy Expression Language, опишем схему базы данных. Создадим файл models/users.py:
import sqlalchemyfrom sqlalchemy.dialects.postgresql import UUIDmetadata = sqlalchemy.MetaData()users_table = sqlalchemy.Table(    "users",    metadata,    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),    sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),    sqlalchemy.Column("name", sqlalchemy.String(100)),    sqlalchemy.Column("hashed_password", sqlalchemy.String()),    sqlalchemy.Column(        "is_active",        sqlalchemy.Boolean(),        server_default=sqlalchemy.sql.expression.true(),        nullable=False,    ),)tokens_table = sqlalchemy.Table(    "tokens",    metadata,    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),    sqlalchemy.Column(        "token",        UUID(as_uuid=False),        server_default=sqlalchemy.text("uuid_generate_v4()"),        unique=True,        nullable=False,        index=True,    ),    sqlalchemy.Column("expires", sqlalchemy.DateTime()),    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),)

И файл models/posts.py:
import sqlalchemyfrom .users import users_tablemetadata = sqlalchemy.MetaData()posts_table = sqlalchemy.Table(    "posts",    metadata,    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),    sqlalchemy.Column("created_at", sqlalchemy.DateTime()),    sqlalchemy.Column("title", sqlalchemy.String(100)),    sqlalchemy.Column("content", sqlalchemy.Text()),)

Чтобы автоматизировать миграции базы данных, установим alembic:
$ pip install alembic

Для инициализации Alembic выполним:
$ alembic init migrations

Эта команда создаст в текущей директории файл alembic.ini и каталог migrations содержащий
  • каталог versions, в котором будут хранится файлы миграций
  • скрипт env.py, запускающийся при вызове alembic
  • файл script.py.mako, содержащий шаблон для новых миграций.


Укажем url нашей базы данных, для этого в файле alembic.ini добавим строчку:
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s

Формат %(variable_name)s позволяет нам устанавливать разные значения переменных в зависимости от среды окружения, переопределяя их в файле env.py например вот так:

from os import environfrom alembic import contextfrom app.models import posts, users# Alembic Config объект предоставляет доступ# к переменным из файла alembic.iniconfig = context.configsection = config.config_ini_sectionconfig.set_section_option(section, "DB_USER", environ.get("DB_USER"))config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))fileConfig(config.config_file_name)target_metadata = [users.metadata, posts.metadata]

Здесь мы берем значения DB_USER, DB_PASS, DB_NAME и DB_HOST из переменных окружения. Кроме этого, в файле env.py указываются метаданные нашей базы в атрибуте target_metadata, без этого Alembic не сможет определить какие изменения необходимо произвести в базе данных.

Все готово и мы можем сгенерировать миграции и обновить БД:
$ alembic revision --autogenerate -m "Added required tables"$ alembic upgrade head


Запускаем приложение и подключаем БД


Создадим файл main.py:
from fastapi import FastAPIapp = FastAPI()@app.get("/")def read_root():    return {"Hello": "World"}

И запустим приложение, выполнив команду
$ uvicorn main:app --reload

Убедимся, что все работает как надо. Открываем в браузере http://127.0.0.1:8000/ и видим
{"Hello": "World"}

Чтобы подключиться к базе данных, воспользуемся модулем databases, который позволяет выполнять запросы асинхронно.
Настроим startup и shutdhown события нашего сервиса, при которых будут происходить подключение и отключение от базы данных. Отредактируем файл main.py:
from os import environimport databases# берем параметры БД из переменных окруженияDB_USER = environ.get("DB_USER", "user")DB_PASSWORD = environ.get("DB_PASSWORD", "password")DB_HOST = environ.get("DB_HOST", "localhost")DB_NAME = "async-blogs"SQLALCHEMY_DATABASE_URL = (    f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}")# создаем объект database, который будет использоваться для выполнения запросовdatabase = databases.Database(SQLALCHEMY_DATABASE_URL)app = FastAPI()@app.on_event("startup")async def startup():    # когда приложение запускается устанавливаем соединение с БД    await database.connect()@app.on_event("shutdown")async def shutdown():    # когда приложение останавливается разрываем соединение с БД    await database.disconnect()@app.get("/")def read_root():    # изменим роут таким образом, чтобы он брал данные из БД    query = (        select(            [                posts_table.c.id,                posts_table.c.created_at,                posts_table.c.title,                posts_table.c.content,                posts_table.c.user_id,                users_table.c.name.label("user_name"),            ]        )        .select_from(posts_table.join(users_table))        .order_by(desc(posts_table.c.created_at))    )    return await database.fetch_all(query)

Открываем http://127.0.0.1:8000/ и если видим в ответе пустой список [], значит все прошло хорошо и можно двигаться дальше.

Валидация запроса и ответа


Реализуем возможность регистрации пользователей. Для этого нам понадобиться валидировать HTTP запросы и ответы. Для решения этой задачи воспользуемся библиотекой pydantic:
pip install pydantic

Создадим файл schemas/users.py и добавим модель, отвечающую за валидацию тела запроса:
from pydantic import BaseModel, EmailStrclass UserCreate(BaseModel):    """ Проверяет sign-up запрос """    email: EmailStr    name: str    password: str

Обратите внимание, что типы полей определяются с помощью аннотации типов. Помимо встроенных типов данных, таких как int и str, pydantic предлагает большое количество типов, обеспечивающих дополнительную проверку. Например, тип EmailStr проверяет, что полученное значение корректный email. Для использования типа EmailStr необходимо установить модуль email-validator:
pip install email-validator

Тело ответа должно содержать свои собственные специфические поля, например id и access_token, поэтому добавим в файл schemas/users.py модели, отвечающие за формирование ответа:
from typing import Optionalfrom pydantic import UUID4, BaseModel, EmailStr, Field, validatorclass UserCreate(BaseModel):    """ Проверяет sign-up запрос """    email: EmailStr    name: str    password: strclass UserBase(BaseModel):    """ Формирует тело ответа с деталями пользователя """    id: int    email: EmailStr    name: strclass TokenBase(BaseModel):    token: UUID4 = Field(..., alias="access_token")    expires: datetime    token_type: Optional[str] = "bearer"    class Config:        allow_population_by_field_name = True    @validator("token")    def hexlify_token(cls, value):        """ Конвертирует UUID в hex строку """        return value.hexclass User(UserBase):    """ Формирует тело ответа с деталями пользователя и токеном """    token: TokenBase = {}

Для каждого поля модели можно написать кастомный валидатор. Например, hexlify_token преобразует UUID значение в hex строку. Стоит отметить, что вы можете использовать класс Field, когда нужно переопределить стандартное поведение поля модели. Например, token: UUID4 = Field(..., alias=access_token) устанавливает псевдоним access_token для поля token. Для обозначения, что поле обязательно, в качестве первого параметра передается специальное значение ... (ellipsis).

Добавим файл utils/users.py, в котором создадим методы, необходимые для записи пользователя в БД:
import hashlibimport randomimport stringfrom datetime import datetime, timedeltafrom sqlalchemy import and_from app.models.database import databasefrom app.models.users import tokens_table, users_tablefrom app.schemas import users as user_schemadef get_random_string(length=12):    """ Генерирует случайную строку, использующуюся как соль """    return "".join(random.choice(string.ascii_letters) for _ in range(length))def hash_password(password: str, salt: str = None):    """ Хеширует пароль с солью """    if salt is None:        salt = get_random_string()    enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)    return enc.hex()def validate_password(password: str, hashed_password: str):    """ Проверяет, что хеш пароля совпадает с хешем из БД """    salt, hashed = hashed_password.split("$")    return hash_password(password, salt) == hashedasync def get_user_by_email(email: str):    """ Возвращает информацию о пользователе """    query = users_table.select().where(users_table.c.email == email)    return await database.fetch_one(query)async def get_user_by_token(token: str):    """ Возвращает информацию о владельце указанного токена """    query = tokens_table.join(users_table).select().where(        and_(            tokens_table.c.token == token,            tokens_table.c.expires > datetime.now()        )    )    return await database.fetch_one(query)async def create_user_token(user_id: int):    """ Создает токен для пользователя с указанным user_id """    query = (        tokens_table.insert()        .values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)        .returning(tokens_table.c.token, tokens_table.c.expires)    )    return await database.fetch_one(query)async def create_user(user: user_schema.UserCreate):    """ Создает нового пользователя в БД """    salt = get_random_string()    hashed_password = hash_password(user.password, salt)    query = users_table.insert().values(        email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"    )    user_id = await database.execute(query)    token = await create_user_token(user_id)    token_dict = {"token": token["token"], "expires": token["expires"]}    return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}


Создадим файл routers/users.py и добавим sign-up роут, указав, что в запросе он ожидает модель CreateUser и возвращает модель User:
from fastapi import APIRouterfrom app.schemas import usersfrom app.utils import users as users_utilsrouter = APIRouter()@router.post("/sign-up", response_model=users.User)async def create_user(user: users.UserCreate):    db_user = await users_utils.get_user_by_email(email=user.email)    if db_user:        raise HTTPException(status_code=400, detail="Email already registered")    return await users_utils.create_user(user=user)


Осталось только подключить роуты из файла routers/users.py. Для этого добавим в main.py следующие строки:
from app.routers import usersapp.include_router(users.router)


Аутентификация и контроль доступа


Теперь, когда в нашей базе данных есть пользователи, все готово для того чтобы настроить аутентификацию приложения. Добавим эндпоинт, который принимает имя пользователя и пароль и возвращает токен. Обновим файл routers/users.py, добавив в него:
from fastapi import Dependsfrom fastapi.security import OAuth2PasswordRequestForm@router.post("/auth", response_model=users.TokenBase)async def auth(form_data: OAuth2PasswordRequestForm = Depends()):    user = await users_utils.get_user_by_email(email=form_data.username)    if not user:        raise HTTPException(status_code=400, detail="Incorrect email or password")    if not users_utils.validate_password(        password=form_data.password, hashed_password=user["hashed_password"]    ):        raise HTTPException(status_code=400, detail="Incorrect email or password")    return await users_utils.create_user_token(user_id=user["id"])

При этом, нам не нужно самостоятельно описывать модель запроса, Fastapi предоставляет специальный dependency класс OAuth2PasswordRequestForm, который заставляет роут ожидать два поля username и password.

Чтобы ограничить доступ к определенным роутам для неаутентифицированных пользователей, напишем метод-зависимость(dependency). Он проверит, что предоставленный токен принадлежит активному пользователю и вернет данные пользователя. Это позволит нам использовать информацию о пользователе во всех роутах, требующих аутентификации. Создадим файл utils/dependecies.py:
from app.utils import users as users_utilsfrom fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBeareroauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")async def get_current_user(token: str = Depends(oauth2_scheme)):    user = await users_utils.get_user_by_token(token)    if not user:        raise HTTPException(            status_code=status.HTTP_401_UNAUTHORIZED,            detail="Invalid authentication credentials",            headers={"WWW-Authenticate": "Bearer"},        )    if not user["is_active"]:        raise HTTPException(            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"        )    return user


Обратите внимание, что зависимость может в свою очередь зависеть от другой зависимости. К пример OAuth2PasswordBearer зависимость, которая дает понять FastAPI, что текущий роут требует аутентификации.

Чтобы проверить, что все работает как надо, добавим роут /users/me, возвращающий детали текущего пользователя. В файл routers/users.py добавим строки:
from app.utils.dependencies import get_current_user@router.get("/users/me", response_model=users.UserBase)async def read_users_me(current_user: users.User = Depends(get_current_user)):    return current_user

Теперь у нас есть роут /users/me к которому имеют доступ только аутентифицированные пользователи.

Все готово для того, чтобы наконец добавить возможность пользователям создавать и редактировать публикации:
utils/posts.py
from datetime import datetimefrom app.models.database import databasefrom app.models.posts import posts_tablefrom app.models.users import users_tablefrom app.schemas import posts as post_schemafrom sqlalchemy import desc, func, selectasync def create_post(post: post_schema.PostModel, user):    query = (        posts_table.insert()        .values(            title=post.title,            content=post.content,            created_at=datetime.now(),            user_id=user["id"],        )        .returning(            posts_table.c.id,            posts_table.c.title,            posts_table.c.content,            posts_table.c.created_at,        )    )    post = await database.fetch_one(query)    # Convert to dict and add user_name key to it    post = dict(zip(post, post.values()))    post["user_name"] = user["name"]    return postasync def get_post(post_id: int):    query = (        select(            [                posts_table.c.id,                posts_table.c.created_at,                posts_table.c.title,                posts_table.c.content,                posts_table.c.user_id,                users_table.c.name.label("user_name"),            ]        )        .select_from(posts_table.join(users_table))        .where(posts_table.c.id == post_id)    )    return await database.fetch_one(query)async def get_posts(page: int):    max_per_page = 10    offset1 = (page - 1) * max_per_page    query = (        select(            [                posts_table.c.id,                posts_table.c.created_at,                posts_table.c.title,                posts_table.c.content,                posts_table.c.user_id,                users_table.c.name.label("user_name"),            ]        )        .select_from(posts_table.join(users_table))        .order_by(desc(posts_table.c.created_at))        .limit(max_per_page)        .offset(offset1)    )    return await database.fetch_all(query)async def get_posts_count():    query = select([func.count()]).select_from(posts_table)    return await database.fetch_val(query)async def update_post(post_id: int, post: post_schema.PostModel):    query = (        posts_table.update()        .where(posts_table.c.id == post_id)        .values(title=post.title, content=post.content)    )    return await database.execute(query)


routers/posts.py
from app.schemas.posts import PostDetailsModel, PostModelfrom app.schemas.users import Userfrom app.utils import posts as post_utilsfrom app.utils.dependencies import get_current_userfrom fastapi import APIRouter, Depends, HTTPException, statusrouter = APIRouter()@router.post("/posts", response_model=PostDetailsModel, status_code=201)async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):    post = await post_utils.create_post(post, current_user)    return post@router.get("/posts")async def get_posts(page: int = 1):    total_cout = await post_utils.get_posts_count()    posts = await post_utils.get_posts(page)    return {"total_count": total_cout, "results": posts}@router.get("/posts/{post_id}", response_model=PostDetailsModel)async def get_post(post_id: int):    return await post_utils.get_post(post_id)@router.put("/posts/{post_id}", response_model=PostDetailsModel)async def update_post(    post_id: int, post_data: PostModel, current_user=Depends(get_current_user)):    post = await post_utils.get_post(post_id)    if post["user_id"] != current_user["id"]:        raise HTTPException(            status_code=status.HTTP_403_FORBIDDEN,            detail="You don't have access to modify this post",        )    await post_utils.update_post(post_id=post_id, post=post_data)    return await post_utils.get_post(post_id)


Подключим новые роуты, добавив в main.py
from app.routers import postsapp.include_router(posts.router)


Тестирование


Тесты мы будем писать на pytest:
$ pip install pytest

Для тестирования эндпоинтов FastAPI предоставляет специальный инструмент TestClient.
Напишем тест для эндпоинта, который не требует подключения к базе данных:
from app.main import appfrom fastapi.testclient import TestClientclient = TestClient(app)def test_health_check():    response = client.get("/")    assert response.status_code == 200    assert response.json() == {"Hello": "World"}

Как видите, все достаточно просто. Необходимо инициализировать TestClient, и использовать его для тестирования HTTP запросов.

Для тестирования остальных эндпоинтов, необходимо создать тестовую БД. Отредактируем файл main.py, добавив в него конфигурацию тестовой базы:
from os import environimport databasesDB_USER = environ.get("DB_USER", "user")DB_PASSWORD = environ.get("DB_PASSWORD", "password")DB_HOST = environ.get("DB_HOST", "localhost")TESTING = environ.get("TESTING")if TESTING:    # Используем отдельную базу данных для тестов    DB_NAME = "async-blogs-temp-for-test"    TEST_SQLALCHEMY_DATABASE_URL = (        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"    )    database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL)else:    DB_NAME = "async-blogs"    SQLALCHEMY_DATABASE_URL = (        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"    )    database = databases.Database(SQLALCHEMY_DATABASE_URL)

Мы по-прежнему используем БД async-blogs для нашего приложения. Но если задано значение переменной окружение TESTING, тогда использовуется БД async-blogs-temp-for-test.

Чтобы база async-blogs-temp-for-test автоматически создавалась при запуске тестов и удалялась после их выполнения, создадим фикстуру в файле tests/conftest.py:
import osimport pytest# Устанавливаем `os.environ`, чтобы использовать тестовую БДos.environ['TESTING'] = 'True'from alembic import commandfrom alembic.config import Configfrom app.models import databasefrom sqlalchemy_utils import create_database, drop_database@pytest.fixture(scope="module")def temp_db():    create_database(database.TEST_SQLALCHEMY_DATABASE_URL) # Создаем БД    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))    alembic_cfg = Config(os.path.join(base_dir, "alembic.ini")) # Загружаем конфигурацию alembic     command.upgrade(alembic_cfg, "head") # выполняем миграции    try:        yield database.TEST_SQLALCHEMY_DATABASE_URL    finally:        drop_database(database.TEST_SQLALCHEMY_DATABASE_URL) # удаляем БД

Для создания и удаления БД воспользуемся библиотекой sqlalchemy_utils .

Используя фикстуру temp_db в тестах, мы сможем протестировать все эндпоинты нашего приложения:
def test_sign_up(temp_db):    request_data = {        "email": "vader@deathstar.com",        "name": "Darth Vader",        "password": "rainbow"    }    with TestClient(app) as client:        response = client.post("/sign-up", json=request_data)    assert response.status_code == 200    assert response.json()["id"] == 1    assert response.json()["email"] == "vader@deathstar.com"    assert response.json()["name"] == "Darth"    assert response.json()["token"]["expires"] is not None    assert response.json()["token"]["access_token"] is not None

tests/test_posts.py
import asynciofrom app.main import appfrom app.schemas.users import UserCreatefrom app.utils.users import create_user, create_user_tokenfrom fastapi.testclient import TestClientdef test_create_post(temp_db):    user = UserCreate(        email="vader@deathstar.com",        name="Darth",        password="rainbow"    )    request_data = {      "title": "42",      "content": "Don't panic!"    }    with TestClient(app) as client:        # Create user and use his token to add new post        loop = asyncio.get_event_loop()        user_db = loop.run_until_complete(create_user(user))        response = client.post(            "/posts",            json=request_data,            headers={"Authorization": f"Bearer {user_db['token']['token']}"}        )    assert response.status_code == 201    assert response.json()["id"] == 1    assert response.json()["title"] == "42"    assert response.json()["content"] == "Don't panic!"def test_create_post_forbidden_without_token(temp_db):    request_data = {      "title": "42",      "content": "Don't panic!"    }    with TestClient(app) as client:        response = client.post("/posts", json=request_data)    assert response.status_code == 401def test_posts_list(temp_db):    with TestClient(app) as client:        response = client.get("/posts")    assert response.status_code == 200    assert response.json()["total_count"] == 1    assert response.json()["results"][0]["id"] == 1    assert response.json()["results"][0]["title"] == "42"    assert response.json()["results"][0]["content"] == "Don't panic!"def test_post_detail(temp_db):    post_id = 1    with TestClient(app) as client:        response = client.get(f"/posts/{post_id}")    assert response.status_code == 200    assert response.json()["id"] == 1    assert response.json()["title"] == "42"    assert response.json()["content"] == "Don't panic!"def test_update_post(temp_db):    post_id = 1    request_data = {      "title": "42",      "content": "Life? Don't talk to me about life."    }    with TestClient(app) as client:        # Create user token to add new post        loop = asyncio.get_event_loop()        token = loop.run_until_complete(create_user_token(user_id=1))        response = client.put(            f"/posts/{post_id}",            json=request_data,            headers={"Authorization": f"Bearer {token['token']}"}        )    assert response.status_code == 200    assert response.json()["id"] == 1    assert response.json()["title"] == "42"    assert response.json()["content"] == "Life? Don't talk to me about life."def test_update_post_forbidden_without_token(temp_db):    post_id = 1    request_data = {      "title": "42",      "content": "Life? Don't talk to me about life."    }    with TestClient(app) as client:        response = client.put(f"/posts/{post_id}", json=request_data)    assert response.status_code == 401


tests/test_users.py
import asyncioimport pytestfrom app.main import appfrom app.schemas.users import UserCreatefrom app.utils.users import create_user, create_user_tokenfrom fastapi.testclient import TestClientdef test_sign_up(temp_db):    request_data = {        "email": "vader@deathstar.com",        "name": "Darth",        "password": "rainbow"    }    with TestClient(app) as client:        response = client.post("/sign-up", json=request_data)    assert response.status_code == 200    assert response.json()["id"] == 1    assert response.json()["email"] == "vader@deathstar.com"    assert response.json()["name"] == "Darth"    assert response.json()["token"]["expires"] is not None    assert response.json()["token"]["token"] is not Nonedef test_login(temp_db):    request_data = {"username": "vader@deathstar.com", "password": "rainbow"}    with TestClient(app) as client:        response = client.post("/auth", data=request_data)    assert response.status_code == 200    assert response.json()["token_type"] == "bearer"    assert response.json()["expires"] is not None    assert response.json()["access_token"] is not Nonedef test_login_with_invalid_password(temp_db):    request_data = {"username": "vader@deathstar.com", "password": "unicorn"}    with TestClient(app) as client:        response = client.post("/auth", data=request_data)    assert response.status_code == 400    assert response.json()["detail"] == "Incorrect email or password"def test_user_detail(temp_db):    with TestClient(app) as client:        # Create user token to see user info        loop = asyncio.get_event_loop()        token = loop.run_until_complete(create_user_token(user_id=1))        response = client.get(            "/users/me",            headers={"Authorization": f"Bearer {token['token']}"}        )    assert response.status_code == 200    assert response.json()["id"] == 1    assert response.json()["email"] == "vader@deathstar.com"    assert response.json()["name"] == "Darth"def test_user_detail_forbidden_without_token(temp_db):    with TestClient(app) as client:        response = client.get("/users/me")    assert response.status_code == 401@pytest.mark.freeze_time("2015-10-21")def test_user_detail_forbidden_with_expired_token(temp_db, freezer):    user = UserCreate(        email="sidious@deathstar.com",        name="Palpatine",        password="unicorn"    )    with TestClient(app) as client:        # Create user and use expired token        loop = asyncio.get_event_loop()        user_db = loop.run_until_complete(create_user(user))        freezer.move_to("'2015-11-10'")        response = client.get(            "/users/me",            headers={"Authorization": f"Bearer {user_db['token']['token']}"}        )    assert response.status_code == 401



P.S. Исходники


Вот собственно и все, репозиторий с исходниками из поста можно посмотреть на GitHub.
Подробнее..

VKWave фреймворк для разработки ботов ВКонтакте

16.08.2020 16:17:46 | Автор: admin


Привет, Хабр!


Сегодня я хочу рассказать о замечательной библиотеке для разработке ботов ВКонтакте с помощью языка программирования Python.


VKWave


VKWave это фреймворк для разработки ботов ВКонтакте, написанный с помощью asyncio. Основные цели проекта дать возможность разработчику конфигурировать фреймворк максимально под себя, в тоже время обеспечивая достойную скорость разработки.


Минимальная требуемая версия Python 3.7


В этой статье я хочу показать вам несколько примеров простых ботов на VKWave, а после вы всегда можете попробовать сами. Фреймворк находится в активной разработке, поэтому документация написанна ещё не до конца, но уже имеется большое количество примеров.


Также у нас есть чат в Telegram


Установка


Установка очень проста и содержит в себе всего одну команду:


pip install vkwave

А теперь предлагаю перейти к примерам!


Echo-бот


Самая простая задача. Бот который отвечает нам тем же самым текстом, который мы ему написали.


# Импортируем нужные классы.# SimpleLongPollBot: обёртка для более удобной работы с фреймворком# SimpleBotEvent: тип события, который предоставляет SimpleLongPollBotfrom vkwave.bots import SimpleLongPollBot, SimpleBotEvent# инициализируем бота (можно ввести список токенов, тогда vkwave сможет обходить лимиты ВКонтакте)bot = SimpleLongPollBot(tokens=TOKEN, group_id=GROUP_ID)# декоратор для создания обработчиков.# можно передавать свои фильтры, но в данном случае мы хотим принимать все сообщения@bot.message_handler()def echo(event: SimpleBotEvent) -> str:    # мы можем сразу возвращать текст, т.к vkwave понимает, что если вы возвращаете строку, то вы хотите ответить на сообщение этим текстом. пользователь может задать свои типы данных, которые он сможет возвращать из хендлеров (а также написать нужную логику для их преобразования в нужные действия)    return event.object.object.message.text# запускаем бота с игнорированием ошибок (не останавливаться даже при них)bot.run_forever()

Код содержит буквально пару строк и выглядит очень легко.


Предлагаю написать что-нибудь поинтереснее. Давайте напишем такого же бота, но он будет печатать в ответ только тот текст, который идёт как аргументы команды /echo. Например /echo мой текст.


Вторая версия Echo-бота


# мы используем фильтр для команд. он фильтрует все сообщения которые не выглядит как `/<наша команда>`. можно задать свои префиксы, а также передать список команд@bot.message_handler(bot.command_filter("echo"))def echo(event: SimpleBotEvent) -> str:    # получаем все аргуметы команды    args = event.object.object.message.text.split()    # проверяем, что есть хотя бы один аргумент    # в противном случае - пишем, что пользователь должен ввести какой-нибудь текст    if len(args) < 2:        return "Напиши какой-нибудь текст!"    # возвращаем итоговый текст (соединяем все аргументы через пробел)    return " ".join(args[1:])

Уже интереснее. Как мы видим, VKWave предоставляет нам фильтр для команд. Скажу сразу: стандартных фильтров в VKWave много, а также вы можете написать свои.


Итоговая версия Echo-бота


А сейчас мы попробуем написать свои фильтры для того, чтобы весь наш обработчик выглядел максимально просто.


# Импортируем нужные классы для разработки своих фильтровfrom vkwave.bots.core.dispatching.filters.base import BaseFilter, BaseEvent, FilterResult# объявляем свой фильтр, который наследуется от базовогоclass EchoFilter(BaseFilter):    # мы можем определить `__init__` для дополнительной настройки фильтра    # объявляем асинхронный метод `check`, который принимает событие и возвращает результат фильтра    async def check(self, event: BaseEvent) -> FilterResult:        # делаем алиас для текста сообщения        text = event.object.object.message.text        # разбиваем сообщение по пробелам        all_args = text.split()        # если частей меньше двух - фильтр не прошёл        if len(all_args) < 2:            # возвращаем False.            # так же можем, например, что-то написать пользователю            # у нас есть `event.api_ctx`, который предоставляет лёгкий доступ ко всем методам            return FilterResult(False)        # если нулевой аргумент (сама команда) не "/echo" возвращаем False        if all_args[0] != "/echo":            return FilterResult(False)        # передаём обработчику уже готовый ответ на сообщение        event["echo_answer"] = " ".join(all_args[1:])        return FilterResult(True)# используем фильтр@bot.message_handler(EchoFilter())def echo(event: SimpleBotEvent) -> str:    # возвращаем текст, который мы уже "собрали" в фильтре    return event["echo_answer"]

Заключение


Конечно, я показал вам не все, вообще не все, возможности VKWave. Они очень широки, начиная от возможности своих фильтров и middlewares, типов обработчиков, способов получения событий, заканчивая мультиботом, собственным HTTP клиентом и многое другое!


Этой статьёй я хочу мотивировать вас попробовать создать своего бота ВКонтакте, не используя не удобные vk_api и vk.


Репозиторий на GitHub
Наш чат в Telegram

Подробнее..

Мир без корутин. Костыли для программиста asyncio

02.08.2020 20:17:48 | Автор: admin

1. Введение


Тот, кто научился летать, ползать уже не будет. Но не должно быть и высокомерия к тому, кто летать не может в принципе. И то и другое вполне норма. И то и другое уважаемо и почетно. Для человека это, как выбор профессии: вы, условно, либо летчик, либо шофер. Для тех же животных аналогично вы либо орел, либо волк, т.е. либо летаете, либо бегаете (убегаете). Но только человек в своих понятиях, категориях, отношении и мыслях наделил персонажи характеристиками и выработал свое отношение к ним. Правда, с нюансами. Так, нет, наверное, почетнее и романтичнее профессии летчика, но попробуйте в этом убедить дальнобойщика или авиаконструктора?! И тут сложно возразить: космонавтов много даже сейчас, а второго Королева все еще нет!

Мы программисты. Может, в разной степени, но некоторые уж точно. Это я к тому, что мы разные и мыслить можем тоже по-разному. Утверждение, что программист мыслит только последовательно, столь же однобоко, вредно и даже кощунственно, как и то, что человек только бегает. Он иногда и летает. Кто-то, как летчики, делает это довольно регулярно, а некоторые, как космонавты, даже месяцами и непрерывно. Идея последовательного мышления принижает способности человека. В какой-то момент и на какое-то время в это можно даже поверить, но " все-таки она вертится" это про то, что рано или поздно жизнь возьмет свое.

Asyncio в Python это программные костыли, имитирующие, образно выражаясь, полет неправильного параллельного мышления. Этакое подпрыгивание со взмахами рук. Выглядит, порой, смешно и коряво. Хотя в определенной ситуации это тоже выход: можно просто перейти лужу и заляпаться, но, если силы позволяют, то лучше уж перепрыгнуть. Но, может, программистам сил не хватает?

Попробуем отбросить навязываемые программные костыли и воспарить над программной обыденностью. И пусть это будет не прыжок, а, может, не такой уж высокий и длительный, но все же, особенно в сравнении с костылями, полет. Ведь когда-то и Можайский Александр Федорович (не путать с Можайским городским судом Московской области ;) или те же братья Райт преодолели по воздуху впервые несколько сот метров. Да и испытания современных самолетов начинают с пробежки и кратковременного отрыва от взлетной полосы.

2. Совсем простой пример с asyncio


А начнем мы с полетов на Python. Программа полетов простая. Есть самолеты (которые, правда, в исходном варианте образности пауки см. [1]) с именами на фюзеляжах Blog, News, Forum. Вылетают они одновременно. Каждый за определенное время должен пролететь отрезок пути и выбросить, предположим, флажок с номером преодоленного отрезка. Так нужно поступить три раза. И только после этого приземлиться.

На языке Python модель подобного поведения описывает, а затем и моделирует, код листинга 1.
Листинг 1. Код самолетов-пауков на Python
import asyncioimport timeasync def spider(site_name): for page in range(1, 4):     await asyncio.sleep(1)     print(site_name, page)spiders = [ asyncio.ensure_future(spider("Blog")), asyncio.ensure_future(spider("News")), asyncio.ensure_future(spider("Forum"))]start = time.time()event_loop = asyncio.get_event_loop()event_loop.run_until_complete(asyncio.gather(*spiders))event_loop.close()print("{:.2F}".format(time.time() - start))



Результаты моделирования подобного полета следующие:

Blog 1
News 1
Forum 1
Blog 2
News 2
Forum 2
Blog 3
News 3
Forum 3
3.00

Почему это так подробно разъясняет видео [1]. Но мы-то ребята с фантазией и одновременный (по сценарию асинхронный) полет наших трех самолетов без использования asyncio представим по-другому на базе автоматных моделей. Так, листинг 2 приводит код автоматной задержки аналога асинхронной задержки из модуля asyncio, представленной строкой await asyncio.sleep(1) в листинге 1.
Листинг 2. Код автоматной задержки на Python
import timeclass PSleep:    def __init__(self, t, p_FSM): self.SetTime = t; self.nState = 0; self.bIfLoop = False; self.p_mainFSM = p_FSM    def x1(self): return time.time() - self.t0 <= self.SetTime    def y1(self): self.t0 = time.time()    def loop(self):        if (self.nState == 0): self.y1(); self.nState = 1        elif (self.nState == 1):            if (not self.x1()): self.nState = 4


Через конструктор класса передается значение задержки и указатель на объект, создавший объект задержки. Указатель требуется функции управления процессами, которая после удалении задержки продолжит работу родительского процесса, который был остановлен при ее создании.

На листинге 3 показан автоматный аналог асинхронного самолета-паука (см. также листинг 1). Весьма вероятно, что ассу программирования на Python подобное не приснится даже в самом кошмарном сне! Исходный код из четырех строчек увеличился в 15 раз! Это ли не повод восхищения типичным кодом Python вообще и asycio в частности или, как минимум, доказательство преимущества корутинной технологии перед автоматным программированием?
Листинг 3. Код автоматного паука на Python
# "паук" для страницы "Blog"class PBSpider:    def __init__(self, name):        self.nState = 0; self.bIfLoop = True; self.site_name = name; self.page = 1;        self.p_mainFSM = b_sleep;    def x1(self): return self.page < 4    def y1(self):        self.bIfLoop = False; automaton.append(b_sleep);        b_sleep.p_mainFSM = blog        automaton[-1].bIfLoop = True;        automaton[-1].nState = 0    def y2(self): print(self.site_name, self.page)    def y3(self): self.page += 1    def y4(self): self.page = 1    def loop(self):        if (self.x1() and self.nState == 0):  self.y1(); self.nState = 1        elif (not self.x1()  and self.nState == 0): self.y1(); self.y4(); self.nState = 33        elif (self.nState == 1): self.y2(); self.y3(); self.nState = 0# "паук" для страницы "News"class PNSpider:    def __init__(self, name):        self.nState = 0; self.bIfLoop = True; self.site_name = name; self.page = 1;        self.p_mainFSM = n_sleep;    def x1(self): return self.page < 4    def y1(self):        self.bIfLoop = False; automaton.append(n_sleep);        n_sleep.p_mainFSM = news        automaton[-1].bIfLoop = True;        automaton[-1].nState = 0    def y2(self): print(self.site_name, self.page)    def y3(self): self.page += 1    def y4(self): self.page = 1    def loop(self):        if (self.x1() and self.nState == 0):  self.y1(); self.nState = 1        elif (not self.x1()  and self.nState == 0): self.y1(); self.y4(); self.nState = 33        elif (self.nState == 1): self.y2(); self.y3(); self.nState = 0# паук для страницы "Forum"class PFSpider:    def __init__(self, name):        self.nState = 0; self.bIfLoop = True; self.site_name = name; self.page = 1;        self.p_mainFSM = f_sleep;    def x1(self): return self.page < 4    def y1(self):        self.bIfLoop = False; automaton.append(f_sleep);        f_sleep.p_mainFSM = forum        automaton[-1].bIfLoop = True;        automaton[-1].nState = 0    def y2(self): print(self.site_name, self.page)    def y3(self): self.page += 1    def y4(self): self.page = 1    def loop(self):        if (self.x1() and self.nState == 0):  self.y1(); self.nState = 1        elif (not self.x1()  and self.nState == 0): self.y1(); self.y4(); self.nState = 33        elif (self.nState == 1): self.y2(); self.y3(); self.nState = 0# задержкиb_sleep = PSleep(1, 0)n_sleep = PSleep(1, 0)f_sleep = PSleep(1, 0)# "пауки"blog = PBSpider("Blog")news = PNSpider("News")forum = PFSpider("Forum")# формирование исходного списка процессовautomaton = []automaton.append(blog);automaton.append(news);automaton.append(forum);start = time.time()# управление процессами (аналог event_loop)while True:    ind = 0;    while True:        while ind < len(automaton):            if automaton[ind].nState == 4:                automaton[ind].p_mainFSM.bIfLoop = True                automaton.pop(ind)                ind -=1            elif automaton[ind].bIfLoop:                automaton[ind].loop()            elif automaton[ind].nState == 33:                print("{:.2F}".format(time.time() - start))                exit()            ind += 1        ind = 0


А вот и результат автоматных полетов:

News 1
Forum 1
Blog 1
Blog 2
News 2
Forum 2
News 3
Forum 3
Blog 3
3.00

Но обсудим. Увеличение объема кода произошло из-за проблем с указателями в Python. В результате пришлось создать класс для каждой страницы, что и увеличило код в три раза. Поэтому правильнее говорить не о 15-ти, а о пятикратном увеличении объема. Более искусный в программировании Python-летчик этот недостаток, возможно, сможет даже устранить.

Но основная причина все же не в указателях. Код на С++, показанный далее, при полной свободе работы с указателями, имеет на один класс даже большее число строк. Причина в используемой вычислительной модели, языке ее описания и подходов к реализации алгоритмов на ее базе. Рис. 1 демонстрирует обычную модель самолета-паука в форме блок-схемы и модель в форме автомата. Можно видеть, что внешне и по качеству это разные модели, хотя и допускающие эквивалентные преобразования. У автоматов есть состояния, у блок-схем их нет и в помине. Автоматы по определению работают в дискретном времени, а блок-схемы об этом и не мечтают. Все это накладывает на реализацию модели определенные обязательства.

Отсутствие понятия дискретного времени суть проблем существующей блок-схемной модели программирования, которая должна реализовать, строго говоря, для нее нереализуемое, т.е. параллельные процессы. Напомним, что для автоматов автоматная сеть, как модель параллельных процессов (а также асинхронных), это естественное их состояние.
Рис. 1. Автоматная и блок-схемная модели самолета-паука
image

Но даже на уровне отдельного процесса у моделей есть отличия, которые проецируются на язык и реализацию модели. В силу этих качеств апологеты последовательной блок-схемной модели создали конструкции языка, которые в явном или неявном виде позволяют весьма компактно ее описать. Взять тот же цикл for или хотя бы неявно подразумеваемую последовательность исполнения операторов (действия y1, y2, y3).

Для блок-схемы в одном квадратике можно без проблем перечислить действия и это не изменит последовательный характер их работы. Если же у автомата заменить переходы в состояниях s2, s3 на цикл при состоянии s1, пометив дугу этими же действиями, то смысл алгоритма изменится, т.к. введет параллелизм в его работу. Поскольку упомянутые действия должны исполнятся строго последовательно, то это и предопределило вид модели автомата (см. рис.1). Конечный автомат модель, не допускающая двоемыслия.

Отсутствие дискретного времени у блок-схем было их преимуществом. Но сейчас это стало их основным недостатком. Это влияет, как это не покажется кощунственным, и на образ мышления. Последовательными языками оправдывают последовательное мышление программистов, отказывая им в другом в параллельном. Именно этим обосновывают конструкции существующего асинхронного программирования, представляя набор и функциональность операторов того же пакета asyncio. И именно этот подход, как утверждают, позволяет привычные программистам последовательные программы превратить в асинхронные (почти что параллельные).

Но вернемся к теме статьи и ее образам. Мы хотели свой самолет и мы его получили! Полеты или, что точнее, видимые их результаты, несколько отличаются по внешнему виду, но совершенно неотличимы по своей сути. Их можно трактовать так, что флажки подбирались и фиксировались в протоколе в ином порядке, но сами самолеты при этом летели, как и должно, т.е. одновременно и одновременно выбрасывали свои флажки. А уж в каком порядке их фиксировали дело, как говорится, десятое. Главное реализовано и иполнено: последовательность и времена их сбрасывания соответствуют программе полета

Код можно сократить. Так, по-видимому, можно ограничиться кодом только одного класса. Можно скрыть и код событийного цикла. Если же при этом в исходном коде открыть подкапотный код, который сокрыт за операторами asyncio и await, то объем автоматного кода, скорее всего, уже не будет таким уж отпугивающим.

3. О проблемах реализации автоматов в Python


Остановимся подробнее на проблемах, которые породили внешний вид автоматного кода. Последний, что там скрывать, выглядит пока монструозно в сравнении с исходным кодом. Но, заметим, и первый самолет Можайского далеко не был похож на современную сушку, да и первые автомобили отличаются далеко не в лучшую сторону от любого своего современного аналога. Подчеркну, что проблемы предъявленного автоматного кода во многом связаны с моим текущим пониманием языка Python и, возможно, в меньшей степени с возможностями самого языка.

Тем не менее первая проблема связана с языком описания автоматной модели. В С++ она решена средствами языка. В Python я таких возможностей не вижу. К сожалению, как сейчас иногда выражаются, от слова совсем. Поэтому за основу был взят метод реализации автоматов на базе управляющих операторов языка if-elif-else. Кроме того, напомним, в ВКП(а), кроме собственно автоматов, для полноценной реализации параллелизма введена теневая память и автоматные пространства. Без этого возможности автоматного программирования весьма ограничены и во многом неполноценны.

Следующая проблема, которую мы уже упоминали, указатели. В С++ с ними проблем нет. В рамках ВКП(а) в соответствии с парадигмой ООП создан базовый автоматный класс, от которого порождаются прикладные автоматные классы, а их параметрами могут быть не только указатели, но даже их адреса. Все это позволяет просто, компактно и весьма эффективно описать и реализовать любую задачу, включающую множество параллельных взаимодействующих процессов.

Далее представлен код эквивалентных рассматриваемому примеру автоматных классов на С++. Код задержки на листинге 4 эквивалентен строке await asyncio.sleep(1) на листинге 1. В графической форме ей соответствует модель автомата FAwaitSleep на рис. 1. Такой и только такой автомат можно считать асинхронным и он не будет тормозить вычислительный поток. Автомат FSleep на этом же рисунке соответствует обычному оператору sleep(). Он проще, но гарантированно разрушит модель дискретного времени из-за действия y1, вызывающего обычную последовательную задержку. А это уже ни куда не годится.
Листинг 4. Код асинхронной задержки
// Задержка (в дискретном времени)#include "lfsaappl.h"#include <QTime>class FAwaitSleep :    public LFsaAppl{public:    FAwaitSleep(int n);protected:    int x1();    QTime time;    int nAwaitSleep;};#include "stdafx.h"#include "FAwaitSleep.h"static LArc TBL_AwaitSleep[] = {    LArc("s1","s1","x1",  "--"),//    LArc("s1","00","^x1","--"),//    LArc()};FAwaitSleep::FAwaitSleep(int n):    LFsaAppl(TBL_AwaitSleep, "FAwaitSleep"){    nAwaitSleep = n; time.start();}int FAwaitSleep::x1() { return time.elapsed() < nAwaitSleep; }


Код самолета-паука на С++ демонстрирует листинг 5. Данный код в гораздо большей степени адекватен своей модели, чем блок-схема коду на Python. Особенно, если сравнить таблицу переходов автомата и внешний вид автоматного графа. Это просто разные формы описания одного и того же абстрактного понятия автомата. Здесь же показано, как передается указатель на родительский класс при создании задержки (см. вызов метода FCall в действии y1)
Листинг 5. Код самолета-паука, имитирующего чтение страниц сайта
// "Паук". Имитация страниц сайта#include "lfsaappl.h"class FAwaitSleep;class FSpider :    public LFsaAppl{public:    LFsaAppl* Create(CVarFSA *pCVF) { Q_UNUSED(pCVF)return new FSpider(nameFsa); }    bool FCreationOfLinksForVariables() override;    FSpider(string strNam);    virtual ~FSpider(void);    CVar *pVarStrSiteName;// имя сайта    FAwaitSleep *pFAwaitSleep{nullptr};protected:    int x1(); void y1(); void y2(); void y3(); void y4();    int page{1};};#include "stdafx.h"#include "FSpider.h"#include "FSleep.h"#include "FAwaitSleep.h"#include <QDebug>static LArc TBL_Spider[] = {    LArc("st","s1","--","--"),    LArc("s1","s2","x1","y1"),  // x1- номер<макс.числа страниц; y1-задержка;    LArc("s2","s3","--","y2"),  // y2- печатать номера страницы;    LArc("s3","s1","--","y3"),  // y3- увеличить номер страницы    LArc("s1","st","^x1","y4"), // y4- сбросит номера страницы    LArc()};FSpider::FSpider(string strNam):    LFsaAppl(TBL_Spider, strNam){ }FSpider::~FSpider(void) { if (pFAwaitSleep) delete pFAwaitSleep; }bool FSpider::FCreationOfLinksForVariables() {    pVarStrSiteName = CreateLocVar("strSiteName", CLocVar::vtString, "name of site");    return true;}// счетчик страниц меньше заданного числа страниц?int FSpider::x1() { return page < 4; }// create delay - pure sleep (synchronous function) or await sleep (asynchronous function)void FSpider::y1() {    //sleep(1000);    // await sleep (asynchronous function)    if (pFAwaitSleep) delete pFAwaitSleep;    pFAwaitSleep = new FAwaitSleep(1000);    pFAwaitSleep->FCall(this);}void FSpider::y2() {#ifdef QT_DEBUG    string str = pVarStrSiteName->strGetDataSrc();    printf("%s%d", str.c_str(), page);    qDebug()<<str.c_str()<<page;#endif}void FSpider::y3() { page++; }void FSpider::y4() { page = 1; }



Код, реализующий функции так называемого событийного цикла, отсутствует. В нем просто нет необходимости, т.к. его функции исполняет ядро среды ВКП(а). Оно создает объекты и управляет их параллельным исполнением в дискретном времени.

4. Выводы


Краткость не всегда сестра таланта, а иногда еще и признак косноязычия. Правда, отличить одно от другого сразу сложно. Код на Python часто будет короче кода на С++. Но это характерно для простых случаев. Чем сложнее решение, тем эта разница будет меньше. В конце концов даже сложность решения определяется возможностями модели. Автоматная модель гораздо мощнее блок-схемной.

Автоматы и распараллеливание это в первую очередь весьма эффективные средства решения проблем сложности, борьбы с нею, а не столько средства увеличения скорости работы программы. Поскольку все это автоматная модель, параллелизм сложно реализуемо на Python, то, несмотря на все его фишки, батарейки и много еще чего, меня сложно склонить в его сторону. Я бы больше уделял внимание окружению С++, а не очень оправданному внедрению в него тех же корутин. Модель эта временная и причина ее внедрения во многом вынужденная. А что мы будем делать с этим костылем, когда будет решена проблема выбора параллельной модели?

Поэтому, извините, но мои предпочтения все еще на стороне С++. А если учесть область моих профессиональных интересов промышленные системы так называемого жуткого жесткого реального времени, то выбора как такового у меня пока что нет. Да, какое-то окружение, какой-то сервис можно создать, используя Python. Это удобно, это быстро, есть много прототипов и т.д. и т.п. Но ядро решения, его параллельную модель, логику самих процессов однозначно С++, однозначно автоматы. Здесь автоматы, конечно, главнее и рулят. Но никак не корутины :)

В дополнение Посмотрите видео [2], обратив внимание на реализацию модели ракеты. О ней, начиная примерно с 12-й минуты, и повествует видео. Респект лектору за использование автоматов :) А на сладкое предлагается еще одно решение из [3]. Оно в духе асинхронного программирования и asyncio. Собственно с этого примера все и началось реализация вложенных автоматов в Python. Здесь глубина вложения даже больше, чем в примере, подробно рассмотренном выше. На листинге 6 приведен исходный код и его автоматный аналог на Python. На рис. 2 автоматная модель чаепития, а на листинге 7 эквивалентная реализация на С++ для ВКП(а). Сравнивайте, анализируйте, делайте выводы, критикуйте
Листинг 6. Читаем и пьем чай асинхронно на Python
import asyncioimport time# # Easy Python. Asyncio в python 3.7 https://www.youtube.com/watch?v=PaY-hiuE5iE# # 10:10# async def teatime():#     await asyncio.sleep(1)#     print('take a cap of tea')#     await asyncio.sleep(1)## async def read():#     print('Reading for 1 hour...')#     await teatime()#     print('...reading for 1 hour...')## if __name__ == '__main__':#     asyncio.run(read())class PSleep:    def __init__(self, t, p_FSM): self.SetTime = t; self.nState = 0; self.bIfLoop = False; self.p_mainFSM = p_FSM    def x1(self): return time.time() - self.t0 <= self.SetTime    def y1(self): self.t0 = time.time()    def loop(self):        if (self.nState == 0): self.y1(); self.nState = 1        elif (self.nState == 1):            if (not self.x1()): self.nState = 4class PTeaTime:    def __init__(self, p_FSM): self.nState = 0; self.bIfLoop = False; self.p_mainFSM = p_FSM;    def y1(self): self.bIfLoop = False; automaton.append(sl); automaton[-1].bIfLoop = True; automaton[-1].nState = 0    def y2(self): print('take a cap of tea')    def loop(self):        if (self.nState == 0):  self.y1(); self.nState = 1        elif (self.nState == 1): self.y2(); self.nState = 2        elif (self.nState == 2): self.y1(); self.nState = 3        elif (self.nState == 3): self.nState = 4class PRead:    def __init__(self): self.nState = 0; self.bIfLoop = False;    def y1(self): print('Reading for 1 hour...')    def y2(self): self.bIfLoop = False; automaton.append(rt); automaton[-1].bIfLoop = True; automaton[-1].nState = 0    def loop(self):        if (self.nState == 0): self.y1(); self.nState = 1        elif (self.nState == 1): self.y2(); self.nState = 2        elif (self.nState == 2): self.y1(); self.nState = 33; self.bIfLoop = Falseread = PRead()rt = PTeaTime(read)sl = PSleep(5, rt)automaton = []automaton.append(read); automaton[-1].bIfLoop = Truewhile True:    ind = 0;    while True:        while ind < len(automaton):            if automaton[ind].nState == 4:                automaton[ind].p_mainFSM.bIfLoop = True                automaton.pop(ind)                ind -=1            elif automaton[ind].bIfLoop:                automaton[ind].loop()            elif automaton[ind].nState == 33:                exit()            ind += 1        ind = 0


Рис. 2. Автоматная модель чаепития
image

Листинг 7. Читаем и пьем чай асинхронно на С++
#include "lfsaappl.h"class FRead :    public LFsaAppl{public:    LFsaAppl* Create(CVarFSA *pCVF) { Q_UNUSED(pCVF)return new FRead(nameFsa); }    FRead(string strNam);    virtual ~FRead(void);protected:    void y1(); void y2();  void y3();    LFsaAppl *pFRealTime{nullptr};};#include "stdafx.h"#include "FRead.h"#include "FTeaTime.h"#include <QDebug>static LArc TBL_Read[] = {    LArc("s1","s2","--","y1"),// Reading for 1 hour...    LArc("s2","s3","--","y2"),// Call(TeaTime)    LArc("s3","s4","--","y1"),// Reading for 1 hour...    LArc("s4","s5","--","y3"),// sleep(5)    LArc("s5","s1","--","--"),//    LArc()};FRead::FRead(string strNam):    LFsaAppl(TBL_Read, strNam){ }FRead::~FRead(void) { if (pFRealTime) delete pFRealTime; }void FRead::y1() {#ifdef QT_DEBUG    qDebug()<<"Reading for 1 hour...";#endif}void FRead::y2() {    if (pFRealTime) delete pFRealTime;    pFRealTime = new FTeaTime("TeaTime");    pFRealTime->FCall(this);}void FRead::y3() { FCreateDelay(5000); }#include "lfsaappl.h"class FTeaTime :    public LFsaAppl{public:    FTeaTime(string strNam);protected:    void y1(); void y2();};#include "stdafx.h"#include "FTeaTime.h"#include <QDebug>#include "./LSYSLIB/FDelay.h"static LArc TBL_TeaTime[] = {    LArc("s1","s2","--","y1"),// sleep(1)    LArc("s2","s3","--","y2"),// take a cap of tea    LArc("s3","s4","--","y1"),// sleep(1)    LArc("s4","00","--","--"),//    LArc()};FTeaTime::FTeaTime(string strNam):    LFsaAppl(TBL_TeaTime, strNam){ }void FTeaTime::y1() { FCreateDelay(2000); }void FTeaTime::y2() {#ifdef QT_DEBUG    qDebug()<<"take a cap of tea";#endif}



PS
Уже после написания статьи после чтения перевода статьи Йерайна Диаза[4], познакомился с еще одним достаточно интересным и довольно восхищенным взглядом на корутины вообще и asyncio в частности. Несмотря на этот факт и ему подобные, мы все же пойдем иным путем :) Согласен только в одном с Робом Пайком (Rob Pike), что Concurrency Is Not Parallelesm. Конкурентность, можно сказать даже жестче, не имеет вообще ни чего общего с параллелизмом. И примечательно, что Google-переводчик переводит эту фразу как Параллелизм это не параллелизм. Мужчина по имени Google, конечно, не прав. Но кто-то же его в этом убедил? :)

Литература


1. Shultais Education. 1. Введение в асинхронное программирование. [Электронный ресурс], Режим доступа: www.youtube.com/watch?v=BmOjeVM0w1U&list=PLJcqk6mrJtxCo_KqHV2rM2_a3Z8qoE5Gk, свободный. Яз. рус. (дата обращения 01.08.2020).
2. Computer Science Center. Лекция 9. async / await (Программирование на Python). [Электронный ресурс], Режим доступа: www.youtube.com/watch?v=x6JZmBK2I8Y, свободный. Яз. рус. (дата обращения 13.07.2020).
3. Easy Python. Asyncio в python 3.7. [Электронный ресурс], Режим доступа: www.youtube.com/watch?v=PaY-hiuE5iE, свободный. Яз. рус. (дата обращения 01.08.2020).
4. Йерай Диаз (Yeray Diaz). Asyncio для практикующего python-разработчика. [Электронный ресурс], Режим доступа: www.youtube.com/watch?v=PaY-hiuE5iE, свободный. Яз. рус. (дата обращения 01.08.2020).
Подробнее..

Из песочницы Реализация offline режима для Yandex.Music

08.08.2020 16:12:48 | Автор: admin

Введение


Сегодня мы будем рассматривать такой достаточно известный музыкальный сервис, как Yandex.Music. Хороший в целом сервис, но с существенным недостатком невозможностью работы оффлайн. Мы попробуем исправить это досадное недоразумение, используя подручные инструменты.


Инструментарий


Итак, нам понадобится:


  • Относительно свежий python: 3.7 и выше
  • Всякая асинхронщина: aiohttp и aiofile
  • Классический инструмент для работы с html-API: BeautifulSoup
  • Для развлечения пользователя во время процесса: tqdm
  • Для заполнения тэгов: mutagen

Авторизация


Неавторизованным пользователям сервиса доступны только отрезки песен длинной до 30 секунд. Этого явно недостаточно для качественного прослушивания. Авторизоваться мы будем самым что ни на есть естественным способом, через веб-форму с получением печенюшек. В этом нам поможет opener для выполнения запросов и HTMLParser для парсинга форм.


def resolve_cookie(login: str, password: str) -> str:    cookies = CookieJar()    opener = urllib.request.build_opener(        urllib.request.HTTPCookieProcessor(cookies),        urllib.request.HTTPRedirectHandler())    response = opener.open("https://passport.yandex.ru")    doc = response.read()    parser = FormParser()    parser.feed(doc.decode("utf-8"))    parser.close()    parser.params["login"] = login    response = opener.open(parser.url or response.url, urllib.parse.urlencode(parser.params).encode("utf-8"))    doc = response.read()    parser = FormParser()    parser.feed(doc.decode("utf-8"))    parser.close()    parser.params["login"] = login    parser.params["passwd"] = password    response = opener.open(parser.url or response.url, urllib.parse.urlencode(parser.params).encode("utf-8"))    cookie_data = {}    for item in cookies:        if item.domain == ".yandex.ru":            cookie_data[item.name] = item.value    if "yandex_login" not in cookie_data:        keys = ", ".join(cookie_data.keys())        raise Exception(f"Invalid cookie_data {keys}")    return "; ".join(map(lambda v: f"{v[0]}={v[1]}", cookie_data.items()))

По адресу https://passport.yandex.ru нас ожидает первая форма с полем login. Если будет указан актуальный логин, то следующая форма предложит ввести пароль, что мы незамедлительно сделаем. Результат отправки формы нас особо не интересует. Наша цель кукисы. Если среди полученых кук окажется yandex_login, то авторизация прошла успешно. Вариант с двухфакторной аутентификацией мы пока не рассматриваем.


Yandex Music (HTML) API


Следующим нашим шагом будет получение списка избранных пользователем исполнителей. Чтобы показать наш стиль и молодёжность, для запросов применим aiohttp. Полученный html распарсим с помощью BeautifulSoup. Это классический инструмент, который используют ребята, заинтересованные в получении контента с чужих веб-страниц.


class YandexMusicApi:    host = "music.yandex.ru"    base_url = f"https://{host}"    def __init__(self, cookie: str):        self.headers = Headers(self.host, cookie)    async def _request(self, end_point: str):        async with aiohttp.ClientSession() as session:            url = f"{self.base_url}/{end_point}"            async with session.request(method="GET", url=url) as response:                return await response.read()    async def get_favorite_artists(self, login: str) -> List[Artist]:        body = await self._request(f"users/{login}/artists")        soup = BeautifulSoup(body, "lxml")        artists_soup = soup.find("div", class_="page-users__artists")        if artists_soup is None:            caption = soup.find("div", class_="page-users__caption")            if caption:                raise Exception(caption.contents[0])        result = []        for artist_soup in artists_soup.find_all("div", class_="artist"):            title_soup = artist_soup.find("div", class_="artist__name")            title = title_soup.attrs["title"]            title_href_soup = title_soup.find("a")            id_ = int(title_href_soup.attrs["href"].split("/")[-1])            result.append(Artist(id_, title))        return result

Здесь всё довольно просто, на странице https://music.yandex.ru/users/<login>/artists в блоке page-users__artists отображается требуемый нам список. Имя исполнителя можно обнаружить в аттрибуте title блока artist__name. Id исполнителя лёгким движением split извлекается из адреса ссылки.
Аналогичные действия позволят нам получить список альбомов исполнителя, а также список треков в альбоме.


Проникновение в хранилище


В этом месте мы уже потираем наши шаловливые ручки в ожидании прослушивания своей музыкальной коллекции. Но если бы всё было так просто, то я бы не писал эту статью. Нас ожидает новый противник yandex-хранилище. Для проникновения в хранилище потребуется воспроизвести жонглирование ссылками, которое происходит на страницах музыкального ресурса. При внимательном изучении вкладки Network браузерного инструментария выяснилось, что все элементы запроса легко выводятся за исключением одного участка в адресе https://{host}/get-mp3/{sign}/{ts}/{path}, который после непродолжительного гугления получил имя sign. Соль для сигнатуры (XGRlBW9FXlekgbPrRHuSiA) тоже была обнаружена на просторах интернета. Скорее всего не составит большого труда извлечь это значение из исходников страницы, но это уже не требуется.


    async def get_track_url(self, album_id: int, track_id: int) -> str:        async with aiohttp.ClientSession() as session:            url = f"{self.base_url}/api/v2.1/handlers/track/{track_id}:{album_id}/" \                  f"web-album-track-track-main/download/m?hq=0&external-domain={self.host}&overembed=no&__t={timestamp()}"            page = f"album/{album_id}"            headers = self.headers.build(page)            async with session.request(method="GET", url=url, headers=headers) as response:                body = await response.json()                src = body["src"]                src += f"&format=json&external-domain={self.host}&overembed=no&__t={timestamp()}"                result = parse.urlparse(src)                headers = self.headers.build(page, {                    ":authority": "storage.mds.yandex.net",                    ":method": "GET",                    ":path": f"{result.path}/{result.query}",                    ":scheme": "https",                }, True)                async with session.request(method="GET", url=src, headers=headers) as response:                    body = await response.json()                    host = body["host"]                    path = body["path"]                    s = body["s"]                    ts = body["ts"]                    sign = md5(f"XGRlBW9FXlekgbPrRHuSiA{path[1::]}{s}".encode("utf-8")).hexdigest()                    url = f"https://{host}/get-mp3/{sign}/{ts}/{path}"                    return url

Время жизни ссылки ограничено, поэтому нужно незамедлительно перейти к скачиванию.


Скачивание


На этом этапе уже немного надоедает всё это дело, поэтому берём в одну руку ранее используемое нами aiohttp, в другую aiofile и реализуем скачивание в лоб.


    async def download_file(cls, url: str, filename: str):        async with aiohttp.ClientSession() as session:            async with session.request(method="GET", url=url) as response:                data = await response.read()                async with AIOFile(filename, "wb") as afp:                    await afp.write(data)                    await afp.fsync()

Опытные программисты могут предложить реализовать скачивание с использованием буфера и записью во временный файл. Но в данном случае, учитывая, что максимальный размер mp3, как правило, укладывается в оперативную память, то при достаточно быстром интернете можно этим не заморачиваться. Ладно, не надо кидать в меня подручными материалами, я всё понял. На самом деле это одно из ключевых мест нашего проекта, которое в следующей итерации требуется привести к адекватному виду. В идеале ещё стоит добавить повторные запросы при возникновении сетевых ошибок.


Раскручиваем шарманку


Далее, нам нужно поставить закачку на поток. Методично перебираем исполнителей, альбомы, треки. Создаём директории, скачиваем альбомный арт, скачиваем сами треки. Прописываем тэги (почему-то у полученных mp3 они отсутствовали).


    async def download_artist(self, artist: Artist, depth: Depth = Depth.NORMAL):        artist_progress = tqdm(total=0, desc=artist.title, position=1, ascii=True)        albums = await self.api.get_artist_albums(artist.id)        artist_progress.total = len(albums)        artist_progress.refresh()        for album in albums:            album_dir = os.path.join(self.target_dir, normalize(artist.title), f"{album.year} - {normalize(album.title)}")            if depth < Depth.ALBUMS and os.path.exists(album_dir):                artist_progress.update()                continue            album_progress = tqdm(total=0, desc=f"> {album.title}", position=0, ascii=True)            tracks = await self.api.get_album_tracks(album.id)            album_progress.total = len(tracks)            album_progress.refresh()            os.makedirs(album_dir, exist_ok=True)            if album.cover:                album_progress.total += 1                cover_filename = os.path.join(album_dir, "cover.jpg")                if not os.path.exists(cover_filename):                    await self.download_file(album.cover, cover_filename)                album_progress.update()            for track in tracks:                target_filename = os.path.join(album_dir, f"{track.num:02d}. {normalize(track.title)}.mp3")                if depth >= Depth.TRACKS or not os.path.exists(target_filename):                    url = await self.api.get_track_url(track.album_id, track.id)                    await self.download_file(url, target_filename)                    self.write_tags(target_filename, {                        "title": track.title,                        "tracknumber": str(track.num),                        "artist": artist.title,                        "album": album.title,                        "date": str(album.year),                    })                album_progress.update()            album_progress.close()            artist_progress.update()        artist_progress.close()

После первой загрузки, я с удивлением обнаружил, что любимая мною группа AC/DC оказалась разделена на две директории. Это послужило поводом к реализации метода normalize:


def normalize(name: str) -> str:    return name.replace("/", "-")

Как несложно заметить, процесс скачивания можно (точнее нужно) распараллелить. Почва для этого подготовлена благодаря использованию асинхронных библиотек для сетевых запросов и записи в файлы. Для этого можно использовать asyncio.Semaphore в комбинации с asyncio.gather.
Но реализация асинхронной очереди не является предметом данной статьи, поэтому пока и так сойдёт.


Запуск


Теперь, когда у нас всё уже практически готово, нужно подумать о том, как это запускать. Явки и пароли мы будем вычитывать из файла .credentials, ненавязчиво бросая эксепшен в случае его отсутствия. Полученную куку бережно сохраним в файл .cookie для дальнейшего использования.


def resolve_cookie() -> str:    base_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), os.path.pardir))    cookie_file = os.path.join(base_dir, ".cookie")    if os.path.exists(cookie_file):        with open(cookie_file, "rt") as file:            return file.read()    credentials_file = os.path.join(base_dir, ".credentials")    if os.path.exists(credentials_file):        config = configparser.ConfigParser()        config.read(credentials_file)        login = config["yandex"]["login"]        password = config["yandex"]["password"]    else:        raise Exception(f"""Create \"{credentials_file}\" with content[yandex]login=<user_login>password=<user_password>""")    cookie = auth.resolve_cookie(login, password)    with open(cookie_file, "wt") as file:        file.write(cookie)    return cookie

В этом месте стоит немного украсить жизнь пользователя, позволив ему выбрать директорию для хранения музыкальной коллекции, а так же возможность скачивания треков отдельного исполнителя. Для разбора аргументов командной строки используем стандартный инструмент argparse, который отличается простотой конфигурации.
Опишем немного опциональных параметров:


  • -a (--artist), Id исполнителя, если указан, то скачиваются только треки этого исполнителя
  • -o (--output), директория для хранения музыкальной коллекции, по умолчанию Music в домашней директории пользователя.
  • -d (--depth), параметр родился как костыль, вызванный возможным прерыванием процесса
    • При значении по-умолчанию 0 (NORMAL) проверяется наличие директории с альбомом, и, если она существует, то загрузка альбома пропускается
    • Значение 1 (ALBUMS) перебирает все треки в альбоме и скачивает недостающие
    • Значение 2 (TRACKS) скачивает и перезаписывает треки, даже если они уже присутствуют в файловой системе

async def main():    parser = argparse.ArgumentParser()    parser.add_argument("-a", "--artist", help="Artist ID")    parser.add_argument("-o", "--output", default=f"{Path.home()}/Music",                        help=f"Output directory, default {Path.home()}/Music")    parser.add_argument("-d", "--depth", default=0, type=int,                        help=f"Exists files check depth, {enum_print(Depth)}")    args = parser.parse_args()    cookie = resolve_cookie()    api = YandexMusicApi(cookie)    agent = YandexMusicAgent(api, args.output)    if args.artist:        artist = await api.get_artist(args.artist)        await agent.download_artist(artist, args.depth)    else:        email = re.compile(".*?yandex_login=(.*?);.*?", re.M).match(cookie).group(1)        await agent.download_favorites(email, args.depth)

И вот, наконец-то, мы можем всё это запустить:


if __name__ == "__main__":    asyncio.run(main())

Спасибо за внимание. Теперь вы знаете, как реализовать API которого нет, скачать нескачиваемое и стать счастливым обладателем собственной музыкальной коллекции.


Результат можно увидеть в репозитории yandex.music.agent

Подробнее..
Категории: Python , Api , Asyncio , Yandex music

Мониторинг демон на Asyncio Dependency Injector руководство по применению dependency injection

09.08.2020 08:06:15 | Автор: admin
Привет,

Я создатель Dependency Injector. Это dependency injection фреймворк для Python.

Это еще одно руководство по построению приложений с помощью Dependency Injector.

Сегодня хочу показать как можно построить асинхронный демон на базе модуля asyncio.

Руководство состоит из таких частей:

  1. Что мы будем строить?
  2. Проверка инструментов
  3. Структура проекта
  4. Подготовка окружения
  5. Логирование и конфигурация
  6. Диспетчер
  7. Мониторинг example.com
  8. Мониторинг httpbin.org
  9. Тесты
  10. Заключение

Завершенный проект можно найти на Github.

Для старта желательно иметь:

  • Начальные знания по asyncio
  • Общее представление о принципе dependency injection

Что мы будем строить?


Мы будем строить мониторинг демон, который будет следить за доступом к веб-сервисам.

Демон будет посылать запросы к example.com и httpbin.org каждые несколько секунд. При получении ответа он будет записывать в лог такие данные:

  • Код ответа
  • Количество байт в ответе
  • Время, затраченное на выполнение запроса



Проверка инструментов


Мы будем использовать Docker и docker-compose. Давайте проверим, что они установлены:

docker --versiondocker-compose --version

Вывод должен выглядеть приблизительно так:

Docker version 19.03.12, build 48a66213fedocker-compose version 1.26.2, build eefe0d31

Если Docker или docker-compose не установлены, их нужно установить перед тем как продолжить. Следуйте этим руководствам:


Инструменты готовы. Переходим к структуре проекта.

Структура проекта


Создаем папку проекта и переходим в нее:

mkdir monitoring-daemon-tutorialcd monitoring-daemon-tutorial

Теперь нам нужно создать начальную структуру проекта. Создаем файлы и папки следуя структуре ниже. Все файлы пока будут пустыми. Мы наполним их позже.

Начальная структура проекта:

./ monitoringdaemon/    __init__.py    __main__.py    containers.py config.yml docker-compose.yml Dockerfile requirements.txt

Начальная структура проекта готова. Мы расширим ее с следующих секциях.

Дальше нас ждет подготовка окружения.

Подготовка окружения


В этом разделе мы подготовим окружение для запуска нашего демона.

Для начала нужно определить зависимости. Мы будем использовать такие пакеты:

  • dependency-injector dependency injection фреймворк
  • aiohttp веб фреймворк (нам нужен только http клиент)
  • pyyaml библиотека для парсинга YAML файлов, используется для чтения конфига
  • pytest фреймворк для тестирования
  • pytest-asyncio библиотека-помогатор для тестирования asyncio приложений
  • pytest-cov библиотека-помогатор для измерения покрытия кода тестами

Добавим следующие строки в файл requirements.txt:

dependency-injectoraiohttppyyamlpytestpytest-asynciopytest-cov

И выполним в терминале:

pip install -r requirements.txt

Далее создаем Dockerfile. Он будет описывать процесс сборки и запуска нашего демона. Мы будем использовать python:3.8-buster в качестве базового образа.

Добавим следующие строки в файл Dockerfile:

FROM python:3.8-busterENV PYTHONUNBUFFERED=1WORKDIR /codeCOPY . /code/RUN apt-get install openssl \ && pip install --upgrade pip \ && pip install -r requirements.txt \ && rm -rf ~/.cacheCMD ["python", "-m", "monitoringdaemon"]

Последним шагом определим настройки docker-compose.

Добавим следующие строки в файл docker-compose.yml:

version: "3.7"services:  monitor:    build: ./    image: monitoring-daemon    volumes:      - "./:/code"

Все готово. Давайте запустим сборку образа и проверим что окружение настроено верно.

Выполним в терминале:

docker-compose build

Процесс сборки может занять несколько минут. В конце вы должны увидеть:

Successfully built 5b4ee5e76e35Successfully tagged monitoring-daemon:latest

После того как процесс сборки завершен запустим контейнер:

docker-compose up

Вы увидите:

Creating network "monitoring-daemon-tutorial_default" with the default driverCreating monitoring-daemon-tutorial_monitor_1 ... doneAttaching to monitoring-daemon-tutorial_monitor_1monitoring-daemon-tutorial_monitor_1 exited with code 0

Окружение готово. Контейнер запускается и завершает работу с кодом 0.

Следующим шагом мы настроим логирование и чтение файла конфигурации.

Логирование и конфигурация


В этом разделе мы настроим логирование и чтение файла конфигурации.

Начнем с добавления основной части нашего приложения контейнера зависимостей (дальше просто контейнера). Контейнер будет содержать все компоненты приложения.

Добавим первые два компонента. Это объект конфигурации и функция настройки логирования.

Отредактируем containers.py:

"""Application containers module."""import loggingimport sysfrom dependency_injector import containers, providersclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    config = providers.Configuration()    configure_logging = providers.Callable(        logging.basicConfig,        stream=sys.stdout,        level=config.log.level,        format=config.log.format,    )

Мы использовали параметры конфигурации перед тем как задали их значения. Это принцип, по которому работает провайдер Configuration.

Сначала используем, потом задаем значения.

Настройки логирования будут содержаться в конфигурационном файле.

Отредактируем config.yml:

log:  level: "INFO"  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

Теперь определим функцию, которая будет запускать наш демон. Её обычно называют main(). Она будет создавать контейнер. Контейнер будет использован для чтения конфигурационного файла и вызова функции настройки логирования.

Отредактируем __main__.py:

"""Main module."""from .containers import ApplicationContainerdef main() -> None:    """Run the application."""    container = ApplicationContainer()    container.config.from_yaml('config.yml')    container.configure_logging()if __name__ == '__main__':    main()

Контейнер первый объект в приложении. Он используется для получения всех остальных объектов.

Логирование и чтение конфигурации настроено. В следующем разделе мы создадим диспетчер мониторинговых задач.

Диспетчер


Пришло время добавить диспетчер мониторинговых задач.

Диспетчер будет содержать список мониторинговых задач и контролировать их выполнение. Он будет выполнять каждую задачу в соответствии с расписанием. Класс Monitor базовый класс для мониторинговых задач. Для создания конкретных задач нужно добавлять дочерние классы и реализовывать метод check().


Добавим диспетчер и базовый класс мониторинговой задачи.

Создадим dispatcher.py и monitors.py в пакете monitoringdaemon:

./ monitoringdaemon/    __init__.py    __main__.py    containers.py    dispatcher.py    monitors.py config.yml docker-compose.yml Dockerfile requirements.txt

Добавим следующие строки в файл monitors.py:

"""Monitors module."""import loggingclass Monitor:    def __init__(self, check_every: int) -> None:        self.check_every = check_every        self.logger = logging.getLogger(self.__class__.__name__)    async def check(self) -> None:        raise NotImplementedError()

и в файл dispatcher.py:

""""Dispatcher module."""import asyncioimport loggingimport signalimport timefrom typing import Listfrom .monitors import Monitorclass Dispatcher:    def __init__(self, monitors: List[Monitor]) -> None:        self._monitors = monitors        self._monitor_tasks: List[asyncio.Task] = []        self._logger = logging.getLogger(self.__class__.__name__)        self._stopping = False    def run(self) -> None:        asyncio.run(self.start())    async def start(self) -> None:        self._logger.info('Starting up')        for monitor in self._monitors:            self._monitor_tasks.append(                asyncio.create_task(self._run_monitor(monitor)),            )        asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)        asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)        await asyncio.gather(*self._monitor_tasks, return_exceptions=True)        self.stop()    def stop(self) -> None:        if self._stopping:            return        self._stopping = True        self._logger.info('Shutting down')        for task, monitor in zip(self._monitor_tasks, self._monitors):            task.cancel()        self._logger.info('Shutdown finished successfully')    @staticmethod    async def _run_monitor(monitor: Monitor) -> None:        def _until_next(last: float) -> float:            time_took = time.time() - last            return monitor.check_every - time_took        while True:            time_start = time.time()            try:                await monitor.check()            except asyncio.CancelledError:                break            except Exception:                monitor.logger.exception('Error executing monitor check')            await asyncio.sleep(_until_next(last=time_start))

Диспетчер нужно добавить в контейнер.

Отредактируем containers.py:

"""Application containers module."""import loggingimport sysfrom dependency_injector import containers, providersfrom . import dispatcherclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    config = providers.Configuration()    configure_logging = providers.Callable(        logging.basicConfig,        stream=sys.stdout,        level=config.log.level,        format=config.log.format,    )    dispatcher = providers.Factory(        dispatcher.Dispatcher,        monitors=providers.List(            # TODO: add monitors        ),    )

Каждый компонент добавляется в контейнер.

В завершении нам нужно обновить функцию main(). Мы получим диспетчер из контейнера и вызовем его метод run().

Отредактируем __main__.py:

"""Main module."""from .containers import ApplicationContainerdef main() -> None:    """Run the application."""    container = ApplicationContainer()    container.config.from_yaml('config.yml')    container.configure_logging()    dispatcher = container.dispatcher()    dispatcher.run()if __name__ == '__main__':    main()

Теперь запустим демон и проверим его работу.

Выполним в терминале:

docker-compose up

Вывод должен выглядеть так:

Starting monitoring-daemon-tutorial_monitor_1 ... doneAttaching to monitoring-daemon-tutorial_monitor_1monitor_1  | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting upmonitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting downmonitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfullymonitoring-daemon-tutorial_monitor_1 exited with code 0

Все работает верно. Диспетчер запускается и выключается так как мониторинговых задач нет.

К концу этого раздела каркас нашего демона готов. В следующем разделе мы добавим первую мониторинговую задачу.

Мониторинг example.com


В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com.

Мы начнем с расширения нашей модели классов новым типом мониторинговой задачи HttpMonitor.

HttpMonitor это дочерний класс Monitor. Мы реализуем метод check(). Он будет отправлять HTTP запрос и логировать полученный ответ. Детали выполнения HTTP запроса будут делегированы классу HttpClient.


Сперва добавим HttpClient.

Создадим файл http.py в пакете monitoringdaemon:

./ monitoringdaemon/    __init__.py    __main__.py    containers.py    dispatcher.py    http.py    monitors.py config.yml docker-compose.yml Dockerfile requirements.txt

И добавим в него следующие строки:

"""Http client module."""from aiohttp import ClientSession, ClientTimeout, ClientResponseclass HttpClient:    async def request(self, method: str, url: str, timeout: int) -> ClientResponse:        async with ClientSession(timeout=ClientTimeout(timeout)) as session:            async with session.request(method, url) as response:                return response

Далее нужно добавить HttpClient в контейнер.

Отредактируем containers.py:

"""Application containers module."""import loggingimport sysfrom dependency_injector import containers, providersfrom . import http, dispatcherclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    config = providers.Configuration()    configure_logging = providers.Callable(        logging.basicConfig,        stream=sys.stdout,        level=config.log.level,        format=config.log.format,    )    http_client = providers.Factory(http.HttpClient)    dispatcher = providers.Factory(        dispatcher.Dispatcher,        monitors=providers.List(            # TODO: add monitors        ),    )

Теперь мы готовы добавить HttpMonitor. Добавим его в модуль monitors.

Отредактируем monitors.py:

"""Monitors module."""import loggingimport timefrom typing import Dict, Anyfrom .http import HttpClientclass Monitor:    def __init__(self, check_every: int) -> None:        self.check_every = check_every        self.logger = logging.getLogger(self.__class__.__name__)    async def check(self) -> None:        raise NotImplementedError()class HttpMonitor(Monitor):    def __init__(            self,            http_client: HttpClient,            options: Dict[str, Any],    ) -> None:        self._client = http_client        self._method = options.pop('method')        self._url = options.pop('url')        self._timeout = options.pop('timeout')        super().__init__(check_every=options.pop('check_every'))    @property    def full_name(self) -> str:        return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)    async def check(self) -> None:        time_start = time.time()        response = await self._client.request(            method=self._method,            url=self._url,            timeout=self._timeout,        )        time_end = time.time()        time_took = time_end - time_start        self.logger.info(            'Response code: %s, content length: %s, request took: %s seconds',            response.status,            response.content_length,            round(time_took, 3)        )

У нас все готово для добавления проверки http://example.com. Нам нужно сделать два изменения в контейнере:

  • Добавить фабрику example_monitor.
  • Передать example_monitor в диспетчер.

Отредактируем containers.py:

"""Application containers module."""import loggingimport sysfrom dependency_injector import containers, providersfrom . import http, monitors, dispatcherclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    config = providers.Configuration()    configure_logging = providers.Callable(        logging.basicConfig,        stream=sys.stdout,        level=config.log.level,        format=config.log.format,    )    http_client = providers.Factory(http.HttpClient)    example_monitor = providers.Factory(        monitors.HttpMonitor,        http_client=http_client,        options=config.monitors.example,    )    dispatcher = providers.Factory(        dispatcher.Dispatcher,        monitors=providers.List(            example_monitor,        ),    )

Провайдер example_monitor имеет зависимость от значений конфигурации. Давайте добавим эти значения:

Отредактируем config.yml:

log:  level: "INFO"  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"monitors:  example:    method: "GET"    url: "http://example.com"    timeout: 5    check_every: 5

Все готово. Запускаем демон и проверяем работу.

Выполняем в терминале:

docker-compose up

И видим подобный вывод:

Starting monitoring-daemon-tutorial_monitor_1 ... doneAttaching to monitoring-daemon-tutorial_monitor_1monitor_1  | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting upmonitor_1  | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Checkmonitor_1  |     GET http://example.commonitor_1  |     response code: 200monitor_1  |     content length: 648monitor_1  |     request took: 0.067 secondsmonitor_1  |monitor_1  | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Checkmonitor_1  |     GET http://example.commonitor_1  |     response code: 200monitor_1  |     content length: 648monitor_1  |     request took: 0.073 seconds

Наш демон может следить за наличием доступа к http://example.com.

Давайте добавим мониторинг https://httpbin.org.

Мониторинг httpbin.org


В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com.

Добавление мониторинговой задачи для https://httpbin.org будет сделать легче, так как все компоненты уже готовы. Нам просто нужно добавить новый провайдер в контейнер и обновить конфигурацию.

Отредактируем containers.py:

"""Application containers module."""import loggingimport sysfrom dependency_injector import containers, providersfrom . import http, monitors, dispatcherclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    config = providers.Configuration()    configure_logging = providers.Callable(        logging.basicConfig,        stream=sys.stdout,        level=config.log.level,        format=config.log.format,    )    http_client = providers.Factory(http.HttpClient)    example_monitor = providers.Factory(        monitors.HttpMonitor,        http_client=http_client,        options=config.monitors.example,    )    httpbin_monitor = providers.Factory(        monitors.HttpMonitor,        http_client=http_client,        options=config.monitors.httpbin,    )    dispatcher = providers.Factory(        dispatcher.Dispatcher,        monitors=providers.List(            example_monitor,            httpbin_monitor,        ),    )

Отредактируем config.yml:

log:  level: "INFO"  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"monitors:  example:    method: "GET"    url: "http://example.com"    timeout: 5    check_every: 5  httpbin:    method: "GET"    url: "https://httpbin.org/get"    timeout: 5    check_every: 5

Запустим демон и проверим логи.

Выполним в терминале:

docker-compose up

И видим подобный вывод:

Starting monitoring-daemon-tutorial_monitor_1 ... doneAttaching to monitoring-daemon-tutorial_monitor_1monitor_1  | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting upmonitor_1  | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Checkmonitor_1  |     GET http://example.commonitor_1  |     response code: 200monitor_1  |     content length: 648monitor_1  |     request took: 0.077 secondsmonitor_1  |monitor_1  | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Checkmonitor_1  |     GET https://httpbin.org/getmonitor_1  |     response code: 200monitor_1  |     content length: 310monitor_1  |     request took: 0.18 secondsmonitor_1  |monitor_1  | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Checkmonitor_1  |     GET http://example.commonitor_1  |     response code: 200monitor_1  |     content length: 648monitor_1  |     request took: 0.066 secondsmonitor_1  |monitor_1  | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Checkmonitor_1  |     GET https://httpbin.org/getmonitor_1  |     response code: 200monitor_1  |     content length: 310monitor_1  |     request took: 0.126 seconds

Функциональная часть завершена. Демон следит за наличием доступа к http://example.com и https://httpbin.org.

В следующем разделе мы добавим несколько тестов.

Тесты


Было бы неплохо добавить несколько тестов. Давайте сделаем это.

Создаем файл tests.py в пакете monitoringdaemon:

./ monitoringdaemon/    __init__.py    __main__.py    containers.py    dispatcher.py    http.py    monitors.py    tests.py config.yml docker-compose.yml Dockerfile requirements.txt

и добавляем в него следующие строки:

"""Tests module."""import asyncioimport dataclassesfrom unittest import mockimport pytestfrom .containers import ApplicationContainer@dataclasses.dataclassclass RequestStub:    status: int    content_length: int@pytest.fixturedef container():    container = ApplicationContainer()    container.config.from_dict({        'log': {            'level': 'INFO',            'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',        },        'monitors': {            'example': {                'method': 'GET',                'url': 'http://fake-example.com',                'timeout': 1,                'check_every': 1,            },            'httpbin': {                'method': 'GET',                'url': 'https://fake-httpbin.org/get',                'timeout': 1,                'check_every': 1,            },        },    })    return container@pytest.mark.asyncioasync def test_example_monitor(container, caplog):    caplog.set_level('INFO')    http_client_mock = mock.AsyncMock()    http_client_mock.request.return_value = RequestStub(        status=200,        content_length=635,    )    with container.http_client.override(http_client_mock):        example_monitor = container.example_monitor()        await example_monitor.check()    assert 'http://fake-example.com' in caplog.text    assert 'response code: 200' in caplog.text    assert 'content length: 635' in caplog.text@pytest.mark.asyncioasync def test_dispatcher(container, caplog, event_loop):    caplog.set_level('INFO')    example_monitor_mock = mock.AsyncMock()    httpbin_monitor_mock = mock.AsyncMock()    with container.example_monitor.override(example_monitor_mock), \            container.httpbin_monitor.override(httpbin_monitor_mock):        dispatcher = container.dispatcher()        event_loop.create_task(dispatcher.start())        await asyncio.sleep(0.1)        dispatcher.stop()    assert example_monitor_mock.check.called    assert httpbin_monitor_mock.check.called

Для запуска тестов выполним в терминале:

docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon

Должен получиться подобный результат:

platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1rootdir: /codeplugins: asyncio-0.14.0, cov-2.10.0collected 2 itemsmonitoringdaemon/tests.py ..                                    [100%]----------- coverage: platform linux, python 3.8.3-final-0 -----------Name                             Stmts   Miss  Cover----------------------------------------------------monitoringdaemon/__init__.py         0      0   100%monitoringdaemon/__main__.py         9      9     0%monitoringdaemon/containers.py      11      0   100%monitoringdaemon/dispatcher.py      43      5    88%monitoringdaemon/http.py             6      3    50%monitoringdaemon/monitors.py        23      1    96%monitoringdaemon/tests.py           37      0   100%----------------------------------------------------TOTAL                              129     18    86%

Обратите внимание как в тесте test_example_monitor мы подменяем HttpClient моком с помощью метода .override(). Таким образом можно переопределить возвращаемое значения любого провайдера.

Такие же действия выполняются в тесте test_dispatcher для подмены моками мониторинговых задач.


Заключение


Мы построили мониторинг демон на базе asyncio применяя принцип dependency injection. Мы использовали Dependency Injector в качестве dependency injection фреймворка.

Преимущество, которое вы получаете с Dependency Injector это контейнер.

Контейнер начинает окупаться, когда вам нужно понять или изменить структуру приложения. С контейнером это легко, потому что все компоненты приложения и их зависимости в одном месте:

"""Application containers module."""import loggingimport sysfrom dependency_injector import containers, providersfrom . import http, monitors, dispatcherclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    config = providers.Configuration()    configure_logging = providers.Callable(        logging.basicConfig,        stream=sys.stdout,        level=config.log.level,        format=config.log.format,    )    http_client = providers.Factory(http.HttpClient)    example_monitor = providers.Factory(        monitors.HttpMonitor,        http_client=http_client,        options=config.monitors.example,    )    httpbin_monitor = providers.Factory(        monitors.HttpMonitor,        http_client=http_client,        options=config.monitors.httpbin,    )    dispatcher = providers.Factory(        dispatcher.Dispatcher,        monitors=providers.List(            example_monitor,            httpbin_monitor,        ),    )


Контейнер как карта вашего приложения. Вы всегда знайте что от чего зависит.

Что дальше?


Подробнее..

Собираем данные AlphaVantage с Faust. Часть 1. Подготовка и введение

20.09.2020 14:22:27 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть I Введение

20.09.2020 16:05:00 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть II Агенты и Команды

23.09.2020 04:06:02 | Автор: admin

Оглавление

  1. Часть I: Введение

  2. Часть II: Агенты и Команды

Что мы тут делаем?

Итак-итак, вторая часть. Как и писалось ранее, в ней мы сделаем следующее:

  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.

  2. Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.

Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи - для ручного пуша сообщения в топик, за которым следит агент.

Подготовка

Клиент AlphaVantage

Для начала, напишем небольшой aiohttp клиентик для запросов на alphavantage.

alphavantage.py

Spoiler
import urllib.parse as urlparsefrom io import StringIOfrom typing import Any, Dict, List, Unionimport aiohttpimport pandas as pdimport stringcasefrom loguru import loggerfrom horton.config import API_ENDPOINTclass AlphaVantageClient:    def __init__(        self,        session: aiohttp.ClientSession,        api_key: str,        api_endpoint: str = API_ENDPOINT,    ):        self._query_params = {"datatype": "json", "apikey": api_key}        self._api_endpoint = api_endpoint        self._session = session    @logger.catch    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:        formatted_data = {}        for field, item in data.items():            formatted_data[stringcase.snakecase(field)] = item        return formatted_data    @logger.catch    async def _construct_query(        self, function: str, to_json: bool = True, **kwargs    ) -> Union[Dict[str, Any], str]:        path = "query/"        async with self._session.get(            urlparse.urljoin(self._api_endpoint, path),            params={"function": function, **kwargs, **self._query_params},        ) as response:            data = (await response.json()) if to_json else (await response.text())            if to_json:                data = self._format_fields(data)        return data    @logger.catch    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)        data = pd.read_csv(StringIO(data))        securities = data.to_dict("records")        for index, security in enumerate(securities):            security = self._format_fields(security)            security["_type"] = "physical"            securities[index] = security        return securities    @logger.catch    async def get_security_overview(self, symbol: str) -> Dict[str, str]:        return await self._construct_query("OVERVIEW", symbol=symbol)    @logger.catch    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query(            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"        )    @logger.catch    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)    @logger.catch    async def get_indicator_data(        self, symbol: str, indicator: str, **indicator_options    ) -> Dict[str, Any]:        return await self._construct_query(            indicator, symbol=symbol, **indicator_options        )

Собственно по нему всё ясно:

  1. API AlphaVantage достаточно просто и красиво спроектирована, поэтому все запросы я решил проводить через метод construct_query где в свою очередь идёт http вызов.

  2. Все поля я привожу к snake_case для удобства.

  3. Ну и декорация logger.catch для красивого и информативного вывода трейсбека.

P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен тут.

CRUD-класс

У нас будет коллекция securities для хранения мета информации о ценных бумагах.

database/security.py

Тут по-моему ничего пояснять не нужно, а базовый класс сам по себе достаточно прост.

get_app()

Добавим функцию создания объекта приложения в app.py

Spoiler
import faustfrom horton.config import KAFKA_BROKERSdef get_app():    return faust.App("horton", broker=KAFKA_BROKERS)

Пока у нас будет самое простое создание приложения, чуть позже мы его расширим, однако, чтобы не заставлять вас ждать, вот референсы на App-класс. На класс settings тоже советую взглянуть, так как именно он отвечает за большую часть настроек.

Основная часть

Агент сбора и сохранения списка ценных бумаг

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Так, сначала получаем объект faust-приложения - это достаточно просто. Далее, мы явно объявляем топик для нашего агента... Тут стоит упомянуть, что это такое, что за параметр internal и как это можно устроить по-другому.

  1. Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать офф. доку, либо можно прочитать конспект на хабре на русском, где так же всё достаточно точно отражено :)

  2. Параметр internal, достаточно хорошо описанный в доке faust, позволяет нам настраивать топик прямо в коде, естественно, имеются ввиду параметры, предусмотренные разработчиками faust, например: retention, retention policy (по-умолчанию delete, но можно установить и compact), кол-во партиций на топик (partitions, чтобы сделать, например, меньшее чем глобальное значение приложения faust).

  3. Вообще, агент может создавать сам управляемый топик с глобальными значениями, однако, я люблю объявлять всё явно. К тому же, некоторые параметры (например, кол-во партиций или retention policy) топика в объявлении агента настроить нельзя.

    Вот как это могло было выглядеть без ручного определения топика:

app = get_app()@app.agent()async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Ну а теперь, опишем, что будет делать наш агент :)

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for _ in stream:            logger.info("Start collect securities")            client = AlphaVantageClient(session, API_KEY)            securities = await client.get_securities()            for security in securities:                await SecurityCRUD.update_one(                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True                )            yield True

Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия - одна, на всё время работы воркера (или несколько, если изменить параметр concurrency у агента с дефолтной единички).

Далее, мы идём по стриму (сообщение мы помещаем в _, так как нам, в данном агенте, безразлично содержание) сообщений из нашего топика, если они есть при текущем сдвиге (offset), иначе, наш цикл будет ожидать их поступления. Ну а внутри нашего цикла, мы логируем поступление сообщения, получаем список активных (get_securities возвращает по-умолчания только active, см. код клиента) ценных бумаг и сохраняем его в базу, проверяя при этом, есть ли бумага с таким тикером и биржей в БД, если есть, то она (бумага) просто обновится.

Запустим наше творение!

> docker-compose up -d... Запуск контейнеров ...> faust -A horton.agents worker --without-web -l info

P.S. Возможности веб-компонента faust я рассматривать в статьях не буду, поэтому выставляем соответствующий флаг.

В нашей команде запуска мы указали faust'у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:

Spoiler
aS v1.10.4 id           horton                                             transport    [URL('kafka://localhost:9092')]                    store        memory:                                            log          -stderr- (info)                                    pid          1271262                                            hostname     host-name                                          platform     CPython 3.8.2 (Linux x86_64)                       drivers                                                           transport  aiokafka=1.1.6                                       web        aiohttp=3.6.2                                      datadir      /path/to/project/horton-data                       appdir       /path/to/project/horton-data/v1                   ... логи, логи, логи ...Topic Partition Set topic                       partitions  collect_securities          {0-7}       horton-__assignor-__leader  {0}         

Оно живое!!!

Посмотрим на partition set. Как мы видим, был создан топик с именем, которое мы обозначили в коде, кол-во партиций дефолтное (8, взятое из topic_partitions - параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.

Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:

> faust -A horton.agents send @collect_securities{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. с помощью @ мы показываем, что посылаем сообщение в топик с именем "collect_securities".

В данном случае, сообщение ушло в 6 партицию - это можно проверить, зайдя в kafdrop на localhost:9000

Перейдя в окно терминала с нашим воркером, мы увидим радостное сообщение, посланное с помощью loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:

Я не миллиардер, а потому, довольствуемся первым вариантом просмотра.

Счастье и радость - первый агент готов :)

Агент готов, да здравствует новый агент!

Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.

Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[?],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            ...

Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records - классы, декларирующие схему сообщения в топике агента.

В таком случае перейдём в records.py и опишем, как должно выглядеть сообщение у этого топика:

import faustclass CollectSecurityOverview(faust.Record):    symbol: str    exchange: str

Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой - 3.6.

Вернёмся к агенту, установим типы и допишем его:

collect_security_overview_topic = app.topic(    "collect_security_overview", internal=True, value_type=CollectSecurityOverview)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[CollectSecurityOverview],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            logger.info(                "Start collect security [{symbol}] overview", symbol=event.symbol            )            client = AlphaVantageClient(session, API_KEY)            security_overview = await client.get_security_overview(event.symbol)            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)            yield True

Как видите, мы передаём в метод инициализации топика новый параметр со схемой - value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё - смысла не вижу.

Ну что же, последний штрих - добавим в collect_securitites вызов агента сбора мета информации:

....for security in securities:    await SecurityCRUD.update_one({            "symbol": security["symbol"],            "exchange": security["exchange"]        },        security,        upsert = True,    )    await collect_security_overview.cast(        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])    )....

Используем ранее объявлению схему для сообщения. В данном случае, я использовал метод .cast, так как нам не нужно ожидать результат от агента, но стоит упомянуть, что способов послать сообщение в топик:

  1. cast - не блокирует, так как не ожидает результата. Нельзя послать результат в другой топик сообщением.

  2. send - не блокирует, так как не ожидает результата. Можно указать агента в топик которого уйдёт результат.

  3. ask - ожидает результата. Можно указать агента в топик которого уйдёт результат.

Итак, на этом с агентами на сегодня всё!

Команда мечты

Последнее, что я обещал написать в этой части - команды. Как уже говорилось ранее, команды в faust - это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A

После объявленных агентов в agents.py добавим функцию с декоратором app.command, вызывающую метод cast у collect_securitites:

@app.command()async def start_collect_securities():    """Collect securities and overview."""    await collect_securities.cast()

Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:

> faust -A horton.agents --help....Commands:  agents                    List agents.  clean-versions            Delete old version directories.  completion                Output shell completion to be evaluated by the...  livecheck                 Manage LiveCheck instances.  model                     Show model detail.  models                    List all available models as a tabulated list.  reset                     Delete local table state.  send                      Send message to agent/topic.  start-collect-securities  Collect securities and overview.  tables                    List available tables.  worker                    Start worker instance for given app.

Ею мы можем воспользоваться, как любой другой, поэтому перезапустим faust воркер и начнём полноценный сбор ценных бумаг:

> faust -A horton.agents start-collect-securities

Что будет дальше?

В следующей части мы, на примере оставшихся агентов, рассмотрим, механизм sink для поиска экстремум в ценах закрытия торгов за год и cron-запуск агентов.

На сегодня всё! Спасибо за прочтение :)

Код этой части

P.S. Под прошлой частью меня спросили про faust и confluent kafka (какие есть у confluent фичи). Кажется, что confluent во многом функциональнее, но дело в том, что faust не имеет полноценной поддержки клиента для confluent - это следует из описания ограничений клиентов в доке.

Подробнее..

Шаблонные функции в Python, которые могут выполнятся синхронно и асинхронно

02.11.2020 00:07:43 | Автор: admin
image

Сейчас практически каждый разработчик знаком с понятием асинхронность в программировании. В эру, когда информационные продукты настолько востребованы, что вынуждены обрабатывать одновременно огромное количество запросов и также параллельно взаимодействовать с большим набором других сервисов без асинхронного программирования никуда. Потребность оказалась такой большой, что был даже создан отдельный язык, главной фишкой которого (помимо минималистичности) является очень оптимизированная и удобная работа с параллельным/конкурентным кодом, а именно Golang. Несмотря на то, что статья совершенно не про него, я буду часто делать сравнения и ссылаться. Но вот в Python, про который и пойдёт речь в этой статье есть некоторые проблемы, которые я опишу и предложу решение одной из них. Если заинтересовала эта тема прошу под кат.



Так получилось, что мой любимый язык, с помощью которого я работаю, реализую пет-проекты и даже отдыхаю и расслабляюсь это Python. Я бесконечно покорён его красотой и простотой, его очевидностью, за которой с помощью различного рода синтаксического сахара скрываются огромные возможности для лаконичного описания практически любой логики, на которую способно человеческое воображение. Где-то даже читал, что Python называют сверхвысокоуровневым языком, так как с его помощью можно описывать такие абстракции, которые на других языках описать будет крайне проблематично.

Но есть один серьёзный нюанс Python очень тяжело вписывается в современные представления о языке с возможностью реализации параллельной/конкурентной логики. Язык, идея которого зародилась ещё в 80-ых годах и являющийся ровесником Java до определённого времени не предполагал выполнение какого-либо кода конкурентно. Если JavaScript изначально требовал конкурентности для неблокирующей работы в браузере, а Golang совсем свежий язык с реальным пониманием современных потребностей, то перед Python таких задач ранее не стояло.

Это, конечно же, моё личное мнение, но мне кажется, что Python очень сильно опоздал с реализацией асинхронности, так как появление встроенной библиотеки asyncio было, скорее, реакцией на появление других реализаций конкуретного выполнения кода для Python. По сути, asyncio создан для поддержки уже существующих реализаций и содержит не только собственную реализацию событийного цикла, но также и обёртку для других асинхронных библиотек, тем самым предлагая общий интерфейс для написания асинхронного кода. И Python, который изначально создавался как максимально лаконичный и читабельный язык из-за всех перечисленных выше факторов при написании асинхронного кода становится нагромождением декораторов, генераторов и функций. Ситуацию немного исправило добавление специальных директив async и await (как в JavaScript, что важно) (исправил, спасибо пользователю tmnhy), но общие проблемы остались.

Я не буду перечислять их все и остановлюсь на одной, которую я и попытался решить: это описание общей логики для асинхронного и синхронного выполнения. Например, если я захочу в Golang запустить функцию параллельно, то мне достаточно просто вызвать функцию с директивой go:

Параллельное выполнение function в Golang
package mainimport "fmt"func function(index int) {    fmt.Println("function", index)}func main() {    for i := 0; i < 10; i++ {         go function(i)    }    fmt.Println("end")}


При этом в Golang я могу запустить эту же функцию синхронно:

Последовательное выполнение function в Golang
package mainimport "fmt"func function(index int) {    fmt.Println("function", index)}func main() {    for i := 0; i < 10; i++ {         function(i)    }    fmt.Println("end")}


В Python все корутины (асинхронные функции) основаны на генераторах и переключение между ними происходит во время вызова блокирующих функций, возвращая управление событийному циклу с помощью директивы yield. Честно признаюсь, я не знаю, как работает параллельность/конкурентность в Golang, но не ошибусь, если скажу, что работает это совсем не так, как в Python. Несмотря даже на существующие различия во внутренностях реализации компилятора Golang и интерпретатора CPython и на недопустимость сравнения параллельности/конкурентности в них, всё же я это сделаю и обращу внимание не на само выполнение, а именно на синтаксис. В Python я не могу взять функцию и запустить её параллельно/конкурентно одним оператором. Чтобы моя функция смогла работать асинхронно, я должен явно прописать перед её объявлением async и после этого она уже не является просто функцией, она является уже корутиной. И я не могу без дополнительных действий смешивать их вызовы в одном коде, потому что функция и корутина в Python совсем разные вещи, несмотря на схожесть в объявлении.

def func1(a, b):    func2(a + b)    await func3(a - b)  # Ошибка, так как await может выполняться только в корутинах

Моей основной проблемой оказалась необходимость разрабатывать логику, которая может работать и синхронно, и асинхронно. Простым примером является моя библиотека по взаимодействию с Instagram, которую я давно забросил, но сейчас снова за неё взялся (что и сподвигло меня на поиск решения). Я хотел реализовать в ней возможность работать с API не только синхронно, но и асинхронно, и это было не просто желание при сборе данных в Интернет можно отправить большое количество запросов асинхронно и быстрее получить ответ на них всех, но при этом массивный сбор данных не всегда нужен. В данный момент в библиотеке реализовано следующее: для работы с Instagram есть 2 класса, один для синхронной работы, другой для асинхронной. В каждом классе одинаковый набор методов, только в первом методы синхронные, а во втором асинхронные. Каждый метод выполняет одно и то же за исключением того, как отправляются запросы в Интернет. И только из-за различий одного блокирующего действия мне пришлось практически полностью продублировать логику в каждом методе. Выглядит это примерно так:

class WebAgent:    def update(self, obj=None, settings=None):        ...        response = self.get_request(path=path, **settings)        ...class AsyncWebAgent:    async def update(self, obj=None, settings=None):        ...        response = await self.get_request(path=path, **settings)        ...

Всё остальное в методе update и в корутине update абсолютно идентичное. А как многие знают, дублирование кода добавляет очень много проблем, особенно остро это ощущается в исправлении багов и тестировании.

Для решения этой проблемы я написал собственную библиотеку pySyncAsync. Идея такова вместо обычных функций и корутин реализуется генератор, в дальнейшем я буду называть его шаблоном. Для того, чтобы выполнить шаблон его нужно сгенерировать как обычную функцию или как корутину. Шаблон при выполнении в тот момент, когда ему нужно выполнить внутри себя асинхронный или синхронный код возвращает с помощью yield специальный объект Call, который указывает, что именно вызвать и с какими аргументами. В зависимости от того, как будет сгенерирован шаблон как функция или как корутина таким образом и будут выполнятся методы, описанные в объекте Call.

Покажу небольшой пример шаблона, который предполагает возможность делать запросы в google:

Пример запросов в google с помощью pySyncAsync
import aiohttpimport requestsimport pysyncasync as psa# Регистрируем функцию для синхронного запроса в google# В скобочках указываем имя для дальнейшего указания в объекте Call@psa.register("google_request")def sync_google_request(query, start):    response = requests.get(        url="http://personeltest.ru/aways/google.com/search",        params={"q": query, "start": start},    )    return response.status_code, dict(response.headers), response.text# Регистрируем корутину для асинхронного запроса в google# В скобочках указываем имя для дальнейшенго указания в объекте Call@psa.register("google_request")async def async_google_request(query, start):    params = {"q": query, "start": start}    async with aiohttps.ClientSession() as session:        async with session.get(url="http://personeltest.ru/aways/google.com/search", params=params) as response:            return response.status, dict(response.headers), await response.text()# Шаблон для получения первых 100 результатовdef google_search(query):    start = 0    while start < 100:        # В Call аргументы передавать можно как угодно, они так же и будут переданы в google_request        call = Call("google_request", query, start=start)        yield call        status, headers, text = call.result        print(status)        start += 10if __name__ == "__main__":    # Синхронный запуск кода    sync_google_search = psa.generate(google_search, psa.SYNC)    sync_google_search("Python sync")    # Асинхронный запуск кода    async_google_search = psa.generate(google_search, psa.ASYNC)    loop = asyncio.get_event_loop()    loop.run_until_complete(async_google_search("Python async"))


Расскажу немного про внутреннее устройство библиотеки. Есть класс Manager, в котором регистрируются функции и корутины для вызова с помощью Call. Также есть возможность регистрировать шаблоны, но это необязательно. У класса Manager есть методы register, generate и template. Эти же методы в примере выше вызывались напрямую из pysyncasync, только они использовали глобальный экземпляр класса Manager, который уже создан в одном из модулей библиотеки. По факту можно создать свой экземпляр и от него вызывать методы register, generate и template, таким образом изолируя менеджеры друг от друга, если, например, возможен конфликт имён.

Метод register работает как декоратор и позволяет зарегистрировать функцию или корутину для дальнейшего вызова из шаблона. Декоратор register принимает в качестве аргумента имя, под которым в менеджере регистрируется функция или корутина. Если имя не указано, то функция или корутина регистрируется под своим именем.

Метод template позволяет зарегистрировать генератор как шаблон в менеджере. Это нужно для того, чтобы была возможность получить шаблон по имени.

Метод generate позволяет на основе шаблона сгенерировать функцию или корутину. Принимает два аргумента: первый имя шаблона или сам шаблон, второй sync или async во что генерировать шаблон в функцию или в корутину. На выходе метод generate отдаёт готовую функцию или корутину.

Приведу пример генерации шаблона, например, в корутину:

def _async_generate(self, template):    async def wrapper(*args, **kwargs):        ...        for call in template(*args, **kwargs):            callback = self._callbacks.get(f"{call.name}:{ASYNC}")            call.result = await callback(*call.args, **call.kwargs)        ...    return wrapper

Внутри генерируется корутина, которая просто итерируется по генератору и получает объекты класса Call, потом берёт ранее зарегистрированную корутину по имени (имя берёт из call), вызывает её с аргументами (которые тоже берёт из call) и результат выполнения этой корутины также сохраняет в call.

Объекты класса Call являются просто контейнерами для сохранения информации о том, что и как вызывать и также позволяют сохранить в себе результат. wrapper также может вернуть результат выполнения шаблона, для этого шаблон оборачивается в специальный класс Generator, который здесь не показан.

Некоторые нюансы я опустил, но суть, надеюсь, в общем донёс.

Если быть честным, эта статья была написана мною скорее для того, чтобы поделится своими мыслями о решении проблем с асинхронным кодом в Python и, самое главное, выслушать мнения хабравчан. Возможно, кого-то я натолкну на другое решение, возможно, кто-то не согласится именно с данной реализацией и подскажет, как можно сделать её лучше, возможно, кто-то расскажет, почему такое решение вообще не нужно и не стоит смешивать синхронный и асинхронный код, мнение каждого из вас для меня очень важно. Также я не претендую на истинность всех моих рассуждений в начале статьи. Я очень обширно размышлял на тему других ЯП и мог ошибиться, плюс есть возможность, что я могу путать понятия, прошу, если вдруг будут замечены какие-то несоответствия опишите в комментарии. Также буду рад, если будут поправки по синтаксису и пунктуации.

И спасибо за внимание к данному вопросу и к этой статье в частности!
Подробнее..
Категории: Python , Asynchronous , Asyncio , Generators

Телеграм бот для автоматизации обменника криптовалюты

13.02.2021 14:22:01 | Автор: admin

Вместо предисловия

В этой статье я буду в общих чертах рассказывать про то, в каком направлении нужно двигаться, чтобы сделать полуавтоматический обменник криптовалюты с возможностью управлять сделками с любого устройства в любой точке планеты 24/7. Вы не найдете здесь деталей реалиализации, т.к. этот материал предназначен скорее для получения базового набора знаний, необходимых для запуска такого стартапа.

Полуавтоматический обменник криптовалюты.

Как-то очень давно я у же немного писал про использование телеграм бота для автоматизации некоторых процессов. Надо сказать, что уже не мало времени прошло, но я продолжаю использовать некоторые идеи, что были изложены в том небольшом материале.

Обмен криптовалюты сегодня - это уже не просто реальность, в какой-то мере это уже потребность. Время беспощадно и теперь цифровое золото становится очень важной частью активов миллионов людей. Обменники в интернете бывает нескольких видов, основные из которых:

  • полуавтоматические

  • ручные

  • автоматические

  • p2p - обменники

  • биржи

Мы поговорим про полуавтоматический вариант с возможностью расширения до p2p обменника, потому как это довольно простой и удобный способ.

Необходимый набор навыков.

Говоря довольно простой, я наверное выражаюсь несовсем корректно. В наше время навыки, которыми должен обладать разработчик, оцениваются в тысячи часов времени, которое он должен потратить на свое обучение. Я сегодня буду предельно краток, так что давайте сразу перейдем к делу. Для реализации задачи нам понадобится следующий набор иснтрументов:

  1. Linux, zsh, vim, systemd

  2. nginx, ssl

  3. ES6, Material Ui, React, eslint, webpack, scss

  4. python3, asyncio, aiohttp, peewee

  5. postgresql

  6. telegram bot api

  7. docker

И такие паттерны как:

  1. MVC - шаблон архитектуры системы

  2. Abstract Factory, Factory Method, Builder, Facade, Prototype - генерация объектов

  3. Scheduler - многопоточный постановщик задач

  4. Event Listner, State - события, сосстояния

  5. Proxy - заместитель для балансировки нагрузки

В общих чертах это вроде как все, что должно пригодится, согласитесь не мало. Давайте пробежимся теперь немного более подробно, чтобы понимать как все это барахло должно быть настроено, чтобы даже работать.

Теперь я начинаю с фронта

Если вы попробуете поискать в сети с чего начинать веб разработку - с фронтенда или с бэкэнда, вы наверное не найдете ничего более дельного, чем информация о том, что все это лучше делать параллельно и каких-то особых протоколов на этот счет нет. Т.е. фронтендер делает свою работу, бэкэндер свою, они встречаются на созвонах и в чате, обсуждают все проблемы: все хорошо. Но что, если вы собираетесь делать и фронт и бэк самостоятельно (например в случае небольшого приложения как криптообменник) - какая будет точка отправления?

Начинать лучше с фронта, потому как он может работать на моковых данных и бэкэнд ему собственно нужен только абсолютно гипотетически. Фронтенд - независимое приложение, он должен работать корректно в разных браузерах, на разных устройствах. Мы будем делать Single Page Application, а значит нам потребуется протокол взаимодействия, давайте выберем json-rpc. Для транспортного протокола используем tcp,а на прикладном уровне остановимся на http.

Дальше все довольно не трудно. Ставим Node Package Manager, создаем новое реакт приложение, добавляем туда react router,настраиваем eslint для форматирования кода, node-sass для возможности использования css препроцессора, webpack для сборки проекта.

Правильная структура проекта - залог успеха. Компоненты делаем модульными - файлы стилей лежат внутри дирректории рядом с компонентом. Компоненты по мере возможностей реализуем как stateless. Я бы пожалуй еще рекоммендовал дважды задуматься перед внедерением redux в приложение - делайте это только если вы точно уверены, что вам это нужно.

При работа с фронтом важно всегда помнить об области видимости переменных, потому как современный фронтенд это сложное многопоточное, асинхронное приложение. При этом при правильной организации файлов и компонентов, react и material ui позволяют делать все достаточно быстро. Если у вас нет готового дизайна - просто выберите сайт обменника, который вам нравится и не стесняясь копируйте его, превнося свои изменения - в дальнейшем ваш обменник все равно еще претерпит кучу изменений и не стоит сейчас беспокоится о том как это выглядит на текущей стадии.

Вот так может выглядеть ваш реакт компонент, отвечающий за рендеринг главной страницы:

Бэкэнд - это сложно, но куда веселее

Бэкэнд тоже должен быть асинхронным. P2P приложения должны быть ориентированы под высокую нагрузку, а значит сразу стоит закладывать немного больше, чем может показаться нужным. Мы будем делать монолитный бэкэнд, потому как серверная часть не будет очень большой. Микросервисы это здорово, но не всегда необходимо, и в данном случае мы не будем использовать этот подход.

asyncio позволяет работать с петлей событий, что в свою очередь предоставляет возможность асинхронного программирования и управления заданиями. В нашем случае у нас будет несколько заданий, которые должны будут работать независимо и параллельно основному приложению. Это задание на обновление курсов BTC/USD и USD/RUB, и задание, которое будет отменять устаревшие заявки на обмен валюты. Курсы вылют можно получать get запросом из апи всех популярных бирж, например coinbase, kraken, bitmex. Благо aiohttp clientпозволяет это делать в несколько строчек:

Для того, чтобы сериализовывать данные, необходимые фронту для построения интерфейса, сервер должен использовать объектно-ориентированную модель данных, проще говоря - нужно поделить наш обменник на модели, а эти модели сложить в таблицы бд, чтобы можно было строить к ней запросы.

При создании моделей стоит особое внимание уделить инкапсуляции и наследованию - хорошей идеей будет сразу создать BaseModel, в которую поменстить, например, поля created_at, updated_at и, например, datetime_serializer, который вам точно пригодится, а остальные модели наследовать от этой модели:

Для взаимодействия с блокчейн придется получить API KEY, например на blockchain.com. Хочу сразу отметить, что тут есть своего рода "подводный камень". Как работает блокчейн апи? После того, как создается транзакция, для ее завершения необходимы подтверждения от майнеров. Каждое подтверждение - это своего рода события, информацию о котором вы будете получать на свой сервер. В этом событии есть адрес кошелька, на который поступает криптовалюта. Теперь предположим, что для покупки криптовалюты в нашем обменнике мы всем пользователям будем предоставлять одинаковый кошелек для перевода. Это было бы довольно удобно, так как все биткоины были бы сосредоточены у нас на одном адресе, одной суммой. На первый взгляд. Но в таком случае при поступлении средств от пользователя на кошелек и последующих веб хуках от блокчейн на callback_url, мы не сможем определить от какого конкретно пользователя поступил платеж. Можно конечно использовать параметр в webhook url но есть еще один интересный нюанс. Нам важно знать курс по которому была совершена та или иная транзакция.Опять же, есть вариант хранить свзяь между транзакицей и курсом, но есть и альтернативное решение. Оно состоит в том, что каждому пользователю системы должен генерироваться свой уникальный BTC кошелек. И в случае, когда этот самый пользователь хочет совершить сделку в нашем обменном пункте и продавть свои кровные BTC, мы будем скидывать ему его уникальный адрес.

Дальше может покзаться, что целесообразно со всех этих адресов собирать все деньги на master wallet, но это не так, ведь за каждый перевод вы будете платить комиссию майнерам. К слову эту комиссию нужно считать ручками. Получить информацию от сети можно в любой момент:

Таким образом получается, что баланс нашего обменника сосредоточен децентрализованно на разных кошельках всех пользователей. При каждой транзакции мы записываем курс, по которому она была осуществлена, а ее статус (так же как впрочем и статус документа по этой транзакции) мы меняем в зависимости от подтверждений blockchain.К слову сразу имеет смысл подумать над реализацией классов Billing и Processing, для создания и проводки документов.

Для продажи крипты нам необходим механизм ручного взаимодействия с этими документами: нужно искать документ с выгодным на текущий момент курсом, проверять текущий баланс кошелька из этого документа, и выполнять перевод с этого кошелька на кошелек, который указывает пользователь в своей заявке на покупку в нашем обменнике. И вот мы наконец подобрались к кульминации нашего материала: к нам снова на помощь приходит телеграм.

Telegram bot

Тут все совсем не трудно. Создаем бота у @BotFather, настраиваем, берем токен, кладем его в конфиг (делаем два конфига и два бота - один на прод, один на дев).

Из этого токена мы можем создать бота и использовать его для получения важных данных, а так же для запуска протокола rpc, не требующего дополнительной авторизации и шифрования.

Теперь мы можем слать сообщения в телеграм из нашего бэкэнда, для этого мы создадим приватный чат и добавим бота администратором туда. Далее нам потребуется создать вебхук и контроллер для его обработки (читайте об этом в предыдущем материале про мониторинг). В контроллере нам необходимо создать диспетчер запросов, который будет определять обработчик.

Запросы от тг могут быть разные, нам пока нужы будут только message и callback_query (reply клавиатура и inline клавиатура).

Далее мы будем отправлять в этот чат сообщения с кнопками, которые позволят контролировать значения в базе данных. Здесь обращу внимание на race condition, и трудно-уловимые ошибки, по этому всегда используйте atomic_db_query

async with objects.atomic() as atomic_db_query:try:  pass  # some database change  except:  atomic_db_query.rollback()

Кнопки вы можете нажать после ручной модерации заявки - ну т.е. вы посмотрели, что деньги к вам на счет реально поступили, и только после этого нажали на кнопку, которая запустит механизм для завершения документооборота. Так же можно, например создать механизмsubscription, который позволит информировать подписавшихся пользователей на обновления курсов, например:

Это довольно удобно, ведь телеграм всегда под рукой, особенно после разблокировки . 24/7 все, кто находятся в приватной группе, смогут получать информационные сообщения, а так же управлять состоянием документов:

Настройка production

Нужно все это барахло завернуть в докер, настроить системные даемон для запускать юнита, в идеале конечно настроить CI-CD, но это все наверное уже детали.

В базовом варианте можно деплоится через гит, использовать переменные окружения для чтения конфигов, использовать ipython для проведения миграций в бд:

Нужно уметь использовать настраивать nginx, и понимать, как работает mod_rewrite.

location = /api/rates/rates.xml {  rewrite .* /api/rates/ last;}   

Кстати для добавления вашего обменника в мониторинги, вам понадобится xml выгрузки файла курсов, так что этот rewrite вам может еще пригодится.

Наверное вы захотите сделать какую-то админку - для этого отлично сгодится механизм Basic Auth и bootstrap admin template . Вам останется только пробросить в шаблоны необходимый контекст и немного поиграть с контролами:

Послевкусие

Это все сложновато, но в тоже время и не очень, если не наступать на грабли, которые, надо сказать, присутствуют. Не забывайте о JWT, SLL, CORS, и еще куче прелестей, которые по пути обязательно появятся у вас на пути. Но в целом это рабочая схема автоматизации механизмов, которые могут пригодится не только при создании обменника. Я не претендую на роль эксперта в этом деле, я лишь высказываю свои умозаключения, после довольно трудоемкого процесса прохождения через все вышесказанное. Не стоит принимать буквально - многое является весьма субъективным и не претендует на роль аксиомы. Я бы сказал бОльшая часть. Но под лежачий камень вода не течет, и лучшее решение на сегодня - это развитие и движение дальше.

Надеюсь кому-то покажется эта информация полезной. Просто захотелось немного поделиться переживаниями и опытом, полученным от процесса. Результат работы вы можете посмотреть на https://exbtc.pro/

Дальнейшие развитие позиционируется как p2p платформа для совершения обмена. Буду рад любым вопросам и предложениям, и большое спасибо за потраченное время.

Подробнее..

Первые шаги в aiohttp

31.05.2021 14:10:41 | Автор: admin

Введение

Привет, меня зовут Артём и я работаю бекендером в KTS. Компания уже 3 года проводит летние и зимние курсы по разработке, а в феврале этого года прошла очередная бесплатная backend-школа от KTS. В ее рамках студенты изучали инструменты и технологии, которые используют разработчики нашей компании, общались с менторами и делали итоговый проект - чат-бота в стиле Моя игра, который защищали в конце курса. После курса отличившихся студентов мы приглашали на стажировку.

Школа состояла из 6 лекций, шаг за шагом погружавших студентов в мир веб-разработки. На них были рассмотрены такие темы как сетевые протоколы, взаимодействие backend-а и frontend-а, компоненты веб-сервера и многое другое. Лейтмотивом курса было изучение асинхронного веб-программирования на Python, в частности изучение фреймворка aiohttp.

Для поступления на курс нужно было пройти комплексный тест на знания в области веба и python-а, так что студенты пришли учиться с хорошим начальным уровнем знаний. Однако, во время курса выяснилось, что не все темы даются одинаково легко. Самыми трудными для понимания темами стали:

  1. Асинхронное программирование

  2. Работа с СУБД

  3. Деплой приложения

Студенты задавали достаточно разные по уровню понимания вопросы, начиная от Как создать отложенную задачу, используя только asyncio? и заканчивая Почему нельзя использовать Django для асинхронного программирования? (имелась в виду полностью синхронная версия Django). В коде наши менторы тоже находили ошибки, связанные с недостаточным пониманием предмета, например, использование синхронного драйвера для базы данных в асинхронном проекте.

По результатам курса, я решил написать небольшой туториал, рассказывающий о создании базового aiohttp-сервиса с нуля и затрагивающий самые сложные для студентов вопросы: как сделать асинхронное python-приложение, как работать с базой данных и как разложить свой проект в интернете.

В цикле статей мы рассмотрим следующие темы:

  1. Архитектура веб-приложения

  2. Асинхронная работа с базой данных и автоматические миграции

  3. Работа с HTML-шаблонами с помощью Jinja2

  4. Размещение нашего приложения в Интернете с помощью сервиса Heroku

  5. А также сигналы, обработку ошибок, работу с Dockerом и многое другое.

Эта статья первая из трех, и ее цель помочь начинающим aiohttp-программистам написать первое hello-world приложение.

В этой статье мы напишем небольшое веб-приложение на aiohttpстену с отзывами, где пользователь может оставить мнение о продукте.

Мы пройдем по шагам:

Создание проекта

Все команды в статье были выполнены в операционной системе OSX, но также должны работать в любой *NIX системе, например в Linux Ubuntu. Во время разработки я буду использовать Python 3.7.

Давайте создадим папку aiohttp_server, которая в дальнейшем будет называться корнем проекта. В ней создадим текстовый файл requirements.txt, который будет содержать все необходимые для работы приложения зависимости и их версии. Запишем в него следующие модули:

aiohttp==3.7.3 # наш фрейворкaiohttp-jinja2==1.4.2 # модуль для работы с HTML-шаблонами

Создадим виртуальное окружение что-то вроде песочницы, которое содержит приложение со своими библиотеками, обновление и изменение которых не затронет другие приложение, и установим в него наши зависимости:

cd {путь_до_папки}/aiohttp_serverpython3 -m venv venvsource venv/bin/activate

После этого в начале строки терминала должна появится надпись (venv)это означает что виртуальное окружение успешно активировано. Установим необходимые модули:

pip install -r requirements.txt

Структура проекта

Создадим в папке aiohttp_server следующую структуру:

 app    __init__.py    forum       __init__.py       routes.py  # тут будут пути, по которым надо отправлять запросы       views.py  # тут будут функции, обрабатывающие запросы    settings.py main.py  # тут будет точка входа в приложение requirements.txt templates    index.html  # тут будет html-шаблон страницым сайта

Теперь откроем файл main.py и добавим в него следующее:

from aiohttp import web  # основной модуль aiohttpimport jinja2  # шаблонизатор jinja2import aiohttp_jinja2  # адаптация jinja2 к aiohttp# в этой функции производится настройка url-путей для всего приложенияdef setup_routes(application):   from app.forum.routes import setup_routes as setup_forum_routes   setup_forum_routes(application)  # настраиваем url-пути приложения forumdef setup_external_libraries(application: web.Application) -> None:   # указываем шаблонизатору, что html-шаблоны надо искать в папке templates   aiohttp_jinja2.setup(application, loader=jinja2.FileSystemLoader("templates"))def setup_app(application):   # настройка всего приложения состоит из:   setup_external_libraries(application)  # настройки внешних библиотек, например шаблонизатора   setup_routes(application)  # настройки роутера приложенияapp = web.Application()  # создаем наш веб-серверif __name__ == "__main__":  # эта строчка указывает, что данный файл можно запустить как скрипт   setup_app(app)  # настраиваем приложение   web.run_app(app)  # запускаем приложение

После предварительной настройки можно создать первый View.

Первый View

Viewэто некий вызываемый объект, который принимает на вход HTTP-запросRequest и возвращает на пришедший запрос HTTP-ответResponse.

Http-запрос содержит полезную информацию, например url запроса и его контекст, переданные пользователем данные и многое другое. В контексте запроса содержатся данные, которые мы или aiohttp добавили к этому запросу. Например, мы предварительно авторизовали пользователячтобы повторно не проверять авторизацию пользователя из базы во всех View и не дублировать код, мы можем добавить объект пользователя в контекст запроса. Тогда мы сможем получить нашего пользователя во View, например, так: request['user'].

HTTP-ответ включает в себя полезную нагрузку, например, данные в json, заголовки и статус ответа. В простейшем View, который из примера выше, всю работу по формированию HTTP-ответа выполняет декоратор @aiohttp_jinja2.template("index.html") . Декоратор получает данные из View, которые возвращаются в виде словаря, находит шаблон index.html (о шаблонах написано ниже), подставляет туда данные из этого словаря, преобразует шаблон в html-текст и передает его в ответ на запрос. Браузер парсит html и показывает страницу с нашим контентом.

В файле views.py в папке app/forum напишем следующий код:

import aiohttp_jinja2from aiohttp import web# создаем функцию, которая будет отдавать html-файл@aiohttp_jinja2.template("index.html")async def index(request):   return {'title': 'Пишем первое приложение на aiohttp'}

Здесь создается функциональный View (function-based View). Определение функциональный означает, что код оформлен в виде функции, а не классом (в следующей части мы коснемся и class-based View).

Рассмотрим написанную функцию детальнее: функция обернута в декоратор @aiohttp_jinja2.template("index.html")этот декоратор передает возвращенное функцией значение в шаблонизатор Jinja2, а затем возвращает сгенерированную шаблонизатором html-страницу как http-ответ. В данном случае возвращенным значением будет словарь, значения которого подставляются в html-файл index.html.

Отдельно стоит заметить, что объект запроса request передается как аргумент функции index. Мы не используем request в этой функции, но будем использовать в дальнейшем.

HTTP-запрос отправляется на конкретный url-адрес. Для передачи HTTP-запроса в нужный View необходимо задать эту связь в приложении с помощью Route.

Первый Route

Routeэто звено, связывающее адрес, по которому был отправлен запрос и код View, в котором этот запрос будет обработан. То есть, если пользователь перейдет в корень нашего сайта (по адресу /), то объект запроса будет передан в View index и оттуда же будет возвращен ответ. Подробней про Route можно прочитать тут.

В файл routes.py необходимо добавить следующий код:

from app.forum import views# настраиваем пути, которые будут вести к нашей страницеdef setup_routes(app):   app.router.add_get("/", views.index)

Первый Template

Теперь нам осталось только добавить в templates/index.html код верстку нашей страницы. Его можно найти по этой ссылке.

Templateэто html-шаблон, в который подставляются данные, полученные в результате обработки запроса. В примере в коде View отдается словарь с ключом title, шаблонизатор Jinja2 ищет в указанном html-шаблоне строки {{title}} и заменяет их на значение из словаря по данному ключу. Это простейший пример, шаблоны позволяют делать намного больше: выполнять операции ветвления, циклы и другие операции, например, суммирование. Примеры использования можно посмотреть в документации jinja2.

Запуск приложения

Мы создали первую версию нашего приложения! Осталось запустить его следующей командой в терминале (убедитесь, что находитесь в папке aiohttp_server):

python3 main.py

Вы должны увидеть следующий текст в консоли. Он означает, что сервер запущен на порту 8080.

======== Running on http://0.0.0.0:8080 ========(Press CTRL+C to quit)

Давайте теперь посмотрим результаты нашей работы! Для этого перейдите по адресу http://0.0.0.0:8080 в браузере. Вы должны увидеть первую версию нашего приложения. При клике на кнопку Отправить должно возникнуть сообщение о том, что отзыв отправлен.

Поздравляю! Вы успешно создали первое приложение на aiohttp!

Заключение

В статье рассмотрено создание простого приложения на aiohttp, которое принимает запрос пользователя и отдает html-страницу. Мы затронули:

  • Настройку виртуального окружения

  • Базовую настройку проекта на aiohttp

  • Создание View

  • Создание Route

  • Использование html-шаблонов

Наше приложение представляет собой простой веб-сервер, отдающий html-страницу по запросу - в нем нет никакого взаимодействия с базами данных, его структура максимально проста и оно недоступно пользователям в Интернете. В следующих статьях мы разберем, как вырастить из нашей заготовки настоящее веб-приложение на aiohttp и опубликовать его в Интернете.

Весь код статьи можно найти на гитхабе.

Пользуясь случаем, приглашаю всех читателей, интересующихся веб-разработкой, к нам на бесплатные занятия школу KTS. А для более опытных читателей сейчас идет запись на продвинутые курсы для backend-разработчиков, желающих повысить свои навыки в асинхронной веб-разработке. Всю информацию о всех школах можно найти на сайте, а также в нашем телеграм-чате.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru