В этом руководстве мы разрабатываем комплексный аналитический и моделирующий конвейер в производственном стиле с использованием Vaex для эффективной работы с миллионами строк без материализации данных в памяти. Мы генерируем реалистичный крупномасштабный набор данных, разрабатываем расширенные поведенческие и городские характеристики с использованием отложенных выражений и приблизительной статистики, а затем агрегируем полученные сведения в большом масштабе. Затем мы интегрируем Vaex с scikit-learn для обучения и оценки прогнозной модели, демонстрируя, как Vaex может выступать в качестве основы для высокопроизводительного исследовательского анализа и рабочих процессов машинного обучения.
Установка необходимых библиотек
«`python
!pip -q install «vaex==4.19.0» «vaex-core==4.19.0» «vaex-ml==0.19.0» «vaex-viz==0.6.0» «vaex-hdf5==0.15.0» «pyarrow>=14» «scikit-learn>=1.3»
«`
«`python
import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import rocaucscore, averageprecisionscore
print(«Python:», import(«sys»).version.split()[0])
print(«vaex:», vaex.version)
print(«numpy:», np.version)
print(«pandas:», pd.version)
«`
Генерация данных
«`python
rng = np.random.default_rng(7)
n = 2000000
cities = np.array([«Montreal», «Toronto», «Vancouver», «Calgary», «Ottawa», «Edmonton», «Quebec City», «Winnipeg»], dtype=object)
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16, 0.18, 0.12, 0.10, 0.10, 0.10, 0.10, 0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype(«int32»)
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype(«int32»)
tx = rng.poisson(lam=22, size=n).astype(«int32»)
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype(«float64»)
citymult = pd.Series({«Montreal»: 0.92, «Toronto»: 1.05, «Vancouver»: 1.10, «Calgary»: 1.02, «Ottawa»: 1.00, «Edmonton»: 0.98, «Quebec City»: 0.88, «Winnipeg»: 0.90}).reindex(city).tonumpy()
income = (baseincome citymult (1.0 + 0.004(age-35)) (1.0 + 0.0025*np.minimum(tenure_m,120))).astype(«float64»)
income = np.clip(income, 18000, 420000)
noise = rng.normal(0, 1, size=n).astype(«float64»)
score_latent = (
0.55*np.log1p(income/1000.0)
+ 0.28*np.log1p(tx)
+ 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
— 0.012*(age-40)
+ 0.22*(city == «Vancouver»).astype(«float64»)
+ 0.15*(city == «Toronto»).astype(«float64»)
+ 0.10*(city == «Ottawa»).astype(«float64»)
+ 0.65*noise
)
p = 1.0/(1.0 + np.exp(-(scorelatent — np.quantile(scorelatent, 0.70))))
target = (rng.random(n) < p).astype("int8")
df = vaex.fromarrays(city=city, age=age, tenurem=tenure_m, tx=tx, income=income, target=target)
«`
Создание и кодирование категориальных данных
«`python
encoder = vaex.ml.LabelEncoder(features=[«city»])
df = encoder.fit_transform(df)
citymap = encoder.labels[«city»]
invcitymap = {v: k for k, v in city_map.items()}
ncities = len(citymap)
«`
Вычисление агрегированных статистических данных
«`python
p95incomekbycity = df.percentileapprox(«incomek», 95, binby=»labelencodedcity», shape=ncities, limits=[-0.5, ncities-0.5])
p50valuebycity = df.percentileapprox(«valuescore», 50, binby=»labelencodedcity», shape=ncities, limits=[-0.5, n_cities-0.5])
avgincomekbycity = df.mean(«incomek», binby=»labelencodedcity», shape=ncities, limits=[-0.5, n_cities-0.5])
targetratebycity = df.mean(«target», binby=»labelencodedcity», shape=ncities, limits=[-0.5, n_cities-0.5])
nbycity = df.count(binby=»labelencodedcity», shape=ncities, limits=[-0.5, ncities-0.5])
«`
Стандартизация числовых признаков
«`python
features_num = [
«age», «tenurey», «tx», «incomek», «logincome», «txperyear», «valuescore»,
«p95incomek», «avgincomek», «medianvaluescore», «target_rate»,
«incomevscityp95″, «valuevscitymedian»
]
scaler = vaex.ml.StandardScaler(features=featuresnum, withmean=True, withstd=True, prefix=»z«)
df = scaler.fit_transform(df)
«`
Обучение модели
«`python
features = [«z«+f for f in featuresnum] + [«labelencodedcity»]
dftrain, dftest = df.splitrandom([0.80, 0.20], randomstate=42)
model = LogisticRegression(maxiter=250, solver=»lbfgs», njobs=None)
vaexmodel = Predictor(model=model, features=features, target=»target», predictionname=»pred»)
t0 = time.time()
vaexmodel.fit(df=dftrain)
fit_s = time.time() — t0
dftest = vaexmodel.transform(df_test)
ytrue = dftest[«target»].to_numpy()
ypred = dftest[«pred»].to_numpy()
auc = rocaucscore(ytrue, ypred)
ap = averageprecisionscore(ytrue, ypred)
print(«\nModel:»)
print(«fitseconds:», round(fits, 3))
print(«test_auc:», round(float(auc), 4))
print(«testavgprecision:», round(float(ap), 4))
«`
Анализ поведения модели
«`python
deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype(«int32»)
dftestlocal = vaex.fromarrays(ytrue=ytrue.astype(«int8»), ypred=y_pred.astype(«float64»), bucket=bucket)
lift = dftestlocal.groupby(by=»bucket», agg={«n»: vaex.agg.count(), «rate»: vaex.agg.mean(«ytrue»), «avgpred»: vaex.agg.mean(«y_pred»)}).sort(«bucket»)
liftpd = lift.topandas_df()
baseline = float(dftestlocal[«y_true»].mean())
liftpd[«lift»] = liftpd[«rate»] / (baseline + 1e-12)
print(«\nDecile lift table:»)
print(liftpd.tostring(index=False))
«`
Экспорт данных и сохранение состояния конвейера
«`python
outdir = «/content/vaexartifacts»
os.makedirs(outdir, existok=True)
parquetpath = os.path.join(outdir, «customers_vaex.parquet»)
statepath = os.path.join(outdir, «vaex_pipeline.json»)
basecols = [«city», «labelencodedcity», «age», «tenurem», «tenurey», «tx», «income», «incomek», «value_score», «target»]
exportcols = basecols + [«z«+f for f in featuresnum]
dfexport = df[exportcols].sample(n=500000, randomstate=1)
if os.path.exists(parquet_path):
os.remove(parquet_path)
dfexport.exportparquet(parquet_path)
pipeline_state = {
«vaex_version»: vaex.version,
«encoderlabels»: {k: {str(kk): int(vv) for kk,vv in v.items()} for k,v in encoder.labels.items()},
«scalermean»: [float(x) for x in scaler.mean],
«scalerstd»: [float(x) for x in scaler.std],
«featuresnum»: featuresnum,
«exportcols»: exportcols,
}
with open(state_path, «w») as f:
json.dump(pipeline_state, f)
dfreopen = vaex.open(parquetpath)
«`
В этом руководстве мы продемонстрировали, как Vaex позволяет выполнять быструю обработку данных с эффективным использованием памяти, одновременно поддерживая расширенную разработку функций, агрегацию и интеграцию моделей. Мы показали, что приблизительная статистика, отложенные вычисления и выполнение вне ядра позволяют нам плавно масштабировать процесс от анализа до создания артефактов, готовых к развёртыванию. Экспортируя воспроизводимые функции и сохраняя полное состояние конвейера, мы замкнули цикл от необработанных данных до логического вывода, проиллюстрировав, как Vaex естественным образом вписывается в современные рабочие процессы Python для работы с большими данными.
1. Какие инструменты и библиотеки используются в статье для работы с большими данными и машинного обучения?
В статье используются Vaex для эффективной работы с миллионами строк без материализации данных в памяти, NumPy, Pandas, scikit-learn для обучения и оценки прогнозной модели.
2. Какие методы и подходы применяются для генерации реалистичных данных в статье?
В статье используется генерация данных с помощью NumPy для создания массивов с различными характеристиками, такими как возраст, доход, и т. д. Также применяется случайный выбор городов для каждого элемента массива.
3. Как в статье осуществляется стандартизация числовых признаков?
В статье для стандартизации числовых признаков используется класс Vaex.ml.StandardScaler, который позволяет масштабировать данные, приводя их к стандартному нормальному распределению.
4. Какие метрики используются для оценки качества модели в статье?
В статье для оценки качества модели используются метрики ROC AUC (rocaucscore) и средняя точность (averageprecisionscore).
5. Как в статье осуществляется экспорт данных и сохранение состояния конвейера?
В статье данные экспортируются в формате Parquet с помощью метода export_parquet. Состояние конвейера сохраняется в виде JSON-файла, где сохраняются параметры кодировщика, масштабировщика и другие важные параметры.