p2pclient.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. """
  2. Originally taken from: https://github.com/mhchia/py-libp2p-daemon-bindings
  3. Licence: MIT
  4. Author: Kevin Mai-Husan Chia
  5. """
  6. import asyncio
  7. from contextlib import asynccontextmanager
  8. from typing import AsyncIterator, Iterable, Sequence, Tuple
  9. from multiaddr import Multiaddr
  10. from hivemind.p2p.p2p_daemon_bindings.control import (ControlClient,
  11. DaemonConnector,
  12. StreamHandler)
  13. from hivemind.p2p.p2p_daemon_bindings.datastructures import (PeerID, PeerInfo,
  14. StreamInfo)
  15. class Client:
  16. control: ControlClient
  17. def __init__(
  18. self, control_maddr: Multiaddr = None, listen_maddr: Multiaddr = None
  19. ) -> None:
  20. daemon_connector = DaemonConnector(control_maddr=control_maddr)
  21. self.control = ControlClient(
  22. daemon_connector=daemon_connector, listen_maddr=listen_maddr
  23. )
  24. @asynccontextmanager
  25. async def listen(self) -> AsyncIterator["Client"]:
  26. """
  27. Starts to listen incoming connections for handlers registered via stream_handler.
  28. :return:
  29. """
  30. async with self.control.listen():
  31. yield self
  32. async def identify(self) -> Tuple[PeerID, Tuple[Multiaddr, ...]]:
  33. """
  34. Get current node peer id and list of addresses
  35. """
  36. return await self.control.identify()
  37. async def connect(self, peer_id: PeerID, maddrs: Iterable[Multiaddr]) -> None:
  38. """
  39. Connect to p2p node with specified addresses and peer id.
  40. :peer_id: node peer id you want connect to
  41. :maddrs: node multiaddresses you want connect to. Of course, it must be reachable.
  42. """
  43. await self.control.connect(peer_id=peer_id, maddrs=maddrs)
  44. async def list_peers(self) -> Tuple[PeerInfo, ...]:
  45. """
  46. Get list of peers that node connect to
  47. """
  48. return await self.control.list_peers()
  49. async def disconnect(self, peer_id: PeerID) -> None:
  50. """
  51. Disconnect from node with specified peer id
  52. :peer_id: node peer id you want disconnect from
  53. """
  54. await self.control.disconnect(peer_id=peer_id)
  55. async def stream_open(
  56. self, peer_id: PeerID, protocols: Sequence[str]
  57. ) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
  58. """
  59. Open a stream to call other peer (with peer_id) handler for specified protocols
  60. :peer_id: other peer id
  61. :protocols: list of protocols for other peer handling
  62. :return: Returns tuple of stream info (info about connection to second peer) and reader/writer
  63. """
  64. return await self.control.stream_open(peer_id=peer_id, protocols=protocols)
  65. async def stream_handler(self, proto: str, handler_cb: StreamHandler) -> None:
  66. """
  67. Register a stream handler
  68. :param proto: protocols that handler serves
  69. :param handler_cb: handler callback
  70. :return:
  71. """
  72. await self.control.stream_handler(proto=proto, handler_cb=handler_cb)