pyspark.pandas.groupby.GroupBy.apply
GroupBy.apply(func: Callable, *args: Any, **kwargs: Any) → Union[pyspark.pandas.frame.DataFrame, pyspark.pandas.series.Series]
Apply function func group-wise and combine the results together.
The function passed to apply must take a DataFrame as its first argument (a Series when a Series is grouped) and typically returns a DataFrame; as the examples on this page show, it may also return a Series or a scalar. apply then combines the per-group results into a single DataFrame or Series, which makes it a highly flexible grouping method.
While apply is very flexible, it can be considerably slower than more specific methods like agg or transform. pandas-on-Spark offers a wide range of methods that will be much faster than apply for their specific purposes, so try them before reaching for apply.
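As a rough illustration of the point above, the sketch below computes the same per-group sum twice, once with apply and once with agg. It uses plain pandas rather than pandas-on-Spark so that it runs without a Spark session; this is an assumption made for the example, and the performance difference itself is not measured here.

```python
import pandas as pd

# Small frame matching the data used in the Examples section of this page.
pdf = pd.DataFrame({"A": ["a", "a", "b"], "B": [1, 2, 3], "C": [4, 6, 5]})

# Generic route: apply calls the lambda once per group.
via_apply = pdf.groupby("A")[["B", "C"]].apply(lambda x: x.sum())

# Specific route: agg names the aggregation directly.
via_agg = pdf.groupby("A")[["B", "C"]].agg("sum")

print(via_apply)
print(via_agg)
```

Both calls produce one row per group with the same values; when a dedicated method such as agg expresses the computation, it is the one to prefer.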
Note
This API executes the function once to infer the type, which is potentially expensive, for instance when the dataset is created after aggregations or sorting.
To avoid this, specify the return type in func, for instance, as below:
>>> def pandas_div(x) -> ps.DataFrame[int, [float, float]]:
...     return x[['B', 'C']] / x[['B', 'C']]
If the return type is specified, the output column names become c0, c1, c2 … cn. These names are positionally mapped to the returned DataFrame in func.
To specify the column names, you can assign them in a NumPy compound type style as below:
>>> def pandas_div(x) -> ps.DataFrame[("index", int), [("a", float), ("b", float)]]:
...     return x[['B', 'C']] / x[['B', 'C']]
>>> pdf = pd.DataFrame({'B': [1.], 'C': [3.]})
>>> def plus_one(x) -> ps.DataFrame[
...         (pdf.index.name, pdf.index.dtype), zip(pdf.columns, pdf.dtypes)]:
...     return x[['B', 'C']] / x[['B', 'C']]
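The positional mapping described above can be imitated in plain pandas. The sketch below is only an emulation of the visible effect, not the pandas-on-Spark implementation: with_positional_names is a hypothetical helper introduced for this example, and it renames the columns of whatever func returns to c0, c1, … by position.

```python
import pandas as pd


def pandas_div(x: pd.DataFrame) -> pd.DataFrame:
    # Same body as the function in the note above.
    return x[["B", "C"]] / x[["B", "C"]]


def with_positional_names(frame: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical helper (not part of pandas-on-Spark): rename columns
    # to c0, c1, ... strictly by position, ignoring the original names.
    renamed = frame.copy()
    renamed.columns = [f"c{i}" for i in range(renamed.shape[1])]
    return renamed


pdf = pd.DataFrame({"B": [1, 2, 3], "C": [4, 6, 5]})
result = with_positional_names(pandas_div(pdf))
print(list(result.columns))
```

Because the mapping is positional, the order of the columns returned by func decides which value ends up under which name.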
Note
The dataframe within func is actually a pandas DataFrame. Therefore, any pandas API within this function is allowed.
- Parameters
- func : callable
A callable that takes a DataFrame as its first argument, and returns a dataframe.
- *args
Positional arguments to pass to func.
- **kwargs
Keyword arguments to pass to func.
- Returns
- applied : DataFrame or Series
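Since func receives a pandas DataFrame, one way to check it before running on Spark is to call it on each group of a small pandas frame and concatenate the pieces by hand. The sketch below does this in plain pandas; restricting plus_min to the numeric columns is a simplification made for this example.

```python
import pandas as pd


def plus_min(x: pd.DataFrame) -> pd.DataFrame:
    # Add each column's group minimum to every row of the group.
    numeric = x[["B", "C"]]
    return numeric + numeric.min()


pdf = pd.DataFrame({"A": ["a", "a", "b"], "B": [1, 2, 3], "C": [4, 6, 5]})

# Call the function group by group, then combine the results,
# which mirrors the split / apply / combine steps described above.
pieces = [plus_min(group) for _, group in pdf.groupby("A")]
combined = pd.concat(pieces).sort_index()
print(combined)
```

The B and C values agree with the plus_min example in the Examples section of this page.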
See also
aggregate
Apply aggregate function to the GroupBy object.
DataFrame.apply
Apply a function to a DataFrame.
Series.apply
Apply a function to a Series.
Examples
>>> df = ps.DataFrame({'A': 'a a b'.split(),
...                    'B': [1, 2, 3],
...                    'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
>>> g = df.groupby('A')
Notice that g has two groups, a and b. Calling apply in various ways, we can get different grouping results:
Below, the functions passed to apply take a DataFrame as their argument and return a DataFrame. apply combines the result for each group together into a new DataFrame:
>>> def plus_min(x):
...     return x + x.min()
>>> g.apply(plus_min).sort_index()
    A  B   C
0  aa  2   8
1  aa  3  10
2  bb  6  10
>>> g.apply(sum).sort_index()
    A  B   C
A
a  aa  3  10
b   b  3   5
>>> g.apply(len).sort_index()
A
a    2
b    1
dtype: int64
You can specify the type hint and prevent schema inference for better performance.
>>> def pandas_div(x) -> ps.DataFrame[int, [float, float]]:
...     return x[['B', 'C']] / x[['B', 'C']]
>>> g.apply(pandas_div).sort_index()
    c0   c1
0  1.0  1.0
1  1.0  1.0
2  1.0  1.0
>>> def pandas_div(x) -> ps.DataFrame[("index", int), [("f1", float), ("f2", float)]]:
...     return x[['B', 'C']] / x[['B', 'C']]
>>> g.apply(pandas_div).sort_index()
        f1   f2
index
0      1.0  1.0
1      1.0  1.0
2      1.0  1.0
In case of Series, it works as below.
>>> def plus_max(x) -> ps.Series[int]:
...     return x + x.max()
>>> df.B.groupby(df.A).apply(plus_max).sort_index()
0    6
1    3
2    4
Name: B, dtype: int64
>>> def plus_min(x):
...     return x + x.min()
>>> df.B.groupby(df.A).apply(plus_min).sort_index()
0    2
1    3
2    6
Name: B, dtype: int64
You can also return a scalar value as an aggregated value of the group:
>>> def plus_length(x) -> int:
...     return len(x)
>>> df.B.groupby(df.A).apply(plus_length).sort_index()
0    1
1    2
Name: B, dtype: int64
The extra arguments to the function can be passed as below.
>>> def calculation(x, y, z) -> int:
...     return len(x) + y * z
>>> df.B.groupby(df.A).apply(calculation, 5, z=10).sort_index()
0    51
1    52
Name: B, dtype: int64
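To see how the extra arguments reach the function, the sketch below repeats the calculation in plain pandas, which is an assumption made so that the example runs without Spark. The 5 is forwarded positionally as y and z=10 as a keyword, so each group yields len(x) + 5 * 10.

```python
import pandas as pd


def calculation(x: pd.Series, y: int, z: int) -> int:
    # x is the group's Series; y and z are forwarded unchanged by apply.
    return len(x) + y * z


pdf = pd.DataFrame({"A": ["a", "a", "b"], "B": [1, 2, 3]})

# Arguments after the function are passed through to every call.
result = pdf["B"].groupby(pdf["A"]).apply(calculation, 5, z=10)
print(result)
```

Plain pandas labels the result with the group keys a and b, whereas the pandas-on-Spark output above shows a default integer index because a return type hint was given.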