[release-2.44.0] Cherry-pick: Revert 23969 sqlttypes #24492

Merged · 2 commits · Dec 8, 2022
AvroUtils.java
@@ -34,7 +34,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
@@ -77,7 +76,6 @@
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
-import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -89,7 +87,6 @@
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -122,9 +119,7 @@
  *   RECORD                        <-----> ROW
  *   UNION                         <-----> LogicalType(OneOfType)
  *   LogicalTypes.Date             <-----> LogicalType(DATE)
- *                                 <------ LogicalType(urn="beam:logical_type:date:v1")
  *   LogicalTypes.TimestampMillis  <-----> DATETIME
- *   LogicalTypes.TimestampMicros  <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
  *   LogicalTypes.Decimal          <-----> DECIMAL
  * </pre>
 *
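As a quick orientation on the restored mapping table, here is a minimal sketch (not part of the diff; assumes the Beam Java SDK on the classpath, hypothetical class name) showing that after this revert a Beam DATETIME field converts to Avro timestamp-millis, with no micros branch involved:

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;

public class DateTimeMappingSketch {
  public static void main(String[] args) {
    // A Beam schema with a single (joda-time backed) DATETIME field.
    Schema beamSchema = Schema.builder().addDateTimeField("ts").build();

    // Per the table above, DATETIME <-----> LogicalTypes.TimestampMillis.
    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
    org.apache.avro.Schema ts = avroSchema.getField("ts").schema();
    System.out.println(ts.getLogicalType().getName()); // timestamp-millis
  }
}
```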
@@ -810,11 +805,9 @@ private static Schema.FieldType toFieldType(TypeWithNullability type) {
     if (logicalType instanceof LogicalTypes.Decimal) {
       fieldType = FieldType.DECIMAL;
     } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
-      // TODO(https://github.com/apache/beam/issues/19215) DATETIME primitive will be removed when
-      // fully migrates to java.time lib from joda-time
+      // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
+      // this is done, this logical type needs to be changed.
       fieldType = FieldType.DATETIME;
-    } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-      fieldType = FieldType.logicalType(SqlTypes.TIMESTAMP);
     } else if (logicalType instanceof LogicalTypes.Date) {
       fieldType = FieldType.DATETIME;
     }
@@ -927,8 +920,8 @@ private static org.apache.avro.Schema getFieldSchema(
         break;

       case DATETIME:
-        // TODO(https://github.com/apache/beam/issues/19215) DATETIME primitive will be removed when
-        // fully migrates to java.time lib from joda-time
+        // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
+        // this is done, this logical type needs to be changed.
         baseType =
             LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG));
         break;
@@ -973,13 +966,10 @@ private static org.apache.avro.Schema getFieldSchema(
             oneOfType.getOneOfSchema().getFields().stream()
                 .map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
                 .collect(Collectors.toList()));
-      } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) {
+      } else if ("DATE".equals(identifier)) {
         baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
       } else if ("TIME".equals(identifier)) {
         baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
-      } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
-        baseType =
-            LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
       } else {
         throw new RuntimeException(
             "Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1104,17 +1094,9 @@ private static org.apache.avro.Schema getFieldSchema(
             oneOfValue.getValue());
       }
     } else if ("DATE".equals(identifier)) {
-      // "Date" is backed by joda.time.Instant
       return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
-    } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
-      // portable SqlTypes.DATE is backed by java.time.LocalDate
-      return ((java.time.LocalDate) value).toEpochDay();
     } else if ("TIME".equals(identifier)) {
-      // "TIME" is backed by joda.time.Instant
       return (int) ((Instant) value).getMillis();
-    } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
-      // portable SqlTypes.TIMESTAMP is backed by java.time.Instant
-      return getMicrosFromJavaInstant((java.time.Instant) value);
     } else {
       throw new RuntimeException("Unhandled logical type " + identifier);
     }
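Both surviving branches in this hunk reduce a value to the same integer encodings the removed SqlTypes branches produced; a standalone sketch (JDK plus joda-time, hypothetical class name, not part of the diff) showing that the kept joda-time date path and the removed java.time path agree on the epoch-day number:

```java
import java.time.LocalDate;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import org.joda.time.Instant;

public class EpochDaysSketch {
  public static void main(String[] args) {
    // Surviving joda-time path: whole days between the epoch and the value.
    Instant jodaDate = new DateTime(1979, 3, 14, 0, 0, DateTimeZone.UTC).toInstant();
    int viaJoda = Days.daysBetween(Instant.EPOCH, jodaDate).getDays();

    // Removed SqlTypes.DATE path: java.time yields the same day count.
    long viaJavaTime = LocalDate.of(1979, 3, 14).toEpochDay();

    System.out.println(viaJoda + " == " + viaJavaTime); // 3359 == 3359
  }
}
```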
@@ -1188,19 +1170,10 @@ private static org.apache.avro.Schema getFieldSchema(
       } else {
         return convertDateTimeStrict((Long) value, fieldType);
       }
-    } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-      if (value instanceof java.time.Instant) {
-        return convertMicroMillisStrict(
-            getMicrosFromJavaInstant((java.time.Instant) value), fieldType);
-      } else {
-        return convertMicroMillisStrict((Long) value, fieldType);
-      }
     } else if (logicalType instanceof LogicalTypes.Date) {
       if (value instanceof ReadableInstant) {
         int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
         return convertDateStrict(epochDays, fieldType);
-      } else if (value instanceof java.time.LocalDate) {
-        return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType);
       } else {
         return convertDateStrict((Integer) value, fieldType);
       }
@@ -1258,14 +1231,6 @@ private static org.apache.avro.Schema getFieldSchema(
     }
   }

-  /** Helper method to get epoch micros required by Avro TimeStampMicros logical type. */
-  @SuppressWarnings("JavaInstantGetSecondsGetNano")
-  @VisibleForTesting
-  static long getMicrosFromJavaInstant(java.time.Instant value) {
-    return TimeUnit.SECONDS.toMicros(value.getEpochSecond())
-        + TimeUnit.NANOSECONDS.toMicros(value.getNano());
-  }
-
   private static Object convertRecordStrict(GenericRecord record, Schema.FieldType fieldType) {
     checkTypeName(fieldType.getTypeName(), Schema.TypeName.ROW, "record");
     return toBeamRowStrict(record, fieldType.getRowSchema());
@@ -1315,17 +1280,6 @@ private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldType) {
     return new Instant(value);
   }

-  private static Object convertMicroMillisStrict(Long value, Schema.FieldType fieldType) {
-    checkTypeName(
-        fieldType.getTypeName(), TypeName.LOGICAL_TYPE, SqlTypes.TIMESTAMP.getIdentifier());
-    checkArgument(
-        fieldType.getLogicalType().getIdentifier().equals(SqlTypes.TIMESTAMP.getIdentifier()));
-
-    return java.time.Instant.ofEpochSecond(
-        TimeUnit.MICROSECONDS.toSeconds(value),
-        TimeUnit.MICROSECONDS.toNanos(Math.floorMod(value, TimeUnit.SECONDS.toMicros(1))));
-  }
-
   private static Object convertFloatStrict(Float value, Schema.FieldType fieldType) {
     checkTypeName(fieldType.getTypeName(), Schema.TypeName.FLOAT, "float");
     return value;
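The epoch-micros arithmetic deleted above is self-contained; a standalone sketch (plain JDK, hypothetical class name, not part of the diff) reproducing the removed round trip between java.time.Instant and epoch micros:

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class MicrosRoundTripSketch {
  // Same formula as the removed getMicrosFromJavaInstant: whole seconds
  // scaled to micros, plus the nano-of-second truncated to micros.
  static long toMicros(Instant value) {
    return TimeUnit.SECONDS.toMicros(value.getEpochSecond())
        + TimeUnit.NANOSECONDS.toMicros(value.getNano());
  }

  // Same formula as the removed convertMicroMillisStrict body: split epoch
  // micros into seconds plus the floorMod sub-second remainder in nanos.
  static Instant fromMicros(long micros) {
    return Instant.ofEpochSecond(
        TimeUnit.MICROSECONDS.toSeconds(micros),
        TimeUnit.MICROSECONDS.toNanos(Math.floorMod(micros, TimeUnit.SECONDS.toMicros(1))));
  }

  public static void main(String[] args) {
    Instant t = Instant.parse("1979-03-14T01:02:03.004123Z");
    long micros = toMicros(t);
    System.out.println(micros);             // 290221323004123
    System.out.println(fromMicros(micros)); // 1979-03-14T01:02:03.004123Z
  }
}
```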
AvroUtilsTest.java
@@ -29,7 +29,6 @@
 import java.sql.JDBCType;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -47,7 +46,6 @@
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
-import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.utils.AvroGenerators.RecordSchemaGenerator;
 import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
 import org.apache.beam.sdk.testing.CoderProperties;
@@ -231,12 +229,6 @@ private static org.apache.avro.Schema getAvroSchema() {
             LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
             "",
             (Object) null));
-    fields.add(
-        new org.apache.avro.Schema.Field(
-            "timestampMicros",
-            LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
-            "",
-            (Object) null));
     fields.add(new org.apache.avro.Schema.Field("row", getAvroSubSchema("row"), "", (Object) null));
     fields.add(
         new org.apache.avro.Schema.Field(
Expand Down Expand Up @@ -269,7 +261,6 @@ private Schema getBeamSchema() {
.addField(Field.of("bytes", FieldType.BYTES))
.addField(Field.of("decimal", FieldType.DECIMAL))
.addField(Field.of("timestampMillis", FieldType.DATETIME))
.addField(Field.of("timestampMicros", FieldType.logicalType(SqlTypes.TIMESTAMP)))
.addField(Field.of("row", FieldType.row(subSchema)))
.addField(Field.of("array", FieldType.array(FieldType.row(subSchema))))
.addField(Field.of("map", FieldType.map(FieldType.STRING, FieldType.row(subSchema))))
@@ -279,9 +270,6 @@
   private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
   private static final DateTime DATE_TIME =
       new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC);
-  private static final java.time.Instant MICROS_INSTANT =
-      java.time.Instant.ofEpochMilli(DATE_TIME.getMillis())
-          .plusNanos(TimeUnit.MICROSECONDS.toNanos(123));
   private static final BigDecimal BIG_DECIMAL = new BigDecimal(3600);

   private Row getBeamRow() {
@@ -296,7 +284,6 @@ private Row getBeamRow() {
         .addValue(BYTE_ARRAY)
         .addValue(BIG_DECIMAL)
         .addValue(DATE_TIME)
-        .addValue(MICROS_INSTANT)
         .addValue(subRow)
         .addValue(ImmutableList.of(subRow, subRow))
         .addValue(ImmutableMap.of("k1", subRow, "k2", subRow))
@@ -329,7 +316,6 @@ private static GenericRecord getGenericRecord() {
         .set("bytes", ByteBuffer.wrap(BYTE_ARRAY))
         .set("decimal", encodedDecimal)
         .set("timestampMillis", DATE_TIME.getMillis())
-        .set("timestampMicros", AvroUtils.getMicrosFromJavaInstant(MICROS_INSTANT))
         .set("row", getSubGenericRecord("row"))
         .set("array", ImmutableList.of(getSubGenericRecord("array"), getSubGenericRecord("array")))
         .set(
@@ -728,25 +714,6 @@ public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() {
     assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
   }

-  @Test
-  public void testSqlTypesToGenericRecord() {
-    // SqlTypes to LogicalTypes.date conversion are one direction
-    java.time.LocalDate localDate = java.time.LocalDate.of(1979, 3, 14);
-
-    Schema beamSchema =
-        Schema.builder()
-            .addField(Field.of("local_date", FieldType.logicalType(SqlTypes.DATE)))
-            .build();
-
-    Row rowData = Row.withSchema(beamSchema).addValue(localDate).build();
-
-    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
-    GenericRecord expectedRecord =
-        new GenericRecordBuilder(avroSchema).set("local_date", localDate.toEpochDay()).build();
-
-    assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
-  }
-
   @Test
   public void testBeamRowToGenericRecord() {
     GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);