Skip to content

Commit

Permalink
Add tests.
Browse files Browse the repository at this point in the history
Modify test_vectorized_udf_datatype_string not to fail by unrelated error.

closes apache#26
  • Loading branch information
ueshin authored and BryanCutler committed Sep 13, 2017
1 parent 4a2fec2 commit 518126e
Showing 1 changed file with 179 additions and 0 deletions.
179 changes: 179 additions & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3122,6 +3122,185 @@ def test_filtered_frame(self):
self.assertTrue(pdf.empty)


@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
class VectorizedUDFTests(ReusedPySparkTestCase):

@classmethod
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.spark = SparkSession(cls.sc)

@classmethod
def tearDownClass(cls):
ReusedPySparkTestCase.tearDownClass()
cls.spark.stop()

def test_vectorized_udf_basic(self):
from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
col('id').alias('long'),
col('id').cast('float').alias('float'),
col('id').cast('double').alias('double'),
col('id').cast('boolean').alias('bool'))
f = lambda x: x
str_f = pandas_udf(f, StringType())
int_f = pandas_udf(f, IntegerType())
long_f = pandas_udf(f, LongType())
float_f = pandas_udf(f, FloatType())
double_f = pandas_udf(f, DoubleType())
bool_f = pandas_udf(f, BooleanType())
res = df.select(str_f(col('str')), int_f(col('int')),
long_f(col('long')), float_f(col('float')),
double_f(col('double')), bool_f(col('bool')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_boolean(self):
from pyspark.sql.functions import pandas_udf, col
data = [(True,), (True,), (None,), (False,)]
schema = StructType().add("bool", BooleanType())
df = self.spark.createDataFrame(data, schema)
bool_f = pandas_udf(lambda x: x, BooleanType())
res = df.select(bool_f(col('bool')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_byte(self):
from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("byte", ByteType())
df = self.spark.createDataFrame(data, schema)
byte_f = pandas_udf(lambda x: x, ByteType())
res = df.select(byte_f(col('byte')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_short(self):
from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("short", ShortType())
df = self.spark.createDataFrame(data, schema)
short_f = pandas_udf(lambda x: x, ShortType())
res = df.select(short_f(col('short')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_int(self):
from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("int", IntegerType())
df = self.spark.createDataFrame(data, schema)
int_f = pandas_udf(lambda x: x, IntegerType())
res = df.select(int_f(col('int')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_long(self):
from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("long", LongType())
df = self.spark.createDataFrame(data, schema)
long_f = pandas_udf(lambda x: x, LongType())
res = df.select(long_f(col('long')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_float(self):
from pyspark.sql.functions import pandas_udf, col
data = [(3.0,), (5.0,), (-1.0,), (None,)]
schema = StructType().add("float", FloatType())
df = self.spark.createDataFrame(data, schema)
float_f = pandas_udf(lambda x: x, FloatType())
res = df.select(float_f(col('float')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_double(self):
from pyspark.sql.functions import pandas_udf, col
data = [(3.0,), (5.0,), (-1.0,), (None,)]
schema = StructType().add("double", DoubleType())
df = self.spark.createDataFrame(data, schema)
double_f = pandas_udf(lambda x: x, DoubleType())
res = df.select(double_f(col('double')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_string(self):
from pyspark.sql.functions import pandas_udf, col
data = [("foo",), (None,), ("bar",), ("bar",)]
schema = StructType().add("str", StringType())
df = self.spark.createDataFrame(data, schema)
str_f = pandas_udf(lambda x: x, StringType())
res = df.select(str_f(col('str')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_zero_parameter(self):
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = self.spark.range(100000)
f0 = pandas_udf(lambda **kwargs: pd.Series(1).repeat(kwargs['size']), LongType())
res = df.select(f0())
self.assertEquals(df.select(lit(1)).collect(), res.collect())

def test_vectorized_udf_datatype_string(self):
from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
col('id').alias('long'),
col('id').cast('float').alias('float'),
col('id').cast('double').alias('double'),
col('id').cast('boolean').alias('bool'))
f = lambda x: x
str_f = pandas_udf(f, 'string')
int_f = pandas_udf(f, 'integer')
long_f = pandas_udf(f, 'long')
float_f = pandas_udf(f, 'float')
double_f = pandas_udf(f, 'double')
bool_f = pandas_udf(f, 'boolean')
res = df.select(str_f(col('str')), int_f(col('int')),
long_f(col('long')), float_f(col('float')),
double_f(col('double')), bool_f(col('bool')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_complex(self):
from pyspark.sql.functions import pandas_udf, col, expr
df = self.spark.range(10).select(
col('id').cast('int').alias('a'),
col('id').cast('int').alias('b'),
col('id').cast('double').alias('c'))
add = pandas_udf(lambda x, y: x + y, IntegerType())
power2 = pandas_udf(lambda x: 2 ** x, IntegerType())
mul = pandas_udf(lambda x, y: x * y, DoubleType())
res = df.select(add(col('a'), col('b')), power2(col('a')), mul(col('b'), col('c')))
expected = df.select(expr('a + b'), expr('power(2, a)'), expr('b * c'))
self.assertEquals(expected.collect(), res.collect())

def test_vectorized_udf_exception(self):
from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, 'division( or modulo)? by zero'):
df.select(raise_exception(col('id'))).collect()

def test_vectorized_udf_invalid_length(self):
from pyspark.sql.functions import pandas_udf, col
import pandas as pd
df = self.spark.range(10)
raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
with QuietTest(self.sc):
with self.assertRaisesRegexp(
Exception,
'The length of returned value should be the same as input value'):
df.select(raise_exception()).collect()

def test_vectorized_udf_mix_udf(self):
from pyspark.sql.functions import pandas_udf, udf, col
df = self.spark.range(10)
row_by_row_udf = udf(lambda x: x, LongType())
pd_udf = pandas_udf(lambda x: x, LongType())
with QuietTest(self.sc):
with self.assertRaisesRegexp(
Exception,
'Can not mix vectorized and non-vectorized UDFs'):
df.select(row_by_row_udf(col('id')), pd_udf(col('id'))).collect()


if __name__ == "__main__":
from pyspark.sql.tests import *
if xmlrunner:
Expand Down

0 comments on commit 518126e

Please sign in to comment.