backend.py

  1. """Code for serving bloom blocks via hivemind-server"""
  2. from typing import Any, Dict, Optional, Sequence, Tuple
  3. import torch
  4. from hivemind import BatchTensorDescriptor, use_hivemind_log_handler
  5. from hivemind.moe.server.module_backend import ModuleBackend
  6. from hivemind.utils import get_logger
  7. from src.bloom.from_pretrained import BloomBlock
  8. from src.server.cache import MemoryCache
  9. from src.server.task_pool import PrioritizedTaskPool
  10. from src.utils.misc import is_dummy
  11. use_hivemind_log_handler("in_root_logger")
  12. logger = get_logger(__file__)

class TransformerBackend(ModuleBackend):
    """A wrapper for BloomBlock that can process requests for bloom layer forward, forward_incremental, and backward"""

    def __init__(self, *args, memory_cache: MemoryCache, backend_dtype: Optional[torch.dtype] = None, **kwargs):
        super().__init__(*args, **kwargs)
        assert isinstance(self.module, BloomBlock)
        self.memory_cache = memory_cache
        for name, param in self.module.named_parameters():
            assert not param.requires_grad, f"Bloom layer parameters must not accumulate gradients, but {name} does"
        for name, buf in self.module.named_buffers():
            assert not buf.requires_grad, f"Bloom layer buffers must not require grad, but {name} does"

        # ModuleBackend.__init__ has already created forward/backward pools; reuse their
        # max_batch_size for the inference pool, then replace them with prioritized pools.
        self.inference_pool = PrioritizedTaskPool(
            self.inference_step, max_batch_size=self.forward_pool.max_batch_size, name=f"{self.name}_inference"
        )
        self.forward_pool = PrioritizedTaskPool(self.forward, name=f"{self.name}_forward", **kwargs)
        self.backward_pool = PrioritizedTaskPool(self.backward, name=f"{self.name}_backward", **kwargs)
        self.dtype = backend_dtype if backend_dtype else self.module.input_layernorm.weight.dtype
        # inference requests reuse the forward args schema plus two extra tensor descriptors (see inference_step)
        self.inference_schema = (
            (
                *self.args_schema,
                BatchTensorDescriptor((), dtype=self.dtype),
                BatchTensorDescriptor((), dtype=torch.int64),
            ),
            self.kwargs_schema,
        )

    def inference_step(self, cache_metadata: torch.IntTensor, *inputs: torch.Tensor) -> Tuple[torch.Tensor, ...]:
        with torch.inference_mode():
            attention_cache_handle = int(cache_metadata[0, 0].item())
            prefix_length = int(cache_metadata[0, 1].item())
            (hidden_states, hypo_ids) = inputs
            assert (
                hidden_states.ndim == 3
            ), "expected hidden states to be 3-dimensional: [batch_size, seq_len, hid_size]"

            with self.memory_cache.use_cache(attention_cache_handle) as cache:
                # cache[0] holds past keys, cache[1] holds past values
                assert isinstance(self.module, BloomBlock) and cache.shape[0] == 2 and cache.ndim == 5
                if not is_dummy(hypo_ids):
                    cache[:, :] = cache[:, hypo_ids]  # in-place reorder cache by hypo ids
                layer_past = past_k, past_v = cache[0, :, :prefix_length], cache[1, :, :prefix_length]
                logger.debug(f"Metadata: {cache_metadata}, past_k shape: {past_k.shape}, past_v shape: {past_v.shape}")
                hidden_states, (new_k, new_v) = self.module.forward(
                    hidden_states, layer_past=layer_past, use_cache=True
                )

                # todo remove these asserts once we pass all tests
                new_length = new_v.shape[1]
                assert new_length > prefix_length
                assert new_k.shape[0] == past_k.shape[0] and new_v.shape[0] == past_v.shape[0]
                assert new_k.shape[1] == new_length and new_v.shape[1] == new_length
                assert new_k.shape[2:] == past_k.shape[2:] and new_v.shape[2:] == past_v.shape[2:]

                # write the newly computed key/value slices back into the shared attention cache
                cache[0, :, prefix_length:new_length, :] = new_k[:, prefix_length:new_length]
                cache[1, :, prefix_length:new_length, :] = new_v[:, prefix_length:new_length]
                return (hidden_states,)
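
    # Illustrative note (not in the original file): as the code above implies, a caller packs the
    # attention-cache handle and the current prefix length into `cache_metadata` before scheduling
    # an inference step. A minimal sketch, with hypothetical handle/length values:
    #
    #   cache_metadata = torch.tensor([[handle, prefix_length]], dtype=torch.int64)
    #   (hidden_states,) = backend.inference_step(cache_metadata, hidden_states, hypo_ids)
    #
    # where `hypo_ids` may be a dummy tensor (see `is_dummy`) when no beam-search reordering is needed.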

    def get_pools(self) -> Sequence[PrioritizedTaskPool]:
        return self.forward_pool, self.backward_pool, self.inference_pool

    def get_info(self) -> Dict[str, Any]:
        """Get expert parameters and stats. Used by RemoteExpert to check shapes and for DMoE orchestration."""
        return dict(super().get_info(), inference_schema=self.inference_schema)
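

# Illustrative usage sketch (not part of the original file): constructing a TransformerBackend for a
# single bloom block. The keyword names below (name, module, args_schema, outputs_schema,
# max_batch_size) follow hivemind's ModuleBackend interface; `block` and the MemoryCache arguments
# are hypothetical placeholders. Treat the whole snippet as an assumption, not the project's actual
# server wiring.
#
#   block = ...  # a BloomBlock loaded elsewhere (see src.bloom.from_pretrained)
#   for param in block.parameters():
#       param.requires_grad = False  # the backend asserts that no parameter accumulates gradients
#   backend = TransformerBackend(
#       name="bloom.0",
#       module=block,
#       memory_cache=MemoryCache(...),  # shared attention-cache allocator from src.server.cache
#       args_schema=(BatchTensorDescriptor((), dtype=torch.bfloat16),),
#       outputs_schema=(BatchTensorDescriptor((), dtype=torch.bfloat16),),
#       max_batch_size=16,
#   )
#   forward_pool, backward_pool, inference_pool = backend.get_pools()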