Source code for cocotb.task

# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations

import collections.abc
import inspect
import logging
import sys
import traceback
from asyncio import CancelledError, InvalidStateError
from bdb import BdbQuit
from collections.abc import Coroutine, Generator
from enum import auto
from functools import cached_property
from types import CoroutineType, SimpleNamespace
from typing import (
    Any,
    Callable,
    Generic,
    TypeVar,
    cast,
)

import cocotb
import cocotb._event_loop
from cocotb import debug
from cocotb._base_triggers import Trigger, TriggerCallback
from cocotb._bridge import bridge, resume
from cocotb._deprecation import deprecated
from cocotb._utils import DocEnum, extract_coro_stack, remove_traceback_frames

if sys.version_info >= (3, 11):
    from typing import Self

__all__ = (
    "Join",
    "Task",
    "TaskComplete",
    "bridge",
    "current_task",
    "resume",
)

# Set __module__ on re-exports
bridge.__module__ = __name__
resume.__module__ = __name__

#: Task result type
ResultType = TypeVar("ResultType")


class _TaskState(DocEnum):
    """State of a Task."""

    UNSTARTED = (auto(), "Task created, but never run and not scheduled")
    SCHEDULED = (auto(), "Task queued to run soon")
    PENDING = (auto(), "Task waiting for Trigger to fire")
    RUNNING = (auto(), "Task is currently running")
    FINISHED = (auto(), "Task has finished with a value")
    ERRORED = (auto(), "Task has finished with an Exception")
    CANCELLED = (auto(), "Task was cancelled before it finished")


[docs] class Task(Generic[ResultType]): """Concurrently executing task. This class is not intended for users to directly instantiate. Use :func:`cocotb.create_task` to create a Task object or :func:`cocotb.start_soon` to create a Task and schedule it to run. .. versionchanged:: 1.8 Moved to the ``cocotb.task`` module. .. versionchanged:: 2.0 The ``retval``, ``_finished``, and ``__bool__`` methods were removed. Use :meth:`result`, :meth:`done`, and :meth:`done` methods instead, respectively. """ _id_count = 0 # used by the scheduler for debug def __init__( self, inst: Coroutine[Trigger, None, ResultType], *, name: str | None = None ) -> None: self._native_coroutine: bool if inspect.iscoroutine(inst): self._native_coroutine = True elif isinstance(inst, collections.abc.Coroutine): self._native_coroutine = False elif inspect.iscoroutinefunction(inst): raise TypeError( f"Coroutine function {inst} should be called prior to being scheduled." ) elif inspect.isasyncgen(inst): raise TypeError( f"{inst.__qualname__} is an async generator, not a coroutine. " "You likely used the yield keyword instead of await." ) else: raise TypeError(f"{inst} isn't a valid coroutine!") self._coro = inst self._state: _TaskState = _TaskState.UNSTARTED self._outcome: ResultType | BaseException self._trigger: Trigger self._schedule_callback: cocotb._event_loop.ScheduledCallback self._trigger_callback: TriggerCallback self._done_callbacks: list[Callable[[Task[ResultType]], None]] = [] self._must_cancel: int = 0 self._locals = SimpleNamespace() self._exc: BaseException | None = None self._task_id = self._id_count type(self)._id_count += 1 self._name = f"Task {self._task_id}" if name is None else name @property def locals(self) -> SimpleNamespace: '''Task-local variables. A modifiable namespace where any user-defined variables can be added. Use :func:`~cocotb.task.current_task` to access the current Task's locals. .. autolink-skip:: .. code-block:: python def get_rng() -> random.Random: """Get the random number generator for the current Task.""" try: return current_task().locals.rng except AttributeError: rng = random.Random() current_task().locals.rng = rng return rng # Use Task-local RNG to drive a signal rng = get_rng() for _ in range(10): await drive(rng.randint(0, 100)) .. versionadded:: 2.0 ''' return self._locals
[docs] def get_name(self) -> str: """Return the name of the :class:`!Task`. If not set using :meth:`set_name` or passed during construction, a reasonable default name is generated. """ return self._name
[docs] def set_name(self, value: object) -> None: """Set the name of the :class:`!Task`. Args: value: Any object which can be converted to a :class:`str` to use as the name. """ if "_log" in vars(self): del self._log # clear cached value self._name = str(value)
@cached_property def _log(self) -> logging.Logger: return logging.getLogger(f"cocotb.task.{self._name}") def __str__(self) -> str: # TODO Do we really need this? return f"<{self._name}>" def _get_coro_stack(self) -> traceback.StackSummary: """Get the coroutine callstack of this Task. Assumes :attr:`_coro` is a native Python coroutine object. Raises: TypeError: If :attr:`_coro` is not a native Python coroutine object. """ if not self._native_coroutine: raise TypeError( "Task._get_coro_stack() can only be called on native Python coroutines." ) coro_stack = extract_coro_stack( cast("CoroutineType[Trigger, None, ResultType]", self._coro) ) # Remove Trigger.__await__() from the stack, as it's not really useful if len(coro_stack) > 0 and coro_stack[-1].name == "__await__": coro_stack.pop() return coro_stack def __repr__(self) -> str: if self._native_coroutine: coro_stack = self._get_coro_stack() try: coro_name = coro_stack[-1].name # coro_stack may be empty if: # - exhausted generator # - finished coroutine except IndexError: try: coro_name = self._coro.__name__ except AttributeError: coro_name = type(self._coro).__name__ else: coro_name = type(self._coro).__name__ state = self._state if state is _TaskState.RUNNING: return f"<{self._name} running coro={coro_name}()>" elif state is _TaskState.FINISHED: return ( f"<{self._name} finished coro={coro_name}() outcome={self._outcome!r}>" ) elif state is _TaskState.ERRORED: return f"<{self._name} error coro={coro_name}() outcome={self._outcome!r}>" elif state is _TaskState.PENDING: return ( f"<{self._name} pending coro={coro_name}() trigger={self._trigger!r}>" ) elif state is _TaskState.SCHEDULED: return f"<{self._name} scheduled coro={coro_name}()>" elif state is _TaskState.UNSTARTED: return f"<{self._name} created coro={coro_name}()>" elif state is _TaskState.CANCELLED: return f"<{self._name} cancelled coro={coro_name} with={self._outcome!r}" else: raise RuntimeError("Task in unknown state") def _ensure_started(self) -> None: state = self._state if state is _TaskState.UNSTARTED: if debug.debug: self._log.debug("Starting %r", self) self._schedule_resume() elif state in ( _TaskState.FINISHED, _TaskState.ERRORED, _TaskState.CANCELLED, ): raise RuntimeError("Cannot start a finished Task") # RUNNING, SCHEDULED, PENDING are already running def _set_outcome( self, result: ResultType | BaseException, state: _TaskState, ) -> None: self._outcome = result self._state = state # Run done callbacks. for callback in self._done_callbacks: callback(self) # Wake up waiting Tasks. self.complete._react() self._join._react() def _schedule_resume(self, exc: BaseException | None = None) -> None: self._state = _TaskState.SCHEDULED self._exc = exc self._schedule_callback = cocotb._event_loop._inst.schedule(self._resume) def _resume(self) -> None: """Resume execution of the Task. Runs until the coroutine ends, raises, or yields a Trigger. Can optionally throw an Exception into the Task. Args: exc: :exc:`BaseException` to throw into the coroutine or nothing. Returns: The object yielded from the coroutine or ``None`` if coroutine finished. """ if debug.debug: self._log.debug("Resuming %r", self) self._state = _TaskState.RUNNING # Set this Task and the current. Unset in finally block. global _current_task _current_task = self try: if self._exc is None: trigger = self._coro.send(None) else: trigger = self._coro.throw(self._exc) except StopIteration as e: if self._must_cancel: if debug.debug: self._log.debug("Task %r was cancelled but exited normally", self) self._set_outcome( RuntimeError( "Task was cancelled, but exited normally. Did you forget to re-raise the CancelledError?" ), _TaskState.ERRORED, ) else: if debug.debug: self._log.debug( "Task %r finished with value of type %r", self, type(e.value) ) self._set_outcome(e.value, _TaskState.FINISHED) except (KeyboardInterrupt, SystemExit, BdbQuit) as e: # Allow these to bubble up to the execution root to fail the sim immediately. # This follows asyncio's behavior. if debug.debug: self._log.debug( "Task %r errored with exception of type %r", self, type(e) ) self._set_outcome( remove_traceback_frames(e, ["_resume"]), _TaskState.ERRORED ) raise except CancelledError as e: if debug.debug: # Print the message only if it exists reason = str(e) msg = "Task %r was cancelled" if reason: msg += f": {reason}" self._log.debug(msg, self) self._set_outcome( remove_traceback_frames(e, ["_resume"]), _TaskState.CANCELLED ) except BaseException as e: if debug.debug: self._log.debug( "Task %r errored with exception of type %r", self, type(e) ) self._set_outcome( remove_traceback_frames(e, ["_resume"]), _TaskState.ERRORED ) else: if self._must_cancel: if debug.debug: self._log.debug("Task %r was cancelled but continued running", self) self._set_outcome( RuntimeError( "Task was cancelled, but continued running. Did you forget to re-raise the CancelledError?" ), _TaskState.ERRORED, ) elif not isinstance(trigger, Trigger): if debug.debug: self._log.debug( "Task %r yielded non-Trigger of type: %r", self, type(trigger) ) self._schedule_resume( TypeError( f"Coroutine yielded object of type {type(trigger)!r}, which the scheduler can't handle." ) ) else: self._state = _TaskState.PENDING self._trigger = trigger # `trigger._register()`` calls `trigger._prime()` and `trigger._prime()` # can call `trigger._react()`, which can immediately schedule this task. # So setting the state to PENDING *must* be done *before* calling # `_register()` so the possible call to `_react()` can override the # state correctly. # TODO Don't allow `_prime()` to call `_react()`? try: self._trigger_callback = trigger._register(self._schedule_resume) except Exception as e: self._schedule_resume(remove_traceback_frames(e, ["_resume"])) else: if debug.debug: self._log.debug("Pending %r on %r", self, trigger) finally: _current_task = None
[docs] @deprecated("`task.kill()` is deprecated in favor of `task.cancel()`") def kill(self) -> None: """Kill a coroutine.""" state = self._state if state is _TaskState.PENDING: # Unprime triggers if pending. self._trigger_callback.cancel() elif state is _TaskState.SCHEDULED: # Unschedule if scheduled. self._schedule_callback.cancel() elif state is _TaskState.UNSTARTED: pass elif state is _TaskState.RUNNING: raise RuntimeError("Can't kill currently running Task") else: return # The coroutine may have never been started (happens in UNSTARTED or SCHEDULED state) # and this will cause a ResourceWarning if the coroutine is not manually closed. # This is only an issue because kill() does not run the coroutine after it's killed. if ( self._native_coroutine and inspect.getcoroutinestate(self._coro) == "CORO_CREATED" ): self._coro.close() if debug.debug: self._log.debug("Killed %r", self) self._set_outcome( None, # type: ignore # `kill()` sets the result to None regardless of the ResultType _TaskState.FINISHED, )
@cached_property def complete(self) -> TaskComplete[ResultType]: r"""Trigger which fires when the Task completes. Unlike :meth:`join`, this Trigger does not return the result of the Task when :keyword:`await`\ ed. .. code-block:: python async def coro_inner(): await Timer(1, unit="ns") raise ValueError("Oops") task = cocotb.start_soon(coro_inner()) await task.complete # no exception raised here assert task.exception() == ValueError("Oops") """ return TaskComplete._make(self)
[docs] @deprecated( "Using `task` directly is preferred to `task.join()` in all situations where the latter could be used." ) def join(self) -> Join[ResultType]: r"""Block until the Task completes and return the result. Equivalent to calling :class:`Join(self) <cocotb.task.Join>`. .. code-block:: python async def coro_inner(): await Timer(1, unit="ns") return "Hello world" task = cocotb.start_soon(coro_inner()) result = await task.join() assert result == "Hello world" Returns: Object that can be :keyword:`await`\ ed or passed into :class:`~cocotb.triggers.First` or :class:`~cocotb.triggers.Combine`; the result of which will be the result of the Task. .. deprecated:: 2.0 Using ``task`` directly is preferred to ``task.join()`` in all situations where the latter could be used. """ return self._join
@cached_property def _join(self) -> Join[ResultType]: return Join._make(self)
[docs] def cancel(self, msg: str | None = None) -> bool: """Cancel a Task's further execution. When a Task is cancelled, a :exc:`asyncio.CancelledError` is thrown into the Task. Returns: ``True`` if the Task was cancelled; ``False`` otherwise. """ state = self._state if state in ( _TaskState.FINISHED, _TaskState.ERRORED, _TaskState.CANCELLED, ): return False elif state is _TaskState.RUNNING: raise RuntimeError("Can't cancel() currently running Task") # Set state to do cancel if msg is None: cancelled_error = CancelledError() else: cancelled_error = CancelledError(msg) if debug.debug: self._log.debug("Cancelling %r", self) if state is _TaskState.PENDING: # Unprime triggers if pending. self._trigger_callback.cancel() self._schedule_resume(cancelled_error) elif state is _TaskState.SCHEDULED: # hijack scheduled callback if scheduled. self._exc = cancelled_error else: # UNSTARTED # Schedule anyways, the error will come out when calling coro.throw() on an # unstarted coroutine and this will prevent ResourceWarnings from un-awaited # coroutines. self._schedule_resume(cancelled_error) self._must_cancel += 1 return True
def _cancel_now(self, msg: str | None = None) -> bool: """Like cancel(), but throws CancelledError into the Task and puts it into a "done" state immediately. Not safe to be called from a running Task. Only from done callbacks or scheduler or Task internals. """ state = self._state if state is _TaskState.PENDING: # Unprime triggers if pending. self._trigger_callback.cancel() elif state is _TaskState.SCHEDULED: # Unschedule if scheduled. self._schedule_callback.cancel() elif state is _TaskState.UNSTARTED: # Resume anyways, the error will come out when calling coro.throw() on an # unstarted coroutine and this will prevent ResourceWarnings from un-awaited # coroutines. pass elif state is _TaskState.RUNNING: raise RuntimeError("Can't _cancel_now() currently running Task") else: # FINISHED, ERRORED, CANCELLED return False # Set state to do cancel if msg is None: cancelled_error = CancelledError() else: cancelled_error = CancelledError(msg) self._must_cancel += 1 # Throw CancelledError now self._exc = cancelled_error self._resume() return True def _uncancel(self) -> None: """Prevents :exc:`RuntimeError` from occurring when cancelled Tasks suppress :exc:`asyncio.CancelledError`. Currently only useful for :class:`.TaskManager`. """ if self._must_cancel > 0: self._must_cancel -= 1
[docs] def cancelled(self) -> bool: """Return ``True`` if the Task was cancelled.""" return self._state is _TaskState.CANCELLED
[docs] def done(self) -> bool: """Return ``True`` if the Task has finished executing.""" return self._state in ( _TaskState.FINISHED, _TaskState.ERRORED, _TaskState.CANCELLED, )
[docs] def result(self) -> ResultType: """Return the result of the Task. If the Task ran to completion, the result is returned. If the Task failed with an exception, the exception is re-raised. If the Task was cancelled, the :exc:`asyncio.CancelledError` is re-raised. If the coroutine is not yet complete, an :exc:`asyncio.InvalidStateError` is raised. """ state = self._state if state is _TaskState.CANCELLED: raise cast("CancelledError", self._outcome) elif state is _TaskState.FINISHED: return cast("ResultType", self._outcome) elif state is _TaskState.ERRORED: raise cast("BaseException", self._outcome) else: raise InvalidStateError("result is not yet available")
[docs] def exception(self) -> BaseException | None: """Return the exception of the Task. If the Task ran to completion, ``None`` is returned. If the Task failed with an exception, the exception is returned. If the Task was cancelled, the :exc:`asyncio.CancelledError` is re-raised. If the coroutine is not yet complete, an :exc:`asyncio.InvalidStateError` is raised. """ state = self._state if state is _TaskState.CANCELLED: raise cast("CancelledError", self._outcome) elif state is _TaskState.FINISHED: return None elif state is _TaskState.ERRORED: return cast("BaseException", self._outcome) else: raise InvalidStateError("result is not yet available")
def _add_done_callback(self, callback: Callable[[Task[ResultType]], None]) -> None: """Add *callback* to the list of callbacks to be run once the Task becomes "done". Args: callback: The callback to run once "done". .. note:: If the task is already done, calling this function will call the callback immediately. """ if self.done(): callback(self) self._done_callbacks.append(callback) def __await__(self) -> Generator[Trigger, None, ResultType]: if not self.done(): self._ensure_started() yield self.complete return self.result()
_current_task: Task[Any] | None = None
[docs] def current_task() -> Task[object]: """Return the currently running Task. Raises: RuntimeError: If no Task is running. """ if _current_task is None: raise RuntimeError("No currently running Task") return _current_task
[docs] class TaskComplete(Trigger, Generic[ResultType]): r"""Fires when a :class:`~cocotb.task.Task` completes. Unlike :class:`~cocotb.task.Join`, this Trigger does not return the result of the Task when :keyword:`await`\ ed. See :attr:`.Task.complete` for more information. .. warning:: This class cannot be instantiated in the normal way. You must use :attr:`.Task.complete`. .. versionadded:: 2.0 """ _task: Task[ResultType] def __new__(cls, task: Task[ResultType]) -> TaskComplete[ResultType]: raise NotImplementedError( "TaskComplete cannot be instantiated in this way. Use the `task.complete` attribute." ) @classmethod def _make(cls, task: Task[ResultType]) -> Self: self = super().__new__(cls) super().__init__(self) self._task = task return self def _prime(self) -> None: if self._task.done(): self._react() def __repr__(self) -> str: return f"{type(self).__qualname__}({self._task!s})" @property def task(self) -> Task[ResultType]: """The :class:`.Task` associated with this completion event.""" return self._task
[docs] class Join(TaskComplete[ResultType]): r"""Fires when a :class:`~cocotb.task.Task` completes and returns the Task's result. Equivalent to calling :meth:`task.join() <cocotb.task.Task.join>`. .. code-block:: python async def coro_inner(): await Timer(1, unit="ns") return "Hello world" task = cocotb.start_soon(coro_inner()) result = await Join(task) assert result == "Hello world" Args: task: The Task upon which to wait for completion. Returns: Object that can be :keyword:`await`\ ed or passed into :class:`~cocotb.triggers.First` or :class:`~cocotb.triggers.Combine`; the result of which will be the result of the Task. .. deprecated:: 2.0 Using ``task`` directly is preferred to ``Join(task)`` in all situations where the latter could be used. """ @deprecated( "Using `task` directly is preferred to `Join(task)` in all situations where the latter could be used." ) def __new__(cls, task: Task[ResultType]) -> Join[ResultType]: return task._join def __init__(self, task: Task[ResultType]) -> None: pass def __await__(self) -> Generator[Self, None, ResultType]: # type: ignore[override] yield self return self._task.result()