Databricks FeatureStoreClient

class databricks.feature_store.client.FeatureStoreClient(feature_store_uri: Optional[str] = None, model_registry_uri: Optional[str] = None)

Bases: object

Client for interacting with the Databricks Feature Store.

Note: Use the Databricks FeatureEngineeringClient for feature tables in Unity Catalog.
create_table(name: str, primary_keys: Union[str, List[str]], df: Optional[pyspark.sql.dataframe.DataFrame] = None, *, timestamp_keys: Union[str, List[str], None] = None, partition_columns: Union[str, List[str], None] = None, schema: Optional[pyspark.sql.types.StructType] = None, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None, **kwargs) → databricks.ml_features.entities.feature_table.FeatureTable

Create and return a feature table with the given name and primary keys. Uses the provided schema or the inferred schema of the provided df. If df is provided, this data will be saved in a Delta table. Supported data types for features are: IntegerType, LongType, FloatType, DoubleType, StringType, BooleanType, DateType, TimestampType, ShortType, ArrayType, MapType, BinaryType, and DecimalType.

Parameters:
- name – A feature table name. For a workspace-local feature table, the format is <database_name>.<table_name>, for example dev.user_features. For a feature table in Unity Catalog, the format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
- primary_keys – The feature table's primary keys. If multiple columns are required, specify a list of column names, for example ['customer_id', 'region'].
- df – Data to insert into this feature table. The schema of df will be used as the feature table schema.
- timestamp_keys – Columns containing the event time associated with each feature value. Timestamp keys should be part of the primary keys. Combined, the timestamp keys and the other primary keys of the feature table uniquely identify the feature value for an entity at a point in time.
  Note: Experimental: This argument may change or be removed in a future release without warning.
- partition_columns – Columns used to partition the feature table. If a list is provided, the column ordering in the list will be used for partitioning.
  Note: When choosing partition columns for your feature table, use columns that do not have a high cardinality. An ideal strategy is one where you expect the data in each partition to be at least 1 GB. The most commonly used partition column is a date. Additional info: Choosing the right partition columns for Delta tables.
- schema – Feature table schema. Either schema or df must be provided.
- description – Description of the feature table.
- tags – Tags to associate with the feature table.

Other Parameters:
- path (Optional[str]) – Path in a supported filesystem. Defaults to the database location.
  Note: The path argument is not supported for tables in Unity Catalog.
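The combined key requirement above (primary keys plus timestamp keys uniquely identify a feature value for an entity at a point in time) can be illustrated with a small pure-Python sketch. No Spark is involved; check_key_uniqueness and the sample rows are hypothetical, for illustration only:

```python
# Plain-Python sketch of the uniqueness rule behind create_table's keys:
# the primary keys, together with any timestamp keys, must uniquely
# identify each feature row.
from typing import Dict, List, Tuple


def check_key_uniqueness(rows: List[Dict], primary_keys: List[str],
                         timestamp_keys: List[str]) -> Tuple[bool, set]:
    """Return (is_unique, duplicate_keys) for the composite key columns."""
    key_cols = primary_keys + [k for k in timestamp_keys if k not in primary_keys]
    seen, dupes = set(), set()
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key in seen:
            dupes.add(key)
        seen.add(key)
    return (not dupes, dupes)


rows = [
    {"customer_id": 1, "ts": "2023-01-01", "num_purchases": 3},
    {"customer_id": 1, "ts": "2023-01-02", "num_purchases": 4},  # same id, new time: OK
    {"customer_id": 2, "ts": "2023-01-01", "num_purchases": 7},
]
ok, dupes = check_key_uniqueness(rows, ["customer_id"], ["ts"])
```

With the timestamp key included, all three rows are unique; dropping it would make customer 1 a duplicate, which is why timestamp keys are treated as part of the composite key.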
register_table(*, delta_table: str, primary_keys: Union[str, List[str]], timestamp_keys: Union[str, List[str], None] = None, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None) → databricks.ml_features.entities.feature_table.FeatureTable

Register an existing Delta table as a feature table with the given primary keys. This API is not required if the table is already in Unity Catalog and has primary keys. The registered feature table has the same name as the Delta table.

Parameters:
- delta_table – A Delta table name. The table must exist in the metastore. For a workspace-local table, the format is <database_name>.<table_name>, for example dev.user_features. For a table in Unity Catalog, the format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
- primary_keys – The Delta table's primary keys. If multiple columns are required, specify a list of column names, for example ['customer_id', 'region'].
- timestamp_keys – Columns containing the event time associated with each feature value. Timestamp keys should be part of the primary keys. Combined, the timestamp keys and the other primary keys of the feature table uniquely identify the feature value for an entity at a point in time.
- description – Description of the feature table.
- tags – Tags to associate with the feature table.

Returns: A FeatureTable object.
get_table(name: str) → databricks.ml_features.entities.feature_table.FeatureTable

Get a feature table's metadata.

Parameters:
- name – A feature table name. For a workspace-local feature table, the format is <database_name>.<table_name>, for example dev.user_features. For a feature table in Unity Catalog, the format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
drop_table(name: str) → None

Note: Experimental: This function may change or be removed in a future release without warning.

Delete the specified feature table. This API also drops the underlying Delta table.

Parameters:
- name – The feature table name. For a workspace-local feature table, the format is <database_name>.<table_name>, for example dev.user_features. For a feature table in Unity Catalog, the format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.

Note: Deleting a feature table can lead to unexpected failures in upstream producers and downstream consumers (models, endpoints, and scheduled jobs). You must delete any existing published online stores separately.
read_table(name: str, **kwargs) → pyspark.sql.dataframe.DataFrame

Read the contents of a feature table.

Parameters:
- name – A feature table name of the form <database_name>.<table_name>, for example dev.user_features.

Returns: The feature table contents. Raises an exception if the feature table does not exist.
write_table(name: str, df: pyspark.sql.dataframe.DataFrame, mode: str = 'merge', checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 seconds'}) → Optional[pyspark.sql.streaming.query.StreamingQuery]

Writes to a feature table. If the input DataFrame is streaming, will create a write stream.

Parameters:
- name – A feature table name. Raises an exception if this feature table does not exist. For a workspace-local feature table, the format is <database_name>.<table_name>, for example dev.user_features. For a feature table in Unity Catalog, the format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
- df – Spark DataFrame with feature data. Raises an exception if the schema does not match that of the feature table.
- mode – Two supported write modes: "overwrite" updates the whole table, while "merge" upserts the rows in df into the feature table. If df contains columns not present in the feature table, these columns will be added as new features.
- checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when df is a streaming DataFrame.
- trigger – If df.isStreaming, trigger defines the timing of stream data processing; the dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).

Returns: If df.isStreaming, returns a PySpark StreamingQuery; None otherwise.
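The "merge" mode semantics can be sketched in plain Python, treating the feature table and the incoming rows as dicts keyed by primary key. This is an illustrative model, not the Delta implementation; merge_write is hypothetical, and the assumption that existing rows receive None for newly added feature columns is ours:

```python
# Sketch of write_table's "merge" mode: rows in df upsert into the table,
# and columns not previously present are added as new features (existing
# rows are assumed to get None for them).
from typing import Any, Dict, List


def merge_write(table: Dict[Any, Dict], df_rows: List[Dict], primary_key: str) -> None:
    new_cols = set()
    for row in df_rows:
        new_cols |= set(row) - {primary_key}
    existing_cols = {c for r in table.values() for c in r}
    # Columns not present in the feature table are added as new features.
    for col in new_cols - existing_cols:
        for r in table.values():
            r.setdefault(col, None)
    # Upsert each df row by primary key.
    for row in df_rows:
        key = row[primary_key]
        table.setdefault(key, {}).update(
            {c: v for c, v in row.items() if c != primary_key})


table = {1: {"age": 30}, 2: {"age": 41}}
merge_write(table, [{"id": 2, "age": 42, "city": "SF"}], primary_key="id")
# Row 2 is updated in place, and "city" becomes a new feature column.
```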
add_data_sources(*, feature_table_name: str, source_names: Union[str, List[str]], source_type: str = 'custom') → None

Note: Experimental: This function may change or be removed in a future release without warning.

Add data sources to the feature table.

Note: Adding data sources is NOT supported for feature tables in Unity Catalog.

Parameters:
- feature_table_name – The feature table name.
- source_names – Data source names. For multiple sources, specify a list. If a data source name already exists, it is ignored.
- source_type – One of the following: "table": a table in the format <database_name>.<table_name>, stored in the metastore (e.g. Hive); "path": a path, e.g. in the Databricks File System (DBFS); "custom": a manually added data source, neither a table nor a path.
delete_data_sources(*, feature_table_name: str, source_names: Union[str, List[str]]) → None

Note: Experimental: This function may change or be removed in a future release without warning.

Delete data sources from the feature table.

Note: Data sources of all types (table, path, custom) that match the source names will be deleted. Deleting data sources is NOT supported for feature tables in Unity Catalog.

Parameters:
- feature_table_name – The feature table name.
- source_names – Data source names. For multiple sources, specify a list. If a data source name does not exist, it is ignored.
publish_table(name: str, online_store: databricks.ml_features.online_store_spec.online_store_spec.OnlineStoreSpec, *, filter_condition: Optional[str] = None, mode: str = 'merge', streaming: bool = False, checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 minutes'}, features: Union[str, List[str], None] = None) → Optional[pyspark.sql.streaming.query.StreamingQuery]

Publish a feature table to an online store.

Parameters:
- name – Name of the feature table.
- online_store – Specification of the online store.
- filter_condition – A SQL expression using feature table columns that filters feature rows prior to publishing to the online store. For example, "dt > '2020-09-10'". This is analogous to running df.filter or a WHERE condition in SQL on a feature table prior to publishing.
- mode – Specifies the behavior when data already exists in this feature table in the online store. If "overwrite" mode is used, existing data is replaced by the new data. If "merge" mode is used, the new data will be merged in, under these conditions:
  - If a key exists in the online table but not the offline table, the row in the online table is unmodified.
  - If a key exists in the offline table but not the online table, the offline table row is inserted into the online table.
  - If a key exists in both the offline and the online tables, the online table row will be updated.
- streaming – If True, streams data to the online store.
- checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when streaming=True.
- trigger – If streaming=True, trigger defines the timing of stream data processing. The dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).
- features – Specifies the feature column(s) to be published to the online store. The selected features must be a superset of existing online store features. Primary key columns and timestamp key columns will always be published.
  Note: This parameter is only supported when mode="merge". When features is not set, the whole feature table will be published.

Returns: If streaming=True, returns a PySpark StreamingQuery; None otherwise.
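The three "merge" conditions above can be modeled in plain Python, with the online and offline tables as dicts keyed by primary key. This is a semantic sketch only; publish_merge is a hypothetical name, not part of the client API:

```python
# Sketch of publish_table's "merge" mode key handling.
from typing import Any, Dict


def publish_merge(online: Dict[Any, Dict], offline: Dict[Any, Dict]) -> Dict[Any, Dict]:
    merged = {k: dict(v) for k, v in online.items()}  # online-only rows stay unmodified
    for key, row in offline.items():
        if key in merged:
            merged[key].update(row)   # key in both: online row is updated
        else:
            merged[key] = dict(row)   # offline-only key: row is inserted
    return merged


online = {"a": {"f1": 1}, "b": {"f1": 2}}
offline = {"b": {"f1": 20}, "c": {"f1": 30}}
merged = publish_merge(online, offline)
# "a" is unmodified, "b" is updated, "c" is inserted
```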
drop_online_table(name: str, online_store: databricks.ml_features.online_store_spec.online_store_spec.OnlineStoreSpec) → None

Drop a table in an online store. This API first attempts to make a call to the online store provider to drop the table. If successful, it then deletes the online store from the feature catalog.

Parameters:
- name – Name of the feature table associated with the online store table to drop.
- online_store – Specification of the online store.

Note: Deleting an online published table can lead to unexpected failures in downstream dependencies. Ensure that the online table being dropped is no longer used for Model Serving feature lookup or any other use cases.
create_training_set(df: pyspark.sql.dataframe.DataFrame, feature_lookups: List[Union[databricks.ml_features.entities.feature_lookup.FeatureLookup, databricks.ml_features.entities.feature_function.FeatureFunction]], label: Union[str, List[str], None], exclude_columns: Optional[List[str]] = None, **kwargs) → databricks.ml_features.training_set.TrainingSet

Create a TrainingSet.

Parameters:
- df – The DataFrame used to join features into.
- feature_lookups – List of features to use in the TrainingSet. FeatureLookups are joined into the DataFrame, and FeatureFunctions are computed on-demand.
- label – Names of column(s) in the DataFrame that contain training set labels. To create a training set without a label field, i.e. for an unsupervised training set, specify label=None.
- exclude_columns – Names of the columns to drop from the TrainingSet DataFrame.

Returns: A TrainingSet object.
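When a feature table has timestamp keys, a FeatureLookup performs a point-in-time join: each training row is assumed to receive the most recent feature value whose timestamp is at or before the row's own timestamp. The sketch below models that semantics in plain Python; as_of_lookup and the sample data are hypothetical, and the as-of rule is our reading of the timestamp-key description above, not code from the client:

```python
# Pure-Python sketch of a point-in-time (as-of) feature lookup.
from typing import Dict, List, Optional


def as_of_lookup(feature_rows: List[Dict], key, ts, lookup_key: str,
                 ts_key: str, feature: str) -> Optional[int]:
    """Latest feature value for `key` with timestamp at or before `ts`."""
    candidates = [r for r in feature_rows
                  if r[lookup_key] == key and r[ts_key] <= ts]
    if not candidates:
        return None
    return max(candidates, key=lambda r: r[ts_key])[feature]


features = [
    {"customer_id": 1, "ts": "2023-01-01", "num_purchases": 3},
    {"customer_id": 1, "ts": "2023-01-05", "num_purchases": 5},
]
# A training row dated 2023-01-03 sees the value as of 2023-01-01.
val = as_of_lookup(features, 1, "2023-01-03", "customer_id", "ts", "num_purchases")
```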
log_model(model: Any, artifact_path: str, *, flavor: module, training_set: Optional[databricks.ml_features.training_set.TrainingSet] = None, registered_model_name: Optional[str] = None, await_registration_for: int = 300, infer_input_example: bool = False, **kwargs)

Log an MLflow model packaged with feature lookup information.

Note: The DataFrame returned by TrainingSet.load_df() must be used to train the model. If it has been modified (for example by data normalization, adding a column, and similar), these modifications will not be applied at inference time, leading to training-serving skew.

Parameters:
- model – Model to be saved. This model must be capable of being saved by flavor.save_model. See the MLflow Model API.
- artifact_path – Run-relative artifact path.
- flavor – MLflow module to use to log the model. flavor should have type ModuleType. The module must have a method save_model and must support the python_function flavor. For example, mlflow.sklearn, mlflow.xgboost, and similar.
- training_set – The TrainingSet used to train this model.
- registered_model_name –
  Note: Experimental: This argument may change or be removed in a future release without warning.
  If given, create a model version under registered_model_name, also creating a registered model if one with the given name does not exist.
- await_registration_for – Number of seconds to wait for the model version to finish being created and reach READY status. By default, the function waits for five minutes. Specify 0 or None to skip waiting.
- infer_input_example –
  Note: Experimental: This argument may change or be removed in a future release without warning.
  Automatically log an input example along with the model, using supplied training data. Defaults to False.
score_batch(model_uri: str, df: pyspark.sql.dataframe.DataFrame, result_type: str = 'double') → pyspark.sql.dataframe.DataFrame

Evaluate the model on the provided DataFrame.

Additional features required for model evaluation will be automatically retrieved from Feature Store.

The model must have been logged with FeatureStoreClient.log_model(), which packages the model with feature metadata. Unless present in df, these features will be looked up from Feature Store and joined with df prior to scoring the model.

If a feature is included in df, the provided feature values will be used rather than those stored in Feature Store.

For example, if a model is trained on two features, account_creation_date and num_lifetime_purchases, as in:

    feature_lookups = [
        FeatureLookup(
            table_name='trust_and_safety.customer_features',
            feature_name='account_creation_date',
            lookup_key='customer_id',
        ),
        FeatureLookup(
            table_name='trust_and_safety.customer_features',
            feature_name='num_lifetime_purchases',
            lookup_key='customer_id',
        ),
    ]

    with mlflow.start_run():
        training_set = fs.create_training_set(
            df,
            feature_lookups=feature_lookups,
            label='is_banned',
            exclude_columns=['customer_id'],
        )
        ...
        fs.log_model(
            model,
            "model",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="example_model",
        )

Then at inference time, the caller of FeatureStoreClient.score_batch() must pass a DataFrame that includes customer_id, the lookup_key specified in the FeatureLookups of the training_set. If the DataFrame contains a column account_creation_date, the values of this column will be used in lieu of those in Feature Store. As in:

    # batch_df has columns ['customer_id', 'account_creation_date']
    predictions = fs.score_batch(
        'models:/example_model/1',
        batch_df,
    )

Parameters:
- model_uri – The location, in URI format, of the MLflow model logged using FeatureStoreClient.log_model(). One of:
  - runs:/<mlflow_run_id>/run-relative/path/to/model
  - models:/<model_name>/<model_version>
  - models:/<model_name>/<stage>
  For more information about URI schemes, see Referencing Artifacts.
- df – The DataFrame to score the model on. Feature Store features will be joined with df prior to scoring the model. df must:
  1. Contain columns for lookup keys required to join feature data from Feature Store, as specified in the feature_spec.yaml artifact.
  2. Contain columns for all source keys required to score the model, as specified in the feature_spec.yaml artifact.
  3. Not contain a column prediction, which is reserved for the model's predictions.
  df may contain additional columns. Streaming DataFrames are not supported.
- result_type – The return type of the model. See mlflow.pyfunc.spark_udf() result_type.

Returns: A DataFrame containing:
- All columns of df.
- All feature values retrieved from Feature Store.
- A column prediction containing the output of the model.
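The override rule above (features present in df take precedence over stored values) can be sketched per-row in plain Python. resolve_features and the sample data are hypothetical, for illustration only:

```python
# Sketch of score_batch's feature resolution: values already present in the
# scoring DataFrame are used as-is; missing ones fall back to Feature Store.
from typing import Dict, List


def resolve_features(batch_row: Dict, store_row: Dict, required: List[str]) -> Dict:
    resolved = {}
    for feat in required:
        if feat in batch_row:
            resolved[feat] = batch_row[feat]   # df-provided value wins
        else:
            resolved[feat] = store_row[feat]   # look up from Feature Store
    return resolved


store = {"account_creation_date": "2020-01-01", "num_lifetime_purchases": 12}
batch = {"customer_id": 7, "account_creation_date": "2021-06-30"}
row = resolve_features(batch, store,
                       ["account_creation_date", "num_lifetime_purchases"])
# account_creation_date comes from the batch, num_lifetime_purchases from the store
```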
set_feature_table_tag(*, table_name: str, key: str, value: str) → None

Create or update a tag associated with the feature table. If a tag with the given key already exists, its value will be overwritten with the new value.

Parameters:
- table_name – The feature table name.
- key – The tag key.
- value – The tag value.
delete_feature_table_tag(*, table_name: str, key: str) → None

Delete the tag associated with the feature table. Deleting a non-existent tag will emit a warning.

Parameters:
- table_name – The feature table name.
- key – The tag key to delete.
create_feature_serving_endpoint(*, name: str = None, config: databricks.ml_features.entities.feature_serving_endpoint.EndpointCoreConfig = None) → databricks.ml_features.entities.feature_serving_endpoint.FeatureServingEndpoint

Note: Experimental: This function may change or be removed in a future release without warning.

Experimental feature: Creates a Feature Serving Endpoint.

Parameters:
- name – The name of the endpoint. Must only contain alphanumerics and dashes.
- config – Configuration of the endpoint, including features, workload_size, etc.
get_feature_serving_endpoint(*, name=None, **kwargs) → databricks.ml_features.entities.feature_serving_endpoint.FeatureServingEndpoint

Note: Experimental: This function may change or be removed in a future release without warning.

Experimental feature.
delete_feature_serving_endpoint(*, name=None, **kwargs) → None

Note: Experimental: This function may change or be removed in a future release without warning.

Experimental feature.
create_feature_spec(*, name: str, features: List[Union[databricks.ml_features.entities.feature_lookup.FeatureLookup, databricks.ml_features.entities.feature_function.FeatureFunction]], exclude_columns: Optional[List[str]] = None) → databricks.ml_features.entities.feature_spec_info.FeatureSpecInfo

Note: Experimental: This function may change or be removed in a future release without warning.

Experimental feature: Creates a feature specification in Unity Catalog. The feature spec can be used for serving features & functions.

Parameters:
- name – The name of the feature spec.
- features – List of FeatureLookups and FeatureFunctions to include in the feature spec.
- exclude_columns – List of columns to drop from the final output.