
[SPARK-22966][PYTHON][SQL] Python UDFs with returnType=StringType should treat return values of datetime.date or datetime.datetime as unconvertible #20163

Closed

Conversation

rednaxelafx
Contributor

What changes were proposed in this pull request?

Perform appropriate conversions for results coming from Python UDFs that return datetime.date or datetime.datetime.

Before this PR, Pyrolite would unpickle both datetime.date and datetime.datetime into a java.util.Calendar, which Spark SQL does not understand and which leads to incorrect results. An example of such an incorrect result:

>>> py_date = udf(datetime.date)
>>> spark.range(1).select(py_date(lit(2017), lit(10), lit(30))).show(truncate=False)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|date(2017, 10, 30)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=9,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=30,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=?,ZONE_OFFSET=?,DST_OFFSET=?]|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

After this PR, the same query above would give correct results:

>>> spark.range(1).select(py_date(lit(2017), lit(10), lit(30))).show(truncate=False)
+------------------+
|date(2017, 10, 30)|
+------------------+
|2017-10-30        |
+------------------+

An explicit non-goal of this PR is to change the timezone awareness or timezone settings of datetime.datetime objects collected from a DataFrame.
Currently, PySpark always returns such datetime.datetime objects as timezone-unaware (naive) values in Python's current local timezone (#19607 changed the default behavior for Pandas support but not for plain collect()). This PR does not change that behavior.
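
For illustration, the current collect() behavior described above can be observed with a small sketch like the following (assumes an active SparkSession named spark; not part of this patch):

import datetime
from pyspark.sql import Row

# Collected timestamps come back as naive datetime.datetime objects,
# rendered in Python's current local timezone.
df = spark.createDataFrame([Row(ts=datetime.datetime(2017, 10, 30, 12, 0))])
[row] = df.collect()
print(row.ts)         # typically round-trips to 2017-10-30 12:00:00 (local time)
print(row.ts.tzinfo)  # None -> timezone-unaware (naive)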

How was this patch tested?

Added unit tests to pyspark.sql.tests covering the following UDF cases (a sketch of one such test appears after the list):

  • datetime.date -> StringType
  • datetime.date -> DateType
  • datetime.datetime -> StringType
  • datetime.datetime -> TimestampType
  • datetime.datetime with a non-default timezone
  • datetime.datetime with no timezone (naive datetime)
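
A minimal sketch of what one such test might look like (a hypothetical test method, not the exact code added in this PR; it assumes the usual pyspark.sql.tests setup where a SparkSession is available as self.spark):

import datetime
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import DateType

def test_udf_returning_date_with_matching_date_type(self):
    # datetime.date -> DateType: the value should round-trip unchanged.
    py_date = udf(lambda y, m, d: datetime.date(y, m, d), DateType())
    [row] = self.spark.range(1).select(
        py_date(lit(2017), lit(10), lit(30)).alias("d")).collect()
    self.assertEqual(row.d, datetime.date(2017, 10, 30))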

# because the format of the string should be different, depending on the type of the input
# object. So for those two specific types we eagerly convert them to string here, where the
# Python type information is still intact.
if returnType == StringType():
Contributor

Is this to handle the case where a Python udf returns a date or datetime but marks the return type as string?

Member

I have a question: why do we need to handle this type conversion? If we expect a correctly formatted string, isn't it more reasonable to convert the date/datetime to a string inside the udf, instead of adding this conversion implicitly?
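
For illustration, converting explicitly inside the udf as suggested here might look like this (a minimal sketch; the lambda is hypothetical and not part of this PR):

import datetime
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType

# The UDF itself produces a string, so the declared StringType matches the actual value.
py_date_str = udf(lambda y, m, d: str(datetime.date(y, m, d)), StringType())
# spark.range(1).select(py_date_str(lit(2017), lit(10), lit(30))).show()
# would then display 2017-10-30 without any implicit coercion.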

@cloud-fan
Contributor

cloud-fan commented Jan 5, 2018

LGTM, cc @ueshin @icexelloss is this behavior consistent with pandas UDF?

@icexelloss
Contributor

I think scalar and grouped map UDFs expect a pandas Series of datetime64[ns] (the native pandas timestamp type) instead of a pandas Series of datetime.date or datetime.datetime objects. I don't think it's necessary for pandas UDFs to work with a pandas Series of datetime.date or datetime.datetime objects, as the standard timestamp type in pandas is datetime64[ns].
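
For comparison, a scalar pandas UDF producing timestamps would typically return a datetime64[ns] Series, e.g. (a sketch only, assuming Arrow support is available; not code from this PR):

import pandas as pd
from pyspark.sql.functions import pandas_udf, lit
from pyspark.sql.types import TimestampType

def make_ts(year, month, day):
    # Returns a pandas Series of dtype datetime64[ns], not datetime.datetime objects.
    return pd.to_datetime(pd.DataFrame({'year': year, 'month': month, 'day': day}))

py_pandas_timestamp = pandas_udf(make_ts, TimestampType())
# spark.range(1).select(py_pandas_timestamp(lit(2017), lit(10), lit(30))).show()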

@@ -120,10 +121,18 @@ object EvaluatePython {
case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale)

case (c: Int, DateType) => c
Member

BTW, as a side note, I think we can make the converter for the type and then reuse it.

Member

Of course, separate change obviously.

def coerce_to_str(v):
    import datetime
    if type(v) == datetime.date or type(v) == datetime.datetime:
        return str(v)
Member

I think it's weird that we have a cast here alone ... Can't we register a custom Pyrolite unpickler? Would that make things more complicated?

@HyukjinKwon
Member

Wait .. Isn't this because we failed to call toInternal for the return type? Please give me a few days .. I will double-check tonight.

@SparkQA

SparkQA commented Jan 5, 2018

Test build #85709 has finished for PR 20163 at commit ca026d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

HyukjinKwon commented Jan 5, 2018

The problem here seems to be that returnType is mismatched with the actual value. In the case of DateType, the value needs an explicit conversion into an integer:

def needConversion(self):
    return True

def toInternal(self, d):
    if d is not None:
        return d.toordinal() - self.EPOCH_ORDINAL

which will be called via this code in worker.py:

if return_type.needConversion():
    toInternal = return_type.toInternal
    return lambda *a: toInternal(f(*a))
else:
    return lambda *a: f(*a)

If the returnType is StringType, then no conversion is needed because, to my knowledge, Pyrolite and serialization handle strings fine:

class StringType(AtomicType):
    """String data type.
    """
    __metaclass__ = DataTypeSingleton

    def needConversion(self):
        """
        Does this type need to conversion between Python object and internal SQL object.
        This is used to avoid the unnecessary conversion for ArrayType/MapType/StructType.
        """
        return False

So, here:

if return_type.needConversion():
    toInternal = return_type.toInternal
    return lambda *a: toInternal(f(*a))
else:
    return lambda *a: f(*a)

we send the return values as-is without conversion, which ends up with datetime.date -> java.util.Calendar as you described in the PR description. Therefore, I don't think the current fix in EvaluatePython.scala is reachable in the reproducer above.

As for the fix on the Python side in udf.py, I think it is a band-aid. To deal with this problem correctly, I believe we should do something like:

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 146e673ae97..37137e02c08 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -144,6 +144,17 @@ class StringType(AtomicType):

     __metaclass__ = DataTypeSingleton

+    def needConversion(self):
+        return True
+
+    def toInternal(self, v):
+        if v is not None:
+            return str(v)
+
+    def fromInternal(self, v):
+        if v is not None:
+            return str(v)
+

but then this would bring a performance regression, because str would have to be called on every value and we would need an extra function call (toInternal). Such extra function calls have caused performance regressions before; for example, see both #19246 and #19249.

I am not sure this is something we should allow. Can we simply document that returnType should be compatible with the actual return value?

Please correct me if I missed anything.

@icexelloss
Contributor

icexelloss commented Jan 5, 2018

I ran some experiments:

py_date = udf(datetime.date, DateType())
py_timestamp = udf(datetime.datetime, TimestampType())

This works correctly

spark.range(1).select(py_date(lit(2017), lit(10), lit(30))).show()
spark.range(1).select(py_timestamp(lit(2017), lit(10), lit(30))).show()

Result:

+------------------+
|date(2017, 10, 30)|
+------------------+
|        2017-10-30|
+------------------+

+----------------------+
|datetime(2017, 10, 30)|
+----------------------+
|   2017-10-30 00:00:00|
+----------------------+

The change that the PR proposes seems to coerce Python datetime.datetime and datetime.date to the Python string representation rather than the Java one. We could call str on the return value of the Python udf when the declared type is StringType to get the Python string representation, but this probably needs a microbenchmark to see the performance implications.

@rednaxelafx
Contributor Author

Thanks for all of your comments, @HyukjinKwon and @icexelloss !
I'd like to wait for more discussions / suggestions on whether or not we want a behavior change that makes this reproducer work, or a simple document change that'll just say PySpark doesn't support mismatching returnType.

Everything you both mentioned is correct. Sorry for the mess; I got myself confused...
It's been a while since I first noticed the problem and came up with a patch, and when I picked it back up I had lost some context as to what exactly failed in the first place...

Both @HyukjinKwon and @icexelloss correctly pointed out that the bug only happens when udf() declares a type that mismatches what the function actually returns. In the reproducer, the declared UDF return type is the default string type but the actual return values were datetime.date / datetime.datetime. That path skips the conversion in pyspark/sql/types.py, so the value went straight to Pyrolite and was unpickled as a java.util.Calendar.

A note on how I got here: the reason my current PR (incorrectly) contained the case (Calendar, DateType) cases and friends is that initially I only had a reproducer for a Python UDF that actually returned datetime.date while using the default return type (string), and I had fixed it by introducing a case for converting from java.util.Calendar to the appropriate type in EvaluatePython.scala. At that time that case was certainly executed, and it did give me correct results for that single reproducer. I also noticed that the cases where the UDF return type was correctly declared were working correctly.

But then I realized I also needed to tell apart datetime.date and datetime.datetime, and I couldn't do that in a single case (Calendar, StringType) in EvaluatePython.scala because the Python-side type information is already lost at that point. So I went back to the Python side and thought I needed to handle datetime.date / datetime.datetime separately, but eventually they ended up both being handled by just a str(value) coercion in a band-aid fix in udf.py. The version that @HyukjinKwon suggested above is indeed a proper version of that.
At this point the new code in EvaluatePython.scala and friends is dead code.

To address a point from @icexelloss :

The change that the PR proposes seems to coerce Python datetime.datetime and datetime.date to the Python string representation rather than the Java one.
The reason I used str() there is that Python and Spark SQL follow the same default string formats for date (yyyy-MM-dd) and datetime (yyyy-MM-dd HH:mm:ss), e.g.

static PyObject *
date_isoformat(PyDateTime_Date *self)
{
    return PyUnicode_FromFormat("%04d-%02d-%02d",
                                GET_YEAR(self), GET_MONTH(self), GET_DAY(self));
}

and

  // `SimpleDateFormat` is not thread-safe.
  private val threadLocalDateFormat = new ThreadLocal[DateFormat] {
    override def initialValue(): SimpleDateFormat = {
      new SimpleDateFormat("yyyy-MM-dd", Locale.US)
    }
  }
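
As a quick Python-side sanity check of that claim (a small sketch, not part of the patch):

import datetime

# Python's default str() output matches Spark SQL's default formats
# "yyyy-MM-dd" and "yyyy-MM-dd HH:mm:ss" (ignoring fractional seconds).
assert str(datetime.date(2017, 10, 30)) == "2017-10-30"
assert str(datetime.datetime(2017, 10, 30, 0, 0, 0)) == "2017-10-30 00:00:00"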

@HyukjinKwon
Member

HyukjinKwon commented Jan 6, 2018

Hey @rednaxelafx, that's fine. We all make mistakes, and trying is generally better than not trying. I also made a mistake the first time above. It was easier to debug this by reading your comments and all the details in the PR description. Thank you.

I'd like to wait for more discussions / suggestions on whether or not we want a behavior change that makes this reproducer work, or a simple document change that'll just say PySpark doesn't support mismatching returnType.

So, a few options might be ...

  1. Simply document this

  2. str logic in types.StringType - in this case, I think we should do a small benchmark. It wouldn't be hard, and I think you could reuse the commands I used here - [SPARK-22025][PySpark] Speeding up fromInternal for StructField #19246 (comment). A simple benchmark should be fine (a minimal sketch appears after this list).

  3. Investigate the way to register a custom Pyrolite unpickler that converts datetime.date* to Timestamp or Date, and see if it's possible. I believe we already have some custom fixes there.
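
A minimal, Spark-free sketch of the kind of micro-benchmark meant in option 2 (the helper names are hypothetical; it only times the extra per-value call that a str-coercing toInternal would add):

import timeit

def f(v):                 # stands in for the user's UDF
    return v

def to_internal(v):       # stands in for a str-coercing StringType.toInternal
    return str(v) if v is not None else None

plain = lambda v: f(v)                 # current path: no conversion
wrapped = lambda v: to_internal(f(v))  # proposed path: extra call plus str()

n = 1000000
print("plain:  ", timeit.timeit(lambda: plain("abc"), number=n))
print("wrapped:", timeit.timeit(lambda: wrapped("abc"), number=n))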

@HyukjinKwon
Member

@ueshin @icexelloss @cloud-fan @rednaxelafx, which one would you prefer?

I like 1 the most. If the perf diff is trivial, 2 is also fine. If 3 works, I am fine with that too.

}

/**
* Returns SQLTimestamp from java.util.Calendar (microseconds since epoch)
Member

(Matching the comment of fromJavaCalendarForDate.)
nit: Returns the number of microseconds since epoch from java.util.Calendar.


@@ -120,10 +121,18 @@ object EvaluatePython {
case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale)

case (c: Int, DateType) => c
// Pyrolite will unpickle a Python datetime.date to a java.util.Calendar
case (c: Calendar, DateType) => DateTimeUtils.fromJavaCalendarForDate(c)
Contributor

so we will never hit this?

Contributor

seems what we need is a case (c: Calendar, StringType) => ...

Member

I think he did this on the Python side because here we don't know whether the Calendar came from datetime.date or datetime.datetime.

@HyukjinKwon
Member

@cloud-fan, actually I have a similar question too - #20163 (comment). I tend to agree with it, and I think we should disallow this and document it.

Just want to check if you feel strongly about this. If we need to support this, I believe the ways are 2. or 3. in #20163 (comment).

@cloud-fan
Contributor

The current behavior looks weird; we should either throw an exception and ask users to give a correct return type, or fix it via proposal 2.

@@ -120,10 +121,18 @@ object EvaluatePython {
case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale)

case (c: Int, DateType) => c
// Pyrolite will unpickle a Python datetime.date to a java.util.Calendar
case (c: Calendar, DateType) => DateTimeUtils.fromJavaCalendarForDate(c)
Member

How about we return null in this case? Other cases also seem to return null if the value fails to be converted:

>>> from pyspark.sql.functions import udf
>>> f = udf(lambda x: x, "double")
>>> spark.range(1).select(f("id")).show()
+------------+
|<lambda>(id)|
+------------+
|        null|
+------------+

Seems we can do it like:

    case StringType => (obj: Any) => nullSafeConvert(obj) {
      case c: Calendar => null
      case _ => UTF8String.fromString(obj.toString)
    }

Contributor

Yea, it's consistent with other unconvertible cases, but StringType is the default return type; I'm afraid many users may hit this and get confused.

Member

Right. Let's go ahead with 2 then. I am fine if it's done as an exception for practical purposes. Maybe we could add an isinstance(.., basestring) check and return directly as a shortcut. I haven't checked the perf diff, but I think we can check it easily via a profile as I mentioned above.
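
The shortcut mentioned above might look roughly like this (a sketch only; the Python 2/3 guard is an assumption, since basestring does not exist on Python 3):

try:                   # Python 2
    string_types = basestring
except NameError:      # Python 3
    string_types = str

def to_internal(v):
    # Shortcut: pass strings (and None) through untouched; only coerce other objects.
    if v is None or isinstance(v, string_types):
        return v
    return str(v)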

Member

WDYT about ^ @ueshin?

Member

Yeah, 2. should work for StringType.

I'd also like to add some documentation, as in 1., so that users are careful about the return type. I've found that udfs return null and pandas_udfs throw an exception in most cases when the return type is mismatched.
Of course we can try to make the behavior differences between udf and pandas_udf as close as possible in the future, but I think handling a mismatched return type is best-effort.

Contributor

My 2 cents:

  • I am +0 for 2. I think having type coercion for the str type but not for other types can be confusing to users. Realistically, I think any udf user will hit the null result for a mismatched type anyway (I have hit it so many times..) and will learn that null means type mismatch. Even if we make the behavior for str a bit friendlier, they will likely hit the issue with other types anyway. I don't think "the default returnType for udf is str" is a strong reason for "having special type coercion for str"; they seem orthogonal.
    I prefer that we keep the type mismatch behavior consistent for str types vs other types (return null) and document this more clearly.
  • I find returning null for a type mismatch an unintuitive behavior of the row-at-a-time udf and prefer not to replicate it in pandas udf.

@ueshin
Member

ueshin commented Jan 9, 2018

I investigated the behavior differences between udf and pandas_udf for wrong return types and found there are actually many differences.
Basically, udfs return null as @HyukjinKwon mentioned, whereas pandas_udfs throw an ArrowException. There seem to be some exceptions, though.

@HyukjinKwon
Member

Perhaps we should consider catching the exception and setting nulls in pandas_udf, if possible, to match the behaviour of udf ...

@rednaxelafx
Contributor Author

Given the above discussion, do we have consensus on all of the following:

  • Update the documentation for PySpark UDFs to warn about the behavior of mismatched declared returnType vs actual runtime return values
  • Make Python UDFs that declare returnType as StringType recognize java.util.Calendar and convert the value to null (as in the example from @HyukjinKwon), essentially marking it as unconvertible.

I believe we all agree on the first point. The second point is in line with @icexelloss's opinion, which I tend to agree with in terms of API semantic consistency. It might not be as user-friendly as Option 2 from @HyukjinKwon, but it's less magic and more consistent. I find that more consistency leads to fewer surprises.

If we have consensus then I'll update the JIRA ticket and this PR to reflect that.

@cloud-fan
Contributor

SGTM

@HyukjinKwon
Member

One more SGTM

@viirya
Member

viirya commented Jan 11, 2018

+1

@icexelloss
Contributor

+1

@rednaxelafx changed the title from "[SPARK-22966][PySpark] Spark SQL should handle Python UDFs that return a datetime.date or datetime.datetime" to "[SPARK-22966][PYTHON][SQL] Python UDFs with returnType=StringType should treat return values of datetime.date or datetime.datetime as unconvertible" on Jan 11, 2018
@SparkQA

SparkQA commented Jan 12, 2018

Test build #86000 has finished for PR 20163 at commit 4c7bcc1.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

…uld treat return values of datetime.date or datetime.datetime as unconvertible

Add a conversion to PySpark to mark Python UDFs that declare returnType=StringType() but actually return a datetime.date or datetime.datetime as unconvertible, i.e. converting the value to null.

Also added a new unit test to pyspark/sql/tests.py to reflect the current semantics of Python UDFs returning a value whose type mismatches the declared returnType.
@SparkQA

SparkQA commented Jan 12, 2018

Test build #86003 has finished for PR 20163 at commit d307cee.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rednaxelafx
Contributor Author

jenkins retest this please

@SparkQA

SparkQA commented Jan 12, 2018

Test build #86023 has finished for PR 20163 at commit d307cee.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -144,6 +145,7 @@ object EvaluatePython {
}

case StringType => (obj: Any) => nullSafeConvert(obj) {
case _: Calendar => null
case _ => UTF8String.fromString(obj.toString)
Contributor

shall we blacklist more types? e.g. if a udf returns a decimal and marks the return type as string type, is it a mismatch?

Contributor Author

I was pondering that yesterday, too... somehow I have the feeling that no matter which direction we take, there's no good answer to type mismatch situations.

Say we blacklist more types; should we document the list so that it's clear what will definitely NOT work?

Member

To be thorough, I think we should check all the types Pyrolite can produce (https://github.com/irmen/Pyrolite):

PYTHON    ---->     JAVA
------              ----
None                null
bool                boolean
int                 int
long                long or BigInteger  (depending on size)
string              String
unicode             String
complex             net.razorvine.pickle.objects.ComplexNumber
datetime.date       java.util.Calendar
datetime.datetime   java.util.Calendar
datetime.time       net.razorvine.pickle.objects.Time
datetime.timedelta  net.razorvine.pickle.objects.TimeDelta
float               double   (float isn't used) 
array.array         array of appropriate primitive type (char, int, short, long, float, double)
list                java.util.List<Object>
tuple               Object[]
set                 java.util.Set
dict                java.util.Map
bytes               byte[]
bytearray           byte[]
decimal             BigDecimal    
custom class        Map<String, Object>  (dict with class attributes including its name in "__class__")
Pyro4.core.URI      net.razorvine.pyro.PyroURI
Pyro4.core.Proxy    net.razorvine.pyro.PyroProxy
Pyro4.errors.*      net.razorvine.pyro.PyroException
Pyro4.utils.flame.FlameBuiltin     net.razorvine.pyro.FlameBuiltin 
Pyro4.utils.flame.FlameModule      net.razorvine.pyro.FlameModule 
Pyro4.utils.flame.RemoteInteractiveConsole    net.razorvine.pyro.FlameRemoteConsole 

and then check if the string conversion via obj.toString looks reasonably consistent. If not, we add it to the blacklist.

Another possibility is to whitelist String, but I guess that would be rather a radical change.

Contributor

check if the string conversion via obj.toString looks reasonably consistent. If not, we add it to the blacklist.

hmm, this seems weird, as the type mismatch would now be defined by the Pyrolite object's toString behavior...

Member

@HyukjinKwon Jan 13, 2018

So, for now .. I think it's fine as a small fix as-is ... We are going to document that the return type and return value should match anyway ..

So, expected return values will be (including dict, list, tuple and array):

# Mapping Python types to Spark SQL DataType
_type_mappings = {
    type(None): NullType,
    bool: BooleanType,
    int: LongType,
    float: DoubleType,
    str: StringType,
    bytearray: BinaryType,
    decimal.Decimal: DecimalType,
    datetime.date: DateType,
    datetime.datetime: TimestampType,
    datetime.time: TimestampType,
}

if sys.version < "3":
    _type_mappings.update({
        unicode: StringType,
        long: LongType,
    })

It seems we can also check whether the string conversion looks reasonable and then blacklist net.razorvine.pickle.objects.Time if not ...

How does this sound to you @cloud-fan and @rednaxelafx?

Member

@HyukjinKwon Jan 13, 2018

BTW, it seems there is another hole when we actually do the internal conversion with unexpected types:

>>> from pyspark.sql.functions import udf
>>> f = udf(lambda x: x, "date")
>>> spark.range(1).select(f("id")).show()
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "./python/pyspark/worker.py", line 229, in main
    process()
  File "./python/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "./python/pyspark/worker.py", line 149, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "./python/pyspark/worker.py", line 72, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/.../pyspark/sql/types.py", line 175, in toInternal
    return d.toordinal() - self.EPOCH_ORDINAL
AttributeError: 'int' object has no attribute 'toordinal'

Another hole:

>>> from pyspark.sql.functions import udf, struct
>>> f = udf(lambda x: x, "string")
>>> spark.range(1).select(f(struct("id"))).show()
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:86)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:85)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

Member

I think there is no perfect solution .. I think #20163 (comment) sounds good enough as a fix for this issue for now ..

Member

@HyukjinKwon Jan 13, 2018

@cloud-fan, how about something like this then?

    case StringType => (obj: Any) => nullSafeConvert(obj) {
      // Shortcut for string conversion
      case c: String => UTF8String.fromString(c)

      // Here, we return null for 'array', 'tuple', 'dict', 'list', 'datetime.datetime',
      // 'datetime.date' and 'datetime.time' because those string conversions are
      // not quite consistent with SQL string representation of data.
      case _: java.util.Calendar | _: net.razorvine.pickle.objects.Time |
           _: java.util.List[_] | _: java.util.Map[_, _] =>
        null
      case c if c.getClass.isArray => null

      // Here, we keep the string conversion fall back for compatibility.
      // TODO: We should revisit this and rewrite the type conversion logic in Spark 3.x.
      case c => UTF8String.fromString(c.toString)
    }

My few tests:

datetime.time:

from pyspark.sql.functions import udf
from datetime import time

f = udf(lambda x: time(0, 0), "string")
spark.range(1).select(f("id")).show()
+--------------------+
|        <lambda>(id)|
+--------------------+
|Time: 0 hours, 0 ...|
+--------------------+

array:

from pyspark.sql.functions import udf
import array

f = udf(lambda x: array.array("c", "aaa"), "string")
spark.range(1).select(f("id")).show()
+------------+
|<lambda>(id)|
+------------+
| [C@11618d9e|
+------------+

tuple:

from pyspark.sql.functions import udf

f = udf(lambda x: (x,), "string")
spark.range(1).select(f("id")).show()
+--------------------+
|        <lambda>(id)|
+--------------------+
|[Ljava.lang.Objec...|
+--------------------+

list:

from pyspark.sql.functions import udf
from datetime import datetime

f = udf(lambda x: [datetime(1990, 1, 1)], "string")
spark.range(1).select(f("id")).show()
+--------------------+
|        <lambda>(id)|
+--------------------+
|[java.util.Gregor...|
+--------------------+

dict:

from pyspark.sql.functions import udf
from datetime import datetime

f = udf(lambda x: {1: datetime(1990, 1, 1)}, "string")
spark.range(1).select(f("id")).show()
+--------------------+
|        <lambda>(id)|
+--------------------+
|{1=java.util.Greg...|
+--------------------+

Contributor

looks good

Member

btw, the array case seems a bit weird?

asfgit pushed a commit that referenced this pull request Oct 8, 2018
…hon data and SQL types in normal UDFs

### What changes were proposed in this pull request?

We are facing some problems with type conversions between Python data and SQL types in UDFs (Pandas UDFs as well).
It's even difficult to identify the problems (see #20163 and #22610).

This PR aims to internally document the type conversion table. Some of the conversions look buggy and we should fix them.

```python
import sys
import array
import datetime
from decimal import Decimal

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import udf

if sys.version >= '3':
    long = int

data = [
    None,
    True,
    1,
    long(1),
    "a",
    u"a",
    datetime.date(1970, 1, 1),
    datetime.datetime(1970, 1, 1, 0, 0),
    1.0,
    array.array("i", [1]),
    [1],
    (1,),
    bytearray([65, 66, 67]),
    Decimal(1),
    {"a": 1},
    Row(kwargs=1),
    Row("namedtuple")(1),
]

types =  [
    BooleanType(),
    ByteType(),
    ShortType(),
    IntegerType(),
    LongType(),
    StringType(),
    DateType(),
    TimestampType(),
    FloatType(),
    DoubleType(),
    ArrayType(IntegerType()),
    BinaryType(),
    DecimalType(10, 0),
    MapType(StringType(), IntegerType()),
    StructType([StructField("_1", IntegerType())]),
]

df = spark.range(1)
results = []
count = 0
total = len(types) * len(data)
spark.sparkContext.setLogLevel("FATAL")
for t in types:
    result = []
    for v in data:
        try:
            row = df.select(udf(lambda: v, t)()).first()
            ret_str = repr(row[0])
        except Exception:
            ret_str = "X"
        result.append(ret_str)
        progress = "SQL Type: [%s]\n  Python Value: [%s(%s)]\n  Result Python Value: [%s]" % (
            t.simpleString(), str(v), type(v).__name__, ret_str)
        count += 1
        print("%s/%s:\n  %s" % (count, total, progress))
    results.append([t.simpleString()] + list(map(str, result)))

schema = ["SQL Type \\ Python Value(Type)"] + list(map(lambda v: "%s(%s)" % (str(v), type(v).__name__), data))
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False)
print("\n".join(map(lambda line: "    # %s  # noqa" % line, strings.strip().split("\n"))))
```

This table was generated under Python 2 but the code above is Python 3 compatible as well.

## How was this patch tested?

Manually tested and lint check.

Closes #22655 from HyukjinKwon/SPARK-25666.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
asfgit pushed a commit that referenced this pull request Oct 24, 2018
…das data and SQL types in Pandas UDFs

## What changes were proposed in this pull request?

We are facing some problems with type conversions between Pandas data and SQL types in Pandas UDFs.
It's even difficult to identify the problems (see #20163 and #22610).

This PR aims to internally document the type conversion table. Some of the conversions look buggy and we should fix them.

Table can be generated via the codes below:

```python
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf

columns = [
    ('none', 'object(NoneType)'),
    ('bool', 'bool'),
    ('int8', 'int8'),
    ('int16', 'int16'),
    ('int32', 'int32'),
    ('int64', 'int64'),
    ('uint8', 'uint8'),
    ('uint16', 'uint16'),
    ('uint32', 'uint32'),
    ('uint64', 'uint64'),
    ('float64', 'float16'),
    ('float64', 'float32'),
    ('float64', 'float64'),
    ('date', 'datetime64[ns]'),
    ('tz_aware_dates', 'datetime64[ns, US/Eastern]'),
    ('string', 'object(string)'),
    ('decimal', 'object(Decimal)'),
    ('array', 'object(array[int32])'),
    ('float128', 'float128'),
    ('complex64', 'complex64'),
    ('complex128', 'complex128'),
    ('category', 'category'),
    ('tdeltas', 'timedelta64[ns]'),
]

def create_dataframe():
    import pandas as pd
    import numpy as np
    import decimal
    pdf = pd.DataFrame({
        'none': [None, None],
        'bool': [True, False],
        'int8': np.arange(1, 3).astype('int8'),
        'int16': np.arange(1, 3).astype('int16'),
        'int32': np.arange(1, 3).astype('int32'),
        'int64': np.arange(1, 3).astype('int64'),
        'uint8': np.arange(1, 3).astype('uint8'),
        'uint16': np.arange(1, 3).astype('uint16'),
        'uint32': np.arange(1, 3).astype('uint32'),
        'uint64': np.arange(1, 3).astype('uint64'),
        'float16': np.arange(1, 3).astype('float16'),
        'float32': np.arange(1, 3).astype('float32'),
        'float64': np.arange(1, 3).astype('float64'),
        'float128': np.arange(1, 3).astype('float128'),
        'complex64': np.arange(1, 3).astype('complex64'),
        'complex128': np.arange(1, 3).astype('complex128'),
        'string': list('ab'),
        'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), np.array([1, 2, 3], dtype=np.int32)]),
        'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]),
        'date': pd.date_range('19700101', periods=2).values,
        'category': pd.Series(list("AB")).astype('category')})
    pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]]
    pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, tz='US/Eastern')
    return pdf

types =  [
    BooleanType(),
    ByteType(),
    ShortType(),
    IntegerType(),
    LongType(),
    FloatType(),
    DoubleType(),
    DateType(),
    TimestampType(),
    StringType(),
    DecimalType(10, 0),
    ArrayType(IntegerType()),
    MapType(StringType(), IntegerType()),
    StructType([StructField("_1", IntegerType())]),
    BinaryType(),
]

df = spark.range(2).repartition(1)
results = []
count = 0
total = len(types) * len(columns)
values = []
spark.sparkContext.setLogLevel("FATAL")
for t in types:
    result = []
    for column, pandas_t in columns:
        v = create_dataframe()[column][0]
        values.append(v)
        try:
            row = df.select(pandas_udf(lambda _: create_dataframe()[column], t)(df.id)).first()
            ret_str = repr(row[0])
        except Exception:
            ret_str = "X"
        result.append(ret_str)
        progress = "SQL Type: [%s]\n  Pandas Value(Type): %s(%s)]\n  Result Python Value: [%s]" % (
            t.simpleString(), v, pandas_t, ret_str)
        count += 1
        print("%s/%s:\n  %s" % (count, total, progress))
    results.append([t.simpleString()] + list(map(str, result)))

schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda values_column: "%s(%s)" % (values_column[0], values_column[1][1]), zip(values, columns)))
strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False)
print("\n".join(map(lambda line: "    # %s  # noqa" % line, strings.strip().split("\n"))))

```

This code is compatible with both Python 2 and 3 but the table was generated under Python 2.

## How was this patch tested?

Manually tested and lint check.

Closes #22795 from HyukjinKwon/SPARK-25798.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
@HyukjinKwon
Member

Let's leave this closed.

@asfgit closed this in a3ba3a8 Nov 11, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…hon data and SQL types in normal UDFs

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…das data and SQL types in Pandas UDFs
