Задача Поваренная книга¶
Обеспечение выполнения задания только по одному разу¶
Этого можно добиться с помощью замка.
В этом примере мы будем использовать кэш-фреймворк для установки блокировки, доступной для всех рабочих.
Это часть воображаемого импортера RSS-каналов под названием djangofeeds. Задача принимает URL канала в качестве единственного аргумента и импортирует этот канал в модель Django под названием Feed. Мы гарантируем, что два или более рабочих не смогут импортировать одну и ту же ленту одновременно, устанавливая ключ кэша, состоящий из контрольной суммы MD5 URL ленты.
Ключ кэша истекает через некоторое время на случай, если случится что-то непредвиденное, а оно всегда случается…
По этой причине время выполнения ваших задач не должно превышать таймаут.
Примечание
Для того чтобы это работало правильно, вам необходимо использовать кэш-бэкенд, в котором операция .add
является атомарной. Известно, что memcached
хорошо работает для этой цели.
import time
from celery import task
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed
logger = get_task_logger(__name__)
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
@contextmanager
def memcache_lock(lock_id, oid):
timeout_at = time.monotonic() + LOCK_EXPIRE - 3
# cache.add fails if the key already exists
status = cache.add(lock_id, oid, LOCK_EXPIRE)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if time.monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_id)
@task(bind=True)
def import_feed(self, feed_url):
# The cache key consists of the task name and the MD5 digest
# of the feed URL.
feed_url_hexdigest = md5(feed_url).hexdigest()
lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
logger.debug('Importing feed: %s', feed_url)
with memcache_lock(lock_id, self.app.oid) as acquired:
if acquired:
return Feed.objects.import_feed(feed_url).url
logger.debug(
'Feed %s is already being imported by another worker', feed_url)