DagsterDocs

Source code for dagstermill.context

from typing import Any, Dict, Set

from dagster import PipelineDefinition, PipelineRun, SolidDefinition, check
from dagster.core.definitions.dependency import Solid
from dagster.core.execution.context.compute import AbstractComputeExecutionContext
from dagster.core.execution.context.system import PlanExecutionContext
from dagster.core.log_manager import DagsterLogManager
from dagster.core.system_config.objects import EnvironmentConfig


[docs]class DagstermillExecutionContext(AbstractComputeExecutionContext): """Dagstermill-specific execution context. Do not initialize directly: use :func:`dagstermill.get_context`. """ def __init__( self, pipeline_context: PlanExecutionContext, pipeline_def: PipelineDefinition, resource_keys_to_init: Set[str], solid_name: str, solid_config: Any = None, ): self._pipeline_context = check.inst_param( pipeline_context, "pipeline_context", PlanExecutionContext ) self._pipeline_def = check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) self._resource_keys_to_init = check.set_param( resource_keys_to_init, "resource_keys_to_init", of_type=str ) self.solid_name = check.str_param(solid_name, "solid_name") self._solid_config = solid_config
[docs] def has_tag(self, key: str) -> bool: """Check if a logging tag is defined on the context. Args: key (str): The key to check. Returns: bool """ check.str_param(key, "key") return self._pipeline_context.has_tag(key)
[docs] def get_tag(self, key: str) -> str: """Get a logging tag defined on the context. Args: key (str): The key to get. Returns: str """ check.str_param(key, "key") return self._pipeline_context.get_tag(key)
@property def run_id(self) -> str: """str: The run_id for the context.""" return self._pipeline_context.run_id @property def run_config(self) -> Dict[str, Any]: """dict: The run_config for the context.""" return self._pipeline_context.run_config @property def environment_config(self) -> EnvironmentConfig: """:class:`dagster.EnvironmentConfig`: The environment_config for the context""" return self._pipeline_context.environment_config @property def logging_tags(self) -> Dict[str, str]: """dict: The logging tags for the context.""" return self._pipeline_context.logging_tags @property def pipeline_name(self) -> str: return self._pipeline_context.pipeline_name @property def pipeline_def(self) -> PipelineDefinition: """:class:`dagster.PipelineDefinition`: The pipeline definition for the context. This will be a dagstermill-specific shim. """ return self._pipeline_def @property def resources(self) -> Any: """collections.namedtuple: A dynamically-created type whose properties allow access to resources.""" return self._pipeline_context.scoped_resources_builder.build( required_resource_keys=self._resource_keys_to_init, ) @property def pipeline_run(self) -> PipelineRun: """:class:`dagster.PipelineRun`: The pipeline run for the context.""" return self._pipeline_context.pipeline_run @property def log(self) -> DagsterLogManager: """:class:`dagster.DagsterLogManager`: The log manager for the context. Call, e.g., ``log.info()`` to log messages through the Dagster machinery. """ return self._pipeline_context.log @property def solid_def(self) -> SolidDefinition: """:class:`dagster.SolidDefinition`: The solid definition for the context. In interactive contexts, this may be a dagstermill-specific shim, depending whether a solid definition was passed to ``dagstermill.get_context``. """ return self.pipeline_def.solid_def_named(self.solid_name) @property def solid(self) -> Solid: """:class:`dagster.Solid`: The solid for the context. In interactive contexts, this may be a dagstermill-specific shim, depending whether a solid definition was passed to ``dagstermill.get_context``. """ return self.pipeline_def.solid_named(self.solid_name) @property def solid_config(self) -> Any: """collections.namedtuple: A dynamically-created type whose properties allow access to solid-specific config.""" if self._solid_config: return self._solid_config solid_config = self.environment_config.solids.get(self.solid_name) return solid_config.config if solid_config else None
class DagstermillRuntimeExecutionContext(DagstermillExecutionContext): pass