Skip to content

Commit

Permalink
Postgres Source: decrease number of state messages (#20124)
Browse files Browse the repository at this point in the history
* Postgres Source: decrease number of state messages

* add comment
  • Loading branch information
VitaliiMaltsev authored Dec 14, 2022
1 parent 6fb1971 commit 10b67fb
Showing 1 changed file with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected AirbyteMessage computeNext() {
if (stateEmissionFrequency > 0 && !Objects.equals(currentMaxCursor, initialCursor) && messageIterator.hasNext()) {
// Only emit an intermediate state when it is not the first or last record message,
// because the last state message will be taken care of in a different branch.
intermediateStateMessage = createStateMessage(false);
intermediateStateMessage = createStateMessage(false, totalRecordCount);
}
currentMaxCursor = cursorCandidate;
currentMaxCursorRecordCount = 1L;
Expand All @@ -154,7 +154,7 @@ protected AirbyteMessage computeNext() {
return optionalIntermediateMessage.orElse(endOfData());
}
} else if (!hasEmittedFinalState) {
return createStateMessage(true);
return createStateMessage(true, totalRecordCount);
} else {
return endOfData();
}
Expand Down Expand Up @@ -186,20 +186,23 @@ protected final Optional<AirbyteMessage> getIntermediateMessage() {
* read up so far
*
* @param isFinalState marker for if the final state of the iterator has been reached
* @param totalRecordCount count of read messages
* @return AirbyteMessage which includes information on state of records read so far
*/
public AirbyteMessage createStateMessage(final boolean isFinalState) {
public AirbyteMessage createStateMessage(final boolean isFinalState, int totalRecordCount) {
final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, currentMaxCursor, currentMaxCursorRecordCount);
final Optional<CursorInfo> cursorInfo = stateManager.getCursorInfo(pair);
LOGGER.info("State report for stream {} - original: {} = {} (count {}) -> latest: {} = {} (count {})",
pair,
cursorInfo.map(CursorInfo::getOriginalCursorField).orElse(null),
cursorInfo.map(CursorInfo::getOriginalCursor).orElse(null),
cursorInfo.map(CursorInfo::getOriginalCursorRecordCount).orElse(null),
cursorInfo.map(CursorInfo::getCursorField).orElse(null),
cursorInfo.map(CursorInfo::getCursor).orElse(null),
cursorInfo.map(CursorInfo::getCursorRecordCount).orElse(null));

// logging once every 100 messages to reduce log verbosity
if (totalRecordCount % 100 == 0) {
LOGGER.info("State report for stream {} - original: {} = {} (count {}) -> latest: {} = {} (count {})",
pair,
cursorInfo.map(CursorInfo::getOriginalCursorField).orElse(null),
cursorInfo.map(CursorInfo::getOriginalCursor).orElse(null),
cursorInfo.map(CursorInfo::getOriginalCursorRecordCount).orElse(null),
cursorInfo.map(CursorInfo::getCursorField).orElse(null),
cursorInfo.map(CursorInfo::getCursor).orElse(null),
cursorInfo.map(CursorInfo::getCursorRecordCount).orElse(null));
}
if (isFinalState) {
hasEmittedFinalState = true;
if (stateManager.getCursor(pair).isEmpty()) {
Expand Down

0 comments on commit 10b67fb

Please sign in to comment.