Создание приложений на FastAPI. Часть третья: Управление задачами

Создание приложений на FastAPI. Часть третья: Управление задачами

Картинка к публикации: Создание приложений на FastAPI. Часть третья:  Управление задачами

Введение в задачи

Асинхронные и синхронные задачи

Асинхронные задачи позволяют вашему приложению выполнять операции в фоновом режиме, не блокируя основной поток выполнения программы. Это особенно важно для операций, которые могут занять значительное время, например, отправка email, обработка изображений, взаимодействие с внешними API или работа с файлами. Асинхронные задачи позволяют избежать задержек и повысить отзывчивость приложения.

Синхронные задачи, в свою очередь, выполняются последовательно, что означает, что выполнение следующей задачи будет ожидать завершения текущей. Этот подход более прост и часто применяется для операций, требующих мгновенного завершения и не имеющих смысла быть вынесенными в отдельный поток. Например, валидация данных или простые вычисления могут быть выполнены синхронно.

Чтобы глубже понять разницу между асинхронными и синхронными задачами, давайте рассмотрим несколько ключевых аспектов:

  1. Блокировка потоков: Асинхронные задачи не блокируют основной поток выполнения, позволяя другим задачам выполняться параллельно. Синхронные задачи, напротив, блокируют поток до их завершения.
  2. Производительность: Асинхронные задачи позволяют улучшить производительность системы за счет того, что тяжелые задачи выполняются в фоне. Это освобождает ресурсы для обработки других запросов. Синхронные задачи, из-за своей природы, могут вызвать задержки в работе системы, если не управлять ими осторожно.
  3. Сложность реализации: Синхронные задачи легче реализовать и отладить, так как их выполнение происходит последовательно. Асинхронные задачи требуют более сложной архитектуры, особенно когда дело касается управления зависимостями и обработкой ошибок.
  4. Примеры использования:
    • Асинхронные задачи: отправка email-уведомлений, генерация отчетов, фоновая обработка данных.
    • Синхронные задачи: валидация формы, проверка авторизации пользователя, выполнение простых CRUD операций.

Интеграция таск-менеджеров в FastAPI требует понимания особенностей приложения и его архитектуры. Важно учитывать, какие задачи будут выполняться асинхронно, а какие — синхронно. Разделение задач на эти две категории помогает оптимизировать производительность и улучшить масштабируемость приложения.

Для реализации асинхронных задач в FastAPI идеально подходят такие инструменты, как TaskIQ. TaskIQ позволяет легко интегрировать асинхронные задачи в FastAPI, используя преимущества встроенной поддержки асинхронных операций в Python. Это позволяет эффективно выполнять задачи, не блокируя основной поток.

С другой стороны, Celery является отличным инструментом для выполнения синхронных задач и управления их выполнением. В сочетании с брокерами сообщений, такими как RabbitMQ и Redis, Celery обеспечивает надежное выполнение задач и управление очередями.

Для интеграции TaskIQ и Celery, важно также правильно настроить инфраструктуру приложения. Конфигурация контейнеров Docker, использование брокеров сообщений и бэкэндов для хранения результатов — все это является важной частью подготовки к внедрению таск-менеджеров.

version: '3.8'

x-shared-parameters: &shared-parameters
  env_file:
    - ../../.env
  network_mode: "host"
  restart: always

x-db-dependency: &db-dependency
  depends_on:
    db:
      condition: service_started
    rabbitmq:
      condition: service_healthy

services:
  db:
    <<: *shared-parameters
    image: postgres:16-alpine
    container_name: tmpl_stage_db
    volumes:
      - tmpl_stage_psgsql_volume:/var/lib/postgresql/data/
    ports:
      - "5432:5432"

  redis:
    <<: *shared-parameters
    image: redis:latest
    container_name: tmpl_stage_redis
    command: >
      --requirepass ${REDIS_PASSWORD}
    ports:
      - "6379:6379"

  rabbitmq:
    <<: *shared-parameters
    image: rabbitmq:management-alpine
    container_name: tmpl_stage_rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - tmpl_stage_rabbitmq_volume:/var/lib/rabbitmq/:rw
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  backend:
    <<: [*shared-parameters, *db-dependency]
    build:
      context: ../../backend
      dockerfile: Dockerfile
    container_name: tmpl_stage_backend
    command: ./src/conf/entrypoints/fastapi.sh
    ports:
      - 8100:8100

  celery:
    <<: [*shared-parameters, *db-dependency]
    build:
      context: ../../backend
      dockerfile: Dockerfile
    container_name: tmpl_stage_celery
    command: ./src/conf/entrypoints/celery.sh

  celery-beat:
    <<: [*shared-parameters, *db-dependency]
    build:
      context: ../../backend
      dockerfile: Dockerfile
    container_name: tmpl_stage_celery-beat
    command: ./src/conf/entrypoints/celery-beat.sh
    mem_limit: 1g
    cpus: "1.0"

  taskiq:
    <<: [*shared-parameters, *db-dependency]
    build:
      context: ../../backend
      dockerfile: Dockerfile
    container_name: tmpl_stage_taskiq_worker
    command: ./src/conf/entrypoints/taskiq.sh

volumes:
  tmpl_stage_psgsql_volume:
  tmpl_stage_rabbitmq_volume:

Эта конфигурация создает несколько сервисов, необходимых для работы FastAPI вместе с TaskIQ и Celery. Каждый сервис работает в своем контейнере, обеспечивая изоляцию и легкость масштабирования. TaskIQ и Celery работают вместе с RabbitMQ и Redis, что позволяет эффективно управлять задачами в приложении.

Таким образом, грамотное разделение задач на асинхронные и синхронные, а также их правильная интеграция в проект на FastAPI с использованием TaskIQ и Celery, позволяет создать высокопроизводительное и масштабируемое веб-приложение.

Основные возможности TaskIQ и Celery

TaskIQ — это современный таск-менеджер, разработанный с учетом асинхронной природы Python и особенностей работы с FastAPI. Он легко интегрируется с асинхронными фреймворками и предоставляет возможности для выполнения задач в фоне. Celery, напротив, существует на рынке дольше и является проверенным инструментом для управления задачами, но его архитектура изначально ориентирована на синхронную многопоточную работу.

TaskIQ

Плюсы:

  1. Асинхронная архитектура: TaskIQ изначально создан для работы с асинхронными фреймворками, такими как FastAPI. Он поддерживает нативную асинхронность, что позволяет оптимально использовать ресурсы сервера и избегать блокировок.
  2. Гибкость: TaskIQ легко настраивается и поддерживает различные брокеры сообщений (например, Redis, RabbitMQ), что делает его универсальным решением для различных задач.
  3. Интеграция с FastAPI: Благодаря тому, что TaskIQ разработан с учетом особенностей FastAPI, его интеграция в проект требует минимальных усилий, а производительность при этом остается на высоком уровне.

Минусы:

  1. Меньшее сообщество и экосистема: По сравнению с Celery, TaskIQ имеет менее развитую экосистему и меньшее сообщество, что может быть ограничивающим фактором в сложных проектах.
  2. Ограниченная совместимость с синхронными задачами: TaskIQ в первую очередь ориентирован на асинхронные задачи, поэтому для синхронных операций могут потребоваться дополнительные настройки.

Celery

Плюсы:

  1. Широкая поддержка и зрелость: Celery имеет обширную экосистему и большую базу пользователей, что делает его надежным выбором для управления задачами в масштабируемых проектах.
  2. Поддержка различных брокеров сообщений: Celery может работать с различными брокерами, такими как Redis, RabbitMQ, Amazon SQS и другими, что обеспечивает гибкость в выборе инфраструктуры.
  3. Встроенный планировщик задач (Scheduler): Celery поддерживает планирование задач с помощью встроенного механизма Celery Beat, что позволяет автоматизировать выполнение периодических задач.

Минусы:

  1. Ограниченная асинхронность: Celery не поддерживает асинхронность в своем стоковом виде. Это означает, что для работы с асинхронными задачами могут возникнуть сложности и дополнительные накладные расходы.
  2. Многопоточность: Celery использует многопоточность, что может привести к проблемам с масштабируемостью в системах, где асинхронные задачи являются основой.
  3. Сложность настройки: Интеграция Celery в асинхронное приложение может потребовать значительных усилий и понимания его внутренней архитектуры.

Одним из ключевых аспектов использования таск-менеджеров является работа с очередями задач. Очереди позволяют распределять задачи по приоритетам и обеспечивать их выполнение в порядке, наиболее подходящем для конкретного сценария.

TaskIQ и Celery поддерживают различные типы очередей, и выбор между ними зависит от характера задач:

  1. Обработка транзакций: В проектах, где требуется обработка транзакций, Celery может оказаться предпочтительнее благодаря своей стабильности и зрелости. Celery позволяет настроить надежные очереди для обработки транзакций, включая их подтверждение и повторное выполнение в случае сбоя.
  2. Высокоприоритетные задачи: Для задач, требующих быстрого выполнения и асинхронности, TaskIQ станет более подходящим выбором. Асинхронные очереди TaskIQ позволяют обрабатывать задачи параллельно и быстро, что особенно важно в высоконагруженных системах.
  3. Планирование задач: Оба инструмента поддерживают планировщик задач (Scheduler). TaskIQ и Celery могут использоваться для выполнения задач по расписанию, например, для еженочной обработки данных или периодического обновления кэша. Однако, если основное приложение асинхронное, TaskIQ может оказаться более эффективным, так как его планировщик работает в рамках той же асинхронной парадигмы.

Когда выбрать TaskIQ:

  • Если ваше приложение построено на асинхронной архитектуре.
  • Если требуется высокая степень параллелизма и минимизация блокировок.
  • Если важна простота интеграции с FastAPI и минимальные накладные расходы на настройку.

Когда выбрать Celery:

  • Если проект требует надежного выполнения задач в синхронном режиме, таких как обработка транзакций.
  • Если необходимо использовать сложные очереди и планировщики задач с поддержкой множества брокеров.
  • Если важна стабильность и зрелость решения с поддержкой со стороны большого сообщества.

Установка и настройка TaskIQ

Интеграция TaskIQ в FastApi

Для начала, необходимо установить TaskIQ и дополнительные библиотеки, которые будут использоваться в проекте для интеграции с RabbitMQ и Redis:

pip install taskiq taskiq-aio-pika taskiq-redis

Эти зависимости обеспечат вам возможность использовать TaskIQ для взаимодействия с брокером сообщений RabbitMQ через aio-pika, а также хранить результаты выполнения задач в Redis с использованием taskiq-redis.

После установки необходимо настроить TaskIQ для работы с FastAPI. Основная конфигурация TaskIQ будет включать настройку брокера сообщений (RabbitMQ) и бэкенда для хранения результатов (Redis). Мы создадим файл конфигурации conf/taskiq.py, который будет содержать необходимую настройку:

# conf/taskiq.py
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

from . import settings

# Настройка брокера сообщений AioPikaBroker для работы с RabbitMQ
taskiq_broker = AioPikaBroker(
    f"amqp://{settings.RABBITMQ_DEFAULT_USER}:{settings.RABBITMQ_DEFAULT_PASS}@{settings.RABBITMQ_HOST}:{settings.RABBITMQ_PORT}",
).with_result_backend(
    # Настройка Redis как бэкенда для хранения результатов
    RedisAsyncResultBackend(f"redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}")
)

# Настройка планировщика задач с использованием TaskiqScheduler
scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)

В этом коде мы конфигурируем брокер AioPikaBroker, который подключается к RabbitMQ через URL, полученный из настроек проекта. Далее мы подключаем бэкенд для хранения результатов, используя RedisAsyncResultBackend, который позволяет сохранять результаты выполнения задач в Redis.

Теперь, когда TaskIQ настроен, необходимо интегрировать его в приложение FastAPI. Для этого мы изменим файл main.py, добавив соответствующие шаги для инициализации и завершения работы TaskIQ при старте и остановке приложения.

# main.py
from typing import AsyncIterator

from fastapi import FastAPI
from fastapi.concurrency import asynccontextmanager
from fastapi.staticfiles import StaticFiles
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from sqladmin import Admin
from src.admin import ImageAdmin, UserAdmin
from src.admin.auth import AdminAuth
from src.conf import settings, taskiq_broker
from src.conf.redis import set_async_redis_client
from src.db.session import engine
from src.routers import main_router
from starlette.middleware.cors import CORSMiddleware

# Инициализация приложения FastAPI с поддержкой жизненного цикла
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Инициализация клиента Redis для кэширования
    redis_client = await set_async_redis_client()
    FastAPICache.init(RedisBackend(redis_client), prefix="fastapi-cache")
    await FastAPICache.clear()

    # Запуск TaskIQ брокера, если это не процесс выполнения задачи
    if not taskiq_broker.is_worker_process:
        await taskiq_broker.startup()
    yield
    # Завершение работы TaskIQ брокера при остановке приложения
    if not taskiq_broker.is_worker_process:
        await taskiq_broker.shutdown()

# Инициализация самого приложения FastAPI
app = FastAPI(lifespan=lifespan)

# Подключение маршрутов, статических файлов и админ-панели
app.include_router(main_router)
app.mount("/static", StaticFiles(directory="src/static"), name="static")

В этом коде мы используем asynccontextmanager для определения жизненного цикла приложения FastAPI. При старте приложения инициализируется клиент Redis, который используется для кэширования. Затем проверяется, является ли текущий процесс рабочим процессом TaskIQ. Если нет, то запускается брокер TaskIQ (taskiq_broker.startup()), а при остановке приложения брокер завершается (taskiq_broker.shutdown()).

Создание и выполнение асинхронных задач

Основная идея создания асинхронных задач в TaskIQ заключается в использовании декоратора @taskiq_broker.task, который регистрирует функцию как задачу. Далее, эта задача может быть вызвана и выполнена в фоне.

Файл tasks/image_tasks.py содержит реализацию такой задачи. В предыдущей статье мы как раз не закончили обработку полученной картинки с фронтенда:

# tasks/image_tasks.py
from pydantic import UUID4
from sqlalchemy.ext.asyncio import AsyncSession
from src.conf import taskiq_broker
from src.crud import image_dao
from src.db.deps import get_async_session
from taskiq import TaskiqDepends
from pathlib import Path

@taskiq_broker.task
async def process_image_task(image_id: UUID4, db_session: AsyncSession = TaskiqDepends(get_async_session)):
    image, url = await image_dao.get(id=image_id, scheme=False, db_session=db_session)
    path = Path(image.file)
    file = await image.storage.download_file_by_url(str(url))
    image.file = await image.storage.update_object(file, image.file, str(path.parent))
    await db_session.commit()
    return f"Successfully updated image {image_id}"

В этом примере функция process_image_task асинхронно обрабатывает изображение, используя его идентификатор image_id. Она загружает файл по URL, обновляет его в хранилище и сохраняет изменения в базе данных. Эта задача выполняется в фоне, не блокируя основной поток приложения.

Чтобы запустить задачу из маршрута, можно использовать метод kiq, который отправляет задачу на выполнение:

# media/routers.py

@media_router.post("/images/treatment/", status_code=status.HTTP_200_OK)
async def create_task_image_treatment(
    image_id: UUID4 = Form(...),
    _: User = Depends(current_active_user), db_session: AsyncSession = Depends(get_async_session)
):
    await process_image_task.kiq(image_id=image_id)
    return 'Done'

Здесь маршрут /images/treatment/ принимает image_id, который затем передается в задачу process_image_task для обработки. Задача запускается асинхронно, и пользователю сразу возвращается ответ, не дожидаясь завершения задачи.

Одной из возможностей TaskIQ является поддержка отложенного выполнения задач, что полезно для таких случаев, как отправка email через определенное время. Рассмотрим, как это можно реализовать с помощью TaskIQ и Redis в качестве источника расписания.

Для отправки email через некоторое время, создадим задачу, которая будет выполняться через 2 часа после её планирования.

# tasks/email_tasks.py
from taskiq import TaskiqDepends, Context
from src.conf import taskiq_broker
from fastapi_mail import FastMail, MessageSchema
from src.conf.mail import conf as mail_conf
import datetime

@taskiq_broker.task
async def send_delayed_email(recipient: str, subject: str, body: str, context: Context = TaskiqDepends()):
    message = MessageSchema(
        recipients=[recipient],
        subject=subject,
        body=body,
        subtype="html"
    )
    fm = FastMail(mail_conf)
    await fm.send_message(message)
    print(f"Email sent to {recipient} with subject '{subject}'")

В этом примере задача send_delayed_email отправляет email с использованием библиотеки FastAPI-Mail. Email отправляется на указанный адрес с заданной темой и телом сообщения.

Теперь, чтобы отложить выполнение этой задачи на 2 часа, используем следующий код:

import datetime
from src.config import scheduler

schedule_task = await send_delayed_email.schedule_by_time(
    scheduler,
    datetime.datetime.utcnow() + datetime.timedelta(hours=2),
    recipient="user@example.com",
    subject="Scheduled Email",
    body="This is a scheduled email sent 2 hours later."
)

Метод schedule_by_time позволяет задать точное время выполнения задачи.

TaskIQ также позволяет управлять запланированными задачами. Например, вы можете отменить задачу перед её выполнением:

# Отмена задачи до её выполнения
await schedule_task.unschedule()

Эта возможность полезна, когда необходимо изменить или отменить задачу до того, как она будет выполнена.

Мониторинг и управление задачами

TaskIQ предоставляет несколько подходов для мониторинга задач. В зависимости от требований вашего проекта, вы можете выбрать наиболее подходящий метод или использовать их в комбинации.

1. Использование брокеров для мониторинга задач

Поскольку TaskIQ работает с различными брокерами сообщений, такими как Redis и RabbitMQ, можно использовать встроенные возможности этих систем для мониторинга выполнения задач.

Redis: В случае использования Redis, вы можете мониторить очередь задач напрямую через Redis CLI или специализированные инструменты, такие как redis-cli или RedisInsight. Эти инструменты позволяют отслеживать состояние очереди, количество задач, которые ожидают выполнения, и задачи, находящиеся в процессе выполнения.

RabbitMQ: Если вы используете RabbitMQ, у вас есть доступ к RabbitMQ Management Plugin, который предоставляет веб-интерфейс для мониторинга очередей задач, отслеживания потребителей, просматривая статус сообщений и другие метрики.

2. Отслеживание состояния задач и их результатов

TaskIQ поддерживает использование различных бэкендов для хранения результатов выполнения задач. Это позволяет вам сохранять статус задач и получать информацию о результате их выполнения в любой момент времени.

from src.conf import result_backend

@taskiq_broker.task
async def my_task(param1: int) -> str:
    # Некоторая обработка
    return f"Task completed with param {param1}"

# Выполнение задачи и получение ID результата
task_result = await my_task.kiq(10)

# Позже можно получить результат выполнения задачи по её ID
result = await result_backend.get_result(task_result.task_id)
print(result.result)  # Вывод результата

В этом примере результат выполнения задачи сохраняется в Redis, и позже можно получить его по task_id. Этот метод полезен для проверки состояния задач, особенно если выполнение задачи может занять продолжительное время.

3. Интеграция с инструментами мониторинга и логирования

Чтобы обеспечить централизованный мониторинг задач, можно интегрировать TaskIQ с инструментами логирования, такими как ELK Stack (Elasticsearch, Logstash, Kibana) или Prometheus с Grafana.

Логирование с помощью стандартной библиотеки logging: TaskIQ позволяет легко интегрировать логирование задач через стандартную библиотеку logging. Это позволяет сохранять информацию о задачах, их статусах и возможных ошибках в лог-файлы, которые затем могут быть проанализированы с использованием инструментов мониторинга.

from taskiq import TaskiqDepends, Context
from src.conf import taskiq_broker, logger

@taskiq_broker.task
async def monitored_task(context: Context = TaskiqDepends()) -> None:
    try:
        # Некоторая логика задачи
        logger.info(f"Task started with context: {context}")
        # Выполнение задачи...
        logger.info(f"Task completed successfully")
    except Exception as e:
        logger.error(f"Task failed with error: {e}")

Этот код интегрирует базовое логирование в задачу TaskIQ, где информация о старте и завершении задачи, а также об ошибках, будет записываться в лог-файл.

Интеграция с Prometheus и Grafana: Для более продвинутого мониторинга и алертинга, можно настроить интеграцию TaskIQ с Prometheus для сбора метрик и Grafana для визуализации. Это позволяет получить полную картину выполнения задач, от времени выполнения до количества ошибок, и настроить уведомления при возникновении проблем.

Пример сбора метрик для задач можно реализовать с помощью кастомного экспорта метрик в Prometheus:

from prometheus_client import Counter, start_http_server

# Инициализация метрик
task_success_counter = Counter('task_success_total', 'Total number of successfully completed tasks')
task_failure_counter = Counter('task_failure_total', 'Total number of failed tasks')

@taskiq_broker.task
async def monitored_task(context: Context = TaskiqDepends()) -> None:
    try:
        # Логика задачи
        # Если задача выполнена успешно
        task_success_counter.inc()
    except Exception as e:
        # Если произошла ошибка
        task_failure_counter.inc()
        raise e

# Запуск сервера метрик
start_http_server(8000)

Этот пример показывает, как использовать Prometheus для отслеживания количества успешно завершенных и проваленных задач.

Для оперативного реагирования на сбои или критические ситуации важно настроить уведомления. TaskIQ можно интегрировать с системами уведомлений, такими как Slack, Email, или SMS через сторонние сервисы.

import requests
from taskiq import TaskiqDepends, Context
from src.conf import taskiq_broker

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/your-webhook-url"

def send_slack_notification(message: str):
    requests.post(SLACK_WEBHOOK_URL, json={"text": message})

@taskiq_broker.task
async def task_with_notification(context: Context = TaskiqDepends()) -> None:
    try:
        # Выполнение задачи...
        send_slack_notification(f"Task {context.task_name} completed successfully")
    except Exception as e:
        send_slack_notification(f"Task {context.task_name} failed with error: {e}")
        raise e

В этом примере задача отправляет уведомление в Slack о своём успешном выполнении или об ошибке. Такой подход позволяет оперативно информировать команду разработки о состоянии системы.

Использование Celery

Установка и конфигурация

Celery — это таск-менеджер, который используется для выполнения синхронных задач в фоновом режиме. Он идеально подходит для управления задачами, требующими высокой надежности и точности, например, регулярное создание резервных копий базы данных или выполнение транзакций. Celery поддерживает множество брокеров сообщений и бэкендов для хранения результатов, таких как Redis и RabbitMQ, что делает его гибким инструментом для разнообразных сценариев использования.

Чтобы интегрировать Celery в проект FastAPI, необходимо установить Celery и дополнительные библиотеки для взаимодействия с брокерами сообщений и базами данных.

pip install celery celery-sqlalchemy-scheduler kombu

Основной шаг после установки — настройка Celery в вашем проекте. Конфигурация включает определение брокера сообщений, бэкенда для хранения результатов и маршрутизацию задач. Рассмотрим конфигурацию в файле conf/celery.py.

# conf/celery.py
from celery import Celery
from kombu import Queue

class CeleryAppFactory:
    _celery_app = None

    @classmethod
    def get_celery_app(cls):
        from . import settings
        if cls._celery_app is None:
            cls._celery_app = Celery(
                "celery_app",
                broker=str(settings.CELERY_BROKER_URL),
                backend=str(settings.SYNC_CELERY_DATABASE_URI),
                include=("src.tasks.db_task",),
            )

            cls._celery_app.conf.update(
                {
                    'beat_dburi': str(settings.SYNC_CELERY_BEAT_DATABASE_URI),
                    'worker_log_level': 'INFO',
                    'beat_log_level': 'INFO',
                    'broker_transport_options': {
                        'max_retries': 5,
                        'interval_start': 0,
                        'interval_step': 0.5,
                        'interval_max': 3
                    },
                    'task_queues': (
                        Queue('celery_tasks', routing_key='celery_tasks.#'),
                    ),
                    'task_routes': {
                        'src.tasks.db_task.*': {'queue': 'celery_tasks'},
                    },
                }
            )
            cls._celery_app.autodiscover_tasks()

        return cls._celery_app
  1. Инициализация Celery:
    • CeleryAppFactory.get_celery_app() создаёт и конфигурирует экземпляр Celery. Метод использует брокер сообщений, указанный в settings.CELERY_BROKER_URL, и бэкенд для хранения результатов, указанный в settings.SYNC_CELERY_DATABASE_URI.
    • Параметр include указывает на модули, в которых находятся задачи, которые Celery будет автоматически обнаруживать и регистрировать.
  2. Конфигурация логирования и брокера сообщений:
    • Уровни логирования для воркера и планировщика (worker_log_level и beat_log_level) установлены в INFO, что позволяет отслеживать важные события выполнения задач.
    • Опции broker_transport_options настроены для управления поведением брокера сообщений: количество попыток, интервал между попытками и максимальный интервал.
  3. Очереди и маршрутизация задач:
    • task_queues определяет очередь celery_tasks, куда будут отправляться задачи с маршрутом celery_tasks.#.
    • task_routes настраивает маршрутизацию задач из модуля src.tasks.db_task в очередь celery_tasks.
  4. Автоматическое обнаружение задач:
    • Метод autodiscover_tasks() позволяет Celery автоматически находить и регистрировать задачи в указанных модулях, что упрощает масштабирование и организацию кода.

Оптимизация конфигурации Celery — важный этап, который влияет на производительность и устойчивость вашего приложения. Рассмотрим несколько рекомендаций:

  1. Настройка параметров ретрая:
    • Параметры max_retries, interval_start, interval_step и interval_max управляют поведением системы при возникновении ошибок. Установка этих параметров на разумные значения позволяет избежать избыточных попыток, которые могут перегрузить систему.
  2. Разделение очередей задач:
    • Если ваше приложение выполняет задачи с различными приоритетами (например, задачи резервного копирования базы данных и задачи отправки email), рассмотрите возможность создания отдельных очередей и маршрутизации задач в зависимости от их приоритета.
  3. Использование планировщика Celery Beat:
    • Celery Beat позволяет запускать задачи по расписанию, например, ежедневно или еженедельно. Если ваш проект нуждается в регулярных задачах (например, создание резервных копий базы данных), включение Celery Beat и настройка расписания задач с помощью crontab будет отличным решением.
  4. Мониторинг и управление производительностью:
    • Внедрение мониторинга выполнения задач с помощью инструментов, таких как Flower или Celery Event Viewer, поможет отслеживать выполнение задач, их состояние и время выполнения, что важно для своевременного выявления проблем.

Разработка синхронных задач

Celery — инструмент для выполнения синхронных задач в фоновом режиме. В этом разделе мы рассмотрим примеры создания синхронных задач с использованием Celery в FastAPI. Мы разберем несколько конкретных кейсов, таких как генерация отчетов и управление транзакциями платежей. Также обсудим, как эффективно управлять и использовать разные очереди для организации задач с разными приоритетами и требованиями.

Пример 1: Генерация отчетов

Генерация отчетов — типичный пример задачи, которая может занимать значительное время и ресурсы. Вместо того чтобы выполнять эту задачу синхронно и блокировать основной поток выполнения приложения, можно использовать Celery для выполнения этой задачи в фоне.

# tasks/report_tasks.py
import datetime
from celery import shared_task

@shared_task
def generate_report(report_type: str, start_date: datetime.date, end_date: datetime.date) -> str:
    # Симуляция сложной логики генерации отчета
    report_data = f"Report: {report_type} from {start_date} to {end_date}"
    report_path = f"/reports/{report_type}_{start_date}_{end_date}.txt"
    
    with open(report_path, "w") as report_file:
        report_file.write(report_data)

В этом примере задача generate_report принимает параметры report_type, start_date и end_date, генерирует отчет и сохраняет его в файловую систему. 

Вызов задачи из маршрута FastAPI

# routers/report_router.py
from fastapi import APIRouter, HTTPException
from tasks.report_tasks import generate_report
import datetime

report_router = APIRouter()

@report_router.post("/generate_report/")
async def generate_report_endpoint(report_type: str, start_date: datetime.date, end_date: datetime.date):
    task = generate_report.delay(report_type, start_date, end_date)
    
    # Немедленно возвращаем ID задачи пользователю
    return {"task_id": task.id}

Маршрут /generate_report/ запускает задачу генерации отчета в фоне, немедленно возвращая пользователю ID задачи. Пользователь может использовать этот ID для отслеживания статуса выполнения задачи.

Пример 2: Управление транзакциями платежей

Управление транзакциями — важная часть любого приложения, связанного с обработкой платежей. Такие задачи требуют надежности и последовательности, что делает Celery идеальным инструментом для их реализации.

# tasks/payment_tasks.py
from celery import shared_task
from some_payment_gateway import PaymentGateway

@shared_task(bind=True, max_retries=3)
def process_payment(self, user_id: int, payment_details: dict) -> str:
    try:
        gateway = PaymentGateway()
        transaction_id = gateway.process(user_id, payment_details)
        return f"Payment processed successfully, transaction ID: {transaction_id}"
    except Exception as exc:
        # В случае ошибки повторить задачу через 60 секунд
        raise self.retry(exc=exc, countdown=60)

В этой задаче process_payment используется внешняя библиотека PaymentGateway для обработки платежа. Если при выполнении задачи происходит ошибка, Celery автоматически попытается повторить выполнение задачи до трёх раз с интервалом в 60 секунд между попытками.

Вызов задачи из маршрута FastAPI

# routers/payment_router.py
from fastapi import APIRouter, HTTPException
from tasks.payment_tasks import process_payment

payment_router = APIRouter()

@payment_router.post("/process_payment/")
async def process_payment_endpoint(user_id: int, payment_details: dict):
    task = process_payment.delay(user_id, payment_details)
    
    # Немедленно возвращаем ID задачи пользователю
    return {"task_id": task.id}

Маршрут /process_payment/ запускает задачу обработки платежа в фоне, сразу же возвращая пользователю ID задачи для последующего отслеживания.

В сложных проектах часто требуется распределять задачи по разным очередям, чтобы обеспечить приоритетное выполнение более критичных задач или разделение нагрузки. Celery позволяет гибко управлять очередями и маршрутизацией задач.

Пример использования нескольких очередей

# conf/celery.py
from celery import Celery
from kombu import Queue

app = Celery("celery_app")

app.conf.task_queues = (
    Queue('high_priority', routing_key='high.#'),
    Queue('low_priority', routing_key='low.#'),
)

app.conf.task_routes = {
    'tasks.report_tasks.generate_report': {'queue': 'low_priority', 'routing_key': 'low.reports'},
    'tasks.payment_tasks.process_payment': {'queue': 'high_priority', 'routing_key': 'high.payments'},
}

В этой конфигурации мы создали две очереди: high_priority для задач с высоким приоритетом и low_priority для менее критичных задач. Задачи маршрутизируются в соответствующие очереди на основе их routing_key.

Задача обработки платежа теперь будет выполняться в очереди с высоким приоритетом, обеспечивая минимальные задержки при обработке критичных операций, тогда как задачи генерации отчетов будут выполняться в фоне с меньшим приоритетом.

Оптимизация и масштабирование задач

Оптимизация производительности задач

1. Использование подходящих брокеров и бэкендов: Выбор брокера сообщений и бэкенда для хранения результатов напрямую влияет на производительность Celery.

  • Redis: Подходит для высокопроизводительных систем благодаря своей скорости и способности обрабатывать большое количество операций в секунду. Однако, стоит учитывать, что Redis хранит данные в памяти, что может стать узким местом при очень большом количестве задач.
  • RabbitMQ: Отличается надежностью и богатым функционалом для работы с очередями, включая подтверждения доставки сообщений и более сложную маршрутизацию. Это делает его идеальным выбором для критичных задач, требующих высокой надежности.

2. Ограничение времени выполнения задач: Некоторые задачи могут занять значительно больше времени, чем предполагалось, что может привести к заблокированным или зависшим процессам. Настройка ограничения времени выполнения задач позволяет избежать подобных ситуаций.

# conf/celery.py

app.conf.update(
    task_time_limit=300,  # Ограничение времени выполнения задачи в секундах
    task_soft_time_limit=250,  # Мягкое ограничение времени для предупреждений
)

Эти параметры позволяют задать жесткие и мягкие лимиты на время выполнения задач, что помогает избежать их зависания и более эффективно управлять ресурсами.

3. Настройка Prefetch Limit: Для управления нагрузкой на воркеры Celery можно настроить параметр worker_prefetch_multiplier, который определяет количество задач, которые воркер будет брать на обработку одновременно.

# conf/celery.py

app.conf.update(
    worker_prefetch_multiplier=1,  # Воркер будет обрабатывать одну задачу за раз
)

Этот параметр особенно полезен, если задачи могут существенно различаться по времени выполнения. Он позволяет избежать ситуации, когда воркер загружен несколькими тяжелыми задачами одновременно, что может привести к задержкам.

Масштабирование задач

1. Масштабирование воркеров: Celery позволяет легко масштабировать количество воркеров, что важно в условиях увеличения нагрузки. Можно настроить количество воркеров, которые будут одновременно обрабатывать задачи, на основе текущей нагрузки на систему.

celery -A my_project worker --concurrency=10

Параметр --concurrency определяет количество одновременно работающих воркеров. Однако, его значение следует устанавливать с учетом доступных системных ресурсов (например, количества ядер процессора).

2. Горизонтальное масштабирование: Горизонтальное масштабирование предполагает запуск дополнительных экземпляров Celery воркеров на разных серверах или контейнерах. Это позволяет эффективно распределить нагрузку между несколькими экземплярами приложения.

docker-compose up --scale worker=3

Используя Docker, можно легко увеличить количество экземпляров воркеров для обработки задач, что повышает устойчивость системы к высокому уровню запросов.

Управление временем жизни задач и ретраями

1. Настройка времени жизни задач: Иногда задачи могут оставаться в очереди слишком долго, что может привести к их устареванию. Для предотвращения выполнения устаревших задач можно настроить время жизни задач.

# conf/celery.py

app.conf.update(
    task_expires=3600,  # Задача будет удалена через час, если не выполнена
)

Этот параметр позволяет удалить задачи из очереди, если они не были выполнены в течение заданного времени.

2. Настройка ретраев: Для повышения устойчивости системы необходимо настроить параметры автоматического повторного выполнения задач (ретраев) в случае их неудачного завершения, как мы рассматривали ранее.

# tasks/payment_tasks.py

@shared_task(bind=True, max_retries=3)
def process_payment(self, user_id: int, payment_details: dict) -> str:
    try:
        gateway = PaymentGateway()
        transaction_id = gateway.process(user_id, payment_details)
        return f"Payment processed successfully, transaction ID: {transaction_id}"
    except Exception as exc:
        # В случае ошибки повторить задачу через 60 секунд
        raise self.retry(exc=exc, countdown=60)

Повышение надежности и устойчивости системы

1. Мониторинг и логирование: Для повышения устойчивости системы важно настроить мониторинг и логирование задач. Это помогает оперативно обнаруживать проблемы и реагировать на них.

  • Flower: Веб-интерфейс для мониторинга состояния воркеров и очередей Celery в реальном времени.
  • Prometheus и Grafana: Используются для сбора и визуализации метрик выполнения задач, что позволяет отслеживать производительность и своевременно выявлять узкие места.

2. Отладка и управление задачами: В процессе разработки и эксплуатации системы важно иметь возможность управлять задачами, отслеживать их состояние и устранять проблемы.

celery -A my_project inspect active  # Проверка активных задач
celery -A my_project control revoke task_id  # Отмена задачи по ID

Эти команды позволяют управлять задачами на уровне выполнения, обеспечивая гибкость в управлении приложением.

Создание CLI для управления

Управление сложными веб-приложениями требует гибкости и удобства в работе с различными окружениями, такими как режимы отладки (debug) и подготовки к релизу (staging). Создание интерфейса командной строки (CLI) для управления приложением позволяет автоматизировать рутинные задачи, такие как запуск Docker-контейнеров, настройка окружений и управление состоянием приложения.

Для создания CLI используется библиотека click, которая упрощает разработку командной строки на Python. Файл app.py содержит команды для управления приложением, включая запуск и остановку контейнеров, а также работу в различных режимах.

#!/usr/bin/env python3

import os
import subprocess
import sys

import click


def check_file_exists(file_path):
    if not os.path.exists(file_path):
        print(f"File '{file_path}' does not exist.")
        sys.exit(1)


def start_docker_compose(compose_file, detached=False, custom_env=None):
    script_dir = os.path.dirname(os.path.abspath(__file__))
    env_file = os.path.join(script_dir, '.env')

    compose_file = os.path.abspath(compose_file)

    check_file_exists(env_file)
    check_file_exists(compose_file)

    env_vars = os.environ.copy()
    if custom_env:
        env_vars.update(custom_env)

    cmd = ['docker-compose', '-f', compose_file, 'up', '--build']
    if detached:
        cmd.append('-d')

    result = subprocess.run(cmd, env=env_vars)
    if result.returncode != 0:
        print(f"Error: docker-compose failed with return code {result.returncode}")
        sys.exit(result.returncode)


@click.group()
def cli():
    pass


@cli.command()
def debug():
    """Запуск приложения в режиме отладки"""
    custom_env = {
        'PGADMIN_LISTEN_PORT': '8090',
        'PGADMIN_DEFAULT_EMAIL': 'admin@admin.ru',
        'PGADMIN_DEFAULT_PASSWORD': 'admin_password'
    }
    start_docker_compose('infra/nginx/docker-compose.yml', detached=True)
    start_docker_compose('infra/debug/docker-compose.yml', custom_env=custom_env)


@cli.command()
def stage():
    """Запуск приложения в режиме подготовки к релизу (staging)"""
    custom_env = {
        'PGADMIN_LISTEN_PORT': '8090',
        'PGADMIN_DEFAULT_EMAIL': 'admin@admin.ru',
        'PGADMIN_DEFAULT_PASSWORD': 'admin_password'
    }
    start_docker_compose('infra/nginx/docker-compose.yml', detached=True)
    start_docker_compose('infra/stage/docker-compose.yml', custom_env=custom_env)


@cli.command()
@click.option('--clean', is_flag=True, help="Удалить висячие образы и тома после остановки контейнеров")
def stop(clean):
    """Остановка всех контейнеров и очистка ресурсов"""
    try:
        result = subprocess.run('docker ps -q', shell=True, check=True, capture_output=True, text=True)
        container_ids = result.stdout.strip().split()

        if container_ids:
            container_ids_str = ' '.join(container_ids)
            subprocess.run(f'docker stop {container_ids_str}', shell=True, check=True)
        else:
            print("No running containers to stop.")

        result = subprocess.run('docker ps -a -q', shell=True, check=True, capture_output=True, text=True)
        all_container_ids = result.stdout.strip().split()

        if all_container_ids:
            all_container_ids_str = ' '.join(all_container_ids)
            subprocess.run(f'docker rm {all_container_ids_str}', shell=True, check=True)
        else:
            print("No containers to remove.")

        if clean:
            print("Cleaning up dangling images and volumes...")
            subprocess.run('docker images -f "dangling=true" -q | xargs -r docker rmi', shell=True, check=True)
            subprocess.run('docker volume ls -qf dangling=true | grep -E "^[0-9a-f]{64}$" | xargs -r docker volume rm', shell=True, check=True)

    except subprocess.CalledProcessError as e:
        print(f"An error occurred: {e}")
        sys.exit(1)


if __name__ == '__main__':
    cli()

1. Команда debug:

  • Запускает приложение в режиме отладки. В этом режиме FastAPI запускается локально, а все остальные сервисы (например, базы данных, брокеры сообщений) работают в контейнерах Docker.
  • Команда использует docker-compose для поднятия инфраструктуры с конфигурацией из infra/nginx/docker-compose.yml и infra/debug/docker-compose.yml.

2. Команда stage:

  • Запускает приложение в режиме подготовки к релизу (staging). В этом режиме все сервисы, включая FastAPI, работают в контейнерах Docker.
  • Команда запускает Docker-композицию из infra/nginx/docker-compose.yml и infra/stage/docker-compose.yml.

3. Команда stop:

  • Останавливает все работающие контейнеры и, при необходимости, очищает висячие Docker-образы и тома.
  • Команда полезна для освобождения ресурсов и подготовки к запуску нового окружения.

Создание интерфейса командной строки (CLI) для управления приложением на FastAPI значительно упрощает процессы разработки и эксплуатации. CLI позволяет автоматизировать рутинные задачи, такие как запуск и остановка Docker-контейнеров, управление окружениями (debug, staging). Такой подход улучшает удобство работы с приложением, повышает его масштабируемость и упрощает управление сложными процессами.

PS: Код проекта доступен на GitHub по следующей ссылке: https://github.com/exp-ext/fastapi_template. Обратите внимание, что он может не полностью соответствовать описанному здесь, так как проект находится в процессе развития. В дальнейшем шаблон будет дополняться новыми методами и проходить рефакторинг.


Читайте также:

ChatGPT
Eva
💫 Eva assistant