# matchmaking.py

  1. """ A background process that averages your tensors with peers """
  2. from __future__ import annotations
  3. import contextlib
  4. from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID
  5. import random
  6. from math import isfinite
  7. from typing import Optional, AsyncIterator, Set, Tuple, Dict
  8. import concurrent.futures
  9. import asyncio
  10. from hivemind.averaging.group_info import GroupInfo
  11. from hivemind.averaging.key_manager import GroupKeyManager, GroupKey
  12. from hivemind.dht import DHT, DHTID, DHTExpiration
  13. from hivemind.p2p import P2P, P2PContext, P2PHandlerError, PeerID as Endpoint
  14. from hivemind.utils import get_logger, timed_storage, TimedStorage, get_dht_time
  15. from hivemind.utils.asyncio import anext
  16. from hivemind.proto import averaging_pb2
  17. logger = get_logger(__name__)


class Matchmaking:
    """
    An internal class that is used to form groups of averagers for running allreduce
    See DecentralizedAverager docstring for the detailed description of all parameters

    :note: on implementation: the current matchmaker protocol can encounter one type of (temporary) deadlock;
      This deadlock occurs when averager A requests averager B at the same time as averager B requests averager A.
      In that case, neither averager can process the other one's request because it is awaiting lock_request_join_group.
      This deadlock only happens if averagers have outdated information on expirations (due to network delays).
      While A->B->A deadlock is easy to fix, it gets much harder with more peers (e.g. A -> B -> C -> D -> A).
      Hence, instead of accounting for such deadlocks, we simply break them with request_timeout.
    """

    def __init__(
        self,
        p2p: P2P,
        schema_hash: bytes,
        dht: DHT,
        *,
        prefix: str,
        target_group_size: int,
        min_group_size: int,
        request_timeout: float,
        client_mode: bool,
        initial_group_bits: Optional[str] = None,
        averaging_expiration: float = 15,
    ):
        assert "." not in prefix, "group prefix must be a string without ."
        if request_timeout is None or request_timeout >= averaging_expiration:
            logger.warning(
                "It is recommended to use request_timeout smaller than averaging_expiration. Otherwise,"
                " matchmaking can cause deadlocks in some rare cases. Please see Matchmaking docstring."
            )
        super().__init__()
        self._p2p = p2p
        self.endpoint = p2p.id
        self.schema_hash = schema_hash
        self.group_key_manager = GroupKeyManager(dht, self.endpoint, prefix, initial_group_bits, target_group_size)
        self.target_group_size, self.min_group_size = target_group_size, min_group_size
        self.averaging_expiration, self.request_timeout = averaging_expiration, request_timeout
        self.client_mode = client_mode

        self.lock_looking_for_group = asyncio.Lock()
        self.lock_request_join_group = asyncio.Lock()
        self.follower_was_discarded = asyncio.Event()
        self.was_accepted_to_group = asyncio.Event()
        self.assembled_group = asyncio.Future()

        self.current_leader: Optional[Endpoint] = None  # iff I am a follower, this is a link to my current leader
        self.current_followers: Dict[Endpoint, averaging_pb2.JoinRequest] = {}  # my current followers excluding myself
        self.potential_leaders = PotentialLeaders(self.endpoint, averaging_expiration, target_group_size)
        self.data_for_gather: Optional[bytes] = None

    @property
    def is_looking_for_group(self):
        return self.lock_looking_for_group.locked()

    def __repr__(self):
        lfg_status = "looking for group," if self.is_looking_for_group else "not looking for group,"
        if self.is_looking_for_group:
            if self.current_leader:
                lfg_status += f" following {self.current_leader},"
            if len(self.current_followers):
                lfg_status += f" leading {len(self.current_followers)} followers,"
        schema_hash_repr = f"{self.schema_hash[0]}...{self.schema_hash[-8:]}"
        return (
            f"{self.__class__.__name__}(endpoint={self.endpoint}, schema={schema_hash_repr}, {lfg_status}"
            f" current key = {self.group_key_manager.current_key}, client_mode={self.client_mode})"
        )

    async def look_for_group(self, *, data_for_gather: bytes, timeout: Optional[float] = None) -> Optional[GroupInfo]:
        """
        :param data_for_gather: optionally send this data to all peers in the next group and gather it from groupmates
        :param timeout: maximum time that may be spent looking for group (does not include allreduce itself)
        :returns: an assembled group if successful, None if failed; does NOT perform the actual averaging
        Iterate over the averagers from a given group_identifier that have higher leadership priority than yourself.
        """
        if self.is_looking_for_group:
            logger.info(
                "Another look_for_group is already in progress. The current run will be scheduled after"
                " the existing group is either assembled or disbanded."
            )
        async with self.lock_looking_for_group:
            self.data_for_gather = data_for_gather
            request_leaders_task = asyncio.create_task(self._request_join_potential_leaders(timeout))
            try:
                return await asyncio.wait_for(self.assembled_group, timeout=timeout)
            except asyncio.TimeoutError:
                return None
            except BaseException as e:
                if len(self.current_followers) > 0:
                    async with self.lock_request_join_group:
                        await self.leader_disband_group()
                if not self.assembled_group.done():
                    self.assembled_group.set_exception(e)
                raise
            finally:
                if not request_leaders_task.done():
                    request_leaders_task.cancel()
                if not self.assembled_group.done():
                    self.assembled_group.cancel()
                while len(self.current_followers) > 0:
                    await self.follower_was_discarded.wait()
                    self.follower_was_discarded.clear()
                # note: the code above ensures that we send all followers away before creating new future
                self.assembled_group = asyncio.Future()
                self.was_accepted_to_group.clear()
                self.data_for_gather = None
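
    # A minimal usage sketch (illustrative only: in hivemind this call is normally made by
    # DecentralizedAverager rather than user code, and the surrounding p2p/dht setup is assumed):
    #
    #     matchmaking = Matchmaking(p2p, schema_hash, dht, prefix="my_experiment", target_group_size=4,
    #                               min_group_size=2, request_timeout=3.0, client_mode=False)
    #     group_info = await matchmaking.look_for_group(data_for_gather=b"", timeout=30.0)
    #     if group_info is not None:
    #         ...  # run allreduce over group_info.endpoints; look_for_group itself does not average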

    async def _request_join_potential_leaders(self, timeout: Optional[float]) -> GroupInfo:
        """Request leaders from queue until we find the first runner. This coroutine is meant to run in background."""
        async with self.potential_leaders.begin_search(self.group_key_manager, timeout, declare=not self.client_mode):
            while True:
                try:
                    next_leader = await self.potential_leaders.pop_next_leader()  # throws TimeoutError on expiration
                    group = await self.request_join_group(next_leader, self.potential_leaders.request_expiration_time)
                    if group is not None:
                        return group
                except asyncio.TimeoutError:
                    async with self.lock_request_join_group:
                        if self.assembled_group.done():
                            return self.assembled_group.result()
                        elif len(self.current_followers) + 1 >= self.min_group_size:
                            # the time is up, we have a *good enough* group. run allreduce as is.
                            return await self.leader_assemble_group()
                        elif len(self.current_followers) > 0:
                            await self.leader_disband_group()
                        continue
                except (concurrent.futures.CancelledError, asyncio.CancelledError):
                    break  # note: this is a compatibility layer for python3.7
                except Exception as e:
                    if not self.assembled_group.done():
                        self.assembled_group.set_exception(e)
                    raise e

    async def request_join_group(self, leader: Endpoint, expiration_time: DHTExpiration) -> Optional[GroupInfo]:
        """
        :param leader: request this peer to be your leader for allreduce
        :param expiration_time: inform leader that we intend to begin averaging before this expiration_time
        :returns: if the leader accepted us and started AllReduce, return that AllReduce. Otherwise, return None
        :note: this function does not guarantee that your group leader is the same as :leader: parameter
          The originally specified leader can disband the group and redirect us to a different leader
        """
        assert self.is_looking_for_group and self.current_leader is None
        stream: AsyncIterator[averaging_pb2.MessageFromLeader] = None
        try:
            async with self.lock_request_join_group:
                from hivemind.averaging.averager import DecentralizedAverager

                leader_stub = DecentralizedAverager.get_stub(self._p2p, leader)
                stream = leader_stub.rpc_join_group(
                    averaging_pb2.JoinRequest(
                        endpoint=self.endpoint.to_base58(),
                        schema_hash=self.schema_hash,
                        expiration=expiration_time,
                        client_mode=self.client_mode,
                        gather=self.data_for_gather,
                    )
                ).__aiter__()
                message = await asyncio.wait_for(anext(stream), timeout=self.request_timeout)

                if message.code == averaging_pb2.ACCEPTED:
                    logger.debug(f"{self.endpoint} - joining the group of {leader}; waiting for peers")
                    self.current_leader = leader
                    self.was_accepted_to_group.set()
                    if len(self.current_followers) > 0:
                        await self.leader_disband_group()

            if message.code != averaging_pb2.ACCEPTED:
                code = averaging_pb2.MessageCode.Name(message.code)
                logger.debug(f"{self.endpoint} - requested {leader} to be my leader, but got rejected with {code}")
                return None

            async with self.potential_leaders.pause_search():
                time_to_expiration = max(expiration_time - get_dht_time(), 0.0)
                message = await asyncio.wait_for(anext(stream), time_to_expiration + self.request_timeout)

                if message.code == averaging_pb2.BEGIN_ALLREDUCE:
                    async with self.lock_request_join_group:
                        return await self.follower_assemble_group(leader, message)

            if message.code in (averaging_pb2.GROUP_DISBANDED, averaging_pb2.CANCELLED):
                if message.suggested_leader and message.suggested_leader != self.endpoint:
                    logger.debug(f"{self} - leader disbanded group and redirected us to {message.suggested_leader}")
                    self.current_leader = None
                    await stream.aclose()
                    return await self.request_join_group(message.suggested_leader, expiration_time)
                else:
                    logger.debug(f"{self} - leader disbanded group")
                    return None

            logger.debug(f"{self} - unexpected message from leader: {averaging_pb2.MessageCode.Name(message.code)}")
            return None
        except asyncio.TimeoutError:
            logger.debug(f"{self} - potential leader {leader} did not respond within {self.request_timeout}")
            return None
        except (P2PHandlerError, StopAsyncIteration) as e:
            logger.error(f"{self} - failed to request potential leader {leader}: {e}")
            return None
        finally:
            self.was_accepted_to_group.clear()
            self.current_leader = None
            if stream is not None:
                await stream.aclose()
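
    # Message flow implemented by request_join_group (follower side) and rpc_join_group (leader side):
    #   follower -> leader: JoinRequest(endpoint, schema_hash, expiration, client_mode, gather)
    #   leader -> follower: ACCEPTED (or a rejection code produced by _check_reasons_to_reject)
    #   leader -> follower: BEGIN_ALLREDUCE(group_id, ordered_group_endpoints, gathered),
    #                       or GROUP_DISBANDED (optionally with suggested_leader to retry with)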

    async def rpc_join_group(
        self, request: averaging_pb2.JoinRequest, _: P2PContext
    ) -> AsyncIterator[averaging_pb2.MessageFromLeader]:
        """accept or reject a join request from another averager; if accepted, run him through allreduce steps"""
        request_endpoint = PeerID.from_base58(request.endpoint)
        try:
            async with self.lock_request_join_group:
                reason_to_reject = self._check_reasons_to_reject(request)
                if reason_to_reject is not None:
                    yield reason_to_reject
                    return

                self.current_followers[request_endpoint] = request
                yield averaging_pb2.MessageFromLeader(code=averaging_pb2.ACCEPTED)

                if len(self.current_followers) + 1 >= self.target_group_size and not self.assembled_group.done():
                    # outcome 1: we have assembled a full group and are ready for allreduce
                    await self.leader_assemble_group()

            # wait for the group to be assembled or disbanded
            timeout = max(0.0, self.potential_leaders.declared_expiration_time - get_dht_time())
            await asyncio.wait(
                {self.assembled_group, self.was_accepted_to_group.wait()},
                return_when=asyncio.FIRST_COMPLETED,
                timeout=timeout,
            )
            if not self.assembled_group.done() and not self.was_accepted_to_group.is_set():
                async with self.lock_request_join_group:
                    if self.assembled_group.done():
                        pass  # this covers a rare case when the group is assembled while the event loop was busy.
                    elif len(self.current_followers) + 1 >= self.min_group_size and self.is_looking_for_group:
                        # outcome 2: the time is up, run allreduce with what we have or disband
                        await self.leader_assemble_group()
                    else:
                        await self.leader_disband_group()

            if (
                self.was_accepted_to_group.is_set()
                or not self.assembled_group.done()
                or self.assembled_group.cancelled()
                or request_endpoint not in self.assembled_group.result()
            ):
                if self.current_leader is not None:
                    # outcome 3: found by a leader with higher priority, send our followers to him
                    yield averaging_pb2.MessageFromLeader(
                        code=averaging_pb2.GROUP_DISBANDED, suggested_leader=self.current_leader
                    )
                    return
                else:
                    yield averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_DISBANDED)
                    return

            group_info = self.assembled_group.result()
            yield averaging_pb2.MessageFromLeader(
                code=averaging_pb2.BEGIN_ALLREDUCE,
                group_id=group_info.group_id,
                ordered_group_endpoints=group_info.endpoints,
                gathered=group_info.gathered,
            )
        except (concurrent.futures.CancelledError, asyncio.CancelledError):
            return  # note: this is a compatibility layer for python3.7
        except Exception as e:
            logger.exception(e)
            yield averaging_pb2.MessageFromLeader(code=averaging_pb2.INTERNAL_ERROR)
        finally:  # note: this code is guaranteed to run even if the coroutine is destroyed prematurely
            self.current_followers.pop(request_endpoint, None)
            self.follower_was_discarded.set()

    def _check_reasons_to_reject(
        self, request: averaging_pb2.JoinRequest
    ) -> Optional[averaging_pb2.MessageFromLeader]:
        """:returns: if accepted, return None, otherwise return a reason for rejection"""
        if not self.is_looking_for_group or self.assembled_group.done():
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_LOOKING_FOR_GROUP)

        if (
            request.ListFields() == 3
            and not isinstance(request.schema_hash, bytes)
            or len(request.schema_hash) == 0
            or not isinstance(request.expiration, DHTExpiration)
            or not isfinite(request.expiration)
            or not isinstance(request.endpoint, Endpoint)
            or len(request.endpoint) == 0
            or self.client_mode
        ):
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.PROTOCOL_VIOLATION)
        elif request.schema_hash != self.schema_hash:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_SCHEMA_HASH)
        elif self.potential_leaders.declared_group_key is None:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_DECLARED)
        elif self.potential_leaders.declared_expiration_time > (request.expiration or float("inf")):
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_EXPIRATION_TIME)
        elif self.current_leader is not None:
            return averaging_pb2.MessageFromLeader(
                code=averaging_pb2.NOT_A_LEADER, suggested_leader=self.current_leader
            )  # note: this suggested leader is currently ignored
        elif request.endpoint == self.endpoint or request.endpoint in self.current_followers:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.DUPLICATE_ENDPOINT)
        elif len(self.current_followers) + 1 >= self.target_group_size:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_IS_FULL)
        else:
            return None
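
    # For reference, a request that passes these checks mirrors what request_join_group sends
    # (illustrative values only; follower_peer_id is a hypothetical name for the sender's PeerID):
    #
    #     averaging_pb2.JoinRequest(
    #         endpoint=follower_peer_id.to_base58(),  # non-empty and not already one of our followers
    #         schema_hash=self.schema_hash,           # must match the leader's schema hash exactly
    #         expiration=get_dht_time() + 15,         # finite, no earlier than the leader's declared expiration
    #         client_mode=False,
    #         gather=b"",
    #     )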

    async def leader_assemble_group(self) -> GroupInfo:
        """Form up all current followers into a group and gather metadata"""
        assert self.lock_looking_for_group.locked() and self.lock_request_join_group.locked() and not self.client_mode
        assert not self.assembled_group.done()
        group_id = DHTID.generate().to_bytes()  # note: both group_id and the order of endpoints must be random
        ordered_group_endpoints = list(self.current_followers)
        ordered_group_endpoints.append(self.endpoint)
        random.shuffle(ordered_group_endpoints)

        gathered = tuple(
            self.data_for_gather if endpoint == self.endpoint else self.current_followers[endpoint].gather
            for endpoint in ordered_group_endpoints
        )

        logger.debug(f"{self.endpoint} - assembled group of {len(ordered_group_endpoints)} peers.")
        group_info = GroupInfo(group_id, tuple(ordered_group_endpoints), gathered)
        await self.group_key_manager.update_key_on_group_assembled(group_info, is_leader=True)
        self.assembled_group.set_result(group_info)
        return group_info

    async def follower_assemble_group(self, leader: Endpoint, msg: averaging_pb2.MessageFromLeader) -> GroupInfo:
        """Form a group using peers and metadata provided by our leader"""
        assert self.lock_looking_for_group.locked() and self.lock_request_join_group.locked()
        assert not self.assembled_group.done()
        assert self.current_leader == leader, f"averager does not follow {leader} (actual: {self.current_leader})"

        group_id, ordered_group_endpoints = msg.group_id, msg.ordered_group_endpoints
        assert self.endpoint in ordered_group_endpoints, "Leader sent us group_endpoints that do not contain us!"
        assert len(ordered_group_endpoints) == len(msg.gathered)

        logger.debug(f"{self.endpoint} - follower assembled group with leader {leader}.")
        group_info = GroupInfo(group_id, tuple(ordered_group_endpoints), tuple(msg.gathered))
        await self.group_key_manager.update_key_on_group_assembled(group_info)
        self.assembled_group.set_result(group_info)
        return group_info

    async def leader_disband_group(self):
        """Kick out all followers immediately, optionally direct them to our new leader (if we found one)"""
        assert self.lock_request_join_group.locked() and not self.client_mode
        self.current_followers.clear()  # this will cause rpc_join_group to kick all followers out
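
    # Note on the disband mechanics: once current_followers is cleared, each pending rpc_join_group call
    # ends up in the GROUP_DISBANDED branch after its wait completes (a discarded follower is never part of
    # an assembled group), and suggested_leader is attached when this averager has itself joined another leader.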


class PotentialLeaders:
    """A utility class that searches for averagers that could become our leaders"""

    def __init__(self, endpoint: Endpoint, averaging_expiration: DHTExpiration, target_group_size: Optional[int]):
        self.endpoint, self.averaging_expiration = endpoint, averaging_expiration
        self.target_group_size = target_group_size
        self.running, self.update_triggered, self.update_finished = asyncio.Event(), asyncio.Event(), asyncio.Event()
        self.declared_expiration, self.lock_search, self.lock_declare = asyncio.Event(), asyncio.Lock(), asyncio.Lock()
        self.leader_queue = TimedStorage[Endpoint, DHTExpiration]()
        self.past_attempts: Set[Tuple[Endpoint, DHTExpiration]] = set()
        self.declared_expiration_time = float("inf")
        self.declared_group_key: Optional[GroupKey] = None
        self.max_assured_time = float("-inf")
        self.search_end_time = float("inf")

    @contextlib.asynccontextmanager
    async def begin_search(self, key_manager: GroupKeyManager, timeout: Optional[float], declare: bool = True):
        async with self.lock_search:
            self.running.set()
            self.search_end_time = get_dht_time() + timeout if timeout is not None else float("inf")
            update_queue_task = asyncio.create_task(self._update_queue_periodically(key_manager))
            if declare:
                declare_averager_task = asyncio.create_task(self._declare_averager_periodically(key_manager))

            try:
                yield self
            finally:
                if not update_queue_task.done():
                    update_queue_task.cancel()
                if declare and not declare_averager_task.done():
                    declare_averager_task.cancel()

                for field in (
                    self.past_attempts,
                    self.leader_queue,
                    self.running,
                    self.update_finished,
                    self.update_triggered,
                    self.declared_expiration,
                ):
                    field.clear()
                self.max_assured_time = float("-inf")
                self.search_end_time = float("inf")

    @contextlib.asynccontextmanager
    async def pause_search(self):
        was_running = self.running.is_set()
        try:
            self.running.clear()
            yield
        finally:
            if was_running:
                self.running.set()
            else:
                self.running.clear()

    async def pop_next_leader(self) -> Endpoint:
        """Remove and return the next most suitable leader, or raise an exception if the timeout is reached"""
        assert self.running.is_set(), "Not running search at the moment"
        while True:
            maybe_next_leader, entry = self.leader_queue.top()

            if maybe_next_leader is None or self.max_assured_time <= entry.expiration_time <= self.search_end_time:
                self.update_triggered.set()

            if maybe_next_leader is None or (entry.expiration_time, maybe_next_leader) > (
                self.declared_expiration_time,
                self.endpoint,
            ):
                await asyncio.wait(
                    {self.update_finished.wait(), self.declared_expiration.wait()}, return_when=asyncio.FIRST_COMPLETED
                )
                self.declared_expiration.clear()
                if self.update_finished.is_set():
                    self.update_finished.clear()
                    continue
                else:
                    raise asyncio.TimeoutError("pop_next_leader was invalidated: re-declared averager in background")

            del self.leader_queue[maybe_next_leader]
            self.past_attempts.add((maybe_next_leader, entry.expiration_time))
            return maybe_next_leader
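
    # Leadership priority, as used above: a peer is only returned as a candidate leader if its
    # (expiration_time, peer_id) tuple compares lower than our own declared (expiration_time, endpoint),
    # i.e. the earlier expiration wins and the peer id breaks ties, so in any pair exactly one side defers.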

    @property
    def request_expiration_time(self) -> float:
        """this averager's current expiration time - used to send join requests to leaders"""
        if isfinite(self.declared_expiration_time):
            return self.declared_expiration_time
        else:
            return min(get_dht_time() + self.averaging_expiration, self.search_end_time)

    async def _update_queue_periodically(self, key_manager: GroupKeyManager):
        try:
            DISCREPANCY = timed_storage.MAX_DHT_TIME_DISCREPANCY_SECONDS
            while get_dht_time() < self.search_end_time:
                new_peers = await key_manager.get_averagers(key_manager.current_key, only_active=True)
                self.max_assured_time = max(
                    self.max_assured_time, get_dht_time() + self.averaging_expiration - DISCREPANCY
                )

                self.leader_queue.clear()
                for peer, peer_expiration_time in new_peers:
                    if peer == self.endpoint or (peer, peer_expiration_time) in self.past_attempts:
                        continue
                    self.leader_queue.store(peer, peer_expiration_time, peer_expiration_time)
                    self.max_assured_time = max(self.max_assured_time, peer_expiration_time - DISCREPANCY)

                self.update_finished.set()

                await asyncio.wait(
                    {self.running.wait(), self.update_triggered.wait()},
                    return_when=asyncio.ALL_COMPLETED,
                    timeout=self.search_end_time - get_dht_time() if isfinite(self.search_end_time) else None,
                )
                self.update_triggered.clear()
        except (concurrent.futures.CancelledError, asyncio.CancelledError):
            return  # note: this is a compatibility layer for python3.7
        except Exception as e:
            logger.error(f"{self.endpoint} - caught {type(e)}: {e}")
            raise

    async def _declare_averager_periodically(self, key_manager: GroupKeyManager):
        async with self.lock_declare:
            try:
                while True:
                    await self.running.wait()

                    new_expiration_time = min(get_dht_time() + self.averaging_expiration, self.search_end_time)
                    self.declared_group_key = group_key = key_manager.current_key
                    self.declared_expiration_time = new_expiration_time
                    self.declared_expiration.set()
                    await key_manager.declare_averager(group_key, self.endpoint, expiration_time=new_expiration_time)
                    await asyncio.sleep(self.declared_expiration_time - get_dht_time())
                    if self.running.is_set() and len(self.leader_queue) == 0:
                        await key_manager.update_key_on_not_enough_peers()
            except (concurrent.futures.CancelledError, asyncio.CancelledError):
                pass  # note: this is a compatibility layer for python3.7
            except Exception as e:  # note: we catch exceptions here because otherwise they are never printed
                logger.error(f"{self.endpoint} - caught {type(e)}: {e}")
            finally:
                if self.declared_group_key is not None:
                    prev_declared_key, prev_expiration_time = self.declared_group_key, self.declared_expiration_time
                    self.declared_group_key, self.declared_expiration_time = None, float("inf")
                    self.leader_queue, self.max_assured_time = TimedStorage[Endpoint, DHTExpiration](), float("-inf")
                    await key_manager.declare_averager(
                        prev_declared_key, self.endpoint, prev_expiration_time, looking_for_group=False
                    )


class MatchmakingException(Exception):
    """An internal exception that marks undesired edge cases during averaging"""