瀏覽代碼

implement request handling

Denis Mazur 4 年之前
父節點
當前提交
914fa51926
共有 1 個文件被更改,包括 5 次插入5 次删除
  1. 5 5
      hivemind/p2p/p2p_daemon_bindings/control.py

+ 5 - 5
hivemind/p2p/p2p_daemon_bindings/control.py

@@ -87,7 +87,7 @@ class ControlClient:
         self.pending_messages: asyncio.Queue[p2pd_pb.Request] = asyncio.Queue()
         self.pending_messages: asyncio.Queue[p2pd_pb.Request] = asyncio.Queue()
         self.pending_calls: Dict[CallID, asyncio.Future] = {}
         self.pending_calls: Dict[CallID, asyncio.Future] = {}
 
 
-    async def read_from_persistent_conn(self, reader: asyncio.StreamReader):
+    async def _read_from_persistent_conn(self, reader: asyncio.StreamReader):
         while True:
         while True:
             resp: p2pd_pb.Response = p2pd_pb.Response()  # type: ignore
             resp: p2pd_pb.Response = p2pd_pb.Response()  # type: ignore
             await read_pbmsg_safe(reader, resp)
             await read_pbmsg_safe(reader, resp)
@@ -104,7 +104,7 @@ class ControlClient:
                     logger.debug(f"received unexpected unary call")
                     logger.debug(f"received unexpected unary call")
 
 
             elif resp.requestHandling:
             elif resp.requestHandling:
-                # asyncio.create_task(self.read)
+                asyncio.create_task(self._handle_persistent_request(resp.requestHandling))
                 pass
                 pass
 
 
     async def _handle_persistent_request(self, request):
     async def _handle_persistent_request(self, request):
@@ -123,7 +123,7 @@ class ControlClient:
         await self.pending_messages.put(
         await self.pending_messages.put(
             p2pd_pb.Request(type=p2pd_pb.Request.UNARY_RESPONSE, response=response))
             p2pd_pb.Request(type=p2pd_pb.Request.UNARY_RESPONSE, response=response))
 
 
-    async def write_to_persistent_conn(self, writer: asyncio.StreamWriter):
+    async def _write_to_persistent_conn(self, writer: asyncio.StreamWriter):
         while True:
         while True:
             msg = await self.pending_messages.get()
             msg = await self.pending_messages.get()
             await write_pbmsg(writer, msg)
             await write_pbmsg(writer, msg)
@@ -159,8 +159,8 @@ class ControlClient:
     async def _ensure_persistent_conn(self):
     async def _ensure_persistent_conn(self):
         if not self._pers_conn_open:
         if not self._pers_conn_open:
             reader, writer = await self.daemon_connector.open_persistent_connection()
             reader, writer = await self.daemon_connector.open_persistent_connection()
-            asyncio.create_task(self.read_from_persistent_conn(reader))
-            asyncio.create_task(self.write_to_persistent_conn(writer))
+            asyncio.create_task(self._read_from_persistent_conn(reader))
+            asyncio.create_task(self._write_to_persistent_conn(writer))
 
 
     async def add_unary_handler(self, proto: str, handler: TUnaryHandler):
     async def add_unary_handler(self, proto: str, handler: TUnaryHandler):
         await self._ensure_persistent_conn()
         await self._ensure_persistent_conn()