load_balancing.py

from typing import Sequence, Optional, Tuple

import numpy as np
import scipy.optimize

from hivemind.utils.logging import get_logger

logger = get_logger(__name__)

LOAD_BALANCING_LP_DECIMALS = 9


def load_balance_peers(vector_size: int, throughputs: Sequence[Optional[float]], min_size: int = 0) -> Tuple[int, ...]:
    """
    Find an optimal partitioning of weights for butterfly all-reduce given peer throughputs.

    :param vector_size: total size of the averaged vector (in elements, not bytes)
    :param throughputs: 1d array of non-negative throughputs for each peer capable of averaging;
      zeros stand for client-only participants, None means "not specified" (resolved as the mean of the other peers)
    :param min_size: peers that can aggregate less than this many elements will be assigned nothing
    :returns: an integer tuple where the i-th element is the number of weights assigned to the i-th peer
    """
    specified_throughputs = [throughput for throughput in throughputs if throughput is not None and throughput > 0]
    if specified_throughputs:
        default_throughput = np.mean(specified_throughputs)
        throughputs = [throughput if throughput is not None else default_throughput for throughput in throughputs]
        scores = optimize_parts_lp(vector_size, np.asarray(throughputs), min_size)
    else:
        assert not all(throughput == 0 for throughput in throughputs), "Must have at least one nonzero throughput"
        scores = np.asarray([1.0 if throughput is None else 0.0 for throughput in throughputs])
    return tuple(hagenbach_bishoff(vector_size, scores))
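

# Usage sketch (an illustrative addition, not part of the original module): three
# peers where the third did not report a throughput. The None entry resolves to
# the mean of the specified values (75.0), and each peer receives a share of the
# vector roughly proportional to its effective throughput. The LP may have
# multiple optima, so the exact shares depend on the solver; only their sum is
# guaranteed to equal vector_size.
def _example_load_balance_peers() -> None:
    parts = load_balance_peers(vector_size=10_000, throughputs=[100.0, 50.0, None])
    assert sum(parts) == 10_000 and all(part >= 0 for part in parts)
    print(parts)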


def optimize_parts_lp(vector_size: int, throughputs: np.ndarray, min_size: int = 0) -> np.ndarray:
    """
    Solve an optimization problem that minimizes the total allreduce time.

    In butterfly all-reduce, each peer acts both as a "client" and as an "aggregator":
    * a "client" splits its local vector into shards and sends each shard to one peer, then downloads the average
    * an "aggregator" receives a certain part of the vector components from all peers, aggregates and returns the average

    Peer i network load as a "client" = vector_size * (1 - fraction_assigned_to_peer_i)
    Peer i network load as an "aggregator" = vector_size * (group_size - 1) * fraction_assigned_to_peer_i
    Peer i total communication = vector_size * [1 + (group_size - 2) * fraction_assigned_to_peer_i]
    Total time = max_i (total_communication_for_peer_i / throughputs[i])

    We minimize this maximum by reducing the minimax objective to a linear program
    (see lecture notes: https://www.usna.edu/Users/math/dphillip/sa305.s15/phillips/lessons/32/32.pdf )

    :returns: a vector of "scores", where the i-th score is proportional to the fraction of weights assigned to the i-th peer
    """
    assert np.all(throughputs >= 0) and np.any(throughputs > 0)
    throughputs = np.asarray(throughputs, dtype=np.float64)
    permutation = np.argsort(-throughputs)  # sort peers by decreasing throughput
    throughputs = throughputs[permutation]
    is_nonzero = throughputs != 0

    group_size = len(throughputs)
    num_variables = group_size + 1  # [w_1, ..., w_N, xi]

    c = np.zeros(num_variables, dtype=np.float64)
    c[-1] = 1.0  # optimize w.r.t. xi, the epigraph variable that upper-bounds every peer's time

    # the constraints below are tuples (A, b) such that Ax <= b
    nonnegative_weights = -np.eye(group_size, num_variables, dtype=c.dtype), np.zeros(group_size, c.dtype)  # w_i >= 0
    weights_sum_to_one = c[None, :] - 1.0, np.array([-1.0])  # sum_i w_i >= 1
    coeff_per_variable = (group_size - 2.0) / np.maximum(throughputs, 10 ** -LOAD_BALANCING_LP_DECIMALS)
    coeff_matrix_minus_xi = np.hstack([np.diag(coeff_per_variable), -np.ones((group_size, 1), c.dtype)])
    # xi >= (1 + (group_size - 2) * w_i) / throughputs[i] for every peer with nonzero throughput
    xi_is_maximum = coeff_matrix_minus_xi[is_nonzero], -1.0 / throughputs[is_nonzero]
    # peers with zero throughput are forced to w_i <= 0 (hence w_i = 0); all others get w_i <= 1
    force_max_weights = np.eye(group_size, M=num_variables, dtype=c.dtype), is_nonzero.astype(c.dtype)

    A, b = list(map(np.concatenate, zip(nonnegative_weights, weights_sum_to_one, xi_is_maximum, force_max_weights)))

    # note: SciPy >= 1.11 removed the "interior-point" method; use method="highs" there
    solution = scipy.optimize.linprog(c, A_ub=A, b_ub=b, method='interior-point')
    if solution.success:
        peer_scores = solution.x[:group_size]
        # if some peers have less than min_size elements, transfer their share to other peers (if any)
        if np.max(peer_scores) >= min_size / float(vector_size):
            peer_scores[peer_scores < min_size / float(vector_size)] = 0.0
        peer_scores = np.round(peer_scores, LOAD_BALANCING_LP_DECIMALS)
    else:
        logger.error(f"Failed to solve load-balancing for bandwidths {throughputs}.")
        peer_scores = np.ones(group_size, c.dtype)

    return peer_scores[np.argsort(permutation)]  # undo the sort, restoring the original peer order
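

# Worked sketch (an illustrative addition): compare the LP allocation against a
# naive equal split under the cost model from the docstring above, i.e.
# time_i = vector_size * (1 + (group_size - 2) * fraction_i) / throughputs[i].
# The equal split is a feasible point of the LP, so the LP allocation can never
# have a larger bottleneck time (up to rounding tolerance).
def _example_optimize_parts_lp() -> None:
    vector_size, throughputs = 10_000, np.array([100.0, 50.0, 75.0])
    group_size = len(throughputs)

    def bottleneck_time(fractions: np.ndarray) -> float:
        total_comm = vector_size * (1 + (group_size - 2) * fractions)
        return float(np.max(total_comm / throughputs))

    scores = optimize_parts_lp(vector_size, throughputs)
    lp_fractions = scores / scores.sum()
    equal_fractions = np.full(group_size, 1.0 / group_size)
    assert bottleneck_time(lp_fractions) <= bottleneck_time(equal_fractions) + 1e-6
    print(bottleneck_time(lp_fractions), bottleneck_time(equal_fractions))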


def hagenbach_bishoff(vector_size: int, scores: Sequence[float]) -> Sequence[int]:
    """
    Split a vector between participants in proportion to their scores, following the Hagenbach-Bischoff method:
    each peer first receives the floor of its proportional share, and any remaining elements go one at a time
    to the peers with the highest score / (allocated + 1) quotients.
    https://en.wikipedia.org/wiki/Hagenbach-Bischoff_system
    The code is based on https://github.com/crflynn/voting

    :param vector_size: the total number of elements to be split
    :param scores: real-valued scores for each peer; each peer's share is proportional to its score
    :returns: integer-valued partitions assigned to every peer
    """
    total_score = sum(scores)
    allocated = [int(vector_size * score_i / total_score) for score_i in scores]
    while sum(allocated) < vector_size:
        # hand one more element to the peer with the largest quotient, in the style of highest-averages methods
        quotients = [score / (allocated[idx] + 1) for idx, score in enumerate(scores)]
        idx_max = quotients.index(max(quotients))
        allocated[idx_max] += 1
    return allocated
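

# Usage sketch (an illustrative addition): split 11 elements between peers with
# scores 5, 3 and 2. The floor allocation gives 5, 3 and 2 (10 elements); the
# remaining element goes to the peer with the highest score / (allocated + 1)
# quotient, which is peer 0 here (5/6 > 3/4 > 2/3).
def _example_hagenbach_bishoff() -> None:
    allocated = hagenbach_bishoff(11, [5.0, 3.0, 2.0])
    assert allocated == [6, 3, 2] and sum(allocated) == 11
    print(allocated)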