From 1b1448d5cce7a5a5586ef4b868dc7bc8e8b82ed8 Mon Sep 17 00:00:00 2001 From: Anne <102554163+alovew@users.noreply.github.com> Date: Fri, 12 Aug 2022 15:07:00 -0700 Subject: [PATCH] Track Max & mean seconds to receive state messages (#15586) * Segment tracking for max and mean seconds to receive state message from source --- .../src/main/resources/types/SyncStats.yaml | 4 ++ .../job_tracker/TrackingMetadata.java | 4 ++ .../job_tracker/JobTrackerTest.java | 4 ++ .../general/DefaultReplicationWorker.java | 4 +- .../internal/AirbyteMessageTracker.java | 59 +++++++++++++++++++ .../workers/internal/MessageTracker.java | 4 ++ .../general/DefaultReplicationWorkerTest.java | 12 +++- .../internal/AirbyteMessageTrackerTest.java | 9 +++ 8 files changed, 98 insertions(+), 2 deletions(-) diff --git a/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml b/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml index 6616996cb33d..09613e94cd78 100644 --- a/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml +++ b/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml @@ -21,3 +21,7 @@ properties: type: integer recordsCommitted: type: integer # if unset, committed records could not be computed + meanSecondsBeforeSourceStateMessageEmitted: + type: integer + maxSecondsBeforeSourceStateMessageEmitted: + type: integer diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java index 42db33fca844..90febc6f59aa 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java @@ -111,6 +111,10 @@ public static ImmutableMap generateJobAttemptMetadata(final Job metadata.put("volume_rows", syncSummary.getRecordsSynced()); metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted()); metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted()); + metadata.put("max_seconds_before_source_state_message_emitted", + syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted()); + metadata.put("mean_seconds_before_source_state_message_emitted", + syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted()); } } diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java index 56f5347677af..a0ce87ac3563 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java @@ -114,6 +114,8 @@ class JobTrackerTest { .put("volume_mb", SYNC_BYTES_SYNC) .put("count_state_messages_from_source", 3L) .put("count_state_messages_from_destination", 1L) + .put("max_seconds_before_source_state_message_emitted", 5L) + .put("mean_seconds_before_source_state_message_emitted", 4L) .build(); private static final ImmutableMap SYNC_CONFIG_METADATA = ImmutableMap.builder() .put(JobTracker.CONFIG + ".source.key", JobTracker.SET) @@ -496,6 +498,8 @@ private Attempt getAttemptMock() { when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput)); when(syncStats.getSourceStateMessagesEmitted()).thenReturn(3L); when(syncStats.getDestinationStateMessagesEmitted()).thenReturn(1L); + when(syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()).thenReturn(5L); + when(syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()).thenReturn(4L); return attempt; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index f1b708549389..5bae9b605bba 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -202,7 +202,9 @@ else if (hasFailed.get()) { .withRecordsEmitted(messageTracker.getTotalRecordsEmitted()) .withBytesEmitted(messageTracker.getTotalBytesEmitted()) .withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted()) - .withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted()); + .withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted()) + .withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage()) + .withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage()); if (outputStatus == ReplicationStatus.COMPLETED) { totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index 81d25e1ed091..c4e2b6ff5155 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.joda.time.DateTime; +import org.joda.time.Seconds; @Slf4j public class AirbyteMessageTracker implements MessageTracker { @@ -40,6 +42,8 @@ public class AirbyteMessageTracker implements MessageTracker { private final AtomicReference destinationOutputState; private final AtomicLong totalSourceEmittedStateMessages; private final AtomicLong totalDestinationEmittedStateMessages; + private Long maxSecondsToReceiveSourceStateMessage; + private Long meanSecondsToReceiveSourceStateMessage; private final Map streamToRunningCount; private final HashFunction hashFunction; private final BiMap streamNameToIndex; @@ -49,6 +53,8 @@ public class AirbyteMessageTracker implements MessageTracker { private final List destinationErrorTraceMessages; private final List sourceErrorTraceMessages; private final StateAggregator stateAggregator; + private DateTime firstRecordReceivedAt; + private final DateTime lastStateMessageReceivedAt; private short nextStreamIndex; @@ -74,6 +80,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final this.destinationOutputState = new AtomicReference<>(); this.totalSourceEmittedStateMessages = new AtomicLong(0L); this.totalDestinationEmittedStateMessages = new AtomicLong(0L); + this.maxSecondsToReceiveSourceStateMessage = 0L; + this.meanSecondsToReceiveSourceStateMessage = 0L; this.streamToRunningCount = new HashMap<>(); this.streamNameToIndex = HashBiMap.create(); this.hashFunction = Hashing.murmur3_32_fixed(); @@ -85,6 +93,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final this.destinationErrorTraceMessages = new ArrayList<>(); this.sourceErrorTraceMessages = new ArrayList<>(); this.stateAggregator = stateAggregator; + this.firstRecordReceivedAt = null; + this.lastStateMessageReceivedAt = null; } @Override @@ -111,6 +121,10 @@ public void acceptFromDestination(final AirbyteMessage message) { * total byte count for the record's stream. */ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage) { + if (firstRecordReceivedAt == null) { + firstRecordReceivedAt = DateTime.now(); + } + final short streamIndex = getStreamIndex(recordMessage.getStream()); final long currentRunningCount = streamToRunningCount.getOrDefault(streamIndex, 0L); @@ -131,6 +145,7 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage) * correctly. */ private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) { + updateMaxAndMeanSecondsToReceiveStateMessage(DateTime.now()); sourceOutputState.set(new State().withState(stateMessage.getData())); totalSourceEmittedStateMessages.incrementAndGet(); final int stateHash = getStateHashCode(stateMessage); @@ -327,4 +342,48 @@ public Long getTotalDestinationStateMessagesEmitted() { return totalDestinationEmittedStateMessages.get(); } + @Override + public Long getMaxSecondsToReceiveSourceStateMessage() { + return maxSecondsToReceiveSourceStateMessage; + } + + @Override + public Long getMeanSecondsToReceiveSourceStateMessage() { + return meanSecondsToReceiveSourceStateMessage; + } + + private void updateMaxAndMeanSecondsToReceiveStateMessage(final DateTime stateMessageReceivedAt) { + final Long secondsSinceLastStateMessage = calculateSecondsSinceLastStateEmitted(stateMessageReceivedAt); + if (maxSecondsToReceiveSourceStateMessage < secondsSinceLastStateMessage) { + maxSecondsToReceiveSourceStateMessage = secondsSinceLastStateMessage; + } + + if (meanSecondsToReceiveSourceStateMessage == 0) { + meanSecondsToReceiveSourceStateMessage = secondsSinceLastStateMessage; + } else { + final Long newMeanSeconds = + calculateMean(meanSecondsToReceiveSourceStateMessage, totalSourceEmittedStateMessages.get(), secondsSinceLastStateMessage); + meanSecondsToReceiveSourceStateMessage = newMeanSeconds; + } + } + + private Long calculateSecondsSinceLastStateEmitted(final DateTime stateMessageReceivedAt) { + if (lastStateMessageReceivedAt != null) { + return Long.valueOf(Seconds.secondsBetween(lastStateMessageReceivedAt, stateMessageReceivedAt).getSeconds()); + } else if (firstRecordReceivedAt != null) { + return Long.valueOf(Seconds.secondsBetween(firstRecordReceivedAt, stateMessageReceivedAt).getSeconds()); + } else { + // If we receive a State Message before a Record Message there is no previous timestamp to use for a + // calculation + return 0L; + } + } + + @VisibleForTesting + protected Long calculateMean(final Long currentMean, final Long totalCount, final Long newDataPoint) { + final Long previousCount = totalCount - 1; + final double result = (Double.valueOf(currentMean * previousCount) / totalCount) + (Double.valueOf(newDataPoint) / totalCount); + return (long) result; + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java index 94266bf0f34c..f9196161a641 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java @@ -106,6 +106,10 @@ public interface MessageTracker { Long getTotalDestinationStateMessagesEmitted(); + Long getMaxSecondsToReceiveSourceStateMessage(); + + Long getMeanSecondsToReceiveSourceStateMessage(); + AirbyteTraceMessage getFirstDestinationErrorTraceMessage(); AirbyteTraceMessage getFirstSourceErrorTraceMessage(); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index b51e9bad82b6..1a00b8f37ebe 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -445,6 +445,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException { when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(1L); when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L)); when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L)); + when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(5L); + when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(4L); final ReplicationWorker worker = new DefaultReplicationWorker( JOB_ID, @@ -467,6 +469,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException { .withBytesEmitted(100L) .withSourceStateMessagesEmitted(3L) .withDestinationStateMessagesEmitted(1L) + .withMaxSecondsBeforeSourceStateMessageEmitted(5L) + .withMeanSecondsBeforeSourceStateMessageEmitted(4L) .withRecordsCommitted(12L)) // since success, should use emitted count .withStreamStats(Collections.singletonList( new StreamSyncStats() @@ -476,7 +480,9 @@ void testPopulatesOutputOnSuccess() throws WorkerException { .withRecordsEmitted(12L) .withRecordsCommitted(12L) // since success, should use emitted count .withSourceStateMessagesEmitted(null) - .withDestinationStateMessagesEmitted(null))))) + .withDestinationStateMessagesEmitted(null) + .withMaxSecondsBeforeSourceStateMessageEmitted(null) + .withMeanSecondsBeforeSourceStateMessageEmitted(null))))) .withOutputCatalog(syncInput.getCatalog()) .withState(new State().withState(expectedState)); @@ -548,6 +554,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception { when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L)); when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L)); when(messageTracker.getStreamToCommittedRecords()).thenReturn(Optional.of(Collections.singletonMap(STREAM1, 6L))); + when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(10L); + when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(8L); final ReplicationWorker worker = new DefaultReplicationWorker( JOB_ID, @@ -565,6 +573,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception { .withBytesEmitted(100L) .withSourceStateMessagesEmitted(3L) .withDestinationStateMessagesEmitted(2L) + .withMaxSecondsBeforeSourceStateMessageEmitted(10L) + .withMeanSecondsBeforeSourceStateMessageEmitted(8L) .withRecordsCommitted(6L); final List expectedStreamStats = Collections.singletonList( new StreamSyncStats() diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java index df7f1f988b7c..16dc6f875524 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java @@ -324,4 +324,13 @@ void testErrorTraceMessageFailureWithNoTraceErrors() throws Exception { assertEquals(messageTracker.errorTraceMessageFailure(Long.valueOf(123), 1), null); } + @Test + void testCalculateMean() throws Exception { + // Mean for 3 state messages is 5, 4th state message is 9, new mean should be 6 + assertEquals(6L, messageTracker.calculateMean(5L, 4L, 9L)); + + // Mean for 5 state messages is 10, 4th state message is 12, new mean is 10.33 rounded down to 10 + assertEquals(10L, messageTracker.calculateMean(10L, 6L, 12L)); + } + }