|
@@ -5,7 +5,7 @@ Author: Kevin Mai-Husan Chia
|
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
|
-from contextlib import asynccontextmanager, closing
|
|
|
+from contextlib import asynccontextmanager, closing, suppress
|
|
|
from typing import AsyncIterator, Awaitable, Callable, Dict, Iterable, Optional, Sequence, Tuple
|
|
|
from uuid import UUID, uuid4
|
|
|
|
|
@@ -109,6 +109,15 @@ class ControlClient:
|
|
|
|
|
|
return control
|
|
|
|
|
|
+ def close(self) -> None:
|
|
|
+ if self._read_task is not None:
|
|
|
+ self._read_task.cancel()
|
|
|
+ if self._write_task is not None:
|
|
|
+ self._write_task.cancel()
|
|
|
+
|
|
|
+ def __del__(self):
|
|
|
+ self.close()
|
|
|
+
|
|
|
async def _handler(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
|
pb_stream_info = p2pd_pb.StreamInfo() # type: ignore
|
|
|
await read_pbmsg_safe(reader, pb_stream_info)
|
|
@@ -134,14 +143,8 @@ class ControlClient:
|
|
|
else:
|
|
|
raise ValueError(f"Protocol not supported: {protocols.protocol_with_code(proto_code)}")
|
|
|
|
|
|
- try:
|
|
|
- async with server:
|
|
|
- yield self
|
|
|
- finally:
|
|
|
- if self._read_task is not None:
|
|
|
- self._read_task.cancel()
|
|
|
- if self._write_task is not None:
|
|
|
- self._write_task.cancel()
|
|
|
+ async with server:
|
|
|
+ yield self
|
|
|
|
|
|
async def _read_from_persistent_conn(self, reader: asyncio.StreamReader):
|
|
|
while True:
|
|
@@ -172,12 +175,7 @@ class ControlClient:
|
|
|
async def _write_to_persistent_conn(self, writer: asyncio.StreamWriter):
|
|
|
with closing(writer):
|
|
|
while True:
|
|
|
- try:
|
|
|
- msg = await self._pending_messages.get()
|
|
|
- except ValueError:
|
|
|
- # Ignore "ValueError: I/O operation on closed file" because it is normal behavior
|
|
|
- # if the connection is closed by the other party
|
|
|
- break
|
|
|
+ msg = await self._pending_messages.get()
|
|
|
await write_pbmsg(writer, msg)
|
|
|
|
|
|
async def _handle_persistent_request(self, call_id: UUID, request: p2pd_pb.CallUnaryRequest):
|