Skip to content

Commit

Permalink
Track Max & mean seconds to receive state messages (#15586)
Browse files Browse the repository at this point in the history
* Segment tracking for max and mean seconds to receive state message from source
  • Loading branch information
alovew authored Aug 12, 2022
1 parent 2a2c164 commit 1b1448d
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ properties:
type: integer
recordsCommitted:
type: integer # if unset, committed records could not be computed
meanSecondsBeforeSourceStateMessageEmitted:
type: integer
maxSecondsBeforeSourceStateMessageEmitted:
type: integer
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job
metadata.put("volume_rows", syncSummary.getRecordsSynced());
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
metadata.put("max_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted());
metadata.put("mean_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class JobTrackerTest {
.put("volume_mb", SYNC_BYTES_SYNC)
.put("count_state_messages_from_source", 3L)
.put("count_state_messages_from_destination", 1L)
.put("max_seconds_before_source_state_message_emitted", 5L)
.put("mean_seconds_before_source_state_message_emitted", 4L)
.build();
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
Expand Down Expand Up @@ -496,6 +498,8 @@ private Attempt getAttemptMock() {
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
when(syncStats.getSourceStateMessagesEmitted()).thenReturn(3L);
when(syncStats.getDestinationStateMessagesEmitted()).thenReturn(1L);
when(syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()).thenReturn(5L);
when(syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()).thenReturn(4L);
return attempt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ else if (hasFailed.get()) {
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
.withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted())
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted());
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted())
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage());

if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.joda.time.Seconds;

@Slf4j
public class AirbyteMessageTracker implements MessageTracker {
Expand All @@ -40,6 +42,8 @@ public class AirbyteMessageTracker implements MessageTracker {
private final AtomicReference<State> destinationOutputState;
private final AtomicLong totalSourceEmittedStateMessages;
private final AtomicLong totalDestinationEmittedStateMessages;
private Long maxSecondsToReceiveSourceStateMessage;
private Long meanSecondsToReceiveSourceStateMessage;
private final Map<Short, Long> streamToRunningCount;
private final HashFunction hashFunction;
private final BiMap<String, Short> streamNameToIndex;
Expand All @@ -49,6 +53,8 @@ public class AirbyteMessageTracker implements MessageTracker {
private final List<AirbyteTraceMessage> destinationErrorTraceMessages;
private final List<AirbyteTraceMessage> sourceErrorTraceMessages;
private final StateAggregator stateAggregator;
private DateTime firstRecordReceivedAt;
private final DateTime lastStateMessageReceivedAt;

private short nextStreamIndex;

Expand All @@ -74,6 +80,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final
this.destinationOutputState = new AtomicReference<>();
this.totalSourceEmittedStateMessages = new AtomicLong(0L);
this.totalDestinationEmittedStateMessages = new AtomicLong(0L);
this.maxSecondsToReceiveSourceStateMessage = 0L;
this.meanSecondsToReceiveSourceStateMessage = 0L;
this.streamToRunningCount = new HashMap<>();
this.streamNameToIndex = HashBiMap.create();
this.hashFunction = Hashing.murmur3_32_fixed();
Expand All @@ -85,6 +93,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final
this.destinationErrorTraceMessages = new ArrayList<>();
this.sourceErrorTraceMessages = new ArrayList<>();
this.stateAggregator = stateAggregator;
this.firstRecordReceivedAt = null;
this.lastStateMessageReceivedAt = null;
}

@Override
Expand All @@ -111,6 +121,10 @@ public void acceptFromDestination(final AirbyteMessage message) {
* total byte count for the record's stream.
*/
private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage) {
if (firstRecordReceivedAt == null) {
firstRecordReceivedAt = DateTime.now();
}

final short streamIndex = getStreamIndex(recordMessage.getStream());

final long currentRunningCount = streamToRunningCount.getOrDefault(streamIndex, 0L);
Expand All @@ -131,6 +145,7 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage)
* correctly.
*/
private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) {
updateMaxAndMeanSecondsToReceiveStateMessage(DateTime.now());
sourceOutputState.set(new State().withState(stateMessage.getData()));
totalSourceEmittedStateMessages.incrementAndGet();
final int stateHash = getStateHashCode(stateMessage);
Expand Down Expand Up @@ -327,4 +342,48 @@ public Long getTotalDestinationStateMessagesEmitted() {
return totalDestinationEmittedStateMessages.get();
}

/**
 * Returns the largest interval, in whole seconds, recorded so far while handling source state
 * messages. The value starts at 0 and is only ever raised by
 * {@code updateMaxAndMeanSecondsToReceiveStateMessage}.
 *
 * @return the current maximum number of seconds measured before a source state message
 */
@Override
public Long getMaxSecondsToReceiveSourceStateMessage() {
return maxSecondsToReceiveSourceStateMessage;
}

/**
 * Returns the running mean, truncated to whole seconds, of the intervals recorded while handling
 * source state messages. The value starts at 0 and is maintained by
 * {@code updateMaxAndMeanSecondsToReceiveStateMessage}.
 *
 * @return the current mean number of seconds measured before a source state message
 */
@Override
public Long getMeanSecondsToReceiveSourceStateMessage() {
return meanSecondsToReceiveSourceStateMessage;
}

/**
 * Folds the interval that ended with the source state message received at
 * {@code stateMessageReceivedAt} into the running maximum and the running mean.
 *
 * <p>
 * This method is invoked at the top of {@code handleSourceEmittedState}, i.e. before
 * {@code totalSourceEmittedStateMessages} has been incremented for the message being handled. The
 * counter therefore still holds the number of previously seen state messages, and the current
 * message has to be added to it before it is used as the sample count for the mean.
 *
 * @param stateMessageReceivedAt time at which the source state message was received
 */
private void updateMaxAndMeanSecondsToReceiveStateMessage(final DateTime stateMessageReceivedAt) {
  final Long secondsSinceLastStateMessage = calculateSecondsSinceLastStateEmitted(stateMessageReceivedAt);
  if (maxSecondsToReceiveSourceStateMessage < secondsSinceLastStateMessage) {
    maxSecondsToReceiveSourceStateMessage = secondsSinceLastStateMessage;
  }

  // The counter has not been bumped for this message yet, so include it explicitly. Using the
  // count (rather than "mean == 0") to detect the first sample also keeps a legitimate mean of 0
  // from being overwritten by the next data point: with a count of 1 calculateMean simply returns
  // the new data point.
  final long stateMessageCountIncludingCurrent = totalSourceEmittedStateMessages.get() + 1;
  meanSecondsToReceiveSourceStateMessage =
      calculateMean(meanSecondsToReceiveSourceStateMessage, stateMessageCountIncludingCurrent, secondsSinceLastStateMessage);
}

/**
 * Computes how many whole seconds elapsed between the most recent reference point and the moment
 * a source state message was received.
 *
 * <p>
 * The reference point is {@code lastStateMessageReceivedAt} when it is set, otherwise
 * {@code firstRecordReceivedAt}. When neither is set (a state message arrived before any record
 * message) there is nothing to measure against and 0 is returned.
 *
 * <p>
 * NOTE(review): {@code lastStateMessageReceivedAt} is declared {@code final} and initialised to
 * {@code null} in the constructor, so in the code visible here the interval is always measured
 * from the first record - confirm whether that field was meant to be updated per state message.
 *
 * @param stateMessageReceivedAt time at which the source state message was received
 * @return elapsed whole seconds since the reference point, or 0 if there is none
 */
private Long calculateSecondsSinceLastStateEmitted(final DateTime stateMessageReceivedAt) {
  // Prefer the previous state message as the start of the interval; fall back to the first record.
  final DateTime intervalStart = lastStateMessageReceivedAt != null ? lastStateMessageReceivedAt : firstRecordReceivedAt;

  if (intervalStart == null) {
    // If we receive a State Message before a Record Message there is no previous timestamp to use
    // for a calculation
    return 0L;
  }

  final int elapsedSeconds = Seconds.secondsBetween(intervalStart, stateMessageReceivedAt).getSeconds();
  return Long.valueOf(elapsedSeconds);
}

/**
 * Updates a running mean with one additional data point without keeping the individual samples.
 *
 * <p>
 * The previous mean is weighted by the number of samples it already covers
 * ({@code totalCount - 1}), the new data point is added, and the sum is spread over
 * {@code totalCount}. The arithmetic is carried out in {@code double} and the result is truncated
 * by the final cast to {@code long}.
 *
 * @param currentMean mean of the samples seen before {@code newDataPoint}
 * @param totalCount number of samples including {@code newDataPoint}; not validated here
 * @param newDataPoint the sample being added
 * @return the new mean, truncated to a whole number
 */
@VisibleForTesting
protected Long calculateMean(final Long currentMean, final Long totalCount, final Long newDataPoint) {
  final long samplesBehindCurrentMean = totalCount - 1;

  // Share of the new mean contributed by the samples already averaged into currentMean.
  final double priorContribution = (double) (currentMean * samplesBehindCurrentMean) / totalCount;
  // Share of the new mean contributed by the sample being added.
  final double newContribution = (double) newDataPoint / totalCount;

  return (long) (priorContribution + newContribution);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public interface MessageTracker {

Long getTotalDestinationStateMessagesEmitted();

Long getMaxSecondsToReceiveSourceStateMessage();

Long getMeanSecondsToReceiveSourceStateMessage();

AirbyteTraceMessage getFirstDestinationErrorTraceMessage();

AirbyteTraceMessage getFirstSourceErrorTraceMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(1L);
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(5L);
when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(4L);

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
Expand All @@ -467,6 +469,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
.withBytesEmitted(100L)
.withSourceStateMessagesEmitted(3L)
.withDestinationStateMessagesEmitted(1L)
.withMaxSecondsBeforeSourceStateMessageEmitted(5L)
.withMeanSecondsBeforeSourceStateMessageEmitted(4L)
.withRecordsCommitted(12L)) // since success, should use emitted count
.withStreamStats(Collections.singletonList(
new StreamSyncStats()
Expand All @@ -476,7 +480,9 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
.withRecordsEmitted(12L)
.withRecordsCommitted(12L) // since success, should use emitted count
.withSourceStateMessagesEmitted(null)
.withDestinationStateMessagesEmitted(null)))))
.withDestinationStateMessagesEmitted(null)
.withMaxSecondsBeforeSourceStateMessageEmitted(null)
.withMeanSecondsBeforeSourceStateMessageEmitted(null)))))
.withOutputCatalog(syncInput.getCatalog())
.withState(new State().withState(expectedState));

Expand Down Expand Up @@ -548,6 +554,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
when(messageTracker.getStreamToCommittedRecords()).thenReturn(Optional.of(Collections.singletonMap(STREAM1, 6L)));
when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(10L);
when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(8L);

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
Expand All @@ -565,6 +573,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
.withBytesEmitted(100L)
.withSourceStateMessagesEmitted(3L)
.withDestinationStateMessagesEmitted(2L)
.withMaxSecondsBeforeSourceStateMessageEmitted(10L)
.withMeanSecondsBeforeSourceStateMessageEmitted(8L)
.withRecordsCommitted(6L);
final List<StreamSyncStats> expectedStreamStats = Collections.singletonList(
new StreamSyncStats()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,13 @@ void testErrorTraceMessageFailureWithNoTraceErrors() throws Exception {
assertEquals(messageTracker.errorTraceMessageFailure(Long.valueOf(123), 1), null);
}

// Verifies calculateMean's running-mean arithmetic for an exact result and for a fractional result
// that is truncated to a whole number.
@Test
void testCalculateMean() throws Exception {
// Mean for 3 state messages is 5, 4th state message is 9, new mean should be 6
assertEquals(6L, messageTracker.calculateMean(5L, 4L, 9L));

// Mean for 5 state messages is 10, 6th state message is 12, new mean is 10.33 rounded down to 10
assertEquals(10L, messageTracker.calculateMean(10L, 6L, 12L));
}

}

0 comments on commit 1b1448d

Please sign in to comment.