Skip to content

Commit

Permalink
prevent the cursor or primary key fields from being deselected (#20481)
Browse files Browse the repository at this point in the history
* prevent the cursor or primary key fields from being deselected

* fix format
  • Loading branch information
mfsiega-airbyte authored Dec 14, 2022
1 parent 1fab191 commit b89f3e7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -66,7 +67,18 @@ private static io.airbyte.protocol.models.AirbyteStream toProtocol(final Airbyte
}
}
// Only include the selected fields.
List<String> selectedFieldNames = config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toList());
final Set<String> selectedFieldNames =
config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toSet());
// TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields because we
// don't support filtering nested fields yet.
if (!selectedFieldNames.contains(config.getCursorField().get(0))) {
throw new JsonValidationException("Cursor field cannot be de-selected");
}
for (final List<String> primaryKeyComponent : config.getPrimaryKey()) {
if (!selectedFieldNames.contains(primaryKeyComponent.get(0))) {
throw new JsonValidationException("Primary key field cannot be de-selected");
}
}
for (final String selectedFieldName : selectedFieldNames) {
if (!properties.has(selectedFieldName)) {
throw new JsonValidationException(String.format("Requested selected field %s not found in JSON schema", selectedFieldName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.server.handlers;

import static io.airbyte.server.helpers.ConnectionHelpers.FIELD_NAME;
import static io.airbyte.server.helpers.ConnectionHelpers.SECOND_FIELD_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -353,6 +354,47 @@ void testCreateConnectionWithSelectedFields() throws IOException, JsonValidation
verify(configRepository).writeStandardSync(standardSync);
}

@Test
void testFieldSelectionRemoveCursorFails() throws JsonValidationException, ConfigNotFoundException, IOException {
// Test that if we try to de-select a field that's being used for the cursor, the request will fail.
// The connection initially has a catalog with one stream, and two fields in that stream.
standardSync.setCatalog(ConnectionHelpers.generateAirbyteCatalogWithTwoFields());

// Send an update that sets a cursor but de-selects that field.
final AirbyteCatalog catalogForUpdate = ConnectionHelpers.generateApiCatalogWithTwoFields();
catalogForUpdate.getStreams().get(0).getConfig()
.fieldSelectionEnabled(true)
.selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)))
.cursorField(List.of(SECOND_FIELD_NAME));

final ConnectionUpdate connectionUpdate = new ConnectionUpdate()
.connectionId(standardSync.getConnectionId())
.syncCatalog(catalogForUpdate);

assertThrows(JsonValidationException.class, () -> connectionsHandler.updateConnection(connectionUpdate));
}

@Test
void testFieldSelectionRemovePrimaryKeyFails() throws JsonValidationException, ConfigNotFoundException, IOException {
// Test that if we try to de-select a field that's being used for the primary key, the request will
// fail.
// The connection initially has a catalog with one stream, and two fields in that stream.
standardSync.setCatalog(ConnectionHelpers.generateAirbyteCatalogWithTwoFields());

// Send an update that sets a primary key but deselects that field.
final AirbyteCatalog catalogForUpdate = ConnectionHelpers.generateApiCatalogWithTwoFields();
catalogForUpdate.getStreams().get(0).getConfig()
.fieldSelectionEnabled(true)
.selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)))
.primaryKey(List.of(List.of(SECOND_FIELD_NAME)));

final ConnectionUpdate connectionUpdate = new ConnectionUpdate()
.connectionId(standardSync.getConnectionId())
.syncCatalog(catalogForUpdate);

assertThrows(JsonValidationException.class, () -> connectionsHandler.updateConnection(connectionUpdate));
}

@Test
void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class ConnectionHelpers {
private static final String STREAM_NAME_BASE = "users-data";
private static final String STREAM_NAME = STREAM_NAME_BASE + "0";
public static final String FIELD_NAME = "id";

public static final String SECOND_FIELD_NAME = "id2";
private static final String BASIC_SCHEDULE_TIME_UNIT = "days";
private static final long BASIC_SCHEDULE_UNITS = 1L;
private static final String BASIC_SCHEDULE_DATA_TIME_UNITS = "days";
Expand Down Expand Up @@ -284,7 +286,7 @@ public static JsonNode generateBasicJsonSchema() {
public static JsonNode generateJsonSchemaWithTwoFields() {
return CatalogHelpers.fieldsToJsonSchema(
Field.of(FIELD_NAME, JsonSchemaType.STRING),
Field.of(FIELD_NAME + "2", JsonSchemaType.STRING));
Field.of(SECOND_FIELD_NAME, JsonSchemaType.STRING));
}

public static ConfiguredAirbyteCatalog generateBasicConfiguredAirbyteCatalog() {
Expand Down

0 comments on commit b89f3e7

Please sign in to comment.