storage.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from __future__ import annotations
  2. from typing import Optional, Union
  3. from hivemind.dht.routing import DHTID, BinaryDHTValue, Subkey
  4. from hivemind.utils.serializer import MSGPackSerializer
  5. from hivemind.utils.timed_storage import DHTExpiration, KeyType, TimedStorage, ValueType
  6. @MSGPackSerializer.ext_serializable(0x50)
  7. class DictionaryDHTValue(TimedStorage[Subkey, BinaryDHTValue]):
  8. """a dictionary-like DHT value type that maps sub-keys to values with individual expirations"""
  9. latest_expiration_time = float("-inf")
  10. def store(self, key: KeyType, value: ValueType, expiration_time: DHTExpiration) -> bool:
  11. self.latest_expiration_time = max(self.latest_expiration_time, expiration_time)
  12. return super().store(key, value, expiration_time)
  13. def packb(self) -> bytes:
  14. """custom behavior for MSGPackSerializer.dumps"""
  15. packed_items = [[key, value, expiration_time] for key, (value, expiration_time) in self.items()]
  16. return MSGPackSerializer.dumps([self.maxsize, self.latest_expiration_time, packed_items])
  17. @classmethod
  18. def unpackb(cls, raw: bytes) -> DictionaryDHTValue:
  19. maxsize, latest_expiration_time, items = MSGPackSerializer.loads(raw)
  20. with DictionaryDHTValue(maxsize).freeze() as new_dict:
  21. for key, value, expiration_time in items:
  22. new_dict.store(key, value, expiration_time)
  23. new_dict.latest_expiration_time = latest_expiration_time
  24. return new_dict
  25. class DHTLocalStorage(TimedStorage[DHTID, Union[BinaryDHTValue, DictionaryDHTValue]]):
  26. """A dictionary-like storage that can store binary values and/or nested dictionaries until expiration"""
  27. def store(
  28. self, key: DHTID, value: BinaryDHTValue, expiration_time: DHTExpiration, subkey: Optional[Subkey] = None
  29. ) -> bool:
  30. """
  31. Store a (key, value) pair locally at least until expiration_time. See class docstring for details.
  32. If subkey is not None, adds a subkey-value pair to a dictionary associated with :key: (see store_subkey below)
  33. :returns: True if new value was stored, False it was rejected (current value is newer)
  34. """
  35. if subkey is not None: # add one sub-key
  36. return self.store_subkey(key, subkey, value, expiration_time)
  37. else: # store regular key
  38. return super().store(key, value, expiration_time)
  39. def store_subkey(self, key: DHTID, subkey: Subkey, value: BinaryDHTValue, expiration_time: DHTExpiration) -> bool:
  40. """
  41. Save a (sub-key, value) into a dictionary associated with a given key.
  42. 1) if self[key] is empty, create a new dictionary and add sub-key there
  43. 2) if self[key] is a dictionary (DictionaryDHTValue), store {sub-key: value, expiration} to that storage
  44. 3) if self[key] is a normal value with smaller expiration time, overwrite it with a dictionary and add sub-key
  45. :returns: True if new entry was stored, False it was rejected (current value is newer)
  46. """
  47. previous_value, previous_expiration_time = self.get(key) or (b"", -float("inf"))
  48. if isinstance(previous_value, BinaryDHTValue) and expiration_time > previous_expiration_time:
  49. new_storage = DictionaryDHTValue()
  50. new_storage.store(subkey, value, expiration_time)
  51. return super().store(key, new_storage, new_storage.latest_expiration_time)
  52. elif isinstance(previous_value, DictionaryDHTValue):
  53. if expiration_time > previous_value.latest_expiration_time:
  54. super().store(key, previous_value, expiration_time) # refresh expiration time
  55. return previous_value.store(subkey, value, expiration_time)
  56. else:
  57. return False