test_dht.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import asyncio
  2. import random
  3. import time
  4. import pytest
  5. from multiaddr import Multiaddr
  6. import hivemind
  @pytest.mark.forked
  def test_get_store(n_peers=10):
      """Check that values stored through one DHT peer are readable through another.

      Starts ``n_peers`` DHT instances; every peer after the first is given the
      first peer's visible multiaddrs as ``initial_peers``. Two randomly chosen
      peers are then used to exercise plain store/get, lookups of missing keys,
      future-returning gets (including cancellation), overwriting a key, and
      storing several sub-keys under one key.

      :param n_peers: total number of DHT instances to start (must be >= 2,
          because two distinct peers are sampled).
      """
      peers = [hivemind.DHT(start=True)]
      try:
          initial_peers = peers[0].get_visible_maddrs()
          # Append one at a time so that peers created before a failure are
          # still visible to the cleanup in ``finally``.
          for _ in range(n_peers - 1):
              peers.append(hivemind.DHT(initial_peers=initial_peers, start=True))

          node1, node2 = random.sample(peers, 2)

          # Basic round trip: store on one node, read from both.
          assert node1.store('key1', 'value1', expiration_time=hivemind.get_dht_time() + 30)
          assert node1.get('key1').value == 'value1'
          assert node2.get('key1').value == 'value1'
          assert node2.get('key2') is None

          # Future-returning variant: a missing key resolves to None ...
          future = node1.get('foo', return_future=True)
          assert future.result() is None

          # ... and a pending request can be cancelled before the later operations run.
          future = node1.get('foo', return_future=True)
          future.cancel()

          # Overwrite 'key1' using a later expiration time (+31 vs. the original +30).
          assert node2.store('key1', 123, expiration_time=hivemind.get_dht_time() + 31)
          assert node2.store('key2', 456, expiration_time=hivemind.get_dht_time() + 32)
          assert node1.get('key1', latest=True).value == 123
          assert node1.get('key2').value == 456

          # Storing with ``subkey`` makes the value under 'key2' a dictionary
          # that holds one entry per sub-key.
          assert node1.store('key2', subkey='subkey1', value=789, expiration_time=hivemind.get_dht_time() + 32)
          assert node2.store('key2', subkey='subkey2', value='pew', expiration_time=hivemind.get_dht_time() + 32)
          found_dict = node1.get('key2', latest=True).value
          assert isinstance(found_dict, dict) and len(found_dict) == 2
          assert found_dict['subkey1'].value == 789 and found_dict['subkey2'].value == 'pew'
      finally:
          # Always stop every started peer, even when an assertion above fails.
          for peer in peers:
              peer.shutdown()
  async def dummy_dht_coro(self, node):
      """Trivial coroutine that ignores both arguments and returns ``'pew'``."""
      return 'pew'
  async def dummy_dht_coro_error(self, node):
      """Coroutine that always fails.

      :raises ValueError: unconditionally, so callers can check error propagation.
      """
      raise ValueError("Oops, i did it again...")
  async def dummy_dht_coro_stateful(self, node):
      """Increment a counter stored on ``self`` and return its new value.

      The counter lives in ``self._x_dummy``; when the attribute does not exist
      yet it is treated as 123, so the first call returns 124.
      """
      self._x_dummy = getattr(self, '_x_dummy', 123) + 1
      return self._x_dummy
  async def dummy_dht_coro_long(self, node):
      """Sleep for 0.25 seconds, then return the square of ``self._x_dummy``.

      The attribute is read *after* the sleep, so the result reflects whatever
      value ``self._x_dummy`` holds at that moment. ``self._x_dummy`` must exist
      by then; otherwise the attribute access raises ``AttributeError``.
      """
      await asyncio.sleep(0.25)
      return self._x_dummy ** 2
  async def dummy_dht_coro_for_cancel(self, node):
      """Set ``self._x_dummy`` to -100, sleep 0.5 seconds, then set it to 999.

      If the coroutine is cancelled while sleeping, the second assignment never
      runs and ``self._x_dummy`` stays at -100, which lets a caller detect that
      cancellation actually interrupted execution.
      """
      self._x_dummy = -100
      await asyncio.sleep(0.5)
      self._x_dummy = 999
  @pytest.mark.forked
  def test_run_coroutine():
      """Check ``DHT.run_coroutine``: results, errors, shared state and cancellation.

      Uses the ``dummy_dht_coro*`` helpers defined in this module.
      """
      dht = hivemind.DHT(start=True)
      try:
          # The return value of the coroutine is handed back to the caller.
          assert dht.run_coroutine(dummy_dht_coro) == 'pew'

          # An exception raised inside the coroutine is re-raised for the caller.
          with pytest.raises(ValueError):
              dht.run_coroutine(dummy_dht_coro_error)

          # Start a slow coroutine in the background; it reads ``_x_dummy`` only
          # after 0.25 s, i.e. after the three stateful calls below have run.
          bg_task = dht.run_coroutine(dummy_dht_coro_long, return_future=True)
          assert dht.run_coroutine(dummy_dht_coro_stateful) == 124
          assert dht.run_coroutine(dummy_dht_coro_stateful) == 125
          assert dht.run_coroutine(dummy_dht_coro_stateful) == 126
          # The counter is kept on the object passed as ``self`` to the
          # coroutines, which is not this ``dht`` handle.
          assert not hasattr(dht, '_x_dummy')
          assert bg_task.result() == 126 ** 2

          # Cancel halfway through the helper's 0.5 s sleep: ``_x_dummy`` must
          # remain at -100 (never reaching 999), so the next increment is -99.
          future = dht.run_coroutine(dummy_dht_coro_for_cancel, return_future=True)
          time.sleep(0.25)
          future.cancel()
          assert dht.run_coroutine(dummy_dht_coro_stateful) == -99
      finally:
          # Stop the DHT even when one of the assertions above fails.
          dht.shutdown()
  @pytest.mark.forked
  @pytest.mark.asyncio
  async def test_dht_get_visible_maddrs():
      """Check which multiaddrs ``DHT.get_visible_maddrs`` reports in two setups."""
      # test 1: IPv4 localhost multiaddr is visible by default
      dht = hivemind.DHT(start=True)
      try:
          assert any(str(maddr).startswith('/ip4/127.0.0.1') for maddr in dht.get_visible_maddrs())
      finally:
          # Stop the instance even if the assertion fails.
          dht.shutdown()

      # test 2: announce_maddrs are the single visible multiaddrs if defined
      dummy_endpoint = Multiaddr('/ip4/123.45.67.89/tcp/31337')
      p2p = await hivemind.p2p.P2P.create(announce_maddrs=[dummy_endpoint])
      dht = hivemind.DHT(p2p, start=True)
      try:
          # The announced address is expected with the peer's own /p2p/<id> suffix appended.
          assert dht.get_visible_maddrs() == [dummy_endpoint.encapsulate(f'/p2p/{p2p.id}')]
      finally:
          dht.shutdown()