В этом руководстве мы реализуем продвинутый конвейер обработки данных с помощью Dagster. Мы настроим специальный CSV-IOManager для сохранения ресурсов, определим разделённую ежедневную генерацию данных и обработаем синтетические данные о продажах с помощью очистки, разработки функций и обучения модели.
Установка необходимых библиотек
Мы начнём с установки необходимых библиотек: Dagster, Pandas и scikit-learn, чтобы иметь полный набор инструментов, доступных в Colab. Затем мы импортируем основные модули, настроим NumPy и Pandas для обработки данных и определим базовую директорию вместе с датой начала для организации выходных данных нашего конвейера.
“`python
import sys, subprocess, json, os
subprocess.check_call([sys.executable, “-m”, “pip”, “install”, “-q”, “dagster”, “pandas”, “scikit-learn”])
import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression
BASE = Path(“/content/dagstore”); BASE.mkdir(parents=True, exist_ok=True)
START = “2025-08-01”
“`
Определение пользовательского CSVIOManager
Мы определяем пользовательский CSVIOManager для сохранения выходных данных ресурсов в виде файлов CSV или JSON и перезагружаем их при необходимости. Затем мы регистрируем его в Dagster как csviomanager и настраиваем схему ежедневной разбивки, чтобы наш конвейер мог обрабатывать данные за каждую дату независимо.
“`python
class CSVIOManager(IOManager):
def init(self, base: Path): self.base = base
def path(self, key, ext): return self.base / f”{‘‘.join(key.path)}.{ext}”
def handle_output(self, context, obj):
if isinstance(obj, pd.DataFrame):
p = self.path(context.assetkey, “csv”); obj.to_csv(p, index=False)
context.log.info(f”Saved {context.asset_key} -> {p}”)
else:
p = self.path(context.assetkey, “json”); p.write_text(json.dumps(obj, indent=2))
context.log.info(f”Saved {context.asset_key} -> {p}”)
def load_input(self, context):
k = context.upstreamoutput.assetkey; p = self._path(k, “csv”)
df = pd.read_csv(p); context.log.info(f”Loaded {k} <- {p} ({len(df)} rows)"); return df
@io_manager
def csviomanager(_): return CSVIOManager(BASE)
daily = DailyPartitionsDefinition(start_date=START)
“`
Создание основных ресурсов для конвейера
Мы создаём три основных ресурса для конвейера. Во-первых, rawsales генерирует синтетические ежедневные данные о продажах с шумом и периодическими пропущенными значениями, имитируя несовершенства реального мира. Затем cleansales удаляет нули и обрезает выбросы для стабилизации набора данных, регистрируя метаданные о диапазонах и количестве строк. Наконец, features выполняют разработку функций, добавляя взаимодействие и стандартизированные переменные, подготавливая данные для последующего моделирования.
“`python
@asset(partitions_def=daily, description=”Synthetic raw sales with noise & occasional nulls.”)
def raw_sales(context) -> Output[pd.DataFrame]:
rng = np.random.default_rng(42)
n = 200; day = context.partition_key
x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
sales = 2.5 x + 30 promo + noise + 50
x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
df = pd.DataFrame({“date”: day, “units”: x, “promo”: promo, “sales”: sales})
meta = {“rows”: n, “nullunits”: int(df[“units”].isna().sum()), “head”: df.head().tomarkdown()}
return Output(df, metadata=meta)
@asset(description=”Clean nulls, clip outliers for robust downstream modeling.”)
def cleansales(context, rawsales: pd.DataFrame) -> Output[pd.DataFrame]:
df = raw_sales.dropna(subset=[“units”]).copy()
lo, hi = df[“units”].quantile([0.01, 0.99]); df[“units”] = df[“units”].clip(lo, hi)
meta = {“rows”: len(df), “unitsmin”: float(df.units.min()), “unitsmax”: float(df.units.max())}
return Output(df, metadata=meta)
@asset(description=”Feature engineering: interactions & standardized columns.”)
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
df = clean_sales.copy()
df[“unitssq”] = df[“units”] 2; df[“unitspromo”] = df[“units”] * df[“promo”]
for c in [“units”, “unitssq”, “unitspromo”]:
mu, sigma = df[c].mean(), df[c].std(ddof=0) or 1.0
df[f”z_{c}”] = (df[c] – mu) / sigma
return Output(df, metadata={“rows”: len(df), “cols”: list(df.columns)})
“`
Проверка качества данных и моделирование
Мы укрепляем конвейер с помощью проверки и моделирования. Проверка качества cleansalesquality обеспечивает целостность данных, проверяя, что нет нулей, в поле promo есть только значения 0/1, а очищенные значения units остаются в допустимых пределах. Затем tinymodelmetrics обучает простую линейную регрессию на разработанных функциях и выводит ключевые метрики, такие как обучение и изученные коэффициенты.
“`python
@assetcheck(asset=cleansales, description=”No nulls; promo in {0,1}; units within clipped bounds.”)
def cleansalesquality(clean_sales: pd.DataFrame) -> AssetCheckResult:
nulls = int(clean_sales.isna().sum().sum())
promook = bool(set(cleansales[“promo”].unique()).issubset({0, 1}))
unitsok = bool(cleansales[“units”].between(cleansales[“units”].min(), cleansales[“units”].max()).all())
passed = bool((nulls == 0) and promook and unitsok)
return AssetCheckResult(
passed=passed,
metadata={“nulls”: nulls, “promook”: promook, “unitsok”: unitsok},
)
@asset(description=”Train a tiny linear regressor; emit R^2 and coefficients.”)
def tinymodelmetrics(context, features: pd.DataFrame) -> dict:
X = features[[“zunits”, “zunitssq”, “zunits_promo”, “promo”]].values
y = features[“sales”].values
model = LinearRegression().fit(X, y)
return {“r2_train”: float(model.score(X, y)),
{n: float(c) for n, c in zip([“zunits”,”zunitssq”,”zunitspromo”,”promo”], model.coef)}}
“`
Регистрация ресурсов и запуск конвейера
Мы регистрируем наши ресурсы и менеджер ввода-вывода в Definitions, затем материализуем весь DAG для выбранного ключа разбивки за один запуск. Мы сохраняем артефакты CSV/JSON в /content/dagstore и выводим флаг успеха, а также размеры сохранённых файлов и модельные метрики для немедленной проверки.
“`python
defs = Definitions(
assets=[rawsales, cleansales, features, tinymodelmetrics, cleansalesquality],
resources={“iomanager”: csvio_manager}
)
if name == “main“:
runday = os.environ.get(“RUNDATE”) or START
print(“Materializing everything for:”, run_day)
result = materialize(
[rawsales, cleansales, features, tinymodelmetrics, cleansalesquality],
partitionkey=runday,
resources={“iomanager”: csvio_manager},
)
print(“Run success:”, result.success)
for fname in [“rawsales.csv”,”cleansales.csv”,”features.csv”,”tinymodelmetrics.json”]:
f = BASE / fname
if f.exists():
print(fname, “->”, f.stat().st_size, “bytes”)
if fname.endswith(“.json”):
print(“Metrics:”, json.loads(f.read_text()))
“`
В заключение мы материализуем все ресурсы и проверки в одном запуске Dagster, подтверждаем качество данных и обучаем регрессионную модель, метрики которой сохраняются для проверки. Мы сохраняем конвейер модульным, при этом каждый ресурс производит и сохраняет свои выходные данные в CSV или JSON, и обеспечиваем совместимость путём явного преобразования значений метаданных в поддерживаемые типы.
Это руководство демонстрирует, как мы можем объединить разбиение, определения ресурсов и проверки для создания технически надёжного и воспроизводимого рабочего процесса, предоставляя нам практическую основу для расширения в сторону более сложных конвейеров реального мира.
1. Какие библиотеки и инструменты используются в статье для создания конвейера обработки данных в Dagster?
В статье используются библиотеки Dagster, Pandas, scikit-learn, NumPy, а также модули Python, такие как sys, subprocess, json и os.
2. Как в статье реализована ежедневная генерация данных для конвейера?
В статье используется класс DailyPartitionsDefinition для настройки ежедневной разбивки данных. Это позволяет конвейеру обрабатывать данные за каждую дату независимо.
3. Какие основные ресурсы создаются для конвейера в статье?
В статье создаются три основных ресурса: rawsales для генерации синтетических данных о продажах, cleansales для очистки данных и features для разработки функций.
4. Как в статье обеспечивается качество данных в конвейере?
В статье используется проверка качества данных cleansalesquality для обеспечения целостности данных. Эта проверка проверяет, что нет нулей, в поле promo есть только значения 0/1, а очищенные значения units остаются в допустимых пределах.
5. Какие метрики выводятся после обучения простой линейной регрессии на разработанных функциях?
После обучения простой линейной регрессии на разработанных функциях выводятся метрики R^2 и коэффициенты модели.