Skip to content

Commit

Permalink
Parquet: Lazy dictionary loading, shared page caches, and better file channel cache behavior (#1130)
Browse files Browse the repository at this point in the history

* Lazy-loading and soft-reference caching at every level for Parquet dictionaries.
* Extract PageCache from ColumnChunkPageStore and create them at the ParquetColumnLocation level.
* Do a better job of exploiting our channel cache.
  • Loading branch information
rcaudy committed Aug 26, 2021
1 parent d3d6aa9 commit 7b2eacf
Show file tree
Hide file tree
Showing 21 changed files with 802 additions and 541 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.deephaven.db.v2.locations.parquet;

import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.db.v2.locations.parquet.topage.ToPage;
import io.deephaven.db.v2.sources.Releasable;
import io.deephaven.db.v2.sources.chunk.Attributes.Any;
Expand All @@ -14,83 +13,68 @@
import io.deephaven.parquet.ColumnChunkReader;
import io.deephaven.parquet.ColumnPageReader;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.datastructures.intrusive.IntrusiveSoftLRU;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.WeakReference;
import java.util.function.Supplier;

public abstract class ColumnChunkPageStore<ATTR extends Any>
implements PageStore<ATTR, ATTR, ChunkPage<ATTR>>, Page<ATTR>, SafeCloseable, Releasable {

private static final int CACHE_SIZE =
Configuration.getInstance().getIntegerWithDefault("ColumnChunkPageStore.cacheSize", 1 << 13);
private static final WeakReference<?> NULL_PAGE = new WeakReference<>(null);

protected final PageCache<ATTR> pageCache;
private final ColumnChunkReader columnChunkReader;
private final long mask;
private final ToPage<ATTR, ?> toPage;

private final long size;
final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator;

final IntrusiveSoftLRU<IntrusivePage<ATTR>> intrusiveSoftLRU =
new IntrusiveSoftLRU<>(IntrusiveSoftLRU.Node.Adapter.<IntrusivePage<ATTR>>getInstance(), CACHE_SIZE);

static <ATTR extends Any> WeakReference<IntrusivePage<ATTR>> getNullPage() {
// noinspection unchecked
return (WeakReference<IntrusivePage<ATTR>>) NULL_PAGE;
}

static class IntrusivePage<ATTR extends Any> extends IntrusiveSoftLRU.Node.Impl<IntrusivePage<ATTR>> {

private final ChunkPage<ATTR> page;

IntrusivePage(ChunkPage<ATTR> page) {
this.page = page;
}

ChunkPage<ATTR> getPage() {
return page;
}
}

public static class CreatorResult<ATTR extends Any> {

public final ColumnChunkPageStore<ATTR> pageStore;
public final Chunk<ATTR> dictionary;
public final Supplier<Chunk<ATTR>> dictionaryChunkSupplier;
public final ColumnChunkPageStore<DictionaryKeys> dictionaryKeysPageStore;

private CreatorResult(@NotNull final ColumnChunkPageStore<ATTR> pageStore,
final Chunk<ATTR> dictionary,
final Supplier<Chunk<ATTR>> dictionaryChunkSupplier,
final ColumnChunkPageStore<DictionaryKeys> dictionaryKeysPageStore) {
this.pageStore = pageStore;
this.dictionary = dictionary;
this.dictionaryChunkSupplier = dictionaryChunkSupplier;
this.dictionaryKeysPageStore = dictionaryKeysPageStore;
}
}

public static <ATTR extends Any> CreatorResult<ATTR> create(@NotNull final ColumnChunkReader columnChunkReader,
public static <ATTR extends Any> CreatorResult<ATTR> create(
@NotNull final PageCache<ATTR> pageCache,
@NotNull final ColumnChunkReader columnChunkReader,
final long mask,
@NotNull final ToPage<ATTR, ?> toPage) throws IOException {
final boolean fixedSizePages = columnChunkReader.getPageFixedSize() >= 1;
final ColumnChunkPageStore<ATTR> columnChunkPageStore = fixedSizePages
? new FixedPageSizeColumnChunkPageStore<>(columnChunkReader, mask, toPage)
: new VariablePageSizeColumnChunkPageStore<>(columnChunkReader, mask, toPage);
final ToPage<DictionaryKeys, long[]> dictionaryKeysToPage = toPage.getDictionaryKeysToPage();
final ColumnChunkPageStore<DictionaryKeys> dictionaryKeysColumnChunkPageStore = dictionaryKeysToPage == null
? null
: fixedSizePages
? new FixedPageSizeColumnChunkPageStore<>(columnChunkReader, mask, dictionaryKeysToPage)
: new VariablePageSizeColumnChunkPageStore<>(columnChunkReader, mask, dictionaryKeysToPage);
return new CreatorResult<>(columnChunkPageStore, toPage.getDictionary(), dictionaryKeysColumnChunkPageStore);
? new FixedPageSizeColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage)
: new VariablePageSizeColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage);
final ToPage<DictionaryKeys, long[]> dictionaryKeysToPage =
toPage.getDictionaryKeysToPage();
final ColumnChunkPageStore<DictionaryKeys> dictionaryKeysColumnChunkPageStore =
dictionaryKeysToPage == null ? null
: fixedSizePages
? new FixedPageSizeColumnChunkPageStore<>(pageCache.castAttr(), columnChunkReader, mask,
dictionaryKeysToPage)
: new VariablePageSizeColumnChunkPageStore<>(pageCache.castAttr(), columnChunkReader,
mask,
dictionaryKeysToPage);
return new CreatorResult<>(columnChunkPageStore, toPage::getDictionaryChunk,
dictionaryKeysColumnChunkPageStore);
}

ColumnChunkPageStore(@NotNull final ColumnChunkReader columnChunkReader, final long mask,
ColumnChunkPageStore(@NotNull final PageCache<ATTR> pageCache,
@NotNull final ColumnChunkReader columnChunkReader,
final long mask,
final ToPage<ATTR, ?> toPage) throws IOException {
Require.requirement(((mask + 1) & mask) == 0, "mask is one less than a power of two");

this.pageCache = pageCache;
this.columnChunkReader = columnChunkReader;
this.mask = mask;
this.toPage = toPage;
Expand All @@ -99,7 +83,8 @@ public static <ATTR extends Any> CreatorResult<ATTR> create(@NotNull final Colum
this.columnPageReaderIterator = columnChunkReader.getPageIterator();
}

ChunkPage<ATTR> toPage(final long offset, @NotNull final ColumnPageReader columnPageReader) throws IOException {
ChunkPage<ATTR> toPage(final long offset, @NotNull final ColumnPageReader columnPageReader)
throws IOException {
return toPage.toPage(offset, columnPageReader, mask);
}

Expand Down Expand Up @@ -140,17 +125,10 @@ public boolean usesDictionaryOnEveryPage() {

@Override
public void close() {
intrusiveSoftLRU.clear();
try {
columnPageReaderIterator.close();
} catch (IOException except) {
throw new UncheckedIOException(except);
}
}

@Override
public void releaseCachedResources() {
Releasable.super.releaseCachedResources();
intrusiveSoftLRU.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ class FixedPageSizeColumnChunkPageStore<ATTR extends Any> extends ColumnChunkPag
private final int pageFixedSize;
private volatile int numPages = 0;
private final ColumnPageReader[] columnPageReaders;
private final WeakReference<IntrusivePage<ATTR>>[] pages;
private final WeakReference<PageCache.IntrusivePage<ATTR>>[] pages;

FixedPageSizeColumnChunkPageStore(@NotNull final ColumnChunkReader columnChunkReader, final long mask,
FixedPageSizeColumnChunkPageStore(@NotNull final PageCache<ATTR> pageCache,
@NotNull final ColumnChunkReader columnChunkReader,
final long mask,
@NotNull final ToPage<ATTR, ?> toPage) throws IOException {
super(columnChunkReader, mask, toPage);
super(pageCache, columnChunkReader, mask, toPage);

this.pageFixedSize = columnChunkReader.getPageFixedSize();

Expand All @@ -33,16 +35,17 @@ class FixedPageSizeColumnChunkPageStore<ATTR extends Any> extends ColumnChunkPag
this.columnPageReaders = new ColumnPageReader[numPages];

// noinspection unchecked
this.pages = (WeakReference<IntrusivePage<ATTR>>[]) new WeakReference[numPages];
Arrays.fill(pages, getNullPage());
this.pages = (WeakReference<PageCache.IntrusivePage<ATTR>>[]) new WeakReference[numPages];
Arrays.fill(pages, PageCache.getNullPage());
}

private void fillToPage(final int pageNum) {

while (numPages <= pageNum) {
synchronized (this) {
if (numPages <= pageNum) {
Assert.assertion(columnPageReaderIterator.hasNext(), "columnPageReaderIterator.hasNext()",
Assert.assertion(columnPageReaderIterator.hasNext(),
"columnPageReaderIterator.hasNext()",
"Parquet fixed page size and page iterator don't match, not enough pages.");
columnPageReaders[numPages++] = columnPageReaderIterator.next();
}
Expand All @@ -51,15 +54,16 @@ private void fillToPage(final int pageNum) {
}

private ChunkPage<ATTR> getPage(final int pageNum) {
IntrusivePage<ATTR> page = pages[pageNum].get();
PageCache.IntrusivePage<ATTR> page = pages[pageNum].get();

if (page == null) {
synchronized (columnPageReaders[pageNum]) {
page = pages[pageNum].get();

if (page == null) {
try {
page = new IntrusivePage<>(toPage((long) pageNum * pageFixedSize, columnPageReaders[pageNum]));
page = new PageCache.IntrusivePage<>(
toPage((long) pageNum * pageFixedSize, columnPageReaders[pageNum]));
} catch (IOException except) {
throw new UncheckedIOException(except);
}
Expand All @@ -69,12 +73,13 @@ private ChunkPage<ATTR> getPage(final int pageNum) {
}
}

intrusiveSoftLRU.touch(page);
pageCache.touch(page);
return page.getPage();
}

@Override
public @NotNull ChunkPage<ATTR> getPageContaining(FillContext fillContext, final long elementIndex) {
public @NotNull ChunkPage<ATTR> getPageContaining(FillContext fillContext,
final long elementIndex) {
final long row = elementIndex & mask();
Require.inRange(row, "row", size(), "numRows");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.deephaven.db.v2.locations.parquet;

import io.deephaven.db.v2.sources.chunk.Attributes.Any;
import io.deephaven.db.v2.sources.chunk.page.ChunkPage;
import io.deephaven.util.datastructures.intrusive.IntrusiveSoftLRU;

import java.lang.ref.WeakReference;

/**
 * Soft-LRU cache of Parquet pages, shared across the column chunk page stores that
 * read from the same location.
 */
public class PageCache<ATTR extends Any> extends IntrusiveSoftLRU<PageCache.IntrusivePage<ATTR>> {

    /**
     * Shared sentinel reference standing in for "no page cached"; it never refers to anything.
     */
    private static final WeakReference<?> NULL_PAGE = new WeakReference<>(null);

    /**
     * Construct a page cache bounded at {@code maxSize} strongly-reachable pages.
     *
     * @param maxSize The maximum number of pages the LRU holds strongly
     */
    public PageCache(final int maxSize) {
        super(IntrusiveSoftLRU.Node.Adapter.getInstance(), maxSize);
    }

    /**
     * @return The null page sentinel, typed for the caller's attribute
     */
    public static <ATTR extends Any> WeakReference<IntrusivePage<ATTR>> getNullPage() {
        // Safe for any ATTR: the sentinel's referent is always null.
        // noinspection unchecked
        return (WeakReference<IntrusivePage<ATTR>>) NULL_PAGE;
    }

    /**
     * View this cache under a different attribute type parameter.
     *
     * @return {@code this}, re-typed for {@code ATTR2}
     */
    public <ATTR2 extends Any> PageCache<ATTR2> castAttr() {
        // NOTE(review): type parameter is erased at runtime, so this returns the same
        // instance; callers are responsible for attribute compatibility.
        // noinspection unchecked
        return (PageCache<ATTR2>) this;
    }

    /**
     * Intrusive LRU node wrapping a single cached {@link ChunkPage}.
     */
    public static class IntrusivePage<ATTR extends Any> extends IntrusiveSoftLRU.Node.Impl<IntrusivePage<ATTR>> {

        private final ChunkPage<ATTR> page;

        public IntrusivePage(final ChunkPage<ATTR> page) {
            this.page = page;
        }

        public ChunkPage<ATTR> getPage() {
            return page;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,43 @@

class VariablePageSizeColumnChunkPageStore<ATTR extends Any> extends ColumnChunkPageStore<ATTR> {

// We will set numPages after changing all of these arrays in place and/or setting additional elements to the
// end of the array. Thus, for i < numPages, array[i] will always have the same value, and be valid to use, as
// long as we fetch numPages before accessing the arrays. This is the thread-safe pattern used throughout.
// We will set numPages after changing all of these arrays in place and/or setting additional
// elements to the
// end of the array. Thus, for i < numPages, array[i] will always have the same value, and be
// valid to use, as
// long as we fetch numPages before accessing the arrays. This is the thread-safe pattern used
// throughout.

private volatile int numPages = 0;
private volatile long[] pageRowOffsets;
private volatile ColumnPageReader[] columnPageReaders;
private volatile WeakReference<IntrusivePage<ATTR>>[] pages;
private volatile WeakReference<PageCache.IntrusivePage<ATTR>>[] pages;

VariablePageSizeColumnChunkPageStore(@NotNull final ColumnChunkReader columnChunkReader, final long mask,
VariablePageSizeColumnChunkPageStore(@NotNull final PageCache<ATTR> pageCache,
@NotNull final ColumnChunkReader columnChunkReader,
final long mask,
@NotNull final ToPage<ATTR, ?> toPage) throws IOException {
super(columnChunkReader, mask, toPage);
super(pageCache, columnChunkReader, mask, toPage);

final int INIT_ARRAY_SIZE = 15;
pageRowOffsets = new long[INIT_ARRAY_SIZE + 1];
pageRowOffsets[0] = 0;
columnPageReaders = new ColumnPageReader[INIT_ARRAY_SIZE];

// noinspection unchecked
pages = (WeakReference<IntrusivePage<ATTR>>[]) new WeakReference[INIT_ARRAY_SIZE];
pages = (WeakReference<PageCache.IntrusivePage<ATTR>>[]) new WeakReference[INIT_ARRAY_SIZE];
}

private void extendOnePage(final int prevNumPages) {
IntrusivePage<ATTR> page = null;
PageCache.IntrusivePage<ATTR> page = null;

synchronized (this) {
int localNumPages = numPages;

// Make sure that no one has has already extended to this page yet.
if (localNumPages == prevNumPages) {
Assert.assertion(columnPageReaderIterator.hasNext(), "columnPageReaderIterator.hasNext()",
Assert.assertion(columnPageReaderIterator.hasNext(),
"columnPageReaderIterator.hasNext()",
"Parquet num rows and page iterator don't match, not enough pages.");

if (columnPageReaders.length == localNumPages) {
Expand All @@ -60,14 +66,14 @@ private void extendOnePage(final int prevNumPages) {
final ColumnPageReader columnPageReader = columnPageReaderIterator.next();

long numRows;
WeakReference<IntrusivePage<ATTR>> pageRef = getNullPage();
WeakReference<PageCache.IntrusivePage<ATTR>> pageRef = PageCache.getNullPage();
long prevRowOffset = pageRowOffsets[localNumPages];

try {
numRows = columnPageReader.numRows();

if (numRows < 0) {
page = new IntrusivePage<>(toPage(prevRowOffset, columnPageReader));
page = new PageCache.IntrusivePage<>(toPage(prevRowOffset, columnPageReader));
pageRef = new WeakReference<>(page);
numRows = page.getPage().size();
}
Expand All @@ -83,7 +89,7 @@ private void extendOnePage(final int prevNumPages) {
}

if (page != null) {
intrusiveSoftLRU.touch(page);
pageCache.touch(page);
}
}

Expand All @@ -100,7 +106,7 @@ private int fillToRow(int minPageNum, long row) {
}

private ChunkPage<ATTR> getPage(final int pageNum) {
IntrusivePage<ATTR> page = pages[pageNum].get();
PageCache.IntrusivePage<ATTR> page = pages[pageNum].get();

if (page == null) {
synchronized (columnPageReaders[pageNum]) {
Expand All @@ -109,7 +115,8 @@ private ChunkPage<ATTR> getPage(final int pageNum) {

if (page == null) {
try {
page = new IntrusivePage<>(toPage(pageRowOffsets[pageNum], columnPageReaders[pageNum]));
page = new PageCache.IntrusivePage<>(
toPage(pageRowOffsets[pageNum], columnPageReaders[pageNum]));
} catch (IOException except) {
throw new UncheckedIOException(except);
}
Expand All @@ -121,7 +128,7 @@ private ChunkPage<ATTR> getPage(final int pageNum) {
}
}

intrusiveSoftLRU.touch(page);
pageCache.touch(page);
return page.getPage();
}

Expand Down
Loading

0 comments on commit 7b2eacf

Please sign in to comment.