Skip to content

Commit

Permalink
Add configuration diff (#14603)
Browse files Browse the repository at this point in the history
* Add configuration diff

* Change stream descriptor type

* PR comments

* PR comments

* Format

* move to set

* increase waiting time

* Wait after success

* increase sleep
  • Loading branch information
benmoriceau authored Jul 14, 2022
1 parent 8bbe993 commit 2f0acbf
Show file tree
Hide file tree
Showing 3 changed files with 415 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.collect.Lists;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.api.model.generated.AirbyteCatalog;
import io.airbyte.api.model.generated.AirbyteStreamConfiguration;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.ConnectionCreate;
import io.airbyte.api.model.generated.ConnectionRead;
Expand All @@ -20,6 +21,7 @@
import io.airbyte.api.model.generated.DestinationSearch;
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.SourceSearch;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -50,10 +52,13 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -268,6 +273,57 @@ public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog
.toList());
}

/**
 * Computes the set of streams whose configuration differs between two catalogs.
 * <p>
 * Only streams present in both catalogs are considered: a stream that exists only in the new catalog
 * is skipped here, and a stream that exists only in the old catalog is never visited.
 *
 * @param oldCatalog the catalog to compare against
 * @param newCatalog the catalog whose streams are inspected
 * @return a mutable set holding the {@link StreamDescriptor} of every stream, found in both catalogs,
 *         for which {@code haveConfigChange} reports a difference; empty when nothing changed
 */
public Set<StreamDescriptor> getConfigurationDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
  final Map<StreamDescriptor, AirbyteStreamConfiguration> previousConfigs = catalogToPerStreamConfiguration(oldCatalog);
  final Map<StreamDescriptor, AirbyteStreamConfiguration> currentConfigs = catalogToPerStreamConfiguration(newCatalog);

  final Set<StreamDescriptor> changedStreams = new HashSet<>();

  for (final Map.Entry<StreamDescriptor, AirbyteStreamConfiguration> current : currentConfigs.entrySet()) {
    final StreamDescriptor descriptor = current.getKey();
    final AirbyteStreamConfiguration previousConfig = previousConfigs.get(descriptor);

    // No previous configuration means the stream is new: its configuration has not "changed", and
    // it is expected to show up in the schema change list instead.
    if (previousConfig != null && haveConfigChange(previousConfig, current.getValue())) {
      changedStreams.add(descriptor);
    }
  }

  return changedStreams;
}

/**
 * Tells whether two configurations of the same stream differ on any of the compared settings.
 * <p>
 * The compared settings are the cursor field, the sync mode, the destination sync mode and the
 * primary key. The cursor field is compared as an ordered list. The primary key is compared as a set
 * of field paths, so reordering or duplicating its entries does not count as a change.
 * <p>
 * Every comparison is null-safe, so a configuration with an unset value no longer triggers a
 * {@link NullPointerException}.
 *
 * @param oldConfig the previous configuration of the stream
 * @param newConfig the new configuration of the stream
 * @return {@code true} if at least one of the compared settings differs, {@code false} otherwise
 */
private boolean haveConfigChange(final AirbyteStreamConfiguration oldConfig, final AirbyteStreamConfiguration newConfig) {
  // NOTE(review): an unset (null) list is treated as equivalent to an empty one, so going from
  // "absent" to "[]" is not reported as a change. Confirm this matches the API's semantics.
  final List<String> oldCursors = Objects.requireNonNullElse(oldConfig.getCursorField(), List.of());
  final List<String> newCursors = Objects.requireNonNullElse(newConfig.getCursorField(), List.of());
  final boolean hasCursorChanged = !oldCursors.equals(newCursors);

  final boolean hasSyncModeChanged = !Objects.equals(oldConfig.getSyncMode(), newConfig.getSyncMode());

  final boolean hasDestinationSyncModeChanged = !Objects.equals(oldConfig.getDestinationSyncMode(), newConfig.getDestinationSyncMode());

  // Converting to sets makes the primary key comparison insensitive to the order of the key paths.
  final List<List<String>> oldPrimaryKey = Objects.requireNonNullElse(oldConfig.getPrimaryKey(), List.of());
  final List<List<String>> newPrimaryKey = Objects.requireNonNullElse(newConfig.getPrimaryKey(), List.of());
  final Set<List<String>> convertedOldPrimaryKey = new HashSet<>(oldPrimaryKey);
  final Set<List<String>> convertedNewPrimaryKey = new HashSet<>(newPrimaryKey);
  final boolean hasPrimaryKeyChanged = !convertedOldPrimaryKey.equals(convertedNewPrimaryKey);

  return hasCursorChanged || hasSyncModeChanged || hasDestinationSyncModeChanged || hasPrimaryKeyChanged;
}

/**
 * Indexes the configuration of every stream of a catalog by the stream's descriptor.
 * <p>
 * The descriptor is built from the stream's name and namespace. Because the map is assembled with
 * {@link Collectors#toMap}, two streams resolving to equal descriptors make this method throw an
 * {@link IllegalStateException}.
 *
 * @param catalog the catalog whose streams are indexed
 * @return a map from each stream's descriptor to that stream's configuration
 */
private Map<StreamDescriptor, AirbyteStreamConfiguration> catalogToPerStreamConfiguration(final AirbyteCatalog catalog) {
  return catalog.getStreams()
      .stream()
      .collect(Collectors.toMap(
          streamAndConfig -> {
            final var airbyteStream = streamAndConfig.getStream();
            return new StreamDescriptor()
                .name(airbyteStream.getName())
                .namespace(airbyteStream.getNamespace());
          },
          streamAndConfig -> streamAndConfig.getConfig()));
}

public Optional<AirbyteCatalog> getConnectionAirbyteCatalog(final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync connection = configRepository.getStandardSync(connectionId);
Expand Down
Loading

0 comments on commit 2f0acbf

Please sign in to comment.