matchmaking.py
  1. """ A background process that averages your tensors with peers """
  2. from __future__ import annotations
  3. import asyncio
  4. import concurrent.futures
  5. import contextlib
  6. import random
  7. from math import isfinite
  8. from typing import AsyncIterator, Dict, Optional, Set, Tuple, Type
  9. from hivemind.averaging.group_info import GroupInfo
  10. from hivemind.averaging.key_manager import GroupKey, GroupKeyManager
  11. from hivemind.dht import DHT, DHTID, DHTExpiration
  12. from hivemind.p2p import P2P, P2PContext, P2PHandlerError, PeerID, ServicerBase
  13. from hivemind.proto import averaging_pb2
  14. from hivemind.utils import TimedStorage, get_dht_time, get_logger, timed_storage
  15. from hivemind.utils.asyncio import anext, cancel_and_wait
  16. logger = get_logger(__name__)


class Matchmaking:
    """
    An internal class that is used to form groups of averagers for running allreduce.
    See the DecentralizedAverager docstring for a detailed description of all parameters.

    :note: on implementation: the current matchmaking protocol can encounter one type of (temporary) deadlock.
      This deadlock occurs when averager A requests averager B at the same time as averager B requests averager A.
      In that case, neither averager can process the other's request because both are awaiting lock_request_join_group.
      This deadlock only happens if averagers have outdated information on expirations (due to network delays).
      While an A->B->A deadlock is easy to fix, it gets much harder with more peers (e.g. A -> B -> C -> D -> A).
      Hence, instead of accounting for such deadlocks, we simply break them with request_timeout.
    """

    def __init__(
        self,
        p2p: P2P,
        schema_hash: bytes,
        dht: DHT,
        *,
        servicer_type: Type[ServicerBase],
        prefix: str,
        target_group_size: int,
        min_group_size: int,
        request_timeout: float,
        client_mode: bool,
        initial_group_bits: str = "",
        averaging_expiration: float = 15,
    ):
        assert "." not in prefix, "group prefix must be a string without '.'"
        if request_timeout is None or request_timeout >= averaging_expiration:
            logger.warning(
                "It is recommended to use request_timeout smaller than averaging_expiration. Otherwise,"
                " matchmaking can cause deadlocks in some rare cases. Please see the Matchmaking docstring."
            )

        super().__init__()
        self._p2p = p2p

        if not issubclass(servicer_type, ServicerBase):
            raise TypeError("`servicer_type` is expected to be a ServicerBase subclass")
        self._servicer_type = servicer_type
        self._prefix = prefix

        self.peer_id = p2p.peer_id
        self.schema_hash = schema_hash
        self.group_key_manager = GroupKeyManager(dht, prefix, initial_group_bits, target_group_size)
        self.target_group_size, self.min_group_size = target_group_size, min_group_size
        self.averaging_expiration, self.request_timeout = averaging_expiration, request_timeout
        self.client_mode = client_mode

        self.lock_looking_for_group = asyncio.Lock()
        self.lock_request_join_group = asyncio.Lock()
        self.follower_was_discarded = asyncio.Event()
        self.was_accepted_to_group = asyncio.Event()
        self.assembled_group = asyncio.Future()

        self.current_leader: Optional[PeerID] = None  # iff I am a follower, this is a link to my current leader
        self.current_followers: Dict[PeerID, averaging_pb2.JoinRequest] = {}  # my current followers excluding myself
        self.potential_leaders = PotentialLeaders(self.peer_id, averaging_expiration, target_group_size)
        self.data_for_gather: Optional[bytes] = None

    @property
    def is_looking_for_group(self):
        return self.lock_looking_for_group.locked()

    def __repr__(self):
        lfg_status = "looking for group," if self.is_looking_for_group else "not looking for group,"
        if self.is_looking_for_group:
            if self.current_leader:
                lfg_status += f" following {self.current_leader},"
            if len(self.current_followers):
                lfg_status += f" leading {len(self.current_followers)} followers,"
        schema_hash_repr = f"{self.schema_hash[0]}...{self.schema_hash[-8:]}"
        return (
            f"{self.__class__.__name__}(peer_id={self.peer_id}, schema={schema_hash_repr}, {lfg_status}"
            f" current key = {self.group_key_manager.current_key}, client_mode={self.client_mode})"
        )

    async def look_for_group(self, *, data_for_gather: bytes, timeout: Optional[float] = None) -> Optional[GroupInfo]:
        """
        :param data_for_gather: optionally send this data to all peers in the next group and gather it from groupmates
        :param timeout: maximum time that may be spent looking for group (does not include allreduce itself)
        :returns: an assembled group if successful, None if failed; does NOT perform the actual averaging
        Iterate over the averagers from a given group_identifier that have higher leadership priority than yourself.
        """
        if self.is_looking_for_group:
            logger.info(
                "Another look_for_group is already in progress. The current run will be scheduled after"
                " the existing group is either assembled or disbanded."
            )
        async with self.lock_looking_for_group:
            self.data_for_gather = data_for_gather
            request_leaders_task = asyncio.create_task(self._request_join_potential_leaders(timeout))
            try:
                return await asyncio.wait_for(self.assembled_group, timeout=timeout)
            except asyncio.TimeoutError:
                return None

            except BaseException as e:
                if len(self.current_followers) > 0:
                    async with self.lock_request_join_group:
                        await self.leader_disband_group()
                if not self.assembled_group.done():
                    self.assembled_group.set_exception(e)
                raise

            finally:
                await cancel_and_wait(request_leaders_task)
                self.assembled_group.cancel()

                while len(self.current_followers) > 0:
                    await self.follower_was_discarded.wait()
                    self.follower_was_discarded.clear()
                # note: the code above ensures that we send all followers away before creating a new future
                self.assembled_group = asyncio.Future()
                self.was_accepted_to_group.clear()
                self.data_for_gather = None
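
    # A minimal usage sketch (comments only). `matchmaking` is a hypothetical, fully constructed instance;
    # in hivemind this call is normally driven by DecentralizedAverager rather than invoked directly:
    #
    #     group_info = await matchmaking.look_for_group(data_for_gather=b"", timeout=60.0)
    #     if group_info is None:
    #         ...  # matchmaking timed out or the group was disbanded; retry or give up
    #     else:
    #         ...  # run allreduce using group_info.group_id and group_info.peer_ids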

    async def _request_join_potential_leaders(self, timeout: Optional[float]) -> GroupInfo:
        """Request leaders from queue until we find the first runner. This coroutine is meant to run in background."""
        async with self.potential_leaders.begin_search(self.group_key_manager, timeout, declare=not self.client_mode):
            while True:
                try:
                    next_leader = await self.potential_leaders.pop_next_leader()  # throws TimeoutError on expiration
                    group = await self.request_join_group(next_leader, self.potential_leaders.request_expiration_time)
                    if group is not None:
                        return group

                except asyncio.TimeoutError:
                    async with self.lock_request_join_group:
                        if self.assembled_group.done():
                            return self.assembled_group.result()
                        elif len(self.current_followers) + 1 >= self.min_group_size:
                            # the time is up, we have a *good enough* group. run allreduce as is.
                            return await self.leader_assemble_group()
                        elif len(self.current_followers) > 0:
                            await self.leader_disband_group()
                        continue
                except (concurrent.futures.CancelledError, asyncio.CancelledError):
                    break  # note: this is a compatibility layer for python3.7
                except Exception as e:
                    if not self.assembled_group.done():
                        self.assembled_group.set_exception(e)
                    raise e

    async def request_join_group(self, leader: PeerID, expiration_time: DHTExpiration) -> Optional[GroupInfo]:
        """
        :param leader: request this peer to be your leader for allreduce
        :param expiration_time: inform leader that we intend to begin averaging before this expiration_time
        :returns: if the leader accepted us and started AllReduce, return that AllReduce. Otherwise, return None
        :note: this function does not guarantee that your group leader is the same as the :leader: parameter
          The originally specified leader can disband the group and redirect us to a different leader
        """
        assert self.is_looking_for_group and self.current_leader is None
        stream: Optional[AsyncIterator[averaging_pb2.MessageFromLeader]] = None
        try:
            async with self.lock_request_join_group:
                leader_stub = self._servicer_type.get_stub(self._p2p, leader, namespace=self._prefix)
                stream = leader_stub.rpc_join_group(
                    averaging_pb2.JoinRequest(
                        schema_hash=self.schema_hash,
                        expiration=expiration_time,
                        client_mode=self.client_mode,
                        gather=self.data_for_gather,
                        group_key=self.group_key_manager.current_key,
                    )
                )
                message = await asyncio.wait_for(anext(stream), timeout=self.request_timeout)

                if message.code == averaging_pb2.ACCEPTED:
                    logger.debug(f"{self.peer_id} - joining the group of {leader}; waiting for peers")
                    self.current_leader = leader
                    self.was_accepted_to_group.set()
                    if len(self.current_followers) > 0:
                        await self.leader_disband_group()

            if message.code != averaging_pb2.ACCEPTED:
                code = averaging_pb2.MessageCode.Name(message.code)
                logger.debug(f"{self.peer_id} - requested {leader} to be my leader, but got rejected with {code}")
                return None

            async with self.potential_leaders.pause_search():
                time_to_expiration = max(expiration_time - get_dht_time(), 0.0)
                message = await asyncio.wait_for(anext(stream), time_to_expiration + self.request_timeout)

                if message.code == averaging_pb2.BEGIN_ALLREDUCE:
                    async with self.lock_request_join_group:
                        return await self.follower_assemble_group(leader, message)

            if message.code in (averaging_pb2.GROUP_DISBANDED, averaging_pb2.CANCELLED):
                if message.suggested_leader:
                    suggested_leader = PeerID(message.suggested_leader)
                    if suggested_leader != self.peer_id:
                        logger.debug(f"{self} - leader disbanded group and redirected us to {suggested_leader}")
                        self.current_leader = None
                        await stream.aclose()
                        return await self.request_join_group(suggested_leader, expiration_time)
                logger.debug(f"{self} - leader disbanded group")
                return None

            logger.debug(f"{self} - unexpected message from leader: {averaging_pb2.MessageCode.Name(message.code)}")
            return None
        except asyncio.TimeoutError:
            logger.debug(f"{self} - potential leader {leader} did not respond within {self.request_timeout}")
            return None
        except (P2PHandlerError, StopAsyncIteration):
            logger.exception(f"{self} - failed to request potential leader {leader}:")
            return None
        finally:
            self.was_accepted_to_group.clear()
            self.current_leader = None
            if stream is not None:
                await stream.aclose()
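
    # Follower-side message flow of the exchange above (illustrative comments only):
    #   1. send JoinRequest(schema_hash, expiration, group_key, ...) and await the first reply
    #   2. reply ACCEPTED          -> remember the leader, pause our own search, await a second message
    #   3. reply BEGIN_ALLREDUCE   -> build GroupInfo via follower_assemble_group and return it
    #   4. reply GROUP_DISBANDED   -> optionally retry with message.suggested_leader, otherwise give up
    #   5. anything else / timeout -> return None and resume looking for another potential leader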

    async def rpc_join_group(
        self, request: averaging_pb2.JoinRequest, context: P2PContext
    ) -> AsyncIterator[averaging_pb2.MessageFromLeader]:
        """accept or reject a join request from another averager; if accepted, walk it through the allreduce steps"""
        try:
            async with self.lock_request_join_group:
                reason_to_reject = self._check_reasons_to_reject(request, context)
                if reason_to_reject is not None:
                    yield reason_to_reject
                    return

                self.current_followers[context.remote_id] = request
                yield averaging_pb2.MessageFromLeader(code=averaging_pb2.ACCEPTED)

                if len(self.current_followers) + 1 >= self.target_group_size and not self.assembled_group.done():
                    # outcome 1: we have assembled a full group and are ready for allreduce
                    await self.leader_assemble_group()

            # wait for the group to be assembled or disbanded
            timeout = max(0.0, self.potential_leaders.declared_expiration_time - get_dht_time())
            # note: wrap the coroutine in a task; asyncio.wait no longer accepts bare coroutines (removed in 3.11)
            await asyncio.wait(
                {self.assembled_group, asyncio.create_task(self.was_accepted_to_group.wait())},
                return_when=asyncio.FIRST_COMPLETED,
                timeout=timeout,
            )
            if not self.assembled_group.done() and not self.was_accepted_to_group.is_set():
                async with self.lock_request_join_group:
                    if self.assembled_group.done():
                        pass  # this covers a rare case when the group is assembled while the event loop was busy
                    elif len(self.current_followers) + 1 >= self.min_group_size and self.is_looking_for_group:
                        # outcome 2: the time is up, run allreduce with what we have or disband
                        await self.leader_assemble_group()
                    else:
                        await self.leader_disband_group()

            if (
                self.was_accepted_to_group.is_set()
                or not self.assembled_group.done()
                or self.assembled_group.cancelled()
                or context.remote_id not in self.assembled_group.result()
            ):
                if self.current_leader is not None:
                    # outcome 3: found by a leader with higher priority, send our followers to him
                    yield averaging_pb2.MessageFromLeader(
                        code=averaging_pb2.GROUP_DISBANDED, suggested_leader=self.current_leader.to_bytes()
                    )
                    return
                else:
                    yield averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_DISBANDED)
                    return

            group_info = self.assembled_group.result()
            yield averaging_pb2.MessageFromLeader(
                code=averaging_pb2.BEGIN_ALLREDUCE,
                group_id=group_info.group_id,
                ordered_peer_ids=[item.to_bytes() for item in group_info.peer_ids],
                gathered=group_info.gathered,
            )
        except (concurrent.futures.CancelledError, asyncio.CancelledError):
            return  # note: this is a compatibility layer for python3.7
        except Exception as e:
            logger.exception(e)
            yield averaging_pb2.MessageFromLeader(code=averaging_pb2.INTERNAL_ERROR)
        finally:  # note: this code is guaranteed to run even if the coroutine is destroyed prematurely
            self.current_followers.pop(context.remote_id, None)
            self.follower_was_discarded.set()

    def _check_reasons_to_reject(
        self, request: averaging_pb2.JoinRequest, context: P2PContext
    ) -> Optional[averaging_pb2.MessageFromLeader]:
        """:returns: None if accepted, otherwise a reason for rejection"""
        if not self.is_looking_for_group or self.assembled_group.done():
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_LOOKING_FOR_GROUP)

        if (
            not isinstance(request.schema_hash, bytes)
            or len(request.schema_hash) == 0
            or not isinstance(request.expiration, DHTExpiration)
            or not isfinite(request.expiration)
            or self.client_mode
            or not isinstance(request.group_key, GroupKey)
        ):
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.PROTOCOL_VIOLATION)
        elif request.schema_hash != self.schema_hash:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_SCHEMA_HASH)
        elif request.group_key != self.group_key_manager.current_key:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_GROUP_KEY)
        elif self.potential_leaders.declared_group_key is None:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_DECLARED)
        elif self.potential_leaders.declared_expiration_time > (request.expiration or float("inf")):
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_EXPIRATION_TIME)
        elif self.current_leader is not None:
            return averaging_pb2.MessageFromLeader(
                code=averaging_pb2.NOT_A_LEADER, suggested_leader=self.current_leader.to_bytes()
            )
        elif context.remote_id == self.peer_id or context.remote_id in self.current_followers:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.DUPLICATE_PEER_ID)
        elif len(self.current_followers) + 1 >= self.target_group_size:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_IS_FULL)
        else:
            return None
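
    # Examples of the rejection cascade above (illustrative): a request carrying a different group key
    # gets BAD_GROUP_KEY; a request whose expiration is *earlier* than our own declared expiration gets
    # BAD_EXPIRATION_TIME (that peer has higher leadership priority and should be leading instead); and
    # once we have joined another leader ourselves, subsequent requests are redirected to that leader
    # via NOT_A_LEADER with suggested_leader set.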

    async def leader_assemble_group(self) -> GroupInfo:
        """Form up all current followers into a group and gather metadata"""
        assert self.lock_looking_for_group.locked() and self.lock_request_join_group.locked() and not self.client_mode
        assert not self.assembled_group.done()
        group_id = DHTID.generate().to_bytes()  # note: both group_id and the order of peer_ids must be random
        ordered_peer_ids = list(self.current_followers)
        ordered_peer_ids.append(self.peer_id)
        random.shuffle(ordered_peer_ids)

        gathered = tuple(
            self.data_for_gather if peer_id == self.peer_id else self.current_followers[peer_id].gather
            for peer_id in ordered_peer_ids
        )

        logger.debug(f"{self.peer_id} - assembled group of {len(ordered_peer_ids)} peers.")
        group_info = GroupInfo(group_id, tuple(ordered_peer_ids), gathered)
        await self.group_key_manager.update_key_on_group_assembled(group_info, is_leader=True)
        self.assembled_group.set_result(group_info)
        return group_info

    async def follower_assemble_group(self, leader: PeerID, msg: averaging_pb2.MessageFromLeader) -> GroupInfo:
        """Form a group using peers and metadata provided by our leader"""
        assert self.lock_looking_for_group.locked() and self.lock_request_join_group.locked()
        assert not self.assembled_group.done()
        assert self.current_leader == leader, f"averager does not follow {leader} (actual: {self.current_leader})"

        group_id = msg.group_id
        ordered_peer_ids = [PeerID(item) for item in msg.ordered_peer_ids]
        assert self.peer_id in ordered_peer_ids, "Leader sent us group_peer_ids that do not contain us!"
        assert len(ordered_peer_ids) == len(msg.gathered)

        logger.debug(f"{self.peer_id} - follower assembled group with leader {leader}.")
        group_info = GroupInfo(group_id, tuple(ordered_peer_ids), tuple(msg.gathered))
        await self.group_key_manager.update_key_on_group_assembled(group_info)
        self.assembled_group.set_result(group_info)
        return group_info

    async def leader_disband_group(self):
        """Kick out all followers immediately, optionally direct them to our new leader (if we found one)"""
        assert self.lock_request_join_group.locked() and not self.client_mode
        self.current_followers.clear()  # this will cause rpc_join_group to kick all followers out


class PotentialLeaders:
    """A utility class that searches for averagers that could become our leaders"""

    def __init__(self, peer_id: PeerID, averaging_expiration: DHTExpiration, target_group_size: Optional[int]):
        self.peer_id, self.averaging_expiration = peer_id, averaging_expiration
        self.target_group_size = target_group_size
        self.running, self.update_triggered, self.update_finished = asyncio.Event(), asyncio.Event(), asyncio.Event()
        self.declared_expiration, self.lock_search, self.lock_declare = asyncio.Event(), asyncio.Lock(), asyncio.Lock()
        self.leader_queue = TimedStorage[PeerID, DHTExpiration]()
        self.past_attempts: Set[Tuple[PeerID, DHTExpiration]] = set()
        self.declared_expiration_time = float("inf")
        self.declared_group_key: Optional[GroupKey] = None
        self.max_assured_time = float("-inf")
        self.search_end_time = float("inf")

    @contextlib.asynccontextmanager
    async def begin_search(self, key_manager: GroupKeyManager, timeout: Optional[float], declare: bool = True):
        async with self.lock_search:
            self.running.set()
            self.search_end_time = get_dht_time() + timeout if timeout is not None else float("inf")
            update_queue_task = asyncio.create_task(self._update_queue_periodically(key_manager))
            if declare:
                declare_averager_task = asyncio.create_task(self._declare_averager_periodically(key_manager))

            try:
                yield self
            finally:
                await cancel_and_wait(update_queue_task)
                if declare:
                    await cancel_and_wait(declare_averager_task)

                for field in (
                    self.past_attempts,
                    self.leader_queue,
                    self.running,
                    self.update_finished,
                    self.update_triggered,
                    self.declared_expiration,
                ):
                    field.clear()
                self.max_assured_time = float("-inf")
                self.search_end_time = float("inf")

    @contextlib.asynccontextmanager
    async def pause_search(self):
        was_running = self.running.is_set()
        try:
            self.running.clear()
            yield
        finally:
            if was_running:
                self.running.set()
            else:
                self.running.clear()

    async def pop_next_leader(self) -> PeerID:
        """Remove and return the next most suitable leader or throw an exception if reached timeout"""
        assert self.running.is_set(), "Not running search at the moment"
        while True:
            maybe_next_leader, entry = self.leader_queue.top()

            if maybe_next_leader is None or self.max_assured_time <= entry.expiration_time <= self.search_end_time:
                self.update_triggered.set()

            if maybe_next_leader is None or (entry.expiration_time, maybe_next_leader.to_bytes()) > (
                self.declared_expiration_time,
                self.peer_id.to_bytes(),
            ):
                await asyncio.wait(
                    {
                        asyncio.create_task(self.update_finished.wait()),
                        asyncio.create_task(self.declared_expiration.wait()),
                    },
                    return_when=asyncio.FIRST_COMPLETED,
                )
                self.declared_expiration.clear()
                if self.update_finished.is_set():
                    self.update_finished.clear()
                    continue
                else:
                    raise asyncio.TimeoutError("pop_next_leader was invalidated: re-declared averager in background")

            del self.leader_queue[maybe_next_leader]
            self.past_attempts.add((maybe_next_leader, entry.expiration_time))
            return maybe_next_leader
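
    # Worked example of the priority check above (illustrative values): potential leaders are ordered by
    # (expiration_time, peer_id), lowest first. If we declared (105.0, b"QmB") and the queue's best entry
    # is (103.5, b"QmA"), then (103.5, b"QmA") < (105.0, b"QmB"), so we pop QmA as our next potential
    # leader. If every remaining entry compares greater than our own declaration, we are currently the
    # highest-priority peer and wait for the queue to refresh (or for a re-declaration) instead.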

    @property
    def request_expiration_time(self) -> float:
        """this averager's current expiration time, used to send join requests to leaders"""
        if isfinite(self.declared_expiration_time):
            return self.declared_expiration_time
        else:
            return min(get_dht_time() + self.averaging_expiration, self.search_end_time)

    async def _update_queue_periodically(self, key_manager: GroupKeyManager) -> None:
        DISCREPANCY = timed_storage.MAX_DHT_TIME_DISCREPANCY_SECONDS
        while get_dht_time() < self.search_end_time:
            new_peers = await key_manager.get_averagers(key_manager.current_key, only_active=True)
            self.max_assured_time = max(
                self.max_assured_time, get_dht_time() + self.averaging_expiration - DISCREPANCY
            )

            self.leader_queue.clear()
            for peer, peer_expiration_time in new_peers:
                if peer == self.peer_id or (peer, peer_expiration_time) in self.past_attempts:
                    continue
                self.leader_queue.store(peer, peer_expiration_time, peer_expiration_time)
                self.max_assured_time = max(self.max_assured_time, peer_expiration_time - DISCREPANCY)

            self.update_finished.set()

            await asyncio.wait(
                {
                    asyncio.create_task(self.running.wait()),
                    asyncio.create_task(self.update_triggered.wait()),
                },
                return_when=asyncio.ALL_COMPLETED,
                timeout=self.search_end_time - get_dht_time() if isfinite(self.search_end_time) else None,
            )
            self.update_triggered.clear()

    async def _declare_averager_periodically(self, key_manager: GroupKeyManager) -> None:
        async with self.lock_declare:
            try:
                while True:
                    await self.running.wait()

                    new_expiration_time = min(get_dht_time() + self.averaging_expiration, self.search_end_time)
                    self.declared_group_key = group_key = key_manager.current_key
                    self.declared_expiration_time = new_expiration_time
                    self.declared_expiration.set()
                    await key_manager.declare_averager(group_key, self.peer_id, expiration_time=new_expiration_time)
                    await asyncio.sleep(self.declared_expiration_time - get_dht_time())
                    if self.running.is_set() and len(self.leader_queue) == 0:
                        await key_manager.update_key_on_not_enough_peers()
            finally:
                if self.declared_group_key is not None:
                    prev_declared_key, prev_expiration_time = self.declared_group_key, self.declared_expiration_time
                    self.declared_group_key, self.declared_expiration_time = None, float("inf")
                    self.leader_queue, self.max_assured_time = TimedStorage[PeerID, DHTExpiration](), float("-inf")
                    await key_manager.declare_averager(
                        prev_declared_key, self.peer_id, prev_expiration_time, looking_for_group=False
                    )


class MatchmakingException(Exception):
    """An internal exception that marks undesired edge cases during averaging"""