test_dht.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import asyncio
  2. import concurrent.futures
  3. import random
  4. import time
  5. import pytest
  6. from multiaddr import Multiaddr
  7. import hivemind
  8. from hivemind.utils.networking import get_free_port
  9. from test_utils.dht_swarms import launch_dht_instances
  10. @pytest.mark.asyncio
  11. async def test_startup_error():
  12. with pytest.raises(hivemind.p2p.P2PDaemonError, match=r"(?i)Failed to connect to bootstrap peers"):
  13. hivemind.DHT(
  14. initial_peers=[f"/ip4/127.0.0.1/tcp/{get_free_port()}/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"],
  15. start=True,
  16. )
  17. dht = hivemind.DHT(start=True, await_ready=False)
  18. with pytest.raises(concurrent.futures.TimeoutError):
  19. dht.wait_until_ready(timeout=0.01)
  20. dht.shutdown()
  21. @pytest.mark.forked
  22. def test_get_store(n_peers=10):
  23. peers = launch_dht_instances(n_peers)
  24. node1, node2 = random.sample(peers, 2)
  25. assert node1.store("key1", "value1", expiration_time=hivemind.get_dht_time() + 30)
  26. assert node1.get("key1").value == "value1"
  27. assert node2.get("key1").value == "value1"
  28. assert node2.get("key2") is None
  29. future = node1.get("foo", return_future=True)
  30. assert future.result() is None
  31. future = node1.get("foo", return_future=True)
  32. future.cancel()
  33. assert node2.store("key1", 123, expiration_time=hivemind.get_dht_time() + 31)
  34. assert node2.store("key2", 456, expiration_time=hivemind.get_dht_time() + 32)
  35. assert node1.get("key1", latest=True).value == 123
  36. assert node1.get("key2").value == 456
  37. assert node1.store("key2", subkey="subkey1", value=789, expiration_time=hivemind.get_dht_time() + 32)
  38. assert node2.store("key2", subkey="subkey2", value="pew", expiration_time=hivemind.get_dht_time() + 32)
  39. found_dict = node1.get("key2", latest=True).value
  40. assert isinstance(found_dict, dict) and len(found_dict) == 2
  41. assert found_dict["subkey1"].value == 789 and found_dict["subkey2"].value == "pew"
  42. for peer in peers:
  43. peer.shutdown()
  44. async def dummy_dht_coro(self, node):
  45. return "pew"
  46. async def dummy_dht_coro_error(self, node):
  47. raise ValueError("Oops, i did it again...")
  48. async def dummy_dht_coro_stateful(self, node):
  49. self._x_dummy = getattr(self, "_x_dummy", 123) + 1
  50. return self._x_dummy
  51. async def dummy_dht_coro_long(self, node):
  52. await asyncio.sleep(0.25)
  53. return self._x_dummy ** 2
  54. async def dummy_dht_coro_for_cancel(self, node):
  55. self._x_dummy = -100
  56. await asyncio.sleep(0.5)
  57. self._x_dummy = 999
  58. @pytest.mark.forked
  59. def test_run_coroutine():
  60. dht = hivemind.DHT(start=True)
  61. assert dht.run_coroutine(dummy_dht_coro) == "pew"
  62. with pytest.raises(ValueError):
  63. res = dht.run_coroutine(dummy_dht_coro_error)
  64. bg_task = dht.run_coroutine(dummy_dht_coro_long, return_future=True)
  65. assert dht.run_coroutine(dummy_dht_coro_stateful) == 124
  66. assert dht.run_coroutine(dummy_dht_coro_stateful) == 125
  67. assert dht.run_coroutine(dummy_dht_coro_stateful) == 126
  68. assert not hasattr(dht, "_x_dummy")
  69. assert bg_task.result() == 126 ** 2
  70. future = dht.run_coroutine(dummy_dht_coro_for_cancel, return_future=True)
  71. time.sleep(0.25)
  72. future.cancel()
  73. assert dht.run_coroutine(dummy_dht_coro_stateful) == -99
  74. dht.shutdown()
  75. @pytest.mark.forked
  76. @pytest.mark.asyncio
  77. async def test_dht_get_visible_maddrs():
  78. # test 1: IPv4 localhost multiaddr is visible by default
  79. dht = hivemind.DHT(start=True)
  80. assert any(str(maddr).startswith("/ip4/127.0.0.1") for maddr in dht.get_visible_maddrs())
  81. dht.shutdown()
  82. # test 2: announce_maddrs are the single visible multiaddrs if defined
  83. dummy_endpoint = Multiaddr("/ip4/123.45.67.89/tcp/31337")
  84. p2p = await hivemind.p2p.P2P.create(announce_maddrs=[dummy_endpoint])
  85. dht = hivemind.DHT(start=True, p2p=p2p)
  86. assert dht.get_visible_maddrs() == [dummy_endpoint.encapsulate(f"/p2p/{p2p.peer_id}")]
  87. dht.shutdown()