Skip to content

Commit

Permalink
Add MemoryOrcDataSource
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Sep 12, 2020
1 parent 4744925 commit 794b5e7
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,13 @@ private void testPageSource(boolean useCache)
FileFormatDataSourceStats stats = new FileFormatDataSourceStats();
ConnectorPageSource pageSource = testPreparer.newPageSource(stats, useCache ? CACHED_SESSION : UNCACHED_SESSION);

assertEquals(pageSource.getSystemMemoryUsage(), 0);
if (useCache) {
// file is fully cached
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), testPreparer.getFileSize(), testPreparer.getFileSize() + 200);
}
else {
assertEquals(pageSource.getSystemMemoryUsage(), 0);
}

long memoryUsage = -1;
int totalRows = 0;
Expand All @@ -207,15 +213,20 @@ private void testPageSource(boolean useCache)
if (memoryUsage == -1) {
// Memory usage before lazy-loading the block
if (useCache) {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), 180_000L, 189_999L);
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), testPreparer.getFileSize(), testPreparer.getFileSize() + 2000);
}
else {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), 0L, 1000L);
}
createUnboundedVarcharType().getSlice(block, block.getPositionCount() - 1); // trigger loading for lazy block
memoryUsage = pageSource.getSystemMemoryUsage();
// Memory usage after lazy-loading the actual block
assertBetweenInclusive(memoryUsage, 460_000L, 469_999L);
if (useCache) {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), testPreparer.getFileSize() + 270_000, testPreparer.getFileSize() + 280_000);
}
else {
assertBetweenInclusive(memoryUsage, 460_000L, 469_999L);
}
}
else {
assertEquals(pageSource.getSystemMemoryUsage(), memoryUsage);
Expand All @@ -235,15 +246,20 @@ private void testPageSource(boolean useCache)
if (memoryUsage == -1) {
// Memory usage before lazy-loading the block
if (useCache) {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), 180_000L, 189_999L);
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), testPreparer.getFileSize(), testPreparer.getFileSize() + 2000);
}
else {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), 0L, 1000L);
}
createUnboundedVarcharType().getSlice(block, block.getPositionCount() - 1); // trigger loading for lazy block
memoryUsage = pageSource.getSystemMemoryUsage();
// Memory usage after lazy-loading the actual block
assertBetweenInclusive(memoryUsage, 460000L, 469999L);
if (useCache) {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), testPreparer.getFileSize() + 270_000, testPreparer.getFileSize() + 280_000);
}
else {
assertBetweenInclusive(memoryUsage, 460_000L, 469_999L);
}
}
else {
assertEquals(pageSource.getSystemMemoryUsage(), memoryUsage);
Expand All @@ -263,15 +279,20 @@ private void testPageSource(boolean useCache)
if (memoryUsage == -1) {
// Memory usage before lazy-loading the block
if (useCache) {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), 90_000L, 99_999L);
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), testPreparer.getFileSize(), testPreparer.getFileSize() + 2000);
}
else {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), 0L, 1000L);
}
createUnboundedVarcharType().getSlice(block, block.getPositionCount() - 1); // trigger loading for lazy block
memoryUsage = pageSource.getSystemMemoryUsage();
// Memory usage after loading the actual block
assertBetweenInclusive(memoryUsage, 360_000L, 369_999L);
if (useCache) {
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), testPreparer.getFileSize() + 260_000, testPreparer.getFileSize() + 270_000);
}
else {
assertBetweenInclusive(memoryUsage, 360_000L, 369_999L);
}
}
else {
assertEquals(pageSource.getSystemMemoryUsage(), memoryUsage);
Expand All @@ -284,7 +305,13 @@ private void testPageSource(boolean useCache)
assertFalse(pageSource.isFinished());
assertNull(pageSource.getNextPage());
assertTrue(pageSource.isFinished());
assertEquals(pageSource.getSystemMemoryUsage(), 0);
if (useCache) {
// file is fully cached
assertBetweenInclusive(pageSource.getSystemMemoryUsage(), testPreparer.getFileSize(), testPreparer.getFileSize() + 200);
}
else {
assertEquals(pageSource.getSystemMemoryUsage(), 0);
}
pageSource.close();
}

Expand Down Expand Up @@ -435,7 +462,11 @@ public void testScanFilterAndProjectOperator()
assertFalse(operator.isFinished());
Page page = operator.getOutput();
assertNotNull(page);
assertBetweenInclusive(driverContext.getSystemMemoryUsage(), 90_000L, 499_999L);

// memory usage varies depending on stripe alignment
long memoryUsage = driverContext.getSystemMemoryUsage();
assertTrue(memoryUsage < 1000 || (memoryUsage > 350_000L && memoryUsage < 465_000L));

totalRows += page.getPositionCount();
}

Expand Down Expand Up @@ -506,14 +537,14 @@ public TestPreparer(String tempFilePath, List<TestColumn> testColumns, int numRo
fileSplit = createTestFile(tempFilePath, serde, null, testColumns, numRows, stripeRows);
}

public ConnectorPageSource newPageSource()
public long getFileSize()
{
return newPageSource(new FileFormatDataSourceStats(), SESSION);
return fileSplit.getLength();
}

public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats)
public ConnectorPageSource newPageSource()
{
return newPageSource(stats, SESSION);
return newPageSource(new FileFormatDataSourceStats(), UNCACHED_SESSION);
}

public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, ConnectorSession session)
Expand Down
102 changes: 102 additions & 0 deletions presto-orc/src/main/java/io/prestosql/orc/MemoryOrcDataSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.orc;

import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.prestosql.orc.stream.MemoryOrcDataReader;
import io.prestosql.orc.stream.OrcDataReader;

import java.util.Map;
import java.util.Map.Entry;

import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

/**
 * An {@link OrcDataSource} whose entire content is held in a single in-memory {@link Slice}.
 * <p>
 * Reads are served by slicing the backing data, and the retained size of the backing data
 * is reported by this data source via {@link #getRetainedSize()}.
 * <p>
 * The {@code readBytes} counter is a plain field with no synchronization.
 */
public class MemoryOrcDataSource
        implements OrcDataSource
{
    private final OrcDataSourceId id;
    private final Slice data;
    // total number of bytes returned by successful readFully calls
    private long readBytes;

    /**
     * @param id identifier reported by {@link #getId()} and {@link #toString()}; must not be null
     * @param data the complete content of the data source; must not be null
     */
    public MemoryOrcDataSource(OrcDataSourceId id, Slice data)
    {
        this.id = requireNonNull(id, "id is null");
        this.data = requireNonNull(data, "data is null");
    }

    @Override
    public OrcDataSourceId getId()
    {
        return id;
    }

    /**
     * @return the sum of the lengths of all reads that completed successfully
     */
    @Override
    public long getReadBytes()
    {
        return readBytes;
    }

    /**
     * @return always zero; no read time is tracked for in-memory data
     */
    @Override
    public long getReadTimeNanos()
    {
        return 0;
    }

    /**
     * @return the length in bytes of the backing data
     */
    @Override
    public final long getSize()
    {
        return data.length();
    }

    /**
     * @return the retained size reported by the backing slice
     */
    @Override
    public long getRetainedSize()
    {
        return data.getRetainedSize();
    }

    /**
     * Returns the requested range of the backing data.
     *
     * @param position offset of the first byte to read; must fit in an {@code int}
     * @param length number of bytes to read
     * @return a slice of the backing data covering the requested range
     */
    @Override
    public final Slice readFully(long position, int length)
    {
        // Slice before counting: toIntExact and slice reject an invalid range by throwing,
        // and a read that failed must not be added to the read statistics.
        Slice slice = data.slice(toIntExact(position), length);
        readBytes += length;
        return slice;
    }

    /**
     * Reads every requested disk range and wraps each result in an {@link OrcDataReader}.
     *
     * @param diskRanges the ranges to read, keyed by a caller-chosen key; must not be null
     * @return a map from each input key to a reader over the corresponding range
     */
    @Override
    public final <K> Map<K, OrcDataReader> readFully(Map<K, DiskRange> diskRanges)
    {
        requireNonNull(diskRanges, "diskRanges is null");

        if (diskRanges.isEmpty()) {
            return ImmutableMap.of();
        }

        ImmutableMap.Builder<K, OrcDataReader> slices = ImmutableMap.builder();
        for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
            DiskRange diskRange = entry.getValue();
            Slice slice = readFully(diskRange.getOffset(), diskRange.getLength());
            // retained memory is reported by this data source, so it should not be declared in the reader
            slices.put(entry.getKey(), new MemoryOrcDataReader(id, slice, 0));
        }

        return slices.build();
    }

    @Override
    public final String toString()
    {
        return id.toString();
    }
}
8 changes: 5 additions & 3 deletions presto-orc/src/main/java/io/prestosql/orc/OrcReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,17 @@ public OrcRecordReader createRecordReader(
}

private static OrcDataSource wrapWithCacheIfTiny(OrcDataSource dataSource, DataSize maxCacheSize)
throws IOException
{
if (dataSource instanceof CachingOrcDataSource) {
if (dataSource instanceof MemoryOrcDataSource || dataSource instanceof CachingOrcDataSource) {
return dataSource;
}
if (dataSource.getSize() > maxCacheSize.toBytes()) {
return dataSource;
}
DiskRange diskRange = new DiskRange(0, toIntExact(dataSource.getSize()));
return new CachingOrcDataSource(dataSource, desiredOffset -> diskRange);
Slice data = dataSource.readFully(0, toIntExact(dataSource.getSize()));
dataSource.close();
return new MemoryOrcDataSource(dataSource.getId(), data);
}

private static OrcColumn createOrcColumn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private static boolean isStripeIncluded(
@VisibleForTesting
static OrcDataSource wrapWithCacheIfTinyStripes(OrcDataSource dataSource, List<StripeInformation> stripes, DataSize maxMergeDistance, DataSize tinyStripeThreshold)
{
if (dataSource instanceof CachingOrcDataSource) {
if (dataSource instanceof MemoryOrcDataSource || dataSource instanceof CachingOrcDataSource) {
return dataSource;
}
for (StripeInformation stripe : stripes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.prestosql.orc;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slices;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.type.DecimalType;
Expand Down Expand Up @@ -67,7 +68,7 @@
import static io.prestosql.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.toIntExact;
import static java.nio.file.Files.readAllBytes;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.joda.time.DateTimeZone.UTC;
Expand Down Expand Up @@ -340,10 +341,7 @@ public void setup(Type type)
orcFile = new File(temporaryDirectory, randomUUID().toString());
writeOrcColumnPresto(orcFile, NONE, type, createValues(), new OrcWriterStats());

OrcDataSource dataSource = new FileOrcDataSource(orcFile, new OrcReaderOptions());
DiskRange diskRange = new DiskRange(0, toIntExact(dataSource.getSize()));
dataSource = new CachingOrcDataSource(dataSource, desiredOffset -> diskRange);
this.dataSource = dataSource;
dataSource = new MemoryOrcDataSource(new OrcDataSourceId(orcFile.getPath()), Slices.wrappedBuffer(readAllBytes(orcFile.toPath())));
}

@TearDown
Expand Down
22 changes: 2 additions & 20 deletions presto-orc/src/test/java/io/prestosql/orc/TestOrcLz4.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.prestosql.orc;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
Expand All @@ -28,7 +29,6 @@
import static io.prestosql.orc.metadata.CompressionKind.LZ4;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static java.lang.Math.toIntExact;
import static org.testng.Assert.assertEquals;

public class TestOrcLz4
Expand All @@ -43,7 +43,7 @@ public void testReadLz4()
// TODO: use Apache ORC library in OrcTester
byte[] data = toByteArray(getResource("apache-lz4.orc"));

OrcReader orcReader = new OrcReader(new InMemoryOrcDataSource(data), new OrcReaderOptions());
OrcReader orcReader = new OrcReader(new MemoryOrcDataSource(new OrcDataSourceId("memory"), Slices.wrappedBuffer(data)), new OrcReaderOptions());

assertEquals(orcReader.getCompressionKind(), LZ4);
assertEquals(orcReader.getFooter().getNumberOfRows(), 10_000);
Expand Down Expand Up @@ -79,22 +79,4 @@ public void testReadLz4()

assertEquals(rows, reader.getFileRowCount());
}

/**
 * Test-only data source that serves reads from a byte array held in memory.
 */
private static class InMemoryOrcDataSource
        extends AbstractOrcDataSource
{
    private final byte[] data;

    /**
     * @param data the complete file content to serve reads from
     */
    public InMemoryOrcDataSource(byte[] data)
    {
        super(new OrcDataSourceId("memory"), data.length, new OrcReaderOptions());
        this.data = data;
    }

    /**
     * Copies {@code bufferLength} bytes, starting at {@code position} in the backing array,
     * into {@code buffer} at {@code bufferOffset}.
     */
    @Override
    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
    {
        // the backing array is int-indexed, so the position has to fit in an int
        int sourceOffset = toIntExact(position);
        System.arraycopy(data, sourceOffset, buffer, bufferOffset, bufferLength);
    }
}
}
3 changes: 3 additions & 0 deletions presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ public void testWriteOutputStreamsInOrder()
OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), READER_OPTIONS);
Footer footer = new OrcReader(orcDataSource, READER_OPTIONS).getFooter();

// OrcReader closes the original data source because it buffers the full file, so we need to reopen
orcDataSource = new FileOrcDataSource(tempFile.getFile(), READER_OPTIONS);

for (StripeInformation stripe : footer.getStripes()) {
// read the footer
Slice tailBuffer = orcDataSource.readFully(stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength(), toIntExact(stripe.getFooterLength()));
Expand Down

0 comments on commit 794b5e7

Please sign in to comment.