Source code for dagstermill.context
from typing import Any, Dict, Set
from dagster import PipelineDefinition, PipelineRun, SolidDefinition, check
from dagster.core.definitions.dependency import Solid
from dagster.core.execution.context.compute import AbstractComputeExecutionContext
from dagster.core.execution.context.system import PlanExecutionContext
from dagster.core.log_manager import DagsterLogManager
from dagster.core.system_config.objects import EnvironmentConfig
class DagstermillExecutionContext(AbstractComputeExecutionContext):
    """Dagstermill-specific execution context.

    Do not initialize directly: use :func:`dagstermill.get_context`.

    Most properties are thin delegations to the wrapped
    :class:`PlanExecutionContext` supplied at construction time.
    """

    def __init__(
        self,
        pipeline_context: PlanExecutionContext,
        pipeline_def: PipelineDefinition,
        resource_keys_to_init: Set[str],
        solid_name: str,
        solid_config: Any = None,
    ):
        """Store and validate the pieces this context is assembled from.

        Args:
            pipeline_context (PlanExecutionContext): The underlying context that tags, run
                information, logging and resources are read from.
            pipeline_def (PipelineDefinition): The pipeline definition exposed through
                :attr:`pipeline_def` and used to look up the solid by name.
            resource_keys_to_init (Set[str]): The resource keys passed as
                ``required_resource_keys`` when :attr:`resources` is built.
            solid_name (str): The name of the solid this context is scoped to.
            solid_config (Any): Explicit solid config. When left as ``None``,
                :attr:`solid_config` falls back to the environment config entry for
                ``solid_name``.
        """
        self._pipeline_context = check.inst_param(
            pipeline_context, "pipeline_context", PlanExecutionContext
        )
        self._pipeline_def = check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
        self._resource_keys_to_init = check.set_param(
            resource_keys_to_init, "resource_keys_to_init", of_type=str
        )
        self.solid_name = check.str_param(solid_name, "solid_name")
        # Deliberately unvalidated: solid config may be any user-defined value.
        self._solid_config = solid_config

    def has_tag(self, key: str) -> bool:
        """Check if a logging tag is defined on the context.

        Args:
            key (str): The key to check.

        Returns:
            bool
        """
        check.str_param(key, "key")
        return self._pipeline_context.has_tag(key)

    def get_tag(self, key: str) -> str:
        """Get a logging tag defined on the context.

        Args:
            key (str): The key to get.

        Returns:
            str
        """
        check.str_param(key, "key")
        return self._pipeline_context.get_tag(key)

    @property
    def run_id(self) -> str:
        """str: The run_id for the context."""
        return self._pipeline_context.run_id

    @property
    def run_config(self) -> Dict[str, Any]:
        """dict: The run_config for the context."""
        return self._pipeline_context.run_config

    @property
    def environment_config(self) -> EnvironmentConfig:
        """:class:`dagster.EnvironmentConfig`: The environment_config for the context"""
        return self._pipeline_context.environment_config

    @property
    def logging_tags(self) -> Dict[str, str]:
        """dict: The logging tags for the context."""
        return self._pipeline_context.logging_tags

    @property
    def pipeline_name(self) -> str:
        """str: The pipeline name reported by the underlying pipeline context."""
        return self._pipeline_context.pipeline_name

    @property
    def pipeline_def(self) -> PipelineDefinition:
        """:class:`dagster.PipelineDefinition`: The pipeline definition for the context.

        This will be a dagstermill-specific shim.
        """
        return self._pipeline_def

    @property
    def resources(self) -> Any:
        """collections.namedtuple: A dynamically-created type whose properties allow access to
        resources."""
        # Built on each access from the keys supplied at construction time.
        return self._pipeline_context.scoped_resources_builder.build(
            required_resource_keys=self._resource_keys_to_init,
        )

    @property
    def pipeline_run(self) -> PipelineRun:
        """:class:`dagster.PipelineRun`: The pipeline run for the context."""
        return self._pipeline_context.pipeline_run

    @property
    def log(self) -> DagsterLogManager:
        """:class:`dagster.DagsterLogManager`: The log manager for the context.

        Call, e.g., ``log.info()`` to log messages through the Dagster machinery.
        """
        return self._pipeline_context.log

    @property
    def solid_def(self) -> SolidDefinition:
        """:class:`dagster.SolidDefinition`: The solid definition for the context.

        In interactive contexts, this may be a dagstermill-specific shim, depending whether a
        solid definition was passed to ``dagstermill.get_context``.
        """
        return self.pipeline_def.solid_def_named(self.solid_name)

    @property
    def solid(self) -> Solid:
        """:class:`dagster.Solid`: The solid for the context.

        In interactive contexts, this may be a dagstermill-specific shim, depending whether a
        solid definition was passed to ``dagstermill.get_context``.
        """
        return self.pipeline_def.solid_named(self.solid_name)

    @property
    def solid_config(self) -> Any:
        """collections.namedtuple: A dynamically-created type whose properties allow access to
        solid-specific config.

        An explicitly supplied config takes precedence. Otherwise the config is read from the
        environment config entry for ``solid_name``, or is ``None`` if there is no such entry.
        """
        # Compare against None rather than relying on truthiness, so that an explicitly
        # supplied falsy config (0, False, "", {}, []) is returned instead of being
        # silently replaced by the environment-config lookup.
        if self._solid_config is not None:
            return self._solid_config

        env_solid_config = self.environment_config.solids.get(self.solid_name)
        return env_solid_config.config if env_solid_config else None
class DagstermillRuntimeExecutionContext(DagstermillExecutionContext):
    """Subclass of :class:`DagstermillExecutionContext` that adds no members of its own.

    NOTE(review): presumably exists so runtime (non-interactive) contexts can be told apart
    by type -- confirm against callers.
    """