@@ -6,6 +6,8 @@ from hivemind.utils.logging import get_logger
 
 logger = get_logger(__name__)
 
+LOAD_BALANCING_LP_DECIMALS = 9
+
 
 def load_balance_peers(vector_size, throughputs: Sequence[Optional[float]], min_size: int = 0) -> Tuple[int, ...]:
     """
@@ -29,7 +31,7 @@ def load_balance_peers(vector_size, throughputs: Sequence[Optional[float]], min_
     return tuple(hagenbach_bishoff(vector_size, scores))
 
 
-def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int = 0, eps: float = 1e-15) -> np.ndarray:
+def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int = 0) -> np.ndarray:
     """
     This method solves an optimization problem to minimize the total allreduce time.
     In butterfly all-reduce, each peer acts both as a "client" and as an "aggregator":
@@ -47,6 +49,7 @@ def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int =
     :returns: a vector of "scores", i-th score is proportional to the fraction of weights assigned to i-th peer
     """
     assert np.all(throughputs >= 0) and np.any(throughputs > 0)
+    throughputs = np.asarray(throughputs, dtype=np.float64)
     permutation = np.argsort(-throughputs)
     throughputs = throughputs[permutation]
     is_nonzero = throughputs != 0
@@ -54,28 +57,29 @@ def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int =
     group_size = len(throughputs)
     num_variables = group_size + 1  # [w_1, ..., w_N, xi]
 
-    c = np.zeros(num_variables)
+    c = np.zeros(num_variables, dtype=np.float64)
     c[-1] = 1.0  # optimize w.r.t. xi
 
     # the constraints below are tuples (A, b) such that Ax <= b
-    nonnegative_weights = -np.eye(group_size, M=num_variables), np.zeros(group_size)
+    nonnegative_weights = -np.eye(group_size, num_variables, dtype=c.dtype), np.zeros(group_size, c.dtype)
     weights_sum_to_one = c[None, :] - 1.0, np.array([-1.0])
-    coeff_per_variable = (group_size - 2.0) / np.maximum(throughputs, eps)
-    coeff_matrix_minus_xi = np.hstack([np.diag(coeff_per_variable), -np.ones((group_size, 1))])
+    coeff_per_variable = (group_size - 2.0) / np.maximum(throughputs, 10 ** -LOAD_BALANCING_LP_DECIMALS)
+    coeff_matrix_minus_xi = np.hstack([np.diag(coeff_per_variable), -np.ones((group_size, 1), c.dtype)])
     xi_is_maximum = coeff_matrix_minus_xi[is_nonzero], -1.0 / throughputs[is_nonzero]
-    force_max_weights = np.eye(group_size, M=num_variables), is_nonzero.astype(c.dtype)
+    force_max_weights = np.eye(group_size, M=num_variables, dtype=c.dtype), is_nonzero.astype(c.dtype)
 
     A, b = list(map(np.concatenate, zip(nonnegative_weights, weights_sum_to_one, xi_is_maximum, force_max_weights)))
 
-    solution = scipy.optimize.linprog(c, A_ub=A, b_ub=b)
+    solution = scipy.optimize.linprog(c, A_ub=A, b_ub=b, method='interior-point')
     if solution.success:
         peer_scores = solution.x[:group_size]
         # if some peers have less than min_size elements, transfer their share to other peers (if any)
         if np.max(peer_scores) >= min_size / float(vector_size):
             peer_scores[peer_scores < min_size / float(vector_size)] = 0.0
+            peer_scores = np.round(peer_scores, LOAD_BALANCING_LP_DECIMALS)
     else:
         logger.error(f"Failed to solve load-balancing for bandwidths {throughputs}.")
-        peer_scores = np.ones(group_size)
+        peer_scores = np.ones(group_size, c.dtype)
 
     return peer_scores[np.argsort(permutation)]
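
For intuition, here is a minimal standalone sketch of the LP that the patched optimize_parts_lp assembles, with made-up throughputs (illustrative only, not the library code). The variables are [w_1, ..., w_N, xi]; each nonzero-throughput peer contributes a constraint ((group_size - 2) * w_i + 1) / throughput_i <= xi, the weights are nonnegative and sum to at least one, and the objective minimizes xi. The force_max_weights family (which caps w_i at 0 for zero-throughput peers) is omitted, since every peer below has nonzero throughput.

import numpy as np
import scipy.optimize

throughputs = np.array([100.0, 100.0, 50.0])  # made-up bandwidths
group_size = len(throughputs)
num_variables = group_size + 1                # variables: [w_1, ..., w_N, xi]

c = np.zeros(num_variables)
c[-1] = 1.0  # objective: minimize xi

# w_i >= 0, written as -w_i <= 0
A_nonneg, b_nonneg = -np.eye(group_size, num_variables), np.zeros(group_size)
# sum(w) >= 1, written as -sum(w) <= -1 (coefficient 0 for xi)
A_sum, b_sum = np.array([[-1.0] * group_size + [0.0]]), np.array([-1.0])
# ((group_size - 2) * w_i + 1) / t_i <= xi, written as
# (group_size - 2) / t_i * w_i - xi <= -1 / t_i
A_time = np.hstack([np.diag((group_size - 2.0) / throughputs), -np.ones((group_size, 1))])
b_time = -1.0 / throughputs

A = np.concatenate([A_nonneg, A_sum, A_time])
b = np.concatenate([b_nonneg, b_sum, b_time])

# the patch pins method='interior-point' (removed in SciPy 1.11),
# so this sketch keeps the default solver instead
solution = scipy.optimize.linprog(c, A_ub=A, b_ub=b)
print(np.round(solution.x[:group_size], 9))   # peer scores, rounded like the patch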
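
And a hypothetical end-to-end call of the patched code (the module path below is an assumption; the diff does not show the file name):

from hivemind.client.averaging.load_balancing import load_balance_peers  # assumed path

# three peers report bandwidth; None presumably marks a client-only peer
# (the signature allows Optional[float]) that should receive no part
part_sizes = load_balance_peers(
    vector_size=1_000_000,
    throughputs=[100.0, 100.0, 50.0, None],
    min_size=1024,
)
print(part_sizes)  # a tuple of ints expected to sum to vector_size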