from abc import ABC, abstractmethod, abstractproperty
from typing import Any, Callable, Dict, Optional, Union
from dagster import Field, check
from dagster.config.evaluate_value_result import EvaluateValueResult
from .definition_config_schema import (
ConfiguredDefinitionConfigSchema,
IDefinitionConfigSchema,
convert_user_facing_definition_config_schema,
)
class ConfigurableDefinition(ABC):
    """Abstract base for definitions that expose a (possibly absent) config schema.

    Subclasses supply ``config_schema``; this base derives the config field accessors and
    the config-mapping entry point from it.
    """

    # ``abstractproperty`` is deprecated since Python 3.3; stacking ``@property`` on
    # ``@abstractmethod`` is the supported equivalent.
    @property
    @abstractmethod
    def config_schema(self) -> Optional[IDefinitionConfigSchema]:
        """The config schema of this definition, or ``None`` if it has none."""
        raise NotImplementedError()

    @property
    def has_config_field(self) -> bool:
        """Whether a config schema is present and yields a truthy field from ``as_field()``."""
        schema = self.config_schema
        return schema is not None and bool(schema.as_field())

    @property
    def config_field(self) -> Optional[Field]:
        """The schema's ``as_field()`` result, or ``None`` when the schema is falsy."""
        schema = self.config_schema
        return None if not schema else schema.as_field()

    # getter for typed access
    def get_config_field(self) -> Field:
        """Return the config field, failing the check if there is none.

        Callers are expected to consult ``has_config_field`` first.
        """
        field = self.config_field
        if field is None:
            check.failed("Must check has_config_field before calling get_config_field")
        return field

    def apply_config_mapping(self, config: Any) -> EvaluateValueResult:
        """
        Applies user-provided config mapping functions to the given configuration and validates
        the results against the respective config schema.

        Expects incoming config to be validated and have fully-resolved values (StringSource
        values resolved, Enum types hydrated, etc.) via process_config() during
        EnvironmentConfig construction and CompositeSolid config mapping.

        Args:
            config (Any): A validated and resolved configuration dictionary matching this
                object's config_schema

        Returns (EvaluateValueResult):
            If successful, the value is a validated and resolved configuration dictionary for
            the innermost wrapped object after applying the config mapping transformation
            function.
        """
        schema = self.config_schema
        if isinstance(schema, ConfiguredDefinitionConfigSchema):
            # A configured (mapped) schema knows how to resolve the incoming config.
            return schema.resolve_config(config)

        # If the schema is not a mapped schema this is the innermost definition (base case),
        # so we aren't responsible for validating against anything farther down.
        # Returns an EVR for type consistency with config_mapping_fn.
        return EvaluateValueResult.for_value(config)
class AnonymousConfigurableDefinition(ConfigurableDefinition):
    """An interface whose `configured` method does not accept a name argument."""

    def configured(
        self,
        config_or_config_fn: Any,
        config_schema: Optional[Dict[str, Any]] = None,
        description: Optional[str] = None,
    ):
        """
        Wraps this object in an object of the same type that provides configuration to the
        inner object.

        Args:
            config_or_config_fn (Union[Any, Callable[[Any], Any]]): Either (1) Run
                configuration that fully satisfies this object's config schema or (2) A
                function that accepts run configuration and returns run configuration that
                fully satisfies this object's config schema. In the latter case, config_schema
                must be specified. When passing a function, it's easiest to use
                :py:func:`configured`.
            config_schema (ConfigSchema): If config_or_config_fn is a function, the config
                schema that its input must satisfy.
            description (Optional[str]): Description of the new definition. If not specified,
                inherits the description of the definition being configured.

        Returns (ConfigurableDefinition): A configured version of this object.
        """
        # Normalize the user-facing schema first, then wrap it together with this definition
        # and the supplied config (or config function).
        outer_schema = convert_user_facing_definition_config_schema(config_schema)
        wrapped_schema = ConfiguredDefinitionConfigSchema(self, outer_schema, config_or_config_fn)
        return self.copy_for_configured(description, wrapped_schema, config_or_config_fn)

    @abstractmethod
    def copy_for_configured(
        self,
        description: Optional[str],
        config_schema: IDefinitionConfigSchema,
        config_or_config_fn: Union[Any, Callable[[Any], Any]],
    ):
        """Build the configured copy of this definition; implemented by subclasses."""
        raise NotImplementedError()
class NamedConfigurableDefinition(ConfigurableDefinition):
    """An interface whose `configured` method requires a positional `name` argument."""

    def configured(
        self,
        config_or_config_fn: Any,
        name: str,
        config_schema: Optional[Dict[str, Any]] = None,
        description: Optional[str] = None,
    ):
        """
        Wraps this object in an object of the same type that provides configuration to the
        inner object.

        Args:
            config_or_config_fn (Union[Any, Callable[[Any], Any]]): Either (1) Run
                configuration that fully satisfies this object's config schema or (2) A
                function that accepts run configuration and returns run configuration that
                fully satisfies this object's config schema. In the latter case, config_schema
                must be specified. When passing a function, it's easiest to use
                :py:func:`configured`.
            name (str): Name of the new definition. This is a required argument, as this
                definition type has a name uniqueness constraint.
            config_schema (ConfigSchema): If config_or_config_fn is a function, the config
                schema that its input must satisfy.
            description (Optional[str]): Description of the new definition. If not specified,
                inherits the description of the definition being configured.

        Returns (ConfigurableDefinition): A configured version of this object.
        """
        # Validate the name before any schema work is done.
        checked_name = check.str_param(name, "name")

        outer_schema = convert_user_facing_definition_config_schema(config_schema)
        wrapped_schema = ConfiguredDefinitionConfigSchema(self, outer_schema, config_or_config_fn)
        return self.copy_for_configured(
            checked_name, description, wrapped_schema, config_or_config_fn
        )

    @abstractmethod
    def copy_for_configured(
        self,
        name: str,
        description: Optional[str],
        config_schema: IDefinitionConfigSchema,
        config_or_config_fn: Union[Any, Callable[[Any], Any]],
    ):
        """Build the configured, named copy of this definition; implemented by subclasses."""
        raise NotImplementedError()
def _check_configurable_param(configurable: ConfigurableDefinition) -> None:
    """Validate that ``configurable`` is something `configured` may be applied to.

    Two checks run in order: the object must not be a ``PendingNodeInvocation``, and it must
    be a ``ConfigurableDefinition``. Failures are reported through ``check.param_invariant``
    and ``check.inst_param`` respectively; nothing is returned on success.

    Args:
        configurable: The object on which `configured` is being invoked.
    """
    # Imported at call time rather than module level — presumably to avoid a circular
    # import with the composition module; confirm before hoisting.
    from dagster.core.definitions.composition import PendingNodeInvocation

    # Checked first so aliased/tagged solids get the more specific guidance below.
    check.param_invariant(
        not isinstance(configurable, PendingNodeInvocation),
        "configurable",
        (
            "You have invoked `configured` on a PendingNodeInvocation (an intermediate type), "
            "which is "
            "produced by aliasing or tagging a solid definition. To configure a solid, you must "
            "call `configured` on either a SolidDefinition or CompositeSolidDefinition. To fix "
            "this error, make sure to call `configured` on the definition object *before* using "
            "the `tag` or `alias` methods. For usage examples, see "
            "https://docs.dagster.io/overview/configuration#configured"
        ),
    )
    check.inst_param(
        configurable,
        "configurable",
        ConfigurableDefinition,
        (
            "Only the following types can be used with the `configured` method: "
            "ResourceDefinition, "
            "ExecutorDefinition, CompositeSolidDefinition, SolidDefinition, LoggerDefinition, "
            "and IntermediateStorageDefinition. For usage examples of "
            "`configured`, see https://docs.dagster.io/overview/configuration#configured"
        ),
    )
def _is_named_configurable_param(configurable: ConfigurableDefinition) -> bool:
    """Report whether ``configurable`` is a ``NamedConfigurableDefinition``."""
    requires_name = isinstance(configurable, NamedConfigurableDefinition)
    return requires_name