Added a soft-reference based shared cache for S3 reads (#5357)
malhotrashivam authored and stanbrub committed May 17, 2024
1 parent db65c7c commit 61a11f4
Showing 16 changed files with 802 additions and 610 deletions.
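
The cache named in the commit title lives in the S3 extension files, which are not among the hunks rendered below. As a rough illustration of the technique, here is a minimal sketch of a soft-reference based shared cache; the SoftValueCache name, the key/value types, and the loader callback are hypothetical and not the PR's actual API. A real implementation would also need to purge cleared entries, which may be why the CleanupReferenceProcessor refactoring below accompanies this change.

import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical sketch of a soft-reference based shared cache: values are held through
 * SoftReferences so the GC can reclaim them under memory pressure, while concurrent
 * readers of the same key (for example the same S3 URI and fragment index) share one value.
 */
final class SoftValueCache<K, V> {

    private final Map<K, SoftReference<V>> entries = new ConcurrentHashMap<>();

    V get(final K key, final Function<K, V> loader) {
        final SoftReference<V> ref = entries.get(key);
        final V cached = ref == null ? null : ref.get();
        if (cached != null) {
            return cached;
        }
        // Load without holding a lock and publish the result; a cleared or missing
        // reference is simply replaced. Two racing threads may both load, and the
        // last write wins, which is acceptable for a read-through cache sketch.
        final V loaded = loader.apply(key);
        entries.put(key, new SoftReference<>(loaded));
        return loaded;
    }
}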
@@ -20,6 +20,29 @@
*/
public class CleanupReferenceProcessor {

@NotNull
public static CleanupReferenceProcessor getDefault() {
return Instance.DEFAULT.cleanupReferenceProcessor;
}

private enum Instance {
// @formatter:off
DEFAULT(new CleanupReferenceProcessor("default", 1000,
(l, r, e) -> l.warn()
.append(Thread.currentThread().getName())
.append(": Exception thrown from cleanup of ").append(Utils.REFERENT_FORMATTER, r)
.append(": ").append(e)
.endl())
);
// @formatter:on

private final CleanupReferenceProcessor cleanupReferenceProcessor;

Instance(@NotNull final CleanupReferenceProcessor cleanupReferenceProcessor) {
this.cleanupReferenceProcessor = Require.neqNull(cleanupReferenceProcessor, "cleanupReferenceProcessor");
}
}

private static final Logger log = LoggerFactory.getLogger(CleanupReferenceProcessor.class);

private static final boolean LOG_CLEANED_REFERENCES =
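
The hunk above adds CleanupReferenceProcessor.getDefault(), folding the former DEFAULT constant of the CleanupReferenceProcessorInstance enum (which a later hunk drops) into the class itself. A minimal sketch of the call-site pattern this enables follows; ResourceHolder and its release() method are hypothetical, and the sketch assumes WeakCleanupReference exposes a cleanup() hook, as the ReaderCleanup usage later in this diff suggests.

import io.deephaven.base.reference.WeakCleanupReference;
import io.deephaven.util.reference.CleanupReferenceProcessor;

// Hypothetical owner of something that must be released once its referent is unreachable.
final class ResourceHolder {
    void release() {
        // free pooled buffers, file handles, etc.
    }
}

// Register cleanup work against the shared default processor instead of the
// engine-level CleanupReferenceProcessorInstance.DEFAULT enum constant.
final class ResourceCleanup extends WeakCleanupReference<Object> {

    private final ResourceHolder held;

    ResourceCleanup(final Object referent, final ResourceHolder held) {
        super(referent, CleanupReferenceProcessor.getDefault().getReferenceQueue());
        this.held = held;
    }

    @Override
    public void cleanup() {
        held.release();
    }
}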
@@ -236,7 +236,7 @@ public interface MemoizableOperation<T extends DynamicNode & NotificationStepRec
Configuration.getInstance().getIntegerWithDefault("QueryTable.parallelWhereSegments", -1);

/**
* You can chose to enable or disable the column parallel select and update.
* You can choose to enable or disable the column parallel select and update.
*/
static boolean ENABLE_PARALLEL_SELECT_AND_UPDATE =
Configuration.getInstance().getBooleanWithDefault("QueryTable.enableParallelSelectAndUpdate", true);
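
The javadoc fix above sits on a flag read through Deephaven's Configuration class. As a small self-contained illustration of that pattern, the flag can be consumed as below; the wrapper class name is invented, and how the property is supplied in a given deployment is outside the scope of this hunk.

import io.deephaven.configuration.Configuration;

public final class ParallelSelectFlagCheck {

    // Mirrors the hunk above: resolve the boolean once, defaulting to true when the
    // "QueryTable.enableParallelSelectAndUpdate" property is not set.
    private static final boolean ENABLE_PARALLEL_SELECT_AND_UPDATE =
            Configuration.getInstance().getBooleanWithDefault(
                    "QueryTable.enableParallelSelectAndUpdate", true);

    public static void main(final String[] args) {
        System.out.println("Column-parallel select/update enabled: " + ENABLE_PARALLEL_SELECT_AND_UPDATE);
    }
}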
@@ -4,7 +4,6 @@
package io.deephaven.engine.util.reference;

import io.deephaven.base.verify.Require;
import io.deephaven.util.Utils;
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.util.reference.CleanupReferenceProcessor;
import org.jetbrains.annotations.NotNull;
@@ -17,13 +16,6 @@
public enum CleanupReferenceProcessorInstance {

// @formatter:off
DEFAULT(new CleanupReferenceProcessor("default", 1000,
(l, r, e) -> l.warn()
.append(Thread.currentThread().getName())
.append(": Exception thrown from cleanup of ").append(Utils.REFERENT_FORMATTER, r)
.append(": ").append(e)
.endl())
),
LIVENESS(new CleanupReferenceProcessor("liveness", 1000,
(l, r, e) -> {
if (e instanceof RuntimeException) {
@@ -50,5 +42,6 @@ public static void resetAllForUnitTests() {
final CleanupReferenceProcessorInstance instance : values()) {
instance.cleanupReferenceProcessor.resetForUnitTests();
}
CleanupReferenceProcessor.getDefault().resetForUnitTests();
}
}
@@ -6,11 +6,9 @@
import io.deephaven.base.ArrayUtil;
import io.deephaven.base.reference.WeakCleanupReference;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.ResettableContext;
import io.deephaven.engine.util.file.FileHandle;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.engine.util.reference.CleanupReferenceProcessorInstance;
import io.deephaven.extensions.arrow.sources.ArrowByteColumnSource;
import io.deephaven.extensions.arrow.sources.ArrowCharColumnSource;
import io.deephaven.extensions.arrow.sources.ArrowInstantColumnSource;
@@ -55,6 +53,7 @@
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.util.datastructures.SizeException;
import io.deephaven.util.reference.CleanupReferenceProcessor;
import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.RecordBatch;
@@ -441,7 +440,7 @@ private static final class ReaderCleanup extends WeakCleanupReference<Shareable>
private final ArrowFileReader reader;

private ReaderCleanup(final Shareable shareable) {
super(shareable, CleanupReferenceProcessorInstance.DEFAULT.getReferenceQueue());
super(shareable, CleanupReferenceProcessor.getDefault().getReferenceQueue());
this.reader = shareable.getReader();
}

@@ -94,7 +94,7 @@ private ParquetFileReader(
rootURI = parquetFileURI;
}
try (
final SeekableChannelContext context = channelsProvider.makeContext();
final SeekableChannelContext context = channelsProvider.makeSingleUseContext();
final SeekableByteChannel ch = channelsProvider.getReadChannel(context, parquetFileURI)) {
positionToFileMetadata(parquetFileURI, ch);
try (final InputStream in = channelsProvider.getInputStream(ch)) {
@@ -69,7 +69,7 @@ private static DeephavenCompressorAdapterFactory createInstance() {
// Use the Parquet LZ4_RAW codec, which internally uses aircompressor
Lz4RawCodec.class, CompressionCodecName.LZ4_RAW,

// The rest of these are aircompressor codecs which have fast / pure java implementations
// The rest of these are aircompressor codecs that have fast / pure java implementations
JdkGzipCodec.class, CompressionCodecName.GZIP,
LzoCodec.class, CompressionCodecName.LZO,
Lz4Codec.class, CompressionCodecName.LZ4,
@@ -38,8 +38,6 @@
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.table.location.ParquetTableLocation;
@@ -76,7 +74,6 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
@@ -129,11 +126,6 @@ public final class ParquetTableReadWriteTest {
private static final ParquetInstructions EMPTY = ParquetInstructions.EMPTY;
private static final ParquetInstructions REFRESHING = ParquetInstructions.builder().setIsRefreshing(true).build();

// TODO(deephaven-core#5064): Add support for local S3 testing
// The following tests are disabled by default, as they are verifying against a remote system
private static final boolean ENABLE_S3_TESTING =
Configuration.getInstance().getBooleanWithDefault("ParquetTest.enableS3Testing", false);

private static File rootFile;

@Rule
@@ -1477,184 +1469,6 @@ public void testArrayColumns() {
&& !firstColumnMetadata.contains("RLE_DICTIONARY"));
}

@Test
public void readSampleParquetFilesFromDeephavenS3Bucket() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.build();
final Table fromAws1 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/multiColFile.parquet", readInstructions).select();
final Table dhTable1 = TableTools.emptyTable(1_000_000).update("A=(int)i", "B=(double)(i+1)");
assertTableEquals(fromAws1, dhTable1);

final Table fromAws2 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions).select();
final Table dhTable2 = TableTools.emptyTable(5).update("A=(int)i");
assertTableEquals(fromAws2, dhTable2);

final Table fromAws3 = ParquetTools
.readTable("s3://dh-s3-parquet-test1/single%20col%20file%20with%20spaces%20in%20name.parquet",
readInstructions)
.select();
assertTableEquals(fromAws3, dhTable2);

final Table fromAws4 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions)
.select().sumBy();
final Table dhTable4 = TableTools.emptyTable(5).update("A=(int)i").sumBy();
assertTableEquals(fromAws4, dhTable4);
}

@Test
public void readSampleParquetFilesFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-2")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.connectionTimeout(Duration.ofSeconds(1))
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final TableDefinition tableDefinition = TableDefinition.of(
ColumnDefinition.ofString("hash"),
ColumnDefinition.ofLong("version"),
ColumnDefinition.ofLong("size"),
ColumnDefinition.ofString("block_hash"),
ColumnDefinition.ofLong("block_number"),
ColumnDefinition.ofLong("index"),
ColumnDefinition.ofLong("virtual_size"),
ColumnDefinition.ofLong("lock_time"),
ColumnDefinition.ofLong("input_count"),
ColumnDefinition.ofLong("output_count"),
ColumnDefinition.ofBoolean("isCoinbase"),
ColumnDefinition.ofDouble("output_value"),
ColumnDefinition.ofTime("last_modified"),
ColumnDefinition.ofDouble("input_value"));
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setTableDefinition(tableDefinition)
.build();
ParquetTools.readTable(
"s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet",
readInstructions).head(10).select();

ParquetTools.readTable(
"s3://aws-public-blockchain/v1.0/btc/transactions/date=2023-11-13/part-00000-da3a3c27-700d-496d-9c41-81281388eca8-c000.snappy.parquet",
readInstructions).head(10).select();
}

@Test
public void readFlatPartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.FLAT_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/flatPartitionedParquet/",
readInstructions);
final Table expected = emptyTable(30).update("A = (int)i % 10");
assertTableEquals(expected, table);
}

@Test
public void readFlatPartitionedDataAsKeyValuePartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/flatPartitionedParquet3/",
readInstructions);
final Table expected = emptyTable(30).update("A = (int)i % 10");
assertTableEquals(expected, table);
}

@Test
public void readKeyValuePartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/KeyValuePartitionedData/",
readInstructions);
final List<ColumnDefinition<?>> partitioningColumns = table.getDefinition().getPartitioningColumns();
assertEquals(3, partitioningColumns.size());
assertEquals("PC1", partitioningColumns.get(0).getName());
assertEquals("PC2", partitioningColumns.get(1).getName());
assertEquals("PC3", partitioningColumns.get(2).getName());
assertEquals(100, table.size());
assertEquals(3, table.selectDistinct("PC1").size());
assertEquals(2, table.selectDistinct("PC2").size());
assertEquals(2, table.selectDistinct("PC3").size());
assertEquals(100, table.selectDistinct("I").size());
assertEquals(1, table.selectDistinct("J").size());
}

@Test
public void readKeyValuePartitionedParquetFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final TableDefinition ookla_table_definition = TableDefinition.of(
ColumnDefinition.ofInt("quarter").withPartitioning(),
ColumnDefinition.ofString("quadkey"));
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setTableDefinition(ookla_table_definition)
.build();
final Table table = ParquetTools.readTable("s3://ookla-open-data/parquet/performance/type=mobile/year=2023",
readInstructions).head(10).select();
assertEquals(2, table.numColumns());
}
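
The S3 read tests removed above all follow the same pattern; for reference, a compact sketch assembled from the deleted code is shown below. The bucket, key, and tuning values are copied from the removed readSampleParquetFilesFromDeephavenS3Bucket test, and the import paths are inferred from how the test file appears to use these types, so treat them as assumptions.

import java.time.Duration;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public final class S3ParquetReadSketch {

    public static void main(final String[] args) {
        // S3 client tuning, copied from the removed test.
        final S3Instructions s3Instructions = S3Instructions.builder()
                .regionName("us-east-1")
                .readAheadCount(1)
                .fragmentSize(5 * 1024 * 1024)
                .maxConcurrentRequests(50)
                .maxCacheSize(32)
                .readTimeout(Duration.ofSeconds(60))
                .credentials(Credentials.defaultCredentials())
                .build();

        // Attach the S3 tuning to the Parquet read as "special instructions".
        final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
                .setSpecialInstructions(s3Instructions)
                .build();

        final Table fromAws = ParquetTools
                .readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions)
                .select();
        TableTools.show(fromAws);
    }
}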

@Test
public void stringDictionaryTest() {
final int nullPos = -5;