[Improve] mongodb connector v2 add source query capability #3697

Merged on Dec 26, 2022 (14 commits)
6 changes: 6 additions & 0 deletions docs/en/connector-v2/source/MongoDB.md
@@ -22,6 +22,7 @@ Read data from MongoDB.
| uri | string | yes | - |
| database | string | yes | - |
| collection | string | yes | - |
+ | matchQuery | string | no | - |
| schema | object | yes | - |
| common-options | config | no | - |

@@ -37,6 +38,10 @@ MongoDB database

MongoDB collection

+ ### matchQuery [string]

+ MatchQuery is a JSON string that specifies the selection criteria using query operators for the documents to be returned from the collection.

### schema [object]

#### fields [Config]
@@ -66,6 +71,7 @@ mongodb {
uri = "mongodb://username:password@127.0.0.1:27017/mypost?retryWrites=true&writeConcern=majority"
database = "mydatabase"
collection = "mycollection"
+ matchQuery = "{"id":3}"
Member

I'm not sure the JSON inside a HOCON value will be parsed correctly. Maybe take a different approach, such as using an object:

matchQuery = {
     "id"=3
}

Contributor Author

This value is a string. It only has to conform to a specific format so that it can be parsed into a BsonDocument.

Contributor Author

The core is this call:
.find(Optional.ofNullable(params.getMatchQuery()).isPresent() ? BsonDocument.parse(params.getMatchQuery()) : new BsonDocument())
I tested it in example mode in my development environment and it works.
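
The ternary above wraps the value in an `Optional` only to test it for presence. A minimal sketch of the same null handling written with `map` and `orElseGet`, using nothing beyond the JDK: the `parseOrEmpty` helper and the `Function<String, T>` stand-in for `BsonDocument.parse` are hypothetical and not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch: a JDK-only stand-in for the filter selection in pollNext.
class MatchQueryFilterSketch {

    // Returns parser.apply(raw) when raw is non-null, otherwise the empty fallback.
    // In the connector, parser would correspond to BsonDocument::parse
    // and emptyFilter to BsonDocument::new (assumption for illustration).
    static <T> T parseOrEmpty(String raw, Function<String, T> parser, Supplier<T> emptyFilter) {
        return Optional.ofNullable(raw).map(parser).orElseGet(emptyFilter);
    }

    public static void main(String[] args) {
        // Stand-in parser: records the raw query under a single key.
        Function<String, Map<String, String>> parser = raw -> {
            Map<String, String> parsed = new LinkedHashMap<>();
            parsed.put("raw", raw);
            return parsed;
        };
        // A configured query goes through the parser.
        System.out.println(parseOrEmpty("{\"id\":3}", parser, LinkedHashMap::new));
        // A missing query yields the empty fallback, i.e. "match everything".
        System.out.println(parseOrEmpty(null, parser, LinkedHashMap::new));
    }
}
```

Either form behaves the same for a `null` query; the chained version just avoids reading the value twice.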

Contributor Author

My reasoning is that the query should follow MongoDB's native syntax. That is friendlier to developers: they don't need to learn anything extra and can paste an existing MongoDB query directly.

Member

OK with me. Please update the e2e test case. Thanks.

Contributor Author

Thanks for your review. I have made the changes.

schema {
fields {
id = int
@@ -17,38 +17,50 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.config;

- import org.apache.seatunnel.api.configuration.Option;
- import org.apache.seatunnel.api.configuration.Options;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

+ import org.apache.seatunnel.shade.com.typesafe.config.Config;

+ import lombok.Builder;
+ import lombok.Getter;

import java.io.Serializable;



/**
* The config of mongodb
*/
+ @Builder
+ @Getter
public class MongodbConfig implements Serializable {

- public static final Option<String> URI =
- Options.key("uri")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB uri");

- public static final Option<String> DATABASE =
- Options.key("database")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB database name");

- public static final Option<String> COLLECTION =
- Options.key("collection")
- .stringType()
- .noDefaultValue()
- .withDescription("MongoDB collection");

- // Don't use now
- public static final String FORMAT = "format";

- // Don't use now
- public static final String DEFAULT_FORMAT = "json";
+ @Builder.Default
+ private String uri = URI.defaultValue();
+ @Builder.Default
+ private String database = DATABASE.defaultValue();
+ @Builder.Default
+ private String collection = COLLECTION.defaultValue();
+ @Builder.Default
+ private String matchQuery = MATCHQUERY.defaultValue();
+ public static MongodbConfig buildWithConfig(Config config) {
+ MongodbConfigBuilder builder = MongodbConfig.builder();
+ if (config.hasPath(URI.key())) {
+ builder.uri(config.getString(URI.key()));
+ }
+ if (config.hasPath(DATABASE.key())) {
+ builder.database(config.getString(DATABASE.key()));
+ }
+ if (config.hasPath(COLLECTION.key())) {
+ builder.collection(config.getString(COLLECTION.key()));
+ }
+ if (config.hasPath(MATCHQUERY.key())) {
+ builder.matchQuery(config.getString(MATCHQUERY.key()));
+ }
+ return builder.build();
+ }

}
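
`buildWithConfig` above copies each value only when its key is present, so an unset `matchQuery` keeps the option's default. The sketch below illustrates that pattern with the JDK only, under two assumptions: a `Map<String, String>` stands in for the HOCON `Config`, and the default of an option declared with `noDefaultValue()` is `null`. `SourceSettingsSketch`, `fromMap`, and `hasFilter` are hypothetical names, not part of the connector.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical JDK-only sketch; a Map<String, String> stands in for the HOCON Config.
class SourceSettingsSketch {
    final String database;
    final String collection;
    final String matchQuery; // stays null when the key is absent (assumed default)

    private SourceSettingsSketch(String database, String collection, String matchQuery) {
        this.database = database;
        this.collection = collection;
        this.matchQuery = matchQuery;
    }

    // Copies each value only when its key is present, like hasPath(...) then getString(...).
    static SourceSettingsSketch fromMap(Map<String, String> config) {
        String database = null;
        String collection = null;
        String matchQuery = null;
        if (config.containsKey("database")) {
            database = config.get("database");
        }
        if (config.containsKey("collection")) {
            collection = config.get("collection");
        }
        if (config.containsKey("matchQuery")) {
            matchQuery = config.get("matchQuery");
        }
        return new SourceSettingsSketch(database, collection, matchQuery);
    }

    // A null matchQuery means "no filter": the reader would fall back to an empty filter.
    boolean hasFilter() {
        return matchQuery != null;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("database", "mydatabase");
        config.put("collection", "mycollection");
        System.out.println(fromMap(config).hasFilter()); // matchQuery key not set yet

        config.put("matchQuery", "{\"id\":3}");
        System.out.println(fromMap(config).hasFilter());
    }
}
```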
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.mongodb.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class MongodbOption {
public static final Option<String> URI =
Options.key("uri")
.stringType()
.noDefaultValue()
.withDescription("MongoDB uri");

public static final Option<String> DATABASE =
Options.key("database")
.stringType()
.noDefaultValue()
.withDescription("MongoDB database name");

public static final Option<String> COLLECTION =
Options.key("collection")
.stringType()
.noDefaultValue()
.withDescription("MongoDB collection");

public static final Option<String> MATCHQUERY =
Options.key("matchQuery")
.stringType()
.noDefaultValue()
.withDescription("MatchQuery is a JSON string that specifies the selection criteria using query operators for the documents to be returned from the collection.\n");

// Don't use now
public static final String FORMAT = "format";

// Don't use now
public static final String DEFAULT_FORMAT = "json";
}

This file was deleted.

@@ -17,9 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;

- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -34,11 +34,10 @@
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
- import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+ import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
- import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;

import com.google.auto.service.AutoService;

@@ -49,7 +48,7 @@ public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private SeaTunnelRowType rowType;

- private MongodbParameters params;
+ private MongodbConfig params;

@Override
public String getPluginName() {
@@ -65,7 +64,7 @@ public void prepare(Config config) throws PrepareFailException {
getPluginName(), PluginType.SINK, result.getMsg()));
}

- this.params = ConfigBeanFactory.create(config, MongodbParameters.class);
+ this.params = MongodbConfig.buildWithConfig(config);
}

@Override
@@ -17,9 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;

- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -20,7 +20,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
- import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+ import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;

@@ -49,7 +49,7 @@ public class MongodbSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

public MongodbSinkWriter(SeaTunnelRowType rowType,
boolean useSimpleTextSchema,
- MongodbParameters params) {
+ MongodbConfig params) {
this.rowType = rowType;
this.database = params.getDatabase();
this.collection = params.getCollection();
@@ -17,9 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.source;

- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -35,11 +35,10 @@
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
- import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+ import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
- import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;

import com.google.auto.service.AutoService;

@@ -48,7 +47,7 @@ public class MongodbSource extends AbstractSingleSplitSource<SeaTunnelRow> {

private SeaTunnelRowType rowType;

- private MongodbParameters params;
+ private MongodbConfig params;

@Override
public String getPluginName() {
@@ -63,9 +62,7 @@ public void prepare(Config config) throws PrepareFailException {
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}

- this.params = ConfigBeanFactory.create(config, MongodbParameters.class);

+ this.params = MongodbConfig.buildWithConfig(config);
if (config.hasPath(SeaTunnelSchema.SCHEMA.key())) {
Config schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
@@ -17,9 +17,10 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.source;

- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
- import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
+ import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -37,6 +38,8 @@ public String factoryIdentifier() {

@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(URI, DATABASE, COLLECTION, SeaTunnelSchema.SCHEMA).build();
+ return OptionRule.builder()
+ .required(URI, DATABASE, COLLECTION, SeaTunnelSchema.SCHEMA)
+ .optional(MATCHQUERY).build();
}
}
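
The rule above declares `uri`, `database`, `collection`, and `schema` as required and `matchQuery` as optional. As a rough JDK-only illustration of what such a rule expresses (this is not SeaTunnel's `OptionRule` implementation, and `RequiredKeysSketch` and `missingRequired` are hypothetical names), a check might report only the required keys that are absent and ignore optional ones:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a required/optional key check; not SeaTunnel's OptionRule.
class RequiredKeysSketch {

    // Keys the source cannot run without; matchQuery is deliberately not listed.
    static final List<String> REQUIRED = Arrays.asList("uri", "database", "collection", "schema");

    // Returns the required keys that are absent, in declaration order.
    static List<String> missingRequired(Set<String> presentKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!presentKeys.contains(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // A config without matchQuery is still complete.
        Set<String> complete = new HashSet<>(Arrays.asList("uri", "database", "collection", "schema"));
        System.out.println(missingRequired(complete));

        // Supplying the optional key does not make up for missing required ones.
        Set<String> partial = new HashSet<>(Arrays.asList("uri", "matchQuery"));
        System.out.println(missingRequired(partial));
    }
}
```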
@@ -23,7 +23,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
- import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+ import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Deserializer;

@@ -32,10 +32,12 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
import lombok.extern.slf4j.Slf4j;
+ import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.io.IOException;
+ import java.util.Optional;

@Slf4j
public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
@@ -44,7 +46,7 @@ public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>

private MongoClient client;

- private final MongodbParameters params;
+ private final MongodbConfig params;

private final Deserializer deserializer;

@@ -53,7 +55,7 @@ public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
private final boolean useSimpleTextSchema;

MongodbSourceReader(SingleSplitReaderContext context,
- MongodbParameters params,
+ MongodbConfig params,
SeaTunnelRowType rowType,
boolean useSimpleTextSchema) {
this.context = context;
@@ -86,7 +88,7 @@ public void close() throws IOException {
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
try (MongoCursor<Document> mongoCursor = client.getDatabase(params.getDatabase())
.getCollection(params.getCollection())
- .find()
+ .find(Optional.ofNullable(params.getMatchQuery()).isPresent() ? BsonDocument.parse(params.getMatchQuery()) : new BsonDocument())
.projection(projectionFields)
.iterator()) {
while (mongoCursor.hasNext()) {