Русский
Русский
English
Статистика
Реклама

Как мы Schema Registry для Kafka настраивали, и что могло пойти не так

Всем привет.

В статье я опишу, как мы настраивали реестр схем данных для того, чтобы использовать его для сериализации и десериализации сообщений Kafka.

Спойлер: на данный момент реестр схем данных настроен и используется в боевой системе, каких-то проблем, связанных с SR, замечено не было.

Исходные данные (небольшое оправдание)

Разработчикам не всегда выпадает возможность выбирать от и до инструменты, которые следует использовать при реализации той или иной задачи. Например, если вы приходите в команду, где уже начали реализовывать что-то на чем-то, то полностью изменить направление может быть неподъемной задачей. Таким образом, примем за данность, что используется Kafka для асинхронного взаимодействия сервисов - передачи сообщений по событиям.

Что? Почему? Зачем?

А вот то, что мы выбрали осознанно, так это использование Apache Avro как систему сериализации данных.Почему мы выбрали Apache Avro?

Если кратко, то это:

  • компактность представления бинарного формата данных (а также есть возможность кодировки в JSON);

  • поддержка логических типов данных: BigDecimal, дата, дата/время в нативном виде;

  • версионирование моделей;

Этих пунктов хватило, чтобы выбрать Avro как схему данных. А версию взяли 1.8.2, которая вышла в мае 2017 года.

Последняя релизная версия Avro - 1.10.1 от декабря 2020-го. В ней решены некоторые проблемы, которые нам пришлось чинить костылями, но взять ее мы не могли по определенным причинам (так как в одном из сервисов для сериализации ответа внешней системе используется схема версии 1.8.2 и поднятие версии может привести к нарушению этого взаимодействия)

До версии 1.9 в Avro используется библиотека Joda Time для обработки логических типов, связанных с датой и временем. А начиная с версии 1.9 для этого используются нативные Java 8 библиотеки.

Перед непосредственно разработкой функциональности мы все-таки попробовали новую версию Avro, но испытали проблемы и конверсиями дат (например, сериализатор не смог корректно обработать LocalDateTime). Точных примеров я не вспомню, но нам нужно было очень аккуратно передавать время в разных временных зонах, а это не самая простая часть для конвертации.

Несомненно, можно найти еще преимущества и недостатки этой схемы данных, но мы остановились на ней.

Итак, с системой сериализации определились.

Далее речь пойдет о реестре схем данных.

Реестр схем данных, или The Confluent Schema Registry for Kafka. Благодаря SR можно обеспечить совместимость схем данных между продюсером и консьюмером Kafka. И, чего нам очень хотелось, SR поможет не потерять сообщения из-за ошибок сериализации и десериализации во время эволюции схемы данных.

В процессе настройки и при тестовых прогонах мы выявили некоторые, скажем так, особенности использования SR, о которых я хочу рассказать вам и попытаюсь собрать все неувязки в одном месте.

Настройка деплоя моделей

Прежде всего, модели нужно описать в JSON-формате. Мы создали репозиторий и описали там нужные нам модели. Описывать модели в JSON просто и понятно, например:

{ "type": "record", "name": "SomeMessage", "namespace": "com.trthhrts", "fields": [   {     "name": "title",     "type": "string"   }  ]}

Если хотим nullable-поле, то добавляем в тип возможность null:

{ "type": "record", "name": "AnotherMessage", "namespace": "com.trthhrts", "fields": [    {     "name": "messageDate",     "type": [       "null",       {         "type": "long",         "logicalType": "timestamp-millis"       }     ],     "default": null    }  ]}

Модели описали, теперь воспользуемся maven-плагином для взаимодействия с SR:

<plugin>   <groupId>io.confluent</groupId>   <artifactId>kafka-schema-registry-maven-plugin</artifactId>   <version>6.0.1</version>   <configuration>     <schemaRegistryUrls>         <param>${SCHEMA_REGISTRY_URL}</param>     </schemaRegistryUrls>     <outputDirectory>${project.build}/avro</outputDirectory>     <userInfoConfig />     <subjects>       <com.trthhrts.SomeMessage>src/SomeMessage.avsc</com.trthhrts.SomeMessage>     </subjects>     <schemaTypes>       <com.trthhrts.SomeMessage>AVRO</com.trthhrts.SomeMessage>     </schemaTypes>   </configuration></plugin>

