Please note that internal APIs are likely to be in much greater flux pre-1.0 than user-facing APIs, particularly if not exported in the top level dagster module.
If you find yourself consulting these docs because you are writing custom components and plug-ins, please get in touch with the core team on our Slack. We’re curious what you’re up to, happy to help, excited for new community contributions, and eager to make the system as easy to work with as possible – including for teams who are looking to customize it.
dagster.DagsterLogManager(run_id, logging_tags, loggers)
Centralized dispatch for logging from user code.
Handles the construction of uniform structured log messages and passes them through to the underlying loggers.
An instance of the log manager is made available to solids as context.log. Users should not initialize instances of the log manager directly. To configure custom loggers, set the logger_defs on a ModeDefinition for a pipeline.
The log manager supports standard convenience methods like those exposed by the Python standard library logging module (i.e., within the body of a solid, context.log.{debug, info, warning, warn, error, critical, fatal}).
The underlying integer API can also be called directly using, e.g., context.log.log(5, msg), and the log manager will delegate to the log method defined on each of the loggers it manages.
User-defined custom log levels are not supported, and calls to, e.g., context.log.trace or context.log.notice will result in hard exceptions at runtime.
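The dispatch behavior described above can be sketched with the standard library alone. This is a minimal illustration, not Dagster's implementation: SketchLogManager, ListHandler, and the way tags are appended to the message are all assumptions made for the example.

```python
import logging


class ListHandler(logging.Handler):
    """Collects records in memory so the example can inspect them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


# Hypothetical stand-in for illustration only; not Dagster's DagsterLogManager.
class SketchLogManager:
    """Fans each message out to every managed logger, adding shared tags."""

    _LEVELS = ("debug", "info", "warning", "warn", "error", "critical", "fatal")

    def __init__(self, run_id, logging_tags, loggers):
        self.run_id = run_id
        self.logging_tags = dict(logging_tags)
        self.loggers = list(loggers)

    def log(self, level, msg, **kwargs):
        # Integer API: delegate to the log method of each managed logger.
        tags = {"run_id": self.run_id, **self.logging_tags, **kwargs}
        adorned = "{} {}".format(msg, sorted(tags.items()))
        for logger in self.loggers:
            logger.log(level, adorned)

    def __getattr__(self, name):
        # Only the standard level names are exposed; anything else
        # (e.g. "trace" or "notice") raises at runtime.
        if name in self._LEVELS:
            level = getattr(logging, name.upper())
            return lambda msg, **kwargs: self.log(level, msg, **kwargs)
        raise AttributeError("unsupported log level: {}".format(name))


handler = ListHandler()
logger = logging.getLogger("sketch_log_manager")
logger.setLevel(1)
logger.propagate = False
logger.addHandler(handler)

log = SketchLogManager("run-1", {"pipeline": "demo"}, [logger])
log.info("hello", solid="my_solid")  # convenience method
log.log(5, "low-level message")      # integer API
```

In this sketch, calling log.trace("...") raises AttributeError, which mirrors the "hard exceptions" behavior described for unsupported levels.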
debug(msg, **kwargs)
Log at the logging.DEBUG level.
The message will be automatically adorned with contextual information about the name of the pipeline, the name of the solid, etc., so it is generally unnecessary to include this type of information in the log message.
You can optionally add additional key-value pairs to an individual log message using the kwargs to this method.
msg (str) – The message to log.
**kwargs (Optional[Any]) – Any additional key-value pairs for only this log message.
fatal(msg, **kwargs)
Alias for critical()
Add new tags in “new_tags” to the set of tags attached to this log manager instance, and return a new DagsterLogManager with the merged set of tags for the same run ID and loggers.
@dagster.executor(name=None, config_schema=None, requirements=None)
Define an executor.
The decorated function should accept an InitExecutorContext and return an instance of Executor.
name (Optional[str]) – The name of the executor.
config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.executor_config. If not set, Dagster will accept any config provided.
requirements (Optional[List[ExecutorRequirement]]) – Any requirements that must be met in order for the executor to be usable for a particular pipeline execution.
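As a rough illustration of the decorator-to-definition pattern described here, the following stdlib-only sketch wraps a creation function in a definition object. SketchExecutorDefinition and sketch_executor are hypothetical names, and defaulting the name to the function name is an assumption of the sketch rather than documented behavior.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any, Callable, List, Optional


# Hypothetical names throughout; not Dagster's implementation.
@dataclass
class SketchExecutorDefinition:
    name: str
    executor_creation_fn: Callable[[Any], Any]
    config_schema: Optional[dict] = None
    requirements: List[str] = field(default_factory=list)


def sketch_executor(name=None, config_schema=None, requirements=None):
    """Decorator factory: wraps a creation function in a definition object."""

    def wrap(fn):
        return SketchExecutorDefinition(
            name=name or fn.__name__,  # assumption: fall back to the function name
            executor_creation_fn=fn,
            config_schema=config_schema,
            requirements=list(requirements or []),
        )

    return wrap


@sketch_executor(name="in_process_sketch", config_schema={"retries": bool})
def my_executor(init_context):
    # The init context is assumed to expose validated config as executor_config.
    return {"retries_enabled": init_context.executor_config["retries"]}


# The framework, not the user, would normally build the init context.
init_context = SimpleNamespace(executor_config={"retries": True})
executor_obj = my_executor.executor_creation_fn(init_context)
```

After decoration, my_executor is the definition object rather than the original function, which is why the creation function is reached through executor_creation_fn.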
dagster.ExecutorDefinition(name, config_schema=None, requirements=None, executor_creation_fn=None, description=None)
name (str) – The name of the executor.
config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.executor_config. If not set, Dagster will accept any config provided.
requirements (Optional[List[ExecutorRequirement]]) – Any requirements that must be met in order for the executor to be usable for a particular pipeline execution.
executor_creation_fn (Optional[Callable]) – Should accept an InitExecutorContext and return an instance of Executor.
required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the executor.
configured(config_or_config_fn, name=None, config_schema=None, description=None)
Wraps this object in an object of the same type that provides configuration to the inner object.
config_or_config_fn (Union[Any, Callable[[Any], Any]]) – Either (1) run configuration that fully satisfies this object’s config schema or (2) a function that accepts run configuration and returns run configuration that fully satisfies this object’s config schema. In the latter case, config_schema must be specified. When passing a function, it’s easiest to use configured().
name (Optional[str]) – Name of the new definition. If not provided, the emitted definition will inherit the name of the ExecutorDefinition upon which this function is called.
config_schema (Optional[ConfigSchema]) – If config_or_config_fn is a function, the config schema that its input must satisfy. If not set, Dagster will accept any config provided.
description (Optional[str]) – Description of the new definition. If not specified, inherits the description of the definition being configured.
Returns (ConfigurableDefinition): A configured version of this object.
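The two accepted forms of config_or_config_fn (a literal config value or a mapping function) can be sketched as follows. SketchDefinition and its creation_fn field are hypothetical, and the sketch skips schema validation entirely.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Optional


# Hypothetical definition type; illustrates the wrapping idea only.
@dataclass(frozen=True)
class SketchDefinition:
    name: str
    creation_fn: Callable[[dict], Any]
    description: Optional[str] = None

    def configured(self, config_or_config_fn, name=None, description=None):
        # Accept either a literal config value or a function that maps the
        # outer config to the inner config this definition expects.
        if callable(config_or_config_fn):
            to_inner = config_or_config_fn
        else:
            to_inner = lambda _outer: config_or_config_fn

        inner_fn = self.creation_fn
        return replace(
            self,
            name=name or self.name,
            description=description or self.description,
            creation_fn=lambda outer: inner_fn(to_inner(outer)),
        )


base = SketchDefinition("pool", lambda cfg: "workers={}".format(cfg["max_workers"]))

# (1) A literal config value that fully satisfies the inner definition.
fixed = base.configured({"max_workers": 4}, name="pool_of_four")

# (2) A function from outer config to inner config.
scaled = base.configured(lambda outer: {"max_workers": outer["cpus"] * 2})
```

The result has the same type as the object it wraps, and it inherits the name and description unless new ones are given.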
dagster.InitExecutorContext(pipeline, executor_def, executor_config, instance)
Executor-specific initialization context.
pipeline (IPipeline) – The pipeline to be executed.
executor_def – The definition of the executor currently being constructed.
instance – The current instance.
dagster.Executor
execute(pipeline_context, execution_plan)
For the given context and execution plan, orchestrate a series of sub plan executions in a way that satisfies the whole plan being executed.
pipeline_context (PlanOrchestrationContext) – The pipeline orchestration context.
execution_plan (ExecutionPlan) – The plan to execute.
Returns: A stream of dagster events.
retries
Whether retries are enabled or disabled for this instance of the executor.
Executors should allow this to be controlled via configuration if possible.
Returns: RetryMode
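To make "a stream of events" concrete, here is a toy executor written as a generator, with a retry switch. The plan shape (a list of step keys and callables), the event tuples, and the class name are assumptions for illustration and do not reflect the real Executor interface.

```python
from typing import Callable, Iterator, List, Tuple

# Hypothetical event shape: (event_type, step_key).
Event = Tuple[str, str]


class SketchInProcessExecutor:
    def __init__(self, retries_enabled: bool, max_retries: int = 1):
        self.retries_enabled = retries_enabled
        self.max_retries = max_retries

    def execute(self, plan: List[Tuple[str, Callable[[], None]]]) -> Iterator[Event]:
        """Run each step in plan order, yielding events as they occur."""
        for step_key, step_fn in plan:
            attempts_left = 1 + (self.max_retries if self.retries_enabled else 0)
            while True:
                attempts_left -= 1
                yield ("STEP_START", step_key)
                try:
                    step_fn()
                except Exception:
                    if attempts_left > 0:
                        yield ("STEP_UP_FOR_RETRY", step_key)
                        continue
                    yield ("STEP_FAILURE", step_key)
                    return  # stop orchestrating once a step fails for good
                yield ("STEP_SUCCESS", step_key)
                break


calls = {"flaky": 0}


def flaky():
    # Fails on the first call only, to exercise the retry path.
    calls["flaky"] += 1
    if calls["flaky"] == 1:
        raise RuntimeError("transient failure")


plan = [("load", lambda: None), ("flaky", flaky)]
events = list(SketchInProcessExecutor(retries_enabled=True).execute(plan))
```

Because execute is a generator, a caller can consume events as they are produced instead of waiting for the whole plan to finish.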
dagster.core.storage.file_manager.FileManager
Base class for all file managers in dagster.
The file manager is an interface that can be implemented by resources to provide abstract access to a file system such as local disk, S3, or other cloud storage.
For examples of usage, see the documentation of the concrete file manager implementations.
copy_handle_to_local_temp(file_handle)
Copy a file represented by a file handle to a temp file.
In an implementation built around an object store such as S3, this method would be expected to download the file from S3 to local filesystem in a location assigned by the standard library’s tempfile module.
Temp files returned by this method are not guaranteed to be reusable across solid boundaries. For files that must be available across solid boundaries, use the read(), read_data(), write(), and write_data() methods.
file_handle (FileHandle) – The handle to the file to make available as a local temp file.
Returns: Path to the local temp file.
delete_local_temp()
Delete all local temporary files created by previous calls to copy_handle_to_local_temp().
Should typically only be called by framework implementors.
read(file_handle, mode='rb')
Return a file-like stream for the file handle.
This may incur an expensive network call for file managers backed by object stores such as S3.
file_handle (FileHandle) – The file handle to make available as a stream.
mode (str) – The mode in which to open the file. Default: "rb".
Returns (Union[TextIO, BinaryIO]): A file-like stream.
read_data(file_handle)
Return the bytes for a given file handle. This may incur an expensive network call for file managers backed by object stores such as S3.
file_handle (FileHandle) – The file handle for which to return bytes.
Returns: Bytes for a given file handle.
write(file_obj, mode='wb', ext=None)
Write the bytes contained within the given file object into the file manager.
Returns: A handle to the newly created file.
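A minimal local-disk sketch of the interface described above may help when writing a custom file manager. It is not one of Dagster's concrete implementations: the class name is hypothetical, and file handles are simplified to plain path strings rather than FileHandle objects.

```python
import io
import os
import shutil
import tempfile
import uuid


# Hypothetical sketch; handles are plain path strings here.
class SketchLocalFileManager:
    def __init__(self, base_dir):
        self.base_dir = base_dir
        self._temp_files = []

    def write(self, file_obj, mode="wb", ext=None):
        # Copy the contents of file_obj into a uniquely named managed file.
        path = os.path.join(self.base_dir, uuid.uuid4().hex + (ext or ""))
        with open(path, mode) as out:
            shutil.copyfileobj(file_obj, out)
        return path

    def write_data(self, data, ext=None):
        return self.write(io.BytesIO(data), mode="wb", ext=ext)

    def read(self, file_handle, mode="rb"):
        return open(file_handle, mode)

    def read_data(self, file_handle):
        with self.read(file_handle, mode="rb") as stream:
            return stream.read()

    def copy_handle_to_local_temp(self, file_handle):
        # Track temp copies so delete_local_temp can clean them up later.
        fd, temp_path = tempfile.mkstemp()
        os.close(fd)
        shutil.copyfile(file_handle, temp_path)
        self._temp_files.append(temp_path)
        return temp_path

    def delete_local_temp(self):
        for path in self._temp_files:
            if os.path.exists(path):
                os.remove(path)
        self._temp_files = []


with tempfile.TemporaryDirectory() as base_dir:
    manager = SketchLocalFileManager(base_dir)
    handle = manager.write_data(b"foo", ext=".bin")
    data = manager.read_data(handle)
    temp_copy = manager.copy_handle_to_local_temp(handle)
    temp_existed = os.path.exists(temp_copy)
    manager.delete_local_temp()
    temp_removed = not os.path.exists(temp_copy)
```

An object-store-backed variant would keep the same method surface but download in read() and copy_handle_to_local_temp() and upload in write().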
dagster.local_file_manager (ResourceDefinition)
FileManager that provides abstract access to a local filesystem.
Implements the FileManager API.
Examples:
import tempfile

from dagster import ModeDefinition, local_file_manager, pipeline, solid


@solid(required_resource_keys={"file_manager"})
def write_files(context):
    # Write raw bytes directly; a handle to the new file is returned.
    fh_1 = context.resources.file_manager.write_data(b"foo")

    # Write from an open file object in text mode.
    with tempfile.NamedTemporaryFile("w+") as fd:
        fd.write("bar")
        fd.seek(0)
        fh_2 = context.resources.file_manager.write(fd, mode="w", ext=".txt")

    return (fh_1, fh_2)


@solid(required_resource_keys={"file_manager"})
def read_files(context, file_handles):
    fh_1, fh_2 = file_handles
    # fh_2 holds "bar"; read it back as bytes.
    assert context.resources.file_manager.read_data(fh_2) == b"bar"
    # fh_1 holds "foo"; read it back through a text stream.
    fd = context.resources.file_manager.read(fh_1, mode="r")
    assert fd.read() == "foo"
    fd.close()


@pipeline(mode_defs=[ModeDefinition(resource_defs={"file_manager": local_file_manager})])
def files_pipeline():
    read_files(write_files())
@dagster.intermediate_storage(required_resource_keys=None, name=None, is_persistent=True, config_schema=None)
Creates an intermediate storage definition.
The decorated function will be passed as the intermediate_storage_creation_fn to an IntermediateStorageDefinition.
name (str) – The name of the intermediate storage.
is_persistent (bool) – Whether the storage is persistent in a way that can cross process/node boundaries. Re-execution with, for example, the multiprocess executor, or with dagster-airflow, requires a persistent storage mode.
required_resource_keys (Optional[Set[str]]) – The resources that this storage needs at runtime to function.
config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.intermediate_storage_config.
dagster.IntermediateStorageDefinition(name, is_persistent, required_resource_keys, config_schema=None, intermediate_storage_creation_fn=None, description=None)
Defines intermediate data storage behaviors.
name (str) – Name of the storage mode.
is_persistent (bool) – Whether the storage is persistent in a way that can cross process/node boundaries. Re-execution with, for example, the multiprocess executor, or with dagster-airflow, requires a persistent storage mode.
required_resource_keys (Optional[Set[str]]) – The resources that this storage needs at runtime to function.
config_schema (Optional[ConfigSchema]) – The schema for the storage’s configuration. Configuration data passed in this schema will be made available to the intermediate_storage_creation_fn under init_context.intermediate_storage_config. If not set, Dagster will accept any config provided.
intermediate_storage_creation_fn (Callable[[InitIntermediateStorageContext], IntermediateStorage]) – Called to construct the storage. This function should consume the init context and emit an IntermediateStorage.
dagster.InitIntermediateStorageContext(pipeline_def, mode_def, intermediate_storage_def, pipeline_run, instance, environment_config, type_storage_plugin_registry, resources, intermediate_storage_config)
Intermediate storage-specific initialization context.
pipeline_def – The definition of the pipeline in context.
mode_def – The definition of the mode in context.
intermediate_storage_def – The definition of the intermediate storage to be constructed.
pipeline_run – The pipeline run in context.
instance – The instance.
environment_config (EnvironmentConfig) – The environment config.
type_storage_plugin_registry (TypeStoragePluginRegistry) – Registry containing custom type storage plugins.
resources (Any) – Resources available in context.
intermediate_storage_config (Dict[str, Any]) – The intermediate storage-specific configuration data provided by the environment config. The schema for this data is defined by the config_schema argument to IntermediateStorageDefinition.
dagster.DagsterInstance(instance_type, local_artifact_storage, run_storage, event_storage, compute_log_manager, schedule_storage=None, scheduler=None, run_coordinator=None, run_launcher=None, settings=None, ref=None)
Core abstraction for managing Dagster’s access to storage and other resources.
Use DagsterInstance.get() to grab the current DagsterInstance, which will load based on the values in the dagster.yaml file in $DAGSTER_HOME.
Alternatively, DagsterInstance.ephemeral() can be used, which provides a set of transient in-memory components.
Configuration of this class should be done by setting values in $DAGSTER_HOME/dagster.yaml.
For example, to use Postgres for run and event log storage, you can write a dagster.yaml such as the following:
instance_type (InstanceType) – Indicates whether the instance is ephemeral or persistent. Users should not attempt to set this value directly or in their dagster.yaml files.
local_artifact_storage (LocalArtifactStorage) – The local artifact storage is used to configure storage for any artifacts that require a local disk, such as schedules, or when using the filesystem system storage to manage files and intermediates. By default, this will be a dagster.core.storage.root.LocalArtifactStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.
run_storage (RunStorage) – The run storage is used to store metadata about ongoing and past pipeline runs. By default, this will be a dagster.core.storage.runs.SqliteRunStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.
event_storage (EventLogStorage) – Used to store the structured event logs generated by pipeline runs. By default, this will be a dagster.core.storage.event_log.SqliteEventLogStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.
compute_log_manager (ComputeLogManager) – The compute log manager handles stdout and stderr logging for solid compute functions. By default, this will be a dagster.core.storage.local_compute_log_manager.LocalComputeLogManager. Configurable in dagster.yaml using the ConfigurableClass machinery.
run_coordinator (RunCoordinator) – A run coordinator may be used to manage the execution of pipeline runs.
run_launcher (Optional[RunLauncher]) – Optionally, a run launcher may be used to enable a Dagster instance to launch pipeline runs, e.g. on a remote Kubernetes cluster, in addition to running them locally.
settings (Optional[Dict]) – Specifies certain per-instance settings, such as feature flags. These are set in the dagster.yaml under a set of whitelisted keys.
ref (Optional[InstanceRef]) – Used by internal machinery to pass instances across process boundaries.
get_addresses_for_step_output_versions(step_output_versions)
For each given step output, finds whether an output exists with the given version, and returns its address if it does.
launch_run(run_id, external_pipeline)
Launch a pipeline run.
This method is typically invoked through instance.submit_run rather than called directly. It delegates to the RunLauncher, if any, configured on the instance, and will call its implementation of RunLauncher.launch_run() to begin the execution of the specified run. Runs should be created in the instance (e.g., by calling DagsterInstance.create_run()) before this method is called, and should be in the PipelineRunStatus.NOT_STARTED state.
run_id (str) – The id of the run to launch.
report_engine_event(message, pipeline_run, engine_event_data=None, cls=None, step_key=None)
Report an EngineEvent that occurred outside of a pipeline execution context.
submit_run(run_id, external_pipeline)
Submit a pipeline run to the coordinator.
This method delegates to the RunCoordinator configured on the instance, and will call its implementation of RunCoordinator.submit_run() to send the run to the coordinator for execution. Runs should be created in the instance (e.g., by calling DagsterInstance.create_run()) before this method is called, and should be in the PipelineRunStatus.NOT_STARTED state. They also must have a non-null ExternalPipelineOrigin.
run_id (str) – The id of the run.
dagster.core.instance.InstanceRef(local_artifact_storage_data, run_storage_data, event_storage_data, compute_logs_data, schedule_storage_data, scheduler_data, run_coordinator_data, run_launcher_data, settings, custom_instance_class_data=None)
Serializable representation of a DagsterInstance.
Users should not instantiate this class directly.
dagster.serdes.ConfigurableClass
Abstract mixin for classes that can be loaded from config.
This supports a powerful plugin pattern which avoids both a) a lengthy, hard-to-synchronize list of conditional imports / optional extras_requires in dagster core and b) a magic directory or file in which third parties can place plugin packages. Instead, the intention is to make, e.g., run storage, pluggable with a config chunk like:
run_storage:
  module: very_cool_package.run_storage
  class: SplendidRunStorage
  config:
    magic_word: "quux"
This same pattern should eventually be viable for other system components, e.g. engines.
The ConfigurableClass mixin provides the necessary hooks for classes to be instantiated from an instance of ConfigurableClassData.
Pieces of the Dagster system which we wish to make pluggable in this way should consume a config type such as:
{'module': str, 'class': str, 'config': Field(Permissive())}
config_type()
dagster.ConfigType: The config type against which to validate a config yaml fragment serialized in an instance of ConfigurableClassData.
from_config_value(inst_data, config_value)
Create an instance of the ConfigurableClass from a validated config value.
Called by ConfigurableClassData.rehydrate.
config_value (dict) – The validated config value to use. Typically this should be the value attribute of an EvaluateValueResult.
A common pattern is for the implementation to align the config_value with the signature of the ConfigurableClass’s constructor:
@staticmethod
def from_config_value(inst_data, config_value):
    return MyConfigurableClass(inst_data=inst_data, **config_value)
inst_data
Subclass must be able to return the inst_data as a property if it has been constructed through the from_config_value code path.
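The plugin pattern above (module, class, config) can be sketched with importlib. This is an illustration under assumptions, not the real ConfigurableClassData.rehydrate: the rehydrate function, the throwaway module name very_cool_package_sketch, and the use of the raw config chunk as inst_data are all made up for the example, and config validation is skipped.

```python
import importlib
import sys
import types


class SplendidRunStorage:
    """Hypothetical plugin class following the from_config_value pattern."""

    def __init__(self, magic_word, inst_data=None):
        self.magic_word = magic_word
        self._inst_data = inst_data

    @property
    def inst_data(self):
        return self._inst_data

    @staticmethod
    def from_config_value(inst_data, config_value):
        return SplendidRunStorage(inst_data=inst_data, **config_value)


# Register a throwaway module so the example can import the class by name.
plugin_module = types.ModuleType("very_cool_package_sketch")
plugin_module.SplendidRunStorage = SplendidRunStorage
sys.modules[plugin_module.__name__] = plugin_module


def rehydrate(config_chunk):
    """Import the named class and build it from its config fragment."""
    module = importlib.import_module(config_chunk["module"])
    klass = getattr(module, config_chunk["class"])
    # A real loader would validate the fragment against klass.config_type() first.
    return klass.from_config_value(config_chunk, config_chunk.get("config", {}))


storage = rehydrate(
    {
        "module": "very_cool_package_sketch",
        "class": "SplendidRunStorage",
        "config": {"magic_word": "quux"},
    }
)
```

Because the class is located by name at load time, the core package never needs to import the plugin package directly.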
dagster.serdes.ConfigurableClassData(module_name, class_name, config_yaml)
Serializable tuple describing where to find a class and the config fragment that should be used to instantiate it.
Users should not instantiate this class directly.
Classes intended to be serialized in this way should implement the dagster.serdes.ConfigurableClass mixin.
dagster.core.storage.root.LocalArtifactStorage(base_dir, inst_data=None)
config_type()
dagster.ConfigType: The config type against which to validate a config yaml fragment serialized in an instance of ConfigurableClassData.
from_config_value(inst_data, config_value)
Create an instance of the ConfigurableClass from a validated config value.
Called by ConfigurableClassData.rehydrate.
config_value (dict) – The validated config value to use. Typically this should be the value attribute of an EvaluateValueResult.
A common pattern is for the implementation to align the config_value with the signature of the ConfigurableClass’s constructor:
@staticmethod
def from_config_value(inst_data, config_value):
    return MyConfigurableClass(inst_data=inst_data, **config_value)
inst_data
Subclass must be able to return the inst_data as a property if it has been constructed through the from_config_value code path.
dagster.PipelineRun(pipeline_name=None, run_id=None, run_config=None, mode=None, solid_selection=None, solids_to_execute=None, step_keys_to_execute=None, status=None, tags=None, root_run_id=None, parent_run_id=None, pipeline_snapshot_id=None, execution_plan_snapshot_id=None, external_pipeline_origin=None)
Serializable internal representation of a pipeline run, as stored in a RunStorage.
dagster.core.storage.runs.RunStorage
Abstract base class for storing pipeline run history.
Note that run storages using SQL databases as backing stores should implement SqlRunStorage.
Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of concrete subclasses of this class should be done by setting values in that file.
dagster.core.storage.runs.SqliteRunStorage(conn_string, inst_data=None)
SQLite-backed run storage.
Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of this class should be done by setting values in that file.
This is the default run storage when none is specified in the dagster.yaml.
To explicitly specify SQLite for run storage, you can add a block such as the following to your dagster.yaml:
run_storage:
  module: dagster.core.storage.runs
  class: SqliteRunStorage
  config:
    base_dir: /path/to/dir
The base_dir param tells the run storage where on disk to store the database.
See also: dagster_postgres.PostgresRunStorage and dagster_mysql.MySQLRunStorage.
dagster.core.storage.event_log.EventLogStorage
Abstract base class for storing structured event logs from pipeline runs.
Note that event log storages using SQL databases as backing stores should implement SqlEventLogStorage.
Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of concrete subclasses of this class should be done by setting values in that file.
dagster.core.storage.event_log.SqlEventLogStorage
Base class for SQL backed event log storages.
Distinguishes between run-based connections and index connections in order to support run-level sharding, while maintaining the ability to do cross-run queries.
dagster.core.storage.event_log.SqliteEventLogStorage(base_dir, inst_data=None)
SQLite-backed event log storage.
Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of this class should be done by setting values in that file.
This is the default event log storage when none is specified in the dagster.yaml.
To explicitly specify SQLite for event log storage, you can add a block such as the following to your dagster.yaml:
event_log_storage:
  module: dagster.core.storage.event_log
  class: SqliteEventLogStorage
  config:
    base_dir: /path/to/dir
The base_dir param tells the event log storage where on disk to store the databases. To improve concurrent performance, event logs are stored in a separate SQLite database for each run.
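The per-run sharding idea can be sketched with the standard sqlite3 module: each run ID gets its own database file under base_dir, so writers for different runs never contend for the same file. The class name, table schema, and file naming below are assumptions for illustration and are not taken from Dagster's implementation.

```python
import os
import sqlite3
import tempfile


# Hypothetical sketch of "one SQLite database per run".
class SketchPerRunEventLog:
    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _connect(self, run_id):
        # Each run maps to its own file: <base_dir>/<run_id>.db
        conn = sqlite3.connect(os.path.join(self.base_dir, run_id + ".db"))
        conn.execute(
            "CREATE TABLE IF NOT EXISTS event_logs "
            "(id INTEGER PRIMARY KEY AUTOINCREMENT, event TEXT NOT NULL)"
        )
        return conn

    def store_event(self, run_id, event):
        conn = self._connect(run_id)
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute("INSERT INTO event_logs (event) VALUES (?)", (event,))
        finally:
            conn.close()

    def get_logs_for_run(self, run_id):
        conn = self._connect(run_id)
        try:
            rows = conn.execute("SELECT event FROM event_logs ORDER BY id").fetchall()
            return [row[0] for row in rows]
        finally:
            conn.close()


with tempfile.TemporaryDirectory() as base_dir:
    storage = SketchPerRunEventLog(base_dir)
    storage.store_event("run_a", "STEP_START")
    storage.store_event("run_a", "STEP_SUCCESS")
    storage.store_event("run_b", "STEP_START")
    logs_a = storage.get_logs_for_run("run_a")
    db_files = sorted(os.listdir(base_dir))
```

The trade-off of this layout is that cross-run queries need a separate index, which is the distinction between run-based and index connections that SqlEventLogStorage draws.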
dagster.core.storage.event_log.ConsolidatedSqliteEventLogStorage(base_dir, inst_data=None)
SQLite-backed consolidated event log storage intended for test cases only.
Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of this class should be done by setting values in that file.
To explicitly specify the consolidated SQLite for event log storage, you can add a block such as the following to your dagster.yaml:
event_log_storage:
  module: dagster.core.storage.event_log
  class: ConsolidatedSqliteEventLogStorage
  config:
    base_dir: /path/to/dir
The base_dir param tells the event log storage where on disk to store the database.
See also: dagster_postgres.PostgresEventLogStorage and dagster_mysql.MySQLEventLogStorage.
dagster.core.storage.compute_log_manager.ComputeLogManager
Abstract base class for storing unstructured compute logs (stdout/stderr) from the compute steps of pipeline solids.
dagster.core.storage.local_compute_log_manager.LocalComputeLogManager(base_dir, polling_timeout=None, inst_data=None)
Stores copies of stdout & stderr for each compute step locally on disk.
See also: dagster_aws.S3ComputeLogManager.
dagster.core.storage.memoizable_io_manager.MemoizableIOManager
Base class for IO managers that work with memoized execution. Users should implement the load_input and handle_output methods described in the IOManager API, and the has_output method, which returns a boolean representing whether a data object can be found.
dagster.core.storage.memoizable_io_manager.versioned_filesystem_io_manager(init_context)
Filesystem IO manager that utilizes versioning of stored objects.
It requires users to specify a base directory where all the step outputs will be stored. It serializes and deserializes output values (assets) using pickling and automatically constructs the filepaths for the assets using the provided directory and the version for a provided step output.
See also: dagster.IOManager.
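The interplay of has_output, handle_output, and load_input under memoization can be sketched as follows. The real methods take context objects; here they take explicit step key, output name, and version arguments, which is a simplification made for this example. The class name, path layout, and run_step_memoized helper are likewise hypothetical.

```python
import os
import pickle
import tempfile


# Hypothetical sketch; the path layout is an assumption.
class SketchVersionedFilesystemIOManager:
    """Pickles step outputs to base_dir/<step_key>/<output_name>/<version>."""

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _path(self, step_key, output_name, version):
        return os.path.join(self.base_dir, step_key, output_name, version)

    def has_output(self, step_key, output_name, version):
        return os.path.exists(self._path(step_key, output_name, version))

    def handle_output(self, step_key, output_name, version, obj):
        path = self._path(step_key, output_name, version)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, step_key, output_name, version):
        with open(self._path(step_key, output_name, version), "rb") as f:
            return pickle.load(f)


def run_step_memoized(manager, step_key, version, compute_fn):
    """Skip compute_fn when an output with this version is already stored."""
    if not manager.has_output(step_key, "result", version):
        manager.handle_output(step_key, "result", version, compute_fn())
    return manager.load_input(step_key, "result", version)


calls = []


def expensive():
    calls.append("ran")
    return {"rows": 3}


with tempfile.TemporaryDirectory() as base_dir:
    manager = SketchVersionedFilesystemIOManager(base_dir)
    first = run_step_memoized(manager, "load", "v1", expensive)
    second = run_step_memoized(manager, "load", "v1", expensive)  # reuses v1
    third = run_step_memoized(manager, "load", "v2", expensive)   # new version
```

Only a change of version triggers recomputation, so the stored object for a given version is treated as immutable.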
dagster.core.launcher.DefaultRunLauncher(inst_data=None, wait_for_processes=False)
Launches runs against running gRPC servers.
See also: dagster_k8s.K8sRunLauncher.
dagster.core.scheduler.Scheduler
Abstract base class for a scheduler. This component is responsible for interfacing with an external system such as cron to ensure repeated execution according to a schedule.
dagster.core.scheduler.DagsterDaemonScheduler(max_catchup_runs=5, inst_data=None)
Default scheduler implementation that submits runs from the dagster-daemon long-lived process. Periodically checks each running schedule for execution times that don’t have runs yet and launches them.
max_catchup_runs (int) – For partitioned schedules, controls the maximum number of past partitions for each schedule that will be considered when looking for missing runs (defaults to 5). Generally this parameter will only come into play if the scheduler falls behind or launches after experiencing downtime. This parameter will not be checked for schedules without partition sets (for example, schedules created using the @schedule decorator) - only the most recent execution time will be considered for those schedules.
Note that no matter what this value is, the scheduler will never launch a run from a time before the schedule was turned on (even if the start_date on the schedule is earlier) - if you want to launch runs for earlier partitions, launch a backfill.
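One reading of the catch-up rule is sketched below: take the scheduled times between when the schedule was turned on and now, consider only the most recent max_catchup_runs of them (or just the latest one for non-partitioned schedules), and keep those without a run. The function name and arguments are hypothetical, and the exact selection order in the real scheduler is not stated in this document.

```python
from datetime import datetime, timedelta


def missing_execution_times(scheduled_times, existing_runs, turned_on_at, now,
                            max_catchup_runs=5, partitioned=True):
    """Return execution times that still need a run, oldest first."""
    # Never look at ticks from before the schedule was turned on.
    eligible = [t for t in sorted(scheduled_times) if turned_on_at <= t <= now]
    # Partitioned schedules look back up to max_catchup_runs ticks;
    # other schedules only consider the most recent one.
    limit = max_catchup_runs if partitioned else 1
    considered = eligible[-limit:] if limit > 0 else []
    return [t for t in considered if t not in existing_runs]


start = datetime(2021, 1, 1)
ticks = [start + timedelta(days=i) for i in range(10)]  # Jan 1 through Jan 10
turned_on_at = datetime(2021, 1, 3)
now = datetime(2021, 1, 10, 12)
existing = {datetime(2021, 1, 6), datetime(2021, 1, 7)}

missing = missing_execution_times(ticks, existing, turned_on_at, now)
latest_only = missing_execution_times(ticks, existing, turned_on_at, now,
                                      partitioned=False)
wide = missing_execution_times(ticks, existing, turned_on_at, now,
                               max_catchup_runs=20)
```

Even with a large max_catchup_runs, nothing earlier than turned_on_at is returned in this sketch, which matches the note that earlier partitions require a backfill.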
dagster_cron.cron_scheduler.SystemCronScheduler(inst_data=None)
Scheduler implementation that uses the local system’s cron. Only works on Unix systems that have cron.
Enable this scheduler by adding it to your dagster.yaml in $DAGSTER_HOME.
dagster.core.storage.schedules.ScheduleStorage
Abstract class for managing persistence of scheduler artifacts.
dagster.core.storage.schedules.SqlScheduleStorage
Base class for SQL backed schedule storage.
dagster.core.storage.schedules.SqliteScheduleStorage(conn_string, inst_data=None)
Local SQLite backed schedule storage.
See also: dagster_postgres.PostgresScheduleStorage and dagster_mysql.MySQLScheduleStorage.
dagster.core.errors.user_code_error_boundary(error_cls, msg_fn, **kwargs)
Wraps the execution of user-space code in an error boundary. This places a uniform policy around any user code invoked by the framework. This ensures that all user errors are wrapped in an exception derived from DagsterUserCodeExecutionError, and that the original stack trace of the user error is preserved, so that it can be reported without confusing framework code in the stack trace, if a tool author wishes to do so.
Examples:
with user_code_error_boundary(
    # Pass a class that inherits from DagsterUserCodeExecutionError
    DagsterExecutionStepExecutionError,
    # Pass a function that produces a message
    lambda: "Error occurred during step execution",
):
    call_user_provided_function()
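The boundary idea itself can be sketched with contextlib. This is a simplified stand-in, not Dagster's code: SketchUserCodeError and sketch_error_boundary are hypothetical names, and exception chaining with "from" is used here as one way to keep the original traceback attached.

```python
from contextlib import contextmanager


class SketchUserCodeError(Exception):
    """Hypothetical base class for wrapped user errors."""

    def __init__(self, message, user_exception):
        super().__init__(message)
        self.user_exception = user_exception


@contextmanager
def sketch_error_boundary(error_cls, msg_fn):
    try:
        yield
    except SketchUserCodeError:
        raise  # already wrapped by an inner boundary
    except Exception as exc:
        # msg_fn is only called on failure, so building the message is lazy.
        # Chaining with "from" keeps the user's original traceback attached.
        raise error_cls(msg_fn(), user_exception=exc) from exc


def call_user_provided_function():
    raise ValueError("bad input")


caught = None
try:
    with sketch_error_boundary(
        SketchUserCodeError, lambda: "Error occurred during step execution"
    ):
        call_user_provided_function()
except SketchUserCodeError as err:
    caught = err
```

Taking a function rather than a string for the message means no formatting work is done on the success path.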