Source code for dagster.core.execution.context.compute
from abc import ABC, abstractmethod, abstractproperty
from typing import Any, Optional
from dagster import check
from dagster.core.definitions.dependency import Solid, SolidHandle
from dagster.core.definitions.mode import ModeDefinition
from dagster.core.definitions.pipeline import PipelineDefinition
from dagster.core.definitions.resource import Resources
from dagster.core.definitions.solid import SolidDefinition
from dagster.core.definitions.step_launcher import StepLauncher
from dagster.core.errors import DagsterInvalidPropertyError
from dagster.core.instance import DagsterInstance
from dagster.core.log_manager import DagsterLogManager
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.utils.forked_pdb import ForkedPdb
from .system import StepExecutionContext
class AbstractComputeExecutionContext(ABC):  # pylint: disable=no-init
    """Base class for solid context implemented by SolidExecutionContext and
    DagstermillExecutionContext.

    Every member declared here is abstract: a concrete subclass must override
    all of them before it can be instantiated.
    """

    @abstractmethod
    def has_tag(self, key: str) -> bool:
        """Implement this method to check if a logging tag is set."""

    @abstractmethod
    def get_tag(self, key: str) -> str:
        """Implement this method to get a logging tag."""

    # ``abc.abstractproperty`` has been deprecated since Python 3.3; stacking
    # ``@property`` on top of ``@abstractmethod`` is the supported spelling and
    # registers the same names as abstract.
    @property
    @abstractmethod
    def run_id(self) -> str:
        """The run id for the context."""

    @property
    @abstractmethod
    def solid_def(self) -> SolidDefinition:
        """The solid definition corresponding to the execution step being executed."""

    @property
    @abstractmethod
    def solid(self) -> Solid:
        """The solid corresponding to the execution step being executed."""

    @property
    @abstractmethod
    def pipeline_def(self) -> PipelineDefinition:
        """The pipeline being executed."""

    @property
    @abstractmethod
    def pipeline_run(self) -> PipelineRun:
        """The PipelineRun object corresponding to the execution."""

    @property
    @abstractmethod
    def resources(self) -> Any:
        """Resources available in the execution context."""

    @property
    @abstractmethod
    def log(self) -> DagsterLogManager:
        """The log manager available in the execution context."""

    @property
    @abstractmethod
    def solid_config(self) -> Any:
        """The parsed config specific to this solid."""
class SolidExecutionContext(AbstractComputeExecutionContext):
    """The ``context`` object that can be made available as the first argument to a solid's compute
    function.

    The context object provides system information such as resources, config, and logging to a
    solid's compute function. Users should not instantiate this object directly.

    Example:

    .. code-block:: python

        @solid
        def hello_world(context: SolidExecutionContext):
            context.log.info("Hello, world!")

    """

    # Both attributes assigned in ``__init__`` are declared here so the slot
    # list matches what the instance actually stores.
    __slots__ = ["_step_execution_context", "_pdb"]

    def __init__(self, step_execution_context: StepExecutionContext):
        """Wrap the given step execution context.

        Args:
            step_execution_context (StepExecutionContext): The underlying system context;
                passed through ``check.inst_param`` before being stored.
        """
        self._step_execution_context = check.inst_param(
            step_execution_context,
            "step_execution_context",
            StepExecutionContext,
        )
        # Created on first access of the ``pdb`` property.
        self._pdb: Optional[ForkedPdb] = None

    @property
    def solid_config(self) -> Any:
        """Any: The parsed config specific to this solid.

        Looked up in the environment config by the string form of the solid handle; ``None``
        when that lookup yields nothing (or a falsy entry).
        """
        solid_config = self._step_execution_context.environment_config.solids.get(
            str(self.solid_handle)
        )
        return solid_config.config if solid_config else None

    @property
    def pipeline_run(self) -> PipelineRun:
        """PipelineRun: The current pipeline run"""
        return self._step_execution_context.pipeline_run

    @property
    def instance(self) -> DagsterInstance:
        """DagsterInstance: The current Dagster instance"""
        return self._step_execution_context.instance

    @property
    def pdb(self) -> ForkedPdb:
        """dagster.utils.forked_pdb.ForkedPdb: Gives access to pdb debugging from within the solid.

        The debugger object is created on first access and reused afterwards.

        Example:

        .. code-block:: python

            @solid
            def debug_solid(context):
                context.pdb.set_trace()

        """
        if self._pdb is None:
            self._pdb = ForkedPdb()

        return self._pdb

    @property
    def file_manager(self):
        """Deprecated access to the file manager.

        Always raises ``DagsterInvalidPropertyError`` pointing at the replacement.

        :meta private:
        """
        raise DagsterInvalidPropertyError(
            "You have attempted to access the file manager which has been moved to resources in 0.10.0. "
            "Please access it via `context.resources.file_manager` instead."
        )

    @property
    def resources(self) -> Resources:
        """Resources: The currently available resources."""
        return self._step_execution_context.resources

    @property
    def step_launcher(self) -> Optional[StepLauncher]:
        """Optional[StepLauncher]: The current step launcher, if any."""
        return self._step_execution_context.step_launcher

    @property
    def run_id(self) -> str:
        """str: The id of the current execution's run."""
        return self._step_execution_context.run_id

    @property
    def run_config(self) -> dict:
        """dict: The run config for the current execution."""
        return self._step_execution_context.run_config

    @property
    def pipeline_def(self) -> PipelineDefinition:
        """PipelineDefinition: The currently executing pipeline."""
        return self._step_execution_context.pipeline_def

    @property
    def pipeline_name(self) -> str:
        """str: The name of the currently executing pipeline."""
        return self._step_execution_context.pipeline_name

    @property
    def mode_def(self) -> ModeDefinition:
        """ModeDefinition: The mode of the current execution."""
        return self._step_execution_context.mode_def

    @property
    def log(self) -> DagsterLogManager:
        """DagsterLogManager: The log manager available in the execution context."""
        return self._step_execution_context.log

    @property
    def solid_handle(self) -> SolidHandle:
        """SolidHandle: The current solid's handle.

        :meta private:
        """
        return self._step_execution_context.solid_handle

    @property
    def solid(self) -> Solid:
        """Solid: The current solid object.

        :meta private:
        """
        return self._step_execution_context.pipeline_def.get_solid(self.solid_handle)

    @property
    def solid_def(self) -> SolidDefinition:
        """SolidDefinition: The current solid definition."""
        # Reuse the ``solid`` lookup instead of repeating it here.
        return self.solid.definition

    def has_tag(self, key: str) -> bool:
        """Check if a logging tag is set.

        Args:
            key (str): The tag to check.

        Returns:
            bool: Whether the tag is set.
        """
        return self._step_execution_context.has_tag(key)

    def get_tag(self, key: str) -> str:
        """Get a logging tag.

        Args:
            key (str): The tag to get.

        Returns:
            str: The value of the tag.
        """
        return self._step_execution_context.get_tag(key)

    def get_step_execution_context(self) -> StepExecutionContext:
        """Allows advanced users (e.g. framework authors) to punch through to the underlying
        step execution context.

        :meta private:

        Returns:
            StepExecutionContext: The underlying system context.
        """
        return self._step_execution_context