Browse Source

use separate reader in queue

Alexander Borzunov 4 năm trước cách đây
mục cha
commit
7bd26fae7d
1 tập tin đã thay đổi với 33 bổ sung8 xóa
  1. 33 8
      hivemind/utils/mpfuture.py

+ 33 - 8
hivemind/utils/mpfuture.py

@@ -7,6 +7,7 @@ import multiprocessing as mp
 import os
 import os
 import threading
 import threading
 import uuid
 import uuid
+from queue import SimpleQueue
 from weakref import ref
 from weakref import ref
 from enum import Enum, auto
 from enum import Enum, auto
 from typing import Generic, TypeVar, Dict, Optional, Any, Callable, Type, Tuple
 from typing import Generic, TypeVar, Dict, Optional, Any, Callable, Type, Tuple
@@ -64,7 +65,9 @@ class MPFuture(base.Future, Generic[ResultType]):
     _initialization_lock = mp.Lock()  # global lock that prevents simultaneous initialization of two processes
     _initialization_lock = mp.Lock()  # global lock that prevents simultaneous initialization of two processes
     _update_lock = mp.Lock()  # global lock that prevents simultaneous writing to the same pipe
     _update_lock = mp.Lock()  # global lock that prevents simultaneous writing to the same pipe
     _process_wide_pipe: Optional[PipeEnd] = None  # a pipe that is used to send results/exceptions to this process
     _process_wide_pipe: Optional[PipeEnd] = None  # a pipe that is used to send results/exceptions to this process
-    _pipe_waiter_thread: Optional[threading.Thread] = None  # process-specific thread that receives results/exceptions
+    _pending_updates: Optional[SimpleQueue] = None  # a queue of updates to be processed by background thread
+    _update_reading_thread: Optional[threading.Thread] = None  # process-specific thread that reads updates from pipe
+    _update_processing_thread: Optional[threading.Thread] = None  # process-specific thread that processes updates
     _active_futures: Optional[Dict[UID, Type[ref][MPFuture]]] = None  # non-done futures originated from this process
     _active_futures: Optional[Dict[UID, Type[ref][MPFuture]]] = None  # non-done futures originated from this process
     _status_requests: Optional[Dict[UID, Tuple[MPFuture, threading.Event]]] = None  # futures to be updated by origin
     _status_requests: Optional[Dict[UID, Tuple[MPFuture, threading.Event]]] = None  # futures to be updated by origin
     _active_pid: Optional[PID] = None  # pid of currently active process; used to handle forks natively
     _active_pid: Optional[PID] = None  # pid of currently active process; used to handle forks natively
@@ -118,13 +121,21 @@ class MPFuture(base.Future, Generic[ResultType]):
                     logger.debug(f"Initializing MPFuture backend for pid {pid}")
                     logger.debug(f"Initializing MPFuture backend for pid {pid}")
                     receiver_pipe, cls._process_wide_pipe = mp.Pipe(duplex=False)
                     receiver_pipe, cls._process_wide_pipe = mp.Pipe(duplex=False)
                     cls._active_pid, cls._active_futures, cls._status_requests = pid, {}, {}
                     cls._active_pid, cls._active_futures, cls._status_requests = pid, {}, {}
-                    cls._pipe_waiter_thread = threading.Thread(
-                        target=cls._process_updates_in_background,
+                    cls._pending_updates = SimpleQueue()
+
+                    cls._update_reading_thread = threading.Thread(
+                        target=cls._read_updates_in_background,
                         args=[receiver_pipe],
                         args=[receiver_pipe],
-                        name=f"{__name__}.BACKEND",
+                        name=f"{__name__}.READER",
+                        daemon=True,
+                    )
+                    cls._update_reading_thread.start()
+                    cls._update_processing_thread = threading.Thread(
+                        target=cls._process_updates_in_background,
+                        name=f"{__name__}.PROCESSOR",
                         daemon=True,
                         daemon=True,
                     )
                     )
-                    cls._pipe_waiter_thread.start()
+                    cls._update_processing_thread.start()
 
 
     @classmethod
     @classmethod
     def reset_backend(cls):
     def reset_backend(cls):
@@ -140,14 +151,28 @@ class MPFuture(base.Future, Generic[ResultType]):
         cls._active_pid = None
         cls._active_pid = None
 
 
     @classmethod
     @classmethod
-    def _process_updates_in_background(cls, receiver_pipe: mp.connection.Connection):
+    def _read_updates_in_background(cls, receiver_pipe: mp.connection.Connection):
+        pid = os.getpid()
+        while True:
+            if cls._update_reading_thread is not threading.current_thread():
+                break  # Backend was reset, a new background thread has started
+
+            try:
+                cls._pending_updates.put(receiver_pipe.recv())
+            except (BrokenPipeError, EOFError, ConnectionError):
+                logger.debug(f"Update pipe was was shut down unexpectedly (pid={pid})")
+            except Exception as e:
+                logger.exception(f"Could not retrieve update: caught {repr(e)} (pid={pid})")
+
+    @classmethod
+    def _process_updates_in_background(cls):
         pid = os.getpid()
         pid = os.getpid()
         while True:
         while True:
             try:
             try:
-                if cls._pipe_waiter_thread is not threading.current_thread():
+                if cls._update_processing_thread is not threading.current_thread():
                     break  # Backend was reset, a new background thread has started
                     break  # Backend was reset, a new background thread has started
 
 
-                uid, msg_type, payload = receiver_pipe.recv()
+                uid, msg_type, payload = cls._pending_updates.get()
                 future = None
                 future = None
                 future_ref = cls._active_futures.get(uid)
                 future_ref = cls._active_futures.get(uid)
                 if future_ref is not None:
                 if future_ref is not None: