123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- import asyncio
- import pytest
- import hivemind
- from concurrent.futures import CancelledError
- def test_mpfuture_result():
- f1, f2 = hivemind.MPFuture.make_pair()
- f1.set_result(321)
- assert f2.result() == 321
- assert f1.result() == 321
- for future in [f1, f2]:
- with pytest.raises(RuntimeError):
- future.set_result(123)
- with pytest.raises(RuntimeError):
- future.set_exception(ValueError())
- assert future.cancel() is False
- assert future.done() and not future.running() and not future.cancelled()
- f1, f2 = hivemind.MPFuture.make_pair()
- with pytest.raises(TimeoutError):
- f1.result(timeout=1e-3)
- f2.set_result(['abacaba', 123])
- assert f1.result() == ['abacaba', 123]
- def test_mpfuture_exception():
- f1, f2 = hivemind.MPFuture.make_pair()
- with pytest.raises(TimeoutError):
- f1.exception(timeout=1e-3)
- f2.set_exception(NotImplementedError())
- for future in [f1, f2]:
- assert isinstance(future.exception(), NotImplementedError)
- with pytest.raises(NotImplementedError):
- future.result()
- assert future.cancel() is False
- assert future.done() and not future.running() and not future.cancelled()
- def test_mpfuture_cancel():
- f1, f2 = hivemind.MPFuture.make_pair()
- assert not f2.cancelled()
- f1.cancel()
- for future in [f1, f2]:
- with pytest.raises(CancelledError):
- future.result()
- with pytest.raises(CancelledError):
- future.exception()
- with pytest.raises(RuntimeError):
- future.set_result(123)
- with pytest.raises(RuntimeError):
- future.set_exception(NotImplementedError)
- assert future.cancelled() and future.done() and not future.running()
- def test_mpfuture_status():
- f1, f2 = hivemind.MPFuture.make_pair()
- assert f1.set_running_or_notify_cancel() is True
- for future in [f1, f2]:
- assert future.running() and not future.done() and not future.cancelled()
- with pytest.raises(RuntimeError):
- future.set_running_or_notify_cancel()
- f2.cancel()
- for future in [f1, f2]:
- assert not future.running() and future.done() and future.cancelled()
- assert future.set_running_or_notify_cancel() is False
- f1, f2 = hivemind.MPFuture.make_pair()
- f1.cancel()
- for future in [f1, f2]:
- assert future.set_running_or_notify_cancel() is False
- def test_await_mpfuture():
- async def _run():
- # await result
- f1, f2 = hivemind.MPFuture.make_pair()
- async def wait_and_assign():
- assert f2.set_running_or_notify_cancel() is True
- await asyncio.sleep(0.1)
- f2.set_result((123, 'ololo'))
- asyncio.create_task(wait_and_assign())
- for future in [f1, f2]:
- res = await future
- assert res == (123, 'ololo')
- # await cancel
- f1, f2 = hivemind.MPFuture.make_pair()
- async def wait_and_cancel():
- await asyncio.sleep(0.1)
- f1.cancel()
- asyncio.create_task(wait_and_cancel())
- for future in [f1, f2]:
- with pytest.raises(CancelledError):
- await future
- # await exception
- f1, f2 = hivemind.MPFuture.make_pair()
- async def wait_and_raise():
- await asyncio.sleep(0.1)
- f1.set_exception(SystemError())
- asyncio.create_task(wait_and_raise())
- for future in [f1, f2]:
- with pytest.raises(SystemError):
- await future
- asyncio.new_event_loop().run_until_complete(_run())
|