Русский
Русский
English
Статистика
Реклама

Reactor

Лечим Java Reactor при помощи Kotlin Coroutines

17.01.2021 18:05:05 | Автор: admin

На текущей работе пишем на Reactor. Технология классная, но как всегда есть много НО. Некоторые вещи раздражают, код сложнее писать и читать, с ThreadLocal совсем беда. Решил посмотреть какие проблемы уйдут, если перейти на Kotlin Coroutines, а какие проблемы, наоборот, добавятся.

Карточка пациента

Для статьи написал маленький проект, воспроизведя проблемы, с которыми столкнулся на работе. Основной код здесь. Алгоритм специально не стал разбивать на отдельные методы, так лучше видно проблемы.

В двух словах об алгоритме:

Переводим деньги с одного счёта на другой, записывая транзакции о факте перевода.

Перевод идемпотентен, так что если транзакция уже есть в БД, то отвечаем клиенту, что всё хорошо. При вставке транзакции может вылететь DataIntegrityViolationException, это тоже значит, что транзакция уже есть.

Чтобы не уйти в минус есть проверка в коде + Optimistic lock, который не позволяет конкурентно обновлять счета. Чтобы он работал нужен retry и дополнительная обработка ошибок.

Для тех кому не нравится сам алгоритм

Алгоритм для проекта выбирал такой, чтобы воспроизвести проблемы, а не чтобы он был эффективным и архитектурно правильным. Вместо одной транзакции надо вставлять полупроводки, optimistic lock вообще не нужен (вместо него проверка положительности счета в sql), select + insert надо заменить на upsert.

Жалобы пациента

  1. Stacktrace не показывает каким образом мы попали в проблемное место.

  2. Код явно сложнее, чем был бы на блокирующих технологиях.

  3. Многоступенчатая вложенность кода из-за flatMap.

  4. Неудобная обработка ошибок и их выброс.

  5. Сложная обработка поведения для Mono.empty().

  6. Сложности с логированием, если надо в лог добавить что-то глобальное, например traceId. (в статье не описываю, но те же проблемы с другими ThreadLocal переменными, например SpringSecurity)

  7. Неудобно дебажить.

  8. Неявное api для параллелизации.

Ход лечения

Написал отдельный PR перехода с Java на Kotlin.

Интеграция почти везде гладкая.

Понадобилось добавить com.fasterxml.jackson.module:jackson-module-kotlin чтобы заработала сериализация data классов и org.jetbrains.kotlin.plugin.spring чтобы не прописывать везде open модификаторы.

В контроллере достаточно было написать suspend fun transfer(@RequestBody request: TransferRequest) вместо public Mono<Void> transfer(@RequestBody TransferRequest request)

В репозитории написал suspend fun save(account: Account): Account вместо Mono<Account> save(Account account); Единственное, репозитории не определяются, если в них только suspend функции, надо, чтобы хоть один метод работал с Reactor типами.

Тесты обернул в runBlocking { }, чтобы можно было вызывать suspend функции.

Для реализации Retry использовал библиотеку kotlin-retry. Единственное, в ней не было функции фильтрации по классу ошибки, но это было легко добавить (завёл PR).

Ну и, естественно, переписал алгоритм. Все детали опишу ниже по-отдельности.

Было:

public Mono<Void> transfer(String transactionKey, long fromAccountId,                           long toAccountId, BigDecimal amount) {  return transactionRepository.findByUniqueKey(transactionKey)    .map(Optional::of)    .defaultIfEmpty(Optional.empty())    .flatMap(withMDC(foundTransaction -> {      if (foundTransaction.isPresent()) {        log.warn("retry of transaction " + transactionKey);        return Mono.empty();      }      return accountRepository.findById(fromAccountId)        .switchIfEmpty(Mono.error(new AccountNotFound()))        .flatMap(fromAccount -> accountRepository.findById(toAccountId)          .switchIfEmpty(Mono.error(new AccountNotFound()))          .flatMap(toAccount -> {            var transactionToInsert = Transaction.builder()              .amount(amount)              .fromAccountId(fromAccountId)              .toAccountId(toAccountId)              .uniqueKey(transactionKey)              .build();            var amountAfter = fromAccount.getAmount().subtract(amount);            if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {              return Mono.error(new NotEnoghtMoney());            }            return transactionalOperator.transactional(              transactionRepository.save(transactionToInsert)                .onErrorResume(error -> {                  //transaction was inserted on parallel transaction,                  //we may return success response                  if (error instanceof DataIntegrityViolationException             && error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {                    return Mono.empty();                  } else {                    return Mono.error(error);                  }                })                .then(accountRepository.transferAmount(                  fromAccount.getId(), fromAccount.getVersion(),                   amount.negate()                ))                .then(accountRepository.transferAmount(                  toAccount.getId(), toAccount.getVersion(), amount                ))            );          }));    }))    .retryWhen(Retry.backoff(3, Duration.ofMillis(1))      .filter(OptimisticLockException.class::isInstance)      .onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())    )    .onErrorMap(      OptimisticLockException.class,      e -> new ResponseStatusException(        BANDWIDTH_LIMIT_EXCEEDED,        "limit of OptimisticLockException exceeded", e      )    )    .onErrorResume(withMDC(e -> {      log.error("error on transfer", e);      return Mono.error(e);    }));}

Стало:

suspend fun transfer(transactionKey: String, fromAccountId: Long,                     toAccountId: Long, amount: BigDecimal) {  try {    try {      retry(limitAttempts(3) + filter { it is OptimisticLockException }) {        val foundTransaction = transactionRepository          .findByUniqueKey(transactionKey)        if (foundTransaction != null) {          logger.warn("retry of transaction $transactionKey")          return@retry        }        val fromAccount = accountRepository.findById(fromAccountId)          ?: throw AccountNotFound()        val toAccount = accountRepository.findById(toAccountId)          ?: throw AccountNotFound()        if (fromAccount.amount - amount < BigDecimal.ZERO) {          throw NotEnoghtMoney()        }        val transactionToInsert = Transaction(          amount = amount,          fromAccountId = fromAccountId,          toAccountId = toAccountId,          uniqueKey = transactionKey        )        transactionalOperator.executeAndAwait {          try {            transactionRepository.save(transactionToInsert)          } catch (e: DataIntegrityViolationException) {            if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {              throw e;            }          }          accountRepository.transferAmount(            fromAccount.id!!, fromAccount.version, amount.negate()          )          accountRepository.transferAmount(            toAccount.id!!, toAccount.version, amount          )        }      }    } catch (e: OptimisticLockException) {      throw ResponseStatusException(        BANDWIDTH_LIMIT_EXCEEDED,         "limit of OptimisticLockException exceeded", e      )    }  } catch (e: Exception) {    logger.error(e) { "error on transfer" }    throw e;  }}

Stacktraces

Пожалуй, это самое главное.

Было:

o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockExceptionat c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)  ...

Стало:

error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockExceptionat c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)...at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)(Coroutine boundary)at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)(Coroutine creation stacktrace)at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)...Caused by: c.g.c.v.r.OptimisticLockException: nullat c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)...at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)...

Скучные части стектрейсов я вырезал, пакеты сократил (забочусь о читателе, и без того длинно).

В Java очень куцая информация. Да, ошибка есть. Даже видно на какой строчке она вылетела. Только непонятно а как мы в эту строчку кода попали. В Kotlin версии виден весь трейс от контроллера.

Вот представьте себе, что вы видите ошибку в логе где-то на обращении в регулярно вызываемый метод. А кто его вызывал? Придётся по логам рядом искать. Это хорошо, если логи объединены через что-нибудь вроде traceId (thread name нам не поможет) и вообще логи есть.

Сложность кода

Kotlin версия выглядит проще, точно так же, как выглядел бы код на блокирующих технологиях. По крайне мере пока мы не вводим параллельные операции (тут ещё вопрос какой вариант сложнее писать: на блокирующих технологиях или с корутинами).

Многоступенчатая вложенность кода

Никаких flatMap. Добавились вложения из-за явных try catch, но схожая логика вся объявлена на одном уровне.

Было:

return accountRepository.findById(fromAccountId)  .switchIfEmpty(Mono.error(new AccountNotFound()))  .flatMap(fromAccount -> accountRepository.findById(toAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound()))    .flatMap(toAccount -> {      ...    })

Стало:

val fromAccount = accountRepository.findById(fromAccountId)  ?: throw AccountNotFound()val toAccount = accountRepository.findById(toAccountId)  ?: throw AccountNotFound()...

Обработка ошибок и их выброс

Обработка ошибок теперь через обычный try catch, легко понять какой кусок кода мы обернули.

Было:

return transactionRepository.findByUniqueKey(transactionKey)  ...  .onErrorMap(    OptimisticLockException.class,    e -> new ResponseStatusException(      BANDWIDTH_LIMIT_EXCEEDED,       "limit of OptimisticLockException exceeded", e    )  )

Стало:

try {  val foundTransaction = transactionRepository    .findByUniqueKey(transactionKey)  ...} catch (e: OptimisticLockException) {  throw ResponseStatusException(    BANDWIDTH_LIMIT_EXCEEDED,     "limit of OptimisticLockException exceeded", e  )}

Ошибки выбрасывать можно просто через throw, а не возвращая объект ошибки. В Reactor меня особенно раздражают конструкции вида:

.flatMap(foo -> {  if (foo.isEmpty()) {     return Mono.error(new IllegalStateException());  } else {    return Mono.just(foo);  }})

Мне могут возразить, что это функциональный стиль, так и должно быть. Но именно из-за функционального стиля появляется дополнительная сложность кода.

Mono.empty()

Это заслуживает отдельного обсуждения. В реактор нельзя передавать null в качестве результата. При этом нельзя написать C5C.

Ide никак тебе не подсказывает, может ли этот конкретный mono быть пустым и обработал ли ты это. В рабочем проекте для меня именно это было источником тупых ошибок. То забудешь обработать, то после рефакторинга как-то не так работает.

В Kotlin будет not null тип, если ты точно знаешь, что результат будет. Или это будет nullable тип и компилятор обяжет тебя что-то с этим сделать.

Конкретно на нашем примере:

Было:

return transactionRepository.findByUniqueKey(transactionKey)  .map(Optional::of)  .defaultIfEmpty(Optional.empty())  .flatMap(foundTransaction -> {    if (foundTransaction.isPresent()) {      log.warn("retry of transaction " + transactionKey);      return Mono.empty();    }...

Стало:

val foundTransaction = transactionRepository  .findByUniqueKey(transactionKey)if (foundTransaction != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}...

Может, как-то можно эту логику написать адекватнее на Reactor, но то что я нагуглил выглядит ещё хуже.

Логирование и контекст

Допустим, мы хотим всегда логировать traceId во время обработки запроса. ThreadLocal больше не работает, в том числе и MDC (контекст для логирования). Что делать?

Есть контекст. И в Reactor и в Coroutines контекст immutable, так что новое значение в MDC подбрасывать будет не так просто (нужно пересоздавать контекст).

Чтобы работало в Java надо написать фильтр, который сохранит traceId в контекст:

@Componentpublic class TraceIdFilter implements WebFilter {  @Override  public Mono<Void> filter(    ServerWebExchange exchange, WebFilterChain chain  ) {    var traceId = Optional.ofNullable(      exchange.getRequest().getHeaders().get("X-B3-TRACEID")    )      .orElse(Collections.emptyList())      .stream().findAny().orElse(UUID.randomUUID().toString());    return chain.filter(exchange)      .contextWrite(context ->        LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)      );  }}

И каждый раз, когда мы хотим что-то залогировать, надо переносить traceId из контекста в MDC:

public static <T, R> Function<T, Mono<R>> withMDC(  Function<T, Mono<R>> block) {  return value -> Mono.deferContextual(context -> {    Optional<Map<String, String>> mdcContext = context      .getOrEmpty(MDC_ID_KEY);    if (mdcContext.isPresent()) {      try {        MDC.setContextMap(mdcContext.get());        return block.apply(value);      } finally {        MDC.clear();      }    } else {      return block.apply(value);    }  });}

Да, это опять Mono. Т.е. мы можем логировать только тогда, когда код позволяет вернуть Mono. Например вот так:

.onErrorResume(withMDC(e -> {  log.error("error on transfer", e);  return Mono.error(e);}))

В Kotlin проще. Нужно написать фильтр, чтобы сохранить traceId сразу в MDC:

@Componentclass TraceIdFilter : WebFilter {  override fun filter(    exchange: ServerWebExchange, chain: WebFilterChain  ): Mono<Void> {    val traceId = exchange.request.headers["X-B3-TRACEID"]?.first()     MDC.put("traceId", traceId ?: UUID.randomUUID().toString())    return chain.filter(exchange)  }}

И при создании корутины вызывать withContext(MDCContext()) { }

Каждый раз на время работы корутины, будет восстанавливаться MDC и в логах будет traceId. Во время записи в лог ни о чем задумываться не нужно.

Тут есть одно НО, об этом позже.

Дебаг

В Java Reactor проблемы с дебагом примерно те же, что и со стектрейсом: нужно думать о том, кто в каком порядке кого вызвал, ставить breakpoints в лямбдах и т.д..

С корутинами не замечаешь разницы с обычным кодом: работает stepOver, переменные подсвечиваются, видно стектрейс и по нему можно перемещаться (в нашем примере вплоть до контроллера).

Всё идеально, кроме возможности запускать suspend функции во время дебага. На это уже есть issue. Правда, надо сказать, что и в Java Reactor особо не получается в evaluate сделать то, что хочется.

Параллелизация

Предположим, я хочу ускорить алгоритм и запросить из базы аккаунты и транзакции параллельно, а не последовательно как сейчас.

Было:

return Mono.zip(  transactionRepository.findByUniqueKey(transactionKey)    .map(Optional::of)    .defaultIfEmpty(Optional.empty()),  accountRepository.findById(fromAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound())),  accountRepository.findById(toAccountId)    .switchIfEmpty(Mono.error(new AccountNotFound())),).flatMap(withMDC(fetched -> {  var foundTransaction = fetched.getT1();  var fromAccount = fetched.getT2();  var toAccount = fetched.getT3();  if (foundTransaction.isPresent()) {    log.warn("retry of transaction " + transactionKey);    return Mono.empty();  }  ...}

Стало:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}val fromAccountAsync = GlobalScope.async(coroutineContext) {   accountRepository.findById(fromAccountId) }val toAccountAsync = GlobalScope.async(coroutineContext) {   accountRepository.findById(toAccountId) }if (foundTransactionAsync.await() != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()val toAccount = toAccountAsync.await() ?: throw AccountNotFound()

В Kotlin версии есть явное указание вот это выполни асинхронно, вместо выполни всё это в параллель в Reactor.

Что самое важное, код ведёт себя по-разному. В случае с Reactor мы создаем три параллельных запроса и продолжаем работу только после того, как все три завершатся. С корутинами мы запускаем все три запроса и ждать чего-то начинаем только при вызове foundTransactionAsync.await(). Таким образом, если transactionRepository.findByUniqueKey() выполнится быстрее, то мы завершим обработку, без ожидания accountRepository.findById() (эти операции отменятся).

В более сложных алгоритмах разница будет ощутимей. Не уверен, что вообще в Reactor получится написать такую версию алгоритма:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}val fromAccountAsync = GlobalScope.async(coroutineContext) {  accountRepository.findById(fromAccountId)}val toAccountAsync = GlobalScope.async(coroutineContext) {  accountRepository.findById(toAccountId)}if (foundTransactionAsync.await() != null) {  logger.warn("retry of transaction $transactionKey")  return@retry}val transactionToInsert = Transaction(  amount = amount,  fromAccountId = fromAccountId,  toAccountId = toAccountId,  uniqueKey = transactionKey)transactionalOperator.executeAndAwait {  try {    transactionRepository.save(transactionToInsert)  } catch (e: DataIntegrityViolationException) {    if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {      throw e;    }  }  val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()  val toAccount = toAccountAsync.await() ?: throw AccountNotFound()  if (fromAccount.amount - amount < BigDecimal.ZERO) {    throw NotEnoghtMoney()  }  accountRepository.transferAmount(    fromAccount.id!!, fromAccount.version, amount.negate()  )  accountRepository.transferAmount(    toAccount.id!!, toAccount.version, amount  )}

Здесь я ожидаю асинхронный запрос уже внутри открытой БД транзакции. Т.е. мы при работе с одним коннектом, ждём результата выполнения на другом коннекте. Так делать не стоит, скорее для примера, хоть оно и работает (пока коннектов хватает).

Побочные эффекты

Конечно, есть проблемы, куда без них.

Надо явно указывать context и scope

Чтобы программа работала как ожидается, надо:

  1. Каждому запросу назначить scope. Таким образом все порождаемые при обработке запроса корутины будут отменены все вместе, например, в случае ошибки.

  2. В каждом запросе проставить context. Зачем нам нужен контекст я рассказывал в разделе про логирование.

Spring не берет на себя заботу об этом вопросе, приходится в контроллере указывать явно:

@PutMapping("/transfer")suspend fun transfer(@RequestBody request: TransferRequest) {  coroutineScope {    withContext(MDCContext()) {      ledger.transfer(request.transactionKey, request.fromAccountId,                       request.toAccountId, request.amount)    }  }}

Естественно, можно объединить в одну функцию и потом через regexp проверить, что во всех методах есть нужный вызов. Но лучше была бы какая-то автоматизация для этого.

Передача context

В случае, если мы порождаем дополнительные корутины, необходимо явным образом передать текущий контекст. Вот пример:

val foundTransactionAsync = GlobalScope.async(coroutineContext) {  logger.info("async fetch of transaction $transactionKey")  transactionRepository.findByUniqueKey(transactionKey)}

По-умолчанию в async() указывать контекст не требуется (он будет пуст). Видимо, чтобы было меньше неявных вещей. Но как итог, в нашем случае, если забыть передать контекст, в логе не окажется traceId. Помните я писал, что не надо задумываться во время логирования? Вместо этого надо задумываться при создании корутины (что, конечно, лучше).

AOP и suspend

Автоматизацию, которую я упоминал в первом пункте, написать самому сложно. Потому что пока нельзя нормально написать aspect для suspend функции.

Я в итоге сумел написать такой аспект. Но для объяснения того, как это работает понадобится отдельная статья.

Надеюсь, появится более адекватный способ писать аспекты (попробую этому посодействовать).

Оценка лечения

Все проблемы исчезли. Добавилась пара новых, но оно терпимо.

Надо сказать, что корутины быстро развиваются и я ожидаю только улучшения работы с ними.

Видно, что команда JetBrains внимательно отнеслась к проблемам разработчиков. Насколько я знаю, где-то год назад всё ещё были проблемы с дебагом и стактрейсом, к примеру.

Самое главное, с корутинами не надо в голове держать все особенности работы Reactor и его могучий API. Ты просто пишешь код.

Подробнее..

Разгоняем REACTOR

12.06.2021 18:20:44 | Автор: admin

Кому будет интересно?

Реактор сегодня - это стильно, модно, молодежно. Почему многие из нас практикуют реактивное программирование? Мало кто может ответить однозначно на этот вопрос. Хорошо - если Вы понимаете свой выигрыш, плохо - если реактор навязан организацией как данность. Большинство аргументов "ЗА" - это использование микросервисной архитектуры, которая в свою очередь обязывает микросервисы часто и много коммуницировать между собой. Для коммуникации в большинстве случаев выбирают HTTP взаимодействие. Для HTTP нужен легковесный веб-сервер, а что первое приходит на ум? Tomcat. Тут появляются проблемы с лимитом на максимальное количество сессий, при превышении которого веб-сервер начинает реджектить запросы (хотя лимита этого не так уж и легко достичь). Здесь на подмогу приходит реактор, который подобными лимитами не ограничен, и, например, Netty в качестве веб-сервера, который работает с реактивностью из коробки. Раз есть реактивный веб-сервер, нужен реактивный веб-клиент (Spring WebClient или Reactive Feign), а раз клиент реактивный, то вся эта жуть просачивается в бизнес логику, Mono и Flux становятся Вашими лучшими друзьями (хотя по началу есть только ненависть :))

Среди бизнес задач, очень часто встречаются серьезные процедуры, которые обрабатывают большие массивы данных, и нам приходится применять реактор и для них. Тут начинаются сюрпризы, если реактор не уметь готовить, можно и проблем схлопотать очень много. Превышение лимита файловых дескрипторов на сервере, OutOfMemory из-за неконтролируемой скорости работы неблокирующего кода и многое многое другое, о чем мы сегодня поговорим. Мы с коллегами испытали очень много трудностей из-за проблем с пониманием как держать реактор под контролем, но всё что нас не убивает - делает нас умнее!

Блокирующий и неблокирующий код

Вы ничего не поймете дальше, если не будете понимать разницу между блокирующим и неблокирующим кодом. Поэтому, остановимся и внимательно разберемся в чем разница. Вы уже знаете, блокирующий код реактору - враг, неблокирующий - бро. Проблема лишь в том, что в настоящий момент времени, не все взаимодействия имеют неблокирующие аналоги.

Лидер здесь - HTTP взаимодействие, вариантов масса, выбирай любой. Я предпочитаю Reactive Feign от Playtika, в комбинации со Spring Boot + WebFlux + Eureka мы получаем очень годную сборку для микросервисной архитектуры.

Давайте по-простому: НЕблокирующий код, это обычно всё, в названии чего есть reactive, а блокирующий - все оставшееся :) Hibernate + PostgreSQL - блокирующий, отправить почту через JavaMail - блокирующий, скинуть сообщение в очередь IBMMQ - блокирующий. Но есть, например, реактивный драйвер для MongoDB - неблокирующий. Отличительной особенностью блокирующего кода, является то, что глубоко внутри произойдет вызов метода, который заставит Ваш поток ждать (Thread.sleep() / Socket.read() и многие подобные), что для реактора - как нож в спину. Что же делать? Большинство бизнес логики завязано на базу данных, без нее никуда. На самом деле достаточно знать и уметь делать 2 вещи:

  • Необходимо понимать где блокирующий код. В этом может помочь проект BlockHound или его аналоги (тут тема для отдельной статьи)

  • Исполнение блокирующего кода необходимо переключать на пулы, готовые его выполнять, например: Schedulers.boundedElastic(). Делается это при помощи операторов publishOn & subscribeOn

Разгоняемся сами

Перед тем, как продолжить, необходимо немного размяться!

Уровень 1

    @Test    fun testLevel1() {        val result = Mono.just("")            .map { "123" }            .block()        assertEquals("123", result)    }

Начнем с простого, такой код обычно пишут начинающие reactor программисты. Как начать цепочку? Mono.just и ты на коне :) Оператор map трансформирует пустую строку в "123" и оператор block делает subscribe.

Обращаю особенное внимание на оператор block, не поддавайтесь соблазну использовать его в Вашем коде, исключение составляют тесты, где это очень удобно. При вызове block внутри метода Вашего RestController, Вы сразу получите исключение в рантайме.

Уровень 2

    fun nonBlockingMethod1sec(data: String)     = data.toMono().delayElement(Duration.ofMillis(1000))    @Test    fun testLevel2() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { nonBlockingMethod1sec(it) }            .block()        assertEquals("Hello world", result)    }

Усложняем наш код, добавляем неблокирующий метод nonBlockingMethod1sec, все что он делает - ожидает одну секунду. Все что делает данный код - дважды, по очереди, запускает неблокирующий метод.

Уровень 3

    fun collectTasks() = (0..99)    @Test    fun testLevel3() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { businessContext ->                collectTasks()                    .toFlux()                    .map {                        businessContext + it                    }                    .collectList()            }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Начинаем добавлять самое интересное - Flux! У нас появляется метод collectTasks, который собирает массив из сотни чисел, и далее мы делаем из него Flux - это будет наш список задач. К каждой задаче мы применяем трансформацию через оператор map. Оператор collectList собирает все результаты в итоговый список для дальнейшего использования.

Здесь наш код начинает превращаться в рабочий паттерн, который можно использовать для массового выполнения задач. Сначала мы собираем некий "бизнес контекст", который мы используем в дальнейшем для выполнения задач.

Уровень 4

    fun collectTasks() = (0..100)        @Test    fun testLevel4() {        val result = nonBlockingMethod1sec("Hello world")            .flatMap { businessContext ->                collectTasks().toFlux()                    .flatMap {                        Mono.deferContextual { reactiveContext ->                            val hash = businessContext + it + reactiveContext["requestId"]                            hash.toMono()                        }                    }.collectList()            }            .contextWrite { it.put("requestId", UUID.randomUUID().toString()) }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Добавляем немного плюшек. Появилась запись данных (15) в реактивный контекст, а также чтение (10) из него. Мы почти у цели. Постепенно переходим к итоговому варианту

Уровень 5

    fun collectTasks() = (0..1000)        fun doSomethingNonBlocking(data: String)        = data.toMono().delayElement(Duration.ofMillis(1000))        fun doSomethingBlocking(data: String): String {        Thread.sleep(1000); return data    }    val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")    private val logger = getLogger()    @Test    fun testLevel5() {        val counter = AtomicInteger(0)        val result = nonBlockingMethod1sec("Hello world")            .flatMap { _ ->                collectTasks().toFlux()                    .parallel()                    .runOn(pool)                    .flatMap {                        Mono.deferContextual { _ ->                            doSomethingNonBlocking(it.toString())                                .doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }                                .doOnNext { logger.info("Non blocking code finished ${counter.get()}") }                                .map { doSomethingBlocking(it) }                                .doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }                        }                    }.sequential()                    .collectList()            }            .block()!!        assertEquals(collectTasks().toList().size, result.size)    }

Вот мы и добрались до итогового варианта! Часть с реактивным контекстом была опущена для более наглядной демонстрации того, зачем мы здесь собрались. У нас появились два новых метода: doSomethingNonBlocking (3) & doSomethingBlocking (6) - один с неблокирующим ожиданием в секунду, второй с блокирующим. Мы создали пул потоков для обработки задач (10), добавили счетчик активных задач в реакторе (15). У нас появился оператор parallel (19) и обратный ему sequential (29). Задачи мы назначили на свежесозданный пул (20). Для понимания, что же происходит внутри, добавили логирование внутри операторов doOnRequest (вызывается перед исполнением метода), doOnNext (вызывается после исполнения метода). Основная задумка - на примере, определить сколько задач одновременно выполняется в реакторе и за какое время цепочка завершит свою работу.

Такой "паттерн", мы с коллегами очень часто применяем для выполнения сложных задач, таких как отправка отчетов или массовая обработка транзакций. Первым делом собирается бизнес контекст - это некая структура, содержащая в себе информацию, полученную в результате вызовов других микросервисов. Бизнес контекст необходим нам для выполнения самих задач, и собирается он заранее, чтобы не тратить время в процессе обработки. Далее мы собираем список задач, превращаем их во Flux и скармливаем реактору на параллельную обработку.

И вот здесь начинается самое интересное. Попробуйте ответить на несколько вопросов. Как Вы считаете, сколько времени будет выполнятся данная цепочка? В ней 100 задач, в каждой задаче неблокирующее ожидание в 1 секунду, блокирующее ожидание в 1 секунду, и у нас в наличии пул из 10 потоков? (Вполне годная задачка на собеседование senior reactor developer :))

Правильный ответ

Около 12 секунд. Рассуждаем от блокирующего :) Блокирующее ожидание никуда не деть, и тут имеем 100 блокирующих секунд на 10 потоков, итого 10 секунд. Неблокирующее ожидание заметно нам лишь в первый раз, далее оно незаметно запускается в передышках между блокирующим. Не забываем про одну секунду сбора "бизнес контекста" перед запуском задач.

А теперь уберем строку (26) .map { doSomethingBlocking(it) } . Освободим наш реактор от блокирующего кода, интересно, сколько теперь времени займет выполнение цепочки?

Правильный ответ

2 секунды! 1 на сбор "бизнес контекста" и 1 на выполнение всех задач. Реактор запустит 100 задач одновременно. Но ведь у нас пул из 10 потоков? Как так? Первый разрыв шаблона.

Мы идем до конца и увеличиваем количество задач в методе collectTasks() до ... 1000? а может быть сразу до 15000? Как долго реактор будет выполнять столько задач?

Правильный ответ

2 секунды! 1 на сбор "бизнес контекста" и 1 на выполнение всех задач. Реактор запустит ВСЕ задачи одновременно. Второй разрыв шаблона. Где предел?

А это вообще легально?

Как же так и как это контролировать? Почему это опасно? Что если внутри параллельной обработки Вы решите вызвать другой микросервис? Если у вас 30000 задач, и по завершению каждой, Вам нужно отправлять запрос соседнему микросервису, Вы с удивлением можете обнаружить, что реактор непременно постарается выполнить все вызовы одновременно (Вы ведь используете реактивный web-client или реактивный feign, верно?) Открытие такого большого количества сокетов повлечет за собой превышение лимита открытых файловых дескрипторов в системе, что как минимум создаст проблемы с невозможностью создания новых сокетов в системе и помешает другим сервисам, а как максимум повалит Вам на сервере SSH и Вы потеряете доступ к серверу. Сомневаюсь, что в этот момент, программист будет кричать "зато смотри как быстро работает".

Разрыв шаблона. Thread Pool & Reactor

Основная проблема начинающего реактор программиста - это образ мышления, если есть медленный процесс - добавь X потоков, будет быстрее в X раз, а если слишком быстро - сократи количество потоков. Как всё просто было раньше? :) С реактором это не работает.

Классический thread pool - двери. Больше дверей - больше пропускная способность, все работает быстрее.

Теперь встречайте reactor! Вы видите двери? Нет никаких дверей

Реактор это большой мешок с подарками, или воздушная труба, задачи в которую валятся и летают там пока не выполнятся. А кто эти люди в желтом? Это наши epoll реактивные потоки, которые ни в коем случае нельзя нагружать блокирующими задачами. Можно провести аналогию с прорабами или инженерами. Они здесь, чтобы управлять процессом, а не чтобы выполнять тяжелую работу. Займите одного инженера тяжелой задачей, и когда к нему придет следующий рабочий с вопросом "что делать дальше?", он не сможет ответить, потому что был занят. Вот так и появляются таймауты в реактивном коде. Казалось бы микросервис стоит без нагрузки, выполняет какие-то задачки, а один из 500 запросов к нему падает с тайм-аутом, и непонятно почему. Велика вероятность что инженер был занят блокирующей задачей! Заботьтесь о своих инженерах и поручайте тяжелую работу специально обученным рабочим, например, Schedulers.boundedElastic().

Как контролировать эту "трубу", в которую валится всё без контроля? Вот мы и подошли к кульминации

Конфигурируем реактор!

В своей дефолтной конфигурации, параллельная обработка в реакторе зависит от количества ядер процессора сервера, на котором запускается код, поэтому, к своему удивлению, Вы получите разные результаты, проверяя работу реактора в тесте на локальной машине с 4-8 ядрами и production сервере с 32 ядрами.

Парад настроек открывает parallel с его аргументом parallelism

Меняя parallelism, мы можем регулировать количество запускаемых rails (это местное понятие реактора, которое похоже на корутины, но по сути является количеством одновременно выполняемых неблокирующих задач). Prefetch мы рассмотрим более подробно в следующем разделе.

Но одного parallelism недостаточно, реактор все еще будет нагребать задач как не в себя.

Мало кто обращал внимание что у оператора flatMap (только того что запускается на Flux) есть перегрузки с интересными аргументами, а именно maxConcurrency

maxConcurrency очень важен, по дефолту значение стоит Integer.MAX_VALUE (определяет сколько неблокирующих задач может выполняться одновременно на одной рельсе. Понимаете теперь откуда аппетит у реактора?

Также, не стоит забывать, что если цепочка будет запущена несколько раз (вызов одного http метода контроллера несколько раз), то все помножится! Никакой пул не спасет.

Количество запусков цепочки напрямую влияет на количество одновременно выполняемых задач.

Подведем небольшой итог:

  • parallel (parallelism)

  • flatMap (maxConcurrency)

  • Количество запусков цепочки

Эти три параметра являются множителями, для расчета количества одновременных задач.

По дефолту это Кол-во ядер * Integer.MAX_VALUE * Количество запусков цепочки

Напротив же, запустив данный код для 5 задач длительностью в секунду мы получим цепочку работающую 5 секунд. Теперь всё под контролем!

        val result = nonBlockingMethod1sec("Hello world")            .flatMap { _ ->                collectTasks().toFlux()                    .parallel(1)                    .runOn(pool, 1)                    .flatMap({                        Mono.deferContextual { _ ->                            doSomethingNonBlocking(it.toString())                        }                    }, false, 1, 1)                    .sequential()                    .collectList()            }            .block()!!

Стоп, или не всё?

Thread Pool

Зачем же нужен пул потоков в реакторе? Думайте о нем как о двигателе для Вашего автомобиля. Чем пул мощнее - тем блокирующие задачи будут разбираться быстрее, а если потоков мало, то и блокирующие задачи задержатся у вас надолго! А куда же мы без блокирующих вызовов? На количество одновременно выполняемых задач в реакторе он не влияет, вот это поворот :)

Надеюсь, Вы не пробовали использовать Schedulers.parallel() для исполнения Вашего блокирующего кода? =) Несмотря на свое подходящее название (ну называется он parallel, значит и нужен для параллельной обработки) использовать этот пул можно только для неблокирующего кода, в доке указано что он живет с одним воркером, и содержит в себе только особенные, реактивные потоки.

Распределение задач по рельсам

Не коснулись мы еще одной важной темы. Обычно, мы пытаемся закончить обработку большого массива данных в кратчайший срок, с чем нам определенно поможет изложенный выше материал, но это еще не все. В тестах мы часто используем синтетические данные, которые генерируем одинаковыми порциями, исключая погрешности production среды. Задачи обычно выполняются разное время и это создает проблемы с равномерным распределением задач.

Зеленые прямоугольники это наши задачи, которые распределяются в реакторе по алгоритму round-robin, что в случае с синтетическими данными дает красивую картинку.

Хорошо загруженный реактор (задачи равномерно распределены). 54 блокирующих задачи (каждая по 1сек), round-robin распределение по 6 рельсамХорошо загруженный реактор (задачи равномерно распределены). 54 блокирующих задачи (каждая по 1сек), round-robin распределение по 6 рельсам

Но запуская код в production среде, мы можем встретиться с долгим запросом в базу, сетевыми задержками, плохим настроением микросервиса да и чего только не бывает.

Плохо загруженный пул (задачи распределены не равномерно)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамПлохо загруженный пул (задачи распределены не равномерно)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсам

Оператор collectList() вернет нам результат только после завершения последней задачи, и как мы видим, наш пул будет простаивать пока 1 поток трудится разгребая очередь накопившихся задач. Это создает неприятные задержки, когда Вы знаете что можно быстрее, но быстрее не происходит.

Бороться с этим можно несколькими способами

  • concatMap вместо flatMap (посмотрите в профилировщик на ваш пул, передумаете)

  • правильно планировать задачи, чтобы исключить аномалии (почти невозможно)

  • дробить каждую задачу на много мелких, и также запускать их в параллельную обработку чтобы нивелировать проблемы с распределением (вполне рабочий вариант)

  • prefetch (наш выбор!)

Параметр prefetch у flatMap & runOn позволяет определить, сколько задач будет взято на одну рельсу на старте, а затем при достижении некоторого порога выполнения задач, реквесты будут повторяться с этим количеством. Значение по умолчанию - 256. Сменив значение на 1, можно заставить реактор использовать механизм "work stealing", при котором, рельсы и потоки, которые освободились, будут забирать задачи себе на выполнение и картина получится гораздо более приятная.

Хорошо загруженный пул (задачи равномерно распределены)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамPrefetch !Хорошо загруженный пул (задачи равномерно распределены)54 блокирующих задачи (каждая по 1сек кроме 2ух), round-robin распределение по 6 рельсамPrefetch !

На этом у меня всё. Будет интересно прочесть Ваши замечания и комментарии, на 100% истину не претендую, но все результаты подкреплены практическими примерами, на Spring Boot + Project Reactor 3.4. Всем спасибо!

Подробнее..
Категории: Kotlin , Java , Concurrency , Parallel , Reactor , Parallelism , Mono , Threads , Flux , Pool , Prefetch

Очередь отложенных событий delayedQueue

22.08.2020 10:17:49 | Автор: admin

Пару лет назад в одном из проектов мы столкнулись с необходимостью откладывать выполнение некоего действия на определенный промежуток времени. Например, узнать статус платежа через три часа или повторно отправить уведомление через 45 минут. Однако на тот момент мы не нашли подходящих библиотек, способных "откладывать" и не требующих дополнительного времени на настройку и эксплуатацию. Мы проанализировали возможные варианты и написали собственную маленькую библиотеку delayed queue на Java с использованием Redis в роли хранилища. В этой статье я расскажу про возможности библиотеки, ее альтернативы и те "грабли", на которые мы наткнулись в процессе.


Функциональность


Итак, что же делает delayed queue? Событие, добавленное в отложенную очередь, доставляется обработчику через указанный промежуток времени. Если процесс обработки завершается неудачно, событие будет доставлено снова позднее. При этом максимальное количество попыток ограничено. Redis не дает гарантий сохранности, и к потере событий нужно быть готовым. Однако в кластерном варианте Redis показывает достаточно высокую надежность, и мы ни разу не столкнулись с этим за полтора года эксплуатации.


API


Добавить событие в очередь


eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();

Обратите внимание, что метод возвращает Mono, поэтому для запуска надо выполнить одно из следующих действия:


  • subscribe(...)
  • block()

Более подробные разъяснения приводятся в документации по Project Reactor. Контекст добавляется к событию следующим образом:


eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();

Зарегистрировать обработчик событий


eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);

если вместе с событием необходимо обработать пришедший контекст, то:


eventService.addHandler(        DummyEvent.class,        e -> Mono            .subscriberContext()            .doOnNext(ctx -> {                Map<String, String> eventContext = ctx.get("eventContext");                log.info("context key {}", eventContext.get("key"));            })            .thenReturn(true),        1);

Удалить обработчик событий


eventService.removeHandler(DummyEvent.class);

Создание сервиса


Можно воспользоваться настройками "по-умолчанию":


import static com.github.fred84.queue.DelayedEventService.delayedEventService;var eventService = delayedEventService().client(redisClient).build();

или сконфигурировать всё самому:


import static com.github.fred84.queue.DelayedEventService.delayedEventService;var eventService = delayedEventService()        .client(redisClient)        .mapper(objectMapper)        .handlerScheduler(Schedulers.fromExecutorService(executor))        .schedulingInterval(Duration.ofSeconds(1))        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)        .enableScheduling(false)        .pollingTimeout(POLLING_TIMEOUT)        .eventContextHandler(new DefaultEventContextHandler())        .dataSetPrefix("")        .retryAttempts(10)        .metrics(new NoopMetrics())        .refreshSubscriptionsInterval(Duration.ofMinutes(5))        .build();

Завершить работу сервиса (и всех открытых им соединений в Redis) можно eventService.close() или через фреймворк, поддерживающий аннотацию @javax.annotation.PreDestroy.


Метрики


С любой системой что-то может пойти не так, за ней надо приглядывать. В первую очередь вас должны интересовать:


  • общий размер памяти, используемый Redis;
  • количество событий, готовых к обработке (метрика "delayed.queue.ready.for.handling.count" и тэгом конкретного типа события)

История


Теперь в двух словах о том, как появился и развивался delayed queue. В 2018 году
наш маленький проект был запущен в Amazon Web Services.
Он разрабатывался и поддерживался двумя инженерами, и добавлять в него требующие обслуживания компоненты было накладно с точки зрения времени обслуживания системы. Основным правилом было: "используй подходящие компоненты, обслуживаемые Amazon-ом, если это не стоит очень дорого".


Готовые кандидаты


Мы рассматривали:



Первые два были отсеяны из-за необходимости их настраивать и обслуживать, к тому же с JMS не доставало опыта работы. SQS исключили, поскольку максимальное время задержки не превышает 15 минут.


Первая наивная реализация


На момент старта у нас уже был резервный вариант с обходом "зависших сущностей" в реляционной БД раз в сутки. Почитав статьи про организацию очередей отложенных событий, мы выбрали Redis со следующей структурой:


  • событие добавляется в sorted sets, где весом выступает время ее будущего выполнения
  • по наступлению времени выполнения событие перекладывается из "sorted_set" в "list" (может использоваться в режиме очереди)

Забегая вперед, на тот момент уже полгода существовал проект Netflix dyno-queues
с примерно похожим принципом работы. Однако тогда я его, к сожалению, еще не нашел.


Первая версия диспетчера, который перекладывал "созревшие события" из sorted set в list, выглядела примерно так (здесь и далее приведен упрощенный код):


var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);events.forEach(key -> {  var payload = extractPayload(key);  var listName = extractType(key);  redis.lpush(listName, payload);  redis.zrem("delayed_events", key);});

Обработчики событий были сделаны поверх Spring Integration, который в свою очередь фактически делал:


redis.brpop(listName)

Первые проблемы не заставили себя долго ждать.


Ненадежный диспетчер


При возникновении ошибки при добавлении в "list" (например, отвалилось соединение), событие помещалось в list несколько раз. Поскольку Redis поддерживает транзакции, мы просто обернули эти 2 метода.


events.forEach(key -> {  ...  redis.multi();  redis.zrem("delayed_events", key);  redis.lpush(listName, payload);  redis.exec();});

Ненадежный обработчик


С другой стороны list-a нас поджидала еще одна проблема. Событие пропадало навсегда, если ошибка происходила внутри обработчика. Решением стала замена удаления элемента из "sorted_set" на перезапись его на более позднее время и удаление только после успешного завершения обработки.


events.forEach(key -> {  ...  redis.multi();  redis.zadd("delayed_events", nextAttempt(key))  redis.zrem("delayed_events", key);  redis.lpush(listName, payload);  redis.exec();});

Не уникальное событие


Как я уже упоминал, у нас изначально был запасной механизм, который обходил "зависшие сущности" в БД и добавлял в "delayed queue" еще одно. Внутри "sorted set" ключ выглядел как
metadata;payload, где payload у нас неизменный, а вот metadata у следующей попытки для одного и того-же события отличалась. В итоге мы могли получить дубликат и много ненужных повторных попыток обработки. Эту ситуацию мы решили, вынеся изменяемую metadata и неизменный payload в Redis hset и оставив в "sorted set" только тип и идентификатор события.
В итоге регистрация события превратилась из


var envelope = metadata + SEPARATOR + payload;redis.zadd(envelope, scheduledAt);

в


var envelope = metadata + SEPARATOR + payload;var key = eventType + SEPARATOR + eventId;redis.multi();redis.zadd(key, scheduledAt);redis.hset("metadata", key, envelope)redis.exec();

Последовательный запуск диспетчера


Все наши обработчики были идемпотентными, и мы не беспокоились о дубликатах событий. Тем не менее, на нескольких экземплярах приложения диспечеры иногда запускались одновременно и добавляли в list одно и то же событие. Добавление банальной блокировки с коротким TTL сделало код чуть более эффективным:


redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());

Эволюция в отдельный проект


Когда такую же задачу понадобилось решить в проекте без Spring, функционал был вынесен в самостоятельную библиотеку. "Под нож" пошли зависимости:



Первая была легко заменена на использование Lettuce напрямую, а со второй все оказалось чуть сложнее. К этому моменту у меня был небольшой опыт работы с реактивными стримами в общем и с Project Reactor в частности, поэтому источником событий для обработчика стал "горячий стрим".
Чтобы добиться равномерного распределения событий между обработчиками в разных экземплярах приложения, пришлось реализовать свой собственный Subscriber


redis  .reactive()  .brpop(timeout, queue)  .map(e -> deserialize(e))  .subscribe(new InnerSubscriber<>(handler, ... params ..))

и


class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {    @Override    protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {        Mono<Boolean> promise = handler.apply(envelope.getPayload());        promise.subscribe(r -> request(1));    }}

В итоге мы получили библиотеку, которая сама доставляет события в зарегистрированные обработчики (в отличии от Netflix dyno queue, гда надо самому poll-ить события).


Что планируем дальше?


  • добавить Kotlin DSL. Новые проекты я все чаще начинаю на Kotlin и использовать suspend fun вместо API Project Reactor будет удобнее
  • сделать настраиваемыми интервалы между повторными попытками

Ccылки


Подробнее..
Категории: Redis , Java , Queue , Reactor

1000 и 1 способ сесть на мель в Spring WebFlux при написании высоконагруженного сервиса

29.04.2021 10:23:13 | Автор: admin

Источник изображения: Shutterstock.com/photowind

Добрый день, меня зовут Тараканов Анатолий, я senior java разработчик SberDevices. 2.5 года программирую на Java, до этого 6 лет писал на C# и 1 год на Scala. Хочу поделиться опытом создания сервиса-оркестратора Voice Processing Service. Он является точкой входа для пользователей семейства виртуальных ассистентов Салют. Через него также проходит часть трафика приложений SmartMarket, где любой разработчик может написать навык для наших виртуальных ассистентов Салют. Одним словом, на сервис приходится немалая нагрузка. Давайте посмотрим, какие проблемы при его создании возникли и как мы их решали, а также сколько времени ушло на поиск причин. И всё это в контексте реактивного фреймворка Spring WebFlux.

Немного о сервисе


Начнем с обзора архитектуры нашего сервиса-оркестратора. Он управляет процессом обработки входящего трафика от пользователей, формированием и передачей ответа. Среди смежных систем, к которым он обращается, есть такие сервисы:

  • идентификации по токену, а также голосовым и видеоданным;
  • насыщения запроса дополнительными данными о пользователе и истории взаимодействия;
  • преобразования речевого сигнала в текстовое представление;
  • обработки естественного языка;
  • преобразования текста в голосовое представление;
  • запуска пилотных фич;
  • распознавания музыки и другие.


Как видно, смежных систем немало. API части из них доступны по REST-у запрос-ответ, другие по Socket-у потоковая передача данных.

Сервис хостится в нескольких ЦОДах, в том числе в SberCloud, горизонтально масштабируется в OpenShift. Для передачи, поиска и хранения логов используется ELK-стек, для трассировки Jaeger, для сбора метрик Prometheus, а для их отображения Grafana.



Каждый инстанс в секунду держит нагрузку примерно в 7000 пакетов (средний размер пакета 3000 байт). Это эквивалентно активности 400 пользователей, которые без перерыва обращаются к виртуальному ассистенту. С учётом взаимодействия нашего сервиса со смежными число пакетов увеличивается втрое до 21 000.
Каждая виртуалка имеет 3 ядра и 8 Gb оперативной памяти.

Сервис создавался в реалиях стартапа, а значит неопределенности. Были такие вводные:

  • поддержка TLS/mTLS;
  • WebSocket с клиентом;
  • текстовый, голосовой стриминг;
  • отказоустойчивость 99.99;
  • высокая нагрузка;
  • масса смежных систем в перспективе и необходимость в гибком формате контракта.

В этих реалиях мы выбрали такие технологии:

  • Java 11 с Gradle;
  • JSON/Protobuf на транспортном уровне.

Оба формата позволяют легко шерить контракт между командами, программирующими на разных языках. Для обоих форматов есть решения автогенерации DTO в нотации многих языков на базе исходной структуры. Оба позволяют без боли вносить изменения в контракт.

А ещё мы использовали Junit 5 и Mokito для тестирования и несколько библиотек Nimbus JOSE + JWT, Google Guava, Lombok, vavr.io для удобства в виде синтаксического сахара и автогенерации кода.

Оценив требования, мы решили втащить в наш технологический стек Spring WebFlux с Reactor и Netty под капотом.

Итак, поговорим о нюансах использования этого реактивного фреймворка.

Кастомизация Netty-сервера


Сразу отмечу, что не все настройки Netty-сервера доступны через спринговые проперти или аргументы командной строки. А ещё иногда необходимо повесить логику на события самого сервера и подключения, заменить стандартный обработчик события в рамках подключения и добавить свой регистратор метрик.

Так вот, всё это можно сделать в компоненте, имплементирующем WebServerFactoryCustomizer. В его методе доступны как HttpServer, так и каждое клиентское подключение.



Пропущу этап создания сервиса и сразу перейду к его сдаче на стенд нагрузочного тестирования. В тот же день мы получили фидбэк о том, что с течением времени число незакрытых соединений растёт. Стали искать причину анализировали tcp-дампы с помощью WireShark. Выяснилось, что клиент не присылал сигнал о закрытии соединения, а также, что в реакторе по умолчанию не инициализируются обработчики таймаутов на входящие/исходящие пакеты. Для исправления ситуации в вышеуказанный компонент был добавлен такой обработчик.



На просмотр логов, анализ ситуации со смежниками, сбор дампов и их анализ, исправление и тестирование у нас ушло 2 дня. Немало.

Следующей проявившейся под нагрузкой проблемой было то, что спустя порядка 30 минут после начала теста смежные сервисы, доступные по RESTу, стали иногда отвечать на запросы ошибкой Сonnection reset by peer. Мы снова отправились смотреть логи, дампы. Оказалось, дело было в том, что при инициализации HttpClient-а фабричным методом .create(), размер пула соединений по умолчанию будет равен 16 или числу процессоров, умноженному на два. Со своей логикой выселения, ожидания свободного соединения и многим другим. И это всё на каждый тип протокола.



Фреймворк таким образом нам помогает сэкономить на хэндшейках, построении маршрута в сети, что, конечно, приятно, когда есть корреляция хотя бы по ttl между ним и настройками смежных сервисов и операционных систем в месте их хостинга.

Но всего этого не было, поэтому на время при взаимодействии с такими клиентами мы стали применять ConnectionProvider с отключенным пулом.



Поиск причины такого поведения съел 3 дня, это больно.

Мы развивали наш сервис дальше, накручивали логику, сценарии становились всё сложнее и вот, в один прекрасный день с нагрузочного тестирования пришла печальная весть: мы перестали держать ожидаемую нагрузку. Что обычно делают в таком случае берут в руку JFR и профилируют. Так мы и поступили. Результат не заставил себя долго ждать. Мы обнаружили, что при написании fluent-цепочек вызовов методов Flux-ов о декомпозиции логики в функциональном стиле стоит забыть.



В приведенном фрагменте кода замеряется работа флакса из 100_000 элементов с 1 реактивным методом, во втором с 6 методами. Тест проверяет, что первый метод работает вдвое быстрее второго, причем число итераций проверок не играет роли.

Почему так? Потому что на каждом этапе вызова методов .map/.filter/.flatmap/.switchOnFirst/.window и других создается Publisher, Subscriber и другие специфичные каждому из этих методов объекты. В момент подписки происходит вызов Publisher и Subscriber вверх по fluent-цепочке. Все эти накладные расходы можно наглядно увидеть в стектрейсах. Эту проблему решали 3 дня, такого рода рефакторинг недешёвое удовольствие.

Итак, мы отрефачили код, прошли тестирование. Ещё несколько месяцев обрастали новыми смежными сервисами и сценариями потоковой обработки. Как вы думаете, какие грабли пришли в движение следующими? Те, что находятся в ядре самого WebFlux-а. В логике сервиса было с десяток мест с методами .groupBy и .flatMap. В связи с особенностями контракта и бизнес-логики они порождали множество очередей, к тому же при некоторых стечениях обстоятельств группы не финализировались. Приклад не падал, но процессинг останавливался, а перед этим происходила утечка памяти.

Кстати, на гитхабе много вопросов по этой теме. Если отвечать коротко, то стоит заглядывать вглубь каждого метода. Там может быть много интересного: от ограничений по размеру внутренней очереди, volatile чтений/записей, до порождения потенциально бесконечного числа очередей, которые сами собой не зафиналятся. Подробнее здесь: github.com/reactor/reactor-core/issues/596.



Вот, собственно, простой тест с замиранием процессинга.



Как видно, последняя запись в логе 255 элемент. Если заглянуть в документацию, то причина такого поведения станет очевидна, но кто её читает?) Особенно когда методы имеют такие говорящие и всем привычные названия.

Были проблемы и с методом .windowWhile. Вот ссылка на найденный нами в этом методе баг. Отмена подписки на его источник данных останавливала работу оператора.

С фреймворком Spring WebFlux нужно быть очень аккуратным. В том числе нужно следить за тем, какой паблишер возвращается методом. На какие-то можно подписываться неограниченное число раз (FluxReplay), но они имеют нюанс с размером буфера, другие возвращают один и тот же элемент каждому новому подписчику (MonoDefer).

Несколько эффективных и дешёвых оптимизаций


  • Переход на Z Garbage Collector сильно улучшил производительность, а интервалы простоя приложения во время сборки мусора сократились с 200 мс до 20 мс.

  • С той же версией приложения и под той же нагрузкой G1 давал пилу с большими зубьями по таймингам, Major GC вообще шёл вразнос, так как не хватало CPU на I/O-операции. В то же время ZGC / Shenandoah GC сократили пилу раз в 10.

  • Если ваш сервис занимается передачей тяжеловесных данных (голоса или видео) стоит внимательно посмотреть на io.netty.buffer и пользоваться его возможностями. Профилирование показало, что его использование позволило вдвое уменьшить основную категорию мусора в памяти.

  • Использование метрик Reactor Netty вместе с профилированием показали, что на криптографию уходила уйма времени, поэтому мы перешли с JDK SSL на Open SSL. Это в 2 раза ускорило приклад.

Используйте JFR + JMC, именно они подсветили все эти проблемы. Во время ревью кода можно сделать неверные выводы, бенчмарк для отдельных маленьких операций можно некорректно написать и получить непоказательные результаты, но flame graph/monitor wait/thread park/GC-разделы в JMC подсветят реальные проблемы.

В качестве итогов


Reactor Netty удобен, гибок и быстр. Что касается текущей реализации Spring WebFlux, то она позволяет добиться высокой производительности, даже если сервис процессит большой объем событий в единицу времени в рамках каждого подключения и содержит витиеватую логику с нелинейной обработкой и ветвлениями.

Но придётся следовать трём правилам:

размеры очередей и буферов под капотом требуют тонкой настройки. Также нужно помнить, что использование фреймворка на каждом шагу порождает новый источник данных, который нужно закрывать;

следует избегать тяжеловесных .groupBy и .flatMap, лучше использовать .handle и .flatMapIterable, где возможно;



цепочки вызовов методов фреймворка должны быть короткими, а декомпозицией логики обработки лучше заниматься в своих методах, либо использовать Operators.lift и Sinks для создания своего реактивного оператора.



И повторюсь, читайте документацию и профилируйте. Этот кактус нужно научиться готовить прежде, чем есть, иначе в ходе трапезы он станет только больше.


Источник изображения: Shutterstock.com/SEE D JAN

Отдельного рассказа заслуживают нюансы применения сборщиков мусора (GC), инструментов JFR/JMC, особенности работы с буферами и очередями в Spring WebFlux, а также тонкости настройки Netty-сервера.
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru