|
@@ -32,7 +32,10 @@ def _compute_spans(module_infos: List[Optional[RemoteModuleInfo]]) -> Tuple[Dict
|
|
|
if module is None:
|
|
|
continue
|
|
|
|
|
|
- for peer_id, server in module.servers.items():
|
|
|
+ # We sort servers here to ensure that we get exactly the same throughputs for a given set of servers.
|
|
|
+ # If the order were not defined, we would get slightly different values due to floating point errors,
|
|
|
+ # which may cause excess block replacements.
|
|
|
+ for peer_id, server in sorted(module.servers.items()):
|
|
|
if server.state == ServerState.OFFLINE:
|
|
|
continue
|
|
|
|
|
@@ -47,17 +50,14 @@ def _compute_spans(module_infos: List[Optional[RemoteModuleInfo]]) -> Tuple[Dict
|
|
|
return spans, throughputs
|
|
|
|
|
|
|
|
|
-def _choose_best_start(throughputs: np.ndarray, num_blocks: int, cur_start: Optional[int]) -> int:
|
|
|
- options = (
|
|
|
- (sorted(throughputs[i : i + num_blocks]), i != cur_start, i)
|
|
|
- for i in range(0, len(throughputs) - num_blocks + 1)
|
|
|
- )
|
|
|
+def _choose_best_start(throughputs: np.ndarray, num_blocks: int) -> int:
|
|
|
+ options = ((sorted(throughputs[i : i + num_blocks]), i) for i in range(0, len(throughputs) - num_blocks + 1))
|
|
|
return min(options)[-1]
|
|
|
|
|
|
|
|
|
def choose_best_blocks(num_blocks: int, module_infos: List[Optional[RemoteModuleInfo]]) -> List[int]:
    """Choose ``num_blocks`` consecutive block indices for a server to serve.

    Per-block throughputs are computed from ``module_infos`` by ``_compute_spans()``, and the starting
    position is picked by ``_choose_best_start()``.

    :param num_blocks: how many consecutive blocks to choose
    :param module_infos: one entry per block; entries may be None
    :returns: the list ``[start, start + 1, ..., start + num_blocks - 1]``
    """
    _, throughputs = _compute_spans(module_infos)
    start = _choose_best_start(throughputs, num_blocks)
    return list(range(start, start + num_blocks))
|
|
|
|
|
|
|
|
@@ -69,16 +69,22 @@ def should_choose_other_blocks(
|
|
|
|
|
|
spans, throughputs = _compute_spans(module_infos)
|
|
|
initial_throughput = throughputs.min()
|
|
|
+ eps = 1e-3
|
|
|
|
|
|
assert local_peer_id in spans, "Span served by this server is not present in the DHT"
|
|
|
local_span = spans[local_peer_id]
|
|
|
- throughputs[local_span.start : local_span.end] -= local_span.throughput
|
|
|
+ throughputs[local_span.start : local_span.end] -= local_span.throughput * (1 + eps)
|
|
|
+ # Without (1 + eps) here, we would sometimes subtract a value slightly less than local_span.throughput
|
|
|
+ # due to the floating point error, which would cause excess block replacements.
|
|
|
+ # Also, subtracting local_span.throughput * (1 + eps) makes _choose_best_start() prefer
|
|
|
+ # the previous server position in case of other things being almost equal.
|
|
|
|
|
|
- new_start = _choose_best_start(throughputs, local_span.length, local_span.start)
|
|
|
+ new_start = _choose_best_start(throughputs, local_span.length)
|
|
|
if local_span.start == new_start:
|
|
|
return False # This server is on its best place already
|
|
|
- local_span.move_to(new_start)
|
|
|
|
|
|
+ throughputs[local_span.start : local_span.end] += local_span.throughput * eps
|
|
|
+ local_span.move_to(new_start)
|
|
|
throughputs[local_span.start : local_span.end] += local_span.throughput
|
|
|
|
|
|
moved = True
|
|
@@ -89,18 +95,18 @@ def should_choose_other_blocks(
|
|
|
moved = False
|
|
|
for peer_id in servers:
|
|
|
span = spans[peer_id]
|
|
|
- throughputs[span.start : span.end] -= span.throughput
|
|
|
+ throughputs[span.start : span.end] -= span.throughput * (1 + eps)
|
|
|
|
|
|
- new_start = _choose_best_start(throughputs, span.length, span.start)
|
|
|
+ new_start = _choose_best_start(throughputs, span.length)
|
|
|
+
|
|
|
+ throughputs[span.start : span.end] += span.throughput * eps
|
|
|
if span.start != new_start:
|
|
|
span.move_to(new_start)
|
|
|
moved = True
|
|
|
-
|
|
|
throughputs[span.start : span.end] += span.throughput
|
|
|
|
|
|
new_throughput = throughputs.min()
|
|
|
actual_quality = initial_throughput / new_throughput
|
|
|
logger.info(f"Swarm balance quality: {actual_quality * 100:.1f}%")
|
|
|
|
|
|
- eps = 1e-6
|
|
|
return actual_quality < balance_quality - eps
|