1 #
2 # Licensed to the Apache Software Foundation (ASF) under one or more
3 # contributor license agreements.  See the NOTICE file distributed with
4 # this work for additional information regarding copyright ownership.
5 # The ASF licenses this file to You under the Apache License, Version 2.0
6 # (the "License"); you may not use this file except in compliance with
7 # the License.  You may obtain a copy of the License at
8 #
9 #    http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 from base64 import standard_b64encode as b64enc
19 import copy
20 from collections import defaultdict
21 from collections import namedtuple
22 from itertools import chain, ifilter, imap
23 import operator
24 import os
25 import sys
26 import shlex
27 import traceback
28 from subprocess import Popen, PIPE
29 from tempfile import NamedTemporaryFile
30 from threading import Thread
31 import warnings
32 import heapq
33 from random import Random
34
35 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
36 BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
37 from pyspark.join import python_join, python_left_outer_join, \
38 python_right_outer_join, python_cogroup
39 from pyspark.statcounter import StatCounter
40 from pyspark.rddsampler import RDDSampler
41 from pyspark.storagelevel import StorageLevel
42 from pyspark.resultiterable import ResultIterable
43
44 from py4j.java_collections import ListConverter, MapConverter
45
46 __all__ = ["RDD"]
51 def portable_hash(x):
52 """
53 This function returns a consistent hash code for built-in types, especially
54 for None and for tuples that contain None.
55
56 The algorithm is similar to the one used by CPython 2.7.
57
58 >>> portable_hash(None)
59 0
60 >>> portable_hash((None, 1))
61 219750521
62 """
63 if x is None:
64 return 0
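# The tuple branch below mirrors CPython 2.7's tuple hashing (xor each element's
# hash, multiply by 1000003), masked to 32 bits so the result is the same on
# 32- and 64-bit builds.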
65 if isinstance(x, tuple):
66 h = 0x345678
67 for i in x:
68 h ^= portable_hash(i)
69 h *= 1000003
70 h &= 0xffffffff
71 h ^= len(x)
72 if h == -1:
73 h = -2
74 return h
75 return hash(x)
76
78 def _extract_concise_traceback():
79 """
80 This function returns the traceback info for a callsite as a Callsite
81 namedtuple with the function name, file name and line number.
82 """
83 tb = traceback.extract_stack()
84 callsite = namedtuple("Callsite", "function file linenum")
85 if len(tb) == 0:
86 return None
87 file, line, module, what = tb[len(tb) - 1]
88 sparkpath = os.path.dirname(file)
89 first_spark_frame = len(tb) - 1
90 for i in range(0, len(tb)):
91 file, line, fun, what = tb[i]
92 if file.startswith(sparkpath):
93 first_spark_frame = i
94 break
95 if first_spark_frame == 0:
96 file, line, fun, what = tb[0]
97 return callsite(function=fun, file=file, linenum=line)
98 sfile, sline, sfun, swhat = tb[first_spark_frame]
99 ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
100 return callsite(function=sfun, file=ufile, linenum=uline)
101
102 _spark_stack_depth = 0
104 class _JavaStackTrace(object):
105 def __init__(self, sc):
106 tb = _extract_concise_traceback()
107 if tb is not None:
108 self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
109 else:
110 self._traceback = "Error! Could not extract traceback info"
111 self._context = sc
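# Instances are used as context managers (see the `with _JavaStackTrace(...)` blocks
# below); the captured call site describes where in user code the job was triggered.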
112
118
124
125 class MaxHeapQ(object):
126 """
127 An implementation of MaxHeap.
128 >>> import pyspark.rdd
129 >>> heap = pyspark.rdd.MaxHeapQ(5)
130 >>> [heap.insert(i) for i in range(10)]
131 [None, None, None, None, None, None, None, None, None, None]
132 >>> sorted(heap.getElements())
133 [0, 1, 2, 3, 4]
134 >>> heap = pyspark.rdd.MaxHeapQ(5)
135 >>> [heap.insert(i) for i in range(9, -1, -1)]
136 [None, None, None, None, None, None, None, None, None, None]
137 >>> sorted(heap.getElements())
138 [0, 1, 2, 3, 4]
139 >>> heap = pyspark.rdd.MaxHeapQ(1)
140 >>> [heap.insert(i) for i in range(9, -1, -1)]
141 [None, None, None, None, None, None, None, None, None, None]
142 >>> heap.getElements()
143 [0]
144 """
145
146 def __init__(self, maxsize):
147
148 self.q = [0]
149 self.maxsize = maxsize
150
151 def _swim(self, k):
152 while (k > 1) and (self.q[k/2] < self.q[k]):
153 self._swap(k, k/2)
154 k = k/2
155
156 def _swap(self, i, j):
157 t = self.q[i]
158 self.q[i] = self.q[j]
159 self.q[j] = t
160
161 def _sink(self, k):
162 N = self.size()
163 while 2 * k <= N:
164 j = 2 * k
165
166
167 if j < N and self.q[j] < self.q[j + 1]:
168 j = j + 1
169 if(self.q[k] > self.q[j]):
170 break
171 self._swap(k, j)
172 k = j
173
174 def size(self):
175 return len(self.q) - 1
176
177 def insert(self, value):
178 if (self.size()) < self.maxsize:
179 self.q.append(value)
180 self._swim(self.size())
181 else:
182 self._replaceRoot(value)
183
184 def getElements(self):
185 return self.q[1:]
186
187 def _replaceRoot(self, value):
188 if(self.q[1] > value):
189 self.q[1] = value
190 self._sink(1)
191
192 class RDD(object):
193 """
194 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
195 Represents an immutable, partitioned collection of elements that can be
196 operated on in parallel.
197 """
198
199 def __init__(self, jrdd, ctx, jrdd_deserializer):
200 self._jrdd = jrdd
201 self.is_cached = False
202 self.is_checkpointed = False
203 self.ctx = ctx
204 self._jrdd_deserializer = jrdd_deserializer
205 self._id = jrdd.id()
206
207 def id(self):
208 """
209 A unique ID for this RDD (within its SparkContext).
210 """
211 return self._id
212
213 def __repr__(self):
214 return self._jrdd.toString()
215
216 @property
217 def context(self):
218 """
219 The L{SparkContext} that this RDD was created on.
220 """
221 return self.ctx
222
223 def cache(self):
224 """
225 Persist this RDD with the default storage level (C{MEMORY_ONLY}).
226 """
227 self.is_cached = True
228 self._jrdd.cache()
229 return self
230
231 def persist(self, storageLevel):
232 """
233 Set this RDD's storage level to persist its values across operations after the first time
234 it is computed. This can only be used to assign a new storage level if the RDD does not
235 have a storage level set yet.
236 """
237 self.is_cached = True
238 javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
239 self._jrdd.persist(javaStorageLevel)
240 return self
241
242 def unpersist(self):
243 """
244 Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
245 """
246 self.is_cached = False
247 self._jrdd.unpersist()
248 return self
249
250 def checkpoint(self):
251 """
252 Mark this RDD for checkpointing. It will be saved to a file inside the
253 checkpoint directory set with L{SparkContext.setCheckpointDir()} and
254 all references to its parent RDDs will be removed. This function must
255 be called before any job has been executed on this RDD. It is strongly
256 recommended that this RDD is persisted in memory, otherwise saving it
257 on a file will require recomputation.
258 """
259 self.is_checkpointed = True
260 self._jrdd.rdd().checkpoint()
261
262 def isCheckpointed(self):
263 """
264 Return whether this RDD has been checkpointed or not
265 """
266 return self._jrdd.rdd().isCheckpointed()
267
268 def getCheckpointFile(self):
269 """
270 Gets the name of the file to which this RDD was checkpointed
271 """
272 checkpointFile = self._jrdd.rdd().getCheckpointFile()
273 if checkpointFile.isDefined():
274 return checkpointFile.get()
275 else:
276 return None
277
278 def map(self, f, preservesPartitioning=False):
279 """
280 Return a new RDD by applying a function to each element of this RDD.
281
282 >>> rdd = sc.parallelize(["b", "a", "c"])
283 >>> sorted(rdd.map(lambda x: (x, 1)).collect())
284 [('a', 1), ('b', 1), ('c', 1)]
285 """
286 def func(split, iterator): return imap(f, iterator)
287 return PipelinedRDD(self, func, preservesPartitioning)
288
289 def flatMap(self, f, preservesPartitioning=False):
290 """
291 Return a new RDD by first applying a function to all elements of this
292 RDD, and then flattening the results.
293
294 >>> rdd = sc.parallelize([2, 3, 4])
295 >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
296 [1, 1, 1, 2, 2, 3]
297 >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
298 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
299 """
300 def func(s, iterator): return chain.from_iterable(imap(f, iterator))
301 return self.mapPartitionsWithIndex(func, preservesPartitioning)
302
303 def mapPartitions(self, f, preservesPartitioning=False):
304 """
305 Return a new RDD by applying a function to each partition of this RDD.
306
307 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
308 >>> def f(iterator): yield sum(iterator)
309 >>> rdd.mapPartitions(f).collect()
310 [3, 7]
311 """
312 def func(s, iterator): return f(iterator)
313 return self.mapPartitionsWithIndex(func)
314
315 def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
316 """
317 Return a new RDD by applying a function to each partition of this RDD,
318 while tracking the index of the original partition.
319
320 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
321 >>> def f(splitIndex, iterator): yield splitIndex
322 >>> rdd.mapPartitionsWithIndex(f).sum()
323 6
324 """
325 return PipelinedRDD(self, f, preservesPartitioning)
326
327 def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
328 """
329 Deprecated: use mapPartitionsWithIndex instead.
330
331 Return a new RDD by applying a function to each partition of this RDD,
332 while tracking the index of the original partition.
333
334 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
335 >>> def f(splitIndex, iterator): yield splitIndex
336 >>> rdd.mapPartitionsWithSplit(f).sum()
337 6
338 """
339 warnings.warn("mapPartitionsWithSplit is deprecated; "
340 "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
341 return self.mapPartitionsWithIndex(f, preservesPartitioning)
342
343 def filter(self, f):
344 """
345 Return a new RDD containing only the elements that satisfy a predicate.
346
347 >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
348 >>> rdd.filter(lambda x: x % 2 == 0).collect()
349 [2, 4]
350 """
351 def func(iterator): return ifilter(f, iterator)
352 return self.mapPartitions(func)
353
354 def distinct(self):
355 """
356 Return a new RDD containing the distinct elements in this RDD.
357
358 >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
359 [1, 2, 3]
360 """
361 return self.map(lambda x: (x, None)) \
362 .reduceByKey(lambda x, _: x) \
363 .map(lambda (x, _): x)
364
365 def sample(self, withReplacement, fraction, seed=None):
366 """
367 Return a sampled subset of this RDD (relies on numpy and falls back
368 on default random generator if numpy is unavailable).
369
370 >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
371 [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
372 """
373 assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
374 return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
375
376
377 def takeSample(self, withReplacement, num, seed=None):
378 """
379 Return a fixed-size sampled subset of this RDD (currently requires numpy).
380
381 >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
382 [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
383 """
384
385 fraction = 0.0
386 total = 0
387 multiplier = 3.0
388 initialCount = self.count()
389 maxSelected = 0
390
391 if (num < 0):
392 raise ValueError
393
394 if (initialCount == 0):
395 return list()
396
397 if initialCount > sys.maxint - 1:
398 maxSelected = sys.maxint - 1
399 else:
400 maxSelected = initialCount
401
402 if num > initialCount and not withReplacement:
403 total = maxSelected
404 fraction = multiplier * (maxSelected + 1) / initialCount
405 else:
406 fraction = multiplier * (num + 1) / initialCount
407 total = num
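# Oversample by a constant multiplier so that a single sampling pass is usually
# enough to collect at least `total` elements.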
408
409 samples = self.sample(withReplacement, fraction, seed).collect()
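# If the first sample came back too small, re-sample with fresh seeds until we
# have at least `total` elements, then shuffle and trim to exactly `total`.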
410
411
412
413
414 rand = Random(seed)
415 while len(samples) < total:
416 samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect()
417
418 sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint))
419 sampler.shuffle(samples)
420 return samples[0:total]
421
422 def union(self, other):
423 """
424 Return the union of this RDD and another one.
425
426 >>> rdd = sc.parallelize([1, 1, 2, 3])
427 >>> rdd.union(rdd).collect()
428 [1, 1, 2, 3, 1, 1, 2, 3]
429 """
430 if self._jrdd_deserializer == other._jrdd_deserializer:
431 rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
432 self._jrdd_deserializer)
433 return rdd
434 else:
435
436
437 self_copy = self._reserialize()
438 other_copy = other._reserialize()
439 return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
440 self.ctx.serializer)
441
442 def intersection(self, other):
443 """
444 Return the intersection of this RDD and another one. The output will not
445 contain any duplicate elements, even if the input RDDs did.
446
447 Note that this method performs a shuffle internally.
448
449 >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
450 >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
451 >>> rdd1.intersection(rdd2).collect()
452 [1, 2, 3]
453 """
454 return self.map(lambda v: (v, None)) \
455 .cogroup(other.map(lambda v: (v, None))) \
456 .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
457 .keys()
458
459 def _reserialize(self):
460 if self._jrdd_deserializer == self.ctx.serializer:
461 return self
462 else:
463 return self.map(lambda x: x, preservesPartitioning=True)
464
465 def __add__(self, other):
466 """
467 Return the union of this RDD and another one.
468
469 >>> rdd = sc.parallelize([1, 1, 2, 3])
470 >>> (rdd + rdd).collect()
471 [1, 1, 2, 3, 1, 1, 2, 3]
472 """
473 if not isinstance(other, RDD):
474 raise TypeError
475 return self.union(other)
476
477 def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
478 """
479 Sorts this RDD, which is assumed to consist of (key, value) pairs.
480
481 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
482 >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
483 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
484 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
485 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
486 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
487 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
488 """
489 if numPartitions is None:
490 numPartitions = self.ctx.defaultParallelism
491
492 bounds = list()
493
494
495
496
497 if numPartitions > 1:
498 rddSize = self.count()
499 maxSampleSize = numPartitions * 20.0
500 fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
501
502 samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
503 samples = sorted(samples, reverse=(not ascending), key=keyfunc)
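# Pick numPartitions - 1 boundary keys from the sorted sample so that each
# output partition covers a roughly equal slice of the key range.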
504
505
506
507 for i in range(0, numPartitions - 1):
508 index = (len(samples) - 1) * (i + 1) / numPartitions
509 bounds.append(samples[index])
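# Walk the sampled boundaries to find which range a key falls into; for
# descending sorts the partition index is mirrored.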
510
511 def rangePartitionFunc(k):
512 p = 0
513 while p < len(bounds) and keyfunc(k) > bounds[p]:
514 p += 1
515 if ascending:
516 return p
517 else:
518 return numPartitions-1-p
519
520 def mapFunc(iterator):
521 yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
522
523 return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
524 .mapPartitions(mapFunc,preservesPartitioning=True)
525 .flatMap(lambda x: x, preservesPartitioning=True))
526
527 def glom(self):
528 """
529 Return an RDD created by coalescing all elements within each partition
530 into a list.
531
532 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
533 >>> sorted(rdd.glom().collect())
534 [[1, 2], [3, 4]]
535 """
536 def func(iterator): yield list(iterator)
537 return self.mapPartitions(func)
538
539 def cartesian(self, other):
540 """
541 Return the Cartesian product of this RDD and another one, that is, the
542 RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
543 C{b} is in C{other}.
544
545 >>> rdd = sc.parallelize([1, 2])
546 >>> sorted(rdd.cartesian(rdd).collect())
547 [(1, 1), (1, 2), (2, 1), (2, 2)]
548 """
549
550 deserializer = CartesianDeserializer(self._jrdd_deserializer,
551 other._jrdd_deserializer)
552 return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
553
554 def groupBy(self, f, numPartitions=None):
555 """
556 Return an RDD of grouped items.
557
558 >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
559 >>> result = rdd.groupBy(lambda x: x % 2).collect()
560 >>> sorted([(x, sorted(y)) for (x, y) in result])
561 [(0, [2, 8]), (1, [1, 1, 3, 5])]
562 """
563 return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
564
565 def pipe(self, command, env={}):
566 """
567 Return an RDD created by piping elements to a forked external process.
568
569 >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
570 ['1', '2', '', '3']
571 """
572 def func(iterator):
573 pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
574 def pipe_objs(out):
575 for obj in iterator:
576 out.write(str(obj).rstrip('\n') + '\n')
577 out.close()
578 Thread(target=pipe_objs, args=[pipe.stdin]).start()
579 return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
580 return self.mapPartitions(func)
581
582 def foreach(self, f):
583 """
584 Applies a function to all elements of this RDD.
585
586 >>> def f(x): print x
587 >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
588 """
589 def processPartition(iterator):
590 for x in iterator:
591 f(x)
592 yield None
593 self.mapPartitions(processPartition).collect()
594
595 def foreachPartition(self, f):
596 """
597 Applies a function to each partition of this RDD.
598
599 >>> def f(iterator):
600 ... for x in iterator:
601 ... print x
602 ... yield None
603 >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
604 """
605 self.mapPartitions(f).collect()
606
607 def collect(self):
608 """
609 Return a list that contains all of the elements in this RDD.
610 """
611 with _JavaStackTrace(self.context) as st:
612 bytesInJava = self._jrdd.collect().iterator()
613 return list(self._collect_iterator_through_file(bytesInJava))
614
615 def _collect_iterator_through_file(self, iterator):
616
617
618
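# Writing the collected bytes to a temporary file and reading them back with the
# deserializer avoids pushing every element through the Py4J socket one at a time.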
619 tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
620 tempFile.close()
621 self.ctx._writeToFile(iterator, tempFile.name)
622
623 with open(tempFile.name, 'rb') as tempFile:
624 for item in self._jrdd_deserializer.load_stream(tempFile):
625 yield item
626 os.unlink(tempFile.name)
627
628 def reduce(self, f):
629 """
630 Reduces the elements of this RDD using the specified commutative and
631 associative binary operator. Currently reduces partitions locally.
632
633 >>> from operator import add
634 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
635 15
636 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
637 10
638 """
639 def func(iterator):
640 acc = None
641 for obj in iterator:
642 if acc is None:
643 acc = obj
644 else:
645 acc = f(obj, acc)
646 if acc is not None:
647 yield acc
648 vals = self.mapPartitions(func).collect()
649 return reduce(f, vals)
650
651 def fold(self, zeroValue, op):
652 """
653 Aggregate the elements of each partition, and then the results for all
654 the partitions, using a given associative function and a neutral "zero
655 value."
656
657 The function C{op(t1, t2)} is allowed to modify C{t1} and return it
658 as its result value to avoid object allocation; however, it should not
659 modify C{t2}.
660
661 >>> from operator import add
662 >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
663 15
664 """
665 def func(iterator):
666 acc = zeroValue
667 for obj in iterator:
668 acc = op(obj, acc)
669 yield acc
670 vals = self.mapPartitions(func).collect()
671 return reduce(op, vals, zeroValue)
672
673 def aggregate(self, zeroValue, seqOp, combOp):
674 """
675 Aggregate the elements of each partition, and then the results for all
676 the partitions, using the given combine functions and a neutral "zero
677 value."
678
679 The function C{op(t1, t2)} is allowed to modify C{t1} and return it
680 as its result value to avoid object allocation; however, it should not
681 modify C{t2}.
682
683 The first function (seqOp) can return a different result type, U, than
684 the type of this RDD. Thus, we need one operation for merging a T into an U
685 and one operation for merging two U's.
686
687 >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
688 >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
689 >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
690 (10, 4)
691 >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
692 (0, 0)
693 """
694 def func(iterator):
695 acc = zeroValue
696 for obj in iterator:
697 acc = seqOp(acc, obj)
698 yield acc
699
700 return self.mapPartitions(func).fold(zeroValue, combOp)
701
702
703 def max(self):
704 """
705 Find the maximum item in this RDD.
706
707 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
708 43.0
709 """
710 return self.reduce(max)
711
712 def min(self):
713 """
714 Find the minimum item in this RDD.
715
716 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
717 1.0
718 """
719 return self.reduce(min)
720
721 def sum(self):
722 """
723 Add up the elements in this RDD.
724
725 >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
726 6.0
727 """
728 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
729
730 def count(self):
731 """
732 Return the number of elements in this RDD.
733
734 >>> sc.parallelize([2, 3, 4]).count()
735 3
736 """
737 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
738
739 def stats(self):
740 """
741 Return a L{StatCounter} object that captures the mean, variance
742 and count of the RDD's elements in one operation.
743 """
744 def redFunc(left_counter, right_counter):
745 return left_counter.mergeStats(right_counter)
746
747 return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
748
749 def mean(self):
750 """
751 Compute the mean of this RDD's elements.
752
753 >>> sc.parallelize([1, 2, 3]).mean()
754 2.0
755 """
756 return self.stats().mean()
757
758 def variance(self):
759 """
760 Compute the variance of this RDD's elements.
761
762 >>> sc.parallelize([1, 2, 3]).variance()
763 0.666...
764 """
765 return self.stats().variance()
766
767 def stdev(self):
768 """
769 Compute the standard deviation of this RDD's elements.
770
771 >>> sc.parallelize([1, 2, 3]).stdev()
772 0.816...
773 """
774 return self.stats().stdev()
775
776 def sampleStdev(self):
777 """
778 Compute the sample standard deviation of this RDD's elements (which corrects for bias in
779 estimating the standard deviation by dividing by N-1 instead of N).
780
781 >>> sc.parallelize([1, 2, 3]).sampleStdev()
782 1.0
783 """
784 return self.stats().sampleStdev()
785
786 def sampleVariance(self):
787 """
788 Compute the sample variance of this RDD's elements (which corrects for bias in
789 estimating the variance by dividing by N-1 instead of N).
790
791 >>> sc.parallelize([1, 2, 3]).sampleVariance()
792 1.0
793 """
794 return self.stats().sampleVariance()
795
796 def countByValue(self):
797 """
798 Return the count of each unique value in this RDD as a dictionary of
799 (value, count) pairs.
800
801 >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
802 [(1, 2), (2, 3)]
803 """
804 def countPartition(iterator):
805 counts = defaultdict(int)
806 for obj in iterator:
807 counts[obj] += 1
808 yield counts
809 def mergeMaps(m1, m2):
810 for (k, v) in m2.iteritems():
811 m1[k] += v
812 return m1
813 return self.mapPartitions(countPartition).reduce(mergeMaps)
814
815 def top(self, num):
816 """
817 Get the top N elements from a RDD.
818
819 Note: It returns the list sorted in descending order.
820 >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
821 [12]
822 >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
823 [6, 5]
824 """
825 def topIterator(iterator):
826 q = []
827 for k in iterator:
828 if len(q) < num:
829 heapq.heappush(q, k)
830 else:
831 heapq.heappushpop(q, k)
832 yield q
833
834 def merge(a, b):
835 return next(topIterator(a + b))
836
837 return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
838
839 def takeOrdered(self, num, key=None):
840 """
841 Get the N elements from a RDD ordered in ascending order or as specified
842 by the optional key function.
843
844 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
845 [1, 2, 3, 4, 5, 6]
846 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
847 [10, 9, 7, 6, 5, 4]
848 """
849
850 def topNKeyedElems(iterator, key_=None):
851 q = MaxHeapQ(num)
852 for k in iterator:
853 if key_ != None:
854 k = (key_(k), k)
855 q.insert(k)
856 yield q.getElements()
857
858 def unKey(x, key_=None):
859 if key_ != None:
860 x = [i[1] for i in x]
861 return x
862
863 def merge(a, b):
864 return next(topNKeyedElems(a + b))
865 result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
866 return sorted(unKey(result, key), key=key)
867
868
869 def take(self, num):
870 """
871 Take the first num elements of the RDD.
872
873 This currently scans the partitions *one by one*, so it will be slow if
874 a lot of partitions are required. In that case, use L{collect} to get
875 the whole RDD instead.
876
877 >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
878 [2, 3]
879 >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
880 [2, 3, 4, 5, 6]
881 """
882 def takeUpToNum(iterator):
883 taken = 0
884 while taken < num:
885 yield next(iterator)
886 taken += 1
887
888 mapped = self.mapPartitions(takeUpToNum)
889 items = []
890
891
892
893 with _JavaStackTrace(self.context) as st:
894 for partition in range(mapped._jrdd.splits().size()):
895 partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
896 partitionsToTake[0] = partition
897 iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
898 items.extend(mapped._collect_iterator_through_file(iterator))
899 if len(items) >= num:
900 break
901 return items[:num]
902
903 def first(self):
904 """
905 Return the first element in this RDD.
906
907 >>> sc.parallelize([2, 3, 4]).first()
908 2
909 """
910 return self.take(1)[0]
911
912 def saveAsTextFile(self, path):
913 """
914 Save this RDD as a text file, using string representations of elements.
915
916 >>> tempFile = NamedTemporaryFile(delete=True)
917 >>> tempFile.close()
918 >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
919 >>> from fileinput import input
920 >>> from glob import glob
921 >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
922 '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
923
924 Empty lines are tolerated when saving to text files.
925
926 >>> tempFile2 = NamedTemporaryFile(delete=True)
927 >>> tempFile2.close()
928 >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
929 >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
930 '\\n\\n\\nbar\\nfoo\\n'
931 """
932 def func(split, iterator):
933 for x in iterator:
934 if not isinstance(x, basestring):
935 x = unicode(x)
936 yield x.encode("utf-8")
937 keyed = PipelinedRDD(self, func)
938 keyed._bypass_serializer = True
939 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
940
941
942
943 def collectAsMap(self):
944 """
945 Return the key-value pairs in this RDD to the master as a dictionary.
946
947 >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
948 >>> m[1]
949 2
950 >>> m[3]
951 4
952 """
953 return dict(self.collect())
954
955 def keys(self):
956 """
957 Return an RDD with the keys of each tuple.
958 >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
959 >>> m.collect()
960 [1, 3]
961 """
962 return self.map(lambda (k, v): k)
963
964 def values(self):
965 """
966 Return an RDD with the values of each tuple.
967 >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
968 >>> m.collect()
969 [2, 4]
970 """
971 return self.map(lambda (k, v): v)
972
973 def reduceByKey(self, func, numPartitions=None):
974 """
975 Merge the values for each key using an associative reduce function.
976
977 This will also perform the merging locally on each mapper before
978 sending results to a reducer, similarly to a "combiner" in MapReduce.
979
980 Output will be hash-partitioned with C{numPartitions} partitions, or
981 the default parallelism level if C{numPartitions} is not specified.
982
983 >>> from operator import add
984 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
985 >>> sorted(rdd.reduceByKey(add).collect())
986 [('a', 2), ('b', 1)]
987 """
988 return self.combineByKey(lambda x: x, func, func, numPartitions)
989
990 def reduceByKeyLocally(self, func):
991 """
992 Merge the values for each key using an associative reduce function, but
993 return the results immediately to the master as a dictionary.
994
995 This will also perform the merging locally on each mapper before
996 sending results to a reducer, similarly to a "combiner" in MapReduce.
997
998 >>> from operator import add
999 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1000 >>> sorted(rdd.reduceByKeyLocally(add).items())
1001 [('a', 2), ('b', 1)]
1002 """
1003 def reducePartition(iterator):
1004 m = {}
1005 for (k, v) in iterator:
1006 m[k] = v if k not in m else func(m[k], v)
1007 yield m
1008 def mergeMaps(m1, m2):
1009 for (k, v) in m2.iteritems():
1010 m1[k] = v if k not in m1 else func(m1[k], v)
1011 return m1
1012 return self.mapPartitions(reducePartition).reduce(mergeMaps)
1013
1014 def countByKey(self):
1015 """
1016 Count the number of elements for each key, and return the result to the
1017 master as a dictionary.
1018
1019 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1020 >>> sorted(rdd.countByKey().items())
1021 [('a', 2), ('b', 1)]
1022 """
1023 return self.map(lambda x: x[0]).countByValue()
1024
1025 def join(self, other, numPartitions=None):
1026 """
1027 Return an RDD containing all pairs of elements with matching keys in
1028 C{self} and C{other}.
1029
1030 Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
1031 (k, v1) is in C{self} and (k, v2) is in C{other}.
1032
1033 Performs a hash join across the cluster.
1034
1035 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1036 >>> y = sc.parallelize([("a", 2), ("a", 3)])
1037 >>> sorted(x.join(y).collect())
1038 [('a', (1, 2)), ('a', (1, 3))]
1039 """
1040 return python_join(self, other, numPartitions)
1041
1042 def leftOuterJoin(self, other, numPartitions=None):
1043 """
1044 Perform a left outer join of C{self} and C{other}.
1045
1046 For each element (k, v) in C{self}, the resulting RDD will either
1047 contain all pairs (k, (v, w)) for w in C{other}, or the pair
1048 (k, (v, None)) if no elements in other have key k.
1049
1050 Hash-partitions the resulting RDD into the given number of partitions.
1051
1052 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1053 >>> y = sc.parallelize([("a", 2)])
1054 >>> sorted(x.leftOuterJoin(y).collect())
1055 [('a', (1, 2)), ('b', (4, None))]
1056 """
1057 return python_left_outer_join(self, other, numPartitions)
1058
1059 def rightOuterJoin(self, other, numPartitions=None):
1060 """
1061 Perform a right outer join of C{self} and C{other}.
1062
1063 For each element (k, w) in C{other}, the resulting RDD will either
1064 contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
1065 if no elements in C{self} have key k.
1066
1067 Hash-partitions the resulting RDD into the given number of partitions.
1068
1069 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1070 >>> y = sc.parallelize([("a", 2)])
1071 >>> sorted(y.rightOuterJoin(x).collect())
1072 [('a', (2, 1)), ('b', (None, 4))]
1073 """
1074 return python_right_outer_join(self, other, numPartitions)
1075
1076
1077
1078
1079 def partitionBy(self, numPartitions, partitionFunc=portable_hash):
1080 """
1081 Return a copy of the RDD partitioned using the specified partitioner.
1082
1083 >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
1084 >>> sets = pairs.partitionBy(2).glom().collect()
1085 >>> set(sets[0]).intersection(set(sets[1]))
1086 set([])
1087 """
1088 if numPartitions is None:
1089 numPartitions = self.ctx.defaultParallelism
1090
1091
1092
1093
1094 outputSerializer = self.ctx._unbatched_serializer
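# Build the hash buckets in Python and ship one (partition id, pickled items) pair
# per bucket, rather than sending each element to the JVM individually.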
1095 def add_shuffle_key(split, iterator):
1096
1097 buckets = defaultdict(list)
1098
1099 for (k, v) in iterator:
1100 buckets[partitionFunc(k) % numPartitions].append((k, v))
1101 for (split, items) in buckets.iteritems():
1102 yield pack_long(split)
1103 yield outputSerializer.dumps(items)
1104 keyed = PipelinedRDD(self, add_shuffle_key)
1105 keyed._bypass_serializer = True
1106 with _JavaStackTrace(self.context) as st:
1107 pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
1108 partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
1109 id(partitionFunc))
1110 jrdd = pairRDD.partitionBy(partitioner).values()
1111 rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
1112
1113
1114 rdd._partitionFunc = partitionFunc
1115 return rdd
1116
1117
1118 def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
1119 numPartitions=None):
1120 """
1121 Generic function to combine the elements for each key using a custom
1122 set of aggregation functions.
1123
1124 Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
1125 type" C. Note that V and C can be different -- for example, one might
1126 group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
1127
1128 Users provide three functions:
1129
1130 - C{createCombiner}, which turns a V into a C (e.g., creates
1131 a one-element list)
1132 - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
1133 a list)
1134 - C{mergeCombiners}, to combine two C's into a single one.
1135
1136 In addition, users can control the partitioning of the output RDD.
1137
1138 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1139 >>> def f(x): return x
1140 >>> def add(a, b): return a + str(b)
1141 >>> sorted(x.combineByKey(str, add, add).collect())
1142 [('a', '11'), ('b', '1')]
1143 """
1144 if numPartitions is None:
1145 numPartitions = self.ctx.defaultParallelism
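# Three phases: combine values locally within each map partition, shuffle the
# partial combiners by key with partitionBy(), then merge combiners for the same key.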
1146 def combineLocally(iterator):
1147 combiners = {}
1148 for x in iterator:
1149 (k, v) = x
1150 if k not in combiners:
1151 combiners[k] = createCombiner(v)
1152 else:
1153 combiners[k] = mergeValue(combiners[k], v)
1154 return combiners.iteritems()
1155 locally_combined = self.mapPartitions(combineLocally)
1156 shuffled = locally_combined.partitionBy(numPartitions)
1157 def _mergeCombiners(iterator):
1158 combiners = {}
1159 for (k, v) in iterator:
1160 if not k in combiners:
1161 combiners[k] = v
1162 else:
1163 combiners[k] = mergeCombiners(combiners[k], v)
1164 return combiners.iteritems()
1165 return shuffled.mapPartitions(_mergeCombiners)
1166
1167 def foldByKey(self, zeroValue, func, numPartitions=None):
1168 """
1169 Merge the values for each key using an associative function "func" and a neutral "zeroValue"
1170 which may be added to the result an arbitrary number of times, and must not change
1171 the result (e.g., 0 for addition, or 1 for multiplication.).
1172
1173 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1174 >>> from operator import add
1175 >>> rdd.foldByKey(0, add).collect()
1176 [('a', 2), ('b', 1)]
1177 """
1178 return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
1179
1180
1181
1182 def groupByKey(self, numPartitions=None):
1183 """
1184 Group the values for each key in the RDD into a single sequence.
1185 Hash-partitions the resulting RDD into numPartitions partitions.
1186
1187 Note: If you are grouping in order to perform an aggregation (such as a
1188 sum or average) over each key, using reduceByKey will provide much better
1189 performance.
1190
1191 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1192 >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
1193 [('a', [1, 1]), ('b', [1])]
1194 """
1195
1196 def createCombiner(x):
1197 return [x]
1198
1199 def mergeValue(xs, x):
1200 xs.append(x)
1201 return xs
1202
1203 def mergeCombiners(a, b):
1204 return a + b
1205
1206 return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
1207 numPartitions).mapValues(lambda x: ResultIterable(x))
1208
1209
1210 def flatMapValues(self, f):
1211 """
1212 Pass each value in the key-value pair RDD through a flatMap function
1213 without changing the keys; this also retains the original RDD's
1214 partitioning.
1215
1216 >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
1217 >>> def f(x): return x
1218 >>> x.flatMapValues(f).collect()
1219 [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
1220 """
1221 flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
1222 return self.flatMap(flat_map_fn, preservesPartitioning=True)
1223
1224 def mapValues(self, f):
1225 """
1226 Pass each value in the key-value pair RDD through a map function
1227 without changing the keys; this also retains the original RDD's
1228 partitioning.
1229
1230 >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
1231 >>> def f(x): return len(x)
1232 >>> x.mapValues(f).collect()
1233 [('a', 3), ('b', 1)]
1234 """
1235 map_values_fn = lambda (k, v): (k, f(v))
1236 return self.map(map_values_fn, preservesPartitioning=True)
1237
1238
1239 def groupWith(self, other):
1240 """
1241 Alias for cogroup.
1242 """
1243 return self.cogroup(other)
1244
1245
1246 def cogroup(self, other, numPartitions=None):
1247 """
1248 For each key k in C{self} or C{other}, return a resulting RDD that
1249 contains a tuple with the list of values for that key in C{self} as well
1250 as C{other}.
1251
1252 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1253 >>> y = sc.parallelize([("a", 2)])
1254 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
1255 [('a', ([1], [2])), ('b', ([4], []))]
1256 """
1257 return python_cogroup(self, other, numPartitions)
1258
1259 def subtractByKey(self, other, numPartitions=None):
1260 """
1261 Return each (key, value) pair in C{self} that has no pair with matching key
1262 in C{other}.
1263
1264 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
1265 >>> y = sc.parallelize([("a", 3), ("c", None)])
1266 >>> sorted(x.subtractByKey(y).collect())
1267 [('b', 4), ('b', 5)]
1268 """
1269 filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
1270 map_func = lambda (key, vals): [(key, val) for val in vals[0]]
1271 return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
1272
1273 def subtract(self, other, numPartitions=None):
1274 """
1275 Return each value in C{self} that is not contained in C{other}.
1276
1277 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
1278 >>> y = sc.parallelize([("a", 3), ("c", None)])
1279 >>> sorted(x.subtract(y).collect())
1280 [('a', 1), ('b', 4), ('b', 5)]
1281 """
1282 rdd = other.map(lambda x: (x, True))
1283 return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])
1284
1285 def keyBy(self, f):
1286 """
1287 Creates tuples of the elements in this RDD by applying C{f}.
1288
1289 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
1290 >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
1291 >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
1292 [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
1293 """
1294 return self.map(lambda x: (f(x), x))
1295
1296 def repartition(self, numPartitions):
1297 """
1298 Return a new RDD that has exactly numPartitions partitions.
1299
1300 Can increase or decrease the level of parallelism in this RDD. Internally, this uses
1301 a shuffle to redistribute data.
1302 If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
1303 which can avoid performing a shuffle.
1304 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
1305 >>> sorted(rdd.glom().collect())
1306 [[1], [2, 3], [4, 5], [6, 7]]
1307 >>> len(rdd.repartition(2).glom().collect())
1308 2
1309 >>> len(rdd.repartition(10).glom().collect())
1310 10
1311 """
1312 jrdd = self._jrdd.repartition(numPartitions)
1313 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1314
1315 def coalesce(self, numPartitions, shuffle=False):
1316 """
1317 Return a new RDD that is reduced into `numPartitions` partitions.
1318 >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
1319 [[1], [2, 3], [4, 5]]
1320 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
1321 [[1, 2, 3, 4, 5]]
1322 """
1323 jrdd = self._jrdd.coalesce(numPartitions)
1324 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1325
1326 def zip(self, other):
1327 """
1328 Zips this RDD with another one, returning key-value pairs of the first element in each RDD,
1329 the second element in each RDD, etc. Assumes that the two RDDs have the same number of
1330 partitions and the same number of elements in each partition (e.g. one was made through
1331 a map on the other).
1332
1333 >>> x = sc.parallelize(range(0,5))
1334 >>> y = sc.parallelize(range(1000, 1005))
1335 >>> x.zip(y).collect()
1336 [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
1337 """
1338 pairRDD = self._jrdd.zip(other._jrdd)
1339 deserializer = PairDeserializer(self._jrdd_deserializer,
1340 other._jrdd_deserializer)
1341 return RDD(pairRDD, self.ctx, deserializer)
1342
1343 def name(self):
1344 """
1345 Return the name of this RDD.
1346 """
1347 name_ = self._jrdd.name()
1348 if not name_:
1349 return None
1350 return name_.encode('utf-8')
1351
1352 def setName(self, name):
1353 """
1354 Assign a name to this RDD.
1355 >>> rdd1 = sc.parallelize([1,2])
1356 >>> rdd1.setName('RDD1')
1357 >>> rdd1.name()
1358 'RDD1'
1359 """
1360 self._jrdd.setName(name)
1361
1362 def toDebugString(self):
1363 """
1364 A description of this RDD and its recursive dependencies for debugging.
1365 """
1366 debug_string = self._jrdd.toDebugString()
1367 if not debug_string:
1368 return None
1369 return debug_string.encode('utf-8')
1370
1371 def getStorageLevel(self):
1372 """
1373 Get the RDD's current storage level.
1374 >>> rdd1 = sc.parallelize([1,2])
1375 >>> rdd1.getStorageLevel()
1376 StorageLevel(False, False, False, False, 1)
1377 """
1378 java_storage_level = self._jrdd.getStorageLevel()
1379 storage_level = StorageLevel(java_storage_level.useDisk(),
1380 java_storage_level.useMemory(),
1381 java_storage_level.useOffHeap(),
1382 java_storage_level.deserialized(),
1383 java_storage_level.replication())
1384 return storage_level
1385
1391 class PipelinedRDD(RDD):
1392 """
1393 Pipelined maps:
1394 >>> rdd = sc.parallelize([1, 2, 3, 4])
1395 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
1396 [4, 8, 12, 16]
1397 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
1398 [4, 8, 12, 16]
1399
1400 Pipelined reduces:
1401 >>> from operator import add
1402 >>> rdd.map(lambda x: 2 * x).reduce(add)
1403 20
1404 >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
1405 20
1406 """
1407 def __init__(self, prev, func, preservesPartitioning=False):
1408 if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
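# This transformation cannot be pipelined into its parent (the parent is cached,
# checkpointed, or not a PipelinedRDD), so start a new pipeline from the parent's JavaRDD.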
1409
1410 self.func = func
1411 self.preservesPartitioning = preservesPartitioning
1412 self._prev_jrdd = prev._jrdd
1413 self._prev_jrdd_deserializer = prev._jrdd_deserializer
1414 else:
1415 prev_func = prev.func
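# Compose this function with the parent's, so both run in a single Python task over
# the parent's original JavaRDD instead of creating another PythonRDD stage.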
1416 def pipeline_func(split, iterator):
1417 return func(split, prev_func(split, iterator))
1418 self.func = pipeline_func
1419 self.preservesPartitioning = \
1420 prev.preservesPartitioning and preservesPartitioning
1421 self._prev_jrdd = prev._prev_jrdd
1422 self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
1423 self.is_cached = False
1424 self.is_checkpointed = False
1425 self.ctx = prev.ctx
1426 self.prev = prev
1427 self._jrdd_val = None
1428 self._jrdd_deserializer = self.ctx.serializer
1429 self._bypass_serializer = False
1430
1431 @property
1432 def _jrdd(self):
1433 if self._jrdd_val:
1434 return self._jrdd_val
1435 if self._bypass_serializer:
1436 serializer = NoOpSerializer()
1437 else:
1438 serializer = self.ctx.serializer
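# The pipelined function, its input deserializer and output serializer are pickled
# together and handed to a JVM PythonRDD, which runs them in Python worker processes
# along with the declared broadcast variables, environment and Python includes.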
1439 command = (self.func, self._prev_jrdd_deserializer, serializer)
1440 pickled_command = CloudPickleSerializer().dumps(command)
1441 broadcast_vars = ListConverter().convert(
1442 [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
1443 self.ctx._gateway._gateway_client)
1444 self.ctx._pickled_broadcast_vars.clear()
1445 class_tag = self._prev_jrdd.classTag()
1446 env = MapConverter().convert(self.ctx.environment,
1447 self.ctx._gateway._gateway_client)
1448 includes = ListConverter().convert(self.ctx._python_includes,
1449 self.ctx._gateway._gateway_client)
1450 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
1451 bytearray(pickled_command), env, includes, self.preservesPartitioning,
1452 self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
1453 class_tag)
1454 self._jrdd_val = python_rdd.asJavaRDD()
1455 return self._jrdd_val
1456
1457 def _is_pipelinable(self):
1458 return not (self.is_cached or self.is_checkpointed)
1459
1461 def _test():
1462 import doctest
1463 from pyspark.context import SparkContext
1464 globs = globals().copy()
1465
1466
1467 globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
1468 (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
1469 globs['sc'].stop()
1470 if failure_count:
1471 exit(-1)
1472
1473
1474 if __name__ == "__main__":
1475 _test()
1476