Coverage for an_website/reporting/reporting.py: 31.111%

90 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-16 19:56 +0000

1# This program is free software: you can redistribute it and/or modify 

2# it under the terms of the GNU Affero General Public License as 

3# published by the Free Software Foundation, either version 3 of the 

4# License, or (at your option) any later version. 

5# 

6# This program is distributed in the hope that it will be useful, 

7# but WITHOUT ANY WARRANTY; without even the implied warranty of 

8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

9# GNU Affero General Public License for more details. 

10# 

11# You should have received a copy of the GNU Affero General Public License 

12# along with this program. If not, see <https://www.gnu.org/licenses/>. 

13 

14"""The Reporting API™️ of the website.""" 

15 

16from __future__ import annotations 

17 

18import logging 

19from datetime import timedelta 

20from typing import Any, ClassVar, Final, cast 

21 

22import orjson as json 

23from elasticsearch import AsyncElasticsearch 

24from elasticsearch.exceptions import NotFoundError 

25from elasticsearch.helpers import async_bulk 

26from tornado.web import HTTPError 

27 

28from .. import EVENT_ELASTICSEARCH, ORJSON_OPTIONS 

29from ..utils.request_handler import APIRequestHandler 

30from ..utils.utils import ModuleInfo, Permission 

31 

32LOGGER: Final = logging.getLogger(__name__) 

33 

34 

def get_module_info() -> ModuleInfo:
    """Build the ModuleInfo that registers the Reporting API™️ handler.

    The module is marked as hidden and exposes a single route,
    ``/api/reports``, which is served by ``ReportingAPI``.
    """
    api_path = "/api/reports"
    # German user-facing text; kept verbatim.
    summary = (
        "Die Reporting API™️ kann zur Überwachung von "
        "Sicherheits-Verstößen, veralteten API-Aufrufen und mehr "
        "von Seiten des Asozialen Netzwerks genutzt werden.\n"
        "Bei Interesse kontakten Sie bitte das Gürteltier."
    )
    routes = ((api_path, ReportingAPI),)
    return ModuleInfo(
        handlers=routes,
        name="Reporting API™️",
        description=summary,
        path=api_path,
        hidden=True,
    )

49 

50 

async def get_reports(  # pylint: disable=too-many-arguments
    elasticsearch: AsyncElasticsearch,
    prefix: str,
    domain: None | str = None,
    type_: None | str = None,
    from_: int = 0,
    size: int = 10,
) -> list[dict[str, Any]]:
    """Fetch stored reports from the ``{prefix}-reports`` index.

    Only reports whose ``@timestamp`` is at most one month old are
    returned, newest first. Abandoned network errors and CSP violations
    whose source file is ``moz-extension`` are always excluded.

    ``domain`` and ``type_`` are optional simple-query-string filters on
    the ``url.domain`` and ``type`` fields; falsy values disable them.
    ``from_`` and ``size`` are passed through to the search for paging.

    Returns the ``_source`` document of every hit.
    """
    query_flags = "AND|ESCAPE|NOT|OR|PHRASE|PRECEDENCE|WHITESPACE"

    # Everything in here has to match.
    required: list[dict[str, Any]] = [
        {"range": {"@timestamp": {"gte": "now-1M"}}}
    ]
    for text, field in ((domain, "url.domain"), (type_, "type")):
        if not text:
            continue
        required.append(
            {
                "simple_query_string": {
                    "query": text,
                    "fields": [field],
                    "flags": query_flags,
                }
            }
        )

    # A report is dropped if it matches all terms of one of these pairs.
    excluded_pairs = (
        (("type", "network-error"), ("body.type", "abandoned")),
        (("type", "csp-violation"), ("body.source-file", "moz-extension")),
    )
    excluded: list[dict[str, Any]] = [
        {
            "bool": {
                "filter": [
                    {"term": {field: {"value": value}}}
                    for field, value in pair
                ]
            }
        }
        for pair in excluded_pairs
    ]

    response = await elasticsearch.search(
        index=f"{prefix}-reports",
        sort=[{"@timestamp": {"order": "desc"}}],
        query={"bool": {"filter": required, "must_not": excluded}},
        from_=from_,
        size=size,
    )
    return [hit["_source"] for hit in response["hits"]["hits"]]

108 

109 

