Improved threading capabilities of S3+parquet #5451

Merged
merged 11 commits on May 8, 2024
1 change: 0 additions & 1 deletion Base/src/main/java/io/deephaven/base/FileUtils.java
@@ -3,7 +3,6 @@
//
package io.deephaven.base;

import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import org.jetbrains.annotations.Nullable;

@@ -58,7 +58,15 @@ enum ChannelType {
private final RAPriQueue<PerPathPool> releasePriority =
new RAPriQueue<>(8, PerPathPool.RAPQ_ADAPTER, PerPathPool.class);

public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
public static CachedChannelProvider create(@NotNull final SeekableChannelsProvider wrappedProvider,
final int maximumPooledCount) {
if (wrappedProvider instanceof CachedChannelProvider) {
throw new IllegalArgumentException("Cannot wrap a CachedChannelProvider in another CachedChannelProvider");
}
return new CachedChannelProvider(wrappedProvider, maximumPooledCount);
}

private CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider,
final int maximumPooledCount) {
this.wrappedProvider = wrappedProvider;
this.maximumPooledCount = Require.gtZero(maximumPooledCount, "maximumPooledCount");
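A minimal usage sketch of the new factory (the pool size of 100 and the helper class name are illustrative choices, not part of this change): callers that previously invoked the CachedChannelProvider constructor now go through create(), which refuses to wrap a CachedChannelProvider in another one.

import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelsProvider;

public final class CachedChannelProviderUsageSketch {
    // Wrap an arbitrary provider in the pooling wrapper via the new factory method.
    static SeekableChannelsProvider pooled(final SeekableChannelsProvider rawProvider) {
        final CachedChannelProvider cached = CachedChannelProvider.create(rawProvider, 100);
        try {
            // Re-wrapping is now rejected rather than silently nesting pools.
            CachedChannelProvider.create(cached, 100);
        } catch (final IllegalArgumentException expected) {
            // "Cannot wrap a CachedChannelProvider in another CachedChannelProvider"
        }
        return cached;
    }
}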

This file was deleted.

@@ -15,7 +15,7 @@
* A service loader class for loading {@link SeekableChannelsProviderPlugin} implementations at runtime and provide
* {@link SeekableChannelsProvider} implementations for different URI schemes, e.g., S3.
*/
public final class SeekableChannelsProviderLoader implements SeekableChannelsProviderFactory {
public final class SeekableChannelsProviderLoader {

private static volatile SeekableChannelsProviderLoader instance;

@@ -37,16 +37,16 @@ private SeekableChannelsProviderLoader() {
}

/**
* Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
* {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
* read files from S3.
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a
* {@link SeekableChannelsProvider} which can read files from S3.
*
* @param uri The URI
* @param specialInstructions An optional object to pass special instructions to the provider.
* @return A {@link SeekableChannelsProvider} for the given URI.
*/
@Override
public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object specialInstructions) {
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri,
@Nullable final Object specialInstructions) {
for (final SeekableChannelsProviderPlugin plugin : providers) {
if (plugin.isCompatible(uri, specialInstructions)) {
return plugin.createProvider(uri, specialInstructions);
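As a sketch of the renamed entry point, resolving a provider for an S3 URI now reads as below. How a SeekableChannelsProviderLoader instance is obtained is not shown in this hunk, so the loader parameter, the bucket name, and the null special instructions are assumptions for illustration; the loader's import package is assumed to match SeekableChannelsProvider's.

import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;

import java.net.URI;

public final class ProviderFromServiceLoaderSketch {
    // Ask the service-loader-backed registry for a provider compatible with the URI scheme.
    static SeekableChannelsProvider forS3(final SeekableChannelsProviderLoader loader) {
        final URI uri = URI.create("s3://example-bucket/path/to/table.parquet"); // hypothetical URI
        return loader.fromServiceLoader(uri, null); // null: no special instructions in this sketch
    }
}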
@@ -20,6 +20,7 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -32,7 +33,7 @@ public class CachedChannelProviderTest {
@Test
public void testSimpleRead() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int ii = 0; ii < 100; ++ii) {
final SeekableByteChannel[] sameFile = new SeekableByteChannel[10];
for (int jj = 0; jj < sameFile.length; ++jj) {
@@ -55,7 +56,7 @@ public void testSimpleRead() throws IOException {
@Test
public void testSimpleReadWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc =
((i / 100) % 2 == 0 ? cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i)
@@ -69,7 +70,7 @@ public void testSimpleReadWrite() throws IOException {
@Test
public void testSimpleWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false);
// Call write to hit the assertions inside the mock channel
@@ -86,7 +87,7 @@ public void testSimpleWrite() throws IOException {
@Test
public void testSimpleAppend() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true);
rc.close();
@@ -100,7 +101,7 @@ public void testSimpleAppend() throws IOException {
@Test
public void testCloseOrder() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 20; i++) {
List<SeekableByteChannel> channels = new ArrayList<>();
for (int j = 0; j < 50; j++) {
@@ -121,7 +122,7 @@ public void testCloseOrder() throws IOException {
@Test
public void testReuse() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 50);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 50);
final SeekableByteChannel[] someResult = new SeekableByteChannel[50];
final ByteBuffer buffer = ByteBuffer.allocate(1);
for (int ci = 0; ci < someResult.length; ++ci) {
@@ -149,7 +150,7 @@ public void testReuse() throws IOException {
@Test
public void testReuse10() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100);
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
final SeekableByteChannel[] someResult = new SeekableByteChannel[100];
for (int pi = 0; pi < 10; ++pi) {
for (int ci = 0; ci < 10; ++ci) {
@@ -173,6 +174,17 @@ public void testReuse10() throws IOException {
assertEquals(0, closed.size());
}

@Test
void testRewrapCachedChannelProvider() {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
try {
CachedChannelProvider.create(cachedChannelProvider, 100);
fail("Expected IllegalArgumentException on rewrapping CachedChannelProvider");
} catch (final IllegalArgumentException expected) {
}
}


private class TestChannelProvider implements SeekableChannelsProvider {

17 changes: 17 additions & 0 deletions Util/src/main/java/io/deephaven/util/thread/ThreadHelpers.java
@@ -0,0 +1,17 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

public class ThreadHelpers {
public static int getNumThreadsFromConfig(final String configKey) {
final int numThreads = Configuration.getInstance().getIntegerWithDefault(configKey, -1);
if (numThreads <= 0) {
return Runtime.getRuntime().availableProcessors();
} else {
return numThreads;
}
}
}
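A short sketch of how the new helper is meant to be consumed; the property key "MyExecutor.threads" below is hypothetical and only illustrates the rule that an absent key (default -1) or any non-positive value falls back to the number of available processors.

import static io.deephaven.util.thread.ThreadHelpers.getNumThreadsFromConfig;

public final class ThreadCountSketch {
    // Hypothetical property key; resolves to availableProcessors() when unset or <= 0.
    static final int NUM_THREADS = getNumThreadsFromConfig("MyExecutor.threads");
}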
@@ -4,7 +4,6 @@
package io.deephaven.engine.table.impl;

import io.deephaven.chunk.util.pools.MultiChunkPool;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.util.thread.NamingThreadFactory;
@@ -17,6 +16,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static io.deephaven.util.thread.ThreadHelpers.getNumThreadsFromConfig;

/**
* Implementation of OperationInitializer that delegates to a pool of threads.
*/
@@ -25,18 +26,8 @@ public class OperationInitializationThreadPool implements OperationInitializer {
/**
* The number of threads that will be used for parallel initialization in this process
*/
public static final int NUM_THREADS;

static {
final int numThreads =
Configuration.getInstance().getIntegerWithDefault("OperationInitializationThreadPool.threads", -1);
if (numThreads <= 0) {
NUM_THREADS = Runtime.getRuntime().availableProcessors();
} else {
NUM_THREADS = numThreads;
}
}
private final ThreadLocal<Boolean> isInitializationThread = ThreadLocal.withInitial(() -> false);
private static final int NUM_THREADS = getNumThreadsFromConfig("OperationInitializationThreadPool.threads");
private static final ThreadLocal<Boolean> isInitializationThread = ThreadLocal.withInitial(() -> false);

private final ThreadPoolExecutor executorService;

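For reference, the resolution the refactored NUM_THREADS field now performs is equivalent to the following sketch, using the same property key and the same Configuration call that the removed static block used:

import io.deephaven.configuration.Configuration;

public final class OperationInitializerThreadCountSketch {
    // Mirrors ThreadHelpers.getNumThreadsFromConfig for this pool's key:
    // a missing or non-positive setting falls back to availableProcessors().
    static int resolveThreadCount() {
        final int configured = Configuration.getInstance()
                .getIntegerWithDefault("OperationInitializationThreadPool.threads", -1);
        return configured <= 0 ? Runtime.getRuntime().availableProcessors() : configured;
    }
}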
@@ -64,10 +64,6 @@ public FileKeyValuePartitionLayout(
@NotNull final BiFunction<Path, Map<String, Comparable<?>>, TLK> keyFactory,
final int maxPartitioningLevels) {
super(keyFactory);
// TODO This can be an issue because table location keys are being generated for us here. So they would
// internally
// make a different provider for each location key. I know that enterprise uses this constructor.
// Should I just break the constructor of ParquetTableLocationKey and force add a channels provider?
this.tableRootDirectory = tableRootDirectory;
this.pathFilter = pathFilter;
this.locationTableBuilderFactory = locationTableBuilderFactory;
@@ -3890,7 +3890,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");

final Table loaded = ParquetTools.readPartitionedTableInferSchema(
new ParquetKeyValuePartitionedLayout(testRootFile, 2, ParquetInstructions.EMPTY),
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
ParquetInstructions.EMPTY);

// verify the sources are identical
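The test change above reflects that ParquetKeyValuePartitionedLayout now takes the table root as a URI rather than a File; converting an existing File root with toURI() is the migration. The import packages below are assumed from Deephaven's usual layout, and the other constructor arguments simply follow the test.

import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout;

import java.io.File;

public final class PartitionedLayoutSketch {
    // maxPartitioningLevels = 2 and ParquetInstructions.EMPTY mirror the test above.
    static ParquetKeyValuePartitionedLayout layoutFor(final File tableRoot) {
        return new ParquetKeyValuePartitionedLayout(tableRoot.toURI(), 2, ParquetInstructions.EMPTY);
    }
}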
@@ -3,21 +3,19 @@
//
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.apache.parquet.format.*;
import org.apache.parquet.format.ColumnOrder;
import org.apache.parquet.format.Type;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.util.*;
@@ -44,94 +42,50 @@ public class ParquetFileReader {

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
* {@link UncheckedIOException}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final File parquetFile,
final SeekableChannelsProvider channelsProvider) {
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return createChecked(parquetFile, channelsProvider);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
* {@link UncheckedIOException}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final URI parquetFileURI,
@Nullable final SeekableChannelsProvider channelsProvider) {
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return createChecked(parquetFileURI, channelsProvider);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
return new ParquetFileReader(parquetFileURI, channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFileURI, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final File parquetFile,
final SeekableChannelsProvider channelsProvider) throws IOException {
return createChecked(convertToURI(parquetFile, false), channelsProvider);
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final URI parquetFileURI,
final SeekableChannelsProvider channelsProvider) throws IOException {
return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(channelsProvider, 1 << 7));
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param source The source path or URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*
* @deprecated: Use {@link #createChecked(URI, SeekableChannelsProvider)} instead
*/
@Deprecated
public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider)
throws IOException {
this(convertToURI(source, false), channelsProvider);
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*
* @deprecated: Use {@link #createChecked(URI, SeekableChannelsProvider)} instead
* @param provider The {@link SeekableChannelsProvider} to use for reading the file
*/
@Deprecated
public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
throws IOException {
this.channelsProvider = channelsProvider;
private ParquetFileReader(
@NotNull final URI parquetFileURI,
@NotNull final SeekableChannelsProvider provider) throws IOException {
this.channelsProvider = CachedChannelProvider.create(provider, 1 << 7);
if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) {
// Construct a new file URI for the parent directory
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
@@ -270,7 +224,7 @@ private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {

/**
* Create a {@link RowGroupReader} object for provided row group number
*
*
* @param version The "version" string from deephaven specific parquet metadata, or null if it's not present.
*/
public RowGroupReader getRowGroup(final int groupNumber, final String version) {
@@ -506,7 +460,7 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedTyp

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
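Putting the pieces together, a hedged sketch of opening a reader with the reworked API: create() now wraps the supplied provider in a CachedChannelProvider internally and rethrows IOException as UncheckedIOException, so callers need no checked-exception handling. The file path and the way the loader instance is obtained are assumptions for illustration.

import io.deephaven.parquet.base.ParquetFileReader;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;

import java.io.File;

public final class ParquetFileReaderSketch {
    static ParquetFileReader open(final SeekableChannelsProviderLoader loader) {
        final File parquetFile = new File("/tmp/example.parquet"); // hypothetical local file
        // Resolve a provider compatible with the file's URI scheme, then hand it to create();
        // no explicit CachedChannelProvider wrapping is needed any more.
        final SeekableChannelsProvider provider = loader.fromServiceLoader(parquetFile.toURI(), null);
        return ParquetFileReader.create(parquetFile, provider);
    }
}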