test_sequence_manager.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import threading
  2. import time
  3. import pytest
  4. import torch
  5. from hivemind import DHT, get_logger
  6. from petals.client import RemoteSequenceManager, RemoteSequential
  7. from petals.client.remote_model import DistributedBloomConfig
  8. from petals.data_structures import UID_DELIMITER
  9. from test_utils import *
  10. logger = get_logger(__name__)
  11. @pytest.mark.forked
  12. @pytest.mark.parametrize("mode", ["fastest", "random"])
  13. def test_sequence_manager_basics(mode: str):
  14. config = DistributedBloomConfig.from_pretrained(MODEL_NAME, initial_peers=INITIAL_PEERS)
  15. dht = DHT(initial_peers=config.initial_peers, client_mode=True, start=True)
  16. sequential = RemoteSequential(config, dht)
  17. shutdown_evt = threading.Event()
  18. # test RemoteSequential with lossy compression
  19. block_uids = [f"{config.dht_prefix}{UID_DELIMITER}{i}" for i in range(config.n_layer)]
  20. sequential = RemoteSequential(
  21. config,
  22. dht,
  23. sequence_manager=TestSequenceManager(dht, block_uids, sequential.p2p, _was_shut_down=shutdown_evt, start=True),
  24. )
  25. sequence = sequential.sequence_manager.make_sequence(mode=mode)
  26. assert all(sequence[i].peer_id != sequence[i + 1].peer_id for i in range(len(sequence) - 1))
  27. assert sequential.sequence_manager.is_alive()
  28. assert sequential.sequence_manager._thread.ready.is_set()
  29. assert not shutdown_evt.is_set()
  30. sequential(torch.randn(1, 2, config.hidden_size))
  31. sequential.sequence_manager.shutdown()
  32. del sequential
  33. time.sleep(1)
  34. assert shutdown_evt.is_set()
  35. class TestSequenceManager(RemoteSequenceManager):
  36. """A sequence manager that signals if it was shut down"""
  37. def __init__(self, *args, _was_shut_down: threading.Event, **kwargs):
  38. super().__init__(*args, **kwargs)
  39. self._was_shut_down = _was_shut_down
  40. def shutdown(self):
  41. super().shutdown()
  42. assert not self.is_alive()
  43. self._was_shut_down.set()