Skip to content

Commit

Permalink
[Improve] mongodb connector v2 add source query capability (apache#3697)
Browse files Browse the repository at this point in the history
* [Improve] mongodb connector v2 add source query capability
  • Loading branch information
MonsterChenzhuo authored and lhyundeadsoul committed Jan 3, 2023
1 parent f3af61e commit 559473d
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 87 deletions.
6 changes: 6 additions & 0 deletions docs/en/connector-v2/source/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Read data from MongoDB.
| uri | string | yes | - |
| database | string | yes | - |
| collection | string | yes | - |
| matchQuery | string | no | - |
| schema | object | yes | - |
| common-options | config | no | - |

Expand All @@ -37,6 +38,10 @@ MongoDB database

MongoDB collection

### matchQuery [string]

MatchQuery is a JSON string that specifies the selection criteria using query operators for the documents to be returned from the collection.

### schema [object]

#### fields [Config]
Expand Down Expand Up @@ -66,6 +71,7 @@ mongodb {
uri = "mongodb://username:password@127.0.0.1:27017/mypost?retryWrites=true&writeConcern=majority"
database = "mydatabase"
collection = "mycollection"
matchQuery = "{"id":3}"
schema {
fields {
id = int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,50 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Builder;
import lombok.Getter;

import java.io.Serializable;



/**
* The config of mongodb
*/
@Builder
@Getter
public class MongodbConfig implements Serializable {

public static final Option<String> URI =
Options.key("uri")
.stringType()
.noDefaultValue()
.withDescription("MongoDB uri");

public static final Option<String> DATABASE =
Options.key("database")
.stringType()
.noDefaultValue()
.withDescription("MongoDB database name");

public static final Option<String> COLLECTION =
Options.key("collection")
.stringType()
.noDefaultValue()
.withDescription("MongoDB collection");

// Don't use now
public static final String FORMAT = "format";

// Don't use now
public static final String DEFAULT_FORMAT = "json";
@Builder.Default
private String uri = URI.defaultValue();
@Builder.Default
private String database = DATABASE.defaultValue();
@Builder.Default
private String collection = COLLECTION.defaultValue();
@Builder.Default
private String matchQuery = MATCHQUERY.defaultValue();
public static MongodbConfig buildWithConfig(Config config) {
MongodbConfigBuilder builder = MongodbConfig.builder();
if (config.hasPath(URI.key())) {
builder.uri(config.getString(URI.key()));
}
if (config.hasPath(DATABASE.key())) {
builder.database(config.getString(DATABASE.key()));
}
if (config.hasPath(COLLECTION.key())) {
builder.collection(config.getString(COLLECTION.key()));
}
if (config.hasPath(MATCHQUERY.key())) {
builder.matchQuery(config.getString(MATCHQUERY.key()));
}
return builder.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.mongodb.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class MongodbOption {
public static final Option<String> URI =
Options.key("uri")
.stringType()
.noDefaultValue()
.withDescription("MongoDB uri");

public static final Option<String> DATABASE =
Options.key("database")
.stringType()
.noDefaultValue()
.withDescription("MongoDB database name");

public static final Option<String> COLLECTION =
Options.key("collection")
.stringType()
.noDefaultValue()
.withDescription("MongoDB collection");

public static final Option<String> MATCHQUERY =
Options.key("matchQuery")
.stringType()
.noDefaultValue()
.withDescription("MatchQuery is a JSON string that specifies the selection criteria using query operators for the documents to be returned from the collection.\n");

// Don't use now
public static final String FORMAT = "format";

// Don't use now
public static final String DEFAULT_FORMAT = "json";
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
Expand All @@ -34,11 +34,10 @@
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;

import com.google.auto.service.AutoService;

Expand All @@ -49,7 +48,7 @@ public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private SeaTunnelRowType rowType;

private MongodbParameters params;
private MongodbConfig params;

@Override
public String getPluginName() {
Expand All @@ -65,7 +64,7 @@ public void prepare(Config config) throws PrepareFailException {
getPluginName(), PluginType.SINK, result.getMsg()));
}

this.params = ConfigBeanFactory.create(config, MongodbParameters.class);
this.params = MongodbConfig.buildWithConfig(config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;

Expand Down Expand Up @@ -49,7 +49,7 @@ public class MongodbSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

public MongodbSinkWriter(SeaTunnelRowType rowType,
boolean useSimpleTextSchema,
MongodbParameters params) {
MongodbConfig params) {
this.rowType = rowType;
this.database = params.getDatabase();
this.collection = params.getCollection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.source;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
Expand All @@ -35,11 +35,10 @@
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;

import com.google.auto.service.AutoService;

Expand All @@ -48,7 +47,7 @@ public class MongodbSource extends AbstractSingleSplitSource<SeaTunnelRow> {

private SeaTunnelRowType rowType;

private MongodbParameters params;
private MongodbConfig params;

@Override
public String getPluginName() {
Expand All @@ -63,9 +62,7 @@ public void prepare(Config config) throws PrepareFailException {
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}

this.params = ConfigBeanFactory.create(config, MongodbParameters.class);

this.params = MongodbConfig.buildWithConfig(config);
if (config.hasPath(SeaTunnelSchema.SCHEMA.key())) {
Config schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.source;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
Expand All @@ -37,6 +38,8 @@ public String factoryIdentifier() {

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(URI, DATABASE, COLLECTION, SeaTunnelSchema.SCHEMA).build();
return OptionRule.builder()
.required(URI, DATABASE, COLLECTION, SeaTunnelSchema.SCHEMA)
.optional(MATCHQUERY).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Deserializer;

Expand All @@ -32,10 +32,12 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.io.IOException;
import java.util.Optional;

@Slf4j
public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
Expand All @@ -44,7 +46,7 @@ public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>

private MongoClient client;

private final MongodbParameters params;
private final MongodbConfig params;

private final Deserializer deserializer;

Expand All @@ -53,7 +55,7 @@ public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
private final boolean useSimpleTextSchema;

MongodbSourceReader(SingleSplitReaderContext context,
MongodbParameters params,
MongodbConfig params,
SeaTunnelRowType rowType,
boolean useSimpleTextSchema) {
this.context = context;
Expand Down Expand Up @@ -86,7 +88,7 @@ public void close() throws IOException {
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
try (MongoCursor<Document> mongoCursor = client.getDatabase(params.getDatabase())
.getCollection(params.getCollection())
.find()
.find(Optional.ofNullable(params.getMatchQuery()).isPresent() ? BsonDocument.parse(params.getMatchQuery()) : new BsonDocument())
.projection(projectionFields)
.iterator()) {
while (mongoCursor.hasNext()) {
Expand Down
Loading

0 comments on commit 559473d

Please sign in to comment.