from typing import Sequence, Optional, Tuple

import numpy as np
import scipy.optimize

from hivemind.utils.logging import get_logger

logger = get_logger(__name__)

LOAD_BALANCING_LP_DECIMALS = 9


def load_balance_peers(vector_size: int, throughputs: Sequence[Optional[float]], min_size: int = 0) -> Tuple[int, ...]:
    """
    Find an optimal partitioning of weights for butterfly all-reduce given peer throughputs.

    :param vector_size: total size of the averaged vector (in elements, not bytes)
    :param throughputs: 1d sequence of non-negative throughputs for each peer capable of averaging;
      zeros stand for client-only participants, None means "not specified" (resolved as the mean of other peers)
    :param min_size: peers that can aggregate less than this many elements will be assigned nothing
    :returns: a tuple of integers where the i-th element is the number of weights assigned to the i-th peer
    """
    specified_throughputs = [throughput for throughput in throughputs if throughput is not None and throughput > 0]

    if specified_throughputs:
        # fill in unspecified (None) throughputs with the mean of the specified ones, then solve the LP
        default_throughput = np.mean(specified_throughputs)
        throughputs = [throughput if throughput is not None else default_throughput for throughput in throughputs]
        scores = optimize_parts_lp(vector_size, np.asarray(throughputs), min_size)
    else:
        # no peer has a known positive throughput: split evenly between peers with unspecified throughput
        assert not all(throughput == 0 for throughput in throughputs), "Must have at least one nonzero throughput"
        scores = np.asarray([1.0 if throughput is None else 0.0 for throughput in throughputs])

    return tuple(hagenbach_bishoff(vector_size, scores))
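

# A minimal usage sketch (illustrative values, not from the original module):
# load_balance_peers(1024, [100.0, 100.0, 0.0, None]) assigns zero elements to the client-only peer
# (throughput 0), treats the unspecified peer (None) as having the mean throughput of the specified
# ones, and splits the 1024 elements so that the slowest responsible peer's transfer time is
# minimized; with equal effective throughputs the split is roughly even among the three non-zero peers.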


def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int = 0) -> np.ndarray:
    """
    Solve an optimization problem that minimizes the total allreduce time.

    In butterfly all-reduce, each peer acts both as a "client" and as an "aggregator":
    * a "client" splits its local vector into shards and sends each shard to one peer, then downloads the average
    * an "aggregator" receives a certain range of vector components from all peers, aggregates them and returns the average

    Peer i network load as a "client" = vector_size * (1 - fraction_assigned_to_peer_i)
    Peer i network load as an "aggregator" = vector_size * (group_size - 1) * fraction_assigned_to_peer_i
    Peer i total communication = vector_size * [1 + (group_size - 2) * fraction_assigned_to_peer_i]
    Total time = max_i (total_communication_for_peer_i / throughputs[i])

    We minimize the total time by reducing this minimax problem to linear programming
    (see lecture notes: https://www.usna.edu/Users/math/dphillip/sa305.s15/phillips/lessons/32/32.pdf )

    :returns: a vector of "scores", where the i-th score is proportional to the fraction of weights assigned to the i-th peer
    """
    assert np.all(throughputs >= 0) and np.any(throughputs > 0)
    throughputs = np.asarray(throughputs, dtype=np.float64)

    # sort peers from fastest to slowest; the original order is restored before returning
    permutation = np.argsort(-throughputs)
    throughputs = throughputs[permutation]
    is_nonzero = throughputs != 0

    group_size = len(throughputs)
    num_variables = group_size + 1  # [w_1, ..., w_N, xi]

    c = np.zeros(num_variables, dtype=np.float64)
    c[-1] = 1.0  # optimize w.r.t. xi

    # the constraints below are tuples (A, b) such that Ax <= b
    nonnegative_weights = -np.eye(group_size, num_variables, dtype=c.dtype), np.zeros(group_size, c.dtype)
    weights_sum_to_one = c[None, :] - 1.0, np.array([-1.0])
    coeff_per_variable = (group_size - 2.0) / np.maximum(throughputs, 10 ** -LOAD_BALANCING_LP_DECIMALS)
    coeff_matrix_minus_xi = np.hstack([np.diag(coeff_per_variable), -np.ones((group_size, 1), c.dtype)])
    xi_is_maximum = coeff_matrix_minus_xi[is_nonzero], -1.0 / throughputs[is_nonzero]
    # zero-throughput peers are forced to w_i <= 0 (hence w_i = 0), the rest are capped at w_i <= 1
    force_max_weights = np.eye(group_size, M=num_variables, dtype=c.dtype), is_nonzero.astype(c.dtype)

    A, b = list(map(np.concatenate, zip(nonnegative_weights, weights_sum_to_one, xi_is_maximum, force_max_weights)))
    # note: SciPy deprecated the "interior-point" method in 1.9 and removed it in 1.11;
    # on newer SciPy versions, method="highs" is the drop-in replacement
    solution = scipy.optimize.linprog(c, A_ub=A, b_ub=b, method='interior-point')
    if solution.success:
        peer_scores = solution.x[:group_size]
        # if some peers have less than min_size elements, transfer their share to other peers (if any)
        if np.max(peer_scores) >= min_size / float(vector_size):
            peer_scores[peer_scores < min_size / float(vector_size)] = 0.0
        peer_scores = np.round(peer_scores, LOAD_BALANCING_LP_DECIMALS)
    else:
        logger.error(f"Failed to solve load-balancing for throughputs {throughputs}.")
        peer_scores = np.ones(group_size, c.dtype)

    return peer_scores[np.argsort(permutation)]
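

# Illustrative sanity check (values are arbitrary, not part of the original module): with
# throughputs np.array([1.0, 1.0, 0.0]), the zero-throughput peer is forced to a zero score and,
# by symmetry, the two remaining peers should each get a score close to 0.5, so optimize_parts_lp
# returns approximately [0.5, 0.5, 0.0].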


def hagenbach_bishoff(vector_size: int, scores: Sequence[float]) -> Sequence[int]:
    """
    Split an integer number of elements between participants in proportion to their real-valued scores.

    https://en.wikipedia.org/wiki/Hagenbach-Bischoff_system
    The code is based on https://github.com/crflynn/voting

    :param vector_size: the total number of elements to be split
    :param scores: real-valued fractions (scores) for each peer
    :returns: integer-valued partitions assigned to every peer
    """
    total_score = sum(scores)
    # each peer first receives the floor of its exact proportional share
    allocated = [int(vector_size * score_i / total_score) for score_i in scores]
    # the remaining elements are then handed out one by one to the peer with the largest quotient
    while sum(allocated) < vector_size:
        quotients = [score / (allocated[idx] + 1) for idx, score in enumerate(scores)]
        idx_max = quotients.index(max(quotients))
        allocated[idx_max] += 1
    return allocated
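

# A minimal demo sketch added for illustration; the inputs below are arbitrary and not part of the
# original module. It exercises both the quotient-based rounding and the end-to-end partitioning.
if __name__ == "__main__":
    # floor shares for scores [4, 3, 2] over 10 elements are [4, 3, 2] (9 elements); the remaining
    # element goes to the peer with the largest quotient (4 / 5), so the result is [5, 3, 2]
    print(hagenbach_bishoff(10, [4.0, 3.0, 2.0]))

    # end-to-end: the client-only peer (throughput 0) receives nothing, the None throughput is
    # treated as the mean of the specified ones, and the assigned parts always sum to vector_size
    parts = load_balance_peers(vector_size=1024, throughputs=[100.0, 200.0, 0.0, None], min_size=8)
    print(parts, sum(parts))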