Русский
Русский
English
Статистика
Реклама

Elk

Как ELK помогает инженерам по ИБ бороться с атаками на сайты и спать спокойно

22.10.2020 14:22:35 | Автор: admin
Наш центр киберзащиты отвечает за безопасность веб-инфраструктуры клиентов и отбивает атаки на клиентские сайты. Для защиты от атак мы используем файрволы веб-приложений FortiWeb (WAF). Но даже самый крутой WAF не панацея и не защищает из коробки от целевых атак.

Поэтому в дополнение к WAF мы используем ELK. Он помогает собирать все события в одном месте, копит статистику, визуализирует ее и позволяет нам вовремя видеть направленную атаку.

Сегодня расскажу подробнее, как мы скрестили ёлку с WAF и что из этого получилось.




История одной атаки: как все работало до перехода на ELK

В нашем облаке у заказчика развернуто приложение, которое стоит за нашим WAF. В день к сайту подключались от 10 000 до 100 000 пользователей, количество подключений доходило до 20 млн. в день. Из них 3-5 пользователей были злоумышленниками и пытались взломать сайт.

Обычный брутфорс формы с одного IP-адреса FortiWeb блокировал достаточно легко. Количество обращений к сайту в минуту было больше, чем у легитимных пользователей. Мы просто настраивали пороги активности с одного адреса и отражали атаку.

Куда сложнее бороться с медленными атаками, когда злоумышленники действуют не спеша и маскируются под обычных клиентов. Они используют много уникальных IP-адресов. Такая активность не выглядела для WAF как массовый брутфорс, отследить ее автоматически было сложнее. А еще был риск заблокировать нормальных пользователей. Мы искали другие признаки атаки и настраивали политику автоматической блокировки IP-адресов по этому признаку. Например, у многих нелегитимных сессий обнаруживались общие поля в заголовках http-запроса. Искать такие поля часто приходилось вручную в журналах событий FortiWeb.

Получалось долго и неудобно. В стандартной функциональности FortiWeb события записываются текстом в 3 разных журнала: обнаруженные атаки, информация о запросах и системные сообщения о работе WAF. За минуту могут приходить десятки, а то и сотни событий об атаках.

Не так уж и много, но приходится вручную лазить по нескольким журналам и перебирать много строк:


В журнале атак видим адреса пользователей и характер активности.

Недостаточно просто сканировать таблицу журнала. Чтобы найти самое интересное и полезное про характер атаки, нужно заглянуть внутрь конкретного события:


Выделенные поля помогают обнаружить медленную атаку. Источник: скрин с сайта Fortinet.

Ну и самая главная проблема разобраться в этом может только специалист по FortiWeb. Если в рабочее время мы еще могли отслеживать подозрительную активность в реальном времени, то расследование ночных инцидентов могло затянуться. Когда политики FortiWeb по какой-то причине не срабатывали, дежурные инженеры ночной смены не могли оценить ситуацию без доступа к WAF и будили специалиста по FortiWeb. Мы просматривали журналы за несколько часов и находили момент атаки.

С такими объемами информации сложно понять общую картину с первого взгляда и действовать на опережение. Тогда мы решили собирать данные в одном месте, чтобы анализировать все в наглядном виде, находить начало атаки, выявлять ее направление и метод блокировки.

Из чего выбирали


В первую очередь мы смотрели на уже используемые решения, чтобы не множить сущности без необходимости.

Одним из первых вариантов был Nagios, который мы используем для мониторинга инженерной инфраструктуры, сетевой инфраструктуры, оповещения о нештатных ситуациях. Безопасники тоже ее используют для оповещений дежурных при подозрительном трафике, но она не умеет собирать разрозненные журналы и поэтому отпадает.

Был вариант агрегировать все с помощью MySQL и PostgreSQL или другой реляционной базы. Но, чтобы вытаскивать данные, нужно было лепить свое приложение.

В качестве коллектора логов в нашей компании также используют FortiAnalyzer от Fortinet. Но в этом кейсе он тоже не подошел. Во-первых, он больше заточен для работы с межсетевым экраном FortiGate. Во-вторых, не хватало многих настроек, а взаимодействие с ним требовало отменного знания SQL-запросов. Ну и в-третьих, его использование увеличило бы стоимость сервиса для заказчика.

Вот так мы и пришли к опенсорсу в лице ELK.

Почему выбрали ELK


ELK это комплекс программ с открытым кодом:

  • Elasticsearch база данных временных рядов, которая как раз создавалась для работы с большими объемами текста;
  • Logstash механизм сбора данных, который может конвертировать журналы в нужный формат;
  • Kibana хороший визуализатор, а также достаточно дружелюбный интерфейс для управления Elasticsearch. Можно с его помощью построить графики, за которыми могут наблюдать дежурные инженеры по ночам.

Порог входа в ELK невысокий. Все основные возможности бесплатные. Что еще нужно для счастья.

Как собрали это все в единую систему


Сформировали индексы и оставили только нужную информацию. Мы загрузили все три журнала FortiWEB в ELK на выходе получились индексы. Это файлы со всеми собранными журналами за период, например, сутки. Если бы мы сразу их визуализировали, то увидели бы только динамику атак. За подробностями нужно проваливаться в каждую атаку и смотреть конкретные поля.



Мы поняли, что для начала нужно настроить разбор неструктурированной информации. Взяли длинные поля в виде строк, например, Message и URL, и распарсили их, чтобы получить больше информации для принятия решений.
Например, с помощью парсинга мы вынесли отдельно местоположение пользователя. Это помогло сразу выделить атаки из-за рубежа на сайты для российских пользователей. Блокируя все подключения из других стран, мы уменьшили количество атак в 2 раза и могли спокойно разбираться с атаками внутри России.

После парсинга стали искать, какую информацию хранить и визуализировать. Оставлять в журнале все было нецелесообразно: размер одного индекса был большим 7 Гб. ELK требовалось много времени для обработки файла. При этом не вся информация была полезной. Что-то дублировалось и занимало лишнее место нужно было оптимизировать.

Сначала мы просто просматривали индекс и удаляли ненужные события. Это оказалось еще неудобнее и дольше, чем работа с журналами на самом FortiWeb. Единственный плюс от ёлки на этом этапе мы смогли визуализировать большой промежуток времени на одном экране.

Мы не отчаивались, продолжали есть кактус изучать ELK и верили, что сумеем вытянуть необходимую информацию. После очистки индексов начали визуализировать что есть. Так мы пришли к большим дашбордам. Понатыкали виджетов наглядно и нарядно, настоящая ЁLKа!



Зафиксировали момент атаки. Теперь нужно было понять, как на графике выглядит начало атаки. Для ее обнаружения мы смотрели на ответы сервера пользователю (return codes). Нас интересовали ответы сервера с такими кодами (rc):

Код (rc)
Название
Описание
0
DROP
Запрос к серверу блокируется
200
Ok
Запрос успешно обработан
400
Bad Request
Ошибочный запрос
403
Forbidden
Отказ авторизации
500
Internal Server Error
Сервис недоступен


Если кто-то начинал атаковать сайт, менялось соотношение кодов:

  • Если ошибочных запросов с кодом 400 становилось больше, а нормальных запросов с кодом 200 оставалось столько же, значит кто-то пытался взломать сайт.
  • Если при этом запросы с кодом 0 тоже росли, значит политики FortiWeb тоже видели атаку и применяли к ней блокировки.
  • Если увеличивалось количество сообщений с кодом 500, значит сайт недоступен для этих IP-адресов тоже своего рода блокировка.

К третьему месяцу мы настроили дашборд для отслеживания такой активности.



Чтобы не мониторить все вручную, настроили интеграцию с Nagios, который с определенной периодичностью опрашивал ELK. Если фиксировал достижение пороговых значений по кодам, отправлял уведомление дежурным о подозрительной активности.

Совместили 4 графика в системе мониторинга. Теперь было важно увидеть на графиках момент, когда атака не блокируется и нужно вмешательство инженера. На 4 разных графиках наш глаз замыливался. Поэтому мы совместили графики и стали наблюдать за всем на одном экране.

На мониторинге мы следили, как меняются графики разных цветов. Всплеск красного цвета показывал, что атака началась, а оранжевый и синий графики демонстрировали реакцию FortiWeb:


Тут все хорошо: был всплеск красной активности, но FortiWeb справился и график атаки сошел на нет.

Также мы нарисовали для себя пример графика, который требует вмешательства:


Здесь мы видим, что FortiWeb повысил активность, но красный график атаки не снизился. Нужно поменять настройки WAF.

Расследовать ночные инциденты тоже стало проще. На графике сразу видно момент, когда пора прийти на защиту сайта.


Вот что иногда происходит ночью. Красный график началась атака. Синий активность FortiWeb. Атака заблокировалась не до конца, пришлось вмешаться.

Куда стремимся


Сейчас мы обучаем дежурных администраторов работать с ELK. Дежурные учатся оценивать ситуацию на дашборде и принимают решение: пора эскалировать на специалиста по FortiWeb, или политик на WAF хватит для автоматического отражения атаки. Так мы снижаем нагрузку на инженеров ИБ по ночам и делим роли в поддержке на уровне систем. Доступ к FortiWeb остается только у центра киберзащиты, и только они вносят изменения в настройки WAF при острой необходимости.

Также мы работаем над отчетностью для заказчиков. Планируем, что данные о динамике работы WAF будут доступны в личном кабинете клиента. ELK сделает ситуацию прозрачнее без необходимости обращаться к самому WAF.

Если заказчик захочет сам наблюдать за своей защитой в реальном времени, ELK тоже пригодится. Доступ к WAF мы отдать не можем, так как вмешательство заказчика в работу может повлиять на остальных. А вот поднять отдельный ELK и отдать поиграться можно.

Вот такие сценарии использования ёлки у нас накопились за последнее время. Делитесь своими идеями на этот счет и не забывайте правильно все настраивать, чтобы избежать утечек из баз данных.
Подробнее..

Мониторинг эксплойтов

08.04.2021 18:23:51 | Автор: admin

В прошлой статье мы собрали данные для того чтобы мониторить эксплуатацию уязвимости в Windows Server, который имеет роль контроллера домена. В этой статье мы попытаемся понять есть ли возможность мониторить остальные классы эксплойтов с помощью инструментов ELK, Zabbix или Prometheus.

Классификация эксплойтов с точки зрения мониторинга

Уязвимость - несовершенство программного обеспечения. В простом случае это причина почему приложение не может выполнить свою задачу и завершает работу аварийно.

Можно ли все уязвимости отследить? Попробуем собрать факты воедино. Факт 1 уязвимости с точки зрения их использования для достижения целей компрометации или вывода из строя информационной системы можно условно поделить на:

  1. Уязвимости приводящие к аварийному завершению работы приложения

  2. Уязвимости позволяющие запустить дополнительные команды в уязвимой системе

Стоит отметить, что для обеих групп мы можем мониторить только последствия или зависимые события.

Первая группа уязвимостей достаточно просто мониторится системами ELK, Zabbix и Prometheus. Недоступность пострадавшей системы тут же будет выведено на общий мониторинг.

Не так всё просто со вторым типом. Дело в том, что именно эти уязвимости имеют довольно большое количество вариаций. И как правило связаны с теми подсистемами, где важен порядок и скорость действий системы. В эти подсистемы очень сложно добавить механизмы снятия информации для мониторинга.

чтобы понять насколько много этих уязвимостей их список по популярности появления в программном обеспечении можно найти здесь.

Факт 2 - всего классов уязвимостей задокументировано - 699. Интересная цифра, но это тоже верхнеуровневая абстракция, на самом деле и у этих 699 уязвимостей тоже есть вариации.

Что особенного среди 699 групп уязвимостей? Особенность заключается в том, что большую часть этих уязвимостей в силу своей природы достаточно проблематично обнаружить. К примеру, группа CWE-120. Группа уязвимостей основной смысл которых заключается в том, что программист не верно рассчитал количество памяти, которое нужно было приложению для работы с определенным объектом. В результате у атакующего есть возможность влиять на заполнение памяти процесса. В ряде случаев это может привести к выполнению передаваемых злоумышленником команд.

Очень редко современные операционные системы предоставляют в своих логах события, которые позволяют обнаруживать такого рода уязвимости. Более того эти уязвимости если и есть в программном обеспечении их изучить или задетектировать можно только если перевести приложение в режим отладки, что не является для большинства приложений обычным режимом работы.

Получается память для мониторинга недоступна? На самом деле это не так. Любая система мониторинга и даже логи операционных систем могут включать информацию о том как потребляется память приложениями. Можно эти данные наблюдать в режиме реального времени с помощью различного софта типа VMMap. Софт может показывать количество страниц памяти, которое используется, освобождается, резервируется и т.д. Что же это нам дает?

Ни одна уязвимость, которая была описана выше не может быть полезной для атакующего без специального метода её использования. У всех 699 групп есть свои способы и подходы, которые позволяют уязвимость использовать для выполнения команд. Иногда это отдельные паттерны программирования, иногда это набор последовательности функций, а иногда это просто список уязвимостей, которые используются в совокупности чтобы выполнить результирующую операцию. Поэтому для детектирования факта эксплуатации нам нужно:

  • понимать как работает эксплойт, идеально если будет исходный код

  • иметь возможность расширить набор событий в системных логах ОС

Тактика обнаружения на основании списка Mitre

Уязвимости сами по себе могут быть не видны на средствах мониторинга. При использовании возможностей только операционной системы. Иногда сами вендоры, видя популярность уязвимости могут создать отдельные события, которые помогают локализовать эксплуатацию уязвимости.

В случае с уязвимостью CVE-2020-1472 Microsoft добавила специальные события, которые генерируются, если система использует уязвимые алгоритмы. Нам же придется искать закономерности самостоятельно и использовать эти закономерности для мониторинга. Возможно даже придется прибегнуть к сторонним программным продуктам.

Для примера сбора таких закономерностей будем использовать эксплойт к уязвимости CVE-2020-0796. Основная проблема уязвимости - переполнение целочисленного элемента, который используется для контроля размера выделяемой приложением памяти. В сети можно найти 2 сценария использования уязвимости:

  1. Remote Code Execution

  2. Local Priveledge Escalation

Первый весьма нестабилен, выполнения команд от него добиться достаточно сложно. Однако из-за того что уязвимость находится в ядре, падение ОС будет зарегистрировано мониторингом по-умолчанию.

Попробуем разобраться со второй. Проект на github предоставляет исходный код на языке программирования C++. Попробуем локализовать основные операциии, которые выполняет эксплойт.

Инициализация её код можно найти ниже:

