Coverage for an_website/patches/__init__.py: 91.824%
159 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-10 18:56 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-10 18:56 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
13# pylint: disable=protected-access
15"""Patches that improve everything."""
17import http.client
18import json as stdlib_json # pylint: disable=preferred-module
19import logging
20from collections.abc import Callable
21from configparser import RawConfigParser
22from contextlib import suppress
23from pathlib import Path
24from threading import Thread
25from types import MethodType
26from typing import Any
27from urllib.parse import urlsplit
29import certifi
30import defusedxml # type: ignore[import-untyped]
31import jsonpickle # type: ignore[import-untyped]
32import multiprocessing_importlib_resources
33import orjson
34import pycurl
35import tornado.httputil
36import yaml
37from emoji import EMOJI_DATA
38from pillow_jxl import JpegXLImagePlugin # noqa: F401
39from setproctitle import setthreadtitle
40from tornado.httpclient import AsyncHTTPClient, HTTPRequest
41from tornado.httputil import (
42 HTTPFile,
43 HTTPHeaders,
44 HTTPInputError,
45 HTTPServerRequest,
46 ParseBodyConfig,
47)
48from tornado.log import gen_log
49from tornado.web import GZipContentEncoding, RedirectHandler, RequestHandler
51from .. import CA_BUNDLE_PATH, MEDIA_TYPES
52from . import braille, json # noqa: F401 # pylint: disable=reimported
def apply() -> None:
    """Apply every patch defined in this module.

    The patches run one after another in a fixed order: first the
    ``multiprocessing_importlib_resources`` patch, then the
    general-purpose patches and finally the Tornado-specific ones.
    """
    multiprocessing_importlib_resources._patch()

    patches: tuple[Callable[[], None], ...] = (
        # general-purpose patches
        patch_certifi,
        patch_configparser,
        patch_emoji,
        patch_http,
        patch_json,
        patch_jsonpickle,
        patch_threading,
        patch_xml,
        # Tornado-specific patches
        patch_tornado_418,
        patch_tornado_arguments,
        patch_tornado_gzip,
        patch_tornado_httpclient,
        patch_tornado_logs,
        patch_tornado_redirect,
    )
    for patch in patches:
        patch()
def patch_certifi() -> None:
    """Make certifi hand out our own CA bundle.

    ``certifi.where`` is replaced so that it returns ``CA_BUNDLE_PATH``
    and ``certifi.contents`` so that it returns the text of whatever
    file ``certifi.where()`` points to, decoded as ASCII.
    """
    certifi.where = lambda: CA_BUNDLE_PATH
    # Goes through certifi.where() on every call instead of capturing the
    # path, so a later replacement of certifi.where is honoured as well.
    certifi.contents = lambda: Path(certifi.where()).read_text(
        encoding="ASCII"
    )
def patch_configparser() -> None:
    """Teach configparser a few more spellings of true and false.

    The additional words are registered on ``RawConfigParser`` itself,
    so every parser instance understands them.
    """
    states = RawConfigParser.BOOLEAN_STATES  # type: ignore[attr-defined]
    # Each pair is (word meaning True, word meaning False).
    for truthy, falsy in (
        ("sure", "nope"),
        ("accept", "reject"),
        ("enabled", "disabled"),
    ):
        states[truthy] = True
        states[falsy] = False
def patch_emoji() -> None:
    """Register additional emoji sequences in ``EMOJI_DATA``.

    Adds the hacker cat plus, for each coloured square, a jar, a flag
    and a "deep" flag joined to that square with a zero-width joiner.
    """
    EMOJI_DATA["🐱\u200D💻"] = {
        "de": ":hacker_katze:",
        "en": ":hacker_cat:",
        "status": 2,
        "E": 1,
    }

    # (German name stem, English name, coloured square)
    colours = (
        ("rot", "red", "🟥"),
        ("blau", "blue", "🟦"),
        ("orang", "orange", "🟧"),
        ("gelb", "yellow", "🟨"),
        ("grün", "green", "🟩"),
        ("lilan", "purple", "🟪"),
        ("braun", "brown", "🟫"),
    )
    for de_name, en_name, rect in colours:
        EMOJI_DATA.update(
            {
                f"🫙\u200D{rect}": {
                    "de": f":{de_name}es_glas:",
                    "en": f":{en_name}_jar:",
                    "status": 2,
                    "E": 14,
                },
                f"🏳\uFE0F\u200D{rect}": {
                    "de": f":{de_name}e_flagge:",
                    "en": f":{en_name}_flag:",
                    "status": 2,
                    "E": 11,
                },
                f"\u2691\uFE0F\u200D{rect}": {
                    "de": f":tief{de_name}e_flagge:",
                    "en": f":deep_{en_name}_flag:",
                    "status": 2,
                    "E": 11,
                },
            }
        )
def patch_http() -> None:
    """Register the reason phrase for status code 420.

    The entry is added to ``http.client.responses``, the mapping from
    status codes to reason phrases.
    """
    http.client.responses.update({420: "Enhance Your Calm"})
def patch_json() -> None:
    """Redirect the stdlib json functions to this package's json module.

    Nothing is changed when the stdlib module carries a truthy
    ``_omegajson`` attribute.
    """
    if getattr(stdlib_json, "_omegajson", False):
        return
    # Same four entry points, replaced in the same order as before.
    for name in ("dumps", "dump", "loads", "load"):
        setattr(stdlib_json, name, getattr(json, name))
def patch_jsonpickle() -> None:
    """Make jsonpickle use orjson (and thereby return bytes).

    The orjson backend is loaded, marked as the preferred backend and
    fallthrough to other backends is switched off.
    """
    backend = "orjson"
    jsonpickle.load_backend(backend)
    jsonpickle.set_preferred_backend(backend)
    jsonpickle.enable_fallthrough(False)
def patch_threading() -> None:
    """Set the thread title of every thread to its name.

    ``Thread._bootstrap`` is wrapped: the wrapper passes the name of
    the thread to ``setthreadtitle`` and then continues with the
    original bootstrap.
    """
    original_bootstrap = Thread._bootstrap  # type: ignore[attr-defined]

    def bootstrap(self: Thread) -> None:
        # Setting the title is best effort only; a failure here must
        # never prevent the thread from running.
        with suppress(Exception):
            setthreadtitle(self.name)
        original_bootstrap(self)

    Thread._bootstrap = bootstrap  # type: ignore[attr-defined]
def patch_tornado_418() -> None:
    """Add support for RFC 7168.

    PROPFIND, BREW and WHEN are appended to the methods supported by
    ``RequestHandler``. Each gets a handler attribute that initially
    points to ``RequestHandler._unimplemented_method``.
    """
    extra_methods = ("PROPFIND", "BREW", "WHEN")
    RequestHandler.SUPPORTED_METHODS += extra_methods
    placeholder = RequestHandler._unimplemented_method
    for method in extra_methods:
        # Handler methods are named after the lower-cased HTTP method.
        setattr(RequestHandler, method.lower(), placeholder)
def patch_tornado_arguments() -> None:  # noqa: C901
    """Improve argument parsing.

    ``tornado.httputil.parse_body_arguments`` is replaced by a wrapper
    that handles bodies whose content type starts with
    ``application/json`` or ``application/yaml`` itself and passes
    everything else on to the original function.
    """
    # pylint: disable=too-complex

    def ensure_bytes(value: Any) -> bytes:
        """Return the value as bytes."""
        if isinstance(value, bytes):
            return value
        if isinstance(value, bool):
            # lower case like in JSON and YAML, not like str(True)
            return b"true" if value else b"false"
        return str(value).encode("UTF-8")

    def has_content_encoding(headers: None | HTTPHeaders) -> bool:
        """Warn about and report a body with a Content-Encoding."""
        if headers and "Content-Encoding" in headers:
            gen_log.warning(
                "Unsupported Content-Encoding: %s",
                headers["Content-Encoding"],
            )
            return True
        return False

    def store(parsed: Any, arguments: dict[str, list[bytes]]) -> None:
        """Append the values of a parsed mapping to the arguments."""
        if not isinstance(parsed, dict):
            # only mappings can be turned into arguments
            return
        for key, value in parsed.items():
            if value is None:
                # null values are skipped
                continue
            arguments.setdefault(key, []).append(ensure_bytes(value))

    def parse_body_arguments(  # pylint: disable=too-many-arguments
        content_type: str,
        body: bytes,
        arguments: dict[str, list[bytes]],
        files: dict[str, list[HTTPFile]],
        headers: None | HTTPHeaders = None,
        *,
        config: ParseBodyConfig | None = None,
        _: Callable[..., None] = tornado.httputil.parse_body_arguments,
    ) -> None:
        if content_type.startswith("application/json"):
            if has_content_encoding(headers):
                return
            try:
                parsed = orjson.loads(body)
            except Exception as exc:
                raise HTTPInputError(f"Invalid JSON body: {exc}") from exc
            store(parsed, arguments)
        elif content_type.startswith("application/yaml"):
            if has_content_encoding(headers):
                return
            try:
                parsed = yaml.safe_load(body)
            except Exception as exc:
                raise HTTPInputError(f"Invalid YAML body: {exc}") from exc
            store(parsed, arguments)
        else:
            # "_" is the original function, bound as a default argument
            _(content_type, body, arguments, files, headers, config=config)

    parse_body_arguments.__doc__ = tornado.httputil.parse_body_arguments.__doc__

    tornado.httputil.parse_body_arguments = parse_body_arguments
def patch_tornado_gzip() -> None:
    """Use gzip for more content types.

    ``GZipContentEncoding.CONTENT_TYPES`` becomes the set of all media
    types in ``MEDIA_TYPES`` whose data has a truthy ``compressible``
    entry.
    """
    compressible_types = {
        media_type
        for media_type, data in MEDIA_TYPES.items()
        if data.get("compressible")
    }
    GZipContentEncoding.CONTENT_TYPES = compressible_types
def patch_tornado_httpclient() -> None:  # fmt: off
    """Make requests quick.

    The curl-based HTTP client is configured as the implementation of
    ``AsyncHTTPClient`` and every ``HTTPRequest`` gets a default
    ``prepare_curl_callback`` that asks for HTTP/3 on https URLs if
    the installed libcurl is new enough and has the needed feature.
    """
    # libcurl encodes versions as 0xXXYYZZ, so this is 7.88.0
    min_libcurl_version = 0x75800
    # bit in the libcurl feature mask (CURL_VERSION_HTTP3 in curl.h)
    http3_feature_bit = 1 << 25

    AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")

    def prepare_curl_callback(self: HTTPRequest, curl: pycurl.Curl) -> None:
        # pylint: disable=c-extension-no-member, useless-suppression
        if urlsplit(self.url).scheme != "https":
            return
        # index 2 is compared with the version number above,
        # index 4 is tested against the feature bit above
        info = pycurl.version_info()
        if info[2] >= min_libcurl_version and info[4] & http3_feature_bit:
            curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_3)

    upstream_init = HTTPRequest.__init__

    def request_init(self: HTTPRequest, *args: Any, **kwargs: Any) -> None:
        # With fewer than 18 positional arguments the callback
        # can only have been passed as a keyword argument (if at all).
        if len(args) < 18:
            bound_callback = MethodType(prepare_curl_callback, self)
            kwargs.setdefault("prepare_curl_callback", bound_callback)
        upstream_init(self, *args, **kwargs)

    request_init.__doc__ = upstream_init.__doc__

    HTTPRequest.__init__ = request_init  # type: ignore[method-assign]
def patch_tornado_logs() -> None:
    """Anonymize Tornado logs.

    The request summary of ``RequestHandler`` shows the real remote IP
    only for ``/robots.txt`` and for paths in ``SUS_PATHS``; for all
    other requests the IP is passed through ``anonymize_ip``. The repr
    of ``HTTPServerRequest`` is reduced to protocol, host, method, URI
    and version.
    """
    # pylint: disable=import-outside-toplevel
    from ..utils.utils import SUS_PATHS, anonymize_ip

    def request_summary(self: RequestHandler) -> str:
        request = self.request
        if request.path == "/robots.txt" or request.path.lower() in SUS_PATHS:
            shown_ip = request.remote_ip
        else:
            shown_ip = anonymize_ip(request.remote_ip, ignore_invalid=True)
        # pylint: disable-next=consider-using-f-string
        return "%s %s (%s)" % (request.method, request.uri, shown_ip)

    def request_repr(self: HTTPServerRequest) -> str:
        fields = [
            "%s=%r" % (n, getattr(self, n))  # pylint: disable=consider-using-f-string
            for n in ("protocol", "host", "method", "uri", "version")
        ]
        # pylint: disable-next=consider-using-f-string
        return "%s(%s)" % (self.__class__.__name__, ", ".join(fields))

    RequestHandler._request_summary = request_summary  # type: ignore[method-assign]

    HTTPServerRequest.__repr__ = request_repr  # type: ignore[method-assign]
def patch_tornado_redirect() -> None:
    """Use modern redirect codes and support HEAD requests.

    ``RequestHandler.redirect`` is replaced by a version that uses 308
    and 307 as the default status codes and ``RedirectHandler`` answers
    HEAD requests with its GET handler.
    """

    def redirect(
        self: RequestHandler,
        url: str,
        permanent: bool = False,
        status: None | int = None,
    ) -> None:
        if url == self.request.full_url():
            # This is only reported, the redirect is still sent.
            handler_logger = logging.getLogger(
                f"{self.__class__.__module__}.{self.__class__.__qualname__}"
            )
            handler_logger.critical("Infinite redirect to %r detected", url)
        if self._headers_written:
            # pylint: disable=broad-exception-raised
            raise Exception("Cannot redirect after headers have been written")
        if status is not None:
            assert isinstance(status, int) and 300 <= status <= 399  # type: ignore[redundant-expr]  # noqa: B950
        elif permanent:
            status = 308
        else:
            status = 307
        self.set_status(status)
        self.set_header("Location", url)
        self.finish()  # type: ignore[unused-awaitable]

    upstream_doc = RequestHandler.redirect.__doc__
    if upstream_doc:
        # Reuse the upstream documentation with the new default codes.
        redirect.__doc__ = upstream_doc.replace("301", "308").replace(
            "302", "307"
        )

    RequestHandler.redirect = redirect  # type: ignore[method-assign]

    RedirectHandler.head = RedirectHandler.get
def patch_xml() -> None:
    """Make XML safer.

    Runs ``defuse_stdlib`` of defusedxml and afterwards the
    ``monkey_patch`` function of its ``xmlrpc`` submodule.
    """
    defusedxml.defuse_stdlib()
    # Looked up only after defuse_stdlib() has run, as in the statement
    # order this function always had.
    xmlrpc = defusedxml.xmlrpc
    xmlrpc.monkey_patch()