From 14a1c9eafa610f9411843725f16447b3699dc04e Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 2 Dec 2022 08:42:34 -0500 Subject: [PATCH 1/2] Revert "Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils (#23969)" This reverts commit d5d76b974592d45de368ab641647ca5cc4ec12ec. --- .../beam/sdk/schemas/utils/AvroUtils.java | 91 ++----------------- .../beam/sdk/schemas/utils/AvroUtilsTest.java | 33 ------- 2 files changed, 6 insertions(+), 118 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index e61dbe505e22c..8d01ed0406a0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nonnull; import net.bytebuddy.description.type.TypeDescription.ForLoadedType; @@ -77,7 +76,6 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; -import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; @@ -89,7 +87,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -101,42 +98,7 @@ import org.joda.time.Instant; import org.joda.time.ReadableInstant; -/** - * Utils to convert AVRO records to Beam rows. Imposes a mapping between common avro types and Beam - * portable schemas (https://s.apache.org/beam-schemas): - * - *
- *   Avro                Beam Field Type
- *   INT         <-----> INT32
- *   LONG        <-----> INT64
- *   FLOAT       <-----> FLOAT
- *   DOUBLE      <-----> DOUBLE
- *   BOOLEAN     <-----> BOOLEAN
- *   STRING      <-----> STRING
- *   BYTES       <-----> BYTES
- *               <------ LogicalType(urn="beam:logical_type:var_bytes:v1")
- *   FIXED       <-----> LogicalType(urn="beam:logical_type:fixed_bytes:v1")
- *   ARRAY       <-----> ARRAY
- *   ENUM        <-----> LogicalType(EnumerationType)
- *   MAP         <-----> MAP
- *   RECORD      <-----> ROW
- *   UNION       <-----> LogicalType(OneOfType)
- *   LogicalTypes.Date              <-----> LogicalType(DATE)
- *                                  <------ LogicalType(urn="beam:logical_type:date:v1")
- *   LogicalTypes.TimestampMillis   <-----> DATETIME
- *   LogicalTypes.TimestampMicros   <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
- *   LogicalTypes.Decimal           <-----> DECIMAL
- * 
- * - * For SQL CHAR/VARCHAR types, an Avro schema - * - *
- *   LogicalType({"type":"string","logicalType":"char","maxLength":MAX_LENGTH}) or
- *   LogicalType({"type":"string","logicalType":"varchar","maxLength":MAX_LENGTH})
- * 
- * - * is used. - */ +/** Utils to convert AVRO records to Beam rows. */ @Experimental(Kind.SCHEMAS) @SuppressWarnings({ "nullness", // TODO(https://github.com/apache/beam/issues/20497) @@ -810,11 +772,9 @@ private static Schema.FieldType toFieldType(TypeWithNullability type) { if (logicalType instanceof LogicalTypes.Decimal) { fieldType = FieldType.DECIMAL; } else if (logicalType instanceof LogicalTypes.TimestampMillis) { - // TODO(https://github.com/apache/beam/issues/19215) DATETIME primitive will be removed when - // fully migrates to java.time lib from joda-time + // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When + // this is done, this logical type needs to be changed. fieldType = FieldType.DATETIME; - } else if (logicalType instanceof LogicalTypes.TimestampMicros) { - fieldType = FieldType.logicalType(SqlTypes.TIMESTAMP); } else if (logicalType instanceof LogicalTypes.Date) { fieldType = FieldType.DATETIME; } @@ -927,8 +887,8 @@ private static org.apache.avro.Schema getFieldSchema( break; case DATETIME: - // TODO(https://github.com/apache/beam/issues/19215) DATETIME primitive will be removed when - // fully migrates to java.time lib from joda-time + // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When + // this is done, this logical type needs to be changed. baseType = LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)); break; @@ -973,13 +933,10 @@ private static org.apache.avro.Schema getFieldSchema( oneOfType.getOneOfSchema().getFields().stream() .map(x -> getFieldSchema(x.getType(), x.getName(), namespace)) .collect(Collectors.toList())); - } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) { + } else if ("DATE".equals(identifier)) { baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT)); } else if ("TIME".equals(identifier)) { baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT)); - } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) { - baseType = - LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)); } else { throw new RuntimeException( "Unhandled logical type " + fieldType.getLogicalType().getIdentifier()); @@ -1104,17 +1061,9 @@ private static org.apache.avro.Schema getFieldSchema( oneOfValue.getValue()); } } else if ("DATE".equals(identifier)) { - // "Date" is backed by joda.time.Instant return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays(); - } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { - // portable SqlTypes.DATE is backed by java.time.LocalDate - return ((java.time.LocalDate) value).toEpochDay(); } else if ("TIME".equals(identifier)) { - // "TIME" is backed by joda.time.Instant return (int) ((Instant) value).getMillis(); - } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) { - // portable SqlTypes.TIMESTAMP is backed by java.time.Instant - return getMicrosFromJavaInstant((java.time.Instant) value); } else { throw new RuntimeException("Unhandled logical type " + identifier); } @@ -1188,19 +1137,10 @@ private static org.apache.avro.Schema getFieldSchema( } else { return convertDateTimeStrict((Long) value, fieldType); } - } else if (logicalType instanceof LogicalTypes.TimestampMicros) { - if (value instanceof java.time.Instant) { - return convertMicroMillisStrict( - getMicrosFromJavaInstant((java.time.Instant) value), fieldType); - } else { - return convertMicroMillisStrict((Long) value, fieldType); - } } else if (logicalType instanceof LogicalTypes.Date) { if (value instanceof ReadableInstant) { int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays(); return convertDateStrict(epochDays, fieldType); - } else if (value instanceof java.time.LocalDate) { - return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType); } else { return convertDateStrict((Integer) value, fieldType); } @@ -1258,14 +1198,6 @@ private static org.apache.avro.Schema getFieldSchema( } } - /** Helper method to get epoch micros required by Avro TimeStampMicros logical type. */ - @SuppressWarnings("JavaInstantGetSecondsGetNano") - @VisibleForTesting - static long getMicrosFromJavaInstant(java.time.Instant value) { - return TimeUnit.SECONDS.toMicros(value.getEpochSecond()) - + TimeUnit.NANOSECONDS.toMicros(value.getNano()); - } - private static Object convertRecordStrict(GenericRecord record, Schema.FieldType fieldType) { checkTypeName(fieldType.getTypeName(), Schema.TypeName.ROW, "record"); return toBeamRowStrict(record, fieldType.getRowSchema()); @@ -1315,17 +1247,6 @@ private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldTy return new Instant(value); } - private static Object convertMicroMillisStrict(Long value, Schema.FieldType fieldType) { - checkTypeName( - fieldType.getTypeName(), TypeName.LOGICAL_TYPE, SqlTypes.TIMESTAMP.getIdentifier()); - checkArgument( - fieldType.getLogicalType().getIdentifier().equals(SqlTypes.TIMESTAMP.getIdentifier())); - - return java.time.Instant.ofEpochSecond( - TimeUnit.MICROSECONDS.toSeconds(value), - TimeUnit.MICROSECONDS.toNanos(Math.floorMod(value, TimeUnit.SECONDS.toMicros(1)))); - } - private static Object convertFloatStrict(Float value, Schema.FieldType fieldType) { checkTypeName(fieldType.getTypeName(), Schema.TypeName.FLOAT, "float"); return value; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index 0605aa6fd1cca..3087959c1e02d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -29,7 +29,6 @@ import java.sql.JDBCType; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -47,7 +46,6 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; -import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.utils.AvroGenerators.RecordSchemaGenerator; import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability; import org.apache.beam.sdk.testing.CoderProperties; @@ -231,12 +229,6 @@ private static org.apache.avro.Schema getAvroSchema() { LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)), "", (Object) null)); - fields.add( - new org.apache.avro.Schema.Field( - "timestampMicros", - LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)), - "", - (Object) null)); fields.add(new org.apache.avro.Schema.Field("row", getAvroSubSchema("row"), "", (Object) null)); fields.add( new org.apache.avro.Schema.Field( @@ -269,7 +261,6 @@ private Schema getBeamSchema() { .addField(Field.of("bytes", FieldType.BYTES)) .addField(Field.of("decimal", FieldType.DECIMAL)) .addField(Field.of("timestampMillis", FieldType.DATETIME)) - .addField(Field.of("timestampMicros", FieldType.logicalType(SqlTypes.TIMESTAMP))) .addField(Field.of("row", FieldType.row(subSchema))) .addField(Field.of("array", FieldType.array(FieldType.row(subSchema)))) .addField(Field.of("map", FieldType.map(FieldType.STRING, FieldType.row(subSchema)))) @@ -279,9 +270,6 @@ private Schema getBeamSchema() { private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4}; private static final DateTime DATE_TIME = new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); - private static final java.time.Instant MICROS_INSTANT = - java.time.Instant.ofEpochMilli(DATE_TIME.getMillis()) - .plusNanos(TimeUnit.MICROSECONDS.toNanos(123)); private static final BigDecimal BIG_DECIMAL = new BigDecimal(3600); private Row getBeamRow() { @@ -296,7 +284,6 @@ private Row getBeamRow() { .addValue(BYTE_ARRAY) .addValue(BIG_DECIMAL) .addValue(DATE_TIME) - .addValue(MICROS_INSTANT) .addValue(subRow) .addValue(ImmutableList.of(subRow, subRow)) .addValue(ImmutableMap.of("k1", subRow, "k2", subRow)) @@ -329,7 +316,6 @@ private static GenericRecord getGenericRecord() { .set("bytes", ByteBuffer.wrap(BYTE_ARRAY)) .set("decimal", encodedDecimal) .set("timestampMillis", DATE_TIME.getMillis()) - .set("timestampMicros", AvroUtils.getMicrosFromJavaInstant(MICROS_INSTANT)) .set("row", getSubGenericRecord("row")) .set("array", ImmutableList.of(getSubGenericRecord("array"), getSubGenericRecord("array"))) .set( @@ -728,25 +714,6 @@ public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() { assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); } - @Test - public void testSqlTypesToGenericRecord() { - // SqlTypes to LogicalTypes.date conversion are one direction - java.time.LocalDate localDate = java.time.LocalDate.of(1979, 3, 14); - - Schema beamSchema = - Schema.builder() - .addField(Field.of("local_date", FieldType.logicalType(SqlTypes.DATE))) - .build(); - - Row rowData = Row.withSchema(beamSchema).addValue(localDate).build(); - - org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); - GenericRecord expectedRecord = - new GenericRecordBuilder(avroSchema).set("local_date", localDate.toEpochDay()).build(); - - assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); - } - @Test public void testBeamRowToGenericRecord() { GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null); From 81b4c7153831580d4b3d6fc93bb7c3d214e4cbd2 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 2 Dec 2022 08:50:22 -0500 Subject: [PATCH 2/2] Add back javadoc --- .../beam/sdk/schemas/utils/AvroUtils.java | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 8d01ed0406a0b..8a19672f9ea14 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -98,7 +98,40 @@ import org.joda.time.Instant; import org.joda.time.ReadableInstant; -/** Utils to convert AVRO records to Beam rows. */ +/** + * Utils to convert AVRO records to Beam rows. Imposes a mapping between common avro types and Beam + * portable schemas (https://s.apache.org/beam-schemas): + * + *
+ *   Avro                Beam Field Type
+ *   INT         <-----> INT32
+ *   LONG        <-----> INT64
+ *   FLOAT       <-----> FLOAT
+ *   DOUBLE      <-----> DOUBLE
+ *   BOOLEAN     <-----> BOOLEAN
+ *   STRING      <-----> STRING
+ *   BYTES       <-----> BYTES
+ *               <------ LogicalType(urn="beam:logical_type:var_bytes:v1")
+ *   FIXED       <-----> LogicalType(urn="beam:logical_type:fixed_bytes:v1")
+ *   ARRAY       <-----> ARRAY
+ *   ENUM        <-----> LogicalType(EnumerationType)
+ *   MAP         <-----> MAP
+ *   RECORD      <-----> ROW
+ *   UNION       <-----> LogicalType(OneOfType)
+ *   LogicalTypes.Date              <-----> LogicalType(DATE)
+ *   LogicalTypes.TimestampMillis   <-----> DATETIME
+ *   LogicalTypes.Decimal           <-----> DECIMAL
+ * 
+ * + * For SQL CHAR/VARCHAR types, an Avro schema + * + *
+ *   LogicalType({"type":"string","logicalType":"char","maxLength":MAX_LENGTH}) or
+ *   LogicalType({"type":"string","logicalType":"varchar","maxLength":MAX_LENGTH})
+ * 
+ * + * is used. + */ @Experimental(Kind.SCHEMAS) @SuppressWarnings({ "nullness", // TODO(https://github.com/apache/beam/issues/20497)