Coverage for an_website/quotes/utils.py: 64.793%
338 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-14 14:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-14 14:44 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""A page with wrong quotes."""
16from __future__ import annotations
18import abc
19import asyncio
20import contextlib
21import logging
22import multiprocessing.synchronize
23import random
24import sys
25import time
26from collections.abc import (
27 Callable,
28 Iterable,
29 Mapping,
30 MutableMapping,
31 Sequence,
32)
33from dataclasses import dataclass
34from datetime import date
35from multiprocessing import Value
36from typing import Any, Final, Literal, cast
37from urllib.parse import urlencode
39import dill # type: ignore[import-untyped] # nosec: B403
40import elasticapm
41import orjson as json
42import typed_stream
43from redis.asyncio import Redis
44from tornado.httpclient import AsyncHTTPClient
45from tornado.web import Application, HTTPError
46from UltraDict import UltraDict # type: ignore[import-untyped]
48from .. import (
49 CA_BUNDLE_PATH,
50 DIR as ROOT_DIR,
51 EVENT_REDIS,
52 EVENT_SHUTDOWN,
53 NAME,
54 ORJSON_OPTIONS,
55 pytest_is_running,
56)
57from ..utils.request_handler import HTMLRequestHandler
58from ..utils.utils import ModuleInfo, Permission, ratelimit
# The "quotes" entry below the package's DIR (imported here as ROOT_DIR).
DIR: Final = ROOT_DIR / "quotes"

# Logger named after this module.
LOGGER: Final = logging.getLogger(__name__)

# Base URL of the quotes API; make_api_request appends the endpoint to it.
API_URL: Final[str] = "https://zitate.prapsschnalinen.de/api"
# pylint: disable-next=too-few-public-methods
class UltraDictType[K, V](MutableMapping[K, V], abc.ABC):
    """
    The type of the shared dictionaries.

    Describes a mutable mapping that additionally exposes a ``lock``
    attribute. In this file it is used to annotate the cache constants.
    """

    # used as a context manager (``with cache.lock:``) by the parse functions
    lock: multiprocessing.synchronize.RLock
# The caches below are UltraDict instances created with a buffer size of
# 1024**2 and dill as serializer.

# quote id -> Quote
QUOTES_CACHE: Final[UltraDictType[int, Quote]] = UltraDict(
    buffer_size=1024**2, serializer=dill
)
# author id -> Author
AUTHORS_CACHE: Final[UltraDictType[int, Author]] = UltraDict(
    buffer_size=1024**2, serializer=dill
)
# (quote id, author id) -> WrongQuote
WRONG_QUOTES_CACHE: Final[UltraDictType[tuple[int, int], WrongQuote]] = (
    UltraDict(buffer_size=1024**2, serializer=dill)
)

# Highest ids seen so far (typecode "Q"); raised by parse_quote and
# parse_author and used as upper bound when picking random ids.
MAX_QUOTES_ID = Value("Q", 0)
MAX_AUTHORS_ID = Value("Q", 0)
88@dataclass(init=False, slots=True)
89class QuotesObjBase(abc.ABC):
90 """An object with an id."""
92 id: int
94 @abc.abstractmethod
95 async def fetch_new_data(self) -> QuotesObjBase:
96 """Fetch new data from the API."""
97 raise NotImplementedError
99 # pylint: disable=unused-argument
100 def get_id_as_str(self, minify: bool = False) -> str:
101 """Get the id of the object as a string."""
102 return str(self.id)
104 @abc.abstractmethod
105 def get_path(self) -> str:
106 """Return the path to the Object."""
107 raise NotImplementedError
@dataclass(slots=True)
class Author(QuotesObjBase):
    """An author, consisting of an id, a name and optional info."""

    name: str
    # None or the tuple (url_to_info, info_str, creation_date)
    info: None | tuple[str, None | str, date]

    def __str__(self) -> str:
        """Use the author's name as string representation."""
        return self.name

    def get_path(self) -> str:
        """Return the path of the info page of this author."""
        return f"/zitate/info/a/{self.id}"

    async def fetch_new_data(self) -> Author:
        """Request this author from the API and parse the response."""
        api_data = await make_api_request(
            f"authors/{self.id}", entity_should_exist=True
        )
        return parse_author(api_data)

    def to_json(self) -> dict[str, Any]:
        """Build a JSON-compatible dict describing the author."""
        info = self.info
        info_json: None | dict[str, Any] = None
        if info:
            info_json = {
                "source": info[0],
                "text": info[1],
                "date": info[2].isoformat(),
            }
        return {
            "id": self.id,
            "name": str(self),
            "path": self.get_path(),
            "info": info_json,
        }
