databricks.lakehouse_monitoring package

Functions

databricks.lakehouse_monitoring.cancel_refresh(*, table_name: str, refresh_id: str) → None

Cancel a specific refresh.

Parameters
  • table_name – Name of the monitored table.

  • refresh_id – The id of the refresh.

Returns

None

Raises

LakehouseMonitoringError – The refresh is already in a terminal state (e.g., FAILED or SUCCESS), or the refresh or monitor does not exist.
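
For example, a minimal sketch that queues a refresh and then cancels it, assuming a monitored Unity Catalog table named "main.default.sales" (a hypothetical name):

  >>> import databricks.lakehouse_monitoring as lm
  >>> # Queue a refresh, then cancel it while it is still PENDING or RUNNING.
  >>> refresh = lm.run_refresh(table_name="main.default.sales")
  >>> lm.cancel_refresh(
          table_name="main.default.sales",
          refresh_id=refresh.refresh_id)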

databricks.lakehouse_monitoring.create_monitor(*, table_name: str, profile_type: Union[InferenceLog, TimeSeries, Snapshot], output_schema_name: str, baseline_table_name: Optional[str] = None, slicing_exprs: Optional[Iterable[str]] = None, custom_metrics: Optional[Iterable[Metric]] = None, schedule: Optional[MonitorCronSchedule] = None, assets_dir: Optional[str] = None, skip_builtin_dashboard: bool = False, warehouse_id: Optional[str] = None) → MonitorInfo

Create a monitor asynchronously on a table (or view).

Parameters
  • table_name

Name of the table (or view) in Unity Catalog to be monitored. The name can take the following formats:

    • "{catalog}.{schema}.{table}", or

    • "{schema}.{table}", or

    • "{table}".

If the catalog or schema is not included, it will be inferred from the current catalog and schema.

  • profile_type – A databricks.lakehouse_monitoring.InferenceLog or databricks.lakehouse_monitoring.TimeSeries or databricks.lakehouse_monitoring.Snapshot object which determines the type of metrics to be computed/monitored.

  • output_schema_name – Name of the schema in which to create output tables. This can be in either "{schema}" or "{catalog}.{schema}" format.

  • baseline_table_name – Name of a table (or view) containing baseline data for comparison. The baseline table is expected to match the schema of the monitored table (except for timestamps where applicable). If columns are missing on either side then monitoring will use best-effort heuristics to compute the output metrics.

  • slicing_exprs – List of column expressions to slice data with for targeted analysis. The data is grouped by each expression independently, resulting in a separate slice for each predicate and its complement. For example, slicing_exprs=["col_1", "col_2 > 10"] will generate the following slices: two slices for col_2 > 10 (True and False), and one slice per unique value in col_1.

  • custom_metrics – A list of databricks.lakehouse_monitoring.Metric instances, representing custom metrics that will be computed and written to the output tables.

  • schedule – A databricks.lakehouse_monitoring.MonitorCronSchedule object that dictates the frequency at which the monitor refreshes its metrics.

  • assets_dir

The absolute path to a user-configurable workspace directory for storing monitoring assets. If provided, the assets will be created under:

    {assets_dir}/{table_name}

    Otherwise, the assets will be stored under the default directory:

    /Users/{user_name}/databricks_lakehouse_monitoring/{table_name}

    Note that this directory can exist anywhere, including /Shared/ or other locations outside of the /Users/ directory; this can be useful when configuring a production monitor intended to be shared within an organization.

  • skip_builtin_dashboard – If True then the built-in dashboard will not be created.

  • warehouse_id – The ID of the warehouse to use for dashboard creation. If no ID is provided, the first running warehouse will be used. Cannot be set if skip_builtin_dashboard is True.

Returns

Information about the pending monitor.

Return type

databricks.lakehouse_monitoring.MonitorInfo

Raises
  • LakehouseMonitoringError – The table is not in Unity Catalog

  • LakehouseMonitoringError – A monitor already exists on the table

  • LakehouseMonitoringError – The input arguments fail validation
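
As an illustration, a minimal sketch that creates a time-series monitor, assuming a Unity Catalog table "main.default.sales" with a timestamp column "ts" (both hypothetical names):

  >>> import databricks.lakehouse_monitoring as lm
  >>> # Create a monitor that aggregates metrics into daily windows.
  >>> info = lm.create_monitor(
          table_name="main.default.sales",
          profile_type=lm.TimeSeries(timestamp_col="ts", granularities=["1 day"]),
          output_schema_name="main.monitoring")
  >>> info.status  # typically MonitorStatus.PENDING while assets are created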

databricks.lakehouse_monitoring.delete_monitor(*, table_name: str) → None

Delete a monitor on the given table.

Note

The monitor assets (profile tables and dashboard) are not deleted by this method.

Parameters

table_name – The name of the monitored table.

Returns

None

Raises

LakehouseMonitoringError – The monitor does not exist.
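
A minimal usage sketch (the table name is hypothetical):

  >>> import databricks.lakehouse_monitoring as lm
  >>> lm.delete_monitor(table_name="main.default.sales")
  >>> # The metric tables and dashboard remain; clean them up separately
  >>> # if they are no longer needed.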

databricks.lakehouse_monitoring.get_monitor(*, table_name: str) → MonitorInfo

Retrieve info for a monitor.

Parameters

table_name – The name of the monitored table.

Returns

Information about the monitor.

Return type

databricks.lakehouse_monitoring.MonitorInfo

Raises

LakehouseMonitoringError – The monitor does not exist, or an error occurs while reading information from the Lakehouse Monitoring service
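
For example, a sketch that reads the monitor's current status (the table name is hypothetical):

  >>> import databricks.lakehouse_monitoring as lm
  >>> info = lm.get_monitor(table_name="main.default.sales")
  >>> info.status  # e.g., MonitorStatus.ACTIVE
  >>> info.profile_metrics_table_name  # where profile metrics are written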

databricks.lakehouse_monitoring.get_refresh(*, table_name: str, refresh_id: str) → RefreshInfo

Retrieve information for a specific refresh.

Parameters
  • table_name – Name of the monitored table.

  • refresh_id – The id of the refresh.

Returns

Information about the refresh.

Return type

databricks.lakehouse_monitoring.RefreshInfo

Raises

LakehouseMonitoringError – An error occurs while retrieving the information

databricks.lakehouse_monitoring.list_refreshes(*, table_name: str) → List[RefreshInfo]

Retrieve information on the most recent refreshes.

Parameters

table_name – Name of the monitored table.

Returns

Information about the most recent refreshes.

Return type

List[databricks.lakehouse_monitoring.RefreshInfo]

Raises

LakehouseMonitoringError – An error occurs while retrieving the information
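
For example, a sketch that prints the state of recent refreshes (the table name is hypothetical):

  >>> import databricks.lakehouse_monitoring as lm
  >>> for refresh in lm.list_refreshes(table_name="main.default.sales"):
          print(refresh.refresh_id, refresh.state)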

databricks.lakehouse_monitoring.run_refresh(*, table_name: str) → RefreshInfo

Queue a metric refresh for the input table. The refresh will execute in the background.

Parameters

table_name – Name of the monitored table.

Returns

Information for the refresh operation.

Return type

databricks.lakehouse_monitoring.RefreshInfo
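
A sketch that queues a refresh and polls until it reaches a terminal state (the table name and polling interval are arbitrary):

  >>> import time
  >>> import databricks.lakehouse_monitoring as lm
  >>> refresh = lm.run_refresh(table_name="main.default.sales")
  >>> # Poll until the refresh leaves the PENDING and RUNNING states.
  >>> while refresh.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING):
          time.sleep(30)
          refresh = lm.get_refresh(
              table_name="main.default.sales",
              refresh_id=refresh.refresh_id)
  >>> refresh.state  # SUCCESS, FAILED, or CANCELED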

databricks.lakehouse_monitoring.update_monitor(*, table_name: str, updated_params: Mapping[str, Any]) → MonitorInfo

Update the parameters of an existing monitor on a table.

Note

Monitoring metrics will reflect the new configuration the next time a refresh runs. See databricks.lakehouse_monitoring.run_refresh() if you want to manually trigger a refresh.

Parameters
  • table_name – Name of monitored table

  • updated_params

    Dict of updated parameters and their new values. Any parameter set to None will be cleared in the configuration. The following parameters are recognized:

    • 'profile_type': one of databricks.lakehouse_monitoring.InferenceLog or databricks.lakehouse_monitoring.TimeSeries or databricks.lakehouse_monitoring.Snapshot

    • 'baseline_table_name': a non-empty table name or None

    • 'slicing_exprs': a non-empty Iterable[str] or None

    • 'custom_metrics': a non-empty Iterable of databricks.lakehouse_monitoring.Metric or None

    • 'schedule': a non-empty databricks.lakehouse_monitoring.MonitorCronSchedule or None.

    • 'output_schema_name': the two-level name ("{catalog}.{schema}") of the schema in which to store the output metric tables.

    • 'dashboard_id': a string with the id of the dashboard that should be associated with the monitor.

Returns

Updated information about the monitor.

Return type

databricks.lakehouse_monitoring.MonitorInfo

Raises
  • LakehouseMonitoringError – The monitor does not exist

  • LakehouseMonitoringError – The caller does not have permission to update the monitor

  • LakehouseMonitoringError – The parameters are invalid
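
For instance, a sketch that replaces the slicing expressions and sets a daily schedule on an existing monitor (the table name and values are hypothetical):

  >>> import databricks.lakehouse_monitoring as lm
  >>> # Parameters omitted from updated_params are left unchanged;
  >>> # parameters explicitly set to None are cleared.
  >>> info = lm.update_monitor(
          table_name="main.default.sales",
          updated_params={
              "slicing_exprs": ["col_1", "col_2 > 10"],
              "schedule": lm.MonitorCronSchedule(
                  quartz_cron_expression="0 0 12 * * ?",
                  timezone_id="UTC"),
          })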

Classes

class databricks.lakehouse_monitoring.InferenceLog(timestamp_col: str, granularities: List[str], model_id_col: str, problem_type: str, prediction_col: str, label_col: Optional[str] = None, prediction_proba_col: Optional[str] = None)

Bases: object

Analysis type spec for ML inference data.

granularities: List[str]

Granularities for aggregating data into time windows based on their timestamp. Currently the following static granularities are supported: {"5 minutes", "30 minutes", "1 hour", "1 day", "<n> week(s)", "1 month", "1 year"}.

label_col: Optional[str] = None

Optional column that contains the ground-truth for the prediction.

model_id_col: str

Column that contains the id of the model generating the predictions. Metrics will be computed per model id by default, and also across all model ids.

prediction_col: str

Column that contains the output/prediction from the model.

prediction_proba_col: Optional[str] = None

Optional column that contains the prediction probabilities for each class in a classification problem. The values in this column should be a map from each class label to the prediction probability for a given sample. The column should be of PySpark MapType().

problem_type: str

One of "regression" or "classification". Determines the type of model-quality metrics that will be computed.

timestamp_col: str

Column that contains the timestamps of requests. The column must be one of the following:

  • A TimestampType column

  • A column whose values can be converted to timestamps through the pyspark to_timestamp function.
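
A minimal sketch of a classification spec (all column names are hypothetical):

  >>> from databricks.lakehouse_monitoring import InferenceLog
  >>> inference_spec = InferenceLog(
          timestamp_col="ts",
          granularities=["1 day"],
          model_id_col="model_version",
          problem_type="classification",
          prediction_col="prediction",
          label_col="label")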

class databricks.lakehouse_monitoring.Metric(type: str, name: str, input_columns: Union[List[str], str], definition: str, output_data_type: DataType)

Bases: object

Definition of a custom metric. Examples:

  • An aggregate metric that only applies to column f1. It computes the avg of the log of the target column:

    >>> from pyspark.sql import types as T
    >>> from databricks.lakehouse_monitoring import Metric
    >>> agg_metric = Metric(
        type="aggregate",
        name="avg_log",
        input_columns=["f1"],
        definition="avg(log(`{{input_column}}`))",
        output_data_type=T.DoubleType())
    
  • An aggregate metric that is calculated using multiple columns. It computes the avg of the difference between columns f1 and f2:

    >>> agg_metric = Metric(
        type="aggregate",
        name="avg_diff_f1_f2",
        input_columns=[":table"],
        definition="avg(f1 - f2)",
        output_data_type=T.DoubleType())
    
  • A derived metric that depends on the avg_log metric. It computes the exponential of avg_log:

    >>> derived_metric = Metric(
        type="derived",
        name="exp_avg_log",
        input_columns=["f1"],
        definition="exp(avg_log)",
        output_data_type=T.DoubleType())
    
  • A drift metric that depends on the avg_log metric. It computes the difference between avg_log values across the baseline and input table, and across two consecutive time windows:

    >>> drift_metric = Metric(
        type="drift",
        name="avg_log_delta",
        input_columns=["f1"],
        definition="{{current_df}}.avg_log - {{base_df}}.avg_log",
        output_data_type=T.DoubleType())
    
definition: str

A Jinja-templated SQL expression that defines the computation of the metric. Supported template parameters:

  • {{input_column}} : name of the column on which the metric is computed

  • {{prediction_col}} : name of the ML model prediction column

  • {{label_col}} : name of the ML ground truth label column

  • {{current_df}} / {{base_df}} : aliases of the datasets being compared for drift metrics.

    For BASELINE drift, {{base_df}} will represent the baseline data; for CONSECUTIVE drift, {{base_df}} will represent data from the previous time window.

input_columns: Union[List[str], str]

A list of column names in the input table for which the metric should be computed. Use ":table" to indicate that the metric needs information from multiple columns.

name: str

Name of the metric in the output tables.

output_data_type: pyspark.sql.types.DataType

Spark datatype of the metric output.

type: str

Can only be one of "aggregate", "derived", or "drift". The "aggregate" and "derived" metrics are computed on a single table, whereas "drift" metrics compare metrics across the baseline and input table, or across two consecutive time windows.

  • aggregate metrics: only depend on the existing columns in your table

  • derived metrics: depend on previously computed aggregate metrics

  • drift metrics: depend on previously computed aggregate or derived metrics
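
Custom metrics defined this way are passed to databricks.lakehouse_monitoring.create_monitor() (or update_monitor()) via the custom_metrics argument; a sketch reusing the metrics from the examples above, with hypothetical table, schema, and column names:

  >>> import databricks.lakehouse_monitoring as lm
  >>> lm.create_monitor(
          table_name="main.default.sales",
          profile_type=lm.TimeSeries(timestamp_col="ts", granularities=["1 day"]),
          output_schema_name="main.monitoring",
          custom_metrics=[agg_metric, derived_metric, drift_metric])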

class databricks.lakehouse_monitoring.MonitorCronSchedule(quartz_cron_expression: str, timezone_id: str)

Bases: object

The refresh schedule for the monitor.

quartz_cron_expression: str

The Quartz cron expression that determines when to run the monitor, e.g., "0 0 12 * * ?" runs daily at noon.

timezone_id: str

The timezone id (e.g., "PST") in which to evaluate the Quartz expression.
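
For example, a sketch of a schedule that refreshes every day at midnight UTC:

  >>> from databricks.lakehouse_monitoring import MonitorCronSchedule
  >>> schedule = MonitorCronSchedule(
          quartz_cron_expression="0 0 0 * * ?",
          timezone_id="UTC")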

class databricks.lakehouse_monitoring.MonitorInfo(table_name: str, profile_type: Union[InferenceLog, TimeSeries, Snapshot], output_schema_name: Optional[str], status: MonitorStatus, profile_metrics_table_name: str, drift_metrics_table_name: str, monitor_version: int, baseline_table_name: Optional[str] = None, schedule: Optional[MonitorCronSchedule] = None, slicing_exprs: Optional[List[str]] = None, custom_metrics: Optional[List[metrics.Metric]] = None, assets_dir: Optional[str] = None, dashboard_id: Optional[str] = None)

Bases: object

All monitor information, i.e. the configuration, assets, and status of the monitor.

assets_dir: Optional[str] = None

The workspace directory that stores monitoring assets, including the dashboard and its associated queries.

baseline_table_name: Optional[str] = None

Baseline table against which to compare the input table. See databricks.lakehouse_monitoring.create_monitor() for further details.

custom_metrics: Optional[List[databricks.data_monitoring.metrics.Metric]] = None

Custom metrics that are computed in addition to the built-in metrics. See databricks.lakehouse_monitoring.Metric for further details.

dashboard_id: Optional[str] = None

Id of the dashboard that visualizes the computed metrics. This can be empty if the monitor is in the PENDING state.

drift_metrics_table_name: str

Name of table holding the drift metrics.

monitor_version: int

Version number of the monitor configuration used in the refresh pipeline.

output_schema_name: Optional[str]

Name of the schema in which to store the output metric tables. This may be None if the output schema is inaccessible after the monitor creation.

profile_metrics_table_name: str

Name of table holding the profile metrics.

profile_type: Union[databricks.data_monitoring.analysis.InferenceLog, databricks.data_monitoring.analysis.TimeSeries, databricks.data_monitoring.analysis.Snapshot]

The type of profiling done on the table. This determines the metrics computed by monitoring. See databricks.lakehouse_monitoring.create_monitor() for further details.

schedule: Optional[databricks.data_monitoring.metadata.MonitorCronSchedule] = None

Schedule on which to run refreshes on the monitor.

slicing_exprs: Optional[List[str]] = None

Expressions that determine data slices on which to compute metrics. See databricks.lakehouse_monitoring.create_monitor() for further details.

status: databricks.data_monitoring.metadata.MonitorStatus

Status of the monitor.

table_name: str

Name of monitored table.

class databricks.lakehouse_monitoring.MonitorStatus(value)

Bases: str, Enum

Tracks the status of an existing monitor.

ACTIVE = 'MONITOR_STATUS_ACTIVE'

The monitor is active and works as expected.

DELETE_PENDING = 'MONITOR_STATUS_DELETE_PENDING'

The monitor is being deleted. Its state might be inconsistent and cannot be used.

ERROR = 'MONITOR_STATUS_ERROR'

An operation on the monitor has resulted in a recoverable error. Details for the error can be found in the monitor information (see databricks.lakehouse_monitoring.get_monitor()). The user may be able to fix the error by updating the monitor.

FAILED = 'MONITOR_STATUS_FAILED'

An operation on the monitor has resulted in a non-recoverable error. Details can be found in the monitor information (see databricks.lakehouse_monitoring.get_monitor()). The user can only delete the monitor and re-create it if needed.

PENDING = 'MONITOR_STATUS_PENDING'

The monitor is being updated. Its state might be inconsistent and cannot be used.

class databricks.lakehouse_monitoring.RefreshInfo(refresh_id: str, state: RefreshState, start_time_ms: int, end_time_ms: Optional[int] = None, message: Optional[str] = None)

Bases: object

Information for a refresh operation of the output metrics.

end_time_ms: Optional[int] = None

Time at which refresh operation completed (milliseconds since 1/1/1970 UTC).

message: Optional[str] = None

An optional message that provides more information about the current status of the refresh.

refresh_id: str

Unique id of the refresh operation.

start_time_ms: int

Time at which refresh operation was initiated (milliseconds since 1/1/1970 UTC).

state: databricks.data_monitoring.metadata.RefreshState

State (databricks.lakehouse_monitoring.RefreshState) of the refresh operation.

class databricks.lakehouse_monitoring.RefreshState(value)

Bases: str, Enum

Different states for a refresh operation.

CANCELED = 'CANCELED'

The refresh was canceled. This is a terminal state.

FAILED = 'FAILED'

An error occurred during refresh. This is a terminal state.

PENDING = 'PENDING'

The refresh has been initiated, but is waiting to run.

RUNNING = 'RUNNING'

The refresh is in progress.

SUCCESS = 'SUCCESS'

The refresh finished successfully without any errors. This is a terminal state.

class databricks.lakehouse_monitoring.Snapshot

Bases: object

Snapshot-based analysis type spec for generic data.

class databricks.lakehouse_monitoring.TimeSeries(timestamp_col: str, granularities: List[str])

Bases: object

Analysis type spec for time series data.

granularities: List[str]

Granularities for aggregating data into time windows based on their timestamp. Currently the following static granularities are supported: {"5 minutes", "30 minutes", "1 hour", "1 day", "<n> week(s)", "1 month", "1 year"}.

timestamp_col: str

Column that contains the timestamp of each row. The column must be one of the following:

  • A TimestampType column, or

  • A column whose values can be converted to timestamps through the pyspark to_timestamp function.
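
A minimal sketch (the column name is hypothetical):

  >>> from databricks.lakehouse_monitoring import TimeSeries
  >>> time_series_spec = TimeSeries(
          timestamp_col="ts",
          granularities=["30 minutes", "1 day"])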

Errors

exception databricks.lakehouse_monitoring.LakehouseMonitoringError

Bases: Exception

An error from the Lakehouse Monitoring service.