|
@@ -115,32 +115,37 @@ class ControlClient:
|
|
|
self._write_task.cancel()
|
|
|
|
|
|
async def _read_from_persistent_conn(self, reader: asyncio.StreamReader):
|
|
|
- while True:
|
|
|
- resp = p2pd_pb.PCResponse()
|
|
|
- await read_pbmsg_safe(reader, resp)
|
|
|
+ with closing(reader):
|
|
|
+ while True:
|
|
|
+ resp = p2pd_pb.PCResponse()
|
|
|
+ await read_pbmsg_safe(reader, resp)
|
|
|
+
|
|
|
+ print("received response:", resp)
|
|
|
|
|
|
- call_id = uuid.UUID(bytes=resp.callId)
|
|
|
+ call_id = uuid.UUID(bytes=resp.callId)
|
|
|
|
|
|
- if resp.HasField("callUnaryResponse"):
|
|
|
- if call_id in self.pending_calls and resp.callUnaryResponse.HasField("result"):
|
|
|
- self.pending_calls[call_id].set_result(resp.callUnaryResponse.result)
|
|
|
- elif call_id in self.pending_calls and resp.callUnaryResponse.HasField("error"):
|
|
|
- remote_exc = P2PHandlerError(resp.callUnaryResponse.error.decode())
|
|
|
- self.pending_calls[call_id].set_exception(remote_exc)
|
|
|
- else:
|
|
|
- logger.debug(f"received unexpected unary call")
|
|
|
+ if resp.HasField("callUnaryResponse"):
|
|
|
+ print("HASHSAHAS")
|
|
|
+ if call_id in self.pending_calls and resp.callUnaryResponse.HasField("response"):
|
|
|
+ self.pending_calls[call_id].set_result(resp.callUnaryResponse.response)
|
|
|
+ elif call_id in self.pending_calls and resp.callUnaryResponse.HasField("error"):
|
|
|
+ remote_exc = P2PHandlerError(resp.callUnaryResponse.error.decode())
|
|
|
+ self.pending_calls[call_id].set_exception(remote_exc)
|
|
|
+ else:
|
|
|
+ logger.debug(f"received unexpected unary call")
|
|
|
|
|
|
- elif resp.HasField("requestHandling"):
|
|
|
- handler_task = asyncio.create_task(self._handle_persistent_request(call_id, resp.requestHandling))
|
|
|
- self.handler_tasks[call_id] = handler_task
|
|
|
+ elif resp.HasField("requestHandling"):
|
|
|
+ handler_task = asyncio.create_task(self._handle_persistent_request(call_id, resp.requestHandling))
|
|
|
+ self.handler_tasks[call_id] = handler_task
|
|
|
|
|
|
- elif call_id in self.handler_tasks and resp.HasField("cancel"):
|
|
|
- self.handler_tasks[call_id].cancel()
|
|
|
+ elif call_id in self.handler_tasks and resp.HasField("cancel"):
|
|
|
+ self.handler_tasks[call_id].cancel()
|
|
|
|
|
|
async def _write_to_persistent_conn(self, writer: asyncio.StreamWriter):
|
|
|
with closing(writer):
|
|
|
while True:
|
|
|
msg = await self.pending_messages.get()
|
|
|
+ print("writing to persicon:", msg)
|
|
|
await write_pbmsg(writer, msg)
|
|
|
|
|
|
async def _handle_persistent_request(self, call_id: uuid.UUID, request: p2pd_pb.CallUnaryRequest):
|
|
@@ -150,7 +155,7 @@ class ControlClient:
|
|
|
try:
|
|
|
remote_id = PeerID(request.peer)
|
|
|
response_payload: bytes = await self.unary_handlers[request.proto](request.data, remote_id)
|
|
|
- response = p2pd_pb.CallUnaryResponse(result=response_payload)
|
|
|
+ response = p2pd_pb.CallUnaryResponse(response=response_payload)
|
|
|
|
|
|
except Exception as e:
|
|
|
response = p2pd_pb.CallUnaryResponse(error=repr(e).encode())
|