Русский
Русский
English
Статистика
Реклама

Threading

Перевод 8 продвинутых возможностей модуля logging в Python, которые вы не должны пропустить

05.08.2020 18:06:10 | Автор: admin

Понимайте свою программу без ущерба для производительности


image


Журналирование это очень важная часть разработки ПО. Оно помогает разработчикам лучше понимать выполнение программы и судить о дефектах и непредвиденных сбоях. Журнальное сообщение может хранить информацию наподобие текущего статуса программы или того, в каком месте она выполняется. Если происходит ошибка, то разработчики могут быстро найти строку кода, которая вызвала проблему, и действовать с учетом этого.


Python предоставляет довольно мощный и гибкий встроенный модуль logging со множеством возможностей. В этой статье я хочу поделиться восемью продвинутыми возможностями, которые будут полезны при разработке ПО.


Основы модуля logging


Прежде чем приступить к рассмотрению продвинутых возможностей, давайте убедимся, что у нас есть базовое понимание модуля logging.


Логгер


Экземпляр, который мы создаем для генерации записей, называют логгером. Он инстанциируется через logger = logging.getLogger(name). Лучшая практика это использовать name в качестве имени логгера. name включает в себя имя пакета и имя модуля. Имя будет появляться в журнальном сообщении, что поможет разработчикам быстро находить, где оно было сгенерировано.


Форматировщик и обработчик


У любого логгера есть несколько конфигураций, которые могут быть модифицированы. Более продвинутые конфигурации мы обсудим позже, а наиболее ходовые это форматировщик (прим. пер.: formatter) и обработчик (прим. пер.: handler).


Форматировщик устанавливает структуру журнального сообщения. Каждое журнальное сообщение это объект класса LogRecord с несколькими атрибутами (имя модуля один из них). Когда мы определяем форматировщик, мы можем решить, как должно выглядеть журнальное сообщение вместе с этими атрибутами и, возможно, с атрибутами, созданными нами. Стандартный форматировщик выглядит так:


серьезность ошибки:имя логгера:сообщение# например: WARNING:root:Программа стартует!

Кастомизированный форматировщик может выглядеть так:


"%(asctime)s - [%(levelname)s] -  %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s"# 2020-07-26 23:37:15,374 - [INFO] -  __main__ - (main.py).main(18) - Программа стартует!

Обработчик задает целевое местонахождение журнальных сообщений. Журнальное сообщение может быть отправлено в более чем одно место. Собственно говоря, модуль logging предоставляет довольно много стандартных обработчиков. Самые популярные FileHandler, который отправляет записи в файл, и StreamHandler, который отправляет записи в потоки, такие как sys.stderr или sys.stdout. Экземпляр логгера поддерживает ноль или более обработчиков. Если никакие обработчики не определены, тогда он будет отправлять записи в sys.stderr. Если определен более чем один обработчик, тогда целевое местонахождение журнального сообщения зависит от его уровня и от уровня обработчика.


Например, у меня есть FileHandler с уровнем WARNING (прим. пер.: предупреждение) и StreamHandler с уровнем INFO (прим. пер.: информация). Если я напишу в журнал сообщение об ошибке, то оно будет отправлено как в sys.stdout, так и в файл журнала.


Например:

В этом примере мы создали main.py, package1.py, и app_logger.py. Модуль app_logger.py содержит функцию get_logger, которая возвращает экземпляр логгера. Экземпляр логгера включает в себя кастомный форматировщик и два обработчика: StreamHandler с уровнем INFO и FileHandler с уровнем WARNING. Важно установить базовый уровень в INFO или DEBUG (уровень журналирования по умолчанию WARNING), в противном случае любые записи журнала по уровню ниже, чем WARNING, будут отфильтрованы. И main.py, и package1.py, используют get_logger, чтобы создавать свои собственные логгеры.


image
Диаграмма Xiaoxu Gao


# app_logger.pyimport logging_log_format = f"%(asctime)s - [%(levelname)s] - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s"def get_file_handler():    file_handler = logging.FileHandler("x.log")    file_handler.setLevel(logging.WARNING)    file_handler.setFormatter(logging.Formatter(_log_format))    return file_handlerdef get_stream_handler():    stream_handler = logging.StreamHandler()    stream_handler.setLevel(logging.INFO)    stream_handler.setFormatter(logging.Formatter(_log_format))    return stream_handlerdef get_logger(name):    logger = logging.getLogger(name)    logger.setLevel(logging.INFO)    logger.addHandler(get_file_handler())    logger.addHandler(get_stream_handler())    return logger# package1.pyimport app_loggerlogger = app_logger.get_logger(__name__)def process(msg):    logger.info("Перед процессом")    print(msg)    logger.info("После процесса")# main.pyimport package1import app_loggerlogger = app_logger.get_logger(__name__)def main():    logger.info("Программа стартует")    package1.process(msg="сообщение")    logger.warning("Это должно появиться как в консоли, так и в файле журнала")    logger.info("Программа завершила работу")if __name__ == "__main__":    main()# 2020-07-25 21:06:06,657 - [INFO] - __main__ - (main.py).main(8) - Программа стартует# 2020-07-25 21:06:06,658 - [INFO] - package1 - (package1.py).process(7) - Перед процессом# сообщение# 2020-07-25 21:06:06,658 - [INFO] - package1 - (package1.py).process(9) - После процесса# 2020-07-25 21:06:06,658 - [WARNING] - __main__ - (main.py).main(10) - Это должно появиться как в консоли, так и в файле журнала# 2020-07-25 21:06:06,658 - [INFO] - __main__ - (main.py).main(11) - Программа завершила работу# cat x.log# 2020-07-25 21:06:06,658 - [WARNING] - __main__ - (main.py).main(10) - Это должно появиться как в консоли, так и в файле журнала

basic-logging.py


Записи с уровнем INFO отправляются как в консольный вывод (sys.stdout), так и в файл журнала, а записи с уровнем WARNING пишутся только в файл журнала. Если вы можете полностью понять, что и почему происходит в этом примере, то мы готовы приступить к более продвинутым возможностям.


1. Создавайте заданные пользователем атрибуты объектов класса LogRecord, используя класс LoggerAdapter


Как я упоминал ранее, у LogRecord есть несколько атрибутов. Разработчики могут выбрать наиболее важные атрибуты и использовать в форматировщике. Помимо того, модуль logging также предоставляет возможность добавить в LogRecord определенные пользователем атрибуты.
Один из способов сделать это использовать LoggerAdapter. Когда вы создаете адаптер, вы передаете ему экземпляр логгера и свои атрибуты (в словаре). Этот класс предоставляет тот же интерфейс, что и класс Logger, поэтому вы все еще можете вызывать методы наподобие logger.info.


Новый атрибут с фиксированным значением


Если вы хотите иметь что-то вроде атрибута с фиксированным значением в журнальном сообщении, например имя приложения, то вы можете использовать стандартный класс LoggerAdapter и получать значение атрибута при создании логгера (прим. пер.: вероятно, получать откуда-либо, чтобы затем передать конструктору). Не забывайте добавлять этот атрибут в форматировщик. Местоположение атрибута вы можете выбрать по своему усмотрению. В следующем коде я добавляю атрибут app, значение которого определяется, когда я создаю логгер.


import logginglogging.basicConfig(    level=logging.INFO,    format="%(asctime)s - [%(levelname)s] - %(app)s - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s",)logger = logging.getLogger(__name__)logger = logging.LoggerAdapter(logger, {"app": "тестовое приложение"})logger.info("Программа стартует")logger.info("Программа завершила работу")# 2020-07-25 21:36:20,709 - [INFO] - тестовое приложение - __main__ - (main.py).main(8) - Программа стартует# 2020-07-25 21:36:20,709 - [INFO] - тестовое приложение - __main__ - (main.py).main(11) - Программа завершила работу

logging_adapter_fixed_value.py


Новый атрибут с динамическим значением


В других ситуациях вам, возможно, понадобятся динамические атрибуты, например что-то вроде динамического идентификатора. В таком случае вы можете расширить базовый класс LoggerAdapter и создать свой собственный. Метод process() то место, где дополнительные атрибуты добавляются к журнальному сообщению. В коде ниже я добавляю динамический атрибут id, который может быть разным в каждом журнальном сообщении. В этом случае вам не нужно добавлять атрибут в форматировщик.


import loggingclass CustomAdapter(logging.LoggerAdapter):    def process(self, msg, kwargs):        my_context = kwargs.pop('id', self.extra['id'])        return '[%s] %s' % (my_context, msg), kwargslogging.basicConfig(    level=logging.INFO,    format="%(asctime)s - [%(levelname)s] - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s",)logger = logging.getLogger(__name__)logger = CustomAdapter(logger, {"id": None})logger.info('ID предоставлен', id='1234')logger.info('ID предоставлен', id='5678')logger.info('Отсутствует информация об ID')# 2020-07-25 22:12:06,715 - [INFO] - __main__ - (main.py).<module>(38) - [1234] ID предоставлен# 2020-07-25 22:12:06,715 - [INFO] - __main__ - (main.py).<module>(38) - [1234] ID предоставлен# 2020-07-25 22:21:31,568 - [INFO] - __main__ - (main.py).<module>(39) - [5678] ID предоставлен# 2020-07-25 22:21:31,568 - [INFO] - __main__ - (main.py).<module>(39) - [5678] ID предоставлен# 2020-07-25 22:12:06,715 - [INFO] - __main__ - (main.py).<module>(39) - [None] Отсутствует информация об ID# 2020-07-25 22:12:06,715 - [INFO] - __main__ - (main.py).<module>(39) - [None] Отсутствует информация об ID

logging_adapter_dynamic_value.py


2. Создавайте определенные пользователем атрибуты объектов класса LogRecord, используя класс Filter


Другой способ добавления определенных пользователем атрибутов использование кастомного Filter. Фильтры предоставляют дополнительную логику для определения того, какие журнальные сообщения выводить. Это шаг после проверки базового уровня журналирования, но до передачи журнального сообщения обработчикам. В дополнение к определению, должно ли журнальное сообщение двигаться дальше, мы также можем вставить новые атрибуты в методе filter().


image
Диаграмма из официальной документации Python


В этом примере мы добавляем новый атрибут color (прим. пер.: цвет) в методе filter(), значение которого определяется на основе имени уровня в журнальном сообщении. В этом случае имя атрибута снова должно быть добавлено в форматировщик.


import loggingclass CustomFilter(logging.Filter):    COLOR = {        "DEBUG": "GREEN",        "INFO": "GREEN",        "WARNING": "YELLOW",        "ERROR": "RED",        "CRITICAL": "RED",    }    def filter(self, record):        record.color = CustomFilter.COLOR[record.levelname]        return Truelogging.basicConfig(    level=logging.DEBUG,    format="%(asctime)s - [%(levelname)s] - [%(color)s] - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s",)logger = logging.getLogger(__name__)logger.addFilter(CustomFilter())logger.debug("сообщение для отладки, цвет  зеленый")logger.info("информационное сообщение, цвет  зеленый")logger.warning("предупреждающее сообщение, цвет  желтый")logger.error("сообщение об ошибке, цвет  красный")logger.critical("сообщение о критической ошибке, цвет  красный")# 2020-07-25 22:45:17,178 - [DEBUG] - [GREEN] - __main__ - (main.py).<module>(52) - сообщение для отладки, цвет  зеленый# 2020-07-25 22:45:17,179 - [INFO] - [GREEN] - __main__ - (main.py).<module>(53) - информационное сообщение, цвет  зеленый# 2020-07-25 22:45:17,179 - [WARNING] - [YELLOW] - __main__ - (main.py).<module>(54) - предупреждающее сообщение, цвет  желтый# 2020-07-25 22:45:17,179 - [ERROR] - [RED] - __main__ - (main.py).<module>(55) - сообщение об ошибке, цвет  красный# 2020-07-25 22:45:17,179 - [CRITICAL] - [RED] - __main__ - (main.py).<module>(56) - сообщение о критической ошибке, цвет  красный

logging_filter_dynamic_attributes.py


3. Многопоточность с модулем logging


Модуль logging на самом деле реализован потокобезопасным способом, поэтому нам не нужны дополнительные усилия. Код ниже показывает, что MainThread и WorkThread разделяют один и тот же экземпляр логгера без проблемы состояния гонки. Также есть встроенный атрибут threadName для форматировщика.


import loggingimport threadinglogging.basicConfig(    level=logging.INFO,    format="%(asctime)s - [%(levelname)s] - [%(threadName)s] - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s",)logger = logging.getLogger(__name__)def worker():    for i in range(5):        logger.info(f"журнальное сообщение {i} из рабочего потока")thread = threading.Thread(target=worker)thread.start()for i in range(5):    logger.info(f"журнальное сообщение {i} из главного потока")thread.join()# 2020-07-26 15:33:21,078 - [INFO] - [Thread-1] - __main__ - (main.py).worker(62) - 0-ое журнальное сообщение из рабочего потока# 2020-07-26 15:33:21,078 - [INFO] - [Thread-1] - __main__ - (main.py).worker(62) - 1-ое журнальное сообщение из рабочего потока# 2020-07-26 15:33:21,078 - [INFO] - [Thread-1] - __main__ - (main.py).worker(62) - 2-ое журнальное сообщение из рабочего потока# 2020-07-26 15:33:21,078 - [INFO] - [Thread-1] - __main__ - (main.py).worker(62) - 3-ое журнальное сообщение из рабочего потока# 2020-07-26 15:33:21,078 - [INFO] - [Thread-1] - __main__ - (main.py).worker(62) - 4-ое журнальное сообщение из рабочего потока# 2020-07-26 15:33:21,078 - [INFO] - [MainThread] - __main__ - (main.py).<module>(69) - 0-ое журнальное сообщение из главного потока# 2020-07-26 15:33:21,078 - [INFO] - [MainThread] - __main__ - (main.py).<module>(69) - 1-ое журнальное сообщение из главного потока# 2020-07-26 15:33:21,078 - [INFO] - [MainThread] - __main__ - (main.py).<module>(69) - 2-ое журнальное сообщение из главного потока# 2020-07-26 15:33:21,078 - [INFO] - [MainThread] - __main__ - (main.py).<module>(69) - 3-ое журнальное сообщение из главного потока# 2020-07-26 15:33:21,078 - [INFO] - [MainThread] - __main__ - (main.py).<module>(69) - 4-ое журнальное сообщение из главного потока

logging_multi_threading.py


Под капотом модуль logging использует threading.RLock() практически везде. Отличия между RLock от Lock:


  1. Lock может быть получен только один раз и больше не может быть получен до тех пор, пока он не будет освобожден. С другой стороны, RLock может быть получен неоднократно до своего освобождения, но он должен быть освобожден столько же раз.


  2. Lock может быть освобожден любым потоком, а RLock только тем потоком, который его удерживает.



Любой обработчик, который наследуется от класса Handler, обладает методом handle(), предназначенным для генерации записей. Ниже представлен блок кода метода Handler.handle(). Как видите, обработчик получит и освободит блокировку до и после генерации записи соответственно. Метод emit() может быть реализован по-разному в разных обработчиках.


def handle(self, record):    """    При определенных условиях генерирует журнальную запись.    Генерация зависит от фильтров, которые могут быть добавлены в обработчик.    Оборачивает настоящую генерацию записи получением/освобождением    блокировки потока ввода-вывода. Возвращает значение, свидетельствующее о том, пропустил ли фильтр запись на    генерацию.    """    rv = self.filter(record)    if rv:        self.acquire()        try:            self.emit(record)        finally:            self.release()    return rv

logging_handle.py


4. Многопроцессная обработка с модулем logging QueueHandler


Несмотря на то, что модуль logging потокобезопасен, он не процессобезопасен. Если вы хотите, чтобы несколько процессов вели запись в один и тот же файл журнала, то вы должны вручную позаботиться о доступе к вашему файлу. В соответствии с учебником по logging, есть несколько вариантов.


QueueHandler + процесс-потребитель


Один из вариантов использование QueueHandler. Идея заключается в том, чтобы создать экземпляр класса multiprocessing.Queue и поделить его между любым количеством процессов. В примере ниже у нас есть 2 процесса-производителя, которые отправляют записи журнала в очередь и процесс-потребитель, читающий записи из очереди и пишущий их в файл журнала.


У записей журнала в очереди будут, вероятно, разные уровни, так что в функции log_processor мы используем logger.log(record.levelno, record.msg) вместо logger.info() или logger.warning(). В конце (прим. пер.: функции main) мы отправляем сигнал, чтобы позволить процессу log_processor остановиться. Деление экземпляра очереди между множеством процессов или потоков не новшество, но модуль logging как бы помогает нам справиться с этой ситуацией.


import osfrom logging.handlers import QueueHandlerformatter = "%(asctime)s - [%(levelname)s] - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s"logging.basicConfig(level=logging.INFO)def log_processor(queue):    logger = logging.getLogger(__name__)    file_handler = logging.FileHandler("a.log")    file_handler.setFormatter(logging.Formatter(formatter))    logger.addHandler(file_handler)    while True:        try:            record = queue.get()            if record is None:                break            logger.log(record.levelno, record.msg)        except Exception as e:            passdef log_producer(queue):    pid = os.getpid()    logger = logging.getLogger(__name__)    queue_handler = QueueHandler(queue)    logger.addHandler(QueueHandler(queue))    logger.info(f"уровень INFO - производитель {pid}")    logger.warning(f"уровень WARNING - производитель {pid}")def main():    queue = multiprocessing.Queue(-1)    listener = multiprocessing.Process(target=log_processor, args=(queue,))    listener.start()    workers = []    for i in range(2):        worker = multiprocessing.Process(target=log_producer, args=(queue,))        workers.append(worker)        worker.start()    for w in workers:        w.join()    queue.put_nowait(None)    listener.join()if __name__ == '__main__':    main()# >> cat a.log # 2020-07-26 18:38:10,525 - [INFO] - __mp_main__ - (main.py).log_processer(118) - уровень INFO - производитель 32242# 2020-07-26 18:38:10,525 - [WARNING] - __mp_main__ - (main.py).log_processer(118) - уровень WARNING - производитель 32242# 2020-07-26 18:38:10,530 - [INFO] - __mp_main__ - (main.py).log_processer(118) - уровень INFO - производитель 32243# 2020-07-26 18:38:10,530 - [WARNING] - __mp_main__ - (main.py).log_processer(118) - уровень WARNING - производитель 32243

logging_queue_handler.py


QueueHandler + QueueListener


В модуле logging.handlers есть особый класс с именем QueueListener. Этот класс создает экземпляр слушателя с очередью журнальных сообщений и списком обработчиков для обработки записей журнала. QueueListener может заменить процесс, который мы создали в предыдущем примере, поместив его в переменную listener. При этом будет использовано меньше кода.


def log_producer(queue):    pid = os.getpid()    logger = logging.getLogger(__name__)    logger.addHandler(QueueHandler(queue))    logger.info(f"уровень INFO - производитель {pid}")    logger.warning(f"уровень WARNING - производитель {pid}")def main():    queue = multiprocessing.Queue(-1)    file_handler = logging.FileHandler("b.log")    file_handler.setFormatter(logging.Formatter(formatter))    queue_listener = QueueListener(queue, file_handler)    queue_listener.start()    workers = []    for i in range(2):        worker = multiprocessing.Process(target=log_producer, args=(queue,))        workers.append(worker)        worker.start()    for w in workers:        w.join()    queue_listener.stop()if __name__ == '__main__':    main()# >> cat b.log # 2020-07-26 20:15:58,656 - [INFO] - __mp_main__ - (main.py).log_producer(130) - уровень INFO - производитель 34199# 2020-07-26 20:15:58,656 - [WARNING] - __mp_main__ - (main.py).log_producer(131) - уровень WARNING - производитель 34199# 2020-07-26 20:15:58,662 - [INFO] - __mp_main__ - (main.py).log_producer(130) - уровень INFO - производитель 34200# 2020-07-26 20:15:58,662 - [WARNING] - __mp_main__ - (main.py).log_producer(131) - уровень WARNING - производитель 34200

logging_queue_listener.py


SocketHandler


Другое решение, предлагаемое учебником, отправлять записи из нескольких процессов в SocketHandler и иметь отдельный процесс, который реализует сокет-сервер, читающий записи и отправляющий их в место назначения. В этих источниках есть довольно подробная реализация.


Все эти решения, в основном, следуют одному принципу: отправлять записи из разных процессов в централизованное место либо в очередь, либо на удаленный сервер. Получатель на другой стороне ответственен за их запись в места назначения.


5. По умолчанию не генерируйте какие-либо журнальные записи библиотеки NullHandler


На данный момент мы упомянули несколько обработчиков, реализованных модулем logging.
Другой полезный встроенный обработчик NullHandler. В реализации NullHandler практически ничего нет. Тем не менее, он помогает разработчикам отделить библиотечные записи журнала от записей приложения.


Ниже приведена реализация обработчика NullHandler.


class NullHandler(Handler):    """    Этот обработчик ничего не делает. Предполагается использовать его, чтобы избежать    разового предупреждения "Для логгера XXX не найдено никаких обработчиков". Это    важно для кода библиотеки, который может содержать код для журналирования событий. Если пользователь    библиотеки не настроил конфигурацию журналирования, то может быть    создано разовое предупреждение; чтобы избежать этого, разработчику библиотеку нужно просто создать экземпляр класса    NullHandler и добавить его в логгер верхнего уровня модуля библиотеки или    пакета.    """    def handle(self, record):        """Заглушка."""    def emit(self, record):        """Заглушка."""    def createLock(self):        self.lock = None

logging_nullhandler.py


Почему нам нужно отделять записи библиотеки от записей приложения?


По словам автора модуля logging, Vinay Sajip:


Сторонняя библиотека, которая использует logging, по умолчанию не должна выбрасывать вывод журналирования, так как он может быть не нужен разработчику/пользователю приложения, которое использует библиотеку.


Наилучшая практика по умолчанию не генерировать библиотечные записи журнала и давать пользователю библиотеки возможность решить, хочет ли он получать и обрабатывать эти записи в приложении.


В роли разработчика библиотеки нам нужна только одна строка кода внутри init.py, чтобы добавить NullHandler. Во вложенных пакетах и модулях логгеры остаются прежними. Когда мы устанавливаем этот пакет в наше приложение через pip install, мы по умолчанию не увидим библиотечные записи журнала.


# package/# #  __init__.py#  module1.py# __init__.pyimport logginglogging.getLogger(__name__).addHandler(logging.NullHandler())# module1.pylogger = logging.getLogger(__name__)logger.info("это информационное журнальное сообщение")

logging_nullhandler_example.py


Чтобы сделать эти записи видимыми, нужно добавить обработчики в логгер библиотеки в своем приложении.


# ваше приложение logging.getLogger("package").addHandler(logging.StreamHandler())

Если библиотека не использует NullHandler, но вы хотите отключить записи из библиотеки, то можете установить logging.getLogger("package").propagate = False. Если propagate установлен в False, то записи журнала не будут передаваться обработчикам.


6. Делайте ротацию своих файлов журнала RotatingFileHandler, TimedRotatingFileHandler


RotatingFileHandler поддерживает ротацию файлов журнала, основанную на максимальном размере файла. Здесь должны быть определено два параметра: maxBytes и backupCount. Параметр maxBytes сообщает обработчику, когда делать ротацию журнала. Параметр backupCount количество файлов журнала. У каждого продолженного файла журнала есть суффикс .1, .2 в конце имени файла. Если текущее журнальное сообщение вот-вот позволит файлу журнала превысить максимальный размер, то обработчик закроет текущий файл и откроет следующий.


Вот этот пример очень похож на пример из учебника. Должно получиться 6 файлов журнала.


import loggingfrom logging.handlers import RotatingFileHandlerdef create_rotating_log(path):    logger = logging.getLogger(__name__)    logger.setLevel(logging.INFO)    handler = RotatingFileHandler(path, maxBytes=20, backupCount=5)    logger.addHandler(handler)    for i in range(100):        logger.info("Это тестовая строка-запись в журнале %s" % i)if __name__ == "__main__":    log_file = "test.log"    create_rotating_log(log_file)

logging_file_rotation.py


Другой обработчик для ротации файлов TimeRotatingFileHandler, который позволяет разработчикам создавать ротационные журналы, основываясь на истекшем времени. Условия времени включают: секунду, минуту, час, день, день недели (0=Понедельник) и полночь (журнал продлевается в полночь).


В следующем примере мы делаем ротацию файла журнала каждую секунду с пятью резервными файлами. В каждом резервном файле есть временная метка в качестве суффикса.


import loggingimport timefrom logging.handlers import TimedRotatingFileHandlerlogger = logging.getLogger(__name__)logger.setLevel(logging.INFO)handler = TimedRotatingFileHandler(    "timed_test.log", when="s", interval=1, backupCount=5)logger.addHandler(handler)for i in range(6):    logger.info(i)    time.sleep(1)

time_file_rotation.py


7. Исключения в процессе журналирования


Зачастую мы используем logger.error() или logger.exception() при обработке исключений. Но что если сам логгер генерирует исключение? Что случится с программой? Ну, зависит от обстоятельств.


Ошибка логгера обрабатывается, когда обработчик вызывает метод emit(). Это означает, что любое исключение, связанное с форматированием или записью, перехватывается обработчиком, а не поднимается. Если конкретнее, метод handleError() будет выводить трассировку в stderr, и программа продолжится. Если у вас есть кастомный обработчик, наследуемый от класса Handler, то вы можете реализовать свой собственный handleError().


def emit(self, record):    """    Генерирует запись.    Если форматировщик указан, он используется для форматирования записи.    Затем запись пишется в поток с завершающим символом переноса строки. Если    предоставлена информация об исключении, то она форматируется с использованием    traceback.print_exception и добавляется в конец потока. Если у потока    есть атрибут 'encoding', он используется для определения того, как делать    вывод в поток.    """    try:        msg = self.format(record)        stream = self.stream        # issue 35046: merged two stream.writes into one.        stream.write(msg + self.terminator)        self.flush()    except RecursionError:  # See issue 36272        raise    except Exception:        self.handleError(record)

logging_emit.py


В этом примере во втором журнальном сообщении слишком много аргументов. Поэтому в консольном выводе мы получили трассировку, и выполнение программы все еще могло быть продолжено.


import logginglogging.basicConfig(    level=logging.INFO,    format="%(asctime)s - [%(levelname)s] -  %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s",)logger = logging.getLogger(__name__)logger.info("Программа стартует")logger.info("У этого сообщения %s слишком много аргументов", "msg", "other")logger.info("Программа завершила работу")# 2020-07-26 23:37:15,373 - [INFO] -  __main__ - (main.py).main(16) - Программа стартует# --- Logging error ---# Traceback (most recent call last):#   File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/logging/__init__.py", line 1197, in emit#     msg = self.format(record)#   File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/logging/__init__.py", line 1032, in format#     return fmt.format(record)#   File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/logging/__init__.py", line 756, in format#     record.message = record.getMessage()#   File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/logging/__init__.py", line 443, in getMessage#     msg = msg % self.args# TypeError: not all arguments converted during string formatting (прим. пер.: не все аргументы конвертированы при форматировании строки)# Call stack:#   File "/Users/ie15qx/repo/compare_xml_files/source/main.py", line 22, in <module>#     main()#   File "/Users/ie15qx/repo/compare_xml_files/source/main.py", line 17, in main#     logger.info("У этого сообщения %s слишком много аргументов", "msg", "other")# Message: 'У этого сообщения %s слишком много аргументов'# Arguments: ('msg', 'other')# 2020-07-26 23:37:15,374 - [INFO] -  __main__ - (main.py).main(18) - Программа завершила работу

logging_error_handle.py


Однако, если исключение произошло за пределами emit(), то оно может быть поднято, и программа остановится. Например, в коде ниже мы добавляем дополнительный атрибут id в logger.info без его обработки в LoggerAdapter. Эта ошибка не обработана и приводит к остановке программы.


import logginglogging.basicConfig(    level=logging.INFO,    format="%(asctime)s - [%(levelname)s] - %(app)s - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s",)logger = logging.getLogger(__name__)logger = logging.LoggerAdapter(logger, {"app": "тестовое приложение"})logger.info("Программа стартует", id="123")logger.info("Программа завершила работу")# source/main.py# Traceback (most recent call last):#   File "/Users/ie15qx/repo/compare_xml_files/source/main.py", line 10, in <module>#     logger.info("Программа стартует", id="123")#   File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/logging/__init__.py", line 1960, in info#     self.log(INFO, msg, *args, **kwargs)#   File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/logging/__init__.py", line 2001, in log#     self.logger.log(level, msg, *args, **kwargs)#   File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/logging/__init__.py", line 1637, in log#     self._log(level, msg, args, **kwargs)# TypeError: _log() got an unexpected keyword argument 'id' (прим. пер.: _log() получил непредвиденный именованный аргумент 'id')

logging_exception_raise.py


8. Три разных способа конфигурирования своего логгера


Последний пункт, которым я хотел поделиться, о конфигурировании своего логгера. Есть три способа конфигурирования логгера.


используйте код


Самый простой вариант использовать код для конфигурирования своего логгера, так же, как во всех примерах, что мы видели ранее в этой статье. Но недостаток этого варианта в том, что любая модификация (прим. пер.: конфигурации) требует внесения изменений в исходном коде.


use dictConfig


Второй вариант записывать конфигурацию в словарь и использовать logging.config.dictConfig, чтобы читать ее. Вы также можете сохранить словарь в JSON-файл и читать оттуда. Плюс в том, что этот файл может быть загружен как внешняя конфигурация, но он может способствовать появлению ошибок из-за своей структуры.


import loggingimport logging.configLOGGING_CONFIG = {    'version': 1,    'disable_existing_loggers': True,    'formatters': {        'standard': {            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'        },    },    'handlers': {        'default': {            'level': 'INFO',            'formatter': 'standard',            'class': 'logging.StreamHandler',            'stream': 'ext://sys.stdout',        },    },    'loggers': {        '': {  # root logger            'handlers': ['default'],            'level': 'DEBUG',            'propagate': True,        }    },}logging.config.dictConfig(LOGGING_CONFIG)if __name__ == "__main__":    log = logging.getLogger(__name__)    log.info("hello world")# 2020-07-28 21:44:09,966 [INFO] __main__: hello world

logging_configure_json.py


используйте fileConfig


И последний, но не менее важный, третий вариант использовать logging.config.fileConfig Конфигурация записывается в отдельный файл формата .ini.


# logging_config.ini# [loggers]# keys=root# [handlers]# keys=stream_handler# [formatters]# keys=formatter# [logger_root]# level=DEBUG# handlers=stream_handler# [handler_stream_handler]# class=StreamHandler# level=DEBUG# formatter=formatter# args=(sys.stderr,)# [formatter_formatter]# format=%(asctime)s %(name)-12s %(levelname)-8s %(message)simport loggingfrom logging.config import fileConfigfileConfig('logging_config.ini')logger = logging.getLogger()logger.debug('hello world')

logging_configure_file.py


Есть возможность обновлять конфигурацию во время выполнения программы через сервер конфигурации. Учебник показывает пример как со стороны клиента, так и со стороны сервера. Конфигурация обновляется посредством подключения через сокет, а на стороне клиента мы используем c = logging.config.listen(PORT) c.start(), чтобы получать самую новую конфигурацию.


Надеюсь, эти советы и приемы, связанные с модулем logging, могут помочь вам создать вокруг вашего приложения хороший фреймворк для журналирования без ущерба для производительности. Если у вас есть что-нибудь, чем можно поделиться, пожалуйста, оставьте комментарий ниже!

Подробнее..

System.Threading.Channels высокопроизводительный производитель-потребитель и асинхронность без алокаций и стэк дайва

07.07.2020 14:13:48 | Автор: admin
И снова здравствуй. Какое-то время назад я писал о другом малоизвестном инструменте для любителей высокой производительности System.IO.Pipelines. По своей сути, рассматриваемый System.Threading.Channels (в дальнейшем каналы) построен по похожим принципам, что и Пайплайны, решает ту же задачу Производитель-Потребитель. Однако имеет в разы более простое апи, которое изящно вольется в любого рода enterprise-код. При этом использует асинхронность без алокаций и без stack-dive даже в асинхронном случае! (Не всегда, но часто).



Оглавление




Введение


Задача Производитель/Потребитель встречается на пути программистов довольно часто и уже не первый десяток лет. Сам Эдсгер Дейкстра приложил руку к решению данной задачи ему принадлежит идея использования семафоров для синхронизации потоков при организации работы по принципу производитель/потребитель. И хотя ее решение в простейшем виде известно и довольно тривиально, в реальном мире данный шаблон (Производитель/Потребитель) может встречаться в гораздо более усложненном виде. Также современные стандарты программирования накладывают свои отпечатки, код пишется более упрощенно и разбивается для дальнейшего переиспользования. Все делается для понижения порога написания качественного кода и упрощения данного процесса. И рассматриваемое пространство имен System.Threading.Channels очередной шаг на пути к этой цели.

Какое-то время назад я рассматривал System.IO.Pipelines. Там требовалось более внимательная работа и глубокое осознание дела, в ход шли Span и Memory, а для эффективной работы требовалось не вызывать очевидных методов (чтобы избежать лишних выделений памяти) и постоянно думать в байтах. Из-за этого программный интерфейс Пайплайнов был нетривиален и не интуитивно понятен.

В System.Threading.Channels пользователю представляется гораздо более простое api для работы. Стоит упомянуть, что несмотря на простоту api, данный инструмент является весьма оптимизированным и на протяжении своей работы вполне вероятно не выделит память. Возможно это благодаря тому, что под капотом повсеместно используется ValueTask, а даже в случае реальной асинхронности используется IValueTaskSource, который переиспользуется для дальнейших операций. Именно в этом заключается весь интерес реализации Каналов.

Каналы являются обобщенными, тип обобщения, как несложно догадаться тип, экземпляры которого будут производиться и потребляться. Интересно то, что реализация класса Channel, которая помещается в 1 строку (источник github):

namespace System.Threading.Channels{    public abstract class Channel<T> : Channel<T, T> { }}

Таким образом основной класс каналов параметризован 2 типами отдельно под канал производитель и канал потребитель. Но для реализованых каналов это не используется.
Для тех, кто знаком с Пайплайнами, общий подход для начала работы покажется знакомым. А именно. Мы создаем 1 центральный класс, из которого вытаскиваем отдельно производителей(CannelWriter) и потребителей(ChannelReader). Несмотря на названия, стоит помнить, что это именно производитель/потребитель, а не читатель/писатель из еще одной классической одноименной задачи на многопоточность. ChannelReader изменяет состояние общего channel (вытаскивает значение), которое более становится недоступно. А значит он скорее не читает, а потребляет. Но с реализацией мы ознакомимся позже.

Начало работы. Channel


Начало работы с каналами начинается с абстрактного класса Channel<T> и статического класса Channel, который создает наиболее подходящую реализацию. Далее из этого общего Channel можно получать ChannelWriter для записи в канал и ChannelReader для потребления из канала. Канал является хранилищем общей информации для ChannelWriter и ChannelReader, так, именно в нем хранятся все данные. А уже логика их записи или потребления рассредоточения в ChannelWriter и ChannelReader, Условно каналы можно разделить на 2 группы безграничные и ограниченные. Первые более простые по реализации, в них можно писать безгранично (пока память позволяет). Вторые же ограничены неким максимальным значением количества записей.

Здесь вытекает немного разная природа асинхронности. В неограниченных каналах операция записи всегда будет завершаться синхронно, нет ничего, что могло бы остановить от записи в канал. Для ограниченных каналов ситуация иная. При стандартном поведении (которое можно заменить) операция записи будет завершаться синхронно до тех пор пока в канале есть место для новых экземпляров. Как только канал заполнен, операция записи не завершится, пока не освободится место (после того, как потребитель потребил потребляемое). Поэтому здесь операция будет реально асинхронной со сменой потоков и сопутствующими изменениями (или без смены, что будет описано чуть позже).

Поведения читателей по большей части одинаково если в канале есть что-то, то читатель просто читает это и завершается синхронно. Если ничего нет, то ожидает пока кто-то что-то запишет.

Статический класс Channel содержит 4 метода для создания вышеперечисленных каналов:

Channel<T> CreateUnbounded<T>();Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);Channel<T> CreateBounded<T>(int capacity);Channel<T> CreateBounded<T>(BoundedChannelOptions options);

При желании можно указать более точные опции для создания канала, которые помогут оптимизировать его под указанные нужды.

UnboundedChannelOptions содержит 3 свойства, значение которых по умолчанию false:

  1. AllowSynchronousContinuations просто сумасводящая опция, которая позволяет выполнить продолжение асинхронной операции тому, кто ее разблокирует. А теперь по-простому. Допустим, мы писали в заполненный канал. Соответственно, операция прерывается, поток освобождается, а продолжение будет выполнено по завершению на новом потоке из пула. Но если включить эту опцию, продолжение выполнит тот, кто разблокирует операцию, то есть в нашем случае читатель. Это серьезно меняет внутреннее поведение и позволяет более экономно и производительно распоряжаться ресурсами, ведь зачем нам слать какие-то продолжения в какие-то потоки, если мы можем сами его выполнить;
  2. SingleReader указывает, что будет использоваться один потребитель. Опять же, это позволяет избавиться от некоторой лишней синхронизации;
  3. SingleWriter то же самое, только для писателя;

BoundedChannelOptions содержит те же 3 свойства и еще 2 сверху

  1. AllowSynchronousContinuations то же;
  2. SingleReader то же;
  3. SingleWriter то же;
  4. Capacity количество вмещаемых в канал записей. Данный параметр также является параметром конструктора;
  5. FullMode перечисление BoundedChannelFullMode, которое имеет 4 опции, определяет поведение при попытке записи в заполненный канал:
    • Wait ожидает освобождения места для завершения асинхронной операции
    • DropNewest записываемый элемент перезаписывает самый новый из существующих, завершается синхронно
    • DropOldest записываемый элемент перезаписывает самый старый из существующих завершается синхронно
    • DropWrite записываемый элемент не записывается, завершается синхронно


В зависимости от переданных параметров и вызванного метода будет создана одна из 3 реализаций: SingleConsumerUnboundedChannel, UnboundedChannel, BoundedChannel. Но это не столь важно, ведь пользоваться каналом мы будем через базовый класс Channel<TWrite, TRead>.

У него есть 2 свойства:

  • ChannelReader<TRead> Reader { get; protected set; }
  • ChannelWriter<TWrite> Writer { get; protected set; }

А также, 2 оператора неявного приведения типа к ChannelReader<TRead> и ChannelWriter<TWrite>.

Пример начала работы с каналами:

Channel<int> channel = Channel.CreateUnbounded<int>();//Можно делать такChannelWriter<int> writer = channel.Writer;ChannelReader<int> reader = channel.Reader; //Или такChannelWriter<int> writer = channel;ChannelReader<int> reader = channel;

Данные хранятся в очереди. Для 3 типов используются 3 разные очереди ConcurrentQueue<T>, Deque<T> и SingleProducerSingleConsumerQueue<T>. На этом моменте мне показалось, что я устарел и пропустил кучу новых простейших коллекций. Но спешу огорчить они не для всех. Помечены internal, так что использовать их не получится. Но если вдруг они понадобятся на проде их можно найти здесь (SingleProducerConsumerQueue) и здесь (Deque). Реализация последней весьма проста. Советую ознакомится, ее очень быстро можно изучить.

Итак, приступим к изучению непосредственно ChannelReader и ChannelWriter, а также интересных деталей реализации. Они все сводятся к асинхронности без выделений памяти с помощью IValueTaskSource.

ChannelReader потребитель


При запросе объекта потребителя возвращается одна из реализаций абстрактного класса ChannelReader<T>. Опять же в отличие от Пайплайнов АПИ несложное и методов немного. Достаточно просто знать список методов, чтобы понять, как использовать это на практике.

Методы:

  1. Виртуальное get-only свойство Task Completion { get; }
    Обьект типа Task, который завершается, когда закрывается канал;
  2. Виртуальное get-only свойство int Count { get; }
    Тут сделает заострить внимание, что возвращается текущее количество доступных для чтения объектов;
  3. Виртуальное get-only свойство bool CanCount { get; }
    Показывает, доступно ли свойство Count;
  4. Абстрактный метод bool TryRead(out T item)
    Пытается потребить объект из канала. Возвращает bool, показывающий, получилось ли у него прочитать. Результат помещается в out параметр (или null, если не получилось);
  5. Абстрактный ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
    Возвращается ValueTask со значением true, когда в канале появятся доступные для чтения данные, до тех пор задача не завершается. Возвращает ValueTask со значением false, когда канал закрывается(данных для чтения больше не будет);
  6. Виртуальный метод ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
    Потребляет значение из канала. Если значение есть, возвращается синхронно. В противном случае асинхронно ждет появления доступных для чтения данных и возвращает их.

    У данного метода в абстрактном классе есть реализация, которая основана на методах TryRead и WaitToReadAsync. Если опустить все инфраструктурные нюансы (исключения и cancelation tokens), то логика примерно такая попытаться прочитать объект с помощью TryRead. Если не удалось, то в цикле while(true) проверять результат метода WaitToReadAsync. Если true, то есть данные есть, вызвать TryRead. Если TryRead получается прочитать, то вернуть результат, в противном случае цикл по новой. Цикл нужен для неудачных попыток чтения в результате гонки потоков, сразу много потоков могут получить завершение WaitToReadAsync, но объект будет только один, соответственно только один поток сможет прочитать, а остальные уйдут на повторный круг.
    Однако данная реализация, как правило, переопределена на что-то более завязанное на внутреннем устройстве.


ChannelWriter производитель


Все аналогично потребителю, так что сразу смотрим методы:

  1. Виртуальный метод bool TryComplete(Exception? error = null)
    Пытается пометить канал как завершенный, т.е. показать, что в него больше не будет записано данных. В качестве необязательного параметра можно передать исключение, которое вызвало завершение канала. Возвращает true, если удалось завершить, в противном случае false (если канал уже был завершен или не поддерживает завршение);
  2. Абстрактный метод bool TryWrite(T item)
    Пытается записать в канал значение. Возвращает true, если удалось и false, если нет
  3. Абстрактный метод ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default)
    Возвращает ValueTask со значением true, который завершится, когда в канале появится место для записи. Значение false будет в том случае, если записи в канал более не будут разрешены;
  4. Виртуальный метод ValueTask WriteAsync(T item, CancellationToken cancellationToken = default)
    Асинхронно пишет в канал. Например, в случае, если канал заполнен, операция будет реально асинхронной и завершится только после освобождения места под данную запись;
  5. Метод void Complete(Exception? error = null)
    Просто пытается пометить канал как завершенный с помощью TryComplete, а в случае неудачи кидает исключение.

Небольшой пример вышеописанного (для легкого начала ваших собственных экспериментов):

Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();//Объекты ниже можно отправить в разные потоки, которые будут использовать их независимо в своих целяхChannelWriter<int> writer = unboundedChannel;ChannelReader<int> reader = unboundedChannel;//Первый поток может писать в каналint objectToWriteInChannel = 555;await writer.WriteAsync(objectToWriteInChannel);//И завершить его, при исключении или в случае, когда записал все, что хотелwriter.Complete();//Второй может читать данные из канала по мере их доступностиint valueFromChannel = await reader.ReadAsync();

А теперь перейдем к самой интересной части.

Асинхронность без алллокаций


В процессе написания и изучения кода, я осознал, что почти ничего интересного в реализации всех этих операций нет. В общем можно описать так избегание лишних блокировок с помощью конкурентных коллекций и обильное использование ValueTask, который является структурой, что экономит память. Однако спешу напомнить, что не стоит быстрой заменой проходиться по всем файлам на вашей ПЭВМ и заменять все Task на ValueTask. Он имеет смысл только в случаях, когда операция в большинстве случаев завершается синхронно. Ведь, как мы помним, при асинхронности вполне вероятна смена потока, а значит и стек уже будет не тот, что прежде. Да и вообще, истинный профессионал в области производительности знает не оптимизируй до возникновения проблем.

Радует одно, в профессионалы я себя записывать не буду, а поэтому самое время разобраться, в чем же секрет написания асинхронного кода без выделений памяти, что на первый взгляд звучит слишком хорошо для правды. Но бывает и такое.

Интерфейс IValueTaskSource


Начнем наш путь с истоков структуры ValueTask, которая была добавлена в .net core 2.0 и дополнена в 2.1. Внутри этой структуры скрывается хитрое поле object _obj. Несложно догадаться, опираясь на говорящее название, что в этом поле может скрываться одна из 3 вещей null, Task/Task<T> или IValueTaskSource. На самом деле, это вытекает из способов создания ValueTask.

Как заверяет производитель, данную структуру следует использовать лишь очевидно с ключевым словом await. То есть не следует применять await много раз к одному и тому же ValueTask, использовать комбинаторы, добавлять несколько продолжений и тп. Также не следует получать результат из ValueTask более одного раза. А связано это как раз с тем, что мы пытаемся понять переиспользованием всего этого добра без выделения памяти.

Я уже упомянул интерфейс IValueTaskSource. Именно он помогает сэкономить память. Делается это с помощью переиспользования самого IValueTaskSource несколько раз для множества задач. Но именно из-за этого переиспользования и нет возможности баловаться с ValueTask.

Итак, IValueTaskSource. Данный интерфейс имеет 3 метода, реализовав которые вы будете успешно экономить память и время на выделении тех заветных байт.

  1. GetResult Вызывается единожды, когда в стейт машине, образованной на рантайме для асинхронных методов, понадобится результат. В ValueTask есть метод GetResult, который и вызывает одноименный метод интерфейса, который, как мы помним, может хранится в поле _obj.
  2. GetStatus Вызывается стейт машиной для определения состояния операции. Также через ValueTask.
  3. OnCompleted Опять же, вызывается стейт машиной для добавления продолжения к невыполненной на тот момент задаче.

Но несмотря на простой интерфейс, реализация потребует определенной сноровки. И тут можно вспомнить про то, с чего мы начали Channels. В данной реализации используется класс AsyncOperation, который является реализацией IValueTaskSource. Данный класс скрыт за модификатором доступа internal. Но это не мешает разобраться, в основных механизмах. Напрашивается вопрос, почему не дать реализацию IValueTaskSource в массы? Первая причина (хохмы ради) когда в руках молоток, повсюду гвозди, когда в руках реализация IValueTaskSource, повсюду неграмотная работа с памятью. Вторая причина (более правдоподобная) в то время, как интерфейс прост и универсален, реальная реализация оптимальна при использований определенных нюансов применения. И вероятно именно по этой причине можно найти реализации в самых разных частях великого и могучего .net, как то AsyncOperation под капотом каналов, AsyncIOOperation внутри нового API сокетов и тд.

CompareExchange


Довольно популярный метод популярного класса, позволяющий избежать накладных расходов на классические примитивы синхронизации. Думаю, большинство знакомы с ним, но все же стоит описать в 3 словах, ведь данная конструкция используется довольно часто в AsyncOperation.
В массовой литературе данную функцию называют compare and swap (CAS). В .net она доступна в классе Interlocked.

Сигнатура следующая:

public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;

Имеются также перегрузи с int, long, float, double, IntPtr, object.

Сам метод атомарный, то бишь выполняется без прерываний. Сравнивает 2 значения и, если они равны, выполняет присваивание нового значения в переменную. Решают проблему, когда нужно проверить значение переменной и в зависимости от него изменить переменную.

Допустим, вы хотите инкрементировать переменную, если ее значение меньше 10.

Далее идут 2 потока.

Поток 1 Поток 2
Проверяет значение переменной на некоторое условие (то есть меньше ли оно 10), которое срабатывает -
Между проверкой и изменением значения Присваивает переменной значение, не удовлетворяющее условию (например, 15)
Изменяет значение, хотя не должен, ведь условие уже не соблюдается -


При использовании данного метода, вы либо изменяете именно то значение, что хотели, либо не изменяете, получив при этом актуальное значение переменной.

location1 переменная, значение которой мы хотим поменять. Оно сравнивается с comparand, в случае равенства в location1 записывается value. Если операция удалась, то метод вернет прошлое значение переменной location1. Если же нет, то будет возращено актуальное значение location1.
Если говорить чуть глубже, то существует инструкция языка ассемблера cmpxchg, которая выполняет эти действия. Именно она и используется под капотом.

Stack dive


Рассматривая весь этот код я не раз наткнулся на упоминания Stack Dive. Это очень крутая и интересная штука, которая на самом деле очень нежелательна. Суть в том, что при синхронном выполнении продолжений мы можем исчерпать ресурсы стека.

Допустим, мы имеем 10000 задач, в стиле

//code1await ...//code2

Допустим, первая задача завершает выполнение и этим освобождает продолжение второй, которое мы начинаем тут же выполнять синхронно в этом потоке, то есть забирая кусок стека стек фреймом данного продолжения. В свою очередь, данное продолжение разблокирует продолжение третей задачи, которое мы тоже начинаем сразу выполнять. И так далее. Если в продолжении больше нет await'ов или чего-то, что как-то сбросит стек, то мы просто будем потреблять стековое пространство до упора. Что может вызвать StackOverflow и крах приложения. В рассмотрении кода я упомяну, как с этим борется AsyncOperation.

AsyncOperation как реализация IValueTaskSource


Source code.

Внутри AsyncOperation есть поле _continuation типа Action<object>. Поле используется для, не поверите, продолжений. Но, как это часто бывает в слишком современном коде, у полей появляются дополнительные обязанности (как сборщик мусора и последний бит в ссылке на таблицу методов). Поле _continuation из той же серии. Есть 2 специальных значения, которые могут хранится в этом поле, кроме самого продолжения и null. s_availableSentinel и s_completedSentinel. Данные поля показывают, что операция доступна и завершена соответственно. Доступна она бывает как раз для переиспользования для совершенно асинхронной операции.

Также AsyncOperation реализует IThreadPoolWorkItem с единственным методом void Execute() => SetCompletionAndInvokeContinuation(). Метод SetCompletionAndInvokeContinuation как раз и занимается выполнением продолжения. И данный метод вызывается либо напрямую в коде AsyncOperation, либо через упомянутый Execute. Ведь типы реализующие IThreadPoolWorkItem можно забрасывать в тред пул как-то вот так ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false).

Метод Execute будет выполнен тред пулом.

Само выполнение продолжения довольно тривиально.

Продолжение _continuation копируется в локальную переменную, на ее место записывается s_completedSentinel искусственный объект-марионетка (иль часовой, не знаю, как глаголить мне в нашей речи), который указывает, что задача завершена. Ну а далее локальная копия реального продолжения просто выполняется. При наличии ExecutionContext, данные действия постятся в контекст. Никакого секрета тут нет. Этот код может быть вызван как напрямую классом просто вызвав метод, инкапсулирующий эти действия, так и через интерфейс IThreadPoolWorkItem в тред пуле. Теперь можно догадаться, как работает функция с выполнением продолжений синхронно.

Первый метод интерфейса IValueTaskSource GetResult (github).

Все просто, он:

  1. Инкрементирует _currentId.
    _currentId то, что идентифицирует конкретную операцию. После инкремента она уже не будет ассоциирована с этой операцией. Поэтому не следует получать результат дважды и тп;
  2. помещает в _continuation делегат-марионетку s_availableSentinel. Как было упомянуто, это показывает, что этот экземпляр AsyncOperation можно испоьзовать повторно и не выделять лишней памяти. Делается это не всегда, а лишь если это было разрешено в конструкторе (pooled = true);
  3. Возвращает поле _result.
    Поле _result просто устанавливается в методе TrySetResult который описан ниже.

Метод TrySetResult (github).

Метод тривиален. он сохраняет принятый параметр в _result и сигнализирует о завершении, а именно вызывает метод SignalCompleteion, который довольно интересен.

Метод SignalCompletion (github).

В данном методе используется все, о чем мы говорили в начале.

В самом начале, если _comtinuation == null, мы записываем марионетку s_completedSentinel.

Далее метод можно разделить на 4 блока. Сразу скажу для простоты понимания схемы, 4 блок просто синхронное выполнение продолжения. То есть тривиальное выполнение продолжения через метод, как я описано в абзаце про IThreadPoolWorkItem.

  1. Если _schedulingContext == null, т.е. нет захваченного контекста (это первый if).
    Далее необходимо проверить _runContinuationsAsynchronously == true, то есть явно указано, что продолжения нужно выполнять как все привыкли асинхронно (вложенный if).
    При соблюдении данный условий в бой идет схема с IThreadPoolWorkItem описанная выше. То есть AsyncOperation добавляется в очередь на выполнение потоком тред пула. И выходим из метода.
    Следует обратить внимание, что если первый if прошел (что будет очень часто, особенно в коре), а второй нет, то мы не попадем в 2 или 3 блок, а спустимся сразу на синхронное выполнение продолжения т.е. 4 блок;
  2. Если _schedulingContext is SynchronizationContext, то есть захвачен контекст синхронизации (это первый if).
    По аналогии мы проверяем _runContinuationsAsynchronously = true. Но этого не достаточно. Необходимо еще проверить, контекст потока, на котором мы сейчас находимся. Если он отличен от захваченного, то мы тоже не можем просто выполнить продолжение. Поэтому если одно из этих 2 условий выполнено, мы отправляем продолжение в контекст знакомым способом:
    sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
    

    И выходим из метода. опять же, если первая проверка прошла, а остальные нет (то есть мы сейчас находимся на том же контексте, что и был захвачен), мы попадем сразу на 4 блок синхронное выполнение продолжения;
  3. Выполняется, если мы не зашли в первые 2 блока. Но стоит расшифровать это условие.
    Хитрость в том, что _schedulingContext может быть на самом деле захваченным TaskScheduler, а не непосредственно контекстом. В этом случае мы поступаем также, как и в блоке 2, т.е. проверяем флаг _runContinuationsAsynchronously = true и TaskScheduler текущего потока. Если планировщик не совпадает или флаг не тот, то мы сетапим продолжение через Task.Factory.StartNew и передаем туда этот планировщик. И выходим из метода.
  4. Как и сказал в начале просто выполняем продолжение на текущем потоке. Раз мы до сюда дошли, то все условия для этого соблюдены.

Второй метод интерфейса IValueTaskSource GetStatus (github)
Просто как питерская пышка.

Если _continuation != _completedSentinel, то возвращаем ValueTaskSourceStatus.Pending
Если error == null, то возвращаем ValueTaskSourceStatus.Succeeded
Если _error.SourceException is OperationCanceledException, то возвращаем ValueTaskSourceStatus.Canceled
Ну а коль уж до сюда дошли, то возвращаем ValueTaskSourceStatus.Faulted

Третий и последний, но самый сложный метод интерфейса IValueTaskSource OnCompleted (github)

Метод добавляет продолжение, которое выполняется по завершению.

При необходимости захватывает ExecutionContext и SynchronizationContext.

Далее используется Interlocked.CompareExchange, описанный выше, чтобы сохранить продолжение в поле, сравнивая его с null. Напоминаю, что CompareExchange возвращает актуальное значение переменной.

Если сохранение продолжения прошло, то возвращается значение, которое было в переменной до обновления, то есть null. Это означает, что операция еще не завершилась на момент записи продолжения. И тот, кто ее завершит сам со всем разберется (как мы смотрели выше). И нам нет смысла выполнять какие-то дополнительные действия. И на этом работа метода завершается.

Если сохранить значение не получилось, то есть из CompareExchange вернулось что-то кроме null. В этом случае кто-то успел положить значение в быстрее нас. То есть произошла одна из 2 ситуаций или задача завершилась быстрее, чем мы до сюда дошли, или была попытка записать более 1 продолжения, что делать нельзя.

Таким образом проверяем возвращенное значение, равно ли оно s_completedSentinel именно оно было бы записано в случае завершения.

  • Если это не s_completedSentinel, то нас использовали не по плану попытались добавить более одного продолжения. То есть то, которое уже записано, и то, которое пишем мы. А это исключительная ситуация;
  • Если это s_completedSentinel, то это один из допустимых исходов, операция уже завершена и продолжение должны вызвать мы, здесь и сейчас. И оно будет выполнено асинхронно в любом случае, даже если _runContinuationsAsynchronously = false.
    Сделано это так, потому что если мы дошли до этого места, значит мы внутри метода OnCompleted, внутри awaiter'а. А синхронное выполнение продолжений именно здесь грозит упомянутым стек дайвом. Сейчас вспомним, для чего нам нужна эта AsyncOperation System.Threading.Channels. А там ситуация может быть очень легко достигнута, если о ней не задуматься. Допустим, мы читатель в ограниченном канале. Мы читаем элемент и разблокируем писателя, выполняем его продолжение синхронно, что разблокирует очередного читателя(если читатель очень быстр или их несколько) и так далее. Тут стоит осознать тонкий момент, что именно внутри awaiter'а возможна эта ситуация, в других случаях продолжение выполнится и завершится, что освободит занятый стек фрейм. А постоянный зацеп новых продолжений вглубь стека порождается постоянным выполнением продолжения внутри awaiter'а.
    В целях избежания данной ситуации, несмотря ни на что необходимо запустить продолжение асинхронно. Выполняется по тем же схемам, что и первые 3 блока в методе SignalCompleteion просто в пуле, на контексте или через фабрику и планировщик

А вот и пример синхронных продолжений:

class Program    {        static async Task Main(string[] args)        {            Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions            {                AllowSynchronousContinuations = true            });            ChannelWriter<int> writer = unboundedChannel;            ChannelReader<int> reader = unboundedChannel;            Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");            var writerTask = Task.Run(async () =>            {                Thread.Sleep(500);                int objectToWriteInChannel = 555;                Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");                await writer.WriteAsync(objectToWriteInChannel);                Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");            });            //Blocked here because there are no items in channel            int valueFromChannel = await reader.ReadAsync();            Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");            await writerTask;            Console.Read();        }    }

Output:

Main, before await. Thread id: 1
Created thread for writing with delay, before await write. Thread id: 4
Main, after await (will be processed by created thread for writing). Thread id: 4
Created thread for writing with delay, after await write. Thread id: 4
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru