Реализация кода для создания унифицированного конвейера Apache Beam, демонстрирующего пакетную и потоковую обработку с оконным анализом в событийном времени с использованием DirectRunner

В этом руководстве мы покажем, как создать унифицированный конвейер Apache Beam, который без проблем работает как в пакетном, так и в потоковом режимах с помощью DirectRunner. Мы генерируем синтетические данные с учётом времени событий и применяем фиксированные окна с триггерами и допустимой задержкой, чтобы продемонстрировать, как Apache Beam последовательно обрабатывает своевременные и поздние события.

Установка зависимостей

Мы устанавливаем необходимые зависимости и обеспечиваем совместимость версий, чтобы Apache Beam работал корректно. Импортируем основные API Beam вместе с инструментами для окон, триггеров и TestStream, которые понадобятся позже в конвейере. Также подключаем стандартные модули Python для работы со временем и форматирования JSON.

«`
!pip -q install -U «grpcio>=1.71.2» «grpcio-status>=1.71.2»
!pip -q install -U apache-beam crcmod

import apache_beam as beam
from apachebeam.options.pipelineoptions import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apachebeam.testing.teststream import TestStream
import json
from datetime import datetime, timezone
«`

Конфигурация

Определяем глобальную конфигурацию, которая контролирует размер окна, допустимую задержку и режим выполнения. Создаём синтетические события с явными временными метками событий, чтобы поведение окон было детерминированным и его было легко понять. Подготавливаем небольшой набор данных, который намеренно включает внеочередные и поздние события, чтобы наблюдать семантику времени событий Beam.

«`
MODE = «stream»
WINDOWSIZESECS = 60
ALLOWEDLATENESSSECS = 120

def makeevent(userid, eventtype, amount, eventtimeepochs):
return {«userid»: userid, «eventtype»: eventtype, «amount»: float(amount), «eventtime»: int(eventtimeepochs)}

base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())

BATCH_EVENTS = [
make_event(«u1», «purchase», 20, t0 + 5),
make_event(«u1», «purchase», 15, t0 + 20),
make_event(«u2», «purchase», 8, t0 + 35),
make_event(«u1», «refund», -5, t0 + 62),
make_event(«u2», «purchase», 12, t0 + 70),
make_event(«u3», «purchase», 9, t0 + 75),
make_event(«u2», «purchase», 3, t0 + 50),
]
«`

Логика агрегации

Создаём повторно используемый Beam PTransform, который инкапсулирует всю логику агрегации с окнами. Применяем фиксированные окна, триггеры и правила накопления, затем группируем события по пользователям и вычисляем количество и суммы. Мы сохраняем этот трансформ независимым от источника данных, поэтому одна и та же логика применяется как к пакетным, так и к потоковым входным данным.

«`
def formatjoinedrecord(kv):
user_id, d = kv
return {
«userid»: userid,
«count»: int(d[«count»][0]) if d[«count»] else 0,
«sumamount»: float(d[«sumamount»][0]) if d[«sum_amount»] else 0.0,
}

class WindowedUserAgg(beam.PTransform):
def expand(self, pcoll):
stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e[«event_time»]))
windowed = stamped | beam.WindowInto(
FixedWindows(WINDOWSIZESECS),
allowedlateness=ALLOWEDLATENESS_SECS,
trigger=AfterWatermark(
early=AfterProcessingTime(10),
late=AfterProcessingTime(10),
),
accumulation_mode=AccumulationMode.ACCUMULATING,
)
keyed = windowed | beam.Map(lambda e: (e[«user_id»], e[«amount»]))
counts = keyed | beam.combiners.Count.PerKey()
sums = keyed | beam.CombinePerKey(sum)
return (
{«count»: counts, «sum_amount»: sums}
| beam.CoGroupByKey()
| beam.Map(formatjoinedrecord)
)
«`

Добавление информации об окнах

Обогащаем каждую агрегированную запись метаданными окна и панели, чтобы чётко видеть, когда и почему результаты выводятся. Преобразуем внутренние метки времени Beam в удобочитаемые времена UTC для ясности.

«`
class AddWindowInfo(beam.DoFn):
def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
ws = float(window.start)
we = float(window.end)
yield {
element,
«windowstartutc»: datetime.fromtimestamp(ws, tz=timezone.utc).strftime(«%H:%M:%S»),
«windowendutc»: datetime.fromtimestamp(we, tz=timezone.utc).strftime(«%H:%M:%S»),
«panetiming»: str(paneinfo.timing),
«paneisfirst»: paneinfo.isfirst,
«paneislast»: paneinfo.islast,
}
«`

Создание тестового потока

Определяем TestStream, который имитирует реальное потоковое поведение с помощью водяных знаков, продвижений по времени обработки и поздних данных.

«`
def buildteststream():
return (
TestStream()
.advancewatermarkto(t0)
.add_elements([
beam.window.TimestampedValue(make_event(«u1», «purchase», 20, t0 + 5), t0 + 5),
beam.window.TimestampedValue(make_event(«u1», «purchase», 15, t0 + 20), t0 + 20),
beam.window.TimestampedValue(make_event(«u2», «purchase», 8, t0 + 35), t0 + 35),
])
.advanceprocessingtime(5)
.advancewatermarkto(t0 + 61)
.add_elements([
beam.window.TimestampedValue(make_event(«u1», «refund», -5, t0 + 62), t0 + 62),
beam.window.TimestampedValue(make_event(«u2», «purchase», 12, t0 + 70), t0 + 70),
beam.window.TimestampedValue(make_event(«u3», «purchase», 9, t0 + 75), t0 + 75),
])
.advanceprocessingtime(5)
.add_elements([
beam.window.TimestampedValue(make_event(«u2», «purchase», 3, t0 + 50), t0 + 50),
])
.advancewatermarkto(t0 + 121)
.advancewatermarkto_infinity()
)
«`

Запуск пакетной обработки

«`
def run_batch():
with beam.Pipeline(options=PipelineOptions([])) as p:
(
p
| beam.Create(BATCH_EVENTS)
| WindowedUserAgg()
| beam.ParDo(AddWindowInfo())
| beam.Map(json.dumps)
| beam.Map(print)
)
«`

Запуск потоковой обработки

«`
def run_stream():
opts = PipelineOptions([])
opts.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=opts) as p:
(
p
| buildteststream()
| WindowedUserAgg()
| beam.ParDo(AddWindowInfo())
| beam.Map(json.dumps)
| beam.Map(print)
)

runstream() if MODE == «stream» else runbatch()
«`

Мы объединили всё в исполняемые пакетные и потоковые конвейеры. Переключаемся между режимами, изменяя единственный флаг, при этом повторно используя одно и то же преобразование агрегации. Запускаем конвейер и выводим результаты в окне, что упрощает проверку потока выполнения и выходных данных.

В заключение мы продемонстрировали, что один и тот же конвейер Beam может обрабатывать как ограниченные пакетные данные, так и неограниченные потоковые данные, сохраняя при этом идентичные семантики окон и агрегации. Мы наблюдали, как водяные знаки, триггеры и режимы накопления влияют на время выдачи результатов и как поздние данные обновляют ранее вычисленные окна. Также мы сосредоточились на концептуальных основах унифицированной модели Beam, предоставив прочную основу для последующего масштабирования той же конструкции до реальных потоковых движков и производственных сред.

1. Какие инструменты и API используются в статье для создания унифицированного конвейера Apache Beam?

В статье используются основные API Beam вместе с инструментами для окон, триггеров и TestStream, а также стандартные модули Python для работы со временем и форматирования JSON.

2. Как в статье обеспечивается совместимость версий Apache Beam и других используемых библиотек?

В статье указано, что для установки зависимостей используются команды pip для установки необходимых версий библиотек, включая grpcio, grpcio-status, apache-beam и crcmod.

3. Какие параметры конфигурации используются для определения размера окна и допустимой задержки в примере?

В примере используются параметры WINDOWSIZESECS для определения размера окна в секундах и ALLOWEDLATENESSSECS для определения допустимой задержки в секундах.

4. Как в статье обрабатывается время событий и как это влияет на поведение окон?

В статье указано, что синтетические события создаются с явными временными метками событий, чтобы поведение окон было детерминированным и его было легко понять. Это позволяет наблюдать семантику времени событий Beam.

5. Какие преимущества даёт использование унифицированного конвейера Apache Beam для обработки как пакетных, так и потоковых данных?

Унифицированный конвейер Apache Beam позволяет без проблем работать как в пакетном, так и в потоковом режимах, сохраняя при этом идентичные семантики окон и агрегации. Это упрощает проверку потока выполнения и выходных данных, а также предоставляет прочную основу для последующего масштабирования конструкции до реальных потоковых движков и производственных сред.

Источник