@@ -3,6 +3,7 @@ from __future__ import annotations
 import contextlib
 import logging
 import random
+from typing import Union, Optional

 import torch
 from hivemind import DHT, P2P, get_logger, use_hivemind_log_handler
@@ -25,7 +26,15 @@ class RemoteSequential(nn.Module):
     A sequence of transformer blocks hosted by the swarm.
     """

-    def __init__(self, config: src.DistributedBloomConfig, dht: DHT, prefix: str, max_retries: int = 3):
+    def __init__(
+        self,
+        config: src.DistributedBloomConfig,
+        dht: DHT,
+        prefix: str,
+        max_retries: int = 3,
+        p2p: Optional[P2P] = None,
+        sequence_manager: Optional[RemoteSequenceManager] = None,
+    ):
         logger.warning(f"{self.__class__.__name__} is in active development; expect adventures")
         if prefix.endswith(UID_DELIMITER):
             logger.warning(
@@ -39,12 +48,19 @@ class RemoteSequential(nn.Module):
         self.dht = dht
         self.prefix = prefix
         self.max_retries = max_retries
-        self.p2p = RemoteExpertWorker.run_coroutine(dht.replicate_p2p())
-
-        block_uids = tuple(f"{prefix}{UID_DELIMITER}{i}" for i in range(config.n_layer))
-
-        logger.debug(f"Remote block uids: {block_uids}")
-        self.remote_sequence_info = RemoteSequenceManager(dht, block_uids)
+        self.p2p = RemoteExpertWorker.run_coroutine(dht.replicate_p2p()) if p2p is None else p2p
+
+        block_uids = [f"{prefix}{UID_DELIMITER}{i}" for i in range(config.n_layer)]
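+        # reuse the caller's sequence manager if provided (e.g. when slicing), otherwise build one over all blocks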
+        if sequence_manager is None:
+            logger.debug(f"Creating new sequence manager for block uids: {block_uids}")
+            self.sequence_manager = RemoteSequenceManager(dht, block_uids)
+            self.is_subsequence = False
+        else:
+            assert isinstance(sequence_manager.block_uids, list)
+            self.sequence_manager = sequence_manager
+            logger.debug(f"Reusing sequence manager with {len(self.sequence_manager)} blocks")
+            self.is_subsequence = self.sequence_manager.block_uids != block_uids

     def forward(self, inputs: torch.Tensor):
         assert isinstance(inputs, torch.Tensor) and inputs.ndim == 3 and inputs.shape[-1] == self.config.n_embed
@@ -64,21 +80,34 @@ class RemoteSequential(nn.Module):
                         logging.debug(f"Caught {e} when running forward for block {block_index}", exc_info=True)
         return inputs

-    def __getitem__(self, block_index: int):
-        assert 0 <= block_index < self.config.n_layer
-        (module,) = _create_remote_modules_from_infos([self.remote_sequence_info.block_infos[block_index]], self.p2p)
-        return module
+    def __getitem__(self, ix: Union[int, slice]) -> Union[RemoteTransformerBlock, RemoteSequential]:
+        assert isinstance(ix, (int, slice))
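+        # an int selects a single remote block; a slice returns a RemoteSequential over the corresponding blocks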
+        if isinstance(ix, int):
+            assert 0 <= ix < self.config.n_layer
+            (module,) = _create_remote_modules_from_infos([self.sequence_manager.block_infos[ix]], self.p2p)
+            return module
+        else:
+            return RemoteSequential(
+                self.config,
+                self.dht,
+                prefix=self.prefix,
+                max_retries=self.max_retries,
+                p2p=self.p2p,
+                sequence_manager=self.sequence_manager[ix],
+            )

     def __iter__(self):
         for block_index in range(self.config.n_layer):
             yield self[block_index]

     def __len__(self):
-        return len(self.remote_sequence_info)
+        return len(self.sequence_manager)

     def inference_session(self) -> RemoteSequentialInferenceSession:
-        self.remote_sequence_info.update_()
-        return RemoteSequentialInferenceSession(self.remote_sequence_info, self.p2p)
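+        # refresh remote block info before opening the session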
+        self.sequence_manager.update_()
+        return RemoteSequentialInferenceSession(self.sequence_manager, self.p2p)


 class RemoteSequentialInferenceSession: