Русский
Русский
English
Статистика
Реклама

First touch of Kafka

Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике

И так приступим!

Единственный брокер Kafka и необходимый для его работы ZooKeeper я буду запускать в Docker

Сперва создам отдельную сеть kafkanet

docker network create kafkanet

Запуск контейнера с ZooKeeper

docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper

Запуск контейнера с Kafka

docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka

Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafka

Далее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потребление

Для этого сценария подключусь к контейнеру kafka

docker exec -it kafka bash

Создам топик demo-topic

/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092

Выведу список всех топиков

/bin/kafka-topics --list --zookeeper zookeeper:2181

И выведу описание созданного топика

/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092

Сгенерирую несколько сообщений

/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092

И после прочитаю эти сообщения

/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092

Далее я создам два небольших .NET приложения: KafkaProducer, которое будет генерировать сообщения, и KafkaConsumer, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting

В проект KafkaProducer добавлю класс KafkaProducerService

using Confluent.Kafka;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using System.Threading;using System.Threading.Tasks;namespace KafkaProducer{    public class KafkaProducerService : IHostedService    {        private readonly ILogger<KafkaProducerService> _logger;        private readonly IProducer<Null, string> _producer;        public KafkaProducerService(ILogger<KafkaProducerService> logger)        {            _logger = logger;            var config = new ProducerConfig            {                BootstrapServers = "localhost:9092"            };            _producer = new ProducerBuilder<Null, string>(config).Build();        }        public async Task StartAsync(CancellationToken cancellationToken)        {            for (var i = 0; i < 5; i++)            {                var value = $"Event N {i}";                _logger.LogInformation($"Sending >> {value}");                await _producer.ProduceAsync(                    "demo-topic",                    new Message<Null, string> { Value = value },                    cancellationToken);            }        }        public Task StopAsync(CancellationToken cancellationToken)        {            _producer?.Dispose();            _logger.LogInformation($"{nameof(KafkaProducerService)} stopped");            return Task.CompletedTask;        }    }}

Изменю файл Program.cs

using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.Hosting;using System;namespace KafkaProducer{    class Program    {        static void Main(string[] args)        {            CreateHostBuilder(args).Build().Run();            Console.ReadKey();        }        private static IHostBuilder CreateHostBuilder(string[] args) =>            Host                .CreateDefaultBuilder(args)                .ConfigureServices((context, collection) =>                    collection.AddHostedService<KafkaProducerService>());    }}

В проект KafkaConsumer добавлю класс KafkaConsumerService

using Confluent.Kafka;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using System.Threading;using System.Threading.Tasks;namespace KafkaConsumer{    public class KafkaConsumerService : IHostedService    {        private readonly ILogger<KafkaConsumerService> _logger;        private readonly IConsumer<Ignore, string> _consumer;        public KafkaConsumerService(ILogger<KafkaConsumerService> logger)        {            _logger = logger;            var config = new ConsumerConfig            {                BootstrapServers = "localhost:9092",                GroupId = "demo-group",                AutoOffsetReset = AutoOffsetReset.Earliest            };            _consumer = new ConsumerBuilder<Ignore, string>(config).Build();        }        public Task StartAsync(CancellationToken cancellationToken)        {            _consumer.Subscribe("demo-topic");            while (!cancellationToken.IsCancellationRequested)            {                var consumeResult = _consumer.Consume(cancellationToken);                _logger.LogInformation($"Received >> {consumeResult.Message.Value}");            }            return Task.CompletedTask;        }        public Task StopAsync(CancellationToken cancellationToken)        {            _consumer?.Dispose();            _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");            return Task.CompletedTask;        }    }}

Изменю файл Program.cs

using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.Hosting;using System;namespace KafkaConsumer{    class Program    {        static void Main(string[] args)        {            CreateHostBuilder(args).Build().Run();            Console.ReadKey();        }        private static IHostBuilder CreateHostBuilder(string[] args) =>            Host                .CreateDefaultBuilder(args)                .ConfigureServices((context, collection) =>                    collection.AddHostedService<KafkaConsumerService>());    }}

Результат работы приложений (ссылка на репозиторий)

Источник: habr.com
К списку статей
Опубликовано: 23.02.2021 12:21:01
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Net

Kafka

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru