
Connecting Solids in Pipelines

You can find the code for this example on GitHub.

Our pipelines wouldn't be very interesting if they were limited to single solids. Pipelines connect solids into arbitrary DAGs of computation.

Why split up code into solids instead of into regular Python functions? There are a few reasons:

  • Dagster can execute sets of solids without executing the entire pipeline. This means that, if we hit a failure in our pipeline, we can re-run just the steps that didn't complete successfully, which often allows us to avoid re-executing expensive steps.
  • When two solids don't depend on each other, Dagster can execute them simultaneously.
  • Dagster can materialize the output of a solid to persistent storage. IOManagers let us separate business logic from IO, which lets us write code that's more testable and portable across environments, as sketched below.
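
For example, an IOManager can be attached to a pipeline through a mode, so you can change where outputs are persisted without touching any solid bodies. Here's a minimal sketch, assuming a Dagster 0.x release that ships fs_io_manager (the solids in it are hypothetical):

from dagster import ModeDefinition, fs_io_manager, pipeline, solid


@solid
def produce_number():
    return 1


@solid
def double(number):
    return number * 2


# fs_io_manager pickles each output to the local filesystem instead of
# keeping it in memory, so downstream steps and re-runs can reload it.
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def io_manager_pipeline():
    double(produce_number())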

Dagster pipelines model a dataflow graph. In data pipelines, the reason that a later step comes after an earlier step is almost always that it uses data produced by the earlier step. Dagster models these dataflow dependencies with inputs and outputs.

Let's Get Serial

We'll expand the pipeline we worked with in the first section of the tutorial into two solids:

  • The first solid will download the cereal data and return it as an output.
  • The second solid will consume the cereal data produced by the first solid and find the cereal with the most sugar.

This will allow us to re-run the code that finds the sugariest cereal without re-running the code that downloads the cereal data. If we spot a bug in our sugariness code, or if we decide we want to compute some other statistics about the cereal data, we won't need to re-download the data.

import csv

import requests
from dagster import pipeline, solid


@solid
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@solid
def find_sugariest(context, cereals):
    # csv.DictReader yields strings, so cast sugars to int for a numeric sort.
    sorted_by_sugar = sorted(cereals, key=lambda cereal: int(cereal["sugars"]))
    context.log.info(f'{sorted_by_sugar[-1]["name"]} is the sugariest cereal')


@pipeline
def serial_pipeline():
    find_sugariest(download_cereals())

You'll see that we've modified our existing download_cereals solid to return an output, in this case a list of dictionaries, one per row of the cereals dataset. We no longer log from this solid, so we no longer need the context argument here.

We've defined our new solid, find_sugariest, to take a user-defined input, cereals, in addition to the system-provided context object.

We can use inputs and outputs to connect solids to each other. Here we tell Dagster that:

  • download_cereals doesn't depend on the output of any other solid.
  • find_sugariest depends on the output of download_cereals.
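
You don't need Dagit to run this, either. Here's a minimal sketch of executing the pipeline directly from Python, assuming a Dagster 0.x release where execute_pipeline and its solid_selection argument are available from the top-level dagster package:

from dagster import execute_pipeline

if __name__ == "__main__":
    # Execute the whole pipeline in-process and confirm it succeeded.
    result = execute_pipeline(serial_pipeline)
    assert result.success

    # solid_selection asks Dagster to execute only a subset of the graph,
    # here just the download step. (Re-running find_sugariest on its own
    # against a previous run's output also requires persistent storage for
    # the intermediate data.)
    subset_result = execute_pipeline(
        serial_pipeline, solid_selection=["download_cereals"]
    )
    assert subset_result.success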

Let's visualize this pipeline in Dagit:

dagit -f serial_pipeline.py

Navigate to http://127.0.0.1:3000/pipeline/serial_pipeline/ or choose "serial_pipeline" from the left sidebar:

[Screenshot: the serial_pipeline graph rendered in Dagit]

A More Complex DAG

Solids don't need to be wired together serially. The output of one solid can be consumed by any number of other solids, and the outputs of several different solids can be consumed by a single solid.

@solid
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@solid
def find_highest_calorie_cereal(cereals):
    # Cast calories to int so the sort is numeric, not lexicographic.
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    return sorted_cereals[-1]["name"]


@solid
def find_highest_protein_cereal(cereals):
    # Cast protein to int so the sort is numeric, not lexicographic.
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["protein"])
    )
    return sorted_cereals[-1]["name"]


@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"Most caloric cereal: {most_calories}")
    context.log.info(f"Most protein-rich cereal: {most_protein}")


@pipeline
def complex_pipeline():
    cereals = download_cereals()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
    )

First we introduce the intermediate variable cereals into our pipeline definition to represent the output of the download_cereals solid. Then we make both find_highest_calorie_cereal and find_highest_protein_cereal consume this output. Their outputs are in turn both consumed by display_results.
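
Because each solid declares its inputs explicitly, we can also exercise one in isolation, which is handy for tests. Here's a minimal sketch using execute_solid, with hypothetical hand-built rows standing in for the downloaded CSV data:

from dagster import execute_solid


def test_find_highest_calorie_cereal():
    # Hypothetical rows shaped like the downloaded cereal data.
    cereals = [
        {"name": "Sugar Bombs", "calories": "120", "protein": "1"},
        {"name": "Bran Flakes", "calories": "90", "protein": "4"},
    ]
    result = execute_solid(
        find_highest_calorie_cereal, input_values={"cereals": cereals}
    )
    assert result.success
    assert result.output_value() == "Sugar Bombs"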

Let's visualize this pipeline in Dagit:

dagit -f complex_pipeline.py
[Screenshot: the complex_pipeline graph rendered in Dagit]

When you execute this example from Dagit, you'll see that download_cereals executes first, followed by find_highest_calorie_cereal and find_highest_protein_cereal (in either order), and that display_results executes last, only after find_highest_calorie_cereal and find_highest_protein_cereal have both executed.

In more sophisticated execution environments, find_highest_calorie_cereal and find_highest_protein_cereal could execute not just in either order but at the same time, since they don't depend on each other's outputs. Both would still have to execute after download_cereals (because they depend on its output) and before display_results (because display_results depends on both of their outputs).
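
If you'd like to see that parallelism on your own machine, you can opt into the multiprocess executor. A sketch, assuming a Dagster 0.x release where the executor and filesystem intermediate storage are selected through run config (the exact config keys vary a bit across 0.x versions):

from dagster import DagsterInstance, execute_pipeline, reconstructable

if __name__ == "__main__":
    # The multiprocess executor needs a reconstructable pipeline and a
    # persistent DagsterInstance so that child processes can load the run.
    result = execute_pipeline(
        reconstructable(complex_pipeline),
        run_config={
            "execution": {"multiprocess": {}},
            "intermediate_storage": {"filesystem": {}},
        },
        instance=DagsterInstance.get(),
    )
    assert result.success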