1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 >>> from pyspark.context import SparkContext
20 >>> sc = SparkContext('local', 'test')
21 >>> a = sc.accumulator(1)
22 >>> a.value
23 1
24 >>> a.value = 2
25 >>> a.value
26 2
27 >>> a += 5
28 >>> a.value
29 7
30
31 >>> sc.accumulator(1.0).value
32 1.0
33
34 >>> sc.accumulator(1j).value
35 1j
36
37 >>> rdd = sc.parallelize([1,2,3])
38 >>> def f(x):
39 ... global a
40 ... a += x
41 >>> rdd.foreach(f)
42 >>> a.value
43 13
44
45 >>> b = sc.accumulator(0)
46 >>> def g(x):
47 ... b.add(x)
48 >>> rdd.foreach(g)
49 >>> b.value
50 6
51
52 >>> from pyspark.accumulators import AccumulatorParam
53 >>> class VectorAccumulatorParam(AccumulatorParam):
54 ... def zero(self, value):
55 ... return [0.0] * len(value)
56 ... def addInPlace(self, val1, val2):
57 ... for i in xrange(len(val1)):
58 ... val1[i] += val2[i]
59 ... return val1
60 >>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
61 >>> va.value
62 [1.0, 2.0, 3.0]
63 >>> def g(x):
64 ... global va
65 ... va += [x] * 3
66 >>> rdd.foreach(g)
67 >>> va.value
68 [7.0, 8.0, 9.0]
69
70 >>> rdd.map(lambda x: a.value).collect() # doctest: +IGNORE_EXCEPTION_DETAIL
71 Traceback (most recent call last):
72 ...
73 Py4JJavaError:...
74
75 >>> def h(x):
76 ... global a
77 ... a.value = 7
78 >>> rdd.foreach(h) # doctest: +IGNORE_EXCEPTION_DETAIL
79 Traceback (most recent call last):
80 ...
81 Py4JJavaError:...
82
83 >>> sc.accumulator([1.0, 2.0, 3.0]) # doctest: +IGNORE_EXCEPTION_DETAIL
84 Traceback (most recent call last):
85 ...
86 Exception:...
87 """
88
89 import struct
90 import SocketServer
91 import threading
92 from pyspark.cloudpickle import CloudPickler
93 from pyspark.serializers import read_int, PickleSerializer
94
95
# Serializer used by this module to pickle accumulator updates.
pickleSer = PickleSerializer()

# Maps accumulator id -> Accumulator instance on the driver side, so that
# updates arriving from workers can be applied to the right accumulator.
_accumulatorRegistry = {}
109
112 """
113 A shared variable that can be accumulated, i.e., has a commutative and associative "add"
114 operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
115 operator, but only the driver program is allowed to access its value, using C{value}.
116 Updates from the workers get propagated automatically to the driver program.
117
118 While C{SparkContext} supports accumulators for primitive data types like C{int} and
119 C{float}, users can also define accumulators for custom types by providing a custom
120 L{AccumulatorParam} object. Refer to the doctest of this module for an example.
121 """
122
123 - def __init__(self, aid, value, accum_param):
131
133 """Custom serialization; saves the zero value from our AccumulatorParam"""
134 param = self.accum_param
135 return (_deserialize_accumulator, (self.aid, param.zero(self._value), param))
136
137 @property
139 """Get the accumulator's value; only usable in driver program"""
140 if self._deserialized:
141 raise Exception("Accumulator.value cannot be accessed inside tasks")
142 return self._value
143
144 @value.setter
146 """Sets the accumulator's value; only usable in driver program"""
147 if self._deserialized:
148 raise Exception("Accumulator.value cannot be accessed inside tasks")
149 self._value = value
150
151 - def add(self, term):
152 """Adds a term to this accumulator's value"""
153 self._value = self.accum_param.addInPlace(self._value, term)
154
156 """The += operator; adds a term to this accumulator's value"""
157 self.add(term)
158 return self
159
161 return str(self._value)
162
164 return "Accumulator<id=%i, value=%s>" % (self.aid, self._value)
165
168 """
169 Helper object that defines how to accumulate values of a given type.
170 """
171
172 - def zero(self, value):
173 """
174 Provide a "zero value" for the type, compatible in dimensions with the
175 provided C{value} (e.g., a zero vector)
176 """
177 raise NotImplementedError
178
180 """
181 Add two values of the accumulator's data type, returning a new value;
182 for efficiency, can also update C{value1} in place and return it.
183 """
184 raise NotImplementedError
185
188 """
189 An AccumulatorParam that uses the + operators to add values. Designed for simple types
190 such as integers, floats, and lists. Requires the zero value for the underlying type
191 as a parameter.
192 """
193
195 self.zero_value = zero_value
196
197 - def zero(self, value):
198 return self.zero_value
199
201 value1 += value2
202 return value1
203
204
205
# Ready-made accumulator params for the primitive value types supported out
# of the box; each one sums with += starting from the matching zero.
INT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0)
FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
220
def _start_update_server():
    """Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
    # Port 0 lets the OS choose a free port; the caller reads the chosen
    # address off the returned server object.
    server = SocketServer.TCPServer(("localhost", 0), _UpdateRequestHandler)
    # Serve on a daemon thread so the server never blocks interpreter exit.
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()
    return server
229