timed_storage.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. """ A dictionary-like storage that stores items until a specified expiration time or up to a limited size """
  2. from __future__ import annotations
  3. import heapq
  4. import time
  5. from contextlib import contextmanager
  6. from dataclasses import dataclass
  7. from typing import Dict, Generic, Iterator, List, Optional, Tuple, TypeVar
  8. KeyType = TypeVar("KeyType")
  9. ValueType = TypeVar("ValueType")
  10. get_dht_time = time.time # a global (weakly synchronized) time
  11. MAX_DHT_TIME_DISCREPANCY_SECONDS = 3 # max allowed difference between get_dht_time for two DHT nodes
  12. DHTExpiration = float
  13. ROOT = 0
  14. @dataclass(init=True, repr=True, frozen=True)
  15. class ValueWithExpiration(Generic[ValueType]):
  16. value: ValueType
  17. expiration_time: DHTExpiration
  18. def __iter__(self):
  19. return iter((self.value, self.expiration_time))
  20. def __getitem__(self, item):
  21. if item == 0:
  22. return self.value
  23. elif item == 1:
  24. return self.expiration_time
  25. else:
  26. return getattr(self, item)
  27. def __eq__(self, item):
  28. if isinstance(item, ValueWithExpiration):
  29. return self.value == item.value and self.expiration_time == item.expiration_time
  30. elif isinstance(item, tuple):
  31. return tuple.__eq__((self.value, self.expiration_time), item)
  32. else:
  33. return False
  34. @dataclass(init=True, repr=True, order=True, frozen=True)
  35. class HeapEntry(Generic[KeyType]):
  36. expiration_time: DHTExpiration
  37. key: KeyType
  38. class TimedStorage(Generic[KeyType, ValueType]):
  39. """A dictionary that maintains up to :maxsize: key-value-expiration tuples until their expiration_time"""
  40. frozen = False # can be set to True. If true, do not remove outdated elements
  41. def __init__(self, maxsize: Optional[int] = None):
  42. self.maxsize = maxsize or float("inf")
  43. self.data: Dict[KeyType, ValueWithExpiration[ValueType]] = dict()
  44. self.expiration_heap: List[HeapEntry[KeyType]] = []
  45. self.key_to_heap: Dict[KeyType, HeapEntry[KeyType]] = dict()
  46. def _remove_outdated(self):
  47. while (
  48. not self.frozen
  49. and self.expiration_heap
  50. and (self.expiration_heap[ROOT].expiration_time < get_dht_time() or len(self.data) > self.maxsize)
  51. ):
  52. heap_entry = heapq.heappop(self.expiration_heap)
  53. if self.key_to_heap.get(heap_entry.key) == heap_entry:
  54. del self.data[heap_entry.key], self.key_to_heap[heap_entry.key]
  55. def store(self, key: KeyType, value: ValueType, expiration_time: DHTExpiration) -> bool:
  56. """
  57. Store a (key, value) pair locally at least until expiration_time. See class docstring for details.
  58. :returns: True if new value was stored, False it was rejected (current value is newer)
  59. """
  60. if expiration_time < get_dht_time() and not self.frozen:
  61. return False
  62. self.key_to_heap[key] = HeapEntry(expiration_time, key)
  63. heapq.heappush(self.expiration_heap, self.key_to_heap[key])
  64. if key in self.data:
  65. if self.data[key].expiration_time < expiration_time:
  66. self.data[key] = ValueWithExpiration(value, expiration_time)
  67. return True
  68. return False
  69. self.data[key] = ValueWithExpiration(value, expiration_time)
  70. self._remove_outdated()
  71. return True
  72. def get(self, key: KeyType) -> Optional[ValueWithExpiration[ValueType]]:
  73. """Get a value corresponding to a key if that (key, value) pair was previously stored under this key."""
  74. self._remove_outdated()
  75. if key in self.data:
  76. return self.data[key]
  77. return None
  78. def items(self) -> Iterator[Tuple[KeyType, ValueWithExpiration[ValueType]]]:
  79. """Iterate over (key, value, expiration_time) tuples stored in this storage"""
  80. self._remove_outdated()
  81. return ((key, value_and_expiration) for key, value_and_expiration in self.data.items())
  82. def top(self) -> Tuple[Optional[KeyType], Optional[ValueWithExpiration[ValueType]]]:
  83. """Return the entry with earliest expiration or None if there isn't any"""
  84. self._remove_outdated()
  85. if self.data:
  86. # skip leftover "ghost" entries until first real entry
  87. while self.key_to_heap.get(self.expiration_heap[ROOT].key) != self.expiration_heap[ROOT]:
  88. heapq.heappop(self.expiration_heap)
  89. top_key = self.expiration_heap[ROOT].key
  90. return top_key, self.data[top_key]
  91. return None, None
  92. def clear(self):
  93. self.data.clear()
  94. self.key_to_heap.clear()
  95. self.expiration_heap.clear()
  96. def __contains__(self, key: KeyType):
  97. self._remove_outdated()
  98. return key in self.data
  99. def __len__(self):
  100. self._remove_outdated()
  101. return len(self.data)
  102. def __delitem__(self, key: KeyType):
  103. if key in self.key_to_heap:
  104. del self.data[key], self.key_to_heap[key]
  105. # note: key may still be in self.expiration_heap, but it will not be used and eventually ._remove_outdated()
  106. def __bool__(self):
  107. return bool(self.data)
  108. def __repr__(self):
  109. return f"{self.__class__.__name__}({self.data})"
  110. @contextmanager
  111. def freeze(self):
  112. """Temporarily cease to ._remove_outdated() elements inside this context to ensure consistency"""
  113. prev_frozen, self.frozen = self.frozen, True
  114. try:
  115. yield self
  116. finally:
  117. self.frozen = prev_frozen