18 """
19 PySpark supports custom serializers for transferring data; this can improve
20 performance.
21
22 By default, PySpark uses L{PickleSerializer} to serialize objects using Python's
23 C{cPickle} serializer, which can serialize nearly any Python object.
24 Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be
25 faster.
26
27 The serializer is chosen when creating L{SparkContext}:
28
29 >>> from pyspark.context import SparkContext
30 >>> from pyspark.serializers import MarshalSerializer
31 >>> sc = SparkContext('local', 'test', serializer=MarshalSerializer())
32 >>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
33 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
34 >>> sc.stop()
35
36 By default, PySpark serialize objects in batches; the batch size can be
37 controlled through SparkContext's C{batchSize} parameter
38 (the default size is 1024 objects):
39
40 >>> sc = SparkContext('local', 'test', batchSize=2)
41 >>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
42
43 Behind the scenes, this creates a JavaRDD with four partitions, each of
44 which contains two batches of two objects:
45
46 >>> rdd.glom().collect()
47 [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
48 >>> rdd._jrdd.count()
49 8L
50 >>> sc.stop()
51
52 A batch size of -1 uses an unlimited batch size, and a size of 1 disables
53 batching:
54
55 >>> sc = SparkContext('local', 'test', batchSize=1)
56 >>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
57 >>> rdd.glom().collect()
58 [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
59 >>> rdd._jrdd.count()
60 16L
61 """

import cPickle
from itertools import chain, izip, product
import marshal
import struct
import sys
from pyspark import cloudpickle


__all__ = ["PickleSerializer", "MarshalSerializer"]


class SpecialLengths(object):
    # Negative "length" values written in place of a real payload length to
    # signal control events in the framed worker protocol.
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3


class Serializer(object):

    def dump_stream(self, iterator, stream):
        """
        Serialize an iterator of objects to the output stream.
        """
        raise NotImplementedError

    def load_stream(self, stream):
        """
        Return an iterator of deserialized objects from the input stream.
        """
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        return self.load_stream(stream)

    # Two serializers compare equal when data written by one can be read back
    # by the other; this default covers stateless serializers, and subclasses
    # with configuration should override __eq__ accordingly.

    def __eq__(self, other):
        return isinstance(other, self.__class__)

    def __ne__(self, other):
        return not self.__eq__(other)


class FramedSerializer(Serializer):
    """
    Serializer that writes objects as a stream of (length, data) pairs,
    where C{length} is a 32-bit integer and data is C{length} bytes.
    """

    def __init__(self):
        # On Python <= 2.6, bytearrays cannot be written to streams directly,
        # so serialized data is converted to a string first.
        self._only_write_strings = sys.version_info[0:2] <= (2, 6)

    def dump_stream(self, iterator, stream):
        for obj in iterator:
            self._write_with_length(obj, stream)

    def load_stream(self, stream):
        while True:
            try:
                yield self._read_with_length(stream)
            except EOFError:
                return

    def _write_with_length(self, obj, stream):
        serialized = self.dumps(obj)
        write_int(len(serialized), stream)
        if self._only_write_strings:
            stream.write(str(serialized))
        else:
            stream.write(serialized)

    def _read_with_length(self, stream):
        length = read_int(stream)
        obj = stream.read(length)
        if obj == "":
            raise EOFError
        return self.loads(obj)

    def dumps(self, obj):
        """
        Serialize an object into a byte array.
        When batching is used, this will be called with an array of objects.
        """
        raise NotImplementedError

    def loads(self, obj):
        """
        Deserialize an object from a byte array.
        """
        raise NotImplementedError


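# Illustrative sketch (not from the original module): the framed layout the
# docstring above describes is just a 4-byte big-endian length followed by
# that many bytes of serialized data, e.g. for a pickled payload:
#
#     >>> import struct, cPickle
#     >>> payload = cPickle.dumps((1, "a"), 2)
#     >>> frame = struct.pack("!i", len(payload)) + payload
#     >>> struct.unpack("!i", frame[:4])[0] == len(payload)
#     True

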
class BatchedSerializer(Serializer):
    """
    Serializes a stream of objects in batches by calling its wrapped
    Serializer with streams of objects.
    """

    UNLIMITED_BATCH_SIZE = -1

    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
        self.serializer = serializer
        self.batchSize = batchSize

    def _batched(self, iterator):
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
        else:
            items = []
            count = 0
            for item in iterator:
                items.append(item)
                count += 1
                if count == self.batchSize:
                    yield items
                    items = []
                    count = 0
            if items:
                yield items

    def dump_stream(self, iterator, stream):
        self.serializer.dump_stream(self._batched(iterator), stream)

    def load_stream(self, stream):
        return chain.from_iterable(self._load_stream_without_unbatching(stream))

    def _load_stream_without_unbatching(self, stream):
        return self.serializer.load_stream(stream)

    def __eq__(self, other):
        return isinstance(other, BatchedSerializer) and \
            other.serializer == self.serializer

    def __str__(self):
        return "BatchedSerializer<%s>" % str(self.serializer)


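# Illustrative sketch (not from the original module): a BatchedSerializer
# wrapping a PickleSerializer writes items in groups, and load_stream
# flattens the groups back into a plain iterator:
#
#     >>> from StringIO import StringIO
#     >>> buf = StringIO()
#     >>> ser = BatchedSerializer(PickleSerializer(), 2)
#     >>> ser.dump_stream(iter(range(5)), buf)
#     >>> buf.seek(0)
#     >>> list(ser.load_stream(buf))
#     [0, 1, 2, 3, 4]

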
class CartesianDeserializer(FramedSerializer):
    """
    Deserializes the JavaRDD cartesian() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def prepare_keys_values(self, stream):
        key_stream = self.key_ser._load_stream_without_unbatching(stream)
        val_stream = self.val_ser._load_stream_without_unbatching(stream)
        key_is_batched = isinstance(self.key_ser, BatchedSerializer)
        val_is_batched = isinstance(self.val_ser, BatchedSerializer)
        for (keys, vals) in izip(key_stream, val_stream):
            keys = keys if key_is_batched else [keys]
            vals = vals if val_is_batched else [vals]
            yield (keys, vals)

    def load_stream(self, stream):
        for (keys, vals) in self.prepare_keys_values(stream):
            for pair in product(keys, vals):
                yield pair

    def __eq__(self, other):
        return isinstance(other, CartesianDeserializer) and \
            self.key_ser == other.key_ser and self.val_ser == other.val_ser

    def __str__(self):
        return "CartesianDeserializer<%s, %s>" % \
            (str(self.key_ser), str(self.val_ser))


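# Illustrative sketch (not from the original module): for each pair of batches
# pulled off the key and value streams, load_stream above emits the cartesian
# product; unbatched sides are first wrapped in single-element lists so the
# same product logic applies:
#
#     >>> from itertools import product
#     >>> list(product([1, 2], ["a", "b"]))
#     [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]

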
class PairDeserializer(CartesianDeserializer):
    """
    Deserializes the JavaRDD zip() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def load_stream(self, stream):
        for (keys, vals) in self.prepare_keys_values(stream):
            for pair in izip(keys, vals):
                yield pair

    def __eq__(self, other):
        return isinstance(other, PairDeserializer) and \
            self.key_ser == other.key_ser and self.val_ser == other.val_ser

    def __str__(self):
        return "PairDeserializer<%s, %s>" % \
            (str(self.key_ser), str(self.val_ser))


class NoOpSerializer(FramedSerializer):

    def loads(self, obj): return obj
    def dumps(self, obj): return obj


class PickleSerializer(FramedSerializer):
    """
    Serializes objects using Python's cPickle serializer:

        http://docs.python.org/2/library/pickle.html

    This serializer supports nearly any Python object, but may
    not be as fast as more specialized serializers.
    """

    def dumps(self, obj): return cPickle.dumps(obj, 2)
    loads = cPickle.loads

class CloudPickleSerializer(PickleSerializer):

    def dumps(self, obj): return cloudpickle.dumps(obj, 2)


class MarshalSerializer(FramedSerializer):
    """
    Serializes objects using Python's Marshal serializer:

        http://docs.python.org/2/library/marshal.html

    This serializer is faster than PickleSerializer but supports fewer datatypes.
    """

    dumps = marshal.dumps
    loads = marshal.loads


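# Illustrative sketch (not from the original module): the two serializers are
# interchangeable as long as the values are types marshal supports
# (e.g. ints, strings, lists, dicts):
#
#     >>> MarshalSerializer().loads(MarshalSerializer().dumps([1, "a", {2: 3}]))
#     [1, 'a', {2: 3}]
#     >>> PickleSerializer().loads(PickleSerializer().dumps([1, "a", {2: 3}]))
#     [1, 'a', {2: 3}]

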
class UTF8Deserializer(Serializer):
    """
    Deserializes streams written by String.getBytes.
    """

    def loads(self, stream):
        length = read_int(stream)
        return stream.read(length).decode('utf8')

    def load_stream(self, stream):
        while True:
            try:
                yield self.loads(stream)
            except struct.error:
                return
            except EOFError:
                return


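# Illustrative sketch (not from the original module): each record read by the
# class above is a 4-byte big-endian length followed by that many UTF-8 bytes:
#
#     >>> import struct
#     >>> from StringIO import StringIO
#     >>> data = u"caf\xe9".encode("utf8")
#     >>> buf = StringIO(struct.pack("!i", len(data)) + data)
#     >>> list(UTF8Deserializer().load_stream(buf))
#     [u'caf\xe9']

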
def read_long(stream):
    length = stream.read(8)
    if length == "":
        raise EOFError
    return struct.unpack("!q", length)[0]


def write_long(value, stream):
    stream.write(struct.pack("!q", value))


def pack_long(value):
    return struct.pack("!q", value)


def read_int(stream):
    length = stream.read(4)
    if length == "":
        raise EOFError
    return struct.unpack("!i", length)[0]


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def write_with_length(obj, stream):
    write_int(len(obj), stream)
    stream.write(obj)
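
# Illustrative sketch (not from the original module): the read_*/write_*
# helpers are symmetric, so a value written with write_long can be read back
# with read_long from the same stream position:
#
#     >>> from StringIO import StringIO
#     >>> buf = StringIO()
#     >>> write_long(2 ** 40, buf)
#     >>> buf.seek(0)
#     >>> read_long(buf) == 2 ** 40
#     True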