1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from numpy import array, ndarray
19 from pyspark import SparkContext
20 from pyspark.mllib._common import \
21 _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
22 _serialize_double_matrix, _deserialize_double_matrix, \
23 _serialize_double_vector, _deserialize_double_vector, \
24 _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
25 _linear_predictor_typecheck, _have_scipy, _scipy_issparse
26 from pyspark.mllib.linalg import SparseVector
30 """
31 The features and labels of a data point.
32
33 @param label: Label for this data point.
34 @param features: Vector of features for this point (NumPy array, list,
35 pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
36 """
38 self.label = label
39 if (type(features) == ndarray or type(features) == SparseVector
40 or (_have_scipy and _scipy_issparse(features))):
41 self.features = features
42 elif type(features) == list:
43 self.features = array(features)
44 else:
45 raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
46
49 """A linear model that has a vector of coefficients and an intercept."""
53
54 @property
57
58 @property
60 return self._intercept
61
64 """A linear regression model.
65
66 >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
67 >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
68 True
69 >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
70 True
71 """
73 """Predict the value of the dependent variable given a vector x"""
74 """containing values for the independent variables."""
75 _linear_predictor_typecheck(x, self._coeff)
76 return _dot(x, self._coeff) + self._intercept
77
80 """A linear regression model derived from a least-squares fit.
81
82 >>> from pyspark.mllib.regression import LabeledPoint
83 >>> data = [
84 ... LabeledPoint(0.0, [0.0]),
85 ... LabeledPoint(1.0, [1.0]),
86 ... LabeledPoint(3.0, [2.0]),
87 ... LabeledPoint(2.0, [3.0])
88 ... ]
89 >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
90 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
91 True
92 >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
93 True
94 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
95 True
96 >>> data = [
97 ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
98 ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
99 ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
100 ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
101 ... ]
102 >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
103 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
104 True
105 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
106 True
107 """
108
class LinearRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a linear regression model on the given data.

        @param data: RDD of LabeledPoint; its SparkContext drives the JVM call.
        @param iterations: Number of SGD iterations (forwarded to the trainer).
        @param step: SGD step size (forwarded to the trainer).
        @param miniBatchFraction: Fraction of data used per iteration.
        @param initialWeights: Optional initial weight vector.
        """
        sc = data.context
        # The actual optimization runs in the JVM; _regression_train_wrapper
        # handles serialization of data/weights and wraps the result.
        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
            d._jrdd, iterations, step, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
119
122 """A linear regression model derived from a least-squares fit with an
123 l_1 penalty term.
124
125 >>> from pyspark.mllib.regression import LabeledPoint
126 >>> data = [
127 ... LabeledPoint(0.0, [0.0]),
128 ... LabeledPoint(1.0, [1.0]),
129 ... LabeledPoint(3.0, [2.0]),
130 ... LabeledPoint(2.0, [3.0])
131 ... ]
132 >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
133 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
134 True
135 >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
136 True
137 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
138 True
139 >>> data = [
140 ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
141 ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
142 ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
143 ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
144 ... ]
145 >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
146 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
147 True
148 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
149 True
150 """
151
class LassoWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a Lasso regression model on the given data.

        @param data: RDD of LabeledPoint; its SparkContext drives the JVM call.
        @param iterations: Number of SGD iterations (forwarded to the trainer).
        @param step: SGD step size (forwarded to the trainer).
        @param regParam: L1 regularization parameter (forwarded to the trainer).
        @param miniBatchFraction: Fraction of data used per iteration.
        @param initialWeights: Optional initial weight vector.
        """
        sc = data.context
        # Optimization runs in the JVM; _regression_train_wrapper handles
        # serialization and wraps the trained weights into a LassoModel.
        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
162
165 """A linear regression model derived from a least-squares fit with an
166 l_2 penalty term.
167
168 >>> from pyspark.mllib.regression import LabeledPoint
169 >>> data = [
170 ... LabeledPoint(0.0, [0.0]),
171 ... LabeledPoint(1.0, [1.0]),
172 ... LabeledPoint(3.0, [2.0]),
173 ... LabeledPoint(2.0, [3.0])
174 ... ]
175 >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
176 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
177 True
178 >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
179 True
180 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
181 True
182 >>> data = [
183 ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
184 ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
185 ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
186 ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
187 ... ]
188 >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
189 >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
190 True
191 >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
192 True
193 """
194
class RidgeRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a ridge regression model on the given data.

        @param data: RDD of LabeledPoint; its SparkContext drives the JVM call.
        @param iterations: Number of SGD iterations (forwarded to the trainer).
        @param step: SGD step size (forwarded to the trainer).
        @param regParam: L2 regularization parameter (forwarded to the trainer).
        @param miniBatchFraction: Fraction of data used per iteration.
        @param initialWeights: Optional initial weight vector.
        """
        sc = data.context
        # Optimization runs in the JVM; _regression_train_wrapper handles
        # serialization and wraps the result into a RidgeRegressionModel.
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
205
def _test():
    """Run this module's doctests with a local SparkContext in the globals."""
    import doctest
    globs = globals().copy()
    # The doctests reference `sc`; provide a local 4-thread context.
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()
218