DagsterDocs

Run Requests

class dagster.RunRequest(run_key, run_config=None, tags=None)[source]

Represents all the information required to launch a single run. Must be returned by a SensorDefinition or ScheduleDefinition’s evaluation function for a run to be launched.

run_key

A string key to identify this launched run. For sensors, ensures that only one run is created per run key across all sensor evaluations. For schedules, ensures that one run is created per tick, across failure recoveries. Passing in a None value means that a run will always be launched per evaluation.

Type

str | None

run_config

The environment config that parameterizes the run execution to be launched, as a dict.

Type

Optional[Dict]

tags

A dictionary of tags (string key-value pairs) to attach to the launched run.

Type

Optional[Dict[str, str]]

class dagster.SkipReason(skip_message=None)[source]

Represents a skipped evaluation, where no runs are requested. May contain a message to indicate why no runs were requested.

skip_message

A message displayed in dagit for why this evaluation resulted in no requested runs.

Type

Optional[str]

Schedules

@dagster.schedule(cron_schedule, pipeline_name=None, name=None, tags=None, tags_fn=None, solid_selection=None, mode='default', should_execute=None, environment_vars=None, execution_timezone=None, description=None, job=None)[source]

Create a schedule.

The decorated function will be called as the run_config_fn of the underlying ScheduleDefinition and should take a ScheduleExecutionContext as its only argument, returning the run config for the scheduled execution.

Parameters
  • cron_schedule (str) – A valid cron string specifying when the schedule will run, e.g., '45 23 * * 6' for a schedule that runs at 11:45 PM every Saturday.

  • pipeline_name (str) – The name of the pipeline to execute when the schedule runs.

  • name (Optional[str]) – The name of the schedule to create.

  • tags (Optional[Dict[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.

  • tags_fn (Optional[Callable[[ScheduleExecutionContext], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedules runs. Takes a ScheduleExecutionContext and returns a dictionary of tags (string key-value pairs). You may set only one of tags and tags_fn.

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute when the schedule runs. e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The pipeline mode in which to execute this schedule. (Default: ‘default’)

  • should_execute (Optional[Callable[[ScheduleExecutionContext], bool]]) – A function that runs at schedule execution tie to determine whether a schedule should execute or skip. Takes a ScheduleExecutionContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.

  • environment_vars (Optional[Dict[str, str]]) – Any environment variables to set when executing the schedule.

  • execution_timezone (Optional[str]) – Timezone in which the schedule should run. Only works with DagsterDaemonScheduler, and must be set when using that scheduler.

  • description (Optional[str]) – A human-readable description of the schedule.

  • job (Optional[PipelineDefinition]) – Experimental

@dagster.monthly_schedule(pipeline_name, start_date, name=None, execution_day_of_month=1, execution_time=datetime.time(0, 0), tags_fn_for_date=None, solid_selection=None, mode='default', should_execute=None, environment_vars=None, end_date=None, execution_timezone=None, partition_months_offset=1, description=None)[source]

Create a partitioned schedule that runs monthly.

The decorated function should accept a datetime object as its only argument. The datetime represents the date partition that it’s meant to run on.

The decorated function should return a run configuration dictionary, which will be used as configuration for the scheduled run.

The decorator produces a PartitionScheduleDefinition.

Parameters
  • pipeline_name (str) – The name of the pipeline to execute when the schedule runs.

  • start_date (datetime.datetime) – The date from which to run the schedule.

  • name (Optional[str]) – The name of the schedule to create.

  • execution_day_of_month (int) – The day of the month on which to run the schedule (must be between 1 and 31).

  • execution_time (datetime.time) – The time at which to execute the schedule.

  • tags_fn_for_date (Optional[Callable[[datetime.datetime], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedules runs. Takes the date of the schedule run and returns a dictionary of tags (string key-value pairs).

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute when the schedule runs. e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The pipeline mode in which to execute this schedule. (Default: ‘default’)

  • should_execute (Optional[Callable[ScheduleExecutionContext, bool]]) – A function that runs at schedule execution tie to determine whether a schedule should execute or skip. Takes a ScheduleExecutionContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.

  • environment_vars (Optional[Dict[str, str]]) – Any environment variables to set when executing the schedule.

  • end_date (Optional[datetime.datetime]) – The last time to run the schedule to, defaults to current time.

  • execution_timezone (Optional[str]) – Timezone in which the schedule should run. Only works with DagsterDaemonScheduler, and must be set when using that scheduler.

  • partition_months_offset (Optional[int]) – How many months back to go when choosing the partition for a given schedule execution. For example, when partition_months_offset=1, the schedule that executes during month N will fill in the partition for month N-1. (Default: 1)

  • description (Optional[str]) – A human-readable description of the schedule.

@dagster.weekly_schedule(pipeline_name, start_date, name=None, execution_day_of_week=0, execution_time=datetime.time(0, 0), tags_fn_for_date=None, solid_selection=None, mode='default', should_execute=None, environment_vars=None, end_date=None, execution_timezone=None, partition_weeks_offset=1, description=None)[source]

Create a partitioned schedule that runs daily.

The decorated function should accept a datetime object as its only argument. The datetime represents the date partition that it’s meant to run on.

The decorated function should return a run configuration dictionary, which will be used as configuration for the scheduled run.

The decorator produces a PartitionScheduleDefinition. :param pipeline_name: The name of the pipeline to execute when the schedule runs. :type pipeline_name: str :param start_date: The date from which to run the schedule. :type start_date: datetime.datetime :param name: The name of the schedule to create. :type name: Optional[str] :param execution_day_of_week: The day of the week on which to run the schedule. Must be

between 0 (Sunday) and 6 (Saturday).

Parameters
  • execution_time (datetime.time) – The time at which to execute the schedule.

  • tags_fn_for_date (Optional[Callable[[datetime.datetime], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedules runs. Takes the date of the schedule run and returns a dictionary of tags (string key-value pairs).

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute when the schedule runs. e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The pipeline mode in which to execute this schedule. (Default: ‘default’)

  • should_execute (Optional[Callable[ScheduleExecutionContext, bool]]) – A function that runs at schedule execution tie to determine whether a schedule should execute or skip. Takes a ScheduleExecutionContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.

  • environment_vars (Optional[Dict[str, str]]) – Any environment variables to set when executing the schedule.

  • end_date (Optional[datetime.datetime]) – The last time to run the schedule to, defaults to current time.

  • execution_timezone (Optional[str]) – Timezone in which the schedule should run. Only works with DagsterDaemonScheduler, and must be set when using that scheduler.

  • partition_weeks_offset (Optional[int]) – How many weeks back to go when choosing the partition for a given schedule execution. For example, when partition_weeks_offset=1, the schedule that executes during week N will fill in the partition for week N-1. (Default: 1)

  • description (Optional[str]) – A human-readable description of the schedule.

@dagster.hourly_schedule(pipeline_name, start_date, name=None, execution_time=datetime.time(0, 0), tags_fn_for_date=None, solid_selection=None, mode='default', should_execute=None, environment_vars=None, end_date=None, execution_timezone=None, partition_hours_offset=1, description=None)[source]

Create a partitioned schedule that runs hourly.

The decorated function should accept a datetime object as its only argument. The datetime represents the date partition that it’s meant to run on.

The decorated function should return a run configuration dictionary, which will be used as configuration for the scheduled run.

The decorator produces a PartitionScheduleDefinition.

Parameters
  • pipeline_name (str) – The name of the pipeline to execute when the schedule runs.

  • start_date (datetime.datetime) – The date from which to run the schedule.

  • name (Optional[str]) – The name of the schedule to create. By default, this will be the name of the decorated function.

  • execution_time (datetime.time) – The time at which to execute the schedule. Only the minutes component will be respected – the hour should be 0, and will be ignored if it is not 0.

  • tags_fn_for_date (Optional[Callable[[datetime.datetime], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedules runs. Takes the date of the schedule run and returns a dictionary of tags (string key-value pairs).

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute when the schedule runs. e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The pipeline mode in which to execute this schedule. (Default: ‘default’)

  • should_execute (Optional[Callable[ScheduleExecutionContext, bool]]) – A function that runs at schedule execution tie to determine whether a schedule should execute or skip. Takes a ScheduleExecutionContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.

  • environment_vars (Optional[Dict[str, str]]) – Any environment variables to set when executing the schedule.

  • end_date (Optional[datetime.datetime]) – The last time to run the schedule to, defaults to current time.

  • execution_timezone (Optional[str]) – Timezone in which the schedule should run. Only works with DagsterDaemonScheduler, and must be set when using that scheduler.

  • partition_hours_offset (Optional[int]) – How many hours back to go when choosing the partition for a given schedule execution. For example, when partition_hours_offset=1, the schedule that executes during hour N will fill in the partition for hour N-1. (Default: 1)

  • description (Optional[str]) – A human-readable description of the schedule.

@dagster.daily_schedule(pipeline_name, start_date, name=None, execution_time=datetime.time(0, 0), tags_fn_for_date=None, solid_selection=None, mode='default', should_execute=None, environment_vars=None, end_date=None, execution_timezone=None, partition_days_offset=1, description=None)[source]

Create a partitioned schedule that runs daily.

The decorated function should accept a datetime object as its only argument. The datetime represents the date partition that it’s meant to run on.

The decorated function should return a run configuration dictionary, which will be used as configuration for the scheduled run.

The decorator produces a PartitionScheduleDefinition.

Parameters
  • pipeline_name (str) – The name of the pipeline to execute when the schedule runs.

  • start_date (datetime.datetime) – The date from which to run the schedule.

  • name (Optional[str]) – The name of the schedule to create.

  • execution_time (datetime.time) – The time at which to execute the schedule.

  • tags_fn_for_date (Optional[Callable[[datetime.datetime], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedules runs. Takes the date of the schedule run and returns a dictionary of tags (string key-value pairs).

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute when the schedule runs. e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The pipeline mode in which to execute this schedule. (Default: ‘default’)

  • should_execute (Optional[Callable[ScheduleExecutionContext, bool]]) – A function that runs at schedule execution tie to determine whether a schedule should execute or skip. Takes a ScheduleExecutionContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.

  • environment_vars (Optional[Dict[str, str]]) – Any environment variables to set when executing the schedule.

  • end_date (Optional[datetime.datetime]) – The last time to run the schedule to, defaults to current time.

  • execution_timezone (Optional[str]) – Timezone in which the schedule should run. Only works with DagsterDaemonScheduler, and must be set when using that scheduler.

  • partition_days_offset (Optional[int]) – How many days back to go when choosing the partition for a given schedule execution. For example, when partition_days_offset=1, the schedule that executes during day N will fill in the partition for day N-1. (Default: 1)

  • description (Optional[str]) – A human-readable description of the schedule.

class dagster.ScheduleDefinition(name, cron_schedule, pipeline_name=None, run_config=None, run_config_fn=None, tags=None, tags_fn=None, solid_selection=None, mode='default', should_execute=None, environment_vars=None, execution_timezone=None, execution_fn=None, description=None, job=None)[source]

Define a schedule that targets a pipeline

Parameters
  • name (str) – The name of the schedule to create.

  • cron_schedule (str) – A valid cron string specifying when the schedule will run, e.g., ‘45 23 * * 6’ for a schedule that runs at 11:45 PM every Saturday.

  • pipeline_name (str) – The name of the pipeline to execute when the schedule runs.

  • execution_fn (Callable[ScheduleExecutionContext]) –

    The core evaluation function for the schedule, which is run at an interval to determine whether a run should be launched or not. Takes a ScheduleExecutionContext.

    This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.

  • run_config (Optional[Dict]) – The environment config that parameterizes this execution, as a dict.

  • run_config_fn (Optional[Callable[[ScheduleExecutionContext], [Dict]]]) – A function that takes a ScheduleExecutionContext object and returns the environment configuration that parameterizes this execution, as a dict. You may set only one of run_config, run_config_fn, and execution_fn.

  • tags (Optional[Dict[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.

  • tags_fn (Optional[Callable[[ScheduleExecutionContext], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedules runs. Takes a ScheduleExecutionContext and returns a dictionary of tags (string key-value pairs). You may set only one of tags, tags_fn, and execution_fn.

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute when the schedule runs. e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The mode to apply when executing this schedule. (default: ‘default’)

  • should_execute (Optional[Callable[[ScheduleExecutionContext], bool]]) – A function that runs at schedule execution time to determine whether a schedule should execute or skip. Takes a ScheduleExecutionContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.

  • environment_vars (Optional[dict[str, str]]) – The environment variables to set for the schedule

  • execution_timezone (Optional[str]) – Timezone in which the schedule should run. Only works with DagsterDaemonScheduler, and must be set when using that scheduler.

  • description (Optional[str]) – A human-readable description of the schedule.

  • target (Optional[GraphDefinition]) – Experimental

evaluate_tick(context)[source]

Evaluate schedule using the provided context.

Parameters

context (ScheduleExecutionContext) – The context with which to evaluate this schedule.

Returns

Contains list of run requests, or skip message if present.

Return type

ScheduleExecutionData

class dagster.ScheduleExecutionContext(instance_ref, scheduled_execution_time)[source]

Schedule-specific execution context.

An instance of this class is made available as the first argument to various ScheduleDefinition functions. It is passed as the first argument to run_config_fn, tags_fn, and should_execute.

instance_ref

The serialized instance configured to run the schedule

Type

Optional[InstanceRef]

scheduled_execution_time

The time in which the execution was scheduled to happen. May differ slightly from both the actual execution time and the time at which the run config is computed. Not available in all schedulers - currently only set in deployments using DagsterDaemonScheduler.

Type

datetime

class dagster.PartitionScheduleDefinition(name, cron_schedule, pipeline_name, tags_fn, solid_selection, mode, should_execute, environment_vars, partition_set, run_config_fn=None, execution_timezone=None, execution_fn=None, description=None)[source]
dagster.build_schedule_context(instance=None, scheduled_execution_time=None)[source]

Builds schedule execution context using the provided parameters.

The instance provided to build_schedule_context must be persistent; DagsterInstance.ephemeral() will result in an error.

Parameters
  • instance (Optional[DagsterInstance]) – The dagster instance configured to run the schedule.

  • scheduled_execution_time (datetime) – The time in which the execution was scheduled to happen. May differ slightly from both the actual execution time and the time at which the run config is computed.

Examples

context = build_schedule_context(instance)
daily_schedule.evaluate_tick(context)
class dagster.core.scheduler.DagsterDaemonScheduler(max_catchup_runs=5, inst_data=None)[source]

Default scheduler implementation that submits runs from the dagster-daemon long-lived process. Periodically checks each running schedule for execution times that don’t have runs yet and launches them.

Parameters

max_catchup_runs (int) –

For partitioned schedules, controls the maximum number of past partitions for each schedule that will be considered when looking for missing runs (defaults to 5). Generally this parameter will only come into play if the scheduler falls behind or launches after experiencing downtime. This parameter will not be checked for schedules without partition sets (for example, schedules created using the @schedule decorator) - only the most recent execution time will be considered for those schedules.

Note that no matter what this value is, the scheduler will never launch a run from a time before the schedule was turned on (even if the start_date on the schedule is earlier) - if you want to launch runs for earlier partitions, launch a backfill.

Sensors

@dagster.sensor(pipeline_name=None, name=None, solid_selection=None, mode=None, minimum_interval_seconds=None, description=None, job=None)[source]

Creates a sensor where the decorated function is used as the sensor’s evaluation function. The decorated function may:

  1. Return a RunRequest object.

  2. Yield multiple of RunRequest objects.

  3. Return or yield a SkipReason object, providing a descriptive message of why no runs were requested.

  4. Return or yield nothing (skipping without providing a reason)

Takes a SensorExecutionContext.

Parameters
  • pipeline_name (str) – Name of the target pipeline

  • name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute for runs for this sensor e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The mode to apply when executing runs for this sensor. (default: ‘default’)

  • minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.

  • description (Optional[str]) – A human-readable description of the sensor.

  • job (Optional[PipelineDefinition]) – Experimental

class dagster.SensorDefinition(name, pipeline_name, evaluation_fn, solid_selection=None, mode=None, minimum_interval_seconds=None, description=None, job=None)[source]

Define a sensor that initiates a set of runs based on some external state

Parameters
  • name (str) – The name of the sensor to create.

  • pipeline_name (str) – The name of the pipeline to execute when the sensor fires.

  • evaluation_fn (Callable[[SensorExecutionContext]]) –

    The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a SensorExecutionContext.

    This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute when the sensor runs. e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The mode to apply when executing runs triggered by this sensor. (default: ‘default’)

  • minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.

  • description (Optional[str]) – A human-readable description of the sensor.

  • job (Optional[PipelineDefinition]) – Experimental

evaluate_tick(context)[source]

Evaluate sensor using the provided context.

Parameters

context (SensorExecutionContext) – The context with which to evaluate this sensor.

Returns

Contains list of run requests, or skip message if present.

Return type

SensorExecutionData

class dagster.SensorExecutionContext(instance_ref, last_completion_time, last_run_key, cursor)[source]

Sensor execution context.

An instance of this class is made available as the first argument to the evaluation function on SensorDefinition.

instance_ref

The serialized instance configured to run the schedule

Type

Optional[InstanceRef]

cursor

The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest

Type

Optional[str]

last_completion_time

DEPRECATED The last time that the sensor was evaluated (UTC).

Type

float

last_run_key

DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred cursor attribute instead.

Type

str

dagster.build_sensor_context(instance=None, cursor=None)[source]

Builds sensor execution context using the provided parameters.

If provided, the dagster instance must be persistent; DagsterInstance.ephemeral() will result in an error.

Parameters
  • instance (Optional[DagsterInstance]) – The dagster instance configured to run the sensor.

  • cursor (Optional[str]) – A cursor value to provide to the evaluation of the sensor.

Examples

context = build_sensor_context()
my_sensor.evaluate_tick(context)