Prechádzať zdrojové kódy

log number semaphores

Denis Mazur 4 rokov pred
rodič
commit
aa80ca7c60
1 zmenil súbory, kde vykonal 24 pridanie a 0 odobranie
  1. 24 0
      hivemind/dht/protocol.py

+ 24 - 0
hivemind/dht/protocol.py

@@ -21,6 +21,9 @@ from hivemind.utils.timed_storage import (
 logger = get_logger(__name__)
 
 
+N_SEMAPHORES = 0
+
+
 class DHTProtocol(ServicerBase):
     # fmt:off
     p2p: P2P
@@ -101,10 +104,14 @@ class DHTProtocol(ServicerBase):
         """
         try:
             async with self.rpc_semaphore:
+                global N_SEMAPHORES
+                N_SEMAPHORES += 1
+                print(N_SEMAPHORES)
                 ping_request = dht_pb2.PingRequest(peer=self.node_info, validate=validate)
                 time_requested = get_dht_time()
                 response = await self.get_stub(peer).rpc_ping(ping_request, timeout=self.wait_timeout)
                 time_responded = get_dht_time()
+                N_SEMAPHORES -= 1
         except Exception as e:
             logger.debug(f"DHTProtocol failed to ping {peer}", exc_info=True)
             response = None
@@ -139,6 +146,8 @@ class DHTProtocol(ServicerBase):
     async def rpc_ping(self, request: dht_pb2.PingRequest, context: P2PContext) -> dht_pb2.PingResponse:
         """Some node wants us to add it to our routing table."""
 
+        await asyncio.sleep(0.1)
+
         response = dht_pb2.PingResponse(peer=self.node_info, dht_time=get_dht_time(), available=False)
 
         if request.peer and request.peer.node_id:
@@ -215,7 +224,13 @@ class DHTProtocol(ServicerBase):
         )
         try:
             async with self.rpc_semaphore:
+                global N_SEMAPHORES
+                N_SEMAPHORES += 1
+                print(N_SEMAPHORES)
+
                 response = await self.get_stub(peer).rpc_store(store_request, timeout=self.wait_timeout)
+
+                N_SEMAPHORES -=1
             if response.peer and response.peer.node_id:
                 peer_id = DHTID.from_bytes(response.peer.node_id)
                 asyncio.create_task(self.update_routing_table(peer_id, peer, responded=True))
@@ -227,6 +242,8 @@ class DHTProtocol(ServicerBase):
 
     async def rpc_store(self, request: dht_pb2.StoreRequest, context: P2PContext) -> dht_pb2.StoreResponse:
         """Some node wants us to store this (key, value) pair"""
+        await asyncio.sleep(0.1)
+
         if request.peer:  # if requested, add peer to the routing table
             asyncio.create_task(self.rpc_ping(dht_pb2.PingRequest(peer=request.peer), context))
         assert len(request.keys) == len(request.values) == len(request.expiration_time) == len(request.in_cache)
@@ -284,7 +301,13 @@ class DHTProtocol(ServicerBase):
         find_request = dht_pb2.FindRequest(keys=list(map(DHTID.to_bytes, keys)), peer=self.node_info)
         try:
             async with self.rpc_semaphore:
+                global N_SEMAPHORES
+                N_SEMAPHORES += 1
+                print(N_SEMAPHORES)
+
                 response = await self.get_stub(peer).rpc_find(find_request, timeout=self.wait_timeout)
+
+                N_SEMAPHORES -= 1
             if response.peer and response.peer.node_id:
                 peer_id = DHTID.from_bytes(response.peer.node_id)
                 asyncio.create_task(self.update_routing_table(peer_id, peer, responded=True))
@@ -330,6 +353,7 @@ class DHTProtocol(ServicerBase):
         Someone wants to find keys in the DHT. For all keys that we have locally, return value and expiration
         Also return :bucket_size: nearest neighbors from our routing table for each key (whether or not we found value)
         """
+        await asyncio.sleep(0.1)
         if request.peer:  # if requested, add peer to the routing table
             asyncio.create_task(self.rpc_ping(dht_pb2.PingRequest(peer=request.peer), context))