Feature
- class databricks.ml_features.entities.feature.Feature(*, source: DataSource, function: Union[AggregationFunction, ColumnSelection], entity: List[str], timeseries_column: str, name: Optional[str] = None, description: Optional[str] = None)
Bases: _FeatureStoreObject
Represents a feature definition that combines a data source with aggregation logic.
- Parameters
source – The data source for this feature
function – The function to apply. An AggregationFunction (carrying its inputs and time window) or a ColumnSelection.
entity – Required. List of entity column names that act as the lookup/aggregation keys.
timeseries_column – Required. The timeseries column name used for point-in-time joins, backfills, and aggregations.
name – The name of the feature. Leading and trailing whitespace will be stripped. If not provided or empty after stripping, a name will be auto-generated from the input column, function, and time window.
description – Optional description of the feature
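The name handling described for the name parameter (strip whitespace, then fall back to a generated name) can be sketched as follows. The helper normalize_feature_name and the "<input>_<function>_<window>" format it produces are hypothetical. This section does not specify the library's actual auto-generated naming scheme.

```python
from typing import Optional


def normalize_feature_name(
    name: Optional[str],
    input_column: str,
    function_name: str,
    window: Optional[str] = None,
) -> str:
    """Hypothetical helper: strip a user-supplied name or generate one."""
    if name is not None:
        stripped = name.strip()  # leading/trailing whitespace is removed
        if stripped:
            return stripped
    # Fallback when the name is missing or blank after stripping.
    # ASSUMPTION: this "<input>_<function>_<window>" format is illustrative only.
    parts = [input_column, function_name]
    if window:
        parts.append(window)
    return "_".join(parts)


print(normalize_feature_name("  total_spend  ", "amount", "sum", "7d"))  # total_spend
print(normalize_feature_name("   ", "amount", "sum", "7d"))              # amount_sum_7d
```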
The catalog and schema are not set by the constructor; they are assigned by FeatureEngineeringClient.create_feature or FeatureEngineeringClient.register_feature.
- __init__(*, source: DataSource, function: Union[AggregationFunction, ColumnSelection], entity: List[str], timeseries_column: str, name: Optional[str] = None, description: Optional[str] = None)
Initialize a Feature object. See class documentation. This should not be invoked directly; use FeatureEngineeringClient.create_feature instead. create_feature ensures that the Feature is registered in Unity Catalog and properly validated.
- property name: str
The leaf name of the feature.
- property full_name: str
The fully qualified Unity Catalog name of the feature.
- property has_full_name: bool
Whether this feature has catalog_name and schema_name set.
- property is_registered: bool
Whether this feature is persisted to Unity Catalog.
- clone(*, new_name: Optional[str] = None) Feature
Create an unregistered deep copy of this feature.
Returns a new Feature with its registration cleared so it can be re-registered in a different catalog/schema via create_feature.
- Parameters
new_name – Optional new leaf name for the cloned feature. If not provided, the original name is kept.
- Returns
A new, unregistered Feature instance.
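The clone semantics (a deep copy with registration cleared and an optional new leaf name) can be illustrated with a small stand-in. FeatureStub is hypothetical, not the library class. It also assumes that "registration" can be modeled as the catalog_name/schema_name pair that has_full_name checks. The real Feature may track additional registration state.

```python
import copy
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FeatureStub:
    """Hypothetical stand-in for Feature; only the fields needed for clone()."""

    name: str
    entity: List[str]
    catalog_name: Optional[str] = None
    schema_name: Optional[str] = None

    @property
    def has_full_name(self) -> bool:
        return self.catalog_name is not None and self.schema_name is not None

    def clone(self, *, new_name: Optional[str] = None) -> "FeatureStub":
        twin = copy.deepcopy(self)  # nested containers are copied, not shared
        # ASSUMPTION: "registration" is modeled here as catalog/schema only.
        twin.catalog_name = None
        twin.schema_name = None
        if new_name is not None:
            twin.name = new_name
        return twin


original = FeatureStub("spend_7d", ["customer_id"], "prod", "features")
copy_for_dev = original.clone(new_name="spend_7d_dev")
print(copy_for_dev.has_full_name, original.has_full_name)  # False True
```

Using a deep copy means that mutating the clone's entity list cannot affect the original feature.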
- property source: DataSource
The data source for this feature.
- property inputs: Optional[List[str]]
Input columns for this feature, taken from its AggregationFunction.
Returns None for ColumnSelection features, which select a single column.
- property function: Union[AggregationFunction, ColumnSelection]
The function to apply: an AggregationFunction or a ColumnSelection.
- property time_window: Optional[TimeWindow]
The time window, taken from the AggregationFunction (None for ColumnSelection).
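The derived inputs and time_window properties can be sketched with stand-in types. AggregationFunctionStub and ColumnSelectionStub are hypothetical substitutes for AggregationFunction and ColumnSelection. A plain string is used as a placeholder for TimeWindow.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class AggregationFunctionStub:
    """Hypothetical stand-in: carries its input columns and a time window."""

    inputs: List[str]
    time_window: str  # placeholder for a TimeWindow object


@dataclass
class ColumnSelectionStub:
    """Hypothetical stand-in: selects a single column, with no window."""

    column: str


FunctionLike = Union[AggregationFunctionStub, ColumnSelectionStub]


def feature_inputs(function: FunctionLike) -> Optional[List[str]]:
    # Only aggregation functions expose inputs; column selections yield None.
    if isinstance(function, AggregationFunctionStub):
        return function.inputs
    return None


def feature_time_window(function: FunctionLike) -> Optional[str]:
    # Same rule for the window: None for column selections.
    if isinstance(function, AggregationFunctionStub):
        return function.time_window
    return None
```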
Feature Aggregations
- class databricks.ml_features.entities.feature_aggregations.FeatureAggregations(*, source_table: str, lookup_key: Union[str, List[str]], timestamp_key: str, granularity: timedelta, start_time: datetime, end_time: Optional[datetime] = None, aggregations: List[Aggregation])
Bases: _FeatureStoreObject
Note
Aliases: databricks.feature_engineering.entities.feature_lookup.FeatureLookup, databricks.feature_store.entities.feature_lookup.FeatureLookup
Defines an aggregation specification.
- Parameters
source_table – The source table to perform the aggregation on. It can be any Delta table.
lookup_key – Key to use when computing aggregation. It can be a single key or a list of keys.
timestamp_key – Key for timestamp. Used for determining the temporal position of data points.
granularity – The temporal granularity at which to generate aggregated features. For example, a granularity of 1 day means the aggregation materialized view will contain one row per primary key per day, from start_time until now.
start_time – The earliest time from which to generate aggregated features. For example, a start_time of 2020-01-01 means the aggregation materialized view will not contain any rows before this time. This is also the start of the first granularity interval.
end_time – The latest time up to which to generate aggregated features. If None, the time of the materialization pipeline run is used; if a datetime is given, it is used as the end time.
aggregations – A list of aggregations to perform. Each aggregation defines an output column.
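The interaction of start_time, granularity, and end_time can be sketched as an interval grid. The helper interval_starts is hypothetical. It assumes half-open intervals, so an interval is produced only if it starts strictly before the end time. The parameter descriptions above do not state how a trailing partial interval is handled.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def interval_starts(
    start_time: datetime,
    granularity: timedelta,
    end_time: Optional[datetime] = None,
) -> List[datetime]:
    """List the start of each granularity interval from start_time onward."""
    if granularity <= timedelta(0):
        raise ValueError("granularity must be positive")
    # end_time=None stands for the time of the materialization run.
    end = end_time if end_time is not None else datetime.now()
    starts: List[datetime] = []
    current = start_time  # start_time opens the first interval
    # ASSUMPTION: an interval is emitted only if it starts before `end`.
    while current < end:
        starts.append(current)
        current += granularity
    return starts


days = interval_starts(datetime(2020, 1, 1), timedelta(days=1), datetime(2020, 1, 4))
print(len(days))  # 3, i.e. three rows per lookup key
```

With one row per lookup key per interval, the materialized view in this example would hold three rows for each key.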
- __init__(*, source_table: str, lookup_key: Union[str, List[str]], timestamp_key: str, granularity: timedelta, start_time: datetime, end_time: Optional[datetime] = None, aggregations: List[Aggregation])
Initialize a FeatureAggregations object. See class documentation.
- property source_table: str
Returns the source table used for aggregation.
- property timestamp_key: str
Returns the timestamp key used for aggregation.
- property granularity: timedelta
Returns the granularity at which features are aggregated.
- property start_time: datetime
Returns the start time from which to generate aggregated features.
- property end_time: Optional[datetime]
Returns the end time up to which to generate aggregated features.
- property aggregations: List[Aggregation]
Returns the list of aggregations to perform.
- class databricks.ml_features.entities.aggregation.Aggregation(*, function: Union[str, Function], time_window: Optional[TimeWindow] = None, column: Optional[str] = None, output_column: Optional[str] = None, filter_condition: Optional[str] = None, **kwargs)
Bases: _FeatureStoreObject
Defines a single aggregated feature.
- Parameters
column – The source column to aggregate. The column must exist in the source_table of the parent FeatureAggregations.
output_column – The output column name. If not provided, a default name will be generated.
function – The function to use. A string is interpreted as shorthand (e.g., “sum”, “avg”, “count”).
time_window – The time window to aggregate data with.
filter_condition – Optional SQL WHERE clause to filter source data before aggregation.
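One aggregated value per key and evaluation time can be sketched in pure Python: filter the source rows, keep those inside the time window, then apply the function. This is a conceptual sketch only. windowed_aggregate and AGGREGATORS are hypothetical, a Python predicate (where) stands in for the SQL filter_condition string, and the window is assumed to be (as_of - window, as_of]. The empty-input results follow SQL conventions (SUM/AVG of no rows is NULL, COUNT is 0).

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]

# SQL-like semantics: sum/avg of no rows is None (NULL); count is 0.
AGGREGATORS: Dict[str, Callable[[List[float]], Any]] = {
    "sum": lambda xs: sum(xs) if xs else None,
    "avg": lambda xs: sum(xs) / len(xs) if xs else None,
    "count": len,
}


def windowed_aggregate(
    rows: List[Row],
    *,
    column: str,
    function: str,
    timestamp_key: str,
    as_of: datetime,
    window: timedelta,
    where: Optional[Callable[[Row], bool]] = None,
) -> Any:
    """Hypothetical sketch: filter, restrict to the window, then aggregate."""
    selected = [
        row[column]
        for row in rows
        # ASSUMPTION: the window is (as_of - window, as_of].
        if as_of - window < row[timestamp_key] <= as_of
        and (where is None or where(row))
    ]
    return AGGREGATORS[function](selected)


events = [
    {"ts": datetime(2024, 1, 9), "amount": 10.0, "status": "ok"},
    {"ts": datetime(2024, 1, 8), "amount": 5.0, "status": "refund"},
    {"ts": datetime(2024, 1, 1), "amount": 100.0, "status": "ok"},  # outside 7 days
]
print(windowed_aggregate(events, column="amount", function="sum", timestamp_key="ts",
                         as_of=datetime(2024, 1, 10), window=timedelta(days=7)))  # 15.0
```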
- __init__(*, function: Union[str, Function], time_window: Optional[TimeWindow] = None, column: Optional[str] = None, output_column: Optional[str] = None, filter_condition: Optional[str] = None, **kwargs)
Initialize an Aggregation object. See class documentation.
- property output_column: str
The output column name.
- property function: Function
The aggregation function to use.
- property time_window: TimeWindow
The time window to aggregate data with.
- property window: TimeWindow
The time window to aggregate data with.
Aggregation Functions
- class databricks.ml_features.entities.function.Function(inputs: List[str])
Bases: _FeatureStoreObject
Abstract base class for all aggregation function operators.
Subclasses represent specific aggregation operations (e.g. Avg, Sum, Count). Each function operates on one or more input columns from a data source.
- Parameters
inputs – Column name(s) from the source to aggregate.
- spark_function(filter_condition: Optional[str] = None) Column
Build the Spark aggregation Column over this function’s input columns.
- Parameters
filter_condition – Optional SQL filter condition to apply before aggregating.
- Returns
A Spark Column representing the aggregation.
- abstract property name: str
Return the name of the aggregation function.
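The class structure described here has two parts: an abstract base that holds a list of input columns and declares an abstract name, and single-input subclasses such as Avg(input) that wrap their one column into that list. It can be sketched with Python's abc module. FunctionSketch and AvgSketch are hypothetical stand-ins. sql_expression is also hypothetical; it renders Spark SQL text using the aggregate FILTER (WHERE ...) clause to represent filter_condition. The real spark_function returns a pyspark Column and may be built differently.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class FunctionSketch(ABC):
    """Hypothetical stand-in mirroring Function(inputs: List[str])."""

    def __init__(self, inputs: List[str]):
        if not inputs:
            raise ValueError("at least one input column is required")
        self._inputs = list(inputs)

    @property
    @abstractmethod
    def name(self) -> str:
        """Name of the aggregation function, e.g. 'avg'."""

    def sql_expression(self, filter_condition: Optional[str] = None) -> str:
        # ASSUMPTION: renders Spark SQL text; the real spark_function returns a Column.
        expr = f"{self.name}({', '.join(self._inputs)})"
        if filter_condition:
            expr += f" FILTER (WHERE {filter_condition})"
        return expr


class AvgSketch(FunctionSketch):
    """Single-input subclass: wraps `input` into the base class's list."""

    def __init__(self, input: str):  # parameter name mirrors Avg(input: str)
        super().__init__([input])

    @property
    def name(self) -> str:
        return "avg"


print(AvgSketch("fare").sql_expression("trip_distance > 0"))
# avg(fare) FILTER (WHERE trip_distance > 0)
```

Because name is abstract, the base class cannot be instantiated directly. Each concrete operator only has to supply its name.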
- class databricks.ml_features.entities.function.Avg(input: str)
Bases: Function
Class representing the average (avg) aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.Count(input: str)
Bases: Function
Class representing the count aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.ApproxCountDistinct(input: str, relativeSD: Optional[float] = None)
Bases: Function
Class representing the approximate count distinct aggregation function. See https://docs.databricks.com/en/sql/language-manual/functions/approx_count_distinct.html
- Parameters
relativeSD – The relative standard deviation allowed in the approximation.
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.ApproxPercentile(percentile: float, input: str, accuracy: Optional[int] = None)
Bases: Function
Class representing the approximate percentile aggregation function. See https://docs.databricks.com/en/sql/language-manual/functions/approx_percentile.html
- Parameters
percentile – The percentile to approximate.
accuracy – The accuracy of the approximation.
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.First(input: str)
Bases: Function
Class representing the first aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.Last(input: str)
Bases: Function
Class representing the last aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.Max(input: str)
Bases: Function
Class representing the maximum (max) aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.Min(input: str)
Bases: Function
Class representing the minimum (min) aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.StddevPop(input: str)
Bases: Function
Class representing the population standard deviation (stddev_pop) aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
- class databricks.ml_features.entities.function.StddevSamp(input: str)
Bases: Function
Class representing the sample standard deviation (stddev_samp) aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
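The difference between StddevPop and StddevSamp lies in the divisor: N for the population estimate and N - 1 for the sample estimate. Python's statistics module can illustrate it. The wrappers return None where the Python functions would raise, loosely mirroring a SQL NULL. That mapping is an assumption of this sketch, not documented behavior of these classes.

```python
import statistics
from typing import Optional, Sequence


def stddev_pop(values: Sequence[float]) -> Optional[float]:
    # Population standard deviation: divides the squared deviations by N.
    return statistics.pstdev(values) if values else None


def stddev_samp(values: Sequence[float]) -> Optional[float]:
    # Sample standard deviation: divides by N - 1, so it needs two values.
    return statistics.stdev(values) if len(values) >= 2 else None


data = [2, 4, 4, 4, 5, 5, 7, 9]  # mean 5, squared deviations sum to 32
print(stddev_pop(data))   # 2.0, i.e. sqrt(32 / 8)
print(stddev_samp(data))  # about 2.138, i.e. sqrt(32 / 7)
```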
- class databricks.ml_features.entities.function.Sum(input: str)
Bases: Function
Class representing the sum aggregation function.
- __init__(input: str)
- property name: str
Return the name of the aggregation function.
Data Sources
- class databricks.ml_features.entities.data_source.DataSourceTypes(value)
Enumeration of supported data source types.
- class databricks.ml_features.entities.data_source.DataSource(*, source_type: DataSourceTypes)
Bases: _FeatureStoreObject, ABC
Abstract base class for data sources used in feature computation.
- Parameters
source_type – The type of data source
- __init__(*, source_type: DataSourceTypes)
Initialize a DataSource object. See class documentation.
- property source_type: DataSourceTypes
The type of data source.
- static timeseries_to_order_column(timeseries_column: Optional[str]) Optional[str]
Derive the order column name from a timeseries column name.
- abstract full_name() str
Return the full name/identifier for this data source.
- class databricks.ml_features.entities.data_source.DeltaTableSource(*, catalog_name: str, schema_name: str, table_name: str, filter_condition: Optional[str] = None, transformation_sql: Optional[str] = None, dataframe_schema: Optional[str] = None, _skip_validation: bool = False)
Bases: BackfillSource
Data source implementation for Delta Lake tables.
- Parameters
catalog_name – The name of the Unity Catalog catalog
schema_name – The name of the schema within the catalog
table_name – The name of the table within the schema
filter_condition – Optional SQL WHERE clause to filter the source table
transformation_sql – Optional SQL SELECT expression applied after filtering. If not provided, all columns are selected from the source table (equivalent to *).
dataframe_schema – Optional Spark StructType JSON schema of the transformed output
- __init__(*, catalog_name: str, schema_name: str, table_name: str, filter_condition: Optional[str] = None, transformation_sql: Optional[str] = None, dataframe_schema: Optional[str] = None, _skip_validation: bool = False)
Initialize a DeltaTableSource object. See class documentation.
- property catalog_name: Optional[str]
The name of the Unity Catalog catalog, or None if inaccessible.
- property schema_name: Optional[str]
The name of the schema within the catalog, or None if inaccessible.
- property table_name: Optional[str]
The name of the table within the schema, or None if inaccessible.
- property is_inaccessible: bool
Whether this source was returned without a table name.
True when the remote service omitted the table name because the current user lacks access to the underlying Unity Catalog table (or the table was deleted). An inaccessible source cannot be loaded, serialized, or materialized; those operations raise an error instead of acting on a missing name. Always False for user-constructed sources.
- property filter_condition: Optional[str]
Optional SQL WHERE clause filter applied to the source table.
- property transformation_sql: Optional[str]
Optional SQL SELECT expression applied to the source table.
- property dataframe_schema: Optional[str]
Optional Spark StructType JSON schema of the transformed output.
- full_name() Optional[str]
Return the full table name in catalog.schema.table format.
Returns None when the source is inaccessible (see is_inaccessible).
- to_dataframe(spark_client)
Load the source table as a Spark DataFrame, applying any transformation_sql and filter_condition.
- Args:
spark_client: The Spark client for DataFrame operations.
- Returns:
A Spark DataFrame with transformations applied.
- load_df(spark_client)
Load the Delta table as a Spark DataFrame.
If transformation_sql or filter_condition is set, delegates to to_dataframe() which applies the transformations via spark.sql(). Otherwise, uses spark_client.read_table() for standard table reading.
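The loading rules above can be sketched as a planning function. Three rules apply: use a plain table read when neither option is set, otherwise build a query that filters rows with WHERE and projects with transformation_sql (or *), and fail for an inaccessible source. plan_delta_read is hypothetical, and the exact SQL text the library generates is not documented here. The sketch only returns the plan instead of calling Spark.

```python
from typing import Optional, Tuple


def plan_delta_read(
    catalog_name: Optional[str],
    schema_name: Optional[str],
    table_name: Optional[str],
    filter_condition: Optional[str] = None,
    transformation_sql: Optional[str] = None,
) -> Tuple[str, str]:
    """Hypothetical helper: decide how a Delta source would be loaded."""
    if table_name is None:
        # An inaccessible source has no table name, so loading must fail loudly.
        raise ValueError("source is inaccessible: no table name")
    full_name = f"{catalog_name}.{schema_name}.{table_name}"
    if not transformation_sql and not filter_condition:
        return ("read_table", full_name)  # plain table read
    # Filter rows first (WHERE), then project with transformation_sql or *.
    query = f"SELECT {transformation_sql or '*'} FROM {full_name}"
    if filter_condition:
        query += f" WHERE {filter_condition}"
    return ("sql", query)


print(plan_delta_read("main", "sales", "orders", filter_condition="amount > 0"))
# ('sql', 'SELECT * FROM main.sales.orders WHERE amount > 0')
```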
- class ParsedSQL(table_name: str, transformation_sql: str, filter_condition: Optional[str])
Bases: NamedTuple
Result of parsing a simple SQL SELECT statement.
- table_name: str
Alias for field number 0
- transformation_sql: str
Alias for field number 1
- classmethod from_sql(*, sql: str, spark: SparkSession) DeltaTableSource
Create a DeltaTableSource from a SQL query string.
Parses the SQL to extract table name, transformation_sql, and filter_condition.
This is a best-effort convenience method that only supports simple queries (one SELECT, one FROM, optional WHERE). Not all simple queries will work. For complex SQL or unsupported queries, construct DeltaTableSource directly with fields passed individually.
- Args:
sql: A SQL query string (e.g., “SELECT a, b FROM catalog.schema.table WHERE x > 1”).
spark: The SparkSession for schema inference.
- Returns:
A DeltaTableSource instance.
- Raises:
ValueError: If the SQL cannot be parsed or the table name is not 3-part.
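The best-effort parsing described for from_sql covers one SELECT, one FROM, an optional WHERE, and a required 3-part table name. It can be approximated with a regular expression. parse_simple_select and ParsedSQLSketch are hypothetical. This sketch does no schema inference and is not how the library necessarily parses SQL. Like the real method, it rejects anything outside the simple shape (joins, LIMIT, 2-part names).

```python
import re
from typing import NamedTuple, Optional


class ParsedSQLSketch(NamedTuple):
    """Hypothetical mirror of DeltaTableSource.ParsedSQL."""

    table_name: str
    transformation_sql: str
    filter_condition: Optional[str]


# One SELECT, one FROM with a single table, optional WHERE, optional trailing ';'.
_SIMPLE_SELECT = re.compile(
    r"^\s*SELECT\s+(?P<select>.+?)\s+FROM\s+(?P<table>[^\s;]+)"
    r"(?:\s+WHERE\s+(?P<where>.+?))?\s*;?\s*$",
    re.IGNORECASE | re.DOTALL,
)


def parse_simple_select(sql: str) -> ParsedSQLSketch:
    match = _SIMPLE_SELECT.match(sql)
    if match is None:
        raise ValueError(f"unsupported SQL: {sql!r}")
    table = match.group("table")
    if len(table.split(".")) != 3:
        raise ValueError(f"expected catalog.schema.table, got {table!r}")
    return ParsedSQLSketch(table, match.group("select").strip(), match.group("where"))


print(parse_simple_select("SELECT a, b FROM catalog.schema.table WHERE x > 1"))
```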
- class databricks.ml_features.entities.data_source.VolumeSource
Bases: DataSource
Data source implementation for Unity Catalog Volumes.
TODO: Implementation to be defined based on volume requirements.
- full_name() str
Return the full volume path identifier.
- class databricks.ml_features.entities.data_source.KafkaSource(*, name: str, filter_condition: Optional[str] = None)
Bases: DataSource
Data source implementation for Kafka streams.
KafkaSource references a KafkaConfig by name. The KafkaConfig contains connection details, authentication, and schemas for the Kafka topics. Column names must be prefixed with ‘key.’ or ‘value.’ to indicate which schema to use. Examples: ‘key.customer_id’ or ‘value.trip_details.pickup_zip’ for nested JSON fields.
- Parameters
name – Name of the KafkaConfig to use (uniquely identifies the KafkaConfig in the metastore)
filter_condition – Optional filter condition to apply when reading from this source
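The column-naming rule for Kafka sources can be sketched as a parser. A column must start with key. or value., and the remaining dot-separated segments address (possibly nested) fields in that schema. split_kafka_column is hypothetical, and treating the prefix as case-sensitive is an assumption.

```python
from typing import List, Tuple

KAFKA_SCHEMAS = ("key", "value")


def split_kafka_column(column: str) -> Tuple[str, List[str]]:
    """Hypothetical helper: split 'value.a.b' into ('value', ['a', 'b'])."""
    schema, sep, path = column.partition(".")
    # ASSUMPTION: the prefix is matched case-sensitively.
    if not sep or schema not in KAFKA_SCHEMAS or not path:
        raise ValueError(f"column must start with 'key.' or 'value.': {column!r}")
    # Remaining segments address (possibly nested) fields in that schema.
    return schema, path.split(".")


print(split_kafka_column("value.trip_details.pickup_zip"))
# ('value', ['trip_details', 'pickup_zip'])
```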
- __init__(*, name: str, filter_condition: Optional[str] = None)
Initialize a KafkaSource object. See class documentation.
- class SchemaType
Bases: object
Constants for Kafka schema types.
- property name: str
The name of the KafkaConfig this source references.
- full_name() str
Return the Kafka config name as the full identifier.
- property filter_condition: Optional[str]
Optional filter condition to apply when reading from this source.
- load_df(spark_client)
Return a DataFrame for Kafka sources.
When backfill_source is configured on the KafkaConfig, reads the backfill Delta table directly. Otherwise, returns an empty DataFrame with key/value STRUCT columns matching the KafkaConfig schemas.
Entity and timeseries extraction to leaf names happens in pit_computation_utils.py, not here.
- Args:
spark_client: The Spark client for DataFrame operations
- static get_columns_from_kafka_config(kafka_config) Dict[str, DataType]
Extract all columns and their Spark types from a KafkaConfig.
This method parses the key_schema and value_schema from the KafkaConfig and returns a dictionary mapping column names (with key./value. prefixes) to their Spark DataTypes.
- Args:
kafka_config: The KafkaConfig object containing schema information
- Returns:
Dictionary mapping dot-prefixed column names to Spark DataTypes
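The mapping from KafkaConfig schemas to dot-prefixed column names can be sketched with plain nested dicts in place of the real schemas and Spark DataTypes. flatten_kafka_schemas is hypothetical. The sketch assumes that nested structs expand to leaf columns such as value.trip_details.pickup_zip. This section does not state whether the real method also emits the intermediate struct columns.

```python
from typing import Any, Dict


def flatten_kafka_schemas(
    key_schema: Dict[str, Any], value_schema: Dict[str, Any]
) -> Dict[str, str]:
    """Hypothetical sketch: map dot-prefixed column names to type names.

    Schemas are nested dicts here ({field: type_name or sub-dict}),
    standing in for the KafkaConfig schemas and Spark DataTypes.
    """
    columns: Dict[str, str] = {}

    def walk(prefix: str, schema: Dict[str, Any]) -> None:
        for field_name, field_type in schema.items():
            path = f"{prefix}.{field_name}"
            if isinstance(field_type, dict):
                walk(path, field_type)  # ASSUMPTION: nested structs expand to leaves
            else:
                columns[path] = field_type

    walk("key", key_schema)
    walk("value", value_schema)
    return columns


print(flatten_kafka_schemas({"customer_id": "string"},
                            {"trip_details": {"pickup_zip": "int"}, "fare": "double"}))
```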
- class databricks.ml_features.entities.data_source.DataFrameSource(*, dataframe, source_name: Optional[str] = None)
Bases: DataSource
Data source implementation for Spark DataFrames.
This allows using an existing Spark DataFrame directly as a data source for feature computation, useful for in-memory data processing and testing.
- Parameters
dataframe – The Spark DataFrame to use as the data source
source_name – Optional name for the DataFrame source (for identification)
- __init__(*, dataframe, source_name: Optional[str] = None)
Initialize a DataFrameSource object. See class documentation.
- property source_name: str
The name identifier for this DataFrame source.
- full_name() str
Return the source name identifier for this DataFrame.