Coverage for an_website/utils/utils.py: 68.937%
367 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-10 18:56 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-10 18:56 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""A module with many useful things used by other modules."""
16import argparse
17import asyncio
18import bisect
19import contextlib
20import io
21import logging
22import random
23import sys
24import time
25from base64 import b85encode
26from collections.abc import (
27 Awaitable,
28 Callable,
29 Collection,
30 Generator,
31 Iterable,
32 Mapping,
33 Set,
34)
35from dataclasses import dataclass, field
36from datetime import datetime, timezone
37from enum import IntFlag
38from functools import cache, partial
39from hashlib import sha1
40from importlib.resources.abc import Traversable
41from ipaddress import IPv4Address, IPv6Address, ip_address, ip_network
42from pathlib import Path
43from typing import (
44 IO,
45 TYPE_CHECKING,
46 Any,
47 Final,
48 Literal,
49 TypeAlias,
50 cast,
51 get_args,
52)
53from urllib.parse import SplitResult, parse_qsl, urlencode, urlsplit, urlunsplit
55import elasticapm
56import regex
57from blake3 import blake3
58from elastic_transport import ApiError, TransportError
59from elasticsearch import AsyncElasticsearch
60from geoip import geolite2 # type: ignore[import-untyped]
61from openmoji_dist import VERSION as OPENMOJI_VERSION
62from rapidfuzz.distance.Levenshtein import distance
63from redis.asyncio import Redis
64from tornado.web import HTTPError, RequestHandler
65from typed_stream import Stream
66from UltraDict import UltraDict # type: ignore[import-untyped]
68from .. import DIR as ROOT_DIR, pytest_is_running
70if TYPE_CHECKING:
71 from .background_tasks import BackgroundTask
# Module-level logger, named after this module via __name__.
LOGGER: Final = logging.getLogger(__name__)
# A handler specification is a 2-, 3- or 4-tuple: a string, a RequestHandler
# subclass, optionally a dict of keyword arguments and optionally another
# string.
# NOTE(review): presumably (URL pattern, handler, init kwargs, name) as used
# by Tornado URL specs — confirm against the code consuming these tuples.
type Handler = (
    tuple[str, type[RequestHandler]]
    | tuple[str, type[RequestHandler], dict[str, Any]]
    | tuple[str, type[RequestHandler], dict[str, Any], str]
)

# The values parse_openmoji_arg can return.
type OpenMojiValue = Literal[False, "img", "glyf_colr1", "glyf_colr0"]
# The accepted bumpscosity values; BUMPSCOSITY_VALUES exposes the same
# literals as a tuple so they can be used at runtime (see parse_bumpscosity).
BumpscosityValue: TypeAlias = Literal[0, 1, 12, 50, 76, 100, 1000]
BUMPSCOSITY_VALUES: Final[tuple[BumpscosityValue, ...]] = get_args(
    BumpscosityValue
)
# The content of primes.bin interpreted as one big-endian integer.
# is_prime() treats it as a bit mask: bit (n // 2) is tested for an odd n.
PRINT = int.from_bytes((ROOT_DIR / "primes.bin").read_bytes(), "big")
def _create_ip_hash_salt() -> dict[str, object]:
    """Create the salt state used by hash_ip.

    The current UTC date is read exactly once, so the stored "date" and the
    hasher seeded from that date can never disagree (previously the clock was
    read twice, which could straddle midnight).
    """
    today = datetime.now(timezone.utc).date()
    return {
        "date": today,
        "hasher": blake3(blake3(today.isoformat().encode("ASCII")).digest()),
    }


# Mutable state: hash_ip replaces both entries whenever the UTC date changes.
IP_HASH_SALT: Final = _create_ip_hash_salt()
# Request paths that are treated specially by apm_anonymization_processor:
# events for these paths are returned without being anonymized.
# All entries are lower case; the processor lower-cases the path before the
# membership test.
SUS_PATHS: Final[Set[str]] = {
    "/-profiler/phpinfo",
    "/.aws/credentials",
    "/.env",
    "/.env.bak",
    "/.ftpconfig",
    "/admin/controller/extension/extension",
    "/assets/filemanager/dialog",
    "/assets/vendor/server/php",
    "/aws.yml",
    "/boaform/admin/formlogin",
    "/phpinfo",
    "/public/assets/jquery-file-upload/server/php",
    "/root",
    "/settings/aws.yml",
    "/uploads",
    "/vendor/phpunit/phpunit/src/util/php/eval-stdin",
    "/wordpress",
    "/wp",
    "/wp-admin",
    "/wp-admin/css",
    "/wp-includes",
    "/wp-login",
    "/wp-upload",
}
class ArgparseNamespace(argparse.Namespace):
    """A class to fake type hints for argparse.Namespace.

    The attributes mirror the options registered in create_argument_parser.
    """

    # pylint: disable=too-few-public-methods
    __slots__ = ("config", "save_config_to", "version", "verbose")

    # values of -c/--config (defaults to [Path("config.ini")])
    config: list[Path]
    # value of --save-config-to, None if not given
    save_config_to: Path | None
    # whether --version was given
    version: bool
    # how often --verbose was given
    verbose: int
137class AwaitableValue[T](Awaitable[T]):
138 # pylint: disable=too-few-public-methods
139 """An awaitable that always returns the same value."""
141 def __await__(self) -> Generator[None, None, T]:
142 """Return the value."""
143 yield
144 return self._value
146 def __init__(self, value: T) -> None:
147 """Set the value."""
148 self._value = value
class Permission(IntFlag):
    """Bit flags describing access to restricted stuff."""

    RATELIMITS = 1 << 0
    TRACEBACK = 1 << 1
    BACKDOOR = 1 << 2
    UPDATE = 1 << 3
    REPORTING = 1 << 4
    SHORTEN = 1 << 5
    UPLOAD = 1 << 6
class Timer:
    """Measure the time between construction and the call to stop."""

    __slots__ = ("_execution_time", "_start_time")

    _execution_time: int

    def __init__(self) -> None:
        """Remember the current performance counter as start time."""
        self._start_time = time.perf_counter_ns()

    def get(self) -> float:
        """Return the measured time in seconds; the timer must be stopped."""
        nanoseconds = self.get_ns()
        return nanoseconds / 1_000_000_000

    def get_ns(self) -> int:
        """Return the measured time in nanoseconds; the timer must be stopped."""
        assert hasattr(self, "_execution_time"), "Timer not stopped yet"
        return self._execution_time

    def stop(self) -> float:
        """Stop the timer and return the measured time in seconds."""
        nanoseconds = self.stop_ns()
        return nanoseconds / 1_000_000_000

    def stop_ns(self) -> int:
        """Stop the timer and return the measured time in nanoseconds."""
        assert not hasattr(self, "_execution_time"), "Timer already stopped"
        elapsed = time.perf_counter_ns() - self._start_time
        self._execution_time = elapsed
        return elapsed
194@cache
195def add_args_to_url(url: str | SplitResult, **kwargs: object) -> str:
196 """Add query arguments to a URL."""
197 if isinstance(url, str):
198 url = urlsplit(url)
200 if not kwargs:
201 return url.geturl()
203 url_args: dict[str, str] = dict(
204 parse_qsl(url.query, keep_blank_values=True)
205 )
207 for key, value in kwargs.items():
208 if value is None:
209 if key in url_args:
210 del url_args[key]
211 # pylint: disable-next=confusing-consecutive-elif
212 elif isinstance(value, bool):
213 url_args[key] = bool_to_str(value)
214 else:
215 url_args[key] = str(value)
217 return urlunsplit(
218 (
219 url.scheme,
220 url.netloc,
221 url.path,
222 urlencode(url_args),
223 url.fragment,
224 )
225 )
228def anonymize_ip[A: (str, None, str | None)]( # noqa: D103
229 address: A, *, ignore_invalid: bool = False
230) -> A:
231 """Anonymize an IP address."""
232 if address is None:
233 return None
235 address = address.strip()
237 try:
238 version = ip_address(address).version
239 except ValueError:
240 if ignore_invalid:
241 return address
242 raise
244 if version == 4:
245 return str(ip_network(address + "/24", strict=False).network_address)
246 if version == 6:
247 return str(ip_network(address + "/48", strict=False).network_address)
249 raise HTTPError(reason="ERROR: -41")
# ansi_replace(string) removes every match of the pattern: the escape
# character, "[", an optional "-", one or more digits and one ASCII letter.
ansi_replace = partial(regex.sub, "\033" + r"\[-?\d+[a-zA-Z]", "")
ansi_replace.__doc__ = "Remove ANSI escape sequences from a string."
def apm_anonymization_processor(
    # pylint: disable-next=unused-argument
    client: elasticapm.Client,
    event: dict[str, Any],
) -> dict[str, Any]:
    """Anonymize the IP addresses in the request context of an APM event.

    The event is modified in place and returned. Events without request
    context and events for "/robots.txt" or a path in SUS_PATHS are
    returned untouched. Otherwise the remote address of the socket, the
    addresses in the X-Forwarded-For header and the values of all headers
    whose dash-separated name contains "ip" get anonymized.
    """
    if "context" not in event or "request" not in event["context"]:
        return event

    request = event["context"]["request"]

    if "url" in request and "pathname" in request["url"]:
        pathname = request["url"]["pathname"]
        if pathname == "/robots.txt" or pathname.lower() in SUS_PATHS:
            return event

    if "socket" in request and "remote_address" in request["socket"]:
        socket_info = request["socket"]
        socket_info["remote_address"] = anonymize_ip(
            socket_info["remote_address"]
        )

    if "headers" not in request:
        return event

    headers = request["headers"]
    if "X-Forwarded-For" in headers:
        forwarded = headers["X-Forwarded-For"].split(",")
        headers["X-Forwarded-For"] = ", ".join(
            anonymize_ip(part.strip(), ignore_invalid=True)
            for part in forwarded
        )
    # only existing keys get new values, so the dict does not change size
    for name in headers:
        if "ip" in name.lower().split("-"):
            headers[name] = anonymize_ip(headers[name], ignore_invalid=True)

    return event
def apply[V, Ret](value: V, fun: Callable[[V], Ret]) -> Ret:  # noqa: D103
    """Apply a function to a value and return the result.

    This is the same as calling ``fun(value)`` directly.
    """
    return fun(value)
# backspace_replace(string) removes every backspace character (\x08)
# together with the character directly in front of it, if the pattern's "."
# matches one.
backspace_replace = partial(regex.sub, ".?\x08", "")
backspace_replace.__doc__ = "Remove backspaces from a string."
def bool_to_str(val: bool) -> str:
    """Return "sure" for a true value and "nope" for a false one."""
    if val:
        return "sure"
    return "nope"
def bounded_edit_distance(s1: str, s2: str, /, k: int) -> int:
    """Return the edit distance between two strings, capped at k.

    distance is called with k as score cutoff; a result of k + 1 is
    reported as k, so k is the maximum number returned.
    """
    dist = distance(s1, s2, score_cutoff=k)
    return k if dist == k + 1 else dist
def country_code_to_flag(code: str) -> str:
    """Convert a two-letter ISO country code to a flag emoji.

    Every upper-cased character is shifted by a fixed offset, which maps
    "A" to U+1F1E6 (REGIONAL INDICATOR SYMBOL LETTER A).
    """
    offset = 23 * 29 * 191  # == 0x1F1A5
    shifted = (ord(letter) + offset for letter in code.upper())
    return "".join(map(chr, shifted))
def create_argument_parser() -> argparse.ArgumentParser:
    """Create the parser for the command line arguments.

    The parser knows --version, --verbose (counted), -c/--config (any
    number of paths, defaults to config.ini) and --save-config-to (an
    optional path).
    """
    parser = argparse.ArgumentParser()
    add = parser.add_argument

    add(
        "--version",
        action="store_true",
        default=False,
        help="show the version of the website",
    )
    add("--verbose", action="count", default=0)
    add(
        "-c",
        "--config",
        type=Path,
        nargs="*",
        metavar="PATH",
        default=[Path("config.ini")],
        help="the path to the config file",
    )
    add(
        "--save-config-to",
        type=Path,
        nargs="?",
        metavar="Path",
        default=None,
        help="save the configuration to a file",
    )

    return parser
def emoji2html(emoji: str) -> str:
    """Return an HTML img element for the emoji.

    The src is the URL from emoji2url and the alt text is the emoji itself;
    both attribute values are written using their repr.
    """
    url = emoji2url(emoji)
    return f"<img src={url!r} alt={emoji!r} class='emoji'>"
def emoji2url(emoji: str) -> str:
    """Return the URL of the OpenMoji SVG file for the emoji.

    The file name consists of the upper-case hexadecimal code points (at
    least four digits each) joined by "-". A trailing U+FE0F is dropped
    from two-character emoji first.
    """
    if len(emoji) == 2:
        emoji = emoji.removesuffix("\uFE0F")
    code_points = "-".join(f"{ord(char):04X}" for char in emoji)
    return f"/static/openmoji/svg/{code_points}.svg?v={OPENMOJI_VERSION}"
if sys.flags.dev_mode and not pytest_is_running():
    __origignal_emoji2url = emoji2url

    def emoji2url(emoji: str) -> str:  # pylint: disable=function-redefined
        """Convert an emoji to an URL and check that the result is sane.

        This variant replaces the regular emoji2url in dev mode (outside of
        pytest). It asserts that the argument is an emoji and that the
        referenced file exists in the OpenMoji data.
        """
        import openmoji_dist  # pylint: disable=import-outside-toplevel
        from emoji import is_emoji  # pylint: disable=import-outside-toplevel

        assert is_emoji(emoji), f"{emoji} needs to be emoji"
        url = __origignal_emoji2url(emoji)
        # strip the static prefix and the query string to get the file path
        relative_path = url.removeprefix("/static/openmoji/").split("?")[0]
        file = openmoji_dist.get_openmoji_data() / relative_path
        assert file.is_file(), f"{file} needs to exist"
        return url
# Mapping from single characters to the emoji string used for them.
# NOTE(review): some keys and values look identical in an editor and may
# differ only by invisible code points — verify before editing.
EMOJI_MAPPING: Final[Mapping[str, str]] = {
    "⁉": "⁉",
    "‼": "‼",
    "?": "❓",
    "!": "❗",
    "-": "➖",
    "+": "➕",
    "\U0001F51F": "\U0001F51F",
}
async def geoip(
    ip: None | str,
    database: str = "GeoLite2-City.mmdb",
    elasticsearch: None | AsyncElasticsearch = None,
    *,
    allow_fallback: bool = True,
    caches: dict[str, dict[str, dict[str, Any]]] = UltraDict(),  # noqa: B008
) -> None | dict[str, Any]:
    """Get GeoIP information.

    The information is looked up with the geoip ingest processor of
    Elasticsearch and cached per IP address and database.

    If Elasticsearch is not given or the request fails, geoip_fallback is
    used instead, provided allow_fallback is true and the database is the
    City or the Country database. The fallback only returns country-level
    data for the Country database. Fallback results are not cached.

    Args:
        ip: the address to look up; a falsy value yields None
        database: the file name of the database to query
        elasticsearch: the client to use for the lookup
        allow_fallback: whether geoip_fallback may be used
        caches: mapping of IP address to database to result; the default is
            created once and shared by all calls

    Returns:
        the GeoIP information or None if nothing could be looked up

    Raises:
        ApiError, TransportError: if the request fails and no fallback is
            possible
    """
    # pylint: disable=too-complex
    if not ip:
        return None

    # pylint: disable-next=redefined-outer-name
    cache = caches.get(ip, {})
    if database in cache:
        return cache[database]

    can_fallback = allow_fallback and database in {
        "GeoLite2-City.mmdb",
        "GeoLite2-Country.mmdb",
    }
    # the fallback must only be limited to country data for the Country
    # database (this flag used to be inverted)
    country_only = database == "GeoLite2-Country.mmdb"

    if not elasticsearch:
        if can_fallback:
            return geoip_fallback(ip, country=country_only)
        return None

    properties_by_database: dict[str, tuple[str, ...]] = {
        "GeoLite2-City.mmdb": (
            "continent_name",
            "country_iso_code",
            "country_name",
            "region_iso_code",
            "region_name",
            "city_name",
            "location",
            "timezone",
        ),
        "GeoLite2-Country.mmdb": (
            "continent_name",
            "country_iso_code",
            "country_name",
        ),
        "GeoLite2-ASN.mmdb": ("asn", "network", "organization_name"),
    }
    # None for unknown databases, exactly like before
    properties = properties_by_database.get(database)

    try:
        cache[database] = (
            await elasticsearch.ingest.simulate(
                pipeline={
                    "processors": [
                        {
                            "geoip": {
                                "field": "ip",
                                "database_file": database,
                                "properties": properties,
                            }
                        }
                    ]
                },
                docs=[{"_source": {"ip": ip}}],
                filter_path="docs.doc._source",
            )
        )["docs"][0]["doc"]["_source"].get("geoip", {})
    except (ApiError, TransportError):
        if can_fallback:
            return geoip_fallback(ip, country=country_only)
        raise

    if "country_iso_code" in cache[database]:
        cache[database]["country_flag"] = country_code_to_flag(
            cache[database]["country_iso_code"]
        )

    caches[ip] = cache
    return cache[database]
def geoip_fallback(ip: str, country: bool = False) -> None | dict[str, Any]:
    """Get GeoIP information without using Elasticsearch.

    Args:
        ip: the address to look up with geolite2
        country: whether to return only country-level information; if
            false, "location" and "timezone" are included as well

    Returns:
        None if the lookup yields nothing, otherwise a dict with the
        non-empty values of "continent_name", "country_iso_code",
        "country_name", "country_flag" and, unless country is true,
        "location" (a tuple of latitude and longitude) and "timezone"
    """
    if not (info := geolite2.lookup(ip)):
        return None

    info_dict = info.get_info_dict()
    country_info = info_dict.get("country", {})

    data: dict[str, Any] = {
        "continent_name": (
            info_dict.get("continent", {}).get("names", {}).get("en")
        ),
        "country_iso_code": country_info.get("iso_code"),
        "country_name": country_info.get("names", {}).get("en"),
    }

    if data["country_iso_code"]:
        data["country_flag"] = country_code_to_flag(data["country_iso_code"])

    if not country:
        location_info = info_dict.get("location", {})
        latitude = location_info.get("latitude")
        longitude = location_info.get("longitude")
        # compare with None, 0.0 is a valid latitude and longitude
        has_location = latitude is not None and longitude is not None
        data["location"] = (latitude, longitude) if has_location else None
        data["timezone"] = location_info.get("time_zone")

    # drop everything that could not be determined
    return {key: value for key, value in data.items() if value}
def get_arguments_without_help() -> tuple[str, ...]:
    """Return the command line arguments except for -h and --help."""
    arguments = sys.argv[1:]
    return tuple(
        argument
        for argument in arguments
        if argument != "-h" and argument != "--help"
    )
def get_close_matches(  # based on difflib.get_close_matches
    word: str,
    possibilities: Iterable[str],
    count: int = 3,
    cutoff: float = 0.5,
) -> tuple[str, ...]:
    """Return the best "good enough" matches for word.

    The score of a possibility is its bounded edit distance to word divided
    by the length of the longer of the two strings, so lower is better.

    word is the string for which close matches are desired.

    possibilities are the strings against which to match word.

    Optional arg count (default 3) is the maximum number of close matches
    to return. count must be > 0.

    Optional arg cutoff (default 0.5) is a float in [0, 1]. Possibilities
    with a score above the cutoff are ignored.

    The best (no more than count) matches among the possibilities are
    returned in a tuple, most similar first.
    """
    if count <= 0:
        raise ValueError(f"count must be > 0: {count}")
    if not 0.0 <= cutoff <= 1.0:
        raise ValueError(f"cutoff must be in [0.0, 1.0]: {cutoff}")

    word_len = len(word)
    if not word_len:
        if cutoff < 1.0:
            return ()
        return Stream(possibilities).limit(count).collect(tuple)

    scored: list[tuple[float, str]] = []
    for candidate in possibilities:
        longest = max(word_len, len(candidate))
        if not longest:
            continue
        dist = bounded_edit_distance(
            candidate, word, 1 + int(cutoff * longest)
        )
        ratio = dist / longest
        if ratio > cutoff:
            continue
        # keep the list sorted and never longer than count
        bisect.insort(scored, (ratio, candidate))
        del scored[count:]

    return tuple(candidate for _, candidate in scored)
def hash_bytes(*args: bytes, hasher: Any = None, size: int = 32) -> str:
    """Hash bytes and return the Base85 representation of the digest.

    All args are fed into the hasher in order; without a hasher a new
    blake3 hasher is used. A blake3 hasher is asked for a digest of size
    bytes, the digest of any other hasher is cut to at most size bytes.
    """
    if not hasher:
        hasher = blake3()

    for chunk in args:
        hasher.update(chunk)

    digest: bytes
    if isinstance(hasher, blake3):
        digest = hasher.digest(size)
    else:
        digest = hasher.digest()[:size]

    return b85encode(digest).decode("ASCII")
def hash_ip(
    address: None | str | IPv4Address | IPv6Address, size: int = 32
) -> str:
    """Hash an IP address using a salt that depends on the current UTC date.

    The salted hasher in IP_HASH_SALT gets replaced when the date has
    changed. Strings are parsed with ip_address; for a falsy address empty
    bytes are hashed.
    """
    if isinstance(address, str):
        address = ip_address(address)

    today = datetime.now(timezone.utc).date()
    if IP_HASH_SALT["date"] != today:
        seed = blake3(today.isoformat().encode("ASCII")).digest()
        IP_HASH_SALT["hasher"] = blake3(seed)
        IP_HASH_SALT["date"] = today

    packed = address.packed if address else b""
    # hash with a copy, the salted hasher itself must stay untouched
    salted = IP_HASH_SALT["hasher"].copy()  # type: ignore[attr-defined]
    return hash_bytes(packed, hasher=salted, size=size)
def is_in_european_union(ip: None | str) -> None | bool:
    """Return whether the specified address is in the EU.

    Returns None if no address is given or the lookup yields nothing.
    """
    if not (ip and (info := geolite2.lookup(ip))):
        return None

    info_dict = info.get_info_dict()
    # MaxMind records store the flag inside the "country" object, so look
    # there first; the top-level key that was read before stays as fallback.
    # NOTE(review): confirm against the records of the bundled database.
    country_info = info_dict.get("country") or {}
    return bool(
        country_info.get(
            "is_in_european_union",
            info_dict.get("is_in_european_union", False),
        )
    )
def is_prime(number: int) -> bool:
    """Return whether the specified number is prime.

    Odd numbers are looked up in the PRINT bit mask (bit number // 2), so
    odd numbers beyond the size of the mask are reported as not prime.
    Numbers smaller than 2 are never prime.
    """
    if number < 2:
        # also avoids a negative shift count for negative odd numbers
        return False
    if not number % 2:
        return number == 2
    return bool(PRINT & (1 << (number // 2)))
def length_of_match(match: regex.Match[Any]) -> int:
    """Return the number of characters covered by the regex match."""
    start, end = match.span()
    return end - start
622def n_from_set[T](set_: Set[T], n: int) -> set[T]: # noqa: D103
623 """Get and return n elements of the set as a new set."""
624 new_set = set()
625 for i, element in enumerate(set_):
626 if i >= n:
627 break
628 new_set.add(element)
629 return new_set
def name_to_id(val: str) -> str:
    """Convert a string into a valid HTML id.

    Umlauts get replaced, the result is lower-cased, every run of
    characters other than a-z and 0-9 becomes a single "-" and dashes at
    both ends are removed.
    """
    lowered = replace_umlauts(val).lower()
    dashed = regex.sub(r"[^a-z0-9]+", "-", lowered)
    return dashed.strip("-")
641def none_to_default[T, D](value: None | T, default: D) -> D | T: # noqa: D103
642 """Like ?? in ECMAScript."""
643 return default if value is None else value
def parse_bumpscosity(value: str | int | None) -> BumpscosityValue:
    """Parse a value to a valid bumpscosity value.

    Strings are converted with int(value, base=0) if possible. A value
    that is one of BUMPSCOSITY_VALUES is returned as is; for everything
    else a value is chosen pseudo-randomly, seeded with the repr of the
    value, so the same input always gives the same result.
    """
    if isinstance(value, str):
        try:
            value = int(value, base=0)
        except ValueError:
            pass

    if value in BUMPSCOSITY_VALUES:
        return cast(BumpscosityValue, value)

    seeded = random.Random(repr(value))
    return seeded.choice(BUMPSCOSITY_VALUES)
656def parse_openmoji_arg(value: str, default: OpenMojiValue) -> OpenMojiValue:
657 """Parse the openmoji arg into a Literal."""
658 value = value.lower()
659 if value == "glyf_colr0":
660 return "glyf_colr0"
661 if value == "glyf_colr1":
662 return "glyf_colr1"
663 if value in {"i", "img"}:
664 return "img"
665 if value in {"n", "nope"}:
666 return False
667 return default
# pylint: disable-next=too-many-arguments
async def ratelimit(
    redis: Redis[str],
    redis_prefix: str,
    remote_ip: str,
    *,
    bucket: None | str,
    max_burst: int,
    count_per_period: int,
    period: int,
    tokens: int,
) -> tuple[bool, dict[str, str]]:
    """Take b1nzy to space using Redis.

    Runs the CL.THROTTLE command for a key built from the prefix, the
    hashed remote IP and, if given, the bucket.

    Returns whether the request is limited and the rate limit headers to
    send. The X-RateLimit-* headers describing the limit are only included
    if a bucket is given.
    """
    hashed_ip = hash_bytes(remote_ip.encode("ASCII"))
    key = f"{redis_prefix}:ratelimit:{hashed_ip}"
    if bucket:
        key = f"{key}:{bucket}"

    # see: https://github.com/brandur/redis-cell#usage
    result = await redis.execute_command(  # type: ignore[no-untyped-call]
        "CL.THROTTLE",
        key,
        max_burst,
        count_per_period,
        period,
        tokens,
    )

    now = time.time()
    limited = bool(result[0])

    headers: dict[str, str] = {}

    if limited:
        headers["Retry-After"] = str(result[3])
        if not bucket:
            headers["X-RateLimit-Global"] = "true"

    if bucket:
        reset_after = result[4]
        headers.update(
            {
                "X-RateLimit-Limit": str(result[1]),
                "X-RateLimit-Remaining": str(result[2]),
                "X-RateLimit-Reset": str(now + reset_after),
                "X-RateLimit-Reset-After": str(reset_after),
                "X-RateLimit-Bucket": hash_bytes(bucket.encode("ASCII")),
            }
        )

    return limited, headers
def remove_suffix_ignore_case(string: str, suffix: str) -> str:
    """Remove a suffix without caring about the case.

    The string is returned unchanged if it does not end with the suffix or
    if the suffix is empty.
    """
    # an empty suffix must be skipped: string[:-0] would be the empty string
    if suffix and string.lower().endswith(suffix.lower()):
        return string[: -len(suffix)]
    return string
def replace_umlauts(string: str) -> str:
    """Replace Ä, Ö, Ü, ẞ, ä, ö, ü, ß in string.

    In an upper-case string the replacements are upper-case as well (AE,
    OE, UE, SS). Otherwise every word separated by a space is handled on
    its own, with upper-case umlauts becoming Ae, Oe and Ue.
    """
    if string.isupper():
        upper_case = str.maketrans(
            {"Ä": "AE", "Ö": "OE", "Ü": "UE", "ẞ": "SS"}
        )
        return string.translate(upper_case)

    if " " in string:
        return " ".join(map(replace_umlauts, string.split(" ")))

    mixed_case = str.maketrans(
        {
            "ä": "ae",
            "ö": "oe",
            "ü": "ue",
            "ß": "ss",
            "Ä": "Ae",
            "Ö": "Oe",
            "Ü": "Ue",
            "ẞ": "SS",
        }
    )
    return string.translate(mixed_case)
748async def run(
749 program: str,
750 *args: str,
751 stdin: int | IO[Any] = asyncio.subprocess.DEVNULL,
752 stdout: None | int | IO[Any] = asyncio.subprocess.PIPE,
753 stderr: None | int | IO[Any] = asyncio.subprocess.PIPE,
754 **kwargs: Any,
755) -> tuple[None | int, bytes, bytes]:
756 """Run a programm and return the exit code, stdout and stderr as tuple."""
757 proc = await asyncio.create_subprocess_exec(
758 program,
759 *args,
760 stdin=stdin,
761 stdout=stdout,
762 stderr=stderr,
763 **kwargs,
764 )
765 output = await proc.communicate()
766 return proc.returncode, *output
769def size_of_file(file: Traversable) -> int:
770 """Calculate the size of a file."""
771 if isinstance(file, Path):
772 return file.stat().st_size
774 with file.open("rb") as data:
775 if data.seekable():
776 data.seek(0, io.SEEK_END)
777 return data.tell()
779 return sum(map(len, data)) # pylint: disable=bad-builtin
782def str_to_bool(val: None | str | bool, default: None | bool = None) -> bool:
783 """Convert a string representation of truth to True or False."""
784 if isinstance(val, bool):
785 return val
786 if isinstance(val, str):
787 val = val.lower()
788 if val in {
789 "1",
790 "a",
791 "accept",
792 "e",
793 "enabled",
794 "on",
795 "s",
796 "sure",
797 "t",
798 "true",
799 "y",
800 "yes",
801 }:
802 return True
803 if val in {
804 "0",
805 "d",
806 "disabled",
807 "f",
808 "false",
809 "n",
810 "no",
811 "nope",
812 "off",
813 "r",
814 "reject",
815 }:
816 return False
817 if val in {"idc", "maybe", "random"}:
818 return bool(random.randrange(2)) # nosec: B311
819 if default is None:
820 raise ValueError(f"Invalid bool value: {val!r}")
821 return default
def str_to_set(string: str) -> set[str]:
    """Split a string at commas into a set of the non-empty stripped parts."""
    return {
        stripped for part in string.split(",") if (stripped := part.strip())
    }
def strangle(string: str) -> float:
    """Convert a string to an angle.

    The angle is derived from the first two bytes of the SHA-1 digest of
    the UTF-8 encoded string and is at least 0 and less than 360.
    """
    digest = sha1(string.encode("UTF-8"), usedforsecurity=False).digest()
    fraction = int.from_bytes(digest[:2], "little") / (1 << 16)
    return fraction * 360
835def time_function[ # noqa: D103
836 T, **P # fmt: skip
837](
838 function: Callable[P, T], *args: P.args, **kwargs: P.kwargs
839) -> tuple[T, float]:
840 """Run the function and return the result and the time it took in seconds."""
841 timer = Timer()
842 return function(*args, **kwargs), timer.stop()
def time_to_str(spam: float) -> str:
    """Convert the time in seconds into a string with second precision.

    The result has the format "{days}d {hours}h {minutes}min {seconds}s".
    """
    total_seconds = int(spam)
    total_minutes = int(total_seconds / 60)
    total_hours = int(total_minutes / 60)
    days = int(total_hours / 24)

    parts = (
        f"{days}d",
        f"{total_hours % 24}h",
        f"{total_minutes % 60}min",
        f"{total_seconds % 60}s",
    )
    return " ".join(parts)
@dataclass(order=True, frozen=True, slots=True)
class PageInfo:
    """The PageInfo class that is used for the subpages of a ModuleInfo.

    Instances are frozen and support ordering comparisons.
    """

    # name of the page
    name: str
    # description of the page
    description: str
    # path of the page, compared by ModuleInfo.get_page_info
    path: None | str = None
    # keywords that can be used for searching
    keywords: tuple[str, ...] = field(default_factory=tuple)
    hidden: bool = False  # whether to hide this page info on the page
    short_name: None | str = None  # short name for the page
@dataclass(order=True, frozen=True, slots=True)
class ModuleInfo(PageInfo):
    """
    A PageInfo extended by handlers, sub pages and aliases.

    This gets created by every module to add the handlers.
    """

    handlers: tuple[Handler, ...] = field(default_factory=tuple[Handler, ...])
    sub_pages: tuple[PageInfo, ...] = field(default_factory=tuple)
    aliases: tuple[str, ...] | Mapping[str, str] = field(default_factory=tuple)
    required_background_tasks: Collection[BackgroundTask] = field(
        default_factory=frozenset
    )

    def get_keywords_as_str(self, path: str) -> str:
        """Get the keywords for the path as comma-separated string.

        The keywords of a matching sub page are appended to the keywords
        of the module itself.
        """
        keywords = self.keywords
        page_info = self.get_page_info(path)
        if self != page_info:
            keywords = (*keywords, *page_info.keywords)

        return ", ".join(keywords)

    def get_page_info(self, path: str) -> PageInfo:
        """Get the PageInfo of the specified path.

        Returns the first sub page with this path. The module info itself
        is returned if it has the path or if no sub page matches.
        """
        if self.path != path:
            for sub_page in self.sub_pages:
                if sub_page.path == path:
                    return sub_page

        return self