@dataclass(slots=True)
class Quote(QuotesObjBase):
    """A quote, consisting of an id, the text and the id of its author."""

    quote: str
    author_id: int

    def __str__(self) -> str:
        """Use the stripped quote text as string representation."""
        text = self.quote
        return text.strip()

    @property
    def author(self) -> Author:
        """Look up the author of this quote in the authors cache."""
        return AUTHORS_CACHE[self.author_id]

    def get_path(self) -> str:
        """Return the path of the info page of this quote."""
        return f"/zitate/info/z/{self.id}"

    async def fetch_new_data(self) -> Quote:
        """Request this quote from the API and update it with the response."""
        api_data = await make_api_request(
            f"quotes/{self.id}", entity_should_exist=True
        )
        return parse_quote(api_data, self)

    def to_json(self) -> dict[str, Any]:
        """Build a JSON-compatible dict describing the quote."""
        return dict(
            id=self.id,
            quote=str(self),
            author=self.author.to_json(),
            path=self.get_path(),
        )
@dataclass(slots=True)
class WrongQuote(QuotesObjBase):
    """A combination of a quote and an author, together with a rating."""

    quote_id: int
    author_id: int
    rating: int

    def __str__(self) -> str:
        r"""
        Format the wrong quote as text.

        The result looks like: '»quote« - author'.
        """
        return f"»{self.quote}« - {self.author}"

    @property
    def quote(self) -> Quote:
        """Look up the quote in the quotes cache."""
        return QUOTES_CACHE[self.quote_id]

    @property
    def author(self) -> Author:
        """Look up the author in the authors cache."""
        return AUTHORS_CACHE[self.author_id]

    def get_id(self) -> tuple[int, int]:
        """
        Return the ids that identify this wrong quote.

        :return tuple(quote_id, author_id)
        """
        return (self.quote_id, self.author_id)

    def get_id_as_str(self, minify: bool = False) -> str:
        """
        Return the id of the wrong quote as a string.

        The format is quote_id-author_id. If ``minify`` is true and the
        wrong quote has an id other than -1, that id is returned instead.
        """
        use_own_id = minify and self.id != -1
        if use_own_id:
            return str(self.id)
        return f"{self.quote_id}-{self.author_id}"

    def get_path(self) -> str:
        """Return the path of the page of this wrong quote."""
        return f"/zitate/{self.get_id_as_str()}"

    def to_json(self) -> dict[str, Any]:
        """Build a JSON-compatible dict describing the wrong quote."""
        return dict(
            id=self.get_id_as_str(),
            quote=self.quote.to_json(),
            author=self.author.to_json(),
            rating=self.rating,
            path=self.get_path(),
        )

    async def fetch_new_data(self) -> WrongQuote:
        """
        Request this wrong quote from the API and update it.

        With an id of -1 the wrong quote is looked up by quote and author
        id and the first result is used. If the API delivers nothing, the
        object is returned unchanged.
        """
        if self.id != -1:
            api_data = await make_api_request(
                f"wrongquotes/{self.id}", entity_should_exist=True
            )
        else:
            results = await make_api_request(
                "wrongquotes",
                {
                    "quote": str(self.quote_id),
                    "simulate": "true",
                    "author": str(self.author_id),
                },
                entity_should_exist=True,
            )
            api_data = results[0] if results else results
        if api_data:
            return parse_wrong_quote(api_data, self)
        return self

    async def vote(
        # pylint: disable=unused-argument
        self,
        vote: Literal[-1, 1],
        lazy: bool = False,
    ) -> WrongQuote:
        """
        Send a vote for this wrong quote to the API.

        ``lazy`` is accepted but currently ignored; the vote is always
        sent immediately. Raises ValueError if the id is -1.
        """
        if self.id == -1:
            raise ValueError("Can't vote for a not existing quote.")
        api_data = await make_api_request(
            f"wrongquotes/{self.id}",
            method="POST",
            body={"vote": str(vote)},
            entity_should_exist=True,
        )
        return parse_wrong_quote(api_data, self)
def get_wrong_quotes(
    filter_fun: None | Callable[[WrongQuote], bool] = None,
    *,
    sort: bool = False,  # sorted by rating
    filter_real_quotes: bool = True,
    shuffle: bool = False,
) -> Sequence[WrongQuote]:
    """
    Return the cached wrong quotes as a new list.

    ``filter_fun`` keeps only the wrong quotes it accepts,
    ``filter_real_quotes`` drops those whose author is the author of the
    quote. ``sort`` orders by rating (highest first), ``shuffle`` orders
    randomly; requesting both raises ValueError.
    """
    if sort and shuffle:
        raise ValueError("Sort and shuffle can't be both true.")

    def keep(wrong_quote: WrongQuote) -> bool:
        """Tell whether a wrong quote passes all requested filters."""
        if filter_fun and not filter_fun(wrong_quote):
            return False
        if (
            filter_real_quotes
            and wrong_quote.quote.author_id == wrong_quote.author_id
        ):
            return False
        return True

    result: list[WrongQuote] = [
        wrong_quote
        for wrong_quote in WRONG_QUOTES_CACHE.values()
        if keep(wrong_quote)
    ]
    if shuffle:
        random.shuffle(result)
    elif sort:
        result.sort(key=lambda wrong_quote: wrong_quote.rating, reverse=True)
    return result
def get_quotes(
    filter_fun: None | Callable[[Quote], bool] = None,
    shuffle: bool = False,
) -> list[Quote]:
    """
    Return the cached quotes as a new list.

    If ``filter_fun`` is given only the quotes it accepts are returned;
    ``shuffle`` puts the result in random order.
    """
    cached: list[Quote] = list(QUOTES_CACHE.values())
    result = (
        [quote for quote in cached if filter_fun(quote)]
        if filter_fun
        else cached
    )
    if shuffle:
        random.shuffle(result)
    return result
def get_authors(
    filter_fun: None | Callable[[Author], bool] = None,
    shuffle: bool = False,
) -> list[Author]:
    """
    Return the cached authors as a new list.

    If ``filter_fun`` is given only the authors it accepts are returned;
    ``shuffle`` puts the result in random order.
    """
    cached: list[Author] = list(AUTHORS_CACHE.values())
    result = (
        [author for author in cached if filter_fun(author)]
        if filter_fun
        else cached
    )
    if shuffle:
        random.shuffle(result)
    return result
# pylint: disable-next=too-many-arguments
async def make_api_request(
    endpoint: str,
    args: Mapping[str, str] | None = None,
    *,
    entity_should_exist: bool,
    method: Literal["GET", "POST"] = "GET",
    body: None | Mapping[str, str | int] = None,
    request_timeout: float | None = None,
) -> Any:  # TODO: list[dict[str, Any]] | dict[str, Any]:
    """
    Make a request to the quotes API and return the decoded JSON.

    :param endpoint: the path below API_URL
    :param args: query arguments that get appended URL-encoded
    :param entity_should_exist: if false, a 500 response from the API is
        treated like a 400
    :param method: the HTTP method to use
    :param body: form fields to send URL-encoded as the request body
    :param request_timeout: passed through to the HTTP client
    :return: the parsed response body, or None if pytest is running
    :raises HTTPError: if the API doesn't respond with 200; the status is
        400 or 404 if that is the (normalized) response code, otherwise 503
    """
    if pytest_is_running():
        return None
    query = f"?{urlencode(args)}" if args else ""
    url = f"{API_URL}/{endpoint}{query}"
    # only "no body" stays None; an empty mapping is encoded to "" instead
    # of being handed to the HTTP client as a mapping
    body_str = None if body is None else urlencode(body)
    response = await AsyncHTTPClient().fetch(
        url,
        method=method,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        body=body_str,
        raise_error=False,  # non-200 responses are handled below
        ca_certs=CA_BUNDLE_PATH,
        request_timeout=request_timeout,
    )
    if response.code != 200:
        normed_response_code = (
            400
            if not entity_should_exist and response.code == 500
            else response.code
        )
        LOGGER.log(
            logging.ERROR if normed_response_code >= 500 else logging.WARNING,
            "%s request to %r with body=%r failed with code=%d and reason=%r",
            method,
            url,
            body_str,
            response.code,
            response.reason,
        )
        raise HTTPError(
            normed_response_code if normed_response_code in {400, 404} else 503,
            reason=(
                f"{API_URL}/{endpoint} returned: "
                f"{response.code} {response.reason}"
            ),
        )
    return json.loads(response.body)
