# node.py

from __future__ import annotations

import asyncio
import dataclasses
import os
import random
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from functools import partial
from typing import (
    Any,
    Awaitable,
    Callable,
    Collection,
    DefaultDict,
    Dict,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
)

from multiaddr import Multiaddr
from sortedcontainers import SortedSet

from hivemind.dht.crypto import DHTRecord, RecordValidatorBase
from hivemind.dht.protocol import DHTProtocol
from hivemind.dht.routing import DHTID, BinaryDHTValue, DHTKey, DHTValue, Subkey, get_dht_time
from hivemind.dht.storage import DictionaryDHTValue
from hivemind.dht.traverse import traverse_dht
from hivemind.p2p import P2P, PeerID
from hivemind.utils import MSGPackSerializer, SerializerBase, get_logger
from hivemind.utils.auth import AuthorizerBase
from hivemind.utils.timed_storage import DHTExpiration, TimedStorage, ValueWithExpiration

logger = get_logger(__name__)

DEFAULT_NUM_WORKERS = int(os.getenv("HIVEMIND_DHT_NUM_WORKERS", 4))


class DHTNode:
    """
    Asyncio-based class that represents one DHT participant. Created via await DHTNode.create(...)
    Each DHTNode has an identifier, a local storage and access to other nodes via DHTProtocol.

    :note: Hivemind DHT is optimized to store a lot of temporary metadata that is regularly updated,
     for example, an expert heartbeat emitted by a hivemind.moe.Server responsible for that expert.
     Such metadata does not require regular maintenance by peers or persistence on shutdown.
     Instead, DHTNode is designed to rapidly send bulk data and resolve conflicts.

    Every (key, value) pair in this DHT has an expiration time - a float computed as get_dht_time() (UnixTime by default).
    DHT nodes always prefer values with a higher expiration time and may delete any value past its expiration.

    Similar to the Kademlia RPC protocol, hivemind DHT has 3 RPCs:

    * ping - request peer's identifier and update routing table (same as Kademlia PING RPC)
    * store - send several (key, value, expiration_time) pairs to the same peer (like Kademlia STORE, but in bulk)
    * find - request one or several keys, get values and expiration (if peer finds it locally) and :bucket_size: of
      nearest peers from recipient's routing table (ordered nearest-to-farthest, not including recipient itself).
      This RPC is a mixture between Kademlia FIND_NODE and FIND_VALUE with multiple keys per call.

    A DHTNode follows the following contract:

    - when asked to get(key), a node must find and return the value with the highest expiration time found across the DHT
      IF that time has not come yet. If the expiration time is smaller than current get_dht_time(), the node may return None;
    - when requested to store(key: value, expiration_time), a node must store (key => value) until expiration time
      or until DHTNode gets the same key with a greater expiration time. If a node is asked to store a key but it already
      has the same key with a newer expiration, the store will be rejected. Store returns True if accepted, False if rejected;
    - when requested to store(key: value, expiration_time, subkey=subkey), adds a sub-key to a dictionary value type.
      Dictionary values can have multiple sub-keys stored by different peers with individual expiration times. A subkey
      will be accepted to a dictionary either if there is no such sub-key or if the new subkey's expiration is later than
      the previous expiration under that subkey. See DHTProtocol.call_store for details.

    DHTNode also features several (optional) caching policies:

    - cache_locally: after GET, store the result in the node's own local cache
    - cache_nearest: after GET, send the result to this many nearest nodes that don't have that value yet (see Kademlia)
    - cache_on_store: after STORE, either save or remove that key from the node's own cache depending on store status
    - cache_refresh_before_expiry: if a value in cache was used and is about to expire, try to GET it this many seconds
      before expiration. The motivation here is that some frequent keys should always be kept in cache to avoid latency.
    - reuse_get_requests: if there are several concurrent GET requests, when one request finishes, DHTNode will attempt
      to reuse the result of this GET request for other requests with the same key. Useful for batch-parallel requests.
    """

    # fmt:off
    node_id: DHTID; is_alive: bool; peer_id: PeerID; num_replicas: int; num_workers: int; protocol: DHTProtocol
    chunk_size: int; refresh_timeout: float; cache_locally: bool; cache_nearest: int; cache_refresh_before_expiry: float
    cache_on_store: bool; reuse_get_requests: bool; pending_get_requests: DefaultDict[DHTID, SortedSet[_SearchState]]
    cache_refresh_task: Optional[asyncio.Task]; cache_refresh_evt: asyncio.Event; cache_refresh_queue: CacheRefreshQueue
    blacklist: Blacklist
    # fmt:on

    @classmethod
    async def create(
        cls,
        p2p: Optional[P2P] = None,
        node_id: Optional[DHTID] = None,
        initial_peers: Optional[Sequence[Union[Multiaddr, str]]] = None,
        bucket_size: int = 20,
        num_replicas: int = 5,
        depth_modulo: int = 5,
        parallel_rpc: Optional[int] = None,
        wait_timeout: float = 3,
        refresh_timeout: Optional[float] = None,
        bootstrap_timeout: Optional[float] = None,
        cache_locally: bool = True,
        cache_nearest: int = 1,
        cache_size=None,
        cache_refresh_before_expiry: float = 5,
        cache_on_store: bool = True,
        reuse_get_requests: bool = True,
        num_workers: int = DEFAULT_NUM_WORKERS,
        chunk_size: int = 16,
        blacklist_time: float = 5.0,
        backoff_rate: float = 2.0,
        client_mode: bool = False,
        record_validator: Optional[RecordValidatorBase] = None,
        authorizer: Optional[AuthorizerBase] = None,
        ensure_bootstrap_success: bool = True,
        strict: bool = True,
        **kwargs,
    ) -> DHTNode:
        """
        :param p2p: instance of hivemind.p2p.P2P that will be used for communication.
          If None, DHTNode will create and manage its own P2P instance with given initial_peers and
          parameters from ``kwargs``
        :param node_id: current node's DHTID for hivemind.dht, determines which keys it will store locally,
          defaults to a random id
        :param initial_peers: multiaddrs of one or more active DHT peers (if you want to join an existing DHT)
        :param bucket_size: max number of nodes in one k-bucket (k). Trying to add the (k+1)-st node will cause a bucket
          to either split in two buckets along the midpoint or reject the new node (but still save it as a replacement).
          Recommended value: k is chosen s.t. any given k nodes are very unlikely to all fail after staleness_timeout
        :param num_replicas: number of nearest nodes that will be asked to store a given key, default = bucket_size (≈k)
        :param depth_modulo: split full k-bucket if it contains root OR up to the nearest multiple of this value (≈b)
        :param parallel_rpc: maximum number of concurrent outgoing RPC requests emitted by DHTProtocol.
          Reduce this value if your RPC requests register no response despite the peer sending the response.
        :param wait_timeout: a kademlia rpc request is deemed lost if we did not receive a reply in this many seconds
        :param refresh_timeout: refresh buckets if no node from that bucket was updated in this many seconds;
          if refresh_timeout is None, DHTNode will not refresh stale buckets (which is usually okay)
        :param bootstrap_timeout: after one of the peers responds, await other peers for at most this many seconds
        :param cache_locally: if True, caches all values (stored or found) in a node-local cache
        :param cache_on_store: if True, update cache entries for a key after storing a new item for that key
        :param cache_nearest: whenever DHTNode finds a value, it will also store (cache) this value on this many of the
          nearest nodes visited by the search algorithm. Prefers nodes that are nearest to :key: but have no value yet
        :param cache_size: if specified, local cache will store up to this many records (as in LRU cache)
        :param cache_refresh_before_expiry: if nonzero, refreshes locally cached values
          if they are accessed this many seconds before expiration time.
        :param reuse_get_requests: if True, DHTNode allows only one traverse_dht procedure for every key;
          all concurrent get requests for the same key will reuse the procedure that is currently in progress
        :param num_workers: concurrent workers in traverse_dht (see traverse_dht num_workers param)
        :param chunk_size: maximum number of concurrent calls in get_many and cache refresh queue
        :param blacklist_time: excludes non-responsive peers from search for this many seconds (set 0 to disable)
        :param backoff_rate: blacklist time will be multiplied by :backoff_rate: for each successive non-response
        :param ensure_bootstrap_success: raise an error if this node could not connect to initial peers (or vice versa).
          If False, print a warning instead. It is recommended to keep this flag unless you know what you're doing.
        :param strict: if True, any error encountered in validation will interrupt the creation of DHTNode
        :param client_mode: if False (default), this node will accept incoming requests as a full DHT "citizen";
          if True, this node will refuse any incoming requests, effectively being only a client
        :param record_validator: instance of RecordValidatorBase used for signing and validating stored records
        :param authorizer: instance of AuthorizerBase used for signing and validating requests and responses
          for a given authorization protocol
        :param kwargs: extra parameters for an internally created instance of hivemind.p2p.P2P.
          Should be empty if the P2P instance is provided in the constructor
        """
        self = cls(_initialized_with_create=True)
        self.node_id = node_id if node_id is not None else DHTID.generate()
        self.num_replicas, self.num_workers, self.chunk_size = num_replicas, num_workers, chunk_size
        self.is_alive = True  # if set to False, cancels all background jobs such as routing table refresh

        self.reuse_get_requests = reuse_get_requests
        self.pending_get_requests = defaultdict(partial(SortedSet, key=lambda _res: -_res.sufficient_expiration_time))

        # caching policy
        self.refresh_timeout = refresh_timeout
        self.cache_locally, self.cache_nearest, self.cache_on_store = cache_locally, cache_nearest, cache_on_store
        self.cache_refresh_before_expiry = cache_refresh_before_expiry
        self.blacklist = Blacklist(blacklist_time, backoff_rate)
        self.cache_refresh_queue = CacheRefreshQueue()
        self.cache_refresh_evt = asyncio.Event()
        self.cache_refresh_task = None

        if p2p is None:
            if not kwargs.get("use_ipfs"):
                kwargs["initial_peers"] = initial_peers
            p2p = await P2P.create(**kwargs)
            self._should_shutdown_p2p = True
        else:
            if kwargs:
                raise ValueError(
                    f"**kwargs in DHTNode.create() should be empty if hivemind.p2p.P2P instance is provided "
                    f"in the constructor. Got kwargs = {kwargs} instead. "
                    f"You may have a typo in a DHTNode.create() parameter name"
                )
            self._should_shutdown_p2p = False
        self.p2p = p2p

        self.protocol = await DHTProtocol.create(
            p2p,
            self.node_id,
            bucket_size,
            depth_modulo,
            num_replicas,
            wait_timeout,
            parallel_rpc,
            cache_size,
            client_mode,
            record_validator,
            authorizer,
        )
        self.peer_id = p2p.peer_id

        if initial_peers:
            initial_peers = {PeerID.from_base58(Multiaddr(item)["p2p"]) for item in initial_peers}

            # stage 1: ping initial_peers, add each other to the routing table
            bootstrap_timeout = bootstrap_timeout if bootstrap_timeout is not None else wait_timeout
            start_time = get_dht_time()
            ping_tasks = set(
                asyncio.create_task(self.protocol.call_ping(peer, validate=ensure_bootstrap_success, strict=strict))
                for peer in initial_peers
            )
            finished_pings, unfinished_pings = await asyncio.wait(ping_tasks, return_when=asyncio.FIRST_COMPLETED)

            # stage 2: gather remaining peers (those who respond within bootstrap_timeout)
            if unfinished_pings:
                finished_in_time, stragglers = await asyncio.wait(
                    unfinished_pings, timeout=bootstrap_timeout - get_dht_time() + start_time
                )
                for straggler in stragglers:
                    straggler.cancel()
                finished_pings |= finished_in_time

            if not finished_pings or all(ping.result() is None for ping in finished_pings):
                message = "DHTNode bootstrap failed: none of the initial_peers responded to a ping."
                if ensure_bootstrap_success:
                    raise RuntimeError(f"{message} (set ensure_bootstrap_success=False to ignore)")
                else:
                    logger.warning(message)

            if strict:
                for task in asyncio.as_completed(finished_pings):
                    await task  # propagate exceptions

            # stage 3: traverse dht to find my own nearest neighbors and populate the routing table
            # ... maybe receive some values that we are meant to store (see protocol.update_routing_table)
            # note: using asyncio.wait instead of wait_for because wait_for cancels the task on timeout
            await asyncio.wait(
                [
                    asyncio.create_task(self.find_nearest_nodes([self.node_id])),
                    asyncio.create_task(asyncio.sleep(bootstrap_timeout - get_dht_time() + start_time)),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )

        if self.refresh_timeout is not None:
            asyncio.create_task(self._refresh_routing_table(period=self.refresh_timeout))
        return self
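
    # Usage sketch (editor's illustration, not part of the original module): create a node and join an
    # existing DHT. The multiaddr below is a placeholder; substitute a reachable bootstrap peer.
    #
    #   node = await DHTNode.create(
    #       initial_peers=["/ip4/203.0.113.7/tcp/31337/p2p/QmExamplePeerID"],  # placeholder address
    #       cache_locally=True,
    #       ensure_bootstrap_success=True,
    #   )
    #   ...  # use node.store(...) / node.get(...)
    #   await node.shutdown()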

    def __init__(self, *, _initialized_with_create=False):
        """Internal init method. Please use DHTNode.create coroutine to spawn new node instances"""
        assert _initialized_with_create, "Please use DHTNode.create coroutine to spawn new node instances"
        super().__init__()

    async def shutdown(self):
        """Process existing requests, close all connections and stop the server"""
        self.is_alive = False
        if self._should_shutdown_p2p:
            await self.p2p.shutdown()

    async def find_nearest_nodes(
        self,
        queries: Collection[DHTID],
        k_nearest: Optional[int] = None,
        beam_size: Optional[int] = None,
        num_workers: Optional[int] = None,
        node_to_peer_id: Optional[Dict[DHTID, PeerID]] = None,
        exclude_self: bool = False,
        **kwargs,
    ) -> Dict[DHTID, Dict[DHTID, PeerID]]:
        """
        :param queries: find k nearest nodes for each of these DHTIDs
        :param k_nearest: return this many nearest nodes for every query (if there are enough nodes)
        :param beam_size: replacement for self.beam_size, see traverse_dht beam_size param
        :param num_workers: replacement for self.num_workers, see traverse_dht num_workers param
        :param node_to_peer_id: if specified, uses this dict[node_id => peer_id] as initial peers
        :param exclude_self: if True, nearest nodes will not contain self.node_id (default = use local peers)
        :param kwargs: additional params passed to traverse_dht
        :returns: for every query, return nearest peers as an ordered dict[peer DHTID -> network PeerID], nearest-first
        """
        queries = tuple(queries)
        k_nearest = k_nearest if k_nearest is not None else self.protocol.bucket_size
        num_workers = num_workers if num_workers is not None else self.num_workers
        beam_size = beam_size if beam_size is not None else max(self.protocol.bucket_size, k_nearest)
        if k_nearest > beam_size:
            logger.warning("beam_size is too small, beam search is not guaranteed to find enough nodes")
        if node_to_peer_id is None:
            node_to_peer_id: Dict[DHTID, PeerID] = dict()
            for query in queries:
                neighbors = self.protocol.routing_table.get_nearest_neighbors(query, beam_size, exclude=self.node_id)
                node_to_peer_id.update(self._filter_blacklisted(dict(neighbors)))

        async def get_neighbors(peer: DHTID, queries: Collection[DHTID]) -> Dict[DHTID, Tuple[Tuple[DHTID], bool]]:
            response = await self._call_find_with_blacklist(node_to_peer_id[peer], queries)
            if not response:
                return {query: ([], False) for query in queries}

            output: Dict[DHTID, Tuple[Tuple[DHTID], bool]] = {}
            for query, (_, peers) in response.items():
                node_to_peer_id.update(peers)
                output[query] = tuple(peers.keys()), False  # False means "do not interrupt search"
            return output

        nearest_nodes_per_query, visited_nodes = await traverse_dht(
            queries,
            initial_nodes=list(node_to_peer_id),
            beam_size=beam_size,
            num_workers=num_workers,
            queries_per_call=int(len(queries) ** 0.5),
            get_neighbors=get_neighbors,
            visited_nodes={query: {self.node_id} for query in queries},
            **kwargs,
        )

        nearest_nodes_with_peer_ids = {}
        for query, nearest_nodes in nearest_nodes_per_query.items():
            if not exclude_self:
                nearest_nodes = sorted(nearest_nodes + [self.node_id], key=query.xor_distance)
                node_to_peer_id[self.node_id] = self.peer_id
            nearest_nodes_with_peer_ids[query] = {node: node_to_peer_id[node] for node in nearest_nodes[:k_nearest]}
        return nearest_nodes_with_peer_ids
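
    # Illustration (editor's sketch, based on the docstring above): querying the node's own ID returns,
    # per query, an ordered dict of {peer DHTID -> PeerID}, nearest-first.
    #
    #   nearest = await node.find_nearest_nodes([node.node_id], k_nearest=5)
    #   for dht_id, peer_id in nearest[node.node_id].items():
    #       print(dht_id, peer_id)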

    async def store(
        self, key: DHTKey, value: DHTValue, expiration_time: DHTExpiration, subkey: Optional[Subkey] = None, **kwargs
    ) -> bool:
        """
        Find num_replicas best nodes to store (key, value) and store it there at least until expiration time.

        :note: store is a simplified interface to store_many, all kwargs are forwarded there
        :returns: True if store succeeds, False if it fails (due to no response or newer value)
        """
        store_ok = await self.store_many([key], [value], [expiration_time], subkeys=[subkey], **kwargs)
        return store_ok[(key, subkey) if subkey is not None else key]
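
    # Illustration (editor's sketch of the contract in the class docstring; key and value are placeholders):
    # a store is rejected if replicas already hold the same key with a newer expiration, and a later get()
    # prefers the value with the highest expiration time.
    #
    #   ok = await node.store("expert.42", value={"host": "worker-1"}, expiration_time=get_dht_time() + 60)
    #   found = await node.get("expert.42")  # ValueWithExpiration(value=..., expiration_time=...) or None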

    async def store_many(
        self,
        keys: List[DHTKey],
        values: List[DHTValue],
        expiration_time: Union[DHTExpiration, List[DHTExpiration]],
        subkeys: Optional[Union[Subkey, List[Optional[Subkey]]]] = None,
        exclude_self: bool = False,
        await_all_replicas=True,
        **kwargs,
    ) -> Dict[DHTKey, bool]:
        """
        Traverse DHT to find up to :num_replicas: best nodes to store multiple (key, value, expiration_time) pairs.

        :param keys: arbitrary serializable keys associated with each value
        :param values: serializable "payload" for each key
        :param expiration_time: either one expiration time for all keys or individual expiration times (see class doc)
        :param subkeys: an optional list of the same shape as keys. If specified, stores the corresponding values
          under these sub-keys of a dictionary value (see class docstring on dictionary values)
        :param kwargs: any additional parameters passed to traverse_dht function (e.g. num workers)
        :param exclude_self: if True, never store value locally even if you are one of the nearest nodes
        :note: if exclude_self is True and self.cache_locally == True, value will still be __cached__ locally
        :param await_all_replicas: if False, this function returns after the first store_ok and proceeds in background;
          if True, the function will wait for num_replicas successful stores or until it runs out of beam_size nodes
        :returns: for each key: True if store succeeds, False if it fails (due to no response or newer value)
        """
        if isinstance(expiration_time, DHTExpiration):
            expiration_time = [expiration_time] * len(keys)
        if subkeys is None:
            subkeys = [None] * len(keys)

        assert (
            len(keys) == len(subkeys) == len(values) == len(expiration_time)
        ), "Lengths of keys, values, subkeys and expiration timestamps must match."
        key_id_to_data: DefaultDict[DHTID, List[Tuple[DHTKey, Subkey, DHTValue, DHTExpiration]]] = defaultdict(list)
        for key, subkey, value, expiration in zip(keys, subkeys, values, expiration_time):
            key_id_to_data[DHTID.generate(source=key)].append((key, subkey, value, expiration))

        unfinished_key_ids = set(key_id_to_data.keys())  # use this set to ensure that each store request is finished
        store_ok = {(key, subkey): None for key, subkey in zip(keys, subkeys)}  # outputs, updated during search
        store_finished_events = {(key, subkey): asyncio.Event() for key, subkey in zip(keys, subkeys)}

        # pre-populate node_to_peer_id
        node_to_peer_id: Dict[DHTID, PeerID] = dict()
        for key_id in unfinished_key_ids:
            node_to_peer_id.update(
                self.protocol.routing_table.get_nearest_neighbors(
                    key_id, self.protocol.bucket_size, exclude=self.node_id
                )
            )

        async def on_found(key_id: DHTID, nearest_nodes: List[DHTID], visited_nodes: Set[DHTID]) -> None:
            """This will be called once per key when find_nearest_nodes is done for a particular node"""
            # note: we use callbacks instead of returned values to call store immediately without waiting for stragglers
            assert key_id in unfinished_key_ids, "Internal error: traverse_dht finished the same query twice"
            assert self.node_id not in nearest_nodes
            unfinished_key_ids.remove(key_id)

            # ensure k nodes stored the value, optionally include self.node_id as a candidate
            num_successful_stores = 0
            pending_store_tasks = set()
            store_candidates = sorted(
                nearest_nodes + ([] if exclude_self else [self.node_id]), key=key_id.xor_distance, reverse=True
            )  # ordered so that .pop() returns the nearest candidate
            [original_key, *_], current_subkeys, current_values, current_expirations = zip(*key_id_to_data[key_id])

            key_bytes = key_id.to_bytes()
            binary_values = []
            stored_records = []
            for subkey, value, expiration_time in zip(current_subkeys, current_values, current_expirations):
                subkey_bytes = self.protocol.serializer.dumps(subkey)
                value_bytes = self.protocol.serializer.dumps(value)
                record = DHTRecord(key_bytes, subkey_bytes, value_bytes, expiration_time)
                if self.protocol.record_validator is not None:
                    value_bytes = self.protocol.record_validator.sign_value(record)
                    record = dataclasses.replace(record, value=value_bytes)
                binary_values.append(value_bytes)
                stored_records.append(record)

            while num_successful_stores < self.num_replicas and (store_candidates or pending_store_tasks):
                while store_candidates and num_successful_stores + len(pending_store_tasks) < self.num_replicas:
                    node_id: DHTID = store_candidates.pop()  # nearest untried candidate

                    if node_id == self.node_id:
                        num_successful_stores += 1
                        for subkey, record in zip(current_subkeys, stored_records):
                            if self.protocol.record_validator is None or self.protocol.record_validator.validate(
                                record
                            ):
                                store_ok[original_key, subkey] = self.protocol.storage.store(
                                    key_id, record.value, record.expiration_time, subkey=subkey
                                )
                            else:
                                store_ok[original_key, subkey] = False
                            if not await_all_replicas:
                                store_finished_events[original_key, subkey].set()
                    else:
                        pending_store_tasks.add(
                            asyncio.create_task(
                                self.protocol.call_store(
                                    node_to_peer_id[node_id],
                                    keys=[key_id] * len(current_values),
                                    values=binary_values,
                                    expiration_time=current_expirations,
                                    subkeys=current_subkeys,
                                )
                            )
                        )

                # await the nearest task. If it fails, dispatch more on the next iteration
                if pending_store_tasks:
                    finished_store_tasks, pending_store_tasks = await asyncio.wait(
                        pending_store_tasks, return_when=asyncio.FIRST_COMPLETED
                    )
                    for task in finished_store_tasks:
                        if task.result() is not None:
                            num_successful_stores += 1
                            for subkey, store_status in zip(current_subkeys, task.result()):
                                store_ok[original_key, subkey] = store_status
                                if not await_all_replicas:
                                    store_finished_events[original_key, subkey].set()

            if self.cache_on_store:
                self._update_cache_on_store(
                    key_id,
                    current_subkeys,
                    binary_values,
                    current_expirations,
                    store_ok=[store_ok[original_key, subkey] for subkey in current_subkeys],
                )

            for subkey, value_bytes, expiration in zip(current_subkeys, binary_values, current_expirations):
                store_finished_events[original_key, subkey].set()

        store_task = asyncio.create_task(
            self.find_nearest_nodes(
                queries=set(unfinished_key_ids),
                k_nearest=self.num_replicas,
                node_to_peer_id=node_to_peer_id,
                found_callback=on_found,
                exclude_self=exclude_self,
                **kwargs,
            )
        )
        try:
            await asyncio.gather(store_task, *(evt.wait() for evt in store_finished_events.values()))
            assert len(unfinished_key_ids) == 0, "Internal error: traverse_dht didn't finish search"
            return {
                (key, subkey) if subkey is not None else key: status or False
                for (key, subkey), status in store_ok.items()
            }
        except asyncio.CancelledError as e:
            store_task.cancel()
            raise e
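
    # Illustration (editor's sketch; keys, subkeys and values are placeholders): storing under subkeys
    # builds a dictionary value as described in the class docstring, with one expiration per subkey.
    #
    #   now = get_dht_time()
    #   ok = await node.store_many(
    #       keys=["run_metrics", "run_metrics"],
    #       values=[0.17, 0.23],
    #       expiration_time=[now + 30, now + 60],
    #       subkeys=["worker_a", "worker_b"],
    #   )
    #   # on success: ok == {("run_metrics", "worker_a"): True, ("run_metrics", "worker_b"): True}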

    def _update_cache_on_store(
        self,
        key_id: DHTID,
        subkeys: List[Subkey],
        binary_values: List[bytes],
        expirations: List[DHTExpiration],
        store_ok: List[bool],
    ):
        """Update local cache after finishing a store for one key (with perhaps several subkeys)"""
        store_succeeded = any(store_ok)
        is_dictionary = any(subkey is not None for subkey in subkeys)
        if store_succeeded and not is_dictionary:  # stored a new regular value, cache it!
            stored_expiration, stored_value_bytes = max(zip(expirations, binary_values))
            self.protocol.cache.store(key_id, stored_value_bytes, stored_expiration)
        elif not store_succeeded and not is_dictionary:  # store rejected, check if local cache is also obsolete
            rejected_expiration, rejected_value = max(zip(expirations, binary_values))
            cached_value = self.protocol.cache.get(key_id)
            if (
                cached_value is not None and cached_value.expiration_time <= rejected_expiration
            ):  # cache would be rejected
                self._schedule_for_refresh(key_id, refresh_time=get_dht_time())  # fetch new key in background (asap)
        elif is_dictionary and key_id in self.protocol.cache:  # there can be other keys and we should update
            for subkey, stored_value_bytes, expiration_time, accepted in zip(
                subkeys, binary_values, expirations, store_ok
            ):
                if accepted:
                    self.protocol.cache.store_subkey(key_id, subkey, stored_value_bytes, expiration_time)
            self._schedule_for_refresh(key_id, refresh_time=get_dht_time())  # fetch new key in background (asap)

    async def get(self, key: DHTKey, latest=False, **kwargs) -> Optional[ValueWithExpiration[DHTValue]]:
        """
        Search for a key across the DHT and return either the first or the latest entry (if found).

        :param key: same key as in node.store(...)
        :param latest: if True, finds the latest value, otherwise finds any non-expired value (which is much faster)
        :param kwargs: parameters forwarded to get_many_by_id
        :returns: (value, expiration time); if the value was not found, returns None
        """
        if latest:
            kwargs["sufficient_expiration_time"] = float("inf")
        result = await self.get_many([key], **kwargs)
        return result[key]

    async def get_many(
        self, keys: Collection[DHTKey], sufficient_expiration_time: Optional[DHTExpiration] = None, **kwargs
    ) -> Dict[
        DHTKey, Union[Optional[ValueWithExpiration[DHTValue]], Awaitable[Optional[ValueWithExpiration[DHTValue]]]]
    ]:
        """
        Traverse DHT to find a list of keys. For each key, return the latest (value, expiration) or None if not found.

        :param keys: traverse the DHT and find the value for each of these keys (or None if the key is not found)
        :param sufficient_expiration_time: if the search finds a value that expires after this time, it may stop
          searching and return that value right away; default = time of call, i.e. find any value that did not expire
          by the time of call. If sufficient_expiration_time=float('inf'), this method will find the value with the
          _latest_ expiration
        :param kwargs: for a full list of parameters, see DHTNode.get_many_by_id
        :returns: for each key: value and its expiration time. If nothing is found, returns None for that key
        :note: in order to check if get returned a value, please check if (expiration_time is None)
        """
        keys = tuple(keys)
        key_ids = [DHTID.generate(key) for key in keys]
        id_to_original_key = dict(zip(key_ids, keys))
        results_by_id = await self.get_many_by_id(key_ids, sufficient_expiration_time, **kwargs)
        return {id_to_original_key[key]: result_or_future for key, result_or_future in results_by_id.items()}

    async def get_many_by_id(
        self,
        key_ids: Collection[DHTID],
        sufficient_expiration_time: Optional[DHTExpiration] = None,
        num_workers: Optional[int] = None,
        beam_size: Optional[int] = None,
        return_futures: bool = False,
        _is_refresh=False,
    ) -> Dict[
        DHTID, Union[Optional[ValueWithExpiration[DHTValue]], Awaitable[Optional[ValueWithExpiration[DHTValue]]]]
    ]:
        """
        Traverse DHT to find a list of DHTIDs. For each key, return the latest (value, expiration) or None if not found.

        :param key_ids: traverse the DHT and find the value for each of these keys (or None if the key is not found)
        :param sufficient_expiration_time: if the search finds a value that expires after this time, it may stop
          searching and return that value right away; default = time of call, i.e. find any value that did not expire
          by the time of call. If sufficient_expiration_time=float('inf'), this method will find the value with the
          _latest_ expiration
        :param beam_size: maintains up to this many nearest nodes when crawling the dht, default beam_size = bucket_size
        :param num_workers: override for default num_workers, see traverse_dht num_workers param
        :param return_futures: if True, immediately return an asyncio.Future for every key before interacting with
          the network. The algorithm will populate these futures with (value, expiration) when it finds the
          corresponding key. Note: canceling a future will stop the search for the corresponding key
        :param _is_refresh: internal flag, set to True by the internal cache refresher (if enabled)
        :returns: for each key: value and its expiration time. If nothing is found, returns None for that key
        :note: in order to check if get returned a value, please check if (expiration_time is None)
        """
        sufficient_expiration_time = sufficient_expiration_time or get_dht_time()
        beam_size = beam_size if beam_size is not None else self.protocol.bucket_size
        num_workers = num_workers if num_workers is not None else self.num_workers
        search_results: Dict[DHTID, _SearchState] = {
            key_id: _SearchState(
                key_id,
                sufficient_expiration_time,
                serializer=self.protocol.serializer,
                record_validator=self.protocol.record_validator,
            )
            for key_id in key_ids
        }

        if not _is_refresh:  # if we're already refreshing cache, there's no need to trigger subsequent refreshes
            for key_id in key_ids:
                search_results[key_id].add_done_callback(self._trigger_cache_refresh)

        # if we have concurrent get requests for some of the same keys, subscribe to their results
        if self.reuse_get_requests:
            for key_id, search_result in search_results.items():
                self.pending_get_requests[key_id].add(search_result)
                search_result.add_done_callback(self._reuse_finished_search_result)

        # stage 1: check for value in this node's local storage and cache
        for key_id in key_ids:
            search_results[key_id].add_candidate(self.protocol.storage.get(key_id), source_node_id=self.node_id)
            if not _is_refresh:
                search_results[key_id].add_candidate(self.protocol.cache.get(key_id), source_node_id=self.node_id)

        # stage 2: traverse the DHT to get the remaining keys from remote peers
        unfinished_key_ids = [key_id for key_id in key_ids if not search_results[key_id].finished]
        node_to_peer_id: Dict[DHTID, PeerID] = dict()  # global routing table for all keys
        for key_id in unfinished_key_ids:
            node_to_peer_id.update(
                self.protocol.routing_table.get_nearest_neighbors(
                    key_id, self.protocol.bucket_size, exclude=self.node_id
                )
            )

        # V-- this function will be called every time traverse_dht decides to request neighbors from a remote peer
        async def get_neighbors(peer: DHTID, queries: Collection[DHTID]) -> Dict[DHTID, Tuple[Tuple[DHTID], bool]]:
            queries = list(queries)
            response = await self._call_find_with_blacklist(node_to_peer_id[peer], queries)
            if not response:
                return {query: ([], False) for query in queries}

            output: Dict[DHTID, Tuple[Tuple[DHTID], bool]] = {}
            for key_id, (maybe_value_with_expiration, peers) in response.items():
                node_to_peer_id.update(peers)
                search_results[key_id].add_candidate(maybe_value_with_expiration, source_node_id=peer)
                output[key_id] = tuple(peers.keys()), search_results[key_id].finished
                # note: we interrupt the search if the key is either found or finished otherwise (e.g. cancelled by user)
            return output

        # V-- this function will be called exactly once when traverse_dht finishes search for a given key
        async def found_callback(key_id: DHTID, nearest_nodes: List[DHTID], _visited: Set[DHTID]):
            search_results[key_id].finish_search()  # finish the search whether or not we found something
            self._cache_new_result(search_results[key_id], nearest_nodes, node_to_peer_id, _is_refresh=_is_refresh)

        asyncio.create_task(
            traverse_dht(
                queries=list(unfinished_key_ids),
                initial_nodes=list(node_to_peer_id),
                beam_size=beam_size,
                num_workers=num_workers,
                queries_per_call=min(int(len(unfinished_key_ids) ** 0.5), self.chunk_size),
                get_neighbors=get_neighbors,
                visited_nodes={key_id: {self.node_id} for key_id in unfinished_key_ids},
                found_callback=found_callback,
                await_all_tasks=False,
            )
        )

        if return_futures:
            return {key_id: search_result.future for key_id, search_result in search_results.items()}
        else:
            try:
                # note: this should be the first await in this function, so there's no need to "try" the entire function
                return {key_id: await search_result.future for key_id, search_result in search_results.items()}
            except asyncio.CancelledError as e:  # terminate remaining tasks ASAP
                for key_id, search_result in search_results.items():
                    search_result.future.cancel()
                raise e
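
    # Illustration (editor's sketch of the return_futures mode documented above; key_ids is a placeholder
    # list of DHTIDs): futures resolve as keys are found; cancelling a future stops that key's search.
    #
    #   futures = await node.get_many_by_id(key_ids, return_futures=True)
    #   result = await futures[key_ids[0]]  # ValueWithExpiration or None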

    def _reuse_finished_search_result(self, finished: _SearchState):
        pending_requests = self.pending_get_requests[finished.key_id]
        if finished.found_something:
            search_result = ValueWithExpiration(finished.binary_value, finished.expiration_time)
            expiration_time_threshold = max(finished.expiration_time, finished.sufficient_expiration_time)
            # note: pending_requests is sorted in the order of descending sufficient_expiration_time
            while pending_requests and expiration_time_threshold >= pending_requests[-1].sufficient_expiration_time:
                pending_requests[-1].add_candidate(search_result, source_node_id=finished.source_node_id)
                pending_requests[-1].finish_search()
                pending_requests.pop()
        else:
            pending_requests.discard(finished)

    async def _call_find_with_blacklist(self, peer_id: PeerID, keys: Collection[DHTID]):
        """Same as call_find, but skip if :peer_id: is blacklisted; also exclude blacklisted neighbors from the result"""
        if peer_id in self.blacklist:
            return None
        response = await self.protocol.call_find(peer_id, keys)
        if response:
            self.blacklist.register_success(peer_id)
            return {
                key: (maybe_value, self._filter_blacklisted(nearest_peers))
                for key, (maybe_value, nearest_peers) in response.items()
            }
        else:
            self.blacklist.register_failure(peer_id)
            return None

    def _filter_blacklisted(self, peer_ids: Dict[DHTID, PeerID]):
        return {peer: peer_id for peer, peer_id in peer_ids.items() if peer_id not in self.blacklist}

    def _trigger_cache_refresh(self, search: _SearchState):
        """Called after a get request is finished (whether it was found, not found, hit cache, cancelled, or reused)"""
        if search.found_something and search.source_node_id == self.node_id:
            if self.cache_refresh_before_expiry and search.key_id in self.protocol.cache:
                self._schedule_for_refresh(search.key_id, search.expiration_time - self.cache_refresh_before_expiry)

    def _schedule_for_refresh(self, key_id: DHTID, refresh_time: DHTExpiration):
        """Add key to the refresh queue, refresh at :refresh_time: or later"""
        if self.cache_refresh_task is None or self.cache_refresh_task.done() or self.cache_refresh_task.cancelled():
            self.cache_refresh_task = asyncio.create_task(self._refresh_stale_cache_entries())
            logger.debug("Spawned cache refresh task.")
        earliest_key, earliest_item = self.cache_refresh_queue.top()
        if earliest_item is None or refresh_time < earliest_item.expiration_time:
            self.cache_refresh_evt.set()  # if the new element is now the earliest, notify the refresh task
        self.cache_refresh_queue.store(key_id, value=refresh_time, expiration_time=refresh_time)

    async def _refresh_stale_cache_entries(self):
        """Periodically refresh near-expired keys that were accessed at least once during their previous lifetime"""
        while self.is_alive:
            while len(self.cache_refresh_queue) == 0:
                await self.cache_refresh_evt.wait()
                self.cache_refresh_evt.clear()
            key_id, (_, nearest_refresh_time) = self.cache_refresh_queue.top()

            try:
                # step 1: await until :cache_refresh_before_expiry: seconds before the earliest element expires
                time_to_wait = nearest_refresh_time - get_dht_time()
                await asyncio.wait_for(self.cache_refresh_evt.wait(), timeout=time_to_wait)
                # note: the line above will raise TimeoutError when we are ready to refresh the cache
                self.cache_refresh_evt.clear()  # no timeout error => someone added a new entry to the queue ...
                continue  # ... and this element is earlier than nearest_expiration. we should refresh this entry first
            except asyncio.TimeoutError:  # caught TimeoutError => it is time to refresh the most recent cached entry
                # step 2: find all keys that we should already refresh and remove them from the queue
                current_time = get_dht_time()
                keys_to_refresh = {key_id}
                max_expiration_time = nearest_refresh_time
                del self.cache_refresh_queue[key_id]  # we pledge to refresh this key_id in the nearest batch
                while self.cache_refresh_queue and len(keys_to_refresh) < self.chunk_size:
                    key_id, (_, nearest_refresh_time) = self.cache_refresh_queue.top()
                    if nearest_refresh_time > current_time:
                        break
                    del self.cache_refresh_queue[key_id]  # we pledge to refresh this key_id in the nearest batch
                    keys_to_refresh.add(key_id)
                    cached_item = self.protocol.cache.get(key_id)
                    if cached_item is not None and cached_item.expiration_time > max_expiration_time:
                        max_expiration_time = cached_item.expiration_time

                # step 3: search for newer versions of these keys, cache them as a side effect of self.get_many_by_id
                sufficient_expiration_time = max_expiration_time + self.cache_refresh_before_expiry + 1
                await self.get_many_by_id(keys_to_refresh, sufficient_expiration_time, _is_refresh=True)

    def _cache_new_result(
        self,
        search: _SearchState,
        nearest_nodes: List[DHTID],
        node_to_peer_id: Dict[DHTID, PeerID],
        _is_refresh: bool = False,
    ):
        """After key_id is found, update the cache according to the caching policy. Used internally in get and get_many"""
        if search.found_something:
            _, storage_expiration_time = self.protocol.storage.get(search.key_id) or (None, -float("inf"))
            _, cache_expiration_time = self.protocol.cache.get(search.key_id) or (None, -float("inf"))

            if search.expiration_time > max(storage_expiration_time, cache_expiration_time):
                if self.cache_locally or _is_refresh:
                    self.protocol.cache.store(search.key_id, search.binary_value, search.expiration_time)
                if self.cache_nearest:
                    num_cached_nodes = 0
                    for node_id in nearest_nodes:
                        if node_id == search.source_node_id:
                            continue
                        asyncio.create_task(
                            self.protocol.call_store(
                                node_to_peer_id[node_id],
                                [search.key_id],
                                [search.binary_value],
                                [search.expiration_time],
                                in_cache=True,
                            )
                        )
                        num_cached_nodes += 1
                        if num_cached_nodes >= self.cache_nearest:
                            break

    async def _refresh_routing_table(self, *, period: Optional[float]) -> None:
        """Tries to find new nodes for buckets that were unused for more than self.refresh_timeout"""
        while self.is_alive and period is not None:  # if period is None, skip refreshing; otherwise run until shutdown
            refresh_time = get_dht_time()
            staleness_threshold = refresh_time - period
            stale_buckets = [
                bucket for bucket in self.protocol.routing_table.buckets if bucket.last_updated < staleness_threshold
            ]
            for bucket in stale_buckets:
                refresh_id = DHTID(random.randint(bucket.lower, bucket.upper - 1))
                await self.find_nearest_nodes([refresh_id])

            await asyncio.sleep(max(0.0, period - (get_dht_time() - refresh_time)))

    async def get_visible_maddrs(self, latest: bool = False) -> List[Multiaddr]:
        return await self.protocol.p2p.get_visible_maddrs(latest=latest)


@dataclass(init=True, repr=True, frozen=False, order=False)
class _SearchState:
    """A helper class that stores current-best GET results with metadata"""

    key_id: DHTID
    sufficient_expiration_time: DHTExpiration
    binary_value: Optional[Union[BinaryDHTValue, DictionaryDHTValue]] = None
    expiration_time: Optional[DHTExpiration] = None  # best expiration time so far
    source_node_id: Optional[DHTID] = None  # node that gave us the value
    future: asyncio.Future[Optional[ValueWithExpiration[DHTValue]]] = field(default_factory=asyncio.Future)
    serializer: Type[SerializerBase] = MSGPackSerializer
    record_validator: Optional[RecordValidatorBase] = None

    def add_candidate(
        self,
        candidate: Optional[ValueWithExpiration[Union[BinaryDHTValue, DictionaryDHTValue]]],
        source_node_id: Optional[DHTID],
    ):
        if self.finished or candidate is None:
            return
        elif isinstance(candidate.value, DictionaryDHTValue) and isinstance(self.binary_value, DictionaryDHTValue):
            self.binary_value.maxsize = max(self.binary_value.maxsize, candidate.value.maxsize)
            for subkey, subentry in candidate.value.items():
                self.binary_value.store(subkey, subentry.value, subentry.expiration_time)
        elif candidate.expiration_time > (self.expiration_time or float("-inf")):
            self.binary_value = candidate.value

        if candidate.expiration_time > (self.expiration_time or float("-inf")):
            self.expiration_time = candidate.expiration_time
            self.source_node_id = source_node_id

        if self.expiration_time >= self.sufficient_expiration_time:
            self.finish_search()

    def add_done_callback(self, callback: Callable[[_SearchState], Any]):
        """Add a callback that will be called when this _SearchState is done (found OR cancelled by user)"""
        self.future.add_done_callback(lambda _future: callback(self))

    def finish_search(self):
        if self.future.done():
            return  # either the user cancelled our search or someone set the result before us. Nothing more to do here.
        elif not self.found_something:
            self.future.set_result(None)
        elif isinstance(self.binary_value, BinaryDHTValue):
            value_bytes = self.binary_value
            if self.record_validator is not None:
                record = DHTRecord(
                    self.key_id.to_bytes(), DHTProtocol.IS_REGULAR_VALUE, value_bytes, self.expiration_time
                )
                value_bytes = self.record_validator.strip_value(record)
            self.future.set_result(ValueWithExpiration(self.serializer.loads(value_bytes), self.expiration_time))
        elif isinstance(self.binary_value, DictionaryDHTValue):
            dict_with_subkeys = {}
            for subkey, (value_bytes, item_expiration_time) in self.binary_value.items():
                if self.record_validator is not None:
                    subkey_bytes = self.serializer.dumps(subkey)
                    record = DHTRecord(self.key_id.to_bytes(), subkey_bytes, value_bytes, item_expiration_time)
                    value_bytes = self.record_validator.strip_value(record)
                dict_with_subkeys[subkey] = ValueWithExpiration(
                    self.serializer.loads(value_bytes), item_expiration_time
                )
            self.future.set_result(ValueWithExpiration(dict_with_subkeys, self.expiration_time))
        else:
            logger.error(f"Invalid value type: {type(self.binary_value)}")

    @property
    def found_something(self) -> bool:
        """Whether or not we have found at least some value, regardless of its expiration time"""
        return self.expiration_time is not None

    @property
    def finished(self) -> bool:
        return self.future.done()

    def __lt__(self, other: _SearchState):
        """_SearchState instances will be sorted by their target expiration time"""
        return self.sufficient_expiration_time < other.sufficient_expiration_time

    def __hash__(self):
        return hash(self.key_id)


class Blacklist:
    """
    A temporary blacklist of non-responding peers with an exponential backoff policy

    :param base_time: peers are suspended for this many seconds by default
    :param backoff_rate: suspension time increases by this factor after each successive failure
    """

    def __init__(self, base_time: float, backoff_rate: float, **kwargs):
        self.base_time, self.backoff = base_time, backoff_rate
        self.banned_peers = TimedStorage[PeerID, int](**kwargs)
        self.ban_counter = Counter()

    def register_failure(self, peer: PeerID):
        """Peer failed to respond: add it to the blacklist or increase its ban duration"""
        if peer not in self.banned_peers and self.base_time > 0:
            ban_duration = self.base_time * self.backoff ** self.ban_counter[peer]
            self.banned_peers.store(peer, self.ban_counter[peer], expiration_time=get_dht_time() + ban_duration)
            self.ban_counter[peer] += 1

    def register_success(self, peer):
        """Peer responded successfully: remove it from the blacklist and reset its ban time"""
        del self.banned_peers[peer], self.ban_counter[peer]

    def __contains__(self, peer: PeerID) -> bool:
        return peer in self.banned_peers

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(base_time={self.base_time}, backoff={self.backoff}, "
            f"banned_peers={len(self.banned_peers)})"
        )

    def clear(self):
        self.banned_peers.clear()
        self.ban_counter.clear()
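
# Illustration (editor's note): with base_time=5.0 and backoff_rate=2.0 (the DHTNode defaults), consecutive
# failures of the same peer suspend it for 5 s, 10 s, 20 s, ...; one successful response clears both the
# ban and the counter. `peer_id` below is a placeholder.
#
#   blacklist = Blacklist(base_time=5.0, backoff_rate=2.0)
#   blacklist.register_failure(peer_id)   # banned for 5 seconds
#   assert peer_id in blacklist
#   blacklist.register_success(peer_id)   # ban and counter reset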


class CacheRefreshQueue(TimedStorage[DHTID, DHTExpiration]):
    """A queue of keys scheduled for refresh in the future, used in DHTNode"""

    frozen = True