DagsterDocs

Source code for dagster.core.storage.event_log.base

import warnings
from abc import ABC, abstractmethod, abstractproperty
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union

from dagster.core.definitions.events import AssetKey
from dagster.core.events import DagsterEventType
from dagster.core.events.log import EventRecord
from dagster.core.execution.stats import (
    RunStepKeyStatsSnapshot,
    build_run_stats_from_events,
    build_run_step_stats_from_events,
)
from dagster.core.instance import MayHaveInstanceWeakref
from dagster.core.storage.pipeline_run import PipelineRunStatsSnapshot


class EventLogStorage(ABC, MayHaveInstanceWeakref):
    """Abstract base class for storing structured event logs from pipeline runs.

    Note that event log storages using SQL databases as backing stores should implement
    :py:class:`~dagster.core.storage.event_log.SqlEventLogStorage`.

    Users should not directly instantiate concrete subclasses of this class; they are instantiated
    by internal machinery when ``dagit`` and ``dagster-graphql`` load, based on the values in the
    ``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses of this class
    should be done by setting values in that file.
    """

    @abstractmethod
    def get_logs_for_run(
        self,
        run_id: str,
        cursor: Optional[int] = -1,
        of_type: Optional[DagsterEventType] = None,
    ) -> Iterable[EventRecord]:
        """Get all of the logs corresponding to a run.

        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[int]): Zero-indexed logs will be returned starting from cursor + 1,
                i.e., if cursor is -1, all logs will be returned. (default: -1)
            of_type (Optional[DagsterEventType]): the dagster event type to filter the logs.
        """

    def get_stats_for_run(self, run_id: str) -> PipelineRunStatsSnapshot:
        """Get a summary of events that have occurred in a run."""
        return build_run_stats_from_events(run_id, self.get_logs_for_run(run_id))

    def get_step_stats_for_run(self, run_id: str, step_keys=None) -> List[RunStepKeyStatsSnapshot]:
        """Get per-step stats for a pipeline run.

        Args:
            run_id (str): The id of the run for which to fetch step stats.
            step_keys (Optional[List[str]]): When provided, only events whose dagster event
                carries one of these step keys contribute to the stats.
        """
        logs = self.get_logs_for_run(run_id)
        if step_keys:
            # Restrict to events that are dagster events tied to one of the requested steps.
            logs = [
                event
                for event in logs
                if event.is_dagster_event and event.get_dagster_event().step_key in step_keys
            ]

        return build_run_step_stats_from_events(run_id, logs)

    @abstractmethod
    def store_event(self, event: EventRecord):
        """Store an event corresponding to a pipeline run.

        Args:
            event (EventRecord): The event to store.
        """

    @abstractmethod
    def delete_events(self, run_id: str):
        """Remove events for a given run id"""

    @abstractmethod
    def upgrade(self):
        """This method should perform any schema migrations necessary to bring an
        out-of-date instance of the storage up to date.
        """

    @abstractmethod
    def reindex(self, print_fn: Callable = lambda _: None, force: bool = False):
        """Call this method to run any data migrations, reindexing to build summary tables."""

    @abstractmethod
    def wipe(self):
        """Clear the log storage."""

    @abstractmethod
    def watch(self, run_id: str, start_cursor: int, callback: Callable):
        """Call this method to start watching."""

    @abstractmethod
    def end_watch(self, run_id: str, handler: Callable):
        """Call this method to stop watching."""

    # NOTE: `abstractproperty` is deprecated since Python 3.3; the modern spelling is
    # stacking `@property` over `@abstractmethod`, which is equivalent for subclasses.
    @property
    @abstractmethod
    def is_persistent(self) -> bool:
        """bool: Whether the storage is persistent."""

    def dispose(self):
        """Explicit lifecycle management."""

    def optimize_for_dagit(self, statement_timeout: int):
        """Allows for optimizing database connection / use in the context of a long lived
        dagit process"""

    @abstractmethod
    def has_asset_key(self, asset_key: AssetKey) -> bool:
        pass

    @abstractmethod
    def all_asset_keys(self) -> Iterable[AssetKey]:
        pass

    @abstractmethod
    def all_asset_tags(self) -> Dict[AssetKey, Dict[str, str]]:
        pass

    @abstractmethod
    def get_asset_tags(self, asset_key: AssetKey) -> Dict[str, str]:
        pass

    @abstractmethod
    def get_asset_events(
        self,
        asset_key: AssetKey,
        partitions: Optional[List[str]] = None,
        before_cursor: Optional[int] = None,
        after_cursor: Optional[int] = None,
        limit: Optional[int] = None,
        ascending: bool = False,
        include_cursor: bool = False,
        before_timestamp=None,
        cursor: Optional[int] = None,  # deprecated
    ) -> Union[Iterable[EventRecord], Iterable[Tuple[int, EventRecord]]]:
        pass

    @abstractmethod
    def get_asset_run_ids(self, asset_key: AssetKey) -> Iterable[str]:
        pass

    @abstractmethod
    def wipe_asset(self, asset_key: AssetKey):
        """Remove asset index history from event log for given asset_key"""
def extract_asset_events_cursor(cursor, before_cursor, after_cursor, ascending): if cursor: warnings.warn( "Parameter `cursor` is a deprecated for `get_asset_events`. Use `before_cursor` or `after_cursor` instead" ) if ascending and after_cursor is None: after_cursor = cursor if not ascending and before_cursor is None: before_cursor = cursor if after_cursor is not None: try: after_cursor = int(after_cursor) except ValueError: after_cursor = None if before_cursor is not None: try: before_cursor = int(before_cursor) except ValueError: before_cursor = None return before_cursor, after_cursor