diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index a4885686dafc..ef014af53035 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -75,6 +75,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; @@ -123,6 +124,7 @@ * RECORD <-----> ROW * UNION <-----> LogicalType(OneOfType) * LogicalTypes.Date <-----> LogicalType(DATE) + * <------ LogicalType(urn="beam:logical_type:date:v1") * LogicalTypes.TimestampMillis <-----> DATETIME * LogicalTypes.Decimal <-----> DECIMAL * @@ -967,7 +969,7 @@ private static org.apache.avro.Schema getFieldSchema( oneOfType.getOneOfSchema().getFields().stream() .map(x -> getFieldSchema(x.getType(), x.getName(), namespace)) .collect(Collectors.toList())); - } else if ("DATE".equals(identifier)) { + } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) { baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT)); } else if ("TIME".equals(identifier)) { baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT)); @@ -1095,7 +1097,11 @@ private static org.apache.avro.Schema getFieldSchema( oneOfValue.getValue()); } } else if ("DATE".equals(identifier)) { + // "Date" is backed by joda.time.Instant return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays(); + } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) { + // portable SqlTypes.DATE is backed by java.time.LocalDate + return ((java.time.LocalDate) value).toEpochDay(); } else if ("TIME".equals(identifier)) { return (int) ((Instant) value).getMillis(); } else { @@ -1175,6 +1181,8 @@ private static org.apache.avro.Schema getFieldSchema( if (value instanceof ReadableInstant) { int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays(); return convertDateStrict(epochDays, fieldType); + } else if (value instanceof java.time.LocalDate) { + return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType); } else { return convertDateStrict((Integer) value, fieldType); } diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index adacec1ebbbd..4e282fb7094b 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -715,6 +716,25 @@ public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() { assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); } + @Test + public void testSqlTypesToGenericRecord() { + // SqlTypes to LogicalTypes.date conversion is one direction + java.time.LocalDate localDate = java.time.LocalDate.of(1979, 3, 14); + + Schema beamSchema = + Schema.builder() + .addField(Field.of("local_date", FieldType.logicalType(SqlTypes.DATE))) + .build(); + + Row rowData = Row.withSchema(beamSchema).addValue(localDate).build(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); + GenericRecord expectedRecord = + new GenericRecordBuilder(avroSchema).set("local_date", localDate.toEpochDay()).build(); + + assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); + } + @Test public void testBeamRowToGenericRecord() { GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);