# performance_ema.py
import time
from contextlib import contextmanager
from threading import Lock
from typing import Optional
  5. class PerformanceEMA:
  6. """
  7. A running estimate of performance (operations/sec) using adjusted exponential moving average
  8. :param alpha: Smoothing factor in range [0, 1], [default: 0.1].
  9. """
  10. def __init__(self, alpha: float = 0.1, eps: float = 1e-20, paused: bool = False):
  11. self.alpha, self.eps, self.num_updates = alpha, eps, 0
  12. self.ema_seconds_per_sample, self.samples_per_second = 0, eps
  13. self.timestamp = time.perf_counter()
  14. self.paused = paused
  15. self.lock = Lock()
  16. def update(self, task_size: float, interval: Optional[float] = None) -> float:
  17. """
  18. :param task_size: how many items were processed since last call
  19. :param interval: optionally provide the time delta it took to process this task
  20. :returns: current estimate of performance (samples per second), but at most
  21. """
  22. assert task_size > 0, f"Can't register processing {task_size} samples"
  23. if not self.paused:
  24. self.timestamp, old_timestamp = time.perf_counter(), self.timestamp
  25. interval = interval if interval is not None else self.timestamp - old_timestamp
  26. else:
  27. assert interval is not None, "If PerformanceEMA is paused, please specify the time interval"
  28. self.ema_seconds_per_sample = (
  29. self.alpha * interval / task_size + (1 - self.alpha) * self.ema_seconds_per_sample
  30. )
  31. self.num_updates += 1
  32. adjusted_seconds_per_sample = self.ema_seconds_per_sample / (1 - (1 - self.alpha) ** self.num_updates)
  33. self.samples_per_second = 1 / max(adjusted_seconds_per_sample, self.eps)
  34. return self.samples_per_second
  35. def reset_timer(self):
  36. """Reset the time since the last update so that the next task performance is counted from current time"""
  37. self.timestamp = time.perf_counter()
  38. @contextmanager
  39. def pause(self):
  40. """While inside this context, EMA will not count the time passed towards the performance estimate"""
  41. self.paused, was_paused = True, self.paused
  42. try:
  43. yield
  44. finally:
  45. self.paused = was_paused
  46. self.reset_timer()
  47. def __repr__(self):
  48. return f"{self.__class__.__name__}(ema={self.samples_per_second:.5f}, num_updates={self.num_updates})"
  49. @contextmanager
  50. def update_threadsafe(self, task_size: float):
  51. """
  52. Update the EMA throughput of a code that runs inside the context manager, supports multiple concurrent threads.
  53. :param task_size: how many items were processed since last call
  54. """
  55. start_timestamp = time.perf_counter()
  56. yield
  57. with self.lock:
  58. self.update(task_size, interval=time.perf_counter() - max(start_timestamp, self.timestamp))
  59. # note: we define interval as such to support two distinct scenarios:
  60. # (1) if this is the first call to measure_threadsafe after a pause, count time from entering this context
  61. # (2) if there are concurrent calls to measure_threadsafe, respect the timestamp updates from these calls