Bläddra i källkod

Reduce complexity of several DHT tests (#334)

- test averaging previously ran with 3 different part sizes in all 8 scenarios; now it uses the default part size and only changes it in one (the most complex) scenario
- test_dht_node now has reduced number of peers, but also reduced bucket size
- test_dht_node runs more find attempts for stability

Co-authored-by: Alexander Borzunov <borzunov.alexander@gmail.com>
justheuristic 4 år sedan
förälder
incheckning
39afa97e29
3 ändrade filer med 35 tillägg och 23 borttagningar
  1. 11 13
      tests/test_allreduce.py
  2. 19 5
      tests/test_dht_node.py
  3. 5 5
      tests/test_utils/dht_swarms.py

+ 11 - 13
tests/test_allreduce.py

@@ -169,22 +169,20 @@ NODE, CLIENT, AUX = AveragingMode.NODE, AveragingMode.CLIENT, AveragingMode.AUX
 
 
 @pytest.mark.parametrize(
-    "peer_modes, averaging_weights, peer_fractions",
+    "peer_modes, averaging_weights, peer_fractions, part_size_bytes",
     [
-        ((NODE, NODE, NODE, NODE), (1, 1, 1, 1), (1, 1, 1, 1)),
-        ((NODE, NODE, NODE, NODE), (0.1, 0.2, 0.3, 0.4), (1, 1, 1, 1)),
-        ((NODE, NODE, NODE, NODE), (1, 1, 1, 1), (1, 2, 3, 0)),
-        ((NODE, NODE, NODE, CLIENT), (1, 1, 1, 1), (1, 2, 3, 0)),
-        ((NODE, NODE, NODE, AUX), (1, 1, 1, 0), (1, 2, 3, 4)),
-        ((NODE, NODE, NODE, NODE), (0.15, 0.0, 0.35, 0.45), (1, 1, 1, 1)),
-        ((NODE, AUX, NODE, CLIENT), (0.15, 0.0, 0.35, 0.45), (150, 200, 67, 0)),
-        ((AUX, AUX, AUX, AUX), (0.0, 0.0, 0.0, 0.0), (1, 2, 3, 4)),
+        ((NODE, NODE, NODE, NODE), (1, 1, 1, 1), (1, 1, 1, 1), 2 ** 20),
+        ((NODE, NODE, NODE, NODE), (0.1, 0.2, 0.3, 0.4), (1, 1, 1, 1), 2 ** 20),
+        ((NODE, NODE, NODE, NODE), (1, 1, 1, 1), (1, 2, 3, 0), 2 ** 20),
+        ((NODE, NODE, NODE, CLIENT), (1, 1, 1, 1), (1, 2, 3, 0), 2 ** 20),
+        ((NODE, NODE, NODE, AUX), (1, 1, 1, 0), (1, 2, 3, 4), 2 ** 20),
+        ((NODE, NODE, NODE, NODE), (0.15, 0.0, 0.35, 0.45), (1, 1, 1, 1), 2 ** 20),
+        ((NODE, AUX, NODE, CLIENT), (0.15, 0.0, 0.35, 0.45), (150, 200, 67, 0), 2 ** 20),
+        ((NODE, AUX, NODE, CLIENT), (0.15, 0.0, 0.35, 0.45), (150, 200, 67, 0), 256),
+        ((NODE, AUX, NODE, CLIENT), (0.15, 0.0, 0.35, 0.45), (150, 200, 67, 0), 19),
+        ((AUX, AUX, AUX, AUX), (0.0, 0.0, 0.0, 0.0), (1, 2, 3, 4), 2 ** 20),
     ],
 )
-@pytest.mark.parametrize(
-    "part_size_bytes",
-    [2 ** 20, 256, 19],
-)
 @pytest.mark.forked
 @pytest.mark.asyncio
 async def test_allreduce_protocol(peer_modes, averaging_weights, peer_fractions, part_size_bytes):

+ 19 - 5
tests/test_dht_node.py

@@ -194,16 +194,27 @@ def test_empty_table():
 
 
 @pytest.mark.forked
-def test_dht_node():
+def test_dht_node(
+    n_peers: int = 20, n_sequential_peers: int = 5, parallel_rpc: int = 10, bucket_size: int = 5, num_replicas: int = 3
+):
     # step A: create a swarm of n_peers dht nodes in separate processes
     #         (first n_sequential_peers created sequentially, the rest created in parallel)
-    processes, dht, swarm_maddrs = launch_swarm_in_separate_processes(n_peers=50, n_sequential_peers=5)
+
+    processes, dht, swarm_maddrs = launch_swarm_in_separate_processes(
+        n_peers=n_peers, n_sequential_peers=n_sequential_peers, bucket_size=bucket_size, num_replicas=num_replicas
+    )
 
     # step B: run one more node in this process
     loop = asyncio.get_event_loop()
     initial_peers = random.choice(swarm_maddrs)
     me = loop.run_until_complete(
-        DHTNode.create(initial_peers=initial_peers, parallel_rpc=10, cache_refresh_before_expiry=False)
+        DHTNode.create(
+            initial_peers=initial_peers,
+            parallel_rpc=parallel_rpc,
+            bucket_size=bucket_size,
+            num_replicas=num_replicas,
+            cache_refresh_before_expiry=False,
+        )
     )
 
     # test 1: find self
@@ -223,7 +234,7 @@ def test_dht_node():
     jaccard_numerator = jaccard_denominator = 0  # jaccard similarity aka intersection over union
     all_node_ids = list(dht.values())
 
-    for _ in range(10):
+    for _ in range(20):
         query_id = DHTID.generate()
         k_nearest = random.randint(1, 10)
         exclude_self = random.random() > 0.5
@@ -275,7 +286,10 @@ def test_dht_node():
     initial_peers = random.choice(swarm_maddrs)
     that_guy = loop.run_until_complete(
         DHTNode.create(
-            initial_peers=initial_peers, parallel_rpc=10, cache_refresh_before_expiry=False, cache_locally=False
+            initial_peers=initial_peers,
+            parallel_rpc=parallel_rpc,
+            cache_refresh_before_expiry=False,
+            cache_locally=False,
         )
     )
 

+ 5 - 5
tests/test_utils/dht_swarms.py

@@ -11,13 +11,13 @@ from hivemind.dht.node import DHTID, DHTNode
 from hivemind.p2p import PeerID
 
 
-def run_node(initial_peers: List[Multiaddr], info_queue: mp.Queue):
+def run_node(initial_peers: List[Multiaddr], info_queue: mp.Queue, **kwargs):
     if asyncio.get_event_loop().is_running():
         asyncio.get_event_loop().stop()  # if we're in jupyter, get rid of its built-in event loop
         asyncio.set_event_loop(asyncio.new_event_loop())
     loop = asyncio.get_event_loop()
 
-    node = loop.run_until_complete(DHTNode.create(initial_peers=initial_peers, ping_n_attempts=10))
+    node = loop.run_until_complete(DHTNode.create(initial_peers=initial_peers, ping_n_attempts=10, **kwargs))
     maddrs = loop.run_until_complete(node.get_visible_maddrs())
 
     info_queue.put((node.node_id, node.peer_id, maddrs))
@@ -31,7 +31,7 @@ def run_node(initial_peers: List[Multiaddr], info_queue: mp.Queue):
 
 
 def launch_swarm_in_separate_processes(
-    n_peers: int, n_sequential_peers: int
+    n_peers: int, n_sequential_peers: int, **kwargs
 ) -> Tuple[List[mp.Process], Dict[PeerID, DHTID], List[List[Multiaddr]]]:
     assert (
         n_sequential_peers < n_peers
@@ -47,7 +47,7 @@ def launch_swarm_in_separate_processes(
     for _ in range(n_sequential_peers):
         initial_peers = random.choice(swarm_maddrs) if swarm_maddrs else []
 
-        proc = mp.Process(target=run_node, args=(initial_peers, info_queue), daemon=True)
+        proc = mp.Process(target=run_node, args=(initial_peers, info_queue), kwargs=kwargs, daemon=True)
         proc.start()
         processes.append(proc)
 
@@ -72,7 +72,7 @@ def launch_swarm_in_separate_processes(
         with info_lock:
             initial_peers = random.choice(swarm_maddrs)
 
-        proc = mp.Process(target=run_node, args=(initial_peers, info_queue), daemon=True)
+        proc = mp.Process(target=run_node, args=(initial_peers, info_queue), kwargs=kwargs, daemon=True)
         proc.start()
         processes.append(proc)