from __future__ import annotations

import contextvars
import functools
import gc
import sys
import threading
import time
import types
import weakref
from contextlib import (
    AsyncExitStack,
    ExitStack,
    asynccontextmanager,
    contextmanager,
    suppress,
)
from math import inf, nan
from typing import TYPE_CHECKING, NoReturn, TypeVar
from unittest import mock

import outcome
import pytest
import sniffio

from ... import _core
from ..._threads import to_thread_run_sync
from ..._timeouts import fail_after, sleep
from ...testing import (
    Matcher,
    RaisesGroup,
    Sequencer,
    assert_checkpoints,
    wait_all_tasks_blocked,
)
from .._run import DEADLINE_HEAP_MIN_PRUNE_THRESHOLD, _count_context_run_tb_frames
from .tutil import (
    check_sequence_matches,
    create_asyncio_future_in_new_loop,
    gc_collect_harder,
    ignore_coroutine_never_awaited_warnings,
    restore_unraisablehook,
    slow,
)

if TYPE_CHECKING:
    from collections.abc import (
        AsyncGenerator,
        AsyncIterator,
        Awaitable,
        Callable,
        Generator,
    )

if sys.version_info < (3, 11):
    from exceptiongroup import BaseExceptionGroup, ExceptionGroup


T = TypeVar("T")


# Unlike _timeouts.sleep_forever, this variant returns the value it's
# rescheduled with, which is really only useful for tests of rescheduling.
async def sleep_forever() -> object:
    """Sleep via ``wait_task_rescheduled`` and return whatever it returns.

    The abort callback always answers ``Abort.SUCCEEDED``.
    """

    def accept_abort(_: object) -> _core.Abort:
        return _core.Abort.SUCCEEDED

    return await _core.wait_task_rescheduled(accept_abort)


def not_none(x: T | None) -> T:
    """Return ``x`` unchanged after asserting that it is not ``None``.

    This exists purely so type checkers can narrow ``T | None`` to ``T``; if
    the assertion ever trips, the test calling this helper is broken.
    """
    assert x is not None
    narrowed: T = x
    return narrowed


def test_basic() -> None:
    """``run`` returns the async function's result and rejects bad calls."""

    async def echo(value: T) -> T:
        return value

    async def echo_after_checkpoint(value: T) -> T:
        await _core.checkpoint()
        return value

    result = _core.run(echo, 8)
    assert result == 8

    # Leaving out the required positional argument is a TypeError.
    with pytest.raises(TypeError):
        _core.run(echo)  # type: ignore[arg-type]

    # So is passing something that is not an async function.
    with pytest.raises(TypeError):
        _core.run(lambda: None)  # type: ignore

    result = _core.run(echo_after_checkpoint, 1)
    assert result == 1


def test_initial_task_error() -> None:
    """An exception raised by the initial task propagates out of ``run``."""

    async def failing_main(payload: object) -> NoReturn:
        raise ValueError(payload)

    with pytest.raises(ValueError, match=r"^17$") as caught:
        _core.run(failing_main, 17)

    raised_args = caught.value.args
    assert raised_args == (17,)


def test_run_nesting() -> None:
    """Calling ``run`` from inside a running task raises ``RuntimeError``."""

    async def outer_main() -> None:
        async def nested_main() -> None:  # pragma: no cover
            pass

        return _core.run(nested_main)

    with pytest.raises(RuntimeError) as caught:
        _core.run(outer_main)

    message = str(caught.value)
    assert "from inside" in message


async def test_nursery_warn_use_async_with() -> None:
    """Entering a nursery with a plain ``with`` raises a helpful error."""
    nursery_cm = _core.open_nursery()

    with pytest.raises(RuntimeError) as caught:
        with nursery_cm:  # type: ignore
            pass  # pragma: no cover

    expected = (
        r"use 'async with open_nursery\(...\)', not 'with open_nursery\(...\)'"
    )
    caught.match(expected)

    # Enter the manager properly afterwards, to avoid an unawaited coroutine.
    async with nursery_cm:
        pass


async def test_nursery_main_block_error_basic() -> None:
    """An error raised in the nursery body comes out wrapped in a group."""
    boom = ValueError("whoops")

    def is_boom(candidate: BaseException) -> bool:
        return candidate is boom

    with RaisesGroup(Matcher(check=is_boom)):
        async with _core.open_nursery():
            raise boom


async def test_child_crash_basic() -> None:
    """A child's exception reaches the parent, wrapped in a group."""
    failure = ValueError("uh oh")

    async def erroring() -> NoReturn:
        raise failure

    def is_failure(candidate: BaseException) -> bool:
        return candidate is failure

    with RaisesGroup(Matcher(check=is_failure)):
        # The child's exception is handed back to the parent by
        # nursery.__aexit__
        async with _core.open_nursery() as nursery:
            nursery.start_soon(erroring)


async def test_basic_interleave() -> None:
    """Two looping tasks interleave their iterations."""
    events: list[tuple[str, int]] = []

    async def looper(label: str, log: list[tuple[str, int]]) -> None:
        for step in range(3):
            log.append((label, step))
            await _core.checkpoint()

    async with _core.open_nursery() as nursery:
        for label in ("a", "b"):
            nursery.start_soon(looper, label, events)

    # Within each round the two entries may appear in either order.
    expected = [{("a", step), ("b", step)} for step in range(3)]
    check_sequence_matches(events, expected)


def test_task_crash_propagation() -> None:
    """A crashing child gets its sibling cancelled; the error leaves ``run``."""
    sibling_events: list[str] = []

    async def crasher() -> NoReturn:
        raise ValueError("argh")

    async def looper() -> None:
        # Spins on checkpoints until it sees a Cancelled, then records it.
        try:
            while True:
                await _core.checkpoint()
        except _core.Cancelled:
            print("looper cancelled")
            sibling_events.append("cancelled")

    async def main() -> None:
        async with _core.open_nursery() as nursery:
            nursery.start_soon(looper)
            nursery.start_soon(crasher)

    expected_error = Matcher(ValueError, "^argh$")
    with RaisesGroup(expected_error):
        _core.run(main)

    assert sibling_events == ["cancelled"]


def test_main_and_task_both_crash() -> None:
    """Errors from the nursery body and from a child are both reported."""

    # When main and a child task both fail, the resulting ExceptionGroup
    # carries both exceptions.
    async def failing_child() -> NoReturn:
        raise ValueError

    async def failing_main() -> NoReturn:
        async with _core.open_nursery() as nursery:
            nursery.start_soon(failing_child)
            raise KeyError

    with RaisesGroup(ValueError, KeyError):
        _core.run(failing_main)


def test_two_child_crashes() -> None:
    """Two failing children produce a group holding both exceptions."""

    async def raise_it(exc_type: type[Exception]) -> NoReturn:
        raise exc_type

    async def spawn_both() -> None:
        async with _core.open_nursery() as nursery:
            nursery.start_soon(raise_it, KeyError)
            nursery.start_soon(raise_it, ValueError)

    with RaisesGroup(ValueError, KeyError):
        _core.run(spawn_both)


async def test_child_crash_wakes_parent() -> None:
    """A child's crash interrupts a parent sleeping in the nursery body."""

    async def failing_child() -> NoReturn:
        raise ValueError("this is a crash")

    expected_error = Matcher(ValueError, "^this is a crash$")
    with RaisesGroup(expected_error):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(failing_child)
            await sleep_forever()


async def test_reschedule() -> None:
    """Exercise ``reschedule`` with both a value and an error outcome.

    ``child1`` goes to sleep first. ``child2`` wakes it with ``Value(0)`` and
    then sleeps itself; ``child1`` in turn wakes ``child2`` with an
    ``Error(ValueError)``, which ``child2`` expects to see raised.
    """
    # Filled in by the children so that each can reschedule the other.
    t1: _core.Task | None = None
    t2: _core.Task | None = None

    async def child1() -> None:
        nonlocal t1, t2
        t1 = _core.current_task()
        print("child1 start")
        # Sleeps until child2 reschedules this task with Value(0).
        x = await sleep_forever()
        print("child1 woke")
        assert x == 0
        print("child1 rescheduling t2")
        # Wake child2 with an error outcome instead of a value.
        _core.reschedule(not_none(t2), outcome.Error(ValueError("error message")))
        print("child1 exit")

    async def child2() -> None:
        nonlocal t1, t2
        print("child2 start")
        t2 = _core.current_task()
        _core.reschedule(not_none(t1), outcome.Value(0))
        print("child2 sleep")
        # The error child1 reschedules us with is raised out of the sleep.
        with pytest.raises(ValueError, match=r"^error message$"):
            await sleep_forever()
        print("child2 successful exit")

    async with _core.open_nursery() as nursery:
        nursery.start_soon(child1)
        # let t1 run and fall asleep
        await _core.checkpoint()
        nursery.start_soon(child2)


async def test_current_time() -> None:
    """``current_time`` advances across a short real sleep."""
    before = _core.current_time()
    # The Windows clock is fairly low-resolution -- appveyor tests fail
    # unless we sleep for a bit between the two readings.
    tick = time.get_clock_info("perf_counter").resolution
    time.sleep(tick)  # noqa: ASYNC251
    after = _core.current_time()
    assert before < after


async def test_current_time_with_mock_clock(mock_clock: _core.MockClock) -> None:
    """The mock clock and ``current_time`` agree before and after a jump."""
    initial = mock_clock.current_time()

    # Compare twice in a row, before any jump.
    for _ in range(2):
        assert mock_clock.current_time() == _core.current_time()

    mock_clock.jump(3.15)
    assert initial + 3.15 == mock_clock.current_time() == _core.current_time()


async def test_current_clock(mock_clock: _core.MockClock) -> None:
    """``current_clock`` returns the very clock object the run is using."""
    active_clock = _core.current_clock()
    assert mock_clock is active_clock


async def test_current_task() -> None:
    """A child's parent nursery points back at the task that opened it."""
    outer_task = _core.current_task()

    async def child() -> None:
        owning_nursery = not_none(_core.current_task().parent_nursery)
        assert owning_nursery.parent_task is outer_task

    async with _core.open_nursery() as nursery:
        nursery.start_soon(child)


async def test_root_task() -> None:
    """The root task has neither a parent nor an eventual parent nursery."""
    root_task = not_none(_core.current_root_task())
    assert root_task.parent_nursery is None
    assert root_task.eventual_parent_nursery is None


def test_out_of_context() -> None:
    """Run-local accessors raise ``RuntimeError`` when called outside a run."""
    for needs_run in (_core.current_task, _core.current_time):
        with pytest.raises(RuntimeError):
            needs_run()


async def test_current_statistics(mock_clock: _core.MockClock) -> None:
    """Check the counters reported by ``current_statistics()``.

    Covers ``tasks_living``, ``tasks_runnable`` and
    ``run_sync_soon_queue_size`` while a child task and two ``run_sync_soon``
    callbacks are pending, and ``seconds_to_next_deadline`` with and without
    an active deadline.
    """
    # Make sure all the early startup stuff has settled down
    await wait_all_tasks_blocked()

    # A child that sticks around to make some interesting stats:
    async def child() -> None:
        with suppress(_core.Cancelled):
            await sleep_forever()

    stats = _core.current_statistics()
    print(stats)
    # 2 system tasks + us
    assert stats.tasks_living == 3
    assert stats.run_sync_soon_queue_size == 0

    async with _core.open_nursery() as nursery:
        nursery.start_soon(child)
        await wait_all_tasks_blocked()
        token = _core.current_trio_token()
        # Queue one regular and one idempotent callback; both are expected
        # to show up in run_sync_soon_queue_size below.
        token.run_sync_soon(lambda: None)
        token.run_sync_soon(lambda: None, idempotent=True)
        stats = _core.current_statistics()
        print(stats)
        # 2 system tasks + us + child
        assert stats.tasks_living == 4
        # the exact value here might shift if we change how we do accounting
        # (currently it only counts tasks that we already know will be
        # runnable on the next pass), but still useful to at least test the
        # difference between now and after we wake up the child:
        assert stats.tasks_runnable == 0
        assert stats.run_sync_soon_queue_size == 2

        # Cancelling the nursery makes the sleeping child runnable.
        nursery.cancel_scope.cancel()
        stats = _core.current_statistics()
        print(stats)
        assert stats.tasks_runnable == 1

    # Give the child a chance to die and the run_sync_soon a chance to clear
    await _core.checkpoint()
    await _core.checkpoint()

    # With the mock clock, the deadline set here is exactly 5 seconds away.
    with _core.CancelScope(deadline=_core.current_time() + 5):
        stats = _core.current_statistics()
        print(stats)
        assert stats.seconds_to_next_deadline == 5
    # Once the scope has exited, no deadline is pending any more.
    stats = _core.current_statistics()
    print(stats)
    assert stats.seconds_to_next_deadline == inf


async def test_cancel_scope_repr(mock_clock: _core.MockClock) -> None:
    """``repr`` of a cancel scope reflects its state and deadline."""
    cs = _core.CancelScope()
    assert "unbound" in repr(cs)

    with cs:
        assert "active" in repr(cs)

        cs.deadline = _core.current_time() - 1
        assert "deadline is 1.00 seconds ago" in repr(cs)

        cs.deadline = _core.current_time() + 10
        assert "deadline is 10.00 seconds from now" in repr(cs)

        # Outside of async context the current time is unavailable, so the
        # repr computed in a worker thread should not mention the deadline.
        repr_from_thread = await to_thread_run_sync(repr, cs)
        assert "deadline" not in repr_from_thread

        cs.cancel()
        assert "cancelled" in repr(cs)

    assert "exited" in repr(cs)


async def test_cancel_scope_validation() -> None:
    """Invalid deadline arguments and assignments raise ``ValueError``."""
    nan_deadline = r"^deadline must not be NaN$"
    nan_relative_deadline = r"^relative deadline must not be NaN$"

    # Constructor arguments
    with pytest.raises(
        ValueError,
        match=r"^Cannot specify both a deadline and a relative deadline$",
    ):
        _core.CancelScope(deadline=7, relative_deadline=3)

    with pytest.raises(ValueError, match=nan_deadline):
        _core.CancelScope(deadline=nan)

    with pytest.raises(ValueError, match=nan_relative_deadline):
        _core.CancelScope(relative_deadline=nan)

    with pytest.raises(ValueError, match=r"^timeout must be non-negative$"):
        _core.CancelScope(relative_deadline=-3)

    # Attribute assignment on an existing, unentered scope
    unbound_scope = _core.CancelScope()

    with pytest.raises(ValueError, match=nan_deadline):
        unbound_scope.deadline = nan

    with pytest.raises(ValueError, match=nan_relative_deadline):
        unbound_scope.relative_deadline = nan

    with pytest.raises(ValueError, match=r"^relative deadline must be non-negative$"):
        unbound_scope.relative_deadline = -3

    # A valid value is stored and can be read back.
    unbound_scope.relative_deadline = 5
    assert unbound_scope.relative_deadline == 5

    # several related tests of CancelScope are implicitly handled by test_timeouts.py


def test_cancel_points() -> None:
    """Check which primitives raise ``Cancelled`` inside a cancelled scope."""

    async def via_checkpoint_if_cancelled() -> None:
        with _core.CancelScope() as cs:
            await _core.checkpoint_if_cancelled()
            cs.cancel()
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint_if_cancelled()

    async def via_checkpoint() -> None:
        with _core.CancelScope() as cs:
            await _core.checkpoint()
            cs.cancel()
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()

    async def via_sleep_forever() -> None:
        with _core.CancelScope() as cs:
            cs.cancel()
            with pytest.raises(_core.Cancelled):
                await sleep_forever()

    async def shielded_checkpoint_is_exempt() -> None:
        with _core.CancelScope() as cs:
            cs.cancel()
            # Neither of these raises even though the scope is cancelled...
            for _ in range(2):
                await _core.cancel_shielded_checkpoint()
            # ...but an ordinary checkpoint afterwards does.
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()

    scenarios = (
        via_checkpoint_if_cancelled,
        via_checkpoint,
        via_sleep_forever,
        shielded_checkpoint_is_exempt,
    )
    for scenario in scenarios:
        _core.run(scenario)


async def test_cancel_edge_cases() -> None:
    """Repeated ``cancel()`` calls are harmless; cancellation is level-triggered."""
    # Calling cancel() twice in a row is idempotent
    with _core.CancelScope() as twice_cancelled:
        for _ in range(2):
            twice_cancelled.cancel()
        await _core.checkpoint()
    assert twice_cancelled.cancel_called
    assert twice_cancelled.cancelled_caught

    # Level-triggering: every blocking call inside the scope keeps raising
    with _core.CancelScope() as level_scope:
        level_scope.cancel()
        for _ in range(2):
            with pytest.raises(_core.Cancelled):
                await sleep_forever()


async def test_cancel_scope_exceptiongroup_filtering() -> None:
    """A cancelled scope strips ``Cancelled`` from a group but keeps the rest.

    Inside the outer scope the nursery raises a group of three ``Cancelled``
    plus a ``KeyError``; after the outer scope has handled it, only the
    ``KeyError`` (still wrapped in a group) remains.
    """

    async def crasher() -> NoReturn:
        raise KeyError

    # This is outside the outer scope, so all the Cancelled
    # exceptions should have been absorbed, leaving just a regular
    # KeyError from crasher(), wrapped in an ExceptionGroup
    with RaisesGroup(KeyError):
        with _core.CancelScope() as outer:
            # Since the outer scope became cancelled before the
            # nursery block exited, all cancellations inside the
            # nursery block continue propagating to reach the
            # outer scope.
            with RaisesGroup(
                _core.Cancelled,
                _core.Cancelled,
                _core.Cancelled,
                KeyError,
            ) as excinfo:
                async with _core.open_nursery() as nursery:
                    # Two children that get cancelled by the nursery scope
                    nursery.start_soon(sleep_forever)  # t1
                    nursery.start_soon(sleep_forever)  # t2
                    nursery.cancel_scope.cancel()
                    # Shielded so that this wait itself is not cancelled by
                    # the nursery scope we just cancelled.
                    with _core.CancelScope(shield=True):
                        await wait_all_tasks_blocked()
                    # One child that gets cancelled by the outer scope
                    nursery.start_soon(sleep_forever)  # t3
                    outer.cancel()
                    # And one that raises a different error
                    nursery.start_soon(crasher)  # t4
                # and then our __aexit__ also receives an outer Cancelled
            # reraise the exception caught by RaisesGroup for the
            # CancelScope to handle
            raise excinfo.value


async def test_precancelled_task() -> None:
    """A task spawned into a cancelled nursery still starts running."""
    # Such a task should begin execution
    # (https://github.com/python-trio/trio/issues/41), but get a cancelled
    # error at its first blocking call.
    events: list[str] = []

    async def blocker() -> None:
        events.append("started")
        await sleep_forever()

    async with _core.open_nursery() as cancelled_nursery:
        cancelled_nursery.cancel_scope.cancel()
        cancelled_nursery.start_soon(blocker)

    assert events == ["started"]


async def test_cancel_shielding() -> None:
    """Toggle ``shield`` on an inner scope while the outer scope is cancelled.

    Checks that ``shield`` rejects non-bool values, that a shielded scope
    blocks the outer cancellation but not cancellation of scopes nested
    inside it or of the shielded scope itself, and that the shield can be
    switched off and on again.
    """
    with _core.CancelScope() as outer:
        with _core.CancelScope() as inner:
            await _core.checkpoint()
            outer.cancel()
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()

            assert inner.shield is False
            # A rejected assignment must leave the previous value in place.
            with pytest.raises(TypeError):
                inner.shield = "hello"  # type: ignore
            assert inner.shield is False

            inner.shield = True
            assert inner.shield is True
            # shield protects us from 'outer'
            await _core.checkpoint()

            with _core.CancelScope() as innerest:
                innerest.cancel()
                # but it doesn't protect us from scope inside inner
                with pytest.raises(_core.Cancelled):
                    await _core.checkpoint()
            # Back outside 'innerest', the shield still holds.
            await _core.checkpoint()

            inner.shield = False
            # can disable shield again
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()

            # re-enable shield
            inner.shield = True
            await _core.checkpoint()
            # shield doesn't protect us from inner itself
            inner.cancel()
            # This should now raise, but be absorbed by the inner scope
            await _core.checkpoint()
        assert inner.cancelled_caught


# Cancelling a nursery should immediately propagate to every task below it
async def test_cancel_inheritance() -> None:
    """Cancelling the top nursery reaches leaves of nested nurseries."""
    cancelled_leaves: set[str] = set()

    async def leaf(ident: str) -> None:
        try:
            await sleep_forever()
        except _core.Cancelled:
            cancelled_leaves.add(ident)

    async def worker(ident: str) -> None:
        async with _core.open_nursery() as nursery:
            for suffix in ("-l1", "-l2"):
                nursery.start_soon(leaf, ident + suffix)

    async with _core.open_nursery() as nursery:
        for worker_id in ("w1", "w2"):
            nursery.start_soon(worker, worker_id)
        nursery.cancel_scope.cancel()

    expected = {
        f"{worker_id}-{leaf_id}"
        for worker_id in ("w1", "w2")
        for leaf_id in ("l1", "l2")
    }
    assert cancelled_leaves == expected


async def test_cancel_shield_abort() -> None:
    """Removing a shield aborts a task sleeping under a cancelled outer scope."""
    with _core.CancelScope() as outer:
        async with _core.open_nursery() as nursery:
            outer.cancel()
            nursery.cancel_scope.shield = True
            # The outer scope is cancelled, but this task is protected by the
            # shield, so it manages to get to sleep
            record: list[str] = []

            async def sleeper() -> None:
                record.append("sleeping")
                try:
                    await sleep_forever()
                except _core.Cancelled:
                    record.append("cancelled")

            nursery.start_soon(sleeper)
            await wait_all_tasks_blocked()
            assert record == ["sleeping"]
            # now when we unshield, it should abort the sleep.
            nursery.cancel_scope.shield = False
            # wait for the task to finish before entering the nursery
            # __aexit__, because __aexit__ could make it spuriously look like
            # this worked by cancelling the nursery scope. (When originally
            # written, without these last few lines, the test spuriously
            # passed, even though shield assignment was buggy.)
            with _core.CancelScope(shield=True):
                await wait_all_tasks_blocked()
                assert record == ["sleeping", "cancelled"]


async def test_basic_timeout(mock_clock: _core.MockClock) -> None:
    """Deadline handling on ``CancelScope``, driven by the mock clock.

    Three scenarios: a deadline that passes only after the scope has exited,
    a deadline that passes while the task is sleeping inside the scope, and
    a deadline that is changed while the scope is active.
    """
    # Scenario 1: the deadline expires after the scope was already exited,
    # so the scope never reports cancel_called.
    start = _core.current_time()
    with _core.CancelScope() as scope:
        assert scope.deadline == inf
        scope.deadline = start + 1
        assert scope.deadline == start + 1
    assert not scope.cancel_called
    mock_clock.jump(2)
    await _core.checkpoint()
    await _core.checkpoint()
    await _core.checkpoint()
    assert not scope.cancel_called

    # Scenario 2: the clock jumps past the deadline inside the scope, so the
    # sleep below gets cancelled.
    start = _core.current_time()
    with _core.CancelScope(deadline=start + 1) as scope:
        mock_clock.jump(2)
        await sleep_forever()
    # But then the scope swallowed the exception... but we can still see it
    # here:
    assert scope.cancel_called
    assert scope.cancelled_caught

    # changing deadline
    start = _core.current_time()
    with _core.CancelScope() as scope:
        await _core.checkpoint()
        scope.deadline = start + 10
        await _core.checkpoint()
        mock_clock.jump(5)
        # 5 seconds in, the 10-second deadline has not been reached yet.
        await _core.checkpoint()
        # Moving the deadline into the past cancels the scope.
        scope.deadline = start + 1
        with pytest.raises(_core.Cancelled):
            await _core.checkpoint()
        with pytest.raises(_core.Cancelled):
            await _core.checkpoint()


async def test_cancel_scope_nesting() -> None:
    """Check which scope catches a cancellation when scopes are nested."""
    # Nested scopes: if two triggering at once, the outer one wins
    with _core.CancelScope() as scope1:
        with _core.CancelScope() as scope2:
            with _core.CancelScope() as scope3:
                scope3.cancel()
                scope2.cancel()
                await sleep_forever()
    # scope2 is the outermost cancelled scope, so it is the one that catches
    # the cancellation; scope1 was never cancelled at all.
    assert scope3.cancel_called
    assert not scope3.cancelled_caught
    assert scope2.cancel_called
    assert scope2.cancelled_caught
    assert not scope1.cancel_called
    assert not scope1.cancelled_caught

    # shielding
    with _core.CancelScope() as scope1:
        with _core.CancelScope() as scope2:
            scope1.cancel()
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()
            # Once shielded, the cancellation of scope1 no longer reaches us...
            scope2.shield = True
            await _core.checkpoint()
            # ...but cancelling the shielded scope itself still does.
            scope2.cancel()
            with pytest.raises(_core.Cancelled):
                await _core.checkpoint()

    # if a scope is pending, but then gets popped off the stack, then it
    # isn't delivered
    with _core.CancelScope() as scope:
        scope.cancel()
        await _core.cancel_shielded_checkpoint()
    await _core.checkpoint()
    assert not scope.cancelled_caught


# Regression test for https://github.com/python-trio/trio/issues/1175
async def test_unshield_while_cancel_propagating() -> None:
    """Shielding the inner scope mid-propagation leaves the catch to the outer scope."""
    with _core.CancelScope() as outer_scope, _core.CancelScope() as inner_scope:
        outer_scope.cancel()
        try:
            await _core.checkpoint()
        finally:
            # Flip the shield while the Cancelled is still propagating.
            inner_scope.shield = True

    assert outer_scope.cancelled_caught
    assert not inner_scope.cancelled_caught


async def test_cancel_unbound() -> None:
    """Behaviour of ``CancelScope`` objects created before being entered.

    Covers cancelling and shielding before entry, the "single 'with' block"
    errors for reuse, re-entry and entry from a second task, and how
    ``cancel_called`` reacts to deadline changes on an unentered scope.
    """

    async def sleep_until_cancelled(scope: _core.CancelScope) -> None:
        # fail_after(1) bounds the sleep in case the scope never cancels it.
        with scope, fail_after(1):
            await sleep_forever()

    # Cancel before entry
    scope = _core.CancelScope()
    scope.cancel()
    async with _core.open_nursery() as nursery:
        nursery.start_soon(sleep_until_cancelled, scope)

    # Cancel after entry
    scope = _core.CancelScope()
    async with _core.open_nursery() as nursery:
        nursery.start_soon(sleep_until_cancelled, scope)
        await wait_all_tasks_blocked()
        scope.cancel()

    # Shield before entry
    scope = _core.CancelScope()
    scope.shield = True
    with _core.CancelScope() as outer, scope:
        outer.cancel()
        # Shielded, so this checkpoint does not raise.
        await _core.checkpoint()
        scope.shield = False
        with pytest.raises(_core.Cancelled):
            await _core.checkpoint()

    # Can't reuse
    with _core.CancelScope() as scope:
        await _core.checkpoint()
    # Cancelling after exit is recorded but has nothing left to interrupt.
    scope.cancel()
    await _core.checkpoint()
    assert scope.cancel_called
    assert not scope.cancelled_caught
    with pytest.raises(RuntimeError) as exc_info:
        with scope:
            pass  # pragma: no cover
    assert "single 'with' block" in str(exc_info.value)

    # Can't reenter
    with _core.CancelScope() as scope:
        with pytest.raises(RuntimeError) as exc_info:
            with scope:
                pass  # pragma: no cover
        assert "single 'with' block" in str(exc_info.value)

    # Can't enter from multiple tasks simultaneously
    scope = _core.CancelScope()

    async def enter_scope() -> None:
        with scope:
            await sleep_forever()

    async with _core.open_nursery() as nursery:
        nursery.start_soon(enter_scope, name="this one")
        await wait_all_tasks_blocked()

        with pytest.raises(RuntimeError) as exc_info:
            with scope:
                pass  # pragma: no cover
        assert "single 'with' block" in str(exc_info.value)
        nursery.cancel_scope.cancel()

    # If not yet entered, cancel_called is true when the deadline has passed
    # even if cancel() hasn't been called yet
    scope = _core.CancelScope(deadline=_core.current_time() + 1)
    assert not scope.cancel_called
    scope.deadline -= 1
    assert scope.cancel_called
    scope.deadline += 1
    assert scope.cancel_called  # never become un-cancelled


async def test_cancel_scope_misnesting_1() -> None:
    """Exiting an outer scope while still inside its child raises."""
    parent_scope = _core.CancelScope()
    child_scope = _core.CancelScope()

    with ExitStack() as stack:
        stack.enter_context(parent_scope)
        with child_scope, pytest.raises(
            RuntimeError,
            match="still within its child",
        ):
            # Closes parent_scope while child_scope is still active.
            stack.close()
        # Leaving child_scope afterwards raises no further error


async def test_cancel_scope_misnesting_2() -> None:
    """Misnested cancel scopes cancel the tasks inside the abandoned subtree.

    The first half closes an outer scope from inside a nursery; the second
    half exits a nursery's own cancel scope by hand before the nursery.
    """

    # If there are other tasks inside the abandoned part of the cancel tree,
    # they get cancelled when the misnesting is detected
    async def task1() -> None:
        with pytest.raises(_core.Cancelled):
            await sleep_forever()

    # Even if inside another cancel scope
    async def task2() -> None:
        with _core.CancelScope():
            with pytest.raises(
                _core.Cancelled,
                match=r"^cancelled due to unknown with reason 'misnesting'$",
            ):
                await sleep_forever()

    with ExitStack() as stack:
        stack.enter_context(_core.CancelScope())
        async with _core.open_nursery() as nursery:
            nursery.start_soon(task1)
            nursery.start_soon(task2)
            await wait_all_tasks_blocked()
            with pytest.raises(RuntimeError, match="still within its child"):
                stack.close()

    # Variant that makes the child tasks direct children of the scope
    # that noticed the misnesting:
    nursery_mgr = _core.open_nursery()
    nursery = await nursery_mgr.__aenter__()
    try:
        nursery.start_soon(task1)
        nursery.start_soon(task2)
        nursery.start_soon(sleep_forever)
        await wait_all_tasks_blocked()
        # Exit the nursery's cancel scope by hand, ahead of the nursery itself.
        nursery.cancel_scope.__exit__(None, None, None)
    finally:
        with pytest.raises(
            RuntimeError,
            match="which had already been exited",
        ) as exc_info:
            await nursery_mgr.__aexit__(*sys.exc_info())

    def no_context(exc: RuntimeError) -> bool:
        return exc.__context__ is None

    # The RuntimeError's __context__ is expected to be a group with one
    # "closed before the task exited" error per child task.
    msg = "closed before the task exited"
    group = RaisesGroup(
        Matcher(RuntimeError, match=msg, check=no_context),
        Matcher(RuntimeError, match=msg, check=no_context),
        # sleep_forever
        Matcher(
            RuntimeError,
            match=msg,
            check=lambda x: isinstance(x.__context__, _core.Cancelled),
        ),
    )
    assert group.matches(exc_info.value.__context__)


async def test_cancel_scope_misnesting_3() -> None:
    """Exiting another task's cancel scope raises ``RuntimeError``."""

    # Attempting to exit a cancel scope from an unrelated task should raise
    # without affecting any state
    async def hold_scope_open(
        task_status: _core.TaskStatus[_core.CancelScope],
    ) -> None:
        with _core.CancelScope() as owned_scope:
            task_status.started(owned_scope)
            await sleep_forever()

    async with _core.open_nursery() as nursery:
        started_value = await nursery.start(hold_scope_open)
        assert isinstance(started_value, _core.CancelScope)
        foreign_scope: _core.CancelScope = started_value

        with pytest.raises(RuntimeError, match="from unrelated"):
            foreign_scope.__exit__(None, None, None)

        # Cancel the scope so the child's sleep ends.
        foreign_scope.cancel()


# Helper used to verify that tracebacks are not overly verbose.
def no_cause_or_context(e: BaseException) -> bool:
    """Return True when ``e`` has neither a ``__cause__`` nor a ``__context__``."""
    if e.__cause__ is not None:
        return False
    return e.__context__ is None


async def test_nursery_misnest() -> None:
    """A nursery entered in a child task and never exited is reported.

    The outer nursery is expected to raise a group containing a single
    "Nursery stack corrupted" ``RuntimeError``, with no cause or context on
    either the group or the error.
    """

    # See https://github.com/python-trio/trio/issues/3298
    async def inner_func() -> None:
        # inner_cm is bound in the nursery block below, before this task runs.
        inner_nursery = await inner_cm.__aenter__()
        inner_nursery.start_soon(sleep, 1)

    with pytest.RaisesGroup(
        pytest.RaisesExc(
            RuntimeError, match="Nursery stack corrupted", check=no_cause_or_context
        ),
        check=no_cause_or_context,
    ):
        async with _core.open_nursery() as outer_nursery:
            inner_cm = _core.open_nursery()
            outer_nursery.start_soon(inner_func)


def test_nursery_nested_child_misnest() -> None:
    """Entering an inner nursery without exiting it, in the same task.

    ``run`` is expected to raise a bare "Nursery stack corrupted"
    ``RuntimeError`` whose ``__context__`` is an ``AssertionError``.
    """

    # Note that this example does *not* raise an exception group.
    async def main() -> None:
        async with _core.open_nursery():
            inner_cm = _core.open_nursery()
            # Entered by hand and never exited.
            await inner_cm.__aenter__()

    with pytest.raises(RuntimeError, match="Nursery stack corrupted") as excinfo:
        _core.run(main)
    assert excinfo.value.__cause__ is None
    # This AssertionError is kind of redundant, but I don't think we want to remove
    # the assertion and don't think we care enough to suppress it in this specific case.
    assert pytest.RaisesExc(
        AssertionError, match="^Nursery misnesting detected!$"
    ).matches(excinfo.value.__context__)
    # The AssertionError itself carries no further chained exceptions.
    assert excinfo.value.__context__.__cause__ is None
    assert excinfo.value.__context__.__context__ is None


async def test_asyncexitstack_nursery_misnest() -> None:
    """Misnesting via an ``AsyncExitStack`` entered from a child task.

    A child task of the inner nursery enters an async context manager on an
    ``AsyncExitStack`` that is opened outside that nursery. The expected
    result is a doubly nested group holding one "Nursery stack corrupted"
    ``RuntimeError``, with no cause or context at any level.
    """

    # This example is trickier than the above ones, and is the one that requires
    # special logic of abandoned nurseries to avoid nasty internal errors that masks
    # the RuntimeError.
    @asynccontextmanager
    async def asynccontextmanager_that_creates_a_nursery_internally() -> (
        AsyncGenerator[None]
    ):
        async with _core.open_nursery() as nursery:
            await nursery.start(started_sleeper)
            nursery.start_soon(unstarted_task)
            yield

    async def started_sleeper(task_status: _core.TaskStatus[None]) -> None:
        task_status.started()
        await sleep_forever()

    async def unstarted_task() -> None:
        await _core.checkpoint()

    with pytest.RaisesGroup(
        pytest.RaisesGroup(
            pytest.RaisesExc(
                RuntimeError, match="Nursery stack corrupted", check=no_cause_or_context
            ),
            check=no_cause_or_context,
        ),
        check=no_cause_or_context,
    ):
        async with AsyncExitStack() as stack, _core.open_nursery() as nursery:
            # The asynccontextmanager is going to create a nursery that outlives this nursery!
            nursery.start_soon(
                stack.enter_async_context,
                asynccontextmanager_that_creates_a_nursery_internally(),
            )


def test_asyncexitstack_nursery_misnest_cleanup() -> None:
    """Abandoned tasks run their ``finally`` blocks by the time ``run`` returns.

    ``finally_entered`` is checked to be empty right after the misnesting
    error is raised inside ``main`` and non-empty once ``run`` has returned.
    """
    # We guarantee that abandoned tasks get to do cleanup *eventually*, but exceptions
    # are lost. With more effort it's possible we could reschedule child tasks to exit
    # promptly.
    finally_entered: list[bool] = []

    async def main() -> None:
        async def unstarted_task() -> None:
            try:
                await _core.checkpoint()
            finally:
                finally_entered.append(True)
                raise ValueError("this exception is lost")

        # rest of main() is ~identical to the above test
        @asynccontextmanager
        async def asynccontextmanager_that_creates_a_nursery_internally() -> (
            AsyncGenerator[None]
        ):
            async with _core.open_nursery() as nursery:
                nursery.start_soon(unstarted_task)
                yield

        with pytest.RaisesGroup(
            pytest.RaisesGroup(
                pytest.RaisesExc(
                    RuntimeError,
                    match="Nursery stack corrupted",
                    check=no_cause_or_context,
                ),
                check=no_cause_or_context,
            ),
            check=no_cause_or_context,
        ):
            async with AsyncExitStack() as stack, _core.open_nursery() as nursery:
                # The asynccontextmanager is going to create a nursery that outlives this nursery!
                nursery.start_soon(
                    stack.enter_async_context,
                    asynccontextmanager_that_creates_a_nursery_internally(),
                )
        assert not finally_entered  # abandoned task still hasn't been cleaned up

    _core.run(main)
    assert finally_entered  # now it has


@slow
async def test_timekeeping() -> None:
    """Measure a cancel-scope deadline against the real clock."""
    # probably a good idea to use a real clock for *one* test anyway...
    target_seconds = 1.0
    # allow a few attempts in case of random CI server flakiness
    for _attempt in range(4):
        started_at = time.perf_counter()
        with _core.CancelScope() as deadline_scope:
            deadline_scope.deadline = _core.current_time() + target_seconds
            await sleep_forever()
        elapsed = time.perf_counter() - started_at
        ratio = elapsed / target_seconds
        print(ratio)
        # Actual time elapsed should always be >= target time
        # (== is possible depending on system behavior for time.perf_counter
        # resolution)
        if 1.0 <= ratio < 2:  # pragma: no branch
            break
    else:  # pragma: no cover
        raise AssertionError()


async def test_failed_abort() -> None:
    """A sleeper whose abort callback returns FAILED stays asleep when cancelled.

    It only wakes once rescheduled by hand, and then observes the pending
    cancellation at its next checkpoint.
    """
    sleeper_task: _core.Task | None = None
    sleeper_scope: _core.CancelScope | None = None
    events: list[str] = []

    def refuse_abort(_raise_cancel: _core.RaiseCancelT) -> _core.Abort:
        # Always tell the runner that this sleep cannot be aborted.
        return _core.Abort.FAILED

    async def stubborn_sleeper() -> None:
        nonlocal sleeper_task, sleeper_scope
        sleeper_task = _core.current_task()
        with _core.CancelScope() as inner_scope:
            sleeper_scope = inner_scope
            events.append("sleep")
            woken_with = await _core.wait_task_rescheduled(refuse_abort)
            assert woken_with == 1
            events.append("woke")
            try:
                await _core.checkpoint_if_cancelled()
            except _core.Cancelled:
                events.append("cancelled")

    async with _core.open_nursery() as nursery:
        nursery.start_soon(stubborn_sleeper)
        await wait_all_tasks_blocked()
        assert events == ["sleep"]
        not_none(sleeper_scope).cancel()
        await wait_all_tasks_blocked()
        # the cancel alone did not wake the sleeper
        assert events == ["sleep"]
        # so wake it by hand
        _core.reschedule(not_none(sleeper_task), outcome.Value(1))
    assert events == ["sleep", "woke", "cancelled"]


@restore_unraisablehook()
def test_broken_abort() -> None:
    """An abort callback that returns an illegal value yields TrioInternalError."""

    async def main() -> None:
        # These two checkpoints work around an annoying warning -- we're
        # about to crash the main loop, and if that (by chance) happens before
        # the run_sync_soon task has run for the first time, Python emits a
        # spurious "never awaited" warning for it. (The warning is correct,
        # but this test is about delivering a semi-meaningful error after
        # things have gone totally pear-shaped, so it's not relevant.) Letting
        # the run_sync_soon task run first avoids the warning.
        for _ in range(2):
            await _core.checkpoint()
        with _core.CancelScope() as cancelled_scope:
            cancelled_scope.cancel()
            # None is not a legal return value here
            await _core.wait_task_rescheduled(lambda _: None)  # type: ignore

    with pytest.raises(_core.TrioInternalError):
        _core.run(main)

    # The crash makes various __del__ methods print complaints on stderr.
    # Force them to run now, so that output is attached to this test.
    gc_collect_harder()


# This segfaults, so we need to skipif. Remember to remove the skipif once
# the upstream issue is resolved.
@restore_unraisablehook()
@pytest.mark.skipif(
    sys.version_info[:3] == (3, 14, 0),
    reason="https://github.com/python/cpython/issues/133932",
)
def test_error_in_run_loop() -> None:
    """Corrupt the current task's state and expect a TrioInternalError."""

    # Blow stuff up real good to check we at least get a TrioInternalError
    async def main() -> None:
        _core.current_task()._schedule_points = "hello!"  # type: ignore
        await _core.checkpoint()

    expect_internal_error = pytest.raises(_core.TrioInternalError)
    with ignore_coroutine_never_awaited_warnings(), expect_internal_error:
        _core.run(main)


async def test_spawn_system_task() -> None:
    """A spawned system task receives its argument and reports KI protection."""
    observed: list[tuple[str, int]] = []

    async def system_task(x: int) -> None:
        observed.extend([("x", x), ("ki", _core.currently_ki_protected())])
        await _core.checkpoint()

    _core.spawn_system_task(system_task, 1)
    await wait_all_tasks_blocked()
    assert observed == [("x", 1), ("ki", True)]


# intentionally make a system task crash
def test_system_task_crash() -> None:
    """A system task that raises makes the whole run fail with TrioInternalError."""

    async def raise_key_error() -> NoReturn:
        raise KeyError

    async def main() -> None:
        _core.spawn_system_task(raise_key_error)
        await sleep_forever()

    expect_internal_error = pytest.raises(_core.TrioInternalError)
    with expect_internal_error:
        _core.run(main)


def test_system_task_crash_ExceptionGroup() -> None:
    """A system task crashing with two errors reports both under TrioInternalError."""

    async def raise_key_error() -> NoReturn:
        raise KeyError

    async def raise_value_error() -> NoReturn:
        raise ValueError

    async def system_task() -> None:
        async with _core.open_nursery() as nursery:
            for crasher in (raise_key_error, raise_value_error):
                nursery.start_soon(crasher)

    async def main() -> None:
        _core.spawn_system_task(system_task)
        await sleep_forever()

    # TrioInternalError is not wrapped
    with pytest.raises(_core.TrioInternalError) as excinfo:
        _core.run(main)

    # the first exceptiongroup is from the first nursery opened in Runner.init()
    # the second exceptiongroup is from the second nursery opened in Runner.init()
    # the third exceptiongroup is from the nursery defined in `system_task` above
    expected_cause = RaisesGroup(RaisesGroup(RaisesGroup(KeyError, ValueError)))
    assert expected_cause.matches(excinfo.value.__cause__)


def test_system_task_crash_plus_Cancelled() -> None:
    """Only the ValueError is expected in the cause when a sibling was cancelled."""

    # Set up a situation where a system task crashes with a
    # ExceptionGroup([Cancelled, ValueError])
    async def cancel_becomes_value_error() -> None:
        try:
            await sleep_forever()
        except _core.Cancelled:
            raise ValueError from None

    async def plain_sleeper() -> None:
        await sleep_forever()

    async def system_task() -> None:
        async with _core.open_nursery() as nursery:
            for child in (cancel_becomes_value_error, plain_sleeper):
                nursery.start_soon(child)

    async def main() -> None:
        _core.spawn_system_task(system_task)
        # returning right away is what triggers the cancellation

    with pytest.raises(_core.TrioInternalError) as excinfo:
        _core.run(main)

    # See explanation for triple-wrap in test_system_task_crash_ExceptionGroup
    expected_cause = RaisesGroup(RaisesGroup(RaisesGroup(ValueError)))
    assert expected_cause.matches(excinfo.value.__cause__)


def test_system_task_crash_KeyboardInterrupt() -> None:
    """A KeyboardInterrupt raised in a system task becomes a TrioInternalError."""

    async def ki() -> NoReturn:
        raise KeyboardInterrupt

    async def main() -> None:
        _core.spawn_system_task(ki)
        await sleep_forever()

    with pytest.raises(_core.TrioInternalError) as excinfo:
        _core.run(main)
    # "Only" double-wrapped since ki() doesn't create an exceptiongroup
    expected_cause = RaisesGroup(RaisesGroup(KeyboardInterrupt))
    assert expected_cause.matches(excinfo.value.__cause__)


# This used to fail because checkpoint was a yield followed by an immediate
# reschedule. So we had:
# 1) this task yields
# 2) this task is rescheduled
# ...
# 3) next iteration of event loop starts, runs timeouts
# 4) this task has timed out
# 5) ...but it's on the run queue, so the timeout is queued to be delivered
#    the next time that it's blocked.
async def test_yield_briefly_checks_for_timeout(mock_clock: _core.MockClock) -> None:
    """A checkpoint taken after the deadline has passed raises Cancelled."""
    deadline = _core.current_time() + 5
    with _core.CancelScope(deadline=deadline):
        await _core.checkpoint()
        # Move the mock clock past the deadline; the very next checkpoint
        # must notice it.
        mock_clock.jump(10)
        with pytest.raises(_core.Cancelled):
            await _core.checkpoint()


# This tests that sys.exc_info is properly saved/restored as we swap between
# tasks. It turns out that the interpreter automagically handles this for us
# so there's no special code in Trio required to pass this test, but it's
# still nice to know that it works :-).
#
# Update: it turns out I was right to be nervous! see the next test...
async def test_exc_info() -> None:
    """Check that each task keeps its own active exception across task switches.

    Two children interleave (ordered by the ``Sequencer``) while each is
    inside its own ``except`` block; each bare ``raise`` must re-raise that
    child's own exception, with no ``__context__`` attached.
    """
    record: list[str] = []
    seq = Sequencer()

    async def child1() -> None:
        async with seq(0):
            pass  # we don't yield until seq(2) below
        record.append("child1 raise")
        with pytest.raises(ValueError, match=r"^child1$") as excinfo:
            try:
                raise ValueError("child1")
            except ValueError:
                record.append("child1 sleep")
                # Wait here, still inside the except block, until child2 has
                # taken its turn.
                async with seq(2):
                    pass
                assert "child2 wake" in record
                record.append("child1 re-raise")
                raise
        assert excinfo.value.__context__ is None
        record.append("child1 success")

    async def child2() -> None:
        async with seq(1):
            pass  # we don't yield until seq(3) below
        assert "child1 sleep" in record
        record.append("child2 wake")
        # child1 is currently handling a ValueError; this task must not see it
        assert sys.exc_info() == (None, None, None)
        with pytest.raises(KeyError) as excinfo:
            try:
                raise KeyError("child2")
            except KeyError:
                record.append("child2 sleep again")
                async with seq(3):
                    pass
                assert "child1 re-raise" in record
                record.append("child2 re-raise")
                raise
        assert excinfo.value.__context__ is None
        record.append("child2 success")

    async with _core.open_nursery() as nursery:
        nursery.start_soon(child1)
        nursery.start_soon(child2)

    assert record == [
        "child1 raise",
        "child1 sleep",
        "child2 wake",
        "child2 sleep again",
        "child1 re-raise",
        "child1 success",
        "child2 re-raise",
        "child2 success",
    ]


# On all CPython versions (at time of writing), using .throw() to raise an
# exception inside a coroutine/generator can cause the original `exc_info` state
# to be lost, so things like re-raising and exception chaining are broken unless
# Trio implements its workaround. At time of writing, CPython main (3.13-dev)
# and every CPython release (excluding releases for old Python versions not
# supported by Trio) is affected (albeit in differing ways).
#
# If the `ValueError()` gets sent in via `throw` and is suppressed, then CPython
# loses track of the original `exc_info`:
#   https://bugs.python.org/issue25612 (Example 1)
#   https://bugs.python.org/issue29587 (Example 2)
# This is fixed in CPython >= 3.7.
async def test_exc_info_after_throw_suppressed() -> None:
    """A bare ``raise`` still re-raises the KeyError after a thrown-in error is suppressed."""
    child_task: _core.Task | None = None

    async def child() -> None:
        nonlocal child_task
        child_task = _core.current_task()

        try:
            raise KeyError
        except KeyError:
            # The ValueError delivered by reschedule() below is swallowed here...
            with suppress(ValueError):
                await sleep_forever()
            # ...and this must then re-raise the original KeyError.
            raise

    # The KeyError must surface from the nursery without any __context__.
    with RaisesGroup(Matcher(KeyError, check=lambda e: e.__context__ is None)):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(child)
            await wait_all_tasks_blocked()
            _core.reschedule(not_none(child_task), outcome.Error(ValueError()))


# Similar to previous test -- if the `ValueError()` gets sent in via 'throw' and
# propagates out, then CPython doesn't set its `__context__` so normal implicit
# exception chaining is broken:
#   https://bugs.python.org/issue25612 (Example 2)
#   https://bugs.python.org/issue25683
#   https://bugs.python.org/issue29587 (Example 1)
# This is fixed in CPython >= 3.9.
async def test_exception_chaining_after_throw() -> None:
    """An error thrown into an ``except`` block gets the handled error as ``__context__``."""
    child_task: _core.Task | None = None

    async def child() -> None:
        nonlocal child_task
        child_task = _core.current_task()

        try:
            raise KeyError
        except KeyError:
            # The ValueError from reschedule() below arrives while the
            # KeyError is still being handled.
            await sleep_forever()

    with RaisesGroup(
        Matcher(
            ValueError,
            "error text",
            lambda e: isinstance(e.__context__, KeyError),
        ),
    ):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(child)
            await wait_all_tasks_blocked()
            _core.reschedule(
                not_none(child_task),
                outcome.Error(ValueError("error text")),
            )


# Similar to previous tests -- if the `ValueError()` gets sent into an inner
# `await` via 'throw' and is suppressed there, then CPython loses track of
# `exc_info` in the inner coroutine:
#   https://github.com/python/cpython/issues/108668
# This is unfixed in CPython at time of writing.
async def test_exc_info_after_throw_to_inner_suppressed() -> None:
    """Suppressing a thrown-in error in an inner coroutine keeps the outer ``exc_info``."""
    child_task: _core.Task | None = None

    async def child() -> None:
        nonlocal child_task
        child_task = _core.current_task()

        try:
            raise KeyError
        except KeyError as exc:
            await inner(exc)
            raise

    async def inner(exc: BaseException) -> None:
        # The ValueError delivered by reschedule() below is swallowed here.
        with suppress(ValueError):
            await sleep_forever()
        # The exception being handled by the caller must still be the active one.
        assert not_none(sys.exc_info()[1]) is exc

    # The KeyError must surface from the nursery without any __context__.
    with RaisesGroup(Matcher(KeyError, check=lambda e: e.__context__ is None)):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(child)
            await wait_all_tasks_blocked()
            _core.reschedule(not_none(child_task), outcome.Error(ValueError()))


# Similar to previous tests -- if the `ValueError()` gets sent into an inner
# `await` via `throw` and propagates out, then CPython incorrectly sets its
# `__context__` so normal implicit exception chaining is broken:
#   https://bugs.python.org/issue40694
# This is unfixed in CPython at time of writing.
async def test_exception_chaining_after_throw_to_inner() -> None:
    """An error thrown into a nested ``except`` block chains through both handled errors."""
    child_task: _core.Task | None = None

    async def child() -> None:
        nonlocal child_task
        child_task = _core.current_task()

        try:
            raise KeyError
        except KeyError:
            await inner()

    async def inner() -> None:
        try:
            raise IndexError
        except IndexError:
            # The ValueError from reschedule() below arrives here, with the
            # IndexError (and, in the caller, the KeyError) still being handled.
            await sleep_forever()

    # Expected chain: ValueError -> IndexError -> KeyError via __context__.
    with RaisesGroup(
        Matcher(
            ValueError,
            "^Unique Text$",
            lambda e: isinstance(e.__context__, IndexError)
            and isinstance(e.__context__.__context__, KeyError),
        ),
    ):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(child)
            await wait_all_tasks_blocked()
            _core.reschedule(
                not_none(child_task),
                outcome.Error(ValueError("Unique Text")),
            )


async def test_nursery_exception_chaining_doesnt_make_context_loops() -> None:
    """The group raised by a nursery must not carry a ``__context__``."""

    async def crasher() -> NoReturn:
        raise KeyError

    def has_no_context(group: BaseException) -> bool:
        return group.__context__ is None

    # the ExceptionGroup should not have the KeyError or ValueError as context
    with RaisesGroup(ValueError, KeyError, check=has_no_context):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(crasher)
            raise ValueError


def test_TrioToken_identity() -> None:
    """Tokens are stable within one run and distinct between runs."""

    async def get_and_check_token() -> _core.TrioToken:
        first_lookup = _core.current_trio_token()
        # Two calls in the same run give the same object
        assert first_lookup is _core.current_trio_token()
        return first_lookup

    tokens = [_core.run(get_and_check_token) for _ in range(2)]
    first_run, second_run = tokens
    assert first_run is not second_run
    assert first_run != second_run
    assert hash(first_run) != hash(second_run)


async def test_TrioToken_run_sync_soon_basic() -> None:
    """A callback queued with run_sync_soon runs later, not inline."""
    calls: list[tuple[str, int]] = []

    def cb(x: int) -> None:
        calls.append(("cb", x))

    _core.current_trio_token().run_sync_soon(cb, 1)
    # queued only; nothing has been called yet
    assert calls == []
    await wait_all_tasks_blocked()
    assert calls == [("cb", 1)]


def test_TrioToken_run_sync_soon_too_late() -> None:
    """Using a token after its run has finished raises RunFinishedError."""
    captured: _core.TrioToken | None = None

    async def main() -> None:
        nonlocal captured
        captured = _core.current_trio_token()

    _core.run(main)
    stale_token = not_none(captured)
    with pytest.raises(_core.RunFinishedError):
        stale_token.run_sync_soon(lambda: None)  # pragma: no branch


async def test_TrioToken_run_sync_soon_idempotent() -> None:
    """Repeated idempotent requests collapse into one call, in FIFO order."""
    calls: list[int] = []

    def cb(x: int) -> None:
        calls.append(x)

    token = _core.current_trio_token()
    # One ordinary request, then bursts of idempotent requests per argument
    token.run_sync_soon(cb, 1)
    for arg, repeats in ((1, 3), (2, 2)):
        for _ in range(repeats):
            token.run_sync_soon(cb, arg, idempotent=True)
    await wait_all_tasks_blocked()
    assert len(calls) == 3
    assert sorted(calls) == [1, 1, 2]

    # ordering test
    calls = []
    for _round in range(3):
        for value in range(100):
            token.run_sync_soon(cb, value, idempotent=True)
    await wait_all_tasks_blocked()
    # We guarantee FIFO
    assert calls == list(range(100))


def test_TrioToken_run_sync_soon_idempotent_requeue() -> None:
    """An idempotent callback that re-queues itself gets called again."""
    # We guarantee that if a call has finished, queueing it again will call it
    # again. Due to the lack of synchronization, this effectively means that
    # we have to guarantee that once a call has *started*, queueing it again
    # will call it again. Also this is much easier to test :-)
    invocations: list[None] = []

    def redo(token: _core.TrioToken) -> None:
        invocations.append(None)
        with suppress(_core.RunFinishedError):
            token.run_sync_soon(redo, token, idempotent=True)

    async def main() -> None:
        run_token = _core.current_trio_token()
        run_token.run_sync_soon(redo, run_token, idempotent=True)
        for _ in range(3):
            await _core.checkpoint()

    _core.run(main)

    assert len(invocations) >= 2


def test_TrioToken_run_sync_soon_after_main_crash() -> None:
    """A callback queued just before main() crashes still runs."""
    events: list[str] = []

    async def main() -> None:
        # After main exits but before finally cleaning up, callback processed
        # normally
        _core.current_trio_token().run_sync_soon(events.append, "sync-cb")
        raise ValueError("error text")

    with pytest.raises(ValueError, match=r"^error text$"):
        _core.run(main)

    assert events == ["sync-cb"]


def test_TrioToken_run_sync_soon_crashes() -> None:
    """A crashing callback fails the run but later callbacks still execute."""
    seen: set[str] = set()

    async def main() -> None:
        token = _core.current_trio_token()
        token.run_sync_soon(lambda: {}["nope"])  # type: ignore[index]
        # check that a crashing run_sync_soon callback doesn't stop further
        # calls to run_sync_soon
        token.run_sync_soon(seen.add, "2nd run_sync_soon ran")
        try:
            await sleep_forever()
        except _core.Cancelled:
            seen.add("cancelled!")

    with pytest.raises(_core.TrioInternalError) as excinfo:
        _core.run(main)
    # the first exceptiongroup is from the first nursery opened in Runner.init()
    # the second exceptiongroup is from the second nursery opened in Runner.init()
    expected_cause = RaisesGroup(RaisesGroup(KeyError))
    assert expected_cause.matches(excinfo.value.__cause__)
    assert seen == {"2nd run_sync_soon ran", "cancelled!"}


async def test_TrioToken_run_sync_soon_FIFO() -> None:
    """Callbacks run in the order they were queued."""
    count = 100
    order: list[int] = []
    token = _core.current_trio_token()
    for value in range(count):
        token.run_sync_soon(order.append, value)
    await wait_all_tasks_blocked()
    assert order == list(range(count))


def test_TrioToken_run_sync_soon_starvation_resistance() -> None:
    """A callback that keeps re-queueing itself cannot starve the main task."""
    # Even if we push callbacks in from callbacks, so that the callback queue
    # never empties out, then we still can't starve out other tasks from
    # running.
    shared_token: _core.TrioToken | None = None
    log: list[tuple[str, int]] = []

    def naughty_cb(i: int) -> None:
        try:
            not_none(shared_token).run_sync_soon(naughty_cb, i + 1)
        except _core.RunFinishedError:
            log.append(("run finished", i))

    async def main() -> None:
        nonlocal shared_token
        shared_token = _core.current_trio_token()
        shared_token.run_sync_soon(naughty_cb, 0)
        log.append(("starting", 0))
        for _ in range(20):
            await _core.checkpoint()

    _core.run(main)
    assert len(log) == 2
    first_entry, (last_label, last_index) = log
    assert first_entry == ("starting", 0)
    assert last_label == "run finished"
    assert last_index >= 19


