Source code for cocotb.task

# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
import collections.abc
import inspect
import logging
import os
import warnings
from asyncio import CancelledError, InvalidStateError
from enum import auto
from typing import (
    Any,
    Callable,
    Coroutine,
    Generator,
    Generic,
    List,
    Optional,
    TypeVar,
)

import cocotb
import cocotb.triggers
from cocotb._deprecation import deprecated
from cocotb._outcomes import Error, Outcome, Value
from cocotb._py_compat import cached_property
from cocotb._utils import DocEnum, extract_coro_stack, remove_traceback_frames

#: Task result type
ResultType = TypeVar("ResultType")

# Sadly the Python standard logging module is very slow so it's better not to
# make any calls by testing a boolean flag first
_debug = "COCOTB_SCHEDULER_DEBUG" in os.environ


[docs] class Task(Generic[ResultType]): """Concurrently executing task. This class is not intended for users to directly instantiate. Use :func:`cocotb.create_task` to create a Task object, or use :func:`cocotb.start_soon` or :func:`cocotb.start` to create a Task and schedule it to run. .. versionchanged:: 1.8.0 Moved to the ``cocotb.task`` module. .. versionchanged:: 2.0 The ``retval``, ``_finished``, and ``__bool__`` methods were removed. Use :meth:`result`, :meth:`done`, and :meth:`done` methods instead, respectively. """ class _State(DocEnum): """State of a Task.""" UNSTARTED = (auto(), "Task created, but never run and not scheduled") SCHEDULED = (auto(), "Task in Scheduler queue to run soon") PENDING = (auto(), "Task waiting for Trigger to fire") RUNNING = (auto(), "Task is currently running") FINISHED = (auto(), "Task has finished with a value or Exception") CANCELLED = (auto(), "Task was cancelled before it finished") _name: str = "Task" # class name of schedulable task _id_count = 0 # used by the scheduler for debug def __init__(self, inst): if inspect.iscoroutinefunction(inst): raise TypeError( f"Coroutine function {inst} should be called prior to being scheduled." ) elif inspect.isasyncgen(inst): raise TypeError( f"{inst.__qualname__} is an async generator, not a coroutine. " "You likely used the yield keyword instead of await." ) elif not isinstance(inst, collections.abc.Coroutine): raise TypeError(f"{inst} isn't a valid coroutine!") self._coro: Coroutine = inst self._state: Task._State = Task._State.UNSTARTED self._outcome: Optional[Outcome[ResultType]] = None self._trigger: Optional[cocotb.triggers.Trigger] = None self._cancelled_error: Optional[CancelledError] = None self._done_callbacks: List[Callable[[Task[Any]], Any]] = [] self._task_id = self._id_count type(self)._id_count += 1 self.__name__ = f"{type(self)._name} {self._task_id}" self.__qualname__ = self.__name__ @cached_property def log(self) -> logging.Logger: # Creating a logger is expensive, only do it if we actually plan to # log anything return logging.getLogger( f"cocotb.{self.__qualname__}.{self._coro.__qualname__}" ) def __str__(self) -> str: return f"<{self.__name__}>" def _get_coro_stack(self) -> Any: """Get the coroutine callstack of this Task.""" coro_stack = extract_coro_stack(self._coro) # Remove Trigger.__await__() from the stack, as it's not really useful if len(coro_stack) > 0 and coro_stack[-1].name == "__await__": coro_stack.pop() return coro_stack def __repr__(self) -> str: coro_stack = self._get_coro_stack() if self._state is Task._State.RUNNING: fmt = "<{name} running coro={coro}()>" elif self._state is Task._State.FINISHED: fmt = "<{name} finished coro={coro}() outcome={outcome}>" elif self._state is Task._State.PENDING: fmt = "<{name} pending coro={coro}() trigger={trigger}>" elif self._state is Task._State.SCHEDULED: fmt = "<{name} scheduled coro={coro}()>" elif self._state is Task._State.UNSTARTED: fmt = "<{name} created coro={coro}()>" elif self._state is Task._State.CANCELLED: fmt = ( "<{name} cancelled coro={coro} with={cancelled_error} outcome={outcome}" ) else: raise RuntimeError("Task in unknown state") try: coro_name = coro_stack[-1].name # coro_stack may be empty if: # - exhausted generator # - finished coroutine except IndexError: try: coro_name = self._coro.__name__ except AttributeError: coro_name = type(self._coro).__name__ repr_string = fmt.format( name=self.__name__, coro=coro_name, trigger=self._trigger, outcome=self._outcome, cancelled_error=self._cancelled_error, ) return repr_string def _advance(self, outcome: Outcome) -> Any: """Advance to the next yield in this coroutine. Args: outcome: The :any:`outcomes.Outcome` object to resume with. Returns: The object yielded from the coroutine or None if coroutine finished """ try: self._state = Task._State.RUNNING return outcome.send(self._coro) except StopIteration as e: self._outcome = Value(e.value) self._state = Task._State.FINISHED except (KeyboardInterrupt, SystemExit): # Allow these to bubble up to the execution root to fail the sim immediately. # This follows asyncio's behavior. raise except BaseException as e: self._outcome = Error(remove_traceback_frames(e, ["_advance", "send"])) self._state = Task._State.FINISHED if self.done(): self._do_done_callbacks()
[docs] def kill(self) -> None: """Kill a coroutine.""" if self.done(): # already finished, nothing to kill return if _debug: self.log.debug("kill() called on coroutine") # todo: probably better to throw an exception for anyone waiting on the coroutine self._outcome = Value(None) cocotb._scheduler_inst._unschedule(self) # Close coroutine so there is no RuntimeWarning that it was never awaited self._coro.close() self._state = Task._State.FINISHED self._do_done_callbacks()
def _do_done_callbacks(self) -> None: for callback in self._done_callbacks: callback(self) @cached_property def complete(self) -> "cocotb.triggers.TaskComplete[ResultType]": r"""Trigger which fires when the Task completes.""" return cocotb.triggers.TaskComplete._make(self)
[docs] @deprecated( "Using `task` directly is prefered to `task.join()` in all situations where the latter could be used." ) def join(self) -> "Task[ResultType]": r"""Block until the Task completes and return the result. Equivalent to calling :func:`Join(self) <cocotb.triggers.Join>`. .. code-block:: python3 async def coro_inner(): await Timer(1, units="ns") return "Hello world" task = cocotb.start_soon(coro_inner()) result = await task.join() assert result == "Hello world" Returns: Object that can be :keyword:`await`\ ed or passed into :class:`~cocotb.triggers.First` or :class:`~cocotb.triggers.Combine`; the result of which will be the result of the Task. .. deprecated:: 2.0 Using ``task`` directly is prefered to ``task.join()`` in all situations where the latter could be used. """ return self
[docs] def cancel(self, msg: Optional[str] = None) -> None: """Cancel a Task's further execution. When a Task is cancelled, a :exc:`asyncio.CancelledError` is thrown into the Task. """ if self.done(): return self._cancelled_error = CancelledError(msg) warnings.warn( "Calling this method will cause a CancelledError to be thrown in the " "Task sometime in the future.", FutureWarning, stacklevel=2, ) cocotb._scheduler_inst._unschedule(self) # Close coroutine so there is no RuntimeWarning that it was never awaited self._coro.close() self._state = Task._State.CANCELLED self._do_done_callbacks()
[docs] def cancelled(self) -> bool: """Return ``True`` if the Task was cancelled.""" return self._state is Task._State.CANCELLED
[docs] def done(self) -> bool: """Return ``True`` if the Task has finished executing.""" return self._state in (Task._State.FINISHED, Task._State.CANCELLED)
[docs] def result(self) -> ResultType: """Return the result of the Task. If the Task ran to completion, the result is returned. If the Task failed with an exception, the exception is re-raised. If the Task was cancelled, the :exc:`asyncio.CancelledError` is re-raised. If the coroutine is not yet complete, an :exc:`asyncio.InvalidStateError` is raised. """ if self._state is Task._State.CANCELLED: raise self._cancelled_error elif self._state is Task._State.FINISHED: return self._outcome.get() else: raise InvalidStateError("result is not yet available")
[docs] def exception(self) -> Optional[BaseException]: """Return the exception of the Task. If the Task ran to completion, ``None`` is returned. If the Task failed with an exception, the exception is returned. If the Task was cancelled, the :exc:`asyncio.CancelledError` is re-raised. If the coroutine is not yet complete, an :exc:`asyncio.InvalidStateError` is raised. """ if self._state is Task._State.CANCELLED: raise self._cancelled_error elif self._state is Task._State.FINISHED: if isinstance(self._outcome, Error): return self._outcome.error else: return None else: raise InvalidStateError("result is not yet available")
def _add_done_callback(self, callback: Callable[["Task[ResultType]"], Any]) -> None: """Add *callback* to the list of callbacks to be run once the Task becomes "done". Args: callback: The callback to run once "done". .. note:: If the task is already done, calling this function will call the callback immediately. """ if self.done(): callback(self) self._done_callbacks.append(callback) def __await__(self) -> Generator[Any, Any, ResultType]: # It's tempting to use `return (yield from self._coro)` here, # which bypasses the scheduler. Unfortunately, this means that # we can't keep track of the result or state of the coroutine, # things which we expose in our public API. If you want the # efficiency of bypassing the scheduler, remove the `@coroutine` # decorator from your `async` functions. # Hand the coroutine back to the scheduler trampoline. yield self return self.result()
class _RunningTest(Task[None]): """ The result of calling a :class:`cocotb.test` decorated object. All this class does is change ``__name__`` to show "Test" instead of "Task". .. versionchanged:: 1.8.0 Moved to the ``cocotb.task`` module. """ _name: str = "Test" def __init__(self, inst: Coroutine[Any, Any, None], name: str) -> None: super().__init__(inst) self.__name__ = f"{type(self)._name} {name}" self.__qualname__ = self.__name__