Coverage for an_website/utils/request_handler.py: 92.593%
81 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 13:44 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 13:44 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""
15Useful request handlers used by other modules.
17This should only contain request handlers and the get_module_info function.
18"""
20from __future__ import annotations
22import logging
23from collections.abc import Mapping
24from http.client import responses
25from typing import Any, ClassVar, Final, override
26from urllib.parse import unquote
28import regex
29from tornado.web import HTTPError
31from .base_request_handler import BaseRequestHandler
32from .utils import (
33 SUS_PATHS,
34 get_close_matches,
35 remove_suffix_ignore_case,
36 replace_umlauts,
37)
39LOGGER: Final = logging.getLogger(__name__)
42class HTMLRequestHandler(BaseRequestHandler):
43 """A request handler that serves HTML."""
45 POSSIBLE_CONTENT_TYPES: ClassVar[tuple[str, ...]] = (
46 "text/html",
47 "text/plain",
48 "text/markdown",
49 "application/vnd.asozial.dynload+json",
50 )
53class APIRequestHandler(BaseRequestHandler):
54 """The base API request handler."""
56 POSSIBLE_CONTENT_TYPES: ClassVar[tuple[str, ...]] = (
57 "application/json",
58 "application/yaml",
59 )
62class NotFoundHandler(BaseRequestHandler):
63 """Show a 404 page if no other RequestHandler is used."""
65 @override
66 def initialize(self, *args: Any, **kwargs: Any) -> None:
67 """Do nothing to have default title and description."""
68 if "module_info" not in kwargs:
69 kwargs["module_info"] = None
70 super().initialize(*args, **kwargs)
72 @override
73 async def prepare(self) -> None:
74 """Throw a 404 HTTP error or redirect to another page."""
75 self.now = await self.get_time()
77 if self.request.method not in {"GET", "HEAD"}:
78 raise HTTPError(404)
80 new_path = regex.sub(r"/+", "/", self.request.path.rstrip("/")).replace(
81 "_", "-"
82 )
84 for ext in (".html", ".htm", ".php"):
85 new_path = remove_suffix_ignore_case(new_path, f"/index{ext}")
86 new_path = remove_suffix_ignore_case(new_path, ext)
88 new_path = replace_umlauts(new_path)
90 if new_path.lower() in SUS_PATHS:
91 self.set_status(469, reason="Nice Try")
92 return self.write_error(469)
94 if new_path and new_path != self.request.path:
95 return self.redirect(self.fix_url(new_path=new_path), True)
97 this_path_normalized = unquote(new_path).strip("/").lower()
99 paths: Mapping[str, str] = self.settings.get("NORMED_PATHS") or {}
101 if p := paths.get(this_path_normalized):
102 return self.redirect(self.fix_url(new_path=p), False)
104 if len(this_path_normalized) <= 1 and self.request.path != "/":
105 return self.redirect(self.fix_url(new_path="/"))
107 prefixes = tuple(
108 (p, repl)
109 for p, repl in paths.items()
110 if this_path_normalized.startswith(f"{p}/")
111 if f"/{p}" != repl.lower()
112 if p != "api" # api should not be a prefix
113 )
115 if len(prefixes) == 1:
116 ((prefix, replacement),) = prefixes
117 return self.redirect(
118 self.fix_url(
119 new_path=f"{replacement.strip('/')}"
120 f"{this_path_normalized.removeprefix(prefix)}"
121 ),
122 False,
123 )
124 if prefixes:
125 LOGGER.error(
126 "Too many prefixes %r for path %s", prefixes, self.request.path
127 )
129 matches = get_close_matches(this_path_normalized, paths, count=1)
130 if matches:
131 return self.redirect(
132 self.fix_url(new_path=paths[matches[0]]), False
133 )
135 self.set_status(404)
136 self.write_error(404)
139class ErrorPage(HTMLRequestHandler):
140 """A request handler that shows the error page."""
142 _success_status: int = 200
143 """The status code that is expected to be returned."""
145 @override
146 def clear(self) -> None:
147 """Reset all headers and content for this response."""
148 super().clear()
149 self._success_status = 200
151 @override
152 async def get(self, code: str, *, head: bool = False) -> None:
153 """Show the error page."""
154 # pylint: disable=unused-argument
155 status_code = int(code)
156 reason = (
157 "Nice Try" if status_code == 469 else responses.get(status_code, "")
158 )
159 # set the status code if it is allowed
160 if status_code not in (204, 304) and not 100 <= status_code < 200:
161 self.set_status(status_code, reason)
162 self._success_status = status_code
163 return await self.render(
164 "error.html",
165 status=status_code,
166 reason=reason,
167 description=self.get_error_page_description(status_code),
168 is_traceback=False,
169 )
171 @override
172 def get_status(self) -> int:
173 """Hack the status code.
175 This hacks the status code to be 200 if the status code is expected.
176 This avoids sending error logs to APM or Webhooks in case of success.
178 This depends on the fact that Tornado internally uses self._status_code
179 to set the status code in the response and self.get_status() when
180 deciding how to log the request.
181 """
182 status = super().get_status()
183 if status == self._success_status:
184 return 200
185 return status
188class ZeroDivision(BaseRequestHandler):
189 """A request handler that raises an error."""
191 @override
192 async def prepare(self) -> None:
193 """Divide by zero and raise an error."""
194 self.now = await self.get_time()
195 self.handle_accept_header(self.POSSIBLE_CONTENT_TYPES)
196 if self.request.method != "OPTIONS":
197 420 / 0 # pylint: disable=pointless-statement