Русский
Русский
English
Статистика
Реклама

Flask

Реализация аудиоконференций в Telegram Asterisk

25.09.2020 20:20:19 | Автор: admin


В предыдущей статье я описывал реализацию выбора пользователем места жительства при регистрации в моем telegram боте, который я создавал вдохновившись идеей Телефонного эфира. В этой же статье я опишу интеграцию бота с Asterisk .

Зачем ?


Многим не нравится что в Telegram нельзя осуществлять групповые звонки.
Ну не использовать же Viber ?)
Также есть ряд кейсов именно для такой реализации, например:
  • Для проведения анонимных аудиоконференций, когда не хочется засветить свой номер либо id среди участников конференции (сразу на ум приходит шабаш хакеров либо клуба анонимных алкоголиков). Не нужно находиться в какой либо группе, сообществе, канале
  • Когда не известно кто подключиться к конференции вообще, но нужно ограничить доступ паролем
  • Все прелести Asterisk: управление конференцией (mute/umute, kick), организация гибридных аудиоконференций с участием клиентов, зарегистрированных на asterisk, telegram и PSTN. Неплохо можно сэкономить на международных звонках
  • Организация корпоративного callback via telegram и т.п.

На ум приходит куча вариантов, их много, ограничено лишь фантазией. После многих лет работы с Asterisk я считаю, что главное завести на него звонок, а дальше с ним можно сделать все что годно, хоть в космос отправить.

Связка Asterisk VoIP- Telegram VoIP


Сама связка VoIP реализована благодаря библиотеки tg2sip. Использование ее описано в самом репозитории в разделе Usage. Есть еще несколько статей по настройке. Даже есть Docker образ.
Описание этой связки выходит за рамки данной статьи.
Единственный нюанс который я хотел бы озвучить это то, что нельзя позвонить на telegram_id, номера которого нет в Вашей книге контактов. Поэтому звонить нужно на номер телефона, на который зарегистрирован telegram.

В моем боте реализованы как публичные аудиоконференции (Эфиры), к которым может подключиться любой желающий, так и приватные аудиоконференции с доступом по паролю. Приватные комнаты/пароли создают сами пользователи и могут использовать бот в качестве площадки дле проведения аудиоконференций, совещаний и т.п.

Взаимодействие telegram bot Asterisk


Схема взаимодействия в моем боте выглядит следующим образом.
Пользователь выбирает желаемую комнату в меню бота, бот вызывает функцию взаимодействия с Asterisk по API передав в POST запросе параметры для подключения:
номер телефона абонента
идентификатор конференц комнаты
callerid для презентации в конференц комнате
язык для озвучивания пользователю уведомлений в системе Asterisk на родном языке
Далее, Asterisk осуществляет исходящий звонок через каналы telegram на номер, указанный в параметрах запроса. После ответа пользоветелем на звонок, Asterisk подключает его в соответствующую комнату.

Можно было бы использовать прямое подключение с бота на Astersik AMI, но я предпочитаю работать через API, чем проще тем лучше.

API на стороне Asterisk сервера


Код простого API на python. Для инициализации звонка используются .call файлы

#!/usr/bin/python3from flask import Flask, request, jsonifyimport codecsimport jsonimport globimport shutilapi_key = "s0m3_v3ry_str0ng_k3y"app = Flask(__name__)@app.route('/api/conf', methods= ['POST'])def go_conf():    content = request.get_json()    ## Блок авторизации    if not "api_key" in content:        return jsonify({'error': 'Authentication required', 'message': 'Please specify api key'}), 401    if not content["api_key"] == api_key:        return jsonify({'error': 'Authentication failure', 'message': 'Wrong api key'}), 401    ## Проверка наличия нужных параметров в запросе    if not "phone_number" in content or not "room_name" in content or not "caller_id" in content:        return jsonify({'error': 'not all parameters are specified'}), 400    if not "lang" in content:        lang = "ru"    else:        lang = content["lang"]    phone_number = content["phone_number"]    room_name = content["room_name"]    caller_id = content["caller_id"]    calls = glob.glob(f"/var/spool/asterisk/outgoing/*-{phone_number}*")    callfile = "cb_conf-" + phone_number + "-" + room_name + ".call"    filename = "/var/spool/asterisk/" + callfile    if calls:        return jsonify({'message': 'error', "text": "call already in progress"})    with codecs.open(filename, "w", encoding='utf8') as f:        f.write("Channel: LOCAL/" + phone_number + "@telegram-out\n")        f.write("CallerID: <" + caller_id + ">\n")        f.write("MaxRetries: 0\nRetryTime: 5\nWaitTime: 30\n")        f.write("Set: LANG=" + lang + "\nContext: conf-in\n")        f.write("Extension: " + room_name + "\nArchive: Yes\n")    shutil.chown(filename, user="asterisk", group="asterisk")    shutil.move(filename, "/var/spool/asterisk/outgoing/" + callfile)    return jsonify({'message': 'ok'})if __name__ == '__main__':    app.run(debug=True,host='0.0.0.0', port=8080)


При этом диалплан Asterisk в простом виде выглядит следующим образом:

[telegram-out]
exten => _+.!,1,NoOp()
same => n,Dial(SIP/${EXTEN}@telegram)

exten => _X!,1,NoOp()
same => n,Dial(SIP/+${EXTEN}@telegram)

[conf-in]
exten => _.!,1,NoOp()
same => n,Answer()
same => n,Wait(3)
same => n,Playback(beep)
same => n,Set(CHANNEL(language)=${LANG})
same => n,ConfBridge(${EXTEN})
same => n,Hangup


Данное API можно использовать и в других кейсах, например, для организации той же callback кнопки Перезвонить мне и т.п.

Функция вызова API


telephony_api.py
import requests# Заменить example.com на Ваш urlurl = "http://example.com:8080/api/conf"api_key = "s0m3_v3ry_str0ng_k3y"def go_to_conf(phone_number, room_name, caller_id, lang="ru"):    payload = "{\n\t\"phone_number\" : \""+phone_number+"\",\n\t\"room_name\" : \""+room_name+"\",\n    \"caller_id\" : \""+caller_id+"\",\n\t\"lang\": \""+lang+"\",\n\t\"api_key\": \""+api_key+"\"\n}"    headers = {        'content-type': "application/json",        'cache-control': "no-cache",        }    try:        response = requests.request("POST", url, data=payload, headers=headers, timeout=2, verify=False)        if "call already in progress" in response.text:            return False, "Ошибка. Звонок еще не завершен."        elif "error" in response.text:            print(response.text)            return False, "Ошибка. Произошел сбой. Попробуйте позже."        else:            return True, response.text    except:        return False, "Ошибка. Произошел сбой. Попробуйте позже."


Этих двух инструментов уже хватит для интеграции в Ваш бот, заворачивайте его в свою логику и используйте.

Пример бота для инициализации вызова в конференц комнату



#!/usr/bin/python3.6import telebotfrom telephony_api import go_to_confbot = telebot.TeleBot("ВашTOKEN")pnone_number = "799999999999"# Ваш номер телефона, на который зарегистрирован telegram аккаунт@bot.message_handler(content_types=['text'])def main_text_handler(message):    if message.text == "Подключи меня в аудиоконференцию":        bot.send_message(message.chat.id, "Ok. Сейчас на telegram Вам прийдет звонок, ответив на который Вы будете подключены в аудиоконференцию")        func_result, func_message = go_to_conf(pnone_number, "ROOM1", "Bob", "ru")        if not func_result:            bot.send_message(chat_id=message.chat.id, text=func_message)if __name__ == "__main__":)   print("bot started")   bot.polling(none_stop=True)

В данном примере номер телефона задан статически, в реальности же можно например, делать запросы в базу на соответствие message.chat.id номер телефона.

Надеюсь, моя статья поможет кому-то создать крутые проекты и в свою очередь поделиться ними с сообществом.
Подробнее..

Сокращаем ссылки без жира (F3)

18.08.2020 16:06:31 | Автор: admin


Сокращать ссылки в 13 лет не стыдно, верно?
Новичку, да и не только новичку, стоит попробовать написать свой Укротитель Ссылок, походу изучая какой-нибудь новый фреймворк. Чем я и занялся. Что сказать пятый бутстрап, обезжиренный фреймворк и частичка души.


Вот демо, а вот код. Для таких читателей, как я ;)


Фреймворк, не так ли?


Конечно не Laravel и тому подобные сегодня обойдёмся 65-ю килобайтами FatFreeFramework. Если Вы знакомы с Python Flask, то возникнет ощущение, что где-то это уже было:


#роутинг во Фласке@app.route('/')def hello_world():    return 'Hello, World!'



//роутинг в Обезжиренном$f3->route('GET /',    function() {        echo 'Hello, world!';    });

Ладно, забудьте. Скачиваем .zip с офф.сайта, распаковываем в папку, которая в этот же миг открывается в вашем Любимом Редакторе Кода. Очистите index.php и удалите все из /ui.


Здесь все предельно просто в папке ui у нас все Вьюшки, а если по простому прокачанные ХТМЛ шаблоны, которые мы будем показывать пользователю при заходе на определенный URL.


Вот каркас нашего "приложения":


<?php//Файл: index.php// Kickstart the framework$f3=require('lib/base.php');$f3->set('DEBUG', 1);if ((float)PCRE_VERSION<8.0)    trigger_error('PCRE version is out of date');$f3->config('config.ini');//ВЕСЬ ОСТАЛЬНОЙ КОД БУДЕМ ПИСАТЬ ЗДЕСЬ$f3->run();

Вот и все, что нужно знать для начала. Приступаем к кодингу!


[для разработки я использовал локальный XAMPP на Windows и VS Code, статья написана в Ноушене]


Homepage


Начнем с главной страницы. Логично, правда?


//Файл: index.php$f3->route('GET /',    function($f3) { //чтобы использовать функции F3 передаем его в роут                $view = new View; // создаем вьюшку        echo $view->render('home.htm'); //рендерим шаблон    });

Теперь нужно этот самый шаблон написать. Для простоты я использовал бутстрап v5 alpfa.


Не забудьте создавать все шаблоны в папке ui, иначе они не будут видны фреймворку


<!-- Файл: ui/home.htm --><!DOCTYPE html><html lang="en">    <head>        <meta charset="<?php echo $ENCODING; ?>" />        <meta name="viewport" content="width=device-width, initial-scale=1">        <title>Пишем (код), сокращаем (ссылки)!</title>        <link rel="stylesheet" href="http://personeltest.ru/aways/stackpath.bootstrapcdn.com/bootstrap/5.0.0-alpha1/css/bootstrap.min.css" integrity="sha384-r4NyP46KrjDleawBgD5tp8Y7UzmLA05oM1iAEQ17CSuDqnUK2+k9luXQOfXJCJ4I" crossorigin="anonymous">    </head>    <body class="text-center bg-dark text-light"> <!-- темная тема ;) -->        <!-- менюшка -->        <nav class="m-2">            <ul class="nav nav-pills justify-content-center">                <li class="nav-item">                    <a class="nav-link active" aria-current="page" href="#">Главная</a>                </li>                <li class="nav-item">                    <a class="nav-link" href="#">Статья на Хабре</a>                </li>                <li class="nav-item">                    <a class="nav-link" href="http://personeltest.ru/aways/nikonovs.ru">Создатель</a>                </li>            </ul>        </nav>        <div class="container">        <h1>Короткие ссылки уже здесь.</h1>        <!-- Будем отправлять данные POST-запросом на /newLink -->        <form class="mt-5 mb-3" action="/newLink/" method="POST">            <div class="row justify-content-center">                <div class="col-auto">                <label for="inputLink" class="col-form-label">Введи ссылку:</label>                </div>                <div class="col-auto">                <input required placeholder="http://personeltest.ru/aways/" type="url" name="link" id="inputLink" class="form-control mb-1" aria-describedby="inputLink">                </div>                <div class="col-auto">                <button type="submit" class="btn btn-outline-primary">Сократить!</button>                </div>            </div>        </form>        <!-- немного текста с Кликера -->        <p class="text-left m-auto mb-5" style="max-width: 30rem;">Довольно часто при общении в интернете люди пересылают друг другу ссылки. Иногда эти ссылки бывают длинными. Совсем иногда  очень длинными. Но почти всегда  слишком длинными. Для того чтобы избавиться от этой проблемы, был создан этот сайт. Да, я знаю, что таких тысячи, но этот же лучше!            А дальше все просто: вставляете ссылку в поле для ввода, нажимаете "Сократить!" и получаете короткий, совсем короткий URL. &copy; <a href="http://personeltest.ru/aways/clck.ru">Кликер</a></p>        <footer class="m-2">Сделано с <img width="20" height="20" src="http://personeltest.ru/aways/image.flaticon.com/icons/svg/833/833472.svg" alt="любовью">, <a href="http://personeltest.ru/aways/v5.getbootstrap.com/">пятым Bootstrap'ом</a>    и <a href="http://personeltest.ru/aways/fatfreeframework.com/">без жира</a></footer>        </div>    </body></html>

Вот и все, у нас уже работает главная страница. Форма отправляет POST-запросом ссылку, которую нужно сократить.
Теперь самое интересное (нет).


Работа с БД


Создадим БД MySQL. Если у Вас установлен PhpMyAdmin, то создайте новую БД "linker", а потом выполните этот SQL:


SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";SET time_zone = "+00:00";CREATE TABLE IF NOT EXISTS `links` (  `code` varchar(10) NOT NULL,  `link` varchar(255) NOT NULL,  `hits` int(255) NOT NULL DEFAULT '0') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;ALTER TABLE `links`  ADD UNIQUE KEY `code` (`code`);

У нас будет 3 поля у каждой ссылки:


  1. Code это рандомные 4 символа после домена, по которым будет происходить перенаправление, вида example.com/ABC1
  2. Link Не сокращенная ссылка.
  3. Hits количество переходов по сокращенной ссылке.

Кратко расскажу принцип работы с БД, без жира.


<?php//сначала нужно подключиться к БД$db = new DB\SQL(    'mysql:host=localhost;port=3306;dbname=linker',    'root',    '');//Дальше есть два варианда работы с данными://Можно установить переменную в Фреймворк c помощью обычного SQL-запроса:$f3->set('result', $db->exec('SELECT * FROM wherever')); //они будут доступны в шаблонах, как <?= $resul ? >//А можно использовать встроенный SQL Mapper:$row = new DB\SQL\Mapper($db, 'links');$row->load(array('link="http://personeltest.ru/aways/habrahabr.ru"')); //теперь из этого объекта доступны все колонки строки, где ссылка на Хабр:$row_value = $row->somerow; //Вот так// Естесственно можно изменять значения:$row->link = 'http://personeltest.ru/aways/habr.com';$row->save(); //изменения нужно сохранить, а что вы думали// больше информации по работе с БД доступно здесь: https://a.nikonovs.ru/MPHR Настоятельно рекомендую прочитать, хотябы с помощью переводчика, встроенного в браузер.?>

Приступаем к Укорачиванию.


Обработка новой ссылки


Создаем новую Вьюшку в index'e, которая будет обрабатывать запрос из формы на главной странице.


Сначала создадим новый, но очень похожий на первый (home.htm) шаблон "newLink.htm".
Там мы будем выводить уже сокращенную ссылку и количество переходов по ней (чтобы снова увидеть эту "статистику" нужно снова сократить эту же ссылку адрес останется тот же).
Для вывода воспользуемся трюком с "переходом переменных":


<?php//Файл: нет (пример)//устанавливаем переменную в index'е и рендерим шаблон$f3->set('link', $shorted_link);$view = new View;echo $view->render('newLink.htm');//теперь в шаблоне можно использовать:<?= $link ?>

А вот и листинг newLink.html:


<!-- Файл: newLink.htm --><!DOCTYPE html><html lang="en">    <head>        <meta charset="<?php echo $ENCODING; ?>" />        <meta name="viewport" content="width=device-width, initial-scale=1">        <title>Пишем (код), сокращаем (ссылки)!</title>        <link rel="stylesheet" href="http://personeltest.ru/aways/stackpath.bootstrapcdn.com/bootstrap/5.0.0-alpha1/css/bootstrap.min.css" integrity="sha384-r4NyP46KrjDleawBgD5tp8Y7UzmLA05oM1iAEQ17CSuDqnUK2+k9luXQOfXJCJ4I" crossorigin="anonymous">    </head>    <body class="text-center bg-dark text-light">        <nav class="m-2">            <ul class="nav nav-pills justify-content-center">                <li class="nav-item">                    <a class="nav-link" aria-current="page" href="http://personeltest.ru/aways/habr.com/">Главная</a>                </li>                <li class="nav-item">                    <a class="nav-link" href="#">Статья на Хабре</a>                </li>                <li class="nav-item">                    <a class="nav-link" href="http://personeltest.ru/aways/nikonovs.ru">Создатель</a>                </li>            </ul>        </nav>        <div class="container">        <h1>Короткие ссылки уже здесь.</h1>        <!-- Убираем из формы функционал формы и выводим переменные -->        <form class="mt-5 mb-3">            <div class="row justify-content-center">                <div class="col-auto">                    <label for="inputLink" class="col-form-label">Сократили:</label>                </div>                <div class="col-auto">                    <input disabled required type="url" name="link" id="inputLink" class="form-control disabled" aria-describedby="inputLink" value="<?= $link ?>">                </div>            </div>            <p class="m-2 text-secondary">По этой ссылке перешли: `<?= $hits ?>`</p>        </form>        <a href="http://personeltest.ru/aways/habr.com/" class="mt-3 mb-5 btn btn-primary btn-lg">ВЕРНУТЬСЯ НА ГЛАВНУЮ</a>        <footer class="m-2">Сделано с <img width="20" height="20" src="http://personeltest.ru/aways/image.flaticon.com/icons/svg/833/833472.svg" alt="любовью">, <a href="http://personeltest.ru/aways/v5.getbootstrap.com/">пятым Bootstrap'ом</a>    и <a href="http://personeltest.ru/aways/fatfreeframework.com/">без жира</a></footer>        </div>    </body></html>

Пишем сам роут.


