|
@@ -144,6 +144,8 @@ class P2P:
|
|
|
|
|
|
await self._ping_daemon_with_retries(ping_n_attempts, ping_delay)
|
|
await self._ping_daemon_with_retries(ping_n_attempts, ping_delay)
|
|
|
|
|
|
|
|
+ self.persistent_streams = dict()
|
|
|
|
+
|
|
return self
|
|
return self
|
|
|
|
|
|
async def _ping_daemon_with_retries(self, ping_n_attempts: int, ping_delay: float) -> None:
|
|
async def _ping_daemon_with_retries(self, ping_n_attempts: int, ping_delay: float) -> None:
|
|
@@ -369,6 +371,15 @@ class P2P:
|
|
finally:
|
|
finally:
|
|
writer.close()
|
|
writer.close()
|
|
|
|
|
|
|
|
+ async def create_persistent_stream(self, peer_id: PeerID, handler_name: str):
|
|
|
|
+ if stream := self.persistent_streams.get((peer_id, handler_name)):
|
|
|
|
+ return stream
|
|
|
|
+
|
|
|
|
+ stream_info, reader, writer = await self._client.stream_open(peer_id, (handler_name,))
|
|
|
|
+ self.persistent_streams[(peer_id, handler_name)] = (stream_info, reader, writer)
|
|
|
|
+
|
|
|
|
+ return stream_info, reader, writer
|
|
|
|
+
|
|
async def call_unary_handler(self, peer_id: PeerID, handler_name: str,
|
|
async def call_unary_handler(self, peer_id: PeerID, handler_name: str,
|
|
request_protobuf: Any, response_proto_type: type) -> Any:
|
|
request_protobuf: Any, response_proto_type: type) -> Any:
|
|
stream_info, reader, writer = await self._client.stream_open(peer_id, (handler_name,))
|
|
stream_info, reader, writer = await self._client.stream_open(peer_id, (handler_name,))
|