Skip to content

Commit

Permalink
✨ Source MongoDB Internal POC: Code Cleanup (#29006)
Browse files Browse the repository at this point in the history
* Cleanup from previous PR's

* Formatting

* Remove unused dependency

* Change display name
  • Loading branch information
jdpgrailsdev authored Aug 2, 2023
1 parent ff2d987 commit ac09c80
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ dependencies {

implementation 'org.mongodb:mongodb-driver-sync:4.10.2'

testImplementation libs.connectors.testcontainers.mongodb

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mongodb-internal-poc')
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data:
githubIssueLabel: source-mongodb-internal-poc
icon: mongodb.svg
license: ELv2
name: MongoDb
name: MongoDb POC
registries:
cloud:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ public class MongoConnectionUtils {
*/
public static MongoClient createMongoClient(final JsonNode config) {
final String authSource = config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText();
final String connectionString = config.get(CONNECTION_STRING_CONFIGURATION_KEY).asText();
final String replicaSet = config.get(REPLICA_SET_CONFIGURATION_KEY).asText();

final ConnectionString mongoConnectionString = new ConnectionString(connectionString + "?replicaSet=" +
replicaSet + "&retryWrites=false&provider=airbyte&tls=true");
final ConnectionString mongoConnectionString = new ConnectionString(buildConnectionString(config));

final MongoDriverInformation mongoDriverInformation = MongoDriverInformation.builder()
.driverName("Airbyte")
Expand All @@ -55,4 +52,17 @@ public static MongoClient createMongoClient(final JsonNode config) {
return MongoClients.create(mongoClientSettingsBuilder.build(), mongoDriverInformation);
}

private static String buildConnectionString(final JsonNode config) {
final String connectionString = config.get(CONNECTION_STRING_CONFIGURATION_KEY).asText();
final String replicaSet = config.get(REPLICA_SET_CONFIGURATION_KEY).asText();
final StringBuilder builder = new StringBuilder();
builder.append(connectionString);
builder.append("?replicaSet=");
builder.append(replicaSet);
builder.append("&retryWrites=false");
builder.append("&provider=airbyte");
builder.append("&tls=true");
return builder.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,25 @@
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.DATABASE_CONFIGURATION_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoSecurityException;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.connection.ClusterType;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbSource extends BaseConnector implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

/**
* Set of collection prefixes that should be ignored when performing operations, such as discover to
* avoid access issues.
*/
private static final Set<String> IGNORED_COLLECTIONS = Set.of("system.", "replset.", "oplog.");

public static void main(final String[] args) throws Exception {
final Source source = new MongoDbSource();
LOGGER.info("starting source: {}", MongoDbSource.class);
Expand All @@ -58,15 +35,15 @@ public static void main(final String[] args) throws Exception {

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try (final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config)) {
try (final MongoClient mongoClient = createMongoClient(config)) {
final String databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText();

/*
* Perform the authorized collections check before the cluster type check. The MongoDB Java driver
* needs to actually execute a command in order to fetch the cluster description. Querying for the
* authorized collections guarantees that the cluster description will be available to the driver.
*/
if (getAuthorizedCollections(mongoClient, databaseName).isEmpty()) {
if (MongoUtil.getAuthorizedCollections(mongoClient, databaseName).isEmpty()) {
return new AirbyteConnectionStatus()
.withMessage("Target MongoDB database does not contain any authorized collections.")
.withStatus(AirbyteConnectionStatus.Status.FAILED);
Expand All @@ -89,8 +66,11 @@ public AirbyteConnectionStatus check(final JsonNode config) {

@Override
public AirbyteCatalog discover(final JsonNode config) {
final List<AirbyteStream> streams = discoverInternal(config);
return new AirbyteCatalog().withStreams(streams);
try (final MongoClient mongoClient = createMongoClient(config)) {
final String databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText();
final List<AirbyteStream> streams = MongoUtil.getAirbyteStreams(mongoClient, databaseName);
return new AirbyteCatalog().withStreams(streams);
}
}

@Override
Expand All @@ -101,88 +81,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
return null;
}

private Set<String> getAuthorizedCollections(final MongoClient mongoClient, final String databaseName) {
/*
* db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command
* returns only those collections for which the user has privileges. For example, if a user has find
* action on specific collections, the command returns only those collections; or, if a user has
* find or any other action, on the database resource, the command lists all collections in the
* database.
*/
try {
final Document document = mongoClient.getDatabase(databaseName).runCommand(new Document("listCollections", 1)
.append("authorizedCollections", true)
.append("nameOnly", true))
.append("filter", "{ 'type': 'collection' }");
return document.toBsonDocument()
.get("cursor").asDocument()
.getArray("firstBatch")
.stream()
.map(bsonValue -> bsonValue.asDocument().getString("name").getValue())
.filter(this::isSupportedCollection)
.collect(Collectors.toSet());
} catch (final MongoSecurityException e) {
final MongoCommandException exception = (MongoCommandException) e.getCause();
throw new ConnectionErrorException(String.valueOf(exception.getCode()), e);
} catch (final MongoException e) {
throw new ConnectionErrorException(String.valueOf(e.getCode()), e);
}
}

private List<AirbyteStream> discoverInternal(final JsonNode config) {
final List<AirbyteStream> streams = new ArrayList<>();
try (final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config)) {
final String databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText();
final Set<String> authorizedCollections = getAuthorizedCollections(mongoClient, databaseName);
authorizedCollections.parallelStream().forEach(collectionName -> {
final List<Field> fields = getFields(mongoClient.getDatabase(databaseName).getCollection(collectionName));
streams.add(CatalogHelpers.createAirbyteStream(collectionName, "", fields));
});
return streams;
}
}

private List<Field> getFields(final MongoCollection collection) {
final Map<String, Object> fieldsMap = Map.of("input", Map.of("$objectToArray", "$$ROOT"),
"as", "each",
"in", Map.of("k", "$$each.k", "v", Map.of("$type", "$$each.v")));

final Document mapFunction = new Document("$map", fieldsMap);
final Document arrayToObjectAggregation = new Document("$arrayToObject", mapFunction);
final Document projection = new Document("$project", new Document("fields", arrayToObjectAggregation));

final Map<String, Object> groupMap = new HashMap<>();
groupMap.put("_id", null);
groupMap.put("fields", Map.of("$addToSet", "$fields"));

final AggregateIterable<Document> output = collection.aggregate(Arrays.asList(
projection,
new Document("$unwind", "$fields"),
new Document("$group", groupMap)));

final MongoCursor<Document> cursor = output.cursor();
if (cursor.hasNext()) {
final Map<String, String> fields = ((List<Map<String, String>>) output.cursor().next().get("fields")).get(0);
return fields.entrySet().stream()
.map(e -> new Field(e.getKey(), convertToSchemaType(e.getValue())))
.collect(Collectors.toList());
} else {
return List.of();
}
}

private JsonSchemaType convertToSchemaType(final String type) {
return switch (type) {
case "boolean" -> JsonSchemaType.BOOLEAN;
case "int", "long", "double", "decimal" -> JsonSchemaType.NUMBER;
case "array" -> JsonSchemaType.ARRAY;
case "object", "javascriptWithScope" -> JsonSchemaType.OBJECT;
default -> JsonSchemaType.STRING;
};
}

private boolean isSupportedCollection(final String collectionName) {
return !IGNORED_COLLECTIONS.stream().anyMatch(s -> collectionName.startsWith(s));
protected MongoClient createMongoClient(final JsonNode config) {
return MongoConnectionUtils.createMongoClient(config);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoSecurityException;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.bson.Document;

public class MongoUtil {

/**
* Set of collection prefixes that should be ignored when performing operations, such as discover to
* avoid access issues.
*/
private static final Set<String> IGNORED_COLLECTIONS = Set.of("system.", "replset.", "oplog.");

/**
* Returns the set of collections that the current credentials are authorized to access.
*
* @param mongoClient The {@link MongoClient} used to query the MongoDB server for authorized
* collections.
* @param databaseName The name of the database to query for authorized collections.
* @return The set of authorized collection names (may be empty).
* @throws ConnectionErrorException if unable to perform the authorized collection query.
*/
public static Set<String> getAuthorizedCollections(final MongoClient mongoClient, final String databaseName) {
/*
* db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command
* returns only those collections for which the user has privileges. For example, if a user has find
* action on specific collections, the command returns only those collections; or, if a user has
* find or any other action, on the database resource, the command lists all collections in the
* database.
*/
try {
final Document document = mongoClient.getDatabase(databaseName).runCommand(new Document("listCollections", 1)
.append("authorizedCollections", true)
.append("nameOnly", true))
.append("filter", "{ 'type': 'collection' }");
return document.toBsonDocument()
.get("cursor").asDocument()
.getArray("firstBatch")
.stream()
.map(bsonValue -> bsonValue.asDocument().getString("name").getValue())
.filter(MongoUtil::isSupportedCollection)
.collect(Collectors.toSet());
} catch (final MongoSecurityException e) {
final MongoCommandException exception = (MongoCommandException) e.getCause();
throw new ConnectionErrorException(String.valueOf(exception.getCode()), e);
} catch (final MongoException e) {
throw new ConnectionErrorException(String.valueOf(e.getCode()), e);
}
}

/**
* Retrieves the {@link AirbyteStream}s available to the source by querying the MongoDB server.
*
* @param mongoClient The {@link MongoClient} used to query the MongoDB server.
* @param databaseName The name of the database to query for collections.
* @return The list of {@link AirbyteStream}s that map to the available collections in the provided
* database.
*/
public static List<AirbyteStream> getAirbyteStreams(final MongoClient mongoClient, final String databaseName) {
final List<AirbyteStream> streams = new ArrayList<>();
final Set<String> authorizedCollections = getAuthorizedCollections(mongoClient, databaseName);
authorizedCollections.parallelStream().forEach(collectionName -> {
final List<Field> fields = getFieldsInCollection(mongoClient.getDatabase(databaseName).getCollection(collectionName));
streams.add(CatalogHelpers.createAirbyteStream(collectionName, databaseName, fields));
});
return streams;
}

private static List<Field> getFieldsInCollection(final MongoCollection collection) {
final Map<String, Object> fieldsMap = Map.of("input", Map.of("$objectToArray", "$$ROOT"),
"as", "each",
"in", Map.of("k", "$$each.k", "v", Map.of("$type", "$$each.v")));

final Document mapFunction = new Document("$map", fieldsMap);
final Document arrayToObjectAggregation = new Document("$arrayToObject", mapFunction);
final Document projection = new Document("$project", new Document("fields", arrayToObjectAggregation));

final Map<String, Object> groupMap = new HashMap<>();
groupMap.put("_id", null);
groupMap.put("fields", Map.of("$addToSet", "$fields"));

final AggregateIterable<Document> output = collection.aggregate(Arrays.asList(
projection,
new Document("$unwind", "$fields"),
new Document("$group", groupMap)));

final MongoCursor<Document> cursor = output.cursor();
if (cursor.hasNext()) {
final Map<String, String> fields = ((List<Map<String, String>>) output.cursor().next().get("fields")).get(0);
return fields.entrySet().stream()
.map(e -> new Field(e.getKey(), convertToSchemaType(e.getValue())))
.collect(Collectors.toList());
} else {
return List.of();
}
}

private static JsonSchemaType convertToSchemaType(final String type) {
return switch (type) {
case "boolean" -> JsonSchemaType.BOOLEAN;
case "int", "long", "double", "decimal" -> JsonSchemaType.NUMBER;
case "array" -> JsonSchemaType.ARRAY;
case "object", "javascriptWithScope" -> JsonSchemaType.OBJECT;
default -> JsonSchemaType.STRING;
};
}

private static boolean isSupportedCollection(final String collectionName) {
return !IGNORED_COLLECTIONS.stream().anyMatch(s -> collectionName.startsWith(s));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"title": "Connection String",
"type": "string",
"description": "The connection string of the database that you want to replicate..",
"examples": ["mongodb+srv://example.mongodb.net", "mongodb://example1.host.com:27017,example2.host.com:27017,example3.host.com:27017", "mongodb://example.host.com:27017"],
"examples": ["mongodb+srv://example.mongodb.net/", "mongodb://example1.host.com:27017,example2.host.com:27017,example3.host.com:27017/", "mongodb://example.host.com:27017/"],
"order": 1
},
"database": {
Expand Down
Loading

0 comments on commit ac09c80

Please sign in to comment.