Русский
Русский
English
Статистика
Реклама

Aiohttp

О первоклашках, дистанционке и асинхронном программировании

15.12.2020 10:11:51 | Автор: admin
image

Я дауншифтер. Так получилось, что последние три года мы с женой и младшим ребёнком наслаждаемся сельским пейзажем за окном, свежим воздухом и пением птиц. Удобства в доме, оптический интернет от местного провайдера, мощный бесперебойник и нагрянувший ковид неожиданно сделали идею переезда из мегаполиса не такой уж странной.

Пока я увлеченно занимался веб разработкой, где-то на фоне жена периодически жаловалась на проблемы выбора школы для ребёнка. И тут (вдруг) ребёнок подрос и школьный вопрос встал ребром. Ладно, значит, время пришло. Давайте вместе разберёмся, что же все-таки не так с системой образования в бывшей 1/6 части суши, и что мы с вами можем с этим сделать?

Традиционные методы очного обучения я оставлю за рамками этой статьи. Скажу только, что у обычных школ есть как неоспоримые преимущества, так и серьезные недостатки, к которым, кстати, в последнее время добавилась вынужденная самоизоляция. Здесь мы рассмотрим варианты дистанционного и семейного образования, которые, по целому ряду причин, в последнее время привлекают все больше родителей.

Внесу ясность: дистанционное обучение подразумевает занятия в обычной школе с помощью дистанционных образовательных технологий (ДОТ), а семейное означает добровольный уход из школы и обучение только силами семьи (по сути, это старый добрый экстернат). Впрочем, в любом случае ребёнка нужно прикрепить к какой-либо из доступных школ, как минимум, для сдачи промежуточных аттестаций.

А теперь немного наблюдений из жизни. С вынужденным переводом на дистанционку детей, уже учившихся в обычной школе, все грустно. Школьники воспринимают этот подарок судьбы как своего рода каникулы, родители не привыкли следить за дисциплиной во время занятий и в результате общая успеваемость неизбежно падает.

С первоклашками, особенно в случае семейной формы, у родителей, пожалуй, появляется шанс поставить ребёнка на рельсы, используя естественный интерес и эффект новизны. Лично для меня добиться самостоятельности главная задача. Сидеть и делать с ребёнком домашку я считаю верхом глупости. Конечно, если вы хотите, чтобы ваши дети чего-то добились в жизни и не висели у вас на шее. Я хочу, поэтому моя цель научить ребёнка учиться, правильно задавать вопросы и вообще, думать своей головой.

Ближе к делу. Выбираем государственную школу


Пожалуй, семейное образование мне нравится больше из-за возможности выбрать программу и график обучения. Да и физически посещать школу можно реже. Но выбрать государственную школу, поговорить с директором о прикреплении ребёнка и получить приказ о зачислении в первый класс нужно уже в конце зимы, чтобы в сентябре не было сюрпризов. Хотя, с юридической точки зрения, закон об образовании вроде бы не требует ежегодных аттестаций, дедлайны, по моему опыту, отлично мотивируют, поэтому пусть будут аттестации. Вряд ли любая школа примет нас с распростертыми объятьями, но найти достойный вариант в ближайшем городе мы сможем, я уверен.

Выбираем учебную программу


Именно выбираем. Пытаться составить программу самостоятельно, не имея профильного образования, не разумно. Хотя существуют государственные образовательные ресурсы, такие как Российская Электронная Школа (РЭШ) и Московская Электронная Школа (МЭШ), которых в теории могло было бы хватить, но Оба варианта предоставляют планы уроков, видеозаписи, тесты и учебные пособия. Вот чего мне не удалось найти, так это самих учебников, даже по обязательной программе.

И тут нет самого главного: общения. Обучить ребёнка, показывая ему бесконечные видеоролики и заставляя ставить галочки в тестах, не получится. Значит, нужно либо проводить уроки полностью самостоятельно, либо выбрать одну из онлайн школ.

Выбираем онлайн школу


Мы почти вернулись к тому, с чего начали. Дистанционка? Ладно, присмотримся к ней повнимательней. Как вообще можно организовать учебный процесс удаленно? Тут возникает много вопросов, я подниму только ключевые:

* Живое общение. Что предлагают школы? Скайп, в лучшем случае Тимс. Уроки по Скайпу? Серьёзно? Если я не ошибаюсь, на дворе 2020-й. Открыть перед первоклашкой несколько окон с красивыми разноцветными кнопочками и ждать, что он на них не нажмет, а будет пол-дня послушно слушать скучного дядю или тетю? Ни разу таких детей не видел. А вы?

* Домашка. Точнее, как она попадает к учителю на проверку? На самом деле, это действительно сложный вопрос, возможно, даже не решаемый в принципе. Существующие варианты:

  1. Написать в тетрадке, сфоткать и отправить учителю. Бр-р-р, не хочу заставлять учителей ломать глаза в попытках прочесть мутные фотки с мобильников, сделанные, как правило, по какому-то неписанному закону в темноте.
  2. Отправить скан. Полумера, в общем случае невозможная из-за отсутствия у родителей нужного оборудования.
  3. Оцифровать рукописный ввод с помощью дигитайзера или планшета. Так себе вариант, но об этом чуть позже.
  4. Напечатать текст. В принципе, допустимо, но вот как ребёнок введёт с клавиатуры, например, математическую или химическую формулу? Никак. Плюс, для более продвинутых деток, проблема с плагиатом.
  5. Выполнить онлайн тест. Это, безусловно, самый популярный вариант. Полагаю, большинство школ, включая РЭШ и МЭШ, ориентируются на него. На практике это означает скорее дрессировку, чем обучение. Дети учатся ставить галочки в правильном месте. За бортом остаются предметы, требующие любой формы творчества, например, сочинения, а также диктанты и непопулярное теперь по неведомой мне причине чистописание. Сюда же можно отнести умение отстаивать своё мнение.

* Оценки. Очевидно, выставленные на уроке и при проверке домашних заданий оценки должны попадать в электронный дневник, доступный родителям. И они туда попадают. Вот только не сразу. Я поинтересовался у старших детей, закончивших один из престижных лицеев златоглавой (по иронии судьбы, с информационным уклоном), почему так? Ответ, честно сказать, меня удивил. Оказывается, учителя записывают оценки на бумажку, а после уроков вбивают их в этот самый электронный дневник на государственном портале. И это в то время, как Теслы Илона Маска бороздят просторы космоса

Ладно, пора провести небольшое техническое исследование и проверить, может существуют объективные причины такого положения дел?

Давайте определим требования к гипотетической идеальной платформе для обучения. На самом деле, все просто: дети должны оставаться на уроке, сосредоточившись на том, что говорит и показывает учитель, при необходимости отвечая на вопросы и при желании поднимая руку. По сути, нам нужно окно на полный экран с потоком с учительской камеры, презентацией или интерактивной доской. Самый простой способ добиться этого использовать технологию WebRTC (real-time communications, коммуникации в реальном времени). Эта штука работает в любом более-менее современном браузере, не требует покупки дополнительного оборудования и, к тому же, обеспечивает хорошее качество связи. И да, этот стандарт требует асинхронного программирования как минимум потому, что необходимый JS метод navigator.mediaDevices.getUserMedia() возвращает промис. Вроде все понятно, приступаю к реализации.

Лирическое отступление о выборе фреймворка
В последнее время я все чаше слышу мнение о том, что чистый JavaScript шагнул далеко вперед, проблем с кроссбраузерностью уже нет и фреймворки на фронтэнде не нужны вообще. Особенно часто критике подвергается jQuery. Хорошо, стряхнем пыль со старого доброго JS и сравним:

// Выбрать элементelement = $(selector);element = document.querySelector(selector);// Выбрать элемент внутри элементаelement2 = element.find(selector2);element2 = element.querySelector(selector2);// Скрыть элементelement.hide();  // добавляет стиль display: noneelement.classList.add('hidden');

Тут нужно пояснить, что CSS классу hidden, при желании, можно прописать свойства opacity и transition, что даст эффект fadeIn/fadeOut на чистом CSS. Отлично, давно хотел отказаться от JS анимации!

// Слушать событие onClickelement.click(e => { ... });element.onclick = (e) => { ...  }// Переключить классelement.toggleClass(class_name);element.classList.toggle(class_name);// Создать divdiv = $("<div>");div = document.createElement("div");// Вставить созданный div в element// (это не опечатка, можно писать одинаково)element.append(div);element.append(div);

И т.д и т.п. В целом, код на чистом JS получается немного более многословным, но такова цена повышения производительности и снижения трафика. Нет, я никого не агитирую, но для эксперимента использую нативный JS с удовольствием!

WebRTC предназначен для связи между браузерами напрямую, по технологии точка-точка (p2p). Однако, чтобы установить эту связь, браузеры должны сообщить друг другу о своем намерении общаться. Для этого понадобится сервер сигнализации.

Пример базовой реализации простого видеочата, использующего топологию full mesh
'use strict';(function () {    const selfView = document.querySelector('#self-view'),        remoteMaster = document.querySelector('#remote-master'),        remoteSlaves = document.querySelector('#remote-slaves');    let localStream,        selfStream = null,        socket = null,        selfId = null,        connections = {};    // ***********************    // UserMedia & DOM methods    // ***********************    const init = async () => {        try {            let stream = await navigator.mediaDevices.getUserMedia({                audio: true, video: {                    width: { max: 640 }, height: { max: 480 }                }            });            localStream = stream;            selfStream = new MediaStream();            stream.getVideoTracks().forEach(track => {                selfStream.addTrack(track, stream); // track.kind == 'video'            });            selfView.querySelector('video').srcObject = selfStream;        } catch (e) {            document.querySelector('#self-view').innerHTML =                '<i>Веб камера и микрофон не найдены</i>';            console.error('Local stream not found: ', e);        }        wsInit();    }    const createRemoteView = (id, username) => {        let iDiv = document.querySelector('#pc' + id);        if (!iDiv) {            iDiv = document.createElement('div');            iDiv.className = 'remote-view';            iDiv.id = 'pc' + id;            let iVideo = document.createElement('video');            iVideo.setAttribute('autoplay', 'true');            iVideo.setAttribute('playsinline', 'true');            let iLabel = document.createElement('span');            iDiv.append(iVideo);            iDiv.append(iLabel);            if (!remoteMaster.querySelector('video')) {                remoteMaster.append(iDiv);                iLabel.textContent = 'Ведущий';            } else {                remoteSlaves.append(iDiv);                iLabel.textContent = username;            }            remoteMaster.style.removeProperty('display');        }    }    // *******************************    // Signaling (Web Socket) methods    // *******************************    const wsInit = () => {        socket = new WebSocket(SIGNALING_SERVER_URL);        socket.onopen = function (e) {            log('[socket open] Соединение установлено');        }        socket.onmessage = function (event) {            log('[socket message] Данные получены с сервера', event);            wsHandle(event.data);        }        socket.onclose = function (event) {            if (event.wasClean) {                log('[close] Соединение закрыто чисто, ' +                    `код=${event.code} причина=${event.reason}`);            } else {                log('[socket close] Соединение прервано', event);            }            clearInterval(socket.timer);        }        socket.onerror = function (error) {            logError('[socket error]', error);        }        socket.timer = setInterval(() => {            socket.send('heartbeat');        }, 10000);    }    const wsHandle = async (data) => {        if (!data) {            return;        }        try {            data = JSON.parse(data);        } catch (e) {            return;        }        switch (data.type) {            case 'handshake':                selfId = data.uid;                if (!Object.keys(data.users).length) {                    createRemoteView(selfId, 'Ведущий');                    remoteMaster.querySelector('video').srcObject =                        selfStream;                    selfView.remove();                    break;                } else {                    selfView.style.removeProperty('display');                }                for (let id in data.users) {                    await pcCreate(id, data.users[id]);                }                break;            case 'offer':                await wsHandleOffer(data);                break;            case 'answer':                await wsHandleAnswer(data)                break;            case 'candidate':                await wsHandleICECandidate(data);                break;            default:                break;        }    }    const wsHandleOffer = async (data) => {        let pc = null;        if (!connections[data.src]) {            await pcCreate(data.src, data.username);        }        pc = connections[data.src].pc;        // We need to set the remote description to the received SDP offer        // so that our local WebRTC layer knows how to talk to the caller.        let desc = new RTCSessionDescription(data.sdp);        pc.setRemoteDescription(desc).catch(error => {            logError('handleOffer', error);        });        await pc.setLocalDescription(await pc.createAnswer());        wsSend({            type: 'answer',            target: data.src,            sdp: pc.localDescription        });        connections[data.src].pc = pc; // ???    }    const wsHandleAnswer = async (data) => {        log('*** Call recipient has accepted our call, answer:', data);        let pc = connections[data.src].pc;        // Configure the remote description,        // which is the SDP payload in our 'answer' message.        let desc = new RTCSessionDescription(data.sdp);        await pc.setRemoteDescription(desc).catch((error) => {            logError('handleAnswer', error);        });    }    const wsHandleICECandidate = async (data) => {        let pc = connections[data.src].pc;        let candidate = new RTCIceCandidate(data.candidate);        log('*** Adding received ICE candidate', candidate);        pc.addIceCandidate(candidate).catch(error => {            logError('handleICECandidate', error);        });    }    const wsSend = (data) => {        if (socket.readyState !== WebSocket.OPEN) {            return;        }        socket.send(JSON.stringify(data));    }    // ***********************    // Peer Connection methods    // ***********************    const pcCreate = async (id, username) => {        if (connections[id]) {            return;        }        try {            let pc = new RTCPeerConnection(PC_CONFIG);            pc.onicecandidate = (event) =>                pcOnIceCandidate(event, id);            pc.oniceconnectionstatechange = (event) =>                pcOnIceConnectionStateChange(event, id);            pc.onsignalingstatechange =  (event) =>                pcOnSignalingStateChangeEvent(event, id);            pc.onnegotiationneeded = (event) =>                pcOnNegotiationNeeded(event, id);            pc.ontrack = (event) =>                pcOnTrack(event, id);            connections[id] = {                pc: pc,                username: username            }            if (localStream) {                try {                    localStream.getTracks().forEach(                        (track) => connections[id].pc.addTransceiver(track, {                            streams: [localStream]                        })                    );                } catch (err) {                    logError(err);                }            } else {                // Start negotiation to listen remote stream only                pcOnNegotiationNeeded(null, id);            }            createRemoteView(id, username);        } catch (error) {            logError('Peer: Connection failed', error);        }    }    const pcOnTrack = (event, id) => {        let iVideo = document.querySelector('#pc' + id + ' video');        iVideo.srcObject = event.streams[0];    }    const pcOnIceCandidate = (event, id) => {        let pc = connections[id].pc;        if (event.candidate && pc.remoteDescription) {            log('*** Outgoing ICE candidate: ' + event.candidate);            wsSend({                type: 'candidate',                target: id,                candidate: event.candidate            });        }    }    const pcOnNegotiationNeeded = async (event, id) => {        let pc = connections[id].pc;        try {            const offer = await pc.createOffer();            // If the connection hasn't yet achieved the "stable" state,            // return to the caller. Another negotiationneeded event            // will be fired when the state stabilizes.            if (pc.signalingState != 'stable') {                return;            }            // Establish the offer as the local peer's current            // description.            await pc.setLocalDescription(offer);            // Send the offer to the remote peer.            wsSend({                type: 'offer',                target: id,                sdp: pc.localDescription            });        } catch(err) {            logError('*** The following error occurred while handling' +                ' the negotiationneeded event:', err);        };    }    const pcOnIceConnectionStateChange = (event, id) => {        let pc = connections[id].pc;        switch (pc.iceConnectionState) {            case 'closed':            case 'failed':            case 'disconnected':                pcClose(id);                break;        }    }    const pcOnSignalingStateChangeEvent = (event, id) => {        let pc = connections[id].pc;        log('*** WebRTC signaling state changed to: ' + pc.signalingState);        switch (pc.signalingState) {            case 'closed':                pcClose(id);                break;        }    }    const pcClose = (id) => {        let remoteView = document.querySelector('#pc' + id);        if (connections[id]) {            let pc = connections[id].pc;            pc.close();            delete connections[id];        }        if (remoteView) {            remoteView.remove();        }    }    // *******    // Helpers    // *******    const log = (msg, data) => {        if (!data) {            data = ''        }        console.log(msg, data);    }    const logError = (msg, data) => {        if (!data) {            data = ''        }        console.error(msg, data);    }    init();})();


Сервер сигнализации выполнен на Python фреймвоке aiohttp и представляет собой простую вьюху, тривиально проксирующую запросы WebRTC. Соединение с сервером в этом примере выполнено на веб сокетах. Ну и, в дополнение, через канал сигнализации передаются данные простого текстового чата.

Пример реализации сервера сигнализации
import jsonfrom aiohttp.web import WebSocketResponse, Responsefrom aiohttp import WSMsgTypefrom uuid import uuid1from lib.views import BaseViewclass WebSocket(BaseView):    """ Process WS connections """    async def get(self):        username = self.request['current_user'].firstname or 'Аноним'        room_id = self.request.match_info.get('room_id')        if room_id != 'test_room' and            self.request['current_user'].is_anonymous:            self.raise_error('forbidden')  # @TODO: send 4000        if (self.request.headers.get('connection', '').lower() != 'upgrade' or            self.request.headers.get('upgrade', '').lower() != 'websocket'):            return Response(text=self.request.path)  # ???        self.ws = WebSocketResponse()        await self.ws.prepare(self.request)        self.uid = str(uuid1())        if room_id not in self.request.app['web_sockets']:            self.request.app['web_sockets'][room_id] = {}        self.room = self.request.app['web_sockets'][room_id]        users = {}        for id, data in self.room.items():            users[id] = data['name']        ip = self.request.headers.get(            'X-FORWARDED-FOR',            self.request.headers.get('X-REAL-IP',            self.request.remote))        msg = {            'type': 'handshake',            'uid': str(self.uid),            'users': users, 'ip': ip}        await self.ws.send_str(json.dumps(msg, ensure_ascii=False))        self.room[self.uid] = {'name': username, 'ws': self.ws}        try:            async for msg in self.ws:                if msg.type == WSMsgType.TEXT:                    if msg.data == 'heartbeat':                        print('---heartbeat---')                        continue                    try:                        msg_data = json.loads(msg.data)                        if 'target' not in msg_data or                            msg_data['target'] not in self.room:                            continue                        msg_data['src'] = self.uid                        if 'type' in msg_data and 'target' in msg_data:                            if msg_data['type'] == 'offer':                                msg_data['username'] = username                        else:                            print('INVALID DATA:', msg_data)                    except Exception as e:                        print('INVALID JSON', e, msg)                    try:                        await self.room[msg_data['target']]['ws'].send_json(                            msg_data);                    except Exception as e:                        if 'target' in msg_data:                            self.room.pop(msg_data['target'])        finally:            self.room.pop(self.uid)        return self.ws


Технология WebRTC, кроме видеосвязи, позволяет предоставить браузеру разрешение на захват содержимого дисплея или отдельного приложения, что может оказаться незаменимым при проведении онлайн уроков, вебинаров или презентаций. Отлично, используем.

Я так увлекся современными возможностями видеосвязи, что чуть не забыл о самом главном предмете в классе интерактивной доске. Впрочем, базовая реализация настолько тривиальна, что я не стану загромождать ею эту статью. Просто добавляем canvas, слушаем события перемещения мыши onmousemove (ontouchmove для планшетов) и отправляем полученные координаты всем подключенным точкам через тот же сервер сигнализации.

Тестируем интерактивную доску


Тут понадобится планшет, дигитайзер и живой ребёнок. Заодно проверим возможность оцифровки рукописного ввода.

Для начала я взял старенький планшет Galaxy Tab на андроиде 4.4, самодельный стилус и первые попавшиеся прописи в качестве фона для canvas. Дополнительные программы не устанавливал. Результат меня обескуражил: мой планшет абсолютно не пригоден для письма! То есть водить по нему пальцем без проблем, а вот попасть стилусом в контур буквы, даже такой огромной, как на картинке ниже, уже проблема. Плюс гаджет начинает тупить в процессе рисования, в результате чего линии становятся ломанными. Плюс мне не удалось заставить ребёнка не опирать запястье на экран, отчего под рукой остается дополнительная мазня, а сам планшет начинает тормозить еще больше. Итог: обычный планшет для письма на доске не подходит. Максимум его возможностей двигать пальцем по экрану достаточно крупные фигуры. Но предлагать это школьникам поздновато.

Ладно, у нас ведь чисто теоретическое исследование, верно? Тогда берем дигитайзер (он же графический планшет) Wacom Bamboo формата A8, и наблюдаем за ребёнком.

Замечу, что мой подопытный шести лет от роду получил ноутбук, да еще с графическим пером первый раз в жизни. На получение базовых навыков обращения с пером у нас ушло минут десять, а уже на втором уроке ребёнок пользовался планшетом вполне уверенно, самостоятельно стирал с доски, рисовал рожицы, цветочки, нашу собаку и даже начал тыкать доступные в эпсилон окрестности кнопки, попутно задавая вопросы типа А зачем в школе поднимают руку?. Вот только результат неизменно оставлял желать лучшего. Дело в том, что дизайнеры и художники для прорисовки элемента максимально увеличивают фрагмент изображения, что и делает линии точными. Здесь же мы должны видеть доску целиком, в масштабе 1:1. Тут и взрослый не попадет в линию. Вот что получилось у нас:

image

Итоговый вердикт: ни о каком рукописном вводе не может быть и речи. А если мы хотим поставить руку нашим детям, нужно добиваться этого самостоятельно, на бумаге, школа в этом никак не поможет.

Надо сказать, что ребёнок воспринял все мои эксперименты с восторгом и, более того, с тех пор ходит за мной хвостиком и просит включить прописи. Уже хорошо, полученный навык ему пригодится, только совсем для других целей.

Так или иначе, в результате экспериментов я фактически получил MVP минимально жизнеспособный продукт (minimum viable product), почти пригодный для проведения онлайн уроков, с видео/аудио конференцией, общим экраном, интерактивной доской, простым текстовым чатом и кнопкой поднять руку. Это на случай, если у ребёнка вдруг не окажется микрофона. Да, такое бывает, особенно у детей, не выучивших уроки.

Но в этой бочке меда, к сожалению, есть пара ложек дёгтя.

Тестируем WebRTC


Ложка 1. Поскольку наша видеосвязь использует прямые подключения между клиентами, нужно первым делом проверить масштабируемость такого решения. Для теста я взял старенький ноутбук с двухядерным i5-3230M на борту, и начал подключать к нему клиентов с отключенными веб камерами, то есть эмулируя режим один-ко-многим:

image

Как видите, подопытный ноут в состоянии более-менее комфортно вещать пяти клиентам (при загрузке CPU в пределах 60%). И это при условии снижения разрешения исходящего видеопотока до 720p (640x480px) и frame rate до 15 fps. В принципе, не так уж и плохо, но при подключении класса из нескольких десятков учеников от фулл меша придется отказаться в пользу каскадирования, то есть каждый из первых пяти клиентов проксирует поток следующим пяти и так далее.

Ложка 2. Для создания прямого интерактивного подключения (ICE) между клиентами им нужно обойти сетевые экраны и преобразователи NAT. Для этого WebRTC использует сервер STUN, который сообщает клиентам внешние параметры подключения. Считается, что в большинстве случаев этого достаточно. Но мне почти сразу же повезло:

Как видите, отладчик ругается на невозможность ICE соединения и требует подключения TURN сервера, то есть ретранслятора (relay). А это уже ДОРОГО. Простым сервером сигнализации тут не обойтись. Вывод придётся пропускать все потоки через медиа сервер.

Медиа сервер


Для тестирования я использовал aiortc. Интересная разработка, позволяет подключить браузер напрямую к серверу через WebRTC. Отдельная сигнализация не нужна, можно использовать канал данных самого соединения. Это работает, все мои тестовые точки подключились без проблем. Но вот с производительностью беда. Простое эхо видео/аудио потока с теми же ограничениями 720p и 15fps съели 50% моего виртуального CPU на тестовом VDS. Причём, если увеличить нагрузку до 100%, видео поток не успевает выгружаться клиентам и начинает забивать память, что в итоге приводит к остановке сервера. Очевидно, Питон, который мы любим использовать для задач обработки ввода/вывода, не очень подходит для CPU bound. Придётся поискать более специализированное решение, например, Янус или Джитси.

В любом случае, полноценное решение потребует выделенных серверов, по моим прикидкам, из расчёта 1 ядро на комнату (класс). Это уже стоит денег и выходит за рамки простого тестирования, поэтому в этом месте первую фазу моего исследования я буду считать завершённой.

Выводы


1. Мягко говоря, странно видеть на официальном портале Российской Федерации инструкции по скачиванию и ссылки на регистрацию в программе бывшего потенциального врага 1 (тут про Microsoft Teams). И это в эпоху санкций и импортозамещения.
Нет, лично я за дружбу народов и вообще всяческую толерантность, но неужели только у меня от такой интеграции встают волосы дыбом? Разве нет наших разработок?

2. Интеграция МЭШ/РЭШ со школами. Вообще-то, разработчики МЭШ молодцы, даже с Яндекс.репетитором интеграцию сделали. А как быть с выставлением оценок в реальном времени во время уроков, когда будет API? Или я чего-то не знаю?

3. Выбирая дистанционную или семейную форму обучения, нужно быть честным с собой: переложить ответственность за образование ребёнка на школу не получится. Вся работа по проведению уроков (в случае семейного образования), поддержанию дисциплины и самоорганизованности (в любом случае) полностью ложится на родителей. Нужно отдавать себе в этом отчёт и найти время на занятия. Впрочем, в больших семьях с этим проблем быть не должно.

4. Я не буду приводить здесь ссылки на выбранные нами онлайн школы, чтобы это не сочли рекламой. Скажу только, что мы остановились на частных школах среднего ценового диапазона. В любом случае, окончательный результат будет зависеть от ребёнка и получим мы его не раньше сентября.

Или есть смысл довести до логического конца начатую здесь разработку и организовать свою школу? Что вы думаете? Есть единомышленники, имеющие профильные знания и опыт в области образования?

Полезные ссылки:
Российская Электронная Школа
Московская Электронная Школа
Библиотека МЭШ
Разработчику
Подробнее..

Json api сервис на aiohttp middleware и валидация

28.02.2021 18:05:07 | Автор: admin

В этой статье я опишу один из подходов для создания json api сервиса с валидацией данных.


Сервис будет реализован на aiohttp. Это современный, постоянно развивающийся фреймворк на языке python, использующий asyncio.


Об аннотациях:


Появление аннотаций в python позволило сделать код более понятным. Так же, аннотации открывают некоторые дополнительные возможности. Именно аннотации играют ключевую роль при валидации данных у обработчиков api-методов в этой статье.


Используемые библиотеки:


  • aiohttp фреймворк для создания web-приложений
  • pydantic классы, которые позволяют декларативно описывать данные и валидировать их
  • valdec декоратор для валидации аргументов и возвращаемых значений у функций

Оглавление:



1. Файлы и папки приложения


- sources - Папка с кодом приложения    - data_classes - Папка с модулями классов данных        - base.py - базовый класс данных        - person.py - классы данных о персоне        - wraps.py - классы данных оболочек для запросов/ответов    - handlers - Папка с модулями обработчиков запросов        - kwargs.py - обработчики для примера работы с `KwargsHandler.middleware`        - simple.py - обработчики для примера работы с `SimpleHandler.middleware`        - wraps.py - обработчики для примера работы с `WrapsKwargsHandler.middleware`    - middlewares - Папка с модулями для middlewares        - exceptions.py - классы исключений        - kwargs_handler.py - класс `KwargsHandler`        - simple_handler.py - класс `SimpleHandler`        - utils.py - вспомогательные классы и функции для middlewares        - wraps_handler.py - класс `WrapsKwargsHandler`    - requirements.txt - зависимости приложения    - run_kwargs.py - запуск с `KwargsHandler.middleware`    - run_simple.py - запуск c `SimpleHandler.middleware`    - run_wraps.py - запуск c `WrapsKwargsHandler.middleware`    - settings.py - константы с настройками приложения- Dockerfile - докерфайл для сборки образа

Код доступен на гитхаб: https://github.com/EvgeniyBurdin/api_service


2. json middlewares


middleware в aiohttp.web.Application() является оболочкой для обработчиков запросов.


Если в приложении используется middleware, то поступивший запрос сначала попадает в неё, и только потом передается в обработчик. Обработчик формирует и отдает ответ. Этот ответ снова сначала попадает в middleware и уже она отдает его наружу.


Если в приложении используются нескольно middleware, то каждая из них добавляет новый уровень вложенности.


Между middleware и обработчиком не обязательно должны передаваться "запрос" и "ответ" в виде web.Request и web.Response. Допускается передавать любые данные.


Таким образом, в middleware можно выделить действия над запросами/ответами, которые будут одинаковыми для всех обработчиков.


Это довольно упрощенное описание, но достаточное для понимания того что будет дальше.


2.1. Простая middleware для json сервиса


Обычно, объявление обработчика запроса в приложении aiohttp.web.Application() выглядит, примерно, так:


from aiohttp import webasync def some_handler(request: web.Request) -> web.Response:    data = await request.json()    ...    text = json.dumps(some_data)    ...    return web.Response(text=text, ...)

Для доступа к данным обработчику необходимо "вытащить" из web.Request объект, который был передал в json. Обработать его, сформировать объект с данными для ответа. Закодировать ответ в строку json и отдать "наружу" web.Response (можно отдать и сразу web.json_response()).


2.1.1. Объявление обработчика


Все обработчики нашего приложения должны выполнять подобные шаги. Поэтому, имеет смысл создать middleware, которая возьмет на себя одинаковые действия по подготовке данных и обработке ошибок, а сами обработчики бы стали такими:


from aiohttp import webasync def some_handler(request: web.Request, data: Any) -> Any:    ...    return some_data

Каждый из обработчиков имеет два позиционных аргумента. В первый будет передан оригинальный экземпляр web.Request (на всякий случай), во второй уже готовый объект python, с полученными данными.


В примере, второй аргумент имеет такое объявление: data: Any. Имя у него может быть любым (как и у первого аргумента), а вот в аннотации лучше сразу указать тип объекта, который "ждет" обработчик. Это пожелание справедливо и для возврата.


То есть, в реальном коде, объявление обработчика может быть таким:


from aiohttp import webfrom typing import Union, Listasync def some_handler(    request: web.Request, data: Union[str, List[str]]) -> List[int]:    ...    return some_data

2.1.2. Класс SimpleHandler для middleware


Класс SimpleHandler реализует метод для самой middleware и методы, которые впоследствии помогут изменять/дополнять логику работы middleware (ссылка на код класса).


Остановлюсь подробнее только на некоторых.


2.1.2.1. Метод middleware

    @web.middleware    async def middleware(self, request: web.Request, handler: Callable):        """ middleware для json-сервиса.        """        if not self.is_json_service_handler(request, handler):            return await handler(request)        try:            request_body = await self.get_request_body(request, handler)        except Exception as error:            response_body = self.get_error_body(request, error)            status = 400        else:            # Запуск обработчика            response_body, status = await self.get_response_body_and_status(                request, handler, request_body            )        finally:            # Самостоятельно делаем дамп объекта python (который находится в            # response_body) в строку json.            text, status = await self.get_response_text_and_status(                request, response_body, status            )        return web.Response(            text=text, status=status, content_type="application/json",        )

Именно этот метод надо будет добавить в список middlewares в процессе создания приложения.


Например, так:


    ...    app = web.Application()    service_handler = SimpleHandler()    app.middlewares.append(service_handler.middleware)    ...

2.1.2.2. Метод для получения данных ответа с ошибкой

Так как у нас json сервис, то, желательно, чтобы ошибки во входящих данных (с кодом 400), и внутренние ошибки сервиса (с кодом 500), отдавались в формате json.


Для этого создан метод формирования "тела" для ответа с ошибкой:


    def get_error_body(self, request: web.Request, error: Exception) -> dict:        """ Отдает словарь с телом ответа с ошибкой.        """        return {"error_type": str(type(error)), "error_message": str(error)}

Хочу обратить внимание на то, что этот метод должен отработать без исключений и вернуть объект с описанием ошибки, который можно кодировать в json. Если работа этого метода завершиться исключением, то мы не увидим json в теле ответа.


2.1.2.3. Метод запуска обработчика

В текущем классе он очень простой:


    async def run_handler(        self, request: web.Request, handler: Callable, request_body: Any    ) -> Any:        """ Запускает реальный обработчик, и возвращает результат его работы.        """        return await handler(request, request_body)

Запуск выделен в отдельный метод для того, чтобы можно было добавить логику до/после выполнения самого обработчика.


2.1.3. Примеры


Имеется такой обработчик:


async def some_handler(request: web.Request, data: dict) -> dict:    return data

Будем посылать запросы на url этого обработчика.


текст примеров...
2.1.3.1. Ответ с кодом 200

Запрос POST на /some_handler:


{    "name": "test",    "age": 25}

ожидаемо вернет ответ с кодом 200:


{    "name": "test",    "age": 25}

2.1.3.2. Ответ с кодом 400

Сделаем ошибку в теле запроса.


Запрос POST на /some_handler:


{    "name": "test", 111111111111    "age": 25}

Теперь ответ сервиса выглядит так:


{    "error_type": "<class 'json.decoder.JSONDecodeError'>",    "error_message": "Expecting property name enclosed in double quotes: line 2 column 21 (char 22)"}

2.1.3.3. Ответ с кодом 500

Добавим в код обработчика исключение (эмулируем ошибку сервиса).


async def handler500(request: web.Request, data: dict) -> dict:    raise Exception("Пример ошибки 500")    return data

Запрос POST на /handler500:


{    "name": "test",    "age": 25}

в ответ получит такое:


{    "error_type": "<class 'Exception'>",    "error_message": "Пример ошибки 500"}

2.2. middleware для "kwargs-обработчиков"


middleware из предыдущего раздела уже можно успешно использовать.


Но проблема дублирования кода в обработчиках не решена до конца.


Рассмотрим такой пример:


async def some_handler(request: web.Request, data: dict) -> dict:    storage = request.app["storage"]    logger = request.app["logger"]    user_id = request.match_info["user_id"]    # и т.д. и т.п...    return data

Так как storage, или logger (или что-то еще), могут быть нужны и в других обработчиках, то везде придется "доставать" их одинаковым образом.


2.2.1. Объявление обработчика


Хотелось бы, чтобы обработчики объявлялись, например, так:


async def some_handler_1(data: dict) -> int:    # ...    return some_dataasync def some_handler_2(storage: StorageClass, data: List[int]) -> dict:    # ...    return some_dataasync def some_handler_3(    data: Union[dict, List[str]], logger: LoggerClass, request: web.Request) -> str:    # ...    return some_data

То есть, чтобы нужные для обработчика сущности объявлялись в его сигнатуре и сразу были бы доступны в коде.


2.2.2. Вспомогательный класс ArgumentsManager


Про нужные для обработчика сущности должна знать middleware, чтобы она смогла "вытащить" небходимые для обработчика и "подсунуть" ему при вызове.


За регистрацию, хранение и "выдачу" таких сущностей отвечает класс ArgumentsManager. Он объявлен в модуле middlewares/utils.py (ссылка на код класса).


Для хранения связи "имя аргумента" "действие по извлечению значения для аргумента" в этом классе определен простой словарь, где ключем является "имя аргумента", а значением ссылка на метод, который будет вызван для извлечения "значения аргумента".


Звучит немного запутано, но на самом деле всё просто:


@dataclassclass RawDataForArgument:    request: web.Request    request_body: Any    arg_name: Optional[str] = Noneclass ArgumentsManager:    """ Менеджер для аргументов обработчика.        Связывает имя аргумента с действием, которое надо совершить для        получения значения аргумента.    """    def __init__(self) -> None:        self.getters: Dict[str, Callable] = {}    # Тело json запроса ------------------------------------------------------    def reg_request_body(self, arg_name) -> None:        """ Регистрация имени аргумента для тела запроса.        """        self.getters[arg_name] = self.get_request_body    def get_request_body(self, raw_data: RawDataForArgument):        return raw_data.request_body    # Ключи в request --------------------------------------------------------    def reg_request_key(self, arg_name) -> None:        """ Регистрация имени аргумента который хранится в request.        """        self.getters[arg_name] = self.get_request_key    def get_request_key(self, raw_data: RawDataForArgument):        return raw_data.request[raw_data.arg_name]    # Ключи в request.app ----------------------------------------------------    def reg_app_key(self, arg_name) -> None:        """ Регистрация имени аргумента который хранится в app.        """        self.getters[arg_name] = self.get_app_key    def get_app_key(self, raw_data: RawDataForArgument):        return raw_data.request.app[raw_data.arg_name]    # Параметры запроса ------------------------------------------------------    def reg_match_info_key(self, arg_name) -> None:        """ Регистрация имени аргумента который приходит в параметрах запроса.        """        self.getters[arg_name] = self.get_match_info_key    def get_match_info_key(self, raw_data: RawDataForArgument):        return raw_data.request.match_info[raw_data.arg_name]    # Можно добавить и другие регистраторы...

Регистрация имен аргументов выполняется при создании экземпляра web.Application():


# ...app = web.Application()arguments_manager = ArgumentsManager()# Регистрация имени аргумента обработчика, в который будут передаваться# данные полученные из json-тела запросаarguments_manager.reg_request_body("data")# Регистрация имени аргумента обработчика, в который будет передаваться# одноименный параметр запроса из словаря request.match_infoarguments_manager.reg_match_info_key("info_id")# В приложении будем использовать хранилище# (класс хранилища "взят с потолка" и здесь просто для примера)app["storage"] = SomeStorageClass(login="user", password="123")# Регистрация имени аргумента обработчика, в который будет передаваться# экземпляр хранилищаarguments_manager.reg_app_key("storage")# ...

Теперь экземпляр ArgumentsManager хранит информацию о возможных аргументах обработчиков. Он передается при создании экземпляра класса для middleware:


...service_handler = KwargsHandler(arguments_manager=arguments_manager)app.middlewares.append(service_handler.middleware)...

Сейчас менеджер очень простой. Можно добавить в него регистрацию сразу нескольких ключей одного вида, правила для разрешения конфликтов имен, и проч например, и то, что потом можно будет использовать при сборке документации.


2.2.3. Класс KwargsHandler для middleware


Класс KwargsHandler является наследником SimpleHandler и расширяет его возможности тем, что позволяет создавать обработчики согласно требованию п.2.2.1.


В этом классе переопределяется один метод run_handler, и добавляется еще два make_handler_kwargs и build_error_message_for_invalid_handler_argument (ссылка на код класса).


2.2.3.1. Метод запуска обработчика

Переопределяется метод родительского класса:


    async def run_handler(        self, request: web.Request, handler: Callable, request_body: Any    ) -> Any:        """ Запускает реальный обработчик, и возвращает результат его работы.            (Этот метод надо переопределять, если необходима дополнительная            обработка запроса/ответа/исключений)        """        kwargs = self.make_handler_kwargs(request, handler, request_body)        return await handler(**kwargs)

Как можно заметить, теперь аргументы в обработчик передаются именованными. Таким образом, в обработчиках становится не важен порядок следования аргументов в сигнатуре. Но стали важны сами имена аргументов.


2.2.3.2. Метод формирования словаря с именами аргументов и их значениями

Метод make_handler_kwargs был добавлен в текущий класс. Он реализует заполнение словаря с именами аргументов и их значениями, который будет потом использован при вызове обработчика. Заполнение словаря происходит при помощи уже подготовленного экземпляра ArgumentsManager.


Напомню, что в сигнатурах обработчиков сейчас можно использовать только имена аргументов, которые были зарегистрированы в экземпляре класса ArgumentsManager.


Но у этого требования есть одно исключение. А именно, аргумент с экземпляром web.Request может иметь в сигнатуре обработчика любое имя, но он обязательно должен иметь аннотацию типом web.Request (например, r: web.Request или req: web.Request или request: web.Request). То есть, экземпляр web.Request "зарегистрирован" по умолчанию, и может быть использован в любом обработчике.


И еще одно замечание: все аргументы обработчика должны иметь аннотацию.


Метод build_error_message_for_invalid_handler_argument просто формирует строку с сообщением об ошибке. Он создан для возможности изменить сообщение на свой вкус.


2.2.4. Примеры


Сигнатуры методов такие:


async def create(    data: Union[dict, List[dict]], storage: dict,) -> Union[dict, List[dict]]:    # ...async def read(storage: dict, data: str) -> dict:    # ...async def info(info_id: int, request: web.Request) -> str:    # ...

Первые два обслуживают POST запросы, последний GET (просто, для примера)


текст примеров...
2.2.4.1. Метод /create

Запрос:


[    {        "name": "Ivan"    },    {        "name": "Oleg"    }]

Ответ:


[    {        "id": "5730bab1-9c1b-4b01-9979-9ad640ea5fc1",        "name": "Ivan"    },    {        "id": "976d821a-e871-41b4-b5a2-2875795d6166",        "name": "Oleg"    }]

2.2.4.2. Метод /read

Запрос:


"5730bab1-9c1b-4b01-9979-9ad640ea5fc1"

Ответ:


{    "id": "5730bab1-9c1b-4b01-9979-9ad640ea5fc1",    "name": "Ivan"}

Примечание: читайте данные с одним из UUID которые получили в предыдущем примере, иначе будет ответ с ошибкой 500 PersonNotFound.


2.2.4.3. Метод /info/{info_id}

Запрос GET на /info/123:


"any json"

Ответ:


"info_id=123 and request=<Request GET /info/123 >"

2.3. middleware c оболочками запроса/ответа и валидацией


Иногда, требования для api-сервиса включают в себя стандартизированные оболочки для запросов и ответов.


Например, тело запроса к методу create может быть таким:


{    "data": [        {            "name": "Ivan"        },        {            "name": "Oleg"        }    ],    "id": 11}

а ответ таким:


{    "success": true,    "result": [        {            "id": "9738d8b8-69da-40b2-8811-b33652f92f1d",            "name": "Ivan"        },        {            "id": "df0fdd43-4adc-43cd-ac17-66534529d440",            "name": "Oleg"        }    ],    "id": 11}

То есть, данные для запроса в ключе data а от ответа в result.


Имеется ключ id, который в ответе должен иметь такое же значение как и в запросе.


Ключ ответа success является признаком успешности запроса.


А если запрос закончился неудачно, то ответ может быть таким:


Запрос к методу read:


{    "data":  "ddb0f2b1-0179-44b7-b94d-eb2f3b69292d",    "id": 3}

Ответ:


{    "success": false,    "result": {        "error_type": "<class 'handlers.PersonNotFound'>",        "error_message": "Person whith id=ddb0f2b1-0179-44b7-b94d-eb2f3b69292d not found!"    },    "id": 3}

Уже представленные классы для json middleware позволяют добавить логику работы с оболочками в новый класс для middleware. Надо будет дополнить метод run_handler, и заменить (или дополнить) метод get_error_body.


Таким образом, в обработчики будут "прилетать" только данные, необходимые для их работы (в примере это значение ключа data). Из обработчиков будет возвращаться только положительный результат (значение ключа result). А исключения будет обрабатывать middleware.


Так же, если это необходимо, можно добавить и валидацию данных.


Чтобы "два раза не вставать", я сразу покажу как добавить и оболочки и валидацию. Но сначала необходимо сделать некоторые пояснения по выбранным инструментам.


2.3.1. Класс данных pydantic.BaseModel


pydantic.BaseModel позволяет декларативно объявлять данные.


При создании экземпляра происходит валидация данных по их аннотациям (и не только). Если валидация провалилась поднимается исключение.


Небольшой пример:


from pydantic import BaseModelfrom typing import Union, Listclass Info(BaseModel):    foo: intclass Person(BaseModel):    name: str    info: Union[Info, List[Info]]kwargs = {"name": "Ivan", "info": {"foo": 0}}person = Person(**kwargs)assert person.info.foo == 0kwargs = {"name": "Ivan", "info": [{"foo": 0}, {"foo": 1}]}person = Person(**kwargs)assert person.info[1].foo == 1kwargs = {"name": "Ivan", "info": {"foo": "bar"}}  # <- Ошибка, str не intperson = Person(**kwargs)# Возникло исключение:# ...# pydantic.error_wrappers.ValidationError: 2 validation errors for Person# info -> foo#  value is not a valid integer (type=type_error.integer)# info#  value is not a valid list (type=type_error.list)

Тут мы видим, что после успешной валидации, поля экземпляра получают значения входящих данных. То есть, был словарь, стал экземпляр класса.


В аннотациях к полям мы можем использовать алиасы из typing.


Если в аннотации к полю присутствует класс-потомок pydantic.BaseModel, то данные "маппятся" и в него (и так с любой вложенностью хотя, на счет "любой" не проверял).


Провал валидации сопровождается довольно информативным сообщением об ошибке. В примере мы видим, что на самом деле было две ошибки: info.foo не int, и info не list, что соответствует аннотации и сопоставленному с ней значению.


При использовании pydantic.BaseModel есть нюансы, на которые я хочу обратить внимание.


2.3.1.1. Строгие типы

Если в любом из приведенных выше примеров заменить целое на строку, содержащую только цифры, то валидация всё равно закончится успешно:


kwargs = {"name": "Ivan", "info": {"foo": "0"}}person = Person(**kwargs)assert person.info.foo == 0

То есть, имеем неявное приведение типов. И такое встречается не только с str->int (более подробно про типы pydantic см. в документации).


Приведение типов, в определенных ситуациях, может оказаться полезным, например строка с UUID -> UUID. Но, если приведение некоторых типов недопустимо, то в аннотациях надо использовать типы, наименование у которых начинается со Strict.... Например, pydantic.StrictInt, pydantic.StrictStr, и т.п...


2.3.1.2. Строгая сигнатура при создании экземпляра

Если, для определенных выше классов, попробовать выполнить такой пример:


kwargs = {"name": "Ivan", "info": {"foo": 0}, "bar": "BAR"}person = Person(**kwargs)

То создание экземпляра пройдет без ошибок.


Это тоже может оказаться не тем, что ожидаешь по умолчанию.


Для строгой проверки аргументов, при создании экземпляра, необходимо переопределить базовый класс:


from pydantic import BaseModel, Extra, StrictInt, StrictStrfrom typing import Union, Listclass BaseApi(BaseModel):    class Config:        # Следует ли игнорировать (ignore), разрешать (allow) или        # запрещать (forbid) дополнительные атрибуты во время инициализации        # модели, подробнее:        # https://pydantic-docs.helpmanual.io/usage/model_config/        extra = Extra.forbidclass Info(BaseApi):    foo: StrictIntclass Person(BaseApi):    name: StrictStr    info: Union[Info, List[Info]]kwargs = {"name": "Ivan", "info": {"foo": 0}, "bar": "BAR"}person = Person(**kwargs)# ...# pydantic.error_wrappers.ValidationError: 1 validation error for Person# bar#   extra fields not permitted (type=value_error.extra)

Теперь все нормально, валидация провалилась.


2.3.2. Декоратор valdec.validate


Декоратор valdec.validate позволяет валидировать аргументы и/или возвращаемое значение функции или метода.


Можно валидировать только те аргументы, для которых указана аннотация.


Если у возврата нет аннотации, то считается что функция должна вернуть None (имеет аннотацию -> None:).


Определен декоратор как для обычных функций/методов:


from valdec.decorators import validate@validate  # Валидируем все аргументы с аннотациями, и возвратdef foo(i: int, s: str) -> int:    return i@validate("i", "s")  # Валидируем только "i" и "s"def bar(i: int, s: str) -> int:    return i

так и для асинхронных.


# Импортируем асинхронный вариантfrom valdec.decorators import async_validate as validate@validate("s", "return", exclude=True)  # Валидируем только "i"async def foo(i: int, s: str) -> int:    return int(i)@validate("return")  # Валидируем только возвратasync def bar(i: int, s: str) -> int:    return int(i)

2.3.2.1. Функции-валидаторы

Декоратор получает данные об аргументах/возврате функции, и передает их в функцию-валидатор (это сильно упрощенно, но по сути так), которая и производит, непосредственно, валидацию.


Сигнатура функции-валидатора:


def validator(    annotations: Dict[str, Any],    values: Dict[str, Any],    is_replace: bool,    extra: dict) -> Optional[Dict[str, Any]]:

Аргументы:


  • annotations Словарь, который содержит имена аргументов и их аннотации.
  • values Словарь, который содержит имена аргументов и их значения.
  • is_replace управляет тем, что возвращает функция-валидатор, а именно возвращать отвалидированные значения или нет.
    • Если True, то функция должна вернуть словарь с именами отвалидированных аргументов и их значениями после валидации. Таким образом, например, если у аргумента была аннотация с наследником BaseModel и данные для него поступили в виде словаря, то они будут заменены на экземпляр BaseModel, и в декорируемой функции к ним можно будет обращаться "через точку".
    • Если параметр равен False, то функция вернет None, а декорируемая функция получит оригинальные данные (то есть, например, словарь так и останется словарем, а не станет экземпляром BaseModel).
  • extra Словарь с дополнительными параметрами.

По умолчанию, в декораторе validate используется функция-валидатор на основе pydantic.BaseModel.


В ней происходит следующее:


  • На основании словаря с именами аргументов и их аннотаций создается класс данных (потомок pydantic.BaseModel)
  • Создается экземпляр этого класса в который передается словарь с именами и значениями. В этот момент и происходит валидация.
  • Возвращает функция аргументы после валидации (которые уже буду содержать значения из созданного экземпляра), или ничего не возвращает, зависит от аргумента is_replace.

Вызов функции происходит один раз для всех аргументов, и второй раз, отдельно, для возврата. Конечно, если есть что валидировать, как в первом, так и во втором случае.


Функция-валидатор может быть реализована на основе любого валидирующего класса (в репозитарии valdec есть пример реализации на ValidatedDC). Но необходимо учесть следующее: далее в статье, я буду использовать потомков pydantic.BaseModel в аннотациях аргументов у обработчиков. Соответственно, при другом валидирующем классе, в аннотациях необходимо будет указывать потомков этого "другого" класса.


2.3.2.2. Настройка декоратора

По умолчанию, декоратор "подменяет" исходные данные на данные экземпляра валидирующего класса:


from typing import List, Optionalfrom pydantic import BaseModel, StrictInt, StrictStrfrom valdec.decorators import validateclass Profile(BaseModel):    age: StrictInt    city: StrictStrclass Student(BaseModel):    name: StrictStr    profile: Profile@validate("group")def func(group: Optional[List[Student]] = None):    for student in group:        assert isinstance(student, Student)        assert isinstance(student.name, str)        assert isinstance(student.profile.age, int)data = [    {"name": "Peter", "profile": {"age": 22, "city": "Samara"}},    {"name": "Elena", "profile": {"age": 20, "city": "Kazan"}},]func(data)

Обратите внимание на assert'ы.


Это работает и для возврата:


@validate  # Валидируем всёdef func(group: Optional[List[Student]] = None, i: int) -> List[Student]:    #...    return [        {"name": "Peter", "profile": {"age": 22, "city": "Samara"}},        {"name": "Elena", "profile": {"age": 20, "city": "Kazan"}},    ]

Здесь, несмотря на то, что в return явно указан список словарей, функция вернет список экземпляров Student (подмену выполнит декоратор).


Но Нам не всегда надо именно такое поведение. Иногда, бывает полезно отвалидировать, а данные не подменять (например, если речь о входящих данных, чтобы сразу отдать их в БД). И этого можно добиться изменив настройки декоратора:


from valdec.data_classes import Settingsfrom valdec.decorators import validate as _validatefrom valdec.validator_pydantic import validatorcustom_settings = Settings(    validator=validator,     # Функция-валидатор.    is_replace_args=False,   # Делать ли подмену в аргументах    is_replace_result=False, # Делать ли подмену в результате    extra={}                 # Дополнительные параметры, которые будут                             # передаваться в функцию-валидатор)# Определяем новый декораторdef validate_without_replacement(*args, **kwargs):    kwargs["settings"] = custom_settings    return _validate(*args, **kwargs)# Используем@validate_without_replacementdef func(group: Optional[List[Student]] = None, i: int) -> List[Student]:    #...    return [        {"name": "Peter", "profile": {"age": 22, "city": "Samara"}},        {"name": "Elena", "profile": {"age": 20, "city": "Kazan"}},    ]

И теперь func вернет список словарей, так как is_replace_result=False. И получит тоже список словарей, так как is_replace_args=False.


Но сама валидация данных будет работать как и раньше, не будет лишь подмены.


Есть один нюанс линтер, иногда, может "ругаться" на различия в типах. Да, это минус. Но всегда лучше иметь выбор, чем его не иметь.


Может возникнуть вопрос что будет, если декоратор получит, допустим, не словарь, а уже готовый экземпляр класса? Ответ будет выполнена обычная проверка экземпляра на соответствие типу.


Как можно заметить, в настройках указывается и функция-валидатор, и если вы захотите использовать свою именно там нужно ее подставить.


2.3.2.3. Еще раз про приведение типов

Рассмотрим такой пример применения декоратора:


from valdec.decorators import validate@validatedef foo(i: int):    assert isinstance(i, int)foo("1")

Мы вызываем функцию и передаем ей строку. Но валидация прошла успешно, и в функцию прилетело целое.


Как я уже говорил, по умолчанию, в декораторе validate, используется функция-валидатор на основе pydantic.BaseModel. В п.2.3.1.1. можно еще раз почитать про неявное приведение типов в этом классе.


В нашем же примере, для того чтобы получить желаемое поведение (ошибку валидации), необходимо сделать так:


from valdec.decorators import validatefrom pydantic import StrictInt@validatedef foo(i: StrictInt):    passfoo("1")# ...# valdec.errors.ValidationArgumentsError: Validation error# <class 'valdec.errors.ValidationError'>: 1 validation error for# argument with the name of:# i#  value is not a valid integer (type=type_error.integer).

Вывод такой: Используя декоратор на основе валидирующего класса, аннотации к аргументам функции надо писать по правилам этого класса.


Не забывайте про это.


2.3.2.4. Исключения

  • valdec.errors.ValidationArgumentsError "поднимается" если валидация аргументов функции потерпела неудачу
  • valdec.errors.ValidationReturnError если не прошел валидацию возврат

Само сообщение с описанием ошибки берется из валидирующего класса. В нашем примере это сообщение об ошибке от pydantic.BaseModel.


2.3.3. Базовый класс данных


Как я уже говорил, в этой статье используем классы-наследники от pydantic.BaseModel.


Cначала обязательно определим базовый класс данных:


data_classes/base.py


from pydantic import BaseModel, Extraclass BaseApi(BaseModel):    """ Базовый класс данных для api.    """    class Config:        extra = Extra.forbid

2.3.4. Объявление обработчика


Класс для middleware, над созданием которого мы сейчас работаем, позволит объявлять обработчики, например, так:


from typing import List, Unionfrom valdec.decorators import async_validate as validatefrom data_classes.person import PersonCreate, PersonInfo@validate("data", "return")async def create(    data: Union[PersonCreate, List[PersonCreate]], storage: dict,) -> Union[PersonInfo, List[PersonInfo]]:    # ...    return result

Что здесь добавилось (по сравнению с обработчиками из прошлых глав):


  • декоратор validate валидирует поступившие данные и ответ, и "подменяет" их на экземпляры валидирующих классов
  • в аннотациях у данных указаны уже конкретные классы.

Про оболочки запросов/ответов обработчик ничего не знает, ему это и не надо.


Позволю себе небольшую ремарку: если в аргументах у обработчика нет экземпляра web.Request(), то он является обычной функцией, которую можно использовать не только в wep.Aplication(). На самом деле, даже если он там есть, эту функцию по-прежнему можно будет использовать в приложениях другого типа, если обеспечить совместимый с web.Request() экземпляр данных.


Соответственно, классы данных для этого обработчика могут быть такими:


data_classes/person.py


from uuid import UUIDfrom pydantic import Field, StrictStrfrom data_classes.base import BaseApiclass PersonCreate(BaseApi):    """ Данные для создания персоны.    """    name: StrictStr = Field(description="Имя.", example="Oleg")class PersonInfo(BaseApi):    """ Информация о персоне.    """    id: UUID = Field(description="Идентификатор.")    name: StrictStr = Field(description="Имя.")

2.3.5. Классы данных для оболочек


В самом начале п.2.3. были обозначены тебования к оболочкам запроса и ответа.


Для их выполнения создадим классы данных.


data_classes/wraps.py


from typing import Any, Optionalfrom pydantic import Field, StrictIntfrom data_classes.base import BaseApi_ID_DESCRIPTION = "Идентификатор запроса к сервису."class WrapRequest(BaseApi):    """ Запрос.    """    data: Any = Field(description="Параметры запроса.", default=None)    id: Optional[StrictInt] = Field(description=_ID_DESCRIPTION)class WrapResponse(BaseApi):    """ Ответ.    """    success: bool = Field(description="Статус ответа.", default=True)    result: Any = Field(description="Результат ответа.")    id: Optional[StrictInt] = Field(description=_ID_DESCRIPTION)

Эти классы будут использоваться в классе для middleware при реализации логики оболочек.


2.3.6. Класс WrapsKwargsHandler для middleware


Класс WrapsKwargsHandler является наследником KwargsHandler и расширяет его возможности тем, что позволяет использовать оболочки для данных запросов и ответов и их валидацию (ссылка на код класса).


В этом классе переопределяются два метода run_handler и get_error_body.


2.3.6.1. Метод запуска обработчика

Переопределяется метод родительского класса:


async def run_handler(        self, request: web.Request, handler: Callable, request_body: Any    ) -> dict:        id_ = None        try:            # Проведем валидацию оболочки запроса            wrap_request = WrapRequest(**request_body)        except Exception as error:            message = f"{type(error).__name__} - {error}"            raise InputDataValidationError(message)        # Запомним поле id для ответов        id_ = wrap_request.id        request[KEY_NAME_FOR_ID] = id_        try:            result = await super().run_handler(                request, handler, wrap_request.data            )        except ValidationArgumentsError as error:            message = f"{type(error).__name__} - {error}"            raise InputDataValidationError(message)        # Проведем валидацию оболочки ответа        wrap_response = WrapResponse(success=True, result=result, id=id_)        return wrap_response.dict()

Сначала мы проверим оболочку запроса. Исключение InputDataValidationError поднимется в следующих случаях:


  • если в теле запроса не словарь (пусть даже пустой)
  • если есть поля с ключами отличными от data и id
  • если есть ключ id но его значение не StrictInt и не None

Если в запросе нет ключа id, то wrap_request.id получит значение None. Ключ data может иметь любое значение и валидироваться не будет. Так же, его может вообще не быть во входящих данных, тогда wrap_request.data получит значение None.


Затем мы запоминаем wrap_request.id в request. Это необходимо для формирования ответа с ошибкой на текущий запрос (если она произойдет).


После этого вызывается обработчик, но для его входящих данных передается только wrap_request.data (напомню, что во wrap_request.data сейчас объект python в том виде, как он был получен из json). При этом, исключение InputDataValidationError поднимается если получено исключение valdec.errors.ValidationArgumentsError.


Если обработчик отработал нормально, и был получен результат его работы, то создаем экземпляр класса оболочки ответа WrapResponse в варианте для успешного ответа.


Все просто, но хотел бы обратить внимание на такой момент. Можно было бы обойтись без создания wrap_response, а сразу сформировать словарь (как это и будет сделано для ответа с ошибкой). Но, в случае успешного ответа мы не знаем что пришло в ответе от обработчика, это может быть, например, как список словарей, так и список экземпляров BaseApi. А на выходе из метода мы должны гарантированно отдать объект, готовый для кодирования в json. Поэтому, мы "заворачиваем" любые данные с результом во WrapResponse.result и уже из wrap_response получаем окончательный ответ для метода при помощи wrap_response.dict() (ссылка на документацию).


2.3.6.2. Метод для получения данных ответа с ошибкой

Заменяется метод родительского класса:


def get_error_body(self, request: web.Request, error: Exception) -> dict:        """ Формирует и отдает словарь с телом ответа с ошибкой.        """        result = dict(error_type=str(type(error)), error_message=str(error))        # Так как мы знаем какая у нас оболочка ответа, сразу сделаем словарь        # с аналогичной "схемой"        response = dict(            # Для поля id используется сохраненное в request значение.            success=False, result=result, id=request.get(KEY_NAME_FOR_ID)        )        return response

Здесь можно было бы применить и наследование (вызвать super() для получения result), но для наглядности я оставил так. Вы можете сделать как сочтете нужным.


2.3.7. Примеры


Сигнатуры методов такие:


@validate("data", "return")async def create(    data: Union[PersonCreate, List[PersonCreate]], storage: dict,) -> Union[PersonInfo, List[PersonInfo]]:    # ...@validate("data", "return")async def read(storage: dict, req: web.Request, data: UUID) -> PersonInfo:    # ...@validate("info_id")async def info(info_id: int, request: web.Request) -> Any:    return f"info_id={info_id} and request={request}"

Первые два обслуживают POST запросы, последний GET (просто, для примера)


текст примеров...
2.3.7.1. Метод /create

  • Запрос 1:

{    "data": [        {            "name": "Ivan"        },        {            "name": "Oleg"        }    ],    "id": 1}

Ответ:


{    "success": true,    "result": [        {            "id": "af908a90-9157-4231-89f6-560eb6a8c4c0",            "name": "Ivan"        },        {            "id": "f7d554a0-1be9-4a65-bfc2-b89dbf70bb3c",            "name": "Oleg"        }    ],    "id": 1}

  • Запрос 2:

{    "data": {        "name": "Eliza"    },    "id": 2}

Ответ:


{    "success": true,    "result": {        "id": "f3a45a19-acd0-4939-8e0c-e10743ff8e55",        "name": "Eliza"    },    "id": 2}

  • Запрос 3:

Попробуем передать в data невалидное значение


{    "data": 123,    "id": 3}

Ответ:


{    "success": false,    "result": {        "error_type": "<class 'middlewares.exceptions.InputDataValidationError'>",        "error_message": "ValidationArgumentsError - Validation error <class 'valdec.errors.ValidationError'>: 2 validation errors for argument with the name of:\ndata\n  value is not a valid dict (type=type_error.dict)\ndata\n  value is not a valid list (type=type_error.list)."    },    "id": 3}

2.3.7.2. Метод /read

  • Запрос 1:

{    "data": "f3a45a19-acd0-4939-8e0c-e10743ff8e55",    "id": 4}

Ответ:


{    "success": true,    "result": {        "id": "f3a45a19-acd0-4939-8e0c-e10743ff8e55",        "name": "Eliza"    },    "id": 4

  • Запрос 2:

Попробуем сделать ошибку в оболочке.


{    "some_key": "f3a45a19-acd0-4939-8e0c-e10743ff8e55",    "id": 5}

Ответ:


{    "success": false,    "result": {        "error_type": "<class 'middlewares.exceptions.InputDataValidationError'>",        "error_message": "ValidationError - 1 validation error for WrapRequest\nsome_key\n  extra fields not permitted (type=value_error.extra)"    },    "id": null}

2.3.7.3. Метод /info/{info_id}

  • Запрос GET на /info/123:

{}

Ответ:


{    "success": true,    "result": "info_id=123 and request=<Request GET /info/123 >",    "id": null}

3. О нереализованной документации


У обработчиков, которые используются с классом WrapsKwargsHandler, есть всё, чтобы автоматически собрать документацию. К ним более не надо ничего добавлять. Так как классы pydantic.BaseModel позволяют получать json-schema, то остается только сделать скрипт сборки документации (если кратко, то надо: перед запуском приложения пройтись по всем обработчикам и у каждого заменить докстринг на swagger-описание, построенное на основе уже имеющегося докстринга и json-схем входящих данных и возврата).


И я эту документацию собираю. Но не стал рассказывать про это в статье. Причина в том, что я не нашел библиотеки для swagger и aiohttp, которая бы работала полностью как надо (или я не нашел способа заставить работать как надо).


Например, библиотека aiohttp-swagger некорректно отображает аргумент (в областях с примерами), если в аннотации есть алиас Union.


Библиотека aiohttp-swagger3, напротив, все прекрасно показывает, но не работает если в приложении есть sub_app.


Если кто-то знает как решить эти проблемы, или, возможно, кто-то знает библиотеку, которая работает стабильно буду очень благодарен за комментарий.


4. Заключение


В итоге у нас имеются три класса для json middleware с разными возможностями. Любой из них можно изменить под свои нужды. Или создать на их основе новый.


Можно создавать любые оболочки для содержимого запросов и ответов. Так же, можно гибко настраивать валидацию, и применять ее только там, где она действительно необходима.


Не сомневаюсь в том, что примеры которые я предложил, можно реализовать и по другому. Но надеюсь, что мои решения, если и не пригодятся полностью, то поспособствуют нахождению иных, более подходящих.


Спасибо за уделенное время. Буду рад замечаниям, и уточнениям.


При публикации статьи использовал MarkConv

Подробнее..

Из песочницы День и ночь в интернете, или открытое письмо веб-разработчикам

30.07.2020 16:13:15 | Автор: admin

Я веб-разработчик со стажем, и мне приходится подолгу сидеть за компьютером. Поэтому, чтобы к вечеру глаза не уставали, мне пришлось перепробовать все возможные способы снижения нагрузки на зрение от защитных экранов, специальных очков и упражнений для глаз до различных режимов eye saver и прочих манипуляций с настройками монитора.


На мой взгляд, одним из самых эффективных методов борьбы за свое здоровье является выбор темных, или ночных тем в настройках операционной системы. Однако, мне постоянно приходится переключаться между текстовым редактором и браузером, и вот тут происходит очень неприятный для зрения эффект.


Как известно, операционная система не позволяет управлять содержимым веб-сайта. Решение о том, как оформить веб-страницу, принимает дизайнер, а не пользователь, и большинство страниц светлые. Конечно, существуют плагины типа Dark Reader, которые пытаются решить эту проблему. Но, к сожалению, все они так или иначе искажают содержимое страниц и, кроме того, могут нарушать работу скриптов сайта.


И вот, во время работы над одним из проектов, мне написал наш главный редактор и попросил использовать темную тему, ссылаясь на то, что от светлой у него болят глаза. Но в проекте публикуется, в том числе, детский контент, поэтому сделать это было никак нельзя.


Это было последней каплей, и я сказал себе: все, хватит! Пусть сам посетитель выбирает, в каком режиме показывать страницу, в соответствии со своими личными предпочтениями.


Дальше было все просто. Техническая реализация идеи заняла у меня максимум пол-дня, включая тесты и размышления о том, где бы в коде лучше разместить переключатель. Я просто вынес все упоминания цвета из основного CSS-файла в файл light.css, потом скопировал его в файл dark.css и изменил некоторые (даже не все) цвета. Вот что получилось в итоге:


image


Детали реализации (на примере aiohttp)
main.py:    async def create_app():        ...        jinja_setup(            app,            context_processors=[BaseHandler().context_processor])views.py:    class BaseHandler:        async def context_processor(self, request):            ...            return {                'theme': 'dark'                    if request.cookies.get('theme') == 'dark' else 'light'}base.html:    <head>        ...        <link rel="stylesheet"            href="{{ static_root_url }}/css/{{ theme }}.css">    </head>    <body>        ...        <div class="nav-block nav-item nav-theme">            {% if theme == 'dark' %}                <a href="javascript:void(0)" class="js-theme"                    data-theme="light">                    Дневной режим                </a>            {% else %}                <a href="javascript:void(0)" class="js-theme"                    data-theme="dark">                    Ночной режим                </a>            {% endif %}        </div>        ...    </body>base.js:    ...    $('.js-theme').on('click', function () {        $.setCookie('theme', $(this).data('theme'));        location.reload();    });

Как видите, достаточно было просто разместить на видном месте кнопку Ночной режим, которая выбирает нужный файл цветовой схемы. Это оказалось удобно, и благодарности не заставили себя ждать.


А теперь давайте помечтаем о том, как это могло бы быть, если бы было веб-стандартом. Предположим, в настройках ОС одним из первых пунктов идет что-то вроде Look and feel, который предоставляет выбор между темными и светлыми темами. ОС передает эту настройку браузеру, а тот, в свою очередь, веб-странице. Адаптивный сайт (или, если угодно, ресурс с персонализированным интерфейсом), получив такую инструкцию, должен отреагировать на нее переключением цветовой схемы и предоставить посетителю выбор: использовать текущую схему или переключить день на ночь (или наоборот).


И в заключение: уважаемые коллеги-разработчики, давайте будем уважать себя и наших пользователей и начнем внедрять подобные решения в наших проектах уже сейчас! Это совсем не сложно.


Мы ведь делаем сайты для людей, да?

Подробнее..

Aiohttp Dependency Injector руководство по применению dependency injection

02.08.2020 22:16:32 | Автор: admin
Привет,

Я создатель Dependency Injector. Это dependency injection фреймворк для Python.

Продолжаю серию руководств по применению Dependency Injector для построения приложений.

В этом руководстве хочу показать как применять Dependency Injector для разработки aiohttp приложений.

Руководство состоит из таких частей:

  1. Что мы будем строить?
  2. Подготовка окружения
  3. Структура проекта
  4. Установка зависимостей
  5. Минимальное приложение
  6. Giphy API клиент
  7. Сервис поиска
  8. Подключаем поиск
  9. Немного рефакторинга
  10. Добавляем тесты
  11. Заключение

Завершенный проект можно найти на Github.

Для старта необходимо иметь:

  • Python 3.5+
  • Virtual environment

И желательно иметь:

  • Начальные навыки разработки с помощью aiohttp
  • Общее представление о принципе dependency injection


Что мы будем строить?





Мы будем строить REST API приложение, которое ищет забавные гифки на Giphy. Назовем его Giphy Navigator.

Как работает Giphy Navigator?

  • Клиент отправляет запрос указывая что искать и сколько результатов вернуть.
  • Giphy Navigator возвращает ответ в формате json.
  • Ответ включает:
    • поисковый запрос
    • количество результатов
    • список url гифок

Пример ответа:

{    "query": "Dependency Injector",    "limit": 10,    "gifs": [        {            "url": "https://giphy.com/gifs/boxes-dependent-swbf2-6Eo7KzABxgJMY"        },        {            "url": "https://giphy.com/gifs/depends-J56qCcOhk6hKE"        },        {            "url": "https://giphy.com/gifs/web-series-ccstudios-bro-dependent-1lhU8KAVwmVVu"        },        {            "url": "https://giphy.com/gifs/TheBoysTV-friends-friend-weneedeachother-XxR9qcIwcf5Jq404Sx"        },        {            "url": "https://giphy.com/gifs/netflix-a-series-of-unfortunate-events-asoue-9rgeQXbwoK53pcxn7f"        },        {            "url": "https://giphy.com/gifs/black-and-white-sad-skins-Hs4YzLs2zJuLu"        },        {            "url": "https://giphy.com/gifs/always-there-for-you-i-am-here-PlayjhCco9jHBYrd9w"        },        {            "url": "https://giphy.com/gifs/stream-famous-dollar-YT2dvOByEwXCdoYiA1"        },        {            "url": "https://giphy.com/gifs/i-love-you-there-for-am-1BhGzgpZXYWwWMAGB1"        },        {            "url": "https://giphy.com/gifs/life-like-twerk-9hlnWxjHqmH28"        }    ]}

Подготовим окружение


Начнём с подготовки окружения.

В первую очередь нам нужно создать папку проекта и virtual environment:

mkdir giphynav-aiohttp-tutorialcd giphynav-aiohttp-tutorialpython3 -m venv venv

Теперь давайте активируем virtual environment:

. venv/bin/activate

Окружение готово, теперь займемся структурой проекта.

Структура проекта


В этом разделе организуем структуру проекта.

Создадим в текущей папке следующую структуру. Все файлы пока оставляем пустыми.

Начальная структура:

./ giphynavigator/    __init__.py    application.py    containers.py    views.py venv/ requirements.txt

Установка зависимостей


Пришло время установить зависимости. Мы будем использовать такие пакеты:

  • dependency-injector dependency injection фреймворк
  • aiohttp веб фреймворк
  • aiohttp-devtools библиотека-помогатор, которая предоставляет сервер для разработки с live-перезагрузкой
  • pyyaml библиотека для парсинга YAML файлов, используется для чтения конфига
  • pytest-aiohttp библиотека-помогатор для тестирования aiohttp приложений
  • pytest-cov библиотека-помогатор для измерения покрытия кода тестами

Добавим следующие строки в файл requirements.txt:

dependency-injectoraiohttpaiohttp-devtoolspyyamlpytest-aiohttppytest-cov

И выполним в терминале:

pip install -r requirements.txt

Дополнительно установим httpie. Это HTTP клиент для командной строки. Мы будем
использовать его для ручного тестирования API.

Выполним в терминале:

pip install httpie

Зависимости установлены. Теперь построим минимальное приложение.

Минимальное приложение


В этом разделе построим минимальное приложение. У него будет эндпоинт, который будет возвращать пустой ответ.

Отредактируем views.py:

"""Views module."""from aiohttp import webasync def index(request: web.Request) -> web.Response:    query = request.query.get('query', 'Dependency Injector')    limit = int(request.query.get('limit', 10))    gifs = []    return web.json_response(        {            'query': query,            'limit': limit,            'gifs': gifs,        },    )

Теперь добавим контейнер зависимостей (дальше просто контейнер). Контейнер будет содержать все компоненты приложения. Добавим первые два компонента. Это aiohttp приложение и представление index.

Отредактируем containers.py:

"""Application containers module."""from dependency_injector import containersfrom dependency_injector.ext import aiohttpfrom aiohttp import webfrom . import viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = aiohttp.Application(web.Application)    index_view = aiohttp.View(views.index)

Теперь нам нужно создать фабрику aiohttp приложения. Ее обычно называют
create_app(). Она будет создавать контейнер. Контейнер будет использован для создания aiohttp приложения. Последним шагом настроим маршрутизацию мы назначим представление index_view из контейнера обрабатывать запросы к корню "/" нашего приложения.

Отредактируем application.py:

"""Application module."""from aiohttp import webfrom .containers import ApplicationContainerdef create_app():    """Create and return aiohttp application."""    container = ApplicationContainer()    app: web.Application = container.app()    app.container = container    app.add_routes([        web.get('/', container.index_view.as_view()),    ])    return app

Контейнер первый объект в приложении. Он используется для получения всех остальных объектов.

Теперь мы готовы запустить наше приложение:

Выполните команду в терминале:

adev runserver giphynavigator/application.py --livereload

Вывод должен выглядеть так:

[18:52:59] Starting aux server at http://localhost:8001 [18:52:59] Starting dev server at http://localhost:8000 

Используем httpie чтобы проверить работу сервера:

http http://127.0.0.1:8000/

Вы увидите:

HTTP/1.1 200 OKContent-Length: 844Content-Type: application/json; charset=utf-8Date: Wed, 29 Jul 2020 21:01:50 GMTServer: Python/3.8 aiohttp/3.6.2{    "gifs": [],    "limit": 10,    "query": "Dependency Injector"}

Минимальное приложение готово. Давайте подключим Giphy API.

Giphy API клиент


В этом разделе мы интегрируем наше приложение с Giphy API. Мы создадим собственный API клиент используя клиентскую часть aiohttp.

Создайте пустой файл giphy.py в пакете giphynavigator:

./ giphynavigator/    __init__.py    application.py    containers.py    giphy.py    views.py venv/ requirements.txt

и добавьте в него следующие строки:

"""Giphy client module."""from aiohttp import ClientSession, ClientTimeoutclass GiphyClient:    API_URL = 'http://api.giphy.com/v1'    def __init__(self, api_key, timeout):        self._api_key = api_key        self._timeout = ClientTimeout(timeout)    async def search(self, query, limit):        """Make search API call and return result."""        if not query:            return []        url = f'{self.API_URL}/gifs/search'        params = {            'q': query,            'api_key': self._api_key,            'limit': limit,        }        async with ClientSession(timeout=self._timeout) as session:            async with session.get(url, params=params) as response:                if response.status != 200:                    response.raise_for_status()                return await response.json()

Теперь нам нужно добавить GiphyClient в контейнер. У GiphyClient есть две зависимости, которые нужно передать при его создании: API ключ и таймаут запроса. Для этого нам нужно будет воспользоваться двумя новыми провайдерами из модуля dependency_injector.providers:

  • Провайдер Factory будет создавать GiphyClient.
  • Провайдер Configuration будет передавать API ключ и таймаут GiphyClient.

Отредактируем containers.py:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import aiohttpfrom aiohttp import webfrom . import giphy, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = aiohttp.Application(web.Application)    config = providers.Configuration()    giphy_client = providers.Factory(        giphy.GiphyClient,        api_key=config.giphy.api_key,        timeout=config.giphy.request_timeout,    )    index_view = aiohttp.View(views.index)

Мы использовали параметры конфигурации перед тем как задали их значения. Это принцип, по которому работает провайдер Configuration.

Сначала используем, потом задаем значения.

Теперь давайте добавим файл конфигурации.
Будем использовать YAML.

Создайте пустой файл config.yml в корне проекта:

./ giphynavigator/    __init__.py    application.py    containers.py    giphy.py    views.py venv/ config.yml requirements.txt

И заполните его следующими строками:

giphy:  request_timeout: 10

Для передачи API ключа мы будем использовать переменную окружения GIPHY_API_KEY .

Теперь нам нужно отредактировать create_app() чтобы сделать 2 действие при старте приложения:

  • Загрузить конфигурацию из config.yml
  • Загрузить API ключ из переменной окружения GIPHY_API_KEY

Отредактируйте application.py:

"""Application module."""from aiohttp import webfrom .containers import ApplicationContainerdef create_app():    """Create and return aiohttp application."""    container = ApplicationContainer()    container.config.from_yaml('config.yml')    container.config.giphy.api_key.from_env('GIPHY_API_KEY')    app: web.Application = container.app()    app.container = container    app.add_routes([        web.get('/', container.index_view.as_view()),    ])    return app

Теперь нам нужно создать API ключ и установить его в переменную окружения.

Чтобы не тратить на это время сейчас используйте вот этот ключ:

export GIPHY_API_KEY=wBJ2wZG7SRqfrU9nPgPiWvORmloDyuL0

Для создания собственного ключа Giphy API следуйте этому руководству.

Создание Giphy API клиента и установка конфигурации завершена. Давайте перейдем к сервису поиска.

Сервис поиска


Пришло время добавить сервис поиска SearchService. Он будет:

  • Выполнять поиск
  • Форматировать полученный ответ

SearchService будет использовать GiphyClient.

Создайте пустой файл services.py в пакете giphynavigator:

./ giphynavigator/    __init__.py    application.py    containers.py    giphy.py    services.py    views.py venv/ requirements.txt

и добавьте в него следующие строки:

"""Services module."""from .giphy import GiphyClientclass SearchService:    def __init__(self, giphy_client: GiphyClient):        self._giphy_client = giphy_client    async def search(self, query, limit):        """Search for gifs and return formatted data."""        if not query:            return []        result = await self._giphy_client.search(query, limit)        return [{'url': gif['url']} for gif in result['data']]

При создании SearchService нужно передавать GiphyClient. Мы укажем это при добавлении SearchService в контейнер.

Отредактируем containers.py:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import aiohttpfrom aiohttp import webfrom . import giphy, services, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = aiohttp.Application(web.Application)    config = providers.Configuration()    giphy_client = providers.Factory(        giphy.GiphyClient,        api_key=config.giphy.api_key,        timeout=config.giphy.request_timeout,    )    search_service = providers.Factory(        services.SearchService,        giphy_client=giphy_client,    )    index_view = aiohttp.View(views.index)

Создание сервиса поиска SearchService завершено. В следующем разделе мы подключим его к нашему представлению.

Подключаем поиск


Теперь мы готовы чтобы поиск заработал. Давайте используем SearchService в index представлении.

Отредактируйте views.py:

"""Views module."""from aiohttp import webfrom .services import SearchServiceasync def index(        request: web.Request,        search_service: SearchService,) -> web.Response:    query = request.query.get('query', 'Dependency Injector')    limit = int(request.query.get('limit', 10))    gifs = await search_service.search(query, limit)    return web.json_response(        {            'query': query,            'limit': limit,            'gifs': gifs,        },    )

Теперь изменим контейнер чтобы передавать зависимость SearchService в представление index при его вызове.

Отредактируйте containers.py:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import aiohttpfrom aiohttp import webfrom . import giphy, services, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = aiohttp.Application(web.Application)    config = providers.Configuration()    giphy_client = providers.Factory(        giphy.GiphyClient,        api_key=config.giphy.api_key,        timeout=config.giphy.request_timeout,    )    search_service = providers.Factory(        services.SearchService,        giphy_client=giphy_client,    )    index_view = aiohttp.View(        views.index,        search_service=search_service,    )

Убедитесь что приложение работает или выполните:

adev runserver giphynavigator/application.py --livereload

и сделайте запрос к API в терминале:

http http://localhost:8000/ query=="wow,it works" limit==5

Вы увидите:

HTTP/1.1 200 OKContent-Length: 850Content-Type: application/json; charset=utf-8Date: Wed, 29 Jul 2020 22:22:55 GMTServer: Python/3.8 aiohttp/3.6.2{    "gifs": [        {            "url": "https://giphy.com/gifs/discoverychannel-nugget-gold-rush-rick-ness-KGGPIlnC4hr4u2s3pY"        },        {            "url": "https://giphy.com/gifs/primevideoin-ll1hyBS2IrUPLE0E71"        },        {            "url": "https://giphy.com/gifs/jackman-works-jackmanworks-l4pTgQoCrmXq8Txlu"        },        {            "url": "https://giphy.com/gifs/cat-massage-at-work-l46CzMaOlJXAFuO3u"        },        {            "url": "https://giphy.com/gifs/everwhatproductions-fun-christmas-3oxHQCI8tKXoeW4IBq"        },    ],    "limit": 10,    "query": "wow,it works"}



Поиск работает.

Немного рефакторинга


Наше представление index содержит два hardcoded значения:

  • Поисковый запрос по умолчанию
  • Лимит количества результатов

Давайте сделаем небольшой рефакторинг. Мы перенесем эти значения в конфигурацию.

Отредактируйте views.py:

"""Views module."""from aiohttp import webfrom .services import SearchServiceasync def index(        request: web.Request,        search_service: SearchService,        default_query: str,        default_limit: int,) -> web.Response:    query = request.query.get('query', default_query)    limit = int(request.query.get('limit', default_limit))    gifs = await search_service.search(query, limit)    return web.json_response(        {            'query': query,            'limit': limit,            'gifs': gifs,        },    )

Теперь нам нужно чтобы эти значения передавались при вызове. Давайте обновим контейнер.

Отредактируйте containers.py:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import aiohttpfrom aiohttp import webfrom . import giphy, services, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = aiohttp.Application(web.Application)    config = providers.Configuration()    giphy_client = providers.Factory(        giphy.GiphyClient,        api_key=config.giphy.api_key,        timeout=config.giphy.request_timeout,    )    search_service = providers.Factory(        services.SearchService,        giphy_client=giphy_client,    )    index_view = aiohttp.View(        views.index,        search_service=search_service,        default_query=config.search.default_query,        default_limit=config.search.default_limit,    )

Теперь давайте обновим конфигурационный файл.

Отредактируйте config.yml:

giphy:  request_timeout: 10search:  default_query: "Dependency Injector"  default_limit: 10

Рефакторинг закончен. Мы сделали наше приложение чище перенесли hardcoded значения в конфигурацию.

В следующем разделе мы добавим несколько тестов.

Добавляем тесты


Было бы неплохо добавить несколько тестов. Давай сделаем это. Мы будем использовать pytest и coverage.

Создайте пустой файл tests.py в пакете giphynavigator:

./ giphynavigator/    __init__.py    application.py    containers.py    giphy.py    services.py    tests.py    views.py venv/ requirements.txt

и добавьте в него следующие строки:

"""Tests module."""from unittest import mockimport pytestfrom giphynavigator.application import create_appfrom giphynavigator.giphy import GiphyClient@pytest.fixturedef app():    return create_app()@pytest.fixturedef client(app, aiohttp_client, loop):    return loop.run_until_complete(aiohttp_client(app))async def test_index(client, app):    giphy_client_mock = mock.AsyncMock(spec=GiphyClient)    giphy_client_mock.search.return_value = {        'data': [            {'url': 'https://giphy.com/gif1.gif'},            {'url': 'https://giphy.com/gif2.gif'},        ],    }    with app.container.giphy_client.override(giphy_client_mock):        response = await client.get(            '/',            params={                'query': 'test',                'limit': 10,            },        )    assert response.status == 200    data = await response.json()    assert data == {        'query': 'test',        'limit': 10,        'gifs': [            {'url': 'https://giphy.com/gif1.gif'},            {'url': 'https://giphy.com/gif2.gif'},        ],    }async def test_index_no_data(client, app):    giphy_client_mock = mock.AsyncMock(spec=GiphyClient)    giphy_client_mock.search.return_value = {        'data': [],    }    with app.container.giphy_client.override(giphy_client_mock):        response = await client.get('/')    assert response.status == 200    data = await response.json()    assert data['gifs'] == []async def test_index_default_params(client, app):    giphy_client_mock = mock.AsyncMock(spec=GiphyClient)    giphy_client_mock.search.return_value = {        'data': [],    }    with app.container.giphy_client.override(giphy_client_mock):        response = await client.get('/')    assert response.status == 200    data = await response.json()    assert data['query'] == app.container.config.search.default_query()    assert data['limit'] == app.container.config.search.default_limit()

Теперь давайте запустим тестирование и проверим покрытие:

py.test giphynavigator/tests.py --cov=giphynavigator

Вы увидите:

platform darwin -- Python 3.8.3, pytest-5.4.3, py-1.9.0, pluggy-0.13.1plugins: cov-2.10.0, aiohttp-0.3.0, asyncio-0.14.0collected 3 itemsgiphynavigator/tests.py ...                                     [100%]---------- coverage: platform darwin, python 3.8.3-final-0 -----------Name                            Stmts   Miss  Cover---------------------------------------------------giphynavigator/__init__.py          0      0   100%giphynavigator/__main__.py          5      5     0%giphynavigator/application.py      10      0   100%giphynavigator/containers.py       10      0   100%giphynavigator/giphy.py            16     11    31%giphynavigator/services.py          9      1    89%giphynavigator/tests.py            35      0   100%giphynavigator/views.py             7      0   100%---------------------------------------------------TOTAL                              92     17    82%

Обратите внимание как мы заменяем giphy_client моком с помощью метода .override(). Таким образом можно переопределить возвращаемое значения любого провайдера.

Работа закончена. Теперь давайте подведем итоги.

Заключение


Мы построили aiohttp REST API приложение применяя принцип dependency injection. Мы использовали Dependency Injector в качестве dependency injection фреймворка.

Преимущество, которое вы получаете с Dependency Injector это контейнер.

Контейнер начинает окупаться, когда вам нужно понять или изменить структуру приложения. С контейнером это легко, потому что все компоненты приложения и их зависимости в одном месте:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import aiohttpfrom aiohttp import webfrom . import giphy, services, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = aiohttp.Application(web.Application)    config = providers.Configuration()    giphy_client = providers.Factory(        giphy.GiphyClient,        api_key=config.giphy.api_key,        timeout=config.giphy.request_timeout,    )    search_service = providers.Factory(        services.SearchService,        giphy_client=giphy_client,    )    index_view = aiohttp.View(        views.index,        search_service=search_service,        default_query=config.search.default_query,        default_limit=config.search.default_limit,    )

Что дальше?


Подробнее..

Собираем данные AlphaVantage с Faust. Часть 1. Подготовка и введение

20.09.2020 14:22:27 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть I Введение

20.09.2020 16:05:00 | Автор: admin

http://personeltest.ru/aways/habrastorage.org/webt/wo/6b/ui/wo6buieqgfwzr4y5tczce4js0rc.png


Как я дошёл до жизни такой?


Сначала я решил опробовать Celery, которым пользовался ранее. В связи с асинхронностью проекта, я погрузился в вопрос и увидел статью, а так же проект, созданный автором статьи.


Скажу так, проект очень интересный и вполне успешно работает в других приложениях нашей команды, да и сам автор говорит о том, что смог выкатить в прод, заюзав асинхронный пул. Но, к сожалению, мне это не очень подошло, так как обнаружилась проблема с групповым запуском задач (см. group). На момент написания статьи issue уже закрыта, однако, работа велась на протяжении месяца. В любом случае, автору удачи и всех благ, так как рабочие штуки на либе уже есть в общем, дело во мне и для меня оказался инструмент сыроват. Вдобавок, в некоторых задачах было по 2-3 http-запроса к разным сервисам, таким образом даже при оптимизации задач мы создаём 4 тысячи tcp соединений, примерно каждые 2 часа не очень Хотелось бы создавать сессию на один тип задач при запуске воркеров. Чуть подробнее о большом кол-ве запросов через aiohttp тут.


В связи с этим, я начал искать альтернативы и нашёл! Создателями celery, а конкретно, как я понял Ask Solem, была создана Faust, изначально для проекта robinhood. Faust написана под впечатлением от Kafka Streams и работает с Kafka в качестве брокера, также для хранения результатов от работы агентов используется rocksdb, а самое главное это то, что библиотека асинхронна.


Также, можете глянуть краткое сравнение celery и faust от создателей последней: их различия, различия брокеров, реализацию элементарной задачи. Всё весьма просто, однако, в faust привлекает внимание приятная особенность типизированные данные для передачи в топик.


Что будем делать?


Итак, в небольшой серии статей я покажу, как собирать данные в фоновых задачах с помощью Faust. Источником для нашего пример-проекта будет, как следует из названия, alphavantage.co. Я продемонстрирую, как писать агентов (sink, топики, партиции), как делать регулярное (cron) выполнение, удобнейшие cli-комманды faust (обёртка над click), простой кластеринг, а в конце прикрутим datadog (работающий из коробки) и попытаемся, что-нибудь увидеть. Для хранения собранных данных будем использовать mongodb и motor для подключения.


P.S. Судя по уверенности, с которой написан пункт про мониторинг, думаю, что читатель в конце последней статьи всё-таки будет выглядеть, как-то так:


http://personeltest.ru/aways/habrastorage.org/webt/e5/v1/pl/e5v1plkcyvxyoawde4motgq7vpm.png


Требования к проекту


В связи с тем, что я уже успел наобещать, составим небольшой списочек того, что должен уметь сервис:


  1. Выгружать ценные бумаги и overview по ним (в т.ч. прибыли и убытки, баланс, cash flow за последний год) регулярно
  2. Выгружать исторические данные (для каждого торгового года находить экстремумы цены закрытия торгов) регулярно
  3. Выгружать последние торговые данные регулярно
  4. Выгружать настроенный список индикаторов для каждой ценной бумаги регулярно

Как полагается, выбираем имя проекту с потолка: horton


Готовим инфраструктуру


Заголовок конечно сильный, однако, всё что нужно сделать это написать небольшой конфиг для docker-compose с kafka (и zookeeper в одном контейнере), kafdrop (если нам захочется посмотреть сообщения в топиках), mongodb. Получаем [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) следующего вида:


version: '3'services:  db:    container_name: horton-mongodb-local    image: mongo:4.2-bionic    command: mongod --port 20017    restart: always    ports:      - 20017:20017    environment:      - MONGO_INITDB_DATABASE=horton      - MONGO_INITDB_ROOT_USERNAME=admin      - MONGO_INITDB_ROOT_PASSWORD=admin_password  kafka-service:    container_name: horton-kafka-local    image: obsidiandynamics/kafka    restart: always    ports:      - "2181:2181"      - "9092:9092"    environment:      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"      KAFKA_RESTART_ATTEMPTS: "10"      KAFKA_RESTART_DELAY: "5"      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"  kafdrop:    container_name: horton-kafdrop-local    image: 'obsidiandynamics/kafdrop:latest'    restart: always    ports:      - '9000:9000'    environment:      KAFKA_BROKERCONNECT: kafka-service:29092    depends_on:      - kafka-service

Тут вообще ничего сложного. Для kafka объявили два listener'а: одного (internal) для использования внутри композной сети, а второго (external) для запросов из вне, поэтому пробросили его наружу. 2181 порт zookeeper'а. По остальному, я думаю, ясно.


Готовим скелет проекта


В базовом варианте структура нашего проекта должна выглядеть так:


horton docker-compose.yml horton     agents.py *     alphavantage.py *     app.py *     config.py     database      connect.py      cruds       base.py       __init__.py       security.py *      __init__.py     __init__.py     records.py *     tasks.py *

*Всё что я отметил мы пока не трогаем, а просто создаём пустые файлы.**


Создали структуру. Теперь добавим необходимые зависимости, напишем конфиг и подключение к mongodb. Полный текст файлов приводить в статье не буду, чтобы не затягивать, а сделаю ссылки на нужные версии.


Начнём с зависимостей и мета о проекте pyproject.toml


Далее, запускаем установку зависимостей и создание virtualenv (либо, можете сами создать папку venv и активировать окружение):


pip3 install poetry (если ещё не установлено)poetry install

Теперь создадим config.yml креды и куда стучаться. Сразу туда можно разместить и данные для alphavantage. Ну и переходим к config.py извлекаем данные для приложения из нашего конфига. Да, каюсь, заюзал свою либу sitri.


По подключению с монго совсем всё просто. Объявили класс клиента для подключения и базовый класс для крудов, чтобы проще было делать запросы по коллекциям.


Что будет дальше?


Статья получилась не очень большая, так как здесь я говорю только о мотивации и подготовке, поэтому не обессудьте обещаю, что в следующей части будет экшн и графика.


Итак, а в этой самой следующей части мы:


  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
  2. Сделаем агента, который будет собирать данные о ценных бумагах и исторические цены по ним.

Код проекта


Код этой части

Подробнее..

Фоновые задачи на Faust, Часть II Агенты и Команды

23.09.2020 04:06:02 | Автор: admin

Оглавление

  1. Часть I: Введение

  2. Часть II: Агенты и Команды

Что мы тут делаем?

Итак-итак, вторая часть. Как и писалось ранее, в ней мы сделаем следующее:

  1. Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.

  2. Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.

Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи - для ручного пуша сообщения в топик, за которым следит агент.

Подготовка

Клиент AlphaVantage

Для начала, напишем небольшой aiohttp клиентик для запросов на alphavantage.

alphavantage.py

Spoiler
import urllib.parse as urlparsefrom io import StringIOfrom typing import Any, Dict, List, Unionimport aiohttpimport pandas as pdimport stringcasefrom loguru import loggerfrom horton.config import API_ENDPOINTclass AlphaVantageClient:    def __init__(        self,        session: aiohttp.ClientSession,        api_key: str,        api_endpoint: str = API_ENDPOINT,    ):        self._query_params = {"datatype": "json", "apikey": api_key}        self._api_endpoint = api_endpoint        self._session = session    @logger.catch    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:        formatted_data = {}        for field, item in data.items():            formatted_data[stringcase.snakecase(field)] = item        return formatted_data    @logger.catch    async def _construct_query(        self, function: str, to_json: bool = True, **kwargs    ) -> Union[Dict[str, Any], str]:        path = "query/"        async with self._session.get(            urlparse.urljoin(self._api_endpoint, path),            params={"function": function, **kwargs, **self._query_params},        ) as response:            data = (await response.json()) if to_json else (await response.text())            if to_json:                data = self._format_fields(data)        return data    @logger.catch    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)        data = pd.read_csv(StringIO(data))        securities = data.to_dict("records")        for index, security in enumerate(securities):            security = self._format_fields(security)            security["_type"] = "physical"            securities[index] = security        return securities    @logger.catch    async def get_security_overview(self, symbol: str) -> Dict[str, str]:        return await self._construct_query("OVERVIEW", symbol=symbol)    @logger.catch    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query(            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"        )    @logger.catch    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)    @logger.catch    async def get_indicator_data(        self, symbol: str, indicator: str, **indicator_options    ) -> Dict[str, Any]:        return await self._construct_query(            indicator, symbol=symbol, **indicator_options        )

Собственно по нему всё ясно:

  1. API AlphaVantage достаточно просто и красиво спроектирована, поэтому все запросы я решил проводить через метод construct_query где в свою очередь идёт http вызов.

  2. Все поля я привожу к snake_case для удобства.

  3. Ну и декорация logger.catch для красивого и информативного вывода трейсбека.

P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен тут.

CRUD-класс

У нас будет коллекция securities для хранения мета информации о ценных бумагах.

database/security.py

Тут по-моему ничего пояснять не нужно, а базовый класс сам по себе достаточно прост.

get_app()

Добавим функцию создания объекта приложения в app.py

Spoiler
import faustfrom horton.config import KAFKA_BROKERSdef get_app():    return faust.App("horton", broker=KAFKA_BROKERS)

Пока у нас будет самое простое создание приложения, чуть позже мы его расширим, однако, чтобы не заставлять вас ждать, вот референсы на App-класс. На класс settings тоже советую взглянуть, так как именно он отвечает за большую часть настроек.

Основная часть

Агент сбора и сохранения списка ценных бумаг

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Так, сначала получаем объект faust-приложения - это достаточно просто. Далее, мы явно объявляем топик для нашего агента... Тут стоит упомянуть, что это такое, что за параметр internal и как это можно устроить по-другому.

  1. Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать офф. доку, либо можно прочитать конспект на хабре на русском, где так же всё достаточно точно отражено :)

  2. Параметр internal, достаточно хорошо описанный в доке faust, позволяет нам настраивать топик прямо в коде, естественно, имеются ввиду параметры, предусмотренные разработчиками faust, например: retention, retention policy (по-умолчанию delete, но можно установить и compact), кол-во партиций на топик (partitions, чтобы сделать, например, меньшее чем глобальное значение приложения faust).

  3. Вообще, агент может создавать сам управляемый топик с глобальными значениями, однако, я люблю объявлять всё явно. К тому же, некоторые параметры (например, кол-во партиций или retention policy) топика в объявлении агента настроить нельзя.

    Вот как это могло было выглядеть без ручного определения топика:

app = get_app()@app.agent()async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:pass

Ну а теперь, опишем, что будет делать наш агент :)

app = get_app()collect_securities_topic = app.topic("collect_securities", internal=True)@app.agent(collect_securities_topic)async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for _ in stream:            logger.info("Start collect securities")            client = AlphaVantageClient(session, API_KEY)            securities = await client.get_securities()            for security in securities:                await SecurityCRUD.update_one(                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True                )            yield True

Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия - одна, на всё время работы воркера (или несколько, если изменить параметр concurrency у агента с дефолтной единички).

Далее, мы идём по стриму (сообщение мы помещаем в _, так как нам, в данном агенте, безразлично содержание) сообщений из нашего топика, если они есть при текущем сдвиге (offset), иначе, наш цикл будет ожидать их поступления. Ну а внутри нашего цикла, мы логируем поступление сообщения, получаем список активных (get_securities возвращает по-умолчания только active, см. код клиента) ценных бумаг и сохраняем его в базу, проверяя при этом, есть ли бумага с таким тикером и биржей в БД, если есть, то она (бумага) просто обновится.

Запустим наше творение!

> docker-compose up -d... Запуск контейнеров ...> faust -A horton.agents worker --without-web -l info

P.S. Возможности веб-компонента faust я рассматривать в статьях не буду, поэтому выставляем соответствующий флаг.

В нашей команде запуска мы указали faust'у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:

Spoiler
aS v1.10.4 id           horton                                             transport    [URL('kafka://localhost:9092')]                    store        memory:                                            log          -stderr- (info)                                    pid          1271262                                            hostname     host-name                                          platform     CPython 3.8.2 (Linux x86_64)                       drivers                                                           transport  aiokafka=1.1.6                                       web        aiohttp=3.6.2                                      datadir      /path/to/project/horton-data                       appdir       /path/to/project/horton-data/v1                   ... логи, логи, логи ...Topic Partition Set topic                       partitions  collect_securities          {0-7}       horton-__assignor-__leader  {0}         

Оно живое!!!

Посмотрим на partition set. Как мы видим, был создан топик с именем, которое мы обозначили в коде, кол-во партиций дефолтное (8, взятое из topic_partitions - параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.

Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:

> faust -A horton.agents send @collect_securities{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. с помощью @ мы показываем, что посылаем сообщение в топик с именем "collect_securities".

В данном случае, сообщение ушло в 6 партицию - это можно проверить, зайдя в kafdrop на localhost:9000

Перейдя в окно терминала с нашим воркером, мы увидим радостное сообщение, посланное с помощью loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:

Я не миллиардер, а потому, довольствуемся первым вариантом просмотра.

Счастье и радость - первый агент готов :)

Агент готов, да здравствует новый агент!

Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.

Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[?],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            ...

Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records - классы, декларирующие схему сообщения в топике агента.

В таком случае перейдём в records.py и опишем, как должно выглядеть сообщение у этого топика:

import faustclass CollectSecurityOverview(faust.Record):    symbol: str    exchange: str

Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой - 3.6.

Вернёмся к агенту, установим типы и допишем его:

collect_security_overview_topic = app.topic(    "collect_security_overview", internal=True, value_type=CollectSecurityOverview)@app.agent(collect_security_overview_topic)async def collect_security_overview(    stream: StreamT[CollectSecurityOverview],) -> AsyncIterable[bool]:    async with aiohttp.ClientSession() as session:        async for event in stream:            logger.info(                "Start collect security [{symbol}] overview", symbol=event.symbol            )            client = AlphaVantageClient(session, API_KEY)            security_overview = await client.get_security_overview(event.symbol)            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)            yield True

Как видите, мы передаём в метод инициализации топика новый параметр со схемой - value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё - смысла не вижу.

Ну что же, последний штрих - добавим в collect_securitites вызов агента сбора мета информации:

....for security in securities:    await SecurityCRUD.update_one({            "symbol": security["symbol"],            "exchange": security["exchange"]        },        security,        upsert = True,    )    await collect_security_overview.cast(        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])    )....

Используем ранее объявлению схему для сообщения. В данном случае, я использовал метод .cast, так как нам не нужно ожидать результат от агента, но стоит упомянуть, что способов послать сообщение в топик:

  1. cast - не блокирует, так как не ожидает результата. Нельзя послать результат в другой топик сообщением.

  2. send - не блокирует, так как не ожидает результата. Можно указать агента в топик которого уйдёт результат.

  3. ask - ожидает результата. Можно указать агента в топик которого уйдёт результат.

Итак, на этом с агентами на сегодня всё!

Команда мечты

Последнее, что я обещал написать в этой части - команды. Как уже говорилось ранее, команды в faust - это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A

После объявленных агентов в agents.py добавим функцию с декоратором app.command, вызывающую метод cast у collect_securitites:

@app.command()async def start_collect_securities():    """Collect securities and overview."""    await collect_securities.cast()

Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:

> faust -A horton.agents --help....Commands:  agents                    List agents.  clean-versions            Delete old version directories.  completion                Output shell completion to be evaluated by the...  livecheck                 Manage LiveCheck instances.  model                     Show model detail.  models                    List all available models as a tabulated list.  reset                     Delete local table state.  send                      Send message to agent/topic.  start-collect-securities  Collect securities and overview.  tables                    List available tables.  worker                    Start worker instance for given app.

Ею мы можем воспользоваться, как любой другой, поэтому перезапустим faust воркер и начнём полноценный сбор ценных бумаг:

> faust -A horton.agents start-collect-securities

Что будет дальше?

В следующей части мы, на примере оставшихся агентов, рассмотрим, механизм sink для поиска экстремум в ценах закрытия торгов за год и cron-запуск агентов.

На сегодня всё! Спасибо за прочтение :)

Код этой части

P.S. Под прошлой частью меня спросили про faust и confluent kafka (какие есть у confluent фичи). Кажется, что confluent во многом функциональнее, но дело в том, что faust не имеет полноценной поддержки клиента для confluent - это следует из описания ограничений клиентов в доке.

Подробнее..

Aio api crawler

31.10.2020 06:11:16 | Автор: admin
Всем пример. Я начал работать над библиотекой для выдергивания данных из разных json api. Также она может использоваться для тестирования api.

Апишки описываются в виде классов, например

class Categories(JsonEndpoint):    url = "http://127.0.0.1:8888/categories"    params = {"page": range(100), "language": "en"}    headers = {"User-Agent": get_user_agent}    results_key = "*.slug"categories = Categories()class Posts(JsonEndpoint):    url = "http://127.0.0.1:8888/categories/{category}/posts"    params = {"page": range(100), "language": "en"}    url_params = {"category": categories.iter_results()}    results_key = "posts"    async def comments(self, post):        comments = Comments(            self.session,            url_params={"category": post.url.params["category"], "id": post["id"]},        )        return [comment async for comment in comments]posts = Posts()


В params и url_params могут быть функции(как здесь get_user_agent возвращает случайный useragent), range, итераторы, awaitable и асинхронные итераторы(таким образом можно увязать их между собой).

В параметрах headers и cookies тоже могут быть функции и awaitable.

Апи категорий в примере выше возвращает массив объектов, у которых есть slug, итератор будет возвроащать именно их. Подсунув этот итератор в url_params постов, итератор пройдется рекурсивно по всем категориям и по всем страницам в каждой. Он прервется когда наткнется на 404 или какую-то другую ошибку и перейдет к следующей категории.

А репозитории есть пример aiohttp сервера для этих классов чтобы всё можно было протестировать.

Помимо get параметров можно передавать их как data или json и задать другой method.

results_key разбивается по точке и будет пытаться выдергивать ключи из результатов. Например comments.*.text вернет текст каждого комментария из массива внутри comments.

Результаты оборачиваются во wrapper у которого есть свойства url и params. url это производное строки, у которой тоже есть params. Таким образом можно узнать какие параметры использовались для получения данного результата Это демонстрируется в методе comments.

Также там есть базовый класс Sink для обработки результатов. Например, складывания их в mq или базу данных. Он работает в отдельных тасках и получает данные через asyncio.Queue.

class LoggingSink(Sink):    def transform(self, obj):        return repr(obj)    async def init(self):        from loguru import logger        self.logger = logger    async def process(self, obj):        self.logger.info(obj)        return Truesink = LoggingSink(num_tasks=1)


Пример простейшего Sink. Метод transform позволяет провести какие-то манипуляции с объектом и вернуть None, если он нам не подходит. т.е. в тем также можно сделать валидацию.

Sink это асинхронный contextmanager, который при выходе по-идее будет ждать пока все объекты в очереди будут обработаны, потом отменит свои таски.

Ну и, наконец, для связки этого всего вместе я сделал класс Worker. Он принимает один endpoint и несколько sink`ов. Например,

worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])worker.run()


run запустит asyncio.run_until_complete для pipeline`а worker`а. У него также есть метод transform.

Ещё есть класс WorkerGroup который позволяет создать сразу несколько воркеров и сделать asyncio.gather для них.

В коде есть пример сервера, который генерит данные через faker и обработчиков для его endpoint`ов. Думаю это нагляднее всего.

Всё это на ранней стадии развития и я пока что часто менял api. Но сейчас вроде пришел к тому как это должно выглядеть. Буду раз merge request`ам и комментариям к моему коду.
Подробнее..
Категории: Python , Json , Python 3 , Aiohttp

Ещё раз о производительности фреймворков Python для веб разработки

23.12.2020 10:20:51 | Автор: admin
Недавно мне пришлось начинать проект нового веб сервиса, и я решил протестировать максимальную нагрузочную способность Django, а заодно сравнить её с Flaskом и AIOHTTP. Результат показался мне неожиданным, поэтому я просто оставлю его тут.

На диаграммах ниже приведены результаты простейшего Apache Benchmarka для фреймворков Django версии 3.1, Flask 1.1 и AIOHTTP 3.7. AIOHTTP работает в штатном однопоточном асинхронном режиме, Django и Flask обслуживаются синхронным WSGI сервером Gunicorn с числом потоков, равным числу доступных ядер процессора * 2. ASGI в тесте не участвовал.

Условия тестирования
Во всех трёх случаях выводится простая страница со списком по результатам выборки из реляционной базы данных PostgreSQL. Запрос я постарался сделать максимально приближенным к реальности:

SELECT r.id, r.auth_user_id, r.status, r.updated, r.label, r.content, u.username,    ARRAY_AGG(t.tag) tag, COUNT(*) OVER() cnt,    (        SELECT COUNT(*) FROM record r2            WHERE                r2.parent_id IS NOT NULL                AND r2.parent_id = r.id                AND r2.status = 'new'    ) AS partsFROM record rJOIN auth_user u ON u.id = r.auth_user_idLEFT JOIN tag t ON t.kind_id = r.id AND t.kind = 'rec'WHERE r.parent_id IS NULL AND r.status = 'new'GROUP BY r.id, u.usernameORDER BY r.updated DESCLIMIT 10 OFFSET 0

Этот запрос получает список корневых записей из основной таблицы приложения, число прямых потомков для каждой строки, данные владельца записи, список тегов и общее число строк выборки.

AIOHTTP использует пулл соединений с БД и драйвер asyncpg, Django и Flask SQLAlchemy без ORM (для чистоты эксперимента) и psycopg2.

Приложение Django создано стандартными средствами фреймворка (django-admin startproject, manage.py startapp и т.д.), вывод тестовой страницы через ListView. Установки Flask и AIOHTTP построены на канонических веб приложениях Hello, world, взятых из документации.

Результаты запуска теста на локальной машине (4 ядра CPU)



и на реальном однопроцессорном VDS (пинг около 45 ms)



Во время теста AIOHTTP использовал 100% одного ядра CPU, Flask и Django 100% всех доступных ядер.

Выводы


На самом деле, сравнение асинхронных и многопоточных приложений не совсем корректно они решают разные задачи. Поэтому, результат выглядит довольно логичным: в локальном тесте у AIOHTTP просто оказалось меньше ресурсов, при равных условиях производительность нивелируется.

А вот скромный результат Flaskа с трудом поддается объяснению, разогнать этот фреймворк у меня не получилось.
Подробнее..

Телеграм бот для автоматизации обменника криптовалюты

13.02.2021 14:22:01 | Автор: admin

Вместо предисловия

В этой статье я буду в общих чертах рассказывать про то, в каком направлении нужно двигаться, чтобы сделать полуавтоматический обменник криптовалюты с возможностью управлять сделками с любого устройства в любой точке планеты 24/7. Вы не найдете здесь деталей реалиализации, т.к. этот материал предназначен скорее для получения базового набора знаний, необходимых для запуска такого стартапа.

Полуавтоматический обменник криптовалюты.

Как-то очень давно я у же немного писал про использование телеграм бота для автоматизации некоторых процессов. Надо сказать, что уже не мало времени прошло, но я продолжаю использовать некоторые идеи, что были изложены в том небольшом материале.

Обмен криптовалюты сегодня - это уже не просто реальность, в какой-то мере это уже потребность. Время беспощадно и теперь цифровое золото становится очень важной частью активов миллионов людей. Обменники в интернете бывает нескольких видов, основные из которых:

  • полуавтоматические

  • ручные

  • автоматические

  • p2p - обменники

  • биржи

Мы поговорим про полуавтоматический вариант с возможностью расширения до p2p обменника, потому как это довольно простой и удобный способ.

Необходимый набор навыков.

Говоря довольно простой, я наверное выражаюсь несовсем корректно. В наше время навыки, которыми должен обладать разработчик, оцениваются в тысячи часов времени, которое он должен потратить на свое обучение. Я сегодня буду предельно краток, так что давайте сразу перейдем к делу. Для реализации задачи нам понадобится следующий набор иснтрументов:

  1. Linux, zsh, vim, systemd

  2. nginx, ssl

  3. ES6, Material Ui, React, eslint, webpack, scss

  4. python3, asyncio, aiohttp, peewee

  5. postgresql

  6. telegram bot api

  7. docker

И такие паттерны как:

  1. MVC - шаблон архитектуры системы

  2. Abstract Factory, Factory Method, Builder, Facade, Prototype - генерация объектов

  3. Scheduler - многопоточный постановщик задач

  4. Event Listner, State - события, сосстояния

  5. Proxy - заместитель для балансировки нагрузки

В общих чертах это вроде как все, что должно пригодится, согласитесь не мало. Давайте пробежимся теперь немного более подробно, чтобы понимать как все это барахло должно быть настроено, чтобы даже работать.

Теперь я начинаю с фронта

Если вы попробуете поискать в сети с чего начинать веб разработку - с фронтенда или с бэкэнда, вы наверное не найдете ничего более дельного, чем информация о том, что все это лучше делать параллельно и каких-то особых протоколов на этот счет нет. Т.е. фронтендер делает свою работу, бэкэндер свою, они встречаются на созвонах и в чате, обсуждают все проблемы: все хорошо. Но что, если вы собираетесь делать и фронт и бэк самостоятельно (например в случае небольшого приложения как криптообменник) - какая будет точка отправления?

Начинать лучше с фронта, потому как он может работать на моковых данных и бэкэнд ему собственно нужен только абсолютно гипотетически. Фронтенд - независимое приложение, он должен работать корректно в разных браузерах, на разных устройствах. Мы будем делать Single Page Application, а значит нам потребуется протокол взаимодействия, давайте выберем json-rpc. Для транспортного протокола используем tcp,а на прикладном уровне остановимся на http.

Дальше все довольно не трудно. Ставим Node Package Manager, создаем новое реакт приложение, добавляем туда react router,настраиваем eslint для форматирования кода, node-sass для возможности использования css препроцессора, webpack для сборки проекта.

Правильная структура проекта - залог успеха. Компоненты делаем модульными - файлы стилей лежат внутри дирректории рядом с компонентом. Компоненты по мере возможностей реализуем как stateless. Я бы пожалуй еще рекоммендовал дважды задуматься перед внедерением redux в приложение - делайте это только если вы точно уверены, что вам это нужно.

При работа с фронтом важно всегда помнить об области видимости переменных, потому как современный фронтенд это сложное многопоточное, асинхронное приложение. При этом при правильной организации файлов и компонентов, react и material ui позволяют делать все достаточно быстро. Если у вас нет готового дизайна - просто выберите сайт обменника, который вам нравится и не стесняясь копируйте его, превнося свои изменения - в дальнейшем ваш обменник все равно еще претерпит кучу изменений и не стоит сейчас беспокоится о том как это выглядит на текущей стадии.

Вот так может выглядеть ваш реакт компонент, отвечающий за рендеринг главной страницы:

Бэкэнд - это сложно, но куда веселее

Бэкэнд тоже должен быть асинхронным. P2P приложения должны быть ориентированы под высокую нагрузку, а значит сразу стоит закладывать немного больше, чем может показаться нужным. Мы будем делать монолитный бэкэнд, потому как серверная часть не будет очень большой. Микросервисы это здорово, но не всегда необходимо, и в данном случае мы не будем использовать этот подход.

asyncio позволяет работать с петлей событий, что в свою очередь предоставляет возможность асинхронного программирования и управления заданиями. В нашем случае у нас будет несколько заданий, которые должны будут работать независимо и параллельно основному приложению. Это задание на обновление курсов BTC/USD и USD/RUB, и задание, которое будет отменять устаревшие заявки на обмен валюты. Курсы вылют можно получать get запросом из апи всех популярных бирж, например coinbase, kraken, bitmex. Благо aiohttp clientпозволяет это делать в несколько строчек:

Для того, чтобы сериализовывать данные, необходимые фронту для построения интерфейса, сервер должен использовать объектно-ориентированную модель данных, проще говоря - нужно поделить наш обменник на модели, а эти модели сложить в таблицы бд, чтобы можно было строить к ней запросы.

При создании моделей стоит особое внимание уделить инкапсуляции и наследованию - хорошей идеей будет сразу создать BaseModel, в которую поменстить, например, поля created_at, updated_at и, например, datetime_serializer, который вам точно пригодится, а остальные модели наследовать от этой модели:

Для взаимодействия с блокчейн придется получить API KEY, например на blockchain.com. Хочу сразу отметить, что тут есть своего рода "подводный камень". Как работает блокчейн апи? После того, как создается транзакция, для ее завершения необходимы подтверждения от майнеров. Каждое подтверждение - это своего рода события, информацию о котором вы будете получать на свой сервер. В этом событии есть адрес кошелька, на который поступает криптовалюта. Теперь предположим, что для покупки криптовалюты в нашем обменнике мы всем пользователям будем предоставлять одинаковый кошелек для перевода. Это было бы довольно удобно, так как все биткоины были бы сосредоточены у нас на одном адресе, одной суммой. На первый взгляд. Но в таком случае при поступлении средств от пользователя на кошелек и последующих веб хуках от блокчейн на callback_url, мы не сможем определить от какого конкретно пользователя поступил платеж. Можно конечно использовать параметр в webhook url но есть еще один интересный нюанс. Нам важно знать курс по которому была совершена та или иная транзакция.Опять же, есть вариант хранить свзяь между транзакицей и курсом, но есть и альтернативное решение. Оно состоит в том, что каждому пользователю системы должен генерироваться свой уникальный BTC кошелек. И в случае, когда этот самый пользователь хочет совершить сделку в нашем обменном пункте и продавть свои кровные BTC, мы будем скидывать ему его уникальный адрес.

Дальше может покзаться, что целесообразно со всех этих адресов собирать все деньги на master wallet, но это не так, ведь за каждый перевод вы будете платить комиссию майнерам. К слову эту комиссию нужно считать ручками. Получить информацию от сети можно в любой момент:

Таким образом получается, что баланс нашего обменника сосредоточен децентрализованно на разных кошельках всех пользователей. При каждой транзакции мы записываем курс, по которому она была осуществлена, а ее статус (так же как впрочем и статус документа по этой транзакции) мы меняем в зависимости от подтверждений blockchain.К слову сразу имеет смысл подумать над реализацией классов Billing и Processing, для создания и проводки документов.

Для продажи крипты нам необходим механизм ручного взаимодействия с этими документами: нужно искать документ с выгодным на текущий момент курсом, проверять текущий баланс кошелька из этого документа, и выполнять перевод с этого кошелька на кошелек, который указывает пользователь в своей заявке на покупку в нашем обменнике. И вот мы наконец подобрались к кульминации нашего материала: к нам снова на помощь приходит телеграм.

Telegram bot

Тут все совсем не трудно. Создаем бота у @BotFather, настраиваем, берем токен, кладем его в конфиг (делаем два конфига и два бота - один на прод, один на дев).

Из этого токена мы можем создать бота и использовать его для получения важных данных, а так же для запуска протокола rpc, не требующего дополнительной авторизации и шифрования.

Теперь мы можем слать сообщения в телеграм из нашего бэкэнда, для этого мы создадим приватный чат и добавим бота администратором туда. Далее нам потребуется создать вебхук и контроллер для его обработки (читайте об этом в предыдущем материале про мониторинг). В контроллере нам необходимо создать диспетчер запросов, который будет определять обработчик.

Запросы от тг могут быть разные, нам пока нужы будут только message и callback_query (reply клавиатура и inline клавиатура).

Далее мы будем отправлять в этот чат сообщения с кнопками, которые позволят контролировать значения в базе данных. Здесь обращу внимание на race condition, и трудно-уловимые ошибки, по этому всегда используйте atomic_db_query

async with objects.atomic() as atomic_db_query:try:  pass  # some database change  except:  atomic_db_query.rollback()

Кнопки вы можете нажать после ручной модерации заявки - ну т.е. вы посмотрели, что деньги к вам на счет реально поступили, и только после этого нажали на кнопку, которая запустит механизм для завершения документооборота. Так же можно, например создать механизмsubscription, который позволит информировать подписавшихся пользователей на обновления курсов, например:

Это довольно удобно, ведь телеграм всегда под рукой, особенно после разблокировки . 24/7 все, кто находятся в приватной группе, смогут получать информационные сообщения, а так же управлять состоянием документов:

Настройка production

Нужно все это барахло завернуть в докер, настроить системные даемон для запускать юнита, в идеале конечно настроить CI-CD, но это все наверное уже детали.

В базовом варианте можно деплоится через гит, использовать переменные окружения для чтения конфигов, использовать ipython для проведения миграций в бд:

Нужно уметь использовать настраивать nginx, и понимать, как работает mod_rewrite.

location = /api/rates/rates.xml {  rewrite .* /api/rates/ last;}   

Кстати для добавления вашего обменника в мониторинги, вам понадобится xml выгрузки файла курсов, так что этот rewrite вам может еще пригодится.

Наверное вы захотите сделать какую-то админку - для этого отлично сгодится механизм Basic Auth и bootstrap admin template . Вам останется только пробросить в шаблоны необходимый контекст и немного поиграть с контролами:

Послевкусие

Это все сложновато, но в тоже время и не очень, если не наступать на грабли, которые, надо сказать, присутствуют. Не забывайте о JWT, SLL, CORS, и еще куче прелестей, которые по пути обязательно появятся у вас на пути. Но в целом это рабочая схема автоматизации механизмов, которые могут пригодится не только при создании обменника. Я не претендую на роль эксперта в этом деле, я лишь высказываю свои умозаключения, после довольно трудоемкого процесса прохождения через все вышесказанное. Не стоит принимать буквально - многое является весьма субъективным и не претендует на роль аксиомы. Я бы сказал бОльшая часть. Но под лежачий камень вода не течет, и лучшее решение на сегодня - это развитие и движение дальше.

Надеюсь кому-то покажется эта информация полезной. Просто захотелось немного поделиться переживаниями и опытом, полученным от процесса. Результат работы вы можете посмотреть на https://exbtc.pro/

Дальнейшие развитие позиционируется как p2p платформа для совершения обмена. Буду рад любым вопросам и предложениям, и большое спасибо за потраченное время.

Подробнее..

Первые шаги в aiohttp

31.05.2021 14:10:41 | Автор: admin

Введение

Привет, меня зовут Артём и я работаю бекендером в KTS. Компания уже 3 года проводит летние и зимние курсы по разработке, а в феврале этого года прошла очередная бесплатная backend-школа от KTS. В ее рамках студенты изучали инструменты и технологии, которые используют разработчики нашей компании, общались с менторами и делали итоговый проект - чат-бота в стиле Моя игра, который защищали в конце курса. После курса отличившихся студентов мы приглашали на стажировку.

Школа состояла из 6 лекций, шаг за шагом погружавших студентов в мир веб-разработки. На них были рассмотрены такие темы как сетевые протоколы, взаимодействие backend-а и frontend-а, компоненты веб-сервера и многое другое. Лейтмотивом курса было изучение асинхронного веб-программирования на Python, в частности изучение фреймворка aiohttp.

Для поступления на курс нужно было пройти комплексный тест на знания в области веба и python-а, так что студенты пришли учиться с хорошим начальным уровнем знаний. Однако, во время курса выяснилось, что не все темы даются одинаково легко. Самыми трудными для понимания темами стали:

  1. Асинхронное программирование

  2. Работа с СУБД

  3. Деплой приложения

Студенты задавали достаточно разные по уровню понимания вопросы, начиная от Как создать отложенную задачу, используя только asyncio? и заканчивая Почему нельзя использовать Django для асинхронного программирования? (имелась в виду полностью синхронная версия Django). В коде наши менторы тоже находили ошибки, связанные с недостаточным пониманием предмета, например, использование синхронного драйвера для базы данных в асинхронном проекте.

По результатам курса, я решил написать небольшой туториал, рассказывающий о создании базового aiohttp-сервиса с нуля и затрагивающий самые сложные для студентов вопросы: как сделать асинхронное python-приложение, как работать с базой данных и как разложить свой проект в интернете.

В цикле статей мы рассмотрим следующие темы:

  1. Архитектура веб-приложения

  2. Асинхронная работа с базой данных и автоматические миграции

  3. Работа с HTML-шаблонами с помощью Jinja2

  4. Размещение нашего приложения в Интернете с помощью сервиса Heroku

  5. А также сигналы, обработку ошибок, работу с Dockerом и многое другое.

Эта статья первая из трех, и ее цель помочь начинающим aiohttp-программистам написать первое hello-world приложение.

В этой статье мы напишем небольшое веб-приложение на aiohttpстену с отзывами, где пользователь может оставить мнение о продукте.

Мы пройдем по шагам:

Создание проекта

Все команды в статье были выполнены в операционной системе OSX, но также должны работать в любой *NIX системе, например в Linux Ubuntu. Во время разработки я буду использовать Python 3.7.

Давайте создадим папку aiohttp_server, которая в дальнейшем будет называться корнем проекта. В ней создадим текстовый файл requirements.txt, который будет содержать все необходимые для работы приложения зависимости и их версии. Запишем в него следующие модули:

aiohttp==3.7.3 # наш фрейворкaiohttp-jinja2==1.4.2 # модуль для работы с HTML-шаблонами

Создадим виртуальное окружение что-то вроде песочницы, которое содержит приложение со своими библиотеками, обновление и изменение которых не затронет другие приложение, и установим в него наши зависимости:

cd {путь_до_папки}/aiohttp_serverpython3 -m venv venvsource venv/bin/activate

После этого в начале строки терминала должна появится надпись (venv)это означает что виртуальное окружение успешно активировано. Установим необходимые модули:

pip install -r requirements.txt

Структура проекта

Создадим в папке aiohttp_server следующую структуру:

 app    __init__.py    forum       __init__.py       routes.py  # тут будут пути, по которым надо отправлять запросы       views.py  # тут будут функции, обрабатывающие запросы    settings.py main.py  # тут будет точка входа в приложение requirements.txt templates    index.html  # тут будет html-шаблон страницым сайта

Теперь откроем файл main.py и добавим в него следующее:

from aiohttp import web  # основной модуль aiohttpimport jinja2  # шаблонизатор jinja2import aiohttp_jinja2  # адаптация jinja2 к aiohttp# в этой функции производится настройка url-путей для всего приложенияdef setup_routes(application):   from app.forum.routes import setup_routes as setup_forum_routes   setup_forum_routes(application)  # настраиваем url-пути приложения forumdef setup_external_libraries(application: web.Application) -> None:   # указываем шаблонизатору, что html-шаблоны надо искать в папке templates   aiohttp_jinja2.setup(application, loader=jinja2.FileSystemLoader("templates"))def setup_app(application):   # настройка всего приложения состоит из:   setup_external_libraries(application)  # настройки внешних библиотек, например шаблонизатора   setup_routes(application)  # настройки роутера приложенияapp = web.Application()  # создаем наш веб-серверif __name__ == "__main__":  # эта строчка указывает, что данный файл можно запустить как скрипт   setup_app(app)  # настраиваем приложение   web.run_app(app)  # запускаем приложение

После предварительной настройки можно создать первый View.

Первый View

Viewэто некий вызываемый объект, который принимает на вход HTTP-запросRequest и возвращает на пришедший запрос HTTP-ответResponse.

Http-запрос содержит полезную информацию, например url запроса и его контекст, переданные пользователем данные и многое другое. В контексте запроса содержатся данные, которые мы или aiohttp добавили к этому запросу. Например, мы предварительно авторизовали пользователячтобы повторно не проверять авторизацию пользователя из базы во всех View и не дублировать код, мы можем добавить объект пользователя в контекст запроса. Тогда мы сможем получить нашего пользователя во View, например, так: request['user'].

HTTP-ответ включает в себя полезную нагрузку, например, данные в json, заголовки и статус ответа. В простейшем View, который из примера выше, всю работу по формированию HTTP-ответа выполняет декоратор @aiohttp_jinja2.template("index.html") . Декоратор получает данные из View, которые возвращаются в виде словаря, находит шаблон index.html (о шаблонах написано ниже), подставляет туда данные из этого словаря, преобразует шаблон в html-текст и передает его в ответ на запрос. Браузер парсит html и показывает страницу с нашим контентом.

В файле views.py в папке app/forum напишем следующий код:

import aiohttp_jinja2from aiohttp import web# создаем функцию, которая будет отдавать html-файл@aiohttp_jinja2.template("index.html")async def index(request):   return {'title': 'Пишем первое приложение на aiohttp'}

Здесь создается функциональный View (function-based View). Определение функциональный означает, что код оформлен в виде функции, а не классом (в следующей части мы коснемся и class-based View).

Рассмотрим написанную функцию детальнее: функция обернута в декоратор @aiohttp_jinja2.template("index.html")этот декоратор передает возвращенное функцией значение в шаблонизатор Jinja2, а затем возвращает сгенерированную шаблонизатором html-страницу как http-ответ. В данном случае возвращенным значением будет словарь, значения которого подставляются в html-файл index.html.

Отдельно стоит заметить, что объект запроса request передается как аргумент функции index. Мы не используем request в этой функции, но будем использовать в дальнейшем.

HTTP-запрос отправляется на конкретный url-адрес. Для передачи HTTP-запроса в нужный View необходимо задать эту связь в приложении с помощью Route.

Первый Route

Routeэто звено, связывающее адрес, по которому был отправлен запрос и код View, в котором этот запрос будет обработан. То есть, если пользователь перейдет в корень нашего сайта (по адресу /), то объект запроса будет передан в View index и оттуда же будет возвращен ответ. Подробней про Route можно прочитать тут.

В файл routes.py необходимо добавить следующий код:

from app.forum import views# настраиваем пути, которые будут вести к нашей страницеdef setup_routes(app):   app.router.add_get("/", views.index)

Первый Template

Теперь нам осталось только добавить в templates/index.html код верстку нашей страницы. Его можно найти по этой ссылке.

Templateэто html-шаблон, в который подставляются данные, полученные в результате обработки запроса. В примере в коде View отдается словарь с ключом title, шаблонизатор Jinja2 ищет в указанном html-шаблоне строки {{title}} и заменяет их на значение из словаря по данному ключу. Это простейший пример, шаблоны позволяют делать намного больше: выполнять операции ветвления, циклы и другие операции, например, суммирование. Примеры использования можно посмотреть в документации jinja2.

Запуск приложения

Мы создали первую версию нашего приложения! Осталось запустить его следующей командой в терминале (убедитесь, что находитесь в папке aiohttp_server):

python3 main.py

Вы должны увидеть следующий текст в консоли. Он означает, что сервер запущен на порту 8080.

======== Running on http://0.0.0.0:8080 ========(Press CTRL+C to quit)

Давайте теперь посмотрим результаты нашей работы! Для этого перейдите по адресу http://0.0.0.0:8080 в браузере. Вы должны увидеть первую версию нашего приложения. При клике на кнопку Отправить должно возникнуть сообщение о том, что отзыв отправлен.

Поздравляю! Вы успешно создали первое приложение на aiohttp!

Заключение

В статье рассмотрено создание простого приложения на aiohttp, которое принимает запрос пользователя и отдает html-страницу. Мы затронули:

  • Настройку виртуального окружения

  • Базовую настройку проекта на aiohttp

  • Создание View

  • Создание Route

  • Использование html-шаблонов

Наше приложение представляет собой простой веб-сервер, отдающий html-страницу по запросу - в нем нет никакого взаимодействия с базами данных, его структура максимально проста и оно недоступно пользователям в Интернете. В следующих статьях мы разберем, как вырастить из нашей заготовки настоящее веб-приложение на aiohttp и опубликовать его в Интернете.

Весь код статьи можно найти на гитхабе.

Пользуясь случаем, приглашаю всех читателей, интересующихся веб-разработкой, к нам на бесплатные занятия школу KTS. А для более опытных читателей сейчас идет запись на продвинутые курсы для backend-разработчиков, желающих повысить свои навыки в асинхронной веб-разработке. Всю информацию о всех школах можно найти на сайте, а также в нашем телеграм-чате.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru