import os
import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
from collections import namedtuple

from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
    PairDeserializer, CompressedSerializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD

from py4j.java_collections import ListConverter


# Default settings applied to a SparkConf when the user has not set them explicitly
# (see SparkConf.setIfMissing in _do_init below).
DEFAULT_CONFIGS = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.serializer.objectStreamReset": 100,
    "spark.rdd.compress": True,
}

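# A minimal sketch (assumed conf values, not part of this module) of how these defaults
# interact with a user-supplied SparkConf: explicitly set values win, and only missing
# keys fall back to DEFAULT_CONFIGS, which is applied via SparkConf.setIfMissing.
#
#     conf = SparkConf().set("spark.rdd.compress", "false")
#     sc = SparkContext(master="local[2]", appName="defaults-demo", conf=conf)
#     # "spark.rdd.compress" stays "false"; the other DEFAULT_CONFIGS entries are
#     # filled in because the user never set them.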


class SparkContext(object):

    """
    Main entry point for Spark functionality. A SparkContext represents the
    connection to a Spark cluster, and can be used to create L{RDD}s and
    broadcast variables on that cluster.
    """

    _gateway = None
    _jvm = None
    _writeToFile = None
    _next_accum_id = 0
    _active_spark_context = None
    _lock = Lock()
    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
    _default_batch_size_for_serialized_input = 10

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
                 gateway=None):
        """
        Create a new SparkContext. At least the master and app name should be set,
        either through the named parameters here or through C{conf}.

        @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
        @param appName: A name for your job, to display on the cluster web UI.
        @param sparkHome: Location where Spark is installed on cluster nodes.
        @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH. These can be paths on the local file
               system or HDFS, HTTP, HTTPS, or FTP URLs.
        @param environment: A dictionary of environment variables to set on
               worker nodes.
        @param batchSize: The number of Python objects represented as a single
               Java object. Set 1 to disable batching or -1 to use an
               unlimited batch size.
        @param serializer: The serializer for RDDs.
        @param conf: A L{SparkConf} object setting Spark properties.
        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
               will be instantiated.


        >>> from pyspark.context import SparkContext
        >>> sc = SparkContext('local', 'test')

        >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
            ...
        ValueError:...
        """
        if rdd._extract_concise_traceback() is not None:
            self._callsite = rdd._extract_concise_traceback()
        else:
            tempNamedTuple = namedtuple("Callsite", "function file linenum")
            self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
        SparkContext._ensure_initialized(self, gateway=gateway)
        try:
            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                          conf)
        except:
            # If an error occurs, clean up so that a future SparkContext can be created
            self.stop()
            raise

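    # A minimal usage sketch (assumed master and app name, not part of this module):
    #
    #     from pyspark import SparkConf, SparkContext
    #     conf = SparkConf().setMaster("local[4]").setAppName("example-app")
    #     sc = SparkContext(conf=conf)
    #     try:
    #         print sc.parallelize([1, 2, 3]).count()
    #     finally:
    #         sc.stop()
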
    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                 conf):
        self.environment = environment or {}
        self._conf = conf or SparkConf(_jvm=self._jvm)
        self._batchSize = batchSize  # -1 represents an unlimited batch size
        self._unbatched_serializer = serializer
        if batchSize == 1:
            self.serializer = self._unbatched_serializer
        else:
            self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                batchSize)

        # Set any parameters passed directly to us on the conf
        if master:
            self._conf.setMaster(master)
        if appName:
            self._conf.setAppName(appName)
        if sparkHome:
            self._conf.setSparkHome(sparkHome)
        if environment:
            for key, value in environment.iteritems():
                self._conf.setExecutorEnv(key, value)
        for key, value in DEFAULT_CONFIGS.items():
            self._conf.setIfMissing(key, value)

        # Check that we have at least the required parameters
        if not self._conf.contains("spark.master"):
            raise Exception("A master URL must be set in your configuration")
        if not self._conf.contains("spark.app.name"):
            raise Exception("An application name must be set in your configuration")

        # Read the configured values back from the conf, in case some of them were
        # loaded from the classpath or an external config file
        self.master = self._conf.get("spark.master")
        self.appName = self._conf.get("spark.app.name")
        self.sparkHome = self._conf.get("spark.home", None)
        for (k, v) in self._conf.getAll():
            if k.startswith("spark.executorEnv."):
                varName = k[len("spark.executorEnv."):]
                self.environment[varName] = v

        # Create the Java SparkContext through Py4J
        self._jsc = self._initialize_context(self._conf._jconf)

        # Create a single Accumulator in Java that receives all Python accumulator
        # updates and forwards them to the update server started above
        self._accumulatorServer = accumulators._start_update_server()
        (host, port) = self._accumulatorServer.server_address
        self._javaAccumulator = self._jsc.accumulator(
            self._jvm.java.util.ArrayList(),
            self._jvm.PythonAccumulatorParam(host, port))

        self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

        # Broadcast variables that have been pickled so far
        self._pickled_broadcast_vars = set()

        SparkFiles._sc = self
        root_dir = SparkFiles.getRootDirectory()
        sys.path.append(root_dir)

        # Deploy any code dependencies specified in the constructor
        self._python_includes = list()
        for path in (pyFiles or []):
            self.addPyFile(path)

        # Deploy code dependencies set by spark-submit; these will already have been
        # shipped with the application, so we just need to add them to the PYTHONPATH
        for path in self._conf.get("spark.submit.pyFiles", "").split(","):
            if path != "":
                (dirname, filename) = os.path.split(path)
                self._python_includes.append(filename)
                sys.path.append(path)
                if dirname not in sys.path:
                    sys.path.append(dirname)

        # Create a temporary directory inside spark.local.dir; the files added via
        # SparkContext.addFile() are stored there
        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
        self._temp_dir = \
            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

    def _initialize_context(self, jconf):
        """
        Initialize SparkContext in function to allow subclass specific initialization
        """
        return self._jvm.JavaSparkContext(jconf)

    @classmethod
    def _ensure_initialized(cls, instance=None, gateway=None):
        """
        Checks whether a SparkContext is initialized or not.
        Throws an error if a SparkContext is already running.
        """
        with SparkContext._lock:
            if not SparkContext._gateway:
                SparkContext._gateway = gateway or launch_gateway()
                SparkContext._jvm = SparkContext._gateway.jvm
                SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

            if instance:
                if (SparkContext._active_spark_context and
                        SparkContext._active_spark_context != instance):
                    currentMaster = SparkContext._active_spark_context.master
                    currentAppName = SparkContext._active_spark_context.appName
                    callsite = SparkContext._active_spark_context._callsite

                    # Raise error if there is already a running Spark context
                    raise ValueError(
                        "Cannot run multiple SparkContexts at once; "
                        "existing SparkContext(app=%s, master=%s)"
                        " created by %s at %s:%s "
                        % (currentAppName, currentMaster,
                           callsite.function, callsite.file, callsite.linenum))
                else:
                    SparkContext._active_spark_context = instance

    @classmethod
    def setSystemProperty(cls, key, value):
        """
        Set a Java system property, such as spark.executor.memory. This must
        be invoked before instantiating SparkContext.
        """
        SparkContext._ensure_initialized()
        SparkContext._jvm.java.lang.System.setProperty(key, value)

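    # A minimal sketch (assumed memory value, not part of this module): system properties
    # must be set before any SparkContext is created.
    #
    #     SparkContext.setSystemProperty("spark.executor.memory", "2g")
    #     sc = SparkContext("local", "property-demo")
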
    @property
    def version(self):
        """
        The version of Spark on which this application is running.
        """
        return self._jsc.version()

    @property
    def defaultParallelism(self):
        """
        Default level of parallelism to use when not given by user (e.g. for
        reduce tasks)
        """
        return self._jsc.sc().defaultParallelism()

    @property
    def defaultMinPartitions(self):
        """
        Default min number of partitions for Hadoop RDDs when not given by user
        """
        return self._jsc.sc().defaultMinPartitions()

    def stop(self):
        """
        Shut down the SparkContext.
        """
        if getattr(self, "_jsc", None):
            self._jsc.stop()
            self._jsc = None
        if getattr(self, "_accumulatorServer", None):
            self._accumulatorServer.shutdown()
            self._accumulatorServer = None
        with SparkContext._lock:
            SparkContext._active_spark_context = None

    def parallelize(self, c, numSlices=None):
        """
        Distribute a local Python collection to form an RDD.

        >>> sc.parallelize(range(5), 5).glom().collect()
        [[0], [1], [2], [3], [4]]
        """
        numSlices = numSlices or self.defaultParallelism
        # Serialize the collection to a temp file on disk and have the JVM read it back
        # as an RDD; this avoids sending each element through Py4J individually.
        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
        # Make sure the data is distributed evenly if it's smaller than self._batchSize
        if "__len__" not in dir(c):
            c = list(c)    # Make it a list so we can compute its length
        batchSize = min(len(c) // numSlices, self._batchSize)
        if batchSize > 1:
            serializer = BatchedSerializer(self._unbatched_serializer,
                                           batchSize)
        else:
            serializer = self._unbatched_serializer
        serializer.dump_stream(c, tempFile)
        tempFile.close()
        readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
        jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
        return RDD(jrdd, self, serializer)

    def pickleFile(self, name, minPartitions=None):
        """
        Load an RDD previously saved using L{RDD.saveAsPickleFile} method.

        >>> tmpFile = NamedTemporaryFile(delete=True)
        >>> tmpFile.close()
        >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
        >>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """
        minPartitions = minPartitions or self.defaultMinPartitions
        return RDD(self._jsc.objectFile(name, minPartitions), self,
                   BatchedSerializer(PickleSerializer()))

    def textFile(self, name, minPartitions=None):
        """
        Read a text file from HDFS, a local file system (available on all
        nodes), or any Hadoop-supported file system URI, and return it as an
        RDD of Strings.

        >>> path = os.path.join(tempdir, "sample-text.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("Hello world!")
        >>> textFile = sc.textFile(path)
        >>> textFile.collect()
        [u'Hello world!']
        """
        minPartitions = minPartitions or min(self.defaultParallelism, 2)
        return RDD(self._jsc.textFile(name, minPartitions), self,
                   UTF8Deserializer())

    def wholeTextFiles(self, path, minPartitions=None):
        """
        Read a directory of text files from HDFS, a local file system
        (available on all nodes), or any Hadoop-supported file system
        URI. Each file is read as a single record and returned in a
        key-value pair, where the key is the path of each file and the
        value is the content of each file.

        For example, if you have the following files::

          hdfs://a-hdfs-path/part-00000
          hdfs://a-hdfs-path/part-00001
          ...
          hdfs://a-hdfs-path/part-nnnnn

        Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")},
        then C{rdd} contains::

          (a-hdfs-path/part-00000, its content)
          (a-hdfs-path/part-00001, its content)
          ...
          (a-hdfs-path/part-nnnnn, its content)

        NOTE: Small files are preferred, as each file will be loaded
        fully in memory.

        >>> dirPath = os.path.join(tempdir, "files")
        >>> os.mkdir(dirPath)
        >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
        ...    file1.write("1")
        >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
        ...    file2.write("2")
        >>> textFiles = sc.wholeTextFiles(dirPath)
        >>> sorted(textFiles.collect())
        [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
        """
        minPartitions = minPartitions or self.defaultMinPartitions
        return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

    def _dictToJavaMap(self, d):
        jm = self._jvm.java.util.HashMap()
        if not d:
            d = {}
        for k, v in d.iteritems():
            jm[k] = v
        return jm

    def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
                     valueConverter=None, minSplits=None, batchSize=None):
        """
        Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is as follows:
            1. A Java RDD is created from the SequenceFile or other InputFormat, and the key
               and value Writable classes
            2. Serialization is attempted via Pyrolite pickling
            3. If this fails, the fallback is to call 'toString' on each key and value
            4. C{PickleSerializer} is used to deserialize pickled objects on the Python side

        @param path: path to sequencefile
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param minSplits: minimum splits in dataset
               (default min(2, sc.defaultParallelism))
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        minSplits = minSplits or min(self.defaultParallelism, 2)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
                                                keyConverter, valueConverter, minSplits, batchSize)
        return RDD(jrdd, self, ser)

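    # A minimal usage sketch (hypothetical path and assumed Writable classes, not part
    # of this module):
    #
    #     pairs = sc.sequenceFile("hdfs:///data/pairs.seq",
    #                             keyClass="org.apache.hadoop.io.Text",
    #                             valueClass="org.apache.hadoop.io.IntWritable")
    #     print pairs.first()   # e.g. (u'some-key', 3)
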
    def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
                         valueConverter=None, conf=None, batchSize=None):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is the same as for sc.sequenceFile.

        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
        Configuration in Java.

        @param path: path to Hadoop file
        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                                    valueClass, keyConverter, valueConverter,
                                                    jconf, batchSize)
        return RDD(jrdd, self, ser)

    def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                        valueConverter=None, conf=None, batchSize=None):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
        Hadoop configuration, which is passed in as a Python dict.
        This will be converted into a Configuration in Java.
        The mechanism is the same as for sc.sequenceFile.

        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
                                                   valueClass, keyConverter, valueConverter,
                                                   jconf, batchSize)
        return RDD(jrdd, self, ser)

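    # A minimal sketch (hypothetical path; the configuration key name assumes the Hadoop 2
    # 'new API' naming; not part of this module) of driving the read entirely from a
    # configuration dict:
    #
    #     conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:///data/text"}
    #     lines = sc.newAPIHadoopRDD(
    #         "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    #         "org.apache.hadoop.io.LongWritable",
    #         "org.apache.hadoop.io.Text",
    #         conf=conf)
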
    def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
                   valueConverter=None, conf=None, batchSize=None):
        """
        Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is the same as for sc.sequenceFile.

        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
        Configuration in Java.

        @param path: path to Hadoop file
        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapred.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                              valueClass, keyConverter, valueConverter,
                                              jconf, batchSize)
        return RDD(jrdd, self, ser)

    def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                  valueConverter=None, conf=None, batchSize=None):
        """
        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
        Hadoop configuration, which is passed in as a Python dict.
        This will be converted into a Configuration in Java.
        The mechanism is the same as for sc.sequenceFile.

        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapred.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
                                             valueClass, keyConverter, valueConverter,
                                             jconf, batchSize)
        return RDD(jrdd, self, ser)

    def _checkpointFile(self, name, input_deserializer):
        jrdd = self._jsc.checkpointFile(name)
        return RDD(jrdd, self, input_deserializer)

    def union(self, rdds):
        """
        Build the union of a list of RDDs.

        This supports union() of RDDs with different serialized formats,
        although this forces them to be reserialized using the default
        serializer:

        >>> path = os.path.join(tempdir, "union-text.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("Hello")
        >>> textFile = sc.textFile(path)
        >>> textFile.collect()
        [u'Hello']
        >>> parallelized = sc.parallelize(["World!"])
        >>> sorted(sc.union([textFile, parallelized]).collect())
        [u'Hello', 'World!']
        """
        first_jrdd_deserializer = rdds[0]._jrdd_deserializer
        if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds):
            rdds = [x._reserialize() for x in rdds]
        first = rdds[0]._jrdd
        rest = [x._jrdd for x in rdds[1:]]
        rest = ListConverter().convert(rest, self._gateway._gateway_client)
        return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)

    def broadcast(self, value):
        """
        Broadcast a read-only variable to the cluster, returning a
        L{Broadcast<pyspark.broadcast.Broadcast>}
        object for reading it in distributed functions. The variable will
        be sent to each cluster only once.
        """
        ser = CompressedSerializer(PickleSerializer())
        # Pass the pickled value through a file to avoid sending it over the Py4J socket
        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
        ser.dump_stream([value], tempFile)
        tempFile.close()
        jbroadcast = self._jvm.PythonRDD.readBroadcastFromFile(self._jsc, tempFile.name)
        return Broadcast(jbroadcast.id(), None, jbroadcast,
                         self._pickled_broadcast_vars, tempFile.name)

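    # A minimal usage sketch (assumed values, not part of this module):
    #
    #     lookup = sc.broadcast({"a": 1, "b": 2})
    #     print sc.parallelize(["a", "b", "a"]).map(lambda k: lookup.value[k]).collect()
    #     # [1, 2, 1]
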
    def accumulator(self, value, accum_param=None):
        """
        Create an L{Accumulator} with the given initial value, using a given
        L{AccumulatorParam} helper object to define how to add values of the
        data type if provided. Default AccumulatorParams are used for integers
        and floating-point numbers if you do not provide one. For other types,
        a custom AccumulatorParam can be used.
        """
        if accum_param is None:
            if isinstance(value, int):
                accum_param = accumulators.INT_ACCUMULATOR_PARAM
            elif isinstance(value, float):
                accum_param = accumulators.FLOAT_ACCUMULATOR_PARAM
            elif isinstance(value, complex):
                accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
            else:
                raise Exception("No default accumulator param for type %s" % type(value))
        SparkContext._next_accum_id += 1
        return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)

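    # A minimal usage sketch (not part of this module):
    #
    #     counter = sc.accumulator(0)
    #     sc.parallelize([1, 2, 3, 4]).foreach(lambda x: counter.add(x))
    #     print counter.value   # 10
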
    def addFile(self, path):
        """
        Add a file to be downloaded with this Spark job on every node.
        The C{path} passed can be either a local file, a file in HDFS
        (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
        FTP URI.

        To access the file in Spark jobs, use
        L{SparkFiles.get(path)<pyspark.files.SparkFiles.get>} to find its
        download location.

        >>> from pyspark import SparkFiles
        >>> path = os.path.join(tempdir, "test.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("100")
        >>> sc.addFile(path)
        >>> def func(iterator):
        ...    with open(SparkFiles.get("test.txt")) as testFile:
        ...        fileVal = int(testFile.readline())
        ...        return [x * fileVal for x in iterator]
        >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
        [100, 200, 300, 400]
        """
        self._jsc.sc().addFile(path)

    def clearFiles(self):
        """
        Clear the job's list of files added by L{addFile} or L{addPyFile} so
        that they do not get downloaded to any new nodes.
        """
        self._jsc.sc().clearFiles()

    def addPyFile(self, path):
        """
        Add a .py or .zip dependency for all tasks to be executed on this
        SparkContext in the future. The C{path} passed can be either a local
        file, a file in HDFS (or other Hadoop-supported filesystems), or an
        HTTP, HTTPS or FTP URI.
        """
        self.addFile(path)
        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix

        if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
            self._python_includes.append(filename)
            # also make the archive importable on the driver (e.g. for tests in local mode)
            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

    def setCheckpointDir(self, dirName):
        """
        Set the directory under which RDDs are going to be checkpointed. The
        directory must be a HDFS path if running on a cluster.
        """
        self._jsc.sc().setCheckpointDir(dirName)

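    # A minimal sketch (hypothetical HDFS directory, not part of this module):
    #
    #     sc.setCheckpointDir("hdfs:///tmp/checkpoints")
    #     rdd = sc.parallelize(range(100))
    #     rdd.checkpoint()   # mark for checkpointing
    #     rdd.count()        # first action materializes the RDD and writes the checkpoint
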
    def _getJavaStorageLevel(self, storageLevel):
        """
        Returns a Java StorageLevel based on a pyspark.StorageLevel.
        """
        if not isinstance(storageLevel, StorageLevel):
            raise Exception("storageLevel must be of type pyspark.StorageLevel")

        newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
        return newStorageLevel(storageLevel.useDisk,
                               storageLevel.useMemory,
                               storageLevel.useOffHeap,
                               storageLevel.deserialized,
                               storageLevel.replication)

    def setJobGroup(self, groupId, description, interruptOnCancel=False):
        """
        Assigns a group ID to all the jobs started by this thread until the group ID is set to a
        different value or cleared.

        Often, a unit of execution in an application consists of multiple Spark actions or jobs.
        Application programmers can use this method to group all those jobs together and give a
        group description. Once set, the Spark web UI will associate such jobs with this group.

        The application can use L{SparkContext.cancelJobGroup} to cancel all
        running jobs in this group.

        >>> import thread, threading
        >>> from time import sleep
        >>> result = "Not Set"
        >>> lock = threading.Lock()
        >>> def map_func(x):
        ...     sleep(100)
        ...     raise Exception("Task should have been cancelled")
        >>> def start_job(x):
        ...     global result
        ...     try:
        ...         sc.setJobGroup("job_to_cancel", "some description")
        ...         result = sc.parallelize(range(x)).map(map_func).collect()
        ...     except Exception as e:
        ...         result = "Cancelled"
        ...     lock.release()
        >>> def stop_job():
        ...     sleep(5)
        ...     sc.cancelJobGroup("job_to_cancel")
        >>> suppress = lock.acquire()
        >>> suppress = thread.start_new_thread(start_job, (10,))
        >>> suppress = thread.start_new_thread(stop_job, tuple())
        >>> suppress = lock.acquire()
        >>> print result
        Cancelled

        If interruptOnCancel is set to true for the job group, then job cancellation will result
        in Thread.interrupt() being called on the job's executor threads. This is useful to help
        ensure that the tasks are actually stopped in a timely manner, but is off by default due
        to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
        """
        self._jsc.setJobGroup(groupId, description, interruptOnCancel)

    def setLocalProperty(self, key, value):
        """
        Set a local property that affects jobs submitted from this thread, such as the
        Spark fair scheduler pool.
        """
        self._jsc.setLocalProperty(key, value)

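    # A minimal sketch (assumed pool name "production", not part of this module): route
    # jobs submitted from this thread to a fair scheduler pool, then clear the setting.
    #
    #     sc.setLocalProperty("spark.scheduler.pool", "production")
    #     # ... run actions from this thread ...
    #     sc.setLocalProperty("spark.scheduler.pool", None)
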
    def getLocalProperty(self, key):
        """
        Get a local property set in this thread, or None if it is missing. See
        L{setLocalProperty}
        """
        return self._jsc.getLocalProperty(key)

    def sparkUser(self):
        """
        Get SPARK_USER for the user who is running SparkContext.
        """
        return self._jsc.sc().sparkUser()

    def cancelJobGroup(self, groupId):
        """
        Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}
        for more information.
        """
        self._jsc.sc().cancelJobGroup(groupId)

    def cancelAllJobs(self):
        """
        Cancel all jobs that have been scheduled or are running.
        """
        self._jsc.sc().cancelAllJobs()

    def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
        """
        Executes the given partitionFunc on the specified set of partitions,
        returning the result as an array of elements.

        If 'partitions' is not specified, this will run over all partitions.

        >>> myRDD = sc.parallelize(range(6), 3)
        >>> sc.runJob(myRDD, lambda part: [x * x for x in part])
        [0, 1, 4, 9, 16, 25]

        >>> myRDD = sc.parallelize(range(6), 3)
        >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
        [0, 1, 16, 25]
        """
        if partitions is None:
            partitions = range(rdd._jrdd.partitions().size())
        javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

        # Apply the function with mapPartitions, then ask the JVM to run the job only on
        # the requested partitions; the results are read back through a local file.
        mappedRDD = rdd.mapPartitions(partitionFunc)
        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
        return list(mappedRDD._collect_iterator_through_file(it))


def _test():
    import atexit
    import doctest
    import tempfile
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    globs['tempdir'] = tempfile.mkdtemp()
    atexit.register(lambda: shutil.rmtree(globs['tempdir']))
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()