control.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. """
  2. Originally taken from: https://github.com/mhchia/py-libp2p-daemon-bindings
  3. Licence: MIT
  4. Author: Kevin Mai-Husan Chia
  5. """
  6. import asyncio
  7. from contextlib import asynccontextmanager
  8. from typing import (AsyncIterator, Awaitable, Callable, Dict, Iterable,
  9. Sequence, Tuple)
  10. from multiaddr import Multiaddr, protocols
  11. from hivemind.p2p.p2p_daemon_bindings.datastructures import (PeerID, PeerInfo,
  12. StreamInfo)
  13. from hivemind.p2p.p2p_daemon_bindings.utils import (DispatchFailure,
  14. raise_if_failed,
  15. read_pbmsg_safe,
  16. write_pbmsg)
  17. from hivemind.proto import p2pd_pb2 as p2pd_pb
  18. from hivemind.utils.logging import get_logger
  19. StreamHandler = Callable[[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter], Awaitable[None]]
  20. SUPPORT_CONN_PROTOCOLS = (
  21. protocols.P_IP4,
  22. # protocols.P_IP6,
  23. protocols.P_UNIX,
  24. )
  25. SUPPORTED_PROTOS = (
  26. protocols.protocol_with_code(proto) for proto in SUPPORT_CONN_PROTOCOLS
  27. )
  28. logger = get_logger(__name__)
  29. def parse_conn_protocol(maddr: Multiaddr) -> int:
  30. proto_codes = set(proto.code for proto in maddr.protocols())
  31. proto_cand = proto_codes.intersection(SUPPORT_CONN_PROTOCOLS)
  32. if len(proto_cand) != 1:
  33. raise ValueError(
  34. f"connection protocol should be only one protocol out of {SUPPORTED_PROTOS}"
  35. f", maddr={maddr}"
  36. )
  37. return tuple(proto_cand)[0]
  38. class DaemonConnector:
  39. DEFAULT_CONTROL_MADDR = "/unix/tmp/p2pd.sock"
  40. def __init__(self, control_maddr: Multiaddr = Multiaddr(DEFAULT_CONTROL_MADDR)) -> None:
  41. self.control_maddr = control_maddr
  42. self.proto_code = parse_conn_protocol(self.control_maddr)
  43. async def open_connection(self) -> (asyncio.StreamReader, asyncio.StreamWriter):
  44. if self.proto_code == protocols.P_UNIX:
  45. control_path = self.control_maddr.value_for_protocol(protocols.P_UNIX)
  46. logger.debug(f"DaemonConnector {self} opens connection to {self.control_maddr}")
  47. return await asyncio.open_unix_connection(control_path)
  48. elif self.proto_code == protocols.P_IP4:
  49. host = self.control_maddr.value_for_protocol(protocols.P_IP4)
  50. port = int(self.control_maddr.value_for_protocol(protocols.P_TCP))
  51. return await asyncio.open_connection(host, port)
  52. else:
  53. raise ValueError(
  54. f"Protocol not supported: {protocols.protocol_with_code(self.proto_code)}"
  55. )
  56. class ControlClient:
  57. DEFAULT_LISTEN_MADDR = "/unix/tmp/p2pclient.sock"
  58. def __init__(
  59. self, daemon_connector: DaemonConnector, listen_maddr: Multiaddr = Multiaddr(DEFAULT_LISTEN_MADDR)
  60. ) -> None:
  61. self.listen_maddr = listen_maddr
  62. self.daemon_connector = daemon_connector
  63. self.handlers: Dict[str, StreamHandler] = {}
  64. async def _handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
  65. pb_stream_info = p2pd_pb.StreamInfo() # type: ignore
  66. await read_pbmsg_safe(reader, pb_stream_info)
  67. stream_info = StreamInfo.from_protobuf(pb_stream_info)
  68. logger.debug(f"New incoming stream: {stream_info}")
  69. try:
  70. handler = self.handlers[stream_info.proto]
  71. except KeyError as e:
  72. # should never enter here... daemon should reject the stream for us.
  73. writer.close()
  74. raise DispatchFailure(e)
  75. await handler(stream_info, reader, writer)
  76. @asynccontextmanager
  77. async def listen(self) -> AsyncIterator["ControlClient"]:
  78. proto_code = parse_conn_protocol(self.listen_maddr)
  79. if proto_code == protocols.P_UNIX:
  80. listen_path = self.listen_maddr.value_for_protocol(protocols.P_UNIX)
  81. server = await asyncio.start_unix_server(self._handler, path=listen_path)
  82. elif proto_code == protocols.P_IP4:
  83. host = self.listen_maddr.value_for_protocol(protocols.P_IP4)
  84. port = int(self.listen_maddr.value_for_protocol(protocols.P_TCP))
  85. server = await asyncio.start_server(self._handler, port=port, host=host)
  86. else:
  87. raise ValueError(
  88. f"Protocol not supported: {protocols.protocol_with_code(proto_code)}"
  89. )
  90. async with server:
  91. logger.info(f"DaemonConnector {self} starts listening to {self.listen_maddr}")
  92. yield self
  93. logger.info(f"DaemonConnector {self} closed")
  94. async def identify(self) -> Tuple[PeerID, Tuple[Multiaddr, ...]]:
  95. reader, writer = await self.daemon_connector.open_connection()
  96. req = p2pd_pb.Request(type=p2pd_pb.Request.IDENTIFY)
  97. await write_pbmsg(writer, req)
  98. resp = p2pd_pb.Response() # type: ignore
  99. await read_pbmsg_safe(reader, resp)
  100. writer.close()
  101. raise_if_failed(resp)
  102. peer_id_bytes = resp.identify.id
  103. maddrs_bytes = resp.identify.addrs
  104. maddrs = tuple(Multiaddr(maddr_bytes) for maddr_bytes in maddrs_bytes)
  105. peer_id = PeerID(peer_id_bytes)
  106. return peer_id, maddrs
  107. async def connect(self, peer_id: PeerID, maddrs: Iterable[Multiaddr]) -> None:
  108. reader, writer = await self.daemon_connector.open_connection()
  109. maddrs_bytes = [i.to_bytes() for i in maddrs]
  110. connect_req = p2pd_pb.ConnectRequest(
  111. peer=peer_id.to_bytes(), addrs=maddrs_bytes
  112. )
  113. req = p2pd_pb.Request(type=p2pd_pb.Request.CONNECT, connect=connect_req)
  114. await write_pbmsg(writer, req)
  115. resp = p2pd_pb.Response() # type: ignore
  116. await read_pbmsg_safe(reader, resp)
  117. writer.close()
  118. raise_if_failed(resp)
  119. async def list_peers(self) -> Tuple[PeerInfo, ...]:
  120. req = p2pd_pb.Request(type=p2pd_pb.Request.LIST_PEERS)
  121. reader, writer = await self.daemon_connector.open_connection()
  122. await write_pbmsg(writer, req)
  123. resp = p2pd_pb.Response() # type: ignore
  124. await read_pbmsg_safe(reader, resp)
  125. writer.close()
  126. raise_if_failed(resp)
  127. peers = tuple(PeerInfo.from_protobuf(pinfo) for pinfo in resp.peers)
  128. return peers
  129. async def disconnect(self, peer_id: PeerID) -> None:
  130. disconnect_req = p2pd_pb.DisconnectRequest(peer=peer_id.to_bytes())
  131. req = p2pd_pb.Request(
  132. type=p2pd_pb.Request.DISCONNECT, disconnect=disconnect_req
  133. )
  134. reader, writer = await self.daemon_connector.open_connection()
  135. await write_pbmsg(writer, req)
  136. resp = p2pd_pb.Response() # type: ignore
  137. await read_pbmsg_safe(reader, resp)
  138. writer.close()
  139. raise_if_failed(resp)
  140. async def stream_open(
  141. self, peer_id: PeerID, protocols: Sequence[str]
  142. ) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
  143. reader, writer = await self.daemon_connector.open_connection()
  144. stream_open_req = p2pd_pb.StreamOpenRequest(
  145. peer=peer_id.to_bytes(), proto=list(protocols)
  146. )
  147. req = p2pd_pb.Request(
  148. type=p2pd_pb.Request.STREAM_OPEN, streamOpen=stream_open_req
  149. )
  150. await write_pbmsg(writer, req)
  151. resp = p2pd_pb.Response() # type: ignore
  152. await read_pbmsg_safe(reader, resp)
  153. raise_if_failed(resp)
  154. pb_stream_info = resp.streamInfo
  155. stream_info = StreamInfo.from_protobuf(pb_stream_info)
  156. return stream_info, reader, writer
  157. async def stream_handler(self, proto: str, handler_cb: StreamHandler) -> None:
  158. reader, writer = await self.daemon_connector.open_connection()
  159. listen_path_maddr_bytes = self.listen_maddr.to_bytes()
  160. stream_handler_req = p2pd_pb.StreamHandlerRequest(
  161. addr=listen_path_maddr_bytes, proto=[proto]
  162. )
  163. req = p2pd_pb.Request(
  164. type=p2pd_pb.Request.STREAM_HANDLER, streamHandler=stream_handler_req
  165. )
  166. await write_pbmsg(writer, req)
  167. resp = p2pd_pb.Response() # type: ignore
  168. await read_pbmsg_safe(reader, resp)
  169. writer.close()
  170. raise_if_failed(resp)
  171. # if success, add the handler to the dict
  172. self.handlers[proto] = handler_cb