From ac09c8016c8b580e54e01ce0a55a737c8bb60dc3 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Wed, 2 Aug 2023 17:11:18 -0400 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Source=20MongoDB=20Internal=20POC:?= =?UTF-8?q?=20Code=20Cleanup=20(#29006)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Cleanup from previous PR's * Formatting * Remove unused dependency * Change display name --- .../source-mongodb-internal-poc/build.gradle | 2 - .../source-mongodb-internal-poc/metadata.yaml | 2 +- .../internal/MongoConnectionUtils.java | 18 +- .../mongodb/internal/MongoDbSource.java | 118 +----------- .../source/mongodb/internal/MongoUtil.java | 134 +++++++++++++ .../src/main/resources/spec.json | 2 +- .../mongodb/internal/MongoDbSourceTest.java | 182 ++++++++++++++++++ .../authorized_collections_response.json | 24 +++ .../no_authorized_collections_response.json | 7 + .../resources/schema_discovery_response.json | 11 ++ 10 files changed, 383 insertions(+), 117 deletions(-) create mode 100644 airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java create mode 100644 airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java create mode 100644 airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/authorized_collections_response.json create mode 100644 airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/no_authorized_collections_response.json create mode 100644 airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/schema_discovery_response.json diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/build.gradle b/airbyte-integrations/connectors/source-mongodb-internal-poc/build.gradle index 0df22cab0c78..7e4ea7008ec9 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/build.gradle @@ -20,8 +20,6 @@ dependencies { implementation 'org.mongodb:mongodb-driver-sync:4.10.2' - testImplementation libs.connectors.testcontainers.mongodb - integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mongodb-internal-poc') integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml index ed8a5f809df7..94a7715175bc 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml @@ -7,7 +7,7 @@ data: githubIssueLabel: source-mongodb-internal-poc icon: mongodb.svg license: ELv2 - name: MongoDb + name: MongoDb POC registries: cloud: enabled: true diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoConnectionUtils.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoConnectionUtils.java index 107773d9315a..b5f387e3a712 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoConnectionUtils.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoConnectionUtils.java @@ -32,11 +32,8 @@ public class MongoConnectionUtils { */ public static MongoClient createMongoClient(final JsonNode config) { final String authSource = config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(); - final String connectionString = config.get(CONNECTION_STRING_CONFIGURATION_KEY).asText(); - final String replicaSet = config.get(REPLICA_SET_CONFIGURATION_KEY).asText(); - final ConnectionString mongoConnectionString = new ConnectionString(connectionString + "?replicaSet=" + - replicaSet + "&retryWrites=false&provider=airbyte&tls=true"); + final ConnectionString mongoConnectionString = new ConnectionString(buildConnectionString(config)); final MongoDriverInformation mongoDriverInformation = MongoDriverInformation.builder() .driverName("Airbyte") @@ -55,4 +52,17 @@ public static MongoClient createMongoClient(final JsonNode config) { return MongoClients.create(mongoClientSettingsBuilder.build(), mongoDriverInformation); } + private static String buildConnectionString(final JsonNode config) { + final String connectionString = config.get(CONNECTION_STRING_CONFIGURATION_KEY).asText(); + final String replicaSet = config.get(REPLICA_SET_CONFIGURATION_KEY).asText(); + final StringBuilder builder = new StringBuilder(); + builder.append(connectionString); + builder.append("?replicaSet="); + builder.append(replicaSet); + builder.append("&retryWrites=false"); + builder.append("&provider=airbyte"); + builder.append("&tls=true"); + return builder.toString(); + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java index 2143620538ae..317a7917228b 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java @@ -7,35 +7,18 @@ import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.DATABASE_CONFIGURATION_KEY; import com.fasterxml.jackson.databind.JsonNode; -import com.mongodb.MongoCommandException; -import com.mongodb.MongoException; -import com.mongodb.MongoSecurityException; -import com.mongodb.client.AggregateIterable; import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoCursor; import com.mongodb.connection.ClusterType; -import io.airbyte.commons.exceptions.ConnectionErrorException; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,12 +26,6 @@ public class MongoDbSource extends BaseConnector implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class); - /** - * Set of collection prefixes that should be ignored when performing operations, such as discover to - * avoid access issues. - */ - private static final Set IGNORED_COLLECTIONS = Set.of("system.", "replset.", "oplog."); - public static void main(final String[] args) throws Exception { final Source source = new MongoDbSource(); LOGGER.info("starting source: {}", MongoDbSource.class); @@ -58,7 +35,7 @@ public static void main(final String[] args) throws Exception { @Override public AirbyteConnectionStatus check(final JsonNode config) { - try (final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config)) { + try (final MongoClient mongoClient = createMongoClient(config)) { final String databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText(); /* @@ -66,7 +43,7 @@ public AirbyteConnectionStatus check(final JsonNode config) { * needs to actually execute a command in order to fetch the cluster description. Querying for the * authorized collections guarantees that the cluster description will be available to the driver. */ - if (getAuthorizedCollections(mongoClient, databaseName).isEmpty()) { + if (MongoUtil.getAuthorizedCollections(mongoClient, databaseName).isEmpty()) { return new AirbyteConnectionStatus() .withMessage("Target MongoDB database does not contain any authorized collections.") .withStatus(AirbyteConnectionStatus.Status.FAILED); @@ -89,8 +66,11 @@ public AirbyteConnectionStatus check(final JsonNode config) { @Override public AirbyteCatalog discover(final JsonNode config) { - final List streams = discoverInternal(config); - return new AirbyteCatalog().withStreams(streams); + try (final MongoClient mongoClient = createMongoClient(config)) { + final String databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText(); + final List streams = MongoUtil.getAirbyteStreams(mongoClient, databaseName); + return new AirbyteCatalog().withStreams(streams); + } } @Override @@ -101,88 +81,8 @@ public AutoCloseableIterator read(final JsonNode config, return null; } - private Set getAuthorizedCollections(final MongoClient mongoClient, final String databaseName) { - /* - * db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command - * returns only those collections for which the user has privileges. For example, if a user has find - * action on specific collections, the command returns only those collections; or, if a user has - * find or any other action, on the database resource, the command lists all collections in the - * database. - */ - try { - final Document document = mongoClient.getDatabase(databaseName).runCommand(new Document("listCollections", 1) - .append("authorizedCollections", true) - .append("nameOnly", true)) - .append("filter", "{ 'type': 'collection' }"); - return document.toBsonDocument() - .get("cursor").asDocument() - .getArray("firstBatch") - .stream() - .map(bsonValue -> bsonValue.asDocument().getString("name").getValue()) - .filter(this::isSupportedCollection) - .collect(Collectors.toSet()); - } catch (final MongoSecurityException e) { - final MongoCommandException exception = (MongoCommandException) e.getCause(); - throw new ConnectionErrorException(String.valueOf(exception.getCode()), e); - } catch (final MongoException e) { - throw new ConnectionErrorException(String.valueOf(e.getCode()), e); - } - } - - private List discoverInternal(final JsonNode config) { - final List streams = new ArrayList<>(); - try (final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config)) { - final String databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText(); - final Set authorizedCollections = getAuthorizedCollections(mongoClient, databaseName); - authorizedCollections.parallelStream().forEach(collectionName -> { - final List fields = getFields(mongoClient.getDatabase(databaseName).getCollection(collectionName)); - streams.add(CatalogHelpers.createAirbyteStream(collectionName, "", fields)); - }); - return streams; - } - } - - private List getFields(final MongoCollection collection) { - final Map fieldsMap = Map.of("input", Map.of("$objectToArray", "$$ROOT"), - "as", "each", - "in", Map.of("k", "$$each.k", "v", Map.of("$type", "$$each.v"))); - - final Document mapFunction = new Document("$map", fieldsMap); - final Document arrayToObjectAggregation = new Document("$arrayToObject", mapFunction); - final Document projection = new Document("$project", new Document("fields", arrayToObjectAggregation)); - - final Map groupMap = new HashMap<>(); - groupMap.put("_id", null); - groupMap.put("fields", Map.of("$addToSet", "$fields")); - - final AggregateIterable output = collection.aggregate(Arrays.asList( - projection, - new Document("$unwind", "$fields"), - new Document("$group", groupMap))); - - final MongoCursor cursor = output.cursor(); - if (cursor.hasNext()) { - final Map fields = ((List>) output.cursor().next().get("fields")).get(0); - return fields.entrySet().stream() - .map(e -> new Field(e.getKey(), convertToSchemaType(e.getValue()))) - .collect(Collectors.toList()); - } else { - return List.of(); - } - } - - private JsonSchemaType convertToSchemaType(final String type) { - return switch (type) { - case "boolean" -> JsonSchemaType.BOOLEAN; - case "int", "long", "double", "decimal" -> JsonSchemaType.NUMBER; - case "array" -> JsonSchemaType.ARRAY; - case "object", "javascriptWithScope" -> JsonSchemaType.OBJECT; - default -> JsonSchemaType.STRING; - }; - } - - private boolean isSupportedCollection(final String collectionName) { - return !IGNORED_COLLECTIONS.stream().anyMatch(s -> collectionName.startsWith(s)); + protected MongoClient createMongoClient(final JsonNode config) { + return MongoConnectionUtils.createMongoClient(config); } } diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java new file mode 100644 index 000000000000..ebd1beef8127 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mongodb.internal; + +import com.mongodb.MongoCommandException; +import com.mongodb.MongoException; +import com.mongodb.MongoSecurityException; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import io.airbyte.commons.exceptions.ConnectionErrorException; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.CatalogHelpers; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.bson.Document; + +public class MongoUtil { + + /** + * Set of collection prefixes that should be ignored when performing operations, such as discover to + * avoid access issues. + */ + private static final Set IGNORED_COLLECTIONS = Set.of("system.", "replset.", "oplog."); + + /** + * Returns the set of collections that the current credentials are authorized to access. + * + * @param mongoClient The {@link MongoClient} used to query the MongoDB server for authorized + * collections. + * @param databaseName The name of the database to query for authorized collections. + * @return The set of authorized collection names (may be empty). + * @throws ConnectionErrorException if unable to perform the authorized collection query. + */ + public static Set getAuthorizedCollections(final MongoClient mongoClient, final String databaseName) { + /* + * db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command + * returns only those collections for which the user has privileges. For example, if a user has find + * action on specific collections, the command returns only those collections; or, if a user has + * find or any other action, on the database resource, the command lists all collections in the + * database. + */ + try { + final Document document = mongoClient.getDatabase(databaseName).runCommand(new Document("listCollections", 1) + .append("authorizedCollections", true) + .append("nameOnly", true)) + .append("filter", "{ 'type': 'collection' }"); + return document.toBsonDocument() + .get("cursor").asDocument() + .getArray("firstBatch") + .stream() + .map(bsonValue -> bsonValue.asDocument().getString("name").getValue()) + .filter(MongoUtil::isSupportedCollection) + .collect(Collectors.toSet()); + } catch (final MongoSecurityException e) { + final MongoCommandException exception = (MongoCommandException) e.getCause(); + throw new ConnectionErrorException(String.valueOf(exception.getCode()), e); + } catch (final MongoException e) { + throw new ConnectionErrorException(String.valueOf(e.getCode()), e); + } + } + + /** + * Retrieves the {@link AirbyteStream}s available to the source by querying the MongoDB server. + * + * @param mongoClient The {@link MongoClient} used to query the MongoDB server. + * @param databaseName The name of the database to query for collections. + * @return The list of {@link AirbyteStream}s that map to the available collections in the provided + * database. + */ + public static List getAirbyteStreams(final MongoClient mongoClient, final String databaseName) { + final List streams = new ArrayList<>(); + final Set authorizedCollections = getAuthorizedCollections(mongoClient, databaseName); + authorizedCollections.parallelStream().forEach(collectionName -> { + final List fields = getFieldsInCollection(mongoClient.getDatabase(databaseName).getCollection(collectionName)); + streams.add(CatalogHelpers.createAirbyteStream(collectionName, databaseName, fields)); + }); + return streams; + } + + private static List getFieldsInCollection(final MongoCollection collection) { + final Map fieldsMap = Map.of("input", Map.of("$objectToArray", "$$ROOT"), + "as", "each", + "in", Map.of("k", "$$each.k", "v", Map.of("$type", "$$each.v"))); + + final Document mapFunction = new Document("$map", fieldsMap); + final Document arrayToObjectAggregation = new Document("$arrayToObject", mapFunction); + final Document projection = new Document("$project", new Document("fields", arrayToObjectAggregation)); + + final Map groupMap = new HashMap<>(); + groupMap.put("_id", null); + groupMap.put("fields", Map.of("$addToSet", "$fields")); + + final AggregateIterable output = collection.aggregate(Arrays.asList( + projection, + new Document("$unwind", "$fields"), + new Document("$group", groupMap))); + + final MongoCursor cursor = output.cursor(); + if (cursor.hasNext()) { + final Map fields = ((List>) output.cursor().next().get("fields")).get(0); + return fields.entrySet().stream() + .map(e -> new Field(e.getKey(), convertToSchemaType(e.getValue()))) + .collect(Collectors.toList()); + } else { + return List.of(); + } + } + + private static JsonSchemaType convertToSchemaType(final String type) { + return switch (type) { + case "boolean" -> JsonSchemaType.BOOLEAN; + case "int", "long", "double", "decimal" -> JsonSchemaType.NUMBER; + case "array" -> JsonSchemaType.ARRAY; + case "object", "javascriptWithScope" -> JsonSchemaType.OBJECT; + default -> JsonSchemaType.STRING; + }; + } + + private static boolean isSupportedCollection(final String collectionName) { + return !IGNORED_COLLECTIONS.stream().anyMatch(s -> collectionName.startsWith(s)); + } + +} diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/resources/spec.json index 4cafb493621b..48887b976230 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/resources/spec.json @@ -12,7 +12,7 @@ "title": "Connection String", "type": "string", "description": "The connection string of the database that you want to replicate..", - "examples": ["mongodb+srv://example.mongodb.net", "mongodb://example1.host.com:27017,example2.host.com:27017,example3.host.com:27017", "mongodb://example.host.com:27017"], + "examples": ["mongodb+srv://example.mongodb.net/", "mongodb://example1.host.com:27017,example2.host.com:27017,example3.host.com:27017/", "mongodb://example.host.com:27017/"], "order": 1 }, "database": { diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java new file mode 100644 index 000000000000..fba770b133ed --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mongodb.internal; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.JsonNode; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.connection.ClusterDescription; +import com.mongodb.connection.ClusterType; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.v0.AirbyteCatalog; +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; +import io.airbyte.protocol.models.v0.AirbyteStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.bson.Document; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class MongoDbSourceTest { + + private static final String DB_NAME = "airbyte_test"; + + private JsonNode airbyteSourceConfig; + private MongoClient mongoClient; + private MongoDbSource source; + + @BeforeEach + void setup() { + airbyteSourceConfig = createConfiguration(Optional.empty(), Optional.empty()); + mongoClient = mock(MongoClient.class); + source = spy(new MongoDbSource()); + doReturn(mongoClient).when(source).createMongoClient(airbyteSourceConfig); + } + + @Test + void testCheckOperation() throws IOException { + final ClusterDescription clusterDescription = mock(ClusterDescription.class); + final Document response = Document.parse(MoreResources.readResource("authorized_collections_response.json")); + final MongoDatabase mongoDatabase = mock(MongoDatabase.class); + + when(clusterDescription.getType()).thenReturn(ClusterType.REPLICA_SET); + when(mongoDatabase.runCommand(any())).thenReturn(response); + when(mongoClient.getDatabase(any())).thenReturn(mongoDatabase); + when(mongoClient.getClusterDescription()).thenReturn(clusterDescription); + + final AirbyteConnectionStatus airbyteConnectionStatus = source.check(airbyteSourceConfig); + assertNotNull(airbyteConnectionStatus); + assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, airbyteConnectionStatus.getStatus()); + } + + @Test + void testCheckOperationNoAuthorizedCollections() throws IOException { + final ClusterDescription clusterDescription = mock(ClusterDescription.class); + final Document response = Document.parse(MoreResources.readResource("no_authorized_collections_response.json")); + final MongoDatabase mongoDatabase = mock(MongoDatabase.class); + + when(clusterDescription.getType()).thenReturn(ClusterType.REPLICA_SET); + when(mongoDatabase.runCommand(any())).thenReturn(response); + when(mongoClient.getDatabase(any())).thenReturn(mongoDatabase); + when(mongoClient.getClusterDescription()).thenReturn(clusterDescription); + + final AirbyteConnectionStatus airbyteConnectionStatus = source.check(airbyteSourceConfig); + assertNotNull(airbyteConnectionStatus); + assertEquals(AirbyteConnectionStatus.Status.FAILED, airbyteConnectionStatus.getStatus()); + assertEquals("Target MongoDB database does not contain any authorized collections.", airbyteConnectionStatus.getMessage()); + } + + @Test + void testCheckOperationInvalidClusterType() throws IOException { + final ClusterDescription clusterDescription = mock(ClusterDescription.class); + final Document response = Document.parse(MoreResources.readResource("authorized_collections_response.json")); + final MongoDatabase mongoDatabase = mock(MongoDatabase.class); + + when(clusterDescription.getType()).thenReturn(ClusterType.STANDALONE); + when(mongoDatabase.runCommand(any())).thenReturn(response); + when(mongoClient.getDatabase(any())).thenReturn(mongoDatabase); + when(mongoClient.getClusterDescription()).thenReturn(clusterDescription); + + final AirbyteConnectionStatus airbyteConnectionStatus = source.check(airbyteSourceConfig); + assertNotNull(airbyteConnectionStatus); + assertEquals(AirbyteConnectionStatus.Status.FAILED, airbyteConnectionStatus.getStatus()); + assertEquals("Target MongoDB instance is not a replica set cluster.", airbyteConnectionStatus.getMessage()); + } + + @Test + void testCheckOperationUnexpectedException() { + final String expectedMessage = "This is just a test failure."; + when(mongoClient.getDatabase(any())).thenThrow(new IllegalArgumentException(expectedMessage)); + + final AirbyteConnectionStatus airbyteConnectionStatus = source.check(airbyteSourceConfig); + assertNotNull(airbyteConnectionStatus); + assertEquals(AirbyteConnectionStatus.Status.FAILED, airbyteConnectionStatus.getStatus()); + assertEquals(expectedMessage, airbyteConnectionStatus.getMessage()); + } + + @Test + void testDiscoverOperation() throws IOException { + final AggregateIterable aggregateIterable = mock(AggregateIterable.class); + final Document schemaDiscoveryResponse = Document.parse(MoreResources.readResource("schema_discovery_response.json")); + final Document authorizedCollectionsResponse = Document.parse(MoreResources.readResource("authorized_collections_response.json")); + final MongoCollection mongoCollection = mock(MongoCollection.class); + final MongoCursor cursor = mock(MongoCursor.class); + final MongoDatabase mongoDatabase = mock(MongoDatabase.class); + + when(cursor.hasNext()).thenReturn(true); + when(cursor.next()).thenReturn(schemaDiscoveryResponse); + when(aggregateIterable.cursor()).thenReturn(cursor); + when(mongoCollection.aggregate(any())).thenReturn(aggregateIterable); + when(mongoDatabase.getCollection(any())).thenReturn(mongoCollection); + when(mongoDatabase.runCommand(any())).thenReturn(authorizedCollectionsResponse); + when(mongoClient.getDatabase(any())).thenReturn(mongoDatabase); + + final AirbyteCatalog airbyteCatalog = source.discover(airbyteSourceConfig); + + assertNotNull(airbyteCatalog); + assertEquals(1, airbyteCatalog.getStreams().size()); + + final Optional stream = airbyteCatalog.getStreams().stream().findFirst(); + assertTrue(stream.isPresent()); + assertEquals(DB_NAME, stream.get().getNamespace()); + assertEquals("testCollection", stream.get().getName()); + assertEquals("string", stream.get().getJsonSchema().get("properties").get("_id").get("type").asText()); + assertEquals("string", stream.get().getJsonSchema().get("properties").get("name").get("type").asText()); + assertEquals("string", stream.get().getJsonSchema().get("properties").get("last_updated").get("type").asText()); + assertEquals("number", stream.get().getJsonSchema().get("properties").get("total").get("type").asText()); + assertEquals("number", stream.get().getJsonSchema().get("properties").get("price").get("type").asText()); + assertEquals("array", stream.get().getJsonSchema().get("properties").get("items").get("type").asText()); + assertEquals("object", stream.get().getJsonSchema().get("properties").get("owners").get("type").asText()); + } + + @Test + void testDiscoverOperationWithUnexpectedFailure() throws IOException { + final String expectedMessage = "This is just a test failure."; + when(mongoClient.getDatabase(any())).thenThrow(new IllegalArgumentException(expectedMessage)); + + assertThrows(IllegalArgumentException.class, () -> source.discover(airbyteSourceConfig)); + } + + @Test + void testFullRefresh() throws Exception { + // TODO implement + } + + @Test + void testIncrementalRefresh() throws Exception { + // TODO implement + } + + private static JsonNode createConfiguration(final Optional username, final Optional password) { + final Map config = new HashMap<>(); + final Map baseConfig = Map.of( + MongoConstants.DATABASE_CONFIGURATION_KEY, DB_NAME, + MongoConstants.CONNECTION_STRING_CONFIGURATION_KEY, "mongodb://localhost:27017/", + MongoConstants.AUTH_SOURCE_CONFIGURATION_KEY, "admin", + MongoConstants.REPLICA_SET_CONFIGURATION_KEY, "replica-set"); + + config.putAll(baseConfig); + username.ifPresent(u -> config.put(MongoConstants.USER_CONFIGURATION_KEY, u)); + password.ifPresent(p -> config.put(MongoConstants.PASSWORD_CONFIGURATION_KEY, p)); + return Jsons.deserialize(Jsons.serialize(config)); + } + +} diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/authorized_collections_response.json b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/authorized_collections_response.json new file mode 100644 index 000000000000..4dcecc75a45e --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/authorized_collections_response.json @@ -0,0 +1,24 @@ +{ + "cursor": { + "id": 0, + "ns": "sample_airbnb.$cmd.listCollections", + "firstBatch": [ + { + "name": "testCollection", + "type": "collection", + "options": {}, + "info": { + "readOnly": false, + "uuid": "68fdfd7d-7cbf-41c2-aa65-277a6cdc478e" + }, + "idIndex": { + "v": 2, + "key": { + "_id": 1 + }, + "name": "_id_" + } + } + ] + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/no_authorized_collections_response.json b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/no_authorized_collections_response.json new file mode 100644 index 000000000000..65960397bfd8 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/no_authorized_collections_response.json @@ -0,0 +1,7 @@ +{ + "cursor": { + "id": 0, + "ns": "sample_airbnb.$cmd.listCollections", + "firstBatch": [] + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/schema_discovery_response.json b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/schema_discovery_response.json new file mode 100644 index 000000000000..a8e6bf542cc7 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/resources/schema_discovery_response.json @@ -0,0 +1,11 @@ +{ + "fields": [{ + "_id" : "string", + "name" : "string", + "last_updated" : "date", + "total" : "int", + "price" : "decimal", + "items" : "array", + "owners" : "object" + }] +} \ No newline at end of file