# pylint: disable=django-not-available,invalid-name,import-outside-toplevel,too-many-lines
import base64
import codecs
import io
import logging
import os
import pickle
import random
import subprocess
import asyncio
import sys
import tempfile
import time
import json
import traceback
import types
import typing as tp
import uuid
import warnings
from functools import lru_cache, wraps
from pathlib import Path
from enum import Enum, EnumMeta
from importlib import import_module
from asgiref.sync import sync_to_async, async_to_sync
from django.conf import settings
from django.middleware.gzip import GZipMiddleware
from django.urls import re_path, path, include
from django.core.mail import get_connection, EmailMultiAlternatives
from django.core.cache import caches, InvalidCacheBackendError
from django.core.paginator import Paginator as BasePaginator
from django.core.exceptions import ImproperlyConfigured
from django.template import loader
from django.utils import translation, functional
from django.utils.cache import cc_delim_re
from django.utils.translation import get_language
from django.utils.module_loading import import_string
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View
from . import exceptions as ex
if tp.TYPE_CHECKING: # nocv
from django.contrib.sessions.backends.base import SessionBase
logger: logging.Logger = logging.getLogger('vstutils')
ON_POSIX = 'posix' in sys.builtin_module_names
_gzip_object = GZipMiddleware(lambda *args, **kwargs: None) # type: ignore
[docs]
def deprecated(func):
"""
This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used.
:param func: any callable that will be wrapped and will issue a deprecation warning when called.
"""
def new_func(*args, **kwargs):
warnings.warn(f'Call to deprecated function {func.__name__}.',
category=DeprecationWarning,
stacklevel=2)
return func(*args, **kwargs)
return new_func
[docs]
def raise_misconfiguration(ok, message=None):
"""
Helper function that raises an `ImproperlyConfigured` exception if a condition is not met.
This function acts as a replacement for the `assert` statement, providing clearer error handling
in cases where the application configuration is incorrect.
:param ok:
A value of any type that can be evaluated as a boolean. If the boolean evaluation returns False,
the exception will be raised.
:type ok: Any
:param message:
An optional message to include in the exception.
If not provided, the exception will be raised without a message.
:type message: str, optional
:raises ImproperlyConfigured:
Raised if the boolean evaluation of the `ok` parameter is False,
indicating a misconfiguration in the application.
:return:
This function does not return any value. It either passes silently or raises an exception.
:rtype: None
"""
if not ok:
raise ImproperlyConfigured(message)
[docs]
def list_to_choices(items_list, response_type=list):
"""
Method to create django model choices from flat list of values.
:param items_list: list of flat values.
:param response_type: casting type of returned mapping
:return: list of tuples from `items_list` values
"""
return response_type(((x, x) for x in items_list))
def is_member_descriptor(obj):
try:
return type(obj).__name__ == 'member_descriptor'
except: # nocv
return False
@lru_cache()
def current_lang(lang=None):
from vstutils.api.models import Language
try:
return Language.objects.get(code=lang or get_language())
except Exception: # nocv
return Language.objects.all().first()
[docs]
def get_render(name, data, trans='en'):
"""
Render string from template.
:param name: -- full template name
:type name: str
:param data: -- dict of rendered vars
:type data: dict
:param trans: -- translation for render. Default 'en'.
:type trans: str
:return: -- rendered string
:rtype: str
"""
cur_language: str = translation.get_language()
try:
if trans != cur_language:
translation.activate(trans)
config = loader.get_template(name)
result = config.render(data, data.pop('request', None)).replace('\r', '')
finally:
if trans != cur_language:
translation.activate(cur_language)
return result
[docs]
def encode(key, clear):
"""
Encode string by Vigenère cipher.
:param key: -- secret key for encoding
:type key: str
:param clear: -- clear value for encoding
:type clear: str
:return: -- encoded string
:rtype: str
"""
# pylint: disable=consider-using-enumerate
enc = []
for i in range(len(clear)):
key_c = key[i % len(key)]
enc.append(chr((ord(clear[i]) + ord(key_c)) % 256))
return base64.urlsafe_b64encode("".join(enc).encode()).decode()
[docs]
def decode(key, enc):
"""
Decode string from encoded by Vigenère cipher.
:param key: -- secret key for encoding
:type key: str
:param enc: -- encoded string for decoding
:type enc: str
:return: -- decoded string
:rtype: str
"""
# pylint: disable=consider-using-enumerate
dec = []
enc = base64.urlsafe_b64decode(enc).decode()
for i in range(len(enc)):
key_c = key[i % len(key)]
dec.append(chr((256 + ord(enc[i]) - ord(key_c)) % 256))
return "".join(dec)
def get_if_lazy(obj):
with raise_context():
if isinstance(obj, functional.LazyObject):
# pylint: disable=protected-access
obj._setup() if obj._wrapped == functional.empty else None
return obj._wrapped
return obj
[docs]
def send_mail(subject, message, from_email, recipient_list, # noqa: CFQ002
fail_silently=False, auth_user=None, auth_password=None,
connection=None, html_message=None, **kwargs):
"""
Wrapper over :func:`django.core.mail.send_mail` which provide additional named arguments.
"""
# pylint: disable=too-many-arguments
connection = connection or get_connection(
username=auth_user,
password=auth_password,
fail_silently=fail_silently,
)
mail = EmailMultiAlternatives(
subject,
message,
from_email,
recipient_list,
connection=connection,
**kwargs,
)
if html_message:
mail.attach_alternative(html_message, 'text/html')
return mail.send()
[docs]
def send_template_email_handler(
subject,
email_from,
email,
template_name,
context_data=None,
**kwargs,
):
"""
Function for email sending.
The function convert recipient to list and set context before sending if it possible.
:param subject: mail subject.
:param email_from: sender that be setup in email.
:param email: list of strings or single string, with email addresses of recipients
:param template_name: relative path to template in `templates` directory, must include extension in file name.
:param context_data: dictionary with context for rendering message template.
:param kwargs: additional named arguments for `send_mail`
:return: Number of emails sent.
"""
recipient_list = email if isinstance(email, (list, tuple)) else [email]
if context_data is None:
context = {}
elif not isinstance(context_data, dict):
context = dict(context_data)
else:
context = context_data.copy()
return send_mail(
subject=subject,
message="",
from_email=email_from,
recipient_list=recipient_list,
html_message=loader.render_to_string(
template_name,
context=context,
request=context.pop('request', None)
),
**kwargs
)
[docs]
def send_template_email(sync=False, **kwargs):
"""
Function executing sync or async email sending; according `sync` argument and settings variable "RPC_ENABLED".
If you don't set settings for celery or don't have celery it sends synchronously mail.
If celery is installed and configured and `sync` argument of the function is set to `False`,
it sends asynchronously email.
:param sync: argument for determining how send email, asynchronously or synchronously
:param subject: mail subject.
:param email: list of strings or single string, with email addresses of recipients
:param template_name: relative path to template in `templates` directory, must include extension in file name.
:param context_data: dictionary with context for rendering message template.
"""
if sync or not settings.RPC_ENABLED:
send_template_email_handler(email_from=settings.EMAIL_FROM_ADDRESS, **kwargs)
else:
from .tasks import SendEmailMessage
SendEmailMessage.do(email_from=settings.EMAIL_FROM_ADDRESS, **kwargs)
def patch_gzip_response(response, request):
if not response.status_code == 200:
return # nocv
GZipMiddleware.process_response(_gzip_object, request, response)
return response
def patch_gzip_response_decorator(func):
def gzip_response_wrapper(view, request, *args, **kwargs):
response = func(view, request, *args, **kwargs)
with raise_context():
patch_gzip_response(response, request)
return response
return gzip_response_wrapper
[docs]
def translate(text):
"""
The ``translate`` function supports translation message dynamically
with standard i18n vstutils'es mechanisms usage.
Uses :func:`django.utils.translation.get_language` to get the language code and
tries to get the translation from the list of available ones.
:param text: Text message which should be translated.
"""
try:
return current_lang(lang=get_language()).translate(text)
except Exception: # nocv
return text
[docs]
def lazy_translate(text):
"""
The ``lazy_translate`` function has the same behavior as :func:`.translate`, but wraps it in a lazy promise.
This is very useful, for example, for translating error messages in
class attributes before the language code is known.
:param text: Text message which should be translated.
"""
return functional.lazy(translate, str)(text)
[docs]
def create_view(model, **meta_options):
"""
A simple function for getting the generated view by standard means, but with overloaded meta-parameters.
This method can completely get rid of the creation of proxy models.
Example:
.. sourcecode:: python
from vstutils.utils import create_view
from .models import Host
# Host model has full :class:`vstutils.api.base.ModelViewSet` view.
# For overriding and create simple list view just setup this:
HostListViewSet = create_view(
HostList,
view_class='list_only'
)
.. note::
This method is also recommended in cases where there is a problem of recursive imports.
.. warning::
This function is oldstyle and will be deprecated in future versions.
Use native call of method :meth:`vstutils.models.BModel.get_view_class`.
:type model: Type[vstutils.models.BaseModel]
:param model: Model class with `.get_view_class` method. This method also has :class:`vstutils.models.BModel`.
:rtype: vstutils.api.base.GenericViewSet
"""
return model.get_view_class(**meta_options)
[docs]
class apply_decorators:
"""
Decorator which apply list of decorators on method or class.
Example:
.. sourcecode:: python
from vstutils.utils import apply_decorators
def decorator_one(func):
print(f"Decorated {func.__name__} by first decorator.")
return func
def decorator_two(func):
print(f"Decorated {func.__name__} by second decorator.")
return func
@apply_decorators(decorator_one, decorator_two)
def decorated_function():
# Function decorated by both decorators.
print("Function call.")
"""
__slots__ = ('decorators',)
def __init__(self, *decorators):
self.decorators = decorators
def __call__(self, func):
for decorator in self.decorators:
func = decorator(func)
return func
class ClassPropertyMeta(type):
def __setattr__(cls, key, value):
obj = cls.__dict__.get(key, None)
if isinstance(obj, ClassPropertyDescriptor):
return obj.__set__(cls, value)
return super().__setattr__(key, value)
class ClassPropertyDescriptor:
__slots__ = ('fget', 'fset')
meta = ClassPropertyMeta
def __init__(self, fget, fset=None):
self.fget, self.fset = self._fix_function(fget), self._fix_function(fset)
def __get__(self, obj, klass=None):
if obj is not None:
return self.fget.__get__(obj, obj)()
return self.fget.__get__(obj, type(obj) if klass is None else klass)()
def __set__(self, obj, value):
if not self.fset:
raise AttributeError("can't set attribute")
if obj is not None:
return self.fset.__get__(obj, obj)(value)
return self.fset.__get__(obj, type(obj))(value) # nocv
def setter(self, func):
self.fset = self._fix_function(func)
return self
@classmethod
def _fix_function(cls, func):
if func is None:
return func
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
return func
[docs]
class classproperty(ClassPropertyDescriptor):
"""
Decorator which makes class method as class property.
Example:
.. sourcecode:: python
from vstutils.utils import classproperty
class SomeClass(metaclass=classproperty.meta):
# Metaclass is needed for set attrs in class
# instead of and not only object.
some_value = None
@classproperty
def value(cls):
return cls.some_value
@value.setter
def value(cls, new_value):
cls.some_value = new_value
:param fget: function for getting an attribute value.
:param fset: function for setting an attribute value.
"""
[docs]
class redirect_stdany:
"""
Context for redirect any output to own stream.
.. note::
- On context returns stream object.
- On exit returns old streams.
"""
__slots__ = ('stream', 'streams', '_old_streams')
_streams = ["stdout", "stderr"]
def __init__(self, new_stream=io.StringIO(), streams=None):
"""
:param new_stream: -- stream where redirects all
:type new_stream: object
:param streams: -- names of streams like ``['stdout', 'stderr']``
:type streams: list
"""
self.streams = streams or self._streams
self.stream = new_stream
self._old_streams = {}
def __enter__(self):
for stream in self.streams:
self._old_streams[stream] = getattr(sys, stream)
setattr(sys, stream, self.stream)
return self.stream
def __exit__(self, exctype, excinst, exctb):
for stream in self.streams:
setattr(sys, stream, self._old_streams.pop(stream))
[docs]
class Dict(dict):
"""
Wrapper over `dict` which
return JSON on conversion to string.
"""
def __repr__(self): # nocv
return self.__str__()
def __str__(self):
return json.dumps(self.copy())
[docs]
class tmp_file:
"""
Temporary file with name
generated and auto removed on close.
**Attributes**:
:param data: -- string to write in tmp file.
:type data: str
:param mode: -- file open mode. Default 'w'.
:type mode: str
:param bufsize: -- buffer size for tempfile.NamedTemporaryFile
:type bufsize: int
:param kwargs: -- other kwargs for tempfile.NamedTemporaryFile
"""
__slots__ = ('fd', 'path')
def __init__(self, data="", mode="w", bufsize=-1, **kwargs):
# pylint: disable=consider-using-with
self.fd = tempfile.NamedTemporaryFile(mode, buffering=bufsize, **kwargs)
self.path = Path(self.fd.name)
if data:
self.write(data)
[docs]
def write(self, wr_string):
"""
Write to file and flush
:param wr_string: -- writable string
:type wr_string: str
:return: None
:rtype: None
"""
result = self.fd.write(wr_string)
self.fd.flush()
return result
def __getattr__(self, name):
return getattr(self.fd, name)
def __del__(self):
with raise_context():
self.fd.close()
def __enter__(self):
"""
:return: -- file object
:rtype: tempfile.NamedTemporaryFile
"""
return self
def __exit__(self, type_e, value, tb):
self.fd.close()
[docs]
class tmp_file_context:
"""
Context object for work with tmp_file.
Auto close on exit from context and
remove if file still exist.
This context manager over :class:`.tmp_file`
"""
__slots__ = ('tmp',)
def __init__(self, *args, **kwargs):
self.tmp: tempfile.NamedTemporaryFile = tmp_file(*args, **kwargs)
def __enter__(self):
return self.tmp
def __exit__(self, type_e, value, tb):
self.tmp.close()
if self.tmp.path.exists():
self.tmp.path.unlink()
class assertRaises:
__slots__ = ('_kwargs', '_verbose', '_exclude', '_excepts', '_log', '__failed')
def __init__(self, *args, **kwargs):
"""
:param args: -- list of exception classes should be passed
:type args: list,Exception
:param exclude: -- list of exception classes should be raised
:type exclude: list,Exception
:param verbose: -- logging
:type verbose: bool
"""
self._kwargs = {**kwargs}
self._verbose = kwargs.pop("verbose", settings.DEBUG)
self._exclude = kwargs.pop("exclude", False)
self._excepts = tuple(args)
self.__failed = False
def mark_as_failed(self):
self.__failed = True
def cleanup_fails(self):
self.__failed = False
@property
def is_failed(self):
return self.__failed
def __enter__(self):
return self # pragma: no cover
def __exit__(self, exc_type, exc_val, exc_tb):
return exc_type is not None and (
(not self._exclude and not issubclass(exc_type, self._excepts)) or
(self._exclude and issubclass(exc_type, self._excepts))
)
# noinspection PyUnreachableCode
[docs]
class raise_context(assertRaises):
"""
Context for exclude exceptions.
"""
__slots__ = ()
def execute(self, func, *args, **kwargs):
self.cleanup_fails()
with self.__class__(self._excepts, **self._kwargs):
try:
return func(*args, **kwargs)
except:
self.mark_as_failed()
type, value, traceback_obj = sys.exc_info()
if self._verbose:
logger.debug(traceback.format_exc())
raise
return type, value, traceback_obj
def __enter__(self):
return self.execute
def __call__(self, original_function):
def wrapper(*args, **kwargs):
return self.execute(original_function, *args, **kwargs)
return wrapper
[docs]
class raise_context_decorator_with_default(raise_context):
"""
Context for exclude errors and return default value.
Example:
.. sourcecode:: python
from yaml import load
from vstutils.utils import raise_context_decorator_with_default
@raise_context_decorator_with_default(default={})
def get_host_data(yaml_path, host):
with open(yaml_path, 'r') as fd:
data = load(fd.read(), Loader=Loader)
return data[host]
# This decorator used when you must return some value even on error
# In log you also can see traceback for error if it occur
def clone_host_data(host):
bs_data = get_host_data('inventories/aws/hosts.yml', 'build_server')
...
"""
__slots__ = ('default_value',)
def __init__(self, *args, **kwargs):
self.default_value = kwargs.pop('default', None)
super().__init__(*args, **kwargs)
def execute(self, func, *args, **kwargs):
result = super().execute(func, *args, **kwargs)
if self.is_failed:
return self.default_value
return result
class exception_with_traceback(raise_context):
__slots__ = ()
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
exc_val.traceback = traceback.format_exc()
raise exc_val.with_traceback(exc_tb)
[docs]
class BaseVstObject:
"""
Default mixin-class for custom objects which needed to get settings and cache.
"""
__slots__ = ()
[docs]
@classmethod
def get_django_settings(cls, name, default=None):
# pylint: disable=access-member-before-definition
"""
Get params from Django settings.
:param name: name of param
:type name: str
:param default: default value of param
:type default: object
:return: Param from Django settings or default.
"""
return getattr(settings, name, default)
@classmethod
def get_django_cache(cls, cache_name='default'):
try:
return caches[cache_name]
except InvalidCacheBackendError: # nocv
return caches['default']
[docs]
class SecurePickling(BaseVstObject):
"""
Secured pickle wrapper by Vigenère cipher.
.. warning::
Do not use it with untrusted transport anyway.
Example:
.. sourcecode:: python
from vstutils.utils import SecurePickling
serializer = SecurePickling('password')
# Init secret object
a = {"key": "value"}
# Serialize object with secret key
pickled = serializer.dumps(a)
# Deserialize object
unpickled = serializer.loads(pickled)
# Check, that object is correct
assert a == unpickled
"""
__slots__ = ('secure_key',)
def __init__(self, secure_key=None):
"""
:param secure_key: Secret key for encoding.
"""
if secure_key is None:
secure_key = self.get_django_settings('SECRET_KEY')
self.secure_key = str(secure_key)
def _encode(self, value):
return encode(self.secure_key, value)
def _decode(self, value):
return decode(self.secure_key, value)
def loads(self, value):
return pickle.loads(codecs.decode(self._decode(value).encode(), "base64")) # nosec
def dumps(self, value):
return self._encode(codecs.encode(pickle.dumps(value), "base64").decode())
[docs]
class Executor(BaseVstObject):
"""
Command executor with realtime output write and line handling.
By default and by design executor initialize string attribute ``output``
which will be modified by ``+=`` operator with new lines by :meth:`.Executor.write_output`
procedure. Override the method if you want change behavior.
Executor class supports periodically (0.01 sec) handling process and execute some checks
by overriding :meth:`.Executor.working_handler` procedure method. If you want disable this behavior
override the method by None value or use :class:`.UnhandledExecutor`.
"""
__slots__ = ('output', '__stdout__', '__stderr__', 'env')
CANCEL_PREFIX = "CANCEL_EXECUTE_"
STDOUT = subprocess.PIPE
STDERR = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
CalledProcessError = subprocess.CalledProcessError
env: tp.Dict[str, str]
def __init__(
self,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
**environ_variables: str,
):
self.output = ''
self.__stdout__ = stdout
self.__stderr__ = stderr
self.env = environ_variables
[docs]
def write_output(self, line) -> None:
"""
:param line: -- line from command output
:type line: str
:return: None
:rtype: None
"""
self.output += str(line)
[docs]
async def working_handler(self, proc: asyncio.subprocess.Process):
# pylint: disable=unused-argument
"""
Additional handler for executions.
:arg proc: running process
:type proc: asyncio.subprocess.Process
"""
async def line_handler(self, line) -> None:
write_output = self.write_output
if not asyncio.iscoroutinefunction(write_output):
write_output = sync_to_async(write_output, thread_sensitive=True)
if line is not None: # pragma: no branch
with raise_context():
await write_output(line)
async def _handle_process(self, proc: asyncio.subprocess.Process, stream='stdout'):
stream_object: asyncio.StreamReader = getattr(proc, stream)
working_handler = self.working_handler
if not asyncio.iscoroutinefunction(working_handler):
working_handler = sync_to_async(working_handler, thread_sensitive=True) # nocv
# Run handler periodically until stream object is closed
while not stream_object.at_eof():
await working_handler(proc)
await asyncio.sleep(0.01)
async def _unbuffered_read(self, proc: asyncio.subprocess.Process, stream='stdout'):
async for line in getattr(proc, stream):
await self.line_handler(line.decode('utf-8'))
async def _run_subprocess(self, cmd, cwd, env_vars=None):
# Run pre execution hook.
await self.pre_execute(
cmd=cmd,
cwd=cwd,
env=env_vars,
)
# Cleanup output variable
self.output = ""
# Setup environment variables
env = os.environ.copy()
env.update(self.env)
if env_vars:
env.update(env_vars)
# Start execution process
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=self.__stdout__,
stderr=self.__stderr__,
env=env,
cwd=str(cwd),
bufsize=0,
close_fds=ON_POSIX,
)
# Setup process output handlers
# and wait until it ends
tasks = [self._unbuffered_read(proc)]
if self.working_handler is not None:
tasks.append(self._handle_process(proc))
await asyncio.gather(*tasks)
# Parse return code and handle post execution hook
return_code = proc.returncode
try:
if return_code:
logger.error(self.output)
raise self.CalledProcessError(
return_code, cmd, output=str(self.output)
)
finally:
await self.post_execute(
cmd=cmd,
cwd=cwd,
env=env_vars,
return_code=return_code,
)
[docs]
async def pre_execute(self, cmd, cwd, env):
"""
Runs before execution starts.
:param cmd: -- list of cmd command and arguments
:param cwd: -- workdir for executions
:param env: -- extra environment variables which overrides defaults
"""
[docs]
async def post_execute(self, cmd, cwd, env, return_code):
"""
Runs after execution end.
:param cmd: -- list of cmd command and arguments
:param cwd: -- workdir for executions
:param env: -- extra environment variables which overrides defaults
:param return_code: -- return code of executed process
"""
[docs]
async def aexecute(self, cmd, cwd, env=None):
"""
Executes commands and outputs its result. Asynchronous implementation.
:param cmd: -- list of cmd command and arguments
:param cwd: -- workdir for executions
:param env: -- extra environment variables which overrides defaults
:return: -- string with full output
"""
await self._run_subprocess(cmd, cwd, env)
return self.output
[docs]
def execute(self, cmd, cwd, env=None):
"""
Executes commands and outputs its result.
:param cmd: -- list of cmd command and arguments
:param cwd: -- workdir for executions
:param env: -- extra environment variables which overrides defaults
:return: -- string with full output
"""
return async_to_sync(self.aexecute)(cmd, cwd, env)
[docs]
class UnhandledExecutor(Executor):
"""
Class based on :class:`.Executor` but disables `working_handler`.
"""
__slots__ = ()
working_handler = None
[docs]
class KVExchanger(BaseVstObject):
"""
Class for transmit data using key-value fast (cache-like) storage between
services. Uses same cache-backend as Lock.
"""
__slots__ = ('key', 'timeout', '__djangocache__')
TIMEOUT = 60
__djangocache__: tp.Any
@classproperty
def PREFIX(cls):
# pylint: disable=no-self-argument
return f"{cls.get_django_settings('VST_PROJECT_LIB')}_exchange_"
@classproperty
def cache(cls):
# pylint: disable=no-self-argument,no-member,access-member-before-definition
if hasattr(cls, '__djangocache__') and not is_member_descriptor(cls.__djangocache__):
return cls.__djangocache__
cls.__djangocache__ = cls.get_django_cache('locks')
return cls.cache
def __init__(self, key, timeout=None):
self.key = self.PREFIX + str(key)
self.timeout = timeout or self.TIMEOUT
def send(self, value, ttl=None):
# pylint: disable=no-member
return self.cache.add(self.key, value, ttl or self.timeout)
def prolong(self, ttl=None):
# pylint: disable=no-member
self.cache.touch(self.key, ttl or self.timeout)
def get(self):
# pylint: disable=no-member
value = self.cache.get(self.key)
self.cache.delete(self.key)
return value
def delete(self):
# pylint: disable=no-member
self.cache.delete(self.key)
[docs]
class Lock(KVExchanger):
"""
Lock class for multi-jobs workflow. Based on :class:`.KVExchanger`.
The Lock allows only one thread to enter the part that's locked and shared
between apps using one locks cache (see also `[locks] <config.html#locks-settings>`_).
:param id: -- unique id for lock.
:type id: int,str
:param payload: -- lock additional info. Should be any boolean True value.
:param repeat: -- time to wait lock.release. Default 1 sec.
:type repeat: int
:param err_msg: -- message for AcquireLockException error.
:type err_msg: str
.. note::
- Used django.core.cache lib and settings in `settings.py`
- Have Lock.SCHEDULER and Lock.GLOBAL id
Example:
.. sourcecode:: python
from vstutils.utils import Lock
with Lock("some_lock_identifier", repeat=30, err_msg="Locked by another proccess") as lock:
# where
# ``"some_lock_identifier"`` is unique id for lock and
# ``30`` seconds lock is going wait until another process will release lock id.
# After 30 sec waiting lock will raised with :class:`.Lock.AcquireLockException`
# and ``err_msg`` value as text.
some_code_execution()
# ``lock`` object will has been automatically released after
# exiting from context.
Another example without context manager:
.. sourcecode:: python
from vstutils.utils import Lock
# locked block after locked object created
lock = Lock("some_lock_identifier", repeat=30, err_msg="Locked by another proccess")
# deleting of object calls ``lock.release()`` which release and remove lock from id.
del lock
"""
__slots__ = ('id', 'payload_data')
TIMEOUT = 60 * 60 * 24
GLOBAL = "global-deploy"
SCHEDULER = "celery-beat"
[docs]
class AcquireLockException(Exception):
""" Exception which will be raised on unreleased lock. """
@classproperty
def PREFIX(cls):
# pylint: disable=no-self-argument
return f"{cls.get_django_settings('VST_PROJECT_LIB')}_lock_"
def __init__(self, id, payload=1, repeat=1, err_msg="", timeout=None):
# pylint: disable=too-many-arguments
super().__init__(id, timeout)
self.payload_data = f'{uuid.uuid4()}_{payload}'
self.id, start = None, time.time()
while time.time() - start <= repeat:
if self.send(self.payload_data) and not time.sleep(random.random()) and self.get() == self.payload_data:
logger.debug(f'Acquire lock with id `{id}` and payload `{self.payload_data}`')
self.id = id
return
time.sleep(random.random() / 10)
raise self.AcquireLockException(err_msg)
def get(self): # nocv
# pylint: disable=no-member
return self.cache.get(self.key)
def __enter__(self):
return self
def __exit__(self, type_e, value, tb):
self.release()
def release(self, force_release=False):
# pylint: disable=no-member
if force_release or self.id is not None:
self.cache.delete(self.key)
def __del__(self):
self.release()
class __LockAbstractDecorator:
__slots__ = ('kwargs',)
_err = "Wait until the end."
_lock_key = None
def __init__(self, **kwargs):
self.kwargs = kwargs
self.kwargs["err_msg"] = self.kwargs.get("err_msg", self._err)
def execute(self, func, *args, **kwargs):
if self._lock_key is not None:
with Lock(self._lock_key, **self.kwargs):
return func(*args, **kwargs)
return func(*args, **kwargs)
def __call__(self, original_function):
@wraps(original_function)
def wrapper(*args, **kwargs):
return self.execute(original_function, *args, **kwargs)
return wrapper
[docs]
class model_lock_decorator(__LockAbstractDecorator):
"""
Decorator for functions where 'pk' kwarg exist
for lock by id.
.. warning::
- On locked error raised ``Lock.AcquireLockException``
- Method must have and called with ``pk`` named arg.
"""
__slots__ = ('_lock_key',)
_err = "Object locked. Wait until unlock."
def execute(self, func, *args, **kwargs):
self._lock_key = kwargs.get('pk', None)
return super().execute(func, *args, **kwargs)
[docs]
class Paginator(BasePaginator):
"""
Class for fragmenting the query for small queries.
"""
def __init__(self, qs, chunk_size=None):
"""
:param qs: -- queryset for fragmenting
:type qs: django.db.models.QuerySet
:param chunk_size: -- size of the fragments.
:type chunk_size: int
"""
chunk_size = chunk_size or BaseVstObject().get_django_settings("PAGE_LIMIT", None)
super().__init__(qs, chunk_size)
def __iter__(self):
for page in range(1, self.num_pages + 1):
yield self.page(page)
def items(self):
for page in self:
for obj in page.object_list:
obj.paginator = self
obj.page = page
yield obj
[docs]
class ObjectHandlers(BaseVstObject):
"""
Handlers wrapper for get objects from some settings structure.
Example:
.. sourcecode:: python
from vstutils.utils import ObjectHandlers
'''
In `settings.py` you should write some structure:
SOME_HANDLERS = {
"one": {
"BACKEND": "full.python.path.to.module.SomeClass"
},
"two": {
"BACKEND": "full.python.path.to.module.SomeAnotherClass",
"OPTIONS": {
"some_named_arg": "value"
}
}
}
'''
handlers = ObjectHandlers('SOME_HANDLERS')
# Get class handler for 'one'
one_backend_class = handlers['one']
# Get object of backend 'two'
two_obj = handlers.get_object()
# Get object of backend 'two' with overriding constructor named arg
two_obj_overrided = handlers.get_object(some_named_arg='another_value')
:param type_name: type name for backends.Like name in dict.
:type type_name: str
"""
__slots__ = ('type', 'err_message', '__list__', '__loaded_backends__')
type: str
err_message: tp.Optional[str]
def __init__(self, type_name, err_message=None):
self.type = type_name
self.err_message = err_message
self.__list__ = None
self.__loaded_backends__ = {}
@property
def objects(self):
return {name: self[name] for name in self.list()}
def __len__(self): # pragma: no cover
return len(self.objects)
def __iter__(self):
return iter(self.items())
def __getitem__(self, name):
return self.backend(name)
def __call__(self, name, obj):
return self.get_object(name, obj)
def __dict__(self): # pragma: no cover
return self.items()
def keys(self):
return self.objects.keys()
def values(self): # pragma: no cover
return dict(self).values()
def items(self):
return self.objects.items()
def list(self):
if self.__list__ is None:
self.__list__ = self.get_django_settings(self.type, {})
return self.__list__
def _get_backend(self, backend):
if backend in self.__loaded_backends__:
return self.__loaded_backends__[backend]
self.__loaded_backends__[backend] = import_string(backend)
return self.__loaded_backends__[backend]
def get_backend_data(self, name):
return self.list()[name]
def get_backend_handler_path(self, name):
return self.get_backend_data(name).get('BACKEND', None)
[docs]
def backend(self, name):
"""
Get backend class
:param name: -- name of backend type
:type name: str
:return: class of backend
:rtype: type,types.ModuleType,object
"""
try:
backend = self.get_backend_handler_path(name)
if backend is None:
raise ex.VSTUtilsException("Backend is 'None'.") # pragma: no cover
return self._get_backend(backend)
except (KeyError, ImportError) as err:
msg = f"{name} ({self.err_message})" if self.err_message else name
raise ex.UnknownTypeException(msg) from err
def opts(self, name):
return self.get_backend_data(name).get('OPTIONS', {})
def get_object(self, name, *args, **kwargs):
opts = self.opts(name)
opts.update(kwargs)
return self[name](*args, **opts)
[docs]
class ModelHandlers(ObjectHandlers):
"""
Handlers for some models like 'INTEGRATIONS' or 'REPO_BACKENDS'.
Based on :class:`.ObjectHandlers` but more specific for working with models.
All handlers backends get by first argument model object.
**Attributes**:
:param objects: -- dict of objects like: ``{<name>: <backend_class>}``
:type objects: dict
:param keys: -- names of supported backends
:type keys: list
:param values: -- supported backends classes
:type values: list
:param type_name: type name for backends.Like name in dict.
"""
__slots__ = ()
[docs]
def get_object(self, name, obj):
"""
:param name: -- string name of backend
:param name: str
:param obj: -- model object
:type obj: django.db.models.Model
:return: backend object
:rtype: object
"""
return self[name](obj, **self.opts(name))
[docs]
class URLHandlers(ObjectHandlers):
"""
Object handler for GUI views. Uses `GUI_VIEWS` from settings.py.
Based on :class:`.ObjectHandlers` but more specific to urlpatterns.
Example:
.. sourcecode:: python
from vstutils.utils import URLHandlers
# By default gets from `GUI_VIEWS` in `settings.py`
urlpatterns = list(URLHandlers())
:param type_name: type name for backends.Like name in dict.
"""
__slots__ = ('additional_handlers', '__handlers__', 'default_namespace')
additional_handlers: tp.List[tp.Text]
def __init__(self, type_name='URLS', *args, **kwargs):
self.additional_handlers = kwargs.pop('additional_handlers', ['VIEWS']) + [type_name]
self.default_namespace = kwargs.pop('namespace', None)
self.__handlers__ = None
super().__init__(type_name, *args, **kwargs)
@property
def view_handlers(self):
if not self.__handlers__:
self.__handlers__ = tuple(map(self.__class__, self.additional_handlers))
return self.__handlers__
def get_backend_data(self, name):
data = super().get_backend_data(name)
if isinstance(data, str):
for handler in self.view_handlers:
try:
return handler.get_backend_data(data)
except: # nosec
continue
raise ex.VSTUtilsException(f'Invalid handler name "{data}"') # nocv
return data
[docs]
def get_object(self, name, *argv, **kwargs):
"""
Get url object tuple for urls.py
:param name: url regexp from
:type name: str
:param argv: overridden args
:param kwargs: overridden kwargs
:return: url object
:rtype: django.urls.re_path
"""
regexp = name
options = self.opts(regexp)
options.update(kwargs)
args = options.pop('view_args', argv)
view_kwargs = options.pop('view_kwargs', {})
csrf_enable = self.get_backend_data(regexp).get('CSRF_ENABLE', True)
view_class = self[name]
namespace = view_kwargs.pop('namespace', self.default_namespace)
result: tp.Union[tuple, types.ModuleType]
if any(s in regexp for s in (r'^', r'$', r'(', r'?')):
path_handler = re_path
else:
path_handler = path
if isinstance(view_class, View) or hasattr(view_class, 'as_view'):
view = view_class.as_view(**view_kwargs)
if not csrf_enable:
view = csrf_exempt(view)
return path_handler(regexp, view, *args, **options)
elif (isinstance(view_class, types.ModuleType) and
hasattr(view_class, 'urlpatterns') and
hasattr(view_class, 'app_name')):
result = view_class
namespace = None
else:
result = (view_class, 'gui')
return path_handler(regexp, include(result, namespace=namespace), *args, **view_kwargs)
def urls(self):
for regexp in self.list():
yield self.get_object(regexp)
def __iter__(self):
return self.urls()
class VstEnumMeta(EnumMeta):
LOWER = object()
UPPER = object()
SAME = object()
def __new__(metacls, cls, bases, classdict):
# pylint: disable=bad-mcs-classmethod-argument
mutated_types = {}
for key, value in classdict.items():
if value is metacls.LOWER:
dict.__setitem__(classdict, key, key.lower())
mutated_types[key] = str.lower
elif value is metacls.UPPER:
dict.__setitem__(classdict, key, key.upper())
mutated_types[key] = str.upper
elif value is metacls.SAME:
dict.__setitem__(classdict, key, key)
mutated_types[key] = str
classdict['__mutated_types__'] = mutated_types
result = super().__new__(metacls, cls, bases, classdict)
if result.__members__:
result.max_len = max(len(i) for i in result.__members__)
return result
class VstEnum(Enum, metaclass=VstEnumMeta):
pass
[docs]
class BaseEnum(str, VstEnum):
"""
BaseEnum extends :class:`enum.Enum` class and used to create enum-like objects that can be used in
django serializers or django models.
Example:
.. sourcecode:: python
from vstutils.models import BModel
class ItemClasses(BaseEnum):
FIRST = BaseEnum.SAME
SECOND = BaseEnum.SAME
THIRD = BaseEnum.SAME
class MyDjangoModel(BModel):
item_class = models.CharField(max_length=ItemClasses.max_len, choices=ItemClasses.to_choices())
@property
def is_second(self):
# Function check is item has second class of instance
return ItemClasses.SECOND.is_equal(self.item_class)
.. note::
For special cases, when value must be in lower or upper case, you can setup value as ``BaseEnum.LOWER`` or
``BaseEnum.UPPER``. But in default cases we recommend use ``BaseEnum.SAME`` for memory optimization.
"""
def __new__(cls, name):
return str.__new__(cls, name)
def __repr__(self):
return repr(self.__str__())
def __str__(self):
return self.__mutated_types__.get(self.name, str)(self.name)
def __hash__(self):
return hash(str(self))
@classmethod
def get_names(cls):
return tuple(x.name for x in cls)
@classmethod
def to_choices(cls):
return list_to_choices(str(x) for x in cls)
def is_equal(self, cmp_str):
return str(cmp_str) == str(self)
def not_equal(self, cmp_str):
return not self.is_equal(cmp_str)
@lru_cache(maxsize=1)
def get_session_store() -> 'SessionBase':
engine = import_module(settings.SESSION_ENGINE)
return engine.SessionStore
[docs]
def add_in_vary(headers: dict, value: str):
"""
Adds provided value to Vary header if not added already
"""
vary = cc_delim_re.split(
headers.get('Vary', '').lower()
)
if value.lower() not in vary:
vary.append(value)
headers['Vary'] = ', '.join(vary)
[docs]
def check_request_etag(request, etag_value, header_name="If-None-Match", operation_handler=str.__eq__):
"""
The function plays a crucial role within the context of the ETag mechanism,
providing a flexible way to validate client-side ETags against the server-side version for both cache validation
and ensuring data consistency in web applications.
It supports conditional handling of HTTP requests based on the match or mismatch of ETag values,
accommodating various scenarios such as cache freshness checks and prevention of concurrent modifications.
:param request: The HTTP request object containing the client's headers,
from which the ETag for comparison is retrieved.
:type request: :class:`rest_framework.request.Request`
:param etag_value: The server-generated ETag value that represents the current state of the resource.
This unique identifier is recalculated whenever the resource's content changes.
:type etag_value: :class:`str`
:param header_name: Specifies the HTTP header to look for the client's ETag.
Defaults to "If-None-Match", commonly used in GET requests for cache validation.
For operations requiring confirmation that the client is acting on the latest version
of a resource (e.g., PUT or DELETE), "If-Match" should be used instead.
:type header_name: :class:`str`
:param operation_handler: A function to compare the ETags. By default, this is set to ``str.__eq__``,
which checks for an exact match between the client's and server's ETags,
suitable for validating caches with ``If-None-Match``.
To handle ``If-Match`` scenarios, where the operation should proceed only if the ETags
do not match, indicating the resource has been modified, ``str.__ne__`` (not equal)
can be used as the operation handler. This flexibility allows for precise control over how
and when clients are allowed to read from or write to resources based on their version.
:return: Returns a tuple containing the server's ETag and a boolean flag.
The flag is ``True`` if the operation handler condition between the server's and client's ETag is met,
indicating the request should proceed based on the matching logic defined by the operation handler;
otherwise, it returns ``False``.
"""
header = request.headers.get(header_name)
if not header:
return etag_value, False
header = str(header)
if header[:2] in {'W/', "w/"}:
header = header[2:]
if header[0] != '"':
header = f'"{header}"'
if etag_value[0] != '"':
etag_value = f'"{etag_value}"'
if operation_handler(etag_value, header):
return etag_value, True
return etag_value, False
try:
from ._utils import encode, decode # noqa: F811
except ImportError: # nocv
pass