123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import random
- import uuid
- from itertools import chain
- import numpy as np
- import hivemind
- from hivemind import LOCALHOST
- def test_store_get_experts():
- peers = [hivemind.DHT(start=True)]
- for i in range(10):
- neighbors_i = [f'{LOCALHOST}:{node.port}' for node in random.sample(peers, min(3, len(peers)))]
- peers.append(hivemind.DHT(initial_peers=neighbors_i, start=True))
- you: hivemind.dht.DHT = random.choice(peers)
- theguyshetoldyounottoworryabout: hivemind.dht.DHT = random.choice(peers)
- expert_uids = [str(uuid.uuid4()) for _ in range(110)]
- batch_size = 10
- for batch_start in range(0, len(expert_uids), batch_size):
- you.declare_experts(expert_uids[batch_start: batch_start + batch_size], 'localhost', 1234)
- found = theguyshetoldyounottoworryabout.get_experts(random.sample(expert_uids, 5) + ['foo', 'bar'])
- assert all(res is not None for res in found[:-2]), "Could not find some existing experts"
- assert all(res is None for res in found[-2:]), "Found non-existing experts"
- that_guys_expert, that_guys_port = str(uuid.uuid4()), random.randint(1000, 9999)
- theguyshetoldyounottoworryabout.declare_experts([that_guys_expert], f'that_host:{that_guys_port}')
- you_notfound, you_found = you.get_experts(['foobar', that_guys_expert])
- assert isinstance(you_found, hivemind.RemoteExpert)
- assert you_found.endpoint == f'that_host:{that_guys_port}'
- # test first_k_active
- assert list(theguyshetoldyounottoworryabout.first_k_active(expert_uids, k=10)) == expert_uids[:10]
- some_permuted_experts = random.sample(expert_uids, k=32)
- assert list(theguyshetoldyounottoworryabout.first_k_active(some_permuted_experts, k=32)) == some_permuted_experts
- assert list(theguyshetoldyounottoworryabout.first_k_active(some_permuted_experts, k=1)) == some_permuted_experts[:1]
- fake_and_real_experts = list(chain(*zip(
- [str(uuid.uuid4()) for _ in some_permuted_experts], some_permuted_experts)))
- assert list(theguyshetoldyounottoworryabout.first_k_active(fake_and_real_experts, k=9)) == some_permuted_experts[:9]
- for peer in peers:
- peer.shutdown()
- def test_beam_search(dht_size=20, total_experts=128, batch_size=32, initial_peers=3, beam_size=4, parallel_rpc=256,
- grid_dims=(32, 32, 32)):
- dht = []
- for i in range(dht_size):
- neighbors_i = [f'{LOCALHOST}:{node.port}' for node in random.sample(dht, min(initial_peers, len(dht)))]
- dht.append(hivemind.DHT(start=True, expiration=999999, initial_peers=neighbors_i, parallel_rpc=parallel_rpc))
- real_experts = sorted({
- 'expert.' + '.'.join([str(random.randint(0, dim - 1)) for dim in grid_dims])
- for _ in range(total_experts)
- })
- for batch_start in range(0, len(real_experts), batch_size):
- random.choice(dht).declare_experts(
- real_experts[batch_start: batch_start + batch_size], wait=True,
- endpoint=f"host{batch_start // batch_size}:{random.randint(0, 65536)}")
- neighbors_i = [f'{LOCALHOST}:{node.port}' for node in random.sample(dht, min(initial_peers, len(dht)))]
- you = hivemind.DHT(start=True, expiration=999999, initial_peers=neighbors_i, parallel_rpc=parallel_rpc)
- for i in range(50):
- topk_experts = you.find_best_experts('expert', [np.random.randn(dim) for dim in grid_dims], beam_size=beam_size)
- assert all(isinstance(e, hivemind.RemoteExpert) for e in topk_experts)
- assert len(topk_experts) == beam_size
- for i in range(10):
- batch_experts = you.batch_find_best_experts('expert', [np.random.randn(batch_size, dim) for dim in grid_dims],
- beam_size=beam_size)
- assert isinstance(batch_experts, list) and len(batch_experts) == batch_size
- assert all(isinstance(e, hivemind.RemoteExpert) for experts in batch_experts for e in experts)
- assert all(len(experts) == beam_size for experts in batch_experts)
- def test_first_k_active():
- node = hivemind.DHT(start=True)
- assert all(node.declare_experts(['e.1.2.3', 'e.1.2.4', 'e.3.4.5'], endpoint=f"{hivemind.LOCALHOST}:1337"))
- assert all(node.declare_experts(['e.2.1.1'], endpoint=f"{hivemind.LOCALHOST}:1338"))
- results = node.first_k_active(['e.0', 'e.1', 'e.2', 'e.3'], k=2)
- assert len(results) == 2 and next(iter(results.keys())) == 'e.1'
- assert results['e.1'].uid in ('e.1.2.3', 'e.1.2.4') and results['e.1'].endpoint == f"{hivemind.LOCALHOST}:1337"
- assert results['e.2'].uid == 'e.2.1.1' and results['e.2'].endpoint == f"{hivemind.LOCALHOST}:1338"
- results = node.first_k_active(['e', 'e.1', 'e.1.2', 'e.1.2.3'], k=10)
- assert len(results) == 4
- assert 'e' in results
- for k in ('e.1', 'e.1.2', 'e.1.2.3'):
- assert results[k].uid in ('e.1.2.3', 'e.1.2.4') and results[k].endpoint == f"{hivemind.LOCALHOST}:1337"
- def test_dht_single_node():
- node = hivemind.DHT(start=True)
- assert node.first_k_active(['e.3', 'e.2'], k=3) == {}
- assert node.get_experts(['e.3', 'e.2']) == [None, None]
- assert all(node.declare_experts(['e.1', 'e.2', 'e.3'], f"{hivemind.LOCALHOST}:1337"))
- for expert in node.get_experts(['e.3', 'e.2']):
- assert expert.endpoint == f"{hivemind.LOCALHOST}:1337"
- active_found = node.first_k_active(['e.0', 'e.1', 'e.3', 'e.5', 'e.2'], k=2)
- assert list(active_found.keys()) == ['e.1', 'e.3']
- assert all(expert.uid.startswith(prefix) for prefix, expert in active_found.items())
- assert all(node.declare_experts(['e.1', 'e.2', 'e.3'], f"{hivemind.LOCALHOST}:1337"))
- assert node.find_best_experts('e', [(0., 1., 2., 3., 4., 5., 6., 7., 8.)], beam_size=4)
|