В предыдущей статье была описана процедура установки Elasticsearch и настройка кластера. В этой статье будет рассмотрена процедура установки Kibana и Logstash, а также их настройка для работы с кластером Elasticsearch.
Импортируем PGP ключ:
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
Устанавливаем apt-transport-https
пакет:
sudo apt-get install apt-transport-https
Перед установкой пакета необходимо добавить репозиторий Elastic:
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
Устанавливаем Kibana:
sudo apt-get update && sudo apt-get install kibana
Настраиваем Kibana для автоматического запуска при старте системы:
sudo /bin/systemctl daemon-reload && sudo /bin/systemctl enable kibana.service
Так же возможен вариант установки из скаченного
Deb
пакет с помощьюdpkg
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.10.2-amd64.debsudo dpkg -i kibana-7.10.2-amd64.deb
Импортируем PGP ключ
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
В директории /etc/yum.repos.d/
создаем файл
репозитория kibana.repo
для CentOS или Red
Hat. Для дистрибутива OpenSUSE в директории
/etc/zypp/repos.d/
:
[kibana-7.x]name=Kibana repository for 7.x packagesbaseurl=https://artifacts.elastic.co/packages/7.x/yumgpgcheck=1gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearchenabled=1autorefresh=1type=rpm-md
Устанавливаем Kibana c помощью пакетного менеджера в
зависимости от операционной
системы,yum
илиdnf
дляCentOS,Red
Hat,Fedoraилиzypper
дляOpenSUSE:
# Yumsudo yum install kibana # Dnfsudo dnf install kibana # Zyppersudo zypper install kibana
Настраиваем Kibana для автоматического запуска при старте системы:
sudo /bin/systemctl daemon-reload && sudo /bin/systemctl enable kibana.service
Так же возможен вариант установки из скаченного
RPM
пакет с помощьюrpm
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.10.2-x86_64.rpmsudo rpm --install kibana-7.10.2-x86_64.rpm
Скачиваем архив c Kibana:
curl -O https://artifacts.elastic.co/downloads/kibana/kibana-7.10.2-linux-x86_64.tar.gz
Извлекаем данные и переходим в директорию с Kibana:
tar -xzf kibana-7.10.2-linux-x86_64.tar.gzcd kibana-7.10.2-linux-x86_64/
Текущий каталог считается как $KIBANA_HOME
.
Конфигурационные файлы находятся в
каталоге$KIBANA_HOME/config/
.
Для запускаKibanaможно создать отдельного пользователя предоставив все необходимые права к каталогу сKibana.
Для настройки Kibana используется YAML файл,
который лежит по следующему пути
/etc/kibana/kibana.yml
при установке из
Deb
и RPM
пакетов или
$KIBANA_HOME/config/kibana.yml
при установке из
архива.
Определяем адрес и порт, на которых будет работать Kibana
(по умолчанию localhost:5601
):
server.host: 10.0.3.1server.port: 5601
Указываем узлы кластера Elasticsearch:
elasticsearch.hosts: - http://10.0.3.11:9200 - http://10.0.3.12:9200 - http://10.0.3.13:9200
В случае недоступности узла, с которым Kibana установила соединение, произойдет переключение на другой узел кластера, указанный в конфигурационном файле.
Указываем, где Kibana будет хранить свои логи (по
умолчанию stdout
):
logging.dest: /var/log/kibana/kibana.log
Необходимо предоставить доступ на запись к данному каталогу, чтобы Kibana писала логи. В случае установки из пакетов
Dep
илиRPM
доступ предоставляется пользователю или группеkibana
, а для установки из архива - пользователю, который осуществляет запуск Kibana.
Настраиваем частоту опроса Elasticsearch для получения обновлённого списка узлов кластера:
elasticsearch.sniffInterval: 600000
Определяем, запрашивать ли обновленный список узлов кластера Elasticsearch в случае сбоя соединения с кластером:
elasticsearch.sniffOnConnectionFault: true
На выходе получаем конфигурационный файл:
# Адрес и портserver.host: 10.0.3.1server.port: 5601# Узлы кластера Elasticsearchelasticsearch.hosts: - http://10.0.3.11:9200 - http://10.0.3.12:9200 - http://10.0.3.13:9200# Частота запросов обновления узлов кластера# Запрос обновлений в случае сбоя подключенияelasticsearch.sniffInterval: 60000elasticsearch.sniffOnConnectionFault: true# Логи Kibanalogging.dest: /var/log/kibana/kibana.log
По умолчанию Kibana имеет ограничение на использование памяти в размере 1,4 Гб. Для изменения этого значения необходимо в файле
node.options
указать новые значения для параметра--max-old-space-size
. Значения указываются в мегабайтах.Данный файл находится в каталоге
/etc/kibana/
при установке изDeb
иRPM
пакетов или$KIBANA_HOME/config/
- при установке из архива
Запускаем службу kibana
:
sudo systemctl start kibana.service
Для установки из архива используем:
$KIBANA_HOME/bin/kibana
Для выключения службы запущенной из архива используйте
Ctrl-C
.
В браузере набираем IP адрес и порт (в примере выше это
http://10.0.3.1:5601
), который указывали в
конфигурации Kibana. В результате должна открыться
приветственная страница.
Для организации балансировки между Kibana и узлами
Elastcisearch в Elastcisearch имеется встроенный
механизм. На хост c установленной Kibana, ставится
Elasticsearch с ролью Coordinating only
. Узел
Elasticsearch с данной ролью обрабатывает HTTP
запросы и распределяет их между узлами кластера.
Устанавливаем Elasticseach на узел с Kibana. Как это сделать описано в предыдущей статье.
Настраиваем новому узлу роль Coordinating only
. Для
этого для ролей master
, data
и
ingest
указываем параметр false
:
node.master: falsenode.data: falsenode.ingest: false
ingest
роль позволяет построить конвейер дополнительной обработки данных до их индексирования. Выделения отдельных узлов с этой ролью снижает нагрузку на другие узлы. Узел с рольюmaster
и/илиdata
имеют эту роль по умолчанию.
Указываем имя Elasticsearch кластера:
cluster.name: es_cluster # Имя кластера
Указываем адреса узлов текущего кластера. Для этого создадим
файл unicast_hosts.txt
с перечнем узлов в директории с
конфигурационными файлами Elasticsearch. Для установки из
Deb
и RPM
пакетов это
/etc/elasticsearch/
или $ES_HOME/config
/
при установке из архива:
# Список узлов кластера10.0.3.1110.0.3.1210.0.3.13
Указываем, что адреса узлов кластера нужно брать из файла:
discovery.seed_providers: file
Данный метод имеет преимущество над методом из предыдущей статьи (
discovery.seed_hosts
). Elasticsearch следит за изменением файла и применяет настройки автоматически без перезагрузки узла.
Настраиваем IP адрес и порт для приема запросов от Kibana
(network.host
и http.port
) и для
коммуникации с другими узлами кластера
Elasticsearch(transport.host
и
transport.tcp.port
). По умолчанию параметр
transport.host
равен network.host
:
network.host: 10.0.3.1 # Адрес узлаhttp.port: 9200 # Портtransport.host: 10.0.3.1 # Адрес для связи с кластеромtransport.tcp.port: 9300-9400 # Порты для коммуникации внутри кластера
Итоговый конфигурационный файл:
# ------------------------------------ Node ------------------------------------# Имя узлаnode.name: es-nlb01# Указываем роль Coordinating onlynode.master: falsenode.data: falsenode.ingest: false## ---------------------------------- Cluster -----------------------------------#cluster.name: es_cluster # Имя кластера## --------------------------------- Discovery ----------------------------------discovery.seed_providers: file ## ---------------------------------- Network -----------------------------------#network.host: 10.0.3.1 # Адрес узлаhttp.port: 9200 # Портtransport.host: 10.0.3.1 # Адрес для связи с кластеромtransport.tcp.port: 9300-9400 # Порты для коммуникации внутри кластера## ----------------------------------- Paths ------------------------------------#path.data: /var/lib/elasticsearch # Директория с даннымиpath.logs: /var/log/elasticsearch # Директория с логами
Запускаем Elasticsearch и проверяем, что узел присоединился к кластеру:
curl -X GET "http://10.0.3.1:9200/_cluster/health?pretty"{ "cluster_name" : "es_cluster", "status" : "green", "timed_out" : false, "number_of_nodes" : 4, "number_of_data_nodes" : 3, "active_primary_shards" : 6, "active_shards" : 12, "relocating_shards" : 0, "initializing_shards" : 0, "unassigned_shards" : 0, "delayed_unassigned_shards" : 0, "number_of_pending_tasks" : 0, "number_of_in_flight_fetch" : 0, "task_max_waiting_in_queue_millis" : 0, "active_shards_percent_as_number" : 100.0}
Теперь в кластере 4 узла, из них 3 с ролью
data
.
В конфигурации Kibana указываем адрес Coordinating
only
узла:
elasticsearch.hosts: - http://10.0.3.1:9200
Перезапускаем Kibana и проверяем, что служба запустилась, а UI Kibana открывается в браузере.
Чтобы организовать работу нескольких экземпляров Kibana, размещаем их за балансировщиком нагрузки. Для каждого экземпляра Kibana необходимо:
Настроить уникальное имя:
server.name
уникальное имя экземпляра. По умолчанию
имя узла.
В документации к Kibana указана необходимость настройки
server.uuid
, однако, в списке параметров конфигурации данный параметр отсутствует. На практикеuuid
генерируется автоматически и хранится в директории, в которой Kibana хранит свои данные. Данная директория определяется параметромpath.data
. По умолчанию для установки изDeb
иRPM
пакетов это/var/lib/kibana/
или$KIBANA_HOME/data/
для установки из архива.
Для каждого экземпляра в рамках одного узла необходимо указать уникальные настройки:
logging.dest
директория для хранения логов;
path.data
директория для хранения данных;
pid.file
файл для записи ID процесса;
server.port
порт, который будет использовать данный
экземпляр Kibana ;
Указать следующие параметры одинаковыми для каждого экземпляра:
xpack.security.encryptionKey
произвольный ключ для
шифрования сессии. Длина не менее 32 символов.
xpack.reporting.encryptionKey
произвольный ключ для
шифрования отчетов. Длина не менее 32 символов.
xpack.encryptedSavedObjects.encryptionKey
ключ для
шифрования данных до отправки их в Elasticsearch. Длина не
менее 32 символов.
xpack.encryptedSavedObjects.keyRotation.decryptionOnlyKeys
список ранее использованных ключей. Позволит расшифровать ранее
сохраненные данные.
Настройки безопасности(
xpack
) ELK будут рассмотрены в отдельной статье.
Чтобы запустить несколько экземпляров Kibana на одном
узле, необходимо указать путь к файлу конфигурации каждого из них.
Для этого используется ключ -c
:
bin/kibana -c <путь_к_файлу_конфигурации_#1>bin/kibana -c <путь_к_файлу_конфигурации_#2>
Импортируем GPG ключ:
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
Устанавливаем apt-transport-https
пакет:
sudo apt-get install apt-transport-https
Добавляем репозиторий Elastic:
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
Устанавливаем Logstash:
sudo apt-get update && sudo apt-get install logstash
Настраиваем Logstash для автоматического запуска при старте системы:
sudo /bin/systemctl daemon-reload && sudo /bin/systemctl enable logstash.service
Импортируем PGP ключ
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
В директории /etc/yum.repos.d/
создаем файл
репозитория logstash.repo
для CentOS или Red
Hat. Для дистрибутива OpenSUSE - в директории
/etc/zypp/repos.d/
[logstash-7.x]name=Elastic repository for 7.x packagesbaseurl=https://artifacts.elastic.co/packages/7.x/yumgpgcheck=1gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearchenabled=1autorefresh=1type=rpm-md
Устанавливаем Logstash c помощью пакетного менеджера в
зависимости от операционной
системы,yum
илиdnf
дляCentOS
,Red
Hat
,Fedora
илиzypper
дляOpenSUSE
:
# Yumsudo yum install logstash # Dnfsudo dnf install logstash # Zyppersudo zypper install logstash
Настраиваем Logstash для автоматического запуска при старте системы:
sudo /bin/systemctl daemon-reload && sudo /bin/systemctl enable logstash.service
Скачиваем архив с Logstash:
curl -O https://artifacts.elastic.co/downloads/logstash/logstash-7.10.2-linux-x86_64.tar.gz
Извлекаем данные и переходим в директорию с Logstash:
tar -xzf logstash-7.10.2-linux-x86_64.tar.gzcd logstash-7.10.2/
Текущий каталог считается как $LOGSTASH_HOME
.
Конфигурационные файлы находятся в
каталоге$LOGSTASH_HOME/config/
.
Помимо архива сайта можно скачать
Deb
илиRPM
пакет и установить с помощьюdpkg
илиrpm
.
В качестве примера настроим считывание собственных логов
Logstash из директории /var/log/logstash/
. Для
этого необходимо настроить конвейер (pipeline
).
Logstash имеет два типа файлов конфигурации. Первый тип описывает запуск и работу Logstash (
settings files
).Второй тип отвечает за конфигурацию конвейера (
pipeline
) обработки данных. Этот файл состоит из трех секций:input
,filter
иoutput
.
Чтобы описать конвейер создаем файл logstash.conf
в
директории /etc/logstash/conf.d/
, если установка была
из Deb
и RPM
, или в директории
$LOGSTASH_HOME/conf.d/
для установки из архива,
предварительно создав эту директорию.
Для установки из архива необходимо в конфигурационном файле
$LOGSTASH_HOME/config/pipelines.yml
указать путь до
директории с настройками конвейера:
- pipeline.id: main path.config: "./conf.d/*.conf"
Для установки из архива также требуется указать, где хранить
логи в файле конфигурации
$LOGSTASH_HOME/config/logstash.yml
:
path.logs: /var/log/logstash/# логи Logstash
Не забудьте предоставить доступ на запись к данному каталогу пользователю, который осуществляет запуск Logstash.
В файле logstash.conf
настроим плагин
file
в секции input
для чтения файлов.
Указываем путь к файлам через параметр path
, а через
параметр start_position
указываем, что файл необходимо
читать с начала:
input { file { path => ["/var/log/logstash/*.log"] start_position => "beginning" }}
Каждое событие журнала логов будет представлять собой
JSON документ и будет записано целиком в поле с ключом
message
. Чтобы из каждого события извлечь дату, время,
источник и другую информацию, необходимо использовать секцию
filter
.
В секции filter
с помощью плагина grok
и встроенных шаблонов извлекаем из каждой записи
(message
) журнала логов необходимую информацию:
filter { grok { match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:severity}%{SPACE}\]\[%{DATA:source}%{SPACE}\]%{SPACE}%{GREEDYDATA:message}" } overwrite => [ "message" ] }}
Шаблон имеет формат
%{SYNTAX:SEMANTIC}
и по сути это оболочка над
регулярным выражением, соответственно шаблон можно описать регулярным выражением. С
перечнем всех встроенных шаблоном можно ознакомится тут.
SEMANTIC
- это ключ поля с данными, которые
извлекаются с помощью шаблона, и под этим ключом будут храниться в
JSON документе.
С помощью параметра match
указываем, из какого поля
(message
) извлекаем данные по описанным шаблонам.
Параметр overwrite
сообщает, что оригинальное поле
message
необходимо перезаписать в соответствии с теми
данными, которые мы получили с помощью шаблона
%{GREEDYDATA:message}
.
Для сохранения данных в Elasticsearch настраиваем плагин
elasticsearch
в секции output
. Указываем
адреса узлов кластера Elasticseach и имя индекса.
Индекс (
index
) - оптимизированный набор JSON документов, где каждый документ представляет собой набор полей ключ - значение. Каждый индекс сопоставляется с одним или более главным шардом (primary shard
) и может иметь реплики главного шарда (replica shard
).Что такое индекс, шард и репликация шардов рассмотрю в отдельной статье.
output { elasticsearch { hosts => ["http://10.0.3.11:9200","http://10.0.3.12:9200","http://10.0.3.13:9200"] index => "logstash-logs-%{+YYYY.MM}" }}
К названию индекса добавлен шаблон %{+YYYY.MM}
,
описывающий год и месяц. Данный шаблон позволит создавать новый
индекс каждый месяц.
Итоговый файл:
# Читаем файлinput { file { path => ["/var/log/logstash/*.log"] start_position => "beginning" }}# Извлекаем данные из событийfilter { grok { match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:severity}%{SPACE}\]\[%{DATA:source}%{SPACE}\]%{SPACE}%{GREEDYDATA:message}" } overwrite => [ "message" ] }}# Сохраняем все в Elasticsearchoutput { elasticsearch { hosts => ["http://10.0.3.11:9200","http://10.0.3.12:9200","http://10.0.3.13:9200"] index => "logstash-logs-%{+YYYY.MM}" }}
Запускаем Logstash:
sudo systemctl start logstash.service
Для установки из архива:
$LOGSTASH_HOME/bin/logstash
Открываем Kibana, в верхнем левом углу нажимаем меню и в
секции Management
выбираем Stack
Management
. Далее слева выбираем Index patterns
и нажимаем кнопку Create Index Patern
. В поле
Index pattern name
описываем шаблон
logstash*
, в который попадут все индексы,
начинающиеся с logstash.
Жмем Next step
и выбираем Time field
поле timestamp
, чтобы иметь возможность фильтровать
данные по дате и времени. После жмем Create index
pattern
:
Logstash при анализе событий добавил поле
@timestamp
, в результате получилось 2 поля с датой и временем. Это поле содержит дату и время обработки события, следовательно, эти даты могут различаться. Например, если анализируемый файл имеет старые записи, то полеtimestamp
будет содержать данные из записей, а@timestamp
текущее время, когда каждое событие были обработано.
После создания шаблона индексов Kibana покажет информацию об имеющихся полях, типе данных и возможности делать агрегацию по этим полям.
Чтобы посмотреть полученные данные на основе созданного шаблона
нажимаем меню и в секции Kiban
выбираем
Discover
.
В правой части экрана можно выбрать интервал в рамках которого отображать данные.
Выбор временного интервалаВ левой часте экрана можно выбрать шаблон индекса или поля для
отображения из списка Available fields
. При нажатии на
доступные поля можно получить топ-5 значений.
Для фильтрации данных можно использовать Kibana Query
Language (KQL)
. Запрос пишется в поле Search
.
Запросы можно сохранять, чтобы использовать их в будущем.
Для визуализации полученных данных нажимаем меню и в секции
Kiban
выбираем Visualize
. Нажав
Create new visualization
, откроется окно с перечнем
доступных типов визуализации.
Для примера выбираем Pie
, чтобы построить круговую
диаграмму. В качестве источника данных выбираем шаблон индексов
logstash*
. В правой части в секции
Buckets
жмем Add
, далее - Split
slices
. Тип агрегации выбираем Terms
, поле
severity.keyword
. Жмем Update
в правом
нижнем углу и получаем готовую диаграмму. В секции
Options
можно добавить отображение данных или изменить
вид диаграммы.
Круговая диаграммаЕсли вместо графика отобразилась надпись
No results found
, проверьте выбранный интервал времени.
Чтобы посмотреть данные в Elasticsearch необходимо
сделать GET
запрос /имя_индекса/_search
к
любому узлу кластера. Добавление параметра pretty
позволяет отобразить данные в читабельном виде. По умолчанию вывод
состоит из 10 записей, чтобы увеличить это количество необходимо
использовать параметр size
:
curl -X GET "http://10.0.3.1:9200/logstash-logs-2021.01/_search?pretty&size=100"...{ "_index" : "logstash-logs-2021.01", "_type" : "_doc", "_id" : "XlXeQncBKsFiW7wX45A0", "_score" : 1.0, "_source" : { "path" : "/var/log/logstash/logstash-plain.log", "@version" : "1", "source" : "logstash.outputs.elasticsearch", "message" : "[main] Restored connection to ES instance {:url=>\"http://10.0.3.11:9200/\"}", "host" : "logstash01", "timestamp" : "2021-01-27T08:03:55,979", "@timestamp" : "2021-01-27T08:03:58.907Z", "severity" : "WARN" } },...
В рамках этой статьи была рассмотрена процедура установки и
настройки Kibana и Logstash, настройка балансировки
трафика между Kibana и Elasticsearch и работа
нескольких экземпляров Kibana. Собрали первые данные с
помощью Logstash, посмотрели на данные с помощью
Kibana Discover
и построили первую визуализацию.
Прежде чем углубляться в изучение плагинов Logstash, сбор
данных с помощью Beats, визуализацию и анализ данных в
Kibana, необходимо уделить внимание очень важному вопросу
безопасности кластера. Об этом также постоянно намекает
Kibana, выдавая сообщение Your data is not
secure
.
Теме безопасности будет посвящена следующая статья данного цикла.
В первой и второй частях данной серии статей была описана процедура установки и настройки кластера Elasticsearch, Kibana и Logstash, но не был освящен вопрос безопасности.
В этой статье я расскажу, как настроить шифрование трафика между узлами кластера Elasticsearch и его клиентам, шифрование трафика между Kibana и клиентами, покажу, как создавать роли и пользователей, а так же выдавать API ключи для доступа к кластеру Elasticsearch.
Чтобы использовать функции безопасности в Elasticsearch, их необходимо активировать. Для этого на каждом узле в файле конфигурации указывается следующая настройка:
xpack.security.enabled: true
Следующим шагом необходимо настроить шифрование трафика между узлами Elasticsearch. Для этого выполняем несколько шагов:
Создаем CA (Certificate Authority) для кластера Elasticsearch:
./bin/elasticsearch-certutil ca
Для установки из пакетов
Deb
иRPM
исполняемые файлы Elasticsearch лежат в каталоге/usr/share/elasticsearch/bin/
. Для установки из архива - в каталоге$ES_HOME/bin/
.В примерах все команды выполнены из каталога
/usr/share/elasticsearch/
.
Во время генерации корневого сертификата можно задать имя
PKCS#12 файла, по умолчанию это
elastic-stack-ca.p12
и пароль к нему.
Для получения сертификата и ключа в PEM формате укажите
ключ --pem
. На выходе будет ZIP архив с
.crt
и .key
файлами.
С помощью ключа --out
можно указать каталог для
создаваемого файла.
Полученный корневой сертификат (для PEM формата еще и ключ) надо переместить на все узлы кластера для генерации сертификата узла.
Генерируем на каждом узле кластера сертификат и ключ:
./bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12 --ip 10.0.3.11 --dns es-node01
Ключ --ca
указывает путь к корневому сертификату
CA в формате PKCS#12. Если сертификат и ключ были
получены в PEM формате, то необходимо использовать ключи
--ca-cert
и --ca-key
соответственно.
Ключи --dns
и --ip
добавляют проверку
по имени узла и IP адресу и являются опциональными. Если вы
указываете их, то укажите эти параметры для каждого узла.
Ключ
--pass
позволяет установитьpassphrase
для ключа.
Включаем TLS в файле конфигурации Elasticsearch:
xpack.security.transport.ssl.enabled: truexpack.security.transport.ssl.verification_mode: fullxpack.security.transport.ssl.keystore.path: es-node01-cert.p12xpack.security.transport.ssl.truststore.path: elastic-stack-ca.p12
Где:
xpack.security.transport.ssl.enabled
- включаем
TLS/SSL
xpack.security.transport.ssl.verification_mode
-
режим проверки сертификатов. none
- проверка не
выполняется, certificate
- выполняется проверка
сертификата без проверки имени узла и IP адреса,
full
- проверка сертификата, а также имени узла и
адреса указанных в сертификате.
xpack.security.transport.ssl.keystore.path
- путь к
файлу с сертификатом и ключем узла.
xpack.security.transport.ssl.truststore.path
- путь
к доверенному сертификату (CA).
Если сертификаты CA и/или узла в формате PEM, то необходимо изменить последние два параметра на следующие:
xpack.security.transport.ssl.key
- путь к закрытому ключу узла
xpack.security.transport.ssl.certificate
- путь к сертификату узла
xpack.security.transport.ssl.certificate_authorities
- список путей к сертифкатам CA.
Cоздаем keystore и добавляем пароли от сертификатов(если они были заданы):
./bin/elasticsearch-keystore create -p
Ключ -p требуется для установки пароля keystone во время создания
Для PKCS#12 формата:
./bin/elasticsearch-keystore add xpack.security.transport.ssl.keystore.secure_password./bin/elasticsearch-keystore add xpack.security.transport.ssl.truststore.secure_password
Для PEM сертификата:
./bin/elasticsearch-keystore add xpack.security.transport.ssl.securekeypassphrase
Чтобы запустить Elasticsearch с keystore, на
который установлен пароль, необходимо передать этот пароль
Elasticsearch. Это делается с помощью файла на который будет
ссылаться переменная ES_KEYSTORE_PASSPHRASE_FILE
.
После запуска файл можно удалить, но при каждом последующим запуске
файл необходимо создавать.
echo "password" > /etc/elasticsearch/ks_secret.tmpchmod 600 /etc/elasticsearch/ks_secret.tmpsudo systemctl set-environment ES_KEYSTORE_PASSPHRASE_FILE=/etc/elasticsearch/ks_secret.tmp
Перезапускаем Elasticsearch
В логах Elasticsearch должны появится записи о создании кластера:
[INFO ][o.e.c.s.ClusterApplierService] [es-node01] master node changed {previous [], current [{es-node02}{L1KdSSwCT9uBFhq0QlxpGw}{ujCcXRmOSn-EbqioSeDNXA}{10.0.3.12}{10.0.3.12:9300}...
Если обратиться к API, то будет ошибка "missing authentication credentials for REST request". С момента включения функций безопасности для обращения к кластер необходимо пройти аутентификацию.
Elasticsearch имеет несколько встроенных пользователей:
Пользователь |
Описание |
elastic |
Суперпользователь |
kibana_system |
Используется для коммуникации между Kibana и Elasticsearch |
logstash_system |
Пользователь, которого использует Logstash сервер, когда сохраняет информацию в Elasticsearch |
beats_system |
Пользователь, которого использует агент Beats, когда сохраняет информацию в Elasticsearch |
apm_system |
Пользователь, которого использует APM сервер, когда сохраняет информацию в Elasticsearch |
remote_monitoring_user |
Пользователь Metricbeat, который используется при сборе и хранении информации мониторинга в Elasticsearch |
Прежде чем воспользоваться перечисленными пользователями,
необходимо установить для них пароль. Для этого используется
утилита elasticsearch-setup-passwords
. В режиме
interactive
для каждого пользователя необходимо ввести
пароли самостоятельно, в режиме auto
Elasticsearch создаст пароли автоматически:
./bin/elasticsearch-setup-passwords autoInitiating the setup of passwords for reserved users elastic,apm_system,kibana,kibana_system,logstash_system,beats_system,remote_monitoring_user.The passwords will be randomly generated and printed to the console.Please confirm that you would like to continue [y/N]yChanged password for user apm_systemPASSWORD apm_system = NtvuRYhwbKpIEVUmHsZBChanged password for user kibana_systemPASSWORD kibana_system = ycXvzXglaLnrFMdAFsvyChanged password for user kibanaPASSWORD kibana = ycXvzXglaLnrFMdAFsvyChanged password for user logstash_systemPASSWORD logstash_system = vU3CuRbjBpax1RrsCCLFChanged password for user beats_systemPASSWORD beats_system = c9GQ85qhNL59H2AXUvcAChanged password for user remote_monitoring_userPASSWORD remote_monitoring_user = wB320seihljmGsjc29W5Changed password for user elasticPASSWORD elastic = iOrMTBbfHOAkm5CPeOj7
Второй раз
elasticsearch-setup-passwords
запустить не получится, так как bootstrap password изменился. Чтобы изменить пароль пользователям можно воспользоваться API.
Попробуем сделатьAPI запрос к любому узлу
Elasticsearch с использованием учетной записи
elastic
и пароля к от неё:
curl -u 'elastic' -X GET "http://10.0.3.1:9200/_cluster/health?pretty"Enter host password for user 'elastic':{ "cluster_name" : "es_cluster", "status" : "green", "timed_out" : false, "number_of_nodes" : 4, "number_of_data_nodes" : 3, "active_primary_shards" : 9, "active_shards" : 18, "relocating_shards" : 0, "initializing_shards" : 0, "unassigned_shards" : 0, "delayed_unassigned_shards" : 0, "number_of_pending_tasks" : 0, "number_of_in_flight_fetch" : 0, "task_max_waiting_in_queue_millis" : 0, "active_shards_percent_as_number" : 100.0}
Как видно из примера все работает, но мы обращались через
http
, значит трафик между клиентом и кластером
Elasticsearch не шифрованный.
Для шифрования клиентского трафика необходимо выпустить сертификат, который будет использоваться для шифрования трафика между узлом Elasticsearch и клиентом. При этом для Kibana будет сформирован отдельный сертификат, чтобы настроить соединение с Elasticsearch. Для этого делаем следующее:
Генерируем сертификат:
./bin/elasticsearch-certutil http
В процессе генерации необходимо определить:
1) Необходимо ли сгенерировать Certificate Signing Request (CSR). Потребуется, если сертификат будет выпускаться сторонним CA (Certificate Authority).
2) Использовать ли собственный CA (Certificate Authority). Если да, то указываем путь до ключа, которым будет подписаны будущие сертификаты.
3) Срок действия сертификата. По умолчанию 5 лет. Можно определить дни(D), месяцы (M), года (Y).
4) Как выпустить сертификат, на каждый узел или общий. Если все
узлы имеют общий домен, то можно выпустить wildcard
сертификат (например *.domain.local). Если общего домена
нет и в будущем возможно добавление узлов, то необходимо
генерировать на каждый узел, указав имя узла и его адрес. В будущем
можно выпустить отдельный сертификат для нового узла.
На выходе получаем архив сертификатами, в моём случае со всеми сертификатам для каждого узла Elasticsearch, а так же сертификат для подключения Kibana к Elasticsearch:
. elasticsearch es-nlb01 README.txt http.p12 sample-elasticsearch.yml es-node01 README.txt http.p12 sample-elasticsearch.yml es-node02 README.txt http.p12 sample-elasticsearch.yml es-node03 README.txt http.p12 sample-elasticsearch.yml kibana README.txt elasticsearch-ca.pem sample-kibana.yml
В архиве также лежит инструкция по дальнейшим действиям с сертификатом (README.txt) и пример конфигурационного файла (sample-...yml).
Сертификат
elasticsearch-ca.pem
из директорииkibana
потребуется в следующем шаге.
Размещаем сертификаты на узлах и добавляем необходимые настройки в файле конфигурации:
xpack.security.http.ssl.enabled: truexpack.security.http.ssl.keystore.path: "http.p12"
Добавляем пароль от сгенерированного сертификата в keystore:
./bin/elasticsearch-keystore add xpack.security.http.ssl.keystore.secure_password
Проверяем, что все работает по https
:
curl -u 'elastic' -k -X GET "https://10.0.3.1:9200/_cluster/health?pretty"Enter host password for user 'elastic':{ "cluster_name" : "es_cluster", "status" : "green", "timed_out" : false, "number_of_nodes" : 4, "number_of_data_nodes" : 3, "active_primary_shards" : 9, "active_shards" : 18, "relocating_shards" : 0, "initializing_shards" : 0, "unassigned_shards" : 0, "delayed_unassigned_shards" : 0, "number_of_pending_tasks" : 0, "number_of_in_flight_fetch" : 0, "task_max_waiting_in_queue_millis" : 0, "active_shards_percent_as_number" : 100.0}
Если сейчас обратиться к Kibana, то ответом будет "Kibana server is not ready yet".
Это связанно с тем, что сейчас Kibana не может получить
доступ к Elasticsearch по http
, для этого необходим
сертификат Kibana, который был получен в предыдущем шаге, а
также требуется пройти аутентификацию. Чтобы настроить работу
Kibana:
В конфигурационном файле Kibana указываем ключ к
сертификату elasticsearch-ca.pem
(из предыдущего шага)
и меняем протокол http
на https
:
elasticsearch.hosts: ['https://10.0.3.1:9200']elasticsearch.ssl.certificateAuthorities: [ "/etc/kibana/elasticsearch-ca.pem" ]
В примере выше я использую адрес
Coordinating only
узла для балансировки нагрузки между Kibana и Elasticsearch, который был настроен в предыдущей части.
Создаем keystore для хранения пользователя и пароля:
sudo ./bin/kibana-keystore create --allow-root
Команда выполняются из директории
/usr/share/kibana
Так как keystore будет создан в каталоге
/etc/kibana/
, и keystone требуется файл конфигурации/etc/kibana/kibana.yml
, то пользователю, запускающему команду, необходим доступ к этому каталогу и файлу. Я используюsudo
и ключ--allow-root
. Ключ необходим, так как запускать из под пользователяroot
нельзя.Kibana keystore не имеет пароля. Для ограничения доступа используем стандартные средства Linux.
Добавляем пользователя kibana_system(встроенный пользователь Elasticsearch) и пароль учетной записи в Kibana keystore:
sudo ./bin/kibana-keystore add elasticsearch.username --allow-rootsudo ./bin/kibana-keystore add elasticsearch.password --allow-root
Можно использовать так же пользователя kibana, однако, он в системе считается устаревшим (deprecated).
перезапускаем Kibana и проверяем открыв нужный адрес в браузере:
Для входа используем учетную запись elastic.
Как можно заменить, трафик между браузером и Kibana не шифруется.
Данная процедура максимально похожа на ту, которую выполняли для получения сертификатов для узлов Elasticsearch.
Получаем сертификат при помощи
elsticsearch-certutil
:
./bin/elasticsearch-certutil cert -ca /etc/elasticsearch/elastic-stack-ca.p12 -name kibana-certificate -dns kibana01,10.0.3.1,127.0.0.1,localhost -ip 10.0.3.1
При необходимости можем использовать CA (Certificate Authority). В процессе генерации сертификата указываем пароль от CA, если используется, имя будущего сертификата и пароль к нему.
Через -dns
и -ip
указываем DNS
имена и адрес Kibana.
Указываем путь к сертификату в файле
kibana.yml
:
Так как я использовал ранее полученный CA, то указываю путь и к
нему (server.ssl.truststore.path
).
server.ssl.keystore.path: "/etc/kibana/kibana-certificate.p12"server.ssl.truststore.path: "/etc/elasticsearch/elastic-stack-ca.p12"
Не забывайте о правах доступа, пользователь kibana должен иметь права на чтения.
Включаем использование TLS:
server.ssl.enabled: true
Добавляем пароли от сертификатов в keystore:
sudo ./bin/kibana-keystore add server.ssl.keystore.password --allow-rootsudo ./bin/kibana-keystore add server.ssl.truststore.password --allow-root
Перезагружаем Kibana и проверяем:
Теперь для подключения к Kibana используем
https
.
Ранее мы активировали встроенные учетные записи и сгенерировали для них пароли, но использовать пользователя elastic (superuser) не лучшая практика, поэтому рассмотрим, как создавать пользователей и роли к ним.
Для примера создадим администратора Kibana. Открываем Menu > Management > Stack Management, выбираем Users и нажимаем Create user. Заполняем все поля и жмем Create User.
Создание пользователя в KibanaИли же можно воспользоваться API. Делать запросы к кластеру можно через консоль Dev Tools инструмента Kibana. Для этого перейдите Menu > Management > Dev Tools. В открывшейся консоли можно писать запросы к Elasticsearch.
POST /_security/user/kibana_admin{ "password" : "password", "roles" : [ "kibana_admin" ], "full_name" : "Kibana Administrator", "email" : "kibana.administrator@example.local"}
Создание пользователя kibana_admin через API
После создания пользователя, можно его использовать.
Роль kibana_admin не может создавать пользователей и роли. Для создания пользователей и роли необходима привилегия кластера
manage_security
. С перечнем всех встроенных ролей можно ознакомится тут.
Далее создадим роль для работы с данными в ранее созданном индексом logstash-logs*. Открываем Menu > Management > Stack Management. Слева выбираем Roles и нажимаем Create role. Настраиваем привилегии, указав в качестве индекса logstash-logs*:
Index privileges |
read |
Read only права на индекс |
Предоставляем пользователю доступ к Kibana, для этого ниже наживаем Add Kibana privilege и выбираем требуемые привилегии:
В поле Spaces указываю All spaces. Что такое Space (пространства) можно почитать на официальном сайте. В рамках данной серии статей Space будет описан в статье о Kibana dashboard.
Чтобы создать роль с привилегиями в Elasticsearch и Kibana через API, делаем запрос к Kibana:
curl -k -i -u 'elastic' -X PUT 'https://10.0.3.1:5601/api/security/role/logstash_reader' \--header 'kbn-xsrf: true' \--header 'Content-Type: application/json' \--data-raw '{ "elasticsearch": { "cluster" : [ ], "indices" : [ { "names": [ "logstash-logs*" ], "privileges": ["read"] } ] }, "kibana": [ { "base": [], "feature": { "discover": [ "all" ], "visualize": [ "all" ], "dashboard": [ "all" ], "dev_tools": [ "read" ], "indexPatterns": [ "read" ] }, "spaces": [ "*" ] } ]}'
Создаем пользователя logstash_reader и связываем его с созданной ролью (это мы уже научились делать) и заходим данным пользователем в Kibana.
Главная страница Kibana для пользователя logstash_readerКак видно, у данного пользователя не так много прав. Он может просматривать индексы logstash-logs*, строить графики, создавать панели и делать GET запросы к этому индексам через Dev Tools.
Закрываем неактивные сессии:
xpack.security.session.idleTimeout: "30m"
Устанавливаем максимальную продолжительность одной сессии:
xpack.security.session.lifespan: "1d"
Настраиваем интервал принудительной очистки данных о неактивных или просроченных сессиях из сессионного индекса:
xpack.security.session.cleanupInterval: "8h"
Для всех параметров формат времени может быть следующим: ms | s | m | h | d | w | M | Y
Данные о сессии удаляются после того, как пользователь осуществляет закрытие сессии. Если не настроить закрытие неактивных сессий или не ограничить максимальную длительность сессии, то информация об этих сессиях будет накапливаться в сессионном индексе, и Kibana не сможет автоматически удалить данные.
На данный момент Logstash не отправляет данные в кластер Elasticsearch, и чтобы это исправить, сделаем несколько настроек.
Создаем роль для подключения к кластеру:
В Kibana открываем Menu > Management > Stack Management. Слева выбираем Roles и нажимаем Create role. Указываем имя роли и настраиваем привелегии:
Cluster privileges |
manage_index_templates |
Все операции над шаблонами индексов |
monitor |
Read-only права на получение информации о кластере |
|
Index privileges |
create_index |
Создание индексов |
write |
Индексирование, обновление и удаление индексов |
|
manage |
Мониторинг и управление индексом |
|
manage_ilm |
Управление жизненным циклом индексов (IML) |
Создаем пользователя для подключения к кластер:
Открываем Menu > Management > Stack Management, выбираем Users и нажимаем Create user. Заполняем требуемые данные, указав в качестве роли созданную выше роль.
Создание пользователя logstash_userСоздаем Logstash keystore, и добавляем туда пользователя и пароль от него:
# Создаем keystore с паролемset +o historyexport LOGSTASH_KEYSTORE_PASS=mypasswordset -o historysudo /usr/share/logstash/bin/logstash-keystore create --path.settings /etc/logstash/# Добавляем данныеsudo /usr/share/logstash/bin/logstash-keystore add ES_USER --path.settings /etc/logstash/sudo /usr/share/logstash/bin/logstash-keystore add ES_PWD --path.settings /etc/logstash/
Настраиваем аутентификацию в output
плагине:
output { elasticsearch { ... user => "${ES_USER}" password => "${ES_PWD}" }}
Настраиваем TLS в Logstash:
Для подключения по https
к Elasticsearch
необходимо настроить использование ssl
и установить
.pem
сертификат, который был получен для
Kibana.
output { elasticsearch { ... ssl => true cacert => '/etc/logstash/elasticsearch-ca.pem' }}
Перезагружаем Logstash и проверяем новые записи в индексе
Проверку можно сделать в Kibana через Discovery, как делали в прошлой статье, или через API:
Dev Tools - Console в KibanaДля работы с Elasticsearch можно не только использовать
учетные записи пользователей, но и генерировать API ключ. Для этого
необходимо сделать POST
запрос:
POST /_security/api_key
В качестве параметров указывают:
name
- имя ключа
role_descriptors
- описание роли. Структура
совпадает с запросом на создание
роли.
expiration
- Срок действия ключа. По умолчанию без
срока действия.
Для примера заменим базовую аутентификацию в Logstash на API ключ.
Создаем API ключ в Kibana Dev Tool:
POST /_security/api_key{ "name": "host_logstash01", "role_descriptors": { "logstash_api_writer": { "cluster": ["manage_index_templates", "monitor"], "index": [ { "names": ["logstash-logs*"], "privileges": ["create_index", "write", "manage", "manage_ilm"] } ] } }, "expiration": "365d"}
Получаем следующий результат:
{ "id" : "979DkXcBv3stdorzoqsf", "name" : "host_logstash01", "expiration" : 1644585864715, "api_key" : "EmmnKb6NTES3nlRJenFKrQ"}
Помещаем ключ в формате id:api_key
в Keystore:
sudo /usr/share/logstash/bin/logstash-keystore add API_KEY --path.settings /etc/logstash/
Указываем API ключ в конфигурационном файле конвейера:
output { elasticsearch { hosts => ["https://10.0.3.11:9200","https://10.0.3.12:9200","https://10.0.3.13:9200"] index => "logstash-logs-%{+YYYY.MM}" ssl => true api_key => "${API_KEY}" cacert => '/etc/logstash/elasticsearch-ca.pem' }}
Информацию по ключам можно получить в Kibana Menu > Management > API Keys или через API запрос:
GET /_security/api_key?name=host_logstash01{ "api_keys" : [ { "id" : "9r8zkXcBv3stdorzZquD", "name" : "host_logstash01", "creation" : 1613048800876, "expiration" : 1613049520876, "invalidated" : false, "username" : "elastic", "realm" : "reserved" } ]}
В рамках данной статьи были рассмотрены основные функции безопасности стека Elastic, позволяющие обезопасить работу нашего кластера, научились создавать пользователей и пользовательские роли, производить настройку пользовательских сессий, настройку шифрования трафика между узлами кластера и между кластером и клиентами, работать с API ключами и keystore.
В следующей статье будет рассмотрена процедура создание кластера ELK с помощью Docker.
Каждый шард использует ресурсы памяти и процессора. Небольшое количество бльших по объёму шардов использует меньше ресурсов, чем множество мелких.
каждые сутки создаются новые индексы по числу микросервисов поэтому раньше каждую ночь эластик впадал в клинч примерно на 8 минут, пока создавалась сотня новых индексов, несколько сотен новых шардов, график нагрузки на диски уходил в полку, вырастали очереди на отправку логов в эластик на хостах, и Zabbix расцветал алертами как новогодняя ёлка. Чтобы этого избежать, по здравому размышлению был написан скрипт на Python для предварительного создания индексов.
Если вы используете стек Elastic (ELK) и заинтересованы в сопоставлении пользовательских журналов Logstash с Elasticsearch, то этот пост для вас.
Стек ELK это аббревиатура для трех проектов с открытым исходным кодом: Elasticsearch, Logstash и Kibana. Вместе они образуют платформу управления журналами.
Beats появился позже и является легким грузоотправителем данных. Введение Beats преобразовало Elk Stack в Elastic Stack, но это не главное.
Эта статья посвящена Grok, которая является функцией в Logstash, которая может преобразовать ваши журналы, прежде чем они будут отправлены в тайник. Для наших целей я буду говорить только об обработке данных из Logstash в Elasticsearch.
Grok-это фильтр внутри Logstash, который используется для разбора неструктурированных данных на что-то структурированное и подлежащее запросу. Он находится поверх регулярного выражения (regex) и использует текстовые шаблоны для сопоставления строк в файлах журналов.
Как мы увидим в следующих разделах, использование Grok имеет большое значение, когда речь заходит об эффективном управлении журналами.
Без Grok, когда журналы отправляются из Logstash в Elasticsearch и визуализируются в Kibana, они появляются только в значении сообщения.
Запрос значимой информации в этой ситуации затруднен, поскольку все данные журнала хранятся в одном ключе. Было бы лучше, если бы сообщения журнала были организованы лучше.
localhost GET /v2/applink/5c2f4bb3e9fda1234edc64d 400 46ms 5bc6e716b5d6cb35fc9687c0
Если вы внимательно посмотрите на необработанные данные, то увидите, что они на самом деле состоят из разных частей, каждая из которых разделена пробелом.
Для более опытных разработчиков вы, вероятно, можете догадаться, что означает каждая из частей и что это сообщение журнала от вызова API. Представление каждого пункта изложено ниже.
Структурированный вид наших данных
Как мы видим в структурированных данных, существует порядок для неструктурированных журналов. Следующий шаг это программная обработка необработанных данных. Вот где Грок сияет.
Logstash поставляется с более чем 100 встроенными шаблонами для структурирования неструктурированных данных. Вы определенно должны воспользоваться этим преимуществом, когда это возможно для общих системных журналов, таких как apache, linux, haproxy, aws и так далее.
Однако что происходит, когда у вас есть пользовательские журналы, как в приведенном выше примере? Вы должны построить свой собственный шаблон Grok.
Нужно пробовать, чтобы построить свой собственный шаблон Grok. Я использовал Grok Debugger и Grok Patterns.
Обратите внимание, что синтаксис шаблонов Grok выглядит
следующим образом: %{SYNTAX:SEMANTIC}
Первое, что я попытался сделать, это перейти на вкладку Discover в отладчике Grok. Я подумал, что было бы здорово, если бы этот инструмент мог автоматически генерировать шаблон Grok, но это было не слишком полезно, так как он нашел только два совпадения.
Используя это открытие, я начал создавать свой собственный шаблон на отладчике Grok, используя синтаксис, найденный на странице Github Elastic.
Поиграв с разными синтаксисами, я наконец-то смог структурировать данные журнала так, как мне хотелось.
Ссылка на отладчик Grok https://grokdebug.herokuapp.com/
Исходный текст:
localhost GET /v2/applink/5c2f4bb3e9fda1234edc64d 400 46ms 5bc6e716b5d6cb35fc9687c0
Pattern:
%{WORD:environment} %{WORD:method} %{URIPATH:url} %{NUMBER:response_status} %{WORD:response_time} %{USERNAME:user_id}
То что получилось в итоге
{ "environment": [ [ "localhost" ] ], "method": [ [ "GET" ] ], "url": [ [ "/v2/applink/5c2f4bb3e9fda1234edc64d" ] ], "response_status": [ [ "400" ] ], "BASE10NUM": [ [ "400" ] ], "response_time": [ [ "46ms" ] ], "user_id": [ [ "5bc6e716b5d6cb35fc9687c0" ] ]}
Имея в руках шаблон Grok и сопоставленные данные, последний шаг добавить его в Logstash.
На сервере, на котором вы установили стек ELK, перейдите к конфигурации Logstash:
sudo vi /etc/logstash/conf.d/logstash.conf
Вставьте изменения.
input { file { path => "/your_logs/*.log" }}filter{ grok { match => { "message" => "%{WORD:environment} %{WORD:method} %{URIPATH:url} %{NUMBER:response_status} %{WORD:response_time} %{USERNAME:user_id}"} }}output { elasticsearch { hosts => [ "localhost:9200" ] }}
После сохранения изменений перезапустите Logstash и проверьте его состояние, чтобы убедиться, что он все еще работает.
sudo service logstash restartsudo service logstash status
Наконец, чтобы убедиться, что изменения вступили в силу, обязательно обновите индекс Elasticsearch для Logstash в Kibana!
С Grok ваши данные из логов структурированы!
Как мы видим, на изображении выше, Grok способен автоматически сопоставлять данные журнала с Elasticsearch. Это облегчает управление журналами и быстрый запрос информации. Вместо того чтобы рыться в файлах журналов для отладки, вы можете просто отфильтровать то, что вы ищете, например среду или url-адрес.
Попробуйте дать Grok expressions шанс! Если у вас есть другой способ сделать это или у вас есть какие-либо проблемы с примерами выше, просто напишите комментарий ниже, чтобы сообщить мне об этом.
Спасибо за чтение и, пожалуйста, следуйте за мной здесь, на Medium, для получения более интересных статей по программной инженерии!
Ресурсы
https://www.elastic.co/blog/do-you-grok-grok
https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns