Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RemoteTableName class to MongoDB #15114

Merged
merged 2 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;

Expand All @@ -26,22 +25,22 @@
public class MongoInsertTableHandle
implements ConnectorInsertTableHandle
{
private final SchemaTableName schemaTableName;
private final RemoteTableName remoteTableName;
private final List<MongoColumnHandle> columns;

@JsonCreator
public MongoInsertTableHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("columns") List<MongoColumnHandle> columns)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
}

@JsonProperty
public SchemaTableName getSchemaTableName()
public RemoteTableName getRemoteTableName()
{
return schemaTableName;
return remoteTableName;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
{
requireNonNull(tableHandle, "tableHandle is null");
SchemaTableName tableName = getTableName(tableHandle);
return getTableMetadata(session, tableName);
return getTableMetadata(tableName);
}

@Override
Expand Down Expand Up @@ -166,7 +166,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
for (SchemaTableName tableName : listTables(session, prefix)) {
try {
columns.put(tableName, getTableMetadata(session, tableName).getColumns());
columns.put(tableName, getTableMetadata(tableName).getColumns());
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
}
catch (NotFoundException e) {
// table disappeared during listing operation
Expand All @@ -192,30 +192,31 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
mongoSession.createTable(tableMetadata.getTable(), buildColumnHandles(tableMetadata), tableMetadata.getComment());
RemoteTableName remoteTableName = mongoSession.toRemoteSchemaTableName(tableMetadata.getTable());
mongoSession.createTable(remoteTableName, buildColumnHandles(tableMetadata), tableMetadata.getComment());
}

@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
MongoTableHandle table = (MongoTableHandle) tableHandle;

mongoSession.dropTable(table.getSchemaTableName());
mongoSession.dropTable(table.getRemoteTableName());
}

@Override
public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<String> comment)
{
MongoTableHandle table = (MongoTableHandle) tableHandle;
mongoSession.setTableComment(table.getSchemaTableName(), comment);
mongoSession.setTableComment(table, comment);
}

@Override
public void setColumnComment(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, Optional<String> comment)
{
MongoTableHandle table = (MongoTableHandle) tableHandle;
MongoColumnHandle column = (MongoColumnHandle) columnHandle;
mongoSession.setColumnComment(table.getSchemaTableName(), column.getName(), comment);
mongoSession.setColumnComment(table, column.getName(), comment);
}

@Override
Expand All @@ -225,19 +226,19 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
throw new TrinoException(NOT_SUPPORTED, format("Qualified identifier name must be shorter than or equal to '%s' bytes: '%s'", MAX_QUALIFIED_IDENTIFIER_BYTE_LENGTH, newTableName));
}
MongoTableHandle table = (MongoTableHandle) tableHandle;
mongoSession.renameTable(table.getSchemaTableName(), newTableName);
mongoSession.renameTable(table, newTableName);
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
{
mongoSession.addColumn(((MongoTableHandle) tableHandle).getSchemaTableName(), column);
mongoSession.addColumn(((MongoTableHandle) tableHandle), column);
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column)
{
mongoSession.dropColumn(((MongoTableHandle) tableHandle).getSchemaTableName(), ((MongoColumnHandle) column).getName());
mongoSession.dropColumn(((MongoTableHandle) tableHandle), ((MongoColumnHandle) column).getName());
}

@Override
Expand All @@ -247,14 +248,16 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}

RemoteTableName remoteTableName = mongoSession.toRemoteSchemaTableName(tableMetadata.getTable());

List<MongoColumnHandle> columns = buildColumnHandles(tableMetadata);

mongoSession.createTable(tableMetadata.getTable(), columns, tableMetadata.getComment());
mongoSession.createTable(remoteTableName, columns, tableMetadata.getComment());

setRollback(() -> mongoSession.dropTable(tableMetadata.getTable()));
setRollback(() -> mongoSession.dropTable(remoteTableName));

return new MongoOutputTableHandle(
tableMetadata.getTable(),
remoteTableName,
columns.stream().filter(c -> !c.isHidden()).collect(toList()));
}

Expand All @@ -276,7 +279,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
List<MongoColumnHandle> columns = mongoSession.getTable(table.getSchemaTableName()).getColumns();

return new MongoInsertTableHandle(
table.getSchemaTableName(),
table.getRemoteTableName(),
columns.stream()
.filter(column -> !column.isHidden())
.peek(column -> validateColumnNameForInsert(column.getName()))
Expand Down Expand Up @@ -312,7 +315,7 @@ public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, Conn
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle)
{
MongoTableHandle table = (MongoTableHandle) handle;
return OptionalLong.of(mongoSession.deleteDocuments(table.getSchemaTableName(), table.getConstraint()));
return OptionalLong.of(mongoSession.deleteDocuments(table.getRemoteTableName(), table.getConstraint()));
}

@Override
Expand Down Expand Up @@ -365,7 +368,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
}

return Optional.of(new LimitApplicationResult<>(
new MongoTableHandle(handle.getSchemaTableName(), handle.getFilter(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))),
new MongoTableHandle(handle.getSchemaTableName(), handle.getRemoteTableName(), handle.getFilter(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))),
true,
false));
}
Expand Down Expand Up @@ -409,6 +412,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

handle = new MongoTableHandle(
handle.getSchemaTableName(),
handle.getRemoteTableName(),
handle.getFilter(),
newDomain,
handle.getLimit());
Expand Down Expand Up @@ -455,15 +459,12 @@ private static SchemaTableName getTableName(ConnectorTableHandle tableHandle)
return ((MongoTableHandle) tableHandle).getSchemaTableName();
}

private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName)
private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
{
MongoTable mongoTable = mongoSession.getTable(tableName);
MongoTableHandle tableHandle = mongoTable.getTableHandle();

List<ColumnMetadata> columns =
getColumnHandles(session, tableHandle).values().stream()
.map(MongoColumnHandle.class::cast)
.map(MongoColumnHandle::toColumnMetadata)
List<ColumnMetadata> columns = mongoTable.getColumns().stream()
.map(MongoColumnHandle::toColumnMetadata)
.collect(toImmutableList());

return new ConnectorTableMetadata(tableName, columns, ImmutableMap.of(), mongoTable.getComment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;

Expand All @@ -26,22 +25,22 @@
public class MongoOutputTableHandle
implements ConnectorOutputTableHandle
{
private final SchemaTableName schemaTableName;
private final RemoteTableName remoteTableName;
private final List<MongoColumnHandle> columns;

@JsonCreator
public MongoOutputTableHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("columns") List<MongoColumnHandle> columns)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
}

@JsonProperty
public SchemaTableName getSchemaTableName()
public RemoteTableName getRemoteTableName()
{
return schemaTableName;
return remoteTableName;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.CharType;
Expand Down Expand Up @@ -85,26 +84,26 @@ public class MongoPageSink
implements ConnectorPageSink
{
private final MongoSession mongoSession;
private final SchemaTableName schemaTableName;
private final RemoteTableName remoteTableName;
private final List<MongoColumnHandle> columns;
private final String implicitPrefix;

public MongoPageSink(
MongoClientConfig config,
MongoSession mongoSession,
SchemaTableName schemaTableName,
RemoteTableName remoteTableName,
List<MongoColumnHandle> columns)
{
this.mongoSession = mongoSession;
this.schemaTableName = schemaTableName;
this.remoteTableName = remoteTableName;
this.columns = columns;
this.implicitPrefix = requireNonNull(config.getImplicitRowFieldPrefix(), "config.getImplicitRowFieldPrefix() is null");
}

@Override
public CompletableFuture<?> appendPage(Page page)
{
MongoCollection<Document> collection = mongoSession.getCollection(schemaTableName);
MongoCollection<Document> collection = mongoSession.getCollection(remoteTableName);
List<Document> batch = new ArrayList<>(page.getPositionCount());

for (int position = 0; position < page.getPositionCount(); position++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ public MongoPageSinkProvider(MongoClientConfig config, MongoSession mongoSession
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId)
{
MongoOutputTableHandle handle = (MongoOutputTableHandle) outputTableHandle;
return new MongoPageSink(config, mongoSession, handle.getSchemaTableName(), handle.getColumns());
return new MongoPageSink(config, mongoSession, handle.getRemoteTableName(), handle.getColumns());
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId)
{
MongoInsertTableHandle handle = (MongoInsertTableHandle) insertTableHandle;
return new MongoPageSink(config, mongoSession, handle.getSchemaTableName(), handle.getColumns());
return new MongoPageSink(config, mongoSession, handle.getRemoteTableName(), handle.getColumns());
}
}
Loading