Русский
Русский
English
Статистика
Реклама

Asynchronous

Лечим Java Reactor при помощи Kotlin Coroutines

17.01.2021 18:05:05 | Автор: admin

На текущей работе пишем на Reactor. Технология классная, но как всегда есть много НО. Некоторые вещи раздражают, код сложнее писать и читать, с ThreadLocal совсем беда. Решил посмотреть какие проблемы уйдут, если перейти на Kotlin Coroutines, а какие проблемы, наоборот, добавятся.

Карточка пациента

Для статьи написал маленький проект, воспроизведя проблемы, с которыми столкнулся на работе. Основной код здесь. Алгоритм специально не стал разбивать на отдельные методы, так лучше видно проблемы.

В двух словах об алгоритме:

Переводим деньги с одного счёта на другой, записывая транзакции о факте перевода.

Перевод идемпотентен, так что если транзакция уже есть в БД, то отвечаем клиенту, что всё хорошо. При вставке транзакции может вылететь DataIntegrityViolationException, это тоже значит, что транзакция уже есть.

Чтобы не уйти в минус есть проверка в коде + Optimistic lock, который не позволяет конкурентно обновлять счета. Чтобы он работал нужен retry и дополнительная обработка ошибок.

Для тех кому не нравится сам алгоритм

Алгоритм для проекта выбирал такой, чтобы воспроизвести проблемы, а не чтобы он был эффективным и архитектурно правильным. Вместо одной транзакции надо вставлять полупроводки, optimistic lock вообще не нужен (вместо него проверка положительности счета в sql), select + insert надо заменить на upsert.

Жалобы пациента

  1. Stacktrace не показывает каким образом мы попали в проблемное место.

  2. Код явно сложнее, чем был бы на блокирующих технологиях.

  3. Многоступенчатая вложенность кода из-за flatMap.

  4. Неудобная обработка ошибок и их выброс.

  5. Сложная обработка поведения для Mono.empty().

  6. Сложности с логированием, если надо в лог добавить что-то глобальное, например traceId. (в статье не описываю, но те же проблемы с другими ThreadLocal переменными, например SpringSecurity)

  7. Неудобно дебажить.

  8. Неявное api для параллелизации.

Ход лечения

Написал отдельный PR перехода с Java на Kotlin.

Интеграция почти везде гладкая.

Понадобилось добавить com.fasterxml.jackson.module:jackson-module-kotlin чтобы заработала сериализация data классов и org.jetbrains.kotlin.plugin.spring чтобы не прописывать везде open модификаторы.

В контроллере достаточно было написать suspend fun transfer(@RequestBody request: TransferRequest) вместо public Mono<Void> transfer(@RequestBody TransferRequest request)

В репозитории написал suspend fun save(account: Account): Account вместо Mono<Account> save(Account account); Единственное, репозитории не определяются, если в них только suspend функции, надо, чтобы хоть один метод работал с Reactor типами.

Тесты обернул в runBlocking { }, чтобы можно было вызывать suspend функции.

Для реализации Retry использовал библиотеку kotlin-retry. Единственное, в ней не было функции фильтрации по классу ошибки, но это было легко добавить (завёл PR).

Ну и, естественно, переписал алгоритм. Все детали опишу ниже по-отдельности.

Было:

public Mono<Void> transfer(String transactionKey, long fromAccountId,                           long toAccountId, BigDecimal amount) {  return transactionRepository.findByUniqueKey(transactionKey)    .map(Optional::of)    .defaultIfEmpty(Optional.empty())    .flatMap(withMDC(foundTransaction -> {      if (foundTransaction.isPresent()) {        log.warn("retry of transaction " + transactionKey);        return Mono.empty();      }      return accountRepository.findById(fromAccountId)        .switchIfEmpty(Mono.error(new AccountNotFound()))        .flatMap(fromAccount -> accountRepository.findById(toAccountId)          .switchIfEmpty(Mono.error(new AccountNotFound()))          .flatMap(toAccount -> {            var transactionToInsert = Transaction.builder()              .amount(amount)              .fromAccountId(fromAccountId)              .toAccountId(toAccountId)              .uniqueKey(transactionKey)              .build();            var amountAfter = fromAccount.getAmount().subtract(amount);            if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {              return Mono.error(new NotEnoghtMoney());            }            return transactionalOperator.transactional(              transactionRepository.save(transactionToInsert)                .onErrorResume(error -> {                  //transaction was inserted on parallel transaction,                  //we may return success response                  if (error instanceof DataIntegrityViolationException             && error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {                    return Mono.empty();                  } else {                    return Mono.error(error);                  }                })                .then(accountRepository.transferAmount(                  fromAccount.getId(), fromAccount.getVersion(),                   amount.negate()                ))                .then(accountRepository.transferAmount(                  toAccount.getId(), toAccount.getVersion(), amount                ))            );          }));    }))    .retryWhen(Retry.backoff(3, Duration.ofMillis(1))      .filter(OptimisticLockException.class::isInstance)      .onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())    )    .onErrorMap(      OptimisticLockException.class,      e -> new ResponseStatusException(        BANDWIDTH_LIMIT_EXCEEDED,        "limit of OptimisticLockException exceeded", e      )    )    .onErrorResume(withMDC(e -> {      log.error("error on transfer", e);      return Mono.error(e);    }));}

Стало:

suspend fun transfer(transactionKey: String, fromAccountId: Long,                     toAccountId: Long, amount: BigDecimal) {  try {    try {      retry(limitAttempts(3) + filter { it is OptimisticLockException }) {        val foundTransaction = transactionRepository          .findByUniqueKey(transactionKey)        if (foundTransaction != null) {          logger.warn("retry of transaction $transactionKey")          return@retry        }        val fromAccount = accountRepository.findById(fromAccountId)          ?: throw AccountNotFound()        val toAccount = accountRepository.findById(toAccountId)          ?: throw AccountNotFound()        if (fromAccount.amount - amount < BigDecimal.ZERO) {          throw NotEnoghtMoney()        }        val transactionToInsert = Transaction(          amount = amount,          fromAccountId = fromAccountId,          toAccountId = toAccountId,          uniqueKey = transactionKey        )        transactionalOperator.executeAndAwait {          try {            transactionRepository.save(transactionToInsert)          } catch (e: DataIntegrityViolationException) {            if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {              throw e;            }          }          accountRepository.transferAmount(            fromAccount.id!!, fromAccount.version, amount.negate()          )          accountRepository.transferAmount(            toAccount.id!!, toAccount.version, amount          )        }      }    } catch (e: OptimisticLockException) {      throw ResponseStatusException(        BANDWIDTH_LIMIT_EXCEEDED,         "limit of OptimisticLockException exceeded", e      )    }  } catch (e: Exception) {    logger.error(e) { "error on transfer" }    throw e;  }}

Stacktraces

Пожалуй, это самое главное.

Было:

o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockExceptionat c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)  ...

Стало:

error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockExceptionat c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)...at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)(Coroutine boundary)at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)(Coroutine creation stacktrace)at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)...at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)...

Скучные части стектрейсов я вырезал, пакеты сократил (забочусь о читателе, и без того длинно).

В Java очень куцая информация. Да, ошибка есть. Даже видно на какой строчке она вылетела. Только непонятно а как мы в эту строчку кода попали. В Kotlin версии виден весь трейс от контроллера.

Вот представьте себе, что вы видите ошибку в логе где-то на обращении в регулярно вызываемый метод. А кто его вызывал? Придётся по логам рядом искать. Это хорошо, если логи объединены через что-нибудь вроде traceId (thread name нам не поможет) и вообще логи есть.

Сложность кода

Kotlin версия выглядит проще, точно так же, как выглядел бы код на блокирующих технологиях. По крайне мере пока мы не вводим параллельные операции (тут ещё вопрос какой вариант сложнее писать: на блокирующих технологиях или с корутинами).

Многоступенчатая вложенность кода

Никаких flatMap. Добавились вложения из-за явных try catch, но схожая логика вся объявлена на одном уровне.

Было:

return accountRepository.findById(fromAccountId)  .switchIfEmpty(Mono.error(new AccountNotFound()))  .flatMap(fromAccount -> accountRepository.findById(toAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound()))    .flatMap(toAccount -> {      ...    })

Стало:

val fromAccount = accountRepository.findById(fromAccountId)  ?: throw AccountNotFound()val toAccount = accountRepository.findById(toAccountId)  ?: throw AccountNotFound()...

Обработка ошибок и их выброс

Обработка ошибок теперь через обычный try catch, легко понять какой кусок кода мы обернули.

Было:

return transactionRepository.findByUniqueKey(transactionKey)  ...  .onErrorMap(    OptimisticLockException.class,    e -> new ResponseStatusException(      BANDWIDTH_LIMIT_EXCEEDED,       "limit of OptimisticLockException exceeded", e    )  )

Стало:

try {  val foundTransaction = transactionRepository    .findByUniqueKey(transactionKey)  ...} catch (e: OptimisticLockException) {  throw ResponseStatusException(    BANDWIDTH_LIMIT_EXCEEDED,     "limit of OptimisticLockException exceeded", e  )}

Ошибки выбрасывать можно просто через throw, а не возвращая объект ошибки. В Reactor меня особенно раздражают конструкции вида:

.flatMap(foo -> {  if (foo.isEmpty()) {     return Mono.error(new IllegalStateException());  } else {    return Mono.just(foo);  }})

Мне могут возразить, что это функциональный стиль, так и должно быть. Но именно из-за функционального стиля появляется дополнительная сложность кода.

Mono.empty()

Это заслуживает отдельного обсуждения. В реактор нельзя передавать null в качестве результата. При этом нельзя написать C5C.

Ide никак тебе не подсказывает, может ли этот конкретный mono быть пустым и обработал ли ты это. В рабочем проекте для меня именно это было источником тупых ошибок. То забудешь обработать, то после рефакторинга как-то не так работает.

В Kotlin будет not null тип, если ты точно знаешь, что результат будет. Или это будет nullable тип и компилятор обяжет тебя что-то с этим сделать.

Конкретно на нашем примере:

Было:

return transactionRepository.findByUniqueKey(transactionKey)  .map(Optional::of)  .defaultIfEmpty(Optional.empty())  .flatMap(foundTransaction -> {    if (foundTransaction.isPresent()) {      log.warn("retry of transaction " + transactionKey);      return Mono.empty();    }...

Стало:

val foundTransaction = transactionRepository  .findByUniqueKey(transactionKey)if (foundTransaction != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}...

Может, как-то можно эту логику написать адекватнее на Reactor, но то что я нагуглил выглядит ещё хуже.

Логирование и контекст

Допустим, мы хотим всегда логировать traceId во время обработки запроса. ThreadLocal больше не работает, в том числе и MDC (контекст для логирования). Что делать?

Есть контекст. И в Reactor и в Coroutines контекст immutable, так что новое значение в MDC подбрасывать будет не так просто (нужно пересоздавать контекст).

Чтобы работало в Java надо написать фильтр, который сохранит traceId в контекст:

@Componentpublic class TraceIdFilter implements WebFilter {  @Override  public Mono<Void> filter(    ServerWebExchange exchange, WebFilterChain chain  ) {    var traceId = Optional.ofNullable(      exchange.getRequest().getHeaders().get("X-B3-TRACEID")    )      .orElse(Collections.emptyList())      .stream().findAny().orElse(UUID.randomUUID().toString());    return chain.filter(exchange)      .contextWrite(context ->        LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)      );  }}

И каждый раз, когда мы хотим что-то залогировать, надо переносить traceId из контекста в MDC:

public static <T, R> Function<T, Mono<R>> withMDC(  Function<T, Mono<R>> block) {  return value -> Mono.deferContextual(context -> {    Optional<Map<String, String>> mdcContext = context      .getOrEmpty(MDC_ID_KEY);    if (mdcContext.isPresent()) {      try {        MDC.setContextMap(mdcContext.get());        return block.apply(value);      } finally {        MDC.clear();      }    } else {      return block.apply(value);    }  });}

Да, это опять Mono. Т.е. мы можем логировать только тогда, когда код позволяет вернуть Mono. Например вот так:

.onErrorResume(withMDC(e -> {  log.error("error on transfer", e);  return Mono.error(e);}))

В Kotlin проще. Нужно написать фильтр, чтобы сохранить traceId сразу в MDC:

@Componentclass TraceIdFilter : WebFilter {  override fun filter(    exchange: ServerWebExchange, chain: WebFilterChain  ): Mono<Void> {    val traceId = exchange.request.headers["X-B3-TRACEID"]?.first()     MDC.put("traceId", traceId ?: UUID.randomUUID().toString())    return chain.filter(exchange)  }}

И при создании корутины вызывать withContext(MDCContext()) { }

Каждый раз на время работы корутины, будет восстанавливаться MDC и в логах будет traceId. Во время записи в лог ни о чем задумываться не нужно.

Тут есть одно НО, об этом позже.

Дебаг

В Java Reactor проблемы с дебагом примерно те же, что и со стектрейсом: нужно думать о том, кто в каком порядке кого вызвал, ставить breakpoints в лямбдах и т.д..

С корутинами не замечаешь разницы с обычным кодом: работает stepOver, переменные подсвечиваются, видно стектрейс и по нему можно перемещаться (в нашем примере вплоть до контроллера).

Всё идеально, кроме возможности запускать suspend функции во время дебага. На это уже есть issue. Правда, надо сказать, что и в Java Reactor особо не получается в evaluate сделать то, что хочется.

Параллелизация

Предположим, я хочу ускорить алгоритм и запросить из базы аккаунты и транзакции параллельно, а не последовательно как сейчас.

Было:

return Mono.zip(  transactionRepository.findByUniqueKey(transactionKey)    .map(Optional::of)    .defaultIfEmpty(Optional.empty()),  accountRepository.findById(fromAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound())),  accountRepository.findById(toAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound())),).flatMap(withMDC(fetched -> {  var foundTransaction = fetched.getT1();  var fromAccount = fetched.getT2();  var toAccount = fetched.getT3();  if (foundTransaction.isPresent()) {    log.warn("retry of transaction " + transactionKey);    return Mono.empty();  }  ...}

Стало:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}val fromAccountAsync = GlobalScope.async(coroutineContext) {   accountRepository.findById(fromAccountId) }val toAccountAsync = GlobalScope.async(coroutineContext) {   accountRepository.findById(toAccountId) }if (foundTransactionAsync.await() != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()val toAccount = toAccountAsync.await() ?: throw AccountNotFound()

В Kotlin версии есть явное указание вот это выполни асинхронно, вместо выполни всё это в параллель в Reactor.

Что самое важное, код ведёт себя по-разному. В случае с Reactor мы создаем три параллельных запроса и продолжаем работу только после того, как все три завершатся. С корутинами мы запускаем все три запроса и ждать чего-то начинаем только при вызове foundTransactionAsync.await(). Таким образом, если transactionRepository.findByUniqueKey() выполнится быстрее, то мы завершим обработку, без ожидания accountRepository.findById() (эти операции отменятся).

В более сложных алгоритмах разница будет ощутимей. Не уверен, что вообще в Reactor получится написать такую версию алгоритма:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}val fromAccountAsync = GlobalScope.async(coroutineContext) {  accountRepository.findById(fromAccountId)}val toAccountAsync = GlobalScope.async(coroutineContext) {  accountRepository.findById(toAccountId)}if (foundTransactionAsync.await() != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}val transactionToInsert = Transaction(  amount = amount,  fromAccountId = fromAccountId,  toAccountId = toAccountId,  uniqueKey = transactionKey)transactionalOperator.executeAndAwait {  try {    transactionRepository.save(transactionToInsert)  } catch (e: DataIntegrityViolationException) {    if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {      throw e;    }  }  val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()  val toAccount = toAccountAsync.await() ?: throw AccountNotFound()  if (fromAccount.amount - amount < BigDecimal.ZERO) {    throw NotEnoghtMoney()  }  accountRepository.transferAmount(    fromAccount.id!!, fromAccount.version, amount.negate()  )  accountRepository.transferAmount(    toAccount.id!!, toAccount.version, amount  )}

Здесь я ожидаю асинхронный запрос уже внутри открытой БД транзакции. Т.е. мы при работе с одним коннектом, ждём результата выполнения на другом коннекте. Так делать не стоит, скорее для примера, хоть оно и работает (пока коннектов хватает).

Побочные эффекты

Конечно, есть проблемы, куда без них.

Надо явно указывать context и scope

Чтобы программа работала как ожидается, надо:

  1. Каждому запросу назначить scope. Таким образом все порождаемые при обработке запроса корутины будут отменены все вместе, например, в случае ошибки.

  2. В каждом запросе проставить context. Зачем нам нужен контекст я рассказывал в разделе про логирование.

Spring не берет на себя заботу об этом вопросе, приходится в контроллере указывать явно:

@PutMapping("/transfer")suspend fun transfer(@RequestBody request: TransferRequest) {  coroutineScope {    withContext(MDCContext()) {      ledger.transfer(request.transactionKey, request.fromAccountId,                       request.toAccountId, request.amount)    }  }}

Естественно, можно объединить в одну функцию и потом через regexp проверить, что во всех методах есть нужный вызов. Но лучше была бы какая-то автоматизация для этого.

Передача context

В случае, если мы порождаем дополнительные корутины, необходимо явным образом передать текущий контекст. Вот пример:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}

По-умолчанию в async() указывать контекст не требуется (он будет пуст). Видимо, чтобы было меньше неявных вещей. Но как итог, в нашем случае, если забыть передать контекст, в логе не окажется traceId. Помните я писал, что не надо задумываться во время логирования? Вместо этого надо задумываться при создании корутины (что, конечно, лучше).

AOP и suspend

Автоматизацию, которую я упоминал в первом пункте, написать самому сложно. Потому что пока нельзя нормально написать aspect для suspend функции.

Я в итоге сумел написать такой аспект. Но для объяснения того, как это работает понадобится отдельная статья.

Надеюсь, появится более адекватный способ писать аспекты (попробую этому посодействовать).

Оценка лечения

Все проблемы исчезли. Добавилась пара новых, но оно терпимо.

Надо сказать, что корутины быстро развиваются и я ожидаю только улучшения работы с ними.

Видно, что команда JetBrains внимательно отнеслась к проблемам разработчиков. Насколько я знаю, где-то год назад всё ещё были проблемы с дебагом и стактрейсом, к примеру.

Самое главное, с корутинами не надо в голове держать все особенности работы Reactor и его могучий API. Ты просто пишешь код.

Подробнее..

Перевод Введение в асинхронное программирование на Python

03.07.2020 14:20:11 | Автор: admin
Всем привет. Подготовили перевод интересной статьи в преддверии старта базового курса Разработчик Python.



Введение


Асинхронное программирование это вид параллельного программирования, в котором какая-либо единица работы может выполняться отдельно от основного потока выполнения приложения. Когда работа завершается, основной поток получает уведомление о завершении рабочего потока или произошедшей ошибке. У такого подхода есть множество преимуществ, таких как повышение производительности приложений и повышение скорости отклика.



В последние несколько лет асинхронное программирование привлекло к себе пристальное внимание, и на то есть причины. Несмотря на то, что этот вид программирования может быть сложнее традиционного последовательного выполнения, он гораздо более эффективен.

Например, вместо того, что ждать завершения HTTP-запроса перед продолжением выполнения, вы можете отправить запрос и выполнить другую работу, которая ждет своей очереди, с помощью асинхронных корутин в Python.

Асинхронность это одна из основных причин популярности выбора Node.js для реализации бэкенда. Большое количество кода, который мы пишем, особенно в приложениях с тяжелым вводом-выводом, таком как на веб-сайтах, зависит от внешних ресурсов. В нем может оказаться все, что угодно, от удаленного вызова базы данных до POST-запросов в REST-сервис. Как только вы отправите запрос в один из этих ресурсов, ваш код будет просто ожидать ответа. С асинхронным программированием вы позволяете своему коду обрабатывать другие задачи, пока ждете ответа от ресурсов.

Как Python умудряется делать несколько вещей одновременно?




1. Множественные процессы

Самый очевидный способ это использование нескольких процессов. Из терминала вы можете запустить свой скрипт два, три, четыре, десять раз, и все скрипты будут выполняться независимо и одновременно. Операционная система сама позаботится о распределении ресурсов процессора между всеми экземплярами. В качестве альтернативы вы можете воспользоваться библиотекой multiprocessing, которая умеет порождать несколько процессов, как показано в примере ниже.

from multiprocessing import Processdef print_func(continent='Asia'):    print('The name of continent is : ', continent)if __name__ == "__main__":  # confirms that the code is under main function    names = ['America', 'Europe', 'Africa']    procs = []    proc = Process(target=print_func)  # instantiating without any argument    procs.append(proc)    proc.start()    # instantiating process with arguments    for name in names:        # print(name)        proc = Process(target=print_func, args=(name,))        procs.append(proc)        proc.start()    # complete the processes    for proc in procs:        proc.join()


Вывод:

The name of continent is :  AsiaThe name of continent is :  AmericaThe name of continent is :  EuropeThe name of continent is :  Africa


2. Множественные потоки

Еще один способ запустить несколько работ параллельно это использовать потоки. Поток это очередь выполнения, которая очень похожа на процесс, однако в одном процессе вы можете иметь несколько потоков, и у всех них будет общий доступ к ресурсам. Однако из-за этого написать код потока будет сложно. Аналогично, все тяжелую работу по выделению памяти процессора сделает операционная система, но глобальная блокировка интерпретатора (GIL) позволит только одному потоку Python запускаться в одну единицу времени, даже если у вас есть многопоточный код. Так GIL на CPython предотвращает многоядерную конкурентность. То есть вы насильно можете запуститься только на одном ядре, даже если у вас их два, четыре или больше.

import threading def print_cube(num):    """    function to print cube of given num    """    print("Cube: {}".format(num * num * num)) def print_square(num):    """    function to print square of given num    """    print("Square: {}".format(num * num)) if __name__ == "__main__":    # creating thread    t1 = threading.Thread(target=print_square, args=(10,))    t2 = threading.Thread(target=print_cube, args=(10,))     # starting thread 1    t1.start()    # starting thread 2    t2.start()     # wait until thread 1 is completely executed    t1.join()    # wait until thread 2 is completely executed    t2.join()     # both threads completely executed    print("Done!")


Вывод:

Square: 100Cube: 1000Done!


3. Корутины и yield:

Корутины это обобщение подпрограмм. Они используются для кооперативной многозадачности, когда процесс добровольно отдает контроль (yield) с какой-то периодичностью или в периоды ожидания, чтобы позволить нескольким приложениям работать одновременно. Корутины похожи на генераторы, но с дополнительными методами и небольшими изменениями в том, как мы используем оператор yield. Генераторы производят данные для итерации, в то время как корутины могут еще и потреблять данные.

def print_name(prefix):    print("Searching prefix:{}".format(prefix))    try :         while True:                # yeild used to create coroutine                name = (yield)                if prefix in name:                    print(name)    except GeneratorExit:            print("Closing coroutine!!") corou = print_name("Dear")corou.__next__()corou.send("James")corou.send("Dear James")corou.close()


Вывод:

Searching prefix:DearDear JamesClosing coroutine!!


4. Асинхронное программирование

Четвертый способ это асинхронное программирование, в котором не участвует операционная система. Со стороны операционной системы у вас останется один процесс, в котором будет всего один поток, но вы все еще сможете выполнять одновременно несколько задач. Так в чем тут фокус?

Ответ: asyncio

Asyncio модуль асинхронного программирования, который был представлен в Python 3.4. Он предназначен для использования корутин и future для упрощения написания асинхронного кода и делает его почти таким же читаемым, как синхронный код, из-за отсутствия callback-ов.

Asyncio использует разные конструкции: event loop, корутины и future.

  • event loop управляет и распределяет выполнение различных задач. Он регистрирует их и обрабатывает распределение потока управления между ними.
  • Корутины (о которых мы говорили выше) это специальные функции, работа которых схожа с работой генераторов в Python, с помощью await они возвращают поток управления обратно в event loop. Запуск корутины должен быть запланирован в event loop. Запланированные корутины будут обернуты в Tasks, что является типом Future.
  • Future отражает результат таска, который может или не может быть выполнен. Результатом может быть exception.


С помощью asyncio вы можете структурировать свой код так, чтобы подзадачи определялись как корутины и позволяли планировать их запуск так, как вам заблагорассудится, в том числе и одновременно. Корутины содержат точки yield, в которых мы определяем возможные точки переключения контекста. В случае, если в очереди ожидания есть задачи, то контекст будет переключен, в противном случае нет.

Переключение контекста в asyncio представляет собой event loop, который передает поток управления от одной корутины к другой.

В следующем примере, мы запускаем 3 асинхронных таска, которые по-отдельности делают запросы к Reddit, извлекают и выводят содержимое JSON. Мы используем aiohttp клиентскую библиотеку http, которая гарантирует, что даже HTTP-запрос будет выполнен асинхронно.

import signal  import sys  import asyncio  import aiohttp  import jsonloop = asyncio.get_event_loop()  client = aiohttp.ClientSession(loop=loop)async def get_json(client, url):      async with client.get(url) as response:        assert response.status == 200        return await response.read()async def get_reddit_top(subreddit, client):      data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')    j = json.loads(data1.decode('utf-8'))    for i in j['data']['children']:        score = i['data']['score']        title = i['data']['title']        link = i['data']['url']        print(str(score) + ': ' + title + ' (' + link + ')')    print('DONE:', subreddit + '\n')def signal_handler(signal, frame):      loop.stop()    client.close()    sys.exit(0)signal.signal(signal.SIGINT, signal_handler)asyncio.ensure_future(get_reddit_top('python', client))  asyncio.ensure_future(get_reddit_top('programming', client))  asyncio.ensure_future(get_reddit_top('compsci', client))  loop.run_forever()


Вывод:

50: Undershoot: Parsing theory in 1965 (http://personeltest.ru/away/jeffreykegler.github.io/Ocean-of-Awareness-blog/individual/2018/07/knuth_1965_2.html)12: Question about best-prefix/failure function/primal match table in kmp algorithm (http://personeltest.ru/aways/www.reddit.com/r/compsci/comments/8xd3m2/question_about_bestprefixfailure_functionprimal/)1: Question regarding calculating the probability of failure of a RAID system (http://personeltest.ru/aways/www.reddit.com/r/compsci/comments/8xbkk2/question_regarding_calculating_the_probability_of/)DONE: compsci336: /r/thanosdidnothingwrong -- banning people with python (http://personeltest.ru/aways/clips.twitch.tv/AstutePluckyCocoaLitty)175: PythonRobotics: Python sample codes for robotics algorithms (http://personeltest.ru/aways/atsushisakai.github.io/PythonRobotics/)23: Python and Flask Tutorial in VS Code (http://personeltest.ru/aways/code.visualstudio.com/docs/python/tutorial-flask)17: Started a new blog on Celery - what would you like to read about? (http://personeltest.ru/aways/www.python-celery.com)14: A Simple Anomaly Detection Algorithm in Python (http://personeltest.ru/aways/medium.com/@mathmare_/pyng-a-simple-anomaly-detection-algorithm-2f355d7dc054)DONE: python1360: git bundle (http://personeltest.ru/aways/dev.to/gabeguz/git-bundle-2l5o)1191: Which hashing algorithm is best for uniqueness and speed? Ian Boyd's answer (top voted) is one of the best comments I've seen on Stackexchange. (http://personeltest.ru/aways/softwareengineering.stackexchange.com/questions/49550/which-hashing-algorithm-is-best-for-uniqueness-and-speed)430: ARM launches Facts campaign against RISC-V (http://personeltest.ru/aways/riscv-basics.com/)244: Choice of search engine on Android nuked by Anonymous Coward (2009) (http://personeltest.ru/aways/android.googlesource.com/platform/packages/apps/GlobalSearch/+/592150ac00086400415afe936d96f04d3be3ba0c)209: Exploiting freely accessible WhatsApp data or Why does WhatsApp web know my phones battery level? (http://personeltest.ru/aways/medium.com/@juan_cortes/exploiting-freely-accessible-whatsapp-data-or-why-does-whatsapp-know-my-battery-level-ddac224041b4)DONE: programming


Использование Redis и Redis Queue RQ


Использование asyncio и aiohttp не всегда хорошая идея, особенно если вы пользуетесь более старыми версиями Python. К тому же, бывают моменты, когда вам нужно распределить задачи по разным серверам. В этом случае можно использовать RQ (Redis Queue). Это обычная библиотека Python для добавления работ в очередь и обработки их воркерами в фоновом режиме. Для организации очереди используется Redis база данных ключей/значений.

В примере ниже мы добавили в очередь простую функцию count_words_at_url с помощью Redis.

from mymodule import count_words_at_urlfrom redis import Redisfrom rq import Queueq = Queue(connection=Redis())job = q.enqueue(count_words_at_url, 'http://nvie.com')******mymodule.py******import requestsdef count_words_at_url(url):    """Just an example function that's called async."""    resp = requests.get(url)    print( len(resp.text.split()))    return( len(resp.text.split()))


Вывод:

15:10:45 RQ worker 'rq:worker:EMPID18030.9865' started, version 0.11.015:10:45 *** Listening on default...15:10:45 Cleaning registries for queue: default15:10:50 default: mymodule.count_words_at_url('http://nvie.com') (a2b7451e-731f-4f31-9232-2b7e3549051f)32215:10:51 default: Job OK (a2b7451e-731f-4f31-9232-2b7e3549051f)15:10:51 Result is kept for 500 seconds


Заключение


В качестве примера возьмем шахматную выставку, где один из лучших шахматистов соревнуется с большим количеством людей. У нас есть 24 игры и 24 человека, с которыми можно сыграть, и, если шахматист будет играть с ними синхронно, это займет не менее 12 часов (при условии, что средняя игра занимает 30 ходов, шахматист продумывает ход в течение 5 секунд, а противник примерно 55 секунд.) Однако в асинхронном режиме шахматист сможет делать ход и оставлять противнику время на раздумья, тем временем переходя к следующему противнику и деля ход. Таким образом, сделать ход во всех 24 играх можно за 2 минуты, и выиграны они все могут быть всего за один час.

Это и подразумевается, когда говорят о том, что асинхронность ускоряет работу. О такой быстроте идет речь. Хороший шахматист не начинает играть в шахматы быстрее, просто время более оптимизировано, и оно не тратится впустую на ожидание. Так это работает.

По этой аналогии шахматист будет процессором, а основная идея будет заключаться в том, чтобы процессор простаивал как можно меньше времени. Речь о том, чтобы у него всегда было занятие.

На практике асинхронность определяется как стиль параллельного программирования, в котором одни задачи освобождают процессор в периоды ожидания, чтобы другие задачи могли им воспользоваться. В Python есть несколько способов достижения параллелизма, отвечающих вашим требованиям, потоку кода, обработке данных, архитектуре и вариантам использования, и вы можете выбрать любой из них.



Узнать о курсе подробнее.


Подробнее..

Перевод Шпаргалка по Spring Boot WebClient

09.02.2021 02:22:17 | Автор: admin

В преддверии старта курса Разработчик на Spring Framework подготовили традиционный перевод полезного материала.

Также абсолютно бесплатно предлагаем посмотреть запись демо-урока на тему
Введение в облака, создание кластера в Mongo DB Atlas.


WebClient это неблокирующий, реактивный клиент для выполнения HTTP-запросов.

Время RestTemplate подошло к концу

Возможно, вы слышали, что время RestTemplate на исходе. Теперь это указано и в официальной документации:

NOTE: As of 5.0 this class is in maintenance mode, with only minor requests for changes and bugs to be accepted going forward. Please, consider using the org.springframework.web.reactive.client.WebClient which has a more modern API and supports sync, async, and streaming scenarios.

ПРИМЕЧАНИЕ: Начиная с версии 5.0, этот класс законсервирован и в дальнейшем будут приниматься только минорные запросы на изменения и на исправления багов. Пожалуйста, подумайте об использовании org.springframework.web.reactive.client.WebClient, который имеет более современный API и поддерживает синхронную, асинхронную и потоковую передачи.

Конечно, мы понимаем, что RestTemplate не исчезнет мгновенно, однако новой функциональности в нем уже не будет. По этой причине в Интернете можно видеть десятки вопросов о том, что такое WebClient и как его использовать. В этой статье вы найдете советы по использованию новой библиотеки.

Отличия между WebClient и RestTemplate

Если в двух словах, то основное различие между этими технологиями заключается в том, что RestTemplate работает синхронно (блокируя), а WebClient работает асинхронно (не блокируя).

RestTemplate это синхронный клиент для выполнения HTTP-запросов, он предоставляет простой API с шаблонным методом поверх базовых HTTP-библиотек, таких как HttpURLConnection (JDK), HttpComponents (Apache) и другими.

Spring WebClient это асинхронный, реактивный клиент для выполнения HTTP-запросов, часть Spring WebFlux.

Вам, вероятно, интересно, как можно заменить синхронного клиента на асинхронный. У WebClient есть решение этой задачи. Мы рассмотрим несколько примеров использования WebClient.

А сейчас настало время попрощаться с RestTemplate , сказать ему спасибо и продолжить изучение WebClient.

Начало работы с WebClient

Предварительные условия

Подготовка проекта

Давайте создадим базовый проект с зависимостями, используя Spring Initializr.

Теперь взглянем на зависимости нашего проекта. Самая важная для нас зависимость spring-boot-starter-webflux.

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId> </dependency>

Spring WebFlux является частью Spring 5 и обеспечивает поддержку реактивного программирования для веб-приложений.

Пришло время настроить WebClient.

Настройка WebClient

Есть несколько способов настройки WebClient. Первый и самый простой создать его с настройками по умолчанию.

WebClient client = WebClient.create();

Можно также указать базовый URL:

WebClient client = WebClient.create("http://base-url.com");

Третий и самый продвинутый вариант создать WebClient с помощью билдера. Мы будем использовать конфигурацию, которая включает базовый URL и таймауты.

@Configurationpublic class WebClientConfiguration {    private static final String BASE_URL = "https://jsonplaceholder.typicode.com";    public static final int TIMEOUT = 1000;    @Bean    public WebClient webClientWithTimeout() {        final var tcpClient = TcpClient                .create()                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIMEOUT)                .doOnConnected(connection -> {                    connection.addHandlerLast(new ReadTimeoutHandler(TIMEOUT, TimeUnit.MILLISECONDS));                    connection.addHandlerLast(new WriteTimeoutHandler(TIMEOUT, TimeUnit.MILLISECONDS));                });        return WebClient.builder()                .baseUrl(BASE_URL)                .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))                .build();    }}

Параметры, поддерживаемые WebClient.Builder можно посмотреть здесь.

Подготовка запроса с помощью Spring WebClient

WebClient поддерживает методы: get(), post(), put(), patch(), delete(), options() и head().

Также можно указать следующие параметры:

  • Переменные пути (path variables) и параметры запроса с помощью метода uri().

  • Заголовки запроса с помощью метода headers().

  • Куки с помощью метода cookies().

После указания параметров можно выполнить запрос с помощью retrieve() или exchange(). Далее мы преобразуем результат в Mono с помощью bodyToMono() или во Flux с помощью bodyToFlux().

Асинхронный запрос

Давайте создадим сервис, который использует бин WebClient и создает асинхронный запрос.

@Service@AllArgsConstructorpublic class UserService {    private final WebClient webClient;    public Mono<User> getUserByIdAsync(final String id) {        return webClient                .get()                .uri(String.join("", "/users/", id))                .retrieve()                .bodyToMono(User.class);    }}

Как вы видите, мы не сразу получаем модель User. Вместо User мы получаем Mono-обертку, с которой выполняем различные действия. Давайте подпишемся неблокирующим способом, используя subscribe().

userService  .getUserByIdAsync("1")  .subscribe(user -> log.info("Get user async: {}", user));

Выполнение продолжается сразу без блокировки на методе subscribe(), даже если для получения значения будет требоваться некоторое время.

Синхронный запрос

Если вам нужен старый добрый синхронный вызов, то это легко сделать с помощью метода block().

public User getUserByIdSync(final String id) {    return webClient            .get()            .uri(String.join("", "/users/", id))            .retrieve()            .bodyToMono(User.class)            .block();}

Здесь поток блокируется, пока запрос не выполнится. В этом случае мы получаем запрашиваемую модель сразу же после завершения выполнения метода.

Повторные попытки

Мы все знаем, что сетевой вызов не всегда может быть успешным. Но мы можем перестраховаться и в некоторых случаях выполнить его повторно. Для этого используется метод retryWhen(), который принимает в качестве аргумента класс response.util.retry.Retry.

public User getUserWithRetry(final String id) {    return webClient            .get()            .uri(String.join("", "/users/", id))            .retrieve()            .bodyToMono(User.class)            .retryWhen(Retry.fixedDelay(3, Duration.ofMillis(100)))            .block();}

С помощью билдера можно настроить параметры и различные стратегии повтора (например, экспоненциальную). Если вам нужно повторить успешную попытку, то используйте repeatWhen() или repeatWhenEmpty() вместо retryWhen().

Обработка ошибок

В случае ошибки, когда повторная попытка не помогает, мы все еще можем контролировать ситуацию с помощью резервного варианта. Доступны следующие методы:

  • doOnError() срабатывает, когда Mono завершается с ошибкой.

  • onErrorResume() при возникновении ошибки подписывается на резервного издателя, используя функцию для выбора действия в зависимости от ошибки.

Вы можете использовать эти функции для вызова другого сервиса, выбрасывания исключения, записи в лог или выполнения любого действия в зависимости от ошибки.

public User getUserWithFallback(final String id) {    return webClient            .get()            .uri(String.join("", "/broken-url/", id))            .retrieve()            .bodyToMono(User.class)            .doOnError(error -> log.error("An error has occurred {}", error.getMessage()))            .onErrorResume(error -> Mono.just(new User()))            .block();}

В некоторых ситуациях может быть полезно отреагировать на конкретный код ошибки. Для этого можно использовать метод onStatus().

public User getUserWithErrorHandling(final String id) {  return webClient          .get()          .uri(String.join("", "/broken-url/", id))          .retrieve()              .onStatus(HttpStatus::is4xxClientError,                      error -> Mono.error(new RuntimeException("API not found")))              .onStatus(HttpStatus::is5xxServerError,                      error -> Mono.error(new RuntimeException("Server is not responding")))          .bodyToMono(User.class)          .block();}

Клиентские фильтры

Для перехвата и изменения запроса можно настроить фильтры через билдер WebClient .

WebClient.builder()  .baseUrl(BASE_URL)  .filter((request, next) -> next          .exchange(ClientRequest.from(request)                  .header("foo", "bar")                  .build()))  .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))  .build();

Ниже приведен пример фильтра для базовой аутентификации с помощью статического фабричного метода.

WebClient.builder()  .baseUrl(BASE_URL)  .filter(basicAuthentication("user", "password")) // org.springframework.web.reactive.function.client.basicAuthentication()  .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))  .build();

Заключение

В этой статье мы узнали, как настроить WebClient и выполнять синхронные и асинхронные HTTP-запросы. Все фрагменты кода, упомянутые в статье, можно найти в GitHub-репозитории. Документацию по Spring WebClient вы можете найти здесь.

Подводя итог, мы видим, что WebClient прост в использовании и включает в себя все необходимые функции, необходимые в современном программировании.

Удачи с новым Spring WebClient!


Узнать подробнее о курсе Разработчик на Spring Framework.

Посмотреть запись демо-урока на тему Введение в облака, создание кластера в Mongo DB Atlas.

Подробнее..

Шаблонные функции в Python, которые могут выполнятся синхронно и асинхронно

02.11.2020 00:07:43 | Автор: admin
image

Сейчас практически каждый разработчик знаком с понятием асинхронность в программировании. В эру, когда информационные продукты настолько востребованы, что вынуждены обрабатывать одновременно огромное количество запросов и также параллельно взаимодействовать с большим набором других сервисов без асинхронного программирования никуда. Потребность оказалась такой большой, что был даже создан отдельный язык, главной фишкой которого (помимо минималистичности) является очень оптимизированная и удобная работа с параллельным/конкурентным кодом, а именно Golang. Несмотря на то, что статья совершенно не про него, я буду часто делать сравнения и ссылаться. Но вот в Python, про который и пойдёт речь в этой статье есть некоторые проблемы, которые я опишу и предложу решение одной из них. Если заинтересовала эта тема прошу под кат.



Так получилось, что мой любимый язык, с помощью которого я работаю, реализую пет-проекты и даже отдыхаю и расслабляюсь это Python. Я бесконечно покорён его красотой и простотой, его очевидностью, за которой с помощью различного рода синтаксического сахара скрываются огромные возможности для лаконичного описания практически любой логики, на которую способно человеческое воображение. Где-то даже читал, что Python называют сверхвысокоуровневым языком, так как с его помощью можно описывать такие абстракции, которые на других языках описать будет крайне проблематично.

Но есть один серьёзный нюанс Python очень тяжело вписывается в современные представления о языке с возможностью реализации параллельной/конкурентной логики. Язык, идея которого зародилась ещё в 80-ых годах и являющийся ровесником Java до определённого времени не предполагал выполнение какого-либо кода конкурентно. Если JavaScript изначально требовал конкурентности для неблокирующей работы в браузере, а Golang совсем свежий язык с реальным пониманием современных потребностей, то перед Python таких задач ранее не стояло.

Это, конечно же, моё личное мнение, но мне кажется, что Python очень сильно опоздал с реализацией асинхронности, так как появление встроенной библиотеки asyncio было, скорее, реакцией на появление других реализаций конкуретного выполнения кода для Python. По сути, asyncio создан для поддержки уже существующих реализаций и содержит не только собственную реализацию событийного цикла, но также и обёртку для других асинхронных библиотек, тем самым предлагая общий интерфейс для написания асинхронного кода. И Python, который изначально создавался как максимально лаконичный и читабельный язык из-за всех перечисленных выше факторов при написании асинхронного кода становится нагромождением декораторов, генераторов и функций. Ситуацию немного исправило добавление специальных директив async и await (как в JavaScript, что важно) (исправил, спасибо пользователю tmnhy), но общие проблемы остались.

Я не буду перечислять их все и остановлюсь на одной, которую я и попытался решить: это описание общей логики для асинхронного и синхронного выполнения. Например, если я захочу в Golang запустить функцию параллельно, то мне достаточно просто вызвать функцию с директивой go:

Параллельное выполнение function в Golang
package mainimport "fmt"func function(index int) {    fmt.Println("function", index)}func main() {    for i := 0; i < 10; i++ {         go function(i)    }    fmt.Println("end")}


При этом в Golang я могу запустить эту же функцию синхронно:

Последовательное выполнение function в Golang
package mainimport "fmt"func function(index int) {    fmt.Println("function", index)}func main() {    for i := 0; i < 10; i++ {         function(i)    }    fmt.Println("end")}


В Python все корутины (асинхронные функции) основаны на генераторах и переключение между ними происходит во время вызова блокирующих функций, возвращая управление событийному циклу с помощью директивы yield. Честно признаюсь, я не знаю, как работает параллельность/конкурентность в Golang, но не ошибусь, если скажу, что работает это совсем не так, как в Python. Несмотря даже на существующие различия во внутренностях реализации компилятора Golang и интерпретатора CPython и на недопустимость сравнения параллельности/конкурентности в них, всё же я это сделаю и обращу внимание не на само выполнение, а именно на синтаксис. В Python я не могу взять функцию и запустить её параллельно/конкурентно одним оператором. Чтобы моя функция смогла работать асинхронно, я должен явно прописать перед её объявлением async и после этого она уже не является просто функцией, она является уже корутиной. И я не могу без дополнительных действий смешивать их вызовы в одном коде, потому что функция и корутина в Python совсем разные вещи, несмотря на схожесть в объявлении.

def func1(a, b):    func2(a + b)    await func3(a - b)  # Ошибка, так как await может выполняться только в корутинах

Моей основной проблемой оказалась необходимость разрабатывать логику, которая может работать и синхронно, и асинхронно. Простым примером является моя библиотека по взаимодействию с Instagram, которую я давно забросил, но сейчас снова за неё взялся (что и сподвигло меня на поиск решения). Я хотел реализовать в ней возможность работать с API не только синхронно, но и асинхронно, и это было не просто желание при сборе данных в Интернет можно отправить большое количество запросов асинхронно и быстрее получить ответ на них всех, но при этом массивный сбор данных не всегда нужен. В данный момент в библиотеке реализовано следующее: для работы с Instagram есть 2 класса, один для синхронной работы, другой для асинхронной. В каждом классе одинаковый набор методов, только в первом методы синхронные, а во втором асинхронные. Каждый метод выполняет одно и то же за исключением того, как отправляются запросы в Интернет. И только из-за различий одного блокирующего действия мне пришлось практически полностью продублировать логику в каждом методе. Выглядит это примерно так:

class WebAgent:    def update(self, obj=None, settings=None):        ...        response = self.get_request(path=path, **settings)        ...class AsyncWebAgent:    async def update(self, obj=None, settings=None):        ...        response = await self.get_request(path=path, **settings)        ...

Всё остальное в методе update и в корутине update абсолютно идентичное. А как многие знают, дублирование кода добавляет очень много проблем, особенно остро это ощущается в исправлении багов и тестировании.

Для решения этой проблемы я написал собственную библиотеку pySyncAsync. Идея такова вместо обычных функций и корутин реализуется генератор, в дальнейшем я буду называть его шаблоном. Для того, чтобы выполнить шаблон его нужно сгенерировать как обычную функцию или как корутину. Шаблон при выполнении в тот момент, когда ему нужно выполнить внутри себя асинхронный или синхронный код возвращает с помощью yield специальный объект Call, который указывает, что именно вызвать и с какими аргументами. В зависимости от того, как будет сгенерирован шаблон как функция или как корутина таким образом и будут выполнятся методы, описанные в объекте Call.

Покажу небольшой пример шаблона, который предполагает возможность делать запросы в google:

Пример запросов в google с помощью pySyncAsync
import aiohttpimport requestsimport pysyncasync as psa# Регистрируем функцию для синхронного запроса в google# В скобочках указываем имя для дальнейшего указания в объекте Call@psa.register("google_request")def sync_google_request(query, start):    response = requests.get(        url="http://personeltest.ru/aways/google.com/search",        params={"q": query, "start": start},    )    return response.status_code, dict(response.headers), response.text# Регистрируем корутину для асинхронного запроса в google# В скобочках указываем имя для дальнейшенго указания в объекте Call@psa.register("google_request")async def async_google_request(query, start):    params = {"q": query, "start": start}    async with aiohttps.ClientSession() as session:        async with session.get(url="http://personeltest.ru/aways/google.com/search", params=params) as response:            return response.status, dict(response.headers), await response.text()# Шаблон для получения первых 100 результатовdef google_search(query):    start = 0    while start < 100:        # В Call аргументы передавать можно как угодно, они так же и будут переданы в google_request        call = Call("google_request", query, start=start)        yield call        status, headers, text = call.result        print(status)        start += 10if __name__ == "__main__":    # Синхронный запуск кода    sync_google_search = psa.generate(google_search, psa.SYNC)    sync_google_search("Python sync")    # Асинхронный запуск кода    async_google_search = psa.generate(google_search, psa.ASYNC)    loop = asyncio.get_event_loop()    loop.run_until_complete(async_google_search("Python async"))


Расскажу немного про внутреннее устройство библиотеки. Есть класс Manager, в котором регистрируются функции и корутины для вызова с помощью Call. Также есть возможность регистрировать шаблоны, но это необязательно. У класса Manager есть методы register, generate и template. Эти же методы в примере выше вызывались напрямую из pysyncasync, только они использовали глобальный экземпляр класса Manager, который уже создан в одном из модулей библиотеки. По факту можно создать свой экземпляр и от него вызывать методы register, generate и template, таким образом изолируя менеджеры друг от друга, если, например, возможен конфликт имён.

Метод register работает как декоратор и позволяет зарегистрировать функцию или корутину для дальнейшего вызова из шаблона. Декоратор register принимает в качестве аргумента имя, под которым в менеджере регистрируется функция или корутина. Если имя не указано, то функция или корутина регистрируется под своим именем.

Метод template позволяет зарегистрировать генератор как шаблон в менеджере. Это нужно для того, чтобы была возможность получить шаблон по имени.

Метод generate позволяет на основе шаблона сгенерировать функцию или корутину. Принимает два аргумента: первый имя шаблона или сам шаблон, второй sync или async во что генерировать шаблон в функцию или в корутину. На выходе метод generate отдаёт готовую функцию или корутину.

Приведу пример генерации шаблона, например, в корутину:

def _async_generate(self, template):    async def wrapper(*args, **kwargs):        ...        for call in template(*args, **kwargs):            callback = self._callbacks.get(f"{call.name}:{ASYNC}")            call.result = await callback(*call.args, **call.kwargs)        ...    return wrapper

Внутри генерируется корутина, которая просто итерируется по генератору и получает объекты класса Call, потом берёт ранее зарегистрированную корутину по имени (имя берёт из call), вызывает её с аргументами (которые тоже берёт из call) и результат выполнения этой корутины также сохраняет в call.

Объекты класса Call являются просто контейнерами для сохранения информации о том, что и как вызывать и также позволяют сохранить в себе результат. wrapper также может вернуть результат выполнения шаблона, для этого шаблон оборачивается в специальный класс Generator, который здесь не показан.

Некоторые нюансы я опустил, но суть, надеюсь, в общем донёс.

Если быть честным, эта статья была написана мною скорее для того, чтобы поделится своими мыслями о решении проблем с асинхронным кодом в Python и, самое главное, выслушать мнения хабравчан. Возможно, кого-то я натолкну на другое решение, возможно, кто-то не согласится именно с данной реализацией и подскажет, как можно сделать её лучше, возможно, кто-то расскажет, почему такое решение вообще не нужно и не стоит смешивать синхронный и асинхронный код, мнение каждого из вас для меня очень важно. Также я не претендую на истинность всех моих рассуждений в начале статьи. Я очень обширно размышлял на тему других ЯП и мог ошибиться, плюс есть возможность, что я могу путать понятия, прошу, если вдруг будут замечены какие-то несоответствия опишите в комментарии. Также буду рад, если будут поправки по синтаксису и пунктуации.

И спасибо за внимание к данному вопросу и к этой статье в частности!
Подробнее..
Категории: Python , Asynchronous , Asyncio , Generators

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru