# test_cache.py

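"""Tests for petals.server.memory_cache.MemoryCache: cross-process cache
allocation, handle-based tensor access via use_cache, blocking until memory
frees up, and current_size_bytes accounting."""
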
import asyncio
import multiprocessing as mp
import random
from typing import Optional

import pytest
import pytest_asyncio  # make sure the module exists; otherwise the test will be skipped
import torch
from hivemind import TensorDescriptor

from petals.server.memory_cache import AllocationFailed, MemoryCache
from petals.utils.misc import get_size_in_bytes


def _make_tensor_descriptor(num_bytes: int, dtype: Optional[torch.dtype] = None):
    if dtype is None:
        dtype = random.choice((torch.int64, torch.int8, torch.uint8, torch.float32, torch.bfloat16, torch.bool))
    elem_size_bytes = get_size_in_bytes(dtype)
    descr = TensorDescriptor.from_tensor(torch.empty((num_bytes // elem_size_bytes,), dtype=dtype))
    return descr
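

# For example, _make_tensor_descriptor(1536, torch.bfloat16) describes a tensor of
# 1536 // 2 = 768 bfloat16 elements (2 bytes each); with a randomly chosen dtype,
# the element count is rounded down so the result never exceeds num_bytes.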


@pytest.mark.asyncio
async def test_cache_usage():
    cache = MemoryCache(max_size_bytes=2048)
    alloc_event, dealloc_e_event, dealloc_bcd_event, dealloc_a_event = mp.Event(), mp.Event(), mp.Event(), mp.Event()
    pipe_receiver, pipe_sender = mp.Pipe(duplex=False)
    with pytest.raises(AssertionError):
        async with cache.allocate_cache(_make_tensor_descriptor(123)):
            pass  # fails because cache must be allocated from another process

    descr_a = TensorDescriptor.from_tensor(torch.empty(768, dtype=torch.uint8))  # 768 bytes
    descr_b = TensorDescriptor.from_tensor(torch.empty((), dtype=torch.float64))  # 8 bytes
    descr_c = TensorDescriptor.from_tensor(torch.empty((33,), dtype=torch.bool))  # 33 bytes
    descr_d = TensorDescriptor.from_tensor(torch.empty((0,), dtype=torch.int64))  # 0 bytes
    descr_e = TensorDescriptor.from_tensor(torch.empty((96, 8), dtype=torch.bfloat16))  # 1536 bytes
    descr_f = TensorDescriptor.from_tensor(torch.empty((1792,), dtype=torch.uint8))  # 1792 bytes
    descr_g = TensorDescriptor.from_tensor(torch.empty((1793,), dtype=torch.uint8))  # 1793 bytes
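
    # Budget sketch for the 2048-byte cache: a + b + c + d = 768 + 8 + 33 + 0 = 809 bytes
    # fit together, but e (1536 bytes) cannot coexist even with a alone (768 + 1536 > 2048),
    # so e's allocation must wait until everything else is freed.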

    async def _allocate_and_wait(dealloc_event, *descrs, timeout=None):
        loop = asyncio.get_event_loop()
        async with cache.allocate_cache(*descrs, timeout=timeout) as handles:
            pipe_sender.send(handles)
            await loop.run_in_executor(None, dealloc_event.wait)
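
    # Only the handles are sent through the pipe; the parent process later resolves them
    # to tensors with cache.use_cache. dealloc_event.wait runs in an executor so that the
    # blocking mp.Event does not stall the event loop while the allocation is held open.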

    async def _allocate_af():
        alloc_event.wait()
        print("BEGAN AF")
        try:
            async with cache.allocate_cache(descr_g):
                # enqueue an allocation that cannot fit while descr_g is held, clogging the cache queue
                allocate_f_task = asyncio.create_task(_allocate_and_wait(mp.Event(), descr_f))
                print("CANCELLED")
                raise asyncio.CancelledError()
        except asyncio.CancelledError:
            pass
        allocate_f_task.cancel()  # unclog the cache

        allocate_a_task = asyncio.create_task(_allocate_and_wait(dealloc_a_event, descr_a))
        await allocate_a_task

    alloc_process1 = mp.Process(target=lambda: asyncio.run(_allocate_af()), daemon=True)
    alloc_process1.start()

    async def _allocate_bcde():
        await asyncio.sleep(0.2)  # ensure that process 1's tensor is always allocated (and sent through the pipe) first
        print("BEGAN BCDE")
        allocate_bcd_task = asyncio.create_task(_allocate_and_wait(dealloc_bcd_event, descr_b, descr_c, descr_d))
        allocate_e_task = asyncio.create_task(_allocate_and_wait(dealloc_e_event, descr_e))  # doesn't fit until the others are freed
        await asyncio.wait({allocate_e_task, allocate_bcd_task}, return_when=asyncio.ALL_COMPLETED)

    alloc_process2 = mp.Process(target=lambda: asyncio.run(_allocate_bcde()), daemon=True)
    alloc_process2.start()
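
    # From here on, the main process drives both workers and checks cache accounting at each stage.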
    assert cache.current_size_bytes == 0
    alloc_event.set()
    (handle_a,) = pipe_receiver.recv()
    handle_b, handle_c, handle_d = pipe_receiver.recv()

    with cache.use_cache(handle_a) as (tensor_a,):
        assert tensor_a.dtype == torch.uint8
        tensor_a[2:5] = torch.tensor((42, 43, 44))

    with cache.use_cache(handle_a, handle_b, handle_d) as (tensor_a, tensor_b, tensor_d):
        assert tensor_b.dtype == torch.float64 and tensor_b.numel() == 1 and tensor_b.ndim == 0
        assert tensor_d.dtype == torch.int64 and tensor_d.numel() == 0
        tensor_a += 1
        tensor_b[...] = -1.337

    assert cache.current_size_bytes == 809  # this checks a, b, c, d are allocated, but e still awaits memory
    dealloc_bcd_event.set()
    await asyncio.sleep(0.1)
    assert cache.current_size_bytes == 768  # only tensor a is allocated

    with pytest.raises(KeyError):
        with cache.use_cache(handle_a, handle_b):
            pass  # one of the handles (b) is already deallocated
    with pytest.raises(KeyError):
        with cache.use_cache(handle_d):
            pass  # handle_d is deallocated correctly, even though the tensor was never written to

    with cache.use_cache(handle_a) as (tensor_a,):
        assert tuple(tensor_a[2:5]) == (43, 44, 45)  # the earlier in-place += 1 is visible across use_cache calls

    dealloc_a_event.set()
    (handle_e,) = pipe_receiver.recv()  # with a freed, e can finally be allocated
    assert cache.current_size_bytes == 1536  # only tensor e (96 * 8 elements * 2 bytes) is allocated now

    with pytest.raises(KeyError):
        with cache.use_cache(handle_a):
            pass  # tensor a is no longer allocated

    with cache.use_cache(handle_e) as (tensor_e,):
        assert tensor_e.dtype == torch.bfloat16 and tensor_e.shape == (96, 8)

    dealloc_e_event.set()
    alloc_process1.join(1)
    alloc_process2.join(1)
    assert cache.current_size_bytes == 0
    assert alloc_process1.exitcode == 0, "allocation process 1 failed or did not finish, see stderr for details"
    assert alloc_process2.exitcode == 0, "allocation process 2 failed or did not finish, see stderr for details"

    # Commented-out sketch of a nested-allocation timeout scenario:
    # cache.runtime_pid += 1  # pretend we're another process
    # async with cache.allocate_cache(_make_tensor_descriptor(768)) as a:
    #     pass
    #
    # async with cache.allocate_cache(_make_tensor_descriptor(768)):
    #     async with cache.allocate_cache(_make_tensor_descriptor(1024)):
    #         async with cache.allocate_cache(_make_tensor_descriptor(512), _make_tensor_descriptor(64)):
    #             async with cache.allocate_cache(_make_tensor_descriptor(1536)):
    #                 with pytest.raises(TimeoutError):
    #                     async with cache.allocate_cache(_make_tensor_descriptor(256)):
    #                         pass
    #                 async with cache.allocate_cache(_make_tensor_descriptor(192)):
    #                     pass
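
    # A runnable version of the sketch above would need explicit timeouts. Assuming that
    # allocate_cache raises AllocationFailed once its timeout expires (it accepts a timeout
    # kwarg, see _allocate_and_wait above), a minimal check might look like:
    #
    #     with pytest.raises(AllocationFailed):
    #         async with cache.allocate_cache(_make_tensor_descriptor(4096), timeout=0.1):
    #             pass  # 4096 bytes can never fit in the 2048-byte cache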