Русский
Русский
English
Статистика
Реклама

Multithreading

Клиент-серверный IPC на Python multiprocessing

11.01.2021 16:10:01 | Автор: admin

Статья отражает личный опыт разработки CLI приложения для Linux.

В ней рассмотрен способ выполнения привилегированных системных вызовов процессом суперпользователя по запросам управляющей программы через строго описанный API.

Исходный код написан на Python для реального коммерческого приложения, но для публикации абстрагирован от конкретных задач.

Введение

Межпроцессное взаимодействие (англ. inter-process communication, IPC) обмен данными между потоками одного или разных процессов. Реализуется посредством механизмов, предоставляемых ядром ОС или процессом, использующим механизмы ОС и реализующим новые возможности IPC. Википедия

У процессов, могут быть разные причины для обмена информацией. На мой взгляд все они являются следствием политики безопасности ядра Unix.

Как известно, ядро Unix это автономная система, которая функционирует без вмешательства человека. Собственно говоря, пользователь это объект операционной системы, который появился чтобы обезопасить ядро от несанкционированного вмешательства.

Обеспечение безопасности ядра заключается в разделении адресного пространства операционной системы на пространство ядра и пространство пользователя. Отсюда два режима работы системы режим пользователя и режим ядра. Причем, смена режимов это переключение между двумя пространствами.

В режиме пользователя недоступны области памяти, зарезервированные ядром, и системные вызовы, которые изменяют состояние системы.

Тем не менее таким доступом обладает суперпользователь.

Предпосылки параллелизма

Если ваша программа не использует привилегированные системные вызовы, вам не нужен суперпользователь, а значит можно писать монолит без параллелизма.

В противном случае вам придётся запускать свою программу под рутом.

На этом можно было бы закончить, если бы не проблема доступа к эксклюзивным ресурсам пользовательского окружения из окружения суперпользователя.

Представьте, что вы обращаетесь к ресурсу или к переменной окружения, которая присутствует только в пользовательском окружении.

Например, служба, которая вам нужна, зависит от окружения рабочего стола, которое доступно только когда пользовательская сессия активна. В этом случае объект, который вам нужен просто не существует в окружении суперпользователя. Для того, чтобы получить к нему доступ, вам нужен процесс в пользовательском окружении, а для системных вызовов процесс в руте.

При этом вы можете запросить у процесса в руте исполнение системного вызова из пользовательского процесса при помощи одного из методов IPC.


Таблица методов межпроцессного взаимодействия

Метод

Реализуется ОС или процессом

Неименованный канал

Все ОС, совместимые со стандартом POSIX.

Разделяемая память

Все ОС, совместимые со стандартом POSIX.

Очередь сообщений (Message queue)

Большинство ОС.

Сигнал

Большинство ОС; в некоторых ОС, например, в Windows, сигналы доступны только в библиотеках, реализующих стандартную библиотеку языка Си, и не могут использоваться для IPC.

Почтовый ящик

Некоторые ОС.

Сокет

Большинство ОС.

Именованный канал

Все ОС, совместимые со стандартом POSIX.

Проецируемый в память файл (mmap)

Все ОС, совместимые со стандартом POSIX. При использовании временного файла возможно возникновение гонки. ОС Windows также предоставляет этот механизм, но посредством API, отличающегося от API, описанного в стандарте POSIX.

Обмен сообщениями (без разделения)

Используется в парадигме MPI, Java RMI, CORBA и других.

Файл

Все ОС.

Семафор

Все ОС, совместимые со стандартом POSIX.

Канал

Все ОС, совместимые со стандартом POSIX.


Для своего приложения я выбрал сокеты и написал API для коммуникации между процессами.

Таким образом в распоряжении конечного пользователя оказывается программа, которая не требует прав суперпользователя, но запрашивает исполнение системных вызовов у процесса в руте через соответствующий сокет.

При этом процесс в руте, запускается при загрузке системы и остаётся активным всегда, прослушивая сокет на наличие входящих дейтаграмм.

Историческая справка

Традиционно процессы, которые запускаются при загрузке системы и остаются активными в фоне, классифицируются как daemon. Имена исполняемых файлов таких программ по соглашению заканчиваются на d. Пример: systemd.

Программы пользовательского пространства, взаимодействующие с daemon можно назвать управляющими, что также по соглашению отражено в их названиях. Пример: systemctl.

Известны и другие примеры: ssh и sshd.

В более современной интерпретации их называют клиентами и серверами с тем отличием, что последние взаимодействуют по сетевым протоколам. Однако, это не мешает нам использовать сокеты исключительно для локальных процессов когда сокет принимает дейтаграммы через конвейер.

Структура проекта

Для сервера и клиента я использую одинаковую структуру.

. core  api.py  __init__.py main.py

core это пакет, в который можно положить модули с любой логикой. В модуле api реализованы методы обращения процессов друг к другу.

Реализация API клиента

from multiprocessing.connection import Clientfrom multiprocessing.connection import Listener# адрес сервера (процесса в руте) для исходящих# запросовdaemon = ('localhost', 6000)# адрес клиента (этого процесса) для входящих# ответов от сервераcli = ('localhost', 6001)def send(request: dict) -> bool or dict:    """    Принимает словарь аргументов удалённого метода.    Отправляет запрос, после чего открывет сокет    и ждет на нем ответ от сервера.    """    with Client(daemon) as conn:        conn.send(request)    with Listener(cli) as listener:        with listener.accept() as conn:            try:                return conn.recv()            except EOFError:                return Falsedef hello(name: str) -> send:    """    Формирует уникальный запрос и вызывает функцию    send для его отправки.    """    return send({        "method": "hello",        "name": name    })

В модуле connection пакета multiprocessing есть два класса, реализующих API высокого уровня над низкоуровнивым аналогом стандартной библиотеки socket.

Client класс, который содержит методы отправки дейтаграмм.

Listener принимает дейтаграммы.

Отправляемые запросы содержат название целевого метода сервера.

Причем запросы не требуют никаких преобразований на сервере, ведь он тоже написан на Python, который интерпретирует поступающие данные также, как и клиент. Всё это происходит под капотом и не может не радовать.

Использование API

В main.py я импортирую модуль api для дальнейшего использования.

from core import apiresponse = api.hello("World!")print(response)

Этот код представлен для демонстрации. В работе я использовал Сlick Framework для создания СLI приложения с опциями, которые вызывают методы API.

Реализация API сервера

По идее метод должен выполнять системный вызов, который требует прав суперпользователя. Иначе всё это теряет смысл.

def hello(request: dict) -> str:    """    Привилегированный системный вызов.    """    return " ".join(["Hello", request["name"])

Использование API

from core import apifrom multiprocessing.connection import Listenerfrom multiprocessing.connection import Client# адрес сервера (этого процесса) для входящих запросовdaemon = ('localhost', 6000)# адрес клиента для исходящих ответовcli = ('localhost', 6001)while True:    with Listener(daemon) as listener:        with listener.accept() as conn:            request = conn.recv()            if request["method"] == "hello":                response = api.hello(request)            with Client(cli) as conn:                conn.send(response)

Сервер должен быть активен всегда, поэтому я запускаю его в бесконечном цикле.

Таким образом он всегда слушает порт 6000 и, при поступлении дейтаграммы, анализирует запрос. Затем он вызывает указанный в запросе метод и возвращает результат исполнения клиенту.

Дополнительно

Советую снабдить свой сервер пакетом systemd, который позволяет программам на Python писать лог в journald.

Для сборки вы можете использовать pyinstaller он запакует ваш код в бинарный файл со всеми зависимостями. Не забудьте про соглашение о наименовании исполняемых файлов, упомянутое ранее.

Спасибо за внимание!

Подробнее..

Многопоточное скачивание файлов с ftp python-скриптом

18.01.2021 00:17:12 | Автор: admin

Зачем это нужно?

Однажды передо мной встала задача копирования большого количества файлов с ftp-сервера. Нужно было делать бэкап. Казалось бы, что может быть проще! Но увы, ничего готового работающего так же быстро для моих условий найти не удалось.

Ситуация

Нужно было забирать периодически пару сотен файлов с ftp-сервера под Windows. Много мелочи и несколько очень крупных по размеру файлов. Суммарно примерно на 500 Гб. Сервер представляет собой vps, расположенный довольно далеко за рубежом. Днем машина высоко нагружена, рано ночью выполняются регламентные работы, итого на скачивание часов 5 максимум.

Ни одна из рассмотренных мной утилит не смогла справиться качественно и за отведенное время. Ну что ж, деваться некуда, нормальную систему резервного копирования ещё не купили, а значит ноги в руки вооружаемся редактором или IDE Python и вперёд! За приключениями!

Конфиг

Все параметры для скрипта вынесем в отдельный файл для удобства.

Шаблон конфига:

host = 'ip.ip.ip.ip'user = 'ftpusername'passwd = 'ftppassword'basepath = '/path/to/backup/folder'  # Папка, в которой будут созданы подпапки со скачанными файламиmax_threads = 20 # максимальное количество одновременных процессов загрузкиlog_path = '\path\to\logfile'statusfilepath = '\path\to\statusfile'

Конфиг сохраняем с расширением .py и импортируем в начале нашего скрипта. Импортировать можно непосредственно в пространство имён скрипта, но я сделал конструкцию слегка напоминающую костыль в основной части моего скрипта:

if __name__ == "__main__":    host = config.host    user = config.user    passwd = config.passwd    basepath = config.basepath  # Папка, в которой будут созданы подпапки со скачанными файлами    max_threads = config.max_threads    log_path = config.log_path    statusfilepath = config.statusfilepath    main()

В начале был список

Скачать файлы с ftp сама по себе задача не сложная, но путём недолгих экспериментов было выяснено, что скачивание файлов занимает время, а таймаут ftp-соединения приходит к нам гораздо быстрее. Следовательно, качать нужно каждый файл в новом соединении, иначе велик риск чего-то потом не досчитаться в скачанных файлах.

Для этого нам нужен список этих самых файлов. Ни о каком статичном списке файлов, конечно, речи не идет, значит нам его при каждом выполнении скрипта получать с сервера по-новой.

Удобства ради и чтобы не таскать параметры по всему коду - переопределим параметры стандартного класса ftp:

class MyFtp (ftplib.FTP):    """Класс переопределяет стандартный, чтобы задать все параметры соединение в одном месте"""    def __init__(self):        self.host = host        self.user = user        self.passwd = passwd        self.timeout = 1800        super(MyFtp, self).__init__()    def connect(self):        super(MyFtp, self).connect(self.host, timeout=self.timeout)    def login(self):        super(MyFtp, self).login(user=self.user, passwd=self.passwd)    def quit(self):        super(MyFtp,self).quit()

Параметры берутся из конфига. Конечно же в нужно не забыть импортировать библиотеку ftplib, чтобы этот кусок заработал.

Список файлов с сервера мы получим с помощью следующего класса:

class FileList:    """Класс для работы со списком загружаемых файлов"""    def __init__(self):        self.ftp = None        self.file_list = []    def connect_ftp(self):        import sys        self.ftp = MyFtp()        self.ftp.connect()        self.ftp.login()        self.ftp.__class__.encoding = sys.getfilesystemencoding()    def get_list(self, name):        """Метод для получения списка всех файлов с ftp-сервера."""        import os        for dirname in self.ftp.mlsd(str(name), facts=["type"]):            if dirname[1]["type"] == "file":                entry_file_list = {}                entry_file_list['remote_path'] = name  #путь до файла                entry_file_list['filename'] = dirname[0]  #имя файла                self.file_list.append(entry_file_list)            else:                path = os.path.join(name, dirname[0])                self.get_list(path)    def get_next_file(self):        return self.file_list.pop()    def len(self):        return len(self.file_list)

Помимо методов соединения с сервером, получения списка файлов и определения его длины здесь имеет метод, который возвращает нам следующий файл для скачивания, из списка он при этом, конечно, удаляется.

Логирование

Для ведения логов скачивания будет использовать стандартную библиотеку logging. Создадим класс, который будет заниматься логированием.

class MyLogger:    """Класс для логирования событий"""    def __init__(self):        self.logger = None    def start_file_logging(self, logger_name, log_path):        """Обычное логирование в файл"""        import logging        self.logger = logging.getLogger(logger_name)        self.logger.setLevel(logging.INFO)        try:            fh = logging.FileHandler(log_path)        except FileNotFoundError:            log_path = "downloader.log"            fh = logging.FileHandler(log_path)        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')        fh.setFormatter(formatter)        self.logger.addHandler(fh)    def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):        """Логирование в файл с ротацией логов"""        import logging        from logging.handlers import RotatingFileHandler        self.logger = logging.getLogger(logger_name)        self.logger.setLevel(logging.INFO)        try:            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)        except FileNotFoundError:            log_path = "downloader.log"            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')        fh.setFormatter(formatter)        self.logger.addHandler(fh)    def add(self, msg):        self.logger.info(str(msg))    def add_error(self, msg):        self.logger.error(str(msg))

Скрипт будет поддерживать просто логирование в файл и ротацию файловых логов, ибо логи имеют свойство расти непомерно и это надо бы держать под контролем.

Скачивание файла

Каждый файл будет скачиваться в отдельном потоке. Класс, скачивающий один конкретный файл с сервера выглядит следующим образом:

class BaseFileDownload(threading.Thread):    """ Объект для копируемого файла """    count = 0    def __init__(self, rpath, filename, log):        threading.Thread.__init__(self)        self.remote_path = rpath        self.filename = filename        self.ftp = None        self.command = None        self.currentpath = None        self.log = log        self.__class__.count += 1 # для подсчета одновременно запущенных закачек    def __del__(self):        self.__class__.count -= 1    def connect(self):        """Метод для соединения с ftp"""        import sys        self.ftp = MyFtp()        self.ftp.connect()        self.ftp.login()        self.ftp.__class__.encoding = sys.getfilesystemencoding()    def run(self):        """Запуск потока скачивания"""        import os        self.connect()        self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')        self.currentpath = os.path.join(basepath, self.remote_path[3:])        self.ftp.cwd(self.remote_path)        if not os.path.exists(self.currentpath):            os.makedirs(self.currentpath, exist_ok=True)        self.host_file = os.path.join(self.currentpath, self.filename)        try:            with open(self.host_file, 'wb') as local_file:                self.log.add("Start downloading " + self.filename)                self.ftp.retrbinary(self.command + self.filename, local_file.write)                self.log.add("Downloading " + self.filename + " complete")        except ftplib.error_perm:            self.log.add_error('Perm error')        self.ftp.quit()

Для подсчета количества одновременно скачиваемых файлов мы будет использовать свойство класса count. В нём у нас будет количество существующих экземпляров класса: в конструкторе счетчик наращивается, в деструкторе, соответственно, уменьшается.

Метод для запуска скачивания должен обязательно называться run - это требование библиотеки threading (не забываем её импортировать!), которую мы будем использовать для параллельного запуска нескольких процессов скачивания.

При сохранении списка файлов скрипт сохраняет также путь до этого файла, этот путь мы воссоздаем при скачивании с помощью os.makedirs.

Статус-файл

По завершению скачивания скрипт будет записывать в файл уведомление об этом. Это уведомление можно мониторить zabbix, чтобы понимать когда бэкап не отработал или, как сделал я - написать простого бота, чтобы периодически проверять статус.

Класс для работы с этим файлов выглядит так:

class StatusFile:    """По окончанию задачи скрипт пишет в файл уведомление о корректном выполнении."""    def __init__(self):        self.msg = ''    def setstatus(self, msg):        global statusfilepath        with open(statusfilepath, 'w') as status_file:            status_file.write(msg)

Многопоточность

Ну и, наконец, сама основная функция скрипта, которая осуществляет работу с потоками скачивания:

def main():    import os    import datetime    import time    log = MyLogger()    log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) # запускаем логирование    now = datetime.datetime.today().strftime("%Y%m%d")    global basepath    basepath = os.path.join(basepath, now)  # модифицируем путь, добавляя текущую дату    list_file = FileList()    list_file.connect_ftp()    list_file.get_list("..")    for i in range(list_file.len()):        flag = True        while flag:   # цикл внутри которого поддерживается нужное количество одновременно запущенных загрузок            if BaseFileDownload.count < max_threads:                curfile = list_file.get_next_file()                threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)                threadid.start()                flag = False            else:                time.sleep(20)    log.add("Downloading files complete")    statusfile = StatusFile()    statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")

Здесь мы запускаем логирование, получаем список файлов ( он хранится в памяти).

В вечном цикле while мы проверяем количество одновременно запущенных скачиваний и, при необходимости, запускаем дополнительные потоки.

Исходный код целиком можно найти здесь.

Подробнее..
Категории: Python , Multithreading , Ftp

Реализуем кооперативную многозадачность на C

07.03.2021 00:21:34 | Автор: admin


Когда речь заходит о многозадачности в .Net, то в подавляющем большинстве случаев предполагается вытесняющая многозадачность на основе потоков операционной системы. Но в этой статье речь пойдёт о реализации кооперативной многозадачности, с помощью которой можно создать видимость одновременной работы нескольких методов, используя всего один единственный поток.


Вот наша простенькая заготовка:


static void Main(){    DoWork("A", 4);    DoWork("B", 3);    DoWork("C", 2);    DoWork("D", 1);}static void DoWork(string name, int num){    for (int i = 1; i <= num; i++)    {        Console.WriteLine($"Work {name}: {i}");    }    Console.WriteLine($"Work {name} is completed");}

Статический метод вызывается последовательно 4 раза и выводит символы A, B, C, D заданное количество раз. Символы, очевидно, также выводятся последовательно, и наша задача здесь добиться того, что бы они выводились попеременно, но при этом сильно не меняя исходный код и не используя дополнительные потоки.


Заголовок этой статьи как бы намекает, что тут нужно реализовать кооперативную многозадачность, в которой исполняемый код время от времени сообщает, что: Я пока работу приостанавливаю, и кто-нибудь еще может воспользоваться текущем потоком, чтобы сделать свои дела, но я надеюсь, что рано или поздно, мне это поток вернут обратно, ведь у меня еще куча дел.


Вопрос: "Какая конструкция C# позволяет прервать работу метода на какое-то заранее неизвестное время?" Правильно! await! Давайте сделаем наш метод асинхронным и добавим await после вывода в консоль очередного символа:


static async ValueTask DoWork(string name, int num){    for (int i = 1; i <= num; i++)    {        Console.WriteLine($"Work {name}: {i}");        await /*Something*/    }    Console.WriteLine($"Work {name} is completed");}

Этот await обозначает, что метод решил прерваться, чтобы другие вызовы тоже смогли вывести свои символы. Но что именно этот метод будет await-ить? Task.Delay(), Task.Yield() не подходят, так как они подразумевают переключение на другие потоки. Тогда создадим свой класс, который можно использовать с await, и который не будет иметь ничего общего с многопоточкой. Назовем его CooperativeBroker:


private class CooperativeBroker : ICooperativeBroker{    private Action? _continuation;    public void GetResult()         => this._continuation = null;    public bool IsCompleted         => false;//Preventing sync completion in async method state machine    public void OnCompleted(Action continuation)    {        this._continuation = continuation;        this.InvokeContinuation();    }    public ICooperativeBroker GetAwaiter()         => this;    public void InvokeContinuation()         => this._continuation?.Invoke();}

Компилятор C# преобразует исходный код асинхронных методов в виде конечного автомата, каждое состояние которого соответствует вызову await внутри этого метода. Код перехода в следующее состояние передается в виде делегата continuation в метод OnCompleted. В реальной жизни предполагается, что continuation будет вызван, когда будет завершена асинхронная операция, но в нашем случае никаких асинхронных операций нет и для работы программы надо бы было вызвать это continuation немедленно, но тогда методы опять будут работать последовательно, а мы этого не хотим. Лучше сохраним этот делегат на будущее и дадим поработать другим вызовам. Чтобы было где хранить делегаты давайте добавим класс CooperativeContext:


private class CooperativeBroker{    private readonly CooperativeContext _cooperativeContext;    private Action? _continuation;    public CooperativeBroker(CooperativeContext cooperativeContext)        => this._cooperativeContext = cooperativeContext;    ...    public void OnCompleted(Action continuation)    {        this._continuation = continuation;        this._cooperativeContext.OnCompleted(this);    }}public class CooperativeContext{    private readonly List<CooperativeBroker> _brokers =         new List<CooperativeBroker>();    void OnCompleted(CooperativeBroker broker)    {        ...    }}

где метод OnCompleted собственно и будет отвечать за поочередный вызов методов:


private void OnCompleted(CooperativeBroker broker){    //Пропускает вызовы делегатов пока все брокеры не добавлены.    if (this._targetBrokersCount == this._brokers.Count)    {        var nextIndex = this._brokers.IndexOf(broker) + 1;        if (nextIndex == this._brokers.Count)        {            nextIndex = 0;        }        this._brokers[nextIndex].InvokeContinuation();    }}

Обратите внимание на первое условие делегаты не вызываются до тех пор, пока все брокеры для всех кооперативных вызовов не будут добавлены в контекст (_targetBrokersCount количество кооперативных вызовов). Если начать вызывать их сразу, то вызовы опять пойдут последовательно, так как наш контекст не сможет получить "продолжение" всех вызовов сразу.


Так как нам нужно заранее знать общее количество кооперативных вызовов, то перепишем эти вызовы следующим образом:


static void Main(){    CooperativeContext.Run(        b => DoWork(b, "A", 4),        b => DoWork(b, "B", 3),        b => DoWork(b, "C", 2),        b => DoWork(b, "D", 1)    );}static async ValueTask DoWork(CooperativeBroker broker, string name, int num, bool extraWork = false){    for (int i = 1; i <= num; i++)    {        Console.WriteLine($"Work {name}: {i}, Thread: {Thread.CurrentThread.ManagedThreadId}");        await broker;    }    Console.WriteLine($"Work {name} is completed, Thread: {Thread.CurrentThread.ManagedThreadId}");}public class CooperativeContext{    public static void Run(params Func<CooperativeBroker, ValueTask>[] tasks)    {        CooperativeContext context = new CooperativeContext(tasks.Length);        foreach (var task in tasks)        {            task(context.CreateBroker());        }        ...    }    ...    private int _targetBrokersCount;    private CooperativeContext(int maxCooperation)    {        this._threadId = Thread.CurrentThread.ManagedThreadId;        this._targetBrokersCount = maxCooperation;    }    ...}

Вроде бы можно уже и запускать, чтобы проверить как это работает, но осталась еще одна маленькая деталь если один из вызовов завершается раньше других, то он перестает вызывать метод OnCompleted и вся наша цепочка прерывается. Что бы исправить эту ситуацию форсируем вызов оставшихся "продолжений", и заодно удалим ненужный уже брокер:


public class CooperativeContext{    public static void Run(params Func<ICooperativeBroker, ValueTask>[] tasks)    {        CooperativeContext context = new CooperativeContext(tasks.Length);        foreach (var task in tasks)        {            task(context.CreateBroker());        }        // Программа приходит сюда когда один из методов завершен, но надо        // закончить и остальные        while (context._brokers.Count > 0)        {            context.ReleaseFirstFinishedBrokerAndInvokeNext();        }    }    ...    private void ReleaseFirstFinishedBrokerAndInvokeNext()    {        // IsNoAction означает что асинхронный метод завершен        var completedBroker = this._brokers.Find(i => i.IsNoAction)!;        var index = this._brokers.IndexOf(completedBroker);        this._brokers.RemoveAt(index);        this._targetBrokersCount--;        if (index == this._brokers.Count)        {            index = 0;        }        if (this._brokers.Count > 0)        {            this._brokers[index].InvokeContinuation();        }    }    }private class CooperativeBroker : ICooperativeBroker{    ...    public bool IsNoAction        => this._continuation == null;    ...}

Вот теперь можно и запускать (чуть усложним нам задачу введя дополнительную работу):


static void Main(){    CooperativeContext.Run(        b => DoWork(b, "A", 4),        b => DoWork(b, "B", 3, extraWork: true),        b => DoWork(b, "C", 2),        b => DoWork(b, "D", 1)    );}static async ValueTask DoWork(    ICooperativeBroker broker,     string name,     int num,     bool extraWork = false){    for (int i = 1; i <= num; i++)    {        Console.WriteLine(               $"Work {name}: {i}, Thread: {Thread.CurrentThread.ManagedThreadId}");        await broker;        if (extraWork)        {            Console.WriteLine(                   $"Work {name}: {i} (Extra), Thread: {Thread.CurrentThread.ManagedThreadId}");            await broker;        }    }    Console.WriteLine(           $"Work {name} is completed, Thread: {Thread.CurrentThread.ManagedThreadId}");}

Результат:


Work A: 1, Thread: 1Work B: 1, Thread: 1Work C: 1, Thread: 1Work D: 1, Thread: 1Work A: 2, Thread: 1Work B: 1 (Extra), Thread: 1Work C: 2, Thread: 1Work D is completed, Thread: 1Work A: 3, Thread: 1Work B: 2, Thread: 1Work C is completed, Thread: 1Work A: 4, Thread: 1Work B: 2 (Extra), Thread: 1Work A is completed, Thread: 1Work B: 3, Thread: 1Work B: 3 (Extra), Thread: 1Work B is completed, Thread: 1

Как видно, все вызовы выполнялись в одном и том же потоке, но при этом прерывались, чтобы дать поработать дуг другу, создавая таким образом многозадачность.




Честно сказать, я не могу сходу придумать, где такой подход пригодился бы, но наверняка такая задачка найдется, поэтому лучше знать, что C# позволяет делать и такое.


Исходный код вы можете найти на github.


Если вышеприведенный код вам все же кажется вам какой-то чертовщиной, то могу порекомендовать послушать мой доклад на CLRium про асинхронную машину состояний в C#. Там подробно разобрано как работает async/await.

Подробнее..
Категории: C , Net , Async/await , Multithreading

Многопоточность в Photon

14.04.2021 14:22:42 | Автор: admin

О чём статья

В этой статье мы поговорим о многопоточности в серверной части.

  • как реализована

  • как используется

  • что можно сделать

  • что мы сами изобрели

Все эти вопросы актуальны только если вы разрабатываете что-то непосредственно для серверной части - модифицируете код SDK, пишите свой плагин или вообще делаете что-то своё.

Как в Photon решается вопрос с многопоточностью?

Серверное приложение на фотоне принимает запросы от множества клиентских соединений. Буду называть такие соединения пирами. Эти запросы образуют очереди. По одной на каждый пир. Если пиры подключены к одной комнате, их очереди объединяются в одну - очередь комнаты. Таких комнат набирается до нескольких тысяч и их очереди запросов обрабатываются тоже параллельно.

В качестве основы для реализации очередей задачи в Photon была взята библиотека retlang, которая была разработана на базе библиотеки Jetlang.

Почему не используем Task и async/await

Поэтому поводу есть следующие соображения:

  1. Photon начали разрабатывать до появления этих штук

  2. Количество задач, которые выполняются файберами, огромно - десятки тысяч в секунду. Поэтому добавлять ещё одну абстракцию, которая, как мне кажется, ещё и GC нагружает, не было смысла. Абстракция файберов гораздо тоньше, если так можно выразится.

  3. Наверняка есть TaskScheduler, который делает тоже самое что и файберы и я про него узнал бы в комментах, но в общем-то переизобретать велосипед не хотелось и не хочется.

Что такое Fiber?

Файбер это класс, который реализует очередь команд. Команды ставятся в очередь и исполняются одна за другой - FIFO. Можно сказать, что тут реализован шаблон multiple writers-single reader. Я ещё раз хочу обратить внимание на то, что команды исполняются в той последовательности, в которой поступили, т.е. одна за другой. На этом основывается безопасность доступа к данным в многопоточной среде.

Хотя в Photon мы используем только один файбер, а именно PoolFiber, библиотека предоставляет их пять. Все они реализуют интерфейс IFiber. Вот коротко о них.

  • ThreadFiber - это IFiber, опирающийся на выделенный поток. Используется для частых и чувствительных к быстродействию операций.

  • PoolFiber - это IFiber, опирающийся на пул потоков .NET. Выполнение всё равно происходит последовательно и только в одном потоке за раз. Используйте его для нечастых и менее чувствительных к производительности операций. Или когда желательно не увеличивать количество потоков (Наш случай).

  • FormFiber/DispatchFiber - это IFiber, опирающийся на механизм сообщений WinForms/WPF. FormFiber/DispatchFiber полностью удаляют необходимость в вызове Invoke или BeginInvoke чтобы коммуницировать с окном из другого потока.

  • StubFiber - очень полезен для детерминированного тестирования. Предоставляется точный контроль, чтобы сделать тестирование опережений (races) простым. Исполнение всех задач происходит в вызывающем потоке.

Про PoolFiber

Раскрою тему про выполнение задач вы PoolFiber. Хоть он и использует пул потоков, задачи в нём всё равно выполняются последовательно и используется только один поток за раз. Работает это так:

  1. мы ставим в файбер задачу и она начинает исполнятся. Для этого вызывается ThreadPool.QueueUserWorkItem. И в какой-то момент выбирается один поток из пула и он выполняет эту задачу.

  2. Если пока первая задача выполнялась мы поставили ещё несколько задач, то по окончании выполнения первой задачи, все новый забираются из очереди и снова вызывается ThreadPool.QueueUserWorkItem, чтобы все эти задачи отправились на исполнение. Для них будет выбран новый поток из пула. И когда он закончит, если в очереди есть задачи всё повторяется с начала.

Т.е. при том, что каждый раз новый пакет задач выполняет новый поток из пула, в каждый момент времени он один. Поэтому, если все задачи по работе с игровой комнатой ставятся в её файбер, из них(задач) можно безопасно обращаться к данным комнаты. Если к какому-то объекту обращаются из задач, выполняющихся в разных файберах, то тогда обязательно нужна синхронизация.

Почему PoolFiber

В Photon повсеместно используются PoolFiber. В первую очередь как раз потому, что он не создаёт дополнительных потоков и своим файбером может обладать любой кому это нужно. Мы его, кстати, немного модифицировали и теперь его нельзя остановить. Т.е. PoolFiber.Stop не остановит исполнение текущих задач. Для нас это было важно.

Ставить задачи в файбер можно из какого угодно потока. Всё это потоко-безопасно. Задача, которая исполняется в текущий момент, тоже может ставить новые задачи в файбер, в котором она исполняется.

Поставить задачу в файбер можно тремя способами:

  • поставить задачу в очередь

  • поставить задачу в очередь, которая будет выполнена через некоторый интервал

  • поставить задачу в очередь, которая будет выполняться регулярно.

На уровне кода это выглядит примерно так:

// поставили задачу в очередьfiber.Enqueue(()=>{some action code;});
// поставили задачу в очередь, чтобы выполнилась через 10 секундvar scheduledAction = fiber.Schedule(()=>{some action code;}, 10_000);...// останавливаем таймерscheduledAction.Dispose()
// поставили задачу в очередь, чтобы выполнилась через 10 секунд и каждые 5var scheduledAction = fiber.Schedule(()=>{some action code;}, 10_000, 5_000);...// останавливаем таймерscheduledAction.Dispose()

Для задач, которые выполняются через какой-то интервал важно сохранить ссылку, которую вернул fiber.Schedule. Это единственный способ остановить выполнение такой задачи.

Executors

Теперь про экзекуторы. Это классы, которые собственно выполняют задачи. Они реализуют методы Execute(Action a) и Execute(List<Action> a). PoolFiber использует второй. Т.е. задачи пачкой попадают в экзекутор. Что с ними дальше происходит зависит от экзекутора. Поначалу мы использовали класс DefaultExecutor. Всё что он делает это:

        public void Execute(List<Action> toExecute)        {            foreach (var action in toExecute)            {                Execute(action);            }        }        public void Execute(Action toExecute)        {            if (_running)            {                toExecute();            }        }

В реальной жизни этого оказалось недостаточно. Потому что в случае исключения в одном из 'action' все остальные из списка toExecute пропускались. Поэтому по умолчанию сейчас используется FailSafeBatchExecutor, который внутрь цикла добавляет ещё try/catch. Мы рекомендуем использовать именно этот экзекутор, если не нужно ничего особенного. Этот экзекутор мы добавили сами, поэтому его нет в тех версиях, которые можно найти например на github.

Что ещё мы сами изобрели

BeforeAfterExecutor

Позднее мы добавили ещё один экзекутор, чтобы решить наши задачи с логгированием. Называется он BeforeAfterExecutor. Он "обёртывает" переданный эму экзекутор. Если ничего не передали, то создаётся FailSafeBatchExecutor. Особенностью BeforeAfterExecutor является способность выполнять экшен перед выполнением списка задач и ещё один экшен после выполнения списка задач. Конструктор выглядит следующим образом:

public BeforeAfterExecutor(Action beforeExecute, Action afterExecute, IExecutor executor = null)

Для чего это используется. Файбер и экзекутор имеют одного владельца. При создании экзекутора ему передаётся два экшена. Первый добавляет пары ключ/значение в контекст потока, а второй удаляет их, тем самым выполняя функцию уборщика. Добавленные в контекст потока пары добавляются системой логирования к сообщениям и мы можем видеть некоторые мета данные того, кто сообщение оставил.

Пример:

var beforeAction = ()=>{  log4net.ThreadContext.Properties["Meta1"] = "value";};var afterAction = () => ThreadContext.Properties.Clear();//создаём экзекуторvar e = new BeforeAfterExecutor(beforeAction, afterAction);//создаём PoolFibervar fiber = new PoolFiber(e);

Теперь если что-то логгируется из задачи, которая исполняется в fiber, log4net добавит тэг Meta1 со значением value.

ExtendedPoolFiber и ExtendedFailSafeExecutor

Есть ещё одна штука, которой не было в оригинальной версии retlang, и которую мы разработали позже. Этому предшествовала следующая история. Делюсь ей, чтобы и другим неповадно было. Была следующая задача. Есть PoolFiber (это тот, что работает поверх пула потоков .NET). В задаче, которая выполняется этим файбером, нам было необходимо синхронно выполнить HTTP запрос. Сделали просто:

  1. перед выполнением запроса создаём event;

  2. в другой файбер отправляется задача, выполняющая запрос, и, по завершению, ставящая event в сигнальное положение;

  3. после этого встаём ожидать event.

Не лучшее с точки зрения масштабируемости решение начало давать неожиданный сбой. Оказалось, что задача, который мы ставим в другой файбер на шаге два, попадает в очередь того самого потока, который встал ждать event. Таким образом получили дедлок. Не всегда. Но достаточно часто, чтобы обеспокоиться этим.

Решение было реализовано в ExtendedPoolFiber и ExtendedFailSafeExecutor. Придумали ставить весь файбер на паузу. В этом состоянии он может накапливать новые задачи в очереди, но не исполняет их. Для того, чтобы поставить файбер на паузу вызывается метод Pause. Как только он вызван файбер (а именно экзекутор файбера) ждёт пока текущая задача выполнится и замирает. Все остальные задачи будут ждать первого из двух событий:

  1. Вызов метода Resume

  2. Таймаута (указывается при вызове метода Pause) В метод Resume можно поставить ещё и задачу, которая будет выполнена перед всеми, стоявшими в очереди задачами.

Мы используем этот трюк, когда плагину надо загрузить состояние комнаты, используя HTTP запрос. Чтобы игроки увидели обновлённое состояние комнаты сразу же, файбер комнаты ставится на паузу. При вызове метода Resume мы ставим ему задачу, который применяет загруженное состояние и все остальные задачи уже работают с обновлённым состоянием комнаты.

Кстати, необходимость ставить файбер на паузу окончательно убила возможность использовать ThreadFiber для очереди задач игровых комнат.

IFiberAction

IFiberAction - это эксперимент по сокращению нагрузки на GC. Мы не можем управлять процессом создания экшенов в .NET. Поэтому было решено заменить стандартные экшены на экземпляры класса, который реализует интерфейс IFiberAction. Предполагается, что экземпляры таких классов достаются из пула объектов и возвращаются туда сразу же после завершения. Этим и достигается снижение нагрузки на GC

Интерфейс IFiberAction выглядит следующим образом:

public interface IFiberAction{    void Execute()    void Return()}

Метод Execute содержит собственно, то что нужно исполнить. Метод Return вызывается после Execute, когда пришло время вернуть объект в пул.

Пример:

public class PeerHandleRequestAction : IFiberAction{    public static readonly ObjectPool<PeerHandleRequestAction> Pool = initialization;    public OperationRequest Request {get; set;}    public PhotonPeer Peer {get; set;}        public void Execute()    {        this.Peer.HandleRequest(this.Request);    }        public void Return()    {        this.Peer = null;        this.Request = null;                Pool.Return(this);    }}//теперь использование будет выглядит примерно такvar action = PeerHandleRequestAction.Pool.Get();action.Peer = peer;action.Request = request;peer.Fiber.Enqueue(action);

Заключение

В качестве заключения коротко резюмирую то, о чём рассказал. Для обспечения потокобезопастности в фотон мы используем очереди задач, которые в нашем случае представлены файберами. Основной вид файбера, который мы используем это PoolFiber и его наследники. PoolFiber реализует очередь задач поверх стандартного пула потоков .NET. В силу дешевизны PoolFiber своим файбером могут обладать все, кому это необходимо. Если необходимо ставить очередь задач на паузу, используйте ExtendedPoolFiber.

Непосредственным выполнением задач в файберах занимаются экзекуторы, реализующие интефейс IExecutor. DefaultExecutor всем хорош, но в случае исключения теряет весь остаток задач, которые были переданы ему на исполнение. FailSafeExecutor видится в этом отношении разумным выбором. Если надо выполнить какое-то действие перед выполнением экзекутором пачки задач и после него, может пригодится BeforeAfterExecutor

Подробнее..
Категории: C , Net , Разработка игр , Multithreading , Photon

Чем опасен postDelayed

24.09.2020 08:13:47 | Автор: admin

Часто из-за особенностей работы android системы и sdk, нам необходимо подождать, когда определённая часть системы будет сконфигурирована или произойдёт какое-то необходимое нам событие. Зачастую это является костылём, но иногда без них никак, особенно в условиях дедлайнов. Поэтому во многих проектах для этого использовался postDelayed. Под катом рассмотрим, чем же он так опасен и что с этим делать.


Проблема


Для начала рассмотрим как обычно используют postDelayed():


override fun onViewCreated(view: View, savedInstanceState: Bundle?) {        super.onViewCreated(view, savedInstanceState)        view.postDelayed({            Log.d("test", "postDelayed")            // do action        }, 100)}

С виду всё хорошо, но давайте изучим этот код повнимательнее:


1) Это отложенное действие, выполнение которого мы будем ожидать через некоторое время. Зная насколько динамично пользователь может совершать переходы между экранами, данное действие должно быть отменено при смене фрагмента. Однако, этого здесь не происходит, и наше действие выполнится, даже если текущий фрагмент будет уничтожен.
Проверить это просто. Создаём два фрагмента, при переходе на второй запускаем postDelayed с большим временем, к примеру 5000 мс. Сразу возвращаемся назад. И через некоторое время видим в логах, что действие не отменено.


2) Второе "вытекает" из первого. Если в данном runnable мы передадим ссылку на property нашего фрагмента, будет происходить утечка памяти, поскольку ссылка на runnable будет жить дольше, чем сам фрагмент.


3) Третье и основное почему я об этом задумался:
Падения приложения, если мы обращаемся ко view после onDestroyView
synthitec java.lang.NullPointerException, поскольку кеш уже очищен при помощи _$_clearFindViewByIdCache, а findViewById отдаёт null
viewBinding java.lang.IllegalStateException: Can't access the Fragment View's LifecycleOwner when getView() is null


Что же делать?


1 Если нам нужные размеры view использовать doOnLayout или doOnNextLayout


2 Перенести ожидание в компонент, ответственный за бизнес-логику отображения (Presenter/ViewModel или что-то другое). Он в свою очередь должен устанавливать значения во фрагмент в правильный момент его жизненного цикла или отменять действие.


3 Использовать безопасный стиль.


Необходимо отписываться от нашего действия перед тем, как view будет отсоединено от window.


    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {        super.onViewCreated(view, savedInstanceState)         Runnable {            // do action        }.let { runnable ->            view.postDelayed(runnable, 100)            view.addOnAttachStateChangeListener(object : View.OnAttachStateChangeListener {                override fun onViewAttachedToWindow(view: View) {}                override fun onViewDetachedFromWindow(view: View) {                    view.removeOnAttachStateChangeListener(this)                    view.removeCallbacks(runnable)                }            })        }    }

Обычный doOnDetach нельзя использовать, поскольку view может быть ещё не прикреплено к window, как к примеру в onViewCreated. И тогда наше действие будет сразу же отменено.


Где то во View.kt:


inline fun View.doOnDetach(crossinline action: (view: View) -> Unit) {    if (!ViewCompat.isAttachedToWindow(this)) { // выполнится это условие        action(this)  // и здесь мы сразу же отпишемся от действия    } else {        addOnAttachStateChangeListener(object : View.OnAttachStateChangeListener {            override fun onViewAttachedToWindow(view: View) {}            override fun onViewDetachedFromWindow(view: View) {                removeOnAttachStateChangeListener(this)                action(view)            }        })    }}

Или же обобщим в extension:


fun View.postDelayedSafe(delayMillis: Long, block: () -> Unit) {        val runnable = Runnable { block() }        postDelayed(runnable, delayMillis)        addOnAttachStateChangeListener(object : View.OnAttachStateChangeListener {            override fun onViewAttachedToWindow(view: View) {}            override fun onViewDetachedFromWindow(view: View) {                removeOnAttachStateChangeListener(this)                view.removeCallbacks(runnable)            }        })}

В принципе на этом можно остановится. Все проблемы решены. Но этим мы добавляем ещё один тип асинхронного выполнения к нашему проекту, что несколько усложняет его. Сейчас в мире Native Android есть 2 основных решения для асинхронного выполнения кода Rx и Coroutines.
Попробуем использовать их.
Сразу оговорюсь, что не претендую на 100% правильность по отношению к вашему проекту. В вашем проекте это может быть по другому/лучше/короче.


Coroutines


Обычно во всех проектах есть базовый фрагмент для инжекта зависимостей, настройки di и так далее. Используем его для обобщения работы с отменой отложенных действий:


class BaseFragment(@LayoutRes layoutRes: Int) : Fragment(layoutRes), CoroutineScope by MainScope() {    override fun onDestroyView() {        super.onDestroyView()        coroutineContext[Job]?.cancelChildren()    }    override fun onDestroy() {        super.onDestroy()        cancel()    }}

Нам необходимо отменять все дочерние задачи в onDestroyView, но при этом не закрывать scope, поскольку после этого возможно вновь создание View без пересоздания Fragment. К примеру при роутинге вперёд на другой Fragment и после этого назад на текущий.


В onDestroy уже закрываем scope, так как далее никаких задач не должно быть запущено.


Все подготовительные работы сделаны.
Перейдём к самой замене postDelayed:


fun BaseFragment.delayActionSafe(delayMillis: Long, action: () -> Unit): Job? {    view ?: return null    return launch {        delay(delayMillis)        action()    }}

Так как нам не нужно, чтобы наши отложенные задачи выполнялись, если view уже уничтожено, просто не запускаем ничего и возвращаем null. Иначе запускаем наше действие с необходимой задержкой. Но теперь при уничтожении view, задача будет отменена и ресурсы будут отпущены.


RX


В RX за отмену подписок отвечает класс Disposable, но в RX нет Structured concurrency в отличии от coroutine. Из-за этого приходится прописывать это всё самому. Выглядит обычно это примерно так:


interface DisposableHolder {    fun dispose()    fun addDisposable(disposable: Disposable)}class DisposableHolderImpl : DisposableHolder {    private val compositeDisposable = CompositeDisposable()    override fun addDisposable(disposable: Disposable) {        compositeDisposable.add(disposable)    }    override fun dispose() {        compositeDisposable.clear()    }}

Также аналогично отменяем все задачи в базовом фрагменте:


class BaseFragment(@LayoutRes layoutRes: Int) : Fragment(layoutRes),    DisposableHolder by DisposableHolderImpl() {    override fun onDestroyView() {        super.onDestroyView()        dispose()    }    override fun onDestroy() {        super.onDestroy()        dispose()    }}

И сам extension:


fun BaseFragment.delayActionSafe(delayMillis: Long, block: () -> Unit): Disposable? {    view ?: return null    return Completable.timer(delayMillis, TimeUnit.MILLISECONDS).subscribe {        block()    }.also {        addDisposable(it)    }}

В заключении


При использовании каких-либо отложенных действий нельзя забывать, что это уже асинхронное исполнение, и соответственно оно требует отмены, в противном случае начинают происходить утечки памяти, краши и разные другие неожиданные вещи.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru