ALS

class pyspark.ml.recommendation.ALS(*, rank: int = 10, maxIter: int = 10, regParam: float = 0.1, numUserBlocks: int = 10, numItemBlocks: int = 10, implicitPrefs: bool = False, alpha: float = 1.0, userCol: str = 'user', itemCol: str = 'item', seed: Optional[int] = None, ratingCol: str = 'rating', nonnegative: bool = False, checkpointInterval: int = 10, intermediateStorageLevel: str = 'MEMORY_AND_DISK', finalStorageLevel: str = 'MEMORY_AND_DISK', coldStartStrategy: str = 'nan', blockSize: int = 4096)

Alternating Least Squares (ALS) matrix factorization.

ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, i.e. X * Yt = R. Typically these approximations are called ‘factor’ matrices. The general approach is iterative. During each iteration, one of the factor matrices is held constant, while the other is solved for using least squares. The newly-solved factor matrix is then held constant while solving for the other factor matrix.

This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as “users” and “products”) into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user’s feature vector. This is achieved by pre-computing some information about the ratings matrix to determine the “out-links” of each user (which blocks of products it will contribute to) and “in-link” information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users’ ratings and update the products based on these messages.

For implicit preference data, the algorithm used is based on “Collaborative Filtering for Implicit Feedback Datasets”,, adapted for the blocked approach used here.

Essentially instead of finding the low-rank approximations to the rating matrix R, this finds the approximations for a preference matrix P where the elements of P are 1 if r > 0 and 0 if r <= 0. The ratings then act as ‘confidence’ values related to strength of indicated user preferences rather than explicit ratings given to items.

Notes

The input rating dataframe to the ALS implementation should be deterministic. Nondeterministic data can cause failure during fitting ALS model. For example, an order-sensitive operation like sampling after a repartition makes dataframe output nondeterministic, like df.repartition(2).sample(False, 0.5, 1618). Checkpointing sampled dataframe or adding a sort before sampling can help make the dataframe deterministic.

Examples

>>> df = spark.createDataFrame(
...     [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
...     ["user", "item", "rating"])
>>> als = ALS(rank=10, seed=0)
>>> als.setMaxIter(5)
ALS...
>>> als.getMaxIter()
5
>>> als.setRegParam(0.1)
ALS...
>>> als.getRegParam()
0.1
>>> als.clear(als.regParam)
>>> model = als.fit(df)
>>> model.getBlockSize()
4096
>>> model.getUserCol()
'user'
>>> model.setUserCol("user")
ALSModel...
>>> model.getItemCol()
'item'
>>> model.setPredictionCol("newPrediction")
ALS...
>>> model.rank
10
>>> model.userFactors.orderBy("id").collect()
[Row(id=0, features=[...]), Row(id=1, ...), Row(id=2, ...)]
>>> test = spark.createDataFrame([(0, 2), (1, 0), (2, 0)], ["user", "item"])
>>> predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
>>> predictions[0]
Row(user=0, item=2, newPrediction=0.6929...)
>>> predictions[1]
Row(user=1, item=0, newPrediction=3.47356...)
>>> predictions[2]
Row(user=2, item=0, newPrediction=-0.899198...)
>>> user_recs = model.recommendForAllUsers(3)
>>> user_recs.where(user_recs.user == 0)        .select("recommendations.item", "recommendations.rating").collect()
[Row(item=[0, 1, 2], rating=[3.910..., 1.997..., 0.692...])]
>>> item_recs = model.recommendForAllItems(3)
>>> item_recs.where(item_recs.item == 2)        .select("recommendations.user", "recommendations.rating").collect()
[Row(user=[2, 1, 0], rating=[4.892..., 3.991..., 0.692...])]
>>> user_subset = df.where(df.user == 2)
>>> user_subset_recs = model.recommendForUserSubset(user_subset, 3)
>>> user_subset_recs.select("recommendations.item", "recommendations.rating").first()
Row(item=[2, 1, 0], rating=[4.892..., 1.076..., -0.899...])
>>> item_subset = df.where(df.item == 0)
>>> item_subset_recs = model.recommendForItemSubset(item_subset, 3)
>>> item_subset_recs.select("recommendations.user", "recommendations.rating").first()
Row(user=[0, 1, 2], rating=[3.910..., 3.473..., -0.899...])
>>> als_path = temp_path + "/als"
>>> als.save(als_path)
>>> als2 = ALS.load(als_path)
>>> als.getMaxIter()
5
>>> model_path = temp_path + "/als_model"
>>> model.save(model_path)
>>> model2 = ALSModel.load(model_path)
>>> model.rank == model2.rank
True
>>> sorted(model.userFactors.collect()) == sorted(model2.userFactors.collect())
True
>>> sorted(model.itemFactors.collect()) == sorted(model2.itemFactors.collect())
True
>>> model.transform(test).sort("user").take(1) == model2.transform(test).sort("user").take(1)
True

Methods

clear(param)

Clears a param from the param map if it has been explicitly set.

copy([extra])

Creates a copy of this instance with the same uid and some extra params.

explainParam(param)

Explains a single param and returns its name, doc, and optional default value and user-supplied value in a string.

explainParams()

Returns the documentation of all params with their optionally default values and user-supplied values.

extractParamMap([extra])

Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.

fit(dataset[, params])

Fits a model to the input dataset with optional parameters.

fitMultiple(dataset, paramMaps)

Fits a model to the input dataset for each param map in paramMaps.

getAlpha()

Gets the value of alpha or its default value.

getBlockSize()

Gets the value of blockSize or its default value.

getCheckpointInterval()

Gets the value of checkpointInterval or its default value.

getColdStartStrategy()

Gets the value of coldStartStrategy or its default value.

getFinalStorageLevel()

Gets the value of finalStorageLevel or its default value.

getImplicitPrefs()

Gets the value of implicitPrefs or its default value.

getIntermediateStorageLevel()

Gets the value of intermediateStorageLevel or its default value.

getItemCol()

Gets the value of itemCol or its default value.

getMaxIter()

Gets the value of maxIter or its default value.

getNonnegative()

Gets the value of nonnegative or its default value.

getNumItemBlocks()

Gets the value of numItemBlocks or its default value.

getNumUserBlocks()

Gets the value of numUserBlocks or its default value.

getOrDefault(param)

Gets the value of a param in the user-supplied param map or its default value.

getParam(paramName)

Gets a param by its name.

getPredictionCol()

Gets the value of predictionCol or its default value.

getRank()

Gets the value of rank or its default value.

getRatingCol()

Gets the value of ratingCol or its default value.

getRegParam()

Gets the value of regParam or its default value.

getSeed()

Gets the value of seed or its default value.

getUserCol()

Gets the value of userCol or its default value.

hasDefault(param)

Checks whether a param has a default value.

hasParam(paramName)

Tests whether this instance contains a param with a given (string) name.

isDefined(param)

Checks whether a param is explicitly set by user or has a default value.

isSet(param)

Checks whether a param is explicitly set by user.

load(path)

Reads an ML instance from the input path, a shortcut of read().load(path).

read()

Returns an MLReader instance for this class.

save(path)

Save this ML instance to the given path, a shortcut of ‘write().save(path)’.

set(param, value)

Sets a parameter in the embedded param map.

setAlpha(value)

Sets the value of alpha.

setBlockSize(value)

Sets the value of blockSize.

setCheckpointInterval(value)

Sets the value of checkpointInterval.

setColdStartStrategy(value)

Sets the value of coldStartStrategy.

setFinalStorageLevel(value)

Sets the value of finalStorageLevel.

setImplicitPrefs(value)

Sets the value of implicitPrefs.

setIntermediateStorageLevel(value)

Sets the value of intermediateStorageLevel.

setItemCol(value)

Sets the value of itemCol.

setMaxIter(value)

Sets the value of maxIter.

setNonnegative(value)

Sets the value of nonnegative.

setNumBlocks(value)

Sets both numUserBlocks and numItemBlocks to the specific value.

setNumItemBlocks(value)

Sets the value of numItemBlocks.

setNumUserBlocks(value)

Sets the value of numUserBlocks.

setParams(self, \*[, rank, maxIter, …])

Sets params for ALS.

setPredictionCol(value)

Sets the value of predictionCol.

setRank(value)

Sets the value of rank.

setRatingCol(value)

Sets the value of ratingCol.

setRegParam(value)

Sets the value of regParam.

setSeed(value)

Sets the value of seed.

setUserCol(value)

Sets the value of userCol.

write()

Returns an MLWriter instance for this ML instance.

Attributes

alpha

blockSize

checkpointInterval

coldStartStrategy

finalStorageLevel

implicitPrefs

intermediateStorageLevel

itemCol

maxIter

nonnegative

numItemBlocks

numUserBlocks

params

Returns all params ordered by name.

predictionCol

rank

ratingCol

regParam

seed

userCol

Methods Documentation

clear(param: pyspark.ml.param.Param) → None

Clears a param from the param map if it has been explicitly set.

copy(extra: Optional[ParamMap] = None) → JP

Creates a copy of this instance with the same uid and some extra params. This implementation first calls Params.copy and then make a copy of the companion Java pipeline component with extra params. So both the Python wrapper and the Java pipeline component get copied.

Parameters
extradict, optional

Extra parameters to copy to the new instance

Returns
JavaParams

Copy of this instance

explainParam(param: Union[str, pyspark.ml.param.Param]) → str

Explains a single param and returns its name, doc, and optional default value and user-supplied value in a string.

explainParams() → str

Returns the documentation of all params with their optionally default values and user-supplied values.

extractParamMap(extra: Optional[ParamMap] = None) → ParamMap

Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.

Parameters
extradict, optional

extra param values

Returns
dict

merged param map

fit(dataset: pyspark.sql.dataframe.DataFrame, params: Union[ParamMap, List[ParamMap], Tuple[ParamMap], None] = None) → Union[M, List[M]]

Fits a model to the input dataset with optional parameters.

Parameters
datasetpyspark.sql.DataFrame

input dataset.

paramsdict or list or tuple, optional

an optional param map that overrides embedded params. If a list/tuple of param maps is given, this calls fit on each param map and returns a list of models.

Returns
Transformer or a list of Transformer

fitted model(s)

fitMultiple(dataset: pyspark.sql.dataframe.DataFrame, paramMaps: Sequence[ParamMap]) → Iterator[Tuple[int, M]]

Fits a model to the input dataset for each param map in paramMaps.

Parameters
datasetpyspark.sql.DataFrame

input dataset.

paramMapscollections.abc.Sequence

A Sequence of param maps.

Returns
_FitMultipleIterator

A thread safe iterable which contains one model for each param map. Each call to next(modelIterator) will return (index, model) where model was fit using paramMaps[index]. index values may not be sequential.

getAlpha() → float

Gets the value of alpha or its default value.

getBlockSize() → int

Gets the value of blockSize or its default value.

getCheckpointInterval() → int

Gets the value of checkpointInterval or its default value.

getColdStartStrategy() → str

Gets the value of coldStartStrategy or its default value.

getFinalStorageLevel() → str

Gets the value of finalStorageLevel or its default value.

getImplicitPrefs() → bool

Gets the value of implicitPrefs or its default value.

getIntermediateStorageLevel() → str

Gets the value of intermediateStorageLevel or its default value.

getItemCol() → str

Gets the value of itemCol or its default value.

getMaxIter() → int

Gets the value of maxIter or its default value.

getNonnegative() → bool

Gets the value of nonnegative or its default value.

getNumItemBlocks() → int

Gets the value of numItemBlocks or its default value.

getNumUserBlocks() → int

Gets the value of numUserBlocks or its default value.

getOrDefault(param: Union[str, pyspark.ml.param.Param[T]]) → Union[Any, T]

Gets the value of a param in the user-supplied param map or its default value. Raises an error if neither is set.

getParam(paramName: str)pyspark.ml.param.Param

Gets a param by its name.

getPredictionCol() → str

Gets the value of predictionCol or its default value.

getRank() → int

Gets the value of rank or its default value.

getRatingCol() → str

Gets the value of ratingCol or its default value.

getRegParam() → float

Gets the value of regParam or its default value.

getSeed() → int

Gets the value of seed or its default value.

getUserCol() → str

Gets the value of userCol or its default value.

hasDefault(param: Union[str, pyspark.ml.param.Param[Any]]) → bool

Checks whether a param has a default value.

hasParam(paramName: str) → bool

Tests whether this instance contains a param with a given (string) name.

isDefined(param: Union[str, pyspark.ml.param.Param[Any]]) → bool

Checks whether a param is explicitly set by user or has a default value.

isSet(param: Union[str, pyspark.ml.param.Param[Any]]) → bool

Checks whether a param is explicitly set by user.

classmethod load(path: str) → RL

Reads an ML instance from the input path, a shortcut of read().load(path).

classmethod read() → pyspark.ml.util.JavaMLReader[RL]

Returns an MLReader instance for this class.

save(path: str) → None

Save this ML instance to the given path, a shortcut of ‘write().save(path)’.

set(param: pyspark.ml.param.Param, value: Any) → None

Sets a parameter in the embedded param map.

setAlpha(value: float)pyspark.ml.recommendation.ALS

Sets the value of alpha.

setBlockSize(value: int)pyspark.ml.recommendation.ALS

Sets the value of blockSize.

setCheckpointInterval(value: int)pyspark.ml.recommendation.ALS

Sets the value of checkpointInterval.

setColdStartStrategy(value: str)pyspark.ml.recommendation.ALS

Sets the value of coldStartStrategy.

setFinalStorageLevel(value: str)pyspark.ml.recommendation.ALS

Sets the value of finalStorageLevel.

setImplicitPrefs(value: bool)pyspark.ml.recommendation.ALS

Sets the value of implicitPrefs.

setIntermediateStorageLevel(value: str)pyspark.ml.recommendation.ALS

Sets the value of intermediateStorageLevel.

setItemCol(value: str)pyspark.ml.recommendation.ALS

Sets the value of itemCol.

setMaxIter(value: int)pyspark.ml.recommendation.ALS

Sets the value of maxIter.

setNonnegative(value: bool)pyspark.ml.recommendation.ALS

Sets the value of nonnegative.

setNumBlocks(value: int)pyspark.ml.recommendation.ALS

Sets both numUserBlocks and numItemBlocks to the specific value.

setNumItemBlocks(value: int)pyspark.ml.recommendation.ALS

Sets the value of numItemBlocks.

setNumUserBlocks(value: int)pyspark.ml.recommendation.ALS

Sets the value of numUserBlocks.

setParams(self, \*, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, ratingCol="rating", nonnegative=False, checkpointInterval=10, intermediateStorageLevel="MEMORY_AND_DISK", finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)

Sets params for ALS.

setPredictionCol(value: str)pyspark.ml.recommendation.ALS

Sets the value of predictionCol.

setRank(value: int)pyspark.ml.recommendation.ALS

Sets the value of rank.

setRatingCol(value: str)pyspark.ml.recommendation.ALS

Sets the value of ratingCol.

setRegParam(value: float)pyspark.ml.recommendation.ALS

Sets the value of regParam.

setSeed(value: int)pyspark.ml.recommendation.ALS

Sets the value of seed.

setUserCol(value: str)pyspark.ml.recommendation.ALS

Sets the value of userCol.

write() → pyspark.ml.util.JavaMLWriter

Returns an MLWriter instance for this ML instance.

Attributes Documentation

alpha = Param(parent='undefined', name='alpha', doc='alpha for implicit preference')
blockSize = Param(parent='undefined', name='blockSize', doc='block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.')
checkpointInterval = Param(parent='undefined', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.')
coldStartStrategy = Param(parent='undefined', name='coldStartStrategy', doc="strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'.")
finalStorageLevel = Param(parent='undefined', name='finalStorageLevel', doc='StorageLevel for ALS model factors.')
implicitPrefs = Param(parent='undefined', name='implicitPrefs', doc='whether to use implicit preference')
intermediateStorageLevel = Param(parent='undefined', name='intermediateStorageLevel', doc="StorageLevel for intermediate datasets. Cannot be 'NONE'.")
itemCol = Param(parent='undefined', name='itemCol', doc='column name for item ids. Ids must be within the integer value range.')
maxIter = Param(parent='undefined', name='maxIter', doc='max number of iterations (>= 0).')
nonnegative = Param(parent='undefined', name='nonnegative', doc='whether to use nonnegative constraint for least squares')
numItemBlocks = Param(parent='undefined', name='numItemBlocks', doc='number of item blocks')
numUserBlocks = Param(parent='undefined', name='numUserBlocks', doc='number of user blocks')
params

Returns all params ordered by name. The default implementation uses dir() to get all attributes of type Param.

predictionCol = Param(parent='undefined', name='predictionCol', doc='prediction column name.')
rank = Param(parent='undefined', name='rank', doc='rank of the factorization')
ratingCol = Param(parent='undefined', name='ratingCol', doc='column name for ratings')
regParam = Param(parent='undefined', name='regParam', doc='regularization parameter (>= 0).')
seed = Param(parent='undefined', name='seed', doc='random seed.')
userCol = Param(parent='undefined', name='userCol', doc='column name for user ids. Ids must be within the integer value range.')