Coverage for an_website / main.py: 79.279%
222 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-15 14:36 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-15 14:36 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
13# pylint: disable=import-private-name, too-many-lines
15"""
16The website of the AN.
18Loads config and modules and starts Tornado.
19"""
import asyncio
import atexit
import importlib
import logging
import logging.handlers
import os
import platform
import signal
import ssl
import sys
import threading
import time
import types
import uuid
35from asyncio import AbstractEventLoop
36from asyncio.runners import _cancel_all_tasks # type: ignore[attr-defined]
37from base64 import b64encode
38from collections.abc import Callable, Iterable, Mapping, MutableSequence
39from configparser import ConfigParser
40from functools import partial
41from hashlib import sha256
42from multiprocessing.process import _children # type: ignore[attr-defined]
43from pathlib import Path
44from socket import socket
45from typing import Any, Final, TypedDict, TypeGuard, cast
46from warnings import catch_warnings, simplefilter
47from zoneinfo import ZoneInfo
49import regex
50from Crypto.Hash import RIPEMD160
51from ecs_logging import StdlibFormatter
52from elasticapm.contrib.tornado import ElasticAPM
53from redis.asyncio import (
54 BlockingConnectionPool,
55 Redis,
56 SSLConnection,
57 UnixDomainSocketConnection,
58)
59from setproctitle import setproctitle
60from tornado.httpserver import HTTPServer
61from tornado.log import LogFormatter
62from tornado.netutil import bind_sockets, bind_unix_socket
63from tornado.process import fork_processes, task_id
64from tornado.web import Application, RedirectHandler
65from typed_stream import Stream
67from . import (
68 CA_BUNDLE_PATH,
69 DIR,
70 EVENT_SHUTDOWN,
71 NAME,
72 TEMPLATES_DIR,
73 UPTIME,
74 VERSION,
75 pytest_is_running,
76)
77from .contact.contact import apply_contact_stuff_to_app
78from .utils import background_tasks, static_file_handling
79from .utils.base_request_handler import BaseRequestHandler, request_ctx_var
80from .utils.better_config_parser import BetterConfigParser
81from .utils.elasticsearch_setup import setup_elasticsearch
82from .utils.logging import WebhookFormatter, WebhookHandler
83from .utils.request_handler import NotFoundHandler
84from .utils.static_file_from_traversable import TraversableStaticFileHandler
85from .utils.template_loader import TemplateLoader
86from .utils.utils import (
87 ArgparseNamespace,
88 Handler,
89 ModuleInfo,
90 Permission,
91 Timer,
92 create_argument_parser,
93 geoip,
94 get_arguments_without_help,
95 time_function,
96)
# perf8 is an optional profiling dependency; when it is missing, the
# "PERF8" environment-variable handling in main() is skipped.
try:
    import perf8  # type: ignore[import, unused-ignore]
except ModuleNotFoundError:
    perf8 = None  # pylint: disable=invalid-name
# Package/module names that get_module_infos() skips while scanning the
# an_website package; "example" is only loaded in dev mode or under pytest.
# Extended at runtime by ignore_modules() from the config.
IGNORED_MODULES: Final[set[str]] = {
    "patches",
    "static",
    "templates",
} | (set() if sys.flags.dev_mode or pytest_is_running() else {"example"})
# module-level logger for this file
LOGGER: Final = logging.getLogger(__name__)
112# add all the information from the packages to a list
113# this calls the get_module_info function in every file
114# files and dirs starting with '_' get ignored
def get_module_infos() -> str | tuple[ModuleInfo, ...]:
    """
    Import the modules and return the loaded module infos in a tuple.

    Scans the package directory for sub-packages, preferring module infos
    exported from a package's __init__.py over its individual files.
    Returns the collected errors joined as a string instead when in dev
    mode and at least one module is broken.
    """
    collected: list[ModuleInfo] = []
    imported_names: list[str] = []
    errors: list[str] = []

    for package in DIR.iterdir():
        if (
            package.name.startswith("_")
            or package.name in IGNORED_MODULES
            or not package.is_dir()
        ):
            continue

        infos_from_init = get_module_infos_from_module(
            package.name, errors, ignore_not_found=True
        )
        if infos_from_init:
            collected.extend(infos_from_init)
            imported_names.append(package.name)
            LOGGER.debug(
                (
                    "Found module_infos in %s.__init__.py, "
                    "not searching in other modules in the package."
                ),
                package,
            )
            continue

        if f"{package.name}.*" in IGNORED_MODULES:
            continue

        for file in package.iterdir():
            dotted_name = f"{package.name}.{file.name[:-3]}"
            if (
                not file.name.endswith(".py")
                or dotted_name in IGNORED_MODULES
                or file.name.startswith("_")
            ):
                continue
            found = get_module_infos_from_module(dotted_name, errors)
            if found:
                collected.extend(found)
                imported_names.append(dotted_name)

    if errors:
        if sys.flags.dev_mode:
            # exit to make sure it gets fixed
            return "\n".join(errors)
        # don't exit in production to keep stuff running
        LOGGER.error("\n".join(errors))

    LOGGER.info(
        "Loaded %d modules: '%s'",
        len(imported_names),
        "', '".join(imported_names),
    )
    LOGGER.info(
        "Ignored %d modules: '%s'",
        len(IGNORED_MODULES),
        "', '".join(IGNORED_MODULES),
    )

    sort_module_infos(collected)

    # return an immutable copy so the result can never change
    return tuple(collected)
def get_module_infos_from_module(
    module_name: str,
    errors: MutableSequence[str],  # gets modified
    ignore_not_found: bool = False,
) -> None | list[ModuleInfo]:
    """
    Get the module infos based on a module.

    Imports the module, then collects ModuleInfo objects from its
    get_module_info() and/or get_module_infos() functions. Problems are
    appended to *errors*; returns None when nothing was collected.
    """
    import_timer = Timer()
    module = importlib.import_module(
        f".{module_name}",
        package="an_website",
    )
    if import_timer.stop() > 0.1:
        LOGGER.warning(
            "Import of %s took %ss. That's affecting the startup time.",
            module_name,
            import_timer.get(),
        )

    collected: list[ModuleInfo] = []

    attributes = dir(module)
    has_single = "get_module_info" in attributes
    has_many = "get_module_infos" in attributes

    if not has_single and not has_many:
        if not ignore_not_found:
            errors.append(
                f"{module_name} has no 'get_module_info' and no 'get_module_infos' "
                "method. Please add at least one of the methods or add "
                f"'{module_name.rsplit('.', 1)[0]}.*' or {module_name!r} to "
                "IGNORED_MODULES."
            )
        return None

    if has_single:
        single_info = module.get_module_info()
        if isinstance(single_info, ModuleInfo):
            collected.append(single_info)
        else:
            errors.append(
                f"'get_module_info' in {module_name} does not return ModuleInfo. "
                "Please fix the returned value."
            )

    if not has_many:
        return collected or None

    many_infos = module.get_module_infos()

    if not isinstance(many_infos, Iterable):
        errors.append(
            f"'get_module_infos' in {module_name} does not return an Iterable. "
            "Please fix the returned value."
        )
        return collected or None

    for candidate in many_infos:
        if isinstance(candidate, ModuleInfo):
            collected.append(candidate)
        else:
            errors.append(
                f"'get_module_infos' in {module_name} did return an Iterable "
                f"with an element of type {type(candidate)}. "
                "Please fix the returned value."
            )

    return collected or None
def sort_module_infos(module_infos: list[ModuleInfo]) -> None:
    """Sort a list of module info and move the main page to the top."""
    # in-place sort so the order makes sense
    module_infos.sort()

    # locate the main page ("/") and rotate it to the front
    main_page_index = next(
        (i for i, info in enumerate(module_infos) if info.path == "/"),
        None,
    )
    if main_page_index is not None:
        module_infos.insert(0, module_infos.pop(main_page_index))
def get_all_handlers(module_infos: Iterable[ModuleInfo]) -> list[Handler]:
    """
    Parse the module information and return the handlers in a tuple.

    If a handler has only 2 elements a dict with title and description
    gets added. This information is gotten from the module info.
    """
    handlers: list[Handler] = static_file_handling.get_handlers()

    # add all the normal handlers
    for module_info in module_infos:
        for handler_spec in module_info.handlers:
            entry: list[Any] = list(handler_spec)
            # only our own request handlers get the module info attached;
            # built-ins like StaticFileHandler & RedirectHandler don't
            if issubclass(entry[1], BaseRequestHandler):
                if len(entry) == 2:
                    # "default_title" and "default_description" are False
                    # so module_info.name & module_info.description win
                    entry.append(
                        {
                            "default_title": False,
                            "default_description": False,
                            "module_info": module_info,
                        }
                    )
                else:
                    entry[2]["module_info"] = module_info
            handlers.append(tuple(entry))

    # redirect handler, to make finding APIs easier
    handlers.append((r"/(.+)/api/*", RedirectHandler, {"url": "/api/{0}"}))

    handlers.append(
        (
            r"(?i)/\.well-known/(.*)",
            TraversableStaticFileHandler,
            {
                "root": Path(".well-known"),
                "headers": (("Access-Control-Allow-Origin", "*"),),
            },
        )
    )

    LOGGER.debug("Loaded %d handlers", len(handlers))

    return handlers
def ignore_modules(config: BetterConfigParser) -> None:
    """Read ignored modules from the config."""
    configured = config.getset("GENERAL", "IGNORED_MODULES", fallback=set())
    IGNORED_MODULES.update(configured)
