Skip to content

Commit

Permalink
Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Nov 3, 2022
1 parent ef7c0c9 commit 31eae68
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
Expand All @@ -87,6 +89,7 @@
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
Expand All @@ -98,7 +101,42 @@
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/** Utils to convert AVRO records to Beam rows. */
/**
* Utils to convert AVRO records to Beam rows. Imposes a mapping between common avro types and Beam
* portable schemas (https://s.apache.org/beam-schemas):
*
* <pre>
* Avro Beam Field Type
* INT <-----> INT32
* LONG <-----> INT64
* FLOAT <-----> FLOAT
* DOUBLE <-----> DOUBLE
* BOOLEAN <-----> BOOLEAN
* STRING <-----> STRING
* BYTES <-----> BYTES
* <------ LogicalType(urn="beam:logical_type:var_bytes:v1")
* FIXED <-----> LogicalType(urn="beam:logical_type:fixed_bytes:v1")
* ARRAY <-----> ARRAY
* ENUM <-----> LogicalType(EnumerationType)
* MAP <-----> MAP
* RECORD <-----> ROW
* UNION <-----> LogicalType(OneOfType)
* LogicalTypes.Date <-----> LogicalType(DATE)
* <------ LogicalType(urn="beam:logical_type:date:v1")
* LogicalTypes.TimestampMillis <-----> DATETIME
* LogicalTypes.TimestampMicros <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
* LogicalTypes.Decimal <-----> DECIMAL
* </pre>
*
* For SQL CHAR/VARCHAR types, an Avro schema
*
* <pre>
* LogicalType({"type":"string","logicalType":"char","maxLength":MAX_LENGTH}) or
* LogicalType({"type":"string","logicalType":"varchar","maxLength":MAX_LENGTH})
* </pre>
*
* is used.
*/
@Experimental(Kind.SCHEMAS)
@SuppressWarnings({
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
Expand Down Expand Up @@ -772,9 +810,9 @@ private static Schema.FieldType toFieldType(TypeWithNullability type) {
if (logicalType instanceof LogicalTypes.Decimal) {
fieldType = FieldType.DECIMAL;
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
// TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
// this is done, this logical type needs to be changed.
fieldType = FieldType.DATETIME;
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
fieldType = FieldType.logicalType(SqlTypes.TIMESTAMP);
} else if (logicalType instanceof LogicalTypes.Date) {
fieldType = FieldType.DATETIME;
}
Expand Down Expand Up @@ -887,8 +925,6 @@ private static org.apache.avro.Schema getFieldSchema(
break;

case DATETIME:
// TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
// this is done, this logical type needs to be changed.
baseType =
LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG));
break;
Expand Down Expand Up @@ -933,10 +969,13 @@ private static org.apache.avro.Schema getFieldSchema(
oneOfType.getOneOfSchema().getFields().stream()
.map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
.collect(Collectors.toList()));
} else if ("DATE".equals(identifier)) {
} else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) {
baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
} else if ("TIME".equals(identifier)) {
baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
} else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
baseType =
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
} else {
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
Expand Down Expand Up @@ -1061,9 +1100,17 @@ private static org.apache.avro.Schema getFieldSchema(
oneOfValue.getValue());
}
} else if ("DATE".equals(identifier)) {
// "Date" is backed by joda.time.Instant
return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
// portable SqlTypes.DATE is backed by java.time.LocalDate
return ((java.time.LocalDate) value).toEpochDay();
} else if ("TIME".equals(identifier)) {
// "TIME" is backed by joda.time.Instant
return (int) ((Instant) value).getMillis();
} else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
// portable SqlTypes.TIMESTAMP is backed by java.time.Instant
return getMicrosFromJavaInstant((java.time.Instant) value);
} else {
throw new RuntimeException("Unhandled logical type " + identifier);
}
Expand Down Expand Up @@ -1137,10 +1184,19 @@ private static org.apache.avro.Schema getFieldSchema(
} else {
return convertDateTimeStrict((Long) value, fieldType);
}
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
if (value instanceof java.time.Instant) {
return convertMicroMillisStrict(
getMicrosFromJavaInstant((java.time.Instant) value), fieldType);
} else {
return convertMicroMillisStrict((Long) value, fieldType);
}
} else if (logicalType instanceof LogicalTypes.Date) {
if (value instanceof ReadableInstant) {
int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
return convertDateStrict(epochDays, fieldType);
} else if (value instanceof java.time.LocalDate) {
return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType);
} else {
return convertDateStrict((Integer) value, fieldType);
}
Expand Down Expand Up @@ -1198,6 +1254,14 @@ private static org.apache.avro.Schema getFieldSchema(
}
}

/** Helper method to get epoch micros required by Avro TimeStampMicros logical type. */
@SuppressWarnings("JavaInstantGetSecondsGetNano")
@VisibleForTesting
static long getMicrosFromJavaInstant(java.time.Instant value) {
return TimeUnit.SECONDS.toMicros(value.getEpochSecond())
+ TimeUnit.NANOSECONDS.toMicros(value.getNano());
}

private static Object convertRecordStrict(GenericRecord record, Schema.FieldType fieldType) {
checkTypeName(fieldType.getTypeName(), Schema.TypeName.ROW, "record");
return toBeamRowStrict(record, fieldType.getRowSchema());
Expand Down Expand Up @@ -1247,6 +1311,17 @@ private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldTy
return new Instant(value);
}

private static Object convertMicroMillisStrict(Long value, Schema.FieldType fieldType) {
checkTypeName(
fieldType.getTypeName(), TypeName.LOGICAL_TYPE, SqlTypes.TIMESTAMP.getIdentifier());
checkArgument(
fieldType.getLogicalType().getIdentifier().equals(SqlTypes.TIMESTAMP.getIdentifier()));

return java.time.Instant.ofEpochSecond(
TimeUnit.MICROSECONDS.toSeconds(value),
TimeUnit.MICROSECONDS.toNanos(Math.floorMod(value, TimeUnit.SECONDS.toMicros(1))));
}

private static Object convertFloatStrict(Float value, Schema.FieldType fieldType) {
checkTypeName(fieldType.getTypeName(), Schema.TypeName.FLOAT, "float");
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
Expand All @@ -46,6 +47,7 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.utils.AvroGenerators.RecordSchemaGenerator;
import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
import org.apache.beam.sdk.testing.CoderProperties;
Expand Down Expand Up @@ -229,6 +231,12 @@ private static org.apache.avro.Schema getAvroSchema() {
LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
"",
(Object) null));
fields.add(
new org.apache.avro.Schema.Field(
"timestampMicros",
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
"",
(Object) null));
fields.add(new org.apache.avro.Schema.Field("row", getAvroSubSchema("row"), "", (Object) null));
fields.add(
new org.apache.avro.Schema.Field(
Expand Down Expand Up @@ -261,6 +269,7 @@ private Schema getBeamSchema() {
.addField(Field.of("bytes", FieldType.BYTES))
.addField(Field.of("decimal", FieldType.DECIMAL))
.addField(Field.of("timestampMillis", FieldType.DATETIME))
.addField(Field.of("timestampMicros", FieldType.logicalType(SqlTypes.TIMESTAMP)))
.addField(Field.of("row", FieldType.row(subSchema)))
.addField(Field.of("array", FieldType.array(FieldType.row(subSchema))))
.addField(Field.of("map", FieldType.map(FieldType.STRING, FieldType.row(subSchema))))
Expand All @@ -270,6 +279,9 @@ private Schema getBeamSchema() {
private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
private static final DateTime DATE_TIME =
new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC);
private static final java.time.Instant MICROS_INSTANT =
java.time.Instant.ofEpochMilli(DATE_TIME.getMillis())
.plusNanos(TimeUnit.MICROSECONDS.toNanos(123));
private static final BigDecimal BIG_DECIMAL = new BigDecimal(3600);

private Row getBeamRow() {
Expand All @@ -284,6 +296,7 @@ private Row getBeamRow() {
.addValue(BYTE_ARRAY)
.addValue(BIG_DECIMAL)
.addValue(DATE_TIME)
.addValue(MICROS_INSTANT)
.addValue(subRow)
.addValue(ImmutableList.of(subRow, subRow))
.addValue(ImmutableMap.of("k1", subRow, "k2", subRow))
Expand Down Expand Up @@ -316,6 +329,7 @@ private static GenericRecord getGenericRecord() {
.set("bytes", ByteBuffer.wrap(BYTE_ARRAY))
.set("decimal", encodedDecimal)
.set("timestampMillis", DATE_TIME.getMillis())
.set("timestampMicros", AvroUtils.getMicrosFromJavaInstant(MICROS_INSTANT))
.set("row", getSubGenericRecord("row"))
.set("array", ImmutableList.of(getSubGenericRecord("array"), getSubGenericRecord("array")))
.set(
Expand Down Expand Up @@ -714,6 +728,25 @@ public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() {
assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
}

@Test
public void testSqlTypesToGenericRecord() {
// SqlTypes to LogicalTypes.date conversion are one direction
java.time.LocalDate localDate = java.time.LocalDate.of(1979, 3, 14);

Schema beamSchema =
Schema.builder()
.addField(Field.of("local_date", FieldType.logicalType(SqlTypes.DATE)))
.build();

Row rowData = Row.withSchema(beamSchema).addValue(localDate).build();

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
GenericRecord expectedRecord =
new GenericRecordBuilder(avroSchema).set("local_date", localDate.toEpochDay()).build();

assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
}

@Test
public void testBeamRowToGenericRecord() {
GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);
Expand Down

0 comments on commit 31eae68

Please sign in to comment.