浏览代码

Implement DHT.p2p property

Aleksandr Borzunov 4 年之前
父节点
当前提交
e907eb25a2
共有 1 个文件被更改,包括 19 次插入1 次删除
  1. 19 1
      hivemind/dht/__init__.py

+ 19 - 1
hivemind/dht/__init__.py

@@ -23,7 +23,8 @@ from typing import Awaitable, Callable, Iterable, List, Optional, Sequence, Type
 
 
 from multiaddr import Multiaddr
 from multiaddr import Multiaddr
 
 
-from hivemind.dht.node import DHTID, DHTNode
+from hivemind.dht.node import DHTNode
+from hivemind.p2p import P2P
 from hivemind.dht.routing import DHTKey, DHTValue, Subkey
 from hivemind.dht.routing import DHTKey, DHTValue, Subkey
 from hivemind.dht.validation import CompositeValidator, RecordValidatorBase
 from hivemind.dht.validation import CompositeValidator, RecordValidatorBase
 from hivemind.utils import DHTExpiration, MPFuture, ValueWithExpiration, await_cancelled, get_logger, switch_to_uvloop
 from hivemind.utils import DHTExpiration, MPFuture, ValueWithExpiration, await_cancelled, get_logger, switch_to_uvloop
@@ -85,6 +86,7 @@ class DHT(mp.Process):
         self.shutdown_timeout = shutdown_timeout
         self.shutdown_timeout = shutdown_timeout
         self.ready = mp.Event()
         self.ready = mp.Event()
         self.daemon = daemon
         self.daemon = daemon
+        self._p2p_replica = None
         if start:
         if start:
             self.run_in_background(await_ready=True)
             self.run_in_background(await_ready=True)
 
 
@@ -266,6 +268,22 @@ class DHT(mp.Process):
     async def _get_visible_maddrs(self, node: DHTNode, latest: bool = False) -> List[Multiaddr]:
     async def _get_visible_maddrs(self, node: DHTNode, latest: bool = False) -> List[Multiaddr]:
         return await node.get_visible_maddrs(latest=latest)
         return await node.get_visible_maddrs(latest=latest)
 
 
+    @property
+    def p2p(self) -> P2P:
+        """
+        Get a replica of a P2P instance used in the DHT process internally.
+        """
+
+        if self._p2p_replica is not None:
+            return self._p2p_replica
+
+        daemon_listen_maddr = self.run_coroutine(DHT._get_p2p_daemon_listen_maddr)
+        self._p2p_replica = P2P.replicate(daemon_listen_maddr)
+        return self._p2p_replica
+
+    async def _get_p2p_daemon_listen_maddr(self, node: DHTNode) -> Multiaddr:
+        return node.p2p.daemon_listen_maddr
+
     def __del__(self):
     def __del__(self):
         if self._parent_pid == os.getpid() and self.is_alive():
         if self._parent_pid == os.getpid() and self.is_alive():
             self.shutdown()
             self.shutdown()