Dagster's core abstractions are solids and pipelines.
Solids are individual units of computation that we wire together to form pipelines. By default, all solids in a pipeline execute in the same process. In production environments, Dagster is usually configured so that each solid executes in its own process.
In this section, we'll cover how to define a simple pipeline with a single solid, and then execute it.
Our pipeline will operate on a simple but scary CSV dataset, cereal.csv, which contains nutritional facts about 80 breakfast cereals.
Let's write our first Dagster solid and save it as hello_cereal.py
.
A solid is a unit of computation in a data pipeline. Typically, you'll define solids by annotating ordinary Python functions with the @solid
decorator.
Our first solid does three things: downloads a csv of cereal data, reads it into a list of dictionaries which each represent a row in the CSV, and logs the number of rows it finds.
import requests
import csv
from dagster import pipeline, solid
@solid
def hello_cereal(context):
response = requests.get("https://docs.dagster.io/assets/cereal.csv")
lines = response.text.split("\n")
cereals = [row for row in csv.DictReader(lines)]
context.log.info(f"Found {len(cereals)} cereals")
return cereals
In this simple case, our solid takes no arguments except for the context
in which it executes (provided by the Dagster framework as the first argument to solids who specify it), and also returns no outputs. Don't worry, we'll soon encounter solids that are much more dynamic.
To execute our solid, we'll embed it in an equally simple pipeline. A pipeline is a set of solids arranged into a DAG of computation. You'll typically define pipelines by annotating ordinary Python functions with the @pipeline
decorator.
@pipeline
def hello_cereal_pipeline():
hello_cereal()
Here you'll see that we call hello_cereal()
. This call doesn't actually execute the solid. Within the bodies of functions decorated with @pipeline
, we use function calls to indicate the dependency structure of the solids making up the pipeline. Here, we indicate that the execution of hello_cereal
doesn't depend on any other solids by calling it with no arguments.
Assuming you’ve saved this pipeline as hello_cereal.py
, you can execute it via any of three different mechanisms:
To visualize your pipeline (which only has one node) in Dagit, from the directory in which you've saved the pipeline file, just run:
dagit -f hello_cereal.py
You'll see output like
Loading repository... Serving on http://127.0.0.1:3000
You should be able to navigate to http://127.0.0.1:3000/pipeline/hello_cereal_pipeline/ in your web browser and view your pipeline. It isn't very interesting yet, because it only has one node.
Click on the "Playground" tab and you'll see the view below.
The large upper left pane is empty here, but, in pipelines with parameters, this is where you'll be able to edit pipeline configuration on the fly.
Click the "Launch Execution" button on the bottom right to execute this plan directly from Dagit. A new window should open, and you'll see a much more structured view of the stream of Dagster events start to appear in the left-hand pane.
If you have pop-up blocking enabled, you may need to tell your browser to allow pop-ups from 127.0.0.1—or, just navigate to the "Runs" tab to see this, and every run of your pipeline.
In this view, you can filter and search through the logs corresponding to your pipeline run.
From the directory in which you've saved the pipeline file, just run:
dagster pipeline execute -f hello_cereal.py
You'll see the full stream of events emitted by Dagster appear in the console, including our call to the logging machinery, which will look like:
2021-02-05 08:50:25 - dagster - INFO - system - ce5d4576-2569-44ff-a14a-51010eea5329 - hello_cereal - Found 77 cereals
Success!
If you'd rather execute your pipelines as a script, you can do that without using the Dagster CLI at all. Just add a few lines to hello_cereal.py
from dagster import execute_pipeline
if __name__ == "__main__":
result = execute_pipeline(hello_cereal_pipeline)
Now you can just run:
python hello_cereal.py
The execute_pipeline()
function called here is the core Python API for executing Dagster pipelines from code.