diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/Segment.java b/client/trino-client/src/main/java/io/trino/client/spooling/Segment.java index df905903adac..85bb9ce35295 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/Segment.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/Segment.java @@ -87,7 +87,7 @@ public static Segment inlined(byte[] data, DataAttributes attributes) return new InlineSegment(data, attributes); } - public static Segment spooled(URI retrieveUri, URI ackUri, DataAttributes attributes, Map> headers) + public static Segment spooled(URI retrieveUri, Optional ackUri, DataAttributes attributes, Map> headers) { return new SpooledSegment(retrieveUri, ackUri, attributes, headers); } diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/SegmentLoader.java b/client/trino-client/src/main/java/io/trino/client/spooling/SegmentLoader.java index 7403ce81c7ca..54d9d26ac8fa 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/SegmentLoader.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/SegmentLoader.java @@ -26,6 +26,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; @@ -55,7 +56,7 @@ public InputStream load(SpooledSegment segment) return loadFromURI(segment.getDataUri(), segment.getAckUri(), segment.getHeaders()); } - public InputStream loadFromURI(URI segmentUri, URI ackUri, Map> headers) + public InputStream loadFromURI(URI segmentUri, Optional ackUri, Map> headers) throws IOException { Headers requestHeaders = toHeaders(headers); @@ -99,7 +100,7 @@ public void onResponse(Call call, Response response) }); } - private InputStream delegatingInputStream(Response response, InputStream delegate, URI ackUri, Headers headers) + private InputStream delegatingInputStream(Response response, InputStream delegate, Optional ackUri, Headers headers) { return new FilterInputStream(delegate) { @@ -108,7 +109,7 @@ public void close() throws IOException { try (Response ignored = response; InputStream ignored2 = delegate) { - acknowledge(ackUri, headers); + ackUri.ifPresent(uri -> acknowledge(uri, headers)); } } }; diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java b/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java index 31fd4b7c1ead..a6f22672cb83 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Optional; import static com.google.common.base.MoreObjects.firstNonNull; import static java.lang.String.format; @@ -30,20 +31,20 @@ public final class SpooledSegment extends Segment { private final URI dataUri; + private final Optional ackUri; private final Map> headers; - private final URI ackUri; @JsonCreator public SpooledSegment( @JsonProperty("uri") URI dataUri, - @JsonProperty("ackUri") URI ackUri, + @JsonProperty("ackUri") Optional ackUri, @JsonProperty("metadata") Map metadata, @JsonProperty("headers") Map> headers) { this(dataUri, ackUri, new DataAttributes(metadata), headers); } - SpooledSegment(URI dataUri, URI ackUri, DataAttributes metadata, Map> headers) + SpooledSegment(URI dataUri, Optional ackUri, DataAttributes metadata, Map> headers) { super(metadata); this.dataUri = requireNonNull(dataUri, "dataUri is null"); @@ -58,7 +59,7 @@ public URI getDataUri() } @JsonProperty("ackUri") - public URI getAckUri() + public Optional getAckUri() { return ackUri; } @@ -73,6 +74,6 @@ public Map> getHeaders() @Override public String toString() { - return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet()); + return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s, ack=%b}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet(), ackUri.isPresent()); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java b/core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java index 59ef1e338a3a..b096a079aa17 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java @@ -135,6 +135,7 @@ static class OutputSpoolingOperator implements Operator { private final OutputSpoolingController controller; + private final boolean explicitAck; enum State { @@ -165,6 +166,7 @@ public OutputSpoolingOperator(OperatorContext operatorContext, QueryDataEncoder spoolingConfig.getMaximumSegmentSize().toBytes(), spoolingConfig.getInitialSegmentSize().toBytes(), spoolingConfig.getMaximumSegmentSize().toBytes()); + this.explicitAck = spoolingConfig.isExplicitAck(); this.userMemoryContext = operatorContext.newLocalUserMemoryContext(OutputSpoolingOperator.class.getSimpleName()); this.queryDataEncoder = requireNonNull(queryDataEncoder, "queryDataEncoder is null"); this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null"); @@ -278,7 +280,7 @@ private Page spool(List pages, boolean finished) controller.recordEncoded(attributes.get(SEGMENT_SIZE, Integer.class)); // This page is small (hundreds of bytes) so there is no point in tracking its memory usage - return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes).serialize()); + return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes, explicitAck).serialize()); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/CoordinatorSegmentResource.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/CoordinatorSegmentResource.java index 0ec86b4f53ef..2c6cd673ff7f 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/CoordinatorSegmentResource.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/CoordinatorSegmentResource.java @@ -53,12 +53,14 @@ public class CoordinatorSegmentResource private final SpoolingManager spoolingManager; private final SegmentRetrievalMode retrievalMode; private final InternalNodeManager nodeManager; + private final boolean explicitAck; @Inject public CoordinatorSegmentResource(SpoolingManager spoolingManager, SpoolingConfig config, InternalNodeManager nodeManager) { this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null"); this.retrievalMode = requireNonNull(config, "config is null").getRetrievalMode(); + this.explicitAck = config.isExplicitAck(); this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); } @@ -97,6 +99,12 @@ public Response download(@Context UriInfo uriInfo, @PathParam("identifier") Stri public Response acknowledge(@PathParam("identifier") String identifier, @Context HttpHeaders headers) throws IOException { + if (!explicitAck) { + return Response.status(Response.Status.NOT_ACCEPTABLE) + .entity("Explicit segment acknowledgment is disabled") + .build(); + } + try { spoolingManager.acknowledge(handle(identifier, headers)); return Response.ok().build(); diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledBlock.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledBlock.java index 5fa8e0dccb70..b292f967113e 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledBlock.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledBlock.java @@ -36,9 +36,10 @@ import static io.airlift.json.JsonCodec.listJsonCodec; import static io.airlift.json.JsonCodec.mapJsonCodec; import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.VarcharType.VARCHAR; -public record SpooledBlock(Slice identifier, Optional directUri, Map> headers, DataAttributes attributes) +public record SpooledBlock(Slice identifier, Optional directUri, Map> headers, DataAttributes attributes, boolean explicitAck) { private static final JsonCodec>> HEADERS_CODEC = mapJsonCodec(String.class, listJsonCodec(String.class)); private static final JsonCodec ATTRIBUTES_CODEC = JsonCodec.jsonCodec(DataAttributes.class); @@ -47,7 +48,8 @@ public record SpooledBlock(Slice identifier, Optional directUri, Map new SpooledBlock( directLocation.identifier(), Optional.of(directLocation.directUri()), directLocation.headers(), - attributes); + attributes, + explicitAck); case CoordinatorLocation coordinatorLocation -> new SpooledBlock( coordinatorLocation.identifier(), Optional.empty(), coordinatorLocation.headers(), - attributes); + attributes, + explicitAck); }; } @@ -109,6 +115,7 @@ void serialize(RowBlockBuilder rowBlockBuilder) } VARCHAR.writeSlice(rowEntryBuilder.get(2), utf8Slice(HEADERS_CODEC.toJson(headers))); VARCHAR.writeSlice(rowEntryBuilder.get(3), utf8Slice(ATTRIBUTES_CODEC.toJson(attributes))); + BOOLEAN.writeBoolean(rowEntryBuilder.get(4), explicitAck); }); } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledQueryDataProducer.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledQueryDataProducer.java index c6b4d900ff14..ab50708f8b92 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledQueryDataProducer.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledQueryDataProducer.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.net.URI; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; import static io.trino.client.spooling.DataAttribute.ROWS_COUNT; @@ -76,8 +77,9 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo .set(ROW_OFFSET, currentOffset) .build(); builder.withSegment(spooled( - metadata.directUri().orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())), - buildSegmentAckURI(uriBuilder, metadata.identifier()), + metadata.directUri() + .orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())), + metadata.explicitAck() ? Optional.of(buildSegmentAckURI(uriBuilder, metadata.identifier())) : Optional.empty(), attributes, metadata.headers())); currentOffset += attributes.get(ROWS_COUNT, Long.class); diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java index f4143347be0e..2c05e399c5c7 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java @@ -38,6 +38,7 @@ public class SpoolingConfig private Optional storageRedirectTtl = Optional.empty(); private boolean allowInlining = true; + private boolean explicitAck = true; private long maximumInlinedRows = 1000; private DataSize maximumInlinedSize = DataSize.of(128, KILOBYTE); private DataSize initialSegmentSize = DataSize.of(8, MEGABYTE); @@ -123,6 +124,19 @@ public SpoolingConfig setAllowInlining(boolean allowInlining) return this; } + public boolean isExplicitAck() + { + return explicitAck; + } + + @ConfigDescription("Allow client to acknowledge segment retrieval and its eager removal") + @Config("protocol.spooling.explicit-ack.enabled") + public SpoolingConfig setExplicitAck(boolean explicitAck) + { + this.explicitAck = explicitAck; + return this; + } + public long getMaximumInlinedRows() { return maximumInlinedRows; diff --git a/core/trino-main/src/test/java/io/trino/server/protocol/TestQueryDataSerialization.java b/core/trino-main/src/test/java/io/trino/server/protocol/TestQueryDataSerialization.java index d1beddc5ca75..46562a3ee0fc 100644 --- a/core/trino-main/src/test/java/io/trino/server/protocol/TestQueryDataSerialization.java +++ b/core/trino-main/src/test/java/io/trino/server/protocol/TestQueryDataSerialization.java @@ -34,6 +34,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.OptionalDouble; import java.util.Set; @@ -138,11 +139,11 @@ public void testSpooledQueryDataSerialization() inlined("super".getBytes(UTF_8), dataAttributes(0, 100, 5)), spooled( URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/1"), - URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1"), + Optional.of(URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1")), dataAttributes(100, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256"))), spooled( URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2"), - URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2"), + Optional.empty(), dataAttributes(200, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256"))))) .withAttributes(DataAttributes.builder() .set(SCHEMA, "serializedSchema") @@ -179,7 +180,6 @@ public void testSpooledQueryDataSerialization() { "type": "spooled", "uri": "http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2", - "ackUri": "http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2", "metadata": { "rowOffset": 200, "rowsCount": 100, @@ -199,9 +199,15 @@ public void testEncodedQueryDataToString() EncodedQueryData spooledQueryData = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled( URI.create("http://coordinator:8080/v1/segments/uuid"), + Optional.of(URI.create("http://coordinator:8080/v1/segments/uuid")), + dataAttributes(10, 2, 1256), headers()))); + assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=true}], metadata=[decryption_key]}"); + + EncodedQueryData spooledQueryDataWithoutAck = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled( URI.create("http://coordinator:8080/v1/segments/uuid"), + Optional.empty(), dataAttributes(10, 2, 1256), headers()))); - assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption]}], metadata=[decryption_key]}"); + assertThat(spooledQueryDataWithoutAck.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=false}], metadata=[decryption_key]}"); } private void testRoundTrip(QueryData queryData, String expectedDataRepresentation) diff --git a/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpooledBlock.java b/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpooledBlock.java index 5dcc642be1ed..bdabb7f3d92c 100644 --- a/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpooledBlock.java +++ b/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpooledBlock.java @@ -55,7 +55,7 @@ public void verifySerialization(Slice identifier, Optional directUri, Map directUri, Map> headers) { - SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200)); + SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200), true); Page page = new Page(metadata.serialize()); SpooledBlock retrieved = SpooledBlock.deserialize(page); assertThat(metadata).isEqualTo(retrieved); @@ -63,7 +63,7 @@ public void verifySerializationRoundTrip(Slice identifier, Optional directU private void verifySerializationRoundTripWithNonEmptyPage(Slice identifier, Optional directUri, Map> headers) { - SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100)); + SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100), false); Page page = new Page(blockWithPositions(1, true), metadata.serialize()); SpooledBlock retrieved = SpooledBlock.deserialize(page); assertThat(metadata).isEqualTo(retrieved); @@ -71,7 +71,7 @@ private void verifySerializationRoundTripWithNonEmptyPage(Slice identifier, Opti private void verifyThrowsErrorOnNonNullPositions(Slice identifier, Optional directUri, Map> headers) { - SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200)); + SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200), true); assertThatThrownBy(() -> SpooledBlock.deserialize(new Page(blockWithPositions(1, false), metadata.serialize()))) .hasMessage("Spooling metadata block must have all but last channels null"); @@ -79,7 +79,7 @@ private void verifyThrowsErrorOnNonNullPositions(Slice identifier, Optional private void verifyThrowsErrorOnMultiplePositions(Slice identifier, Optional directUri, Map> headers) { - SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300)); + SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300), false); RowBlockBuilder rowBlockBuilder = SPOOLING_METADATA_TYPE.createBlockBuilder(null, 2); metadata.serialize(rowBlockBuilder); metadata.serialize(rowBlockBuilder); diff --git a/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpoolingConfig.java b/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpoolingConfig.java index 61db68fa4a1e..28a0420d95cf 100644 --- a/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpoolingConfig.java +++ b/core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpoolingConfig.java @@ -44,7 +44,8 @@ public void testDefaults() .setMaximumSegmentSize(DataSize.of(16, MEGABYTE)) .setMaximumInlinedRows(1000) .setMaximumInlinedSize(DataSize.of(128, KILOBYTE)) - .setAllowInlining(true)); + .setAllowInlining(true) + .setExplicitAck(true)); } @Test @@ -57,6 +58,7 @@ public void testExplicitPropertyMappings() .put("protocol.spooling.retrieval-mode", "coordinator_storage_redirect") .put("protocol.spooling.coordinator-storage-redirect-ttl", "60s") .put("protocol.spooling.inlining.enabled", "false") + .put("protocol.spooling.explicit-ack.enabled", "false") .put("protocol.spooling.initial-segment-size", "1kB") .put("protocol.spooling.maximum-segment-size", "8kB") .put("protocol.spooling.inlining.max-rows", "1024") @@ -71,7 +73,8 @@ public void testExplicitPropertyMappings() .setMaximumSegmentSize(DataSize.of(8, KILOBYTE)) .setMaximumInlinedRows(1024) .setMaximumInlinedSize(DataSize.of(1, MEGABYTE)) - .setAllowInlining(false); + .setAllowInlining(false) + .setExplicitAck(false); assertFullMapping(properties, expected); }