Databricks FeatureEngineeringClient
- class databricks.feature_engineering.client.FeatureEngineeringClient(*, model_registry_uri: Optional[str] = None)
Bases: object
Client for interacting with Databricks Feature Engineering in Unity Catalog.
Note
Use Databricks FeatureStoreClient for workspace feature tables in the Hive metastore.
- create_feature(*, source: DataSource, function: Union[AggregationFunction, ColumnSelection], catalog_name: str, schema_name: str, entity: List[str], timeseries_column: str, name: Optional[str] = None, description: Optional[str] = None) Feature
Note
Experimental: This function may change or be removed in a future release without warning.
Create a Feature instance with comprehensive validation.
- Parameters
source – Required. The data source for this feature (DeltaTableSource, KafkaSource, DataFrameSource, or RequestSource).
function – Required. The function to apply: an AggregationFunction (carrying its inputs and time window) or a ColumnSelection. Input column names containing periods are interpreted as struct field nesting (e.g., “value.amount” accesses the “amount” field in the “value” struct). Literal periods in column names are not supported.
catalog_name – Required. The catalog name for the feature.
schema_name – Required. The schema name for the feature.
entity – Required for all feature types. List of entity column names that act as the lookup/aggregation keys.
timeseries_column – Required. The timeseries column name used for point-in-time joins, backfills, and aggregations.
name – Optional. The name of the feature.
description – Optional. A description of the feature.
- Returns
A validated Feature instance
- Raises
ValueError – If any validation fails
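The struct-nesting rule for input column names can be modeled in plain Python. This is a minimal sketch, not the library's implementation. resolve_input_column is a hypothetical helper, and a nested dict stands in for a Spark struct column. Because every period is read as a nesting separator, a top-level column whose name literally contains a period cannot be addressed.

```python
from typing import Any, Mapping


def resolve_input_column(row: Mapping[str, Any], column: str) -> Any:
    """Hypothetical helper: follow a dotted input column name through nested structs.

    Every period is treated as a struct-nesting separator, so "value.amount"
    reads the "amount" field of the "value" struct.
    """
    current: Any = row
    for part in column.split("."):
        if not isinstance(current, Mapping) or part not in current:
            raise KeyError(f"cannot resolve {column!r}: no field {part!r}")
        current = current[part]
    return current


# A Kafka-style record whose payload is a struct named "value" (illustrative data).
record = {"key": "user_1", "value": {"amount": 42.5, "currency": "USD"}}
print(resolve_input_column(record, "value.amount"))  # 42.5

# A top-level column literally named "value.amount" cannot be reached.
try:
    resolve_input_column({"value.amount": 1.0}, "value.amount")
except KeyError as err:
    print("not supported:", err)
```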
- register_feature(*, feature: Feature, catalog_name: Optional[str] = None, schema_name: Optional[str] = None) Feature
Note
Experimental: This function may change or be removed in a future release without warning.
Register a Feature constructed via Feature(...) in Unity Catalog. Features with DataFrameSource cannot be registered.
- Parameters
feature – A Feature instance constructed via Feature(...).
catalog_name – The Unity Catalog catalog name to register under. Optional if the Feature already has catalog_name set.
schema_name – The Unity Catalog schema name to register under. Optional if the Feature already has schema_name set.
- Returns
The registered Feature with server-populated fields.
- Raises
ValueError – If the feature source is DataFrameSource, or if catalog/schema names conflict between the Feature and the arguments.
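The documented checks on register_feature can be summarized as a small pure-Python model. The helper resolve_registration_target and its string source_type argument are hypothetical. The error raised when neither the Feature nor the arguments supply a name is an assumption, because the reference above does not specify it.

```python
from typing import Optional, Tuple


def resolve_registration_target(
    source_type: str,
    feature_catalog: Optional[str],
    feature_schema: Optional[str],
    catalog_name: Optional[str] = None,
    schema_name: Optional[str] = None,
) -> Tuple[str, str]:
    """Hypothetical model of register_feature's documented argument checks."""
    if source_type == "DataFrameSource":
        raise ValueError("Features with DataFrameSource cannot be registered")

    def pick(field: str, on_feature: Optional[str], argument: Optional[str]) -> str:
        # Both set but different: the documented catalog/schema conflict.
        if on_feature and argument and on_feature != argument:
            raise ValueError(f"{field} conflict: {on_feature!r} vs {argument!r}")
        value = argument or on_feature
        if not value:
            # Assumption: the exact error for a missing name is not documented.
            raise ValueError(f"{field} must be set on the Feature or passed in")
        return value

    return (
        pick("catalog_name", feature_catalog, catalog_name),
        pick("schema_name", feature_schema, schema_name),
    )


print(resolve_registration_target("DeltaTableSource", None, None, "ml", "dev"))  # ('ml', 'dev')
```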
- is_beta_feature_view(*, full_name: str) bool
Note
Experimental: This function may change or be removed in a future release without warning.
Check whether a Feature View is a beta feature.
Beta Feature Views (formerly “declarative features”) are represented as both a Feature object and a UC function of the same name. Public Preview features exist only as a Feature object.
- param full_name
The fully qualified feature name in <catalog>.<schema>.<feature_name> format.
- return
True if the feature is registered and a feature-typed UC function of the same name exists (beta), False otherwise (Public Preview).
- raises NotFound
If the feature is not registered in Unity Catalog.
- raises ValueError
If full_name is not a non-empty string.
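The classification described above reduces to two set-membership tests. The sketch below is a hypothetical model, not the client's code. registered_features and feature_functions stand in for the feature registry and the feature-typed UC functions, and NotFound is a local stand-in exception.

```python
from typing import AbstractSet


class NotFound(Exception):
    """Local stand-in for the NotFound error described above."""


def is_beta_feature_view(
    full_name: object,
    registered_features: AbstractSet[str],
    feature_functions: AbstractSet[str],
) -> bool:
    """Hypothetical model: classify a feature from two sets of full names."""
    if not isinstance(full_name, str) or not full_name:
        raise ValueError("full_name must be a non-empty string")
    if full_name not in registered_features:
        raise NotFound(full_name)
    # Beta if a feature-typed UC function with the same name also exists.
    return full_name in feature_functions


registered = {"ml.dev.clicks_7d", "ml.dev.spend_30d"}
functions = {"ml.dev.clicks_7d"}  # only the beta feature also exists as a UC function
print(is_beta_feature_view("ml.dev.clicks_7d", registered, functions))  # True
print(is_beta_feature_view("ml.dev.spend_30d", registered, functions))  # False
```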
- list_beta_feature_views(*, catalog_name: Optional[str] = None, schema_name: Optional[str] = None, max_results: int = 100) List[Feature]
Note
Experimental: This function may change or be removed in a future release without warning.
List beta Feature Views across catalogs and schemas.
Scans UC functions and returns those that are the feature-as-function representation of a beta Feature View (formerly “declarative feature”), which should be migrated to the Public Preview feature representation. Candidates are classified directly from the listed function metadata, so no per-function lookup is performed to identify them.
The scan can be narrowed with the optional catalog_name and schema_name filters. When schema_name is supplied, catalog_name must also be provided.
- param catalog_name
If provided, only scan this catalog. Otherwise all accessible catalogs in the metastore are scanned.
- param schema_name
If provided, only scan this schema within the specified catalog. Requires catalog_name.
- param max_results
Maximum number of beta Feature Views to return, default 100. The scan stops once this many are found, bounding the work done; specifying a high number may result in a long-running operation.
- return
List of Feature instances for beta Feature Views, sorted by fully qualified name, at most max_results entries.
- raises ValueError
If schema_name is given without catalog_name, if catalog_name or schema_name is an empty string, or if max_results is not a positive integer.
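Here is a minimal sketch of the argument rules and the bounded scan. It assumes the scan yields already-classified beta Feature View names in some arbitrary order. validate_scan_args and collect_beta_views are hypothetical helpers, not part of the client.

```python
from typing import Iterable, List, Optional


def validate_scan_args(
    catalog_name: Optional[str], schema_name: Optional[str], max_results: int
) -> None:
    """Hypothetical mirror of the documented ValueError conditions."""
    if catalog_name == "" or schema_name == "":
        raise ValueError("catalog_name and schema_name must not be empty strings")
    if schema_name is not None and catalog_name is None:
        raise ValueError("schema_name requires catalog_name")
    if isinstance(max_results, bool) or not isinstance(max_results, int) or max_results <= 0:
        raise ValueError("max_results must be a positive integer")


def collect_beta_views(candidates: Iterable[str], max_results: int) -> List[str]:
    """Stop as soon as max_results views are found, then sort by full name."""
    found: List[str] = []
    for full_name in candidates:
        found.append(full_name)
        if len(found) >= max_results:
            break  # bounded scan: later candidates are never requested
    return sorted(found)


validate_scan_args("ml", "dev", 100)
print(collect_beta_views(["c.s.z", "a.s.y", "b.s.x"], 2))  # ['a.s.y', 'c.s.z']
```

Under this model, the result contains the first max_results views the scan encounters, then sorted. It is not necessarily the alphabetically first max_results views in the metastore.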
- list_features(*, catalog_name: str, schema_name: str, max_results: int = 100) List[Feature]
List features in a given catalog and schema in Unity Catalog.
- Parameters
catalog_name – Name of the parent catalog for features of interest.
schema_name – Name of the parent schema relative to its parent catalog.
max_results – The maximum number of features to return, default is 100. Specifying a high number may result in a long-running operation.
- Returns
List of Feature instances
- delete_feature(*, full_name: str) None
Note
Experimental: This function may change or be removed in a future release without warning.
Delete a feature from Unity Catalog.
- Parameters
full_name – The full name of the feature in format ‘<catalog>.<schema>.<feature_name>’
- list_materialized_features(*, feature_name: str, max_results: int = 100) List[MaterializedFeature]
List materialized features for a given feature in the user’s Unity Catalog metastore.
- Parameters
feature_name – Full name of the feature whose materializations should be listed.
max_results – The maximum number of materialized features to return, default is 100. Specifying a high number may result in a long-running operation.
- Returns
List of MaterializedFeature instances
- delete_materialized_feature(*, materialized_feature: MaterializedFeature) None
Delete a materialized feature.
- Parameters
materialized_feature – The materialized feature to delete.
- grant_feature_serving_access(*, grant_to: List[str], model_uri: Optional[str] = None, feature_spec: Optional[Union[str, FeatureSpecDeclarative]] = None, dry_run: bool = False) FeatureServingAccessReport
Note
Experimental: This function may change or be removed in a future release without warning.
Grant SELECT on the online tables a model or feature spec serves from.
Resolves each of the input's features to its online (Lakebase) materialized table and grants SELECT on those tables (plus USE_CATALOG and USE_SCHEMA) to grant_to. It always returns a report mapping each table to the features it contains. An online table can also hold other features that were materialized at the same time, and the feature spec or model may not use all of them for serving. The report shows which additional feature columns you are granting access to.
Exactly one of model_uri or feature_spec must be provided.
- param grant_to
Principals (for example, email addresses) to grant SELECT to.
- param model_uri
URI of a logged model, used to determine the features and materialized tables needed, for example models:/<name>/<version> or models:/<name>@<alias>.
- param feature_spec
The name of a registered feature spec (loaded from Unity Catalog), or a FeatureSpecDeclarative object.
- param dry_run
When True, only report what would be granted without applying any grants. Use this to preview the access (including unused features) before granting.
- return
A FeatureServingAccessReport.
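The report shows which extra feature columns a grant exposes, and this can be illustrated with plain sets. This is a hypothetical model. build_access_report, check_exactly_one, and the used/unused layout are illustrative assumptions, not the structure of FeatureServingAccessReport.

```python
from typing import Dict, List, Optional, Set


def check_exactly_one(model_uri: Optional[str], feature_spec: Optional[object]) -> None:
    # Documented rule: exactly one of model_uri or feature_spec must be given.
    if (model_uri is None) == (feature_spec is None):
        raise ValueError("provide exactly one of model_uri or feature_spec")


def build_access_report(
    used_features: Set[str],
    feature_to_table: Dict[str, str],
    table_contents: Dict[str, Set[str]],
) -> Dict[str, Dict[str, List[str]]]:
    """Map each online table to the features it holds, split into used and unused."""
    report: Dict[str, Dict[str, List[str]]] = {}
    for table in sorted({feature_to_table[name] for name in used_features}):
        contents = table_contents[table]
        report[table] = {
            "used": sorted(contents & used_features),
            # Granting SELECT on the table also exposes these columns.
            "unused": sorted(contents - used_features),
        }
    return report


report = build_access_report(
    used_features={"clicks_7d", "spend_30d"},
    feature_to_table={"clicks_7d": "ml.online.t1", "spend_30d": "ml.online.t2"},
    table_contents={"ml.online.t1": {"clicks_7d", "views_7d"}, "ml.online.t2": {"spend_30d"}},
)
print(report["ml.online.t1"]["unused"])  # ['views_7d']
```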
- feature_spec_from_batch_declarative_features(*, name: str, features: List[Feature], exclude_columns: Optional[List[str]] = None) FeatureSpecInfo
Note
Experimental: This function may change or be removed in a future release without warning.
Create a FeatureSpecInfo from a list of Feature objects.
Deprecated: Use create_feature_spec() with Feature objects directly instead. This method is kept for backward compatibility and will be removed in a future release.
- Parameters
name – The name of the feature spec to create.
features – List of Feature objects to include in the feature spec.
exclude_columns – Optional list of columns to drop from the final output.
- Returns
A FeatureSpecInfo for the created feature spec.
- create_feature_serving_endpoint(*, name: Optional[str] = None, config: Optional[EndpointCoreConfig] = None, **kwargs) FeatureServingEndpoint
Creates a Feature Serving Endpoint.
- Parameters
name – The name of the endpoint. Must only contain alphanumerics and dashes.
config – Configuration of the endpoint, including features, workload_size, etc.
- create_feature_spec(*, name: str, features: List[Union[FeatureLookup, FeatureFunction, Feature]], exclude_columns: Optional[List[str]] = None) FeatureSpecInfo
Creates a feature specification in Unity Catalog. The feature spec can be used for serving features and functions.
- Parameters
name – The name of the feature spec.
features – List of FeatureLookups, FeatureFunctions, or Feature objects to include in the feature spec. Feature objects cannot be mixed with FeatureLookups or FeatureFunctions.
exclude_columns – List of columns to drop from the final output. Not supported with Feature objects.
- update_feature_spec(*, name: str, owner: str) None
Update the owner of a feature spec.
- Parameters
name – The name of the feature spec.
owner – The new owner of the feature spec.
- delete_feature_spec(*, name: str) None
Delete a feature spec.
- Parameters
name – The name of the feature spec.
- create_table(*, name: str, primary_keys: Union[str, List[str]], df: Optional[DataFrame] = None, timeseries_column: Optional[str] = None, partition_columns: Optional[Union[str, List[str]]] = None, schema: Optional[StructType] = None, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None, **kwargs) FeatureTable
Create and return a feature table with the given name and primary keys.
The table uses the provided schema or the schema inferred from the provided df. If df is provided, this data will be saved in a Delta table. Supported data types for features are: IntegerType, LongType, FloatType, DoubleType, StringType, BooleanType, DateType, TimestampType, ShortType, ArrayType, MapType, BinaryType, DecimalType, and StructType.
- Parameters
name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
primary_keys – The feature table's primary keys. If multiple columns are required, specify a list of column names, for example ['customer_id', 'region'].
df – Data to insert into this feature table. The schema of df will be used as the feature table schema.
timeseries_column – Column containing the event time associated with the feature value. The timeseries column should be part of the primary keys. Combined, the timeseries column and the other primary keys of the feature table uniquely identify the feature value for an entity at a point in time.
partition_columns –
Columns used to partition the feature table. If a list is provided, column ordering in the list will be used for partitioning.
Note
When choosing partition columns for your feature table, use columns that do not have high cardinality. Ideally, you should expect the data in each partition to be at least 1 GB. The most commonly used partition column is a date.
Additional info: Choosing the right partition columns for Delta tables
schema – Feature table schema. Either schema or df must be provided.
description – Description of the feature table.
tags – Tags to associate with the feature table.
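The primary keys together with the timeseries column must identify one feature value per entity and point in time. It can help to check a dataset for repeated keys before creating the table. The duplicate_keys helper below is a hypothetical pre-flight check over plain dict rows, not part of the client.

```python
from collections import Counter
from typing import Any, Dict, List, Sequence, Tuple


def duplicate_keys(
    rows: List[Dict[str, Any]], primary_keys: Sequence[str], timeseries_column: str
) -> List[Tuple[Any, ...]]:
    """Return key tuples that appear more than once (hypothetical pre-flight check)."""
    key_columns = list(primary_keys)
    if timeseries_column not in key_columns:
        key_columns.append(timeseries_column)  # the timeseries column belongs in the key
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


rows = [
    {"customer_id": 1, "ts": "2024-01-01", "spend": 10.0},
    {"customer_id": 1, "ts": "2024-01-02", "spend": 12.0},
    {"customer_id": 1, "ts": "2024-01-01", "spend": 11.0},  # same entity and time
]
print(duplicate_keys(rows, ["customer_id", "ts"], "ts"))  # [(1, '2024-01-01')]
```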
- create_training_set(*, df: DataFrame, feature_lookups: Optional[List[Union[FeatureLookup, FeatureFunction]]] = None, feature_spec: Optional[str] = None, features: Optional[List[Feature]] = None, label: Optional[Union[str, List[str]]], exclude_columns: Optional[List[str]] = None, use_spark_native_join: bool = False, **kwargs) TrainingSet
Create a TrainingSet using feature_lookups, feature_spec, or features.
- Parameters
df – The DataFrame used to join features into.
feature_lookups – List of features to use in the TrainingSet. FeatureLookups are joined into the DataFrame, and FeatureFunctions are computed on-demand. feature_lookups cannot be specified if feature_spec or features is provided.
feature_spec – Full name of the FeatureSpec in Unity Catalog. feature_spec cannot be specified if feature_lookups or features is provided.
features – List of Feature objects to use in the training set. features cannot be specified if feature_lookups or feature_spec is provided.
label – Names of column(s) in the DataFrame that contain training set labels. To create a training set without a label field, i.e. for an unsupervised training set, specify label=None.
exclude_columns – Names of the columns to drop from the TrainingSet DataFrame.
use_spark_native_join – Use Spark to optimize table join performance. The optimization only applies when Photon is enabled.
- Returns
A TrainingSet object.
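The mutual-exclusion rules for feature_lookups, feature_spec, and features, together with the label=None convention, can be sketched as a small validator. check_training_set_args is hypothetical. It only mirrors the rules stated above and does not call the client.

```python
from typing import List, Optional, Sequence, Union


def check_training_set_args(
    feature_lookups: Optional[Sequence[object]] = None,
    feature_spec: Optional[str] = None,
    features: Optional[Sequence[object]] = None,
    label: Union[str, List[str], None] = None,
) -> List[str]:
    """Hypothetical check; returns the label columns as a list (empty if unsupervised)."""
    supplied = [
        name
        for name, value in (
            ("feature_lookups", feature_lookups),
            ("feature_spec", feature_spec),
            ("features", features),
        )
        if value is not None
    ]
    if len(supplied) > 1:
        raise ValueError(f"only one of these may be given, got: {', '.join(supplied)}")
    if label is None:
        return []  # label=None builds an unsupervised training set
    return [label] if isinstance(label, str) else list(label)


print(check_training_set_args(feature_spec="ml.dev.spec", label="is_banned"))  # ['is_banned']
```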
- delete_feature_serving_endpoint(*, name=None, **kwargs) None
- delete_feature_table_tag(*, name: str, key: str) None
Delete the tag associated with the feature table. Deleting a non-existent tag will emit a warning.
- Parameters
name – the feature table name.
key – the tag key to delete.
- drop_online_table(name: str, online_store: OnlineStoreSpec) None
Drop a table in an online store.
This API first attempts to make a call to the online store provider to drop the table. If successful, it then deletes the online store from the feature catalog.
- Parameters
name – Name of feature table associated with online store table to drop.
online_store – Specification of the online store.
Note
Deleting an online published table can lead to unexpected failures in downstream dependencies. Ensure that the online table being dropped is no longer used for Model Serving feature lookup or any other use cases.
- drop_table(*, name: str) None
Delete the specified feature table. This API also drops the underlying Delta table.
- Parameters
name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
Note
Deleting a feature table can lead to unexpected failures in upstream producers and downstream consumers (models, endpoints, and scheduled jobs). You must delete any existing published online stores separately.
- get_feature_serving_endpoint(*, name=None, **kwargs) FeatureServingEndpoint
- get_table(*, name: str) FeatureTable
Get a feature table’s metadata.
- Parameters
name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
- log_model(*, model: Any, artifact_path: str, flavor: module, training_set: Optional[TrainingSet] = None, registered_model_name: Optional[str] = None, await_registration_for: int = 300, infer_input_example: bool = False, **kwargs)
Log an MLflow model packaged with feature lookup information.
Note
The DataFrame returned by TrainingSet.load_df() must be used to train the model. If it has been modified (for example, by data normalization or an added column), these modifications will not be applied at inference time, leading to training-serving skew.
- Parameters
model – Model to be saved. This model must be capable of being saved by flavor.save_model. See the MLflow Model API.
artifact_path – Run-relative artifact path.
flavor – MLflow module to use to log the model. flavor should have type ModuleType. The module must have a method save_model, and must support the python_function flavor. For example, mlflow.sklearn, mlflow.xgboost, and similar.
training_set – The TrainingSet used to train this model.
registered_model_name – When provided, create a model version under registered_model_name, also creating a registered model if one with the given name does not exist.
await_registration_for – Number of seconds to wait for the model version to finish being created and reach READY status. By default, the function waits for five minutes. Specify 0 or None to skip waiting.
infer_input_example –
Note
Experimental: This argument may change or be removed in a future release without warning.
Automatically log an input example along with the model, using supplied training data. Defaults to False.
- Kwargs
If other keyword arguments are provided, in most cases, they are passed to the underlying MLflow API flavor.save_model() when saving and registering the model.
Note
signature is not recommended; prefer infer_input_example.
output_schema: When logging a model without labels in the training set, you must pass output_schema to log_model to specify the output schema explicitly. For example:

    from mlflow.types import ColSpec, DataType, Schema

    output_schema = Schema([ColSpec(DataType.???)])  # Refer to MLflow signature types for the right choice of type here
    ...
    fe.log_model(
        ...
        output_schema=output_schema
    )
- Returns
- publish_table(*, online_store: Union[OnlineStoreSpec, DatabricksOnlineStore], source_table_name: Optional[str] = None, online_table_name: Optional[str] = None, publish_mode: str = 'TRIGGERED', filter_condition: Optional[str] = None, mode: str = 'merge', streaming: bool = False, checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 minutes'}, features: Optional[Union[str, List[str]]] = None, **kwargs) Optional[Union[StreamingQuery, PublishedTable]]
Publish a feature table to an online store.
- Parameters
source_table_name – Name of the feature table. This is a required parameter.
online_table_name – Name of the online table. This is a required parameter when publishing to Databricks Online Store. The catalog name, schema name, and table name each are limited to a maximum of 63 bytes.
online_store – Specification of the online store. This is a required parameter.
publish_mode – Supported modes are "SNAPSHOT", "CONTINUOUS", and "TRIGGERED". Default is "TRIGGERED".
Note
Change Data Feed (CDF) must be enabled for CONTINUOUS and TRIGGERED modes.
Legacy parameters: The following parameters are only used by third-party online stores.
- Parameters
filter_condition – A SQL expression using feature table columns that filters feature rows prior to publishing to the online store. For example, "dt > '2020-09-10'". This is analogous to running df.filter or a WHERE condition in SQL on a feature table prior to publishing.
mode –
Specifies the behavior when data already exists in this feature table. The only supported mode is "merge", with which the new data will be merged in, under these conditions:
If a key exists in the online table but not the offline table, the row in the online table is unmodified.
If a key exists in the offline table but not the online table, the offline table row is inserted into the online table.
If a key exists in both the offline and the online tables, the online table row will be updated.
streaming – If True, streams data to the online store.
checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when streaming=True.
trigger – If streaming=True, trigger defines the timing of stream data processing. The dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).
features –
Specifies the feature column(s) to be published to the online store. The selected features must be a superset of existing online store features. Primary key columns and timestamp key columns will always be published.
Note
When features is not set, the whole feature table will be published.
- Returns
If streaming=True, returns a PySpark StreamingQuery, None otherwise.
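The three merge rules can be modeled with dictionaries keyed by primary key. This is a sketch under assumptions. merge_into_online is hypothetical, and "updated" is interpreted as overwriting the columns present in the offline row while keeping any other online columns.

```python
from typing import Any, Dict, Hashable

Row = Dict[str, Any]


def merge_into_online(online: Dict[Hashable, Row], offline: Dict[Hashable, Row]) -> Dict[Hashable, Row]:
    """Model of the three "merge" rules; tables are dicts keyed by primary key."""
    merged = dict(online)  # rule 1: keys only in the online table stay unmodified
    for key, row in offline.items():
        # Rule 2: new keys are inserted. Rule 3: existing keys are updated.
        merged[key] = {**online.get(key, {}), **row}
    return merged


online = {1: {"spend": 5.0}, 2: {"spend": 7.0}}
offline = {2: {"spend": 9.0}, 3: {"spend": 1.0}}
print(merge_into_online(online, offline))
# {1: {'spend': 5.0}, 2: {'spend': 9.0}, 3: {'spend': 1.0}}
```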
- read_table(*, name: str, **kwargs) DataFrame
Read the contents of a feature table.
- Parameters
name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
- Returns
The feature table contents. An exception is raised if this feature table does not exist.
- score_batch(*, model_uri: str, df: DataFrame, result_type: str = 'double', env_manager: str = 'local', params: Optional[dict[str, Any]] = None, use_spark_native_join: bool = False, **kwargs) DataFrame
Evaluate the model on the provided DataFrame.
Additional features required for model evaluation will be automatically retrieved from feature tables.
The model must have been logged with FeatureEngineeringClient.log_model(), which packages the model with feature metadata. Unless present in df, these features will be looked up from feature tables and joined with df prior to scoring the model.
If a feature is included in df, the provided feature values will be used rather than those stored in feature tables.
For example, suppose a model is trained on two features, account_creation_date and num_lifetime_purchases, as in:

    import mlflow
    from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

    fe = FeatureEngineeringClient()

    feature_lookups = [
        FeatureLookup(
            table_name='trust_and_safety.customer_features',
            feature_name='account_creation_date',
            lookup_key='customer_id',
        ),
        FeatureLookup(
            table_name='trust_and_safety.customer_features',
            feature_name='num_lifetime_purchases',
            lookup_key='customer_id',
        ),
    ]

    with mlflow.start_run():
        # create_training_set and log_model take keyword-only arguments
        training_set = fe.create_training_set(
            df=df,
            feature_lookups=feature_lookups,
            label='is_banned',
            exclude_columns=['customer_id'],
        )
        ...
        fe.log_model(
            model=model,
            artifact_path="model",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="example_model",
        )
Then at inference time, the caller of FeatureEngineeringClient.score_batch() must pass a DataFrame that includes customer_id, the lookup_key specified in the FeatureLookups of the TrainingSet. If the DataFrame contains a column account_creation_date, the values of this column will be used in lieu of those in feature tables. As in:

    # batch_df has columns ['customer_id', 'account_creation_date']
    predictions = fe.score_batch(
        model_uri='models:/example_model/1',
        df=batch_df,
    )
- Parameters
model_uri –
The location, in URI format, of the MLflow model logged using FeatureEngineeringClient.log_model(). One of:
runs:/<mlflow_run_id>/run-relative/path/to/model
models:/<model_name>/<model_version>
models:/<model_name>/<stage>
For more information about URI schemes, see Referencing Artifacts.
df –
The DataFrame to score the model on. Features from feature tables will be joined with df prior to scoring the model. df must:
1. Contain columns for lookup keys required to join feature data from feature tables, as specified in the feature_spec.yaml artifact.
2. Contain columns for all source keys required to score the model, as specified in the feature_spec.yaml artifact.
3. Not contain a column prediction, which is reserved for the model's predictions.
df may contain additional columns.
Streaming DataFrames are not supported.
result_type – The return type of the model. See mlflow.pyfunc.spark_udf() result_type.
env_manager – The environment manager to use in order to create the Python environment for model inference. See mlflow.pyfunc.spark_udf() env_manager.
params – Additional parameters to pass to the model for inference.
use_spark_native_join –
Use Spark to optimize table join performance. The optimization only applies when Photon is enabled.
- Returns
A DataFrame containing:
All columns of df.
All feature values retrieved from feature tables.
A column prediction containing the output of the model.
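The precedence rule and the reserved prediction column can be modeled with plain dicts. Under that rule, values already in df win over feature-table values. assemble_scoring_rows is a hypothetical helper, and filling a missing feature value with None is an assumption. The real method performs a Spark join rather than a Python loop.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def assemble_scoring_rows(
    df_rows: List[Row],
    feature_table: Dict[Any, Row],
    lookup_key: str,
    feature_names: Sequence[str],
) -> List[Row]:
    """Hypothetical model of how df columns and feature-table values are combined."""
    assembled_rows = []
    for row in df_rows:
        if "prediction" in row:
            raise ValueError("df must not contain a 'prediction' column")
        if lookup_key not in row:
            raise ValueError(f"df is missing lookup key {lookup_key!r}")
        stored = feature_table.get(row[lookup_key], {})
        assembled = dict(row)  # all columns of df are kept
        for name in feature_names:
            if name not in assembled:  # a value already in df wins over the feature table
                assembled[name] = stored.get(name)  # assumption: missing values become None
        assembled_rows.append(assembled)
    return assembled_rows


customer_features = {"c1": {"account_creation_date": "2019-04-02", "num_lifetime_purchases": 12}}
batch = [{"customer_id": "c1", "account_creation_date": "2020-01-15"}]
print(assemble_scoring_rows(batch, customer_features, "customer_id",
                            ["account_creation_date", "num_lifetime_purchases"]))
# [{'customer_id': 'c1', 'account_creation_date': '2020-01-15', 'num_lifetime_purchases': 12}]
```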
- set_feature_table_tag(*, name: str, key: str, value: str) None
Create or update a tag associated with the feature table. If the tag with the corresponding key already exists, its value will be overwritten with the new value.
- Parameters
name – the feature table name
key – tag key
value – tag value
- write_table(*, name: str, df: DataFrame, mode: str = 'merge', checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 seconds'}) Optional[StreamingQuery]
Writes to a feature table.
If the input DataFrame is streaming, this method creates a write stream.
- Parameters
name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.
df – Spark DataFrame with feature data. Raises an exception if the schema does not match that of the feature table.
mode –
There is only one supported write mode:
"merge" upserts the rows in df into the feature table. If df contains columns not present in the feature table, these columns will be added as new features.
If you want to overwrite a table, run SQL DELETE FROM <table name>; to delete all rows, or drop and recreate the table before calling this method.
checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when the argument df is a streaming DataFrame.
trigger – If df.isStreaming, trigger defines the timing of stream data processing. The dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).
- Returns
If df.isStreaming, returns a PySpark StreamingQuery. None otherwise.
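The "merge" write mode combines an upsert by primary key with schema evolution for new columns. The merge_write function below is a hypothetical pure-Python model. It assumes that rows df does not touch get None in any newly added column.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def merge_write(
    table_columns: Sequence[str],
    table_rows: List[Row],
    df_rows: List[Row],
    primary_keys: Sequence[str],
) -> Tuple[List[str], List[Row]]:
    """Model of the "merge" write mode: upsert by primary key, adding unseen columns."""
    columns = list(table_columns)
    for row in df_rows:
        for name in row:
            if name not in columns:
                columns.append(name)  # a new column in df becomes a new feature
    by_key = {tuple(r[k] for k in primary_keys): dict(r) for r in table_rows}
    for row in df_rows:
        key = tuple(row[k] for k in primary_keys)
        by_key[key] = {**by_key.get(key, {}), **row}  # update or insert
    # Assumption: rows lacking a new column get None for it.
    return columns, [{c: r.get(c) for c in columns} for r in by_key.values()]


columns, rows = merge_write(
    ["id", "spend"],
    [{"id": 1, "spend": 5.0}, {"id": 3, "spend": 2.0}],
    [{"id": 1, "spend": 6.0, "visits": 4}, {"id": 2, "spend": 1.0, "visits": 1}],
    ["id"],
)
print(columns)  # ['id', 'spend', 'visits']
```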
- compute_features(*, features: List[Feature]) DataFrame
Compute feature values for every row in the source table.
Evaluates each feature at every row’s own timestamp using the point-in-time window engine, producing the historical values that materialization would generate. Useful for prototyping and validating feature definitions before creating a pipeline.
Each feature is scored using only data strictly before that row’s timestamp, respecting the feature’s time window (rolling, sliding, or tumbling), entity columns, and any source-level filter condition.
The returned DataFrame contains the feature’s entity columns, timeseries column, and one column per computed feature. Other source columns are not included, and exact duplicate source rows collapse to a single output row.
All features must read from the same source table. Features may use different entity columns, timeseries columns, filter conditions, and time windows — these are handled independently and joined on the output.
- Parameters
features – Features to compute. All must share the same source table.
- Returns
A DataFrame with entity columns, the timeseries column, and one column per feature.
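The point-in-time rule uses only data strictly before each row's own timestamp, within the feature's window. This can be illustrated for a rolling sum. The sketch is hypothetical and models a rolling window only. It treats the window's lower bound as inclusive (an assumption) and does not model sliding or tumbling windows or source filters.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Event = Tuple[str, datetime, float]


def rolling_sum_point_in_time(events: List[Event], window: timedelta) -> List[Event]:
    """For each row, sum the same entity's values in [ts - window, ts), excluding ts itself."""
    results = []
    for entity, ts, _ in events:
        total = sum(
            value
            for other_entity, other_ts, value in events
            if other_entity == entity and ts - window <= other_ts < ts  # strictly before ts
        )
        results.append((entity, ts, float(total)))
    return results


t0 = datetime(2024, 1, 1)
events = [
    ("u1", t0, 1.0),
    ("u1", t0 + timedelta(hours=1), 2.0),
    ("u1", t0 + timedelta(hours=3), 4.0),
    ("u2", t0 + timedelta(hours=1), 10.0),
]
for entity, ts, value in rolling_sum_point_in_time(events, timedelta(hours=2)):
    print(entity, ts.isoformat(), value)
# u1 2024-01-01T00:00:00 0.0
# u1 2024-01-01T01:00:00 1.0
# u1 2024-01-01T03:00:00 2.0
# u2 2024-01-01T01:00:00 0.0
```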
- aggregate_features(*, features: FeatureAggregations) DataFrame
Computes the specified aggregations and returns a DataFrame containing the results.
- Parameters
features – The aggregation specification to compute.
- create_materialized_view(*, features: FeatureAggregations, output_table: str, schedule: Optional[CronSchedule]) MaterializedViewInfo
Creates and runs a pipeline that materializes the given feature aggregation specification into a materialized view.
- Parameters
features – The aggregation specification to materialize.
output_table – The name of the output materialized view.
schedule – The schedule at which to run the materialization pipeline. If not provided, the pipeline can only be run manually.
- create_pipeline(*, features: List[Feature], offline_store_config: OfflineStoreConfig, schedule: Optional[CronSchedule] = None, start_time: Optional[datetime] = None) List[MaterializedViewInfo]
Note
Experimental: This function may change or be removed in a future release without warning.
Creates and runs a pipeline that materializes the given features into tables.
Deprecated: This method is deprecated and not supported as part of the Feature Views Public Preview.
Features are grouped by (source, granularity) and each unique combination creates a separate materialized view.
Restrictions:
Supports SlidingWindow and TumblingWindow features.
Features within the same (source, granularity) group must have the same granularity (slide_duration for SlidingWindow, window_duration for TumblingWindow).
All features must use a single column for aggregation.
RollingWindow is not currently supported.
- Parameters
features – List of features to materialize.
offline_store_config – The offline store configuration containing catalog, schema, and table prefix.
schedule – The schedule at which to run the pipeline (applied to all created views).
start_time – The earliest time to generate features from. If not provided, the minimum timestamp from the source table's timeseries column is used.
- Returns
List of MaterializedViewInfo, one per unique (source, granularity) combination
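The grouping rule can be sketched as a dictionary keyed by (source, granularity). Granularity is slide_duration for sliding windows and window_duration for tumbling windows. FeatureDef and group_for_pipeline are hypothetical stand-ins, not the library's Feature or window classes.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class FeatureDef:
    """Hypothetical stand-in for a feature definition, not the library's Feature class."""
    name: str
    source: str
    window_type: str  # "sliding", "tumbling", or "rolling"
    window_duration: timedelta
    slide_duration: Optional[timedelta] = None


def granularity(feature: FeatureDef) -> timedelta:
    if feature.window_type == "sliding" and feature.slide_duration is not None:
        return feature.slide_duration  # SlidingWindow: slide_duration
    if feature.window_type == "tumbling":
        return feature.window_duration  # TumblingWindow: window_duration
    raise ValueError(f"{feature.name}: unsupported window for create_pipeline")


def group_for_pipeline(features: List[FeatureDef]) -> Dict[Tuple[str, timedelta], List[str]]:
    """One materialized view per unique (source, granularity) key."""
    groups: Dict[Tuple[str, timedelta], List[str]] = {}
    for feature in features:
        groups.setdefault((feature.source, granularity(feature)), []).append(feature.name)
    return groups


day = timedelta(days=1)
groups = group_for_pipeline([
    FeatureDef("clicks_7d", "ml.raw.events", "sliding", 7 * day, slide_duration=day),
    FeatureDef("clicks_1d", "ml.raw.events", "tumbling", day),
    FeatureDef("spend_1d", "ml.raw.orders", "tumbling", day),
])
print(len(groups))  # 2
```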
- build_model_env(model_uri: str, save_path: str) str
Prebuild the Python environment required by the given model and generate an archive file saved to the specified save_path. The resulting environment can then be used in FeatureEngineeringClient.score_batch() as the prebuilt_env_uri parameter.
- Parameters
model_uri – URI of the model used to build the Python environment.
save_path – Directory path to save the prebuilt model environment archive file. This can be a local directory path, a mounted DBFS path (e.g., ‘/dbfs/…’), or a mounted UC volume path (e.g., ‘/Volumes/…’).
- Returns
The path of the archive file containing the Python environment data.
- create_online_store(*, name: str, capacity: str, read_replica_count: Optional[int] = None, usage_policy_id: Optional[str] = None) DatabricksOnlineStore
Create an Online Feature Store.
- Parameters
name – The name of the online store. Maximum of 63 bytes.
capacity – The capacity of the online store. Valid values are “CU_1”, “CU_2”, “CU_4”, “CU_8”.
read_replica_count – Optional. The number of read replicas for the online store.
usage_policy_id – Optional. ID of the usage policy to associate with the online store used for cost tracking.
- Returns
The created online store.
- get_online_store(*, name: str) Optional[DatabricksOnlineStore]
Get an Online Feature Store.
- Parameters
name – The name of the online store.
- Returns
The retrieved online store, or None if not found.
- update_online_store(*, name: str, capacity: Union[str, _UnsetType] = <UNSET>, read_replica_count: Union[int, _UnsetType] = <UNSET>, usage_policy_id: Union[str, _UnsetType] = <UNSET>) DatabricksOnlineStore
Update an Online Feature Store. Only the fields specified will be updated. Fields that are not specified will remain unchanged.
- Parameters
name – The name of the online store.
capacity – Optional. The capacity of the online store. Valid values are “CU_1”, “CU_2”, “CU_4”, “CU_8”.
read_replica_count – Optional. The number of read replicas for the online store.
usage_policy_id – Optional. ID of the usage policy to associate with the online store used for cost tracking.
- Returns
The updated online store.
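The <UNSET> defaults in the update_online_store signature follow the sentinel pattern. A dedicated object marks "not specified", so any real value a caller passes, including a falsy one such as 0, is still applied. The sketch below is a hypothetical model; apply_update, _UnsetType, and check_store_name are illustrative. It also mirrors the documented capacity values and the 63-byte name limit from create_online_store.

```python
from typing import Any, Dict


class _UnsetType:
    """Sentinel type meaning "argument not supplied" (hypothetical stand-in)."""

    def __repr__(self) -> str:
        return "<UNSET>"


UNSET = _UnsetType()
VALID_CAPACITIES = {"CU_1", "CU_2", "CU_4", "CU_8"}


def check_store_name(name: str) -> None:
    if len(name.encode("utf-8")) > 63:
        raise ValueError("online store names are limited to 63 bytes")


def apply_update(
    current: Dict[str, Any],
    capacity: Any = UNSET,
    read_replica_count: Any = UNSET,
    usage_policy_id: Any = UNSET,
) -> Dict[str, Any]:
    """Return a copy of current with only the explicitly supplied fields changed."""
    requested = {
        "capacity": capacity,
        "read_replica_count": read_replica_count,
        "usage_policy_id": usage_policy_id,
    }
    updated = dict(current)
    for field, value in requested.items():
        if value is UNSET:
            continue  # not specified: leave unchanged
        if field == "capacity" and value not in VALID_CAPACITIES:
            raise ValueError(f"invalid capacity {value!r}")
        updated[field] = value
    return updated


store = {"name": "fs-online", "capacity": "CU_1", "read_replica_count": 2, "usage_policy_id": None}
print(apply_update(store, read_replica_count=0))
```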
- delete_online_store(*, name: str) None
Delete an Online Feature Store.
- Parameters
name – The name of the online store.
- Returns
None.
- list_online_stores() List[DatabricksOnlineStore]
List available Online Feature Stores. These are online stores that are associated with a feature store. Note that online stores not associated with a feature store will be excluded from the output.
- Returns
A list of DatabricksOnlineStore objects.
- materialize_features(*, features: List[Feature], offline_config: Optional[OfflineStoreConfig] = None, online_config: Optional[OnlineStoreConfig] = None, trigger: Optional[Union[CronSchedule, TableTrigger, StreamingMode]] = None) Optional[List[MaterializedFeature]]
Direct Databricks to materialize a list of feature definitions stored in Unity Catalog to an offline (Delta Live Tables) store and/or an online store for model serving. Note that the features must already exist in Unity Catalog as functions, which can be created using FeatureEngineeringClient.create_feature().
Features are handled differently based on their source and function type. offline_config must not be provided if the call contains no batch aggregation features.
Batch aggregation features (Delta table source with aggregation function): Can be materialized to offline or online stores. Requires offline_config. The online_config is optional; if it is provided, the call returns both an offline and an online materialized feature for each batch aggregation feature.
Batch column selection features (Delta table source with ColumnSelection function): Can only be materialized to online stores. Requires online_config. Pass offline_config only with batch aggregation features; otherwise the call is rejected.
Streaming features (Kafka source): Can only be materialized to online stores. Cannot be mixed with batch features in the same call because batch and streaming pipelines require different trigger types. Issue separate calls. offline_config is not supported for streaming features.
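You can check the configuration rules above before calling `materialize_features`. The following is a minimal sketch of such a check. It assumes a hypothetical helper, `check_materialize_configs`, that works on hypothetical shape labels (`"batch_aggregation"`, `"batch_column_selection"`, `"streaming"`) rather than on real `Feature` objects. It encodes only the three shapes described here and is not the client's own validation.

```python
from typing import Iterable

# Hypothetical labels for the three feature shapes described above.
SHAPES = {"batch_aggregation", "batch_column_selection", "streaming"}


def check_materialize_configs(shapes: Iterable[str], has_offline: bool, has_online: bool) -> None:
    """Raise ValueError if a materialize_features call would violate the documented config rules."""
    kinds = set(shapes)
    unknown = kinds - SHAPES
    if unknown:
        raise ValueError(f"unknown feature shapes: {sorted(unknown)}")
    # Streaming and batch features need different triggers, so they go in separate calls.
    if "streaming" in kinds and kinds != {"streaming"}:
        raise ValueError("streaming features cannot be mixed with batch features")
    # offline_config is only accepted alongside batch aggregation features.
    if has_offline and "batch_aggregation" not in kinds:
        raise ValueError("offline_config is only allowed with batch aggregation features")
    if "batch_aggregation" in kinds and not has_offline:
        raise ValueError("batch aggregation features require offline_config")
    # Column selection and streaming features can only go to an online store.
    if kinds & {"batch_column_selection", "streaming"} and not has_online:
        raise ValueError("column selection and streaming features require online_config")
```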
- Parameters
features – The list of features to materialize.
offline_config – Offline store configuration. Required when the call includes batch aggregation features; must be omitted otherwise. Not supported for streaming features.
online_config – The online store configuration. Required for column selection and streaming features, optional for batch aggregation features.
trigger – Required. The trigger for the materialization pipeline. Must be a Trigger subclass (e.g. CronSchedule, TableTrigger, or StreamingMode). The trigger carries the scheduling configuration. TableTrigger is required for ColumnSelection features backed by a DeltaTableSource and is only supported for that combination; StreamingMode selects the Structured Streaming trigger mode and is only supported for streaming (KafkaSource) features; CronSchedule is used for every other supported feature shape. Mixing trigger types across feature shapes in a single call is not supported; issue separate materialize_features calls instead.
- Returns
The list of materialized features.
- Example:
When the feature set contains both ColumnSelection features (which need TableTrigger) and batch aggregation features (which need CronSchedule), issue two separate calls:

    fe.materialize_features(
        features=column_selection_features,
        online_config=online_cfg,
        trigger=TableTrigger(),
    )

    fe.materialize_features(
        features=aggregation_features,
        offline_config=offline_cfg,
        online_config=online_cfg,
        trigger=CronSchedule(
            quartz_cron_expression="0 0 * * * ?",
            timezone_id="UTC",
        ),
    )
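Because each call may use only one trigger type, a mixed feature set has to be split into groups before materialization. The following is a minimal sketch of that split. It assumes hypothetical helpers, `required_trigger` and `group_by_trigger`, and represents each feature as a `(name, source, function)` tuple of class-name strings instead of real `Feature` objects. It applies only the trigger rules documented for the trigger parameter and does not check whether a shape can be materialized at all.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def required_trigger(source: str, function: str) -> str:
    """Name of the trigger class documented for a feature shape (hypothetical string labels)."""
    if source == "KafkaSource":
        return "StreamingMode"
    if source == "DeltaTableSource" and function == "ColumnSelection":
        return "TableTrigger"
    return "CronSchedule"


def group_by_trigger(features: List[Tuple[str, str, str]]) -> Dict[str, List[str]]:
    """Bucket features so that each bucket maps to one materialize_features call."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name, source, function in features:
        groups[required_trigger(source, function)].append(name)
    return dict(groups)


groups = group_by_trigger([
    ("user_country", "DeltaTableSource", "ColumnSelection"),
    ("spend_7d", "DeltaTableSource", "AggregationFunction"),
    ("clicks_1h", "KafkaSource", "AggregationFunction"),
])
# One materialize_features call per key in `groups`, each with the matching trigger.
```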
- create_kafka_config(*, name: str, bootstrap_servers: str, subscription_mode: SubscriptionMode, auth_config: AuthConfig, key_schema: Optional[SchemaConfig] = None, value_schema: Optional[SchemaConfig] = None, extra_options: Optional[Dict[str, str]] = None, backfill_source: Optional[Union[BackfillSource, str]] = None) KafkaConfig
Note
Experimental: This function may change or be removed in a future release without warning.
Create a Kafka configuration for streaming feature ingestion.
Deprecated: Use create_stream() instead. Kafka configs will be removed in a future release.
- Parameters
name – Required. A unique name for the Kafka configuration within the metastore. Can be distinct from the topic name.
bootstrap_servers – Required. A comma-separated list of host/port pairs pointing to the Kafka cluster (e.g. “host1:port1,host2:port2”). See https://spark.apache.org/docs/latest/streaming/structured-streaming-kafka-integration.html for more details on this parameter.
subscription_mode – Required. A SubscriptionMode instance specifying how to select Kafka topics to consume from. Supports three modes: subscribe (comma-separated topic list), subscribe_pattern (regex pattern), or assign (JSON with specific topic-partitions). See https://spark.apache.org/docs/latest/streaming/structured-streaming-kafka-integration.html for more details on this parameter.
auth_config –
Required. An AuthConfig instance containing authentication configuration for connecting to Kafka topics. Supports two auth modes:
Unity Catalog service credentials. See https://docs.databricks.com/aws/en/connect/unity-catalog/cloud-services for setting up a UC service credential.
Mutual-TLS via JKS keystore/truststore files on a Unity Catalog volume with passwords referenced through Databricks secret scopes. See https://docs.databricks.com/aws/en/connect/streaming/kafka/authentication#use-ssl-to-connect-databricks-to-kafka for the setup pattern.
key_schema – Optional. A SchemaConfig instance defining the schema for extracting message keys from Kafka topics. Supports IETF JSON Schema format (https://json-schema.org/). At least one of key_schema or value_schema must be provided.
value_schema – Optional. A SchemaConfig instance defining the schema for extracting message values from Kafka topics. Supports IETF JSON Schema format (https://json-schema.org/). At least one of key_schema or value_schema must be provided.
extra_options – Optional. A dictionary of additional Kafka consumer configuration options. Keys should be source options or Kafka consumer options (e.g. {“kafka.security.protocol”: “SASL_SSL”}). See https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html for more details on optional configurations.
backfill_source – Optional. A user-provided source for backfilling historical data. Its schema must exactly match the key and value schemas specified for this Kafka config. Can be passed as a three-part Delta table name string (e.g. “catalog.schema.table”) or as a DeltaTableSource instance (deprecated). Note: backfill_source is used when creating a training set for Kafka-backed features. In a future milestone, this data will also be used to store an offline copy of the Kafka data and to backfill materialized features.
- Returns
KafkaConfig instance containing the created configuration.
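bootstrap_servers, subscription_mode and extra_options correspond to options of the Spark Structured Streaming Kafka source linked above (`kafka.bootstrap.servers`, one of `subscribe` / `subscribePattern` / `assign`, plus any extra source or `kafka.`-prefixed consumer options). The following is a minimal sketch of that correspondence. It assumes a hypothetical helper, `spark_kafka_options`, that builds the equivalent Spark option dictionary. It shows the Spark semantics only and is not how the client builds its pipeline.

```python
import json
from typing import Dict, List, Optional


def spark_kafka_options(
    bootstrap_servers: str,
    subscribe: Optional[str] = None,
    subscribe_pattern: Optional[str] = None,
    assign: Optional[Dict[str, List[int]]] = None,
    extra_options: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Build Spark Kafka source options; exactly one subscription mode must be set."""
    modes = {
        "subscribe": subscribe,  # comma-separated topic list
        "subscribePattern": subscribe_pattern,  # regex over topic names
        "assign": json.dumps(assign) if assign is not None else None,  # {"topic": [partitions]}
    }
    chosen = {key: value for key, value in modes.items() if value is not None}
    if len(chosen) != 1:
        raise ValueError("exactly one of subscribe, subscribe_pattern, assign must be set")
    # Spark expects comma-separated "host:port" pairs.
    for pair in bootstrap_servers.split(","):
        host, sep, port = pair.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"invalid host:port pair: {pair!r}")
    options = {"kafka.bootstrap.servers": bootstrap_servers, **chosen}
    # Extra options are applied last, e.g. {"kafka.security.protocol": "SASL_SSL"}.
    options.update(extra_options or {})
    return options


opts = spark_kafka_options(
    "host1:9092,host2:9092",
    subscribe="events",
    extra_options={"kafka.security.protocol": "SASL_SSL"},
)
# spark.readStream.format("kafka").options(**opts).load()
```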
- get_kafka_config(*, name: str) KafkaConfig
Note
Experimental: This function may change or be removed in a future release without warning.
Retrieve a Kafka configuration by name within the caller’s metastore.
Deprecated: Use get_stream() instead. Kafka configs will be removed in a future release.
- Parameters
name – Required. The name of the Kafka configuration to retrieve.
- Returns
KafkaConfig instance containing the configuration.
- list_kafka_configs(*, max_results: int = 50, include_schemas: bool = False) List[KafkaConfig]
Note
Experimental: This function may change or be removed in a future release without warning.
List Kafka configurations in the current metastore.
Deprecated: Use list_streams() instead. Kafka configs will be removed in a future release.
- Parameters
max_results – The maximum number of configs to return, default is 50. Specifying a high number may result in a long-running operation.
include_schemas – Whether to include schema information in the response, default is False. Schemas may be large and can be retrieved individually instead using
FeatureEngineeringClient.get_kafka_config(). Specifying True may result in a long-running operation.
- Returns
List of KafkaConfig instances.
- delete_kafka_config(*, name: str) None
Note
Experimental: This function may change or be removed in a future release without warning.
Delete a Kafka configuration.
Deprecated: Use delete_stream() instead. Kafka configs will be removed in a future release.
- Parameters
name – Required. The name of the Kafka configuration to delete.
Note
The caller must be the creator of the Kafka configuration in order to delete it.
- create_stream(*, name: str, source_config: Union[KafkaStreamConfig, StreamSourceConfig], connection_config: Union[DirectMtlsConfig, StreamConnectionConfig], schema_config: Union[DirectSchemas, StreamSchemaConfig], ingestion_config: IngestionConfig, description: Optional[str] = None) Stream
Note
Experimental: This function may change or be removed in a future release without warning.
Create a Stream, a governed Unity Catalog entity representing an external streaming data source.
Authorization on the Stream is governed by the ingestion table: SELECT on the ingestion table grants read access to the Stream, and MANAGE on the ingestion table grants delete access.
- Parameters
name – Required. The full three-part name of the stream (catalog.schema.stream).
source_config –
Required. Specifies the type of streaming source (e.g. Kafka). Accepts either:
A platform-specific config instance (e.g. KafkaStreamConfig) which is automatically wrapped into a StreamSourceConfig.
A StreamSourceConfig instance directly.
For Kafka streams:
    KafkaStreamConfig(
        subscription_mode=KafkaSubscriptionMode(subscribe="my-topic"),
        extra_options={"maxOffsetsPerTrigger": "1000"},
    )
connection_config –
Required. Specifies how to connect and authenticate to the stream platform. Accepts either:
A StreamConnectionConfig instance, e.g. StreamConnectionConfig(uc_connection_name="my-kafka-connection") for UC connections.
A DirectMtlsConfig instance for direct mTLS authentication (automatically wrapped). In a future milestone, this option will be deprecated once Kafka UC Connections support mTLS.
schema_config –
Required. Schema definitions for the stream. Accepts either:
A DirectSchemas instance (automatically wrapped into a StreamSchemaConfig), e.g.:
    DirectSchemas(
        payload_schema=SchemaConfig(json_schema='{"type": "object", ...}'),
        key_schema=SchemaConfig(json_schema='{"type": "object", ...}'),
    )
A StreamSchemaConfig instance directly. In a future milestone, this will also support schema registries through a UC Connection.
At least one of payload_schema or key_schema must be provided in the DirectSchemas.
ingestion_config –
Required. Configuration for the Databricks-managed ingestion pipeline. Groups the ingestion destination table, optional backfill source, and deduplication settings:
    from databricks.feature_engineering.entities import StreamBackfillSource

    IngestionConfig(
        ingestion_destination=IngestionDestination(
            delta_table_name="catalog.schema.ingestion_table"
        ),
        backfill_source=StreamBackfillSource(
            delta_table_name="catalog.schema.historical_data"
        ),
        deduplication_columns=["value.user_id"],
    )
Authorization on the Stream is governed by the ingestion destination table: SELECT grants read access, MANAGE grants delete access.
deduplication_columns specifies column paths used to identify duplicate rows during ingestion. Use dot notation for nested fields (e.g. "value.user_id"). For Kafka streams, if not provided, the default dedup key is (key, value, stream_record_timestamp).
When backfill_source is provided, the backfill table must include a platform-specific timestamp column of type TIMESTAMP. This is the canonical time column used by the ingestion table for both backfill and forward-fill data, and by the backfill scheduler for overlap-window detection. For Kafka streams, this column must be named stream_record_timestamp. Other Kafka metadata columns (kafka_topic, kafka_partition, kafka_offset) are passed through if present on the backfill source, or set to NULL otherwise.
description – Optional. A user-provided description of the stream.
- Returns
Stream instance containing the created stream.
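Three of the create_stream inputs can be prepared or checked locally. These are the JSON Schema strings passed to SchemaConfig(json_schema=...), the effective deduplication key, and the required stream_record_timestamp column on a Kafka backfill table. The following is a minimal sketch assuming three hypothetical helpers: `json_schema_string`, `effective_dedup_columns` and `check_kafka_backfill_columns`. The column types are assumed to be Spark simple type strings such as those returned by `dict(spark.table("catalog.schema.historical_data").dtypes)`.

```python
import json
from typing import Any, Dict, List, Optional, Sequence

# Documented default dedup key for Kafka streams.
KAFKA_DEFAULT_DEDUP = ["key", "value", "stream_record_timestamp"]


def json_schema_string(properties: Dict[str, str], required: Sequence[str] = ()) -> str:
    """Build a JSON Schema object definition, e.g. for SchemaConfig(json_schema=...)."""
    schema: Dict[str, Any] = {
        "type": "object",
        "properties": {name: {"type": json_type} for name, json_type in properties.items()},
    }
    if required:
        schema["required"] = list(required)
    return json.dumps(schema)


def effective_dedup_columns(deduplication_columns: Optional[List[str]]) -> List[str]:
    """Dedup key a Kafka stream uses when deduplication_columns is omitted (None)."""
    if deduplication_columns is None:
        return list(KAFKA_DEFAULT_DEDUP)
    return list(deduplication_columns)


def check_kafka_backfill_columns(column_types: Dict[str, str]) -> None:
    """Check a backfill table's columns (name -> Spark type string) for the required time column."""
    col_type = column_types.get("stream_record_timestamp")
    if col_type is None:
        raise ValueError("backfill table must have a stream_record_timestamp column")
    if col_type.lower() != "timestamp":
        raise ValueError(f"stream_record_timestamp must be TIMESTAMP, got {col_type}")


payload_json = json_schema_string({"user_id": "string", "amount": "number"}, required=["user_id"])
# DirectSchemas(payload_schema=SchemaConfig(json_schema=payload_json))
```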
- get_stream(*, name: str) Stream
Note
Experimental: This function may change or be removed in a future release without warning.
Retrieve a Stream by its full three-part name.
- Parameters
name – Required. The full three-part name of the stream (catalog.schema.stream).
- Returns
Stream instance.
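get_stream takes the full three-part name, while list_streams takes the catalog and schema separately. The following is a minimal sketch of converting between the two. It assumes a hypothetical helper, `split_three_part_name`, and also assumes that no part of the name contains a period, so backtick-quoted identifiers are not handled.

```python
from typing import Tuple


def split_three_part_name(full_name: str) -> Tuple[str, str, str]:
    """Split 'catalog.schema.stream' into its parts (assumes no part contains a period)."""
    parts = full_name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected catalog.schema.name, got {full_name!r}")
    catalog, schema, name = parts
    return catalog, schema, name


catalog, schema, stream = split_three_part_name("main.features.clickstream")
# fe.get_stream(name="main.features.clickstream")
# fe.list_streams(catalog_name=catalog, schema_name=schema)
```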
- list_streams(*, catalog_name: str, schema_name: str, max_results: int = 50, include_schemas: bool = False) List[Stream]
Note
Experimental: This function may change or be removed in a future release without warning.
List Streams in a given catalog and schema.
- Parameters
catalog_name – Name of the parent catalog.
schema_name – Name of the parent schema relative to its parent catalog.
max_results – The maximum number of streams to return, default is 50. Specifying a high number may result in a long-running operation.
include_schemas – Whether to include schema information in the response, default is False. Schemas may be large and can be retrieved individually instead using
get_stream(). Specifying True may result in a long-running operation.
- Returns
List of Stream instances.
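Schemas can be large, so one approach is to list streams without them and then call get_stream only for the streams you need. The following is a minimal sketch of that pattern. It assumes a hypothetical helper, `streams_with_schemas`, that takes the client as a duck-typed `fe` argument. It also assumes that each Stream returned by list_streams exposes its full three-part name as a `.name` attribute. That attribute is an assumption, so check it against the Stream entity.

```python
from typing import Any, Dict, Iterable


def streams_with_schemas(
    fe: Any, catalog_name: str, schema_name: str, wanted: Iterable[str]
) -> Dict[str, Any]:
    """List streams without schemas, then fetch full details only for names in `wanted`."""
    wanted_names = set(wanted)
    # The listing call stays cheap because schemas are not included.
    listed = fe.list_streams(
        catalog_name=catalog_name, schema_name=schema_name, include_schemas=False
    )
    # Assumes Stream.name holds the full three-part name (hypothetical attribute).
    return {s.name: fe.get_stream(name=s.name) for s in listed if s.name in wanted_names}
```

This trades one large listing response for several smaller get_stream calls. That tends to pay off when only a few streams in a schema need their full schema information.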