Explorar o código

Support calling Servicer.get_stub without having servicer instances

Aleksandr Borzunov %!s(int64=4) %!d(string=hai) anos
pai
achega
c89b598fbc
Modificáronse 4 ficheiros con 148 adicións e 30 borrados
  1. 1 2
      hivemind/averaging/averager.py
  2. 33 23
      hivemind/p2p/servicer.py
  3. 109 0
      tests/log
  4. 5 5
      tests/test_p2p_servicer.py

+ 1 - 2
hivemind/averaging/averager.py

@@ -119,8 +119,7 @@ class DecentralizedAverager(mp.Process, ServicerBase):
         assert initial_group_bits is None or all(bit in "01" for bit in initial_group_bits)
         assert not client_mode or not auxiliary, "auxiliary peers must accept incoming connections"
 
-        mp.Process.__init__(self)
-        ServicerBase.__init__(self)
+        super().__init__()
         self.dht = dht
         self.client_mode = client_mode
         self._parent_pid = os.getpid()

+ 33 - 23
hivemind/p2p/servicer.py

@@ -1,7 +1,7 @@
 import asyncio
 import inspect
 from dataclasses import dataclass
-from typing import Any, AsyncIterator, Optional, Tuple, get_type_hints
+from typing import Any, AsyncIterator, List, Optional, Tuple, Type, get_type_hints
 
 from hivemind.p2p.p2p_daemon import P2P
 from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID
@@ -42,11 +42,34 @@ class ServicerBase:
       to calls to the remote peer.
     """
 
-    def __init__(self):
-        class_name = self.__class__.__name__
+    _rpc_handlers: Optional[List[RPCHandler]] = None
+    _stub_type: Optional[Type[StubBase]] = None
 
-        self._rpc_handlers = []
-        for method_name, method in inspect.getmembers(self.__class__, predicate=inspect.isfunction):
+    async def add_p2p_handlers(self, p2p: P2P, wrapper: Any = None) -> None:
+        self._collect_rpc_handlers()
+
+        servicer = self if wrapper is None else wrapper
+        for handler in self._rpc_handlers:
+            await p2p.add_protobuf_handler(
+                handler.handle_name,
+                getattr(servicer, handler.method_name),
+                handler.request_type,
+                stream_input=handler.stream_input,
+            )
+
+    @classmethod
+    def get_stub(cls, p2p: P2P, peer: PeerID) -> StubBase:
+        cls._collect_rpc_handlers()
+        return cls._stub_type(p2p, peer)
+
+    @classmethod
+    def _collect_rpc_handlers(cls) -> None:
+        if cls._rpc_handlers is not None:
+            return
+
+        class_name = cls.__name__
+        cls._rpc_handlers = []
+        for method_name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
             if method_name.startswith("rpc_"):
                 handle_name = f"{class_name}.{method_name}"
 
@@ -65,17 +88,17 @@ class ServicerBase:
                         f"like `dht_pb2.FindRequest` or `AsyncIterator[dht_pb2.FindRequest]` "
                         f"for the `{request_arg}` parameter and the return value"
                     )
-                request_type, stream_input = self._strip_iterator_hint(request_type)
-                response_type, stream_output = self._strip_iterator_hint(response_type)
+                request_type, stream_input = cls._strip_iterator_hint(request_type)
+                response_type, stream_output = cls._strip_iterator_hint(response_type)
 
-                self._rpc_handlers.append(
+                cls._rpc_handlers.append(
                     RPCHandler(method_name, handle_name, request_type, response_type, stream_input, stream_output)
                 )
 
-        self._stub_type = type(
+        cls._stub_type = type(
             f"{class_name}Stub",
             (StubBase,),
-            {handler.method_name: self._make_rpc_caller(handler) for handler in self._rpc_handlers},
+            {handler.method_name: cls._make_rpc_caller(handler) for handler in cls._rpc_handlers},
         )
 
     @staticmethod
@@ -111,19 +134,6 @@ class ServicerBase:
         caller.__name__ = handler.method_name
         return caller
 
-    async def add_p2p_handlers(self, p2p: P2P, wrapper: Any = None) -> None:
-        servicer = self if wrapper is None else wrapper
-        for handler in self._rpc_handlers:
-            await p2p.add_protobuf_handler(
-                handler.handle_name,
-                getattr(servicer, handler.method_name),
-                handler.request_type,
-                stream_input=handler.stream_input,
-            )
-
-    def get_stub(self, p2p: P2P, peer: PeerID) -> StubBase:
-        return self._stub_type(p2p, peer)
-
     @staticmethod
     def _strip_iterator_hint(hint: type) -> Tuple[type, bool]:
         if hasattr(hint, "_name") and hint._name in ("AsyncIterator", "AsyncIterable"):

+ 109 - 0
tests/log

@@ -0,0 +1,109 @@
+[2021/07/10 11:53:40.062][DEBUG][utils.grpc.ChannelCache:49] Eviction period = 600.0s, max channels = 4096
+[2021/07/10 11:53:40.247][DEBUG][asyncio.__init__:59] Using selector: EpollSelector
+[2021/07/10 11:53:40.654][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe5d8f481f0> opens connection to /unix/tmp/hivemind-p2pd-B4tnhAq6U1U.sock
+[2021/07/10 11:53:40.672][DEBUG][p2p.p2p_daemon._ping_daemon:193] Launched p2pd with id = QmTqNtzhfJrChohykAkiZQ8xBJW8995EfQynqAf37BNCak, host multiaddrs = (<Multiaddr /ip4/127.0.0.1/tcp/38633>,)
+[2021/07/10 11:53:40.673][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe5d8f481f0> opens connection to /unix/tmp/hivemind-p2pd-B4tnhAq6U1U.sock
+[2021/07/10 11:53:40.674][DEBUG][p2p.p2p_daemon_bindings.control.listen:108] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.ControlClient object at 0x7fe5d8f48250> starts listening to /unix/tmp/hivemind-p2pclient-B4tnhAq6U1U.sock
+[2021/07/10 11:53:40.674][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe5d8f481f0> opens connection to /unix/tmp/hivemind-p2pd-B4tnhAq6U1U.sock
+[2021/07/10 11:53:40.675][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe5d8f481f0> opens connection to /unix/tmp/hivemind-p2pd-B4tnhAq6U1U.sock
+[2021/07/10 11:53:40.676][INFO][root.run_protocol_listener:40] Started peer id=DHTID(0x3db9c894718f96bdd909afcef13a545941e7e1e4) visible_maddrs=[<Multiaddr /ip4/127.0.0.1/tcp/38633/p2p/QmTqNtzhfJrChohykAkiZQ8xBJW8995EfQynqAf37BNCak>]
+[2021/07/10 11:53:40.683][DEBUG][asyncio.__init__:59] Using selector: EpollSelector
+[2021/07/10 11:53:41.101][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e2d30> opens connection to /unix/tmp/hivemind-p2pd-vXE2xWbi9bg.sock
+[2021/07/10 11:53:41.103][DEBUG][p2p.p2p_daemon._ping_daemon:193] Launched p2pd with id = Qma6rFAHFdwecmQKqWtU59LRqGAeXDam1rLc1u5ueNbwXm, host multiaddrs = (<Multiaddr /ip4/127.0.0.1/tcp/41695>,)
+[2021/07/10 11:53:41.105][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e2d30> opens connection to /unix/tmp/hivemind-p2pd-vXE2xWbi9bg.sock
+[2021/07/10 11:53:41.105][DEBUG][p2p.p2p_daemon_bindings.control.listen:108] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.ControlClient object at 0x7fe6623e2dc0> starts listening to /unix/tmp/hivemind-p2pclient-vXE2xWbi9bg.sock
+[2021/07/10 11:53:41.106][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e2d30> opens connection to /unix/tmp/hivemind-p2pd-vXE2xWbi9bg.sock
+[2021/07/10 11:53:41.106][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e2d30> opens connection to /unix/tmp/hivemind-p2pd-vXE2xWbi9bg.sock
+[2021/07/10 11:53:41.107][INFO][root.run_protocol_listener:40] Started peer id=DHTID(0xa7087d197c0f2758c6bff7256e3cb78c5152c137) visible_maddrs=[<Multiaddr /ip4/127.0.0.1/tcp/41695/p2p/Qma6rFAHFdwecmQKqWtU59LRqGAeXDam1rLc1u5ueNbwXm>]
+[2021/07/10 11:53:41.108][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e2d30> opens connection to /unix/tmp/hivemind-p2pd-vXE2xWbi9bg.sock
+[2021/07/10 11:53:41.109][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=Qma6rFAHFdwecmQKqWtU59LRqGAeXDam1rLc1u5ueNbwXm addr=/ip4/127.0.0.1/tcp/41695 proto=DHTProtocol.rpc_ping>
+[2021/07/10 11:53:41.111][DEBUG][asyncio.__init__:59] Using selector: EpollSelector
+[2021/07/10 11:53:41.528][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.530][DEBUG][p2p.p2p_daemon._ping_daemon:193] Launched p2pd with id = QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q, host multiaddrs = (<Multiaddr /ip4/127.0.0.1/tcp/34419>,)
+[2021/07/10 11:53:41.531][INFO][root.test_dht_protocol:81] Self id=DHTID(0xc147c315f7abd7fff33ccb0bd7b35cb0215ca616)
+[2021/07/10 11:53:41.531][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.532][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q addr=/ip4/127.0.0.1/tcp/34419 proto=DHTProtocol.rpc_ping>
+[2021/07/10 11:53:41.534][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.535][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q addr=/ip4/127.0.0.1/tcp/34419 proto=DHTProtocol.rpc_store>
+[2021/07/10 11:53:41.536][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.536][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q addr=/ip4/127.0.0.1/tcp/34419 proto=DHTProtocol.rpc_find>
+[2021/07/10 11:53:41.538][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.539][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q addr=/ip4/127.0.0.1/tcp/34419 proto=DHTProtocol.rpc_find>
+[2021/07/10 11:53:41.541][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.542][ERROR][dht.protocol.call_find:278] DHTProtocol failed to find at fakeid
+Traceback (most recent call last):
+  File "/home/borzunov/hivemind/hivemind/dht/protocol.py", line 245, in call_find
+    response = await self.get_stub(peer).rpc_find(find_request, timeout=self.wait_timeout)
+  File "/home/borzunov/hivemind/hivemind/utils/auth.py", line 199, in wrapped_rpc
+    response = await method(request, *args, **kwargs)
+  File "/home/borzunov/hivemind/hivemind/p2p/servicer.py", line 72, in caller
+    return await asyncio.wait_for(
+  File "/home/borzunov/anaconda3/lib/python3.8/asyncio/tasks.py", line 483, in wait_for
+    return fut.result()
+  File "/home/borzunov/hivemind/hivemind/p2p/p2p_daemon.py", line 374, in call_protobuf_handler
+    stream_info, reader, writer = await self._client.stream_open(peer_id, (handler_name,))
+  File "/home/borzunov/hivemind/hivemind/p2p/p2p_daemon_bindings/p2pclient.py", line 76, in stream_open
+    return await self.control.stream_open(peer_id=peer_id, protocols=protocols)
+  File "/home/borzunov/hivemind/hivemind/p2p/p2p_daemon_bindings/control.py", line 185, in stream_open
+    raise_if_failed(resp)
+  File "/home/borzunov/hivemind/hivemind/p2p/p2p_daemon_bindings/utils.py", line 60, in raise_if_failed
+    raise ControlFailure(f"Connect failed. msg={response.error.msg}")
+hivemind.p2p.p2p_daemon_bindings.utils.ControlFailure: Connect failed. msg=length greater than remaining number of bytes in buffer
+[2021/07/10 11:53:41.544][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.545][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q addr=/ip4/127.0.0.1/tcp/34419 proto=DHTProtocol.rpc_store>
+[2021/07/10 11:53:41.546][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.547][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q addr=/ip4/127.0.0.1/tcp/34419 proto=DHTProtocol.rpc_store>
+[2021/07/10 11:53:41.547][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe6623e5d90> opens connection to /unix/tmp/hivemind-p2pd-VRUjGSy2BP8.sock
+[2021/07/10 11:53:41.548][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q addr=/ip4/127.0.0.1/tcp/34419 proto=DHTProtocol.rpc_find>
+[2021/07/10 11:53:41.958][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.960][DEBUG][p2p.p2p_daemon._ping_daemon:193] Launched p2pd with id = QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE, host multiaddrs = (<Multiaddr /ip4/127.0.0.1/tcp/35705>,)
+[2021/07/10 11:53:41.961][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.962][DEBUG][p2p.p2p_daemon_bindings.control.listen:108] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.ControlClient object at 0x7fe66237f220> starts listening to /unix/tmp/hivemind-p2pclient-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.963][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.964][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.965][INFO][root.test_dht_protocol:81] Self id=DHTID(0x493ce245bb79ab0900ad291134df41125dfcf217)
+[2021/07/10 11:53:41.966][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.967][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE addr=/ip4/127.0.0.1/tcp/35705 proto=DHTProtocol.rpc_ping>
+[2021/07/10 11:53:41.968][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.969][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE addr=/ip4/127.0.0.1/tcp/35705 proto=DHTProtocol.rpc_store>
+[2021/07/10 11:53:41.970][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.971][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE addr=/ip4/127.0.0.1/tcp/35705 proto=DHTProtocol.rpc_find>
+[2021/07/10 11:53:41.973][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.974][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE addr=/ip4/127.0.0.1/tcp/35705 proto=DHTProtocol.rpc_find>
+[2021/07/10 11:53:41.975][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.975][ERROR][dht.protocol.call_find:278] DHTProtocol failed to find at fakeid
+Traceback (most recent call last):
+  File "/home/borzunov/hivemind/hivemind/dht/protocol.py", line 245, in call_find
+    response = await self.get_stub(peer).rpc_find(find_request, timeout=self.wait_timeout)
+  File "/home/borzunov/hivemind/hivemind/utils/auth.py", line 199, in wrapped_rpc
+    response = await method(request, *args, **kwargs)
+  File "/home/borzunov/hivemind/hivemind/p2p/servicer.py", line 72, in caller
+    return await asyncio.wait_for(
+  File "/home/borzunov/anaconda3/lib/python3.8/asyncio/tasks.py", line 483, in wait_for
+    return fut.result()
+  File "/home/borzunov/hivemind/hivemind/p2p/p2p_daemon.py", line 374, in call_protobuf_handler
+    stream_info, reader, writer = await self._client.stream_open(peer_id, (handler_name,))
+  File "/home/borzunov/hivemind/hivemind/p2p/p2p_daemon_bindings/p2pclient.py", line 76, in stream_open
+    return await self.control.stream_open(peer_id=peer_id, protocols=protocols)
+  File "/home/borzunov/hivemind/hivemind/p2p/p2p_daemon_bindings/control.py", line 185, in stream_open
+    raise_if_failed(resp)
+  File "/home/borzunov/hivemind/hivemind/p2p/p2p_daemon_bindings/utils.py", line 60, in raise_if_failed
+    raise ControlFailure(f"Connect failed. msg={response.error.msg}")
+hivemind.p2p.p2p_daemon_bindings.utils.ControlFailure: Connect failed. msg=length greater than remaining number of bytes in buffer
+[2021/07/10 11:53:41.976][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.977][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE addr=/ip4/127.0.0.1/tcp/35705 proto=DHTProtocol.rpc_store>
+[2021/07/10 11:53:41.978][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.978][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE addr=/ip4/127.0.0.1/tcp/35705 proto=DHTProtocol.rpc_store>
+[2021/07/10 11:53:41.979][DEBUG][p2p.p2p_daemon_bindings.control.open_connection:57] DaemonConnector <hivemind.p2p.p2p_daemon_bindings.control.DaemonConnector object at 0x7fe66237f0a0> opens connection to /unix/tmp/hivemind-p2pd-Pswf-De9YNA.sock
+[2021/07/10 11:53:41.980][DEBUG][p2p.p2p_daemon_bindings.control._handler:83] New incoming stream: <StreamInfo peer_id=QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE addr=/ip4/127.0.0.1/tcp/35705 proto=DHTProtocol.rpc_find>
+2021-07-10T11:53:41.981+0300	ERROR	p2pd	error accepting connection	{"error": "accept unix /tmp/hivemind-p2pd-Pswf-De9YNA.sock: use of closed network connection"}
+[2021/07/10 11:53:41.985][DEBUG][p2p.p2p_daemon._terminate:401] Terminated p2pd with id = QmfJuUCnS8PFAazbfgiExgpYGDCGnq5pBCQBSFZjVB71mE
+2021-07-10T11:53:41.985+0300	ERROR	p2pd	error accepting connection	{"error": "accept unix /tmp/hivemind-p2pd-B4tnhAq6U1U.sock: use of closed network connection"}
+[2021/07/10 11:53:41.988][DEBUG][p2p.p2p_daemon._terminate:401] Terminated p2pd with id = Qma6rFAHFdwecmQKqWtU59LRqGAeXDam1rLc1u5ueNbwXm
+[2021/07/10 11:53:41.988][DEBUG][p2p.p2p_daemon._terminate:401] Terminated p2pd with id = Qma6rFAHFdwecmQKqWtU59LRqGAeXDam1rLc1u5ueNbwXm
+[2021/07/10 11:53:41.989][INFO][root.shutdown:49] Finished peer id=DHTID(0xa7087d197c0f2758c6bff7256e3cb78c5152c137) maddrs=[<Multiaddr /ip4/127.0.0.1/tcp/41695/p2p/Qma6rFAHFdwecmQKqWtU59LRqGAeXDam1rLc1u5ueNbwXm>]
+[2021/07/10 11:53:41.989][DEBUG][p2p.p2p_daemon._terminate:401] Terminated p2pd with id = QmTqNtzhfJrChohykAkiZQ8xBJW8995EfQynqAf37BNCak
+[2021/07/10 11:53:41.989][DEBUG][p2p.p2p_daemon._terminate:401] Terminated p2pd with id = QmTqNtzhfJrChohykAkiZQ8xBJW8995EfQynqAf37BNCak
+[2021/07/10 11:53:41.990][INFO][root.shutdown:49] Finished peer id=DHTID(0x3db9c894718f96bdd909afcef13a545941e7e1e4) maddrs=[<Multiaddr /ip4/127.0.0.1/tcp/38633/p2p/QmTqNtzhfJrChohykAkiZQ8xBJW8995EfQynqAf37BNCak>]
+[2021/07/10 11:53:41.990][INFO][root.shutdown:49] Finished peer id=DHTID(0x3db9c894718f96bdd909afcef13a545941e7e1e4) maddrs=[<Multiaddr /ip4/127.0.0.1/tcp/38633/p2p/QmTqNtzhfJrChohykAkiZQ8xBJW8995EfQynqAf37BNCak>]
+[2021/07/10 11:53:42.033][DEBUG][p2p.p2p_daemon._terminate:401] Terminated p2pd with id = QmYqP7ECXjtxaTi54eznbk2tgq9D43xpyL8hRYsksRMQ8Q

+ 5 - 5
tests/test_p2p_servicer.py

@@ -25,7 +25,7 @@ async def test_unary_unary(server_client):
     server, client = server_client
     servicer = ExampleServicer()
     await servicer.add_p2p_handlers(server)
-    stub = servicer.get_stub(client, server.id)
+    stub = ExampleServicer.get_stub(client, server.id)
 
     assert await stub.rpc_square(test_pb2.TestRequest(number=10)) == test_pb2.TestResponse(number=100)
 
@@ -42,7 +42,7 @@ async def test_stream_unary(server_client):
     server, client = server_client
     servicer = ExampleServicer()
     await servicer.add_p2p_handlers(server)
-    stub = servicer.get_stub(client, server.id)
+    stub = ExampleServicer.get_stub(client, server.id)
 
     async def generate_requests() -> AsyncIterator[test_pb2.TestRequest]:
         for i in range(10):
@@ -63,7 +63,7 @@ async def test_unary_stream(server_client):
     server, client = server_client
     servicer = ExampleServicer()
     await servicer.add_p2p_handlers(server)
-    stub = servicer.get_stub(client, server.id)
+    stub = ExampleServicer.get_stub(client, server.id)
 
     i = 0
     async for item in stub.rpc_count(test_pb2.TestRequest(number=10)):
@@ -85,7 +85,7 @@ async def test_stream_stream(server_client):
     server, client = server_client
     servicer = ExampleServicer()
     await servicer.add_p2p_handlers(server)
-    stub = servicer.get_stub(client, server.id)
+    stub = ExampleServicer.get_stub(client, server.id)
 
     async def generate_requests() -> AsyncIterator[test_pb2.TestRequest]:
         for i in range(10):
@@ -134,7 +134,7 @@ async def test_unary_stream_cancel(server_client, cancel_reason):
 
         writer.close()
     elif cancel_reason == "close_generator":
-        stub = servicer.get_stub(client, server.id)
+        stub = ExampleServicer.get_stub(client, server.id)
         iter = stub.rpc_wait(test_pb2.TestRequest(number=10)).__aiter__()
 
         assert await iter.__anext__() == test_pb2.TestResponse(number=11)