Package pyspark :: Module rdd
[frames] | no frames]

Source Code for Module pyspark.rdd

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one or more 
   3  # contributor license agreements.  See the NOTICE file distributed with 
   4  # this work for additional information regarding copyright ownership. 
   5  # The ASF licenses this file to You under the Apache License, Version 2.0 
   6  # (the "License"); you may not use this file except in compliance with 
   7  # the License.  You may obtain a copy of the License at 
   8  # 
   9  #    http://www.apache.org/licenses/LICENSE-2.0 
  10  # 
  11  # Unless required by applicable law or agreed to in writing, software 
  12  # distributed under the License is distributed on an "AS IS" BASIS, 
  13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  14  # See the License for the specific language governing permissions and 
  15  # limitations under the License. 
  16  # 
  17   
  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from collections import namedtuple 
  22  from itertools import chain, ifilter, imap 
  23  import operator 
  24  import os 
  25  import sys 
  26  import shlex 
  27  import traceback 
  28  from subprocess import Popen, PIPE 
  29  from tempfile import NamedTemporaryFile 
  30  from threading import Thread 
  31  import warnings 
  32  import heapq 
  33  from random import Random 
  34   
  35  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  36      BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long 
  37  from pyspark.join import python_join, python_left_outer_join, \ 
  38      python_right_outer_join, python_cogroup 
  39  from pyspark.statcounter import StatCounter 
  40  from pyspark.rddsampler import RDDSampler 
  41  from pyspark.storagelevel import StorageLevel 
  42  from pyspark.resultiterable import ResultIterable 
  43   
  44  from py4j.java_collections import ListConverter, MapConverter 
  45   
  46  __all__ = ["RDD"] 
47 48 49 -def _extract_concise_traceback():
50 """ 51 This function returns the traceback info for a callsite, returns a dict 52 with function name, file name and line number 53 """ 54 tb = traceback.extract_stack() 55 callsite = namedtuple("Callsite", "function file linenum") 56 if len(tb) == 0: 57 return None 58 file, line, module, what = tb[len(tb) - 1] 59 sparkpath = os.path.dirname(file) 60 first_spark_frame = len(tb) - 1 61 for i in range(0, len(tb)): 62 file, line, fun, what = tb[i] 63 if file.startswith(sparkpath): 64 first_spark_frame = i 65 break 66 if first_spark_frame == 0: 67 file, line, fun, what = tb[0] 68 return callsite(function=fun, file=file, linenum=line) 69 sfile, sline, sfun, swhat = tb[first_spark_frame] 70 ufile, uline, ufun, uwhat = tb[first_spark_frame-1] 71 return callsite(function=sfun, file=ufile, linenum=uline)
72 73 _spark_stack_depth = 0
74 75 -class _JavaStackTrace(object):
76 - def __init__(self, sc):
77 tb = _extract_concise_traceback() 78 if tb is not None: 79 self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum) 80 else: 81 self._traceback = "Error! Could not extract traceback info" 82 self._context = sc
83
84 - def __enter__(self):
85 global _spark_stack_depth 86 if _spark_stack_depth == 0: 87 self._context._jsc.setCallSite(self._traceback) 88 _spark_stack_depth += 1
89
90 - def __exit__(self, type, value, tb):
91 global _spark_stack_depth 92 _spark_stack_depth -= 1 93 if _spark_stack_depth == 0: 94 self._context._jsc.setCallSite(None)
95
96 -class MaxHeapQ(object):
97 """ 98 An implementation of MaxHeap. 99 >>> import pyspark.rdd 100 >>> heap = pyspark.rdd.MaxHeapQ(5) 101 >>> [heap.insert(i) for i in range(10)] 102 [None, None, None, None, None, None, None, None, None, None] 103 >>> sorted(heap.getElements()) 104 [0, 1, 2, 3, 4] 105 >>> heap = pyspark.rdd.MaxHeapQ(5) 106 >>> [heap.insert(i) for i in range(9, -1, -1)] 107 [None, None, None, None, None, None, None, None, None, None] 108 >>> sorted(heap.getElements()) 109 [0, 1, 2, 3, 4] 110 >>> heap = pyspark.rdd.MaxHeapQ(1) 111 >>> [heap.insert(i) for i in range(9, -1, -1)] 112 [None, None, None, None, None, None, None, None, None, None] 113 >>> heap.getElements() 114 [0] 115 """ 116
117 - def __init__(self, maxsize):
118 # we start from q[1], this makes calculating children as trivial as 2 * k 119 self.q = [0] 120 self.maxsize = maxsize
121
122 - def _swim(self, k):
123 while (k > 1) and (self.q[k/2] < self.q[k]): 124 self._swap(k, k/2) 125 k = k/2
126
127 - def _swap(self, i, j):
128 t = self.q[i] 129 self.q[i] = self.q[j] 130 self.q[j] = t
131
132 - def _sink(self, k):
133 N = self.size() 134 while 2 * k <= N: 135 j = 2 * k 136 # Here we test if both children are greater than parent 137 # if not swap with larger one. 138 if j < N and self.q[j] < self.q[j + 1]: 139 j = j + 1 140 if(self.q[k] > self.q[j]): 141 break 142 self._swap(k, j) 143 k = j
144
145 - def size(self):
146 return len(self.q) - 1
147
148 - def insert(self, value):
149 if (self.size()) < self.maxsize: 150 self.q.append(value) 151 self._swim(self.size()) 152 else: 153 self._replaceRoot(value)
154
155 - def getElements(self):
156 return self.q[1:]
157
158 - def _replaceRoot(self, value):
159 if(self.q[1] > value): 160 self.q[1] = value 161 self._sink(1)
162
163 -class RDD(object):
164 """ 165 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 166 Represents an immutable, partitioned collection of elements that can be 167 operated on in parallel. 168 """ 169
170 - def __init__(self, jrdd, ctx, jrdd_deserializer):
171 self._jrdd = jrdd 172 self.is_cached = False 173 self.is_checkpointed = False 174 self.ctx = ctx 175 self._jrdd_deserializer = jrdd_deserializer 176 self._id = jrdd.id()
177
178 - def id(self):
179 """ 180 A unique ID for this RDD (within its SparkContext). 181 """ 182 return self._id
183
184 - def __repr__(self):
185 return self._jrdd.toString()
186 187 @property
188 - def context(self):
189 """ 190 The L{SparkContext} that this RDD was created on. 191 """ 192 return self.ctx
193
194 - def cache(self):
195 """ 196 Persist this RDD with the default storage level (C{MEMORY_ONLY}). 197 """ 198 self.is_cached = True 199 self._jrdd.cache() 200 return self
201
202 - def persist(self, storageLevel):
203 """ 204 Set this RDD's storage level to persist its values across operations after the first time 205 it is computed. This can only be used to assign a new storage level if the RDD does not 206 have a storage level set yet. 207 """ 208 self.is_cached = True 209 javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) 210 self._jrdd.persist(javaStorageLevel) 211 return self
212
213 - def unpersist(self):
214 """ 215 Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. 216 """ 217 self.is_cached = False 218 self._jrdd.unpersist() 219 return self
220
221 - def checkpoint(self):
222 """ 223 Mark this RDD for checkpointing. It will be saved to a file inside the 224 checkpoint directory set with L{SparkContext.setCheckpointDir()} and 225 all references to its parent RDDs will be removed. This function must 226 be called before any job has been executed on this RDD. It is strongly 227 recommended that this RDD is persisted in memory, otherwise saving it 228 on a file will require recomputation. 229 """ 230 self.is_checkpointed = True 231 self._jrdd.rdd().checkpoint()
232
233 - def isCheckpointed(self):
234 """ 235 Return whether this RDD has been checkpointed or not 236 """ 237 return self._jrdd.rdd().isCheckpointed()
238
239 - def getCheckpointFile(self):
240 """ 241 Gets the name of the file to which this RDD was checkpointed 242 """ 243 checkpointFile = self._jrdd.rdd().getCheckpointFile() 244 if checkpointFile.isDefined(): 245 return checkpointFile.get() 246 else: 247 return None
248
249 - def map(self, f, preservesPartitioning=False):
250 """ 251 Return a new RDD by applying a function to each element of this RDD. 252 253 >>> rdd = sc.parallelize(["b", "a", "c"]) 254 >>> sorted(rdd.map(lambda x: (x, 1)).collect()) 255 [('a', 1), ('b', 1), ('c', 1)] 256 """ 257 def func(split, iterator): return imap(f, iterator) 258 return PipelinedRDD(self, func, preservesPartitioning)
259
260 - def flatMap(self, f, preservesPartitioning=False):
261 """ 262 Return a new RDD by first applying a function to all elements of this 263 RDD, and then flattening the results. 264 265 >>> rdd = sc.parallelize([2, 3, 4]) 266 >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) 267 [1, 1, 1, 2, 2, 3] 268 >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) 269 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] 270 """ 271 def func(s, iterator): return chain.from_iterable(imap(f, iterator)) 272 return self.mapPartitionsWithIndex(func, preservesPartitioning)
273
274 - def mapPartitions(self, f, preservesPartitioning=False):
275 """ 276 Return a new RDD by applying a function to each partition of this RDD. 277 278 >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 279 >>> def f(iterator): yield sum(iterator) 280 >>> rdd.mapPartitions(f).collect() 281 [3, 7] 282 """ 283 def func(s, iterator): return f(iterator) 284 return self.mapPartitionsWithIndex(func)
285
286 - def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
287 """ 288 Return a new RDD by applying a function to each partition of this RDD, 289 while tracking the index of the original partition. 290 291 >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 292 >>> def f(splitIndex, iterator): yield splitIndex 293 >>> rdd.mapPartitionsWithIndex(f).sum() 294 6 295 """ 296 return PipelinedRDD(self, f, preservesPartitioning)
297
298 - def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
299 """ 300 Deprecated: use mapPartitionsWithIndex instead. 301 302 Return a new RDD by applying a function to each partition of this RDD, 303 while tracking the index of the original partition. 304 305 >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 306 >>> def f(splitIndex, iterator): yield splitIndex 307 >>> rdd.mapPartitionsWithSplit(f).sum() 308 6 309 """ 310 warnings.warn("mapPartitionsWithSplit is deprecated; " 311 "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) 312 return self.mapPartitionsWithIndex(f, preservesPartitioning)
313
314 - def filter(self, f):
315 """ 316 Return a new RDD containing only the elements that satisfy a predicate. 317 318 >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 319 >>> rdd.filter(lambda x: x % 2 == 0).collect() 320 [2, 4] 321 """ 322 def func(iterator): return ifilter(f, iterator) 323 return self.mapPartitions(func)
324
325 - def distinct(self):
326 """ 327 Return a new RDD containing the distinct elements in this RDD. 328 329 >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) 330 [1, 2, 3] 331 """ 332 return self.map(lambda x: (x, None)) \ 333 .reduceByKey(lambda x, _: x) \ 334 .map(lambda (x, _): x)
335
336 - def sample(self, withReplacement, fraction, seed=None):
337 """ 338 Return a sampled subset of this RDD (relies on numpy and falls back 339 on default random generator if numpy is unavailable). 340 341 >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP 342 [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] 343 """ 344 assert fraction >= 0.0, "Invalid fraction value: %s" % fraction 345 return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
346 347 # this is ported from scala/spark/RDD.scala
348 - def takeSample(self, withReplacement, num, seed=None):
349 """ 350 Return a fixed-size sampled subset of this RDD (currently requires numpy). 351 352 >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP 353 [4, 2, 1, 8, 2, 7, 0, 4, 1, 4] 354 """ 355 356 fraction = 0.0 357 total = 0 358 multiplier = 3.0 359 initialCount = self.count() 360 maxSelected = 0 361 362 if (num < 0): 363 raise ValueError 364 365 if (initialCount == 0): 366 return list() 367 368 if initialCount > sys.maxint - 1: 369 maxSelected = sys.maxint - 1 370 else: 371 maxSelected = initialCount 372 373 if num > initialCount and not withReplacement: 374 total = maxSelected 375 fraction = multiplier * (maxSelected + 1) / initialCount 376 else: 377 fraction = multiplier * (num + 1) / initialCount 378 total = num 379 380 samples = self.sample(withReplacement, fraction, seed).collect() 381 382 # If the first sample didn't turn out large enough, keep trying to take samples; 383 # this shouldn't happen often because we use a big multiplier for their initial size. 384 # See: scala/spark/RDD.scala 385 rand = Random(seed) 386 while len(samples) < total: 387 samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect() 388 389 sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint)) 390 sampler.shuffle(samples) 391 return samples[0:total]
392
393 - def union(self, other):
394 """ 395 Return the union of this RDD and another one. 396 397 >>> rdd = sc.parallelize([1, 1, 2, 3]) 398 >>> rdd.union(rdd).collect() 399 [1, 1, 2, 3, 1, 1, 2, 3] 400 """ 401 if self._jrdd_deserializer == other._jrdd_deserializer: 402 rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, 403 self._jrdd_deserializer) 404 return rdd 405 else: 406 # These RDDs contain data in different serialized formats, so we 407 # must normalize them to the default serializer. 408 self_copy = self._reserialize() 409 other_copy = other._reserialize() 410 return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, 411 self.ctx.serializer)
412
413 - def intersection(self, other):
414 """ 415 Return the intersection of this RDD and another one. The output will not 416 contain any duplicate elements, even if the input RDDs did. 417 418 Note that this method performs a shuffle internally. 419 420 >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) 421 >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) 422 >>> rdd1.intersection(rdd2).collect() 423 [1, 2, 3] 424 """ 425 return self.map(lambda v: (v, None)) \ 426 .cogroup(other.map(lambda v: (v, None))) \ 427 .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ 428 .keys()
429
430 - def _reserialize(self):
431 if self._jrdd_deserializer == self.ctx.serializer: 432 return self 433 else: 434 return self.map(lambda x: x, preservesPartitioning=True)
435
436 - def __add__(self, other):
437 """ 438 Return the union of this RDD and another one. 439 440 >>> rdd = sc.parallelize([1, 1, 2, 3]) 441 >>> (rdd + rdd).collect() 442 [1, 1, 2, 3, 1, 1, 2, 3] 443 """ 444 if not isinstance(other, RDD): 445 raise TypeError 446 return self.union(other)
447
448 - def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x):
449 """ 450 Sorts this RDD, which is assumed to consist of (key, value) pairs. 451 452 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 453 >>> sc.parallelize(tmp).sortByKey(True, 2).collect() 454 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 455 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] 456 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) 457 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() 458 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] 459 """ 460 if numPartitions is None: 461 numPartitions = self.ctx.defaultParallelism 462 463 bounds = list() 464 465 # first compute the boundary of each part via sampling: we want to partition 466 # the key-space into bins such that the bins have roughly the same 467 # number of (key, value) pairs falling into them 468 if numPartitions > 1: 469 rddSize = self.count() 470 maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner 471 fraction = min(maxSampleSize / max(rddSize, 1), 1.0) 472 473 samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() 474 samples = sorted(samples, reverse=(not ascending), key=keyfunc) 475 476 # we have numPartitions many parts but one of the them has 477 # an implicit boundary 478 for i in range(0, numPartitions - 1): 479 index = (len(samples) - 1) * (i + 1) / numPartitions 480 bounds.append(samples[index]) 481 482 def rangePartitionFunc(k): 483 p = 0 484 while p < len(bounds) and keyfunc(k) > bounds[p]: 485 p += 1 486 if ascending: 487 return p 488 else: 489 return numPartitions-1-p
490 491 def mapFunc(iterator): 492 yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
493 494 return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc) 495 .mapPartitions(mapFunc,preservesPartitioning=True) 496 .flatMap(lambda x: x, preservesPartitioning=True)) 497
498 - def glom(self):
499 """ 500 Return an RDD created by coalescing all elements within each partition 501 into a list. 502 503 >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 504 >>> sorted(rdd.glom().collect()) 505 [[1, 2], [3, 4]] 506 """ 507 def func(iterator): yield list(iterator) 508 return self.mapPartitions(func)
509
510 - def cartesian(self, other):
511 """ 512 Return the Cartesian product of this RDD and another one, that is, the 513 RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and 514 C{b} is in C{other}. 515 516 >>> rdd = sc.parallelize([1, 2]) 517 >>> sorted(rdd.cartesian(rdd).collect()) 518 [(1, 1), (1, 2), (2, 1), (2, 2)] 519 """ 520 # Due to batching, we can't use the Java cartesian method. 521 deserializer = CartesianDeserializer(self._jrdd_deserializer, 522 other._jrdd_deserializer) 523 return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
524
525 - def groupBy(self, f, numPartitions=None):
526 """ 527 Return an RDD of grouped items. 528 529 >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) 530 >>> result = rdd.groupBy(lambda x: x % 2).collect() 531 >>> sorted([(x, sorted(y)) for (x, y) in result]) 532 [(0, [2, 8]), (1, [1, 1, 3, 5])] 533 """ 534 return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
535
536 - def pipe(self, command, env={}):
537 """ 538 Return an RDD created by piping elements to a forked external process. 539 540 >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() 541 ['1', '2', '', '3'] 542 """ 543 def func(iterator): 544 pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 545 def pipe_objs(out): 546 for obj in iterator: 547 out.write(str(obj).rstrip('\n') + '\n') 548 out.close()
549 Thread(target=pipe_objs, args=[pipe.stdin]).start() 550 return (x.rstrip('\n') for x in iter(pipe.stdout.readline, '')) 551 return self.mapPartitions(func) 552
553 - def foreach(self, f):
554 """ 555 Applies a function to all elements of this RDD. 556 557 >>> def f(x): print x 558 >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) 559 """ 560 def processPartition(iterator): 561 for x in iterator: 562 f(x) 563 yield None
564 self.mapPartitions(processPartition).collect() # Force evaluation 565
566 - def foreachPartition(self, f):
567 """ 568 Applies a function to each partition of this RDD. 569 570 >>> def f(iterator): 571 ... for x in iterator: 572 ... print x 573 ... yield None 574 >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) 575 """ 576 self.mapPartitions(f).collect() # Force evaluation
577
578 - def collect(self):
579 """ 580 Return a list that contains all of the elements in this RDD. 581 """ 582 with _JavaStackTrace(self.context) as st: 583 bytesInJava = self._jrdd.collect().iterator() 584 return list(self._collect_iterator_through_file(bytesInJava))
585
586 - def _collect_iterator_through_file(self, iterator):
587 # Transferring lots of data through Py4J can be slow because 588 # socket.readline() is inefficient. Instead, we'll dump the data to a 589 # file and read it back. 590 tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) 591 tempFile.close() 592 self.ctx._writeToFile(iterator, tempFile.name) 593 # Read the data into Python and deserialize it: 594 with open(tempFile.name, 'rb') as tempFile: 595 for item in self._jrdd_deserializer.load_stream(tempFile): 596 yield item 597 os.unlink(tempFile.name)
598
599 - def reduce(self, f):
600 """ 601 Reduces the elements of this RDD using the specified commutative and 602 associative binary operator. Currently reduces partitions locally. 603 604 >>> from operator import add 605 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 606 15 607 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 608 10 609 """ 610 def func(iterator): 611 acc = None 612 for obj in iterator: 613 if acc is None: 614 acc = obj 615 else: 616 acc = f(obj, acc) 617 if acc is not None: 618 yield acc
619 vals = self.mapPartitions(func).collect() 620 return reduce(f, vals) 621
622 - def fold(self, zeroValue, op):
623 """ 624 Aggregate the elements of each partition, and then the results for all 625 the partitions, using a given associative function and a neutral "zero 626 value." 627 628 The function C{op(t1, t2)} is allowed to modify C{t1} and return it 629 as its result value to avoid object allocation; however, it should not 630 modify C{t2}. 631 632 >>> from operator import add 633 >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) 634 15 635 """ 636 def func(iterator): 637 acc = zeroValue 638 for obj in iterator: 639 acc = op(obj, acc) 640 yield acc
641 vals = self.mapPartitions(func).collect() 642 return reduce(op, vals, zeroValue) 643
644 - def aggregate(self, zeroValue, seqOp, combOp):
645 """ 646 Aggregate the elements of each partition, and then the results for all 647 the partitions, using a given combine functions and a neutral "zero 648 value." 649 650 The functions C{op(t1, t2)} is allowed to modify C{t1} and return it 651 as its result value to avoid object allocation; however, it should not 652 modify C{t2}. 653 654 The first function (seqOp) can return a different result type, U, than 655 the type of this RDD. Thus, we need one operation for merging a T into an U 656 and one operation for merging two U 657 658 >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) 659 >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) 660 >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) 661 (10, 4) 662 >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp) 663 (0, 0) 664 """ 665 def func(iterator): 666 acc = zeroValue 667 for obj in iterator: 668 acc = seqOp(acc, obj) 669 yield acc
670 671 return self.mapPartitions(func).fold(zeroValue, combOp) 672 673
674 - def max(self):
675 """ 676 Find the maximum item in this RDD. 677 678 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max() 679 43.0 680 """ 681 return self.reduce(max)
682
683 - def min(self):
684 """ 685 Find the maximum item in this RDD. 686 687 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min() 688 1.0 689 """ 690 return self.reduce(min)
691
692 - def sum(self):
693 """ 694 Add up the elements in this RDD. 695 696 >>> sc.parallelize([1.0, 2.0, 3.0]).sum() 697 6.0 698 """ 699 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
700
701 - def count(self):
702 """ 703 Return the number of elements in this RDD. 704 705 >>> sc.parallelize([2, 3, 4]).count() 706 3 707 """ 708 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
709
710 - def stats(self):
711 """ 712 Return a L{StatCounter} object that captures the mean, variance 713 and count of the RDD's elements in one operation. 714 """ 715 def redFunc(left_counter, right_counter): 716 return left_counter.mergeStats(right_counter)
717 718 return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) 719
720 - def mean(self):
721 """ 722 Compute the mean of this RDD's elements. 723 724 >>> sc.parallelize([1, 2, 3]).mean() 725 2.0 726 """ 727 return self.stats().mean()
728
729 - def variance(self):
730 """ 731 Compute the variance of this RDD's elements. 732 733 >>> sc.parallelize([1, 2, 3]).variance() 734 0.666... 735 """ 736 return self.stats().variance()
737
738 - def stdev(self):
739 """ 740 Compute the standard deviation of this RDD's elements. 741 742 >>> sc.parallelize([1, 2, 3]).stdev() 743 0.816... 744 """ 745 return self.stats().stdev()
746
747 - def sampleStdev(self):
748 """ 749 Compute the sample standard deviation of this RDD's elements (which corrects for bias in 750 estimating the standard deviation by dividing by N-1 instead of N). 751 752 >>> sc.parallelize([1, 2, 3]).sampleStdev() 753 1.0 754 """ 755 return self.stats().sampleStdev()
756
757 - def sampleVariance(self):
758 """ 759 Compute the sample variance of this RDD's elements (which corrects for bias in 760 estimating the variance by dividing by N-1 instead of N). 761 762 >>> sc.parallelize([1, 2, 3]).sampleVariance() 763 1.0 764 """ 765 return self.stats().sampleVariance()
766
767 - def countByValue(self):
768 """ 769 Return the count of each unique value in this RDD as a dictionary of 770 (value, count) pairs. 771 772 >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) 773 [(1, 2), (2, 3)] 774 """ 775 def countPartition(iterator): 776 counts = defaultdict(int) 777 for obj in iterator: 778 counts[obj] += 1 779 yield counts
780 def mergeMaps(m1, m2): 781 for (k, v) in m2.iteritems(): 782 m1[k] += v 783 return m1 784 return self.mapPartitions(countPartition).reduce(mergeMaps) 785
786 - def top(self, num):
787 """ 788 Get the top N elements from a RDD. 789 790 Note: It returns the list sorted in descending order. 791 >>> sc.parallelize([10, 4, 2, 12, 3]).top(1) 792 [12] 793 >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2) 794 [6, 5] 795 """ 796 def topIterator(iterator): 797 q = [] 798 for k in iterator: 799 if len(q) < num: 800 heapq.heappush(q, k) 801 else: 802 heapq.heappushpop(q, k) 803 yield q
804 805 def merge(a, b): 806 return next(topIterator(a + b)) 807 808 return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True) 809
810 - def takeOrdered(self, num, key=None):
811 """ 812 Get the N elements from a RDD ordered in ascending order or as specified 813 by the optional key function. 814 815 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) 816 [1, 2, 3, 4, 5, 6] 817 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) 818 [10, 9, 7, 6, 5, 4] 819 """ 820 821 def topNKeyedElems(iterator, key_=None): 822 q = MaxHeapQ(num) 823 for k in iterator: 824 if key_ != None: 825 k = (key_(k), k) 826 q.insert(k) 827 yield q.getElements()
828 829 def unKey(x, key_=None): 830 if key_ != None: 831 x = [i[1] for i in x] 832 return x 833 834 def merge(a, b): 835 return next(topNKeyedElems(a + b)) 836 result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge) 837 return sorted(unKey(result, key), key=key) 838 839
840 - def take(self, num):
841 """ 842 Take the first num elements of the RDD. 843 844 This currently scans the partitions *one by one*, so it will be slow if 845 a lot of partitions are required. In that case, use L{collect} to get 846 the whole RDD instead. 847 848 >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) 849 [2, 3] 850 >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) 851 [2, 3, 4, 5, 6] 852 """ 853 def takeUpToNum(iterator): 854 taken = 0 855 while taken < num: 856 yield next(iterator) 857 taken += 1
858 # Take only up to num elements from each partition we try 859 mapped = self.mapPartitions(takeUpToNum) 860 items = [] 861 # TODO(shivaram): Similar to the scala implementation, update the take 862 # method to scan multiple splits based on an estimate of how many elements 863 # we have per-split. 864 with _JavaStackTrace(self.context) as st: 865 for partition in range(mapped._jrdd.splits().size()): 866 partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) 867 partitionsToTake[0] = partition 868 iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() 869 items.extend(mapped._collect_iterator_through_file(iterator)) 870 if len(items) >= num: 871 break 872 return items[:num] 873
874 - def first(self):
875 """ 876 Return the first element in this RDD. 877 878 >>> sc.parallelize([2, 3, 4]).first() 879 2 880 """ 881 return self.take(1)[0]
882
883 - def saveAsTextFile(self, path):
884 """ 885 Save this RDD as a text file, using string representations of elements. 886 887 >>> tempFile = NamedTemporaryFile(delete=True) 888 >>> tempFile.close() 889 >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) 890 >>> from fileinput import input 891 >>> from glob import glob 892 >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) 893 '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' 894 895 Empty lines are tolerated when saving to text files. 896 897 >>> tempFile2 = NamedTemporaryFile(delete=True) 898 >>> tempFile2.close() 899 >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) 900 >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) 901 '\\n\\n\\nbar\\nfoo\\n' 902 """ 903 def func(split, iterator): 904 for x in iterator: 905 if not isinstance(x, basestring): 906 x = unicode(x) 907 yield x.encode("utf-8")
908 keyed = PipelinedRDD(self, func) 909 keyed._bypass_serializer = True 910 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) 911 912 # Pair functions 913
914 - def collectAsMap(self):
915 """ 916 Return the key-value pairs in this RDD to the master as a dictionary. 917 918 >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() 919 >>> m[1] 920 2 921 >>> m[3] 922 4 923 """ 924 return dict(self.collect())
925
926 - def keys(self):
927 """ 928 Return an RDD with the keys of each tuple. 929 >>> m = sc.parallelize([(1, 2), (3, 4)]).keys() 930 >>> m.collect() 931 [1, 3] 932 """ 933 return self.map(lambda (k, v): k)
934
935 - def values(self):
936 """ 937 Return an RDD with the values of each tuple. 938 >>> m = sc.parallelize([(1, 2), (3, 4)]).values() 939 >>> m.collect() 940 [2, 4] 941 """ 942 return self.map(lambda (k, v): v)
943
944 - def reduceByKey(self, func, numPartitions=None):
945 """ 946 Merge the values for each key using an associative reduce function. 947 948 This will also perform the merging locally on each mapper before 949 sending results to a reducer, similarly to a "combiner" in MapReduce. 950 951 Output will be hash-partitioned with C{numPartitions} partitions, or 952 the default parallelism level if C{numPartitions} is not specified. 953 954 >>> from operator import add 955 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 956 >>> sorted(rdd.reduceByKey(add).collect()) 957 [('a', 2), ('b', 1)] 958 """ 959 return self.combineByKey(lambda x: x, func, func, numPartitions)
960
961 - def reduceByKeyLocally(self, func):
962 """ 963 Merge the values for each key using an associative reduce function, but 964 return the results immediately to the master as a dictionary. 965 966 This will also perform the merging locally on each mapper before 967 sending results to a reducer, similarly to a "combiner" in MapReduce. 968 969 >>> from operator import add 970 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 971 >>> sorted(rdd.reduceByKeyLocally(add).items()) 972 [('a', 2), ('b', 1)] 973 """ 974 def reducePartition(iterator): 975 m = {} 976 for (k, v) in iterator: 977 m[k] = v if k not in m else func(m[k], v) 978 yield m
979 def mergeMaps(m1, m2): 980 for (k, v) in m2.iteritems(): 981 m1[k] = v if k not in m1 else func(m1[k], v) 982 return m1 983 return self.mapPartitions(reducePartition).reduce(mergeMaps) 984
985 - def countByKey(self):
986 """ 987 Count the number of elements for each key, and return the result to the 988 master as a dictionary. 989 990 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 991 >>> sorted(rdd.countByKey().items()) 992 [('a', 2), ('b', 1)] 993 """ 994 return self.map(lambda x: x[0]).countByValue()
995
996 - def join(self, other, numPartitions=None):
997 """ 998 Return an RDD containing all pairs of elements with matching keys in 999 C{self} and C{other}. 1000 1001 Each pair of elements will be returned as a (k, (v1, v2)) tuple, where 1002 (k, v1) is in C{self} and (k, v2) is in C{other}. 1003 1004 Performs a hash join across the cluster. 1005 1006 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1007 >>> y = sc.parallelize([("a", 2), ("a", 3)]) 1008 >>> sorted(x.join(y).collect()) 1009 [('a', (1, 2)), ('a', (1, 3))] 1010 """ 1011 return python_join(self, other, numPartitions)
1012
1013 - def leftOuterJoin(self, other, numPartitions=None):
1014 """ 1015 Perform a left outer join of C{self} and C{other}. 1016 1017 For each element (k, v) in C{self}, the resulting RDD will either 1018 contain all pairs (k, (v, w)) for w in C{other}, or the pair 1019 (k, (v, None)) if no elements in other have key k. 1020 1021 Hash-partitions the resulting RDD into the given number of partitions. 1022 1023 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1024 >>> y = sc.parallelize([("a", 2)]) 1025 >>> sorted(x.leftOuterJoin(y).collect()) 1026 [('a', (1, 2)), ('b', (4, None))] 1027 """ 1028 return python_left_outer_join(self, other, numPartitions)
1029
1030 - def rightOuterJoin(self, other, numPartitions=None):
1031 """ 1032 Perform a right outer join of C{self} and C{other}. 1033 1034 For each element (k, w) in C{other}, the resulting RDD will either 1035 contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) 1036 if no elements in C{self} have key k. 1037 1038 Hash-partitions the resulting RDD into the given number of partitions. 1039 1040 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1041 >>> y = sc.parallelize([("a", 2)]) 1042 >>> sorted(y.rightOuterJoin(x).collect()) 1043 [('a', (2, 1)), ('b', (None, 4))] 1044 """ 1045 return python_right_outer_join(self, other, numPartitions)
1046 1047 # TODO: add option to control map-side combining
1048 - def partitionBy(self, numPartitions, partitionFunc=None):
1049 """ 1050 Return a copy of the RDD partitioned using the specified partitioner. 1051 1052 >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 1053 >>> sets = pairs.partitionBy(2).glom().collect() 1054 >>> set(sets[0]).intersection(set(sets[1])) 1055 set([]) 1056 """ 1057 if numPartitions is None: 1058 numPartitions = self.ctx.defaultParallelism 1059 1060 if partitionFunc is None: 1061 partitionFunc = lambda x: 0 if x is None else hash(x) 1062 # Transferring O(n) objects to Java is too expensive. Instead, we'll 1063 # form the hash buckets in Python, transferring O(numPartitions) objects 1064 # to Java. Each object is a (splitNumber, [objects]) pair. 1065 outputSerializer = self.ctx._unbatched_serializer 1066 def add_shuffle_key(split, iterator): 1067 1068 buckets = defaultdict(list) 1069 1070 for (k, v) in iterator: 1071 buckets[partitionFunc(k) % numPartitions].append((k, v)) 1072 for (split, items) in buckets.iteritems(): 1073 yield pack_long(split) 1074 yield outputSerializer.dumps(items)
1075 keyed = PipelinedRDD(self, add_shuffle_key) 1076 keyed._bypass_serializer = True 1077 with _JavaStackTrace(self.context) as st: 1078 pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() 1079 partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 1080 id(partitionFunc)) 1081 jrdd = pairRDD.partitionBy(partitioner).values() 1082 rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 1083 # This is required so that id(partitionFunc) remains unique, even if 1084 # partitionFunc is a lambda: 1085 rdd._partitionFunc = partitionFunc 1086 return rdd 1087 1088 # TODO: add control over map-side aggregation
1089 - def combineByKey(self, createCombiner, mergeValue, mergeCombiners, 1090 numPartitions=None):
1091 """ 1092 Generic function to combine the elements for each key using a custom 1093 set of aggregation functions. 1094 1095 Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 1096 type" C. Note that V and C can be different -- for example, one might 1097 group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). 1098 1099 Users provide three functions: 1100 1101 - C{createCombiner}, which turns a V into a C (e.g., creates 1102 a one-element list) 1103 - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 1104 a list) 1105 - C{mergeCombiners}, to combine two C's into a single one. 1106 1107 In addition, users can control the partitioning of the output RDD. 1108 1109 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1110 >>> def f(x): return x 1111 >>> def add(a, b): return a + str(b) 1112 >>> sorted(x.combineByKey(str, add, add).collect()) 1113 [('a', '11'), ('b', '1')] 1114 """ 1115 if numPartitions is None: 1116 numPartitions = self.ctx.defaultParallelism 1117 def combineLocally(iterator): 1118 combiners = {} 1119 for x in iterator: 1120 (k, v) = x 1121 if k not in combiners: 1122 combiners[k] = createCombiner(v) 1123 else: 1124 combiners[k] = mergeValue(combiners[k], v) 1125 return combiners.iteritems()
1126 locally_combined = self.mapPartitions(combineLocally) 1127 shuffled = locally_combined.partitionBy(numPartitions) 1128 def _mergeCombiners(iterator): 1129 combiners = {} 1130 for (k, v) in iterator: 1131 if not k in combiners: 1132 combiners[k] = v 1133 else: 1134 combiners[k] = mergeCombiners(combiners[k], v) 1135 return combiners.iteritems() 1136 return shuffled.mapPartitions(_mergeCombiners) 1137
1138 - def foldByKey(self, zeroValue, func, numPartitions=None):
1139 """ 1140 Merge the values for each key using an associative function "func" and a neutral "zeroValue" 1141 which may be added to the result an arbitrary number of times, and must not change 1142 the result (e.g., 0 for addition, or 1 for multiplication.). 1143 1144 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1145 >>> from operator import add 1146 >>> rdd.foldByKey(0, add).collect() 1147 [('a', 2), ('b', 1)] 1148 """ 1149 return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
1150 1151 1152 # TODO: support variant with custom partitioner
1153 - def groupByKey(self, numPartitions=None):
1154 """ 1155 Group the values for each key in the RDD into a single sequence. 1156 Hash-partitions the resulting RDD with into numPartitions partitions. 1157 1158 Note: If you are grouping in order to perform an aggregation (such as a 1159 sum or average) over each key, using reduceByKey will provide much better 1160 performance. 1161 1162 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 1163 >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) 1164 [('a', [1, 1]), ('b', [1])] 1165 """ 1166 1167 def createCombiner(x): 1168 return [x]
1169 1170 def mergeValue(xs, x): 1171 xs.append(x) 1172 return xs 1173 1174 def mergeCombiners(a, b): 1175 return a + b 1176 1177 return self.combineByKey(createCombiner, mergeValue, mergeCombiners, 1178 numPartitions).mapValues(lambda x: ResultIterable(x)) 1179 1180 # TODO: add tests
1181 - def flatMapValues(self, f):
1182 """ 1183 Pass each value in the key-value pair RDD through a flatMap function 1184 without changing the keys; this also retains the original RDD's 1185 partitioning. 1186 1187 >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) 1188 >>> def f(x): return x 1189 >>> x.flatMapValues(f).collect() 1190 [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 1191 """ 1192 flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) 1193 return self.flatMap(flat_map_fn, preservesPartitioning=True)
1194
1195 - def mapValues(self, f):
1196 """ 1197 Pass each value in the key-value pair RDD through a map function 1198 without changing the keys; this also retains the original RDD's 1199 partitioning. 1200 1201 >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) 1202 >>> def f(x): return len(x) 1203 >>> x.mapValues(f).collect() 1204 [('a', 3), ('b', 1)] 1205 """ 1206 map_values_fn = lambda (k, v): (k, f(v)) 1207 return self.map(map_values_fn, preservesPartitioning=True)
1208 1209 # TODO: support varargs cogroup of several RDDs.
1210 - def groupWith(self, other):
1211 """ 1212 Alias for cogroup. 1213 """ 1214 return self.cogroup(other)
1215 1216 # TODO: add variant with custom parittioner
1217 - def cogroup(self, other, numPartitions=None):
1218 """ 1219 For each key k in C{self} or C{other}, return a resulting RDD that 1220 contains a tuple with the list of values for that key in C{self} as well 1221 as C{other}. 1222 1223 >>> x = sc.parallelize([("a", 1), ("b", 4)]) 1224 >>> y = sc.parallelize([("a", 2)]) 1225 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) 1226 [('a', ([1], [2])), ('b', ([4], []))] 1227 """ 1228 return python_cogroup(self, other, numPartitions)
1229
1230 - def subtractByKey(self, other, numPartitions=None):
1231 """ 1232 Return each (key, value) pair in C{self} that has no pair with matching key 1233 in C{other}. 1234 1235 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 1236 >>> y = sc.parallelize([("a", 3), ("c", None)]) 1237 >>> sorted(x.subtractByKey(y).collect()) 1238 [('b', 4), ('b', 5)] 1239 """ 1240 filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0 1241 map_func = lambda (key, vals): [(key, val) for val in vals[0]] 1242 return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
1243
1244 - def subtract(self, other, numPartitions=None):
1245 """ 1246 Return each value in C{self} that is not contained in C{other}. 1247 1248 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 1249 >>> y = sc.parallelize([("a", 3), ("c", None)]) 1250 >>> sorted(x.subtract(y).collect()) 1251 [('a', 1), ('b', 4), ('b', 5)] 1252 """ 1253 rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder 1254 return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder
1255
1256 - def keyBy(self, f):
1257 """ 1258 Creates tuples of the elements in this RDD by applying C{f}. 1259 1260 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 1261 >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 1262 >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) 1263 [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] 1264 """ 1265 return self.map(lambda x: (f(x), x))
1266
1267 - def repartition(self, numPartitions):
1268 """ 1269 Return a new RDD that has exactly numPartitions partitions. 1270 1271 Can increase or decrease the level of parallelism in this RDD. Internally, this uses 1272 a shuffle to redistribute data. 1273 If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 1274 which can avoid performing a shuffle. 1275 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 1276 >>> sorted(rdd.glom().collect()) 1277 [[1], [2, 3], [4, 5], [6, 7]] 1278 >>> len(rdd.repartition(2).glom().collect()) 1279 2 1280 >>> len(rdd.repartition(10).glom().collect()) 1281 10 1282 """ 1283 jrdd = self._jrdd.repartition(numPartitions) 1284 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1285
1286 - def coalesce(self, numPartitions, shuffle=False):
1287 """ 1288 Return a new RDD that is reduced into `numPartitions` partitions. 1289 >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 1290 [[1], [2, 3], [4, 5]] 1291 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 1292 [[1, 2, 3, 4, 5]] 1293 """ 1294 jrdd = self._jrdd.coalesce(numPartitions) 1295 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1296
1297 - def zip(self, other):
1298 """ 1299 Zips this RDD with another one, returning key-value pairs with the first element in each RDD 1300 second element in each RDD, etc. Assumes that the two RDDs have the same number of 1301 partitions and the same number of elements in each partition (e.g. one was made through 1302 a map on the other). 1303 1304 >>> x = sc.parallelize(range(0,5)) 1305 >>> y = sc.parallelize(range(1000, 1005)) 1306 >>> x.zip(y).collect() 1307 [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 1308 """ 1309 pairRDD = self._jrdd.zip(other._jrdd) 1310 deserializer = PairDeserializer(self._jrdd_deserializer, 1311 other._jrdd_deserializer) 1312 return RDD(pairRDD, self.ctx, deserializer)
1313
1314 - def name(self):
1315 """ 1316 Return the name of this RDD. 1317 """ 1318 name_ = self._jrdd.name() 1319 if not name_: 1320 return None 1321 return name_.encode('utf-8')
1322
1323 - def setName(self, name):
1324 """ 1325 Assign a name to this RDD. 1326 >>> rdd1 = sc.parallelize([1,2]) 1327 >>> rdd1.setName('RDD1') 1328 >>> rdd1.name() 1329 'RDD1' 1330 """ 1331 self._jrdd.setName(name)
1332
1333 - def toDebugString(self):
1334 """ 1335 A description of this RDD and its recursive dependencies for debugging. 1336 """ 1337 debug_string = self._jrdd.toDebugString() 1338 if not debug_string: 1339 return None 1340 return debug_string.encode('utf-8')
1341
1342 - def getStorageLevel(self):
1343 """ 1344 Get the RDD's current storage level. 1345 >>> rdd1 = sc.parallelize([1,2]) 1346 >>> rdd1.getStorageLevel() 1347 StorageLevel(False, False, False, False, 1) 1348 """ 1349 java_storage_level = self._jrdd.getStorageLevel() 1350 storage_level = StorageLevel(java_storage_level.useDisk(), 1351 java_storage_level.useMemory(), 1352 java_storage_level.useOffHeap(), 1353 java_storage_level.deserialized(), 1354 java_storage_level.replication()) 1355 return storage_level
1356
1357 # TODO: `lookup` is disabled because we can't make direct comparisons based 1358 # on the key; we need to compare the hash of the key to the hash of the 1359 # keys in the pairs. This could be an expensive operation, since those 1360 # hashes aren't retained. 1361 1362 -class PipelinedRDD(RDD):
1363 """ 1364 Pipelined maps: 1365 >>> rdd = sc.parallelize([1, 2, 3, 4]) 1366 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 1367 [4, 8, 12, 16] 1368 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 1369 [4, 8, 12, 16] 1370 1371 Pipelined reduces: 1372 >>> from operator import add 1373 >>> rdd.map(lambda x: 2 * x).reduce(add) 1374 20 1375 >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 1376 20 1377 """
1378 - def __init__(self, prev, func, preservesPartitioning=False):
1379 if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 1380 # This transformation is the first in its stage: 1381 self.func = func 1382 self.preservesPartitioning = preservesPartitioning 1383 self._prev_jrdd = prev._jrdd 1384 self._prev_jrdd_deserializer = prev._jrdd_deserializer 1385 else: 1386 prev_func = prev.func 1387 def pipeline_func(split, iterator): 1388 return func(split, prev_func(split, iterator))
1389 self.func = pipeline_func 1390 self.preservesPartitioning = \ 1391 prev.preservesPartitioning and preservesPartitioning 1392 self._prev_jrdd = prev._prev_jrdd # maintain the pipeline 1393 self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 1394 self.is_cached = False 1395 self.is_checkpointed = False 1396 self.ctx = prev.ctx 1397 self.prev = prev 1398 self._jrdd_val = None 1399 self._jrdd_deserializer = self.ctx.serializer 1400 self._bypass_serializer = False
1401 1402 @property
1403 - def _jrdd(self):
1404 if self._jrdd_val: 1405 return self._jrdd_val 1406 if self._bypass_serializer: 1407 serializer = NoOpSerializer() 1408 else: 1409 serializer = self.ctx.serializer 1410 command = (self.func, self._prev_jrdd_deserializer, serializer) 1411 pickled_command = CloudPickleSerializer().dumps(command) 1412 broadcast_vars = ListConverter().convert( 1413 [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], 1414 self.ctx._gateway._gateway_client) 1415 self.ctx._pickled_broadcast_vars.clear() 1416 class_tag = self._prev_jrdd.classTag() 1417 env = MapConverter().convert(self.ctx.environment, 1418 self.ctx._gateway._gateway_client) 1419 includes = ListConverter().convert(self.ctx._python_includes, 1420 self.ctx._gateway._gateway_client) 1421 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 1422 bytearray(pickled_command), env, includes, self.preservesPartitioning, 1423 self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, 1424 class_tag) 1425 self._jrdd_val = python_rdd.asJavaRDD() 1426 return self._jrdd_val
1427
1428 - def _is_pipelinable(self):
1429 return not (self.is_cached or self.is_checkpointed)
1430
1431 1432 -def _test():
1433 import doctest 1434 from pyspark.context import SparkContext 1435 globs = globals().copy() 1436 # The small batch size here ensures that we see multiple batches, 1437 # even in these small test examples: 1438 globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 1439 (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS) 1440 globs['sc'].stop() 1441 if failure_count: 1442 exit(-1)
1443 1444 1445 if __name__ == "__main__": 1446 _test() 1447