Далее смотрим здесь описание возможностей плагина, и как мы можем управлять реестром схем:

  • проверить (validate) на корректность локальную схему;

  • проверить изменение схемы на совместимость (test-compatibility);

  • скачать схему с SR (download);

  • зарегистрировать схему (register).

Мы настроили в Gitlab CI stage с деплоем схем так (ручной запуск и только на master-ветке):

Примечание: ручной запуск, потому что в репозитории находятся не только модели Avro, но, например, тесты моделей, вспомогательные библиотеки для работы с Kafka. Смысла регистрировать новые версии схем, если в них не было изменений - нет.

register_schemas: stage: deploy script:   - mvn schema-registry:test-compatibility $MAVEN_CLI_OPTS -B   - mvn schema-registry:register $MAVEN_CLI_OPTS -B when: manual only:   - master
Возможно, стоит добавить фазу validate в самое начало скрипта:
register_schemas: stage: deploy before_script:   - *scm_connect script:   - mvn schema-registry:validate $MAVEN_CLI_OPTS -B   - mvn schema-registry:test-compatibility $MAVEN_CLI_OPTS -B   - mvn schema-registry:register $MAVEN_CLI_OPTS -B when: manual only:   - master

Итак, после деплоя наши схемы успешно хранятся в Kafka. Стоп, скажете вы, в смысле в Kafka? А как же SR?! Ну, SR живет отдельно от Kafka брокеров. Это дополнительный компонент, который может быть настроен на любом Kafka кластере и который использует Kafka для хранения в том числе и схем. И, SR предоставляет пользователю REST API для взаимодействия со схемами данных.

В специальном топике Kafka (<kafkastore.topic>, по умолчанию _schemas), который имеет 1 партицию и используется как WAL. И все данные, которыми оперирует SR, добавляются как сообщения в этот лог.

Упрощенно взаимодействие с SR выглядит так:

Настройка использования Schema Registry

Чтобы использовать SR, достаточно добавить свойство с URL подключения к SR и назначить в качестве сериализатора и десериализатора соответствующие классы: KafkaAvroSerializer и KafkaAvroDeserializer

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

И для консьюмера, например, так:

    @Bean    public ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaConfigs(), new StringDeserializer(), deserializer()));        factory.setErrorHandler(errorHandler());        return factory;    }    private KafkaAvroDeserializer deserializer() {        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();        deserializer.configure(kafkaConfigs(), false);        return deserializer;    }

НО...

Есть несколько нюансов, которые следует учесть перед тестом и тем более перед деплоем на бой.

Стратегия наименования сабжектов (subjects)

Если не кратко, то: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#sr-schemas-subject-name-strategy

А если кратко, то существует 3 стратегии:

  • TopicNameStrategy - название сабжекта основывается на названии топика (стратегия по умолчанию!);

  • RecordNameStrategy - название сабжекта основывается на названии модели данных (как способ логически объединить в группу похожие события, которые, однако, могут иметь различную структуру данных, но передавать в рамках одного топика);

  • TopicRecordNameStrategy - название сабжекта основывается и на названии топика, и на названии модели данных (опять же как способ логической группировки событий с различной структурой данных);

Для себя мы выбрали стратегию наименования по названию модели данных:

props.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);

Выбрали такой вариант потому что:

  • топики для тестовых стендов и боевого названы по разному, а значит, выбери мы другую стратегию, в реестре схем данных копилось бы много схем для каждого топика, это привело бы к дополнительным сложностям при анализе работы и вообще поиску багов;

  • возможность на разных топиках протестировать именно те схемы, которые будут использоваться в боевом окружении.

Имейте это ввиду, если тоже зададитесь вопросом "А какую стратегию выбрать?".

Аналогично, можно задать стратегию наименования схемы для ключа (key) сообщения, но мы не используем ключ для сообщений (гарантируя очередность сообщений тем, что у нас задана только 1 партиция на брокерах), поэтому и задавать стратегию для ключа смысла нет.

auto.register.schemas

Свойство, которое отвечает за то, чтобы клиентские приложения автоматически регистрировали новые схемы в SR. Для dev-окружения это может быть полезно, но для боевого сервера все-таки лучше отключить. И да, по умолчанию это свойство активировано, имейте ввиду.

Ну а мы продюсерам это свойство отключили на первых этапах внедрения SR (еще в тестовом режиме):

props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);

use.latest.version

Применимо, только если предыдущее свойство (auto.register.schemas) выставлено в false. Если use.latest.version установлено в true, это говорит SR брать последнюю версию в сабжекте для сериализации, а не ту, которая идет вместе с данными клиента. По умолчанию false. Мы обнаружили, что сериализатор той версии, которую мы используем - 5.3.0, не задействует это свойство.

Тип совместимости схем

В SR доступно несколько типов совместимости схем. Подробно о них написано, например, в документации Confluent.

По умолчанию используется тип BACKWARD, который разрешает удалять поля из модели данных и добавлять optional-поля (те, которые могут быть null). Сравнение схемы идет только с последней версией, а при обновлении модели в первую очередь нужно обновлять консьюмеры, чтобы не получить ошибку десериализации (например, удалили поле, обновили продюсер, который послал сообщение без этого поля, а консьюмер будет его ожидать и свалится с ошибкой десериализации).

Нас устроил тип по умолчанию, не меняли.

specific.avro.reader

Мы захотели использовать типы BigDecimal и Timestamp в моделях. Чтобы такие типы корректно сериализовать и десериализовать, следует добавить это свойство:

put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

А так же добавить в класс SpecificData при запуске приложения нужные конверторы, например, так:

static {  SpecificData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());  SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());}

После этого, если ваши модели данных наследуются от SpecificRecordBase (что по умолчанию при типе записи record) - данные будут корректно сериализовываться и десериализовываться.

Вот пример описания модели с такими логическими типами:

{  "type": "record",  "name": "SomePaymentInfoModel",  "namespace": "com.trthhrts",  "fields": [    {      "name": "paymentDate",      "type": [        "null",        {          "type": "long",          "logicalType": "timestamp-millis"        }      ],      "default": null    },    {      "name": "amount",      "type": [        "null",        {          "type": "bytes",          "logicalType": "decimal",          "precision": 9,          "scale": 2        }      ],      "default": null    }  ]}

Кэширование

В KafkaAvroSerializer и KafkaAvroDeserializer используется локальное кэширование, а значит клиенты не будут каждый раз запрашивать схему у SR, только в случае отсутствия нужной схемы нужной версии в локальном кэше.

Для нас это означало, что сервисы не попадут в постоянную зависимость на доступность SR и отказ SR далеко не всегда будет означать невозможность сериализации и десериализации сообщений Kafka - по крайней мере, пока жив локальный кэш в приложении.

И все?

Нет.

В процессе тестирования и разработки мы столкнулись с не совсем очевидными вещами, о которых я расскажу, возможно, это сэкономит вам времени.

Joda Time

Avro 1.8.2 использует Joda Time библиотеку для работы с полями дат и времени. А мы пишем на Java 8 и используем нативные классы для работы с датами.

В процессе разработки был составлен класс утилитных методов для конвертаций дат. Мы нашли это наименьшей головной болью при передаче дат с часовым поясом с помощью Avro по Kafka.

Вот так мы, например, конвертируем даты перед отправкой:

convertDate(model.getBeginDate()).ifPresent(builder::setBeginDate)convertLocalDateToJodaDateTime(model.getPaymentDate()).ifPresent(builder::setPaymentDate);

А вот так, например, принимаем:

