Skip to content

Commit

Permalink
Fault Tolerant Execution for PostgreSQL and MySQL connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
mwd410 authored and losipiuk committed Oct 22, 2022
1 parent f546865 commit 5579e70
Show file tree
Hide file tree
Showing 33 changed files with 965 additions and 100 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
run: |
set -x
# Fake identity to tag rebased commits with.
# These commits will be discarded, but the important part is that `git rebase` executes commands between all commits.
# These commits will be discarded, but the important part is that `git rebase` executes commands between all commits.
git config user.name "Compile all commits builder"
git config user.email "compile-all-commits@trino.io"
# Show the entire PR branch and the base ref all the way to the fork point.
Expand Down Expand Up @@ -477,6 +477,8 @@ jobs:
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-hive-1 }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-hive-2 }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-iceberg }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-postgresql }
- { modules: testing/trino-faulttolerant-tests, profile: test-fault-tolerant-mysql }
- { modules: testing/trino-tests }
EOF
./.github/bin/build-matrix-from-impacted.py -v -i gib-impacted.log -m .github/test-matrix.yaml -o matrix.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorMergeSink;
import io.trino.split.PageSinkId;
import io.trino.split.PageSinkManager;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableWriterNode.MergeTarget;
Expand Down Expand Up @@ -59,7 +60,7 @@ public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, MergeWriterOperator.class.getSimpleName());
ConnectorMergeSink mergeSink = pageSinkManager.createMergeSink(session, target.getMergeHandle().orElseThrow());
ConnectorMergeSink mergeSink = pageSinkManager.createMergeSink(session, target.getMergeHandle().orElseThrow(), PageSinkId.fromTaskId(driverContext.getTaskId()));
return new MergeWriterOperator(context, mergeSink, pagePreprocessor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.type.Type;
import io.trino.split.PageSinkId;
import io.trino.split.PageSinkManager;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableWriterNode;
Expand Down Expand Up @@ -112,22 +113,22 @@ public Operator createOperator(DriverContext driverContext)
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableWriterOperator.class.getSimpleName());
Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext);
boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session);
return new TableWriterOperator(context, createPageSink(), columnChannels, statisticsAggregationOperator, types, statisticsCpuTimerEnabled);
return new TableWriterOperator(context, createPageSink(driverContext), columnChannels, statisticsAggregationOperator, types, statisticsCpuTimerEnabled);
}

private ConnectorPageSink createPageSink()
private ConnectorPageSink createPageSink(DriverContext driverContext)
{
if (target instanceof CreateTarget) {
return pageSinkManager.createPageSink(session, ((CreateTarget) target).getHandle());
return pageSinkManager.createPageSink(session, ((CreateTarget) target).getHandle(), PageSinkId.fromTaskId(driverContext.getTaskId()));
}
if (target instanceof InsertTarget) {
return pageSinkManager.createPageSink(session, ((InsertTarget) target).getHandle());
return pageSinkManager.createPageSink(session, ((InsertTarget) target).getHandle(), PageSinkId.fromTaskId(driverContext.getTaskId()));
}
if (target instanceof TableWriterNode.RefreshMaterializedViewTarget) {
return pageSinkManager.createPageSink(session, ((TableWriterNode.RefreshMaterializedViewTarget) target).getInsertHandle());
return pageSinkManager.createPageSink(session, ((TableWriterNode.RefreshMaterializedViewTarget) target).getInsertHandle(), PageSinkId.fromTaskId(driverContext.getTaskId()));
}
if (target instanceof TableWriterNode.TableExecuteTarget) {
return pageSinkManager.createPageSink(session, ((TableWriterNode.TableExecuteTarget) target).getExecuteHandle());
return pageSinkManager.createPageSink(session, ((TableWriterNode.TableExecuteTarget) target).getExecuteHandle(), PageSinkId.fromTaskId(driverContext.getTaskId()));
}
throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName());
}
Expand Down
47 changes: 47 additions & 0 deletions core/trino-main/src/main/java/io/trino/split/PageSinkId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.split;

import io.trino.execution.TaskId;
import io.trino.spi.connector.ConnectorPageSinkId;

import static com.google.common.base.Preconditions.checkArgument;

public class PageSinkId
implements ConnectorPageSinkId
{
private final long id;

public static PageSinkId fromTaskId(TaskId taskId)
{
long stageId = taskId.getStageId().getId();
long partitionId = taskId.getPartitionId();
checkArgument(partitionId == (partitionId & 0x00FFFFFF), "partitionId is out of allowable range");
long attemptId = taskId.getAttemptId();
checkArgument(attemptId == (attemptId & 0xFF), "attemptId is out of allowable range");
long id = (stageId << 32) + (partitionId << 8) + attemptId;
return new PageSinkId(id);
}

private PageSinkId(long id)
{
this.id = id;
}

@Override
public long getId()
{
return this.id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.ConnectorMergeSink;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;

Expand All @@ -42,35 +43,35 @@ public PageSinkManager(CatalogServiceProvider<ConnectorPageSinkProvider> pageSin
}

@Override
public ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle)
public ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getCatalogHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkId);
}

@Override
public ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle)
public ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
// assumes connectorId and catalog are the same
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getCatalogHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkId);
}

@Override
public ConnectorPageSink createPageSink(Session session, TableExecuteHandle tableHandle)
public ConnectorPageSink createPageSink(Session session, TableExecuteHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
// assumes connectorId and catalog are the same
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getCatalogHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle());
return providerFor(tableHandle.getCatalogHandle()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkId);
}

@Override
public ConnectorMergeSink createMergeSink(Session session, MergeHandle mergeHandle)
public ConnectorMergeSink createMergeSink(Session session, MergeHandle mergeHandle, ConnectorPageSinkId pageSinkId)
{
// assumes connectorId and catalog are the same
TableHandle tableHandle = mergeHandle.getTableHandle();
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getCatalogHandle());
return providerFor(tableHandle.getCatalogHandle()).createMergeSink(tableHandle.getTransaction(), connectorSession, mergeHandle.getConnectorMergeHandle());
return providerFor(tableHandle.getCatalogHandle()).createMergeSink(tableHandle.getTransaction(), connectorSession, mergeHandle.getConnectorMergeHandle(), pageSinkId);
}

private ConnectorPageSinkProvider providerFor(CatalogHandle catalogHandle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@
import io.trino.metadata.TableExecuteHandle;
import io.trino.spi.connector.ConnectorMergeSink;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;

public interface PageSinkProvider
{
/*
* Used for CTAS
*/
ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle);
ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId);

/*
* Used to insert into an existing table
*/
ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle);
ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId);

ConnectorPageSink createPageSink(Session session, TableExecuteHandle tableHandle);
ConnectorPageSink createPageSink(Session session, TableExecuteHandle tableHandle, ConnectorPageSinkId pageSinkId);

/*
* Used to write the result of SQL MERGE to an existing table
*/
ConnectorMergeSink createMergeSink(Session session, MergeHandle mergeHandle);
ConnectorMergeSink createMergeSink(Session session, MergeHandle mergeHandle, ConnectorPageSinkId pageSinkId);
}
58 changes: 58 additions & 0 deletions core/trino-main/src/test/java/io/trino/split/TestPageSinkId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.split;

import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;
import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;

public class TestPageSinkId
{
private PageSinkId fromTaskId(int stageId, int partitionId, int attemptId)
{
return PageSinkId.fromTaskId(new TaskId(new StageId(new QueryId("query"), stageId), partitionId, attemptId));
}

@Test
public void testFromTaskId()
{
PageSinkId pageSinkId = fromTaskId(1, 2, 3);
long expected = (1L << 32) + (2L << 8) + 3L;
assertEquals(pageSinkId.getId(), expected);
}

@Test
public void testFromTaskIdChecks()
{
assertThatThrownBy(() -> {
fromTaskId(1, 1 << 24, 3);
}).hasMessageContaining("partitionId is out of allowable range");

assertThatThrownBy(() -> {
fromTaskId(1, -1, 3);
}).hasMessageContaining("partitionId is negative");

assertThatThrownBy(() -> {
fromTaskId(1, 2, 256);
}).hasMessageContaining("attemptId is out of allowable range");

assertThatThrownBy(() -> {
fromTaskId(1, 2, -1);
}).hasMessageContaining("attemptId is negative");
}
}
Loading

0 comments on commit 5579e70

Please sign in to comment.