Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils #23969

Merged
merged 2 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
Expand All @@ -87,6 +89,7 @@
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
Expand All @@ -98,7 +101,42 @@
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/** Utils to convert AVRO records to Beam rows. */
/**
* Utils to convert AVRO records to Beam rows. Imposes a mapping between common avro types and Beam
* portable schemas (https://s.apache.org/beam-schemas):
*
* <pre>
* Avro Beam Field Type
* INT <-----> INT32
* LONG <-----> INT64
* FLOAT <-----> FLOAT
* DOUBLE <-----> DOUBLE
* BOOLEAN <-----> BOOLEAN
* STRING <-----> STRING
* BYTES <-----> BYTES
* <------ LogicalType(urn="beam:logical_type:var_bytes:v1")
* FIXED <-----> LogicalType(urn="beam:logical_type:fixed_bytes:v1")
* ARRAY <-----> ARRAY
* ENUM <-----> LogicalType(EnumerationType)
* MAP <-----> MAP
* RECORD <-----> ROW
* UNION <-----> LogicalType(OneOfType)
* LogicalTypes.Date <-----> LogicalType(DATE)
* <------ LogicalType(urn="beam:logical_type:date:v1")
* LogicalTypes.TimestampMillis <-----> DATETIME
* LogicalTypes.TimestampMicros <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
* LogicalTypes.Decimal <-----> DECIMAL
* </pre>
*
* For SQL CHAR/VARCHAR types, an Avro schema
*
* <pre>
* LogicalType({"type":"string","logicalType":"char","maxLength":MAX_LENGTH}) or
* LogicalType({"type":"string","logicalType":"varchar","maxLength":MAX_LENGTH})
* </pre>
*
* is used.
*/
@Experimental(Kind.SCHEMAS)
@SuppressWarnings({
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
Expand Down Expand Up @@ -772,9 +810,11 @@ private static Schema.FieldType toFieldType(TypeWithNullability type) {
if (logicalType instanceof LogicalTypes.Decimal) {
fieldType = FieldType.DECIMAL;
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
// TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
// this is done, this logical type needs to be changed.
// TODO(https://github.com/apache/beam/issues/19215) The DATETIME primitive will be removed
// once Beam has fully migrated from joda-time to the java.time library.
fieldType = FieldType.DATETIME;
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece can apparently introduce a breaking change. Previously, the TimestampMicros Avro logical type returned FieldType.INT64; now it returns FieldType.logicalType(SqlTypes.TIMESTAMP). For modules that do not yet support this Beam logical type, this will break existing user code.

fieldType = FieldType.logicalType(SqlTypes.TIMESTAMP);
} else if (logicalType instanceof LogicalTypes.Date) {
fieldType = FieldType.DATETIME;
}
Expand Down Expand Up @@ -887,8 +927,8 @@ private static org.apache.avro.Schema getFieldSchema(
break;

case DATETIME:
// TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO actually resolved?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new logical type (MicrosInstant) has been added, but the old field type (DATETIME) has not yet been removed. Removing it was attempted in #11456, but there were concerns about it being a breaking change. The removal of joda.time (DATETIME's input type) is targeted for Beam 3.0: #19215

Will add this back and link to #19215

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// this is done, this logical type needs to be changed.
// TODO(https://github.com/apache/beam/issues/19215) The DATETIME primitive will be removed
// once Beam has fully migrated from joda-time to the java.time library.
baseType =
LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG));
break;
Expand Down Expand Up @@ -933,10 +973,13 @@ private static org.apache.avro.Schema getFieldSchema(
oneOfType.getOneOfSchema().getFields().stream()
.map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
.collect(Collectors.toList()));
} else if ("DATE".equals(identifier)) {
} else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) {
baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
} else if ("TIME".equals(identifier)) {
baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
} else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
baseType =
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
} else {
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
Expand Down Expand Up @@ -1061,9 +1104,17 @@ private static org.apache.avro.Schema getFieldSchema(
oneOfValue.getValue());
}
} else if ("DATE".equals(identifier)) {
// "Date" is backed by joda.time.Instant
return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
// portable SqlTypes.DATE is backed by java.time.LocalDate
return ((java.time.LocalDate) value).toEpochDay();
} else if ("TIME".equals(identifier)) {
// "TIME" is backed by joda.time.Instant
return (int) ((Instant) value).getMillis();
} else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
// portable SqlTypes.TIMESTAMP is backed by java.time.Instant
return getMicrosFromJavaInstant((java.time.Instant) value);
} else {
throw new RuntimeException("Unhandled logical type " + identifier);
}
Expand Down Expand Up @@ -1137,10 +1188,19 @@ private static org.apache.avro.Schema getFieldSchema(
} else {
return convertDateTimeStrict((Long) value, fieldType);
}
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
if (value instanceof java.time.Instant) {
return convertMicroMillisStrict(
getMicrosFromJavaInstant((java.time.Instant) value), fieldType);
} else {
return convertMicroMillisStrict((Long) value, fieldType);
}
} else if (logicalType instanceof LogicalTypes.Date) {
if (value instanceof ReadableInstant) {
int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
return convertDateStrict(epochDays, fieldType);
} else if (value instanceof java.time.LocalDate) {
return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType);
} else {
return convertDateStrict((Integer) value, fieldType);
}
Expand Down Expand Up @@ -1198,6 +1258,14 @@ private static org.apache.avro.Schema getFieldSchema(
}
}

/**
 * Returns the number of microseconds since the epoch for the given instant, which is the value
 * required by the Avro {@code timestamp-micros} logical type.
 *
 * <p>Any sub-microsecond part of the instant is dropped (the result is rounded towards negative
 * infinity, e.g. one nanosecond before the epoch maps to {@code -1}).
 *
 * @param value the instant to convert
 * @return microseconds elapsed since 1970-01-01T00:00:00Z; negative for earlier instants
 * @throws ArithmeticException if the instant cannot be represented as a {@code long} number of
 *     microseconds, instead of silently returning a saturated or wrapped value
 */
@SuppressWarnings("JavaInstantGetSecondsGetNano")
@VisibleForTesting
static long getMicrosFromJavaInstant(java.time.Instant value) {
  final long microsPerSecond = 1_000_000L;
  long seconds = value.getEpochSecond();
  // getNano() is always in [0, 999_999_999], so this division is a floor.
  long microsOfSecond = value.getNano() / 1_000L;

  if (seconds < 0 && microsOfSecond > 0) {
    // Work from the next whole second so that the most negative representable timestamps do not
    // overflow in the intermediate product even though the final result fits in a long.
    long base = Math.multiplyExact(seconds + 1, microsPerSecond);
    return Math.addExact(base, microsOfSecond - microsPerSecond);
  }
  return Math.addExact(Math.multiplyExact(seconds, microsPerSecond), microsOfSecond);
}

private static Object convertRecordStrict(GenericRecord record, Schema.FieldType fieldType) {
checkTypeName(fieldType.getTypeName(), Schema.TypeName.ROW, "record");
return toBeamRowStrict(record, fieldType.getRowSchema());
Expand Down Expand Up @@ -1247,6 +1315,17 @@ private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldTy
return new Instant(value);
}

/**
 * Converts an Avro {@code timestamp-micros} value into a {@link java.time.Instant}.
 *
 * <p>The target field type is validated first: it is passed to {@code checkTypeName} together
 * with {@code TypeName.LOGICAL_TYPE}, and its logical type identifier must equal that of {@code
 * SqlTypes.TIMESTAMP}.
 *
 * @param value microseconds since the epoch; may be negative for instants before 1970
 * @param fieldType the Beam field type the value is being converted to
 * @return the {@link java.time.Instant} denoted by {@code value}
 */
private static Object convertMicroMillisStrict(Long value, Schema.FieldType fieldType) {
  checkTypeName(
      fieldType.getTypeName(), TypeName.LOGICAL_TYPE, SqlTypes.TIMESTAMP.getIdentifier());
  checkArgument(
      fieldType.getLogicalType().getIdentifier().equals(SqlTypes.TIMESTAMP.getIdentifier()));

  long micros = value;
  long microsPerSecond = TimeUnit.SECONDS.toMicros(1);
  // The seconds part must be floored, not truncated towards zero, so that it pairs correctly
  // with the non-negative floorMod remainder. Truncating (as TimeUnit#toSeconds does) shifts
  // every pre-epoch value that is not a whole second forward by one second.
  long epochSeconds = Math.floorDiv(micros, microsPerSecond);
  long microsOfSecond = Math.floorMod(micros, microsPerSecond);
  return java.time.Instant.ofEpochSecond(
      epochSeconds, TimeUnit.MICROSECONDS.toNanos(microsOfSecond));
}

private static Object convertFloatStrict(Float value, Schema.FieldType fieldType) {
checkTypeName(fieldType.getTypeName(), Schema.TypeName.FLOAT, "float");
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
Expand All @@ -46,6 +47,7 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.utils.AvroGenerators.RecordSchemaGenerator;
import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
import org.apache.beam.sdk.testing.CoderProperties;
Expand Down Expand Up @@ -229,6 +231,12 @@ private static org.apache.avro.Schema getAvroSchema() {
LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
"",
(Object) null));
fields.add(
new org.apache.avro.Schema.Field(
"timestampMicros",
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
"",
(Object) null));
fields.add(new org.apache.avro.Schema.Field("row", getAvroSubSchema("row"), "", (Object) null));
fields.add(
new org.apache.avro.Schema.Field(
Expand Down Expand Up @@ -261,6 +269,7 @@ private Schema getBeamSchema() {
.addField(Field.of("bytes", FieldType.BYTES))
.addField(Field.of("decimal", FieldType.DECIMAL))
.addField(Field.of("timestampMillis", FieldType.DATETIME))
.addField(Field.of("timestampMicros", FieldType.logicalType(SqlTypes.TIMESTAMP)))
.addField(Field.of("row", FieldType.row(subSchema)))
.addField(Field.of("array", FieldType.array(FieldType.row(subSchema))))
.addField(Field.of("map", FieldType.map(FieldType.STRING, FieldType.row(subSchema))))
Expand All @@ -270,6 +279,9 @@ private Schema getBeamSchema() {
// Fixture bytes; added to the Beam row and wrapped in a ByteBuffer for the "bytes" record field.
private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
// Fixture timestamp 1979-03-14 01:02:03.004; its millis back the "timestampMillis" field.
// NOTE(review): the date/time fields are set before withZone(UTC) is applied, so they are
// presumably interpreted in the JVM default zone - confirm the tests do not depend on it.
private static final DateTime DATE_TIME =
new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC);
// Same point in time as DATE_TIME plus 123 microseconds, so the fixture carries precision that
// a millisecond timestamp cannot hold; used for the "timestampMicros" field.
private static final java.time.Instant MICROS_INSTANT =
java.time.Instant.ofEpochMilli(DATE_TIME.getMillis())
.plusNanos(TimeUnit.MICROSECONDS.toNanos(123));
// Fixture decimal value added to the Beam row built by getBeamRow().
private static final BigDecimal BIG_DECIMAL = new BigDecimal(3600);

private Row getBeamRow() {
Expand All @@ -284,6 +296,7 @@ private Row getBeamRow() {
.addValue(BYTE_ARRAY)
.addValue(BIG_DECIMAL)
.addValue(DATE_TIME)
.addValue(MICROS_INSTANT)
.addValue(subRow)
.addValue(ImmutableList.of(subRow, subRow))
.addValue(ImmutableMap.of("k1", subRow, "k2", subRow))
Expand Down Expand Up @@ -316,6 +329,7 @@ private static GenericRecord getGenericRecord() {
.set("bytes", ByteBuffer.wrap(BYTE_ARRAY))
.set("decimal", encodedDecimal)
.set("timestampMillis", DATE_TIME.getMillis())
.set("timestampMicros", AvroUtils.getMicrosFromJavaInstant(MICROS_INSTANT))
.set("row", getSubGenericRecord("row"))
.set("array", ImmutableList.of(getSubGenericRecord("array"), getSubGenericRecord("array")))
.set(
Expand Down Expand Up @@ -714,6 +728,25 @@ public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() {
assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
}

/**
 * Verifies that a row holding a {@code SqlTypes.DATE} value is written to a generic record as the
 * epoch day of that date.
 */
@Test
public void testSqlTypesToGenericRecord() {
  // The SqlTypes.DATE to LogicalTypes.date conversion only goes in one direction, so only the
  // Beam-to-Avro path is exercised here.
  Field dateField = Field.of("local_date", FieldType.logicalType(SqlTypes.DATE));
  Schema rowSchema = Schema.builder().addField(dateField).build();

  java.time.LocalDate date = java.time.LocalDate.of(1979, 3, 14);
  Row row = Row.withSchema(rowSchema).addValue(date).build();

  org.apache.avro.Schema recordSchema = AvroUtils.toAvroSchema(rowSchema);
  GenericRecordBuilder expectedBuilder = new GenericRecordBuilder(recordSchema);
  expectedBuilder.set("local_date", date.toEpochDay());
  GenericRecord expected = expectedBuilder.build();

  GenericRecord actual = AvroUtils.toGenericRecord(row, recordSchema);
  assertEquals(expected, actual);
}

@Test
public void testBeamRowToGenericRecord() {
GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);
Expand Down