Dagster provides a GraphQL Python Client to interface with Dagster's GraphQL API from Python.
Name | Description |
---|---|
DagsterGraphQLClient (experimental) | The client class to interact with Dagster's GraphQL API from Python. |
DagsterGraphQLClientError | The exception that the client raises upon a response error. |
The Dagster Python Client provides bindings in Python to programmatically interact with Dagster's GraphQL API.
When is this useful? Dagster exposes a powerful GraphQL API, but this level of flexibility is not always necessary. For example, when submitting a new pipeline run, you may want to think only about the pipeline name and configuration, not about maintaining a long GraphQL query.
DagsterGraphQLClient solves this by exposing a simple Python interface to the GraphQL API.
Note that not all GraphQL methods on the API are available in Python yet. The DagsterGraphQLClient currently provides only the following methods:
DagsterGraphQLClient.submit_pipeline_execution
DagsterGraphQLClient.get_run_status
DagsterGraphQLClient.reload_repository_location
The snippet below shows how to instantiate the client:
from dagster_graphql import DagsterGraphQLClient
client = DagsterGraphQLClient("localhost", port_number=3000)
You can use the client to get the status of a pipeline run as follows:
from dagster_graphql import DagsterGraphQLClientError
from dagster.core.storage.pipeline_run import PipelineRunStatus

try:
    status: PipelineRunStatus = client.get_run_status(RUN_ID)
    if status == PipelineRunStatus.SUCCESS:
        do_something_on_success()
    else:
        do_something_else()
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
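Because get_run_status returns the status at the moment of the call, a caller that needs the final outcome usually has to poll. The block below is a minimal sketch of such a loop, not part of the client itself. The helper name wait_for_run, its parameters, and the injectable sleep function are assumptions made for illustration. The only client method it relies on is get_run_status, as shown above.

```python
import time
from typing import Any, Callable, Collection


def wait_for_run(
    client: Any,
    run_id: str,
    terminal_statuses: Collection[Any],
    poll_interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll client.get_run_status(run_id) until a terminal status is seen.

    Hypothetical helper: `client` is any object with a get_run_status method,
    and `terminal_statuses` is whatever set of statuses the caller treats as
    final. Raises TimeoutError if no terminal status appears in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = client.get_run_status(run_id)
        if status in terminal_statuses:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} still in status {status!r}")
        sleep(poll_interval)


# Possible usage with the real client (assumes PipelineRunStatus.FAILURE
# is the status you want to treat as a failed run):
#
#   final = wait_for_run(
#       client,
#       RUN_ID,
#       {PipelineRunStatus.SUCCESS, PipelineRunStatus.FAILURE},
#   )
```

Passing the terminal statuses in as an argument keeps the helper independent of any particular Dagster version's status enum. Errors raised by the client propagate to the caller unchanged.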
You can also reload a repository location in a Dagster deployment.
This reloads all repositories in that repository location. This is useful in a variety of contexts, including refreshing Dagit without restarting the server. Example usage is as follows:
from dagster_graphql import (
    ReloadRepositoryLocationInfo,
    ReloadRepositoryLocationStatus,
)

reload_info: ReloadRepositoryLocationInfo = client.reload_repository_location(REPO_NAME)
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
    do_something_on_success()
else:
    raise Exception(
        "Repository location reload failed because of a "
        f"{reload_info.failure_type} error: {reload_info.message}"
    )
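When a deployment has several repository locations, the same check can be applied to each one in turn. The following is a rough sketch under that assumption. The helper name reload_locations and its parameters are hypothetical. It uses only reload_repository_location and the status, failure_type, and message attributes shown in the example above.

```python
from typing import Any, Dict, Iterable


def reload_locations(
    client: Any,
    location_names: Iterable[str],
    success_status: Any,
) -> Dict[str, str]:
    """Reload each named repository location and collect the failures.

    Hypothetical helper: returns a dict mapping each location that did not
    reload successfully to a short description of the failure. An empty
    dict means every reload succeeded.
    """
    failures: Dict[str, str] = {}
    for name in location_names:
        info = client.reload_repository_location(name)
        if info.status != success_status:
            failures[name] = f"{info.failure_type}: {info.message}"
    return failures


# Possible usage with the real client (location names are placeholders):
#
#   failures = reload_locations(
#       client,
#       ["location_a", "location_b"],
#       ReloadRepositoryLocationStatus.SUCCESS,
#   )
#   if failures:
#       raise Exception(f"Some locations failed to reload: {failures}")
```

Collecting failures instead of raising on the first one lets the caller see the state of every location after a single pass.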
You can use the client to submit a pipeline run as follows:
from dagster_graphql import DagsterGraphQLClientError

try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        repository_location_name=REPO_LOCATION_NAME,
        repository_name=REPO_NAME,
        run_config={},
        mode="default",
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
You can also submit a pipeline from a preset with the client:
from dagster_graphql import DagsterGraphQLClientError

try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        repository_location_name=REPO_LOCATION_NAME,
        repository_name=REPO_NAME,
        preset=PRESET_NAME,
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
Note that specifying the repository location name and repository name is not always necessary; the GraphQL client will infer both if the pipeline name is unique.
from dagster_graphql import DagsterGraphQLClientError

try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        run_config={},
        mode="default",
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
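The short form above is convenient when launching the same pipeline several times with different configuration. The sketch below illustrates one way to do that. The helper name submit_many and its parameters are assumptions for illustration, and it assumes the pipeline name is unique so that the repository can be inferred. It calls submit_pipeline_execution with only the pipeline name, run_config, and mode arguments used above.

```python
from typing import Any, Dict, Iterable, List


def submit_many(
    client: Any,
    pipeline_name: str,
    run_configs: Iterable[Dict[str, Any]],
    mode: str = "default",
) -> List[str]:
    """Submit one run of `pipeline_name` per run config; return the run IDs.

    Hypothetical helper: relies on the client inferring the repository and
    repository location from a unique pipeline name. Any error raised by
    the client stops the loop and propagates to the caller.
    """
    run_ids: List[str] = []
    for run_config in run_configs:
        run_id = client.submit_pipeline_execution(
            pipeline_name,
            run_config=run_config,
            mode=mode,
        )
        run_ids.append(run_id)
    return run_ids


# Possible usage with the real client:
#
#   run_ids = submit_many(client, PIPELINE_NAME, [{}, {}])
```

Run IDs are returned in the same order as the supplied configs, so each ID can be matched back to the configuration that produced it.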