__init__.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import asyncio
  2. import functools
  3. import os
  4. import subprocess
  5. import time
  6. import uuid
  7. from contextlib import asynccontextmanager
  8. from typing import NamedTuple
  9. from multiaddr import Multiaddr, protocols
  10. from hivemind import find_open_port
  11. from hivemind.p2p.p2p_daemon_bindings.p2pclient import Client
  12. TIMEOUT_DURATION = 30 # seconds
  13. async def try_until_success(coro_func, timeout=TIMEOUT_DURATION):
  14. """
  15. Keep running ``coro_func`` until the time is out.
  16. All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments.
  17. """
  18. t_start = time.monotonic()
  19. while True:
  20. result = await coro_func()
  21. if result:
  22. break
  23. if (time.monotonic() - t_start) >= timeout:
  24. # timeout
  25. assert False, f"{coro_func} still failed after `{timeout}` seconds"
  26. await asyncio.sleep(0.01)
  27. class Daemon:
  28. control_maddr = None
  29. proc_daemon = None
  30. log_filename = ""
  31. f_log = None
  32. closed = None
  33. def __init__(
  34. self, control_maddr, enable_control, enable_connmgr, enable_dht, enable_pubsub
  35. ):
  36. self.control_maddr = control_maddr
  37. self.enable_control = enable_control
  38. self.enable_connmgr = enable_connmgr
  39. self.enable_dht = enable_dht
  40. self.enable_pubsub = enable_pubsub
  41. self.is_closed = False
  42. self._start_logging()
  43. self._run()
  44. def _start_logging(self):
  45. name_control_maddr = str(self.control_maddr).replace("/", "_").replace(".", "_")
  46. self.log_filename = f"/tmp/log_p2pd{name_control_maddr}.txt"
  47. self.f_log = open(self.log_filename, "wb")
  48. def _run(self):
  49. cmd_list = ["hivemind/hivemind_cli/p2pd", f"-listen={str(self.control_maddr)}"]
  50. cmd_list += [f"-hostAddrs=/ip4/127.0.0.1/tcp/{find_open_port()}"]
  51. if self.enable_connmgr:
  52. cmd_list += ["-connManager=true", "-connLo=1", "-connHi=2", "-connGrace=0"]
  53. if self.enable_dht:
  54. cmd_list += ["-dht=true"]
  55. if self.enable_pubsub:
  56. cmd_list += ["-pubsub=true", "-pubsubRouter=gossipsub"]
  57. self.proc_daemon = subprocess.Popen(
  58. cmd_list, stdout=self.f_log, stderr=self.f_log, bufsize=0
  59. )
  60. async def wait_until_ready(self):
  61. lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:")
  62. lines_head_occurred = {line: False for line in lines_head_pattern}
  63. with open(self.log_filename, "rb") as f_log_read:
  64. async def read_from_daemon_and_check():
  65. line = f_log_read.readline()
  66. for head_pattern in lines_head_occurred:
  67. if line.startswith(head_pattern):
  68. lines_head_occurred[head_pattern] = True
  69. return all([value for _, value in lines_head_occurred.items()])
  70. await try_until_success(read_from_daemon_and_check)
  71. # sleep for a while in case that the daemon haven't been ready after emitting these lines
  72. await asyncio.sleep(0.1)
  73. def close(self):
  74. if self.is_closed:
  75. return
  76. self.proc_daemon.terminate()
  77. self.proc_daemon.wait()
  78. self.f_log.close()
  79. self.is_closed = True
  80. class DaemonTuple(NamedTuple):
  81. daemon: Daemon
  82. client: Client
  83. class ConnectionFailure(Exception):
  84. pass
  85. @asynccontextmanager
  86. async def make_p2pd_pair_unix(
  87. enable_control, enable_connmgr, enable_dht, enable_pubsub
  88. ):
  89. name = str(uuid.uuid4())[:8]
  90. control_maddr = Multiaddr(f"/unix/tmp/test_p2pd_control_{name}.sock")
  91. listen_maddr = Multiaddr(f"/unix/tmp/test_p2pd_listen_{name}.sock")
  92. # Remove the existing unix socket files if they are existing
  93. try:
  94. os.unlink(control_maddr.value_for_protocol(protocols.P_UNIX))
  95. except FileNotFoundError:
  96. pass
  97. try:
  98. os.unlink(listen_maddr.value_for_protocol(protocols.P_UNIX))
  99. except FileNotFoundError:
  100. pass
  101. async with _make_p2pd_pair(
  102. control_maddr=control_maddr,
  103. listen_maddr=listen_maddr,
  104. enable_control=enable_control,
  105. enable_connmgr=enable_connmgr,
  106. enable_dht=enable_dht,
  107. enable_pubsub=enable_pubsub,
  108. ) as pair:
  109. yield pair
  110. @asynccontextmanager
  111. async def make_p2pd_pair_ip4(enable_control, enable_connmgr, enable_dht, enable_pubsub):
  112. control_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{find_open_port()}")
  113. listen_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{find_open_port()}")
  114. async with _make_p2pd_pair(
  115. control_maddr=control_maddr,
  116. listen_maddr=listen_maddr,
  117. enable_control=enable_control,
  118. enable_connmgr=enable_connmgr,
  119. enable_dht=enable_dht,
  120. enable_pubsub=enable_pubsub,
  121. ) as pair:
  122. yield pair
  123. @asynccontextmanager
  124. async def _make_p2pd_pair(
  125. control_maddr,
  126. listen_maddr,
  127. enable_control,
  128. enable_connmgr,
  129. enable_dht,
  130. enable_pubsub,
  131. ):
  132. p2pd = Daemon(
  133. control_maddr=control_maddr,
  134. enable_control=enable_control,
  135. enable_connmgr=enable_connmgr,
  136. enable_dht=enable_dht,
  137. enable_pubsub=enable_pubsub,
  138. )
  139. # wait for daemon ready
  140. await p2pd.wait_until_ready()
  141. client = Client(control_maddr=control_maddr, listen_maddr=listen_maddr)
  142. try:
  143. async with client.listen():
  144. yield DaemonTuple(daemon=p2pd, client=client)
  145. finally:
  146. if not p2pd.is_closed:
  147. p2pd.close()
  148. async def _check_connection(p2pd_tuple_0, p2pd_tuple_1):
  149. peer_id_0, _ = await p2pd_tuple_0.identify()
  150. peer_id_1, _ = await p2pd_tuple_1.identify()
  151. peers_0 = [pinfo.peer_id for pinfo in await p2pd_tuple_0.list_peers()]
  152. peers_1 = [pinfo.peer_id for pinfo in await p2pd_tuple_1.list_peers()]
  153. return (peer_id_0 in peers_1) and (peer_id_1 in peers_0)
  154. async def connect_safe(p2pd_tuple_0, p2pd_tuple_1):
  155. peer_id_1, maddrs_1 = await p2pd_tuple_1.identify()
  156. await p2pd_tuple_0.connect(peer_id_1, maddrs_1)
  157. await try_until_success(
  158. functools.partial(
  159. _check_connection, p2pd_tuple_0=p2pd_tuple_0, p2pd_tuple_1=p2pd_tuple_1
  160. )
  161. )