Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow disabling client segment acknowledgment #24125

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static Segment inlined(byte[] data, DataAttributes attributes)
return new InlineSegment(data, attributes);
}

public static Segment spooled(URI retrieveUri, URI ackUri, DataAttributes attributes, Map<String, List<String>> headers)
public static Segment spooled(URI retrieveUri, Optional<URI> ackUri, DataAttributes attributes, Map<String, List<String>> headers)
{
return new SpooledSegment(retrieveUri, ackUri, attributes, headers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -55,7 +56,7 @@ public InputStream load(SpooledSegment segment)
return loadFromURI(segment.getDataUri(), segment.getAckUri(), segment.getHeaders());
}

public InputStream loadFromURI(URI segmentUri, URI ackUri, Map<String, List<String>> headers)
public InputStream loadFromURI(URI segmentUri, Optional<URI> ackUri, Map<String, List<String>> headers)
throws IOException
{
Headers requestHeaders = toHeaders(headers);
Expand Down Expand Up @@ -99,7 +100,7 @@ public void onResponse(Call call, Response response)
});
}

private InputStream delegatingInputStream(Response response, InputStream delegate, URI ackUri, Headers headers)
private InputStream delegatingInputStream(Response response, InputStream delegate, Optional<URI> ackUri, Headers headers)
{
return new FilterInputStream(delegate)
{
Expand All @@ -108,7 +109,7 @@ public void close()
throws IOException
{
try (Response ignored = response; InputStream ignored2 = delegate) {
acknowledge(ackUri, headers);
ackUri.ifPresent(uri -> acknowledge(uri, headers));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.MoreObjects.firstNonNull;
import static java.lang.String.format;
Expand All @@ -30,20 +31,20 @@ public final class SpooledSegment
extends Segment
{
private final URI dataUri;
private final Optional<URI> ackUri;
private final Map<String, List<String>> headers;
private final URI ackUri;

@JsonCreator
public SpooledSegment(
@JsonProperty("uri") URI dataUri,
@JsonProperty("ackUri") URI ackUri,
@JsonProperty("ackUri") Optional<URI> ackUri,
@JsonProperty("metadata") Map<String, Object> metadata,
@JsonProperty("headers") Map<String, List<String>> headers)
{
this(dataUri, ackUri, new DataAttributes(metadata), headers);
}

SpooledSegment(URI dataUri, URI ackUri, DataAttributes metadata, Map<String, List<String>> headers)
SpooledSegment(URI dataUri, Optional<URI> ackUri, DataAttributes metadata, Map<String, List<String>> headers)
{
super(metadata);
this.dataUri = requireNonNull(dataUri, "dataUri is null");
Expand All @@ -58,7 +59,7 @@ public URI getDataUri()
}

@JsonProperty("ackUri")
public URI getAckUri()
public Optional<URI> getAckUri()
{
return ackUri;
}
Expand All @@ -73,6 +74,6 @@ public Map<String, List<String>> getHeaders()
@Override
public String toString()
{
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet());
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s, ack=%b}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet(), ackUri.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ static class OutputSpoolingOperator
implements Operator
{
private final OutputSpoolingController controller;
private final boolean explicitAck;

enum State
{
Expand Down Expand Up @@ -165,6 +166,7 @@ public OutputSpoolingOperator(OperatorContext operatorContext, QueryDataEncoder
spoolingConfig.getMaximumSegmentSize().toBytes(),
spoolingConfig.getInitialSegmentSize().toBytes(),
spoolingConfig.getMaximumSegmentSize().toBytes());
this.explicitAck = spoolingConfig.isExplicitAck();
this.userMemoryContext = operatorContext.newLocalUserMemoryContext(OutputSpoolingOperator.class.getSimpleName());
this.queryDataEncoder = requireNonNull(queryDataEncoder, "queryDataEncoder is null");
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
Expand Down Expand Up @@ -278,7 +280,7 @@ private Page spool(List<Page> pages, boolean finished)
controller.recordEncoded(attributes.get(SEGMENT_SIZE, Integer.class));

// This page is small (hundreds of bytes) so there is no point in tracking its memory usage
return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes).serialize());
return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes, explicitAck).serialize());
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public class CoordinatorSegmentResource
private final SpoolingManager spoolingManager;
private final SegmentRetrievalMode retrievalMode;
private final InternalNodeManager nodeManager;
private final boolean explicitAck;

@Inject
public CoordinatorSegmentResource(SpoolingManager spoolingManager, SpoolingConfig config, InternalNodeManager nodeManager)
{
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
this.retrievalMode = requireNonNull(config, "config is null").getRetrievalMode();
this.explicitAck = config.isExplicitAck();
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
}

Expand Down Expand Up @@ -97,6 +99,12 @@ public Response download(@Context UriInfo uriInfo, @PathParam("identifier") Stri
public Response acknowledge(@PathParam("identifier") String identifier, @Context HttpHeaders headers)
throws IOException
{
if (!explicitAck) {
return Response.status(Response.Status.NOT_ACCEPTABLE)
.entity("Explicit segment acknowledgment is disabled")
.build();
}

try {
spoolingManager.acknowledge(handle(identifier, headers));
return Response.ok().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@
import static io.airlift.json.JsonCodec.listJsonCodec;
import static io.airlift.json.JsonCodec.mapJsonCodec;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.VarcharType.VARCHAR;

public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers, DataAttributes attributes)
public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers, DataAttributes attributes, boolean explicitAck)
{
private static final JsonCodec<Map<String, List<String>>> HEADERS_CODEC = mapJsonCodec(String.class, listJsonCodec(String.class));
private static final JsonCodec<DataAttributes> ATTRIBUTES_CODEC = JsonCodec.jsonCodec(DataAttributes.class);
Expand All @@ -47,7 +48,8 @@ public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String
new RowType.Field(Optional.of("identifier"), VARCHAR),
new RowType.Field(Optional.of("directLocation"), VARCHAR),
new RowType.Field(Optional.of("headers"), VARCHAR),
new RowType.Field(Optional.of("metadata"), VARCHAR)));
new RowType.Field(Optional.of("metadata"), VARCHAR),
new RowType.Field(Optional.of("explicitAck"), BOOLEAN)));

