
Merge remote-tracking branch 'lah/master' into power_ef_new

Artem Chumachenko 3 years ago
parent
commit
c4b590b32f

+ 4 - 1
.github/workflows/check-style.yml

@@ -1,6 +1,9 @@
 name: Check style
 
-on: [ push, pull_request ]
+on:
+  push:
+    branches: [ master ]
+  pull_request:
 
 jobs:
   black:

+ 0 - 1
.github/workflows/push-docker-image.yml

@@ -8,7 +8,6 @@ on:
   pull_request:
     branches: [ master ]
 
-
 jobs:
   build:
     runs-on: ubuntu-latest

+ 4 - 2
.github/workflows/run-benchmarks.yml

@@ -1,7 +1,9 @@
 name: Benchmarks
 
-on: [ push, pull_request ]
-
+on:
+  push:
+    branches: [ master ]
+  pull_request:
 
 jobs:
   run_benchmarks:

+ 4 - 2
.github/workflows/run-tests.yml

@@ -1,7 +1,9 @@
 name: Tests
 
-on: [ push, pull_request ]
-
+on:
+  push:
+    branches: [ master ]
+  pull_request:
 
 jobs:
   run_tests:

+ 24 - 24
examples/albert/README.md

@@ -9,7 +9,7 @@ using `hivemind.CollaborativeOptimizer` to exchange information between peers.
 
 * Install hivemind: `pip install git+https://github.com/learning-at-home/hivemind.git`
 * Dependencies: `pip install -r requirements.txt`
-* Preprocess data: `python tokenize_wikitext103.py`
+* Preprocess data: `./tokenize_wikitext103.py`
 * Upload the data to a publicly available location or ask volunteers to preprocess it locally
 
 ## Running an experiment
@@ -20,18 +20,16 @@ Run the first DHT peer to welcome trainers and record training statistics (e.g.,
 
 - In this example, we use [wandb.ai](https://wandb.ai/site) to plot training metrics. If you're unfamiliar with Weights
   & Biases, here's a [quickstart tutorial](https://docs.wandb.ai/quickstart).
-- Run `python run_training_monitor.py --experiment_prefix YOUR_EXPERIMENT_NAME --wandb_project YOUR_WANDB_PROJECT`
+- Run `./run_training_monitor.py --wandb_project YOUR_WANDB_PROJECT`
 
-  - `YOUR_EXPERIMENT_NAME` must be a unique name of this training run, e.g. `my-albert-v1`. It cannot contain `.`
-    due to naming conventions.
   - `YOUR_WANDB_PROJECT` is a name of wandb project used to track training metrics. Multiple experiments can have the
     same project name.
 
 ```
-$ python run_training_monitor.py --experiment_prefix my-albert-v1 --wandb_project Demo-run
-Oct 14 16:26:36.083 [INFO] [utils.log_visible_maddrs:47] Running a DHT peer. To connect other peers to this one over the Internet,
+$ ./run_training_monitor.py --wandb_project Demo-run
+Oct 14 16:26:36.083 [INFO] Running a DHT peer. To connect other peers to this one over the Internet,
 use --initial_peers /ip4/1.2.3.4/tcp/1337/p2p/XXXX /ip4/1.2.3.4/udp/31337/quic/p2p/XXXX
-Oct 14 16:26:36.083 [INFO] [utils.log_visible_maddrs:50] Full list of visible multiaddresses: ...
+Oct 14 16:26:36.083 [INFO] Full list of visible multiaddresses: ...
 wandb: Currently logged in as: XXX (use `wandb login --relogin` to force relogin)
 wandb: Tracking run with wandb version 0.10.32
 wandb: Syncing run dry-mountain-2
@@ -39,12 +37,12 @@ wandb:  View project at https://wandb.ai/XXX/Demo-run
 wandb:  View run at https://wandb.ai/XXX/Demo-run/runs/YYY
 wandb: Run data is saved locally in /path/to/run/data
 wandb: Run `wandb offline` to turn off syncing.
-Oct 14 16:26:41.064 [INFO] [optim.collaborative._fetch_state:448] Found no active peers: None
-Oct 14 16:26:44.068 [INFO] [optim.collaborative._fetch_state:448] Found no active peers: None
+Oct 14 16:26:41.064 [INFO] Found no active peers: None
+Oct 14 16:26:44.068 [INFO] Found no active peers: None
 ...
-Oct 14 16:37:37.246 [INFO] [__main__.<module>:209] Step #1  loss = 11.05164
-Oct 14 16:39:37.441 [INFO] [__main__.<module>:209] Step #2  loss = 11.03771
-Oct 14 16:40:37.541 [INFO] [__main__.<module>:209] Step #3  loss = 11.02886
+Oct 14 16:37:37.246 [INFO] Step #1  loss = 11.05164
+Oct 14 16:39:37.441 [INFO] Step #2  loss = 11.03771
+Oct 14 16:40:37.541 [INFO] Step #3  loss = 11.02886
 ```
 
 ### GPU trainers
@@ -57,8 +55,8 @@ To join the collaboration with a GPU trainer,
   (see [default paths](./arguments.py#L117-L134) for reference)
 - Run:
   ```bash
-  python run_trainer.py \
-      --experiment_prefix YOUR_EXPERIMENT_NAME --initial_peers ONE_OR_MORE_PEERS \
+  ./run_trainer.py \
+      --initial_peers ONE_OR_MORE_PEERS \
       --logging_first_step --output_dir ./outputs --overwrite_output_dir --logging_dir ./logs
   ```
 
@@ -89,16 +87,18 @@ See the ["Tips and tricks"](#tips-and-tricks) section for more information on se
 As the peer begins training, it will periodically report training logs in the following form:
 
 ```
-... [INFO] [...] my-albert-v1 accumulated 448 samples from 17 peers for step #0. ETA 18.88 sec (refresh in 15.73 sec)
-... [INFO] [...] my-albert-v1 accumulated 4096 samples from 16 peers for step #0. ETA 0.00 sec (refresh in 0.50 sec)
-... [INFO] [optim.collaborative.step:283] Averaged tensors successfully with 17 peers
-... [INFO] [optim.collaborative.step:317] Optimizer step: done!
-Oct 14 18:58:03.750 [INFO] [__main__.on_step_end:141] Step 1
-Oct 14 18:58:03.750 [INFO] [__main__.on_step_end:142] Your current contribution: 892 samples
-Oct 14 18:58:03.750 [INFO] [__main__.on_step_end:143] Local loss: 11.023
+Dec 28 00:15:31.482 [INFO] albert accumulated 4056 samples for epoch #0 from 2 peers. ETA 0.75 sec (refresh in 0.50 sec)
+Dec 28 00:15:31.990 [INFO] albert accumulated 4072 samples for epoch #0 from 2 peers. ETA 0.24 sec (refresh in 0.50 sec)
+...
+Dec 28 00:15:32.857 [INFO] Step #1
+Dec 28 00:15:32.857 [INFO] Your current contribution: 2144 samples
+Dec 28 00:15:32.857 [INFO] Performance: 20.924 samples/sec
+Dec 28 00:15:32.857 [INFO] Local loss: 11.06709
+Dec 28 00:15:33.580 [INFO] Averaged gradients with 2 peers
+Dec 28 00:15:38.336 [INFO] Averaged parameters with 2 peers
 ```
 
-__Sanity check:__ a healthy peer will periodically report `Averaged tensors successfully with [N > 1]` peers.
+__Sanity check:__ a healthy peer will periodically report `Averaged gradients/parameters with [N > 1]` peers.
 
 For convenience, you can view (and share!) the learning curves of your collaborative experiments in wandb:
 
@@ -169,8 +169,8 @@ Here's an example of a full trainer script for Google Colab:
 !pip install transformers datasets sentencepiece torch_optimizer==0.1.0
 !git clone https://github.com/learning-at-home/hivemind && cd hivemind && pip install -e .
 !curl -L YOUR_HOSTED_DATA | tar xzf -
-!ulimit -n 4096 && python ./hivemind/examples/albert/run_trainer.py \
-    --experiment_prefix YOUR_EXPERIMENT_NAME --initial_peers ONE_OR_MORE_PEERS \
+!ulimit -n 4096 && ./hivemind/examples/albert/run_trainer.py \
+    --initial_peers ONE_OR_MORE_PEERS \
     --logging_dir ./logs --logging_first_step --output_dir ./outputs --overwrite_output_dir \
     --client_mode --averaging_expiration 10 --batch_size_lead 300 --gradient_accumulation_steps 1
 ```

+ 1 - 1
examples/albert/arguments.py

@@ -7,7 +7,7 @@ from transformers import TrainingArguments
 @dataclass
 class BaseTrainingArguments:
     experiment_prefix: str = field(
-        metadata={"help": "A unique 'name' of this experiment, used to store metadata on the DHT"}
+        default="albert", metadata={"help": "A unique 'name' of this experiment, used to store metadata on the DHT"}
     )
     initial_peers: List[str] = field(
         default_factory=list,
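
For reference, a minimal sketch (not part of this commit) of how the new default behaves under `HfArgumentParser`: with `default="albert"`, the `--experiment_prefix` flag becomes optional. The `ExampleArgs` dataclass below is a hypothetical stand-in for `BaseTrainingArguments`.

```python
from dataclasses import dataclass, field

from transformers import HfArgumentParser


@dataclass
class ExampleArgs:  # hypothetical stand-in for BaseTrainingArguments
    experiment_prefix: str = field(
        default="albert", metadata={"help": "A unique 'name' of this experiment"}
    )


parser = HfArgumentParser(ExampleArgs)

# With the default in place, the flag can be omitted entirely...
(args,) = parser.parse_args_into_dataclasses(args=[])
assert args.experiment_prefix == "albert"

# ...or overridden explicitly on the command line.
(args,) = parser.parse_args_into_dataclasses(args=["--experiment_prefix", "my-albert-v1"])
assert args.experiment_prefix == "my-albert-v1"
```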

+ 5 - 5
examples/albert/requirements.txt

@@ -1,7 +1,7 @@
-transformers>=4.6.0
-datasets>=1.5.0
-torch_optimizer>=0.1.0
-wandb>=0.10.26
+transformers==4.6.0
+datasets==1.5.0
+torch_optimizer==0.1.0
+wandb==0.10.26
 sentencepiece
 requests
-nltk>=3.6.2
+nltk==3.6.5

+ 42 - 39
examples/albert/run_trainer.py

@@ -1,7 +1,8 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 import os
 import pickle
+import sys
 from dataclasses import asdict
 from pathlib import Path
 
@@ -58,36 +59,6 @@ def get_model(training_args, config, tokenizer):
     return model
 
 
-def get_optimizer_and_scheduler(training_args, model):
-    no_decay = ["bias", "LayerNorm.weight"]
-    optimizer_grouped_parameters = [
-        {
-            "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
-            "weight_decay": training_args.weight_decay,
-        },
-        {
-            "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
-            "weight_decay": 0.0,
-        },
-    ]
-
-    opt = Lamb(
-        optimizer_grouped_parameters,
-        lr=training_args.learning_rate,
-        betas=(training_args.adam_beta1, training_args.adam_beta2),
-        eps=training_args.adam_epsilon,
-        weight_decay=training_args.weight_decay,
-        clamp_value=training_args.clamp_value,
-        debias=True,
-    )
-
-    scheduler = get_linear_schedule_with_warmup(
-        opt, num_warmup_steps=training_args.warmup_steps, num_training_steps=training_args.max_steps
-    )
-
-    return opt, scheduler
-
-
 class CollaborativeCallback(transformers.TrainerCallback):
     """
     This callback monitors and reports collaborative training progress.
@@ -149,9 +120,9 @@ class CollaborativeCallback(transformers.TrainerCallback):
                 )
                 logger.info(f"Step #{self.optimizer.local_epoch}")
                 logger.info(f"Your current contribution: {self.total_samples_processed} samples")
-                logger.info(f"Performance: {samples_per_second} samples per second.")
+                logger.info(f"Performance: {samples_per_second:.3f} samples/sec")
                 if self.steps:
-                    logger.info(f"Local loss: {self.loss / self.steps}")
+                    logger.info(f"Local loss: {self.loss / self.steps:.5f}")
                 if self.optimizer.local_epoch % self.backup_every_steps == 0:
                     self.latest_backup = self.backup_state()
 
@@ -219,10 +190,7 @@ def main():
         )
     )
     training_args, dataset_args, collaboration_args, averager_args, tracker_args = parser.parse_args_into_dataclasses()
-
     logger.info(f"Found {len(collaboration_args.initial_peers)} initial peers: {collaboration_args.initial_peers}")
-    if len(collaboration_args.initial_peers) == 0:
-        raise ValueError("Please specify at least one network endpoint in initial peers.")
 
     setup_transformers_logging(training_args.local_rank)
     logger.info(f"Training/evaluation parameters:\n{training_args}")
@@ -231,7 +199,15 @@ def main():
     set_seed(training_args.seed)
 
     config = AlbertConfig.from_pretrained(dataset_args.config_path, cache_dir=dataset_args.cache_dir)
-    tokenizer = AlbertTokenizerFast.from_pretrained(dataset_args.tokenizer_path, cache_dir=dataset_args.cache_dir)
+    try:
+        tokenizer = AlbertTokenizerFast.from_pretrained(dataset_args.tokenizer_path, cache_dir=dataset_args.cache_dir)
+    except OSError:
+        logger.fatal(
+            f"No tokenizer data found in {dataset_args.tokenizer_path}, "
+            f"please run ./tokenize_wikitext103.py before running this"
+        )
+        sys.exit(1)
+
     model = get_model(training_args, config, tokenizer)
     model.to(training_args.device)
 
@@ -239,8 +215,6 @@ def main():
     # This data collator will take care of randomly masking the tokens.
     data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer)
 
-    opt, scheduler = get_optimizer_and_scheduler(training_args, model)
-
     validators, local_public_key = utils.make_validators(collaboration_args.experiment_prefix)
 
     dht = DHT(
@@ -261,12 +235,41 @@ def main():
 
     adjusted_target_batch_size = collaboration_args.target_batch_size - collaboration_args.batch_size_lead
 
+    # We need to make such a lambda function instead of just an optimizer instance
+    # to make hivemind.Optimizer(..., offload_optimizer=True) work
+    opt = lambda params: Lamb(
+        params,
+        lr=training_args.learning_rate,
+        betas=(training_args.adam_beta1, training_args.adam_beta2),
+        eps=training_args.adam_epsilon,
+        weight_decay=training_args.weight_decay,
+        clamp_value=training_args.clamp_value,
+        debias=True,
+    )
+
+    no_decay = ["bias", "LayerNorm.weight"]
+    params = [
+        {
+            "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
+            "weight_decay": training_args.weight_decay,
+        },
+        {
+            "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
+            "weight_decay": 0.0,
+        },
+    ]
+
+    scheduler = lambda opt: get_linear_schedule_with_warmup(
+        opt, num_warmup_steps=training_args.warmup_steps, num_training_steps=training_args.max_steps
+    )
+
     optimizer = Optimizer(
         dht=dht,
         run_id=collaboration_args.experiment_prefix,
         target_batch_size=adjusted_target_batch_size,
         batch_size_per_step=total_batch_size_per_step,
         optimizer=opt,
+        params=params,
         scheduler=scheduler,
         matchmaking_time=collaboration_args.matchmaking_time,
         averaging_timeout=collaboration_args.averaging_timeout,
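
For reference, a minimal sketch (not part of this commit) of the factory pattern described in the comment above, assuming a toy model and default DHT settings. The keyword arguments mirror those used in `run_trainer.py`; the model and hyperparameters are purely illustrative.

```python
import torch
import hivemind

# Toy model and local DHT purely for illustration; real values come from the training args.
model = torch.nn.Linear(16, 4)
dht = hivemind.DHT(start=True)

# Pass a factory (callable) rather than a pre-built optimizer instance, so that
# hivemind.Optimizer can construct the optimizer on offloaded (host-memory) parameters.
opt_factory = lambda params: torch.optim.SGD(params, lr=0.01)

optimizer = hivemind.Optimizer(
    dht=dht,
    run_id="albert",          # matches the default experiment_prefix introduced above
    target_batch_size=4096,
    batch_size_per_step=4,
    optimizer=opt_factory,
    params=model.parameters(),
    offload_optimizer=True,
    verbose=True,
)
```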

+ 1 - 1
examples/albert/run_training_monitor.py

@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 import time
 from dataclasses import asdict, dataclass, field

+ 1 - 1
examples/albert/tokenize_wikitext103.py

@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 """ This script builds a pre-tokenized compressed representation of WikiText-103 using huggingface/datasets """
 import random
 from functools import partial

+ 9 - 5
hivemind/averaging/allreduce.py

@@ -4,7 +4,7 @@ from typing import Any, AsyncIterator, Dict, Optional, Sequence, Set, Tuple, Typ
 
 import torch
 
-from hivemind.averaging.partition import AllreduceException, TensorPartContainer, TensorPartReducer
+from hivemind.averaging.partition import AllreduceException, BannedException, TensorPartContainer, TensorPartReducer
 from hivemind.compression import deserialize_torch_tensor, serialize_torch_tensor
 from hivemind.p2p import P2P, P2PContext, PeerID, ServicerBase, StubBase
 from hivemind.proto import averaging_pb2
@@ -343,10 +343,14 @@ class AllReduceRunner(ServicerBase):
                 stream,
                 max_prefetch=self.tensor_part_container.prefetch,
             ):
-                averaged_part = await self.tensor_part_reducer.accumulate_part(
-                    sender_index, part_index, tensor_part, weight=weight
-                )
-                part_index += 1
+                try:
+                    averaged_part = await self.tensor_part_reducer.accumulate_part(
+                        sender_index, part_index, tensor_part, weight=weight
+                    )
+                    part_index += 1
+                except BannedException:
+                    logger.debug(f"Sender {sender_index} is already banned")
+                    break  # sender was banned, we no longer need to aggregate it
 
                 serialized_delta = await loop.run_in_executor(
                     None, lambda: serialize_torch_tensor(averaged_part - tensor_part, part_compression)

+ 2 - 4
hivemind/averaging/matchmaking.py

@@ -9,8 +9,6 @@ import random
 from math import isfinite
 from typing import AsyncIterator, Dict, Optional, Set, Tuple, Type
 
-import numpy as np
-
 from hivemind.averaging.control import StepControl
 from hivemind.averaging.group_info import GroupInfo
 from hivemind.averaging.key_manager import GroupKey, GroupKeyManager
@@ -203,7 +201,7 @@ class Matchmaking:
                 message = await asyncio.wait_for(anext(stream), timeout=self.request_timeout)
 
                 if message.code == averaging_pb2.ACCEPTED:
-                    logger.debug(f"{self.peer_id} - joining the group of {leader}; waiting for peers")
+                    logger.debug(f"{self.peer_id} - joining the group of {leader}, waiting for peers")
                     self.current_leader = leader
                     self.was_accepted_to_group.set()
                     if len(self.current_followers) > 0:
@@ -242,7 +240,7 @@ class Matchmaking:
             logger.debug(f"{self} - potential leader {leader} did not respond within {self.request_timeout}")
             return None
         except (P2PHandlerError, ControlFailure, DispatchFailure, StopAsyncIteration) as e:
-            logger.debug(f"{self} - failed to request potential leader {leader}:")
+            logger.debug(f"{self} - failed to request potential leader {leader}:", exc_info=True)
             return None
 
         finally:

+ 9 - 0
hivemind/averaging/partition.py

@@ -227,6 +227,9 @@ class TensorPartReducer:
             await asyncio.wait({self.current_part_future, self.finished.wait()}, return_when=asyncio.FIRST_COMPLETED)
             if self.finished.is_set():
                 raise AllreduceException(f"attempted to aggregate part in a finalized {self.__class__.__name__}")
+
+        if self.sender_failed_after[sender_index] != float("inf"):
+            raise BannedException(f"sender {sender_index} was banned in background")
         assert part_index == self.current_part_index
 
         current_part_future = self.current_part_future
@@ -241,6 +244,8 @@ class TensorPartReducer:
     def on_sender_failed(self, sender_index: int):
         """Exclude that sender's data for averaging any parts that it did not submit yet."""
         self.sender_failed_after[sender_index] = self.num_parts_received[sender_index]
+        if self.finished.is_set():
+            return
         if self.current_part_index == self.num_parts_received[sender_index]:
             self.num_current_senders -= 1
             self.check_current_part_finished()
@@ -270,3 +275,7 @@ class TensorPartReducer:
 
 class AllreduceException(Exception):
     """A special exception that is raised when allreduce can't continue normally (e.g. disconnected/protocol error)"""
+
+
+class BannedException(AllreduceException):
+    """An exception that indicates that a given sender was banned and will no longer be aggregated"""
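
For reference, a minimal sketch (not part of this commit) of how a caller can distinguish the new `BannedException` from other allreduce failures, mirroring the `allreduce.py` change above; `consume_part` is a hypothetical helper.

```python
from hivemind.averaging.partition import AllreduceException, BannedException


async def consume_part(reducer, sender_index, part_index, tensor_part, weight):
    """Hypothetical helper: aggregate one tensor part, tolerating banned senders."""
    try:
        # accumulate_part is awaited exactly as in allreduce.py above
        return await reducer.accumulate_part(sender_index, part_index, tensor_part, weight=weight)
    except BannedException:
        # Catch the more specific subclass first: the sender was banned in the
        # background, so its remaining parts are simply skipped.
        return None
    except AllreduceException:
        # Any other allreduce failure (disconnect, protocol error) still propagates.
        raise
```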

+ 16 - 5
hivemind/optim/grad_scaler.py

@@ -35,6 +35,7 @@ class GradScaler(TorchGradScaler):
         super().__init__(*args, **kwargs)
         self._is_running_global_step = False
         self._is_ready_to_update = False
+        self._inner_optimizer_states = {}
         self._optimizer_states_to_reset = set()
         self._lock = threading.RLock()
 
@@ -52,7 +53,12 @@ class GradScaler(TorchGradScaler):
             assert isinstance(optimizer, (hivemind.Optimizer, hivemind.DecentralizedOptimizerBase))
             if self._is_running_global_step:
                 super().unscale_(optimizer)
-                self._per_optimizer_states[id(optimizer.opt)] = deepcopy(self._per_optimizer_states[id(optimizer)])
+                self._inner_optimizer_states[id(optimizer.opt)] = deepcopy(self._per_optimizer_states[id(optimizer)])
+                # note: we store unscaled optimizer state in a separate dict and not in _per_optimizer_states in order
+                # to avoid an edge case where full DPU peer encounters overflow in local gradients while averaging
+                # offloaded gradients (i.e. after global unscale but before global step). Due to overflow, next call to
+                # .update on user side would reset *all* optimizer states and cause .step to unscale gradients twice.
+                # Offloaded optimizer is not affected by overflow in on-device gradients and should not be reset.
                 return True
             else:
                 self._check_inf_per_device(optimizer)
@@ -62,14 +68,19 @@ class GradScaler(TorchGradScaler):
     def step(self, optimizer: TorchOptimizer, *args, **kwargs) -> bool:
         if self._is_running_global_step and not isinstance(optimizer, hivemind.Optimizer):
             # ^-- invoked privately within hivemind optimizer
+            inner_optimizer = optimizer
             with self._lock:
                 if self._is_ready_to_update:
                     logger.warning("Please call grad_scaler.update() after each step")
+
+                inner_optimizer_state = self._inner_optimizer_states.pop(id(inner_optimizer), None)
+                if inner_optimizer_state is not None:
+                    self._per_optimizer_states[id(inner_optimizer)] = inner_optimizer_state
                 assert (
-                    self._per_optimizer_states[id(optimizer)]["stage"] == OptState.UNSCALED
-                ), "InternalError: Optimizer should have called .unscale internally before invoking grad_scaler.step."
-                if self.are_grads_finite(optimizer, use_cached=True):
-                    super().step(optimizer, *args, **kwargs)
+                    self._per_optimizer_states[id(inner_optimizer)]["stage"] == OptState.UNSCALED
+                ), "InternalError: Optimizer should have called .unscale internally before invoking grad_scaler.step"
+                if self.are_grads_finite(inner_optimizer, use_cached=True):
+                    super().step(inner_optimizer, *args, **kwargs)
                 else:
                     logger.warning("Skipping global step due to gradient over/underflow")
                 self._is_ready_to_update = True

+ 2 - 1
hivemind/optim/optimizer.py

@@ -195,6 +195,8 @@ class Optimizer(torch.optim.Optimizer):
         shutdown_timeout: float = 5,
         verbose: bool = False,
     ):
+        self._parent_pid = os.getpid()
+
         client_mode = client_mode if client_mode is None else dht.client_mode
         delay_optimizer_step = delay_optimizer_step if delay_optimizer_step is not None else delay_grad_averaging
         offload_optimizer = offload_optimizer if offload_optimizer is not None else (params is not None)
@@ -273,7 +275,6 @@ class Optimizer(torch.optim.Optimizer):
 
         self._should_check_synchronization_on_update = True  # used in self.should_load_state_from_peers
         self._schema_hash = self._compute_schema_hash()
-        self._parent_pid = os.getpid()
 
         self.delay_before_state_averaging = PerformanceEMA(alpha=performance_ema_alpha)
         # measures the average time from the beginning of self._update_global_epoch to the call to state_averager

+ 17 - 12
hivemind/optim/progress_tracker.py

@@ -195,6 +195,7 @@ class ProgressTracker(threading.Thread):
     async def _progress_reporter(self):
         """Periodically publish metadata and the current number of samples accumulated towards the next epoch"""
         last_report_time = -float("inf")
+        last_report_epoch = -float("inf")
         store_task = None
         try:
             while not self.shutdown_triggered.is_set():
@@ -209,19 +210,23 @@ class ProgressTracker(threading.Thread):
 
                 local_progress = self.local_progress
                 last_report_time = get_dht_time()
-
-                store_task = asyncio.create_task(
-                    asyncio.wait_for(
-                        self.dht.store(
-                            key=self.training_progress_key,
-                            subkey=self._local_public_key,
-                            value=local_progress.dict(),
-                            expiration_time=last_report_time + self.metadata_expiration,
-                            return_future=True,
-                        ),
-                        timeout=self.metadata_expiration,
+                if local_progress.samples_accumulated > 0:
+                    last_report_epoch = self.global_epoch
+
+                if last_report_epoch >= self.global_epoch - 1:
+                    # report progress if peer is synchronized and actively reporting samples. Do not report aux peers.
+                    store_task = asyncio.create_task(
+                        asyncio.wait_for(
+                            self.dht.store(
+                                key=self.training_progress_key,
+                                subkey=self._local_public_key,
+                                value=local_progress.dict(),
+                                expiration_time=last_report_time + self.metadata_expiration,
+                                return_future=True,
+                            ),
+                            timeout=self.metadata_expiration,
+                        )
                     )
-                )
         finally:
             logger.log(self.status_loglevel, f"No longer reporting progress for {self.prefix}")
             if store_task is not None:

+ 9 - 0
hivemind/optim/state_averager.py

@@ -152,6 +152,15 @@ class TrainingStateAverager(DecentralizedAverager):
         parameter_names = tuple(nested_flatten(parameter_names))
         assert len(parameters) == len(parameter_names), f"Expected {len(parameters)} names, got {len(parameter_names)}"
         assert len(set(parameters)) == len(parameters), "Found duplicate parameters in param_groups"
+        params_with_grad = sum(p.numel() for p in parameters if p.requires_grad)
+        params_no_grad = sum(p.numel() for p in parameters if not p.requires_grad)
+        if params_no_grad >= params_with_grad:
+            logger.warning(
+                "The majority of parameters have requires_grad=False, but they are still synchronized"
+                " with peers. If these parameters are frozen (not updated), please do not feed them into "
+                "the optimizer at all in order to avoid communication overhead. Proceeding anyway."
+            )
+
         return param_groups, parameters, parameter_names
 
     def _make_averaged_parameters(self, main_parameters: Sequence[torch.Tensor]):
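
For reference, a minimal sketch (not part of this commit) of how a user can avoid this warning by excluding frozen parameters before building param groups; the model and group layout are illustrative.

```python
import torch

# Illustrative model with a frozen first layer
model = torch.nn.Sequential(torch.nn.Linear(16, 16), torch.nn.Linear(16, 4))
for p in model[0].parameters():
    p.requires_grad = False

# Only feed trainable parameters to the optimizer (or hivemind.Optimizer(params=...)),
# so frozen tensors are never synchronized with peers.
trainable = [p for p in model.parameters() if p.requires_grad]
param_groups = [{"params": trainable, "weight_decay": 0.01}]
```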

+ 5 - 0
hivemind/p2p/p2p_daemon.py

@@ -140,6 +140,11 @@ class P2P:
         socket_uid = secrets.token_urlsafe(8)
         self._daemon_listen_maddr = Multiaddr(cls._UNIX_SOCKET_PREFIX + f"p2pd-{socket_uid}.sock")
         self._client_listen_maddr = Multiaddr(cls._UNIX_SOCKET_PREFIX + f"p2pclient-{socket_uid}.sock")
+        if announce_maddrs is not None:
+            for addr in announce_maddrs:
+                addr = Multiaddr(addr)
+                if ("tcp" in addr and addr["tcp"] == "0") or ("udp" in addr and addr["udp"] == "0"):
+                    raise ValueError("Please specify an explicit port in announce_maddrs: port 0 is not supported")
 
         need_bootstrap = bool(initial_peers) or use_ipfs
         process_kwargs = cls.DHT_MODE_MAPPING.get(dht_mode, {"dht": 0})
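
For reference, a minimal sketch (not part of this commit) of announce addresses that pass or fail the new check, assuming the usual `hivemind.DHT(host_maddrs=..., announce_maddrs=...)` entry point; the IP addresses are placeholders.

```python
import hivemind

# Announced addresses must carry an explicit port; port 0 ("pick any") now raises ValueError.
good_announce = ["/ip4/203.0.113.5/tcp/31337"]  # explicit port -> accepted
bad_announce = ["/ip4/203.0.113.5/tcp/0"]       # port 0 -> rejected by the new check

dht = hivemind.DHT(
    host_maddrs=["/ip4/0.0.0.0/tcp/31337"],
    announce_maddrs=good_announce,
    start=True,
)
```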