$f3->route('GET|POST /newLink', //мы будем обрабатывать и POST и GET    function($f3) {            $db = new DB\SQL( //Подключение к БД новое в каждом Роуте                'mysql:host=localhost;port=3306;dbname=linker',                'root',                ''            );            //прекрасная функция генерации радомных символов:            $permitted_chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ';            function generate_string($input, $strength = 4) {                $input_length = strlen($input);                $random_string = '';                for($i = 0; $i < $strength; $i++) {                    $random_character = $input[mt_rand(0, $input_length - 1)];                    $random_string .= $random_character;                }                return $random_string;            }            //проверка на повторение link - нам же не нужно чтобы каждый раз генерировались новые ссылки. link - уникальный.            $check = new DB\SQL\Mapper($db,'links');            $check->load(array('link="'. $link .'"'));            if ($check->dry()) {                $g_code = generate_string($permitted_chars);                $row = new DB\SQL\Mapper($db,'links');                $row->reset();                $row->code = $g_code;                $row->link = $link;                $row->save();            } else {                $g_code = $check->code; //если link повторяется, то показываем старый код            }            $short_link = 'https://'. $_SERVER['HTTP_HOST'] . '/' . $g_code; //собираем конечную ссылку            //параметры из $_POST можно получить с помощью $f3->get('POST'), поддерживается точечная нотация (поправьте, если неправильно называю): параметр "link" можно получить так:             $link = $f3->get('POST.link');            if ( !empty($f3->get('POST')) ) { //Выдаем HTML, только если POST не пустой.            $f3->set('link', $short_link);            $f3->set('hits', $check->hits);            $view = new View;            echo $view->render('newLink.htm');            } else { //иначе - редирект на главную                $f3->$f3->reroute('/');            }        });

Готово! На самом деле, это было просто.


Перенаправление


Дело осталось за малым:


  1. Получить параметр из URL
  2. Проверить его наличие в БД
  3. Получить из БД соответствующую ссылку
  4. Перенаправить пользователя
  5. Profit!

Продолжаем писать код после прошлого Роута.


$f3->route('GET /@code', //указываем параметр после "@", он попадет в PARAMS    function($f3) {        //снова определяем $db        $db = new DB\SQL(            'mysql:host=localhost;port=3306;dbname=linker',            'root',            ''        );        $code = $f3->get('PARAMS.code'); //получаем параметр        $link = new DB\SQL\Mapper($db,'links');         //если получается получить ссылку из БД - получаем, увеличиваем количество переходов и перенаправляем        if ($link->load(array('code="'.$code.'"', 'link=?'))) {            $link->hits++;            $link->save();            $f3->reroute($link->link);        } else {            $f3->reroute('/'); //а если такой ссылки нет - милости просим на главную        }    });

Вы могли заметить, что в и в роуте newLink, и в роуте выше определятся одно и тоже ведь code может совпасть с "newLink" (не может, в генераторе только буквы верхнего регистра), но так как сначала определен он, то он и выполнится первым.


$f3run()!


Спасибо за прочтение!
Я буду рад, если Вы напишете комментарий, и поправите если что не так.


А в качестве домашнего задания или доказательства лени автора (меня), оставляю списочек, что можно сделать. Ведь лучше учиться на практике!


  • Это конечно маловероятно, но при генерации $g_code может повториться, так что предлагаю вам написать функцию, которая будет это проверять.
  • Еще можно сделать нормальную статистику и выводить ее по переходу на /@code/stats
  • Настоятельно рекомендую даже в таком маленьком деле делать валидацию ввода и на стороне сервера, с выводом соответствующих ошибок, не стоит полагаться на добавление аттрибута required и типа type=url к полю ввода
    RedComrade

  • Предлагайте в комментарии...


    На связи)


Подробнее..

Делаем поиск в веб-приложении с нуля

05.11.2020 18:21:17 | Автор: admin
В статье Делаем современное веб-приложение с нуля я рассказал в общих чертах, как выглядит архитектура современных высоконагруженных веб-приложений, и собрал для демонстрации простейшую реализацию такой архитектуры на стеке из нескольких предельно популярных и простых технологий и фреймворков. Мы построили single page application с server side rendering, поддерживающее просмотр неких карточек, набранных в Markdown, и навигацию между ними.

В этой статье я затрону чуть более сложную и интересную (как минимум мне, разработчику команды поиска) тему: полнотекстовый поиск. Мы добавим в наш контейнерный рай ноду Elasticsearch, научимся строить индекс и делать поиск по контенту, взяв в качестве тестовых данных описания пяти тысяч фильмов из TMDB 5000 Movie Dataset. Также мы научимся делать поисковые фильтры и копнём совсем немножко в сторону ранжирования.




Инфраструктура: Elasticsearch


Elasticsearch популярное хранилище документов, умеющее строить полнотекстовые индексы и, как правило, используемое именно как поисковый движок. Elasticsearch добавляет к движку Apache Lucene, на котором он основан, шардирование, репликацию, удобный JSON API и ещё миллион мелочей, которые сделали его одним из самых популярных решений для полнотекстового поиска.

Давайте добавим одну ноду Elasticsearch в наш docker-compose.yml:

services:  ...  elasticsearch:    image: "elasticsearch:7.5.1"    environment:      - discovery.type=single-node    ports:      - "9200:9200"  ...


Переменная окружения discovery.type=single-node подсказывает Elasticsearch, что надо готовиться к работе в одиночку, а не искать другие ноды и объединяться с ними в кластер (таково поведение по умолчанию).

Обратите внимание, что мы публикуем 9200 порт наружу, хотя наше приложение ходит в него внутри сети, создаваемой docker-compose. Это исключительно для отладки: так мы сможем обращаться в Elasticsearch напрямую из терминала (до тех пор, пока не придумаем более умный способ об этом ниже).

Добавить клиент Elasticsearch в наш вайринг не составит труда благо, Elastic предоставляет минималистичный Python-клиент.

Индексация


В прошлой статье мы положили наши основные сущности карточки в коллекцию MongoDB. Из коллекции мы умеем быстро извлекать их содержимое по идентификатору, потому что MongoDB построила для нас прямой индекс в ней для этого используются B-деревья.

Теперь же перед нами стоит обратная задача по содержимому (или его фрагментам) получить идентификаторы карточек. Стало быть, нам нужен обратный индекс. Для него-то нам и пригодится Elasticsearch!

Общая схема построения индекса обычно выглядит как-то так.
  1. Создаём новый пустой индекс с уникальным именем, конфигурируем его как нам нужно.
  2. Обходим все наши сущности в базе и кладём их в новый индекс.
  3. Переключаем продакшн, чтобы все запросы начали ходить в новый индекс.
  4. Удаляем старый индекс. Тут по желанию вы вполне можете захотеть хранить несколько последних индексов, чтобы, например, удобнее было отлаживать какие-то проблемы.


Давайте создадим скелет индексатора и потом разберёмся подробнее с каждым шагом.

import datetimefrom elasticsearch import Elasticsearch, NotFoundErrorfrom backend.storage.card import Card, CardDAOclass Indexer(object):    def __init__(self, elasticsearch_client: Elasticsearch, card_dao: CardDAO, cards_index_alias: str):        self.elasticsearch_client = elasticsearch_client        self.card_dao = card_dao        self.cards_index_alias = cards_index_alias    def build_new_cards_index(self) -> str:        # Построение нового индекса.        # Сначала придумываем для индекса оригинальное название.        index_name = "cards-" + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")        # Создаём пустой индекс.         # Здесь мы укажем настройки и опишем схему данных.        self.create_empty_cards_index(index_name)        # Кладём в индекс все наши карточки одну за другой.        # В настоящем проекте вы очень скоро захотите         # переписать это на работу в пакетном режиме.        for card in self.card_dao.get_all():            self.put_card_into_index(card, index_name)        return index_name    def create_empty_cards_index(self, index_name):        ...     def put_card_into_index(self, card: Card, index_name: str):        ...    def switch_current_cards_index(self, new_index_name: str):        ... 


Индексация: создаём индекс


Индекс в Elasticsearch создаётся простым PUT-запросом в /имя-индекса или, в случае использования Python-клиента (нашем случае), вызовом

elasticsearch_client.indices.create(index_name, {    ...})


Тело запроса может содержать три поля.

  • Описание алиасов ("aliases": ...). Система алиасов позволяет держать знание о том, какой индекс сейчас актуальный, на стороне Elasticsearch; мы поговорим про неё ниже.
  • Настройки ("settings": ...). Когда мы будем большими дядями с настоящим продакшном, мы сможем сконфигурировать здесь репликацию, шардирование и другие радости SRE.
  • Схема данных ("mappings": ...). Здесь мы можем указать, какого типа какие поля в документах, которые мы будем индексировать, для каких из этих полей нужны обратные индексы, по каким должны быть поддержаны агрегации и так далее.


Сейчас нас интересует только схема, и у нас она очень простая:

{    "mappings": {        "properties": {            "name": {                "type": "text",                "analyzer": "english"            },            "text": {                "type": "text",                "analyzer": "english"            },            "tags": {                "type": "keyword",                "fields": {                    "text": {                        "type": "text",                        "analyzer": "english"                    }                }            }        }    }}


Мы пометили поля name и text как текстовые на английском языке. Анализатор это сущность в Elasticsearch, которая обрабатывает текст перед сохранением в индекс. В случае english анализатора текст будет разбит на токены по границам слов (подробности), после чего отдельные токены будут лемматизированы по правилам английского языка (например, слово trees упростится до tree), слишком общие леммы (вроде the) будут удалены и оставшиеся леммы будут положены в обратный индекс.

С полем tags чуть-чуть сложнее. Тип keyword предполагает, что значения этого поля некие строковые константы, которые не надо обрабатывать анализатором; обратный индекс будет построен по их сырым значениям без токенизации и лемматизации. Зато Elasticsearch создаст специальные структуры данных, чтобы по значениям этого поля можно было считать агрегации (например, чтобы одновременно с поиском можно было узнать, какие теги встречались в документах, удовлетворяющих поисковому запросу, и в каком количестве). Это очень удобно для полей, которые по сути enum; мы воспользуемся этой фичей, чтобы сделать клёвые поисковые фильтры.

Но чтобы по тексту тегов можно было искать и текстовым поиском тоже, мы добавляем к нему подполе "text", настроенное по аналогии с name и text выше по существу это означает, что Elasticsearch во всех приходящих ему документах будет создавать ещё одно виртуальное поле под названием tags.text, в которое будет копировать содержимое tags, но индексировать его по другим правилам.

Индексация: наполняем индекс


Для индексации документа достаточно сделать PUT-запрос в /имя-индекса/_create/id-документа или, при использовании Python-клиента, просто вызвать нужный метод. Наша реализация будет выглядеть так:

    def put_card_into_index(self, card: Card, index_name: str):        self.elasticsearch_client.create(index_name, card.id, {            "name": card.name,            "text": card.markdown,            "tags": card.tags,        })


Обратите внимание на поле tags. Хотя мы описали его как содержащее keyword, мы отправляем не одну строку, а список строк. Elasticsearch поддерживает такое; наш документ будет находиться по любому из значений.

Индексация: переключаем индекс


Чтобы реализовать поиск, нам надо знать имя самого свежего полностью достроенного индекса. Механизм алиасов позволяет нам держать эту информацию на стороне Elasticsearch.

Алиас это указатель на ноль или более индексов. API Elasticsearch позволяет использовать имя алиаса вместо имени индекса при поиске (POST /имя-алиаса/_search вместо POST /имя-индекса/_search); в таком случае Elasticsearch будет искать по всем индексам, на которые указывает алиас.

Мы заведём алиас под названием cards, который всегда будет указывать на актуальный индекс. Соответственно, переключение на актуальный индекс после завершения построения будет выглядеть так:

    def switch_current_cards_index(self, new_index_name: str):        try:            # Нужно удалить ссылку на старый индекс, если она есть.            remove_actions = [                {                    "remove": {                        "index": index_name,                         "alias": self.cards_index_alias,                    }                }                for index_name in self.elasticsearch_client.indices.get_alias(name=self.cards_index_alias)            ]        except NotFoundError:            # Ого, старого индекса-то и не существует вовсе.            # Наверное, мы впервые запустили индексацию.            remove_actions = []        # Одним махом удаляем ссылку на старый индекс         # и добавляем ссылку на новый.        self.elasticsearch_client.indices.update_aliases({            "actions": remove_actions + [{                "add": {                    "index": new_index_name,                     "alias": self.cards_index_alias,                }            }]        })


Я не стану подробнее останавливаться на alias API; все подробности можно посмотреть в документации.

Здесь надо сделать ремарку, что в реальном высоконагруженном сервисе такое переключение может быть довольно болезненным и может иметь смысл сделать предварительный прогрев нагрузить новый индекс каким-то пулом сохранённых пользовательских запросов.

Весь код, реализующий индексацию, можно посмотреть в этом коммите.

Индексация: добавляем контент


Для демонстрации в этой статье я использую данные из TMDB 5000 Movie Dataset. Чтобы избежать проблем с авторскими правами, я лишь привожу код утилиты, импортирующей их из CSV-файла, который предлагаю вам скачать самостоятельно с сайта Kaggle. После загрузки достаточно выполнить команду

docker-compose exec -T backend python -m tools.add_movies < ~/Downloads/tmdb-movie-metadata/tmdb_5000_movies.csv


, чтобы создать пять тысяч карточек, посвящённых кино, и команду

docker-compose exec backend python -m tools.build_index


, чтобы построить индекс. Обратите внимание, что последняя команда на самом деле не строит индекс, а только ставит задачу в очередь задач, после чего она выполнится на воркере подробнее об этом подходе я рассказывал в прошлой статье. docker-compose logs worker покажут вам, как воркер старался!

Прежде, чем мы приступим к, собственно, поиску, нам хочется своими глазами увидеть, записалось ли что-нибудь в Elasticsearch, и если да, то как оно выглядит!

Наиболее прямой и быстрый способ это сделать воспользоваться HTTP API Elasticsearch. Сперва проверим, куда указывает алиас:

$ curl -s localhost:9200/_cat/aliasescards                cards-2020-09-20-16-14-18 - - - -


Отлично, индекс существует! Посмотрим на него пристально:

$ curl -s localhost:9200/cards-2020-09-20-16-14-18 | jq{  "cards-2020-09-20-16-14-18": {    "aliases": {      "cards": {}    },    "mappings": {      ...    },    "settings": {      "index": {        "creation_date": "1600618458522",        "number_of_shards": "1",        "number_of_replicas": "1",        "uuid": "iLX7A8WZQuCkRSOd7mjgMg",        "version": {          "created": "7050199"        },        "provided_name": "cards-2020-09-20-16-14-18"      }    }  }}


Ну и, наконец, посмотрим на его содержимое:

$ curl -s localhost:9200/cards-2020-09-20-16-14-18/_search | jq{  "took": 2,  "timed_out": false,  "_shards": {    "total": 1,    "successful": 1,    "skipped": 0,    "failed": 0  },  "hits": {    "total": {      "value": 4704,      "relation": "eq"    },    "max_score": 1,    "hits": [      ...    ]  }}


Итого в нашем индексе 4704 документа, а в поле hits (которое я пропустил, потому что оно слишком большое) можно даже увидеть содержимое некоторых из них. Успех!

Более удобным способом просмотра содержимого индекса и вообще всевозможного баловства с Elasticsearch будет воспользоваться Kibana. Добавим контейнер в docker-compose.yml:

services:  ...  kibana:    image: "kibana:7.5.1"    ports:      - "5601:5601"    depends_on:      - elasticsearch  ...


После повторного docker-compose up мы сможем зайти в Kibana по адресу localhost:5601 (внимание, сервер может стартовать небыстро) и, после короткой настройки, просмотреть содержимое наших индексов в симпатичном веб-интерфейсе.



Очень советую вкладку Dev Tools при разработке вам часто нужно будет делать те или иные запросы в Elasticsearch, и в интерактивном режиме с автодополнением и автоформатированием это гораздо удобнее.

Поиск


После всех невероятно скучных приготовлений пора нам уже добавить функциональность поиска в наше веб-приложение!

Разделим эту нетривиальную задачу на три этапа и обсудим каждый в отдельности.

  1. Добавляем в бэкенд компонент Searcher, отвечающий за логику поиска. Он будет формировать запрос к Elasticsearch и конвертировать результаты в более удобоваримые для нашего бэкенда.
  2. Добавляем в API эндпоинт (ручку/роут/как у вас в компании это называют?) /cards/search, осуществляющий поиск. Он будет вызывать метод компонента Searcher, обрабатывать полученные результаты и возвращать клиенту.
  3. Реализуем интерфейс поиска на фронтенде. Он будет обращаться в /cards/search, когда пользователь определился, что он хочет искать, и отображать результаты (и, возможно, какие-то дополнительные контролы).


Поиск: реализуем


Не так сложно написать менеджер, осуществляющий поиск, как его задизайнить. Давайте опишем результат поиска и интерфейс менеджера и обсудим, почему он такой, а не иной.

# backend/backend/search/searcher.pyimport abcfrom dataclasses import dataclassfrom typing import Iterable, Optional@dataclassclass CardSearchResult:    total_count: int    card_ids: Iterable[str]    next_card_offset: Optional[int]class Searcher(metaclass=abc.ABCMeta):    @abc.abstractmethod    def search_cards(self, query: str = "",                      count: int = 20, offset: int = 0) -> CardSearchResult:        pass


Какие-то вещи очевидны. Например, пагинация. Мы амбициозный молодой убийца IMDB стартап, и результаты поиска никогда не будут вмещаться на одну страницу!

Какие-то менее очевидны. Например, список ID, а не карточек в качестве результата. Elasticsearch по умолчанию хранит наши документы целиком и возвращает их в результатах поиска. Это поведение можно отключить, чтобы сэкономить на размере поискового индекса, но для нас это явно преждевременная оптимизация. Так почему бы не возвращать сразу карточки? Ответ: это нарушит single-responsibility principle. Возможно, когда-нибудь мы накрутим в менеджере карточек сложную логику, переводящую карточки на другие языки в зависимости от настроек пользователя. Ровно в этот момент данные на странице карточки и данные в результатах поиска разъедутся, потому что добавить ту же самую логику в поисковый менеджер мы забудем. И так далее и тому подобное.

Реализация этого интерфейса настолько проста, что мне было лень писать этот раздел :-(

# backend/backend/search/searcher_impl.pyfrom typing import Anyfrom elasticsearch import Elasticsearchfrom backend.search.searcher import CardSearchResult, SearcherElasticsearchQuery = Any  # для аннотаций типовclass ElasticsearchSearcher(Searcher):    def __init__(self, elasticsearch_client: Elasticsearch, cards_index_name: str):        self.elasticsearch_client = elasticsearch_client        self.cards_index_name = cards_index_name    def search_cards(self, query: str = "", count: int = 20, offset: int = 0) -> CardSearchResult:        result = self.elasticsearch_client.search(index=self.cards_index_name, body={            "size": count,            "from": offset,            "query": self._make_text_query(query) if query else self._match_all_query        })        total_count = result["hits"]["total"]["value"]        return CardSearchResult(            total_count=total_count,            card_ids=[hit["_id"] for hit in result["hits"]["hits"]],            next_card_offset=offset + count if offset + count < total_count else None,        )    def _make_text_query(self, query: str) -> ElasticsearchQuery:        return {            # Multi-match query делает текстовый поиск по             # совокупности полей документов (в отличие от match            # query, которая ищет по одному полю).            "multi_match": {                "query": query,                # Число после ^  приоритет. Найти фрагмент текста                # в названии карточки лучше, чем в описании и тегах.                "fields": ["name^3", "tags.text", "text"],            }        }    _match_all_query: ElasticsearchQuery = {"match_all": {}}


По сути мы просто ходим в API Elasticsearch и аккуратно достаём ID найденных карточек из результата.

Реализация эндпоинта тоже довольно тривиальна:

# backend/backend/server.py...    def search_cards(self):        request = flask.request.json        search_result = self.wiring.searcher.search_cards(**request)        cards = self.wiring.card_dao.get_by_ids(search_result.card_ids)        return flask.jsonify({            "totalCount": search_result.total_count,            "cards": [                {                    "id": card.id,                    "slug": card.slug,                    "name": card.name,                    # Здесь не нужны все поля, иначе данных на одной                    # странице поиска будет слишком много, и она будет                    # долго грузиться.                } for card in cards            ],            "nextCardOffset": search_result.next_card_offset,        })...


Реализация фронтенда, пользующегося этим эндпоинтом, хоть и объёмна, но в целом довольно прямолинейна и в этой статье я не хочу заострять на ней внимание. На весь код можно посмотреть в этом коммите.



So far so good, идём дальше.

Поиск: добавляем фильтры


Поиск по тексту это клёво, но если вы когда-нибудь пользовались поиском на серьёзных ресурсах, вы наверняка видели всякие плюшки вроде фильтров.

У наших описаний фильмов из базы TMDB 5000 помимо названий и описаний есть теги, так что давайте для тренировки реализуем фильтры по тегам. Наша цель на скриншоте: при клике на тег в выдаче должны остаться только фильмы с этим тегом (их число указано в скобках рядом с ним).



Чтобы реализовать фильтры, нам нужно решить две проблемы.
  • Научиться по запросу понимать, какой набор фильтров доступен. Мы не хотим показывать все возможные значения фильтра на каждом экране, потому что их очень много и при этом большинство будет приводить к пустому результату; нужно понять, какие теги есть у документов, найденных по запросу, и в идеале оставить N самых популярных.
  • Научиться, собственно, применять фильтр оставить в выдаче только документы с тегами, фильтр по которым выбрал пользователь.


Второе в Elasticsearch элементарно реализуется через API запросов (см. terms query), первое через чуть менее тривиальный механизм агрегаций.

Итак, нам надо знать, какие теги встречаются у найденных карточек, и уметь отфильтровывать карточки с нужными тегами. Сперва обновим дизайн поискового менеджера:

# backend/backend/search/searcher.pyimport abcfrom dataclasses import dataclassfrom typing import Iterable, Optional@dataclassclass TagStats:    tag: str    cards_count: int@dataclassclass CardSearchResult:    total_count: int    card_ids: Iterable[str]    next_card_offset: Optional[int]    tag_stats: Iterable[TagStats]class Searcher(metaclass=abc.ABCMeta):    @abc.abstractmethod    def search_cards(self, query: str = "",                      count: int = 20, offset: int = 0,                     tags: Optional[Iterable[str]] = None) -> CardSearchResult:        pass


Теперь перейдём к реализации. Первое, что нам нужно сделать завести агрегацию по полю tags:

--- a/backend/backend/search/searcher_impl.py+++ b/backend/backend/search/searcher_impl.py@@ -10,6 +10,8 @@ ElasticsearchQuery = Any  class ElasticsearchSearcher(Searcher): +    TAGS_AGGREGATION_NAME = "tags_aggregation"+     def __init__(self, elasticsearch_client: Elasticsearch, cards_index_name: str):         self.elasticsearch_client = elasticsearch_client         self.cards_index_name = cards_index_name@@ -18,7 +20,12 @@ class ElasticsearchSearcher(Searcher):         result = self.elasticsearch_client.search(index=self.cards_index_name, body={             "size": count,             "from": offset,             "query": self._make_text_query(query) if query else self._match_all_query,+            "aggregations": {+                self.TAGS_AGGREGATION_NAME: {+                    "terms": {"field": "tags"}+                }+            }         })


Теперь в поисковом результате от Elasticsearch будет приходить поле aggregations, из которого по ключу TAGS_AGGREGATION_NAME мы сможем достать бакеты, содержащие информацию о том, какие значения лежат в поле tags у найденных документов и как часто они встречаются. Давайте извлечём эти данные и вернём в удобоваримом виде (as designed above):

--- a/backend/backend/search/searcher_impl.py+++ b/backend/backend/search/searcher_impl.py@@ -28,10 +28,15 @@ class ElasticsearchSearcher(Searcher):         total_count = result["hits"]["total"]["value"]+        tag_stats = [+            TagStats(tag=bucket["key"], cards_count=bucket["doc_count"])+            for bucket in result["aggregations"][self.TAGS_AGGREGATION_NAME]["buckets"]+        ]         return CardSearchResult(             total_count=total_count,             card_ids=[hit["_id"] for hit in result["hits"]["hits"]],             next_card_offset=offset + count if offset + count < total_count else None,+            tag_stats=tag_stats,         )


Добавить применение фильтра самая лёгкая часть:

--- a/backend/backend/search/searcher_impl.py+++ b/backend/backend/search/searcher_impl.py@@ -16,11 +16,17 @@ class ElasticsearchSearcher(Searcher):         self.elasticsearch_client = elasticsearch_client         self.cards_index_name = cards_index_name -    def search_cards(self, query: str = "", count: int = 20, offset: int = 0) -> CardSearchResult:+    def search_cards(self, query: str = "", count: int = 20, offset: int = 0,+                     tags: Optional[Iterable[str]] = None) -> CardSearchResult:         result = self.elasticsearch_client.search(index=self.cards_index_name, body={             "size": count,             "from": offset,-            "query": self._make_text_query(query) if query else self._match_all_query,+            "query": {+                "bool": {+                    "must": self._make_text_queries(query),+                    "filter": self._make_filter_queries(tags),+                }+            },             "aggregations": {


Подзапросы, включённые в must-клаузу, обязательны к выполнению, но также будут учитываться при расчёте скоров документов и, соответственно, ранжировании; если мы когда-нибудь будем добавлять ещё какие-то условия на тексты, их лучше добавить сюда. Подзапросы в filter-клаузе только фильтруют, не влияя на скоры и ранжирование.

Осталось реализовать _make_filter_queries():

    def _make_filter_queries(self, tags: Optional[Iterable[str]] = None) -> List[ElasticsearchQuery]:        return [] if tags is None else [{            "term": {                "tags": {                    "value": tag                }            }        } for tag in tags]


На фронтенд-части опять-таки не стану останавливаться; весь код в этом коммите.

Ранжирование


Итак, наш поиск ищет карточки, фильтрует их по заданному списку тегов и выводит в каком-то порядке. Но в каком? Порядок очень важен для практичного поиска, но всё, что мы сделали за время наших разбирательств в плане порядка это намекнули Elasticsearch, что находить слова в заголовке карточки выгоднее, чем в описании или тегах, указав приоритет ^3 в multi-match query.

Несмотря на то, что по умолчанию Elasticsearch ранжирует документы довольно хитрой формулой на основе TF-IDF, для нашего воображаемого амбициозного стартапа этого вряд ли хватит. Если наши документы это товары, нам надо уметь учитывать их продажи; если это user-generated контент уметь учитывать его свежесть, и так далее. Но и просто отсортировать по числу продаж/дате добавления мы не можем, потому что тогда мы никак не учтём релевантность поисковому запросу.

Ранжирование это большое и запутанное царство технологий, которое никак не покрыть за один раздел в конце статьи. Поэтому здесь я перехожу в режим крупных мазков; я попробую рассказать в самых общих словах, как может быть устроено industrial grade ранжирование в поиске, и раскрою немного технических деталей того, как его можно реализовать с Elasticsearch.

Задача ранжирования очень сложна, так что неудивительно, что один из основных современных методов её решения машинное обучение. Приложение технологий машинного обучения к ранжированию собирательно называется learning to rank.

Типичный процесс выглядит так.

Определяемся, что мы хотим ранжировать. Мы кладём интересующие нас сущности в индекс, научаемся для заданного поискового запроса получать какой-то разумный топ (например, какой-нибудь простой сортировкой и отсечением) этих сущностей и теперь хотим научиться его ранжировать более умным способом.

Определяемся, как мы хотим ранжировать. Мы решаем, по какой характеристике надо отранжировать нашу выдачу, в соответствии с бизнес-целями нашего сервиса. Например, если наши сущности это товары, которые мы продаём, мы можем хотеть отсортировать их по убыванию вероятности покупки; если мемы по вероятности лайка или шера и так далее. Эти вероятности мы, конечно, не умеем считать в лучшем случае прикидывать, да и то только для старых сущностей, для которых у нас набрано достаточно статистики, но мы попытаемся научить модель предсказывать их, исходя из косвенных признаков.

Извлекаем признаки. Мы придумываем для наших сущностей какое-то множество признаков, которые могли бы помочь нам оценить релевантность сущностей поисковым запросам. Помимо того же TF-IDF, который уже умеет для нас вычислять Elasticsearch, типичный пример CTR (click-through rate): мы берём логи нашего сервиса за всё время, для каждой пары сущность+поисковый запрос считаем, сколько раз сущность появлялась в выдаче по этому запросу и сколько раз её кликали, делим одно на другое, et voil простейшая оценка условной вероятности клика готова. Мы также можем придумать признаки для пользователя и парные признаки пользователь-сущность, чтобы сделать ранжирование персонализированным. Придумав признаки, мы пишем код, который их вычисляет, кладёт в какое-то хранилище и умеет отдавать в real time для заданного поискового запроса, пользователя и набора сущностей.

Собираем обучающий датасет. Тут много вариантов, но все они, как правило, формируются из логов хороших (например, клик и потом покупка) и плохих (например, клик и возврат на выдачу) событий в нашем сервисе. Когда мы собрали датасет, будь то список утверждений оценка релевантности товара X запросу Q примерно равна P, список пар товар X релевантнее товара Y запросу Q или набор списков для запроса Q товары P1, P2, правильно отранжировать так-то, мы ко всем фигурирующим в нём строкам подтягиваем соответствующие признаки.

Обучаем модель. Тут вся классика ML: train/test, гиперпараметры, переобучение, перфовидеокарты и так далее. Моделей, подходящих (и повсеместно использующихся) для ранжирования, много; упомяну как минимум XGBoost и CatBoost.

Встраиваем модель. Нам остаётся так или иначе прикрутить вычисление модели на лету для всего топа, чтобы до пользователя долетали уже отранжированные результаты. Тут много вариантов; в иллюстративных целях я (опять-таки) остановлюсь на простом Elasticsearch-плагине Learning to Rank.

Ранжирование: плагин Elasticsearch Learning to Rank


Elasticsearch Learning to Rank это плагин, добавляющий в Elasticsearch возможность вычислить ML-модель на выдаче и тут же отранжировать результаты согласно посчитанным ею скорам. Он также поможет нам получить признаки, идентичные используемым в real time, переиспользовав при этом способности Elasticsearch (TF-IDF и тому подобное).

Для начала нам нужно подключить плагин в нашем контейнере с Elasticsearch. Нам потребуется простенький Dockerfile

# elasticsearch/DockerfileFROM elasticsearch:7.5.1RUN ./bin/elasticsearch-plugin install --batch http://es-learn-to-rank.labs.o19s.com/ltr-1.1.2-es7.5.1.zip


и сопутствующие изменения в docker-compose.yml:

--- a/docker-compose.yml+++ b/docker-compose.yml@@ -5,7 +5,8 @@ services:   elasticsearch:-    image: "elasticsearch:7.5.1"+    build:+      context: elasticsearch     environment:       - discovery.type=single-node


Также нам потребуется поддержка плагина в Python-клиенте. С изумлением я обнаружил, что поддержка для Python не идёт в комплекте с плагином, так что специально для этой статьи я её запилил. Добавим elasticsearch_ltr в requirements.txt и проапгрейдим клиент в вайринге:

--- a/backend/backend/wiring.py+++ b/backend/backend/wiring.py@@ -1,5 +1,6 @@ import os +from elasticsearch_ltr import LTRClient from celery import Celery from elasticsearch import Elasticsearch from pymongo import MongoClient@@ -39,5 +40,6 @@ class Wiring(object):         self.task_manager = TaskManager(self.celery_app)          self.elasticsearch_client = Elasticsearch(hosts=self.settings.ELASTICSEARCH_HOSTS)+        LTRClient.infect_client(self.elasticsearch_client)         self.indexer = Indexer(self.elasticsearch_client, self.card_dao, self.settings.CARDS_INDEX_ALIAS)         self.searcher: Searcher = ElasticsearchSearcher(self.elasticsearch_client, self.settings.CARDS_INDEX_ALIAS)


Ранжирование: пилим признаки


Каждый запрос в Elasticsearch возвращает не только список ID документов, которые нашлись, но и некоторые их скоры (как вы бы перевели на русский язык слово score?). Так, если это match или multi-match query, которую мы используем, то скор это результат вычисления той самой хитрой формулы с участием TF-IDF; если bool query комбинация скоров вложенных запросов; если function score query результат вычисления заданной функции (например, значение какого-то числового поля в документе) и так далее. Плагин ELTR предоставляет нам возможность использовать скор любого запроса как признак, позволяя легко скомбинировать данные о том, насколько хорошо документ соответствует запросу (через multi-match query) и какие-то предрассчитанные статистики, которые мы заранее кладём в документ (через function score query).

Поскольку на руках у нас база TMDB 5000, в которой лежат описания фильмов и, помимо прочего, их рейтинги, давайте возьмём рейтинг в качестве образцово-показательного предрассчитанного признака.

В этом коммите я добавил в бэкенд нашего веб-приложения некую базовую инфраструктуру для хранения признаков и поддержал подгрузку рейтинга из файла с фильмами. Дабы не заставлять вас читать очередную кучу кода, опишу самое основное.

  • Признаки мы будем хранить в отдельной коллекции и доставать отдельным менеджером. Сваливать все данные в одну сущность порочная практика.
  • В этот менеджер мы будем обращаться на этапе индексации и класть все имеющиеся признаки в индексируемые документы.
  • Чтобы знать схему индекса, нам надо перед началом построения индекса знать список всех существующих признаков. Этот список мы пока что захардкодим.
  • Поскольку мы не собираемся фильтровать документы по значениям признаков, а собираемся только извлекать их из уже найденных документов для обсчёта модели, мы выключим построение по новым полям обратных индексов опцией index: false в схеме и сэкономим за счёт этого немного места.


Ранжирование: собираем датасет


Поскольку, во-первых, у нас нет продакшна, а во-вторых, поля этой статьи слишком малы для рассказа про телеметрию, Kafka, NiFi, Hadoop, Spark и построение ETL-процессов, я просто сгенерирую случайные просмотры и клики для наших карточек и каких-то поисковых запросов. После этого нужно будет рассчитать признаки для получившихся пар карточка-запрос.

Пришла пора закопаться поглубже в API плагина ELTR. Чтобы рассчитать признаки, нам нужно будет создать сущность feature store (насколько я понимаю, фактически это просто индекс в Elasticsearch, в котором плагин хранит все свои данные), потом создать feature set список признаков с описанием, как вычислять каждый из них. После этого нам достаточно будет сходить в Elasticsearch с запросом специального вида, чтобы получить вектор значений признаков для каждой найденной сущности в результате.

Начнём с создания feature set:

# backend/backend/search/ranking.pyfrom typing import Iterable, List, Mappingfrom elasticsearch import Elasticsearchfrom elasticsearch_ltr import LTRClientfrom backend.search.features import CardFeaturesManagerclass SearchRankingManager:    DEFAULT_FEATURE_SET_NAME = "card_features"    def __init__(self, elasticsearch_client: Elasticsearch,                  card_features_manager: CardFeaturesManager,                 cards_index_name: str):        self.elasticsearch_client = elasticsearch_client        self.card_features_manager = card_features_manager        self.cards_index_name = cards_index_name    def initialize_ranking(self, feature_set_name=DEFAULT_FEATURE_SET_NAME):        ltr: LTRClient = self.elasticsearch_client.ltr        try:            # Создать feature store обязательно для работы,            # но при этом его нельзя создавать дважды \_()_/            ltr.create_feature_store()        except Exception as exc:            if "resource_already_exists_exception" not in str(exc):                raise        # Создаём feature set с невероятными ТРЕМЯ признаками!        ltr.create_feature_set(feature_set_name, {            "featureset": {                "features": [                    # Совпадение поискового запроса с названием                    # карточки может быть более сильным признаком,                     # чем совпадение со всем содержимым, поэтому                     # сделаем отдельный признак про это.                    self._make_feature("name_tf_idf", ["query"], {                        "match": {                            # ELTR позволяет параметризовать                            # запросы, вычисляющие признаки. В данном                            # случае нам, очевидно, нужен текст                             # запроса, чтобы правильно посчитать                             # скор match query.                            "name": "{{query}}"                        }                    }),                    # Скор запроса, которым мы ищем сейчас.                    self._make_feature("combined_tf_idf", ["query"], {                        "multi_match": {                            "query": "{{query}}",                            "fields": ["name^3", "tags.text", "text"]                        }                    }),                    *(                        # Добавляем все имеющиеся предрассчитанные                        # признаки через механизм function score.                        # Если по какой-то причине в документе                         # отсутствует искомое поле, берём 0.                        # (В настоящем проекте вам стоит                        # предусмотреть умолчания получше!)                        self._make_feature(feature_name, [], {                            "function_score": {                                "field_value_factor": {                                    "field": feature_name,                                    "missing": 0                                }                            }                        })                        for feature_name in sorted(self.card_features_manager.get_all_feature_names_set())                    )                ]            }        })    @staticmethod    def _make_feature(name, params, query):        return {            "name": name,            "params": params,            "template_language": "mustache",            "template": query,        }


Теперь функция, вычисляющая признаки для заданного запроса и карточек:

    def compute_cards_features(self, query: str, card_ids: Iterable[str],                                feature_set_name=DEFAULT_FEATURE_SET_NAME) -> Mapping[str, List[float]]:        card_ids = list(card_ids)        result = self.elasticsearch_client.search({            "query": {                "bool": {                    # Нам не нужно проверять, находятся ли карточки                    # на самом деле по такому запросу  если нет,                     # соответствующие признаки просто будут нулевыми.                    # Поэтому оставляем только фильтр по ID.                    "filter": [                        {                            "terms": {                                "_id": card_ids                            }                        },                        # Это  специальный новый тип запроса,                        # вводимый плагином SLTR. Он заставит                        # плагин посчитать все факторы из указанного                        # feature set.                        # (Несмотря на то, что мы всё ещё в разделе                        # filter, этот запрос ничего не фильтрует.)                        {                            "sltr": {                                "_name": "logged_featureset",                                "featureset": feature_set_name,                                "params": {                                    # Та самая параметризация.                                     # Строка, переданная сюда,                                    # подставится в запросах                                    # вместо {{query}}.                                    "query": query                                }                            }                        }                    ]                }            },            # Следующая конструкция заставит плагин запомнить все            # рассчитанные признаки и добавить их в результат поиска.            "ext": {                "ltr_log": {                    "log_specs": {                        "name": "log_entry1",                        "named_query": "logged_featureset"                    }                }            },            "size": len(card_ids),        })        # Осталось достать значения признаков из (несколько        # замысловатого) результата поиска.        # (Чтобы понять, где в недрах результатов нужные мне         # значения, я просто делаю пробные запросы в Kibana.)        return {            hit["_id"]: [feature.get("value", float("nan")) for feature in hit["fields"]["_ltrlog"][0]["log_entry1"]]            for hit in result["hits"]["hits"]        }


Простенький скрипт, принимающий на вход CSV с запросами и ID карточек и выдающий CSV с признаками:

# backend/tools/compute_movie_features.pyimport csvimport itertoolsimport sysimport tqdmfrom backend.wiring import Wiringif __name__ == "__main__":    wiring = Wiring()    reader = iter(csv.reader(sys.stdin))    header = next(reader)    feature_names = wiring.search_ranking_manager.get_feature_names()    writer = csv.writer(sys.stdout)    writer.writerow(["query", "card_id"] + feature_names)    query_index = header.index("query")    card_id_index = header.index("card_id")    chunks = itertools.groupby(reader, lambda row: row[query_index])    for query, rows in tqdm.tqdm(chunks):        card_ids = [row[card_id_index] for row in rows]        features = wiring.search_ranking_manager.compute_cards_features(query, card_ids)        for card_id in card_ids:            writer.writerow((query, card_id, *features[card_id]))


Наконец можно это всё запустить!

# Создаём feature setdocker-compose exec backend python -m tools.initialize_search_ranking# Генерируем событияdocker-compose exec -T backend \    python -m tools.generate_movie_events \    < ~/Downloads/tmdb-movie-metadata/tmdb_5000_movies.csv \    > ~/Downloads/habr-app-demo-dataset-events.csv# Считаем признакиdocker-compose exec -T backend \    python -m tools.compute_features \    < ~/Downloads/habr-app-demo-dataset-events.csv \    > ~/Downloads/habr-app-demo-dataset-features.csv


Теперь у нас есть два файла с событиями и признаками и мы можем приступить к обучению.

Ранжирование: обучаем и внедряем модель


Опустим подробности загрузки датасетов (скрипт полностью можно посмотреть в этом коммите) и перейдём сразу к делу.

# backend/tools/train_model.py... if __name__ == "__main__":    args = parser.parse_args()    feature_names, features = read_features(args.features)    events = read_events(args.events)    # Разделим запросы на train и test в соотношении 4 к 1.    all_queries = set(events.keys())    train_queries = random.sample(all_queries, int(0.8 * len(all_queries)))    test_queries = all_queries - set(train_queries)    # DMatrix  это тип данных, используемый xgboost.    # Фактически это массив значений признаков с названиями     # и лейблами. В качестве лейбла мы берём 1, если был клик,     # и 0, если не было (детали см. в коммите).    train_dmatrix = make_dmatrix(train_queries, events, feature_names, features)    test_dmatrix = make_dmatrix(test_queries, events, feature_names, features)    # Учим модель!    # Поля этой статьи всё ещё крайне малы для долгого разговора     # про ML, так что я возьму минимально модифицированный пример     # из официального туториала к XGBoost.    param = {        "max_depth": 2,        "eta": 0.3,        "objective": "binary:logistic",        "eval_metric": "auc",    }    num_round = 10    booster = xgboost.train(param, train_dmatrix, num_round, evals=((train_dmatrix, "train"), (test_dmatrix, "test")))    # Сохраняем обученную модель в файл.     booster.dump_model(args.output, dump_format="json")     # Санитарный минимум проверки того, как прошло обучение: давайте    # посмотрим на топ признаков по значимости и на ROC-кривую.    xgboost.plot_importance(booster)    plt.figure()    build_roc(test_dmatrix.get_label(), booster.predict(test_dmatrix))    plt.show()


Запускаем

python backend/tools/train_search_ranking_model.py \    --events ~/Downloads/habr-app-demo-dataset-events.csv \    --features ~/Downloads/habr-app-demo-dataset-features.csv \     -o ~/Downloads/habr-app-demo-model.xgb


Обратите внимание, что поскольку мы экспортировали все нужные данные предыдущими скриптами, этот скрипт уже не надо запускать внутри докера его нужно запускать на вашей машине, предварительно установив xgboost и sklearn. Аналогично в настоящем продакшне предыдущие скрипты нужно было бы запускать где-то, где есть доступ в продакшн-окружение, а этот нет.

Если всё сделано правильно, модель успешно обучится, и мы увидим две красивых картинки. Первая график значимости признаков:



Хотя события генерировались случайно, combined_tf_idf оказался сильно значимее других потому что я сделал фокус и искусственно понизил вероятность клика для карточек, которые ниже в выдаче, отранжированной нашим старым способом. То, что модель это заметила добрый знак и признак того, что мы не совершили в процессе обучения каких-то совсем глупых ошибок.

Второй график ROC-кривая:



Синяя линия выше красной, а значит, наша модель предсказывает лейблы чуть-чуть лучше, чем бросок монетки. (Кривая ML-инженера маминой подруги должна почти касаться верхнего левого угла).

Дело совсем за малым добавляем скрипт для заливки модели, заливаем и добавляем маленький новый пункт в поисковый запрос рескоринг:

--- a/backend/backend/search/searcher_impl.py+++ b/backend/backend/search/searcher_impl.py@@ -27,6 +30,19 @@ class ElasticsearchSearcher(Searcher):                     "filter": list(self._make_filter_queries(tags, ids)),                 }             },+            "rescore": {+                "window_size": 1000,+                "query": {+                    "rescore_query": {+                        "sltr": {+                            "params": {+                                "query": query+                            },+                            "model": self.ranking_manager.get_current_model_name()+                        }+                    }+                }+            },             "aggregations": {                 self.TAGS_AGGREGATION_NAME: {                     "terms": {"field": "tags"}


Теперь после того, как Elasticsearch произведёт нужный нам поиск и отранжирует результаты своим (довольно быстрым) алгоритмом, мы возьмём топ-1000 результатов и переранжируем, применив нашу (относительно медленную) машинно-обученную формулу. Успех!

Заключение


Мы взяли наше минималистичное веб-приложение и прошли путь от отсутствия фичи поиска как таковой до масштабируемого решения со множеством продвинутых возможностей. Сделать это было не так уж просто. Но и не так уж сложно! Итоговое приложение лежит в репозитории на Github в ветке со скромным названием feature/search и требует для запуска Docker и Python 3 с библиотеками для машинного обучения.

Чтобы показать, как это в целом работает, какие проблемы встречаются и как их можно решить, я использовал Elasticsearch, но это, конечно, не единственный инструмент, который можно выбрать. Solr, полнотекстовые индексы PostgreSQL и другие движки точно так же заслуживают вашего внимания при выборе, на чём построить свою многомиллиардную корпорацию поисковую систему.

И, конечно, это решение не претендует на законченность и готовность к продакшну, а является исключительно иллюстрацией того, как всё может быть сделано. Улучшать его можно практически бесконечно!
  • Инкрементальная индексация. При модификации наших карточек через CardManager хорошо бы сразу обновлять их в индексе. Чтобы CardManager не знал, что у нас в сервисе есть ещё и поиск, и обошлось без циклических зависимостей, придётся прикрутить dependency inversion в том или ином виде.
  • Для индексации в конкретно нашем случае связки MongoDB с Elasticsearch можно использовать готовые решения вроде mongo-connector.
  • Пока пользователь вводит запрос, мы можем предлагать ему подсказки для этого в Elasticsearch есть специальная функциональность.
  • Когда запрос введён, стоит попытаться исправить в нём опечатки, и это тоже целое дело.
  • Для улучшения ранжирования нужно организовать логирование всех пользовательских событий, связанных с поиском, их агрегацию и расчёт признаков на основе счётчиков. Признаки сущность-запрос, сущность-пользователь, сущность-положение Меркурия тысячи их!
  • Особенно весело пилить агрегации событий не офлайновые (раз в день, раз в неделю), а реалтаймовые (задержка от события до учёта в признаках в пределах пяти минут). Вдвойне весело, когда событий сотни миллионов.
  • Предстоит разобраться с прогревом, нагрузочным тестированием, мониторингами.
  • Оркестрировать кластер нод с шардированием и репликацией это целое отдельное наслаждение.

Но чтобы статья осталась читабельного размера, я остановлюсь на этом и оставлю вас наедине с этими челленджами. Спасибо за внимание!
Подробнее..

Чтобы первый блин не вышел комом. Советы начинающему разработчику сервиса

26.05.2021 10:15:20 | Автор: admin

Добрый день, уважаемые читатели! Материал адресован всем специалистам, работающим с данными, которые решили написать первое веб-приложение. В данной публикации я не буду выкладывать листинги кода. На просторах Интернета есть масса практических примеров сборки сервисов, написанных на разных фреймворках. Но вот теоретических статей о логике процесса, архитектуре решения, а, главное, трудностях, с которыми впервые столкнется специалист, крайне мало. Я решил заполнить эту нишу и описать свой личный опыт, который кому-то может быть полезен.

Специально для статьи я подготовил два идентичных примера на Flask и Dash и выложил их на GitHub. В них иллюстрируется расчет и вывод показателей юнит-экономики абстрактного IT-маркета, который называется Хабр (а почему бы и нет, ведь сейчас все компании начали заниматься электронной коммерцией:).

Разговор предлагаю построить в форме поэтапного тезисного разбора приложенного материла, в процессе я буду акцентировать ваше внимание на тех моментах, которые лично мне показались наиболее сложными или интересными. И, конечно, мы обязательно остановимся на тех ошибках, которые я допустил как бэкенд-разработчик.

ОПП: не умеешь не берись! Когда речь заходит об ОПП, мне почему-то автоматически вспоминается Django с его классами. Но если посмотреть работы начинающих data scientist-ов или аналитиков данных, то мы увидим совсем другую картину. Классы применяются ради самих классов. В данную структуру языка просто сливается весь код. За что отвечает этот монстр? За все! Как искать ошибки или переписывать код, не понятно. Лично у меня такое мнение на этот счет. Если не знаешь когда, как и почему следует применять ОПП, то лучше для небольших разработок использовать процедурно-функциональный стиль.

Пусть безобразие, но единообразие. Даже если вы работаете один, разработайте единую систему формирования пространства имен. Это, во-первых, улучшит читаемость кода, а, во-вторых, ускорит его написание, так как не нужно вспоминать как именно названа функция на другой странице.

Коммитим, даже если не пушим. Даже если у вас нет GitHub аккаунта, заведите себе практику использовать систему контроля версий. Это реально удобно, так как позволяет производить эксперименты во вспомогательных ветках без создания дополнительных листов для тестирования гипотез. Я часто пренебрегаю данной рекомендацией, а зря.

Муки выбора или о разных фреймворках замолвим слово. Сочетание каких технологий можно использовать для создания собственного сервиса? Приведу несколько вариантов, которые сразу приходят на ум. Заранее прошу прощения, что обойду вниманием PHP, Ruby, C#:

  • Flask статичные страницы с шаблонами HTML+CSS

  • Django статичные страницы с шаблонами HTML+CSS

  • Flask Rest API/FastAPI/Django Rest Framework динамические страницы HTML+CSS+фреймворк Javascript (Vue, React, Angular)

  • Dash (по сути работает Flask) Dask (по сути работает React)

Как бы рассуждал я, если передо мной стоял выбор.

  • Нужно выводить таблицы, графики, интерактивные элементы здесь и сейчас Dash

  • Нужно рендерить отдельные показатели на статичной странице. Есть время на эксперименты с дизайном, но нет помощи фронтенд-разработчика Flask

  • Нужно выводить разноплановую информацию, нужна интерактивность. Есть много времени, есть ресурсы, плюс поддержка верстальщика и фронтенд-программиста FastAPI Vue.js

Теперь приведу скриншоты работ на Flask и Dash и сделаю несколько замечаний касательно данных платформ.

Задача состояла в том, что нужно было рассчитать, а потом отобразить 6 таблиц с показателями юнит-экономики, то есть сформировать веб-дашборд. Сразу скажу, что на разработку примеров я потратил примерно одинаковое время. Кардинального различия в результатах я не увидел, но есть нюансы.

В проекте Flask файл, который отвечает за вывод результатов, страницы html и фреймворк css это разные сущности. Документация по Bootstrap4 довольно качественная, но так как у меня нет навыков верстки, мне не удалось добиться корректного вывода всех сводных таблиц.

В проекте Dash за все операции отвечает единый файл, так как я выбрал вариант с хранением таблицы стилей в app.py. Если дашборд простой, то читаемость кода будет приемлемой. Но с ростом проекта с этим могут возникнуть трудности. Стили можно переместить в папку asset. Можно ли как-то еще раздробить основной файл я не знаю. Сразу из коробки имеется хорошая поддержка всех аналитических компонентов, включая таблицы, но нужно время для ознакомления со спецификой разработки.

Архитектура всему голова. Заранее продумывайте архитектуру своего приложения. Все файлы должны быть разнесены по модулям согласно их функционалу. При этом нужно стремиться к тому, чтобы, если изъять из сервиса часть модулей, остальная часть программы сохранила работоспособность. Компоненты должны спокойно интегрироваться в другой сервис с минимальными доработками. Переходим к моим ошибкам. Скрипты для запуска etl-процессов и расчета показателей лежат рядом с главным файлом проекта.

Многофункциональности здесь не место. В продолжение предыдущего пункта. Ваше приложение должно делать хорошо что-то одно. Мой сервис выполняет etl-команды, формирует БД, а затем наполняет ее записями и отвечает за вывод дашборда. Это три разных процесса, которые с большой долей вероятности в реальности будут разнесены во времени. И конечно, нужно убирать файл с данными из приложения, так как он только занимает место.

Что SQL-запросом вытянешь, то и считать будешь. Максимально перенесите расчетную нагрузку на сторону БД. При этом следует учитывать разности в диалектах sql. Старайтесь писать запросы максимально универсальными. Мои ошибки. База данных в качестве физического файла присутствует в проекте. В запросах имеются уникальные конструкции диалекта SQLite.

Pandas мне друг, но производительность дороже. Мне пришлось применить данную библиотеку, так требовалась именно сводная таблица, а получить ее на стороне БД проблематично. В большинстве случаев лучше обойтись только нативным Python.

Не все то золото, что YAML-файл. Идею применения yaml файла для хранения констант проекта я почерпнул из одного видео-ролика практикующего data scientist-а на Youtube. Что в этом плохого или хорошего я не знаю. Решать только вам.

А не замахнуться ли нам на Docker. Небольшое лирическое отступление. Чего мне реально не хватает в Windows, так это Docker. В Windows 10 эту проблему решили, а вот в предыдущих версиях пользователям остается лишь устанавливать Docker Toolbox. Но в настоящее время разработка и поддержка данного продукта завершена, хотя архивный файл можно по-прежнему скачать на официальном аккаунте Docker на GitHub. Лично у меня по некоторым причинам установлен Windows 8.1, поэтому я задался вопросом, как еще можно заполучить в распоряжение эту программу. Установку второй операционной системы я отмел сразу, а вот вариант с виртуальной машиной меня заинтересовал. Для экономии ресурсов я выбрал Debian 10. Если выделить под нужды ВМ один процессор и три гигабайта оперативной памяти, то вполне можно тестировать свои идеи. Но стоит оговориться, что если захочется собрать и запустить контейнер с Apache Airflow, то указанных вычислительных мощностей будет недостаточно.

Теперь можно возвращаться к нашим приложениям. Как сбилдить и запустить контейнер я рассказывать не буду, так как данную информацию легко можно нагуглить в Интернете. Есть лишь пара моментов, на которых я заострю внимание. В процессе сборки будет выдаваться предупреждение о необходимости создания виртуального окружения внутри контейнера. Я решил пренебречь им, так как контейнер и так изолирован от рабочей среды Linux. И еще момент. После того, как приложение на Dash было упаковано в docker-контейнер, перестал отображаться логотип Хабра. Явной причины этого я быстро не нашел, а время, отведенное на эксперимент, было исчерпано.

Семь раз проверь, один раз задеплой. Завершить публикацию я решил на банальной ноте. А именно напомнить вам, о том, как важно проверять результаты перед сдачей. Пара досадных опечаток в комментариях, конечно, не поставят крест на всем проекте, но ведь может сложиться ситуация, что приложение просто не запуститься на демонстрации.

И вот вам конкретный пример. Я построил контейнер на Dash, а дашборд в браузере не отображается. В локальном варианте все было нормально. Оказалось, я просто забыл поменять в файле app.py хост с 127.0.0.1, на 0.0.0.0.

Вместо заключения. За скобками разговора остались моменты связанные с подготовкой проекта к развертыванию на сервере и непосредственно деплой. Пусть это будет вопросами для самостоятельного изучения или темой одной из будущих публикаций.

На этом все. Всем здоровья, удачи и профессиональных успехов!

Подробнее..

Перевод Развертывание интерактивных визуализаций данных в реальном времени на Flask и Bokeh

03.11.2020 20:15:07 | Автор: admin
image

Сегодня, в преддверии старта нового потока курса Python для веб-разработки, делимся с вами полезным переводом статьи о небольшой интерактивной визуализации, для исследований данных о фильмах. Автор использует не только Flask и Bokeh, но и задействуя бесплатную облачную платформу баз данных easybase.io. Все подробности и демонстрации вы найдёте под катом.



Python имеет фантастическую поддержку полезных инструментов анализа: NumPy, SciPy, pandas, Dask, Scikit-Learn, OpenCV и многих других. Из библиотек визуализации данных для Python Bokeh преобладает как самая функциональная и мощная. Эта библиотека поддерживает несколько интерфейсов, охватывающих многие распространенные варианты применения.

Одна из замечательных особенностей Bokeh возможность экспортировать рисунок в виде сырых HTML и JavaScript. Она позволяет внедрять нарисованные программно рисунки в шаблоны приложения Flask. Когда пользователь открывает веб-приложение Flask, рисунки Bokeh создаются и встраиваются в HTML-код в реальном времени.

Для примера создадим программу интерактивного исследования данных о фильмах. В проекте будут виджеты пользовательского интерфейса (ползунки, меню), которые обновляют данные в ответ на действия пользователя. Вот, чему научит эта статья:

  1. Как в Bokeh создать интерактивную визуализацию с пятью точками данных.
  2. Как интегрировать в проект облачную базу данных с тремя тысячами точками данных (Easybase.io).
  3. Как вставить рисунок Bokeh в шаблон Flask.
  4. Как с помощью обратных вызовов JavaScript (CustomJS) добавить виджеты Bokeh, чтобы запрашивать данные.

Часть первая


Первые шаги установка:

pip install bokeh pip install Flask


Создайте файл с именем app.py и начните с такого кода:

from bokeh.models import ColumnDataSourcefrom bokeh.plotting import figure, output_file, showsource = ColumnDataSource()fig = figure(plot_height=600, plot_width=720, tooltips=[("Title", "@title"), ("Released", "@released")])fig.circle(x="x", y="y", source=source, size=8, color="color", line_color=None)fig.xaxis.axis_label = "IMDB Rating"fig.yaxis.axis_label = "Rotten Tomatoes Rating"


Переменная source используется для представления данных стандартным для элементов Bokeh способом. Данные передаются в объект, скармливаемые рисунку Bokeh. Этот объект сопоставление ключей с массивом значений. Позже мы увидим, как получить доступ и манипулировать этим объектом напрямую с помощью CustomJS.

Заметим, что fig представляет визуальный компонент Bokeh. Параметр tooltips задает надпись, отображаемую при наведении курсора мыши на точку в визуализации. Кортежи этого массива структурированы так: ("NAME TO DISPLAY", "@COLUMN_NAME_IN_SOURCE"). Чтобы изменить размер отдельных точек на графике, можно изменить параметр size в fig.circle(). Оставьте параметры x, y и color одинаковыми: позже будет показано, как изменить их согласно какому-то условию.

Переменная axis_label контролирует метки осей X и Y. В нашем случае ось X измеряет рейтинг фильма в IMDB, ось Y рейтинг Rotten Tomatoes. Позже мы увидим, что (очевидно) между ними есть положительная корреляция. Теперь можно написать такой код для передачи каких-то данных в рисунок и его отображения в нашем браузере:

currMovies = [    {'imdbid': 'tt0099878', 'title': 'Jetsons: The Movie', 'genre': 'Animation, Comedy, Family', 'released': '07/06/1990', 'imdbrating': 5.4, 'imdbvotes': 2731, 'country': 'USA', 'numericrating': 4.3, 'usermeter': 46},    {'imdbid': 'tt0099892', 'title': 'Joe Versus the Volcano', 'genre': 'Comedy, Romance', 'released': '03/09/1990', 'imdbrating': 5.6, 'imdbvotes': 23680, 'country': 'USA', 'numericrating': 5.2, 'usermeter': 54},    {'imdbid': 'tt0099938', 'title': 'Kindergarten Cop', 'genre': 'Action, Comedy, Crime', 'released': '12/21/1990', 'imdbrating': 5.9, 'imdbvotes': 83461, 'country': 'USA', 'numericrating': 5.1, 'usermeter': 51},    {'imdbid': 'tt0099939', 'title': 'King of New York', 'genre': 'Crime, Thriller', 'released': '09/28/1990', 'imdbrating': 7, 'imdbvotes': 19031, 'country': 'Italy, USA, UK', 'numericrating': 6.1, 'usermeter': 79},    {'imdbid': 'tt0099951', 'title': 'The Krays', 'genre': 'Biography, Crime, Drama', 'released': '11/09/1990', 'imdbrating': 6.7, 'imdbvotes': 4247, 'country': 'UK', 'numericrating': 6.4, 'usermeter': 82}]source.data = dict(    x = [d['imdbrating'] for d in currMovies],    y = [d['numericrating'] for d in currMovies],    color = ["#FF9900" for d in currMovies],    title = [d['title'] for d in currMovies],    released = [d['released'] for d in currMovies],    imdbvotes = [d['imdbvotes'] for d in currMovies],    genre = [d['genre'] for d in currMovies])output_file("graph.html")show(fig)

До второй части мы будем использовать массив из пяти примеров словарей, содержащих связанные с фильмами свойства. В конечном счете мы получим 3000 записей из EasyBase в режиме реального времени. Свойства в source.data это ссылка Bokeh на то, где на графике должен отображаться элемент и то, как он должен выглядеть. Как уже было сказано, структура этой переменной представляет собой словарь, отображающий все наши атрибуты в соответствующий массив. По этой причине используется синтаксис построения встроенного массива для захвата и извлечения каждого свойства из наших данных в его массив.

Здесь видно, где именно эти элементы расположены на рисунке Bokeh: x, y и color передаются в метод circle(), а title и genre передаются во всплывающую подсказку (позже воспользуемся released, imdbvotes и genre). Свободно изменяйте значения в массивах. Например, если вы хотите, чтобы цвет точек соотносился с жанром, вот код: color = ["#FF9900" for d in currMovies], можно сделать и так: color = ["#008800", if d['genre'] == "drama" else "#FF9900" for d in curMovies]. Наконец, output_file указывает на место, где вы хотите сохранить свои более поздние рисунки. show(fig) сохраняет рисунок в этом месте и открывает его в вашем браузере. Запустите файл и вот, что вы увидите:

chart plotting IMDB and Rotten Tomatoes ratings of movie titles

Пока всё не слишком увлекательно, но наведите курсор мыши на любую точку, чтобы получить информацию о фильме, которая указана в tooltips. Кроме того, попробуйте панорамировать и масштабировать интерфейс. Эти функции пригодятся позже.

Часть вторая


Вот ссылка на CSV-файл с тремя тысячами записей фильмов с теми же атрибутами, что в нашем примере.

Давайте поместим записи в базу данных, чтобы обращаться к ним асинхронно и управлять ими из подходящего источника. Я воспользуюсь easybase.io потому, что это бесплатно и не нужно ничего скачивать, подробнее об этом здесь. Кроме того, легко заполнить коллекцию содержимым файла CSV или JSON. Войдите в EasyBase и создайте таблицу по крайней мере с такими столбцами (свободно добавляйте другие атрибуты, если хотите):



Как только эта таблица откроется, нажмите кнопку + и перейдите к экрану upload data.



Перетащите файл CSV в это диалоговое окно. Полученная коллекция будет выглядеть примерно так:



Помните, что всегда можно загрузить данные в формате CSV или JSON из EasyBase, выбрав всё (в разделе +) и перейдя к разделу share.

Перейдите в Integrate REST GET. Откройте свою новую интеграцию в Get, добавьте все столбцы. Сохранитесь, а затем откройте всплывающее окно интеграции. Мое окно выглядит так:



Обратите внимание на ваш идентификатор интеграции именно через него мы будем извлекать данные из приложения.

Часть третья


Теперь давайте превратим приложение в проект Flask. Перейдите в каталог с файлом app.py. Создайте папку с именем template, добавьте файл с именем index.html.

project templates    index.html app.py

В файле app.py посмотрим очень простую реализацию приложения Flask:

from flask import Flask, render_templateapp = Flask(__name__)@app.route('/')def index():    return render_template('index.html')if __name__ == "__main__":    app.run(debug=True)

Запустите программу командой export FLASK_APP=app.py && flask run на Mac или set FLASK_APP=app.py && flask run на Windows, будет создан веб-сервер. Перейдите по адресу localhost:5000, приложение отобразит templates/index.html. Напишем в файле index.html такой код:

<!DOCTYPE html><html lang="en"><head>    <meta charset="UTF-8">    <meta name="viewport" content="width=device-width, initial-scale=1.0">    <title>Document</title>    {{ js_resources|indent(4)|safe }}    {{ css_resources|indent(4)|safe }}    {{ plot_script|indent(4)|safe }}</head><body>    <h1 style="text-align: center; margin-bottom: 40px;">Flask + Bokeh + EasyBase.io</h2>    <div style="display: flex; justify-content: center;">        {{ plot_div|indent(4)|safe }}    </div></body></html>

Это очень простая веб-страница с четырьмя текучими [прим. перев. конечно, нет никаких текучих атрибутов, вероятно, речь идет об этом видео] атрибутами: js_resources, css_resources, plot_script и plot_div. Bokeh даст нам все передаваемые в эти атрибуты переменные. Во-первых, мы намерены совместить код из первой части с app.py. Начнем с импортирования модулей:

from flask import Flask, render_templatefrom easybase import getfrom bokeh.models import ColumnDataSource, Div, Select, Slider, TextInputfrom bokeh.io import curdocfrom bokeh.resources import INLINEfrom bokeh.embed import componentsfrom bokeh.plotting import figure, output_file, show

Добавьте код из первой части в метод index() перед вызовом return render_template("index.html"). Я заменю захардкоженный массив фильмов вызовом get () в EasyBase. Если у вас не установлена библиотека EasyBase, установите ее так: pip easybase-python. Я заменяю массив из первой части вот таким методом:

def selectedMovies():    res = get("Dt-p-a0jVTBSVQji", 0, 3000, "password")    return res  # ...currMovies = selectedMovies()

Первый параметр get() это идентификатор интеграции из предыдущей версии, после идут offset, length и authentication.

Как и в первой части, этот метод возвращает массив словарей. У этих словарей атрибуты те же, что и раньше.

В методе index() заменим return render_template("index.html") вот этим кодом:

script, div = components(fig)return render_template(    'index.html',    plot_script=script,    plot_div=div,    js_resources=INLINE.render_js(),    css_resources=INLINE.render_css(),).encode(encoding='UTF-8')

Ниже перечисление вводимых в шаблон переменных.

  • plot_script: JavaScript рисунка
  • plot_div: HTML рисунка внутри тега div
  • js_resources: основной и требуемый Boken JavaScript
  • css_resources: Основной и требуемый Bokeh CSS

Теперь app.py будет выглядеть примерно так:

from flask import Flask, render_templatefrom easybase import getfrom bokeh.models import ColumnDataSource, Div, Select, Slider, TextInputfrom bokeh.io import curdocfrom bokeh.resources import INLINEfrom bokeh.embed import componentsfrom bokeh.plotting import figure, output_file, showapp = Flask(__name__)@app.route('/')def index():    def selectedMovies():        res = get("Dt-p-a0jVTBSVQji", 0, 3000, "password")        return res        source = ColumnDataSource()    fig = figure(plot_height=600, plot_width=720, tooltips=[("Title", "@title"), ("Released", "@released")])    fig.circle(x="x", y="y", source=source, size=5, color="color", line_color=None)    fig.xaxis.axis_label = "IMDB Rating"    fig.yaxis.axis_label = "Rotten Tomatoes Rating"    currMovies = selectedMovies()    source.data = dict(        x = [d['imdbrating'] for d in currMovies],        y = [d['numericrating'] for d in currMovies],        color = ["#FF9900" for d in currMovies],        title = [d['title'] for d in currMovies],        released = [d['released'] for d in currMovies],        imdbvotes = [d['imdbvotes'] for d in currMovies],        genre = [d['genre'] for d in currMovies]    )    script, div = components(fig)    return render_template(        'index.html',        plot_script=script,        plot_div=div,        js_resources=INLINE.render_js(),        css_resources=INLINE.render_css(),    ).encode(encoding='UTF-8')if __name__ == "__main__":    app.run(debug=True)

Выполните программу командой export FLASK_APP=app.py && flask run на Mac или set FLASK_APP=app.py && flask run на Windows. Ваш сайт на localhost:5000 должен выглядеть примерно так:



Часть четвертая


Итак, у нас есть отображаемая Flask модель Bokeh. Конечная цель добавить виджеты пользовательского интерфейса, с помощью которых пользователи смогут манипулировать данными. Все изменения будут касаться метода index(). Я очень старался сделать реализацию легко расширяемой. Давайте сначала создадим словарь элементов управления:

genre_list = ['All', 'Comedy', 'Sci-Fi', 'Action', 'Drama', 'War', 'Crime', 'Romance', 'Thriller', 'Music', 'Adventure', 'History', 'Fantasy', 'Documentary', 'Horror', 'Mystery', 'Family', 'Animation', 'Biography', 'Sport', 'Western', 'Short', 'Musical']controls = {    "reviews": Slider(title="Min # of reviews", value=10, start=10, end=200000, step=10),    "min_year": Slider(title="Start Year", start=1970, end=2021, value=1970, step=1),    "max_year": Slider(title="End Year", start=1970, end=2021, value=2021, step=1),    "genre": Select(title="Genre", value="All", options=genre_list)}controls_array = controls.values()

Мы видим три ползунка и выпадающее меню. Свободно редактируйте или добавляйте любые пользовательские виджеты в этот словарь. А нам нужно реализовать обратный вызов, выполняемый при изменении любого элемента управления. Это делается с помощью модуля Bokeh CustomJS. Четвертая часть статьи может быть сложной, но я постарался разобрать все как можно понятнее. Начнем с переменной обратного вызова:

callback = CustomJS(args=dict(source=source, controls=controls), code="""    if (!window.full_data_save) {        window.full_data_save = JSON.parse(JSON.stringify(source.data));    }    var full_data = window.full_data_save;    var full_data_length = full_data.x.length;    var new_data = { x: [], y: [], color: [], title: [], released: [], imdbvotes: [] }    for (var i = 0; i < full_data_length; i++) {        if (full_data.imdbvotes[i] === null || full_data.released[i] === null || full_data.genre[i] === null)            continue;        if (            full_data.imdbvotes[i] > controls.reviews.value &&            Number(full_data.released[i].slice(-4)) >= controls.min_year.value &&            Number(full_data.released[i].slice(-4)) <= controls.max_year.value &&            (controls.genre.value === 'All' || full_data.genre[i].split(",").some(ele => ele.trim() === controls.genre.value))        ) {            Object.keys(new_data).forEach(key => new_data[key].push(full_data[key][i]));        }    }    source.data = new_data;    source.change.emit();""")

Функция code вызывается при любом изменении входных данных. И вызывается она с доступными аргументами: source (исходные данные) и controls (словарь controls). Первая часть code проверяет, существует ли глобальная переменная JavaScript с именем full_data_save. Поскольку эта переменная не существует при первом запуске этой функции, функция создаст глубокую копию необработанных данных и сохранит их в этой глобальной переменной.

Теперь full_data_save не будет изменяться, поэтому у нас всегда будет ссылка на исходные данные. Затем создается новый объект с именем new_data, который принимает тот же формат, что у исходных данных. После выполняется цикл по всем исходным данным с проверкой того, удовлетворяют ли данные значению элемента управления. Видно, что доступ к значению элементов управления осуществляется через controls.*control_name*.value, аналогично тому, как исходные данные мы получили через аргументы CustomJS. Поскольку атрибут released имеет формат MM-DD-YYYY, чтобы сравнить его с min_year и max_year (строки 17-18), я воспользуюсь только последними четырьмя символами. Если отдельный элемент удовлетворяет всем запросам пользователя, он перемещается в new_data с помощью приведенной ниже строки:

Object.keys(new_data).forEach(key => new_data[key].push(full_data[key][i]));

Код отправляет все атрибуты full_data по индексу i в соответствующий массив new_data. После просмотра всех данных в цикле исходными данными становятся новые данные. Наконец, изменения отправляются с помощью функции source.change.emit(). Я пытался сделать код расширяемым, поэтому рабочий процесс добавления нового элемента управления выглядит так:

  1. Добавляем новый виджет в словарь controls.
  2. Внутри CustomJS, добавляем написанное нами условное выражение в строку 15.

Наконец, воспользуемся циклом, чтобы привести в действие обратный вызов и присвоить новые значения:

for single_control in controls_array:    single_control.js_on_change('value', callback)

И последнее добавим элементы управления в layout. Заменим предыдущую строку script, div = components(fig) вот так:

        inputs_column = column(*controls_array, width=320, height=1000)    layout_row = row([ inputs_column, fig ])        script, div = components(layout_row)

Код создает колонку элементов управления под названием input_controls, за которым следует строка input_controls и рисунок. Теперь передаем эту строку в метод components(), а не просто в рисунок. И запустим приложение:



Заключение


Наше приложение успешно интегрировало интерактивный Bokeh рисунок с пользовательскими обратными вызовами на JavaScript. Обратные вызовы выполняются при редактировании любого из наших элементов управления и позволяют выполнять интерактивные запросы из нескольких источников. Python извлекает тысячи записей из Easybase.io с помощью пакета easybase-python, и все эти технологии успешно работают на локальном экземпляре Flask.

Теперь добавим маршрут для пользователей, чтобы они могли добавлять данные в нашу базу данных из приложения Flask. Визуализации Bokeh будут обновляться в режиме реального времени. Спасибо, что прочитали! Не стесняйтесь оставлять комментарии с любыми вопросами. Ниже я добавил весь исходный код app.py:

Простыня исходного кода
from flask import Flask, render_templatefrom easybase import getfrom bokeh.models import ColumnDataSource, Select, Sliderfrom bokeh.resources import INLINEfrom bokeh.embed import componentsfrom bokeh.plotting import figurefrom bokeh.layouts import column, rowfrom bokeh.models.callbacks import CustomJSapp = Flask(__name__)@app.route('/')def index():        genre_list = ['All', 'Comedy', 'Sci-Fi', 'Action', 'Drama', 'War', 'Crime', 'Romance', 'Thriller', 'Music', 'Adventure', 'History', 'Fantasy', 'Documentary', 'Horror', 'Mystery', 'Family', 'Animation', 'Biography', 'Sport', 'Western', 'Short', 'Musical']    controls = {        "reviews": Slider(title="Min # of reviews", value=10, start=10, end=200000, step=10),        "min_year": Slider(title="Start Year", start=1970, end=2021, value=1970, step=1),        "max_year": Slider(title="End Year", start=1970, end=2021, value=2021, step=1),        "genre": Select(title="Genre", value="All", options=genre_list)    }    controls_array = controls.values()    def selectedMovies():        res = get("Dt-p-a0jVTBSVQji", 0, 2000, "password")        return res    source = ColumnDataSource()    callback = CustomJS(args=dict(source=source, controls=controls), code="""        if (!window.full_data_save) {            window.full_data_save = JSON.parse(JSON.stringify(source.data));        }        var full_data = window.full_data_save;        var full_data_length = full_data.x.length;        var new_data = { x: [], y: [], color: [], title: [], released: [], imdbvotes: [] }        for (var i = 0; i < full_data_length; i++) {            if (full_data.imdbvotes[i] === null || full_data.released[i] === null || full_data.genre[i] === null)                continue;            if (                full_data.imdbvotes[i] > controls.reviews.value &&                Number(full_data.released[i].slice(-4)) >= controls.min_year.value &&                Number(full_data.released[i].slice(-4)) <= controls.max_year.value &&                (controls.genre.value === 'All' || full_data.genre[i].split(",").some(ele => ele.trim() === controls.genre.value))            ) {                Object.keys(new_data).forEach(key => new_data[key].push(full_data[key][i]));            }        }                source.data = new_data;        source.change.emit();    """)    fig = figure(plot_height=600, plot_width=720, tooltips=[("Title", "@title"), ("Released", "@released")])    fig.circle(x="x", y="y", source=source, size=5, color="color", line_color=None)    fig.xaxis.axis_label = "IMDB Rating"    fig.yaxis.axis_label = "Rotten Tomatoes Rating"    currMovies = selectedMovies()    source.data = dict(        x = [d['imdbrating'] for d in currMovies],        y = [d['numericrating'] for d in currMovies],        color = ["#FF9900" for d in currMovies],        title = [d['title'] for d in currMovies],        released = [d['released'] for d in currMovies],        imdbvotes = [d['imdbvotes'] for d in currMovies],        genre = [d['genre'] for d in currMovies]    )    for single_control in controls_array:        single_control.js_on_change('value', callback)    inputs_column = column(*controls_array, width=320, height=1000)    layout_row = row([ inputs_column, fig ])    script, div = components(layout_row)    return render_template(        'index.html',        plot_script=script,        plot_div=div,        js_resources=INLINE.render_js(),        css_resources=INLINE.render_css(),    )if __name__ == "__main__":    app.run(debug=True)


Если ты построишь его они придут. [прим. перев. Отсыл на фразу из фильма Кевина Костнера Поле чудес: Если ты построишь его он придет].

На тот случай если вы задумали сменить сферу или повысить свою квалификацию промокод HABR даст вам дополнительные 10% к скидке указанной на баннере.

image




Рекомендуемые статьи


Подробнее..

Развертывание ML модели в Docker с использованием Flask (REST API) масштабирование нагрузки через Nginx балансер

27.03.2021 18:23:22 | Автор: admin

Как известно настройка и обучение моделей машинного обучения это только одна из частей цикла разработки, не менее важной частью является развертывание модели для её дальнейшего использования. В этой статье я расскажу о том, как модель машинного обучения может быть развернута в виде Docker микросервиса, а также о том, как можно распараллелить работу микросервиса с помощью распределения нагрузки в несколько потоков через Load balancer. В последнее время Docker набрал большую популярность, однако здесь будет описан только один из видов стратегий развертывания моделей, и в каждом конкретном случае выбор лучшего варианта остаётся за разработчиком.



Гитхаб репозиторий с исходным кодом: https://github.com/cdies/ML_microservice


Введение


Для этого примера я использовал распространённый набор данных MNIST. Конечная ML модель будет развернута в Docker контейнере, доступ к которой будет организован через HTTP протокол посредствам POST запроса (архитектурный стиль REST API). Полученный таким образом микросервис будет распараллелен через балансировщик на базе Nginx.


Веб фреймворк Flask уже содержит в себе веб-сервер, однако, он используется строго for dev purpose only, т.е. только для разработки, вследствие этого я воспользовался веб-сервером Gunicorn для предоставления нашего REST API.


Описание ML модели


Как уже было отмечено выше, для построения ML модели я использовал, наверное, один из самых известных наборов данных MNIST, тут в принципе показан стандартный пайплайн: загрузка и обработка данных -> обучение модели на нейронной сети Keras -> сохранение модели для повторного использования. Исходный код в файле mnist.py


from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Flatten, Dropoutfrom tensorflow.keras.models import Sequentialfrom tensorflow import kerasfrom tensorflow.keras.datasets import mnist(x_train, y_train), (x_test, y_test) = mnist.load_data()x_train = x_train.reshape((60000,28,28,1)).astype('float32')/255x_test = x_test.reshape((10000,28,28,1)).astype('float32')/255y_train = keras.utils.to_categorical(y_train, 10)y_test = keras.utils.to_categorical(y_test, 10)model = Sequential()model.add(Conv2D(32, (3,3), activation='relu', input_shape=(28,28,1)))model.add(Conv2D(64, (3,3), activation='relu'))model.add(MaxPooling2D(pool_size=(2,2)))model.add(Dropout(0.25))model.add(Flatten())model.add(Dense(128, activation='relu'))model.add(Dropout(0.5))model.add(Dense(10, activation='softmax'))model.compile(loss=keras.losses.categorical_crossentropy,     optimizer=keras.optimizers.Adadelta(), metrics=['accuracy'])model.fit(x_train, y_train, batch_size=128,     epochs=12, verbose=1, validation_data=(x_test, y_test))score = model.evaluate(x_test, y_test)print(score)# Save modelmodel.save('mnist-microservice/model.h5')

Построение HTTP REST API


Сохранённую ML модель я буду использовать для создания простого REST API микросервиса, для этого воспользуюсь веб-фреймворком Flask. Микросервис будет принимать изображение цифры, приводить его к виду, подходящему для использования в нейронной сети, которую мы сохранили в файле model.h5 и возвращать распознанное значение и его вероятность. Исходный код в файле mnist_recognizer.py


from flask import Flask, jsonify, requestfrom tensorflow import kerasimport numpy as npfrom flask_cors import CORSimport imageapp = Flask(__name__)# Cross Origin Resource Sharing (CORS) handlingCORS(app, resources={'/image': {"origins": "http://localhost:8080"}})@app.route('/image', methods=['POST'])def image_post_request():      model = keras.models.load_model('model.h5')    x = image.convert(request.json['image'])    y = model.predict(x.reshape((1,28,28,1))).reshape((10,))    n = int(np.argmax(y, axis=0))    y = [float(i) for i in y]    return jsonify({'result':y, 'digit':n})if __name__ == "__main__":    app.run(host='0.0.0.0', port=5000)

Docker файл микросервиса


В Dockerfile файле микросервиса содержатся все необходимые зависимости, код и сохраненная модель.


FROM python:3.7RUN python -m pip install flask flask-cors gunicorn numpy tensorflow pillowWORKDIR /appADD image.py image.pyADD mnist_recognizer.py mnist_recognizer.pyADD model.h5 model.h5EXPOSE 5000CMD [ "gunicorn", "--bind", "0.0.0.0:5000", "mnist_recognizer:app" ]

В таком виде микросервис уже можно использовать в однопоточном режиме, для этого нужно выполнить следующие команды в папке mnist-microservice:


docker build -t mnist_microservice_test .

docker run -d -p 5000:5000 mnist_microservice_test

Nginx балансер


Тут сразу стоит уточнить, что, в принципе распараллелить процесс можно было бы с помощью Gunicorn веб-сервера, в частности добавить в Dockerfile в строку запуска Gunicorn веб-сервера --workers n, чтобы было n процессов. Однако я исходил из того, что в Docker контейнере не нужно плодить процессы, кроме необходимых, поэтому решил разделить процессы по контейнерам (одна ML модель один контейнер), а не сваливать все процессы в один контейнер. (Пишите в комментах, как бы сделали вы)


Чтобы балансировщик нагрузки работал, нужно, чтобы Nginx перенаправлял запросы на порт 5000, который слушает наш микросервис. Исходный код в файле nginx.conf


user  nginx;events {    worker_connections   1000;}http {        server {              listen 4000;              location / {                proxy_pass http://mnist-microservice:5000;              }        }}

В заключительном docker-compose.yml файле я распараллелил созданный ранее микросервис, который назвал здесь mnist-microservice с помощью параметра replicas.


version: '3.7'services:    mnist-microservice:        build:            context: ./mnist-microservice        image: mnist-microservice        restart: unless-stopped        expose:            - "5000"        deploy:            replicas: 3    nginx-balancer:        image: nginx        container_name: nginx-balancer        restart: unless-stopped        volumes:            - ./nginx-balancer/nginx.conf:/etc/nginx/nginx.conf:ro        depends_on:            - mnist-microservice        ports:            - "5000:4000"    nginx-html:        image: nginx        container_name: nginx-html        restart: unless-stopped        volumes:            - ./html:/usr/share/nginx/html:ro        depends_on:            - nginx-balancer        ports:            - "8080:80"

Как видно, микросервис также продолжает слушать порт 5000 внутри виртуальной сети докера, в то же самое время nginx-balancer перенаправляет трафик от порта 4000 к порту 5000 также внутри виртуальной сети докера, а уже порт 5000 внешней сети я пробросил на 4000 внутренний порт nginx-balancer. Таким образом, для внешнего веб-сервера nginx-html ничего не поменялось, также не пришлось менять исходный код микросервиса.



Чтобы всё запустить, необходимо выполнить в корневой папке проекта:


docker-compose up --build

Для проверки работы микросервиса можно открыть адрес http://localhost:8080/ в браузере и начать посылать в него цифры нарисованные мышкой, в результате должно получиться что-то вроде этого:



Выводы


В последнее время всё чаще используется микросервисная архитектура. Монолитная и микросервисная архитектуры имеют как свои плюсы, так и минусы. К одному из недостатков монолитной архитектуры можно отнести трудность масштабируемости отдельных компонентов, что напротив, является преимуществом микросервисной архитектуры, ведь появление docker контейнеров позволяет легко и просто масштабировать независимые компоненты и также просто управлять ими.


Полезные ссылки:
https://medium.com/swlh/machine-learning-model-deployment-in-docker-using-flask-d77f6cb551d6
https://medium.com/@vinodkrane/microservices-scaling-and-load-balancing-using-docker-compose-78bf8dc04da9
https://github.com/deadfrominside/keras-flask-app

Подробнее..

Погода-бот DialogFlow OpenWeather Python

18.07.2020 00:05:02 | Автор: admin

Постановка задачи


Задача ставилась следующим образом: написать телеграм-бота, который распознавал бы вопросы о том, какая сегодня погода в том или ином городе и выдавал информацию о погоде.

DialogFlow


Для распознавания человеческой речи как нельзя лучше подходит фреймворк DialogFlow, уже имеющий встроенный в него ML. Давайте приступим к работе.

Переходим по ссылке https://dialogflow.cloud.google.com/, авторизуемся в своем аккаунте гугл и переходим на страницу создания бота. Нажимаем на Create new agent и вводим имя агенту: weather-bot. Выбираем дефолтный язык русский.

image

Основной объект, с которым работает DialogFlow интенты или намерения. При взаимодействии с ботом всегда срабатывает то или иное намерение, и задача вас как разработчика сопроводить каждое намерение разнообразными тренировочными фразами, чтобы бот каждый раз максимально правильно угадывал тот или иной интент.

Итак, переходим во вкладку Intents. При создании бота автоматически создаются два интента: Default Fallback Intent и Default Welcome Intent. Welcome Intent вызывается тогда, когда происходит запуск бота либо вы пишете ему приветственное сообщение. Fallback вызывается во всех случаях, когда бот не понимает, что вы ему пишете, т.е. во всех случаях, когда ни один другой интент не срабатывает. Оставляем дефолтные интенты без изменений и жмем на Create intent, называя его get-weather. Именно с этим намерением мы и продолжим работать в данной статье.

image

Переходим в наш интент get-weather, затем во вкладку Training phrases и создаем несколько тренировочных фраз, например, таких:

image

Заметим, что DialogFlow автоматически определяет города как параметры location. Это очень удобно, поскольку мы будем передавать эти самые параметры в бэкенд нашего приложения.

В самом DialogFlow осталось сделать совсем немного разрешить ему вебхуки для взаимодействия с бэкендом нашего бота. Для этого листаем в самый низ, разворачиваем вкладку Fulfillment и ставим галочку на Enable webhook call for this intent.

Бэк


Приступим к написанию серверной части нашего бота. Писать будем на Python в связке с Flask. Для получения информации о погоде был выбран OpenWeather API. Зарегистрируйтесь на этом сайте, затем вам на почту придет API KEY он и понадобится в нашем приложении. Кроме того, поскольку информация о погоде в этом API выдается по параметрам latitude и longitude ширина и долгота нам необходимо как-то преобразовывать город в его ширину и долготу. В этом нам поможет Python-библиотека geopy.

Импортируем все необходимое:

from flask import Flask, request, make_response, jsonifyimport requestsimport jsonfrom geopy.geocoders import Nominatim

Создаем Flask application:

app = Flask(__name__)

и вставляем в переменную API_KEY свой API KEY:

API_KEY = '<your_API_KEY_here>'

Пишем роут для пути "/":

@app.route('/')def index():    return 'Hello World!'

и далее функцию results(), в которой и будет осуществлена вся логика программы:

def results():    req = request.get_json(force=True)    action = req.get('queryResult').get('action')    result = req.get("queryResult")    parameters = result.get("parameters")    if parameters.get('location').get('city'):        geolocator = Nominatim(user_agent='weather-bot')        location = geolocator.geocode(parameters.get('location').get('city'))        lat = location.latitude        long = location.longitude        weather_req = requests.get('https://api.openweathermap.org/data/2.5/onecall?lat={}&lon={}&appid={}'.format(lat, long, API_KEY))        current_weather = json.loads(weather_req.text)['current']        temp = round(current_weather['temp'] - 273.15)        feels_like = round(current_weather['feels_like'] - 273.15)        clouds = current_weather['clouds']        wind_speed = current_weather['wind_speed']    return {'fulfillmentText': 'Сейчас температура воздуха - {} градусов, ощущается как {} градусов, облачность - {}%, скорость ветра - {}м/с'.format(str(temp), str(feels_like), str(clouds), str(wind_speed))}

Осталось дописать роут, по которому будет переход в наше приложение, назовем его webhook:

@app.route('/webhook', methods=['GET', 'POST'])def webhook():    return make_response(jsonify(results()))

и запустить приложение:

if __name__ == '__main__':   app.run(debug=True)

Мы справились!

И это все?


Не совсем. Программа лежит на нашей локальной машине, но DialogFlow о ней ничего не знает. Чтобы превратить нашу машину в сервер, который станет доступен в интернете, нужна особая утилита. Этим требованиям соответствует ngrok. Скачиваем ее, запускаем и вводим в консоли следующее: ngrok http 5000. Появится https-ссылка, которую необходимо скопировать и поместить в DialogFlow. Копируем, переходим в Fulfillment в DialogFlow, ставим Webhook в состояние enabled и вставляем в получившееся поле ссылку. Дописываем роут, т.е. "/webhook". Должно получиться что-то похожее на следующее:

image

Теперь запускаем наше Python-приложение. Осталось совсем немного подключить интеграцию с Telegram. Переходим на вкладку Integrations, выбираем телеграм, далее следуем инструкции по получению токена, вставляем токен, и вуаля приложение готово! Остается его протестировать:

image

Надеюсь, данная статья была вам полезна и сподвигнет на собственные эксперименты в этой области. Код проекта доступен по ссылке.
Подробнее..
Категории: Python , Dialogflow , Weather api , Flask

Flask Dependency Injector руководство по применению dependency injection

25.07.2020 06:07:48 | Автор: admin
Привет,

Я создатель Dependency Injector. Это dependency injection фреймворк для Python.

В этом руководстве хочу показать как применять Dependency Injector для разработки Flask приложений.

Руководство состоит из таких частей:

  1. Что мы будем строить?
  2. Подготовим окружение
  3. Структура проекта
  4. Hello world!
  5. Подключаем стили
  6. Подключаем Github
  7. Сервис поиска
  8. Подключаем поиск
  9. Немного рефакторинга
  10. Добавляем тесты
  11. Заключение

Завершенный проект можно найти на Github.

Для старта необходимо иметь:

  • Python 3.5+
  • Virtual environment

И желательно иметь:

  • Начальные навыки разработки с помощью Flask
  • Общее представление о принципе dependency injection

Что мы будем строить?


Мы будем строить приложение, которое помогает искать репозитории на Github. Назовем его Github Navigator.

Как работает Github Navigator?

  • Пользователь открывает веб-страницу где ему предлагают ввести поисковый запрос.
  • Пользователь вводит запрос и нажимает Enter.
  • Github Navigator ищет подходящие репозитории на Github.
  • По окончанию поиска Github Navigator показывает пользователю веб-страницу с результатами.
  • Страница результатов показывает все найденные репозитории и поисковый запрос.
  • Для каждого репозитория пользователь видит:
    • имя репозитория
    • владельца репозитория
    • последний коммит в репозиторий
  • Пользователь может нажать на любой из элементов чтобы открыть его страницу на Github.



Подготовим окружение


В первую очередь нам нужно создать папку проекта и virtual environment:

mkdir ghnav-flask-tutorialcd ghnav-flask-tutorialpython3 -m venv venv

Теперь давайте активируем virtual environment:

. venv/bin/activate

Окружение готово, теперь займемся структурой проекта.

Структура проекта


Создадим в текущей папке следующую структуру. Все файлы пока оставляем пустыми. Это пока не критично.

Начальная структура:

./ githubnavigator/    __init__.py    application.py    containers.py    views.py venv/ requirements.txt

Пришло время установить Flask и Dependency Injector.

Добавим следующие строки в файл requirements.txt:

dependency-injectorflask

Теперь давайте их установим:

pip install -r requirements.txt

И проверим что установка прошла успешно:

python -c "import dependency_injector; print(dependency_injector.__version__)"python -c "import flask; print(flask.__version__)"

Вы увидите что-то вроде:

(venv) $ python -c "import dependency_injector; print(dependency_injector.__version__)"3.22.0(venv) $ python -c "import flask; print(flask.__version__)"1.1.2

Hello world!


Давайте создадим минимальное hello world приложение.

Добавим следующие строки в файл views.py:

"""Views module."""def index():    return 'Hello, World!'

Теперь добавим контейнер зависимостей (дальше просто контейнер). Контейнер будет содержать все компоненты приложения. Добавим первые два компонента. Это Flask приложение и представление index.

Добавим следующее в файл containers.py:

"""Application containers module."""from dependency_injector import containersfrom dependency_injector.ext import flaskfrom flask import Flaskfrom . import viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = flask.Application(Flask, __name__)    index_view = flask.View(views.index)

Теперь нам нужно создать фабрику Flask приложения. Ее обычно называют create_app(). Она будет создавать контейнер. Контейнер будет использован для создания Flask приложения. Последним шагом настроим маршрутизацию мы назначим представление index_view из контейнера обрабатывать запросы к корню "/" нашего приложения.

Отредактируем application.py:

"""Application module."""from .containers import ApplicationContainerdef create_app():    """Create and return Flask application."""    container = ApplicationContainer()    app = container.app()    app.container = container    app.add_url_rule('/', view_func=container.index_view.as_view())    return app

Контейнер первый объект в приложении. Он используется для получения всех остальных объектов.

Теперь наше приложение готово сказать Hello, World!.

Выполните в терминале:

export FLASK_APP=githubnavigator.applicationexport FLASK_ENV=developmentflask run

Вывод должен выглядеть приблизительно так:

* Serving Flask app "githubnavigator.application" (lazy loading)* Environment: development* Debug mode: on* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)* Restarting with fsevents reloader* Debugger is active!* Debugger PIN: 473-587-859

Откройте браузер и зайдите на http://127.0.0.1:5000/.

Вы увидите Hello, World!.

Отлично. Наше минимальное приложение успешно стартует и работает.

Давайте сделаем его немного красивее.

Подключаем стили


Мы будем использовать Bootstrap 4. Используем для этого расширение Bootstrap-Flask. Оно поможет нам добавить все нужные файлы в несколько кликов.

Добавим bootstrap-flask в requirements.txt:

dependency-injectorflaskbootstrap-flask

и выполним в терминале:

pip install --upgrade -r requirements.txt

Теперь добавим расширение bootstrap-flask в контейнер.

Отредактируйте containers.py:

"""Application containers module."""from dependency_injector import containersfrom dependency_injector.ext import flaskfrom flask import Flaskfrom flask_bootstrap import Bootstrapfrom . import viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = flask.Application(Flask, __name__)    bootstrap = flask.Extension(Bootstrap)    index_view = flask.View(views.index)

Давайте инициализируем расширение bootstrap-flask. Нам нужно будет изменить create_app().

Отредактируйте application.py:

"""Application module."""from .containers import ApplicationContainerdef create_app():    """Create and return Flask application."""    container = ApplicationContainer()    app = container.app()    app.container = container    bootstrap = container.bootstrap()    bootstrap.init_app(app)    app.add_url_rule('/', view_func=container.index_view.as_view())    return app

Теперь нужно добавить шаблоны. Для этого нам понадобится добавить папку templates/ в пакет githubnavigator. Внутри папки с шаблонами добавим два файла:

  • base.html базовый шаблон
  • index.html шаблон основной страницы

Создаем папку templates и два пустых файла внутри base.html и index.html:

./ githubnavigator/    templates/       base.html       index.html    __init__.py    application.py    containers.py    views.py venv/ requirements.txt

Теперь давайте наполним базовый шаблон.

Добавим следующие строки в файл base.html:

<!doctype html><html lang="en">    <head>        {% block head %}        <!-- Required meta tags -->        <meta charset="utf-8">        <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">        {% block styles %}            <!-- Bootstrap CSS -->            {{ bootstrap.load_css() }}        {% endblock %}        <title>{% block title %}{% endblock %}</title>        {% endblock %}    </head>    <body>        <!-- Your page content -->        {% block content %}{% endblock %}        {% block scripts %}            <!-- Optional JavaScript -->            {{ bootstrap.load_js() }}        {% endblock %}    </body></html>

Теперь наполним шаблон основной страницы.

Добавим следующие строки в файл index.html:

{% extends "base.html" %}{% block title %}Github Navigator{% endblock %}{% block content %}<div class="container">    <h1 class="mb-4">Github Navigator</h1>    <form>        <div class="form-group form-row">            <div class="col-10">                <label for="search_query" class="col-form-label">                    Search for:                </label>                <input class="form-control" type="text" id="search_query"                       placeholder="Type something to search on the GitHub"                       name="query"                       value="{{ query if query }}">            </div>            <div class="col">                <label for="search_limit" class="col-form-label">                    Limit:                </label>                <select class="form-control" id="search_limit" name="limit">                    {% for value in [5, 10, 20] %}                    <option {% if value == limit %}selected{% endif %}>                        {{ value }}                    </option>                    {% endfor %}                </select>            </div>        </div>    </form>    <p><small>Results found: {{ repositories|length }}</small></p>    <table class="table table-striped">        <thead>            <tr>                <th>#</th>                <th>Repository</th>                <th class="text-nowrap">Repository owner</th>                <th class="text-nowrap">Last commit</th>            </tr>        </thead>        <tbody>        {% for repository in repositories %} {{n}}            <tr>              <th>{{ loop.index }}</th>              <td><a href="{{ repository.url }}">                  {{ repository.name }}</a>              </td>              <td><a href="{{ repository.owner.url }}">                  <img src="{{ repository.owner.avatar_url }}"                       alt="avatar" height="24" width="24"/></a>                  <a href="{{ repository.owner.url }}">                      {{ repository.owner.login }}</a>              </td>              <td><a href="{{ repository.latest_commit.url }}">                  {{ repository.latest_commit.sha }}</a>                  {{ repository.latest_commit.message }}                  {{ repository.latest_commit.author_name }}              </td>            </tr>        {% endfor %}        </tbody>    </table></div>{% endblock %}

Отлично, почти готово. Последним шагом изменим представление index чтобы оно использовало шаблон index.html.

Отредактируем views.py:

"""Views module."""from flask import request, render_templatedef index():    query = request.args.get('query', 'Dependency Injector')    limit = request.args.get('limit', 10, int)    repositories = []    return render_template(        'index.html',        query=query,        limit=limit,        repositories=repositories,    )

Готово.

Убедитесь что приложение работает или выполните flask run и откройте http://127.0.0.1:5000/.

Вы должны увидите:



Подключаем Github


В этом разделе интегрируем наше приложение с Github API.
Мы будем использовать библиотеку PyGithub.

Добавим её в requirements.txt:

dependency-injectorflaskbootstrap-flaskpygithub

и выполним в терминале:

pip install --upgrade -r requirements.txt

Теперь нам нужно добавить Github API клиент в контейнер. Для этого нам нужно будет воспользоваться двумя новыми провайдерами из модуля dependency_injector.providers:

  • Провайдер Factory будет создавать Github клиент.
  • Провайдер Configuration будет передавать API токен и таймаут Github клиенту.

Сделаем это.

Отредактируем containers.py:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import flaskfrom flask import Flaskfrom flask_bootstrap import Bootstrapfrom github import Githubfrom . import viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = flask.Application(Flask, __name__)    bootstrap = flask.Extension(Bootstrap)    config = providers.Configuration()    github_client = providers.Factory(        Github,        login_or_token=config.github.auth_token,        timeout=config.github.request_timeout,    )    index_view = flask.View(views.index)

Мы использовали параметры конфигурации перед тем как задали их значения. Это принцип, по которому работает провайдер Configuration.

Сначала используем, потом задаем значения.

Теперь давайте добавим файл конфигурации.
Будем использовать YAML.

Создайте пустой файл config.yml в корне проекта:

./ githubnavigator/    templates/       base.html       index.html    __init__.py    application.py    containers.py    views.py venv/ config.yml requirements.txt

И заполните его следующими строками:

github:  request_timeout: 10

Для работы с конфигурационным файлом мы будем использовать библиотеку PyYAML. Добавим ее в файл с зависимостями.

Отредактируйте requirements.txt:

dependency-injectorflaskbootstrap-flaskpygithubpyyaml

и установите зависимость:

pip install --upgrade -r requirements.txt

Для передачи API токена мы будем использовать переменную окружения GITHUB_TOKEN.

Теперь нам нужно отредактировать create_app() чтобы сделать 2 действие при старте приложения:

  • Загрузить конфигурацию из config.yml
  • Загрузить API токен из переменной окружения GITHUB_TOKEN

Отредактируйте application.py:

"""Application module."""from .containers import ApplicationContainerdef create_app():    """Create and return Flask application."""    container = ApplicationContainer()    container.config.from_yaml('config.yml')    container.config.github.auth_token.from_env('GITHUB_TOKEN')    app = container.app()    app.container = container    bootstrap = container.bootstrap()    bootstrap.init_app(app)    app.add_url_rule('/', view_func=container.index_view.as_view())    return app

Теперь нам нужно создать API токен.

Для это нужно:

  • Следовать этому руководству на Github
  • Установить токен в переменную окружения:

    export GITHUB_TOKEN=<your token>
    

Этот пункт можно временно пропустить.

Приложение будет работать без токена, но с ограниченной пропускной способностью. Ограничение для неаутентифицированных клиентов: 60 запросов в час. Токен нужен чтобы увеличить эту квоту до 5000 в час.

Готово.

Установка Github API клиента завершена.

Сервис поиска


Пришло время добавить сервис поиска SearchService. Он будет:

  • Выполнять поиск на Github
  • Получать дополнительные данные о коммитах
  • Преобразовывать формат результат

SearchService будет использовать Github API клиент.

Создайте пустой файл services.py в пакете githubnavigator:

./ githubnavigator/    templates/       base.html       index.html    __init__.py    application.py    containers.py    services.py    views.py venv/ config.yml requirements.txt

и добавьте в него следующие строки:

"""Services module."""from github import Githubfrom github.Repository import Repositoryfrom github.Commit import Commitclass SearchService:    """Search service performs search on Github."""    def __init__(self, github_client: Github):        self._github_client = github_client    def search_repositories(self, query, limit):        """Search for repositories and return formatted data."""        repositories = self._github_client.search_repositories(            query=query,            **{'in': 'name'},        )        return [            self._format_repo(repository)            for repository in repositories[:limit]        ]    def _format_repo(self, repository: Repository):        commits = repository.get_commits()        return {            'url': repository.html_url,            'name': repository.name,            'owner': {                'login': repository.owner.login,                'url': repository.owner.html_url,                'avatar_url': repository.owner.avatar_url,            },            'latest_commit': self._format_commit(commits[0]) if commits else {},        }    def _format_commit(self, commit: Commit):        return {            'sha': commit.sha,            'url': commit.html_url,            'message': commit.commit.message,            'author_name': commit.commit.author.name,        }

Теперь добавим SearchService в контейнер.

Отредактируйте containers.py:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import flaskfrom flask import Flaskfrom flask_bootstrap import Bootstrapfrom github import Githubfrom . import services, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = flask.Application(Flask, __name__)    bootstrap = flask.Extension(Bootstrap)    config = providers.Configuration()    github_client = providers.Factory(        Github,        login_or_token=config.github.auth_token,        timeout=config.github.request_timeout,    )    search_service = providers.Factory(        services.SearchService,        github_client=github_client,    )    index_view = flask.View(views.index)

Подключаем поиск


Теперь мы готовы чтобы поиск заработал. Давайте используем SearchService в index представлении.

Отредактируйте views.py:

"""Views module."""from flask import request, render_templatefrom .services import SearchServicedef index(search_service: SearchService):    query = request.args.get('query', 'Dependency Injector')    limit = request.args.get('limit', 10, int)    repositories = search_service.search_repositories(query, limit)    return render_template(        'index.html',        query=query,        limit=limit,        repositories=repositories,    )

Теперь изменим контейнер чтобы передавать зависимость SearchService в представление index при его вызове.

Отредактируйте containers.py:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import flaskfrom flask import Flaskfrom flask_bootstrap import Bootstrapfrom github import Githubfrom . import services, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = flask.Application(Flask, __name__)    bootstrap = flask.Extension(Bootstrap)    config = providers.Configuration()    github_client = providers.Factory(        Github,        login_or_token=config.github.auth_token,        timeout=config.github.request_timeout,    )    search_service = providers.Factory(        services.SearchService,        github_client=github_client,    )    index_view = flask.View(        views.index,        search_service=search_service,    )

Убедитесь что приложение работает или выполните flask run и откройте http://127.0.0.1:5000/.

Вы увидите:



Немного рефакторинга


Наше представление index содержит два hardcoded значения:

  • Поисковый запрос по умолчанию
  • Лимит количества результатов

Давайте сделаем небольшой рефакторинг. Мы перенесем эти значения в конфигурацию.

Отредактируйте views.py:

"""Views module."""from flask import request, render_templatefrom .services import SearchServicedef index(        search_service: SearchService,        default_query: str,        default_limit: int,):    query = request.args.get('query', default_query)    limit = request.args.get('limit', default_limit, int)    repositories = search_service.search_repositories(query, limit)    return render_template(        'index.html',        query=query,        limit=limit,        repositories=repositories,    )

Теперь нам нужно чтобы эти значения передавались при вызове. Давайте обновим контейнер.

Отредактируйте containers.py:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import flaskfrom flask import Flaskfrom flask_bootstrap import Bootstrapfrom github import Githubfrom . import services, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = flask.Application(Flask, __name__)    bootstrap = flask.Extension(Bootstrap)    config = providers.Configuration()    github_client = providers.Factory(        Github,        login_or_token=config.github.auth_token,        timeout=config.github.request_timeout,    )    search_service = providers.Factory(        services.SearchService,        github_client=github_client,    )    index_view = flask.View(        views.index,        search_service=search_service,        default_query=config.search.default_query,        default_limit=config.search.default_limit,    )

Теперь давайте обновим конфигурационный файл.

Отредактируйте config.yml:

github:  request_timeout: 10search:  default_query: "Dependency Injector"  default_limit: 10

Готово.

Рефакторинг закончен. Му сделали код чище.

Добавляем тесты


Было бы хорошо добавить немного тестов. Давайте это сделаем.

Мы будем использовать pytest и coverage.

Отредактируйте requirements.txt:

dependency-injectorflaskbootstrap-flaskpygithubpyyamlpytest-flaskpytest-cov

и установите новые пакеты:

pip install -r requirements.txt

Создайте пустой файл tests.py в пакете githubnavigator:

./ githubnavigator/    templates/       base.html       index.html    __init__.py    application.py    containers.py    services.py    tests.py    views.py venv/ config.yml requirements.txt

и добавьте в него следующие строки:

"""Tests module."""from unittest import mockimport pytestfrom github import Githubfrom flask import url_forfrom .application import create_app@pytest.fixturedef app():    return create_app()def test_index(client, app):    github_client_mock = mock.Mock(spec=Github)    github_client_mock.search_repositories.return_value = [        mock.Mock(            html_url='repo1-url',            name='repo1-name',            owner=mock.Mock(                login='owner1-login',                html_url='owner1-url',                avatar_url='owner1-avatar-url',            ),            get_commits=mock.Mock(return_value=[mock.Mock()]),        ),        mock.Mock(            html_url='repo2-url',            name='repo2-name',            owner=mock.Mock(                login='owner2-login',                html_url='owner2-url',                avatar_url='owner2-avatar-url',            ),            get_commits=mock.Mock(return_value=[mock.Mock()]),        ),    ]    with app.container.github_client.override(github_client_mock):        response = client.get(url_for('index'))    assert response.status_code == 200    assert b'Results found: 2' in response.data    assert b'repo1-url' in response.data    assert b'repo1-name' in response.data    assert b'owner1-login' in response.data    assert b'owner1-url' in response.data    assert b'owner1-avatar-url' in response.data    assert b'repo2-url' in response.data    assert b'repo2-name' in response.data    assert b'owner2-login' in response.data    assert b'owner2-url' in response.data    assert b'owner2-avatar-url' in response.datadef test_index_no_results(client, app):    github_client_mock = mock.Mock(spec=Github)    github_client_mock.search_repositories.return_value = []    with app.container.github_client.override(github_client_mock):        response = client.get(url_for('index'))    assert response.status_code == 200    assert b'Results found: 0' in response.data

Теперь давайте запустим тестирование и проверим покрытие:

py.test githubnavigator/tests.py --cov=githubnavigator

Вы увидите:

platform darwin -- Python 3.8.3, pytest-5.4.3, py-1.9.0, pluggy-0.13.1plugins: flask-1.0.0, cov-2.10.0collected 2 itemsgithubnavigator/tests.py ..                                     [100%]---------- coverage: platform darwin, python 3.8.3-final-0 -----------Name                             Stmts   Miss  Cover----------------------------------------------------githubnavigator/__init__.py          0      0   100%githubnavigator/application.py      11      0   100%githubnavigator/containers.py       13      0   100%githubnavigator/services.py         14      0   100%githubnavigator/tests.py            32      0   100%githubnavigator/views.py             7      0   100%----------------------------------------------------TOTAL                               77      0   100%

Обратите внимание как мы заменяем github_client моком с помощью метода .override(). Таким образом можно переопределить возвращаемое значения любого провайдера.

Заключение


Мы построили Flask приложения применяя принцип dependency injection. Мы использовали Dependency Injector в качестве dependency injection фреймворка.

Основная часть нашего приложения это контейнер. Он содержит все компоненты приложения и их зависимости в одном месте. Это предоставляет контроль над структурой приложения. Её легко понимать и изменять:

"""Application containers module."""from dependency_injector import containers, providersfrom dependency_injector.ext import flaskfrom flask import Flaskfrom flask_bootstrap import Bootstrapfrom github import Githubfrom . import services, viewsclass ApplicationContainer(containers.DeclarativeContainer):    """Application container."""    app = flask.Application(Flask, __name__)    bootstrap = flask.Extension(Bootstrap)    config = providers.Configuration()    github_client = providers.Factory(        Github,        login_or_token=config.github.auth_token,        timeout=config.github.request_timeout,    )    search_service = providers.Factory(        services.SearchService,        github_client=github_client,    )    index_view = flask.View(        views.index,        search_service=search_service,        default_query=config.search.default_query,        default_limit=config.search.default_limit,    )


Контейнер как карта вашего приложения. Вы всегда знайте что от чего зависит.

Что дальше?


Подробнее..

Из песочницы Что такое REST API

14.08.2020 18:11:43 | Автор: admin
Если говорить отдаленно, то REST API нужен для создания сайта. Но ведь для этого много чего нужно (скажете Вы) в какой же части REST API?

Frontend и Backend


Начнем по порядку. Что такое frontend и backend? У сайта есть две стороны: лицевая и внутренняя соответственно. Первая обычно отвечает за визуальное расположение объектов на странице (где какие картинки, где какой текст и где какие кнопочки). Вторая отвечает за действия. Обычно это нажатия на те самые кнопочки или на другие штуки на сайте. Например, Вы заходите на страницу вашей любимой социальной сети и для начала Вам нужно войти в аккаунт. С лицевой стороны (frontend) Вы вводите логин и пароль и нажимаете кнопку Войти. В это время запрос отправляется в базу данных для проверки наличия такого пользователя, и в случае успеха, Вы попадаете в социальную сеть под своим аккаунтом, а в противном случае, Вы увидите сообщение об ошибке. В данном случае, за отправку запроса в базу данных по сути и отвечает backend сторона. Обычно ее разделяют на три подчасти:

  1. Web API для приема запросов
  2. Бизнес-логика для обработки запросов
  3. Взаимодействие с базой данных

В этой статье мы поговорим в основном про API или Application Programming Interface и немного про бизнес-логику. Но для начала создадим сервер.

Создание собственного сервера


Так выглядит простейшее серверное приложение на python с использованием фреймворка flask:

from flask import Flaskapp = Flask(__name__)@app.route("/")def index():   return "Hello, World!"app.run()

Здесь уже есть один пустой роут (/) и если запустить это приложение и открыть браузер на странице 127.0.0.1:5000, то Вы увидете надпись Hello, World!. Со стороны сервера, Вы увидите такое сообщение:

127.0.0.1 - [07/Aug/2020 20:32:16] GET / HTTP/1.1 200 Таким образом, переходя в браузере (клиенте) по данной ссылке, мы делаем GET-запрос на наш сервер и попадаем в функцию index отсюда и берется Hello, World!. Можно добавлять и другие запросы (гораздо более сложные) по другим роутам (или необязательно). Как я сказал, в данном случае у нас использовался GET-запрос стандартный по умолчанию. Но существует и множество других, самые популярные из которых POST, PUT, DELETE. Но зачем это нужно?

Create Read Update Delete


Во-первых, REST расшифровывается как REpresentational State Transfer (или, по-простому, РЕпрезентативная передача состояния). По факту само определение REST не так важно, но его обычно связывают с другой аббревиатурой CRUD Create Read Update Delete. В самом начале я приводил пример, связанный с базой данных и эти четыре операции неотъемлемая часть работы с ней (ну или просто с данными).

Во-вторых, REST или RESTfull API должны поддерживать обработку этих четырех действий. Здесь нам как раз и пригодятся методы GET, POST, PUT, DELETE. Как правило (не обязательно!) метод POST используется для добавления новых данных (Create), GET для чтения (Read), PUT для обновления существующих данных (Update) и DELETE соответственно для удаления (Delete). Например, то же самое приложение на flask можно переделать так:

from flask import Flask, requestapp = Flask(__name__)@app.route("/", methods=["POST", "GET", "PUT", "DELETE"])def index():   if request.method == "POST":       # добавить новые данные   if request.method == "GET":       # отдать данные   if request.method == "PUT":       # обновить данные   if request.method == "DELETE":       # удалить данныеapp.run()

Это и есть примитивный REST API. Frontend сторона теперь может посылать запросы и, в зависимости от их типа, мы будем производить дальнейшие действия.

Работа с данными


Наше текущее приложение совсем неинтересное хорошо бы поработать с какими-нибудь данными. Для этого надо подумать, как их передавать. Самый популярный способ JSON-формат (но можно использовать и другие, например, XML). Он представляет из себя аналог словаря в python и очень удобен в использовании. Я буду использовать примитивные данные для примера с авторизацией в социальной сети:

data = {   1: {       "login": "login1",       "password": "Qwerty1"},   2: {       "login": "login2",       "password": "Ytrewq2"}   }

У нас есть data, в которой пока что два пользователя (login1 и login2) и мы будем эту дату CRUDить. Стоит сказать, что все четыре метода редко когда работают на одном и том же роуте и обычно делают так: для методов GET (выдать всех пользователей) и POST используется роут, например, /users, а для методов GET (выдать одного пользователя по его id), PUT и DELETE /users/id. Также необходимо заметить, что для обновления и создания новых пользователей к нам приходят данные о них в теле запроса (request.json). Теперь нашу программу можно переписать следующим образом:

from flask import Flask, requestapp = Flask(__name__)data = {   1: {       "login": "login1",       "password": "Qwerty1"},   2: {       "login": "login2",       "password": "Ytrewq2"}   }@app.route("/users", methods=["POST", "GET"])def work_with_users():   if request.method == "POST":       data[max(data.keys())+1] = request.json       return {"message": "User was created"}, 201   if request.method == "GET":       return data, 200@app.route("/users/<int:user_id>", methods=["GET", "PUT", "DELETE"])def work_with_user_by_id(user_id):   if request.method == "GET":       return data[user_id], 200   if request.method == "PUT":       data[user_id]["login"] = request.json["login"]       data[user_id]["password"] = request.json["password"]       return {"message": "User was updated"}, 200   if request.method == "DELETE":       data.pop(user_id)       return {"message": "User was deleted"}, 200app.run()

Для тестирования запросов существует множество программ (Postman, Fiddler, Insomnia...) и я рекомендую ознакомиться с одной из них (лично мне больше всего нравится Postman). С их помощью можно посмотреть, что приходит в результате запроса и с каким статус-кодом (числа 200/201 в returnах). А также можно инсценировать отправку данных, добавляя их в тело запроса.

Стоит также заметить, что в настоящее время такой подход не используется, а обычно применяют библиотеку flask-restplus (или пришедшую ей на смену flask-restx), но я считаю, что для начала нужно познакомиться с чистым flask. Также необходимо проверять наличие данных и их корректность и предусмотреть возврат ошибки в противных случаях.

Заключение


REST API это просто методы CRUD, к которым обращается клиентская сторона сайта по определенным роутам. На слух и взгляд, возможно, это воспринимается трудно, так что я рекомендую написать собственный сервер по аналогии с примером. Лично я считаю flask одним из самых простых фреймворков для этого, и, если Вы новичок, то я советую попробовать именно его.
Подробнее..
Категории: Python , Api , Rest , Rest api , Flask , Crud restful api

Ещё раз о производительности фреймворков Python для веб разработки

23.12.2020 10:20:51 | Автор: admin
Недавно мне пришлось начинать проект нового веб сервиса, и я решил протестировать максимальную нагрузочную способность Django, а заодно сравнить её с Flaskом и AIOHTTP. Результат показался мне неожиданным, поэтому я просто оставлю его тут.

На диаграммах ниже приведены результаты простейшего Apache Benchmarka для фреймворков Django версии 3.1, Flask 1.1 и AIOHTTP 3.7. AIOHTTP работает в штатном однопоточном асинхронном режиме, Django и Flask обслуживаются синхронным WSGI сервером Gunicorn с числом потоков, равным числу доступных ядер процессора * 2. ASGI в тесте не участвовал.

Условия тестирования
Во всех трёх случаях выводится простая страница со списком по результатам выборки из реляционной базы данных PostgreSQL. Запрос я постарался сделать максимально приближенным к реальности:

SELECT r.id, r.auth_user_id, r.status, r.updated, r.label, r.content, u.username,    ARRAY_AGG(t.tag) tag, COUNT(*) OVER() cnt,    (        SELECT COUNT(*) FROM record r2            WHERE                r2.parent_id IS NOT NULL                AND r2.parent_id = r.id                AND r2.status = 'new'    ) AS partsFROM record rJOIN auth_user u ON u.id = r.auth_user_idLEFT JOIN tag t ON t.kind_id = r.id AND t.kind = 'rec'WHERE r.parent_id IS NULL AND r.status = 'new'GROUP BY r.id, u.usernameORDER BY r.updated DESCLIMIT 10 OFFSET 0

Этот запрос получает список корневых записей из основной таблицы приложения, число прямых потомков для каждой строки, данные владельца записи, список тегов и общее число строк выборки.

AIOHTTP использует пулл соединений с БД и драйвер asyncpg, Django и Flask SQLAlchemy без ORM (для чистоты эксперимента) и psycopg2.

Приложение Django создано стандартными средствами фреймворка (django-admin startproject, manage.py startapp и т.д.), вывод тестовой страницы через ListView. Установки Flask и AIOHTTP построены на канонических веб приложениях Hello, world, взятых из документации.

Результаты запуска теста на локальной машине (4 ядра CPU)



и на реальном однопроцессорном VDS (пинг около 45 ms)



Во время теста AIOHTTP использовал 100% одного ядра CPU, Flask и Django 100% всех доступных ядер.

Выводы


На самом деле, сравнение асинхронных и многопоточных приложений не совсем корректно они решают разные задачи. Поэтому, результат выглядит довольно логичным: в локальном тесте у AIOHTTP просто оказалось меньше ресурсов, при равных условиях производительность нивелируется.

А вот скромный результат Flaskа с трудом поддается объяснению, разогнать этот фреймворк у меня не получилось.
Подробнее..

Развертывание нескольких моделей машинного обучения на одном сервере

03.02.2021 00:17:16 | Автор: admin

Знакомство с проблемой

В коммерческой разработке многие сценарии использования машинного обучения подразумевают мультитенантную архитектуру и требуют обучения отдельной модели для каждого клиента и/или пользователя.

В качестве примера можно рассмотреть прогнозирование закупок и спроса на какие-либо продукты с помощью машинного обучения. Если вы управляете сетью розничных магазинов, вы можете использовать данные истории покупок клиентов и итогового спроса на эти продукты для прогнозирования расходов и объёмов закупок для каждого магазина по отдельности.

Чаще всего в таких случаях для развёртывания моделей пишут службу Flask и помещают её в контейнер Docker. Примеров одномодельных серверов машинного обучения очень много, но когда дело доходит до развёртывания нескольких моделей, у разработчика остаётся не так много доступных вариантов для решения проблемы.

В мультитенантных приложениях количество арендаторов заранее не известно и может быть практически не ограничено в какой-то момент у вас может быть только один клиент, а в другой момент вы можете обслуживать отдельные модели для каждого пользователя тысячам пользователей. Вот здесь и начинают проявляться ограничения стандартного подхода к развертыванию:

  • Если мы будем разворачивать контейнер Docker для каждого клиента, то мы получим очень большое и дорогостоящее приложение, которым будет достаточно сложно управлять.

  • Единый контейнер, в образе которого есть все модели, для нас тоже не работает, т. к. на сервере могут работать тысячи моделей, а новые модели добавляются во время выполнения.

Решение

Решение предполагает, что обучение модели машинного обучения выполняется отдельно от обслуживания данной модели. Например, задание Airflow выполняет обучение модели и сохраняет ее в S3, в таком случае единственная ответственность ML сервера это прогнозы.

Обученная модель ML это просто файл на диске, поэтому нам нужно сохранить файл и сопоставление: идентификатор пользователя -> идентификатор модели.

Компоненты решения

Чтобы сервер не зависел от реализации хранилища модели и базовой инфраструктуры машинного обучения, используются следующие абстракции:

  • Model абстрактная модель, предоставляющая прогноз; его реализация может быть SklearnModel, TensorFlowModel, MyCustomModel и т. д.

  • ModelInfoRepository абстрактный репозиторий, который предоставляет сопоставления userid -> modelid. Например, он может быть реализован как SQAlchemyModelInfoRepository.

  • ModelRepository абстрактный репозиторий, который может возвращать модель по ее ID. Это может быть FileSystemRepository, S3Repository или любая другая реализация репозитория.

from abc import ABCclass Model(ABC):    @abstractmethod    def predict(self, data: pd.DataFrame) -> np.ndarray:        raise NotImplementedError class ModelInfoRepository(ABC):    @abstractmethod    def get_model_id_by_user_id(self, user_id: str) -> str:        raise NotImplementedError class ModelRepository(ABC):    @abstractmethod    def get_model(self, model_id: str) -> Model:        raise NotImplementedError

Реализация

Теперь предположим, что мы обучили модель sklearn, которая хранится в Amazon S3 с сопоставлениями userid -> modelid, определенными в базе данных.

class SklearnModel(Model):    def __init__(self, model):        self.model = model     def predict(self, data: pd.DataFrame):        return self.model.predict(data) class SQAlchemyModelInfoRepository(ModelInfoRepository):    def __init__(self, sqalchemy_session: Session):        self.session = sqalchemy_session     def get_model_id_by_user_id(user_id: str) -> str:        # implementation goes here, query a table in any Database      class S3ModelRepository(ModelRepository):    def __init__(self, s3_client):        self.s3_client = s3_client     def get_model(self, model_id: str) -> Model:        # load and deserialize pickle from S3, implementation goes here

Это делает реализацию сервера чрезвычайно простой:

def make_app(model_info_repository: ModelInfoRepository,     model_repsitory: ModelRepository) -> Flask:    app = Flask("multi-model-server")        @app.predict("/predict/<user_id>")    def predict(user_id):        model_id = model_info_repository.get_model_id_by_user_id(user_id)         model = model_repsitory.get_model(model_id)         data = pd.DataFrame(request.json())         predictions = model.predict(data)         return jsonify(predictions.tolist())     return app

Обратите внимание, что благодаря абстракциям сервер Flask полностью независим от конкретной модели и реализации хранилища; мы можем заменить sklearn на TensorFlow и S3 на локальную папку, при этом в коде сервера Flask не меняются строки.

Замечание о кешировании

Поскольку некоторые модели можно запрашивать чаще, чем другие, загрузка их из хранилища каждый раз может оказаться дорогостоящей. Чтобы решить эту проблему, мы можем использовать кеширование. Вот как это может быть реализовано в виде композиции из существующего репозитория моделей и кеширующих библиотек cachetools:

from cachetools import Cache class CachedModelRepository(ModelRepository):    def __init__(self, model_repository: ModelRepository, cache: Cache):        self.model_repository = model_repository        self.cache = cache     @abstractmethod    def get_model(self, model_id: str) -> Model:        if model_id not in self.cache:            self.cache[model_id] = self.model_repository.get_model(model_id)        return self.cache[model_id]

Пример использования:

from cachetools import LRUCache model_repository = CachedModelRepository(    S3ModelRepository(s3_client),    LRUCache(max_size=10))

Перед выходом в продакшен

Такой многомодельный сервер - одна из многих частей, необходимых для запуска приложений производственного уровня с возможностями машинного обучения. Безопасность корпоративного уровня, масштабируемость, MLOps и т. д. могут быть даже более важными для успеха и надежности проекта, чем немного более точная модель машинного обучения. Всегда помните о гениальном правиле 4 от Google: пусть первая модель будет простой, а инфраструктура - правильной.

Подробнее..

Автоматическая документация для Flask с использованием OpenAPI

15.02.2021 18:22:10 | Автор: admin
image alt


Техническая документация, как известно, крайне важная часть любого проекта. До недавнего времени мы прекрасно жили с таким генератором документаций как Sphinx. Но наступил момент переходить на технологии с бОльшим набором возможностей, поэтому мы приняли решение переписать нашу документацию на более современный стандарт: OpenAPI Specification. Эта статья является скромным гайдом по такому переезду. Она будет интересна Python-разработчикам, особенно тем, которые используют Flask. После ее прочтения вы узнаете, как создать статическую OpenAPI документацию для Flask приложения и развернуть ее в GitLab Pages.




apispec + marshmallow


В качестве веб-фреймворка у нас используется Flask. Документацию для API, созданного с помощью него, мы и хотим создать. Спецификация по стандарту OpenAPI описывается форматом YAML(или JSON). Чтобы преобразовать докстринги нашего API в необходимый формат, будем использовать такой инструмент, как apispec, и его плагины. Например, MarshmallowPlugin, с помощью которого (и самой библиотеки marshmallow) можно удобно за счет возможности наследования и переиспользования описать входные и выходные данные эндпоинтов в виде python классов, а также провалидировать их.

Используя библиотеку marshmallow, создадим класс, описывающий параметры API:

from marshmallow import Schema, fieldsclass InputSchema(Schema):   number = fields.Int(description="Число", required=True, example=5)   power = fields.Int(description="Степень", required=True, example=2)

Аналогично сделаем для выходных параметров:

class OutputSchema(Schema):   result = fields.Int(description="Результат", required=True, example=25)

Для группировки запросов в OpenAPI используются теги. Создадим тег и добавим его в объект APISpec:

def create_tags(spec):   """ Создаем теги.   :param spec: объект APISpec для сохранения тегов   """   tags = [{'name': 'math', 'description': 'Математические функции'}]   for tag in tags:       print(f"Добавляем тег: {tag['name']}")       spec.tag(tag)

Далее нам нужно интегрировать параметры в докстринг так, чтобы это соответствовало OpenAPI спецификации.

Пример:

from flask import Blueprint, current_app, json, requestblueprint_power = Blueprint(name="power", import_name=__name__)@blueprint_power.route('/power')def power():   """   ---   get:     summary: Возводит число в степень     parameters:       - in: query         schema: InputSchema     responses:       '200':         description: Результат возведения в степень         content:           application/json:             schema: OutputSchema       '400':         description: Не передан обязательный параметр         content:           application/json:             schema: ErrorSchema     tags:       - math   """   args = request.args   number = args.get('number')   if number is None:       return current_app.response_class(           response=json.dumps(               {'error': 'Не передан параметр number'}           ),           status=400,           mimetype='application/json'       )   power = args.get('power')   if power is None:       return current_app.response_class(           response=json.dumps(               {'error': 'Не передан параметр power'}           ),           status=400,           mimetype='application/json'       )   return current_app.response_class(       response=json.dumps(           {'response': int(number)**int(power)}       ),       status=200,       mimetype='application/json'   )

Эта функция пример реализации метода GET в нашем API.

Блок summary. Краткое описание функции. Для более подробного описания можно добавить блок description.

Блок parameters. Описание параметров запроса. У параметра указывается, откуда он берется:

  • path, для /power/{number}
  • query, для /power?number=5
  • header, для X-MyHeader: Value
  • cookie, для параметров переданных в cookie файле

и schema, в которую передается python класс, описывающий данный параметр.

Блок responses. Описание вариантов ответа команды и их структура.

Блок tags. Описание тегов, которые используются для логической группировки эндпоинтов.

Для POST запроса, например, можно указать еще requestBody, в котором описываются параметры, передаваемые в теле. Подробнее можно почитать в официальной документации.

После того, как мы описали методы API, можем загрузить их описание в объект APISpec:

def load_docstrings(spec, app):   """ Загружаем описание API.   :param spec: объект APISpec, куда загружаем описание функций   :param app: экземпляр Flask приложения, откуда берем описание функций   """   for fn_name in app.view_functions:       if fn_name == 'static':           continue       print(f'Загружаем описание для функции: {fn_name}')       view_fn = app.view_functions[fn_name]       spec.path(view=view_fn)

Создаем метод get_apispec, который будет возвращать объект APISpec, в нем добавляем общую информацию о проекте и вызываем описанные ранее методы load_docstrings и create_tags:

from apispec import APISpecfrom apispec.ext.marshmallow import MarshmallowPluginfrom apispec_webframeworks.flask import FlaskPlugindef get_apispec(app):   """ Формируем объект APISpec.   :param app: объект Flask приложения   """   spec = APISpec(       title="My App",       version="1.0.0",       openapi_version="3.0.3",       plugins=[FlaskPlugin(), MarshmallowPlugin()],   )   spec.components.schema("Input", schema=InputSchema)   spec.components.schema("Output", schema=OutputSchema)   spec.components.schema("Error", schema=ErrorSchema)   create_tags(spec)   load_docstrings(spec, app)   return spec


image alt


Swagger UI


Swagger UI позволяет создать интерактивную страницу с документацией.

Создадим эндпоинт, который будет возвращать спецификацию в json формате, и вызываем в нем get_apispec:

@app.route('/swagger')def create_swagger_spec():   return json.dumps(get_apispec(app).to_dict())

Теперь, когда мы получили json спецификацию, нам нужно сформировать из неё html документ. Для этого воспользуемся пакетом flask_swagger_ui, с помощью которого можно встроить интерактивную страницу с документацией на базе Swagger UI в наше Flask приложение:

from flask_swagger_ui import get_swaggerui_blueprintSWAGGER_URL = '/docs'API_URL = '/swagger'swagger_ui_blueprint = get_swaggerui_blueprint(   SWAGGER_URL,   API_URL,   config={       'app_name': 'My App'   })


Таким образом, мы сделали эндпоинт /docs, при обращении по которому получаем документацию следующего вида:

image alt


image alt

GitLab Pages + ReDoc



Если мы не хотим формировать документацию во время обращения по эндпоинту, то можно собрать статичный документ.

Использование GitLab позволяет сгенерировать такую статическую страницу с документацией в CI/CD процессах.

Таким образом, мы один раз соберем документацию, и при обращении по заданному адресу она будет просто отображаться без какой-либо дополнительной обработки.

Для этого сохраним APISpec в YAML файл:

DOCS_FILENAME = 'docs.yaml'def write_yaml_file(spec: APISpec):   """ Экспортируем объект APISpec в YAML файл.   :param spec: объект APISpec   """   with open(DOCS_FILENAME, 'w') as file:       file.write(spec.to_yaml())   print(f'Сохранили документацию в {DOCS_FILENAME})

Теперь, когда мы получили YAML файл по спецификации OpenAPI, нужно сформировать HTML документ. Для этого будем использовать ReDoc, так как он позволяет сгенерировать документ в gitlab-ci с красивой и удобной структурой. Публиковать его будем с помощью GitLab Pages.

Добавим следующие строки в файл gitlab-ci.yml:

pages: stage: docs image: alpine:latest script:   - apk add --update nodejs npm   - npm install -g redoc-cli   - redoc-cli bundle -o public/index.html docs.yaml artifacts:   paths:     - public

Стоит отметить, что index.html нужно сохранять в папку public, так как она зарезервирована GitLabом.

Теперь, если мы запушим изменения в репозиторий, по адресу namespace.gitlab.com/project появится документация:

image alt


Также путь до документации можно посмотреть в Settings/Pages

Пример документации с использованием ReDoc: ivi-ru.github.io/hydra

Заключение


Таким образом, мы научились собирать OpenAPI документацию с использованием ReDoc и хостить ее на GitLab Pages. В эту статью не попало еще несколько возможностей этих инструментов, например, валидация параметров с помощью marshmallow. Но основной ее целью было показать непосредственно процесс создания документации.

Полезные ссылки


Подробнее..

Как прикрутить нейросеть к сайту по быстрому

05.03.2021 16:08:13 | Автор: admin

В данном материале предлагается, приложив небольшие усилия, соединить python 3.7+flask+tensorflow 2.0+keras+небольшие вкрапления js и вывести на web-страницу определенный интерактив. Пользователь, рисуя на холсте, будет отправлять на распознавание цифры, а ранее обученная модель, использующая архитектуру CNN, будет распознавать полученный рисунок и выводить результат. Модель обучена на известном наборе рукописных цифр MNIST, поэтому и распознавать будет только цифры от 0 до 9 включительно. В качестве системы, на которой все это будет крутиться, используется windows 7.


Небольшое вступление

.
Чем печальны книги по машинному обучению, так, пожалуй, тем, что код устаревает почти с выходом самой книги. И хорошо, если автор издания поддерживает свое дитя, сопровождая и обновляя код, но, зачастую все ограничивается тем, что пишут вот вам requirements.txt, ставьте устаревшие пакеты, и все заработает.

Так вышло и в этот раз. Читая Hands-On Python Deep Learning for the Web авторства Anubhav Singh, Sayak Paul, сначала все шло хорошо. Однако, после первой главы праздник закончился. Самое неприятное было то, что заявленные требования в requirements в целом соблюдались.

Масло в огонь подлили и сами разработчики пакетов tensorflow и keras. Один пакет работает только с определенным другим и, либо даунгрейд одного из них либо бубен шамана.
Но и это еще не все. Оказывается, что некоторые пакеты еще и зависимы от архитектуры используемого железа!

Так, за неимением алтернативы железа, устанавливался tensorflow 2.0 на платформу с Celeron j1900 и, как оказалось, там нет инструкции AVX2:


И вариант через pip install tensorflow не работал.
Но не все так грустно при наличии желания и интернета!

Вариант с tensorflow 2.0 удалось реализовать через wheel github.com/fo40225/tensorflow-windows-wheel/tree/master/2.0.0/py37/CPU/sse2 и установку x86: vc_redist.x86.exe, x64: vc_redist.x64.exe (http://personeltest.ru/aways/support.microsoft.com/en-us/help/2977003/the-latest-supported-visual-c-downloads).

Keras был установлен с минимальной версией, с которой он стал совместим с tensorflow Keras==2.3.0.

Поэтому
pip install tensorflow-2.0.0-cp37-cp37m-win_amd64.whl
и
pip install keras==2.3.0


Основное приложение.


Рассмотрим код основной программы.
flask_app.py
#code work with scipy==1.6.1, tensorflow @ file:///D:/python64/tensorflow-2.0.0-cp37-cp37m-win_amd64.whl,#Keras==2.3.0from flask import Flask, render_template, requestimport imageio#https://imageio.readthedocs.io/en/stable/examples.html#from scipy.misc import imread, imresize#from matplotlib.pyplot import imreadimport numpy as npimport tensorflow as tffrom tensorflow.keras.models import model_from_jsonfrom skimage import transform,iojson_file = open('model.json','r')model_json = json_file.read()json_file.close()model = model_from_json(model_json)model.load_weights("weights.h5")model.compile(loss='categorical_crossentropy',optimizer='adam',metrics=['accuracy'])#graph = tf.get_default_graph()graph = tf.compat.v1.get_default_graph()app = Flask(__name__)@app.route('/')def index():    return render_template("index.html")import reimport base64def convertImage(imgData1):    imgstr = re.search(r'base64,(.*)', str(imgData1)).group(1)    with open('output.png', 'wb') as output:        output.write(base64.b64decode(imgstr))@app.route('/predict/', methods=['GET', 'POST'])def predict():    global model, graph        imgData = request.get_data()    convertImage(imgData)    #print(imgData)       #x = imread('output.png', mode='L')    #x.shape    #(280, 280)    x = imageio.imread('output.png',pilmode='L')    #x = imresize(x, (28, 28))    #x = x.resize(x, (28, 28))    x = transform.resize(x, (28,28), mode='symmetric', preserve_range=True)    #(28, 28)    #type(x)    #<class 'numpy.ndarray'>    x = x.reshape(1, 28, 28, 1)    #(1, 28, 28, 1)     x = tf.cast(x, tf.float32)        # perform the prediction    out = model.predict(x)            #print(np.argmax(out, axis=1))    # convert the response to a string    response = np.argmax(out, axis=1)    return str(response[0])if __name__ == "__main__":    # run the app locally on the given port    app.run(host='0.0.0.0', port=80)# optional if we want to run in debugging mode    app.run(debug=True)



Подгрузили пакеты:
from flask import Flask, render_template, requestimport imageio#https://imageio.readthedocs.io/en/stable/examples.html#from scipy.misc import imread, imresize#from matplotlib.pyplot import imreadimport numpy as npimport tensorflow as tffrom tensorflow.keras.models import model_from_jsonfrom skimage import transform,io

Как выяснилось imread, imresize устарели еще со времен scipy==1.0. Непонятно, как у автора все работало, учитывая, что книга относительно нова (2019). С современной scipy==1.6.1 книжный вариант кода не работал.

Загружаем с диска, компилируем модель нейросети:
json_file = open('model.json','r')model_json = json_file.read()json_file.close()model = model_from_json(model_json)model.load_weights("weights.h5")model.compile(loss='categorical_crossentropy',optimizer='adam',metrics=['accuracy'])#graph = tf.get_default_graph()graph = tf.compat.v1.get_default_graph()

Здесь произведена замена на tf.compat.v1.get_default_graph() в виду несовместимости.

Далее часть, относящаяся к серверу на flask. Прорисовка шаблона страницы:
@app.route('/')def index():    return render_template("index.html")


Часть, преобразующая картинку в числовой массив:
import reimport base64def convertImage(imgData1):    imgstr = re.search(r'base64,(.*)', str(imgData1)).group(1)    with open('output.png', 'wb') as output:        output.write(base64.b64decode(imgstr))


Основная функция предсказания:
def predict():    global model, graph        imgData = request.get_data()    convertImage(imgData)    #print(imgData)       #x = imread('output.png', mode='L')    #x.shape    #(280, 280)    x = imageio.imread('output.png',pilmode='L')    #x = imresize(x, (28, 28))    #x = x.resize(x, (28, 28))    x = transform.resize(x, (28,28), mode='symmetric', preserve_range=True)    #(28, 28)    #type(x)    #<class 'numpy.ndarray'>    x = x.reshape(1, 28, 28, 1)    #(1, 28, 28, 1)     x = tf.cast(x, tf.float32)        # perform the prediction    out = model.predict(x)            #print(np.argmax(out, axis=1))    # convert the response to a string    response = np.argmax(out, axis=1)    return str(response[0])

Закоментированы строки, которые были заменены на рабочие, а также оставлены выводы отдельных строк для наглядности.

Как все работает.


После запуска командой python flask_app.py запускается локальный flask-сервер, который выводит index.html с вкраплением js.
Пользователь рисует на холсте цифру, нажимает predict. Картинка улетает на сервер, где сохраняется и преобразуется в цифровой массив. Далее в бой вступает CNN, распознающая цифру и возвращающая ответ в виде цифры.
Сеть не всегда дает верный ответ, т.к. обучалась всего на 10 эпохах. Это можно наблюдать, если нарисовать спорную цифру, которая может трактоваться по-разному.
*Можно покрутить слайдер, увеличивая или уменьшая толщину начертания цифры для целей распознавания.

Второй вариант программы через API,curl

.
Поользователь загружает на сервер свое изображение с цифрой для распознавания и нажимает отправить:


Заменим index.js на следующий
index.js:
$("form").submit(function(evt){evt.preventDefault();var formData = new FormData($(this)[0]);$.ajax({url: '/predict/',type: 'POST',data: formData,async: false,cache: false,contentType: false,enctype: 'multipart/form-data',processData: false,success: function (response) {$('#result').empty().append(response);}});return false;});



Шаблон страницы также изменится:
index.html
<!DOCTYPE html><html lang="en"><head><title>MNIST CNN</title></head><body><h1>MNIST Handwritten Digits Prediction</h1><form><input type="file" name="img"></input><input type="submit"></input></form><hr><h3>Prediction: <span id="result"></span></h3><scriptsrc='https://code.jquery.com/jquery-3.6.0.min.js'></script><script src="{{ url_for('static',filename='index.js') }}"></script></body></html>



Немного изменится и основная программа:
flask_app2.py
#code work with scipy==1.6.1, tensorflow @ file:///D:/python64/tensorflow-2.0.0-cp37-cp37m-win_amd64.whl,#Keras==2.3.0from flask import Flask, render_template, requestimport imageio#https://imageio.readthedocs.io/en/stable/examples.html#from scipy.misc import imread, imresize#from matplotlib.pyplot import imreadimport numpy as npimport tensorflow as tffrom tensorflow.keras.models import model_from_jsonfrom skimage import transform,iojson_file = open('model.json','r')model_json = json_file.read()json_file.close()model = model_from_json(model_json)model.load_weights("weights.h5")model.compile(loss='categorical_crossentropy',optimizer='adam',metrics=['accuracy'])#graph = tf.get_default_graph()graph = tf.compat.v1.get_default_graph()app = Flask(__name__)@app.route('/')def index():    return render_template("index.html")import reimport base64def convertImage(imgData1):    imgstr = re.search(r'base64,(.*)', str(imgData1)).group(1)    with open('output.png', 'wb') as output:        output.write(base64.b64decode(imgstr))@app.route('/predict/', methods=['POST'])def predict():    global model, graph        imgData = request.get_data()    try:        stringToImage(imgData)    except:        f = request.files['img']        f.save('image.png')           #x = imread('output.png', mode='L')    #x.shape    #(280, 280)    x = imageio.imread('image.png',pilmode='L')    #x = imresize(x, (28, 28))    #x = x.resize(x, (28, 28))    x = transform.resize(x, (28,28), mode='symmetric', preserve_range=True)    #(28, 28)    #type(x)    #<class 'numpy.ndarray'>    x = x.reshape(1, 28, 28, 1)    #(1, 28, 28, 1)     x = tf.cast(x, tf.float32)        # perform the prediction    out = model.predict(x)            #print(np.argmax(out, axis=1))    # convert the response to a string    response = np.argmax(out, axis=1)    return str(response[0])if __name__ == "__main__":    # run the app locally on the given port    app.run(host='0.0.0.0', port=80)# optional if we want to run in debugging mode    app.run(debug=True)



Запускается все похоже python flask_app2.py

Вариант с curl (для windows)

.
Скачиваем curl

В командной строке windows отправляем команду:
curl -X POST -F img=@1.png http://localhost/predict/

где 1.png картинка с цифрой (или она же с путем к ней).
В ответ прилетит распознанная цифра.

Файлы для скачивания скачать.
Подробнее..

Играем с CLIP. Создаем универсальный zero-shot классификатор на Android

13.03.2021 14:14:35 | Автор: admin

TLDR: приложение можно скачать и потестить тут

Ссылка на Google Play

Эта статья является дополненной и сильно расширенной версией моей статьи в TowardsDataScience о создании приложения, использующем новейшую мультимодальную нейросеть от OpenAI

В чем проблема классификаторов?

Многие заметили, что в последние годы все чаще для обработки изображений используется нейросетевой подход. Одной из простейших (по формулировке) задач является задача классификации изображений. В ней необходимо определить, к какому из заданных классов относится изображение. Стандартный подход с использованием сверточных нейросетей предполагает использование большого количества последовательных преобразований - сверток, с добавлением простых нелинейных функций, в результате которых изображение превращается в многомерный набор признаков. Далее эти признаки анализируются полносвязной нейросетью. Для обучения подобной нейросети обычно требуется большое количество обучающих примеров - размеченных изображений и сбор данных для конкретной задачи может являться наиболее трудоемким этапом для решения задачи классификации. Чтобы сократить количество необходимых размеченных данных, обычно используется подход переноса обучения (transfer learning). Для этого в качестве сверточной части используют сеть, предварительно обученную для решения задачи классификации на большом датасете (обычно ImageNet). Использование предобученной части позволяет выделять значимые признаки на любом изображении. Далее используется небольшая полносвязная сеть для решение задачи классификации. Использование такого подхода позволяет снизить размер обучающей выборки до нескольких тысяч или даже сотен примеров каждого класса. Тем не менее у вышеописанного подхода есть два существенных недостатка:

  • Если к сети, обученной определять 1000 классов необходимо добавить еще один класс, нужно существенно менять архитектуру сети и заново переобучать ее с использованием полного датасета, а не только изображений из добавленных классов (есть трюки, чтоб ослабить эту проблему но они не сильно меняют общее положение дел).

  • Для некоторых задач сбор данных может быть очень затруднительным

Поэтому чашей грааля для решения задачи классификации является реализации концепции Zero shot learning - создание классификатора, способного решить произвольную задачу классификации без обучающих примеров. Звучит немного фантастично и малопонятно, как это может быть реализовано. Многие считают, что для создания такого рода классификатора необходимо привлечь "понимание" естественного языка. К счастью, последние пару лет был достигнут большой прогресс в использовании нейросетей типа Transfirmer для обработки естественного языка. Кроме того, сейчас растет популярность и использования трансформеров для обработки изображений.

Почему CLIP?

В январе этого года был сделан прорыв в области обработки изображений - OpenAI представила новый генератор изображений Dall-E, который может генерировать изображение на основе текстового описания. Несмотря на название OpenAI, код Dall-E не является открытым. Тем не менее, меня очень заинтересовала вспомогательная нейронная сеть для обучения Dall-E и отбора лучших примеров. Это сеть CLIP. CLIP, в отличие от Dall-E, проект с открытым исходным кодом, опубликованный под лицензией MIT, поэтому его можно легко использовать в своих целях. Эта нейронная сеть выглядит не столь впечатляющей для демонстраций публике, но меня она очень удивила. В целом, это двухмодульный проект. Первый модуль - эффективная нейронная сеть Image Transformer. Этот модуль использует State-of-Art механизм внимания для кодирования изображения в 512-мерное пространство. Другая часть - нейросеть-трансформер для обработки текста, который преобразует текст в вектор в то же 512-мерное пространство. Сеть обучалась на большом массиве изображений (каком именно я не нашел, но, похоже, что это что-то типа "весь интернет", "вся википедия" или "весь инстаграм", как недавно сделали в Facebook AI). Процедура обучения не раскрывается, но предполагаю, что использовался loss типа Cosface или Arcface и различные параметры обучения для каждого из модулей. При обучении картинка с подходящей подписью должны быть близки, а с неподходящей - максимально далеки в пространстве embedding-ов.

CLIP хорошо работает для решения задачи zero-shot learning. Для этого необходимо создать набор предложений с использованием шаблона. Предложения могут быть типа "This is a photo of OBJECT", где OBJECT - название одного из множества классов. Набор предложений можно превратить при помощи текстового модуля transformer в набор векторов. Далее смотрится на какой из векторов больше всего похоже закодированное при помощи второго модуля изображение. Если нормировать близость векторов при помощи Softmax, то можно интерпретировать результат как вероятность того, что изображение принадлежит к какому-то классу.

Оказалось, что для многих задач Zero-shot learning работает даже лучше, чем натренированные на специально отобранных датасентах state-of-art сверточные нейросети.

Архитектура приложения

С учетом вышесказанного, приложение для классификации может работать следующим образом.

Телефон обладает доступом к изображениям (с камеры или из хранилища). После предварительной обработки (изменения разрешения и перенормировки каналов) изображение при помощи нейросети-трансформера превращается в 512-мерный вектор. Данный вектор сравнивается с каждым из векторов одного из предварительно сгенерированных наборов. Выдается описание 5 наиболее близких векторов.

Кроме того, существует возможность отправить запрос на сервер для генерации собственного классификатора. Отправляется строка - шаблон и набор классов. С сервера возвращается и сохраняется простой torchScript модуль, содержащий набор векторов и необходимые действия с ними. У пользователя появляется собственный классификатор! Работа с CLIP была на python, Android приложение - на JAVA. Серверная часть - Python/FLASK. Архитектура показана на рисунке.

Работаем с CLIP (Python)

Для разработки я использовал дистрибутив Anaconda python. Установим необходимые библиотеки

conda create-name pytorchconda activate pytorchconda install-yes -c pytorch pytorch=1.7.1 torchvision cudatoolkit=11.0conda create-name pytorchpip install ftfy regex tqdmpip install git+https://github.com/openai/CLIP.gitconda install -c conda-forge notebook

После установки необходимых библиотек импортируем их в проект:

И создаем модель CLIP. Работу с ним будем вести на GPU:

Нейросеть для обработки изображений - это модуль clipmodel.visual. Попробуем скомпилировать его и сохранить его отдельно в виде модуля TorchScript. Для этого используется JIT компилятор библиотеки torch (torch.jit). Для JIT компиляции необходимо запустить модуль visual на каком-либо изображении:

Найдем несколько списков названий классов (я использовал 4000 наиболее часто используемых существительных в английском языке, список пород кошек, список 10000 самых известных людей, список названий еды, национальностей и еще несколько других). Функция create_pt_xml создает предложения по шаблону, разбивает их на части (токенизирует), превращает в набор векторов, создает и сохраняет TorchScript модуль для сравнения любого вектора с векторами из набора и нахождения 5 ближайших векторов. create_pt_xml также сохраняет xml файл для чтения названий классов. Сгенерированные файлы будут использованы в приложении:

Создаем приложение для Android (Java)

Так как у меня нет опыта разработки на быстро набирающем популярность языке Kotlin, я использовал Java. В отличии от python, код на Java гораздо более громоздкий и менее выразительный. Поэтому, чтобы не перегружать пост оставлю только 2 наиболее важные части кода. Остальные части - описание работы кнопок/интерфейс - достаточно стандартные для любого приложения.

Первая важная часть - загрузка *.pt модели из папки assets и получение ответа в виде строки в TextView:

В качестве бонуса я добавил модуль, натренированный классическим образом (на большом датасете) для определения вероятности наличия пневмонии (python код не приведен) по флюорографии - сейчас такую задачку очень любят решать.

Вторая важная часть - отправление запроса на сервер и сохранения ответа (модели) в ExternalFilesDir. Список классов и название модели сохраняются в той же директории:

Серверная часть (Python/flask)

Я арендовал VPS на одном из сервисов. О системе - я запустил сервер apache 2.0 с WSGI / Flask под Centos 7 (для меня это была самая сложная часть проекта, поскольку я никогда раньше не работал с развертыванием сервера, на это ушло несколько дней постоянного поиска в Google/StackOverflow). Серверная часть Python очень похожа на функцию cerate_xml. Единственная разница заключается в обработке запросов и отправке сгенерированного файла модели. К сожалению, сервер работает не так быстро, как хотелось бы. Возможно, если приложение будет популярным, мне стоит перенести наиболее тяжелые вычисления (кодирование текста через текстовый трансформер) в AWS Lambda:

Буду смотреть по нагрузке.

Крутая часть! Тестируем!

Приложение работает на удивление хорошо. Сначала давайте повторим удивительные результаты OpenAI, который обнаружил схожие активации нейронов в одной концепции, представленной разными способами:

Общий классификатор NOUN использует 4000 наиболее часто используемых английских существительных и предложений, сгенерированных шаблоном This is an image of NOUN.

Приложение определенно понимает, что все эти понятия относятся к паукам. Интересно, что оно распознает текст, поэтому я решил протестировать его на работах бельгийского художника Рене Магритта, чьи работы в значительной степени наполнены взаимодействием слов и изображений:

Ничего особо интересного. Интересное начинается, когда я пытаюсь описать этот образ, используя национальность или использовать модель, которая может выдать только названия коктейлей или породы собак:

Здесь есть что-то определенно французское) Мне стало особенно любопытно, можно ли описывать любые изображения, используя названия коктейлей:

Список похожих коктейлей меня порадовал. Зеленый гоблин? Кажется у сети все неплохо со знанием вселенной Marvel) Также очевидно, что это изображение чего-то красного и большого, летающего или даже похожего на насекомое. Чтобы протестировать серверную часть, я создал на телефоне классификатор, определяющий профессию по изображению. Я нашел список из 30 профессий в одном из учебников английского языка и добавил их. Модель была успешно сгенерирована на сервере и загружена. К сожалению, на это ушла пара минут (

Проверим, как работает созданная на сервере модель:

Видим, что приложение неплохо определяет профессии.

А кто по профессии Человек-Паук?

Что касается других классификаторов, то они работают хорошо:

Или, как выяснили исследователи OpenAI, мы можем использовать их для выявления некоторых географических ассоциаций:

Думаю ассоциативному мышлению неизбежно сопутствует предвзятость, поэтому от нее невозможно полностью избавиться в сложных системах типа нейросетей:

Посмотрим, понимает ли нейросеть русский язык. Если использовать стандартный классификатор, результаты не очень. Если уточнить, что это животное - уже лучше:

При этом с пониманием английского проблем нет, а с китайским - есть:

А можно ли превратить приложение в переводчик? Ограничимся названием животных. Шаблон: This is NAME in russian. NAME - список из 100 часто встречающихся животных:

/

Загрузка на Google Play market

Основной проблемой стало ограничение на размер загружаемого *.aab файла. Из-за большого размера трансформерной нейросети приложение пришлось разбить на 2 части с использованием механизма Asset Delivery. Оказалось, что оно некорректно доставляет Assets при внутреннем тестировании - отправил запрос в техподдержку, но ответа не получил. Я подключил firebase для сбора аналитики, нарисовал простую страничку с описанием и отправил приложение в Play Market, где оно проверялось в течение 1 недели.

Монетизация

Если приложение будет пользоваться популярностью, я собираюсь добавить пару баннеров из AdMob, чтобы оплатить сервер и заработать немного денег)

Проблемы

В описываемом приложении есть несколько проблем. Во-первых, я обнаружил медленный (5 с) холодный запуск на нескольких устройствах из-за загрузки преобразователя изображений в оперативную память при запуске приложения. Вторая проблема - медленный ответ сервера на запросы новых классификаторов. Эту проблему можно решить, перенеся вычисления в облако (я думаю о сервисе AWS-lambda), но сейчас мне сложно оценить стоимость AWS. Мне, вероятно, следует ограничить ежедневные запросы к серверу для каждого пользователя или взимать плату с пользователей за расширение лимита, чтобы покрыть расходы AWS и обеспечить лучший UX. Третья проблема возникла сегодня - нестабильный доступ к серверу. Похоже связано с "замедлением Твиттера".

Что можно добавить

Еще подумываю добавить режим one-shot (одна фотография используется для создания классификатора). Это улучшение можно легко реализовать в приложении.

Если говорить непосредственно о перспективах CLIP - я много играл с комбинацией CLIP и BERT для генерации описания изображения на естественном языке и уже получил некоторые многообещающие результаты. Но BERT определенно нельзя запускать на телефонах, и даже на моей rtx3080 (успел купить за 80!) есть некоторые проблемы для быстрого прототипирования таких систем. Также пробовал реализовать CLIP Style transfer с описанием обработки фото естественным языком - результат пока не очень, на это следовало бы потратить больше времени. Кроме того подозреваю, что использование CLIP для покадровой обработки видео и анализа потока векторов при помощи трансформеров или LSTM (почему-то кажется, что LSTMы будут лучше работать) может привести к прорыву в нейросетевом описании видео - сейчас результаты в этой области достаточно скромные.

Спасибо!

Не стесняйтесь обращаться ко мне в случае новых идей, предложений или вопросов!

Полезные Ссылки:

О Dall-e:

О CLIP

О мультимодальных нейронах CLIP

Github CLIP

Подробнее..

Делаем телеграм бота за 5 минут быстрый старт с продвинутым шаблоном

27.03.2021 00:13:02 | Автор: admin

В последнее время я сделал насколько много ботов для телеграмма, что крайне преисполнился в том, как их писать, как хостить, да и в принципе выработал красивый шаблон для быстрого их создания.

Сразу могу предложить посмотреть на то, что получиться в конце этого туториала. Для этого я запустил бота с идентичном шаблону кодом.

Также стоит отметить, что хостинг в этом примере бесплатный, но его хватает для разумной нагрузки.

А еще сразу скажу, что далее будет все на питоне... Вот... Сказал. Не буду больше ходить вокруг да около, у нас всего 5 минут (помните, да?). Приступим!

Пошаговая инструкция

1) Создаем репозиторий на гитхабе из моего шаблона

2) Регистрируемся на Heroku

3) Создаем новое приложение

4) Привязываем наш репозиторий к проекту на Heroku

5) Настраиваем автоматический deployment

6) Смотрим на адрес, где будет висеть наш бот

7) Настраиваем переменные среды

KEY

VALUE

BOT_TOKEN

Токен для бота

WEBHOOK_TOKEN

Рандомная строка из букв для безопастности

ADMIN_PASSWORD

Еще одна рандомная строка из букв для безопастности

HOST

Адрес полученный в пункте 6 (например fancy-panda.herokuapp.com). Обратите внимание на формат!

IS_PRODUCTION

True

LOG_BOT_TOKEN

Токен для бота, куда будут отправляться логи

ADMIN_ID

user_id, куда будут отправляться логи (получить в боте @userinfobot)

8) Собираем наше приложение и ждем пока оно запустится

9) Заходим по адресу из пункта 6 и добавляем к ссылке пароль. Получиться что-то такое: fancy-panda.herokuapp.com/?password=<ADMIN_PASSWORD>

10) Устанавливаем webhook, переходя по ссылке на подобие fancy-panda.herokuapp.com/set_webhook?password=<ADMIN_PASSWORD>

Тестируем

Теперь, когда мы закончили все настраивать, пора посмотреть, что же мы "натворили".

Посмотреть в живую можно тут.

Пример работы из коробкиПример работы из коробкиПример работы логированияПример работы логирования

Добавляем функционал

Теперь, когда у вас есть рабочий бот, который сам разворачивается и запускается в облаке, пришло время добавить свои функции. Для примера такую:

@bot.message_handler(commands=["id"])def get_id(message):    logger.info(f'</code>@{message.from_user.username}<code> used /id')    bot.send_message(message.chat.id, f"user_id = {message.chat.id}")

Думаю, дальше ограничивает вас только воображение... (ну почти)

Применение в проектах

Все любят, когда есть примеры работы. На основе этого шаблона я сделал бота wifi_qr_bot, который генерирует QR-коды для подключения к WiFi. Это упрощает жизнь, ведь пароль у вас длинный (безопасность, все дела), а вводить его на каждом новом устройстве вам лень.

Выводы

Вот мы и сделали нашего бота, который хоститься в облаке. Он уже многое умеет в плане логирования. Для логирования я написал отдельную библиотеку, tg-logger. Если интересно, как она работает, то потыкайте в демо бота. Если все еще интересно, прочитайте мою статью. Такие пироги с котятками...

Ссылки

Подробнее..

Телеграмм-бот на Python

12.04.2021 20:16:10 | Автор: admin

Недавно я попал на стажировку в новую для себя IT-компанию и наш (моей команды) проект был - бот для телеграмма, который автоматизирует часть работы hr-менеджеров. Первую неделю нам дали на самостоятельное изучение всего, что мы посчитаем нужным (а я убежден, что лучший способ что-то изучить - это практика), так что я начал действовать. Язык программирования был выбран python (наверное понятно из обложки почему), так что в этой статьи я разберу пример именно с ним.

BotFather

Чтобы создать телеграмм-бота, достаточно написать пользователю @BotFather команду /newbot. Он запросит название и @username для будущего бота. Тут ничего сложного - он все подсказывает (главное, чтобы @username был не занят и заканчивался на "bot"). BotFather пришлет HTTP API токен, который мы и будем использовать для работы с ботом.

Создание ботаСоздание бота

Telebot и сила python

Мне всегда казалось, что создавать бота - это не так просто. Честно говоря, давно хотел попробовать, но то ли не хватало времени (думал, что это займет не один вечер), то ли не мог выбрать технологию (как-то смотрел туториал для c#), ну а скорее всего было просто лень. Но тут мне понадобилось это для работы, так что я больше не смел откладывать.

Сила python заключается в его популярности. А, как следствие, в появлении огромного количества сторонних библиотек практически под все нужды. Именно это сделало возможным написание примитивного бота (который просто отвечает однотипно на сообщения) в 6 (ШЕСТЬ!) строчек кода. А именно:

import telebotbot = telebot.TeleBot('1111105161:AAHIjyAKY4fj62whM5vEAfotuixC5syA-j8')@bot.message_handler(commands=['start'])def start_command(message):    bot.send_message(message.chat.id, "Hello!")bot.polling()
Первое сообщениеПервое сообщение

На самом деле бот будет отвечать только на команду /start, но для начала неплохо. Здесь все довольно просто: в первой строчке импортируется библиотека telebot (для ее работы необходимо установить пакет pyTelegramBotAPI командой pip install pyTelegramBotAPI (НЕ pip install telebot!), далее создаем объекта бот, используя токен, который нам прислал BotFather. Третья строчка проверяет, что присылают пользователи (в данном случае это только команда /start), и, если проверка пройдена, то бот отправляет ответ с текстом Hello!. Последняя строчка, наверное, самая сложная для понимания, и в следующих разделах я ее подробно разберу. Сейчас же я только скажу о ее предназначении - она заставляет бота работать, то есть "реагировать" на полученные сообщения.

Flask & Requests

Telebot, конечно, круто, но есть одно важное НО. По предположению нашего проекта, у hr-ов должен быть сервис (сайт), где они будут работать и через него отправлять/получать информацию пользователям/от них. Соответственно, нам нужно самим контролировать сервер и обрабатывать запросы. На мой взгляд самый простой способ создания сервера на python - фреймворк flask. Так выглядит простейший сервер, запускаемый локально на 5000-ом порту (http://localhost:5000/):

from flask import Flask app = Flask(__name__)@app.route("/", methods=["GET"])def index():    return "Hello, World!"  if __name__ == "__main__":    app.run()

Для работы бота нужно немного больше, а именно нужно добавить функцию отправки сообщений. Я не хочу полностью переписывать статью (habr), а воспользуюсь результатом и пойду с конца. Так выглядит программа, которая заставляет бота посылать Hello! на любое входящее сообщение:

from flask import Flask, requestimport requestsapp = Flask(__name__)def send_message(chat_id, text):    method = "sendMessage"    token = "1111105161:AAHIjyAKY4fj62whM5vEAfotuixC5syA-j8"    url = f"https://api.telegram.org/bot{token}/{method}"    data = {"chat_id": chat_id, "text": text}    requests.post(url, data=data)@app.route("/", methods=["POST"])def receive_update():    chat_id = request.json["message"]["chat"]["id"]    send_message(chat_id, "Hello!")    return "ok"if __name__ == "__main__":    app.run()

К сожалению, в таком варианте программа работать не будет. Точнее будет, но не сразу. Проблема заключается в том, что телеграмм пока что не знает, куда посылать информацию о полученных сообщениях. Для ее решения у telegram API есть метод setWebhook. Суть метода заключается в том, что мы просто отправляем телеграмму url, по которому мы хотим получать информацию о новых обращениях к боту (в нашем случае это http://localhost:5000/). Однако, мы не можем просто сказать телеграмму: "Посылай запросы на localhost", ведь для каждого сервера localhost будет разным. Еще одна проблема заключается в том, что метод setWebhook поддерживает только https url-ы. Для решения этих проблем можно воспользоваться программой ngrok, которая строит туннель до локального хоста. Скачать ее можно по ссылке ngrok, а для запуска туннеля достаточно ввести команду ngrok http 5000. Должно получиться так:

ngrokngrok

Теперь можно задействовать метод setWebhook, например, через postman. Нужно отправить post запрос на https://api.telegram.org/bot<ТОКЕН>/setWebhook с указанием в теле нужного url. Должно получиться аналогично:

setWebhooksetWebhook

Соединение

Чем больше я работал с библиотекой telebot, тем больше она мне нравилась. Хотелось бы, используя приложение на flaske, не терять эту возможность. Но как это сделать? Во-первых, мы можем вместо нашей функции send_message использовать готовую из библиотеки. Это будет выглядеть так:

from flask import Flask, requestimport telebotapp = Flask(__name__) bot = telebot.TeleBot('1111105161:AAHIjyAKY4fj62whM5vEAfotuixC5syA-j8')@app.route("/", methods=["POST"])def receive_update():    chat_id = request.json["message"]["chat"]["id"]    bot.send_message(chat_id, "Hello!")    return "ok"if __name__ == "__main__":    app.run()

Но, если присмотреться, можно заметить, что мы потеряли часть функционала, а именно @bot.message_handler - декораторы, которые отслеживают тип введенного боту сообщения (картинка, документ, текст, команда и т. д.). Получается, что если мы используем в качестве сервера наше flask приложение, то мы теряем некоторый функционал библиотеки telebot. Если же мы используем bot.polling(), то мы не можем обращаться к серверу со стороны. Конечно, хотелось бы как-то все соединить без потерь. Для этого я нашел немного костыльный способ, однако рабочий:

from flask import Flask, requestimport telebotbot = telebot.TeleBot('1111105161:AAHIjyAKY4fj62whM5vEAfotuixC5syA-j8')bot.set_webhook(url="http://personeltest.ru/aways/8c6f687b75c9.ngrok.io")app = Flask(__name__)@app.route('/', methods=["POST"])def webhook():    bot.process_new_updates(        [telebot.types.Update.de_json(request.stream.read().decode("utf-8"))]    )    return "ok"@bot.message_handler(commands=['start'])def start_command(message):    bot.send_message(message.chat.id, 'Hello!')if __name__ == "__main__":    app.run()

Здесь мы пользуемся методом set_webhook, аналогично тому, как мы делали это ранее через postman, а на пустом роуте прописываем "немного магии", чтобы успешно получать обновления бота. Конечно, это не очень хороший способ, и в дальнейшем лучше самостоятельно прописывать функционал для обработки входящих сообщений. Но для начала, я считаю, это лучшее решение.

Заключение

Написать телеграмм-бота на python и правда не так уж сложно, однако есть и подводные камни, о которых я постарался рассказать в данной статье. Конечно, это только начало, но последний фрагмент кода вполне можно использовать, как шаблон для серверной части, которая будет работать с телеграмм-ботом. Дальше остается только наращивать функционал по Вашему усмотрению.

Подробнее..
Категории: Python , Bot , Telegram , Flask

Оно живое! Вышла версия Flask 2.0

13.05.2021 18:23:09 | Автор: admin

Незаметно от всех 12 мая 2021 вышла новая версия известного микрофреймворка Flask. Хотя казалось, что во Flask есть уже все, ну или почти все, что нужно для микрофреймворка.
Предвкушая интерес, а что же нового завезли, оставлю ссылку на Change log.

Из приглянувшихся особенностей новой версии:

  • Прекращена поддержка Python версии 2. Минимальная версия Python 3.6

  • Поддержка асинхронных view и других обратных вызовов, таких как обработчики ошибок, определенные с помощью async def. Обычные синхронные view продолжают работать без изменений. Функции ASGI, такие как веб-сокеты, не поддерживаются.

  • Добавьте декораторы роутов для общих методов HTTP API.
    @app.post ("/ login") == @ app.route ("/ login", methods = ["POST"])

  • Новая функция Config.from_file для загрузки конфигурации из файла любого формата.

  • Команда flask shell включает завершение табуляции, как это делает обычная оболочка python.

  • При обслуживании статических файлов браузеры будут кэшировать на основе содержимого, а не на основе 12-часового таймера. Это означает, что изменения статического содержимого, такого как стили CSS, будут немедленно отражены при перезагрузке без необходимости очистки кеша.

Рассмотрим асинхронность

Все бы было хорошо, но в самом начале после установки был не найден модуль asgiref. Доустановим руками.

Для примера напишем самое простое приложение: Ping/Pong. Оно не имеет особого смысла и сложной логики, только имитирует некоторую проверку "жив ли сервис". Также это приложение станет бенчмарком.

from flask import Flaskapp = Flask(__name__)@app.get('/')async def ping():    return {'message': 'pong'}if __name__ == '__main__':    app.run(host='0.0.0.0')

Деплой

Как было сказано в Change log: "Функции ASGI, такие как веб-сокеты, не поддерживаются."
То есть только единственный способ задеплоить приложение используя gunicorn.

Команда: gunicorn -w 8 --bind 0.0.0.0:5000 app:app
-w 8 - 8 запущенных процессов
--bind 0.0.0.0:5000 - адрес приложения

Сверим производительность

Команда для нагрузочного тестирования: wrk -t 8 -c 100 -d 5 http://localhost:5000

Асинхронное приложение Flask 2.0:
Running 5s test @ http://localhost:5000
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 17.80ms 2.37ms 36.95ms 91.92%
Req/Sec 673.44 163.80 3.86k 99.75%
26891 requests in 5.10s, 4.21MB read
Requests/sec: 5273.84
Transfer/sec: 844.69KB

Синхронное приложение Flask 2.0:
Running 5s test @ http://localhost:5000
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.91ms 842.62us 21.56ms 89.86%
Req/Sec 2.38k 410.20 7.64k 93.53%
95301 requests in 5.10s, 14.91MB read
Requests/sec: 18689.25
Transfer/sec: 2.92MB

Синхронное приложение Flask 1.1.2:
Running 5s test @ http://localhost:5000
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.98ms 823.42us 17.40ms 88.55%
Req/Sec 2.37k 505.28 12.23k 98.50%
94522 requests in 5.10s, 14.78MB read
Requests/sec: 18537.84
Transfer/sec: 2.90MB

В качестве вывода

Исходя из результатов бенчмарков можно увидеть, что 1 и 2 версия в синхронном режиме выдают одинаковые результаты(с небольшой погрешностью). Что касается асинхронности в Flask 2.0 можно сделать вывод, что пока она слишком сырая даже в dev режиме запуска асинхронный view отстает от синхронного. Но также не стоит забывать о том что ASGI пока не поддерживается, и нет возможности запустить через uvicorn. Остается только ждать обновления и следить за дальнейшим развитием.

Обновилась именно major версия, а это значит у нас есть надежда новую переосмысленную итерацию фреймворка. Разработчиков стоит похвалить как минимум за то, что проект не заброшен и старается успевать за основными тенденциями в других фреймворках. Лично мне очень нравится Flask, он совершенно не перегружен, как например Django.

Если у вас есть идеи, почему получились именно такие результаты - пожалуйста поделитесь ими в комментариях.

Подробнее..

API для генерации ответов сервера с любыми кодами статусов

20.07.2020 14:14:41 | Автор: admin

Привет, Хабр! Работая над библиотекой-обёрткой REST API, я столкнулся с проблемой. Для тестирования обработки ошибочных кодов ответа сервера (400, 500, 403 и т.д.) необходимо искусственно создавать условия на сервере для получения соответствующих кодов. При правильно настроенном сервере, например, непросто получить ошибку 500. А тестировать функции-обработчики ошибок как-то надо. Я написал небольшое API, которое генерирует ошибочные ответы сервера httpme.tk


Как применять в тестировании?


Например, есть такой код (python3):


from requests import session as requests_sessionsession = requests_session()session.hooks = {    'response': lambda r, *args, **kwargs: raise AccessError('Доступ закрыт, т.к. сервер подключен к другой БД') if r.status_code == 403  else pass}class AccessError(Exception):    """ 'своя' ошибка """    passdef getter(url):    return session.get(url)

Если кратко в коде есть функция, которая возвращает ответ сервера на GET-запрос на заданный URL, если в результате выполнения запроса возникает ошибка 403 вызывается внутреннее исключение модуля AccessError.


Этот код надо протестировать и отладить. Cоздать вручную условия для ошибки 403, а уж тем более, например, 500 (сервер слишком хорошо работает) довольно непросто. Тестировщику не важно, при каких условиях сервер выдаст ошибку 403: он тестирует не само API (например), а функцию, которая к нему обращается. Поэтому для тестирования вызова исключения при коде статуса 403 он может сделать вот так (python3 + pytest):


import pytestfrom mymodule import def test_forbidden():    with pytest.raises(AccessError):        getter('http://httpme.tk/403')

Как пользоваться?


Очень просто. Отправьте на сервер GET-запрос в формате http://httpme.tk/<status_code>. Например так (cURL):


curl -G http://httpme.tk/500

Или так (python3):


from requests import getget('http://httpme.tk/408')  # <Response [408]>

А что внутри?


А внутри маленькое Flask-приложение, вызывающее функцию abort(status_code) на каждый запрос.


Ссылка на GitHub


На этом всё!


Интересно услышать оценку полезности данного сервиса сообществом.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru