pyspark.pandas.mlflow.load_model

pyspark.pandas.mlflow.load_model(model_uri: str, predict_type: Union[str, type, numpy.dtype, pandas.core.dtypes.base.ExtensionDtype] = 'infer') → pyspark.pandas.mlflow.PythonModelWrapper

Loads an MLflow model into a wrapper that can be used with both pandas and pandas-on-Spark DataFrames.

Parameters
model_uri : str

URI pointing to the model. See MLflow documentation for more details.

predict_type : a Python basic type, a numpy basic type, a Spark type or ‘infer’.

This is the return type expected when calling the model's predict function. If ‘infer’ is specified, the wrapper attempts to automatically determine the return type based on the model type.

Returns
PythonModelWrapper

A wrapper around MLflow PythonModel objects. This wrapper is expected to adhere to the interface of mlflow.pyfunc.PythonModel.
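
For instance, assuming a model has already been logged and `<run_id>` stands in for its run id, the return type can either be inferred (the default) or set explicitly. This is only an illustrative sketch, not meant to be executed as-is; numpy.float64 is just one possible choice of type:

>>> import numpy as np
>>> from pyspark.pandas.mlflow import load_model
>>> model = load_model("runs:/<run_id>/model")                           # default: predict_type='infer'
>>> model = load_model("runs:/<run_id>/model", predict_type=np.float64)  # explicit numpy return type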

Notes

Currently, the model prediction can only be assigned back to the dataframe it was computed from; other columns have to be joined in manually. For example, this code will not work:

>>> df = ps.DataFrame({"x1": [2.0], "x2": [3.0], "z": [-1]})
>>> features = df[["x1", "x2"]]
>>> y = model.predict(features)
>>> # Works:
>>> features["y"] = y   
>>> # Will fail with a message about dataframes not aligned.
>>> df["y"] = y   

A current workaround is to use the .merge() function, with the feature values as the merge keys.

>>> features['y'] = y
>>> everything = df.merge(features, on=['x1', 'x2'])
>>> everything
    x1   x2  z         y
0  2.0  3.0 -1  1.376932

Examples

Here is a full example that creates a model with scikit-learn and saves the model with MLflow. The model is then loaded as a predictor that can be applied on a pandas-on-Spark DataFrame.

We first initialize our MLflow environment:

>>> from mlflow.tracking import MlflowClient, set_tracking_uri
>>> import mlflow.sklearn
>>> from tempfile import mkdtemp
>>> d = mkdtemp("pandas_on_spark_mlflow")
>>> set_tracking_uri("file:%s"%d)
>>> client = MlflowClient()
>>> exp_id = mlflow.create_experiment("my_experiment")
>>> exp = mlflow.set_experiment("my_experiment")

We aim to learn the numerical function below (here, y = log(2 + x1)) using a simple linear regressor.

>>> import numpy as np
>>> import pandas as pd
>>> from sklearn.linear_model import LinearRegression
>>> train = pd.DataFrame({"x1": np.arange(8), "x2": np.arange(8)**2,
...                       "y": np.log(2 + np.arange(8))})
>>> train_x = train[["x1", "x2"]]
>>> train_y = train[["y"]]
>>> with mlflow.start_run():
...     lr = LinearRegression()
...     lr.fit(train_x, train_y)
...     mlflow.sklearn.log_model(lr, "model")
LinearRegression(...)

Now that our model is logged with MLflow, we load it back and apply it to a pandas-on-Spark DataFrame:

>>> import pyspark.pandas as ps
>>> from pyspark.pandas.mlflow import load_model
>>> run_info = client.list_run_infos(exp_id)[-1]
>>> model = load_model("runs:/{run_id}/model".format(run_id=run_info.run_uuid))
>>> prediction_df = ps.DataFrame({"x1": [2.0], "x2": [4.0]})
>>> prediction_df["prediction"] = model.predict(prediction_df)
>>> prediction_df
    x1   x2  prediction
0  2.0  4.0    1.355551

The model also works on pandas DataFrames as expected:

>>> model.predict(prediction_df[["x1", "x2"]].to_pandas())
array([[1.35555142]])
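
If automatic inference of the return type is not desired, a concrete type can be passed as predict_type. The sketch below reuses the run logged above; picking numpy.float64 is an illustrative assumption, not a requirement:

>>> import numpy as np
>>> model_double = load_model(
...     "runs:/{run_id}/model".format(run_id=run_info.run_uuid),
...     predict_type=np.float64)
>>> features = ps.DataFrame({"x1": [2.0], "x2": [4.0]})
>>> features["prediction"] = model_double.predict(features)

The prediction column should then carry the requested type rather than one inferred from the model.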