# matchmaking.py
  1. """ A background process that averages your tensors with peers """
  2. from __future__ import annotations
  3. import asyncio
  4. import concurrent.futures
  5. import contextlib
  6. import random
  7. from math import isfinite
  8. from typing import AsyncIterator, Dict, Optional, Set, Tuple, Type
  9. from hivemind.averaging.control import StepControl
  10. from hivemind.averaging.group_info import GroupInfo
  11. from hivemind.averaging.key_manager import GroupKey, GroupKeyManager
  12. from hivemind.dht import DHT, DHTID, DHTExpiration
  13. from hivemind.p2p import P2P, P2PContext, P2PHandlerError, PeerID, ServicerBase
  14. from hivemind.p2p.p2p_daemon_bindings.utils import ControlFailure, DispatchFailure
  15. from hivemind.proto import averaging_pb2
  16. from hivemind.utils import TimedStorage, get_dht_time, get_logger, timed_storage
  17. from hivemind.utils.asyncio import anext, cancel_and_wait
  18. logger = get_logger(__name__)
  19. class Matchmaking:
  20. f"""
  21. An internal class that is used to form groups of averages for running allreduce
  22. See DecentralizedAverager docstring for the detailed description of all parameters
  23. :note: on implementation: the current matchmaker protocol can encounter one type of (temporary) deadlock;
  24. This deadlock occurs when averager A requests averager B at the same time as averager B requests averager A.
  25. In that case, neither averager can process the other one's request because it is awaiting lock_request_join_group.
  26. This deadlock only happens if averagers have outdated information on expirations (due to network delays).
  27. While A->B->A deadlock is easy to fix, it gets much harder with more peers (e.g. A -> B -> C -> D -> A).
  28. Hence, instead of accounting for such deadlocks, we simply break them with request_timeout.
  29. """
    def __init__(
        self,
        p2p: P2P,
        schema_hash: bytes,
        dht: DHT,
        *,
        servicer_type: Type[ServicerBase],
        prefix: str,
        target_group_size: Optional[int],
        min_group_size: int,
        min_matchmaking_time: float,
        request_timeout: float,
        client_mode: bool,
        initial_group_bits: str = "",
    ):
        """
        Set up matchmaking state and its concurrency primitives.

        :param p2p: the p2p instance used to reach other peers; its peer_id becomes our identity
        :param schema_hash: a hash of the tensor schema; peers with a different hash are rejected
        :param dht: DHT instance used (via GroupKeyManager) to declare and discover averagers
        :param servicer_type: a ServicerBase subclass used to build stubs for contacting leaders
        :param prefix: DHT key prefix for this averaging session; must not contain '.'
        :param target_group_size: if set, a group is assembled as soon as it reaches this size
        :param min_group_size: the minimum number of peers required to run allreduce
        :param min_matchmaking_time: lower bound on how long we advertise ourselves before expiring
        :param request_timeout: timeout for a single join request to a potential leader
        :param client_mode: if True, this peer never becomes a leader (and never declares itself)
        :param initial_group_bits: initial binary group key suffix (see GroupKeyManager)
        """
        assert "." not in prefix, "group prefix must be a string without ."
        # request_timeout >= min_matchmaking_time makes the deadlock-breaking described in the
        # class docstring ineffective, hence the warning
        if request_timeout is None or request_timeout >= min_matchmaking_time:
            logger.warning(
                "It is recommended to use request_timeout smaller than min_matchmaking_time. Otherwise,"
                " matchmaking can cause deadlocks in some rare cases. Please see Matchmaking docstring."
            )

        super().__init__()
        self._p2p = p2p

        if not issubclass(servicer_type, ServicerBase):
            raise TypeError("`servicer_type` is expected to be a ServicerBase subclass")
        self._servicer_type = servicer_type
        self._prefix = prefix

        self.peer_id = p2p.peer_id
        self.schema_hash = schema_hash
        self.group_key_manager = GroupKeyManager(dht, prefix, initial_group_bits, target_group_size)
        self.target_group_size, self.min_group_size = target_group_size, min_group_size
        self.min_matchmaking_time, self.request_timeout = min_matchmaking_time, request_timeout
        self.client_mode = client_mode

        # lock_looking_for_group is held for the duration of one group search;
        # lock_request_join_group serializes mutations of the leader/follower state below
        self.lock_looking_for_group = asyncio.Lock()
        self.lock_request_join_group = asyncio.Lock()
        self.follower_was_discarded = asyncio.Event()
        self.was_accepted_to_group = asyncio.Event()
        self.assembled_group = asyncio.Future()  # resolves to a GroupInfo once a group is formed

        self.current_leader: Optional[PeerID] = None  # iff i am a follower, this is a link to my current leader
        self.current_followers: Dict[PeerID, averaging_pb2.JoinRequest] = {}  # my current followers excluding myself
        self.potential_leaders = PotentialLeaders(self.peer_id, min_matchmaking_time, target_group_size)
        self.step_control: Optional[StepControl] = None  # set only while inside looking_for_group()
  72. @contextlib.asynccontextmanager
  73. async def looking_for_group(self, step_control: StepControl):
  74. async with self.lock_looking_for_group:
  75. assert self.step_control is None
  76. try:
  77. self.step_control = step_control
  78. yield
  79. finally:
  80. self.step_control = None
    @property
    def is_looking_for_group(self) -> bool:
        """True while a group search is in progress, i.e. while some coroutine holds lock_looking_for_group."""
        return self.lock_looking_for_group.locked()
  84. def __repr__(self):
  85. lfg_status = "looking for group," if self.is_looking_for_group else "not looking for group,"
  86. if self.is_looking_for_group:
  87. if self.current_leader:
  88. lfg_status += f" following {self.current_leader},"
  89. if len(self.current_followers):
  90. lfg_status += f" leading {len(self.current_followers)} followers,"
  91. schema_hash_repr = f"{self.schema_hash[0]}...{self.schema_hash[-8:]}"
  92. return (
  93. f"{self.__class__.__name__}(peer_id={self.peer_id}, schema={schema_hash_repr}, {lfg_status}"
  94. f" current key = {self.group_key_manager.current_key}, client_mode={self.client_mode})"
  95. )
    async def look_for_group(self, step: StepControl) -> Optional[GroupInfo]:
        """
        :param step: step parameters and user control structure for the current step
        :returns: an assembled group if successful, None if failed; does NOT perform the actual averaging
        Iterate over the averagers from a given group_identifier that have higher leadership priority than yourself.
        """
        if self.is_looking_for_group:
            logger.info(
                "Another look_for_group is already in progress. The current run will be scheduled after"
                " the existing group is either assembled or disbanded."
            )
        async with self.looking_for_group(step):
            # search for (and join) potential leaders in the background; meanwhile we may also become
            # a leader ourselves when other peers call our rpc_join_group
            request_leaders_task = asyncio.create_task(self._request_join_potential_leaders(step))
            try:
                return await asyncio.wait_for(self.assembled_group, timeout=step.get_timeout())
            except asyncio.TimeoutError:
                return None

            except BaseException as e:
                # on any other failure (incl. cancellation): dismiss our followers before propagating
                if len(self.current_followers) > 0:
                    async with self.lock_request_join_group:
                        await self.leader_disband_group()

                if not self.assembled_group.done():
                    self.assembled_group.set_exception(e)
                raise

            finally:
                await cancel_and_wait(request_leaders_task)
                self.assembled_group.cancel()

                # wait for every pending rpc_join_group to notice the disband and remove itself
                while len(self.current_followers) > 0:
                    await self.follower_was_discarded.wait()
                    self.follower_was_discarded.clear()
                # note: the code above ensures that we send all followers away before creating new future
                self.assembled_group = asyncio.Future()
                self.was_accepted_to_group.clear()
    async def _request_join_potential_leaders(self, step: StepControl) -> GroupInfo:
        """Request leaders from queue until we find the first runner. This coroutine is meant to run in background."""
        assert self.is_looking_for_group
        async with self.potential_leaders.begin_search(step, self.group_key_manager, declare=not self.client_mode):
            while True:
                try:
                    next_leader = await self.potential_leaders.pop_next_leader()  # throws TimeoutError on expiration

                    group = await self._request_join_group(next_leader)
                    if group is not None:
                        return group
                except asyncio.TimeoutError:
                    # our declared expiration time ran out: become a leader ourselves if we can
                    async with self.lock_request_join_group:
                        if self.assembled_group.done():
                            return self.assembled_group.result()
                        elif len(self.current_followers) + 1 >= self.min_group_size:
                            # the time is up, we have a *good enough* group. run allreduce as is.
                            return await self.leader_assemble_group()
                        elif len(self.current_followers) > 0:
                            # group too small to run: dismiss followers and keep searching
                            await self.leader_disband_group()
                        continue
                except (concurrent.futures.CancelledError, asyncio.CancelledError):
                    break  # note: this is a compatibility layer for python3.7
                except Exception as e:
                    if not self.assembled_group.done():
                        self.assembled_group.set_exception(e)
                    raise e
    async def _request_join_group(self, leader: PeerID) -> Optional[GroupInfo]:
        """
        :param leader: request this peer to be your leader for allreduce
        :returns: if leader leader accepted us and started AllReduce, return that AllReduce. Otherwise, return None
        :note: this function does not guarantee that your group leader is the same as :leader: parameter
          The originally specified leader can disband group and redirect us to a different leader
        """
        assert self.is_looking_for_group and self.current_leader is None
        stream: Optional[AsyncIterator[averaging_pb2.MessageFromLeader]] = None
        try:
            async with self.lock_request_join_group:
                leader_stub = self._servicer_type.get_stub(self._p2p, leader, namespace=self._prefix)
                request_expiration_time = self.get_request_expiration_time()
                stream = await leader_stub.rpc_join_group(
                    averaging_pb2.JoinRequest(
                        schema_hash=self.schema_hash,
                        expiration=request_expiration_time,
                        client_mode=self.client_mode,
                        gather=self.step_control.data_for_gather,
                        group_key=self.group_key_manager.current_key,
                    )
                )
                # the first message is the leader's accept/reject verdict
                message = await asyncio.wait_for(anext(stream), timeout=self.request_timeout)

                if message.code == averaging_pb2.ACCEPTED:
                    logger.debug(f"{self.peer_id} - joining the group of {leader}, waiting for peers")
                    self.current_leader = leader
                    self.was_accepted_to_group.set()
                    # if we had followers of our own, hand them off: they will be redirected to our new leader
                    if len(self.current_followers) > 0:
                        await self.leader_disband_group()

            if message.code != averaging_pb2.ACCEPTED:
                code = averaging_pb2.MessageCode.Name(message.code)
                logger.debug(f"{self.peer_id} - requested {leader} to be my leader, but got rejected with {code}")
                return None

            # we are accepted: stop looking for other leaders while we wait for the group to start
            async with self.potential_leaders.pause_search():
                time_to_expiration = max(0.0, request_expiration_time - get_dht_time())
                message = await asyncio.wait_for(anext(stream), time_to_expiration + self.request_timeout)

                if message.code == averaging_pb2.BEGIN_ALLREDUCE:
                    async with self.lock_request_join_group:
                        return await self.follower_assemble_group(leader, message)

            if message.code in (averaging_pb2.GROUP_DISBANDED, averaging_pb2.CANCELLED):
                if message.suggested_leader:
                    suggested_leader = PeerID(message.suggested_leader)
                    if suggested_leader != self.peer_id:
                        logger.debug(f"{self} - leader disbanded group and redirected us to {suggested_leader}")
                        self.current_leader = None
                        try:
                            await stream.aclose()
                        except RuntimeError as e:
                            logger.debug(e, exc_info=True)
                        # recurse: try the leader our former leader pointed us to
                        return await self._request_join_group(suggested_leader)
                logger.debug(f"{self} - leader disbanded group")
                return None

            logger.debug(f"{self} - unexpected message from leader: {averaging_pb2.MessageCode.Name(message.code)}")
            return None
        except asyncio.TimeoutError:
            logger.debug(f"{self} - potential leader {leader} did not respond within {self.request_timeout}")
            return None
        except (P2PHandlerError, ControlFailure, DispatchFailure, StopAsyncIteration) as e:
            logger.debug(f"{self} - failed to request potential leader {leader}:", exc_info=True)
            return None

        finally:
            # regardless of outcome, we are no longer anyone's follower
            self.was_accepted_to_group.clear()
            self.current_leader = None
            if stream is not None:
                try:
                    await stream.aclose()
                except RuntimeError as e:
                    logger.debug(e, exc_info=True)
  223. def get_request_expiration_time(self) -> float:
  224. """Returns the averager's current expiration time, which is used to send join requests to leaders"""
  225. if isfinite(self.potential_leaders.declared_expiration_time):
  226. return self.potential_leaders.declared_expiration_time
  227. else:
  228. scheduled_time = max(self.step_control.scheduled_time, get_dht_time() + self.min_matchmaking_time)
  229. return min(scheduled_time, self.potential_leaders.search_end_time)
  230. async def rpc_join_group(
  231. self, request: averaging_pb2.JoinRequest, context: P2PContext
  232. ) -> AsyncIterator[averaging_pb2.MessageFromLeader]:
  233. """accept or reject a join request from another averager; if accepted, run him through allreduce steps"""
  234. try:
  235. async with self.lock_request_join_group:
  236. reason_to_reject = self._check_reasons_to_reject(request, context)
  237. if reason_to_reject is not None:
  238. yield reason_to_reject
  239. return
  240. self.current_followers[context.remote_id] = request
  241. yield averaging_pb2.MessageFromLeader(code=averaging_pb2.ACCEPTED)
  242. if (
  243. self.target_group_size is not None
  244. and len(self.current_followers) + 1 >= self.target_group_size
  245. and not self.assembled_group.done()
  246. ):
  247. # outcome 1: we have assembled a full group and are ready for allreduce
  248. await self.leader_assemble_group()
  249. # wait for the group to be assembled or disbanded
  250. timeout = max(0.0, self.potential_leaders.declared_expiration_time - get_dht_time())
  251. await asyncio.wait(
  252. {self.assembled_group, self.was_accepted_to_group.wait()},
  253. return_when=asyncio.FIRST_COMPLETED,
  254. timeout=timeout,
  255. )
  256. if not self.assembled_group.done() and not self.was_accepted_to_group.is_set():
  257. async with self.lock_request_join_group:
  258. if self.assembled_group.done():
  259. pass # this covers a rare case when the group is assembled while the event loop was busy.
  260. elif len(self.current_followers) + 1 >= self.min_group_size and self.is_looking_for_group:
  261. # outcome 2: the time is up, run allreduce with what we have or disband
  262. await self.leader_assemble_group()
  263. else:
  264. await self.leader_disband_group()
  265. if (
  266. self.was_accepted_to_group.is_set()
  267. or not self.assembled_group.done()
  268. or self.assembled_group.cancelled()
  269. or context.remote_id not in self.assembled_group.result()
  270. ):
  271. if self.current_leader is not None:
  272. # outcome 3: found by a leader with higher priority, send our followers to him
  273. yield averaging_pb2.MessageFromLeader(
  274. code=averaging_pb2.GROUP_DISBANDED, suggested_leader=self.current_leader.to_bytes()
  275. )
  276. return
  277. else:
  278. yield averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_DISBANDED)
  279. return
  280. group_info = self.assembled_group.result()
  281. yield averaging_pb2.MessageFromLeader(
  282. code=averaging_pb2.BEGIN_ALLREDUCE,
  283. group_id=group_info.group_id,
  284. ordered_peer_ids=[item.to_bytes() for item in group_info.peer_ids],
  285. gathered=group_info.gathered,
  286. )
  287. except (concurrent.futures.CancelledError, asyncio.CancelledError):
  288. return # note: this is a compatibility layer for python3.7
  289. except Exception as e:
  290. logger.exception(e)
  291. yield averaging_pb2.MessageFromLeader(code=averaging_pb2.INTERNAL_ERROR)
  292. finally: # note: this code is guaranteed to run even if the coroutine is destroyed prematurely
  293. self.current_followers.pop(context.remote_id, None)
  294. self.follower_was_discarded.set()
  295. def _check_reasons_to_reject(
  296. self, request: averaging_pb2.JoinRequest, context: P2PContext
  297. ) -> Optional[averaging_pb2.MessageFromLeader]:
  298. """:returns: if accepted, return None, otherwise return a reason for rejection"""
  299. if not self.is_looking_for_group or self.assembled_group.done():
  300. return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_LOOKING_FOR_GROUP)
  301. if (
  302. request.ListFields() == 3
  303. and not isinstance(request.schema_hash, bytes)
  304. or len(request.schema_hash) == 0
  305. or not isinstance(request.expiration, DHTExpiration)
  306. or not isfinite(request.expiration)
  307. or self.client_mode
  308. or not isinstance(request.group_key, GroupKey)
  309. ):
  310. return averaging_pb2.MessageFromLeader(code=averaging_pb2.PROTOCOL_VIOLATION)
  311. elif request.schema_hash != self.schema_hash:
  312. return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_SCHEMA_HASH)
  313. elif request.group_key != self.group_key_manager.current_key:
  314. return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_GROUP_KEY)
  315. elif self.potential_leaders.declared_group_key is None:
  316. return averaging_pb2.MessageFromLeader(code=averaging_pb2.NOT_DECLARED)
  317. elif self.potential_leaders.declared_expiration_time > (request.expiration or float("inf")):
  318. return averaging_pb2.MessageFromLeader(code=averaging_pb2.BAD_EXPIRATION_TIME)
  319. elif self.current_leader is not None:
  320. return averaging_pb2.MessageFromLeader(
  321. code=averaging_pb2.NOT_A_LEADER, suggested_leader=self.current_leader.to_bytes()
  322. )
  323. elif context.remote_id == self.peer_id or context.remote_id in self.current_followers:
  324. return averaging_pb2.MessageFromLeader(code=averaging_pb2.DUPLICATE_PEER_ID)
  325. elif self.target_group_size is not None and len(self.current_followers) + 1 >= self.target_group_size:
  326. return averaging_pb2.MessageFromLeader(code=averaging_pb2.GROUP_IS_FULL)
  327. else:
  328. return None
    async def leader_assemble_group(self) -> GroupInfo:
        """Form up all current followers into a group and gather metadata"""
        assert self.lock_looking_for_group.locked() and self.lock_request_join_group.locked() and not self.client_mode
        assert not self.assembled_group.done()
        group_id = DHTID.generate().to_bytes()  # note: both group_id and the order of peer_ids must be random
        ordered_peer_ids = list(self.current_followers)
        ordered_peer_ids.append(self.peer_id)
        random.shuffle(ordered_peer_ids)

        # collect each peer's gather payload in the shuffled order; our own comes from step_control
        gathered = tuple(
            self.step_control.data_for_gather if peer_id == self.peer_id else self.current_followers[peer_id].gather
            for peer_id in ordered_peer_ids
        )

        logger.debug(f"{self.peer_id} - assembled group of {len(ordered_peer_ids)} peers")
        group_info = GroupInfo(group_id, tuple(ordered_peer_ids), gathered)
        await self.group_key_manager.update_key_on_group_assembled(group_info, is_leader=True)
        # resolving this future wakes both look_for_group and every pending rpc_join_group
        self.assembled_group.set_result(group_info)
        return group_info
    async def follower_assemble_group(self, leader: PeerID, msg: averaging_pb2.MessageFromLeader) -> GroupInfo:
        """Form a group from using peers and metadata provided by our leader"""
        assert self.lock_looking_for_group.locked() and self.lock_request_join_group.locked()
        assert not self.assembled_group.done()
        assert self.current_leader == leader, f"averager does not follow {leader} (actual: {self.current_leader})"

        group_id = msg.group_id
        ordered_peer_ids = [PeerID(item) for item in msg.ordered_peer_ids]
        assert self.peer_id in ordered_peer_ids, "Leader sent us group_peer_ids that does not contain us!"
        assert len(ordered_peer_ids) == len(msg.gathered)

        logger.debug(f"{self.peer_id} - follower assembled group with leader {leader}")
        group_info = GroupInfo(group_id, tuple(ordered_peer_ids), tuple(msg.gathered))
        await self.group_key_manager.update_key_on_group_assembled(group_info)
        # resolving this future wakes look_for_group with the leader-provided group layout
        self.assembled_group.set_result(group_info)
        return group_info
  360. async def leader_disband_group(self):
  361. """Kick out all followers immediately, optionally direct them to our new leader (if we found one)"""
  362. assert self.lock_request_join_group.locked() and not self.client_mode
  363. self.current_followers.clear() # this will cause rpc_join_group to kick all followers out
class PotentialLeaders:
    """A utility class that searches for averagers that could become our leaders"""
  366. def __init__(self, peer_id: PeerID, min_matchmaking_time: DHTExpiration, target_group_size: Optional[int]):
  367. self.peer_id, self.min_matchmaking_time = peer_id, min_matchmaking_time
  368. self.target_group_size = target_group_size
  369. self.running, self.update_triggered, self.update_finished = asyncio.Event(), asyncio.Event(), asyncio.Event()
  370. self.declared_expiration, self.lock_search, self.lock_declare = asyncio.Event(), asyncio.Lock(), asyncio.Lock()
  371. self.leader_queue = TimedStorage[PeerID, DHTExpiration]()
  372. self.past_attempts: Set[Tuple[PeerID, DHTExpiration]] = set()
  373. self.declared_expiration_time = float("inf")
  374. self.declared_group_key: Optional[GroupKey] = None
  375. self.max_assured_time = float("-inf")
  376. self.search_end_time = float("inf")
    @contextlib.asynccontextmanager
    async def begin_search(self, step: StepControl, key_manager: GroupKeyManager, declare: bool = True):
        """Start a leader search: spawn the queue-update (and, optionally, self-declare) background tasks
        for the duration of the `with` body, then cancel them and reset all search state on exit.

        :param step: the current step's control structure (provides the search deadline)
        :param key_manager: used to fetch candidate leaders and declare ourselves on the DHT
        :param declare: if True, periodically advertise this peer as a potential leader
        """
        async with self.lock_search:
            self.running.set()
            self.search_end_time = step.deadline if step.deadline is not None else float("inf")
            update_queue_task = asyncio.create_task(self._update_queue_periodically(key_manager))
            if declare:
                declare_averager_task = asyncio.create_task(self._declare_averager_periodically(step, key_manager))

            try:
                yield self
            finally:
                # stop background tasks first, then wipe every piece of per-search state
                await cancel_and_wait(update_queue_task)
                if declare:
                    await cancel_and_wait(declare_averager_task)

                for field in (
                    self.past_attempts,
                    self.leader_queue,
                    self.running,
                    self.update_finished,
                    self.update_triggered,
                    self.declared_expiration,
                ):
                    field.clear()
                self.max_assured_time = float("-inf")
                self.search_end_time = float("inf")
  402. @contextlib.asynccontextmanager
  403. async def pause_search(self):
  404. was_running = self.running.is_set()
  405. try:
  406. self.running.clear()
  407. yield
  408. finally:
  409. if was_running:
  410. self.running.set()
  411. else:
  412. self.running.clear()
  413. async def pop_next_leader(self) -> PeerID:
  414. """Remove and return the next most suitable leader or throw an exception if reached timeout"""
  415. assert self.running.is_set(), "Not running search at the moment"
  416. while True:
  417. maybe_next_leader, entry = self.leader_queue.top()
  418. if maybe_next_leader is None or self.max_assured_time <= entry.expiration_time <= self.search_end_time:
  419. self.update_triggered.set()
  420. if maybe_next_leader is None or (entry.expiration_time, maybe_next_leader.to_bytes()) > (
  421. self.declared_expiration_time,
  422. self.peer_id.to_bytes(),
  423. ):
  424. await asyncio.wait(
  425. {self.update_finished.wait(), self.declared_expiration.wait()}, return_when=asyncio.FIRST_COMPLETED
  426. )
  427. self.declared_expiration.clear()
  428. if self.update_finished.is_set():
  429. self.update_finished.clear()
  430. continue
  431. else:
  432. raise asyncio.TimeoutError("pop_next_leader was invalidated: re-declared averager in background")
  433. del self.leader_queue[maybe_next_leader]
  434. self.past_attempts.add((maybe_next_leader, entry.expiration_time))
  435. return maybe_next_leader
  436. async def _update_queue_periodically(self, key_manager: GroupKeyManager) -> None:
  437. DISCREPANCY = timed_storage.MAX_DHT_TIME_DISCREPANCY_SECONDS
  438. while get_dht_time() < self.search_end_time:
  439. new_peers = await key_manager.get_averagers(key_manager.current_key, only_active=True)
  440. self.max_assured_time = max(
  441. self.max_assured_time, get_dht_time() + self.min_matchmaking_time - DISCREPANCY
  442. )
  443. self.leader_queue.clear()
  444. for peer, peer_expiration_time in new_peers:
  445. if peer == self.peer_id or (peer, peer_expiration_time) in self.past_attempts:
  446. continue
  447. self.leader_queue.store(peer, peer_expiration_time, peer_expiration_time)
  448. self.max_assured_time = max(self.max_assured_time, peer_expiration_time - DISCREPANCY)
  449. self.update_finished.set()
  450. await asyncio.wait(
  451. {self.running.wait(), self.update_triggered.wait()},
  452. return_when=asyncio.ALL_COMPLETED,
  453. timeout=self.search_end_time - get_dht_time() if isfinite(self.search_end_time) else None,
  454. )
  455. self.update_triggered.clear()
    async def _declare_averager_periodically(self, step: StepControl, key_manager: GroupKeyManager) -> None:
        """Background task: while the search is running, periodically (re)declare this peer as a potential
        leader on the DHT; on exit, retract the last declaration so other peers stop contacting us."""
        async with self.lock_declare:
            try:
                while True:
                    await self.running.wait()

                    # declare ourselves with an expiration no earlier than min_matchmaking_time from now,
                    # but never past the search deadline
                    new_expiration_time = float(
                        min(max(step.scheduled_time, get_dht_time() + self.min_matchmaking_time), self.search_end_time)
                    )
                    self.declared_group_key = group_key = key_manager.current_key
                    self.declared_expiration_time = new_expiration_time
                    self.declared_expiration.set()  # invalidates any pop_next_leader waiting on re-declaration
                    await key_manager.declare_averager(group_key, self.peer_id, expiration_time=new_expiration_time)
                    await asyncio.sleep(self.declared_expiration_time - get_dht_time())
                    # if we expired without finding anyone, switch to a less specific group key
                    if self.running.is_set() and len(self.leader_queue) == 0:
                        await key_manager.update_key_on_not_enough_peers()
            finally:
                if self.declared_group_key is not None:
                    # retract our declaration and reset the local queue state before exiting
                    prev_declared_key, prev_expiration_time = self.declared_group_key, self.declared_expiration_time
                    self.declared_group_key, self.declared_expiration_time = None, float("inf")
                    self.leader_queue, self.max_assured_time = TimedStorage[PeerID, DHTExpiration](), float("-inf")
                    await key_manager.declare_averager(
                        prev_declared_key, self.peer_id, prev_expiration_time, looking_for_group=False
                    )
  479. class MatchmakingException(Exception):
  480. """An internal exception that marks undesired edge cases during averaging"""