Dagster includes a scheduler that can be used to launch runs on a fixed interval.
Name | Description |
---|---|
@daily_schedule | Decorator that defines a schedule that executes every day at a fixed time |
@hourly_schedule | Decorator that defines a schedule that executes every hour at a fixed time |
@weekly_schedule | Decorator that defines a schedule that executes every week on a fixed day and time |
@monthly_schedule | Decorator that defines a schedule that executes every month on a fixed day and time |
@schedule | Decorator that defines a schedule that executes according to a given cron schedule. Note that schedules created using this decorator are non-partition-based schedules |
ScheduleExecutionContext | The context passed to the schedule definition execution function |
build_schedule_context | A function that constructs a ScheduleExecutionContext |
ScheduleDefinition | Class for schedules. You almost never want to initialize this class directly. Instead, you should use one of the decorators above |
A schedule is a definition in Dagster that is used to execute a pipeline at a fixed interval. Each time at which a schedule is evaluated is called a tick. The schedule definition is responsible for generating run configuration for the pipeline on each tick.
Each schedule can optionally define a should_execute function, which can be used to skip some ticks.
Dagster includes a scheduler, which runs as part of the dagster-daemon process. Once you have defined a schedule, see the dagster-daemon page for instructions on how to run the daemon in order to execute your schedules.
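As a rough illustration of how a should_execute function lets a schedule skip ticks, here is a toy model written with the standard library only. The `ToySchedule` class and its `on_tick` method are hypothetical names invented for this sketch and are not part of Dagster. In Dagster itself, the function is expected to receive the schedule context rather than a bare datetime.

```python
import datetime
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToySchedule:
    """Hypothetical stand-in for a schedule; not a Dagster class."""

    run_config_fn: Callable[[datetime.datetime], dict]
    should_execute: Optional[Callable[[datetime.datetime], bool]] = None

    def on_tick(self, tick_time):
        """Return run config for this tick, or None if the tick is skipped."""
        if self.should_execute is not None and not self.should_execute(tick_time):
            return None
        return self.run_config_fn(tick_time)


# Skip ticks that fall on a weekend (Monday is 0, Sunday is 6).
weekday_only = ToySchedule(
    run_config_fn=lambda t: {"date": t.strftime("%Y-%m-%d")},
    should_execute=lambda t: t.weekday() < 5,
)

saturday_result = weekday_only.on_tick(datetime.datetime(2021, 1, 2, 11, 0))
monday_result = weekday_only.on_tick(datetime.datetime(2021, 1, 4, 11, 0))
```

Here the Saturday tick produces no run config, while the Monday tick does.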
There are two types of schedules in Dagster: partition-based schedules and non-partition-based schedules.
Partition-based schedules are best when your pipeline operates on data that is partitioned by time (for example, a pipeline that fills in a daily partition of a table). Non-partition-based schedules are best when you just want to run an operation on a fixed interval (for example, a pipeline that rebuilds the same table on each execution).
Partition-based schedules generate an underlying PartitionSetDefinition to execute against. The scheduler will make sure that a run is executed for every partition in the partition set, starting from the time that the schedule is turned on.
For example, if we wanted to create a schedule that runs at 11:00 AM every day, we could use @daily_schedule, a decorator that produces a partition-based daily schedule.
import datetime

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2021, 1, 1),
    execution_time=datetime.time(11, 0),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    # `date` identifies the partition that this run fills in.
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
The decorated schedule function should accept a datetime and return the run config for the pipeline run associated with that partition. The scheduler will ensure that the schedule function is evaluated exactly once for every partition to generate run config, which is then used to create and launch a pipeline run.
Partition-based schedules require a start_date that indicates when the set of partitions begins. The scheduler will only kick off runs for times after both the start_date and the time when the schedule was turned on. You can kick off runs for times between the start_date and when the schedule is turned on by launching a backfill.
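The split between scheduled runs and backfilled runs can be sketched with plain datetime arithmetic. This is a simplified illustration, not Dagster's implementation: it ignores timezones and the partition offset, and the `split_ticks` helper is a hypothetical name introduced only for this example.

```python
import datetime


def split_ticks(start_date, execution_time, turned_on_at, until):
    """Split daily tick times into (needs_backfill, handled_by_scheduler).

    Ticks at or before `turned_on_at` are not launched by the scheduler,
    so they would have to be covered by a backfill.
    """
    needs_backfill, handled_by_scheduler = [], []
    tick = datetime.datetime.combine(start_date.date(), execution_time)
    while tick < until:
        if tick >= start_date:
            if tick > turned_on_at:
                handled_by_scheduler.append(tick)
            else:
                needs_backfill.append(tick)
        tick += datetime.timedelta(days=1)
    return needs_backfill, handled_by_scheduler


backfill, scheduled = split_ticks(
    start_date=datetime.datetime(2021, 1, 1),
    execution_time=datetime.time(11, 0),
    turned_on_at=datetime.datetime(2021, 1, 3, 15, 0),
    until=datetime.datetime(2021, 1, 6),
)
```

With a schedule turned on in the afternoon of January 3, the ticks for January 1 through 3 fall to a backfill, and the scheduler picks up from January 4.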
By default, the partition used for the run is one partition earlier than the partition that includes the current time. This captures a common ETL use case: for example, a daily schedule fills in the previous day's partition, and a monthly schedule fills in last month's partition. You can customize this behavior by changing the partition_days_offset parameter for a daily schedule. The default value of this parameter is 1, which means that the scheduler goes back one day to determine the partition. Setting the value to 0 causes the schedule to fill in the partition that corresponds to the current day, and increasing it above 1 causes it to fill in an even earlier partition. A similarly named parameter exists for the other execution intervals.
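The offset arithmetic for a daily schedule can be illustrated as follows. This is a minimal sketch of the rule described above, not Dagster's own code, and `partition_for_tick` is a hypothetical helper name.

```python
import datetime


def partition_for_tick(tick_time, partition_days_offset=1):
    """Return the date of the daily partition that a tick at `tick_time` fills in."""
    return tick_time.date() - datetime.timedelta(days=partition_days_offset)


tick = datetime.datetime(2021, 1, 5, 11, 0)
default_partition = partition_for_tick(tick)  # previous day
same_day_partition = partition_for_tick(tick, partition_days_offset=0)
earlier_partition = partition_for_tick(tick, partition_days_offset=2)
```

For a tick on January 5, the default offset selects the January 4 partition, an offset of 0 selects January 5, and an offset of 2 selects January 3.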
When you want to run a schedule on a fixed interval and don't need partitions, you can use the @schedule decorator to define your schedule.
For example, this schedule runs at 1:00 AM in US/Central time every day:
from dagster import schedule


@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_schedule(_context):
    return {"solids": {"dataset_name": "my_dataset"}}
You can also access the execution time from the passed-in context:
@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_execution_time_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"dataset_name": "my_dataset", "execution_date": date}}
You can customize the timezone in which your schedule executes by setting the execution_timezone parameter on your schedule to any timezone name from the tz database. Schedules with no timezone set run in UTC.
For example, the following schedule executes daily at 9AM in US/Pacific time:
@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(9, 0),
    execution_timezone="US/Pacific",
)
def my_timezone_schedule(date):
    return {
        "solids": {
            "process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H:%M:%S")}}
        }
    }
Because of Daylight Saving Time transitions, it's possible to specify an execution time that does not exist for every scheduled interval. For example, say you have a daily schedule with an execution time of 2:30 AM in the US/Eastern timezone. On 2019/03/10, the time jumps from 2:00 AM to 3:00 AM when Daylight Saving Time begins. Therefore, 2:30 AM did not exist on that day.
If you specify such an execution time, Dagster runs your schedule at the next time that exists. In the previous example, the schedule would run at 3:00 AM.
It's also possible to specify an execution time that exists twice on one day every year. For example, on 2019/11/03 in US/Eastern time, the hour from 1:00 AM to 2:00 AM repeats, so a daily schedule running at 1:30 AM has two possible times at which it could execute. In this case, Dagster will execute your schedule at the later of the two possible times.
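To check whether a given execution time falls into one of these two cases, the standard library's zoneinfo module (Python 3.9+) can be used. The sketch below is an independent illustration and does not reflect how Dagster resolves these times internally. `classify_local_time` and `later_instant` are hypothetical helper names, and `America/New_York` is used as the canonical tz database name for the zone that `US/Eastern` refers to.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def classify_local_time(naive, tz):
    """Return "missing", "ambiguous", or "normal" for a naive wall-clock time in tz."""
    first = naive.replace(tzinfo=tz, fold=0)
    second = naive.replace(tzinfo=tz, fold=1)
    # A wall-clock time inside a spring-forward gap does not survive
    # a round trip through UTC.
    round_trip = first.astimezone(timezone.utc).astimezone(tz).replace(tzinfo=None)
    if round_trip != naive:
        return "missing"
    # During a fall-back overlap, the two folds have different UTC offsets.
    if first.utcoffset() != second.utcoffset():
        return "ambiguous"
    return "normal"


def later_instant(naive, tz):
    """Return, in UTC, the later of the two instants for an ambiguous wall-clock time."""
    return naive.replace(tzinfo=tz, fold=1).astimezone(timezone.utc)


eastern = ZoneInfo("America/New_York")
spring = classify_local_time(datetime(2019, 3, 10, 2, 30), eastern)
fall = classify_local_time(datetime(2019, 11, 3, 1, 30), eastern)
usual = classify_local_time(datetime(2019, 6, 1, 2, 30), eastern)
later = later_instant(datetime(2019, 11, 3, 1, 30), eastern)
```

The 2:30 AM time on 2019/03/10 is classified as missing and the 1:30 AM time on 2019/11/03 as ambiguous. The later of its two instants is 06:30 UTC.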
Hourly schedules are unaffected by Daylight Saving Time transitions: the schedule continues to run exactly once every hour, even as the timezone offset changes. In the example above where the hour from 1:00 AM to 2:00 AM repeats, an hourly schedule running at 30 minutes past the hour would run at 12:30 AM, both instances of 1:30 AM, and then proceed normally from 2:30 AM on.
In order for your schedule to run, it must be started. The easiest way to start and stop schedules is in Dagit from the Schedules page. You can also start and stop a schedule with the dagster schedule start and dagster schedule stop commands.
Once your schedule is started, if you're running the dagster-daemon process as part of your deployment, the schedule will begin executing immediately. See the Troubleshooting section below if your schedule has been started but isn't submitting runs.
Schedules can be tested by using build_schedule_context to construct a ScheduleExecutionContext, and passing this to the schedule's evaluate_tick function. The resulting run config from this function can then be validated using the validate_run_config function.
import datetime

from dagster import build_schedule_context, hourly_schedule, validate_run_config


@hourly_schedule(
    pipeline_name="test_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(hour=0, minute=25),
    execution_timezone="US/Central",
)
def hourly_schedule_to_test(date):
    return {
        "solids": {
            "process_data_for_date": {
                "config": {
                    "date": date.strftime("%Y-%m-%d %H"),
                }
            }
        }
    }


def test_hourly_schedule():
    # pipeline_for_test is the pipeline targeted by the schedule, defined elsewhere.
    schedule_data = hourly_schedule_to_test.evaluate_tick(build_schedule_context())
    for run_request in schedule_data.run_requests:
        assert validate_run_config(pipeline_for_test, run_request.run_config)
import datetime

from dagster import daily_schedule, hourly_schedule, monthly_schedule, weekly_schedule


@hourly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(hour=0, minute=25),
    execution_timezone="US/Central",
)
def my_hourly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H")}}}}


@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(hour=9, minute=0),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}


@weekly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_day_of_week=1,  # Monday
    execution_timezone="US/Central",
)
def my_weekly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}


@monthly_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_timezone="US/Central",
    execution_day_of_month=15,
    execution_time=datetime.time(hour=9, minute=0),
)
def my_monthly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m")}}}}
If you already have a preset defined for a pipeline you want to schedule, you can extract the necessary attributes from the preset and pass them to the schedule decorator.
In this example, we directly use the solid_selection, mode, tags, and run_config from the preset:
# `preset` is a preset already defined on the pipeline, obtained elsewhere.
@daily_schedule(
    start_date=datetime.datetime(2020, 1, 1),
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_preset_schedule(_date):
    return preset.run_config
You might need to modify the preset's run config to include information about the date partition:
import copy


@daily_schedule(
    start_date=datetime.datetime(2020, 1, 1),
    pipeline_name="my_pipeline",
    solid_selection=preset.solid_selection,
    mode=preset.mode,
    tags_fn_for_date=lambda _: preset.tags,
)
def my_modified_preset_schedule(date):
    # Deep-copy so that editing the nested config does not mutate the preset itself.
    modified_run_config = copy.deepcopy(preset.run_config)
    modified_run_config["solids"]["process_data_for_date"]["config"]["date"] = date.strftime(
        "%Y-%m-%d"
    )
    return modified_run_config
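One thing to watch when modifying a preset's run config is that dict.copy() makes a shallow copy, so assigning into a nested dictionary also changes the original. The following standalone sketch shows the difference using a plain dictionary shaped like the run config above. The helper names are hypothetical and the example does not use Dagster.

```python
import copy


def fresh_config():
    """Build a run-config-shaped dictionary with no date set."""
    return {"solids": {"process_data_for_date": {"config": {"date": None}}}}


def with_date_shallow(run_config, date_str):
    """Problematic variant: only the top-level dictionary is copied."""
    modified = run_config.copy()
    modified["solids"]["process_data_for_date"]["config"]["date"] = date_str
    return modified


def with_date_deep(run_config, date_str):
    """Safe variant: nested dictionaries are copied as well."""
    modified = copy.deepcopy(run_config)
    modified["solids"]["process_data_for_date"]["config"]["date"] = date_str
    return modified


shared = fresh_config()
with_date_shallow(shared, "2020-01-01")
date_after_shallow = shared["solids"]["process_data_for_date"]["config"]["date"]

shared = fresh_config()
result = with_date_deep(shared, "2020-01-01")
date_after_deep = shared["solids"]["process_data_for_date"]["config"]["date"]
```

After the shallow variant runs, the shared config carries the date from that call, which would leak into later ticks. The deep variant leaves the shared config untouched.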
If you find yourself using presets to generate schedule definitions frequently, you can use a helper function similar to this one to take a preset and return a schedule.
def daily_schedule_definition_from_pipeline_preset(pipeline, preset_name, start_date):
    preset = pipeline.get_preset(preset_name)
    if not preset:
        raise Exception(
            f"Preset {preset_name} was not found on pipeline {pipeline.name}"
        )

    @daily_schedule(
        start_date=start_date,
        pipeline_name=pipeline.name,
        solid_selection=preset.solid_selection,
        mode=preset.mode,
        tags_fn_for_date=lambda _: preset.tags,
    )
    def my_schedule(_date):
        return preset.run_config

    return my_schedule
Try these steps if you're trying to run a schedule and are running into problems.
The left-hand navigation bar in Dagit shows all of the schedules for the currently-selected repository, with a green dot next to each schedule that is running. Make sure that your schedule appears in this list with a green dot. To ensure that Dagit has loaded the latest version of your schedule code, you can press the reload button next to your repository name to reload all the code in your repository.
When you click on your schedule name in the left-hand nav in Dagit, you'll be taken to a page where you can view more information about the schedule. If the schedule is running, there should be a "Next tick" row near the top of the page that tells you when the schedule is expected to run next. Make sure that time is what you expect (including the timezone).
It's possible that the dagster-daemon process that submits runs for your schedule is not working correctly. If you haven't set up dagster-daemon yet, check the Deploying Dagster section to find the steps to do so.
First, check that the daemon is running. Click on "Status" in the left nav in Dagit, and examine the "Scheduler" row under the "Daemon statuses" section. The daemon process periodically sends out a heartbeat from the scheduler, so if the scheduler daemon is listed as "Not running", that indicates that there's a problem with your daemon deployment. If the daemon ran into an error that caused it to throw an exception, that error will often appear in this UI as well.
If there isn't a clear error on this page, or if the daemon should be sending heartbeats but isn't, you may need to check the logs from the daemon process. The steps to do this will depend on your deployment. For example, if you're using Kubernetes, you'll need to get the logs from the pod that's running the daemon. You should be able to search those logs for the name of your schedule (or SchedulerDaemon to see all logs associated with the scheduler) to gain an understanding of what's going wrong.
Finally, it's possible that the daemon is running correctly, but there's a problem with your schedule code. Check the "Latest tick" row on the page for your schedule. If there was an error while trying to submit runs for your schedule, there should be a red "Failure" box next to the time. Clicking on the box should display an error with a stack trace showing you why the schedule couldn't execute. If the schedule is working as expected, it should display a blue box instead with information about any runs that were created by that schedule tick.
If these steps didn't help and your schedule still isn't running, reach out in Slack or file an issue and we'll be happy to help investigate.