Русский
Русский
English
Статистика
Реклама

Kotlin and spring

Перевод Производительпотребитель на Kafka и Kotlin

13.07.2020 20:06:08 | Автор: admin

Перевод статьи подготовлен в преддверии старта курса Backend-разработка на Kotlin




В этой статье мы поговорим о том, как создать простое приложение на Spring Boot с Kafka и Kotlin.


Введение


Начните с посещения https://start.spring.io и добавьте следующие зависимости:


Groovy


implementation("org.springframework.boot:spring-boot-starter-data-rest")implementation("org.springframework.boot:spring-boot-starter-web")implementation("com.fasterxml.jackson.module:jackson-module-kotlin")implementation("org.apache.kafka:kafka-streams")implementation("org.jetbrains.kotlin:kotlin-reflect")implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")implementation("org.springframework.kafka:spring-kafka")

В нашем примере для сборки мы воспользуемся Gradle. Вы вполне можете выбрать Maven.


Создайте и загрузите проект. Затем импортируйте его в IntelliJ IDEA.


Скачайте Apache Kafka


Загрузите последнюю версию Apache Kafka с их сайта и распакуйте в папку. Я пользуюсь операционной системой Windows 10. При запуске Kafka вы можете столкнуться с некоторыми проблемами по типу too many lines encountered. Так происходит потому что Kafka добавляет большую структуру папок в свое имя пути. Если эта проблема не будет устранена автоматически, вам придется переименовать структуру папок как-нибудь покороче и запустить приложение из Power Shell.


Чтобы запустить Kafka, воспользуйтесь следующими командами:


Shell


.\zookeeper-server-start.bat ..\..\config\zookeeper.properties.\kafka-server-start.bat ..\..\config\server.properties

Эти две команды вы увидите в папке /bin/windows.


Чтобы запустить Kafka, сначала нужно запустить Zookeeper. Zookeeper это продукт Apache, который предоставляет сервис распределенной конфигурации.


Запуск Spring Boot


Первым шагом создайте в своей IDE класс, который называется KafkaDemoApplication.kt. При создании проекта с сайта Spring, класс будет создан автоматически.


Добавьте следующий код:


Kotlin


import org.springframework.boot.autoconfigure.SpringBootApplicationimport org.springframework.boot.runApplication@SpringBootApplicationclass KafkaDemoApplication fun main(args: Array<String>) {   runApplication<KafkaDemoApplication>(*args)}

Производитель


Мы можем отправлять сообщения в топики двумя способами. Их мы рассмотрим ниже.


Мы разработаем класс-контроллер, который нужен для отправки и получения сообщений. Назовем этот класс KafkaController.kt. И добавим следующий метод:


Kotlin


var kafkaTemplate:KafkaTemplate<String, String>? = null;val topic:String = "test_topic"@GetMapping("/send")fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> {    var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.send(topic, message)!!    var sendResult: SendResult<String, String> = lf.get()    return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")}

Для отправки сообщений в топик, который называется test_topic, мы используем KafkaTemplate. Он будет возвращать объект ListenableFuture, из которого мы можем получить результат этого действия. Такой подход является самым простым, если вы просто хотите отправлять сообщение в топик.


Второй способ


Следующий способ отправки сообщения в топик Kafka это использование объекта KafkaProducer. Для этого мы напишем следующий код:


Kotlin


@GetMapping("/produce")fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> {    var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)    val map = mutableMapOf<String, String>()    map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"    map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"    map["bootstrap.servers"] = "localhost:9092"    var producer = KafkaProducer<String, String>(map as Map<String, Any>?)    var future:Future<RecordMetadata> = producer?.send(producerRecord)!!    return ResponseEntity.ok(" message sent to " + future.get().topic());}

И тут нужно сделать небольшое пояснение.


Нам нужно инициализировать объект KafkaProduce с Map, которая будет содержать ключ и значение для сериализации. В нашем примере речь идет о строковом сообщении, поэтому нам нужен только StringSerializer.


В принципе, Serializer это интерфейс Kafka, который преобразует строки в байты. В Apache Kafka есть и другие сериализаторы, такие как ByteArraySerializer, ByteSerializer, FloatSerializer и др.


Для map мы указываем ключ и значение с StringSerializer.


Kotlin


map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"

Следующее значение это сведения о bootstrap-сервере, необходимые для коммуникации с кластером Kafka.


Kotlin


map["bootstrap.servers"] = "localhost:9092"

Все эти атрибуты нужны, если мы используем KafkaProducer.


Затем нам нужно создать ProducerRecord с именем топика и самим сообщением. Именно это мы и сделаем в следующей строке:


Kotlin


var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)

Теперь мы можем отправить наше сообщение в топик с помощью следующего кода:


Kotlin


var future:Future<RecordMetadata> = producer?.send(producerRecord)!!

Эта операция вернет future с именем топика, который используется для отправки сообщения.


Потребитель


Мы посмотрели, как отправлять сообщения в топики. Но нам также нужно слушать входящие сообщения. Чтобы это сделать, нужно создать слушателя, который будет потреблять сообщения.
Давайте создадим класс MessageConsumer.kt и пометим его с помощью Service.


Kotlin


@KafkaListener(topics= ["test_topic"], groupId = "test_id")fun consume(message:String) :Unit {    println(" message received from topic : $message");}

Этот метод можно использовать для прослушивания сообщения с помощью аннотации @KafkaListener и вывода сообщения в консоль, как только оно появляется в топике. Только убедитесь, что вы используете то же имя топика, что и для отправки сообщения.
Исходный код вы можете посмотреть в моем репозитории на GitHub.




Узнать подробнее о курсе Backend-разработка на Kotlin



Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru