test_chained_calls.py 4.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. ######
  2. # Warning: this test is a work in progress. It will be modified soon.
  3. # - if you want more stable tests, see test_block_exact_match
  4. # - if you want to figure out chained inference, ask yozh
  5. import os
  6. import hivemind
  7. import torch
  8. import transformers
  9. from hivemind.moe.expert_uid import UID_DELIMITER, ExpertInfo
  10. from src.bloom.from_pretrained import load_pretrained_block
  11. from src.client.remote_block import RemoteTransformerBlock
  12. from src.dht_utils import get_remote_module
  13. INITIAL_PEERS = os.environ.get("INITIAL_PEERS")
  14. if not INITIAL_PEERS:
  15. raise RuntimeError("Must specify INITIAL_PEERS environment variable with one or more peer ids")
  16. INITIAL_PEERS = INITIAL_PEERS.split()
  17. MODEL_NAME = os.environ.get("MODEL_NAME")
  18. if not MODEL_NAME:
  19. raise RuntimeError("Must specify MODEL_NAME as a name of a model to be tested")
  20. def test_forward_backward_exact_match(atol_forward=1e-4, atol_backward=1e-4, seq_length=1):
  21. dht = hivemind.DHT(initial_peers=INITIAL_PEERS, client_mode=True, start=True)
  22. config = transformers.AutoConfig.from_pretrained(MODEL_NAME)
  23. remote_block = get_remote_module(dht, f"{MODEL_NAME}{UID_DELIMITER}0")
  24. assert remote_block is not None, f"Could not find {MODEL_NAME}{UID_DELIMITER}0 in DHT"
  25. assert isinstance(remote_block, RemoteTransformerBlock)
  26. _ = remote_block.info # lazy-init info now, because otherwise we will _break_ info init by chaning _info
  27. remote_block._info = ExpertInfo(f"{MODEL_NAME}.3 {MODEL_NAME}.4 {MODEL_NAME}.5", remote_block._info.peer_id)
  28. ref_blocks = [
  29. load_pretrained_block(MODEL_NAME, 3, torch_dtype=torch.float32),
  30. load_pretrained_block(MODEL_NAME, 4, torch_dtype=torch.float32),
  31. load_pretrained_block(MODEL_NAME, 5, torch_dtype=torch.float32),
  32. ]
  33. inputs = torch.randn(1, seq_length, config.hidden_size, requires_grad=True)
  34. outputs_rpc = remote_block.forward(inputs)[0]
  35. outputs_rpc.sum().backward()
  36. grads_rpc = inputs.grad
  37. inputs.grad = None
  38. hidden_states = inputs
  39. for ref_block in ref_blocks:
  40. hidden_states = ref_block.forward(hidden_states)[0]
  41. outputs_ref = hidden_states
  42. outputs_ref.sum().backward()
  43. grads_ref = inputs.grad
  44. assert torch.allclose(outputs_ref, outputs_rpc, rtol=0, atol=atol_forward)
  45. assert torch.allclose(grads_ref, grads_rpc, rtol=0, atol=atol_backward)
  46. def test_chained_inference_exact_match(atol_inference=1e-4):
  47. dht = hivemind.DHT(initial_peers=INITIAL_PEERS, client_mode=True, start=True)
  48. config = transformers.AutoConfig.from_pretrained(MODEL_NAME)
  49. remote_block = get_remote_module(dht, f"{MODEL_NAME}{UID_DELIMITER}0")
  50. assert remote_block is not None, f"Could not find {MODEL_NAME}{UID_DELIMITER}0 in DHT"
  51. assert isinstance(remote_block, RemoteTransformerBlock)
  52. _ = remote_block.info # lazy-init info now, because otherwise we will _break_ info init by chaning _info
  53. remote_block._info = ExpertInfo(f"{MODEL_NAME}.3 {MODEL_NAME}.4", remote_block._info.peer_id)
  54. inputs = torch.randn(1, 8, config.hidden_size)
  55. outputs_inference = []
  56. with remote_block.inference_session() as sess:
  57. for i in range(inputs.shape[1]):
  58. outputs_inference.append(sess.step(inputs[:, i : i + 1, :]))
  59. outputs_inference = torch.cat(outputs_inference, dim=1)
  60. ref_blocks = [
  61. load_pretrained_block(MODEL_NAME, 3, torch_dtype=torch.float32),
  62. load_pretrained_block(MODEL_NAME, 4, torch_dtype=torch.float32),
  63. ]
  64. outputs_ref = []
  65. caches = [None, None]
  66. for i in range(inputs.shape[1]):
  67. new_caches = []
  68. hidden_states = inputs[:, i : i + 1, :]
  69. for ref_block, cache in zip(ref_blocks, caches):
  70. with torch.no_grad():
  71. hidden_states, new_cache = ref_block.forward(hidden_states, use_cache=True, layer_past=cache)
  72. new_caches.append(new_cache)
  73. outputs_ref.append(hidden_states)
  74. caches = new_caches
  75. outputs_ref = torch.cat(outputs_ref, dim=1)
  76. assert torch.allclose(outputs_ref, outputs_inference, rtol=0, atol=atol_inference)