Skip to content

Commit

Permalink
use new protocol in mllib/stat
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Sep 13, 2014
1 parent b30ef35 commit 52d1350
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,28 +356,26 @@ class PythonMLLibAPI extends Serializable {
* Java stub for mllib Statistics.colStats(X: RDD[Vector]).
* TODO figure out return type.
*/
def colStats(X: JavaRDD[Array[Byte]]): MultivariateStatisticalSummarySerialized = {
val cStats = Statistics.colStats(X.rdd.map(SerDe.deserializeDoubleVector(_)))
new MultivariateStatisticalSummarySerialized(cStats)
def colStats(rdd: JavaRDD[Any]): MultivariateStatisticalSummary = {
Statistics.colStats(rdd.rdd.map(_.asInstanceOf[Vector]))
}

/**
* Java stub for mllib Statistics.corr(X: RDD[Vector], method: String).
* Returns the correlation matrix serialized into a byte array understood by deserializers in
* pyspark.
*/
def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
val inputMatrix = X.rdd.map(SerDe.deserializeDoubleVector(_))
val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
SerDe.serializeDoubleMatrix(SerDe.to2dArray(result))
def corr(X: JavaRDD[Any], method: String): Matrix = {
val inputMatrix = X.rdd.map(_.asInstanceOf[Vector])
Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
}

/**
* Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String).
*/
def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = {
val xDeser = x.rdd.map(SerDe.deserializeDouble(_))
val yDeser = y.rdd.map(SerDe.deserializeDouble(_))
def corr(x: JavaRDD[Any], y: JavaRDD[Any], method: String): Double = {
val xDeser = x.rdd.map(_.asInstanceOf[Double])
val yDeser = y.rdd.map(_.asInstanceOf[Double])
Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method))
}

Expand Down Expand Up @@ -483,27 +481,6 @@ class PythonMLLibAPI extends Serializable {

}

/**
* :: DeveloperApi ::
* MultivariateStatisticalSummary with Vector fields serialized.
*/
@DeveloperApi
class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
extends Serializable {

def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)

def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)

def count: Long = summary.count

def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros)

def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)

def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
}

/**
* SerDe utility functions for PythonMLLibAPI.
*/
Expand Down Expand Up @@ -628,11 +605,11 @@ private[spark] object SerDe extends Serializable {
Unpickler.registerConstructor("pyspark.mllib.recommendation", "Rating", new RatingConstructor)
}

private[python] def dumps(obj: AnyRef): Array[Byte] = {
def dumps(obj: AnyRef): Array[Byte] = {
new Pickler().dumps(obj)
}

private[python] def loads(bytes: Array[Byte]): AnyRef = {
def loads(bytes: Array[Byte]): AnyRef = {
new Unpickler().loads(bytes)
}

Expand Down
181 changes: 143 additions & 38 deletions python/pyspark/mllib/linalg.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,76 @@
SciPy is available in their environment.
"""

import numpy
from numpy import array, array_equal, ndarray, float64, int32

import array
import numpy as np

__all__ = ['SparseVector', 'Vectors']


class SparseVector(object):
def _convert_to_vector(l):
    """
    Coerce a supported vector-like value into a Vector instance.

    :param l: a Vector (returned unchanged), an array.array, a NumPy
              ndarray or a list (each wrapped in a DenseVector), or, when
              SciPy is available, a scipy.sparse matrix with exactly one
              column (converted to a SparseVector).
    :return: a Vector subclass instance.
    :raises TypeError: if `l` is none of the supported types.
    """
    # Imported lazily, as in the original code, so the SciPy probing helpers
    # are only resolved when a conversion is actually requested.
    from pyspark.mllib._common import _have_scipy, _scipy_issparse

    if isinstance(l, Vector):
        return l
    # np.array is a factory function, not a type, so comparing type(l)
    # against it can never succeed; the concrete type of a NumPy array is
    # np.ndarray.
    elif type(l) in (array.array, np.ndarray, list):
        return DenseVector(l)
    elif _have_scipy and _scipy_issparse(l):
        assert l.shape[1] == 1, "Expected column vector"
        # CSC layout exposes the row indices and values of the single column.
        csc = l.tocsc()
        return SparseVector(l.shape[0], csc.indices, csc.data)
    else:
        raise TypeError("Expected array, NumPy array, list, SparseVector, or scipy.sparse matrix")


class Vector(object):
    """
    Marker base class for the vector types defined in this module.

    It declares no behaviour of its own; it exists so that callers can use
    isinstance(x, Vector) to recognise a vector.
    """


class DenseVector(Vector):
    """
    A dense vector whose elements are stored in an array.array of doubles.

    Attributes that DenseVector does not define itself are forwarded to the
    underlying array.array (see __getattr__).
    """

    def __init__(self, ar):
        # Always copy into a typed double array, whatever sequence is given.
        self.array = array.array('d', ar)

    def __reduce__(self):
        # Pickle as "call DenseVector(array)"; no separate instance state.
        return DenseVector, (self.array,)

    def dot(self, other):
        """
        Dot product with another vector-like object of the same length.

        :raises ValueError: if the two operands differ in length.
        """
        if len(self) != len(other):
            raise ValueError("two Vectors should have same length")
        if isinstance(other, SparseVector):
            # SparseVector.dot knows how to handle a DenseVector operand.
            return other.dot(self)
        # TODO improve it using numpy
        return sum(v * other[i] for i, v in enumerate(self.array))

    def squared_distance(self, other):
        """
        Squared Euclidean distance to another vector-like object of the
        same length.

        :raises ValueError: if the two operands differ in length.
        """
        if len(self) != len(other):
            raise ValueError("two Vectors should have same length")
        if isinstance(other, SparseVector):
            return other.squared_distance(self)
        return sum((v - other[i]) ** 2 for i, v in enumerate(self.array))

    def toArray(self):
        """
        Returns a copy of this vector as a 1-dimensional NumPy array.
        """
        return np.array(self.array)

    def __getitem__(self, item):
        return self.array[item]

    def __len__(self):
        return len(self.array)

    def __str__(self):
        return str(self.array)

    def __repr__(self):
        return "DenseVector(%r)" % self.array

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails.
        #
        # Two kinds of names must NOT be forwarded to the backing array:
        #  * 'array' itself -- if it is missing (instance created without
        #    __init__), forwarding would re-enter __getattr__ forever.
        #  * special (dunder) names -- protocols probed on the instance,
        #    such as copy.deepcopy looking up __deepcopy__, would otherwise
        #    pick up array.array's implementation and hand back a bare
        #    array.array instead of a DenseVector.
        if item == 'array' or (item.startswith('__') and item.endswith('__')):
            raise AttributeError(item)
        return getattr(self.array, item)


class SparseVector(Vector):

"""
A simple sparse vector class for passing data to MLlib. Users may
Expand Down Expand Up @@ -61,43 +123,53 @@ def __init__(self, size, *args):
if type(pairs) == dict:
pairs = pairs.items()
pairs = sorted(pairs)
self.indices = array([p[0] for p in pairs], dtype=int32)
self.values = array([p[1] for p in pairs], dtype=float64)
self.indices = array.array('i', [p[0] for p in pairs])
self.values = array.array('d', [p[1] for p in pairs])
else:
assert len(args[0]) == len(args[1]), "index and value arrays not same length"
self.indices = array(args[0], dtype=int32)
self.values = array(args[1], dtype=float64)
self.indices = array.array('i', args[0])
self.values = array.array('d', args[1])
for i in xrange(len(self.indices) - 1):
if self.indices[i] >= self.indices[i + 1]:
raise TypeError("indices array must be sorted")

def __reduce__(self):
return (SparseVector, (self.size, self.indices, self.values))

def dot(self, other):
"""
Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.dot(a)
25.0
>>> a.dot(array([1., 2., 3., 4.]))
>>> a.dot(array.array('d', [1., 2., 3., 4.]))
22.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.dot(b)
0.0
>>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
>>> a.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
array([ 22., 22.])
"""
if type(other) == ndarray:
if type(other) == np.ndarray:
if other.ndim == 1:
result = 0.0
for i in xrange(len(self.indices)):
result += self.values[i] * other[self.indices[i]]
return result
elif other.ndim == 2:
results = [self.dot(other[:, i]) for i in xrange(other.shape[1])]
return array(results)
return np.array(results)
else:
raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
else:

elif type(other) in (array.array, DenseVector):
result = 0.0
for i in xrange(len(self.indices)):
result += self.values[i] * other[self.indices[i]]
return result

elif type(other) is SparseVector:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
Expand All @@ -110,6 +182,8 @@ def dot(self, other):
else:
j += 1
return result
else:
raise TypeError("unexpected type: %s" % type(other))

def squared_distance(self, other):
"""
Expand All @@ -118,30 +192,32 @@ def squared_distance(self, other):
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.squared_distance(a)
0.0
>>> a.squared_distance(array([1., 2., 3., 4.]))
>>> a.squared_distance(array.array('d', [1., 2., 3., 4.]))
11.0
>>> a.squared_distance(np.array([1., 2., 3., 4.]))
11.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.squared_distance(b)
30.0
>>> b.squared_distance(a)
30.0
"""
if type(other) == ndarray:
if other.ndim == 1:
result = 0.0
j = 0 # index into our own array
for i in xrange(other.shape[0]):
if j < len(self.indices) and self.indices[j] == i:
diff = self.values[j] - other[i]
result += diff * diff
j += 1
else:
result += other[i] * other[i]
return result
else:
if type(other) in (list, array.array, DenseVector, np.array, np.ndarray):
if type(other) is np.array and other.ndim != 1:
raise Exception("Cannot call squared_distance with %d-dimensional array" %
other.ndim)
else:
result = 0.0
j = 0 # index into our own array
for i in xrange(len(other)):
if j < len(self.indices) and self.indices[j] == i:
diff = self.values[j] - other[i]
result += diff * diff
j += 1
else:
result += other[i] * other[i]
return result

elif type(other) is SparseVector:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
Expand All @@ -163,16 +239,21 @@ def squared_distance(self, other):
result += other.values[j] * other.values[j]
j += 1
return result
else:
raise TypeError("unexpected type: %s" % type(other))

def toArray(self):
"""
Returns a copy of this SparseVector as a 1-dimensional NumPy array.
"""
arr = numpy.zeros(self.size)
arr = np.zeros((self.size,), dtype=np.float64)
for i in xrange(self.indices.size):
arr[self.indices[i]] = self.values[i]
return arr

def __len__(self):
return self.size

def __str__(self):
inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
vals = "[" + ",".join([str(v) for v in self.values]) + "]"
Expand All @@ -198,8 +279,8 @@ def __eq__(self, other):

return (isinstance(other, self.__class__)
and other.size == self.size
and array_equal(other.indices, self.indices)
and array_equal(other.values, self.values))
and other.indices == self.indices
and other.values == self.values)

def __ne__(self, other):
return not self.__eq__(other)
Expand Down Expand Up @@ -242,9 +323,9 @@ def dense(elements):
returns a NumPy array.
>>> Vectors.dense([1, 2, 3])
array([ 1., 2., 3.])
DenseVector(array('d', [1.0, 2.0, 3.0]))
"""
return array(elements, dtype=float64)
return DenseVector(elements)

@staticmethod
def stringify(vector):
Expand All @@ -255,12 +336,36 @@ def stringify(vector):
>>> Vectors.stringify(Vectors.sparse(2, [1], [1.0]))
'(2,[1],[1.0])'
>>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
'[0.0,1.0]'
"array('d', [0.0, 1.0])"
"""
if type(vector) == SparseVector:
return str(vector)
else:
return "[" + ",".join([str(v) for v in vector]) + "]"
return str(vector)


class Matrix(object):
    """
    Marker base class for the matrix types defined in this module.

    It declares no behaviour of its own.
    """


class DenseMatrix(Matrix):
    """
    A dense matrix held as its dimensions plus a flat sequence of values.
    """

    def __init__(self, nRow, nCol, values):
        # The flat sequence must contain exactly one value per cell.
        assert len(values) == nRow * nCol
        self.nRow = nRow
        self.nCol = nCol
        self.values = values

    def __reduce__(self):
        # Pickle as "call DenseMatrix(nRow, nCol, values)".
        return DenseMatrix, (self.nRow, self.nCol, self.values)

    def toArray(self):
        """
        Return a numpy.ndarray of shape (nRow, nCol) and dtype float64.

        The result is a fresh copy, so modifying it does not affect
        `self.values`.

        >>> arr = array.array('d', [float(i) for i in range(4)])
        >>> m = DenseMatrix(2, 2, arr)
        >>> m.toArray()
        array([[ 0.,  1.],
               [ 2.,  3.]])
        """
        # Convert element-wise rather than reinterpreting the raw bytes of
        # `values`: this works for any numeric sequence (list, array.array,
        # ndarray), not only for array.array('d'), and does not rely on
        # array.array.tostring().
        # NOTE(review): values are laid out row by row here, as in the
        # example above; confirm this matches the layout used by the JVM
        # side that produces `values`.
        return np.array(self.values, dtype=np.float64).reshape((self.nRow, self.nCol))


def _test():
Expand Down
Loading

0 comments on commit 52d1350

Please sign in to comment.