Skip to content

Commit

Permalink
🎉 New API endpoint to search connections (#4813)
Browse files Browse the repository at this point in the history
* New search endpoint for web backend

* Unit tests

* Remove recreate endpoints

* Format
  • Loading branch information
mmolimar authored Sep 23, 2021
1 parent 9a6cf1e commit 98b1900
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 0 deletions.
82 changes: 82 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,29 @@ paths:
$ref: "#/components/schemas/WebBackendConnectionRead"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/connections/search:
post:
tags:
- web_backend
summary: Search connections
operationId: webBackendConnectionSearch
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/WebBackendConnectionSearch"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/WebBackendConnectionReadList"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/sources/create:
post:
tags:
Expand Down Expand Up @@ -2046,6 +2069,21 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
jobInfo:
$ref: "#/components/schemas/SynchronousJobRead"
SourceSearch:
type: object
properties:
sourceDefinitionId:
$ref: "#/components/schemas/SourceDefinitionId"
sourceId:
$ref: "#/components/schemas/SourceId"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
connectionConfiguration:
$ref: "#/components/schemas/SourceConfiguration"
name:
type: string
sourceName:
type: string
# DESTINATION DEFINITION
DestinationDefinitionId:
type: string
Expand Down Expand Up @@ -2232,6 +2270,21 @@ components:
type: array
items:
$ref: "#/components/schemas/DestinationRead"
DestinationSearch:
type: object
properties:
destinationDefinitionId:
$ref: "#/components/schemas/DestinationDefinitionId"
destinationId:
$ref: "#/components/schemas/DestinationId"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
connectionConfiguration:
$ref: "#/components/schemas/DestinationConfiguration"
name:
type: string
destinationName:
type: string
# CONNECTION
ConnectionId:
type: string
Expand Down Expand Up @@ -2444,6 +2497,35 @@ components:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
$ref: "#/components/schemas/ResourceRequirements"
WebBackendConnectionSearch:
type: object
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
name:
type: string
namespaceDefinition:
$ref: "#/components/schemas/NamespaceDefinitionType"
namespaceFormat:
type: string
description: Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'.
default: null
example: "${SOURCE_NAMESPACE}"
prefix:
type: string
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
sourceId:
$ref: "#/components/schemas/SourceId"
destinationId:
$ref: "#/components/schemas/DestinationId"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
status:
$ref: "#/components/schemas/ConnectionStatus"
source:
$ref: "#/components/schemas/SourceSearch"
destination:
$ref: "#/components/schemas/DestinationSearch"
ConnectionReadList:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import io.airbyte.api.model.WebBackendConnectionRead;
import io.airbyte.api.model.WebBackendConnectionReadList;
import io.airbyte.api.model.WebBackendConnectionRequestBody;
import io.airbyte.api.model.WebBackendConnectionSearch;
import io.airbyte.api.model.WebBackendConnectionUpdate;
import io.airbyte.api.model.WorkspaceCreate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
Expand Down Expand Up @@ -585,6 +586,11 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final
return execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody));
}

@Override
public WebBackendConnectionReadList webBackendConnectionSearch(final WebBackendConnectionSearch webBackendConnectionSearch) {
return execute(() -> webBackendConnectionsHandler.webBackendSearchConnections(webBackendConnectionSearch));
}

@Override
public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnectionRequestBody webBackendConnectionRequestBody) {
return execute(() -> webBackendConnectionsHandler.webBackendGetConnection(webBackendConnectionRequestBody));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,19 @@ public ConnectionReadList listConnectionsForWorkspace(WorkspaceIdRequestBody wor
return new ConnectionReadList().connections(connectionReads);
}

public ConnectionReadList listConnections() throws JsonValidationException, ConfigNotFoundException, IOException {
final List<ConnectionRead> connectionReads = Lists.newArrayList();

for (StandardSync standardSync : configRepository.listStandardSyncs()) {
if (standardSync.getStatus() == StandardSync.Status.DEPRECATED) {
continue;
}
connectionReads.add(buildConnectionRead(standardSync.getConnectionId()));
}

return new ConnectionReadList().connections(connectionReads);
}

public ConnectionRead getConnection(ConnectionIdRequestBody connectionIdRequestBody)
throws JsonValidationException, IOException, ConfigNotFoundException {
return buildConnectionRead(connectionIdRequestBody.getConnectionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import static java.util.stream.Collectors.toMap;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand All @@ -39,6 +41,7 @@
import io.airbyte.api.model.ConnectionUpdate;
import io.airbyte.api.model.DestinationIdRequestBody;
import io.airbyte.api.model.DestinationRead;
import io.airbyte.api.model.DestinationSearch;
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobListRequestBody;
import io.airbyte.api.model.JobRead;
Expand All @@ -51,10 +54,12 @@
import io.airbyte.api.model.SourceDiscoverSchemaRead;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceRead;
import io.airbyte.api.model.SourceSearch;
import io.airbyte.api.model.WebBackendConnectionCreate;
import io.airbyte.api.model.WebBackendConnectionRead;
import io.airbyte.api.model.WebBackendConnectionReadList;
import io.airbyte.api.model.WebBackendConnectionRequestBody;
import io.airbyte.api.model.WebBackendConnectionSearch;
import io.airbyte.api.model.WebBackendConnectionUpdate;
import io.airbyte.api.model.WebBackendOperationCreateOrUpdate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
Expand All @@ -70,6 +75,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import org.apache.logging.log4j.util.Strings;

public class WebBackendConnectionsHandler {

Expand Down Expand Up @@ -172,6 +178,116 @@ private void setLatestSyncJobProperties(WebBackendConnectionRead WebBackendConne
});
}

public WebBackendConnectionReadList webBackendSearchConnections(WebBackendConnectionSearch webBackendConnectionSearch)
throws ConfigNotFoundException, IOException, JsonValidationException {

final List<WebBackendConnectionRead> reads = Lists.newArrayList();
for (ConnectionRead connectionRead : connectionsHandler.listConnections().getConnections()) {
if (matchSearch(webBackendConnectionSearch, connectionRead)) {
reads.add(buildWebBackendConnectionRead(connectionRead));
}
}

return new WebBackendConnectionReadList().connections(reads);
}

private boolean matchSearch(WebBackendConnectionSearch connectionSearch, ConnectionRead connectionRead)
throws JsonValidationException, ConfigNotFoundException, IOException {

final ConnectionRead connectionReadFromSearch = fromConnectionSearch(connectionSearch, connectionRead);
final SourceRead sourceRead = sourceHandler.getSource(new SourceIdRequestBody().sourceId(connectionRead.getSourceId()));
final SourceRead sourceReadFromSearch = fromSourceSearch(connectionSearch.getSource(), sourceRead);
final DestinationRead destinationRead =
destinationHandler.getDestination(new DestinationIdRequestBody().destinationId(connectionRead.getDestinationId()));
final DestinationRead destinationReadFromSearch = fromDestinationSearch(connectionSearch.getDestination(), destinationRead);

return (connectionReadFromSearch == null || connectionReadFromSearch.equals(connectionRead)) &&
(sourceReadFromSearch == null || sourceReadFromSearch.equals(sourceRead)) &&
(destinationReadFromSearch == null || destinationReadFromSearch.equals(destinationRead));
}

private ConnectionRead fromConnectionSearch(WebBackendConnectionSearch connectionSearch, ConnectionRead connectionRead) {
if (connectionSearch == null)
return connectionRead;

final ConnectionRead fromSearch = new ConnectionRead();
fromSearch.connectionId(connectionSearch.getConnectionId() == null ? connectionRead.getConnectionId() : connectionSearch.getConnectionId());
fromSearch.destinationId(connectionSearch.getDestinationId() == null ? connectionRead.getDestinationId() : connectionSearch.getDestinationId());
fromSearch.name(Strings.isBlank(connectionSearch.getName()) ? connectionRead.getName() : connectionSearch.getName());
fromSearch.namespaceFormat(Strings.isBlank(connectionSearch.getNamespaceFormat()) || connectionSearch.getNamespaceFormat().equals("null")
? connectionRead.getNamespaceFormat()
: connectionSearch.getNamespaceFormat());
fromSearch.namespaceDefinition(
connectionSearch.getNamespaceDefinition() == null ? connectionRead.getNamespaceDefinition() : connectionSearch.getNamespaceDefinition());
fromSearch.prefix(Strings.isBlank(connectionSearch.getPrefix()) ? connectionRead.getPrefix() : connectionSearch.getPrefix());
fromSearch.schedule(connectionSearch.getSchedule() == null ? connectionRead.getSchedule() : connectionSearch.getSchedule());
fromSearch.sourceId(connectionSearch.getSourceId() == null ? connectionRead.getSourceId() : connectionSearch.getSourceId());
fromSearch.status(connectionSearch.getStatus() == null ? connectionRead.getStatus() : connectionSearch.getStatus());

// these properties are not enabled in the search
fromSearch.resourceRequirements(connectionRead.getResourceRequirements());
fromSearch.syncCatalog(connectionRead.getSyncCatalog());
fromSearch.operationIds(connectionRead.getOperationIds());

return fromSearch;
}

private SourceRead fromSourceSearch(SourceSearch sourceSearch, SourceRead sourceRead) {
if (sourceSearch == null)
return sourceRead;

final SourceRead fromSearch = new SourceRead();
fromSearch.name(Strings.isBlank(sourceSearch.getName()) ? sourceRead.getName() : sourceSearch.getName());
fromSearch
.sourceDefinitionId(sourceSearch.getSourceDefinitionId() == null ? sourceRead.getSourceDefinitionId() : sourceSearch.getSourceDefinitionId());
fromSearch.sourceId(sourceSearch.getSourceId() == null ? sourceRead.getSourceId() : sourceSearch.getSourceId());
fromSearch.sourceName(Strings.isBlank(sourceSearch.getSourceName()) ? sourceRead.getSourceName() : sourceSearch.getSourceName());
fromSearch.workspaceId(sourceSearch.getWorkspaceId() == null ? sourceRead.getWorkspaceId() : sourceSearch.getWorkspaceId());
if (sourceSearch.getConnectionConfiguration() == null) {
fromSearch.connectionConfiguration(sourceRead.getConnectionConfiguration());
} else {
JsonNode connectionConfiguration = sourceSearch.getConnectionConfiguration();
sourceRead.getConnectionConfiguration().fieldNames()
.forEachRemaining(field -> {
if (!connectionConfiguration.has(field) && connectionConfiguration instanceof ObjectNode) {
((ObjectNode) connectionConfiguration).set(field, sourceRead.getConnectionConfiguration().get(field));
}
});
fromSearch.connectionConfiguration(connectionConfiguration);
}

return fromSearch;
}

private DestinationRead fromDestinationSearch(DestinationSearch destinationSearch, DestinationRead destinationRead) {
if (destinationSearch == null)
return destinationRead;

final DestinationRead fromSearch = new DestinationRead();
fromSearch.name(Strings.isBlank(destinationSearch.getName()) ? destinationRead.getName() : destinationSearch.getName());
fromSearch.destinationDefinitionId(destinationSearch.getDestinationDefinitionId() == null ? destinationRead.getDestinationDefinitionId()
: destinationSearch.getDestinationDefinitionId());
fromSearch
.destinationId(destinationSearch.getDestinationId() == null ? destinationRead.getDestinationId() : destinationSearch.getDestinationId());
fromSearch.destinationName(
Strings.isBlank(destinationSearch.getDestinationName()) ? destinationRead.getDestinationName() : destinationSearch.getDestinationName());
fromSearch.workspaceId(destinationSearch.getWorkspaceId() == null ? destinationRead.getWorkspaceId() : destinationSearch.getWorkspaceId());
if (destinationSearch.getConnectionConfiguration() == null) {
fromSearch.connectionConfiguration(destinationRead.getConnectionConfiguration());
} else {
JsonNode connectionConfiguration = destinationSearch.getConnectionConfiguration();
destinationRead.getConnectionConfiguration().fieldNames()
.forEachRemaining(field -> {
if (!connectionConfiguration.has(field) && connectionConfiguration instanceof ObjectNode) {
((ObjectNode) connectionConfiguration).set(field, destinationRead.getConnectionConfiguration().get(field));
}
});
fromSearch.connectionConfiguration(connectionConfiguration);
}

return fromSearch;
}

public WebBackendConnectionRead webBackendGetConnection(WebBackendConnectionRequestBody webBackendConnectionRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,22 @@ void testListConnectionsForWorkspace() throws JsonValidationException, ConfigNot
actualConnectionReadList.getConnections().get(0));
}

@Test
void testListConnections() throws JsonValidationException, ConfigNotFoundException, IOException {
when(configRepository.listStandardSyncs())
.thenReturn(Lists.newArrayList(standardSync));
when(configRepository.getSourceConnection(source.getSourceId()))
.thenReturn(source);
when(configRepository.getStandardSync(standardSync.getConnectionId()))
.thenReturn(standardSync);

final ConnectionReadList actualConnectionReadList = connectionsHandler.listConnections();

assertEquals(
ConnectionHelpers.generateExpectedConnectionRead(standardSync),
actualConnectionReadList.getConnections().get(0));
}

@Test
void testDeleteConnection() throws JsonValidationException, IOException, ConfigNotFoundException {
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(standardSync.getConnectionId());
Expand Down
Loading

0 comments on commit 98b1900

Please sign in to comment.