Coverage for an_website/update/update.py: 83.333%

24 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-16 19:56 +0000

1# This program is free software: you can redistribute it and/or modify 

2# it under the terms of the GNU Affero General Public License as 

3# published by the Free Software Foundation, either version 3 of the 

4# License, or (at your option) any later version. 

5# 

6# This program is distributed in the hope that it will be useful, 

7# but WITHOUT ANY WARRANTY; without even the implied warranty of 

8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

9# GNU Affero General Public License for more details. 

10# 

11# You should have received a copy of the GNU Affero General Public License 

12# along with this program. If not, see <https://www.gnu.org/licenses/>. 

13 

14"""The API for updating the website.""" 

15 

16from __future__ import annotations 

17 

18import asyncio 

19import logging 

20import os 

21import sys 

22from asyncio import Future 

23from queue import SimpleQueue 

24from tempfile import NamedTemporaryFile, TemporaryDirectory 

25from typing import IO, TYPE_CHECKING, Any, ClassVar, Final 

26from urllib.parse import unquote 

27 

28from tornado.web import stream_request_body 

29 

30from .. import EVENT_SHUTDOWN, NAME 

31from ..utils.decorators import requires 

32from ..utils.request_handler import APIRequestHandler 

33from ..utils.utils import ModuleInfo, Permission 

34 

35if TYPE_CHECKING: 

36 from tempfile import _TemporaryFileWrapper 

37 

38LOGGER: Final = logging.getLogger(__name__) 

39 

40 

41def get_module_info() -> ModuleInfo: 

42 """Create and return the ModuleInfo for this module.""" 

43 return ModuleInfo( 

44 handlers=((r"/api/update/(.*)", UpdateAPI),), 

45 name="Update-API", 

46 description=f"API zum Aktualisieren von {NAME.removesuffix('-dev')}", 

47 hidden=True, 

48 ) 

49 

50 

51def write_from_queue(file: IO[bytes], queue: SimpleQueue[None | bytes]) -> None: 

52 """Read from a queue and write to a file.""" 

53 while (chunk := queue.get()) is not None: # pylint: disable=while-used 

54 file.write(chunk) 

55 file.flush() 

56 

57 

58@stream_request_body 

59class UpdateAPI(APIRequestHandler): # pragma: no cover 

60 """The request handler for the update API.""" 

61 

62 ALLOWED_METHODS: ClassVar[tuple[str, ...]] = ("PUT",) 

63 POSSIBLE_CONTENT_TYPES: ClassVar[tuple[str, ...]] = ("text/plain",) 

64 

65 dir: TemporaryDirectory[str] 

66 file: _TemporaryFileWrapper[bytes] 

67 queue: SimpleQueue[None | bytes] 

68 future: Future[Any] 

69 

70 def data_received(self, chunk: bytes) -> None: # noqa: D102 

71 self.queue.put(chunk) 

72 

73 def on_finish(self) -> None: # noqa: D102 

74 if hasattr(self, "queue"): 

75 self.queue.put(None) 

76 

77 async def pip_install(self, *args: str) -> int: 

78 """Install something and write the output.""" 

79 process = await asyncio.create_subprocess_exec( 

80 sys.executable, 

81 "-m", 

82 "pip", 

83 "install", 

84 "--require-virtualenv", 

85 *args, 

86 stdin=asyncio.subprocess.DEVNULL, 

87 stdout=asyncio.subprocess.PIPE, 

88 stderr=asyncio.subprocess.STDOUT, 

89 ) 

90 # pylint: disable=while-used 

91 while not process.stdout.at_eof(): # type: ignore[union-attr] 

92 char = await process.stdout.read(1) # type: ignore[union-attr] 

93 self.write(char) 

94 if char == b"\n": 

95 self.flush() # type: ignore[unused-awaitable] 

96 await process.wait() 

97 assert process.returncode is not None 

98 return process.returncode 

99 

100 async def prepare(self) -> None: # noqa: D102 

101 await super().prepare() 

102 loop = asyncio.get_running_loop() 

103 self.dir = TemporaryDirectory() 

104 self.file = NamedTemporaryFile(dir=self.dir.name, delete=False) 

105 self.queue = SimpleQueue() 

106 self.future = loop.run_in_executor( 

107 None, write_from_queue, self.file, self.queue 

108 ) 

109 

110 @requires(Permission.UPDATE) 

111 async def put(self, filename: str) -> None: 

112 """Handle PUT requests to the update API.""" 

113 self.queue.put(None) 

114 await self.future 

115 self.file.close() 

116 

117 filepath = os.path.join(self.dir.name, unquote(filename)) 

118 os.rename(self.file.name, filepath) 

119 

120 self.set_status(202) 

121 self.set_header("X-Accel-Buffering", "no") 

122 

123 await self.pip_install("--upgrade", "pip") 

124 

125 returncode = await self.pip_install(filepath) 

126 

127 await self.finish() 

128 

129 if returncode: 

130 LOGGER.error("Failed to install %s", filename) 

131 elif self.get_bool_argument("shutdown", True): 

132 EVENT_SHUTDOWN.set()