Source code for vstutils.middleware

import time
import logging
from typing import Generator
from contextlib import contextmanager

from django.db import connections
from django.apps import apps
from django.conf import settings
from django.utils import translation

from .api.models import Language
from .utils import BaseVstObject


logger = logging.getLogger(settings.VST_PROJECT)


@contextmanager
def wrap_connections(wrapper):
    for connection in connections.all():
        connection.execute_wrappers.append(wrapper)
    try:
        yield wrapper
    finally:
        for connection in connections.all():
            connection.execute_wrappers.remove(wrapper)


class QueryTimingLogger:
    __slots__ = ('queries_time',)

    def __init__(self):
        self.queries_time = 0

    def __call__(self, execute, sql, params, many, context):
        start = time.monotonic()
        try:
            return execute(sql, params, many, context)
        finally:
            self.queries_time += time.monotonic() - start


[docs] class BaseMiddleware(BaseVstObject): """ Middleware base class for handling: * Incoming requests by :meth:`.BaseMiddleware.request_handler()`; * Outgoing response before any calling on server by :meth:`.BaseMiddleware.get_response_handler()`; * Outgoing responses by :meth:`.BaseMiddleware.handler()`. Middleware must be added to `MIDDLEWARE` list in settings. Example: .. sourcecode:: python from vstutils.middleware import BaseMiddleware from django.http import HttpResponse class CustomMiddleware(BaseMiddleware): def request_handler(self, request): # Add header to request request.headers['User-Agent'] = 'Mozilla/5.0' return request def get_response_handler(self, request): if not request.user.is_stuff: # Return 403 HTTP status for non-stuff users. # This request never gets in any view. return HttpResponse( "Access denied!", content_type="text/plain", status_code=403 ) return super().get_response_handler(request) def handler(self, request, response): # Add header to response response['Custom-Header'] = 'Some value' return response """ __slots__ = 'get_response', 'logger' sync_capable = True async_capable = False def __init__(self, get_response): self.get_response = get_response self.logger = logger super().__init__() def get_setting(self, value): return self.get_django_settings(value) # nocv
[docs] def handler(self, request, response): # pylint: disable=unused-argument """ The response handler. Method to process responses. :param request: HTTP-request object. :type request: django.http.HttpRequest :param response: HTTP-response object which will be sended to client. :type response: django.http.HttpResponse :return: Handled response object. :rtype: django.http.HttpResponse """ return response
[docs] def request_handler(self, request): # pylint: disable=unused-argument """ The request handler. Called before request is handled by a view. :param request: HTTP-request object which is wrapped from client request. :type request: django.http.HttpRequest :return: Handled request object. :rtype: django.http.HttpRequest """ return request
[docs] def get_response_handler(self, request): """ Entrypoint for breaking or continuing request handling. This function must return `django.http.HttpResponse` object or result of parent class calling. Since the release of 5.3, it has been possible to write this method as asynchronous. This should be used in cases where the middleware makes queries to the database or cache. However, such a middleware should be excluded from bulk requests. .. warning:: Never do asynchronous middleware in dependent chains. They are designed to send independent requests to external sources. Set ``async_capable`` to ``True`` and ``sync_capable`` to ``False`` for such middleware. :param request: HTTP-request object which is wrapped from client request. :type request: django.http.HttpRequest :rtype: django.http.HttpResponse """ return self.get_response(request)
def __call__(self, request): return self.handler( self.request_handler(request), self.get_response_handler(request) )
[docs] class AsyncBaseMiddleware(BaseVstObject): """ Middleware base class for handling asynchronously: * Incoming requests by :meth:`.AsyncBaseMiddleware.request_handler()`; * Outgoing response before any calling on server by :meth:`.AsyncBaseMiddleware.get_response_handler()`; * Outgoing responses by :meth:`.AsyncBaseMiddleware.handler()`. Middleware must be added to `MIDDLEWARE` list in settings. Example: .. sourcecode:: python from vstutils.middleware import AsyncBaseMiddleware from django.http import HttpResponse import aiohttp class CustomMiddleware(AsyncBaseMiddleware): async def request_handler(self, request): # Perform an async HTTP request to get user-agent async with aiohttp.ClientSession() as session: async with session.get('http://httpbin.org/user-agent') as resp: data = await resp.json() request.headers['User-Agent'] = data['user-agent'] return request async def get_response_handler(self, request): if not request.user.is_staff: # Return 403 HTTP status for non-staff users. # This request never gets to any view. return HttpResponse( "Access denied!", content_type="text/plain", status=403 ) return await super().get_response_handler(request) async def handler(self, request, response): # Perform an async HTTP request to get a custom header value async with aiohttp.ClientSession() as session: async with session.get('http://httpbin.org/headers') as resp: data = await resp.json() response['Custom-Header'] = data['headers'].get('Custom-Header', 'Some value') return response """ __slots__ = 'get_response', 'logger' sync_capable = False async_capable = True def __init__(self, get_response): self.get_response = get_response self.logger = logger super().__init__() def get_setting(self, value): return self.get_django_settings(value) async def handler(self, request, response): return response # nocv async def request_handler(self, request): return request async def get_response_handler(self, request): return await self.get_response(request) async def __call__(self, request): request = await self.request_handler(request) response = await self.get_response_handler(request) return await self.handler(request, response)
class TimezoneHeadersMiddleware(AsyncBaseMiddleware): __slots__ = () async def handler(self, request, response): response['Server-Timezone'] = self.get_setting('TIME_ZONE') response['VSTutils-Version'] = self.get_setting('VSTUTILS_VERSION') return response class ExecuteTimeHeadersMiddleware(BaseMiddleware): __slots__ = () def __duration_handler(self, data): key, value = data if isinstance(value, (list, tuple, map, filter, Generator)): value = ''.join((self.__duration_handler(('', v)) for v in value)) elif isinstance(value, (int, float)): value = f';dur={float(value)}' elif isinstance(value, str) and value: if ' ' in value: value = f'"{value}"' value = f';desc={value}' elif not value: value = '' return f'{key}{value}' def _round_time(self, seconds): return round(seconds * 1000, 2) def get_response_handler(self, request): start_time = time.time() get_response_handler = super().get_response_handler ql = QueryTimingLogger() if not getattr(request, 'is_bulk', False): with wrap_connections(ql): response = get_response_handler(request) else: response = get_response_handler(request) response_durations = getattr(response, 'timings', None) total_time = self._round_time(time.time() - start_time) if getattr(request, 'is_bulk', False): response['Response-Time'] = str(total_time) else: if response_durations: response_durations = f', {", ".join(map(self.__duration_handler, response_durations.items()))}' else: response_durations = "" response_durations += f', db_execution_time;dur={self._round_time(ql.queries_time)}' response['Server-Timing'] = f'total_app;dur={total_time}{response_durations or ""}' return response class LangMiddleware(BaseMiddleware): __slots__ = () def get_lang_object(self, request): set_cookie = True if 'lang' in request.GET: code = request.GET['lang'] set_cookie = request.COOKIES.get(settings.LANGUAGE_COOKIE_NAME) != code elif settings.LANGUAGE_COOKIE_NAME in request.COOKIES: code = request.COOKIES[settings.LANGUAGE_COOKIE_NAME] set_cookie = False else: code = translation.get_language_from_request(request) obj = Language.objects.filter(code=code).first() if obj is not None: return obj, set_cookie return Language.objects.get(code=settings.LANGUAGE_CODE), set_cookie # nocv def get_response_handler(self, request): request.language, set_cookie = self.get_lang_object(request) # type: ignore translation.activate(request.language.code) # type: ignore request.LANGUAGE_CODE = translation.get_language() response = super().get_response_handler(request) if set_cookie: response.set_cookie('lang', request.language.code, domain=settings.SESSION_COOKIE_DOMAIN) # type: ignore if 'Content-Language' not in response: response['Content-Language'] = request.language.code # type: ignore return response class FrontendChangesNotifications(AsyncBaseMiddleware): __slots__ = ('notificator',) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.notificator = apps.get_app_config('vstutils_api').module.notificator_class([]) async def request_handler(self, request): if self.notificator.is_usable(): request.notificator = self.notificator return request async def handler(self, request, response): if self.notificator.is_usable(): await self.notificator.asend() return response