def test_TrioToken_run_sync_soon_threaded_stress_test() -> None:
    """Hammer ``run_sync_soon`` from a foreign thread while the run is live.

    The main task waits until the callback counter has advanced ten separate
    times. After the run finishes, the worker thread is joined so that it
    cannot outlive this test.
    """
    cb_counter = 0
    worker: threading.Thread | None = None

    def cb() -> None:
        nonlocal cb_counter
        cb_counter += 1

    def stress_thread(token: _core.TrioToken) -> None:
        # Keep queueing callbacks until the token reports the run is over.
        try:
            while True:
                token.run_sync_soon(cb)
                time.sleep(0)
        except _core.RunFinishedError:
            pass

    async def main() -> None:
        nonlocal worker
        token = _core.current_trio_token()
        worker = threading.Thread(target=stress_thread, args=(token,))
        worker.start()
        for _ in range(10):
            start_value = cb_counter
            while cb_counter == start_value:
                await sleep(0.01)

    _core.run(main)
    # The thread stops on its next run_sync_soon() call once the run has
    # finished; wait for that instead of leaking it into later tests.
    finished_worker = not_none(worker)
    finished_worker.join(timeout=10)
    assert not finished_worker.is_alive()
    print(cb_counter)


async def test_TrioToken_run_sync_soon_massive_queue() -> None:
    """Queue a large batch of callbacks at once and check they all run in order."""
    # There are edge cases in the wakeup fd code when the wakeup fd overflows,
    # so let's try to make that happen. This is also just a good stress test
    # in general. (With the current-as-of-2017-02-14 code using a socketpair
    # with minimal buffer, Linux takes 6 wakeups to fill the buffer and macOS
    # takes 1 wakeup. So 1000 is overkill if anything. Windows OTOH takes
    # ~600,000 wakeups, but has the same code paths...)
    total_callbacks = 1000
    token = _core.current_trio_token()
    next_expected = 0

    def cb(i: int) -> None:
        # This also tests FIFO ordering of callbacks
        nonlocal next_expected
        assert next_expected == i
        next_expected += 1

    for index in range(total_callbacks):
        token.run_sync_soon(cb, index)
    await wait_all_tasks_blocked()
    assert next_expected == total_callbacks


def test_TrioToken_run_sync_soon_late_crash() -> None:
    """A callback crash queued from an async-generator ``finally`` fails the run.

    The second callback queued from the same ``finally`` block is still
    expected to run.
    """
    # Crash after system nursery is closed -- easiest way to do that is
    # from an async generator finalizer.
    record: list[str] = []
    # Holds a reference to the generator so it is still alive when main() returns.
    saved: list[AsyncGenerator[int, None]] = []

    async def agen() -> AsyncGenerator[int, None]:
        token = _core.current_trio_token()
        try:
            yield 1
        finally:
            # First callback raises KeyError; the second one must still run.
            token.run_sync_soon(lambda: {}["nope"])  # type: ignore[index]
            token.run_sync_soon(lambda: record.append("2nd ran"))

    async def main() -> None:
        saved.append(agen())
        # Advance the generator to its first yield, leaving it suspended
        # inside the try block.
        await saved[-1].asend(None)
        record.append("main exiting")

    with pytest.raises(_core.TrioInternalError) as excinfo:
        _core.run(main)

    assert RaisesGroup(KeyError).matches(excinfo.value.__cause__)
    assert record == ["main exiting", "2nd ran"]


async def test_slow_abort_basic() -> None:
    """An abort that completes later via run_sync_soon still delivers Cancelled."""
    with _core.CancelScope() as cancelled_scope:
        cancelled_scope.cancel()

        this_task = _core.current_task()
        trio_token = _core.current_trio_token()

        def slow_abort(raise_cancel: _core.RaiseCancelT) -> _core.Abort:
            # Capture the cancellation and hand it back through the token
            # rather than aborting on the spot.
            captured = outcome.capture(raise_cancel)
            trio_token.run_sync_soon(_core.reschedule, this_task, captured)
            return _core.Abort.FAILED

        with pytest.raises(_core.Cancelled):
            await _core.wait_task_rescheduled(slow_abort)


async def test_slow_abort_edge_cases() -> None:
    """Exercise a slow abort under double cancellation and a late shield.

    The abort callback must be called exactly once, and the cancellation it
    captured must still be delivered even though the task becomes shielded
    before the abort completes.
    """
    record: list[str] = []

    async def slow_aborter() -> None:
        task = _core.current_task()
        token = _core.current_trio_token()

        def slow_abort(raise_cancel: _core.RaiseCancelT) -> _core.Abort:
            record.append("abort-called")
            # Capture the cancellation and deliver it later via the token.
            result = outcome.capture(raise_cancel)
            token.run_sync_soon(_core.reschedule, task, result)
            return _core.Abort.FAILED

        record.append("sleeping")
        with pytest.raises(_core.Cancelled):
            await _core.wait_task_rescheduled(slow_abort)
        record.append("cancelled")
        # blocking again, this time it's okay, because we're shielded
        await _core.checkpoint()
        record.append("done")

    with _core.CancelScope() as outer1:
        with _core.CancelScope() as outer2:
            async with _core.open_nursery() as nursery:
                # So we have a task blocked on an operation that can't be
                # aborted immediately
                nursery.start_soon(slow_aborter)
                await wait_all_tasks_blocked()
                assert record == ["sleeping"]
                # And then we cancel it, so the abort callback gets run
                outer1.cancel()
                assert record == ["sleeping", "abort-called"]
                # In fact that happens twice! (This used to cause the abort
                # callback to be run twice)
                outer2.cancel()
                assert record == ["sleeping", "abort-called"]
                # But then before the abort finishes, the task gets shielded!
                nursery.cancel_scope.shield = True
                # Now we wait for the task to finish...
            # The cancellation was delivered, even though it was shielded
            assert record == ["sleeping", "abort-called", "cancelled", "done"]


async def test_task_tree_introspection() -> None:
    """Check the parent/child links between tasks and nurseries.

    Builds the chain parent -> child1 -> child2 and asserts on
    ``child_nurseries``, ``child_tasks``, ``parent_task``, ``parent_nursery``
    and ``eventual_parent_nursery`` at each stage.
    """
    # Both dicts are keyed by the name of the function that owns the entry.
    tasks: dict[str, _core.Task] = {}
    nurseries: dict[str, _core.Nursery] = {}

    async def parent(
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        tasks["parent"] = _core.current_task()

        assert tasks["parent"].child_nurseries == []

        # Nested nurseries are listed outermost first
        async with _core.open_nursery() as nursery1:
            async with _core.open_nursery() as nursery2:
                assert tasks["parent"].child_nurseries == [nursery1, nursery2]

        assert tasks["parent"].child_nurseries == []

        nursery: _core.Nursery | None
        async with _core.open_nursery() as nursery:
            nurseries["parent"] = nursery
            await nursery.start(child1)

        # Upward links survive after tasks/nurseries exit
        assert nurseries["parent"].parent_task is tasks["parent"]
        assert tasks["child1"].parent_nursery is nurseries["parent"]
        assert nurseries["child1"].parent_task is tasks["child1"]
        assert tasks["child2"].parent_nursery is nurseries["child1"]

        nursery = _core.current_task().parent_nursery
        # Make sure that chaining eventually gives a nursery of None (and not,
        # for example, an error)
        while nursery is not None:
            t = nursery.parent_task
            nursery = t.parent_nursery

    async def child2() -> None:
        tasks["child2"] = _core.current_task()
        # Downward links, checked while the whole chain is still alive
        assert tasks["parent"].child_nurseries == [nurseries["parent"]]
        assert nurseries["parent"].child_tasks == frozenset({tasks["child1"]})
        assert tasks["child1"].child_nurseries == [nurseries["child1"]]
        assert nurseries["child1"].child_tasks == frozenset({tasks["child2"]})
        assert tasks["child2"].child_nurseries == []

    async def child1(
        *,
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        me = tasks["child1"] = _core.current_task()
        # Before started(): the task sits in a different nursery owned by the
        # parent task, and nurseries["parent"] is only its eventual parent.
        assert not_none(me.parent_nursery).parent_task is tasks["parent"]
        assert me.parent_nursery is not nurseries["parent"]
        assert me.eventual_parent_nursery is nurseries["parent"]
        task_status.started()
        # After started(): the task now belongs to nurseries["parent"].
        assert me.parent_nursery is nurseries["parent"]
        assert me.eventual_parent_nursery is None

        # Wait for the start() call to return and close its internal nursery, to
        # ensure consistent results in child2:
        await _core.wait_all_tasks_blocked()

        async with _core.open_nursery() as nursery:
            nurseries["child1"] = nursery
            nursery.start_soon(child2)

    async with _core.open_nursery() as nursery:
        nursery.start_soon(parent)

    # There are no pending starts, so no one should have a non-None
    # eventual_parent_nursery
    for task in tasks.values():
        assert task.eventual_parent_nursery is None


async def test_nursery_closure() -> None:
    """A nursery accepts new tasks while children still run, but not after exit."""

    async def noop() -> None:
        pass

    async def spawner(target: _core.Nursery) -> None:
        # We can add new tasks to the nursery even after entering __aexit__,
        # so long as there are still tasks running
        target.start_soon(noop)

    async with _core.open_nursery() as nursery:
        nursery.start_soon(spawner, nursery)

    # But once we've left __aexit__, the nursery is closed
    with pytest.raises(RuntimeError):
        nursery.start_soon(noop)


async def test_spawn_name() -> None:
    """Check task names for both ``nursery.start_soon`` and ``spawn_system_task``.

    Each spawned ``func1`` asserts that the expected text occurs in the name
    of the task it is running in.
    """

    async def func1(expected: str) -> None:
        task = _core.current_task()
        assert expected in task.name

    # Only ever passed as a ``name=`` value, never run (hence "no cover").
    async def func2() -> None:  # pragma: no cover
        pass

    async def check(  # type: ignore[explicit-any]
        spawn_fn: Callable[..., object],
    ) -> None:
        # No explicit name: "func1" is expected in the task name
        spawn_fn(func1, "func1")
        # name= given as a function, a string, and an arbitrary object
        spawn_fn(func1, "func2", name=func2)
        spawn_fn(func1, "func3", name="func3")
        # A functools.partial wrapping func1: "func1" is still expected
        spawn_fn(functools.partial(func1, "func1"))
        spawn_fn(func1, "object", name=object())

    async with _core.open_nursery() as nursery:
        await check(nursery.start_soon)
    await check(_core.spawn_system_task)


async def test_current_effective_deadline(mock_clock: _core.MockClock) -> None:
    """Check ``current_effective_deadline()`` under nesting, shielding and cancellation."""
    # Outside any cancel scope there is no deadline
    assert _core.current_effective_deadline() == inf

    with _core.CancelScope(deadline=5) as scope1:
        with _core.CancelScope(deadline=10) as scope2:
            # The earliest deadline among the enclosing scopes wins
            assert _core.current_effective_deadline() == 5
            scope2.deadline = 3
            assert _core.current_effective_deadline() == 3
            scope2.deadline = 10
            assert _core.current_effective_deadline() == 5
            # A shielded inner scope hides the outer scope's deadline
            scope2.shield = True
            assert _core.current_effective_deadline() == 10
            scope2.shield = False
            assert _core.current_effective_deadline() == 5
            # A cancelled enclosing scope is reported as -inf...
            scope1.cancel()
            assert _core.current_effective_deadline() == -inf
            # ...unless the inner scope shields against it
            scope2.shield = True
            assert _core.current_effective_deadline() == 10
        assert _core.current_effective_deadline() == -inf
    assert _core.current_effective_deadline() == inf


def test_nice_error_on_bad_calls_to_run_or_spawn() -> None:
    """Passing a coroutine object or an async generator function gives a clear TypeError.

    Both mistakes are tried through ``_core.run`` directly and through
    ``nursery.start_soon``.
    """

    def bad_call_run(  # type: ignore[explicit-any]
        func: Callable[..., Awaitable[object]],
        *args: tuple[object, ...],
    ) -> None:
        _core.run(func, *args)

    def bad_call_spawn(  # type: ignore[explicit-any]
        func: Callable[..., Awaitable[object]],
        *args: tuple[object, ...],
    ) -> None:
        async def main() -> None:
            async with _core.open_nursery() as nursery:
                nursery.start_soon(func, *args)

        _core.run(main)

    async def f() -> None:  # pragma: no cover
        pass

    async def async_gen(arg: T) -> AsyncGenerator[T, None]:  # pragma: no cover
        yield arg

    # If/when RaisesGroup/Matcher is added to pytest in some form this test can be
    # rewritten to use a loop again, and avoid specifying the exceptions twice in
    # different ways
    # NOTE(review): this file already uses pytest.RaisesGroup elsewhere, so the
    # rewrite described above may now be possible -- confirm.
    with pytest.raises(
        TypeError,
        match=r"^Trio was expecting an async function, but instead it got a coroutine object <.*>",
    ):
        # f() is a coroutine object, not an async function
        bad_call_run(f())  # type: ignore[arg-type]
    with pytest.raises(
        TypeError,
        match=r"expected an async function but got an async generator",
    ):
        bad_call_run(async_gen, 0)  # type: ignore

    # bad_call_spawn calls the function inside a nursery, so the exception will be
    # wrapped in an exceptiongroup
    with RaisesGroup(Matcher(TypeError, "expecting an async function")):
        bad_call_spawn(f())  # type: ignore[arg-type]

    with RaisesGroup(
        Matcher(TypeError, "expected an async function but got an async generator"),
    ):
        bad_call_spawn(async_gen, 0)  # type: ignore


def test_calling_asyncio_function_gives_nice_error() -> None:
    """Awaiting an asyncio future under Trio raises a TypeError mentioning asyncio."""

    async def child_xyzzy() -> None:
        await create_asyncio_future_in_new_loop()

    async def misguided() -> None:
        await child_xyzzy()

    with pytest.raises(TypeError, match="asyncio") as exc_info:
        _core.run(misguided)

    # The function that performed the foreign await has to show up in the
    # traceback, so the offending line can be located.
    frame_names = [entry.name for entry in exc_info.traceback]
    assert "child_xyzzy" in frame_names


async def test_asyncio_function_inside_nursery_does_not_explode() -> None:
    """Regression test for https://github.com/python-trio/trio/issues/552."""
    expected_failure = RaisesGroup(Matcher(TypeError, "asyncio"))
    with expected_failure:
        async with _core.open_nursery() as nursery:
            # A sibling task is parked in the nursery while the nursery body
            # performs the foreign await.
            nursery.start_soon(sleep_forever)
            await create_asyncio_future_in_new_loop()


async def test_trivial_yields() -> None:
    """Check which trivial operations act as checkpoints or schedule points."""
    with assert_checkpoints():
        await _core.checkpoint()

    with assert_checkpoints():
        await _core.checkpoint_if_cancelled()
        await _core.cancel_shielded_checkpoint()

    # Weird case: opening and closing a nursery schedules, but doesn't check
    # for cancellation (unless something inside the nursery does)
    task = _core.current_task()
    before_schedule_points = task._schedule_points
    with _core.CancelScope() as cs:
        cs.cancel()
        async with _core.open_nursery():
            pass
    # No Cancelled was caught by the scope, even though it was cancelled...
    assert not cs.cancelled_caught
    # ...but the task's schedule-point counter did advance.
    assert task._schedule_points > before_schedule_points

    before_schedule_points = task._schedule_points

    # Same check again, this time with a child task that never executes a
    # checkpoint of its own.
    async def noop_with_no_checkpoint() -> None:
        pass

    with _core.CancelScope() as cs:
        cs.cancel()
        async with _core.open_nursery() as nursery:
            nursery.start_soon(noop_with_no_checkpoint)
    assert not cs.cancelled_caught

    assert task._schedule_points > before_schedule_points

    # An exception raised from a nursery body inside an already-cancelled
    # scope still propagates, wrapped in a group.
    with _core.CancelScope() as cancel_scope:
        cancel_scope.cancel()
        with RaisesGroup(KeyError):
            async with _core.open_nursery():
                raise KeyError


async def test_nursery_start(autojump_clock: _core.MockClock) -> None:
    """Exercise ``nursery.start()``: calling convention, happy path and error paths.

    Timing assertions compare ``_core.current_time()`` readings taken under
    the ``autojump_clock`` fixture.
    """

    async def no_args() -> None:  # pragma: no cover
        pass

    # Errors in calling convention get raised immediately from start
    async with _core.open_nursery() as nursery:
        with pytest.raises(TypeError):
            await nursery.start(no_args)

    async def sleep_then_start(
        seconds: int,
        *,
        task_status: _core.TaskStatus[int] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        repr(task_status)  # smoke test
        await sleep(seconds)
        task_status.started(seconds)
        await sleep(seconds)

    # Basic happy-path check: start waits for the task to call started(), then
    # returns, passes back the value, and the given nursery then waits for it
    # to exit.
    for seconds in [1, 2]:
        async with _core.open_nursery() as nursery:
            assert len(nursery.child_tasks) == 0
            t0 = _core.current_time()
            assert await nursery.start(sleep_then_start, seconds) == seconds
            assert _core.current_time() - t0 == seconds
            # once start() has returned, the task is counted as a child
            assert len(nursery.child_tasks) == 1
        # leaving the nursery block waited for the task's second sleep too
        assert _core.current_time() - t0 == 2 * seconds

    # Make sure TASK_STATUS_IGNORED works so task function can be called
    # directly
    t0 = _core.current_time()
    await sleep_then_start(3)
    assert _core.current_time() - t0 == 2 * 3

    # calling started twice
    async def double_started(
        *,
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        task_status.started()
        with pytest.raises(RuntimeError):
            task_status.started()

    async with _core.open_nursery() as nursery:
        await nursery.start(double_started)

    # child crashes before calling started -> error comes out of .start()
    async def raise_keyerror(
        *,
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        raise KeyError("oops")

    async with _core.open_nursery() as nursery:
        with pytest.raises(KeyError):
            await nursery.start(raise_keyerror)

    # child exiting cleanly before calling started -> triggers a RuntimeError
    async def nothing(
        *,
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        return

    async with _core.open_nursery() as nursery:
        with pytest.raises(RuntimeError) as excinfo1:
            await nursery.start(nothing)
        assert "exited without calling" in str(excinfo1.value)

    # if the call to start() is cancelled, then the call to started() does
    # nothing -- the child keeps executing under start(). The value it passed
    # is ignored; start() raises Cancelled.
    async def just_started(
        *,
        task_status: _core.TaskStatus[str] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        task_status.started("hi")
        await _core.checkpoint()

    async with _core.open_nursery() as nursery:
        with _core.CancelScope() as cs:
            cs.cancel()
            with pytest.raises(_core.Cancelled):
                await nursery.start(just_started)

    # but if the task does not execute any checkpoints, and exits, then start()
    # doesn't raise Cancelled, since the task completed successfully.
    async def started_with_no_checkpoint(
        *,
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        task_status.started(None)

    async with _core.open_nursery() as nursery:
        with _core.CancelScope() as cs:
            cs.cancel()
            await nursery.start(started_with_no_checkpoint)
        assert not cs.cancelled_caught

    # and since starting in a cancelled context makes started() a no-op, if
    # the child crashes after calling started(), the error can *still* come
    # out of start()
    async def raise_keyerror_after_started(
        *,
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        task_status.started()
        raise KeyError("whoopsiedaisy")

    async with _core.open_nursery() as nursery:
        with _core.CancelScope() as cs:
            cs.cancel()
            with pytest.raises(KeyError):
                await nursery.start(raise_keyerror_after_started)

    # trying to start in a closed nursery raises an error immediately
    async with _core.open_nursery() as closed_nursery:
        pass
    t0 = _core.current_time()
    with pytest.raises(RuntimeError):
        await closed_nursery.start(sleep_then_start, 7)
    # sub-second delays can be caused by unrelated multitasking by an OS
    assert int(_core.current_time()) == int(t0)


async def test_task_nursery_stack() -> None:
    """``Task._child_nurseries`` lists the task's open nurseries in nesting order."""
    current = _core.current_task()
    assert current._child_nurseries == []
    async with _core.open_nursery() as outer:
        assert current._child_nurseries == [outer]
        with RaisesGroup(KeyError):
            async with _core.open_nursery() as inner:
                assert current._child_nurseries == [outer, inner]
                raise KeyError
        # The inner nursery is removed even though it exited with an error.
        assert current._child_nurseries == [outer]
    assert current._child_nurseries == []


async def test_nursery_start_with_cancelled_nursery() -> None:
    """Use ``start()`` to move a task with sleeping children into a cancelled nursery.

    Two orderings are covered: the target nursery is cancelled just before
    the task calls ``started()``, and just after ``start()`` has returned.
    """

    # This function isn't testing task_status, it's using task_status as a
    # convenient way to get a nursery that we can test spawning stuff into.
    async def setup_nursery(
        task_status: _core.TaskStatus[_core.Nursery] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        async with _core.open_nursery() as nursery:
            task_status.started(nursery)
            await sleep_forever()

    # Calls started() while children are asleep, so we can make sure
    # that the cancellation machinery notices and aborts when a sleeping task
    # is moved into a cancelled scope.
    async def sleeping_children(
        fn: Callable[[], object],
        *,
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        async with _core.open_nursery() as nursery:
            nursery.start_soon(sleep_forever)
            nursery.start_soon(sleep_forever)
            await wait_all_tasks_blocked()
            # fn is invoked immediately before started(); the first scenario
            # below passes the target nursery's cancel() here
            fn()
            task_status.started()

    # Cancelling the setup_nursery just *before* calling started()
    async with _core.open_nursery() as nursery:
        value = await nursery.start(setup_nursery)
        assert isinstance(value, _core.Nursery)
        target_nursery: _core.Nursery = value
        await target_nursery.start(
            sleeping_children,
            target_nursery.cancel_scope.cancel,
        )

    # Cancelling the setup_nursery just *after* calling started()
    async with _core.open_nursery() as nursery:
        value = await nursery.start(setup_nursery)
        assert isinstance(value, _core.Nursery)
        target_nursery = value
        await target_nursery.start(sleeping_children, lambda: None)
        target_nursery.cancel_scope.cancel()


async def test_nursery_start_keeps_nursery_open(
    autojump_clock: _core.MockClock,
) -> None:
    """A pending ``start()`` call keeps the target nursery from closing.

    Covers both a task that eventually calls ``started()`` and one that
    crashes before ever doing so.
    """

    async def sleep_a_bit(
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        await sleep(2)
        task_status.started()
        await sleep(3)

    async with _core.open_nursery() as nursery1:
        t0 = _core.current_time()
        async with _core.open_nursery() as nursery2:
            # Start the 'start' call running in the background
            nursery1.start_soon(nursery2.start, sleep_a_bit)
            # Sleep a bit
            await sleep(1)
            # Start another one.
            nursery1.start_soon(nursery2.start, sleep_a_bit)
            # Then exit this nursery. At this point, there are no tasks
            # present in this nursery -- the only thing keeping it open is
            # that the tasks will be placed into it soon, when they call
            # started().
        # The second sleep_a_bit is spawned after the 1 second sleep and then
        # sleeps 2 + 3 seconds itself, hence 6.
        assert _core.current_time() - t0 == 6

    # Check that it still works even if the task that the nursery is waiting
    # for ends up crashing, and never actually enters the nursery.
    async def sleep_then_crash(
        task_status: _core.TaskStatus[None] = _core.TASK_STATUS_IGNORED,
    ) -> None:
        await sleep(7)
        raise KeyError

    # Runs in nursery1 and absorbs the KeyError coming out of start(), so
    # that neither nursery sees an error.
    async def start_sleep_then_crash(nursery: _core.Nursery) -> None:
        with pytest.raises(KeyError):
            await nursery.start(sleep_then_crash)

    async with _core.open_nursery() as nursery1:
        t0 = _core.current_time()
        async with _core.open_nursery() as nursery2:
            nursery1.start_soon(start_sleep_then_crash, nursery2)
            await wait_all_tasks_blocked()
        # nursery2 only finished closing after sleep_then_crash's 7 second sleep
        assert _core.current_time() - t0 == 7


async def test_nursery_explicit_exception() -> None:
    """An exception raised directly in a nursery body comes out wrapped in a group."""
    with RaisesGroup(KeyError):
        async with _core.open_nursery():
            raise KeyError


async def test_nursery_stop_iteration() -> None:
    """A StopIteration from the nursery body is grouped with a child's ValueError."""

    async def raise_value_error() -> NoReturn:
        raise ValueError

    with RaisesGroup(StopIteration, ValueError):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(raise_value_error)
            raise StopIteration


async def test_nursery_stop_async_iteration() -> None:
    """Implement an async ``zip`` on top of a nursery and check it stops cleanly.

    A ``StopAsyncIteration`` raised inside a child task reaches
    ``async_zip.__anext__`` wrapped in an ``ExceptionGroup``; it is unwrapped
    and re-raised there so that ``async for`` terminates normally.
    """

    # Minimal async iterator producing 0, 1, ..., count - 1.
    class it:
        def __init__(self, count: int) -> None:
            self.count = count
            self.val = 0

        async def __anext__(self) -> int:
            await sleep(0)
            val = self.val
            if val >= self.count:
                raise StopAsyncIteration
            self.val += 1
            return val

    # Advances all wrapped iterators concurrently, one child task each.
    class async_zip:
        def __init__(self, *largs: it) -> None:
            self.nexts = [obj.__anext__ for obj in largs]

        # Child task body: store the next value of one iterator in its slot.
        async def _accumulate(
            self,
            f: Callable[[], Awaitable[int]],
            items: list[int],
            i: int,
        ) -> None:
            items[i] = await f()

        def __aiter__(self) -> async_zip:
            return self

        async def __anext__(self) -> list[int]:
            nexts = self.nexts
            # -1 is only a placeholder; each slot is overwritten by _accumulate
            items: list[int] = [-1] * len(nexts)

            try:
                async with _core.open_nursery() as nursery:
                    for i, f in enumerate(nexts):
                        nursery.start_soon(self._accumulate, f, items, i)
            except ExceptionGroup as e:
                # With strict_exception_groups enabled, users now need to unwrap
                # StopAsyncIteration and re-raise it.
                # This would be relatively clean on python3.11+ with except*.
                # We could also use RaisesGroup, but that's primarily meant as
                # test infra, not as a runtime tool.
                if len(e.exceptions) == 1 and isinstance(
                    e.exceptions[0],
                    StopAsyncIteration,
                ):
                    raise e.exceptions[0] from None
                else:  # pragma: no cover
                    raise AssertionError("unknown error in _accumulate") from e

            return items

    # Iteration ends as soon as the shorter iterator (2 items) is exhausted.
    result: list[list[int]] = [vals async for vals in async_zip(it(4), it(2))]
    assert result == [[0, 0], [1, 1]]


async def test_traceback_frame_removal() -> None:
    """The traceback of a child task's exception starts inside the child itself."""

    async def crashing_child() -> NoReturn:
        raise KeyError()

    def top_frame_is_child(exc: KeyError) -> bool:
        # The first traceback entry must belong to the child task rather than
        # to trio/contextvars internals. The child contributes exactly one
        # frame, so over-eager frame removal would be caught here as well.
        first_entry = exc.__traceback__
        assert first_entry is not None
        return first_entry.tb_frame.f_code is crashing_child.__code__

    keyerror_from_child = Matcher(KeyError, check=top_frame_is_child)
    with RaisesGroup(keyerror_from_child):
        # Cancel/nursery scopes still leave extra traceback entries behind for
        # now, but with an ExceptionGroup those end up on the group, leaving
        # the KeyError's own traceback clean for inspection.
        async with _core.open_nursery() as nursery:
            nursery.start_soon(crashing_child)


def test_contextvar_support() -> None:
    """Changes made to a context variable inside ``run`` do not leak to the caller."""
    cvar: contextvars.ContextVar[str] = contextvars.ContextVar("test")
    cvar.set("before")

    assert cvar.get() == "before"

    async def main() -> None:
        me = _core.current_task()
        # The value set by the caller is visible inside the task
        assert me.context.get(cvar) == "before"
        assert cvar.get() == "before"
        # and a new value shows up both directly and through the task's context.
        cvar.set("after")
        assert cvar.get() == "after"
        assert cvar in me.context
        assert me.context.get(cvar) == "after"

    _core.run(main)
    assert cvar.get() == "before"


async def test_contextvar_multitask() -> None:
    """A child sees the spawner's value at spawn time; its own changes stay local."""
    cvar = contextvars.ContextVar("test", default="hmmm")

    async def mutating_child() -> None:
        # Starts out with the default value, then changes it.
        assert cvar.get() == "hmmm"
        cvar.set("hmmmm")
        assert cvar.get() == "hmmmm"

    async def inheriting_child() -> None:
        # Spawned after the parent's own set(), so it sees that value.
        assert cvar.get() == "hmmmm"

    async with _core.open_nursery() as nursery:
        nursery.start_soon(mutating_child)
        await wait_all_tasks_blocked()
        # The first child's change is not visible in the parent.
        assert cvar.get() == "hmmm"
        cvar.set("hmmmm")
        nursery.start_soon(inheriting_child)
        await wait_all_tasks_blocked()


def test_system_task_contexts() -> None:
    """System tasks see the value set before ``run``; regular tasks see the spawner's."""
    species: contextvars.ContextVar[str] = contextvars.ContextVar("qwilfish")
    species.set("water")

    async def check_system_task() -> None:
        assert species.get() == "water"

    async def check_regular_task() -> None:
        assert species.get() == "poison"

    async def main() -> None:
        async with _core.open_nursery() as nursery:
            # Change the value inside the main task before spawning anything.
            species.set("poison")
            nursery.start_soon(check_regular_task)
            _core.spawn_system_task(check_system_task)
            await wait_all_tasks_blocked()

    _core.run(main)


async def test_Nursery_init() -> None:
    """Test that nurseries cannot be constructed directly."""
    # Being async gives this test a real task object to hand to the
    # constructor; the constructor is expected to refuse before using it.
    parent_task = _core.current_task()
    cancel_scope = _core.CancelScope()
    with pytest.raises(TypeError):
        _core._run.Nursery(parent_task, cancel_scope, True)


async def test_Nursery_private_init() -> None:
    """Creating a nursery via ``open_nursery`` succeeds and yields an open nursery."""
    async with _core.open_nursery() as opened:
        assert not opened._closed


def test_Nursery_subclass() -> None:
    """Creating a subclass of ``Nursery`` raises TypeError."""
    bases = (_core._run.Nursery,)
    with pytest.raises(TypeError):
        type("Subclass", bases, {})


def test_CancelScope_subclass() -> None:
    """Creating a subclass of ``CancelScope`` raises TypeError."""
    bases = (_core.CancelScope,)
    with pytest.raises(TypeError):
        type("Subclass", bases, {})


def test_sniffio_integration() -> None:
    """Check that sniffio reports "trio" inside Trio tasks and nothing outside.

    Also checks that tasks spawned while sniffio is set to a different
    library name still see "trio", and that the spawner's own setting is left
    untouched.
    """
    # Outside of any run, no async library is detected.
    with pytest.raises(sniffio.AsyncLibraryNotFoundError):
        sniffio.current_async_library()

    async def check_inside_trio() -> None:
        assert sniffio.current_async_library() == "trio"

    # A plain (non-async) function that returns a coroutine; the check in its
    # synchronous part runs when the function itself is called.
    def check_function_returning_coroutine() -> Awaitable[object]:
        assert sniffio.current_async_library() == "trio"
        return check_inside_trio()

    _core.run(check_inside_trio)

    # After the run has finished, detection fails again.
    with pytest.raises(sniffio.AsyncLibraryNotFoundError):
        sniffio.current_async_library()

    # Temporarily sets both of sniffio's stores (context variable and
    # thread-local) to "nullio", and restores the previous values on exit.
    @contextmanager
    def alternate_sniffio_library() -> Generator[None, None, None]:
        prev_token = sniffio.current_async_library_cvar.set("nullio")
        prev_library, sniffio.thread_local.name = sniffio.thread_local.name, "nullio"
        try:
            yield
            # whatever ran inside the block must not have changed our setting
            assert sniffio.current_async_library() == "nullio"
        finally:
            sniffio.thread_local.name = prev_library
            sniffio.current_async_library_cvar.reset(prev_token)

    async def check_new_task_resets_sniffio_library() -> None:
        with alternate_sniffio_library():
            _core.spawn_system_task(check_inside_trio)
        async with _core.open_nursery() as nursery:
            with alternate_sniffio_library():
                nursery.start_soon(check_inside_trio)
                nursery.start_soon(check_function_returning_coroutine)

    _core.run(check_new_task_resets_sniffio_library)


async def test_Task_custom_sleep_data() -> None:
    """``custom_sleep_data`` can be assigned and is back to None after a checkpoint."""
    me = _core.current_task()
    assert me.custom_sleep_data is None
    me.custom_sleep_data = 1
    assert me.custom_sleep_data == 1
    await _core.checkpoint()
    assert me.custom_sleep_data is None


@types.coroutine
def async_yield(value: T) -> Generator[T, None, None]:
    """Yield ``value`` once to whoever is driving the awaiting coroutine.

    Decorated with ``types.coroutine`` so that it can be used with ``await``.
    Whatever is sent back in when the generator is resumed is discarded.
    """
    yield value


async def test_permanently_detach_coroutine_object() -> None:
    """Detach a task's coroutine permanently, then drive it by hand.

    Covers a value outcome, an error outcome, and the ``RuntimeError`` raised
    when detaching is attempted with a nursery open.
    """
    task: _core.Task | None = None
    pdco_outcome: outcome.Outcome[str] | None = None

    async def detachable_coroutine(
        task_outcome: outcome.Outcome[None],
        yield_value: object,
    ) -> None:
        await sleep(0)
        nonlocal task, pdco_outcome
        task = _core.current_task()
        # `No overload variant of "acapture" matches argument types "Callable[[Outcome[object]], Coroutine[Any, Any, object]]", "Outcome[None]"`
        pdco_outcome = await outcome.acapture(  # type: ignore[call-overload]
            _core.permanently_detach_coroutine_object,
            task_outcome,
        )
        await async_yield(yield_value)

    async with _core.open_nursery() as nursery:
        nursery.start_soon(detachable_coroutine, outcome.Value(None), "I'm free!")

    # If we get here then Trio thinks the task has exited... but the coroutine
    # is still iterable. At that point anything can be sent into the coroutine, so the .coro type
    # is wrong.
    # The detach call has not returned inside the coroutine yet, so nothing
    # has been captured.
    assert pdco_outcome is None
    # `Argument 1 to "send" of "Coroutine" has incompatible type "str"; expected "Outcome[object]"`
    assert not_none(task).coro.send("be free!") == "I'm free!"  # type: ignore[arg-type]
    # What we sent in became the result of the detach call.
    assert pdco_outcome == outcome.Value("be free!")
    with pytest.raises(StopIteration):
        not_none(task).coro.send(None)  # type: ignore[arg-type]

    # Check the exception paths too
    task = None
    pdco_outcome = None
    with RaisesGroup(KeyError):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(detachable_coroutine, outcome.Error(KeyError()), "uh oh")
    throw_in = ValueError()
    assert isinstance(task, _core.Task)  # For type checkers.
    assert not_none(task).coro.throw(throw_in) == "uh oh"
    # The exception thrown in was captured as the error outcome of the detach call.
    assert pdco_outcome == outcome.Error(throw_in)
    with pytest.raises(StopIteration):
        task.coro.send(None)

    # Detaching while the task has a nursery open is rejected.
    async def bad_detach() -> None:
        async with _core.open_nursery():
            with pytest.raises(RuntimeError) as excinfo:
                await _core.permanently_detach_coroutine_object(outcome.Value(None))
            assert "open nurser" in str(excinfo.value)

    async with _core.open_nursery() as nursery:
        nursery.start_soon(bad_detach)


async def test_detach_and_reattach_coroutine_object() -> None:
    """Temporarily detach a coroutine, drive it by hand, then reattach it.

    Also checks that reattaching with a task other than the coroutine's own
    raises ``RuntimeError``.
    """
    unrelated_task: _core.Task | None = None
    task: _core.Task | None = None

    # Only exists to provide a second task object for the mismatch check.
    async def unrelated_coroutine() -> None:
        nonlocal unrelated_task
        unrelated_task = _core.current_task()

    async def reattachable_coroutine() -> None:
        nonlocal task
        await sleep(0)

        task = _core.current_task()

        def abort_fn(_: _core.RaiseCancelT) -> _core.Abort:  # pragma: no cover
            return _core.Abort.FAILED

        # The detach call returns whatever the manual runner sends in first.
        got = await _core.temporarily_detach_coroutine_object(abort_fn)
        assert got == "not trio!"

        # While detached, plain yields go to the manual runner.
        await async_yield(1)
        await async_yield(2)

        with pytest.raises(RuntimeError) as excinfo:
            await _core.reattach_detached_coroutine_object(
                not_none(unrelated_task),
                None,
            )
        assert "does not match" in str(excinfo.value)

        # "byebye" is the last value the manual runner receives (asserted below).
        await _core.reattach_detached_coroutine_object(task, "byebye")

        await sleep(0)

    async with _core.open_nursery() as nursery:
        nursery.start_soon(unrelated_coroutine)
        nursery.start_soon(reattachable_coroutine)
        await wait_all_tasks_blocked()

        # Okay, it's detached. Here's our coroutine runner:
        # `Argument 1 to "send" of "Coroutine" has incompatible type "str"; expected "Outcome[object]"`
        assert not_none(task).coro.send("not trio!") == 1  # type: ignore[arg-type]
        assert not_none(task).coro.send(None) == 2  # type: ignore[arg-type]
        assert not_none(task).coro.send(None) == "byebye"  # type: ignore[arg-type]

        # Now it's been reattached, and we can leave the nursery


async def test_detached_coroutine_cancellation() -> None:
    """Cancel a nursery while its child task's coroutine is detached.

    The ``abort_fn`` given at detach time must get called, and once the
    coroutine has been reattached its next checkpoint must raise
    ``Cancelled``.
    """
    abort_fn_called = False
    task: _core.Task | None = None

    async def reattachable_coroutine() -> None:
        await sleep(0)

        nonlocal task
        task = _core.current_task()

        # Records that it was invoked, and reports the abort as FAILED.
        def abort_fn(_: _core.RaiseCancelT) -> _core.Abort:
            nonlocal abort_fn_called
            abort_fn_called = True
            return _core.Abort.FAILED

        await _core.temporarily_detach_coroutine_object(abort_fn)
        await _core.reattach_detached_coroutine_object(task, None)
        with pytest.raises(_core.Cancelled):
            await sleep(0)

    async with _core.open_nursery() as nursery:
        nursery.start_soon(reattachable_coroutine)
        await wait_all_tasks_blocked()
        assert task is not None
        nursery.cancel_scope.cancel()
        # Resume the detached coroutine by hand so that it goes on to reattach
        # `Argument 1 to "send" of "Coroutine" has incompatible type "None"; expected "Outcome[object]"`
        task.coro.send(None)  # type: ignore[arg-type]

    assert abort_fn_called


@restore_unraisablehook()
def test_async_function_implemented_in_C() -> None:
    """Run an async generator's ``__anext__`` both via ``run`` and ``start_soon``.

    ``agen.__anext__`` stands in for an async function that has no Python
    frame of its own (see the comment below).
    """
    # These used to crash because we'd try to mutate the coroutine object's
    # cr_frame, but C functions don't have Python frames.

    async def agen_fn(record: list[str]) -> AsyncIterator[None]:
        assert not _core.currently_ki_protected()
        record.append("the generator ran")
        yield

    # Case 1: used directly as the main function of a run
    run_record: list[str] = []
    agen = agen_fn(run_record)
    _core.run(agen.__anext__)
    assert run_record == ["the generator ran"]

    # Case 2: spawned as a child task
    async def main() -> None:
        start_soon_record: list[str] = []
        agen = agen_fn(start_soon_record)
        async with _core.open_nursery() as nursery:
            nursery.start_soon(agen.__anext__)
        assert start_soon_record == ["the generator ran"]

    _core.run(main)


async def test_very_deep_cancel_scope_nesting() -> None:
    """Cancelling the outermost of thousands of nested scopes must not blow the stack."""
    # This used to crash with a RecursionError in CancelStatus.recalculate
    nested_scopes = 5000
    with ExitStack() as stack:
        outermost = _core.CancelScope()
        stack.enter_context(outermost)
        for _ in range(nested_scopes):
            stack.enter_context(_core.CancelScope())
        outermost.cancel()


async def test_cancel_scope_deadline_duplicates() -> None:
    """Flip one scope's deadline back and forth many times, then sleep briefly."""
    # Intentionally creates duplicate entries in the deadline heap, which
    # exercises an assert in Deadlines._prune.
    base = _core.current_time()
    flips = DEADLINE_HEAP_MIN_PRUNE_THRESHOLD * 2
    with _core.CancelScope() as scope:
        for _ in range(flips):
            scope.deadline = base + 9998
            scope.deadline = base + 9999
        await sleep(0.01)


# I don't know if this one can fail anymore: the `del` next to the comment that used to
# refer to this test only seems to break test_cancel_scope_exit_doesnt_create_cyclic_garbage.
# We're keeping it for now to cover Outcome and potential future refactoring.
@pytest.mark.skipif(
    sys.implementation.name != "cpython",
    reason="Only makes sense with refcounting GC",
)
async def test_simple_cancel_scope_usage_doesnt_create_cyclic_garbage() -> None:
    """Basic cancel-scope and nursery usage must leave nothing for the cycle collector.

    With ``gc.DEBUG_SAVEALL`` set, everything the collector finds is kept in
    ``gc.garbage``, so an empty list after ``gc.collect()`` means no reference
    cycles were created.
    """
    # https://github.com/python-trio/trio/issues/1770
    gc.collect()

    async def do_a_cancel() -> None:
        with _core.CancelScope() as cscope:
            cscope.cancel()
            await sleep_forever()

    async def crasher() -> NoReturn:
        raise ValueError("this is a crash")

    old_flags = gc.get_debug()
    try:
        # start from a clean slate before switching on SAVEALL
        gc.collect()
        gc.set_debug(gc.DEBUG_SAVEALL)

        # cover outcome.Error.unwrap
        # (See https://github.com/python-trio/outcome/pull/29)
        await do_a_cancel()
        # cover outcome.Error.unwrap if unrolled_run hangs on to exception refs
        # (See https://github.com/python-trio/trio/pull/1864)
        await do_a_cancel()

        with RaisesGroup(Matcher(ValueError, "^this is a crash$")):
            async with _core.open_nursery() as nursery:
                # cover NurseryManager.__aexit__
                nursery.start_soon(crasher)

        gc.collect()
        assert not gc.garbage
    finally:
        # restore the debug flags and drop anything SAVEALL may have kept
        gc.set_debug(old_flags)
        gc.garbage.clear()


@pytest.mark.skipif(
    sys.implementation.name != "cpython",
    reason="Only makes sense with refcounting GC",
)
async def test_cancel_scope_exit_doesnt_create_cyclic_garbage() -> None:
    """Leaving a cancelled scope with an error must not create reference cycles.

    The nursery holds one child that gets cancelled by the outer scope and
    one that raises ``ValueError``. ``gc.garbage`` must be empty afterwards
    (``gc.DEBUG_SAVEALL`` is switched on inside the nursery).
    """
    # https://github.com/python-trio/trio/pull/2063
    gc.collect()

    async def crasher() -> NoReturn:
        raise ValueError("this is a crash")

    old_flags = gc.get_debug()
    try:
        # fmt: off
        # Remove after 3.9 unsupported, black formats in a way that breaks if
        # you do `-X oldparser`
        with RaisesGroup(
            Matcher(ValueError, "^this is a crash$"),
        ), _core.CancelScope() as outer:
            # fmt: on
            async with _core.open_nursery() as nursery:
                gc.collect()
                gc.set_debug(gc.DEBUG_SAVEALL)
                # One child that gets cancelled by the outer scope
                nursery.start_soon(sleep_forever)
                outer.cancel()
                # And one that raises a different error
                nursery.start_soon(crasher)
                # so that outer filters a Cancelled from the ExceptionGroup and
                # covers CancelScope.__exit__ (and NurseryManager.__aexit__)
                # (See https://github.com/python-trio/trio/pull/2063)

        gc.collect()
        assert not gc.garbage
    finally:
        # restore the debug flags and drop anything SAVEALL may have kept
        gc.set_debug(old_flags)
        gc.garbage.clear()


@pytest.mark.skipif(
    sys.implementation.name != "cpython",
    reason="Only makes sense with refcounting GC",
)
async def test_nursery_cancel_doesnt_create_cyclic_garbage() -> None:
    """A cancelled, closed nursery must be freed without the cycle collector.

    A ``weakref.finalize`` callback records when the nursery object goes
    away; it must have fired before ``gc.collect()`` is called, and
    ``gc.garbage`` must be empty afterwards.
    """
    collected = False

    # https://github.com/python-trio/trio/issues/1770#issuecomment-730229423
    def toggle_collected() -> None:
        nonlocal collected
        collected = True

    gc.collect()
    old_flags = gc.get_debug()
    try:
        # collect once with debugging off, so that garbage predating this
        # test is really freed rather than saved
        gc.set_debug(0)
        gc.collect()
        gc.set_debug(gc.DEBUG_SAVEALL)

        # cover Nursery._nested_child_finished
        async with _core.open_nursery() as nursery:
            nursery.cancel_scope.cancel()

        weakref.finalize(nursery, toggle_collected)
        del nursery
        # a checkpoint clears the nursery from the internals, apparently
        # TODO: stop event loop from hanging on to the nursery at this point
        await _core.checkpoint()

        # checked *before* gc.collect(): the nursery must already be gone
        assert collected
        gc.collect()
        assert not gc.garbage
    finally:
        # restore the debug flags and drop anything SAVEALL may have kept
        gc.set_debug(old_flags)
        gc.garbage.clear()


@pytest.mark.skipif(
    sys.implementation.name != "cpython",
    reason="Only makes sense with refcounting GC",
)
async def test_locals_destroyed_promptly_on_cancel() -> None:
    """A cancelled task's local objects are gone by the time its nursery exits."""
    destroyed = False

    def mark_destroyed() -> None:
        nonlocal destroyed
        destroyed = True

    class Canary:
        pass

    async def hold_canary() -> None:
        # The canary is referenced only from this frame's local variable.
        canary = Canary()
        weakref.finalize(canary, mark_destroyed)
        await _core.checkpoint()

    async with _core.open_nursery() as nursery:
        nursery.start_soon(hold_canary)
        nursery.cancel_scope.cancel()
    assert destroyed


def _create_kwargs(strictness: bool | None) -> dict[str, bool]:
    """Turn a bool|None into a kwarg dict that can be passed to `run` or `open_nursery`"""

    if strictness is None:
        return {}
    return {"strict_exception_groups": strictness}


@pytest.mark.filterwarnings(
    "ignore:.*strict_exception_groups=False:trio.TrioDeprecationWarning",
)
@pytest.mark.parametrize("run_strict", [True, False, None])
@pytest.mark.parametrize("open_nursery_strict", [True, False, None])
@pytest.mark.parametrize("multiple_exceptions", [True, False])
def test_setting_strict_exception_groups(
    run_strict: bool | None,
    open_nursery_strict: bool | None,
    multiple_exceptions: bool,
) -> None:
    """
    Check the default for strict_exception_groups, and that a nursery can both
    inherit the run-wide setting and override it.
    """

    async def raise_error() -> NoReturn:
        raise RuntimeError("test error")

    async def main() -> None:
        """Run one or two failing children in a single nursery"""
        async with _core.open_nursery(**_create_kwargs(open_nursery_strict)) as nursery:
            nursery.start_soon(raise_error)
            if multiple_exceptions:
                nursery.start_soon(raise_error)

    def run_main() -> None:
        # mypy doesn't like kwarg magic
        _core.run(main, **_create_kwargs(run_strict))  # type: ignore[arg-type]

    matcher = Matcher(RuntimeError, r"^test error$")

    # An explicit nursery-level setting wins; otherwise the run-level setting
    # applies, with anything other than an explicit False meaning strict.
    if open_nursery_strict is None:
        effectively_strict = run_strict is not False
    else:
        effectively_strict = open_nursery_strict

    if multiple_exceptions:
        # Two errors are expected as a group in every configuration.
        with RaisesGroup(matcher, matcher):
            run_main()
    elif effectively_strict:
        with RaisesGroup(matcher):
            run_main()
    else:
        with pytest.raises(RuntimeError, match=r"^test error$"):
            run_main()


@pytest.mark.filterwarnings(
    "ignore:.*strict_exception_groups=False:trio.TrioDeprecationWarning",
)
@pytest.mark.parametrize("strict", [True, False, None])
async def test_nursery_collapse(strict: bool | None) -> None:
    """
    Test that a single exception from a nested nursery gets collapsed correctly
    depending on strict_exception_groups value when CancelledErrors are stripped from it.
    """

    async def raise_error() -> NoReturn:
        raise RuntimeError("test error")

    # What the inner nursery is expected to contribute to the outer group: a
    # bare RuntimeError when strict is explicitly False, otherwise a group
    # containing a single RuntimeError.
    # mypy requires explicit type for conditional expression
    maybe_wrapped_runtime_error: type[RuntimeError] | RaisesGroup[RuntimeError] = (
        RuntimeError if strict is False else RaisesGroup(RuntimeError)
    )

    with RaisesGroup(RuntimeError, maybe_wrapped_runtime_error):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(sleep_forever)
            nursery.start_soon(raise_error)
            async with _core.open_nursery(**_create_kwargs(strict)) as nursery2:
                nursery2.start_soon(sleep_forever)
                nursery2.start_soon(raise_error)
                # cancelling the outer nursery's scope also reaches the tasks
                # sleeping in both nurseries
                nursery.cancel_scope.cancel()


async def test_cancel_scope_no_cancellederror() -> None:
    """
    A cancelled scope that sees an exception group with no Cancelled exception
    in it must leave its ``cancelled_caught`` flag unset.
    """
    expected_group = RaisesGroup(RuntimeError, RuntimeError, match="test")

    # Same nesting as two stacked `with` blocks: the scope is the inner manager.
    with expected_group, _core.CancelScope() as scope:
        scope.cancel()
        raise ExceptionGroup("test", [RuntimeError(), RuntimeError()])

    assert not scope.cancelled_caught


@pytest.mark.filterwarnings(
    "ignore:.*strict_exception_groups=False:trio.TrioDeprecationWarning",
)
@pytest.mark.parametrize("run_strict", [False, True])
@pytest.mark.parametrize("start_raiser_strict", [False, True, None])
@pytest.mark.parametrize("raise_after_started", [False, True])
@pytest.mark.parametrize("raise_custom_exc_grp", [False, True])
def test_trio_run_strict_before_started(
    run_strict: bool,
    start_raiser_strict: bool | None,
    raise_after_started: bool,
    raise_custom_exc_grp: bool,
) -> None:
    """
    Regression tests for #2611, where exceptions raised before
    `TaskStatus.started()` caused `Nursery.start()` to wrap them in an
    ExceptionGroup when using `run(..., strict_exception_groups=True)`.

    Regression tests for #2844, where #2611 was initially fixed in a way that
    had unintended side effects.

    Parameters:
      run_strict: value passed to `run` as ``strict_exception_groups``.
      start_raiser_strict: value passed to the nursery opened by `start_raiser`;
        None leaves the choice to the `run` setting.
      raise_after_started: whether `raiser` calls ``task_status.started()``
        before raising.
      raise_custom_exc_grp: whether `raiser` raises its own ExceptionGroup
        instead of a bare ValueError.
    """

    # The exact object `raiser` raises; kept so the result can be compared
    # against it at the end.
    raiser_exc: ValueError | ExceptionGroup[ValueError]
    if raise_custom_exc_grp:
        raiser_exc = ExceptionGroup("my group", [ValueError()])
    else:
        raiser_exc = ValueError()

    async def raiser(*, task_status: _core.TaskStatus[None]) -> None:
        # Optionally report startup first, so the failure happens either
        # before or after `started()` depending on the parametrization.
        if raise_after_started:
            task_status.started()
        raise raiser_exc

    async def start_raiser() -> None:
        try:
            async with _core.open_nursery(
                strict_exception_groups=start_raiser_strict,
            ) as nursery:
                await nursery.start(raiser)
        except BaseExceptionGroup as exc_group:
            if start_raiser_strict:
                # Iff the code using the nursery *forced* it to be strict
                # (overriding the runner setting) then it may replace the bland
                # exception group raised by trio with a more specific one (subtype,
                # different message, etc.).
                raise BaseExceptionGroup(
                    "start_raiser nursery custom message",
                    exc_group.exceptions,
                ) from None
            # Not explicitly strict: let the group propagate unchanged.
            raise

    # Capture whatever escapes `run`; its shape is checked below.
    with pytest.raises(BaseException) as exc_info:  # noqa: PT011  # no `match`
        _core.run(start_raiser, strict_exception_groups=run_strict)

    if start_raiser_strict or (run_strict and start_raiser_strict is None):
        # start_raiser's nursery was strict.
        assert isinstance(exc_info.value, BaseExceptionGroup)
        if start_raiser_strict:
            # start_raiser didn't unknowingly inherit its nursery strictness
            # from `run`---it explicitly chose for its nursery to be strict.
            assert exc_info.value.message == "start_raiser nursery custom message"
        # Exactly one layer of wrapping is expected around raiser's exception.
        assert len(exc_info.value.exceptions) == 1
        should_be_raiser_exc = exc_info.value.exceptions[0]
    else:
        # start_raiser's nursery was not strict.
        should_be_raiser_exc = exc_info.value
    if isinstance(raiser_exc, ValueError):
        # A bare exception must come back as the very same object.
        assert should_be_raiser_exc is raiser_exc
    else:
        # Check attributes, not identity, because should_be_raiser_exc may be a
        # copy of raiser_exc rather than raiser_exc by identity.
        assert type(should_be_raiser_exc) is type(raiser_exc)
        assert should_be_raiser_exc.message == raiser_exc.message
        assert should_be_raiser_exc.exceptions == raiser_exc.exceptions


async def test_internal_error_old_nursery_multiple_tasks() -> None:
    """
    A task run via ``nursery.start()`` spawns two failing children into the
    nursery it is currently parented to and then returns without calling
    ``started()``. The test expects a TrioInternalError whose ``__cause__`` is
    a group holding both ValueErrors.
    """

    async def fail() -> None:
        raise ValueError

    async def spawn_tasks_in_old_nursery(task_status: _core.TaskStatus[None]) -> None:
        # `parent_nursery` is typed as optional; not_none narrows it.
        old_nursery = not_none(_core.current_task().parent_nursery)
        for _ in range(2):
            old_nursery.start_soon(fail)

    async with _core.open_nursery() as nursery:
        with pytest.raises(_core.TrioInternalError) as excinfo:
            await nursery.start(spawn_tasks_in_old_nursery)

    both_value_errors = RaisesGroup(ValueError, ValueError)
    assert both_value_errors.matches(excinfo.value.__cause__)


def no_other_refs() -> list[object]:
    """Return the referrer list expected for an object only the caller holds.

    On Python 3.11+ this is an empty list. On older versions it is a list
    holding the caller's frame.
    """
    if sys.version_info >= (3, 11):
        return []
    # NOTE(review): presumably older interpreters report the calling frame as
    # a referrer of its own locals -- confirm against gc.get_referrers docs.
    return [sys._getframe(1)]


@pytest.mark.skipif(
    sys.implementation.name != "cpython",
    reason="Only makes sense with refcounting GC",
)
@pytest.mark.xfail(
    sys.version_info >= (3, 14),
    reason="https://github.com/python/cpython/issues/125603",
)
async def test_ki_protection_doesnt_leave_cyclic_garbage() -> None:
    """
    Check that an exception raised and caught inside a nursery child task has
    no referrers beyond those returned by `no_other_refs()` once it has been
    re-raised in a group and caught by the test.
    """

    class MyException(Exception):
        pass

    async def demo() -> None:
        async def handle_error() -> None:
            # Raise and immediately catch, recording the exception object in
            # the enclosing function's `exceptions` list.
            try:
                raise MyException
            except MyException as e:
                exceptions.append(e)

        exceptions: list[MyException] = []
        try:
            async with _core.open_nursery() as n:
                n.start_soon(handle_error)
            # Re-raise what the child recorded, wrapped in a group.
            raise ExceptionGroup("errors", exceptions)
        finally:
            # Rebind the local to a fresh list on the way out.
            # NOTE(review): presumably so this frame stops referencing the
            # recorded exceptions -- confirm.
            exceptions = []

    exc: Exception | None = None
    try:
        await demo()
    except ExceptionGroup as excs:
        # Keep only the inner exception; the group itself is not retained.
        exc = excs.exceptions[0]

    assert isinstance(exc, MyException)
    # Nothing except the expected referrers may still point at the exception.
    assert gc.get_referrers(exc) == no_other_refs()


def test_context_run_tb_frames() -> None:
    """
    With ``copy_context`` patched to return an object whose ``run`` is a plain
    Python method, ``_count_context_run_tb_frames`` is expected to report 1.
    """

    class FakeContext:
        def run(self, fn: Callable[[], object]) -> object:
            return fn()

    patched_copy_context = mock.patch(
        "trio._core._run.copy_context",
        return_value=FakeContext(),
    )
    with patched_copy_context:
        frame_count = _count_context_run_tb_frames()

    assert frame_count == 1


@restore_unraisablehook()
def test_trio_context_detection() -> None:
    """
    ``in_trio_run()`` and ``in_trio_task()`` must both be false outside of
    `run` and inside a function run via ``to_thread_run_sync``. Both must be
    true inside a task: in the coroutine body, in a sync function it calls,
    and in the abort callback given to ``wait_task_rescheduled``.
    """

    def expect_outside_trio() -> None:
        assert not _core.in_trio_run()
        assert not _core.in_trio_task()

    def expect_inside_trio() -> None:
        assert _core.in_trio_run()
        assert _core.in_trio_task()

    def abort_inside_trio(_: object) -> _core.Abort:
        expect_inside_trio()
        return _core.Abort.SUCCEEDED

    async def main() -> None:
        # Directly in the coroutine body...
        assert _core.in_trio_run()
        assert _core.in_trio_task()

        # ...and from a plain synchronous call made by the task.
        expect_inside_trio()

        await to_thread_run_sync(expect_outside_trio)

        # The deadline is already one second in the past.
        # NOTE(review): presumably so the abort callback is invoked -- confirm.
        expired_deadline = _core.current_time() - 1
        with _core.CancelScope(deadline=expired_deadline):
            await _core.wait_task_rescheduled(abort_inside_trio)

    expect_outside_trio()
    _core.run(main)
