В этом руководстве мы рассмотрим создание продвинутого конвейера для анализа данных с использованием Polars — библиотеки DataFrame, разработанной для оптимальной производительности и масштабируемости. Наша цель — продемонстрировать, как можно использовать отложенную оценку Polars, сложные выражения, оконные функции и SQL-интерфейс для эффективной обработки крупномасштабных финансовых наборов данных.
Шаг 1: генерация синтетического набора данных
Мы начнём с импорта основных библиотек, включая Polars для высокопроизводительных операций с DataFrame и NumPy для генерации синтетических данных. Для обеспечения совместимости мы добавим шаг установки Polars в случае, если она ещё не установлена.
“`python
import polars as pl
import numpy as np
from datetime import datetime, timedelta
import io
try:
import polars as pl
except ImportError:
import subprocess
subprocess.run([“pip”, “install”, “polars”], check=True)
import polars as pl
print(” Advanced Polars Analytics Pipeline”)
print(“=” * 50)
np.random.seed(42)
n_records = 100000
dates = [datetime(2020, 1, 1) + timedelta(days=i//100) for i in range(n_records)]
tickers = np.random.choice([‘AAPL’, ‘GOOGL’, ‘MSFT’, ‘TSLA’, ‘AMZN’], n_records)
Create complex synthetic dataset
data = {
‘timestamp’: dates,
‘ticker’: tickers,
‘price’: np.random.lognormal(4, 0.3, n_records),
‘volume’: np.random.exponential(1000000, n_records).astype(int),
‘bidaskspread’: np.random.exponential(0.01, n_records),
‘marketcap’: np.random.lognormal(25, 1, nrecords),
‘sector’: np.random.choice([‘Tech’, ‘Finance’, ‘Healthcare’, ‘Energy’], n_records)
}
print(f” Generated {n_records:,} synthetic financial records”)
“`
Шаг 2: загрузка данных в Polars LazyFrame
Мы загружаем наш синтетический набор данных в Polars LazyFrame, чтобы включить отложенное выполнение, что позволяет нам эффективно связывать сложные преобразования.
“`python
lf = pl.LazyFrame(data)
result = (
lf
.with_columns([
pl.col(‘timestamp’).dt.year().alias(‘year’),
pl.col(‘timestamp’).dt.month().alias(‘month’),
pl.col(‘timestamp’).dt.weekday().alias(‘weekday’),
pl.col(‘timestamp’).dt.quarter().alias(‘quarter’)
])
.with_columns([
pl.col(‘price’).rollingmean(20).over(‘ticker’).alias(‘sma20′),
pl.col(‘price’).rollingstd(20).over(‘ticker’).alias(‘volatility20′),
pl.col(‘price’).ewmmean(span=12).over(‘ticker’).alias(’ema12′),
pl.col(‘price’).diff().alias(‘price_diff’),
(pl.col(‘volume’) * pl.col(‘price’)).alias(‘dollar_volume’)
])
.with_columns([
pl.col(‘pricediff’).clip(0, None).rollingmean(14).over(‘ticker’).alias(‘rsi_up’),
pl.col(‘pricediff’).abs().rollingmean(14).over(‘ticker’).alias(‘rsi_down’),
(pl.col(‘price’) – pl.col(‘sma20′)).alias(‘bbposition’)
])
.with_columns([
(100 – (100 / (1 + pl.col(‘rsiup’) / pl.col(‘rsidown’)))).alias(‘rsi’)
])
.filter(
(pl.col(‘price’) > 10) &
(pl.col(‘volume’) > 100000) &
(pl.col(‘sma20′).isnot_null())
)
.group_by([‘ticker’, ‘year’, ‘quarter’])
.agg([
pl.col(‘price’).mean().alias(‘avg_price’),
pl.col(‘price’).std().alias(‘price_volatility’),
pl.col(‘price’).min().alias(‘min_price’),
pl.col(‘price’).max().alias(‘max_price’),
pl.col(‘price’).quantile(0.5).alias(‘median_price’),
pl.col(‘volume’).sum().alias(‘total_volume’),
pl.col(‘dollarvolume’).sum().alias(‘totaldollar_volume’),
pl.col(‘rsi’).filter(pl.col(‘rsi’).isnotnull()).mean().alias(‘avg_rsi’),
pl.col(‘volatility20′).mean().alias(‘avgvolatility’),
pl.col(‘bbposition’).std().alias(‘bollingerdeviation’),
pl.len().alias(‘trading_days’),
pl.col(‘sector’).nunique().alias(‘sectorscount’),
(pl.col(‘price’) > pl.col(‘sma20′)).mean().alias(‘abovesma_ratio’),
((pl.col(‘price’).max() – pl.col(‘price’).min()) / pl.col(‘price’).min())
.alias(‘pricerangepct’)
])
.with_columns([
pl.col(‘totaldollarvolume’).rank(method=’ordinal’, descending=True).alias(‘volume_rank’),
pl.col(‘pricevolatility’).rank(method=’ordinal’, descending=True).alias(‘volatilityrank’)
])
.filter(pl.col(‘trading_days’) >= 10)
.sort([‘ticker’, ‘year’, ‘quarter’])
)
“`
Шаг 3: анализ результатов
Мы собираем результаты в DataFrame и сразу просматриваем топ-10 кварталов по общему объёму торгов в долларах. Это помогает нам определить периоды интенсивной торговой активности.
“`python
df = result.collect()
print(f”\n Analysis Results: {df.height:,} aggregated records”)
print(“\nTop 10 High-Volume Quarters:”)
print(df.sort(‘totaldollarvolume’, descending=True).head(10).to_pandas())
“`
Шаг 4: демонстрация SQL-интерфейса
Мы завершаем конвейер, демонстрируя элегантный SQL-интерфейс Polars, запуская агрегатный запрос для анализа производительности тикеров после 2021 года с помощью знакомого синтаксиса SQL.
“`python
print(“\n SQL Interface Demo:”)
pl.Config.settblrows(5)
sql_result = pl.sql(“””
SELECT
ticker,
AVG(avgprice) as meanprice,
STDDEV(pricevolatility) as volatilityconsistency,
SUM(totaldollarvolume) as total_volume,
COUNT(*) as quarters_tracked
FROM df
WHERE year >= 2021
GROUP BY ticker
ORDER BY total_volume DESC
“””, eager=True)
print(sql_result)
“`
Заключение
Мы увидели, как Polars может оптимизировать сложные аналитические рабочие процессы, которые в традиционных инструментах были бы медленными. Мы разработали комплексный конвейер финансового анализа, охватывающий от приёма необработанных данных до расчёта скользящих индикаторов, сгруппированных агрегаций и продвинутого скоринга, выполняемых с невероятной скоростью.