Русский
Русский
English
Статистика
Реклама

PySpark. Решаем задачу на поиск сессий

Добрый день уважаемые читатели! Несколько дней назад перечитывая книгу Энтони Молинаро SQL. Сборник рецептов, в одной из глав я наткнулся на тему, которая была посвящена определению начала и конца диапазона последовательных значений. Бегло ознакомившись с материалом, я сразу вспомнил, что уже сталкивался с данным вопросом в качестве одного из тестовых заданий, но тогда тема была заявлена как Задача на поиск сессий. Фишкой технического собеседования был не разбор выполненной работы, а один из вопросов интервьюера о том, как получить аналогичные значения с помощью Spark. Готовясь к собеседованию, я не знал, что в компании применяется (а может и не применяется) Apache Spark, и поэтому не собрал информацию по новому на тот момент для меня инструменту. Оставалось лишь выдвинуть гипотезу, что искомое решение может быть подобно скрипту, который можно написать c помощью библиотеки Pandas. Хотя очень отдалено я все-таки попал в цель, однако поработать в данной организации не получилось.

Справедливости ради хочу заметить, что за прошедшие годы я несильно продвинулся в изучении Apache Spark. Но я все равно хочу поделиться с читателями наработками, так как многие аналитики вообще не сталкивались с этим инструментом, а другим возможно предстоит подобное собеседование. Если вы являетесь профессионалом Spark, то всегда можно предложить более оптимальный код в комментариях к публикации.

Это была преамбула, приступим непосредственно к разбору данной темы. Пойдем сначала и напишем SQL скрипт. Но прежде создадим базу данных и заполним ее значениями. Так как это демо-пример предлагаю использовать SQLite. Данная БД уступает более мощным коллегам по цеху, но ее возможностей для разработки скрипта нам хватит сполна. Чтобы автоматизировать заявленные выше операции, я написал вот такой код на Python.

# Импорт библиотекimport sqlite3# Данные для записи в БДprojects = [    ('2020-01-01', '2020-01-02'),    ('2020-01-02', '2020-01-03'),    ('2020-01-03', '2020-01-04'),    ('2020-01-04', '2020-01-05'),    ('2020-01-06', '2020-01-07'),    ('2020-01-16', '2020-01-17'),    ('2020-01-17', '2020-01-18'),    ('2020-01-18', '2020-01-19'),    ('2020-01-19', '2020-01-20'),    ('2020-01-21', '2020-01-22'),    ('2020-01-26', '2020-01-27'),    ('2020-01-27', '2020-01-28'),    ('2020-01-28', '2020-01-29'),    ('2020-01-29', '2020-01-30')]try:    # Создаем соединение    con = sqlite3.connect("projects.sqlite")    # Создаем курсор    cur = con.cursor()    # Создаем таблицу    cur.execute("""CREATE TABLE IF NOT EXISTS projects (                    proj_id INTEGER PRIMARY KEY AUTOINCREMENT,                    proj_start TEXT,                    proj_end TEXT)""")    # Добавляем записи    cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)    # Сохраняем транзакцию    con.commit()    # Закрываем курсор    cur.close()except sqlite3.Error as err:    print("Ошибка выполнения запроса", err)finally:    # Закрываем соединение    con.close()    print("Соединение успешно закрыто")

Решение тривиальное и не требует дополнительных комментариев. Для коммуникации с созданной БД я использовал DBeaver. Клиентское приложение хорошо себя зарекомендовало, поэтому я часто использую его для разработки SQL запросов.

В рукописи Молинаро приводится два варианта решения данной задачи, но, по сути, это один и тот же код. Просто в первом случае не применяются оконные функции, а во-втором они присутствуют. Я выбрал последний, чтобы упростить ход рассуждений. Текстовую версию скрипта и все вспомогательные файлы к публикации вы можете найти по адресу (ссылка).

select       p3.proj_group,       min(p3.proj_start) as date_start,      max(p3.proj_end) as date_end,      julianday(max(p3.proj_end))-julianday( min(p3.proj_end))+1 as deltafrom    (select      p2.*,     sum(p2.flag)over(order by p2.proj_id) as proj_groupfrom (select       p.proj_id ,       p.proj_start,       p.proj_end,       case       when lag(p.proj_end)over(order by p.proj_id) = p.proj_start then 0 else 1       end as flagfrom projects as p) as p2) as p3group by p3.proj_group

Если вы раньше уже использовали оконные функции, то разобраться самостоятельно с написанной конструкцией не составит никакого труда. Я лишь кратко опишу логику. Первоначальная таблица представляет собой последовательные шаги, для которых заданы два параметра: дата начала и дата конца. Если дата начала шага соответствует дате конца предыдущего шага, то два шага считаются одной сессией. Следовательно, начинать расчеты нужно со смещения, за это отвечает оконная функция lag. На следующем этапе сравниваем дату старта текущего шага и дату конца предыдущего и выводим либо 0, либо 1. Если к новому столбцу применить суммирование с нарастающим итогом, то получим номера сессий. Стандартная группировка по номерам с агрегирующими функциями позволит извлечь начало и конец диапазона значений. Я также рассчитал дельту между двумя датами на случай, если потребуется установить самую длинную или короткую сессию. Приведенный код будет актуален и для других БД. Ошибка будет выводиться только на строчке, где находится разница между двумя датами (функция julianday это прерогатива SQLite). На этом первая часть тестового задания выполнена. Переходим к Spark.

Если верить Википедии, то Apache Spark это фреймворксоткрытым исходным кодомдля реализации распределённой обработкинеструктурированныхи слабоструктурированных данных, входящий в экосистему проектовHadoop. Так как я не пишу на Java, Scala или R, то для получения функциональности Spark решил использовать PySpark. Устанавливать на компьютер все необходимые для работы компоненты я не стал. Для экспериментов выбрал облачный сервис Google Colab, так как у меня уже был заведенный аккаунт. Основной минус - при каждом новом сеансе работы нужно заново скачивать файлы, связанные с запуском нашего инструмента. На просторах Интернета я встречал вариант с фиксированной установкой, но пока не пробовал его на практике.

С помощью базовых команд Linux мы устанавливаем OpenJDK, скачиваем и разархивируем файлы Spark. Затем прописываем две переменные среды. Нужно не забыть о вспомогательной библиотеке findspark. Подготовительная работа закончена, осталось только открыть сессию.

В идеале следует импортировать файл с БД SQLite в облако и подключаться к нему, но я решил облегчить себе жизнь и сформировал датафрейм прямо в ноутбуке. Чтобы даты воспринимались как даты, потребовалось написать собственную функцию.

Так как операций в Spark довольно много, рекомендую сразу обзавестись шпаргалками. Если говорить о литературе для изучения данного инструмента, то радует два факта. Во-первых, есть как англоязычные, так и переводные издания, а во-вторых, источников информации предостаточно. Если вы не владеете языком Шекспира, то могу порекомендовать в первую очередь Изучаем Spark. Молниеносный анализ данных, авторы Холден Карау, Энди Конвински, Патрик Венделл, Матей Захария.

После того, как датафрейм подготовлен, можно пойти сначала самым простым путем и использовать уже имеющийся скрипт SQL. В код нужно лишь внести две правки: изменить имя базовой таблицы, изменить способ нахождения дельты (функция datediff).

Строго говоря, мы уже выполнили вторую часть тестового задания. Но, что-то мне подсказывает, что на собеседовании к данному способу могут придраться, аргументируя это тем, что это все тот же скрипт SQL без специфики Spark. Поэтому в конце статьи приведу псевдокод, который скорее всего уступает по производительности предыдущему решению, но лучше отражает способности соискателя программировать. Полную версию скрипта можно найти в ноутбуке.

from pyspark.sql.functions import lagfrom pyspark.sql import functions as Ffrom pyspark.sql.window import Window# Equivalent of Pandas.dataframe.shift() methodw = Window().partitionBy().orderBy(col("proj_id"))df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))#...# Equivalent of SQL- CASE WHEN...THEN...ELSE... ENDdf_dataframe = df_dataframe.withColumn('flag',F.when(df_dataframe["proj_start"] == df_dataframe["lag"],0).otherwise(1))#...# Cumsum by column flagw = Window().partitionBy().orderBy(col("proj_id"))df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))#...# Equivalent of SQL - GROUP BYfrom pyspark.sql.functions import  min, maxdf_group = df_dataframe.groupBy("proj_group").agg(min("proj_start").alias("date_start"), \                                                  max("proj_end").alias("date_end"))df_group = df_group.withColumn("delta", F.datediff(df_group.date_end,df_group.date_start))df_group.show()

Краткие выводы.

  1. Перед собеседованием внимательно изучайте список инструментов и технологий, с которыми работает компания. Не стоит ограничиваться позициями только из текста вакансии. В идеале нужно постоянно расширять свой профессиональный кругозор, чтобы отвечать на каверзные вопросы, которые задаются с цель всесторонне прощупать кандидата.

  2. Даже если вы раньше никогда не работали со Spark, это не повод отказываться от конкурса на вакантную позицию. Основы PySpark можно освоить в сжатые сроки, при условии, что в бэкграунде уже есть опыт программирования с использованием библиотеки Pandas.

  3. Недостатка в книгах по Spark не наблюдается.

На этом все. Всем здоровья, удачи и профессиональных успехов!

Источник: habr.com
К списку статей
Опубликовано: 07.03.2021 10:21:03
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Python

Sql

Big data

Sqlite

Data engineering

Spark

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru