Как создать высокопроизводительную систему маршрутизации распределённых задач с помощью Kombu с Topic Exchanges и Concurrent Workers

В этом руководстве мы создадим полнофункциональную систему событийно-ориентированного рабочего процесса с помощью Kombu, рассматривая обмен сообщениями как ключевую архитектурную возможность. Мы пошагово настроим обмены, ключи маршрутизации, фоновых рабочих и одновременных производителей, что позволит нам наблюдать за реальной распределённой системой.

Установка и настройка

1. Установка Kombu:
«`
!pip install kombu
«`

2. Импорт зависимостей и настройка журналирования:
«`python
import threading
import time
import logging
import uuid
import datetime
import sys

from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin

logging.basicConfig(
level=logging.INFO,
format=’%(message)s’,
handlers=[logging.StreamHandler(sys.stdout)],
force=True
)
logger = logging.getLogger(name)

BROKER_URL = «memory://localhost/»
«`

Настройка обмена и очередей

«`python
mediaexchange = Exchange(‘mediaexchange’, type=’topic’, durable=True)

task_queues = [
Queue(‘videoqueue’, mediaexchange, routing_key=’video.#’),
Queue(‘auditqueue’, mediaexchange, routing_key=’#’),
]
«`

Реализация рабочего класса

«`python
class Worker(ConsumerMixin):
def init(self, connection, queues):
self.connection = connection
self.queues = queues
self.should_stop = False

def get_consumers(self, Consumer, channel):
return [
Consumer(queues=self.queues,
callbacks=[self.on_message],
accept=[‘json’],
prefetch_count=1)
]

def on_message(self, body, message):
routingkey = message.deliveryinfo[‘routing_key’]
payload_id = body.get(‘id’, ‘unknown’)

logger.info(f»\n RECEIVED MSG via key: [{routing_key}]»)
logger.info(f» Payload ID: {payload_id}»)

try:
if ‘video’ in routing_key:
self.process_video(body)
elif ‘audit’ in routing_key:
logger.info(» [Audit] Logging event…»)

message.ack()
logger.info(f» ACKNOWLEDGED»)

except Exception as e:
logger.error(f» ERROR: {e}»)

def process_video(self, body):
logger.info(» [Processor] Transcoding video (Simulating work…)»)
time.sleep(0.5)
«`

Отправка сообщений

«`python
def publish_messages(connection):
producer = Producer(connection)

tasks = [
(‘video.upload’, {‘file’: ‘movie.mp4’}),
(‘user.login’, {‘user’: ‘admin’}),
]

logger.info(«\n PRODUCER: Starting to publish messages…»)

for r_key, data in tasks:
data[‘id’] = str(uuid.uuid4())[:8]

logger.info(f» SENDING: {r_key} -> {data}»)

producer.publish(
data,
exchange=media_exchange,
routingkey=rkey,
serializer=’json’
)
time.sleep(1.5)

logger.info(» PRODUCER: Done.»)
«`

Запуск примера

«`python
def run_example():
with Connection(BROKER_URL) as conn:
worker = Worker(conn, task_queues)
worker_thread = threading.Thread(target=worker.run)
worker_thread.daemon = True
worker_thread.start()

logger.info(» SYSTEM: Worker thread started.»)
time.sleep(1)

try:
publish_messages(conn)
time.sleep(2)
except KeyboardInterrupt:
pass
finally:
worker.should_stop = True
logger.info(«\n SYSTEM: Execution complete.»)

if name == «main«:
run_example()
«`

В этом руководстве мы создали систему, которая демонстрирует, как Kombu упрощает работу с распределёнными системами. Мы настроили обмен сообщениями и показали, как сообщения могут быть маршрутизированы и обработаны в распределённой системе. Это позволяет нам лучше понять, как работают событийно-ориентированные системы, и подготовиться к созданию более сложных микросервисов и рабочих процессов.

1. Какие основные компоненты используются для настройки обмена сообщениями в системе маршрутизации распределённых задач с помощью Kombu?

В системе используются следующие основные компоненты:
— `Kombu`: библиотека для работы с очередями сообщений.
— `Exchange`: объект, который определяет тип обмена сообщениями.
— `Queue`: очередь, в которую помещаются сообщения для обработки.
— `Producer`: объект для отправки сообщений.
— `Consumer`: объект для получения и обработки сообщений.

2. Какие типы ключей маршрутизации используются в примере для распределения задач между очередями?

В примере используются следующие типы ключей маршрутизации:
— `’video.#’` для очереди `video_queue`.
— `’#’` для очереди `audit_queue`.

Это позволяет маршрутизировать сообщения в соответствующие очереди на основе их ключей.

3. Какие методы используются для обработки сообщений в рабочем классе `Worker`?

В рабочем классе `Worker` используются следующие методы для обработки сообщений:
— `on_message`: метод, который вызывается при получении сообщения.
— `process_video`: метод для обработки видеосообщений.

В зависимости от ключа маршрутизации, сообщение обрабатывается соответствующим методом.

4. Какие инструменты используются для настройки журналирования в примере?

В примере для настройки журналирования используется библиотека `logging`. Она позволяет настроить уровень логирования, формат вывода и обработчики для вывода логов.

5. Какие шаги необходимо выполнить для запуска примера системы маршрутизации задач с помощью Kombu?

Для запуска примера необходимо выполнить следующие шаги:
— Установить `Kombu` с помощью `pip`.
— Импортировать зависимости и настроить журналирование.
— Настроить обмен и очереди.
— Реализовать рабочий класс `Worker`.
— Отправить сообщения с помощью `Producer`.
— Запустить пример с помощью `run_example()`.

Источник