Source code for vstutils.models.custom_model

# cython: binding=True
# pylint: disable=unused-import
from copy import deepcopy
from functools import partial

from yaml import load
try:
    from yaml import CSafeLoader as Loader
except ImportError:  # nocv
    from yaml import SafeLoader as Loader
from django.db.models.query import ModelIterable
from django.db.models.fields import CharField, TextField, IntegerField, BooleanField, AutoField    # noqa: F401

from . import BQuerySet, BaseModel
from .base import ModelBaseClass
from ..utils import raise_context
from ..tools import get_file_value, multikeysort  # pylint: disable=import-error


class Query(dict):
    distinct_fields = False

    def __init__(self, queryset, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queryset = queryset
        self.combinator = None
        self.is_sliced = False
        self.select_for_update = False
        self.select_related = False
        self['standard_ordering'] = True

    @property
    def model(self):
        return self.queryset.model

    @property
    def standard_ordering(self):
        return self['standard_ordering']

    @standard_ordering.setter
    def standard_ordering(self, value):
        self['standard_ordering'] = bool(value)

    def chain(self):
        return self.clone()

    def clone(self):
        query = deepcopy(self)
        if 'custom_queryset_kwargs' in self:
            query['custom_queryset_kwargs'] = self['custom_queryset_kwargs']
        return query

    def _check_data(self, check_type, data):
        # pylint: disable=protected-access,too-many-return-statements
        if getattr(self, 'empty', False):
            return False
        check_data = self.get(check_type, {})
        if check_type == 'exclude' and not check_data:
            return False
        meta = self.model._meta
        for filter_name, filter_data in check_data.items():
            filter_name = filter_name.replace('__exact', '')
            filter_name__cleared, search_format = (filter_name.split('__', maxsplit=1) + [None])[:2]
            if filter_name__cleared == 'pk':
                filter_name__cleared = meta.pk.attname
            try:
                value = data[filter_name__cleared]
            except KeyError:
                continue
            field = meta._forward_fields_map[filter_name__cleared]
            if isinstance(filter_data, (list, tuple, set)):
                filter_data = map(field.to_python, filter_data)
            else:
                filter_data = field.to_python(filter_data)
            if search_format == 'in':
                if value not in filter_data:
                    return False
            elif search_format == 'contains' and isinstance(filter_data, str):
                if filter_data not in value:
                    return False
            elif search_format == 'icontains' and isinstance(filter_data, str):
                if filter_data.upper() not in value.upper():
                    return False
            elif search_format is None:
                if filter_data != value:
                    return False
        return True

    def check_in_query(self, data):
        return self._check_data('filter', data) and not self._check_data('exclude', data)

    def set_empty(self):
        self.empty = True

    def set_limits(self, low: int = None, high: int = None):
        self['low_mark'], self['high_mark'] = low, high
        self.is_sliced = True

    def has_results(self, *args, **kwargs):
        # pylint: disable=unused-argument
        return bool(self.queryset.all()[:2])

    def get_count(self, using):
        # pylint: disable=unused-argument
        model = self.model
        if hasattr(model, 'get_data_generator_count'):
            return model.get_data_generator_count(self)
        return len(self.queryset.all())

    def can_filter(self):
        return self.get('low_mark', None) is None and self.get('high_mark', None) is None

    def clear_ordering(self, *args, **kwargs):
        # pylint: disable=unused-argument
        self['ordering'] = []

    def add_ordering(self, *ordering):
        self['ordering'] = ordering

    @property
    def order_by(self):
        return self.get('ordering', ())


class CustomModelIterable(ModelIterable):
    def values_handler(self, unit, fields, pk_name):
        # pylint: disable=no-member
        return {f: unit.get(f) if f != 'pk' else unit.get(pk_name) for f in fields}

    def construct_instance(self, data, model, only_fields, defer_fields):
        if only_fields is not None:
            data = {k: v for k, v in data.items() if k in only_fields}
        elif defer_fields is not None:
            data = {k: v for k, v in data.items() if k not in defer_fields}
        for field in model._meta.get_fields():
            if field.attname in data:
                with raise_context():
                    data[field.attname] = field.to_python(data[field.attname])
        return model(**data)

    def __iter__(self):
        # pylint: disable=protected-access
        queryset = self.queryset
        model = queryset.model
        query = queryset.query

        if hasattr(model, 'get_data_generator'):
            model_data = model.get_data_generator(query=query)
        else:
            model_data = model._get_data(
                chunked_fetch=self.chunked_fetch,
                **query.get('custom_queryset_kwargs', {})
            )
            if isinstance(model._meta.pk, AutoField):
                for idx, item in enumerate(model_data, 1):
                    item[model._meta.pk.attname] = idx
            model_data = list(filter(query.check_in_query, model_data))
            ordering = query.order_by
            if ordering:
                ordering = list(ordering)
                for idx, value in enumerate(ordering):
                    if value in ('pk', '-pk'):
                        ordering[idx] = value.replace('pk', model._meta.pk.name)
                model_data = multikeysort(
                    model_data,
                    ordering,
                    not query.standard_ordering
                )
            elif not query.standard_ordering:
                model_data.reverse()
            low = query.get('low_mark', 0)
            high = query.get('high_mark', len(model_data))
            model_data = model_data[low:high]

        fields = getattr(self, 'fields', None)
        if fields is None:
            handler = partial(
                self.construct_instance,
                model=model,
                only_fields=getattr(self, 'only_fields', None),
                defer_fields=getattr(self, 'defer_fields', None),
            )
        else:
            handler = partial(
                self.values_handler,
                fields=tuple(fields) or {f.name for f in model._meta.get_fields()},
                pk_name=model._meta.pk.name,
            )
        for data in model_data:
            yield handler(data)


class CustomQuerySet(BQuerySet):
    custom_iterable_class = CustomModelIterable
    custom_query_class = Query

    def _filter_or_exclude(self, negate, *args, **kwargs):
        clone = self._chain()
        if negate:
            filter_type = 'exclude'
        else:
            filter_type = 'filter'
        clone.query[filter_type] = clone.query.get(filter_type, {})
        clone.query[filter_type].update(kwargs)
        for q_arg in filter(lambda x: isinstance(x, dict), filter(bool, args)):
            clone.query[filter_type].update(q_arg)  # nocv
        return clone

    _filter_or_exclude_inplace = _filter_or_exclude

    def last(self):
        return self.reverse().first()

    def first(self):
        return next(iter(self), None)

    def values(self, *fields, **expressions):
        assert not expressions, 'Expressions is not supported on custom non-database models.'
        clone = self._clone()
        clone.__iterable_class__ = type('CustomModelIterableValues', (CustomModelIterable,), {'fields': fields})
        return clone

    def setup_custom_queryset_kwargs(self, **kwargs):
        qs = self._chain()
        qs.query['custom_queryset_kwargs'] = kwargs
        return qs

    def only(self, *fields):
        clone = self._clone()
        clone.__iterable_class__ = type('CustomModelIterable', (CustomModelIterable,), {'only_fields': fields})
        return clone

    def defer(self, *fields):
        clone = self._clone()
        clone.__iterable_class__ = type('CustomModelIterable', (CustomModelIterable,), {'defer_fields': fields})
        return clone


class CustomModelBase(ModelBaseClass):
    def __new__(mcs, name, bases, attrs, **kwargs):
        new_class = super(CustomModelBase, mcs).__new__(mcs, name, bases, attrs, **kwargs)
        if not new_class._meta.abstract:
            pk_name = new_class._meta.pk.attname
            new_class.add_to_class(
                pk_name,
                property(new_class.get_pk_value, new_class.set_pk_value)
            )
        return new_class


[docs]class ListModel(BaseModel, metaclass=CustomModelBase): """ Custom model which uses a list of dicts with data (attribute `ListModel.data`) instead of database records. Useful when you have a simple list of data. Examples: .. sourcecode:: python from vstutils.custom_model import ListModel, CharField class Authors(ListModel): name = CharField(max_length=512) data = [ {"name": "Sergey Klyuykov"}, {"name": "Michael Taran"}, ] Sometimes, it may be necessary to switch the data source. For these purposes, you should use the `setup_custom_queryset_kwargs` function, which takes various named arguments, which are also passed to the data initialization function. One such argument for :class:`ListModel` is date_source, which takes any iterable object. Examples: .. sourcecode:: python from vstutils.custom_model import ListModel, CharField class Authors(ListModel): name = CharField(max_length=512) qs = Authors.objects.setup_custom_queryset_kwargs(data_source=[ {"name": "Sergey Klyuykov"}, {"name": "Michael Taran"}, ]) In this case, we setup source list via `setup_custom_queryset_kwargs` function, and any other chained call is going to work with this data. :ivar list data: List with data dicts. Empty by default. """ data = [] objects = CustomQuerySet.as_manager() class Meta: abstract = True def get_pk_value(self): return getattr( self, f'_{self.__class__._meta.pk.attname}', None ) def set_pk_value(self, value): setattr( self, f'_{self.__class__._meta.pk.attname}', value ) @classmethod def _get_data(cls, chunked_fetch=False, data_source=None): # pylint: disable=unused-argument return deepcopy(cls.data if data_source is None else data_source)
[docs]class FileModel(ListModel): """ Custom model that loads data from a YAML file instead of a database. The path to the file is specified in the `FileModel.file_path` attribute. Examples: Suppose the source file is stored at `/etc/authors.yaml` with the following content: .. sourcecode:: YAML - name: "Sergey Klyuykov" - name: "Michael Taran" You can create a custom model using this file: .. sourcecode:: python from vstutils.custom_model import FileModel, CharField class Authors(FileModel): name = CharField(max_length=512) file_path = '/etc/authors.yaml' """ class Meta: abstract = True @classmethod def load_file_data(cls): # pylint: disable=no-member return get_file_value(cls.file_path, strip=False) @classmethod def _get_data(cls, chunked_fetch=False): return load(cls.load_file_data(), Loader=Loader)
[docs]class ExternalCustomModel(ListModel): """ Represents a custom model designed for the self-implementation of requests to external services. This model facilitates the seamless interaction with external services by allowing the passing of filtering, limiting, and sorting parameters to an external request. It is designed to receive data that is already filtered and limited. To utilize this model effectively, developers need to implement the ``get_data_generator()`` class method. This method receives a query object containing the necessary parameters, enabling developers to customize interactions with external services. **Example:** .. code-block:: python class MyExternalModel(ExternalCustomModel): # ... model fields ... class Meta: managed = False @classmethod def get_data_generator(cls, query): data = ... # some fetched data from the external resource or generated from memory calculations. for row in data: yield row """ class Meta: abstract = True
[docs] @classmethod def get_data_generator(cls, query): """ This class method must be implemented by derived classes to define custom logic for fetching data from an external service based on the provided query parameters. Query object might contain the following parameters: * filter (dict): A dictionary specifying the filtering criteria. * exclude (dict): A dictionary specifying the exclusion criteria. * order_by (list): A list specifying the sorting order. * low_mark (int): The low index for slicing (if sliced). * high_mark (int): The high index for slicing (if sliced). * is_sliced (bool): A boolean indicating whether the query is sliced. :param query: An object containing filtering, limiting, and sorting parameters. :type query: dict :return: A generator that yields the requested data. :rtype: Generator :raises NotImplementedError: If the method is not implemented by the derived class. """ raise NotImplementedError
[docs]class ViewCustomModel(ExternalCustomModel): """ Implements the SQL View programming mechanism over other models. This model provides a mechanism for implementing SQL View-like behavior over other models. In the ``get_view_queryset()`` method, a base query is prepared, and all further actions are implemented on top of it. **Example Usage:** .. code-block:: python class MyViewModel(ViewCustomModel): # ... model fields ... class Meta: managed = False @classmethod def get_view_queryset(cls): return SomeModel.objects.annotate(...) # add some additional annotations to query """ class Meta: abstract = True
[docs] @classmethod def get_view_queryset(cls): """ This class method must be implemented by derived classes to define custom logic for generating the base queryset for the SQL View. :return: The base queryset for the SQL View. :rtype: django.db.models.query.QuerySet :raises NotImplementedError: If the method is not implemented by the derived class. """ raise NotImplementedError
@classmethod def get_data_generator(cls, query): qs = cls.get_view_queryset()\ .filter(**query.get('filter', {}))\ .exclude(**query.get('exclude', {}))\ .order_by(*query.order_by) if query.is_sliced: qs = qs[query.get('low_mark'):query.get('high_mark')] return qs.values(*{f.name for f in cls._meta.get_fields()}) @classmethod def get_data_generator_count(cls, query): return cls.get_data_generator(query).count()