123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- import random
- from typing import Union
- import hivemind
- import pytest
- import torch
- from transformers.models.bloom.configuration_bloom import BloomConfig
- from petals.bloom.block import WrappedBloomBlock
- from petals.bloom.from_pretrained import DTYPE_MAP, _load_state_dict, load_pretrained_block
- from petals.client import DistributedBloomConfig
- from petals.client.remote_sequential import RemoteTransformerBlock
- from petals.data_structures import UID_DELIMITER
- from petals.dht_utils import get_remote_module
- from test_utils import *
- @pytest.mark.forked
- def test_remote_block_exact_match(atol_forward=1e-4, atol_inference=1e-3):
- dht = hivemind.DHT(initial_peers=INITIAL_PEERS, client_mode=True, start=True)
- config = DistributedBloomConfig.from_pretrained(MODEL_NAME)
- for block_index in random.sample(range(config.n_layer), 3):
- remote_block = get_remote_module(dht, f"{MODEL_NAME}{UID_DELIMITER}{block_index}", config)
- assert isinstance(remote_block, RemoteTransformerBlock)
- inputs = torch.randn(1, 8, config.hidden_size)
- outputs_forward = remote_block(inputs)
- outputs_inference = []
- with remote_block.inference_session(max_length=inputs.shape[1]) as sess:
- for i in range(inputs.shape[1]):
- outputs_inference.append(sess.step(inputs[:, i : i + 1, :]))
- # test that max length is respected
- with pytest.raises(ValueError, match=r"Maximum length exceeded") as exc_info:
- sess.step(inputs[:, -1:, :])
- assert "Maximum length exceeded" in repr(exc_info.value)
- outputs_inference = torch.cat(outputs_inference, dim=1)
- ref_block = load_pretrained_block(MODEL_NAME, block_index, torch_dtype=torch.float32)
- (outputs_local,) = ref_block(inputs)
- assert torch.allclose(outputs_local, outputs_forward, rtol=0, atol=atol_forward)
- assert torch.allclose(outputs_local, outputs_inference, rtol=0, atol=atol_inference)
- def _old_load_pretrained_block(
- converted_model_name_or_path: str,
- block_index: int,
- torch_dtype: Union[torch.dtype, str] = "auto",
- ) -> WrappedBloomBlock:
- """Load the BLOOM block by directly initializing the weights.
- This test is used to check consistency with the previous implementation and can be removed in the future."""
- config = BloomConfig.from_pretrained(converted_model_name_or_path)
- block = WrappedBloomBlock(config)
- state_dict = _load_state_dict(
- converted_model_name_or_path,
- block_index,
- config,
- cache_dir=None,
- )
- if torch_dtype == "auto":
- with torch.no_grad():
- for name, param in block.named_parameters():
- assert name in state_dict, f"{name} not in state dict"
- param.data = param.data.to(state_dict[name].dtype)
- else:
- assert torch_dtype in DTYPE_MAP.values(), f"torch_dtype must be one of {list(DTYPE_MAP.values())}"
- block = block.to(dtype=torch_dtype)
- block.load_state_dict(state_dict, strict=True)
- return block
- @pytest.mark.forked
- def test_init_pretrained_block(torch_dtype=torch.float32, atol_forward=1e-8):
- config = DistributedBloomConfig.from_pretrained(MODEL_NAME)
- torch.random.manual_seed(0)
- inputs = torch.randn(1, 16, config.hidden_size, dtype=torch_dtype)
- block = load_pretrained_block(MODEL_NAME, 3, torch_dtype=torch_dtype)
- ref_block = _old_load_pretrained_block(MODEL_NAME, 3, torch_dtype=torch_dtype)
- outputs = block.forward(inputs)[0]
- outputs_ref = ref_block.forward(inputs)[0]
- assert torch.allclose(outputs, outputs_ref, rtol=0, atol=atol_forward)
|