|
@@ -91,6 +91,18 @@ class ControlClient:
|
|
|
self._read_task: Optional[asyncio.Task] = None
|
|
|
self._write_task: Optional[asyncio.Task] = None
|
|
|
|
|
|
+ async def _handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
|
+ pb_stream_info = p2pd_pb.StreamInfo() # type: ignore
|
|
|
+ await read_pbmsg_safe(reader, pb_stream_info)
|
|
|
+ stream_info = StreamInfo.from_protobuf(pb_stream_info)
|
|
|
+ try:
|
|
|
+ handler = self.handlers[stream_info.proto]
|
|
|
+ except KeyError as e:
|
|
|
+ # should never enter here... daemon should reject the stream for us.
|
|
|
+ writer.close()
|
|
|
+ raise DispatchFailure(e)
|
|
|
+ await handler(stream_info, reader, writer)
|
|
|
+
|
|
|
@asynccontextmanager
|
|
|
async def listen(self) -> AsyncIterator["ControlClient"]:
|
|
|
proto_code = parse_conn_protocol(self.listen_maddr)
|
|
@@ -163,19 +175,7 @@ class ControlClient:
|
|
|
)
|
|
|
self._handler_tasks.pop(call_id)
|
|
|
|
|
|
- async def _handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
|
- pb_stream_info = p2pd_pb.StreamInfo() # type: ignore
|
|
|
- await read_pbmsg_safe(reader, pb_stream_info)
|
|
|
- stream_info = StreamInfo.from_protobuf(pb_stream_info)
|
|
|
- try:
|
|
|
- handler = self.handlers[stream_info.proto]
|
|
|
- except KeyError as e:
|
|
|
- # should never enter here... daemon should reject the stream for us.
|
|
|
- writer.close()
|
|
|
- raise DispatchFailure(e)
|
|
|
- await handler(stream_info, reader, writer)
|
|
|
-
|
|
|
- async def _send_call_cancel(self, call_id: uuid.UUID):
|
|
|
+ async def _cancel_unary_call(self, call_id: uuid.UUID):
|
|
|
await self._pending_messages.put(
|
|
|
p2pd_pb.PersistentConnectionRequest(
|
|
|
callId=call_id.bytes,
|
|
@@ -228,7 +228,7 @@ class ControlClient:
|
|
|
return await self._pending_calls[call_id]
|
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
- await self._send_call_cancel(call_id)
|
|
|
+ await self._cancel_unary_call(call_id)
|
|
|
raise
|
|
|
|
|
|
finally:
|