Русский
Русский
English
Статистика
Реклама

Services

Принцип слоеного теста

23.12.2020 16:22:50 | Автор: admin
Всем неустрашимым на пути от отрицания до убеждения посвящается


image

Среди разработчиков бытует справедливое мнение, что если программист не покрывает код тестами, то попросту не понимает зачем они нужны и как их готовить. С этим трудно не согласиться, когда уже понимаешь о чем речь. Но как достичь этого драгоценного понимания?

Не судьба...


Так получается, что зачастую самые очевидные вещи не имеют внятного описания среди тонн полезной информации в глобальной сети. Этакий очередной соискатель решает разобраться с актуальным вопросом что такое unit тесты и натыкается на уйму подобных примеров, которые словно калька копируются из статьи в статью:

у нас есть метод, который подсчитывает сумму чисел

public Integer sum(Integer a, Integer b) {
return a+b
}

на данный метод можно написать тест

Test
public void testGoodOne() {
assertThat(sum(2,2), is(4));
}


Это не шутка, это упрощенный пример из типичной статьи про технологию unit тестирования, где в начале и конце общие фразы про пользу и необходимость, а в середине такое
Увидев такое, и перечитав для верности дважды, соискатель восклицает: Что за лютый бред?.. Ведь, у него в коде практически нет методов, которые все необходимое получают через аргументы, а затем отдают однозначный результат по ним. Это типичные утилитарные методы, и они практически не меняются. А как быть со сложными процедурами, с внедренными зависимостями, с методами без возврата значений? Там это подход не применим от слова совсем.

Если на этом этапе упорный соискатель не машет рукой и погружается дальше, то вскоре обнаруживает, что для зависимостей используются МОКи, для методов которых определяется некоторое условное поведение, фактически заглушка. Тут у соискателя может снести крышу окончательно, если рядом не найдется доброго и терпеливого мидла/сеньора готового и умеющего все разъяснить Иначе, соискатель истины совершенно теряет смысл того, что такое unit тесты, поскольку большая часть тестируемого метода оказывается некоей мок-фикцией, и что в таком случае тестируется непонятно. Тем более непонятно, как это организовать для большого, многослойного приложения и зачем такое нужно. Таким образом вопрос в лучшем случае откладывается до лучших времен, в худшем прячется в ящик прОклятых вещей.

Самое обидное, что технология покрытия тестами элементарно проста и доступна каждому, а польза ее настолько очевидна, что любые отговорки выглядят наивно для знающих людей. Но чтобы разобраться новичку не хватает какой-то самой малости, элементарной сути, словно щелчка переключателя.

Ключевая миссия.



Для начала, предлагаю сформулировать в двух словах ключевую функцию(миссию) unit тестов и ключевой выигрыш. Тут возможны разные живописные варианты, но предлагаю рассмотреть этот:

Ключевая функция unit тестов зафиксировать ожидаемое поведение системы.

и этот:

Ключевой выигрыш unit тестов возможность прогнать весь функционал приложения за считанные секунды.

Рекомендую запомнить это для собеседований и немножко поясню. Любой функционал подразумевает правила использования и результаты. Эти требования приходят от бизнеса, через системную аналитику и реализуются в коде. Но код постоянно развивается, приходят новые требования и доработки, которые могут незаметно и неожиданно изменить что-то в готовом функционале. Именно тут на страже стоят unit тесты, которые фиксируют утвержденные правила, по которым должна работать система! В тестах фиксируется сценарий, который важен для бизнеса, и если после очередной доработки тест падает, значит, что-то упущено: либо ошибся разработчик или аналитик, либо новые требования противоречат существующим и следует их уточнять и т.д. Самое главное сюрприз не проскочил.

Простой, стандартный unit тест позволил на самых ранних этапах обнаружить неожиданное, и вероятно нежелательное поведение системы. Между тем, система растет и ширится, вероятность упущения ее деталей тоже растет, и лишь сценарии unit тестов способны все помнить и вовремя предотвращать незаметные уходы в сторону. Это очень удобно и надежно, а главное удобство быстрота. Приложение не нужно даже запускать и бродить по сотням его полей, форм или кнопок, требуется запустить выполнение тестов и за считанные секунды получить либо полную готовность либо баг.

Итак запомним: зафиксировать ожидаемое поведение в виде сценариев unit тестов, и моментально прогнать приложение без его запуска. Эта та безусловная ценность, которую позволяют достичь unit тесты.

Но, черт возьми, как?



Перейдем к самому интересному. Современные приложения активно избавляются от монолитности. Микросервисы, модули, слои основные принципы организации рабочего кода, позволяющие достигать независимости, удобства повторного использования, обмена и переноса в системы и т.д. В нашей теме имеет ключевое значение именно слоистая структура и внедрение зависимостей.

Рассмотрим слои типичного веб- приложения: контроллеры, сервисы, репозитории и т.п. Кроме того используются слои утилит, фасадов, моделей и DTO. Два последних не должны содержать функционала, т.е. методов кроме аксессоров(геттеры/сеттеры), поэтому покрывать их тестами не нужно. Остальные слои мы рассмотрим как цели для покрытия.
Как не напрашивается это вкусное сравнение, приложение нельзя сравнить со слоеным тортом по той причине, что слои эти внедряются друг в друга, как зависимости:
  • контроллер внедряет в себя сервис/ы, к которым обращается за результатом
  • сервис внедряет в себя репозитории (DAO), может внедрять утилитарные компоненты
  • фасад предназначен для комбинирования работы множества сервисов или компоненты, соответственно внедряет в себя их


Основная идея покрытия тестами всего этого добра в целом приложении: покрытие каждого слоя независимо от остальных слоев. Отсылка к независимости и прочим фишкам антимонолитности. Т.е. если в тестируемый сервис внедрен репозиторий этот гость мокается в рамках тестирования сервиса, но персонально тестируется по честному уже в рамках теста репозитория. Таким образом, создаются тесты для каждого элемента каждого слоя, никто не забыт все при делах.

Принцип слоеного теста.



Перейдем к примерам, простое приложение на Java Spring Boot, код будет элементарный, так что суть легко понятна и аналогично применима для других современных языков/фреймворков. Задача у приложения будет простая умножить число на 3, т.е. утроить (англ. triple), но при этом мы создадим многослойное приложение с внедрением зависимостей (dependency injection) и послойным покрытием с головы до пят.

image

В структуре созданы пакеты для трех слоев: controller, service, repo. Структура тестов аналогична.
Работать приложение будет так:
  1. с фронт-энда на контроллер приходит GET запрос с идентификатором числа, которое требуется утроить.
  2. контроллер запрашивает результат у своей зависимости сервиса
  3. сервис запрашивает данные у своей зависимости репозитория, умножает и возвращает результат контроллеру
  4. контроллер дополняет результат и возвращает на фронт-энд


Начнем с контроллера:

