test_p2p_daemon_bindings.py 25 KB


  1. import asyncio
  2. import functools
  3. import io
  4. import os
  5. import subprocess
  6. import time
  7. import uuid
  8. from contextlib import asynccontextmanager, AsyncExitStack
  9. from typing import NamedTuple
  10. from google.protobuf.message import EncodeError
  11. from multiaddr import Multiaddr, protocols
  12. import pytest
  13. from hivemind import find_open_port
  14. from hivemind.p2p.p2p_daemon_bindings.control import parse_conn_protocol, DaemonConnector, ControlClient
  15. from hivemind.p2p.p2p_daemon_bindings.p2pclient import Client
  16. from hivemind.p2p.p2p_daemon_bindings.utils import ControlFailure, raise_if_failed, write_unsigned_varint, \
  17. read_unsigned_varint, read_pbmsg_safe, write_pbmsg
  18. from hivemind.proto import p2pd_pb2 as p2pd_pb
  19. from hivemind.p2p.p2p_daemon_bindings.datastructures import ID, StreamInfo, PeerInfo
  20. def test_raise_if_failed_raises():
  21. resp = p2pd_pb.Response()
  22. resp.type = p2pd_pb.Response.ERROR
  23. with pytest.raises(ControlFailure):
  24. raise_if_failed(resp)
  25. def test_raise_if_failed_not_raises():
  26. resp = p2pd_pb.Response()
  27. resp.type = p2pd_pb.Response.OK
  28. raise_if_failed(resp)
  29. pairs_int_varint_valid = (
  30. (0, b"\x00"),
  31. (1, b"\x01"),
  32. (128, b"\x80\x01"),
  33. (2 ** 32, b"\x80\x80\x80\x80\x10"),
  34. (2 ** 64 - 1, b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01"),
  35. )
  36. pairs_int_varint_overflow = (
  37. (2 ** 64, b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x02"),
  38. (2 ** 64 + 1, b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x02"),
  39. (
  40. 2 ** 128,
  41. b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x04",
  42. ),
  43. )
  44. class MockReader(io.BytesIO):
  45. async def readexactly(self, n):
  46. await asyncio.sleep(0)
  47. return self.read(n)
  48. class MockWriter(io.BytesIO):
  49. pass
  50. class MockReaderWriter(MockReader, MockWriter):
  51. pass
  52. @pytest.mark.parametrize("integer, var_integer", pairs_int_varint_valid)
  53. @pytest.mark.asyncio
  54. async def test_write_unsigned_varint(integer, var_integer):
  55. s = MockWriter()
  56. await write_unsigned_varint(s, integer)
  57. assert s.getvalue() == var_integer
  58. @pytest.mark.parametrize("integer", tuple(i[0] for i in pairs_int_varint_overflow))
  59. @pytest.mark.asyncio
  60. async def test_write_unsigned_varint_overflow(integer):
  61. s = MockWriter()
  62. with pytest.raises(ValueError):
  63. await write_unsigned_varint(s, integer)
  64. @pytest.mark.parametrize("integer", (-1, -(2 ** 32), -(2 ** 64), -(2 ** 128)))
  65. @pytest.mark.asyncio
  66. async def test_write_unsigned_varint_negative(integer):
  67. s = MockWriter()
  68. with pytest.raises(ValueError):
  69. await write_unsigned_varint(s, integer)
  70. @pytest.mark.parametrize("integer, var_integer", pairs_int_varint_valid)
  71. @pytest.mark.asyncio
  72. async def test_read_unsigned_varint(integer, var_integer):
  73. s = MockReader(var_integer)
  74. result = await read_unsigned_varint(s)
  75. assert result == integer
  76. @pytest.mark.parametrize("var_integer", tuple(i[1] for i in pairs_int_varint_overflow))
  77. @pytest.mark.asyncio
  78. async def test_read_unsigned_varint_overflow(var_integer):
  79. s = MockReader(var_integer)
  80. with pytest.raises(ValueError):
  81. await read_unsigned_varint(s)
  82. @pytest.mark.parametrize("max_bits", (2, 31, 32, 63, 64, 127, 128))
  83. @pytest.mark.asyncio
  84. async def test_read_write_unsigned_varint_max_bits_edge(max_bits):
  85. """
  86. Test the edge with different `max_bits`
  87. """
  88. for i in range(-3, 0):
  89. integer = i + (2 ** max_bits)
  90. s = MockReaderWriter()
  91. await write_unsigned_varint(s, integer, max_bits=max_bits)
  92. s.seek(0, 0)
  93. result = await read_unsigned_varint(s, max_bits=max_bits)
  94. assert integer == result
  95. @pytest.fixture(scope="module")
  96. def peer_id_string():
  97. return "QmS5QmciTXXnCUCyxud5eWFenUMAmvAWSDa1c7dvdXRMZ7"
  98. @pytest.fixture(scope="module")
  99. def peer_id_bytes():
  100. return b'\x12 7\x87F.[\xb5\xb1o\xe5*\xc7\xb9\xbb\x11:"Z|j2\x8ad\x1b\xa6\xe5<Ip\xfe\xb4\xf5v'
  101. @pytest.fixture(scope="module")
  102. def peer_id(peer_id_bytes):
  103. return ID(peer_id_bytes)
  104. @pytest.fixture(scope="module")
  105. def maddr():
  106. return Multiaddr("/unix/123")
  107. def test_peer_id(peer_id_string, peer_id_bytes, peer_id):
  108. # test initialized with bytes
  109. assert peer_id.to_bytes() == peer_id_bytes
  110. assert peer_id.to_string() == peer_id_string
  111. # test initialized with string
  112. peer_id_2 = ID.from_base58(peer_id_string)
  113. assert peer_id_2.to_bytes() == peer_id_bytes
  114. assert peer_id_2.to_string() == peer_id_string
  115. # test equal
  116. assert peer_id == peer_id_2
  117. # test not equal
  118. peer_id_3 = ID.from_base58("QmbmfNDEth7Ucvjuxiw3SP3E4PoJzbk7g4Ge6ZDigbCsNp")
  119. assert peer_id != peer_id_3
  120. def test_stream_info(peer_id, maddr):
  121. proto = "123"
  122. # test case: `StreamInfo.__init__`
  123. si = StreamInfo(peer_id, maddr, proto)
  124. assert si.peer_id == peer_id
  125. assert si.addr == maddr
  126. assert si.proto == proto
  127. # test case: `StreamInfo.to_pb`
  128. pb_si = si.to_pb()
  129. assert pb_si.peer == peer_id.to_bytes()
  130. assert pb_si.addr == maddr.to_bytes()
  131. assert pb_si.proto == si.proto
  132. # test case: `StreamInfo.from_pb`
  133. si_1 = StreamInfo.from_pb(pb_si)
  134. assert si_1.peer_id == peer_id
  135. assert si_1.addr == maddr
  136. assert si_1.proto == proto
  137. def test_peer_info(peer_id, maddr):
  138. pi = PeerInfo(peer_id, [maddr])
  139. # test case: `PeerInfo.__init__`
  140. assert pi.peer_id == peer_id
  141. assert pi.addrs == [maddr]
  142. # test case: `PeerInfo.from_pb`
  143. pi_pb = p2pd_pb.PeerInfo(id=peer_id.to_bytes(), addrs=[maddr.to_bytes()])
  144. pi_1 = PeerInfo.from_pb(pi_pb)
  145. assert pi.peer_id == pi_1.peer_id
  146. assert pi.addrs == pi_1.addrs
  147. @pytest.mark.parametrize(
  148. "maddr_str, expected_proto",
  149. (("/unix/123", protocols.P_UNIX), ("/ip4/127.0.0.1/tcp/7777", protocols.P_IP4)),
  150. )
  151. def test_parse_conn_protocol_valid(maddr_str, expected_proto):
  152. assert parse_conn_protocol(Multiaddr(maddr_str)) == expected_proto
  153. @pytest.mark.parametrize(
  154. "maddr_str",
  155. (
  156. "/p2p/QmbHVEEepCi7rn7VL7Exxpd2Ci9NNB6ifvqwhsrbRMgQFP",
  157. "/onion/timaq4ygg2iegci7:1234",
  158. ),
  159. )
  160. def test_parse_conn_protocol_invalid(maddr_str):
  161. maddr = Multiaddr(maddr_str)
  162. with pytest.raises(ValueError):
  163. parse_conn_protocol(maddr)
  164. @pytest.mark.parametrize("control_maddr_str", ("/unix/123", "/ip4/127.0.0.1/tcp/6666"))
  165. def test_client_ctor_control_maddr(control_maddr_str):
  166. c = DaemonConnector(Multiaddr(control_maddr_str))
  167. assert c.control_maddr == Multiaddr(control_maddr_str)
  168. def test_client_ctor_default_control_maddr():
  169. c = DaemonConnector()
  170. assert c.control_maddr == Multiaddr(DaemonConnector.DEFAULT_CONTROL_MADDR)
  171. @pytest.mark.parametrize("listen_maddr_str", ("/unix/123", "/ip4/127.0.0.1/tcp/6666"))
  172. def test_control_client_ctor_listen_maddr(listen_maddr_str):
  173. c = ControlClient(
  174. daemon_connector=DaemonConnector(), listen_maddr=Multiaddr(listen_maddr_str)
  175. )
  176. assert c.listen_maddr == Multiaddr(listen_maddr_str)
  177. def test_control_client_ctor_default_listen_maddr():
  178. c = ControlClient(daemon_connector=DaemonConnector())
  179. assert c.listen_maddr == Multiaddr(ControlClient.DEFAULT_LISTEN_MADDR)
  180. @pytest.mark.parametrize(
  181. "msg_bytes",
  182. (
  183. b'\x08\x00"R\n"\x12 F\xec\xd3p0X\xbeT\x95p^\xc8{\xc8\x13\xa3\x9c\x84d\x0b\x1b\xbb\xa0P\x98w\xc1\xb3\x981i\x16\x12\x02\xa2\x02\x12\x08\x04\x7f\x00\x00\x01\x06\xc7\xb6\x12\x08\x04\xc0\xa8\n\x87\x06\xc7\xb6\x12\x14)\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x06\xc7\xb7', # noqa: E501
  184. b'\x08\x00"R\n"\x12 \xd0\xf0 \x9a\xc6v\xa6\xd3;\xcac|\x95\x94\xa0\xe6:\nM\xc53T\x0e\xf0\x89\x8e(\x0c\xb9\xf7\\\xa5\x12\x02\xa2\x02\x12\x08\x04\x7f\x00\x00\x01\x06\xc9%\x12\x08\x04\xc0\xa8\n\x87\x06\xc9%\x12\x14)\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x06\xc9&', # noqa: E501
  185. b'\x08\x00"R\n"\x12 \xc3\xc3\xee\x18i\x8a\xde\x13\xa9y\x905\xeb\xcb\xa4\xd07\x14\xbe\xf4\xf8\x1b\xe8[g94\x94\xe3f\x18\xa9\x12\x02\xa2\x02\x12\x08\x04\x7f\x00\x00\x01\x06\xc9`\x12\x08\x04\xc0\xa8\n\x87\x06\xc9`\x12\x14)\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x06\xc9a', # noqa: E501
  186. ),
  187. # give test cases ids to prevent bytes from ruining the terminal
  188. ids=("pb example Response 0", "pb example Response 1", "pb example Response 2"),
  189. )
  190. @pytest.mark.asyncio
  191. async def test_read_pbmsg_safe_valid(msg_bytes):
  192. s = MockReaderWriter()
  193. await write_unsigned_varint(s, len(msg_bytes))
  194. s.write(msg_bytes)
  195. # reset the offset back to the beginning
  196. s.seek(0, 0)
  197. pb_msg = p2pd_pb.Response()
  198. await read_pbmsg_safe(s, pb_msg)
  199. assert pb_msg.SerializeToString() == msg_bytes
  200. @pytest.mark.parametrize(
  201. "pb_msg, msg_bytes",
  202. (
  203. (
  204. p2pd_pb.Response(),
  205. b'Z\x08\x00*V\x08\x01\x12R\n"\x12 \x03\x8d\xf5\xd4(/#\xd6\xed\xa5\x1bU\xb8s\x8c\xfa\xad\xfc{\x04\xe3\xecw\xdeK\xc9,\xfe\x9c\x00:\xc8\x12\x02\xa2\x02\x12\x08\x04\x7f\x00\x00\x01\x06\xdea\x12\x08\x04\xc0\xa8\n\x87\x06\xdea\x12\x14)\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x06\xdeb', # noqa: E501
  206. ),
  207. (p2pd_pb.Request(), b"\x02\x08\x05"),
  208. (
  209. p2pd_pb.DHTRequest(),
  210. b'&\x08\x00\x12"\x12 \xd5\x0b\x18/\x9e\xa5G\x06.\xdd\xebW\xf0N\xf5\x0eW\xd3\xec\xdf\x06\x02\xe2\x89\x1e\xf0\xbb.\xc0\xbdE\xb8', # noqa: E501
  211. ),
  212. (
  213. p2pd_pb.DHTResponse(),
  214. b'V\x08\x01\x12R\n"\x12 wy\xe2\xfa\x11\x9e\xe2\x84X]\x84\xf8\x98\xba\x8c\x8cQ\xd7,\xb59\x1e!G\x92\x86G{\x141\xe9\x1b\x12\x02\xa2\x02\x12\x08\x04\x7f\x00\x00\x01\x06\xdeA\x12\x08\x04\xc0\xa8\n\x87\x06\xdeA\x12\x14)\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x06\xdeB', # noqa: E501
  215. ),
  216. (
  217. p2pd_pb.StreamInfo(),
  218. b';\n"\x12 \xf6\x9e=\x9f\xc1J\xfe\x02\x93k!S\x80\xa0\xcc(s\xea&\xbe\xed\x9274qTI\xc1\xf7\xb6\xbd7\x12\x08\x04\x7f\x00\x00\x01\x06\xde\xc5\x1a\x0bprotocol123', # noqa: E501
  219. ),
  220. ),
  221. ids=(
  222. "pb example Response",
  223. "pb example Request",
  224. "pb example DHTRequest",
  225. "pb example DHTResponse",
  226. "pb example StreamInfo",
  227. ),
  228. )
  229. @pytest.mark.asyncio
  230. async def test_write_pbmsg(pb_msg, msg_bytes):
  231. s_read = MockReaderWriter(msg_bytes)
  232. await read_pbmsg_safe(s_read, pb_msg)
  233. s_write = MockReaderWriter()
  234. await write_pbmsg(s_write, pb_msg)
  235. assert msg_bytes == s_write.getvalue()
  236. @pytest.mark.parametrize(
  237. "pb_msg",
  238. (
  239. p2pd_pb.Response(),
  240. p2pd_pb.Request(),
  241. p2pd_pb.DHTRequest(),
  242. p2pd_pb.DHTResponse(),
  243. p2pd_pb.StreamInfo(),
  244. ),
  245. )
  246. @pytest.mark.asyncio
  247. async def test_write_pbmsg_missing_fields(pb_msg):
  248. with pytest.raises(EncodeError):
  249. await write_pbmsg(MockReaderWriter(), pb_msg)
  250. TIMEOUT_DURATION = 30 # seconds
  251. @pytest.fixture
  252. def num_p2pds():
  253. return 4
  254. @pytest.fixture(scope="module")
  255. def peer_id_random():
  256. return ID.from_base58("QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNK1")
  257. @pytest.fixture
  258. def enable_control():
  259. return True
  260. @pytest.fixture
  261. def enable_connmgr():
  262. return False
  263. @pytest.fixture
  264. def enable_dht():
  265. return False
  266. @pytest.fixture
  267. def enable_pubsub():
  268. return False
  269. @pytest.fixture
  270. def func_make_p2pd_pair():
  271. return make_p2pd_pair_ip4
  272. async def try_until_success(coro_func, timeout=TIMEOUT_DURATION):
  273. """
  274. Keep running ``coro_func`` until the time is out.
  275. All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments.
  276. """
  277. t_start = time.monotonic()
  278. while True:
  279. result = await coro_func()
  280. if result:
  281. break
  282. if (time.monotonic() - t_start) >= timeout:
  283. # timeout
  284. assert False, f"{coro_func} still failed after `{timeout}` seconds"
  285. await asyncio.sleep(0.01)
  286. class Daemon:
  287. control_maddr = None
  288. proc_daemon = None
  289. log_filename = ""
  290. f_log = None
  291. closed = None
  292. def __init__(
  293. self, control_maddr, enable_control, enable_connmgr, enable_dht, enable_pubsub
  294. ):
  295. self.control_maddr = control_maddr
  296. self.enable_control = enable_control
  297. self.enable_connmgr = enable_connmgr
  298. self.enable_dht = enable_dht
  299. self.enable_pubsub = enable_pubsub
  300. self.is_closed = False
  301. self._start_logging()
  302. self._run()
  303. def _start_logging(self):
  304. name_control_maddr = str(self.control_maddr).replace("/", "_").replace(".", "_")
  305. self.log_filename = f"/tmp/log_p2pd{name_control_maddr}.txt"
  306. self.f_log = open(self.log_filename, "wb")
  307. def _run(self):
  308. cmd_list = ["hivemind/hivemind_cli/p2pd", f"-listen={str(self.control_maddr)}"]
  309. cmd_list += [f"-hostAddrs=/ip4/127.0.0.1/tcp/{find_open_port()}"]
  310. if self.enable_connmgr:
  311. cmd_list += ["-connManager=true", "-connLo=1", "-connHi=2", "-connGrace=0"]
  312. if self.enable_dht:
  313. cmd_list += ["-dht=true"]
  314. if self.enable_pubsub:
  315. cmd_list += ["-pubsub=true", "-pubsubRouter=gossipsub"]
  316. self.proc_daemon = subprocess.Popen(
  317. cmd_list, stdout=self.f_log, stderr=self.f_log, bufsize=0
  318. )
  319. async def wait_until_ready(self):
  320. lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:")
  321. lines_head_occurred = {line: False for line in lines_head_pattern}
  322. with open(self.log_filename, "rb") as f_log_read:
  323. async def read_from_daemon_and_check():
  324. line = f_log_read.readline()
  325. for head_pattern in lines_head_occurred:
  326. if line.startswith(head_pattern):
  327. lines_head_occurred[head_pattern] = True
  328. return all([value for _, value in lines_head_occurred.items()])
  329. await try_until_success(read_from_daemon_and_check)
  330. # sleep for a while in case that the daemon haven't been ready after emitting these lines
  331. await asyncio.sleep(0.1)
  332. def close(self):
  333. if self.is_closed:
  334. return
  335. self.proc_daemon.terminate()
  336. self.proc_daemon.wait()
  337. self.f_log.close()
  338. self.is_closed = True
  339. class DaemonTuple(NamedTuple):
  340. daemon: Daemon
  341. client: Client
  342. class ConnectionFailure(Exception):
  343. pass
  344. @asynccontextmanager
  345. async def make_p2pd_pair_unix(
  346. enable_control, enable_connmgr, enable_dht, enable_pubsub
  347. ):
  348. name = str(uuid.uuid4())[:8]
  349. control_maddr = Multiaddr(f"/unix/tmp/test_p2pd_control_{name}.sock")
  350. listen_maddr = Multiaddr(f"/unix/tmp/test_p2pd_listen_{name}.sock")
  351. # Remove the existing unix socket files if they are existing
  352. try:
  353. os.unlink(control_maddr.value_for_protocol(protocols.P_UNIX))
  354. except FileNotFoundError:
  355. pass
  356. try:
  357. os.unlink(listen_maddr.value_for_protocol(protocols.P_UNIX))
  358. except FileNotFoundError:
  359. pass
  360. async with _make_p2pd_pair(
  361. control_maddr=control_maddr,
  362. listen_maddr=listen_maddr,
  363. enable_control=enable_control,
  364. enable_connmgr=enable_connmgr,
  365. enable_dht=enable_dht,
  366. enable_pubsub=enable_pubsub,
  367. ) as pair:
  368. yield pair
  369. @asynccontextmanager
  370. async def make_p2pd_pair_ip4(enable_control, enable_connmgr, enable_dht, enable_pubsub):
  371. control_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{find_open_port()}")
  372. listen_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{find_open_port()}")
  373. async with _make_p2pd_pair(
  374. control_maddr=control_maddr,
  375. listen_maddr=listen_maddr,
  376. enable_control=enable_control,
  377. enable_connmgr=enable_connmgr,
  378. enable_dht=enable_dht,
  379. enable_pubsub=enable_pubsub,
  380. ) as pair:
  381. yield pair
  382. @asynccontextmanager
  383. async def _make_p2pd_pair(
  384. control_maddr,
  385. listen_maddr,
  386. enable_control,
  387. enable_connmgr,
  388. enable_dht,
  389. enable_pubsub,
  390. ):
  391. p2pd = Daemon(
  392. control_maddr=control_maddr,
  393. enable_control=enable_control,
  394. enable_connmgr=enable_connmgr,
  395. enable_dht=enable_dht,
  396. enable_pubsub=enable_pubsub,
  397. )
  398. # wait for daemon ready
  399. await p2pd.wait_until_ready()
  400. client = Client(control_maddr=control_maddr, listen_maddr=listen_maddr)
  401. try:
  402. async with client.listen():
  403. yield DaemonTuple(daemon=p2pd, client=client)
  404. finally:
  405. if not p2pd.is_closed:
  406. p2pd.close()
  407. @pytest.fixture
  408. async def p2pcs(
  409. num_p2pds,
  410. enable_control,
  411. enable_connmgr,
  412. enable_dht,
  413. enable_pubsub,
  414. func_make_p2pd_pair,
  415. ):
  416. # TODO: Change back to gather style
  417. async with AsyncExitStack() as stack:
  418. p2pd_tuples = [
  419. await stack.enter_async_context(
  420. func_make_p2pd_pair(
  421. enable_control=enable_control,
  422. enable_connmgr=enable_connmgr,
  423. enable_dht=enable_dht,
  424. enable_pubsub=enable_pubsub,
  425. )
  426. )
  427. for _ in range(num_p2pds)
  428. ]
  429. yield tuple(p2pd_tuple.client for p2pd_tuple in p2pd_tuples)
  430. @pytest.mark.parametrize(
  431. "enable_control, func_make_p2pd_pair", ((True, make_p2pd_pair_unix),)
  432. )
  433. @pytest.mark.asyncio
  434. async def test_client_identify_unix_socket(p2pcs):
  435. await p2pcs[0].identify()
  436. @pytest.mark.parametrize("enable_control", (True,))
  437. @pytest.mark.asyncio
  438. async def test_client_identify(p2pcs):
  439. await p2pcs[0].identify()
  440. @pytest.mark.parametrize("enable_control", (True,))
  441. @pytest.mark.asyncio
  442. async def test_client_connect_success(p2pcs):
  443. peer_id_0, maddrs_0 = await p2pcs[0].identify()
  444. peer_id_1, maddrs_1 = await p2pcs[1].identify()
  445. await p2pcs[0].connect(peer_id_1, maddrs_1)
  446. # test case: repeated connections
  447. await p2pcs[1].connect(peer_id_0, maddrs_0)
  448. @pytest.mark.parametrize("enable_control", (True,))
  449. @pytest.mark.asyncio
  450. async def test_client_connect_failure(peer_id_random, p2pcs):
  451. peer_id_1, maddrs_1 = await p2pcs[1].identify()
  452. await p2pcs[0].identify()
  453. # test case: `peer_id` mismatches
  454. with pytest.raises(ControlFailure):
  455. await p2pcs[0].connect(peer_id_random, maddrs_1)
  456. # test case: empty maddrs
  457. with pytest.raises(ControlFailure):
  458. await p2pcs[0].connect(peer_id_1, [])
  459. # test case: wrong maddrs
  460. with pytest.raises(ControlFailure):
  461. await p2pcs[0].connect(peer_id_1, [Multiaddr("/ip4/127.0.0.1/udp/0")])
  462. async def _check_connection(p2pd_tuple_0, p2pd_tuple_1):
  463. peer_id_0, _ = await p2pd_tuple_0.identify()
  464. peer_id_1, _ = await p2pd_tuple_1.identify()
  465. peers_0 = [pinfo.peer_id for pinfo in await p2pd_tuple_0.list_peers()]
  466. peers_1 = [pinfo.peer_id for pinfo in await p2pd_tuple_1.list_peers()]
  467. return (peer_id_0 in peers_1) and (peer_id_1 in peers_0)
  468. async def connect_safe(p2pd_tuple_0, p2pd_tuple_1):
  469. peer_id_1, maddrs_1 = await p2pd_tuple_1.identify()
  470. await p2pd_tuple_0.connect(peer_id_1, maddrs_1)
  471. await try_until_success(
  472. functools.partial(
  473. _check_connection, p2pd_tuple_0=p2pd_tuple_0, p2pd_tuple_1=p2pd_tuple_1
  474. )
  475. )
  476. @pytest.mark.parametrize("enable_control", (True,))
  477. @pytest.mark.asyncio
  478. async def test_connect_safe(p2pcs):
  479. await connect_safe(p2pcs[0], p2pcs[1])
  480. @pytest.mark.parametrize("enable_control", (True,))
  481. @pytest.mark.asyncio
  482. async def test_client_list_peers(p2pcs):
  483. # test case: no peers
  484. assert len(await p2pcs[0].list_peers()) == 0
  485. # test case: 1 peer
  486. await connect_safe(p2pcs[0], p2pcs[1])
  487. assert len(await p2pcs[0].list_peers()) == 1
  488. assert len(await p2pcs[1].list_peers()) == 1
  489. # test case: one more peer
  490. await connect_safe(p2pcs[0], p2pcs[2])
  491. assert len(await p2pcs[0].list_peers()) == 2
  492. assert len(await p2pcs[1].list_peers()) == 1
  493. assert len(await p2pcs[2].list_peers()) == 1
  494. @pytest.mark.parametrize("enable_control", (True,))
  495. @pytest.mark.asyncio
  496. async def test_client_disconnect(peer_id_random, p2pcs):
  497. # test case: disconnect a peer without connections
  498. await p2pcs[1].disconnect(peer_id_random)
  499. # test case: disconnect
  500. peer_id_0, _ = await p2pcs[0].identify()
  501. await connect_safe(p2pcs[0], p2pcs[1])
  502. assert len(await p2pcs[0].list_peers()) == 1
  503. assert len(await p2pcs[1].list_peers()) == 1
  504. await p2pcs[1].disconnect(peer_id_0)
  505. assert len(await p2pcs[0].list_peers()) == 0
  506. assert len(await p2pcs[1].list_peers()) == 0
  507. # test case: disconnect twice
  508. await p2pcs[1].disconnect(peer_id_0)
  509. assert len(await p2pcs[0].list_peers()) == 0
  510. assert len(await p2pcs[1].list_peers()) == 0
  511. @pytest.mark.parametrize("enable_control", (True,))
  512. @pytest.mark.asyncio
  513. async def test_client_stream_open_success(p2pcs):
  514. peer_id_1, maddrs_1 = await p2pcs[1].identify()
  515. await connect_safe(p2pcs[0], p2pcs[1])
  516. proto = "123"
  517. async def handle_proto(stream_info, reader, writer):
  518. await reader.readexactly(1)
  519. await p2pcs[1].stream_handler(proto, handle_proto)
  520. # test case: normal
  521. stream_info, reader, writer = await p2pcs[0].stream_open(peer_id_1, (proto,))
  522. assert stream_info.peer_id == peer_id_1
  523. assert stream_info.addr in maddrs_1
  524. assert stream_info.proto == "123"
  525. writer.close()
  526. # test case: open with multiple protocols
  527. stream_info, reader, writer = await p2pcs[0].stream_open(
  528. peer_id_1, (proto, "another_protocol")
  529. )
  530. assert stream_info.peer_id == peer_id_1
  531. assert stream_info.addr in maddrs_1
  532. assert stream_info.proto == "123"
  533. writer.close()
  534. @pytest.mark.parametrize("enable_control", (True,))
  535. @pytest.mark.asyncio
  536. async def test_client_stream_open_failure(p2pcs):
  537. peer_id_1, _ = await p2pcs[1].identify()
  538. await connect_safe(p2pcs[0], p2pcs[1])
  539. proto = "123"
  540. # test case: `stream_open` to a peer who didn't register the protocol
  541. with pytest.raises(ControlFailure):
  542. await p2pcs[0].stream_open(peer_id_1, (proto,))
  543. # test case: `stream_open` to a peer for a non-registered protocol
  544. async def handle_proto(stream_info, reader, writer):
  545. pass
  546. await p2pcs[1].stream_handler(proto, handle_proto)
  547. with pytest.raises(ControlFailure):
  548. await p2pcs[0].stream_open(peer_id_1, ("another_protocol",))
  549. @pytest.mark.parametrize("enable_control", (True,))
  550. @pytest.mark.asyncio
  551. async def test_client_stream_handler_success(p2pcs):
  552. peer_id_1, _ = await p2pcs[1].identify()
  553. await connect_safe(p2pcs[0], p2pcs[1])
  554. proto = "protocol123"
  555. bytes_to_send = b"yoyoyoyoyog"
  556. # event for this test function to wait until the handler function receiving the incoming data
  557. event_handler_finished = asyncio.Event()
  558. async def handle_proto(stream_info, reader, writer):
  559. nonlocal event_handler_finished
  560. bytes_received = await reader.readexactly(len(bytes_to_send))
  561. assert bytes_received == bytes_to_send
  562. event_handler_finished.set()
  563. await p2pcs[1].stream_handler(proto, handle_proto)
  564. assert proto in p2pcs[1].control.handlers
  565. assert handle_proto == p2pcs[1].control.handlers[proto]
  566. # test case: test the stream handler `handle_proto`
  567. _, reader, writer = await p2pcs[0].stream_open(peer_id_1, (proto,))
  568. # wait until the handler function starts blocking waiting for the data
  569. # because we haven't sent the data, we know the handler function must still blocking waiting.
  570. # get the task of the protocol handler
  571. writer.write(bytes_to_send)
  572. # wait for the handler to finish
  573. writer.close()
  574. await event_handler_finished.wait()
  575. # test case: two streams to different handlers respectively
  576. another_proto = "another_protocol123"
  577. another_bytes_to_send = b"456"
  578. event_another_proto = asyncio.Event()
  579. async def handle_another_proto(stream_info, reader, writer):
  580. event_another_proto.set()
  581. bytes_received = await reader.readexactly(len(another_bytes_to_send))
  582. assert bytes_received == another_bytes_to_send
  583. await p2pcs[1].stream_handler(another_proto, handle_another_proto)
  584. assert another_proto in p2pcs[1].control.handlers
  585. assert handle_another_proto == p2pcs[1].control.handlers[another_proto]
  586. _, reader, writer = await p2pcs[0].stream_open(peer_id_1, (another_proto,))
  587. await event_another_proto.wait()
  588. # we know at this moment the handler must still blocking wait
  589. writer.write(another_bytes_to_send)
  590. writer.close()
  591. # test case: registering twice can override the previous registration
  592. event_third = asyncio.Event()
  593. async def handler_third(stream_info, reader, writer):
  594. event_third.set()
  595. await p2pcs[1].stream_handler(another_proto, handler_third)
  596. assert another_proto in p2pcs[1].control.handlers
  597. # ensure the handler is override
  598. assert handler_third == p2pcs[1].control.handlers[another_proto]
  599. await p2pcs[0].stream_open(peer_id_1, (another_proto,))
  600. # ensure the overriding handler is called when the protocol is opened a stream
  601. await event_third.wait()
  602. @pytest.mark.parametrize("enable_control", (True,))
  603. @pytest.mark.asyncio
  604. async def test_client_stream_handler_failure(p2pcs):
  605. peer_id_1, _ = await p2pcs[1].identify()
  606. await connect_safe(p2pcs[0], p2pcs[1])
  607. proto = "123"
  608. # test case: registered a wrong protocol name
  609. async def handle_proto_correct_params(stream_info, stream):
  610. pass
  611. await p2pcs[1].stream_handler("another_protocol", handle_proto_correct_params)
  612. with pytest.raises(ControlFailure):
  613. await p2pcs[0].stream_open(peer_id_1, (proto,))