|
@@ -305,18 +305,12 @@ class DecentralizedAverager(mp.Process, ServicerBase):
|
|
|
self._ready.set_result(None)
|
|
|
|
|
|
while True:
|
|
|
- try:
|
|
|
- await asyncio.wait_for(pipe_semaphore.acquire(), timeout=self.request_timeout)
|
|
|
- except asyncio.TimeoutError:
|
|
|
- pass
|
|
|
- if not self._inner_pipe.poll():
|
|
|
- continue
|
|
|
+ await pipe_semaphore.acquire()
|
|
|
try:
|
|
|
method, args, kwargs = self._inner_pipe.recv()
|
|
|
except (OSError, ConnectionError, RuntimeError) as e:
|
|
|
logger.exception(e)
|
|
|
- await asyncio.sleep(self.request_timeout)
|
|
|
- continue
|
|
|
+ break
|
|
|
task = asyncio.create_task(getattr(self, method)(*args, **kwargs))
|
|
|
if method == "_shutdown":
|
|
|
await task
|