Skip to content

Commit

Permalink
Connectors use merge for delete and update
Browse files Browse the repository at this point in the history
Update the connectors that implement DELETE and/or UPDATE to use the
SQL MERGE machinery instead, and change the static booleans to plan
DELETE and UPDATE using merge plumbing.

Fix/add tests as appropriate.
  • Loading branch information
djsstarburst authored and electrum committed Nov 23, 2022
1 parent e052fad commit fcd603d
Show file tree
Hide file tree
Showing 51 changed files with 662 additions and 364 deletions.
2 changes: 1 addition & 1 deletion core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class FeaturesConfig
static final String SPILL_ENABLED = "spill-enabled";
public static final String SPILLER_SPILL_PATH = "spiller-spill-path";

private boolean legacyUpdateDeleteImplementation = true;
private boolean legacyUpdateDeleteImplementation;

private boolean redistributeWrites = true;
private boolean scaleWriters = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
{
return new MockPageSink();
}

@Override
public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
{
return new MockPageSink();
}
}

private static class MockPageSink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TestFeaturesConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(FeaturesConfig.class)
.setLegacyUpdateDeleteImplementation(true)
.setLegacyUpdateDeleteImplementation(false)
.setLegacyCatalogRoles(false)
.setRedistributeWrites(true)
.setScaleWriters(true)
Expand Down Expand Up @@ -73,7 +73,7 @@ public void testDefaults()
public void testExplicitPropertyMappings()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("deprecated.legacy-update-delete-implementation", "false")
.put("deprecated.legacy-update-delete-implementation", "true")
.put("redistribute-writes", "false")
.put("scale-writers", "false")
.put("writer-min-size", "42GB")
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testExplicitPropertyMappings()
.buildOrThrow();

FeaturesConfig expected = new FeaturesConfig()
.setLegacyUpdateDeleteImplementation(false)
.setLegacyUpdateDeleteImplementation(true)
.setRedistributeWrites(false)
.setScaleWriters(false)
.setWriterMinSize(DataSize.of(42, GIGABYTE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@

public interface ConnectorMetadata
{
String MODIFYING_ROWS_MESSAGE = "This connector does not support modifying table rows";

/**
* Checks if a schema exists. The connector may have schemas that exist
* but are not enumerable via {@link #listSchemaNames}.
Expand Down Expand Up @@ -702,7 +704,7 @@ default void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHa
*/
default RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support merges");
throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE);
}

/**
Expand All @@ -712,7 +714,7 @@ default RowChangeParadigm getRowChangeParadigm(ConnectorSession session, Connect
*/
default ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support merges");
throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE);
}

/**
Expand All @@ -731,7 +733,7 @@ default Optional<ConnectorPartitioningHandle> getUpdateLayout(ConnectorSession s
*/
default ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support merges");
throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE);
}

/**
Expand Down
1 change: 0 additions & 1 deletion docs/src/main/sphinx/develop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ This guide is intended for Trino contributors and plugin developers.
develop/connectors
develop/example-http
develop/insert
develop/delete-and-update
develop/supporting-merge
develop/types
develop/functions
Expand Down
4 changes: 0 additions & 4 deletions docs/src/main/sphinx/develop/connectors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,6 @@ provider when it's possible to create pages directly. The conversion of
individual records from a record set provider into pages adds overheads during
query execution.

To add support for updating and/or deleting rows in a connector, it needs
to implement a ``ConnectorPageSourceProvider`` that returns
an ``UpdatablePageSource``. See :doc:`delete-and-update` for more.

.. _connector-page-sink-provider:

ConnectorPageSinkProvider
Expand Down
196 changes: 0 additions & 196 deletions docs/src/main/sphinx/develop/delete-and-update.rst

This file was deleted.

6 changes: 6 additions & 0 deletions docs/src/main/sphinx/develop/supporting-merge.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ of ``ConnectorMergeSink``, which is typically layered on top of a
methods to get a "rowId" column handle; get the row change paradigm;
and to start and complete the ``MERGE`` operation.

The Trino engine machinery used to implement SQL ``MERGE`` has now
been used to support SQL ``DELETE`` and ``UPDATE``, replacing the
previous implementations. This means that all a connector needs to
do is implement support for SQL ``MERGE``, and the connector gets
all the DML operations.

Standard SQL ``MERGE``
----------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,17 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect
@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
throw new TrinoException(NOT_SUPPORTED, "Unsupported delete");
throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE);
}

@Override
public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// The column is used for row-level merge, which is not supported, but it's required during analysis anyway.
return new JdbcColumnHandle(
"$merge_row_id",
new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()),
BIGINT);
}

@Override
Expand Down
Loading

0 comments on commit fcd603d

Please sign in to comment.