p2p_daemon.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729
  1. import asyncio
  2. import json
  3. import logging
  4. import os
  5. import secrets
  6. import warnings
  7. from collections.abc import AsyncIterable as AsyncIterableABC
  8. from contextlib import closing, suppress
  9. from dataclasses import dataclass
  10. from datetime import datetime
  11. from importlib.resources import path
  12. from typing import Any, AsyncIterator, Awaitable, Callable, List, Optional, Sequence, Tuple, Type, TypeVar, Union
  13. from google.protobuf.message import Message
  14. from multiaddr import Multiaddr
  15. import hivemind.hivemind_cli as cli
  16. import hivemind.p2p.p2p_daemon_bindings.p2pclient as p2pclient
  17. from hivemind.p2p.p2p_daemon_bindings.control import DEFAULT_MAX_MSG_SIZE, P2PDaemonError, P2PHandlerError
  18. from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
  19. from hivemind.p2p.p2p_daemon_bindings.utils import ControlFailure
  20. from hivemind.proto import crypto_pb2
  21. from hivemind.proto.p2pd_pb2 import RPCError
  22. from hivemind.utils.asyncio import as_aiter, asingle
  23. from hivemind.utils.crypto import RSAPrivateKey
  24. from hivemind.utils.logging import get_logger, golog_level_to_python, loglevel, python_level_to_golog
  25. logger = get_logger(__name__)
  26. P2PD_FILENAME = "p2pd"
  27. @dataclass(frozen=True)
  28. class P2PContext(object):
  29. handle_name: str
  30. local_id: PeerID
  31. remote_id: PeerID = None
  32. class P2P:
  33. """
  34. This class is responsible for establishing peer-to-peer connections through NAT and/or firewalls.
  35. It creates and manages a libp2p daemon (https://libp2p.io) in a background process,
  36. then terminates it when P2P is shut down. In order to communicate, a P2P instance should
  37. either use one or more initial_peers that will connect it to the rest of the swarm or
  38. use the public IPFS network (https://ipfs.io).
  39. For incoming connections, P2P instances add RPC handlers that may be accessed by other peers:
  40. - `P2P.add_protobuf_handler` accepts a protobuf message and returns another protobuf
  41. - `P2P.add_binary_stream_handler` transfers raw data using bi-directional streaming interface
  42. To access these handlers, a P2P instance can `P2P.call_protobuf_handler`/`P2P.call_binary_stream_handler`,
  43. using the recipient's unique `P2P.peer_id` and the name of the corresponding handler.
  44. """
  45. HEADER_LEN = 8
  46. BYTEORDER = "big"
  47. MESSAGE_MARKER = b"\x00"
  48. ERROR_MARKER = b"\x01"
  49. END_OF_STREAM = RPCError()
  50. DHT_MODE_MAPPING = {
  51. "auto": {"dht": 1},
  52. "server": {"dhtServer": 1},
  53. "client": {"dhtClient": 1},
  54. }
  55. FORCE_REACHABILITY_MAPPING = {
  56. "public": {"forceReachabilityPublic": 1},
  57. "private": {"forceReachabilityPrivate": 1},
  58. }
  59. _UNIX_SOCKET_PREFIX = "/unix/tmp/hivemind-"
  60. def __init__(self):
  61. self.peer_id = None
  62. self._client = None
  63. self._child = None
  64. self._alive = False
  65. self._reader_task = None
  66. self._listen_task = None
  67. @classmethod
  68. async def create(
  69. cls,
  70. initial_peers: Optional[Sequence[Union[Multiaddr, str]]] = None,
  71. *,
  72. announce_maddrs: Optional[Sequence[Union[Multiaddr, str]]] = None,
  73. auto_nat: bool = True,
  74. conn_manager: bool = True,
  75. dht_mode: str = "server",
  76. force_reachability: Optional[str] = None,
  77. host_maddrs: Optional[Sequence[Union[Multiaddr, str]]] = ("/ip4/127.0.0.1/tcp/0",),
  78. identity_path: Optional[str] = None,
  79. idle_timeout: float = 30,
  80. nat_port_map: bool = True,
  81. relay_hop_limit: int = 0,
  82. startup_timeout: float = 15,
  83. tls: bool = True,
  84. use_auto_relay: bool = False,
  85. use_ipfs: bool = False,
  86. use_relay: bool = True,
  87. persistent_conn_max_msg_size: int = DEFAULT_MAX_MSG_SIZE,
  88. quic: Optional[bool] = None,
  89. use_relay_hop: Optional[bool] = None,
  90. use_relay_discovery: Optional[bool] = None,
  91. check_if_identity_free: bool = True,
  92. no_listen: bool = False,
  93. trusted_relays: Optional[Sequence[Union[Multiaddr, str]]] = None,
  94. ) -> "P2P":
  95. """
  96. Start a new p2pd process and connect to it.
  97. :param initial_peers: List of bootstrap peers
  98. :param auto_nat: Enables the AutoNAT service
  99. :param announce_maddrs: Visible multiaddrs that the peer will announce
  100. for external connections from other p2p instances
  101. :param conn_manager: Enables the Connection Manager
  102. :param dht_mode: libp2p DHT mode (auto/client/server).
  103. Defaults to "server" to make collaborations work in local networks.
  104. Details: https://pkg.go.dev/github.com/libp2p/go-libp2p-kad-dht#ModeOpt
  105. :param force_reachability: Force reachability mode (public/private)
  106. :param host_maddrs: Multiaddrs to listen for external connections from other p2p instances
  107. :param identity_path: Path to a private key file. If defined, makes the peer ID deterministic.
  108. If the file does not exist yet, writes a new private key to this file.
  109. :param idle_timeout: kill daemon if client has been idle for a given number of
  110. seconds before opening persistent streams
  111. :param nat_port_map: Enables NAT port mapping
  112. :param relay_hop_limit: sets the hop limit for hop relays
  113. :param startup_timeout: raise a P2PDaemonError if the daemon does not start in ``startup_timeout`` seconds
  114. :param tls: Enables TLS1.3 channel security protocol
  115. :param use_auto_relay: enables autorelay
  116. :param use_ipfs: Bootstrap to IPFS (incompatible with initial_peers)
  117. :param use_relay: enables circuit relay
  118. :param quic: Deprecated, has no effect since libp2p 0.17.0
  119. :param use_relay_hop: Deprecated, has no effect since libp2p 0.17.0
  120. :param use_relay_discovery: Deprecated, has no effect since libp2p 0.17.0
  121. :param check_if_identity_free: If enabled (default), ``identity_path`` is provided,
  122. and we are connecting to an existing swarm,
  123. ensure that this identity is not used by other peers already.
  124. This slows down ``P2P.create()`` but protects from unintuitive libp2p errors
  125. appearing in case of the identity collision.
  126. :return: a wrapper for the p2p daemon
  127. """
  128. assert not (
  129. initial_peers and use_ipfs
  130. ), "User-defined initial_peers and use_ipfs=True are incompatible, please choose one option"
  131. if not all(arg is None for arg in [quic, use_relay_hop, use_relay_discovery]):
  132. warnings.warn(
  133. "Parameters `quic`, `use_relay_hop`, and `use_relay_discovery` of hivemind.P2P "
  134. "have no effect since libp2p 0.17.0 and will be removed in hivemind 1.2.0+",
  135. DeprecationWarning,
  136. stacklevel=2,
  137. )
  138. self = cls()
  139. with path(cli, P2PD_FILENAME) as p:
  140. p2pd_path = p
  141. socket_uid = secrets.token_urlsafe(8)
  142. self._daemon_listen_maddr = Multiaddr(cls._UNIX_SOCKET_PREFIX + f"p2pd-{socket_uid}.sock")
  143. self._client_listen_maddr = Multiaddr(cls._UNIX_SOCKET_PREFIX + f"p2pclient-{socket_uid}.sock")
  144. if announce_maddrs is not None:
  145. for addr in announce_maddrs:
  146. addr = Multiaddr(addr)
  147. if ("tcp" in addr and addr["tcp"] == "0") or ("udp" in addr and addr["udp"] == "0"):
  148. raise ValueError("Please specify an explicit port in announce_maddrs: port 0 is not supported")
  149. need_bootstrap = bool(initial_peers) or use_ipfs
  150. process_kwargs = cls.DHT_MODE_MAPPING[dht_mode].copy()
  151. process_kwargs.update(cls.FORCE_REACHABILITY_MAPPING.get(force_reachability, {}))
  152. for param, value in [
  153. ("bootstrapPeers", initial_peers),
  154. ("hostAddrs", host_maddrs),
  155. ("announceAddrs", announce_maddrs),
  156. ("trustedRelays", trusted_relays),
  157. ]:
  158. if value:
  159. process_kwargs[param] = self._maddrs_to_str(value)
  160. if no_listen:
  161. process_kwargs["noListenAddrs"] = 1
  162. if identity_path is not None:
  163. if os.path.isfile(identity_path):
  164. if check_if_identity_free and need_bootstrap:
  165. logger.info(f"Checking that identity from `{identity_path}` is not used by other peers")
  166. if await cls.is_identity_taken(
  167. identity_path,
  168. initial_peers=initial_peers,
  169. tls=tls,
  170. use_auto_relay=use_auto_relay,
  171. use_ipfs=use_ipfs,
  172. use_relay=use_relay,
  173. ):
  174. raise P2PDaemonError(f"Identity from `{identity_path}` is already taken by another peer")
  175. else:
  176. logger.info(f"Generating new identity to be saved in `{identity_path}`")
  177. self.generate_identity(identity_path)
  178. # A newly generated identity is not taken with ~100% probability
  179. process_kwargs["id"] = identity_path
  180. proc_args = self._make_process_args(
  181. str(p2pd_path),
  182. autoRelay=use_auto_relay,
  183. autonat=auto_nat,
  184. b=need_bootstrap,
  185. connManager=conn_manager,
  186. idleTimeout=f"{idle_timeout}s",
  187. listen=self._daemon_listen_maddr,
  188. natPortMap=nat_port_map,
  189. relay=use_relay,
  190. relayHopLimit=relay_hop_limit,
  191. tls=tls,
  192. persistentConnMaxMsgSize=persistent_conn_max_msg_size,
  193. **process_kwargs,
  194. )
  195. env = os.environ.copy()
  196. env.setdefault("GOLOG_LOG_LEVEL", python_level_to_golog(loglevel))
  197. env["GOLOG_LOG_FMT"] = "json"
  198. logger.debug(f"Launching {proc_args}")
  199. self._child = await asyncio.subprocess.create_subprocess_exec(
  200. *proc_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, env=env
  201. )
  202. self._alive = True
  203. ready = asyncio.Future()
  204. self._reader_task = asyncio.create_task(self._read_outputs(ready))
  205. try:
  206. await asyncio.wait_for(ready, startup_timeout)
  207. except asyncio.TimeoutError:
  208. await self.shutdown()
  209. raise P2PDaemonError(f"Daemon failed to start in {startup_timeout:.1f} seconds")
  210. self._client = await p2pclient.Client.create(
  211. control_maddr=self._daemon_listen_maddr,
  212. listen_maddr=self._client_listen_maddr,
  213. persistent_conn_max_msg_size=persistent_conn_max_msg_size,
  214. )
  215. await self._ping_daemon()
  216. return self
  217. @classmethod
  218. async def is_identity_taken(
  219. cls,
  220. identity_path: str,
  221. *,
  222. initial_peers: Optional[Sequence[Union[Multiaddr, str]]],
  223. tls: bool,
  224. use_auto_relay: bool,
  225. use_ipfs: bool,
  226. use_relay: bool,
  227. ) -> bool:
  228. with open(identity_path, "rb") as f:
  229. peer_id = PeerID.from_identity(f.read())
  230. anonymous_p2p = await cls.create(
  231. initial_peers=initial_peers,
  232. dht_mode="client",
  233. tls=tls,
  234. use_auto_relay=use_auto_relay,
  235. use_ipfs=use_ipfs,
  236. use_relay=use_relay,
  237. )
  238. try:
  239. await anonymous_p2p._client.connect(peer_id, [])
  240. return True
  241. except ControlFailure:
  242. return False
  243. finally:
  244. await anonymous_p2p.shutdown()
  245. @staticmethod
  246. def generate_identity(identity_path: str) -> None:
  247. private_key = RSAPrivateKey()
  248. protobuf = crypto_pb2.PrivateKey(key_type=crypto_pb2.KeyType.RSA, data=private_key.to_bytes())
  249. try:
  250. with open(identity_path, "wb") as f:
  251. f.write(protobuf.SerializeToString())
  252. except FileNotFoundError:
  253. raise FileNotFoundError(
  254. f"The directory `{os.path.dirname(identity_path)}` for saving the identity does not exist"
  255. )
  256. os.chmod(identity_path, 0o400)
  257. @classmethod
  258. async def replicate(cls, daemon_listen_maddr: Multiaddr) -> "P2P":
  259. """
  260. Connect to existing p2p daemon
  261. :param daemon_listen_maddr: multiaddr of the existing p2p daemon
  262. :return: new wrapper for the existing p2p daemon
  263. """
  264. self = cls()
  265. # There is no child under control
  266. # Use external already running p2pd
  267. self._child = None
  268. self._alive = True
  269. socket_uid = secrets.token_urlsafe(8)
  270. self._daemon_listen_maddr = daemon_listen_maddr
  271. self._client_listen_maddr = Multiaddr(cls._UNIX_SOCKET_PREFIX + f"p2pclient-{socket_uid}.sock")
  272. self._client = await p2pclient.Client.create(self._daemon_listen_maddr, self._client_listen_maddr)
  273. await self._ping_daemon()
  274. return self
  275. async def _ping_daemon(self) -> None:
  276. self.peer_id, self._visible_maddrs = await self._client.identify()
  277. logger.debug(f"Launched p2pd with peer id = {self.peer_id}, host multiaddrs = {self._visible_maddrs}")
  278. async def get_visible_maddrs(self, latest: bool = False) -> List[Multiaddr]:
  279. """
  280. Get multiaddrs of the current peer that should be accessible by other peers.
  281. :param latest: ask the P2P daemon to refresh the visible multiaddrs
  282. """
  283. if latest:
  284. _, self._visible_maddrs = await self._client.identify()
  285. if not self._visible_maddrs:
  286. raise ValueError(f"No multiaddrs found for peer {self.peer_id}")
  287. p2p_maddr = Multiaddr(f"/p2p/{self.peer_id.to_base58()}")
  288. return [addr.encapsulate(p2p_maddr) for addr in self._visible_maddrs]
  289. async def list_peers(self) -> List[PeerInfo]:
  290. return list(await self._client.list_peers())
  291. async def wait_for_at_least_n_peers(self, n_peers: int, attempts: int = 3, delay: float = 1) -> None:
  292. for _ in range(attempts):
  293. peers = await self._client.list_peers()
  294. if len(peers) >= n_peers:
  295. return
  296. await asyncio.sleep(delay)
  297. raise RuntimeError("Not enough peers")
  298. @property
  299. def daemon_listen_maddr(self) -> Multiaddr:
  300. return self._daemon_listen_maddr
  301. @staticmethod
  302. async def send_raw_data(data: bytes, writer: asyncio.StreamWriter, *, chunk_size: int = 2**16) -> None:
  303. writer.write(len(data).to_bytes(P2P.HEADER_LEN, P2P.BYTEORDER))
  304. data = memoryview(data)
  305. for offset in range(0, len(data), chunk_size):
  306. writer.write(data[offset : offset + chunk_size])
  307. await writer.drain()
  308. @staticmethod
  309. async def receive_raw_data(reader: asyncio.StreamReader) -> bytes:
  310. header = await reader.readexactly(P2P.HEADER_LEN)
  311. content_length = int.from_bytes(header, P2P.BYTEORDER)
  312. data = await reader.readexactly(content_length)
  313. return data
  314. TInputProtobuf = TypeVar("TInputProtobuf")
  315. TOutputProtobuf = TypeVar("TOutputProtobuf")
  316. @staticmethod
  317. async def send_protobuf(protobuf: Union[TOutputProtobuf, RPCError], writer: asyncio.StreamWriter) -> None:
  318. if isinstance(protobuf, RPCError):
  319. writer.write(P2P.ERROR_MARKER)
  320. else:
  321. writer.write(P2P.MESSAGE_MARKER)
  322. await P2P.send_raw_data(protobuf.SerializeToString(), writer)
  323. @staticmethod
  324. async def receive_protobuf(
  325. input_protobuf_type: Type[Message], reader: asyncio.StreamReader
  326. ) -> Tuple[Optional[TInputProtobuf], Optional[RPCError]]:
  327. msg_type = await reader.readexactly(1)
  328. if msg_type == P2P.MESSAGE_MARKER:
  329. protobuf = input_protobuf_type()
  330. protobuf.ParseFromString(await P2P.receive_raw_data(reader))
  331. return protobuf, None
  332. elif msg_type == P2P.ERROR_MARKER:
  333. protobuf = RPCError()
  334. protobuf.ParseFromString(await P2P.receive_raw_data(reader))
  335. return None, protobuf
  336. else:
  337. raise TypeError("Invalid Protobuf message type")
  338. TInputStream = AsyncIterator[TInputProtobuf]
  339. TOutputStream = AsyncIterator[TOutputProtobuf]
  340. async def _add_protobuf_stream_handler(
  341. self,
  342. name: str,
  343. handler: Callable[[TInputStream, P2PContext], TOutputStream],
  344. input_protobuf_type: Type[Message],
  345. max_prefetch: int = 5,
  346. balanced: bool = False,
  347. ) -> None:
  348. """
  349. :param max_prefetch: Maximum number of items to prefetch from the request stream.
  350. ``max_prefetch <= 0`` means unlimited.
  351. :note: Since the cancel messages are sent via the input stream,
  352. they will not be received while the prefetch buffer is full.
  353. """
  354. async def _handle_stream(
  355. stream_info: StreamInfo, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
  356. ) -> None:
  357. context = P2PContext(
  358. handle_name=name,
  359. local_id=self.peer_id,
  360. remote_id=stream_info.peer_id,
  361. )
  362. requests = asyncio.Queue(max_prefetch)
  363. async def _read_stream() -> P2P.TInputStream:
  364. while True:
  365. request = await requests.get()
  366. if request is None:
  367. break
  368. yield request
  369. async def _process_stream() -> None:
  370. try:
  371. async for response in handler(_read_stream(), context):
  372. try:
  373. await P2P.send_protobuf(response, writer)
  374. except Exception:
  375. # The connection is unexpectedly closed by the caller or broken.
  376. # The loglevel is DEBUG since the actual error will be reported on the caller
  377. logger.debug("Exception while sending response:", exc_info=True)
  378. break
  379. except Exception as e:
  380. logger.warning("Handler failed with the exception:", exc_info=True)
  381. with suppress(Exception):
  382. # Sometimes `e` is a connection error, so it is okay if we fail to report `e` to the caller
  383. await P2P.send_protobuf(RPCError(message=str(e)), writer)
  384. with closing(writer):
  385. processing_task = asyncio.create_task(_process_stream())
  386. try:
  387. while True:
  388. receive_task = asyncio.create_task(P2P.receive_protobuf(input_protobuf_type, reader))
  389. await asyncio.wait({processing_task, receive_task}, return_when=asyncio.FIRST_COMPLETED)
  390. if processing_task.done():
  391. receive_task.cancel()
  392. return
  393. if receive_task.done():
  394. try:
  395. request, _ = await receive_task
  396. except asyncio.IncompleteReadError: # Connection is closed (the client cancelled or died)
  397. return
  398. await requests.put(request) # `request` is None for the end-of-stream message
  399. except Exception:
  400. logger.warning("Exception while receiving requests:", exc_info=True)
  401. finally:
  402. processing_task.cancel()
  403. await self.add_binary_stream_handler(name, _handle_stream, balanced=balanced)
  404. async def _iterate_protobuf_stream_handler(
  405. self, peer_id: PeerID, name: str, requests: TInputStream, output_protobuf_type: Type[Message]
  406. ) -> TOutputStream:
  407. _, reader, writer = await self.call_binary_stream_handler(peer_id, name)
  408. async def _write_to_stream() -> None:
  409. async for request in requests:
  410. await P2P.send_protobuf(request, writer)
  411. await P2P.send_protobuf(P2P.END_OF_STREAM, writer)
  412. async def _read_from_stream() -> AsyncIterator[Message]:
  413. with closing(writer):
  414. try:
  415. while True:
  416. try:
  417. response, err = await P2P.receive_protobuf(output_protobuf_type, reader)
  418. except asyncio.IncompleteReadError: # Connection is closed
  419. break
  420. if err is not None:
  421. raise P2PHandlerError(f"Failed to call handler `{name}` at {peer_id}: {err.message}")
  422. yield response
  423. await writing_task
  424. finally:
  425. writing_task.cancel()
  426. writing_task = asyncio.create_task(_write_to_stream())
  427. return _read_from_stream()
  428. async def add_protobuf_handler(
  429. self,
  430. name: str,
  431. handler: Callable[
  432. [Union[TInputProtobuf, TInputStream], P2PContext], Union[Awaitable[TOutputProtobuf], TOutputStream]
  433. ],
  434. input_protobuf_type: Type[Message],
  435. *,
  436. stream_input: bool = False,
  437. stream_output: bool = False,
  438. balanced: bool = False,
  439. ) -> None:
  440. """
  441. :param stream_input: If True, assume ``handler`` to take ``TInputStream``
  442. (not just ``TInputProtobuf``) as input.
  443. :param stream_output: If True, assume ``handler`` to return ``TOutputStream``
  444. (not ``Awaitable[TOutputProtobuf]``).
  445. :param balanced: If True, handler will be balanced on p2pd side between all handlers in python.
  446. Default: False
  447. """
  448. if not stream_input and not stream_output:
  449. await self._add_protobuf_unary_handler(name, handler, input_protobuf_type, balanced=balanced)
  450. return
  451. async def _stream_handler(requests: P2P.TInputStream, context: P2PContext) -> P2P.TOutputStream:
  452. input = requests if stream_input else await asingle(requests)
  453. output = handler(input, context)
  454. if isinstance(output, AsyncIterableABC):
  455. async for item in output:
  456. yield item
  457. else:
  458. yield await output
  459. await self._add_protobuf_stream_handler(name, _stream_handler, input_protobuf_type, balanced=balanced)
  460. async def remove_protobuf_handler(
  461. self,
  462. name: str,
  463. *,
  464. stream_input: bool = False,
  465. stream_output: bool = False,
  466. ) -> None:
  467. if not stream_input and not stream_output:
  468. await self._client.remove_unary_handler(name)
  469. return
  470. await self.remove_binary_stream_handler(name)
  471. async def _add_protobuf_unary_handler(
  472. self,
  473. handle_name: str,
  474. handler: Callable[[TInputProtobuf, P2PContext], Awaitable[TOutputProtobuf]],
  475. input_protobuf_type: Type[Message],
  476. balanced: bool = False,
  477. ) -> None:
  478. """
  479. Register a request-response (unary) handler. Unary requests and responses
  480. are sent through persistent multiplexed connections to the daemon for the
  481. sake of reducing the number of open files.
  482. :param handle_name: name of the handler (protocol id)
  483. :param handler: function handling the unary requests
  484. :param input_protobuf_type: protobuf type of the request
  485. """
  486. async def _unary_handler(request: bytes, remote_id: PeerID) -> bytes:
  487. input_serialized = input_protobuf_type.FromString(request)
  488. context = P2PContext(
  489. handle_name=handle_name,
  490. local_id=self.peer_id,
  491. remote_id=remote_id,
  492. )
  493. response = await handler(input_serialized, context)
  494. return response.SerializeToString()
  495. await self._client.add_unary_handler(handle_name, _unary_handler, balanced=balanced)
  496. async def call_protobuf_handler(
  497. self,
  498. peer_id: PeerID,
  499. name: str,
  500. input: Union[TInputProtobuf, TInputStream],
  501. output_protobuf_type: Type[Message],
  502. ) -> Awaitable[TOutputProtobuf]:
  503. if not isinstance(input, AsyncIterableABC):
  504. return await self._call_unary_protobuf_handler(peer_id, name, input, output_protobuf_type)
  505. responses = await self._iterate_protobuf_stream_handler(peer_id, name, input, output_protobuf_type)
  506. return await asingle(responses)
  507. async def _call_unary_protobuf_handler(
  508. self,
  509. peer_id: PeerID,
  510. handle_name: str,
  511. input: TInputProtobuf,
  512. output_protobuf_type: Type[Message],
  513. ) -> Awaitable[TOutputProtobuf]:
  514. serialized_input = input.SerializeToString()
  515. response = await self._client.call_unary_handler(peer_id, handle_name, serialized_input)
  516. return output_protobuf_type.FromString(response)
  517. async def iterate_protobuf_handler(
  518. self,
  519. peer_id: PeerID,
  520. name: str,
  521. input: Union[TInputProtobuf, TInputStream],
  522. output_protobuf_type: Type[Message],
  523. ) -> TOutputStream:
  524. requests = input if isinstance(input, AsyncIterableABC) else as_aiter(input)
  525. return await self._iterate_protobuf_stream_handler(peer_id, name, requests, output_protobuf_type)
  526. def _start_listening(self) -> None:
  527. async def listen() -> None:
  528. async with self._client.listen():
  529. await asyncio.Future() # Wait until this task will be cancelled in _terminate()
  530. self._listen_task = asyncio.create_task(listen())
  531. async def add_binary_stream_handler(
  532. self, name: str, handler: p2pclient.StreamHandler, balanced: bool = False
  533. ) -> None:
  534. if self._listen_task is None:
  535. self._start_listening()
  536. await self._client.stream_handler(name, handler, balanced)
  537. async def remove_binary_stream_handler(self, name: str) -> None:
  538. await self._client.remove_stream_handler(name)
  539. async def call_binary_stream_handler(
  540. self, peer_id: PeerID, handler_name: str
  541. ) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
  542. return await self._client.stream_open(peer_id, (handler_name,))
  543. def __del__(self):
  544. self._terminate()
  545. @property
  546. def is_alive(self) -> bool:
  547. return self._alive
  548. async def shutdown(self) -> None:
  549. self._terminate()
  550. if self._child is not None:
  551. await self._child.wait()
  552. def _terminate(self) -> None:
  553. if self._client is not None:
  554. self._client.close()
  555. if self._listen_task is not None:
  556. self._listen_task.cancel()
  557. if self._reader_task is not None:
  558. self._reader_task.cancel()
  559. self._alive = False
  560. if self._child is not None and self._child.returncode is None:
  561. self._child.terminate()
  562. logger.debug(f"Terminated p2pd with id = {self.peer_id}")
  563. with suppress(FileNotFoundError):
  564. os.remove(self._daemon_listen_maddr["unix"])
  565. with suppress(FileNotFoundError):
  566. os.remove(self._client_listen_maddr["unix"])
  567. @staticmethod
  568. def _make_process_args(*args, **kwargs) -> List[str]:
  569. proc_args = []
  570. proc_args.extend(str(entry) for entry in args)
  571. proc_args.extend(
  572. f"-{key}={P2P._convert_process_arg_type(value)}" if value is not None else f"-{key}"
  573. for key, value in kwargs.items()
  574. )
  575. return proc_args
  576. @staticmethod
  577. def _convert_process_arg_type(val: Any) -> Any:
  578. if isinstance(val, bool):
  579. return int(val)
  580. return val
  581. @staticmethod
  582. def _maddrs_to_str(maddrs: List[Multiaddr]) -> str:
  583. return ",".join(str(addr) for addr in maddrs)
  584. async def _read_outputs(self, ready: asyncio.Future) -> None:
  585. last_line = None
  586. while True:
  587. line = await self._child.stdout.readline()
  588. if not line: # Stream closed
  589. break
  590. last_line = line.rstrip().decode(errors="ignore")
  591. self._log_p2pd_message(last_line)
  592. if last_line.startswith("Peer ID:"):
  593. ready.set_result(None)
  594. if not ready.done():
  595. ready.set_exception(P2PDaemonError(f"Daemon failed to start: {last_line}"))
  596. @staticmethod
  597. def _log_p2pd_message(line: str) -> None:
  598. if '"logger"' not in line: # User-friendly info from p2pd stdout
  599. logger.debug(line, extra={"caller": "p2pd"})
  600. return
  601. try:
  602. record = json.loads(line)
  603. caller = record["caller"]
  604. level = golog_level_to_python(record["level"])
  605. if level <= logging.WARNING:
  606. # Many Go loggers are excessively verbose (e.g. show warnings for unreachable peers),
  607. # so we downgrade INFO and WARNING messages to DEBUG.
  608. # The Go verbosity can still be controlled via the GOLOG_LOG_LEVEL env variable.
  609. # Details: https://github.com/ipfs/go-log#golog_log_level
  610. level = logging.DEBUG
  611. message = record["msg"]
  612. if "error" in record:
  613. message += f": {record['error']}"
  614. logger.log(
  615. level,
  616. message,
  617. extra={
  618. "origin_created": datetime.strptime(record["ts"], "%Y-%m-%dT%H:%M:%S.%f%z").timestamp(),
  619. "caller": caller,
  620. },
  621. )
  622. except Exception:
  623. # Parsing errors are unlikely, but we don't want to lose these messages anyway
  624. logger.warning(line, extra={"caller": "p2pd"})
  625. logger.exception("Failed to parse go-log message:")