#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    _linear_predictor_typecheck, _have_scipy, _scipy_issparse
from pyspark.mllib.linalg import SparseVector, Vectors


class LabeledPoint(object):

    """
    The features and labels of a data point.

    @param label: Label for this data point.
    @param features: Vector of features for this point (NumPy array, list,
        pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
    """

    def __init__(self, label, features):
        self.label = label
        if (type(features) == ndarray or type(features) == SparseVector
                or (_have_scipy and _scipy_issparse(features))):
            self.features = features
        elif type(features) == list:
            self.features = array(features)
        else:
            raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
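
# A minimal construction sketch (hypothetical values): features may be given
# as a plain Python list (converted to a NumPy array here), a NumPy array, a
# SparseVector, or a scipy.sparse column matrix.
#
#   dense = LabeledPoint(1.0, [0.0, 2.5])
#   sparse = LabeledPoint(0.0, SparseVector(2, {1: 2.5}))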


class LinearModel(object):

    """A linear model that has a vector of coefficients and an intercept."""

    def __init__(self, weights, intercept):
        self._coeff = weights
        self._intercept = intercept

    @property
    def weights(self):
        return self._coeff

    @property
    def intercept(self):
        return self._intercept
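
# Sketch (hypothetical values): a fitted model exposes its parameters only
# through the read-only properties above.
#
#   model = LinearModel(array([1.0, 2.0]), 0.5)
#   model.weights    # array([1.0, 2.0])
#   model.intercept  # 0.5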


class LinearRegressionModelBase(LinearModel):

    """A linear regression model.

    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
    True
    >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
    True
    """

    def predict(self, x):
        """Predict the value of the dependent variable given a vector x
        containing values for the independent variables."""
        _linear_predictor_typecheck(x, self._coeff)
        return _dot(x, self._coeff) + self._intercept
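
# The prediction rule is dot(w, x) + intercept. In the doctest above, with
# w = [1.0, 2.0] and intercept 0.1:
#
#   1.0 * -1.03 + 2.0 * 7.777 + 0.1 = 14.624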


class LinearRegressionModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class LinearRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=1.0, regType=None, intercept=False):
        """
        Train a linear regression model on the given data.

        @param data: The training data.
        @param iterations: The number of iterations (default: 100).
        @param step: The step parameter used in SGD (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
            iteration (default: 1.0).
        @param initialWeights: The initial weights (default: None).
        @param regParam: The regularizer parameter (default: 1.0).
        @param regType: The type of regularizer used for training our model.
            Allowed values: "l1" for using L1Updater, "l2" for using
            SquaredL2Updater, or "none" for no regularizer (default: "none").
        @param intercept: Boolean parameter which indicates whether to use the
            augmented representation for training data (i.e. whether bias
            features are activated or not).
        """
        sc = data.context
        if regType is None:
            regType = "none"
        # Delegate the actual optimization to the JVM-side MLlib implementation.
        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
            d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
        return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
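
# Usage sketch (assumes an active SparkContext `sc` and an RDD of
# LabeledPoint, as in the doctests above); regType and regParam select and
# weight the penalty:
#
#   model = LinearRegressionWithSGD.train(sc.parallelize(data),
#                                         iterations=200, step=0.1,
#                                         regType="l2", regParam=0.01)
#   model.predict(array([2.0]))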


class LassoModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit with an
    l_1 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class LassoWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a Lasso regression model on the given data."""
        sc = data.context
        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)


class RidgeRegressionModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit with an
    l_2 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class RidgeRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a ridge regression model on the given data."""
        sc = data.context
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
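
# Sketch (hypothetical `points` RDD of LabeledPoint): the three trainers are
# drop-in alternatives that differ only in the penalty applied to the weights.
#
#   lin = LinearRegressionWithSGD.train(points)                 # no penalty by default
#   lasso = LassoWithSGD.train(points, regParam=0.1)            # L1 penalty
#   ridge = RidgeRegressionWithSGD.train(points, regParam=0.1)  # L2 penalty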


def _test():
    import doctest
    globs = globals().copy()
    # Run the doctests with a local SparkContext available as `sc`.
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()