|
@@ -74,7 +74,7 @@ class ControlClient:
|
|
|
DEFAULT_LISTEN_MADDR = "/unix/tmp/p2pclient.sock"
|
|
|
|
|
|
def __init__(
|
|
|
- self, daemon_connector: DaemonConnector, listen_maddr: Multiaddr = Multiaddr(DEFAULT_LISTEN_MADDR)
|
|
|
+ self, daemon_connector: DaemonConnector, listen_maddr: Multiaddr = Multiaddr(DEFAULT_LISTEN_MADDR)
|
|
|
) -> None:
|
|
|
self.listen_maddr = listen_maddr
|
|
|
self.daemon_connector = daemon_connector
|
|
@@ -108,21 +108,21 @@ class ControlClient:
|
|
|
resp: p2pd_pb.Response = p2pd_pb.Response() # type: ignore
|
|
|
await read_pbmsg_safe(reader, resp)
|
|
|
|
|
|
- if resp.callUnaryResponse:
|
|
|
+ if resp.HasField("callUnaryResponse"):
|
|
|
call_id = uuid.UUID(bytes=resp.callUnaryResponse.callId)
|
|
|
|
|
|
- if call_id in self.pending_calls and resp.data:
|
|
|
- self.pending_calls[call_id].set_result(call_id)
|
|
|
- elif call_id in self.pending_calls and resp.error:
|
|
|
- remote_exc = RemoteException(str(resp.error))
|
|
|
+ if call_id in self.pending_calls and resp.callUnaryResponse.HasField("result"):
|
|
|
+ self.pending_calls[call_id].set_result(resp.callUnaryResponse.result)
|
|
|
+ elif call_id in self.pending_calls and resp.callUnaryResponse.HasField("error"):
|
|
|
+ remote_exc = RemoteException(str(resp.callUnaryResponse.error))
|
|
|
self.pending_calls[call_id].set_exception(remote_exc)
|
|
|
else:
|
|
|
logger.debug(f"received unexpected unary call")
|
|
|
|
|
|
- elif resp.requestHandling:
|
|
|
+ elif resp.HasField("requestHandling"):
|
|
|
asyncio.create_task(self._handle_persistent_request(resp.requestHandling))
|
|
|
pass
|
|
|
-
|
|
|
+
|
|
|
async def _write_to_persistent_conn(self, writer: asyncio.StreamWriter):
|
|
|
while True:
|
|
|
msg = await self.pending_messages.get()
|
|
@@ -132,17 +132,17 @@ class ControlClient:
|
|
|
assert request.proto in self.unary_handlers
|
|
|
|
|
|
try:
|
|
|
- response_payload: bytes = self.unary_handlers[request.protocol](request.payload)
|
|
|
+ response_payload: bytes = self.unary_handlers[request.proto](request.data)
|
|
|
response = p2pd_pb.CallUnaryResponse(
|
|
|
- call_id=request.call_id,
|
|
|
- data=response_payload)
|
|
|
+ callId=request.callId,
|
|
|
+ result=response_payload)
|
|
|
except Exception as e:
|
|
|
response = p2pd_pb.CallUnaryResponse(
|
|
|
- call_id=request.call_id,
|
|
|
+ callId=request.callId,
|
|
|
error=repr(e))
|
|
|
|
|
|
await self.pending_messages.put(
|
|
|
- p2pd_pb.Request(type=p2pd_pb.Request.UNARY_RESPONSE, response=response))
|
|
|
+ p2pd_pb.Request(type=p2pd_pb.Request.SEND_RESPONSE_TO_REMOTE, sendResponseToRemote=response))
|
|
|
|
|
|
async def _handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
|
pb_stream_info = p2pd_pb.StreamInfo() # type: ignore
|
|
@@ -161,6 +161,7 @@ class ControlClient:
|
|
|
reader, writer = await self.daemon_connector.open_persistent_connection()
|
|
|
asyncio.create_task(self._read_from_persistent_conn(reader))
|
|
|
asyncio.create_task(self._write_to_persistent_conn(writer))
|
|
|
+ self._pers_conn_open = True # TODO FIXME
|
|
|
|
|
|
async def add_unary_handler(self, proto: str, handler: TUnaryHandler):
|
|
|
await self._ensure_persistent_conn()
|
|
@@ -182,7 +183,7 @@ class ControlClient:
|
|
|
peer=peer_id.to_bytes(),
|
|
|
proto=proto,
|
|
|
data=data,
|
|
|
- callId=call_id,
|
|
|
+ callId=call_id.bytes,
|
|
|
)
|
|
|
req = p2pd_pb.Request(
|
|
|
type=p2pd_pb.Request.CALL_UNARY,
|
|
@@ -252,7 +253,7 @@ class ControlClient:
|
|
|
raise_if_failed(resp)
|
|
|
|
|
|
async def stream_open(
|
|
|
- self, peer_id: PeerID, protocols: Sequence[str]
|
|
|
+ self, peer_id: PeerID, protocols: Sequence[str]
|
|
|
) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
|
|
|
reader, writer = await self.daemon_connector.open_connection()
|
|
|
|