Как мы переписывали логику очередей: Celery => aio-pika => FastStream

Страницы:  1

Ответить
 

Professor Seleznov


Наш путь активной работы с очередями RabbitMQ начался с классического Celery. Осознав критичность низкоуровневого контроля системы, принялись работать с aio-pika. Но и этот уровень слишком местами сложный (далее расскажу почему), и нашли отличное решение, на текущий момент, в лице FastStream. Сразу оставлю такую пометку, что каждый инструмент подходит для решения своей задачи. Мы больше хотели сделать акцент на удобство и скорость разработки относительно затрачиваемого времени на миграции решений.
N.B.: Код возможно покажется неоптимальным или старым. Это всё наш дорогой Легаси.
Постановка задачи
Наша система построена на основе микросервисов, работающих с RabbitMQ. Внутри - обычный асинхронный код для похода на внешние API и в БД.
Требования:
  • Надежный консьюминг - это для нас критично, чтобы сообщение шло по всему флоу и нигде не останавливалось без причин. Если ошибка падает, то это должно отражаться в 3 местах: БД, логи и метрики.
  • Ретраи при ошибках обработки.
  • Трейсинг - поддержка OpenTelemetry.
  • Мониторинг сервиса через healthcheck’и.
  • Prometheus метрики.

-
Решение №1: Celery как консьюмер
Почему Celery
Celery — классический инструмент для фоновых задач, знакомый большинству Python-разработчиков. Из коробки: декларативное описание задач, ретраи с экспоненциальной задержкой, хранение результатов, мониторинг через Flower, интеграции с фреймворками. Логика проста: пишешь @app.task, запускаешь воркер — и сообщения из очереди начинают обрабатываться.
Как мы его использовали
Мы не отправляли задачи из кода в духе my_task.delay(), а настраивали Celery на прослушивание внешней очереди, куда сообщения попадали от других систем. По сути, Celery выступал как consumer: подключался к брокеру, забирал сообщения, десериализовал и передавал в наши обработчики. Настройки вроде max_retries, default_retry_delay, countdown позволяли гибко управлять поведением при сбоях. Важно ещё подсветить, что результат всегда игнорируется с помощью параметра ignore_result=True поскольку все результаты записываются в БД.
Пример инициализации воркера:
def create_app(
name,
broker,
include,
backend=None,
task_queues=None,
liveness_probe=1,
update_period=60,
watcher_config={},
):
# Создание само приложение + наложение дополнительных конфигурации
app = Celery(name, broker=broker, include=include, backend=backend)
app.conf.update(
result_expires=120,
)
add_without_heartbeat_argument = Option(
("--without-heartbeat",),
default=True,
)
app.user_options["worker"].add(add_without_heartbeat_argument)
if task_queues is not None:
app.conf.task_queues = task_queues
if liveness_probe:
add_update_period_argument = Option(
("--update-period",),
default=update_period,
)
HEARTBEAT_FILE.touch()
app.user_options["worker"].add(add_update_period_argument)
# Добавление кастомной livenessProbe для K8s
app.steps["worker"].add(LivenessProbe)
# инициализация трейсинга
with_tracing = watcher_config.get("with_tracing")
if with_tracing:
tracing_exporters = watcher_config.get("tracing_exporters", ())
signals.worker_process_init.connect(
init_celery_tracing(app_name=name, tracing_exporters=tracing_exporters),
weak=False,
)
return app
С чем столкнулись
Celery проектировался как система передачи сообщений между системами. Из-за чего столкнулись со следующими проблемами:
  • Потребление памяти — из-за оборачивания каждого сообщения в метаданные и хранения внутренних структур Celery расход оперативной памяти быстро рос пропорционально потоку сообщений. При высоком темпе обработки воркер начинал использовать значительно больше ОЗУ, чем требовалось самой бизнес-логике, что вынуждало выделять избыточные ресурсы.
  • Управление соединениями и heartbeat — Celery скрывает многие детали брокера, из‑за чего при сетевых сбоях восстановление происходило с задержками, а тонкая настройка consumer_timeout, broker_transport_options была сложной и плохо документированной.
  • Избыточность — Как можно заметить мы используем минимальные дополнительные конфигурации Celery.
  • Падение контейнера на битом сообщений — Будет не совсем справедливо относить это полностью к минусам самого Celery, это больше камень в наш огород. Но опыт есть опыт. Если воркер обрабатывал невалидное сообщение в плане структуры, то падал весь контейнер, при этом сообщение консьюмилось. Что приводило к непониманию “куда делось сообщение и что с ним произошло?”.
Стало ясно: Celery — отличный выбор для фоновых задач с результатом, но для непрерывного потокового консьюминга он перегружен и недостаточно прозрачен.
-
Решение №2: aio-pika — близко к железу
Идея
Перейти на чистый AMQP, отказавшись от посредника в виде Celery. aio-pika — асинхронная библиотека для RabbitMQ, предоставляющая прямой доступ к каналам, обменникам, очередям. Код становится минимальной прослойкой над протоколом: сами управляем подписками, подтверждениями (ack/nack), префетчем, реконнектами.
Что переписали
Написали небольшой собственный “фреймворк” консьюминга: асинхронный раннер, который при запуске создаёт соединение, открывает канал, объявляет очереди с нужными параметрами durable/exclusive, подписывается на них, а в колбэке вызывает наши обработчики.
Поверх этого появились:
  • Ручной retry — Декоратор поверх колбэка, имеющий свой счётчик кол-во повторов при возникновений ошибок.
  • LivenessProb — Кастомный на уровне ядра aio-pika представлен чуть ниже.
  • Трейсинг — вручную оборачиваем вызовы в OpenTelemetry спаны, передаём trace context в заголовках AMQP при ретраях.
Пример LivenessProb:
class CustomRobustConnection(RobustConnection):
def __init__(
self,
url: URL,
loop: asyncio.AbstractEventLoop | None = None,
**kwargs: Any,
):
super().__init__(url=url, loop=loop, **kwargs)
# Кастомный класс пробы, схож с тем, что выше для Celery
self._liveness_probe = LivenessProbe()
async def connect(self, timeout: TimeoutType = None) -> None:
# Тут держитесь крепче. Я когда впервые это увидел, без литра кофе не смог понять как оно работает
self._RobustConnection__connect_timeout = timeout
if self.is_closed:
raise RuntimeError(f"{self!r} connection closed")
if self.reconnecting:
raise RuntimeError(
(
"Connect method called but connection "
f"{self!r} is reconnecting right now."
),
self,
)
if not self._RobustConnection__reconnection_task:
self._RobustConnection__reconnection_task = self.loop.create_task(
self.__connection_factory(),
)
await self._RobustConnection__fail_fast_future
await self.connected.wait()
async def __connection_factory(self) -> None:
logger.debug("Starting connection factory for %r", self)
while not self.is_closed and not self._close_called:
logger.debug("Waiting for connection close event for %r", self)
await self._RobustConnection__connection_close_event.wait()
if self.is_closed or self._close_called:
return
try:
self.transport = None
self.connected.clear()
logger.debug("Connection attempt for %r", self)
await Connection.connect(self, self._RobustConnection__connect_timeout)
if not self._RobustConnection__fail_fast_future.done():
self._RobustConnection__fail_fast_future.set_result(None)
logger.debug("Connection made on %r", self)
self._liveness_probe.start()
except CONNECTION_EXCEPTIONS as e:
if not self._RobustConnection__fail_fast_future.done():
self._RobustConnection__fail_fast_future.set_exception(e)
return
logger.warning(
'Connection attempt to "%s" failed: %s. '
"Reconnecting after %r seconds.",
self,
e,
self.reconnect_interval,
)
self._liveness_probe.stop()
except Exception:
logger.exception(
"Reconnect attempt failed %s. " "Retrying after %r seconds.",
self,
self.reconnect_interval,
)
self._liveness_probe.stop()
await asyncio.sleep(self.reconnect_interval)
Что стало лучше
Полный контроль над жизненным циклом соединения, тонкая настройка prefetch, возможность реализовать любую логику подтверждения (например, отложенный ack после завершения цепочки действий). Нет лишних метаданных в теле сообщения — брокер передаёт ровно то, что отправил продюсер. Асинхронность нативная, работает на asyncio без костылей.
Проблемы
Каждая «плюшка» делалась вручную и со временем объём инфраструктурного кода разросся. Десятки строк для декларации очередей, логирование reconnect‑цикла, согласование формата trace‑заголовков между сервисами. Healthcheck, хоть и был создан вручную, требовал аккуратности: нужно было отслеживать состояние не только TCP-соединения, но и открытого канала.
-
Решение №3: FastStream — золотая середина
Основная идея
FastStream — надстройка над aio-pika (а также над NATS, Kafka), которая даёт декларативный стиль описания consumer’ов, lifespan‑хуки, встроенные механизмы: healthcheck-эндпоинт, OpenTelemetry-интеграция, метрики Prometheus.
По сути, это aio-pika, обёрнутая в лучшие практики, которые мы сами реализовывали руками в предыдущем решении. Механизм retry описан как пример использования middleware.
Что пошло так
  • Healthcheck из коробки — достаточно указать FastStream объект в ASGI-приложении (через AsgiFastStream), и на /health возвращается статус брокера.
  • Lifespan — менеджер контекста управляет запуском и корректной остановкой consumer’ов, повторными соединениями. Не нужно писать свои обработчики сигналов. Это касалось не только RMQ коннектов, но и например коннектов к базам данных.
  • Мониторинг и трейсинг — подключение OpenTelemetry сводится к нескольким строчкам: спаны автоматически создаются для каждого обработанного сообщения, propagate context через заголовки.
  • Декларативные middleware — проще внедрять кросс‑касательную логику (логирование, валидацию) без захламления бизнес-кода.
  • Простота конфигурации — брокер, очереди, обменники описываются через Python-декораторы и типы, нет нужды вручную управлять каналами и ack/nack.
  • Интеграция с Pydantic — Сообщение на уровне описание консьюмеров сразу пытается отвалидироваться в Pydantic модель, если это нужно.
  • Dependency Injection — Как можно увидеть в примере ниже FastStream предлагает нам возможность подключение DI как в FastAPI.
Вот во что превратилась кодовая база одной инициализации процесса:
app = AsgiFastStream(
broker,
# health + metrics из коробки
asgi_routes=[
(
"/health",
make_system_ping_asgi(broker, timeout=5.0, include_in_schema=False),
),
("/metrics", make_asgi_app(registry)),
],
# Кастомный lifespan. Обычно здесь идёт инициализации подключений к БД
lifespan=lifespan,
)
Было бы ещё не очень справедливо не показать, что из себя представляет broker
broker = RabbitBroker(
settings.RMQ_URL,
# Примеры миддлвары
# RabbitPrometheusMiddleware из коробки для мониторинга сообщений
# EnrichLogMiddleware и FailCatchComplexMiddleware кастомные для отслеживания логирования и ошибок
middlewares=(
RabbitPrometheusMiddleware(registry=registry),
EnrichLogMiddleware,
FailCatchComplexMiddleware(
ignore_routing_keys=[
settings.RMQ_FAIL_TABLE_QUEUE,
settings.RMQ_DASHBOARD_SETTINGS_QUEUE,
settings.RMQ_TIMEOUT_QUEUE,
],
),
),
logger=logger,
# Кастомные парсеры и декодеры для обработки входящих сообщений
parser=json_parser,
decoder=decoder,
)
broker.include_router(router)
и пример как инициализируется консьюмер
from faststream import Depends
from faststream.rabbit import (
ExchangeType,
RabbitExchange,
RabbitMessage,
RabbitQueue,
RabbitRouter,
)
from faststream.rabbit.annotations import RabbitBroker as ContextRabbitBroker
# Вместо RabbitRouter можно использовать broker.
router = RabbitRouter()
@router.subscriber(
RabbitQueue(
name=...,
durable=True,
routing_key=...,
),
RabbitExchange(
name=...,
durable=True,
),
)
async def on_service_hub_message(
message: RabbitMessage,
# Имеется общий контекст всей системы
broker: ContextRabbitBroker,
# DI
async_session=Depends(get_db_session),
) -> None:
Подключение консьюмеров простое, почти схожее с Celery, но чуть шире. Возможности FastStream на этом не заканчиваются. Если копать ещё глубже, то там можно найти документацию AsyncAPI и In-memory тесты.
Что пошло не так
  • Производительность — из-за дополнительных слоёв абстракции (middleware, автоматическая обвязка спанов, встроенные ретраи) пропускная способность ниже, чем у голого aio-pika. Это для нас лишь только в теории, поскольку поток не достигает больших значений 30-40 RPS. Справедливости ради, это не относится к минусам, поскольку бенчмарки я не проводил, и фраза “поверь мне брат” меня убедила.
  • Сложность отслеживания кодовой базы — Тут опять из-за дополнительных слоёв абстракции иногда заходишь внутрь посмотреть что там, и можно с лёгкостью потеряться.
Итоговое сравнение
Критерий Celery aio-pika (самописный) FastStream
Подход Динамические задачи Низкоуровневый AMQP Высокоуровневый consumer
Асинхронность Ограниченная (gevent) Полная (asyncio) Полная (asyncio)
Healthcheck Требует доп. решения Ручная реализация Из коробки
Retry Встроен в задачу Реализуется вручную Реализуется вручную (есть пример)
Трейсинг (OTel) Через сигналы Celery Ручное встраивание Из коробки
Контроль Низкий Максимальный Средний (middleware)
Производительность Умеренная Высокая Больше чем у Celery, ниже чем у aio-pika
Кривая входа Низкая Средняя (требует знаний AMQP) Низкая (знакомо по FastAPI)

-
Заключение
Пройдя путь от монолитного Celery через хрупкий самописный код до сбалансированного FastStream, мы пришли к инструменту, который закрывает все эксплуатационные потребности: healthcheck, трейсинг, метрики — и при этом не требует поддерживать сотни строк инфраструктурного кода.
Да, возможно я выступаю в качестве адвоката этого инструмента, но что могу поделать, когда он так понравился? это вы ещё не увидели как стараюсь переписать всё на Rust.
Отойдя от жаргона и шуток повторю первую свою мысль: У каждой задачи свой инструмент. Мой — это FastStream.-Источник
 
Loading...
Error