Feature

class databricks.ml_features.entities.feature.Feature(*, source: DataSource, function: Union[AggregationFunction, ColumnSelection], entity: List[str], timeseries_column: str, name: Optional[str] = None, description: Optional[str] = None)

Bases: _FeatureStoreObject

Represents a feature definition that combines a data source with a function (an aggregation or a column selection), the entity keys, and a timeseries column.

Parameters
  • source – The data source for this feature

  • function – The function to apply. An AggregationFunction (carrying its inputs and time window) or a ColumnSelection.

  • entity – Required. List of entity column names that act as the lookup/aggregation keys.

  • timeseries_column – Required. The timeseries column name used for point-in-time joins, backfills, and aggregations.

  • name – The name of the feature. Leading and trailing whitespace will be stripped. If not provided or empty after stripping, a name will be auto-generated from the input column, function, and time window.

  • description – Optional description of the feature

The catalog and schema are not set on the constructor; they are assigned by FeatureEngineeringClient.create_feature / register_feature.
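The naming rule described for the name parameter can be sketched in plain Python. This only illustrates the strip-then-fall-back behaviour; the helper resolve_feature_name and the column_function_window pattern are assumptions made for this example, not the format the library actually generates.

```python
from typing import Optional


def resolve_feature_name(
    name: Optional[str],
    input_column: str,
    function_name: str,
    window_label: str,
) -> str:
    """Hypothetical helper: strip the name, or fall back to a generated one."""
    stripped = (name or "").strip()
    if stripped:
        return stripped
    # Assumed pattern, for illustration only; the real auto-generated
    # format is not specified in this reference.
    return f"{input_column}_{function_name}_{window_label}"


explicit = resolve_feature_name("  avg_spend_7d  ", "amount", "avg", "7d")
generated = resolve_feature_name("   ", "amount", "avg", "7d")
```

A name that is None, empty, or whitespace-only takes the generated branch; any other name is returned with surrounding whitespace removed.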

__init__(*, source: DataSource, function: Union[AggregationFunction, ColumnSelection], entity: List[str], timeseries_column: str, name: Optional[str] = None, description: Optional[str] = None)

Initialize a Feature object. See class documentation. Do not invoke the constructor directly; use FeatureEngineeringClient.create_feature instead, which validates the Feature and registers it in Unity Catalog.

property name: str

The leaf name of the feature.

property full_name: str

The fully qualified Unity Catalog name of the feature.

property has_full_name: bool

Whether this feature has catalog_name and schema_name set.

property is_registered: bool

Whether this feature is persisted to Unity Catalog.

clone(*, new_name: Optional[str] = None) → Feature

Create an unregistered deep copy of this feature.

Returns a new Feature with its registration cleared so it can be re-registered in a different catalog/schema via create_feature.

Parameters

new_name – Optional new leaf name for the cloned feature. If not provided, the original name is kept.

Returns

A new, unregistered Feature instance.
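The clone contract (deep copy, registration cleared, optional rename) might look like the following sketch. FeatureSketch is a stand-in dataclass written for this example, not the library class, and resetting catalog_name and schema_name on the copy is an assumption about what "registration cleared" means.

```python
import copy
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FeatureSketch:
    """Stand-in for Feature, for illustration only."""
    name: str
    entity: List[str] = field(default_factory=list)
    catalog_name: Optional[str] = None
    schema_name: Optional[str] = None
    is_registered: bool = False

    def clone(self, *, new_name: Optional[str] = None) -> "FeatureSketch":
        duplicate = copy.deepcopy(self)
        # Assumption: clearing registration resets catalog, schema, and the flag.
        duplicate.catalog_name = None
        duplicate.schema_name = None
        duplicate.is_registered = False
        if new_name is not None:
            duplicate.name = new_name
        return duplicate


original = FeatureSketch("avg_spend_7d", ["customer_id"], "main", "features", True)
renamed = original.clone(new_name="avg_spend_7d_v2")
```

Because the copy is deep, mutating renamed.entity leaves original.entity unchanged, and the original keeps its registration state.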

property catalog_name: Optional[str]

The catalog name of the feature.

property schema_name: Optional[str]

The schema name of the feature.

property source: DataSource

The data source for this feature.

property entity: Optional[List[str]]

List of entity column names.

property timeseries_column: Optional[str]

The timeseries column name.

property order_column: Optional[str]

The order column name derived from timeseries_column.

property inputs: Optional[List[str]]

Input columns for this feature, taken from its AggregationFunction.

Returns None for ColumnSelection features, which select a single column.

property function: Union[AggregationFunction, ColumnSelection]

The function to apply: an AggregationFunction or a ColumnSelection.

property time_window: Optional[TimeWindow]

The time window, taken from the AggregationFunction (None for ColumnSelection).

property description: Optional[str]

Optional description of the feature.

property filter_condition: Optional[str]

The SQL filter condition applied to the source before aggregation, if any.

Feature Aggregations

class databricks.ml_features.entities.feature_aggregations.FeatureAggregations(*, source_table: str, lookup_key: Union[str, List[str]], timestamp_key: str, granularity: timedelta, start_time: datetime, end_time: Optional[datetime] = None, aggregations: List[Aggregation])

Bases: _FeatureStoreObject

Note

Aliases: databricks.feature_engineering.entities.feature_lookup.FeatureLookup, databricks.feature_store.entities.feature_lookup.FeatureLookup

Defines an aggregation specification.

Parameters
  • source_table – The source table to perform aggregation on. The source table can be any Delta table.

  • lookup_key – Key to use when computing aggregation. It can be a single key or a list of keys.

  • timestamp_key – Key for timestamp. Used for determining the temporal position of data points.

  • granularity – The temporal granularity at which to generate aggregated features. For example, a granularity of 1 day means the aggregation materialized view will contain one row per primary key per day, from start_time until now.

  • start_time – The earliest time to generate aggregated features from. For example, a start_time of 2020-01-01 means the aggregation materialized view will not contain any rows before this time. This will be the start of the first granularity interval.

  • end_time – The latest time to generate aggregated features to. If None, the time of the materialization pipeline run is used; if a datetime object, that value is used as the end time.

  • aggregations – A list of aggregations to perform. Each aggregation defines an output column.
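How start_time, granularity, and end_time interact can be illustrated by listing the interval starts they imply. The helper interval_starts is hypothetical, and treating end_time as an exclusive upper bound is an assumption of this sketch; the reference does not state how the boundary is handled.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def interval_starts(
    start_time: datetime,
    granularity: timedelta,
    end_time: Optional[datetime] = None,
    now: Optional[datetime] = None,
) -> List[datetime]:
    """Hypothetical helper: list the start of each granularity interval."""
    if granularity <= timedelta(0):
        raise ValueError("granularity must be positive")
    # With no end_time, fall back to the run time (passed in as `now` here).
    upper = end_time if end_time is not None else (now or datetime.now())
    starts = []
    current = start_time
    while current < upper:  # assumption: the upper bound is exclusive
        starts.append(current)
        current += granularity
    return starts


days = interval_starts(
    start_time=datetime(2020, 1, 1),
    granularity=timedelta(days=1),
    end_time=datetime(2020, 1, 4),
)
```

The first element is always start_time, which matches the statement that start_time is the start of the first granularity interval.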

__init__(*, source_table: str, lookup_key: Union[str, List[str]], timestamp_key: str, granularity: timedelta, start_time: datetime, end_time: Optional[datetime] = None, aggregations: List[Aggregation])

Initialize a FeatureAggregations object. See class documentation.

property source_table: str

Returns the source table used for aggregation.

property lookup_key: Union[str, List[str]]

Returns the lookup key(s) used for aggregation.

property timestamp_key: str

Returns the timestamp key used for aggregation.

property granularity: timedelta

Returns the granularity at which features are aggregated.

property start_time: datetime

Returns the start time from which to generate aggregated features.

property end_time: Optional[datetime]

Returns the end time up to which to generate aggregated features.

property aggregations: List[Aggregation]

Returns the list of aggregations to perform.

copy(**kwargs)

Create a copy of the current object with the specified attributes updated.

Parameters

kwargs – The attributes to update.

Returns

A new FeatureAggregations object with the specified attributes updated.
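A copy-with-overrides method of this shape is commonly built on dataclasses.replace. The sketch below uses a stand-in class, AggregationSpecSketch, with only three of the documented fields; it is not how the library implements copy, only an illustration of the contract.

```python
from dataclasses import dataclass, replace
from datetime import timedelta


@dataclass(frozen=True)
class AggregationSpecSketch:
    """Stand-in for FeatureAggregations, for illustration only."""
    source_table: str
    timestamp_key: str
    granularity: timedelta

    def copy(self, **kwargs) -> "AggregationSpecSketch":
        # replace() builds a new instance with the given fields overridden
        # and raises TypeError for a field name that does not exist.
        return replace(self, **kwargs)


daily = AggregationSpecSketch("main.sales.orders", "order_ts", timedelta(days=1))
hourly = daily.copy(granularity=timedelta(hours=1))
```

The original object is left untouched; only the returned copy carries the updated attribute.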

class databricks.ml_features.entities.aggregation.Aggregation(*, function: Union[str, Function], time_window: Optional[TimeWindow] = None, column: Optional[str] = None, output_column: Optional[str] = None, filter_condition: Optional[str] = None, **kwargs)

Bases: _FeatureStoreObject

Defines a single aggregated feature.

Parameters
  • column – The source column to aggregate. The column must exist in the parent FeatureAggregations source_table.

  • output_column – The output column name. If not provided, a default name will be generated.

  • function – The function to use. If a string is given, it will be interpreted as short-hand (e.g., “sum”, “avg”, “count”).

  • time_window – The time window to aggregate data with.

  • filter_condition – Optional SQL WHERE clause to filter source data before aggregation.
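The two ideas in these parameters, string short-hand for the function and filtering before aggregating, can be shown on plain Python rows. The SHORTHAND registry and the aggregate helper are hypothetical, and a Python predicate stands in for the SQL WHERE clause; none of this is the library's own code.

```python
from statistics import mean
from typing import Any, Callable, Dict, List, Optional

# Hypothetical registry: maps short-hand strings to plain Python aggregators.
SHORTHAND: Dict[str, Callable[[List[float]], float]] = {
    "sum": sum,
    "avg": mean,
    "count": len,
}


def aggregate(
    rows: List[Dict[str, Any]],
    column: str,
    function: str,
    keep: Optional[Callable[[Dict[str, Any]], bool]] = None,
) -> float:
    if function not in SHORTHAND:
        raise ValueError(f"unknown short-hand function: {function!r}")
    # Filter first, then aggregate, mirroring the filter_condition semantics.
    selected = [row for row in rows if keep is None or keep(row)]
    return SHORTHAND[function]([row[column] for row in selected])


rows = [
    {"amount": 10.0, "status": "paid"},
    {"amount": 30.0, "status": "paid"},
    {"amount": 99.0, "status": "refunded"},
]
paid_total = aggregate(rows, "amount", "sum", keep=lambda r: r["status"] == "paid")
```

Because the filter runs before the aggregation, the refunded row never reaches the sum.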

__init__(*, function: Union[str, Function], time_window: Optional[TimeWindow] = None, column: Optional[str] = None, output_column: Optional[str] = None, filter_condition: Optional[str] = None, **kwargs)

Initialize an Aggregation object. See class documentation.

property column: Optional[str]

The source column to aggregate.

property output_column: str

The output column name.

property function: Function

The aggregation function to use.

property time_window: TimeWindow

The time window to aggregate data with.

property window: TimeWindow

The time window to aggregate data with.

property filter_condition: Optional[str]

Optional SQL filter condition to apply on the source data before aggregation.

Aggregation Functions

class databricks.ml_features.entities.function.Function(inputs: List[str])

Bases: _FeatureStoreObject

Abstract base class for all aggregation function operators.

Subclasses represent specific aggregation operations (e.g. Avg, Sum, Count). Each function operates on one or more input columns from a data source.

Parameters

inputs – Column name(s) from the source to aggregate.

__init__(inputs: List[str])

property inputs: List[str]

The input column names for this function.

spark_function(filter_condition: Optional[str] = None) → Column

Build the Spark aggregation Column over this function’s input columns.

Parameters

filter_condition – Optional SQL filter condition to apply before aggregating.

Returns

A Spark Column representing the aggregation.

abstract property name: str

Return the name of the aggregation function.

extra_parameters() → Dict[str, Any]

Return the extra parameters of the function. Only applicable to a few functions that require additional parameters.

class databricks.ml_features.entities.function.Avg(input: str)

Bases: Function

Class representing the average (avg) aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Count(input: str)

Bases: Function

Class representing the count aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.ApproxCountDistinct(input: str, relativeSD: Optional[float] = None)

Bases: Function

Class representing the approximate count distinct aggregation function. See https://docs.databricks.com/en/sql/language-manual/functions/approx_count_distinct.html

Parameters

relativeSD – The relative standard deviation allowed in the approximation.

__init__(input: str, relativeSD: Optional[float] = None)

property name: str

Return the name of the aggregation function.

extra_parameters() → Dict[str, Any]

Return the extra parameters of the function. Only applicable to a few functions that require additional parameters.

class databricks.ml_features.entities.function.ApproxPercentile(percentile: float, input: str, accuracy: Optional[int] = None)

Bases: Function

Class representing the percentile approximation aggregation function. See https://docs.databricks.com/en/sql/language-manual/functions/approx_percentile.html

Parameters
  • percentile – The percentile to approximate.

  • accuracy – The accuracy of the approximation.

__init__(percentile: float, input: str, accuracy: Optional[int] = None)

property name: str

Return the name of the aggregation function.

extra_parameters() → Dict[str, Any]

Return the extra parameters of the function. Only applicable to a few functions that require additional parameters.

class databricks.ml_features.entities.function.First(input: str)

Bases: Function

Class representing the first aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Last(input: str)

Bases: Function

Class representing the last aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Max(input: str)

Bases: Function

Class representing the maximum (max) aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Min(input: str)

Bases: Function

Class representing the minimum (min) aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.StddevPop(input: str)

Bases: Function

Class representing the population standard deviation (stddev_pop) aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.StddevSamp(input: str)

Bases: Function

Class representing the sample standard deviation (stddev_samp) aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.Sum(input: str)

Bases: Function

Class representing the sum aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.VarPop(input: str)

Bases: Function

Class representing the population variance (var_pop) aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.

class databricks.ml_features.entities.function.VarSamp(input: str)

Bases: Function

Class representing the sample variance (var_samp) aggregation function.

__init__(input: str)

property name: str

Return the name of the aggregation function.
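The difference between the population and sample variants (StddevPop vs. StddevSamp, VarPop vs. VarSamp) is the divisor: n for population, n - 1 for sample. The sketch below shows that difference with Python's statistics module on a small list; it illustrates the standard definitions only and does not call the library or Spark.

```python
import statistics

values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

# Population variants divide the sum of squared deviations by n.
var_pop = statistics.pvariance(values)
stddev_pop = statistics.pstdev(values)

# Sample variants divide by n - 1, so they are slightly larger.
var_samp = statistics.variance(values)
stddev_samp = statistics.stdev(values)
```

For this list the mean is 5 and the squared deviations sum to 32, so the population variance is 32 / 8 and the sample variance is 32 / 7.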

Data Sources

class databricks.ml_features.entities.data_source.DataSourceTypes(value)

Enumeration of supported data source types.

DELTA = 'delta'
UNIFORM = 'uniform'
KAFKA = 'kafka'
VOLUME = 'volume'
DATAFRAME = 'dataframe'
REQUEST = 'request'
STREAM = 'stream'

class databricks.ml_features.entities.data_source.DataSource(*, source_type: DataSourceTypes)

Bases: _FeatureStoreObject, ABC

Abstract base class for data sources used in feature computation.

Parameters

source_type – The type of data source

__init__(*, source_type: DataSourceTypes)

Initialize a DataSource object. See class documentation.

property source_type: DataSourceTypes

The type of data source.

static timeseries_to_order_column(timeseries_column: Optional[str]) → Optional[str]

Derive the order column name from a timeseries column name.

abstract full_name() → str

Return the full name/identifier for this data source.

abstract load_df(spark_client)

Load the data source as a Spark DataFrame. Must be implemented by all derived classes.

Parameters

spark_client – The Spark client for DataFrame operations.

class databricks.ml_features.entities.data_source.DeltaTableSource(*, catalog_name: str, schema_name: str, table_name: str, filter_condition: Optional[str] = None, transformation_sql: Optional[str] = None, dataframe_schema: Optional[str] = None, _skip_validation: bool = False)

Bases: BackfillSource

Data source implementation for Delta Lake tables.

Parameters
  • catalog_name – The name of the Unity Catalog catalog

  • schema_name – The name of the schema within the catalog

  • table_name – The name of the table within the schema

  • filter_condition – Optional SQL WHERE clause to filter the source table

  • transformation_sql – Optional SQL SELECT expression applied after filtering. If not provided, all columns are selected from the source table (equivalent to *).

  • dataframe_schema – Optional Spark StructType JSON schema of the transformed output

__init__(*, catalog_name: str, schema_name: str, table_name: str, filter_condition: Optional[str] = None, transformation_sql: Optional[str] = None, dataframe_schema: Optional[str] = None, _skip_validation: bool = False)

Initialize a DeltaTableSource object. See class documentation.

property catalog_name: Optional[str]

The name of the Unity Catalog catalog, or None if inaccessible.

property schema_name: Optional[str]

The name of the schema within the catalog, or None if inaccessible.

property table_name: Optional[str]

The name of the table within the schema, or None if inaccessible.

property is_inaccessible: bool

Whether this source was returned without a table name.

True when the remote omitted the table name because the current user lacks access to the underlying Unity Catalog table (or it was deleted). An inaccessible source cannot be loaded, serialized, or materialized – those operations raise instead of acting on a missing name. Always False for user-constructed sources.

property filter_condition: Optional[str]

Optional SQL WHERE clause filter applied to the source table.

property transformation_sql: Optional[str]

Optional SQL SELECT expression applied to the source table.

property dataframe_schema: Optional[str]

Optional Spark StructType JSON schema of the transformed output.

full_name() → Optional[str]

Return the full table name in catalog.schema.table format.

Returns None when the source is_inaccessible.
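The full_name contract (three dot-separated parts, or None for an inaccessible source) can be sketched as a small function. qualified_table_name is a hypothetical helper written for this example; returning None when any part is missing is an assumption that generalizes the documented "no table name" case.

```python
from typing import Optional


def qualified_table_name(
    catalog_name: Optional[str],
    schema_name: Optional[str],
    table_name: Optional[str],
) -> Optional[str]:
    """Hypothetical helper mirroring the documented full_name() contract."""
    parts = (catalog_name, schema_name, table_name)
    # Assumption: a source missing any part is treated as inaccessible.
    if any(part is None for part in parts):
        return None
    return ".".join(parts)


accessible = qualified_table_name("main", "sales", "orders")
inaccessible = qualified_table_name(None, None, None)
```

Callers should check for None before using the result in a query, since an inaccessible source cannot be loaded.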

to_dataframe(spark_client)

Load the source table as a Spark DataFrame, applying any transformation_sql and filter_condition.

Parameters

spark_client – The Spark client for DataFrame operations.

Returns

A Spark DataFrame with transformations applied.

load_df(spark_client)

Load the Delta table as a Spark DataFrame.

If transformation_sql or filter_condition is set, delegates to to_dataframe() which applies the transformations via spark.sql(). Otherwise, uses spark_client.read_table() for standard table reading.
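The dispatch described here can be sketched as a function that either composes a query or signals that a plain table read is enough. build_source_query is a hypothetical helper; the exact SQL the library generates is not documented, so the SELECT ... FROM ... WHERE shape below is an assumption based on the parameter descriptions.

```python
from typing import Optional


def build_source_query(
    full_table_name: str,
    transformation_sql: Optional[str] = None,
    filter_condition: Optional[str] = None,
) -> Optional[str]:
    """Hypothetical helper: return the SQL to run, or None for a plain table read."""
    if transformation_sql is None and filter_condition is None:
        return None  # the caller would read the table directly
    # With no transformation_sql, all columns are selected (equivalent to *).
    select_list = transformation_sql or "*"
    query = f"SELECT {select_list} FROM {full_table_name}"
    if filter_condition:
        query += f" WHERE {filter_condition}"
    return query


plain = build_source_query("main.sales.orders")
filtered = build_source_query("main.sales.orders", filter_condition="amount > 0")
```

Returning None for the untransformed case keeps the two read paths explicit, which mirrors the split between to_dataframe() and spark_client.read_table().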

class ParsedSQL(table_name: str, transformation_sql: str, filter_condition: Optional[str])

Bases: NamedTuple

Result of parsing a simple SQL SELECT statement.

table_name: str

Alias for field number 0

transformation_sql: str

Alias for field number 1

filter_condition: Optional[str]

Alias for field number 2

classmethod from_sql(*, sql: str, spark: SparkSession) → DeltaTableSource

Create a DeltaTableSource from a SQL query string.

Parses the SQL to extract table name, transformation_sql, and filter_condition.

This is a best-effort convenience method that only supports simple queries (one SELECT, one FROM, optional WHERE). Not all simple queries will work. For complex SQL or unsupported queries, construct DeltaTableSource directly with fields passed individually.

Parameters
  • sql – A SQL query string (e.g., “SELECT a, b FROM catalog.schema.table WHERE x > 1”).

  • spark – The SparkSession for schema inference.

Returns

A DeltaTableSource instance.

Raises

ValueError – If the SQL cannot be parsed or the table name is not 3-part.
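To make the "one SELECT, one FROM, optional WHERE" restriction concrete, here is a minimal regex-based sketch of such a parser. It is not the library's parser: ParsedSQLSketch, parse_simple_select, and the regular expression are all written for this example, and the sketch rejects anything it cannot match rather than attempting best-effort recovery.

```python
import re
from typing import NamedTuple, Optional


class ParsedSQLSketch(NamedTuple):
    """Stand-in for the documented ParsedSQL tuple."""
    table_name: str
    transformation_sql: str
    filter_condition: Optional[str]


# One SELECT list, one table reference, and an optional WHERE clause.
_SIMPLE_SELECT = re.compile(
    r"^\s*SELECT\s+(?P<select>.+?)\s+FROM\s+(?P<table>[\w.]+)"
    r"(?:\s+WHERE\s+(?P<where>.+?))?\s*;?\s*$",
    re.IGNORECASE | re.DOTALL,
)


def parse_simple_select(sql: str) -> ParsedSQLSketch:
    match = _SIMPLE_SELECT.match(sql)
    if match is None:
        raise ValueError(f"unsupported SQL: {sql!r}")
    table = match.group("table")
    if len(table.split(".")) != 3:
        raise ValueError(f"table name must be catalog.schema.table, got {table!r}")
    return ParsedSQLSketch(table, match.group("select"), match.group("where"))


parsed = parse_simple_select("SELECT a, b FROM catalog.schema.table WHERE x > 1")
```

Queries with joins, subqueries, or GROUP BY fall outside the pattern and raise ValueError, which is the point at which constructing DeltaTableSource directly becomes the better option.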

class databricks.ml_features.entities.data_source.VolumeSource

Bases: DataSource

Data source implementation for Unity Catalog Volumes.

Note: The implementation is not yet defined; it will depend on volume requirements.

__init__()

Initialize a VolumeSource object. See class documentation.

full_name() → str

Return the full volume path identifier.

load_df(spark_client)

Load the volume data as a Spark DataFrame.

class databricks.ml_features.entities.data_source.KafkaSource(*, name: str, filter_condition: Optional[str] = None)

Bases: DataSource

Data source implementation for Kafka streams.

KafkaSource references a KafkaConfig by name. The KafkaConfig contains connection details, authentication, and schemas for the Kafka topics. Column names must be prefixed with ‘key.’ or ‘value.’ to indicate which schema to use. Examples: ‘key.customer_id’ or ‘value.trip_details.pickup_zip’ for nested JSON fields.

Parameters

name – Name of the KafkaConfig to use (uniquely identifies the KafkaConfig in the metastore)
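The column naming rule (a 'key.' or 'value.' prefix followed by a possibly nested field path) can be checked with a few lines of Python. split_kafka_column is a hypothetical helper for illustration, not part of the library.

```python
from typing import List, Tuple


def split_kafka_column(column: str) -> Tuple[str, List[str]]:
    """Hypothetical helper: split 'value.a.b' into ('value', ['a', 'b'])."""
    prefix, _, remainder = column.partition(".")
    if prefix not in ("key", "value") or not remainder:
        raise ValueError(f"column must start with 'key.' or 'value.': {column!r}")
    # The remaining dotted segments are the path into the chosen schema.
    return prefix, remainder.split(".")


schema, path = split_kafka_column("value.trip_details.pickup_zip")
```

The prefix selects which schema to consult, and the path walks nested fields within it.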

__init__(*, name: str, filter_condition: Optional[str] = None)

Initialize a KafkaSource object. See class documentation.

class SchemaType

Bases: object

Constants for Kafka schema types.

property name: str

The name of the KafkaConfig this source references.

full_name() → str

Return the Kafka config name as the full identifier.

property filter_condition: Optional[str]

Optional filter condition to apply when reading from this source.

load_df(spark_client)

Return a DataFrame for Kafka sources.

When backfill_source is configured on the KafkaConfig, reads the backfill Delta table directly. Otherwise, returns an empty DataFrame with key/value STRUCT columns matching the KafkaConfig schemas.

This method does not reduce entity and timeseries columns to their leaf names; that extraction happens in pit_computation_utils.py.

Parameters

spark_client – The Spark client for DataFrame operations.

static get_columns_from_kafka_config(kafka_config) → Dict[str, DataType]

Extract all columns and their Spark types from a KafkaConfig.

This method parses the key_schema and value_schema from the KafkaConfig and returns a dictionary mapping column names (with key./value. prefixes) to their Spark DataTypes.

Parameters

kafka_config – The KafkaConfig object containing schema information.

Returns

Dictionary mapping dot-prefixed column names to Spark DataTypes.
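The shape of the returned dictionary can be illustrated by flattening nested schemas into dotted names. In this sketch plain dicts and type-name strings stand in for the KafkaConfig schemas and Spark DataTypes, flatten_schema is a hypothetical helper, and emitting only leaf fields (not the enclosing structs) is an assumption.

```python
from typing import Any, Dict


def flatten_schema(schema: Dict[str, Any], prefix: str) -> Dict[str, str]:
    """Hypothetical helper: flatten a nested {field: type} dict into dotted names."""
    columns: Dict[str, str] = {}
    for field_name, field_type in schema.items():
        path = f"{prefix}.{field_name}"
        if isinstance(field_type, dict):
            # Nested struct: recurse and keep extending the dotted path.
            columns.update(flatten_schema(field_type, path))
        else:
            columns[path] = field_type
    return columns


key_schema = {"customer_id": "string"}
value_schema = {"trip_details": {"pickup_zip": "int", "fare": "double"}}
columns = {
    **flatten_schema(key_schema, "key"),
    **flatten_schema(value_schema, "value"),
}
```

Every resulting name starts with key. or value., matching the prefix convention KafkaSource requires for column references.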

class databricks.ml_features.entities.data_source.DataFrameSource(*, dataframe, source_name: Optional[str] = None)

Bases: DataSource

Data source implementation for Spark DataFrames.

This allows using an existing Spark DataFrame directly as a data source for feature computation, useful for in-memory data processing and testing.

Parameters
  • dataframe – The Spark DataFrame to use as the data source

  • source_name – Optional name for the DataFrame source (for identification)

__init__(*, dataframe, source_name: Optional[str] = None)

Initialize a DataFrameSource object. See class documentation.

property dataframe

The Spark DataFrame being used as the data source.

property source_name: str

The name identifier for this DataFrame source.

full_name() → str

Return the source name identifier for this DataFrame.

load_df(spark_client)

Return the existing Spark DataFrame.