# block_selection.py (3.4 KB)
  1. from dataclasses import dataclass
  2. from typing import Dict, List, Optional, Tuple
  3. import numpy as np
  4. from hivemind import PeerID, get_logger
  5. from src.data_structures import RemoteModuleInfo, ServerState
  6. __all__ = ["choose_best_blocks", "should_choose_other_blocks"]
  7. logger = get_logger(__file__)
  8. @dataclass
  9. class Span:
  10. start: int
  11. end: int
  12. throughput: float
  13. @property
  14. def length(self):
  15. return self.end - self.start
  16. def move_to(self, new_start: int) -> None:
  17. self.start, self.end = new_start, new_start + self.length
  18. def _compute_spans(module_infos: List[Optional[RemoteModuleInfo]]) -> Tuple[Dict[PeerID, Span], np.ndarray]:
  19. spans = {}
  20. throughputs = np.zeros(len(module_infos))
  21. for block, module in enumerate(module_infos):
  22. if module is None:
  23. continue
  24. for peer_id, server in module.servers.items():
  25. if server.state == ServerState.OFFLINE:
  26. continue
  27. if peer_id in spans:
  28. spans[peer_id].start = min(spans[peer_id].start, block)
  29. spans[peer_id].end = max(spans[peer_id].start, block + 1)
  30. else:
  31. spans[peer_id] = Span(start=block, end=block + 1, throughput=server.throughput)
  32. throughputs[block] += server.throughput
  33. return spans, throughputs
  34. def _choose_best_start(throughputs: np.ndarray, num_blocks: int, cur_start: Optional[int]) -> int:
  35. options = (
  36. (sorted(throughputs[i : i + num_blocks]), i != cur_start, i)
  37. for i in range(0, len(throughputs) - num_blocks + 1)
  38. )
  39. return min(options)[-1]
  40. def choose_best_blocks(num_blocks: int, module_infos: List[Optional[RemoteModuleInfo]]) -> List[int]:
  41. _, throughputs = _compute_spans(module_infos)
  42. start = _choose_best_start(throughputs, num_blocks, None)
  43. return list(range(start, start + num_blocks))
  44. def should_choose_other_blocks(
  45. local_peer_id: PeerID, module_infos: List[Optional[RemoteModuleInfo]], balance_quality: float
  46. ) -> bool:
  47. if balance_quality > 1.0:
  48. return True # Forces rebalancing on each check (may be used for debugging purposes)
  49. spans, throughputs = _compute_spans(module_infos)
  50. initial_throughput = throughputs.min()
  51. assert local_peer_id in spans, "Span served by this server is not present in the DHT"
  52. local_span = spans[local_peer_id]
  53. throughputs[local_span.start : local_span.end] -= local_span.throughput
  54. new_start = _choose_best_start(throughputs, local_span.length, local_span.start)
  55. if local_span.start == new_start:
  56. return False # This server is on its best place already
  57. local_span.move_to(new_start)
  58. throughputs[local_span.start : local_span.end] += local_span.throughput
  59. moved = True
  60. while moved:
  61. servers = list(spans.keys())
  62. np.random.shuffle(servers)
  63. moved = False
  64. for peer_id in servers:
  65. span = spans[peer_id]
  66. throughputs[span.start : span.end] -= span.throughput
  67. new_start = _choose_best_start(throughputs, span.length, span.start)
  68. if span.start != new_start:
  69. span.move_to(new_start)
  70. moved = True
  71. throughputs[span.start : span.end] += span.throughput
  72. new_throughput = throughputs.min()
  73. actual_quality = initial_throughput / new_throughput
  74. logger.info(f"Swarm balance quality: {actual_quality * 100:.1f}%")
  75. eps = 1e-6
  76. return actual_quality < balance_quality - eps