Skip to content

Commit

Permalink
feat(topicdata): handle logical types local-timestamp-millis and loca…
Browse files Browse the repository at this point in the history
…l-timestamp-micros (#1015)
  • Loading branch information
opeti authored Feb 9, 2022
1 parent a5eb393 commit c0838e6
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 8 deletions.
26 changes: 26 additions & 0 deletions src/main/java/org/akhq/utils/AvroDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.*;
import java.util.stream.Collectors;
Expand All @@ -26,6 +27,8 @@ public class AvroDeserializer {
private static final String TIME_MICROS = "time-micros";
private static final String TIMESTAMP_MILLIS = "timestamp-millis";
private static final String TIMESTAMP_MICROS = "timestamp-micros";
private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis";
private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros";

private static final DecimalConversion DECIMAL_CONVERSION = new DecimalConversion();
private static final UUIDConversion UUID_CONVERSION = new UUIDConversion();
Expand All @@ -34,6 +37,8 @@ public class AvroDeserializer {
private static final TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeMillisConversion();
private static final TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimestampMicrosConversion();
private static final TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimestampMillisConversion();
private static final LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new LocalTimestampMicrosConversion();
private static final LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new LocalTimestampMillisConversion();

public static Map<String, Object> recordDeserializer(GenericRecord record) {
return record
Expand Down Expand Up @@ -68,6 +73,10 @@ private static Object objectDeserializer(Object value, Schema schema) {
return AvroDeserializer.timestampMicrosDeserializer(value, schema, primitiveType, logicalType);
case TIMESTAMP_MILLIS:
return AvroDeserializer.timestampMillisDeserializer(value, schema, primitiveType, logicalType);
case LOCAL_TIMESTAMP_MICROS:
return AvroDeserializer.localTimestampMicrosDeserializer(value, schema, primitiveType, logicalType);
case LOCAL_TIMESTAMP_MILLIS:
return AvroDeserializer.localTimestampMillisDeserializer(value, schema, primitiveType, logicalType);
case UUID:
return AvroDeserializer.uuidDeserializer(value, schema, primitiveType, logicalType);
default:
Expand Down Expand Up @@ -153,6 +162,23 @@ private static Instant timestampMillisDeserializer(Object value, Schema schema,
throw new IllegalStateException("Unexpected value: " + primitiveType);
}

private static LocalDateTime localTimestampMicrosDeserializer(Object value, Schema schema, Type primitiveType, LogicalType logicalType) {
switch (primitiveType) {
case LONG:
return AvroDeserializer.LOCAL_TIMESTAMP_MICROS_CONVERSION.fromLong((Long) value, schema, logicalType);
default:
throw new IllegalStateException("Unexpected value: " + primitiveType);
}
}

private static LocalDateTime localTimestampMillisDeserializer(Object value, Schema schema, Type primitiveType, LogicalType logicalType) {
if (primitiveType == Type.LONG) {
return AvroDeserializer.LOCAL_TIMESTAMP_MILLIS_CONVERSION.fromLong((Long) value, schema, logicalType);
}

throw new IllegalStateException("Unexpected value: " + primitiveType);
}

private static LocalTime timeMicrosDeserializer(Object value, Schema schema, Type primitiveType, LogicalType logicalType) {
if (primitiveType == Type.LONG) {
return AvroDeserializer.TIME_MICROS_CONVERSION.fromLong((Long) value, schema, logicalType);
Expand Down
62 changes: 60 additions & 2 deletions src/main/java/org/akhq/utils/AvroSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.AbstractMap;
Expand All @@ -33,6 +35,8 @@ public class AvroSerializer {
private static final String TIME_MICROS = "time-micros";
private static final String TIMESTAMP_MILLIS = "timestamp-millis";
private static final String TIMESTAMP_MICROS = "timestamp-micros";
private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis";
private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros";

private static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion();
private static final Conversions.UUIDConversion UUID_CONVERSION = new Conversions.UUIDConversion();
Expand All @@ -41,6 +45,8 @@ public class AvroSerializer {
private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();
private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();

protected static final String DATE_FORMAT = "yyyy-MM-dd[XXX]";
protected static final String TIME_FORMAT = "HH:mm[:ss][.SSSSSS][XXX]";
Expand Down Expand Up @@ -76,13 +82,17 @@ private static Object objectSerializer(Object value, Schema schema) {
case DECIMAL:
return AvroSerializer.decimalSerializer(value, schema, primitiveType, logicalType);
case TIME_MICROS:
return AvroSerializer.timeMicrosDeserializer(value, schema, primitiveType, logicalType);
return AvroSerializer.timeMicrosSerializer(value, schema, primitiveType, logicalType);
case TIME_MILLIS:
return AvroSerializer.timeMillisSerializer(value, schema, primitiveType, logicalType);
case TIMESTAMP_MICROS:
return AvroSerializer.timestampMicrosSerializer(value, schema, primitiveType, logicalType);
case TIMESTAMP_MILLIS:
return AvroSerializer.timestampMillisSerializer(value, schema, primitiveType, logicalType);
case LOCAL_TIMESTAMP_MICROS:
return AvroSerializer.localTimestampMicrosSerializer(value, schema, primitiveType, logicalType);
case LOCAL_TIMESTAMP_MILLIS:
return AvroSerializer.localTimestampMillisSerializer(value, schema, primitiveType, logicalType);
case UUID:
return AvroSerializer.uuidSerializer(value, schema, primitiveType, logicalType);
default:
Expand Down Expand Up @@ -214,12 +224,60 @@ private static Long timestampMillisSerializer(Object data, Schema schema, Schema
throw new IllegalStateException("Unexpected value: " + primitiveType + " on schema " + schema);
}

private static Long localTimestampMicrosSerializer(Object data, Schema schema, Schema.Type primitiveType, LogicalType logicalType) {
LocalDateTime value;

if (data instanceof String) {
try {
value = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, Long.parseLong((String) data) * 1000), ZoneOffset.UTC);
} catch (NumberFormatException ignored) {
value = LocalDateTime.parse((String) data);
}
} else if (data instanceof Long) {
value = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, (Long) data * 1000), ZoneOffset.UTC);
} else if (data instanceof Integer) {
value = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, ((Integer) data).longValue() * 1000), ZoneOffset.UTC);
} else {
value = (LocalDateTime) data;
}

if (primitiveType == Schema.Type.LONG) {
return AvroSerializer.LOCAL_TIMESTAMP_MICROS_CONVERSION.toLong(value, schema, logicalType);
}

throw new IllegalStateException("Unexpected value: " + primitiveType + " on schema " + schema);
}

private static Long localTimestampMillisSerializer(Object data, Schema schema, Schema.Type primitiveType, LogicalType logicalType) {
LocalDateTime value;

if (data instanceof String) {
try {
value = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong((String) data)), ZoneOffset.UTC);
} catch (NumberFormatException ignored) {
value = LocalDateTime.parse((String) data);
}
} else if (data instanceof Long) {
value = LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) data), ZoneOffset.UTC);
} else if (data instanceof Integer) {
value = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Integer) data).longValue()), ZoneOffset.UTC);
} else {
value = (LocalDateTime) data;
}

if (primitiveType == Schema.Type.LONG) {
return AvroSerializer.LOCAL_TIMESTAMP_MILLIS_CONVERSION.toLong(value, schema, logicalType);
}

throw new IllegalStateException("Unexpected value: " + primitiveType + " on schema " + schema);
}

protected static Instant parseDateTime(String data) {
TimeZone tz = TimeZone.getDefault();
return DATETIME_FORMAT.withZone(tz.toZoneId()).parse(data, Instant::from);
}

private static Long timeMicrosDeserializer(Object data, Schema schema, Schema.Type primitiveType, LogicalType logicalType) {
private static Long timeMicrosSerializer(Object data, Schema schema, Schema.Type primitiveType, LogicalType logicalType) {
LocalTime value;
if (data instanceof String) {
value = LocalTime.parse((String) data, DateTimeFormatter.ofPattern(AvroSerializer.TIME_FORMAT));
Expand Down
19 changes: 13 additions & 6 deletions src/test/java/org/akhq/utils/AvroDeserializerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.math.BigDecimal;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -40,10 +41,12 @@ static Stream<Arguments> primitiveSource() {
Arguments.of(UUID.randomUUID(), "{\"type\": \"string\", \"logicalType\": \"uuid\"}"),
Arguments.of(LocalDate.now(), "{\"type\": \"int\", \"logicalType\": \"date\"}"),
Arguments.of(LocalTime.now(Clock.tickMillis(ZoneId.of("UTC"))), "{\"type\": \"int\", \"logicalType\": \"time-millis\"}"),
Arguments.of(LocalTime.now(), "{\"type\": \"long\", \"logicalType\": \"time-micros\"}"),
Arguments.of(LocalTime.now().truncatedTo(ChronoUnit.MICROS), "{\"type\": \"long\", \"logicalType\": \"time-micros\"}"),
Arguments.of(Instant.now(Clock.tickMillis(ZoneId.of("UTC"))), "{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}"),
Arguments.of(Instant.now(), "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}"),
Arguments.of(Instant.now(), "[\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]")
Arguments.of(Instant.now().truncatedTo(ChronoUnit.MICROS), "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}"),
Arguments.of(Instant.now().truncatedTo(ChronoUnit.MICROS), "[\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]"),
Arguments.of(LocalDateTime.now().truncatedTo(ChronoUnit.MICROS), "{\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}"),
Arguments.of(LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS), "{\"type\": \"long\", \"logicalType\": \"local-timestamp-millis\"}")
);
}

Expand All @@ -70,8 +73,10 @@ void testPrimitive(Object value, String type) {

static Stream<Arguments> convertionSource() {
UUID uuid = UUID.randomUUID();
LocalTime localTime = LocalTime.now();
Instant now = Instant.now();
LocalTime localTime = LocalTime.now().truncatedTo(ChronoUnit.MICROS);
Instant now = Instant.now().truncatedTo(ChronoUnit.MICROS);
LocalDateTime localDateTimeMicros = LocalDateTime.now().truncatedTo(ChronoUnit.MICROS);
LocalDateTime localDateTimeMillis = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);

return Stream.of(
Arguments.of("abc", "\"bytes\"", "abc".getBytes()),
Expand All @@ -81,7 +86,9 @@ static Stream<Arguments> convertionSource() {
Arguments.of(uuid.toString(), "{\"type\": \"string\", \"logicalType\": \"uuid\"}", uuid),
Arguments.of(LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE), "{\"type\": \"int\", \"logicalType\": \"date\"}", LocalDate.now()),
Arguments.of(localTime.format(DateTimeFormatter.ISO_LOCAL_TIME), "{\"type\": \"long\", \"logicalType\": \"time-micros\"}", localTime),
Arguments.of(now.atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}", now)
Arguments.of(now.atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}", now),
Arguments.of(localDateTimeMicros.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), "{\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}", localDateTimeMicros),
Arguments.of(localDateTimeMillis.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), "{\"type\": \"long\", \"logicalType\": \"local-timestamp-millis\"}", localDateTimeMillis)
);
}

Expand Down

0 comments on commit c0838e6

Please sign in to comment.