У нас в финтехе, нам часто приходится обрабатывать довольно массивные объемы данных курсов обмена валют. Мы получаем данные из разных источников, и каждый из них имеет собственное представление о том, как экстраполировать значения курсов на завтра, послезавтра, следующий месяц и даже следующие три года. Если бы кто-то умел предсказывать курсы правильно, впору было бы закрывать бизнес и просто тупо менять деньги туда-сюда. Некоторые источники пользуются большим доверием, некоторые поставляют сплошь мусор, с редкими вкраплениями почти правильных значений, но зато для экзотических пар. Наша работа заключается в том, чтобы просеять эти десятки тысяч значений в секунду и определить, что именно показать заказчикам. Нам нужно отфильтровать единственное правильное значение из тонны грязи и ила, как это делают фламинго на обеде.
Особым отличительным признаком фламинго является массивный выгнутый вниз клюв, с помощью которого они фильтруют пищу из воды или ила.
Вики
Эта необходимость подарила рождение библиотеке Vela
, которая хранит кеш состояния для
нескольких значений в заданных временных интервалах. Под капотом,
она на лету отсеивает плохие и устаревшие данные, а также
предоставляет доступ к последним N прошедшим валидацию
значениям для каждого ключа (пары валют, в нашем случае).
Допустим, мы собираем курсы для трех пар валют. Простейшее
определение Vela
для хранения актуального состояния
будет выглядеть как-то так:
defmodule Pairs do use Vela, eurusd: [sorter: &Kernel.<=/2], eurgbp: [limit: 3, errors: 1], eurcad: [validator: Pairs] @behaviour Vela.Validator @impl Vela.Validator def valid?(:eurcad, rate), do: rate > 0end
Обновление значений
Vela.put/3
функция
последовательно сделает следующее:
- вызовет
validator
на значении, если таковой определен (см. главку Валидация ниже); - добавит значение либо в ряд хороших значений, если валидация
закончилась успешно, или в служебный ряд
:__errors__
в обратном случае; - вызовет сортировку если
sorter
определен для данного ключа, или просто положит значение в голову списка (FILO, см. главку Сортировка ниже); - обрежет ряд в соответствии с параметром
:limit
переданном при создании; - вернет обновленную структуру
Vela
.
iex|1 > pairs = %Pairs{}iex|2 > Vela.put(pairs, :eurcad, 1.0)# %Pairs{..., eurcad: [1.0], ...}iex|3 > Vela.put(pairs, :eurcad, -1.0)#%Pairs{__errors__: [eurcad: -1.0], ...}iex|4 > pairs |> Vela.put(:eurusd, 2.0) |> Vela.put(:eurusd, 1.0)#%Pairs{... eurusd: [1.0, 2.0]}
Также Vela
имплементирует Access
, так что можно для обновления
значений воспользоваться любой из стандартных функций для глубокого
обновления структур из арсенала Kernel
: Kernel.get_in/2
, Kernel.put_in/3
, Kernel.update_in/3
, Kernel.pop_in/2
, and Kernel.get_and_update_in/3
.
Валидация
Валидатор можен быть определен как:
- внешняя функция с одним аргументом
(
&MyMod.my_fun/1
), она получит только значение для валидации; - внешняя функция с двумя аргументами,
&MyMod.my_fun/2
, она получит паруserie, value
для валидации; - модуль, имплементирующий
Vela.Validator
; - конфигурационный параметр
threshold
, и опциональноcompare_by
, см. главку Comparison ниже.
Если валидация прошла успешно, значение добавляется в список под
соответствующим ключом, в обратном случае кортеж {serie,
value}
отправляется в :__errors_
.
Сравнение
Значения, сохраняемые в этих рядах, могут быть любыми. Чтобы
научить Vela
их сравнивать, необходимо передать
compare_by
параметр в определение ряда (если только
значения не могут быть сравнены стандартным
Kernel.</2
); этот параметр должен иметь тип
(Vela.value() -> number())
. По умолчанию это просто
& &1
.
Также, в определение ряда можно передать параметр
comparator
для вычисления значений дельт
(min
/max
); например, передавая
Date.diff/2
в качестве компаратора, можно получить
правильные дельты для дат.
Другим удобным способом работы является передача параметра
threshold
, который определяет максимально допустимое
отношение нового значения к {min, max}
интервалу.
Поскольку он задан в процентах, проверка не использует
comparator
, но все еще использует
compare_by
. Например, чтобы указать пороговое значение
для времени дат, необходимо указать compare_by:
&DateTime.to_unix/1
(для получения целочисленного
значения) и threshold: 1
, в результате чего новые
значения будут разрешены, только если они находятся в
band
интервале от текущих значений.
Наконец, можно использовать Vela.equal?/2
для
сравнения двух кешей. Если значения определяют функцию
equal?/2
или compare/2
, то эти функции
будут использованы для сравнения, в противном случае мы тупо
используем ==/2
.
Получение значений
Обработка текущего состояния обычно начинается с вызова
Vela.purge/1
, который убирает устаревшие значения
(если validator
завязан на timestamps
).
Затем можно вызвать Vela.slice/1
, которая вернет
keyword
с именами рядов в качестве ключей и первым,
актуальными значениями.
Также можно воспользоваться
get_in/2
/pop_in/2
для низкоуровнего
доступа к значениям в каждом ряду.
Приложение
Vela
может оказаться чрезвычайно полезной в
качестве кеша временных рядов в стейте процесса типа
GenServer
/Agent
. Мы хотим никогда не
использовать устаревшие значения курсов, и для этого мы просто
держим процесс с состоянием, обрабатываемым Vela
, с
валидатором, показанным ниже.
@impl Vela.Validatordef valid?(_key, %Rate{} = rate), do: Rate.age(rate) < @death_age
и Vela.purge/1
спокойно удаляет все устаревшие
значения каждый раз, когда нам требуются данные. Для доступа к
актуальным значениям, мы просто вызываем Vela.slice/1
,
а когда требуется небольшая история по курсу (весь ряд целиком), мы
просто возвращаем его, уже отсортированным, с провалидированными
значениями.
Удачного кеширования временных рядов!