Allocate new Slice for VariableWidthBlockEncoding readBlock #11235
Conversation
@@ -68,7 +68,8 @@ public Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput sliceInput)
     boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);

     int blockSize = sliceInput.readInt();
-    Slice slice = sliceInput.readSlice(blockSize);
+    Slice slice = Slices.allocate(blockSize);
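For context, the new approach amounts to allocating a fresh Slice and copying the serialized bytes into it, rather than returning a zero-copy view over the input. A rough sketch of the relevant lines (the full diff is not shown here, so treat this as an illustration rather than the exact patch):

int blockSize = sliceInput.readInt();
// Allocate a new backing buffer and copy the data in, instead of keeping a
// view into the (possibly much larger) serialized page.
Slice slice = Slices.allocate(blockSize);
sliceInput.readBytes(slice);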
This implies data copying. In the memory connector we just use page.compact(); see trino/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPagesStore.java, line 69 at f13283b:
page.compact();
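For reference, the store-time compaction pattern being pointed at looks roughly like this (a simplified sketch with hypothetical names, not the actual MemoryPagesStore code):

import io.trino.spi.Page;
import java.util.List;

class PageStoreSketch
{
    // Compact before retaining the page, so the stored copy stops referencing
    // the larger serialized buffers it was decoded from.
    static void store(List<Page> storedPages, Page page)
    {
        page.compact();
        storedPages.add(page);
    }
}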
Are you suggesting I compact the page before returning here?
Compacting will also create a copy, and therefore achieve the same purpose, right?
It's the only place in block decoding where a view is preferred over a copy. It feels more natural to create a copy here. @sopel39 Do you think there's a practical reason why it is better to compact vs. creating a copy explicitly here?
Are you suggesting I compact the page before returning here?

I'm suggesting to compact if you mean to store the page. This is exactly what we do in OrderByOperator, PartitioningExchanger, PagesIndex, and others.

@sopel39 Do you think there's a practical reason why it is better to compact vs. creating a copy explicitly here?

You don't need that extra copy if the data is going to be consumed immediately (e.g. by an aggregation).
It looks like the uncompacted pages are somehow being retained for longer than expected. @linzebing what is the code path that retains those pages?
The reason we don't compact or eagerly copy is that it can be really expensive. This is the same reason we don't compact dictionaries. Also, there are methods that will attempt to dedupe things, but we don't use those because they had a big performance impact.
All of that said, I'm not saying "don't do this". I'm saying "be careful" and check the performance.
Also, there are methods that will attempt to dedupe things, but we don't use those because they had a big performance impact.

By that @dain means using something like io.trino.operator.project.PageProcessor.ProjectSelectedPositions#updateRetainedSize at the Page level.
I think this is because compact doesn't compact the dictionary field of a DictionaryBlock, as I mentioned above.
I think we should compact dictionaries in io.trino.spi.block.DictionaryBlock#compact.
I'll jump in with yet another potential solution here:
You could re-frame this as "VariableWidthBlock is the only block encoding that doesn't copy memory into a new structure". It would be a bigger change, but if the deserialization operated directly on an InputStreamSliceInput in the first place, instead of eagerly copying the whole serialized page into a Slice, then this wouldn't be adding unnecessary overhead.
I've looked into trying to do something like that before, and the problems I saw were solvable but included:
- Finding a clean way to still be able to compute the page checksum on the input stream, and having a way to get the current digest and reset for the next logical page at some middle point in the buffer (see the sketch below).
- Verifying that moving the deserialization point into the I/O layers (e.g. the exchange client or the unspilling thread) wouldn't degrade performance in unexpected ways.
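A minimal sketch of the first point, assuming a CRC32-style checksum purely for illustration (Trino's actual page checksum algorithm and class names may differ): wrap the stream so that a running digest is maintained while deserialization reads from it, and snapshot/reset the digest at each logical page boundary.

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.CRC32;

class ChecksummingInputStream
        extends FilterInputStream
{
    private final CRC32 crc = new CRC32();

    ChecksummingInputStream(InputStream delegate)
    {
        super(delegate);
    }

    @Override
    public int read()
            throws IOException
    {
        int b = super.read();
        if (b >= 0) {
            crc.update(b);
        }
        return b;
    }

    @Override
    public int read(byte[] buffer, int offset, int length)
            throws IOException
    {
        int read = super.read(buffer, offset, length);
        if (read > 0) {
            crc.update(buffer, offset, read);
        }
        return read;
    }

    // Call this once a full logical page has been consumed: it returns the digest
    // for that page and resets the state for the next one, even mid-buffer.
    long digestAndReset()
    {
        long value = crc.getValue();
        crc.reset();
        return value;
    }
}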
IMO, one way or another the data will have to be copied from an I/O buffer (an InputStream, an I/O buffer in the ExchangeSource, etc.) before it can be processed by the engine. The page decoding step seems like a good place to do so. I don't really see why page decoding should return uncompacted pages if the decoding process requires allocations and memory copying anyway.
Agreed, I didn't notice you had made the same suggestion in another comment.
@sopel39, it feels like the right way to go is to allocate a new Slice here (to be consistent with other encodings). Meanwhile we can eliminate the extra copy here: https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerde.java#L224-L228. How about approving this PR to unblock, and filing an issue for the latter, which I will follow up on?
We use copy here for primitive blocks because it's not possible to map …
Ideally, you compact pages in places where you want to store them. There are other reasons why a page might not be compact (e.g. …).
How can you eliminate the copy there?
This is largely true. However, we do create a copy for flags and byte arrays in other places (e.g. for "null" flags).
Assuming we eliminate the extra copy in the …
Currently this is not a hard blocker for us, so we will work around it for now. Created an issue to track this: #11315. Will follow up later.
 }

 private static class SerializedPageReader
         extends AbstractIterator<Slice>
 {
-    private final SliceInput input;
+    private final LittleEndianDataInputStream input;
nit: LittleEndianDataInputStream is not efficient at decoding int / long. A more efficient way would be to allocate a single Slice of the size of the header, read the entire header from the InputStream at once into that Slice, and extract the page size as an int with Slice#getInt, which does that in a single machine instruction. Then the content of that Slice could be efficiently copied into the Slice returned from the computeNext method.
Makes sense, addressed the comment.
Added a few review notes about the current implementation. It looks like this still requires copying VariableWidthBlock contents twice, if I understand correctly? Is this saving an intermediate copy through InputStreamSliceInput's internal buffers?
if (b < 0) {
    return endOfData();
}
byte b1 = (byte) b;
Does this need to be: byte b1 = (byte) (b & 0xFF);?
Shouldn't be needed, as read() will only read one byte.
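For context, InputStream.read() returns an int in the range 0-255 (or -1 at end of stream), so once the negative case is handled the cast to byte already keeps exactly the low 8 bits and an extra & 0xFF mask changes nothing. A small self-contained sketch of the pattern under discussion (hypothetical helper, not the PR's exact code):

import com.google.common.primitives.Ints;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class LittleEndianIntReader
{
    private LittleEndianIntReader() {}

    // Reads a little-endian int one byte at a time from the stream.
    static int readLittleEndianInt(InputStream input)
            throws IOException
    {
        byte b1 = readByte(input);
        byte b2 = readByte(input);
        byte b3 = readByte(input);
        byte b4 = readByte(input);
        // Ints.fromBytes takes the most significant byte first, so the order is
        // reversed here to decode a little-endian value.
        return Ints.fromBytes(b4, b3, b2, b1);
    }

    private static byte readByte(InputStream input)
            throws IOException
    {
        int b = input.read();
        if (b < 0) {
            throw new EOFException();
        }
        // b is in 0..255 here, so the cast keeps the low 8 bits; no mask needed.
        return (byte) b;
    }
}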
int positionCount = Ints.fromBytes(b4, b3, b2, b1);
byte marker = input.readByte();
int uncompressedSize = input.readInt();
int compressedSize = input.readInt();
Agreed with @arhimondr: you could pre-allocate a buffer of the header size and reuse it between calls to computeNext(), since this iterator isn't expected to be thread-safe. If you did, you could even avoid using LittleEndianDataInputStream entirely and operate directly on any InputStream. It might look something like:
private byte[] headerBuffer = new byte[SERIALIZED_PAGE_HEADER_SIZE];
private Slice headerSlice = Slices.wrappedBuffer(headerBuffer);
...

@Override
protected Slice computeNext()
{
    try {
        int read = ByteStreams.read(input, headerBuffer, 0, headerBuffer.length);
        // Guava's ByteStreams.read returns the number of bytes read (0 at end
        // of stream); it never returns a negative value.
        if (read <= 0) {
            return endOfData();
        }
        else if (read != headerBuffer.length) {
            throw new EOFException();
        }
        // contents of headerBuffer are visible through headerSlice
        int compressedSize = headerSlice.getInt(COMPRESSED_SIZE_OFFSET);
        byte[] output = new byte[headerBuffer.length + compressedSize];
        System.arraycopy(headerBuffer, 0, output, 0, headerBuffer.length);
        ByteStreams.readFully(input, output, headerBuffer.length, compressedSize);
        return Slices.wrappedBuffer(output);
    ...
Makes sense. Addressed.
@pettyjamesm: yes. The change in …
@linzebing What's the total number of copies now? This PR requires benchmarks to be run to check the impact.
@sopel39: for … Benchmark results indicate that this is a net efficiency win.
int compressedSize = getIntUnchecked(headerSlice, COMPRESSED_SIZE_OFFSET);
byte[] outputBuffer = new byte[SERIALIZED_PAGE_HEADER_SIZE + compressedSize];
arraycopy(headerSlice.byteArray(), 0, outputBuffer, 0, SERIALIZED_PAGE_HEADER_SIZE);
Minor: since we're using headerSlice.byteArray() here but the Slice is created externally, it would be safer to also use headerSlice.byteArrayOffset() as the source offset instead of assuming that the offset is 0.
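For clarity, the two equivalent options being discussed look roughly like this (a sketch reusing the headerSlice, outputBuffer, and SERIALIZED_PAGE_HEADER_SIZE names from the diff above):

// Option 1: raw arraycopy, honoring the slice's backing-array offset rather
// than assuming the slice starts at offset 0 of its array.
System.arraycopy(
        headerSlice.byteArray(),
        headerSlice.byteArrayOffset(),
        outputBuffer,
        0,
        SERIALIZED_PAGE_HEADER_SIZE);

// Option 2: let the Slice handle the offset and bounds bookkeeping itself.
headerSlice.getBytes(0, outputBuffer, 0, SERIALIZED_PAGE_HEADER_SIZE);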
Actually I wonder if I should just use headerSlice.getBytes(0, outputBuffer, 0, SERIALIZED_PAGE_HEADER_SIZE) instead. Do you think this makes any difference in performance?
It shouldn't matter much here, but I suspect System.arraycopy would have a slight edge over Slice#getBytes because of the way that Slice uses Unsafe.copyMemory.
I found https://groups.google.com/g/mechanical-sympathy/c/sug91A1ynF4 saying that Unsafe.copyMemory is slightly faster than System.arraycopy, so I'm using headerSlice.getBytes(0, outputBuffer, 0, SERIALIZED_PAGE_HEADER_SIZE) instead.
context.close(); // Release context buffers
return endOfData();
try {
    int read = ByteStreams.read(inputStream, headerBuffer, 0, headerBuffer.length);
Could this potentially generate a lot of OS sys calls when pages are small?
We now have one read for the page size plus one read for the page data. If pages are tiny (e.g. because there are a lot of nodes), this increases the number of OS I/O calls a lot (and those calls are slow).
Discussed offline. Netty input streams have internal buffering.
Discussed offline. Netty input streams have internal buffering.

I don't think that's true for unspilled data, though. However, the page size should still be bigger than the SliceInput buffer.
@@ -168,7 +167,7 @@ private void writePages(Iterator<Page> pageIterator)

try {
    InputStream input = closer.register(targetFile.newInputStream());
    Iterator<Page> pages = PagesSerdeUtil.readPages(serde, new InputStreamSliceInput(input, BUFFER_SIZE));
Here, reads still go to the OS.
The number of reads shouldn't increase as long as the average page size is above 8kB. Is this generally the case for spilling?
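For illustration, one way to keep small header reads on the spill path from turning into extra syscalls is to wrap the file stream in a plain BufferedInputStream so the OS only sees large reads. A minimal sketch under that assumption (the class, method, and buffer size are hypothetical, not the spiller's actual code):

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class SpillReadExample
{
    private static final int BUFFER_SIZE = 64 * 1024;

    private SpillReadExample() {}

    // Small reads (such as the fixed-size page header) are served from the
    // in-memory buffer; only large, BUFFER_SIZE-sized reads hit the OS.
    static InputStream openSpillFile(Path spillFile)
            throws IOException
    {
        return new BufferedInputStream(Files.newInputStream(spillFile), BUFFER_SIZE);
    }
}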
Description
This PR changes VariableWidthBlock to be backed by newly allocated Slices instead of returning a view into a page:
- VariableWidthBlockEncoding is the only block encoding that creates direct references to the input slice instead of copying data into newly allocated memory. It's better to make things consistent.
- VariableWidthBlockEncoding retaining references has caused memory leaks.

This, however, has a small performance cost. I did another optimization to offset the cost: replacing InputStreamSliceInput with LittleEndianDataInputStream in HttpPageBufferClient, so that we avoid the memory copy costs of InputStreamSliceInput's internal buffer.

It turns out that these two changes combined resulted in a net efficiency win, both in terms of CPU and latency.

Benchmark on hive_sf1000_parquet_part:
![image](https://user-images.githubusercontent.com/9404831/157388361-f4adb08f-588a-4a6e-ae6c-a6a2e34d19fe.png)

Benchmark on hive_sf1000_parquet_unpart:
![image](https://user-images.githubusercontent.com/9404831/157560592-c489fa6e-2520-485e-afad-e1d51acd299c.png)

Core query engine.
N/A
Related issues, pull requests, and links
Blocks #11174
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: