Feature

class databricks.ml_features.entities.feature.Feature(*, source: DataSource, inputs: List[str], function: Function, time_window: TimeWindow, catalog_name: str, schema_name: str, name: Optional[str] = None, description: Optional[str] = None, filter_condition: Optional[str] = None)

Bases: _FeatureStoreObject

Represents a feature definition that combines a data source with aggregation logic.

Parameters
  • catalog_name – The catalog name for the feature (required)

  • schema_name – The schema name for the feature (required)

  • name – The name of the feature. Leading and trailing whitespace will be stripped. If not provided or empty after stripping, a name will be auto-generated based on the input columns, function, and time window.

  • source – The data source for this feature

  • inputs – List of column names from the source to use as input

  • function – The aggregation function to apply to the input columns

  • time_window – The time window for the aggregation

  • description – Optional description of the feature

  • filter_condition – Optional SQL filter condition to apply on the source data before aggregation

__init__(*, source: DataSource, inputs: List[str], function: Function, time_window: TimeWindow, catalog_name: str, schema_name: str, name: Optional[str] = None, description: Optional[str] = None, filter_condition: Optional[str] = None)

Initialize a Feature object. See class documentation. Should not be invoked directly; use FeatureEngineeringClient.create_feature instead, which ensures the Feature is registered in Unity Catalog and properly validated.
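
A minimal sketch of assembling a Feature entity with the signature above and handing it to the client. The DeltaTableSource and Sum signatures follow the reference below; the TimeWindow constructor arguments and the create_feature call are assumptions, so consult those references before use.

    from datetime import timedelta

    from databricks.ml_features.entities.aggregation import Window  # alias of TimeWindow
    from databricks.ml_features.entities.data_source import DeltaTableSource
    from databricks.ml_features.entities.feature import Feature
    from databricks.ml_features.entities.function import Sum

    # Delta table backing the feature (signature per the DeltaTableSource reference below).
    source = DeltaTableSource(
        catalog_name="main",
        schema_name="ml",
        table_name="transactions",
        entity_columns=["customer_id"],
        timeseries_column="event_ts",
    )

    feature = Feature(
        source=source,
        inputs=["amount"],
        function=Sum(),
        time_window=Window(duration=timedelta(days=7)),  # assumed signature; see the TimeWindow reference
        catalog_name="main",
        schema_name="ml",
        name="customer_amount_sum_7d",
        description="7-day rolling sum of transaction amounts per customer",
        filter_condition="amount > 0",
    )

    # Per the note above, register through the client rather than using the
    # constructor result directly (assumed client call):
    # FeatureEngineeringClient().create_feature(...)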

property name: str

The leaf name of the feature.

property full_name: str

The fully qualified Unity Catalog name of the feature.

property catalog_name: str

The catalog name of the feature.

property schema_name: str

The schema name of the feature.

property source: DataSource

The data source for this feature.

property inputs: List[str]

List of column names from the source to use as input.

property function: Function

The aggregation function to apply to the input columns.

property time_window: TimeWindow

The time window for the aggregation.

property description: Optional[str]

Optional description of the feature.

property filter_condition: Optional[str]

Optional SQL filter condition to apply on the source data before aggregation.

Feature Aggregations

class databricks.ml_features.entities.feature_aggregations.FeatureAggregations(*, source_table: str, lookup_key: Union[str, List[str]], timestamp_key: str, granularity: timedelta, start_time: datetime, end_time: Optional[datetime] = None, aggregations: List[Aggregation])

Bases: _FeatureStoreObject

Defines an aggregation specification.

Parameters
  • source_table – The source table to perform aggregation on. Any Delta table can be used.

  • lookup_key – Key to use when computing aggregation. It can be a single key or a list of keys.

  • timestamp_key – The timestamp key, used to determine the temporal position of each data point.

  • granularity – The temporal granularity at which to generate aggregated features. For example, a granularity of 1 day means the aggregation materialized view will contain one row per primary key per day, from start_time until now.

  • start_time – The earliest time to generate aggregated features from. For example, a start_time of 2020-01-01 means the aggregation materialized view will not contain any rows before this time. This will be the start of the first granularity interval.

  • end_time – The latest time to generate aggregated features to. If None, features are generated up to the time of the materialization pipeline run; if a datetime object, it is used as the end time.

  • aggregations – A list of aggregations to perform. Each aggregation defines an output column.

__init__(*, source_table: str, lookup_key: Union[str, List[str]], timestamp_key: str, granularity: timedelta, start_time: datetime, end_time: Optional[datetime] = None, aggregations: List[Aggregation])

Initialize a FeatureAggregations object. See class documentation.
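
A hedged sketch of a daily aggregation specification, built only from the signatures documented here. The Aggregation entries omit time_window, which is optional; the table and column names are illustrative.

    from datetime import datetime, timedelta

    from databricks.ml_features.entities.aggregation import Aggregation
    from databricks.ml_features.entities.feature_aggregations import FeatureAggregations

    agg_spec = FeatureAggregations(
        source_table="main.ml.transactions",  # any Delta table
        lookup_key="customer_id",             # single key; a list is also accepted
        timestamp_key="event_ts",
        granularity=timedelta(days=1),        # one row per key per day
        start_time=datetime(2023, 1, 1),
        end_time=None,                        # aggregate up to the pipeline run time
        aggregations=[
            Aggregation(function="sum", column="amount"),
            Aggregation(
                function="count",
                column="amount",
                output_column="positive_txn_count",
                filter_condition="amount > 0",
            ),
        ],
    )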

property source_table: str

Returns the source table used for aggregation.

property lookup_key: Union[str, List[str]]

Returns the lookup key(s) used for aggregation.

property timestamp_key: str

Returns the timestamp key used for aggregation.

property granularity: timedelta

Returns the granularity at which features are aggregated.

property start_time: datetime

Returns the start time from which to generate aggregated features.

property end_time: Optional[datetime]

Returns the end time up to which to generate aggregated features.

property aggregations: List[Aggregation]

Returns the list of aggregations to perform.

copy(**kwargs)

Create a copy of the current object with the specified attributes updated.

Parameters

kwargs – The attributes to update.

Returns

A new FeatureAggregations object with the specified attributes updated.
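
For example, pinning the end time of an existing specification while keeping everything else (reusing agg_spec from the sketch above):

    from datetime import datetime

    backfill_spec = agg_spec.copy(end_time=datetime(2024, 1, 1))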

class databricks.ml_features.entities.aggregation.Aggregation(*, function: Union[str, Function], time_window: Optional[TimeWindow] = None, column: Optional[str] = None, output_column: Optional[str] = None, filter_condition: Optional[str] = None, **kwargs)

Bases: _FeatureStoreObject

Defines a single aggregated feature.

Parameters
  • column – The source column to aggregate. The column must exist in the source_table of the parent FeatureAggregations.

  • output_column – The output column name. If not provided, a default name will be generated.

  • function – The aggregation function to use. If a string is given, it is interpreted as shorthand (e.g., “sum”, “avg”, “count”).

  • time_window – The time window to aggregate data with.

  • filter_condition – Optional SQL WHERE clause to filter source data before aggregation.

__init__(*, function: Union[str, Function], time_window: Optional[TimeWindow] = None, column: Optional[str] = None, output_column: Optional[str] = None, filter_condition: Optional[str] = None, **kwargs)

Initialize an Aggregation object. See class documentation.
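
The function can be named either by string shorthand or by a Function instance; the instance form is required when the function takes extra parameters, as with ApproxPercentile below (signatures per this reference and the Aggregation Functions section).

    from databricks.ml_features.entities.aggregation import Aggregation
    from databricks.ml_features.entities.function import ApproxPercentile

    # String shorthand; output_column falls back to a generated default name.
    daily_total = Aggregation(function="sum", column="amount")

    # Function instance, carrying its extra parameter.
    p95_latency = Aggregation(
        function=ApproxPercentile(percentile=0.95),
        column="latency_ms",
        output_column="latency_p95",
    )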

property column: Optional[str]

The source column to aggregate.

property output_column: str

The output column name.

property function: Function

The aggregation function to use.

property time_window: TimeWindow

The time window to aggregate data with.

property window: TimeWindow

The time window to aggregate data with.

property filter_condition: Optional[str]

Optional SQL filter condition to apply on the source data before aggregation.

databricks.ml_features.entities.aggregation.Window

alias of TimeWindow

Aggregation Functions

class databricks.ml_features.entities.function.Function

Bases: _FeatureStoreObject

Abstract base class for all aggregation functions.

__init__()
abstract property name: str

Return the name of the aggregation function.

extra_parameters() → Dict[str, Any]

Return the extra parameters of the function. Only applicable to a few functions that require additional parameters.

classmethod from_string(function_str: str) → Function

Create a Function instance from a string representation.

Parameters

function_str – String name of the aggregation function

Returns

Function instance

Raises

ValueError – If the function string is not recognized
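
A short sketch of the round trip, assuming the string form matches each function class's name property (e.g. “sum” for Sum):

    from databricks.ml_features.entities.function import Function

    fn = Function.from_string("sum")  # assumed to yield a Sum instance
    print(fn.name)                    # "sum"

    try:
        Function.from_string("not_a_function")
    except ValueError as err:
        print(err)                    # unrecognized function string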

class databricks.ml_features.entities.function.Avg

Bases: Function

Class representing the average (avg) aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Count

Bases: Function

Class representing the count aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.ApproxCountDistinct(relativeSD: Optional[float] = None)

Bases: Function

Class representing the approximate count distinct aggregation function. See https://docs.databricks.com/en/sql/language-manual/functions/approx_count_distinct.html

Parameters

relativeSD – The relative standard deviation allowed in the approximation.

__init__(relativeSD: Optional[float] = None)
property name: str

Return the name of the aggregation function.

extra_parameters() → Dict[str, Any]

Return the extra parameters of the function. Only applicable to a few functions that require additional parameters.

class databricks.ml_features.entities.function.ApproxPercentile(percentile: float, accuracy: Optional[int] = None)

Bases: Function

Class representing the percentile approximation aggregation function. See https://docs.databricks.com/en/sql/language-manual/functions/approx_percentile.html

Parameters
  • percentile – The percentile to approximate.

  • accuracy – The accuracy of the approximation.

__init__(percentile: float, accuracy: Optional[int] = None)
property name: str

Return the name of the aggregation function.

extra_parameters() → Dict[str, Any]

Return the extra parameters of the function. Only applicable to a few functions that require additional parameters.
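
A sketch contrasting the two parameterized functions with a parameterless one; the exact dictionary keys returned by extra_parameters() are assumptions based on the constructor argument names.

    from databricks.ml_features.entities.function import (
        ApproxCountDistinct,
        ApproxPercentile,
        Sum,
    )

    acd = ApproxCountDistinct(relativeSD=0.01)
    pct = ApproxPercentile(percentile=0.5, accuracy=10000)

    print(acd.extra_parameters())   # e.g. {"relativeSD": 0.01} (assumed key)
    print(pct.extra_parameters())   # e.g. {"percentile": 0.5, "accuracy": 10000}
    print(Sum().extra_parameters()) # parameterless functions have no extra parameters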

class databricks.ml_features.entities.function.First

Bases: Function

Class representing the first aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Last

Bases: Function

Class representing the last aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Max

Bases: Function

Class representing the maximum (max) aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Min

Bases: Function

Class representing the minimum (min) aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.StddevPop

Bases: Function

Class representing the population standard deviation (stddev_pop) aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.StddevSamp

Bases: Function

Class representing the sample standard deviation (stddev_samp) aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Sum

Bases: Function

Class representing the sum aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.VarPop

Bases: Function

Class representing the population variance (var_pop) aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.VarSamp

Bases: Function

Class representing the sample variance (var_samp) aggregation function.

__init__()
property name: str

Return the name of the aggregation function.

Data Sources

class databricks.ml_features.entities.data_source.DataSourceTypes(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Enumeration of supported data source types.

DELTA = 'delta'
UNIFORM = 'uniform'
KAFKA = 'kafka'
VOLUME = 'volume'
DATAFRAME = 'dataframe'

class databricks.ml_features.entities.data_source.DataSource(*, source_type: DataSourceTypes, entity_columns: List[str], timeseries_column: str)

Bases: _FeatureStoreObject, ABC

Abstract base class for data sources used in feature computation.

Parameters
  • source_type – The type of data source

  • entity_columns – List of column names that serve as primary keys

  • timeseries_column – Column name that contains timestamp data

__init__(*, source_type: DataSourceTypes, entity_columns: List[str], timeseries_column: str)

Initialize a DataSource object. See class documentation.

property source_type: DataSourceTypes

The type of data source.

property entity_columns: List[str]

List of column names that serve as primary keys.

property timeseries_column: str

Column name that contains timestamp data.

property order_column: str

The column name for the order column.

abstract full_name() → str

Return the full name/identifier for this data source.

abstract load_df(spark_client)

Load the data source as a Spark DataFrame. Must be implemented by all derived classes.

Parameters

spark_client – The Spark client for DataFrame operations

class databricks.ml_features.entities.data_source.DeltaTableSource(*, catalog_name: str, schema_name: str, table_name: str, entity_columns: List[str], timeseries_column: str)

Bases: BackfillSource

Data source implementation for Delta Lake tables.

Parameters
  • catalog_name – The name of the Unity Catalog catalog

  • schema_name – The name of the schema within the catalog

  • table_name – The name of the table within the schema

  • entity_columns – List of column names that serve as primary keys

  • timeseries_column – Column name that contains timestamp data

__init__(*, catalog_name: str, schema_name: str, table_name: str, entity_columns: List[str], timeseries_column: str)

Initialize a DeltaTableSource object. See class documentation.

property catalog_name: str

The name of the Unity Catalog catalog.

property schema_name: str

The name of the schema within the catalog.

property table_name: str

The name of the table within the schema.

full_name() → str

Return the full table name in catalog.schema.table format.

load_df(spark_client)

Load the Delta table as a Spark DataFrame.
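
A minimal construction per the signature above; full_name() yields the three-level Unity Catalog name.

    from databricks.ml_features.entities.data_source import DeltaTableSource

    source = DeltaTableSource(
        catalog_name="main",
        schema_name="ml",
        table_name="transactions",
        entity_columns=["customer_id"],
        timeseries_column="event_ts",
    )

    print(source.full_name())  # "main.ml.transactions"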

class databricks.ml_features.entities.data_source.VolumeSource(*, entity_columns: List[str], timeseries_column: str)

Bases: DataSource

Data source implementation for Unity Catalog Volumes.

TODO: Implementation to be defined based on volume requirements.

__init__(*, entity_columns: List[str], timeseries_column: str)

Initialize a VolumeSource object. See class documentation.

full_name() → str

Return the full volume path identifier.

load_df(spark_client)

Load the volume data as a Spark DataFrame.

class databricks.ml_features.entities.data_source.KafkaSource(*, name: str, entity_columns: List[str], timeseries_column: str)

Bases: DataSource

Data source implementation for Kafka streams.

KafkaSource references a KafkaConfig by name. The KafkaConfig contains connection details, authentication, and schemas for the Kafka topics. Column names must be prefixed with ‘key:’ or ‘value:’ to indicate which schema to use. Examples: ‘key:customer_id’ or ‘value:trip_details.pickup_zip’ for nested JSON fields.

Parameters
  • name – Name of the KafkaConfig to use (uniquely identifies the KafkaConfig in the metastore)

  • entity_columns – List of column names with schema prefix (e.g., [‘key:customer_id’, ‘value:trip_details.pickup_zip’])

  • timeseries_column – Column name with schema prefix (e.g., ‘value:event_timestamp’)

__init__(*, name: str, entity_columns: List[str], timeseries_column: str)

Initialize a KafkaSource object. See class documentation.
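
A sketch using the key:/value: prefixes described above; the KafkaConfig name is hypothetical and must already exist in the metastore.

    from databricks.ml_features.entities.data_source import KafkaSource

    source = KafkaSource(
        name="trips_kafka_config",  # hypothetical KafkaConfig name
        entity_columns=["key:customer_id"],
        timeseries_column="value:event_timestamp",  # field from the value schema
    )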

class SchemaType

Bases: object

Constants for Kafka schema types.

property name: str

The name of the KafkaConfig this source references.

full_name() → str

Return the Kafka config name as the full identifier.

load_df(spark_client)

Return an empty DataFrame for Kafka sources with schema from KafkaConfig.

Note: Features with Kafka sources are not supported in the training workflow. Calling load_df() or score_batch() on training sets or models with Kafka-source features will raise NotImplementedError.

This method returns an empty DataFrame to allow training set creation for model logging purposes only. The feature metadata will be captured and associated with the model, enabling feature lookups at inference time from the online store.

Parameters

spark_client – The Spark client for DataFrame operations

static get_columns_from_kafka_config(kafka_config) → Dict[str, DataType]

Extract all columns and their Spark types from a KafkaConfig.

This method parses the key_schema and value_schema from the KafkaConfig and returns a dictionary mapping column names (with key:/value: prefixes) to their Spark DataTypes.

Parameters

kafka_config – The KafkaConfig object containing schema information

Returns

Dictionary mapping prefixed column names to Spark DataTypes

static validate_columns_exist_in_schemas(columns: List[str], kafka_config) → None

Validate that columns exist in the Kafka config schemas. All columns must be prefixed with ‘key:’ or ‘value:’ and the column must exist in the corresponding schema.

class databricks.ml_features.entities.data_source.DataFrameSource(*, dataframe, entity_columns: List[str], timeseries_column: str, source_name: Optional[str] = None)

Bases: DataSource

Data source implementation for Spark DataFrames.

This allows using an existing Spark DataFrame directly as a data source for feature computation, useful for in-memory data processing and testing.

Parameters
  • dataframe – The Spark DataFrame to use as the data source

  • entity_columns – List of column names that serve as primary keys

  • timeseries_column – Column name that contains timestamp data

  • source_name – Optional name for the DataFrame source (for identification)

__init__(*, dataframe, entity_columns: List[str], timeseries_column: str, source_name: Optional[str] = None)

Initialize a DataFrameSource object. See class documentation.
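
A sketch for in-memory testing, assuming an active SparkSession:

    from datetime import datetime

    from pyspark.sql import SparkSession

    from databricks.ml_features.entities.data_source import DataFrameSource

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("c1", datetime(2024, 1, 1), 10.0)],
        "customer_id string, event_ts timestamp, amount double",
    )

    source = DataFrameSource(
        dataframe=df,
        entity_columns=["customer_id"],
        timeseries_column="event_ts",
        source_name="test_transactions",  # optional identifier
    )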

property dataframe

The Spark DataFrame being used as the data source.

property source_name: str

The name identifier for this DataFrame source.

full_name() → str

Return the source name identifier for this DataFrame.

load_df(spark_client)

Return the existing Spark DataFrame.