justheuristic 3 年之前
父節點
當前提交
b5a9a17469
共有 1 個文件被更改,包括 30 次插入10 次删除
  1. 30 10
      src/client/routing/sequence_manager.py

+ 30 - 10
src/client/routing/sequence_manager.py

@@ -22,7 +22,7 @@ logger = get_logger(__file__)
 
 
 class RemoteSequenceManager(threading.Thread):
 class RemoteSequenceManager(threading.Thread):
     """
     """
-    Sequence manager is a thread that keeps track of information on remote servers that constitute a RemoteSequential.
+    Sequence manager is a thread that keeps track of remote servers that hold the specified sequence of blocks.
     TL;DR it tells you, which peers you should ask to get a specific layer. It is used in RemoteSequential.
     TL;DR it tells you, which peers you should ask to get a specific layer. It is used in RemoteSequential.
 
 
     When created, RemoteSequenceManager looks up which servers serve necessary layers by reading from DHT.
     When created, RemoteSequenceManager looks up which servers serve necessary layers by reading from DHT.
@@ -60,26 +60,38 @@ class RemoteSequenceManager(threading.Thread):
         super().__init__(daemon=True)
         super().__init__(daemon=True)
         self.dht, self.p2p = dht, (p2p if p2p is not None else dht.replicate_p2p())
         self.dht, self.p2p = dht, (p2p if p2p is not None else dht.replicate_p2p())
         self.sequence_info = RemoteSequenceInfo.make_empty(block_uids)  # to be updated in a background thread
         self.sequence_info = RemoteSequenceInfo.make_empty(block_uids)  # to be updated in a background thread
+        self.update_period = update_period
 
 
         if routing_strategies is None:
         if routing_strategies is None:
             routing_strategies = {key: Strategy(self.sequence_info) for key, Strategy in ALL_ROUTING_STRATEGIES.items()}
             routing_strategies = {key: Strategy(self.sequence_info) for key, Strategy in ALL_ROUTING_STRATEGIES.items()}
         self.routing_strategies = routing_strategies
         self.routing_strategies = routing_strategies
-        self.last_update_time: DHTExpiration = -float("inf")
-        self.update_period = update_period
 
 
-        self._rpc_info = None  # TODO move to RemoteSequenceInfo
-        self._lock_changes = threading.Lock()  # TODO move to RemoteSequenceInfo
+        self._rpc_info = None
+        self._should_shutdown = False
+
         self.ready = threading.Event()  # whether or not you are ready to make_sequence
         self.ready = threading.Event()  # whether or not you are ready to make_sequence
-        self.update_()  # TODO replace with background thread and await ready
+        self.lock_changes = threading.Lock()  # internal lock on sequence_info and strategies
+        self.update_trigger = threading.Event()
 
 
         if start:
         if start:
             self.run_in_background()
             self.run_in_background()
 
 
     def run(self) -> None:
     def run(self) -> None:
         self.ready.set()
         self.ready.set()
-        threading.Event().wait()
 
 
-        # TODO
+        while not self._should_shutdown:
+            self.update_trigger.wait(self.update_period)
+            if self._should_shutdown:
+                logger.debug(f"{self.__class__.__name__} is shutting down")
+                break
+
+            try:
+                self.update_()
+            except Exception as e:
+                logger.exception(e)
+            self.update_trigger.clear()
+
+        logger.info(f"{self.__class__.__name__} thread exited")
 
 
     def make_sequence(
     def make_sequence(
         self,
         self,
@@ -114,7 +126,7 @@ class RemoteSequenceManager(threading.Thread):
             ix = slice(int(ix), int(ix) + 1, 1)
             ix = slice(int(ix), int(ix) + 1, 1)
 
 
         self.ready.wait()
         self.ready.wait()
-        with self._lock_changes:
+        with self.sequence_info.lock_changes:
             subseq = RemoteSequenceManager(
             subseq = RemoteSequenceManager(
                 self.dht,
                 self.dht,
                 self.block_uids[ix],
                 self.block_uids[ix],
@@ -130,7 +142,7 @@ class RemoteSequenceManager(threading.Thread):
         return subseq
         return subseq
 
 
     def update_(self):
     def update_(self):
-        with self._lock_changes:
+        with self.sequence_info.lock_changes:
             self.sequence_info.update_(self.dht)
             self.sequence_info.update_(self.dht)
             for name, strategy in self.routing_strategies.items():
             for name, strategy in self.routing_strategies.items():
                 strategy.update_()
                 strategy.update_()
@@ -184,3 +196,11 @@ class RemoteSequenceManager(threading.Thread):
             " chained forward/backward. If you have questions about the roadmap, please ping yozh@ ."
             " chained forward/backward. If you have questions about the roadmap, please ping yozh@ ."
         )
         )
         return 3
         return 3
+
+    def shutdown(self):
+        self._should_shutdown = True
+        self.update_trigger.set()
+
+    def __del__(self):
+        if self.is_alive():
+            self.shutdown()