Databricks FeatureEngineeringClient

class databricks.feature_engineering.client.FeatureEngineeringClient(*, model_registry_uri: Optional[str] = None)

Bases: object

Client for interacting with Databricks Feature Engineering in Unity Catalog.

Note

Use the Databricks FeatureStoreClient for workspace feature tables in the Hive metastore.

create_feature(*, source: DataSource, function: Union[AggregationFunction, ColumnSelection], catalog_name: str, schema_name: str, entity: List[str], timeseries_column: str, name: Optional[str] = None, description: Optional[str] = None) Feature

Note

Experimental: This function may change or be removed in a future release without warning.

Create a Feature instance with comprehensive validation.

Parameters
  • source – Required. The data source for this feature (DeltaTableSource, KafkaSource, DataFrameSource, or RequestSource)

  • function – Required. The function to apply: an AggregationFunction (carrying its inputs and time window) or a ColumnSelection. Input column names with periods are interpreted as struct field nesting (e.g., “value.amount” accesses the “amount” field in the “value” struct). Literal periods in column names are not supported.

  • catalog_name – Required. The catalog name for the feature

  • schema_name – Required. The schema name for the feature

  • entity – Required. List of entity column names that act as the lookup/aggregation keys. Required for all feature types.

  • timeseries_column – Required. The timeseries column name used for point-in-time joins, backfills, and aggregations.

  • name – Optional name for the feature

  • description – Optional description of the feature

Returns

A validated Feature instance

Raises

ValueError – If any validation fails
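
The period rule for input column names can be illustrated with a small sketch. The helper below is hypothetical and not part of the client. It models a row as nested dictionaries and walks a period-separated name one struct field at a time, which also shows why a column whose name contains a literal period cannot be addressed.

```python
from collections.abc import Mapping
from typing import Any


def resolve_struct_path(row: Mapping, column: str) -> Any:
    """Hypothetical helper: treat each period as one level of struct nesting."""
    value: Any = row
    for part in column.split("."):
        if not isinstance(value, Mapping) or part not in value:
            raise KeyError(f"no field {part!r} while resolving {column!r}")
        value = value[part]
    return value


row = {"user_id": 7, "value": {"amount": 12.5, "currency": "EUR"}}
amount = resolve_struct_path(row, "value.amount")  # reads the nested "amount" field
```

With this reading, a top-level key literally named "value.amount" would never be found, because the name is always split on periods first.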

register_feature(*, feature: Feature, catalog_name: Optional[str] = None, schema_name: Optional[str] = None) Feature

Note

Experimental: This function may change or be removed in a future release without warning.

Register a Feature constructed via Feature(...) in Unity Catalog. Features with DataFrameSource cannot be registered.

Parameters
  • feature – A Feature instance constructed via Feature(...).

  • catalog_name – The Unity Catalog catalog name to register under. Optional if the Feature already has catalog_name set.

  • schema_name – The Unity Catalog schema name to register under. Optional if the Feature already has schema_name set.

Returns

The registered Feature with server-populated fields.

Raises

ValueError – If the feature source is DataFrameSource, or if catalog/schema names conflict between the Feature and the arguments.
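
As a rough model of how the catalog and schema arguments interact with the values already set on the Feature, consider the sketch below. The function name is hypothetical, and rejecting a name that is missing from both places is an assumption rather than documented behavior.

```python
from typing import Optional


def resolve_name(kind: str, on_feature: Optional[str], argument: Optional[str]) -> str:
    """Hypothetical model of the catalog/schema resolution rule."""
    if on_feature is not None and argument is not None and on_feature != argument:
        raise ValueError(
            f"{kind} conflict: Feature has {on_feature!r}, argument is {argument!r}"
        )
    resolved = argument if argument is not None else on_feature
    if resolved is None:
        # Assumption: a name that is set nowhere is rejected.
        raise ValueError(f"{kind} must be set on the Feature or passed as an argument")
    return resolved


catalog = resolve_name("catalog_name", on_feature="ml", argument=None)
schema = resolve_name("schema_name", on_feature=None, argument="dev")
```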

get_feature(*, full_name: str) Feature

Retrieve a feature definition from Unity Catalog.

is_beta_feature_view(*, full_name: str) bool

Note

Experimental: This function may change or be removed in a future release without warning.

Check whether a Feature View is a beta feature.

Beta Feature Views (formerly “declarative features”) are represented as both a Feature object and a UC function of the same name. Public Preview features exist only as a Feature object.

Parameters

full_name – The fully qualified feature name in <catalog>.<schema>.<feature_name> format.

Returns

True if the feature is registered and a feature-typed UC function of the same name exists (beta), False otherwise (Public Preview).

Raises
  • NotFound – If the feature is not registered in Unity Catalog.

  • ValueError – If full_name is not a non-empty string.
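
A caller may want to check a name before passing it on. The sketch below is a hypothetical pre-check, not the client's own validation. The non-empty string rule comes from the description above, while the three-part check is an added assumption based on the <catalog>.<schema>.<feature_name> format. Quoted identifiers containing periods are not handled.

```python
from typing import Tuple


def split_full_name(full_name: str) -> Tuple[str, str, str]:
    """Hypothetical pre-check for '<catalog>.<schema>.<feature_name>'."""
    if not isinstance(full_name, str) or not full_name:
        raise ValueError("full_name must be a non-empty string")
    parts = full_name.split(".")
    # Assumption: exactly three non-empty parts are expected.
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected <catalog>.<schema>.<feature_name>, got {full_name!r}")
    return parts[0], parts[1], parts[2]


catalog, schema, feature_name = split_full_name("ml.dev.avg_purchase_7d")
```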

list_beta_feature_views(*, catalog_name: Optional[str] = None, schema_name: Optional[str] = None, max_results: int = 100) List[Feature]

Note

Experimental: This function may change or be removed in a future release without warning.

List beta Feature Views across catalogs and schemas.

Scans UC functions and returns those that are the feature-as-function representation of a beta Feature View (formerly “declarative feature”), which should be migrated to the Public Preview feature representation. Candidates are classified directly from the listed function metadata, so no per-function lookup is performed to identify them.

The scan can be narrowed with the optional catalog_name and schema_name filters. When schema_name is supplied, catalog_name must also be provided.

Parameters
  • catalog_name – If provided, only scan this catalog. Otherwise all accessible catalogs in the metastore are scanned.

  • schema_name – If provided, only scan this schema within the specified catalog. Requires catalog_name.

  • max_results – Maximum number of beta Feature Views to return, default 100. The scan stops once this many are found, bounding the work done; specifying a high number may result in a long-running operation.

Returns

List of Feature instances for beta Feature Views, sorted by fully qualified name, at most max_results entries.

Raises

ValueError – If schema_name is given without catalog_name, if catalog_name or schema_name is an empty string, or if max_results is not a positive integer.
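
The argument rules above can be summarized in a short sketch. This is a hypothetical stand-alone check that mirrors the documented ValueError conditions. It is not the client's implementation, and the order in which the conditions are tested is an assumption.

```python
from typing import Optional


def validate_scan_args(
    catalog_name: Optional[str] = None,
    schema_name: Optional[str] = None,
    max_results: int = 100,
) -> None:
    """Hypothetical mirror of the documented argument rules."""
    if catalog_name == "" or schema_name == "":
        raise ValueError("catalog_name and schema_name must not be empty strings")
    if schema_name is not None and catalog_name is None:
        raise ValueError("schema_name requires catalog_name")
    if not isinstance(max_results, int) or max_results <= 0:
        raise ValueError("max_results must be a positive integer")


validate_scan_args()                              # scan every accessible catalog
validate_scan_args(catalog_name="ml")             # scan one catalog
validate_scan_args("ml", "dev", max_results=10)   # scan one schema
```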

list_features(*, catalog_name: str, schema_name: str, max_results: int = 100) List[Feature]

List features in a given catalog and schema in Unity Catalog.

Parameters
  • catalog_name – Name of the parent catalog for features of interest.

  • schema_name – Name of the parent schema relative to its parent catalog.

  • max_results – The maximum number of features to return, default is 100. Specifying a high number may result in a long-running operation.

Returns

List of Feature instances

delete_feature(*, full_name: str) None

Note

Experimental: This function may change or be removed in a future release without warning.

Delete a feature from Unity Catalog.

Parameters

full_name – The full name of the feature in format ‘<catalog>.<schema>.<feature_name>’

list_materialized_features(*, feature_name: str, max_results: int = 100) List[MaterializedFeature]

List materialized features for a given feature in the user’s Unity Catalog metastore.

Parameters
  • feature_name – Full name of the feature whose materializations should be listed.

  • max_results – The maximum number of materialized features to return, default is 100. Specifying a high number may result in a long-running operation.

Returns

List of MaterializedFeature instances

delete_materialized_feature(*, materialized_feature: MaterializedFeature) None

Delete a materialized feature.

Parameters

materialized_feature – The materialized feature to delete.

grant_feature_serving_access(*, grant_to: List[str], model_uri: Optional[str] = None, feature_spec: Optional[Union[str, FeatureSpecDeclarative]] = None, dry_run: bool = False) FeatureServingAccessReport

Note

Experimental: This function may change or be removed in a future release without warning.

Grant SELECT on the online tables a model or feature spec serves from.

Resolves each of the input’s features to its online (Lakebase) materialized table and grants SELECT on those tables (plus USE_CATALOG and USE_SCHEMA) to grant_to. The online table for a feature can also hold other features that were materialized at the same time, and the feature spec or model may not use all of them for serving. The call therefore always returns a report mapping each table to the features it contains, so you can see which additional feature columns you are granting access to.

Exactly one of model_uri or feature_spec must be provided.

Parameters
  • grant_to – Principals (for example, email addresses) to grant SELECT to.

  • model_uri – URI of a logged model to determine the features and materialized tables needed, for example models:/<name>/<version> or models:/<name>@<alias>.

  • feature_spec – The name of a registered feature spec (loaded from Unity Catalog), or a FeatureSpecDeclarative object.

  • dry_run – When True, only report what would be granted without applying any grants. Use this to preview the access (including unused features) before granting.

Returns

A FeatureServingAccessReport.
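
One way to use dry_run is to preview the report before applying any grants. The sketch below assumes fe is a FeatureEngineeringClient instance supplied by the caller. The helper names are hypothetical, and the "exactly one of" check simply restates the rule above on the caller's side.

```python
from typing import Any, Dict, List, Optional


def build_grant_kwargs(
    grant_to: List[str],
    model_uri: Optional[str] = None,
    feature_spec: Optional[Any] = None,
    dry_run: bool = True,
) -> Dict[str, Any]:
    """Hypothetical helper: enforce 'exactly one of model_uri or feature_spec'."""
    if (model_uri is None) == (feature_spec is None):
        raise ValueError("provide exactly one of model_uri or feature_spec")
    kwargs: Dict[str, Any] = {"grant_to": list(grant_to), "dry_run": dry_run}
    if model_uri is not None:
        kwargs["model_uri"] = model_uri
    else:
        kwargs["feature_spec"] = feature_spec
    return kwargs


def preview_access(fe: Any, grant_to: List[str], model_uri: str) -> Any:
    """Ask for the report only; dry_run=True means no grants are applied."""
    kwargs = build_grant_kwargs(grant_to, model_uri=model_uri, dry_run=True)
    return fe.grant_feature_serving_access(**kwargs)
```

After reviewing the report, the same arguments can be sent again with dry_run=False to apply the grants.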

feature_spec_from_batch_declarative_features(*, name: str, features: List[Feature], exclude_columns: Optional[List[str]] = None) FeatureSpecInfo

Note

Experimental: This function may change or be removed in a future release without warning.

Create a FeatureSpecInfo from a list of Feature objects.

Deprecated: Use create_feature_spec() with Feature objects directly instead. This method is kept for backward compatibility and will be removed in a future release.

Parameters
  • name – The name of the feature spec to create.

  • features – List of Feature objects to include in the feature spec.

  • exclude_columns – Optional list of columns to drop from the final output.

Returns

A FeatureSpecInfo for the created feature spec.

create_feature_serving_endpoint(*, name: Optional[str] = None, config: Optional[EndpointCoreConfig] = None, **kwargs) FeatureServingEndpoint

Creates a Feature Serving Endpoint.

Parameters
  • name – The name of the endpoint. Must only contain alphanumerics and dashes.

  • config – Configuration of the endpoint, including features, workload_size, etc.

create_feature_spec(*, name: str, features: List[Union[FeatureLookup, FeatureFunction, Feature]], exclude_columns: Optional[List[str]] = None) FeatureSpecInfo

Creates a feature specification in Unity Catalog. The feature spec can be used for serving features & functions.

Parameters
  • name – The name of the feature spec.

  • features – List of FeatureLookups, FeatureFunctions, or Feature objects to include in the feature spec. Feature objects cannot be mixed with FeatureLookups or FeatureFunctions.

  • exclude_columns – List of columns to drop from the final output. Not supported with Feature objects.

update_feature_spec(*, name: str, owner: str) None

Update the owner of a feature spec.

Parameters
  • name – The name of the feature spec.

  • owner – The new owner of the feature spec.

delete_feature_spec(*, name: str) None

Delete a feature spec.

Parameters

name – The name of the feature spec.

create_table(*, name: str, primary_keys: Union[str, List[str]], df: Optional[DataFrame] = None, timeseries_column: Optional[str] = None, partition_columns: Optional[Union[str, List[str]]] = None, schema: Optional[StructType] = None, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None, **kwargs) FeatureTable

Create and return a feature table with the given name and primary keys.

The returned feature table has the given name and primary keys. Uses the provided schema or the inferred schema of the provided df. If df is provided, this data will be saved in a Delta table. Supported data types for features are: IntegerType, LongType, FloatType, DoubleType, StringType, BooleanType, DateType, TimestampType, ShortType, ArrayType, MapType, BinaryType, DecimalType, and StructType.

Parameters
  • name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.

  • primary_keys – The feature table’s primary keys. If multiple columns are required, specify a list of column names, for example ['customer_id', 'region'].

  • df – Data to insert into this feature table. The schema of df will be used as the feature table schema.

  • timeseries_column – Column containing the event time associated with the feature value. The timeseries column should be part of the primary keys. Combined, the timeseries column and other primary keys of the feature table uniquely identify the feature value for an entity at a point in time.

  • partition_columns

    Columns used to partition the feature table. If a list is provided, column ordering in the list will be used for partitioning.

    Note

    When choosing partition columns for your feature table, use columns that do not have a high cardinality. An ideal strategy would be such that you expect data in each partition to be at least 1 GB. The most commonly used partition column is a date.

    Additional info: Choosing the right partition columns for Delta tables

  • schema – Feature table schema. Either schema or df must be provided.

  • description – Description of the feature table.

  • tags – Tags to associate with the feature table.

create_training_set(*, df: DataFrame, feature_lookups: Optional[List[Union[FeatureLookup, FeatureFunction]]] = None, feature_spec: Optional[str] = None, features: Optional[List[Feature]] = None, label: Optional[Union[str, List[str]]], exclude_columns: Optional[List[str]] = None, use_spark_native_join: bool = False, **kwargs) TrainingSet

Create a TrainingSet using feature_lookups, feature_spec, or features.

Parameters
  • df – The DataFrame used to join features into.

  • feature_lookups – List of features to use in the TrainingSet. FeatureLookups are joined into the DataFrame, and FeatureFunctions are computed on-demand. feature_lookups cannot be specified if feature_spec or features is provided.

  • feature_spec – Full name of the FeatureSpec in Unity Catalog. feature_spec cannot be specified if feature_lookups or features is provided.

  • features – List of Feature objects to use in the training set. features cannot be specified if feature_lookups or feature_spec is provided.

  • label – Names of column(s) in DataFrame that contain training set labels. To create a training set without a label field (for example, for unsupervised training), specify label = None.

  • exclude_columns – Names of the columns to drop from the TrainingSet DataFrame.

  • use_spark_native_join – Use Spark to optimize table join performance. The optimization only applies when Photon is enabled.

Returns

A TrainingSet object.

delete_feature_serving_endpoint(*, name=None, **kwargs) None

delete_feature_table_tag(*, name: str, key: str) None

Delete the tag associated with the feature table. Deleting a non-existent tag will emit a warning.

Parameters
  • name – the feature table name.

  • key – the tag key to delete.

drop_online_table(name: str, online_store: OnlineStoreSpec) None

Drop a table in an online store.

This API first attempts to make a call to the online store provider to drop the table. If successful, it then deletes the online store from the feature catalog.

Parameters
  • name – Name of feature table associated with online store table to drop.

  • online_store – Specification of the online store.

Note

Deleting an online published table can lead to unexpected failures in downstream dependencies. Ensure that the online table being dropped is no longer used for Model Serving feature lookup or any other use cases.

drop_table(*, name: str) None

Delete the specified feature table. This API also drops the underlying Delta table.

Parameters

name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.

Note

Deleting a feature table can lead to unexpected failures in upstream producers and downstream consumers (models, endpoints, and scheduled jobs). You must delete any existing published online stores separately.

get_feature_serving_endpoint(*, name=None, **kwargs) FeatureServingEndpoint

get_table(*, name: str) FeatureTable

Get a feature table’s metadata.

Parameters

name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.

log_model(*, model: Any, artifact_path: str, flavor: module, training_set: Optional[TrainingSet] = None, registered_model_name: Optional[str] = None, await_registration_for: int = 300, infer_input_example: bool = False, **kwargs)

Log an MLflow model packaged with feature lookup information.

Note

The DataFrame returned by TrainingSet.load_df() must be used to train the model. If it has been modified (for example, by normalizing data or adding a column), these modifications will not be applied at inference time, leading to training-serving skew.

Parameters
  • model – Model to be saved. This model must be capable of being saved by flavor.save_model. See the MLflow Model API.

  • artifact_path – Run-relative artifact path.

  • flavor – MLflow module to use to log the model. flavor should have type ModuleType. The module must have a method save_model, and must support the python_function flavor. For example, mlflow.sklearn, mlflow.xgboost, and similar.

  • training_set – The TrainingSet used to train this model.

  • registered_model_name – When provided, create a model version under registered_model_name, also creating a registered model if one with the given name does not exist.

  • await_registration_for – Number of seconds to wait for the model version to finish being created and reach READY status. By default, the function waits for five minutes. Specify 0 or None to skip waiting.

  • infer_input_example

    Note

    Experimental: This argument may change or be removed in a future release without warning.

    Automatically log an input example along with the model, using supplied training data. Defaults to False.

Kwargs

If other keyword arguments are provided, in most cases, they are passed to the underlying MLflow API flavor.save_model() when saving and registering the model.

Note

  • signature: Passing signature is not recommended; prefer infer_input_example.

  • output_schema: When logging a model without labels in the training set, you must pass output_schema to log_model to suggest the output schema explicitly. For example:

    from mlflow.types import ColSpec, DataType, Schema
    
    output_schema = Schema([ColSpec(DataType.???)])  # Refer to mlflow signature types for the right choice of type here
    ...
    fe.log_model(
        ...
        output_schema=output_schema
    )
    
Returns

None

publish_table(*, online_store: Union[OnlineStoreSpec, DatabricksOnlineStore], source_table_name: Optional[str] = None, online_table_name: Optional[str] = None, publish_mode: str = 'TRIGGERED', filter_condition: Optional[str] = None, mode: str = 'merge', streaming: bool = False, checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 minutes'}, features: Optional[Union[str, List[str]]] = None, **kwargs) Optional[Union[StreamingQuery, PublishedTable]]

Publish a feature table to an online store.

Parameters
  • source_table_name – Name of the feature table. This is a required parameter.

  • online_table_name – Name of the online table. This is a required parameter when publishing to Databricks Online Store. The catalog name, schema name, and table name each are limited to a maximum of 63 bytes.

  • online_store – Specification of the online store. This is a required parameter.

  • publish_mode – Supported modes are "SNAPSHOT", "CONTINUOUS", and "TRIGGERED". Default is "TRIGGERED".

Note

Change Data Feed (CDF) must be enabled for CONTINUOUS and TRIGGERED modes.

Legacy parameters: The following parameters are only used by third party online stores.

Parameters
  • filter_condition – A SQL expression using feature table columns that filters feature rows prior to publishing to the online store. For example, "dt > '2020-09-10'". This is analogous to running df.filter or a WHERE condition in SQL on a feature table prior to publishing.

  • mode

    Specifies the behavior when data already exists in this feature table. The only supported mode is "merge", with which the new data will be merged in, under these conditions:

    • If a key exists in the online table but not the offline table, the row in the online table is unmodified.

    • If a key exists in the offline table but not the online table, the offline table row is inserted into the online table.

    • If a key exists in both the offline and the online tables, the online table row will be updated.

  • streaming – If True, streams data to the online store.

  • checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when streaming=True.

  • trigger – If streaming=True, trigger defines the timing of stream data processing. The dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).

  • features

    Specifies the feature column(s) to be published to the online store. The selected features must be a superset of existing online store features. Primary key columns and timestamp key columns will always be published.

    Note

    When features is not set, the whole feature table will be published.

Returns

If streaming=True, returns a PySpark StreamingQuery, None otherwise.
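
The three "merge" conditions can be pictured with plain dictionaries keyed by primary key. This is only a toy model of the described semantics, with a hypothetical function name, and says nothing about how any online store actually applies the merge.

```python
from typing import Any, Dict


def merge_publish(online: Dict[Any, Any], offline: Dict[Any, Any]) -> Dict[Any, Any]:
    """Toy model of publish mode 'merge', keyed by primary key."""
    merged = dict(online)   # keys only in the online table stay unmodified
    merged.update(offline)  # keys only offline are inserted; shared keys are updated
    return merged


online_rows = {1: {"score": 0.1}, 2: {"score": 0.2}}
offline_rows = {2: {"score": 0.9}, 3: {"score": 0.3}}
result = merge_publish(online_rows, offline_rows)
```

Note that "merge" never removes a key from the online side in this model, which matches the first condition in the list.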

read_table(*, name: str, **kwargs) DataFrame

Read the contents of a feature table.

Parameters

name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.

Returns

The feature table contents. An exception is raised if the feature table does not exist.

score_batch(*, model_uri: str, df: DataFrame, result_type: str = 'double', env_manager: str = 'local', params: Optional[dict[str, Any]] = None, use_spark_native_join: bool = False, **kwargs) DataFrame

Evaluate the model on the provided DataFrame.

Additional features required for model evaluation will be automatically retrieved from feature tables.

The model must have been logged with FeatureEngineeringClient.log_model(), which packages the model with feature metadata. Unless present in df, these features will be looked up from feature tables and joined with df prior to scoring the model.

If a feature is included in df, the provided feature values will be used rather than those stored in feature tables.

For example, if a model is trained on two features account_creation_date and num_lifetime_purchases, as in:

import mlflow
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

fe = FeatureEngineeringClient()

# df (the labeled training DataFrame) and model are assumed to be defined elsewhere.
feature_lookups = [
    FeatureLookup(
        table_name='trust_and_safety.customer_features',
        feature_name='account_creation_date',
        lookup_key='customer_id',
    ),
    FeatureLookup(
        table_name='trust_and_safety.customer_features',
        feature_name='num_lifetime_purchases',
        lookup_key='customer_id',
    ),
]

with mlflow.start_run():
    # The client methods accept keyword arguments only.
    training_set = fe.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label='is_banned',
        exclude_columns=['customer_id'],
    )
    ...
    fe.log_model(
        model=model,
        artifact_path="model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="example_model",
    )

Then at inference time, the caller of FeatureEngineeringClient.score_batch() must pass a DataFrame that includes customer_id, the lookup_key specified in the FeatureLookups of the TrainingSet. If the DataFrame contains a column account_creation_date, the values of this column will be used in lieu of those in feature tables. As in:

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
    model_uri='models:/example_model/1',
    df=batch_df
)

Parameters
  • model_uri

    The location, in URI format, of the MLflow model logged using FeatureEngineeringClient.log_model(). One of:

    • runs:/<mlflow_run_id>/run-relative/path/to/model

    • models:/<model_name>/<model_version>

    • models:/<model_name>/<stage>

    For more information about URI schemes, see Referencing Artifacts.

  • df

    The DataFrame to score the model on. Features from feature tables will be joined with df prior to scoring the model. df must:

    1. Contain columns for lookup keys required to join feature data from feature tables, as specified in the feature_spec.yaml artifact.

    2. Contain columns for all source keys required to score the model, as specified in the feature_spec.yaml artifact.

    3. Not contain a column prediction, which is reserved for the model’s predictions. df may contain additional columns.

    Streaming DataFrames are not supported.

  • result_type – The return type of the model. See mlflow.pyfunc.spark_udf() result_type.

  • env_manager – The environment manager to use in order to create the python environment for model inference. See mlflow.pyfunc.spark_udf() env_manager.

  • params – Additional parameters to pass to the model for inference.

  • use_spark_native_join

    Use Spark to optimize table join performance. The optimization only applies when Photon is enabled.

Returns

A DataFrame containing:

  1. All columns of df.

  2. All feature values retrieved from feature tables.

  3. A column prediction containing the output of the model.
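
To make the rules for df concrete, the sketch below classifies each model feature as either taken from df or looked up from feature tables, working on column names only. The function is hypothetical and does not read feature_spec.yaml; the lookup keys and feature names are passed in by hand.

```python
from typing import Dict, List


def plan_feature_sources(
    df_columns: List[str], lookup_keys: List[str], model_features: List[str]
) -> Dict[str, List[str]]:
    """Hypothetical check of the df requirements, using column names only."""
    if "prediction" in df_columns:
        raise ValueError("df must not contain a column named 'prediction'")
    missing = [key for key in lookup_keys if key not in df_columns]
    if missing:
        raise ValueError(f"df is missing lookup key columns: {missing}")
    return {
        # Features present in df override the stored feature values.
        "from_df": [f for f in model_features if f in df_columns],
        "from_feature_tables": [f for f in model_features if f not in df_columns],
    }


plan = plan_feature_sources(
    df_columns=["customer_id", "account_creation_date"],
    lookup_keys=["customer_id"],
    model_features=["account_creation_date", "num_lifetime_purchases"],
)
```

For the batch_df in the example above, account_creation_date comes from df and num_lifetime_purchases is looked up.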

set_feature_table_tag(*, name: str, key: str, value: str) None

Create or update a tag associated with the feature table. If the tag with the corresponding key already exists, its value will be overwritten with the new value.

Parameters
  • name – the feature table name

  • key – tag key

  • value – tag value

write_table(*, name: str, df: DataFrame, mode: str = 'merge', checkpoint_location: Optional[str] = None, trigger: Dict[str, Any] = {'processingTime': '5 seconds'}) Optional[StreamingQuery]

Writes to a feature table.

If the input DataFrame is streaming, this method creates a write stream.

Parameters
  • name – A feature table name. The format is <catalog_name>.<schema_name>.<table_name>, for example ml.dev.user_features.

  • df – Spark DataFrame with feature data. Raises an exception if the schema does not match that of the feature table.

  • mode

    There is only one supported write mode:

    • "merge" upserts the rows in df into the feature table. If df contains columns not present in the feature table, these columns will be added as new features.

    If you want to overwrite a table, run SQL DELETE FROM <table name>; to delete all rows, or drop and recreate the table before calling this method.

  • checkpoint_location – Sets the Structured Streaming checkpointLocation option. By setting a checkpoint_location, Spark Structured Streaming will store progress information and intermediate state, enabling recovery after failures. This parameter is only supported when the argument df is a streaming DataFrame.

  • trigger – If df.isStreaming, trigger defines the timing of stream data processing. The dictionary will be unpacked and passed to DataStreamWriter.trigger as arguments. For example, trigger={'once': True} will result in a call to DataStreamWriter.trigger(once=True).

Returns

If df.isStreaming, returns a PySpark StreamingQuery. None otherwise.
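
The way the trigger dictionary reaches DataStreamWriter.trigger is ordinary keyword unpacking. The sketch below uses a hypothetical stand-in function with two of the keyword names, so it runs without Spark, and only records what it receives.

```python
from typing import Any, Dict, Optional


def fake_trigger(
    *, processingTime: Optional[str] = None, once: Optional[bool] = None
) -> Dict[str, Any]:
    """Stand-in for DataStreamWriter.trigger that records its keyword arguments."""
    given = {"processingTime": processingTime, "once": once}
    return {key: value for key, value in given.items() if value is not None}


trigger = {"once": True}
call_args = fake_trigger(**trigger)  # equivalent to fake_trigger(once=True)

# The default shown in the write_table signature unpacks the same way.
default_args = fake_trigger(**{"processingTime": "5 seconds"})
```

Because the dictionary is unpacked, a misspelled key fails as an unexpected keyword argument rather than being silently ignored.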

compute_features(*, features: List[Feature]) DataFrame

Compute feature values for every row in the source table.

Evaluates each feature at every row’s own timestamp using the point-in-time window engine, producing the historical values that materialization would generate. Useful for prototyping and validating feature definitions before creating a pipeline.

Each feature is scored using only data strictly before that row’s timestamp, respecting the feature’s time window (rolling, sliding, or tumbling), entity columns, and any source-level filter condition.

The returned DataFrame contains the feature’s entity columns, timeseries column, and one column per computed feature. Other source columns are not included, and exact duplicate source rows collapse to a single output row.

All features must read from the same source table. Features may use different entity columns, timeseries columns, filter conditions, and time windows — these are handled independently and joined on the output.

Parameters

features – Features to compute. All must share the same source table.

Returns

A DataFrame with entity columns, the timeseries column, and one column per feature.
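
The "strictly before" rule can be illustrated with a small in-memory sketch for a rolling sum. Everything here is an assumption made for illustration: the function name, the inclusive lower bound of the window, and returning None when no earlier rows fall in the window. It is not the point-in-time engine itself, and it does not model how duplicate source rows affect the aggregate.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, datetime, float]  # (entity, event time, amount)


def rolling_sum_strictly_before(
    rows: List[Row], window: timedelta
) -> List[Tuple[str, datetime, Optional[float]]]:
    results: Dict[Tuple[str, datetime], Optional[float]] = {}
    for entity, ts, _ in rows:
        # Same entity, inside the window, and strictly earlier than this row.
        earlier = [
            amount
            for other_entity, other_ts, amount in rows
            if other_entity == entity and ts - window <= other_ts < ts
        ]
        # One output row per (entity, timestamp) pair.
        results[(entity, ts)] = sum(earlier) if earlier else None
    return [(entity, ts, value) for (entity, ts), value in sorted(results.items())]


start = datetime(2024, 1, 1)
rows = [
    ("a", start, 10.0),
    ("a", start + timedelta(hours=1), 20.0),
    ("a", start + timedelta(hours=3), 5.0),
]
features = rolling_sum_strictly_before(rows, window=timedelta(hours=2))
```

A row's own amount never contributes to its own feature value, which is what prevents leakage when these values are later joined to labels at the same timestamp.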

aggregate_features(*, features: FeatureAggregations) DataFrame

Computes the specified aggregations and returns a DataFrame containing the results.

Parameters

features – The aggregation specification to compute.

create_materialized_view(*, features: FeatureAggregations, output_table: str, schedule: Optional[CronSchedule]) MaterializedViewInfo

Creates and runs a pipeline that materializes the given feature aggregation specification into a materialized view.

Parameters
  • features – The aggregation specification to materialize.

  • output_table – The name of the output materialized view.

  • schedule – The schedule at which to run the materialization pipeline. If not provided, the pipeline can only be run manually.

create_pipeline(*, features: List[Feature], offline_store_config: OfflineStoreConfig, schedule: Optional[CronSchedule] = None, start_time: Optional[datetime] = None) List[MaterializedViewInfo]

Note

Experimental: This function may change or be removed in a future release without warning.

Creates and runs a pipeline that materializes the given features into tables.

Deprecated: This method is deprecated and not supported as part of the Feature Views Public Preview.

Features are grouped by (source, granularity) and each unique combination creates a separate materialized view.

Restrictions:

  • Supports SlidingWindow and TumblingWindow features

  • Features within the same (source, granularity) group must have the same granularity (slide_duration for SlidingWindow, window_duration for TumblingWindow)

  • All features must use a single column for aggregation

  • RollingWindow is not supported at this time

Parameters
  • features – List of features to materialize

  • offline_store_config – The offline store configuration containing catalog, schema, and table prefix

  • schedule – The schedule at which to run the pipeline (applied to all created views)

  • start_time – The earliest time to generate features from. If not provided, will use the minimum timestamp from the source table’s timeseries column

Returns

List of MaterializedViewInfo, one per unique (source, granularity) combination
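
The grouping rule can be sketched with plain dictionaries standing in for Feature objects. The field names source, granularity, and name are hypothetical stand-ins chosen for this example; the real objects carry this information differently.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_for_materialization(
    features: List[Dict[str, str]]
) -> Dict[Tuple[str, str], List[str]]:
    """Toy grouping: one materialized view per (source, granularity) pair."""
    groups: Dict[Tuple[str, str], List[str]] = defaultdict(list)
    for feature in features:
        groups[(feature["source"], feature["granularity"])].append(feature["name"])
    return dict(groups)


views = group_for_materialization(
    [
        {"name": "sum_1d", "source": "ml.dev.purchases", "granularity": "1 day"},
        {"name": "avg_1d", "source": "ml.dev.purchases", "granularity": "1 day"},
        {"name": "sum_1h", "source": "ml.dev.purchases", "granularity": "1 hour"},
    ]
)
```

Here three features on one source produce two groups, so two MaterializedViewInfo entries would be expected under the rule above.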

build_model_env(model_uri: str, save_path: str) str

Prebuild the model Python environment required by the given model and generate an archive file saved to the specified save_path. The resulting environment can then be used in FeatureEngineeringClient.score_batch() as the prebuilt_env_uri parameter.

Parameters
  • model_uri – URI of the model used to build the Python environment.

  • save_path – Directory path to save the prebuilt model environment archive file. This can be a local directory path, a mounted DBFS path (e.g., ‘/dbfs/…’), or a mounted UC volume path (e.g., ‘/Volumes/…’).

Returns

The path of the archive file containing the Python environment data.

create_online_store(*, name: str, capacity: str, read_replica_count: Optional[int] = None, usage_policy_id: Optional[str] = None) DatabricksOnlineStore

Create an Online Feature Store.

Parameters
  • name – The name of the online store. Maximum of 63 bytes.

  • capacity – The capacity of the online store. Valid values are “CU_1”, “CU_2”, “CU_4”, “CU_8”.

  • read_replica_count – Optional. The number of read replicas for the online store.

  • usage_policy_id – Optional. ID of the usage policy to associate with the online store used for cost tracking.

Returns

The created online store.
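
Because the name limit is expressed in bytes rather than characters, a quick client-side check can help with non-ASCII names. The sketch below is a hypothetical pre-check that assumes the limit is measured on the UTF-8 encoding; the service may validate differently.

```python
VALID_CAPACITIES = ("CU_1", "CU_2", "CU_4", "CU_8")


def check_online_store_args(name: str, capacity: str) -> None:
    """Hypothetical pre-check; assumes the 63-byte limit applies to UTF-8 bytes."""
    size = len(name.encode("utf-8"))
    if size > 63:
        raise ValueError(f"name is {size} bytes; the maximum is 63")
    if capacity not in VALID_CAPACITIES:
        raise ValueError(f"capacity must be one of {VALID_CAPACITIES}")


check_online_store_args("a" * 63, "CU_2")  # 63 ASCII characters are 63 bytes
```

Under this assumption a 32-character name made of two-byte characters would already exceed the limit.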

get_online_store(*, name: str) Optional[DatabricksOnlineStore]

Get an Online Feature Store.

Parameters

name – The name of the online store.

Returns

The retrieved online store, or None if not found.

update_online_store(*, name: str, capacity: Union[str, _UnsetType] = <UNSET>, read_replica_count: Union[int, _UnsetType] = <UNSET>, usage_policy_id: Union[str, _UnsetType] = <UNSET>) DatabricksOnlineStore

Update an Online Feature Store. Only the fields specified will be updated. Fields that are not specified will remain unchanged.

Parameters
  • name – The name of the online store.

  • capacity – Optional. The capacity of the online store. Valid values are “CU_1”, “CU_2”, “CU_4”, “CU_8”.

  • read_replica_count – Optional. The number of read replicas for the online store.

  • usage_policy_id – Optional. ID of the usage policy to associate with the online store used for cost tracking.

Returns

The updated online store.
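
The <UNSET> defaults in the signature let the client tell an omitted argument apart from an explicitly supplied value. The following sketch illustrates the general sentinel pattern only; it is not the library's implementation, and _Unset, UNSET, and build_update_fields are hypothetical names.

```python
from typing import Any, Dict


class _Unset:
    """Hypothetical sentinel type marking 'argument not supplied'."""

    def __repr__(self) -> str:
        return "<UNSET>"


UNSET = _Unset()


def build_update_fields(*, capacity: Any = UNSET,
                        read_replica_count: Any = UNSET,
                        usage_policy_id: Any = UNSET) -> Dict[str, Any]:
    """Collect only the fields the caller actually supplied."""
    candidates = {
        "capacity": capacity,
        "read_replica_count": read_replica_count,
        "usage_policy_id": usage_policy_id,
    }
    # Identity comparison: falsy values such as 0 are kept, only UNSET is skipped.
    return {key: value for key, value in candidates.items() if value is not UNSET}


print(build_update_fields(capacity="CU_4"))       # {'capacity': 'CU_4'}
print(build_update_fields(read_replica_count=0))  # {'read_replica_count': 0}
```

With the real client, the same effect is obtained by leaving out the arguments that should stay unchanged, for example fe.update_online_store(name="my-store", capacity="CU_4").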

delete_online_store(*, name: str) None

Delete an Online Feature Store.

Parameters

name – The name of the online store.

Returns

None.

list_online_stores() List[DatabricksOnlineStore]

List available Online Feature Stores. Only online stores that are associated with a feature store are returned; online stores without such an association are excluded from the output.

Returns

A list of DatabricksOnlineStore objects.

materialize_features(*, features: List[Feature], offline_config: Optional[OfflineStoreConfig] = None, online_config: Optional[OnlineStoreConfig] = None, trigger: Optional[Union[CronSchedule, TableTrigger, StreamingMode]] = None) Optional[List[MaterializedFeature]]

Direct Databricks to materialize a list of feature definitions stored in Unity Catalog to an offline (Delta Live Table) store and/or an online store for model serving. Note that the features must already exist in Unity Catalog as functions, which can be created using FeatureEngineeringClient.create_feature().

Features are handled differently based on their source and function type. offline_config must not be provided if the call contains no batch aggregation features.

  • Batch aggregation features (Delta table source with aggregation function): Can be materialized to offline or online stores. Requires offline_config. The online_config is optional and if provided, there will be an offline and online materialized feature returned for each batch feature provided.

  • Batch column selection features (Delta table source with ColumnSelection function): Can only be materialized to online stores. Requires online_config. Pass offline_config only with batch aggregation features; otherwise the call is rejected.

  • Streaming features (Kafka source): Can only be materialized to online stores. Cannot be mixed with batch features in the same call because batch and streaming pipelines require different trigger types. Issue separate calls. offline_config is not supported for streaming features.

Parameters
  • features – The list of features to materialize.

  • offline_config – Offline store configuration. Required when the call includes batch aggregation features; must be omitted otherwise. Not supported for streaming features.

  • online_config – The online store configuration. Required for column selection and streaming features, optional for batch aggregation features.

  • trigger – Required. The trigger for the materialization pipeline. Must be a Trigger subclass (e.g. CronSchedule, TableTrigger, or StreamingMode). The trigger carries the scheduling configuration. TableTrigger is required for ColumnSelection features backed by a DeltaTableSource and only supported for that combination; StreamingMode selects the Structured Streaming trigger mode and is only supported for streaming (KafkaSource) features; CronSchedule is used for every other supported feature shape. Mixing trigger types across feature shapes in a single call is not supported – issue separate materialize_features calls instead.

Returns

The list of materialized features.

Example:

When the feature set contains both ColumnSelection features (which need TableTrigger) and aggregation features (which need CronSchedule), issue two separate calls:

fe.materialize_features(
    features=column_selection_features,
    online_config=online_cfg,
    trigger=TableTrigger(),
)
fe.materialize_features(
    features=aggregation_features,
    offline_config=offline_cfg,
    online_config=online_cfg,
    trigger=CronSchedule(
        quartz_cron_expression="0 0 * * * ?",
        timezone_id="UTC",
    ),
)
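
The per-shape rules described for materialize_features can be summarised as a lookup table. The sketch below is illustrative only: RULES, plan_calls, and the shape labels are hypothetical names that do not exist in the library, and the caller is assumed to already know the shape of each feature. It groups features so that each group corresponds to one materialize_features call with a single trigger type.

```python
from typing import Dict, Iterable, List, Tuple

# Requirements per feature shape, transcribed from the rules above.
# The shape labels are hypothetical; the trigger values are class names.
RULES: Dict[str, Dict[str, str]] = {
    "batch_aggregation": {
        "trigger": "CronSchedule",
        "offline_config": "required",
        "online_config": "optional",
    },
    "batch_column_selection": {
        "trigger": "TableTrigger",
        "offline_config": "omit",
        "online_config": "required",
    },
    "streaming": {
        "trigger": "StreamingMode",
        "offline_config": "omit",
        "online_config": "required",
    },
}


def plan_calls(features: Iterable[Tuple[str, str]]) -> List[dict]:
    """Group (feature_name, shape) pairs into one planned call per shape."""
    grouped: Dict[str, List[str]] = {}
    for name, shape in features:
        if shape not in RULES:
            raise ValueError(f"unknown feature shape: {shape!r}")
        grouped.setdefault(shape, []).append(name)
    # One entry per shape, in first-seen order, with its requirements attached.
    return [
        {"shape": shape, "features": names, **RULES[shape]}
        for shape, names in grouped.items()
    ]


for call in plan_calls([("spend_7d", "batch_aggregation"),
                        ("clicks_live", "streaming"),
                        ("spend_30d", "batch_aggregation")]):
    print(call["trigger"], call["features"])
```

Each planned entry then translates into its own materialize_features call, passing offline_config and online_config only where the table marks them as required or optional.
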

create_kafka_config(*, name: str, bootstrap_servers: str, subscription_mode: SubscriptionMode, auth_config: AuthConfig, key_schema: Optional[SchemaConfig] = None, value_schema: Optional[SchemaConfig] = None, extra_options: Optional[Dict[str, str]] = None, backfill_source: Optional[Union[BackfillSource, str]] = None) KafkaConfig

Note

Experimental: This function may change or be removed in a future release without warning.

Create a Kafka configuration for streaming feature ingestion.

Deprecated: Use create_stream() instead. Kafka configs will be removed in a future release.

Parameters
  • name – Required. A unique name for the Kafka configuration within the metastore. Can be distinct from the topic name.

  • bootstrap_servers – Required. A comma-separated list of host/port pairs pointing to the Kafka cluster (e.g. “host1:port1,host2:port2”). See https://spark.apache.org/docs/latest/streaming/structured-streaming-kafka-integration.html for more details on this parameter.

  • subscription_mode – Required. A SubscriptionMode instance specifying how to select Kafka topics to consume from. Supports three modes: subscribe (comma-separated topic list), subscribe_pattern (regex pattern), or assign (JSON with specific topic-partitions). See https://spark.apache.org/docs/latest/streaming/structured-streaming-kafka-integration.html for more details on this parameter.

  • auth_config

    Required. An AuthConfig instance containing authentication configuration for connecting to Kafka topics. Supports two auth modes:

  • key_schema – Optional. A SchemaConfig instance defining the schema for extracting message keys from Kafka topics. Supports IETF JSON Schema format (https://json-schema.org/). At least one of key_schema or value_schema must be provided.

  • value_schema – Optional. A SchemaConfig instance defining the schema for extracting message values from Kafka topics. Supports IETF JSON Schema format (https://json-schema.org/). At least one of key_schema or value_schema must be provided.

  • extra_options – Optional. A dictionary of additional Kafka consumer configuration options. Keys should be source options or Kafka consumer options (e.g. {“kafka.security.protocol”: “SASL_SSL”}). See https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html for more details on optional configurations.

  • backfill_source – Optional. A user-provided source for backfilling historical data. The schema of this source must exactly match the key and value schemas specified for this Kafka config. Can be passed as a three-part Delta table name string (e.g. “catalog.schema.table”) or as a DeltaTableSource instance (deprecated). Note: backfill_source is used when creating a training set for Kafka-backed features. At a future milestone, this data will be used for storing an offline copy of Kafka data and backfilling materialized features.

Returns

KafkaConfig instance containing the created configuration.
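
Two of the constraints above (the host:port list format and the requirement for at least one schema) can be checked locally before a configuration is created. The helpers below are a hedged sketch: parse_bootstrap_servers and check_schemas are hypothetical names, and the checks are not necessarily the ones the client itself performs.

```python
from typing import List, Optional, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split 'host1:port1,host2:port2' into (host, port) pairs."""
    pairs = []
    for item in value.split(","):
        # rpartition splits on the last colon, leaving the port on the right.
        host, sep, port = item.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {item!r}")
        pairs.append((host, int(port)))
    return pairs


def check_schemas(key_schema: Optional[object],
                  value_schema: Optional[object]) -> None:
    """Raise if neither schema is given, mirroring the documented rule."""
    if key_schema is None and value_schema is None:
        raise ValueError("provide at least one of key_schema or value_schema")


print(parse_bootstrap_servers("broker1:9092, broker2:9093"))
```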

get_kafka_config(*, name: str) KafkaConfig

Note

Experimental: This function may change or be removed in a future release without warning.

Retrieve a Kafka configuration by name within the caller’s metastore.

Deprecated: Use get_stream() instead. Kafka configs will be removed in a future release.

Parameters

name – Required. The name of the Kafka configuration to retrieve.

Returns

KafkaConfig instance containing the configuration.

list_kafka_configs(*, max_results: int = 50, include_schemas: bool = False) List[KafkaConfig]

Note

Experimental: This function may change or be removed in a future release without warning.

List Kafka configurations in the current metastore.

Deprecated: Use list_streams() instead. Kafka configs will be removed in a future release.

Parameters
  • max_results – The maximum number of configs to return, default is 50. Specifying a high number may result in a long-running operation.

  • include_schemas – Whether to include schema information in the response, default is False. Schemas may be large and can be retrieved individually instead using FeatureEngineeringClient.get_kafka_config(). Specifying True may result in a long-running operation.

Returns

List of KafkaConfig instances.

delete_kafka_config(*, name: str) None

Note

Experimental: This function may change or be removed in a future release without warning.

Delete a Kafka configuration.

Deprecated: Use delete_stream() instead. Kafka configs will be removed in a future release.

Parameters

name – Required. The name of the Kafka configuration to delete.

Note

The caller must be the creator of the Kafka configuration in order to delete it.

create_stream(*, name: str, source_config: Union[KafkaStreamConfig, StreamSourceConfig], connection_config: Union[DirectMtlsConfig, StreamConnectionConfig], schema_config: Union[DirectSchemas, StreamSchemaConfig], ingestion_config: IngestionConfig, description: Optional[str] = None) Stream

Note

Experimental: This function may change or be removed in a future release without warning.

Create a Stream, a governed Unity Catalog entity representing an external streaming data source.

Authorization on the Stream is governed by the ingestion table: SELECT on the ingestion table grants read access to the Stream, and MANAGE on the ingestion table grants delete access.

Parameters
  • name – Required. The full three-part name of the stream (catalog.schema.stream).

  • source_config

    Required. Specifies the type of streaming source (e.g. Kafka). Accepts either:

    • A platform-specific config instance (e.g. KafkaStreamConfig) which is automatically wrapped into a StreamSourceConfig.

    • A StreamSourceConfig instance directly.

    For Kafka streams:

    KafkaStreamConfig(
        subscription_mode=KafkaSubscriptionMode(subscribe="my-topic"),
        extra_options={"maxOffsetsPerTrigger": "1000"},
    )
    

  • connection_config

    Required. Specifies how to connect and authenticate to the stream platform. Accepts either:

    • A StreamConnectionConfig instance, e.g. StreamConnectionConfig(uc_connection_name="my-kafka-connection") for UC connections.

    • A DirectMtlsConfig instance for direct mTLS authentication (automatically wrapped). This option will be deprecated in a future milestone, once Kafka UC Connections support mTLS.

  • schema_config

    Required. Schema definitions for the stream. Accepts either:

    • A DirectSchemas instance (automatically wrapped into a StreamSchemaConfig), e.g.:

      DirectSchemas(
          payload_schema=SchemaConfig(json_schema='{"type": "object", ...}'),
          key_schema=SchemaConfig(json_schema='{"type": "object", ...}'),
      )
      
    • A StreamSchemaConfig instance directly. In a future milestone, this will also support schema registries through a UC Connection.

    At least one of payload_schema or key_schema must be provided in the DirectSchemas.

  • ingestion_config

    Required. Configuration for the Databricks-managed ingestion pipeline. Groups the ingestion destination table, optional backfill source, and deduplication settings:

    from databricks.feature_engineering.entities import StreamBackfillSource
    
    IngestionConfig(
        ingestion_destination=IngestionDestination(
            delta_table_name="catalog.schema.ingestion_table"
        ),
        backfill_source=StreamBackfillSource(
            delta_table_name="catalog.schema.historical_data"
        ),
        deduplication_columns=["value.user_id"],
    )
    

    Authorization on the Stream is governed by the ingestion destination table: SELECT grants read access, MANAGE grants delete access.

    deduplication_columns specifies column paths used to identify duplicate rows during ingestion. Use dot notation for nested fields (e.g. "value.user_id"). For Kafka streams, if not provided, the default dedup key is (key, value, stream_record_timestamp).

    When backfill_source is provided, the backfill table must include a platform-specific timestamp column of type TIMESTAMP. This is the canonical time column used by the ingestion table for both backfill and forward-fill data, and by the backfill scheduler for overlap-window detection. For Kafka streams, this column must be named stream_record_timestamp. Other Kafka metadata columns (kafka_topic, kafka_partition, kafka_offset) are passed through if present on the backfill source, or set to NULL otherwise.

  • description – Optional. A user-provided description of the stream.

Returns

Stream instance containing the created stream.
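
The backfill table requirements for Kafka streams described under ingestion_config can be checked before create_stream is called. This is a minimal sketch under stated assumptions: check_kafka_backfill_columns is a hypothetical helper, and the table schema is assumed to be available as a mapping from column name to type name.

```python
from typing import Dict, List

# Column names taken from the ingestion_config description for Kafka streams.
TIMESTAMP_COLUMN = "stream_record_timestamp"
OPTIONAL_METADATA = ("kafka_topic", "kafka_partition", "kafka_offset")


def check_kafka_backfill_columns(columns: Dict[str, str]) -> List[str]:
    """Validate a backfill table schema given as {column_name: type_name}.

    Returns the optional Kafka metadata columns that are absent; according
    to the description above, those are set to NULL during ingestion.
    """
    col_type = columns.get(TIMESTAMP_COLUMN)
    if col_type is None:
        raise ValueError(f"backfill table is missing {TIMESTAMP_COLUMN}")
    # Type names are compared case-insensitively (an assumption of this sketch).
    if col_type.upper() != "TIMESTAMP":
        raise ValueError(f"{TIMESTAMP_COLUMN} must be TIMESTAMP, got {col_type}")
    return [name for name in OPTIONAL_METADATA if name not in columns]


missing = check_kafka_backfill_columns({
    "stream_record_timestamp": "timestamp",
    "kafka_topic": "string",
    "value": "struct<user_id:string>",
})
print(missing)  # ['kafka_partition', 'kafka_offset']
```

In a notebook with a Spark session, the mapping could be built with dict(spark.table("catalog.schema.historical_data").dtypes), where the table name is the placeholder used in the example above.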

get_stream(*, name: str) Stream

Note

Experimental: This function may change or be removed in a future release without warning.

Retrieve a Stream by its full three-part name.

Parameters

name – Required. The full three-part name of the stream (catalog.schema.stream).

Returns

Stream instance.

list_streams(*, catalog_name: str, schema_name: str, max_results: int = 50, include_schemas: bool = False) List[Stream]

Note

Experimental: This function may change or be removed in a future release without warning.

List Streams in a given catalog and schema.

Parameters
  • catalog_name – Name of the parent catalog.

  • schema_name – Name of the parent schema relative to its parent catalog.

  • max_results – The maximum number of streams to return, default is 50. Specifying a high number may result in a long-running operation.

  • include_schemas – Whether to include schema information in the response, default is False. Schemas may be large and can be retrieved individually instead using get_stream(). Specifying True may result in a long-running operation.

Returns

List of Stream instances.
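
get_stream and delete_stream take a full three-part name, while list_streams takes the catalog and schema separately. The sketch below bridges the two; split_stream_name and list_streams_alongside are hypothetical helpers, fe is assumed to be a FeatureEngineeringClient instance, and names are assumed not to contain quoted identifiers with embedded periods.

```python
from typing import Any, List, Tuple


def split_stream_name(full_name: str) -> Tuple[str, str, str]:
    """Split 'catalog.schema.stream' into its three parts."""
    parts = full_name.split(".")
    # Exactly three non-empty parts are expected.
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected catalog.schema.stream, got {full_name!r}")
    return parts[0], parts[1], parts[2]


def list_streams_alongside(fe: Any, full_name: str,
                           max_results: int = 50) -> List[Any]:
    """List the streams in the same catalog and schema as `full_name`."""
    catalog, schema, _ = split_stream_name(full_name)
    return fe.list_streams(
        catalog_name=catalog,
        schema_name=schema,
        max_results=max_results,
    )


print(split_stream_name("main.features.clicks"))  # ('main', 'features', 'clicks')
```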

delete_stream(*, name: str) None

Note

Experimental: This function may change or be removed in a future release without warning.

Delete a Stream by its full three-part name.

Parameters

name – Required. The full three-part name of the stream (catalog.schema.stream).