|
@@ -13,6 +13,7 @@ from multiaddr import Multiaddr
|
|
|
import hivemind.hivemind_cli as cli
|
|
|
import hivemind.p2p.p2p_daemon_bindings.p2pclient as p2pclient
|
|
|
from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
|
|
|
+from hivemind.p2p.p2p_daemon_bindings.utils import ControlFailure
|
|
|
from hivemind.proto.p2pd_pb2 import RPCError
|
|
|
from hivemind.utils.asyncio import aiter
|
|
|
from hivemind.utils.logging import get_logger
|
|
@@ -469,9 +470,19 @@ class P2P:
|
|
|
await self._client.stream_handler(name, handler)
|
|
|
|
|
|
async def call_binary_stream_handler(
|
|
|
- self, peer_id: PeerID, handler_name: str
|
|
|
+ self, peer_id: PeerID, handler_name: str, n_attempts: int = 5, delay: float = 0.5
|
|
|
) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
|
|
|
- return await self._client.stream_open(peer_id, (handler_name,))
|
|
|
+ for try_number in range(n_attempts):
|
|
|
+ try:
|
|
|
+ return await self._client.stream_open(peer_id, (handler_name,))
|
|
|
+ except ControlFailure:
|
|
|
+ if try_number == n_attempts - 1:
|
|
|
+ logger.exception(f"Failed to open stream to {peer_id} for handle `{handler_name}`. "
|
|
|
+ f"Made {n_attempts} attempts, giving up:")
|
|
|
+ raise
|
|
|
+ logger.warning(f"Failed to open stream to {peer_id} for handle `{handler_name}`:")
|
|
|
+
|
|
|
+ await asyncio.sleep(delay * (2 ** try_number))
|
|
|
|
|
|
def __del__(self):
|
|
|
self._terminate()
|