Просмотр исходного кода

Refactor P2P before DHT integration (#295)

This PR improves hivemind.p2p.P2P interfaces, docs, and type annotations.

Co-authored-by: justheuristic <justheuristic@gmail.com>
Alexander Borzunov 4 лет назад
Родитель
Сommit
0550a4cd1b
3 измененных файлов с 169 добавлено и 134 удалено
  1. 2 1
      hivemind/p2p/__init__.py
  2. 121 84
      hivemind/p2p/p2p_daemon.py
  3. 46 49
      tests/test_p2p_daemon.py

+ 2 - 1
hivemind/p2p/__init__.py

@@ -1 +1,2 @@
-from hivemind.p2p.p2p_daemon import P2P
+from hivemind.p2p.p2p_daemon import P2P, P2PContext, P2PHandlerError
+from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID

+ 121 - 84
hivemind/p2p/p2p_daemon.py

@@ -3,7 +3,7 @@ from copy import deepcopy
 from dataclasses import dataclass
 from importlib.resources import path
 from subprocess import Popen
-from typing import List, Optional
+from typing import Any, Callable, Dict, List, Optional, Tuple
 
 import google.protobuf
 from multiaddr import Multiaddr
@@ -24,24 +24,27 @@ NUM_RETRIES = 3
 RETRY_DELAY = 0.4
 
 
-class P2PInterruptedError(Exception):
-    pass
-
-
-@dataclass(frozen=False)
+@dataclass(frozen=True)
 class P2PContext(object):
-    id: str
-    port: int
     handle_name: str
-    peer_id: PeerID = None
-    peer_addr: Multiaddr = None
+    local_id: PeerID
+    remote_id: PeerID = None
+    remote_maddr: Multiaddr = None
 
 
 class P2P:
     """
-    Forks a child process and executes p2pd command with given arguments.
-    Can be used for peer to peer communication and procedure calls.
-    Sends SIGKILL to the child in destructor.
+    This class is responsible for establishing peer-to-peer connections through NAT and/or firewalls.
+    It creates and manages a libp2p daemon in a background process, then terminates it when P2P is shut down.
+    In order to communicate, a P2P instance should either use one or more bootstrap_peers that will connect it
+    to the rest of the swarm or use the public IPFS network (https://ipfs.io).
+
+    For incoming connections, P2P instances add RPC handlers that may be accessed by other peers:
+      - `P2P.add_unary_handler` accepts a protobuf message and returns another protobuf
+      - `P2P.add_stream_handler` transfers raw data using bi-directional streaming interface
+
+    To access these handlers, a P2P instance can `P2P.call_unary_handler`/`P2P.call_stream_handler`,
+    using the recipient's unique `P2P.id` and the name of the corresponding handler.
     """
 
     HEADER_LEN = 8
@@ -68,13 +71,14 @@ class P2P:
     @classmethod
     async def create(cls, *args, quic: bool = True, tls: bool = True, conn_manager: bool = True,
                      dht_mode: str = 'dht_server', force_reachability: Optional[str] = None,
-                     nat_port_map: bool = True, auto_nat: bool = True, bootstrap: bool = False,
-                     bootstrap_peers: Optional[List[str]] = None, use_global_ipfs: bool = False, host_port: int = None,
+                     nat_port_map: bool = True, auto_nat: bool = True,
+                     bootstrap_peers: Optional[List[Multiaddr]] = None,
+                     use_ipfs: bool = False, external_port: int = None,
                      daemon_listen_port: int = None, use_relay: bool = True, use_relay_hop: bool = False,
-                     use_relay_discovery: bool = False, use_auto_relay: bool = False, relay_hop_limit: int = 0, **kwargs):
+                     use_relay_discovery: bool = False, use_auto_relay: bool = False, relay_hop_limit: int = 0,
+                     **kwargs) -> 'P2P':
         """
         Start a new p2pd process and connect to it.
-        :param args:
         :param quic: Enables the QUIC transport
         :param tls: Enables TLS1.3 channel security protocol
         :param conn_manager: Enables the Connection Manager
@@ -83,30 +87,28 @@ class P2P:
         :param nat_port_map: Enables NAT port mapping
         :param auto_nat: Enables the AutoNAT service
         :param bootstrap: Connects to bootstrap peers and bootstraps the dht if enabled
-        :param bootstrap_peers: List of bootstrap peers; defaults to the IPFS DHT peers
-        :param use_global_ipfs: Bootstrap to global ipfs (works only if bootstrap=True and bootstrap_peers=None)
-        :param host_port: port for p2p network
+        :param bootstrap_peers: List of bootstrap peers
+        :param use_ipfs: Bootstrap to IPFS (works only if bootstrap=True and bootstrap_peers=None)
+        :param external_port: port for external connections from other p2p instances
         :param daemon_listen_port: port for connection daemon and client binding
         :param use_relay: enables circuit relay
         :param use_relay_hop: enables hop for relay
         :param use_relay_discovery: enables passive discovery for relay
         :param use_auto_relay: enables autorelay
         :param relay_hop_limit: sets the hop limit for hop relays
-        :param kwargs:
-        :return: new wrapper for p2p daemon
+        :param args: positional CLI arguments for the p2p daemon
+        :param kwargs: keyword CLI arguments for the p2p daemon
+        :return: a wrapper for the p2p daemon
         """
 
