Databricks FeatureEngineeringClient

class databricks.feature_engineering.client.FeatureEngineeringClient(*, model_registry_uri: Optional[str] = None)

Bases: object

Client for interacting with the Databricks Feature Engineering in Unity Catalog.

Note

Use Databricks FeatureStoreClient for workspace feature tables in hive metastore

create_feature_serving_endpoint(*, name: str = None, config: databricks.ml_features.entities.feature_serving_endpoint.EndpointCoreConfig = None, **kwargs) → databricks.ml_features.entities.feature_serving_endpoint.FeatureServingEndpoint

Note

Experimental: This function may change or be removed in a future release without warning.

Experimental feature: Creates a Feature Serving Endpoint

Parameters:
  • name – The name of the endpoint. Must only contain alphanumerics and dashes.
  • config – Configuration of the endpoint, including features, workload_size, etc.
create_feature_spec(*, name: str, features: List[Union[databricks.ml_features.entities.feature_lookup.FeatureLookup, databricks.ml_features.entities.feature_function.FeatureFunction]], exclude_columns: Optional[List[str]] = None) → databricks.ml_features.entities.feature_spec_info.FeatureSpecInfo

Creates a feature specification in Unity Catalog. The feature spec can be used for serving features & functions.

Parameters:
  • name – The name of the feature spec.
  • features – List of FeatureLookups and FeatureFunctions to include in the feature spec.
  • exclude_columns – List of columns to drop from the final output.
update_feature_spec(*, name: str, owner: str) → None

Update the owner of a feature spec.

Parameters:
  • name – The name of the feature spec.
  • owner – The new owner of the feature spec.
delete_feature_spec(*, name: str) → None

Delete a feature spec.

Parameters:name – The name of the feature spec.
create_table(*, name: str, primary_keys: Union[str, List[str]], df: Optional[pyspark.sql.dataframe.DataFrame] = None, timeseries_columns: Union[str, List[str], None] = None, partition_columns: Union[str, List[str], None] = None, schema: Optional[pyspark.sql.types.StructType] = None, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None, **kwargs) → databricks.ml_features.entities.feature_table.FeatureTable

Create and return a feature table with the given name and primary keys.

The returned feature table has the given name and primary keys. Uses the provided schema or the inferred schema of the provided df. If df is provided, this data will be saved in a Delta table. Supported data types for features are: IntegerType, LongType, FloatType, DoubleType, StringType, BooleanType, DateType, TimestampType, ShortType, ArrayType, MapType, and BinaryType, and DecimalType.

Parameters:
  • name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
  • primary_keys – The feature table’s primary keys. If multiple columns are required, specify a list of column names, for example ['customer_id', 'region'].
  • df – Data to insert into this feature table. The schema of df will be used as the feature table schema.
  • timeseries_columns

    Columns containing the event time associated with feature value. Timeseries columns should be part of the primary keys. Combined, the timeseries columns and other primary keys of the feature table uniquely identify the feature value for an entity at a point in time.

    Note

    Experimental: This argument may change or be removed in a future release without warning.

  • partition_columns

    Columns used to partition the feature table. If a list is provided, column ordering in the list will be used for partitioning.

    Note

    When choosing partition columns for your feature table, use columns that do not have a high cardinality. An ideal strategy would be such that you expect data in each partition to be at least 1 GB. The most commonly used partition column is a date.

    Additional info: Choosing the right partition columns for Delta tables

  • schema – Feature table schema. Either schema or df must be provided.
  • description – Description of the feature table.
  • tags – Tags to associate with the feature table.
create_training_set(*, df: pyspark.sql.dataframe.DataFrame, feature_lookups: List[Union[databricks.ml_features.entities.feature_lookup.FeatureLookup, databricks.ml_features.entities.feature_function.FeatureFunction]], label: Union[str, List[str], None], exclude_columns: Optional[List[str]] = None, **kwargs) → databricks.ml_features.training_set.TrainingSet

Create a TrainingSet.

Parameters:
  • df – The DataFrame used to join features into.
  • feature_lookups – List of features to use in the TrainingSet. FeatureLookups are joined into the DataFrame, and FeatureFunctions are computed on-demand.
  • label – Names of column(s) in DataFrame that contain training set labels. To create a training set without a label field, i.e. for unsupervised training set, specify label = None.
  • exclude_columns – Names of the columns to drop from the TrainingSet DataFrame.
Returns:

A TrainingSet object.

delete_feature_serving_endpoint(*, name=None, **kwargs) → None

Note

Experimental: This function may change or be removed in a future release without warning.

Experimental feature

delete_feature_table_tag(*, name: str, key: str) → None

Delete the tag associated with the feature table. Deleting a non-existent tag will emit a warning.

Parameters:
  • name – the feature table name.
  • key – the tag key to delete.
drop_online_table(name: str, online_store: databricks.ml_features.online_store_spec.online_store_spec.OnlineStoreSpec) → None

Drop a table in an online store.

This API first attempts to make a call to the online store provider to drop the table. If successful, it then deletes the online store from the feature catalog.

Parameters:
  • name – Name of feature table associated with online store table to drop.
  • online_store – Specification of the online store.

Note

Deleting an online published table can lead to unexpected failures in downstream dependencies. Ensure that the online table being dropped is no longer used for Model Serving feature lookup or any other use cases.

drop_table(*, name: str) → None

Delete the specified feature table. This API also drops the underlying Delta table.

Parameters:name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.

Note

Deleting a feature table can lead to unexpected failures in upstream producers and downstream consumers (models, endpoints, and scheduled jobs). You must delete any existing published online stores separately.

get_feature_serving_endpoint(*, name=None, **kwargs) → databricks.ml_features.entities.feature_serving_endpoint.FeatureServingEndpoint

Note

Experimental: This function may change or be removed in a future release without warning.

Experimental feature

get_table(*, name: str) → databricks.ml_features.entities.feature_table.FeatureTable

Get a feature table’s metadata.

Parameters:name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
log_model(*, model: Any, artifact_path: str, flavor: module, training_set: Optional[databricks.ml_features.training_set.TrainingSet] = None, registered_model_name: Optional[str] = None, await_registration_for: int = 300, infer_input_example: bool = False, **kwargs)

Log an MLflow model packaged with feature lookup information.

Note

The DataFrame returned by TrainingSet.load_df() must be used to train the model. If it has been modified (for example data normalization, add a column, and similar), these modifications will not be applied at inference time, leading to training-serving skew.

Parameters:
  • model – Model to be saved. This model must be capable of being saved by flavor.save_model. See the MLflow Model API.
  • artifact_path – Run-relative artifact path.
  • flavor – MLflow module to use to log the model. flavor should have type ModuleType. The module must have a method save_model, and must support the python_function flavor. For example, mlflow.sklearn, mlflow.xgboost, and similar.
  • training_set – The TrainingSet used to train this model.
  • registered_model_name

    Note

    Experimental: This argument may change or be removed in a future release without warning.

    If given, create a model version under registered_model_name, also creating a registered model if one with the given name does not exist.

  • await_registration_for – Number of seconds to wait for the model version to finish being created and is in READY status. By default, the function waits for five minutes. Specify 0 or None to skip waiting.
  • infer_input_example

    Note

    Experimental: This argument may change or be removed in a future release without warning.

    Automatically log an input example along with the model, using supplied training data. Defaults to False.

Returns:

None

publish_table(*, name: str, online_store: databricks.ml_features.online_store_spec.online_store_spec.OnlineStoreSpec, filter_condition: Optional[str] = None, mode: str = 'merge', streaming: bool = False, checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 minutes'}, features: Union[str, List[str], None] = None) → Optional[pyspark.sql.streaming.query.StreamingQuery]

Publish a feature table to an online store.

Parameters:
  • name – Name of the feature table.
  • online_store – Specification of the online store.
  • filter_condition – A SQL expression using feature table columns that filters feature rows prior to publishing to the online store. For example, "dt > '2020-09-10'". This is analogous to running df.filter or a WHERE condition in SQL on a feature table prior to publishing.
  • mode

    Specifies the behavior when data already exists in this feature table. The only supported mode is "merge", with which the new data will be merged in, under these conditions:

    • If a key exists in the online table but not the offline table, the row in the online table is unmodified.
    • If a key exists in the offline table but not the online table, the offline table row is inserted into the online table.
    • If a key exists in both the offline and the online tables, the online table row will be updated.
  • streaming – If True, streams data to the online store.
  • checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when streaming=True.
  • trigger – If streaming=True, trigger defines the timing of stream data processing. The dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).
  • features

    Specifies the feature column(s) to be published to the online store. The selected features must be a superset of existing online store features. Primary key columns and timestamp key columns will always be published.

    Note

    When features is not set, the whole feature table will be published.

Returns:

If streaming=True, returns a PySpark StreamingQuery, None otherwise.

read_table(*, name: str, **kwargs) → pyspark.sql.dataframe.DataFrame

Read the contents of a feature table.

Parameters:name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
Returns:The feature table contents, or an exception will be raised if this feature table does not exist.
score_batch(*, model_uri: str, df: pyspark.sql.dataframe.DataFrame, result_type: str = 'double') → pyspark.sql.dataframe.DataFrame

Evaluate the model on the provided DataFrame.

Additional features required for model evaluation will be automatically retrieved from feature tables.

The model must have been logged with FeatureEngineeringClient.log_model(), which packages the model with feature metadata. Unless present in df, these features will be looked up from feature tables and joined with df prior to scoring the model.

If a feature is included in df, the provided feature values will be used rather than those stored in feature tables.

For example, if a model is trained on two features account_creation_date and num_lifetime_purchases, as in:

feature_lookups = [
    FeatureLookup(
        table_name = 'trust_and_safety.customer_features',
        feature_name = 'account_creation_date',
        lookup_key = 'customer_id',
    ),
    FeatureLookup(
        table_name = 'trust_and_safety.customer_features',
        feature_name = 'num_lifetime_purchases',
        lookup_key = 'customer_id'
    ),
]

with mlflow.start_run():
    training_set = fe.create_training_set(
        df,
        feature_lookups = feature_lookups,
        label = 'is_banned',
        exclude_columns = ['customer_id']
    )
    ...
      fe.log_model(
        model,
        "model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="example_model"
      )

Then at inference time, the caller of FeatureEngineeringClient.score_batch() must pass a DataFrame that includes customer_id, the lookup_key specified in the FeatureLookups of the training_set. If the DataFrame contains a column account_creation_date, the values of this column will be used in lieu of those in feature tables. As in:

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
    'models:/example_model/1',
    batch_df
)
Parameters:
  • model_uri

    The location, in URI format, of the MLflow model logged using FeatureEngineeringClient.log_model(). One of:

    • runs:/<mlflow_run_id>/run-relative/path/to/model
    • models:/<model_name>/<model_version>
    • models:/<model_name>/<stage>

    For more information about URI schemes, see Referencing Artifacts.

  • df

    The DataFrame to score the model on. Features from feature tables will be joined with df prior to scoring the model. df must:

    1. Contain columns for lookup keys required to join feature data from feature tables, as specified in the feature_spec.yaml artifact.

    2. Contain columns for all source keys required to score the model, as specified in the feature_spec.yaml artifact.

    3. Not contain a column prediction, which is reserved for the model’s predictions. df may contain additional columns.

    Streaming DataFrames are not supported.

  • result_type – The return type of the model. See mlflow.pyfunc.spark_udf() result_type.
Returns:

A DataFrame containing:

  1. All columns of df.
  2. All feature values retrieved from feature tables.
  3. A column prediction containing the output of the model.

set_feature_table_tag(*, name: str, key: str, value: str) → None

Create or update a tag associated with the feature table. If the tag with the corresponding key already exists, its value will be overwritten with the new value.

Parameters:
  • name – the feature table name
  • key – tag key
  • value – tag value
write_table(*, name: str, df: pyspark.sql.dataframe.DataFrame, mode: str = 'merge', checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 seconds'}) → Optional[pyspark.sql.streaming.query.StreamingQuery]

Writes to a feature table.

If the input DataFrame is streaming, will create a write stream.

Parameters:
  • name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
  • df – Spark DataFrame with feature data. Raises an exception if the schema does not match that of the feature table.
  • mode

    There is only one supported write mode:

    • "merge" upserts the rows in df into the feature table. If df contains columns not present in the feature table, these columns will be added as new features.

    If you want to overwrite a table, drop and recreate it.

  • checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when the argument df is a streaming DataFrame.
  • trigger – If df.isStreaming, trigger defines the timing of stream data processing, the dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).
Returns:

If df.isStreaming, returns a PySpark StreamingQuery. None otherwise.