From 443e74f207d4869d11944f4d95df97f5bc40e6bc Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 21 Feb 2019 23:03:13 +0900
Subject: [PATCH] Address review comments from maropu

---
 .../sql/JavaBeanDeserializationSuite.java     | 175 +++++++++++++-----
 .../test/resources/test-data/spark-22000.csv  |   5 -
 2 files changed, 130 insertions(+), 50 deletions(-)
 delete mode 100755 sql/core/src/test/resources/test-data/spark-22000.csv

diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
index ae68c7aa63613..09426e1cf0d73 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
@@ -20,6 +20,15 @@
 import java.io.Serializable;
 import java.util.*;
 
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRowConverterSuite;
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.*;
 
 import org.apache.spark.sql.Dataset;
@@ -115,35 +124,68 @@ public void testBeanWithMapFieldsDeserialization() {
     Assert.assertEquals(records, MAP_RECORDS);
   }
 
+  private static final List<Row> ROWS_SPARK_22000 = new ArrayList<>();
   private static final List<RecordSpark22000> RECORDS_SPARK_22000 = new ArrayList<>();
 
+  private static Row createRecordSpark22000Row(Long index) {
+    Object[] values = new Object[] {
+      index.shortValue(),
+      index.intValue(),
+      index,
+      index.floatValue(),
+      index.doubleValue(),
+      String.valueOf(index),
+      index % 2 == 0,
+      new java.sql.Timestamp(System.currentTimeMillis())
+    };
+    return new GenericRow(values);
+  }
+
+  private static RecordSpark22000 createRecordSpark22000(Row recordRow) {
+    RecordSpark22000 record = new RecordSpark22000();
+    record.setShortField(String.valueOf(recordRow.getShort(0)));
+    record.setIntField(String.valueOf(recordRow.getInt(1)));
+    record.setLongField(String.valueOf(recordRow.getLong(2)));
+    record.setFloatField(String.valueOf(recordRow.getFloat(3)));
+    record.setDoubleField(String.valueOf(recordRow.getDouble(4)));
+    record.setStringField(recordRow.getString(5));
+    record.setBooleanField(String.valueOf(recordRow.getBoolean(6)));
+    record.setTimestampField(String.valueOf(recordRow.getTimestamp(7).getTime() * 1000));
+    return record;
+  }
+
   static {
-    RECORDS_SPARK_22000.add(new RecordSpark22000("1", "j123@aaa.com", 2, 11));
-    RECORDS_SPARK_22000.add(new RecordSpark22000("2", "j123@aaa.com", 3, 12));
-    RECORDS_SPARK_22000.add(new RecordSpark22000("3", "j123@aaa.com", 4, 13));
-    RECORDS_SPARK_22000.add(new RecordSpark22000("4", "j123@aaa.com", 5, 14));
+    for (long idx = 0; idx < 5; idx++) {
+      Row row = createRecordSpark22000Row(idx);
+      ROWS_SPARK_22000.add(row);
+      RECORDS_SPARK_22000.add(createRecordSpark22000(row));
+    }
   }
 
   @Test
   public void testSpark22000() {
-    // Here we try to convert the type of 'ref' field, from integer to string.
+    // Here we try to convert fields of various types to string.
+    // Before SPARK-22000, Spark called toString() on a variable whose type might be primitive.
+    // With SPARK-22000, Spark calls String.valueOf(), which eventually calls toString() but
+    // handles boxing if the type is primitive.
     Encoder<RecordSpark22000> encoder = Encoders.bean(RecordSpark22000.class);
-    Dataset<RecordSpark22000> dataset = spark
-      .read()
-      .format("csv")
-      .option("header", "true")
-      .option("mode", "DROPMALFORMED")
-      .schema("ref int, userId string, x int, y int")
-      .load("src/test/resources/test-data/spark-22000.csv")
-      .as(encoder);
+    StructType schema = new StructType()
+      .add("shortField", DataTypes.ShortType)
+      .add("intField", DataTypes.IntegerType)
+      .add("longField", DataTypes.LongType)
+      .add("floatField", DataTypes.FloatType)
+      .add("doubleField", DataTypes.DoubleType)
+      .add("stringField", DataTypes.StringType)
+      .add("booleanField", DataTypes.BooleanType)
+      .add("timestampField", DataTypes.TimestampType);
+
+    Dataset<Row> dataFrame = spark.createDataFrame(ROWS_SPARK_22000, schema);
+    Dataset<RecordSpark22000> dataset = dataFrame.as(encoder);
 
     List<RecordSpark22000> records = dataset.collectAsList();
 
-    Assert.assertEquals(records, RECORDS_SPARK_22000);
+    Assert.assertEquals(RECORDS_SPARK_22000, records);
   }
 
   public static class ArrayRecord {
@@ -285,50 +327,79 @@ public String toString() {
   }
 
   public static class RecordSpark22000 {
-    private String ref;
-    private String userId;
-    private int x;
-    private int y;
+    private String shortField;
+    private String intField;
+    private String longField;
+    private String floatField;
+    private String doubleField;
+    private String stringField;
+    private String booleanField;
+    private String timestampField;
 
     public RecordSpark22000() { }
 
-    RecordSpark22000(String ref, String userId, int x, int y) {
-      this.ref = ref;
-      this.userId = userId;
-      this.x = x;
-      this.y = y;
+    public String getShortField() {
+      return shortField;
+    }
+
+    public void setShortField(String shortField) {
+      this.shortField = shortField;
+    }
+
+    public String getIntField() {
+      return intField;
+    }
+
+    public void setIntField(String intField) {
+      this.intField = intField;
+    }
+
+    public String getLongField() {
+      return longField;
+    }
+
+    public void setLongField(String longField) {
+      this.longField = longField;
+    }
+
+    public String getFloatField() {
+      return floatField;
+    }
+
+    public void setFloatField(String floatField) {
+      this.floatField = floatField;
     }
 
-    public String getRef() {
-      return ref;
+    public String getDoubleField() {
+      return doubleField;
     }
 
-    public void setRef(String ref) {
-      this.ref = ref;
+    public void setDoubleField(String doubleField) {
+      this.doubleField = doubleField;
     }
 
-    public String getUserId() {
-      return userId;
+    public String getStringField() {
+      return stringField;
     }
 
-    public void setUserId(String userId) {
-      this.userId = userId;
+    public void setStringField(String stringField) {
+      this.stringField = stringField;
     }
 
-    public int getX() {
-      return x;
+    public String getBooleanField() {
+      return booleanField;
     }
 
-    public void setX(int x) {
-      this.x = x;
+    public void setBooleanField(String booleanField) {
+      this.booleanField = booleanField;
     }
 
-    public int getY() {
-      return y;
+    public String getTimestampField() {
+      return timestampField;
    }
 
-    public void setY(int y) {
-      this.y = y;
+    public void setTimestampField(String timestampField) {
+      this.timestampField = timestampField;
     }
 
     @Override
@@ -336,20 +407,34 @@ public boolean equals(Object o) {
       if (this == o) return true;
       if (o == null || getClass() != o.getClass()) return false;
       RecordSpark22000 that = (RecordSpark22000) o;
-      return x == that.x &&
-        y == that.y &&
-        Objects.equals(ref, that.ref) &&
-        Objects.equals(userId, that.userId);
+      return Objects.equals(shortField, that.shortField) &&
+        Objects.equals(intField, that.intField) &&
+        Objects.equals(longField, that.longField) &&
+        Objects.equals(floatField, that.floatField) &&
+        Objects.equals(doubleField, that.doubleField) &&
+        Objects.equals(stringField, that.stringField) &&
+        Objects.equals(booleanField, that.booleanField) &&
+        Objects.equals(timestampField, that.timestampField);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(ref, userId, x, y);
+      return Objects.hash(shortField, intField, longField, floatField, doubleField, stringField,
+        booleanField, timestampField);
     }
 
     @Override
     public String toString() {
-      return String.format("ref='%s', userId='%s', x=%d, y=%d", ref, userId, x, y);
+      return com.google.common.base.Objects.toStringHelper(this)
+        .add("shortField", shortField)
+        .add("intField", intField)
+        .add("longField", longField)
+        .add("floatField", floatField)
+        .add("doubleField", doubleField)
+        .add("stringField", stringField)
+        .add("booleanField", booleanField)
+        .add("timestampField", timestampField)
+        .toString();
     }
   }
 }
diff --git a/sql/core/src/test/resources/test-data/spark-22000.csv b/sql/core/src/test/resources/test-data/spark-22000.csv
deleted file mode 100755
index 06deb6f293352..0000000000000
--- a/sql/core/src/test/resources/test-data/spark-22000.csv
+++ /dev/null
@@ -1,5 +0,0 @@
-ref,userId,x,y
-1,j123@aaa.com,2,11
-2,j123@aaa.com,3,12
-3,j123@aaa.com,4,13
-4,j123@aaa.com,5,14
\ No newline at end of file
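
Note for reviewers: the rewritten test drives the bean encoder across every
primitive-backed column type at once. For a quicker mental model of what it
verifies, here is a minimal standalone sketch of the same conversion. It is
illustrative only: the class, app, and bean names are invented here, and only
public Spark APIs the patch itself already relies on are assumed.

  import java.util.Arrays;
  import java.util.List;

  import org.apache.spark.sql.Encoders;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.RowFactory;
  import org.apache.spark.sql.SparkSession;
  import org.apache.spark.sql.types.DataTypes;
  import org.apache.spark.sql.types.StructType;

  public class Spark22000Sketch {

    // Bean with a String field backed by a primitive int column.
    // Encoders.bean() needs a public class with a no-arg constructor
    // and getter/setter pairs, like RecordSpark22000 above.
    public static class IntAsString {
      private String intField;
      public String getIntField() { return intField; }
      public void setIntField(String intField) { this.intField = intField; }
    }

    public static void main(String[] args) {
      SparkSession spark = SparkSession.builder()
          .master("local[1]")
          .appName("spark-22000-sketch")
          .getOrCreate();

      StructType schema = new StructType().add("intField", DataTypes.IntegerType);
      List<Row> rows = Arrays.asList(RowFactory.create(1), RowFactory.create(2));

      // Before SPARK-22000 this conversion failed: the generated deserializer
      // invoked toString() on a primitive int. With the fix it goes through
      // String.valueOf(), which boxes the primitive first.
      List<IntAsString> beans = spark.createDataFrame(rows, schema)
          .as(Encoders.bean(IntAsString.class))
          .collectAsList();

      beans.forEach(b -> System.out.println(b.getIntField()));  // prints 1 and 2
      spark.stop();
    }
  }

The suite's test does the same thing in one pass for short, int, long, float,
double, string, boolean, and timestamp columns, asserting the collected beans
against the expected String-typed records built in the static initializer.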