Artem Chumachenko 3 years ago
parent
commit
3b43350b7f

+ 77 - 0
examples/example.py

@@ -0,0 +1,77 @@
+import time
+
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from torchvision import datasets, transforms
+from tqdm.auto import tqdm
+
+import hivemind
+
+
+class SmallCNN(nn.Module):
+    def __init__(self):
+        super().__init__()
+
+        self.features = nn.Sequential(
+            nn.Conv2d(1, 16, (9, 9)),
+            nn.ReLU(),
+            nn.Conv2d(16, 16, (9, 9)),
+            nn.ReLU(),
+            nn.MaxPool2d(2)
+        )
+
+        self.cls = nn.Sequential(
+            nn.Linear(16 * 6 * 6, 400),
+            nn.ReLU(),
+            nn.Linear(400, 10)
+        )
+
+    def forward(self, x):
+        feature = self.features(x)
+        return self.cls(feature.view(x.size(0), -1))
+
+
+if __name__ == "__main__":
+    # Create dataset and model, same as in the basic tutorial
+    # For this example, we download only the training set
+    transform = transforms.Compose([transforms.ToTensor()])
+
+    trainset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
+
+    model = SmallCNN()
+    opt = torch.optim.Adam(model.parameters(), lr=0.001)
+
+    # Create DHT: a decentralized key-value storage shared between peers
+    dht = hivemind.DHT(start=True, initial_peers=["/ip4/127.0.0.1/tcp/36805/p2p/Qmc7nJt6Pc3Eii4X1ZqtkxbiRWvf97nNfuD4CJpAep5THU"])
+    print("To join the training, use initial_peers =", [str(addr) for addr in dht.get_visible_maddrs()])
+
+    # Set up a decentralized optimizer that will average with peers in background
+    opt = hivemind.Optimizer(
+        dht=dht,                  # use a DHT that is connected with other peers
+        run_id='my_cifar_run',    # unique identifier of this collaborative run
+        batch_size_per_step=16,   # each call to opt.step adds this many samples towards the next epoch
+        target_batch_size=1000,   # after peers collectively process this many samples, average weights and begin the next epoch
+        optimizer=opt,            # wrap the Adam optimizer defined above
+        use_local_updates=False,  # perform optimizer steps with averaged gradients
+        matchmaking_time=3.0,     # when averaging parameters, gather peers in background for up to this many seconds
+        averaging_timeout=10.0,   # give up on averaging if not successful in this many seconds
+        verbose=True,             # print logs incessantly
+        grad_rank_averager="power_sgd",
+        grad_averager_opts={"averager_rank": 1}
+    )
+    opt.load_state_from_peers()
+
+    # Note: if you intend to use GPU, switch to it only after the decentralized optimizer is created
+    with tqdm() as progressbar:
+        while True:
+            for x_batch, y_batch in torch.utils.data.DataLoader(trainset, shuffle=True, batch_size=16):
+                time.sleep(0.1)
+                opt.zero_grad()
+                loss = F.cross_entropy(model(x_batch), y_batch)
+                loss.backward()
+                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
+                opt.step()
+
+                progressbar.desc = f"loss = {loss.item():.3f}"
+                progressbar.update()
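
Note on joining the run (editorial sketch, not part of the commit): examples/example.py hard-codes a localhost multiaddr in initial_peers; in practice, a joining peer would paste the addresses printed by an already-running peer. A minimal sketch of a second peer joining the same run, assuming the printed multiaddr is reachable and that the examples/ directory is on PYTHONPATH so SmallCNN can be imported:

import torch
import hivemind

from example import SmallCNN  # hypothetical import; assumes examples/ is on PYTHONPATH

model = SmallCNN()
base_opt = torch.optim.Adam(model.parameters(), lr=0.001)

# Paste the multiaddrs printed by the first peer; the value below is a placeholder.
dht = hivemind.DHT(start=True, initial_peers=["/ip4/127.0.0.1/tcp/36805/p2p/Qmc7nJt6Pc3Eii4X1ZqtkxbiRWvf97nNfuD4CJpAep5THU"])

opt = hivemind.Optimizer(
    dht=dht,
    run_id='my_cifar_run',            # must match the run_id used by the existing peers
    batch_size_per_step=16,
    target_batch_size=1000,
    optimizer=base_opt,
    use_local_updates=False,
    matchmaking_time=3.0,
    averaging_timeout=10.0,
    grad_rank_averager="power_sgd",   # same gradient averager type as the other peers
    grad_averager_opts={"averager_rank": 1},
)
opt.load_state_from_peers()           # fetch the latest parameters before training

The run_id, target_batch_size, and gradient averager settings should match the existing run so that peers can form averaging groups together.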

+ 0 - 108
examples/playgroud_example.py

@@ -1,108 +0,0 @@
-import hivemind
-from hivemind.optim.experimental.grad_averager import GradientAverager
-from hivemind.optim.experimental.power_ef_averager import PowerEFGradientAverager
-from hivemind.optim.experimental.power_sgd_averager import PowerSGDGradientAverager
-
-import faulthandler
-import torch
-import torch.nn as nn
-import torch.nn.functional as F
-import torchvision
-from torchvision.datasets import MNIST
-
-import multiprocessing as mp
-import threading
-import os
-import random
-import time
-
-
-print_step = 10
-
-
-class Peer(threading.Thread):
-    def __init__(self, idx, *, start: bool):
-        super().__init__(daemon=True)
-        self.dht = hivemind.DHT(initial_peers=dht_root.get_visible_maddrs(), start=True)
-        self.model = SmallCNN()
-        for param in self.model.parameters():
-            param.grad = torch.zeros_like(param).share_memory_()
-
-        if start:
-            self.start()
-
-        self.idx = idx
-        
-    def run(self):
-        torch.manual_seed(self.idx)
-        print('started', self.dht.peer_id)
-        transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])
-        train_data = MNIST(f".", download=True, transform=transform)
-
-        def data():
-            while True:
-                train_dataloader = torch.utils.data.DataLoader(train_data, num_workers=0, batch_size=64, shuffle=True)
-                for batch in train_dataloader:
-                    yield batch
-        
-        opt = hivemind.Optimizer(
-            dht=self.dht,
-            prefix="my_super_run",
-            params=self.model.parameters(),
-            optimizer=torch.optim.SGD,
-            lr=0.1,
-            train_batch_size=256,
-            batch_size=64
-        )
-        opt.load_state_from_peers()
-
-        for i, (xb, yb) in enumerate(data()):
-            logits = self.model(xb)
-            loss = F.cross_entropy(logits, yb)
-
-            loss.backward()
-            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
-            
-            self.averager.accumulate_grads_(batch_size=64)
-
-            opt.step()
-            opt.zero_grad()
-            if i > 100000: break
-
-
-class SmallCNN(nn.Module):
-    def __init__(self):
-        super().__init__()
-
-        self.features = nn.Sequential(
-            nn.Conv2d(1, 16, (9, 9)),
-            nn.ReLU(),
-            nn.Conv2d(16, 16, (9, 9)),
-            nn.ReLU(),
-            nn.MaxPool2d(2)
-        )
-
-        self.cls = nn.Sequential(
-            nn.Linear(16 * 6 * 6, 400),
-            nn.ReLU(),
-            nn.Linear(400, 10)
-        )
-
-    def forward(self, x):
-        feature = self.features(x)
-        return self.cls(feature.view(x.size(0), -1))
-
-
-if __name__ == "__main__":
-    dht_root = hivemind.DHT(start=True)
-
-    peers = [
-        Peer(i, start=False) for i in range(4)
-    ]
-    for i in range(1, 4):
-        peers[i].model.load_state_dict(peers[0].model.state_dict())
-
-    for peer in peers:
-        peer.start()
-    for p in peers:
-        p.join()

+ 3 - 3
hivemind/averaging/averager.py

@@ -455,10 +455,10 @@ class DecentralizedAverager(mp.Process, ServicerBase):
 
                     group_info = await matchmaking_task
 
-                    with self._register_allreduce_group(group_info):
-                        if group_info is None:
-                            raise AllreduceException("Averaging step failed: could not find a group")
+                    if group_info is None:
+                        raise AllreduceException("Averaging step failed: could not find a group")
 
+                    with self._register_allreduce_group(group_info):
                         step.stage = AveragingStage.RUNNING_ALLREDUCE
 
                         step.set_result(

+ 13 - 3
hivemind/optim/experimental/optimizer.py

@@ -12,6 +12,8 @@ from hivemind.averaging.control import AveragingStage, StepControl
 from hivemind.compression import CompressionBase, NoCompression
 from hivemind.dht import DHT
 from hivemind.optim.experimental.grad_averager import GradientAverager
+from hivemind.optim.experimental.power_ef_averager import PowerEFGradientAverager
+from hivemind.optim.experimental.power_sgd_averager import PowerSGDGradientAverager
 from hivemind.optim.experimental.progress_tracker import ProgressTracker
 from hivemind.optim.experimental.state_averager import (
     LRSchedulerBase,
@@ -187,11 +189,13 @@ class Optimizer(torch.optim.Optimizer):
         client_mode: bool = None,
         auxiliary: bool = False,
         grad_compression: CompressionBase = NoCompression(),
+        grad_rank_averager: Optional[str] = None,
         state_averaging_compression: CompressionBase = NoCompression(),
         load_state_compression: CompressionBase = NoCompression(),
         average_opt_statistics: Sequence[str] = (),
         extra_tensors: Sequence[torch.Tensor] = (),
         averager_opts: Optional[dict] = None,
+        grad_averager_opts: Optional[dict] = None,
         tracker_opts: Optional[dict] = None,
         performance_ema_alpha: float = 0.1,
         shutdown_timeout: float = 5,
@@ -255,7 +259,7 @@ class Optimizer(torch.optim.Optimizer):
         )
         if not use_local_updates:
             self.grad_averager = self._make_gradient_averager(
-                reuse_grad_buffers=reuse_grad_buffers, compression=grad_compression, **averager_opts or {}
+                reuse_grad_buffers=reuse_grad_buffers, grad_rank_averager=grad_rank_averager, compression=grad_compression, **grad_averager_opts or {}
             )
         else:
             self.grad_averager = None
@@ -289,9 +293,15 @@ class Optimizer(torch.optim.Optimizer):
             **kwargs,
         )
 
-    def _make_gradient_averager(self, **kwargs) -> GradientAverager:
+    def _make_gradient_averager(self, grad_rank_averager, **kwargs) -> GradientAverager:
         assert hasattr(self, "state_averager"), "must initialize state averager first"
-        grad_averager = GradientAverager(
+        if grad_rank_averager == "power_ef":
+            grad_averager_type = PowerEFGradientAverager
+        elif grad_rank_averager == "power_sgd":
+            grad_averager_type = PowerSGDGradientAverager
+        else:
+            grad_averager_type = GradientAverager
+        grad_averager = grad_averager_type(
             dht=self.dht,
             prefix=f"{self.run_id}_grad_averager",
             parameters=self.state_averager.main_parameters,
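
The new grad_rank_averager argument selects which gradient averager backs the optimizer ("power_sgd" -> PowerSGDGradientAverager, "power_ef" -> PowerEFGradientAverager, anything else -> the dense GradientAverager), and grad_averager_opts is forwarded to that averager's constructor. A minimal sketch of selecting the PowerEF variant, using a stand-in model and illustrative hyperparameters:

import torch
import hivemind

dht = hivemind.DHT(start=True)
model = torch.nn.Linear(784, 10)                       # stand-in model for illustration
base_opt = torch.optim.SGD(model.parameters(), lr=0.1)

opt = hivemind.Optimizer(
    dht=dht,
    run_id="demo_power_ef",
    optimizer=base_opt,
    batch_size_per_step=16,
    target_batch_size=1000,
    use_local_updates=False,                  # rank averagers only apply when gradients are averaged
    grad_rank_averager="power_ef",            # or "power_sgd"; omit for the dense GradientAverager
    grad_averager_opts={"averager_rank": 1},  # forwarded to PowerEFGradientAverager.__init__
)

Splitting grad_averager_opts from averager_opts keeps low-rank settings such as averager_rank out of the state averager's constructor.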

+ 2 - 2
hivemind/optim/experimental/power_ef_averager.py

@@ -48,7 +48,7 @@ class PowerEFGradientAverager(GradientAverager):
     def __init__(
         self,
         parameters: Iterable[torch.nn.Parameter],
-        rank: int,
+        averager_rank: int,
         *,
         dht: hivemind.DHT,
         prefix: str,
@@ -58,7 +58,7 @@ class PowerEFGradientAverager(GradientAverager):
         warn: bool = True,
         **kwargs,
     ):
-        self.rank = rank
+        self.rank = averager_rank
         self.parameters = tuple(parameters)
         self._uncompressed_gradients = set(i for i, grad in enumerate(self._grads_from_parameters()) if len(tuple(grad.size())) == 1)
         self._gs = list(

+ 6 - 6
hivemind/optim/experimental/power_sgd_averager.py

@@ -49,20 +49,20 @@ class PowerSGDGradientAverager(PowerEFGradientAverager):
     def __init__(
         self,
         parameters: Iterable[torch.nn.Parameter],
-        rank: int,
+        averager_rank: int,
         *,
         dht: hivemind.DHT,
         prefix: str,
-        local_updates: bool = False,
+        averager_local_ef: bool = False,
         reuse_grad_buffers: bool = False,
         accumulate_grads_on: Optional[torch.device] = None,
         client_mode: bool = None,
         warn: bool = True,
         **kwargs,
     ):
-        self.rank = rank
+        self.rank = averager_rank
         self.parameters = tuple(parameters)
-        self._local_updates = local_updates
+        self._local_ef = averager_local_ef
         self._uncompressed_gradients = set(i for i, grad in enumerate(self._grads_from_parameters()) if len(tuple(grad.size())) == 1)
         self._ms = list(
             torch.zeros_like(grad, device=accumulate_grads_on)
@@ -83,7 +83,7 @@ class PowerSGDGradientAverager(PowerEFGradientAverager):
 
         super().__init__(
             self.parameters,
-            rank=rank,
+            averager_rank=averager_rank,
             dht=dht,
             prefix=prefix,
             reuse_grad_buffers=reuse_grad_buffers,
@@ -183,7 +183,7 @@ class PowerSGDGradientAverager(PowerEFGradientAverager):
                 for p, local_p, q, local_q, m, g in zip(ps, local_ps, self._qs, local_qs, self._ms, self._gs):
                     new_g = torch.matmul(p, q.t()).reshape(g.size())
                     g.copy_(new_g)
-                    sub_g = torch.matmul(local_p, local_q.t()).reshape(g.size()) if self._local_updates else new_g
+                    sub_g = torch.matmul(local_p, local_q.t()).reshape(g.size()) if self._local_ef else new_g
                     torch.sub(m, sub_g, out=m)
 
                 return allreduce1.gathered
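
For context, the p/q/m tensors above implement PowerSGD-style compression: each gradient matrix is replaced by a rank-r product of two thin factors that are cheap to all-reduce, and the approximation error is kept in a local buffer m that is fed back into later rounds. A self-contained numerical sketch of that compression step for a single tensor, with no networking and an illustrative rank:

import torch

# Simplified single-peer sketch of the rank-r compression used by PowerSGDGradientAverager.
# In the real averager the factors p and q are what peers all-reduce; here everything is local.
torch.manual_seed(0)
rank = 1                                  # corresponds to averager_rank
grad = torch.randn(64, 32)                # one dense gradient, viewed as a matrix
m = torch.zeros_like(grad)                # error-feedback buffer, persists across steps

m.add_(grad)                              # accumulate the current gradient into the buffer

q = torch.randn(grad.size(1), rank)       # in the full algorithm q is warm-started, not random
p = m @ q                                 # (64, rank) factor
p, _ = torch.linalg.qr(p)                 # orthogonalize the column basis
q = m.t() @ p                             # (32, rank) factor

new_g = p @ q.t()                         # rank-r reconstruction used in place of the raw gradient
m.sub_(new_g)                             # keep the residual for the next round (error feedback)

rel_err = (grad - new_g).norm() / grad.norm()
print(f"rank-{rank} approximation, relative error {rel_err:.3f}")

In the averager above, the renamed averager_local_ef flag controls whether the error buffer is updated from the locally computed factors (local_p, local_q) or from the averaged reconstruction.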