Coverage for an_website/utils/elastic_transport_async_http_node.py: 32.692%

52 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-10 18:56 +0000

1# This program is free software: you can redistribute it and/or modify 

2# it under the terms of the GNU Affero General Public License as 

3# published by the Free Software Foundation, either version 3 of the 

4# License, or (at your option) any later version. 

5# 

6# This program is distributed in the hope that it will be useful, 

7# but WITHOUT ANY WARRANTY; without even the implied warranty of 

8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

9# GNU Affero General Public License for more details. 

10# 

11# You should have received a copy of the GNU Affero General Public License 

12# along with this program. If not, see <https://www.gnu.org/licenses/>. 

13"""Async elasticsearch.""" 

14 

15import logging 

16from typing import Final 

17 

18from elastic_transport import ( 

19 ApiResponse, 

20 ApiResponseMeta, 

21 BaseAsyncNode, 

22 ConnectionError as ESConnectionError, 

23 ConnectionTimeout, 

24 HttpHeaders, 

25 NodeConfig, 

26 TlsError, 

27 TransportError, 

28) 

29from elastic_transport.client_utils import DefaultType 

30from tornado.httpclient import AsyncHTTPClient, HTTPClientError, HTTPRequest 

31 

32try: 

33 import pycurl 

34 

35 # pylint: disable-next=ungrouped-imports 

36 from tornado.curl_httpclient import CurlError 

37except ImportError: 

38 CurlError = None # type: ignore[misc, assignment] 

39 pycurl = None # type: ignore[assignment] 

40 

41 

42LOGGER: Final = logging.getLogger(__name__) 

43 

44 

45class TornadoConnectionError(ESConnectionError): 

46 """An error occured while connecting.""" 

47 

48 __str__ = TransportError.__str__ 

49 

50 

51class TornadoAsyncNode(BaseAsyncNode): 

52 """A node that performs requests.""" 

53 

54 __client: AsyncHTTPClient | None 

55 

56 def __init__(self, config: NodeConfig) -> None: 

57 """Initialise self.""" 

58 super().__init__(config) 

59 self.__client = None 

60 

61 @property 

62 def _client(self) -> AsyncHTTPClient: 

63 """Get the AsyncHTTPClient.""" 

64 if self.__client is None: 

65 self.__client = AsyncHTTPClient( 

66 force_instance=True, 

67 defaults={ 

68 "ca_certs": self.config.ca_certs, 

69 "client_cert": self.config.client_cert, 

70 "client_key": self.config.client_key, 

71 "ssl_options": self.config.ssl_context, 

72 "use_gzip": self._http_compress, 

73 "validate_cert": self.config.verify_certs, 

74 }, 

75 ) 

76 return self.__client 

77 

78 async def close(self) -> None: # type: ignore[override] 

79 """Close the connection.""" 

80 if self.__client: 

81 self.__client.close() 

82 self.__client = None 

83 

84 # pylint: disable-next=too-many-arguments 

85 async def perform_request( # type: ignore[override] 

86 self, 

87 method: str, 

88 target: str, 

89 body: bytes | None = None, 

90 headers: HttpHeaders | None = None, 

91 request_timeout: float | DefaultType | None = None, 

92 ) -> ApiResponse[bytes]: 

93 """Perform a request.""" 

94 if isinstance(request_timeout, DefaultType): 

95 request_timeout = None 

96 url = self.base_url + target 

97 

98 request = HTTPRequest( 

99 url=url, 

100 method=method, 

101 body=body, 

102 headers={ 

103 **self._headers, 

104 **(headers or {}), 

105 }, 

106 request_timeout=request_timeout, 

107 ) 

108 

109 try: 

110 response = await self._client.fetch( 

111 request, 

112 raise_error=False, 

113 ) 

114 except HTTPClientError as e: 

115 err: TransportError | None = None 

116 if CurlError is not None and isinstance(e, CurlError): 

117 assert pycurl is not None, "pycurl is present if CurlError is" 

118 curl_errno: int = e.errno # pylint: disable=no-member 

119 if curl_errno == pycurl.E_COULDNT_CONNECT: 

120 err = ESConnectionError("Could not connect to server") 

121 elif curl_errno == pycurl.E_OPERATION_TIMEOUTED: 

122 err = ConnectionTimeout("Timeout was reached") 

123 elif curl_errno == pycurl.E_SSL_CONNECT_ERROR: 

124 err = TlsError("SSL connect error") 

125 elif e.code == 599: 

126 # TODO: maybe use curl_easy_strerror to get a string 

127 # added in pycurl 17ecb45612b99232cc0908ffa535f960eb485800 

128 err = TornadoConnectionError(f"{e.message} ({curl_errno})") 

129 err = err or TornadoConnectionError(str(e)) 

130 err.errors = (e,) 

131 raise err from e 

132 

133 LOGGER.debug( 

134 "%s %s [status:%s duration:%fs]", 

135 method, 

136 url, 

137 response.code, 

138 response.request_time, 

139 ) 

140 

141 return ApiResponse( 

142 meta=ApiResponseMeta( 

143 status=response.code, 

144 duration=( 

145 -1.0 

146 if response.request_time is None 

147 else response.request_time 

148 ), 

149 headers=HttpHeaders(response.headers), 

150 node=self._config, 

151 http_version="1.1", 

152 ), 

153 body=response.body, 

154 )