justheuristic 4 years ago
parent
commit
8352f1a45f
2 changed files with 13 additions and 5 deletions
  1. 5 3
      hivemind/p2p/p2p_daemon.py
  2. 8 2
      hivemind/p2p/p2p_daemon_bindings/control.py

+ 5 - 3
hivemind/p2p/p2p_daemon.py

@@ -20,6 +20,7 @@ from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, St
 from hivemind.proto.p2pd_pb2 import RPCError
 from hivemind.utils.asyncio import as_aiter, asingle
 from hivemind.utils.logging import get_logger, golog_level_to_python, loglevel, python_level_to_golog
+from hivemind.utils.networking import get_free_port
 
 logger = get_logger(__name__)
 
@@ -137,9 +138,10 @@ class P2P:
         with path(cli, P2PD_FILENAME) as p:
             p2pd_path = p
 
-        socket_uid = secrets.token_urlsafe(8)
-        self._daemon_listen_maddr = Multiaddr(cls._UNIX_SOCKET_PREFIX + f"p2pd-{socket_uid}.sock")
-        self._client_listen_maddr = Multiaddr(cls._UNIX_SOCKET_PREFIX + f"p2pclient-{socket_uid}.sock")
+        self._daemon_listen_port = get_free_port()
+        self._client_listen_port = get_free_port()
+        self._daemon_listen_maddr = Multiaddr(f'/ip4/127.0.0.1/tcp/{self._daemon_listen_port}')
+        self._client_listen_maddr = Multiaddr(f'/ip4/127.0.0.1/tcp/{self._client_listen_port}')
 
         need_bootstrap = bool(initial_peers) or use_ipfs
         process_kwargs = cls.DHT_MODE_MAPPING.get(dht_mode, {"dht": 0})

+ 8 - 2
hivemind/p2p/p2p_daemon_bindings/control.py

@@ -27,6 +27,8 @@ SUPPORTED_PROTOS = (protocols.protocol_with_code(proto) for proto in SUPPORT_CON
 logger = get_logger(__name__)
 
 DEFAULT_MAX_MSG_SIZE = 4 * 1024 ** 2
+DEFAULT_CONN_HIGH_WATERMARK = 2 ** 24
+
 
 
 def parse_conn_protocol(maddr: Multiaddr) -> int:
@@ -62,6 +64,8 @@ class DaemonConnector:
         Open connection to daemon and upgrade it to a persistent one
         """
         reader, writer = await self.open_connection()
+        reader.transport.set_write_buffer_limits(low=2 ** 22, high=2 ** 24)
+        writer.transport.set_write_buffer_limits(low=2 ** 22, high=2 ** 24)
         req = p2pd_pb.Request(type=p2pd_pb.Request.PERSISTENT_CONN_UPGRADE)
         await write_pbmsg(writer, req)
 
@@ -152,11 +156,11 @@ class ControlClient:
         proto_code = parse_conn_protocol(self.listen_maddr)
         if proto_code == protocols.P_UNIX:
             listen_path = self.listen_maddr.value_for_protocol(protocols.P_UNIX)
-            server = await asyncio.start_unix_server(self._handler, path=listen_path)
+            server = await asyncio.start_unix_server(self._handler, path=listen_path, limit=DEFAULT_CONN_HIGH_WATERMARK)
         elif proto_code == protocols.P_IP4:
             host = self.listen_maddr.value_for_protocol(protocols.P_IP4)
             port = int(self.listen_maddr.value_for_protocol(protocols.P_TCP))
-            server = await asyncio.start_server(self._handler, port=port, host=host)
+            server = await asyncio.start_server(self._handler, port=port, host=host, limit=DEFAULT_CONN_HIGH_WATERMARK)
         else:
             raise ValueError(f"Protocol not supported: {protocols.protocol_with_code(proto_code)}")
 
@@ -344,6 +348,8 @@ class ControlClient:
         self, peer_id: PeerID, protocols: Sequence[str]
     ) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
         reader, writer = await self.daemon_connector.open_connection()
+        reader.transport.set_write_buffer_limits(low=2 ** 22, high=2 ** 24)
+        writer.transport.set_write_buffer_limits(low=2 ** 22, high=2 ** 24)
 
         stream_open_req = p2pd_pb.StreamOpenRequest(peer=peer_id.to_bytes(), proto=list(protocols))
         req = p2pd_pb.Request(type=p2pd_pb.Request.STREAM_OPEN, streamOpen=stream_open_req)