Основная цель этой публикации - показать наиболее важные проблемы, с которыми я столкнулся при работе с задачами Celery. Я воспользуюсь простым случаем, чтобы описать наиболее распространенные проблемы, с которыми вы можете столкнуться при работе с Celery.

Предпосылки:

  • Основы веб-фреймворка Python (например, Django / Flask / Pyramid)
  • Основы библиотеки сельдерея (рабочий, брокер, задержки, повторные попытки, подтверждение задачи)
  • Знание базы данных (ORM, транзакции, блокировка чтения)
  • Знакомство с использованием Redis в качестве брокера Celery

Дело

Представьте, что мы реализуем приложение для интернет-магазина. Давайте сосредоточимся на компоненте, отвечающем за регистрацию новых пользователей и отправку приветственных писем после успешной регистрации.

Я буду использовать примеры кода, подобные Django, потому что они короткие и легко демонстрируют общую идею, которая может быть применена к другим веб-фреймворкам. Упрощенный код для create_user обработчика запросов может выглядеть так:

Во-первых, мы используем ORM для создания нового User объекта, содержащего всю важную информацию (имя, адрес электронной почты). Затем мы вызываем метод send_email, который отправляет пользователю приветственное сообщение через SMTP-сервер. Наконец, мы визуализируем HTML-страницу и отправляем ее пользователю. Мы также используем transaction.commit_on_success decorator для автоматической фиксации объекта User в базе данных, если функция create_user возвращается без создания исключений.

Тщательно выбирайте аргументы функции задачи

Как вы, вероятно, знаете, отправка электронной почты через SMTP может вызвать несколько проблем, связанных с длительным временем ответа сервера или временной недоступностью сервера. Давайте извлечем send_email логику в задачу Celery, делегируя логику отправки электронной почты отдельному процессу. create_user вернется быстрее, независимо от возможных проблем с отправкой электронного письма. После этого извлечения наш код должен выглядеть следующим образом:

Вроде бы все аккуратно и просто, но есть небольшая проблема. Мы передаем объект User в send_welcome_email_task. Это объект ORM, привязанный к текущему соединению с базой данных. Задача будет запущена исполнителем в другом процессе, используя другое соединение с базой данных, поэтому не рекомендуется передавать весь User объект между процессами.

Кроме того, тем временем объект User может быть изменен, и работник Celery будет работать с устаревшей версией. Ваш объект также может содержать несколько более сложных типов полей, таких как datetime, которые нельзя сериализовать в JSON (наиболее часто используемый сериализатор в Celery). Но не волнуйтесь, мы можем решить эту проблему, передав user.id вместо user в качестве аргумента задачи:

Нет проблем с сериализацией user.id, потому что теперь это простой int. user извлекается из базы данных непосредственно id в начале функции, поэтому мы уверены, что задача Celery работает с последней версией объекта user.

Извлеченный урок: используйте только простые типы в качестве аргументов функции задачи.

Помните, что состояние вашего объекта может измениться после отправки задачи брокеру. Это особенно важно, когда две задачи работают с одним и тем же объектом. Второй должен знать о возможных изменениях объекта первым.

Извлеченный урок: всегда получайте самую новую версию объекта во время выполнения задачи.

Отправка задачи брокеру и транзакции в базе данных

Давайте двигаться дальше. Подумайте о случае, когда функция render_html работает очень медленно (например, плохо реализована). Если это так, возможно, возникнет состояние гонки. Работник сельдерея может начать выполнение задачи до того, как create_user завершит и зафиксирует user объект в базе данных. В этом случае send_welcome_email_task вызовет исключение типа «Пользовательский объект не найден в базе данных для данного идентификатора». Этот сценарий также может реализоваться, если после отправки задачи брокеру Celery запускается какая-то длительная операция.

Как решить эту проблему? Первая идея может заключаться в том, чтобы отложить момент отправки задачи брокеру, например, на 2 секунды, чтобы быть уверенным, что функция вернется до того, как работник Celery получит задачу от брокера:

send_welcome_email_task.apply_async(args=(user.id,), countdown=2)

Это довольно простое исправление, очевидно, не сработает, если render_html выполняется более 2 секунд. Это вызовет ненужную задержку отправки электронного письма.

Мы должны найти лучшее решение. Мы должны быть уверены, что задача Celery отправлена ​​брокеру не раньше, чем транзакция базы данных будет зафиксирована. Для этого мы будем использовать декоратор transaction.commit_manually, чтобы иметь полный контроль над транзакцией базы данных.

С этими новыми настройками задача отправляется в Celery после фиксации транзакции, поэтому для состояния гонки нет места. У этой реализации есть еще одно преимущество: задача отправляется брокеру только в том случае, если транзакция завершена успешно и в функции create_user не возникает исключения. Раньше, если происходило исключение (например, во время render_html функции или другой бизнес-логики) электронное письмо отправлялось пользователю, что не соответствовало нашим ожиданиям.

Совет: для больших проектов лучший способ решить эту проблему - расширить базовый класс Celery Task и использовать события транзакции для отправки задачи сразу после фиксации. Это позволит автоматизировать этот процесс для всех видов задач, которые работают с базой данных, и сделать код намного чище (см. Это сообщение в блоге для примера Django).

Извлеченный урок: отправляйте задачу брокеру только тогда, когда функция не вызывает
каких-либо исключений и после того, как транзакция зафиксирована.

Повторная попытка задачи может быть сложной

Давайте немного усложним ситуацию. Если есть временная проблема с SMTP-сервером, было бы неплохо повторить отправку электронной почты через некоторое время. Мы можем использовать функцию повторных попыток Celery, чтобы реализовать это. Для этого требуется добавить два параметра в декоратор задачи Celery и добавить блок try-except:

Теперь, если во время отправки электронного письма возникает какая-либо проблема, задача будет повторяться каждые 60 секунд, пока она не будет завершена успешно или не удастся (максимум 120 раз). Теоретически это должно работать без проблем… но не будет, если мы будем использовать Redis в качестве брокера. Почему?

Redis использует параметр visibility_timeout, который определяет ограничение по времени, когда работник должен подтвердить, что задача завершилась успешно или не удалось. В противном случае задача считается потерянной и повторно отправляется работнику (не обязательно тому же, если несколько из них работают параллельно) еще раз. Значение по умолчанию для visibility_timeout установлено на 1 час. В нашем случае это означает, что если письмо не будет отправлено в течение 1 часа, задача будет повторно отправлена ​​работнику. Однако задача не теряется, а ждет внутри воркера еще одной попытки. В результате мы получим две одинаковые задачи. Если оба, наконец, добьются успеха, пользователь получит 2 письма, что не является желаемым поведением.

Совет. Если вы хотите избежать проблем, связанных с visibility_timeout, вы также можете рассмотреть возможность использования RabbitMQ в качестве брокера.

Для получения дополнительной информации о visibility_timeout прочтите Документацию по сельдерею и Проблема с GitHub.

Первое решение, которое приходит на ум, - увеличить таймаут видимости, например, до 2 часов. Но что, если у нас есть другие задачи, требующие повторных попыток более 2 часов? Что делать, если мы не хотим ждать более 1 часа, чтобы повторно отправить задачу исполнителю, если задача действительно потеряна (например, из-за недоступности рабочей сети)?

Лучшее решение - написать код задачи таким образом, чтобы независимо от того, сколько раз мы запускали задачу, основная логика (в нашем случае отправка электронного письма) выполнялась не более одного раза. Это называется идемпотентностью задачи. Чтобы сделать нашу задачу идемпотентной, мы должны выполнить два шага.

Во-первых, в начале задачи мы должны проверить, отправлено ли уже электронное письмо. Для этого мы добавим поле is_welcome_email_sent bool в класс модели User ORM. Если это True, функция задачи должна возвращаться, ничего не делая. В противном случае он должен отправить электронное письмо и установить для is_welcome_email_sent значение True.

Во-вторых, мы должны защитить код от состояния гонки, которое может произойти, когда две задачи пытаются одновременно проверить значение is_welcome_email_sent. Если это произойдет, электронное письмо будет отправлено дважды. Мы будем использовать функцию блокировки чтения select_for_update(), чтобы быть уверенным, что один user объект одновременно обрабатывается максимально одним исполнителем. Другая задача должна будет ждать select_for_update(), пока первая не снимет блокировку (зафиксирует транзакцию). Решение должно выглядеть так:

Даже если задача будет распределена между несколькими рабочими, только первый выполнит работу. Остальные задачи будут только проверять, что работа уже выполнена и завершена, не вызывая нежелательных эффектов.

Извлеченный урок: Сделайте задачу идемпотентной, чтобы быть уверенным, что если работа выполняется только один раз.

Позднее признание

Когда все наши задачи идемпотентны, мы получаем еще один бонус. Мы можем включить «позднее подтверждение»:

@app.task(bind=True, default_retry_delay=60, max_retries=120, acks_late=True)

По умолчанию исполнитель подтверждает задачу сразу после ее получения от брокера (перед выполнением задачи). Это называется «раннее признание». Это означает, что если воркер выйдет из строя в середине выполнения, задача не будет запущена снова, потому что она считается выполненной. В результате задача никогда не будет завершена.

После включения «позднего подтверждения» задача подтверждается работником сразу после выполнения. Если во время этого процесса рабочий выйдет из строя, задача будет запущена еще раз. Это хорошее решение, потому что мы можем быть уверены, что весь код задачи был полностью выполнен хотя бы один раз. Есть еще один недостаток: часть кода задачи могла выполняться дважды. К счастью, наша задача идемпотентна, поэтому мы защищены от выполнения нескольких задач «бизнес-логики».

Извлеченный урок: используйте «позднее подтверждение» для идемпотентных задач, чтобы защитить их от неполного выполнения.

Заключение

Как видите, даже довольно простую задачу Celery не так-то просто написать. Особенно, если вы хотите быть на 100% уверенным, что работа будет выполнена при любых обстоятельствах. Надеюсь, эта статья поможет вам сделать ваши задачи более надежными.

Если вам понравился этот пост, нажмите кнопку хлопка ниже 👏👏👏

Вы также можете подписаться на нас в Facebook, Twitter и LinkedIn.