Login

Decorator to execute a method only once

Author:
atodorov-otb
Posted:
December 11, 2013
Language:
Python
Version:
1.6
Score:
0 (after 0 ratings)

Beware if using Amazon Simple Queue Service to execute Celery tasks which send email messages! Sometimes SQS messages are duplicated which results in multiple copies of the messages being sent. This is a simple decorator which uses a cache backend to prevent the task from executing twice in a specified period. For example:

@task
@execute_once_in(3600*24*7)
def cron_first_week_follow_up():
    """
        Send a follow-up email to new users!
    """
    pass

For more info see http://atodorov.org/blog/2013/12/06/duplicate-amazon-sqs-messages-cause-multiple-emails/

http://atodorov.org/blog/2013/12/11/idempotent-django-email-sender-with-amazon-sqs-and-memcache/

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from functools import wraps
from django.core.cache import cache

def execute_once_in(seconds):
    """
    This decorator wraps a normal function
    so that it can be executed only once in the next few seconds.

    Useful to make Celery email sending tasks idempotent and safeguard
    against SQS messages delivered twice (in rare cases).

    Usage:

    @task
    @execute_once_in(3600)
    def myfunction():
        pass

    If called multiple times, the above method will be executed only one
    time during the next hour following its first execution.
    """

    def decorator(func):
        def inner_decorator(*args, **kwargs):
            key = "%s.%s" % (func.__module__, func.__name__)
            key = key.replace(' ','_') # memcache doesn't like spaces

            # NB: there's no way to tell if
            # func() didn't execute or returned nothing
            if cache.get(key):
                return

            cache.set(key, True, seconds)
            return func(*args, **kwargs)


        return wraps(func)(inner_decorator)

    return decorator

More like this

  1. Template tag - list punctuation for a list of items by shapiromatron 2 months, 2 weeks ago
  2. JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 2 months, 3 weeks ago
  3. Serializer factory with Django Rest Framework by julio 9 months, 2 weeks ago
  4. Image compression before saving the new model / work with JPG, PNG by Schleidens 10 months, 1 week ago
  5. Help text hyperlinks by sa2812 11 months ago

Comments

romank0 (on October 5, 2016):

This solution has race conditions.

Use case 1

  1. the first celery task is started on SQS message receiving
  2. it sets a value to cache
  3. something goes wrong and task fails

Result: now the task will not be retried until the period specified in execute_once_in is finished.

Use case 2

  1. an SQS messages comes in
  2. the duplicate of SQS message comes in
  3. the first celery task is started for the first message
  4. the second celery task is started for the duplicate
  5. both tasks get False on cache.get
  6. both tasks proceed

The chances to get this increases if celery is configured to have many workers and SQS messages came close in time to each others.

Result: two emails are sent

#

Please login first before commenting.