# cython: binding=True
# pylint: disable=unused-import
from copy import deepcopy
from functools import partial
from yaml import load
try:
from yaml import CSafeLoader as Loader
except ImportError: # nocv
from yaml import SafeLoader as Loader
from django.db.models.query import ModelIterable
from django.db.models.fields import CharField, TextField, IntegerField, BooleanField, AutoField # noqa: F401
from . import BQuerySet, BaseModel
from .base import ModelBaseClass
from ..utils import raise_context
from ..tools import get_file_value, multikeysort # pylint: disable=import-error
class Query(dict):
distinct_fields = False
def __init__(self, queryset, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queryset = queryset
self.combinator = None
self.is_sliced = False
self.select_for_update = False
self.select_related = False
self['standard_ordering'] = True
@property
def model(self):
return self.queryset.model
@property
def standard_ordering(self):
return self['standard_ordering']
@standard_ordering.setter
def standard_ordering(self, value):
self['standard_ordering'] = bool(value)
def chain(self):
return self.clone()
def clone(self):
query = deepcopy(self)
if 'custom_queryset_kwargs' in self:
query['custom_queryset_kwargs'] = self['custom_queryset_kwargs']
return query
def _check_data(self, check_type, data):
# pylint: disable=protected-access,too-many-return-statements
if getattr(self, 'empty', False):
return False
check_data = self.get(check_type, {})
if check_type == 'exclude' and not check_data:
return False
meta = self.model._meta
for filter_name, filter_data in check_data.items():
filter_name = filter_name.replace('__exact', '')
filter_name__cleared, search_format = (filter_name.split('__', maxsplit=1) + [None])[:2]
if filter_name__cleared == 'pk':
filter_name__cleared = meta.pk.attname
try:
value = data[filter_name__cleared]
except KeyError:
continue
field = meta._forward_fields_map[filter_name__cleared]
if isinstance(filter_data, (list, tuple, set)):
filter_data = map(field.to_python, filter_data)
else:
filter_data = field.to_python(filter_data)
if search_format == 'in':
if value not in filter_data:
return False
elif search_format == 'contains' and isinstance(filter_data, str):
if filter_data not in value:
return False
elif search_format == 'icontains' and isinstance(filter_data, str):
if filter_data.upper() not in value.upper():
return False
elif search_format is None:
if filter_data != value:
return False
return True
def check_in_query(self, data):
return self._check_data('filter', data) and not self._check_data('exclude', data)
def set_empty(self):
self.empty = True
def set_limits(self, low: int = None, high: int = None):
self['low_mark'], self['high_mark'] = low, high
self.is_sliced = True
def has_results(self, *args, **kwargs):
# pylint: disable=unused-argument
return bool(self.queryset.all()[:2])
def get_count(self, using):
# pylint: disable=unused-argument
model = self.model
if hasattr(model, 'get_data_generator_count'):
return model.get_data_generator_count(self)
return len(self.queryset.all())
def can_filter(self):
return self.get('low_mark', None) is None and self.get('high_mark', None) is None
def clear_ordering(self, *args, **kwargs):
# pylint: disable=unused-argument
self['ordering'] = []
def add_ordering(self, *ordering):
self['ordering'] = ordering
@property
def order_by(self):
return self.get('ordering', ())
class CustomModelIterable(ModelIterable):
def values_handler(self, unit, fields, pk_name):
# pylint: disable=no-member
return {f: unit.get(f) if f != 'pk' else unit.get(pk_name) for f in fields}
def construct_instance(self, data, model, only_fields, defer_fields):
if only_fields is not None:
data = {k: v for k, v in data.items() if k in only_fields}
elif defer_fields is not None:
data = {k: v for k, v in data.items() if k not in defer_fields}
for field in model._meta.get_fields():
if field.attname in data:
with raise_context():
data[field.attname] = field.to_python(data[field.attname])
return model(**data)
def __iter__(self):
# pylint: disable=protected-access
queryset = self.queryset
model = queryset.model
query = queryset.query
if hasattr(model, 'get_data_generator'):
model_data = model.get_data_generator(query=query)
else:
model_data = model._get_data(
chunked_fetch=self.chunked_fetch,
**query.get('custom_queryset_kwargs', {})
)
if isinstance(model._meta.pk, AutoField):
for idx, item in enumerate(model_data, 1):
item[model._meta.pk.attname] = idx
model_data = list(filter(query.check_in_query, model_data))
ordering = query.order_by
if ordering:
ordering = list(ordering)
for idx, value in enumerate(ordering):
if value in ('pk', '-pk'):
ordering[idx] = value.replace('pk', model._meta.pk.name)
model_data = multikeysort(
model_data,
ordering,
not query.standard_ordering
)
elif not query.standard_ordering:
model_data.reverse()
low = query.get('low_mark', 0)
high = query.get('high_mark', len(model_data))
model_data = model_data[low:high]
fields = getattr(self, 'fields', None)
if fields is None:
handler = partial(
self.construct_instance,
model=model,
only_fields=getattr(self, 'only_fields', None),
defer_fields=getattr(self, 'defer_fields', None),
)
else:
handler = partial(
self.values_handler,
fields=tuple(fields) or {f.name for f in model._meta.get_fields()},
pk_name=model._meta.pk.name,
)
for data in model_data:
yield handler(data)
class CustomQuerySet(BQuerySet):
custom_iterable_class = CustomModelIterable
custom_query_class = Query
def _filter_or_exclude(self, negate, *args, **kwargs):
clone = self._chain()
if negate:
filter_type = 'exclude'
else:
filter_type = 'filter'
clone.query[filter_type] = clone.query.get(filter_type, {})
clone.query[filter_type].update(kwargs)
for q_arg in filter(lambda x: isinstance(x, dict), filter(bool, args)):
clone.query[filter_type].update(q_arg) # nocv
return clone
_filter_or_exclude_inplace = _filter_or_exclude
def last(self):
return self.reverse().first()
def first(self):
return next(iter(self), None)
def values(self, *fields, **expressions):
assert not expressions, 'Expressions is not supported on custom non-database models.'
clone = self._clone()
clone.__iterable_class__ = type('CustomModelIterableValues', (CustomModelIterable,), {'fields': fields})
return clone
def setup_custom_queryset_kwargs(self, **kwargs):
qs = self._chain()
qs.query['custom_queryset_kwargs'] = kwargs
return qs
def only(self, *fields):
clone = self._clone()
clone.__iterable_class__ = type('CustomModelIterable', (CustomModelIterable,), {'only_fields': fields})
return clone
def defer(self, *fields):
clone = self._clone()
clone.__iterable_class__ = type('CustomModelIterable', (CustomModelIterable,), {'defer_fields': fields})
return clone
class CustomModelBase(ModelBaseClass):
def __new__(mcs, name, bases, attrs, **kwargs):
new_class = super(CustomModelBase, mcs).__new__(mcs, name, bases, attrs, **kwargs)
if not new_class._meta.abstract:
pk_name = new_class._meta.pk.attname
new_class.add_to_class(
pk_name,
property(new_class.get_pk_value, new_class.set_pk_value)
)
return new_class
[docs]class ListModel(BaseModel, metaclass=CustomModelBase):
"""
Custom model which uses a list of dicts with data (attribute `ListModel.data`) instead of database records.
Useful when you have a simple list of data.
Examples:
.. sourcecode:: python
from vstutils.custom_model import ListModel, CharField
class Authors(ListModel):
name = CharField(max_length=512)
data = [
{"name": "Sergey Klyuykov"},
{"name": "Michael Taran"},
]
Sometimes, it may be necessary to switch the data source. For these purposes,
you should use the `setup_custom_queryset_kwargs` function, which takes various named arguments,
which are also passed to the data initialization function.
One such argument for :class:`ListModel` is date_source, which takes any iterable object.
Examples:
.. sourcecode:: python
from vstutils.custom_model import ListModel, CharField
class Authors(ListModel):
name = CharField(max_length=512)
qs = Authors.objects.setup_custom_queryset_kwargs(data_source=[
{"name": "Sergey Klyuykov"},
{"name": "Michael Taran"},
])
In this case, we setup source list via `setup_custom_queryset_kwargs` function, and any other chained call
is going to work with this data.
"""
#: List with data dicts. Empty by default.
data = []
objects = CustomQuerySet.as_manager()
class Meta:
abstract = True
def get_pk_value(self):
return getattr(
self,
f'_{self.__class__._meta.pk.attname}',
None
)
def set_pk_value(self, value):
setattr(
self,
f'_{self.__class__._meta.pk.attname}',
value
)
@classmethod
def _get_data(cls, chunked_fetch=False, data_source=None):
# pylint: disable=unused-argument
return deepcopy(cls.data if data_source is None else data_source)
[docs]class FileModel(ListModel):
"""
Custom model which loads data from YAML-file instead of database.
Path to the file stored in `FileModel.file_path` attribute.
Examples:
Source file stored in `/etc/authors.yaml` with content:
.. sourcecode:: YAML
- name: "Sergey Klyuykov"
- name: "Michael Taran"
Example:
.. sourcecode:: python
from vstutils.custom_model import FileModel, CharField
class Authors(FileModel):
name = CharField(max_length=512)
file_path = '/etc/authors.yaml'
"""
class Meta:
abstract = True
@classmethod
def load_file_data(cls):
# pylint: disable=no-member
return get_file_value(cls.file_path, strip=False)
@classmethod
def _get_data(cls, chunked_fetch=False):
return load(cls.load_file_data(), Loader=Loader)
[docs]class ExternalCustomModel(ListModel):
"""
This custom model is intended for self-implementation of requests to external services.
The model allows you to pass filtering, limiting and sorting parameters to an external request,
receiving already limited data.
To start using this model, it is enough to implement the ``get_data_generator()`` class method,
which receives the query object with the necessary parameters as an argument.
"""
class Meta:
abstract = True
@classmethod
def get_data_generator(cls, query):
raise NotImplementedError
[docs]class ViewCustomModel(ExternalCustomModel):
"""
This model implements the SQL View programming mechanism over other models.
In the ``get_view_queryset()`` method, a query is prepared, and all further actions are implemented on top of it.
"""
class Meta:
abstract = True
@classmethod
def get_view_queryset(cls):
raise NotImplementedError
@classmethod
def get_data_generator(cls, query):
qs = cls.get_view_queryset()\
.filter(**query.get('filter', {}))\
.exclude(**query.get('exclude', {}))\
.order_by(*query.order_by)
if query.is_sliced:
qs = qs[query.get('low_mark'):query.get('high_mark')]
return qs.values(*{f.name for f in cls._meta.get_fields()})
@classmethod
def get_data_generator_count(cls, query):
return cls.get_data_generator(query).count()