p2pclient.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. """
  2. Originally taken from: https://github.com/mhchia/py-libp2p-daemon-bindings
  3. Licence: MIT
  4. Author: Kevin Mai-Husan Chia
  5. """
  6. import asyncio
  7. from contextlib import asynccontextmanager
  8. from typing import AsyncIterator, Iterable, Sequence, Tuple
  9. from multiaddr import Multiaddr
  10. from hivemind.p2p.p2p_daemon_bindings.control import ControlClient, DaemonConnector, StreamHandler, TUnaryHandler
  11. from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
  12. class Client:
  13. control: ControlClient
  14. def __init__(self, *, _initialized_with_create=False) -> None:
  15. assert _initialized_with_create, "Please use Client.create coroutine to spawn new client instances"
  16. self.control = None
  17. @classmethod
  18. async def create(cls, control_maddr: Multiaddr = None, listen_maddr: Multiaddr = None) -> "Client":
  19. client = cls(_initialized_with_create=True)
  20. daemon_connector = DaemonConnector(control_maddr=control_maddr)
  21. client.control = await ControlClient.create(daemon_connector=daemon_connector, listen_maddr=listen_maddr)
  22. return client
  23. def close(self) -> None:
  24. self.control.close()
  25. def __del__(self):
  26. self.close()
  27. @asynccontextmanager
  28. async def listen(self) -> AsyncIterator["Client"]:
  29. """
  30. Starts to listen incoming connections for handlers registered via stream_handler.
  31. :return:
  32. """
  33. async with self.control.listen():
  34. yield self
  35. async def add_unary_handler(self, proto: str, handler: TUnaryHandler):
  36. await self.control.add_unary_handler(proto, handler)
  37. async def call_unary_handler(self, peer_id: PeerID, proto: str, data: bytes) -> bytes:
  38. return await self.control.call_unary_handler(peer_id, proto, data)
  39. async def identify(self) -> Tuple[PeerID, Tuple[Multiaddr, ...]]:
  40. """
  41. Get current node peer id and list of addresses
  42. """
  43. return await self.control.identify()
  44. async def connect(self, peer_id: PeerID, maddrs: Iterable[Multiaddr]) -> None:
  45. """
  46. Connect to p2p node with specified addresses and peer id.
  47. :peer_id: node peer id you want connect to
  48. :maddrs: node multiaddresses you want connect to. Of course, it must be reachable.
  49. """
  50. await self.control.connect(peer_id=peer_id, maddrs=maddrs)
  51. async def list_peers(self) -> Tuple[PeerInfo, ...]:
  52. """
  53. Get list of peers that node connect to
  54. """
  55. return await self.control.list_peers()
  56. async def disconnect(self, peer_id: PeerID) -> None:
  57. """
  58. Disconnect from node with specified peer id
  59. :peer_id: node peer id you want disconnect from
  60. """
  61. await self.control.disconnect(peer_id=peer_id)
  62. async def stream_open(
  63. self, peer_id: PeerID, protocols: Sequence[str]
  64. ) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
  65. """
  66. Open a stream to call other peer (with peer_id) handler for specified protocols
  67. :peer_id: other peer id
  68. :protocols: list of protocols for other peer handling
  69. :return: Returns tuple of stream info (info about connection to second peer) and reader/writer
  70. """
  71. return await self.control.stream_open(peer_id=peer_id, protocols=protocols)
  72. async def stream_handler(self, proto: str, handler_cb: StreamHandler) -> None:
  73. """
  74. Register a stream handler
  75. :param proto: protocols that handler serves
  76. :param handler_cb: handler callback
  77. :return:
  78. """
  79. await self.control.stream_handler(proto=proto, handler_cb=handler_cb)