def get_normed_paths_from_module_infos(
    module_infos: Iterable[ModuleInfo],
) -> dict[str, str]:
    """
    Get all paths from the module infos.

    Maps every normed alias (lowercased, slashes stripped) to the path
    it should resolve to; later module infos override earlier ones.
    """
    normed_paths: dict[str, str] = {}

    for info in module_infos:
        # (alias, canonical path) pairs contributed by this module:
        # its own path, its aliases and the paths of its sub pages
        pairs: list[tuple[str | None, str | None]] = [(info.path, info.path)]
        if isinstance(info.aliases, Mapping):
            pairs.extend(info.aliases.items())
        else:
            pairs.extend((alias, info.path) for alias in info.aliases)
        pairs.extend(
            (sub_page.path, sub_page.path)
            for sub_page in info.sub_pages
            if sub_page.path
        )

        for alias, path in pairs:
            if alias is None or path is None:
                continue
            if not alias.startswith("/"):
                continue
            if normed := alias.strip("/").lower():
                normed_paths[normed] = path

    return normed_paths
def make_app(config: ConfigParser) -> str | Application:
    """Create the Tornado application and return it."""
    module_infos, duration = time_function(get_module_infos)
    if isinstance(module_infos, str):
        # loading failed; hand the error text back to the caller
        return module_infos
    if duration > 1:
        LOGGER.warning(
            "Getting the module infos took %ss. That's probably too long.",
            duration,
        )

    # show the hamburger menu iff at least one visible module has a path
    show_hamburger_menu = any(
        info.path for info in module_infos if not info.hidden
    )

    handlers = get_all_handlers(module_infos)
    return Application(
        handlers,
        MODULE_INFOS=module_infos,
        SHOW_HAMBURGER_MENU=show_hamburger_menu,
        NORMED_PATHS=get_normed_paths_from_module_infos(module_infos),
        HANDLERS=handlers,
        # General settings
        autoreload=False,
        debug=sys.flags.dev_mode,
        default_handler_class=NotFoundHandler,
        compress_response=config.getboolean(
            "GENERAL", "COMPRESS_RESPONSE", fallback=False
        ),
        websocket_ping_interval=10,
        # Template settings
        template_loader=TemplateLoader(
            root=TEMPLATES_DIR, whitespace="oneline"
        ),
    )
def apply_config_to_app(app: Application, config: BetterConfigParser) -> None:
    """Apply the config (from the config.ini file) to the application."""
    app.settings["CONFIG"] = config
    app.settings["cookie_secret"] = config.get(
        "GENERAL", "COOKIE_SECRET", fallback="xyzzy"
    )
    app.settings["CRAWLER_SECRET"] = config.get(
        "APP_SEARCH", "CRAWLER_SECRET", fallback=None
    )
    app.settings["DOMAIN"] = config.get("GENERAL", "DOMAIN", fallback=None)
    app.settings["ELASTICSEARCH_PREFIX"] = config.get(
        "ELASTICSEARCH", "PREFIX", fallback=NAME
    )
    app.settings["HSTS"] = config.getboolean("TLS", "HSTS", fallback=False)
    app.settings["NETCUP"] = config.getboolean(
        "GENERAL", "NETCUP", fallback=False
    )
    # the onion protocol is the scheme part before "://" of the address
    onion_address = config.get("GENERAL", "ONION_ADDRESS", fallback=None)
    app.settings["ONION_ADDRESS"] = onion_address
    if onion_address is None:
        app.settings["ONION_PROTOCOL"] = None
    else:
        app.settings["ONION_PROTOCOL"] = onion_address.split("://")[0]
    # ratelimits default to whether Redis is enabled
    app.settings["RATELIMITS"] = config.getboolean(
        "GENERAL",
        "RATELIMITS",
        fallback=config.getboolean("REDIS", "ENABLED", fallback=False),
    )
    app.settings["REDIS_PREFIX"] = config.get("REDIS", "PREFIX", fallback=NAME)
    app.settings["REPORTING"] = config.getboolean(
        "REPORTING", "ENABLED", fallback=True
    )
    app.settings["REPORTING_BUILTIN"] = config.getboolean(
        "REPORTING", "BUILTIN", fallback=sys.flags.dev_mode
    )
    app.settings["REPORTING_ENDPOINT"] = config.get(
        "REPORTING",
        "ENDPOINT",
        fallback=(
            "/api/reports"
            if app.settings["REPORTING_BUILTIN"]
            else "https://asozial.org/api/reports"
        ),
    )
    # secrets are "<key>" or "<key>=<permissions int>"; without an explicit
    # permissions value the secret gets all permission bits set
    app.settings["TRUSTED_API_SECRETS"] = {
        key_perms[0]: Permission(
            int(key_perms[1])
            if len(key_perms) > 1
            else (1 << len(Permission)) - 1  # should be all permissions
        )
        for secret in config.getset(
            "GENERAL", "TRUSTED_API_SECRETS", fallback={"xyzzy"}
        )
        if (key_perms := [part.strip() for part in secret.split("=")])
        if key_perms[0]
    }
    app.settings["AUTH_TOKEN_SECRET"] = config.get(
        "GENERAL", "AUTH_TOKEN_SECRET", fallback=None
    )
    if not app.settings["AUTH_TOKEN_SECRET"]:
        # derive a machine-stable fallback secret from the MAC address
        # NOTE(review): "BRAILLE" looks like a project-registered codec,
        # not a stdlib one — confirm where it is registered
        node = uuid.getnode().to_bytes(6, "big")
        secret = RIPEMD160.new(node).digest().decode("BRAILLE")
        LOGGER.warning(
            "AUTH_TOKEN_SECRET is unset, implicitly setting it to %r",
            secret,
        )
        app.settings["AUTH_TOKEN_SECRET"] = secret
    app.settings["UNDER_ATTACK"] = config.getboolean(
        "GENERAL", "UNDER_ATTACK", fallback=False
    )
    apply_contact_stuff_to_app(app, config)
485def get_ssl_context( # pragma: no cover
486 config: ConfigParser,
487) -> None | ssl.SSLContext:
488 """Create SSL context and configure using the config."""
489 if config.getboolean("TLS", "ENABLED", fallback=False):
490 ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
491 ssl_ctx.load_cert_chain(
492 config.get("TLS", "CERTFILE"),
493 config.get("TLS", "KEYFILE", fallback=None),
494 config.get("TLS", "PASSWORD", fallback=None),
495 )
496 return ssl_ctx
497 return None
def setup_logging(  # pragma: no cover
    config: ConfigParser,
    force: bool = False,
) -> None:
    """
    Setup logging.

    Configure the root logger with a stream handler and, when
    LOGGING/PATH is set, a daily-rotating file handler. An already
    configured root logger is left untouched unless *force* is true,
    in which case the existing handlers are removed and closed first.
    """  # noqa: D401
    root_logger = logging.getLogger()

    if root_logger.handlers:
        if not force:
            return
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)
            handler.close()

    debug = config.getboolean("LOGGING", "DEBUG", fallback=sys.flags.dev_mode)

    logging.captureWarnings(True)

    root_logger.setLevel(logging.DEBUG if debug else logging.INFO)
    # these loggers are too spammy on DEBUG level
    logging.getLogger("tornado.curl_httpclient").setLevel(logging.INFO)
    logging.getLogger("elasticsearch").setLevel(logging.INFO)

    stream_handler = logging.StreamHandler()
    if sys.flags.dev_mode:
        # strip the color placeholders from Tornado's format
        spam = regex.sub(r"%\((end_)?color\)s", "", LogFormatter.DEFAULT_FORMAT)
        formatter = logging.Formatter(spam, LogFormatter.DEFAULT_DATE_FORMAT)
    else:
        formatter = LogFormatter()
    stream_handler.setFormatter(formatter)
    root_logger.addHandler(stream_handler)

    if path := config.get("LOGGING", "PATH", fallback=None):
        os.makedirs(path, mode=0o755, exist_ok=True)
        # logging.handlers is now imported explicitly at the top of the
        # file; previously this attribute access only worked because
        # another import pulled the submodule in transitively
        file_handler = logging.handlers.TimedRotatingFileHandler(
            os.path.join(path, f"{NAME}.log"),
            encoding="UTF-8",
            when="midnight",
            backupCount=30,
            utc=True,
        )
        file_handler.setFormatter(StdlibFormatter())
        root_logger.addHandler(file_handler)
class WebhookLoggingOptions:  # pylint: disable=too-few-public-methods
    """Webhook logging options."""

    __slots__ = (
        "url",
        "content_type",
        "body_format",
        "timestamp_format",
        "timestamp_timezone",
        "escape_message",
        "max_message_length",
    )

    # target URL of the webhook; None disables webhook logging
    url: str | None
    content_type: str
    body_format: str
    timestamp_format: str | None
    timestamp_timezone: str | None
    escape_message: bool
    max_message_length: int | None

    def __init__(self, config: ConfigParser) -> None:
        """Read the webhook logging options from the given config."""
        self.url = config.get("LOGGING", "WEBHOOK_URL", fallback=None)
        self.content_type = config.get(
            "LOGGING",
            "WEBHOOK_CONTENT_TYPE",
            fallback="application/json",
        )
        # default body: Tornado's log format without color placeholders
        colorless_format = regex.sub(
            r"%\((end_)?color\)s", "", LogFormatter.DEFAULT_FORMAT
        )
        self.body_format = config.get(
            "LOGGING",
            "WEBHOOK_BODY_FORMAT",
            fallback='{"text":"' + colorless_format + '"}',
        )
        self.timestamp_format = config.get(
            "LOGGING",
            "WEBHOOK_TIMESTAMP_FORMAT",
            fallback=None,
        )
        self.timestamp_timezone = config.get(
            "LOGGING", "WEBHOOK_TIMESTAMP_TIMEZONE", fallback=None
        )
        self.escape_message = config.getboolean(
            "LOGGING",
            "WEBHOOK_ESCAPE_MESSAGE",
            fallback=True,
        )
        self.max_message_length = config.getint(
            "LOGGING", "WEBHOOK_MAX_MESSAGE_LENGTH", fallback=None
        )
