Coverage for an_website/utils/base_request_handler.py: 79.032%

496 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-10 18:56 +0000

1# This program is free software: you can redistribute it and/or modify 

2# it under the terms of the GNU Affero General Public License as 

3# published by the Free Software Foundation, either version 3 of the 

4# License, or (at your option) any later version. 

5# 

6# This program is distributed in the hope that it will be useful, 

7# but WITHOUT ANY WARRANTY; without even the implied warranty of 

8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

9# GNU Affero General Public License for more details. 

10# 

11# You should have received a copy of the GNU Affero General Public License 

12# along with this program. If not, see <https://www.gnu.org/licenses/>. 

13# pylint: disable=too-many-lines 

14 

15""" 

16The base request handler used by other modules. 

17 

18This should only contain the BaseRequestHandler class. 

19""" 

20 

21import contextlib 

22import inspect 

23import logging 

24import secrets 

25import sys 

26import traceback 

27import uuid 

28from asyncio import Future 

29from base64 import b64decode 

30from collections.abc import Awaitable, Callable, Coroutine, Mapping 

31from contextvars import ContextVar 

32from datetime import date, datetime, timedelta, timezone, tzinfo 

33from functools import cached_property, partial, reduce 

34from random import Random, choice as random_choice 

35from types import TracebackType 

36from typing import Any, ClassVar, Final, cast, override 

37from urllib.parse import SplitResult, urlsplit, urlunsplit 

38from zoneinfo import ZoneInfo 

39 

40import elasticapm 

41import html2text 

42import orjson as json 

43import regex 

44import tornado.web 

45import yaml 

46from accept_types import get_best_match # type: ignore[import-untyped] 

47from ansi2html import Ansi2HTMLConverter 

48from bs4 import BeautifulSoup 

49from dateutil.easter import easter 

50from elastic_transport import ApiError, TransportError 

51from elasticsearch import AsyncElasticsearch 

52from openmoji_dist import VERSION as OPENMOJI_VERSION 

53from redis.asyncio import Redis 

54from tornado.httputil import HTTPServerRequest 

55from tornado.iostream import StreamClosedError 

56from tornado.web import ( 

57 Finish, 

58 GZipContentEncoding, 

59 HTTPError, 

60 MissingArgumentError, 

61 OutputTransform, 

62) 

63 

64from .. import ( 

65 EVENT_ELASTICSEARCH, 

66 EVENT_REDIS, 

67 GH_ORG_URL, 

68 GH_PAGES_URL, 

69 GH_REPO_URL, 

70 NAME, 

71 ORJSON_OPTIONS, 

72 pytest_is_running, 

73) 

74from .decorators import is_authorized 

75from .options import ColourScheme, Options 

76from .static_file_handling import FILE_HASHES_DICT, fix_static_path 

77from .themes import RANDOM_THEMES 

78from .utils import ( 

79 ModuleInfo, 

80 Permission, 

81 add_args_to_url, 

82 ansi_replace, 

83 apply, 

84 backspace_replace, 

85 bool_to_str, 

86 emoji2html, 

87 geoip, 

88 hash_bytes, 

89 is_prime, 

90 ratelimit, 

91 str_to_bool, 

92) 

93 

# Logger for this module.
LOGGER: Final = logging.getLogger(__name__)

# Content types that are handled as text even though
# they do not start with "text/" (see BaseRequestHandler._finish).
TEXT_CONTENT_TYPES: Final[set[str]] = {
    "application/javascript",
    "application/json",
    "application/vnd.asozial.dynload+json",
    "application/x-ndjson",
    "application/xml",
    "application/yaml",
}

# NOTE(review): presumably values for an "X-Clacks-Overhead" header;
# the place where they are used is not part of this section.
CLACKS_OVERHEADS = (
    "GNU Aaron Swartz",
    "GNU Carol Angie Deborah Maltesi",
    "GNU Charlotte Angie",
    "GNU Terry Pratchett",
)

# Holds the request that is currently being handled;
# it is set at the start of _RequestHandler._execute.
request_ctx_var: ContextVar[HTTPServerRequest] = ContextVar("current_request")

113 

114 

class _RequestHandler(tornado.web.RequestHandler):
    """
    Base for Tornado request handlers.

    It provides access to the clients stored in the application settings
    (APM, Elasticsearch, Redis), the request time, ratelimiting and the
    redirect to the canonical domain.
    """

    # whether the configured CRAWLER_SECRET is part of the User-Agent,
    # gets set in prepare()
    crawler: bool = False

    @override
    async def _execute(
        self, transforms: list[OutputTransform], *args: bytes, **kwargs: bytes
    ) -> None:
        # make the request available through the context variable
        request_ctx_var.set(self.request)

        # set the request time before the request gets handled, so that
        # the fallback of the `now` cached property is not needed
        self.now = await self.get_time()

        return await super()._execute(transforms, *args, **kwargs)

    # pylint: disable-next=protected-access
    _execute.__doc__ = tornado.web.RequestHandler._execute.__doc__

    @property
    def apm_client(self) -> None | elasticapm.Client:
        """Get the APM client from the settings."""
        return self.settings.get("ELASTIC_APM", {}).get("CLIENT")  # type: ignore[no-any-return]

    @property
    def apm_enabled(self) -> bool:
        """Return whether APM is enabled."""
        return bool(self.settings.get("ELASTIC_APM", {}).get("ENABLED"))

    @override
    def data_received(  # noqa: D102
        self, chunk: bytes
    ) -> None | Awaitable[None]:
        # streamed request data is ignored
        pass

    data_received.__doc__ = tornado.web.RequestHandler.data_received.__doc__

    @property
    def elasticsearch(self) -> AsyncElasticsearch:
        """
        Get the Elasticsearch client from the settings.

        This is None if Elasticsearch is not enabled.
        """
        return cast(AsyncElasticsearch, self.settings.get("ELASTICSEARCH"))

    @property
    def elasticsearch_prefix(self) -> str:
        """Get the Elasticsearch prefix from the settings."""
        return self.settings.get(  # type: ignore[no-any-return]
            "ELASTICSEARCH_PREFIX", NAME
        )

    def geoip(
        self,
        ip: None | str = None,
        database: str = geoip.__defaults__[0],  # type: ignore[index]
        *,
        allow_fallback: bool = True,
    ) -> Coroutine[None, None, None | dict[str, Any]]:
        """
        Get GeoIP information.

        If no IP is given, the remote IP of the request is used.
        The Elasticsearch client is only passed to the lookup
        if the Elasticsearch event is set.
        """
        if not ip:
            ip = self.request.remote_ip
        if not EVENT_ELASTICSEARCH.is_set():
            return geoip(ip, database)
        return geoip(
            ip, database, self.elasticsearch, allow_fallback=allow_fallback
        )

    async def get_time(self) -> datetime:
        """
        Get the start time of the request in the users' timezone.

        UTC is used if the GeoIP lookup fails, returns no timezone
        or returns a timezone that cannot be loaded.
        """
        tz: tzinfo = timezone.utc
        try:
            geoip = await self.geoip()  # pylint: disable=redefined-outer-name
        except (ApiError, TransportError):
            LOGGER.exception("Elasticsearch request failed")
            if self.apm_client:
                self.apm_client.capture_exception()  # type: ignore[no-untyped-call]
        else:
            if geoip and "timezone" in geoip:
                try:
                    tz = ZoneInfo(geoip["timezone"])
                except (KeyError, ValueError):
                    # ZoneInfoNotFoundError is a KeyError, malformed keys
                    # raise a ValueError; this runs before the request gets
                    # handled, so fall back to UTC instead of failing
                    LOGGER.warning(
                        "Unknown timezone %r, falling back to UTC",
                        geoip["timezone"],
                    )
        return datetime.fromtimestamp(
            self.request._start_time, tz=tz  # pylint: disable=protected-access
        )

    def is_authorized(
        self, permission: Permission, allow_cookie_auth: bool = True
    ) -> bool | None:
        """Check whether the request is authorized."""
        return is_authorized(self, permission, allow_cookie_auth)

    @override
    def log_exception(
        self,
        typ: None | type[BaseException],
        value: None | BaseException,
        tb: None | TracebackType,
    ) -> None:
        if isinstance(value, HTTPError):
            # HTTP errors are logged by Tornado itself
            super().log_exception(typ, value, tb)
        elif typ is StreamClosedError:
            # closed streams are only logged with the debug level
            LOGGER.debug(
                "Stream closed %s",
                self._request_summary(),
                exc_info=(typ, value, tb),  # type: ignore[arg-type]
            )
        else:
            LOGGER.error(
                "Uncaught exception %s",
                self._request_summary(),
                exc_info=(typ, value, tb),  # type: ignore[arg-type]
            )

    log_exception.__doc__ = tornado.web.RequestHandler.log_exception.__doc__

    @cached_property
    def now(self) -> datetime:
        """
        Get the current time.

        This is only the fallback for when the time was not set by
        `_execute`. It raises an AssertionError while pytest is running,
        otherwise it logs an error and returns `now_utc`.
        """
        # pylint: disable=method-hidden
        if pytest_is_running():
            raise AssertionError("Now accessed before it was set")
        # if self.request.method in self.SUPPORTED_METHODS:  # Why?
        LOGGER.error("Now accessed before it was set", stacklevel=3)
        return self.now_utc

    @cached_property
    def now_utc(self) -> datetime:
        """Get the start time of the request in UTC."""
        return datetime.fromtimestamp(
            self.request._start_time,  # pylint: disable=protected-access
            tz=timezone.utc,
        )

    @override  # pylint: disable-next=invalid-overridden-method
    async def prepare(self) -> None:
        """
        Detect crawlers, redirect to the canonical domain and ratelimit.

        The specific ratelimit is only checked if the request
        did not already hit the global ratelimit.
        """
        if crawler_secret := self.settings.get("CRAWLER_SECRET"):
            self.crawler = crawler_secret in self.request.headers.get(
                "User-Agent", ""
            )

        if (
            self.request.method in {"GET", "HEAD"}
            and self.redirect_to_canonical_domain()
        ):
            return

        if self.request.method != "OPTIONS" and not await self.ratelimit(True):
            await self.ratelimit()

    async def ratelimit(self, global_ratelimit: bool = False) -> bool:
        """
        Take b1nzy to space using Redis.

        Return whether the request got ratelimited. In that case the status
        (420 on April 20th, otherwise 429) and the error page have already
        been written. No ratelimit is applied if ratelimits are disabled,
        for OPTIONS requests, for authorized requests and for crawlers.

        An HTTPError with status 503 is raised if Redis is not available.
        """
        if (
            not self.settings.get("RATELIMITS")
            or self.request.method == "OPTIONS"
            or self.is_authorized(Permission.RATELIMITS)
            or self.crawler
        ):
            return False

        if not EVENT_REDIS.is_set():
            LOGGER.warning(
                (
                    "Ratelimits are enabled, but Redis is not available. "
                    "This can happen shortly after starting the website."
                ),
            )
            raise HTTPError(503)

        if global_ratelimit:  # TODO: add to _RequestHandler
            ratelimited, headers = await ratelimit(
                self.redis,
                self.redis_prefix,
                str(self.request.remote_ip),
                bucket=None,
                max_burst=99,  # limit = 100
                count_per_period=20,  # 20 requests per second
                period=1,
                tokens=10 if self.settings.get("UNDER_ATTACK") else 1,
            )
        else:
            # HEAD requests share the configuration of GET requests
            method = (
                "GET" if self.request.method == "HEAD" else self.request.method
            )
            if not (limit := getattr(self, f"RATELIMIT_{method}_LIMIT", 0)):
                # no limit configured for this method
                return False
            ratelimited, headers = await ratelimit(
                self.redis,
                self.redis_prefix,
                str(self.request.remote_ip),
                bucket=getattr(
                    self,
                    f"RATELIMIT_{method}_BUCKET",
                    self.__class__.__name__.lower(),
                ),
                max_burst=limit - 1,
                count_per_period=getattr(  # request count per period
                    self,
                    f"RATELIMIT_{method}_COUNT_PER_PERIOD",
                    30,
                ),
                period=getattr(
                    self, f"RATELIMIT_{method}_PERIOD", 60  # period in seconds
                ),
                # HEAD requests do not use up tokens
                tokens=1 if self.request.method != "HEAD" else 0,
            )

        for header, value in headers.items():
            self.set_header(header, value)

        if ratelimited:
            if self.now.date() == date(self.now.year, 4, 20):
                self.set_status(420)
                self.write_error(420)
            else:
                self.set_status(429)
                self.write_error(429)

        return ratelimited

    def redirect_to_canonical_domain(self) -> bool:
        """
        Redirect to the canonical domain.

        Return whether a (permanent) redirect was sent. Nothing is done if
        no domain is configured, there is no Host header, the request
        already uses the domain, the host ends with .onion or .i2p, or the
        path only consists of braille characters.
        """
        if (
            not (domain := self.settings.get("DOMAIN"))
            or not self.request.headers.get("Host")
            or self.request.host_name == domain
            or self.request.host_name.endswith((".onion", ".i2p"))
            or regex.fullmatch(r"/[\u2800-\u28FF]+/?", self.request.path)
        ):
            return False
        # keep the port of the original request
        port = urlsplit(f"//{self.request.headers['Host']}").port
        self.redirect(
            urlsplit(self.request.full_url())
            ._replace(netloc=f"{domain}:{port}" if port else domain)
            .geturl(),
            permanent=True,
        )
        return True

    @property
    def redis(self) -> Redis[str]:
        """
        Get the Redis client from the settings.

        This is None if Redis is not enabled.
        """
        return cast("Redis[str]", self.settings.get("REDIS"))

    @property
    def redis_prefix(self) -> str:
        """Get the Redis prefix from the settings."""
        return self.settings.get(  # type: ignore[no-any-return]
            "REDIS_PREFIX", NAME
        )

