Coverage for an_website / utils / search.py: 66.667%
60 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 18:33 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 18:33 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""Module used for easy and simple searching in O(n) complexity."""
16import dataclasses
17from collections.abc import Callable, Iterable, Iterator, Sequence
18from typing import Generic, NoReturn, TypeVar
20import regex as re
21from typed_stream import Stream
23T = TypeVar("T")
24U = TypeVar("U")
25V = TypeVar("V")
class Query:
    """Immutable, hashable representation of a search query.

    The query string is lower-cased once on construction and split into
    words on runs of non-word characters.  Everything is stored in the
    single slot ``_data`` as ``(query, words, words_len)``; attribute
    assignment is blocked by ``__setattr__``.

    Two queries are equal when their lower-cased query strings are equal,
    which keeps ``__eq__`` consistent with ``__hash__``.
    """

    _data: tuple[str, Sequence[str], int]
    __slots__ = ("_data",)

    def __bool__(self) -> bool:
        """Return True if the query contains at least one word."""
        return bool(self.words)

    def __eq__(self, other: object) -> bool:
        """Compare by lower-cased query string, matching ``__hash__``."""
        if not isinstance(other, Query):
            return NotImplemented
        return self.query == other.query

    def __hash__(self) -> int:
        """Hash the lower-cased query string."""
        return hash(self.query)

    def __init__(self, query: str) -> None:
        """Lower-case *query* and pre-compute its words.

        Raises:
            ValueError: if this instance has already been initialized.
        """
        if hasattr(self, "_data"):
            raise ValueError("Already initialized.")
        query = query.lower()
        # filter(None, ...) drops the empty strings re.split produces at
        # the edges when the query starts or ends with non-word characters.
        # pylint: disable=bad-builtin
        words = tuple(filter(None, re.split(r"\W+", query)))
        words_len = sum(len(word) for word in words)
        # __setattr__ is overridden to raise, so go through object directly.
        object.__setattr__(self, "_data", (query, words, words_len))

    # The annotation is quoted because ``Query`` is not bound yet while
    # the class body is being executed.
    def __reduce__(self) -> "tuple[type[Query], tuple[str]]":
        """Rebuild this object by calling ``Query`` with the stored query."""
        return Query, (self.query,)

    def __repr__(self) -> str:
        """Return a string representation of self."""
        return f"Query({self.query!r})"

    def __setattr__(self, key: object, value: object) -> NoReturn:
        """Raise an AttributeError; queries cannot be modified."""
        raise AttributeError("Cannot modify Query.")

    @property
    def query(self) -> str:
        """The lower-cased query string."""
        return self._data[0]

    def score(self, field_values: tuple[str, ...]) -> float:
        """Score how well *field_values* match this query.

        *field_values* needs to be a tuple of lower-cased strings.

        Returns:
            ``0.0`` if *field_values* is empty (nothing can match, and the
            average below would otherwise divide by zero);
            ``1.0`` if the whole query string is contained in any field;
            otherwise the per-field average of the length-weighted share
            of query words that occur in the fields.
        """
        if not field_values:
            return 0.0
        if any(self.query in val for val in field_values):
            return 1.0
        return sum(
            (
                # number of fields containing the word, weighted by the
                # word's share of the total length of all query words
                sum(word in value for value in field_values)
                * (len(word) / self.words_len)
            )
            for word in self.words
        ) / len(field_values)

    @property
    def words(self) -> Sequence[str]:
        """The words in the query."""
        return self._data[1]

    @property
    def words_len(self) -> int:
        """Return sum(len(word) for word in self.words)."""
        return self._data[2]
92@dataclasses.dataclass(frozen=True, slots=True, order=True)
93class ScoredValue(Generic[T]):
94 """Value with score."""
96 score: float
97 value: T
class DataProvider(Generic[T, U]):
    """Searchable source of values.

    Holds the data (either an iterable or a zero-argument callable that
    returns one), a ``key`` function yielding the string(s) a value is
    matched against, and a ``convert`` function applied to every value
    that is reported as a hit.
    """

    __slots__ = ("_data", "_key", "_convert")

    _data: Iterable[T] | Callable[[], Iterable[T]]
    _key: Callable[[T], str | tuple[str, ...]]
    _convert: Callable[[T], U]

    def __init__(
        self,
        data: Iterable[T] | Callable[[], Iterable[T]],
        key: Callable[[T], str | tuple[str, ...]],
        convert: Callable[[T], U],
    ) -> None:
        """Store the data source, the key function and the converter."""
        self._data = data
        self._key = key
        self._convert = convert

    def _value_to_fields(self, value: T) -> tuple[str, ...]:
        """Return the lower-cased search fields of *value* as a tuple."""
        fields = self._key(value)
        if isinstance(fields, str):
            # A single string becomes a one-element tuple.
            return (fields.lower(),)
        return tuple(str.lower(field) for field in fields)

    @property
    def data(self) -> Iterable[T]:
        """The data to search; a callable source is called on each access."""
        source = self._data
        if isinstance(source, Iterable):
            return source
        return source()

    def search(
        self, query: Query, excl_min_score: float = 0.0
    ) -> Iterator[ScoredValue[U]]:
        """Yield a scored, converted value for every sufficiently good hit.

        Only values whose score is strictly greater than *excl_min_score*
        are yielded; ``convert`` is not called for the others.
        """
        for item in self.data:
            item_score = query.score(self._value_to_fields(item))
            if not item_score > excl_min_score:
                continue
            yield ScoredValue(item_score, self._convert(item))
def search(
    query: Query,
    *providers: DataProvider[object, T],
    excl_min_score: float = 0.0,
) -> list[ScoredValue[T]]:
    """Search all *providers* and return the hits sorted by score.

    Every provider is searched with the same *query* and *excl_min_score*.
    The combined hits are sorted ascending by score; because the sort is
    stable, hits with equal scores keep the order in which they were
    produced.
    """

    def hits_of(provider: DataProvider[object, T]) -> Iterator[ScoredValue[T]]:
        """Return the hits of a single provider."""
        return provider.search(query, excl_min_score)

    def by_score(hit: ScoredValue[T]) -> float:
        """Sort key: the score of a hit."""
        return hit.score

    hits = list(Stream(providers).flat_map(hits_of))
    hits.sort(key=by_score)
    return hits