load_balancing.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. from typing import Optional, Sequence, Tuple
  2. import numpy as np
  3. import scipy.optimize
  4. from hivemind.utils.logging import get_logger
  5. logger = get_logger(__name__)
  6. LOAD_BALANCING_LP_DECIMALS = 9
  7. def load_balance_peers(vector_size, bandwidths: Sequence[Optional[float]], min_size: int = 0) -> Tuple[int, ...]:
  8. """
  9. Find an optimal partitioning of weights for butterfly all-reduce given peer bandwidths.
  10. :param vector_size: total size of the averaged vector (in elements, not bytes)
  11. :param bandwidths: 1d array of non-negative bandwidths for each peer capable of averaging
  12. zeros stand for client-only participants, None represents "not specified" (resolved as mean of other pears)
  13. :param min_size: peers that can aggregate less than this many elements will be assigned nothing
  14. :returns: an integer array where i-th element is the number of weights assigned to i-th peer
  15. """
  16. specified_bandwidth = [item for item in bandwidths if item is not None and item > 0]
  17. if specified_bandwidth:
  18. default_bandwidth = np.mean(specified_bandwidth)
  19. bandwidths = [item if item is not None else default_bandwidth for item in bandwidths]
  20. scores = optimize_parts_lp(vector_size, np.asarray(bandwidths), min_size)
  21. else:
  22. assert not all(item == 0 for item in bandwidths), "Must have at least one nonzero bandwidth"
  23. scores = np.asarray([1.0 if item is None else 0.0 for item in bandwidths])
  24. # TODO(jheuristic) we no longer need hagenbach-bishoff with new AllReduceRunner
  25. return tuple(hagenbach_bishoff(vector_size, scores))
  26. def optimize_parts_lp(vector_size: int, bandwidths: np.ndarray, min_size: int = 0) -> np.ndarray:
  27. """
  28. This method solves an optimization problem to minimize the total allreduce time.
  29. In butterfly all-reduce, each peer acts both as a "client" and as an "aggregator":
  30. * a "client" splits his local vector into shards and sends each shard to one peer, then downloads the average
  31. * an "aggregator" receives a certain part of vector components from all peers, aggregates and returns the average
  32. Peer i network load as a "client" = vector_size * (1 - fraction_assigned_to_peer_i)
  33. Peer i network load as an "aggregator" = vector_size * (group_size - 1) * fraction_assigned_to_peer_i
  34. Peer i total communication = vector_size * [1 + (group_size - 2) * fraction_assigned_to_peer_i]
  35. Total time = max_i (total_communication_for_peer_i / bandwidths[i])
  36. We solve this optimization problem by reducing it to linear programming with a minimax reduction
  37. (see lecture notes: https://www.usna.edu/Users/math/dphillip/sa305.s15/phillips/lessons/32/32.pdf )
  38. :returns: a vector of "scores", i-th score is proportional to the fraction of weights assigned to i-th peer
  39. """
  40. assert np.all(bandwidths >= 0) and np.any(bandwidths > 0)
  41. bandwidths = np.asarray(bandwidths, dtype=np.float64)
  42. permutation = np.argsort(-bandwidths)
  43. bandwidths = bandwidths[permutation]
  44. is_nonzero = bandwidths != 0
  45. group_size = len(bandwidths)
  46. num_variables = group_size + 1 # [w_1, ..., w_N, xi]
  47. c = np.zeros(num_variables, dtype=np.float64)
  48. c[-1] = 1.0 # optimize w.r.t. xi
  49. # the constraints below are tuples (A, b) such that Ax <= b
  50. nonnegative_weights = -np.eye(group_size, num_variables, dtype=c.dtype), np.zeros(group_size, c.dtype)
  51. weights_sum_to_one = c[None, :] - 1.0, np.array([-1.0])
  52. coeff_per_variable = (group_size - 2.0) / np.maximum(bandwidths, 10**-LOAD_BALANCING_LP_DECIMALS)
  53. coeff_matrix_minus_xi = np.hstack([np.diag(coeff_per_variable), -np.ones((group_size, 1), c.dtype)])
  54. xi_is_maximum = coeff_matrix_minus_xi[is_nonzero], -1.0 / bandwidths[is_nonzero]
  55. force_max_weights = np.eye(group_size, M=num_variables, dtype=c.dtype), is_nonzero.astype(c.dtype)
  56. A, b = list(map(np.concatenate, zip(nonnegative_weights, weights_sum_to_one, xi_is_maximum, force_max_weights)))
  57. solution = scipy.optimize.linprog(c, A_ub=A, b_ub=b, method="interior-point")
  58. if solution.success:
  59. peer_scores = solution.x[:group_size]
  60. # if some peers have less than min_size elements, transfer their share to other peers (if any)
  61. if np.max(peer_scores) >= min_size / float(vector_size):
  62. peer_scores[peer_scores < min_size / float(vector_size)] = 0.0
  63. peer_scores = np.round(peer_scores, LOAD_BALANCING_LP_DECIMALS)
  64. else:
  65. logger.error(f"Failed to solve load-balancing for bandwidths {bandwidths}")
  66. peer_scores = np.ones(group_size, c.dtype)
  67. return peer_scores[np.argsort(permutation)]
  68. def hagenbach_bishoff(vector_size: int, scores: Sequence[float]) -> Sequence[int]:
  69. """
  70. Split a vector between participants based on continuous fractions.
  71. https://en.wikipedia.org/wiki/Hagenbach-Bischoff_system
  72. The code is based on https://github.com/crflynn/voting
  73. :param vector_size: the total number of elements to be split
  74. :param scores: real-valued vector fractions for each peer
  75. :returns: integer-valued partitions assigned to every peer
  76. """
  77. total_score = sum(scores)
  78. allocated = [int(vector_size * score_i / total_score) for score_i in scores]
  79. while sum(allocated) < vector_size:
  80. quotients = [score / (allocated[idx] + 1) for idx, score in enumerate(scores)]
  81. idx_max = quotients.index(max(quotients))
  82. allocated[idx_max] += 1
  83. return allocated