backend.py

  1. """Code for serving bloom blocks via hivemind-server"""
  2. from queue import Empty
  3. from typing import Optional, Sequence, Tuple
  4. import torch
  5. from hivemind import use_hivemind_log_handler
  6. from hivemind.moe.server.module_backend import ModuleBackend
  7. from hivemind.moe.server.task_pool import TaskPool
  8. from hivemind.utils import InvalidStateError, get_logger
  9. from src.bloom.from_pretrained import BloomBlock
  10. from src.server.cache import MemoryCache
  11. use_hivemind_log_handler("in_root_logger")
  12. logger = get_logger(__file__)
  13. MAX_LENGTH = 2048


class InferenceTaskPool(TaskPool):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        assert self.min_batch_size == 1, "min_batch_size in InferenceTaskPool cannot be greater than 1"
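
    # Note: inference tasks are yielded one at a time below. Each request is bound to its own
    # attention cache handle and prefix length (see TransformerBackend.inference_step), so
    # concurrent requests cannot simply be concatenated into one batch the way forward/backward
    # tasks can.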
    def iterate_minibatches(self, *args, **kwargs):
        """Form minibatches; currently, each minibatch contains a single inference task"""
        while True:
            try:
                logger.debug(f"{self.name} getting next task")
                task = self.tasks.get(timeout=self.timeout)
            except Empty:
                logger.warning(f"Timeout reached but batch doesn't contain >={self.min_batch_size} elements yet")
                continue

            try:
                if task.future.set_running_or_notify_cancel():
                    yield [task]
            except InvalidStateError as e:
                logger.debug(f"Failed to add task to batch: {task.future} raised {e}")


class TransformerBackend(ModuleBackend):
    """A wrapper for BloomBlock that can process requests for bloom layer forward, forward_incremental, and backward"""

    def __init__(self, *args, memory_cache: MemoryCache, backend_dtype: Optional[torch.dtype] = None, **kwargs):
        super().__init__(*args, **kwargs)
        assert isinstance(self.module, BloomBlock)
        self.memory_cache = memory_cache
        for name, param in self.module.named_parameters():
            assert not param.requires_grad, f"Bloom layer parameters must not accumulate gradients, but {name} does"
        for name, buf in self.module.named_buffers():
            assert not buf.requires_grad, f"Bloom layer buffers must not require gradients, but {name} does"

        self.inference_pool = InferenceTaskPool(
            self.inference_step, max_batch_size=self.forward_pool.max_batch_size, name=f"{self.name}_inference"
        )
        self.dtype = backend_dtype if backend_dtype else self.module.input_layernorm.weight.dtype

    def inference_step(self, cache_metadata: torch.IntTensor, *inputs: torch.Tensor) -> Tuple[torch.Tensor, ...]:
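        # cache_metadata row 0 is interpreted as [attention_cache_handle, prefix_length]:
        # the handle addresses this request's persistent buffer in MemoryCache, and
        # prefix_length is the number of tokens already written there by previous steps.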
        with torch.inference_mode():
            attention_cache_handle = int(cache_metadata[0, 0].item())
            prefix_length = int(cache_metadata[0, 1].item())
            hidden_states = inputs[0]  # todo: in future, it would be best to support attention mask here
            assert (
                hidden_states.ndim == 3
            ), "expected hidden states to be 3-dimensional: [batch_size, seq_len, hid_size]"
            with self.memory_cache.use_cache(attention_cache_handle) as cache:
                # the cache holds past keys at index 0 and past values at index 1;
                # positions [:prefix_length] along the sequence axis were filled by prior steps
                assert isinstance(self.module, BloomBlock) and cache.shape[0] == 2 and cache.ndim == 5
                layer_past = past_k, past_v = cache[0, :, :prefix_length], cache[1, :, :prefix_length]
                logger.debug(f"Metadata: {cache_metadata}, past_k: {past_k.shape}, past_v: {past_v.shape}")
                hidden_states, (new_k, new_v) = self.module.forward(
                    hidden_states, layer_past=layer_past, use_cache=True
                )
                # todo remove these asserts once we pass all tests
                new_length = new_v.shape[1]
                assert new_length > prefix_length
                assert new_k.shape[0] == past_k.shape[0] and new_v.shape[0] == past_v.shape[0]
                assert new_k.shape[1] == new_length and new_v.shape[1] == new_length
                assert new_k.shape[2:] == past_k.shape[2:] and new_v.shape[2:] == past_v.shape[2:]
                cache[0, :, prefix_length:new_length, :] = new_k[:, prefix_length:new_length]
                cache[1, :, prefix_length:new_length, :] = new_v[:, prefix_length:new_length]
                return (hidden_states,)

    def get_pools(self) -> Sequence[TaskPool]:
        return self.forward_pool, self.backward_pool, self.inference_pool
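
# A minimal construction sketch (hypothetical, for illustration only: the exact schema,
# compression, and MemoryCache arguments are defined in hivemind and src.server.cache and
# may differ from what is shown here; "bloom.0", block, hidden_size, and memory_cache are
# assumed to come from the surrounding server setup code):
#
#     from hivemind import BatchTensorDescriptor
#
#     backend = TransformerBackend(
#         "bloom.0",                     # module uid served by this backend
#         block,                         # a BloomBlock with requires_grad disabled
#         memory_cache=memory_cache,     # MemoryCache shared across this server's backends
#         args_schema=(BatchTensorDescriptor(1, MAX_LENGTH, hidden_size),),
#         outputs_schema=(BatchTensorDescriptor(1, MAX_LENGTH, hidden_size),),
#         max_batch_size=16,
#     )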