StreamingKMeansModel¶
-
class
pyspark.mllib.clustering.
StreamingKMeansModel
(clusterCenters: List[VectorLike], clusterWeights: VectorLike)¶ Clustering model which can perform an online update of the centroids.
The update formula for each centroid is given by
c_t+1 = ((c_t * n_t * a) + (x_t * m_t)) / (n_t + m_t)
n_t+1 = n_t * a + m_t
where
c_t: Centroid at the n_th iteration.
n_t: Number of samples (or) weights associated with the centroid at the n_th iteration.
x_t: Centroid of the new data closest to c_t.
m_t: Number of samples (or) weights of the new data closest to c_t
c_t+1: New centroid.
n_t+1: New number of weights.
a: Decay Factor, which gives the forgetfulness.
- Parameters
- clusterCenterslist of
pyspark.mllib.linalg.Vector
or covertible Initial cluster centers.
- clusterWeights
pyspark.mllib.linalg.Vector
or covertible List of weights assigned to each cluster.
- clusterCenterslist of
Notes
If a is set to 1, it is the weighted mean of the previous and new data. If it set to zero, the old centroids are completely forgotten.
Examples
>>> initCenters = [[0.0, 0.0], [1.0, 1.0]] >>> initWeights = [1.0, 1.0] >>> stkm = StreamingKMeansModel(initCenters, initWeights) >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1], ... [0.9, 0.9], [1.1, 1.1]]) >>> stkm = stkm.update(data, 1.0, "batches") >>> stkm.centers array([[ 0., 0.], [ 1., 1.]]) >>> stkm.predict([-0.1, -0.1]) 0 >>> stkm.predict([0.9, 0.9]) 1 >>> stkm.clusterWeights [3.0, 3.0] >>> decayFactor = 0.0 >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])]) >>> stkm = stkm.update(data, 0.0, "batches") >>> stkm.centers array([[ 0.2, 0.2], [ 1.5, 1.5]]) >>> stkm.clusterWeights [1.0, 1.0] >>> stkm.predict([0.2, 0.2]) 0 >>> stkm.predict([1.5, 1.5]) 1
Methods
computeCost
(rdd)Return the K-means cost (sum of squared distances of points to their nearest center) for this model on the given data.
load
(sc, path)Load a model from the given path.
predict
(x)Find the cluster that each of the points belongs to in this model.
save
(sc, path)Save this model to the given path.
update
(data, decayFactor, timeUnit)Update the centroids, according to data
Attributes
Get the cluster centers, represented as a list of NumPy arrays.
Return the cluster weights.
Total number of clusters.
Methods Documentation
-
computeCost
(rdd: pyspark.rdd.RDD[VectorLike]) → float¶ Return the K-means cost (sum of squared distances of points to their nearest center) for this model on the given data.
- Parameters
- rdd:
pyspark.RDD
The RDD of points to compute the cost on.
- rdd:
-
classmethod
load
(sc: pyspark.context.SparkContext, path: str) → pyspark.mllib.clustering.KMeansModel¶ Load a model from the given path.
-
predict
(x: Union[VectorLike, pyspark.rdd.RDD[VectorLike]]) → Union[int, pyspark.rdd.RDD[int]]¶ Find the cluster that each of the points belongs to in this model.
- Parameters
- x
pyspark.mllib.linalg.Vector
orpyspark.RDD
A data point (or RDD of points) to determine cluster index.
pyspark.mllib.linalg.Vector
can be replaced with equivalent objects (list, tuple, numpy.ndarray).
- x
- Returns
- int or
pyspark.RDD
of int Predicted cluster index or an RDD of predicted cluster indices if the input is an RDD.
- int or
-
save
(sc: pyspark.context.SparkContext, path: str) → None¶ Save this model to the given path.
-
update
(data: pyspark.rdd.RDD[VectorLike], decayFactor: float, timeUnit: str) → StreamingKMeansModel¶ Update the centroids, according to data
- Parameters
- data
pyspark.RDD
RDD with new data for the model update.
- decayFactorfloat
Forgetfulness of the previous centroids.
- timeUnitstr
Can be “batches” or “points”. If points, then the decay factor is raised to the power of number of new points and if batches, then decay factor will be used as is.
- data
Attributes Documentation
-
clusterCenters
¶ Get the cluster centers, represented as a list of NumPy arrays.
-
clusterWeights
¶ Return the cluster weights.
-
k
¶ Total number of clusters.