В этом руководстве мы реализуем продвинутый конвейер обработки данных с помощью Dagster. Мы настроим специальный CSV-IOManager для сохранения ресурсов, определим разделённую ежедневную генерацию данных и обработаем синтетические данные о продажах с помощью очистки, разработки функций и обучения модели.
Установка необходимых библиотек
Мы начнём с установки необходимых библиотек: Dagster, Pandas и scikit-learn, чтобы иметь полный набор инструментов, доступных в Colab. Затем мы импортируем основные модули, настроим NumPy и Pandas для обработки данных и определим базовую директорию вместе с датой начала для организации выходных данных нашего конвейера.
«`python
import sys, subprocess, json, os
subprocess.check_call([sys.executable, «-m», «pip», «install», «-q», «dagster», «pandas», «scikit-learn»])
import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression
BASE = Path(«/content/dagstore»); BASE.mkdir(parents=True, exist_ok=True)
START = «2025-08-01»
«`
Определение пользовательского CSVIOManager
Мы определяем пользовательский CSVIOManager для сохранения выходных данных ресурсов в виде файлов CSV или JSON и перезагружаем их при необходимости. Затем мы регистрируем его в Dagster как csviomanager и настраиваем схему ежедневной разбивки, чтобы наш конвейер мог обрабатывать данные за каждую дату независимо.
«`python
class CSVIOManager(IOManager):
def init(self, base: Path): self.base = base
def path(self, key, ext): return self.base / f»{‘‘.join(key.path)}.{ext}»
def handle_output(self, context, obj):
if isinstance(obj, pd.DataFrame):
p = self.path(context.assetkey, «csv»); obj.to_csv(p, index=False)
context.log.info(f»Saved {context.asset_key} -> {p}»)
else:
p = self.path(context.assetkey, «json»); p.write_text(json.dumps(obj, indent=2))
context.log.info(f»Saved {context.asset_key} -> {p}»)
def load_input(self, context):
k = context.upstreamoutput.assetkey; p = self._path(k, «csv»)
df = pd.read_csv(p); context.log.info(f»Loaded {k} <- {p} ({len(df)} rows)"); return df
@io_manager
def csviomanager(_): return CSVIOManager(BASE)
daily = DailyPartitionsDefinition(start_date=START)
«`
Создание основных ресурсов для конвейера
Мы создаём три основных ресурса для конвейера. Во-первых, rawsales генерирует синтетические ежедневные данные о продажах с шумом и периодическими пропущенными значениями, имитируя несовершенства реального мира. Затем cleansales удаляет нули и обрезает выбросы для стабилизации набора данных, регистрируя метаданные о диапазонах и количестве строк. Наконец, features выполняют разработку функций, добавляя взаимодействие и стандартизированные переменные, подготавливая данные для последующего моделирования.
«`python
@asset(partitions_def=daily, description=»Synthetic raw sales with noise & occasional nulls.»)
def raw_sales(context) -> Output[pd.DataFrame]:
rng = np.random.default_rng(42)
n = 200; day = context.partition_key
x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
sales = 2.5 x + 30 promo + noise + 50
x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
df = pd.DataFrame({«date»: day, «units»: x, «promo»: promo, «sales»: sales})
meta = {«rows»: n, «nullunits»: int(df[«units»].isna().sum()), «head»: df.head().tomarkdown()}
return Output(df, metadata=meta)
@asset(description=»Clean nulls, clip outliers for robust downstream modeling.»)
def cleansales(context, rawsales: pd.DataFrame) -> Output[pd.DataFrame]:
df = raw_sales.dropna(subset=[«units»]).copy()
lo, hi = df[«units»].quantile([0.01, 0.99]); df[«units»] = df[«units»].clip(lo, hi)
meta = {«rows»: len(df), «unitsmin»: float(df.units.min()), «unitsmax»: float(df.units.max())}
return Output(df, metadata=meta)
@asset(description=»Feature engineering: interactions & standardized columns.»)
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
df = clean_sales.copy()
df[«unitssq»] = df[«units»] 2; df[«unitspromo»] = df[«units»] * df[«promo»]
for c in [«units», «unitssq», «unitspromo»]:
mu, sigma = df[c].mean(), df[c].std(ddof=0) or 1.0
df[f»z_{c}»] = (df[c] — mu) / sigma
return Output(df, metadata={«rows»: len(df), «cols»: list(df.columns)})
«`
Проверка качества данных и моделирование
Мы укрепляем конвейер с помощью проверки и моделирования. Проверка качества cleansalesquality обеспечивает целостность данных, проверяя, что нет нулей, в поле promo есть только значения 0/1, а очищенные значения units остаются в допустимых пределах. Затем tinymodelmetrics обучает простую линейную регрессию на разработанных функциях и выводит ключевые метрики, такие как обучение и изученные коэффициенты.
«`python
@assetcheck(asset=cleansales, description=»No nulls; promo in {0,1}; units within clipped bounds.»)
def cleansalesquality(clean_sales: pd.DataFrame) -> AssetCheckResult:
nulls = int(clean_sales.isna().sum().sum())
promook = bool(set(cleansales[«promo»].unique()).issubset({0, 1}))
unitsok = bool(cleansales[«units»].between(cleansales[«units»].min(), cleansales[«units»].max()).all())
passed = bool((nulls == 0) and promook and unitsok)
return AssetCheckResult(
passed=passed,
metadata={«nulls»: nulls, «promook»: promook, «unitsok»: unitsok},
)
@asset(description=»Train a tiny linear regressor; emit R^2 and coefficients.»)
def tinymodelmetrics(context, features: pd.DataFrame) -> dict:
X = features[[«zunits», «zunitssq», «zunits_promo», «promo»]].values
y = features[«sales»].values
model = LinearRegression().fit(X, y)
return {«r2_train»: float(model.score(X, y)),
{n: float(c) for n, c in zip([«zunits»,»zunitssq»,»zunitspromo»,»promo»], model.coef)}}
«`
Регистрация ресурсов и запуск конвейера
Мы регистрируем наши ресурсы и менеджер ввода-вывода в Definitions, затем материализуем весь DAG для выбранного ключа разбивки за один запуск. Мы сохраняем артефакты CSV/JSON в /content/dagstore и выводим флаг успеха, а также размеры сохранённых файлов и модельные метрики для немедленной проверки.
«`python
defs = Definitions(
assets=[rawsales, cleansales, features, tinymodelmetrics, cleansalesquality],
resources={«iomanager»: csvio_manager}
)
if name == «main«:
runday = os.environ.get(«RUNDATE») or START
print(«Materializing everything for:», run_day)
result = materialize(
[rawsales, cleansales, features, tinymodelmetrics, cleansalesquality],
partitionkey=runday,
resources={«iomanager»: csvio_manager},
)
print(«Run success:», result.success)
for fname in [«rawsales.csv»,»cleansales.csv»,»features.csv»,»tinymodelmetrics.json»]:
f = BASE / fname
if f.exists():
print(fname, «->», f.stat().st_size, «bytes»)
if fname.endswith(«.json»):
print(«Metrics:», json.loads(f.read_text()))
«`
В заключение мы материализуем все ресурсы и проверки в одном запуске Dagster, подтверждаем качество данных и обучаем регрессионную модель, метрики которой сохраняются для проверки. Мы сохраняем конвейер модульным, при этом каждый ресурс производит и сохраняет свои выходные данные в CSV или JSON, и обеспечиваем совместимость путём явного преобразования значений метаданных в поддерживаемые типы.
Это руководство демонстрирует, как мы можем объединить разбиение, определения ресурсов и проверки для создания технически надёжного и воспроизводимого рабочего процесса, предоставляя нам практическую основу для расширения в сторону более сложных конвейеров реального мира.
1. Какие библиотеки и инструменты используются в статье для создания конвейера обработки данных в Dagster?
В статье используются библиотеки Dagster, Pandas, scikit-learn, NumPy, а также модули Python, такие как sys, subprocess, json и os.
2. Как в статье реализована ежедневная генерация данных для конвейера?
В статье используется класс DailyPartitionsDefinition для настройки ежедневной разбивки данных. Это позволяет конвейеру обрабатывать данные за каждую дату независимо.
3. Какие основные ресурсы создаются для конвейера в статье?
В статье создаются три основных ресурса: rawsales для генерации синтетических данных о продажах, cleansales для очистки данных и features для разработки функций.
4. Как в статье обеспечивается качество данных в конвейере?
В статье используется проверка качества данных cleansalesquality для обеспечения целостности данных. Эта проверка проверяет, что нет нулей, в поле promo есть только значения 0/1, а очищенные значения units остаются в допустимых пределах.
5. Какие метрики выводятся после обучения простой линейной регрессии на разработанных функциях?
После обучения простой линейной регрессии на разработанных функциях выводятся метрики R^2 и коэффициенты модели.