DagsterDocs

Source code for dagster.core.definitions.pipeline

from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Any, Dict, FrozenSet, List, Optional, Set, Union

from dagster import check
from dagster.core.definitions.input import InputMapping
from dagster.core.definitions.output import OutputMapping
from dagster.core.definitions.policy import RetryPolicy
from dagster.core.definitions.resource import ResourceDefinition
from dagster.core.definitions.solid import NodeDefinition
from dagster.core.errors import (
    DagsterInvalidDefinitionError,
    DagsterInvalidSubsetError,
    DagsterInvariantViolationError,
)
from dagster.core.storage.output_manager import IOutputManagerDefinition
from dagster.core.storage.root_input_manager import (
    IInputManagerDefinition,
    RootInputManagerDefinition,
)
from dagster.core.types.dagster_type import DagsterType, DagsterTypeKind
from dagster.core.utils import str_format_set
from dagster.utils.backcompat import experimental_arg_warning

from .config import ConfigMapping
from .dependency import (
    DependencyDefinition,
    DependencyStructure,
    DynamicCollectDependencyDefinition,
    IDependencyDefinition,
    MultiDependencyDefinition,
    Solid,
    SolidHandle,
    SolidInvocation,
)
from .graph import GraphDefinition
from .hook import HookDefinition
from .mode import ModeDefinition
from .preset import PresetDefinition
from .solid import NodeDefinition
from .utils import validate_tags

if TYPE_CHECKING:
    from .run_config_schema import RunConfigSchema
    from dagster.core.snap import PipelineSnapshot, ConfigSchemaSnapshot
    from dagster.core.host_representation import PipelineIndex


class PipelineDefinition(GraphDefinition):
    """Defines a Dagster pipeline.

    A pipeline is made up of

    - Solids, each of which is a single functional unit of data computation.
    - Dependencies, which determine how the values produced by solids as their outputs flow from
      one solid to another. This tells Dagster how to arrange solids, and potentially multiple
      aliased instances of solids, into a directed, acyclic graph (DAG) of compute.
    - Modes, which can be used to attach resources, custom loggers, custom system storage
      options, and custom executors to a pipeline, and to switch between them.
    - Presets, which can be used to ship common combinations of pipeline config options in Python
      code, and to switch between them.

    Args:
        solid_defs (List[SolidDefinition]): The set of solids used in this pipeline.
        name (str): The name of the pipeline. Must be unique within any
            :py:class:`RepositoryDefinition` containing the pipeline.
        description (Optional[str]): A human-readable description of the pipeline.
        dependencies (Optional[Dict[Union[str, SolidInvocation], Dict[str, DependencyDefinition]]]):
            A structure that declares the dependencies of each solid's inputs on the outputs of
            other solids in the pipeline. Keys of the top level dict are either the string names of
            solids in the pipeline or, in the case of aliased solids,
            :py:class:`SolidInvocations <SolidInvocation>`. Values of the top level dict are
            themselves dicts, which map input names belonging to the solid or aliased solid to
            :py:class:`DependencyDefinitions <DependencyDefinition>`.
        mode_defs (Optional[List[ModeDefinition]]): The set of modes in which this pipeline can
            operate. Modes are used to attach resources, custom loggers, custom system storage
            options, and custom executors to a pipeline. Modes can be used, e.g., to vary available
            resource and logging implementations between local test and production runs.
        preset_defs (Optional[List[PresetDefinition]]): A set of preset collections of configuration
            options that may be used to execute a pipeline. A preset consists of an environment
            dict, an optional subset of solids to execute, and a mode selection. Presets can be used
            to ship common combinations of options to pipeline end users in Python code, and can
            be selected by tools like Dagit.
        tags (Optional[Dict[str, Any]]): Arbitrary metadata for any execution run of the pipeline.
            Values that are not strings will be json encoded and must meet the criteria that
            `json.loads(json.dumps(value)) == value`.  These tag values may be overwritten by tag
            values provided at invocation time.
        hook_defs (Optional[AbstractSet[HookDefinition]]): A set of hook definitions applied to the
            pipeline. When a hook is applied to a pipeline, it will be attached to all solid
            instances within the pipeline.
        input_mappings (Optional[List[InputMapping]]): Experimental. Forwarded unchanged to
            :py:class:`GraphDefinition`.
        output_mappings (Optional[List[OutputMapping]]): Experimental. Forwarded unchanged to
            :py:class:`GraphDefinition`.
        config_mapping (Optional[ConfigMapping]): Experimental. Forwarded unchanged to
            :py:class:`GraphDefinition`.
        positional_inputs (Optional[List[str]]): Experimental. Forwarded unchanged to
            :py:class:`GraphDefinition`.
        solid_retry_policy (Optional[RetryPolicy]): The default retry policy for all solids in
            this pipeline. Only used if retry policy is not defined on the solid definition or
            solid invocation.

        _parent_pipeline_def (INTERNAL ONLY): Used for tracking pipelines created using solid subsets.

    Examples:

        .. code-block:: python

            @solid
            def return_one(_):
                return 1


            @solid(input_defs=[InputDefinition('num')], required_resource_keys={'op'})
            def apply_op(context, num):
                return context.resources.op(num)

            @resource(config_schema=Int)
            def adder_resource(init_context):
                return lambda x: x + init_context.resource_config


            add_mode = ModeDefinition(
                name='add_mode',
                resource_defs={'op': adder_resource},
                description='Mode that adds things',
            )


            add_three_preset = PresetDefinition(
                name='add_three_preset',
                run_config={'resources': {'op': {'config': 3}}},
                mode='add_mode',
            )


            pipeline_def = PipelineDefinition(
                name='basic',
                solid_defs=[return_one, apply_op],
                dependencies={'apply_op': {'num': DependencyDefinition('return_one')}},
                mode_defs=[add_mode],
                preset_defs=[add_three_preset],
            )
    """

    def __init__(
        self,
        solid_defs: List[NodeDefinition],
        name: str,
        description: Optional[str] = None,
        dependencies: Optional[
            Dict[Union[str, SolidInvocation], Dict[str, IDependencyDefinition]]
        ] = None,
        mode_defs: Optional[List[ModeDefinition]] = None,
        preset_defs: Optional[List[PresetDefinition]] = None,
        tags: Optional[Dict[str, Any]] = None,
        hook_defs: Optional[AbstractSet[HookDefinition]] = None,
        input_mappings: Optional[List[InputMapping]] = None,
        output_mappings: Optional[List[OutputMapping]] = None,
        config_mapping: Optional[ConfigMapping] = None,
        positional_inputs: Optional[List[str]] = None,
        solid_retry_policy: Optional[RetryPolicy] = None,
        _parent_pipeline_def: Optional[
            "PipelineDefinition"
        ] = None,  # https://github.com/dagster-io/dagster/issues/2115
    ):
        # For these warnings they check truthiness because they get changed to [] higher
        # in the stack for the decorator case

        if input_mappings:
            experimental_arg_warning("input_mappings", "PipelineDefinition")

        if output_mappings:
            experimental_arg_warning("output_mappings", "PipelineDefinition")

        if config_mapping is not None:
            experimental_arg_warning("config_mapping", "PipelineDefinition")

        if positional_inputs:
            experimental_arg_warning("positional_inputs", "PipelineDefinition")

        super().__init__(
            name=name,
            description=description,
            dependencies=dependencies,
            node_defs=solid_defs,
            tags=check.opt_dict_param(tags, "tags", key_type=str),
            positional_inputs=positional_inputs,
            input_mappings=input_mappings,
            output_mappings=output_mappings,
            config_mapping=config_mapping,
        )

        self._current_level_node_defs = solid_defs
        self._tags = validate_tags(tags)

        # A pipeline always has at least one mode; fall back to a default-constructed one.
        mode_definitions = check.opt_list_param(mode_defs, "mode_defs", of_type=ModeDefinition)
        if not mode_definitions:
            mode_definitions = [ModeDefinition()]
        self._mode_definitions = mode_definitions

        # Mode names must be unique within the pipeline.
        seen_modes: Set[str] = set()
        for mode_def in mode_definitions:
            if mode_def.name in seen_modes:
                raise DagsterInvalidDefinitionError(
                    f'Two modes seen with the name "{mode_def.name}" in "{self._name}". '
                    "Modes must have unique names."
                )
            seen_modes.add(mode_def.name)

        self._hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition)
        self._solid_retry_policy = check.opt_inst_param(
            solid_retry_policy, "solid_retry_policy", RetryPolicy
        )

        # Presets must have unique names and may only reference modes defined above.
        self._preset_defs = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition)
        self._preset_dict: Dict[str, PresetDefinition] = {}
        for preset in self._preset_defs:
            if preset.name in self._preset_dict:
                raise DagsterInvalidDefinitionError(
                    f'Two PresetDefinitions seen with the name "{preset.name}" in '
                    f'"{self._name}". '
                    "PresetDefinitions must have unique names."
                )
            if preset.mode not in seen_modes:
                raise DagsterInvalidDefinitionError(
                    f'PresetDefinition "{preset.name}" in "{self._name}" '
                    f'references mode "{preset.mode}" which is not defined.'
                )
            self._preset_dict[preset.name] = preset

        # Validates, per mode, that every required resource key is provided, and records
        # the set of required keys for that mode.
        self._resource_requirements = {
            mode_def.name: _checked_resource_reqs_for_mode(
                mode_def,
                self._current_level_node_defs,
                self._dagster_type_dict,
                self._solid_dict,
                self._hook_defs,
                self._dependency_structure,
            )
            for mode_def in self._mode_definitions
        }

        # Recursively explore all nodes in this pipeline
        self._all_node_defs = _build_all_node_defs(self._current_level_node_defs)
        self._parent_pipeline_def = check.opt_inst_param(
            _parent_pipeline_def, "_parent_pipeline_def", PipelineDefinition
        )
        self._cached_run_config_schemas: Dict[str, "RunConfigSchema"] = {}
        self._cached_external_pipeline = None

    def copy_for_configured(
        self,
        name: str,
        description: Optional[str],
        config_schema: Any,
        config_or_config_fn,
    ) -> "PipelineDefinition":
        """Build a copy of this pipeline under a new name with a new config schema.

        Args:
            name: Name of the new pipeline.
            description: Description of the new pipeline; falls back to this pipeline's
                description when falsy.
            config_schema: Config schema wrapped, together with this pipeline's existing
                config mapping function, into a new ``ConfigMapping``.
            config_or_config_fn: Not read by this method.

        Raises:
            DagsterInvalidDefinitionError: If this pipeline has no config mapping.
        """
        if not self.has_config_mapping:
            raise DagsterInvalidDefinitionError(
                "Only pipelines utilizing config mapping can be pre-configured. The pipeline "
                f'"{self.name}" does not have a config mapping, and thus has nothing to be '
                "configured."
            )

        # NOTE(review): `_solid_defs` is not assigned anywhere in this class (with_hooks uses
        # `top_level_solid_defs`); presumably it is provided by GraphDefinition - confirm.
        # NOTE(review): tags and solid_retry_policy are not forwarded to the copy - confirm
        # whether that is intentional.
        return PipelineDefinition(
            solid_defs=self._solid_defs,
            name=name,
            description=description or self.description,
            dependencies=self._dependencies,
            mode_defs=self._mode_definitions,
            preset_defs=self.preset_defs,
            hook_defs=self.hook_defs,
            input_mappings=self._input_mappings,
            output_mappings=self._output_mappings,
            config_mapping=ConfigMapping(
                self._config_mapping.config_fn, config_schema=config_schema
            ),
            positional_inputs=self.positional_inputs,
            _parent_pipeline_def=self._parent_pipeline_def,
        )

    def get_run_config_schema(self, mode: Optional[str] = None) -> "RunConfigSchema":
        """Return the (cached) run config schema for ``mode``.

        ``mode`` may be omitted; resolution of ``None`` is delegated to
        :py:meth:`get_mode_definition`.
        """
        # The signature advertises `mode=None`, so validate as an optional string.
        check.opt_str_param(mode, "mode")

        mode_def = self.get_mode_definition(mode)

        if mode_def.name not in self._cached_run_config_schemas:
            self._cached_run_config_schemas[mode_def.name] = _create_run_config_schema(
                self, mode_def, self._resource_requirements[mode_def.name]
            )
        return self._cached_run_config_schemas[mode_def.name]

    @property
    def mode_definitions(self) -> List[ModeDefinition]:
        return self._mode_definitions

    @property
    def preset_defs(self) -> List[PresetDefinition]:
        return self._preset_defs

    def _get_mode_definition(self, mode: str) -> Optional[ModeDefinition]:
        """Return the mode definition named ``mode``, or ``None`` if there is none."""
        check.str_param(mode, "mode")
        for mode_definition in self._mode_definitions:
            if mode_definition.name == mode:
                return mode_definition

        return None

    def get_default_mode(self) -> ModeDefinition:
        """Return the first mode definition of this pipeline."""
        return self._mode_definitions[0]

    @property
    def is_single_mode(self) -> bool:
        return len(self._mode_definitions) == 1

    @property
    def is_multi_mode(self) -> bool:
        return len(self._mode_definitions) > 1

    def has_mode_definition(self, mode: str) -> bool:
        check.str_param(mode, "mode")
        return bool(self._get_mode_definition(mode))

    def get_default_mode_name(self) -> str:
        return self._mode_definitions[0].name

    def get_mode_definition(self, mode: Optional[str] = None) -> ModeDefinition:
        """Return the mode definition named ``mode``.

        When ``mode`` is ``None`` the pipeline must have exactly one mode, which is returned.
        Fails a check if the named mode does not exist.
        """
        check.opt_str_param(mode, "mode")
        if mode is None:
            check.invariant(
                self.is_single_mode,
                "A mode must be specified for a pipeline with more than one mode",
            )
            return self.get_default_mode()

        mode_def = self._get_mode_definition(mode)

        if mode_def is None:
            check.failed(
                "Could not find mode {mode} in pipeline {name}".format(mode=mode, name=self._name),
            )

        return mode_def

    @property
    def available_modes(self) -> List[str]:
        return [mode_def.name for mode_def in self._mode_definitions]

    def get_required_resource_defs_for_mode(self, mode: str) -> Dict[str, ResourceDefinition]:
        """Return the resource definitions of ``mode`` whose keys are actually required."""
        required_keys = self._resource_requirements[mode]
        return {
            resource_key: resource
            for resource_key, resource in self.get_mode_definition(mode).resource_defs.items()
            if resource_key in required_keys
        }

    @property
    def tags(self) -> Dict[str, Any]:
        return self._tags

    def has_dagster_type(self, name: str) -> bool:
        check.str_param(name, "name")
        return name in self._dagster_type_dict

    def dagster_type_named(self, name: str) -> DagsterType:
        check.str_param(name, "name")
        return self._dagster_type_dict[name]

    @property
    def all_solid_defs(self) -> List[NodeDefinition]:
        return list(self._all_node_defs.values())

    @property
    def top_level_solid_defs(self) -> List[NodeDefinition]:
        return self._current_level_node_defs

    def solid_def_named(self, name: str) -> NodeDefinition:
        check.str_param(name, "name")

        check.invariant(name in self._all_node_defs, "{} not found".format(name))
        return self._all_node_defs[name]

    def has_solid_def(self, name: str) -> bool:
        check.str_param(name, "name")
        return name in self._all_node_defs

    def get_pipeline_subset_def(
        self, solids_to_execute: Optional[AbstractSet[str]]
    ) -> "PipelineDefinition":
        """Return a subset of this pipeline, or the pipeline itself when given ``None``."""
        return (
            self if solids_to_execute is None else _get_pipeline_subset_def(self, solids_to_execute)
        )

    def has_preset(self, name: str) -> bool:
        check.str_param(name, "name")
        return name in self._preset_dict

    def get_preset(self, name: str) -> PresetDefinition:
        """Return the preset named ``name``.

        Raises:
            DagsterInvariantViolationError: If no preset with that name exists.
        """
        check.str_param(name, "name")
        if name not in self._preset_dict:
            raise DagsterInvariantViolationError(
                f'Could not find preset for "{name}". Available presets '
                f'for pipeline "{self._name}" are {list(self._preset_dict.keys())}.'
            )

        return self._preset_dict[name]

    def get_pipeline_snapshot(self) -> "PipelineSnapshot":
        return self.get_pipeline_index().pipeline_snapshot

    def get_pipeline_snapshot_id(self) -> str:
        return self.get_pipeline_index().pipeline_snapshot_id

    def get_pipeline_index(self) -> "PipelineIndex":
        # Imported at call time rather than at module level.
        from dagster.core.snap import PipelineSnapshot
        from dagster.core.host_representation import PipelineIndex

        return PipelineIndex(
            PipelineSnapshot.from_pipeline_def(self), self.get_parent_pipeline_snapshot()
        )

    def get_config_schema_snapshot(self) -> "ConfigSchemaSnapshot":
        return self.get_pipeline_snapshot().config_schema_snapshot

    @property
    def is_subset_pipeline(self) -> bool:
        return False

    @property
    def parent_pipeline_def(self) -> Optional["PipelineDefinition"]:
        return None

    def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
        return None

    @property
    def solids_to_execute(self) -> Optional[FrozenSet[str]]:
        return None

    @property
    def hook_defs(self) -> AbstractSet[HookDefinition]:
        return self._hook_defs

    def get_all_hooks_for_handle(self, handle: SolidHandle) -> FrozenSet[HookDefinition]:
        """Gather all the hooks for the given solid from all places possibly attached with a hook.

        A hook can be attached to any of the following objects
        * Solid (solid invocation)
        * PipelineDefinition

        Args:
            handle (SolidHandle): The solid's handle

        Returns:
            FrozenSet[HookDefinition]
        """
        check.inst_param(handle, "handle", SolidHandle)
        hook_defs: AbstractSet[HookDefinition] = set()

        # Walk from the handle up through its parents; the last name appended is the
        # top-level one, so popping yields names from the outermost solid inwards.
        current = handle
        lineage = []
        while current:
            lineage.append(current.name)
            current = current.parent

        # hooks on top-level solid
        name = lineage.pop()
        solid = self.solid_named(name)
        hook_defs = hook_defs.union(solid.hook_defs)

        # hooks on non-top-level solids
        while lineage:
            name = lineage.pop()
            solid = solid.definition.solid_named(name)
            hook_defs = hook_defs.union(solid.hook_defs)

        # hooks applied to a pipeline definition will run on every solid
        hook_defs = hook_defs.union(self.hook_defs)

        return frozenset(hook_defs)

    def get_retry_policy_for_handle(self, handle: SolidHandle) -> Optional[RetryPolicy]:
        """Resolve the retry policy for a solid.

        Precedence: the solid invocation's policy, then the solid definition's policy, then
        the pipeline-wide default given as ``solid_retry_policy``.
        """
        solid = self.get_solid(handle)

        if solid.retry_policy:
            return solid.retry_policy
        elif solid.definition.retry_policy:
            return solid.definition.retry_policy

        # could be expanded to look in composite_solid / graph containers
        else:
            return self._solid_retry_policy

    def with_hooks(self, hook_defs: AbstractSet[HookDefinition]) -> "PipelineDefinition":
        """Apply a set of hooks to all solid instances within the pipeline."""

        hook_defs = check.set_param(hook_defs, "hook_defs", of_type=HookDefinition)

        pipeline_def = PipelineDefinition(
            solid_defs=self.top_level_solid_defs,
            name=self.name,
            description=self.description,
            dependencies=self.dependencies,
            mode_defs=self.mode_definitions,
            preset_defs=self.preset_defs,
            tags=self.tags,
            hook_defs=hook_defs | self.hook_defs,
            # Carry the pipeline-wide default retry policy over to the new pipeline;
            # otherwise adding hooks would silently drop it.
            solid_retry_policy=self._solid_retry_policy,
            _parent_pipeline_def=self._parent_pipeline_def,
        )

        update_wrapper(pipeline_def, self, updated=())

        return pipeline_def
class PipelineSubsetDefinition(PipelineDefinition):
    """A pipeline built from a subset of the solids of a parent pipeline."""

    @property
    def solids_to_execute(self) -> Optional[FrozenSet[str]]:
        return frozenset(self._solid_dict.keys())

    @property
    def solid_selection(self) -> List[str]:
        # we currently don't pass the real solid_selection (the solid query list) down here.
        # so in the short-term, to make the call sites cleaner, we will convert the solids to execute
        # to a list
        return list(self._solid_dict.keys())

    @property
    def parent_pipeline_def(self) -> Optional["PipelineDefinition"]:
        return self._parent_pipeline_def

    def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
        return self._parent_pipeline_def.get_pipeline_snapshot()

    @property
    def is_subset_pipeline(self) -> bool:
        return True

    def get_pipeline_subset_def(
        self, solids_to_execute: AbstractSet[str]
    ) -> "PipelineSubsetDefinition":
        """Always raises: a subset pipeline cannot be subset a second time."""
        raise DagsterInvariantViolationError("Pipeline subsets may not be subset again.")


def _dep_key_of(solid: Solid) -> SolidInvocation:
    """Build the dependency-dict key (a ``SolidInvocation``) describing ``solid``."""
    return SolidInvocation(
        name=solid.definition.name,
        alias=solid.name,
        tags=solid.tags,
        hook_defs=solid.hook_defs,
        retry_policy=solid.retry_policy,
    )


def _get_pipeline_subset_def(
    pipeline_def: PipelineDefinition, solids_to_execute: AbstractSet[str]
) -> "PipelineSubsetDefinition":
    """
    Build a pipeline which is a subset of another pipeline.
    Only includes the solids which are in solids_to_execute.

    Dependencies are kept only when both ends are in the subset. The parent's modes, tags,
    hooks and default solid retry policy are carried over to the subset.

    Raises:
        DagsterInvalidSubsetError: If a requested solid does not exist in ``pipeline_def`` or
            the resulting subset is not a valid pipeline.
    """

    check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
    check.set_param(solids_to_execute, "solids_to_execute", of_type=str)

    for solid_name in solids_to_execute:
        if not pipeline_def.has_solid_named(solid_name):
            raise DagsterInvalidSubsetError(
                "Pipeline {pipeline_name} has no solid named {name}.".format(
                    pipeline_name=pipeline_def.name, name=solid_name
                ),
            )

    solids = list(map(pipeline_def.solid_named, solids_to_execute))
    # Every selected solid gets an entry, even if it ends up with no dependencies.
    deps: Dict[
        Union[str, SolidInvocation],
        Dict[str, IDependencyDefinition],
    ] = {_dep_key_of(solid): {} for solid in solids}

    dependency_structure = pipeline_def.dependency_structure

    for solid in solids:
        for input_handle in solid.input_handles():
            if dependency_structure.has_direct_dep(input_handle):
                output_handle = dependency_structure.get_direct_dep(input_handle)
                if output_handle.solid.name in solids_to_execute:
                    deps[_dep_key_of(solid)][input_handle.input_def.name] = DependencyDefinition(
                        solid=output_handle.solid.name, output=output_handle.output_def.name
                    )
            if dependency_structure.has_dynamic_fan_in_dep(input_handle):
                output_handle = dependency_structure.get_dynamic_fan_in_dep(input_handle)
                if output_handle.solid.name in solids_to_execute:
                    deps[_dep_key_of(solid)][
                        input_handle.input_def.name
                    ] = DynamicCollectDependencyDefinition(
                        solid_name=output_handle.solid.name,
                        output_name=output_handle.output_def.name,
                    )
            elif dependency_structure.has_fan_in_deps(input_handle):
                output_handles = dependency_structure.get_fan_in_deps(input_handle)
                # Keep only the upstream outputs whose solids are part of the subset.
                deps[_dep_key_of(solid)][input_handle.input_def.name] = MultiDependencyDefinition(
                    [
                        DependencyDefinition(
                            solid=output_handle.solid.name, output=output_handle.output_def.name
                        )
                        for output_handle in output_handles
                        if output_handle.solid.name in solids_to_execute
                    ]
                )

    try:
        sub_pipeline_def = PipelineSubsetDefinition(
            name=pipeline_def.name,  # should we change the name for subsetted pipeline?
            solid_defs=list({solid.definition for solid in solids}),
            mode_defs=pipeline_def.mode_definitions,
            dependencies=deps,
            _parent_pipeline_def=pipeline_def,
            tags=pipeline_def.tags,
            hook_defs=pipeline_def.hook_defs,
            # Keep the parent's default retry policy so that subset execution retries
            # solids the same way as the full pipeline.
            solid_retry_policy=pipeline_def._solid_retry_policy,  # pylint: disable=protected-access
        )

        return sub_pipeline_def
    except DagsterInvalidDefinitionError as exc:
        # This handles the case when you construct a subset such that an unsatisfied
        # input cannot be loaded from config. Instead of throwing a DagsterInvalidDefinitionError,
        # we re-raise a DagsterInvalidSubsetError.
        raise DagsterInvalidSubsetError(
            f"The attempted subset {str_format_set(solids_to_execute)} for pipeline "
            f"{pipeline_def.name} results in an invalid pipeline"
        ) from exc


def _missing_resource_error(
    required_resource: str,
    requirer: str,
    mode_def: ModeDefinition,
    mode_resources: AbstractSet[str],
) -> DagsterInvalidDefinitionError:
    """Build the error raised when ``requirer`` needs a resource key the mode does not provide."""
    return DagsterInvalidDefinitionError(
        f'Resource key "{required_resource}" is required by {requirer}, '
        f'but is not provided by mode "{mode_def.name}". '
        f'In mode "{mode_def.name}", provide a resource for key "{required_resource}", '
        f'or change "{required_resource}" to one of the provided resources keys: '
        f"{sorted(mode_resources)}."
    )


def _checked_resource_reqs_for_mode(
    mode_def: ModeDefinition,
    node_defs: List[NodeDefinition],
    dagster_type_dict: Dict[str, DagsterType],
    solid_dict: Dict[str, Solid],
    pipeline_hook_defs: AbstractSet[HookDefinition],
    dependency_structure: DependencyStructure,
) -> Set[str]:
    """
    Calculate the resource requirements for the pipeline in this mode and ensure they are
    provided by the mode.

    We combine these operations in to one traversal to allow for raising exceptions that provide
    as much context as possible about where the unsatisfied resource requirement came from.

    Returns:
        The set of resource keys required by solids, outputs (IO managers), dagster types,
        unsatisfied inputs, intermediate storages, hooks and other resources.

    Raises:
        DagsterInvalidDefinitionError: If any required key is not provided by ``mode_def``.
    """
    resource_reqs: Set[str] = set()
    # Only used to list the valid alternatives in the IO manager error message.
    mode_output_managers = {
        key
        for key, resource_def in mode_def.resource_defs.items()
        if isinstance(resource_def, IOutputManagerDefinition)
    }
    mode_resources = set(mode_def.resource_defs.keys())

    for node_def in node_defs:
        for solid_def in node_def.iterate_solid_defs():
            for required_resource in solid_def.required_resource_keys:
                resource_reqs.add(required_resource)
                if required_resource not in mode_resources:
                    raise _missing_resource_error(
                        required_resource,
                        f"solid def {solid_def.name}",
                        mode_def,
                        mode_resources,
                    )

            for output_def in solid_def.output_defs:
                resource_reqs.add(output_def.io_manager_key)
                if output_def.io_manager_key not in mode_resources:
                    raise DagsterInvalidDefinitionError(
                        (
                            f'IO manager key "{output_def.io_manager_key}" is required by output '
                            f'"{output_def.name}" of solid def {solid_def.name}, but is not '
                            f'provided by mode "{mode_def.name}". '
                            f'In mode "{mode_def.name}", provide an IO manager for key "{output_def.io_manager_key}", '
                            f'or change "{output_def.io_manager_key}" to one of the provided IO manager keys: {sorted(mode_output_managers)}.'
                        )
                    )

    resource_reqs.update(
        _checked_type_resource_reqs_for_mode(
            mode_def,
            dagster_type_dict,
        )
    )

    # Validate unsatisfied inputs can be materialized from config
    resource_reqs.update(
        _checked_input_resource_reqs_for_mode(dependency_structure, solid_dict, mode_def)
    )

    for intermediate_storage in mode_def.intermediate_storage_defs or []:
        for required_resource in intermediate_storage.required_resource_keys:
            resource_reqs.add(required_resource)
            if required_resource not in mode_resources:
                raise _missing_resource_error(
                    required_resource,
                    f'intermediate storage "{intermediate_storage.name}"',
                    mode_def,
                    mode_resources,
                )

    # Hooks attached to individual solid invocations.
    for solid in solid_dict.values():
        for hook_def in solid.hook_defs:
            for required_resource in hook_def.required_resource_keys:
                resource_reqs.add(required_resource)
                if required_resource not in mode_resources:
                    raise _missing_resource_error(
                        required_resource, f'hook "{hook_def.name}"', mode_def, mode_resources
                    )

    # Hooks attached to the pipeline as a whole.
    for hook_def in pipeline_hook_defs:
        for required_resource in hook_def.required_resource_keys:
            resource_reqs.add(required_resource)
            if required_resource not in mode_resources:
                raise _missing_resource_error(
                    required_resource, f'hook "{hook_def.name}"', mode_def, mode_resources
                )

    # Resources may themselves depend on other resources.
    for resource_key, resource in mode_def.resource_defs.items():
        for required_resource in resource.required_resource_keys:
            resource_reqs.add(required_resource)
            if required_resource not in mode_resources:
                raise _missing_resource_error(
                    required_resource,
                    f'resource at key "{resource_key}"',
                    mode_def,
                    mode_resources,
                )

    return resource_reqs


def _checked_type_resource_reqs_for_mode(
    mode_def: ModeDefinition,
    dagster_type_dict: Dict[str, DagsterType],
) -> Set[str]:
    """
    Calculate all the resource requirements related to DagsterTypes for this mode and ensure the
    mode provides those resources.

    Covers the keys required by each type itself, by its loader and materializer (when present),
    and by each auto plugin that is compatible with at least one of the mode's intermediate
    storage definitions.

    Raises:
        DagsterInvalidDefinitionError: If any required key is not provided by ``mode_def``.
    """

    resource_reqs: Set[str] = set()
    mode_resources = set(mode_def.resource_defs.keys())
    # Guard against a mode without intermediate storage definitions, as the caller does.
    intermediate_storage_defs = mode_def.intermediate_storage_defs or []

    for dagster_type in dagster_type_dict.values():
        for required_resource in dagster_type.required_resource_keys:
            resource_reqs.add(required_resource)
            if required_resource not in mode_resources:
                raise _missing_resource_error(
                    required_resource,
                    f'type "{dagster_type.display_name}"',
                    mode_def,
                    mode_resources,
                )

        if dagster_type.loader:
            for required_resource in dagster_type.loader.required_resource_keys():
                resource_reqs.add(required_resource)
                if required_resource not in mode_resources:
                    raise _missing_resource_error(
                        required_resource,
                        f'the loader on type "{dagster_type.display_name}"',
                        mode_def,
                        mode_resources,
                    )

        if dagster_type.materializer:
            for required_resource in dagster_type.materializer.required_resource_keys():
                resource_reqs.add(required_resource)
                if required_resource not in mode_resources:
                    raise _missing_resource_error(
                        required_resource,
                        f'the materializer on type "{dagster_type.display_name}"',
                        mode_def,
                        mode_resources,
                    )

        for plugin in dagster_type.auto_plugins:
            # A plugin's requirements only count if some storage in this mode would use it.
            used_by_storage = {
                intermediate_storage_def.name
                for intermediate_storage_def in intermediate_storage_defs
                if plugin.compatible_with_storage_def(intermediate_storage_def)
            }

            if used_by_storage:
                for required_resource in plugin.required_resource_keys():
                    resource_reqs.add(required_resource)
                    if required_resource not in mode_resources:
                        raise _missing_resource_error(
                            required_resource,
                            f'the plugin "{plugin.__name__}" '
                            f'on type "{dagster_type.display_name}" '
                            f"(used with storages {used_by_storage})",
                            mode_def,
                            mode_resources,
                        )

    return resource_reqs


def _checked_input_resource_reqs_for_mode(
    dependency_structure: DependencyStructure,
    solid_dict: Dict[str, Solid],
    mode_def: ModeDefinition,
) -> Set[str]:
    """
    Validate that every solid input can be loaded in this mode and collect the root input
    manager keys required by unconnected inputs.

    Connected inputs require the upstream output's IO manager resource to be an
    ``IInputManagerDefinition``. Unconnected inputs require a type loader, a Nothing type, or a
    root manager key; a root manager key must be provided by the mode.

    Raises:
        DagsterInvalidDefinitionError: If any input fails the checks above.
    """
    resource_reqs: Set[str] = set()
    # Only used to list the valid alternatives in the root input manager error message.
    mode_root_input_managers = {
        key
        for key, resource_def in mode_def.resource_defs.items()
        if isinstance(resource_def, RootInputManagerDefinition)
    }

    for solid in solid_dict.values():
        for handle in solid.input_handles():
            if dependency_structure.has_deps(handle):
                for source_output_handle in dependency_structure.get_deps_list(handle):
                    output_manager_key = source_output_handle.output_def.io_manager_key
                    output_manager_def = mode_def.resource_defs[output_manager_key]
                    if not isinstance(output_manager_def, IInputManagerDefinition):
                        raise DagsterInvalidDefinitionError(
                            f'Input "{handle.input_def.name}" of solid "{solid.name}" is '
                            f'connected to output "{source_output_handle.output_def.name}" '
                            f'of solid "{source_output_handle.solid.name}". In mode '
                            f'"{mode_def.name}", that output does not have an output '
                            f"manager that knows how to load inputs, so we don't know how "
                            f"to load the input. To address this, assign an IOManager to "
                            f"the upstream output."
                        )
            else:
                input_def = handle.input_def
                if (
                    not input_def.dagster_type.loader
                    and input_def.dagster_type.kind != DagsterTypeKind.NOTHING
                    and not input_def.root_manager_key
                ):
                    raise DagsterInvalidDefinitionError(
                        'Input "{input_name}" in solid "{solid_name}" is not connected to '
                        "the output of a previous solid and can not be loaded from configuration, "
                        "creating an impossible to execute pipeline. "
                        "Possible solutions are:\n"
                        ' * add a dagster_type_loader for the type "{dagster_type}"\n'
                        ' * connect "{input_name}" to the output of another solid\n'.format(
                            solid_name=solid.name,
                            input_name=input_def.name,
                            dagster_type=input_def.dagster_type.display_name,
                        )
                    )

                # If a root manager is provided, it's always used. I.e. it has priority over
                # the other ways of loading unsatisfied inputs - dagster type loaders and
                # default values.
                if input_def.root_manager_key:
                    resource_reqs.add(input_def.root_manager_key)
                    if input_def.root_manager_key not in mode_def.resource_defs:
                        raise DagsterInvalidDefinitionError(
                            f'Root input manager key "{input_def.root_manager_key}" is required by '
                            f'unsatisfied input "{input_def.name}" of solid def {solid.name}, but is not '
                            f'provided by mode "{mode_def.name}". '
                            f'In mode "{mode_def.name}", provide a root input manager for key "{input_def.root_manager_key}", '
                            f'or change "{input_def.root_manager_key}" to one of the provided root input managers keys: {sorted(mode_root_input_managers)}.'
                        )

    return resource_reqs


def _build_all_node_defs(node_defs: List[NodeDefinition]) -> Dict[str, NodeDefinition]:
    """Map node definition name to definition for every node reachable from ``node_defs``.

    Raises:
        DagsterInvalidDefinitionError: If two different definitions share the same name.
    """
    all_defs: Dict[str, NodeDefinition] = {}
    for current_level_node_def in node_defs:
        for node_def in current_level_node_def.iterate_node_defs():
            if node_def.name in all_defs:
                # Seeing the same definition again is fine; a different one is a conflict.
                if all_defs[node_def.name] != node_def:
                    raise DagsterInvalidDefinitionError(
                        'Detected conflicting solid definitions with the same name "{name}"'.format(
                            name=node_def.name
                        )
                    )
            else:
                all_defs[node_def.name] = node_def

    return all_defs


def _create_run_config_schema(
    pipeline_def: PipelineDefinition,
    mode_definition: ModeDefinition,
    required_resources: Set[str],
) -> "RunConfigSchema":
    """Build the ``RunConfigSchema`` for ``pipeline_def`` in the given mode."""
    from .environment_configs import (
        EnvironmentClassCreationData,
        construct_config_type_dictionary,
        define_environment_cls,
    )
    from .run_config_schema import RunConfigSchema

    # When executing with a subset pipeline, include the missing solids
    # from the original pipeline as ignored to allow execution with
    # run config that is valid for the original
    if pipeline_def.is_subset_pipeline:
        if pipeline_def.parent_pipeline_def is None:
            check.failed("Unexpected subset pipeline state")

        ignored_solids = [
            solid
            for solid in pipeline_def.parent_pipeline_def.solids
            if not pipeline_def.has_solid_named(solid.name)
        ]
    else:
        ignored_solids = []

    environment_type = define_environment_cls(
        EnvironmentClassCreationData(
            pipeline_name=pipeline_def.name,
            solids=pipeline_def.solids,
            dependency_structure=pipeline_def.dependency_structure,
            mode_definition=mode_definition,
            logger_defs=mode_definition.loggers,
            ignored_solids=ignored_solids,
            required_resources=required_resources,
        )
    )

    config_type_dict_by_name, config_type_dict_by_key = construct_config_type_dictionary(
        pipeline_def.all_solid_defs, environment_type
    )

    return RunConfigSchema(
        environment_type=environment_type,
        config_type_dict_by_name=config_type_dict_by_name,
        config_type_dict_by_key=config_type_dict_by_key,
    )