public static final String SPOOLING_METADATA_COLUMN_NAME = "$spooling:metadata$";
public static final Symbol SPOOLING_METADATA_SYMBOL = new Symbol(SPOOLING_METADATA_TYPE, SPOOLING_METADATA_COLUMN_NAME);
Expand All @@ -63,29 +65,33 @@ public static SpooledBlock deserialize(Page page)
VARCHAR.getSlice(row.getRawFieldBlock(0), 0),
Optional.empty(), // Not a direct location
HEADERS_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(2), 0).toStringUtf8()),
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()));
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()),
BOOLEAN.getBoolean(row.getRawFieldBlock(4), 0));
}

return new SpooledBlock(
VARCHAR.getSlice(row.getRawFieldBlock(0), 0),
Optional.of(URI.create(VARCHAR.getSlice(row.getRawFieldBlock(1), 0).toStringUtf8())),
HEADERS_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(2), 0).toStringUtf8()),
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()));
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()),
BOOLEAN.getBoolean(row.getRawFieldBlock(4), 0));
}

public static SpooledBlock forLocation(SpooledLocation location, DataAttributes attributes)
public static SpooledBlock forLocation(SpooledLocation location, DataAttributes attributes, boolean explicitAck)
{
return switch (location) {
case DirectLocation directLocation -> new SpooledBlock(
directLocation.identifier(),
Optional.of(directLocation.directUri()),
directLocation.headers(),
attributes);
attributes,
explicitAck);
case CoordinatorLocation coordinatorLocation -> new SpooledBlock(
coordinatorLocation.identifier(),
Optional.empty(),
coordinatorLocation.headers(),
attributes);
attributes,
explicitAck);
};
}

Expand All @@ -109,6 +115,7 @@ void serialize(RowBlockBuilder rowBlockBuilder)
}
VARCHAR.writeSlice(rowEntryBuilder.get(2), utf8Slice(HEADERS_CODEC.toJson(headers)));
VARCHAR.writeSlice(rowEntryBuilder.get(3), utf8Slice(ATTRIBUTES_CODEC.toJson(attributes)));
BOOLEAN.writeBoolean(rowEntryBuilder.get(4), explicitAck);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import static io.trino.client.spooling.DataAttribute.ROWS_COUNT;
Expand Down Expand Up @@ -76,8 +77,9 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo
.set(ROW_OFFSET, currentOffset)
.build();
builder.withSegment(spooled(
metadata.directUri().orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())),
buildSegmentAckURI(uriBuilder, metadata.identifier()),
metadata.directUri()
.orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())),
metadata.explicitAck() ? Optional.of(buildSegmentAckURI(uriBuilder, metadata.identifier())) : Optional.empty(),
attributes,
metadata.headers()));
currentOffset += attributes.get(ROWS_COUNT, Long.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class SpoolingConfig
private Optional<Duration> storageRedirectTtl = Optional.empty();

private boolean allowInlining = true;
private boolean explicitAck = true;
private long maximumInlinedRows = 1000;
private DataSize maximumInlinedSize = DataSize.of(128, KILOBYTE);
private DataSize initialSegmentSize = DataSize.of(8, MEGABYTE);
Expand Down Expand Up @@ -123,6 +124,19 @@ public SpoolingConfig setAllowInlining(boolean allowInlining)
return this;
}

