Всем привет! Хочу раскрыть подробно одну интересную, но, к несчастью, не встречающуюся тему в документации Spark: как обучать модель в PySpark ML на датасете с разными типами данных (строковыми и числовыми)? Желание написать данную статью было вызвано необходимостью в течение нескольких дней просматривать Интернет в поисках необходимой статьи с кодом, ведь в официальном туториале от Spark приведён пример работы не то что с признаками одного типа данных, а вообще с одним признаком, а информация, как работать с несколькими колонками тем более разных типов данных, там отсутствует. Однако, подробно изучив возможности PySpark для работы с данными, у меня получилось написать рабочий код и понять как всё происходит, чем хочу поделиться и с вами. Так что полный вперёд, друзья!
Первоначально давайте импортируем все необходимые библиотеки для работы, а потом подробно разберём код, чтобы любой уважающий себя ржавый чайник, как, впрочем, и я недавно, всё понял:
#импортируем необходимые библиотекиfrom pyspark.context import SparkContextfrom pyspark.sql.session import SparkSessionfrom pyspark.ml import Pipelinefrom pyspark.ml.feature import HashingTF, Tokenizerfrom pyspark.sql.functions import UserDefinedFunctionfrom pyspark.sql.types import *from pyspark.ml import Pipelinefrom pyspark.ml.feature import StringIndexer, VectorIndexerfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorimport pyspark.sql.functions as sffrom pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssemblerfrom pyspark.ml import Pipelinefrom pyspark.ml.regression import GBTRegressor#other types of regression models#можно использовать и другие виды регрессии#from pyspark.ml.regression import LinearRegression#from pyspark.ml.regression import RandomForestRegressor#from pyspark.ml.regression import GeneralizedLinearRegression#from pyspark.ml.regression import DecisionTreeRegressorfrom pyspark.ml.feature import VectorIndexerfrom pyspark.ml.evaluation import RegressionEvaluator
Теперь создадим (локальный) спарковский контекст и спарковскую сессию и проверим, всё ли работает, выведя полученное на экран. Создание спарковской сессии является отправной точкой в работе с датасетами в Spark:
#создаём спарк сессеюsc = SparkContext('local')spark = SparkSession(sc)spark
Инструмент для работы с данными есть, теперь загрузим их. В статье используется датасет, который был взят с сайта соревнований по машинному обучению Kaggle:
https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions
который после скачивания хранится в path_csv в формате .csv и имеет следующие опции:
- header: если в нашем файле первая строка является заголовком, то ставим true
- delimiter: ставим знак, разделяющий данные одной строки по признакам, зачастую это "," или ";"
- inferSchema: если true, то PySpark автоматически определит тип каждой колонки, иначе вам придётся прописывать его самостоятельно
#загружаем данные формата .csv из path_csvpath_csv = 'greenhouse_gas_inventory_data_data.csv'data = spark.read.format("csv")\ .option("header", "true")\ .option("delimiter", ",")\ .option("inferSchema", "true")\ .load(path_csv)
Чтобы лучше понимать, с какими данными мы имеем дело, посмотрим на несколько их строк:
#посмотрим на часть данныхdata.show()
Также посмотрим сколько у нас всего строк в датасете:
#количество строк данныхdata.select('year').count()
И, наконец, выведем типы наших данных, которые, как мы помним, мы попросили PySpark определить автоматически с помощью option(inferSchema, true):
#посмотрим на типы всех наших колонокdata.printSchema()
Теперь переходим к нашему основному блюду работе с несколькими признаками разных типов данных. Spark может обучить модель на преобразованных данных, где предсказываемая колонка является вектором и колонки с признаками тоже вектор, что усложняет задачу Но мы не сдаёмся, и чтобы обучить модель в PySpark мы будем использовать Pipeline, в который мы передадим некий план действий (переменная stages):
- шаг label_stringIdx: мы преобразовываем колонку датасета value, которую мы хотим предсказывать, в спарковскую строку-вектор и переназываем на label с параметром handleInvalid = 'keep', означающий, что наша предсказываемая колонка поддерживает null
- шаг stringIndexer: преобразовываем строковые колонки в спарковские категориальные строки
- шаг encoder: преобразовываем категориальные колонки в бинарные (числовые) вектора благодаря строковому преобразователю
- шаг assembler: чтобы обучить модель в Spark, мы должны колонки с признаками преобразовать в один вектор, что можно достичь с помощью VectorAssembler(), который берёт на вход название численных (для этого мы и преобразовали строки в числа в предыдущем шаге) колонок (assemblerInputs) и преобразовываем все колонки в один вектор с именем features
- шаг gbt: в качестве модели регрессии из PySpark ML выбран GBTRegressor, потому что бустинг наше всё
#value - это зависимая и предсказываемая переменная - меткаstages = []label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')stages += [label_stringIdx]#depend on categorical columns: country and types of emission#зависит от категориаьных колонок: страны и категории загрязненияcategoricalColumns = ['country_or_area', 'category']for categoricalCol in categoricalColumns: #преобразование категориальных колонок в бинарные вектора благодаря строковому преобразователю stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index', handleInvalid = 'keep') encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(), outputCol=categoricalCol + "classVec") stages += [stringIndexer, encoder]#зависит от численной колонки: годаnumericCols = ['year']assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols#преобразование нескольких колонок в вектор-колонку - признакиassembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]
Разделим наш датасет на тренировочную и тестовую выборку в любимом соотношении соотношении 70% к 30% соответственно и начнём тренировать модель с помощью градиентного регрессионого дерева бустинга (GBTRegressor), который должен предсказывать вектор label по признакам, ранее объединённым в один вектор features с ограничением по итерируемости maxIter=10:
#делим данные на обучающую и тестовую выборки (30% тестовая)(trainingData, testData) = data.randomSplit([0.7, 0.3])#тренируем модель (градиентного регрессионого дерева бустинга)gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)stages += [gbt]# задаем план stages для обучения модели pipeline = Pipeline(stages=stages)
А теперь нам осталось только отправить компьютеру план действий и тренировочный датасет:
# тренируем модельmodel = pipeline.fit(trainingData)# делаем предсказания на тестовой выборкеpredictions = model.transform(testData)
Сохраним нашу модель, чтобы мы всегда могли вернуться к её использованию без повторного обучения:
#сохраняем модельpipeline.write().overwrite().save('model/gbtregr_model')
И если вы решили вновь начать использовать обученную модель для предсказаний, то просто напишите:
#загружаем модель для работы после обученияload_model = pipeline.read().load('model/gbtregr_model')
Итак, мы посмотрели, как в инструменте для работы с большими данными на языке Python, PySpark, реализуется работа с несколькими признаковыми колонками разных типов данных.
Теперь пора применить это в ваших моделях