p2p_daemon.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. import asyncio
  2. import functools
  3. import os
  4. import subprocess
  5. import time
  6. import uuid
  7. from contextlib import asynccontextmanager, suppress
  8. from typing import NamedTuple
  9. from multiaddr import Multiaddr, protocols
  10. from pkg_resources import resource_filename
  11. from hivemind import get_free_port
  12. from hivemind.p2p.p2p_daemon_bindings.p2pclient import Client
  13. TIMEOUT_DURATION = 30 # seconds
  14. P2PD_PATH = resource_filename("hivemind", "hivemind_cli/p2pd")
  15. async def try_until_success(coro_func, timeout=TIMEOUT_DURATION):
  16. """
  17. Keep running ``coro_func`` until the time is out.
  18. All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments.
  19. """
  20. t_start = time.monotonic()
  21. while True:
  22. result = await coro_func()
  23. if result:
  24. break
  25. if (time.monotonic() - t_start) >= timeout:
  26. # timeout
  27. assert False, f"{coro_func} still failed after `{timeout}` seconds"
  28. await asyncio.sleep(0.01)
  29. class Daemon:
  30. control_maddr = None
  31. proc_daemon = None
  32. log_filename = ""
  33. f_log = None
  34. closed = None
  35. def __init__(self, control_maddr, enable_control, enable_connmgr, enable_dht, enable_pubsub):
  36. self.control_maddr = control_maddr
  37. self.enable_control = enable_control
  38. self.enable_connmgr = enable_connmgr
  39. self.enable_dht = enable_dht
  40. self.enable_pubsub = enable_pubsub
  41. self.is_closed = False
  42. self._start_logging()
  43. self._run()
  44. def _start_logging(self):
  45. name_control_maddr = str(self.control_maddr).replace("/", "_").replace(".", "_")
  46. self.log_filename = f"/tmp/log_p2pd{name_control_maddr}.txt"
  47. self.f_log = open(self.log_filename, "wb")
  48. def _run(self):
  49. cmd_list = [P2PD_PATH, f"-listen={str(self.control_maddr)}"]
  50. cmd_list += ["-hostAddrs=/ip4/127.0.0.1/tcp/0"]
  51. if self.enable_connmgr:
  52. cmd_list += ["-connManager=true", "-connLo=1", "-connHi=2", "-connGrace=0"]
  53. if self.enable_dht:
  54. cmd_list += ["-dht=true"]
  55. if self.enable_pubsub:
  56. cmd_list += ["-pubsub=true", "-pubsubRouter=gossipsub"]
  57. self.proc_daemon = subprocess.Popen(cmd_list, stdout=self.f_log, stderr=self.f_log, bufsize=0)
  58. async def wait_until_ready(self):
  59. lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:")
  60. lines_head_occurred = {line: False for line in lines_head_pattern}
  61. with open(self.log_filename, "rb") as f_log_read:
  62. async def read_from_daemon_and_check():
  63. line = f_log_read.readline()
  64. for head_pattern in lines_head_occurred:
  65. if line.startswith(head_pattern):
  66. lines_head_occurred[head_pattern] = True
  67. return all([value for _, value in lines_head_occurred.items()])
  68. await try_until_success(read_from_daemon_and_check)
  69. # sleep for a while in case that the daemon haven't been ready after emitting these lines
  70. await asyncio.sleep(0.1)
  71. def close(self):
  72. if self.is_closed:
  73. return
  74. self.proc_daemon.terminate()
  75. self.proc_daemon.wait()
  76. self.f_log.close()
  77. self.is_closed = True
  78. class DaemonTuple(NamedTuple):
  79. daemon: Daemon
  80. client: Client
  81. class ConnectionFailure(Exception):
  82. pass
  83. @asynccontextmanager
  84. async def make_p2pd_pair_unix(enable_control, enable_connmgr, enable_dht, enable_pubsub):
  85. name = str(uuid.uuid4())[:8]
  86. control_maddr = Multiaddr(f"/unix/tmp/test_p2pd_control_{name}.sock")
  87. listen_maddr = Multiaddr(f"/unix/tmp/test_p2pd_listen_{name}.sock")
  88. try:
  89. async with _make_p2pd_pair(
  90. control_maddr=control_maddr,
  91. listen_maddr=listen_maddr,
  92. enable_control=enable_control,
  93. enable_connmgr=enable_connmgr,
  94. enable_dht=enable_dht,
  95. enable_pubsub=enable_pubsub,
  96. ) as pair:
  97. yield pair
  98. finally:
  99. with suppress(FileNotFoundError):
  100. os.unlink(control_maddr.value_for_protocol(protocols.P_UNIX))
  101. with suppress(FileNotFoundError):
  102. os.unlink(listen_maddr.value_for_protocol(protocols.P_UNIX))
  103. @asynccontextmanager
  104. async def make_p2pd_pair_ip4(enable_control, enable_connmgr, enable_dht, enable_pubsub):
  105. control_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{get_free_port()}")
  106. listen_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{get_free_port()}")
  107. async with _make_p2pd_pair(
  108. control_maddr=control_maddr,
  109. listen_maddr=listen_maddr,
  110. enable_control=enable_control,
  111. enable_connmgr=enable_connmgr,
  112. enable_dht=enable_dht,
  113. enable_pubsub=enable_pubsub,
  114. ) as pair:
  115. yield pair
  116. @asynccontextmanager
  117. async def _make_p2pd_pair(
  118. control_maddr,
  119. listen_maddr,
  120. enable_control,
  121. enable_connmgr,
  122. enable_dht,
  123. enable_pubsub,
  124. ):
  125. p2pd = Daemon(
  126. control_maddr=control_maddr,
  127. enable_control=enable_control,
  128. enable_connmgr=enable_connmgr,
  129. enable_dht=enable_dht,
  130. enable_pubsub=enable_pubsub,
  131. )
  132. # wait for daemon ready
  133. await p2pd.wait_until_ready()
  134. client = await Client.create(control_maddr=control_maddr, listen_maddr=listen_maddr)
  135. try:
  136. async with client.listen():
  137. yield DaemonTuple(daemon=p2pd, client=client)
  138. finally:
  139. if not p2pd.is_closed:
  140. p2pd.close()
  141. async def _check_connection(p2pd_tuple_0, p2pd_tuple_1):
  142. peer_id_0, _ = await p2pd_tuple_0.identify()
  143. peer_id_1, _ = await p2pd_tuple_1.identify()
  144. peers_0 = [pinfo.peer_id for pinfo in await p2pd_tuple_0.list_peers()]
  145. peers_1 = [pinfo.peer_id for pinfo in await p2pd_tuple_1.list_peers()]
  146. return (peer_id_0 in peers_1) and (peer_id_1 in peers_0)
  147. async def connect_safe(p2pd_tuple_0, p2pd_tuple_1):
  148. peer_id_1, maddrs_1 = await p2pd_tuple_1.identify()
  149. await p2pd_tuple_0.connect(peer_id_1, maddrs_1)
  150. await try_until_success(functools.partial(_check_connection, p2pd_tuple_0=p2pd_tuple_0, p2pd_tuple_1=p2pd_tuple_1))