
Source Code for Module pyspark.mllib.random

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Python package for random data generation.
"""


from pyspark.rdd import RDD
from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
from pyspark.serializers import NoOpSerializer


class RandomRDDs:
    """
    Generator methods for creating RDDs comprised of i.i.d. samples from
    some distribution.
    """

    @staticmethod
    def uniformRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the
        uniform distribution U(0.0, 1.0).

        To transform the distribution in the generated RDD from U(0.0, 1.0)
        to U(a, b), use
        C{RandomRDDs.uniformRDD(sc, n, p, seed)\
          .map(lambda v: a + (b - a) * v)}

        >>> x = RandomRDDs.uniformRDD(sc, 100).collect()
        >>> len(x)
        100
        >>> max(x) <= 1.0 and min(x) >= 0.0
        True
        >>> RandomRDDs.uniformRDD(sc, 100, 4).getNumPartitions()
        4
        >>> parts = RandomRDDs.uniformRDD(sc, 100, seed=4).getNumPartitions()
        >>> parts == sc.defaultParallelism
        True
        """
        jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
        uniform = RDD(jrdd, sc, NoOpSerializer())
        return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
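
    # Illustrative sketch (not part of the original source): the U(a, b)
    # transformation described in the docstring above, with concrete bounds.
    # Assumes `sc` is an existing SparkContext on the driver.
    #
    #   a, b = -1.0, 1.0
    #   shifted = RandomRDDs.uniformRDD(sc, 100, seed=1L) \
    #       .map(lambda v: a + (b - a) * v)
    #   # every sample in `shifted` now falls between -1.0 and 1.0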

    @staticmethod
    def normalRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the standard normal
        distribution.

        To transform the distribution in the generated RDD from standard normal
        to some other normal N(mean, sigma^2), use
        C{RandomRDDs.normalRDD(sc, n, p, seed)\
          .map(lambda v: mean + sigma * v)}

        >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
        >>> stats = x.stats()
        >>> stats.count()
        1000L
        >>> abs(stats.mean() - 0.0) < 0.1
        True
        >>> abs(stats.stdev() - 1.0) < 0.1
        True
        """
        jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
        normal = RDD(jrdd, sc, NoOpSerializer())
        return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
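
    # Illustrative sketch (not part of the original source): shifting and
    # scaling the standard normal samples to N(mean, sigma^2), following the
    # recipe in the docstring above. Assumes `sc` is an existing SparkContext.
    #
    #   mean, sigma = 5.0, 2.0
    #   scaled = RandomRDDs.normalRDD(sc, 1000, seed=1L) \
    #       .map(lambda v: mean + sigma * v)
    #   # scaled.stats().mean() should be close to 5.0, stdev() close to 2.0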

    @staticmethod
    def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Poisson
        distribution with the input mean.

        >>> mean = 100.0
        >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=1L)
        >>> stats = x.stats()
        >>> stats.count()
        1000L
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
        True
        """
        jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
        poisson = RDD(jrdd, sc, NoOpSerializer())
        return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))

    @staticmethod
    def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the uniform distribution U(0.0, 1.0).

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
        >>> mat.shape
        (10, 10)
        >>> mat.max() <= 1.0 and mat.min() >= 0.0
        True
        >>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
        4
        """
        jrdd = sc._jvm.PythonMLLibAPI() \
            .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
        uniform = RDD(jrdd, sc, NoOpSerializer())
        return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
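
    # Illustrative sketch (not part of the original source): the same U(a, b)
    # rescaling can be applied per row of the vector RDD, assuming each row
    # deserializes to a NumPy array so that the arithmetic is elementwise.
    #
    #   a, b = -1.0, 1.0
    #   rows = RandomRDDs.uniformVectorRDD(sc, 10, 10, seed=1L) \
    #       .map(lambda v: a + (b - a) * v)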

    @staticmethod
    def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the standard normal distribution.

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - 0.0) < 0.1
        True
        >>> abs(mat.std() - 1.0) < 0.1
        True
        """
        jrdd = sc._jvm.PythonMLLibAPI() \
            .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
        normal = RDD(jrdd, sc, NoOpSerializer())
        return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))

    @staticmethod
    def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Poisson distribution with the input mean.

        >>> import numpy as np
        >>> mean = 100.0
        >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(mat.std() - sqrt(mean)) < 0.5
        True
        """
        jrdd = sc._jvm.PythonMLLibAPI() \
            .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
        poisson = RDD(jrdd, sc, NoOpSerializer())
        return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()