[SPARK-22000][SQL] Address missing Upcast in JavaTypeInference.deserializerFor #23854

Closed
wants to merge 13 commits into from

Conversation

HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Feb 21, 2019

What changes were proposed in this pull request?

Spark expects the type of a column and the type of its matching bean field to be the same when deserializing to an object, but Spark doesn't actually enforce this (at least for the Java bean encoder). Some users do mismatch them and hit undefined behavior: in SPARK-22000, Spark throws a compilation failure on the generated code because it calls .toString() against a primitive type.

This doesn't produce an error on the Scala side because ScalaReflection.deserializerFor properly injects an Upcast where necessary. This patch proposes applying the same approach to JavaTypeInference.deserializerFor.
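For context, the failure mode is visible in plain Java, independent of Spark: a primitive has no methods, so generated code that calls .toString() on it cannot compile, while String.valueOf has an overload for every primitive. A minimal illustration (the class and variable names here are ours, not from the patch):

```java
public class PrimitiveToStringDemo {
    public static void main(String[] args) {
        int ref = 42;                     // e.g. a value read from an IntegerType column
        // ref.toString();                // does not compile: primitives have no methods
        String s = String.valueOf(ref);   // the valueOf(int) overload handles primitives
        System.out.println(s);
        System.out.println(s.equals(Integer.toString(ref)));
    }
}
```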

Credit to @srowen, @maropu, and @cloud-fan since they provided various approaches to solve this.

How was this patch tested?

Added a UT whose query is slightly modified from the sample code attached to the JIRA issue.

@HeartSaVioR
Contributor Author

cc @maropu @kiszk @srowen since they discussed this on the JIRA issue.

@maropu
Member

maropu commented Feb 21, 2019

ok to test

.option("header", "true")
.option("mode", "DROPMALFORMED")
.schema("ref int, userId string, x int, y int")
.load("src/test/resources/test-data/spark-22000.csv")
Member

Do we need to read test data from a file instead of using spark.createDataFrame(...)?

Contributor Author

I guess not. Let me change the test not to use a file.

Member

Thanks!

.format("csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.schema("ref int, userId string, x int, y int")
Member

Can you add tests for more types other than int?

@HeartSaVioR
Contributor Author

Thanks @maropu, I addressed your comments. The code got much longer: in the Java world I had to deal with Row directly, and I also needed to add equals()/hashCode() (hashCode() is technically optional, but best practice is to implement it along with equals()) and toString(). Hope this is OK.

Member

@srowen srowen left a comment

I put some minor comments on the test, but I am not sure this change matches the JIRA. The issue concerned calling .toString on primitive types, not String. I suggested String.valueOf because it accepts primitives.

This change alters behavior a little: now a null is deserialized as "null", not null. I'm not sure we want that, nor whether that was an issue before; can this be called on null anyway?
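For reference, the null behavior being questioned here can be checked in plain Java (our own snippet, not from the patch): String.valueOf(Object) maps null to the four-character string "null", whereas a call that resolves to the valueOf(char[]) overload throws NullPointerException on a null argument.

```java
public class ValueOfNullDemo {
    public static void main(String[] args) {
        Object o = null;
        // valueOf(Object) is specified as: obj == null ? "null" : obj.toString()
        System.out.println(String.valueOf(o));
        try {
            // The char[] overload dereferences its argument, so null throws.
            System.out.println(String.valueOf((char[]) null));
        } catch (NullPointerException e) {
            System.out.println("NPE");
        }
    }
}
```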

@@ -115,6 +124,70 @@ public void testBeanWithMapFieldsDeserialization() {
Assert.assertEquals(records, MAP_RECORDS);
}

private static final List<Row> ROWS_SPARK_22000 = new ArrayList<>();
Member

All the static initialization could just be in a static block rather than split it up, but could this all be local to the new test method if that's the only place it's used?

Contributor Author

I just followed the approach the test class already uses, but I agree they can be local to the new test method. Will address.

@@ -252,4 +325,116 @@ public String toString() {
return String.format("[%d,%d]", startTime, endTime);
}
}

public static class RecordSpark22000 {
Member

final just to be tidy?

}

@Override
public String toString() {
Member

Does this need toString? I understand hashCode and equals

Contributor Author

It will help compare the expected and actual values when the test fails. Otherwise they would be rendered as Object.toString() does, which provides no information about why they are not equal.

@SparkQA

SparkQA commented Feb 21, 2019

Test build #102584 has finished for PR 23854 at commit 443e74f.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 21, 2019

First of all, I'm not a Catalyst expert (actually I'm a beginner in this area), so I might be wrong; please correct me if I'm mistaken.

The issue concerned calling .toString on primitive types, not String.

This was happening because an IntegerType column was matched to a String field when deserializing to a Java bean. If it had been matched to an Integer field, Integer.valueOf() would have been called.

It looks like Spark doesn't restrict the matching types to be identical. It loosens the restriction and handles some implicit type conversion by leveraging the Java API, and primitive types are not compatible with the String approach (.toString()) whereas they can still be converted another way (String.valueOf()).

This change alters behavior a little bit. Now a null is deserialized as "null" not null. I'm not sure we want that, nor was that an issue before; can this be called on null anyway?

Unless I misread the javadoc (or the javadoc is wrong), StaticInvoke takes care of that. This change would not result in calling String.valueOf(null); otherwise passing null would also happen for the other types in the same if statement, and the generated code would not fail to compile but would throw NumberFormatException for numeric types at runtime.

/**
* Invokes a static function, returning the result. By default, any of the arguments being null
* will result in returning null instead of calling the function.
*
* @param staticObject The target of the static call. This can either be the object itself
* (methods defined on scala objects), or the class object
* (static methods defined in java).
* @param dataType The expected return type of the function call
* @param functionName The name of the method to call.
* @param arguments An optional list of expressions to pass as arguments to the function.
* @param propagateNull When true, and any of the arguments is null, null will be returned instead
* of calling the function.
* @param returnNullable When false, indicating the invoked method will always return
* non-null value.
*/
case class StaticInvoke(
staticObject: Class[_],
dataType: DataType,
functionName: String,
arguments: Seq[Expression] = Nil,
propagateNull: Boolean = true,
returnNullable: Boolean = true) extends InvokeLike {

Btw I guess this case can be added to the new UT so we don't need to worry about it afterwards. I'll try.
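The propagateNull contract described in the StaticInvoke javadoc above can be mimicked in plain Java. This is a hand-written sketch of the guard the codegen emits, not the actual generated code:

```java
public class PropagateNullSketch {
    // Mirrors the generated pattern: the conversion is only invoked when the
    // input is non-null; otherwise null is propagated as the result.
    static String convert(Integer input) {
        boolean isNull = (input == null);
        String result = null;
        if (!isNull) {
            result = String.valueOf(input.intValue());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(convert(7));            // converted normally
        System.out.println(convert(null) == null); // null propagates, never the string "null"
    }
}
```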

@HeartSaVioR HeartSaVioR changed the title [SPARK-22000][SQL] Use String.valueOf to assign value to String type of field in Java Bean [SPARK-22000][SQL] Use String.valueOf in generated code to assign value to String type of field in Java Bean Encoder Feb 21, 2019
@srowen
Member

srowen commented Feb 21, 2019

I agree with your argument about null; it would never have worked here anyway. It feels like there should be an assertion about it to make sure, because if a null reaches here it returns "null" rather than an exception.

I also don't know this part well enough, but primitives are matched to string? That sounds weird. It seems like primitives are already handled in

      case c if !inferExternalType(c).isInstanceOf[ObjectType] => path

It wasn't clear from the JIRA, initially, how this even happened. Now there's a simple test case attached. Could we paste that in as a test here and see if it even fails without the change? And if so, how?

The test case suggests that a String field is the problem, but String has toString and isn't handled by this part.

@HeartSaVioR
Contributor Author

Now there's a simple test case attached. Could we paste that in as a test here, and see if it even fails without the change? and if so, how.

The added UT is slightly modified from the attached test case; I changed it based on the review comments. The previously added test failed without the change, and the revised test still does.

I agree with your argument about null; it would never have worked here anyway. It feels like there should be an assertion about it to make sure, as if a null reaches here it returns "null" rather than an exception.

No, I meant null has been handled well, and the modified code also works well with null. Please find String.valueOf in the generated code: it is guarded with a null check. If the value is null, String.valueOf is not called and the result is null. That's what propagateNull does for us.

07:13:10.684 DEBUG org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection: code for initializejavabean(newInstance(class test.org.apache.spark.sql.JavaBeanDeserializationSuite$RecordSpark22000), (setBooleanField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[6, boolean, true], true, false)), (setLongField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[2, bigint, true], true, false)), (setIntField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[1, int, true], true, false)), (setStringField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[5, string, true], true, false)), (setDoubleField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[4, double, true], true, false)), (setShortField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[0, smallint, true], true, false)), (setFloatField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[3, float, true], true, false)), (setTimestampField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[7, timestamp, true], true, false)), (setNullIntField,staticinvoke(class java.lang.String, ObjectType(class java.lang.String), valueOf, input[8, int, true], true, false))):
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificSafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private InternalRow mutableRow;
/* 009 */   private boolean resultIsNull_0;
/* 010 */   private boolean argValue_0;
/* 011 */   private boolean resultIsNull_1;
/* 012 */   private long argValue_1;
/* 013 */   private boolean resultIsNull_2;
/* 014 */   private int argValue_2;
/* 015 */   private boolean resultIsNull_3;
/* 016 */   private boolean resultIsNull_4;
/* 017 */   private double argValue_3;
/* 018 */   private boolean resultIsNull_5;
/* 019 */   private short argValue_4;
/* 020 */   private boolean resultIsNull_6;
/* 021 */   private float argValue_5;
/* 022 */   private boolean resultIsNull_7;
/* 023 */   private long argValue_6;
/* 024 */   private boolean resultIsNull_8;
/* 025 */   private int argValue_7;
/* 026 */   private UTF8String[] mutableStateArray_0 = new UTF8String[1];
/* 027 */
/* 028 */   public SpecificSafeProjection(Object[] references) {
/* 029 */     this.references = references;
/* 030 */     mutableRow = (InternalRow) references[references.length - 1];
/* 031 */
/* 032 */
/* 033 */   }
/* 034 */
/* 035 */   public void initialize(int partitionIndex) {
/* 036 */
/* 037 */   }
/* 038 */
/* 039 */   public java.lang.Object apply(java.lang.Object _i) {
/* 040 */     InternalRow i = (InternalRow) _i;
/* 041 */     final test.org.apache.spark.sql.JavaBeanDeserializationSuite$RecordSpark22000 value_1 = false ?
/* 042 */     null : new test.org.apache.spark.sql.JavaBeanDeserializationSuite$RecordSpark22000();
/* 043 */     test.org.apache.spark.sql.JavaBeanDeserializationSuite$RecordSpark22000 javaBean_0 = value_1;
/* 044 */     if (!false) {
/* 045 */       initializeJavaBean_0_0(i, javaBean_0);
/* 046 */       initializeJavaBean_0_1(i, javaBean_0);
/* 047 */       initializeJavaBean_0_2(i, javaBean_0);
/* 048 */     }
/* 049 */     if (false) {
/* 050 */       mutableRow.setNullAt(0);
/* 051 */     } else {
/* 052 */
/* 053 */       mutableRow.update(0, value_1);
/* 054 */     }
/* 055 */
/* 056 */     return mutableRow;
/* 057 */   }
/* 058 */
/* 059 */
/* 060 */   private void initializeJavaBean_0_2(InternalRow i, test.org.apache.spark.sql.JavaBeanDeserializationSuite$RecordSpark22000 javaBean_0) {
/* 061 */
/* 062 */     resultIsNull_6 = false;
/* 063 */     if (!resultIsNull_6) {
/* 064 */       boolean isNull_15 = i.isNullAt(3);
/* 065 */       float value_15 = isNull_15 ?
/* 066 */       -1.0f : (i.getFloat(3));
/* 067 */       resultIsNull_6 = isNull_15;
/* 068 */       argValue_5 = value_15;
/* 069 */     }
/* 070 */
/* 071 */     boolean isNull_14 = resultIsNull_6;
/* 072 */     java.lang.String value_14 = null;
/* 073 */     if (!resultIsNull_6) {
/* 074 */       value_14 = java.lang.String.valueOf(argValue_5);
/* 075 */     }
/* 076 */     if (!isNull_14) {
/* 077 */       javaBean_0.setFloatField(value_14);
/* 078 */     }
/* 079 */
/* 080 */     resultIsNull_7 = false;
/* 081 */     if (!resultIsNull_7) {
/* 082 */       boolean isNull_17 = i.isNullAt(7);
/* 083 */       long value_17 = isNull_17 ?
/* 084 */       -1L : (i.getLong(7));
/* 085 */       resultIsNull_7 = isNull_17;
/* 086 */       argValue_6 = value_17;
/* 087 */     }
/* 088 */
/* 089 */     boolean isNull_16 = resultIsNull_7;
/* 090 */     java.lang.String value_16 = null;
/* 091 */     if (!resultIsNull_7) {
/* 092 */       value_16 = java.lang.String.valueOf(argValue_6);
/* 093 */     }
/* 094 */     if (!isNull_16) {
/* 095 */       javaBean_0.setTimestampField(value_16);
/* 096 */     }
/* 097 */
/* 098 */     resultIsNull_8 = false;
/* 099 */     if (!resultIsNull_8) {
/* 100 */       boolean isNull_19 = i.isNullAt(8);
/* 101 */       int value_19 = isNull_19 ?
/* 102 */       -1 : (i.getInt(8));
/* 103 */       resultIsNull_8 = isNull_19;
/* 104 */       argValue_7 = value_19;
/* 105 */     }
/* 106 */
/* 107 */     boolean isNull_18 = resultIsNull_8;
/* 108 */     java.lang.String value_18 = null;
/* 109 */     if (!resultIsNull_8) {
/* 110 */       value_18 = java.lang.String.valueOf(argValue_7);
/* 111 */     }
/* 112 */     if (!isNull_18) {
/* 113 */       javaBean_0.setNullIntField(value_18);
/* 114 */     }
/* 115 */
/* 116 */   }
/* 117 */
/* 118 */
/* 119 */   private void initializeJavaBean_0_1(InternalRow i, test.org.apache.spark.sql.JavaBeanDeserializationSuite$RecordSpark22000 javaBean_0) {
/* 120 */
/* 121 */     resultIsNull_3 = false;
/* 122 */     if (!resultIsNull_3) {
/* 123 */       boolean isNull_9 = i.isNullAt(5);
/* 124 */       UTF8String value_9 = isNull_9 ?
/* 125 */       null : (i.getUTF8String(5));
/* 126 */       resultIsNull_3 = isNull_9;
/* 127 */       mutableStateArray_0[0] = value_9;
/* 128 */     }
/* 129 */
/* 130 */     boolean isNull_8 = resultIsNull_3;
/* 131 */     java.lang.String value_8 = null;
/* 132 */     if (!resultIsNull_3) {
/* 133 */       value_8 = java.lang.String.valueOf(mutableStateArray_0[0]);
/* 134 */     }
/* 135 */     if (!isNull_8) {
/* 136 */       javaBean_0.setStringField(value_8);
/* 137 */     }
/* 138 */
/* 139 */     resultIsNull_4 = false;
/* 140 */     if (!resultIsNull_4) {
/* 141 */       boolean isNull_11 = i.isNullAt(4);
/* 142 */       double value_11 = isNull_11 ?
/* 143 */       -1.0 : (i.getDouble(4));
/* 144 */       resultIsNull_4 = isNull_11;
/* 145 */       argValue_3 = value_11;
/* 146 */     }
/* 147 */
/* 148 */     boolean isNull_10 = resultIsNull_4;
/* 149 */     java.lang.String value_10 = null;
/* 150 */     if (!resultIsNull_4) {
/* 151 */       value_10 = java.lang.String.valueOf(argValue_3);
/* 152 */     }
/* 153 */     if (!isNull_10) {
/* 154 */       javaBean_0.setDoubleField(value_10);
/* 155 */     }
/* 156 */
/* 157 */     resultIsNull_5 = false;
/* 158 */     if (!resultIsNull_5) {
/* 159 */       boolean isNull_13 = i.isNullAt(0);
/* 160 */       short value_13 = isNull_13 ?
/* 161 */       (short)-1 : (i.getShort(0));
/* 162 */       resultIsNull_5 = isNull_13;
/* 163 */       argValue_4 = value_13;
/* 164 */     }
/* 165 */
/* 166 */     boolean isNull_12 = resultIsNull_5;
/* 167 */     java.lang.String value_12 = null;
/* 168 */     if (!resultIsNull_5) {
/* 169 */       value_12 = java.lang.String.valueOf(argValue_4);
/* 170 */     }
/* 171 */     if (!isNull_12) {
/* 172 */       javaBean_0.setShortField(value_12);
/* 173 */     }
/* 174 */
/* 175 */   }
/* 176 */
/* 177 */
/* 178 */   private void initializeJavaBean_0_0(InternalRow i, test.org.apache.spark.sql.JavaBeanDeserializationSuite$RecordSpark22000 javaBean_0) {
/* 179 */
/* 180 */     resultIsNull_0 = false;
/* 181 */     if (!resultIsNull_0) {
/* 182 */       boolean isNull_3 = i.isNullAt(6);
/* 183 */       boolean value_3 = isNull_3 ?
/* 184 */       false : (i.getBoolean(6));
/* 185 */       resultIsNull_0 = isNull_3;
/* 186 */       argValue_0 = value_3;
/* 187 */     }
/* 188 */
/* 189 */     boolean isNull_2 = resultIsNull_0;
/* 190 */     java.lang.String value_2 = null;
/* 191 */     if (!resultIsNull_0) {
/* 192 */       value_2 = java.lang.String.valueOf(argValue_0);
/* 193 */     }
/* 194 */     if (!isNull_2) {
/* 195 */       javaBean_0.setBooleanField(value_2);
/* 196 */     }
/* 197 */
/* 198 */     resultIsNull_1 = false;
/* 199 */     if (!resultIsNull_1) {
/* 200 */       boolean isNull_5 = i.isNullAt(2);
/* 201 */       long value_5 = isNull_5 ?
/* 202 */       -1L : (i.getLong(2));
/* 203 */       resultIsNull_1 = isNull_5;
/* 204 */       argValue_1 = value_5;
/* 205 */     }
/* 206 */
/* 207 */     boolean isNull_4 = resultIsNull_1;
/* 208 */     java.lang.String value_4 = null;
/* 209 */     if (!resultIsNull_1) {
/* 210 */       value_4 = java.lang.String.valueOf(argValue_1);
/* 211 */     }
/* 212 */     if (!isNull_4) {
/* 213 */       javaBean_0.setLongField(value_4);
/* 214 */     }
/* 215 */
/* 216 */     resultIsNull_2 = false;
/* 217 */     if (!resultIsNull_2) {
/* 218 */       boolean isNull_7 = i.isNullAt(1);
/* 219 */       int value_7 = isNull_7 ?
/* 220 */       -1 : (i.getInt(1));
/* 221 */       resultIsNull_2 = isNull_7;
/* 222 */       argValue_2 = value_7;
/* 223 */     }
/* 224 */
/* 225 */     boolean isNull_6 = resultIsNull_2;
/* 226 */     java.lang.String value_6 = null;
/* 227 */     if (!resultIsNull_2) {
/* 228 */       value_6 = java.lang.String.valueOf(argValue_2);
/* 229 */     }
/* 230 */     if (!isNull_6) {
/* 231 */       javaBean_0.setIntField(value_6);
/* 232 */     }
/* 233 */
/* 234 */   }
/* 235 */
/* 236 */ }

So it's completely safe to use String.valueOf() for the String type, and even for the other types in the if statement.

@HeartSaVioR
Contributor Author

      case c if !inferExternalType(c).isInstanceOf[ObjectType] => path

This would match when the type of the field in the Java class is primitive. c is not the type of the column in the Dataset but the Java class (and the fields in the class, considering nesting, maybe?) we are creating the Encoder for. I guess that's where the confusion comes from.

@srowen
Member

srowen commented Feb 21, 2019

OK agree about null.

I'm still trying to understand why the change fixes it, though I believe it does. Is it accidental or actually solving the problem?

Before your change, which field has the wrong generated code, and what type is it? The original test case suggests the String field is the problem, not a primitive one. But then I don't see where the problem is coming from.

Just trying to confirm we actually understand why it happens.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 21, 2019

Before your change, which field has the wrong generated code -- what type is it?

The field/column was ref: the type of the ref column in the Dataset was IntegerType, and the type of the ref field in the Java class was String.

To be honest, I have no idea whether (implicit) type conversions are intended to be supported when deserializing, but converting any type to String sounds convenient and not too weird. Without knowing the history/intention, I guess I can't explain more.

@HeartSaVioR
Contributor Author

@cloud-fan Could you share some knowledge about this issue, since you appear to be the author of the Java bean Encoder? Thanks in advance!

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 21, 2019

Looks like the Encoder is not aware of the column types of the Dataset; now I understand why it doesn't strictly type-check columns against fields in the Java bean. I'm still not sure about the intention: expect only StringType for a String field (so it's safe to call .toString() there), or allow any type for a String field (so use .toString() to work with various types).

Member

@srowen srowen left a comment

OK, I'm getting this now. In the test case attached to the JIRA, the first column 'ref' is (correctly) inferred as IntegerType. The bean class has it as a String. That is arguably an error, but it should be reasonable to make the assignment. Because it goes to a String, it is most correct to call String.valueOf, especially as that is consistent with how other types are handled and how the JDK API is set up, with valueOf methods for these boxed types and String. This should not change results; String.valueOf(Object) of course returns its .toString().
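The last point is easy to confirm in plain Java (our own check, not part of the patch): for any non-null object, String.valueOf(Object) simply returns the object's toString(), so switching the String-field path from .toString() to String.valueOf cannot change results for non-null values.

```java
public class ValueOfConsistencyDemo {
    public static void main(String[] args) {
        Object[] samples = { Integer.valueOf(42), Boolean.TRUE, "abc" };
        for (Object o : samples) {
            // valueOf(Object) is specified as: obj == null ? "null" : obj.toString()
            System.out.println(String.valueOf(o).equals(o.toString()));
        }
    }
}
```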

@SparkQA

SparkQA commented Feb 22, 2019

Test build #102598 has finished for PR 23854 at commit d869bba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final static class RecordSpark22000

@maropu
Member

maropu commented Feb 22, 2019

btw, why don't we cast the internal data into more suitably typed data before deserializing it into the bean, instead of this weird type handling (like int -> string) in the deserializer?

Probably, it seems we can add cast exprs before collecting data?
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3360

@srowen
Member

srowen commented Feb 22, 2019

That's probably a better design; this at least fixes the current design and bug. WDYT about proceeding anyway?

@maropu
Member

maropu commented Feb 22, 2019

Yea, sure. +1 for the fix in the current design. cc: @cloud-fan

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 22, 2019

Sure, I think that sounds much better, but from the perspective of a Spark SQL newcomer it doesn't seem trivial.

Probably, it seems we can add cast exprs before collecting data?
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3360

I guess we also need to handle Dataset.rdd as well, and both paths look like they go through CatalystSerde.deserialize to create DeserializeToObject. So I looked through how to deal with the types of the logical plan and the deserializer, but the deserializer has field types as Java types (like ObjectType(java.lang.String) for a String field) which we need to match and decide against Spark SQL types. Does Spark have some code to match Spark SQL DataTypes to Java ObjectTypes, or even handle the cast?

@maropu
Member

maropu commented Feb 22, 2019

We've already got SQL inferred types for Java beans from JavaTypeInference.inferDataType, so I think it's OK to just cast the internal data into the inferred types somewhere.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 22, 2019

The thing is, the inferred types are not propagated into the deserializer (which is InitializeJavaBean). It only has setters, to which StaticInvoke is passed in the case of a String field, and it has no information about which type is required, only the return type of StaticInvoke wrapped with ObjectType.
(So StaticInvoke, and maybe Invoke too, doesn't expect a specific DataType and accepts any DataType as a parameter. The issue might start from here.)

So if we want to leverage the inferred SQL DataType to avoid dealing with ObjectType, it may also need to be propagated into StaticInvoke (and maybe Invoke as well), which also looks non-trivial (the code change might be small, but I'm just not sure this is what we want).

@HeartSaVioR
Contributor Author

Never mind. I realized encoderFor[T] has schema information. I'll try to apply the new approach.

@@ -235,9 +236,6 @@ object JavaTypeInference {
path :: Nil,
returnNullable = false)

case c if c == classOf[java.lang.String] =>
Invoke(path, "toString", ObjectType(classOf[String]))
Contributor

ScalaReflection does the same thing; do we have a problem there too?

AFAIK the path should be a string type column, and it's always safe to call UTF8String.toString. My gut feeling is that we're missing an Upcast somewhere in JavaTypeInference.

Contributor Author

@HeartSaVioR HeartSaVioR Feb 22, 2019

AFAIK the path should be a string type column

The sample code in the JIRA issue tried to bind an IntegerType column to a String field in a Java bean, which looks to break that expectation. (I guess ScalaReflection would not encounter this case.)

Spark doesn't throw an error for this case, though; in practice it shows undefined behavior: compilation failures in the generated code, possibly even runtime exceptions.

Contributor

The sample code tried to bind IntegerType column to String field in Java bean

In Scala, we can also do this and Spark will add an Upcast; e.g. spark.range(1).as[String].collect works fine.

I did a quick search and JavaTypeInference has no Upcast. We should fix it and follow ScalaReflection.

Contributor Author

Ah OK. I'll check and address it. Maybe it would be a separate PR if it doesn't fix the new test.

Contributor Author

Yeah, your suggestion seems to work nicely! I left a comment asking which approach to choose: please compare both approaches and comment. Thanks!

@SparkQA

SparkQA commented Feb 25, 2019

Test build #102751 has finished for PR 23854 at commit 4d564ab.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 25, 2019

The Java style test failure may be a false alarm: it's not due to a violation but to an inaccessible DTD link.
Just submitted a fix for this: #23887

@HeartSaVioR
Contributor Author

Retest this please.

@SparkQA

SparkQA commented Feb 25, 2019

Test build #102762 has finished for PR 23854 at commit 4d564ab.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 26, 2019

Test build #102768 has finished for PR 23854 at commit e01bfe6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

private[spark] object DeserializerBuildHelper {
Contributor

@cloud-fan cloud-fan Feb 26, 2019

nit: everything in catalyst is considered private; we don't need the private[spark]


case t if t <:< localTypeOf[java.math.BigDecimal] =>
Invoke(path, "toJavaBigDecimal", ObjectType(classOf[java.math.BigDecimal]),
returnNullable = false)
createDeserializerForJavaBigDecimal(path, returnNullable = false)
Contributor

Do you mind taking some time to investigate this? Why does the Java side use returnNullable = true while the Scala side uses returnNullable = false?

Member

createDeserializerForJavaBigInteger, too.

Contributor Author

Would you mind if we file a new issue to track the investigation? IMHO it's beyond this PR, and I've seen multiple PRs that keep accumulating requirements until reviewers suddenly lose interest and the PR fails to reach a conclusion.

Contributor

yea fine to do it in followup.


}

/** Returns the current path with a field at ordinal extracted. */
def addToPathOrdinal(
Member

Is this function only used in ScalaReflection? If so, how about moving it there and making it private?

Contributor Author

It might be weird to split similar code (this basically only adds index handling on top of addToPath) across multiple places, but no strong opinion. WDYT? If you still prefer moving it to ScalaReflection, please let me know and I'll move it.

upCastToExpectedType(newPath, dataType, walkedTypePath)
}

def expressionWithNullSafety(
Member

private?

case c if c == java.lang.Short.TYPE => "toShortArray"
case c if c == java.lang.Byte.TYPE => "toByteArray"
case c if c == java.lang.Boolean.TYPE => "toBooleanArray"
case other => throw new IllegalStateException("expect primitive array element type " +
Member

Why don't we allow the non-nullable & non-primitive case?

Contributor Author

Ah, my bad. It was for keeping ScalaReflection and JavaTypeInference in sync, and I realized they are not exactly the same. I'll roll back the change.

Member

Thanks!

Member

btw, we don't have any test for this code path? (it seems the latest Jenkins test run passed though...)

Contributor Author (@HeartSaVioR, Feb 26, 2019)

I guess it is very hard to cover all cases, especially since in the Java tests we rely on defining actual bean classes to verify. It would be nice to cover more cases if we could define a (column type, field type) matrix, generate classes at runtime, and leverage those classes in the tests. Maybe another JIRA issue for this?
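Such a (column type, field type) matrix could be sketched roughly as below. The class name and the allowed pairs are illustrative assumptions for the sake of the sketch, not Spark's actual upcast table:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a (column type, field type) compatibility matrix;
// the allowed pairs below are illustrative, not Spark's real upcast rules.
class UpcastMatrix {
    // source catalyst type -> field types it can safely be up-cast to
    static final Map<String, List<String>> SAFE_UPCASTS = Map.of(
        "int", List.of("int", "long", "string"),
        "long", List.of("long", "string"),
        "string", List.of("string"));

    static boolean canUpcast(String from, String to) {
        return SAFE_UPCASTS.getOrDefault(from, List.of()).contains(to);
    }
}
```

A test generator could then iterate over all pairs, synthesize a bean class per field type, and assert that reads succeed exactly when canUpcast returns true.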

Contributor Author (@HeartSaVioR, Feb 26, 2019)

@maropu
Sorry for the confusion. I got confused while dealing with other things at the same time. I wasn't wrong, so the change was unnecessary, and I'm rolling it back.
To answer the original comment: the non-nullable & non-primitive case is not possible according to the implementation of inferDataType; nullable is false only for the primitive case.

Contributor

non-nullable & non-primitive case => not possible

I agree with this, and I think the original code style is better, i.e.

val primitiveMethod = ...
primitiveMethod.map {
  ...
}.getOrElse {
  ...
}

We can change the Scala side to follow this.
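The suggested map/getOrElse style has a direct standalone analogue with java.util.Optional. A minimal sketch follows; the method names mirror the primitive-array accessors discussed above, but the surrounding class and the generic fallback name are assumptions:

```java
import java.util.Optional;

// Minimal sketch of the map/getOrElse style: look up a specialized method
// name for primitive element types, fall back to a generic path otherwise.
class PrimitiveMethodLookup {
    static Optional<String> primitiveMethod(Class<?> elementType) {
        if (elementType == int.class) return Optional.of("toIntArray");
        if (elementType == short.class) return Optional.of("toShortArray");
        if (elementType == byte.class) return Optional.of("toByteArray");
        if (elementType == boolean.class) return Optional.of("toBooleanArray");
        return Optional.empty(); // non-primitive element type
    }

    static String methodFor(Class<?> elementType) {
        // primitive types take the specialized branch; everything else
        // falls through to the generic, null-safe branch
        return primitiveMethod(elementType).orElse("array");
    }
}
```

This removes the explicit nullability branch: the Optional being empty is exactly the non-primitive (nullable) case.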

Contributor Author

OK. Would we keep mapFunction to deal with the upcast? I guess primitiveMethod.map.getOrElse removes the need for if (elementNullable), but just to confirm: do we want to go back to the original code where the upcast was not there, or just adopt the previous code style?

Contributor

We definitely need to do the upcast; just take the code style.

Contributor Author

Thanks for the clarification! Just addressed.

@SparkQA

SparkQA commented Feb 26, 2019

Test build #102782 has finished for PR 23854 at commit 8cbad26.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 26, 2019

Test build #102785 has finished for PR 23854 at commit 8cbad26.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

primitiveMethod.map { method =>
  Invoke(path, method, ObjectType(c))
}.getOrElse {
  Invoke(

Contributor


What's the difference between this branch and if (elementNullable)? Shall we follow the previous code? e.g.

val primitiveMethod = ...
primitiveMethod.map(...).getOrElse...

Contributor Author

Hmm... sorry, I think I was a bit confused. After looking into it again, the content before this commit seems correct and also in sync with ScalaReflection. I'll revert it.
Btw, the difference from the previous code is that the new code handles the upcast, which ScalaReflection also handles.
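As a standalone illustration of what the injected upcast buys (plain Java standing in for Catalyst's UpCast expression; not Spark code): when an int column feeds a String bean field, the value must be converted before any String method is invoked, otherwise the generated code ends up calling toString() on a primitive and fails to compile, which is exactly the SPARK-22000 symptom:

```java
// Plain-Java stand-in for the fixed deserializer path: convert the column
// value to the field's expected type up front, instead of invoking String
// methods on a primitive directly.
class UpcastSketch {
    static String deserializeToStringField(Object columnValue) {
        return String.valueOf(columnValue); // the "upcast" step
    }
}
```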

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102796 has finished for PR 23854 at commit 24a1b19.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in dea18ee Feb 27, 2019
@HeartSaVioR
Contributor Author

Thanks all for reviewing, providing nice approaches, and finally merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-22000 branch February 27, 2019 05:55
cloud-fan pushed a commit that referenced this pull request Mar 4, 2019
…ction and JavaTypeInference

## What changes were proposed in this pull request?

This patch proposes refactoring `serializerFor` method between `ScalaReflection` and `JavaTypeInference`, being consistent with what we refactored for `deserializerFor` in #23854.

This patch also extracts the logic on recording walk type path since the logic is duplicated across `serializerFor` and `deserializerFor` with `ScalaReflection` and `JavaTypeInference`.

## How was this patch tested?

Existing tests.

Closes #23908 from HeartSaVioR/SPARK-27001.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
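The "walked type path" extracted in that follow-up is essentially a breadcrumb trail recorded while recursing through a type, used to produce readable error messages. A rough standalone sketch, where the class name and message formats are assumptions rather than the actual Spark implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Rough sketch of recording a walked type path while recursing into a type;
// the message format here is illustrative only.
class WalkedTypePathSketch {
    private final List<String> path = new ArrayList<>();

    WalkedTypePathSketch recordRoot(String className) {
        path.add("- root class: \"" + className + "\"");
        return this;
    }

    WalkedTypePathSketch recordField(String fieldType, String fieldName) {
        path.add("- field (class: \"" + fieldType + "\", name: \"" + fieldName + "\")");
        return this;
    }

    @Override
    public String toString() {
        return String.join("\n", path);
    }
}
```

Centralizing this removes the duplication across serializerFor and deserializerFor in both ScalaReflection and JavaTypeInference.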