class ReportingAPI(APIRequestHandler):
    """The request handler for the Reporting API™️.

    GET returns stored reports, either in the handler's regular
    serialization or as newline-delimited JSON.
    POST accepts new reports, either as ``application/reports+json``
    or as a legacy ``application/csp-report`` document.
    """

    # The content types of the base handler plus newline-delimited JSON.
    POSSIBLE_CONTENT_TYPES: ClassVar[tuple[str, ...]] = (
        APIRequestHandler.POSSIBLE_CONTENT_TYPES + ("application/x-ndjson",)
    )

    RATELIMIT_GET_LIMIT: ClassVar[int] = 20
    RATELIMIT_GET_COUNT_PER_PERIOD: ClassVar[int] = 2

    RATELIMIT_POST_LIMIT: ClassVar[int] = 20
    RATELIMIT_POST_COUNT_PER_PERIOD: ClassVar[int] = 2

    MAX_BODY_SIZE: ClassVar[int] = 100_000_000

    # POST requests carrying more reports than this are rejected with 400.
    MAX_REPORTS_PER_REQUEST: ClassVar[int] = 1000

    async def get(self, *, head: bool = False) -> None:
        """Handle GET requests to the Reporting API™️.

        Query arguments: ``domain`` and ``type`` filter the reports,
        ``from`` and ``size`` page through them. Requests without the
        ``REPORTING`` permission always start at offset 0 and get at
        most 1000 reports.

        Raises:
            HTTPError: 503 if Elasticsearch is not available,
                404 if the reports data stream doesn't exist.
        """
        if not EVENT_ELASTICSEARCH.is_set():
            raise HTTPError(503)

        if head:
            return

        domain = self.get_argument("domain", None)
        type_ = self.get_argument("type", None)
        from_ = self.get_int_argument("from", 0, min_=0)
        size = self.get_int_argument("size", 10, min_=0)

        if not self.is_authorized(Permission.REPORTING):
            from_ = 0
            size = min(1000, size)

        try:
            reports = await get_reports(
                self.elasticsearch,
                self.elasticsearch_prefix,
                domain,
                type_,
                from_,
                size,
            )
        except NotFoundError:  # data stream doesn't exist
            raise HTTPError(404) from None

        if self.content_type == "application/x-ndjson":
            await self.finish(
                b"\n".join(
                    json.dumps(report, option=ORJSON_OPTIONS)
                    for report in reports
                )
            )
        else:
            await self.finish(self.dump(reports))

    async def post(self) -> None:
        """Handle POST requests to the Reporting API™️.

        The response (202) is sent as soon as the request body passed
        the basic checks; the individual reports are validated and
        stored afterwards. Reports that are not valid are dropped
        silently.

        Raises:
            HTTPError: 503 if the builtin reporting is disabled or
                Elasticsearch is not available, 415 for unsupported
                content types, 400 if the body isn't valid JSON, has the
                wrong shape or contains too many reports.
        """
        # pylint: disable=too-complex, too-many-branches
        if not (
            self.settings.get("REPORTING_BUILTIN")
            and EVENT_ELASTICSEARCH.is_set()
        ):
            raise HTTPError(503)

        content_type = self.request.headers.get("Content-Type", "")
        is_csp_report = content_type.startswith("application/csp-report")
        if not (
            is_csp_report
            or content_type.startswith("application/reports+json")
        ):
            raise HTTPError(415)

        try:
            data = json.loads(self.request.body)
        except json.JSONDecodeError:
            # the body is untrusted; broken JSON is a client error
            raise HTTPError(400) from None

        reports: Any
        if is_csp_report:
            # convert the legacy CSP report into the Reporting API format
            if not isinstance(data, dict):
                raise HTTPError(400)
            body = data.get("csp-report")
            if not isinstance(body, dict):
                raise HTTPError(400)
            for camel, kebab in (
                ("blockedURL", "blocked-uri"),
                ("documentURL", "document-uri"),
                ("effectiveDirective", "effective-directive"),
                ("originalPolicy", "original-policy"),
                ("sample", "script-sample"),
                ("statusCode", "status-code"),
                ("violatedDirective", "violated-directive"),
            ):
                if kebab in body:
                    body[camel] = body.pop(kebab)  # kebab-case → camelCase
            reports = [
                {
                    "age": 0,
                    "body": body,
                    "type": "csp-violation",
                    "url": body.get("documentURL"),
                    "user_agent": self.request.headers.get("User-Agent"),
                }
            ]
        else:
            reports = data

        if not isinstance(reports, list):
            raise HTTPError(400)
        if len(reports) > self.MAX_REPORTS_PER_REQUEST:
            LOGGER.warning(
                "%s > MAX_REPORTS_PER_REQUEST (%s)",
                len(reports),
                self.MAX_REPORTS_PER_REQUEST,
            )
            raise HTTPError(400)

        self.set_status(202)
        self.finish()  # type: ignore[unused-awaitable]

        # Collect the valid reports in a new list instead of removing the
        # invalid ones from the original, which is quadratic and removes
        # by equality.
        accepted: list[dict[str, Any]] = []
        for report in reports:
            if not isinstance(report, dict):
                continue
            # allow re-submitting documents as returned by Elasticsearch
            if isinstance((sauce := report.pop("_source", None)), dict):
                report.update(sauce)
            if not (
                isinstance(report.get("age"), int)
                and isinstance(report.get("body"), dict)
                and isinstance(report.get("type"), str)
                and isinstance(report.get("url"), str)
                and isinstance(report.get("user_agent"), str)
            ):
                continue
            age = cast(int, report.pop("age"))
            try:
                timestamp = self.now - timedelta(milliseconds=max(0, age))
            except OverflowError:
                # An absurdly large client-supplied age doesn't fit into a
                # timedelta/datetime; drop only this report instead of
                # failing the whole batch.
                continue
            report["@timestamp"] = timestamp
            report["ecs"] = {"version": "8.12.0"}
            report["_op_type"] = "create"
            # The client must not be able to choose the target index.
            report.pop("_index", None)  # DO NOT REMOVE
            accepted.append(report)

        await async_bulk(
            self.elasticsearch,
            accepted,
            index=f"{self.elasticsearch_prefix}-reports",
        )