|
@@ -84,6 +84,7 @@ class ControlClient:
|
|
|
self._pers_conn_open: bool = False
|
|
|
self.unary_handlers: Dict[str, TUnaryHandler] = {}
|
|
|
|
|
|
+ self._ensure_conn_lock = asyncio.Lock()
|
|
|
self.pending_messages: asyncio.Queue[p2pd_pb.Request] = asyncio.Queue()
|
|
|
self.pending_calls: Dict[CallID, asyncio.Future] = {}
|
|
|
|
|
@@ -156,10 +157,12 @@ class ControlClient:
|
|
|
|
|
|
async def _ensure_persistent_conn(self):
|
|
|
if not self._pers_conn_open:
|
|
|
- reader, writer = await self.daemon_connector.open_persistent_connection()
|
|
|
- asyncio.create_task(self._read_from_persistent_conn(reader))
|
|
|
- asyncio.create_task(self._write_to_persistent_conn(writer))
|
|
|
- self._pers_conn_open = True # TODO FIXME
|
|
|
+ with self._ensure_conn_lock:
|
|
|
+ if not self._pers_conn_open:
|
|
|
+ reader, writer = await self.daemon_connector.open_persistent_connection()
|
|
|
+ asyncio.create_task(self._read_from_persistent_conn(reader))
|
|
|
+ asyncio.create_task(self._write_to_persistent_conn(writer))
|
|
|
+ self._pers_conn_open = True
|
|
|
|
|
|
async def add_unary_handler(self, proto: str, handler: TUnaryHandler):
|
|
|
await self._ensure_persistent_conn()
|