Coverage for an_website / contact / contact.py: 63.303%
109 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 17:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 17:35 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""A page that allows users to contact the website operator."""
16import asyncio
17import logging
18import smtplib
19import ssl
20import sys
21import time
22from collections.abc import Iterable, Mapping
23from configparser import ConfigParser
24from datetime import datetime, timezone
25from email import utils as email_utils
26from email.message import Message
27from math import pi
28from typing import Any, Final, cast
29from urllib.parse import quote, urlencode
31import typed_stream
32from tornado.web import Application, HTTPError, MissingArgumentError
34from .. import NAME
35from ..utils.request_handler import HTMLRequestHandler
36from ..utils.utils import ModuleInfo
38LOGGER: Final = logging.getLogger(__name__)
41def get_module_info() -> ModuleInfo:
42 """Create and return the ModuleInfo for this module."""
43 return ModuleInfo(
44 handlers=((r"/kontakt", ContactPage),),
45 name="Kontakt-Formular",
46 description="Nehme mit dem Betreiber der Webseite Kontakt auf.",
47 path="/kontakt",
48 keywords=("Kontakt", "Formular"),
49 aliases=("/contact",),
50 hidden=True,
51 )
54def apply_contact_stuff_to_app(app: Application, config: ConfigParser) -> None:
55 """Apply contact stuff to the app."""
56 if "/kontakt" not in typed_stream.Stream(
57 cast(Iterable[ModuleInfo], app.settings.get("MODULE_INFOS", ()))
58 ).map(lambda m: m.path):
59 return
61 contact_address = config.get(
62 "CONTACT",
63 "CONTACT_ADDRESS",
64 fallback=(
65 f"{NAME.removesuffix('-dev')}@restmail.net"
66 if app.settings.get("debug")
67 else ""
68 ),
69 )
70 sender_address = config.get(
71 "CONTACT",
72 "SENDER_ADDRESS",
73 fallback=(
74 "Marcell D'Avis <davis@1und1.de>"
75 if contact_address.endswith("@restmail.net")
76 else None
77 ),
78 )
80 if not sender_address:
81 # the contact form in other mode if no sender address is set
82 app.settings.update(
83 CONTACT_ADDRESS=contact_address,
84 CONTACT_USE_FORM=1,
85 )
86 return
87 app.settings["CONTACT_ADDRESS"] = None
89 if not (contact_address and sender_address):
90 return
92 app.settings.update(
93 CONTACT_USE_FORM=2,
94 CONTACT_RECIPIENTS={
95 address.strip() for address in contact_address.split(",")
96 },
97 CONTACT_SMTP_SERVER=config.get(
98 "CONTACT",
99 "SMTP_SERVER",
100 fallback=(
101 "restmail.net"
102 if contact_address.endswith("@restmail.net")
103 else "localhost"
104 ),
105 ),
106 CONTACT_SMTP_PORT=config.getint(
107 "CONTACT",
108 "SMTP_PORT",
109 fallback=25 if contact_address.endswith("@restmail.net") else 587,
110 ),
111 CONTACT_SMTP_STARTTLS=config.getboolean(
112 "CONTACT",
113 "SMTP_STARTTLS",
114 fallback=None,
115 ),
116 CONTACT_SENDER_ADDRESS=sender_address,
117 CONTACT_SENDER_USERNAME=config.get(
118 "CONTACT", "SENDER_USERNAME", fallback=None
119 ),
120 CONTACT_SENDER_PASSWORD=config.get(
121 "CONTACT", "SENDER_PASSWORD", fallback=None
122 ),
123 )
126def send_message( # pylint: disable=too-many-arguments
127 message: Message,
128 from_address: str,
129 recipients: Iterable[str],
130 server: str = "localhost",
131 sender: None | str = None,
132 username: None | str = None,
133 password: None | str = None,
134 starttls: None | bool = None,
135 port: int = 587,
136 *,
137 date: None | datetime = None,
138) -> dict[str, tuple[int, bytes]]:
139 """Send an email."""
140 recipients = list(recipients)
141 for spam, eggs in enumerate(recipients):
142 if eggs.startswith("@"):
143 recipients[spam] = "contact" + eggs
145 message["Date"] = email_utils.format_datetime(
146 date or datetime.now(tz=timezone.utc)
147 )
149 if sender:
150 message["Sender"] = sender
151 message["From"] = from_address
152 message["To"] = ", ".join(recipients)
154 with smtplib.SMTP(server, port) as smtp:
155 smtp.set_debuglevel(sys.flags.dev_mode * 2)
156 smtp.ehlo_or_helo_if_needed()
157 if starttls is None:
158 starttls = smtp.has_extn("starttls")
159 if starttls:
160 smtp.starttls(context=ssl.create_default_context())
161 if username and password:
162 smtp.login(username, password)
163 return smtp.send_message(message)
166def add_geoip_info_to_message(
167 message: Message,
168 geoip_info: Mapping[str, Any],
169 header_prefix: str = "X-GeoIP",
170) -> None:
171 """Add GeoIP information to the message."""
172 for spam, eggs in geoip_info.items():
173 header = f"{header_prefix}-{spam.replace('_', '-')}"
174 if isinstance(eggs, dict):
175 add_geoip_info_to_message(message, eggs, header)
176 else:
177 message[header] = str(eggs)
180class ContactPage(HTMLRequestHandler):
181 """The request handler for the contact page."""
183 RATELIMIT_POST_LIMIT = 5
184 RATELIMIT_POST_COUNT_PER_PERIOD = 1
185 RATELIMIT_POST_PERIOD = 120
187 ACCESS_LOG: dict[str, float] = {}
189 async def get(self, *, head: bool = False) -> None:
190 """Handle GET requests to the contact page."""
191 if not self.settings.get("CONTACT_USE_FORM"):
192 raise HTTPError(503)
193 if head:
194 return
196 current_time = time.monotonic()
198 # clean up old items from the access log
199 for key, value in tuple(self.ACCESS_LOG.items()):
200 if current_time - value > 10:
201 del self.ACCESS_LOG[key]
203 self.ACCESS_LOG[str(self.request.remote_ip)] = current_time
205 await self.render(
206 "pages/contact.html",
207 subject=self.get_argument("subject", ""),
208 message=self.get_argument("message", ""),
209 )
211 async def post(self) -> None:
212 """Handle POST requests to the contact page."""
213 if not self.settings.get("CONTACT_USE_FORM"):
214 raise HTTPError(503)
216 if atime := self.ACCESS_LOG.get(str(self.request.remote_ip)):
217 del self.ACCESS_LOG[str(self.request.remote_ip)]
218 if time.monotonic() - atime < pi:
219 LOGGER.info("Rejected message because of timing")
220 await self.render(
221 "pages/empty.html",
222 text="Nicht gesendet, da du zu schnell warst.",
223 )
224 return
226 text = self.get_argument("nachricht")
227 if not text:
228 raise MissingArgumentError("nachricht") # raise on empty message
230 name = self.get_argument("name", "")
231 address_arg = self.get_argument("addresse", "")
232 address = (
233 f"{name} <{address_arg or 'anonymous@foo.bar'}>"
234 if name
235 else address_arg or "anonymous@foo.bar"
236 )
238 message = Message()
239 host = self.request.host_name
240 name = name or address_arg or "Jemand"
241 message["Subject"] = str(
242 self.get_argument("subjekt", "")
243 or f"{name} will etwas über {host} schreiben."
244 )
245 message.set_payload(text, "UTF-8")
247 if honeypot := self.get_argument("message", ""): # 🍯
248 LOGGER.info(
249 "rejected message: %s",
250 {
251 "Subject": message["Subject"],
252 "message": message,
253 "address": address,
254 "geoip": await self.geoip(),
255 "🍯": honeypot,
256 },
257 )
258 await self.render("pages/empty.html", text="Erfolgreich gesendet.")
259 return
261 if self.settings.get("CONTACT_USE_FORM") == 1:
262 query = urlencode(
263 {
264 "subject": message["Subject"],
265 "body": f"{text}\n\nVon: {address}",
266 },
267 quote_via=quote,
268 )
269 self.redirect(
270 f"mailto:{self.settings.get('CONTACT_ADDRESS')}?{query}",
271 status=303,
272 )
273 return
275 geoip = await self.geoip()
276 if geoip:
277 add_geoip_info_to_message(message, geoip)
279 await asyncio.to_thread(
280 send_message,
281 message=message,
282 from_address=address,
283 server=self.settings.get("CONTACT_SMTP_SERVER", "localhost"),
284 sender=self.settings.get("CONTACT_SENDER_ADDRESS"),
285 recipients=self.settings.get("CONTACT_RECIPIENTS", ""),
286 username=self.settings.get("CONTACT_SENDER_USERNAME"),
287 password=self.settings.get("CONTACT_SENDER_PASSWORD"),
288 starttls=self.settings.get("CONTACT_SMTP_STARTTLS"),
289 port=self.settings.get("CONTACT_SMTP_PORT", 587),
290 date=await self.get_time(),
291 )
293 await self.render("pages/empty.html", text="Erfolgreich gesendet.")