databricks.lakehouse_monitoring package
Functions
- databricks.lakehouse_monitoring.cancel_refresh(*, table_name: str, refresh_id: str) None
Cancel a specific refresh.
- Parameters
table_name – Name of the monitored table.
refresh_id – The id of the refresh.
- Returns
None
- Raises
LakehouseMonitoringError – if the refresh is already in a terminal state (e.g., failed or succeeded) or the refresh/monitor does not exist.
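For example, a minimal sketch of canceling an in-flight refresh (the table name is hypothetical):
>>> import databricks.lakehouse_monitoring as lm
>>> refresh = lm.run_refresh(table_name="main.default.sales")
>>> try:
...     lm.cancel_refresh(
...         table_name="main.default.sales",
...         refresh_id=refresh.refresh_id,
...     )
... except lm.LakehouseMonitoringError as err:
...     # Raised if the refresh already reached a terminal state.
...     print(err)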
- databricks.lakehouse_monitoring.create_monitor(*, table_name: str, profile_type: Union[InferenceLog, TimeSeries, Snapshot], output_schema_name: str, baseline_table_name: Optional[str] = None, slicing_exprs: Optional[Iterable[str]] = None, custom_metrics: Optional[Iterable[Metric]] = None, schedule: Optional[MonitorCronSchedule] = None, assets_dir: Optional[str] = None, skip_builtin_dashboard: bool = False, warehouse_id: Optional[str] = None) MonitorInfo
Create a monitor asynchronously on a table (or view).
- Parameters
table_name – Name of the table (or view) in Unity Catalog to be monitored. The name can take any of the following formats: "{catalog}.{schema}.{table}", "{schema}.{table}", or "{table}". If the catalog or schema is not included, it is inferred from the current catalog and schema.
profile_type – A databricks.lakehouse_monitoring.InferenceLog, databricks.lakehouse_monitoring.TimeSeries, or databricks.lakehouse_monitoring.Snapshot object, which determines the type of metrics to be computed/monitored.
output_schema_name – Name of the schema in which to create output tables. This can be in either "{schema}" or "{catalog}.{schema}" format.
baseline_table_name – Name of a table (or view) containing baseline data for comparison. The baseline table is expected to match the schema of the monitored table (except for timestamps where applicable). If columns are missing on either side, monitoring will use best-effort heuristics to compute the output metrics.
slicing_exprs – List of column expressions to slice data with for targeted analysis. The data is grouped by each expression independently, resulting in a separate slice for each predicate and its complement. For example, slicing_exprs=["col_1", "col_2 > 10"] will generate the following slices: two slices for col_2 > 10 (True and False), and one slice per unique value in col_1.
custom_metrics – A list of databricks.lakehouse_monitoring.Metric instances, representing custom metrics that will be computed and written to the output tables.
schedule – A databricks.lakehouse_monitoring.MonitorCronSchedule object that dictates the frequency at which the monitor refreshes its metrics.
assets_dir – The absolute path to a user-configurable workspace directory for storing monitoring assets. If provided, the assets will be created under {assets_dir}/{table_name}; otherwise, they will be stored under the default directory /Users/{user_name}/databricks_lakehouse_monitoring/{table_name}. Note that this directory can exist anywhere, including /Shared/ or other locations outside of the /Users/ directory; this can be useful when configuring a production monitor intended to be shared within an organization.
skip_builtin_dashboard – If True, the built-in dashboard will not be created.
warehouse_id – The ID of the warehouse to use for dashboard creation. If no ID is provided, the first running warehouse will be used. Cannot be set if skip_builtin_dashboard is set to True.
- Returns
The info of the pending monitor.
- Return type
databricks.lakehouse_monitoring.MonitorInfo
- Raises
LakehouseMonitoringError – Table is not in UC
LakehouseMonitoringError – Monitor already exists on table
LakehouseMonitoringError – Input arguments fail validation
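A minimal sketch of creating a time-series monitor (the table, output schema, and column names are hypothetical):
>>> import databricks.lakehouse_monitoring as lm
>>> info = lm.create_monitor(
...     table_name="main.default.sales",
...     profile_type=lm.TimeSeries(
...         timestamp_col="event_ts",
...         granularities=["1 day"],
...     ),
...     output_schema_name="main.monitoring",
...     slicing_exprs=["region"],  # one slice per unique region value
... )
>>> info.status  # PENDING until the monitor assets are created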
- databricks.lakehouse_monitoring.delete_monitor(*, table_name: str) None
Delete a monitor on the given table.
Note
The monitor assets (profile tables and dashboard) are not deleted by this method.
- Parameters
table_name – The name of the monitored table.
- Returns
None
- Raises
LakehouseMonitoringError – The monitor does not exist.
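A minimal sketch (hypothetical table name); per the note above, the profile tables and dashboard remain afterwards:
>>> import databricks.lakehouse_monitoring as lm
>>> lm.delete_monitor(table_name="main.default.sales")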
- databricks.lakehouse_monitoring.get_monitor(*, table_name: str) MonitorInfo
Retrieve info for a monitor.
- Parameters
table_name – The name of the monitored table.
- Returns
Information about the monitor.
- Return type
databricks.lakehouse_monitoring.MonitorInfo
- Raises
LakehouseMonitoringError – The monitor does not exist, or an error occurs while reading its information from the Lakehouse Monitoring service.
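For example (hypothetical table name):
>>> import databricks.lakehouse_monitoring as lm
>>> info = lm.get_monitor(table_name="main.default.sales")
>>> print(info.status, info.profile_metrics_table_name)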
- databricks.lakehouse_monitoring.get_refresh(*, table_name: str, refresh_id: str) RefreshInfo
Retrieve information for a specific refresh.
- Parameters
table_name – Name of the monitored table.
refresh_id – The id of the refresh.
- Returns
Information about the refresh.
- Return type
databricks.lakehouse_monitoring.RefreshInfo
- Raises
LakehouseMonitoringError – if there is an error retrieving the information
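A minimal sketch that looks up a refresh started earlier (hypothetical table name):
>>> import databricks.lakehouse_monitoring as lm
>>> refresh = lm.run_refresh(table_name="main.default.sales")
>>> latest = lm.get_refresh(
...     table_name="main.default.sales",
...     refresh_id=refresh.refresh_id,
... )
>>> latest.state  # e.g., RefreshState.RUNNING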
- databricks.lakehouse_monitoring.list_refreshes(*, table_name: str) List[RefreshInfo]
Retrieve information on the most recent refreshes.
- Parameters
table_name – Name of the monitored table.
- Returns
Information about the most recent refreshes.
- Return type
List[databricks.lakehouse_monitoring.RefreshInfo]
- Raises
LakehouseMonitoringError – if there is an error retrieving the information
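For example, printing the recent refresh history of a (hypothetical) table:
>>> import databricks.lakehouse_monitoring as lm
>>> for refresh in lm.list_refreshes(table_name="main.default.sales"):
...     print(refresh.refresh_id, refresh.state, refresh.start_time_ms)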
- databricks.lakehouse_monitoring.run_refresh(*, table_name: str) RefreshInfo
Queue a metric refresh for the input table. The refresh will execute in the background.
- Parameters
table_name – Name of the monitored table.
- Returns
Information for the refresh operation.
- Return type
databricks.lakehouse_monitoring.RefreshInfo
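A sketch of triggering a refresh and polling until it reaches a terminal state (the table name and poll interval are arbitrary):
>>> import time
>>> import databricks.lakehouse_monitoring as lm
>>> terminal_states = {
...     lm.RefreshState.SUCCESS,
...     lm.RefreshState.FAILED,
...     lm.RefreshState.CANCELED,
... }
>>> refresh = lm.run_refresh(table_name="main.default.sales")
>>> while refresh.state not in terminal_states:
...     time.sleep(30)  # arbitrary poll interval
...     refresh = lm.get_refresh(
...         table_name="main.default.sales",
...         refresh_id=refresh.refresh_id,
...     )
>>> print(refresh.state, refresh.message)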
- databricks.lakehouse_monitoring.update_monitor(*, table_name: str, updated_params: Mapping[str, Any]) MonitorInfo
Update the parameters of an existing monitor on a table.
Note
Monitoring metrics will reflect the new configuration the next time a refresh runs. See databricks.lakehouse_monitoring.run_refresh() if you want to manually trigger a refresh.
- Parameters
table_name – Name of the monitored table.
updated_params –
Dict of updated parameters and their new values. Any parameter set to None will be cleared in the configuration. The following parameters are recognized:
'profile_type': one of databricks.lakehouse_monitoring.InferenceLog, databricks.lakehouse_monitoring.TimeSeries, or databricks.lakehouse_monitoring.Snapshot
'baseline_table_name': a non-empty table name or None
'slicing_exprs': a non-empty Iterable[str] or None
'custom_metrics': a non-empty Iterable of databricks.lakehouse_monitoring.Metric or None
'schedule': a non-empty databricks.lakehouse_monitoring.MonitorCronSchedule or None
'output_schema_name': the two-level name of the schema in which to store the output metric tables
'dashboard_id': a string with the id of the dashboard that should be associated with the monitor
- Returns
Updated information about the monitor.
- Return type
databricks.lakehouse_monitoring.MonitorInfo
- Raises
LakehouseMonitoringError – Monitor does not exist
LakehouseMonitoringError – Caller does not have permissions to update
LakehouseMonitoringError – The parameters are invalid
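A minimal sketch that swaps in a new schedule and clears the slicing expressions (hypothetical table name); the change takes effect at the next refresh:
>>> import databricks.lakehouse_monitoring as lm
>>> info = lm.update_monitor(
...     table_name="main.default.sales",
...     updated_params={
...         "schedule": lm.MonitorCronSchedule(
...             quartz_cron_expression="0 0 6 * * ?",  # daily at 06:00
...             timezone_id="America/Los_Angeles",
...         ),
...         "slicing_exprs": None,  # None clears the setting
...     },
... )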
Classes
- class databricks.lakehouse_monitoring.InferenceLog(timestamp_col: str, granularities: List[str], model_id_col: str, problem_type: str, prediction_col: str, label_col: Optional[str] = None, prediction_proba_col: Optional[str] = None)
Bases:
object
Analysis type spec for ML inference data.
- granularities: List[str]
Granularities for aggregating data into time windows based on their timestamp. Currently the following static granularities are supported: {"5 minutes", "30 minutes", "1 hour", "1 day", "<n> week(s)", "1 month", "1 year"}.
- label_col: Optional[str] = None
Optional column that contains the ground-truth for the prediction.
- model_id_col: str
Column that contains the id of the model generating the predictions. Metrics will be computed per model id by default, and also across all model ids.
- prediction_col: str
Column that contains the output/prediction from the model.
- prediction_proba_col: Optional[str] = None
Optional column that contains the prediction probabilities for each class in a classification problem type. The values in this column should be a map, mapping each class label to the prediction probability for a given sample. The map should be of PySpark MapType().
- problem_type: str
One of "regression" or "classification". Determines the type of model-quality metrics that will be computed.
- timestamp_col: str
Column that contains the timestamps of requests. The column must be one of the following:
A TimestampType column, or
A column whose values can be converted to timestamps through the pyspark to_timestamp function.
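A minimal sketch of an InferenceLog spec for a classification model (the column names are hypothetical):
>>> import databricks.lakehouse_monitoring as lm
>>> inference_spec = lm.InferenceLog(
...     timestamp_col="request_ts",
...     granularities=["1 day"],
...     model_id_col="model_version",
...     problem_type="classification",
...     prediction_col="prediction",
...     label_col="label",  # optional ground truth
... )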
- class databricks.lakehouse_monitoring.Metric(type: str, name: str, input_columns: Union[List[str], str], definition: str, output_data_type: DataType)
Bases:
object
Definition of a custom metric. Examples:
An aggregate metric that only applies to column f1. It computes the average of the log of the target column:
>>> from pyspark.sql import types as T
>>> agg_metric = Metric(
...     type="aggregate",
...     name="avg_log",
...     input_columns=["f1"],
...     definition="avg(log(`{{input_column}}`))",
...     output_data_type=T.DoubleType(),
... )
An aggregate metric that is calculated using multiple columns. It computes the average of the difference between columns f1 and f2:
>>> agg_metric = Metric(
...     type="aggregate",
...     name="avg_diff_f1_f2",
...     input_columns=[":table"],
...     definition="avg(f1 - f2)",
...     output_data_type=T.DoubleType(),
... )
A derived metric that depends on metric avg_log. It computes the exponential of the avg_log metric:
>>> derived_metric = Metric(
...     type="derived",
...     name="exp_avg_log",
...     input_columns=["f1"],
...     definition="exp(avg_log)",
...     output_data_type=T.DoubleType(),
... )
A drift metric that depends on metric avg_log. It computes the difference between the avg_log metric across the baseline and input tables, or across two consecutive time windows:
>>> drift_metric = Metric(
...     type="drift",
...     name="avg_log_delta",
...     input_columns=["f1"],
...     definition="{{current_df}}.avg_log - {{base_df}}.avg_log",
...     output_data_type=T.DoubleType(),
... )
- definition: str
A SQL expression jinja template that defines the computation of the metric. Supported template parameters:
{{input_column}}: name of the column on which the metric is computed
{{prediction_col}}: name of the ML model prediction column
{{label_col}}: name of the ML ground-truth label column
{{current_df}} / {{base_df}}: aliases of the datasets being compared for drift metrics. For BASELINE drift, {{base_df}} represents the baseline data; for CONSECUTIVE drift, {{base_df}} represents data from the previous time window.
- input_columns: Union[List[str], str]
A list of column names in the input table the metric should be computed for. Can use ":table" to indicate that the metric needs information from multiple columns.
- name: str
Name of the metric in the output tables.
- output_data_type: pyspark.sql.types.DataType
Spark datatype of the metric output.
- type: str
Can only be one of "aggregate", "derived", or "drift". The "aggregate" and "derived" metrics are computed on a single table, whereas the "drift" metrics compare metrics across the baseline and input tables, or across two consecutive time windows.
aggregate metrics: only depend on the existing columns in your table
derived metrics: depend on previously computed aggregate metrics
drift metrics: depend on previously computed aggregate or derived metrics
- class databricks.lakehouse_monitoring.MonitorCronSchedule(quartz_cron_expression: str, timezone_id: str)
Bases:
object
The refresh schedule for the monitor.
- quartz_cron_expression: str
The Quartz cron expression that determines when to run the monitor; see the example below.
- timezone_id: str
The timezone id (e.g., "PST") in which to evaluate the quartz expression.
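For example, a schedule that refreshes the monitor daily at 06:00 US Pacific time (Quartz expressions use the field order: seconds, minutes, hours, day-of-month, month, day-of-week):
>>> import databricks.lakehouse_monitoring as lm
>>> schedule = lm.MonitorCronSchedule(
...     quartz_cron_expression="0 0 6 * * ?",
...     timezone_id="America/Los_Angeles",
... )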
- class databricks.lakehouse_monitoring.MonitorInfo(table_name: str, profile_type: Union[InferenceLog, TimeSeries, Snapshot], output_schema_name: Optional[str], status: MonitorStatus, profile_metrics_table_name: str, drift_metrics_table_name: str, monitor_version: int, baseline_table_name: Optional[str] = None, schedule: Optional[MonitorCronSchedule] = None, slicing_exprs: Optional[List[str]] = None, custom_metrics: Optional[List[metrics.Metric]] = None, assets_dir: Optional[str] = None, dashboard_id: Optional[str] = None)
Bases:
object
All monitor information, i.e. the configuration, assets, and status of the monitor.
- assets_dir: Optional[str] = None
The workspace directory that stores monitoring assets, including the dashboard and its associated queries.
- baseline_table_name: Optional[str] = None
Baseline table against which to compare the input table. See
databricks.lakehouse_monitoring.create_monitor()
for further details.
- custom_metrics: Optional[List[databricks.data_monitoring.metrics.Metric]] = None
Custom metrics that are computed in addition to the built-in metrics. See
databricks.lakehouse_monitoring.Metric
for further details.
- dashboard_id: Optional[str] = None
Id of dashboard that visualizes the computed metrics. This can be empty if the monitor is in PENDING state.
- drift_metrics_table_name: str
Name of table holding the drift metrics.
- monitor_version: int
Version number of the monitor configuration used in the refresh pipeline.
- output_schema_name: Optional[str]
Name of the schema in which to store the output metric tables. This may be None if the output schema is inaccessible after the monitor creation.
- profile_metrics_table_name: str
Name of table holding the profile metrics.
- profile_type: Union[databricks.data_monitoring.analysis.InferenceLog, databricks.data_monitoring.analysis.TimeSeries, databricks.data_monitoring.analysis.Snapshot]
The type of profiling done on the table. This determines the metrics computed by monitoring. See
databricks.lakehouse_monitoring.create_monitor()
for further details.
- schedule: Optional[databricks.data_monitoring.metadata.MonitorCronSchedule] = None
Schedule on which to run refreshes on the monitor.
- slicing_exprs: Optional[List[str]] = None
Expressions that determine data slices on which to compute metrics. See
databricks.lakehouse_monitoring.create_monitor()
for further details.
- status: databricks.data_monitoring.metadata.MonitorStatus
Status of the monitor.
- table_name: str
Name of monitored table.
- class databricks.lakehouse_monitoring.MonitorStatus(value)
Bases: str, Enum
Tracks the status of an existing monitor.
- ACTIVE = 'MONITOR_STATUS_ACTIVE'
The monitor is active and works as expected
- DELETE_PENDING = 'MONITOR_STATUS_DELETE_PENDING'
The monitor is being deleted. Its state might be inconsistent, and the monitor cannot be used.
- ERROR = 'MONITOR_STATUS_ERROR'
An operation on the monitor has resulted in a recoverable error. Details for the error can be found in the monitor information (see
databricks.lakehouse_monitoring.get_monitor()
). The user may be able to fix the error by updating the monitor.
- FAILED = 'MONITOR_STATUS_FAILED'
An operation on the monitor has resulted in a non-recoverable error. Details can be found in the monitor information (see
databricks.lakehouse_monitoring.get_monitor()
). The user can only delete the monitor and re-create it if needed.
- PENDING = 'MONITOR_STATUS_PENDING'
The monitor is being updated. Its state might be inconsistent, and the monitor cannot be used.
- class databricks.lakehouse_monitoring.RefreshInfo(refresh_id: str, state: RefreshState, start_time_ms: int, end_time_ms: Optional[int] = None, message: Optional[str] = None)
Bases:
object
Information for a refresh operation of the output metrics.
- end_time_ms: Optional[int] = None
Time at which refresh operation completed (milliseconds since 1/1/1970 UTC).
- message: Optional[str] = None
An optional message that provides more information about the current status of the refresh.
- refresh_id: str
Unique id of the refresh operation.
- start_time_ms: int
Time at which refresh operation was initiated (milliseconds since 1/1/1970 UTC).
- state: databricks.data_monitoring.metadata.RefreshState
State (
databricks.lakehouse_monitoring.RefreshState
) of the refresh operation.
- class databricks.lakehouse_monitoring.RefreshState(value)
Bases: str, Enum
Different states for a refresh operation.
- CANCELED = 'CANCELED'
The refresh was canceled. This is a terminal state.
- FAILED = 'FAILED'
An error occurred during refresh. This is a terminal state.
- PENDING = 'PENDING'
The refresh has been initiated, but is waiting to run.
- RUNNING = 'RUNNING'
The refresh is in progress.
- SUCCESS = 'SUCCESS'
The refresh finished successfully without any errors. This is a terminal state.
- class databricks.lakehouse_monitoring.Snapshot
Bases:
object
Snapshot-based analysis type spec for generic data.
- class databricks.lakehouse_monitoring.TimeSeries(timestamp_col: str, granularities: List[str])
Bases:
object
Analysis type spec for time series data.
- granularities: List[str]
Granularities for aggregating data into time windows based on their timestamp. Currently the following static granularities are supported: {"5 minutes", "30 minutes", "1 hour", "1 day", "<n> week(s)", "1 month", "1 year"}.
- timestamp_col: str
Column that contains the timestamp of each row. The column must be one of the following:
A TimestampType column, or
A column whose values can be converted to timestamps through the pyspark to_timestamp function.
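A minimal sketch (hypothetical column name):
>>> import databricks.lakehouse_monitoring as lm
>>> ts_spec = lm.TimeSeries(
...     timestamp_col="event_ts",
...     granularities=["1 hour", "1 day"],
... )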
Errors
- exception databricks.lakehouse_monitoring.LakehouseMonitoringError
Bases:
Exception
An error from the Lakehouse Monitoring service.