threading.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import time
  2. from concurrent.futures import Future, TimeoutError
  3. from itertools import count
  4. from threading import Thread, Event, Lock
  5. def run_in_background(func: callable, *args, **kwargs) -> Future:
  6. """ run f(*args, **kwargs) in background and return Future for its outputs """
  7. future = Future()
  8. def _run():
  9. try:
  10. future.set_result(func(*args, **kwargs))
  11. except Exception as e:
  12. future.set_exception(e)
  13. Thread(target=_run).start()
  14. return future
  15. def repeated(func: callable, n_times=None):
  16. """ A function that runs a :func: forever or for a specified number of times; use with run_run_in_background """
  17. def repeat():
  18. for i in count():
  19. if n_times is not None and i > n_times:
  20. break
  21. func()
  22. return repeat
  23. def add_event_callback(event: Event, callback, timeout=None):
  24. """ Add callback that will be executed asynchronously when event is set """
  25. return Thread(target=lambda: (event.wait(timeout), callback())).start()
  26. class CountdownEvent(Event):
  27. def __init__(self, count_to: int, initial=0):
  28. """ An event that must be incremented :count_to: times before it is considered set """
  29. super().__init__()
  30. self.value = initial
  31. self.count_to = count_to
  32. self.lock = Lock()
  33. self.increment(by=0) # trigger set/unset depending on initial value
  34. def increment(self, by=1):
  35. with self.lock:
  36. self.value += by
  37. if self.value >= self.count_to:
  38. super().set()
  39. else:
  40. super().clear()
  41. return self.value
  42. def clear(self):
  43. return self.increment(by=-self.value)
  44. def await_first(*events: Event, k=1, timeout=None):
  45. """
  46. wait until first k (default=1) events are set, return True if event was set fast
  47. # Note: after k successes we manually *set* all events to avoid memory leak.
  48. """
  49. events_done = CountdownEvent(count_to=k)
  50. for event in events:
  51. add_event_callback(event, callback=events_done.increment, timeout=timeout)
  52. if events_done.wait(timeout=timeout):
  53. [event.set() for event in events]
  54. return True
  55. else:
  56. raise TimeoutError()
  57. def run_and_await_k(jobs: callable, k, timeout_after_k=0, timeout_total=None):
  58. """
  59. Runs all :jobs: asynchronously, awaits for at least k of them to finish
  60. :param jobs: functions to call
  61. :param k: how many functions should finish
  62. :param timeout_after_k: after reaching k finished jobs, wait for this long before cancelling
  63. :param timeout_total: if specified, terminate cancel jobs after this many seconds
  64. :returns: a list of either results or exceptions for each job
  65. """
  66. assert k <= len(jobs)
  67. start_time = time.time()
  68. min_successful_jobs = CountdownEvent(count_to=k)
  69. max_failed_jobs = CountdownEvent(count_to=len(jobs) - k + 1)
  70. def _run_and_increment(run_job: callable):
  71. try:
  72. result = run_job()
  73. min_successful_jobs.increment()
  74. return result
  75. except Exception as e:
  76. max_failed_jobs.increment()
  77. return e
  78. def _run_and_await(run_job: callable):
  79. # call function asynchronously. Increment counter after finished
  80. future = run_in_background(_run_and_increment, run_job)
  81. try: # await for success counter to reach k OR for fail counter to reach n - k + 1
  82. await_first(min_successful_jobs, max_failed_jobs,
  83. timeout=None if timeout_total is None else timeout_total - time.time() + start_time)
  84. except TimeoutError as e: # counter didn't reach k jobs in timeout_total
  85. return future.result() if future.done() else e
  86. try: # await for subsequent jobs if asked to
  87. return future.result(timeout=timeout_after_k)
  88. except TimeoutError as e:
  89. future.cancel()
  90. return e
  91. except Exception as e: # job failed with exception. Ignore it.
  92. return e
  93. results = [run_in_background(_run_and_await, f) for f in jobs]
  94. results = [result.result() for result in results]
  95. if min_successful_jobs.is_set():
  96. return results
  97. elif max_failed_jobs.is_set():
  98. raise ValueError("Could not get enough results: too many jobs failed.")
  99. else:
  100. raise TimeoutError("Could not get enough results: reached timeout_total.")