-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fault tolerant execution for Mongo connector #15062
Conversation
87e28de
to
772653b
Compare
772653b
to
65c4fce
Compare
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java
Outdated
Show resolved
Hide resolved
plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/MongoServer.java
Show resolved
Hide resolved
65c4fce
to
9f263b2
Compare
@@ -14,7 +14,7 @@ Requirements | |||
|
|||
To connect to MongoDB, you need: | |||
|
|||
* MongoDB 4.0 or higher. | |||
* MongoDB 4.2 or higher. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should go into release notes @colebow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there something I need to do for that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, you're good! You've ticked the box in the PR template, so I've got it from here. 😄
0360f33
to
78aa2b7
Compare
Map<String, String> properties = ImmutableMap.of( | ||
"mongodb.case-insensitive-name-matching", "true", | ||
"mongodb.connection-url", server.getConnectionString().toString()); | ||
connectorProperties = new HashMap(ImmutableMap.copyOf(connectorProperties)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why HashMap? I would stick to the pattern and use ImmutableMap.Builder here,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PostgreSqlQueryRunner
uses a HashMap, but OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using HashMap makes it possible to override the default connectorProperties. If i used an ImmutableMap.builder(), the builder doesn't have a putIfAbsent
, and our style checks require i use buildOrThrow()
rather than build()
, so it'd be very awkward to allow that.
this.temporaryTableName = requireNonNull(temporaryTableName, "temporaryTableName is null"); | ||
this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
verify either both or none set. Those always come in pairs, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gah - i added that to MongoInsertTableHandle and forgot to here.
.addAll(columns) | ||
.add(pageSinkIdColumn) | ||
.build(); | ||
SchemaTableName tempTable = SchemaTableName.schemaTableName(tableMetadata.getTable().getSchemaName(), generateTemporaryTableName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: s/temp/temporary/
|
||
MongoTableHandle table = (MongoTableHandle) tableHandle; | ||
List<MongoColumnHandle> columns = mongoSession.getTable(table.getSchemaTableName()).getColumns(); | ||
SchemaTableName tempTable = SchemaTableName.schemaTableName(handle.getSchemaTableName().getSchemaName(), generateTemporaryTableName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
|
||
try { | ||
// Create the temporary page sink ID table | ||
SchemaTableName pageSinkTable = SchemaTableName.schemaTableName(tempTable.getSchemaName(), generateTemporaryTableName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pageSinkIdsTable?
return Optional.empty(); | ||
} | ||
|
||
private void finishInsert( | ||
SchemaTableName targetTable, | ||
SchemaTableName tempTable, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
temporary
closer.register(() -> mongoSession.dropTable(pageSinkTable)); | ||
|
||
// Insert all the page sink IDs into the page sink ID table | ||
MongoCollection<Document> pageSinkCollection = mongoSession.getCollection(pageSinkTable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pageSinkIds?
.collect(toImmutableList()); | ||
pageSinkCollection.insertMany(pageSinkIds); | ||
|
||
MongoCollection<Document> tempCollection = mongoSession.getCollection(tempTable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfilteredInsertedRows?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stuck with temporaryCollection
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - some naming suggestions.
@ebyhr do you have some more actual comments or is it good to go?
78aa2b7
to
0208bd9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you rebase on upstream and resolve conflicts?
plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoTable.java
Outdated
Show resolved
Hide resolved
0208bd9
to
e85eb93
Compare
...olerant-tests/src/test/java/io/trino/faulttolerant/mongodb/BaseMongoFailureRecoveryTest.java
Show resolved
Hide resolved
e85eb93
to
8af798e
Compare
🎉 |
Description
This provides support for FTE with the Mongo connector. It uses the same paradigm as my previous PR. We only use this new paradigm with retries enabled, otherwise we use the old paradigm. This is important because this also increases our minimum mongo version from 4.0 to 4.2, but only when FTE is enabled - this allows clients using Mongo on 4.0 to continue upgrading their trino version without having to upgrade their mongo, so long as they can live without FTE.
Non-technical explanation
This provides support for fault tolerant execution on MongoDB. If you want to enable FTE, we require a minimum of Mongo version 4.2, otherwise we still support 4.0.
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text: