from functools import wraps, WRAPPER_ASSIGNMENTS
from smtplib import SMTPException
from celery import Celery
from celery.exceptions import Reject
from celery.app.task import BaseTask
from celery.result import AsyncResult
from django.conf import settings
from django.apps import apps
from .utils import import_class, send_template_email_handler, Lock, translate as _
# Celery application used by this module. The location is read from
# ``settings.WORKER_OPTIONS['app']``; any ``:`` in that value is replaced by ``.``
# before it is handed to ``import_class`` (presumably a dotted-path importer -
# confirm against ``.utils``).
celery_app: Celery = import_class(settings.WORKER_OPTIONS['app'].replace(':', '.'))  # type: ignore
class TaskMeta(type):
    """
    Metaclass that decorates ``run`` (and optionally ``apply_async``) of the classes it creates.

    ``run`` is always wrapped so that it executes inside a notifier context.
    When a class is declared with the ``uniq=True`` class keyword, ``run`` and
    ``apply_async`` are additionally wrapped so they execute while holding a
    lock named after the task. Declaring a class with a falsy, non-``None``
    ``uniq`` (e.g. ``uniq=False``) removes a previously applied lock wrapper.
    """

    def __new__(mcs, name, bases, attrs, uniq=None):
        """
        Create the class and install the wrappers.

        :param uniq: ``None`` (default) leaves any inherited lock wrapper untouched,
                     a truthy value installs it, a falsy non-``None`` value removes it.
        """
        task_class = super(TaskMeta, mcs).__new__(mcs, name, bases, attrs)
        task_class.run = mcs.get_notificator_decorator(task_class.run)
        if uniq:
            task_class.run = mcs.get_unique_decorator(task_class.run)
            task_class.apply_async = mcs.get_unique_decorator(task_class.apply_async, is_apply=True)
        elif hasattr(task_class.run, '__uniq_wrapped__') and uniq is not None:
            # Explicit opt-out: restore the functions stored by get_unique_decorator.
            task_class.run = task_class.run.__uniq_wrapped__
            task_class.apply_async = task_class.apply_async.__uniq_wrapped__
        return task_class

    @staticmethod
    def get_notificator_decorator(func):
        """
        Wrap ``func`` so that it runs inside a notifier context manager.

        The notifier is taken from ``self.__notifier__`` when that attribute is
        truthy, otherwise a new one is built from the ``vstutils_api`` app config.
        While ``func`` runs the notifier is available as ``self.__notifier__``;
        afterwards the attribute is reset to ``None``.

        A function that is already wrapped is returned unchanged.
        """
        if getattr(func, '__notify_wrapped__', False):
            return func

        @wraps(func, assigned=WRAPPER_ASSIGNMENTS+('__notify_wrapped__',))
        def wrapper(self, *args, **kwargs):
            notifier = (
                getattr(self, '__notifier__', None) or
                apps.get_app_config('vstutils_api').module.notificator_class([])
            )
            try:
                with notifier:
                    self.__notifier__ = notifier
                    result = func(self, *args, **kwargs)
            finally:
                # Always drop the reference, even when ``func`` raises; otherwise
                # the next call on the same instance would reuse this notifier.
                self.__notifier__ = None
            return result

        wrapper.__notify_wrapped__ = True
        return wrapper

    @staticmethod
    def get_unique_decorator(func, is_apply=False):
        """
        Wrap ``func`` so that it runs while holding the ``uniq-celery-task-<name>`` lock.

        :param func: function to wrap (``run`` or ``apply_async``).
        :param is_apply: ``True`` when wrapping ``apply_async``. For other functions
                         the lock is skipped if the app's ``task_always_eager`` is set
                         (presumably because the ``apply_async`` wrapper already holds
                         the lock in eager mode - confirm).
        :raises celery.exceptions.Reject: with ``requeue=False`` when the lock
                                          raises ``Lock.AcquireLockException``.

        A function that is already wrapped is returned unchanged. The original
        function is stored on the wrapper as ``__uniq_wrapped__``.
        """
        if getattr(func, '__uniq_wrapped__', False):
            return func

        @wraps(func, assigned=WRAPPER_ASSIGNMENTS+('__uniq_wrapped__',))
        def wrapper(self, *args, **kwargs):
            # pylint: disable=protected-access
            if not is_apply and self._get_app().conf.task_always_eager:
                return func(self, *args, **kwargs)
            try:
                with Lock(
                    f'uniq-celery-task-{self.name}',
                    err_msg=_("This task is currently performed by another worker.")
                ):
                    return func(self, *args, **kwargs)
            except Lock.AcquireLockException as err:
                raise Reject(str(err), requeue=False) from err

        wrapper.__uniq_wrapped__ = func
        return wrapper
class TaskClass(BaseTask, metaclass=TaskMeta):
    """
    Wrapper for Celery BaseTask class. Usage is same as Celery standard class, but you can execute task without
    creating instance with :meth:`TaskClass.do` method.

    Example:

        .. sourcecode:: python

            from vstutils.environment import get_celery_app
            from vstutils.tasks import TaskClass

            app = get_celery_app()

            class Foo(TaskClass):
                def run(self, *args, **kwargs):
                    return 'Foo task has been executed'

            app.register_task(Foo())

    Now you can call your task with various methods:
        - by executing ``Foo.do(*args, **kwargs)``
        - get registered task instance like that - app.tasks['full_path.to.task.class.Foo']

    Also you can make your registered task periodic, by adding it to CELERY_BEAT_SCHEDULE in settings.py:

        .. sourcecode:: python

            CELERY_BEAT_SCHEDULE = {
                'foo-execute-every-month': {
                    'task': 'full_path.to.task.class.Foo',
                    'schedule': crontab(day_of_month=1),
                },
            }
    """
    # pylint: disable=abstract-method

    @property
    def name(self):
        """
        property for proper Celery task execution, needed for :meth:`TaskClass.do` method to work
        """
        return self.__name__

    @property
    def __name__(self):
        # Task name is the full dotted path of the class: ``<module>.<ClassName>``.
        return f'{self.__class__.__module__}.{self.__class__.__name__}'

    @classmethod
    def do(cls, *args, **kwargs) -> AsyncResult:
        """
        Method which send signal to celery for start remote task execution.
        All arguments will passed to the task :meth:`TaskClass.run` method.
        """
        # A fresh instance is created for every call; ``delay`` comes from BaseTask.
        return cls().delay(*args, **kwargs)
class SendEmailMessage(TaskClass):
    """
    Task for sending bulk emails, all args and kwargs are passed to :func:`vstutils.utils.send_template_email_handler`.
    Usually you don't need to call this manually, this is called by :func:`vstutils.utils.send_template_email`.
    """

    ignore_result = True

    def run(self, *args, **kwargs):
        """
        Forward all arguments to ``send_template_email_handler``.

        On :class:`smtplib.SMTPException` the task is retried, using
        ``settings.SEND_EMAIL_RETRIES`` as ``max_retries`` and
        ``settings.SEND_MESSAGE_RETRY_DELAY`` as ``countdown``.
        """
        try:
            send_template_email_handler(*args, **kwargs)
        except SMTPException as smtp_error:
            retry_options = {
                'exc': smtp_error,
                'max_retries': settings.SEND_EMAIL_RETRIES,
                'countdown': settings.SEND_MESSAGE_RETRY_DELAY,
            }
            raise self.retry(**retry_options)
# Register an instance of SendEmailMessage with the Celery application at import time.
celery_app.register_task(SendEmailMessage()) # type: ignore