1#!/usr/bin/env python3 


3# This program is free software: you can redistribute it and/or modify 

4# it under the terms of the GNU Affero General Public License as 

5# published by the Free Software Foundation, either version 3 of the 

6# License, or (at your option) any later version. 


8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 


11# GNU Affero General Public License for more details. 


13# You should have received a copy of the GNU Affero General Public License 

14# along with this program. If not, see <>. 


16"""The client for the backdoor API of the website.""" 


18from __future__ import annotations 


20import ast 

21import asyncio 

22import http.client 

23import io 

24import os 

25import pickle # nosec: B403 

26import pydoc 

27import re # pylint: disable=preferred-module 

28import socket 

29import sys 

30import time 

31import traceback 

32import uuid 

33from base64 import b64encode 

34from import Callable, Iterable, MutableMapping 

35from contextlib import suppress 

36from importlib import import_module 

37from textwrap import dedent 

38from types import EllipsisType 

39from typing import Any, Required, TypeAlias, TypedDict, cast 

40from urllib.parse import SplitResult, quote, quote_plus, urlsplit 


42with suppress(ModuleNotFoundError): 

43 # pylint: disable=shadowed-import 

44 import dill as pickle # type: ignore[import-untyped, no-redef] # noqa: F811, B950 # nosec: B403 



47 import hy # type: ignore[import-untyped] 

48except ModuleNotFoundError: 

49 hy = None # pylint: disable=invalid-name 



52 import idna 

53except ModuleNotFoundError: 

54 idna = None # type: ignore[assignment] # pylint: disable=invalid-name 



57 import socks # type: ignore[import-untyped] 

58except ModuleNotFoundError: 

59 socks = None # pylint: disable=invalid-name 


61if os.environ.get("DISABLE_UVLOOP") not in {"y", "yes", "t", "true", "on", "1"}: 

62 with suppress(ModuleNotFoundError): 

63 asyncio.set_event_loop_policy(import_module("uvloop").EventLoopPolicy()) 


65E = eval( # pylint: disable=eval-used # nosec: B307 

66 "eval(repr((_:=[],_.append(_))[0]))[0][0]" 



69ErrorTuple: TypeAlias = tuple[int, str] 


71FLUFL = True 



74class Response(TypedDict): # noqa: D101 

75 # pylint: disable=missing-class-docstring 

76 success: bool | EllipsisType 

77 output: None | str 

78 result: None | tuple[str, None | bytes] | SystemExit 



81class Proxy(TypedDict, total=False): # noqa: D101 

82 # pylint: disable=missing-class-docstring 

83 type: Required[int] 

84 addr: Required[str] 

85 port: int 

86 rdns: bool 

87 username: str 

88 password: str 



91async def create_socket( 

92 addr: str, 

93 port: int | str, 

94 proxy: None | Proxy, 

95) -> socket.socket: 

96 """Create a socket (optionally with a proxy).""" 

97 # pylint: disable=too-complex 

98 if proxy is not None and not socks: 

99 raise NotImplementedError("PySocks is required for proxy support") 

100 loop = asyncio.get_running_loop() 

101 address_infos = await loop.getaddrinfo( 

102 addr, 

103 port, 

104 # PySocks doesn't support AF_INET6 

105 family=0 if proxy is None else socket.AF_INET, 

106 type=socket.SOCK_STREAM, 

107 ) 

108 if not address_infos: 

109 raise OSError("getaddrinfo() returned empty list") 

110 exceptions = [] 

111 for address_info in address_infos: 

112 sock = None 

113 try: 

114 sock = cast( 

115 socket.socket, 

116 ( 

117 socks.socksocket(address_info[0]) 

118 if proxy is not None 

119 else socket.socket(address_info[0]) 

120 ), 

121 ) 

122 sock.setblocking(False) 

123 if proxy is not None: 

124 sock.set_proxy( # type: ignore[attr-defined] 

125 proxy["type"], 

126 proxy["addr"], 

127 proxy.get("port"), 

128 proxy.get("rdns", True), 

129 proxy.get("username"), 

130 proxy.get("password"), 

131 ) 

132 await loop.sock_connect(sock, address_info[4]) 

133 return sock 

134 except OSError as exc: 

135 if sock is not None: 

136 sock.close() 

137 exceptions.append(exc) 

138 continue 

139 except BaseException: 

140 if sock is not None: 

141 sock.close() 

142 raise 

143 if len(exceptions) == 1: 

144 raise exceptions[0] 

145 model = str(exceptions[0]) 

146 if all(str(exc) == model for exc in exceptions): 

147 raise exceptions[0] 

148 raise OSError( 

149 f"Multiple exceptions: {', '.join(str(exc) for exc in exceptions)}" 

150 ) 



153async def request( # noqa: C901 

154 method: str, 

155 url: str | SplitResult, 

156 headers: None | MutableMapping[str, str] = None, 

157 body: None | bytes | bytearray | Iterable[bytes | bytearray] | str = None, 

158 proxy: None | Proxy = None, 

159) -> tuple[int, dict[str, str], bytes]: 

160 """Insanely awesome HTTP client.""" 

161 # pylint: disable=line-too-long, too-complex 

162 # pylint: disable=too-many-branches, too-many-locals, while-used 

163 if isinstance(url, str): 

164 url = urlsplit(url) 

165 if url.scheme not in {"", "http", "https"}: 

166 raise ValueError(f"Unsupported scheme: {url.scheme}") 

167 if not url.hostname: 

168 raise ValueError("URL has no hostname") 

169 if headers is None: 

170 headers = {} 

171 if isinstance(body, str): 

172 body = body.encode("UTF-8") 

173 if isinstance(body, memoryview): 

174 body = body.tobytes() 

175 if isinstance(body, Iterable) and not isinstance(body, (bytes, bytearray)): 

176 body = b"".join(body) # type: ignore[arg-type] 

177 https = url.scheme == "https" 

178 header_names = [x.strip().title() for x in headers.keys()] 

179 if "Host" not in header_names: 

180 host: None | str = None 

181 if idna: # type: ignore[truthy-bool] 

182 with suppress(idna.core.InvalidCodepoint): 

183 host = idna.encode(url.hostname).decode("ASCII") 

184 host = f"{host}:{url.port}" if url.port else host 

185 if not host: 

186 host = url.netloc.encode("IDNA").decode("ASCII") 

187 headers["Host"] = host 

188 if body and "Content-Length" not in header_names: 

189 headers["Content-Length"] = str(len(body)) 

190 sock = await create_socket( 

191 url.hostname, 

192 url.port or ("https" if https else "http"), 

193 proxy, 

194 ) 

195 reader, writer = await asyncio.open_connection( 

196 sock=sock, 

197 ssl=https, 

198 server_hostname=url.hostname if https else None, 

199 ) 

200 writer.write( 

201 ( 

202 method 

203 + " " 

204 + (quote(url.path) or "/") 

205 + ("?" + quote_plus(url.query) if url.query else "") 

206 + " HTTP/1.0\r\n" 

207 ).encode("ASCII") 

208 + "\r\n".join( 

209 [f"{key}:{value}" for key, value in headers.items()] + [""] 

210 ).encode("LATIN-1") 

211 + b"\r\n" 

212 ) 

213 if body: 

214 writer.write(body) 

215 await writer.drain() 

216 del headers 

217 e, data = E, b"" 

218 while chunk := await 

219 if b"\r\n\r\n" in (data := data + chunk) and e is E: 

220 e, data = data.split(b"\r\n\r\n", 1) 

221 status, o = re.match(r"HTTP/.+? (\d+).*?\r\n(.*)", e.decode("LATIN-1"), 24).groups() # type: ignore[union-attr] # noqa: B950 

222 headers = dict(re.match(r"([^\s]+):\s*(.+?)\s*$", x, 24).groups() for x in o.split("\r\n")) # type: ignore[union-attr, misc] # noqa: B950 

223 writer.close() 

224 await writer.wait_closed() 

225 if "status" not in locals(): 

226 raise AssertionError("No HTTP response received") 

227 # pylint: disable-next=possibly-used-before-assignment 

228 return int(status), headers, data # type: ignore[possibly-undefined] 



231def detect_mode(code: str) -> str: 

232 """Detect which mode needs to be used.""" 

233 import __future__ # pylint: disable=import-outside-toplevel 


235 flags: int = ast.PyCF_ONLY_AST 

236 if FLUFL: 

237 flags |= __future__.barry_as_FLUFL.compiler_flag 


239 try: 

240 compile(code, "", "eval", flags, cast(bool, 0x5F3759DF), 0) 

241 return "eval" 

242 except SyntaxError: 

243 compile(code, "", "exec", flags, cast(bool, 0x5F3759DF), 0) 

244 return "exec" 



247def send( 

248 url: str | SplitResult, 

249 key: str, 

250 code: str, 

251 mode: str = "exec", 

252 session: None | str = None, 

253 proxy: None | Proxy = None, 

254) -> tuple[int, dict[str, str], Response | ErrorTuple | str | None | bytes]: 

255 """Send code to the backdoor API.""" 

256 # pylint: disable=too-many-arguments 

257 body = code.encode("UTF-8") 

258 if isinstance(url, str): 

259 url = urlsplit(url) 

260 if not url.path: 

261 url = url._replace(path="/api/backdoor") 

262 key = f"Bearer {b64encode(key.encode('UTF-8')).decode('ASCII')}" 

263 headers = { 

264 "Authorization": key, 

265 "Accept": ( 

266 "application/vnd.uqfoundation.dill" 

267 if pickle.__name__ == "dill" 

268 else "application/vnd.python.pickle" 

269 ), 

270 "X-Pickle-Protocol": str(pickle.HIGHEST_PROTOCOL), 

271 "X-Future-Feature": "annotations", 

272 } 

273 if FLUFL: 

274 headers["X-Future-Feature"] += ", barry_as_FLUFL" 

275 if session: 

276 headers["X-Backdoor-Session"] = session 

277 response = 

278 request( 

279 "POST", 

280 url._replace(path=f"{url.path.removesuffix('/')}/{mode}"), 

281 headers, 

282 body, 

283 proxy, 

284 ) 

285 ) 

286 try: 

287 return ( 

288 response[0], # status 

289 response[1], # header 

290 pickle.loads(response[2]), # data # nosec: B301 

291 ) 

292 except pickle.UnpicklingError: 

293 return response 



296def lisp_always_active() -> bool: 

297 """Return True if LISP is always active.""" 

298 return ( 

299 hy 

300 and not hy.eval( 


302 '(* (- (* (+ 0 1) 2 3 4 5) (+ 6 7 8 9 10 11)) ' # fmt: skip 

303 '(int (= (. (__import__ "os.path") sep) "/")))' 

304 ) 

305 ) 

306 and not int.from_bytes( 

307 getattr( 

308 os, 

309 "洀漀搀渀愀爀甀".encode("UTF-16-BE")[::-1].decode("UTF-16-BE"), 

310 )(1), 

311 "big", 

312 ) 

313 // (69 // 4 - 1) 

314 ) 



317def run_and_print( # noqa: C901 

318 url: str, 

319 key: str, 

320 code: str, 

321 lisp: bool = False, 

322 session: None | str = None, 

323 proxy: None | Proxy = None, 

324 time_requests: bool = False, 

325 *, 

326 # pylint: disable=redefined-builtin 

327 print: Callable[..., None] = print, 

328) -> None: 

329 """Run the code and print the output.""" 

330 # pylint: disable=too-complex, too-many-arguments, too-many-branches 

331 # pylint: disable=too-many-locals, too-many-statements 

332 start_time = time.monotonic() 

333 if lisp or lisp_always_active(): 

334 code = hy.disassemble(, True) 

335 try: 

336 response = send( 

337 url, 

338 key, 

339 code, 

340 detect_mode(code), 

341 session, 

342 proxy, 

343 ) 

344 except SyntaxError as exc: 

345 print("".join(traceback.format_exception_only(exc)).strip()) 

346 return 

347 if time_requests: 

348 took = time.monotonic() - start_time 

349 if took > 1: 

350 color = "91" # red 

351 elif took > 0.5: 

352 color = "93" # yellow 

353 else: 

354 color = "92" # green 

355 print(f"\033[{color}mTook: {took:.3f}s\033[0m") 

356 status, headers, body = response # pylint: disable=unused-variable 

357 if status >= 400: 

358 reason = ( 

359 body[1] 

360 if isinstance(body, tuple) 

361 else http.client.responses[status] 

362 ) 

363 print("\033[91m" + f"{status} {reason}" + "\033[0m") 

364 if isinstance(body, str): 

365 print("\033[91m" + body + "\033[0m") 

366 elif body is None: # pylint: disable=confusing-consecutive-elif 

367 pass 

368 elif isinstance(body, dict): 

369 if isinstance(body["success"], bool): 

370 print(f"Success: {body['success']}") 

371 if isinstance(body["output"], str) and body["output"]: 

372 print("Output:") 

373 print(body["output"].strip()) 

374 if isinstance(body["result"], SystemExit): 

375 raise body["result"] 

376 if isinstance(body["result"], tuple): 

377 if not body["success"]: 

378 print(body["result"][0]) 

379 return 

380 result_obj: Any = None 

381 if isinstance(body["result"][1], bytes): 

382 try: 

383 result_obj = pickle.loads(body["result"][1]) # nosec: B301 

384 except Exception: # pylint: disable=broad-except 

385 if sys.flags.dev_mode: 

386 traceback.print_exc() 

387 if ( 

388 isinstance(result_obj, tuple) 

389 and len(result_obj) == 2 

390 and result_obj[0] == "PagerTuple" 

391 and isinstance(result_obj[1], str) 

392 ): 

393 pydoc.pager(result_obj[1]) 

394 else: 

395 print("Result:") 

396 print(body["result"][0]) 

397 else: 

398 print(f"Response has unexpected type {type(body).__name__}!") 

399 print(body) 



402def shellify(code: str) -> str: 

403 """Modify code in a way that it gets executed in a shell.""" 

404 if not code.startswith("!"): 

405 return code 


407 code = ( 

408 code[1:] 

409 .strip() 

410 .replace("\\", r"\\") 

411 .replace("\n", r"\n") 

412 .replace('"', r"\"") 

413 ) 

414 return f""" 

415async def run_shell_50821273052022fbc283(): 

416 import asyncio 

417 proc = await asyncio.create_subprocess_shell( 

418 {code!r}, 

419 asyncio.subprocess.DEVNULL, 

420 stdout=asyncio.subprocess.PIPE, 

421 stderr=asyncio.subprocess.STDOUT, 

422 ) 

423 output, _ = await asyncio.wait_for(proc.communicate(), 60 * 60) 

424 if output: 

425 print(output.decode("UTF-8")) 

426await run_shell_50821273052022fbc283() 

427del run_shell_50821273052022fbc283 




431def main() -> int | str: # noqa: C901 

432 """Parse arguments, load the config and start the backdoor client.""" 

433 # pylint: disable=too-complex, too-many-branches 

434 # pylint: disable=too-many-locals, too-many-statements 


436 # pylint: disable-next=import-outside-toplevel, import-error, useless-suppression 

437 from pyrepl.python_reader import ( # type: ignore[import, unused-ignore] 

438 ReaderConsole, 

439 main as _main, 

440 ) 


442 if "--help" in sys.argv or "-h" in sys.argv: 

443 print( 

444 dedent( 

445 """\ 

446 Accepted arguments: 


448 --dev use a separate config for a local dev instance 

449 --lisp enable Lots of Irritating Superfluous Parentheses 

450 --new-proxy don't use the saved proxy 

451 --new-session start a new session with saved URL and key 

452 --no-config start without loading/saving the config 

453 --no-patch-help don't patch help() 

454 --reset-config reset the whole config 

455 --timing print the time it took to execute each command 


457 --help or -h show this help message 

458 """ 

459 ), 

460 end="", 

461 file=sys.stderr, 

462 ) 

463 return 0 

464 for arg in sys.argv[1:]: 

465 if arg not in { 

466 "--dev", 

467 "--lisp", 

468 "--new-proxy", 

469 "--new-session", 

470 "--no-config", 

471 "--no-patch-help", 

472 "--reset-config", 

473 "--timing", 

474 "--help", 

475 "-h", 

476 }: 

477 print(f"Unknown argument: {arg}") 

478 sys.exit(64 + 4 + 1) 

479 file: io.IOBase 

480 url: None | str = None 

481 key: None | str = None 

482 session: None | str = None 

483 proxy_type: None | int | EllipsisType = None 

484 proxy_addr: None | str = None 

485 proxy_port: None | int = None 

486 proxy_rdns: None | bool = True 

487 proxy_username: None | str = None 

488 proxy_password: None | str = None 

489 config_pickle = os.path.join( 

490 os.path.expanduser(os.getenv("XDG_CONFIG_HOME") or "~/.config"), 

491 "an-backdoor-client/" 

492 + ("dev-" if "--dev" in sys.argv else "") 

493 + "session.pickle", 

494 ) 

495 if "--reset-config" in sys.argv and os.path.exists(config_pickle): 

496 os.remove(config_pickle) 

497 if "--no-config" not in sys.argv: 

498 try: 

499 with open(config_pickle, "rb") as file: 

500 config = pickle.load(file) # nosec: B301 

501 except FileNotFoundError: 

502 pass 

503 else: 

504 url = config.get("url") 

505 key = config.get("key") 

506 session = config.get("session") 

507 proxy_type = config.get("proxy_type") 

508 proxy_addr = config.get("proxy_addr") 

509 proxy_port = config.get("proxy_port") 

510 proxy_rdns = config.get("proxy_rdns") 

511 proxy_username = config.get("proxy_username") 

512 proxy_password = config.get("proxy_password") 

513 if "--new-session" in sys.argv: 

514 print(f"Using URL {url}") 

515 else: 

516 print(f"Using URL {url} with existing session {session}") 

517 while not url: # pylint: disable=while-used 

518 url = input("URL: ").strip().rstrip("/") 

519 if not url: 

520 print("No URL given!") 

521 elif "://" not in url: 

522 if not url.startswith("//"): 

523 url = "//" + url 

524 if re.match( 

525 r"^(\/\/)(localhost|127\.0\.0\.1|\[::1\])(\:\d+)?(/\S*)?$", url 

526 ): 

527 url = "http:" + url 

528 else: 

529 url = "https:" + url 

530 print(f"Using URL {url}") 


532 while not key: # pylint: disable=while-used 

533 key = input("Key: ").strip() 

534 if not key: 

535 print("No key given!") 


537 if proxy_type is None or "--new-proxy" in sys.argv: 

538 proxy_url_str = input("Proxy (leave empty for none): ").strip() 

539 if proxy_url_str: 

540 if "://" not in proxy_url_str: 

541 if not proxy_url_str.startswith("//"): 

542 proxy_url_str = "//" + proxy_url_str 

543 proxy_url_str = "socks5:" + proxy_url_str 

544 proxy_url = urlsplit(proxy_url_str) 

545 if proxy_url.hostname: 

546 proxy_type = int(socks.PROXY_TYPES[proxy_url.scheme.upper()]) 

547 proxy_addr = proxy_url.hostname 

548 proxy_port = proxy_url.port 

549 proxy_rdns = True 

550 proxy_username = proxy_url.username or None 

551 proxy_password = proxy_url.password or None 

552 else: 

553 print("Invalid proxy URL!") 

554 else: 

555 print("No proxy given!") 

556 if isinstance(proxy_type, EllipsisType): 

557 proxy_type = None 

558 if proxy_type is not None: 

559 print( 

560 f"Using {socks.PRINTABLE_PROXY_TYPES[proxy_type]} proxy " 

561 f"{proxy_addr}{f':{proxy_port}' if proxy_port else ''}" 

562 + (f" with username {proxy_username}" if proxy_username else "") 

563 ) 

564 else: 

565 print("Using no proxy (use --new-proxy to be able to set one)") 


567 if not session or "--new-session" in sys.argv: 

568 session = input("Session (enter nothing for random session): ") 

569 if not session: 

570 session = str(uuid.uuid4()) 

571 print(f"Using session {session}") 


573 if "--no-config" not in sys.argv: 

574 os.makedirs(os.path.dirname(config_pickle), exist_ok=True) 

575 with open(config_pickle, "wb") as file: 

576 pickle.dump( 

577 { 

578 "url": url, 

579 "key": key, 

580 "session": session, 

581 "proxy_type": proxy_type or ..., # not None (None == ask) 

582 "proxy_addr": proxy_addr, 

583 "proxy_port": proxy_port, 

584 "proxy_rdns": proxy_rdns, 

585 "proxy_username": proxy_username, 

586 "proxy_password": proxy_password, 

587 }, 

588 file, 

589 5, 

590 ) 


592 proxy: None | Proxy 


594 if proxy_type and proxy_addr: 

595 proxy = {"type": proxy_type, "addr": proxy_addr} 

596 if proxy_port: 

597 proxy["port"] = proxy_port 

598 if proxy_rdns: 

599 proxy["rdns"] = proxy_rdns 

600 if proxy_username and proxy_password: 

601 proxy["username"] = proxy_username 

602 proxy["password"] = proxy_password 

603 else: 

604 proxy = None 


606 def send_to_remote(code: str, *, mode: str) -> Any: 

607 """Send code to the remote backdoor and return the unpickled body.""" 

608 return send( 

609 url, 

610 key, 

611 code, 

612 mode, 

613 session, 

614 proxy, 

615 )[2] 


617 if "--no-patch-help" not in sys.argv: 

618 body = send_to_remote( 

619 # fmt: off 

620 "class _HelpHelper_92005ecf3788faea8346a7919fba0232188561ab:\n" 

621 " def __call__(self, *args, **kwargs):\n" 

622 " import io\n" 

623 " import pydoc\n" 

624 " helper_output = io.StringIO()\n" 

625 " pydoc.Helper(io.StringIO(), helper_output)(*args, **kwargs)\n" 

626 " return 'PagerTuple', helper_output.getvalue()\n" 

627 f" __str__ = __repr__ = lambda _:{repr(help)!r}\n" # noqa: E131 

628 "help = _HelpHelper_92005ecf3788faea8346a7919fba0232188561ab()\n" 

629 "del _HelpHelper_92005ecf3788faea8346a7919fba0232188561ab", 

630 # fmt: on 

631 mode="exec", 

632 ) 

633 if not (isinstance(body, dict) and body["success"]): 

634 print("\033[91mPatching help() failed!\033[0m") 


636 if "--lisp" in sys.argv: 

637 if not hy: 

638 sys.exit("\033[91mHy is not installed!\033[0m") 

639 body = send_to_remote("__import__('hy')", mode="exec") 

640 if not (isinstance(body, dict) and body["success"]): 

641 print("\033[91mInjecting Hy builtins failed!\033[0m") 


643 body = send_to_remote( 

644 "import sys\nprint('Python', sys.version, 'on', sys.platform)", 

645 mode="exec", 

646 ) 

647 if isinstance(body, dict) and body["success"] and body["output"]: 

648 print(f"\033[92mConnection to {url} was successful.\033[0m") 

649 print(body["output"].strip()) 

650 else: 

651 print("\033[91mGetting remote information failed.\033[0m") 

652 print( 

653 'Type "copyright", "credits" or ' 

654 'use the "help" function for more information.' 

655 ) 


657 def _run_and_print( # type: ignore[no-any-unimported] 

658 self: ReaderConsole, 

659 code: str, 

660 ) -> None: 

661 # pylint: disable=unused-argument 

662 try: 

663 run_and_print( 

664 url, 

665 key, 

666 shellify(code), 

667 "--lisp" in sys.argv, 

668 session, 

669 proxy, 

670 "--timing" in sys.argv, 

671 ) 

672 except Exception: # pylint: disable=broad-except 

673 print( 

674 "\033[91mAn unexpected error occurred. " 

675 "Please contact a developer.\033[0m" 

676 ) 

677 traceback.print_exc() 


679 # patch the reader console to use our function 

680 rc_execute = ReaderConsole.execute 

681 try: 

682 ReaderConsole.execute = _run_and_print 

683 # run the reader 

684 _main(print_banner=False, clear_main=False) 

685 except EOFError: 

686 pass 

687 finally: 

688 # restore the original method 

689 # pylint: disable-next=redefined-variable-type 

690 ReaderConsole.execute = rc_execute 


692 return 0 



695if __name__ == "__main__": 

696 sys.exit(main())