Создается сокет на loopback интерфейсе и 445 порту. Далее эксплойт будет создавать в специальном формате пакет и отправит его на созданный сокет. Произойдет переполнение размера выделенной памяти в ядре ОС и код перетрёт память, которая относилась к primary токену основного процесса. С этими данными будет создан новый процесс cmd.exe. Как создается этот процесс? Эти данные можно найти в исходнике:

К сожалению, среди событий логов обнаружить ничего не удалось, так как все действия работали динамически в оперативной памяти и не задействовали стандартные механизмы получения доступа к объектам. Журнал "Security" тоже не будет содержать никакой информации. Что же делать? Есть 2 варианта:

  1. Использовать IDS для loopback интерфейса

  2. Использовать Endpoint защиту

Мы выберем 3-й путь. В ОС Windows есть возможность расширить список событий, которые будет генерировать система. Это возможно сделать с помощью инструмента SysMon. Чтобы инструмент мог генерировать нужные события, ему необходим конфиг, где и перечислены необходимые данные. Есть уже готовый. Причем если мы хотим поотслеживать события с loopback интерфейсом, нужно убрать следующие строки:

...<RuleGroup name="" groupRelation="or">        <NetworkConnect onmatch="exclude">            ...            <DestinationIp condition="is">127.0.0.1</DestinationIp> <==удалить            <DestinationIp condition="begin with">fe80:0:0:0</DestinationIp> <==удалить        </NetworkConnect>    </RuleGroup>...

Все данные по событиям выкладываются в системные журналы. Для Windows 10 можно использовать вот этот путь: Applications and Services Logs\Microsoft\Windows\Sysmon\Operational.

Для установки в систему конфига и Sysmon нужно набрать следующую команду:

sysmon.exe -accepteula -i sysmonconfig-export.xml

Запустим эксплойт и заглянем в лог Sysmon:

А вот и наш файл засветился, которым пользовались для эксплуатации. Теперь будет просто написать шаблон для этого события и мониторить их в Zabbix. Предлагаем читателю настроить самостоятельно. Это, кстати можно сделать по похожему сценарию из предыдущей статьи.

P.S. Для систем на базе Linux тоже скоро будет SysMon.

Подробнее..

Перевод ELK SIEM Open Distro Интеграция с WAZUH

24.08.2020 12:15:17 | Автор: admin

Продвигаемся дальше по нашему проекту. Мы завершили часть SIEM. Пришло время перевести наш проект из простого наблюдателя в активного ответчика. Одним из важных инструментов, которые мы использовали для этого, является Wazuh. В этой статье мы надеемся просветить вас о преимуществах, предлагаемых этим инструментом. А также расскажем как его установить и использовать.


Wazuh это механизм обнаружения, просмотра и сравнения соответствия безопасности с открытым исходным кодом.


Он был создан как форк OSSEC HIDS, позже был интегрирован с Elastic Stack и OpenSCAP, которые превратились в более комплексное решение.


Wazuh помогает вам получить более глубокую видимость безопасности в вашей инфраструктуре, отслеживая хосты на операционной системе и уровне приложений.


Оглавление всех постов.



Статья разделена на следующие разделы:


  • Установка сервера и агента Wazuh

  1. Установка Wazuh-сервера


  2. Установка Wazuh-агента


  3. Установка приложения и интеграция с kibana


  4. Настройка и подключение агентов



  • Активный ответ

1- Установка wazuh сервера и агента


Wazuh это бесплатный сервис для корпоративного использования для мониторинга безопасности с открытым исходным кодом, предназначенный для обнаружения угроз, мониторинга целостности, реагирования на инциденты и соблюдения нормативных требований. Вот некоторые определения, которые вам нужно знать.


Сервер Wazuh: запускает менеджер Wazuh, API и Filebeat. Он собирает и анализирует данные от развернутых агентов.


Агент Wazuh: запускается на отслеживаемом хосте, собирает системный журнал и данные конфигурации и обнаруживает вторжения и аномалии. Он общается с сервером Wazuh, на который пересылает собранные данные для дальнейшего анализа.


1.1- Введение в архитектуру сервера Wazuh:


Архитектура Wazuh основана на агентах, работающих на контролируемых хостах, которые пересылают данные журнала на центральный сервер. Кроме того, поддерживаются безагентные устройства (такие как межсетевые экраны, коммутаторы, маршрутизаторы, точки доступа и т. Д.), Которые могут активно отправлять данные журнала через системный журнал и / или периодически проверять изменения своей конфигурации для последующей пересылки данных на центральный сервер. Центральный сервер декодирует и анализирует входящую информацию и передает результаты в кластер Elasticsearch для индексации и хранения.


Мы будем использовать архитектуру с одним хостом (Single-host architecture (HIDS)), которая имеет следующие характеристики:



Подробнее о другой архитектуре. Посетите официальный сайт:


https://documentation.wazuh.com/3.8/getting-started/architecture.html


1.2- Установка Wazuh manager, API и Filebeat


Здесь мы предоставим вам официальный сайт wazuh для установки


https://documentation.wazuh.com/3.12/installation-guide/installing-wazuh-manager/linux/ubuntu/wazuh_server_packages_ubuntu.html#wazuh-server-packages-ubuntu


После установки нам нужно настроить файл конфигурации filebeat: вы можете подключить filebeat к выходу elasticsearch или выходу logstash. В нашем случае мы настроим вывод elasticsearch без проверки ssl (здесь мы видим, что включен только модуль предупреждений)


cd /etc/filebeatnano filebeat.yml


Теперь настроим шаблон индекса и запустим 3 службы:


filebeat setup  index-managementservice filebeat startservice wazuh-manager startservice wazuh-api start



1.3- Установка wazuh-агента


Используйте эту ссылку для установки


https://documentation.wazuh.com/3.12/installation-guide/installing-wazuh-agent/linux/ubuntu12.04-or-greater/wazuh_agent_package_ubuntu12.04_or_greater.html#wazuh-agent-package-ubuntu12-04-or большая


Проверьте, правильно ли работает wazuh-agent:



1.4- Установка приложения wazuh и интеграция с Kibana:


Это приложение станет мостом между сервером Wazuh и Kibana в ELK, который мы ранее установили.


Это приложение предоставляется только в репозитории Git Hub, а не на официальном веб-сайте.


Мы будем устанавливать приложение wazuh, совместимое с ELK Stack 7.6.1. Для этого.


cd /usr/share/kibanasudo -u kibana bin/kibana-plugin install https://packages.wazuh.com/wazuhapp/wazuhapp-3.12.2_7.6.1.zip

Рекомендуется увеличить размер Kibana, чтобы обеспечить установку плагина:


cat >> /etc/default/kibana << EOFNODE_OPTIONS=" max_old_space_size=2048"EOF

Перезапустите Кибану:


systemctl restart kibana

Вы можете проверить все доступные версии на этом сайте:


https://github.com/wazuh/wazuh-kibana-app


Теперь в вашей кибане вы должны увидеть, что символ wazuh появился на левой вкладке вашей кибаны. Нажмите на него. Откроется wazuh api. Изучите его. У вас должно получиться нечто подобное. На данный момент у вас не будет никаких агентов, связанных с ним. В следующей части мы обсудим, как подключить ваших агентов.



15 Подключение и настройка агентов


Есть много способов зарегистрировать агента. В этой статье мы воспользуемся ручным способом.


В интерфейсе командной строки хоста Wazuh manager мы запустим manage_agents, чтобы добавить агента. В этом примере мы собираемся добавить нового агента. Для этого типа запустите следующую команду:


/var/ossec/bin/manage_agents


Выберите агента добавления, набрав A и нажав клавишу ВВОД. Затем мы вводим имя, которое хотим дать нашей машине, в данном случае user1. Мы вводим IP-адрес конечного устройства. Обратите внимание: если у вас нет статического IP-адреса для конечного устройства, вы можете использовать ключевое слово (любой) вместо IP-адреса. После этого нажмите Enter


Теперь мы собираемся извлечь секретный ключ, который позволит нашему агенту подключиться к серверу wazuh.


Для этого на этот раз мы выберем опцию E извлекать ключ для агента. Затем мы вводим идентификатор нашего агента, и в этом случае я выбрал агента с идентификатором 001.



После того, как вы добавили агент на хост менеджера Wazuh, откройте сеанс на хосте агента Linux как пользователь root. После этого импортируем ключ и подключим агента к менеджеру.


Введите следующее


/var/ossec/bin/manage_agents -i "Ваш_секретный_ключ"

Вы должны получить такой результат, наберите "y" и нажмите Enter.



Измените конфигурацию агента Wazuh в /var/ossec/etc/ossec.conf, чтобы добавить IP-адрес сервера Wazuh. В разделе <клиент> <сервер> измените значение MANAGER_IP на адрес сервера Wazuh. Адрес Wazuh-сервера может быть IP-адресом или DNS-именем:



1.6- Проверка полученных данных:


Чтобы проверить, получает ли ELK данные от сервера wazuh. Перейдите в Управление индексами. Вы должны получить что-то похожее на это (wazuh-alert и wazuh-monitoring)



  1. Активный ответ Wazuh:



Wazuh предоставляет модуль активного ответа для обработки автоматического ответа на определенные предупреждения, которые вы настраиваете в Wazuh-manager.


Активный ответ это сценарий, который настроен на выполнение при срабатывании определенного предупреждения, уровня предупреждения или группы правил. Активные ответы это ответы с сохранением состояния или без состояния. Ответы с сохранением состояния настроены для отмены действия по истечении заданного периода времени, в то время как ответы без состояния настроены как одноразовые действия.


Например, если мы хотим автоматически блокировать определенные IP-адреса на основе определенных журналов, поступающих с вашего конечного устройства, показывая, что они выполняют атаку Bruteforce, независимо от того, является ли это RDP или SSH, в зависимости от ОС хоста.


Мы можем создать активный ответ, который будет блокировать IP-адрес злоумышленника, если он соответствует поведению с набором правил, хранящимся в Wazuh. Мы возьмем пример SSH-Bruteforce. Мы будем рассматривать 8 неудачных попыток входа в систему как попытку атаки. Когда это событие происходит, действует правило "5712 SSHD брутфорс пытается получить доступ к системе". Будет запущено. Таким образом, команда блокировки IP выполняется.


Во-первых, нам нужно определить команду, которую мы будем использовать для ответа.


OSSEC поставляется с набором общих скриптов, используемых в активном ответе. Эти сценарии находятся в / var / ossec / active-response / bin / на сервере. Мы собираемся использовать сценарий firewall-drop.sh, который должен работать с распространенными операционными системами Linux / Unix и позволяет блокировать вредоносный IP-адрес с помощью локального брандмауэра.


Определите команду в ossec.conf вашего OSSEC Manager:


nano /var/ossec/etc/ossec.conf

Мы собираемся использовать скрипт firewall-drop.sh, который должен работать с распространенными операционными системами Linux / Unix и позволяет блокировать вредоносный IP-адрес с помощью локального брандмауэра.



Затем в том же файле мы настраиваем OSSEC для запуска активного ответа. Основные поля:


-command: ранее определенная команда (firewall-drop).


-location: Где команда должна быть выполнена. Мы хотим выполнить его для агента, сообщившего о событии. Итак, мы используем local.


-rules_id: команда выполняется, если срабатывает правило 5712.


-timeout: заблокировать IP на 60 секунд на брандмауэре (iptables, ipfilter и т. д.)



Затем сохраните модификацию и закройте файл. Перезапустите wazuh-manager командой:


service wazuh-manager restart

Теперь на ваших хостах wazuh-agent не забудьте изменить файл ossec.conf и добавить:


<active-response><disabled>no</disabled></active-response>

Теперь вы можете попытаться подобрать SSH на вашем хост-компьютере, на котором установлен агент Wazuh, и вы будете заблокированы на 60 секунд после 8 неудачных попыток входа.


Чтобы узнать больше об активном респоне Wazuh, вы можете проверить:


https://documentation.wazuh.com/3.7/user-manual/capabilities/active-response/how-it-works.html


Телеграм чат по Elasticsearch: https://t.me/elasticsearch_ru

Подробнее..

Изучаем ELK. Часть I Установка Elasticsearch

23.01.2021 14:23:52 | Автор: admin

Вступительное слово

Эта статья является первой в серии статей по стеку Elasticsearch, Logstash, Kibana (ELK). Цикл статей ориентирован на тех, кто только начинает знакомится со стеком ELK, и содержит минимально необходимый набор знаний, чтобы успешно запустить свой первый кластер ELK.

В рамках данного цикла будут рассмотрены такие темы, как:

  • установка и настройка компонентов ELK,

  • безопасность кластера, репликация данных и шардирование,

  • конфигурирование Logstash и Beat для сборки и отправки данных в Elasticsearch,

  • визуализация в Kibana

  • запуск стека в Docker.

В данной статье будет рассмотрена процедура установки Elasticsearch и конфигурирование кластера.

План действий:

  1. Скачиваем и устанавливаем Elasticsearch.

  2. Настраиваем кластер.

  3. Запускаем и проверяем работоспособность кластера.

  4. Делаем важные настройки.

Скачиваем и устанавливаем Elasticsearch

Существует множество вариантов установки Elasticsearch, под любые нужды и желания. С перечнем можно ознакомится на официальном сайте. Не смотря на то, что на своем стенде я использовал установку из Deb пакетов, считаю правильным так же описать установку из RPM пакетов и архива tar.gz для Linux системы.

Установка из Deb пакетов

  • Импортируем Elasticsearch PGP ключ:

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
  • Устанавливаем apt-transport-https пакет:

sudo apt-get install apt-transport-https
  • Перед установкой пакета необходимо добавить репозиторий Elastic:

echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list
  • Устанавливаем Elasticsearch из пакета:

sudo apt-get update && sudo apt-get install elasticsearch
  • Настраиваем Elasticsearch для автоматического запуска при запуске системы:

sudo /bin/systemctl daemon-reload && sudo /bin/systemctl enable elasticsearch.service

Установка из RPM пакетов

  • Импортируем Elasticsearch PGP ключ:

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
  • В директории /etc/yum.repos.d/ создаем файл репозитория Elasticsearch elasticsearch.repo:

[elasticsearch]name=Elasticsearch repository for 7.x packagesbaseurl=https://artifacts.elastic.co/packages/7.x/yumgpgcheck=1gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearchenabled=0autorefresh=1type=rpm-md
  • Устанавливаем Elasticsearch c помощью пакетного менеджера в зависимости от операционной системы, yum или dnf для CentOS, Red Hat, Fedora или zypper для OpenSUSE:

# Yumsudo yum install --enablerepo=elasticsearch elasticsearch # Dnfsudo dnf install --enablerepo=elasticsearch elasticsearch # Zyppersudo zypper modifyrepo --enable elasticsearch && \  sudo zypper install elasticsearch; \  sudo zypper modifyrepo --disable elasticsearch

Установка из архива tar.gz

  • Скачиваем архив с Elasticsearch

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.1-linux-x86_64.tar.gz
  • Извлекаем данные из архива и переходим в каталог с Elasticsearch:

tar -xzf elasticsearch-7.10.1-linux-x86_64.tar.gzcd elasticsearch-7.10.1/

Текущий каталог считается, как $ES_HOME.

Конфигурационные файлы лежать в каталоге $ES_HOME/config/.

На этом шаге установка из архива считается завершенной.

Для запуска Elasticsearch можно создать отдельного пользователя, предоставив все необходимые права к каталогу с Elasticsearch и указывая данного пользователя при настройке файловых дескрипторов.

Настраиваем кластер

На текущем этапе у нас есть три хоста с установленным Elasticsearch. Следующим и самым важным этапом является настройка каждого узла для объединения их в единый кластер.

Для настройки Elasticsearch используется YAML файл, который лежит по следующему пути /etc/elasticsearch/elasticsearch.yml при установке из Deb или RPM пакетов или $ES_HOME/config/elasticsearch.yml - при установке из архива.

Ниже будут приведены пример конфигурации для узла es-node01. Аналогичные настройки необходимо сделать на каждом узле, указав соответствующие имя и адрес.

  • Указываем имя узла и определяем роли. Укажем для узла роли master и data:

# ------------------------------------ Node ------------------------------------node.name: es-node01        # Имя нодыnode.roles: [ master, data ]  # Роли узла

master Данная роль дает возможность узлу быть избранным, как управляющий узел кластера

data узел с данной ролью содержит данные и выполняет операции с этими данными

Со списком всех ролей узла можно ознакомится тут.

  • Настраиваем адрес и порт, на которых узел будет принимать запросы:

# ---------------------------------- Network -----------------------------------network.host: 10.0.3.11# Адрес узлаhttp.port: 9200# Порт

Если указать 0.0.0.0 или просто 0, то Elasticsearch будет принимать запросы на всех интерфейсах.

  • Определяем имя кластера и начальный список узлов в голосовании по выбору master узла:

# ---------------------------------- Cluster -----------------------------------cluster.name: es_cluster                                             # Имя кластераcluster.initial_master_nodes: ["es-node01","es-node02","es-node03"]  # Начальный набор мастер узлов

cluster.initial_master_nodes

При начальной загрузке кластера определяются узлы, участвующие в голосовании по выбору мастера, поэтому в новом кластере мы должны указать перечень этих узлов. Если данный параметр оставить пустым (по умолчанию), то узел будет ожидать подключения к уже созданному кластеру.

Данный параметр следует удалить после создания кластера и не использовать при добавлении новых узлов в существующий кластер.

  • Указываем список master узлов кластера:

# --------------------------------- Discovery ----------------------------------discovery.seed_hosts: ["10.0.3.11", "10.0.3.12", "10.0.3.13"] # Узлы кластера

До версии 7.0 в конфигурации Elasticsearch также использовался параметр discovery.zen.minimum_master_nodes, который можно увидеть и сейчас в некоторых конфигурациях. Этот параметр был необходим, чтобы защитится от Split Brain, и определял минимальное количество master узлов для голосования. Начиная с версии 7.0 данный параметр игнорируется, так как кластер автоматически защищает себя. Более подробно о том, как это работает, можно прочитать в данной статье.

  • Определяем, где будем хранить данные и логи

# ----------------------------------- Paths ------------------------------------path.data: /var/lib/elasticsearch # Директория с даннымиpath.logs: /var/log/elasticsearch # Директория с логами

На выходе мы должны получить следующий конфигурационный файл:

# ------------------------------------ Node ------------------------------------node.name: es-node01        # Имя нодыnode.roles: [ master, data ]  # Роли узла## ---------------------------------- Network -----------------------------------network.host: 10.0.3.11# Адрес узлаhttp.port: 9200# Порт## ---------------------------------- Cluster -----------------------------------cluster.name: es_cluster                                             # Имя кластераcluster.initial_master_nodes: ["es-node01","es-node02","es-node03"]  # Начальный набор мастер узлов## --------------------------------- Discovery ----------------------------------discovery.seed_hosts: ["10.0.3.11", "10.0.3.12", "10.0.3.13"] # Узлы кластера## ----------------------------------- Paths ------------------------------------path.data: /var/lib/elasticsearch # Директория с даннымиpath.logs: /var/log/elasticsearch # Директория с логами
  • При необходимости настраиваем межсетевой экран на хосте:

    • 9200 - прием HTTP запросов (согласно конфигурации выше http.port). По умолчанию Elasticsearch использует диапазон 9200-9300 и выбирает первый свободный.

    • 9300-9400 - порты (по умолчанию) для коммуникации между узлами кластера. Elasticsearch будет использовать первый свободный из данного диапазона (для настройки данного порта в Elasticsearch можно использовать параметр transport.port).

Запускаем и проверяем

Запускаем на каждом узле службу elasticsearch:

sudo systemctl start elasticsearch.service

Для установки из архива используем:

$ES_HOME/bin/elasticsearch

или если мы хотим запустить Elasticsearch как демон, то:

$ES_HOME/bin/elasticsearch -d -p pid

Для выключения службы используйте Ctrl-C для первого варианта запуска (из архива) или pkill -F pid для второго варианта.

После запуска первого узла в логах можно увидеть, что узел ожидает подключение других узлов, чтобы определить, кто возьмет роль master:

[es-node01] master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and this node must discover master-eligible nodes [es-node01, es-node02, es-node03] to bootstrap a cluster: have discovered [{es-node01}{olhmN6eCSuGxF4yH0Q-cgA}{CHniuFCYS-u67R5mfysg8w}{10.0.3.11}{10.0.3.11:9300}{dm}{xpack.installed=true, transform.node=false}]; discovery will continue using [10.0.3.12:9300, 10.0.3.13:9300] from hosts providers and [{es-node01}{olhmN6eCSuGxF4yH0Q-cgA}{CHniuFCYS-u67R5mfysg8w}{10.0.3.11}{10.0.3.11:9300}{dm}{xpack.installed=true, transform.node=false}] from last-known cluster state; node term 0, last-accepted version 0 in term 0

После включения остальных узлов в логах появится запись о том, что кластер собрался:

[es-node01] master node changed {previous [], current [{es-node02}{VIGgr6_aS-C39yrnmoZQKw}{pye7sBQUTz6EFh7Pqn7CJA}{10.0.3.12}{10.0.3.12:9300}{dm}{xpack.installed=true, transform.node=false}]}, added {{es-node02}{VIGgr6_aS-C39yrnmoZQKw}{pye7sBQUTz6EFh7Pqn7CJA}{10.0.3.12}{10.0.3.12:9300}{dm}{xpack.installed=true, transform.node=false}}, term: 1, version: 1, reason: ApplyCommitRequest{term=1, version=1, sourceNode={es-node02}{VIGgr6_aS-C39yrnmoZQKw}{pye7sBQUTz6EFh7Pqn7CJA}{10.0.3.12}{10.0.3.12:9300}{dm}{xpack.installed=true, transform.node=false}}

Проверяем состояние кластера, обратившись к любому его узлу:

curl -X GET "http://10.0.3.11:9200/_cluster/health?pretty"{  "cluster_name" : "es_cluster",  "status" : "green",  "timed_out" : false,  "number_of_nodes" : 3,  "number_of_data_nodes" : 3,  "active_primary_shards" : 0,  "active_shards" : 0,  "relocating_shards" : 0,  "initializing_shards" : 0,  "unassigned_shards" : 0,  "delayed_unassigned_shards" : 0,  "number_of_pending_tasks" : 0,  "number_of_in_flight_fetch" : 0,  "task_max_waiting_in_queue_millis" : 0,  "active_shards_percent_as_number" : 100.0}

Узнаем, какой узел взял на себя роль master. В нашем примере это узел es-node02:

curl -X GET "http://10.0.3.11:9200/_cat/master?pretty"VIGgr6_aS-C39yrnmoZQKw 10.0.3.12 10.0.3.12 es-node02

Делаем важные настройки

Для нормальной работы кластера необходимо произвести еще некоторые настройки.

Heap size

Так как Elasticsearch написан на Java, то для работы необходимо настроить размер кучи (heap size). В каталоге установки Elasticsearch имеется файл jvm.options, в котором уже указаны размеры, по умолчанию - это 1 Гб. Однако, для настройки рекомендуется указать новые параметры в файле каталога jvm.options.d, который находится там же.

-Xms16g-Xmx16g

Пример выше использует минимальный Xms и максимальный Xmx размер heap size, равный 16 Гб. Для настройки данных параметров следует использовать следующие рекомендации:

  • установите значения Xmx и Xms не более 50% от имеющейся физической памяти узла. Оставшуюся память Elasticsearch будет использовать для других целей. Чем больше heap size, тем меньше памяти используется под системные кеш;

  • устанавите значение не более того значения, которое использует JVM для сжатия указателей, он же compressed object pointers. Данное значение составляет около 32 Гб. Стоит также отметить, что рекомендуется ограничивать heap size еще одним параметром JVM, а именно zero-based compressed oops (обычно размер около 26 Гб). Узнать подробнее об этих параметрах можно тут.

Отключаем подкачку

Подкачка негативно сказывается на производительности и стабильности работы Elasticsearch, ведь система может выгрузить страницы JVM на диск. Есть несколько вариантов работы с подкачкой:

  1. Полное отключение подкачки. Перезапуск Elasticseach при этом не требуется.

    sudo swapoff -a
    
  2. Ограничение подкачки через значение vm.swappiness=1 для sysctl.

  3. Использование mlockall для блокировки адресного пространства в оперативной памяти.

Чтобы включить mlockall в конфигурации Elasticseach elasticsearch.yml указываем для параметра bootstrap.memory_lock значение true.

bootstrap.memory_lock: true

Перезапускаем Elasticsearch и проверяем настройки через запрос к любому узлу:

curl -X GET "http://10.0.3.12:9200/_nodes?filter_path=**.mlockall&pretty"{  "nodes" : {    "olhmN6eCSuGxF4yH0Q-cgA" : {      "process" : {        "mlockall" : true      }    },    "VIGgr6_aS-C39yrnmoZQKw" : {      "process" : {        "mlockall" : true      }    },    "hyfhcEtyQMK3kKmvYQdtZg" : {      "process" : {        "mlockall" : true      }    }  }}

Если перезапуск Elasticsearch завершился ошибкой вида:

[1] bootstrap checks failed[1]: memory locking requested for elasticsearch process but memory is not locked

или проверка показывает, что данная настройка не применилась, необходимо сделать следующее в зависимости от способа установки:

  • Установка из архива

Установитеulimit -l unlimitedперед запуском Elasticsearch или значениюmemlockустанвоитеunlimitedв файле/etc/security/limits.conf.

  • Установка из пакета RPM или Deb

Установите значение параметраMAX_LOCKED_MEMORYкакunlimitedв /etc/sysconfig/elasticsearch для rpm или /etc/default/elasticsearch для dep.

Если вы используете systemd для запуска Elasticsearch, то лимиты должны быть указаны через настройку параметра LimitMEMLOCK. Для этого выполните команду:

sudo systemctl edit elasticsearch

и укажите следующее значение:

[Service]LimitMEMLOCK=infinity

Настройка файловых дескрипторов

Elasticsearch работает с большим количеством файловых дескрипторов, и их нехватка может катастрофически сказаться на его работе. Согласно официальной документации необходимо указать значение файловых дескрипторов не ниже 65 536.

  • Если Elasticsearch установлен из RPM или Deb пакетов, то настройка не требуется.

  • Для установки из архива необходимо в файле /etc/security/limits.conf установить параметр nofile для пользователя, который осуществляет запуск Elasticsearch. В примере ниже таким пользователем является elasticsearch:

elasticsearch - nofile 65536

Проверить установленные значения можно следующим образом:

curl -X GET "http://10.0.3.11:9200/_nodes/stats/process?filter_path=**.max_file_descriptors&pretty"{  "nodes" : {    "olhmN6eCSuGxF4yH0Q-cgA" : {      "process" : {        "max_file_descriptors" : 65535      }    },    "VIGgr6_aS-C39yrnmoZQKw" : {      "process" : {        "max_file_descriptors" : 65535      }    },    "hyfhcEtyQMK3kKmvYQdtZg" : {      "process" : {        "max_file_descriptors" : 65535      }    }  }}

Настройка виртуальной памяти

Elasticsearch по умолчанию использует каталог mmapfs для хранения индексов, и ограничение операционной системы по умолчанию для счетчиков mmap может привести к нехватки памяти. Для настройки запустите из-под root следующую команду:

sysctl -w vm.max_map_count=262144

Чтобы настройка сохранилась после перезапуска системы, необходимо указать параметр vm.max_map_count в файле /etc/sysctl.conf.

Если Elasticsearch установлен из RPM или Deb пакетов, то настройка не требуется.

Настройка потоков

Необходимо убедиться, что количество потоков, которые Elasticsearch может создавать, было не менее 4096.

Это можно сделать, установив ulimit -u 4096 или установив значение nproc равным 4096 в файле /etc/security/limits.conf.

Если Elasticsearch работает под управлением systemd, то настройка не требуется.

DNS кеширование

По умолчанию Elasticsearch кеширует результат DNS на 60 и 10 секунд для положительных и отрицательных запросов. В случае необходимости можно настроить эти параметры, а именно es.networkaddress.cache.ttl и es.networkaddress.cache.negative.ttl, как JVM опции в каталоге /etc/elasticsearch/jvm.options.d/ для RPM и Deb или в $ES_HOME/config/jvm.options.d/ для установки из архива.

Временный каталог для JNA

Elasticsearch использует Java Native Access (JNA) для запуска кода, необходимого в его работе, и извлекает этот код в свой временный каталог директории /tmp. Следовательно, если каталог смонтирован с опцией noexec, то данный код невозможно будет выполнить.

В данном случае необходимо или перемонтировать каталог /tmp без опции noexec, или изменить настройку JVM, указав другой путь через опцию -Djna.tmpdir=<new_path>.

На этом шаге дополнительные настройки Elasticsearch окончены.

Заключение

Дойдя до этого места, вы должны иметь работоспособный кластер Elasticsearch.

В следующей статье будут описаны процедуры установки и настройки Kibana и Logstash. А в качестве проверки работоспособности кластера соберём данные из файла и посмотрим на них с помощью Kibana.

Полезные ссылки:

Подробнее..

Изучаем ELK. Часть III Безопасность

03.03.2021 20:06:54 | Автор: admin

Вступительное слово

В первой и второй частях данной серии статей была описана процедура установки и настройки кластера Elasticsearch, Kibana и Logstash, но не был освящен вопрос безопасности.

