test_dht.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import asyncio
  2. import random
  3. import time
  4. import pytest
  5. from multiaddr import Multiaddr
  6. import hivemind
  7. from test_utils.dht_swarms import launch_dht_instances
  8. @pytest.mark.forked
  9. def test_get_store(n_peers=10):
  10. peers = launch_dht_instances(n_peers)
  11. node1, node2 = random.sample(peers, 2)
  12. assert node1.store("key1", "value1", expiration_time=hivemind.get_dht_time() + 30)
  13. assert node1.get("key1").value == "value1"
  14. assert node2.get("key1").value == "value1"
  15. assert node2.get("key2") is None
  16. future = node1.get("foo", return_future=True)
  17. assert future.result() is None
  18. future = node1.get("foo", return_future=True)
  19. future.cancel()
  20. assert node2.store("key1", 123, expiration_time=hivemind.get_dht_time() + 31)
  21. assert node2.store("key2", 456, expiration_time=hivemind.get_dht_time() + 32)
  22. assert node1.get("key1", latest=True).value == 123
  23. assert node1.get("key2").value == 456
  24. assert node1.store("key2", subkey="subkey1", value=789, expiration_time=hivemind.get_dht_time() + 32)
  25. assert node2.store("key2", subkey="subkey2", value="pew", expiration_time=hivemind.get_dht_time() + 32)
  26. found_dict = node1.get("key2", latest=True).value
  27. assert isinstance(found_dict, dict) and len(found_dict) == 2
  28. assert found_dict["subkey1"].value == 789 and found_dict["subkey2"].value == "pew"
  29. for peer in peers:
  30. peer.shutdown()
  31. async def dummy_dht_coro(self, node):
  32. return "pew"
  33. async def dummy_dht_coro_error(self, node):
  34. raise ValueError("Oops, i did it again...")
  35. async def dummy_dht_coro_stateful(self, node):
  36. self._x_dummy = getattr(self, "_x_dummy", 123) + 1
  37. return self._x_dummy
  38. async def dummy_dht_coro_long(self, node):
  39. await asyncio.sleep(0.25)
  40. return self._x_dummy ** 2
  41. async def dummy_dht_coro_for_cancel(self, node):
  42. self._x_dummy = -100
  43. await asyncio.sleep(0.5)
  44. self._x_dummy = 999
  45. @pytest.mark.forked
  46. def test_run_coroutine():
  47. dht = hivemind.DHT(start=True)
  48. assert dht.run_coroutine(dummy_dht_coro) == "pew"
  49. with pytest.raises(ValueError):
  50. res = dht.run_coroutine(dummy_dht_coro_error)
  51. bg_task = dht.run_coroutine(dummy_dht_coro_long, return_future=True)
  52. assert dht.run_coroutine(dummy_dht_coro_stateful) == 124
  53. assert dht.run_coroutine(dummy_dht_coro_stateful) == 125
  54. assert dht.run_coroutine(dummy_dht_coro_stateful) == 126
  55. assert not hasattr(dht, "_x_dummy")
  56. assert bg_task.result() == 126 ** 2
  57. future = dht.run_coroutine(dummy_dht_coro_for_cancel, return_future=True)
  58. time.sleep(0.25)
  59. future.cancel()
  60. assert dht.run_coroutine(dummy_dht_coro_stateful) == -99
  61. dht.shutdown()
  62. @pytest.mark.forked
  63. @pytest.mark.asyncio
  64. async def test_dht_get_visible_maddrs():
  65. # test 1: IPv4 localhost multiaddr is visible by default
  66. dht = hivemind.DHT(start=True)
  67. assert any(str(maddr).startswith("/ip4/127.0.0.1") for maddr in dht.get_visible_maddrs())
  68. dht.shutdown()
  69. # test 2: announce_maddrs are the single visible multiaddrs if defined
  70. dummy_endpoint = Multiaddr("/ip4/123.45.67.89/tcp/31337")
  71. p2p = await hivemind.p2p.P2P.create(announce_maddrs=[dummy_endpoint])
  72. dht = hivemind.DHT(start=True, p2p=await p2p.replicate(p2p.daemon_listen_maddr))
  73. assert dht.get_visible_maddrs() == [dummy_endpoint.encapsulate(f"/p2p/{p2p.peer_id}")]
  74. dht.shutdown()