فهرست منبع

Merge branch 'mpfuture-again' into test-again

justheuristic 4 سال پیش
والد
کامیت
aa1dfa4841
1فایلهای تغییر یافته به همراه6 افزوده شده و 7 حذف شده
  1. 6 7
      hivemind/utils/mpfuture.py

+ 6 - 7
hivemind/utils/mpfuture.py

@@ -53,7 +53,6 @@ class MPFuture(base.Future, Generic[ResultType]):
     :param use_lock: if True, operations with MPFuture use a global lock to prevent concurrent writes to the same pipe;
     :param use_lock: if True, operations with MPFuture use a global lock to prevent concurrent writes to the same pipe;
       If set to False, writing to this future ignores global lock, slightly improving performance, but making user
       If set to False, writing to this future ignores global lock, slightly improving performance, but making user
       responsible for avoiding concurrent set_result / set_exception calls to futures with the same process of origin.
       responsible for avoiding concurrent set_result / set_exception calls to futures with the same process of origin.
-    :param loop: if specified, overrides default asyncio event loop for the purpose of awaiting MPFuture
 
 
     :note: This is an internal primitive that is not guaranteed to work outside of hivemind applications.
     :note: This is an internal primitive that is not guaranteed to work outside of hivemind applications.
      More specifically, there are two known limitations:
      More specifically, there are two known limitations:
@@ -64,7 +63,7 @@ class MPFuture(base.Future, Generic[ResultType]):
 
 
     _initialization_lock = mp.Lock()  # global lock that prevents simultaneous initialization of two processes
     _initialization_lock = mp.Lock()  # global lock that prevents simultaneous initialization of two processes
     _update_lock = mp.Lock()  # global lock that prevents simultaneous writing of results/exceptions through same pipe
     _update_lock = mp.Lock()  # global lock that prevents simultaneous writing of results/exceptions through same pipe
-    _status_lock = mp.Lock()  # global lock that prevents simultaneous sening of status updates through same pipe
+    _status_lock = mp.Lock()  # global lock that prevents simultaneous sending of status updates through same pipe
     _process_inner_pipe: Optional[PipeEnd] = None  # a pipe that is used to read results and send status updates
     _process_inner_pipe: Optional[PipeEnd] = None  # a pipe that is used to read results and send status updates
     _process_outer_pipe: Optional[PipeEnd] = None  # a pipe that is used to send results and receive status updates
     _process_outer_pipe: Optional[PipeEnd] = None  # a pipe that is used to send results and receive status updates
     _pipe_waiter_thread: Optional[threading.Thread] = None  # process-specific thread that receives results/exceptions
     _pipe_waiter_thread: Optional[threading.Thread] = None  # process-specific thread that receives results/exceptions
@@ -146,16 +145,16 @@ class MPFuture(base.Future, Generic[ResultType]):
     def _process_updates_in_background(cls):
     def _process_updates_in_background(cls):
         pid = os.getpid()
         pid = os.getpid()
         with DefaultSelector() as selector:
         with DefaultSelector() as selector:
-            selector.register(cls._process_inner_pipe, EVENT_READ, data=cls._process_inner_pipe)
-            selector.register(cls._process_outer_pipe, EVENT_READ, data=cls._process_outer_pipe)
+            selector.register(cls._process_inner_pipe, EVENT_READ)
+            selector.register(cls._process_outer_pipe, EVENT_READ)
 
 
             while True:
             while True:
                 try:
                 try:
                     if cls._pipe_waiter_thread is not threading.current_thread():
                     if cls._pipe_waiter_thread is not threading.current_thread():
                         break  # Backend was reset, a new background thread has started
                         break  # Backend was reset, a new background thread has started
 
 
-                    pipe = next((key.data for (key, events) in selector.select()))
-                    uid, msg_type, payload = pipe.recv()
+                    (key, events), *_ = selector.select()
+                    uid, msg_type, payload = key.fileobj.recv()
                     future = None
                     future = None
                     future_ref = cls._active_futures.get(uid)
                     future_ref = cls._active_futures.get(uid)
                     if future_ref is not None:
                     if future_ref is not None:
@@ -229,7 +228,7 @@ class MPFuture(base.Future, Generic[ResultType]):
         # otherwise create a new request for synchronization
         # otherwise create a new request for synchronization
 
 
         try:
         try:
-            with MPFuture._status_lock if self._use_lock else nullcontext():
+            with MPFuture._update_lock if self._use_lock else nullcontext():
                 payload = (self._use_lock, self._process_inner_pipe)
                 payload = (self._use_lock, self._process_inner_pipe)
                 self._pipe_to_origin.send((self._uid, MessageType.STATE_REQUEST, payload))
                 self._pipe_to_origin.send((self._uid, MessageType.STATE_REQUEST, payload))
             status_updated.wait(MPFuture.SOFT_UPDATE_TIMEOUT)
             status_updated.wait(MPFuture.SOFT_UPDATE_TIMEOUT)