Русский
Русский
English
Статистика
Реклама

Coroutines

Лечим Java Reactor при помощи Kotlin Coroutines

17.01.2021 18:05:05 | Автор: admin

На текущей работе пишем на Reactor. Технология классная, но как всегда есть много НО. Некоторые вещи раздражают, код сложнее писать и читать, с ThreadLocal совсем беда. Решил посмотреть какие проблемы уйдут, если перейти на Kotlin Coroutines, а какие проблемы, наоборот, добавятся.

Карточка пациента

Для статьи написал маленький проект, воспроизведя проблемы, с которыми столкнулся на работе. Основной код здесь. Алгоритм специально не стал разбивать на отдельные методы, так лучше видно проблемы.

В двух словах об алгоритме:

Переводим деньги с одного счёта на другой, записывая транзакции о факте перевода.

Перевод идемпотентен, так что если транзакция уже есть в БД, то отвечаем клиенту, что всё хорошо. При вставке транзакции может вылететь DataIntegrityViolationException, это тоже значит, что транзакция уже есть.

Чтобы не уйти в минус есть проверка в коде + Optimistic lock, который не позволяет конкурентно обновлять счета. Чтобы он работал нужен retry и дополнительная обработка ошибок.

Для тех кому не нравится сам алгоритм

Алгоритм для проекта выбирал такой, чтобы воспроизвести проблемы, а не чтобы он был эффективным и архитектурно правильным. Вместо одной транзакции надо вставлять полупроводки, optimistic lock вообще не нужен (вместо него проверка положительности счета в sql), select + insert надо заменить на upsert.

Жалобы пациента

  1. Stacktrace не показывает каким образом мы попали в проблемное место.

  2. Код явно сложнее, чем был бы на блокирующих технологиях.

  3. Многоступенчатая вложенность кода из-за flatMap.

  4. Неудобная обработка ошибок и их выброс.

  5. Сложная обработка поведения для Mono.empty().

  6. Сложности с логированием, если надо в лог добавить что-то глобальное, например traceId. (в статье не описываю, но те же проблемы с другими ThreadLocal переменными, например SpringSecurity)

  7. Неудобно дебажить.

  8. Неявное api для параллелизации.

Ход лечения

Написал отдельный PR перехода с Java на Kotlin.

Интеграция почти везде гладкая.

Понадобилось добавить com.fasterxml.jackson.module:jackson-module-kotlin чтобы заработала сериализация data классов и org.jetbrains.kotlin.plugin.spring чтобы не прописывать везде open модификаторы.

В контроллере достаточно было написать suspend fun transfer(@RequestBody request: TransferRequest) вместо public Mono<Void> transfer(@RequestBody TransferRequest request)

В репозитории написал suspend fun save(account: Account): Account вместо Mono<Account> save(Account account); Единственное, репозитории не определяются, если в них только suspend функции, надо, чтобы хоть один метод работал с Reactor типами.

Тесты обернул в runBlocking { }, чтобы можно было вызывать suspend функции.

Для реализации Retry использовал библиотеку kotlin-retry. Единственное, в ней не было функции фильтрации по классу ошибки, но это было легко добавить (завёл PR).

Ну и, естественно, переписал алгоритм. Все детали опишу ниже по-отдельности.

Было:

public Mono<Void> transfer(String transactionKey, long fromAccountId,                           long toAccountId, BigDecimal amount) {  return transactionRepository.findByUniqueKey(transactionKey)    .map(Optional::of)    .defaultIfEmpty(Optional.empty())    .flatMap(withMDC(foundTransaction -> {      if (foundTransaction.isPresent()) {        log.warn("retry of transaction " + transactionKey);        return Mono.empty();      }      return accountRepository.findById(fromAccountId)        .switchIfEmpty(Mono.error(new AccountNotFound()))        .flatMap(fromAccount -> accountRepository.findById(toAccountId)          .switchIfEmpty(Mono.error(new AccountNotFound()))          .flatMap(toAccount -> {            var transactionToInsert = Transaction.builder()              .amount(amount)              .fromAccountId(fromAccountId)              .toAccountId(toAccountId)              .uniqueKey(transactionKey)              .build();            var amountAfter = fromAccount.getAmount().subtract(amount);            if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {              return Mono.error(new NotEnoghtMoney());            }            return transactionalOperator.transactional(              transactionRepository.save(transactionToInsert)                .onErrorResume(error -> {                  //transaction was inserted on parallel transaction,                  //we may return success response                  if (error instanceof DataIntegrityViolationException             && error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {                    return Mono.empty();                  } else {                    return Mono.error(error);                  }                })                .then(accountRepository.transferAmount(                  fromAccount.getId(), fromAccount.getVersion(),                   amount.negate()                ))                .then(accountRepository.transferAmount(                  toAccount.getId(), toAccount.getVersion(), amount                ))            );          }));    }))    .retryWhen(Retry.backoff(3, Duration.ofMillis(1))      .filter(OptimisticLockException.class::isInstance)      .onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())    )    .onErrorMap(      OptimisticLockException.class,      e -> new ResponseStatusException(        BANDWIDTH_LIMIT_EXCEEDED,        "limit of OptimisticLockException exceeded", e      )    )    .onErrorResume(withMDC(e -> {      log.error("error on transfer", e);      return Mono.error(e);    }));}

Стало:

suspend fun transfer(transactionKey: String, fromAccountId: Long,                     toAccountId: Long, amount: BigDecimal) {  try {    try {      retry(limitAttempts(3) + filter { it is OptimisticLockException }) {        val foundTransaction = transactionRepository          .findByUniqueKey(transactionKey)        if (foundTransaction != null) {          logger.warn("retry of transaction $transactionKey")          return@retry        }        val fromAccount = accountRepository.findById(fromAccountId)          ?: throw AccountNotFound()        val toAccount = accountRepository.findById(toAccountId)          ?: throw AccountNotFound()        if (fromAccount.amount - amount < BigDecimal.ZERO) {          throw NotEnoghtMoney()        }        val transactionToInsert = Transaction(          amount = amount,          fromAccountId = fromAccountId,          toAccountId = toAccountId,          uniqueKey = transactionKey        )        transactionalOperator.executeAndAwait {          try {            transactionRepository.save(transactionToInsert)          } catch (e: DataIntegrityViolationException) {            if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {              throw e;            }          }          accountRepository.transferAmount(            fromAccount.id!!, fromAccount.version, amount.negate()          )          accountRepository.transferAmount(            toAccount.id!!, toAccount.version, amount          )        }      }    } catch (e: OptimisticLockException) {      throw ResponseStatusException(        BANDWIDTH_LIMIT_EXCEEDED,         "limit of OptimisticLockException exceeded", e      )    }  } catch (e: Exception) {    logger.error(e) { "error on transfer" }    throw e;  }}

Stacktraces

Пожалуй, это самое главное.

Было:

o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockExceptionat c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)  ...

Стало:

error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockExceptionat c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)...at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)(Coroutine boundary)at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)(Coroutine creation stacktrace)at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)...at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)...

Скучные части стектрейсов я вырезал, пакеты сократил (забочусь о читателе, и без того длинно).

В Java очень куцая информация. Да, ошибка есть. Даже видно на какой строчке она вылетела. Только непонятно а как мы в эту строчку кода попали. В Kotlin версии виден весь трейс от контроллера.

Вот представьте себе, что вы видите ошибку в логе где-то на обращении в регулярно вызываемый метод. А кто его вызывал? Придётся по логам рядом искать. Это хорошо, если логи объединены через что-нибудь вроде traceId (thread name нам не поможет) и вообще логи есть.

Сложность кода

Kotlin версия выглядит проще, точно так же, как выглядел бы код на блокирующих технологиях. По крайне мере пока мы не вводим параллельные операции (тут ещё вопрос какой вариант сложнее писать: на блокирующих технологиях или с корутинами).

Многоступенчатая вложенность кода

Никаких flatMap. Добавились вложения из-за явных try catch, но схожая логика вся объявлена на одном уровне.

Было:

return accountRepository.findById(fromAccountId)  .switchIfEmpty(Mono.error(new AccountNotFound()))  .flatMap(fromAccount -> accountRepository.findById(toAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound()))    .flatMap(toAccount -> {      ...    })

Стало:

val fromAccount = accountRepository.findById(fromAccountId)  ?: throw AccountNotFound()val toAccount = accountRepository.findById(toAccountId)  ?: throw AccountNotFound()...

Обработка ошибок и их выброс

Обработка ошибок теперь через обычный try catch, легко понять какой кусок кода мы обернули.

Было:

return transactionRepository.findByUniqueKey(transactionKey)  ...  .onErrorMap(    OptimisticLockException.class,    e -> new ResponseStatusException(      BANDWIDTH_LIMIT_EXCEEDED,       "limit of OptimisticLockException exceeded", e    )  )

Стало:

try {  val foundTransaction = transactionRepository    .findByUniqueKey(transactionKey)  ...} catch (e: OptimisticLockException) {  throw ResponseStatusException(    BANDWIDTH_LIMIT_EXCEEDED,     "limit of OptimisticLockException exceeded", e  )}

Ошибки выбрасывать можно просто через throw, а не возвращая объект ошибки. В Reactor меня особенно раздражают конструкции вида:

.flatMap(foo -> {  if (foo.isEmpty()) {     return Mono.error(new IllegalStateException());  } else {    return Mono.just(foo);  }})

Мне могут возразить, что это функциональный стиль, так и должно быть. Но именно из-за функционального стиля появляется дополнительная сложность кода.

Mono.empty()

Это заслуживает отдельного обсуждения. В реактор нельзя передавать null в качестве результата. При этом нельзя написать C5C.

Ide никак тебе не подсказывает, может ли этот конкретный mono быть пустым и обработал ли ты это. В рабочем проекте для меня именно это было источником тупых ошибок. То забудешь обработать, то после рефакторинга как-то не так работает.

В Kotlin будет not null тип, если ты точно знаешь, что результат будет. Или это будет nullable тип и компилятор обяжет тебя что-то с этим сделать.

Конкретно на нашем примере:

Было:

return transactionRepository.findByUniqueKey(transactionKey)  .map(Optional::of)  .defaultIfEmpty(Optional.empty())  .flatMap(foundTransaction -> {    if (foundTransaction.isPresent()) {      log.warn("retry of transaction " + transactionKey);      return Mono.empty();    }...

Стало:

val foundTransaction = transactionRepository  .findByUniqueKey(transactionKey)if (foundTransaction != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}...

Может, как-то можно эту логику написать адекватнее на Reactor, но то что я нагуглил выглядит ещё хуже.

Логирование и контекст

Допустим, мы хотим всегда логировать traceId во время обработки запроса. ThreadLocal больше не работает, в том числе и MDC (контекст для логирования). Что делать?

Есть контекст. И в Reactor и в Coroutines контекст immutable, так что новое значение в MDC подбрасывать будет не так просто (нужно пересоздавать контекст).

Чтобы работало в Java надо написать фильтр, который сохранит traceId в контекст:

@Componentpublic class TraceIdFilter implements WebFilter {  @Override  public Mono<Void> filter(    ServerWebExchange exchange, WebFilterChain chain  ) {    var traceId = Optional.ofNullable(      exchange.getRequest().getHeaders().get("X-B3-TRACEID")    )      .orElse(Collections.emptyList())      .stream().findAny().orElse(UUID.randomUUID().toString());    return chain.filter(exchange)      .contextWrite(context ->        LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)      );  }}

И каждый раз, когда мы хотим что-то залогировать, надо переносить traceId из контекста в MDC:

public static <T, R> Function<T, Mono<R>> withMDC(  Function<T, Mono<R>> block) {  return value -> Mono.deferContextual(context -> {    Optional<Map<String, String>> mdcContext = context      .getOrEmpty(MDC_ID_KEY);    if (mdcContext.isPresent()) {      try {        MDC.setContextMap(mdcContext.get());        return block.apply(value);      } finally {        MDC.clear();      }    } else {      return block.apply(value);    }  });}

Да, это опять Mono. Т.е. мы можем логировать только тогда, когда код позволяет вернуть Mono. Например вот так:

.onErrorResume(withMDC(e -> {  log.error("error on transfer", e);  return Mono.error(e);}))

В Kotlin проще. Нужно написать фильтр, чтобы сохранить traceId сразу в MDC:

@Componentclass TraceIdFilter : WebFilter {  override fun filter(    exchange: ServerWebExchange, chain: WebFilterChain  ): Mono<Void> {    val traceId = exchange.request.headers["X-B3-TRACEID"]?.first()     MDC.put("traceId", traceId ?: UUID.randomUUID().toString())    return chain.filter(exchange)  }}

И при создании корутины вызывать withContext(MDCContext()) { }

Каждый раз на время работы корутины, будет восстанавливаться MDC и в логах будет traceId. Во время записи в лог ни о чем задумываться не нужно.

Тут есть одно НО, об этом позже.

Дебаг

В Java Reactor проблемы с дебагом примерно те же, что и со стектрейсом: нужно думать о том, кто в каком порядке кого вызвал, ставить breakpoints в лямбдах и т.д..

С корутинами не замечаешь разницы с обычным кодом: работает stepOver, переменные подсвечиваются, видно стектрейс и по нему можно перемещаться (в нашем примере вплоть до контроллера).

Всё идеально, кроме возможности запускать suspend функции во время дебага. На это уже есть issue. Правда, надо сказать, что и в Java Reactor особо не получается в evaluate сделать то, что хочется.

Параллелизация

Предположим, я хочу ускорить алгоритм и запросить из базы аккаунты и транзакции параллельно, а не последовательно как сейчас.

Было:

return Mono.zip(  transactionRepository.findByUniqueKey(transactionKey)    .map(Optional::of)    .defaultIfEmpty(Optional.empty()),  accountRepository.findById(fromAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound())),  accountRepository.findById(toAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound())),).flatMap(withMDC(fetched -> {  var foundTransaction = fetched.getT1();  var fromAccount = fetched.getT2();  var toAccount = fetched.getT3();  if (foundTransaction.isPresent()) {    log.warn("retry of transaction " + transactionKey);    return Mono.empty();  }  ...}

Стало:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}val fromAccountAsync = GlobalScope.async(coroutineContext) {   accountRepository.findById(fromAccountId) }val toAccountAsync = GlobalScope.async(coroutineContext) {   accountRepository.findById(toAccountId) }if (foundTransactionAsync.await() != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()val toAccount = toAccountAsync.await() ?: throw AccountNotFound()

В Kotlin версии есть явное указание вот это выполни асинхронно, вместо выполни всё это в параллель в Reactor.

Что самое важное, код ведёт себя по-разному. В случае с Reactor мы создаем три параллельных запроса и продолжаем работу только после того, как все три завершатся. С корутинами мы запускаем все три запроса и ждать чего-то начинаем только при вызове foundTransactionAsync.await(). Таким образом, если transactionRepository.findByUniqueKey() выполнится быстрее, то мы завершим обработку, без ожидания accountRepository.findById() (эти операции отменятся).

В более сложных алгоритмах разница будет ощутимей. Не уверен, что вообще в Reactor получится написать такую версию алгоритма:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}val fromAccountAsync = GlobalScope.async(coroutineContext) {  accountRepository.findById(fromAccountId)}val toAccountAsync = GlobalScope.async(coroutineContext) {  accountRepository.findById(toAccountId)}if (foundTransactionAsync.await() != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}val transactionToInsert = Transaction(  amount = amount,  fromAccountId = fromAccountId,  toAccountId = toAccountId,  uniqueKey = transactionKey)transactionalOperator.executeAndAwait {  try {    transactionRepository.save(transactionToInsert)  } catch (e: DataIntegrityViolationException) {    if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {      throw e;    }  }  val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()  val toAccount = toAccountAsync.await() ?: throw AccountNotFound()  if (fromAccount.amount - amount < BigDecimal.ZERO) {    throw NotEnoghtMoney()  }  accountRepository.transferAmount(    fromAccount.id!!, fromAccount.version, amount.negate()  )  accountRepository.transferAmount(    toAccount.id!!, toAccount.version, amount  )}

Здесь я ожидаю асинхронный запрос уже внутри открытой БД транзакции. Т.е. мы при работе с одним коннектом, ждём результата выполнения на другом коннекте. Так делать не стоит, скорее для примера, хоть оно и работает (пока коннектов хватает).

Побочные эффекты

Конечно, есть проблемы, куда без них.

Надо явно указывать context и scope

Чтобы программа работала как ожидается, надо:

  1. Каждому запросу назначить scope. Таким образом все порождаемые при обработке запроса корутины будут отменены все вместе, например, в случае ошибки.

  2. В каждом запросе проставить context. Зачем нам нужен контекст я рассказывал в разделе про логирование.

Spring не берет на себя заботу об этом вопросе, приходится в контроллере указывать явно:

@PutMapping("/transfer")suspend fun transfer(@RequestBody request: TransferRequest) {  coroutineScope {    withContext(MDCContext()) {      ledger.transfer(request.transactionKey, request.fromAccountId,                       request.toAccountId, request.amount)    }  }}

Естественно, можно объединить в одну функцию и потом через regexp проверить, что во всех методах есть нужный вызов. Но лучше была бы какая-то автоматизация для этого.

Передача context

В случае, если мы порождаем дополнительные корутины, необходимо явным образом передать текущий контекст. Вот пример:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}

По-умолчанию в async() указывать контекст не требуется (он будет пуст). Видимо, чтобы было меньше неявных вещей. Но как итог, в нашем случае, если забыть передать контекст, в логе не окажется traceId. Помните я писал, что не надо задумываться во время логирования? Вместо этого надо задумываться при создании корутины (что, конечно, лучше).

AOP и suspend

Автоматизацию, которую я упоминал в первом пункте, написать самому сложно. Потому что пока нельзя нормально написать aspect для suspend функции.

Я в итоге сумел написать такой аспект. Но для объяснения того, как это работает понадобится отдельная статья.

Надеюсь, появится более адекватный способ писать аспекты (попробую этому посодействовать).

Оценка лечения

Все проблемы исчезли. Добавилась пара новых, но оно терпимо.

Надо сказать, что корутины быстро развиваются и я ожидаю только улучшения работы с ними.

Видно, что команда JetBrains внимательно отнеслась к проблемам разработчиков. Насколько я знаю, где-то год назад всё ещё были проблемы с дебагом и стактрейсом, к примеру.

Самое главное, с корутинами не надо в голове держать все особенности работы Reactor и его могучий API. Ты просто пишешь код.

Подробнее..

Как заблокировать приложение с помощью runBlocking

10.02.2021 14:12:50 | Автор: admin

Когда мы начинаем изучать корутины, то идём и пробуем что-то простое с билдером runBlocking, поэтому многим он хорошо знаком. runBlocking запускает новую корутину, блокирует текущий поток и ждёт пока выполнится блок кода. Кажется, всё просто и понятно. Но что, если я скажу, что в runBlocking есть одна любопытная вещь, которая может заблокировать не только текущий поток, а вообще всё ваше приложение навсегда?

Напишите где-нибудь в UI потоке (например в методе onStart) такой код:

//где-то в UI потокеrunBlocking(Dispatchers.Main) {  println(Hello, World!)}

Вы получите дедлок приложение зависнет. Это не ошибка, а на 100% ожидаемое поведение. Тезис может показаться неочевидным и неявным, поэтому давайте погрузимся поглубже и я расскажу, что здесь происходит.


Сравним код выше с более низкоуровневым подходом с потоками. Вы можете написать в главном потоке вот так:

//где-то в UI потокеHandler().post {println("Hello, World!") // отработает в UI потоке}

Или даже так:

//где-то в UI потокеrunOnUiThread {  println("Hello, World!") // и это тоже отработает в UI потоке}

Вроде конструкция очень похожа на наш проблемный код, но здесь обе части кода работают (по-разному под капотом, но работают). Чем они отличаются от кода с runBlocking?

Как работает runBlocking

Для начала небольшой дисклеймер. runBlocking редко используется в продакшн коде Android-приложения. Обычно он предназначен для использования в синхронном коде, вроде функций main или unit-тестах.

Несмотря на это, мы всё-таки рассмотрим этот билдер при вызове в главном потоке Android-приложения потому, что:

  • Это наглядно. Ниже мы придем к тому, что это актуально и не только для UI-потока Android-приложения. Но для наглядности лучше всего подходит пример на UI-потоке.

  • Интересно разобраться, почему всё именно так работает.

  • Всё-таки иногда мы можем использовать runBlocking, пусть даже в тестовых приложениях.

Билдер runBlocking работает почти так же, как и launch: создает корутину и вызывает в ней блок кода. Но чтобы сделать вызов блокирующим runBlocking создает особую корутину под названием BlockingCoroutine, у которой есть дополнительная функция joinBlocking(). runBlocking вызывает joinBlocking() сразу же после запуска корутины.

Фрагмент из runBlocking():

// runBlocking() function// val coroutine = BlockingCoroutine<T>(newContext, )coroutine.start(CoroutineStart.DEFAULT, coroutine, block)return coroutine.joinBlocking()

Функция joinBlocking() использует механизм блокировки Java LockSupport для блокировки текущего потока с помощью функции park(). LockSupport это низкоуровневый и высокопроизводительный инструмент, обычно используется для написания собственных блокировок.

Кроме того, BlockingCoroutine переопределяет функцию afterCompletion(), которая вызывается после завершения работы корутины.

override fun afterCompletion(state: Any?) {//wake up blocked threadif (Thread.currentThread ()! = blockedThread)LockSupport.unpark (blockedThread)}

Эта функция просто разблокирует поток, если она была заблокирована до этого с помощью park().

Как это всё работает примерно показано на схеме работы runBlocking.

Что здесь делает Dispatchers

Хорошо, мы поняли, что делает билдер runBlocking. Но почему в одном случае он блокирует UI-поток, а в другом нет? Почему Dispatchers.Main приводит к дедлоку...

// Этот код создает дедлокrunBlocking(Dispatchers.Main) {  println(Hello, World!)}

...,а Dispatchers.Default нет?

// А этот код создает дедлокrunBlocking(Dispatchers.Default) {  println(Hello, World!)}

Для этого вспомним, что такое диспатчер и зачем он нужен.

Диспатчер определяет, какой поток или потоки использует корутина для своего выполнения. Это некий высокоуровневый аналог Java Executor. Мы даже можем создать диспатчер из Executorа с помощью удобного экстеншна:

public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher

Dispatchers.Default реализует класс DefaultScheduler и делегирует обработку исполняемого блока кода объекту coroutineScheduler. Его функция dispatch() выглядит так:

override fun dispatch (context: CoroutineContext, block: Runnable) =  try {    coroutineScheduler.dispatch (block)  } catch (e: RejectedExecutionException) {    //    DefaultExecutor.dispatch(context, block)  }

Класс CoroutineScheduler отвечает за наиболее эффективное распределение обработанных корутин по потокам. Он реализует интерфейс Executor.

override fun execute(command: Runnable) = dispatch(command)

А что же делает функция CoroutineScheduler.dispatch()?

  • Добавляет исполняемый блок в очередь задач. При этом существует две очереди: локальная и глобальная. Это часть механизма приоритезации внешних задач.

  • Создает воркеры. Воркер это класс, унаследованный от обычного Java Thread (в данном случае daemon thread). Здесь создаются рабочие потоки. У воркера также есть локальная и глобальная очереди, из которых он выбирает задачи и выполняет их.

  • Запускает воркеры.

Теперь соединим всё, что разобрали выше про Dispatchers.Default, и напишем, что происходит в целом.

  • runBlocking запускает корутину, которая вызывает CoroutineScheduler.dispatch().

  • dispatch() запускает воркеры (под капотом Java потоки).

  • BlockingCoroutine блокирует текущий поток с помощью функции LockSupport.park().

  • Исполняемый блок кода выполняется.

  • Вызывается функция afterCompletion(), которая разблокирует текущий поток с помощью LockSupport.unpark().

Эта последовательность действий выглядит примерно так.

Перейдём к Dispatchers.Main

Это диспатчер, который создан специально для Android. Например, при использовании Dispatchers.Main фреймворк бросит исключение, если вы не добавляете зависимость:

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:..*'

Перед началом разбора Dispatchers.Main стоит поговорить о HandlerContext. Это специальный класс, который добавлен в пакет coroutines для Android. Это диспатчер, который выполняет задачи с помощью Android Handler всё просто.

Dispatchers.Main создаёт HandlerContext с помощью AndroidDispatcherFactory через функцию createDispatcher().

override fun createDispatcher() =  HandlerContext(Looper.getMainLooper().asHandler(async = true))

И что мы тут видим? Looper.getMainLooper().asHandler() означает, что он принимает Handler главного потока Android. Получается, что Dispatchers.Main это просто HandlerContext с Handlerом главного потока Android.

Теперь посмотрим на функцию dispatch() у HandlerContext:

override fun dispatch(context: CoroutineContext, block: Runnable) {  handler.post(block)}

Он просто постит исполняемый код через Handler. В нашем случае Handler главного потока.

Итого, что же происходит?

  • runBlocking запускает корутину, которая вызывает CoroutineScheduler.dispatch().

  • dispatch() отправляет исполняемый блок кода через Handler главного потока.

  • BlockingCoroutine блокирует текущий поток с помощью функции LockSupport.park().

  • Main Looper никогда не получает сообщение с исполняемым блоком кода, потому что главный поток заблокирован.

  • Из-за этого afterCompletion() никогда не вызывается.

  • И из-за этого текущий поток не будет разблокирован (через unparked) в функции afterCompletion().

Эта последовательность действий выглядит примерно так.

Вот почему runBlocking с Dispatchers.Main блокирует UI-поток навсегда.

Главный потокблокируется и ждёт завершения исполняемого кода. Но он никогда не завершается, потому что Main Looper не может получить сообщение на запуск исполняемого кода. Дедлок.

Совсем простое объяснение

Помните пример с Handler().post в самом начале статьи? Там код работает и ничего не блокируется. Однако мы можем легко изменить его, чтобы он был в значительной степени похож на наш код с Dispatcher.Main, и стал ещё нагляднее. Для этого можем добавить операции parking и unparking к текущему потоку, иммитируя работу функций afterCompletion() и joinBlocking(). Код начинает работать почти так же, как с билдером runBlocking.

//где-то в UI потокеval thread = Thread.currentThread()Handler().post {  println("Hello, World!") // это никогда не будет вызвано  // имитируем afterCompletion()  LockSupport.unpark(thread)}// имитируем joinBlocking()LockSupport.park()

Но этот трюк не будет работать с функцией runOnUiThread.

//где-то в UI потокеval thread = Thread.currentThread()runOnUiThread {  println("Hello, World!") // этот код вызовется  LockSupport.unpark(thread)}LockSupport.park()

Это происходит потому, что runOnUiThread использует оптимизацию, проверяя текущий поток. Если текущий поток главный, то он сразу же выполнит блок кода. В противном случае сделает post в Handler главного потока.

Если всё же очень хочется использовать runBlocking в UI-потоке, то у Dispatchers.Main есть оптимизация Dispatchers.Main.immediate. Там аналогичная логика как у runOnUiThread. Поэтому этот блок кода будет работать и в UI-потоке:

//где-то в UI потокеrunBlocking(Dispatchers.Main.immediate) {   println(Hello, World!)}

Выводы

В статье я описал как безобидный билдер runBlocking может заморозить ваше приложение на Android. Это произойдет, если вызвать runBlocking в UI-потоке с диспатчером Dispatchers.Main. Приложение заблокируется по следующему алгоритму:

  • runBlocking создаёт блокирующую корутину BlockingCoroutine.

  • Dispatchers.Main отправляет на запуск исполняемый блок кода через Handler.post.

  • Но BlockingCoroutine тут же заблокирует UI поток.

  • Поэтому Main Looper никогда не получит сообщение с исполняемым блоком кода.

  • А UI не разблокируется, потому что корутина ждёт завершения исполняемого кода.

Эта статья больше теоретическая, чем практическая. Просто потому, что runBlocking редко встречается в продакшн-коде. Но примеры с UI-потоком наглядны, потому что можно сразу заблокировать приложение и разобраться, как работает runBlocking.

Но заблокировать исполнение можно не только в UI-потоке, но и с помощью других диспатчеров, если поток вызова и корутины окажется одним и тем же. В такую ситуацию можно попасть, если мы будем пытаться вызвать билдер runBlocking на том же самом потоке, что и корутина внутри него. Например, мы можем использовать newSingleThreadContext для создания нового диспатчера и результат будет тот же. Здесь UI не будет заморожен, но выполнение будет заблокировано.

val singleThreadDispatcher = newSingleThreadContext("Single Thread")GlobalScope.launch (singleThreadDispatcher) {  runBlocking (singleThreadDispatcher) {    println("Hello, World!") // этот кусок кода опять не выполнится  }}

Если очень надо написать runBlocking в главном потоке Android-приложения, то не используйте Dispatchers.Main. Используйте Dispatchers.Default или Dispatchers.Main.immediate в крайнем случае.


Также будет интересно почитать:

Оригинал статьи на английском How runBlocking May Surprise You.
Как страдали iOS-ники когда выпиливали Realm.
О том, над чем в целом мы тут работаем: монолит, монолит, опять монолит.
Кратко об истории Open Source просто развлечься (да и статья хорошая).

Подписывайтесь начат Dodo Engineering, если хотите обсудить эту и другие наши статьи и подходы, а также на канал Dodo Engineering, где мы постим всё, что с нами интересного происходит.

Подробнее..

Параллельные запросы в Kotlin для автоматизации сборки данных

10.02.2021 18:20:17 | Автор: admin

Всем привет! В своей работе я часто использую Kotlin для автоматизации. Деятельность моя не связана напрямую с программированием, но Котлин здорово упрощает рабочие задачи.

Недавно нужно было собрать данные немаленького размера, дабы сделать анализ, поэтому решил написать небольшой скрипт, для получения данных и сохранения их в Excel. С последним пунктом проблем не возникло - почитал про Apache POI, взял пару примеров из официальной документации, доработав под себя. Чего не скажешь про запросы в Сеть.

Источник отдавал пачками json и надо было как-то быстро эти "пачки" собирать, преобразовывая в текст и записывая в файл таблицу.

Асинхронный метод

Начать решил с простой асинхронщины. Немного поковыряв HttpUrlConnection, отправил туда, где ему и место, заменив на HttpClient из Java.

Для тестов взял сервис https://jsonplaceholder.typicode.com/, который мне подсказал один знакомый разработчик. Сохранил ссылку, которая выдает Json с комментариями в переменную, дабы не дублировать и начал тесты.

const val URL = "https://jsonplaceholder.typicode.com/comments"

Функция была готова и даже работала. Данные приходили.

fun getDataAsync(url: String): String? {    val httpClient = HttpClient.newBuilder()        .build()    val httpRequest = HttpRequest.newBuilder()        .uri(URI.create(link)).build()    return httpClient.sendAsync(httpRequest, BodyHandlers.ofString())        .join().body()}

Теперь надо было проверить скорость работы. Вооружившись measureTimeMillis я запустил код.

val asyncTime = measureTimeMillis {     val res = (1..10)        .toList()        .map {getDataAsync("$URL/$it")}    res.forEach { println(it) }}println("Асинхронный запрос время $asyncTime мс")

Все работало как надо, но хотелось быстрее. Немного покопавшись в Интернете, я набрел на решение, в котором задачи выполняются параллельно.

Parallel Map

Автор в своем блоге пишет, что код ниже выполняется параллельно с использованием корутин. Ну что, я давно хотел их попробовать, а тут представилась возможность.

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> =    coroutineScope {        map { async { f(it) } }.awaitAll()    }

Если я все верно понял, то здесь расширяется стандартная коллекция (класс Iterable) функцией pmap, в которую передается лямбда. В лямбду поочередно приходит параметр A. Затем после окончания прохода по списку async дожидается выполнения всех элементов списка, и с помощью .awaitAll() выдает результат в виде списка. Причем для каждого элемента функция с модификатором suspend, то есть блокироваться она не будет.

Пришло время тестов, и сказать, что я был разочарован - значит не сказать ничего.

val parmapTime = measureTimeMillis {    runBlocking {        val res = (1..10)            .toList()            .pmap { getDataAsync("$URL/$it") }        println(mapResult)    }}println("Время pmap $parmapTime мс")

Средний результат был в районе - 1523мс, что не сильно то отличалось по скорости от первого решения. Задачи может и работали параллельно благодаря map и async, но уж очень медленно.

Parallel Map v 2.0

После работы, вооружившись малиновым чаем, я сел читать документацию по корутинам и через некоторое время переписал реализацию автора.

suspend fun <T, V> Iterable<T>.parMap(func: suspend (T) -> V): Iterable<V> =    coroutineScope {        map { element ->             async(Dispatchers.IO) { func(element) }         }.awaitAll()     }val parMapTime = measureTimeMillis {    runBlocking {        val res = (1..10)            .toList()            .parMap { getDataAsync("$URL/$it") }    }    println(res)}println("Параллельная map время $parMapTime мс")

После добавления контекста Dispatchers.IO задача выполнялась в 2 раза быстрее ~ 610 мс. Другое дело! Остановившись на этом варианте и дописав все до полноценного рабочего скрипта (проверка ошибок, запись в excel и т.д.) я успокоился. Но мысль в голове о том, что можно еще что-то улучшить не покидала меня.

Java ParallelStream

Через несколько дней, в одном из постов на stackowerflow прочитал о parallelStream. Не откладывая дело в долгий ящик, после работы вновь запустил IDEA.

val javaParallelTime = measureTimeMillis {     val res = (1..10).toList()        .parallelStream()        .map { getDataAsync("$URL/$it") }    res.forEach { println(it) }}println("Java parallelSrtream время $javaParallelTime мс")

Код выполнялся даже чуть быстрее, чем моя реализация. Но радость длилась ровно до того момента, когда пришло время обрабатывать ошибки. Точки останова насколько я понял в stream нет. Иногда, у меня получалось так, что все считалось до конца, вываливалась ошибка и в виде результата "прилетал" то неполный, то пустой Json.

Может, я делал что-то не так, но с async таких проблем не возникло. Там можно контролировать данные на каждом шаге итерации и удобно обрабатывать ошибки.

Выводы

Результаты можно посмотреть в таблице ниже. Для себя я однозначно решил оставить async await. В основном конечно из-за более простой обработки ошибок. Да и за пределы корутин тут выходить не надо.

Метод

Время (ms)

Асинхронный метод

1487

Реализация pmap из Сети

1523

Мой вариант - parallelMap

610

Java.parallelStream

578

В дальнейшем, есть мысли оформить это в небольшую библиотеку и использовать в личных целях, и конечно переписать все это с "индусского кода" на человеческий, на сколько хватит возможностей. А потом залить все это на vds.

Надеюсь мой опыт кому-нибудь пригодится. Буду рад конструктивной критике и советам! Всем спасибо

Подробнее..

C20. Coroutines

18.09.2020 02:05:20 | Автор: admin

В этой статье мы подробно разберем понятие сопрограмм (coroutines), их классификацию, детально рассмотрим реализацию, допущения и компромиссы предлагаемые новым стандартом C++20.


image


Общие сведения


Сопрограммы можно рассматривать как обобщение понятия подпрограмм (routines, функций) в срезе выполняемых над ними операций. Принципиальное различие между сопрограммами и подпрограммами заключается в том, что сопрограмма обеспечивает возможность явно приостанавливать свое выполнение, отдавая контроль другим программным единицам и возобновлять свою работу в той же точке при получение контроля обратно, с помощью дополнительных операций, сохраняя локальные данные (состояние выполнения), между последовательными вызовами, тем самым, обеспечивая более гибкий и расширенный поток управления.


Чтобы внести больше ясности в это определение и дальнейшие рассуждения и ввести вспомогательные понятия и термины, рассмотрим механику обычных функций в C++ и их стековую природу.


Мы будем рассматривать семантику функции в контексте двух операций.


Вызов (call). Передача управления вызываемой процедуре. Выполнение операции можно разделить на несколько этапов:


  1. Выделить доступную вызываемой процедуре область памяти кадр (activation record, activation frame), необходимого размера;
  2. Сохранить значения регистров процессора (локальные данные) для последующего их восстановления, когда управление вернётся из вызываемой процедуры;
  3. Поместить значения аргументов вызова в доступную для процедуры область памяти. В этой же памяти размещаются локальные переменные;
  4. Поместить адрес возврата адрес команды, следующей за командой вызова в доступную для процедуры область памяти.

После чего процессор переходит по адресу первой команды вызываемой процедуры, передавая поток управления.


Возврат из процедуры (return). Передача управления обратно вызывающей стороне. Выполнение этой операции так же состоит из несколько этапов:


  1. Сохранить (если необходимо) возвращаемое значение в области памяти доступной вызывающей процедуре;
  2. Удалить локальные переменные, переданные аргументы;
  3. Восстановить значения регистров.

Дальше процессор переходит по адресу команды из адреса возврата, передавая управление обратно вызывающей стороне. После получения управление, вызывающая сторона освобождает выделенную вызываемой процедуре память (кадр).


Важно заметить следующее:


  1. Выделяемая, вызываемой процедуре, память имеет строго вложенную структуру и время жизни (strictly nested lifetime) относительно вызывающей стороны. Другими словами в каждый момент времени есть один активный кадр: кадр вызванной процедуры, после возврата управления, активным становится кадр вызывающей стороны.
  2. Размер кадр известен на стороне вызывающей процедуры.

Эти свойства позволяют использовать структуру, которая называется аппаратным стеком или просто стеком. Аппаратный стек это непрерывная область памяти аппаратно поддерживаемая центральным процессором и адресуемая специальными регистрами: ss (сегментный регистр стека), bp (регистр указателя базы стекового кадра), sp (регистр указателя стека), последний хранит адрес вершины стека (смешение относительно сегмента стека). Чтобы выделить память на стеке достаточно просто сместить указатель вершины стека (в сторону увеличения) на требуемый размер, чтобы освободить память нужно вернуть указатель в исходное положение.


Выделенная таким образом память называется стековым кадром или стекфреймом. Стекфрейм имеет строгую организацию, которая определяется соглашением о вызове (Calling Conversion). Соглашение зависит от компилятора, от особенностей аппаратной платформы и стандартов языка. Основные отличия касаются особенностей оптимизации (использование регистров) и порядка передачи аргументов. Так же им определяется, например, сторона на которой будут восстанавливаться регистры после вызова.


Рассмотрим простой пример:


void bar(int a, int b){}void foo(){    int a = 1;    int b = 2;    bar(a, b);}int main(){    foo();}

Без каких-либо оптимизаций будет сгенерирован следующий код (x86-64 clang 10.0.0 -m32,
код сгенерирован в 32х битном окружение просто чтобы продемонстрировать работу стека, по соглашению о вызовах для 64х битных систем при передачи аргументов в функцию, в таком простом случае, стек участвовать не будет, аргументы будут переданы напрямую через регистры):


bar(int, int):        push    ebp        mov     ebp, esp        mov     eax, dword ptr [ebp + 12]        mov     ecx, dword ptr [ebp + 8]        pop     ebp        retfoo():        push    ebp         mov     ebp, esp        sub     esp, 24         mov     dword ptr [ebp - 4], 1        mov     dword ptr [ebp - 8], 2        mov     eax, dword ptr [ebp - 4]        mov     ecx, dword ptr [ebp - 8]        mov     dword ptr [esp], eax        mov     dword ptr [esp + 4], ecx        call    bar(int, int)        add     esp, 24        pop     ebp        retmain:        push    ebp        mov     ebp, esp        sub     esp, 8          call    foo()        xor     eax, eax        add     esp, 8        pop     ebp        ret

Проиллюстрируем работу стека:


Начало работы функции main, на стеке лежит адрес возврата из функции, пушем на стек значение регистра ebp (указатель базы стекового кадра) т.к. дальше значение регистра будет меняться на базу текущего стекфрейма, сохраняем в ebp значение регистра esp (адрес вершины стека)


| ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp, esp+----------------+

Выделяем необходимую память под локальные перемменные и аргументы для вызова функции foo. Локальных переменных и аргументов у нас нет. Но т.к. стек имеет выравнивание в 16 байт и на стеке уже лежит 8 байт (4 для адреса возврата и 4 для сохраненного ebp) выделяем дополнительные 8 байт, смещая указатель стека.


| ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| ...            || 8 byte padding || ...            |     <-- esp-----------------+

Вызываем функцию foo. Команда call сохраняет на стеке адрес возврата и передает управление вызываемой функции. На стороне функции foo пушем на стек значение регистра ebp (указатель базы стекового кадра) и сохраняем в ebp значение регистра esp (адрес вершины стека), инициализируем новый стекфрем.


| ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| ...            || 8 byte padding || ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp, esp+----------------+

Выделяем необходимую память под локальные переменные и аргументы для вызова функции bar. У нас две локальные переменные типа int это 8 байт, два аргумента для функции bar типа int это 8 байт. И т.к у нас уже есть на стеке 8 байт (адрес возврата и сохраненный ebp) нужно выделить еще 8 байт чтобы соблюсти требования к выравниванию. Таким образом всего выделяем 8 + 8 + 8 = 24 байта, смещая указатель стека.


| ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| ...            || 8 byte padding || ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| local a        |     <-- ebp - 4+----------------+| local b        |     <-- ebp - 8+----------------+| ...            || 8 byte padding || ...            |+----------------+| arg a          |     <-- esp + 4+----------------+| arg b          |     <-- esp+----------------+

Вызываем функцию bar. Все работает также как и при вызове функции foo. Команда call сохраняет на стеке адрес возврата и передает управление вызываемой функции. На стороне функции bar пушем на стек значение регистра ebp и сохраняем в ebp значение регистра esp, инициализируем новый стекфрем.


| ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| ...            || 8 byte padding || ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| local a        |     <-- ebp - 4+----------------+| local b        |     <-- ebp - 8+----------------+| ...            || 8 byte padding || ...            |+----------------+| arg a          |     <-- ebp + 12+----------------+| arg b          |     <-- ebp + 8+----------------+| return address |+----------------+| saved rbp      |     <-- ebp, esp+----------------+

Функция bar ничего не делает. Восстанавливаем значение указателя базы предыдущего стекового кадра ebp (вызывающей стороны, функция foo) и удаляем сохраненное значение со стека, смещая указатель вершины стека на 4 байта вверх. Забираем со стека адрес возврата и удаляем сохраненное значение со стека, таким же смещением указатель вершины стека на 4 байта вверх. Передаем управление обратно функции foo.


| ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| ...            || 8 byte padding || ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| local a        |     <-- ebp - 4+----------------+| local b        |     <-- ebp - 8+----------------+| ...            || 8 byte padding || ...            |+----------------+| arg a          |     <-- esp + 4+----------------+| arg b          |     <-- esp+----------------+

Функция foo после вызова bar завершает свою работу. Удаляем локальные переменные и аргументы предыдущего вызова, смещаем указатель вершины обратно на 24 байта вверх.


| ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| ...            || 8 byte padding || ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp, esp+----------------+

Восстанавливаем значение указателя базы предыдущего стекового кадра ebp (вызывающей стороны, функция main) и удаляем сохраненное значение со стека. Забираем со стека адрес возврата и удаляем сохраненное значение со стека. Передаем управление обратно функции main.


| ...            |+----------------+| return address |+----------------+| saved rbp      |     <-- ebp+----------------+| ...            || 8 byte padding || ...            |     <-- esp-----------------+

main завершает свою работу, выполняя ровно те же действия, что и предыдущие вызовы и мы возвращаемся в исходное состояние.


| ...            |+----------------+| return address |+----------------+

Мы видим, что функции между вызовами ни как не сохраняют свое локальное состояние, каждый следующий вызов создаёт новый кадр и удаляет его по завершению работы.


Классификация


Помимо фундаментального отличия сопрограмм в способности сохранять свое состояние, можно выделить три основные конструктивные особенности.


  1. Способ передачи управления;
  2. Способ представления в языке;
  3. Способ локализации внутреннего состояния (состояния выполнения).

По способу передачи управления сопрограммы можно разделить на симметричные (symmetric) и асимметричные (asymmetric, semi-symmetric).
Симметричные сопрограммы обеспечивают единый механизм передачи управления между друг другом, являются равноправными, работая на одном иерархическом уровне. В этом случае сопрограмма должна явно указывать кому она передаёт управление, другую сопрограмму, приостанавливая свое выполнение и ожидая пока ей вернут контроль таким же образом. По сути, стек и вложенная природа вызовов функций заменяется на множество приостановленных, равноправных сопрограмм и одной активной, которая может передать управление любой другой сопрограмме.


В то время как асимметричные сопрограммы предоставляют две операции для передачи управления: одна для вызова, передача управления вызываемой сопрограмме и одна для приостановление выполнения, последняя возвращает контроль вызывающей стороне.


Стоить заметить что обе концепции обладают одинаковой выразительная силой.
Хотя асимметричные сопрограммы семантически ближе к функциям, в то время как симметричные интуитивно понятнее в контексте кооперативной многозадачности.


По способ представления в языке сопрограммы могут быть представлены как объекты первого класса (first-class object, first-class citizen) или как ограниченные низкоуровневые языковые конструкции (constrained, compiler-internal), скрывающие детали реализации и предоставляющие управляющие описатели (handles), дескрипторы.
Объекты первого класса в контексте языка программирования это сущности которые могут быть сохранены в переменные, могут передаваться в функции как аргументы или возвращаться в качестве результата, могут быть созданы в рантайме и не зависят от именования (внутренне самоопознаваемы). Например, нельзя создавать функции во время выполнения программы, поэтому функции не являются объектами первого класса. В то же время, существует понятие функционального объекта (function object): пользовательский тип данных, реализующий эквивалентную функциям семантику, который является объектом первого класса.


По способу локализации внутреннего состояния сопрограммы можно разделить на стековые (stackful) и стеконезависимые (stackless). Чтобы понять детальнее, что лежит в основе такого разделения, необходимо дать некоторую классификацию аппаратному стеку (proccesor stack).


Аппаратный стек может быть назначен разным уровням приложения:


  • Стек приложения (Application stack). Принадлежит функции main. Система управления памятью операционной системы может определять переполнения или недопустимые аллокации такого стека. Стек расположен в адресном пространстве таким образом, что его можно расширять по мере необходимости;
  • Стек потока выполнения (Thread stack). Стек назначенный явно запущенному потоку. Обычно используются стеки фиксированного размера (до 1-2 мб);
  • Стек контекста выполнения (Side stack). Контекст (Exection context) это некоторое окружение, пользовательский поток управления или функция (top level context function, контекстная функция верхнего уровня) со своим назначенным стеком. Стек обычно выделяется в пользовательском режиме (библиотечным кодом), а не операционной системой. Контекст имеет свойство сохранять и восстанавливать свое состояние выполнение: регистры центрального процессора, счётчик команд и указатель стека, что позволяет в пользовательском режиме переключатся между контекстами.

Каждая стековая сопрограмма использует отдельный контекст выполнения и назначенный ему стек. Передача управления между такими сопрограммами это переключение контекста (переключение пользовательского контекста достаточно быстрая операция, в сравнение с переключением контекстов операционной системы: между потоками) и соответственно активного стека. Важное следствие из стековой архитектуры таких сопрограмм заключается в том что, из-за того что все фремы вложенных вызовов полностью сохраняются на стеке, выполнение сопрограммы может быть приостановлено в любом из этих вызовов.


Приведем пример простой сопрограммы, мы воспользуемся семейством функций для управления контекcтами: getcontext, makecontext и swapcontext (см. Complete Context Control)


#include <iostream>#include <ucontext.h>static ucontext_t caller_context;static ucontext_t coroutine_context;void print_hello_and_suspend(){     // выводим Hello     std::cout << "Hello";    // точка передачи управления вызывающей стороне,     // переключаемся на контекст caller_context    // в контексте сопрограммы coroutine_context сохраняется текущая точка выполнения,    // после возвращения контроля выполнение продолжится с этой точки.    swapcontext(&coroutine_context, &caller_context);}void simple_coroutine(){    // точка первой передачи управления в coroutine_context    // чтобы продемонстрировать преимущества использование стека    // выполним вложенный вызов функции print_hello_and_suspend.    print_hello_and_suspend();    // функция print_hello_and_suspend приостановила выполнение сопрограмма    // после того как управление вернётся мы выведем Coroutine! и завершим работу,    // управление будет передано контексту,    // указатель на который хранится в coroutine_context.uc_link, т.е. caller_context    std::cout << "Coroutine!" << std::endl;}int main(){    // Стек сопрограммы.    char stack[256];    // Инициализация контекста сопрограммы coroutine_context    // uc_link указывает на caller_context, точку возврата при завершении сопрограммы.    // uc_stack хранит указатель и размер стека    coroutine_context.uc_link          = &caller_context;    coroutine_context.uc_stack.ss_sp   = stack;    coroutine_context.uc_stack.ss_size = sizeof(stack);    getcontext(&coroutine_context);    // Заполнение coroutine_context    // Контекст настраивается таким образом, что переключаясь на него    // исполнение начинается с точки входа в функцию simple_coroutine    makecontext(&coroutine_context, simple_coroutine, 0);    // передаем управление сопрограмме, переключаемся на контекст coroutine_context    // в контексте caller_context сохраняется текущая точка выполнения,    // после возвращения контроля, выполнение продолжится с этой точки.    swapcontext(&caller_context, &coroutine_context);    // сопрограмма приостановила свое выполнение и вернула управление    // выводим пробел    std::cout << " ";    // передаём управление обратно сопрограмме.    swapcontext(&caller_context, &coroutine_context);    return 0;}

Отметим, что контексты выполнения лежат в основе реализации стековых сопрограмм библиотеки Boost: Boost.Coroutine, Boost.Coroutine2, только в Boost по умолчанию вместо ucontext_t используется fcontext_t собственная, более производительная реализация(ассемблерная, с ручным сохранением/восстановлением регистров, без системных вызовов) POSIX стандарта.


В случае стеконезависимых сопрограмм контекст выполнения и назначенный ему стек не используются, сопрограмма выполняется на стеке вызывающей стороны, как обычная подпрограмма, до момента передачи управления. В силу того что стековые кадры имеют строго вложенную структуру и время жизни, такие сопрограмма, приостанавливая выполнение, сохраняют свое состояние, динамически размещая его в куче. Возобновление работы приостановленной сопрограммы, не отличается от вызова обычной подпрограммы, с тем исключение, что регистры и переменные восстанавливаются из сохраненного состояния также как и счетчик команд, т.е. управление передается в точку последней остановки. Структурно такие сопрограммы похожи на устройство Даффа и машину состояний.


Можно вывести несколько важных следствия стеконезависимых сопрограмм:


  1. Передача управления возможна только из самой сопрограммы (top level function), все вложенные вызовы к этому моменту должны быть завершены;
  2. Передача управления возможна только вызывающей стороне;
  3. Возобновление работы сопрограммы происходит на стеке вызывающей стороны, он может отличатся от стека первоначального вызова, это может быть даже другой поток;
  4. Для сохранения состояния (определения набора переменных), восстановления кадра и генерации шаблонного кода для возобновления работы с точки последней остановки, необходима поддержка со стороны стандартов и компиляторов языка.

Мы описали общую теорию и классификацию сопрограмм, рассмотрим техническую спецификацию сопрограмм нового C++20, какое место они занимают в общей теории, их особенности, семантику и синтаксис.


C++20.


Техническая спецификация сопрограмм в новом C++ носит название Coroutine TS. Coroutine TS предоставляет низкоуровневые средства обеспечивающие характерную возможность передачи управления, описывает обобщенный механизм взаимодействия и настройки сопрограмми и набор вспомогательных высокоуровневых типов стандартной библиотеки, задача которых сделать разработку сопрограмм более доступной и безопасной.


Подход который применяется для реализации обобщенных механизмов уже встречается и используется стандартом. Это range based for, суть его в том что компилятор генерирует код цикла, вызывая определенный набор методов строго описанным способом, в данном случае это методы begin и end, тем самым давая возможность программистам настраивать необходимое поведение цикла, определяя эти методы и тип итератора который они возвращают. Точно также компилятор генерирует код сопрограммы, вызывая в строго определенный момент методы определенных пользователем типов, позволяя полностью настраивать и контролировать поведение сопрограммы.


В описанной нами классификации предоставляемые средства подпадают под определение compile-internal сопрограмм.


Далее, чтобы дать возможность компилятору более эффективно работать с памятью, оптимизировать размер и выравнивание сохраняемых состояний используется стеконезависимая модель. Более того, во-первых, выделенная ранее память может переиспользовать, по сути, работая в режиме высокопроизводительного блочного аллокатора, во-вторых, в случаях когда сопрограмма имеет строго вложенное время жизни и размер кадра известен на вызывающей стороне (как в примере с контекстами выполнения), компилятор может избавится от динамического размещения состояния в куче и хранить состояние на стеке вызывающей стороны если это обычная подпрограмма или внутри уже размещенного состояния вызывающей сопрограммы.


И в завершение, чтобы снизить сложность восприятия и сделать более понятным поток управления, сопрограммы семантически приближены к подпрограммам т.е. являются асимметричными.


В итоге, C++20 даёт нам возможность работать с compile-internal asymmetric stackless coroutines.


Мы начнем с того, что сделаем обзор тех выразительных средств, которые предоставляет новый стандарт для описания и работы со сопрограммами и будем постепенно углубляться в детали реализации.


New Keywords


Для оперирования сопрограммами стандарт вводит три ключевых оператора:


  • co_await. Унарный оператор, позволяющий, в общем случае, приостановить выполнение сопрограммы и передать управление вызывающей стороне, пока не завершатся вычисления представленные операндом;
  • co_yield. Унарный оператор, частный случай оператора co_await, позволяющий приостановить выполнение сопрограммы и передать управление и значение операнда вызывающей стороне;
  • co_return. Оператор завершает работу сопрограммы, возвращая значение, после вызова сопрограмма больше не сможет возобновить свое выполнение.

Если в определение функции встречается хотя бы один из этих операторов, то функция считается сопрограммой и обрабатывается соответствующим образом.


Сопрограммой не может быть:


  • Функция main;
  • Функция с оператором return;
  • Функция помеченная constexpr;
  • Функция с автоматическим выведение типа возвращаемого значения (auto);
  • Функция с переменным числом аргументов (variadic arguments, не путать с variadic templates);
  • Конструктор;
  • Деструктор.

User types.


Со сопрограммами ассоциировано несколько интерфейсных типов, позволяющих настраивать поведение сопрограммы и контролировать семантику операторов.


Promise.


Объект типа Promise позволяет настраивать поведения сопрограммы как программной единицы. Должен определять:


  • Поведение сопрограммы при первом вызове;
  • Поведение при выходе из сопрограммы;
  • Стратегию обработки исключительных ситуаций;
  • Необходимость в дополнительном уточнении типа выражения операторов co_await;
  • Передача промежуточных и конечных результатов выполнения вызывающей стороне.
    Также тип promise участвует в разрешение перегрузки операторов new и delete, что позволяет настраивать динамическое размещение фрейма сопрограммы.
    Объект типа promise создаётся и хранится в рамках фрейма сопрограммы для каждого нового вызова.

Тип Promise определяется компилятором согласно специализации шаблона std::coroutine_traits по типу сопрограммы, в специализации участвует: тип возвращаемого значения, список типов входных параметров, тип класса, если сопрограмма представлена методом. Шаблон std::coroutine_traits определен следующем образом:


template <typename Ret, typename = std::void_t<>>struct coroutine_traits_base{};template <typename Ret>struct coroutine_traits_base<Ret, std::void_t<typename Ret::promise_type>>{    using promise_type = typename Ret::promise_type;};template <typename Ret, typename... Ts>struct coroutine_traits : coroutine_traits_base<Ret>{};

Тип должен иметь строгое имя promise_type. Из определения std::coroutine_traits, следует что существует как минимум одна специализация, которая ищет определение типа promise_type в пространстве имен типа возвращаемого результата. promise_type может быть как именем типа, так и псевдонимом.


Самый простой способ определения типа Promise для сопрограммы.


struct Task{    struct Promise    {        ...    };    using promise_type = Promise;};...Task foo(){    ...}

Также определение подобного типа Task полезно в более сложных ситуациях, тип может определять дополнительную семантику внешнего оперирования сопрограммой: передавать и получать данные, передавать поток управления (пробуждать) или уничтожать сопрограмму.


Другой способ определить тип Promise это явно специализировать шаблон std::coroutine_traits. Это удобно, например, для сопрограмма представленных методом пользовательского типа


class Coroutine{public:    void call(int);};namespace std{    template<>    struct coroutine_traits<void, Coroutine, int>    {        using promise_type = Coroutine;    };}

Если тип Promise имеет конструктор соответствующий параметрам сопрограммы, то он будет вызван, иначе будет вызван конструктор по умолчанию. Важно, что все аргументы будут переданы как lvalues, это нужно для того чтобы мы не смогли случайно переместить данные из переданных аргументов в объект Promise т.к. мы ожидаем аргументы в теле сопрограммы. Более подробно создание объекта типа Promise мы рассмотрим ниже.


Прежде чем определить интерфейс типа Promise, необходимо описать второй тип ассоциированный со сопрограммой: Awaitable.


Avaitable.


Объекты типа Avaitable определяют семантику потока управления сопрограммы. Позволяют:


  • Определить следует ли приостанавливать выполнение сопрограммы в точке вызова оператора co_await;
  • Выполнить некоторую логику после приостановления выполнения сопрограммы для дальнейшего планирования возобновления ее работы (асинхронные операции);
  • Получить результат вызова оператора co_await, после возобновления работы.

Объект типа Awaitable определяется в результате разрешения перегрузки (overload resolution) и вызова оператора co_await. Если жизнеспособной перегрузки не было найдено, то результат вычисления самого операнда является объектом типа Awaitable. Далее вызов оператора транслируется в последовательность вызовов методов объекта данного типа.


Например, мы хотим создать механизм отложенного выполнения, который позволит функции уснуть на некоторое время, вернет управление вызывающей стороне, после чего продолжить свое выполнение через заданное время.


Task foo(){    using namespace std::chrono_literals;    // выполнить некоторый набор операций    // вернуть управление    co_await 10s;    // через 10 секунд выполнить еще один набор операций.}

В этом примере выражение переданное в качестве операнда имеет тип std::chrono::duration<long long>, чтобы скомпилировать этот код нам нужно определить перегрузку оператора co_await для выражений такого типа.


template<typename Rep, typename Period>auto operator co_await(std::chrono::duration<Rep, Period> duration) noexcept{    struct Awaitable    {        explicit Awaitable(std::chrono::system_clock::duration<Rep, Period> duration)            : duration_(duration)        {}        ...    private:        std::chrono::system_clock::duration duration_;    };    return Awaitable{ duration };}

Внутри перегрузки мы описываем тип Awaitable, задача которого запланировать и вернуть управление сопрограмме через заданный промежуток времени, и возвращаем объект данного типа как результат.


Нам осталось определить интерфейс типа Awaitable, чтобы это сделать рассмотрим подробнее вызов оператора co_await <expr> и код, который компилятор генерирует в месте вызова.


{    // в начале мы определили тип Promise    using coroutine_traits = std::coroutine_traits<ReturnValue, Args...>;    using promise_type = typename coroutine_traits::promise_type;    ...    // вызов co_await <expr> в рамках сопрограммы    // 1.    // Cоздаем объект типа Avaitable, находим подходящую перегрузку оператора co_await,    // результат сохраняем во фрейме сопрограммы (как создается фрейм мы рассмотрим    // в рамках описания типа Promise), это необходимо     // т.к. с помощью Awaitable мы вернем результат вычисления, после возобновления работы.    frame->awaitable = create_awaitable(<expr>);    // 2.    // Вызываем метод await_ready().    // Основная задача метода позволить нам избежать остановки сопрограммы    // в случаях когда операция (вычисления) могут быть завершены синхронно    // или уже завершены, сохранив вычислительные ресурсы.    if (!awaitable.await_ready())    {        // 3.        // Если вызов await_ready() вернул false,        // то сопрограмма приостанавливает свое выполнение,        // сохраняет состояние: состояние локальных переменных, точку остановки        // (это идентификатор состояния, на которое сопрограмма перейдет        // после возобновления своей работы,         // достаточная информация что бы перейти в точку <resume-point>)        <suspend-coroutine>        // 4.        // Определяем тип coroutine_handle        // corotine_handle - это дескриптор фрейма сопрограммы.        // он обеспечивает низкоуровневую функциональность оперирования сопрограммой:        // передача управления (возобновление выполнения) и удаление.        using handle_type = std::coroutine_handle<promise_type>;        using await_suspend_result_type =            decltype(frame->awaitable.await_suspend(handle_type::from_promise(promise)));        // 5.        // Вызов метода await_suspend(handle),         // задача метода await_suspend выполнить некоторую логику        // на клиентской стороне после приостановления выполнения сопрограммы        // для дальнейшего планирования возобновления ее работы (если необходимо).         // Метод принимает один аргумент - дескриптор сопрограммы.        // Тип возвращаемого результата, определяет семантику передачи управления        if constexpr (std::is_void_v<await_suspend_result_type>)        {            // Тип возвращаемого результата void,            // то мы безусловно передаем управление вызывающей стороне            // (под вызывающей стороной здесь понимается сторона,            // которая передала управление сопрограмме)            frame->awaitable.await_suspend(handle_type::from_promise(promise));            <return-to-caller-or-resumer>;        }        else if constexpr (std::is_same_v<await_suspend_result_type, bool>)        {            // Тип возвращаемого результата bool,            // если метод вернул false, то управление не передается вызывающей стороне            // и сопрограмма возобновляет свое выполнение            // Это полезно, например, когда асинхронная операция            // инициированная объектом Awaitable завершилась синхронно            if (frame->awaitable.await_suspend(handle_type::from_promise(promise))                <return-to-caller-or-resumer>;        }        else if constexrp (is_coroutine_handle_v<await_suspend_result_type>)        {            // Тип возвращаемого результат std::coroutine_handle<OtherPromise>,            // т.е. вызов возвращает дескриптор другой сопрограммы,            // то мы передаем управление этой сопрограмме, это семантика позволяет            // эффективно реализовывать симметричный механизм передачи потока             // управления между сопрограммами            auto&& other_handle = frame->awaitable.await_suspend(                 handle_type::from_promise(promise));            other_handle.resume();        }        else        {            static_assert(false);        }    }    // 6.    // Точка возобновления выполнения (пробуждения)    // Вызов метода await_resume(). Задача метода получить результат вычисления.    // Возвращаемое значение рассматривается как результат вызова оператора co_await.resume_point:    return frame->awaitable.await_resume();}

Здесь есть несколько важных замечаний:


  1. Если в процессе обработки возбуждается исключение, то исключение пробрасывается дальше, наружу оператора co_await. Если во время исключения выполнение сопрограммы было приостановлено, то исключение перехватывается, сопрограмма автоматически возобновляет свое выполнение и только после этого пробрасывается дальше;
  2. Крайне важно, что сопрограмма полностью останавливает свое выполнение до вызова метода await_suspend и передачи дескриптора сопрограммы пользовательскому коду. В этом случае дескриптор сопрограммы может быть свободно передаваться между потоками выполнения без дополнительной синхронизации. Например, дескриптор может быть передан в запланированную в пуле-потоков асинхронную операцию. Конечно здесь следует очень внимательно следить за тем, в какой момент метода await_suspend мы передаем дескриптор другому потоку и как другой поток оперирует этим дескриптором. Поток получившей дескриптор может возобновить выполнение сопрограммы до того как мы вышли из await_suspend. После возобновление работы и вызова метода await_resume, объект Awaitable может быть удален. Также потенциально фрейм и объект Promise может быть удалены, до того как мы завершим метод await_suspend. Поэтому основное чего следует избегать, после передачи контроля над сопрограммой другому потоку в await_suspend: это не обращаться к полям (this может быть удален) и объекту Promise, они могут быть уже удалены.

Формально концепцию Awaitable можно определить в терминах type-traits примерно так:


is_awaitable
// является ли тип std::coroutine_handletemplate<typename Type>struct is_coroutine_handle : std::false_type{};template<typename Promise>struct is_coroutine_handle<std::coroutine_handle<Promise>> : std::true_type{};// типы возможных возвращаемых значений метода await_suspend// - void// - bool// - std::coroutine_handletemplate<typename Type>struct is_valid_await_suspend_return_type : std::disjunction<    std::is_void<Type>,    std::is_same<Type, bool>,    is_coroutine_handle<Type>>{};// метод await_suspenttemplate<typename Type>using is_await_suspent_method = is_valid_await_suspend_return_type<    decltype(std::declval<Type>().await_suspend(std::declval<std::coroutine_handle<>>()))>;// метод await_readytemplate<typename Type>using is_await_ready_method = std::is_constructible<bool, decltype(    std::declval<Type>().await_ready())>;// интерфейс типа Avaitable/*templae<typename Type>struct Avaitable{...    bool await_ready();    void await_suspend(std::coroutine_handle<>);    Type await_resume();...}*/template<typename Type, typename = std::void_t<>>struct is_awaitable : std::false_type{};template<typename Type>struct is_awaitable<Type, std::void_t<    decltype(std::declval<Type>().await_ready()),    decltype(std::declval<Type>().await_suspend(std::declval<std::coroutine_handle<>>())),    decltype(std::declval<Type>().await_resume())>> : std::conjunction<    is_await_ready_method<Type>,    is_await_suspent_method<Type>>{};template<typename Type>constexpr bool is_awaitable_v = is_awaitable<Type>::value;

Дополним предыдущий пример:


template<typename Rep, typename Period>auto operator co_await(std::chrono::duration<Rep, Period> duration) noexcept{    struct Awaitable    {        explicit Awaitable(std::chrono::system_clock::duration duration)            : duration_(duration)        {}        bool await_ready() const noexcept        {            return duration_.count() <= 0;        }        void await_resume() noexcept        {}        void await_suspend(std::coroutine_handle<> h)        {            // Реализация timer::async в данном контексте не очень интересна.            // Важно что это асинхронная операция, которая через заданный            // промежуток времени вызовет переданный callback.            timer::async(duration_, [h]()            {                h.resume();            });        }    private:        std::chrono::system_clock::duration duration_;    };    return Awaitable{ duration };}// сопрограмма, которая через каждую секунду будет выводить текст на экранTask tick(){    using namespace std::chrono_literals;    co_await 1s;    std::cout << "1..." << std::endl;    co_await 1000ms;    std::cout << "2..." << std::endl;}int main(){    tick();    std::cin.get();}

  1. Вызываем функцию tick;
  2. Находим нужную перегрузку оператора co_await и создаем объект Awaitable, передаем в конструктор временной интервал в 1 секунду;
  3. Вызываем метод await_ready, проверяем необходимо ли ожидание;
  4. Приостанавливаем работу функции tick, сохраняем состояние;
  5. Вызываем метод await_suspend и передаем дескриптор сопрограммы;
  6. Метод await_suspend инициирует асинхронную операцию timer::async, которая ожидает заданное время и вызывает переданный callback. Предаем в callback дескриптор сопрограммы чтобы после ожидания передать ей управление;
  7. Передаем управление вызывающей стороне функции main;
  8. Функция main вызывает метод стандартного потока ввода get, это синхронная операция, ожидающая ввода. Мы висим, чтобы просто дать завершится инициированным асинхронным операциям;
  9. Ждем одну секунду, асинхронная операция вызывает переданный нами callback, вызов осуществляется в том же потоке, в котором происходило ожидание;
  10. Вызываем метод resume у дескриптора. Метод передает управление сопрограмме: вызывается функция tick на стеке потока, восстанавливаем сохраненное во фрейме состояние, управление передается в точку последней остановки;
  11. Вызывается метод await_resume у объекта Avaitable, созданного при вызове оператора co_await и сохраненного во фрейме;
  12. Метода await_resume ничего не делает и не возвращает результата, оператор co_await завершает свою работу и передает управление, следующей за ним команде;
  13. Функция tick выводит сообщение на экран с помощью стандартного потока вывода "1...";
  14. Вызов следующего оператора co_await. Выполняем все шаги начиная с пункта 2. Отличие только в том, что управление возвращается не функции main, a асинхронной операции, которая вызвала наш callback, т.е. resumer'у. После это асинхронная операция завершает свое выполнение;
  15. Сопрограмма tick завершает свое выполнение (более детально этот процесс мы рассмотрим ниже)

После того как мы определили интерфейс и семантику оператора co_await и типа Awaitable, мы можем вернутся к более детальному изучению типа Promise и настройке сопрограммы.


Promise.


Также как и в случае с Awaitable, чтобы понять роль объекта Promise мы начнем с кода, который генерирует компилятор в процессе обработки сопрограммы


Генерируемый код можно разделить на три часть:


  1. Создание и инициализация кадра сопрограммы. Инициация выполнения сопрограммы. Т.е. это код который выполняется при первом вызове сопрограммы;
  2. Описание стейт-машины согласно пользовательским запросам передачи управления (вызовы операторов co_awat/co_yield/co_return). Это код, который выполняется при передаче управления сопрограмме т.е. при первом вызове и при возобновление работы;
  3. Завершение выполнения, освобождение ресурсов и удаление кадра. Код выполняется при естественном завершение или принудительном удаление.

// Примерная организация кадра сопрограммы.// Здесь отражены наиболее важные для понимая части// 1. resume - указатель на функцию, //    коротая вызывается при передаче управления сопрограмме, описывает стейт-машину.// 2. promise - объект типа Promise// 3. state - текущее состояние// 4. heap_allocated - был ли фрейм при создание размещен в куче//    или фрейм был создан на стеке вызывающей стороны// 5. args - аргументы вызова сопрограммы// 6. locals - сохраненные локальные переменные текущего состояния// ...struct coroutine_frame{    void (*resume)(coroutine_frame *);    promise_type promise;    int16_t state;    bool heap_allocated;    // args    // locals    //...};// 1. Создание и инициализация кадра сопрограммы. Инициация выполнения.template<typename ReturnValue, typename ...Args>ReturnValue Foo(Args&&... args){    // 1.    // Определяем тип Promise    using coroutine_traits = std::coroutine_traits<ReturnValue, Args...>;    using promise_type = typename coroutine_traits::promise_type;    // 2.    // Создание кадра сопрограммы.     // Размер кадра определяется встроенными средствами компилятора    // и зависит от размера объекта Promise, количества и размера локальных переменных    // и аргументов, и набора вспомогательных данных,    // необходимых для управления состоянием сопрограммы.    // 1. Если тип promise_type имеет статический метод    //    get_return_object_on_allocation_failure,    //    то вызывается версия оператора new, не генерирующая исключений    //    и в случае неудачи вызывается метод get_return_object_on_allocation_failure,    //    результат вызова возвращается вызывающей стороне.    // 2. Иначе вызывается обычная версия оператора new.    coroutine_frame* frame = nullptr;    if constexpr (has_static_get_return_object_on_allocation_failure_v<promise_type>)    {        frame = reinterpret_cast<coroutine_frame*>(            operator new(__builtin_coro_size(), std::nothrow));        if(!frame)            return promise_type::get_return_object_on_allocation_failure();    }    else    {        frame = reinterpret_cast<coroutine_frame*>(operator new(__builtin_coro_size()));    }    // 3.    // Сохраняем, переданные функции, аргументы во фрейме.    // Аргументы переданные по значению перемещаются.    // Аргументы переданные по ссылке (lvalue и rvalue) сохраняют ссылочную семантику.    <move-args-to-frame>    // 4.    // Создаем объект типа promise_type и сохраняем его во фрейме    new(&frame->promise) create_promise<promise_type>(<frame-lvalue-args>);    // 5.    // Вызываем метод Promise::get_return_object().    // Результат вычисления будет возвращен вызывающей стороне    // при достижение первой точки остановки и передачи потока управления.    // Результат сохраняется как локальная переменная до вызов тела функции,    // т.к. фрейм сопрограммы может быть удален (см. оператор co_await).    auto return_object = frame->promise.get_return_object();    // 6.    // Вызываем функцию описывающую стейт-машину согласно     // пользовательским запросам передачи управления    // В реализации GCC, например, эти две функции называются    // ramp-fucntion (создание и инициализация) и     // action-function (пользовательская стейт-машина) соответственно    void couroutine_states(coroutine_frame*);    couroutine_states(frame);    // 7.    // Возвращаем результат вызывающей стороне,     // мы достигнем этой точки в коде только при первом вызове,    // все последующие запросы на возобновление работы будут вызывать функцию    // стейт-машины couroutine_states, указатель на функцию сохранен во фрейме сопрограммы.    return return_object;}

Мы упоминали выше что тип Promise участвует в разрешение перегрузки операторов new и delete. Например, при таком определение, будут вызваны пользовательские операторы new и delete:


struct Promise{    void* operator new(std::size_t size, std::nothrow_t) noexcept    {        ...    }    void operator delete(void* ptr, std::size_t size)    {        ...    }    // определяем поведение сопрограммы если не удалось создать фрейм    static auto get_return_object_on_allocation_failure() noexcept    {        // создаем и возвращаем вызывающей стороне невалидный объект        return make_invalid_task();    }};

Более того у нас есть возможность добавить перегрузку оператора new c дополнительными аргументами, набор параметров должен быть согласован с параметрами функции. Это позволяет использовать, например, стандартные механизмы такие как leading-allocator convention.


// тип Promise с перегрузкой оператора new c пользовательским аллокаторомtemplate<typename Allocator>struct Promise : PromiseBase{    // std::allocator_arg_t - это tag-тип    // нужен для устранения неоднозначных ситуаций при перегрузке    void* operator new(std::size_t size, std::allocator_arg_t, Allocator allocator) noexcept    {        ...    }    void operator delete(void* ptr, std::size_t size)    {        ...    }};// добавляем соответствующую специализацию в std::coroutine_traitsnamespace std{    template<typename... Args>    struct coroutine_traits<Task, Args...>    {        using promise_type = PromiseBase;    };    template<typename Allocator>    struct coroutine_traits<Task, std::allocator_arg_t, Allocator>    {        using promise_type = Promise<Allocator>;    };}// мы можем вызывать сопрограммы с передачей конкретного аллокатораint main(){    MyAlloc alloc;    coro(std::allocator_arg, alloc);    ...}

Независимо от того, перегрузим мы операторы new и delete или нет, компилятор может избавиться от динамического размещения кадра и сохранить состояние на стеке вызывающей стороны.


Следующим шагом мы сохраняем аргументы сопрограммы во фрейм. Здесь есть несколько важных замечаний.
Во-первых нам следует быть крайне осторожными с передачей ссылок. Важно понимать, что сопрограмма ни как не меняет время жизни переданных аргументов, сохраняя ссылочную семантику, но при этом время жизни кадра сопрограммы отличается от стекового кадра обычной функции. Следующий, казалось бы простой пример, в случае со сопрограммой, является случаем Undefined Behavior.


void Coroutine(const std::vector<int>& data){    co_await 10s;    for(const auto& value : data)        std::cout << value << std::endl;}void Foo(){    // 1. Мы передаем сопрограмме  временный объект типа vector<int>;    // 2. Ссылка на этот временный объект сохраняется в константной ссылке data;    // 3. Аргументы вызова сохраняются во фрейме сопрограммы, т.к. ссылочная семантика    //    сохраняется, то поле, в котором мы сохранили data     //    будет указывать на тот же временный объект;    // 4. Временные объекты удаляются после полного вычисления выражения;    // 5. Выражение, в котором участвует временный объект типа vector<int>,     //    будет вычислено, когда оператор co_await вернет управление вызывающей стороне    //    и функция Foo продолжит свое выполнение;    // 6. Через 10 секунд, когда сопрограмма вновь получит поток управления и     //    продолжит свое выполнение c цикла, вектор, на который ссылается data     //    (поле фрейма, в котором сохранена ссылка), будет уже удален.    Coroutine({1, 2, 3});    ...}

Второе замечание касается порядка сохранения аргументов и создания объекта типа Promise. Нужно иметь ввиду что все дальнейшее оперирование с аргументами в теле сопрограммы осуществляется с копиями аргументов, сохраненных в кадре сопрограммы, поэтому мы сначала сохраняем аргументы, а потом создаем объект типа Promise и вызываем (если нашли) соответствующим перегруженный конструктор, в конструктор передаем именно скопированные аргументы, а не изначальные объекты. Это позволяет сохранить естественное восприятие потока управления и избежать неприятных эффектов, если Promise в конструкторе сохранил ссылку на какой-нибудь аргумент, тогда объект, на который мы будем ссылаться из Promise, будет существовать на протяжение всего времени жизни кадра сопрограммы.


Далее мы обращаемся к объекту типа Promise и вызываем метод get_return_object. Создаваемый объект не обязательно должен в точности соответствовать типу возвращаемого результата, по необходимости и возможности может быть выполнено неявное преобразование. В стандарте нет требования в какой момент выполнять неявное преобразование: в момент создания объекта и вызова get_return_object или в момент возвращения вызываемой стороне. Это важно, например, если мы используем какие-то операции с побочными эффектами, реализуя императивную последовательность выполнения. Пример эксплуатации такого преобразования Мonadic composition.


class Task{public:    struct promise_type    {        auto get_return_object() noexcept        {            return Task{ std::coroutine_handle<promise_type>::from_promise(*this) };        }        ...    };    void resume()    {        if(coro_handle)            coro_handle.resume();    }private:    Task() = default;    explicit Task(std::coroutine_handle<> handle)        : coro_handle(handle)    {}    std::coroutine_handle<> coro_handle;};

Мы уже встречали тип std::coroutine_handle эти дескриптор сопрограммы, обеспечивает низкоуровневую функциональность оперирования сопрограммой: передача управления (возобновление выполнения) и удаление. Статический метод from_promise, позволяет получить дескриптор сопрограммы по объекту Promise.


В заключение мы вызываем функцию couroutine_states и передаем управление стейт-машине, после того как функция couroutine_states вернет управление, приостановив или завершив выполнение сопрограммы, мы возвращаем ранее созданный методом get_return_object объект вызывающей стороне.


State Machine.


Функции couroutine_states описание стейт-машину согласно пользовательскому набору вызовов операторов co_awat/co_yield/co_return и вызывается автоматически при передаче управления сопрограмме: при первом вызове или при возобновление работы, вызовом метода дескриптора resume. Указатель на функцию сохранен в кадре сопрограммы.


void couroutine_states(coroutine_frame* frame){    switch(frame->state)    {        case 1:        ... goto resume_point_0;        case N:            goto resume_point_N;        ...    }    co_await promise.initial_suspend();    try    {        // function body    }    catch(...)    {        promise.unhandled_exception();    }final_suspend:    co_await promise.final_suspend();}

Если вернутся к коду, который генерируется вызовом оператора co_await, то можно заметить метку resume_point точка пробуждения.


{    ...resume_point:    return frame->awaitable.await_resume();}

Для каждого вызова оператора co_await в служебном или пользовательском коде генерируется уникальная метка порядковый номер оператора co_await, этот номер так же идентифицирует текущие состояние сопрограммы и сохраняется в кадре в поле state, перед передачей управления вызывающей стороне. И первое что мы делаем при вызове это смотрим на текущее состояние, безусловно переходим к соответствующей ветке, возобновляем работу и завершаем вызов оператора co_await в пользовательском коде.


При первом вызове сопрограммы состояние не будет задано и мы перейдем к вызову метода initial_suspend с последующей передачей результата оператору co_await. Задача этого вызова определить: следует ли начать выполнение пользовательского кода немедленно или выполнение должно быть отложено. Стандарт предоставляет два тривиальных типа реализующих концепцию Avaitable: std::suspend_never, std::suspend_always, которые упрощают реализацию метода initial_suspend, позволяя реализовать две наиболее распространенных модели поведения.


namespace std{    struct suspend_never    {        bool await_ready() noexcept { return true; }        void await_suspend(coroutine_handle<>) noexcept {}        void await_resume() noexcept {}    };    struct suspend_always    {        bool await_ready() noexcept { return false; }        void await_suspend(coroutine_handle<>) noexcept {}        void await_resume() noexcept {}    };}// В данном случае, вызов сопрограммы приводит к немедленному выполнению// пользовательского кодаclass Task{public:    struct promise_type    {        ...        auto init_suspend() const noexcept        {            return std::suspend_never{};        }    }    ...};// В этом же случае, при вызове сопрограммы, // управление сразу передается вызывающей стороне// и выполнение пользовательского кода будет отложено,// до явной передачи управление обратно через вызов метода resume.class TaskManual{public:    struct promise_type    {        ...        auto init_suspend() const noexcept        {            return std::suspend_always{};        }    }    ...};

Далее следует выполнение пользовательского кода. Если включена поддержка исключений, то пользовательский код заключается в блок try-catch с вызовом метода unhandled_exception в случае возбуждения исключения.


В пользовательском коде, помимо оператора co_await, могут встречаться операторы co_yield и co_return. Эти операторы позволяют передавать вызываемой стороне промежуточный или конечный результат выполнения сопрограммы. Передача результат осуществляется средствами объекта Promise, но с разной семантикой.


Оператор co_yield <expr> эквивалентен вызову:


co_await frame->promise.yield_value(<expr>);

Т.е. оператор позволяет сохранить промежуточный результат в объекте Promise и вернуть управление вызывающей стороне для дальнейшей манипуляции полученными данными. Типичная реализация метода yield_value выглядит так:


template<typename Type>class Task{public:    struct promise_type    {        ...        // Cохраняем переданное сопрограммой значение,        // передаем управление вызывающей стороне,         // возвращая и передавая оператору co_await объект типа std::suspend_always.        auto yield_value(Type value)        {            current_value = std::move(value);            return std::suspend_always{};        }    };    ...};

Пользовательский тип Task может реализовывать разные стратегии получения доступа к сохраненным в объекте Promise значениям. Один из наиболее выразительных вариантов использования семантики оператора coyield это [генераторы](https://en.wikipedia.org/wiki/Generator(computer_programming)). Пример генератора из библиотеки cppcoro


В свою очередь оператор co_return в контексте пользовательского кода обладает такой же семантикой как и оператор return. Позволяет принудительно завершить выполнение и вернуть результат вызывающей стороне.


  • Вызов co_rеturn без операндов эквивалентен:


    // co_return;frame->promise.return_void();goto final_suspend;
    

  • Если тип результат вычисления выражения, переданного оператору в качестве аргумента, отличен от void, то вызов co_rеturn эквивалентен следующему коду


    // co_return <expr>;promise.return_value(<expr>);goto final_suspend;
    

  • Если же тип результата вычисления выражения void, то вызов генерирует следующий код


    // co_return <expr>;<expr>;promise.return_void();goto final_suspend;
    


Важно, если в пользовательском коде нет операторов co_return, то в конце тела функции генерируется вызов оператора без аргументов co_return;. Т.е. выражение frame->promise.return_void() должно быть валидно.


После передачи результата вычислений в Promise через вызов методов return_value или return_void мы завершаем выполнение пользовательского кода и переходим к служебному вызову final_suspend.


В противовес методу initial_suspend, вызов метода final_suspend позволяет настроить поведение сопрограммы в момент завершения своего выполнения. Задача вызова дать возможность пользователю обработать и опубликовать результат выполнения, сигнализировать о завершение вычислений или передать выполнение другой сопрограмме.


// В этот случае после выполнения пользовательского кода сопрограмма завершит свою работу// и все ресурсы будут удалены автоматически. Объекту Promise будет вызван деструктор,// аргументы будут удалены, память выделенная под кадр сопрограммы будет очищена// вызовом оператора delete, затем управление будет передано вызывающей стороне.// Передача управления обратно сопрограмме приведет к Undefined Behavior.// Это поведение полезно, когда мы не ожидаем результата выполнения сопрограммы.class Task{public:    struct promise_type    {        ...        auto final_suspend() const noexcept        {            // не передаем управление вызывающей стороне            return std::suspend_never{};        }    };    ...};// В этом же случае, после выполнения пользовательского кода сопрограмма передаст // управление вызывающей стороне и ответственность за удаление ресурсов сопрограммы.// Передача управления обратно сопрограмме на этом этапе приведет к Undefined Behavior.// Удаление ресурсов сопрограммы осуществляется принудительным вызовом// на стороне пользователя coroutine_handle::destroy()// Это стратегия необходима, когда на нужно получить результат работы сопрограммы, // в противном случае они будут удалены вместе с объектом Promise.class TaskManual{public:    struct promise_type    {        ...        auto final_suspend() const noexcept        {            // передаем управление вызывающей стороне            return std::suspend_always{};        }    }    ...};

Мы рассмотрели все случаи использования оператора co_await как в пользовательском так и в служебном коде. Он вызывается с результатами вызова init_suspend и final_suspent, приостанавливает работу сопрограммы в случае вызова оператора co_yeild, может быть вызван в пользовательском коде с произвольным выражением. У последнего использования есть одна особенность. Если тип Promise определяет метод await_transform, то любой вызов оператора co_await в пользовательском коде транслируется в вызов


// co_await <expr>co_await frame->promise.await_transform(<expr>);

Это достаточно мощный инструмент, например, помимо вызова оператора co_await с типами, которые не поддерживают концепцию Avaitable, мы можем запрещать использование или менять поведение уже определенных сущностей. Например можно запретив прямые вызовы оператора co_await в генераторе.


class Task{public:    struct promise_type    {        ...        template<typename Type>        auto await_transform(Type&& Whatever) const noexcept        {            static_assert(false,                "co_await is not supported in coroutines of type Generator");            return std::suspend_never{};        }    };    ...};

Вместо заключения


Примеры кода:


  • cppcoro. Библиотека примитивов асинхронной и кооперативной композиции построенная на сопрограммах Coroutine TS;
  • folly. Реализована экспериментальная поддержка стандартных сопрограмм;

Возможное дальнейшее развитие:



Ссылки:



Буду рад комментариям и предложениям (можно по почте yegorov.alex@gmail.com)
Спасибо!

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru