Celery
Celery
это отличная система очередей задач, которая позволяет эффективно управлять асинхронными задачами в Python. Она позволяет разделять сложные процессы на более мелкие задачи, которые могут выполняться параллельно и асинхронно, что увеличивает производительность и масштабируемость приложения.
Основные компоненты Celery:
- Task - это базовый элемент Celery, который представляет собой отдельную задачу, которая будет выполнена асинхронно. Каждая задача должна быть определена в отдельном модуле Python, который содержит описание задачи и ее реализацию.
- Queue - это очередь задач, которые будут выполнены асинхронно. Каждая задача помещается в очередь и ожидает, когда ее будет обработана.
- Worker - это процесс, который запускает и обрабатывает задачи из очереди. Каждый worker запускается в отдельном процессе и может обрабатывать несколько задач одновременно.
- Broker - это посредник между задачами и worker'ами. Он предоставляет очередь задач и хранит информацию о задачах, которые должны быть выполнены. В качестве брокера может выступать RabbitMQ, Redis или другие системы очередей сообщений.
Рассмотрим как работает Celery + Redis в Django:
Настройка
Для начала работы с Celery, нужно создать объект Celery и определить задачи.
Создадим Django приложение и настроим Celery для использования Redis в качестве брокера сообщений. В файле settings.py добавив следующие настройки:
# settings.py
import os
from dotenv import load_dotenv
load_dotenv()
REDIS_URL = os.getenv('REDIS_URL', 'redis://127.0.0.1:6379')
BROKER_URL = REDIS_URL
CELERY_BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_DEFAULT_QUEUE = 'default'
# .env
REDIS_URL=redis://redis:6379
В этом примере мы настроили Redis в качестве брокера сообщений, который будет использоваться для хранения и обработки задач. Мы также использовали базу REDIS в качестве результата выполнения задач.
В папке с проектом рядом с файлом settings.py необходимо создать файл celery.py. Это файл конфигурации Celery для Django проекта. Он настраивает и запускает экземпляр Celery приложения, определяет таймеры, брокер сообщений и задачи.
Ниже приведен пример простого файла celery.py:
# celery.py
import os
from celery import Celery
# Устанавливаем переменную окружения для Django settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Загружаем конфигурацию из файла настроек Django
app.config_from_object('django.conf:settings', namespace='CELERY')
# Обнаруживаем задачи в приложениях Django
app.autodiscover_tasks()
В этом примере мы создаем экземпляр Celery приложения с именем "myproject". Затем мы загружаем настройки из объекта "django.conf:settings" и указываем пространство имен "CELERY". Наконец, мы используем метод "autodiscover_tasks", чтобы автоматически обнаруживать задачи в приложениях Django.
Теперь рассмотрим некоторые настройки Celery, которые могут быть определены в файле celery.py:
BROKER_URL: URL брокера сообщений, который Celery будет использовать для передачи сообщений между приложениями. Например, для использования Redis в качестве брокера сообщений можно задать BROKER_URL = 'redis://localhost:6379/0'.
CELERY_RESULT_BACKEND: URL бэкенда результатов, используемого Celery для хранения результатов выполнения задач. Например, для использования Redis в качестве бэкенда результатов можно задать CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'.
CELERY_TASK_SERIALIZER: Сериализатор, который Celery будет использовать для сериализации аргументов задач. По умолчанию используется "pickle". Можно также использовать "json" или "msgpack".
CELERY_RESULT_SERIALIZER: Сериализатор, который Celery будет использовать для сериализации результатов задач. По умолчанию используется "pickle". Можно также использовать "json" или "msgpack".
CELERY_ACCEPT_CONTENT: Список допустимых форматов сериализации. По умолчанию это ['json', 'pickle'].
CELERY_TIMEZONE: Часовой пояс, используемый Celery для расписаний задач. По умолчанию это часовой пояс системы.
CELERY_ENABLE_UTC: Определяет, используется ли Celery для расписаний задач UTC-время или локальное время. По умолчанию установлено значение True.
CELERYD_HIJACK_ROOT_LOGGER: Эта опция позволяет перенаправить логгирование в Celery, так что все логи, например, в приложениях Django, будут записываться в Celery. По умолчанию эта опция отключена, поэтому если вы хотите использовать эту функцию, необходимо установить ее значение в True.
В той же папке с файлом settings.py рекомендуется создать файл инициализации с кодом.
# __init__.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
В итоге у нас получается такая структура проекта:
├─ mysite/
│ ├─ mysite/
│ │ ├─ __init__.py
│ │ ├─ settings.py
│ │ ├─ celery.py
│ │ ├─ urls.py
│ │ └─ wsgi.py
│ ├─ app/
│ │ ├─ __init__.py
│ │ ├─ tasks.py
│ │ └─ views.py
│ ├─ manage.py
│ ├─ requirements.txt
│ └─ Dockerfile
├─docker-compose.yml
└─.env
Пример Dockerfile и docker-compose.yaml:
# Dockerfile
FROM python:3.8
WORKDIR /app
COPY requirements.txt .
RUN pip install pip --upgrade && \
pip install -r requirements.txt
COPY . /app
# docker-compose.yaml
version: '3'
services:
web:
build: .
command: python manage.py runserver 0.0.0.0:8000
volumes:
- .:/app
ports:
- "8000:8000"
depends_on:
- redis
- celery
celery:
build: .
command: celery worker -A mysite --loglevel=info
volumes:
- .:/app
environment:
- DJANGO_SETTINGS_MODULE=mysite.settings
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
depends_on:
- redis
redis:
image: redis:6.0.6
ports:
- "6379:6379"
Не забудьте создать файл `requirements.txt`, в котором указать все зависимости вашего приложения, вот пример:
# requirements.txt
Django>=3.2.3
redis>=3.5.3
celery>=5.0.5
После запуска `docker-compose up`, ваше Django приложение должно быть доступно по http://localhost:8000/
Задачи
@app.task
это декоратор, который определяет задачу для конкретного приложения Celery. Чтобы создать задачу в Celery, мы должны определить функцию-задачу и пометить ее декоратором @app.task. Давайте определим простую задачу, которая просто выводит сообщение в консоль:
# tasks.py
from mysite.celery import app
@app.task
def say_hello():
print('Hello, world!')
В этом примере мы определили функцию say_hello() как задачу, используя декоратор @app.task. Внутри функции мы просто выводим сообщение в консоль.
Мы можем вызвать эту задачу из Django приложения следующим образом:
# views.py
from django.shortcuts import render
from tasks import say_hello
def home(request):
say_hello.delay()
return render(request, 'home.html')
Здесь мы импортируем функцию say_hello() из модуля tasks.py и вызываем ее асинхронно с помощью метода delay(). В результате задача будет добавлена в очередь и выполнена асинхронно.
Мы можем передать аргументы в задачу, добавив их в качестве аргументов для функции, которая определяет задачу. Например:
@app.task
def add_numbers(a, b):
return a + b
Мы можем вызвать эту задачу следующим образом:
add_numbers.delay(4, 5)
Эта задача будет добавлена в очередь и выполнена асинхронно.
Также можно передавать объекты Django в качестве аргументов. Например:
from django.contrib.auth.models import User
@app.task
def send_email(user_id):
user = User.objects.get(id=user_id)
Мы можем вызвать эту задачу и передать объект Django следующим образом:
from django.contrib.auth.models import User
user = User.objects.get(id=1)
send_email.delay(user.id)
Здесь мы передаем идентификатор пользователя в качестве аргумента задачи send_email(), а затем внутри функции используем этот идентификатор, чтобы получить объект пользователя из базы данных Django.
Также можно добавить дополнительные аргументы, которые будут использоваться при запуске задачи. Например:
@app.task
def send_email(to, subject, message):
# код для отправки электронного письма
Мы можем вызвать эту задачу и передать ей дополнительные аргументы следующим образом:
send_email.delay('user@example.com', 'Hello', 'How are you doing?')
В этом примере мы передаем три дополнительных аргумента задаче send_email() - адрес электронной почты получателя, тему письма и текст сообщения.
Кроме того, мы можем указать задержку перед запуском задачи с помощью метода .apply_async():
send_email.apply_async(args=['user@example.com', 'Test Email', 'This is a test email.'], countdown=10)
В этом примере мы отправляем задачу на выполнение через 10 секунд после ее добавления в очередь. Мы также передаем аргументы задачи с помощью списка args.
это декоратор, который можно использовать для определения задачи в любом месте вашего проекта Django. Эта задача может быть выполнена из любого экземпляра приложения Celery. В отличие от @app.task, @shared_task не привязан к конкретному приложению и может использоваться в разных приложениях проекта.
Например, чтобы отправить электронное письмо, мы можем создать функцию send_email() и добавить ее в очередь следующим образом:
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_email(subject, message, recipient_list):
send_mail(subject, message, recipient_list, ['from@example.com'])
# вызов задачи
send_email.delay('Test subject', 'Test message', ['to@example.com'])
В этом примере мы создаем функцию send_email(), которая использует стандартный метод Django send_mail() для отправки электронного письма. Затем мы декорируем эту функцию с помощью декоратора @shared_task, чтобы Celery мог распознать ее как задачу, которую нужно выполнить асинхронно. Наконец, мы добавляем задачу в очередь, используя метод delay().
Также можно передавать объекты Django в качестве аргументов, как мы обсуждали ранее:
from django.contrib.auth.models import User
@shared_task
def send_welcome_email(user_id):
user = User.objects.get(id=user_id)
send_email.delay('Welcome to our website', f'Hello {user.username}, welcome to our website!', [user.email])
# вызов задачи
user = User.objects.create(username='testuser', email='testuser@example.com')
send_welcome_email.delay(user.id)
В этом примере мы создаем функцию send_welcome_email(), которая использует объект User из Django для получения имени пользователя и электронного адреса. Затем мы используем метод send_email.delay() для отложенного выполнения задачи отправки электронного письма с данными пользователя в качестве аргументов.
Таким образом, мы можем использовать Celery в связке с Redis и Django для асинхронной обработки задач и выполнения задач в фоновом режиме.
Опций задач
Конфигурация задач может быть изменена с помощью опций, которые могут быть заданы при использовании декоратор.
Некоторые примеры опций:
- name: название задачи. Если опция не указана, то имя задачи будет сгенерировано автоматически на основе имени функции. Например:
@app.task(name="send_email_task")
def send_email(to, body):
# отправка электронной почты
- bind: флаг, указывающий, должен ли первый аргумент метода быть экземпляром класса задачи. Этот флаг полезен, когда методы класса используют общие ресурсы и нуждаются в доступе к контексту задачи. Например:
@app.task(bind=True)
def send_email(self, to, body):
# использование self.request
- max_retries: максимальное количество попыток выполнения задачи в случае возникновения исключения. По умолчанию равно 3. Например:
@app.task(max_retries=5)
def send_email(to, body):
# отправка электронной почты
- retry_backoff: определяет, как долго ждать перед повторной попыткой выполнения задачи, в случае возникновения исключения. Эта опция должна быть указана в секундах или в функции, которая принимает аргумент retry_number и возвращает задержку в секундах. Например:
@app.task(retry_backoff=60) # задержка в 60 секунд перед повторной попыткой
def send_email(to, body):
# отправка электронной почты
- queue: имя очереди, в которую будут помещаться задачи. Если очередь не существует, то она будет создана автоматически. Например:
@app.task(queue="email")
def send_email(to, body):
# отправка электронной почты
- priority: приоритет задачи. Эта опция используется для установки приоритета выполнения задачи. Чем ниже значение, тем выше приоритет. Например:
@app.task(priority=2)
def send_email(to, body):
# отправка электронной почты
- gnore_result: Эта опция указывает на то, следует ли игнорировать результат задачи. По умолчанию, Celery сохраняет результат каждой задачи в backend. Если вы хотите, чтобы результат не сохранялся, то установите ignore_result=True при создании задачи. Например:
@shared_task(ignore_result=True)
def add(x, y):
return x + y
- task_reject_on_worker_lost: Эта опция указывает, что Celery должен отменить задачу, если ее исполнитель (worker) потерян. Это может произойти, например, если воркер упал или был перезапущен. По умолчанию, Celery не отменяет задачу и она остается в очереди. Чтобы использовать эту опцию, установите task_reject_on_worker_lost=True при создании задачи. Например:
@shared_task(task_reject_on_worker_lost=True)
def add(x, y):
return x + y
- acks_late: Эта опция указывает, когда задача должна быть отмечена как выполненная. По умолчанию, Celery отмечает задачу выполненной сразу после того, как ее исполнитель (worker) завершит выполнение. Если установить acks_late=True, Celery отметит задачу выполненной только после того, как результат был сохранен в backend. Это может быть полезно, если вы хотите убедиться, что результат задачи был сохранен, прежде чем отметить задачу выполненной. Например:
@shared_task(acks_late=True)
def add(x, y):
return x + y
Управление приоритетами задач
В Celery мы можем управлять приоритетами задач, добавляя их в различные очереди с разными приоритетами. Это позволяет нам управлять порядком выполнения задач и дать приоритет наиболее важным задачам.
Для определения приоритета задачи мы можем использовать параметр priority при добавлении задачи в очередь. Чем ниже значение приоритета, тем выше приоритет задачи.
Например, допустим, что у нас есть несколько задач, которые должны быть выполнены в разном порядке. Мы можем добавить эти задачи в разные очереди с разными приоритетами:
# Очередь для задач с низким приоритетом
celery_app.conf.task_queues = (
Queue('low_priority', routing_key='low_priority.#'),
Queue('default', routing_key='default.#'),
Queue('high_priority', routing_key='high_priority.#'),
)
# Определение задачи с разными приоритетами
@shared_task
def send_welcome_email(user_id):
# Определение задачи с низким приоритетом
send_email.delay('Welcome to our website', f'Hello {user.username}, welcome to our website!', [user.email], priority=1)
@shared_task
def send_important_email(user_id):
# Определение задачи с высоким приоритетом
send_email.delay('Important message', f'Hello {user.username}, this is an important message!', [user.email], priority=10)
В этом примере мы создаем три очереди с разными приоритетами: low_priority, default и high_priority. Затем мы определяем две задачи: send_welcome_email() и send_important_email(). Первая задача имеет низкий приоритет (1), а вторая задача имеет высокий приоритет (10).
Когда мы вызываем эти задачи с помощью метода delay(), они добавляются в соответствующие очереди с приоритетами, которые мы определили.
Для настройки очереди задач в Celery мы можем определить несколько параметров в нашем файле настроек Django, таких как CELERY_BROKER_URL, CELERY_RESULT_BACKEND и CELERY_TASK_SERIALIZER.
Например, в файле settings.py мы можем определить настройки очереди задач следующим образом:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'json'
В этом примере мы указываем URL-адрес Redis в качестве места хранения нашей очереди задач и результатов выполнения. Мы также указываем, что мы будем использовать сериализацию JSON для передачи данных между нашими работниками.
Кроме того, мы можем определить дополнительные параметры для настройки очереди задач, такие как CELERY_TASK_DEFAULT_QUEUE и CELERY_TASK_DEFAULT_EXCHANGE.
Например, мы можем определить очередь задач для определенного типа задач следующим образом:
# Настройки для очереди задач по умолчанию
CELERY_TASK_DEFAULT_QUEUE = 'default'
# Настройки для очереди задач по типу
CELERY_QUEUES = {
'emails': {
'exchange': 'emails',
'binding_key': 'emails'
},
'payments': {
'exchange': 'payments',
'binding_key': 'payments'
}
}
В этом примере мы определяем очереди задач по типу - для электронных писем и платежей. Каждая очередь имеет свой собственный обмен и ключ связывания, чтобы задачи могли быть маршрутизированы правильно.
Таким образом, мы можем определять настройки очереди задач в файле настроек Django и использовать их для управления выполнением задач в Celery + Redis + Django.
Планирование задач
Для создания периодических задач в Celery используется модуль celery.task.schedules, который определяет расписание выполнения задач. Можно задать расписание как через константы, так и через строку с cron-выражением.
Пример создания периодической задачи, которая будет выполняться каждые 5 минут:
from celery.task.schedules import crontab
from celery.decorators import periodic_task
@periodic_task(run_every=crontab(minute='*/5'))
def my_periodic_task():
# Тело задачи
В этом примере используется объект crontab, который позволяет задать расписание в формате cron. В данном случае задача будет выполняться каждые 5 минут.
Также можно задать расписание через константы, например:
from datetime import timedelta
from celery.decorators import periodic_task
@periodic_task(run_every=timedelta(minutes=5))
def my_periodic_task():
# Тело задачи
В этом примере задача будет выполняться каждые 5 минут, используя объект timedelta.
Если нужно выполнить задачу только один раз через определенное время, можно использовать метод apply_async с параметром eta, указывающим время, когда задача должна быть запущена:
from datetime import datetime, timedelta
from myapp.tasks import my_task
eta = datetime.utcnow() + timedelta(minutes=30)
my_task.apply_async(args=[arg1, arg2], eta=eta)
В этом примере задача my_task будет выполнена через 30 минут от текущего момента.
Для создания периодических задач используется понятие "расписания" (schedule). Расписание определяет, как часто и когда задача должна выполняться.
Для создания расписания можно использовать модуль celery.schedules. В этом модуле определены несколько типов расписаний: crontab, schedule, timedelta и clock.
Пример создания расписания с помощью crontab:
from celery.schedules import crontab
from myapp.tasks import my_task
schedule = crontab(hour=4, minute=30)
my_task.apply_async(args=[arg1, arg2], eta=schedule)
В этом примере задача my_task будет запланирована на выполнение ежедневно в 4:30 утра.
Пример создания расписания с помощью schedule:
from celery.schedules import schedule
from myapp.tasks import my_task
every_five_seconds = schedule(run_every=5)
my_task.apply_async(args=[arg1, arg2], eta=every_five_seconds)
В этом примере задача my_task будет запланирована на выполнение каждые 5 секунд.
Пример создания расписания с помощью timedelta:
from datetime import timedelta
from celery.schedules import schedule
from myapp.tasks import my_task
every_hour = timedelta(hours=1)
my_schedule = schedule(run_every=every_hour)
my_task.apply_async(args=[arg1, arg2], eta=my_schedule)
В этом примере задача my_task будет запланирована на выполнение каждый час.
Можно использовать несколько типов расписаний для создания более сложных расписаний. Например:
from celery.schedules import crontab, schedule
from myapp.tasks import my_task
schedule1 = crontab(hour=4, minute=30)
schedule2 = schedule(run_every=60)
my_schedule = schedule1 | schedule2
my_task.apply_async(args=[arg1, arg2], eta=my_schedule)
В этом примере задача my_task будет запланирована на выполнение ежедневно в 4:30 утра, а также каждые 60 секунд.
Мониторинг и отладка
Для мониторинга задач в реальном времени можно использовать инструменты, предоставляемые Celery, такие как Flower или celerymon.
Flower - это веб-интерфейс для мониторинга и администрирования Celery. Он позволяет просматривать статистику выполнения задач, мониторить очереди, управлять процессами worker'ов, просматривать логи и многое другое. Для запуска Flower необходимо выполнить команду celery flower в консоли.
Еще один способ мониторинга - celerymon, это CLI-интерфейс для мониторинга Celery, предоставляемый в составе пакета celery. Он позволяет просматривать текущее состояние очередей, worker'ов, задач и т.д. Для запуска celerymon необходимо выполнить команду celery -A <имя проекта> events.
Пример использования Flower:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# urls.py
from django.urls import path
from flower.views import FlowerView
urlpatterns = [
path('flower/', FlowerView.as_view(), name='flower'),
]
После запуска Celery и Flower, можно открыть в браузере страницу http://localhost:5555/, на которой будет доступна веб-интерфейс Flower.
Пример использования celerymon:
$ celery -A myproject events
После запуска celerymon, в консоли будут отображаться текущие события, связанные с выполнением задач в Celery. Например, можно увидеть, когда задача была запущена и завершена, а также ее статус.
При работе с Celery, как и с любым другим инструментом, могут возникать ошибки. Для отладки их необходимо использовать различные инструменты и методы.
Один из основных инструментов для отладки ошибок в Celery - это логирование. В Celery встроена поддержка логирования событий, происходящих в системе. В логах можно найти информацию о запуске и завершении задач, а также об ошибках, возникших в процессе выполнения. Для включения логирования в Celery необходимо настроить соответствующий логгер.
Также можно использовать отладочный режим Celery, который позволяет выполнить задачу в отладочном режиме. В этом режиме задача выполняется на локальной машине, а не на удаленном сервере, что упрощает отладку.
Пример отладки ошибки:
# tasks.py
from mysite.celery import app
@app.task
def divide(x, y):
result = None
try:
result = x / y
except Exception as e:
# Логируем ошибку
app.logger.error(str(e))
return result
В этом примере, если произойдет ошибка при делении, то она будет записана в лог. Для просмотра логов можно использовать команду celery -A tasks worker --loglevel=INFO.
Также можно запустить задачу в отладочном режиме и использовать pdb:
# tasks.py
from mysite.celery import app
@app.task
def divide(x, y):
result = None
try:
result = x / y
except Exception as e:
# Используем pdb для отладки
import pdb; pdb.set_trace()
return result
При запуске этой задачи, выполнение остановится на строке pdb.set_trace(), и можно будет проанализировать состояние системы с помощью pdb.
Еще один способ отладки ошибок в Celery - это использование инструментов для трассировки (debugging). Например, можно использовать стандартный Python-дебаггер pdb, чтобы остановить выполнение задачи на определенном моменте и проанализировать состояние системы. Для запуска приложения Django с Celery в режиме Debug можно использовать следующий пример конфигурации в файле settings.py:
# settings.py
DEBUG = True
# ...
if DEBUG:
# Celery
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True
CELERY_TASK_IGNORE_RESULT = True
Эти настройки позволяют запустить задачи в режиме синхронного выполнения, то есть без использования очереди и воркеров Celery. Это может быть полезно для отладки и тестирования, когда вы не хотите запускать отдельный процесс для воркера Celery.
Однако, обратите внимание, что в этом режиме некоторые функциональности Celery могут работать некорректно, например, задачи не будут перезапускаться в случае сбоя воркера. Поэтому рекомендуется использовать этот режим только для тестирования и отладки, а не для работы в боевой среде.
Заключение
Celery - это инструмент для обработки задач в фоновом режиме в Django-приложениях. Он позволяет разделить основной поток выполнения от обработки тяжелых задач, повышая производительность и отзывчивость приложения. Celery поддерживает очереди задач, отложенное выполнение, приоритеты, планирование и мониторинг выполнения задач.
Использование Celery с Redis как брокером сообщений обеспечивает быстрый и надежный обмен сообщениями между различными процессами, а Django обеспечивает интеграцию с приложением и удобный доступ к базе данных и другими объектами.
Знание основных функций и возможностей Celery позволяет эффективно организовать работу с задачами в Django-приложениях и улучшить производительность вашего приложения.