@RestController@RequiredArgsConstructorpublic class SomeController {   private final SomeService someService; // dependency injection   static final String RESP_PREFIX = "Результат: ";   static final String PATH_GET_TRIPLE = "/triple/{numberId}";   @GetMapping(path = PATH_GET_TRIPLE) // mapping method to GET with url=path   public ResponseEntity<String> triple(@PathVariable(name = "numberId") int numberId) {       int res = someService.tripleMethod(numberId);   // dependency call       String resp = RESP_PREFIX + res;                // own logic       return ResponseEntity.ok().body(resp);   }}


Типичный рест контроллер, имеет внедрение зависимости someService. Метод triple настроен на GET запрос по URL "/triple/{numberId}", где в переменной пути передается идентификатор числа. Сам метод можно разделить на две основные составляющие:

  • обращение к зависимости запрос данных извне, либо вызов процедуры без результата
  • собственная логика работа с имеющимися данными


Рассмотрим сервис:

@Service@RequiredArgsConstructorpublic class SomeService {   private final SomeRepository someRepository; // dependency injection   public int tripleMethod(int numberId) {       Integer fromDB = someRepository.findOne(numberId);  // dependency call       int res = fromDB * 3;                               // own logic       return res;   }}


Тут подобная ситуация: внедрение зависимости someRepository, а метод состоит из обращения к зависимости и собственной логики.

Наконец репозиторий, для простоты выполнен без базы данных:

@Repositorypublic class SomeRepository {   public Integer findOne(Integer id){       return id;   }}


Условный метод findOne якобы ищет в базе данных значение по идентификатору, но попросту возвращает тот же integer. На суть нашего примера это не влияет.

Если запустить наше приложение, то по настроенному url можно увидеть:



Работает! Многослойно! В прод :)
Ах да, тесты
Немного о сути. Написание тестов тоже процесс творческий! Поэтому совершенно не уместна отговорка я разработчик, а не тестер. Хороший тест, как и хороший функционал требует изобретательности и красоты. Но прежде всего, необходимо определить элементарную структуру теста.

Тестирующий класс содержит методы, тестирующие методы целевого класса. Минимум того, что должен содержать в себе каждый тестирующий метод это вызов соответствующего метода целевого класса, условно говоря так:

@Test    void someMethod_test() {        // prepare...        int res = someService.someMethod();                 // check...    }


Этот вызов могут окружать Подготовка и Проверка. Подготовка данных, в том числе входящих аргументов и описание поведения моков. Проверка результатов обычно сравнение с ожидаемым значением, помните про фиксацию ожидаемого поведения? Итого, тест это сценарий, который моделирует ситуацию и фиксирует, что она прошла ожидаемо и вернула ожидаемые результаты.

На примере контроллера попробуем подробно изобразить базовый алгоритм написания теста. Прежде всего, целевой метод контроллера принимает параметр int numberId, добавим его в наш сценарий:

int numberId = 42; // input path variable


Этот же numberId транзитом передается на вход методу сервиса, и тут самое время обеспечить сервис-мок:

@MockBeanprivate SomeService someService;


Собственный код метода контроллера работает с результатом полученным от сервиса, имитируем этот результат, а также вызов который его возвращает:

int serviceRes = numberId*3; // result from mock someService// prepare someService.tripleMethod behaviorwhen(someService.tripleMethod(eq(numberId))).thenReturn(serviceRes);


Эта запись означает: когда будет вызван someService.tripleMethod с аргументом равным numberId, вернуть значение serviceRes.
Кроме того, эта запись фиксирует факт, что данный метод сервиса должен быть вызван, что важный момент. Бывает что требуется зафиксировать вызов процедуры без результата, тогда используется иная запись, условно такая не делать ничего когда...:

Mockito.doNothing().when(someService).someMethod(eq(someParam));


Повторюсь, здесь лишь имитация работы someService, честное тестирование с детальной фиксацией поведения someService будет реализовано отдельно. Более того, тут даже не важно, чтобы значение именно утроилось, если мы напишем

int serviceRes = numberId*5; 


это не сломает текущий сценарий, т.к. тут фиксируется не поведение someService, а поведение контроллера, принимающего результат someService как должное. Это совершенно логично, ведь целевой класс не может отвечать за поведение внедряемой зависимости, а вынужден ей доверять.

Итак мы определили поведение мока в нашем сценарии, следовательно при выполнении теста, когда внутри вызова целевого метода дело дойдет до мока, он вернет что попросили serviceRes, и дальше с этим значением будет работать собственный код контроллера.
Далее помещаем в сценарий вызов целевого метода. Метод контроллера имеет особенность он не вызывается в коде явно, а привязан через HTTP метод GET и URL, поэтому в тестах вызывается через специальный тестовый клиент. В Spring это MockMvc, в других фреймворках есть аналоги, например WebTestCase.createClient в Symfony. Итак, далее просто выполнение метода контроллера через маппинг по GET и URL.

       //// mockMvc.perform       MockHttpServletRequestBuilder requestConfig = MockMvcRequestBuilders.get(SomeController.PATH_GET_TRIPLE, numberId);       MvcResult mvcResult = mockMvc.perform(requestConfig)           .andExpect(status().isOk())           //.andDo(MockMvcResultHandlers.print())           .andReturn()       ;//// mockMvc.perform


Там же заодно проверяется, что такой маппинг вообще существует. Если вызов удачен доходит дело до проверки-фиксации результатов. Например можно зафиксировать сколько раз был вызван метод мока:

// check of callingMockito.verify(someService, Mockito.atLeastOnce()).tripleMethod(eq(numberId));


В нашем случае это избыточно, т.к. его единственный вызов мы уже зафиксировали через when, но иногда это способ уместен.

А теперь главное мы проверяем поведение собственного кода контроллера:

// check of resultassertEquals(SomeController.RESP_PREFIX+serviceRes, mvcResult.getResponse().getContentAsString());


Тут мы зафиксировали то, за что отвечает сам метод что результат полученный от someService конкатенируется с префиксом контроллера, и именно эта строка уходит в тело response. Кстати, воочию в содержимом Body можно убедиться, если раскомментировать строку

//.andDo(MockMvcResultHandlers.print())


но обычно эта печать в консоль используется лишь как вспомогательная при отладке.

Таким образом у нас получился тестовый метод в тестовом классе контроллера:

@WebMvcTest(SomeController.class)class SomeControllerTest {   @MockBean   private SomeService someService;   @Autowired   private MockMvc mockMvc;   @Test   void triple() throws Exception {       int numberId = 42; // input path variable       int serviceRes = numberId*3; // result from mock someService       // prepare someService.tripleMethod behavior       when(someService.tripleMethod(eq(numberId))).thenReturn(serviceRes);       //// mockMvc.perform       MockHttpServletRequestBuilder requestConfig = MockMvcRequestBuilders.get(SomeController.PATH_GET_TRIPLE, numberId);       MvcResult mvcResult = mockMvc.perform(requestConfig)           .andExpect(status().isOk())           //.andDo(MockMvcResultHandlers.print())           .andReturn()       ;//// mockMvc.perform       // check of calling       Mockito.verify(someService, Mockito.atLeastOnce()).tripleMethod(eq(numberId));       // check of result       assertEquals(SomeController.RESP_PREFIX+serviceRes, mvcResult.getResponse().getContentAsString());   }}


Теперь настало время честного теста метода someService.tripleMethod, где аналогично есть вызов зависимости и собственный код. Готовим произвольный входящий аргумент и имитируем поведение зависимости someRepository:
int numberId = 42;when(someRepository.findOne(eq(numberId))).then(AdditionalAnswers.returnsFirstArg());


Перевод: когда будет вызван someRepository.findOne с аргументом равным numberId, вернуть тот же аргумент. Аналогичная ситуация тут мы не проверяем логику зависимости, а верим ей на слово. Мы лишь фиксируем вызов зависимости в пределах данного метода. Принципиальна тут собственная логика сервиса, его зона ответственности:

assertEquals(numberId*3, res);


Фиксируем, что значение полученное от репозитория должно быть утроено собственной логикой метода. Теперь данный тест стоит на страже этого требования:

@ExtendWith(MockitoExtension.class)class SomeServiceTest {   @Mock   private SomeRepository someRepository; // то, что мокируем   @InjectMocks   private SomeService someService; // куда внедряем то, что мокируем   @Test   void tripleMethod() {       int numberId = 42;       when(someRepository.findOne(eq(numberId))).then(AdditionalAnswers.returnsFirstArg());       int res = someService.tripleMethod(numberId);       assertEquals(numberId*3, res);   }}


Поскольку репозиторий у нас условно-игрушечный, то и тест получился соответствующий:

class SomeRepositoryTest {   // no dependency injection   private final SomeRepository someRepository = new SomeRepository();   @Test   void findOne() {       int id = 777;       Integer fromDB = someRepository.findOne(id);       assertEquals(id, fromDB);   }}


Однако и тут весь скелет на месте: подготовка, вызов и проверка. Таким образом корректная работа someRepository.findOne зафиксирована.

Реальный репозиторий требует тестирования с поднятием базы данных в памяти или в тест-контейнере, миграции структуры и данных, иногда вставки тестовых записей. Зачастую это самый длительный слой тестирования, но не менее важный, т.к. фиксируется успешная миграция, сохранение моделей, корректная выборка и т.д. Организация тестирования базы данных выходит за рамки данной статьи, но как раз она подробно описана в мануалах. Внедрения зависимостей в репозитории нет и не нужно, его задача работа с базой данных. В нашем случае, это был бы тест с предварительным сохранением записи в базу и последующим поиском по id.

Таким образом мы добились полного покрытия всей цепочки функционала. Каждый тест отвечает за работу собственного кода и фиксирует вызовы всех зависимостей. Чтобы протестировать приложение не требуется запускать его с полным поднятием контекста, это тяжело и долго. Сопровождение функционала быстрыми и легкими модульными тестами создает комфортную и надежную рабочую среду.

Кроме того, тесты повышают качество кода. В рамках независимого тестирования слоями часто приходится пересмотреть подход к организации кода. Например в сервисе метод создан метод first, он не маленький, он содержит и собственный код и моки, и, допустим, дробить его не имеет смысла, он покрыт тестом/ми по полной программе определены все подготовки и проверки. Затем кто-то решает добавить в сервис метод second, в котором вызывается метод first. Вроде некогда обычная ситуация, но когда доходит до покрытия тестом что-то не складывается Для метода second придется описывать и сценарий second и дублировать сценарий подготовки first? Ведь не получится замокать метод first самого тестируемого класса.

Возможно, в таком случае уместно задуматься об иной организации кода. Есть два противоположных подхода:
  • вынести метод first в компонент-утилиту, которая внедряется как зависимость в сервис.
  • вынести метод second в некий сервис-фасад, который комбинирует разные методы внедренного сервиса или даже нескольких сервисов.

Оба эти варианта хорошо укладываются в принцип слоев и удобно тестируются с мокированием зависимостей. Прелесть в том, что каждый слой отвечает за свою работу, а все вместе они создают прочный каркас неуязвимости целой системы.

На дорожку...



Вопрос для собеседования: сколько раз в рамках тикета разработчик должен запускать тесты? Сколько угодно, но как минимум дважды:
  • перед началом работы, чтобы убедиться что все OK, а не выяснять потом, что уже было сломано, а не ты сломал
  • по окончании работы

Так зачем же писать тесты? Затем, что не стоит и пытаться в большом, сложном приложении все упомнить и предусмотреть, это нужно возложить на автоматику. Разработчик не владеющий авто-тестированием не готов участвовать в большом проекте, это сразу выявит любой собеседующий.

Поэтому, рекомендую развивать эти навыки, если желаете претендовать на высокие зарплаты. Начать эту увлекательную практику можно с базовых вещей, а именно в рамках любимого фреймворка научиться тестировать:
  • компоненты с внедренными зависимостями, приемы мокирования
  • контроллеры, т.к. есть нюансы вызова энд-пойнта
  • DAO, репозитории, включая поднятие тестовой базы и миграции


Я надеюсь, что эта концепция слоеного теста помогла понять технику тестирования сложных приложений и почувствовать какой гибкий и мощный инструмент подарен нам для работы. Разумеется чем лучше инструмент, тем более мастерской работы он требует.
Приятной работы и высокого мастерства!

Код примера доступен по ссылке на github.com: https://github.com/denisorlov/examples/tree/main/unittestidea
Подробнее..

Как я решил протестировать нагрузочную способность web сервера

20.04.2021 10:21:38 | Автор: admin

В одно прекрасное утро вдруг пришла "гениальная" мысль а не протестировать ли мне что ни будь? Посмотрев по сторонам, на глаза мне попался он - герой этой статьи.

Это была вступительная минутка юмора, а если серьёзно то вопрос о тестировании web сервера стоял уже давно. Какого-то комплексного решения не было, поэтому я решил сделать хоть какие то первые шаги. Получить первые цифры. Благодаря которым можно было бы уже предметно разговаривать и продумывать дальнейшую стратегию.

И так дано - web сервер. Написан на .net core. Сервер используется в корпоративной разработке.

Посмотреть, как работает можно, например здесь бесплатный сервис хранение ссылок http://linkin.link. Про него я писал тут http://personeltest.ru/aways/habr.com/ru/users/developer7/posts

Вступление

Собственно, как тестировать web сервер? Если посмотреть на проблему в лоб то веб сервер должен отдавать все страницы, которые были запрошены клиентами. И желательно отдавать быстро.

Конечно, есть множество других сценариев. Ведь web сервер не работает сам по себе, как правило, он как минимум работает с какой-нибудь БД, связь с которой это отдельная большая тема. Опять же могут быть множество отдельных сервисов автаризационных и прочих. Перед web сервером может стоять балансировщик нагрузок. Всё это может размещается в множестве виртуальных контейнеров, которые опять же могут быть на разных физических серверах. В общем это целый зоопарк каналов передачи данных каждый из которых может стать узким горлышком.

Начинать надо с малого и для начала решил проверить самый простой и очевидный сценарий. Есть web сервер. Делаем на него запросы. Все запросы должны вернутся без ошибок и вовремя.

Задача поставлена. Тестировать решил так. На одной машине под windows 10 запускаю web сервер. На web сервере запущен сайт. На сайте размещено куча js, сss, mp4 файлов и, собственно, html страничка. Для простоты я просто взял страницу из готового сайта.

Чем досить сервер? Тут 2 пути скачать что-то готовое или написать свой велосипед. Я решил остановится на втором варианте. И этот выбор я сделал по нескольким причинам.

Во-первых, это интереснее. Во-вторых, своя программа обладает большой гибкость, любой тестовый сценарий можно внедрять, как только он пришёл в голову. Да и, по собственным размышлениям, это займёт меньше времени чем поиск другой программы, скачивания, изучения интерфейса. А потом ещё столкнуться с фактом, что тот функционал, который есть, не подходит.

Httpdos

Сказано сделано.

Программа работает так - при старте читается файл urls.txt где занесены url которые надо скачивать. Далее нажимаем старт. Создаются список Task по количеству url. Каждый Task открывает socket, отправляет http запрос, получает данные, закрывает socket. Далее процедура повторяется.

Изначально использовал TcpClient но по каким-то причинам он выдавал почти сразу на старте множество ошибочных загрузок. Не стал разбираться, а спустился пониже. Реализовал отправку на socket.

В программу добавил таблицу url, количество удачных скачиваний, количество ошибок, размер закачки.

Ошибки считал по 2 сценариям. Все exception связанные с открытием, отсылкой, приёмом данных. Собирательно если хоть, где что-то не так убиваем socket, инкрементируем счётчик ошибок. Ещё добавил проверку длины данных. При первой закачке страницы запоминается длина данных, все повторные закачки сверяются с этим числом. Отклонение считается ошибкой. Можно было бы добавить и что-то другое но для начала мне и этого достаточно.

Всё для эксперимента готово. Запускаем web сервер. Запускаем httpdos так назвал программу, чтобы дальше можно было использовать это короткое название. И вижу следующую картину.

Тут я уточню некоторые технические данные. Количество потоков делающие запросы получилось 1200. Эта цифра количество url прочитанных из файла urls.txt, плюс я решил умножить все запросы в 20 раз. Все цифры взял из головы на момент написания программы. В любой момент можно поставить любые другие по желанию. Преимущество велосипеда.

Так же в последствии я добавил сбоку textarea куда вывожу все exception, и ещё решил добавить вывод - количество запросов в секунду.

Картинка меня порадовала. Во первых - всё работает ). Во вторых работает без ошибок. Количество обработанных запросов получилось где-то 4000-6000 в секунду.

Откуда такая цифра? По моим размышлениям она зависит от многих обстоятельств. Самая очевидное это какого размера сами запросы. Как я писал выше я просто скачиваю все данные с определённой web страницы, которая была взята из стороннего web проекта. И там много mp4 файлов, размер которых под 3 мегабайта. Если уменьшить размер запросов, например скачивать только css наверняка количество обработанных запросов увеличится. Мне даже стало интересно, и я начал играть с исходным кодом как со стороны web сервера, так и со стороны httpdos. Там есть куча различных таймеров, буферов и прочего. Я смотрел, как то или иное изменение, окажет влияние на скорость.

Но не буду писать об этом. Да и цель была проверить отказоустойчивость, а не скорость. Простая проверка для начала - зависнет ли сервер от dos атаки или просто будет медленнее обрабатывать запросы.

Так же существенное влияние на скорость оказывало, то что web сервер и httpdos был запущен в режиме отладки в Visual Studio.

Поработав пару минут ни одной ошибки. Посмотрел загрузку процессора диспетчер задач показал примерно 28% на web сервер и 20% на httpdos. Процессор стоит i7-8700k. Не разогнанный. Это 6 ядерный 12 поточный камень. В процессе работы куллер охлаждения не было слышно проц холодный. Специально температуру не смотрел.

Решил параллельно с httpdos сделать загрузки js файла через браузер. Файл закачивается мгновенно. Т.е. httpdos не оказывает существенного влияния на web сервер.

Продолжаю эксперименты. Решил запустить три программы httpdos одновременно. И понаблюдать за результатами. То, что я увидел, и я является причиной появления этой статьи.

А у видел я, что по каким-то причинам спустя примерно минуту начали появляться ошибки загрузок.

И я начал расследование.

Такого поведения просто не должно быть!

Забегая вперёд, скажу, что даже не это побудило меня к дальнейшему расследованию. Ну пусть по каким-то причинам пару запросов из сотен тысяч не дойдут. Из моей практики браузеры делают большое число одних и тех же запросов пока не получат запрошенные данные. Клиенты этого даже не заметят.

То, что произошло дальше заставило меня напрячься.

В процессе экспериментов я и так и сяк изгалялся над httpdos. И один сценарий привёл к неожиданным результатам.

Если запустить не менее 3 программ. Думаю, что такое количество связанно с количеством одновременных запросов в секунду. Начать dos атаку. Дождаться появления ошибок. Потом резко закрыть программы. То слушающий socket web сервера просто умирает! Вот так нету никакой ошибки на стороне сервера. Все потоки работают. А socket ничего не принимает. Это уже ни в какие ворота.

Эксперименты показали, что socket оживал примерно через 4 минуты, но, если dos атаку проводить долго socket умирал навсегда, по крайней мере я минут 15 ждал оживления, а дальше уже и не интересно было. Такого поведения просто не должно быть!

Эксперимент был проведён множество раз результат всегда один и тот же.

Если перезапускать web сервер, т.е. получается мы пересоздаём слушающий socket, то socket начинал принимать клиентов сразу.

Первый шаг

Первое что я сделал - решил попробовать получить более подробную информацию из exсeption на стороне httpdos. Полазив по интернету, нашёл что мне нужен SocketException, а в нём посмотреть свойство ErrorCode. Сделано. Получил код ошибки 10061 - WSAECONNREFUSED. Тут пояснение.

В соединении отказано.

Невозможно установить соединение, потому что целевой компьютер активно отказался от него. Обычно это происходит в результате попытки подключиться к службе, которая неактивна на чужом хосте, то есть к службе, на которой не запущено серверное приложение.

Ну информативно(сарказм) однако. Вроде как это стоит понимать, что socket, к которому мы хотим подключится как бы и нету.

Запускаем консоль. Вводим netstat -an и видим. Вот он родненький. Слушает 80 порт. Ну по крайней мере система так думает.

И что делать далее? В таком случае начинаю искать хоть какие-то зацепки, хоть какие-то отклонения от ожидаемого поведения. Поэтому придумываю кучу сценариев, обкладываю их отладчиками, трассировкой, сниффингом и прочим и наблюдаю.

Пару слов о socket.

В веб сервере используется самый стандартный способ открытия и работы с socket в .net. Вот пример кода:

Socket listenSocket = new Socket(AddressFamily.InterNetwork,                                 SocketType.Stream, ProtocolType.Tcp){NoDelay = true,Blocking = false,ReceiveBufferSize = TLSpipe.TLSPlaintext_max_recive,SendBufferSize = TLSpipe.TLS_CHUNK};//~~~listenSocket.Bind(new IPEndPoint(point.ip, port.port));Socket socket = await listenSocket.AcceptAsync();

Как видно для работы с слушающим socket нужно 3 действия. Создали. Слушаем. Принимаем клиентов. Схема настолько простая что программисту не остаётся никаких способов для манёвров, ну или возможностей отстрелить себе ногу.

Предполагается что используй вот так (куда ещё проще?) и не иначе и всё у тебя должно работать. Но как показывает практика есть нюансы.

Хоть схема использования и проста, я всё же решил провести пару экспериментов что бы отсеять весь лишний код из подозрений. Первое что я сделал это очистил код от лишних действий и поставил точку остановки сразу после конструкции:

Soсket socket = await listenSocket.AcceptAsync();

Затем подвесил socket. Попытались присоединится клиентом. Программа висела в вышеприведённой строчки кода. Тут мы увидели, что проблема не в коде после socket, а где то внутри socket.

Дальше я решил поэкспериментировать с настройками открытия socket. Собственно, в своё время все настройки и так были исследованы, и оставлены только те, что нужно. Но в свете последних событий никто не уйдёт от (подозрений) экспериментов.

Под подозрения попали следующие настройки.

  1. ReceiveTimeout

  2. SendTimeout

  3. Ttl

Конечно, если подумать ни одно из этих свойств не должно вызывать эффект, описанный выше, но в принципе вообще ничего не должно убивать socket. Но что-то же убивает. Да и провести этот эксперимент занимает десятки секунд, гораздо меньше, чем я этот текст пишу.

И вообще было проведено большое число разных спонтанных экспериментов. Пришла в голову идея. Изменил код. Запустил. Проверил. Забыл. Десятки секунд, не более.

Что касается тайм аутов. В коде сервера реализованы различные таймауты, но на более высоком уровне. В приёмнике есть таймауты на приём шапки http, определения длины body, расчёт времени на приём body исходя из длины body и сценария наихудшего канала связи, например 1024 кб/c.

Примерно такие же правила и на отправку.

И если таймауты выходят socket клиента закрывается и удаляется. Для socket вызывается shutdown и close. Всё как и предписывает microsoft.

Поставил таймауты для resive/transmite 2000ms. Не помогло. Идём далее.

Ttl

Что это за параметр? Wiki говорит:

Получает или задает значение, задающее время существования (TTL) IP-пакетов

Т.е. при прохождении очередного шлюза параметр уменьшается на 1. При достижении 0 пакет удаляется. И вроде это не наш случай. Потому как наша система вся на localhost. Но! При гугление я нашёл следующую информацию.

Там было сказано, что windows от этого параметра рассчитывает двойное время нахождения socket в режиме TIME_WAIT. Про этот режим более подробно ниже.

Поставил ttl в минимально возможное значение. Не помогло.

Утечка sockets?

Далее я подумал сервер открывает каждую секунду около 6000 тысяч sockets и закрывает их. И всё это крутится на кучи асинхронных Task. Вдруг количество открытых сокетов и закрытых не совпадает? И есть, некая утечка sockets?

Быстро набросав код принцип которого заключается в инкременте глобальной переменной при открытии socket и декременте при закрытии и вывода этого числа в консоль.

Весь дополнительный код состоял из 3 блоков:

Interlocked.Increment(ref Program.cnt);Interlocked.Decrement(ref Program.cnt);Task.Run(async () => { while (true) {await Task.Delay(1000);Console.WriteLine(Program.cnt); });}

Разумеется, каждый блок размещается в нужном месте.

Запустив программы, я получил следующее:

Видео можно разделить на три части. Запуск. Рабочее состояние длится около 3 минут. В этом состоянии ни одной ошибки. И первое появление ошибки. Собственно, вторую часть я вырезал как не интересную.

Как же нам интерпретировать результат? Во-первых, мы видим число примерно 400 выводящееся каждую секунду.

Как работает веб сервер? Идеальный сервер приняв клиента мгновенно формирует ответ, отправляет его клиенту и мгновенно закрывает socket (хотя в реальности socket не закрывается а ожидает следующих запросов, ну условимся что так работает идеальный сервер). Поэтому количество открытых socket в идеальном сервере каждую секунду было бы 0. Но тут мы видим, что каждую секунду у нас примерно 400 обрабатывающихся клиентов. Что ж, для неидеального сервера вполне норма. Вообще количество одновременных клиентов в нашем сервере задаётся глобальной настройкой. В данном случае 10000 что значительно выше 400.

Так же мы видим, что периодически подпрыгивающее значение до 1000-2000. Связанно это может быть с чем угодно. При желании можно и это выяснить. Может сборщик мусора, может что ещё. Но, собственно, ничего криминального в этом нет.

И вот появились первые ошибки. Я начал закрывать программы. На видео виден характерный момент при закрытии второй программы, в последней оставшейся программе, посыпались ошибки открытия socket. Это в очередной раз показал своё лицо баг который мы так пытаемся отловить, пока безуспешно. Одна радость повторяется он регулярно, поэтому хоть есть возможность поиска. Сама же эта ошибка нам пока ничего не даёт.

Главная цель текущего эксперимента сопоставить количество открытых и закрытых socket. Окончательная цифра 0. Всё, как и должно быть. Ладно идём дальше.

Динамический диапазон портов

Далее расследование завело меня в область настроек windows. Должны же быть какие-то настройки для работы tcp/ip стека? Гугление мне подкинуло множество, но особо я хотел бы остановится на наиболее подходящих к нашему случаю.

Собственно настройки tcp/ip стека для windows меня интересовали с самого начала. Я задавал себе вопросы - а какие вообще порты выделяются на клиенте? Да и количество портов как бы ограничено. Всего 65535 значений. Такое число обусловлено исторически переменной uint16 в протоколе TCP.

Если выделять тысячи портов в секунду то можно и исчерпать их всех.

Я даже вначале проверил следующий кейс. Изначально в httpdos использовался стандартный для .net способ открытия socket клиента.

new TcpClient().Connect("127.0.0.1", 80);

Такой способ выдаёт socket с портом из динамического пула windows. Т.е. windows сама решает какой порт тебе выдать. Я поменял этот код на другой где я сам указывал порты. Но первый запуск показал, что диапазон выбранных мною портов был нашпигован как изюм открытыми портами, плюс доступ к некоторым портам выдавал странные ошибки о некорректном доступе. Это заставляло меня усложнять программу делать код поиска открытых портов

Поискав готовое решение я увидел, что это не простой способ в одну строчку. И я не стал развивать эту тему дальше. Windows виднее кокой мне порт дать.

Но какой же диапазон портов мне выдаёт Windows? Тут я узнал ответ. Честно ещё в десятке других мест. Правда в большинстве информация была устаревшая - мол диапазон 1025 до 5000. Но я из практики знал, что диапазон выделяется где-то от 50000.

В дальнейшем я и убедился в своей правоте. Как оказалось Microsoft изменила этот диапазон, и он составляет 49152-65535. И того где-то 15k портов. Явно меньше, чем 65535

Поэтому первая настройка увеличиваем этот диапазон.

Я применил следующую команду:

netsh int ipv4 set dynamicport tcp start=10000 num=55535

Команду запускаем под администратором. Проверить значение можно командой:

netsh int ipv4 show dynamicport tcp

Изменить то изменили но как проверить, что диапазон действительно расширен? Да и нужно ещё узнать повлияла ли эта настройка на наш баг.

Делаем вывод на стороне сервер, всех портов подключившихся клиентов. Ну и пытаемся увидеть какие-нибудь отклонения от нормы. Вдруг кусок выпадет из списка?

Добавляем код - 1 строка:

Console.WriteLine(((IPEndPoint)socket.RemoteEndPoint).Port);

Что показало нам запуск? При старте одной программы вылетела ошибка одного подключения. Не будем на это обращать внимание. А то так можно закопаться по уши в отладке. Дальше работало всё, как и ожидалось. Во-первых, мы чётко увидели, что диапазон выделяемых портов действительно стал 10000-65535. Во-вторых, порты выделяются последовательно без каких-то видимых провалов. Достигнув максимума, цикл повторяется. Я прогнал несколько циклов никаких отклонений.

Далее я начал закрывать программы и при закрытии второй программы серверный socket завис. Порты перестали выделятся. Во второй программе httpdos посыпались ошибки открытия socket.

Выводы проблема не в портах. По крайней мере порты явно не заканчиваются.

TIME WAIT

А что у нас есть вообще по протоколу TCP? Идём сюда и внимательно читаем.

Под подозрение попадает одно состояние TIME WAIT. Это одно из стояний, в котором может находится socket, т.е. пара ip+port. После его закрытия.

Получается, что мы закрыли socket но он ещё будет недоступен. Поиск показал, что это время находится в районе 4 минут.

Если бы мы играли в игру холодно-горячо, то я бы заорал горячо! Горячо! Похоже это именно наш случай.

Но зачем такое странное поведение. Опять же из справки получил пояснение. Т.к. протокол tcp/ip гуляет по сети пакетами, а сами пакеты могут пойти по разным маршрутам. И вообще застрять на некоторое время на каком-нибудь шлюзе, может получится ситуация, когда мы открыли socket, поработали с удалённым сервером. Закрыли socket. Потом этот socket открывает другая программа а ей начинают валится пакеты от предыдущей сессии. Как раз время таймаута и выбрано что бы отставшие все пакеты в сети удалились как мусорные.

OK, а можно ли изменить этот параметр? Оказывается в Windows есть целая ветка реестра, отвечающая за параметры tcp протокола.

HKEY_LOCAL_MACHINE \SYSTEM \CurrentControlSet \Services: \Tcpip \Parameters

Нас интересуют пока 2 параметра:

  1. TcpFinWait2Delay

  2. TcpTimedWaitDelay

Цель установить минимальное значение. Минимальное значение 30. Что означает что через 30 секунд порт опять будет доступен. Устанавливаем. Ну а далее наша стандартная проверка.

Запускаем сервер. Запускаем клиенты. Досим. Дожидаемся отказа socket. Потом запускаем консоль и вводим команду:

netstat -an

Такое ощущение что весь выделенный диапазон портов находится в состоянии TIME WAIT. Это пока соответствует ожиданиям. Ждём 30с. Повторяем команду. Листинг уменьшился в разы. Все порты с WAIT TIME пропали.

Проверяем серверный socket на оживление. Глух ((( А такие надежды были на эту настройку.

Впрочем, определённые выводы сделать можно. Настройка наша работает. Пауза 30с. Оставляем полезная настройка для нагруженного сервера.

Wireshark

Решил посмотреть, что нам покажет Wireshark. Кто не знает это мощнейший анализатор всевозможных протоколов, в том числе и tcp/ip. До недавнего времени в программе не была реализована функция прослушки localhost и приходилось производить некоторые танцы с бубном. Ставить виртуальную сетевую карту. Трафик пускать через неё. А Wireshark уже мог подключатся к прослушке этой карты.

Недавно подвезли возможность прослушки localhost что очень облегчило работу.

Далее всё стандартно. Запускаем сервер, клиентов. Убиваем слушающий socket.

Запускаем Wireshark ставим фильтр на 80 порт. Открываем браузер пытаемся закачать файл javascript.

Вот какое непотребство мы увидели в сниффере:

Анализируем увиденное.

Видно, наш запрос. Браузер с порта 36036 пытается достучатся к порту 80. Выставляет флаг SYN. Это стандартно. Но вот с порта 80 нам возвращается флаг RST оборвать соединения, сбросить буфер. Всё. И так по кругу.

Вывод. Wireshark нам особо не помощник. Разве только мы увидели, что слушающий socket не мёртв совсем, а отвечает. Он работает по какой-то своей внутренней логике, а не просто умер.

Журнал windows

Решил посмотреть может что в журнале событий windows есть что-то интересное. Для этого захожу в журнал.

Панель управления-> Администрирование-> Управление компьютером-> Служебные программы-> Просмотр событий-> Журналы Windows

Либо запустите eventvwr.msc

Там, разумеется, есть миллионы записей. И чтобы не копаться в этих миллионах, я делаю следующее. Убиваю socket. Захожу в журнал и чищу подразделы. Захожу в браузер пытаюсь скачать файл. Захожу в журнал и смотрю что там только что появилось.

К сожалению, никаких событий, кусающихся моей проблемы, не появилось. Я даже нашёл такой раздел Microsoft/Windows/TCPIP. Но кроме одинокой записи работает, там ничего не было.

Финал?

Поиск в интернете всей доступной информации по проблеме периодически выдавал мне страницы подобной этим:

И ещё во многих других местах.

Отсюда я сделала 2 вывода я со соей проблемой не одинок, и как минимум периодически с ней сталкиваются и другие.

Есть ещё 2 параметра TCP стека с которыми можно поэкспериментировать.

  1. TcpNumConnections - максимальное количество одновременных подключений в системе.

  2. TcpMaxDataRetransmissions - количество повторных посылок при неудаче.

Поиск по этим параметрам для Windows 10 ничего не дал. Только для Windows 2003-2008 Server. Может плохо искал (наверняка). Но я всё же решил их проверить.

Установил следующие значения:

TcpNumConnections REG_DWORD: 00fffffe (hex)

TcpMaxDataRetransmissions REG_DWORD: 00000005 (hex)

Перезагрузился. Повторил в который раз все процедуры. И.

Socket жив. Всё проверив несколько раз, путём добавления и удаления из реестра параметров с последующей перезагрузкой компьютера. Все эксперименты показали, что с параметрами socket остаётся жив. Это всё?

В этом месте был завершающий текст с выводами и размышления на тему почему именно эти параметры помогли.

Но.

На следующий день, я решил закрепить полученную информацию. Запускаю эксперимент socket мёртв. Мы вернулись к тому, с чего начинали!

Эти параметры оказались бесполезны в решении нашей проблемы. Но я их всё-таки оставил потому как по смыслу они полезные.

Продолжаем.

Финал.

Продолжая эксперименты, я обнаружил в коде один хитрый перехватчик исключений на слушающим socket. Точка остановки в этом месте показала, что при возникновении нашего случая сюда начали валится куча exception со следующим сообщением:

Удаленный хост принудительно разорвал существующее подключение.

В этом exсeption была паузу. С пояснением исключение на socket случай неординарный, например socket занят другим приложением, делаем паузу что бы всё утряслось и в лог не сыпалось миллионы сообщений.

Как оказалось не неординарный.

Картина сложилась.

Получается, что приёмный socket набрал большое число входящих подключений, которые оборвались, и больше не принимает новые подключения пока поток висел в exception на паузе.

Ради интереса провёл эксперимент. Убил socket. Ввёл команду netstat -an. И не увидел ни одного socket клиента на нашем socket сервера. Хотя в приёмном socket сервера висело куча мёртвых подключений.

Вообще непонятно как под капотом ведёт себя слушающий socket. По идее это должен быть совершенно автономный в отдельном потоке код. Который в любом случае должен принимать подключения. Пока, разумеется, не исчерпаются внутренние буфера. Но на практике пауза в потоке вызова слушающего socket - полностью убила его.

Выводы.

Ну что ж. Я, конечно, ожидал что геройски одержу победу над некоей эпичной ошибкой. А всё оказалось очень банально. Собственно, как всегда.

Все эксперименты у меня заняли где-то полтора дня. Эту статью я писал намного больше.

Из плюсов цель эксперимента достигнута. Сервер провёл часы под нагрузкой, вряд ли я бы столько тестировал его, не будь этой ошибки. Плюс я отдельно проверил некоторые важные механизмы работы сервера. Т.к. искал ошибку эта проверка была интересна. Если бы просто проверял то это была бы наискучнейшая работа.

Узнал новую информацию по поводу работы сети эти знания пригодятся впоследствии при решения очередного непонятного бага.

Почему у меня заняло так много времени на поиск банального Delay? Я думал над этим и пришёл к выводу, что все эксперименты уводили в сторону от ответа. Сложившееся картина показывала нам что умирал именно socket. Переставал принимать подключения. Спустя время оживал. Все сниферы показывали, что подключений не происходит.

Опять же было неправильное представление, что серверный socket работает в отдельном потоке. И в любом случае должен принимать клиентов. Поэтому я и скал проблему в настройках socket и tcp стека. Главная проблема я думал поток висит на socket. А в нашем аварийном случае он висел на exeption в Delay.

Подробнее..

Из песочницы Трассировка сервисов в мобильной транспортной сети. Как мы пришли к графовой БД Neo4j

31.08.2020 16:23:29 | Автор: admin

Часть 1. Начало


1.1 Введение и постановка задачи


В компании МТС мы централизованно занимаемся контролем качества сетей передачи данных или, проще транспортной сети (не путать с логистической транспортной сетью), далее по тексту ТС. И, в рамках нашей деятельности, нам постоянно приходиться решать две основные задачи:

  1. Обнаружена деградация клиентских (по отношению к ТС) сервисов нужно определить путь их проключения через ТС, и выяснить, является ли причиной деградации сервисов какой-либо участок ТС. Далее, будем называть это Прямой задачей.
  2. Обнаружена деградация качества транспортного канала или последовательности каналов нужно определить, какие сервисы зависят от данного канала/каналов, чтобы определить влияние. Далее, будем называть это Обратной задачей.

Под сервисами ТС понимается любое проключение клиентского оборудования. Это могут базовые станции (БС), В2В клиенты (использующие ТС МТС для организации доступа в сеть Интернет и/или наложенных сетей VPN), клиенты фиксированного доступа (т.н. ШПД), и т.д. и т.п.

В нашем распоряжении две централизованные информационные системы:
Система Performance Monitoring Данные о параметрах и топологии сети
Метрики, КПЭ ТС Параметры конфигурации, L2/L3 каналы

Любая транспортная сеть по своей сути является ориентированным графом $G=(V,E)$, в котором каждое ребро $(u,v) in E$ имеет неотрицательную пропускную способность. Потому с самого начала поиск решения указанных задач выполнялся в рамках теории графов.

Сначала вопрос сопоставления показателей качества ТС и сервисов с топологией ТС решался путем буквального объединения и представления данных топологии и качества в виде сетевого графа.

Просмотр сформированного по данным топологии и производительности графа был реализован на open source ПО Gephi. Это позволило решить задачу автоматического представления топологии, без ручной работы по её актуализации. Выглядит это так:


Здесь, узлы это, собственно узлы ТС (маршрутизаторы, коммутаторы) и базовые, рёбра каналы ТС. Цветовая маркировка, соответственно, обозначает наличие деградаций качества и статусы обработки этих деградаций.

Казалось бы вполне наглядно и можно работать, но:

  • Прямую задачу (от сервиса к пути сервиса) можно решить достаточно точно только при условии, что сама топология ТС достаточно простая дерево или просто последовательное соединение, без каких-либо колец и альтернативных маршрутов.
  • Обратную задачу можно решить при аналогичном условии, но при этом на уровне агрегации и ядра сети решить эту задачу невозможно в принципе, т.к. данные сегменты управляются протоколами динамической маршрутизации и имеют множество потенциальных альтернативных маршрутов.

Также, надо иметь ввиду, что в общем случае топология сети гораздо сложнее (выше приведенный фрагмент обведён красным):


И это не самый маленький сегмент есть гораздо больше и сложнее по топологии:


Итак, данный вариант подходил для медитативного разбора отдельных случаев, но никак не для поточной работы и, тем более, не для автоматизации решения прямой и обратной.



Часть 2. Автоматизация v1.0


Напоминаю, какие задачи мы решали:

  1. Определение пути проключения сервиса через ТС Прямая задача.
  2. Определение зависимых сервисов от канала ТС Обратная задача.

2.1. Транспортные сервисы для Базовых Станций (БС)


Обобщенно, организация транспорта от центрального узла (контроллера/шлюза) до БС выглядит так:


На сегментах агрегации и ядра ТС проключения выполняются через транспортные сервисы MPLS сети: L2/L3 VPN, VLL. На сегментах доступа проключения выполняются, как правило, через выделенные VLAN-ы.

Напоминаю, что у нас есть БД где лежит вся актуальная (в пределах определенного срока) параметрия и топология ТС.

2.2. Решение для коммутируемого сегмента (доступ)


Берем данные о VLAN логического интерфейса БС, и пошагово идём по связям, порты которых содержат этот Vlan ID, пока не дойдем до граничного маршрутизатора (МВН).


Для решения такой простой постановки задачи в итоге пришлось:

  1. Написать алгоритм пошаговой трассировки распространения VlanID от БС по каналам сети агрегации
  2. Учесть имеющиеся пробелы в данных. Особенно это касалось стыков между узлами на площадках.
  3. Фактически написать SPF алгоритм для того чтобы отбросить в конце тупиковые ветки, которые не ведут к МВН маршрутизатору.

Алгоритм получился из одного основного процесса и семи подпроцессов. На его реализацию и отладку было потрачено недели 3-4 чистого рабочего времени.

Кроме того, особое удовольствие нам доставили

2.2.1. SQL JOIN


В силу своей структуры реляционная модель для прохода по графу требует явного рекурсивного подхода с операциями соединения на каждом уровне, при это осуществляется многократный проход по одному набору записей. Что в свою очередь влечет за собой ухудшение производительности системы, особенно на больших наборах данных.

По понятным причинам не могу здесь приводить содержимое запросов к БД, но оцените каждый уступ в тексте запроса это подключение очередной таблицы, которая нужна для, в данном случае, получения в унифицированном виде соответствия Порт VlanID:


А этот запрос для получения, в унифицированном виде кросс-коннектов VlanID внутри коммутатора:


Учитывая, что кол-во портов составляло несколько десятков тысяч, а VLAN в 10 раз больше ворочалось всё это очень нехотя. А такие запросы нужно было сделать для каждого узла и VlanID. И выгрузить всё сразу и вычислить нельзя, т.к. это последовательное вычисление пути c с пошаговыми операциями, которые зависят от результатов предыдущего шага.

2.3. Определение пути сервиса в маршрутизируемых сегментах


Здесь мы начали с одного вендора МВН, система управления которого предоставляла данные о текущем и резервном LSP через сегмент MPLS. Зная Access интерфейс, который стыковался с доступом (L2 Vlan) можно было найти LSP а затем через серию запросов к NBI системы, получить путь LSP, состоящий из маршрутизаторов и линков между ними.


  • Аналогично коммутируемому сегменту, описание выгрузки пути LSP MPLS сервиса вылилось в алгоритм с уже 17-ю подпрограммами.
  • Решение работало только на сегментах, которые обслуживались данным вендором
  • Нужно было решать определение стыков между сервисами MPLS (например, в центре сегмента был общий VPLS сервис, а от него расходились или EPIPE либо L3VPN)

Мы проработали вопрос и для других вендоров МВН, где систем управления не было, или они не предоставляли данных о текущем прохождении LSP в принципе. Решение для нескольких мы нашли, но кол-во LSP, проходящее через маршрутизатор это уже не кол-во VanID, которое прописано на коммутаторе. Затягивая такой объем данных по запросу (ведь нам нужна оперативная информация) есть риск положить железо.

Кроме этого, возникали дополнительные вопросы:

  • Сеть МВН в общем случае мультивендорная, и на одном сегменте встречаются маршрутизаторы разных производителей, управляемых разными СУ. Т.е. у нас будут данные только о куске пути MPLS сервиса.
  • СУ вендора, которая могла предоставлять данные об основном и резервном LSP, на деле прописывала и первый и второй по одному и тому же пути. Это мы видели очень часто. А нам бы хотелось знать про потенциальные пути обхода при анализе ограничений на сети.

2.4. Где и как хранить результаты. Как запрашивать данные


Полученные данные о путях проключения сервисов надо куда-то записать, чтобы потом обращаться к ним при решении наших Прямой и Обратной задач.

Вариант с хранением в реляционной БД сразу исключили: с таким трудом агрегировать данные из множества источников чтобы потом в итоге рассовать по очередным наборам таблиц? Это не наш метод. Тем более помним про многоэтажные джойны и их производительность.

Данные должны содержать информацию о структуре сервиса и зависимостях его составляющих: линках, узлах, портах и т.п.

В качестве тестового решения был выбран формат XML и Native-XML БД Exist.

Каждый сервис в результате записывался в БД в формате (подробности опущены для компактности записи):

<services><service> <id>,<description> Описание сервиса (например, наименование БС)<source> Интерфейс условной точки А<target> Интерфейс условной точки Z<<segment>> Сегменты L2/L3<topology> Информация о пути через сегмент (узлы, порты/интерфейсы, соединения)<<joints>> Стыки между сегментами (узлы, порты/интерфейсы, соединения)</service></services>

Запрос данных по прямой и обратной задачам выполнялся по протоколу XPath:


Всё. Теперь система заработала на запрос с наименованием БС возвращается топология проключения её через транспортную сеть, на запрос с наименованием узла и порта ТС возвращается список БС, которые от него зависят по подключению. Поэтому, выводы мы сделали следующие

2.5.




Вместо заголовка о выводах по части 2

2.5.1. Для коммутируемого сегмента (сети на L2 коммутаторах Ethernet)


  • Обязательно нужны полные данные о топологии и соответствии Port VlanID. Если на каком-то линке нет данных о VlanID алгоритм останавливается, путь не найден
  • Многоэтажные непроизводительные запросы к реляционной БД. При появлении нового вендора со своей спецификой параметрии добавление запросов на всех этапах работы

2.5.2. Для маршрутизируемого сегмента


  • Ограничено возможностями СУ МВН по предоставлению данных о топологии LSP MPLS сервисов.
  • Запросы конфигурации непосредственно с МВН потенциально опасны, т.к. счёт обслуживаемых LSP идет на тысячи.
  • Резервные LSP часто прописываются системой по тому же маршруту что и основные нет информации о потенциальных альтернативных путях (в том числе и помимо тех, которые держит система).

2.5.3. Общее


  • Чтобы решить, по сути, задачу в сетевом графе, мы постоянно собираем данные из табличных источников, чтобы представить их в виде графа ( визуально в голове, или в памяти программы), реализовать алгоритм, а потом разобрать обратно.
  • Алгоритмизация решений и их реализация традиционными методами требует значительных трудозатрат. На реализацию данного этапа у нас ушло в целом 3-4 месяца.
  • Масштабирование очень затруднено, т.к. любое изменение, в виде появления нового вендора либо MPLS сервиса ведёт за собой внесение изменений в структуру запросов к БД и в сам программный код.
  • Приходится писать Дейкстра подобные алгоритмы, а не использовать готовые инструменты.

2.6. Чего всё-таки хотим


  • Чтобы данные о сетевом графе ТС и хранились, и обрабатывались как сетевой граф, т.е. набор узлов и зависимостей.
  • Чтобы использовались уже готовые алгоритмы работы с графами
  • Чтобы модель данных была универсальной, а не вендорно-зависимой
  • Чтобы трассировка была возможна и на неполных данных (например, частичное отсутствие данных о VlanID)

После того как мы провели оценку возможных вариантов реализации наших хотелок определились с классом систем, которые бы всё это обеспечили из коробки это т.н. графовые базы данных.

Хоть и читается последнее предложение как нечто линейное и простое, учитывая что ранее с таким классом БД никому из нас (и наших айтишников, как потом выяснилось тоже) сталкиваться не приходилось к решению пришли в некоторой степени случайно: подобные базы данных упоминались (но не разбирались) в обзорном курсе по Big Data. В частности, там упоминался продукт Neo4j. Мало того, что он, судя по описанию, удовлетворял всем нашим требованиям, у него ещё есть полностью бесплатная функциональная community-версия. Т.е. не 30-дневный триал, не обрезанный основной функционал, а полностью рабочий продукт, который можно не спеша изучить. Не последнюю (если не основную) роль в выборе сыграла широкая поддержка графовых алгоритмов.



Часть 3. Пример реализации прямой задачи в Neo4j


Чтобы не затягивать линейное повествование о том, как реализуется модель ТС в графовой БД Neo4j сразу покажем конечный результат на примере.

3.1. Трассировка пути проключения интерфейса Iub для БС 3G



Путь проключения сервиса проходит по двум сегментам маршрутизируемому МВН, и коммутируемому РРЛ (радиорелейные станции работают как Ethernet-коммутаторы). Путь через РРЛ сегмент определяется также, как было описано в части 2 по прохождению VlanID интерфейса БС по сегменту РРЛ до граничного маршрутизатора МВН. Сегмент МВН соединяет граничный (с сегментом РРЛ) маршрутизатор с маршрутизатором к которому подключен контроллер БС (RNC).

Изначально, из параметрии Iub, мы знаем точно какой МВН является шлюзом для БС (граничный МВН), и каким контроллером обслуживается БС.

Исходя из этих начальных условий, построим 2 запроса к БД для каждого из сегментов. Все запросы к БД строятся на языке Cypher. Чтобы сейчас не отвлекаться на его описание, воспринимайте его просто как SQL для графов.

3.1.1. Сегмент РРЛ. Путь по VlanID


Cypher-запрос трассировки пути сервиса по имеющимся данным о VlanID и топологии L2:
Фрагмент Cypher-запроса
(конструкция WITH передача результатов одного этапа запроса на следующий (конвейеризация обработки) )
Промежуточные результаты запроса (визуальное представление в консоли Neo4j Neo4j Browser)
Получение узлов БС и МВН между которыми будет проводиться поиск пути сервиса Iub

match (bts:node {name:'BTS_29_ХХХХ_N}), (mbh:node {name:'MBH_29_YYYYY_N})with bts, mbh

Получение узлов Vlan БС интерфейса Iub

match (bts)-[:port_attach]->(:port)-[:vlan]->(vlan:vlan) with bts as bts,  vlan.vlanid AS vlan_bts   , mbh 

Выбираем узлы ТС на той же площадке с БС, на портах которых прописан VlanID Iub БС
MATCH  ((bts)-->(pl_bts:PL)-->(n:node)-[:port_attach]->(pid:port)-->(v:vlan)) where (v.vlanid=vlan_bts and v.updated > bts.updated - 864000000) with distinct(v) as v,n,mbh,vlan_bts, bts

по алгоритму Дейкстра находим кратчайший путь от VlanID ТС узла площадки БС до граничного МВН
CALL apoc.algo.dijkstra(v, mbh, 'port_attach>|port_hosted>|<node_vlan|v_ptp_vlan>|ptp_vlan>|located_at>|to_node>', 'weight',10000000000.0,1) YIELD path as path_pl_mbh

Из цепочки Vlan получаем список узлов, портов, и связей между портами, который, в итоге, и будет являться путём проключения сервиса Iub от БС до граничного маршрутизатора
WITH FILTER(node in nodes(path_pl_mbh) WHERE  (node:vlan)) as vlans_node , path_pl_mbh, bts ,mbh , vlan_btsunwind vlans_node as vlan_nodematch (vlan_node)-->(p1:port)match p=(p1)-[:port_hosted|to_port|v_to_port|to_node|located_at]->()return p, bts, mbh

Результат:


Как видно, путь получен, даже несмотря на частичный недостаток данных. В данном случае отсутствует информация о стыке порта БС с портом радиорелейной станции.

3.1.2. Сегмент РРЛ. Путь по L2 топологии


Допустим, попытка в п. 3.1.1. не удалась по причине полного или частичного отсутствия данных о параметрии VlanID. Другими словами подобная непрерывная цепочка, доходящая до узла МВН не выстраивается:

Тогда можно попробовать определить проключение сервиса как кратчайший путь до МВН по топологии L2:
match (bts:node {name:'BTS_29_ХХХХ_N}), (mbh:node {name:'MBH_29_YYYYY_N})with bts, mbhCALL apoc.algo.dijkstra(bts, mbh, 'located_at>|to_node>|to_port>|v_to_port>|port_hosted>|port_attach>', 'weight',1.0,1) YIELD path as preturn p

Результат:


Как видно, получен такой же результат. Здесь недостаток информации о стыке БС с РРС восполнен прохождением связи через объект (узел) площадки, на которой они находятся. Разумеется, точность такого метода будет меньше, т.к. в общем случае Vlan может быть прописан не по кратчайшему пути, предполагаемому алгоритмом Дейкстра. Зато запрос состоит из всего двух операций.

3.1.3 Сегмент МВН. Трассировка пути от граничного МВН до контроллера


Здесь так же используем алгоритм Дейкстра.
Один путь с минимальной стоимостью
match (mbh:node {name:MBH_29_XXX}), (rnc:node {name:RNC_YYY})CALL apoc.algo.dijkstra(mbh,rnc, 'port_attach>|port_hosted>|<node_vlan|ptp_vlan>|located_at>|to_node>|to_port_mbh>', 'weight',10000000000.0,1) YIELD path as pathreturn path

Топ-2 пути с минимальной стоимостью (основной + альтернатива)
match (mbh:node {name:MBH_29_XXX}), (rnc:node {name:RNC_YYY})CALL apoc.algo.dijkstra(mbh,rnc, 'port_attach>|port_hosted>|<node_vlan|ptp_vlan>|located_at>|to_node>|to_port_mbh>', 'weight',10000000000.0,2) YIELD path as pathreturn path

Топ-3 пути с минимальной стоимостью (основной + две альтернативы)
match (mbh:node {name:MBH_29_XXX}), (rnc:node {name:RNC_YYY})CALL apoc.algo.dijkstra(mbh,rnc, 'port_attach>|port_hosted>|<node_vlan|ptp_vlan>|located_at>|to_node>|to_port_mbh>', 'weight',10000000000.0,3) YIELD path as pathreturn path


Аналогично в данном случае отсутствует информация о непосредственных стыках МВН с RNC. Но это не мешает построить путь сервиса, пусть и предполагаемый алгоритмом (об этом позже).

3.2. Трудозатраты


Реализация Прямой задачи, показанная сейчас, разительно отличается от подхода разработай алгоритм, программу, способ хранения и извлечения результатов здесь всё сводится к напиши запрос к БД. Забегая вперёд, отметим, что весь цикл от разработки простой модели графа, загрузки данных в Neo4j из реляционной БД, написания запросов, и до получения результата занял, в общей сложности, один день.

3-4 месяца vs 1 день!!! Это было последним доводом для окончательного ухода в графовую БД.



Часть 4. Графовая БД Neо4j и загрузка данных в неё


4.1. Сравнение реляционных и графовых БД


Сравнительная характеристика Реляционная БД Графовая БД
Хранение данных Данные хранятся в наборе отдельных таблиц, связи между строками которых могут быть определены по ключевым полям через специальные отношения: один к одному, один ко многим, многие ко многим.

Данные хранятся или в виде узлов или в виде отношений (направленных связей/ссылок между узлами). И у тех и у других может быть произвольный набор параметров.

Структура данных Тот, кто строит запросы к БД, должен точно знать структуру хранения данных, и какие связи между данными, помимо определенных через внешние ключи, могут быть. Любые изменения/дополнения в структуре данных требуется соотносить с моделью хранения, при этом заранее оценивая последствия для производительности БД в целом. По этой причине коммерческие системы, использующие для хранения РБД, берут с клиентов деньги за адаптацию потребностей последних к своей модели хранения данных
Структура данных (узлов и взаимосвязей между ними) определяется самими данными. Пользователю доступна вся информация о текущей структуре. Отсутствует жесткая фиксированная схема возможно одновременное существование нескольких вариантов представления данных. Прямое и универсальное моделирование данных пользователем. Любой объект может быть представлен набором из трех сущностей: Узел, Связь, Параметры.
Проблема JOIN Не всегда поля, по которым понадобится сделать объединение данных при запросе индексированы. Это может значительно снизить быстродействие запроса, т.к. заставляет планировщик прибегать к NESTED-LOOP
Связи между узлами определяются на этапе загрузки в БД или на этапах постпроцессинга (вычисления связей). Если угодно ГБД это как если бы в РБД все нужные JOIN уже были сделаны, причем на уровне отдельных полей отдельных строк.
JOIN chain Чем больше мы сцепляем JOIN операций друг за другом в одном запросе тем больше падение производительности. Т.е. мы экспоненциально теряем в производительности на каждый вложенный JOIN в запросе. В данной статье приведен пример решения задачи поиска в глубину запросами к РБД Oracle, и к ГБД Neo4j:

Проблемы не существует. Обход графа в глубину нативное свойство ГБД.
Зависимость от кол-ва записей/размера БД Чем больше записей в объединяемых таблицах, тем большее время требуется от РБД для обработки запроса, даже если количество реально возвращаемых записей неизменно
Запрос отсекает именно ту часть графа, которая связана с интересующими узлами/связями и работает только с ней, при этом размер БД графа может быть любым. Например, запрос идет от узла МВН-Х, при этом всего в графе может быть как 1000 так и 1 млн узлов МВН поиск всё равно будет внутри сегмента графа, включающего только МВН-Х.


4.2. Модель данных


Базовая модель представления ТС до уровня L3 топологии включительно:


Разумеется, модель обширнее представленной, и содержит также и MPLS сервисы, и виртуальные интерфейсы, но для упрощения рассмотрим её ограниченный фрагмент.

В такой модели связь между двумя сетевыми элементами одного региона может быть представлена как:


4.3. Загрузка данных


Загрузку данных выполняем из БД параметрии и топологии ТС. Для загрузки в Neo4j из SQL БД используется библиотеки APOC apoc.load.jdbc, которая принимает на вход строку подключения к RDBMS и текст SQL запроса, и возвращает множество строк, которые через CYHPER выражения отображаются на узлы, связи, или параметры. Такие операции выполняются слой за слоем для каждого типа объектов модели.

Например, проход для загрузки/обновления узлов, представляющих сетевые элементы (Nodes):
with "jdbc:oracle:thin:usr /passw@IP:1521/db" as url // DB connection stringCALL apoc.load.jdbc(url," select distinct mr,region_code,site_code,node, nodeip,TYPE,VENDOR, coalesce(updated, trunc(localtimestamp-7)) as updatedfrom (    select distinct mr,region_code,site_code,node, nodeip,TYPE,VENDOR, updated from GRAPH_IPMPLS_NE    union    select distinct mr,region_code,site_code,node, nodeip,TYPE,VENDOR, updated from GRAPH_RRN_NE    union    select distinct mr,region_code,site_code,node, nodeip,TYPE,VENDOR, updated from GRAPH_CONTROLLERS_NE) twhere mr is not null and region_code is not null and site_code is not null ") YIELD row

Вызов процедуры apoc.load.jdbc,
получение набора данных
match (reg:Region {region_code:row.REGION_CODE})-->(pl:PL {SiteCode:row.SITE_CODE}) with pl, row, {updated:row.UPDATED, weight:1000000000000} as conn_params

Для каждой строки из набора данных
по коду региона и сайта ищутся узлы
представляющие соответствующие площадки
merge (pl)<-[loc:located_at]-(n:node {ip:row.NODEIP})merge (pl)-[to_n:to_node]->(n)set n.name=row.NODEset n.region_code = row.REGION_CODEset n.type = row.TYPEset n.updated = row.UPDATEDset n.vendor = row.VENDORset loc += conn_paramsset to_n += conn_params

Для каждого объекта площадки обновляются
связанные с ней сетевые элементы (node).
Используется инструкция MERGE + SET,
которая обновляет параметры объекта,
если он уже существует в БД, если нет
то создает объект. Тут же записываются
параметры узла Node и его связей с узлом PL

И так далее по всем уровням модели ТС.

Поле Updated используется при контроле актуальности данных в графе не обновляемые дольше определенного периода объекты удаляются.



Часть 5. Решаем обратную задачу в Neo4j


Когда мы только начинали, выражение трассировка сервиса в первую очередь вызывало следующие ассоциации:


Т.е., что трассируется непосредственно текущий путь прохождения сервиса, в данный момент времени.

Это не совсем то, что мы имеем в графовой БД. В ГБД трассируется сервис по взаимосвязям объектов, определяющих его конфигурацию в каждом задействованном сетевом элементе. Т.е., в виде графовой модели представляется конфигурация, и результирующая трасса это проход по модели, представляющей эту конфигурацию.

Так как, в отличие от коммутируемого сегмента, фактические маршруты сервиса в сегменте mpls определяются динамическими протоколами, нам пришлось принять некоторые

5.1. Допущения для маршрутизируемых сегментов


Т.к. из данных конфигурации mpls сервисов нет возможности определить их точный путь прохождения через сегменты, управляемые динамическими протоколами маршрутизации (тем более, если используется Traffic Engineering) для решения используется алгоритм Дейкстры.
Да, есть системы управления, которые могут по NBI-интерфейсу предоставлять актуальный путь сервисных LSP, но пока у нас такой вендор только один, а вендоров на сегменте МВН больше чем один.

Да, есть системы анализа протоколов маршрутизации внутри AS, которые, слушая обмен IGP протоколов, могут определить текущий маршрут интересующего префикса. Но стоят такие системы как сбитый Боинг, а учитывая что такую систему нужно развернуть на всех AS того же мобильного бэкхола стоимость решения вместе со всеми лицензиями составит стоимость Боинга, сбитого чугунным мостом, привязанным к ракете Ангара при полной заправке. И это ещё при том что подобными системами не вполне решены задачи учета маршрутов через несколько AS c использованием BGP.

Поэтому пока так. Конечно, мы добавили несколько подпорок в условия стандартного алгоритма Дейкстра:

  • Учёт статуса интерфейсов/портов. Отключенный линк повышается в стоимости и идет в конец возможных вариантов пути.
  • Учёт резервного статуса линка. По данным системы Performance Monitoring вычисляется присутствие в mpls канала только трафика keepalive, соответственно, стоимость такого канала также увеличивается.

5.2. Как решить обратную задачу в Neo4j


Напоминание. Обратная задача получение списка сервисов, зависимых от конкретного канала или узла транспортной сети (ТС).

5.2.1. Коммутируемый L2 сегмент


Для коммутируемого сегмента, где путь сервиса и конфигурация сервиса это практически одно и то же, задачу ещё можно решить через CYPHER-запросы. Например, для радиолерейного пролета из результатов решения Прямой задачи в п 3.1.1., сделаем запрос от модема радиорелейного линка развернем все цепочки Vlan, которые проходят через него:

match (tn:node {name:'RRN_29_XXXX_1'})-->(tn_port:port {name:'Modem-1'})-->(tn_vlan:vlan)with tn, tn_vlan, tn_portcall apoc.path.spanningTree(tn_vlan,{relationshipFilter:"ptp_vlan>|v_ptp_vlan>", maxLevel:20}) yield path as ppwith tn_vlan,pp,nodes(pp)[-1] as last_node, tn_portmatch (last_node)-[:vlan]->(:port)-->(n:node)return pp, n, tn_port

Красным узлом обозначен модем, Vlan которого разворачиваем. Обведены 3 БС на которые в итоге вывело развертывание транзитных Vlan с Modem1.


У такого подхода есть несколько проблем:

  • Для портов должны быть известны и загружены в модель сконфигурированные Vlan.
  • Из-за возможного фрагментирования, цепочка Vlan не всегда выводит на терминальный узел
  • Невозможно определить относится ли последний узел в цепочке Vlan к терминальному узлу или сервис, на самом деле, продолжается дальше.

То есть, сервис всегда удобнее трассировать между конечными узлами/точками его сегмента, а не из произвольной середины, и с одного уровня OSI.

5.2.2. Маршрутизируемый сегмент


С маршрутизируемым сегментом, как уже описано в п 5.1, выбирать не приходится для решения обратной задачи на основании данных текущей конфигурации какого-то промежуточного линка MPLS никаких средств нет конфигурация не определяет в явном виде трассу сервиса.

5.3. Решение


Решение было принято следующее.

  • Проводится полная загрузка модели ТС, включая БС и контроллеры
  • Для всех БС выполняется решение прямой задачи трассировка сервисов Iub, S1 от БС до граничного МВН, а затем от граничного МВН до соответствующих контроллеров или шлюзов.
  • Результаты трассировки записываются в обычную SQL БД в формате: Наименование БС массив элементов пути сервиса

Соответственно, при обращении к БД с уловием Узел ТС или Узел ТС + Порт, возвращается список сервисов (БС), в массиве пути которых есть нужный Узел или Узел+Порт.





Часть 6. Результаты и выводы


6.1. Результаты


В итоге система работает следующим образом:


На данный момент для решения прямой задачи, т.е. при анализе причин деградации отдельных сервисов, разработано веб-приложение, показывающее результат трассировки (путь) из Neo4j, с наложенными данными о качестве и производительности отдельных участков пути.


Для получения списка сервисов, зависимых от узлов или каналов ТС (решение обратной задачи) разработан API для внешних систем (в частности Remedy).

6.2. Выводы


  • Оба решения подняли на качественно новый уровень автоматизацию анализа качества сервисов и транспортной сети.
  • Кроме этого, при наличии готовых данных о трассах сервисов БС, стало возможно быстро предоставлять данные для бизнес-подразделений о технической возможности включения В2В клиентов на конкретных площадках по емкости и качеству трассы.
  • Neo4j показал себя как очень мощный инструмент для решения задач с сетевыми графами. Решение отлично документировано, имеет широкую поддержку в различных сообществах разработчиков, постоянно развивается и улучшается.

6.3. Планы


В планах у нас:

  • расширение технологических сегментов, моделируемых в БД Neo4j
  • разработка и внедрение алгоритмов трассировки сервисов ШПД
  • увеличение производительности серверной платформы

Спасибо за внимание!
Подробнее..

Из песочницы Apache Kafka и тестирование с Kafka Server

12.11.2020 14:14:33 | Автор: admin

Введение


Существуют различные способы для написания тестов с использованием Apache Kafka. К примеру, можно использовать TestContainers и EmbeddedKafka. Об этом можно почитать, к примеру, вот здесь: Подводные камни тестирования Kafka Streams. Но существует и вариант для написания тестов с использованием KafkaServer.


Что будет тестироваться?


Предположим, необходимо разработать сервис отправки сообщений по различным каналам: email, telegram и т.п.


Пусть имя сервиса будет: SenderService.


Сервис должен: слушать заданный канал, выделять из канала нужные ему сообщения, разбирать сообщения и отправлять по нужному каналу для конечной доставки сообщений.


Для проверки сервиса необходимо сформировать сообщение для отправки с использованием канала отправки почты и убедиться в том, что сообщение было передано в конечный канал.
Конечно, в реальных приложениях тесты будут сложнее. Но для иллюстрации выбранного подхода, такого теста будет достаточно.


Сервис и тест реализованы с использованием: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.


Сервис


Сервис будет иметь возможность начать работу и остановить свою работу.


void start()void stop()

При старте необходимо задать, как минимум, следующие параметры:


String bootstrapServersString senderTopicEmailService emailService

bootstrapServers адрес kafka.
senderTopic топик, из которого будут считываться сообщения.
emailService сервис для конечной отправки сообщений по почте.


В реальном сервисе таких конечных сервисов будет столько же сколько и конечных каналов отправки сообщений.


Теперь необходим потребитель, который слушает канал, фильтрует и отправляет сообщения в конечные каналы. Количество таких потребителей можно выбирать. Подход для написания потребителя описан вот здесь: Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.


Collection<AutoCloseable> closeables = new ArrayList<>();ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);for (int i = 0; i < senderTasksN; i++) {    SenderConsumerLoop senderConsumerLoop =            new SenderConsumerLoop(                    bootstrapServers,                    senderTopic,                    "sender",                    "sender",                    tasksExecutorService,                    emailService            );    closeables.add(senderConsumerLoop);    senderTasksExecutor.submit(senderConsumerLoop);}

В цикле создается экземпляр потребителя, запоминается в коллекции и запускается через сервис запуска задач.


При выполнении этого кода потребители начинают работать. Сервис ждет их завершения или сигнала для остановки.


Runtime.getRuntime().addShutdownHook(new Thread(() -> {    for (AutoCloseable autoCloseable : closeables) {        try {            autoCloseable.close();        } catch (Exception e) {            e.printStackTrace();        }    }    senderTasksExecutor.shutdown();    tasksExecutorService.shutdown();    stop();    try {        senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);    } catch (InterruptedException e) {        e.printStackTrace();    }}));

При завершении необходимо освободить ресурсы.


Потребитель


Потребитель имеет следующие публичные методы:


void run()void close()

Основной метод: run.


@Overridepublic void run() {    kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);    kafkaConsumer.subscribe(Collections.singleton(topic));    while (true) {        calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));    }}

По входным параметрам создается экземпляр kafka-потребителя. kafka-потребитель подписывается на заданный топик. В бесконечном цикле выбираются записи из топика. И отправляются на обработку.


Для иллюстрации json-сообщения будут иметь несколько полей, которые будут задавать и тип сообщения, и данные для отправки.


Пример сообщения:


{  "subject": {    "subject_type": "send"  },  "body": {    "method": "email",    "recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",    "title": "42",    "message": "73"  }}

subject_type тип сообщения. Для сервиса нужно значение send.
method тип конечного сервиса для отправки. email отправка через почту.
recipients список получателей.
title заголовок для сообщения.
message сообщение.


Обработка всех записей:


void calculate(ConsumerRecords<String, String> records) {    for (ConsumerRecord<String, String> record : records) {        calculate(record);    }}

Обработка одной записи:


void calculate(ConsumerRecord<String, String> record) {            JSONParser jsonParser = new JSONParser();            Object parsedObject = null;            try {                parsedObject = jsonParser.parse(record.value());            } catch (ParseException e) {                e.printStackTrace();            }            if (parsedObject instanceof JSONObject) {                JSONObject jsonObject = (JSONObject) parsedObject;                JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);                String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();                if (SEND.equals(subjectType)) {                    JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);                    calculate(jsonBody);                }            }        }

Распределение сообщений по типу:


void calculate(JSONObject jsonBody) {    String method = jsonBody.get(METHOD).toString();    if (EMAIL_METHOD.equals(method)) {        String recipients = jsonBody.get(RECIPIENTS).toString();        String title = jsonBody.get(TITLE).toString();        String message = jsonBody.get(MESSAGE).toString();        sendEmail(recipients, title, message);    }}

Отправка в конечную систему:


void sendEmail(String recipients, String title, String message) {    tasksExecutorService.submit(() -> emailService.send(recipients, title, message));}

Отправка сообщений происходит через сервис исполнения задач.


Ожидания завершения отправки не происходит.


Создание kafka-потребителя:


static KafkaConsumer<String, String> createKafkaConsumerStringString(        String bootstrapServers,        String clientId,        String groupId) {    Properties properties = new Properties();    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);    properties.setProperty(            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,            "org.apache.kafka.common.serialization.StringDeserializer");    properties.setProperty(            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,            "org.apache.kafka.common.serialization.StringDeserializer");    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");    return new KafkaConsumer<>(properties);}

Интерфейс для писем:


interface EmailService {    void send(String recipients, String title, String message);}

Тест


Для теста понадобиться следующее.
Адрес kafka-сервера.
Порт для kafka-сервера.
Имя топика.


Сервис для управления kafka-сервером. Будет описан ниже.


public class SenderServiceTest {    @Test    void consumeEmail() throws InterruptedException {        String brokerHost = "127.0.0.1";        int brokerPort = 29092;        String bootstrapServers = brokerHost + ":" + brokerPort;        String senderTopic = "sender_data";        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {            kafkaServerService.start();            kafkaServerService.createTopic(senderTopic);        }    }}

Задаются параметры. Создается сервис для управления kafka-сервером. kafka-сервером стартует. Создается необходимый топик.


Создается mock конечного сервиса для отправки сообщений:


SenderService.EmailService emailService = mock(SenderService.EmailService.class);

Создается сам сервис и стартует:


SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);senderService.start();

Задаются параметры для сообщения:


String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";String title = "42";String message = "73";

Отправляется сообщение в канал:


kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));

Ожидание:


Thread.sleep(6000);

Проверка, что сообщение дошло до конечного сервиса:


verify(emailService).send(recipients, title, message);

Остановка:


senderService.stop();

Все вместе:


public class SenderServiceTest {    @Test    void consumeEmail() throws InterruptedException {        String brokerHost = "127.0.0.1";        int brokerPort = 29092;        String bootstrapServers = brokerHost + ":" + brokerPort;        String senderTopic = "sender_data";        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {            kafkaServerService.start();            kafkaServerService.createTopic(senderTopic);            SenderService.EmailService emailService = mock(SenderService.EmailService.class);            SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);            senderService.start();            String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";            String title = "42";            String message = "73";            kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));            Thread.sleep(6000);            verify(emailService).send(recipients, title, message);            senderService.stop();        }    }}

Вспомогательный код:


public class SenderFactory {    public static final String SUBJECT = "subject";    public static final String SUBJECT_TYPE = "subject_type";    public static final String BODY = "body";    public static final String METHOD = "method";    public static final String EMAIL_METHOD = "email";    public static final String RECIPIENTS = "recipients";    public static final String TITLE = "title";    public static final String MESSAGE = "message";    public static final String SEND = "send";    public static String key() {        return UUID.randomUUID().toString();    }    public static String createMessage(String method, String recipients, String title, String message) {        Map<String, Object> map = new HashMap<>();        Map<String, Object> subject = new HashMap<>();        Map<String, Object> body = new HashMap<>();        map.put(SUBJECT, subject);        subject.put(SUBJECT_TYPE, SEND);        map.put(BODY, body);        body.put(METHOD, method);        body.put(RECIPIENTS, recipients);        body.put(TITLE, title);        body.put(MESSAGE, message);        return JSONObject.toJSONString(map);    }}

Сервис для управления kafka-сервером


Основные методы:


void start()void close()void createTopic(String topic)

В методе start происходит создание сервера и вспомогательных объектов.


Создание zookeeper и сохранение его адреса:


zkServer = new EmbeddedZookeeper();String zkConnect = zkHost + ":" + zkServer.port();

Создание клиента zookeeper:


zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);zkUtils = ZkUtils.apply(zkClient, false);

Задание свойств для сервера:


Properties brokerProps = new Properties();brokerProps.setProperty("zookeeper.connect", zkConnect);brokerProps.setProperty("broker.id", "0");try {    brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());} catch (IOException e) {    throw new RuntimeException(e);}brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);brokerProps.setProperty("offsets.topic.replication.factor", "1");KafkaConfig config = new KafkaConfig(brokerProps);

