import numpy

from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log
34 """A linear binary classification model derived from logistic regression.
35
36 >>> data = [
37 ... LabeledPoint(0.0, [0.0]),
38 ... LabeledPoint(1.0, [1.0]),
39 ... LabeledPoint(1.0, [2.0]),
40 ... LabeledPoint(1.0, [3.0])
41 ... ]
42 >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
43 >>> lrm.predict(array([1.0])) > 0
44 True
45 >>> lrm.predict(array([0.0])) <= 0
46 True
47 >>> sparse_data = [
48 ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
49 ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
50 ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
51 ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
52 ... ]
53 >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
54 >>> lrm.predict(array([0.0, 1.0])) > 0
55 True
56 >>> lrm.predict(array([0.0, 0.0])) <= 0
57 True
58 >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
59 True
60 >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
61 True
62 """
    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        # Probability of class 1 is the sigmoid of the linear margin.
        margin = _dot(x, self._coeff) + self._intercept
        prob = 1 / (1 + exp(-margin))
        return 1 if prob > 0.5 else 0

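
# A minimal pure-Python sketch of the decision rule used by
# LogisticRegressionModel.predict above, written out for dense inputs only.
# The names `weights`, `intercept`, and `x` are hypothetical; this helper is
# illustration, not part of the MLlib API.
def _example_logistic_predict(weights, intercept, x):
    margin = sum(w * xi for w, xi in zip(weights, x)) + intercept
    prob = 1 / (1 + exp(-margin))  # sigmoid maps the margin into (0, 1)
    return 1 if prob > 0.5 else 0  # threshold the probability at 0.5
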

class LogisticRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None):
73 """Train a logistic regression model on the given data."""
74 sc = data.context
75 train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
76 d._jrdd, iterations, step, miniBatchFraction, i)
77 return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
78 initialWeights)
79
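# Usage sketch (hypothetical parameter values; assumes an active SparkContext
# `sc`, as in the doctests above):
#
#   points = sc.parallelize([LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0])])
#   model = LogisticRegressionWithSGD.train(points, iterations=200, step=0.1,
#                                           miniBatchFraction=0.5)
#   model.predict(array([0.8]))
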
82 """A support vector machine.
83
84 >>> data = [
85 ... LabeledPoint(0.0, [0.0]),
86 ... LabeledPoint(1.0, [1.0]),
87 ... LabeledPoint(1.0, [2.0]),
88 ... LabeledPoint(1.0, [3.0])
89 ... ]
90 >>> svm = SVMWithSGD.train(sc.parallelize(data))
91 >>> svm.predict(array([1.0])) > 0
92 True
93 >>> sparse_data = [
94 ... LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
95 ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
96 ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
97 ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
98 ... ]
99 >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
100 >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
101 True
102 >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0
103 True
104 """
    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        # Classify by which side of the separating hyperplane x falls on.
        margin = _dot(x, self._coeff) + self._intercept
        return 1 if margin >= 0 else 0

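
# A minimal pure-Python sketch of the decision rule used by SVMModel.predict
# above, for dense inputs only. The names `weights`, `intercept`, and `x` are
# hypothetical; this helper is illustration, not part of the MLlib API.
def _example_svm_predict(weights, intercept, x):
    margin = sum(w * xi for w, xi in zip(weights, x)) + intercept
    return 1 if margin >= 0 else 0  # positive side of the hyperplane -> class 1
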

class SVMWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
115 """Train a support vector machine on the given data."""
116 sc = data.context
117 train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
118 d._jrdd, iterations, step, regParam, miniBatchFraction, i)
119 return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)
120
123 """
124 Model for Naive Bayes classifiers.
125
126 Contains two parameters:
127 - pi: vector of logs of class priors (dimension C)
128 - theta: matrix of logs of class conditional probabilities (CxD)
129
130 >>> data = [
131 ... LabeledPoint(0.0, [0.0, 0.0]),
132 ... LabeledPoint(0.0, [0.0, 1.0]),
133 ... LabeledPoint(1.0, [1.0, 0.0]),
134 ... ]
135 >>> model = NaiveBayes.train(sc.parallelize(data))
136 >>> model.predict(array([0.0, 1.0]))
137 0.0
138 >>> model.predict(array([1.0, 0.0]))
139 1.0
140 >>> sparse_data = [
141 ... LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
142 ... LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
143 ... LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
144 ... ]
145 >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
146 >>> model.predict(SparseVector(2, {1: 1.0}))
147 0.0
148 >>> model.predict(SparseVector(2, {0: 1.0}))
149 1.0
150 """

    def __init__(self, labels, pi, theta):
        self.labels = labels
        self.pi = pi
        self.theta = theta

    def predict(self, x):
        """Return the most likely class for a data vector x."""
        # Pick the class maximizing the log posterior: log prior + log likelihood.
        return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]

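
# A minimal numpy sketch of the scoring rule in NaiveBayesModel.predict above:
# the predicted class maximizes log P(class) + sum_j x_j * log P(feature_j | class).
# The values of `pi`, `theta`, and `x` below are hypothetical examples, not
# MLlib output:
#
#   pi = numpy.log(numpy.array([0.5, 0.5]))        # log class priors
#   theta = numpy.log(numpy.array([[0.9, 0.1],     # log P(feature | class 0)
#                                  [0.1, 0.9]]))   # log P(feature | class 1)
#   x = numpy.array([0.0, 1.0])
#   numpy.argmax(pi + x.dot(theta.transpose()))    # -> 1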

class NaiveBayes(object):

    @classmethod
    def train(cls, data, lambda_=1.0):
165 """
166 Train a Naive Bayes model given an RDD of (label, features) vectors.
167
168 This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can
169 handle all kinds of discrete data. For example, by converting
170 documents into TF-IDF vectors, it can be used for document
171 classification. By making every vector a 0-1 vector, it can also be
172 used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
173
174 @param data: RDD of NumPy vectors, one per element, where the first
175 coordinate is the label and the rest is the feature vector
176 (e.g. a count vector).
177 @param lambda_: The smoothing parameter
178 """
179 sc = data.context
180 dataBytes = _get_unmangled_labeled_point_rdd(data)
181 ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
182 return NaiveBayesModel(
183 _deserialize_double_vector(ans[0]),
184 _deserialize_double_vector(ans[1]),
185 _deserialize_double_matrix(ans[2]))
186
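# Sketch of the Bernoulli-NB preprocessing described in NaiveBayes.train's
# docstring: clamp each count vector to {0, 1} before training. The helper
# name and the RDD `counts` in the usage note are hypothetical; this is
# illustration, not part of the MLlib API.
def _example_binarize(labeled_point):
    binary = [1.0 if v > 0 else 0.0 for v in labeled_point.features]
    return LabeledPoint(labeled_point.label, binary)

# Usage (assumes an RDD `counts` of LabeledPoint with count features):
#
#   binarized = counts.map(_example_binarize)
#   model = NaiveBayes.train(binarized)
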

def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()