p2pclient.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. """
  2. Originally taken from: https://github.com/mhchia/py-libp2p-daemon-bindings
  3. Licence: MIT
  4. Author: Kevin Mai-Husan Chia
  5. """
  6. import asyncio
  7. from contextlib import asynccontextmanager
  8. from typing import AsyncIterator, Iterable, Sequence, Tuple
  9. from multiaddr import Multiaddr
  10. from hivemind.p2p.p2p_daemon_bindings.control import (
  11. DEFAULT_MAX_MSG_SIZE,
  12. ControlClient,
  13. DaemonConnector,
  14. StreamHandler,
  15. TUnaryHandler,
  16. )
  17. from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
  18. class Client:
  19. control: ControlClient
  20. def __init__(self, *, _initialized_with_create=False) -> None:
  21. assert _initialized_with_create, "Please use Client.create coroutine to spawn new client instances"
  22. self.control = None
  23. @classmethod
  24. async def create(
  25. cls,
  26. control_maddr: Multiaddr = None,
  27. listen_maddr: Multiaddr = None,
  28. *,
  29. persistent_conn_max_msg_size: int = DEFAULT_MAX_MSG_SIZE,
  30. ) -> "Client":
  31. client = cls(_initialized_with_create=True)
  32. daemon_connector = DaemonConnector(control_maddr=control_maddr)
  33. client.control = await ControlClient.create(
  34. daemon_connector=daemon_connector,
  35. listen_maddr=listen_maddr,
  36. persistent_conn_max_msg_size=persistent_conn_max_msg_size,
  37. )
  38. return client
  39. def close(self) -> None:
  40. self.control.close()
  41. def __del__(self):
  42. self.close()
  43. @asynccontextmanager
  44. async def listen(self) -> AsyncIterator["Client"]:
  45. """
  46. Starts to listen incoming connections for handlers registered via stream_handler.
  47. :return:
  48. """
  49. async with self.control.listen():
  50. yield self
  51. async def add_unary_handler(self, proto: str, handler: TUnaryHandler, balanced: bool = False):
  52. await self.control.add_unary_handler(proto, handler, balanced=balanced)
  53. async def call_unary_handler(self, peer_id: PeerID, proto: str, data: bytes) -> bytes:
  54. return await self.control.call_unary_handler(peer_id, proto, data)
  55. async def identify(self) -> Tuple[PeerID, Tuple[Multiaddr, ...]]:
  56. """
  57. Get current node peer id and list of addresses
  58. """
  59. return await self.control.identify()
  60. async def connect(self, peer_id: PeerID, maddrs: Iterable[Multiaddr]) -> None:
  61. """
  62. Connect to p2p node with specified addresses and peer id.
  63. :peer_id: node peer id you want connect to
  64. :maddrs: node multiaddresses you want connect to. Of course, it must be reachable.
  65. """
  66. await self.control.connect(peer_id=peer_id, maddrs=maddrs)
  67. async def list_peers(self) -> Tuple[PeerInfo, ...]:
  68. """
  69. Get list of peers that node connect to
  70. """
  71. return await self.control.list_peers()
  72. async def disconnect(self, peer_id: PeerID) -> None:
  73. """
  74. Disconnect from node with specified peer id
  75. :peer_id: node peer id you want disconnect from
  76. """
  77. await self.control.disconnect(peer_id=peer_id)
  78. async def stream_open(
  79. self, peer_id: PeerID, protocols: Sequence[str]
  80. ) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
  81. """
  82. Open a stream to call other peer (with peer_id) handler for specified protocols
  83. :peer_id: other peer id
  84. :protocols: list of protocols for other peer handling
  85. :return: Returns tuple of stream info (info about connection to second peer) and reader/writer
  86. """
  87. return await self.control.stream_open(peer_id=peer_id, protocols=protocols)
  88. async def stream_handler(self, proto: str, handler_cb: StreamHandler, balanced: bool = False) -> None:
  89. """
  90. Register a stream handler
  91. :param proto: protocols that handler serves
  92. :param handler_cb: handler callback
  93. :param balanced: flag if stream handler should be balanced on p2pd side. Default: False.
  94. :return:
  95. """
  96. await self.control.stream_handler(proto=proto, handler_cb=handler_cb, balanced=balanced)