-        assert not (bootstrap and bootstrap_peers is None and not use_global_ipfs), \
-            'Trying to create with bootstrap node without bootstrap nodes list. ' \
-            'It is very dangerous, because p2pd connects to global ipfs and it is very unstable. ' \
-            'If you really want this, pass use_global_ipfs=True'
-        assert not (bootstrap_peers is not None and use_global_ipfs), \
-            'Non empty bootstrap_nodes and use_global_ipfs=True are incompatible.' \
-            'Choose one option: your nodes list (preferable) or global ipfs (very unstable)'
+        assert not (bootstrap_peers and use_ipfs), \
+            'User-defined bootstrap_peers and use_ipfs=True are incompatible, please choose one option'
 
         self = cls()
         with path(cli, P2PD_FILENAME) as p:
             p2pd_path = p
+
+        need_bootstrap = bool(bootstrap_peers) or use_ipfs
         bootstrap_peers = cls._make_bootstrap_peers(bootstrap_peers)
         dht = cls.DHT_MODE_MAPPING.get(dht_mode, {'dht': 0})
         force_reachability = cls.FORCE_REACHABILITY_MAPPING.get(force_reachability, {})
@@ -116,8 +118,8 @@ class P2P:
             natPortMap=nat_port_map, autonat=auto_nat,
             relay=use_relay, relayHop=use_relay_hop, relayDiscovery=use_relay_discovery,
             autoRelay=use_auto_relay, relayHopLimit=relay_hop_limit,
-            b=bootstrap, **{**bootstrap_peers, **dht, **force_reachability, **kwargs})
-        self._assign_daemon_ports(host_port, daemon_listen_port)
+            b=need_bootstrap, **{**bootstrap_peers, **dht, **force_reachability, **kwargs})
+        self._assign_daemon_ports(external_port, daemon_listen_port)
 
         for try_count in range(NUM_RETRIES):
             try:
@@ -134,11 +136,11 @@ class P2P:
         return self
 
     @classmethod
-    async def replicate(cls, daemon_listen_port: int, host_port: int):
+    async def replicate(cls, daemon_listen_port: int, external_port: int) -> 'P2P':
         """
         Connect to existing p2p daemon
         :param daemon_listen_port: port for connection daemon and client binding
-        :param host_port: port for p2p network
+        :param external_port: port for external connections from other p2p instances
         :return: new wrapper for existing p2p daemon
         """
 
@@ -147,7 +149,7 @@ class P2P:
         # Use external already running p2pd
         self._child = None
         self._alive = True
-        self._assign_daemon_ports(host_port, daemon_listen_port)
+        self._assign_daemon_ports(external_port, daemon_listen_port)
         self._client_listen_port = find_open_port()
         self._client = p2pclient.Client(
             Multiaddr(f'/ip4/127.0.0.1/tcp/{self._daemon_listen_port}'),
@@ -155,7 +157,15 @@ class P2P:
         await self._wait_for_client()
         return self
 
-    async def wait_for_at_least_n_peers(self, n_peers, attempts=3, delay=1):
+    async def identify_maddrs(self) -> List[Multiaddr]:
+        _, maddrs = await self._client.identify()
+        if not maddrs:
+            raise ValueError(f"No multiaddrs found for peer {self.id}")
+
+        p2p_maddr = Multiaddr(f'/p2p/{self.id.to_base58()}')
+        return [addr.encapsulate(p2p_maddr) for addr in maddrs]
+
+    async def wait_for_at_least_n_peers(self, n_peers: int, attempts: int = 3, delay: float = 1) -> None:
         for _ in range(attempts):
             peers = await self._client.list_peers()
             if len(peers) >= n_peers:
@@ -167,7 +177,7 @@ class P2P:
     def _initialize(self, proc_args: List[str]) -> None:
         proc_args = deepcopy(proc_args)
         proc_args.extend(self._make_process_args(
-            hostAddrs=f'/ip4/0.0.0.0/tcp/{self._host_port},/ip4/0.0.0.0/udp/{self._host_port}/quic',
+            hostAddrs=f'/ip4/0.0.0.0/tcp/{self._external_port},/ip4/0.0.0.0/udp/{self._external_port}/quic',
             listen=f'/ip4/127.0.0.1/tcp/{self._daemon_listen_port}'
         ))
         self._child = Popen(args=proc_args, encoding="utf8")
@@ -177,33 +187,36 @@ class P2P:
             Multiaddr(f'/ip4/127.0.0.1/tcp/{self._daemon_listen_port}'),
             Multiaddr(f'/ip4/127.0.0.1/tcp/{self._client_listen_port}'))
 
-    async def _wait_for_client(self, delay=0):
+    async def _wait_for_client(self, delay: float = 0) -> None:
         await asyncio.sleep(delay)
-        encoded = await self._client.identify()
-        self.id = encoded[0].to_base58()
+        self.id, _ = await self._client.identify()
 
-    def _assign_daemon_ports(self, host_port=None, daemon_listen_port=None):
-        if host_port is None:
-            host_port = find_open_port()
+    def _assign_daemon_ports(self, external_port: int = None, daemon_listen_port: int = None) -> None:
+        if external_port is None:
+            external_port = find_open_port()
         if daemon_listen_port is None:
             daemon_listen_port = find_open_port()
-            while daemon_listen_port == host_port:
+            while daemon_listen_port == external_port:
                 daemon_listen_port = find_open_port()
 
-        self._host_port, self._daemon_listen_port = host_port, daemon_listen_port
+        self._external_port, self._daemon_listen_port = external_port, daemon_listen_port
+
+    @property
+    def external_port(self) -> int:
+        return self._external_port
 
     @staticmethod
-    async def send_raw_data(byte_str, writer):
-        request = len(byte_str).to_bytes(P2P.HEADER_LEN, P2P.BYTEORDER) + byte_str
+    async def send_raw_data(data: bytes, writer: asyncio.StreamWriter) -> None:
+        request = len(data).to_bytes(P2P.HEADER_LEN, P2P.BYTEORDER) + data
         writer.write(request)
 
     @staticmethod
-    async def send_msgpack(data, writer):
+    async def send_msgpack(data: Any, writer: asyncio.StreamWriter) -> None:
         raw_data = MSGPackSerializer.dumps(data)
         await P2P.send_raw_data(raw_data, writer)
 
     @staticmethod
-    async def send_protobuf(protobuf, out_proto_type, writer):
+    async def send_protobuf(protobuf, out_proto_type: type, writer: asyncio.StreamWriter) -> None:
         if type(protobuf) != out_proto_type:
             raise TypeError('Unary handler returned protobuf of wrong type.')
         if out_proto_type == p2pd_pb2.RPCError:
@@ -214,18 +227,19 @@ class P2P:
         await P2P.send_raw_data(protobuf.SerializeToString(), writer)
 
     @staticmethod
-    async def receive_raw_data(reader: asyncio.StreamReader, header_len=HEADER_LEN):
-        header = await reader.readexactly(header_len)
+    async def receive_raw_data(reader: asyncio.StreamReader) -> bytes:
+        header = await reader.readexactly(P2P.HEADER_LEN)
         content_length = int.from_bytes(header, P2P.BYTEORDER)
         data = await reader.readexactly(content_length)
         return data
 
     @staticmethod
-    async def receive_msgpack(reader):
+    async def receive_msgpack(reader: asyncio.StreamReader) -> Any:
         return MSGPackSerializer.loads(await P2P.receive_raw_data(reader))
 
     @staticmethod
-    async def receive_protobuf(in_proto_type, reader):
+    async def receive_protobuf(in_proto_type: type, reader: asyncio.StreamReader) -> \
+            Tuple[Any, Optional[p2pd_pb2.RPCError]]:
         msg_type = await P2P.receive_raw_data(reader)
         if msg_type == P2P.RESULT_MESSAGE:
             protobuf = in_proto_type()
