Skip to content

Commit

Permalink
Support retries in Memory connector
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Mar 2, 2022
1 parent 9d67738 commit 17ef770
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SchemaNotFoundException;
Expand Down Expand Up @@ -68,6 +69,7 @@
import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.SampleType.SYSTEM;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -214,12 +216,12 @@ public synchronized void renameTable(ConnectorSession session, ConnectorTableHan
@Override
public synchronized void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
ConnectorOutputTableHandle outputTableHandle = beginCreateTable(session, tableMetadata, Optional.empty());
ConnectorOutputTableHandle outputTableHandle = beginCreateTable(session, tableMetadata, Optional.empty(), NO_RETRIES);
finishCreateTable(session, outputTableHandle, ImmutableList.of(), ImmutableList.of());
}

@Override
public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout)
public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
checkSchemaExists(tableMetadata.getTable().getSchemaName());
checkTableNotExists(tableMetadata.getTable());
Expand Down Expand Up @@ -272,7 +274,7 @@ public synchronized Optional<ConnectorOutputMetadata> finishCreateTable(Connecto
}

@Override
public synchronized MemoryInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns)
public synchronized MemoryInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
{
MemoryTableHandle memoryTableHandle = (MemoryTableHandle) tableHandle;
return new MemoryInsertTableHandle(memoryTableHandle.getId(), ImmutableSet.copyOf(tableIds.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@

import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -84,7 +86,7 @@ private static class MemoryPageSink
private final MemoryPagesStore pagesStore;
private final HostAddress currentHostAddress;
private final long tableId;
private long addedRows;
private final List<Page> appendedPages = new ArrayList<>();

public MemoryPageSink(MemoryPagesStore pagesStore, HostAddress currentHostAddress, long tableId)
{
Expand All @@ -96,14 +98,20 @@ public MemoryPageSink(MemoryPagesStore pagesStore, HostAddress currentHostAddres
@Override
public CompletableFuture<?> appendPage(Page page)
{
pagesStore.add(tableId, page);
addedRows += page.getPositionCount();
appendedPages.add(page);
return NOT_BLOCKED;
}

@Override
public CompletableFuture<Collection<Slice>> finish()
{
// add pages to pagesStore
long addedRows = 0;
for (Page page : appendedPages) {
pagesStore.add(tableId, page);
addedRows += page.getPositionCount();
}

return completedFuture(ImmutableList.of(new MemoryDataFragment(currentHostAddress, addedRows).toSlice()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.security.PrincipalType.USER;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
Expand Down Expand Up @@ -70,7 +71,8 @@ public void tableIsCreatedAfterCommits()
ConnectorOutputTableHandle table = metadata.beginCreateTable(
SESSION,
new ConnectorTableMetadata(schemaTableName, ImmutableList.of(), ImmutableMap.of()),
Optional.empty());
Optional.empty(),
NO_RETRIES);

metadata.finishCreateTable(SESSION, table, ImmutableList.of(), ImmutableList.of());

Expand Down Expand Up @@ -111,7 +113,7 @@ public void testActiveTableIds()
MemoryTableHandle firstTableHandle = (MemoryTableHandle) metadata.getTableHandle(SESSION, firstTableName);
long firstTableId = firstTableHandle.getId();

assertTrue(metadata.beginInsert(SESSION, firstTableHandle, ImmutableList.of()).getActiveTableIds().contains(firstTableId));
assertTrue(metadata.beginInsert(SESSION, firstTableHandle, ImmutableList.of(), NO_RETRIES).getActiveTableIds().contains(firstTableId));

SchemaTableName secondTableName = new SchemaTableName("default", "second_table");
metadata.createTable(SESSION, new ConnectorTableMetadata(secondTableName, ImmutableList.of(), ImmutableMap.of()), false);
Expand All @@ -120,8 +122,8 @@ public void testActiveTableIds()
long secondTableId = secondTableHandle.getId();

assertNotEquals(firstTableId, secondTableId);
assertTrue(metadata.beginInsert(SESSION, secondTableHandle, ImmutableList.of()).getActiveTableIds().contains(firstTableId));
assertTrue(metadata.beginInsert(SESSION, secondTableHandle, ImmutableList.of()).getActiveTableIds().contains(secondTableId));
assertTrue(metadata.beginInsert(SESSION, secondTableHandle, ImmutableList.of(), NO_RETRIES).getActiveTableIds().contains(firstTableId));
assertTrue(metadata.beginInsert(SESSION, secondTableHandle, ImmutableList.of(), NO_RETRIES).getActiveTableIds().contains(secondTableId));
}

@Test
Expand All @@ -134,7 +136,8 @@ public void testReadTableBeforeCreationCompleted()
ConnectorOutputTableHandle table = metadata.beginCreateTable(
SESSION,
new ConnectorTableMetadata(tableName, ImmutableList.of(), ImmutableMap.of()),
Optional.empty());
Optional.empty(),
NO_RETRIES);

List<SchemaTableName> tableNames = metadata.listTables(SESSION, Optional.empty());
assertEquals(tableNames.size(), 1, "Expected exactly one table");
Expand Down Expand Up @@ -277,7 +280,11 @@ public void testCreateTableAndViewInNotExistSchema()
assertEquals(metadata.listSchemaNames(SESSION), ImmutableList.of("default"));

SchemaTableName table1 = new SchemaTableName("test1", "test_schema_table1");
assertTrinoExceptionThrownBy(() -> metadata.beginCreateTable(SESSION, new ConnectorTableMetadata(table1, ImmutableList.of(), ImmutableMap.of()), Optional.empty()))
assertTrinoExceptionThrownBy(() -> metadata.beginCreateTable(
SESSION,
new ConnectorTableMetadata(table1, ImmutableList.of(), ImmutableMap.of()),
Optional.empty(),
NO_RETRIES))
.hasErrorCode(NOT_FOUND)
.hasMessage("Schema test1 not found");
assertNull(metadata.getTableHandle(SESSION, table1));
Expand Down Expand Up @@ -305,7 +312,8 @@ public void testRenameTable()
ConnectorOutputTableHandle table = metadata.beginCreateTable(
SESSION,
new ConnectorTableMetadata(tableName, ImmutableList.of(), ImmutableMap.of()),
Optional.empty());
Optional.empty(),
NO_RETRIES);
metadata.finishCreateTable(SESSION, table, ImmutableList.of(), ImmutableList.of());

// rename table to schema which does not exist
Expand Down

0 comments on commit 17ef770

Please sign in to comment.