Source code for vstutils.api.decorators

# pylint: disable=protected-access
import typing as _t
from collections import OrderedDict
from inspect import getmembers

import orjson
from django.db import transaction
from rest_framework.decorators import action
from rest_framework import response, status
from drf_yasg.utils import swagger_auto_schema

from .filter_backends import DeepViewFilterBackend
from ..exceptions import VSTUtilsException
from .. import utils


empty = object()


def ensure_is_object(obj):
    if isinstance(obj, (str, bytes)):
        return orjson.loads(obj)
    return obj


def _is_extra_action(attr):
    return hasattr(attr, 'mapping')


def _create_with_iteration(view, get_serializer_func, nested_manager, data):
    serializer = get_serializer_func(data=data)
    serializer.is_valid(raise_exception=True)

    if hasattr(nested_manager, 'field'):
        # on fk relations
        serializer.validated_data[nested_manager.field.name] = nested_manager.instance
        view.perform_create(serializer)
        return serializer.instance

    elif hasattr(nested_manager, 'content_type_field_name'):
        # on contenttype relations
        content_type_field_name = nested_manager.content_type_field_name
        serializer.validated_data[content_type_field_name] = getattr(nested_manager, content_type_field_name)
        serializer.validated_data[nested_manager.object_id_field_name] = nested_manager.instance.id
        view.perform_create(serializer)
        return serializer.instance

    # other relations (many to many or custom with `.add()` method)
    with transaction.atomic():
        view.perform_create(serializer)
        nested_manager.add(serializer.instance)
    return serializer.instance


def __get_nested_path(
        name,
        arg=None,
        arg_regexp='[0-9]',
        empty_arg=True):
    path = name
    if not arg:
        return path
    path = ''.join([
        path, '/?(?P<', arg, '>', arg_regexp, '*' if empty_arg else "+", ')'
    ])
    return path


def _get_nested_subpath(*args, **kwargs):
    sub_path = kwargs.pop('sub_path', None)
    path = __get_nested_path(*args, **kwargs)
    if sub_path:
        path += '/'
        path += sub_path
    return path


[docs] def subaction(*args, **kwargs): """ Decorator which wrap object method to subaction of viewset. :param methods: List of allowed HTTP-request methods. Default is ``["post"]``. :param detail: Flag to set method execution to one instance. :param serializer_class: Serializer for this action. :param permission_classes: Tuple or list permission classes. :param url_path: API-path name for this action. :param description: Description for this action in OpenAPI. :param multiaction: Allow to use this action in multiactions. Works only with :class:`vstutils.api.serializers.EmptySerializer` as response. :param require_confirmation: Sets whether the action must be confirmed before being executed. :param is_list: Mark this action as paginated list with all rules and parameters. :param title: Override action title. :param icons: Setup action icon classes. """ operation_description = kwargs.pop('description', None) response_code = kwargs.pop('response_code', None) serializer_class = kwargs.get('serializer_class', None) response_serializer = kwargs.pop('response_serializer', serializer_class) is_list = kwargs.pop('is_list', kwargs.get('suffix') == 'List') title = kwargs.pop('title', None) icons = kwargs.pop('icons', None) assert ( (response_code is None) or (response_code is not None and response_serializer is not None) ), "If `response_code` was setted, `response_serializer` should be setted too." is_mul = kwargs.pop('multiaction', False) require_confirmation = kwargs.pop('require_confirmation', False) methods = kwargs['methods'] = kwargs.pop('methods', ['post']) def decorator(func: _t.Callable): func_object = action(*args, **kwargs)(func) override_kw: _t.Dict = {'methods': tuple(func_object.mapping.keys()) or None} # type: ignore if response_code: override_kw['responses'] = { response_code: response_serializer() } if operation_description: override_kw['operation_description'] = operation_description else: override_kw['operation_description'] = str(func.__doc__ or '').strip() # type: ignore override_kw['x-multiaction'] = bool(is_mul) override_kw['x-require-confirmation'] = bool(require_confirmation) override_kw['query_serializer'] = kwargs.get('query_serializer') if title: override_kw['x-title'] = title if icons and isinstance(icons, (tuple, list)): override_kw['x-icons'] = icons if 'GET' in methods or 'get' in methods: override_kw['x-list'] = is_list return swagger_auto_schema(**override_kw)(func_object) # type: ignore return decorator
def get_action_name(master_view, method=''): method = method.lower() if method == 'post': action_name = 'create' elif method == 'get' and not master_view.nested_detail: action_name = 'list' elif method == 'get' and master_view.nested_detail: action_name = 'retrieve' elif method == 'put': action_name = 'update' elif method == 'patch': action_name = 'partial_update' elif method == 'delete': action_name = 'destroy' else: # nocv action_name = None return action_name class NestedViewMixin: def _check_permission_obj(self, objects): for obj in objects: self.check_object_permissions(self.request, obj) def get_queryset(self): qs = self.nested_manager.all() if any( DeepViewFilterBackend in getattr(self, attr, []) for attr in ['pre_filter_backends', 'filter_backends'] ): qs = DeepViewFilterBackend().filter_queryset(self.request, qs, self) for qs_filter in self.queryset_filters: if callable(qs_filter): qs = qs_filter(self.nested_parent_object, qs) return qs def get_nested_action_name(self): return get_action_name(self.master_view, self.request.method) def get_serializer_context(self): context = super().get_serializer_context() return context def perform_destroy(self, instance): purge_nested = self.master_view.request.headers.get('X-Purge-Nested', 'false') == 'true' if self.master_view.nested_allow_append: # pylint: disable=import-outside-toplevel from vstutils.models import notify_clients self.nested_manager.remove(instance, **self.nested_manager_kwargs) notify_clients(instance.__class__, {'pk': instance.pk}) notify_clients(self.nested_parent_object.__class__, {'pk': self.nested_parent_object.pk}) if not self.master_view.nested_allow_append or purge_nested: instance.delete() @transaction.atomic() def dispatch_route(self, nested_sub=None): kwargs = {} if nested_sub: self.action = nested_sub else: self.action = self.get_nested_action_name() if self.action != 'list': kwargs[self.nested_append_arg] = self.nested_id self.check_permissions(self.request) self.check_throttles(self.request) if getattr(self, 'is_main_action', False): getattr(self, 'check_etag', lambda r: None)(self.request) return self.finalize_response( self.request, getattr(self, self.action)(self.request, **kwargs), **kwargs ) class NestedWithoutAppendMixin(NestedViewMixin): def create(self, request, *_, **__): # pylint: disable=unused-argument many = isinstance(request.data, (list, tuple)) return self.perform_create_nested(request.data, self.lookup_field, many) def prepare_request_data(self, request_data, many): return request_data if many else [request_data] def _data_create(self, request_data, nested_append_arg): get_serializer_func, manager = self.get_serializer, self.nested_manager return [ getattr(_create_with_iteration(self, get_serializer_func, manager, d), nested_append_arg) for d in request_data ] def perform_create_nested(self, request_data, nested_append_arg, many): id_list = self._data_create( self.prepare_request_data(request_data, many), nested_append_arg ) qs_filter = {nested_append_arg + '__in': id_list} queryset = self.get_queryset().filter(**qs_filter) if not many: queryset = queryset.get() serializer = self.get_serializer(queryset, many=many) return response.Response(serializer.data, status=status.HTTP_201_CREATED) class NestedWithAppendMixin(NestedWithoutAppendMixin): def _data_create(self, request_data, nested_append_arg): # pylint: disable=import-outside-toplevel from vstutils.models import bulk_notify_clients filter_arg = f'{nested_append_arg}__in' request_data = [ensure_is_object(d) for d in request_data] objects = self.get_queryset().model.objects.filter(**{ filter_arg: (i.get(nested_append_arg) for i in request_data) }) self._check_permission_obj(objects) self.nested_manager.add(*objects, **self.nested_manager_kwargs) id_list = [o.pk for o in objects] label = objects.model._meta.label handler = self.get_serializer_class()().get_fields()[nested_append_arg].to_representation def is_not_created(data): try: return handler(data.get(nested_append_arg, None)) not in id_list except: return True notif_objects = tuple( (label, {'pk': pk}) for pk in id_list ) id_list += super()._data_create( filter(is_not_created, request_data), nested_append_arg ) bulk_notify_clients(objects=notif_objects) return id_list def nested_view_function(master_view, view, view_request, *_, **kw): nested_sub = kw.get('nested_sub', None) view_obj = view( # type: ignore request=view_request, kwargs=view_request.parser_context['kwargs'], headers=master_view.headers, ) master_view.nested_view_object = view_obj master_view.nested_detail = view_obj.nested_detail return view_obj.dispatch_route(nested_sub) class BaseClassDecorator: __slots__ = 'name', 'arg', 'request_arg', 'args', 'kwargs' def __init__(self, name, arg, *args, **kwargs): self.name = name self.arg = arg self.request_arg = kwargs.pop('request_arg', f'{self.name}_{self.arg}') self.args = args self.kwargs = kwargs def setup(self, view_class): # nocv raise NotImplementedError() def __call__(self, view_class): return self.decorator(view_class) def decorator(self, view_class): return self.setup(view_class)
[docs] class nested_view(BaseClassDecorator): # pylint: disable=invalid-name """ By default, DRF does not support nested views. This decorator solves this problem. You need two or more models with nested relationship (Many-to-Many or Many-to-One) and two viewsets. Decorator nests viewset to parent viewset class and generate paths in API. :param name: Name of nested path. Also used as default name for related queryset (see `manager_name`). :type name: str :param arg: Name of nested primary key field. :type arg: str :param view: Nested viewset class. :type view: vstutils.api.base.ModelViewSet, vstutils.api.base.HistoryModelViewSet, vstutils.api.base.ReadOnlyModelViewSet :param allow_append: Flag for allowing to append existed instances. :type allow_append: bool :param allow_bulk: Flag for forbidding bulk queries in related manager add method. :type allow_bulk: bool :param manager_name: Name of model-object attr which contains nested queryset. :type manager_name: str,typing.Callable :param methods: List of allowed methods to nested view endpoints. :type methods: list :param subs: List of allowed subviews or actions to nested view endpoints. :type subs: list,None :param queryset_filters: List of callable objects which returns filtered queryset of main. .. note:: Some view methods will not call for performance reasons. This also applies to some of the class attributes that are usually initialized in the methods. For example, ``.initial()`` will never call. Each viewset wrapped by nested class with additional logic. Example: .. sourcecode:: python from vstutils.api.decorators import nested_view from vstutils.api.base import ModelViewSet from . import serializers as sers class StageViewSet(ModelViewSet): model = sers.models.Stage serializer_class = sers.StageSerializer nested_view('stages', 'id', view=StageViewSet) class TaskViewSet(ModelViewSet): model = sers.models.Task serializer_class = sers.TaskSerializer This code generates api paths: * `/tasks/` - GET, POST * `/tasks/{id}/` - GET, PUT, PATCH, DELETE * `/tasks/{id}/stages/` - GET, POST * `/tasks/{id}/stages/{stages_id}/` - GET, PUT, PATCH, DELETE """ __slots__ = ( 'view', 'allowed_subs', '_subs', 'methods', 'queryset_filters', 'empty_arg', 'append_arg', 'allow_append', 'manager_name', 'schema', 'allow_bulk', ) filter_subs = ('filter',) class NoView(VSTUtilsException): msg = 'Argument "view" must be installed for `nested_view` decorator.' def __init__(self, name, arg=None, methods=None, *args, **kwargs): if 'view' not in kwargs: raise self.NoView() self.view = kwargs.pop('view') self.allowed_subs = kwargs.pop('subs', ()) self.queryset_filters = kwargs.pop('queryset_filters', []) self.empty_arg = kwargs.pop('empty_arg', False) self.allow_append = bool(kwargs.pop('allow_append', False)) self.manager_name = kwargs.pop('manager_name', name) self.allow_bulk = kwargs.pop('allow_bulk', True) super().__init__(name, arg, *args, **kwargs) self._subs = self.get_subs() self.append_arg = self.kwargs.pop('append_arg', self.arg) self.methods = methods if self.arg is None: self.methods = methods or ['get'] def _get_subs_from_view(self): # pylint: disable=protected-access actions = getmembers(self.view, _is_extra_action) if self.arg is None: actions = filter(lambda x: not x[1].detail, actions) extra_acts = (x[0] for x in actions) filter_subs = self.filter_subs return filter(lambda name: name not in filter_subs, extra_acts) def get_subs(self): subs = set(self._get_subs_from_view()) if self.allowed_subs is None: return set() elif self.allowed_subs: return set(self.allowed_subs).intersection(subs) return subs def get_view(self, name, **options): # pylint: disable=redefined-outer-name,too-many-statements mixin_class = NestedViewMixin if hasattr(self.view, 'create'): if self.allow_append: mixin_class = NestedWithAppendMixin else: mixin_class = NestedWithoutAppendMixin tp = name.split('_')[-1] if tp == 'detail': detail = True elif tp == 'list': detail = False else: detail = getattr(self.view, options['nested_sub']).detail manager_name = self.manager_name or self.name view_class = utils.get_if_lazy(self.view) class NestedView(mixin_class, view_class): __doc__ = self.view.__doc__ format_kwarg = None queryset_filters = self.queryset_filters NestedView.__name__ = self.view.__name__ # type: ignore NestedView.nested_detail = detail # type: ignore def nested_view_function_wrapper(view_obj, request, *args, **kwargs): kwargs.update(options) # Nested name view_obj.nested_name = name # Allow to append to nested or only create view_obj.nested_allow_append = self.allow_append # ID name of nested object nested_request_arg = view_obj.nested_arg = self.request_arg nested_append_arg = view_obj.nested_append_arg = self.append_arg view_obj.nested_id = kwargs.get(view_obj.nested_arg, None) nested_parent_object = view_obj.nested_parent_object = view_obj.get_object() if nested_append_arg: nested_id = getattr(nested_parent_object, nested_append_arg, None) else: nested_id = None if callable(manager_name): nested_manager = manager_name(nested_parent_object) else: if hasattr(nested_parent_object, manager_name): nested_manager = getattr(nested_parent_object, manager_name) else: view_manager_function_name = f'get_manager_{manager_name}' nested_manager_func = getattr(view_obj, view_manager_function_name) nested_manager = nested_manager_func(nested_parent_object) NestedView.__name__ = self.view.__name__ NestedView.master_view = view_obj NestedView.lookup_field = nested_append_arg NestedView.lookup_url_kwarg = nested_request_arg NestedView.nested_detail = detail NestedView.nested_allow_append = self.allow_append NestedView.nested_append_arg = nested_append_arg NestedView.nested_request_arg = nested_request_arg NestedView.nested_parent_object = nested_parent_object NestedView.nested_id = nested_id NestedView.nested_manager = nested_manager NestedView.nested_manager_kwargs = {} if self.allow_bulk else {'bulk': self.allow_bulk} getattr(view_obj, 'nested_allow_check', lambda *_, **__: None)() return nested_view_function(view_obj, NestedView, request, *args, **kwargs) nested_view_function_wrapper.__name__ = name nested_view_function_wrapper.__doc__ = self.view.__doc__ nested_view_function_wrapper._nested_view = self.view nested_view_function_wrapper._nested_wrapped_view = NestedView return name, nested_view_function_wrapper def get_view_type(self, type_name, **options): return self.get_view(f'{self.name}_{type_name}', **options) def get_list_view(self, **options): return self.get_view_type('list', **options) def get_detail_view(self, **options): return self.get_view_type('detail', **options) def get_sub_view(self, sub, **options): return self.get_view_type(sub, nested_sub=sub, **options) def get_decorator(self, detail=False, **options): kwargs = dict(self.kwargs) kwargs.update(options) return action( detail=True, url_path=_get_nested_subpath( self.name, self.request_arg if detail else '', arg_regexp=kwargs.pop('arg_regexp', '[0-9]') if detail else None, empty_arg=self.empty_arg if detail else None, sub_path=kwargs.pop('sub_path', None) ), **kwargs ) def _filter_methods(self, methods, detail=False): allowed_methods = set(self.view.get_view_methods(detail)) return allowed_methods.intersection(methods) def decorated(self, detail): name, view = self.get_detail_view() if detail else self.get_list_view() kwargs = {'url_name': f'{self.name}-{"detail" if detail else "list"}', 'detail': detail} if not detail: kwargs['suffix'] = 'List' if self.methods: kwargs['methods'] = self._filter_methods(self.methods, detail=detail) else: kwargs['methods'] = self.view.get_view_methods(detail) # type: ignore kwargs['schema'] = self.kwargs.get('schema', self.view.schema) view_class = self.get_decorator(**kwargs)(view) view_class._nested_args = getattr(self.view, '_nested_args', OrderedDict()) view_class._nested_manager = self.kwargs.get('manager_name', self.name) view_class._nested_view = self.view view_class._nested_name = self.name view_class._nested_subname = self.name view_class._nested_wrapped_view = view._nested_wrapped_view if self.arg: view_class._nested_args[self.name] = self.arg return name, view_class def decorated_list(self): return self.decorated(detail=False) def decorated_detail(self): return self.decorated(detail=True) def _get_decorated_sub(self, sub): name, subaction_view = self.get_sub_view(sub) sub_view = getattr(self.view, sub) decorator = self.get_decorator( detail=sub_view.detail, sub_path=sub_view.url_path, methods=sub_view.mapping or self.methods, url_name=f'{self.name}-{sub_view.url_name}', schema=self.kwargs.get('schema', sub_view.kwargs.get('schema', self.view.schema)), _nested_args=getattr(sub_view, '_nested_args', OrderedDict()) ) view_class = decorator(subaction_view) existing_swagger_auto_schema = getattr(view_class, '_swagger_auto_schema', {}) view_class._swagger_auto_schema = getattr( sub_view, '_swagger_auto_schema', existing_swagger_auto_schema ) view_class._nested_view = getattr(sub_view, '_nested_view', self.view) view_class._nested_name = sub view_class._nested_subname = getattr(sub_view, '_nested_subname', self.name) view_class._nested_wrapped_view = getattr(sub_view, '_nested_wrapped_view', None) view_class._nested_args = getattr(sub_view, '_nested_args', OrderedDict()) if self.arg: view_class._nested_args[self.name] = self.arg return name, view_class def generate_decorated_subs(self): for sub in self._subs: yield self._get_decorated_sub(sub) def setup(self, view_class): if self.arg: setattr(view_class, *self.decorated_detail()) view_class._nested_args = getattr(view_class, '_nested_args', OrderedDict()) view_class._nested_args[self.name] = self.request_arg if self._subs: for sub_action_name, sub_action_view in self.generate_decorated_subs(): setattr(view_class, sub_action_name, sub_action_view) setattr(view_class, *self.decorated_list()) return view_class
def extend_viewset_attribute(name, override=False, data=None): def wrapper(view_class): if not override: attr_data = tuple(list(getattr(view_class, name, ())) + list(data or ())) else: attr_data = data setattr(view_class, name, attr_data) return view_class return wrapper def extend_filterbackends(backends, override=False): return extend_viewset_attribute('filter_backends', override, backends) def cache_method_result(func): """ Decorator that caches return value of method based on args and kwargs, cache value stored in the object instance """ name = f'__cache_{func.__name__}' def wrapper(self, *args, **kwargs): result = getattr(self, name, None) if result is None or result[0] != args or result[1] != kwargs: result = (args, kwargs, func(self, *args, **kwargs)) setattr(self, name, result) return result[2] return wrapper