Coverage for an_website/patches/json.py: 30.612%
49 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-16 19:56 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-16 19:56 +0000
1# This program is free software: you can redistribute it and/or modify
2# it under the terms of the GNU Affero General Public License as
3# published by the Free Software Foundation, either version 3 of the
4# License, or (at your option) any later version.
5#
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU Affero General Public License for more details.
10#
11# You should have received a copy of the GNU Affero General Public License
12# along with this program. If not, see <https://www.gnu.org/licenses/>.
14"""A very fast JSON implementation."""
16from __future__ import annotations
18import inspect
19from collections.abc import Callable
20from json.encoder import JSONEncoder
21from typing import IO, Any, Protocol, TypeVar
23import orjson
24import regex
_T_co = TypeVar("_T_co", covariant=True)


class SupportsRead(Protocol[_T_co]):
    """Structural type for any object that offers a ``read`` method.

    The type parameter is the type of the value returned by ``read``.
    """

    # pylint: disable=too-few-public-methods

    def read(self, __length: int = ...) -> _T_co:
        """Return data from the object, optionally given a length."""
def get_caller_name() -> None | str:
    """Return the module name of whoever called into the calling module.

    Starting at the frame that invoked this function, the stack is
    walked outwards until a frame is found whose global ``__name__``
    differs from that of the invoking frame.  That differing value is
    returned.  ``None`` is returned when there is no invoking frame,
    when its globals have no ``__name__``, or when the stack ends
    before a different module is found.
    """
    frame = inspect.currentframe()
    try:
        # Step from this function's own frame to the frame that called it.
        if frame is not None:
            frame = frame.f_back
        if frame is None or "__name__" not in frame.f_globals:
            return None
        own_module = frame.f_globals["__name__"]
        while frame is not None:
            module = frame.f_globals.get("__name__")
            if module != own_module:
                return module
            frame = frame.f_back
        return None
    finally:
        # Drop the frame reference so it cannot keep a reference cycle alive.
        del frame
def dumps(  # pylint: disable=too-many-arguments
    obj: Any,
    *,
    skipkeys: bool = False,
    ensure_ascii: bool = True,
    check_circular: bool = True,
    allow_nan: bool = True,
    cls: None | type[JSONEncoder] = None,
    indent: None | int | str = None,
    separators: None | tuple[str, str] = None,
    default: None | Callable[[Any], Any] = None,
    sort_keys: bool = False,
    **kwargs: Any,
) -> str:
    """Serialize *obj* to a JSON string using orjson.

    The signature mirrors :func:`json.dumps`, but only some arguments
    influence the output:

    * ``sort_keys`` and ``indent`` are honoured.
    * ``default`` is passed to orjson, unless ``cls`` is given; then an
      instance of ``cls`` is created with all encoder arguments and its
      ``default`` method is used instead.
    * ``skipkeys``, ``ensure_ascii``, ``check_circular``, ``allow_nan``,
      ``separators`` and ``**kwargs`` are only forwarded to ``cls`` and
      are otherwise ignored.

    Returns the serialized document decoded as UTF-8 text.
    """
    caller = get_caller_name()
    option = orjson.OPT_SERIALIZE_NUMPY
    # Calls coming from tornado.escape get the UTC datetime options;
    # every other caller gets dataclasses passed through to ``default``.
    if caller == "tornado.escape":
        option |= orjson.OPT_NAIVE_UTC | orjson.OPT_UTC_Z
    else:
        option |= orjson.OPT_PASSTHROUGH_DATACLASS
    if sort_keys:
        option |= orjson.OPT_SORT_KEYS
    if indent is not None:
        # orjson's only pretty-print mode; other widths are fixed up below.
        option |= orjson.OPT_INDENT_2
    if cls is not None:
        encoder = cls(
            skipkeys=skipkeys,
            ensure_ascii=ensure_ascii,
            check_circular=check_circular,
            allow_nan=allow_nan,
            indent=indent,
            separators=separators,
            default=default,
            sort_keys=sort_keys,
            **kwargs,
        )
        default = encoder.default
    output: bytes = orjson.dumps(obj, default, option)
    # OPT_INDENT_2 already yields two-space indentation, so only other
    # requested indents (including a single space) need rewriting.
    if indent not in {None, 2, "  "}:
        if isinstance(indent, int):
            indent = " " * indent
        indent_bytes = str(indent).encode("UTF-8")
        # Each two-character run of leading whitespace is one nesting
        # level; replace every level with the requested indent.
        output = regex.sub(
            rb"(?m)^\s+",
            lambda match: len(match[0]) // 2 * indent_bytes,
            output,
        )
    return output.decode("UTF-8")
def dump(obj: Any, fp: IO[str], **kwargs: Any) -> None:
    """Serialize *obj* via ``dumps`` and write the resulting text to *fp*.

    All keyword arguments are forwarded to ``dumps`` unchanged.
    """
    serialized = dumps(obj, **kwargs)
    fp.write(serialized)
def loads(s: str | bytes, **kwargs: Any) -> Any:
    """Parse the JSON document *s* with orjson and return the result.

    Extra keyword arguments are accepted but ignored.
    """
    # pylint: disable=unused-argument
    parsed = orjson.loads(s)
    return parsed
def load(fp: SupportsRead[str | bytes], **kwargs: Any) -> Any:
    """Read all of *fp* and parse what was read with ``loads``.

    Extra keyword arguments are accepted but ignored.
    """
    # pylint: disable=unused-argument
    content = fp.read()
    return loads(content)