"""ETag feature"""
from functools import wraps
import hashlib
from marshmallow import Schema
from flask import request, current_app, json
from .exceptions import (
CheckEtagNotCalledError,
PreconditionRequired, PreconditionFailed, NotModified)
from .utils import get_appcontext
from .compat import MARSHMALLOW_VERSION_MAJOR
def _is_etag_enabled():
"""Return True if ETag feature enabled application-wise"""
return not current_app.config.get('ETAG_DISABLED', False)
def _get_etag_ctx():
"""Get ETag section of AppContext"""
return get_appcontext().setdefault('etag', {})
class EtagMixin:
"""Extend Blueprint to add ETag handling"""
METHODS_NEEDING_CHECK_ETAG = ['PUT', 'PATCH', 'DELETE']
METHODS_ALLOWING_SET_ETAG = ['GET', 'HEAD', 'POST', 'PUT', 'PATCH']
# Headers to include in ETag computation
ETAG_INCLUDE_HEADERS = ['X-Pagination']
def etag(self, etag_schema=None):
"""Decorator generating an endpoint response
:param etag_schema: :class:`Schema <marshmallow.Schema>` class
or instance. If not None, will be used to serialize etag data.
Can be used as either a decorator or a decorator factory:
Example: ::
@blp.etag
def view_func(...):
...
@blp.etag(EtagSchema)
def view_func(...):
...
The ``etag`` decorator expects the decorated view function to return a
``Response`` object. It is the case if it is decorated with the
``response`` decorator.
See :doc:`ETag <etag>`.
"""
if etag_schema is None or isinstance(etag_schema, (type, Schema)):
# Factory: @etag(), @etag(EtagSchema) or @etag(EtagSchema())
view_func = None
if isinstance(etag_schema, type):
etag_schema = etag_schema()
else:
# Decorator: @etag
view_func, etag_schema = etag_schema, None
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
etag_enabled = _is_etag_enabled()
if etag_enabled:
# Check etag precondition
self._check_precondition()
# Store etag_schema in AppContext
_get_etag_ctx()['etag_schema'] = etag_schema
# Execute decorated function
resp = func(*args, **kwargs)
if etag_enabled:
# Verify check_etag was called in resource code if needed
self._verify_check_etag()
# Add etag value to response
self._set_etag_in_response(resp, etag_schema)
return resp
return wrapper
if view_func:
return decorator(view_func)
return decorator
@staticmethod
def _generate_etag(etag_data, etag_schema=None, extra_data=None):
"""Generate an ETag from data
etag_data: Data to use to compute ETag
etag_schema: Schema to dump data with before hashing
extra_data: Extra data to add before hashing
Typically, extra_data is used to add pagination metadata to the hash.
It is not dumped through the Schema.
"""
if etag_schema is None:
raw_data = etag_data
else:
if isinstance(etag_schema, type):
etag_schema = etag_schema()
raw_data = etag_schema.dump(etag_data)
if MARSHMALLOW_VERSION_MAJOR < 3:
raw_data = raw_data.data
if extra_data:
raw_data = (raw_data, extra_data)
# flask's json.dumps is needed here
# as vanilla json.dumps chokes on lazy_strings
data = json.dumps(raw_data, sort_keys=True)
return hashlib.sha1(bytes(data, 'utf-8')).hexdigest()
def _check_precondition(self):
"""Check If-Match header is there
Raise 428 if If-Match header missing
Called automatically for PUT, PATCH and DELETE methods
"""
# TODO: other methods?
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Match
if (
request.method in self.METHODS_NEEDING_CHECK_ETAG and
not request.if_match
):
raise PreconditionRequired
def check_etag(self, etag_data, etag_schema=None):
"""Compare If-Match header with computed ETag
Raise 412 if If-Match-Header does not match.
Must be called from resource code to check ETag.
Unfortunately, there is no way to call it automatically. It is the
developer's responsability to do it. However, a warning is logged at
runtime if this function was not called.
"""
if _is_etag_enabled():
etag_schema = etag_schema or _get_etag_ctx().get('etag_schema')
new_etag = self._generate_etag(etag_data, etag_schema)
_get_etag_ctx()['etag_checked'] = True
if new_etag not in request.if_match:
raise PreconditionFailed
def _verify_check_etag(self):
"""Verify check_etag was called in resource code
Log a warning if ETag is enabled but check_etag was not called in
resource code in a PUT, PATCH or DELETE method.
Raise CheckEtagNotCalledError when in debug or testing mode.
This is called automatically. It is meant to warn the developer about
an issue in his ETag management.
"""
if request.method in self.METHODS_NEEDING_CHECK_ETAG:
if not _get_etag_ctx().get('etag_checked'):
message = (
'ETag not checked in endpoint {} on {} request.'
.format(request.endpoint, request.method))
app = current_app
app.logger.warning(message)
if app.debug or app.testing:
raise CheckEtagNotCalledError(message)
def set_etag(self, etag_data, etag_schema=None):
"""Set ETag for this response
Raise 304 if ETag identical to If-None-Match header
Must be called from resource code, unless the view function is
decorated with the ``response`` decorator, in which case the ETag is
computed by default from response data if ``set_etag`` is not called.
Logs a warning if called in a method other than one of
GET, HEAD, POST, PUT, PATCH.
"""
if request.method not in self.METHODS_ALLOWING_SET_ETAG:
current_app.logger.warning(
'ETag cannot be set on {} request.'.format(request.method))
if _is_etag_enabled():
etag_schema = etag_schema or _get_etag_ctx().get('etag_schema')
new_etag = self._generate_etag(etag_data, etag_schema)
if new_etag in request.if_none_match:
raise NotModified
# Store ETag in AppContext to add it to response headers later on
_get_etag_ctx()['etag'] = new_etag
def _set_etag_in_response(self, response, etag_schema):
"""Set ETag in response object
Called automatically.
If no ETag data was computed using set_etag, it is computed here from
response data.
"""
if request.method in self.METHODS_ALLOWING_SET_ETAG:
new_etag = _get_etag_ctx().get('etag')
# If no ETag data was manually provided, use response content
if new_etag is None:
# If etag_schema is provided, use raw result rather than
# the dump, as the dump needs to be done using etag_schema
etag_data = get_appcontext()[
'result_dump' if etag_schema is None else 'result_raw'
]
extra_data = tuple((k, v) for k, v in response.headers
if k in self.ETAG_INCLUDE_HEADERS)
new_etag = self._generate_etag(
etag_data, etag_schema, extra_data)
if new_etag in request.if_none_match:
raise NotModified
response.set_etag(new_etag)