Skip to content

Commit

Permalink
Introduce RemoteTableName class to MongoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Nov 23, 2022
1 parent fcd603d commit 24d7f87
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;

Expand All @@ -26,22 +25,22 @@
public class MongoInsertTableHandle
implements ConnectorInsertTableHandle
{
private final SchemaTableName schemaTableName;
private final RemoteTableName remoteTableName;
private final List<MongoColumnHandle> columns;

@JsonCreator
public MongoInsertTableHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("columns") List<MongoColumnHandle> columns)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
}

@JsonProperty
public SchemaTableName getSchemaTableName()
public RemoteTableName getRemoteTableName()
{
return schemaTableName;
return remoteTableName;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,30 +192,31 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
mongoSession.createTable(tableMetadata.getTable(), buildColumnHandles(tableMetadata), tableMetadata.getComment());
RemoteTableName remoteTableName = mongoSession.toRemoteSchemaTableName(tableMetadata.getTable());
mongoSession.createTable(remoteTableName, buildColumnHandles(tableMetadata), tableMetadata.getComment());
}

@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
MongoTableHandle table = (MongoTableHandle) tableHandle;

mongoSession.dropTable(table.getSchemaTableName());
mongoSession.dropTable(table.getRemoteTableName());
}

@Override
public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<String> comment)
{
MongoTableHandle table = (MongoTableHandle) tableHandle;
mongoSession.setTableComment(table.getSchemaTableName(), comment);
mongoSession.setTableComment(table, comment);
}

@Override
public void setColumnComment(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, Optional<String> comment)
{
MongoTableHandle table = (MongoTableHandle) tableHandle;
MongoColumnHandle column = (MongoColumnHandle) columnHandle;
mongoSession.setColumnComment(table.getSchemaTableName(), column.getName(), comment);
mongoSession.setColumnComment(table, column.getName(), comment);
}

@Override
Expand All @@ -225,19 +226,19 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
throw new TrinoException(NOT_SUPPORTED, format("Qualified identifier name must be shorter than or equal to '%s' bytes: '%s'", MAX_QUALIFIED_IDENTIFIER_BYTE_LENGTH, newTableName));
}
MongoTableHandle table = (MongoTableHandle) tableHandle;
mongoSession.renameTable(table.getSchemaTableName(), newTableName);
mongoSession.renameTable(table, newTableName);
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
{
mongoSession.addColumn(((MongoTableHandle) tableHandle).getSchemaTableName(), column);
mongoSession.addColumn(((MongoTableHandle) tableHandle), column);
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column)
{
mongoSession.dropColumn(((MongoTableHandle) tableHandle).getSchemaTableName(), ((MongoColumnHandle) column).getName());
mongoSession.dropColumn(((MongoTableHandle) tableHandle), ((MongoColumnHandle) column).getName());
}

@Override
Expand All @@ -247,14 +248,16 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}

RemoteTableName remoteTableName = mongoSession.toRemoteSchemaTableName(tableMetadata.getTable());

List<MongoColumnHandle> columns = buildColumnHandles(tableMetadata);

mongoSession.createTable(tableMetadata.getTable(), columns, tableMetadata.getComment());
mongoSession.createTable(remoteTableName, columns, tableMetadata.getComment());

setRollback(() -> mongoSession.dropTable(tableMetadata.getTable()));
setRollback(() -> mongoSession.dropTable(remoteTableName));

return new MongoOutputTableHandle(
tableMetadata.getTable(),
remoteTableName,
columns.stream().filter(c -> !c.isHidden()).collect(toList()));
}

Expand All @@ -276,7 +279,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
List<MongoColumnHandle> columns = mongoSession.getTable(table.getSchemaTableName()).getColumns();

return new MongoInsertTableHandle(
table.getSchemaTableName(),
table.getRemoteTableName(),
columns.stream()
.filter(column -> !column.isHidden())
.peek(column -> validateColumnNameForInsert(column.getName()))
Expand Down Expand Up @@ -318,7 +321,7 @@ public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, Conn
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle)
{
MongoTableHandle table = (MongoTableHandle) handle;
return OptionalLong.of(mongoSession.deleteDocuments(table.getSchemaTableName(), table.getConstraint()));
return OptionalLong.of(mongoSession.deleteDocuments(table.getRemoteTableName(), table.getConstraint()));
}

@Override
Expand Down Expand Up @@ -371,7 +374,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
}

return Optional.of(new LimitApplicationResult<>(
new MongoTableHandle(handle.getSchemaTableName(), handle.getFilter(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))),
new MongoTableHandle(handle.getSchemaTableName(), handle.getRemoteTableName(), handle.getFilter(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))),
true,
false));
}
Expand Down Expand Up @@ -415,6 +418,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

handle = new MongoTableHandle(
handle.getSchemaTableName(),
handle.getRemoteTableName(),
handle.getFilter(),
newDomain,
handle.getLimit());
Expand Down Expand Up @@ -470,7 +474,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
getColumnHandles(session, tableHandle).values().stream()
.map(MongoColumnHandle.class::cast)
.map(MongoColumnHandle::toColumnMetadata)
.collect(toImmutableList());
.collect(toImmutableList());

return new ConnectorTableMetadata(tableName, columns, ImmutableMap.of(), mongoTable.getComment());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;

Expand All @@ -26,22 +25,22 @@
public class MongoOutputTableHandle
implements ConnectorOutputTableHandle
{
private final SchemaTableName schemaTableName;
private final RemoteTableName remoteTableName;
private final List<MongoColumnHandle> columns;

@JsonCreator
public MongoOutputTableHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("columns") List<MongoColumnHandle> columns)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
}

@JsonProperty
public SchemaTableName getSchemaTableName()
public RemoteTableName getRemoteTableName()
{
return schemaTableName;
return remoteTableName;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.CharType;
Expand Down Expand Up @@ -85,26 +84,26 @@ public class MongoPageSink
implements ConnectorPageSink
{
private final MongoSession mongoSession;
private final SchemaTableName schemaTableName;
private final RemoteTableName remoteTableName;
private final List<MongoColumnHandle> columns;
private final String implicitPrefix;

public MongoPageSink(
MongoClientConfig config,
MongoSession mongoSession,
SchemaTableName schemaTableName,
RemoteTableName remoteTableName,
List<MongoColumnHandle> columns)
{
this.mongoSession = mongoSession;
this.schemaTableName = schemaTableName;
this.remoteTableName = remoteTableName;
this.columns = columns;
this.implicitPrefix = requireNonNull(config.getImplicitRowFieldPrefix(), "config.getImplicitRowFieldPrefix() is null");
}

@Override
public CompletableFuture<?> appendPage(Page page)
{
MongoCollection<Document> collection = mongoSession.getCollection(schemaTableName);
MongoCollection<Document> collection = mongoSession.getCollection(remoteTableName);
List<Document> batch = new ArrayList<>(page.getPositionCount());

for (int position = 0; position < page.getPositionCount(); position++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ public MongoPageSinkProvider(MongoClientConfig config, MongoSession mongoSession
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId)
{
MongoOutputTableHandle handle = (MongoOutputTableHandle) outputTableHandle;
return new MongoPageSink(config, mongoSession, handle.getSchemaTableName(), handle.getColumns());
return new MongoPageSink(config, mongoSession, handle.getRemoteTableName(), handle.getColumns());
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId)
{
MongoInsertTableHandle handle = (MongoInsertTableHandle) insertTableHandle;
return new MongoPageSink(config, mongoSession, handle.getSchemaTableName(), handle.getColumns());
return new MongoPageSink(config, mongoSession, handle.getRemoteTableName(), handle.getColumns());
}
}
Loading

0 comments on commit 24d7f87

Please sign in to comment.