convertToLocalDateTime(message.getLastUpdate()).ifPresent(model::setLastUpdate);
Привожу класс целиком, собрали методы из разных уголков вселенной
/** * Класс с утилитными методам для конвертации типов */public final class MessageUtils {    /** Кол-во наносекунд в миллисекунде */    private static final int NANO_SECONDS_IN_MILLISECOND = 1_000_000;    /** Кол-во миллисекунд в секунде */    private static final int MILLISECONDS_IN_SECONDS = 1000;    /**     * Запрещаем создание     */    private MessageUtils() {    }    /**     * Конвертирует Joda {@link DateTime} в Java 8 {@link ZonedDateTime}     * @param dateTime дата в формате Joda {@link DateTime}     * @return {@link ZonedDateTime}     */    public static Optional<ZonedDateTime> convertToZonedDateTime(DateTime dateTime) {        return Optional.ofNullable(dateTime).map(dt -> {            DateTime localDateTime = dateTime.withZone(DateTimeZone.getDefault());            return ZonedDateTime.ofLocal(                    LocalDateTime.of(                            localDateTime.getYear(),                            localDateTime.getMonthOfYear(),                            localDateTime.getDayOfMonth(),                            localDateTime.getHourOfDay(),                            localDateTime.getMinuteOfHour(),                            localDateTime.getSecondOfMinute(),                            localDateTime.getMillisOfSecond() * NANO_SECONDS_IN_MILLISECOND),                    ZoneId.of(localDateTime.getZone().getID(), ZoneId.SHORT_IDS),                    ZoneOffset.ofTotalSeconds(localDateTime.getZone().getOffset(localDateTime) / MILLISECONDS_IN_SECONDS));        });    }    /**     * Конвертирует Joda {@link DateTime} в Java 8 {@link LocalDateTime}     * @param dateTime дата в формате Joda {@link DateTime}     * @return {@link LocalDateTime}     */    public static Optional<LocalDateTime> convertToLocalDateTime(DateTime dateTime) {        return convertToZonedDateTime(dateTime).map(ZonedDateTime::toLocalDateTime);    }    /**     * Конвертирует Joda {@link DateTime} в Java 8 {@link LocalDate}     * @param dateTime дата в формате Joda {@link DateTime}     * @return {@link LocalDate}     */    public static Optional<LocalDate> convertToLocalDate(DateTime dateTime) {        return convertToZonedDateTime(dateTime).map(ZonedDateTime::toLocalDate);    }    /**     * Конвертирует {@link LocalDate} в {@link org.joda.time.LocalDate}     * @param ld {@link LocalDate}     * @return {@link org.joda.time.LocalDate}     */    public static Optional<org.joda.time.LocalDate> convertLocalDateToJodaLocalDate(LocalDate ld) {        return Optional.ofNullable(ld).map(date -> new org.joda.time.LocalDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth()));    }    /**     * Конвертирует {@link org.joda.time.LocalDate} в {@link LocalDate}     * @param ld {@link org.joda.time.LocalDate}     * @return {@link LocalDate}     */    public static Optional<LocalDate> convertJodaLocalDateToLocalDate(org.joda.time.LocalDate ld) {        return Optional.ofNullable(ld).map(date -> LocalDate.of(date.getYear(), date.getMonthOfYear(), date.getDayOfMonth()));    }    /**     * Конвертирует {@link LocalDateTime} в Joda {@link DateTime}     * @param ldt дата в формате {@link LocalDateTime}     * @return дата в формате Joda {@link DateTime}     */    public static Optional<DateTime> convertLocalDateTimeToJodaDateTime(LocalDateTime ldt) {        return Optional.ofNullable(ldt)                .map(date -> new org.joda.time.LocalDateTime(date.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()).toDateTime());    }    /**     * Конвертирует {@link LocalDate} в Joda {@link DateTime}     * @param ld дата в формате {@link LocalDate}     * @return дата в формате Joda {@link DateTime}     */    public static Optional<DateTime> convertLocalDateToJodaDateTime(LocalDate ld) {        return Optional.ofNullable(ld)                .map(date ->                        new org.joda.time.LocalDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth()).toDateTimeAtStartOfDay());    }}

"avro.java.string": "String"

Сразу кину ссылку на issue: https://issues.apache.org/jira/browse/AVRO-2702

Если грубо, то в самой схеме, допустим, нет такой строчки. Но maven-плагин пишет в схему внутри модели именно так:

{  "type": "record",  "name": "FromMaven",  "namespace": "com.trthhrts",  "fields": [    {      "name": "message",      "type": {        "type": "string",        "avro.java.string": "String"      }    }  ]}

От этого стратегия наименования значений сабжектов по названию модели - RecordNameStrategy - отрабатывает некорректно, и мы получаем ошибку при десериализации, как будто схема не найдена.

Workaround: добавить "avro.java.string": "String" для каждого string-поля в моделях (тогда будет совпадение с тем, как схему опишет maven-плагин).

Решение совсем неэлегантное, если есть варианты изящнее - с удовольствием обсудим в комментариях.

Выводы

В итоге, мы внедрили Schema Registry, и успешно его используем. Инструмент очень полезный, но есть определенные подводные грабли, которые мы успешно протестировали и отложили в сторону еще на этапе разработки.

Источник: habr.com
К списку статей
Опубликовано: 16.03.2021 14:17:52
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Блог компании альфастрахование

Java

Хранение данных

Kafka

Avro

Confluent

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru