from functools import wraps, WRAPPER_ASSIGNMENTS
from smtplib import SMTPException
from celery import Celery
from celery.signals import worker_process_init, worker_process_shutdown
from celery.exceptions import Reject
from celery.app.task import BaseTask
from celery.result import AsyncResult
from django.conf import settings
from django.apps import apps
from django.utils.module_loading import import_string
from .utils import send_template_email_handler, Lock, translate as _
celery_app: Celery = import_string(
settings.WORKER_OPTIONS['app'].replace(':', '.')
)
notificator = None
def get_notificator():
return apps.get_app_config('vstutils_api').module.notificator_class([])
@worker_process_init.connect
def init_notificator(*_, **__):
# pylint: disable=global-statement
global notificator
notificator = get_notificator() # nocv
@worker_process_shutdown.connect
def destruct_notificator(*_, **__):
# pylint: disable=global-statement
global notificator
if notificator is not None: # nocv
del notificator
class TaskMeta(type):
def __new__(mcs, name, bases, attrs, uniq=None):
task_class = super(TaskMeta, mcs).__new__(mcs, name, bases, attrs)
task_class.run = mcs.get_notificator_decorator(task_class.run)
if uniq:
task_class.run = mcs.get_unique_decorator(task_class.run)
task_class.apply_async = mcs.get_unique_decorator(task_class.apply_async, is_apply=True)
elif hasattr(task_class.run, '__uniq_wrapped__') and uniq is not None:
task_class.run = task_class.run.__uniq_wrapped__
task_class.apply_async = task_class.apply_async.__uniq_wrapped__
return task_class
@staticmethod
def get_notificator_decorator(func):
if getattr(func, '__notify_wrapped__', False):
return func
@wraps(func, assigned=WRAPPER_ASSIGNMENTS+('__notify_wrapped__',))
def wrapper(self, *args, **kwargs):
with notificator or get_notificator() as notifier:
self.__notifier__ = notifier
result = func(self, *args, **kwargs)
self.__notifier__ = None
return result
wrapper.__notify_wrapped__ = True
return wrapper
@staticmethod
def get_unique_decorator(func, is_apply=False):
if getattr(func, '__uniq_wrapped__', False):
return func
@wraps(func, assigned=WRAPPER_ASSIGNMENTS+('__uniq_wrapped__',))
def wrapper(self, *args, **kwargs):
# pylint: disable=protected-access
if not is_apply and self._get_app().conf.task_always_eager:
return func(self, *args, **kwargs)
try:
with Lock(
f'uniq-celery-task-{self.name}',
err_msg=_("This task is currently performed by another worker.")
):
return func(self, *args, **kwargs)
except Lock.AcquireLockException as err:
raise Reject(str(err), requeue=False) from err
wrapper.__uniq_wrapped__ = func
return wrapper
[docs]class TaskClass(BaseTask, metaclass=TaskMeta):
"""
Wrapper for Celery BaseTask class. Usage is same as Celery standard class, but you can execute task without
creating instance with :meth:`TaskClass.do` method.
Example:
.. sourcecode:: python
from vstutils.environment import get_celery_app
from vstutils.tasks import TaskClass
app = get_celery_app()
class Foo(TaskClass):
def run(*args, **kwargs):
return 'Foo task has been executed'
app.register_task(Foo())
Now you can call your task with various methods:
- by executing ``Foo.do(*args, **kwargs)``
- get registered task instance like that - app.tasks['full_path.to.task.class.Foo']
Also you can make your registered task periodic, by adding it to CELERY_BEAT_SCHEDULE in settings.py:
.. sourcecode:: python
CELERY_BEAT_SCHEDULE = {
'foo-execute-every-month': {
'task': 'full_path.to.task.class.Foo',
'schedule': crontab(day_of_month=1),
},
}
"""
# pylint: disable=abstract-method
@property
def name(self):
"""
property for proper Celery task execution, needed for :meth:`TaskClass.do` method to work
"""
return self.__name__
@property
def __name__(self):
return f'{self.__class__.__module__}.{self.__class__.__name__}'
[docs] @classmethod
def do(cls, *args, **kwargs) -> AsyncResult:
"""
Method which send signal to celery for start remote task execution.
All arguments will passed to the task :meth:`TaskClass.run` method.
"""
return cls().delay(*args, **kwargs)
class SendEmailMessage(TaskClass):
"""
Task for sending bulk emails, all args and kwargs are passed to :func:`vstutils.utils.send_template_email_handler`.
Usually you don't need to call this manually, this is called by :func:`vstutils.utils.send_template_email`.
"""
ignore_result = True
# Message context usually contains language object witch is not serializable by msgpack.
serializer = 'pickle'
def run(self, *args, **kwargs):
try:
send_template_email_handler(*args, **kwargs)
except SMTPException as exc:
raise self.retry(
exc=exc,
max_retries=settings.SEND_EMAIL_RETRIES,
countdown=settings.SEND_MESSAGE_RETRY_DELAY
)
celery_app.register_task(SendEmailMessage())