368 

369 

370class BaseRequestHandler(_RequestHandler): 

371 """The base request handler used by every page and API.""" 

372 

373 # pylint: disable=too-many-instance-attributes, too-many-public-methods 

374 

375 ELASTIC_RUM_URL: ClassVar[str] = ( 

376 f"/@apm-rum/elastic-apm-rum.umd{'' if sys.flags.dev_mode else '.min'}.js" 

377 "?v=5.12.0" 

378 ) 

379 

380 COMPUTE_ETAG: ClassVar[bool] = True 

381 ALLOW_COMPRESSION: ClassVar[bool] = True 

382 MAX_BODY_SIZE: ClassVar[None | int] = None 

383 ALLOWED_METHODS: ClassVar[tuple[str, ...]] = ("GET",) 

384 POSSIBLE_CONTENT_TYPES: ClassVar[tuple[str, ...]] = () 

385 

386 module_info: ModuleInfo 

387 # info about page, can be overridden in module_info 

388 title: str = "Das Asoziale Netzwerk" 

389 short_title: str = "Asoziales Netzwerk" 

390 description: str = "Die tolle Webseite des Asozialen Netzwerks" 

391 

392 used_render: bool = False 

393 

394 active_origin_trials: set[str] 

395 content_type: None | str = None 

396 apm_script: None | str 

397 nonce: str 

398 

def _finish(
    self, chunk: None | str | bytes | dict[str, Any] = None
) -> Future[None]:
    """
    Write the last chunk and finish the response.

    If the content type is textual and something was written, the
    output is made to end with a newline.

    A RuntimeError is raised if the request is already finished.
    """
    if self._finished:
        raise RuntimeError("finish() called twice")

    if chunk is not None:
        self.write(chunk)

    content_type = self.content_type
    if not content_type:
        return super().finish()

    is_textual = (
        content_type.startswith("text/")
        or content_type.endswith(("+xml", "+json"))
        or content_type in TEXT_CONTENT_TYPES
    )
    buffer = self._write_buffer
    if is_textual and buffer and not buffer[-1].endswith(b"\n"):
        self.write(b"\n")

    return super().finish()

421 

422 @override 

423 def compute_etag(self) -> None | str: 

424 """Compute ETag with Base85 encoding.""" 

425 if not self.COMPUTE_ETAG: 

426 return None 

427 return f'"{hash_bytes(*self._write_buffer)}"' # noqa: B907 

428 

429 @override 

430 def decode_argument( # noqa: D102 

431 self, value: bytes, name: str | None = None 

432 ) -> str: 

433 try: 

434 return value.decode("UTF-8", "replace") 

435 except UnicodeDecodeError as exc: 

436 err_msg = f"Invalid unicode in {name or 'url'}: {value[:40]!r}" 

437 LOGGER.exception(err_msg, exc_info=exc) 

438 raise HTTPError(400, err_msg) from exc 

439 

440 @property 

441 def dump(self) -> Callable[[Any], str | bytes]: 

442 """Get the function for dumping the output.""" 

443 yaml_subset = self.content_type in { 

444 "application/json", 

445 "application/vnd.asozial.dynload+json", 

446 } 

447 

448 if self.content_type == "application/yaml": 

449 if self.now.timetuple()[2:0:-1] == (1, 4): 

450 yaml_subset = True 

451 else: 

452 return lambda spam: yaml.dump( 

453 spam, 

454 width=self.get_int_argument("yaml_width", 80, min_=80), 

455 ) 

456 

457 if yaml_subset: 

458 option = ORJSON_OPTIONS 

459 if self.get_bool_argument("pretty", False): 

460 option |= json.OPT_INDENT_2 

461 return lambda spam: json.dumps(spam, option=option) 

462 

463 return lambda spam: spam 

464 

@override
def finish(  # noqa: D102
    self, chunk: None | str | bytes | dict[Any, Any] = None
) -> Future[None]:
    # The docstring is copied from the parent class below.
    #
    # Rendered HTML gets converted if the content type asks for it:
    # - text/markdown: the title and the HTML converted with html2text
    # - text/plain: only the text of the HTML
    # - application/vnd.asozial.dynload+json: a dictionary with the
    #   parts of the page (body, scripts, stylesheets, ...)
    # Everything else is finished unchanged.
    as_json = self.content_type == "application/vnd.asozial.dynload+json"
    as_plain_text = self.content_type == "text/plain"
    as_markdown = self.content_type == "text/markdown"

    if (
        not isinstance(chunk, bytes | str)
        or self.content_type == "text/html"
        or not self.used_render
        or not (as_json or as_plain_text or as_markdown)
    ):
        return self._finish(chunk)

    chunk = chunk.decode("UTF-8") if isinstance(chunk, bytes) else chunk

    if as_markdown:
        return self._finish(
            f"# {self.title}\n\n"
            + html2text.html2text(chunk, self.request.full_url()).strip()
        )

    soup = BeautifulSoup(chunk, features="lxml")

    if as_plain_text:
        return self._finish(soup.get_text("\n", True))

    dictionary: dict[str, object] = {
        "url": self.fix_url(include_protocol_and_host=True),
        "title": self.title,
        "short_title": (
            self.short_title if self.title != self.short_title else None
        ),
        "body": "".join(
            str(element)
            for element in soup.find_all(name="main")[0].contents
        ).strip(),
        "scripts": [
            {"script": script.string} | script.attrs
            for script in soup.find_all("script")
        ],
        "stylesheets": [
            stylesheet.get("href").strip()
            for stylesheet in soup.find_all("link", rel="stylesheet")
        ],
        # .string is None for style elements without a single text child
        # (e.g. an empty <style>), which str.join cannot handle
        "css": "\n".join(
            style.string
            for style in soup.find_all("style")
            if style.string is not None
        ),
    }

    return self._finish(dictionary)

finish.__doc__ = _RequestHandler.finish.__doc__

518 

def finish_dict(self, **kwargs: Any) -> Future[None]:
    """Finish the request with the keyword arguments as dictionary."""
    dictionary: dict[str, Any] = kwargs
    return self.finish(dictionary)

522 

def fix_url(
    self,
    url: None | str | SplitResult = None,
    new_path: None | str = None,
    include_protocol_and_host: bool | str = False,
    query_args: Mapping[str, None | str | bool | float] | None = None,
) -> str:
    """
    Fix a URL and return it.

    URLs of other hosts are returned unchanged or, if the user wants to
    be asked before leaving and the redirect module is loaded, replaced
    with a link to the redirect page. For URLs of this website the path
    gets normalized and the user settings get added as query arguments.

    The URL of the current request is used if no URL is given.
    The protocol and the host are only part of the result
    if `include_protocol_and_host` is truthy.
    """
    args = dict(query_args or {})
    del query_args

    target = self.request.full_url() if url is None else url
    if isinstance(target, str):
        target = urlsplit(target)

    if target.netloc and target.netloc.lower() != self.request.host.lower():
        # the URL belongs to another website
        use_redirect_page = (
            self.user_settings.ask_before_leaving
            and self.settings.get("REDIRECT_MODULE_LOADED")
        )
        if not use_redirect_page:
            return target.geturl()
        path = "/redirect"
        args["to"] = target.geturl()
        target = urlsplit(self.request.full_url())
    elif new_path is None:
        path = target.path
    else:
        path = new_path

    path = f"/{path.strip('/')}".lower()
    if path == "/lolwut":
        path = path.upper()

    if path.startswith("/soundboard/files/") or path in FILE_HASHES_DICT:
        # static files get every option set to None
        args.update(dict.fromkeys(self.user_settings.iter_option_names()))
    else:
        settings = self.user_settings.as_dict_with_str_values()
        for key, value in settings.items():
            args.setdefault(key, value)
        stored_settings = self.user_settings.as_dict_with_str_values(
            include_query_argument=False,
            include_body_argument=self.request.path == "/einstellungen"
            and self.get_bool_argument("save_in_cookie", False),
        )
        for key, value in stored_settings.items():
            # values equal to the ones from the second lookup are set to None
            if value == args[key]:
                args[key] = None

    origin = f"{self.request.protocol}://{self.request.host}"
    result = add_args_to_url(
        urlunsplit(
            (
                self.request.protocol,
                self.request.host,
                path,
                target.query,
                target.fragment,
            )
        ),
        **args,
    )

    if include_protocol_and_host:
        return result
    return result.removeprefix(origin)

594 

@classmethod
def get_allowed_methods(cls) -> list[str]:
    """
    Get the sorted list of allowed methods.

    OPTIONS is always allowed, HEAD is allowed
    if GET is allowed and HEAD is supported.
    """
    allowed = set(cls.ALLOWED_METHODS)
    allowed.add("OPTIONS")
    if "GET" in cls.ALLOWED_METHODS and cls.supports_head():
        allowed.add("HEAD")
    return sorted(allowed)

602 

def get_bool_argument(
    self,
    name: str,
    default: None | bool = None,
) -> bool:
    """
    Get an argument parsed as boolean.

    Without a default the argument is required and an
    HTTPError with status 400 is raised if it is not a boolean.
    """
    if default is None:
        raw = str(self.get_argument(name))
        try:
            return str_to_bool(raw)
        except ValueError as err:
            raise HTTPError(400, f"{raw} is not a boolean") from err
    return str_to_bool(self.get_argument(name, ""), default)

616 

def get_display_scheme(self) -> ColourScheme:
    """
    Get the scheme currently displayed.

    For the "random" scheme the lowest bit of the
    microsecond of the request time picks light or dark.
    """
    chosen = self.user_settings.scheme
    if chosen != "random":
        return chosen
    return "dark" if self.now.microsecond & 1 else "light"

623 

def get_display_theme(self) -> str:
    """
    Get the theme currently displayed.

    In December the default theme is replaced with the christmas theme.
    For the "random" theme one of the random themes is chosen.
    """
    selected = self.user_settings.theme

    if selected == "random":
        return random_choice(RANDOM_THEMES)  # nosec: B311

    if selected == "default" and self.now.month == 12:
        return "christmas"

    return selected

635 

def get_error_message(self, **kwargs: Any) -> str:
    """
    Get the error message and return it.

    For exceptions that are not an HTTPError the formatted exception is
    returned; the whole traceback is only included if the serve_traceback
    setting is true or the request is authorized to see tracebacks.
    For a MissingArgumentError the log message is returned,
    in every other case the reason of the response.
    """
    if "exc_info" not in kwargs:
        return str(self._reason)

    exc_info = kwargs["exc_info"]
    exc_type = exc_info[0]

    if not issubclass(exc_type, HTTPError):
        if self.settings.get("serve_traceback") or self.is_authorized(
            Permission.TRACEBACK
        ):
            lines = traceback.format_exception(*exc_info)
        else:
            lines = traceback.format_exception_only(*exc_info[:2])
        return "".join(lines).strip()

    if issubclass(exc_type, MissingArgumentError):
        return cast(str, exc_info[1].log_message)

    return str(self._reason)

660 

def get_error_page_description(self, status_code: int) -> str:
    """
    Get the description for the error page.

    404 and 451 have their own description, every other status code
    gets the description of its class (1xx to 5xx).
    A ValueError is raised for status codes outside of 100 to 599.
    """
    # https://developer.mozilla.org/docs/Web/HTTP/Status
    if status_code == 404:
        return f"{self.request.path} wurde nicht gefunden."
    if status_code == 451:
        return "Hier wäre bestimmt geiler Scheiß."

    descriptions = (
        (100, 199, "Hier gibt es eine total wichtige Information."),
        (200, 299, "Hier ist alles super! 🎶🎶"),
        (300, 399, "Eine Umleitung ist eingerichtet."),
        (400, 499, "Ein Client-Fehler ist aufgetreten."),
        (500, 599, "Ein Server-Fehler ist aufgetreten."),
    )
    for lowest, highest, description in descriptions:
        if lowest <= status_code <= highest:
            return description

    raise ValueError(
        f"{status_code} is not a valid HTTP response status code."
    )

682 

683 def get_int_argument( 

684 self, 

685 name: str, 

686 default: None | int = None, 

687 *, 

688 max_: None | int = None, 

689 min_: None | int = None, 

690 ) -> int: 

691 """Get an argument parsed as integer.""" 

692 if default is None: 

693 str_value = self.get_argument(name) 

694 try: 

695 value = int(str_value, base=0) 

696 except ValueError as err: 

697 raise HTTPError(400, f"{str_value} is not an integer") from err 

698 elif self.get_argument(name, ""): 

699 try: 

700 value = int(self.get_argument(name), base=0) 

701 except ValueError: 

702 value = default 

703 else: 

704 value = default 

705 

706 if max_ is not None: 

707 value = min(max_, value) 

708 if min_ is not None: 

709 value = max(min_, value) 

710 

711 return value 

712 

def get_module_infos(self) -> tuple[ModuleInfo, ...]:
    """Get the module infos from the settings (empty if there are none)."""
    module_infos = self.settings.get("MODULE_INFOS")
    return module_infos if module_infos else ()

716 

717 def get_reporting_api_endpoint(self) -> None | str: 

718 """Get the endpoint for the Reporting API™️.""" 

719 if not self.settings.get("REPORTING"): 

720 return None 

721 endpoint = self.settings.get("REPORTING_ENDPOINT") 

722 

723 if not endpoint or not endpoint.startswith("/"): 

724 return endpoint 

725 

726 return f"{self.request.protocol}://{self.request.host}{endpoint}" 

727 

@override
def get_template_namespace(self) -> dict[str, Any]:
    """
    Add useful things to the template namespace and return it.

    The namespace of the parent class is extended with the user settings
    and with values and helpers that are used by the templates
    (like title, description, fix_url and nonce).
    """
    namespace = super().get_template_namespace()
    ansi2html = partial(
        Ansi2HTMLConverter(inline=True, scheme="xterm").convert, full=False
    )
    namespace.update(self.user_settings.as_dict())
    namespace.update(
        ansi2html=partial(
            reduce, apply, (ansi2html, ansi_replace, backspace_replace)
        ),
        apm_script=(
            self.settings["ELASTIC_APM"].get("INLINE_SCRIPT")
            if self.apm_enabled
            else None
        ),
        as_html=self.content_type == "text/html",
        # true on April 1st or if the cookie "c" is set to a true value
        c=self.now.date() == date(self.now.year, 4, 1)
        or str_to_bool(self.get_cookie("c", "f") or "f", False),
        # DOMAIN is optional (like in redirect_to_canonical_domain),
        # the host of the request is used if it is missing or empty
        canonical_url=self.request.protocol
        + "://"
        + (self.settings.get("DOMAIN") or self.request.host)
        + self.fix_url(
            self.request.full_url().upper()
            if self.request.path.upper().startswith("/LOLWUT")
            else self.request.full_url().lower()
        )
        .split("?")[0]
        .removesuffix("/"),
        description=self.description,
        display_theme=self.get_display_theme(),
        display_scheme=self.get_display_scheme(),
        elastic_rum_url=self.ELASTIC_RUM_URL,
        fix_static=lambda path: self.fix_url(fix_static_path(path)),
        fix_url=self.fix_url,
        emoji2html=(
            emoji2html
            if self.user_settings.openmoji == "img"
            else (
                (lambda emoji: f'<span class="openmoji">{emoji}</span>')
                if self.user_settings.openmoji
                else (lambda emoji: f"<span>{emoji}</span>")
            )
        ),
        form_appendix=self.user_settings.get_form_appendix(),
        GH_ORG_URL=GH_ORG_URL,
        GH_PAGES_URL=GH_PAGES_URL,
        GH_REPO_URL=GH_REPO_URL,
        keywords="Asoziales Netzwerk, Känguru-Chroniken"
        + (
            f", {self.module_info.get_keywords_as_str(self.request.path)}"
            if self.module_info  # type: ignore[truthy-bool]
            else ""
        ),
        lang="de",  # TODO: add language support
        nonce=self.nonce,
        now=self.now,
        openmoji_version=OPENMOJI_VERSION,
        settings=self.settings,
        short_title=self.short_title,
        testing=pytest_is_running(),
        title=self.title,
    )
    namespace.update(
        {
            # true on Easter Sunday and Easter Monday
            "🥚": timedelta()
            <= self.now.date() - easter(self.now.year)
            < timedelta(days=2),
            "🦘": is_prime(self.now.microsecond),
        }
    )
    return namespace

806 

def get_user_id(self) -> str:
    """
    Get the user id saved in the cookie or create one.

    A new UUID is used if there is no valid cookie that is at most
    90 days old. The cookie is (re)set for 90 days if there is no
    valid cookie that is at most 30 days old.
    """
    stored = self.get_secure_cookie(
        "user_id",
        max_age_days=90,
        min_version=2,
    )
    user_id = str(uuid.uuid4()) if not stored else stored.decode("UTF-8")

    # save it in cookie or reset expiry date
    recent = self.get_secure_cookie(
        "user_id", max_age_days=30, min_version=2
    )
    if not recent:
        self.set_secure_cookie(
            "user_id",
            user_id,
            expires_days=90,
            path="/",
            samesite="Strict",
        )

    return user_id

829 

def handle_accept_header(  # pylint: disable=inconsistent-return-statements
    self, possible_content_types: tuple[str, ...], strict: bool = True
) -> None:
    """
    Handle the Accept header and set `self.content_type`.

    Nothing is done if there are no possible content types.
    If none of them is accepted, the request is answered with
    `handle_not_acceptable` in strict mode, otherwise the first
    possible content type is used.
    """
    if not possible_content_types:
        return

    accept = self.request.headers.get("Accept") or "*/*"
    best_match = get_best_match(accept, possible_content_types)

    if best_match is None:
        if strict:
            return self.handle_not_acceptable(possible_content_types)
        best_match = possible_content_types[0]

    self.content_type = best_match
    self.set_content_type_header()

846 

def handle_not_acceptable(
    self, possible_content_types: tuple[str, ...]
) -> None:
    """
    Only call this if we cannot respect the Accept header.

    The response gets the status 406 and the request is finished
    with the possible content types, one per line.
    """
    body = "\n".join(possible_content_types) + "\n"
    self.clear_header("Content-Type")
    self.set_status(406)
    raise Finish(body)

854 

855 def head(self, *args: Any, **kwargs: Any) -> None | Awaitable[None]: 

856 """Handle HEAD requests.""" 

857 if self.get.__module__ == "tornado.web": 

858 raise HTTPError(405) 

859 if not self.supports_head(): 

860 raise HTTPError(501) 

861 

862 kwargs["head"] = True 

863 return self.get(*args, **kwargs) 

864 

@override
def initialize(
    self,
    *,
    module_info: ModuleInfo,
    # default is true, because then empty args dicts are
    # enough to specify that the defaults should be used
    default_title: bool = True,
    default_description: bool = True,
) -> None:
    """
    Save the module info and set title and description.

    The title and the short title are taken from the page info of the
    request path if `default_title` is false, the description is taken
    from it if `default_description` is false.
    """
    self.module_info = module_info
    path = self.request.path

    if not default_title:
        page_info = self.module_info.get_page_info(path)
        self.title = page_info.name
        # the title is the fallback for the short title
        self.short_title = page_info.short_name or self.title

    if not default_description:
        page_info = self.module_info.get_page_info(path)
        self.description = page_info.description

890 

891 @override 

892 async def options(self, *args: Any, **kwargs: Any) -> None: 

893 """Handle OPTIONS requests.""" 

894 # pylint: disable=unused-argument 

895 self.set_header("Allow", ", ".join(self.get_allowed_methods())) 

896 self.set_status(204) 

897 await self.finish() 

898 

899 def origin_trial(self, token: bytes | str) -> bool: 

900 """Enable an experimental feature.""" 

901 # pylint: disable=protected-access 

902 payload = json.loads(b64decode(token)[69:]) 

903 if payload["feature"] in self.active_origin_trials: 

904 return True 

905 origin = urlsplit(payload["origin"]) 

906 url = urlsplit(self.request.full_url()) 

907 if url.port is None and url.scheme in {"http", "https"}: 

908 url = url._replace( 

909 netloc=f"{url.hostname}:{443 if url.scheme == 'https' else 80}" 

910 ) 

911 if self.request._start_time > payload["expiry"]: 

912 return False 

913 if url.scheme != origin.scheme: 

914 return False 

915 if url.netloc != origin.netloc and not ( 

916 payload.get("isSubdomain") 

917 and url.netloc.endswith(f".{origin.netloc}") 

918 ): 

919 return False 

920 self.add_header("Origin-Trial", token) 

921 self.active_origin_trials.add(payload["feature"]) 

922 return True 

923 

@override
async def prepare(self) -> None:
    """
    Prepare the request after the checks of the parent class.

    If the request is not finished yet, compression gets disabled if it
    is not allowed, the Accept header gets handled, the cookie "c" is
    sometimes set for GET requests and an HTTPError with status 413 is
    raised for bodies that are bigger than `MAX_BODY_SIZE`.
    """
    await super().prepare()

    if self._finished:
        return

    if not self.ALLOW_COMPRESSION:
        gzip_transforms = (
            transform
            for transform in self._transforms
            if isinstance(transform, GZipContentEncoding)
        )
        for transform in gzip_transforms:
            # pylint: disable-next=protected-access
            transform._gzipping = False

    self.handle_accept_header(self.POSSIBLE_CONTENT_TYPES)

    method = self.request.method

    if method == "GET":
        # the random number only depends on the time of the request
        days = Random(self.now.timestamp()).randint(0, 31337)
        if days in {69, 420, 1337, 31337}:
            self.set_cookie("c", "s", expires_days=days / 24, path="/")

    max_body_size = self.MAX_BODY_SIZE
    if method != "OPTIONS" and max_body_size is not None:
        body_size = len(self.request.body)
        if body_size > max_body_size:
            LOGGER.warning(
                "%s > MAX_BODY_SIZE (%s)",
                body_size,
                max_body_size,
            )
            raise HTTPError(413)

961 

@override
def render(  # noqa: D102
    self, template_name: str, **kwargs: Any
) -> Future[None]:
    # The docstring is copied from the parent class below.
    # Remember that a template was rendered, finish() checks this flag.
    self.used_render = True
    future = super().render(template_name, **kwargs)
    return future

render.__doc__ = _RequestHandler.render.__doc__

970 

def set_content_type_header(self) -> None:
    """
    Set the Content-Type header based on `self.content_type`.

    The charset UTF-8 is added for text types,
    no header is set if the content type is None.
    """
    content_type = self.content_type
    if content_type is None:
        return

    if str(content_type).startswith("text/"):  # RFC 2616 (3.7.1)
        header_value = f"{content_type};charset=utf-8"
    else:
        header_value = content_type

    self.set_header("Content-Type", header_value)

979 

@override
def set_cookie(  # noqa: D102  # pylint: disable=too-many-arguments
    self,
    name: str,
    value: str | bytes,
    domain: None | str = None,
    expires: None | float | tuple[int, ...] | datetime = None,
    path: str = "/",
    expires_days: None | float = 400,  # changed
    *,
    secure: bool | None = None,
    httponly: bool = True,
    **kwargs: Any,
) -> None:
    # Cookies are SameSite=Strict unless the caller passes its own value.
    kwargs.setdefault("samesite", "Strict")

    # Without an explicit choice, mark the cookie as secure exactly
    # when the current request uses the "https" protocol.
    if secure is None:
        secure = self.request.protocol == "https"

    super().set_cookie(
        name,
        value,
        domain,
        expires,
        path,
        expires_days,
        secure=secure,
        httponly=httponly,
        **kwargs,
    )

1011 

# set_cookie() defines no docstring of its own (see its noqa: D102), so
# it takes over the docstring of `_RequestHandler.set_cookie`.
set_cookie.__doc__ = _RequestHandler.set_cookie.__doc__

1013 

def set_csp_header(self) -> None:
    """Create a fresh nonce and set the Content-Security-Policy header.

    The nonce is stored in `self.nonce`. When APM is enabled, the
    configured inline script hash and the RUM/APM server URL are added
    to the script and connect sources. A ``report-uri`` directive is
    appended when the REPORTING setting is truthy.
    """
    self.nonce = secrets.token_urlsafe(16)

    # The ELASTIC_APM settings are only looked at when APM is enabled.
    apm_settings = self.settings["ELASTIC_APM"] if self.apm_enabled else None

    script_sources = ["'self'", f"'nonce-{self.nonce}'"]
    if apm_settings is not None and "INLINE_SCRIPT_HASH" in apm_settings:
        script_sources += [
            f"'sha256-{apm_settings['INLINE_SCRIPT_HASH']}'",
            "'unsafe-inline'",  # for browsers that don't support hash
        ]

    connect_sources = ["'self'"]
    if apm_settings is not None and "SERVER_URL" in apm_settings:
        rum_server_url = apm_settings.get("RUM_SERVER_URL")
        if rum_server_url is None:
            # the RUM agent needs to connect to ["ELASTIC_APM"]["SERVER_URL"]
            connect_sources.append(apm_settings["SERVER_URL"])
        elif rum_server_url:
            # the RUM agent needs to connect to rum_server_url
            connect_sources.append(rum_server_url)
        # a falsy value other than None adds no extra source

    # fix for older browsers
    websocket_scheme = "wss" if self.request.protocol == "https" else "ws"
    connect_sources.append(f"{websocket_scheme}://{self.request.host}")

    directives = [
        "default-src 'self'",
        f"script-src {' '.join(script_sources)}",
        f"connect-src {' '.join(connect_sources)}",
        "style-src 'self' 'unsafe-inline'",
        "img-src 'self' https://img.zeit.de https://github.asozial.org",
        "frame-ancestors 'self'",
        "sandbox allow-downloads allow-same-origin allow-modals"
        " allow-popups-to-escape-sandbox allow-scripts allow-popups"
        " allow-top-navigation-by-user-activation allow-forms",
        "report-to default",
        "base-uri 'none'",
    ]
    if self.settings.get("REPORTING"):
        directives.append(f"report-uri {self.get_reporting_api_endpoint()}")

    # every directive, including the last one, is terminated by ";"
    self.set_header(
        "Content-Security-Policy",
        "".join(f"{directive};" for directive in directives),
    )

1066 

@override
def set_default_headers(self) -> None:
    """Set the headers that every response starts out with.

    Sets the CSP header via `set_csp_header`, resets
    `self.active_origin_trials` to an empty set and then sets
    reporting, CORS, cross-origin policy, client-hint and other
    headers. Some of them depend on the REPORTING, HSTS, ONION_ADDRESS
    and debug settings.
    """
    self.set_csp_header()
    self.active_origin_trials = set()
    if self.settings.get("REPORTING"):
        # Reporting-Endpoints and Report-To both point the group
        # "default" at the same endpoint.
        endpoint = self.get_reporting_api_endpoint()
        self.set_header(
            "Reporting-Endpoints",
            f'default="{endpoint}"',  # noqa: B907
        )
        self.set_header(
            "Report-To",
            json.dumps(
                {
                    "group": "default",
                    "max_age": 2592000,
                    "endpoints": [{"url": endpoint}],
                },
                option=ORJSON_OPTIONS,
            ),
        )
        self.set_header("NEL", '{"report_to":"default","max_age":2592000}')
    self.set_header("X-Content-Type-Options", "nosniff")
    # CORS: any origin and any header are allowed; the allowed methods
    # come from get_allowed_methods().
    self.set_header("Access-Control-Max-Age", "7200")
    self.set_header("Access-Control-Allow-Origin", "*")
    self.set_header("Access-Control-Allow-Headers", "*")
    self.set_header(
        "Access-Control-Allow-Methods",
        ", ".join(self.get_allowed_methods()),
    )
    self.set_header("Cross-Origin-Resource-Policy", "cross-origin")
    # every listed feature gets an empty allowlist
    self.set_header(
        "Permissions-Policy",
        "browsing-topics=(),"
        "identity-credentials-get=(),"
        "join-ad-interest-group=(),"
        "private-state-token-issuance=(),"
        "private-state-token-redemption=(),"
        "run-ad-auction=()",
    )
    self.set_header("Referrer-Policy", "same-origin")
    self.set_header(
        "Cross-Origin-Opener-Policy", "same-origin;report-to=default"
    )
    # One hard-coded path gets "credentialless" instead of
    # "require-corp" as its embedder policy.
    if self.request.path == "/kaenguru-comics-alt":  # TODO: improve this
        self.set_header(
            "Cross-Origin-Embedder-Policy",
            "credentialless;report-to=default",
        )
    else:
        self.set_header(
            "Cross-Origin-Embedder-Policy",
            "require-corp;report-to=default",
        )
    if self.settings.get("HSTS"):
        self.set_header("Strict-Transport-Security", "max-age=63072000")
    # Advertise the onion address (with the current path and query
    # appended) unless the request host name already ends in ".onion".
    if (
        onion_address := self.settings.get("ONION_ADDRESS")
    ) and not self.request.host_name.endswith(".onion"):
        self.set_header(
            "Onion-Location",
            onion_address
            + self.request.path
            + (f"?{self.request.query}" if self.request.query else ""),
        )
    if self.settings.get("debug"):
        self.set_header("X-Debug", bool_to_str(True))
        # one X-Permission-<name> header per named permission, telling
        # whether the current request is authorized for it
        for permission in Permission:
            if permission.name:
                self.set_header(
                    f"X-Permission-{permission.name}",
                    bool_to_str(bool(self.is_authorized(permission))),
                )
    # pick an entry of CLACKS_OVERHEADS based on the current microsecond
    self.set_header(
        "X-Clacks-Overhead",
        CLACKS_OVERHEADS[
            int(self.now_utc.microsecond) % len(CLACKS_OVERHEADS)
        ],
    )
    self.set_header("Accept-CH", "Sec-CH-Prefers-Reduced-Motion")
    self.set_header("Critical-CH", "Sec-CH-Prefers-Reduced-Motion")
    self.set_header(
        "Vary", "Accept,Authorization,Cookie,Sec-CH-Prefers-Reduced-Motion"
    )

1152 

# At runtime this replaces the method's own docstring with the one of
# `_RequestHandler.set_default_headers`.
set_default_headers.__doc__ = _RequestHandler.set_default_headers.__doc__

1154 

def stanley(self) -> bool:
    """Tell whether Stanley mode is active for this request.

    A user setting of `False` always turns it off. Otherwise it is
    active on April 27 (according to `self.now`) or when the user
    setting is `True`.
    """
    preference = self.user_settings.stanley
    if preference is False:
        return False
    today = self.now.date()
    return (today.month, today.day) == (4, 27) or preference is True

1161 

def sub_stanley(self, text: str) -> str:
    """Replace a selection of the words in `text` with "Stanley".

    Candidates are whole words consisting of one uppercase letter
    followed by at least four lowercase letters. A candidate is
    replaced when a `Random` seeded with the word itself draws, from
    ``randrange(5)``, the value of the current year modulo 5.
    """

    def pick_replacement(match: Any) -> str:
        word = match[0]
        if Random(word).randrange(5) == self.now.year % 5:
            return "Stanley"
        return word

    return regex.sub(r"\b\p{Lu}\p{Ll}{4}\p{Ll}*\b", pick_replacement, text)

1173 

@classmethod
def supports_head(cls) -> bool:
    """Check whether this request handler supports HEAD requests.

    This is the case when the handler's `get` method has a
    keyword-only parameter named ``head``.
    """
    head_parameter = inspect.signature(cls.get).parameters.get("head")
    if head_parameter is None:
        return False
    return head_parameter.kind == inspect.Parameter.KEYWORD_ONLY

1183 

@cached_property
def user_settings(self) -> Options:
    """Return the `Options` object of this handler (created on first access)."""
    options = Options(self)
    return options

1188 

@override
def write(self, chunk: str | bytes | dict[str, Any]) -> None:  # noqa: D102
    if self._finished:
        raise RuntimeError("Cannot write() after finish()")

    self.set_content_type_header()

    # dicts are serialized with self.dump() before anything else happens
    payload = self.dump(chunk) if isinstance(chunk, dict) else chunk

    if self.stanley():
        if isinstance(payload, bytes):
            # bytes that are not valid UTF-8 stay bytes and are
            # written out unmodified
            try:
                payload = payload.decode("UTF-8")
            except UnicodeDecodeError:
                pass
        if isinstance(payload, str):
            payload = self.sub_stanley(payload)

    super().write(payload)

1207 

# write() defines no docstring of its own (see its noqa: D102), so it
# takes over the docstring of `_RequestHandler.write`.
write.__doc__ = _RequestHandler.write.__doc__

1209 

@override
def write_error(self, status_code: int, **kwargs: Any) -> None:
    """Send the error response in a suitable content type.

    If the current content type is not one of the supported error
    content types, a new one is negotiated non-strictly. HTML gets the
    rendered ``error.html`` template, JSON and YAML get a dict with
    status and reason, and everything else gets one line of text.
    """
    dict_content_types: tuple[str, str] = (
        "application/json",
        "application/yaml",
    )
    all_error_content_types: tuple[str, ...] = (
        # text/plain as first (default), to not screw up output in terminals
        "text/plain",
        "text/html",
        "text/markdown",
        *dict_content_types,
        "application/vnd.asozial.dynload+json",
    )

    if self.content_type not in all_error_content_types:
        # don't send 406, instead default with text/plain
        self.handle_accept_header(all_error_content_types, strict=False)

    content_type = self.content_type
    reason = self.get_error_message(**kwargs)

    if content_type == "text/html":
        description = self.get_error_page_description(status_code)
        # Tracebacks are only considered for exceptions that are not
        # an HTTPError, and only if serving them is enabled or the
        # request is authorized for them.
        show_traceback = (
            "exc_info" in kwargs
            and not issubclass(kwargs["exc_info"][0], HTTPError)
            and (
                self.settings.get("serve_traceback")
                or self.is_authorized(Permission.TRACEBACK)
            )
        )
        self.render(  # type: ignore[unused-awaitable]
            "error.html",
            status=status_code,
            reason=reason,
            description=description,
            is_traceback=show_traceback,
        )
        return

    body: dict[str, Any] | str
    if content_type in dict_content_types:
        body = {"status": status_code, "reason": reason}
    else:
        body = f"{status_code} {reason}\n"
    self.finish(body)  # type: ignore[unused-awaitable]

1257 

# At runtime this replaces the method's own docstring with the one of
# `_RequestHandler.write_error`.
write_error.__doc__ = _RequestHandler.write_error.__doc__