В этом руководстве мы создадим полнофункциональную систему событийно-ориентированного рабочего процесса с помощью Kombu, рассматривая обмен сообщениями как ключевую архитектурную возможность. Мы пошагово настроим обмены, ключи маршрутизации, фоновых рабочих и одновременных производителей, что позволит нам наблюдать за реальной распределённой системой.
По мере реализации каждого компонента мы увидим, как чистый поток сообщений, асинхронная обработка и шаблоны маршрутизации дают нам ту же мощность, на которую полагаются производственные микросервисы каждый день.
Установка и настройка
1. Установка Kombu:
`!pip install kombu`
2. Импорт зависимостей и настройка журналирования:
«`python
import threading
import time
import logging
import uuid
import datetime
import sys
from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin
logging.basicConfig(
level=logging.INFO,
format=’%(message)s’,
handlers=[logging.StreamHandler(sys.stdout)],
force=True
)
logger = logging.getLogger(name)
BROKER_URL = «memory://localhost/»
«`
Мы начинаем с установки Kombu, импорта зависимостей и настройки журналирования, чтобы чётко видеть каждое сообщение, проходящее через систему. Мы также устанавливаем URL-адрес брокера в памяти, что позволяет нам запускать всё локально в Colab без необходимости использования RabbitMQ. Эта настройка формирует основу для нашего распределённого рабочего процесса обмена сообщениями.
Настройка обмена и очередей
«`python
mediaexchange = Exchange(‘mediaexchange’, type=’topic’, durable=True)
task_queues = [
Queue(‘videoqueue’, mediaexchange, routing_key=’video.#’),
Queue(‘auditqueue’, mediaexchange, routing_key=’#’),
]
«`
Мы определяем обмен по теме для гибкой маршрутизации сообщений с помощью шаблонов с подстановочными знаками. Мы также создаём две очереди: одну, предназначенную для задач, связанных с видео, и другую очередь аудита, которая прослушивает всё. Используя маршрутизацию по темам, мы можем точно контролировать, как сообщения передаются по системе.
Реализация рабочего
«`python
class Worker(ConsumerMixin):
def init(self, connection, queues):
self.connection = connection
self.queues = queues
self.should_stop = False
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=self.queues,
callbacks=[self.on_message],
accept=[‘json’],
prefetch_count=1)
]
def on_message(self, body, message):
routingkey = message.deliveryinfo[‘routing_key’]
payload_id = body.get(‘id’, ‘unknown’)
logger.info(f»\n RECEIVED MSG via key: [{routing_key}]»)
logger.info(f» Payload ID: {payload_id}»)
try:
if ‘video’ in routing_key:
self.process_video(body)
elif ‘audit’ in routing_key:
logger.info(» [Audit] Logging event…»)
message.ack()
logger.info(f» ACKNOWLEDGED»)
except Exception as e:
logger.error(f» ERROR: {e}»)
def process_video(self, body):
logger.info(» [Processor] Transcoding video (Simulating work…)»)
time.sleep(0.5)
«`
Мы реализуем пользовательского рабочего с помощью Kombu’s ConsumerMixin для запуска его в фоновом потоке. В обратном вызове сообщения мы проверяем ключ маршрутизации, вызываем соответствующую функцию обработки и подтверждаем сообщение. Эта архитектура рабочего даёт нам чистое, параллельное потребление сообщений с полным контролем.
Публикация сообщений
«`python
def publish_messages(connection):
producer = Producer(connection)
tasks = [
(‘video.upload’, {‘file’: ‘movie.mp4’}),
(‘user.login’, {‘user’: ‘admin’}),
]
logger.info(«\n PRODUCER: Starting to publish messages…»)
for r_key, data in tasks:
data[‘id’] = str(uuid.uuid4())[:8]
logger.info(f» SENDING: {r_key} -> {data}»)
producer.publish(
data,
exchange=media_exchange,
routingkey=rkey,
serializer=’json’
)
time.sleep(1.5)
logger.info(» PRODUCER: Done.»)
«`
Теперь мы создаём производителя, который отправляет структурированные полезные нагрузки JSON в обмен с разными ключами маршрутизации. Мы генерируем уникальные идентификаторы для каждого события и наблюдаем, как они направляются в другие очереди. Это отражает реальные события публикации микросервисов, где производители и потребители остаются независимыми.
Запуск примера
«`python
def run_example():
with Connection(BROKER_URL) as conn:
worker = Worker(conn, task_queues)
worker_thread = threading.Thread(target=worker.run)
worker_thread.daemon = True
worker_thread.start()
logger.info(» SYSTEM: Worker thread started.»)
time.sleep(1)
try:
publish_messages(conn)
time.sleep(2)
except KeyboardInterrupt:
pass
finally:
worker.should_stop = True
logger.info(«\n SYSTEM: Execution complete.»)
if name == «main«:
run_example()
«`
Мы запускаем рабочего в фоновом потоке и запускаем производителя в основном потоке. Эта структура даёт нам мини-распределённую систему, работающую в Colab. Наблюдая за логами, мы видим, как сообщения публикуются → маршрутизируются → потребляются → подтверждаются, завершая полный жизненный цикл обработки событий.
В заключение мы создали динамический, распределённый конвейер маршрутизации задач, который обрабатывает события в реальном времени с чёткостью и точностью. Мы увидели, как Kombu абстрагирует сложность систем обмена сообщениями, одновременно предоставляя нам детальный контроль над маршрутизацией, потреблением и параллелизмом рабочих процессов.
1. Какие ключевые компоненты используются для настройки обмена сообщениями в системе маршрутизации распределённых задач с помощью Kombu?
В статье описывается использование следующих ключевых компонентов:
* установка Kombu (`!pip install kombu`);
* импорт зависимостей и настройка журналирования;
* настройка обмена (`mediaexchange = Exchange(‘mediaexchange’, type=’topic’, durable=True)`);
* создание очередей (`Queue(‘videoqueue’, mediaexchange, routing_key=’video.#’)`);
* реализация рабочего (`class Worker(ConsumerMixin):`);
* публикация сообщений (`producer.publish(data, exchange=mediaexchange, routingkey=r_key, serializer=’json’`).
2. Какие типы обменов сообщениями используются в системе и для чего они применяются?
В системе используется обмен сообщениями типа `topic` (`type=’topic’`). Этот тип обмена позволяет гибко маршрутизировать сообщения с помощью шаблонов с подстановочными знаками. В статье создаются две очереди: одна для задач, связанных с видео (`videoqueue`), и другая для аудита (`auditqueue`), которая прослушивает всё (`routing_key=’#’`).
3. Какие функции выполняет класс `Worker` в системе маршрутизации?
Класс `Worker` выполняет функцию рабочего, который запускается в фоновом потоке. В обратном вызове сообщения он проверяет ключ маршрутизации, вызывает соответствующую функцию обработки и подтверждает сообщение. Эта архитектура рабочего даёт чистое, параллельное потребление сообщений с полным контролем.
4. Какие шаги необходимо выполнить для запуска примера системы маршрутизации в Colab?
Для запуска примера системы маршрутизации в Colab необходимо выполнить следующие шаги:
* установить Kombu (`!pip install kombu`);
* импортировать зависимости и настроить журналирование;
* настроить обмен и очереди;
* реализовать рабочего;
* создать производителя, который отправляет структурированные полезные нагрузки JSON в обмен с разными ключами маршрутизации;
* запустить рабочего в фоновом потоке и запустить производителя в основном потоке.
5. Какие преимущества предоставляет использование Kombu для маршрутизации распределённых задач?
Использование Kombu для маршрутизации распределённых задач предоставляет следующие преимущества:
* абстрагирование сложности систем обмена сообщениями;
* детальный контроль над маршрутизацией, потреблением и параллелизмом рабочих процессов;
* возможность создания динамического, распределённого конвейера маршрутизации задач, который обрабатывает события в реальном времени с чёткостью и точностью.