public boolean isExplicitAck()
{
return explicitAck;
}

@ConfigDescription("Allow client to acknowledge segment retrieval and its eager removal")
@Config("protocol.spooling.explicit-ack.enabled")
public SpoolingConfig setExplicitAck(boolean explicitAck)
{
this.explicitAck = explicitAck;
return this;
}

public long getMaximumInlinedRows()
{
return maximumInlinedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Set;

Expand Down Expand Up @@ -138,11 +139,11 @@ public void testSpooledQueryDataSerialization()
inlined("super".getBytes(UTF_8), dataAttributes(0, 100, 5)),
spooled(
URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/1"),
URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1"),
Optional.of(URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1")),
dataAttributes(100, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256"))),
spooled(
URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2"),
URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2"),
Optional.empty(),
dataAttributes(200, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256")))))
.withAttributes(DataAttributes.builder()
.set(SCHEMA, "serializedSchema")
Expand Down Expand Up @@ -179,7 +180,6 @@ public void testSpooledQueryDataSerialization()
{
"type": "spooled",
"uri": "http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2",
"ackUri": "http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2",
"metadata": {
"rowOffset": 200,
"rowsCount": 100,
Expand All @@ -199,9 +199,15 @@ public void testEncodedQueryDataToString()

EncodedQueryData spooledQueryData = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled(
URI.create("http://coordinator:8080/v1/segments/uuid"),
Optional.of(URI.create("http://coordinator:8080/v1/segments/uuid")),
dataAttributes(10, 2, 1256), headers())));
assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=true}], metadata=[decryption_key]}");

EncodedQueryData spooledQueryDataWithoutAck = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled(
URI.create("http://coordinator:8080/v1/segments/uuid"),
Optional.empty(),
dataAttributes(10, 2, 1256), headers())));
assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption]}], metadata=[decryption_key]}");
assertThat(spooledQueryDataWithoutAck.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=false}], metadata=[decryption_key]}");
}

private void testRoundTrip(QueryData queryData, String expectedDataRepresentation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,31 @@ public void verifySerialization(Slice identifier, Optional<URI> directUri, Map<S

public void verifySerializationRoundTrip(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200));
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200), true);
Page page = new Page(metadata.serialize());
SpooledBlock retrieved = SpooledBlock.deserialize(page);
assertThat(metadata).isEqualTo(retrieved);
}

private void verifySerializationRoundTripWithNonEmptyPage(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100));
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100), false);
Page page = new Page(blockWithPositions(1, true), metadata.serialize());
SpooledBlock retrieved = SpooledBlock.deserialize(page);
assertThat(metadata).isEqualTo(retrieved);
}

private void verifyThrowsErrorOnNonNullPositions(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200));
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200), true);

assertThatThrownBy(() -> SpooledBlock.deserialize(new Page(blockWithPositions(1, false), metadata.serialize())))
.hasMessage("Spooling metadata block must have all but last channels null");
}

private void verifyThrowsErrorOnMultiplePositions(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
{
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300));
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300), false);
RowBlockBuilder rowBlockBuilder = SPOOLING_METADATA_TYPE.createBlockBuilder(null, 2);
metadata.serialize(rowBlockBuilder);
metadata.serialize(rowBlockBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public void testDefaults()
.setMaximumSegmentSize(DataSize.of(16, MEGABYTE))
.setMaximumInlinedRows(1000)
.setMaximumInlinedSize(DataSize.of(128, KILOBYTE))
.setAllowInlining(true));
.setAllowInlining(true)
.setExplicitAck(true));
}

@Test
Expand All @@ -57,6 +58,7 @@ public void testExplicitPropertyMappings()
.put("protocol.spooling.retrieval-mode", "coordinator_storage_redirect")
.put("protocol.spooling.coordinator-storage-redirect-ttl", "60s")
.put("protocol.spooling.inlining.enabled", "false")
.put("protocol.spooling.explicit-ack.enabled", "false")
.put("protocol.spooling.initial-segment-size", "1kB")
.put("protocol.spooling.maximum-segment-size", "8kB")
.put("protocol.spooling.inlining.max-rows", "1024")
Expand All @@ -71,7 +73,8 @@ public void testExplicitPropertyMappings()
.setMaximumSegmentSize(DataSize.of(8, KILOBYTE))
.setMaximumInlinedRows(1024)
.setMaximumInlinedSize(DataSize.of(1, MEGABYTE))
.setAllowInlining(false);
.setAllowInlining(false)
.setExplicitAck(false);

assertFullMapping(properties, expected);
}
Expand Down
Loading