Source code for cocotb.task

# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
import collections.abc
import inspect
import logging
import traceback
from asyncio import CancelledError, InvalidStateError
from bdb import BdbQuit
from enum import auto
from types import SimpleNamespace
from typing import (
    TYPE_CHECKING,
    Callable,
    Coroutine,
    Generator,
    Generic,
    List,
    Optional,
    TypeVar,
    Union,
    cast,
)

import cocotb
from cocotb._base_triggers import Trigger
from cocotb._bridge import bridge, resume
from cocotb._deprecation import deprecated
from cocotb._outcomes import Error, Outcome, Value
from cocotb._py_compat import Self, cached_property
from cocotb._utils import DocEnum, extract_coro_stack, remove_traceback_frames

if TYPE_CHECKING:
    from types import CoroutineType


__all__ = (
    "Join",
    "Task",
    "TaskComplete",
    "bridge",
    "current_task",
    "resume",
)

# Set __module__ on re-exports
bridge.__module__ = __name__
resume.__module__ = __name__

#: Task result type
ResultType = TypeVar("ResultType")


class _TaskState(DocEnum):
    """State of a Task."""

    UNSTARTED = (auto(), "Task created, but never run and not scheduled")
    SCHEDULED = (auto(), "Task queued to run soon")
    PENDING = (auto(), "Task waiting for Trigger to fire")
    RUNNING = (auto(), "Task is currently running")
    FINISHED = (auto(), "Task has finished with a value or Exception")
    CANCELLED = (auto(), "Task was cancelled before it finished")


