Coverage for an_website / utils / decorators.py: 80.000%
110 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 18:33 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 18:33 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""A module with useful decorators."""
16import contextlib
17import logging
18from base64 import b64decode
19from collections.abc import Callable, Mapping
20from functools import wraps
21from typing import Any, Final, ParamSpec, TypeVar, cast, overload
23from tornado.web import RequestHandler
25from .token import InvalidTokenError, parse_token
26from .utils import Permission, anonymize_ip
28Default = TypeVar("Default")
29Args = ParamSpec("Args")
30Ret = TypeVar("Ret")
def keydecode(
    token: str,
    api_secrets: Mapping[str | None, Permission],
    token_secret: str | bytes | None,
) -> None | Permission:
    """Resolve a key to the permissions it grants.

    The key is considered both verbatim and, when it is valid Base64
    that decodes to UTF-8 text, in its decoded form.

    If *token_secret* is truthy, every candidate is handed to
    ``parse_token`` and the permissions of the first one that does not
    raise ``InvalidTokenError`` are returned.  Otherwise, or when no
    candidate could be parsed, the Base64-decoded text is looked up in
    *api_secrets*.

    Returns ``None`` if the key is not decodable or the decoded text is
    not a key of *api_secrets*.
    """
    try:
        plain: str | None = b64decode(token).decode("UTF-8")
    except ValueError:
        # Covers both invalid Base64 (binascii.Error) and bytes that are
        # not UTF-8 (UnicodeDecodeError); both subclass ValueError.
        plain = None

    if token_secret:
        candidates = (token,) if plain is None else (token, plain)
        for candidate in candidates:
            try:
                return parse_token(candidate, secret=token_secret).permissions
            except InvalidTokenError:
                # Not a valid token in this form; try the next candidate.
                continue

    return None if plain is None else api_secrets.get(plain)
def is_authorized(
    inst: RequestHandler,
    permission: Permission,
    allow_cookie_auth: bool = True,
) -> None | bool:
    """Check whether the request handled by *inst* has *permission*.

    Credentials are collected from, in this order:

    * every ``Authorization`` header (values starting with ``bearer ``,
      compared case-insensitively, are stripped of that prefix and
      decoded with ``keydecode``; other values are looked up directly
      in the ``TRUSTED_API_SECRETS`` setting),
    * every ``access_token`` argument (decoded with ``keydecode``),
    * every ``key`` argument (looked up directly),
    * the ``access_token`` and ``key`` cookies, but only if
      *allow_cookie_auth* is true.

    Returns ``None`` if no credential resolved to a permission,
    otherwise whether *permission* is contained in the union of all
    resolved permissions.
    """
    secrets: dict[str | None, Permission] = inst.settings.get(
        "TRUSTED_API_SECRETS", {}
    )
    signing_secret: str | bytes | None = inst.settings.get("AUTH_TOKEN_SECRET")

    granted: list[None | Permission] = []

    for header in inst.request.headers.get_list("Authorization"):
        if header.lower().startswith("bearer "):
            # Drop the 7 character "bearer " prefix before decoding.
            granted.append(keydecode(header[7:], secrets, signing_secret))
        else:
            granted.append(secrets.get(header))

    granted.extend(
        keydecode(value, secrets, signing_secret)
        for value in inst.get_arguments("access_token")
    )
    granted.extend(secrets.get(value) for value in inst.get_arguments("key"))

    if allow_cookie_auth:
        granted.append(
            keydecode(
                inst.get_cookie("access_token", ""),
                secrets,
                signing_secret,
            )
        )
        granted.append(secrets.get(inst.get_cookie("key", None)))

    if all(entry is None for entry in granted):
        return None

    combined = Permission(0)
    for entry in granted:
        if entry:
            combined |= entry

    return permission in combined
104_DEFAULT_VALUE: Final = object()
@overload
def requires(
    *perms: Permission,
    return_instead_of_finishing: Default,
    allow_cookie_auth: bool = True,
) -> Callable[[Callable[Args, Ret]], Callable[Args, Ret | Default]]: ...


@overload
def requires(
    *perms: Permission,
    allow_cookie_auth: bool = True,
) -> Callable[[Callable[Args, Ret]], Callable[Args, Ret]]: ...


def requires(
    *perms: Permission,
    return_instead_of_finishing: Any = _DEFAULT_VALUE,
    allow_cookie_auth: bool = True,
) -> Callable[[Callable[Args, Ret]], Callable[Args, Any]]:
    """Handle required permissions.

    Build a decorator for request handler methods that only runs the
    method if ``is_authorized`` reports that the request has all of
    *perms* (combined with ``|``).

    The combined permissions are also stored on the decorated method as
    the ``required_perms`` attribute.

    If the request is not authorized:

    * when *return_instead_of_finishing* was given, that value is
      returned and the response is left untouched;
    * otherwise a warning is logged, the response is cleared, a
      ``WWW-Authenticate: Bearer`` header is set and ``write_error`` is
      called with status 401 (``is_authorized`` returned ``None``) or
      403 (it returned a falsy non-``None`` value); the wrapper then
      returns ``None``.

    *allow_cookie_auth* is passed through to ``is_authorized``.

    The wrapper raises ``TypeError`` if the first positional argument
    is not a ``RequestHandler``.
    """
    permissions = Permission(0)
    for perm in perms:
        permissions |= perm

    # The sentinel distinguishes "not given" from an explicit None.
    finish_with_error = return_instead_of_finishing is _DEFAULT_VALUE

    def internal(method: Callable[Args, Ret]) -> Callable[Args, Any]:
        method.required_perms = permissions  # type: ignore[attr-defined]
        logger = logging.getLogger(f"{method.__module__}.{method.__qualname__}")

        @wraps(method)
        def wrapper(*args: Args.args, **kwargs: Args.kwargs) -> Any:
            instance = args[0]
            if not isinstance(instance, RequestHandler):
                raise TypeError(f"Instance has invalid type {type(instance)}")
            authorized = is_authorized(instance, permissions, allow_cookie_auth)
            if not authorized:
                if not finish_with_error:
                    return cast(Ret, return_instead_of_finishing)
                logger.warning(
                    "Unauthorized access to %s from %s",
                    instance.request.path,
                    anonymize_ip(instance.request.remote_ip),
                )
                instance.clear()
                instance.set_header("WWW-Authenticate", "Bearer")
                # None: no credential resolved to a permission -> 401,
                # False: credentials lack the permission -> 403.
                status = 401 if authorized is None else 403
                instance.set_status(status)
                instance.write_error(status, **kwargs)
                return None

            return method(*args, **kwargs)

        return wrapper

    return internal
@overload
def requires_settings(
    *settings: str,
    return_: Default,
) -> Callable[[Callable[Args, Ret]], Callable[Args, Ret | Default]]: ...


@overload
def requires_settings(
    *settings: str,
    status_code: int,
) -> Callable[[Callable[Args, Ret]], Callable[Args, Ret | None]]: ...


def requires_settings(
    *settings: str,
    return_: Any = _DEFAULT_VALUE,
    status_code: int | None = None,
) -> Callable[[Callable[Args, Ret]], Callable[Args, Any]]:
    """Require some settings to execute a method.

    Build a decorator for request handler methods.  On every call the
    wrapper checks that each name in *settings* has a non-``None``
    value in the handler's ``settings``.

    * If all are present, each value is passed to the method as a
      keyword argument named after the lower-cased setting name.
    * If any is missing and *return_* was given, *return_* is returned.
    * If any is missing and *return_* was not given, a warning is
      logged, ``send_error`` is called with *status_code* (503 when not
      specified) and ``None`` is returned.

    Raises ``ValueError`` if both *return_* and an integer
    *status_code* are given.  The wrapper raises ``TypeError`` if the
    first positional argument is not a ``RequestHandler``.
    """
    # The sentinel distinguishes "not given" from an explicit None.
    finish_with_error = return_ is _DEFAULT_VALUE
    if not finish_with_error and isinstance(status_code, int):
        raise ValueError("return_ and status_code specified")
    if finish_with_error and status_code is None:
        status_code = 503

    def internal(method: Callable[Args, Ret]) -> Callable[Args, Any]:
        logger = logging.getLogger(f"{method.__module__}.{method.__qualname__}")

        @wraps(method)
        def wrapper(*args: Args.args, **kwargs: Args.kwargs) -> Any:
            instance = args[0]
            if not isinstance(instance, RequestHandler):
                raise TypeError(f"Instance has invalid type {type(instance)}")
            missing = [
                setting
                for setting in settings
                if instance.settings.get(setting) is None
            ]
            if missing:
                if not finish_with_error:
                    return cast(Ret, return_)
                logger.warning(
                    "Missing settings %s for request to %s",
                    ", ".join(missing),
                    instance.request.path,
                )
                instance.send_error(cast(int, status_code))
                return None
            # Hand every required setting to the method as a keyword.
            for setting in settings:
                kwargs[setting.lower()] = instance.settings[setting]
            return method(*args, **kwargs)

        return wrapper

    return internal
226def get_setting_or_default(
227 setting: str,
228 default: Any,
229) -> Callable[[Callable[Args, Ret]], Callable[Args, Ret]]:
230 """Require some settings to execute a method."""
232 def internal(method: Callable[Args, Ret]) -> Callable[Args, Ret]:
233 @wraps(method)
234 def wrapper(*args: Args.args, **kwargs: Args.kwargs) -> Ret:
235 instance = args[0]
236 if not isinstance(instance, RequestHandler):
237 raise TypeError(f"Instance has invalid type {type(instance)}")
238 kwargs[setting.lower()] = instance.settings.get(setting, default)
239 return method(*args, **kwargs)
241 return wrapper
243 return internal