18 """
19 Python package for statistical functions in MLlib.
20 """
21
22 from pyspark.mllib._common import \
23 _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \
24 _serialize_double, _serialize_double_vector, \
25 _deserialize_double, _deserialize_double_matrix, _deserialize_double_vector
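
# These helpers bridge Python and the JVM: the _get_unmangled_* functions
# serialize RDD contents into the byte-array form expected by the JVM-side
# PythonMLLibAPI, and the _deserialize_* functions convert the returned byte
# arrays back into NumPy values. See pyspark.mllib._common for the details.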


class MultivariateStatisticalSummary(object):

    """
    Trait for multivariate statistical summary of a data matrix.
    """

    def __init__(self, sc, java_summary):
        """
        :param sc: Spark context
        :param java_summary: Handle to Java summary object
        """
        self._sc = sc
        self._java_summary = java_summary

    def __del__(self):
        # Drop the Py4J reference to the JVM summary object when this
        # wrapper is garbage-collected.
        self._sc._gateway.detach(self._java_summary)

    def mean(self):
        return _deserialize_double_vector(self._java_summary.mean())

    def variance(self):
        return _deserialize_double_vector(self._java_summary.variance())

    def count(self):
        return self._java_summary.count()

    def numNonzeros(self):
        return _deserialize_double_vector(self._java_summary.numNonzeros())

    def max(self):
        return _deserialize_double_vector(self._java_summary.max())

    def min(self):
        return _deserialize_double_vector(self._java_summary.min())
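
    # Each summary above is computed on the JVM side and deserialized into a
    # NumPy array; count() is the exception and returns the row count as a
    # plain long.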


class Statistics(object):

    @staticmethod
    def colStats(X):
        """
        Computes column-wise summary statistics for the input RDD[Vector].

        >>> from pyspark.mllib.linalg import Vectors
        >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
        ...                       Vectors.dense([4, 5, 0, 3]),
        ...                       Vectors.dense([6, 7, 0, 8])])
        >>> cStats = Statistics.colStats(rdd)
        >>> cStats.mean()
        array([ 4.,  4.,  0.,  3.])
        >>> cStats.variance()
        array([  4.,  13.,   0.,  25.])
        >>> cStats.count()
        3L
        >>> cStats.numNonzeros()
        array([ 3.,  2.,  0.,  3.])
        >>> cStats.max()
        array([ 6.,  7.,  0.,  8.])
        >>> cStats.min()
        array([ 2.,  0.,  0., -2.])
        """
        sc = X.ctx
        Xser = _get_unmangled_double_vector_rdd(X)
        cStats = sc._jvm.PythonMLLibAPI().colStats(Xser._jrdd)
        return MultivariateStatisticalSummary(sc, cStats)

    @staticmethod
    def corr(x, y=None, method=None):
96 """
97 Compute the correlation (matrix) for the input RDD(s) using the
98 specified method.
99 Methods currently supported: I{pearson (default), spearman}.
100
101 If a single RDD of Vectors is passed in, a correlation matrix
102 comparing the columns in the input RDD is returned. Use C{method=}
103 to specify the method to be used for single RDD inout.
104 If two RDDs of floats are passed in, a single float is returned.
105
106 >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
107 >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
108 >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
109 >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
110 True
111 >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
112 True
113 >>> Statistics.corr(x, y, "spearman")
114 0.5
115 >>> from math import isnan
116 >>> isnan(Statistics.corr(x, zeros))
117 True
118 >>> from linalg import Vectors
119 >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
120 ... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
121 >>> pearsonCorr = Statistics.corr(rdd)
122 >>> print str(pearsonCorr).replace('nan', 'NaN')
123 [[ 1. 0.05564149 NaN 0.40047142]
124 [ 0.05564149 1. NaN 0.91359586]
125 [ NaN NaN 1. NaN]
126 [ 0.40047142 0.91359586 NaN 1. ]]
127 >>> spearmanCorr = Statistics.corr(rdd, method="spearman")
128 >>> print str(spearmanCorr).replace('nan', 'NaN')
129 [[ 1. 0.10540926 NaN 0.4 ]
130 [ 0.10540926 1. NaN 0.9486833 ]
131 [ NaN NaN 1. NaN]
132 [ 0.4 0.9486833 NaN 1. ]]
133 >>> try:
134 ... Statistics.corr(rdd, "spearman")
135 ... print "Method name as second argument without 'method=' shouldn't be allowed."
136 ... except TypeError:
137 ... pass
138 """
        sc = x.ctx
        # A string in the second positional slot is almost certainly a
        # misplaced method name, so reject it with a pointer to 'method='.
        if type(y) == str:
            raise TypeError("Use 'method=' to specify method name.")
        if not y:
            try:
                Xser = _get_unmangled_double_vector_rdd(x)
            except TypeError:
                raise TypeError("corr called on a single RDD not consisting of Vectors.")
            resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
            return _deserialize_double_matrix(resultMat)
        else:
            xSer = _get_unmangled_rdd(x, _serialize_double)
            ySer = _get_unmangled_rdd(y, _serialize_double)
            result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
            return result
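
# A minimal usage sketch of both entry points (illustrative only, assuming an
# already-running SparkContext named `sc`; the doctests above are the
# executable examples):
#
#     from pyspark.mllib.linalg import Vectors
#     rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
#                           Vectors.dense([4, 5, 0, 3]),
#                           Vectors.dense([6, 7, 0, 8])])
#     summary = Statistics.colStats(rdd)       # column-wise summary object
#     summary.mean()                           # NumPy array of column means
#     Statistics.corr(rdd)                     # Pearson correlation matrix
#     x = sc.parallelize([1.0, 0.0, -2.0])
#     y = sc.parallelize([4.0, 5.0, 3.0])
#     Statistics.corr(x, y, method="pearson")  # single float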


def _test():
    import doctest
    from pyspark import SparkContext
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()