В этом руководстве мы рассмотрим, как использовать возможности Apache Spark с помощью PySpark непосредственно в Google Colab. Мы начнём с настройки локального сеанса Spark, затем последовательно перейдём к трансформациям, SQL-запросам, объединениям и оконным функциям.
Настройка
Мы создадим и оценим простую модель машинного обучения для прогнозирования типов подписки пользователей и, наконец, продемонстрируем, как сохранять и перезагружать файлы Parquet. Также мы увидим, как можно использовать распределённые возможности обработки данных Spark для аналитики и рабочих процессов машинного обучения даже в среде Colab с одним узлом.
Этапы работы:
1. Настройка PySpark: инициализация сеанса Spark и подготовка набора данных.
2. Создание структурированного DataFrame: содержащий информацию о пользователях, включая страну, доход и тип плана.
3. Выполнение различных преобразований данных: добавление новых столбцов и регистрация DataFrame в виде таблицы SQL.
4. Применение оконных функций: для ранжирования пользователей по доходу.
5. Введение пользовательской функции (UDF): для присвоения уровням приоритета планов подписки.
6. Объединение пользовательского набора данных: с метаданными о стране, включая регион и население.
7. Вычисление аналитических сводок: таких как средний доход и количество пользователей по регионам и типам планов.
8. Подготовка данных для обучения модели: индексация категориальных столбцов, сборка признаков и обучение модели логистической регрессии для прогнозирования премиум-пользователей.
9. Оценка точности модели.
10. Запись обработанных данных в формате Parquet и чтение их обратно в Spark для проверки.
Код
«`python
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, FloatType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = (SparkSession.builder.appName(«ColabSparkAdvancedTutorial»)
.master(«local[*]»)
.config(«spark.sql.shuffle.partitions», «4»)
.getOrCreate())
print(«Spark version:», spark.version)
data = [
(1, «Alice», «IN», «2025-10-01», 56000.0, «premium»),
(2, «Bob», «US», «2025-10-03», 43000.0, «standard»),
(3, «Carlos», «IN», «2025-09-27», 72000.0, «premium»),
(4, «Diana», «UK», «2025-09-30», 39000.0, «standard»),
(5, «Esha», «IN», «2025-10-02», 85000.0, «premium»),
(6, «Farid», «AE», «2025-10-02», 31000.0, «basic»),
(7, «Gita», «IN», «2025-09-29», 46000.0, «standard»),
(8, «Hassan», «PK», «2025-10-01», 52000.0, «premium»),
]
schema = StructType([
StructField(«id», IntegerType(), False),
StructField(«name», StringType(), True),
StructField(«country», StringType(), True),
StructField(«signup_date», StringType(), True),
StructField(«income», FloatType(), True),
StructField(«plan», StringType(), True),
])
df = spark.createDataFrame(data, schema)
df.show()
«`
Заключение
В этом руководстве мы получили практическое понимание того, как PySpark объединяет задачи обработки данных и машинного обучения в рамках единой масштабируемой платформы. Мы увидели, как простые преобразования DataFrame превращаются в SQL-аналитику, разработку признаков и прогнозное моделирование, оставаясь при этом в Google Colab. Экспериментируя с этими концепциями, мы укрепили свои способности к созданию прототипов и развёртыванию решений на основе Spark.
Проверьте полные коды здесь.
Подписывайтесь на нашу страницу на GitHub для учебных материалов, кодов и ноутбуков. Также подписывайтесь на нас в Twitter и присоединяйтесь к нашему ML SubReddit с более чем 100 тысячами участников и подписывайтесь на нашу рассылку. А если вы в Telegram, присоединяйтесь к нам и там.
1. Какие этапы включает в себя создание полного конвейера для обработки данных и машинного обучения с помощью Apache Spark и PySpark?
Ответ:
Создание полного конвейера включает в себя настройку PySpark, создание структурированного DataFrame, выполнение различных преобразований данных, применение оконных функций, введение пользовательской функции (UDF), объединение пользовательского набора данных с метаданными, вычисление аналитических сводок, подготовку данных для обучения модели, оценку точности модели и запись обработанных данных в формате Parquet.
2. Какие типы данных используются в примере кода для создания DataFrame?
Ответ:
В примере кода используются следующие типы данных: `IntegerType` (целочисленные значения), `StringType` (строковые значения), `FloatType` (значения с плавающей точкой), `StructType` и `StructField` для определения схемы DataFrame.
3. Какие функции из PySpark используются для подготовки данных и обучения модели в примере?
Ответ:
Для подготовки данных и обучения модели в примере используются следующие функции: `StringIndexer` для индексации категориальных столбцов, `VectorAssembler` для сборки признаков, `LogisticRegression` для обучения модели логистической регрессии и `MulticlassClassificationEvaluator` для оценки точности модели.
4. Как в примере происходит настройка SparkSession?
Ответ:
В примере SparkSession настраивается с помощью метода `SparkSession.builder`, где указывается имя приложения (`appName`), мастер (`master`) и конфигурация (`config`). Затем создаётся экземпляр SparkSession с помощью метода `getOrCreate()`.
5. Какие аналитические сводки вычисляются в примере?
Ответ:
В примере вычисляются средний доход и количество пользователей по регионам и типам планов.