В этой статье я расскажу, как настроить шифрование трафика между узлами кластера Elasticsearch и его клиентам, шифрование трафика между Kibana и клиентами, покажу, как создавать роли и пользователей, а так же выдавать API ключи для доступа к кластеру Elasticsearch.

План действий

  1. "Включаем" безопасность.

  2. Настраиваем шифрование между узлами Elasticsearch.

  3. Настраиваем аутентификацию.

  4. Настраиваем шифрование клиентского трафика Elasticsearch.

  5. Подключаем Kibana к Elasticsearch.

  6. Настраиваем шифрование трафика между Kibana и клиентами.

  7. Создаем пользователей и роли.

  8. Настраиваем пользовательские сессии в Kibana.

  9. Настраиваем Logstash.

  10. Создаем API ключи.

"Включаем" безопасность

Чтобы использовать функции безопасности в Elasticsearch, их необходимо активировать. Для этого на каждом узле в файле конфигурации указывается следующая настройка:

xpack.security.enabled: true

Настраиваем шифрование между узлами Elasticsearch

Следующим шагом необходимо настроить шифрование трафика между узлами Elasticsearch. Для этого выполняем несколько шагов:

  • Создаем CA (Certificate Authority) для кластера Elasticsearch:

./bin/elasticsearch-certutil ca

Для установки из пакетов Deb и RPM исполняемые файлы Elasticsearch лежат в каталоге /usr/share/elasticsearch/bin/. Для установки из архива - в каталоге $ES_HOME/bin/.

В примерах все команды выполнены из каталога /usr/share/elasticsearch/.

Во время генерации корневого сертификата можно задать имя PKCS#12 файла, по умолчанию это elastic-stack-ca.p12 и пароль к нему.

Для получения сертификата и ключа в PEM формате укажите ключ --pem. На выходе будет ZIP архив с .crt и .key файлами.

С помощью ключа --out можно указать каталог для создаваемого файла.

Полученный корневой сертификат (для PEM формата еще и ключ) надо переместить на все узлы кластера для генерации сертификата узла.

  • Генерируем на каждом узле кластера сертификат и ключ:

./bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12 --ip 10.0.3.11 --dns es-node01

Ключ --ca указывает путь к корневому сертификату CA в формате PKCS#12. Если сертификат и ключ были получены в PEM формате, то необходимо использовать ключи --ca-cert и --ca-key соответственно.

Ключи --dns и --ip добавляют проверку по имени узла и IP адресу и являются опциональными. Если вы указываете их, то укажите эти параметры для каждого узла.

Ключ --pass позволяет установить passphrase для ключа.

  • Включаем TLS в файле конфигурации Elasticsearch:

xpack.security.transport.ssl.enabled: truexpack.security.transport.ssl.verification_mode: fullxpack.security.transport.ssl.keystore.path: es-node01-cert.p12xpack.security.transport.ssl.truststore.path: elastic-stack-ca.p12

Где:

xpack.security.transport.ssl.enabled - включаем TLS/SSL

xpack.security.transport.ssl.verification_mode - режим проверки сертификатов. none - проверка не выполняется, certificate - выполняется проверка сертификата без проверки имени узла и IP адреса, full - проверка сертификата, а также имени узла и адреса указанных в сертификате.

xpack.security.transport.ssl.keystore.path - путь к файлу с сертификатом и ключем узла.

xpack.security.transport.ssl.truststore.path - путь к доверенному сертификату (CA).

Если сертификаты CA и/или узла в формате PEM, то необходимо изменить последние два параметра на следующие:

xpack.security.transport.ssl.key - путь к закрытому ключу узла

xpack.security.transport.ssl.certificate - путь к сертификату узла

xpack.security.transport.ssl.certificate_authorities - список путей к сертифкатам CA.

  • Cоздаем keystore и добавляем пароли от сертификатов(если они были заданы):

./bin/elasticsearch-keystore create -p

Ключ -p требуется для установки пароля keystone во время создания

Для PKCS#12 формата:

./bin/elasticsearch-keystore add xpack.security.transport.ssl.keystore.secure_password./bin/elasticsearch-keystore add xpack.security.transport.ssl.truststore.secure_password

Для PEM сертификата:

./bin/elasticsearch-keystore add xpack.security.transport.ssl.securekeypassphrase

Чтобы запустить Elasticsearch с keystore, на который установлен пароль, необходимо передать этот пароль Elasticsearch. Это делается с помощью файла на который будет ссылаться переменная ES_KEYSTORE_PASSPHRASE_FILE. После запуска файл можно удалить, но при каждом последующим запуске файл необходимо создавать.

echo "password" > /etc/elasticsearch/ks_secret.tmpchmod 600 /etc/elasticsearch/ks_secret.tmpsudo systemctl set-environment ES_KEYSTORE_PASSPHRASE_FILE=/etc/elasticsearch/ks_secret.tmp
  • Перезапускаем Elasticsearch

В логах Elasticsearch должны появится записи о создании кластера:

[INFO ][o.e.c.s.ClusterApplierService] [es-node01] master node changed {previous [], current [{es-node02}{L1KdSSwCT9uBFhq0QlxpGw}{ujCcXRmOSn-EbqioSeDNXA}{10.0.3.12}{10.0.3.12:9300}...

Если обратиться к API, то будет ошибка "missing authentication credentials for REST request". С момента включения функций безопасности для обращения к кластер необходимо пройти аутентификацию.

Настраиваем аутентификацию

Elasticsearch имеет несколько встроенных пользователей:

Пользователь

Описание

elastic

Суперпользователь

kibana_system

Используется для коммуникации между Kibana и Elasticsearch

logstash_system

Пользователь, которого использует Logstash сервер, когда сохраняет информацию в Elasticsearch

beats_system

Пользователь, которого использует агент Beats, когда сохраняет информацию в Elasticsearch

apm_system

Пользователь, которого использует APM сервер, когда сохраняет информацию в Elasticsearch

remote_monitoring_user

Пользователь Metricbeat, который используется при сборе и хранении информации мониторинга в Elasticsearch

Прежде чем воспользоваться перечисленными пользователями, необходимо установить для них пароль. Для этого используется утилита elasticsearch-setup-passwords. В режиме interactive для каждого пользователя необходимо ввести пароли самостоятельно, в режиме auto Elasticsearch создаст пароли автоматически:

./bin/elasticsearch-setup-passwords autoInitiating the setup of passwords for reserved users elastic,apm_system,kibana,kibana_system,logstash_system,beats_system,remote_monitoring_user.The passwords will be randomly generated and printed to the console.Please confirm that you would like to continue [y/N]yChanged password for user apm_systemPASSWORD apm_system = NtvuRYhwbKpIEVUmHsZBChanged password for user kibana_systemPASSWORD kibana_system = ycXvzXglaLnrFMdAFsvyChanged password for user kibanaPASSWORD kibana = ycXvzXglaLnrFMdAFsvyChanged password for user logstash_systemPASSWORD logstash_system = vU3CuRbjBpax1RrsCCLFChanged password for user beats_systemPASSWORD beats_system = c9GQ85qhNL59H2AXUvcAChanged password for user remote_monitoring_userPASSWORD remote_monitoring_user = wB320seihljmGsjc29W5Changed password for user elasticPASSWORD elastic = iOrMTBbfHOAkm5CPeOj7

Второй раз elasticsearch-setup-passwords запустить не получится, так как bootstrap password изменился. Чтобы изменить пароль пользователям можно воспользоваться API.

Попробуем сделатьAPI запрос к любому узлу Elasticsearch с использованием учетной записи elastic и пароля к от неё:

curl -u 'elastic' -X GET "http://10.0.3.1:9200/_cluster/health?pretty"Enter host password for user 'elastic':{  "cluster_name" : "es_cluster",  "status" : "green",  "timed_out" : false,  "number_of_nodes" : 4,  "number_of_data_nodes" : 3,  "active_primary_shards" : 9,  "active_shards" : 18,  "relocating_shards" : 0,  "initializing_shards" : 0,  "unassigned_shards" : 0,  "delayed_unassigned_shards" : 0,  "number_of_pending_tasks" : 0,  "number_of_in_flight_fetch" : 0,  "task_max_waiting_in_queue_millis" : 0,  "active_shards_percent_as_number" : 100.0}

Как видно из примера все работает, но мы обращались через http, значит трафик между клиентом и кластером Elasticsearch не шифрованный.

Настраиваем шифрование клиентского трафика Elasticsearch

Для шифрования клиентского трафика необходимо выпустить сертификат, который будет использоваться для шифрования трафика между узлом Elasticsearch и клиентом. При этом для Kibana будет сформирован отдельный сертификат, чтобы настроить соединение с Elasticsearch. Для этого делаем следующее:

  • Генерируем сертификат:

./bin/elasticsearch-certutil http

В процессе генерации необходимо определить:

1) Необходимо ли сгенерировать Certificate Signing Request (CSR). Потребуется, если сертификат будет выпускаться сторонним CA (Certificate Authority).

2) Использовать ли собственный CA (Certificate Authority). Если да, то указываем путь до ключа, которым будет подписаны будущие сертификаты.

3) Срок действия сертификата. По умолчанию 5 лет. Можно определить дни(D), месяцы (M), года (Y).

4) Как выпустить сертификат, на каждый узел или общий. Если все узлы имеют общий домен, то можно выпустить wildcard сертификат (например *.domain.local). Если общего домена нет и в будущем возможно добавление узлов, то необходимо генерировать на каждый узел, указав имя узла и его адрес. В будущем можно выпустить отдельный сертификат для нового узла.

На выходе получаем архив сертификатами, в моём случае со всеми сертификатам для каждого узла Elasticsearch, а так же сертификат для подключения Kibana к Elasticsearch:

. elasticsearch    es-nlb01       README.txt       http.p12       sample-elasticsearch.yml    es-node01       README.txt       http.p12       sample-elasticsearch.yml    es-node02       README.txt       http.p12       sample-elasticsearch.yml    es-node03        README.txt        http.p12        sample-elasticsearch.yml kibana     README.txt     elasticsearch-ca.pem     sample-kibana.yml

В архиве также лежит инструкция по дальнейшим действиям с сертификатом (README.txt) и пример конфигурационного файла (sample-...yml).

Сертификат elasticsearch-ca.pem из директории kibana потребуется в следующем шаге.

  • Размещаем сертификаты на узлах и добавляем необходимые настройки в файле конфигурации:

    xpack.security.http.ssl.enabled: truexpack.security.http.ssl.keystore.path: "http.p12"
    
  • Добавляем пароль от сгенерированного сертификата в keystore:

./bin/elasticsearch-keystore add xpack.security.http.ssl.keystore.secure_password
  • Проверяем, что все работает по https:

curl -u 'elastic' -k -X GET "https://10.0.3.1:9200/_cluster/health?pretty"Enter host password for user 'elastic':{  "cluster_name" : "es_cluster",  "status" : "green",  "timed_out" : false,  "number_of_nodes" : 4,  "number_of_data_nodes" : 3,  "active_primary_shards" : 9,  "active_shards" : 18,  "relocating_shards" : 0,  "initializing_shards" : 0,  "unassigned_shards" : 0,  "delayed_unassigned_shards" : 0,  "number_of_pending_tasks" : 0,  "number_of_in_flight_fetch" : 0,  "task_max_waiting_in_queue_millis" : 0,  "active_shards_percent_as_number" : 100.0}

Подключаем Kibana к Elasticsearch

Если сейчас обратиться к Kibana, то ответом будет "Kibana server is not ready yet".

Это связанно с тем, что сейчас Kibana не может получить доступ к Elasticsearch по http, для этого необходим сертификат Kibana, который был получен в предыдущем шаге, а также требуется пройти аутентификацию. Чтобы настроить работу Kibana:

  • В конфигурационном файле Kibana указываем ключ к сертификату elasticsearch-ca.pem(из предыдущего шага) и меняем протокол http на https:

elasticsearch.hosts: ['https://10.0.3.1:9200']elasticsearch.ssl.certificateAuthorities: [ "/etc/kibana/elasticsearch-ca.pem" ]

В примере выше я использую адрес Coordinating only узла для балансировки нагрузки между Kibana и Elasticsearch, который был настроен в предыдущей части.

  • Создаем keystore для хранения пользователя и пароля:

sudo ./bin/kibana-keystore create --allow-root

Команда выполняются из директории /usr/share/kibana

Так как keystore будет создан в каталоге /etc/kibana/ , и keystone требуется файл конфигурации /etc/kibana/kibana.yml, то пользователю, запускающему команду, необходим доступ к этому каталогу и файлу. Я использую sudo и ключ --allow-root. Ключ необходим, так как запускать из под пользователя root нельзя.

Kibana keystore не имеет пароля. Для ограничения доступа используем стандартные средства Linux.

  • Добавляем пользователя kibana_system(встроенный пользователь Elasticsearch) и пароль учетной записи в Kibana keystore:

sudo ./bin/kibana-keystore add elasticsearch.username --allow-rootsudo ./bin/kibana-keystore add elasticsearch.password --allow-root 

Можно использовать так же пользователя kibana, однако, он в системе считается устаревшим (deprecated).

  • перезапускаем Kibana и проверяем открыв нужный адрес в браузере:

Kibana Kibana

Для входа используем учетную запись elastic.

Как можно заменить, трафик между браузером и Kibana не шифруется.

Настраиваем шифрование трафика между Kibana и клиентами

Данная процедура максимально похожа на ту, которую выполняли для получения сертификатов для узлов Elasticsearch.

  • Получаем сертификат при помощи elsticsearch-certutil :

./bin/elasticsearch-certutil cert  -ca /etc/elasticsearch/elastic-stack-ca.p12 -name kibana-certificate -dns kibana01,10.0.3.1,127.0.0.1,localhost -ip 10.0.3.1

При необходимости можем использовать CA (Certificate Authority). В процессе генерации сертификата указываем пароль от CA, если используется, имя будущего сертификата и пароль к нему.

Через -dns и -ip указываем DNS имена и адрес Kibana.

  • Указываем путь к сертификату в файле kibana.yml:

Так как я использовал ранее полученный CA, то указываю путь и к нему (server.ssl.truststore.path).

server.ssl.keystore.path: "/etc/kibana/kibana-certificate.p12"server.ssl.truststore.path: "/etc/elasticsearch/elastic-stack-ca.p12"

Не забывайте о правах доступа, пользователь kibana должен иметь права на чтения.

  • Включаем использование TLS:

server.ssl.enabled: true
  • Добавляем пароли от сертификатов в keystore:

sudo ./bin/kibana-keystore add server.ssl.keystore.password --allow-rootsudo ./bin/kibana-keystore add server.ssl.truststore.password --allow-root
  • Перезагружаем Kibana и проверяем:

Доступ к Kibana по httpsДоступ к Kibana по https

Теперь для подключения к Kibana используем https.

Создаем пользователей и роли

Ранее мы активировали встроенные учетные записи и сгенерировали для них пароли, но использовать пользователя elastic (superuser) не лучшая практика, поэтому рассмотрим, как создавать пользователей и роли к ним.

Для примера создадим администратора Kibana. Открываем Menu > Management > Stack Management, выбираем Users и нажимаем Create user. Заполняем все поля и жмем Create User.

Создание пользователя в KibanaСоздание пользователя в Kibana

Или же можно воспользоваться API. Делать запросы к кластеру можно через консоль Dev Tools инструмента Kibana. Для этого перейдите Menu > Management > Dev Tools. В открывшейся консоли можно писать запросы к Elasticsearch.

POST /_security/user/kibana_admin{  "password" : "password",  "roles" : [ "kibana_admin" ],  "full_name" : "Kibana Administrator",  "email" : "kibana.administrator@example.local"}
Создание пользователя kibana_admin через APIСоздание пользователя kibana_admin через API

После создания пользователя, можно его использовать.

Роль kibana_admin не может создавать пользователей и роли. Для создания пользователей и роли необходима привилегия кластера manage_security. С перечнем всех встроенных ролей можно ознакомится тут.

Далее создадим роль для работы с данными в ранее созданном индексом logstash-logs*. Открываем Menu > Management > Stack Management. Слева выбираем Roles и нажимаем Create role. Настраиваем привилегии, указав в качестве индекса logstash-logs*:

Index privileges

read

Read only права на индекс

Предоставляем пользователю доступ к Kibana, для этого ниже наживаем Add Kibana privilege и выбираем требуемые привилегии:

В поле Spaces указываю All spaces. Что такое Space (пространства) можно почитать на официальном сайте. В рамках данной серии статей Space будет описан в статье о Kibana dashboard.

Чтобы создать роль с привилегиями в Elasticsearch и Kibana через API, делаем запрос к Kibana:

curl -k -i -u 'elastic' -X PUT 'https://10.0.3.1:5601/api/security/role/logstash_reader' \--header 'kbn-xsrf: true' \--header 'Content-Type: application/json' \--data-raw '{  "elasticsearch": {    "cluster" : [ ],    "indices" : [      {        "names": [ "logstash-logs*" ],        "privileges": ["read"]      }      ]  },  "kibana": [    {      "base": [],      "feature": {       "discover": [          "all"        ],        "visualize": [          "all"        ],        "dashboard": [          "all"        ],        "dev_tools": [          "read"        ],        "indexPatterns": [          "read"        ]      },      "spaces": [        "*"      ]    }  ]}'

Создаем пользователя logstash_reader и связываем его с созданной ролью (это мы уже научились делать) и заходим данным пользователем в Kibana.

Главная страница Kibana для пользователя logstash_readerГлавная страница Kibana для пользователя logstash_reader

Как видно, у данного пользователя не так много прав. Он может просматривать индексы logstash-logs*, строить графики, создавать панели и делать GET запросы к этому индексам через Dev Tools.

Настраиваем пользовательские сессии Kibana

  • Закрываем неактивные сессии:

xpack.security.session.idleTimeout: "30m"
  • Устанавливаем максимальную продолжительность одной сессии:

xpack.security.session.lifespan: "1d"
  • Настраиваем интервал принудительной очистки данных о неактивных или просроченных сессиях из сессионного индекса:

xpack.security.session.cleanupInterval: "8h"

Для всех параметров формат времени может быть следующим: ms | s | m | h | d | w | M | Y

Данные о сессии удаляются после того, как пользователь осуществляет закрытие сессии. Если не настроить закрытие неактивных сессий или не ограничить максимальную длительность сессии, то информация об этих сессиях будет накапливаться в сессионном индексе, и Kibana не сможет автоматически удалить данные.

Настраиваем Logstash

На данный момент Logstash не отправляет данные в кластер Elasticsearch, и чтобы это исправить, сделаем несколько настроек.

  • Создаем роль для подключения к кластеру:

В Kibana открываем Menu > Management > Stack Management. Слева выбираем Roles и нажимаем Create role. Указываем имя роли и настраиваем привелегии:

Cluster privileges

manage_index_templates

Все операции над шаблонами индексов

monitor

Read-only права на получение информации о кластере

Index privileges

create_index

Создание индексов

write

Индексирование, обновление и удаление индексов

manage

Мониторинг и управление индексом

manage_ilm

Управление жизненным циклом индексов (IML)

  • Создаем пользователя для подключения к кластер:

Открываем Menu > Management > Stack Management, выбираем Users и нажимаем Create user. Заполняем требуемые данные, указав в качестве роли созданную выше роль.

Создание пользователя logstash_userСоздание пользователя logstash_user
  • Создаем Logstash keystore, и добавляем туда пользователя и пароль от него:

# Создаем keystore с паролемset +o historyexport LOGSTASH_KEYSTORE_PASS=mypasswordset -o historysudo /usr/share/logstash/bin/logstash-keystore create --path.settings /etc/logstash/# Добавляем данныеsudo /usr/share/logstash/bin/logstash-keystore add ES_USER --path.settings /etc/logstash/sudo /usr/share/logstash/bin/logstash-keystore add ES_PWD --path.settings /etc/logstash/
  • Настраиваем аутентификацию в output плагине:

output {  elasticsearch {    ...    user => "${ES_USER}"    password => "${ES_PWD}"  }}
  • Настраиваем TLS в Logstash:

Для подключения по https к Elasticsearch необходимо настроить использование ssl и установить .pem сертификат, который был получен для Kibana.

output {  elasticsearch {    ...    ssl => true    cacert => '/etc/logstash/elasticsearch-ca.pem'  }}
  • Перезагружаем Logstash и проверяем новые записи в индексе

Проверку можно сделать в Kibana через Discovery, как делали в прошлой статье, или через API:

Dev Tools - Console в KibanaDev Tools - Console в Kibana

Создание API ключей

Для работы с Elasticsearch можно не только использовать учетные записи пользователей, но и генерировать API ключ. Для этого необходимо сделать POST запрос:

POST /_security/api_key

В качестве параметров указывают:

name - имя ключа

role_descriptors - описание роли. Структура совпадает с запросом на создание роли.

expiration - Срок действия ключа. По умолчанию без срока действия.

Для примера заменим базовую аутентификацию в Logstash на API ключ.

  • Создаем API ключ в Kibana Dev Tool:

POST /_security/api_key{  "name": "host_logstash01",   "role_descriptors": {    "logstash_api_writer": {       "cluster": ["manage_index_templates", "monitor"],      "index": [        {          "names": ["logstash-logs*"],          "privileges": ["create_index", "write", "manage", "manage_ilm"]        }      ]    }  },  "expiration": "365d"}

Получаем следующий результат:

{  "id" : "979DkXcBv3stdorzoqsf",  "name" : "host_logstash01",  "expiration" : 1644585864715,  "api_key" : "EmmnKb6NTES3nlRJenFKrQ"}
  • Помещаем ключ в формате id:api_key в Keystore:

sudo /usr/share/logstash/bin/logstash-keystore add API_KEY --path.settings /etc/logstash/
  • Указываем API ключ в конфигурационном файле конвейера:

output {  elasticsearch {    hosts => ["https://10.0.3.11:9200","https://10.0.3.12:9200","https://10.0.3.13:9200"]    index => "logstash-logs-%{+YYYY.MM}"    ssl => true    api_key => "${API_KEY}"    cacert => '/etc/logstash/elasticsearch-ca.pem'  }}

Информацию по ключам можно получить в Kibana Menu > Management > API Keys или через API запрос:

GET /_security/api_key?name=host_logstash01{  "api_keys" : [    {      "id" : "9r8zkXcBv3stdorzZquD",      "name" : "host_logstash01",      "creation" : 1613048800876,      "expiration" : 1613049520876,      "invalidated" : false,      "username" : "elastic",      "realm" : "reserved"    }  ]}

Заключение

В рамках данной статьи были рассмотрены основные функции безопасности стека Elastic, позволяющие обезопасить работу нашего кластера, научились создавать пользователей и пользовательские роли, производить настройку пользовательских сессий, настройку шифрования трафика между узлами кластера и между кластером и клиентами, работать с API ключами и keystore.

В следующей статье будет рассмотрена процедура создание кластера ELK с помощью Docker.

Полезные ссылки

Подробнее..

Fluentd почему важно настроить выходной буфер

13.07.2020 14:10:08 | Автор: admin


В наше время невозможно представить проект на базе Kubernetes без стека ELK, с помощью которого сохраняются логи как приложений, так и системных компонентов кластера. В своей практике мы используем стек EFK с Fluentd вместо Logstash.


Fluentd это современный универсальный коллектор логов, набирающий всё большую популярность и присоединившийся к Cloud Native Computing Foundation, из-за чего вектор его разработки ориентирован на использование совместно с Kubernetes.


Факт использования Fluentd вместо Logstash не изменяет общую суть программного комплекса, однако, для Fluentd характерны свои специфические нюансы, следующие из его многофункциональности.


Например, начав использовать EFK в нагруженном проекте с высокой интенсивностью записи логов, мы столкнулись с тем, что в Kibana некоторые сообщения отображаются повторно по несколько раз. В данной статье мы расскажем вам, по какой причине происходит данное явление и как решить проблему.


Проблема дублирования документов


В наших проектах Fluentd развернут как DaemonSet (автоматически запускается в одном экземпляре на каждом узле кластера Kubernetes) и отслеживает stdout логи контейнеров в /var/log/containers. После сбора и обработки логи в виде JSON-документов поступают в ElasticSearch, поднятый в кластерном либо standalone виде, в зависимости от масштабов проекта и требований к производительности и отказоустойчивости. В качестве графического интерфейса используется Kibana.


При использовании Fluentd с буферизирующим плагином на выходе мы столкнулись с ситуацией, когда некоторые документы в ElasticSearch имеют полностью одинаковое содержание и отличаются лишь идентификатором. Убедиться, что это именно повтор сообщения можно на примере лога Nginx. В файле лога данное сообщение существует в единственном экземпляре:


127.0.0.1 192.168.0.1 - [28/Feb/2013:12:00:00 +0900] "GET / HTTP/1.1" 200 777 "-" "Opera/12.0" -

Однако, в ElasticSearch существует несколько документов, содержащих данное сообщение:


{  "_index": "test-custom-prod-example-2020.01.02",  "_type": "_doc",  "_id": "HgGl_nIBR8C-2_33RlQV",  "_version": 1,  "_score": 0,  "_source": {    "service": "test-custom-prod-example",    "container_name": "nginx",    "namespace": "test-prod",    "@timestamp": "2020-01-14T05:29:47.599052886 00:00",    "log": "127.0.0.1 192.168.0.1 - [28/Feb/2013:12:00:00  0900] \"GET / HTTP/1.1\" 200 777 \"-\" \"Opera/12.0\" -",    "tag": "custom-log"  }}{  "_index": "test-custom-prod-example-2020.01.02",  "_type": "_doc",  "_id": "IgGm_nIBR8C-2_33e2ST",  "_version": 1,  "_score": 0,  "_source": {    "service": "test-custom-prod-example",    "container_name": "nginx",    "namespace": "test-prod",    "@timestamp": "2020-01-14T05:29:47.599052886 00:00",    "log": "127.0.0.1 192.168.0.1 - [28/Feb/2013:12:00:00  0900] \"GET / HTTP/1.1\" 200 777 \"-\" \"Opera/12.0\" -",    "tag": "custom-log"  }}

Притом, повторов может быть больше двух.


Во время фиксации данной проблемы в логах Fluentd можно наблюдать большое количество предупреждений следующего содержания:


2020-01-16 01:46:46 +0000 [warn]: [test-prod] failed to flush the buffer. retry_time=4 next_retry_seconds=2020-01-16 01:46:53 +0000 chunk="59c37fc3fb320608692c352802b973ce" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elasticsearch\", :port=>9200, :scheme=>\"http\", :user=>\"elastic\", :password=>\"obfuscated\"}): read timeout reached"

Данные предупреждения возникают когда ElasticSearch не может вернуть ответ на запрос за установленное параметром request_timeout время, из-за чего пересылаемый фрагмент буфера не может быть очищен. После этого Fluentd пытается отправить фрагмент буфера в ElasticSearch повторно и через произвольное число попыток операция завершается успешно:


2020-01-16 01:47:05 +0000 [warn]: [test-prod] retry succeeded. chunk_id="59c37fc3fb320608692c352802b973ce" 2020-01-16 01:47:05 +0000 [warn]: [test-prod] retry succeeded. chunk_id="59c37fad241ab300518b936e27200747" 2020-01-16 01:47:05 +0000 [warn]: [test-dev] retry succeeded. chunk_id="59c37fc11f7ab707ca5de72a88321cc2" 2020-01-16 01:47:05 +0000 [warn]: [test-dev] retry succeeded. chunk_id="59c37fb5adb70c06e649d8c108318c9b" 2020-01-16 01:47:15 +0000 [warn]: [kube-system] retry succeeded. chunk_id="59c37f63a9046e6dff7e9987729be66f"

Однако, ElasticSearch воспринимает каждый из пересланных фрагментов буфера как уникальный и присваивает им уникальные значения полей _id при индексации. Таким образом и появляются копии сообщений.


В Kibana это выглядит так:



Решение проблемы


Существует несколько вариантов решения данной проблемы. Один из них встроенный в плагин fluent-plugin-elasticsearch механизм генерации уникального хеша для каждого документа. Если использовать данный механизм, ElasticSearch будет распознавать повторы на стадии пересылки и не допускать дублирования документов. Но нельзя не учитывать, что данный способ решения проблемы борется со следствием и не устраняет ошибку с нехваткой тайм-аута, поэтому мы отказались от его применения.


Мы используем буферизирующий плагин на выходе Fluentd, чтобы не допустить потери логов в случае кратковременных неполадок с сетью или возросшей интенсивности записи логов. Если по какой-либо причине ElasticSearch не может мгновенно записать документ в индекс, документ попадает в очередь, которая хранится на диске. Поэтому, в нашем случае, чтобы устранить источник проблемы, которая приводит к возникновению описанной выше ошибки, необходимо задать корректные значения параметров буферизации, при которых выходной буфер Fluentd будет достаточного объема и при этом успевать очищаться за отведенное время.


Стоит отметить, что значения параметров, речь о которых пойдет ниже, индивидуальны в каждом конкретном случае использования буферизации в выходных плагинах, так как зависят от множества факторов: интенсивности записи в лог сообщений сервисами, производительности дисковой системы, загруженности сетевого канала и его пропускной способности. Поэтому, чтобы получить подходящие для каждого отдельного случая, но не избыточные настройки буфера, избегая длительного перебора вслепую, можно воспользоваться отладочной информацией, которую пишет в свой лог Fluentd в процессе работы и сравнительно быстро получить корректные значения.


На момент фиксации проблемы конфигурация выглядела следующим образом:


 <buffer>        @type file        path /var/log/fluentd-buffers/kubernetes.test.buffer        flush_mode interval        retry_type exponential_backoff        flush_thread_count 2        flush_interval 5s        retry_forever        retry_max_interval 30        chunk_limit_size 8M        queue_limit_length 8        overflow_action block      </buffer>

В ходе решения проблемы вручную подбирались значения следующих параметров:
chunk_limit_size размер чанков, на которые разбиваются сообщения в буфере.


  • flush_interval интервал времени, через который происходит очистка буфера.
  • queue_limit_length максимальное количество чанков в очереди.
  • request_timeout время, на которое устанавливается соединение между Fluentd и ElasticSearch.

Общий размер буфера можно вычислить, перемножив параметры queue_limit_length и chunk_limit_size, что можно толковать как максимальное количество чанков в очереди, каждый из которых имеет заданный объем. При недостаточном размере буфера в логах будет появляться следующее предупреждение:


2020-01-21 10:22:57 +0000 [warn]: [test-prod] failed to write data into buffer by buffer overflow action=:block

Оно означает, что буфер не успевает очиститься за отведенное время и данные, которые поступают в заполненный буфер, блокируются, что приведет к потере части логов.


Увеличить буфер можно двумя способами: увеличив либо размер каждого чанка в очереди, либо количество чанков, которые могут находиться в очереди.


Если установить размер чанка chunk_limit_size более 32 мегабайт, то ElasticSeacrh не примет его, так как входящий пакет получится слишком большим. Поэтому, если необходимо увеличить буфер дополнительно, лучше увеличивать максимальную длину очереди queue_limit_length.


Когда буфер перестанет переполняться и останется только сообщение о нехватке тайм-аута, можно приступить к увеличению параметра request_timeout. Однако, при установке значения больше 20 секунд, в логах Fluentd начнут появляться следующие предупреждения:


2020-01-21 09:55:33 +0000 [warn]: [test-dev] buffer flush took longer time than slow_flush_log_threshold: elapsed_time=20.85753920301795 slow_flush_log_threshold=20.0 plugin_id="postgresql-dev" 

Данное сообщение никак не влияет на работу системы и означает, что время очистки буфера заняло больше, чем установлено параметром slow_flush_log_threshold. Это отладочная информация и мы используем её при подборе значения параметра request_timeout.


Обобщенный алгоритм подбора выглядит следующим образом:


  1. Установить значение request_timeout гарантированно большим, чем необходимо (сотни секунд). На время настройки основным критерием правильности установки данного параметра будет являться исчезновение предупреждений у нехватке тайм-аута.
  2. Дождаться сообщений о превышении порога slow_flush_log_threshold. В тексте предупреждения в поле elapsed_time будет написано реальное время очистки буфера.
  3. Установить значение request_timeout больше, чем максимальное значение elapsed_time, полученное за период наблюдения. Мы рассчитываем значение request_timeout как elapsed_time + 50%.
  4. Чтобы убрать из лога предупреждения о долгой очистке буфера, можно поднять значение slow_flush_log_threshold. Мы рассчитываем данное значение как elapsed_time + 25%.

Итоговые значения данных параметров, как было замечено ранее, получаются индивидуальными для каждого случая. Следуя приведенному выше алгоритму, мы гарантированно устраняем ошибку, приводящую к повтору сообщений.


В таблице ниже показано, как изменяется количество ошибок за сутки, приводящих к дублированию сообщений, в процессе подбора значений описанных выше параметров:


node-1 node-2 node-3 node-4
До/После До/После До/После До/После
failed to flush the buffer 1749/2 694/2 47/0 1121/2
retry succeeded 410/2 205/1 24/0 241/2

Стоит дополнительно отметить, что полученные настройки могут потерять свою актуальность в процессе роста проекта и, соответственно, увеличения количества логов. Первичным признаком нехватки установленного тайм-аута является возвращение в лог Fluentd сообщений о долгой очистке буфера, то есть превышение порога slow_flush_log_threshold. С этого момента есть ещё небольшой запас до превышения параметра request_timeout, поэтому необходимо своевременно отреагировать на данные сообщения и повторно выполнить процесс подбора оптимальных настроек, описанный выше.


Заключение


Тонкая настройка выходного буфера Fluentd является одним из главных этапов настройки EFK стека, определяющим стабильность его работы и корректность размещения документов в индексах. Ориентируясь на описанный алгоритм настройки, можно быть уверенным, что все логи будут записаны в индекс ElasticSearch в правильном порядке, без повторений и потерь.


Также читайте другие статьи в нашем блоге:


Подробнее..

Собираем логи с Loki

25.06.2020 16:23:50 | Автор: admin


Мы в Badoo постоянно мониторим свежие технологии и оцениваем, стоит ли использовать их в нашей системе. Одним из таких исследований и хотим поделиться с сообществом. Оно посвящено Loki системе агрегирования логов.


Loki это решение для хранения и просмотра логов, также этот стек предоставляет гибкую систему для их анализа и отправки данных в Prometheus. В мае вышло очередное обновление, которое активно продвигают создатели. Нас заинтересовало, что умеет Loki, какие возможности предоставляет и в какой степени может выступать в качестве альтернативы ELK стека, который мы используем сейчас.


Что такое Loki


Grafana Loki это набор компонентов для полноценной системы работы с логами. В отличие от других подобных систем Loki основан на идее индексировать только метаданные логов labels (так же, как и в Prometheus), a сами логи сжимать рядом в отдельные чанки.


Домашняя страница, GitHub


Прежде чем перейти к описанию того, что можно делать при помощи Loki, хочу пояснить, что подразумевается под идеей индексировать только метаданные. Сравним подход Loki и подход к индексированию в традиционных решениях, таких как Elasticsearch, на примере строки из лога nginx:


172.19.0.4 - - [01/Jun/2020:12:05:03 +0000] "GET /purchase?user_id=75146478&amp;item_id=34234 HTTP/1.1" 500 8102 "-" "Stub_Bot/3.0" "0.001"

Традиционные системы парсят строку целиком, включая поля с большим количеством уникальных значений user_id и item_id, и сохраняют всё в большие индексы. Преимуществом данного подхода является то, что можно выполнять сложные запросы быстро, так как почти все данные в индексе. Но за это приходится платить тем, что индекс становится большим, что выливается в требования к памяти. В итоге полнотекстовый индекс логов сопоставим по размеру с самими логами. Для того чтобы по нему быстро искать, индекс должен быть загружен в память. И чем больше логов, тем быстрее индекс увеличивается и тем больше памяти он потребляет.


Подход Loki требует, чтобы из строки были извлечены только необходимые данные, количество значений которых невелико. Таким образом, мы получаем небольшой индекс и можем искать данные, фильтруя их по времени и по проиндексированным полям, а затем сканируя оставшееся регулярными выражениями или поиском подстроки. Процесс кажется не самым быстрым, но Loki разделяет запрос на несколько частей и выполняет их параллельно, обрабатывая большое количество данных за короткое время. Количество шардов и параллельных запросов в них конфигурируется; таким образом, количество данных, которое можно обработать за единицу времени, линейно зависит от количества предоставленных ресурсов.


Этот компромисс между большим быстрым индексом и маленьким индексом с параллельным полным перебором позволяет Loki контролировать стоимость системы. Её можно гибко настраивать и расширять в соответствии с потребностями.


Loki-стек состоит из трёх компонентов: Promtail, Loki, Grafana. Promtail собирает логи, обрабатывает их и отправляет в Loki. Loki их хранит. А Grafana умеет запрашивать данные из Loki и показывать их. Вообще Loki можно использовать не только для хранения логов и поиска по ним. Весь стек даёт большие возможности по обработке и анализу поступающих данных, используя Prometheus way.
Описание процесса установки можно найти здесь.


Поиск по логам


Искать по логам можно в специальном интерфейсе Grafana Explorer. Для запросов используется язык LogQL, очень похожий на PromQL, использующийся в Prometheus. В принципе, его можно рассматривать как распределённый grep.


Интерфейс поиска выглядит так:



Сам запрос состоит из двух частей: selector и filter. Selector это поиск по индексированным метаданным (лейблам), которые присвоены логам, а filter поисковая строка или регэксп, с помощью которого отфильтровываются записи, определённые селектором. В приведенном примере: В фигурных скобках селектор, все что после фильтр.


{image_name="nginx.promtail.test"} |= "index"

Из-за принципа работы Loki нельзя делать запросы без селектора, но лейблы можно делать сколь угодно общими.


Селектор это key-value значения в фигурных скобках. Можно комбинировать селекторы и задавать разные условия поиска, используя операторы =, != или регулярные выражения:


{instance=~"kafka-[23]",name!="kafka-dev"} // Найдёт логи с лейблом instance, имеющие значение kafka-2, kafka-3, и исключит dev 

Фильтр это текст или регэксп, который отфильтрует все данные, полученные селектором.


Есть возможность получения ad-hoc-графиков по полученным данным в режиме metrics. Например, можно узнать частоту появления в логах nginx записи, содержащей строку index:



Полное описание возможностей можно найти в документации LogQL.


Парсинг логов


Есть несколько способов собрать логи:


  • С помощью Promtail, стандартного компонента стека для сбора логов.
  • Напрямую с докер-контейнера при помощи Loki Docker Logging Driver.
  • Использовать Fluentd или Fluent Bit, которые умеют отправлять данные в Loki. В отличие от Promtail они имеют готовые парсеры практически для любого вида лога и справляются в том числе с multiline-логами.

Обычно для парсинга используют Promtail. Он делает три вещи:


  • Находит источники данных.
  • Прикрепляет к ним лейблы.
  • Отправляет данные в Loki.

В настоящий момент Promtail может читать логи с локальных файлов и с systemd journal. Он должен быть установлен на каждую машину, с которой собираются логи.


Есть интеграция с Kubernetes: Promtail автоматически через Kubernetes REST API узнаёт состояние кластера и собирает логи с ноды, сервиса или пода, сразу развешивая лейблы на основе метаданных из Kubernetes (имя пода, имя файла и т. д.).


Также можно развешивать лейблы на основе данных из лога при помощи Pipeline. Pipeline Promtail может состоять из четырёх типов стадий. Более подробно в официальной документации, тут же отмечу некоторые нюансы.


  1. Parsing stages. Это стадия RegEx и JSON. На этом этапе мы извлекаем данные из логов в так называемую extracted map. Извлекать можно из JSON, просто копируя нужные нам поля в extracted map, или через регулярные выражения (RegEx), где в extracted map мапятся named groups. Extracted map представляет собой key-value хранилище, где key имя поля, а value его значение из логов.
  2. Transform stages. У этой стадии есть две опции: transform, где мы задаем правила трансформации, и source источник данных для трансформации из extracted map. Если в extracted map такого поля нет, то оно будет создано. Таким образом, можно создавать лейблы, которые не основаны на extracted map. На этом этапе мы можем манипулировать данными в extracted map, используя достаточно мощный Golang Template. Кроме того, надо помнить, что extracted map целиком загружается при парсинге, что даёт возможность, например, проверять значение в ней: {{if .tag}tag value exists{end}}. Template поддерживает условия, циклы и некоторые строковые функции, такие как Replace и Trim.
  3. Action stages. На этом этапе можно сделать что-нибудь с извлечённым:
    • Создать лейбл из extracted data, который проиндексируется Loki.
    • Изменить или установить время события из лога.
    • Изменить данные (текст лога), которые уйдут в Loki.
    • Создать метрики.
  4. Filtering stages. Стадия match, на которой можно либо отправить в /dev/null записи, которые нам не нужны, либо направить их на дальнейшую обработку.

Покажу на примере обработки обычных nginx-логов, как можно парсить логи при помощи Promtail.


Для теста возьмём в качестве nginx-proxy модифицированный образ nginx jwilder/nginx-proxy:alpine и небольшой демон, который умеет спрашивать сам себя по HTTP. У демона задано несколько эндпоинтов, на которые он может давать ответы разного размера, с разными HTTP-статусами и с разной задержкой.


Собирать логи будем с докер-контейнеров, которые можно найти по пути /var/lib/docker/containers/<container_id>/<container_id>-json.log


В docker-compose.yml настраиваем Promtail и указываем путь до конфига:


promtail:  image: grafana/promtail:1.4.1 // ... volumes:   - /var/lib/docker/containers:/var/lib/docker/containers:ro   - promtail-data:/var/lib/promtail/positions   - ${PWD}/promtail/docker.yml:/etc/promtail/promtail.yml command:   - '-config.file=/etc/promtail/promtail.yml' // ...

Добавляем в promtail.yml путь до логов (в конфиге есть опция "docker", которая делает то же самое одной строчкой, но это было бы не так наглядно):


scrape_configs: - job_name: containers   static_configs:       labels:         job: containerlogs         __path__: /var/lib/docker/containers/*/*log  # for linux only

При включении такой конфигурации в Loki будут попадать логи со всех контейнеров. Чтобы этого избежать, меняем настройки тестового nginx в docker-compose.yml добавляем логирование поле tag:


proxy: image: nginx.test.v3// logging:   driver: "json-file"   options:     tag: "{{.ImageName}}|{{.Name}}"

Правим promtail.yml и настраиваем Pipeline. На вход попадают логи следующего вида:


{"log":"\u001b[0;33;1mnginx.1    | \u001b[0mnginx.test 172.28.0.3 - - [13/Jun/2020:23:25:50 +0000] \"GET /api/index HTTP/1.1\" 200 0 \"-\" \"Stub_Bot/0.1\" \"0.096\"\n","stream":"stdout","attrs":{"tag":"nginx.promtail.test|proxy.prober"},"time":"2020-06-13T23:25:50.66740443Z"}{"log":"\u001b[0;33;1mnginx.1    | \u001b[0mnginx.test 172.28.0.3 - - [13/Jun/2020:23:25:50 +0000] \"GET /200 HTTP/1.1\" 200 0 \"-\" \"Stub_Bot/0.1\" \"0.000\"\n","stream":"stdout","attrs":{"tag":"nginx.promtail.test|proxy.prober"},"time":"2020-06-13T23:25:50.702925272Z"}

Pipeline stage:


 - json:     expressions:       stream: stream       attrs: attrs       tag: attrs.tag

Извлекаем из входящего JSON поля stream, attrs, attrs.tag (если они есть) и кладём их в extracted map.


 - regex:     expression: ^(?P<image_name>([^|]+))\|(?P<container_name>([^|]+))$     source: "tag"

Если удалось положить поле tag в extracted map, то при помощи регэкспа извлекаем имена образа и контейнера.


 - labels:     image_name:     container_name:

Назначаем лейблы. Если в extracted data будут обнаружены ключи image_name и container_name, то их значения будут присвоены соотвестующим лейблам.


 - match:     selector: '{job="docker",container_name="",image_name=""}'     action: drop

Отбрасываем все логи, у которых не обнаружены установленные labels image_name и container_name.


  - match:     selector: '{image_name="nginx.promtail.test"}'     stages:       - json:           expressions:             row: log

Для всех логов, у которых image_name равен nginx.promtail.test, извлекаем из исходного лога поле log и кладём его в extracted map с ключом row.


  - regex:         # suppress forego colors         expression: .+nginx.+\|.+\[0m(?P<virtual_host>[a-z_\.-]+) +(?P<nginxlog>.+)         source: logrow

Очищаем входную строку регулярными выражениями и вытаскиваем nginx virtual host и строку лога nginx.


     - regex:         source: nginxlog         expression: ^(?P<ip>[\w\.]+) - (?P<user>[^ ]*) \[(?P<timestamp>[^ ]+).*\] "(?P<method>[^ ]*) (?P<request_url>[^ ]*) (?P<request_http_protocol>[^ ]*)" (?P<status>[\d]+) (?P<bytes_out>[\d]+) "(?P<http_referer>[^"]*)" "(?P<user_agent>[^"]*)"( "(?P<response_time>[\d\.]+)")?

Парсим nginx-лог регулярными выражениями.


    - regex:           source: request_url           expression: ^.+\.(?P<static_type>jpg|jpeg|gif|png|ico|css|zip|tgz|gz|rar|bz2|pdf|txt|tar|wav|bmp|rtf|js|flv|swf|html|htm)$     - regex:           source: request_url           expression: ^/photo/(?P<photo>[^/\?\.]+).*$       - regex:           source: request_url           expression: ^/api/(?P<api_request>[^/\?\.]+).*$

Разбираем request_url. С помощью регэкспа определяем назначение запроса: к статике, к фоткам, к API и устанавливаем в extracted map соответствующий ключ.


       - template:           source: request_type           template: "{{if .photo}}photo{{else if .static_type}}static{{else if .api_request}}api{{else}}other{{end}}"

При помощи условных операторов в Template проверяем установленные поля в extracted map и устанавливаем для поля request_type нужные значения: photo, static, API. Назначаем other, если не удалось. Теперь request_type содержит тип запроса.


       - labels:           api_request:           virtual_host:           request_type:           status:

Устанавливаем лейблы api_request, virtual_host, request_type и статус (HTTP status) на основании того, что удалось положить в extracted map.


       - output:           source: nginx_log_row

Меняем output. Теперь в Loki уходит очищенный nginx-лог из extracted map.



После запуска приведённого конфига можно увидеть, что каждой записи присвоены метки на основе данных из лога.


Нужно иметь в виду, что извлечение меток с большим количеством значений (cardinality) может существенно замедлить работу Loki. То есть не стоит помещать в индекс, например, user_id. Подробнее об этом читайте в статье How labels in Loki can make log queries faster and easier. Но это не значит, что нельзя искать по user_id без индексов. Нужно использовать фильтры при поиске (грепать по данным), а индекс здесь выступает как идентификатор потока.


Визуализация логов



Loki может выступать в роли источника данных для графиков Grafana, используя LogQL. Поддерживаются следующие функции:


  • rate количество записей в секунду;
  • count over time количество записей в заданном диапазоне.

Ещё присутствуют агрегирующие функции Sum, Avg и другие. Можно строить достаточно сложные графики, например график количества HTTP-ошибок:



Стандартный data source Loki несколько урезан по функциональности по сравнению с data source Prometheus (например, нельзя изменить легенду), но Loki можно подключить как источник с типом Prometheus. Я не уверен, что это документированное поведение, но, судя по ответу разработчиков How to configure Loki as Prometheus datasource? Issue #1222 grafana/loki, например, это вполне законно, и Loki полностью совместим с PromQL.


Добавляем Loki как data source с типом Prometheus и дописываем URL /loki:



И можно делать графики, как в том случае, если бы мы работали с метриками из Prometheus:



Я думаю, что расхождение в функциональности временное и разработчики в будущем это поправят.



Метрики


В Loki доступны возможность извлечения числовых метрик из логов и отправка их в Prometheus. Например, в логе nginx присутствует количество байтов на ответ, а также, при определённой модификации стандартного формата лога, и время в секундах, которое потребовалось на ответ. Эти данные можно извлечь и отправить в Prometheus.


Добавляем ещё одну секцию в promtail.yml:


- match:   selector: '{request_type="api"}'   stages:     - metrics:         http_nginx_response_time:           type: Histogram           description: "response time ms"           source: response_time           config:             buckets: [0.010,0.050,0.100,0.200,0.500,1.0]- match:   selector: '{request_type=~"static|photo"}'   stages:     - metrics:         http_nginx_response_bytes_sum:           type: Counter           description: "response bytes sum"           source: bytes_out           config:             action: add         http_nginx_response_bytes_count:           type: Counter           description: "response bytes count"           source: bytes_out           config:             action: inc

Опция позволяет определять и обновлять метрики на основе данных из extracted map. Эти метрики не отправляются в Loki они появляются в Promtail /metrics endpoint. Prometheus должен быть сконфигурирован таким образом, чтобы получить данные, полученные на этой стадии. В приведённом примере для request_type=api мы собираем метрику-гистограмму. С этим типом метрик удобно получать перцентили. Для статики и фото мы собираем сумму байтов и количество строк, в которых мы получили байты, чтобы вычислить среднее значение.


Более подробно о метриках читайте здесь.


Открываем порт на Promtail:


promtail:     image: grafana/promtail:1.4.1     container_name: monitoring.promtail     expose:       - 9080     ports:       - "9080:9080"

Убеждаемся, что метрики с префиксом promtail_custom появились:



Настраиваем Prometheus. Добавляем job promtail:


- job_name: 'promtail' scrape_interval: 10s static_configs:   - targets: ['promtail:9080']

И рисуем график:



Таким образом можно узнать, например, четыре самых медленных запроса. Также на данные метрики можно настроить мониторинг.


Масштабирование


Loki может быть как в одиночном режиме (single binary mode), так и в шардируемом (horizontally-scalable mode). Во втором случае он может сохранять данные в облако, причём чанки и индекс хранятся отдельно. В версии 1.5 реализована возможность хранения в одном месте, но пока не рекомендуется использовать её в продакшене.



Чанки можно хранить в S3-совместимом хранилище, для хранения индексов использовать горизонтально масштабируемые базы данных: Cassandra, BigTable или DynamoDB. Другие части Loki Distributors (для записи) и Querier (для запросов) stateless и также масштабируются горизонтально.


На конференции DevOpsDays Vancouver 2019 один из участников Callum Styan озвучил, что с Loki его проект имеет петабайты логов с индексом меньше 1% от общего размера: How Loki Correlates Metrics and Logs And Saves You Money.


Сравнение Loki и ELK


Размер индекса
Для тестирования получаемого размера индекса я взял логи с контейнера nginx, для которого настраивался Pipeline, приведённый выше. Файл с логами содержал 406 624 строки суммарным объёмом 109 Мб. Генерировались логи в течение часа, примерно по 100 записей в секунду.


Пример двух строк из лога:



При индексации ELK это дало размер индекса 30,3 Мб:



В случае с Loki это дало примерно 128 Кб индекса и примерно 3,8 Мб данных в чанках. Стоит отметить, что лог был искусственно сгенерирован и не отличался большим разнообразием данных. Простой gzip на исходном докеровском JSON-логе с данными давал компрессию 95,4%, а с учётом того, что в сам Loki посылался только очищенный nginx-лог, то сжатие до 4 Мб объяснимо. Суммарное количество уникальных значений для лейблов Loki было 35, что объясняет небольшой размер индекса. Для ELK лог также очищался. Таким образом, Loki сжал исходные данные на 96%, а ELK на 70%.


Потребление памяти



Если сравнивать весь стек Prometheus и ELK, то Loki ест в несколько раз меньше. Понятно, что сервис на Go потребляет меньше, чем сервис на Java, и сравнение размера JVM Heap Elasticsearch и выделенной памяти для Loki некорректно, но тем не менее стоит отметить, что Loki использует гораздо меньше памяти. Его преимущество по CPU не так очевидно, но также присутствует.


Скорость


Loki быстрее пожирает логи. Скорость зависит от многих факторов что за логи, как изощрённо мы их парсим, сеть, диск и т. д. но она однозначно выше, чем у ELK (в моём тесте примерно в два раза). Объясняется это тем, что Loki кладёт гораздо меньше данных в индекс и, соответственно, меньше времени тратит на индексирование. Со скоростью поиска при этом ситуация обратная: Loki ощутимо притормаживает на данных размером более нескольких гигабайтов, у ELK же скорость поиска от размера данных не зависит.


Поиск по логам


Loki существенно уступает ELK по возможностям поиска по логам. Grep с регулярными выражениями это сильная вещь, но он уступает взрослой базе данных. Отсутствие range-запросов, агрегация только по лейблам, невозможность искать без лейблов всё это ограничивает нас в поисках интересующей информации в Loki. Это не подразумевает, что с помощью Loki ничего нельзя найти, но определяет флоу работы с логами, когда вы сначала находите проблему на графиках Prometheus, а потом по этим лейблам ищете, что случилось в логах.


Интерфейс


Во-первых, это красиво (извините, не мог удержаться). Grafana имеет приятный глазу интерфейс, но Kibana гораздо более функциональна.


Плюсы и минусы Loki


Из плюсов можно отметить, что Loki интегрируется с Prometheus, соответственно, метрики и алертинг мы получаем из коробки. Он удобен для сбора логов и их хранения с Kubernetes Pods, так как имеет унаследованный от Prometheus service discovery и автоматически навешивает лейблы.


Из минусов слабая документация. Некоторые вещи, например особенности и возможности Promtail, я обнаружил только в процессе изучения кода, благо open-source. Ещё один минус слабые возможности парсинга. Например, Loki не умеет парсить multiline-логи. Также к недостаткам можно отнести то, что Loki относительно молодая технология (релиз 1.0 был в ноябре 2019 года).


Заключение


Loki на 100% интересная технология, которая подходит для небольших и средних проектов, позволяя решать множество задач агрегирования логов, поиска по логам, мониторинга и анализа логов.


Мы не используем Loki в Badoo, так как имеем ELK-стек, который нас устраивает и который за много лет оброс различными кастомными решениями. Для нас камнем преткновения является поиск по логам. Имея почти 100 Гб логов в день, нам важно уметь находить всё и чуть-чуть больше и делать это быстро. Для построения графиков и мониторинга мы используем другие решения, которые заточены под наши нужды и интегрированы между собой. У стека Loki есть ощутимые плюсы, но он не даст нам больше, чем у нас есть, и его преимущества точно не перекроют стоимость миграции.


И хотя после исследования стало понятно, что мы Loki использовать не можем, надеемся, что данный пост поможет вам в выборе.


Репозиторий с кодом, использованным в статье, находится тут.

Подробнее..

Поиск в Кафке

26.02.2021 10:11:26 | Автор: admin

Меня зовут Сергей Калинец, я архитектор в компании Parimatch Tech, и в этой публикации хочу поделиться нашим опытом в области поиска сообщений в Kafka.

Для нашей компании Kafka является центральной нервной системой, через которую микросервисы обмениваются информацией. От входа до выхода сообщение может пройти через десяток сервисов, которые его фильтруют и трансформируют, перекладывая из одного топика в другой. Этими сервисами владеют разные команды, и очень полезно бывает посмотреть, что же содержится в том или ином сообщении. Особенно интересно это в случаях, когда что-то идет не по плану важно понять на каком этапе все превратилось в тыкву (ну и кому нужно в тыкву дать, чтобы такого больше не повторялось). С высоты птичьего полета решение простое нужно взять соответствующие сообщения из кафки и посмотреть что в них не так. Но, как обычно, интересноеначинаетсяв деталях.

Начнем с того, что кафка это не просто брокер сообщений, как многие думают и пользуются ей, но и распределенный лог. Это значит много чего, но нам интересно то, что сообщения не удаляются из топиков после того, как их прочитали получатели, и технически можно в любой момент их прочитать заново и посмотреть, что там внутри. Однако все усложняется тем, что читать из Kafka можно только последовательно. Нужно знать смещение (для упрощения это порядковый номер в топике), с которого нам нужны сообщения. Также возможно в качестве начальной точки указать время, но потом все равно можно только читать все сообщения по порядку.

Получается, что для того, чтобы найти нужное сообщение, нужно прочитать пачку и найти среди них те, которые интересны. Например, если мы хотим разобраться в проблемах игрока с id=42, нужно найти все сообщения, где он упоминается (playerId: 42), выстроить их в цепочку, ну и дальше уже смотреть, на каком этапе все пошло не так.

В отличии от баз данных вроде MySQL или MSSQL, у которых в комплекте поставки сразу есть клиентские приложения с графическим интерфейсом, ванильная Kafka не балует нас такими изысками и предлагает разве что консольные утилиты с довольно узким (на первый взгляд) функционалом.

Но есть и хорошие новости. На рынке есть ряд решений, которые помогают облегчить весь процесс. Сразу отмечу, что на рынке тут не в смысле за деньги все рассмотренные ниже инструменты бесплатны.

Я опишу некоторые наиболее часто используемые, а в конце более детально разберем самое, на наш взгляд, близкое к идеальному.

Итак, из чего можно выбирать?

Kafka Tool

(скриншот с официального сайта https://www.kafkatool.com/features.html)

Наверное самый популярный инструмент, которым пользуются люди, привыкшие к GUI инструментам. Позволяет увидеть список топиков, и прочитать отдельные сообщения. Есть возможность фильтровать по содержимому сообщений, указывать с какого смещения читать и сколько сообщений нужно прочитать. Но все же нужно знать примерные смещения интересующих нас сообщений, потому что Kafka Tool ищет только в рамках тех сообщений, которые были прочитаны. Говоря словами Дятлова из сериала Чернобыль (в оригинальной озвучке): Not great not terrible.

Часто это единственная альтернатива, с которой вынуждены работать люди, волей судьбы столкнувшиеся с кафкой. Но есть и другие средства.

Kafka Console Consumer

Это одна из утилит, входящая в комплект поставки кафки. И она позволяет читать данные из нее. Как и сама Kafka, это JVM приложение, которое для своей работы требует установленной Java. В принципе, это справедливо и для Kafka Tool, но в данном случае можно обойтись без Java если запускать через docker:

По сути, нужно просто перед самой командой добавить docker run --rm -it taion809/kafka-cli:2.2.0, что на докерском значит запусти вот такой образ, выведи, то, что он показывает, на мой экран и удали образ, когда он закончит работу. Можно пойти еще дальшеи добавить алиас типа

Утилита кажется архаичной и ее консольность может отпугнуть неискушенных адептов графических интерфейсов, но, как и большинство консольных инструментов, при правильном использовании она дает более мощный результат. Как именно рассмотрим на примере следующего инструмента, тоже консольного.

Kafkacat

Это уже весьма могучая штука, которая позволяет читать и писать из Кафки, а также получать список топиков. Она тоже консольная, но при этом поудобнее в использовании, чем стандартные утилиты вроде kafka-console-consumer (и ее тоже можно запустить из докера).

Вот так можно сохранить 10 сообщений в файле messages (в формате JSON):

Файл можно будет использовать для анализа или чтобы воспроизвести какой-то сценарий.Безусловно, для решения этой задачи можно было бы и использовать родной консьюмер, но kafkacat позволяет выразить это короче.

Или вот пример посложнее (в смысле букв немного больше, но решение проще многих других альтернатив):

Код взят с блога одного из самых топовых популяризаторов Kafka экосистемы Robin Moffatt. В двух словах один инстанс kafkacat читает сообщения из топика Kafka, потом сообщения трансформируются в необходимый формат и скармливаются другому инстансу kafkacat, который пишет их в другой топик. Многим такое может показаться вырвиглазным непонятным брейнфаком, однако на самом деле у нас готовое решение, которое можно запустить в докере и оно будет работать. Реализация такого сценария на вот этих ваших дотнетах и джавах потребует больше букв однозначно.

Но это меня уже занесло немного не в ту степь. Статья же про поиск. Так вот, похожим способом можно организовать и поиск сообщений просто перенаправить вывод в какой-то grep и дело с концом.

Из возможностей для улучшений можно отметить тот факт, что kafkacat поддерживает Avro из коробки, а вот protobuf нет.

Kafka Connect + ELK

Все вышеперечисленные штуки работают и решают поставленные задачи, однако не всем удобны. При разборе инцидентов нужно именно поискать сообщения в разных топиках по определенному тексту это может быть определенный идентификатор или имя. Наши QA (а именно они в 90% случаев занимаются подобными расследованиями) наловчились пользоваться Kafka Tool, а некоторые и консольными утилитами. Но это все меркнет по сравнению с возможностями, которые дает Kibana, UI оболочка вокруг базы данных Elasticsearch. Kibana тоже используется нами QA для анализа логов. И не раз поднимался вопрос давайте логировать все сообщения, чтобы можно было искать в Kibana. Но оказалось, что есть способ намного проще, чем добавление вызова логера в каждый из наших сервисов, и имя ему Kafka Connect.

Kafka Connect это решение для интеграции Kafka с другими системами. В двух словах,оно (или он?) позволяет экспортировать из и импортировать данные в Kafka без написания кода. Все, что нужно это поднятый кластер Connect и конфиги наших хотелок в формате JSON. Слово кластер звучит дорого и сложно, но на самом деле это один или больше инстансов, которые можно поднять где угодно мы, например, запускаем их там же, где и обычные сервисы в Kubernetes.

Kafka Connect предоставляет REST API, c помощью которого можно управлять коннекторами, занимающимися перегонкой данных из и в Kafka. Коннекторы задаются конфигурациями, в случае Elasticsearch эта конфигурация может быть вот такой:

Если такой конфиг через HTTP PUT передать на сервер Connect, то, при определенном стечении обстоятельств, создастся коннектор с именем ElasticSinkConnector, который будет в три потока читать данные из топика и писать их в Elastic.

Выглядит всё крайне просто, но самое интересное, конечно же, в деталях. А деталей тут есть )

Большинство проблем связанно с данными. Обычно, как и в нашем случае, нужно работать с форматами данных, разработчики которого явно не думали, что когда-то эти данные будут попадать в Elasticsearch.

Для решения нюансов с данными есть трансформации. Это такие себе функции, которые можно применять к данным и мутировать их, подстраивая под требования получателя. При этом всегда есть возможность использовать любую Kafka клиент технологию для случаев, когда трансформации бессильны. Какие же сценарии мы решали с помощью трансформаций?

В нашем случае есть 4 трансформации. Вначале мы их перечисляем, а потом конфигурим. Трансформации применяются в порядке их перечисления, и это позволяет интересно их комбинировать.

Имена

Вначале добавляем возможность поиска по имени топика просто дополняем наши сообщения полем с нужной информацией.

Индексы

Elasticsearch работает с индексами, которые имеют свойство забиваться и нервировать девопсов. Нужна поддержка удобной для управления схемой индексирования. В нашем случае мы остановились на индексе на сервис / команду с ежедневной ротацией. Плюс решения хранение данные из разных топиков в одном индексе с возможностью контролировать его возраст.

Даты

Для того, чтобы в Kibana можно было искать по дате, необходимо задать поле, в котором эта дата содержится. Мы используем дату публикации сообщения в Kafka. Чтобы ее получить, мы вначале вставляем поле с датой сообщения, а потом конвертируем ее в UTC формат. Конвертация была добавлена, чтобы помочь Elasticsearch распознать в этом поле timestamp, однако в нашем случае это не всегда происходило, поэтому мы добавили index template, который явно говорил в этом поле дата:

В результате у нас сообщения практически мгновенно становятся доступными для анализа, и время, потраченное на разбор инцидентов, уменьшается.

Тут, конечно, стоит отметить, что данная инициатива сейчас у нас на этапе внедрения, поэтому нельзя сказать,что все массово ищут все что нужно в Kibana, но мы к этому уверенно идем.

Вообще, Kafka Connect применимо не только для таких задач. Его можно вполне использовать в тех случаях, где нужна интеграция с другими системами, Реально, например, реализовать полнотекстовый поиск в вашем приложении с помощью двух коннекторов. Один будет читать из операционной базы обновления и писать их в Kafka. А второй читать из Kafka и отсылать в Elasticsearch. Приложение делает поисковый запрос в Elasticsearch, получает id и по нему находит нужные данные в базе.

Заключение

Ну а наша публикация подходит к концу. Очень надеюсь, что вы узнали что-то новое для себя, а иначе зачем это все? Если что-то не получилось раскрыть, или вы категорически не согласны с чем-то, или может есть более удобные способы решения подобных проблем напишите про это в комментариях, обсудим )

Подробнее..

Перевод Как построить систему распознавания лиц с помощью Elasticsearch и Python

13.05.2021 18:23:09 | Автор: admin

Перевод материала подготовлен в рамках практического интенсива Централизованные системы логирования Elastic stack.


Пытались ли вы когда-нибудь искать объекты на изображениях? Elasticsearch может помочь вам хранить, анализировать и искать объекты на изображениях или видео.

В этом кратком руководстве мы покажем вам, как создать систему распознавания лиц с помощью Python. Узнайте больше о том, как обнаруживать и кодировать информацию о внешности - и находить совпадения в поиске.

Краткое описание основ

Вам нужно освежить информацию? Давайте вкратце рассмотрим несколько основных понятий.

Распознавание лиц

Распознавание лиц - это процесс идентификации человека по его лицу, например, для реализации механизма аутентификации (как разблокировка смартфона). Он фиксирует, анализирует и сравнивает паттерны, основанные на деталях лица человека. Этот процесс можно разделить на три этапа:

  • Обнаружение лица: Идентификация человеческих лиц на цифровых изображениях.

  • Кодирование данных о лице: Преобразование черт лица в цифровое представление

  • Сопоставление лиц: Поиск и сравнение черт лица

Мы рассмотрим каждый этап на нашем примере.

128-мерный вектор

Черты лица могут быть преобразованы в набор цифровой информации для хранения и анализа.

Векторный тип данных

Elasticsearch предлагает тип данных dense_vector для хранения плотных векторов плавающих значений. Максимальное количество элементов в векторе не должно превышать 2048, что вполне достаточно для хранения репрезентаций черт лица.

Теперь давайте реализуем все эти концепции.

Все, что вам нужно

Для обнаружения лиц и кодирования информации вам понадобится следующее:

Обратите внимание, что мы протестировали следующие инструкции на Ubuntu 20.04 LTS и Ubuntu 18.04 LTS. В зависимости от вашей операционной системы могут потребоваться некоторые изменения.

Установите Python и библиотеки Python

Ubuntu 20 и другие версии Debian Linux поставляются с установленным Python 3. Если у вас не такой пакет, загрузите и установите Python по этой ссылке.

Чтобы убедиться, что ваша версия является самой свежей, вы можете выполнить следующую команду:

sudo apt update sudo apt upgrade

Убедитесь, что версия Python - 3.x:

python3 -V

Установите pip3 для управления библиотеками Python:

sudo apt install -y python3-pip

Установите cmake, необходимый для работы библиотеки face_recognition:

pip3 install CMake

Добавьте папку cmake bin в каталог $PATH:

export PATH=$CMake_bin_folder:$PATH

В завершение установите следующие библиотеки, прежде чем приступить к написанию сценария нашей основной программы:

pip3 install dlib pip3 install numpy pip3 install face_recognition  pip3 install elasticsearch

Обнаружение и кодирование информации о лице из изображения

Используя библиотеку face_recognition, мы можем обнаружить лица на изображении и преобразовать черты лица в 128-мерный вектор.

Создайте файл getVectorFromPicture.py:

touch getVectorFromPicture.py

Дополните файл следующим сценарием:

import face_recognition import numpy as np import sys image = face_recognition.load_image_file("$PATH_TO_IMAGE") # detect the faces from the images  face_locations = face_recognition.face_locations(image) # encode the 128-dimension face encoding for each face in the image face_encodings = face_recognition.face_encodings(image, face_locations) # Display the 128-dimension for each face detected for face_encoding in face_encodings:       print("Face found ==>  ", face_encoding.tolist())

Давайте выполним getVectorFromPicture.py, чтобы получить репрезентацию черт лица для изображений основателей компании Elastic. В сценарии необходимо изменить переменную $PATH_TO_IMAGE, чтобы задать имя файла изображения.

Теперь мы можем сохранить представление черт лица в Elasticsearch.

Сначала создадим индекс с отображением, содержащим поле с типом dense_vector:

# Store the face 128-dimension in Elasticsearch ## Create the mapping curl -XPUT "http://localhost:9200/faces" -H 'Content-Type: application/json' -d' {   "mappings" : {       "properties" : {         "face_name" : {           "type" : "keyword"         },         "face_encoding" : {           "type" : "dense_vector",           "dims" : 128         }       }     } }'

Нам необходимо создать один документ для каждой репрезентации лица, это можно сделать с помощью Index API:

## Index the face feature representationcurl -XPOST "http://localhost:9200/faces/_doc" -H 'Content-Type: application/json' -d'{  "face_name": "name",  "face_encoding": [     -0.14664565,     0.07806452,     0.03944433,     ...     ...     ...     -0.03167224,     -0.13942884  ]}'

Сопоставление лиц

Допустим, мы проиндексировали четыре документа в Elasticsearch, которые содержат отдельные изображения лиц основателей Elastic. Теперь мы можем использовать другое изображение наших основателей для сопоставления отдельных изображений.

Создайте файл recognizeFaces.py:

touch recognizeFaces.py

В этом сценарии мы извлечем векторы для каждого лица, обнаруженного на входном изображении, и используем эти векторы при создании запроса для отправки в Elasticsearch:

Импортируйте библиотеки:

import face_recognition import numpy as np from elasticsearch import Elasticsearch import sys

Добавьте следующий раздел для подключения к Elasticsearch:

# Connect to Elasticsearch cluster from elasticsearch import Elasticsearch es = Elasticsearch( cloud_id="cluster-1:dXMa5Fx...",      http_auth=("elastic", "<password>"), )

Мы будем использовать функцию cosineSimilarity для вычисления степени косинусного сходства между заданными векторами запросов и векторами документов, хранящихся в Elasticsearch.

i=0 for face_encoding in face_encodings:         i += 1         print("Face",i)         response = es.search(         index="faces",         body={          "size": 1,          "_source": "face_name",         "query": {         "script_score": {                  "query" : {                      "match_all": {}                  },          "script": {                 "source": "cosineSimilarity(params.query_vector, 'face_encoding')",                  "params": {                  "query_vector":face_encoding.tolist()                 }            }           }          }         }         )

Предположим, что значение меньше 0,93 считается неизвестным лицом:

for hit in response['hits']['hits']:                 #double score=float(hit['_score'])                 if (float(hit['_score']) > 0.93):                     print("==> This face  match with ", hit['_source']['face_name'], ",the score is" ,hit['_score'])                 else:                         print("==> Unknown face")

Давайте выполним наш скрипт:

Скрипт смог обнаружить все лица с совпадением результатов более 0,93.

Сделайте еще один шаг вперед с расширенным поиском

Распознавание лиц и поиск можно объединить для расширенных сценариев использования. Вы можете использовать Elasticsearch для создания более сложных запросов, таких как geo-queries, query-dsl-bool-query и search-aggregations.

Например, следующий запрос применяет поиск cosineSimilarity к определенному местоположению в радиусе 200 км:

GET /_search {   "query": {     "script_score": {       "query": {     "bool": {       "must": {         "match_all": {}       },       "filter": {         "geo_distance": {           "distance": "200km",           "pin.location": {             "lat": 40,             "lon": -70           }         }       }     }   },        "script": {                 "source": "cosineSimilarity(params.query_vector, 'face_encoding')",                  "params": {                  "query_vector":[                         -0.14664565,                       0.07806452,                       0.03944433,                       ...                       ...                       ...                       -0.03167224,                       -0.13942884                    ]                 }            }     }   } }

Комбинирование cosineSimilarity с другими запросами Elasticsearch дает вам неограниченные возможности для реализации более сложных сценариев использования.

Заключение

Распознавание лиц может быть полезно во многих случаях, и, возможно, вы уже используете его в своей повседневной жизни. Описанные выше концепции могут быть обобщены для любого обнаружения объектов на изображениях или видео, поэтому вы можете дополнить свой сценарий использования до очень широкого применения.

Elasticsearch может помочь в решении сложных задач. Попробуйте это с помощью бесплатной 14-дневной пробной версии Elastic Cloud, нашего официального управляемого предложения Elasticsearch, и сообщите о своем мнении на наших форумах для обсуждения.


Узнать больше об экспресс-курсе Централизованные системы логирования Elastic stack.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru