Русский
Русский
English
Статистика
Реклама

Использование Spring Cloud Stream Binding с брокером сообщений Kafka

Всем привет! Меня зовут Виталий, я разработчик в компании Web3Tech. В этом посте я представлю основные концепции и конструкции платформы Spring Cloud Stream для поддержки и работы с брокерами сообщений Kafka, с полным циклом их контекстного unit-тестирования. Мы используем такую схему в своем проекте всероссийского электронного голосования на блокчейн-платформе Waves Enterprise.

Являясь частью группы проектов Spring Cloud, Spring Cloud Stream основан на Spring Boot и использует Spring Integration для обеспечения связи с брокерами сообщений. При этом он легко интегрируется с различными брокерами сообщений и требует минимальной конфигурации для создания event-driven или message-driven микросервисов.

Конфигурация и зависимости

Для начала нам нужно добавить зависимость spring-cloud-starter-stream-kafka в build.gradle:

dependencies {   implementation(kotlin("stdlib"))   implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")   implementation("com.fasterxml.jackson.module:jackson-module-kotlin")   implementation("org.springframework.boot:spring-boot-starter-web")   implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")   testImplementation("org.springframework.boot:spring-boot-starter-test")   testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")   testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")}

В конфигурацию проекта Spring Cloud Stream необходимо включить URL Kafka-брокера, имя очереди (топик) и другие параметры биндинга. Вот пример YAML-конфигурации для сервиса application.yaml:

spring: application:   name: cloud-stream-binding-kafka-app cloud:   stream:     kafka:       binder:         brokers: 0.0.0.0:8080         configuration:           auto-offset-reset: latest     bindings:       customChannel:                   #Channel name         destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)         group: input-group-N         contentType: application/json         consumer:           max-attempts: 1           autoCommitOffset: true           autoCommitOnError: false

Концепция и классы

По сути, мы имеем дело с сервисом, построенным на Spring Cloud Stream, который прослушивает входящую очередь, используя биндинги (SpringCloudStreamBindingKafkaApp.kt):

@EnableBinding(ProducerBinding::class)@SpringBootApplication  class SpringCloudStreamBindingKafkaApp fun main(args: Array<String>) { SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args) }

Аннотация @EnableBinding указывает сервису на биндинг как входящего, так и исходящего канала.

Здесь необходимо уточнить ряд концепций.

Binding интерфейс, в котором описаны входящие и исходящие каналы.
Binder имплементация middleware для сообщений.
Channel представляет канал для передачи сообщений между middleware и приложением.
StreamListeners методы обработки сообщений в виде бинов (beans), которые будут автоматически вызваны после того, как MessageConverter осуществит сериализацию или десериализацию между событиями в middleware и типами объектов в домене DTO.
Message Schema схемы, используемые для сериализации и десериализации сообщений. Могут быть прочитаны из источника или динамически загружены.

Тестирование

Чтобы протестировать сообщение и операции send/receive, нам нужно создать как минимум одного producer и одного consumer. Вот простейший пример того, как это можно сделать в Spring Cloud Stream.

Инстанс бина Producer будет отправлять сообщение в топик Kafka, используя биндер (ProducerBinding.kt):

interface ProducerBinding {   @Output(BINDING_TARGET_NAME)   fun messageChannel(): MessageChannel}

Инстанс бина Сonsumer будет слушать топик Kafka и получать сообщения.

ConsumerBinding.kt:

interface ConsumerBinding {   companion object {       const val BINDING_TARGET_NAME = "customChannel"   }   @Input(BINDING_TARGET_NAME)   fun messageChannel(): MessageChannel}

Consumer.kt:

@EnableBinding(ConsumerBinding::class)class Consumer(val messageService: MessageService) {   @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)   fun process(       @Payload message: Map<String, Any?>,       @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?   ) {       messageService.consume(message)   }}

Мы создали брокер Kafka с топиком. Для тестирования будем использовать встроенную Kafka, доступную нам с зависимостью spring-kafka-test.

Функциональное тестирование с MessageCollector

Мы имеем дело с имплементацией биндера, позволяющей взаимодействовать с каналами и получать сообщения. Отправим сообщение в канал ProducerBinding и затем получим его в виде payloadProducerTest.kt:

@SpringBootTestclass ProducerTest {   @Autowired   lateinit var producerBinding: ProducerBinding   @Autowired   lateinit var messageCollector: MessageCollector   @Test   fun `should produce somePayload to channel`() {       // ARRANGE       val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)       // ACTproducerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())       val payload = messageCollector.forChannel(producerBinding.messageChannel())           .poll()           .payload       // ASSERT       val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)       assertTrue(request.entries.stream().allMatch { re ->           re.value == payloadAsMap[re.key.toString()]       })       messageCollector.forChannel(producerBinding.messageChannel()).clear()   }}

Тестирование с брокером Embedded Kafka

Используем аннотацию @ClassRule для создания брокера. Так мы сможем поднять сервера Kafka и Zookeeper на случайном порте перед началом теста и выключить их, когда тест завершится. Это избавляет нас от необходимости в рабочем инстансе Kafka и Zookeper на всё время проведения теста (ConsumerTest.kt):

@SpringBootTest@ActiveProfiles("test")@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])@EnableBinding(ProducerBinding::class)class ConsumerTest {   @Autowired   lateinit var producerBinding: ProducerBinding   @Autowired   lateinit var objectMapper: ObjectMapper   @MockBean   lateinit var messageService: MessageService   companion object {       @ClassRule @JvmField       var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")   }   @Test   fun `should consume via txConsumer process`() {       // ACT       val request = mapOf(1 to "foo", 2 to "bar")       producerBinding.messageChannel().send(MessageBuilder.withPayload(request)           .setHeader("someHeaderName", "someHeaderValue")           .build())       // ASSERT       val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))       runBlocking {           delay(20)           verify(messageService).consume(requestAsMap)       }   }}

Заключение

В этом посте я продемонстрировал возможности Spring Cloud Stream и использования его с Kafka. Spring Cloud Stream предлагает удобный интерфейс с упрощенными нюансами настройки брокера, быстро внедряется, стабильно работает и поддерживает современные популярные брокеры, такие как Kafka. По итогам я привел ряд примеров с unit-тестированием на основе EmbeddedKafkaRule с использованием MessageCollector.

Все исходники можно найти на Github. Спасибо за прочтение!

Источник: habr.com
К списку статей
Опубликовано: 15.04.2021 16:11:54
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Тестирование веб-сервисов

Kotlin

Gradle

Распределенные системы

Микросервисы

Spring cloud

Kafka streams

Kafka binder

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru