Skip to content

Commit

Permalink
Fix test issues
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 13, 2022
1 parent 9058389 commit 0415c0f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.math.BigDecimal;
import java.sql.SQLException;
Expand Down Expand Up @@ -976,9 +977,9 @@ protected boolean supportsPerStream() {
protected JsonNode createEmptyState(final String streamName, final String streamNamespace) {
if (supportsPerStream()) {
final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage()
.withStateType(AirbyteStateType.PER_STREAM)
.withStreams(List.of(new AirbyteStreamState().withName(streamName).withNamespace(streamNamespace)));
return Jsons.jsonNode(airbyteStateMessage);
.withStateType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName(streamName).withNamespace(streamNamespace)));
return Jsons.jsonNode(List.of(airbyteStateMessage));
} else {
final DbState dbState = new DbState()
.withStreams(List.of(new DbStreamState().withStreamName(streamName).withStreamNamespace(streamNamespace)));
Expand All @@ -994,13 +995,13 @@ protected JsonNode createEmptyState(final String streamName, final String stream
*/
protected JsonNode createState(final List<DbStreamState> streams) {
if (supportsPerStream()) {
final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage()
.withStateType(AirbyteStateType.PER_STREAM)
.withStreams(streams.stream()
.map(s -> new AirbyteStreamState().withName(s.getStreamName()).withNamespace(s.getStreamNamespace()).withState(Jsons.jsonNode(s)))
.collect(Collectors.toList()));

return Jsons.jsonNode(airbyteStateMessage);
final List<AirbyteStateMessage> messages = streams.stream()
.map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName(s.getStreamName()).withNamespace(s.getStreamNamespace()))
.withStreamState(Jsons.jsonNode(s))))
.collect(Collectors.toList());
return Jsons.jsonNode(messages);
} else {
final DbState dbState = new DbState()
.withStreams(streams.stream().collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ public AirbyteMessage saveState(final Map<String, String> offset, final String d
LOGGER.info("debezium state: {}", asJson);
final CdcState cdcState = new CdcState().withState(asJson);
stateManager.getCdcStateManager().setCdcState(cdcState);
final AirbyteStateMessage stateMessage = stateManager.emit();
/*
* Namespace pair is ignored by global state manager, but is needed for satisfy the API contract.
* Therefore, it doesn't matter what we pass here, as it will be ignored.
*/
final AirbyteStateMessage stateMessage = stateManager.emit(null);
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
}

Expand Down

0 comments on commit 0415c0f

Please sign in to comment.