[docs] class Task(Generic[ResultType]): """Concurrently executing task. This class is not intended for users to directly instantiate. Use :func:`cocotb.create_task` to create a Task object or :func:`cocotb.start_soon` to create a Task and schedule it to run. .. versionchanged:: 1.8 Moved to the ``cocotb.task`` module. .. versionchanged:: 2.0 The ``retval``, ``_finished``, and ``__bool__`` methods were removed. Use :meth:`result`, :meth:`done`, and :meth:`done` methods instead, respectively. """ _id_count = 0 # used by the scheduler for debug def __init__( self, inst: Coroutine[Trigger, None, ResultType], *, name: Optional[str] = None ) -> None: self._native_coroutine: bool if inspect.iscoroutinefunction(inst): raise TypeError( f"Coroutine function {inst} should be called prior to being scheduled." ) elif inspect.isasyncgen(inst): raise TypeError( f"{inst.__qualname__} is an async generator, not a coroutine. " "You likely used the yield keyword instead of await." ) elif inspect.iscoroutine(inst): self._native_coroutine = True elif isinstance(inst, collections.abc.Coroutine): self._native_coroutine = False else: raise TypeError(f"{inst} isn't a valid coroutine!") self._coro = inst self._state: _TaskState = _TaskState.UNSTARTED self._outcome: Union[Outcome[ResultType], None] = None self._trigger: Union[Trigger, None] = None self._done_callbacks: List[Callable[[Task[ResultType]], None]] = [] self._cancelled_msg: Union[str, None] = None self._must_cancel: bool = False self._locals = SimpleNamespace() self._task_id = self._id_count type(self)._id_count += 1 self._name = f"Task {self._task_id}" if name is None else name @property def locals(self) -> SimpleNamespace: '''Task-local variables. A modifiable namespace where any user-defined variables can be added. Use :func:`~cocotb.task.current_task` to access the current Task's locals. .. code-block:: python def get_rng() -> random.Random: """Get the random number generator for the current Task.""" try: return current_task().locals.rng except AttributeError: rng = random.Random() current_task().locals.rng = rng return rng # Use Task-local RNG to drive a signal rng = get_rng() for _ in range(10): await drive(rng.randint(0, 100)) .. versionadded:: 2.0 ''' return self._locals
[docs] def get_name(self) -> str: """Return the name of the :class:`!Task`. If not set using :meth:`set_name` or passed during construction, a reasonable default name is generated. """ return self._name
[docs] def set_name(self, value: object) -> None: """Set the name of the :class:`!Task`. Args: value: Any object which can be converted to a :class:`str` to use as the name. """ self._name = str(value)
@cached_property def _cancelled_error(self) -> CancelledError: if self._cancelled_msg is None: return CancelledError() else: return CancelledError(self._cancelled_msg) @cached_property def _log(self) -> logging.Logger: coro_name: str if self._native_coroutine: coro_name = self._coro.__qualname__ else: coro_name = type(self._coro).__qualname__ return logging.getLogger(f"cocotb.{self._name}.{coro_name}") def __str__(self) -> str: # TODO Do we really need this? return f"<{self._name}>" def _get_coro_stack(self) -> traceback.StackSummary: """Get the coroutine callstack of this Task. Assumes :attr:`_coro` is a native Python coroutine object. Raises: TypeError: If :attr:`_coro` is not a native Python coroutine object. """ if not self._native_coroutine: raise TypeError( "Task._get_coro_stack() can only be called on native Python coroutines." ) coro_stack = extract_coro_stack( cast("CoroutineType[Trigger, None, ResultType]", self._coro) ) # Remove Trigger.__await__() from the stack, as it's not really useful if len(coro_stack) > 0 and coro_stack[-1].name == "__await__": coro_stack.pop() return coro_stack def __repr__(self) -> str: if self._native_coroutine: coro_stack = self._get_coro_stack() try: coro_name = coro_stack[-1].name # coro_stack may be empty if: # - exhausted generator # - finished coroutine except IndexError: try: coro_name = self._coro.__name__ except AttributeError: coro_name = type(self._coro).__name__ else: coro_name = type(self._coro).__name__ if self._state is _TaskState.RUNNING: return f"<{self._name} running coro={coro_name}()>" elif self._state is _TaskState.FINISHED: return f"<{self._name} finished coro={coro_name}() outcome={self._outcome}>" elif self._state is _TaskState.PENDING: return f"<{self._name} pending coro={coro_name}() trigger={self._trigger}>" elif self._state is _TaskState.SCHEDULED: return f"<{self._name} scheduled coro={coro_name}()>" elif self._state is _TaskState.UNSTARTED: return f"<{self._name} created coro={coro_name}()>" elif self._state is _TaskState.CANCELLED: return f"<{self._name} cancelled coro={coro_name} with={self._cancelled_error} outcome={self._outcome}" else: raise RuntimeError("Task in unknown state") def _set_outcome( self, result: Outcome[ResultType], state: _TaskState = _TaskState.FINISHED ) -> None: self._outcome = result self._state = state # Run done callbacks. for callback in self._done_callbacks: callback(self) # Wake up waiting Tasks. cocotb._scheduler_inst._react(self.complete) cocotb._scheduler_inst._react(self._join) def _advance(self, exc: Union[BaseException, None]) -> Union[Trigger, None]: """Resume execution of the Task. Runs until the coroutine ends, raises, or yields a Trigger. Can optionally throw an Exception into the Task. Args: exc: :exc:`BaseException` to throw into the coroutine or nothing. Returns: The object yielded from the coroutine or ``None`` if coroutine finished. """ self._state = _TaskState.RUNNING if self._must_cancel: exc = self._cancelled_error try: if exc is None: trigger = self._coro.send(None) else: trigger = self._coro.throw(exc) except StopIteration as e: outcome = Value(e.value) if self._must_cancel: self._set_outcome( Error( RuntimeError( "Task was cancelled, but exited normally. Did you forget to re-raise the CancelledError?" ) ) ) else: self._set_outcome(outcome) return None except (KeyboardInterrupt, SystemExit, BdbQuit) as e: # Allow these to bubble up to the execution root to fail the sim immediately. # This follows asyncio's behavior. self._set_outcome(Error(remove_traceback_frames(e, ["_advance"]))) raise except CancelledError as e: self._set_outcome( Error(remove_traceback_frames(e, ["_advance"])), _TaskState.CANCELLED ) return None except BaseException as e: self._set_outcome(Error(remove_traceback_frames(e, ["_advance"]))) return None else: if self._must_cancel: self._set_outcome( Error( RuntimeError( "Task was cancelled, but continued running. Did you forget to re-raise the CancelledError?" ) ) ) return None else: return trigger def _schedule_resume(self, exc: Optional[BaseException] = None) -> None: cocotb._scheduler_inst._unschedule(self) cocotb._scheduler_inst._schedule_task_internal(self, exc)
[docs] @deprecated("`task.kill()` is deprecated in favor of `task.cancel()`") def kill(self) -> None: """Kill a coroutine.""" if self._state in (_TaskState.PENDING, _TaskState.SCHEDULED): # Unschedule if scheduled and unprime triggers if pending. cocotb._scheduler_inst._unschedule(self) elif self._state is _TaskState.UNSTARTED: # Don't need to unschedule. pass elif self._state in (_TaskState.FINISHED, _TaskState.CANCELLED): # Do nothing if already done. return else: raise RuntimeError("Can't kill currently running Task") # Close native coroutines if they were never resumed to prevent ResourceWarnings. if ( inspect.iscoroutine(self._coro) and inspect.getcoroutinestate(self._coro) == "CORO_CREATED" ): self._coro.close() self._set_outcome(Value(None)) # type: ignore # `kill()` sets the result to None regardless of the ResultType
@cached_property def complete(self) -> "TaskComplete[ResultType]": r"""Trigger which fires when the Task completes. Unlike :meth:`join`, this Trigger does not return the result of the Task when :keyword:`await`\ ed. .. code-block:: python async def coro_inner(): await Timer(1, unit="ns") raise ValueError("Oops") task = cocotb.start_soon(coro_inner()) await task.complete # no exception raised here assert task.exception() == ValueError("Oops") """ return TaskComplete._make(self)
[docs] @deprecated( "Using `task` directly is preferred to `task.join()` in all situations where the latter could be used." ) def join(self) -> "Join[ResultType]": r"""Block until the Task completes and return the result. Equivalent to calling :class:`Join(self) <cocotb.task.Join>`. .. code-block:: python async def coro_inner(): await Timer(1, unit="ns") return "Hello world" task = cocotb.start_soon(coro_inner()) result = await task.join() assert result == "Hello world" Returns: Object that can be :keyword:`await`\ ed or passed into :class:`~cocotb.triggers.First` or :class:`~cocotb.triggers.Combine`; the result of which will be the result of the Task. .. deprecated:: 2.0 Using ``task`` directly is preferred to ``task.join()`` in all situations where the latter could be used. """ return self._join
@cached_property def _join(self) -> "Join[ResultType]": return Join._make(self)
[docs] def cancel(self, msg: Optional[str] = None) -> bool: """Cancel a Task's further execution. When a Task is cancelled, a :exc:`asyncio.CancelledError` is thrown into the Task. Returns: ``True`` if the Task was cancelled; ``False`` otherwise. """ if self._state in {_TaskState.PENDING, _TaskState.SCHEDULED}: self._schedule_resume() elif self._state in (_TaskState.UNSTARTED, _TaskState.RUNNING): # (Re)schedule to throw CancelledError cocotb._scheduler_inst._schedule_task_internal(self) else: # Already finished or cancelled return False self._cancelled_msg = msg self._must_cancel = True return True
def _cancel_now(self, msg: Optional[str] = None) -> bool: """Like cancel(), but throws CancelledError into the Task and puts it into a "done" state immediately. Not safe to be called from a running Task. Only from done callbacks or scheduler or Task internals. """ if self.done(): return False self._cancelled_msg = msg self._must_cancel = True if self._state is _TaskState.UNSTARTED: # Must fail immediately as we can't start a coroutine with an exception. self._set_outcome(Error(self._cancelled_error), _TaskState.CANCELLED) else: # Unprime and unschedule the Task so it's out of the scheduler. cocotb._scheduler_inst._unschedule(self) # Force CancelledError to be thrown immediately. self._advance(None) return True
[docs] def cancelled(self) -> bool: """Return ``True`` if the Task was cancelled.""" return self._state is _TaskState.CANCELLED
[docs] def done(self) -> bool: """Return ``True`` if the Task has finished executing.""" return self._state in (_TaskState.FINISHED, _TaskState.CANCELLED)
[docs] def result(self) -> ResultType: """Return the result of the Task. If the Task ran to completion, the result is returned. If the Task failed with an exception, the exception is re-raised. If the Task was cancelled, the :exc:`asyncio.CancelledError` is re-raised. If the coroutine is not yet complete, an :exc:`asyncio.InvalidStateError` is raised. """ if self._state is _TaskState.CANCELLED: raise self._cancelled_error elif self._state is _TaskState.FINISHED: return cast("Outcome[ResultType]", self._outcome).get() else: raise InvalidStateError("result is not yet available")
[docs] def exception(self) -> Optional[BaseException]: """Return the exception of the Task. If the Task ran to completion, ``None`` is returned. If the Task failed with an exception, the exception is returned. If the Task was cancelled, the :exc:`asyncio.CancelledError` is re-raised. If the coroutine is not yet complete, an :exc:`asyncio.InvalidStateError` is raised. """ if self._state is _TaskState.CANCELLED: raise self._cancelled_error elif self._state is _TaskState.FINISHED: if isinstance(self._outcome, Error): return self._outcome.error else: return None else: raise InvalidStateError("result is not yet available")
def _add_done_callback( self, callback: Callable[["Task[ResultType]"], None] ) -> None: """Add *callback* to the list of callbacks to be run once the Task becomes "done". Args: callback: The callback to run once "done". .. note:: If the task is already done, calling this function will call the callback immediately. """ if self.done(): callback(self) self._done_callbacks.append(callback) def __await__(self) -> Generator[Trigger, None, ResultType]: if self._state is _TaskState.UNSTARTED: cocotb._scheduler_inst._schedule_task_internal(self) yield self.complete elif not self.done(): yield self.complete return self.result()
[docs] def current_task() -> Task[object]: """Return the currently running Task. Raises: RuntimeError: If no Task is running. .. versionadded:: 2.0 """ task = cocotb._scheduler_inst._current_task if task is None: raise RuntimeError("No Task is currently running") return task
[docs] class TaskComplete(Trigger, Generic[ResultType]): r"""Fires when a :class:`~cocotb.task.Task` completes. Unlike :class:`~cocotb.task.Join`, this Trigger does not return the result of the Task when :keyword:`await`\ ed. See :attr:`.Task.complete` for more information. .. warning:: This class cannot be instantiated in the normal way. You must use :attr:`.Task.complete`. .. versionadded:: 2.0 """ _task: Task[ResultType] def __new__(cls, task: Task[ResultType]) -> "TaskComplete[ResultType]": raise NotImplementedError( "TaskComplete cannot be instantiated in this way. Use the `task.complete` attribute." ) @classmethod def _make(cls, task: Task[ResultType]) -> "Self": self = super().__new__(cls) super().__init__(self) self._task = task return self def _prime(self, callback: Callable[["Self"], None]) -> None: if self._task.done(): callback(self) else: super()._prime(callback) def __repr__(self) -> str: return f"{type(self).__qualname__}({self._task!s})" @property def task(self) -> Task[ResultType]: """The :class:`.Task` associated with this completion event.""" return self._task
[docs] class Join(TaskComplete[ResultType]): r"""Fires when a :class:`~cocotb.task.Task` completes and returns the Task's result. Equivalent to calling :meth:`task.join() <cocotb.task.Task.join>`. .. code-block:: python async def coro_inner(): await Timer(1, unit="ns") return "Hello world" task = cocotb.start_soon(coro_inner()) result = await Join(task) assert result == "Hello world" Args: task: The Task upon which to wait for completion. Returns: Object that can be :keyword:`await`\ ed or passed into :class:`~cocotb.triggers.First` or :class:`~cocotb.triggers.Combine`; the result of which will be the result of the Task. .. deprecated:: 2.0 Using ``task`` directly is preferred to ``Join(task)`` in all situations where the latter could be used. """ @deprecated( "Using `task` directly is preferred to `Join(task)` in all situations where the latter could be used." ) def __new__(cls, task: Task[ResultType]) -> "Join[ResultType]": return task._join def __init__(self, task: Task[ResultType]) -> None: pass def __await__(self) -> Generator["Self", None, ResultType]: # type: ignore[override] yield self return self._task.result()