ALS
class pyspark.ml.recommendation.ALS(*, rank: int = 10, maxIter: int = 10, regParam: float = 0.1, numUserBlocks: int = 10, numItemBlocks: int = 10, implicitPrefs: bool = False, alpha: float = 1.0, userCol: str = 'user', itemCol: str = 'item', seed: Optional[int] = None, ratingCol: str = 'rating', nonnegative: bool = False, checkpointInterval: int = 10, intermediateStorageLevel: str = 'MEMORY_AND_DISK', finalStorageLevel: str = 'MEMORY_AND_DISK', coldStartStrategy: str = 'nan', blockSize: int = 4096)
Alternating Least Squares (ALS) matrix factorization.
ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, i.e. X * Yt = R. Typically these approximations are called ‘factor’ matrices. The general approach is iterative. During each iteration, one of the factor matrices is held constant, while the other is solved for using least squares. The newly-solved factor matrix is then held constant while solving for the other factor matrix.
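The alternation described above can be sketched in plain Python. This is a minimal illustration, not the Spark implementation: it is restricted to rank 1 so that each least-squares solve reduces to a scalar division, and the function name `als_rank1`, the dictionary input format, and the plain L2 penalty `reg` are assumptions made for the example.

```python
def als_rank1(ratings, num_users, num_items, reg=0.0, iters=10):
    """Rank-1 alternating least squares over observed ratings.

    `ratings` maps (user, item) -> rating; unobserved pairs are skipped.
    """
    x = [1.0] * num_users  # user factors (one scalar per user)
    y = [1.0] * num_items  # item factors (one scalar per item)
    for _ in range(iters):
        # Hold y constant and solve for every x[u] by least squares.
        for u in range(num_users):
            num = sum(r * y[i] for (uu, i), r in ratings.items() if uu == u)
            den = reg + sum(y[i] ** 2 for (uu, i) in ratings if uu == u)
            x[u] = num / den if den else 0.0
        # Hold the newly solved x constant and solve for every y[i].
        for i in range(num_items):
            num = sum(r * x[u] for (u, ii), r in ratings.items() if ii == i)
            den = reg + sum(x[u] ** 2 for (u, ii) in ratings if ii == i)
            y[i] = num / den if den else 0.0
    return x, y


# A fully observed rank-1 matrix: R[u][i] = a[u] * b[i] with a=[1,2,3], b=[2,1].
example_ratings = {
    (0, 0): 2.0, (0, 1): 1.0,
    (1, 0): 4.0, (1, 1): 2.0,
    (2, 0): 6.0, (2, 1): 3.0,
}
user_f, item_f = als_rank1(example_ratings, num_users=3, num_items=2)
```

With higher ranks each scalar division becomes a small linear system per user or item, which is the part the blocked implementation distributes.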
This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as “users” and “products”) into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user’s feature vector. This is achieved by pre-computing some information about the ratings matrix to determine the “out-links” of each user (which blocks of products it will contribute to) and “in-link” information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users’ ratings and update the products based on these messages.
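The "out-link" idea above can be illustrated with a small sketch. The helper `user_out_links` and the modulo-based block assignment are hypothetical choices for the example; the partitioner Spark actually uses is not described in this section.

```python
from collections import defaultdict


def user_out_links(ratings, num_item_blocks):
    """Map each user to the set of item blocks that need its factor vector."""
    links = defaultdict(set)
    for user, item in ratings:
        # Hypothetical partitioner: block index is the item id modulo the block count.
        links[user].add(item % num_item_blocks)
    return dict(links)


# (user, item) pairs that have a rating.
rated_pairs = [(0, 0), (0, 2), (1, 1), (2, 0), (2, 1)]
out_links = user_out_links(rated_pairs, num_item_blocks=2)
```

A user's vector is then sent once to each block in its set, rather than once per rated item.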
For implicit preference data, the algorithm used is based on “Collaborative Filtering for Implicit Feedback Datasets”, adapted for the blocked approach used here.
Essentially instead of finding the low-rank approximations to the rating matrix R, this finds the approximations for a preference matrix P where the elements of P are 1 if r > 0 and 0 if r <= 0. The ratings then act as ‘confidence’ values related to strength of indicated user preferences rather than explicit ratings given to items.
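As a rough sketch of the mapping described above, each raw value r can be turned into a binary preference and a confidence weight. The preference rule comes from the text; the confidence form 1 + alpha * r follows the cited paper's formulation for nonnegative r and is an assumption about how `alpha` enters here, not a statement about Spark's internals. The function name is hypothetical.

```python
def preference_and_confidence(r, alpha=1.0):
    """Return (p, c) for one raw implicit-feedback value r."""
    p = 1.0 if r > 0 else 0.0  # preference: 1 if r > 0, 0 if r <= 0
    c = 1.0 + alpha * r        # confidence, assumed form from the cited paper (r >= 0)
    return p, c


observed = preference_and_confidence(3.0, alpha=2.0)
unobserved = preference_and_confidence(0.0, alpha=2.0)
```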
Notes
The input rating DataFrame to the ALS implementation should be deterministic. Nondeterministic data can cause failures while fitting an ALS model. For example, an order-sensitive operation such as sampling after a repartition, like df.repartition(2).sample(False, 0.5, 1618), makes the DataFrame output nondeterministic. Checkpointing the sampled DataFrame or adding a sort before sampling can help make the DataFrame deterministic.
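The reason a sort helps can be seen in a plain-Python analogy. This is not Spark's sampler: `seeded_sample` is a hypothetical stand-in that draws one random number per row in arrival order, which is enough to show why a fixed seed alone does not fix the result when row order can change.

```python
import random


def seeded_sample(rows, fraction, seed):
    """Keep each row with probability `fraction`, one random draw per row in order."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = [(u, i) for u in range(4) for i in range(3)]
arrival_a = rows[::-1]               # one possible arrival order after a shuffle
arrival_b = rows[1::2] + rows[::2]   # another possible arrival order

# Sorting first pins the order, so the same seed selects the same rows.
stable_a = seeded_sample(sorted(arrival_a), 0.5, 1618)
stable_b = seeded_sample(sorted(arrival_b), 0.5, 1618)
```

Sampling `arrival_a` and `arrival_b` directly gives no such guarantee, because the same sequence of draws is applied to rows in different positions.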
Examples
>>> df = spark.createDataFrame(
...     [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
...     ["user", "item", "rating"])
>>> als = ALS(rank=10, seed=0)
>>> als.setMaxIter(5)
ALS...
>>> als.getMaxIter()
5
>>> als.setRegParam(0.1)
ALS...
>>> als.getRegParam()
0.1
>>> als.clear(als.regParam)
>>> model = als.fit(df)
>>> model.getBlockSize()
4096
>>> model.getUserCol()
'user'
>>> model.setUserCol("user")
ALSModel...
>>> model.getItemCol()
'item'
>>> model.setPredictionCol("newPrediction")
ALS...
>>> model.rank
10
>>> model.userFactors.orderBy("id").collect()
[Row(id=0, features=[...]), Row(id=1, ...), Row(id=2, ...)]
>>> test = spark.createDataFrame([(0, 2), (1, 0), (2, 0)], ["user", "item"])
>>> predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
>>> predictions[0]
Row(user=0, item=2, newPrediction=0.6929...)
>>> predictions[1]
Row(user=1, item=0, newPrediction=3.47356...)
>>> predictions[2]
Row(user=2, item=0, newPrediction=-0.899198...)
>>> user_recs = model.recommendForAllUsers(3)
>>> user_recs.where(user_recs.user == 0).select("recommendations.item", "recommendations.rating").collect()
[Row(item=[0, 1, 2], rating=[3.910..., 1.997..., 0.692...])]
>>> item_recs = model.recommendForAllItems(3)
>>> item_recs.where(item_recs.item == 2).select("recommendations.user", "recommendations.rating").collect()
[Row(user=[2, 1, 0], rating=[4.892..., 3.991..., 0.692...])]
>>> user_subset = df.where(df.user == 2)
>>> user_subset_recs = model.recommendForUserSubset(user_subset, 3)
>>> user_subset_recs.select("recommendations.item", "recommendations.rating").first()
Row(item=[2, 1, 0], rating=[4.892..., 1.076..., -0.899...])
>>> item_subset = df.where(df.item == 0)
>>> item_subset_recs = model.recommendForItemSubset(item_subset, 3)
>>> item_subset_recs.select("recommendations.user", "recommendations.rating").first()
Row(user=[0, 1, 2], rating=[3.910..., 3.473..., -0.899...])
>>> als_path = temp_path + "/als"
>>> als.save(als_path)
>>> als2 = ALS.load(als_path)
>>> als.getMaxIter()
5
>>> model_path = temp_path + "/als_model"
>>> model.save(model_path)
>>> model2 = ALSModel.load(model_path)
>>> model.rank == model2.rank
True
>>> sorted(model.userFactors.collect()) == sorted(model2.userFactors.collect())
True
>>> sorted(model.itemFactors.collect()) == sorted(model2.itemFactors.collect())
True
>>> model.transform(test).sort("user").take(1) == model2.transform(test).sort("user").take(1)
True
Methods
clear(param): Clears a param from the param map if it has been explicitly set.
copy([extra]): Creates a copy of this instance with the same uid and some extra params.
explainParam(param): Explains a single param and returns its name, doc, and optional default value and user-supplied value in a string.
explainParams(): Returns the documentation of all params with their optional default values and user-supplied values.
extractParamMap([extra]): Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.
fit(dataset[, params]): Fits a model to the input dataset with optional parameters.
fitMultiple(dataset, paramMaps): Fits a model to the input dataset for each param map in paramMaps.
getAlpha(): Gets the value of alpha or its default value.
getBlockSize(): Gets the value of blockSize or its default value.
getCheckpointInterval(): Gets the value of checkpointInterval or its default value.
getColdStartStrategy(): Gets the value of coldStartStrategy or its default value.
getFinalStorageLevel(): Gets the value of finalStorageLevel or its default value.
getImplicitPrefs(): Gets the value of implicitPrefs or its default value.
getIntermediateStorageLevel(): Gets the value of intermediateStorageLevel or its default value.
getItemCol(): Gets the value of itemCol or its default value.
getMaxIter(): Gets the value of maxIter or its default value.
getNonnegative(): Gets the value of nonnegative or its default value.
getNumItemBlocks(): Gets the value of numItemBlocks or its default value.
getNumUserBlocks(): Gets the value of numUserBlocks or its default value.
getOrDefault(param): Gets the value of a param in the user-supplied param map or its default value.
getParam(paramName): Gets a param by its name.
getPredictionCol(): Gets the value of predictionCol or its default value.
getRank(): Gets the value of rank or its default value.
getRatingCol(): Gets the value of ratingCol or its default value.
getRegParam(): Gets the value of regParam or its default value.
getSeed(): Gets the value of seed or its default value.
getUserCol(): Gets the value of userCol or its default value.
hasDefault(param): Checks whether a param has a default value.
hasParam(paramName): Tests whether this instance contains a param with a given (string) name.
isDefined(param): Checks whether a param is explicitly set by user or has a default value.
isSet(param): Checks whether a param is explicitly set by user.
load(path): Reads an ML instance from the input path, a shortcut of read().load(path).
read(): Returns an MLReader instance for this class.
save(path): Save this ML instance to the given path, a shortcut of ‘write().save(path)’.
set(param, value): Sets a parameter in the embedded param map.
setAlpha(value): Sets the value of alpha.
setBlockSize(value): Sets the value of blockSize.
setCheckpointInterval(value): Sets the value of checkpointInterval.
setColdStartStrategy(value): Sets the value of coldStartStrategy.
setFinalStorageLevel(value): Sets the value of finalStorageLevel.
setImplicitPrefs(value): Sets the value of implicitPrefs.
setIntermediateStorageLevel(value): Sets the value of intermediateStorageLevel.
setItemCol(value): Sets the value of itemCol.
setMaxIter(value): Sets the value of maxIter.
setNonnegative(value): Sets the value of nonnegative.
setNumBlocks(value): Sets both numUserBlocks and numItemBlocks to the specific value.
setNumItemBlocks(value): Sets the value of numItemBlocks.
setNumUserBlocks(value): Sets the value of numUserBlocks.
setParams(self, *[, rank, maxIter, …]): Sets params for ALS.
setPredictionCol(value): Sets the value of predictionCol.
setRank(value): Sets the value of rank.
setRatingCol(value): Sets the value of ratingCol.
setRegParam(value): Sets the value of regParam.
setSeed(value): Sets the value of seed.
setUserCol(value): Sets the value of userCol.
write(): Returns an MLWriter instance for this ML instance.
Attributes
params: Returns all params ordered by name.
Methods Documentation
- clear(param: pyspark.ml.param.Param) → None
  Clears a param from the param map if it has been explicitly set.
- copy(extra: Optional[ParamMap] = None) → JP
  Creates a copy of this instance with the same uid and some extra params. This implementation first calls Params.copy and then makes a copy of the companion Java pipeline component with extra params, so both the Python wrapper and the Java pipeline component get copied.
  - Parameters
    - extra: dict, optional
      Extra parameters to copy to the new instance
  - Returns
    - JavaParams
      Copy of this instance
- explainParam(param: Union[str, pyspark.ml.param.Param]) → str
  Explains a single param and returns its name, doc, and optional default value and user-supplied value in a string.
- explainParams() → str
  Returns the documentation of all params with their optional default values and user-supplied values.
- extractParamMap(extra: Optional[ParamMap] = None) → ParamMap
  Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.
  - Parameters
    - extra: dict, optional
      extra param values
  - Returns
    - dict
      merged param map
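The precedence rule for extractParamMap (default param values < user-supplied values < extra) behaves like successive dictionary updates. The sketch below models it with plain dicts keyed by param name; `merge_param_maps` is a hypothetical helper for illustration, not part of PySpark.

```python
def merge_param_maps(defaults, user_supplied, extra=None):
    """Merge three param maps; later sources win on conflicts."""
    merged = dict(defaults)        # lowest precedence
    merged.update(user_supplied)   # overrides defaults
    merged.update(extra or {})     # highest precedence
    return merged


merged = merge_param_maps(
    defaults={"rank": 10, "maxIter": 10},
    user_supplied={"maxIter": 5},
    extra={"maxIter": 20, "regParam": 0.5},
)
```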
- fit(dataset: pyspark.sql.dataframe.DataFrame, params: Union[ParamMap, List[ParamMap], Tuple[ParamMap], None] = None) → Union[M, List[M]]
  Fits a model to the input dataset with optional parameters.
  - Parameters
    - dataset: pyspark.sql.DataFrame
      input dataset.
    - params: dict or list or tuple, optional
      an optional param map that overrides embedded params. If a list/tuple of param maps is given, this calls fit on each param map and returns a list of models.
  - Returns
    - Transformer or a list of Transformer
      fitted model(s)
- fitMultiple(dataset: pyspark.sql.dataframe.DataFrame, paramMaps: Sequence[ParamMap]) → Iterator[Tuple[int, M]]
  Fits a model to the input dataset for each param map in paramMaps.
  - Parameters
    - dataset: pyspark.sql.DataFrame
      input dataset.
    - paramMaps: collections.abc.Sequence
      A Sequence of param maps.
  - Returns
    - _FitMultipleIterator
      A thread safe iterable which contains one model for each param map. Each call to next(modelIterator) will return (index, model) where model was fit using paramMaps[index]. index values may not be sequential.
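Because the index values returned by fitMultiple may arrive out of order, a caller typically places each model by its index rather than appending. A minimal sketch, assuming only that the iterator yields (index, model) pairs as described above; `collect_in_order` is a hypothetical helper and the strings stand in for fitted models.

```python
def collect_in_order(model_iterator, num_param_maps):
    """Place each (index, model) pair at its paramMaps position."""
    models = [None] * num_param_maps
    for index, model in model_iterator:
        models[index] = model  # index refers to paramMaps[index]
    return models


# Stand-in for a fitMultiple result that yields indices out of order.
fake_iterator = iter([(2, "model_c"), (0, "model_a"), (1, "model_b")])
ordered_models = collect_in_order(fake_iterator, 3)
```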
- getAlpha() → float
  Gets the value of alpha or its default value.
- getBlockSize() → int
  Gets the value of blockSize or its default value.
- getCheckpointInterval() → int
  Gets the value of checkpointInterval or its default value.
- getColdStartStrategy() → str
  Gets the value of coldStartStrategy or its default value.
- getFinalStorageLevel() → str
  Gets the value of finalStorageLevel or its default value.
- getImplicitPrefs() → bool
  Gets the value of implicitPrefs or its default value.
- getIntermediateStorageLevel() → str
  Gets the value of intermediateStorageLevel or its default value.
- getItemCol() → str
  Gets the value of itemCol or its default value.
- getMaxIter() → int
  Gets the value of maxIter or its default value.
- getNonnegative() → bool
  Gets the value of nonnegative or its default value.
- getNumItemBlocks() → int
  Gets the value of numItemBlocks or its default value.
- getNumUserBlocks() → int
  Gets the value of numUserBlocks or its default value.
- getOrDefault(param: Union[str, pyspark.ml.param.Param[T]]) → Union[Any, T]
  Gets the value of a param in the user-supplied param map or its default value. Raises an error if neither is set.
- getParam(paramName: str) → pyspark.ml.param.Param
  Gets a param by its name.
- getPredictionCol() → str
  Gets the value of predictionCol or its default value.
- getRank() → int
  Gets the value of rank or its default value.
- getRatingCol() → str
  Gets the value of ratingCol or its default value.
- getRegParam() → float
  Gets the value of regParam or its default value.
- getSeed() → int
  Gets the value of seed or its default value.
- getUserCol() → str
  Gets the value of userCol or its default value.
- hasDefault(param: Union[str, pyspark.ml.param.Param[Any]]) → bool
  Checks whether a param has a default value.
- hasParam(paramName: str) → bool
  Tests whether this instance contains a param with a given (string) name.
- isDefined(param: Union[str, pyspark.ml.param.Param[Any]]) → bool
  Checks whether a param is explicitly set by user or has a default value.
- isSet(param: Union[str, pyspark.ml.param.Param[Any]]) → bool
  Checks whether a param is explicitly set by user.
- classmethod load(path: str) → RL
  Reads an ML instance from the input path, a shortcut of read().load(path).
- classmethod read() → pyspark.ml.util.JavaMLReader[RL]
  Returns an MLReader instance for this class.
- save(path: str) → None
  Save this ML instance to the given path, a shortcut of ‘write().save(path)’.
- set(param: pyspark.ml.param.Param, value: Any) → None
  Sets a parameter in the embedded param map.
- setAlpha(value: float) → pyspark.ml.recommendation.ALS
  Sets the value of alpha.
- setBlockSize(value: int) → pyspark.ml.recommendation.ALS
  Sets the value of blockSize.
- setCheckpointInterval(value: int) → pyspark.ml.recommendation.ALS
  Sets the value of checkpointInterval.
- setColdStartStrategy(value: str) → pyspark.ml.recommendation.ALS
  Sets the value of coldStartStrategy.
- setFinalStorageLevel(value: str) → pyspark.ml.recommendation.ALS
  Sets the value of finalStorageLevel.
- setImplicitPrefs(value: bool) → pyspark.ml.recommendation.ALS
  Sets the value of implicitPrefs.
- setIntermediateStorageLevel(value: str) → pyspark.ml.recommendation.ALS
  Sets the value of intermediateStorageLevel.
- setItemCol(value: str) → pyspark.ml.recommendation.ALS
  Sets the value of itemCol.
- setMaxIter(value: int) → pyspark.ml.recommendation.ALS
  Sets the value of maxIter.
- setNonnegative(value: bool) → pyspark.ml.recommendation.ALS
  Sets the value of nonnegative.
- setNumBlocks(value: int) → pyspark.ml.recommendation.ALS
  Sets both numUserBlocks and numItemBlocks to the specific value.
- setNumItemBlocks(value: int) → pyspark.ml.recommendation.ALS
  Sets the value of numItemBlocks.
- setNumUserBlocks(value: int) → pyspark.ml.recommendation.ALS
  Sets the value of numUserBlocks.
- setParams(self, *, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, ratingCol="rating", nonnegative=False, checkpointInterval=10, intermediateStorageLevel="MEMORY_AND_DISK", finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
  Sets params for ALS.
- setPredictionCol(value: str) → pyspark.ml.recommendation.ALS
  Sets the value of predictionCol.
- setRank(value: int) → pyspark.ml.recommendation.ALS
  Sets the value of rank.
- setRatingCol(value: str) → pyspark.ml.recommendation.ALS
  Sets the value of ratingCol.
- setRegParam(value: float) → pyspark.ml.recommendation.ALS
  Sets the value of regParam.
- setSeed(value: int) → pyspark.ml.recommendation.ALS
  Sets the value of seed.
- setUserCol(value: str) → pyspark.ml.recommendation.ALS
  Sets the value of userCol.
- write() → pyspark.ml.util.JavaMLWriter
  Returns an MLWriter instance for this ML instance.
Attributes Documentation
- alpha = Param(parent='undefined', name='alpha', doc='alpha for implicit preference')
- blockSize = Param(parent='undefined', name='blockSize', doc='block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.')
- checkpointInterval = Param(parent='undefined', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.')
- coldStartStrategy = Param(parent='undefined', name='coldStartStrategy', doc="strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'.")
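The two coldStartStrategy values can be pictured with a plain-Python model of prediction: 'nan' keeps rows for unseen users or items with a NaN prediction, while 'drop' removes those rows. The function `predict_all` and the dict-based factor tables are hypothetical stand-ins for the fitted model, not PySpark calls.

```python
import math


def predict_all(pairs, user_factors, item_factors, cold_start_strategy="nan"):
    """Score (user, item) pairs; unseen ids get NaN or are dropped."""
    rows = []
    for user, item in pairs:
        if user in user_factors and item in item_factors:
            # Prediction is the dot product of the two factor vectors.
            score = sum(a * b for a, b in zip(user_factors[user], item_factors[item]))
        else:
            score = float("nan")  # id not seen during training
        rows.append((user, item, score))
    if cold_start_strategy == "drop":
        rows = [row for row in rows if not math.isnan(row[2])]
    return rows


users = {0: [1.0, 2.0]}
items = {5: [0.5, 0.25]}
with_nan = predict_all([(0, 5), (9, 5)], users, items, "nan")
dropped = predict_all([(0, 5), (9, 5)], users, items, "drop")
```

Dropping such rows matters when an evaluation metric is computed over the predictions, since a single NaN would make the metric NaN.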
- finalStorageLevel = Param(parent='undefined', name='finalStorageLevel', doc='StorageLevel for ALS model factors.')
- implicitPrefs = Param(parent='undefined', name='implicitPrefs', doc='whether to use implicit preference')
- intermediateStorageLevel = Param(parent='undefined', name='intermediateStorageLevel', doc="StorageLevel for intermediate datasets. Cannot be 'NONE'.")
- itemCol = Param(parent='undefined', name='itemCol', doc='column name for item ids. Ids must be within the integer value range.')
- maxIter = Param(parent='undefined', name='maxIter', doc='max number of iterations (>= 0).')
- nonnegative = Param(parent='undefined', name='nonnegative', doc='whether to use nonnegative constraint for least squares')
- numItemBlocks = Param(parent='undefined', name='numItemBlocks', doc='number of item blocks')
- numUserBlocks = Param(parent='undefined', name='numUserBlocks', doc='number of user blocks')
- params
  Returns all params ordered by name. The default implementation uses dir() to get all attributes of type Param.
- predictionCol = Param(parent='undefined', name='predictionCol', doc='prediction column name.')
- rank = Param(parent='undefined', name='rank', doc='rank of the factorization')
- ratingCol = Param(parent='undefined', name='ratingCol', doc='column name for ratings')
- regParam = Param(parent='undefined', name='regParam', doc='regularization parameter (>= 0).')
- seed = Param(parent='undefined', name='seed', doc='random seed.')
- userCol = Param(parent='undefined', name='userCol', doc='column name for user ids. Ids must be within the integer value range.')