Coverage for an_website/utils/data_parsing.py: 65.116%

129 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 15:59 +0000

1# This program is free software: you can redistribute it and/or modify 

2# it under the terms of the GNU Affero General Public License as 

3# published by the Free Software Foundation, either version 3 of the 

4# License, or (at your option) any later version. 

5# 

6# This program is distributed in the hope that it will be useful, 

7# but WITHOUT ANY WARRANTY; without even the implied warranty of 

8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

9# GNU Affero General Public License for more details. 

10# 

11# You should have received a copy of the GNU Affero General Public License 

12# along with this program. If not, see <https://www.gnu.org/licenses/>. 

13 

14"""Parse data into classes.""" 

15 

16from __future__ import annotations 

17 

18import contextlib 

19import functools 

20import inspect 

21import typing 

22from collections.abc import Callable, Iterable, Mapping 

23from inspect import Parameter 

24from types import UnionType 

25from typing import Any, TypeVar, get_origin 

26 

27from tornado.web import HTTPError, RequestHandler 

28 

29from .utils import str_to_bool 

30 

31T = TypeVar("T") 

32 

33 

34def parse( 

35 type_: type[T], 

36 data: Iterable[Mapping[str, Any]] | Mapping[str, Any] | Any, 

37 *, 

38 strict: bool = False, 

39) -> T: 

40 """Parse a data.""" 

41 # pylint: disable=too-many-return-statements, too-complex 

42 if data is None and type_ is None: 

43 return None # type: ignore[unreachable] 

44 

45 simple_type: type = get_origin(type_) or type_ 

46 

47 if ( 

48 simple_type is type_ 

49 and not isinstance(type_, str) # type: ignore[redundant-expr] 

50 and isinstance(data, type_) 

51 ): 

52 return data 

53 

54 if simple_type == UnionType: 

55 possible = list(typing.get_args(type_)) 

56 for pos in possible: 

57 with contextlib.suppress(ValueError): 

58 return typing.cast(T, parse(pos, data, strict=strict)) 

59 raise ValueError(f"Unable to parse {data!r} into {type_}") 

60 

61 if simple_type in {list, "list"} and isinstance(data, list): 

62 return _parse_list(type_, data, strict=strict) 

63 if type_ in {bool, "bool"}: 

64 return typing.cast(T, _parse_bool(data, strict=strict)) 

65 if type_ in {str, "str"}: 

66 return typing.cast(T, _parse_str(data, strict=strict)) 

67 if type_ in {int, "int"}: 

68 return typing.cast(T, _parse_int(data, strict=strict)) 

69 if type_ in {float, "float"}: 

70 return typing.cast(T, _parse_float(data, strict=strict)) 

71 if hasattr(type_, "__init__") and isinstance(data, dict): 

72 return _parse_class(type_, data, strict=strict) 

73 

74 raise ValueError(f"Unable to parse {data!r} into {type_}") 

75 

76 

77def _parse_str(data: Any, *, strict: bool) -> str: 

78 """Parse data into str.""" 

79 if isinstance(data, str): 

80 return data 

81 if strict: 

82 raise ValueError(f"{data!r} is not str.") 

83 return str(data) 

84 

85 

86def _parse_bool(data: Any, *, strict: bool) -> bool: 

87 """Parse data into bool.""" 

88 if isinstance(data, bool): 

89 return data 

90 if strict: 

91 raise ValueError(f"{data!r} is not bool.") 

92 if data in {0, 1}: 

93 return bool(data) 

94 if isinstance(data, str): 

95 return str_to_bool(data) 

96 raise ValueError(f"Cannot parse {data!r} into bool.") 

97 

98 

99def _parse_int(data: Any, *, strict: bool) -> int: 

100 """Parse data into int.""" 

101 if isinstance(data, int): 

102 return data 

103 if isinstance(data, float) and int(data) == data: 

104 return int(data) 

105 if strict: 

106 raise ValueError(f"{data!r} is not a number.") 

107 if isinstance(data, str): 

108 return int(data, base=0) 

109 if isinstance(data, bool): 

110 return int(data) 

111 raise ValueError(f"Cannot parse {data!r} into int.") 

112 

113 

114def _parse_float(data: Any, *, strict: bool) -> float: 

115 """Parse data into float.""" 

116 if isinstance(data, float): 

117 return data 

118 if isinstance(data, int): 

119 return float(data) 

120 if strict: 

121 raise ValueError(f"{data!r} is not a number.") 

122 if isinstance(data, str): 

123 return int(data) 

124 if isinstance(data, bool): 

125 return int(data) 

126 raise ValueError(f"Cannot parse {data!r} into int.") 

127 

128 

129def _parse_list( 

130 type_: type[T], data: Iterable[Mapping[str, Any]], *, strict: bool 

131) -> T: 

132 """Parse a list of data.""" 

133 args = typing.get_args(type_) 

134 if len(args) != 1: 

135 raise ValueError(f"{type_=} should be list[...]") 

136 return typing.cast( 

137 T, [parse(args[0], spam, strict=strict) for spam in data] 

138 ) 

139 

140 

141def _parse_class(type_: type[T], data: Mapping[str, Any], *, strict: bool) -> T: 

142 """Parse data into a class.""" 

143 signature = inspect.signature(type_.__init__, eval_str=True) 

144 args: list[Any] = [] 

145 kwargs: dict[str, Any] = {} 

146 in_positional = False 

147 

148 def add(_name: str, _value: Any) -> None: 

149 if in_positional: 

150 args.append(_value) 

151 else: 

152 kwargs[_name] = _value 

153 

154 first = True 

155 for arg_name, param in signature.parameters.items(): 

156 if first: 

157 first = False 

158 continue 

159 if param.kind in {Parameter.VAR_KEYWORD, Parameter.KEYWORD_ONLY}: 

160 in_positional = True 

161 if arg_name not in data: 

162 if param.default == Parameter.empty: 

163 raise ValueError(f"Missing required argument {arg_name!r}") 

164 add(arg_name, param.default) 

165 continue 

166 value = data[arg_name] 

167 if param.annotation == Parameter.empty: 

168 if strict: 

169 raise ValueError(f"Missing type annotation for {arg_name!r}") 

170 add(arg_name, value) 

171 continue 

172 add(arg_name, parse(param.annotation, value, strict=strict)) 

173 

174 return type_(*args, **kwargs) 

175 

176 

def parse_args(
    *, type_: Any, name: str = "args", validation_method: str | None = None
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Create a decorator passing the parsed request arguments as a kwarg.

    The decorated request handler method receives the request arguments,
    parsed non-strictly into ``type_``, as the keyword argument ``name``.
    If ``validation_method`` is given, the method with that name is called
    on the parsed object before the handler method runs.  Arguments that
    do not have exactly one value, as well as parsing failures, result in
    an HTTP 400 error.
    """

    def _decorate(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def _wrapper(self: RequestHandler, *args: Any, **kwargs: Any) -> T:
            decoded: dict[str, str] = {}
            for key, values in self.request.arguments.items():
                if len(values) != 1:
                    raise HTTPError(  # we don't want to guess
                        400, reason=f"Given multiple values for {key!r}"
                    )
                decoded[key] = values[0].decode("UTF-8", "replace")

            try:
                parsed = parse(type_, decoded, strict=False)
            except ValueError as err:
                raise HTTPError(400, reason=err.args[0]) from err

            if validation_method:
                validate = getattr(parsed, validation_method)
                validate()

            return func(self, *args, **kwargs, **{name: parsed})

        return _wrapper

    return _decorate

202 

203 return _inner