Coverage for an_website/discord/discord.py: 72.727%

66 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-10 18:56 +0000

1# This program is free software: you can redistribute it and/or modify 

2# it under the terms of the GNU Affero General Public License as 

3# published by the Free Software Foundation, either version 3 of the 

4# License, or (at your option) any later version. 

5# 

6# This program is distributed in the hope that it will be useful, 

7# but WITHOUT ANY WARRANTY; without even the implied warranty of 

8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

9# GNU Affero General Public License for more details. 

10# 

11# You should have received a copy of the GNU Affero General Public License 

12# along with this program. If not, see <https://www.gnu.org/licenses/>. 

13 

14"""A permanent redirect to an invite of the Discord guild.""" 

15 

16import time 

17from typing import Final 

18 

19import orjson as json 

20from tornado.httpclient import AsyncHTTPClient 

21from tornado.web import HTTPError 

22 

23from .. import CA_BUNDLE_PATH 

24from ..utils.request_handler import APIRequestHandler, HTMLRequestHandler 

25from ..utils.utils import ModuleInfo 

26 

27GUILD_ID: Final[str] = "367648314184826880" 

28 

29INVITE_CACHE: dict[ 

30 str, 

31 tuple[float, str, str] | tuple[float, HTTPError], 

32] = {} 

33 

34 

35def get_module_info() -> ModuleInfo: 

36 """Create and return the ModuleInfo for this module.""" 

37 return ModuleInfo( 

38 handlers=( 

39 (r"/discord", ANDiscord), 

40 (r"/api/discord", ANDiscordAPI), 

41 (f"/api/discord/({GUILD_ID})", ANDiscordAPI), 

42 (r"/api/discord/(\d+)", DiscordAPI), 

43 ), 

44 name="Discord-Einladung", 

45 short_name="Discord", 

46 description="Eine permanente Einladung zu unserer Discord-Gilde", 

47 path="/discord", 

48 keywords=( 

49 "Discord", 

50 "Server", 

51 "Guild", 

52 "Gilde", 

53 "Invite", 

54 "Einladung", 

55 ), 

56 ) 

57 

58 

59async def url_returns_200(url: str) -> bool: 

60 """Check whether a URL returns a status code of 200.""" 

61 response = await AsyncHTTPClient().fetch( 

62 url, 

63 method="HEAD", 

64 raise_error=False, 

65 ca_certs=CA_BUNDLE_PATH, 

66 ) 

67 return response.code == 200 

68 

69 

70async def get_invite(guild_id: str = GUILD_ID) -> tuple[str, str]: 

71 """ 

72 Get the invite to a Discord guild and return it with the source. 

73 

74 How to get the invite: 

75 - from the widget (has to be enabled in guild settings) 

76 - from DISBOARD (the guild needs to set it up first) 

77 

78 If the invite couldn't be fetched an HTTPError is raised. 

79 """ 

80 reason = "Invite not found." 

81 

82 # try getting the invite from the widget 

83 url = f"https://discord.com/api/guilds/{guild_id}/widget.json" 

84 response = await AsyncHTTPClient().fetch( 

85 url, raise_error=False, ca_certs=CA_BUNDLE_PATH 

86 ) 

87 if response.code == 200: 

88 response_json = json.loads(response.body) 

89 if invite := response_json["instant_invite"]: 

90 return invite, url 

91 reason = f"No instant invite in widget ({url}) found." 

92 

93 # try getting the invite from DISBOARD 

94 url = f"https://disboard.org/site/get-invite/{guild_id}" 

95 response = await AsyncHTTPClient().fetch( 

96 url, raise_error=False, ca_certs=CA_BUNDLE_PATH 

97 ) 

98 if response.code == 200: 

99 return ( 

100 json.loads(response.body), 

101 f"https://disboard.org/server/{guild_id}", 

102 ) 

103 

104 # check if Top.gg lists the guild 

105 url = f"https://top.gg/servers/{guild_id}/join" 

106 if await url_returns_200(url): 

107 return url, f"https://top.gg/servers/{guild_id}/" 

108 

109 # check if Discords.com lists the guild 

110 if await url_returns_200( 

111 # API endpoint that only returns 200 if the guild exists 

112 f"https://discords.com/api-v2/server/{guild_id}/relevant" 

113 ): 

114 return ( 

115 f"https://discords.com/servers/{guild_id}/join", 

116 f"https://discords.com/servers/{guild_id}/", 

117 ) 

118 

119 raise HTTPError(404, reason=reason) 

120 

121 

122async def get_invite_with_cache( 

123 guild_id: str = GUILD_ID, 

124) -> tuple[str, str]: 

125 """Get an invite from cache or from get_invite().""" 

126 if guild_id in INVITE_CACHE: 

127 cache_entry = INVITE_CACHE[guild_id] 

128 if cache_entry[0] > time.monotonic() - 300: 

129 if isinstance(cache_entry[1], HTTPError): 

130 raise cache_entry[1] 

131 return cache_entry[1], cache_entry[2] 

132 

133 try: 

134 invite, source = await get_invite(guild_id) 

135 except HTTPError as exc: 

136 INVITE_CACHE[guild_id] = (time.monotonic(), exc) 

137 raise exc 

138 

139 INVITE_CACHE[guild_id] = (time.monotonic(), invite, source) 

140 

141 return invite, source 

142 

143 

144class ANDiscord(HTMLRequestHandler): 

145 """The request handler that gets the Discord invite and redirects to it.""" 

146 

147 RATELIMIT_GET_LIMIT = 10 

148 

149 async def get(self, *, head: bool = False) -> None: 

150 """Get the Discord invite.""" 

151 invite = (await get_invite_with_cache(GUILD_ID))[0] 

152 if not self.user_settings.ask_before_leaving: 

153 return self.redirect(invite) 

154 if head: 

155 return 

156 return await self.render( 

157 "pages/redirect.html", 

158 send_referrer=True, 

159 redirect_url=invite, 

160 from_url=None, 

161 discord=True, 

162 ) 

163 

164 

165class DiscordAPI(APIRequestHandler): 

166 """The API request handler that gets the Discord invite and returns it.""" 

167 

168 RATELIMIT_GET_LIMIT = 5 

169 RATELIMIT_GET_COUNT_PER_PERIOD = 10 

170 

171 async def get( 

172 self, guild_id: str = GUILD_ID, *, head: bool = False 

173 ) -> None: 

174 """Get the Discord invite and render it as JSON.""" 

175 # pylint: disable=unused-argument 

176 invite, source_url = await get_invite_with_cache(guild_id) 

177 return await self.finish_dict(invite=invite, source=source_url) 

178 

179 

180class ANDiscordAPI(DiscordAPI): 

181 """The API request handler only for the AN Discord guild.""" 

182 

183 RATELIMIT_GET_LIMIT = 10 

184 RATELIMIT_GET_COUNT_PER_PERIOD = 30