from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from collections import namedtuple
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq
from random import Random

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable

from py4j.java_collections import ListConverter, MapConverter

__all__ = ["RDD"]


def _extract_concise_traceback():
    """
    Return the traceback info for a callsite as a namedtuple of
    (function name, file name, line number), or None if no traceback
    is available.
    """
    tb = traceback.extract_stack()
    callsite = namedtuple("Callsite", "function file linenum")
    if len(tb) == 0:
        return None
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return callsite(function=fun, file=file, linenum=line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return callsite(function=sfun, file=ufile, linenum=uline)

_spark_stack_depth = 0


class _JavaStackTrace(object):
    def __init__(self, sc):
        tb = _extract_concise_traceback()
        if tb is not None:
            self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
        else:
            self._traceback = "Error! Could not extract traceback info"
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)


class MaxHeapQ(object):
    """
    An implementation of MaxHeap, bounded to C{maxsize} elements: once the
    heap is full, inserting a new element evicts the largest, so the heap
    retains the smallest elements seen.

    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
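        # q[0] is an unused sentinel so that elements start at index 1 and the
        # children of q[k] sit at q[2 * k] and q[2 * k + 1].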
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        while (k > 1) and (self.q[k / 2] < self.q[k]):
            self._swap(k, k / 2)
            k = k / 2

    def _swap(self, i, j):
        t = self.q[i]
        self.q[i] = self.q[j]
        self.q[j] = t

    def _sink(self, k):
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
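            # compare against the larger of the two children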
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if self.q[k] > self.q[j]:
                break
            self._swap(k, j)
            k = j

    def size(self):
        return len(self.q) - 1

    def insert(self, value):
        if self.size() < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
        if self.q[1] > value:
            self.q[1] = value
            self._sink(1)


class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer
        self._id = jrdd.id()

    def id(self):
        """
        A unique ID for this RDD (within its SparkContext).
        """
        return self._id

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        to a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.

        >>> rdd = sc.parallelize(["b", "a", "c"])
        >>> sorted(rdd.map(lambda x: (x, 1)).collect())
        [('a', 1), ('b', 1), ('c', 1)]
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use mapPartitionsWithIndex instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
                      "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed=None):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on the default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    def takeSample(self, withReplacement, num, seed=None):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """

        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if num < 0:
            raise ValueError

        if initialCount == 0:
            return list()

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()
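
        # If the first sample did not turn out large enough, keep trying to
        # take samples; this should not happen often because we use a large
        # multiplier for the initial size.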
        rand = Random(seed)
        while len(samples) < total:
            samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect()

        sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint))
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
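            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.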
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def intersection(self, other):
        """
        Return the intersection of this RDD and another one. The output will
        not contain any duplicate elements, even if the input RDDs did.

        Note that this method performs a shuffle internally.

        >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
        >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
        >>> rdd1.intersection(rdd2).collect()
        [1, 2, 3]
        """
        return self.map(lambda v: (v, None)) \
                   .cogroup(other.map(lambda v: (v, None))) \
                   .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
                   .keys()

    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self
        else:
            return self.map(lambda x: x, preservesPartitioning=True)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()
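
        # First compute the boundary of each part via sampling: we want to
        # partition the key-space into bins such that each bin receives
        # roughly the same number of (key, value) pairs.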
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)
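
            # We have numPartitions many parts, but one of them has an
            # implicit boundary, so only numPartitions - 1 cut points are
            # needed.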
            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                    .mapPartitions(mapFunc, preservesPartitioning=True)
                    .flatMap(lambda x: x, preservesPartitioning=True))

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
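        # Due to batching, we can't use the Java cartesian method directly;
        # a CartesianDeserializer unpacks the paired batches on the Python side.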
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
        ['1', '2', '', '3']
        """
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
        return self.mapPartitions(func)

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()

    def foreachPartition(self, f):
        """
        Applies a function to each partition of this RDD.

        >>> def f(iterator):
        ...     for x in iterator:
        ...         print x
        ...     yield None
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
        """
        self.mapPartitions(f).collect()

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))

    def _collect_iterator_through_file(self, iterator):
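        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient.  Instead, we dump the data to a
        # temporary file and read it back.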
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)

        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator. Currently reduces partitions locally.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def aggregate(self, zeroValue, seqOp, combOp):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using the given combine functions and a neutral "zero
        value."

        The functions C{seqOp(t1, t2)} and C{combOp(t1, t2)} are allowed to
        modify C{t1} and return it as their result value to avoid object
        allocation; however, they should not modify C{t2}.

        The first function (seqOp) can return a different result type, U, than
        the type of this RDD. Thus, we need one operation for merging a T into
        a U and one operation for merging two U's.

        >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
        >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
        >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
        (10, 4)
        >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
        (0, 0)
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = seqOp(acc, obj)
            yield acc

        return self.mapPartitions(func).fold(zeroValue, combOp)

    def max(self):
        """
        Find the maximum item in this RDD.

        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
        43.0
        """
        return self.reduce(max)

    def min(self):
        """
        Find the minimum item in this RDD.

        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
        1.0
        """
        return self.reduce(min)

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def top(self, num):
        """
        Get the top N elements from an RDD.

        Note: It returns the list sorted in descending order.

        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2)
        [6, 5]
        """
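        # heapq is a min-heap, so heappushpop() evicts the smallest candidate;
        # each partition therefore keeps its largest `num` elements.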
        def topIterator(iterator):
            q = []
            for k in iterator:
                if len(q) < num:
                    heapq.heappush(q, k)
                else:
                    heapq.heappushpop(q, k)
            yield q

        def merge(a, b):
            return next(topIterator(a + b))

        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)

    def takeOrdered(self, num, key=None):
        """
        Get the N elements from an RDD ordered in ascending order or as
        specified by the optional key function.

        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
        [10, 9, 7, 6, 5, 4]
        """
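        # MaxHeapQ keeps the `num` smallest items seen in a partition; when a
        # key function is given, items are wrapped as (key(item), item) pairs
        # so that comparisons use the key.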
        def topNKeyedElems(iterator, key_=None):
            q = MaxHeapQ(num)
            for k in iterator:
                if key_ is not None:
                    k = (key_(k), k)
                q.insert(k)
            yield q.getElements()

        def unKey(x, key_=None):
            if key_ is not None:
                x = [i[1] for i in x]
            return x

        def merge(a, b):
            return next(topNKeyedElems(a + b))

        result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
        return sorted(unKey(result, key), key=key)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1

        mapped = self.mapPartitions(takeUpToNum)
        items = []
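
        # Scan partitions one at a time, stopping as soon as enough elements
        # have been collected.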
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'

        Empty lines are tolerated when saving to text files.

        >>> tempFile2 = NamedTemporaryFile(delete=True)
        >>> tempFile2.close()
        >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
        >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
        '\\n\\n\\nbar\\nfoo\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def keys(self):
        """
        Return an RDD with the keys of each tuple.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
        >>> m.collect()
        [1, 3]
        """
        return self.map(lambda (k, v): k)

    def values(self):
        """
        Return an RDD with the values of each tuple.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
        >>> m.collect()
        [2, 4]
        """
        return self.map(lambda (k, v): v)

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    def partitionBy(self, numPartitions, partitionFunc=hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
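        # Transferring O(n) objects to Java is too expensive.  Instead, we
        # form the hash buckets in Python, transferring O(numPartitions)
        # objects to Java.  Each object is a (splitNumber, [objects]) pair.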
        outputSerializer = self.ctx._unbatched_serializer

        def add_shuffle_key(split, iterator):
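            # Bucket the elements by target partition and hand each bucket to
            # the JVM as a (partition id, pickled items) pair.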
            buckets = defaultdict(list)

            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
        jrdd = pairRDD.partitionBy(partitioner).values()
        rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
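        # This is required so that id(partitionFunc) remains unique, even if
        # partitionFunc is a lambda: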
        rdd._partitionFunc = partitionFunc
        return rdd

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C. Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
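        # Combine values locally within each partition, shuffle the partial
        # combiners by key, and then merge combiners that land in the same
        # output partition.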
        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)

        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)

    def foldByKey(self, zeroValue, func, numPartitions=None):
        """
        Merge the values for each key using an associative function "func"
        and a neutral "zeroValue" which may be added to the result an
        arbitrary number of times, and must not change the result
        (e.g., 0 for addition, or 1 for multiplication).

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> from operator import add
        >>> rdd.foldByKey(0, add).collect()
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)

    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD into numPartitions partitions.

        Note: If you are grouping in order to perform an aggregation (such as a
        sum or average) over each key, using reduceByKey will provide much
        better performance.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
        [('a', [1, 1]), ('b', [1])]
        """

        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions).mapValues(lambda x: ResultIterable(x))

    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.

        >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
        >>> def f(x): return x
        >>> x.flatMapValues(f).collect()
        [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.

        >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
        >>> def f(x): return len(x)
        >>> x.mapValues(f).collect()
        [('a', 3), ('b', 1)]
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)

    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)

    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in C{self} that has no pair with matching
        key in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)

    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        rdd = other.map(lambda x: (x, True))
        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying C{f}.

        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
        >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
        """
        return self.map(lambda x: (f(x), x))

    def repartition(self, numPartitions):
        """
        Return a new RDD that has exactly numPartitions partitions.

        Can increase or decrease the level of parallelism in this RDD.
        Internally, this uses a shuffle to redistribute data.
        If you are decreasing the number of partitions in this RDD, consider
        using `coalesce`, which can avoid performing a shuffle.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
        >>> sorted(rdd.glom().collect())
        [[1], [2, 3], [4, 5], [6, 7]]
        >>> len(rdd.repartition(2).glom().collect())
        2
        >>> len(rdd.repartition(10).glom().collect())
        10
        """
        jrdd = self._jrdd.repartition(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD that is reduced into `numPartitions` partitions.

        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        """
        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def zip(self, other):
        """
        Zips this RDD with another one, returning key-value pairs with the
        first element of each RDD paired with the first element of the other,
        the second element with the second, etc. Assumes that the two RDDs
        have the same number of partitions and the same number of elements in
        each partition (e.g. one was made through a map on the other).

        >>> x = sc.parallelize(range(0,5))
        >>> y = sc.parallelize(range(1000, 1005))
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
        """
        pairRDD = self._jrdd.zip(other._jrdd)
        deserializer = PairDeserializer(self._jrdd_deserializer,
                                        other._jrdd_deserializer)
        return RDD(pairRDD, self.ctx, deserializer)

    def name(self):
        """
        Return the name of this RDD.
        """
        name_ = self._jrdd.name()
        if not name_:
            return None
        return name_.encode('utf-8')

    def setName(self, name):
        """
        Assign a name to this RDD.

        >>> rdd1 = sc.parallelize([1, 2])
        >>> rdd1.setName('RDD1')
        >>> rdd1.name()
        'RDD1'
        """
        self._jrdd.setName(name)

    def toDebugString(self):
        """
        A description of this RDD and its recursive dependencies for debugging.
        """
        debug_string = self._jrdd.toDebugString()
        if not debug_string:
            return None
        return debug_string.encode('utf-8')

    def getStorageLevel(self):
        """
        Get the RDD's current storage level.

        >>> rdd1 = sc.parallelize([1, 2])
        >>> rdd1.getStorageLevel()
        StorageLevel(False, False, False, False, 1)
        """
        java_storage_level = self._jrdd.getStorageLevel()
        storage_level = StorageLevel(java_storage_level.useDisk(),
                                     java_storage_level.useMemory(),
                                     java_storage_level.useOffHeap(),
                                     java_storage_level.deserialized(),
                                     java_storage_level.replication())
        return storage_level


class PipelinedRDD(RDD):
    """
    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
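            # This transformation is the first in its stage: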
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func

            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
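        # The pipelined function, its input deserializer and output serializer
        # are pickled together and shipped to the Python worker that evaluates
        # this RDD's partitions.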
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
                                             bytearray(pickled_command), env, includes,
                                             self.preservesPartitioning, self.ctx.pythonExec,
                                             broadcast_vars, self.ctx._javaAccumulator,
                                             class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
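    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples: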
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()