В этом руководстве мы рассмотрим, как использовать Daft в качестве высокопроизводительного механизма обработки данных на языке Python для создания комплексного аналитического конвейера. Мы начнём с загрузки реального набора данных MNIST, затем будем последовательно преобразовывать его с помощью UDF, разработки функций, агрегации, объединений и отложенного выполнения. Также мы покажем, как можно беспрепятственно сочетать обработку структурированных данных, численные вычисления и машинное обучение.
Установка библиотек
Мы устанавливаем Daft и поддерживающие его библиотеки непосредственно в Google Colab, чтобы обеспечить чистую и воспроизводимую среду.
«`python
!pip -q install daft pyarrow pandas numpy scikit-learn
«`
«`python
import os
os.environ[«DONOTTRACK»] = «true»
«`
«`python
import numpy as np
import pandas as pd
import daft
from daft import col
«`
«`python
print(«Daft version:», getattr(daft, «version«, «unknown»))
«`
Загрузка данных
«`python
URL = «https://github.com/Eventual-Inc/mnist-json/raw/master/mnisthandwrittentest.json.gz»
df = daft.read_json(URL)
print(«\nSchema (sampled):»)
print(df.schema())
print(«\nPeek:»)
df.show(5)
«`
Преобразование данных
«`python
def to_28x28(pixels):
arr = np.array(pixels, dtype=np.float32)
if arr.size != 784:
return None
return arr.reshape(28, 28)
df2 = (
df
.with_column(
«img_28x28»,
col(«image»).apply(to28×28, returndtype=daft.DataType.python())
)
.with_column(
«pixel_mean»,
col(«img_28x28»).apply(lambda x: float(np.mean(x)) if x is not None else None,
return_dtype=daft.DataType.float32())
)
.with_column(
«pixel_std»,
col(«img_28x28»).apply(lambda x: float(np.std(x)) if x is not None else None,
return_dtype=daft.DataType.float32())
)
)
print(«\nAfter reshaping + simple features:»)
df2.select(«label», «pixelmean», «pixelstd»).show(5)
«`
Создание столбца признаков
«`python
@daft.udf(returndtype=daft.DataType.list(daft.DataType.float32()), batchsize=512)
def featurize(images_28x28):
out = []
for img in images28×28.topylist():
if img is None:
out.append(None)
continue
img = np.asarray(img, dtype=np.float32)
row_sums = img.sum(axis=1) / 255.0
col_sums = img.sum(axis=0) / 255.0
total = img.sum() + 1e-6
ys, xs = np.indices(img.shape)
cy = float((ys * img).sum() / total) / 28.0
cx = float((xs * img).sum() / total) / 28.0
vec = np.concatenate([rowsums, colsums, np.array([cy, cx, img.mean()/255.0, img.std()/255.0], dtype=np.float32)])
out.append(vec.astype(np.float32).tolist())
return out
df3 = df2.withcolumn(«features», featurize(col(«img28×28″)))
print(«\nFeature column created (list[float]):»)
df3.select(«label», «features»).show(2)
«`
Группировка и объединение данных
«`python
label_stats = (
df3.groupby(«label»)
.agg(
col(«label»).count().alias(«n»),
col(«pixelmean»).mean().alias(«meanpixel_mean»),
col(«pixelstd»).mean().alias(«meanpixel_std»),
)
.sort(«label»)
)
print(«\nLabel distribution + summary stats:»)
label_stats.show(10)
df4 = df3.join(label_stats, on=»label», how=»left»)
print(«\nJoined label stats back onto each row:»)
df4.select(«label», «n», «meanpixelmean», «meanpixelstd»).show(5)
«`
Обучение модели
«`python
small = df4.select(«label», «features»).collect().to_pandas()
small = small.dropna(subset=[«label», «features»]).reset_index(drop=True)
X = np.vstack(small[«features»].apply(np.array).values).astype(np.float32)
y = small[«label»].astype(int).values
from sklearn.modelselection import traintest_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracyscore, classificationreport
clf = LogisticRegression(maxiter=1000, njobs=None)
clf.fit(Xtrain, ytrain)
pred = clf.predict(X_test)
acc = accuracyscore(ytest, pred)
print(«\nBaseline accuracy (feature-engineered LogisticRegression):», round(acc, 4))
print(«\nClassification report:»)
print(classificationreport(ytest, pred, digits=4))
«`
Сохранение данных
«`python
outdf = df4.select(«label», «features», «pixelmean», «pixel_std», «n»)
outpath = «/content/daftmnist_features.parquet»
outdf.writeparquet(out_path)
print(«\nWrote parquet to:», out_path)
dfback = daft.readparquet(out_path)
print(«\nRead-back check:»)
df_back.show(3)
«`
В этом руководстве мы создали производственный конвейер обработки данных с помощью Daft, начиная с загрузки необработанных данных в формате JSON и заканчивая разработкой функций, агрегацией, обучением модели и сохранением данных в формате Parquet. Мы продемонстрировали, как интегрировать расширенную логику UDF, выполнять эффективные операции groupby и join, а также материализовать результаты для последующего машинного обучения. Всё это — в рамках чистой и масштабируемой структуры. Через этот процесс мы увидели, как Daft позволяет нам обрабатывать сложные преобразования, оставаясь при этом на языке Python и обеспечивая высокую производительность. Мы завершили созданием повторно используемого, комплексного конвейера, который демонстрирует, как можно объединить современные процессы обработки данных и машинного обучения в единой среде.
1. Какие основные этапы включает в себя создание масштабируемого конвейера обработки данных с помощью Daft, и как они представлены в данном руководстве?
Ответ: в руководстве описаны этапы загрузки данных, их преобразование, создание столбца признаков, группировка и объединение данных, обучение модели и сохранение данных.
2. Какие библиотеки, помимо Daft, используются в данном руководстве для обработки данных?
Ответ: в руководстве используются библиотеки NumPy, Pandas, pyarrow и scikit-learn.
3. Какие типы данных используются в процессе обработки данных в этом руководстве?
Ответ: в процессе обработки данных используются типы данных NumPy (numpy.ndarray), Pandas (pandas.DataFrame), а также специфические типы данных Daft (daft.DataType). В частности, применяются float32 для числовых значений и list(daft.DataType.float32()) для списков чисел.
4. Какие методы агрегации и объединения данных применяются в этом руководстве?
Ответ: в руководстве применяются методы агрегации (mean) и объединения данных (join) для группировки данных по меткам и объединения статистических данных с исходными данными.
5. Какие шаги предпринимаются для сохранения данных после обработки?
Ответ: после обработки данные сохраняются в формате Parquet с помощью метода write_parquet, который записывает DataFrame в указанный файл.