In production Dagster deployments, there are often many runs being launched at once. The run coordinator lets you control the policy that Dagster uses to manage the set of runs in your deployment. When you submit a run from Dagit or the Dagster command line, it is first sent to the run coordinator, which applies any limits or prioritization policies before eventually sending it to the run launcher to be launched.
The following run coordinators can be configured on your Dagster instance:
The DefaultRunCoordinator simply calls launch_run on the instance's run launcher immediately, in the same process, without applying any limits or prioritization rules. With this run coordinator set, clicking Launch Run in Dagit will cause the run to launch immediately from the Dagit process. Similarly, scheduled runs will launch immediately from the scheduler process.
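If you want to select the DefaultRunCoordinator explicitly, a minimal dagster.yaml entry might look like the following sketch. This assumes the DefaultRunCoordinator is importable from the same dagster.core.run_coordinator module used for the QueuedRunCoordinator configured later in this guide; it is also typically the coordinator used when no run_coordinator block is present at all:

# sketch: explicitly select the default (no-queue) coordinator
# assumption: DefaultRunCoordinator lives in dagster.core.run_coordinator
run_coordinator:
  module: dagster.core.run_coordinator
  class: DefaultRunCoordinator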
The QueuedRunCoordinator sends runs to the Dagster Daemon via a queue. The daemon pulls runs from the queue and calls launch_run on them. Using this run coordinator enables instance-level limits on run concurrency, as well as custom run prioritization rules.
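A minimal sketch of the corresponding dagster.yaml entry, with no limits or prioritization configured yet, looks like this (the available configuration options are covered below):

# sketch: enable run queuing without any limits
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator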
Dagster offers several options for limiting the number of concurrent pipeline runs on your instance. This can be useful, for example, when many runs share a constrained resource such as a database, or when you want to keep large sets of runs, such as backfills, from overwhelming your deployment.
Concurrency limits are supported by the QueuedRunCoordinator, which uses the Dagster Daemon to coordinate which runs execute. To use this run coordinator, configure it on your Dagster instance and include the daemon as part of your Dagster deployment.
You can set the following parameters on the QueuedRunCoordinator by modifying its configuration in your instance's dagster.yaml:

max_concurrent_runs: places a limit on the number of runs that can be in progress at a single time. Any runs beyond this limit will be queued, and won't use any compute.

tag_concurrency_limits: a list of limits applied to runs with particular tags. Limits can be specified for all runs with a certain tag key or key-value pair. If any limit would be exceeded by launching a run, the run will stay queued.
For example, you can add the following to your dagster.yaml:
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "database"
        value: "redshift"
        limit: 4
      - key: "dagster/backfill"
        limit: 10
This run queue will only allow a maximum of 25 runs at once. Additionally, only 4 runs with the "database" tag equal to "redshift" can run at once, and at most 10 runs with the "dagster/backfill" tag and any value can run at once.
The run queue is a first-in-first-out priority queue. By default, all runs have a priority of 0. Dagster will launch runs with higher priority first. If multiple runs have the same priority, Dagster will launch the runs in the order you submitted them to the queue. Negative priorities are also allowed and are useful for de-prioritizing sets of runs such as backfills. Priority values must be integers.
You can specify a custom priority using the dagster/priority tag, which you can set in code on definitions or in the Dagit playground. Because both tag keys and values must be strings, you must specify the priority integer as a string. For example, to specify a priority of -1, set the dagster/priority tag value to the string "-1".
from dagster import pipeline, schedule


@pipeline(tags={"dagster/priority": "3"})
def important_pipeline():
    ...


@schedule(
    cron_schedule="* * * * *",
    pipeline_name="my_pipeline",
    execution_timezone="US/Central",
    tags={"dagster/priority": "-1"},
)
def less_important_schedule(_):
    ...
A priority tag on the pipeline definition can be overridden when launching in the Dagit playground.
In the absence of tag limits and priorities, queued runs are first-in-first-out. However, a run that is blocked by tag limits will not block runs submitted after it.
To illustrate, let’s say we submit 3 runs in order: run A (tagged ‘foo’:‘bar’), run B (tagged ‘foo’:‘bar’), and run C (no tags). With no limits configured, the runs will be launched in the order they were submitted, A → B → C. Next, say we add the following instance configuration:
tag_concurrency_limits:
- key: "foo"
limit: 1
Now A and B are not able to execute concurrently, while there is no limit on C. Assume each run will execute for at least a few minutes. If we submit in order A → B → C, then A is dequeued and launched first. B is skipped over, because A is still in progress and the "foo" limit of 1 has been reached, so C is dequeued and launched next. Once A finishes, B is dequeued and launched.
Thus the launch order will be A → C → B.
Below are some possible issues that you can run into when limiting run concurrency with the QueuedRunCoordinator.
This likely means that you are not using the QueuedRunCoordinator, which is responsible for run queuing. You can check the run coordinator on your instance in Dagit by going to Status -> Configuration.
If your runs stay in the QUEUED status, the issue is likely either that the Dagster Daemon is not deployed on your instance, or that the queue configuration is blocking your runs.
In Dagit, go to Status -> HEALTH, and check that the Run queue is healthy. If not:
Run queuing depends on the Daemon. See the Daemon Overview for how to set it up on your instance.
If you have started a Daemon process, you should make sure that the Daemon is accessing the same storages as the Dagit process, meaning both processes should be using the same dagster.yaml. Locally, this means both processes should have the same DAGSTER_HOME environment variable set. See the Dagster Instance docs for more information.
If the Daemon is running on your instance, it may be the case that runs are intentionally being left in the queue due to concurrency rules. It may be helpful to look at the logged output from the Daemon process, as this will indicate when runs are skipped.
The run queue is configured in the dagster.yaml under run_coordinator. This can be viewed in Dagit via Status -> Configuration. Runs may be staying in QUEUED due to rules here. The most basic rule that would block the queue would be max_concurrent_runs: 0. In more practical cases, the queue will be blocked by some number of in-progress runs. To understand the state of your run queue, it's helpful to view both the 'Queued' and 'In Progress' tabs of the Dagit run page.
If there are runs in progress that are blocking the queue, you have the option of terminating them via Dagit so that other runs can proceed.
If these steps didn't help, reach out in Slack or file an issue and we'll be happy to help investigate.