Создание сервера:


kafkaServer = TestUtils.createServer(config, new MockTime());

Все вместе:


public void start() {    zkServer = new EmbeddedZookeeper();    String zkConnect = zkHost + ":" + zkServer.port();    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);    zkUtils = ZkUtils.apply(zkClient, false);    Properties brokerProps = new Properties();    brokerProps.setProperty("zookeeper.connect", zkConnect);    brokerProps.setProperty("broker.id", "0");    try {        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());    } catch (IOException e) {        throw new RuntimeException(e);    }    brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);    brokerProps.setProperty("offsets.topic.replication.factor", "1");    KafkaConfig config = new KafkaConfig(brokerProps);    kafkaServer = TestUtils.createServer(config, new MockTime());}

Остановка сервиса:


@Overridepublic void close() {    kafkaServer.shutdown();    zkClient.close();    zkServer.shutdown();}

Создание топика:


public void createTopic(String topic) {    AdminUtils.createTopic(            zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);}

Заключение


В заключении нужно отметить, что приведенный здесь код лишь иллюстрирует выбранный способ.


Для создания и тестирования сервисов с использованием kafka можно обратиться к следующему ресурсу:
kafka-streams-examples


Ссылки и ресурсы


Исходный код


Код для тестирования с kafka-сервером

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru