В Яндекс.Облаке есть сервисы, позволяющее запустить обработку задач из очереди сообщений. Celery из коробки умеет работать с аналогичными сервисами в Amazon Web Services, а для работы с Яндекс.Облаком нужно лишь немного поправить конфигурацию. Именно об этом и будет эта статья.
Заодно мы развернем приложение в Serverless Containers - сервисе Яндекс.Облака, который запускает Docker-контейнер только в тот момент, когда происходит обращение к нему. И уместимся в Serverless Free Tier, чтобы приложение работало почти бесплатно.
Запуск контейнера
Для того, чтобы приложение корректно запустилось в Serverless Containers, нужно добавить переменную окружения PORT
, которая будет устанавливать порт для запуска приложения. Итоговый Dockerfile для Django может выглядеть вот так:
1 |
|
Далее собираем контейнер:
docker build -t myproject .
- собираем образdocker run -ti --rm -p 8000:8000 myproject
- запускаем контейнер- открываем в браузере http://localhost:8000 и проверяем, кто приложение работает правильно.
Далее по инструкции Яндекс.Облака запускаем контейнер.
Подключение к очереди сообщений
Celery - инструмент для работы с очередями сообщений на Python. Он хорошо интегрируется с Django, но также может использоваться и без него.
Celery поддерживает несколько брокеров сообщений, но нас интересует Amazon SQS в первую очередь, так как Yandex Message Queue имеет поддержку Amazon SQS API.
Первым делом создаем Message Queue по официальной инструкции.
По умолчанию Celery настроен на работу с Amazon SQS. Чтобы настроить Celery для работы с Yandex Message Queue, немного поменяем settings.py
:
1 |
|
Указываем переменные окружения c заменой {ПЕРЕМЕННЫХ}
на значения, полученные при создании очереди:
1 |
|
Для указания переменных окружения можно использовать модуль dotenv и файл .env
. Если вы тестируете работу сразу в Яндекс.Облаке, то переменные окружения можно указать при создании новой версии контейнера.
Устанавливаем дополнение Celery для работы с SQS:
- pip:
pip install celery[sqs]
- poetry:
poetry add 'celery[sqs]'
После этого запускаем проект и пробуем добавить задачу в очередь (например, вызываем какой-нибудь URL, в котором происходит task.delay()
).
Запуск воркеров через POST-запрос
Организовать обработку сообщений, поступающих в очередь, можно несколькими способами
- запустить celery worker, который будет сам забирать сообщения и обрабатывать их
- настроить триггер в Yandex Message Queue, который будет вызывать Cloud Function или делать HTTP-запрос куда-либо при появлении нового сообщения.
Мы воспользуемся вторым способом и настроим HTTP-вызов при появлении сообщения в очередь. Чтобы этот вызов обрабатывался, добавим новый view в Django.
TLDR: ниже есть код готового обработчика запросов для Django
Тело HTTP-запроса с сообщением от Celery выглядит примерно так (некоторые данные скрыты):
1 |
|
В этом сообщении нас интересует BODY
- в нем содержится json, который отправил Celery, закодированный в base64. Если его раскодировать, мы увидим примерно следующее:
1 |
|
Вместо TASK
выше будет что-то типа appname.tasks.function_name
(т. е. путь до функции-обработчика задачи). BODY2
- это опять json в base64 с аргументами команды:
1 |
|
В нем - tuple с тремя аргументами: args, kwargs, options
. Эта информация должна быть передана в таск для того, чтобы вызвать его с правильными параметрами.
В итоге код обработчика запросов выглядит так:
1 |
|
Такая обработка задач поддерживает не все возможности Celery, но позволяет запустить простейшую обработку в Serverless Containers
Подключаем его в urls.py
:
1 |
|
Создание триггера
Конкретные шаги по созданию триггера для вызова контейнера при получении сообщения в очереди описаны в официальной инструкции.
Подскажу некоторые параметры, которые нужно указывать при создании триггера, на примере команды для консоли:
1 |
|
замените параметры:
NAME
- название триггераQUEUE
- arn очереди (строка видаyrn:yc:ymq:REGION:FOLDER_ID:QUEUE_NAME
, можно скопировать на странице очереди в консоли Яндекс.Облака)SERVICE_ACCOUNT_ID
- ID сервисного аккаунтаCONTAINER_ID
- ID контейнераCONTAINER_PATH
- URL-адрес в контейнере, который занимается обработкой сообщений из очереди (в нашем случае - это/worker/
)
Можете снова запустить создание задачи в Celery, через некоторое время она должна обработаться. Информацию об обработке можно посмотреть в логах контейнера и в мониторинге очереди в консоли Яндекс.Облака.
Сохранение результатов обработки задач
Бывают задачи, когда важно получить информацию об итогах обработки задачи в Celery.
Поэтому Celery может сохранять все данные, которые функции с задачами возвращают после своей работы. Если для вашего проекта это не применимо, можете пропустить этот раздел.
Механизм сохранения работает на базе celery result backends. Нас сейчас интересует AWS DynamoDB backend, так как в Яндексе есть Yandex Database Serverless, поддерживающая DynamoDB API.
Создаем Yandex DB Serverless по официальной инструкции. Serverless-режим тарифицируется по использованию и бесплатные квоты.
Устанавливаем дополнение Celery для работы с DynamoDB:
- pip:
pip install celery[sqs,dynamodb]
- poetry:
poetry add 'celery[sqs,dynamodb]'
Добавляем новую константу в settings.py
:
1 |
|
Устанавливаем значение этой переменной из параметра Document API Endpoint (доступен на странице Yandex DB в консоли Яндекс.Облака).
Поясню про принудительное сохранение результатов в коде обработчика задачи. Дело в том, что принудительный вызов Celery-задач с помощью apply()
возвращает EagerResult
, который по умолчанию не должен сохраняться. В нашей ситуации, наоборот, нужно сохранять такие результаты, поэтому мы сначала устанавливаем настройку в нужное нам положение, а потом возвращаем все как было, чтобы не мешать другим процессам, происходящим в приложении.
В целом, настройка связи с result backend завершена, можете попробовать запустить задачу с возвращением результатов и проверить, что эти результаты действительно возвращаются.
Настройка домена
В Яндекс.Облаке есть API Gateway для соединения нескольких сервисов под один хост. У нас по факту лишь один сервис, который предназначен для общения с внешним миром (это наш serverless container, развернутый в самом начале).
С API Gateway, кроме всего прочего, удобно подсоединять домены к развернутым в облаке приложениям.
Создадим API Gateway по официальной инструкции, но в конфигурации укажем примерно следующее:
1 |
|
Здесь указано, что все запросы, приходящие на API Gateway, будут переадресовываться на serverless container. Нужно заменить некоторые параметры:
PROJECT NAME
- название проектаCONTAINER_ID
- ID контейнераSERVICE_ACCOUNT_ID
- ID сервисного аккаунта
После создания появится адрес API Gateway, по которому можно будет обращаться к нашему приложению извне.
Теперь можно подключить свой домен и настроить HTTPS сертификат с помощью Let’s Encrypt прямо в консоли Яндекс.Облака.
Free Tier
В начале я сказал, что почти всё использование сервисов, описанных в этой статье, будет бесплатным благодаря Free Tier.
Действительно, Serverless Containers, Message Queue, YDB Serverless, API Gateway имеют определенный объем услуг, который не тарифицируется.
Yandex Database может работать на выделенных серверах, и тарифицироваться это будет значительно дороже. Мы использовали Serverless-вариант, входящий в Free Tier.
Используемый нами Yandex Container Registry тарифицируется. На момент написания статьи 3 руб/месяц за 1ГБ хранения. Чтобы сэкономить и здесь, можно настроить автоудаление образов, так как после деплоя контейнера они больше будут не нужны.
В статье не рассматривалось подключение к PostgreSQL/MySQL. Можете ознакомиться с их стоимостью на сайте Яндекс.Облака и принять решение об использовании их в своем проекте.
Автоматизация
Автоматизировать этот процесс можно, например, с помощью Terraform. У Яндекс.Облака есть провайдер, позволяющий настроить большинство сервисов.
На момент написания статьи не было возможности заставить Message Queue trigger запускать serverless container, поэтому мне пришлось вызывать консольную команду через yc
для этого.