Added a soft reference based shared cache for S3 reads #5357

Merged: 29 commits, May 14, 2024
Commits (29)
82bc5a1
Added a concurrent hash map based shared cache
malhotrashivam Apr 4, 2024
61b70ba
Moved to a soft reference based cache
malhotrashivam Apr 4, 2024
03dd512
Added KeyedLongObjectHashMap based cache
malhotrashivam Apr 8, 2024
c3394ce
Merge branch 'main' into sm-s3-cache
malhotrashivam Apr 23, 2024
c03c7f9
Review comments
malhotrashivam Apr 24, 2024
6fcb50f
Moved from a KeyedIntObjectHashMap to KeyedObjectHashMap
malhotrashivam Apr 24, 2024
e0d4025
Renamed the class
malhotrashivam Apr 24, 2024
a8994c4
Merge branch 'main' into sm-s3-cache
malhotrashivam Apr 24, 2024
0da90b5
Merge branch 'main' into sm-s3-cache
malhotrashivam May 3, 2024
322681b
Review comments
malhotrashivam May 3, 2024
b35db77
Removing unused methods
malhotrashivam May 3, 2024
db09a4a
Improved comments
malhotrashivam May 3, 2024
89d1238
Minor tweaks
malhotrashivam May 3, 2024
ca7c74d
ParquetFileReader will use a single use context
malhotrashivam May 3, 2024
ebbe505
Added debug log messages
malhotrashivam May 3, 2024
9e72124
Merge branch 'main' into sm-s3-cache
malhotrashivam May 8, 2024
f65fbf7
Review comments part 1
malhotrashivam May 10, 2024
8a607a1
Review with Ryan Part 2
malhotrashivam May 10, 2024
364f30f
Review with Ryan Part 3
malhotrashivam May 10, 2024
57b5285
Review with Ryan Part 4
malhotrashivam May 13, 2024
60a4e23
Review with Ryan Part 5
malhotrashivam May 13, 2024
06b0226
Updated the defaults for S3 instructions
malhotrashivam May 13, 2024
b716c67
Renaming some variables
malhotrashivam May 13, 2024
8173578
Removed some unnecessary includes
malhotrashivam May 13, 2024
1a6eaae
Slightly tweaked the caching algorithm
malhotrashivam May 13, 2024
7624442
Python review
malhotrashivam May 13, 2024
20c3245
Review with Ryan contd.
malhotrashivam May 13, 2024
5fb2586
More review comments
malhotrashivam May 14, 2024
c0f4192
Some more review comments
malhotrashivam May 14, 2024
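
Taken together, the commits trace the design: a shared cache for S3 reads that started as a plain concurrent hash map, moved to holding its values through soft references, and ended up keyed via a KeyedObjectHashMap. As a rough illustration of the core idea only — not the actual Deephaven implementation — a minimal soft-reference cache might look like the sketch below; the class name, the URI-plus-fragment-index key shape, and the loader callback are assumptions made for the example.

import java.lang.ref.SoftReference;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of a shared cache whose values are held through soft
 * references, so the GC may reclaim cached fragments under memory pressure.
 * Key shape and class names are illustrative, not Deephaven's.
 */
final class SoftReferenceFragmentCache<V> {

    private record Key(URI uri, long fragmentIndex) {
    }

    private final ConcurrentHashMap<Key, SoftReference<V>> cache = new ConcurrentHashMap<>();

    V getOrLoad(final URI uri, final long fragmentIndex, final Supplier<V> loader) {
        final Key key = new Key(Objects.requireNonNull(uri), fragmentIndex);
        while (true) {
            final SoftReference<V> existing = cache.get(key);
            if (existing != null) {
                final V cached = existing.get();
                if (cached != null) {
                    return cached; // still softly reachable; share it
                }
                cache.remove(key, existing); // referent was collected; drop the stale entry
            }
            final V loaded = loader.get();
            final SoftReference<V> raced = cache.putIfAbsent(key, new SoftReference<>(loaded));
            if (raced == null) {
                return loaded;
            }
            final V winner = raced.get();
            if (winner != null) {
                return winner; // another thread populated the entry first
            }
            cache.remove(key, raced); // racing entry already collected; retry
        }
    }
}

Soft references let the JVM, rather than a fixed eviction budget, decide how much cached data to keep, which matches the "Moved to a soft reference based cache" step above.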
@@ -20,6 +20,29 @@
*/
public class CleanupReferenceProcessor {

@NotNull
public static CleanupReferenceProcessor getDefault() {
return Instance.DEFAULT.cleanupReferenceProcessor;
}

private enum Instance {
// @formatter:off
DEFAULT(new CleanupReferenceProcessor("default", 1000,
(l, r, e) -> l.warn()
.append(Thread.currentThread().getName())
.append(": Exception thrown from cleanup of ").append(Utils.REFERENT_FORMATTER, r)
.append(": ").append(e)
.endl())
);
// @formatter:on

private final CleanupReferenceProcessor cleanupReferenceProcessor;

Instance(@NotNull final CleanupReferenceProcessor cleanupReferenceProcessor) {
this.cleanupReferenceProcessor = Require.neqNull(cleanupReferenceProcessor, "cleanupReferenceProcessor");
}
}

private static final Logger log = LoggerFactory.getLogger(CleanupReferenceProcessor.class);

private static final boolean LOG_CLEANED_REFERENCES =
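
The hunk above relocates the default processor behind CleanupReferenceProcessor.getDefault(), backed by a private single-constant enum. Outside the Deephaven codebase, the same enum-holder idiom gives a lazily initialized, thread-safe singleton without explicit locking; a generic sketch with a stand-in resource type looks like this:

import java.util.concurrent.atomic.AtomicLong;

// Generic illustration of the enum-holder idiom used by getDefault() above.
// AtomicLong is a stand-in; the real code wraps a CleanupReferenceProcessor.
public final class DefaultHolderExample {

    public static AtomicLong getDefault() {
        return Instance.DEFAULT.resource;
    }

    private enum Instance {
        // The constant is initialized at most once, on first use of the enum,
        // and the JVM guarantees safe publication to all threads.
        DEFAULT(new AtomicLong());

        private final AtomicLong resource;

        Instance(final AtomicLong resource) {
            this.resource = resource;
        }
    }

    private DefaultHolderExample() {}
}

Later hunks in this PR switch callers from CleanupReferenceProcessorInstance.DEFAULT to the new CleanupReferenceProcessor.getDefault() accessor.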
@@ -236,7 +236,7 @@ public interface MemoizableOperation<T extends DynamicNode & NotificationStepRec
Configuration.getInstance().getIntegerWithDefault("QueryTable.parallelWhereSegments", -1);

/**
* You can chose to enable or disable the column parallel select and update.
* You can choose to enable or disable the column parallel select and update.
*/
static boolean ENABLE_PARALLEL_SELECT_AND_UPDATE =
Configuration.getInstance().getBooleanWithDefault("QueryTable.enableParallelSelectAndUpdate", true);
@@ -4,7 +4,6 @@
package io.deephaven.engine.util.reference;

import io.deephaven.base.verify.Require;
import io.deephaven.util.Utils;
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.util.reference.CleanupReferenceProcessor;
import org.jetbrains.annotations.NotNull;
@@ -17,13 +16,6 @@
public enum CleanupReferenceProcessorInstance {

// @formatter:off
DEFAULT(new CleanupReferenceProcessor("default", 1000,
(l, r, e) -> l.warn()
.append(Thread.currentThread().getName())
.append(": Exception thrown from cleanup of ").append(Utils.REFERENT_FORMATTER, r)
.append(": ").append(e)
.endl())
),
LIVENESS(new CleanupReferenceProcessor("liveness", 1000,
(l, r, e) -> {
if (e instanceof RuntimeException) {
@@ -50,5 +42,6 @@ public static void resetAllForUnitTests() {
final CleanupReferenceProcessorInstance instance : values()) {
instance.cleanupReferenceProcessor.resetForUnitTests();
}
CleanupReferenceProcessor.getDefault().resetForUnitTests();
}
}
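
Because DEFAULT no longer lives in this enum, resetAllForUnitTests() now resets the relocated default processor explicitly in addition to the remaining constants. A test harness would presumably keep calling the same entry point from its teardown; the base class below is hypothetical:

import io.deephaven.engine.util.reference.CleanupReferenceProcessorInstance;
import org.junit.After;

// Hypothetical teardown hook; the repo's actual test base classes may differ.
public abstract class CleanupAwareTestBase {
    @After
    public void resetCleanupReferenceProcessors() {
        // Resets the remaining enum constants (e.g. LIVENESS) and, per the hunk
        // above, the relocated CleanupReferenceProcessor.getDefault() as well.
        CleanupReferenceProcessorInstance.resetAllForUnitTests();
    }
}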
@@ -6,11 +6,9 @@
import io.deephaven.base.ArrayUtil;
import io.deephaven.base.reference.WeakCleanupReference;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.ResettableContext;
import io.deephaven.engine.util.file.FileHandle;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.engine.util.reference.CleanupReferenceProcessorInstance;
import io.deephaven.extensions.arrow.sources.ArrowByteColumnSource;
import io.deephaven.extensions.arrow.sources.ArrowCharColumnSource;
import io.deephaven.extensions.arrow.sources.ArrowInstantColumnSource;
@@ -55,6 +53,7 @@
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.util.datastructures.SizeException;
import io.deephaven.util.reference.CleanupReferenceProcessor;
import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.RecordBatch;
@@ -441,7 +440,7 @@ private static final class ReaderCleanup extends WeakCleanupReference<Shareable>
private final ArrowFileReader reader;

private ReaderCleanup(final Shareable shareable) {
super(shareable, CleanupReferenceProcessorInstance.DEFAULT.getReferenceQueue());
super(shareable, CleanupReferenceProcessor.getDefault().getReferenceQueue());
this.reader = shareable.getReader();
}

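
For context on the hunk above: ReaderCleanup registers itself against the default processor's ReferenceQueue, and its cleanup runs once the Shareable referent becomes unreachable. A plain-Java sketch of that reference-queue pattern — not Deephaven's implementation, where the polling lives inside CleanupReferenceProcessor — looks like this:

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

// Plain-Java illustration of reference-queue driven cleanup.
final class ReferenceQueueCleanupExample {

    /** A weak reference that knows how to release a resource tied to its referent. */
    static final class ResourceCleanup extends WeakReference<Object> {
        private final AutoCloseable resource;

        ResourceCleanup(final Object referent, final AutoCloseable resource,
                final ReferenceQueue<Object> queue) {
            super(referent, queue); // enqueued after the referent is GC'd
            this.resource = resource;
        }

        void cleanup() throws Exception {
            resource.close();
        }
    }

    /** Drains the queue, invoking cleanup for every reference the GC has cleared. */
    static void drain(final ReferenceQueue<Object> queue) throws Exception {
        Reference<?> ref;
        while ((ref = queue.poll()) != null) {
            ((ResourceCleanup) ref).cleanup();
        }
    }
}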
@@ -94,7 +94,7 @@ private ParquetFileReader(
rootURI = parquetFileURI;
}
try (
final SeekableChannelContext context = channelsProvider.makeContext();
final SeekableChannelContext context = channelsProvider.makeSingleUseContext();
final SeekableByteChannel ch = channelsProvider.getReadChannel(context, parquetFileURI)) {
positionToFileMetadata(parquetFileURI, ch);
try (final InputStream in = channelsProvider.getInputStream(ch)) {
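
Per the "ParquetFileReader will use a single use context" commit, the footer read above is a one-shot operation, so switching from makeContext() to makeSingleUseContext() presumably tells the channels provider that any state tied to this context (for example, cached fragments) need not outlive the try-with-resources block; the precise semantics live in the channels provider implementation, which is not part of this diff.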
@@ -69,7 +69,7 @@ private static DeephavenCompressorAdapterFactory createInstance() {
// Use the Parquet LZ4_RAW codec, which internally uses aircompressor
Lz4RawCodec.class, CompressionCodecName.LZ4_RAW,

// The rest of these are aircompressor codecs which have fast / pure java implementations
// The rest of these are aircompressor codecs that have fast / pure java implementations
JdkGzipCodec.class, CompressionCodecName.GZIP,
LzoCodec.class, CompressionCodecName.LZO,
Lz4Codec.class, CompressionCodecName.LZ4,
@@ -38,8 +38,6 @@
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.table.location.ParquetTableLocation;
@@ -76,7 +74,6 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
@@ -129,11 +126,6 @@ public final class ParquetTableReadWriteTest {
private static final ParquetInstructions EMPTY = ParquetInstructions.EMPTY;
private static final ParquetInstructions REFRESHING = ParquetInstructions.builder().setIsRefreshing(true).build();

// TODO(deephaven-core#5064): Add support for local S3 testing
// The following tests are disabled by default, as they are verifying against a remote system
private static final boolean ENABLE_S3_TESTING =
Configuration.getInstance().getBooleanWithDefault("ParquetTest.enableS3Testing", false);

private static File rootFile;

@Rule
@@ -1477,184 +1469,6 @@ public void testArrayColumns() {
&& !firstColumnMetadata.contains("RLE_DICTIONARY"));
}

@Test
public void readSampleParquetFilesFromDeephavenS3Bucket() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.build();
final Table fromAws1 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/multiColFile.parquet", readInstructions).select();
final Table dhTable1 = TableTools.emptyTable(1_000_000).update("A=(int)i", "B=(double)(i+1)");
assertTableEquals(fromAws1, dhTable1);

final Table fromAws2 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions).select();
final Table dhTable2 = TableTools.emptyTable(5).update("A=(int)i");
assertTableEquals(fromAws2, dhTable2);

final Table fromAws3 = ParquetTools
.readTable("s3://dh-s3-parquet-test1/single%20col%20file%20with%20spaces%20in%20name.parquet",
readInstructions)
.select();
assertTableEquals(fromAws3, dhTable2);

final Table fromAws4 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions)
.select().sumBy();
final Table dhTable4 = TableTools.emptyTable(5).update("A=(int)i").sumBy();
assertTableEquals(fromAws4, dhTable4);
}

@Test
public void readSampleParquetFilesFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-2")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.connectionTimeout(Duration.ofSeconds(1))
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final TableDefinition tableDefinition = TableDefinition.of(
ColumnDefinition.ofString("hash"),
ColumnDefinition.ofLong("version"),
ColumnDefinition.ofLong("size"),
ColumnDefinition.ofString("block_hash"),
ColumnDefinition.ofLong("block_number"),
ColumnDefinition.ofLong("index"),
ColumnDefinition.ofLong("virtual_size"),
ColumnDefinition.ofLong("lock_time"),
ColumnDefinition.ofLong("input_count"),
ColumnDefinition.ofLong("output_count"),
ColumnDefinition.ofBoolean("isCoinbase"),
ColumnDefinition.ofDouble("output_value"),
ColumnDefinition.ofTime("last_modified"),
ColumnDefinition.ofDouble("input_value"));
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setTableDefinition(tableDefinition)
.build();
ParquetTools.readTable(
"s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet",
readInstructions).head(10).select();

ParquetTools.readTable(
"s3://aws-public-blockchain/v1.0/btc/transactions/date=2023-11-13/part-00000-da3a3c27-700d-496d-9c41-81281388eca8-c000.snappy.parquet",
readInstructions).head(10).select();
}

@Test
public void readFlatPartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.FLAT_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/flatPartitionedParquet/",
readInstructions);
final Table expected = emptyTable(30).update("A = (int)i % 10");
assertTableEquals(expected, table);
}

@Test
public void readFlatPartitionedDataAsKeyValuePartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/flatPartitionedParquet3/",
readInstructions);
final Table expected = emptyTable(30).update("A = (int)i % 10");
assertTableEquals(expected, table);
}

@Test
public void readKeyValuePartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/KeyValuePartitionedData/",
readInstructions);
final List<ColumnDefinition<?>> partitioningColumns = table.getDefinition().getPartitioningColumns();
assertEquals(3, partitioningColumns.size());
assertEquals("PC1", partitioningColumns.get(0).getName());
assertEquals("PC2", partitioningColumns.get(1).getName());
assertEquals("PC3", partitioningColumns.get(2).getName());
assertEquals(100, table.size());
assertEquals(3, table.selectDistinct("PC1").size());
assertEquals(2, table.selectDistinct("PC2").size());
assertEquals(2, table.selectDistinct("PC3").size());
assertEquals(100, table.selectDistinct("I").size());
assertEquals(1, table.selectDistinct("J").size());
}

@Test
public void readKeyValuePartitionedParquetFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final TableDefinition ookla_table_definition = TableDefinition.of(
ColumnDefinition.ofInt("quarter").withPartitioning(),
ColumnDefinition.ofString("quadkey"));
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setTableDefinition(ookla_table_definition)
.build();
final Table table = ParquetTools.readTable("s3://ookla-open-data/parquet/performance/type=mobile/year=2023",
readInstructions).head(10).select();
assertEquals(2, table.numColumns());
}

@Test
public void stringDictionaryTest() {
final int nullPos = -5;
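
The deleted tests above still document the S3Instructions builder surface that feeds the new shared cache. For reference, a configuration along the lines of those tests would look like the sketch below; the values are copied from the deleted tests for illustration and are not the updated defaults introduced by the "Updated the defaults for S3 instructions" commit.

import java.time.Duration;

import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;

// Illustrative only: mirrors the builder calls from the deleted tests. With the
// updated defaults in this PR, most of these knobs can likely be left unset.
final class S3ReadConfigExample {
    static S3Instructions anonymousUsEast1() {
        return S3Instructions.builder()
                .regionName("us-east-1")
                .readAheadCount(1)
                .fragmentSize(5 * 1024 * 1024)
                .maxConcurrentRequests(50)
                .maxCacheSize(32)
                .readTimeout(Duration.ofSeconds(60))
                .credentials(Credentials.anonymous())
                .build();
    }
    // The result is then attached to a read via
    // ParquetInstructions.Builder().setSpecialInstructions(...), as the deleted
    // tests did.
}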