Руководство по программированию для создания масштабируемого конвейера обработки данных машинного обучения с помощью Daft

В этом руководстве мы рассмотрим, как использовать Daft в качестве высокопроизводительного механизма обработки данных на языке Python для создания комплексного аналитического конвейера. Мы начнём с загрузки реального набора данных MNIST, затем будем последовательно преобразовывать его с помощью UDF, разработки функций, агрегации, объединений и отложенного выполнения. Также мы покажем, как можно беспрепятственно сочетать обработку структурированных данных, численные вычисления и машинное обучение.

Установка библиотек

Мы устанавливаем Daft и поддерживающие его библиотеки непосредственно в Google Colab, чтобы обеспечить чистую и воспроизводимую среду.

«`python
!pip -q install daft pyarrow pandas numpy scikit-learn
«`

«`python
import os
os.environ[«DONOTTRACK»] = «true»
«`

«`python
import numpy as np
import pandas as pd
import daft
from daft import col
«`

«`python
print(«Daft version:», getattr(daft, «version«, «unknown»))
«`

Загрузка данных

«`python
URL = «https://github.com/Eventual-Inc/mnist-json/raw/master/mnisthandwrittentest.json.gz»

df = daft.read_json(URL)
print(«\nSchema (sampled):»)
print(df.schema())

print(«\nPeek:»)
df.show(5)
«`

Преобразование данных

«`python
def to_28x28(pixels):
arr = np.array(pixels, dtype=np.float32)
if arr.size != 784:
return None
return arr.reshape(28, 28)

df2 = (
df
.with_column(
«img_28x28»,
col(«image»).apply(to28×28, returndtype=daft.DataType.python())
)
.with_column(
«pixel_mean»,
col(«img_28x28»).apply(lambda x: float(np.mean(x)) if x is not None else None,
return_dtype=daft.DataType.float32())
)
.with_column(
«pixel_std»,
col(«img_28x28»).apply(lambda x: float(np.std(x)) if x is not None else None,
return_dtype=daft.DataType.float32())
)
)

print(«\nAfter reshaping + simple features:»)
df2.select(«label», «pixelmean», «pixelstd»).show(5)
«`

Создание столбца признаков

«`python
@daft.udf(returndtype=daft.DataType.list(daft.DataType.float32()), batchsize=512)
def featurize(images_28x28):
out = []
for img in images28×28.topylist():
if img is None:
out.append(None)
continue
img = np.asarray(img, dtype=np.float32)
row_sums = img.sum(axis=1) / 255.0
col_sums = img.sum(axis=0) / 255.0
total = img.sum() + 1e-6
ys, xs = np.indices(img.shape)
cy = float((ys * img).sum() / total) / 28.0
cx = float((xs * img).sum() / total) / 28.0
vec = np.concatenate([rowsums, colsums, np.array([cy, cx, img.mean()/255.0, img.std()/255.0], dtype=np.float32)])
out.append(vec.astype(np.float32).tolist())
return out

df3 = df2.withcolumn(«features», featurize(col(«img28×28″)))

print(«\nFeature column created (list[float]):»)
df3.select(«label», «features»).show(2)
«`

Группировка и объединение данных

«`python
label_stats = (
df3.groupby(«label»)
.agg(
col(«label»).count().alias(«n»),
col(«pixelmean»).mean().alias(«meanpixel_mean»),
col(«pixelstd»).mean().alias(«meanpixel_std»),
)
.sort(«label»)
)

print(«\nLabel distribution + summary stats:»)
label_stats.show(10)

df4 = df3.join(label_stats, on=»label», how=»left»)

print(«\nJoined label stats back onto each row:»)
df4.select(«label», «n», «meanpixelmean», «meanpixelstd»).show(5)
«`

Обучение модели

«`python
small = df4.select(«label», «features»).collect().to_pandas()

small = small.dropna(subset=[«label», «features»]).reset_index(drop=True)

X = np.vstack(small[«features»].apply(np.array).values).astype(np.float32)
y = small[«label»].astype(int).values

from sklearn.modelselection import traintest_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracyscore, classificationreport

clf = LogisticRegression(maxiter=1000, njobs=None)
clf.fit(Xtrain, ytrain)

pred = clf.predict(X_test)
acc = accuracyscore(ytest, pred)

print(«\nBaseline accuracy (feature-engineered LogisticRegression):», round(acc, 4))
print(«\nClassification report:»)
print(classificationreport(ytest, pred, digits=4))
«`

Сохранение данных

«`python
outdf = df4.select(«label», «features», «pixelmean», «pixel_std», «n»)
outpath = «/content/daftmnist_features.parquet»
outdf.writeparquet(out_path)

print(«\nWrote parquet to:», out_path)

dfback = daft.readparquet(out_path)
print(«\nRead-back check:»)
df_back.show(3)
«`

В этом руководстве мы создали производственный конвейер обработки данных с помощью Daft, начиная с загрузки необработанных данных в формате JSON и заканчивая разработкой функций, агрегацией, обучением модели и сохранением данных в формате Parquet. Мы продемонстрировали, как интегрировать расширенную логику UDF, выполнять эффективные операции groupby и join, а также материализовать результаты для последующего машинного обучения. Всё это — в рамках чистой и масштабируемой структуры. Через этот процесс мы увидели, как Daft позволяет нам обрабатывать сложные преобразования, оставаясь при этом на языке Python и обеспечивая высокую производительность. Мы завершили созданием повторно используемого, комплексного конвейера, который демонстрирует, как можно объединить современные процессы обработки данных и машинного обучения в единой среде.

1. Какие основные этапы включает в себя создание масштабируемого конвейера обработки данных с помощью Daft, и как они представлены в данном руководстве?

Ответ: в руководстве описаны этапы загрузки данных, их преобразование, создание столбца признаков, группировка и объединение данных, обучение модели и сохранение данных.

2. Какие библиотеки, помимо Daft, используются в данном руководстве для обработки данных?

Ответ: в руководстве используются библиотеки NumPy, Pandas, pyarrow и scikit-learn.

3. Какие типы данных используются в процессе обработки данных в этом руководстве?

Ответ: в процессе обработки данных используются типы данных NumPy (numpy.ndarray), Pandas (pandas.DataFrame), а также специфические типы данных Daft (daft.DataType). В частности, применяются float32 для числовых значений и list(daft.DataType.float32()) для списков чисел.

4. Какие методы агрегации и объединения данных применяются в этом руководстве?

Ответ: в руководстве применяются методы агрегации (mean) и объединения данных (join) для группировки данных по меткам и объединения статистических данных с исходными данными.

5. Какие шаги предпринимаются для сохранения данных после обработки?

Ответ: после обработки данные сохраняются в формате Parquet с помощью метода write_parquet, который записывает DataFrame в указанный файл.

Источник