Русский
Русский
English
Статистика
Реклама

Из песочницы Apache Kafka и тестирование с Kafka Server

Введение


Существуют различные способы для написания тестов с использованием Apache Kafka. К примеру, можно использовать TestContainers и EmbeddedKafka. Об этом можно почитать, к примеру, вот здесь: Подводные камни тестирования Kafka Streams. Но существует и вариант для написания тестов с использованием KafkaServer.


Что будет тестироваться?


Предположим, необходимо разработать сервис отправки сообщений по различным каналам: email, telegram и т.п.


Пусть имя сервиса будет: SenderService.


Сервис должен: слушать заданный канал, выделять из канала нужные ему сообщения, разбирать сообщения и отправлять по нужному каналу для конечной доставки сообщений.


Для проверки сервиса необходимо сформировать сообщение для отправки с использованием канала отправки почты и убедиться в том, что сообщение было передано в конечный канал.
Конечно, в реальных приложениях тесты будут сложнее. Но для иллюстрации выбранного подхода, такого теста будет достаточно.


Сервис и тест реализованы с использованием: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.


Сервис


Сервис будет иметь возможность начать работу и остановить свою работу.


void start()void stop()

При старте необходимо задать, как минимум, следующие параметры:


String bootstrapServersString senderTopicEmailService emailService

bootstrapServers адрес kafka.
senderTopic топик, из которого будут считываться сообщения.
emailService сервис для конечной отправки сообщений по почте.


В реальном сервисе таких конечных сервисов будет столько же сколько и конечных каналов отправки сообщений.


Теперь необходим потребитель, который слушает канал, фильтрует и отправляет сообщения в конечные каналы. Количество таких потребителей можно выбирать. Подход для написания потребителя описан вот здесь: Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.


Collection<AutoCloseable> closeables = new ArrayList<>();ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);for (int i = 0; i < senderTasksN; i++) {    SenderConsumerLoop senderConsumerLoop =            new SenderConsumerLoop(                    bootstrapServers,                    senderTopic,                    "sender",                    "sender",                    tasksExecutorService,                    emailService            );    closeables.add(senderConsumerLoop);    senderTasksExecutor.submit(senderConsumerLoop);}

В цикле создается экземпляр потребителя, запоминается в коллекции и запускается через сервис запуска задач.


При выполнении этого кода потребители начинают работать. Сервис ждет их завершения или сигнала для остановки.


Runtime.getRuntime().addShutdownHook(new Thread(() -> {    for (AutoCloseable autoCloseable : closeables) {        try {            autoCloseable.close();        } catch (Exception e) {            e.printStackTrace();        }    }    senderTasksExecutor.shutdown();    tasksExecutorService.shutdown();    stop();    try {        senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);    } catch (InterruptedException e) {        e.printStackTrace();    }}));

При завершении необходимо освободить ресурсы.


Потребитель


Потребитель имеет следующие публичные методы:


void run()void close()

Основной метод: run.


@Overridepublic void run() {    kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);    kafkaConsumer.subscribe(Collections.singleton(topic));    while (true) {        calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));    }}

По входным параметрам создается экземпляр kafka-потребителя. kafka-потребитель подписывается на заданный топик. В бесконечном цикле выбираются записи из топика. И отправляются на обработку.


Для иллюстрации json-сообщения будут иметь несколько полей, которые будут задавать и тип сообщения, и данные для отправки.


Пример сообщения:


{  "subject": {    "subject_type": "send"  },  "body": {    "method": "email",    "recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",    "title": "42",    "message": "73"  }}

subject_type тип сообщения. Для сервиса нужно значение send.
method тип конечного сервиса для отправки. email отправка через почту.
recipients список получателей.
title заголовок для сообщения.
message сообщение.


Обработка всех записей:


void calculate(ConsumerRecords<String, String> records) {    for (ConsumerRecord<String, String> record : records) {        calculate(record);    }}

Обработка одной записи:


void calculate(ConsumerRecord<String, String> record) {            JSONParser jsonParser = new JSONParser();            Object parsedObject = null;            try {                parsedObject = jsonParser.parse(record.value());            } catch (ParseException e) {                e.printStackTrace();            }            if (parsedObject instanceof JSONObject) {                JSONObject jsonObject = (JSONObject) parsedObject;                JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);                String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();                if (SEND.equals(subjectType)) {                    JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);                    calculate(jsonBody);                }            }        }

Распределение сообщений по типу:


void calculate(JSONObject jsonBody) {    String method = jsonBody.get(METHOD).toString();    if (EMAIL_METHOD.equals(method)) {        String recipients = jsonBody.get(RECIPIENTS).toString();        String title = jsonBody.get(TITLE).toString();        String message = jsonBody.get(MESSAGE).toString();        sendEmail(recipients, title, message);    }}

Отправка в конечную систему:


void sendEmail(String recipients, String title, String message) {    tasksExecutorService.submit(() -> emailService.send(recipients, title, message));}

Отправка сообщений происходит через сервис исполнения задач.


Ожидания завершения отправки не происходит.


Создание kafka-потребителя:


static KafkaConsumer<String, String> createKafkaConsumerStringString(        String bootstrapServers,        String clientId,        String groupId) {    Properties properties = new Properties();    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);    properties.setProperty(            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,            "org.apache.kafka.common.serialization.StringDeserializer");    properties.setProperty(            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,            "org.apache.kafka.common.serialization.StringDeserializer");    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");    return new KafkaConsumer<>(properties);}

Интерфейс для писем:


interface EmailService {    void send(String recipients, String title, String message);}

Тест


Для теста понадобиться следующее.
Адрес kafka-сервера.
Порт для kafka-сервера.
Имя топика.


Сервис для управления kafka-сервером. Будет описан ниже.


public class SenderServiceTest {    @Test    void consumeEmail() throws InterruptedException {        String brokerHost = "127.0.0.1";        int brokerPort = 29092;        String bootstrapServers = brokerHost + ":" + brokerPort;        String senderTopic = "sender_data";        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {            kafkaServerService.start();            kafkaServerService.createTopic(senderTopic);        }    }}

Задаются параметры. Создается сервис для управления kafka-сервером. kafka-сервером стартует. Создается необходимый топик.


Создается mock конечного сервиса для отправки сообщений:


SenderService.EmailService emailService = mock(SenderService.EmailService.class);

Создается сам сервис и стартует:


SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);senderService.start();

Задаются параметры для сообщения:


String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";String title = "42";String message = "73";

Отправляется сообщение в канал:


kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));

Ожидание:


Thread.sleep(6000);

Проверка, что сообщение дошло до конечного сервиса:


verify(emailService).send(recipients, title, message);

Остановка:


senderService.stop();

Все вместе:


public class SenderServiceTest {    @Test    void consumeEmail() throws InterruptedException {        String brokerHost = "127.0.0.1";        int brokerPort = 29092;        String bootstrapServers = brokerHost + ":" + brokerPort;        String senderTopic = "sender_data";        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {            kafkaServerService.start();            kafkaServerService.createTopic(senderTopic);            SenderService.EmailService emailService = mock(SenderService.EmailService.class);            SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);            senderService.start();            String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";            String title = "42";            String message = "73";            kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));            Thread.sleep(6000);            verify(emailService).send(recipients, title, message);            senderService.stop();        }    }}

Вспомогательный код:


public class SenderFactory {    public static final String SUBJECT = "subject";    public static final String SUBJECT_TYPE = "subject_type";    public static final String BODY = "body";    public static final String METHOD = "method";    public static final String EMAIL_METHOD = "email";    public static final String RECIPIENTS = "recipients";    public static final String TITLE = "title";    public static final String MESSAGE = "message";    public static final String SEND = "send";    public static String key() {        return UUID.randomUUID().toString();    }    public static String createMessage(String method, String recipients, String title, String message) {        Map<String, Object> map = new HashMap<>();        Map<String, Object> subject = new HashMap<>();        Map<String, Object> body = new HashMap<>();        map.put(SUBJECT, subject);        subject.put(SUBJECT_TYPE, SEND);        map.put(BODY, body);        body.put(METHOD, method);        body.put(RECIPIENTS, recipients);        body.put(TITLE, title);        body.put(MESSAGE, message);        return JSONObject.toJSONString(map);    }}

Сервис для управления kafka-сервером


Основные методы:


void start()void close()void createTopic(String topic)

В методе start происходит создание сервера и вспомогательных объектов.


Создание zookeeper и сохранение его адреса:


zkServer = new EmbeddedZookeeper();String zkConnect = zkHost + ":" + zkServer.port();

Создание клиента zookeeper:


zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);zkUtils = ZkUtils.apply(zkClient, false);

Задание свойств для сервера:


Properties brokerProps = new Properties();brokerProps.setProperty("zookeeper.connect", zkConnect);brokerProps.setProperty("broker.id", "0");try {    brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());} catch (IOException e) {    throw new RuntimeException(e);}brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);brokerProps.setProperty("offsets.topic.replication.factor", "1");KafkaConfig config = new KafkaConfig(brokerProps);

Создание сервера:


kafkaServer = TestUtils.createServer(config, new MockTime());

Все вместе:


public void start() {    zkServer = new EmbeddedZookeeper();    String zkConnect = zkHost + ":" + zkServer.port();    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);    zkUtils = ZkUtils.apply(zkClient, false);    Properties brokerProps = new Properties();    brokerProps.setProperty("zookeeper.connect", zkConnect);    brokerProps.setProperty("broker.id", "0");    try {        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());    } catch (IOException e) {        throw new RuntimeException(e);    }    brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);    brokerProps.setProperty("offsets.topic.replication.factor", "1");    KafkaConfig config = new KafkaConfig(brokerProps);    kafkaServer = TestUtils.createServer(config, new MockTime());}

Остановка сервиса:


@Overridepublic void close() {    kafkaServer.shutdown();    zkClient.close();    zkServer.shutdown();}

Создание топика:


public void createTopic(String topic) {    AdminUtils.createTopic(            zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);}

Заключение


В заключении нужно отметить, что приведенный здесь код лишь иллюстрирует выбранный способ.


Для создания и тестирования сервисов с использованием kafka можно обратиться к следующему ресурсу:
kafka-streams-examples


Ссылки и ресурсы


Исходный код


Код для тестирования с kafka-сервером

Источник: habr.com
К списку статей
Опубликовано: 12.11.2020 14:14:33
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Тестирование it-систем

Java

Apache

Kafka

Apache kafka

Junit

Services

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru