backend.py

  1. """Code for serving bloom blocks via hivemind-server"""
  2. from typing import Sequence, Tuple
  3. import torch
  4. from hivemind.moe.server.module_backend import ModuleBackend
  5. from hivemind.moe.server.task_pool import TaskPool
  6. from src.bloom.from_pretrained import BloomBlock
  7. from src.server.cache import MemoryCache
  8. MAX_LENGTH = 2048
  9. class TransformerBackend(ModuleBackend):
  10. """A wrapper for BloomBlock that can process requests for bloom layer forward, forward_incremental, and backward"""
  11. def __init__(self, *args, memory_cache: MemoryCache, **kwargs):
  12. super().__init__(*args, **kwargs)
  13. assert isinstance(self.module, BloomBlock)
  14. self.memory_cache = memory_cache
  15. for name, param in self.module.named_parameters():
  16. assert not param.requires_grad, f"Bloom layer parameters must not accumulate gradients, but {name} does"
  17. for name, buf in self.module.named_buffers():
  18. assert not buf.requires_grad, f"Bloom layer parameters must not accumulate gradients, but {name} does"
  19. self.inference_pool = TaskPool(self.inference_step, max_batch_size=1, name=f"{self.name}_inference")
  20. def inference_step(self, cache_metadata: torch.IntTensor, *inputs: torch.Tensor) -> Tuple[torch.Tensor, ...]:
  21. with torch.inference_mode():
  22. attention_cache_handle = int(cache_metadata[0, 0].item())
  23. prefix_length = int(cache_metadata[0, 1].item())
  24. hidden_states = inputs[0] # todo: in future, it would be best to support attention mask here
  25. assert (
  26. hidden_states.ndim == 3
  27. ), "expected hidden states to be 3-dimensional: [batch_size, seq_len, hid_size]"
  28. with self.memory_cache.use_cache(attention_cache_handle) as cache:
  29. assert isinstance(self.module, BloomBlock) and cache.shape[0] == 2 and cache.ndim == 5
  30. layer_past = past_k, past_v = cache[0, :, :prefix_length], cache[1, :, :prefix_length]
  31. print("METADATA:", cache_metadata, past_k.shape, past_v.shape)
  32. hidden_states, (new_k, new_v) = self.module.forward(
  33. hidden_states, layer_past=layer_past, use_cache=True
  34. )
  35. # todo remove these asserts once we pass all tests
  36. new_length = new_v.shape[1]
  37. assert new_length > prefix_length
  38. assert new_k.shape[0] == past_k.shape[0] and new_v.shape[0] == past_v.shape[0]
  39. assert new_k.shape[1] == new_length and new_v.shape[1] == new_length
  40. assert new_k.shape[2:] == past_k.shape[2:] and new_v.shape[2:] == past_v.shape[2:]
  41. assert torch.allclose(new_v[:, : past_v.shape[1]], past_v)
  42. assert torch.allclose(new_k[:, : past_k.shape[1]], past_k)
  43. cache[0, :, prefix_length:new_length, :] = new_k[:, prefix_length:new_length]
  44. cache[1, :, prefix_length:new_length, :] = new_v[:, prefix_length:new_length]
  45. return (hidden_states,)
  46. def get_pools(self) -> Sequence[TaskPool]:
  47. return self.forward_pool, self.backward_pool, self.inference_pool
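
For reference, below is a minimal caller-side sketch (not part of backend.py) of how the cache_metadata tensor parsed by inference_step could be assembled and passed in. It assumes that backend is an already-constructed TransformerBackend and handle is an attention-cache handle previously obtained from its MemoryCache; the allocation API and the hivemind server wiring are not shown in this file, and hidden_size must match the served BloomBlock.

import torch

# Hypothetical sketch; `backend` and `handle` are assumed to exist (see note above).
hidden_size = 14336   # must match the hidden size of the served BloomBlock (model-dependent)
prefix_length = 0     # number of tokens already stored in the attention cache

# row layout expected by inference_step: [attention_cache_handle, prefix_length]
cache_metadata = torch.tensor([[handle, prefix_length]], dtype=torch.int64)

hidden_states = torch.randn(1, 1, hidden_size)  # [batch_size, seq_len, hid_size]
(hidden_states,) = backend.inference_step(cache_metadata, hidden_states)

Note that the inference pool is registered with max_batch_size=1, so such requests are never merged into larger batches: each call operates on its own cache handle and sequence prefix.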