Prechádzať zdrojové kódy

Fix flaps in test_p2p_daemon_bindings.py (#352)

@deniskamazur has found that tests in master fail with an exception in [4 of 90 runs](https://github.com/learning-at-home/hivemind/actions/runs/1139793757).

3 of 4 fails are related to race conditions with `get_free_port()` leading to the `Address already in use` error in `test_p2p_daemon_bindings.py`.

This PR:

1. Fixes the race conditions by switching to Unix sockets for client-daemon communication (as in the `hivemind.p2p.P2P`) and `/tcp/0` instead of `tcp/{get_free_port()}` for `-hostAddr`.
2. Fixes the code to remove the sockets after closing the server.
3. Adds a note about race condition to the `get_free_port()` docstring to discourage future users.
4. Removes a duplicate of `test_client_identify()` (surprisingly, there were two identical tests).
5. Decreases timeout values for timeout tests to avoid flaps like [this](https://github.com/learning-at-home/hivemind/runs/3358128875).
Alexander Borzunov 4 rokov pred
rodič
commit
cf31951504

+ 6 - 1
hivemind/utils/networking.py

@@ -31,7 +31,12 @@ def strip_port(endpoint: Endpoint) -> Hostname:
 
 
 def get_free_port(params=(socket.AF_INET, socket.SOCK_STREAM), opt=(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)):
-    """Finds a tcp port that can be occupied with a socket with *params and use *opt options"""
+    """
+    Finds a tcp port that can be occupied with a socket with *params and use *opt options.
+
+    :note: Using this function is discouraged since it often leads to a race condition
+           with the "Address is already in use" error if the code is run in parallel.
+    """
     try:
         with closing(socket.socket(*params)) as sock:
             sock.bind(("", 0))

+ 1 - 1
tests/test_dht.py

@@ -22,7 +22,7 @@ async def test_startup_error():
 
     dht = hivemind.DHT(start=True, await_ready=False)
     with pytest.raises(concurrent.futures.TimeoutError):
-        dht.wait_until_ready(timeout=0.1)
+        dht.wait_until_ready(timeout=0.01)
     dht.shutdown()
 
 

+ 1 - 1
tests/test_p2p_daemon.py

@@ -42,7 +42,7 @@ async def test_startup_error_message():
         )
 
     with pytest.raises(P2PDaemonError, match=r"Daemon failed to start in .+ seconds"):
-        await P2P.create(startup_timeout=0.1)  # Test that startup_timeout works
+        await P2P.create(startup_timeout=0.01)  # Test that startup_timeout works
 
 
 @pytest.mark.parametrize(

+ 2 - 7
tests/test_p2p_daemon_bindings.py

@@ -18,7 +18,7 @@ from hivemind.p2p.p2p_daemon_bindings.utils import (
 )
 from hivemind.proto import p2pd_pb2 as p2pd_pb
 
-from test_utils.p2p_daemon import connect_safe, make_p2pd_pair_ip4
+from test_utils.p2p_daemon import connect_safe, make_p2pd_pair_unix
 
 
 def test_raise_if_failed_raises():
@@ -61,7 +61,7 @@ ENABLE_CONTROL = True
 ENABLE_CONNMGR = False
 ENABLE_DHT = False
 ENABLE_PUBSUB = False
-FUNC_MAKE_P2PD_PAIR = make_p2pd_pair_ip4
+FUNC_MAKE_P2PD_PAIR = make_p2pd_pair_unix
 
 
 class MockReader(io.BytesIO):
@@ -376,11 +376,6 @@ async def p2pcs():
         yield tuple(p2pd_tuple.client for p2pd_tuple in p2pd_tuples)
 
 
-@pytest.mark.asyncio
-async def test_client_identify_unix_socket(p2pcs):
-    await p2pcs[0].identify()
-
-
 @pytest.mark.asyncio
 async def test_client_identify(p2pcs):
     await p2pcs[0].identify()

+ 16 - 19
tests/test_utils/p2p_daemon.py

@@ -4,7 +4,7 @@ import os
 import subprocess
 import time
 import uuid
-from contextlib import asynccontextmanager
+from contextlib import asynccontextmanager, suppress
 from typing import NamedTuple
 
 from multiaddr import Multiaddr, protocols
@@ -57,7 +57,7 @@ class Daemon:
 
     def _run(self):
         cmd_list = [P2PD_PATH, f"-listen={str(self.control_maddr)}"]
-        cmd_list += [f"-hostAddrs=/ip4/127.0.0.1/tcp/{get_free_port()}"]
+        cmd_list += ["-hostAddrs=/ip4/127.0.0.1/tcp/0"]
         if self.enable_connmgr:
             cmd_list += ["-connManager=true", "-connLo=1", "-connHi=2", "-connGrace=0"]
         if self.enable_dht:
@@ -107,24 +107,21 @@ async def make_p2pd_pair_unix(enable_control, enable_connmgr, enable_dht, enable
     name = str(uuid.uuid4())[:8]
     control_maddr = Multiaddr(f"/unix/tmp/test_p2pd_control_{name}.sock")
     listen_maddr = Multiaddr(f"/unix/tmp/test_p2pd_listen_{name}.sock")
-    # Remove the existing unix socket files if they are existing
     try:
-        os.unlink(control_maddr.value_for_protocol(protocols.P_UNIX))
-    except FileNotFoundError:
-        pass
-    try:
-        os.unlink(listen_maddr.value_for_protocol(protocols.P_UNIX))
-    except FileNotFoundError:
-        pass
-    async with _make_p2pd_pair(
-        control_maddr=control_maddr,
-        listen_maddr=listen_maddr,
-        enable_control=enable_control,
-        enable_connmgr=enable_connmgr,
-        enable_dht=enable_dht,
-        enable_pubsub=enable_pubsub,
-    ) as pair:
-        yield pair
+        async with _make_p2pd_pair(
+            control_maddr=control_maddr,
+            listen_maddr=listen_maddr,
+            enable_control=enable_control,
+            enable_connmgr=enable_connmgr,
+            enable_dht=enable_dht,
+            enable_pubsub=enable_pubsub,
+        ) as pair:
+            yield pair
+    finally:
+        with suppress(FileNotFoundError):
+            os.unlink(control_maddr.value_for_protocol(protocols.P_UNIX))
+        with suppress(FileNotFoundError):
+            os.unlink(listen_maddr.value_for_protocol(protocols.P_UNIX))
 
 
 @asynccontextmanager