@@ -1,13 +1,14 @@
import random

import numpy as np
-import torch
import pytest
+import torch

import hivemind
-from hivemind.client.averaging.allreduce import AveragingMode
-from hivemind.client.averaging.load_balancing import load_balance_peers
-from hivemind.client.averaging.key_manager import GroupKeyManager
+import hivemind.averaging.averager
+from hivemind.averaging.allreduce import AveragingMode
+from hivemind.averaging.key_manager import GroupKeyManager
+from hivemind.averaging.load_balancing import load_balance_peers
from hivemind.proto.runtime_pb2 import CompressionType


@@ -45,7 +46,8 @@ def _test_allreduce_once(n_clients, n_aux):
    dht = hivemind.DHT(start=True, endpoint=f'{hivemind.LOCALHOST}:*')

    n_peers = 4
-    modes = [AveragingMode.CLIENT] * n_clients + [AveragingMode.AUX] * n_aux + [AveragingMode.NODE] * (n_peers - n_clients - n_aux)
+    modes = [AveragingMode.CLIENT] * n_clients + [AveragingMode.AUX] * n_aux + [AveragingMode.NODE] * (
+ n_peers - n_clients - n_aux)
    random.shuffle(modes)

    tensors1 = [torch.randn(123), torch.zeros(3)]
@@ -55,12 +57,14 @@ def _test_allreduce_once(n_clients, n_aux):
    peer_tensors = [tensors1, tensors2, tensors3, tensors4]

    reference = [sum(tensors[i] for tensors, mode in zip(peer_tensors, modes)
- if mode != AveragingMode.AUX) / max(1, n_peers - n_aux) for i in range(len(tensors1))]
+ if mode != AveragingMode.AUX) / max(1, n_peers - n_aux) for i in range(len(tensors1))]

-    averagers = [hivemind.DecentralizedAverager(tensors, dht=dht, target_group_size=4, averaging_expiration=15,
-                                                prefix='mygroup', listen=mode != AveragingMode.CLIENT, listen_on='127.0.0.1:*',
-                                                auxiliary=mode == AveragingMode.AUX, start=True)
-                 for tensors, mode in zip(peer_tensors, modes)]
+    averagers = [
+        hivemind.averaging.DecentralizedAverager(tensors, dht=dht, target_group_size=4, averaging_expiration=15,
+                                                 prefix='mygroup', listen=mode != AveragingMode.CLIENT,
+                                                 listen_on='127.0.0.1:*',
+                                                 auxiliary=mode == AveragingMode.AUX, start=True)
+        for tensors, mode in zip(peer_tensors, modes)]

    futures = []
    for averager in averagers:
@@ -106,10 +110,10 @@ def test_allreduce_weighted(n_client_mode_peers: int = 2):
    tensors2 = [torch.rand(123), torch.ones(3)]
    tensors3 = [-torch.rand(123), torch.arange(3).to(torch.float32)]
    tensors4 = [torch.randn(123) ** 3, torch.arange(3).to(torch.float32) / 2]
-    averagers = [hivemind.DecentralizedAverager(tensors, dht=dht, target_group_size=4, averaging_expiration=15,
-                                                prefix='mygroup', listen=listen, listen_on='127.0.0.1:*',
-                                                start=True)
-                 for tensors, listen in zip([tensors1, tensors2, tensors3, tensors4], should_listen)]
+    averagers = [
+        hivemind.averaging.DecentralizedAverager(tensors, dht=dht, target_group_size=4, averaging_expiration=15,
+                                                 prefix='mygroup', listen=listen, listen_on='127.0.0.1:*', start=True)
+        for tensors, listen in zip([tensors1, tensors2, tensors3, tensors4], should_listen)]
    weights = list(map(float, np.random.rand(len(averagers)) * 10 + 0.01))
    reference = [(tensors1[i] * weights[0] + tensors2[i] * weights[1] + tensors3[i] * weights[2]
+ tensors4[i] * weights[3]) / sum(weights) for i in range(len(tensors1))]
@@ -142,12 +146,13 @@ def test_allreduce_compression():
    FLOAT16, UINT8 = CompressionType.FLOAT16, CompressionType.UNIFORM_8BIT

    for compression_type_pair in [(FLOAT16, FLOAT16), (FLOAT16, UINT8), (UINT8, FLOAT16), (UINT8, UINT8)]:
-        averager1 = hivemind.DecentralizedAverager([x.clone() for x in tensors1], dht=dht,
-                                                    compression_type=compression_type_pair, listen=False,
-                                                    target_group_size=2, prefix='mygroup', start=True)
-        averager2 = hivemind.DecentralizedAverager([x.clone() for x in tensors2], dht=dht,
-                                                    compression_type=compression_type_pair,
-                                                    target_group_size=2, prefix='mygroup', start=True)
+        averager1 = hivemind.averaging.DecentralizedAverager([x.clone() for x in tensors1], dht=dht,
+                                                              compression_type=compression_type_pair,
+                                                              listen=False, target_group_size=2, prefix='mygroup',
+                                                              start=True)
+        averager2 = hivemind.averaging.DecentralizedAverager([x.clone() for x in tensors2], dht=dht,
+                                                              compression_type=compression_type_pair,
+                                                              target_group_size=2, prefix='mygroup', start=True)

        for future in averager1.step(wait=False), averager2.step(wait=False):
            future.result()
@@ -186,7 +191,7 @@ def compute_mean_std(averagers, unbiased=True):
@pytest.mark.forked
def test_allreduce_grid():
    dht = hivemind.DHT(start=True, endpoint='127.0.0.1:*')
-    averagers = [hivemind.DecentralizedAverager(
+    averagers = [hivemind.averaging.DecentralizedAverager(
        averaged_tensors=[torch.randn(3)], dht=dht, target_group_size=2,
        prefix='mygroup', initial_group_bits=bin(i // 2)[2:].rjust(2, '0'), start=True)
        for i in range(8)]
@@ -216,9 +221,9 @@ def test_allreduce_grid():
@pytest.mark.forked
def test_allgather():
    dht = hivemind.DHT(start=True, endpoint=f'{hivemind.LOCALHOST}:*')
-    averagers = [hivemind.DecentralizedAverager([torch.ones(1)], dht=dht, target_group_size=4, averaging_expiration=15,
-                                                prefix='mygroup', initial_group_bits='000', listen_on='127.0.0.1:*',
-                                                start=True)
+    averagers = [hivemind.averaging.DecentralizedAverager([torch.ones(1)], dht=dht, target_group_size=4,
+                                                           averaging_expiration=15, prefix='mygroup',
+                                                           initial_group_bits='000', listen_on='127.0.0.1:*', start=True)
                 for _ in range(8)]

    futures = []
@@ -286,7 +291,7 @@ def test_load_balancing():
@pytest.mark.forked
def test_too_few_peers():
    dht = hivemind.DHT(start=True, endpoint='127.0.0.1:*')
-    averagers = [hivemind.DecentralizedAverager(
+    averagers = [hivemind.averaging.DecentralizedAverager(
        averaged_tensors=[torch.randn(3)], dht=dht, target_group_size=2,
        averaging_expiration=1, request_timeout=0.5,
        prefix='mygroup', initial_group_bits=bin(i)[2:].rjust(3, '0'), start=True)
@@ -303,7 +308,7 @@ def test_too_few_peers():
@pytest.mark.forked
def test_overcrowded(num_peers=16):
    dht = hivemind.DHT(start=True, endpoint='127.0.0.1:*')
-    averagers = [hivemind.DecentralizedAverager(
+    averagers = [hivemind.averaging.DecentralizedAverager(
        averaged_tensors=[torch.randn(3)], dht=dht, target_group_size=2,
        averaging_expiration=1, request_timeout=0.5,
        prefix='mygroup', initial_group_bits='', start=True)
@@ -323,7 +328,7 @@ def test_load_state_from_peers():
    super_metadata = dict(x=123)
    super_tensors = (torch.randn(3), torch.randint(0, 5, (3,)))

-    class TestAverager(hivemind.DecentralizedAverager):
+    class TestAverager(hivemind.averaging.DecentralizedAverager):
        def get_current_state(self):
            """
            Get current state and send it to a peer. Executed in the host process. Meant to be overridden.
@@ -373,8 +378,8 @@ def test_load_state_from_peers():
@pytest.mark.forked
def test_getset_bits():
    dht = hivemind.DHT(start=True, endpoint='127.0.0.1:*')
-    averager = hivemind.DecentralizedAverager([torch.randn(3)], dht=dht, start=True,
-                                              prefix='test_prefix', target_group_size=2)
+    averager = hivemind.averaging.DecentralizedAverager([torch.randn(3)], dht=dht, start=True,
+                                                        prefix='test_prefix', target_group_size=2)
    averager.set_group_bits('00101011101010')
    assert averager.get_group_bits() == '00101011101010'

@@ -389,13 +394,13 @@ def test_training_averager(n_steps: int = 10, n_dims: int = 16):

    x1 = torch.randn(n_dims, requires_grad=True)
    opt1 = torch.optim.Adam([x1], lr=0.05)
-    averager1 = hivemind.client.TrainingAverager(opt1, average_gradients=True, average_parameters=True,
-                                                 average_opt_statistics=["exp_avg_sq"], **common_kwargs)
+    averager1 = hivemind.averaging.TrainingAverager(opt1, average_gradients=True, average_parameters=True,
+                                                    average_opt_statistics=["exp_avg_sq"], **common_kwargs)

    x2 = torch.randn(n_dims, requires_grad=True)
    opt2 = torch.optim.Adam([x2], lr=0.05)
-    averager2 = hivemind.client.TrainingAverager(opt2, average_gradients=True, average_parameters=True,
-                                                 average_opt_statistics=["exp_avg_sq"], **common_kwargs)
+    averager2 = hivemind.averaging.TrainingAverager(opt2, average_gradients=True, average_parameters=True,
+                                                    average_opt_statistics=["exp_avg_sq"], **common_kwargs)
    a = torch.ones(n_dims)

    for i in range(n_steps):