RowMatrix

class pyspark.mllib.linalg.distributed.RowMatrix(rows: Union[pyspark.rdd.RDD[pyspark.mllib.linalg.Vector], pyspark.sql.dataframe.DataFrame], numRows: int = 0, numCols: int = 0)

Represents a row-oriented distributed Matrix with no meaningful row indices.

Parameters
rows : pyspark.RDD or pyspark.sql.DataFrame

An RDD or DataFrame of vectors. If a DataFrame is provided, it must have a single vector typed column.

numRows : int, optional

Number of rows in the matrix. A non-positive value means unknown, in which case the number of rows is determined by the number of records in rows.

numCols : int, optional

Number of columns in the matrix. A non-positive value means unknown, at which point the number of columns will be determined by the size of the first row.
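As a rough illustration of these fallback rules, here is a plain-Python sketch. The helper resolve_dims is hypothetical (not part of PySpark) and works on a local list of rows standing in for the RDD.

```python
def resolve_dims(rows, num_rows=0, num_cols=0):
    """Hypothetical helper mirroring the documented fallback rules.

    rows is a local list of equal-length sequences standing in for the RDD.
    A non-positive num_rows or num_cols is treated as "unknown".
    """
    # Unknown row count: fall back to the number of records.
    if num_rows <= 0:
        num_rows = len(rows)
    # Unknown column count: fall back to the size of the first row.
    if num_cols <= 0:
        num_cols = len(rows[0])
    return num_rows, num_cols


rows = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(resolve_dims(rows))        # (4, 3)
print(resolve_dims(rows, 7, 6))  # (7, 6)
```

Explicit positive values win over the inferred ones, which is the same behavior the numRows() and numCols() examples show further down.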

Methods

columnSimilarities([threshold])

Compute similarities between columns of this matrix.

computeColumnSummaryStatistics()

Computes column-wise summary statistics.

computeCovariance()

Computes the covariance matrix, treating each row as an observation.

computeGramianMatrix()

Computes the Gramian matrix A^T A.

computePrincipalComponents(k)

Computes the k principal components of the given row matrix.

computeSVD(k[, computeU, rCond])

Computes the singular value decomposition of the RowMatrix.

multiply(matrix)

Multiply this matrix by a local dense matrix on the right.

numCols()

Get or compute the number of columns.

numRows()

Get or compute the number of rows.

tallSkinnyQR([computeQ])

Compute the QR decomposition of this RowMatrix.

Attributes

rows

Rows of the RowMatrix stored as an RDD of vectors.

Methods Documentation

columnSimilarities(threshold: float = 0.0) → pyspark.mllib.linalg.distributed.CoordinateMatrix

Compute similarities between columns of this matrix.

The threshold parameter is a trade-off knob between estimate quality and computational cost.

The default threshold setting of 0 guarantees deterministically correct results, but uses the brute-force approach of computing normalized dot products.
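The brute-force computation can be sketched locally: each similarity is the dot product of two columns divided by the product of their Euclidean norms. The helper column_cosine_similarities is hypothetical and runs on a local list of rows rather than an RDD. It returns only the upper triangle (i < j), mirroring the upper-triangular result described under Returns, and it simply skips all-zero columns, which is a choice made for this sketch rather than documented Spark behavior.

```python
import math


def column_cosine_similarities(rows):
    """Brute-force cosine similarity between every pair of columns.

    rows is a local list of equal-length numeric rows. Returns a dict
    {(i, j): similarity} for i < j only (the upper triangle) and skips
    pairs that involve an all-zero column.
    """
    n = len(rows[0])
    # Column norms: square root of the sum of squares down each column.
    norms = [math.sqrt(sum(r[j] ** 2 for r in rows)) for j in range(n)]
    sims = {}
    for i in range(n):
        for j in range(i + 1, n):
            if norms[i] == 0 or norms[j] == 0:
                continue
            dot = sum(r[i] * r[j] for r in rows)
            sims[(i, j)] = dot / (norms[i] * norms[j])
    return sims


sims = column_cosine_similarities([[1, 2], [1, 5]])
print({pair: round(v, 6) for pair, v in sims.items()})  # {(0, 1): 0.919145}
```

The value agrees with the doctest for columnSimilarities: the columns are (1, 1) and (2, 5), so the similarity is 7 / sqrt(2 * 29).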

Setting the threshold to a positive value uses a sampling approach that incurs strictly less computational cost than the brute-force approach. However, the computed similarities are then estimates.

The sampling guarantees relative-error correctness for those pairs of columns that have similarity greater than the given similarity threshold.

To describe the guarantee, we set some notation:

  • Let A be the smallest in magnitude non-zero element of this matrix.

  • Let B be the largest in magnitude non-zero element of this matrix.

  • Let L be the maximum number of non-zeros per row.

For example, for {0,1} matrices, A = B = 1. For the Netflix matrix, A = 1 and B = 5.
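The three quantities can be computed directly for a small local matrix. The helper sampling_notation is hypothetical and only restates the definitions of A, B and L given above; the second call uses made-up ratings on a 1 to 5 scale, not actual Netflix data.

```python
def sampling_notation(rows):
    """Return (A, B, L) for a local list of rows.

    A: smallest non-zero magnitude, B: largest non-zero magnitude,
    L: maximum number of non-zeros in any single row.
    """
    nonzero = [abs(x) for r in rows for x in r if x != 0]
    a = min(nonzero)
    b = max(nonzero)
    max_nnz = max(sum(1 for x in r if x != 0) for r in rows)
    return a, b, max_nnz


print(sampling_notation([[0, 1, 1], [1, 0, 0]]))  # (1, 1, 2): a {0,1} matrix
print(sampling_notation([[5, 0, 3], [1, 4, 0]]))  # (1, 5, 2): hypothetical ratings
```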

For those column pairs that are above the threshold, the computed similarity is correct to within 20% relative error with probability at least $1 - (0.981)^{10/B}$.

The shuffle size is bounded by the smaller of the following two expressions, where m is the number of rows and n is the number of columns:

  • $O(n \log(n) L / (\text{threshold} \cdot A))$

  • $O(m L^2)$

The latter is the cost of the brute-force approach, so for non-zero thresholds the cost is never greater than that of the brute-force approach.

Parameters
threshold : float, optional

Set to 0 for deterministic guaranteed correctness. Similarities above this threshold are estimated with the cost vs estimate quality trade-off described above.

Returns
CoordinateMatrix

An n x n sparse upper-triangular CoordinateMatrix of cosine similarities between columns of this matrix.

Examples

>>> rows = sc.parallelize([[1, 2], [1, 5]])
>>> mat = RowMatrix(rows)
>>> sims = mat.columnSimilarities()
>>> sims.entries.first().value
0.91914503...
computeColumnSummaryStatistics() → pyspark.mllib.stat._statistics.MultivariateStatisticalSummary

Computes column-wise summary statistics.

Returns
MultivariateStatisticalSummary

Object containing column-wise summary statistics.
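To make these statistics concrete, here is a local sketch that computes several of them per column. The helper column_summary is hypothetical; its keys are named after MultivariateStatisticalSummary methods (count, mean, variance, numNonzeros, max, min, normL1, normL2), and it assumes the unbiased (n - 1) sample variance.

```python
import math


def column_summary(rows):
    """Local sketch of per-column statistics for a list of equal-length rows.

    Variance uses the unbiased (n - 1) denominator, an assumption of this
    sketch about which estimator is meant.
    """
    n = len(rows)
    cols = list(zip(*rows))  # transpose: one tuple per column
    mean = [sum(c) / n for c in cols]
    variance = [
        sum((x - m) ** 2 for x in c) / (n - 1) if n > 1 else 0.0
        for c, m in zip(cols, mean)
    ]
    return {
        "count": n,
        "mean": mean,
        "variance": variance,
        "numNonzeros": [sum(1 for x in c if x != 0) for c in cols],
        "max": [max(c) for c in cols],
        "min": [min(c) for c in cols],
        "normL1": [sum(abs(x) for x in c) for c in cols],
        "normL2": [math.sqrt(sum(x * x for x in c)) for c in cols],
    }


stats = column_summary([[1, 2, 3], [4, 5, 6]])
print(stats["mean"])      # [2.5, 3.5, 4.5]
print(stats["variance"])  # [4.5, 4.5, 4.5]
```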

Examples

>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
>>> mat = RowMatrix(rows)
>>> colStats = mat.computeColumnSummaryStatistics()
>>> colStats.mean()
array([ 2.5,  3.5,  4.5])
computeCovariance() → pyspark.mllib.linalg.Matrix

Computes the covariance matrix, treating each row as an observation.
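A local sketch of the covariance computation: center each column on its mean, sum the outer products of the centered rows, and divide by n - 1. The helper sample_covariance is hypothetical. The n - 1 denominator is assumed because it reproduces the 0.5 entries in the example below (dividing by n would give 0.25).

```python
def sample_covariance(rows):
    """Covariance matrix of the columns, treating each row as one observation."""
    n = len(rows)
    if n < 2:
        raise ValueError("need at least two observations")
    d = len(rows[0])
    mean = [sum(r[j] for r in rows) / n for j in range(d)]
    cov = [[0.0] * d for _ in range(d)]
    for r in rows:
        centered = [r[j] - mean[j] for j in range(d)]
        # Accumulate the outer product of the centered row.
        for i in range(d):
            for j in range(d):
                cov[i][j] += centered[i] * centered[j]
    return [[v / (n - 1) for v in row] for row in cov]


print(sample_covariance([[1, 2], [2, 1]]))  # [[0.5, -0.5], [-0.5, 0.5]]
```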

Notes

This cannot be computed on matrices with more than 65535 columns.
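One plausible explanation of the 65535 limit, assuming the symmetric n x n result is accumulated as a packed upper triangle of n(n + 1)/2 doubles in a single JVM array, whose length is bounded by 2^31 - 1: 65535 is then the largest column count whose packed triangle still fits. The arithmetic can be checked directly; the names below are illustrative only.

```python
# Assumption: Java arrays are indexed by a signed 32-bit int.
JVM_MAX_ARRAY_LENGTH = 2**31 - 1


def packed_upper_triangle_size(n):
    """Entries in the upper triangle (diagonal included) of an n x n matrix."""
    return n * (n + 1) // 2


# Largest n whose packed upper triangle still fits in one array.
largest = max(n for n in range(65000, 70000)
              if packed_upper_triangle_size(n) <= JVM_MAX_ARRAY_LENGTH)
print(largest)                            # 65535
print(packed_upper_triangle_size(65535))  # 2147450880
print(packed_upper_triangle_size(65536))  # 2147516416
```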

Examples

>>> rows = sc.parallelize([[1, 2], [2, 1]])
>>> mat = RowMatrix(rows)
>>> mat.computeCovariance()
DenseMatrix(2, 2, [0.5, -0.5, -0.5, 0.5], 0)
computeGramianMatrix() → pyspark.mllib.linalg.Matrix

Computes the Gramian matrix A^T A.

Notes

This cannot be computed on matrices with more than 65535 columns.
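The Gramian can also be written as a sum over rows: A^T A is the sum of r r^T over every row r. The local sketch below uses a hypothetical helper, gramian. Because each row contributes independently, partial sums over disjoint groups of rows can simply be added, which makes this formulation a natural fit for a distributed aggregate.

```python
def gramian(rows):
    """A^T A computed as the sum of the outer products of the rows."""
    n = len(rows[0])
    g = [[0] * n for _ in range(n)]
    for r in rows:
        # Row r contributes its outer product r r^T.
        for i in range(n):
            for j in range(n):
                g[i][j] += r[i] * r[j]
    return g


print(gramian([[1, 2, 3], [4, 5, 6]]))
# [[17, 22, 27], [22, 29, 36], [27, 36, 45]]
```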

Examples

>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
>>> mat = RowMatrix(rows)
>>> mat.computeGramianMatrix()
DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
computePrincipalComponents(k: int) → pyspark.mllib.linalg.Matrix

Computes the k principal components of the given row matrix.

Parameters
k : int

Number of principal components to keep.

Returns
pyspark.mllib.linalg.DenseMatrix

Notes

This cannot be computed on matrices with more than 65535 columns.
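A local sketch of PCA, assuming NumPy is available. It takes the components to be the eigenvectors of the sample covariance matrix with the largest eigenvalues. The helper principal_components is hypothetical. For the example rows below, its output agrees with the DenseMatrix shown in the example up to the sign of each column, since eigenvector signs are arbitrary.

```python
import numpy as np


def principal_components(rows, k):
    """Top-k principal components as the columns of an (n x k) array."""
    x = np.asarray(rows, dtype=float)
    cov = np.cov(x, rowvar=False)           # columns are variables, rows are observations
    eigvals, eigvecs = np.linalg.eigh(cov)  # eigh returns eigenvalues in ascending order
    order = np.argsort(eigvals)[::-1][:k]   # indices of the k largest eigenvalues
    return eigvecs[:, order]


rows = [[1, 2, 3], [2, 4, 5], [3, 6, 1]]
pcs = principal_components(rows, 2)
print(np.round(pcs, 4))
# Project the (uncentered) rows onto the components, as rm.multiply(pca) does.
print(np.round(np.asarray(rows, dtype=float) @ pcs, 4))
```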

Examples

>>> rows = sc.parallelize([[1, 2, 3], [2, 4, 5], [3, 6, 1]])
>>> rm = RowMatrix(rows)
>>> # Returns the two principal components of rm
>>> pca = rm.computePrincipalComponents(2)
>>> pca
DenseMatrix(3, 2, [-0.349, -0.6981, 0.6252, -0.2796, -0.5592, -0.7805], 0)
>>> # Transform into new dimensions with the greatest variance.
>>> rm.multiply(pca).rows.collect()
[DenseVector([0.1305, -3.7394]), DenseVector([-0.3642, -6.6983]), DenseVector([-4.6102, -4.9745])]
computeSVD(k: int, computeU: bool = False, rCond: float = 1e-09) → pyspark.mllib.linalg.distributed.SingularValueDecomposition[pyspark.mllib.linalg.distributed.RowMatrix, pyspark.mllib.linalg.Matrix]

Computes the singular value decomposition of the RowMatrix.

The given row matrix A of dimension (m x n) is decomposed into $U \operatorname{diag}(s) V^T$, where

  • U: (m x k) (left singular vectors) is a RowMatrix whose columns are the eigenvectors of $A A^T$.

  • s: DenseVector of the singular values, i.e. the square roots of the eigenvalues of $A^T A$, in descending order.

  • V: (n x k) (right singular vectors) is a Matrix whose columns are the eigenvectors of $A^T A$.

For more details on the implementation, refer to the Scala documentation.

Parameters
k : int

Number of leading singular values to keep (0 < k <= n). It may return fewer than k if there are numerically zero singular values, or if not enough Ritz values converged before the maximum number of Arnoldi update iterations was reached (for example, when A is ill-conditioned).

computeU : bool, optional

Whether to compute U. If True, U is computed as $A V \operatorname{diag}(s)^{-1}$.

rCond : float, optional

Reciprocal condition number. All singular values smaller than rCond * s[0] are treated as zero, where s[0] is the largest singular value.
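The relationships above can be checked with a small local sketch, assuming NumPy. The helper svd_via_gramian is hypothetical: it eigendecomposes A^T A with numpy.linalg.eigh (a dense solver, not the Arnoldi iteration mentioned above), takes square roots to get s, drops values below rCond * s[0], and forms U as A V diag(s)^-1.

```python
import numpy as np


def svd_via_gramian(rows, k, r_cond=1e-9):
    """Rank-k SVD sketch built from the eigendecomposition of A^T A."""
    a = np.asarray(rows, dtype=float)
    eigvals, eigvecs = np.linalg.eigh(a.T @ a)
    order = np.argsort(eigvals)[::-1][:k]            # largest eigenvalues first
    s = np.sqrt(np.clip(eigvals[order], 0.0, None))  # clip tiny negative round-off
    # Treat values below r_cond * s[0] as zero, so fewer than k may remain.
    keep = (s > 0) & (s >= r_cond * s[0])
    s, v = s[keep], eigvecs[:, order][:, keep]
    u = a @ v / s                                    # divides column j by s[j]
    return u, s, v


u, s, v = svd_via_gramian([[3, 1, 1], [-1, 3, 1]], 2)
print(np.round(s, 4))  # [3.4641 3.1623]
```

Because it works on A^T A, this sketch loses relative accuracy for singular values much smaller than s[0].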

Returns
SingularValueDecomposition

Examples

>>> rows = sc.parallelize([[3, 1, 1], [-1, 3, 1]])
>>> rm = RowMatrix(rows)
>>> svd_model = rm.computeSVD(2, True)
>>> svd_model.U.rows.collect()
[DenseVector([-0.7071, 0.7071]), DenseVector([-0.7071, -0.7071])]
>>> svd_model.s
DenseVector([3.4641, 3.1623])
>>> svd_model.V
DenseMatrix(3, 2, [-0.4082, -0.8165, -0.4082, 0.8944, -0.4472, ...0.0], 0)
multiply(matrix: pyspark.mllib.linalg.Matrix) → pyspark.mllib.linalg.distributed.RowMatrix

Multiply this matrix by a local dense matrix on the right.

Parameters
matrix : pyspark.mllib.linalg.Matrix

A local dense matrix whose number of rows must match the number of columns of this matrix.
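The right multiplication can be sketched locally, which also shows the column-major layout of DenseMatrix values: DenseMatrix(2, 2, [0, 2, 1, 3]) is the matrix [[0, 1], [2, 3]]. The helper multiply_rows is hypothetical and takes the matrix as (numRows, numCols, values), mirroring the DenseMatrix constructor arguments.

```python
def multiply_rows(rows, num_rows, num_cols, values):
    """Multiply each row vector by a local matrix given in column-major order.

    The entry in row i, column j of the matrix is values[i + j * num_rows].
    """
    if any(len(r) != num_rows for r in rows):
        raise ValueError("row length must equal the matrix's number of rows")
    return [
        [sum(r[i] * values[i + j * num_rows] for i in range(num_rows))
         for j in range(num_cols)]
        for r in rows
    ]


print(multiply_rows([[0, 1], [2, 3]], 2, 2, [0, 2, 1, 3]))  # [[2, 3], [6, 11]]
```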

Returns
RowMatrix

Examples

>>> rm = RowMatrix(sc.parallelize([[0, 1], [2, 3]]))
>>> rm.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.collect()
[DenseVector([2.0, 3.0]), DenseVector([6.0, 11.0])]
numCols() → int

Get or compute the number of columns.

Examples

>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
...                        [7, 8, 9], [10, 11, 12]])
>>> mat = RowMatrix(rows)
>>> print(mat.numCols())
3
>>> mat = RowMatrix(rows, 7, 6)
>>> print(mat.numCols())
6
numRows() → int

Get or compute the number of rows.

Examples

>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
...                        [7, 8, 9], [10, 11, 12]])
>>> mat = RowMatrix(rows)
>>> print(mat.numRows())
4
>>> mat = RowMatrix(rows, 7, 6)
>>> print(mat.numRows())
7
tallSkinnyQR(computeQ: bool = False) → pyspark.mllib.linalg.QRDecomposition[Optional[pyspark.mllib.linalg.distributed.RowMatrix], pyspark.mllib.linalg.Matrix]

Compute the QR decomposition of this RowMatrix.

The implementation is designed to optimize the QR decomposition (factorization) of a RowMatrix with a tall and skinny shape [1].

[1] Paul G. Constantine, David F. Gleich. “Tall and skinny QR factorizations in MapReduce architectures.” https://doi.org/10.1145/1996092.1996103
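The core idea behind the reference above can be sketched with NumPy: factor blocks of rows independently, stack the small R factors, and factor that stack once more. The helper tsqr_r is hypothetical; np.array_split stands in for data partitions, only R is computed, and Spark's actual implementation may differ in its details. Rows of R are determined only up to sign, hence the comparison of absolute values, as in the example below.

```python
import numpy as np


def tsqr_r(rows, num_blocks=2):
    """R factor of a tall-and-skinny matrix via one level of block QR."""
    a = np.asarray(rows, dtype=float)
    blocks = np.array_split(a, num_blocks)  # stand-ins for data partitions
    # Factor each non-empty block independently and keep only its R.
    partial_rs = [np.linalg.qr(b)[1] for b in blocks if len(b)]
    # QR of the stacked small R factors yields R for the whole matrix.
    return np.linalg.qr(np.vstack(partial_rs))[1]


r = tsqr_r([[3, -6], [4, -8], [0, 1]])
print(np.round(np.abs(r), 6))  # approximately [[5, 10], [0, 1]]
```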

Parameters
computeQ : bool, optional

Whether to compute Q.

Returns
pyspark.mllib.linalg.QRDecomposition

QRDecomposition(Q: RowMatrix, R: Matrix), where Q is None if computeQ is False.

Examples

>>> rows = sc.parallelize([[3, -6], [4, -8], [0, 1]])
>>> mat = RowMatrix(rows)
>>> decomp = mat.tallSkinnyQR(True)
>>> Q = decomp.Q
>>> R = decomp.R
>>> # Test with absolute values
>>> absQRows = Q.rows.map(lambda row: abs(row.toArray()).tolist())
>>> absQRows.collect()
[[0.6..., 0.0], [0.8..., 0.0], [0.0, 1.0]]
>>> # Test with absolute values
>>> abs(R.toArray()).tolist()
[[5.0, 10.0], [0.0, 1.0]]

Attributes Documentation

rows

Rows of the RowMatrix stored as an RDD of vectors.

Examples

>>> mat = RowMatrix(sc.parallelize([[1, 2, 3], [4, 5, 6]]))
>>> rows = mat.rows
>>> rows.first()
DenseVector([1.0, 2.0, 3.0])