Как создать высокопроизводительную систему маршрутизации распределённых задач с помощью Kombu с Topic Exchanges и Concurrent Workers

В этом руководстве мы создадим полнофункциональную систему событийно-ориентированного рабочего процесса с помощью Kombu, рассматривая обмен сообщениями как ключевую архитектурную возможность. Мы пошагово настроим обмены, ключи маршрутизации, фоновых рабочих и одновременных производителей, что позволит нам наблюдать за реальной распределённой системой.

По мере реализации каждого компонента мы увидим, как чистый поток сообщений, асинхронная обработка и шаблоны маршрутизации дают нам ту же мощность, на которую полагаются производственные микросервисы каждый день.

Установка и настройка

1. Установка Kombu:

`!pip install kombu`

2. Импорт зависимостей и настройка журналирования:

«`python
import threading
import time
import logging
import uuid
import datetime
import sys

from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin

logging.basicConfig(
level=logging.INFO,
format=’%(message)s’,
handlers=[logging.StreamHandler(sys.stdout)],
force=True
)
logger = logging.getLogger(name)

BROKER_URL = «memory://localhost/»
«`

Мы начинаем с установки Kombu, импорта зависимостей и настройки журналирования, чтобы чётко видеть каждое сообщение, проходящее через систему. Мы также устанавливаем URL-адрес брокера в памяти, что позволяет нам запускать всё локально в Colab без необходимости использования RabbitMQ. Эта настройка формирует основу для нашего распределённого рабочего процесса обмена сообщениями.

Настройка обмена и очередей

«`python
mediaexchange = Exchange(‘mediaexchange’, type=’topic’, durable=True)

task_queues = [
Queue(‘videoqueue’, mediaexchange, routing_key=’video.#’),
Queue(‘auditqueue’, mediaexchange, routing_key=’#’),
]
«`

Мы определяем обмен по теме для гибкой маршрутизации сообщений с помощью шаблонов с подстановочными знаками. Мы также создаём две очереди: одну, предназначенную для задач, связанных с видео, и другую очередь аудита, которая прослушивает всё. Используя маршрутизацию по темам, мы можем точно контролировать, как сообщения передаются по системе.

Реализация рабочего

«`python
class Worker(ConsumerMixin):
def init(self, connection, queues):
self.connection = connection
self.queues = queues
self.should_stop = False

def get_consumers(self, Consumer, channel):
return [
Consumer(queues=self.queues,
callbacks=[self.on_message],
accept=[‘json’],
prefetch_count=1)
]

def on_message(self, body, message):
routingkey = message.deliveryinfo[‘routing_key’]
payload_id = body.get(‘id’, ‘unknown’)

logger.info(f»\n RECEIVED MSG via key: [{routing_key}]»)
logger.info(f» Payload ID: {payload_id}»)

try:
if ‘video’ in routing_key:
self.process_video(body)
elif ‘audit’ in routing_key:
logger.info(» [Audit] Logging event…»)

message.ack()
logger.info(f» ACKNOWLEDGED»)

except Exception as e:
logger.error(f» ERROR: {e}»)

def process_video(self, body):
logger.info(» [Processor] Transcoding video (Simulating work…)»)
time.sleep(0.5)
«`

Мы реализуем пользовательского рабочего с помощью Kombu’s ConsumerMixin для запуска его в фоновом потоке. В обратном вызове сообщения мы проверяем ключ маршрутизации, вызываем соответствующую функцию обработки и подтверждаем сообщение. Эта архитектура рабочего даёт нам чистое, параллельное потребление сообщений с полным контролем.

Публикация сообщений

«`python
def publish_messages(connection):
producer = Producer(connection)

tasks = [
(‘video.upload’, {‘file’: ‘movie.mp4’}),
(‘user.login’, {‘user’: ‘admin’}),
]

logger.info(«\n PRODUCER: Starting to publish messages…»)

for r_key, data in tasks:
data[‘id’] = str(uuid.uuid4())[:8]

logger.info(f» SENDING: {r_key} -> {data}»)

producer.publish(
data,
exchange=media_exchange,
routingkey=rkey,
serializer=’json’
)
time.sleep(1.5)

logger.info(» PRODUCER: Done.»)
«`

Теперь мы создаём производителя, который отправляет структурированные полезные нагрузки JSON в обмен с разными ключами маршрутизации. Мы генерируем уникальные идентификаторы для каждого события и наблюдаем, как они направляются в другие очереди. Это отражает реальные события публикации микросервисов, где производители и потребители остаются независимыми.

Запуск примера

«`python
def run_example():
with Connection(BROKER_URL) as conn:
worker = Worker(conn, task_queues)
worker_thread = threading.Thread(target=worker.run)
worker_thread.daemon = True
worker_thread.start()

logger.info(» SYSTEM: Worker thread started.»)
time.sleep(1)

try:
publish_messages(conn)
time.sleep(2)
except KeyboardInterrupt:
pass
finally:
worker.should_stop = True
logger.info(«\n SYSTEM: Execution complete.»)

if name == «main«:
run_example()
«`

Мы запускаем рабочего в фоновом потоке и запускаем производителя в основном потоке. Эта структура даёт нам мини-распределённую систему, работающую в Colab. Наблюдая за логами, мы видим, как сообщения публикуются → маршрутизируются → потребляются → подтверждаются, завершая полный жизненный цикл обработки событий.

В заключение мы создали динамический, распределённый конвейер маршрутизации задач, который обрабатывает события в реальном времени с чёткостью и точностью. Мы увидели, как Kombu абстрагирует сложность систем обмена сообщениями, одновременно предоставляя нам детальный контроль над маршрутизацией, потреблением и параллелизмом рабочих процессов.

1. Какие ключевые компоненты используются для настройки обмена сообщениями в системе маршрутизации распределённых задач с помощью Kombu?

В статье описывается использование следующих ключевых компонентов:
* установка Kombu (`!pip install kombu`);
* импорт зависимостей и настройка журналирования;
* настройка обмена (`mediaexchange = Exchange(‘mediaexchange’, type=’topic’, durable=True)`);
* создание очередей (`Queue(‘videoqueue’, mediaexchange, routing_key=’video.#’)`);
* реализация рабочего (`class Worker(ConsumerMixin):`);
* публикация сообщений (`producer.publish(data, exchange=mediaexchange, routingkey=r_key, serializer=’json’`).

2. Какие типы обменов сообщениями используются в системе и для чего они применяются?

В системе используется обмен сообщениями типа `topic` (`type=’topic’`). Этот тип обмена позволяет гибко маршрутизировать сообщения с помощью шаблонов с подстановочными знаками. В статье создаются две очереди: одна для задач, связанных с видео (`videoqueue`), и другая для аудита (`auditqueue`), которая прослушивает всё (`routing_key=’#’`).

3. Какие функции выполняет класс `Worker` в системе маршрутизации?

Класс `Worker` выполняет функцию рабочего, который запускается в фоновом потоке. В обратном вызове сообщения он проверяет ключ маршрутизации, вызывает соответствующую функцию обработки и подтверждает сообщение. Эта архитектура рабочего даёт чистое, параллельное потребление сообщений с полным контролем.

4. Какие шаги необходимо выполнить для запуска примера системы маршрутизации в Colab?

Для запуска примера системы маршрутизации в Colab необходимо выполнить следующие шаги:
* установить Kombu (`!pip install kombu`);
* импортировать зависимости и настроить журналирование;
* настроить обмен и очереди;
* реализовать рабочего;
* создать производителя, который отправляет структурированные полезные нагрузки JSON в обмен с разными ключами маршрутизации;
* запустить рабочего в фоновом потоке и запустить производителя в основном потоке.

5. Какие преимущества предоставляет использование Kombu для маршрутизации распределённых задач?

Использование Kombu для маршрутизации распределённых задач предоставляет следующие преимущества:
* абстрагирование сложности систем обмена сообщениями;
* детальный контроль над маршрутизацией, потреблением и параллелизмом рабочих процессов;
* возможность создания динамического, распределённого конвейера маршрутизации задач, который обрабатывает события в реальном времени с чёткостью и точностью.

Источник