def fix_author_name(name: str) -> str:
    """
    Fix common mistakes in authors.

    Parentheses that wrap the whole name are removed, then surrounding
    whitespace is stripped. The parentheses are only removed if the
    opening one at the start is closed by the one at the very end, so a
    name like "(A) und (B)" is left intact.
    """
    if len(name) > 2 and name.startswith("(") and name.endswith(")"):
        # find the position where the first "(" gets closed
        depth = 0
        closing_index = -1
        for index, char in enumerate(name):
            if char == "(":
                depth += 1
            elif char == ")":
                depth -= 1
                if depth == 0:
                    closing_index = index
                    break
        if closing_index == len(name) - 1:
            # remove () from author name, that shouldn't be there
            name = name[1:-1]
    return name.strip()
def parse_author(json_data: Mapping[str, Any]) -> Author:
    """
    Create or update the cached author described by the JSON data.

    Reads the keys "id" and "author" and returns the author object that
    was written to the cache.
    """
    author_id = int(json_data["id"])
    fixed_name = fix_author_name(json_data["author"])

    with AUTHORS_CACHE.lock:
        cached = AUTHORS_CACHE.get(author_id)
        if cached is not None:
            if cached.name != fixed_name:
                cached.name = fixed_name
                cached.info = None  # reset info
            author = cached
        else:
            # pylint: disable-next=too-many-function-args
            author = Author(author_id, fixed_name, None)
            MAX_AUTHORS_ID.value = max(MAX_AUTHORS_ID.value, author_id)

        # written back in every case, even if nothing changed
        AUTHORS_CACHE[author.id] = author

    return author
def fix_quote_str(quote_str: str) -> str:
    """
    Fix common mistakes in quotes.

    If the text is longer than two characters and both starts and ends
    with a quotation mark, the first and last character are removed.
    Surrounding whitespace is stripped in any case.
    """
    opening_marks = ('"', "„", "“")
    closing_marks = ('"', "“", "”")
    is_wrapped = (
        len(quote_str) > 2
        and quote_str.startswith(opening_marks)
        and quote_str.endswith(closing_marks)
    )
    # remove quotation marks from quote, that shouldn't be there
    unwrapped = quote_str[1:-1] if is_wrapped else quote_str
    return unwrapped.strip()
def parse_quote(
    json_data: Mapping[str, Any], quote: None | Quote = None
) -> Quote:
    """
    Create or update a quote from the JSON data and cache it.

    Reads the keys "id", "author" and "quote". If ``quote`` is given that
    object is updated, otherwise the cached quote with the same id (if
    any) is updated or a new quote is created.
    """
    quote_id = int(json_data["id"])
    author = parse_author(json_data["author"])  # update author
    text = fix_quote_str(json_data["quote"])

    with QUOTES_CACHE.lock:
        # without a supplied quote fall back to the cached one
        target = quote if quote is not None else QUOTES_CACHE.get(quote_id)
        if target is not None:  # quote was already saved
            target.quote = text
            target.author_id = author.id
        else:  # new quote
            # pylint: disable-next=too-many-function-args
            target = Quote(quote_id, text, author.id)
            MAX_QUOTES_ID.value = max(MAX_QUOTES_ID.value, target.id)

        QUOTES_CACHE[target.id] = target

    return target
def parse_wrong_quote(
    json_data: Mapping[str, Any], wrong_quote: None | WrongQuote = None
) -> WrongQuote:
    """
    Create or update a wrong quote from the JSON data and cache it.

    Reads the keys "quote", "author", "rating" and (optionally) "id"; a
    missing or falsy id becomes -1. If ``wrong_quote`` is given it has to
    belong to the same quote and author as the JSON data, otherwise an
    HTTPError is raised.
    """
    quote = parse_quote(json_data["quote"])
    author = parse_author(json_data["author"])

    cache_key = (quote.id, author.id)
    rating = json_data["rating"]
    api_id = int(json_data.get("id") or -1)

    if wrong_quote is None:
        with WRONG_QUOTES_CACHE.lock:
            wrong_quote = WRONG_QUOTES_CACHE.get(cache_key)
            if wrong_quote is None:
                # not cached yet: create it, store it and be done
                created = (
                    WrongQuote(  # pylint: disable=unexpected-keyword-arg
                        id=api_id,
                        quote_id=quote.id,
                        author_id=author.id,
                        rating=rating,
                    )
                )
                WRONG_QUOTES_CACHE[cache_key] = created
                return created

    # make sure the wrong quote is the correct one
    if wrong_quote.get_id() != cache_key:
        raise HTTPError(reason="ERROR: -41")

    # update the data of the wrong quote
    if wrong_quote.rating != rating:
        wrong_quote.rating = rating
    if wrong_quote.id != api_id:
        wrong_quote.id = api_id

    WRONG_QUOTES_CACHE[cache_key] = wrong_quote

    return wrong_quote
async def parse_list_of_quote_data(
    json_list: str | Iterable[Mapping[str, Any]],
    parse_fun: Callable[[Mapping[str, Any]], QuotesObjBase],
) -> tuple[QuotesObjBase, ...]:
    """
    Parse every element of the given data with ``parse_fun``.

    ``json_list`` is either the elements themselves or a JSON string
    containing them; falsy input results in an empty tuple. After each
    element control is yielded to the event loop once.
    """
    if not json_list:
        return ()
    elements = (
        cast(list[dict[str, Any]], json.loads(json_list))
        if isinstance(json_list, str)
        else json_list
    )
    parsed = []
    for element in elements:
        parsed_obj = parse_fun(element)
        await asyncio.sleep(0)
        parsed.append(parsed_obj)
    return tuple(parsed)
async def update_cache_periodically(
    app: Application, worker: int | None
) -> None:
    """
    Start updating the cache every hour.

    First the cache is filled from the data stored in Redis (if Redis
    becomes available within 5 seconds), then the cache is updated from
    the API in a loop until EVENT_SHUTDOWN is set. If ``worker`` is truthy
    the function returns before doing any of this.
    """
    # pylint: disable=too-complex, too-many-branches
    # enable the sharing setting if a module with the path "/troet" exists
    if "/troet" in typed_stream.Stream(
        cast(Iterable[ModuleInfo], app.settings.get("MODULE_INFOS", ()))
    ).map(lambda m: m.path):
        app.settings["SHOW_SHARING_ON_MASTODON"] = True
    if worker:
        return
    # wait up to 5 seconds for EVENT_REDIS; a timeout is not an error
    with contextlib.suppress(asyncio.TimeoutError):
        await asyncio.wait_for(EVENT_REDIS.wait(), 5)
    redis: Redis[str] = cast("Redis[str]", app.settings.get("REDIS"))
    prefix: str = app.settings.get("REDIS_PREFIX", NAME).removesuffix("-dev")
    apm: None | elasticapm.Client
    if EVENT_REDIS.is_set():  # pylint: disable=too-many-nested-blocks
        # fill the caches with the data that update_cache stored in Redis
        await parse_list_of_quote_data(
            await redis.get(f"{prefix}:cached-quote-data:authors"),  # type: ignore[arg-type]  # noqa: B950
            parse_author,
        )
        await parse_list_of_quote_data(
            await redis.get(f"{prefix}:cached-quote-data:quotes"),  # type: ignore[arg-type]  # noqa: B950
            parse_quote,
        )
        await parse_list_of_quote_data(
            await redis.get(f"{prefix}:cached-quote-data:wrongquotes"),  # type: ignore[arg-type]  # noqa: B950
            parse_wrong_quote,
        )
        if QUOTES_CACHE and AUTHORS_CACHE and WRONG_QUOTES_CACHE:
            last_update = await redis.get(
                f"{prefix}:cached-quote-data:last-update"
            )
            if last_update:
                last_update_int = int(last_update)
                since_last_update = int(time.time()) - last_update_int
                if 0 <= since_last_update < 60 * 60:
                    # wait until the last update is at least one hour old
                    update_cache_in = 60 * 60 - since_last_update
                    if not sys.flags.dev_mode and update_cache_in > 60:
                        # if in production mode update wrong quotes just to be sure
                        try:
                            await update_cache(
                                app, update_quotes=False, update_authors=False
                            )
                        except Exception:  # pylint: disable=broad-except
                            # log and report the failure, then carry on
                            LOGGER.exception("Updating quotes cache failed")
                            apm = app.settings.get("ELASTIC_APM", {}).get(
                                "CLIENT"
                            )
                            if apm:
                                apm.capture_exception()
                        else:
                            LOGGER.info("Updated quotes cache successfully")
                    LOGGER.info(
                        "Next update of quotes cache in %d seconds",
                        update_cache_in,
                    )
                    await asyncio.sleep(update_cache_in)

    # update the cache every hour
    failed = 0  # number of consecutive failed updates
    while not EVENT_SHUTDOWN.is_set():  # pylint: disable=while-used
        try:
            await update_cache(app)
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Updating quotes cache failed")
            if apm := app.settings.get("ELASTIC_APM", {}).get("CLIENT"):
                apm.capture_exception()
            failed += 1
            # retry sooner than in an hour; the delay grows with every
            # consecutive failure and is capped at one hour
            await asyncio.sleep(pow(min(failed * 2, 60), 2))  # 4,16,...,60*60
        else:
            LOGGER.info("Updated quotes cache successfully")
            failed = 0
            await asyncio.sleep(60 * 60)
async def update_cache(
    app: Application,
    update_wrong_quotes: bool = True,
    update_quotes: bool = True,
    update_authors: bool = True,
) -> None:
    """
    Fill the cache with data from the API.

    The three flags select which of wrong quotes, quotes and authors get
    requested. If EVENT_REDIS is set, non-empty API responses are also
    stored in Redis for 30 days; the time of the update is only stored
    when everything was updated.
    """
    LOGGER.info("Updating quotes cache")
    redis: Redis[str] = cast("Redis[str]", app.settings.get("REDIS"))
    prefix: str = app.settings.get("REDIS_PREFIX", NAME).removesuffix("-dev")
    redis_available = EVENT_REDIS.is_set()
    thirty_days = 60 * 60 * 24 * 30

    async def refresh(
        endpoint: str,
        parse_fun: Callable[[Mapping[str, Any]], QuotesObjBase],
        request_timeout: float | None = None,
    ) -> None:
        """Request an endpoint, parse the result and store it in Redis."""
        api_data = await make_api_request(
            endpoint,
            entity_should_exist=True,
            request_timeout=request_timeout,
        )
        await parse_list_of_quote_data(api_data, parse_fun)
        if api_data and redis_available:
            # the endpoint name is also the last part of the Redis key
            await redis.setex(
                f"{prefix}:cached-quote-data:{endpoint}",
                thirty_days,
                json.dumps(api_data, option=ORJSON_OPTIONS),
            )

    if update_wrong_quotes:
        await refresh("wrongquotes", parse_wrong_quote, 100)

    if update_quotes:
        await refresh("quotes", parse_quote)

    if update_authors:
        await refresh("authors", parse_author)

    updated_everything = (
        update_wrong_quotes and update_quotes and update_authors
    )
    if redis_available and updated_everything:
        await redis.setex(
            f"{prefix}:cached-quote-data:last-update",
            thirty_days,
            int(time.time()),
        )
async def get_author_by_id(author_id: int) -> Author:
    """Return the author with the id from the cache or else from the API."""
    if (cached := AUTHORS_CACHE.get(author_id)) is not None:
        return cached
    api_data = await make_api_request(
        f"authors/{author_id}", entity_should_exist=False
    )
    return parse_author(api_data)
async def get_quote_by_id(quote_id: int) -> Quote:
    """Return the quote with the id from the cache or else from the API."""
    if (cached := QUOTES_CACHE.get(quote_id)) is not None:
        return cached
    api_data = await make_api_request(
        f"quotes/{quote_id}", entity_should_exist=False
    )
    return parse_quote(api_data)
async def get_wrong_quote(
    quote_id: int, author_id: int, use_cache: bool = True
) -> WrongQuote:
    """
    Return the wrong quote made of the given quote and author.

    With ``use_cache`` a cached wrong quote is returned as it is, without
    it the cached object gets updated from the API first. If the wrong
    quote isn't cached it is requested from the API; an HTTPError with
    status 404 is raised if the API has no result.
    """
    cached = WRONG_QUOTES_CACHE.get((quote_id, author_id))
    if cached:
        # without use_cache the data of the wrong quote gets updated
        return cached if use_cache else await cached.fetch_new_data()

    # wrong quote not in cache
    quote_and_author_known = (
        use_cache and quote_id in QUOTES_CACHE and author_id in AUTHORS_CACHE
    )
    if quote_and_author_known:
        # we don't need to request anything, as the wrong_quote probably has
        # no ratings just use the cached quote and author
        # pylint: disable-next=too-many-function-args
        return WrongQuote(-1, quote_id, author_id, 0)

    # request the wrong quote from the API
    query_args = {
        "quote": str(quote_id),
        "simulate": "true",
        "author": str(author_id),
    }
    results = await make_api_request(
        "wrongquotes", query_args, entity_should_exist=False
    )
    if not results:
        raise HTTPError(404, reason="Falsches Zitat nicht gefunden")
    return parse_wrong_quote(results[0])
