import asyncio
import io
from contextlib import AsyncExitStack

import pytest
from google.protobuf.message import EncodeError
from multiaddr import Multiaddr, protocols

from hivemind.p2p.p2p_daemon_bindings.control import ControlClient, DaemonConnector, parse_conn_protocol
from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
from hivemind.p2p.p2p_daemon_bindings.utils import (ControlFailure, raise_if_failed, read_pbmsg_safe,
                                                    read_unsigned_varint, write_pbmsg, write_unsigned_varint)
from hivemind.proto import p2pd_pb2 as p2pd_pb

from test_utils import make_p2pd_pair_ip4, connect_safe


def test_raise_if_failed_raises():
    """Check that raise_if_failed raises ControlFailure for a Response of type ERROR."""
    resp = p2pd_pb.Response()
    resp.type = p2pd_pb.Response.ERROR
    with pytest.raises(ControlFailure):
        raise_if_failed(resp)


def test_raise_if_failed_not_raises():
    """Check that raise_if_failed does not raise for a Response of type OK."""
    resp = p2pd_pb.Response()
    resp.type = p2pd_pb.Response.OK
    # No assertion needed: the test fails if this call raises.
    raise_if_failed(resp)


# (integer, bytes) pairs; each byte string is the unsigned varint encoding of its
# integer (7 payload bits per byte, least-significant group first). Every integer
# here is at most 2 ** 64 - 1.
# NOTE(review): presumably consumed by the varint read/write tests further down
# in this file -- confirm against the parametrized tests.
PAIRS_INT_SERIALIZED_VALID = (
    (0, b"\x00"),
    (1, b"\x01"),
    (128, b"\x80\x01"),
    (2 ** 32, b"\x80\x80\x80\x80\x10"),
    (2 ** 64 - 1, b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01"),
)

# Same (integer, bytes) layout, but every integer is 2 ** 64 or larger, i.e. it
# does not fit in 64 bits.
# NOTE(review): presumably used to exercise overflow handling in the varint
# helpers -- confirm against the tests that reference this table.
PAIRS_INT_SERIALIZED_OVERFLOW = (
    (2 ** 64, b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x02"),
    (2 ** 64 + 1, b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x02"),
    (
        2 ** 128,
        b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x80\x04",
    ),
)

# Sample peer ID in string form.
# NOTE(review): presumably the text encoding of PEER_ID_BYTES below -- confirm.
PEER_ID_STRING = "QmS5QmciTXXnCUCyxud5eWFenUMAmvAWSDa1c7dvdXRMZ7"
PEER_ID_BYTES = b'\x12 7\x87F.[\xb5\xb1o\xe5*\xc7\xb9\xbb\x11:"Z|j2\x8ad\x1b\xa6\xe5