Coverage for an_website / utils / logging.py: 32.673%
101 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-22 18:49 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-22 18:49 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""Logging stuff used by the website."""
17import asyncio
18import logging
19import traceback
20from asyncio import AbstractEventLoop
21from collections.abc import Callable, Coroutine, Iterable
22from concurrent.futures import Future
23from datetime import datetime, tzinfo
24from logging import LogRecord
25from pathlib import Path
26from typing import Never
28import orjson as json
29from tornado.httpclient import AsyncHTTPClient
31from an_website import DIR as AN_WEBSITE_DIR
33from .. import CA_BUNDLE_PATH
35HOME: str = Path("~/").expanduser().as_posix().rstrip("/")
38def minify_filepath(path: str) -> str:
39 """Make a filepath smaller."""
40 if path.startswith(f"{HOME}/"):
41 return "~" + path.removeprefix(HOME)
42 return path
45def get_minimal_traceback(
46 record: LogRecord, prefix: str = "\n\n"
47) -> Iterable[str]:
48 """Get a minimal traceback from the log record."""
49 if not record.exc_info:
50 return
51 (_, value, tb) = record.exc_info
52 if not (value and tb):
53 return
55 yield prefix
56 yield from traceback.format_exception(value, limit=0)
58 summary = traceback.extract_tb(tb)
59 if isinstance(AN_WEBSITE_DIR, Path):
60 start_path = f"{str(AN_WEBSITE_DIR).rstrip('/')}/"
62 for i in reversed(range(len(summary))):
63 if summary[i].filename.startswith(start_path):
64 summary = traceback.StackSummary(summary[i:])
65 break
67 for frame in summary:
68 frame.filename = minify_filepath(frame.filename)
70 yield from summary.format()
73class AsyncHandler(logging.Handler):
74 """A logging handler that can handle log records asynchronously."""
76 futures: set[Future[object]]
77 loop: AbstractEventLoop
79 def __init__(
80 self,
81 level: int | str = logging.NOTSET,
82 *,
83 loop: AbstractEventLoop,
84 ):
85 """Initialize the handler."""
86 super().__init__(level=level)
87 self.futures = set()
88 self.loop = loop
90 def callback(self, future: Future[object]) -> None:
91 """Remove the reference to the future from the handler."""
92 self.acquire()
93 try:
94 self.futures.discard(future)
95 finally:
96 self.release()
98 def emit( # type: ignore[override]
99 self, record: LogRecord
100 ) -> None | Coroutine[None, Never, object]:
101 """
102 Do whatever it takes to actually log the specified logging record.
104 This version is intended to be implemented by subclasses and so
105 raises a NotImplementedError.
106 """
107 raise NotImplementedError(
108 "emit must be implemented by AsyncHandler subclasses"
109 )
111 def handle( # type: ignore[override]
112 self, record: LogRecord
113 ) -> bool | LogRecord:
114 """Handle incoming log records."""
115 rv = self.filter(record)
116 if isinstance(rv, LogRecord):
117 record = rv
118 if rv and not self.loop.is_closed():
119 self.acquire()
120 try:
121 if awaitable := self.emit(record):
122 future: Future[object] = asyncio.run_coroutine_threadsafe(
123 awaitable, self.loop
124 )
125 self.futures.add(future)
126 future.add_done_callback(self.callback)
127 finally:
128 self.release()
129 return rv
132class DatetimeFormatter(logging.Formatter):
133 """A logging formatter that formats the time using datetime."""
135 timezone: None | tzinfo = None
137 def formatTime( # noqa: N802
138 self, record: LogRecord, datefmt: None | str = None
139 ) -> str:
140 """Return the creation time of the LogRecord as formatted text."""
141 spam = datetime.fromtimestamp(record.created).astimezone(self.timezone)
142 if datefmt:
143 return spam.strftime(datefmt)
144 return spam.isoformat()
147class WebhookFormatter(DatetimeFormatter):
148 """A logging formatter optimized for logging to a webhook."""
150 escape_message = False
151 max_message_length: int | None = None
152 get_context_line: Callable[[LogRecord], str | None] | None = None
154 def format(self, record: LogRecord) -> str:
155 """Format the specified record as text."""
156 record.message = record.getMessage()
157 if self.usesTime():
158 record.asctime = self.formatTime(record, self.datefmt)
159 if (
160 self.max_message_length is not None
161 and len(record.message) > self.max_message_length
162 ):
163 record.message = record.message[: self.max_message_length]
164 for line in get_minimal_traceback(record):
165 if (
166 self.max_message_length is not None
167 and len(line) + len(record.message) > self.max_message_length
168 ):
169 ellipsis = "…"
170 if (
171 len(ellipsis) + len(record.message)
172 <= self.max_message_length
173 ):
174 record.message += ellipsis
175 break
176 record.message += line
177 if (
178 self.get_context_line
179 and (context_line := self.get_context_line(record))
180 and (
181 (len(record.message) + 2 + len(context_line))
182 <= self.max_message_length
183 if self.max_message_length
184 else True
185 )
186 ):
187 record.message += f"\n\n{context_line}"
188 if self.escape_message:
189 record.message = json.dumps(record.message).decode("UTF-8")[1:-1]
190 return self.formatMessage(record)
193class WebhookHandler(AsyncHandler):
194 """A logging handler that sends logs to a webhook."""
196 url: str
197 content_type: str
199 def __init__(
200 self,
201 level: int | str = logging.NOTSET,
202 *,
203 loop: AbstractEventLoop,
204 url: str,
205 content_type: str,
206 ):
207 """Initialize the handler."""
208 super().__init__(level=level, loop=loop)
209 self.url = url
210 self.content_type = content_type
212 async def emit(self, record: LogRecord) -> None: # type: ignore[override]
213 """Send the request to the webhook."""
214 # pylint: disable=invalid-overridden-method
215 try:
216 message = self.format(record)
217 await AsyncHTTPClient().fetch(
218 self.url,
219 method="POST",
220 headers={"Content-Type": self.content_type},
221 body=message.strip(),
222 ca_certs=CA_BUNDLE_PATH,
223 )
224 except Exception: # pylint: disable=broad-except
225 self.handleError(record)