В этом руководстве мы создадим полнофункциональную систему событийно-ориентированного рабочего процесса с помощью Kombu, рассматривая обмен сообщениями как ключевую архитектурную возможность. Мы пошагово настроим обмены, ключи маршрутизации, фоновых рабочих и одновременных производителей, что позволит нам наблюдать за реальной распределённой системой.
Установка и настройка
1. Установка Kombu:
«`
!pip install kombu
«`
2. Импорт зависимостей и настройка журналирования:
«`python
import threading
import time
import logging
import uuid
import datetime
import sys
from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin
logging.basicConfig(
level=logging.INFO,
format=’%(message)s’,
handlers=[logging.StreamHandler(sys.stdout)],
force=True
)
logger = logging.getLogger(name)
BROKER_URL = «memory://localhost/»
«`
Настройка обмена и очередей
«`python
mediaexchange = Exchange(‘mediaexchange’, type=’topic’, durable=True)
task_queues = [
Queue(‘videoqueue’, mediaexchange, routing_key=’video.#’),
Queue(‘auditqueue’, mediaexchange, routing_key=’#’),
]
«`
Реализация рабочего класса
«`python
class Worker(ConsumerMixin):
def init(self, connection, queues):
self.connection = connection
self.queues = queues
self.should_stop = False
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=self.queues,
callbacks=[self.on_message],
accept=[‘json’],
prefetch_count=1)
]
def on_message(self, body, message):
routingkey = message.deliveryinfo[‘routing_key’]
payload_id = body.get(‘id’, ‘unknown’)
logger.info(f»\n RECEIVED MSG via key: [{routing_key}]»)
logger.info(f» Payload ID: {payload_id}»)
try:
if ‘video’ in routing_key:
self.process_video(body)
elif ‘audit’ in routing_key:
logger.info(» [Audit] Logging event…»)
message.ack()
logger.info(f» ACKNOWLEDGED»)
except Exception as e:
logger.error(f» ERROR: {e}»)
def process_video(self, body):
logger.info(» [Processor] Transcoding video (Simulating work…)»)
time.sleep(0.5)
«`
Отправка сообщений
«`python
def publish_messages(connection):
producer = Producer(connection)
tasks = [
(‘video.upload’, {‘file’: ‘movie.mp4’}),
(‘user.login’, {‘user’: ‘admin’}),
]
logger.info(«\n PRODUCER: Starting to publish messages…»)
for r_key, data in tasks:
data[‘id’] = str(uuid.uuid4())[:8]
logger.info(f» SENDING: {r_key} -> {data}»)
producer.publish(
data,
exchange=media_exchange,
routingkey=rkey,
serializer=’json’
)
time.sleep(1.5)
logger.info(» PRODUCER: Done.»)
«`
Запуск примера
«`python
def run_example():
with Connection(BROKER_URL) as conn:
worker = Worker(conn, task_queues)
worker_thread = threading.Thread(target=worker.run)
worker_thread.daemon = True
worker_thread.start()
logger.info(» SYSTEM: Worker thread started.»)
time.sleep(1)
try:
publish_messages(conn)
time.sleep(2)
except KeyboardInterrupt:
pass
finally:
worker.should_stop = True
logger.info(«\n SYSTEM: Execution complete.»)
if name == «main«:
run_example()
«`
В этом руководстве мы создали систему, которая демонстрирует, как Kombu упрощает работу с распределёнными системами. Мы настроили обмен сообщениями и показали, как сообщения могут быть маршрутизированы и обработаны в распределённой системе. Это позволяет нам лучше понять, как работают событийно-ориентированные системы, и подготовиться к созданию более сложных микросервисов и рабочих процессов.
1. Какие основные компоненты используются для настройки обмена сообщениями в системе маршрутизации распределённых задач с помощью Kombu?
В системе используются следующие основные компоненты:
— `Kombu`: библиотека для работы с очередями сообщений.
— `Exchange`: объект, который определяет тип обмена сообщениями.
— `Queue`: очередь, в которую помещаются сообщения для обработки.
— `Producer`: объект для отправки сообщений.
— `Consumer`: объект для получения и обработки сообщений.
2. Какие типы ключей маршрутизации используются в примере для распределения задач между очередями?
В примере используются следующие типы ключей маршрутизации:
— `’video.#’` для очереди `video_queue`.
— `’#’` для очереди `audit_queue`.
Это позволяет маршрутизировать сообщения в соответствующие очереди на основе их ключей.
3. Какие методы используются для обработки сообщений в рабочем классе `Worker`?
В рабочем классе `Worker` используются следующие методы для обработки сообщений:
— `on_message`: метод, который вызывается при получении сообщения.
— `process_video`: метод для обработки видеосообщений.
В зависимости от ключа маршрутизации, сообщение обрабатывается соответствующим методом.
4. Какие инструменты используются для настройки журналирования в примере?
В примере для настройки журналирования используется библиотека `logging`. Она позволяет настроить уровень логирования, формат вывода и обработчики для вывода логов.
5. Какие шаги необходимо выполнить для запуска примера системы маршрутизации задач с помощью Kombu?
Для запуска примера необходимо выполнить следующие шаги:
— Установить `Kombu` с помощью `pip`.
— Импортировать зависимости и настроить журналирование.
— Настроить обмен и очереди.
— Реализовать рабочий класс `Worker`.
— Отправить сообщения с помощью `Producer`.
— Запустить пример с помощью `run_example()`.