Skip to content

Commit

Permalink
Address review comments from maropu
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Feb 21, 2019
1 parent acd5d55 commit 443e74f
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@
import java.io.Serializable;
import java.util.*;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRowConverterSuite;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.junit.*;

import org.apache.spark.sql.Dataset;
Expand Down Expand Up @@ -115,35 +124,68 @@ public void testBeanWithMapFieldsDeserialization() {
Assert.assertEquals(records, MAP_RECORDS);
}

// Input rows for testSpark22000: filled by the static initializer and handed to
// spark.createDataFrame together with the test's schema.
private static final List<Row> ROWS_SPARK_22000 = new ArrayList<>();
// Expected beans for testSpark22000: compared against the list collected from the dataset.
private static final List<RecordSpark22000> RECORDS_SPARK_22000 = new ArrayList<>();

/**
 * Builds one input row for the SPARK-22000 fixtures from a single seed value.
 *
 * <p>The row holds, in order: the seed as short, int, long (the {@code Long} itself), float and
 * double, its decimal string form, a flag telling whether the seed is even, and a
 * {@link java.sql.Timestamp} taken from the current wall clock.
 *
 * @param index seed value from which every column except the timestamp is derived
 * @return a {@link GenericRow} wrapping the eight values
 */
private static Row createRecordSpark22000Row(Long index) {
  final boolean isEven = index % 2 == 0;
  final String asText = String.valueOf(index);
  return new GenericRow(new Object[] {
    index.shortValue(),
    index.intValue(),
    index,
    index.floatValue(),
    index.doubleValue(),
    asText,
    isEven,
    // Wall-clock value: differs between runs, so expectations must be derived from this row.
    new java.sql.Timestamp(System.currentTimeMillis())
  });
}

/**
 * Builds the bean that corresponds to {@code recordRow}, with every column rendered as a string.
 *
 * <p>Columns 0-4 and 6 are read with their typed getters and passed through
 * {@link String#valueOf}; column 5 is already a string and is copied as is; column 7 is read as
 * a timestamp and stored as its {@code getTime()} value multiplied by 1000.
 *
 * @param recordRow row laid out as produced by {@code createRecordSpark22000Row}
 * @return a new bean whose eight string properties mirror the row's eight columns
 */
private static RecordSpark22000 createRecordSpark22000(Row recordRow) {
  // Read the columns in positional order first, then populate the bean.
  final String shortText = String.valueOf(recordRow.getShort(0));
  final String intText = String.valueOf(recordRow.getInt(1));
  final String longText = String.valueOf(recordRow.getLong(2));
  final String floatText = String.valueOf(recordRow.getFloat(3));
  final String doubleText = String.valueOf(recordRow.getDouble(4));
  final String stringText = recordRow.getString(5);
  final String booleanText = String.valueOf(recordRow.getBoolean(6));
  // getTime() is in milliseconds; the expected string is that value scaled by 1000.
  final long scaledTimestamp = recordRow.getTimestamp(7).getTime() * 1000;

  final RecordSpark22000 expected = new RecordSpark22000();
  expected.setShortField(shortText);
  expected.setIntField(intText);
  expected.setLongField(longText);
  expected.setFloatField(floatText);
  expected.setDoubleField(doubleText);
  expected.setStringField(stringText);
  expected.setBooleanField(booleanText);
  expected.setTimestampField(String.valueOf(scaledTimestamp));
  return expected;
}

// Populates the SPARK-22000 fixture lists once, when the enclosing class is initialized.
static {
// NOTE(review): these four literal records use the (String, String, int, int) constructor and
// sit next to the generated records below. This span reads like a rendered diff in which
// removed and added lines are merged; confirm which lines belong to the current file.
RECORDS_SPARK_22000.add(new RecordSpark22000("1", "j123@aaa.com", 2, 11));
RECORDS_SPARK_22000.add(new RecordSpark22000("2", "j123@aaa.com", 3, 12));
RECORDS_SPARK_22000.add(new RecordSpark22000("3", "j123@aaa.com", 4, 13));
RECORDS_SPARK_22000.add(new RecordSpark22000("4", "j123@aaa.com", 5, 14));
// For idx = 0..4: build an input row, then derive the expected bean from that same row so the
// input and the expectation share one timestamp value per entry.
for (long idx = 0 ; idx < 5 ; idx++) {
Row row = createRecordSpark22000Row(idx);
ROWS_SPARK_22000.add(row);
RECORDS_SPARK_22000.add(createRecordSpark22000(row));
}
}

/**
 * Decodes rows into {@code RecordSpark22000} beans through a bean encoder and checks the
 * collected beans against {@code RECORDS_SPARK_22000}.
 *
 * NOTE(review): this span reads like a rendered diff with removed and added lines merged
 * ({@code dataset} is declared twice and there are two assertEquals calls); confirm which lines
 * belong to the current file before changing the code.
 */
@Test
public void testSpark22000() {
// Here we try to convert the type of the 'ref' field from integer to string.
// Here we try to convert the fields from any type to string.
// Before SPARK-22000, Spark called toString() on a variable whose type might be primitive.
// With SPARK-22000, it calls String.valueOf(), which ultimately calls toString() but handles
// boxing if the type is primitive.
Encoder<RecordSpark22000> encoder = Encoders.bean(RecordSpark22000.class);

// CSV-based input: reads a headered file with an explicit schema, dropping malformed lines.
Dataset<RecordSpark22000> dataset = spark
.read()
.format("csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.schema("ref int, userId string, x int, y int")
.load("src/test/resources/test-data/spark-22000.csv")
.as(encoder);
// In-memory input: one column per bean property, named after the property and listed in the
// same order as the values built by createRecordSpark22000Row.
StructType schema = new StructType()
.add("shortField", DataTypes.ShortType)
.add("intField", DataTypes.IntegerType)
.add("longField", DataTypes.LongType)
.add("floatField", DataTypes.FloatType)
.add("doubleField", DataTypes.DoubleType)
.add("stringField", DataTypes.StringType)
.add("booleanField", DataTypes.BooleanType)
.add("timestampField", DataTypes.TimestampType);

Dataset<Row> dataFrame = spark.createDataFrame(ROWS_SPARK_22000, schema);
Dataset<RecordSpark22000> dataset = dataFrame.as(encoder);

List<RecordSpark22000> records = dataset.collectAsList();

// JUnit's assertEquals takes (expected, actual); the second call below uses that order.
Assert.assertEquals(records, RECORDS_SPARK_22000);
Assert.assertEquals(RECORDS_SPARK_22000, records);
}

public static class ArrayRecord {
Expand Down Expand Up @@ -285,71 +327,114 @@ public String toString() {
}

/**
 * Java bean used by the SPARK-22000 test fixtures; instances are compared with
 * {@code equals} by the test, so {@code equals}, {@code hashCode} and {@code toString} are all
 * overridden.
 *
 * NOTE(review): this span reads like a rendered diff in which removed and added lines are
 * interleaved without +/- markers: both the ref/userId/x/y members and the *Field members
 * appear, and equals, hashCode and toString each contain two return statements. Confirm against
 * the actual file which set of lines is current before editing the code.
 */
public static class RecordSpark22000 {
private String ref;
private String userId;
private int x;
private int y;
private String shortField;
private String intField;
private String longField;
private String floatField;
private String doubleField;
private String stringField;
private String booleanField;
private String timestampField;

// Public no-arg constructor; fixtures create the bean this way and then call the setters.
public RecordSpark22000() { }

RecordSpark22000(String ref, String userId, int x, int y) {
this.ref = ref;
this.userId = userId;
this.x = x;
this.y = y;
public String getShortField() {
return shortField;
}

public void setShortField(String shortField) {
this.shortField = shortField;
}

public String getIntField() {
return intField;
}

public void setIntField(String intField) {
this.intField = intField;
}

public String getLongField() {
return longField;
}

public void setLongField(String longField) {
this.longField = longField;
}

public String getFloatField() {
return floatField;
}

public void setFloatField(String floatField) {
this.floatField = floatField;
}

public String getRef() {
return ref;
public String getDoubleField() {
return doubleField;
}

public void setRef(String ref) {
this.ref = ref;
public void setDoubleField(String doubleField) {
this.doubleField = doubleField;
}

public String getUserId() {
return userId;
public String getStringField() {
return stringField;
}

public void setUserId(String userId) {
this.userId = userId;
public void setStringField(String stringField) {
this.stringField = stringField;
}

public int getX() {
return x;
public String getBooleanField() {
return booleanField;
}

public void setX(int x) {
this.x = x;
public void setBooleanField(String booleanField) {
this.booleanField = booleanField;
}

public int getY() {
return y;
public String getTimestampField() {
return timestampField;
}

public void setY(int y) {
this.y = y;
public void setTimestampField(String timestampField) {
this.timestampField = timestampField;
}

@Override
public boolean equals(Object o) {
// Same-reference shortcut, then require a non-null object of exactly this class.
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RecordSpark22000 that = (RecordSpark22000) o;
return x == that.x &&
y == that.y &&
Objects.equals(ref, that.ref) &&
Objects.equals(userId, that.userId);
// Null-safe, field-by-field comparison of the eight string properties.
return Objects.equals(shortField, that.shortField) &&
Objects.equals(intField, that.intField) &&
Objects.equals(longField, that.longField) &&
Objects.equals(floatField, that.floatField) &&
Objects.equals(doubleField, that.doubleField) &&
Objects.equals(stringField, that.stringField) &&
Objects.equals(booleanField, that.booleanField) &&
Objects.equals(timestampField, that.timestampField);
}

@Override
public int hashCode() {
return Objects.hash(ref, userId, x, y);
// Hashes the same eight properties that the field-by-field equals comparison uses.
return Objects.hash(shortField, intField, longField, floatField, doubleField, stringField,
booleanField, timestampField);
}

@Override
public String toString() {
return String.format("ref='%s', userId='%s', x=%d, y=%d", ref, userId, x, y);
// Guava helper, referenced by its fully qualified name, listing each property by name.
return com.google.common.base.Objects.toStringHelper(this)
.add("shortField", shortField)
.add("intField", intField)
.add("longField", longField)
.add("floatField", floatField)
.add("doubleField", doubleField)
.add("stringField", stringField)
.add("booleanField", booleanField)
.add("timestampField", timestampField)
.toString();
}
}
}
5 changes: 0 additions & 5 deletions sql/core/src/test/resources/test-data/spark-22000.csv

This file was deleted.

0 comments on commit 443e74f

Please sign in to comment.