def setup_webhook_logging(  # pragma: no cover
    options: WebhookLoggingOptions,
    loop: asyncio.AbstractEventLoop,
) -> None:
    """
    Setup Webhook logging.

    Does nothing when no webhook URL is configured. Otherwise attaches
    a handler for ERROR records to the root logger and a handler for
    INFO records to the quote-creation logger.
    """  # noqa: D401
    if not options.url:
        return

    LOGGER.info("Setting up Webhook logging")

    root_logger = logging.getLogger()

    webhook_content_type = options.content_type
    webhook_handler = WebhookHandler(
        logging.ERROR,
        loop=loop,
        url=options.url,
        content_type=webhook_content_type,
    )
    formatter = WebhookFormatter(
        options.body_format,
        options.timestamp_format,
    )
    # BUGFIX: the timezone must come from the WEBHOOK_TIMESTAMP_TIMEZONE
    # option; previously the timestamp *format* was passed to ZoneInfo,
    # which failed for any configured format and silently ignored the
    # timezone option (which was read but never used)
    formatter.timezone = (
        None
        if options.timestamp_timezone is None
        else ZoneInfo(options.timestamp_timezone)
    )
    formatter.escape_message = options.escape_message
    formatter.max_message_length = options.max_message_length
    # include the current request (if any) as context in messages
    formatter.get_context_line = lambda _: (
        f"Request: {request}"
        if (request := request_ctx_var.get(None))
        else None
    )
    webhook_handler.setFormatter(formatter)
    root_logger.addHandler(webhook_handler)

    info_handler = WebhookHandler(
        logging.INFO,
        loop=loop,
        url=options.url,
        content_type=webhook_content_type,
    )
    info_handler.setFormatter(formatter)
    logging.getLogger("an_website.quotes.create").addHandler(info_handler)
def setup_apm(app: Application) -> None:  # pragma: no cover
    """Setup APM.""" # noqa: D401
    config: BetterConfigParser = app.settings["CONFIG"]
    app.settings["ELASTIC_APM"] = {
        "ENABLED": config.getboolean("ELASTIC_APM", "ENABLED", fallback=False),
        "SERVER_URL": config.get(
            "ELASTIC_APM", "SERVER_URL", fallback="http://localhost:8200"
        ),
        "SECRET_TOKEN": config.get(
            "ELASTIC_APM", "SECRET_TOKEN", fallback=None
        ),
        "API_KEY": config.get("ELASTIC_APM", "API_KEY", fallback=None),
        "VERIFY_SERVER_CERT": config.getboolean(
            "ELASTIC_APM", "VERIFY_SERVER_CERT", fallback=True
        ),
        "USE_CERTIFI": True,  # doesn't actually use certifi
        "SERVICE_NAME": NAME.removesuffix("-dev"),
        "SERVICE_VERSION": VERSION,
        "ENVIRONMENT": (
            "production" if not sys.flags.dev_mode else "development"
        ),
        "DEBUG": True,
        "CAPTURE_BODY": "errors",
        "TRANSACTION_IGNORE_URLS": [
            "/api/ping",
            "/static/*",
            "/favicon.png",
        ],
        "TRANSACTIONS_IGNORE_PATTERNS": ["^OPTIONS "],
        "PROCESSORS": [
            "an_website.utils.utils.apm_anonymization_processor",
            "elasticapm.processors.sanitize_stacktrace_locals",
            "elasticapm.processors.sanitize_http_request_cookies",
            "elasticapm.processors.sanitize_http_headers",
            "elasticapm.processors.sanitize_http_wsgi_env",
            "elasticapm.processors.sanitize_http_request_body",
        ],
        "RUM_SERVER_URL": config.get(
            "ELASTIC_APM", "RUM_SERVER_URL", fallback=None
        ),
        "RUM_SERVER_URL_PREFIX": config.get(
            "ELASTIC_APM", "RUM_SERVER_URL_PREFIX", fallback=None
        ),
    }
    # options baked into the inline RUM-agent init script
    script_options = [
        f"serviceName:{app.settings['ELASTIC_APM']['SERVICE_NAME']!r}",
        f"serviceVersion:{app.settings['ELASTIC_APM']['SERVICE_VERSION']!r}",
        f"environment:{app.settings['ELASTIC_APM']['ENVIRONMENT']!r}",
    ]
    rum_server_url = app.settings["ELASTIC_APM"]["RUM_SERVER_URL"]
    # None (option unset) -> fall back to SERVER_URL;
    # empty string -> let the browser use window.location.origin
    if rum_server_url is None:
        script_options.append(
            f"serverUrl:{app.settings['ELASTIC_APM']['SERVER_URL']!r}"
        )
    elif rum_server_url:
        script_options.append(f"serverUrl:{rum_server_url!r}")
    else:
        script_options.append("serverUrl:window.location.origin")
    if app.settings["ELASTIC_APM"]["RUM_SERVER_URL_PREFIX"]:
        script_options.append(
            f"serverUrlPrefix:{app.settings['ELASTIC_APM']['RUM_SERVER_URL_PREFIX']!r}"
        )
    app.settings["ELASTIC_APM"]["INLINE_SCRIPT"] = (
        "elasticApm.init({" + ",".join(script_options) + "})"
    )
    # SHA-256 of the inline script, base64-encoded — presumably consumed
    # for a Content-Security-Policy script hash; verify against usage
    app.settings["ELASTIC_APM"]["INLINE_SCRIPT_HASH"] = b64encode(
        sha256(
            app.settings["ELASTIC_APM"]["INLINE_SCRIPT"].encode("ASCII")
        ).digest()
    ).decode("ASCII")
    if app.settings["ELASTIC_APM"]["ENABLED"]:
        app.settings["ELASTIC_APM"]["CLIENT"] = ElasticAPM(app).client
def setup_app_search(app: Application) -> None:  # pragma: no cover
    """Setup Elastic App Search.""" # noqa: D401
    with catch_warnings():
        # importing the client emits a DeprecationWarning
        simplefilter("ignore", DeprecationWarning)
        # pylint: disable-next=import-outside-toplevel
        from elastic_enterprise_search import (  # type: ignore[import-untyped]
            AppSearch,
        )

    config: BetterConfigParser = app.settings["CONFIG"]
    host = config.get("APP_SEARCH", "HOST", fallback=None)
    search_key = config.get("APP_SEARCH", "SEARCH_KEY", fallback=None)
    verify_certs = config.getboolean(
        "APP_SEARCH", "VERIFY_CERTS", fallback=True
    )
    # without a configured host there is no client at all
    if host:
        app.settings["APP_SEARCH"] = AppSearch(
            host,
            bearer_auth=search_key,
            verify_certs=verify_certs,
            ca_certs=CA_BUNDLE_PATH,
        )
    else:
        app.settings["APP_SEARCH"] = None
    app.settings["APP_SEARCH_HOST"] = host
    app.settings["APP_SEARCH_KEY"] = search_key
    app.settings["APP_SEARCH_ENGINE"] = config.get(
        "APP_SEARCH", "ENGINE_NAME", fallback=NAME.removesuffix("-dev")
    )
def setup_redis(app: Application) -> None | Redis[str]:
    """Setup Redis.""" # noqa: D401
    config: BetterConfigParser = app.settings["CONFIG"]

    class Kwargs(TypedDict, total=False):
        """Kwargs of BlockingConnectionPool constructor."""

        db: int
        username: None | str
        password: None | str
        retry_on_timeout: bool
        connection_class: type[UnixDomainSocketConnection] | type[SSLConnection]
        path: str
        host: str
        port: int
        ssl_ca_certs: str
        ssl_keyfile: None | str
        ssl_certfile: None | str
        ssl_check_hostname: bool
        ssl_cert_reqs: str

    # options shared by every connection type
    kwargs: Kwargs = {
        "db": config.getint("REDIS", "DB", fallback=0),
        "username": config.get("REDIS", "USERNAME", fallback=None),
        "password": config.get("REDIS", "PASSWORD", fallback=None),
        "retry_on_timeout": config.getboolean(
            "REDIS", "RETRY_ON_TIMEOUT", fallback=False
        ),
    }
    # options used only for TLS over TCP
    redis_ssl_kwargs: Kwargs = {
        "connection_class": SSLConnection,
        "ssl_ca_certs": CA_BUNDLE_PATH,
        "ssl_keyfile": config.get("REDIS", "SSL_KEYFILE", fallback=None),
        "ssl_certfile": config.get("REDIS", "SSL_CERTFILE", fallback=None),
        "ssl_cert_reqs": config.get(
            "REDIS", "SSL_CERT_REQS", fallback="required"
        ),
        "ssl_check_hostname": config.getboolean(
            "REDIS", "SSL_CHECK_HOSTNAME", fallback=False
        ),
    }
    redis_host_port_kwargs: Kwargs = {
        "host": config.get("REDIS", "HOST", fallback="localhost"),
        "port": config.getint("REDIS", "PORT", fallback=6379),
    }
    redis_use_ssl = config.getboolean("REDIS", "SSL", fallback=False)
    redis_unix_socket_path = config.get(
        "REDIS", "UNIX_SOCKET_PATH", fallback=None
    )

    # a UNIX socket takes precedence over host/port and SSL;
    # warn about config options that are ignored because of it
    if redis_unix_socket_path is not None:
        if redis_use_ssl:
            LOGGER.warning(
                "SSL is enabled for Redis, but a UNIX socket is used"
            )
        if config.has_option("REDIS", "HOST"):
            LOGGER.warning(
                "A host is configured for Redis, but a UNIX socket is used"
            )
        if config.has_option("REDIS", "PORT"):
            LOGGER.warning(
                "A port is configured for Redis, but a UNIX socket is used"
            )
        kwargs.update(
            {
                "connection_class": UnixDomainSocketConnection,
                "path": redis_unix_socket_path,
            }
        )
    else:
        kwargs.update(redis_host_port_kwargs)
        if redis_use_ssl:
            kwargs.update(redis_ssl_kwargs)

    # all options above are read unconditionally so the config parser
    # can treat them as parsed even when Redis is disabled
    if not config.getboolean("REDIS", "ENABLED", fallback=False):
        app.settings["REDIS"] = None
        return None
    connection_pool = BlockingConnectionPool(
        client_name=NAME,
        decode_responses=True,
        **kwargs,
    )
    redis = cast("Redis[str]", Redis(connection_pool=connection_pool))
    app.settings["REDIS"] = redis
    return redis
def signal_handler(  # noqa: D103  # pragma: no cover
    signalnum: int, frame: None | types.FrameType
) -> None:
    # pylint: disable=unused-argument, missing-function-docstring
    # SIGINT, SIGTERM and (where available) SIGHUP all request shutdown
    if signalnum in {signal.SIGINT, signal.SIGTERM} or signalnum == getattr(
        signal, "SIGHUP", None
    ):
        EVENT_SHUTDOWN.set()
def install_signal_handler() -> None:  # pragma: no cover
    """Install the signal handler."""
    # SIGINT and SIGTERM always exist; SIGHUP is POSIX-only
    handled = [signal.SIGINT, signal.SIGTERM]
    if hasattr(signal, "SIGHUP"):
        handled.append(signal.SIGHUP)
    for signalnum in handled:
        signal.signal(signalnum, signal_handler)
def supervise(loop: AbstractEventLoop) -> None:
    """
    Supervise.

    Watch the heartbeat timestamp until it is reset to a falsy value;
    abort the whole process if it gets stale (no beat for 10 seconds).
    """
    while heartbeat := background_tasks.HEARTBEAT:  # pylint: disable=while-used
        if time.monotonic() - heartbeat >= 10:
            current = asyncio.current_task(loop)
            request = (
                current.get_context().get(request_ctx_var) if current else None
            )
            LOGGER.fatal(
                "Heartbeat timed out for worker %s (pid %d), "
                "current request: %s, current task: %s",
                task_id(),
                os.getpid(),
                request,
                current,
            )
            # run exit hooks before aborting so cleanup still happens
            atexit._run_exitfuncs()  # pylint: disable=protected-access
            os.abort()
        time.sleep(1)
def main(  # noqa: C901  # pragma: no cover
    config: BetterConfigParser | None = None,
) -> int | str:
    """
    Start everything.

    This is the main function that is called when running this program.
    """
    # pylint: disable=too-complex, too-many-branches
    # pylint: disable=too-many-locals, too-many-statements
    setproctitle(NAME)
    install_signal_handler()
    parser = create_argument_parser()
    # parse known args only, so config overrides added later still work
    args, _ = parser.parse_known_args(
        get_arguments_without_help(), ArgparseNamespace()
    )
    if args.version:
        print("Version:", VERSION)
        if args.verbose:
            # pylint: disable-next=import-outside-toplevel
            from .version.version import (
                get_file_hashes,
                get_hash_of_file_hashes,
            )

            print()
            print("Hash der Datei-Hashes:")
            print(get_hash_of_file_hashes())
            if args.verbose > 1:
                print()
                print("Datei-Hashes:")
                print(get_file_hashes())
        return 0
    config = config or BetterConfigParser.from_path(*args.config)
    assert config is not None
    config.add_override_argument_parser(parser)
    setup_logging(config)
    LOGGER.info("Starting %s %s", NAME, VERSION)
    if platform.system() == "Windows":
        LOGGER.warning(
            "Running %s on Windows is not officially supported",
            NAME.removesuffix("-dev"),
        )
    ignore_modules(config)
    # make_app returns an error string when loading the modules failed
    app = make_app(config)
    if isinstance(app, str):
        return app
    apply_config_to_app(app, config)
    setup_elasticsearch(app)
    setup_app_search(app)
    setup_redis(app)
    setup_apm(app)
    behind_proxy = config.getboolean("GENERAL", "BEHIND_PROXY", fallback=False)
    server = HTTPServer(
        app,
        body_timeout=3600,
        decompress_request=True,
        max_body_size=1_000_000_000,
        ssl_options=get_ssl_context(config),
        xheaders=behind_proxy,
    )
    # sockets are created lazily (after --help handling, before forking)
    socket_factories: list[Callable[[], Iterable[socket]]] = []
    port = config.getint("GENERAL", "PORT", fallback=None)
    if port:
        socket_factories.append(
            partial(
                bind_sockets,
                port,
                "localhost" if behind_proxy else "",
            )
        )
    unix_socket_path = config.get(
        "GENERAL",
        "UNIX_SOCKET_PATH",
        fallback=None,
    )
    if unix_socket_path:
        os.makedirs(unix_socket_path, 0o755, True)
        socket_factories.append(
            lambda: (
                bind_unix_socket(
                    os.path.join(unix_socket_path, f"{NAME}.sock"),
                    mode=0o666,
                ),
            )
        )
    # default: 2 processes in dev mode, one per CPU otherwise,
    # 0 when the platform cannot fork
    processes = config.getint(
        "GENERAL",
        "PROCESSES",
        fallback=hasattr(os, "fork") * (2 if sys.flags.dev_mode else -1),
    )
    if processes < 0:
        processes = os.process_cpu_count() or 0
    worker: None | int = None
    run_supervisor_thread = config.getboolean(
        "GENERAL", "SUPERVISE", fallback=False
    )
    elasticsearch_is_enabled = config.getboolean(
        "ELASTICSEARCH", "ENABLED", fallback=False
    )
    redis_is_enabled = config.getboolean("REDIS", "ENABLED", fallback=False)
    webhook_logging_options = WebhookLoggingOptions(config)
    # all config options should be read before forking
    if args.save_config_to:
        with open(args.save_config_to, "w", encoding="UTF-8") as file:
            config.write(file)
    config.set_all_options_should_be_parsed()
    del config
    # show help message if --help is given (after reading config, before forking)
    parser.parse_args()
    if not socket_factories:
        LOGGER.warning("No sockets configured")
        return 0
    # create sockets after checking for --help
    sockets: list[socket] = (
        Stream(socket_factories).flat_map(lambda fun: fun()).collect(list)
    )
    UPTIME.reset()
    main_pid = os.getpid()
    if processes:
        setproctitle(f"{NAME} - Master")
        worker = fork_processes(processes)
        setproctitle(f"{NAME} - Worker {worker}")
        # yeet all children (there should be none, but do it regardless, just in case)
        _children.clear()
        if "an_website.quotes" in sys.modules:
            from .quotes.utils import (  # pylint: disable=import-outside-toplevel
                AUTHORS_CACHE,
                QUOTES_CACHE,
                WRONG_QUOTES_CACHE,
            )

            del AUTHORS_CACHE.control.created_by_ultra  # type: ignore[attr-defined]
            del QUOTES_CACHE.control.created_by_ultra  # type: ignore[attr-defined]
            del WRONG_QUOTES_CACHE.control.created_by_ultra  # type: ignore[attr-defined]
            # NOTE(review): if geoip.__kwdefaults__ is None this raises
            # KeyError on the empty dict — confirm that is intended
            del (geoip.__kwdefaults__ or {})["caches"].control.created_by_ultra
        if unix_socket_path:
            # per-worker UNIX socket in addition to the shared one
            sockets.append(
                bind_unix_socket(
                    os.path.join(unix_socket_path, f"{NAME}.{worker}.sock"),
                    mode=0o666,
                )
            )
    # get loop after forking
    # if not forking allow loop to be set in advance by external code
    loop: None | asyncio.AbstractEventLoop
    try:
        with catch_warnings():  # TODO: remove after dropping support for 3.13
            simplefilter("ignore", DeprecationWarning)
            loop = asyncio.get_event_loop()
        if loop.is_closed():
            loop = None
    except RuntimeError:
        loop = None
    if loop is None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    if not loop.get_task_factory():
        loop.set_task_factory(asyncio.eager_task_factory)
    if perf8 and "PERF8" in os.environ:
        loop.run_until_complete(perf8.enable())
    setup_webhook_logging(webhook_logging_options, loop)
    server.add_sockets(sockets)
    tasks = background_tasks.start_background_tasks(  # noqa: F841
        module_infos=app.settings["MODULE_INFOS"],
        loop=loop,
        main_pid=main_pid,
        app=app,
        processes=processes,
        elasticsearch_is_enabled=elasticsearch_is_enabled,
        redis_is_enabled=redis_is_enabled,
        worker=worker,
    )
    if run_supervisor_thread:
        background_tasks.HEARTBEAT = time.monotonic()
        threading.Thread(
            target=supervise, args=(loop,), name="supervisor", daemon=True
        ).start()
    try:
        loop.run_forever()
        EVENT_SHUTDOWN.set()
    finally:
        # orderly shutdown: stop accepting, drain connections, close
        # clients, cancel tasks, then close the loop
        try:  # pylint: disable=too-many-try-statements
            server.stop()
            loop.run_until_complete(asyncio.sleep(1))
            loop.run_until_complete(server.close_all_connections())
            if perf8 and "PERF8" in os.environ:
                loop.run_until_complete(perf8.disable())
            if redis := app.settings.get("REDIS"):
                loop.run_until_complete(
                    redis.aclose(close_connection_pool=True)
                )
            if elasticsearch := app.settings.get("ELASTICSEARCH"):
                loop.run_until_complete(elasticsearch.close())
        finally:
            try:
                _cancel_all_tasks(loop)
                loop.run_until_complete(loop.shutdown_asyncgens())
                loop.run_until_complete(loop.shutdown_default_executor())
            finally:
                loop.close()
                # stops the supervisor thread's while loop
                background_tasks.HEARTBEAT = 0
    return len(tasks)