matchmaking.py

  1. """ A background process that averages your tensors with peers """
  2. from __future__ import annotations
  3. import contextlib
  4. import random
  5. from math import isfinite
  6. from typing import Optional, AsyncIterator, Set, Tuple, Dict
  7. import concurrent.futures
  8. import asyncio
  9. import grpc
  10. import grpc._cython.cygrpc
  11. from hivemind.client.averaging.group_info import GroupInfo
  12. from hivemind.client.averaging.key_manager import GroupKeyManager, GroupKey
  13. from hivemind.dht import DHT, DHTID, DHTExpiration, get_dht_time
  14. from hivemind.utils import get_logger, Endpoint, timed_storage, TimedStorage
  15. from hivemind.proto import averaging_pb2, averaging_pb2_grpc
  16. from hivemind.utils.grpc import ChannelCache
  17. logger = get_logger(__name__)


class Matchmaking(averaging_pb2_grpc.DecentralizedAveragingServicer):
    """
    An internal class that is used to form groups of averagers for running allreduce.
    See DecentralizedAverager docstring for the detailed description of all parameters.

    :note: on implementation: the current matchmaking protocol can encounter one type of (temporary) deadlock.
      This deadlock occurs when averager A requests averager B at the same time as averager B requests averager A.
      In that case, neither averager can process the other one's request because it is awaiting lock_request_join_group.
      This deadlock only happens if averagers have outdated information on expirations (due to network delays).
      While the A->B->A deadlock is easy to fix, it gets much harder with more peers (e.g. A -> B -> C -> D -> A).
      Hence, instead of accounting for such deadlocks, we simply break them with request_timeout.
    """

    def __init__(self, endpoint: Endpoint, schema_hash: bytes, dht: DHT, *,
                 prefix: str, target_group_size: int, min_group_size: int,
                 request_timeout: float, client_mode: bool, initial_group_bits: Optional[str] = None,
                 averaging_expiration: float = 15):
        assert '.' not in prefix, "group prefix must be a string without ."
        if request_timeout is None or request_timeout >= averaging_expiration:
            logger.warning("It is recommended to use request_timeout smaller than averaging_expiration. Otherwise,"
                           " matchmaking can cause deadlocks in some rare cases. Please see Matchmaking docstring.")

        super().__init__()
        self.endpoint, self.schema_hash = endpoint, schema_hash
        self.group_key_manager = GroupKeyManager(dht, endpoint, prefix, initial_group_bits, target_group_size)
        self.target_group_size, self.min_group_size = target_group_size, min_group_size
        self.averaging_expiration, self.request_timeout = averaging_expiration, request_timeout
        self.client_mode = client_mode

        self.lock_looking_for_group = asyncio.Lock()
        self.lock_request_join_group = asyncio.Lock()
        self.follower_was_discarded = asyncio.Event()
        self.was_accepted_to_group = asyncio.Event()
        self.assembled_group = asyncio.Future()

        self.current_leader: Optional[Endpoint] = None  # iff I am a follower, this is a link to my current leader
        self.current_followers: Dict[Endpoint, averaging_pb2.JoinRequest] = {}  # my current followers excluding myself
        self.potential_leaders = PotentialLeaders(endpoint, averaging_expiration, target_group_size)
        self.data_for_gather: Optional[bytes] = None

    @property
    def is_looking_for_group(self):
        return self.lock_looking_for_group.locked()

    def __repr__(self):
        lfg_status = "looking for group," if self.is_looking_for_group else "not looking for group,"
        if self.is_looking_for_group:
            if self.current_leader:
                lfg_status += f" following {self.current_leader},"
            if len(self.current_followers):
                lfg_status += f" leading {len(self.current_followers)} followers,"
        schema_hash_repr = f"{self.schema_hash[0]}...{self.schema_hash[-8:]}"
        return f"{self.__class__.__name__}(endpoint={self.endpoint}, schema={schema_hash_repr}, {lfg_status}" \
               f" current key = {self.group_key_manager.current_key}, client_mode={self.client_mode})"

    async def look_for_group(self, *, data_for_gather: bytes, timeout: Optional[float] = None) -> Optional[GroupInfo]:
        """
        :param data_for_gather: optionally send this data to all peers in the next group and gather it from groupmates
        :param timeout: maximum time that may be spent looking for group (does not include allreduce itself)
        :returns: an assembled group if successful, None if failed; does NOT perform the actual averaging
        Iterate over the averagers from a given group_identifier that have higher leadership priority than yourself.
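
        Example (a sketch; assumes `matchmaking` was constructed as in the sketch above and that
        this call happens inside a running event loop):
            group_info = await matchmaking.look_for_group(data_for_gather=b'', timeout=30.0)
            if group_info is None:
                ...  # matchmaking timed out or failed; the caller may retry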
  71. """
  72. if self.is_looking_for_group:
  73. logger.info("Another look_for_group is already in progress. The current run will be scheduled after"
  74. " the existing group is either assembled or disbanded.")
  75. async with self.lock_looking_for_group:
  76. self.data_for_gather = data_for_gather
  77. request_leaders_task = asyncio.create_task(self._request_join_potential_leaders(timeout))
  78. try:
  79. return await asyncio.wait_for(self.assembled_group, timeout=timeout)
  80. except asyncio.TimeoutError:
  81. return None
  82. except BaseException as e:
  83. if len(self.current_followers) > 0:
  84. async with self.lock_request_join_group:
  85. await self.leader_disband_group()
  86. if not self.assembled_group.done():
  87. self.assembled_group.set_exception(e)
  88. raise
  89. finally:
  90. if not request_leaders_task.done():
  91. request_leaders_task.cancel()
  92. if not self.assembled_group.done():
  93. self.assembled_group.cancel()
  94. while len(self.current_followers) > 0:
  95. await self.follower_was_discarded.wait()
  96. self.follower_was_discarded.clear()
  97. # note: the code above ensures that we send all followers away before creating new future
  98. self.assembled_group = asyncio.Future()
  99. self.was_accepted_to_group.clear()
  100. self.data_for_gather = None

    async def _request_join_potential_leaders(self, timeout: Optional[float]) -> GroupInfo:
        """ Request potential leaders from the queue until one accepts us. This coroutine is meant to run in the background. """
        async with self.potential_leaders.begin_search(self.group_key_manager, timeout, declare=not self.client_mode):
            while True:
                try:
                    next_leader = await self.potential_leaders.pop_next_leader()  # throws TimeoutError on expiration

                    group = await self.request_join_group(next_leader, self.potential_leaders.request_expiration_time)
                    if group is not None:
                        return group

                except asyncio.TimeoutError:
                    async with self.lock_request_join_group:
                        if self.assembled_group.done():
                            return self.assembled_group.result()
                        elif len(self.current_followers) + 1 >= self.min_group_size:
                            # the time is up, we have a *good enough* group. run allreduce as is.
                            return await self.leader_assemble_group()
                        elif len(self.current_followers) > 0:
                            await self.leader_disband_group()
                        continue
                except (concurrent.futures.CancelledError, asyncio.CancelledError):
                    break  # note: this is a compatibility layer for python3.7
                except Exception as e:
                    if not self.assembled_group.done():
                        self.assembled_group.set_exception(e)
                    raise e

    async def request_join_group(self, leader: Endpoint, expiration_time: DHTExpiration) -> Optional[GroupInfo]:
        """
        :param leader: request this peer to be your leader for allreduce
        :param expiration_time: inform leader that we intend to begin averaging before this expiration_time
        :returns: if the leader accepted us and started AllReduce, return that AllReduce. Otherwise, return None
        :note: this function does not guarantee that your group leader is the same as the :leader: parameter.
          The originally specified leader can disband the group and redirect us to a different leader.
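
        Expected message flow (a sketch of the happy path, mirroring the code below):
            us -> leader: JoinRequest(endpoint, schema_hash, expiration, client_mode, gather)
            leader -> us: ACCEPTED
            leader -> us: BEGIN_ALLREDUCE(group_id, ordered_group_endpoints, gathered)
        On rejection, the first reply carries a rejection code instead of ACCEPTED. A GROUP_DISBANDED
        reply may carry a suggested_leader, in which case we retry with that peer.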
  133. """
  134. assert self.is_looking_for_group and self.current_leader is None
  135. call: Optional[grpc.aio.UnaryStreamCall] = None
  136. try:
  137. async with self.lock_request_join_group:
  138. leader_stub = ChannelCache.get_stub(leader, averaging_pb2_grpc.DecentralizedAveragingStub, aio=True)
  139. call = leader_stub.rpc_join_group(averaging_pb2.JoinRequest(
  140. endpoint=self.endpoint, schema_hash=self.schema_hash, expiration=expiration_time,
  141. client_mode=self.client_mode, gather=self.data_for_gather))
  142. message = await asyncio.wait_for(call.read(), timeout=self.request_timeout)
  143. if message.code == averaging_pb2.ACCEPTED:
  144. logger.debug(f"{self.endpoint} - joining the group of {leader}; waiting for peers")
  145. self.current_leader = leader
  146. self.was_accepted_to_group.set()
  147. if len(self.current_followers) > 0:
  148. await self.leader_disband_group()
  149. if message.code != averaging_pb2.ACCEPTED:
  150. code = averaging_pb2.MessageCode.Name(message.code)
  151. logger.debug(f"{self.endpoint} - requested {leader} to be my leader, but got rejected with {code}")
  152. return None
  153. async with self.potential_leaders.pause_search():
  154. time_to_expiration = max(expiration_time - get_dht_time(), 0.0)
  155. message = await asyncio.wait_for(call.read(), time_to_expiration + self.request_timeout)
  156. if message.code == averaging_pb2.BEGIN_ALLREDUCE:
  157. async with self.lock_request_join_group:
  158. return await self.follower_assemble_group(leader, message)
  159. if message.code in (averaging_pb2.GROUP_DISBANDED, averaging_pb2.CANCELLED):
  160. if message.suggested_leader and message.suggested_leader != self.endpoint:
  161. logger.debug(f"{self} - leader disbanded group and redirected us to {message.suggested_leader}")
  162. self.current_leader = None
  163. call.cancel()
  164. return await self.request_join_group(message.suggested_leader, expiration_time)
  165. else:
  166. logger.debug(f"{self} - leader disbanded group")
  167. return None
  168. logger.debug(f"{self} - unexpected message from leader: {averaging_pb2.MessageCode.Name(message.code)}")
  169. return None
  170. except asyncio.TimeoutError:
  171. logger.debug(f"{self} - potential leader {leader} did not respond within {self.request_timeout}")
  172. if call is not None:
  173. call.cancel()
  174. return None
  175. except (grpc.RpcError, grpc.aio.AioRpcError, grpc._cython.cygrpc.InternalError, StopAsyncIteration) as e:
  176. logger.error(f"{self} - failed to request potential leader {leader}: {e}")
  177. return None
  178. finally:
  179. self.was_accepted_to_group.clear()
  180. self.current_leader = None
  181. if call is not None:
  182. await call.code()

    async def rpc_join_group(self, request: averaging_pb2.JoinRequest, context: grpc.ServicerContext
                             ) -> AsyncIterator[averaging_pb2.MessageFromLeader]:
        """ accept or reject a join request from another averager; if accepted, run it through the allreduce steps """
        try:
            async with self.lock_request_join_group:
                reason_to_reject = self._check_reasons_to_reject(request)
                if reason_to_reject is not None:
                    yield reason_to_reject
                    return

                self.current_followers[request.endpoint] = request
                yield averaging_pb2.MessageFromLeader(code=averaging_pb2.ACCEPTED)

                if len(self.current_followers) + 1 >= self.target_group_size and not self.assembled_group.done():
                    # outcome 1: we have assembled a full group and are ready for allreduce
                    await self.leader_assemble_group()

            # wait for the group to be assembled or disbanded
            timeout = max(0.0, self.potential_leaders.declared_expiration_time - get_dht_time())
            await asyncio.wait({self.assembled_group, self.was_accepted_to_group.wait()},
                               return_when=asyncio.FIRST_COMPLETED, timeout=timeout)
            if not self.assembled_group.done() and not self.was_accepted_to_group.is_set():
                async with self.lock_request_join_group:
                    if self.assembled_group.done():
                        pass  # this covers a rare case when the group is assembled while the event loop was busy.
                    elif len(self.current_followers) + 1 >= self.min_group_size and self.is_looking_for_group:
                        # outcome 2: the time is up, run allreduce with what we have or disband
                        await self.leader_assemble_group()
                    else:
                        await self.leader_disband_group()

            if self.was_accepted_to_group.is_set() or not self.assembled_group.done() \
                    or self.assembled_group.cancelled() or request.endpoint not in self.assembled_group.result():
                if self.current_leader is not None:
                    # outcome 3: found by a leader with higher priority, send our followers to him
                    yield averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_DISBANDED,
                                                          suggested_leader=self.current_leader)
                    return
                else:
                    yield averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_DISBANDED)
                    return

            group_info = self.assembled_group.result()
            yield averaging_pb2.MessageFromLeader(code=averaging_pb2.BEGIN_ALLREDUCE, group_id=group_info.group_id,
                                                  ordered_group_endpoints=group_info.endpoints,
                                                  gathered=group_info.gathered)
        except (concurrent.futures.CancelledError, asyncio.CancelledError):
            return  # note: this is a compatibility layer for python3.7
        except Exception as e:
            logger.exception(e)
            yield averaging_pb2.MessageFromLeader(code=averaging_pb2.INTERNAL_ERROR)
        finally:  # note: this code is guaranteed to run even if the coroutine is destroyed prematurely
            self.current_followers.pop(request.endpoint, None)
            self.follower_was_discarded.set()

    def _check_reasons_to_reject(self, request: averaging_pb2.JoinRequest) -> Optional[averaging_pb2.MessageFromLeader]:
        """ :returns: None if the request is accepted, otherwise a reason for rejection """
        if not self.is_looking_for_group or self.assembled_group.done():
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_LOOKING_FOR_GROUP)

        if not isinstance(request.schema_hash, bytes) or len(request.schema_hash) == 0 \
                or not isinstance(request.expiration, DHTExpiration) or not isfinite(request.expiration) \
                or not isinstance(request.endpoint, Endpoint) or len(request.endpoint) == 0 or self.client_mode:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.PROTOCOL_VIOLATION)
        elif request.schema_hash != self.schema_hash:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_SCHEMA_HASH)
        elif self.potential_leaders.declared_group_key is None:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_DECLARED)
        elif self.potential_leaders.declared_expiration_time > (request.expiration or float('inf')):
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_EXPIRATION_TIME)
        elif self.current_leader is not None:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_A_LEADER, suggested_leader=self.current_leader
                                                   )  # note: this suggested leader is currently ignored
        elif request.endpoint == self.endpoint or request.endpoint in self.current_followers:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.DUPLICATE_ENDPOINT)
        elif len(self.current_followers) + 1 >= self.target_group_size:
            return averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_IS_FULL)
        else:
            return None

    async def leader_assemble_group(self) -> GroupInfo:
        """ Form up all current followers into a group and gather metadata """
        assert self.lock_looking_for_group.locked() and self.lock_request_join_group.locked() and not self.client_mode
        assert not self.assembled_group.done()
        group_id = DHTID.generate().to_bytes()  # note: both group_id and the order of endpoints must be random
        ordered_group_endpoints = list(self.current_followers)
        ordered_group_endpoints.append(self.endpoint)
        random.shuffle(ordered_group_endpoints)

        gathered = tuple(self.data_for_gather if endpoint == self.endpoint else self.current_followers[endpoint].gather
                         for endpoint in ordered_group_endpoints)

        logger.debug(f"{self.endpoint} - assembled group of {len(ordered_group_endpoints)} peers.")
        group_info = GroupInfo(group_id, tuple(ordered_group_endpoints), gathered)
        await self.group_key_manager.update_key_on_group_assembled(group_info, is_leader=True)
        self.assembled_group.set_result(group_info)
        return group_info

    async def follower_assemble_group(self, leader: Endpoint, msg: averaging_pb2.MessageFromLeader) -> GroupInfo:
        """ Form a group using peers and metadata provided by our leader """
        assert self.lock_looking_for_group.locked() and self.lock_request_join_group.locked()
        assert not self.assembled_group.done()
        assert self.current_leader == leader, f"averager does not follow {leader} (actual: {self.current_leader})"

        group_id, ordered_group_endpoints = msg.group_id, msg.ordered_group_endpoints
        assert self.endpoint in ordered_group_endpoints, "Leader sent us group_endpoints that do not contain us!"
        assert len(ordered_group_endpoints) == len(msg.gathered)

        logger.debug(f"{self.endpoint} - follower assembled group with leader {leader}.")
        group_info = GroupInfo(group_id, tuple(ordered_group_endpoints), tuple(msg.gathered))
        await self.group_key_manager.update_key_on_group_assembled(group_info)
        self.assembled_group.set_result(group_info)
        return group_info

    async def leader_disband_group(self):
        """ Kick out all followers immediately, optionally direct them to our new leader (if we found one) """
        assert self.lock_request_join_group.locked() and not self.client_mode
        self.current_followers.clear()  # this will cause rpc_join_group to kick all followers out


class PotentialLeaders:
    """ A utility class that searches for averagers that could become our leaders """

    def __init__(self, endpoint: Endpoint, averaging_expiration: DHTExpiration, target_group_size: Optional[int]):
        self.endpoint, self.averaging_expiration = endpoint, averaging_expiration
        self.target_group_size = target_group_size
        self.running, self.update_triggered, self.update_finished = asyncio.Event(), asyncio.Event(), asyncio.Event()
        self.declared_expiration, self.lock_search, self.lock_declare = asyncio.Event(), asyncio.Lock(), asyncio.Lock()
        self.leader_queue = TimedStorage[Endpoint, DHTExpiration]()
        self.past_attempts: Set[Tuple[Endpoint, DHTExpiration]] = set()
        self.declared_expiration_time = float('inf')
        self.declared_group_key: Optional[GroupKey] = None
        self.max_assured_time = float('-inf')
        self.search_end_time = float('inf')

    @contextlib.asynccontextmanager
    async def begin_search(self, key_manager: GroupKeyManager, timeout: Optional[float], declare: bool = True):
        async with self.lock_search:
            self.running.set()
            self.search_end_time = get_dht_time() + timeout if timeout is not None else float('inf')
            update_queue_task = asyncio.create_task(self._update_queue_periodically(key_manager))
            if declare:
                declare_averager_task = asyncio.create_task(self._declare_averager_periodically(key_manager))

            try:
                yield self
            finally:
                if not update_queue_task.done():
                    update_queue_task.cancel()
                if declare and not declare_averager_task.done():
                    declare_averager_task.cancel()

                for field in (self.past_attempts, self.leader_queue, self.running,
                              self.update_finished, self.update_triggered, self.declared_expiration):
                    field.clear()
                self.max_assured_time = float('-inf')
                self.search_end_time = float('inf')

    @contextlib.asynccontextmanager
    async def pause_search(self):
        was_running = self.running.is_set()
        try:
            self.running.clear()
            yield
        finally:
            if was_running:
                self.running.set()
            else:
                self.running.clear()

    async def pop_next_leader(self) -> Endpoint:
        """ Remove and return the next most suitable leader or throw an exception if reached timeout """
        assert self.running.is_set(), "Not running search at the moment"
        while True:
            maybe_next_leader, entry = self.leader_queue.top()

            if maybe_next_leader is None or self.max_assured_time <= entry.expiration_time <= self.search_end_time:
                self.update_triggered.set()

            if maybe_next_leader is None or entry.expiration_time >= self.declared_expiration_time:
                await asyncio.wait({self.update_finished.wait(), self.declared_expiration.wait()},
                                   return_when=asyncio.FIRST_COMPLETED)
                self.declared_expiration.clear()
                if self.update_finished.is_set():
                    self.update_finished.clear()
                    continue
                else:
                    raise asyncio.TimeoutError("pop_next_leader was invalidated: re-declared averager in background")

            del self.leader_queue[maybe_next_leader]
            self.past_attempts.add((maybe_next_leader, entry.expiration_time))
            return maybe_next_leader

    @property
    def request_expiration_time(self) -> float:
        """ this averager's current expiration time - used to send join requests to leaders """
        if isfinite(self.declared_expiration_time):
            return self.declared_expiration_time
        else:
            return min(get_dht_time() + self.averaging_expiration, self.search_end_time)

    async def _update_queue_periodically(self, key_manager: GroupKeyManager):
        try:
            DISCREPANCY = timed_storage.MAX_DHT_TIME_DISCREPANCY_SECONDS
            while get_dht_time() < self.search_end_time:
                new_peers = await key_manager.get_averagers(key_manager.current_key, only_active=True)
                self.max_assured_time = max(self.max_assured_time,
                                            get_dht_time() + self.averaging_expiration - DISCREPANCY)

                self.leader_queue.clear()
                for peer, peer_expiration_time in new_peers:
                    if peer == self.endpoint or (peer, peer_expiration_time) in self.past_attempts:
                        continue
                    self.leader_queue.store(peer, peer_expiration_time, peer_expiration_time)
                    self.max_assured_time = max(self.max_assured_time, peer_expiration_time - DISCREPANCY)

                self.update_finished.set()

                await asyncio.wait(
                    {self.running.wait(), self.update_triggered.wait()}, return_when=asyncio.ALL_COMPLETED,
                    timeout=self.search_end_time - get_dht_time() if isfinite(self.search_end_time) else None)
                self.update_triggered.clear()
        except (concurrent.futures.CancelledError, asyncio.CancelledError):
            return  # note: this is a compatibility layer for python3.7
        except Exception as e:
            logger.error(f"{self.endpoint} - caught {type(e)}: {e}")
            raise

    async def _declare_averager_periodically(self, key_manager: GroupKeyManager):
        async with self.lock_declare:
            try:
                while True:
                    await self.running.wait()

                    new_expiration_time = min(get_dht_time() + self.averaging_expiration, self.search_end_time)
                    self.declared_group_key = group_key = key_manager.current_key
                    self.declared_expiration_time = new_expiration_time
                    self.declared_expiration.set()
                    await key_manager.declare_averager(group_key, self.endpoint, expiration_time=new_expiration_time)
                    await asyncio.sleep(self.declared_expiration_time - get_dht_time())
                    if self.running.is_set() and len(self.leader_queue) == 0:
                        await key_manager.update_key_on_not_enough_peers()
            except (concurrent.futures.CancelledError, asyncio.CancelledError):
                pass  # note: this is a compatibility layer for python3.7
            except Exception as e:  # note: we catch exceptions here because otherwise they are never printed
                logger.error(f"{self.endpoint} - caught {type(e)}: {e}")
            finally:
                if self.declared_group_key is not None:
                    prev_declared_key, prev_expiration_time = self.declared_group_key, self.declared_expiration_time
                    self.declared_group_key, self.declared_expiration_time = None, float('inf')
                    self.leader_queue, self.max_assured_time = TimedStorage[Endpoint, DHTExpiration](), float('-inf')
                    await key_manager.declare_averager(prev_declared_key, self.endpoint, prev_expiration_time,
                                                       looking_for_group=False)


class MatchmakingException(Exception):
    """ An internal exception that marks undesired edge cases during averaging """