p2pclient.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
"""
Originally taken from: https://github.com/mhchia/py-libp2p-daemon-bindings
Licence: MIT
Author: Kevin Mai-Husan Chia
"""
  6. import asyncio
  7. from contextlib import asynccontextmanager
  8. from typing import AsyncIterator, Iterable, Sequence, Tuple
  9. from multiaddr import Multiaddr
  10. from hivemind.p2p.p2p_daemon_bindings.control import ControlClient, DaemonConnector, StreamHandler, TUnaryHandler
  11. from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
  12. class Client:
  13. control: ControlClient
  14. def __init__(self) -> None:
  15. self.control = None
  16. @classmethod
  17. async def create(cls, control_maddr: Multiaddr = None, listen_maddr: Multiaddr = None) -> "Client":
  18. client = cls()
  19. daemon_connector = DaemonConnector(control_maddr=control_maddr)
  20. client.control = await ControlClient.create(daemon_connector=daemon_connector, listen_maddr=listen_maddr)
  21. return client
  22. def close(self) -> None:
  23. self.control.close()
  24. def __del__(self):
  25. self.close()
  26. @asynccontextmanager
  27. async def listen(self) -> AsyncIterator["Client"]:
  28. """
  29. Starts to listen incoming connections for handlers registered via stream_handler.
  30. :return:
  31. """
  32. async with self.control.listen():
  33. yield self
  34. async def add_unary_handler(self, proto: str, handler: TUnaryHandler):
  35. await self.control.add_unary_handler(proto, handler)
  36. async def call_unary_handler(self, peer_id: PeerID, proto: str, data: bytes) -> bytes:
  37. return await self.control.call_unary_handler(peer_id, proto, data)
  38. async def identify(self) -> Tuple[PeerID, Tuple[Multiaddr, ...]]:
  39. """
  40. Get current node peer id and list of addresses
  41. """
  42. return await self.control.identify()
  43. async def connect(self, peer_id: PeerID, maddrs: Iterable[Multiaddr]) -> None:
  44. """
  45. Connect to p2p node with specified addresses and peer id.
  46. :peer_id: node peer id you want connect to
  47. :maddrs: node multiaddresses you want connect to. Of course, it must be reachable.
  48. """
  49. await self.control.connect(peer_id=peer_id, maddrs=maddrs)
  50. async def list_peers(self) -> Tuple[PeerInfo, ...]:
  51. """
  52. Get list of peers that node connect to
  53. """
  54. return await self.control.list_peers()
  55. async def disconnect(self, peer_id: PeerID) -> None:
  56. """
  57. Disconnect from node with specified peer id
  58. :peer_id: node peer id you want disconnect from
  59. """
  60. await self.control.disconnect(peer_id=peer_id)
  61. async def stream_open(
  62. self, peer_id: PeerID, protocols: Sequence[str]
  63. ) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
  64. """
  65. Open a stream to call other peer (with peer_id) handler for specified protocols
  66. :peer_id: other peer id
  67. :protocols: list of protocols for other peer handling
  68. :return: Returns tuple of stream info (info about connection to second peer) and reader/writer
  69. """
  70. return await self.control.stream_open(peer_id=peer_id, protocols=protocols)
  71. async def stream_handler(self, proto: str, handler_cb: StreamHandler) -> None:
  72. """
  73. Register a stream handler
  74. :param proto: protocols that handler serves
  75. :param handler_cb: handler callback
  76. :return:
  77. """
  78. await self.control.stream_handler(proto=proto, handler_cb=handler_cb)