diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureOutputFile.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureOutputFile.java index 261601c197e7..19e4a1522a3d 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureOutputFile.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureOutputFile.java @@ -23,6 +23,7 @@ import java.nio.file.FileAlreadyExistsException; import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static java.util.Objects.requireNonNull; class AzureOutputFile @@ -65,10 +66,21 @@ public OutputStream create(AggregatedMemoryContext memoryContext) } @Override - public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) + public void createOrOverwrite(byte[] data) throws IOException { - return createOutputStream(memoryContext, true); + try (OutputStream out = createOutputStream(newSimpleAggregatedMemoryContext(), true)) { + out.write(data); + } + } + + @Override + public void createExclusive(byte[] data) + throws IOException + { + try (OutputStream outputStream = create()) { + outputStream.write(data); + } } private AzureOutputStream createOutputStream(AggregatedMemoryContext memoryContext, boolean overwrite) diff --git a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java index 91f142324f4c..c9c1a7624258 100644 --- a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java +++ b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java @@ -174,6 +174,12 @@ protected final void verifyFileSystemIsEmpty() assertThat(blobContainerClient.listBlobs()).isEmpty(); } + @Override + protected boolean supportsCreateExclusive() + { + return true; + } + @Test @Override public void testPaths() diff --git a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystem.java b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystem.java index b84929ebb6bb..7811a4036a61 100644 --- a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystem.java +++ b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystem.java @@ -87,12 +87,6 @@ protected boolean isHierarchical() return false; } - @Override - protected boolean isFileContentCaching() - { - return true; - } - @Override protected boolean supportsCreateExclusive() { diff --git a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystemAccessOperations.java b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystemAccessOperations.java index 37dfebd201df..449aa4b8607d 100644 --- a/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystemAccessOperations.java +++ b/lib/trino-filesystem-cache-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioCacheFileSystemAccessOperations.java @@ -122,9 +122,7 @@ public void testCache() .build()); byte[] modifiedContent = "modified content".getBytes(StandardCharsets.UTF_8); - try (OutputStream output = 
fileSystem.newOutputFile(location).createOrOverwrite()) { - output.write(modifiedContent); - } + fileSystem.newOutputFile(location).createOrOverwrite(modifiedContent); // Clear the cache, as lastModified time might be unchanged cacheKeyProvider.increaseCacheVersion(); diff --git a/lib/trino-filesystem-gcs/pom.xml b/lib/trino-filesystem-gcs/pom.xml index 3abf22ab85b4..97b150f30e8e 100644 --- a/lib/trino-filesystem-gcs/pom.xml +++ b/lib/trino-filesystem-gcs/pom.xml @@ -98,11 +98,6 @@ configuration - - io.airlift - slice - - io.airlift units diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputFile.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputFile.java index b8051e0902e1..302286f3f98c 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputFile.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsOutputFile.java @@ -14,34 +14,28 @@ package io.trino.filesystem.gcs; import com.google.cloud.WriteChannel; -import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.StorageException; -import io.airlift.slice.Slice; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoOutputFile; import io.trino.memory.context.AggregatedMemoryContext; import java.io.IOException; import java.io.OutputStream; -import java.net.HttpURLConnection; import java.nio.file.FileAlreadyExistsException; import static com.google.common.base.Preconditions.checkArgument; import static io.trino.filesystem.gcs.GcsUtils.getBlob; import static io.trino.filesystem.gcs.GcsUtils.handleGcsException; +import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; import static java.util.Objects.requireNonNull; public class GcsOutputFile implements TrinoOutputFile { - private static final BlobTargetOption[] DOES_NOT_EXIST_TARGET_OPTION = {BlobTargetOption.doesNotExist()}; - private static final BlobWriteOption[] DOES_NOT_EXIST_WRITE_OPTION = {BlobWriteOption.doesNotExist()}; - private static final BlobWriteOption[] EMPTY_WRITE_OPTIONS = {}; - private final GcsLocation location; private final Storage storage; private final long writeBlockSizeBytes; @@ -55,75 +49,46 @@ public GcsOutputFile(GcsLocation location, Storage storage, long writeBlockSizeB } @Override - public OutputStream create(AggregatedMemoryContext memoryContext) - throws IOException - { - return createOutputStream(memoryContext, false); - } - - @Override - public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) + public void createOrOverwrite(byte[] data) throws IOException { - return createOutputStream(memoryContext, true); + try { + storage.create(blobInfo(), data); + } + catch (RuntimeException e) { + throw handleGcsException(e, "writing file", location); + } } @Override - public void createExclusive(Slice content, AggregatedMemoryContext memoryContext) + public void createExclusive(byte[] data) throws IOException { try { if (getBlob(storage, location).isPresent()) { throw new FileAlreadyExistsException("File %s already exists".formatted(location)); } - storage.create( - BlobInfo.newBuilder(BlobId.of(location.bucket(), location.path())).build(), - content.getBytes(), - DOES_NOT_EXIST_TARGET_OPTION); - } - catch (FileAlreadyExistsException e) { - throw e; - } - catch (StorageException e) { - // When the file 
corresponding to `location` already exists, the operation will fail with the exception message `412 Precondition Failed` - if (e.getCode() == HttpURLConnection.HTTP_PRECON_FAILED) { - throw new FileAlreadyExistsException(location.toString()); - } - throw handleGcsException(e, "writing file", location); + storage.create(blobInfo(), data, BlobTargetOption.doesNotExist()); } catch (RuntimeException e) { + throwIfAlreadyExists(e); throw handleGcsException(e, "writing file", location); } } - private OutputStream createOutputStream(AggregatedMemoryContext memoryContext, boolean overwrite) + @Override + public OutputStream create(AggregatedMemoryContext memoryContext) throws IOException { try { - BlobWriteOption[] blobWriteOptions = EMPTY_WRITE_OPTIONS; - if (!overwrite) { - if (getBlob(storage, location).isPresent()) { - throw new FileAlreadyExistsException("File %s already exists".formatted(location)); - } - blobWriteOptions = DOES_NOT_EXIST_WRITE_OPTION; + if (getBlob(storage, location).isPresent()) { + throw new FileAlreadyExistsException("File %s already exists".formatted(location)); } - WriteChannel writeChannel = storage.writer( - BlobInfo.newBuilder(BlobId.of(location.bucket(), location.path())).build(), - blobWriteOptions); - + WriteChannel writeChannel = storage.writer(blobInfo(), BlobWriteOption.doesNotExist()); return new GcsOutputStream(location, writeChannel, memoryContext, writeBlockSizeBytes); } - catch (FileAlreadyExistsException e) { - throw e; - } - catch (StorageException e) { - // When the file corresponding to `location` already exists, the operation will fail with the exception message `412 Precondition Failed` - if (e.getCode() == HttpURLConnection.HTTP_PRECON_FAILED) { - throw new FileAlreadyExistsException(location.toString()); - } - throw handleGcsException(e, "writing file", location); - } catch (RuntimeException e) { + throwIfAlreadyExists(e); throw handleGcsException(e, "writing file", location); } } @@ -133,4 +98,18 @@ public Location location() { return location.location(); } + + private BlobInfo blobInfo() + { + return BlobInfo.newBuilder(location.bucket(), location.path()).build(); + } + + private void throwIfAlreadyExists(RuntimeException e) + throws FileAlreadyExistsException + { + // when `location` already exists, the operation will fail with `412 Precondition Failed` + if ((e instanceof StorageException se) && (se.getCode() == HTTP_PRECON_FAILED)) { + throw new FileAlreadyExistsException(location.toString()); + } + } } diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java index f984c5d6e1bf..f3fa480e8fa8 100644 --- a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java +++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java @@ -19,7 +19,6 @@ import org.junit.jupiter.api.TestInstance; import java.io.IOException; -import java.io.OutputStream; import static java.nio.charset.StandardCharsets.UTF_8; @@ -44,9 +43,7 @@ void testCreateFileRetry() // In practice this may happen between 7 and 20 retries. 
for (int i = 1; i <= 30; i++) { TrinoOutputFile outputFile = getFileSystem().newOutputFile(getRootLocation().appendPath("testFile")); - try (OutputStream out = outputFile.createOrOverwrite()) { - out.write("test".getBytes(UTF_8)); - } + outputFile.createOrOverwrite("test".getBytes(UTF_8)); } } } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java index 7e44ff2406b5..27ef778c5072 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java @@ -18,6 +18,7 @@ import io.trino.memory.context.AggregatedMemoryContext; import software.amazon.awssdk.services.s3.S3Client; +import java.io.IOException; import java.io.OutputStream; import static java.util.Objects.requireNonNull; @@ -38,14 +39,16 @@ public S3OutputFile(S3Client client, S3Context context, S3Location location) } @Override - public OutputStream create(AggregatedMemoryContext memoryContext) + public void createOrOverwrite(byte[] data) + throws IOException { - // S3 doesn't support exclusive file creation. Fallback to createOrOverwrite(). - return createOrOverwrite(memoryContext); + try (OutputStream out = create()) { + out.write(data); + } } @Override - public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) + public OutputStream create(AggregatedMemoryContext memoryContext) { return new S3OutputStream(memoryContext, client, context, location); } diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/AbstractTestS3FileSystem.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/AbstractTestS3FileSystem.java index 56b17fae8d82..d6be2d050188 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/AbstractTestS3FileSystem.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/AbstractTestS3FileSystem.java @@ -32,7 +32,6 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import java.io.IOException; -import java.io.OutputStream; import java.util.List; import static com.google.common.collect.Iterables.getOnlyElement; @@ -153,9 +152,7 @@ void testFileWithTrailingWhitespaceAgainstNativeClient() // Verify writing byte[] newContents = "bar bar baz new content".getBytes(UTF_8); - try (OutputStream outputStream = fileSystem.newOutputFile(fileEntry.location()).createOrOverwrite()) { - outputStream.write(newContents.clone()); - } + fileSystem.newOutputFile(fileEntry.location()).createOrOverwrite(newContents); assertThat(s3Client.getObjectAsBytes(request -> request.bucket(bucket()).key(key)).asByteArray()) .isEqualTo(newContents); diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoOutputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoOutputFile.java index 99168084cc00..a842ed9c030a 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoOutputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoOutputFile.java @@ -14,7 +14,6 @@ package io.trino.filesystem; -import io.airlift.slice.Slice; import io.trino.memory.context.AggregatedMemoryContext; import java.io.IOException; @@ -26,10 +25,7 @@ public interface TrinoOutputFile { /** - * Create file exclusively, failing or overwriting if the file already exists. For file systems which do not - * support exclusive creation (e.g. S3), this will fallback to createOrOverwrite(). 
- * - * @throws FileAlreadyExistsException If a file of that name already exists + * This method delegates to {@link #create(AggregatedMemoryContext)}. */ default OutputStream create() throws IOException @@ -37,41 +33,46 @@ default OutputStream create() return create(newSimpleAggregatedMemoryContext()); } - default OutputStream createOrOverwrite() - throws IOException - { - return createOrOverwrite(newSimpleAggregatedMemoryContext()); - } + /** + * Create file with the specified content, atomically if possible. + * The file will be replaced if it already exists. + * If an error occurs while writing and the implementation does not + * support atomic writes, then a partial file may be written, + * or the original file may be deleted or left unchanged. + */ + void createOrOverwrite(byte[] data) + throws IOException; /** - * Create file exclusively and atomically with specified contents. + * Create file exclusively and atomically with the specified content. + * If an error occurs while writing, the file will not be created. + * + * @throws FileAlreadyExistsException if the file already exists + * @throws UnsupportedOperationException if the file system does not support this operation */ - default void createExclusive(Slice content) + default void createExclusive(byte[] data) throws IOException { - createExclusive(content, newSimpleAggregatedMemoryContext()); + throw new UnsupportedOperationException("createExclusive not supported by " + getClass()); } /** - * Create file exclusively, failing or overwriting if the file already exists. For file systems which do not - * support exclusive creation (e.g. S3), this will fall back to createOrOverwrite(). + * Open an output stream for creating a new file. + * This method is expected to be used with unique locations. + *
<p>
+ * If the file already exists, the method may fail or the file may be overwritten, + * depending on the file system implementation. + * For example, S3 does not support creating files exclusively, and performing an + * existence check before creating the file is both expensive and unnecessary for + * locations that are expected to be unique. + * <p>
+ * The file may be created immediately, or it may be created atomically when the + * output stream is closed, depending on the file system implementation. * - * @throws FileAlreadyExistsException If a file of that name already exists + * @throws FileAlreadyExistsException if the file already exists (optional) */ OutputStream create(AggregatedMemoryContext memoryContext) throws IOException; - OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) - throws IOException; - - /** - * Create file exclusively and atomically with specified contents. - */ - default void createExclusive(Slice content, AggregatedMemoryContext memoryContext) - throws IOException - { - throw new UnsupportedOperationException("createExclusive not supported by " + getClass()); - } - Location location(); } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java index 87deb5fd11ff..be6d2f4fb1ec 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java @@ -59,13 +59,15 @@ public OutputStream create(AggregatedMemoryContext memoryContext) } @Override - public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) + public void createOrOverwrite(byte[] data) throws IOException { try { Files.createDirectories(path.getParent()); OutputStream stream = Files.newOutputStream(path); - return new LocalOutputStream(location, stream); + try (OutputStream out = new LocalOutputStream(location, stream)) { + out.write(data); + } } catch (IOException e) { throw handleException(location, e); diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryOutputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryOutputFile.java index 7d4373242e16..db5e3aac0795 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryOutputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryOutputFile.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.nio.file.FileAlreadyExistsException; +import static io.airlift.slice.Slices.wrappedBuffer; import static java.util.Objects.requireNonNull; class MemoryOutputFile @@ -57,17 +58,17 @@ public OutputStream create(AggregatedMemoryContext memoryContext) } @Override - public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) + public void createOrOverwrite(byte[] data) throws IOException { - return new MemoryOutputStream(location, outputBlob::overwriteBlob); + outputBlob.overwriteBlob(wrappedBuffer(data)); } @Override - public void createExclusive(Slice content, AggregatedMemoryContext memoryContext) + public void createExclusive(byte[] data) throws IOException { - outputBlob.createBlob(content); + outputBlob.createBlob(wrappedBuffer(data)); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingOutputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingOutputFile.java index 89125f49d102..e1078e4cb989 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingOutputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingOutputFile.java @@ -13,7 +13,6 @@ */ package io.trino.filesystem.tracing; -import io.airlift.slice.Slice; import io.opentelemetry.api.trace.Span; import
io.opentelemetry.api.trace.Tracer; import io.trino.filesystem.Location; @@ -49,23 +48,23 @@ public OutputStream create() } @Override - public OutputStream createOrOverwrite() + public void createOrOverwrite(byte[] data) throws IOException { Span span = tracer.spanBuilder("OutputFile.createOrOverwrite") .setAttribute(FileSystemAttributes.FILE_LOCATION, toString()) .startSpan(); - return withTracing(span, () -> delegate.createOrOverwrite()); + withTracing(span, () -> delegate.createOrOverwrite(data)); } @Override - public void createExclusive(Slice content) + public void createExclusive(byte[] data) throws IOException { Span span = tracer.spanBuilder("OutputFile.createExclusive") .setAttribute(FileSystemAttributes.FILE_LOCATION, toString()) .startSpan(); - withTracing(span, () -> delegate.createExclusive(content)); + withTracing(span, () -> delegate.createExclusive(data)); } @Override @@ -78,26 +77,6 @@ public OutputStream create(AggregatedMemoryContext memoryContext) return withTracing(span, () -> delegate.create(memoryContext)); } - @Override - public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) - throws IOException - { - Span span = tracer.spanBuilder("OutputFile.createOrOverwrite") - .setAttribute(FileSystemAttributes.FILE_LOCATION, toString()) - .startSpan(); - return withTracing(span, () -> delegate.createOrOverwrite(memoryContext)); - } - - @Override - public void createExclusive(Slice content, AggregatedMemoryContext memoryContext) - throws IOException - { - Span span = tracer.spanBuilder("OutputFile.createExclusive") - .setAttribute(FileSystemAttributes.FILE_LOCATION, toString()) - .startSpan(); - withTracing(span, () -> delegate.createExclusive(content, memoryContext)); - } - @Override public Location location() { diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java index df911b825cdf..dce101db24d5 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java @@ -41,22 +41,19 @@ import java.util.UUID; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import static com.google.common.base.Preconditions.checkState; -import static io.airlift.slice.Slices.EMPTY_SLICE; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.slice.Slices.wrappedBuffer; import static java.lang.Math.min; -import static java.lang.Math.toIntExact; import static java.nio.charset.StandardCharsets.US_ASCII; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.abort; @@ -86,7 +83,7 @@ protected boolean isCreateExclusive() } /** - * Specifies whether implementation supports {@link 
TrinoOutputFile#createExclusive(Slice)}. + * Specifies whether implementation supports {@link TrinoOutputFile#createExclusive(byte[])}. */ protected boolean supportsCreateExclusive() { @@ -113,11 +110,6 @@ protected boolean seekPastEndOfFileFails() return true; } - protected boolean isFileContentCaching() - { - return false; - } - protected Location createLocation(String path) { if (path.isEmpty()) { @@ -527,12 +519,12 @@ void testOutputFile() // re-create exclusive is an error if (supportsCreateExclusive()) { - assertThatThrownBy(() -> outputFile.createExclusive(EMPTY_SLICE)) + assertThatThrownBy(() -> outputFile.createExclusive(new byte[0])) .isInstanceOf(FileAlreadyExistsException.class) .hasMessageContaining(tempBlob.location().toString()); } else { - assertThatThrownBy(() -> outputFile.createExclusive(EMPTY_SLICE)) + assertThatThrownBy(() -> outputFile.createExclusive(new byte[0])) .isInstanceOf(UnsupportedOperationException.class) .hasMessageStartingWith("createExclusive not supported"); } @@ -551,21 +543,19 @@ void testOutputFile() // create exclusive is an error if (supportsCreateExclusive()) { - assertThatThrownBy(() -> outputFile.createExclusive(EMPTY_SLICE)) + assertThatThrownBy(() -> outputFile.createExclusive(new byte[0])) .isInstanceOf(FileAlreadyExistsException.class) .hasMessageContaining(tempBlob.location().toString()); } else { - assertThatThrownBy(() -> outputFile.createExclusive(EMPTY_SLICE)) + assertThatThrownBy(() -> outputFile.createExclusive(new byte[0])) .isInstanceOf(UnsupportedOperationException.class) .hasMessageStartingWith("createExclusive not supported"); } } // overwrite file - try (OutputStream outputStream = outputFile.createOrOverwrite()) { - outputStream.write("overwrite".getBytes(UTF_8)); - } + outputFile.createOrOverwrite("overwrite".getBytes(UTF_8)); // verify file is different assertThat(tempBlob.read()).isEqualTo("overwrite"); @@ -581,11 +571,11 @@ void testCreateExclusiveIsAtomic() } int timeoutSeconds = 20; - ExecutorService executor = Executors.newCachedThreadPool(io.airlift.concurrent.Threads.daemonThreadsNamed("testCreateExclusiveIsAtomic-%s")); + ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("testCreateExclusiveIsAtomic-%s")); AtomicBoolean finishing = new AtomicBoolean(false); try (TempBlob tempBlob = randomBlobLocation("outputFile")) { TrinoFileSystem fileSystem = getFileSystem(); - Slice content = wrappedBuffer("a".repeat(MEGABYTE).getBytes(US_ASCII)); + byte[] content = "a".repeat(MEGABYTE).getBytes(US_ASCII); fileSystem.deleteFile(tempBlob.location()); CyclicBarrier barrier = new CyclicBarrier(2); @@ -601,7 +591,7 @@ void testCreateExclusiveIsAtomic() while (!finishing.get()) { try (TrinoInput input = inputFile.newInput()) { - return input.readFully(0, content.length()); + return input.readFully(0, content.length); } catch (FileNotFoundException expected) { } @@ -609,7 +599,7 @@ void testCreateExclusiveIsAtomic() throw new RuntimeException("File not created"); }); - assertThat(read.get(timeoutSeconds, SECONDS)).as("read content").isEqualTo(content); + assertThat(read.get(timeoutSeconds, SECONDS).getBytes()).as("read content").isEqualTo(content); write.get(timeoutSeconds, SECONDS); } finally { @@ -700,7 +690,7 @@ protected void testPathHierarchical() { // file outside of root is not allowed // the check is over the entire statement, because some file system delay path checks until the data is uploaded - assertThatThrownBy(() -> getFileSystem().newOutputFile(createLocation("../file")).createOrOverwrite().close()) + 
assertThatThrownBy(() -> getFileSystem().newOutputFile(createLocation("../file")).createOrOverwrite("test".getBytes(UTF_8))) .isInstanceOfAny(IOException.class, IllegalArgumentException.class) .hasMessageContaining(createLocation("../file").toString()); @@ -1024,9 +1014,8 @@ public void testFileWithTrailingWhitespace() // Verify writing byte[] newContents = "bar bar baz new content".getBytes(UTF_8); - try (OutputStream outputStream = getFileSystem().newOutputFile(location).createOrOverwrite()) { - outputStream.write(newContents.clone()); - } + getFileSystem().newOutputFile(location).createOrOverwrite(newContents); + // Open a new input file with an updated file length. If we read with the old inputFile the cached (wrong) file length would be used. // This can break some file system read operations (e.g., TrinoInput.readTail for most filesystems, newStream for caching file systems). TrinoInputFile newInputFile = getFileSystem().newInputFile(location); @@ -1281,45 +1270,9 @@ public void testLargeFileDoesNotExistUntilClosed() getFileSystem().deleteFile(location); } - @Test - public void testLargeFileIsNotOverwrittenUntilClosed() - throws IOException - { - if (!supportsIncompleteWriteNoClobber()) { - abort("skipped"); - } - Location location = getRootLocation().appendPath("testLargeFileIsNotOverwrittenUntilClosed-%s".formatted(UUID.randomUUID())); - try (OutputStream outputStream = getFileSystem().newOutputFile(location).createOrOverwrite()) { - outputStream.write("test".getBytes(UTF_8)); - } - TrinoInputFile inputFile = getFileSystem().newInputFile(location); - int fileSize = toIntExact(inputFile.length()); - @SuppressWarnings("NumericCastThatLosesPrecision") - byte[] fileBytes = new byte[fileSize]; - assertEquals(fileSize, readFile(inputFile, fileBytes)); - assertEquals("test", new String(fileBytes, UTF_8)); - try (OutputStream outputStream = getFileSystem().newOutputFile(location).createOrOverwrite()) { - // Write a 17 MB file to ensure data is flushed to storage by exceeding the buffer size - byte[] bytes = getBytes(); - int target = 17 * MEGABYTE; - int count = 0; - while (count < target) { - outputStream.write(bytes); - count += bytes.length; - if (count + bytes.length >= target) { - assertEquals(fileSize, readFile(inputFile, fileBytes)); - assertEquals("test", new String(fileBytes, UTF_8)); - } - } - } - assertTrue(fileExistsInListing(location)); - assertTrue(fileExists(location)); - getFileSystem().deleteFile(location); - } - + @SuppressWarnings("ConstantValue") private static byte[] getBytes() { - @SuppressWarnings("NumericCastThatLosesPrecision") byte[] bytes = new byte[8192]; for (int i = 0; i < bytes.length; i++) { bytes[i] = (byte) i; @@ -1347,15 +1300,6 @@ private boolean fileExists(Location location) return getFileSystem().newInputFile(location).exists(); } - private int readFile(TrinoInputFile inputFile, byte[] destination) - throws IOException - { - try (InputStream inputStream = inputFile.newStream()) { - checkState(destination.length >= inputFile.length(), "destination is smaller than file"); - return inputStream.readNBytes(destination, 0, toIntExact(inputFile.length())); - } - } - private String readLocation(Location path) { try (InputStream inputStream = getFileSystem().newInputFile(path).newStream()) { @@ -1443,8 +1387,8 @@ public TrinoOutputFile outputFile() public void createOrOverwrite(String data) { - try (OutputStream outputStream = outputFile().createOrOverwrite()) { - outputStream.write(data.getBytes(UTF_8)); + try { + 
outputFile().createOrOverwrite(data.getBytes(UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystemAccessOperations.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystemAccessOperations.java index afb90bd0440c..9dce610521c8 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystemAccessOperations.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystemAccessOperations.java @@ -86,9 +86,7 @@ void testCache() .build()); byte[] modifiedContent = "modified content".getBytes(StandardCharsets.UTF_8); - try (OutputStream output = fileSystem.newOutputFile(location).createOrOverwrite()) { - output.write(modifiedContent); - } + fileSystem.newOutputFile(location).createOrOverwrite(modifiedContent); assertReadOperations(location, modifiedContent, ImmutableMultiset.builder() diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsOutputFile.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsOutputFile.java index b91342528301..9a3d3b9e304d 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsOutputFile.java +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsOutputFile.java @@ -14,7 +14,6 @@ package io.trino.filesystem.hdfs; import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; -import io.airlift.slice.Slice; import io.airlift.stats.TimeStat; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoOutputFile; @@ -36,6 +35,7 @@ import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath; import static io.trino.filesystem.hdfs.HdfsFileSystem.withCause; import static io.trino.hdfs.FileSystemUtils.getRawFileSystem; +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static java.util.Objects.requireNonNull; class HdfsOutputFile @@ -63,21 +63,23 @@ public OutputStream create(AggregatedMemoryContext memoryContext) } @Override - public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext) + public void createOrOverwrite(byte[] data) throws IOException { - return create(true, memoryContext); + try (OutputStream out = create(true, newSimpleAggregatedMemoryContext())) { + out.write(data); + } } @Override - public void createExclusive(Slice content, AggregatedMemoryContext memoryContext) + public void createExclusive(byte[] data) throws IOException { Path file = hadoopPath(location); FileSystem fileSystem = getRawFileSystem(environment.getFileSystem(context, file)); if (fileSystem instanceof GoogleHadoopFileSystem) { GcsAtomicOutputStream atomicOutputStream = new GcsAtomicOutputStream(environment, context, file); - atomicOutputStream.write(content.getBytes()); + atomicOutputStream.write(data); atomicOutputStream.close(); return; } diff --git a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/avro/TestAvroBase.java b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/avro/TestAvroBase.java index 7df383d93b2d..77f35c66966c 100644 --- a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/avro/TestAvroBase.java +++ b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/avro/TestAvroBase.java @@ -398,7 +398,7 @@ protected TrinoInputFile createWrittenFileWithData(Schema schema, List fileWriter = new DataFileWriter<>(new GenericDatumWriter<>())) { - fileWriter.create(schema, 
trinoLocalFilesystem.newOutputFile(location).createOrOverwrite()); + fileWriter.create(schema, trinoLocalFilesystem.newOutputFile(location).create()); for (GenericRecord genericRecord : records) { fileWriter.append(genericRecord); } @@ -412,7 +412,7 @@ protected TrinoInputFile createWrittenFileWithSchema(int count, Schema schema) Iterator randomData = new RandomData(schema, count).iterator(); Location tempFile = createLocalTempLocation(); try (DataFileWriter fileWriter = new DataFileWriter<>(new GenericDatumWriter<>())) { - fileWriter.create(schema, trinoLocalFilesystem.newOutputFile(tempFile).createOrOverwrite()); + fileWriter.create(schema, trinoLocalFilesystem.newOutputFile(tempFile).create()); while (randomData.hasNext()) { fileWriter.append((GenericRecord) randomData.next()); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/MetaDirStatisticsAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/MetaDirStatisticsAccess.java index 233281e101af..a36d49893bc4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/MetaDirStatisticsAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/MetaDirStatisticsAccess.java @@ -26,7 +26,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.Optional; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR; @@ -95,9 +94,7 @@ public void updateExtendedStatistics( Location statisticsPath = Location.of(tableLocation).appendPath(STATISTICS_META_DIR).appendPath(STATISTICS_FILE); TrinoFileSystem fileSystem = fileSystemFactory.create(session); - try (OutputStream outputStream = fileSystem.newOutputFile(statisticsPath).createOrOverwrite()) { - outputStream.write(statisticsCodec.toJsonBytes(statistics)); - } + fileSystem.newOutputFile(statisticsPath).createOrOverwrite(statisticsCodec.toJsonBytes(statistics)); // Remove outdated Starburst stats file, if it exists. 
Location starburstStatisticsPath = Location.of(tableLocation).appendPath(STARBURST_META_DIR).appendPath(STARBURST_STATISTICS_FILE); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java index 28bd0d5bce34..d9add1ab2cfa 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java @@ -33,7 +33,6 @@ import io.trino.spi.type.TypeManager; import java.io.IOException; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.util.List; import java.util.Optional; @@ -162,9 +161,7 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) LastCheckpoint newLastCheckpoint = new LastCheckpoint(newCheckpointVersion, checkpointEntries.size(), Optional.empty()); Location checkpointPath = transactionLogDir.appendPath(LAST_CHECKPOINT_FILENAME); TrinoOutputFile outputFile = fileSystem.newOutputFile(checkpointPath); - try (OutputStream outputStream = outputFile.createOrOverwrite()) { - outputStream.write(lastCheckpointCodec.toJsonBytes(newLastCheckpoint)); - } + outputFile.createOrOverwrite(lastCheckpointCodec.toJsonBytes(newLastCheckpoint)); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/GcsTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/GcsTransactionLogSynchronizer.java index 255535cf096b..0fc42d7988a2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/GcsTransactionLogSynchronizer.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/GcsTransactionLogSynchronizer.java @@ -23,7 +23,6 @@ import java.io.UncheckedIOException; import java.nio.file.FileAlreadyExistsException; -import static io.airlift.slice.Slices.wrappedBuffer; import static java.util.Objects.requireNonNull; public class GcsTransactionLogSynchronizer @@ -44,7 +43,7 @@ public void write(ConnectorSession session, String clusterId, Location newLogEnt { TrinoFileSystem fileSystem = fileSystemFactory.create(session); try { - fileSystem.newOutputFile(newLogEntryPath).createExclusive(wrappedBuffer(entryContents)); + fileSystem.newOutputFile(newLogEntryPath).createExclusive(entryContents); } catch (FileAlreadyExistsException e) { throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath + " to GCS", e); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/FileTestingTransactionLogSynchronizer.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/FileTestingTransactionLogSynchronizer.java index 1b2677996ab5..20812f5b9d28 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/FileTestingTransactionLogSynchronizer.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/FileTestingTransactionLogSynchronizer.java @@ -22,7 +22,6 @@ import io.trino.spi.connector.ConnectorSession; import java.io.IOException; -import java.io.OutputStream; import java.io.UncheckedIOException; import static java.util.Objects.requireNonNull; 
@@ -50,9 +49,7 @@ public void write(ConnectorSession session, String clusterId, Location newLogEnt try { TrinoFileSystem fileSystem = fileSystemFactory.create(session); TrinoOutputFile outputFile = fileSystem.newOutputFile(newLogEntryPath); - try (OutputStream outputStream = outputFile.createOrOverwrite()) { - outputStream.write(entryContents); - } + outputFile.createOrOverwrite(entryContents); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeGcsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeGcsConnectorSmokeTest.java index 1f7ffb37c1ba..b2094661315b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeGcsConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeGcsConnectorSmokeTest.java @@ -15,8 +15,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.io.ByteSource; -import com.google.common.io.ByteStreams; import com.google.common.io.Resources; import com.google.common.reflect.ClassPath; import io.airlift.log.Logger; @@ -37,7 +35,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; @@ -177,11 +174,9 @@ protected void registerTableFromResources(String table, String resourcePath, Que .collect(toImmutableList()); for (ClassPath.ResourceInfo resourceInfo : resources) { String fileName = resourceInfo.getResourceName().replaceFirst("^" + Pattern.quote(resourcePath), quoteReplacement(targetDirectory)); - ByteSource byteSource = resourceInfo.asByteSource(); + byte[] bytes = resourceInfo.asByteSource().read(); TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(fileName)); - try (OutputStream fileStream = trinoOutputFile.createOrOverwrite()) { - ByteStreams.copy(byteSource.openBufferedStream(), fileStream); - } + trinoOutputFile.createOrOverwrite(bytes); } } catch (IOException e) { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java index 5572b14ee038..1fd4dd17f3f9 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java @@ -15,8 +15,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.io.ByteSource; -import com.google.common.io.ByteStreams; import com.google.common.reflect.ClassPath; import io.airlift.concurrent.MoreFutures; import io.trino.Session; @@ -35,7 +33,6 @@ import org.junit.jupiter.api.TestInstance; import java.io.IOException; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.nio.file.Path; import java.util.List; @@ -299,11 +296,9 @@ protected void registerTableFromResources(String table, String resourcePath, Que .collect(toImmutableList()); for (ClassPath.ResourceInfo resourceInfo : resources) { String fileName = resourceInfo.getResourceName().replaceFirst("^" + Pattern.quote(resourcePath), quoteReplacement(tableLocation)); - ByteSource byteSource = 
resourceInfo.asByteSource(); + byte[] bytes = resourceInfo.asByteSource().read(); TrinoOutputFile trinoOutputFile = fileSystem.newOutputFile(Location.of(fileName)); - try (OutputStream fileStream = trinoOutputFile.createOrOverwrite()) { - ByteStreams.copy(byteSource.openBufferedStream(), fileStream); - } + trinoOutputFile.createOrOverwrite(bytes); } } catch (IOException e) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java index 558c6c36743e..05a9555ad50c 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java @@ -1321,9 +1321,7 @@ public synchronized void replaceFunction(String databaseName, String functionNam byte[] json = functionCodec.toJsonBytes(function); try { - try (OutputStream outputStream = fileSystem.newOutputFile(file).createOrOverwrite()) { - outputStream.write(json); - } + fileSystem.newOutputFile(file).createOrOverwrite(json); } catch (IOException e) { throw new TrinoException(HIVE_METASTORE_ERROR, "Could not write function", e); @@ -1482,16 +1480,24 @@ private void writeFile(String type, Location location, JsonCodec codec, T try { byte[] json = codec.toJsonBytes(value); - if (!overwrite) { + TrinoOutputFile output = fileSystem.newOutputFile(location); + if (overwrite) { + output.createOrOverwrite(json); + } + else { + // best-effort exclusive and atomic creation if (fileSystem.newInputFile(location).exists()) { throw new TrinoException(HIVE_METASTORE_ERROR, type + " file already exists"); } - } - - // todo implement safer overwrite code - TrinoOutputFile output = fileSystem.newOutputFile(location); - try (OutputStream outputStream = overwrite ? 
output.createOrOverwrite() : output.create()) { - outputStream.write(json); + try { + output.createExclusive(json); + } + catch (UnsupportedOperationException ignored) { + // fall back to non-exclusive creation, relying on synchronization and the exists check above + try (OutputStream out = output.create()) { + out.write(json); + } + } } } catch (Exception e) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AcidTables.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AcidTables.java index a7f92eb734e6..7fd83beaa065 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AcidTables.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/AcidTables.java @@ -86,9 +86,7 @@ public static void writeAcidVersionFile(TrinoFileSystem fileSystem, Location del throws IOException { TrinoOutputFile file = fileSystem.newOutputFile(versionFilePath(deltaOrBaseDir)); - try (var out = file.createOrOverwrite()) { - out.write('2'); - } + file.createOrOverwrite(new byte[] {'2'}); } public static int readAcidVersionFile(TrinoFileSystem fileSystem, Location deltaOrBaseDir) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingOutputFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingOutputFile.java index 8d0f956688b7..0084f3a233d6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingOutputFile.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingOutputFile.java @@ -54,13 +54,8 @@ public PositionOutputStream create() @Override public PositionOutputStream createOrOverwrite() { - try { - // Callers of this method don't have access to memory context, so we skip tracking memory here - return new CountingPositionOutputStream(outputFile.createOrOverwrite()); - } - catch (IOException e) { - throw new UncheckedIOException("Failed to create file: " + location(), e); - } + // Iceberg never overwrites existing files. All callers use unique names.
+ return create(); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index bf16d3846f01..484289b66e15 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -32,7 +32,6 @@ import io.trino.plugin.hive.HiveCompressionCodec; import io.trino.plugin.hive.TestingHivePlugin; import io.trino.plugin.iceberg.fileio.ForwardingFileIo; -import io.trino.plugin.iceberg.fileio.ForwardingOutputFile; import io.trino.server.DynamicFilterService; import io.trino.spi.QueryId; import io.trino.spi.connector.ColumnHandle; @@ -69,10 +68,10 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -4189,11 +4188,12 @@ public void testIncorrectIcebergFileSizes() dataFile.put("file_size_in_bytes", alteredValue); // Write altered metadata - try (OutputStream out = fileSystem.newOutputFile(Location.of(manifestFile)).createOrOverwrite(); - DataFileWriter dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema))) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (DataFileWriter dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema))) { dataFileWriter.create(schema, out); dataFileWriter.append(entry); } + fileSystem.newOutputFile(Location.of(manifestFile)).createOrOverwrite(out.toByteArray()); // Ignoring Iceberg provided file size makes the query succeed Session session = Session.builder(getSession()) @@ -4719,6 +4719,7 @@ protected void verifyIcebergTableProperties(MaterializedResult actual) @Test public void testGetIcebergTableWithLegacyOrcBloomFilterProperties() + throws IOException { String tableName = "test_get_table_with_legacy_orc_bloom_filter_" + randomNameSuffix(); assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1); @@ -4738,7 +4739,8 @@ public void testGetIcebergTableWithLegacyOrcBloomFilterProperties() tableMetadata.sortOrder(), tableMetadata.location(), newProperties); - TableMetadataParser.overwrite(newTableMetadata, new ForwardingOutputFile(fileSystem, Location.of(metadataLocation))); + byte[] metadataJson = TableMetadataParser.toJson(newTableMetadata).getBytes(UTF_8); + fileSystem.newOutputFile(Location.of(metadataLocation)).createOrOverwrite(metadataJson); assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName)) .contains("orc_bloom_filter_columns", "orc_bloom_filter_fpp"); @@ -7126,11 +7128,11 @@ public void testNoRetryWhenMetadataFileInvalid() // Add duplicate field to produce validation error while reading the metadata file fieldsNode.add(newFieldNode); - String modifiedJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode); - try (OutputStream outputStream = fileSystem.newOutputFile(Location.of(metadataFileLocation)).createOrOverwrite()) { - // Corrupt metadata file by overwriting the invalid metadata content - outputStream.write(modifiedJson.getBytes(UTF_8)); - } + byte[] modifiedJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(jsonNode); + + // Corrupt metadata file by overwriting the invalid metadata content + 
fileSystem.newOutputFile(Location.of(metadataFileLocation)).createOrOverwrite(modifiedJson); + assertThat(query("SELECT * FROM " + tableName)) .failure().hasMessage("Invalid metadata file for table tpch.%s".formatted(tableName)); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java index e86fa98936ba..1bcb313d4587 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java @@ -106,7 +106,7 @@ public void testCreateTable() .add(new FileOperation(METADATA_JSON, "OutputFile.create")) .add(new FileOperation(SNAPSHOT, "InputFile.length")) .add(new FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperation(SNAPSHOT, "OutputFile.createOrOverwrite")) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) .build()); } @@ -118,7 +118,7 @@ public void testCreateOrReplaceTable() .add(new FileOperation(METADATA_JSON, "OutputFile.create")) .add(new FileOperation(SNAPSHOT, "InputFile.length")) .add(new FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperation(SNAPSHOT, "OutputFile.createOrOverwrite")) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) .build()); assertFileSystemAccesses("CREATE OR REPLACE TABLE test_create_or_replace (id VARCHAR, age INT)", ImmutableMultiset.builder() @@ -126,7 +126,7 @@ public void testCreateOrReplaceTable() .add(new FileOperation(METADATA_JSON, "InputFile.newStream")) .add(new FileOperation(SNAPSHOT, "InputFile.length")) .add(new FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperation(SNAPSHOT, "OutputFile.createOrOverwrite")) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) .build()); } @@ -140,8 +140,8 @@ public void testCreateTableAsSelect() .add(new FileOperation(METADATA_JSON, "OutputFile.create")) .add(new FileOperation(SNAPSHOT, "InputFile.length")) .add(new FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperation(SNAPSHOT, "OutputFile.createOrOverwrite")) - .add(new FileOperation(MANIFEST, "OutputFile.createOrOverwrite")) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) + .add(new FileOperation(MANIFEST, "OutputFile.create")) .build()); assertFileSystemAccesses( @@ -152,8 +152,8 @@ public void testCreateTableAsSelect() .addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2) // TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats in one commit .add(new FileOperation(SNAPSHOT, "InputFile.length")) .add(new FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperation(SNAPSHOT, "OutputFile.createOrOverwrite")) - .add(new FileOperation(MANIFEST, "OutputFile.createOrOverwrite")) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) + .add(new FileOperation(MANIFEST, "OutputFile.create")) .add(new FileOperation(STATS, "OutputFile.create")) .build()); } @@ -168,8 +168,8 @@ public void testCreateOrReplaceTableAsSelect() .add(new FileOperation(METADATA_JSON, "InputFile.newStream")) .add(new FileOperation(SNAPSHOT, "InputFile.length")) .add(new FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperation(SNAPSHOT, "OutputFile.createOrOverwrite")) - .add(new FileOperation(MANIFEST, "OutputFile.createOrOverwrite")) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) + .add(new FileOperation(MANIFEST, "OutputFile.create")) 
.add(new FileOperation(STATS, "OutputFile.create")) .build()); @@ -180,8 +180,8 @@ public void testCreateOrReplaceTableAsSelect() .addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 2) .addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 2) .addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 2) - .add(new FileOperation(SNAPSHOT, "OutputFile.createOrOverwrite")) - .add(new FileOperation(MANIFEST, "OutputFile.createOrOverwrite")) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) + .add(new FileOperation(MANIFEST, "OutputFile.create")) .add(new FileOperation(MANIFEST, "InputFile.newStream")) .add(new FileOperation(STATS, "OutputFile.create")) .build()); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java index 5df3dbad90ce..ce902b4c320c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; import io.trino.Session; import io.trino.filesystem.Location; import io.trino.testing.QueryRunner; @@ -21,9 +22,6 @@ import io.trino.testing.sql.TestTable; import org.junit.jupiter.api.Test; -import java.io.File; -import java.io.OutputStream; -import java.nio.file.Files; import java.util.List; import java.util.Map; import java.util.Optional; @@ -132,9 +130,8 @@ private void testReadSingleIntegerColumnOrcFile(String orcFileResourceName, int checkArgument(expectedValue != 0); try (TestTable table = new TestTable(getQueryRunner()::execute, "test_read_as_integer", "(\"_col0\") AS VALUES 0, NULL")) { String orcFilePath = (String) computeScalar(format("SELECT DISTINCT file_path FROM \"%s$files\"", table.getName())); - try (OutputStream outputStream = fileSystem.newOutputFile(Location.of(orcFilePath)).createOrOverwrite()) { - Files.copy(new File(getResource(orcFileResourceName).toURI()).toPath(), outputStream); - } + byte[] orcFileData = Resources.toByteArray(getResource(orcFileResourceName)); + fileSystem.newOutputFile(Location.of(orcFilePath)).createOrOverwrite(orcFileData); fileSystem.deleteFiles(List.of(Location.of(orcFilePath.replaceAll("/([^/]*)$", ".$1.crc")))); Session ignoreFileSizeFromMetadata = Session.builder(getSession())
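
Note: the TrinoOutputFile change above replaces per-call-site stream management with a whole-file byte[] overload, letting each implementation pick an atomic primitive (a single blob upload on GCS, an in-memory blob swap, a stream that publishes on close). A minimal caller-side sketch of the new contract, using only the interface as changed above; the class and method names are illustrative and not part of the patch:

    import io.trino.filesystem.Location;
    import io.trino.filesystem.TrinoFileSystem;

    import java.io.IOException;
    import java.io.OutputStream;

    import static java.nio.charset.StandardCharsets.UTF_8;

    // illustrative usage sketch, not part of the patch
    class OutputFileUsageSketch
    {
        // whole-file write: one call instead of a try-with-resources stream
        void overwrite(TrinoFileSystem fileSystem, Location location)
                throws IOException
        {
            fileSystem.newOutputFile(location).createOrOverwrite("content".getBytes(UTF_8));
        }

        // streaming write: create() remains, but is now documented for unique
        // locations and may throw FileAlreadyExistsException if the file exists
        void streamToUniqueLocation(TrinoFileSystem fileSystem, Location uniqueLocation)
                throws IOException
        {
            try (OutputStream out = fileSystem.newOutputFile(uniqueLocation).create()) {
                out.write("content".getBytes(UTF_8));
            }
        }
    }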
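createExclusive(byte[]) keeps its UnsupportedOperationException default, so writers that need create-if-absent semantics handle the fallback explicitly, as the FileHiveMetastore hunk above does. A sketch of that pattern, again assuming only the interface as changed above (class and method names illustrative):

    import io.trino.filesystem.TrinoOutputFile;

    import java.io.IOException;
    import java.io.OutputStream;

    // illustrative sketch mirroring FileHiveMetastore.writeFile above
    class ExclusiveWriteSketch
    {
        void writeNewFile(TrinoOutputFile output, byte[] json)
                throws IOException
        {
            try {
                // atomic create-if-absent; throws FileAlreadyExistsException on conflict
                output.createExclusive(json);
            }
            catch (UnsupportedOperationException ignored) {
                // file systems without exclusive creation (e.g. S3) degrade to a plain
                // stream write; the caller must supply its own synchronization and
                // existence check, as FileHiveMetastore does
                try (OutputStream out = output.create()) {
                    out.write(json);
                }
            }
        }
    }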