async def get_rating_by_id(quote_id: int, author_id: int) -> int:
    """Return the rating of the wrong quote with the given ids."""
    wrong_quote = await get_wrong_quote(quote_id, author_id)
    return wrong_quote.rating
def get_random_quote_id() -> int:
    """Return a random integer from 1 to the highest known quote id."""
    highest_id = MAX_QUOTES_ID.value
    return random.randint(1, highest_id)  # nosec: B311
def get_random_author_id() -> int:
    """Return a random integer from 1 to the highest known author id."""
    highest_id = MAX_AUTHORS_ID.value
    return random.randint(1, highest_id)  # nosec: B311
def get_random_id() -> tuple[int, int]:
    """Return a random quote id and a random author id as a tuple."""
    quote_id = get_random_quote_id()
    author_id = get_random_author_id()
    return quote_id, author_id
async def create_wq_and_vote(
    vote: Literal[-1, 1],
    quote_id: int,
    author_id: int,
    contributed_by: str,
    fast: bool = False,
) -> WrongQuote:
    """
    Vote for the wrong quote made of the given quote and author.

    A cached wrong quote with an id other than -1 is voted for directly
    (``fast`` is passed on to its vote method). Otherwise the wrong quote
    is created with the API first and then voted for.
    """
    known = WRONG_QUOTES_CACHE.get((quote_id, author_id))
    if known and known.id != -1:
        return await known.vote(vote, fast)

    # we don't know the wrong_quote_id, so we have to create the wrong_quote
    creation_body = {
        "quote": str(quote_id),
        "author": str(author_id),
        "contributed_by": contributed_by,
    }
    api_data = await make_api_request(
        "wrongquotes",
        method="POST",
        body=creation_body,
        entity_should_exist=False,
    )
    created = parse_wrong_quote(api_data)
    return await created.vote(vote, lazy=True)
class QuoteReadyCheckHandler(HTMLRequestHandler):
    """Request handler that only answers once the quotes are loaded."""

    async def check_ready(self) -> None:
        """Raise a 503 HTTPError while the wrong quotes cache is empty."""
        if WRONG_QUOTES_CACHE:
            return
        # should work in a few seconds, the quotes just haven't loaded yet
        self.set_header("Retry-After", "5")
        raise HTTPError(503, reason="Service available in a few seconds")

    async def prepare(self) -> None:  # noqa: D102
        await super().prepare()
        if self.request.method != "OPTIONS":
            await self.check_ready()

        # requests for xlsx output get rate limited if rate limits are
        # enabled and the requester is neither authorized nor a crawler
        if (  # pylint: disable=too-many-boolean-expressions
            self.settings.get("RATELIMITS")
            and self.request.method not in {"HEAD", "OPTIONS"}
            and not self.is_authorized(Permission.RATELIMITS)
            and not self.crawler
            and (
                self.request.path.endswith(".xlsx")
                or self.content_type == "application/vnd.ms-excel"
            )
        ):
            if self.settings.get("UNDER_ATTACK") or not EVENT_REDIS.is_set():
                raise HTTPError(503)

            token_count = 0 if self.request.method == "HEAD" else 1
            ratelimited, headers = await ratelimit(
                self.redis,
                self.redis_prefix,
                str(self.request.remote_ip),
                bucket="quotes:image:xlsx",
                max_burst=4,
                count_per_period=1,
                period=60,
                tokens=token_count,
            )

            for header_name, header_value in headers.items():
                self.set_header(header_name, header_value)

            if ratelimited:
                # on April 20th status 420 is used instead of 429
                is_april_20th = self.now.date() == date(self.now.year, 4, 20)
                status = 420 if is_april_20th else 429
                self.set_status(status)
                self.write_error(status)