From 55914a5c4177a1da6a1a36a8fae03ffe9afb94af Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 18 Nov 2022 19:48:08 +0900 Subject: [PATCH 1/2] Introduce RemoteTableName class to MongoDB --- .../mongodb/MongoInsertTableHandle.java | 11 +- .../trino/plugin/mongodb/MongoMetadata.java | 32 +++-- .../mongodb/MongoOutputTableHandle.java | 11 +- .../trino/plugin/mongodb/MongoPageSink.java | 9 +- .../plugin/mongodb/MongoPageSinkProvider.java | 4 +- .../io/trino/plugin/mongodb/MongoSession.java | 130 +++++++++--------- .../plugin/mongodb/MongoTableHandle.java | 15 +- .../trino/plugin/mongodb/RemoteTableName.java | 72 ++++++++++ .../io/trino/plugin/mongodb/ptf/Query.java | 4 +- .../plugin/mongodb/TestMongoTableHandle.java | 21 ++- 10 files changed, 205 insertions(+), 104 deletions(-) create mode 100644 plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/RemoteTableName.java diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoInsertTableHandle.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoInsertTableHandle.java index 3747aee4c9d0..b0403e9c9e3b 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoInsertTableHandle.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoInsertTableHandle.java @@ -17,7 +17,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.trino.spi.connector.ConnectorInsertTableHandle; -import io.trino.spi.connector.SchemaTableName; import java.util.List; @@ -26,22 +25,22 @@ public class MongoInsertTableHandle implements ConnectorInsertTableHandle { - private final SchemaTableName schemaTableName; + private final RemoteTableName remoteTableName; private final List columns; @JsonCreator public MongoInsertTableHandle( - @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("remoteTableName") RemoteTableName remoteTableName, @JsonProperty("columns") List columns) { - this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null"); this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); } @JsonProperty - public SchemaTableName getSchemaTableName() + public RemoteTableName getRemoteTableName() { - return schemaTableName; + return remoteTableName; } @JsonProperty diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java index 438cfacd9e55..19583ce9ff17 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java @@ -192,7 +192,8 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable @Override public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) { - mongoSession.createTable(tableMetadata.getTable(), buildColumnHandles(tableMetadata), tableMetadata.getComment()); + RemoteTableName remoteTableName = mongoSession.toRemoteSchemaTableName(tableMetadata.getTable()); + mongoSession.createTable(remoteTableName, buildColumnHandles(tableMetadata), tableMetadata.getComment()); } @Override @@ -200,14 +201,14 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle { MongoTableHandle table = (MongoTableHandle) tableHandle; - mongoSession.dropTable(table.getSchemaTableName()); + mongoSession.dropTable(table.getRemoteTableName()); } @Override public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional comment) { MongoTableHandle table = (MongoTableHandle) tableHandle; - mongoSession.setTableComment(table.getSchemaTableName(), comment); + mongoSession.setTableComment(table, comment); } @Override @@ -215,7 +216,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl { MongoTableHandle table = (MongoTableHandle) tableHandle; MongoColumnHandle column = (MongoColumnHandle) columnHandle; - mongoSession.setColumnComment(table.getSchemaTableName(), column.getName(), comment); + mongoSession.setColumnComment(table, column.getName(), comment); } @Override @@ -225,19 +226,19 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand throw new TrinoException(NOT_SUPPORTED, format("Qualified identifier name must be shorter than or equal to '%s' bytes: '%s'", MAX_QUALIFIED_IDENTIFIER_BYTE_LENGTH, newTableName)); } MongoTableHandle table = (MongoTableHandle) tableHandle; - mongoSession.renameTable(table.getSchemaTableName(), newTableName); + mongoSession.renameTable(table, newTableName); } @Override public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) { - mongoSession.addColumn(((MongoTableHandle) tableHandle).getSchemaTableName(), column); + mongoSession.addColumn(((MongoTableHandle) tableHandle), column); } @Override public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) { - mongoSession.dropColumn(((MongoTableHandle) tableHandle).getSchemaTableName(), ((MongoColumnHandle) column).getName()); + mongoSession.dropColumn(((MongoTableHandle) tableHandle), ((MongoColumnHandle) column).getName()); } @Override @@ -247,14 +248,16 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries"); } + RemoteTableName remoteTableName = mongoSession.toRemoteSchemaTableName(tableMetadata.getTable()); + List columns = buildColumnHandles(tableMetadata); - mongoSession.createTable(tableMetadata.getTable(), columns, tableMetadata.getComment()); + mongoSession.createTable(remoteTableName, columns, tableMetadata.getComment()); - setRollback(() -> mongoSession.dropTable(tableMetadata.getTable())); + setRollback(() -> mongoSession.dropTable(remoteTableName)); return new MongoOutputTableHandle( - tableMetadata.getTable(), + remoteTableName, columns.stream().filter(c -> !c.isHidden()).collect(toList())); } @@ -276,7 +279,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto List columns = mongoSession.getTable(table.getSchemaTableName()).getColumns(); return new MongoInsertTableHandle( - table.getSchemaTableName(), + table.getRemoteTableName(), columns.stream() .filter(column -> !column.isHidden()) .peek(column -> validateColumnNameForInsert(column.getName())) @@ -312,7 +315,7 @@ public Optional applyDelete(ConnectorSession session, Conn public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle) { MongoTableHandle table = (MongoTableHandle) handle; - return OptionalLong.of(mongoSession.deleteDocuments(table.getSchemaTableName(), table.getConstraint())); + return OptionalLong.of(mongoSession.deleteDocuments(table.getRemoteTableName(), table.getConstraint())); } @Override @@ -365,7 +368,7 @@ public Optional> applyLimit(Connect } return Optional.of(new LimitApplicationResult<>( - new MongoTableHandle(handle.getSchemaTableName(), handle.getFilter(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))), + new MongoTableHandle(handle.getSchemaTableName(), handle.getRemoteTableName(), handle.getFilter(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))), true, false)); } @@ -409,6 +412,7 @@ public Optional> applyFilter(C handle = new MongoTableHandle( handle.getSchemaTableName(), + handle.getRemoteTableName(), handle.getFilter(), newDomain, handle.getLimit()); @@ -464,7 +468,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema getColumnHandles(session, tableHandle).values().stream() .map(MongoColumnHandle.class::cast) .map(MongoColumnHandle::toColumnMetadata) - .collect(toImmutableList()); + .collect(toImmutableList()); return new ConnectorTableMetadata(tableName, columns, ImmutableMap.of(), mongoTable.getComment()); } diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoOutputTableHandle.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoOutputTableHandle.java index bcbeab4e4bfb..615dcd7a0759 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoOutputTableHandle.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoOutputTableHandle.java @@ -17,7 +17,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.trino.spi.connector.ConnectorOutputTableHandle; -import io.trino.spi.connector.SchemaTableName; import java.util.List; @@ -26,22 +25,22 @@ public class MongoOutputTableHandle implements ConnectorOutputTableHandle { - private final SchemaTableName schemaTableName; + private final RemoteTableName remoteTableName; private final List columns; @JsonCreator public MongoOutputTableHandle( - @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("remoteTableName") RemoteTableName remoteTableName, @JsonProperty("columns") List columns) { - this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null"); this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); } @JsonProperty - public SchemaTableName getSchemaTableName() + public RemoteTableName getRemoteTableName() { - return schemaTableName; + return remoteTableName; } @JsonProperty diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSink.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSink.java index df233935412a..73c4068f3dc2 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSink.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSink.java @@ -24,7 +24,6 @@ import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorPageSink; -import io.trino.spi.connector.SchemaTableName; import io.trino.spi.type.BigintType; import io.trino.spi.type.BooleanType; import io.trino.spi.type.CharType; @@ -85,18 +84,18 @@ public class MongoPageSink implements ConnectorPageSink { private final MongoSession mongoSession; - private final SchemaTableName schemaTableName; + private final RemoteTableName remoteTableName; private final List columns; private final String implicitPrefix; public MongoPageSink( MongoClientConfig config, MongoSession mongoSession, - SchemaTableName schemaTableName, + RemoteTableName remoteTableName, List columns) { this.mongoSession = mongoSession; - this.schemaTableName = schemaTableName; + this.remoteTableName = remoteTableName; this.columns = columns; this.implicitPrefix = requireNonNull(config.getImplicitRowFieldPrefix(), "config.getImplicitRowFieldPrefix() is null"); } @@ -104,7 +103,7 @@ public MongoPageSink( @Override public CompletableFuture appendPage(Page page) { - MongoCollection collection = mongoSession.getCollection(schemaTableName); + MongoCollection collection = mongoSession.getCollection(remoteTableName); List batch = new ArrayList<>(page.getPositionCount()); for (int position = 0; position < page.getPositionCount(); position++) { diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSinkProvider.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSinkProvider.java index 63815b63adc9..6882b64f8511 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSinkProvider.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoPageSinkProvider.java @@ -40,13 +40,13 @@ public MongoPageSinkProvider(MongoClientConfig config, MongoSession mongoSession public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId) { MongoOutputTableHandle handle = (MongoOutputTableHandle) outputTableHandle; - return new MongoPageSink(config, mongoSession, handle.getSchemaTableName(), handle.getColumns()); + return new MongoPageSink(config, mongoSession, handle.getRemoteTableName(), handle.getColumns()); } @Override public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId) { MongoInsertTableHandle handle = (MongoInsertTableHandle) insertTableHandle; - return new MongoPageSink(config, mongoSession, handle.getSchemaTableName(), handle.getColumns()); + return new MongoPageSink(config, mongoSession, handle.getRemoteTableName(), handle.getColumns()); } } diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java index 5c34d09757e2..e35b7f98b870 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java @@ -221,41 +221,41 @@ public MongoTable getTable(SchemaTableName tableName) } } - public void createTable(SchemaTableName name, List columns, Optional comment) + public void createTable(RemoteTableName name, List columns, Optional comment) { - if (!getAllSchemas().contains(name.getSchemaName())) { - throw new SchemaNotFoundException(name.getSchemaName()); + if (getAllSchemas().stream().noneMatch(schemaName -> schemaName.equalsIgnoreCase(name.getDatabaseName()))) { + throw new SchemaNotFoundException(name.getDatabaseName()); } createTableMetadata(name, columns, comment); - client.getDatabase(name.getSchemaName()).createCollection(name.getTableName()); + client.getDatabase(name.getDatabaseName()).createCollection(name.getCollectionName()); } - public void dropTable(SchemaTableName tableName) + public void dropTable(RemoteTableName remoteTableName) { - deleteTableMetadata(tableName); - getCollection(tableName).drop(); + deleteTableMetadata(remoteTableName); + getCollection(remoteTableName).drop(); - tableCache.invalidate(tableName); + tableCache.invalidate(new SchemaTableName(remoteTableName.getDatabaseName(), remoteTableName.getCollectionName())); } - public void setTableComment(SchemaTableName schemaTableName, Optional comment) + public void setTableComment(MongoTableHandle table, Optional comment) { - String schemaName = toRemoteSchemaName(schemaTableName.getSchemaName()); - String tableName = toRemoteTableName(schemaName, schemaTableName.getTableName()); + String remoteSchemaName = table.getRemoteTableName().getDatabaseName(); + String remoteTableName = table.getRemoteTableName().getCollectionName(); - Document metadata = getTableMetadata(schemaName, tableName); + Document metadata = getTableMetadata(remoteSchemaName, remoteTableName); metadata.append(COMMENT_KEY, comment.orElse(null)); - client.getDatabase(schemaName).getCollection(schemaCollection) - .findOneAndReplace(new Document(TABLE_NAME_KEY, tableName), metadata); + client.getDatabase(remoteSchemaName).getCollection(schemaCollection) + .findOneAndReplace(new Document(TABLE_NAME_KEY, remoteTableName), metadata); - tableCache.invalidate(schemaTableName); + tableCache.invalidate(table.getSchemaTableName()); } - public void setColumnComment(SchemaTableName schemaTableName, String columnName, Optional comment) + public void setColumnComment(MongoTableHandle table, String columnName, Optional comment) { - String remoteSchemaName = toRemoteSchemaName(schemaTableName.getSchemaName()); - String remoteTableName = toRemoteTableName(remoteSchemaName, schemaTableName.getTableName()); + String remoteSchemaName = table.getRemoteTableName().getDatabaseName(); + String remoteTableName = table.getRemoteTableName().getCollectionName(); Document metadata = getTableMetadata(remoteSchemaName, remoteTableName); @@ -272,13 +272,13 @@ public void setColumnComment(SchemaTableName schemaTableName, String columnName, client.getDatabase(remoteSchemaName).getCollection(schemaCollection) .findOneAndReplace(new Document(TABLE_NAME_KEY, remoteTableName), metadata); - tableCache.invalidate(schemaTableName); + tableCache.invalidate(table.getSchemaTableName()); } - public void renameTable(SchemaTableName oldName, SchemaTableName newName) + public void renameTable(MongoTableHandle table, SchemaTableName newName) { - String oldSchemaName = toRemoteSchemaName(oldName.getSchemaName()); - String oldTableName = toRemoteTableName(oldSchemaName, oldName.getTableName()); + String oldSchemaName = table.getRemoteTableName().getDatabaseName(); + String oldTableName = table.getRemoteTableName().getCollectionName(); String newSchemaName = toRemoteSchemaName(newName.getSchemaName()); // Schema collection should always have the source table definition @@ -292,18 +292,18 @@ public void renameTable(SchemaTableName oldName, SchemaTableName newName) // Need to check explicitly because the old collection may not exist when it doesn't have any data if (collectionExists(client.getDatabase(oldSchemaName), oldTableName)) { - getCollection(oldName).renameCollection(new MongoNamespace(newSchemaName, newName.getTableName())); + getCollection(table.getRemoteTableName()).renameCollection(new MongoNamespace(newSchemaName, newName.getTableName())); } - tableCache.invalidate(oldName); + tableCache.invalidate(table.getSchemaTableName()); } - public void addColumn(SchemaTableName schemaTableName, ColumnMetadata columnMetadata) + public void addColumn(MongoTableHandle table, ColumnMetadata columnMetadata) { - String schemaName = toRemoteSchemaName(schemaTableName.getSchemaName()); - String tableName = toRemoteTableName(schemaName, schemaTableName.getTableName()); + String remoteSchemaName = table.getRemoteTableName().getDatabaseName(); + String remoteTableName = table.getRemoteTableName().getCollectionName(); - Document metadata = getTableMetadata(schemaName, tableName); + Document metadata = getTableMetadata(remoteSchemaName, remoteTableName); List columns = new ArrayList<>(getColumnMetadata(metadata)); @@ -316,17 +316,17 @@ public void addColumn(SchemaTableName schemaTableName, ColumnMetadata columnMeta metadata.append(FIELDS_KEY, columns); - MongoDatabase db = client.getDatabase(schemaName); + MongoDatabase db = client.getDatabase(remoteSchemaName); MongoCollection schema = db.getCollection(schemaCollection); - schema.findOneAndReplace(new Document(TABLE_NAME_KEY, tableName), metadata); + schema.findOneAndReplace(new Document(TABLE_NAME_KEY, remoteTableName), metadata); - tableCache.invalidate(schemaTableName); + tableCache.invalidate(table.getSchemaTableName()); } - public void dropColumn(SchemaTableName schemaTableName, String columnName) + public void dropColumn(MongoTableHandle table, String columnName) { - String remoteSchemaName = toRemoteSchemaName(schemaTableName.getSchemaName()); - String remoteTableName = toRemoteTableName(remoteSchemaName, schemaTableName.getTableName()); + String remoteSchemaName = table.getRemoteTableName().getDatabaseName(); + String remoteTableName = table.getRemoteTableName().getCollectionName(); Document metadata = getTableMetadata(remoteSchemaName, remoteTableName); @@ -340,16 +340,17 @@ public void dropColumn(SchemaTableName schemaTableName, String columnName) MongoCollection schema = database.getCollection(schemaCollection); schema.findOneAndReplace(new Document(TABLE_NAME_KEY, remoteTableName), metadata); - tableCache.invalidate(schemaTableName); + tableCache.invalidate(table.getSchemaTableName()); } private MongoTable loadTableSchema(SchemaTableName schemaTableName) throws TableNotFoundException { - String schemaName = toRemoteSchemaName(schemaTableName.getSchemaName()); - String tableName = toRemoteTableName(schemaName, schemaTableName.getTableName()); + RemoteTableName remoteSchemaTableName = toRemoteSchemaTableName(schemaTableName); + String remoteSchemaName = remoteSchemaTableName.getDatabaseName(); + String remoteTableName = remoteSchemaTableName.getCollectionName(); - Document tableMeta = getTableMetadata(schemaName, tableName); + Document tableMeta = getTableMetadata(remoteSchemaName, remoteTableName); ImmutableList.Builder columnHandles = ImmutableList.builder(); @@ -358,8 +359,8 @@ private MongoTable loadTableSchema(SchemaTableName schemaTableName) columnHandles.add(columnHandle); } - MongoTableHandle tableHandle = new MongoTableHandle(schemaTableName, Optional.empty()); - return new MongoTable(tableHandle, columnHandles.build(), getIndexes(schemaName, tableName), getComment(tableMeta)); + MongoTableHandle tableHandle = new MongoTableHandle(schemaTableName, remoteSchemaTableName, Optional.empty()); + return new MongoTable(tableHandle, columnHandles.build(), getIndexes(remoteSchemaName, remoteTableName), getComment(tableMeta)); } private MongoColumnHandle buildColumnHandle(Document columnMeta) @@ -388,16 +389,9 @@ private static Optional getComment(Document doc) return Optional.ofNullable(doc.getString(COMMENT_KEY)); } - public MongoCollection getCollection(SchemaTableName tableName) - { - return getCollection(tableName.getSchemaName(), tableName.getTableName()); - } - - private MongoCollection getCollection(String schema, String table) + public MongoCollection getCollection(RemoteTableName remoteTableName) { - String schemaName = toRemoteSchemaName(schema); - String tableName = toRemoteTableName(schemaName, table); - return client.getDatabase(schemaName).getCollection(tableName); + return client.getDatabase(remoteTableName.getDatabaseName()).getCollection(remoteTableName.getCollectionName()); } public List getIndexes(String schemaName, String tableName) @@ -409,12 +403,12 @@ public List getIndexes(String schemaName, String tableName) return MongoIndex.parse(collection.listIndexes()); } - public long deleteDocuments(SchemaTableName schemaTableName, TupleDomain constraint) + public long deleteDocuments(RemoteTableName remoteTableName, TupleDomain constraint) { Document filter = buildQuery(constraint); - log.debug("Delete documents: collection: %s, filter: %s", schemaTableName, filter); + log.debug("Delete documents: collection: %s, filter: %s", remoteTableName, filter); - DeleteResult result = getCollection(schemaTableName).deleteMany(filter); + DeleteResult result = getCollection(remoteTableName).deleteMany(filter); return result.getDeletedCount(); } @@ -424,7 +418,7 @@ public MongoCursor execute(MongoTableHandle tableHandle, List collection = getCollection(tableHandle.getSchemaTableName()); + MongoCollection collection = getCollection(tableHandle.getRemoteTableName()); Document filter = buildFilter(tableHandle); FindIterable iterable = collection.find(filter).projection(output).collation(SIMPLE_COLLATION); tableHandle.getLimit().ifPresent(iterable::limit); @@ -674,13 +668,13 @@ private Set getTableMetadataNames(String schemaName) return names; } - private void createTableMetadata(SchemaTableName schemaTableName, List columns, Optional tableComment) + private void createTableMetadata(RemoteTableName remoteSchemaTableName, List columns, Optional tableComment) { - String schemaName = schemaTableName.getSchemaName(); - String tableName = schemaTableName.getTableName(); + String remoteSchemaName = remoteSchemaTableName.getDatabaseName(); + String remoteTableName = remoteSchemaTableName.getCollectionName(); - MongoDatabase db = client.getDatabase(schemaName); - Document metadata = new Document(TABLE_NAME_KEY, tableName); + MongoDatabase db = client.getDatabase(remoteSchemaName); + Document metadata = new Document(TABLE_NAME_KEY, remoteTableName); ArrayList fields = new ArrayList<>(); if (!columns.stream().anyMatch(c -> c.getName().equals("_id"))) { @@ -702,19 +696,16 @@ private void createTableMetadata(SchemaTableName schemaTableName, List constraint; private final Optional filter; private final OptionalInt limit; - public MongoTableHandle(SchemaTableName schemaTableName, Optional filter) + public MongoTableHandle(SchemaTableName schemaTableName, RemoteTableName remoteTableName, Optional filter) { - this(schemaTableName, filter, TupleDomain.all(), OptionalInt.empty()); + this(schemaTableName, remoteTableName, filter, TupleDomain.all(), OptionalInt.empty()); } @JsonCreator public MongoTableHandle( @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("remoteTableName") RemoteTableName remoteTableName, @JsonProperty("filter") Optional filter, @JsonProperty("constraint") TupleDomain constraint, @JsonProperty("limit") OptionalInt limit) { this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null"); this.filter = requireNonNull(filter, "filter is null"); this.constraint = requireNonNull(constraint, "constraint is null"); this.limit = requireNonNull(limit, "limit is null"); @@ -60,6 +63,12 @@ public SchemaTableName getSchemaTableName() return schemaTableName; } + @JsonProperty + public RemoteTableName getRemoteTableName() + { + return remoteTableName; + } + @JsonProperty public Optional getFilter() { @@ -95,6 +104,7 @@ public boolean equals(Object obj) } MongoTableHandle other = (MongoTableHandle) obj; return Objects.equals(this.schemaTableName, other.schemaTableName) && + Objects.equals(this.remoteTableName, other.remoteTableName) && Objects.equals(this.filter, other.filter) && Objects.equals(this.constraint, other.constraint) && Objects.equals(this.limit, other.limit); @@ -105,6 +115,7 @@ public String toString() { return toStringHelper(this) .add("schemaTableName", schemaTableName) + .add("remoteTableName", remoteTableName) .add("filter", filter) .add("limit", limit) .add("constraint", constraint) diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/RemoteTableName.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/RemoteTableName.java new file mode 100644 index 000000000000..221240dc32c0 --- /dev/null +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/RemoteTableName.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.mongodb; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public final class RemoteTableName +{ + private final String databaseName; + private final String collectionName; + + @JsonCreator + public RemoteTableName(@JsonProperty String databaseName, @JsonProperty String collectionName) + { + this.databaseName = requireNonNull(databaseName, "databaseName is null"); + this.collectionName = requireNonNull(collectionName, "collectionName is null"); + } + + @JsonProperty + public String getDatabaseName() + { + return databaseName; + } + + @JsonProperty + public String getCollectionName() + { + return collectionName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RemoteTableName that = (RemoteTableName) o; + return databaseName.equals(that.databaseName) && + collectionName.equals(that.collectionName); + } + + @Override + public int hashCode() + { + return Objects.hash(databaseName, collectionName); + } + + @Override + public String toString() + { + return databaseName + "." + collectionName; + } +} diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/ptf/Query.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/ptf/Query.java index d1fd37af1430..d3b5f835464a 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/ptf/Query.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/ptf/Query.java @@ -21,6 +21,7 @@ import io.trino.plugin.mongodb.MongoMetadata; import io.trino.plugin.mongodb.MongoSession; import io.trino.plugin.mongodb.MongoTableHandle; +import io.trino.plugin.mongodb.RemoteTableName; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnSchema; @@ -115,8 +116,9 @@ public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransact if (!collection.equals(collection.toLowerCase(ENGLISH))) { throw new TrinoException(INVALID_FUNCTION_ARGUMENT, "Only lowercase collection name is supported"); } + RemoteTableName remoteTableName = new RemoteTableName(database, collection); - MongoTableHandle tableHandle = new MongoTableHandle(new SchemaTableName(database, collection), Optional.of(parseFilter(filter))); + MongoTableHandle tableHandle = new MongoTableHandle(new SchemaTableName(database, collection), remoteTableName, Optional.of(parseFilter(filter))); ConnectorTableSchema tableSchema = metadata.getTableSchema(session, tableHandle); Map columnsByName = metadata.getColumnHandles(session, tableHandle); List columns = tableSchema.getColumns().stream() diff --git a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoTableHandle.java b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoTableHandle.java index 200952ffbb10..9792b72989d1 100644 --- a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoTableHandle.java +++ b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoTableHandle.java @@ -29,7 +29,22 @@ public class TestMongoTableHandle @Test public void testRoundTripWithoutQuery() { - MongoTableHandle expected = new MongoTableHandle(new SchemaTableName("schema", "table"), Optional.empty()); + SchemaTableName schemaTableName = new SchemaTableName("schema", "table"); + RemoteTableName remoteTableName = new RemoteTableName("schema", "table"); + MongoTableHandle expected = new MongoTableHandle(schemaTableName, remoteTableName, Optional.empty()); + + String json = codec.toJson(expected); + MongoTableHandle actual = codec.fromJson(json); + + assertEquals(actual.getSchemaTableName(), expected.getSchemaTableName()); + } + + @Test + public void testRoundTripNonLowercaseWithoutQuery() + { + SchemaTableName schemaTableName = new SchemaTableName("schema", "table"); + RemoteTableName remoteTableName = new RemoteTableName("Schema", "Table"); + MongoTableHandle expected = new MongoTableHandle(schemaTableName, remoteTableName, Optional.empty()); String json = codec.toJson(expected); MongoTableHandle actual = codec.fromJson(json); @@ -40,7 +55,9 @@ public void testRoundTripWithoutQuery() @Test public void testRoundTripWithQuery() { - MongoTableHandle expected = new MongoTableHandle(new SchemaTableName("schema", "table"), Optional.of(new Document("key", "value"))); + SchemaTableName schemaTableName = new SchemaTableName("schema", "table"); + RemoteTableName remoteTableName = new RemoteTableName("schema", "table"); + MongoTableHandle expected = new MongoTableHandle(schemaTableName, remoteTableName, Optional.of(new Document("key", "value"))); String json = codec.toJson(expected); MongoTableHandle actual = codec.fromJson(json); From 40fa9d39eb666c1d06addf0f885f64e6e2cd1fbd Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 23 Nov 2022 07:05:20 +0900 Subject: [PATCH 2/2] Simplify MongoMetadata.getTableMetadata --- .../io/trino/plugin/mongodb/MongoMetadata.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java index 19583ce9ff17..cf18b6bf4e6d 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java @@ -129,7 +129,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect { requireNonNull(tableHandle, "tableHandle is null"); SchemaTableName tableName = getTableName(tableHandle); - return getTableMetadata(session, tableName); + return getTableMetadata(tableName); } @Override @@ -166,7 +166,7 @@ public Map> listTableColumns(ConnectorSess ImmutableMap.Builder> columns = ImmutableMap.builder(); for (SchemaTableName tableName : listTables(session, prefix)) { try { - columns.put(tableName, getTableMetadata(session, tableName).getColumns()); + columns.put(tableName, getTableMetadata(tableName).getColumns()); } catch (NotFoundException e) { // table disappeared during listing operation @@ -459,16 +459,13 @@ private static SchemaTableName getTableName(ConnectorTableHandle tableHandle) return ((MongoTableHandle) tableHandle).getSchemaTableName(); } - private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName) + private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) { MongoTable mongoTable = mongoSession.getTable(tableName); - MongoTableHandle tableHandle = mongoTable.getTableHandle(); - List columns = - getColumnHandles(session, tableHandle).values().stream() - .map(MongoColumnHandle.class::cast) - .map(MongoColumnHandle::toColumnMetadata) - .collect(toImmutableList()); + List columns = mongoTable.getColumns().stream() + .map(MongoColumnHandle::toColumnMetadata) + .collect(toImmutableList()); return new ConnectorTableMetadata(tableName, columns, ImmutableMap.of(), mongoTable.getComment()); }