#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from collections import namedtuple
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq
from random import Random

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable

from py4j.java_collections import ListConverter, MapConverter

__all__ = ["RDD"]
50 """
51 This function returns the traceback info for a callsite, returns a dict
52 with function name, file name and line number
53 """
54 tb = traceback.extract_stack()
55 callsite = namedtuple("Callsite", "function file linenum")
56 if len(tb) == 0:
57 return None
58 file, line, module, what = tb[len(tb) - 1]
59 sparkpath = os.path.dirname(file)
60 first_spark_frame = len(tb) - 1
61 for i in range(0, len(tb)):
62 file, line, fun, what = tb[i]
63 if file.startswith(sparkpath):
64 first_spark_frame = i
65 break
66 if first_spark_frame == 0:
67 file, line, fun, what = tb[0]
68 return callsite(function=fun, file=file, linenum=line)
69 sfile, sline, sfun, swhat = tb[first_spark_frame]
70 ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
71 return callsite(function=sfun, file=ufile, linenum=uline)
72
73 _spark_stack_depth = 0
77 tb = _extract_concise_traceback()
78 if tb is not None:
79 self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
80 else:
81 self._traceback = "Error! Could not extract traceback info"
82 self._context = sc
83
89
95
97 """
98 An implementation of MaxHeap.
99 >>> import pyspark.rdd
100 >>> heap = pyspark.rdd.MaxHeapQ(5)
101 >>> [heap.insert(i) for i in range(10)]
102 [None, None, None, None, None, None, None, None, None, None]
103 >>> sorted(heap.getElements())
104 [0, 1, 2, 3, 4]
105 >>> heap = pyspark.rdd.MaxHeapQ(5)
106 >>> [heap.insert(i) for i in range(9, -1, -1)]
107 [None, None, None, None, None, None, None, None, None, None]
108 >>> sorted(heap.getElements())
109 [0, 1, 2, 3, 4]
110 >>> heap = pyspark.rdd.MaxHeapQ(1)
111 >>> [heap.insert(i) for i in range(9, -1, -1)]
112 [None, None, None, None, None, None, None, None, None, None]
113 >>> heap.getElements()
114 [0]
115 """
116
118
119 self.q = [0]
120 self.maxsize = maxsize
121
123 while (k > 1) and (self.q[k/2] < self.q[k]):
124 self._swap(k, k/2)
125 k = k/2
126
128 t = self.q[i]
129 self.q[i] = self.q[j]
130 self.q[j] = t
131
133 N = self.size()
134 while 2 * k <= N:
135 j = 2 * k
136
137
138 if j < N and self.q[j] < self.q[j + 1]:
139 j = j + 1
140 if(self.q[k] > self.q[j]):
141 break
142 self._swap(k, j)
143 k = j
144
146 return len(self.q) - 1
147
149 if (self.size()) < self.maxsize:
150 self.q.append(value)
151 self._swim(self.size())
152 else:
153 self._replaceRoot(value)
154
157
159 if(self.q[1] > value):
160 self.q[1] = value
161 self._sink(1)
162
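# Illustrative sketch (assumes comparable items; not from the original source):
# MaxHeapQ is the bounded heap behind RDD.takeOrdered() below -- each partition
# keeps only the `num` smallest items seen so far:
#
#   q = MaxHeapQ(3)
#   for x in [7, 1, 9, 4, 2]:
#       q.insert(x)
#   q.getElements()   # some ordering of [1, 2, 4]
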
164 """
165 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
166 Represents an immutable, partitioned collection of elements that can be
167 operated on in parallel.
168 """
169
170 - def __init__(self, jrdd, ctx, jrdd_deserializer):
171 self._jrdd = jrdd
172 self.is_cached = False
173 self.is_checkpointed = False
174 self.ctx = ctx
175 self._jrdd_deserializer = jrdd_deserializer
176 self._id = jrdd.id()
177
179 """
180 A unique ID for this RDD (within its SparkContext).
181 """
182 return self._id
183
185 return self._jrdd.toString()
186
187 @property
189 """
190 The L{SparkContext} that this RDD was created on.
191 """
192 return self.ctx
193
195 """
196 Persist this RDD with the default storage level (C{MEMORY_ONLY}).
197 """
198 self.is_cached = True
199 self._jrdd.cache()
200 return self
201
203 """
204 Set this RDD's storage level to persist its values across operations after the first time
205 it is computed. This can only be used to assign a new storage level if the RDD does not
206 have a storage level set yet.
207 """
208 self.is_cached = True
209 javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
210 self._jrdd.persist(javaStorageLevel)
211 return self
212
214 """
215 Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
216 """
217 self.is_cached = False
218 self._jrdd.unpersist()
219 return self
220
222 """
223 Mark this RDD for checkpointing. It will be saved to a file inside the
224 checkpoint directory set with L{SparkContext.setCheckpointDir()} and
225 all references to its parent RDDs will be removed. This function must
226 be called before any job has been executed on this RDD. It is strongly
227 recommended that this RDD is persisted in memory, otherwise saving it
228 on a file will require recomputation.
229 """
230 self.is_checkpointed = True
231 self._jrdd.rdd().checkpoint()
232
234 """
235 Return whether this RDD has been checkpointed or not
236 """
237 return self._jrdd.rdd().isCheckpointed()
238
240 """
241 Gets the name of the file to which this RDD was checkpointed
242 """
243 checkpointFile = self._jrdd.rdd().getCheckpointFile()
244 if checkpointFile.isDefined():
245 return checkpointFile.get()
246 else:
247 return None
248
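    # Illustrative usage sketch (assumes a live SparkContext `sc`; the file
    # name is hypothetical): choosing an explicit storage level instead of the
    # MEMORY_ONLY default that cache() applies.
    #
    #   lines = sc.textFile("data.txt")
    #   lines.persist(StorageLevel.MEMORY_AND_DISK)
    #   lines.count()      # first action materializes and caches the RDD
    #   lines.unpersist()  # drop the cached blocks again
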
    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.

        >>> rdd = sc.parallelize(["b", "a", "c"])
        >>> sorted(rdd.map(lambda x: (x, 1)).collect())
        [('a', 1), ('b', 1), ('c', 1)]
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

275 """
276 Return a new RDD by applying a function to each partition of this RDD.
277
278 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
279 >>> def f(iterator): yield sum(iterator)
280 >>> rdd.mapPartitions(f).collect()
281 [3, 7]
282 """
283 def func(s, iterator): return f(iterator)
284 return self.mapPartitionsWithIndex(func)
285
287 """
288 Return a new RDD by applying a function to each partition of this RDD,
289 while tracking the index of the original partition.
290
291 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
292 >>> def f(splitIndex, iterator): yield splitIndex
293 >>> rdd.mapPartitionsWithIndex(f).sum()
294 6
295 """
296 return PipelinedRDD(self, f, preservesPartitioning)
297
299 """
300 Deprecated: use mapPartitionsWithIndex instead.
301
302 Return a new RDD by applying a function to each partition of this RDD,
303 while tracking the index of the original partition.
304
305 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
306 >>> def f(splitIndex, iterator): yield splitIndex
307 >>> rdd.mapPartitionsWithSplit(f).sum()
308 6
309 """
310 warnings.warn("mapPartitionsWithSplit is deprecated; "
311 "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
312 return self.mapPartitionsWithIndex(f, preservesPartitioning)
313
315 """
316 Return a new RDD containing only the elements that satisfy a predicate.
317
318 >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
319 >>> rdd.filter(lambda x: x % 2 == 0).collect()
320 [2, 4]
321 """
322 def func(iterator): return ifilter(f, iterator)
323 return self.mapPartitions(func)
324
326 """
327 Return a new RDD containing the distinct elements in this RDD.
328
329 >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
330 [1, 2, 3]
331 """
332 return self.map(lambda x: (x, None)) \
333 .reduceByKey(lambda x, _: x) \
334 .map(lambda (x, _): x)
335
    def sample(self, withReplacement, fraction, seed=None):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on the default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    def takeSample(self, withReplacement, num, seed=None):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """

        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if (num < 0):
            raise ValueError

        if (initialCount == 0):
            return list()

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the first sample didn't turn out large enough, keep trying to
        # take samples; this shouldn't happen often because we use a big
        # multiplier for the initial sample size.
        rand = Random(seed)
        while len(samples) < total:
            samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect()

        sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint))
        sampler.shuffle(samples)
        return samples[0:total]

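    # Worked example of the oversampling above (illustrative numbers, not from
    # the original source): with num = 10, initialCount = 1000 and
    # multiplier = 3.0,
    #
    #   fraction = 3.0 * (10 + 1) / 1000 = 0.033
    #
    # so roughly 33 elements are sampled, which almost always covers the 10
    # requested; the while-loop only re-samples in the rare undershoot case.
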
394 """
395 Return the union of this RDD and another one.
396
397 >>> rdd = sc.parallelize([1, 1, 2, 3])
398 >>> rdd.union(rdd).collect()
399 [1, 1, 2, 3, 1, 1, 2, 3]
400 """
401 if self._jrdd_deserializer == other._jrdd_deserializer:
402 rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
403 self._jrdd_deserializer)
404 return rdd
405 else:
406
407
408 self_copy = self._reserialize()
409 other_copy = other._reserialize()
410 return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
411 self.ctx.serializer)
412
414 """
415 Return the intersection of this RDD and another one. The output will not
416 contain any duplicate elements, even if the input RDDs did.
417
418 Note that this method performs a shuffle internally.
419
420 >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
421 >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
422 >>> rdd1.intersection(rdd2).collect()
423 [1, 2, 3]
424 """
425 return self.map(lambda v: (v, None)) \
426 .cogroup(other.map(lambda v: (v, None))) \
427 .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
428 .keys()
429
431 if self._jrdd_deserializer == self.ctx.serializer:
432 return self
433 else:
434 return self.map(lambda x: x, preservesPartitioning=True)
435
437 """
438 Return the union of this RDD and another one.
439
440 >>> rdd = sc.parallelize([1, 1, 2, 3])
441 >>> (rdd + rdd).collect()
442 [1, 1, 2, 3, 1, 1, 2, 3]
443 """
444 if not isinstance(other, RDD):
445 raise TypeError
446 return self.union(other)
447
    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()

        # When sorting into more than one partition, first estimate the key
        # boundaries of each partition by sampling, so that every partition
        # receives roughly the same number of (key, value) pairs.
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)

            # numPartitions - 1 boundary keys split the sampled key space into
            # numPartitions ranges.
            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                .mapPartitions(mapFunc, preservesPartitioning=True)
                .flatMap(lambda x: x, preservesPartitioning=True))

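    # Sketch of the range partitioning above (illustrative numbers, not from
    # the original source): with numPartitions = 3 and sampled, sorted keys
    # ['a', 'c', 'f', 'k', 'p', 'z'], the chosen bounds are ['c', 'k'], so
    # rangePartitionFunc sends keys <= 'c' to partition 0, keys in ('c', 'k']
    # to partition 1, and the rest to partition 2; mapFunc then sorts each
    # partition locally.
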
499 """
500 Return an RDD created by coalescing all elements within each partition
501 into a list.
502
503 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
504 >>> sorted(rdd.glom().collect())
505 [[1, 2], [3, 4]]
506 """
507 def func(iterator): yield list(iterator)
508 return self.mapPartitions(func)
509
511 """
512 Return the Cartesian product of this RDD and another one, that is, the
513 RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
514 C{b} is in C{other}.
515
516 >>> rdd = sc.parallelize([1, 2])
517 >>> sorted(rdd.cartesian(rdd).collect())
518 [(1, 1), (1, 2), (2, 1), (2, 2)]
519 """
520
521 deserializer = CartesianDeserializer(self._jrdd_deserializer,
522 other._jrdd_deserializer)
523 return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
524
    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
        ['1', '2', '', '3']
        """
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
        return self.mapPartitions(func)

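    # Illustrative usage sketch (the command shown is an example, not from the
    # original source): each element is written to the subprocess's stdin as a
    # line, and each output line becomes an element of the result.
    #
    #   sc.parallelize(['apple', 'banana', 'cherry']).pipe('grep an').collect()
    #   # -> ['banana']
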
554 """
555 Applies a function to all elements of this RDD.
556
557 >>> def f(x): print x
558 >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
559 """
560 def processPartition(iterator):
561 for x in iterator:
562 f(x)
563 yield None
564 self.mapPartitions(processPartition).collect()
565
567 """
568 Applies a function to each partition of this RDD.
569
570 >>> def f(iterator):
571 ... for x in iterator:
572 ... print x
573 ... yield None
574 >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
575 """
576 self.mapPartitions(f).collect()
577
579 """
580 Return a list that contains all of the elements in this RDD.
581 """
582 with _JavaStackTrace(self.context) as st:
583 bytesInJava = self._jrdd.collect().iterator()
584 return list(self._collect_iterator_through_file(bytesInJava))
585
587
588
589
590 tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
591 tempFile.close()
592 self.ctx._writeToFile(iterator, tempFile.name)
593
594 with open(tempFile.name, 'rb') as tempFile:
595 for item in self._jrdd_deserializer.load_stream(tempFile):
596 yield item
597 os.unlink(tempFile.name)
598
600 """
601 Reduces the elements of this RDD using the specified commutative and
602 associative binary operator. Currently reduces partitions locally.
603
604 >>> from operator import add
605 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
606 15
607 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
608 10
609 """
610 def func(iterator):
611 acc = None
612 for obj in iterator:
613 if acc is None:
614 acc = obj
615 else:
616 acc = f(obj, acc)
617 if acc is not None:
618 yield acc
619 vals = self.mapPartitions(func).collect()
620 return reduce(f, vals)
621
    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def aggregate(self, zeroValue, seqOp, combOp):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using the given combine functions and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        The first function (seqOp) can return a different result type, U, than
        the type of this RDD. Thus, we need one operation for merging a T into
        a U and one operation for merging two U's.

        >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
        >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
        >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
        (10, 4)
        >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
        (0, 0)
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = seqOp(acc, obj)
            yield acc

        return self.mapPartitions(func).fold(zeroValue, combOp)

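    # Worked sketch of aggregate() (illustrative, not from the original
    # source): computing a mean via a (sum, count) accumulator.
    #
    #   sum_count = sc.parallelize([1, 2, 3, 4]).aggregate(
    #       (0, 0),
    #       lambda acc, x: (acc[0] + x, acc[1] + 1),   # seqOp: fold in one value
    #       lambda a, b: (a[0] + b[0], a[1] + b[1]))   # combOp: merge partitions
    #   mean = float(sum_count[0]) / sum_count[1]      # 2.5
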
675 """
676 Find the maximum item in this RDD.
677
678 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
679 43.0
680 """
681 return self.reduce(max)
682
684 """
685 Find the maximum item in this RDD.
686
687 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
688 1.0
689 """
690 return self.reduce(min)
691
693 """
694 Add up the elements in this RDD.
695
696 >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
697 6.0
698 """
699 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
700
702 """
703 Return the number of elements in this RDD.
704
705 >>> sc.parallelize([2, 3, 4]).count()
706 3
707 """
708 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
709
711 """
712 Return a L{StatCounter} object that captures the mean, variance
713 and count of the RDD's elements in one operation.
714 """
715 def redFunc(left_counter, right_counter):
716 return left_counter.mergeStats(right_counter)
717
718 return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
719
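    # Illustrative sketch (assumes a live SparkContext `sc`): a single pass
    # over the data yields all of the summary statistics that the methods
    # below expose individually.
    #
    #   s = sc.parallelize([1, 2, 3]).stats()
    #   s.mean(), s.variance(), s.stdev(), s.count()
    #   # -> (2.0, 0.666..., 0.816..., 3)
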
721 """
722 Compute the mean of this RDD's elements.
723
724 >>> sc.parallelize([1, 2, 3]).mean()
725 2.0
726 """
727 return self.stats().mean()
728
730 """
731 Compute the variance of this RDD's elements.
732
733 >>> sc.parallelize([1, 2, 3]).variance()
734 0.666...
735 """
736 return self.stats().variance()
737
739 """
740 Compute the standard deviation of this RDD's elements.
741
742 >>> sc.parallelize([1, 2, 3]).stdev()
743 0.816...
744 """
745 return self.stats().stdev()
746
748 """
749 Compute the sample standard deviation of this RDD's elements (which corrects for bias in
750 estimating the standard deviation by dividing by N-1 instead of N).
751
752 >>> sc.parallelize([1, 2, 3]).sampleStdev()
753 1.0
754 """
755 return self.stats().sampleStdev()
756
758 """
759 Compute the sample variance of this RDD's elements (which corrects for bias in
760 estimating the variance by dividing by N-1 instead of N).
761
762 >>> sc.parallelize([1, 2, 3]).sampleVariance()
763 1.0
764 """
765 return self.stats().sampleVariance()
766
768 """
769 Return the count of each unique value in this RDD as a dictionary of
770 (value, count) pairs.
771
772 >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
773 [(1, 2), (2, 3)]
774 """
775 def countPartition(iterator):
776 counts = defaultdict(int)
777 for obj in iterator:
778 counts[obj] += 1
779 yield counts
780 def mergeMaps(m1, m2):
781 for (k, v) in m2.iteritems():
782 m1[k] += v
783 return m1
784 return self.mapPartitions(countPartition).reduce(mergeMaps)
785
    def top(self, num):
        """
        Get the top N elements from an RDD.

        Note: It returns the list sorted in descending order.
        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
        [6, 5]
        """
        def topIterator(iterator):
            q = []
            for k in iterator:
                if len(q) < num:
                    heapq.heappush(q, k)
                else:
                    heapq.heappushpop(q, k)
            yield q

        def merge(a, b):
            return next(topIterator(a + b))

        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)

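    # Sketch of the bounded selection above (illustrative, not from the
    # original source): each partition keeps only its `num` largest items in a
    # small min-heap, so merging partitions stays cheap. For top(2) over
    # [10, 4, 2, 12, 3] the heap evolves as
    #
    #   [10] -> [4, 10] -> [4, 10] -> [10, 12] -> [10, 12]
    #
    # and the final sorted(..., reverse=True) yields [12, 10].
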
811 """
812 Get the N elements from a RDD ordered in ascending order or as specified
813 by the optional key function.
814
815 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
816 [1, 2, 3, 4, 5, 6]
817 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
818 [10, 9, 7, 6, 5, 4]
819 """
820
821 def topNKeyedElems(iterator, key_=None):
822 q = MaxHeapQ(num)
823 for k in iterator:
824 if key_ != None:
825 k = (key_(k), k)
826 q.insert(k)
827 yield q.getElements()
828
829 def unKey(x, key_=None):
830 if key_ != None:
831 x = [i[1] for i in x]
832 return x
833
834 def merge(a, b):
835 return next(topNKeyedElems(a + b))
836 result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
837 return sorted(unKey(result, key), key=key)
838
839
    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1

        mapped = self.mapPartitions(takeUpToNum)
        items = []
        # Scan the partitions one at a time, collecting each fully, until
        # enough items have been gathered.
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

875 """
876 Return the first element in this RDD.
877
878 >>> sc.parallelize([2, 3, 4]).first()
879 2
880 """
881 return self.take(1)[0]
882
    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'

        Empty lines are tolerated when saving to text files.

        >>> tempFile2 = NamedTemporaryFile(delete=True)
        >>> tempFile2.close()
        >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
        >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
        '\\n\\n\\nbar\\nfoo\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

915 """
916 Return the key-value pairs in this RDD to the master as a dictionary.
917
918 >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
919 >>> m[1]
920 2
921 >>> m[3]
922 4
923 """
924 return dict(self.collect())
925
927 """
928 Return an RDD with the keys of each tuple.
929 >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
930 >>> m.collect()
931 [1, 3]
932 """
933 return self.map(lambda (k, v): k)
934
936 """
937 Return an RDD with the values of each tuple.
938 >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
939 >>> m.collect()
940 [2, 4]
941 """
942 return self.map(lambda (k, v): v)
943
945 """
946 Merge the values for each key using an associative reduce function.
947
948 This will also perform the merging locally on each mapper before
949 sending results to a reducer, similarly to a "combiner" in MapReduce.
950
951 Output will be hash-partitioned with C{numPartitions} partitions, or
952 the default parallelism level if C{numPartitions} is not specified.
953
954 >>> from operator import add
955 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
956 >>> sorted(rdd.reduceByKey(add).collect())
957 [('a', 2), ('b', 1)]
958 """
959 return self.combineByKey(lambda x: x, func, func, numPartitions)
960
962 """
963 Merge the values for each key using an associative reduce function, but
964 return the results immediately to the master as a dictionary.
965
966 This will also perform the merging locally on each mapper before
967 sending results to a reducer, similarly to a "combiner" in MapReduce.
968
969 >>> from operator import add
970 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
971 >>> sorted(rdd.reduceByKeyLocally(add).items())
972 [('a', 2), ('b', 1)]
973 """
974 def reducePartition(iterator):
975 m = {}
976 for (k, v) in iterator:
977 m[k] = v if k not in m else func(m[k], v)
978 yield m
979 def mergeMaps(m1, m2):
980 for (k, v) in m2.iteritems():
981 m1[k] = v if k not in m1 else func(m1[k], v)
982 return m1
983 return self.mapPartitions(reducePartition).reduce(mergeMaps)
984
986 """
987 Count the number of elements for each key, and return the result to the
988 master as a dictionary.
989
990 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
991 >>> sorted(rdd.countByKey().items())
992 [('a', 2), ('b', 1)]
993 """
994 return self.map(lambda x: x[0]).countByValue()
995
    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

1014 """
1015 Perform a left outer join of C{self} and C{other}.
1016
1017 For each element (k, v) in C{self}, the resulting RDD will either
1018 contain all pairs (k, (v, w)) for w in C{other}, or the pair
1019 (k, (v, None)) if no elements in other have key k.
1020
1021 Hash-partitions the resulting RDD into the given number of partitions.
1022
1023 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1024 >>> y = sc.parallelize([("a", 2)])
1025 >>> sorted(x.leftOuterJoin(y).collect())
1026 [('a', (1, 2)), ('b', (4, None))]
1027 """
1028 return python_left_outer_join(self, other, numPartitions)
1029
1031 """
1032 Perform a right outer join of C{self} and C{other}.
1033
1034 For each element (k, w) in C{other}, the resulting RDD will either
1035 contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
1036 if no elements in C{self} have key k.
1037
1038 Hash-partitions the resulting RDD into the given number of partitions.
1039
1040 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1041 >>> y = sc.parallelize([("a", 2)])
1042 >>> sorted(y.rightOuterJoin(x).collect())
1043 [('a', (2, 1)), ('b', (None, 4))]
1044 """
1045 return python_right_outer_join(self, other, numPartitions)
1046
1047
    def partitionBy(self, numPartitions, partitionFunc=None):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        if partitionFunc is None:
            partitionFunc = lambda x: 0 if x is None else hash(x)

        # Transferring O(n) objects to Java is too expensive.  Instead, form
        # the hash buckets in Python, transferring only O(numPartitions)
        # objects to Java.  Each object is a (splitNumber, [objects]) pair.
        outputSerializer = self.ctx._unbatched_serializer

        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)

            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
            jrdd = pairRDD.partitionBy(partitioner).values()
            rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
        # This is required so that id(partitionFunc) remains unique, even if
        # partitionFunc is a lambda:
        rdd._partitionFunc = partitionFunc
        return rdd

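    # Sketch of the stream produced by add_shuffle_key above (descriptive,
    # not from the original source): the output alternates a packed 64-bit
    # partition id with a pickled batch of pairs, e.g. for two partitions
    #
    #   pack_long(0), dumps([(k1, v1), (k3, v3)]),
    #   pack_long(1), dumps([(k2, v2)])
    #
    # which the JVM-side PairwiseRDD reassembles into (partition, bytes) pairs
    # before the actual shuffle.
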
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C.  Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)

        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)

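    # Illustrative usage sketch (not from the original source): building the
    # full list of values per key, i.e. a hand-rolled groupByKey().
    #
    #   pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
    #   pairs.combineByKey(lambda v: [v],
    #                      lambda c, v: c + [v],
    #                      lambda c1, c2: c1 + c2).collect()
    #   # -> [('a', [1, 3]), ('b', [2])] in some order
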
    def foldByKey(self, zeroValue, func, numPartitions=None):
        """
        Merge the values for each key using an associative function "func"
        and a neutral "zeroValue" which may be added to the result an
        arbitrary number of times, and must not change the result
        (e.g., 0 for addition, or 1 for multiplication).

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> from operator import add
        >>> rdd.foldByKey(0, add).collect()
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)

1154 """
1155 Group the values for each key in the RDD into a single sequence.
1156 Hash-partitions the resulting RDD with into numPartitions partitions.
1157
1158 Note: If you are grouping in order to perform an aggregation (such as a
1159 sum or average) over each key, using reduceByKey will provide much better
1160 performance.
1161
1162 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1163 >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
1164 [('a', [1, 1]), ('b', [1])]
1165 """
1166
1167 def createCombiner(x):
1168 return [x]
1169
1170 def mergeValue(xs, x):
1171 xs.append(x)
1172 return xs
1173
1174 def mergeCombiners(a, b):
1175 return a + b
1176
1177 return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
1178 numPartitions).mapValues(lambda x: ResultIterable(x))
1179
1180
1182 """
1183 Pass each value in the key-value pair RDD through a flatMap function
1184 without changing the keys; this also retains the original RDD's
1185 partitioning.
1186
1187 >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
1188 >>> def f(x): return x
1189 >>> x.flatMapValues(f).collect()
1190 [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
1191 """
1192 flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
1193 return self.flatMap(flat_map_fn, preservesPartitioning=True)
1194
1196 """
1197 Pass each value in the key-value pair RDD through a map function
1198 without changing the keys; this also retains the original RDD's
1199 partitioning.
1200
1201 >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
1202 >>> def f(x): return len(x)
1203 >>> x.mapValues(f).collect()
1204 [('a', 3), ('b', 1)]
1205 """
1206 map_values_fn = lambda (k, v): (k, f(v))
1207 return self.map(map_values_fn, preservesPartitioning=True)
1208
1209
1211 """
1212 Alias for cogroup.
1213 """
1214 return self.cogroup(other)
1215
1216
    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

1231 """
1232 Return each (key, value) pair in C{self} that has no pair with matching key
1233 in C{other}.
1234
1235 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
1236 >>> y = sc.parallelize([("a", 3), ("c", None)])
1237 >>> sorted(x.subtractByKey(y).collect())
1238 [('b', 4), ('b', 5)]
1239 """
1240 filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
1241 map_func = lambda (key, vals): [(key, val) for val in vals[0]]
1242 return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
1243
    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        rdd = other.map(lambda x: (x, True))  # note: here 'True' is just a placeholder
        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])

1257 """
1258 Creates tuples of the elements in this RDD by applying C{f}.
1259
1260 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
1261 >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
1262 >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
1263 [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
1264 """
1265 return self.map(lambda x: (f(x), x))
1266
1268 """
1269 Return a new RDD that has exactly numPartitions partitions.
1270
1271 Can increase or decrease the level of parallelism in this RDD. Internally, this uses
1272 a shuffle to redistribute data.
1273 If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
1274 which can avoid performing a shuffle.
1275 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
1276 >>> sorted(rdd.glom().collect())
1277 [[1], [2, 3], [4, 5], [6, 7]]
1278 >>> len(rdd.repartition(2).glom().collect())
1279 2
1280 >>> len(rdd.repartition(10).glom().collect())
1281 10
1282 """
1283 jrdd = self._jrdd.repartition(numPartitions)
1284 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1285
    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD that is reduced into `numPartitions` partitions.

        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        """
        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def zip(self, other):
        """
        Zips this RDD with another one, returning key-value pairs where each
        pair holds the n-th element of this RDD and the n-th element of the
        other RDD. Assumes that the two RDDs have the same number of
        partitions and the same number of elements in each partition (e.g.
        one was made through a map on the other).

        >>> x = sc.parallelize(range(0,5))
        >>> y = sc.parallelize(range(1000, 1005))
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
        """
        pairRDD = self._jrdd.zip(other._jrdd)
        deserializer = PairDeserializer(self._jrdd_deserializer,
                                        other._jrdd_deserializer)
        return RDD(pairRDD, self.ctx, deserializer)

1315 """
1316 Return the name of this RDD.
1317 """
1318 name_ = self._jrdd.name()
1319 if not name_:
1320 return None
1321 return name_.encode('utf-8')
1322
1324 """
1325 Assign a name to this RDD.
1326 >>> rdd1 = sc.parallelize([1,2])
1327 >>> rdd1.setName('RDD1')
1328 >>> rdd1.name()
1329 'RDD1'
1330 """
1331 self._jrdd.setName(name)
1332
1334 """
1335 A description of this RDD and its recursive dependencies for debugging.
1336 """
1337 debug_string = self._jrdd.toDebugString()
1338 if not debug_string:
1339 return None
1340 return debug_string.encode('utf-8')
1341
1343 """
1344 Get the RDD's current storage level.
1345 >>> rdd1 = sc.parallelize([1,2])
1346 >>> rdd1.getStorageLevel()
1347 StorageLevel(False, False, False, False, 1)
1348 """
1349 java_storage_level = self._jrdd.getStorageLevel()
1350 storage_level = StorageLevel(java_storage_level.useDisk(),
1351 java_storage_level.useMemory(),
1352 java_storage_level.useOffHeap(),
1353 java_storage_level.deserialized(),
1354 java_storage_level.replication())
1355 return storage_level
1356
1363 """
1364 Pipelined maps:
1365 >>> rdd = sc.parallelize([1, 2, 3, 4])
1366 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
1367 [4, 8, 12, 16]
1368 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
1369 [4, 8, 12, 16]
1370
1371 Pipelined reduces:
1372 >>> from operator import add
1373 >>> rdd.map(lambda x: 2 * x).reduce(add)
1374 20
1375 >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
1376 20
1377 """
1378 - def __init__(self, prev, func, preservesPartitioning=False):
1379 if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
1380
1381 self.func = func
1382 self.preservesPartitioning = preservesPartitioning
1383 self._prev_jrdd = prev._jrdd
1384 self._prev_jrdd_deserializer = prev._jrdd_deserializer
1385 else:
1386 prev_func = prev.func
1387 def pipeline_func(split, iterator):
1388 return func(split, prev_func(split, iterator))
1389 self.func = pipeline_func
1390 self.preservesPartitioning = \
1391 prev.preservesPartitioning and preservesPartitioning
1392 self._prev_jrdd = prev._prev_jrdd
1393 self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
1394 self.is_cached = False
1395 self.is_checkpointed = False
1396 self.ctx = prev.ctx
1397 self.prev = prev
1398 self._jrdd_val = None
1399 self._jrdd_deserializer = self.ctx.serializer
1400 self._bypass_serializer = False
1401
    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
                                             bytearray(pickled_command), env,
                                             includes, self.preservesPartitioning,
                                             self.ctx.pythonExec, broadcast_vars,
                                             self.ctx._javaAccumulator, class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)

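    # Sketch of what pipelining buys (illustrative, not from the original
    # source): chaining narrow transformations composes their functions into a
    # single PipelinedRDD, so each partition is traversed only once.
    #
    #   doubled = rdd.map(lambda x: 2 * x)         # PipelinedRDD over rdd
    #   clipped = doubled.filter(lambda x: x < 8)  # still one PipelinedRDD;
    #                                              # its func applies map, then filter
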

def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()