#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = ["SparkConf"]

import sys
from typing import Dict, List, Optional, Tuple, cast, overload

from py4j.java_gateway import JVMView, JavaObject


class SparkConf:
    """
    Configuration for a Spark application. Used to set various Spark
    parameters as key-value pairs.

    Most of the time, you would create a SparkConf object with
    ``SparkConf()``, which will load values from `spark.*` Java system
    properties as well. In this case, any parameters you set directly on
    the :class:`SparkConf` object take priority over system properties.

    For unit tests, you can also call ``SparkConf(False)`` to skip
    loading external settings and get the same configuration no matter
    what the system properties are.

    All setter methods in this class support chaining. For example,
    you can write ``conf.setMaster("local").setAppName("My app")``.

    Parameters
    ----------
    loadDefaults : bool
        whether to load values from Java system properties (True by default)
    _jvm : class:`py4j.java_gateway.JVMView`
        internal parameter used to pass a handle to the
        Java VM; does not need to be set by users
    _jconf : class:`py4j.java_gateway.JavaObject`
        Optionally pass in an existing SparkConf handle
        to use its parameters

    Notes
    -----
    Once a SparkConf object is passed to Spark, it is cloned
    and can no longer be modified by the user.

    Examples
    --------
    >>> from pyspark.conf import SparkConf
    >>> from pyspark.context import SparkContext
    >>> conf = SparkConf()
    >>> conf.setMaster("local").setAppName("My app")
    <pyspark.conf.SparkConf object at ...>
    >>> conf.get("spark.master")
    'local'
    >>> conf.get("spark.app.name")
    'My app'
    >>> sc = SparkContext(conf=conf)
    >>> sc.master
    'local'
    >>> sc.appName
    'My app'
    >>> sc.sparkHome is None
    True

    >>> conf = SparkConf(loadDefaults=False)
    >>> conf.setSparkHome("/path")
    <pyspark.conf.SparkConf object at ...>
    >>> conf.get("spark.home")
    '/path'
    >>> conf.setExecutorEnv("VAR1", "value1")
    <pyspark.conf.SparkConf object at ...>
    >>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")])
    <pyspark.conf.SparkConf object at ...>
    >>> conf.get("spark.executorEnv.VAR1")
    'value1'
    >>> print(conf.toDebugString())
    spark.executorEnv.VAR1=value1
    spark.executorEnv.VAR3=value3
    spark.executorEnv.VAR4=value4
    spark.home=/path
    >>> for p in sorted(conf.getAll(), key=lambda p: p[0]):
    ...     print(p)
    ('spark.executorEnv.VAR1', 'value1')
    ('spark.executorEnv.VAR3', 'value3')
    ('spark.executorEnv.VAR4', 'value4')
    ('spark.home', '/path')
    >>> conf._jconf.setExecutorEnv("VAR5", "value5")
    JavaObject id...
    >>> print(conf.toDebugString())
    spark.executorEnv.VAR1=value1
    spark.executorEnv.VAR3=value3
    spark.executorEnv.VAR4=value4
    spark.executorEnv.VAR5=value5
    spark.home=/path
    """

    # Exactly one of these two backing stores is non-None on an instance:
    # ``_jconf`` when a JVM-side SparkConf handle is available, otherwise
    # the plain Python dict ``_conf``.
    _jconf: Optional[JavaObject]
    _conf: Optional[Dict[str, str]]

    def __init__(
        self,
        loadDefaults: bool = True,
        _jvm: Optional[JVMView] = None,
        _jconf: Optional[JavaObject] = None,
    ) -> None:
        """
        Create a new Spark configuration.

        If ``_jconf`` is given it is used as the backing store. Otherwise a
        JVM-side SparkConf is created through ``_jvm`` (falling back to
        ``SparkContext._jvm``); when no JVM is available the settings are
        kept in a Python dict.
        """
        if _jconf is not None:
            self._jconf = _jconf
            # Always define _conf so the attribute exists on every instance.
            self._conf = None
        else:
            # Imported lazily, inside the constructor rather than at module level.
            from pyspark.context import SparkContext

            _jvm = _jvm or SparkContext._jvm

            if _jvm is not None:
                # JVM is created, so create self._jconf directly through JVM
                self._jconf = _jvm.SparkConf(loadDefaults)
                self._conf = None
            else:
                # JVM is not created, so store data in self._conf first
                self._jconf = None
                self._conf = {}

    def set(self, key: str, value: str) -> "SparkConf":
        """
        Set a configuration property.

        Parameters
        ----------
        key : str
            name of the property
        value : str
            value of the property; it is converted with ``str()`` before
            being stored

        Returns
        -------
        :class:`SparkConf`
            this object, to allow chaining
        """
        # Try to set self._jconf first if JVM is created, set self._conf if JVM is not created yet.
        if self._jconf is not None:
            self._jconf.set(key, str(value))
        else:
            assert self._conf is not None
            self._conf[key] = str(value)
        return self

    def setIfMissing(self, key: str, value: str) -> "SparkConf":
        """
        Set a configuration property, if not already set.

        Returns
        -------
        :class:`SparkConf`
            this object, to allow chaining
        """
        if self.get(key) is None:
            self.set(key, value)
        return self

    def setMaster(self, value: str) -> "SparkConf":
        """Set master URL to connect to (stored as ``spark.master``)."""
        self.set("spark.master", value)
        return self

    def setAppName(self, value: str) -> "SparkConf":
        """Set application name (stored as ``spark.app.name``)."""
        self.set("spark.app.name", value)
        return self

    def setSparkHome(self, value: str) -> "SparkConf":
        """Set path where Spark is installed on worker nodes (stored as ``spark.home``)."""
        self.set("spark.home", value)
        return self

    @overload
    def setExecutorEnv(self, key: str, value: str) -> "SparkConf":
        ...

    @overload
    def setExecutorEnv(self, *, pairs: List[Tuple[str, str]]) -> "SparkConf":
        ...

    def setExecutorEnv(
        self,
        key: Optional[str] = None,
        value: Optional[str] = None,
        pairs: Optional[List[Tuple[str, str]]] = None,
    ) -> "SparkConf":
        """
        Set an environment variable to be passed to executors.

        Each variable is stored under the key ``spark.executorEnv.<name>``.

        Parameters
        ----------
        key : str, optional
            name of a single environment variable
        value : str, optional
            value for ``key``
        pairs : list of tuples, optional
            list of (name, value) pairs; mutually exclusive with ``key``

        Returns
        -------
        :class:`SparkConf`
            this object, to allow chaining

        Raises
        ------
        RuntimeError
            if both ``key`` and ``pairs`` are given, or neither is
        """
        if (key is not None and pairs is not None) or (key is None and pairs is None):
            raise RuntimeError("Either pass one key-value pair or a list of pairs")
        elif key is not None:
            self.set(f"spark.executorEnv.{key}", cast(str, value))
        elif pairs is not None:
            for k, v in pairs:
                self.set(f"spark.executorEnv.{k}", v)
        return self

    def setAll(self, pairs: List[Tuple[str, str]]) -> "SparkConf":
        """
        Set multiple parameters, passed as a list of key-value pairs.

        Parameters
        ----------
        pairs : iterable of tuples
            list of key-value pairs to set

        Returns
        -------
        :class:`SparkConf`
            this object, to allow chaining
        """
        for k, v in pairs:
            self.set(k, v)
        return self

    @overload
    def get(self, key: str) -> Optional[str]:
        ...

    @overload
    def get(self, key: str, defaultValue: None) -> Optional[str]:
        ...

    @overload
    def get(self, key: str, defaultValue: str) -> str:
        ...

    def get(self, key: str, defaultValue: Optional[str] = None) -> Optional[str]:
        """
        Get the configured value for some key, or return a default otherwise.

        Parameters
        ----------
        key : str
            name of the property to look up
        defaultValue : str, optional
            value returned when ``key`` is not set (None by default)
        """
        if defaultValue is None:  # Py4J doesn't call the right get() if we pass None
            if self._jconf is not None:
                # Check presence first so a missing key yields None.
                if not self._jconf.contains(key):
                    return None
                return self._jconf.get(key)
            else:
                assert self._conf is not None
                return self._conf.get(key, None)
        else:
            if self._jconf is not None:
                return self._jconf.get(key, defaultValue)
            else:
                assert self._conf is not None
                return self._conf.get(key, defaultValue)

    def getAll(self) -> List[Tuple[str, str]]:
        """Get all values as a list of key-value pairs."""
        if self._jconf is not None:
            # Each element exposes its key and value through _1() / _2().
            return [(elem._1(), elem._2()) for elem in cast(JavaObject, self._jconf).getAll()]
        else:
            assert self._conf is not None
            return list(self._conf.items())

    def contains(self, key: str) -> bool:
        """Does this configuration contain a given key?"""
        if self._jconf is not None:
            return self._jconf.contains(key)
        else:
            assert self._conf is not None
            return key in self._conf

    def toDebugString(self) -> str:
        """
        Returns a printable version of the configuration, as a list of
        key=value pairs, one per line.
        """
        if self._jconf is not None:
            return self._jconf.toDebugString()
        else:
            assert self._conf is not None
            return "\n".join(f"{k}={v}" for k, v in self._conf.items())


def _test() -> None:
    """Run this module's doctests and exit with status -1 if any fail."""
    import doctest

    # ELLIPSIS lets expected output such as "object at ..." match any address.
    (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()