Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved threading capabilities of S3+parquet #5451

Merged
merged 11 commits into from
May 8, 2024
1 change: 0 additions & 1 deletion Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//
package io.deephaven.base;

import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import org.jetbrains.annotations.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ enum ChannelType {
private final RAPriQueue<PerPathPool> releasePriority =
new RAPriQueue<>(8, PerPathPool.RAPQ_ADAPTER, PerPathPool.class);

public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
/**
 * Static factory for a {@link CachedChannelProvider} that wraps {@code wrappedProvider}.
 *
 * @param wrappedProvider The provider to wrap; must not itself be a {@link CachedChannelProvider}
 * @param maximumPooledCount The maximum pooled count, forwarded unchanged to the constructor
 * @return A new {@link CachedChannelProvider} wrapping {@code wrappedProvider}
 * @throws IllegalArgumentException if {@code wrappedProvider} is already a {@link CachedChannelProvider}
 */
public static CachedChannelProvider create(@NotNull final SeekableChannelsProvider wrappedProvider,
        final int maximumPooledCount) {
    final boolean alreadyCached = wrappedProvider instanceof CachedChannelProvider;
    if (!alreadyCached) {
        return new CachedChannelProvider(wrappedProvider, maximumPooledCount);
    }
    // Nesting one cached provider inside another is rejected outright
    throw new IllegalArgumentException("Cannot wrap a CachedChannelProvider in another CachedChannelProvider");
}

private CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
final int maximumPooledCount) {
this.wrappedProvider = wrappedProvider;
this.maximumPooledCount = Require.gtZero(maximumPooledCount, "maximumPooledCount");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ private SeekableChannelsProviderLoader() {
}

/**
* Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
* {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
* read files from S3.
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a
* {@link SeekableChannelsProvider} which can read files from S3.
*
* @param uri The URI
* @param object An optional object to pass to the {@link SeekableChannelsProviderPlugin} implementations.
* @param specialInstructions An optional object to pass special instructions to the provider.
* @return A {@link SeekableChannelsProvider} for the given URI.
*/
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, @Nullable final Object object) {
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri,
@Nullable final Object specialInstructions) {
for (final SeekableChannelsProviderPlugin plugin : providers) {
if (plugin.isCompatible(uri, object)) {
return plugin.createProvider(uri, object);
if (plugin.isCompatible(uri, specialInstructions)) {
return plugin.createProvider(uri, specialInstructions);
}
}
throw new UnsupportedOperationException("No plugin found for uri: " + uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand All @@ -32,7 +33,7 @@ public class CachedChannelProviderTest {
@Test
public void testSimpleRead() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int ii = 0; ii < 100; ++ii) {
final SeekableByteChannel[] sameFile = new SeekableByteChannel[10];
for (int jj = 0; jj < sameFile.length; ++jj) {
Expand All @@ -55,7 +56,7 @@ public void testSimpleRead() throws IOException {
@Test
public void testSimpleReadWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc =
((i / 100) % 2 == 0 ? cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i)
Expand All @@ -69,7 +70,7 @@ public void testSimpleReadWrite() throws IOException {
@Test
public void testSimpleWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false);
// Call write to hit the assertions inside the mock channel
Expand All @@ -86,7 +87,7 @@ public void testSimpleWrite() throws IOException {
@Test
public void testSimpleAppend() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true);
rc.close();
Expand All @@ -100,7 +101,7 @@ public void testSimpleAppend() throws IOException {
@Test
public void testCloseOrder() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 20; i++) {
List<SeekableByteChannel> channels = new ArrayList<>();
for (int j = 0; j < 50; j++) {
Expand All @@ -121,7 +122,7 @@ public void testCloseOrder() throws IOException {
@Test
public void testReuse() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 50);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 50);
final SeekableByteChannel[] someResult = new SeekableByteChannel[50];
final ByteBuffer buffer = ByteBuffer.allocate(1);
for (int ci = 0; ci < someResult.length; ++ci) {
Expand Down Expand Up @@ -149,7 +150,7 @@ public void testReuse() throws IOException {
@Test
public void testReuse10() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
final SeekableByteChannel[] someResult = new SeekableByteChannel[100];
for (int pi = 0; pi < 10; ++pi) {
for (int ci = 0; ci < 10; ++ci) {
Expand All @@ -173,6 +174,17 @@ public void testReuse10() throws IOException {
assertEquals(0, closed.size());
}

/**
 * Verifies that {@link CachedChannelProvider#create} refuses a provider that is already a
 * {@link CachedChannelProvider}.
 */
@Test
void testRewrapCachedChannelProvider() {
    final CachedChannelProvider alreadyCached = CachedChannelProvider.create(new TestChannelProvider(), 100);
    try {
        CachedChannelProvider.create(alreadyCached, 100);
    } catch (final IllegalArgumentException ignored) {
        // Rejection is the outcome under test; there is nothing further to inspect
        return;
    }
    fail("Expected IllegalArgumentException on rewrapping CachedChannelProvider");
}


private class TestChannelProvider implements SeekableChannelsProvider {

Expand Down
26 changes: 26 additions & 0 deletions Util/src/main/java/io/deephaven/util/thread/ThreadHelpers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

/**
 * Helpers for resolving thread-count settings from {@link Configuration}.
 */
public class ThreadHelpers {
    /**
     * Resolve the number of threads to use for a configuration key. The integer is read from the configuration
     * under {@code configKey}, with {@code defaultValue} used when the key is not set. A positive result is returned
     * as-is; a zero or negative result is replaced by the number of available processors.
     *
     * @param configKey The configuration key to look up
     * @param defaultValue The default value to use if the configuration key is not set
     * @return The number of threads to use
     */
    public static int getOrComputeThreadCountProperty(final String configKey, final int defaultValue) {
        final int configured = Configuration.getInstance().getIntegerWithDefault(configKey, defaultValue);
        // Non-positive values mean "size to the machine"
        return configured > 0 ? configured : Runtime.getRuntime().availableProcessors();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.engine.table.impl;

import io.deephaven.chunk.util.pools.MultiChunkPool;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.util.thread.NamingThreadFactory;
Expand All @@ -17,6 +16,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static io.deephaven.util.thread.ThreadHelpers.getOrComputeThreadCountProperty;

/**
* Implementation of OperationInitializer that delegates to a pool of threads.
*/
Expand All @@ -25,17 +26,8 @@ public class OperationInitializationThreadPool implements OperationInitializer {
/**
* The number of threads that will be used for parallel initialization in this process
*/
public static final int NUM_THREADS;

static {
final int numThreads =
Configuration.getInstance().getIntegerWithDefault("OperationInitializationThreadPool.threads", -1);
if (numThreads <= 0) {
NUM_THREADS = Runtime.getRuntime().availableProcessors();
} else {
NUM_THREADS = numThreads;
}
}
private static final int NUM_THREADS =
getOrComputeThreadCountProperty("OperationInitializationThreadPool.threads", -1);
private final ThreadLocal<Boolean> isInitializationThread = ThreadLocal.withInitial(() -> false);

private final ThreadPoolExecutor executorService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3890,7 +3890,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");

final Table loaded = ParquetTools.readPartitionedTableInferSchema(
new ParquetKeyValuePartitionedLayout(testRootFile, 2, ParquetInstructions.EMPTY),
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
ParquetInstructions.EMPTY);

// verify the sources are identical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
//
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.apache.parquet.format.*;
import org.apache.parquet.format.ColumnOrder;
import org.apache.parquet.format.Type;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.util.*;
Expand All @@ -44,94 +42,50 @@ public class ParquetFileReader {

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
* {@link UncheckedIOException}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final File parquetFile,
@Nullable final Object specialInstructions) {
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return createChecked(parquetFile, specialInstructions);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
* {@link UncheckedIOException}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final URI parquetFileURI,
@Nullable final Object specialInstructions) {
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return createChecked(parquetFileURI, specialInstructions);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
return new ParquetFileReader(parquetFileURI, channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFileURI, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final File parquetFile,
@Nullable final Object specialInstructions) throws IOException {
return createChecked(convertToURI(parquetFile, false), specialInstructions);
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
* channels
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final URI parquetFileURI,
@Nullable final Object specialInstructions) throws IOException {
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
parquetFileURI, specialInstructions);
return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(provider, 1 << 7));
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param source The source path or URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*/
public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider)
throws IOException {
this(convertToURI(source, false), channelsProvider);
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @param provider The {@link SeekableChannelsProvider} to use for reading the file
*/
public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
throws IOException {
this.channelsProvider = channelsProvider;
private ParquetFileReader(
@NotNull final URI parquetFileURI,
@NotNull final SeekableChannelsProvider provider) throws IOException {
this.channelsProvider = CachedChannelProvider.create(provider, 1 << 7);
if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) {
// Construct a new file URI for the parent directory
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
Expand Down Expand Up @@ -270,7 +224,7 @@ private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {

/**
* Create a {@link RowGroupReader} object for provided row group number
*
*
* @param version The "version" string from deephaven specific parquet metadata, or null if it's not present.
*/
public RowGroupReader getRowGroup(final int groupNumber, final String version) {
Expand Down Expand Up @@ -506,7 +460,7 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedTyp

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
Expand Down
Loading
Loading