|
@@ -1,6 +1,7 @@
|
|
from __future__ import annotations
|
|
from __future__ import annotations
|
|
|
|
|
|
import asyncio
|
|
import asyncio
|
|
|
|
+import dataclasses
|
|
import random
|
|
import random
|
|
from collections import defaultdict, Counter
|
|
from collections import defaultdict, Counter
|
|
from dataclasses import dataclass, field
|
|
from dataclasses import dataclass, field
|
|
@@ -303,14 +304,17 @@ class DHTNode:
|
|
|
|
|
|
key_bytes = key_id.to_bytes()
|
|
key_bytes = key_id.to_bytes()
|
|
binary_values = []
|
|
binary_values = []
|
|
|
|
+ stored_records = []
|
|
for subkey, value, expiration_time in zip(
|
|
for subkey, value, expiration_time in zip(
|
|
current_subkeys, current_values, current_expirations):
|
|
current_subkeys, current_values, current_expirations):
|
|
|
|
+ subkey_bytes = self.protocol.serializer.dumps(subkey)
|
|
value_bytes = self.protocol.serializer.dumps(value)
|
|
value_bytes = self.protocol.serializer.dumps(value)
|
|
|
|
+ record = DHTRecord(key_bytes, subkey_bytes, value_bytes, expiration_time)
|
|
if self.protocol.record_validator is not None:
|
|
if self.protocol.record_validator is not None:
|
|
- subkey_bytes = self.protocol.serializer.dumps(subkey)
|
|
|
|
- record = DHTRecord(key_bytes, subkey_bytes, value_bytes, expiration_time)
|
|
|
|
value_bytes = self.protocol.record_validator.sign_value(record)
|
|
value_bytes = self.protocol.record_validator.sign_value(record)
|
|
|
|
+ record = dataclasses.replace(record, value=value_bytes)
|
|
binary_values.append(value_bytes)
|
|
binary_values.append(value_bytes)
|
|
|
|
+ stored_records.append(record)
|
|
|
|
|
|
while num_successful_stores < self.num_replicas and (store_candidates or pending_store_tasks):
|
|
while num_successful_stores < self.num_replicas and (store_candidates or pending_store_tasks):
|
|
while store_candidates and num_successful_stores + len(pending_store_tasks) < self.num_replicas:
|
|
while store_candidates and num_successful_stores + len(pending_store_tasks) < self.num_replicas:
|
|
@@ -318,9 +322,13 @@ class DHTNode:
|
|
|
|
|
|
if node_id == self.node_id:
|
|
if node_id == self.node_id:
|
|
num_successful_stores += 1
|
|
num_successful_stores += 1
|
|
- for subkey, value, expiration_time in zip(current_subkeys, binary_values, current_expirations):
|
|
|
|
- store_ok[original_key, subkey] = self.protocol.storage.store(
|
|
|
|
- key_id, value, expiration_time, subkey=subkey)
|
|
|
|
|
|
+ for subkey, record in zip(current_subkeys, stored_records):
|
|
|
|
+ if (self.protocol.record_validator is None or
|
|
|
|
+ self.protocol.record_validator.validate(record)):
|
|
|
|
+ store_ok[original_key, subkey] = self.protocol.storage.store(
|
|
|
|
+ key_id, record.value, record.expiration_time, subkey=subkey)
|
|
|
|
+ else:
|
|
|
|
+ store_ok[original_key, subkey] = False
|
|
if not await_all_replicas:
|
|
if not await_all_replicas:
|
|
store_finished_events[original_key, subkey].set()
|
|
store_finished_events[original_key, subkey].set()
|
|
else:
|
|
else:
|
|
@@ -364,15 +372,19 @@ class DHTNode:
|
|
store_succeeded = any(store_ok)
|
|
store_succeeded = any(store_ok)
|
|
is_dictionary = any(subkey is not None for subkey in subkeys)
|
|
is_dictionary = any(subkey is not None for subkey in subkeys)
|
|
if store_succeeded and not is_dictionary: # stored a new regular value, cache it!
|
|
if store_succeeded and not is_dictionary: # stored a new regular value, cache it!
|
|
- stored_value_bytes, stored_expiration = max(zip(binary_values, expirations), key=lambda p: p[1])
|
|
|
|
|
|
+ stored_expiration, stored_value_bytes = max(zip(expirations, binary_values))
|
|
self.protocol.cache.store(key_id, stored_value_bytes, stored_expiration)
|
|
self.protocol.cache.store(key_id, stored_value_bytes, stored_expiration)
|
|
elif not store_succeeded and not is_dictionary: # store rejected, check if local cache is also obsolete
|
|
elif not store_succeeded and not is_dictionary: # store rejected, check if local cache is also obsolete
|
|
- rejected_value, rejected_expiration = max(zip(binary_values, expirations), key=lambda p: p[1])
|
|
|
|
- if (self.protocol.cache.get(key_id)[1] or float("inf")) <= rejected_expiration: # cache would be rejected
|
|
|
|
|
|
+ rejected_expiration, rejected_value = max(zip(expirations, binary_values))
|
|
|
|
+ cached_value = self.protocol.cache.get(key_id)
|
|
|
|
+ if (cached_value is not None and
|
|
|
|
+ cached_value.expiration_time <= rejected_expiration): # cache would be rejected
|
|
self._schedule_for_refresh(key_id, refresh_time=get_dht_time()) # fetch new key in background (asap)
|
|
self._schedule_for_refresh(key_id, refresh_time=get_dht_time()) # fetch new key in background (asap)
|
|
elif is_dictionary and key_id in self.protocol.cache: # there can be other keys and we should update
|
|
elif is_dictionary and key_id in self.protocol.cache: # there can be other keys and we should update
|
|
- for subkey, stored_value_bytes, expiration_time in zip(subkeys, binary_values, expirations):
|
|
|
|
- self.protocol.cache.store_subkey(key_id, subkey, stored_value_bytes, expiration_time)
|
|
|
|
|
|
+ for subkey, stored_value_bytes, expiration_time, accepted in zip(
|
|
|
|
+ subkeys, binary_values, expirations, store_ok):
|
|
|
|
+ if accepted:
|
|
|
|
+ self.protocol.cache.store_subkey(key_id, subkey, stored_value_bytes, expiration_time)
|
|
self._schedule_for_refresh(key_id, refresh_time=get_dht_time()) # fetch new key in background (asap)
|
|
self._schedule_for_refresh(key_id, refresh_time=get_dht_time()) # fetch new key in background (asap)
|
|
|
|
|
|
async def get(self, key: DHTKey, latest=False, **kwargs) -> Optional[ValueWithExpiration[DHTValue]]:
|
|
async def get(self, key: DHTKey, latest=False, **kwargs) -> Optional[ValueWithExpiration[DHTValue]]:
|