
Pipeline#

A pipeline is a set of solids with explicit data dependencies on each other, creating a directed acyclic graph, or DAG.

Pipeline Diagram

Relevant APIs#

Name | Description
@pipeline | The decorator used to define a pipeline.
PipelineDefinition | Class for pipelines. You almost never want to initialize this class directly. Instead, use the @pipeline decorator, which returns a PipelineDefinition.
ModeDefinition | Modes allow you to vary pipeline behavior between different deployment environments. For more info, see the Modes section.
PresetDefinition | Presets allow you to predefine pipeline configurations.

Overview#

Solids are linked together into pipelines by defining the dependencies between their inputs and outputs. An important difference between Dagster and other workflow systems is that in Dagster, solid dependencies are expressed as data dependencies rather than just the order in which solids should execute.

This difference enables Dagster to support richer modeling of dependencies. Instead of merely ensuring that the order of execution is correct, dependencies in Dagster provide a variety of compile-time and run-time checks.

Dependencies are expressed in Pipelines using Dagster's function invocation DSL.
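
To make the distinction concrete, here is a minimal plain-Python sketch (not Dagster code, and not how Dagster is implemented) in which the execution order is never written down. It is derived from a mapping of each step to the steps whose outputs it consumes. The step names and the deps mapping are hypothetical, and graphlib requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Hypothetical steps: each key consumes the outputs of the steps in its set.
deps = {
    "return_one": set(),
    "add_one": {"return_one"},
    "adder": {"return_one", "add_one"},
}

# A valid execution order falls out of the data dependencies alone.
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Because only the data dependencies are declared, a scheduler is free to pick any order (or run steps in parallel) as long as every step runs after the steps it consumes from.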


Defining a pipeline#

To define a pipeline, use the @pipeline decorator.

Within the decorated function body, we use function calls to indicate the dependency structure between the solids making up the pipeline.

In this example, the add_one solid depends on the return_one solid's output. Because this data dependency exists, the add_one solid executes only after return_one runs successfully and emits the required output.

from dagster import pipeline, solid


@solid
def return_one(context):
    return 1


@solid
def add_one(context, number: int):
    return number + 1


@pipeline
def one_plus_one_pipeline():
    add_one(return_one())

Aliases and Tags#

Solid aliases#

You can use the same solid definition multiple times in the same pipeline.

@pipeline
def multiple_usage_pipeline():
    add_one(add_one(return_one()))

To differentiate between the two invocations of add_one, Dagster automatically aliases the solid names to be add_one and add_one_2.
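
The naming scheme described above can be sketched in plain Python. This is only an illustration of the pattern (first use keeps the bare name, later uses get a numeric suffix starting at 2); the auto_alias helper is hypothetical and is not Dagster's internal implementation.

```python
from collections import Counter


def auto_alias(invocations):
    """Give each repeated name a numeric suffix, starting at _2."""
    seen = Counter()
    aliases = []
    for name in invocations:
        seen[name] += 1
        # The first use keeps the bare name; later uses get _2, _3, ...
        aliases.append(name if seen[name] == 1 else f"{name}_{seen[name]}")
    return aliases


print(auto_alias(["return_one", "add_one", "add_one"]))
```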

You can also manually define the alias by using the .alias method on the solid invocation.

@pipeline
def alias_pipeline():
    add_one.alias("second_addition")(add_one(return_one()))

Solid Tags#

Similar to aliases, you can also define solid tags on a solid invocation.

@pipeline
def tag_pipeline():
    add_one.tag({"my_tag": "my_value"})(add_one(return_one()))

Pipeline Configuration#

Pipeline Modes#

Pipeline definitions do not expose a configuration schema. Instead, they specify a set of ModeDefinitions that can be used with the pipeline. For more information on Modes, see the Modes section.

dev_mode = ModeDefinition("dev")
staging_mode = ModeDefinition("staging")
prod_mode = ModeDefinition("prod")


@pipeline(mode_defs=[dev_mode, staging_mode, prod_mode])
def my_modes_pipeline():
    my_solid()

Pipeline Presets#

You can predefine the configurations with which a pipeline can execute. PresetDefinition lets you do so by specifying configuration values at pipeline definition time.

@pipeline(
    preset_defs=[
        PresetDefinition(
            name="one",
            run_config={"solids": {"add_one": {"inputs": {"number": 1}}}},
        ),
        PresetDefinition(
            name="two",
            run_config={"solids": {"add_one": {"inputs": {"number": 2}}}},
        ),
    ]
)
def my_presets_pipeline():
    add_one()

In Dagit, you can choose and load a preset in the Playground:

Pipeline Presets

To run a pipeline in a script or in a test, you can specify a preset name via the preset argument to the Python API execute_pipeline:

def run_pipeline():
    execute_pipeline(my_presets_pipeline, preset="one")
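
Conceptually, a preset is a named run configuration that is looked up when the pipeline is launched. The following plain-Python sketch mirrors the two presets above without using Dagster; the presets dictionary and the execute_with_preset helper are hypothetical stand-ins, not Dagster APIs.

```python
# Hypothetical stand-in for the two PresetDefinitions: name -> run_config.
presets = {
    "one": {"solids": {"add_one": {"inputs": {"number": 1}}}},
    "two": {"solids": {"add_one": {"inputs": {"number": 2}}}},
}


def add_one(number: int) -> int:
    return number + 1


def execute_with_preset(name: str) -> int:
    """Look up the named run config and feed its input value to add_one."""
    run_config = presets[name]
    number = run_config["solids"]["add_one"]["inputs"]["number"]
    return add_one(number)


print(execute_with_preset("one"), execute_with_preset("two"))
```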

You can also use the Dagster CLI dagster pipeline execute --preset to run a pipeline with a preset name:

dagster pipeline execute my_presets_pipeline --preset one

Pipeline Tags#

Pipelines can specify a set of tags that are also automatically set on the resulting pipeline runs.

@pipeline(tags={"my_tag": "my_value"})
def my_tags_pipeline():
    my_solid()

This pipeline defines a my_tag tag. Any pipeline runs created using this pipeline will also have the same tag.

Examples#

Many DAG structures can be represented using pipelines. This section covers a few basic patterns you can use to build more complex pipelines.

Linear Dependencies#

The simplest pipeline structure is the linear pipeline. We return one output from the root solid, and pass along data through single inputs and outputs.

Linear Pipeline

from dagster import pipeline, solid


@solid
def return_one(context) -> int:
    return 1


@solid
def add_one(context, number: int) -> int:
    return number + 1


@pipeline
def linear_pipeline():
    add_one(add_one(add_one(return_one())))

Multiple Inputs and Outputs#

Branching Pipeline

A single output can be passed to multiple inputs on downstream solids. In this example, the output from the first solid is passed to two different solids. The outputs of those solids are combined and passed to the final solid.

from dagster import pipeline, solid


@solid
def return_one(context) -> int:
    return 1


@solid
def add_one(context, number: int):
    return number + 1


@solid
def adder(context, a: int, b: int) -> int:
    return a + b


@pipeline
def inputs_and_outputs_pipeline():
    value = return_one()
    a = add_one(value)
    b = add_one(value)
    adder(a, b)

Conditional Branching#

branch

A solid only starts to execute once all of its inputs have been resolved. We can use this behavior to model conditional execution of solids.

In this example, the branching_solid outputs either the branch_1 result or branch_2 result. Since solid execution is skipped for solids that have unresolved inputs, only one of the downstream solids will execute.

import random

from dagster import Output, OutputDefinition, pipeline, solid


@solid(
    output_defs=[
        OutputDefinition(name="branch_1", is_required=False),
        OutputDefinition(name="branch_2", is_required=False),
    ]
)
def branching_solid():
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@solid
def branch_1_solid(_input):
    pass


@solid
def branch_2_solid(_input):
    pass


@pipeline
def branching_pipeline():
    branch_1, branch_2 = branching_solid()
    branch_1_solid(branch_1)
    branch_2_solid(branch_2)
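
The skipping behavior can be simulated without Dagster. In this plain-Python sketch, the upstream step emits exactly one of two named outputs, and a downstream step runs only if the output it depends on was emitted. The run helper and the way outputs are represented as a dictionary are assumptions made for illustration.

```python
import random


def branching_solid(rng):
    """Emit exactly one of two optional outputs, keyed by output name."""
    if rng.randint(0, 1) == 0:
        return {"branch_1": 1}
    return {"branch_2": 2}


def run(rng):
    outputs = branching_solid(rng)
    executed = []
    # A downstream step runs only if the output it depends on was emitted.
    wiring = [("branch_1_solid", "branch_1"), ("branch_2_solid", "branch_2")]
    for step, needed_output in wiring:
        if needed_output in outputs:
            executed.append(step)
    return executed


print(run(random.Random()))
```

Whichever branch is chosen, exactly one downstream step appears in the result, which matches the behavior described above.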

Fixed Fan-in Pipeline#

If you have a fixed set of solids that all return the same output type, you can collect all the outputs into a list and pass them into a single downstream solid.

The downstream solid executes only if all of the outputs were successfully created by the upstream solids.

from typing import List

from dagster import pipeline, solid


@solid
def return_one() -> int:
    return 1


@solid
def sum_fan_in(nums: List[int]) -> int:
    return sum(nums)


@pipeline
def fan_in_pipeline():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias(f"return_one_{i}")())
    sum_fan_in(fan_outs)

In this example, we have 10 solids that all output the number 1. The sum_fan_in solid takes all of these outputs as a list and sums them.

Dynamic Mapping & Collect (Experimental)#

In most cases, the structure of a pipeline is determined before execution. Dagster now has experimental support for creating pipelines where the final structure is not determined until run-time. This is useful for pipeline structures where you want to execute a separate instance of a solid for each entry in a certain output.

In this example, we have a solid files_in_directory that defines a DynamicOutputDefinition. We map over the dynamic output, which causes the downstream dependencies to be cloned for each DynamicOutput that is yielded. The downstream copies can be identified by the mapping_key supplied to DynamicOutput. Once that's all complete, we collect the results of process_file and pass them to summarize_directory.

import os
from typing import List

from dagster import Field, pipeline, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from dagster.utils import file_relative_path


@solid(
    config_schema={"path": Field(str, default_value=file_relative_path(__file__, "sample"))},
    output_defs=[DynamicOutputDefinition(str)],
)
def files_in_directory(context):
    path = context.solid_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        yield DynamicOutput(
            value=os.path.join(dirname, file),
            # create a mapping key from the file name
            mapping_key=file.replace(".", "_").replace("-", "_"),
        )


@solid
def process_file(path: str) -> int:
    # simple example of calculating size
    return os.path.getsize(path)


@solid
def summarize_directory(sizes: List[int]) -> int:
    # simple example of totalling sizes
    return sum(sizes)


@pipeline
def process_directory():
    file_results = files_in_directory().map(process_file)
    summarize_directory(file_results.collect())
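
The map and collect steps have a close analogue in plain Python, which may help when reasoning about the shape of the data. This sketch does not use Dagster: file names are passed in directly instead of being read from disk, and the length of each name stands in for the file size, so both are assumptions made to keep the example self-contained.

```python
def files_in_directory(filenames):
    """Yield (mapping_key, value) pairs, one per dynamic output."""
    for name in filenames:
        # Same key sanitizing as in the example above.
        yield name.replace(".", "_").replace("-", "_"), name


def process_file(name):
    # Stand-in for real work: use the length of the name as its "size".
    return len(name)


def process_directory(filenames):
    # map: one invocation of process_file per mapping key
    mapped = {key: process_file(value) for key, value in files_in_directory(filenames)}
    # collect: gather the per-key results into a single list, then summarize
    return mapped, sum(list(mapped.values()))


mapped, total = process_directory(["a.csv", "b-2.csv"])
print(mapped, total)
```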

Order-based Dependencies (Nothing dependencies)#

linear

Dependencies in Dagster are primarily data dependencies. Using data dependencies means each input of a solid depends on the output of an upstream solid.

If you have a solid, say Solid A, that does not depend on any outputs of another solid, say Solid B, there theoretically shouldn't be a reason for Solid A to run after Solid B. In most cases, these two solids should be parallelizable. However, there are some cases where an explicit ordering is required, but it doesn't make sense to pass data through inputs and outputs to model the dependency.

If you need to model an explicit ordering dependency, you can use the Nothing Dagster type on the input definition of the downstream solid. This type specifies that you are passing "nothing" via Dagster between the solids, while still using inputs and outputs to model the dependency between the two solids.

from dagster import InputDefinition, Nothing, pipeline, solid


@solid
def create_table_1() -> Nothing:
    get_database_connection().execute("create table table_1 as select * from some_source_table")


@solid(input_defs=[InputDefinition("start", Nothing)])
def create_table_2():
    get_database_connection().execute("create table table_2 as select * from table_1")


@pipeline
def nothing_dependency_pipeline():
    create_table_2(create_table_1())

In this example, create_table_1 returns an output of type Nothing, and create_table_2 has an input of type Nothing. This lets us connect them in the pipeline definition so that create_table_2 executes only after create_table_1 successfully executes.

Note that it is usually possible to pass some data dependency. In the example above, even though we probably don't want to pass the table data itself between the solids, we could pass table pointers. For example, create_table_1 could return a table_pointer output of type str with a value of table_1, and this table name can be used in create_table_2 to more accurately model the data dependency.
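
As a rough illustration of the table-pointer idea, the sketch below uses plain functions and an in-memory SQLite database so that it runs without Dagster or an external database. The @solid decorators are omitted, the conn parameter replaces get_database_connection(), and the source data is a single hard-coded row; all of these are assumptions for the sake of a runnable example.

```python
import sqlite3


def create_table_1(conn) -> str:
    """Create table_1 and return its name as a lightweight pointer."""
    conn.execute("CREATE TABLE table_1 AS SELECT 1 AS value")
    return "table_1"


def create_table_2(conn, table_pointer: str) -> str:
    """Build table_2 from whichever table the upstream step points at."""
    # Table names cannot be bound as SQL parameters, so the name is
    # interpolated; it comes from our own code, not from user input.
    conn.execute(f"CREATE TABLE table_2 AS SELECT * FROM {table_pointer}")
    return "table_2"


conn = sqlite3.connect(":memory:")
result = create_table_2(conn, create_table_1(conn))
rows = conn.execute(f"SELECT value FROM {result}").fetchall()
print(rows)
```

Here the downstream function cannot be called without the upstream function's return value, so the ordering is carried by real data rather than by a Nothing input.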

Dagster also provides more advanced abstractions to handle dependencies and IO. If you find it difficult to model data dependencies when using external storage, check out IOManagers.

Patterns#

Constructing PipelineDefinitions#

You may run into a situation where you need to programmatically construct the dependency graph for a pipeline. In that case, you can directly define the PipelineDefinition object.

To construct a PipelineDefinition, you need to pass the constructor a pipeline name, a list of solid definitions, and a dictionary defining the dependency structure. The dependency structure declares the dependencies of each solid’s inputs on the outputs of other solids in the pipeline. The top-level keys of the dependency dictionary are the string names of solids. If you are using solid aliases, be sure to use the aliased name. The values of the top-level keys are also dictionaries, which map input names to a DependencyDefinition.

one_plus_one_pipeline_def = PipelineDefinition(
    name="one_plus_one_pipeline",
    solid_defs=[return_one, add_one],
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)
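
When the graph is generated programmatically, the dependency dictionary is usually built in a loop or comprehension. The following plain-Python sketch builds the same shape for a linear chain; the linear_dependencies helper is hypothetical, and a plain string stands in for each DependencyDefinition so that the example runs without Dagster.

```python
def linear_dependencies(solid_names, input_name="number"):
    """Map each solid after the first to the solid immediately upstream of it."""
    return {
        # In real code the value would be DependencyDefinition(upstream).
        downstream: {input_name: upstream}
        for upstream, downstream in zip(solid_names, solid_names[1:])
    }


deps = linear_dependencies(["return_one", "add_one"])
print(deps)
```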

Pipeline DSL#

Sometimes you may want to construct the dependencies of a pipeline definition from a YAML file or similar. This is useful when migrating to Dagster from other workflow systems.

For example, you can have a YAML like this:

pipeline:
  name: some_example
  description: blah blah blah
  solids:
    - def: add_one
      alias: A
    - def: add_one
      alias: B
      deps:
        num:
          solid: A
    - def: add_two
      alias: C
      deps:
        num:
          solid: A
    - def: subtract
      deps:
        left:
          solid: B
        right:
          solid: C

You can programmatically generate a PipelineDefinition from this YAML:

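from dagster import (
    DependencyDefinition,
    PipelineDefinition,
    SolidInvocation,
    check,
    file_relative_path,
    repository,
    solid,
)
from dagster.utils import load_yaml_from_path


@solid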
def add_one(num: int) -> int:
    return num + 1


@solid
def add_two(num: int) -> int:
    return num + 2


@solid
def subtract(left: int, right: int) -> int:
    return left - right


def construct_pipeline_with_yaml(yaml_file, solid_defs):
    yaml_data = load_yaml_from_path(yaml_file)
    solid_def_dict = {s.name: s for s in solid_defs}

    deps = {}

    for solid_yaml_data in yaml_data["pipeline"]["solids"]:
        check.invariant(solid_yaml_data["def"] in solid_def_dict)
        def_name = solid_yaml_data["def"]
        alias = solid_yaml_data.get("alias", def_name)
        solid_deps_entry = {}
        for input_name, input_data in solid_yaml_data.get("deps", {}).items():
            solid_deps_entry[input_name] = DependencyDefinition(
                solid=input_data["solid"], output=input_data.get("output", "result")
            )
        deps[SolidInvocation(name=def_name, alias=alias)] = solid_deps_entry

    return PipelineDefinition(
        name=yaml_data["pipeline"]["name"],
        description=yaml_data["pipeline"].get("description"),
        solid_defs=solid_defs,
        dependencies=deps,
    )


def define_dep_dsl_pipeline():
    return construct_pipeline_with_yaml(
        file_relative_path(__file__, "example.yaml"), [add_one, add_two, subtract]
    )


@repository
def define_repository():
    return {"pipelines": {"some_example": define_dep_dsl_pipeline}}
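
To see what the loop in construct_pipeline_with_yaml produces, it can help to run the same logic on plain data. In this sketch the YAML above is written out as a Python dictionary (so no YAML parser is needed), a (def_name, alias) tuple stands in for SolidInvocation, and an (upstream_solid, output_name) tuple stands in for DependencyDefinition. These stand-ins and the build_deps name are assumptions for illustration only.

```python
# The YAML document above, written as the dictionary a YAML parser would return.
pipeline_spec = {
    "name": "some_example",
    "solids": [
        {"def": "add_one", "alias": "A"},
        {"def": "add_one", "alias": "B", "deps": {"num": {"solid": "A"}}},
        {"def": "add_two", "alias": "C", "deps": {"num": {"solid": "A"}}},
        {"def": "subtract", "deps": {"left": {"solid": "B"}, "right": {"solid": "C"}}},
    ],
}


def build_deps(spec):
    """Return {(def_name, alias): {input_name: (upstream_solid, output_name)}}."""
    deps = {}
    for entry in spec["solids"]:
        def_name = entry["def"]
        # An entry without an alias is keyed by its definition name.
        alias = entry.get("alias", def_name)
        deps[(def_name, alias)] = {
            input_name: (data["solid"], data.get("output", "result"))
            for input_name, data in entry.get("deps", {}).items()
        }
    return deps


deps = build_deps(pipeline_spec)
for key, value in deps.items():
    print(key, value)
```

Each solid entry yields one key, and solids without a deps section (such as A) map to an empty dictionary.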