@@ -239,8 +253,9 @@ class P2P:
             raise TypeError('Invalid Protobuf message type')
 
     @staticmethod
-    def _handle_stream(handle):
-        async def do_handle_stream(stream_info, reader, writer):
+    def _handle_stream(handle: Callable[[bytes], bytes]):
+        async def do_handle_stream(
+                stream_info: StreamInfo, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
             try:
                 request = await P2P.receive_raw_data(reader)
             except asyncio.IncompleteReadError:
@@ -255,27 +270,28 @@ class P2P:
 
         return do_handle_stream
 
-    @staticmethod
-    def _handle_unary_stream(handle, context, in_proto_type, out_proto_type):
-        async def watchdog(reader: asyncio.StreamReader):
+    def _handle_unary_stream(self, handle: Callable[[Any, P2PContext], Any], handle_name: str,
+                             in_proto_type: type, out_proto_type: type):
+        async def watchdog(reader: asyncio.StreamReader) -> None:
             await reader.read(n=1)
             raise P2PInterruptedError()
 
-        async def do_handle_unary_stream(
-                stream_info: StreamInfo,
-                reader: asyncio.StreamReader,
-                writer: asyncio.StreamWriter) -> None:
+        async def do_handle_unary_stream(stream_info: StreamInfo,
+                                         reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
             try:
                 try:
-                    request = await P2P.receive_protobuf(in_proto_type, reader)
+                    request, err = await P2P.receive_protobuf(in_proto_type, reader)
                 except asyncio.IncompleteReadError:
-                    logger.debug("Incomplete read while receiving request from peer")
+                    logger.debug('Incomplete read while receiving request from peer')
                     return
                 except google.protobuf.message.DecodeError as error:
-                    logger.exception(error)
+                    logger.debug(f'Failed to decode request protobuf: {error}')
                     return
+                if err is not None:
+                    logger.debug(f'Got an error instead of a request: {err}')
 
-                context.peer_id, context.peer_addr = stream_info.peer_id, stream_info.addr
+                context = P2PContext(handle_name=handle_name, local_id=self.id,
+                                     remote_id=stream_info.peer_id, remote_maddr=stream_info.addr)
                 done, pending = await asyncio.wait([watchdog(reader), handle(request, context)],
                                                    return_when=asyncio.FIRST_COMPLETED)
                 try:
@@ -298,14 +314,14 @@ class P2P:
 
         return do_handle_unary_stream
 
-    def start_listening(self):
-        async def listen():
+    def start_listening(self) -> None:
+        async def listen() -> None:
             async with self._client.listen():
                 await self._server_stopped.wait()
 
         self._listen_task = asyncio.create_task(listen())
 
-    async def stop_listening(self):
+    async def stop_listening(self) -> None:
         if self._listen_task is not None:
             self._server_stopped.set()
             self._listen_task.cancel()
@@ -315,41 +331,53 @@ class P2P:
                 self._listen_task = None
                 self._server_stopped.clear()
 
-    async def add_stream_handler(self, name, handle):
+    async def add_stream_handler(self, name: str, handle: Callable[[bytes], bytes]) -> None:
         if self._listen_task is None:
             self.start_listening()
         await self._client.stream_handler(name, self._handle_stream(handle))
 
-    async def add_unary_handler(self, name, handle, in_proto_type, out_proto_type):
+    async def add_unary_handler(self, name: str, handle: Callable[[Any, P2PContext], Any],
+                                in_proto_type: type, out_proto_type: type) -> None:
         if self._listen_task is None:
             self.start_listening()
-        context = P2PContext(id=self.id, port=self._host_port, handle_name=name)
         await self._client.stream_handler(
-            name, P2P._handle_unary_stream(handle, context, in_proto_type, out_proto_type))
+            name, self._handle_unary_stream(handle, name, in_proto_type, out_proto_type))
 
-    async def call_peer_handler(self, peer_id, handler_name, input_data):
-        libp2p_peer_id = PeerID.from_base58(peer_id)
-        stream_info, reader, writer = await self._client.stream_open(libp2p_peer_id, (handler_name,))
+    async def call_peer_handler(self, peer_id: PeerID, handler_name: str, input_data: bytes) -> bytes:
+        stream_info, reader, writer = await self._client.stream_open(peer_id, (handler_name,))
         try:
             await P2P.send_raw_data(input_data, writer)
             return await P2P.receive_raw_data(reader)
         finally:
             writer.close()
 
+    async def call_unary_handler(self, peer_id: PeerID, handler_name: str,
+                                 request_protobuf: Any, response_proto_type: type) -> Any:
+        stream_info, reader, writer = await self._client.stream_open(peer_id, (handler_name,))
+        try:
+            await P2P.send_protobuf(request_protobuf, type(request_protobuf), writer)
+            result, err = await P2P.receive_protobuf(response_proto_type, reader)
+            if err is not None:
+                raise P2PHandlerError(f'Failed to call unary handler {handler_name} at {peer_id}: {err.message}')
+
+            return result
+        finally:
+            writer.close()
+
     def __del__(self):
         self._terminate()
 
     @property
-    def is_alive(self):
+    def is_alive(self) -> bool:
         return self._alive
 
-    async def shutdown(self):
+    async def shutdown(self) -> None:
         await asyncio.get_event_loop().run_in_executor(None, self._terminate)
 
-    def _terminate(self):
+    def _terminate(self) -> None:
         self._alive = False
         if self._child is not None and self._child.poll() is None:
-            self._child.kill()
+            self._child.terminate()
             self._child.wait()
 
     @staticmethod
@@ -365,13 +393,22 @@ class P2P:
         return proc_args
 
     @staticmethod
-    def _convert_process_arg_type(val):
+    def _convert_process_arg_type(val: Any) -> Any:
         if isinstance(val, bool):
-            return 1 if val else 0
+            return int(val)
         return val
 
     @staticmethod
-    def _make_bootstrap_peers(nodes):
-        if nodes is None:
+    def _make_bootstrap_peers(maddrs: Optional[List[Multiaddr]]) -> Dict[str, str]:
+        if maddrs is None:
             return {}
-        return {'bootstrapPeers': ','.join(nodes)}
+
+        return {'bootstrapPeers': ','.join(str(addr) for addr in maddrs)}
+
+
+class P2PInterruptedError(Exception):
+    pass
+
+
+class P2PHandlerError(Exception):
+    pass

+ 46 - 49
tests/test_p2p_daemon.py

@@ -7,8 +7,9 @@ from typing import List
 import numpy as np
 import pytest
 import torch
+from multiaddr import Multiaddr
 
-from hivemind.p2p import P2P
+from hivemind.p2p import P2P, P2PHandlerError
 from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID
 from hivemind.proto import dht_pb2, runtime_pb2
 from hivemind.utils import MSGPackSerializer
@@ -19,16 +20,19 @@ def is_process_running(pid: int) -> bool:
     return subprocess.run(["ps", "-p", str(pid)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0
 
 
-async def replicate_if_needed(p2p: P2P, replicate: bool):
-    return await P2P.replicate(p2p._daemon_listen_port, p2p._host_port) if replicate else p2p
+async def replicate_if_needed(p2p: P2P, replicate: bool) -> P2P:
+    return await P2P.replicate(p2p._daemon_listen_port, p2p.external_port) if replicate else p2p
 
 
-def bootstrap_addr(host_port, id_):
-    return f'/ip4/127.0.0.1/tcp/{host_port}/p2p/{id_}'
+def bootstrap_addr(external_port: int, id_: str) -> Multiaddr:
+    return Multiaddr(f'/ip4/127.0.0.1/tcp/{external_port}/p2p/{id_}')
 
 
-def bootstrap_from(daemons: List[P2P]) -> List[str]:
-    return [bootstrap_addr(d._host_port, d.id) for d in daemons]
+async def bootstrap_from(daemons: List[P2P]) -> List[Multiaddr]:
+    maddrs = []
+    for d in daemons:
+        maddrs += await d.identify_maddrs()
+    return maddrs
 
 
 @pytest.mark.asyncio
@@ -48,8 +52,8 @@ async def test_server_client_connection():
     peers = await server._client.list_peers()
     assert len(peers) == 0
 
-    nodes = bootstrap_from([server])
-    client = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    nodes = await bootstrap_from([server])
+    client = await P2P.create(bootstrap_peers=nodes)
     await client.wait_for_at_least_n_peers(1)
 
     peers = await client._client.list_peers()
@@ -61,7 +65,7 @@ async def test_server_client_connection():
 @pytest.mark.asyncio
 async def test_daemon_replica_does_not_affect_primary():
     p2p_daemon = await P2P.create()
-    p2p_replica = await P2P.replicate(p2p_daemon._daemon_listen_port, p2p_daemon._host_port)
+    p2p_replica = await P2P.replicate(p2p_daemon._daemon_listen_port, p2p_daemon.external_port)
 
     child_pid = p2p_daemon._child.pid
     assert is_process_running(child_pid)
@@ -126,6 +130,8 @@ def handle_add_torch_with_exc(args):
 @pytest.mark.asyncio
 async def test_call_unary_handler(should_cancel, replicate, handle_name="handle"):
     handler_cancelled = False
+    server_primary = await P2P.create()
+    server = await replicate_if_needed(server_primary, replicate)
 
     async def ping_handler(request, context):
         try:
@@ -134,44 +140,38 @@ async def test_call_unary_handler(should_cancel, replicate, handle_name="handle"
             nonlocal handler_cancelled
             handler_cancelled = True
         return dht_pb2.PingResponse(
-            peer=dht_pb2.NodeInfo(
-                node_id=context.id.encode(), rpc_port=context.port),
+            peer=dht_pb2.NodeInfo(node_id=server.id.to_bytes(), rpc_port=server.external_port),
             sender_endpoint=context.handle_name, available=True)
 
-    server_primary = await P2P.create()
-    server = await replicate_if_needed(server_primary, replicate)
     server_pid = server_primary._child.pid
     await server.add_unary_handler(handle_name, ping_handler, dht_pb2.PingRequest,
                                    dht_pb2.PingResponse)
     assert is_process_running(server_pid)
 
-    nodes = bootstrap_from([server])
-    client_primary = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    nodes = await bootstrap_from([server])
+    client_primary = await P2P.create(bootstrap_peers=nodes)
     client = await replicate_if_needed(client_primary, replicate)
     client_pid = client_primary._child.pid
     assert is_process_running(client_pid)
+    await client.wait_for_at_least_n_peers(1)
 
     ping_request = dht_pb2.PingRequest(
-        peer=dht_pb2.NodeInfo(node_id=client.id.encode(), rpc_port=client._host_port),
+        peer=dht_pb2.NodeInfo(node_id=client.id.to_bytes(), rpc_port=client.external_port),
         validate=True)
     expected_response = dht_pb2.PingResponse(
-        peer=dht_pb2.NodeInfo(node_id=server.id.encode(), rpc_port=server._host_port),
+        peer=dht_pb2.NodeInfo(node_id=server.id.to_bytes(), rpc_port=server.external_port),
         sender_endpoint=handle_name, available=True)
 
-    await client.wait_for_at_least_n_peers(1)
-    libp2p_server_id = PeerID.from_base58(server.id)
-    stream_info, reader, writer = await client._client.stream_open(libp2p_server_id, (handle_name,))
-
-    await P2P.send_protobuf(ping_request, dht_pb2.PingRequest, writer)
-
     if should_cancel:
+        stream_info, reader, writer = await client._client.stream_open(server.id, (handle_name,))
+        await P2P.send_protobuf(ping_request, dht_pb2.PingRequest, writer)
+
         writer.close()
         await asyncio.sleep(1)
         assert handler_cancelled
     else:
-        result, err = await P2P.receive_protobuf(dht_pb2.PingResponse, reader)
-        assert err is None
-        assert result == expected_response
+        actual_response = await client.call_unary_handler(server.id, handle_name, ping_request, dht_pb2.PingResponse)
+        assert actual_response == expected_response
         assert not handler_cancelled
 
     await server.stop_listening()
@@ -192,22 +192,19 @@ async def test_call_unary_handler_error(handle_name="handle"):
     await server.add_unary_handler(handle_name, error_handler, dht_pb2.PingRequest, dht_pb2.PingResponse)
     assert is_process_running(server_pid)
 
-    nodes = bootstrap_from([server])
-    client = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    nodes = await bootstrap_from([server])
+    client = await P2P.create(bootstrap_peers=nodes)
     client_pid = client._child.pid
     assert is_process_running(client_pid)
     await client.wait_for_at_least_n_peers(1)
 
     ping_request = dht_pb2.PingRequest(
-        peer=dht_pb2.NodeInfo(node_id=client.id.encode(), rpc_port=client._host_port),
+        peer=dht_pb2.NodeInfo(node_id=client.id.to_bytes(), rpc_port=client.external_port),
         validate=True)
-    libp2p_server_id = PeerID.from_base58(server.id)
-    stream_info, reader, writer = await client._client.stream_open(libp2p_server_id, (handle_name,))
 
-    await P2P.send_protobuf(ping_request, dht_pb2.PingRequest, writer)
-    result, err = await P2P.receive_protobuf(dht_pb2.PingResponse, reader)
-    assert result is None
-    assert err.message == 'boom'
+    with pytest.raises(P2PHandlerError) as excinfo:
+        await client.call_unary_handler(server.id, handle_name, ping_request, dht_pb2.PingResponse)
+    assert 'boom' in str(excinfo.value)
 
     await server.stop_listening()
     await server.shutdown()
@@ -230,8 +227,8 @@ async def test_call_peer_single_process(test_input, expected, handle, handler_na
     await server.add_stream_handler(handler_name, handle)
     assert is_process_running(server_pid)
 
-    nodes = bootstrap_from([server])
-    client = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    nodes = await bootstrap_from([server])
+    client = await P2P.create(bootstrap_peers=nodes)
     client_pid = client._child.pid
     assert is_process_running(client_pid)
 
@@ -257,7 +254,7 @@ async def run_server(handler_name, server_side, client_side, response_received):
     assert is_process_running(server_pid)
 
     server_side.send(server.id)
-    server_side.send(server._host_port)
+    server_side.send(server.external_port)
     while response_received.value == 0:
         await asyncio.sleep(0.5)
 
@@ -286,7 +283,7 @@ async def test_call_peer_different_processes():
     peer_port = client_side.recv()
 
     nodes = [bootstrap_addr(peer_port, peer_id)]
-    client = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    client = await P2P.create(bootstrap_peers=nodes)
     client_pid = client._child.pid
     assert is_process_running(client_pid)
 
@@ -319,8 +316,8 @@ async def test_call_peer_torch_square(test_input, expected, handler_name="handle
     server = await P2P.create()
     await server.add_stream_handler(handler_name, handle)
 
-    nodes = bootstrap_from([server])
-    client = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    nodes = await bootstrap_from([server])
+    client = await P2P.create(bootstrap_peers=nodes)
 
     await client.wait_for_at_least_n_peers(1)
 
@@ -351,8 +348,8 @@ async def test_call_peer_torch_add(test_input, expected, handler_name="handle"):
     server = await P2P.create()
     await server.add_stream_handler(handler_name, handle)
 
-    nodes = bootstrap_from([server])
-    client = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    nodes = await bootstrap_from([server])
+    client = await P2P.create(bootstrap_peers=nodes)
 
     await client.wait_for_at_least_n_peers(1)
 
@@ -382,8 +379,8 @@ async def test_call_peer_error(replicate, handler_name="handle"):
     server = await replicate_if_needed(server_primary, replicate)
     await server.add_stream_handler(handler_name, handle_add_torch_with_exc)
 
-    nodes = bootstrap_from([server])
-    client_primary = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    nodes = await bootstrap_from([server])
+    client_primary = await P2P.create(bootstrap_peers=nodes)
     client = await replicate_if_needed(client_primary, replicate)
 
     await client.wait_for_at_least_n_peers(1)
@@ -403,7 +400,7 @@ async def test_handlers_on_different_replicas(handler_name="handle"):
     def handler(arg, key):
         return key
 
-    server_primary = await P2P.create(bootstrap=False)
+    server_primary = await P2P.create()
     server_id = server_primary.id
     await server_primary.add_stream_handler(handler_name, partial(handler, key=b'primary'))
 
@@ -413,8 +410,8 @@ async def test_handlers_on_different_replicas(handler_name="handle"):
     server_replica2 = await replicate_if_needed(server_primary, True)
     await server_replica2.add_stream_handler(handler_name + '2', partial(handler, key=b'replica2'))
 
-    nodes = bootstrap_from([server_primary])
-    client = await P2P.create(bootstrap=True, bootstrap_peers=nodes)
+    nodes = await bootstrap_from([server_primary])
+    client = await P2P.create(bootstrap_peers=nodes)
     await client.wait_for_at_least_n_peers(1)
 
     result = await client.call_peer_handler(server_id, handler_name, b'1')