Coverage for an_website/quotes/quotes.py: 72.770%
213 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-10 18:56 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-10 18:56 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""
15A page with wrong quotes.
17It displays funny, but wrong, quotes.
18"""
20import asyncio
21import logging
22import random
23from asyncio import AbstractEventLoop, Future
24from dataclasses import dataclass
25from typing import Any, ClassVar, Final, Literal, TypeAlias
27import regex
28from tornado.web import HTTPError
30from .. import EVENT_REDIS
31from ..utils.data_parsing import parse_args
32from ..utils.request_handler import APIRequestHandler, HTMLRequestHandler
33from ..utils.utils import hash_ip
34from .image import IMAGE_CONTENT_TYPES, create_image
35from .quote_of_the_day import QuoteOfTheDayBaseHandler
36from .utils import (
37 WRONG_QUOTES_CACHE,
38 QuoteReadyCheckHandler,
39 WrongQuote,
40 create_wq_and_vote,
41 get_authors,
42 get_random_id,
43 get_wrong_quote,
44 get_wrong_quotes,
45)
47LOGGER: Final = logging.getLogger(__name__)
50RatingFilter: TypeAlias = Literal["w", "n", "unrated", "rated", "all", "smart"]
51SMART_RATING_FILTERS: Final[tuple[RatingFilter, ...]] = (
52 *(("n",) * 1),
53 *(("all",) * 5),
54 *(("w",) * 5),
55)
@dataclass(frozen=True)
class VoteArgument:
    """The arguments of a vote request."""

    # the vote of the user: -1 (down), 0 (no vote) or 1 (up)
    vote: Literal[-1, 0, 1]
def parse_rating_filter(rating_filter_str: str) -> RatingFilter:
    """
    Turn a string into a rating filter.

    Return the matching filter if the string is one of "w", "n",
    "unrated", "rated" or "all" and fall back to "smart" otherwise.
    """
    explicit_filters: tuple[RatingFilter, ...] = (
        "w",
        "n",
        "unrated",
        "rated",
        "all",
    )
    for known_filter in explicit_filters:
        if known_filter == rating_filter_str:
            return known_filter
    # everything else (including "smart" itself) means "smart"
    return "smart"
# pylint: disable-next=too-complex
def get_next_id(rating_filter: RatingFilter) -> tuple[int, int]:
    """
    Get the id of the next quote that matches the rating filter.

    The "smart" filter gets replaced by a random entry of
    SMART_RATING_FILTERS first. If the filter is invalid or no wrong
    quote matches it, a random id is returned.
    """
    if rating_filter == "smart":
        rating_filter = random.choice(SMART_RATING_FILTERS)  # nosec: B311

    if rating_filter == "unrated":
        # draw random ids until one is found that isn't rated yet
        # pylint: disable=while-used
        while True:
            candidate = get_random_id()
            if candidate not in WRONG_QUOTES_CACHE:
                return candidate
            if WRONG_QUOTES_CACHE[candidate].id == -1:
                # wrong quotes that are unrated but in the cache
                # don't have a real wrong_quotes_id
                return candidate

    if rating_filter == "all":
        return get_random_id()

    if rating_filter == "w":
        candidates = get_wrong_quotes(lambda wq: wq.rating > 0)
    elif rating_filter == "n":
        candidates = get_wrong_quotes(lambda wq: wq.rating < 0)
    elif rating_filter == "rated":
        candidates = get_wrong_quotes(lambda wq: wq.id not in {-1, None})
    else:
        LOGGER.error("Invalid rating filter %s", rating_filter)
        return get_random_id()

    if candidates:
        return random.choice(candidates).get_id()  # nosec: B311

    # no wrong quotes with that filter
    return get_random_id()
class QuoteBaseHandler(QuoteReadyCheckHandler):
    """
    The base request handler for the quotes package.

    While preparing a request it chooses the id of the next wrong quote
    and after the request is finished it pre-fetches that wrong quote.
    """

    # rate limit settings for GET requests
    # NOTE(review): presumably read by a parent request handler - confirm
    RATELIMIT_GET_LIMIT = 20
    RATELIMIT_GET_COUNT_PER_PERIOD = 20
    RATELIMIT_GET_PERIOD = 10

    # The pre-fetch tasks that are still running. This is a class
    # attribute, so the set is shared between all handler instances.
    FUTURES: set[Future[Any]] = set()

    # these get set in prepare()
    loop: AbstractEventLoop
    next_id: tuple[int, int]
    rating_filter: RatingFilter

    def future_callback(self, future: Future[WrongQuote | None]) -> None:
        """
        Discard the future and log the exception if one occurred.

        This is the done callback of the pre-fetch tasks.
        """
        self.FUTURES.discard(future)
        if future.cancelled():
            # Future.exception() raises CancelledError if the future got
            # cancelled, which must not escape from a done callback
            LOGGER.debug(
                "Pre-fetching quote %d-%d was cancelled", *self.next_id
            )
            return
        if exc := future.exception():
            LOGGER.error(
                "Failed to pre-fetch quote %d-%d (%s)",
                *self.next_id,
                exc,
                exc_info=(type(exc), exc, exc.__traceback__),
            )
        else:
            LOGGER.debug("Pre-fetched quote %d-%d", *self.next_id)

    def get_next_url(self) -> str:
        """Get the URL of the next quote."""
        return self.fix_url(
            f"/zitate/{self.next_id[0]}-{self.next_id[1]}",
            query_args={
                # "smart" is the default, so it doesn't need to be in the URL
                "r": (
                    None
                    if self.rating_filter == "smart"
                    else self.rating_filter
                ),
                "show-rating": self.get_show_rating() or None,
            },
        )

    def get_show_rating(self) -> bool:
        """Return whether the user wants to see the rating."""
        return self.get_bool_argument("show-rating", False)

    def on_finish(self) -> None:
        """
        Pre-fetch the data for the next quote.

        This is done to show the users less out-of-date data.
        """
        if len(self.FUTURES) > 1 or (
            self.content_type
            and self.content_type.startswith("image/")
            or self.content_type
            in {"application/pdf", "application/vnd.ms-excel"}
        ):
            return  # don't spam and don't do this for images

        user_agent = self.request.headers.get("User-Agent")
        if not user_agent or "bot" in user_agent.lower():
            return  # don't do this for bots

        # prepare() sets loop before next_id, so both need to be checked
        # in case prepare() didn't run until the end
        if not (hasattr(self, "loop") and hasattr(self, "next_id")):
            return

        task = self.loop.create_task(
            get_wrong_quote(*self.next_id, use_cache=False)
        )
        self.FUTURES.add(task)
        task.add_done_callback(self.future_callback)

    async def prepare(self) -> None:
        """Set the id of the next wrong_quote to show."""
        await super().prepare()
        self.loop = asyncio.get_running_loop()
        self.rating_filter = parse_rating_filter(self.get_argument("r", ""))
        self.next_id = get_next_id(self.rating_filter)
class QuoteMainPage(QuoteBaseHandler, QuoteOfTheDayBaseHandler):
    """The main page of the quotes with explanations and a few links."""

    async def get(self, *, head: bool = False) -> None:
        """
        Render the main quote page, with a few links.

        If the arguments "quote" and "author" only consist of digits,
        redirect to the page of that wrong quote instead.
        """
        # pylint: disable=unused-argument
        quote = self.get_argument("quote", "")
        author = self.get_argument("author", "")
        if quote or author:
            # NOTE(review): if only one of the two is given, the redirect
            # target has an empty id (e.g. "/zitate/12-") - confirm that
            # this is intended
            if regex.fullmatch(r"^[0-9]+$", quote + author):
                self.redirect(self.fix_url(f"/zitate/{quote}-{author}"))
                return

        # prefer the wrong quotes with a positive rating
        funny_quotes = get_wrong_quotes(lambda wq: wq.rating > 0)
        if not funny_quotes:
            funny_quotes = get_wrong_quotes()

        template_args = {
            "funny_quote_url": self.id_to_url(
                *random.choice(funny_quotes).get_id(),
                rating_param="w",
            ),
            "random_quote_url": self.id_to_url(*self.next_id),
            "quote_of_the_day": self.redis and await self.get_quote_of_today(),  # type: ignore[truthy-bool]
            "one_stone_url": self.get_author_url("Albert Einstein"),
            "kangaroo_url": self.get_author_url("Das Känguru"),
            "muk_url": self.get_author_url("Marc-Uwe Kling"),
        }
        await self.render("pages/quotes/main_page.html", **template_args)

    def get_author_url(self, author_name: str) -> None | str:
        """
        Get the info URL of an author.

        The name is compared case-insensitively.
        Return None if there is no author with that name.
        """
        matching = get_authors(
            lambda _a: _a.name.lower() == author_name.lower()
        )
        if matching:
            return self.fix_url(f"/zitate/info/a/{matching[0].id}")
        return None

    def id_to_url(
        self, quote_id: int, author_id: int, rating_param: None | str = None
    ) -> str:
        """Get the URL of a quote, with the rating filter as "r"."""
        path = f"/zitate/{quote_id}-{author_id}"
        return self.fix_url(path, query_args={"r": rating_param})
def wrong_quote_to_json(
    wq_: WrongQuote, vote: int, next_: tuple[int, int], rating: str, full: bool
) -> dict[str, Any]:  # TODO: improve lazy return type typing
    """
    Convert a wrong quote to a dict.

    If full is true, the dict contains the complete JSON of the wrong
    quote. Otherwise it contains a flat summary with the rating str.
    Both contain the vote and the id of the next quote.
    """
    next_quote_id, next_author_id = next_
    if not full:
        return {
            "id": wq_.get_id_as_str(),
            "quote": str(wq_.quote),
            "author": str(wq_.author),
            "real_author": str(wq_.quote.author),
            "real_author_id": wq_.quote.author_id,
            "rating": rating,
            "vote": vote,
            "next": f"{next_quote_id}-{next_author_id}",
        }
    return {
        "wrong_quote": wq_.to_json(),
        "next": f"{next_quote_id}-{next_author_id}",
        "vote": vote,
    }
class QuoteById(QuoteBaseHandler):
    """The page that renders the wrong quote with the specified id."""

    # rate limit settings for POST requests
    # NOTE(review): presumably read by a parent request handler - confirm
    RATELIMIT_POST_LIMIT: ClassVar[int] = 10
    RATELIMIT_POST_COUNT_PER_PERIOD: ClassVar[int] = 5
    RATELIMIT_POST_PERIOD: ClassVar[int] = 10

    # a wrong quote can be served as HTML, as API response or as image
    POSSIBLE_CONTENT_TYPES: ClassVar[tuple[str, ...]] = (
        *HTMLRequestHandler.POSSIBLE_CONTENT_TYPES,
        *APIRequestHandler.POSSIBLE_CONTENT_TYPES,
        *IMAGE_CONTENT_TYPES,
    )

    # the template of the path to redirect to, if the author id is missing
    LONG_PATH: ClassVar[str] = "/zitate/%d-%d"

    async def get(
        self, quote_id: str, author_id: None | str = None, *, head: bool = False
    ) -> None:
        """
        Handle GET requests to this page and render the quote.

        Without an author id the quote id is treated as the id of a wrong
        quote and the request gets redirected to the long path.
        Depending on the content type either a file created by
        create_image or the normal quote page gets sent.
        """
        int_quote_id = int(quote_id)
        if author_id is None:
            wqs = get_wrong_quotes(lambda wq: wq.id == int_quote_id)
            if not wqs:
                raise HTTPError(404, f"No wrong quote with id {quote_id}")
            self.redirect(self.fix_url(self.LONG_PATH % wqs[0].get_id()))
            return

        if head:
            return

        wants_file = (
            self.content_type
            and self.content_type.startswith("image/")
            or self.content_type
            in {"application/pdf", "application/vnd.ms-excel"}
        )
        if not wants_file:
            await self.render_quote(int_quote_id, int(author_id))
            return

        wrong_quote = await get_wrong_quote(int_quote_id, int(author_id))
        if not wrong_quote:
            raise HTTPError(404, reason="Falsches Zitat nicht gefunden")

        if self.stanley():
            quote_text = self.sub_stanley(wrong_quote.quote.quote)
        else:
            quote_text = wrong_quote.quote.quote

        if self.stanley():
            author_name = self.sub_stanley(wrong_quote.author.name)
        else:
            author_name = wrong_quote.author.name

        rating = wrong_quote.rating
        source = f"{self.request.host_name}/z/{wrong_quote.get_id_as_str(True)}"

        if self.content_type.startswith("image/"):
            # for example "image/x-foo" becomes "foo"
            file_type = self.content_type.removeprefix("image/")
            file_type = file_type.removeprefix("x-")
        else:
            file_type = {
                "application/pdf": "pdf",
                "application/vnd.ms-excel": "xlsx",
            }[self.content_type]

        # create the file in a thread to not block the event loop
        file_content = await asyncio.to_thread(
            create_image,
            quote_text,
            author_name,
            rating,
            source,
            file_type,
            wq_id=wrong_quote.get_id_as_str(),
        )
        await self.finish(file_content)

    async def get_old_vote(
        self, quote_id: int, author_id: int
    ) -> Literal[-1, 0, 1]:
        """Get the saved vote, with 0 as fallback if nothing is saved."""
        saved_vote = await self.get_saved_vote(quote_id, author_id)
        return 0 if saved_vote is None else saved_vote

    async def get_rating_str(self, wrong_quote: WrongQuote) -> str:
        """
        Get the rating str to display on the page.

        "---" means that the wrong quote has no real id (None or -1).
        "???" hides the rating of a GET request with the smart filter, if
        the user has no saved vote and didn't ask to see the rating.
        """
        if wrong_quote.id in {None, -1}:
            return "---"
        if self.get_show_rating():
            # don't hide the rating on wish of user
            return str(wrong_quote.rating)
        if self.rating_filter != "smart":
            return str(wrong_quote.rating)
        method = self.request.method
        if not method or method.upper() != "GET":
            return str(wrong_quote.rating)
        saved_vote = await self.get_saved_vote(
            wrong_quote.quote_id, wrong_quote.author_id
        )
        if saved_vote is None:
            return "???"
        return str(wrong_quote.rating)

    def get_redis_votes_key(self, quote_id: int, author_id: int) -> str:
        """Get the key to save the votes with Redis."""
        prefix = self.redis_prefix
        user_id = self.get_user_id()
        return f"{prefix}:quote-votes:{user_id}:{quote_id}-{author_id}"

    async def get_saved_vote(
        self, quote_id: int, author_id: int
    ) -> None | Literal[-1, 0, 1]:
        """
        Get the vote of the current user saved with Redis.

        Use the quote_id and author_id to query the vote.
        Return None if nothing (or an unknown value) is saved.
        Return 0 if there is no Redis connection.
        """
        if not EVENT_REDIS.is_set():
            LOGGER.warning("No Redis connection")
            return 0
        key = self.get_redis_votes_key(quote_id, author_id)
        saved = await self.redis.get(key)
        known_votes: dict[str, Literal[-1, 0, 1]] = {"-1": -1, "0": 0, "1": 1}
        return known_votes.get(saved)

    @parse_args(type_=VoteArgument)
    async def post(
        self,
        quote_id_str: str,
        author_id_str: str | None = None,
        *,
        args: VoteArgument,
    ) -> None:
        """
        Handle POST requests to this page and render the quote.

        This is used to vote the quote, without changing the URL.
        """
        quote_id = int(quote_id_str)
        if author_id_str is None:
            wqs = get_wrong_quotes(lambda wq: wq.id == quote_id)
            if not wqs:
                raise HTTPError(404, f"No wrong quote with id {quote_id}")
            self.redirect(self.fix_url(self.LONG_PATH % wqs[0].get_id()))
            return

        author_id = int(author_id_str)

        if not self.get_argument("vote", None):
            # nothing to vote
            await self.render_quote(quote_id, author_id)
            return

        old_vote: Literal[-1, 0, 1] = await self.get_old_vote(
            quote_id, author_id
        )
        new_vote: Literal[-1, 0, 1] = args.vote

        if new_vote == old_vote:
            # the vote didn't change
            await self.render_quote(quote_id, author_id)
            return

        await self.update_saved_votes(quote_id, author_id, new_vote)

        # the direction in which the rating needs to be changed
        to_vote: Literal[-1, 1] = 1 if new_vote > old_vote else -1

        contributed_by = f"an-website_{hash_ip(self.request.remote_ip, 10)}"

        # do the voting
        wrong_quote = await create_wq_and_vote(
            to_vote, quote_id, author_id, contributed_by
        )
        if new_vote - old_vote in {-2, 2}:
            # the vote got flipped, so it needs to be counted twice
            await wrong_quote.vote(to_vote, lazy=True)

        await self.render_wrong_quote(wrong_quote, new_vote)

    async def render_quote(self, quote_id: int, author_id: int) -> None:
        """Get and render a wrong quote based on quote id and author id."""
        wrong_quote = await get_wrong_quote(quote_id, author_id)
        old_vote = await self.get_old_vote(quote_id, author_id)
        await self.render_wrong_quote(wrong_quote, old_vote)

    async def render_wrong_quote(
        self, wrong_quote: WrongQuote | None, vote: int
    ) -> None:
        """
        Render the page with the wrong_quote and this vote.

        Send a 404 error if there is no wrong quote. Send JSON if the
        content type is one of the API content types.
        """
        if not wrong_quote:
            # TODO: maybe show 404 inside of the quotes ui
            self.set_status(404, reason="Zitat nicht gefunden")
            self.write_error(404)
            return

        if self.content_type in APIRequestHandler.POSSIBLE_CONTENT_TYPES:
            json_data = wrong_quote_to_json(
                wrong_quote,
                vote,
                self.next_id,
                await self.get_rating_str(wrong_quote),
                self.get_bool_argument("full", False),
            )
            await self.finish(json_data)
            return

        template_args = {
            "wrong_quote": wrong_quote,
            "next_href": self.get_next_url(),
            "next_id": f"{self.next_id[0]}-{self.next_id[1]}",
            "description": str(wrong_quote),
            "rating_filter": self.rating_filter,
            "rating": await self.get_rating_str(wrong_quote),
            "show_rating": self.get_show_rating(),
            "vote": vote,
        }
        await self.render("pages/quotes/quotes.html", **template_args)

    async def update_saved_votes(
        self, quote_id: int, author_id: int, vote: int
    ) -> None:
        """
        Save the new vote in Redis.

        Raise a 503 error if there is no Redis connection and a 500
        error if Redis reports that the vote couldn't get saved.
        """
        if not EVENT_REDIS.is_set():
            raise HTTPError(503)
        key = self.get_redis_votes_key(quote_id, author_id)
        time_to_live = 60 * 60 * 24 * 90  # in seconds (3 months)
        result = await self.redis.setex(key, time_to_live, str(vote))
        if not result:
            LOGGER.warning("Could not save vote in Redis: %s", result)
            raise HTTPError(500, "Could not save vote")
# pylint: disable-next=too-many-ancestors
class QuoteAPIHandler(APIRequestHandler, QuoteById):
    """The request handler of the API for the quotes page."""

    ALLOWED_METHODS: ClassVar[tuple[str, ...]] = ("GET", "POST")

    # the API doesn't serve HTML, only API content types and images
    POSSIBLE_CONTENT_TYPES: ClassVar[tuple[str, ...]] = (
        *APIRequestHandler.POSSIBLE_CONTENT_TYPES,
        *IMAGE_CONTENT_TYPES,
    )

    # the template of the path to redirect to, if the author id is missing
    LONG_PATH: ClassVar[str] = "/api/zitate/%d-%d"

    async def render_wrong_quote(
        self, wrong_quote: WrongQuote | None, vote: int
    ) -> None:
        """
        Return the relevant data for the quotes page.

        The data is sent as plain text if the content type is
        "text/plain" and as JSON otherwise. The JSON is the full one if
        the request path ends with "/full".
        """
        if not wrong_quote:
            # let the parent class handle the missing wrong quote
            await super().render_wrong_quote(wrong_quote, vote)
            return

        if self.content_type == "text/plain":
            await self.finish(str(wrong_quote))
            return

        json_data = wrong_quote_to_json(
            wrong_quote,
            vote,
            self.next_id,
            await self.get_rating_str(wrong_quote),
            self.request.path.endswith("/full"),
        )
        await self.finish(json_data)
class QuoteRedirectAPI(APIRequestHandler, QuoteBaseHandler):
    """Redirect to the API page of a random quote."""

    POSSIBLE_CONTENT_TYPES = QuoteAPIHandler.POSSIBLE_CONTENT_TYPES

    async def get(  # pylint: disable=unused-argument
        self, suffix: str = "", *, head: bool = False
    ) -> None:
        """
        Redirect to a random quote of the API.

        The rating filter is "w" (funny quotes) if "r" is missing or
        empty. The suffix gets appended to the path of the quote.
        """
        requested_filter = self.get_argument("r", "") or "w"
        next_filter = parse_rating_filter(requested_filter)
        quote_id, author_id = get_next_id(next_filter)

        query_args: dict[str, str] = {"r": next_filter}
        if self.get_show_rating():
            # keep showing the rating after the redirect
            query_args["show-rating"] = "sure"

        target = self.fix_url(
            f"/api/zitate/{quote_id}-{author_id}{suffix}",
            query_args=query_args,
        )
        self.redirect(target)