From 2f131f21efa06d2fb8afa5cdf7b5389d7ded2afb Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Mon, 11 Feb 2019 17:37:46 -0800 Subject: [PATCH] Use loggedAt to replace future occurredAt timestamps (#13) --- ...yftFlinkStreamingPortableTranslations.java | 33 +++++++++++---- ...linkStreamingPortableTranslationsTest.java | 40 ++++++++++++++++++- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java index 91c08dcfb08a8..4a66c5d6b850c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java @@ -148,7 +148,6 @@ public TypeInformation> getProducedType() { @Override public WindowedValue deserialize( byte[] messageKey, byte[] message, String topic, int partition, long offset) { - // System.out.println("###Kafka record: " + new String(message, Charset.defaultCharset())); return WindowedValue.valueInGlobalWindow(message); } @@ -322,24 +321,42 @@ public WindowedValue deserialize( throw new IOException("Events is not an array"); } + // Determine the timestamp as minimum occurred_at of all contained events. + // For each event, attempt to extract occurred_at from either date string or number. + // Assume timestamp as logged_at, when occurred_at is an (invalid) future timestamp. + Iterator iter = events.elements(); long timestamp = Long.MAX_VALUE; while (iter.hasNext()) { - JsonNode occurredAt = iter.next().path(EventField.EventOccurredAt.fieldName()); + JsonNode event = iter.next(); + JsonNode occurredAt = event.path(EventField.EventOccurredAt.fieldName()); try { + long occurredAtMillis; if (occurredAt.isTextual()) { - timestamp = Math.min(parseDateTime(occurredAt.textValue()), timestamp); + occurredAtMillis = parseDateTime(occurredAt.textValue()); } else if (occurredAt.isNumber()) { - timestamp = occurredAt.asLong(); + occurredAtMillis = occurredAt.asLong(); + } else { + continue; + } + if (event.has(EventField.EventLoggedAt.fieldName())) { + long loggedAtMillis = event.path(EventField.EventLoggedAt.fieldName()).asLong(); + if (loggedAtMillis > 0) { + if (loggedAtMillis < Integer.MAX_VALUE) { + loggedAtMillis = loggedAtMillis * 1000; + } + occurredAtMillis = Math.min(occurredAtMillis, loggedAtMillis); + } } + timestamp = Math.min(occurredAtMillis, timestamp); } catch (DateTimeParseException e) { // skip this timestamp } + } - // if we didn't find any valid timestamps, use Long.MIN_VALUE - if (timestamp == Long.MAX_VALUE) { - timestamp = Long.MIN_VALUE; - } + // if we didn't find any valid timestamps, use Long.MIN_VALUE + if (timestamp == Long.MAX_VALUE) { + timestamp = Long.MIN_VALUE; } return WindowedValue.timestampedValueInGlobalWindow(recordValue, new Instant(timestamp)); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java index 94b31f5b8630f..f40e0a1f0f65a 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java @@ -17,8 +17,13 @@ */ package org.apache.beam.runners.flink; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.Charset; +import java.text.SimpleDateFormat; import java.util.Base64; +import java.util.TimeZone; +import java.util.zip.Deflater; import org.apache.beam.runners.flink.LyftFlinkStreamingPortableTranslations.LyftBase64ZlibJsonSchema; import org.apache.beam.sdk.util.WindowedValue; import org.junit.Assert; @@ -59,8 +64,7 @@ public void testBeamKinesisSchemaLongTimestamp() throws IOException { @Test public void testBeamKinesisSchemaNoTimestamp() throws IOException { - // [{"event_id": 1}] - byte[] message = Base64.getDecoder().decode("eJyLrlZKLUvNK4nPTFGyUjCsjQUANv8Fzg=="); + byte[] message = encode("[{\"event_id\": 1}]"); LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(); WindowedValue value = schema.deserialize(message, "", "", 0, "", ""); @@ -86,4 +90,36 @@ public void testBeamKinesisSchemaMultipleRecords() throws IOException { // we should output the oldest timestamp in the bundle Assert.assertEquals(1540599602000L, value.getTimestamp().getMillis()); } + + @Test + public void testBeamKinesisSchemaFutureOccurredAtTimestamp() throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + + long loggedAtMillis = sdf.parse("2018-10-27 00:10:02.000000").getTime(); + String events = + "[{\"event_id\": 1, \"occurred_at\": \"2018-10-27 00:20:02.900\", \"logged_at\": " + + loggedAtMillis / 1000 + + "}]"; + byte[] message = encode(events); + LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(); + WindowedValue value = schema.deserialize(message, "", "", 0, "", ""); + + Assert.assertArrayEquals(message, value.getValue()); + Assert.assertEquals(loggedAtMillis, value.getTimestamp().getMillis()); + } + + private static byte[] encode(String data) throws IOException { + Deflater deflater = new Deflater(); + deflater.setInput(data.getBytes(Charset.defaultCharset())); + deflater.finish(); + byte[] buf = new byte[4096]; + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length())) { + while (!deflater.finished()) { + int count = deflater.deflate(buf); + bos.write(buf, 0, count); + } + return bos.toByteArray(); + } + } }