Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change createOrOverwrite to accept file content #20913

Merged
merged 5 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.FileAlreadyExistsException;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static java.util.Objects.requireNonNull;

class AzureOutputFile
Expand Down Expand Up @@ -65,10 +66,21 @@ public OutputStream create(AggregatedMemoryContext memoryContext)
}

@Override
public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext)
public void createOrOverwrite(byte[] data)
throws IOException
{
return createOutputStream(memoryContext, true);
try (OutputStream out = createOutputStream(newSimpleAggregatedMemoryContext(), true)) {
out.write(data);
}
}

/**
 * Writes {@code data} to a stream opened through {@code create()} and closes it.
 * <p>
 * Whether an already-existing file is rejected is decided by {@code create()};
 * this method adds no existence check of its own.
 *
 * @param data the complete content to write
 * @throws IOException if opening, writing, or closing the stream fails
 */
@Override
public void createExclusive(byte[] data)
        throws IOException
{
    try (OutputStream out = create()) {
        out.write(data);
    }
}

private AzureOutputStream createOutputStream(AggregatedMemoryContext memoryContext, boolean overwrite)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ protected final void verifyFileSystemIsEmpty()
assertThat(blobContainerClient.listBlobs()).isEmpty();
}

// Always returns true. Presumably tells the shared base test that the file system
// under test supports createExclusive -- confirm against the base class.
@Override
protected boolean supportsCreateExclusive()
{
return true;
}

@Test
@Override
public void testPaths()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,6 @@ protected boolean isHierarchical()
return false;
}

@Override
protected boolean isFileContentCaching()
{
return true;
}

@Override
protected boolean supportsCreateExclusive()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ public void testCache()
.build());

byte[] modifiedContent = "modified content".getBytes(StandardCharsets.UTF_8);
try (OutputStream output = fileSystem.newOutputFile(location).createOrOverwrite()) {
output.write(modifiedContent);
}
fileSystem.newOutputFile(location).createOrOverwrite(modifiedContent);

// Clear the cache, as lastModified time might be unchanged
cacheKeyProvider.increaseCacheVersion();
Expand Down
5 changes: 0 additions & 5 deletions lib/trino-filesystem-gcs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,28 @@
package io.trino.filesystem.gcs;

import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageException;
import io.airlift.slice.Slice;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.memory.context.AggregatedMemoryContext;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.file.FileAlreadyExistsException;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.filesystem.gcs.GcsUtils.getBlob;
import static io.trino.filesystem.gcs.GcsUtils.handleGcsException;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static java.util.Objects.requireNonNull;

public class GcsOutputFile
implements TrinoOutputFile
{
private static final BlobTargetOption[] DOES_NOT_EXIST_TARGET_OPTION = {BlobTargetOption.doesNotExist()};
private static final BlobWriteOption[] DOES_NOT_EXIST_WRITE_OPTION = {BlobWriteOption.doesNotExist()};
private static final BlobWriteOption[] EMPTY_WRITE_OPTIONS = {};

private final GcsLocation location;
private final Storage storage;
private final long writeBlockSizeBytes;
Expand All @@ -55,75 +49,46 @@ public GcsOutputFile(GcsLocation location, Storage storage, long writeBlockSizeB
}

@Override
public OutputStream create(AggregatedMemoryContext memoryContext)
throws IOException
{
return createOutputStream(memoryContext, false);
}

@Override
public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext)
public void createOrOverwrite(byte[] data)
throws IOException
{
return createOutputStream(memoryContext, true);
try {
storage.create(blobInfo(), data);
}
catch (RuntimeException e) {
throw handleGcsException(e, "writing file", location);
}
}

@Override
public void createExclusive(Slice content, AggregatedMemoryContext memoryContext)
public void createExclusive(byte[] data)
electrum marked this conversation as resolved.
Show resolved Hide resolved
throws IOException
{
try {
if (getBlob(storage, location).isPresent()) {
throw new FileAlreadyExistsException("File %s already exists".formatted(location));
}
storage.create(
BlobInfo.newBuilder(BlobId.of(location.bucket(), location.path())).build(),
content.getBytes(),
DOES_NOT_EXIST_TARGET_OPTION);
}
catch (FileAlreadyExistsException e) {
throw e;
}
catch (StorageException e) {
// When the file corresponding to `location` already exists, the operation will fail with the exception message `412 Precondition Failed`
if (e.getCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(location.toString());
}
throw handleGcsException(e, "writing file", location);
storage.create(blobInfo(), data, BlobTargetOption.doesNotExist());
}
catch (RuntimeException e) {
throwIfAlreadyExists(e);
throw handleGcsException(e, "writing file", location);
}
}

private OutputStream createOutputStream(AggregatedMemoryContext memoryContext, boolean overwrite)
@Override
public OutputStream create(AggregatedMemoryContext memoryContext)
throws IOException
{
try {
BlobWriteOption[] blobWriteOptions = EMPTY_WRITE_OPTIONS;
if (!overwrite) {
if (getBlob(storage, location).isPresent()) {
throw new FileAlreadyExistsException("File %s already exists".formatted(location));
}
blobWriteOptions = DOES_NOT_EXIST_WRITE_OPTION;
if (getBlob(storage, location).isPresent()) {
throw new FileAlreadyExistsException("File %s already exists".formatted(location));
}
WriteChannel writeChannel = storage.writer(
BlobInfo.newBuilder(BlobId.of(location.bucket(), location.path())).build(),
blobWriteOptions);

WriteChannel writeChannel = storage.writer(blobInfo(), BlobWriteOption.doesNotExist());
return new GcsOutputStream(location, writeChannel, memoryContext, writeBlockSizeBytes);
}
catch (FileAlreadyExistsException e) {
throw e;
}
catch (StorageException e) {
// When the file corresponding to `location` already exists, the operation will fail with the exception message `412 Precondition Failed`
if (e.getCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(location.toString());
}
throw handleGcsException(e, "writing file", location);
}
catch (RuntimeException e) {
throwIfAlreadyExists(e);
throw handleGcsException(e, "writing file", location);
}
}
Expand All @@ -133,4 +98,18 @@ public Location location()
{
return location.location();
}

/**
 * Builds the {@link BlobInfo} for this output file from the bucket and path
 * of {@code location}.
 */
private BlobInfo blobInfo()
{
    BlobInfo.Builder builder = BlobInfo.newBuilder(location.bucket(), location.path());
    return builder.build();
}

/**
 * Translates a GCS precondition failure into {@link FileAlreadyExistsException}.
 * <p>
 * Returns normally for any other exception so the caller can continue with
 * its generic error handling.
 *
 * @param e the exception raised by the storage call
 * @throws FileAlreadyExistsException if {@code e} is a {@link StorageException}
 *         whose code is {@code HTTP_PRECON_FAILED}; the original exception is
 *         attached as the cause
 */
private void throwIfAlreadyExists(RuntimeException e)
        throws FileAlreadyExistsException
{
    // when `location` already exists, the operation will fail with `412 Precondition Failed`
    if ((e instanceof StorageException se) && (se.getCode() == HTTP_PRECON_FAILED)) {
        FileAlreadyExistsException alreadyExists = new FileAlreadyExistsException(location.toString());
        // FileAlreadyExistsException has no cause-accepting constructor, so the
        // storage error is attached explicitly to keep it in the stack trace
        alreadyExists.initCause(se);
        throw alreadyExists;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.junit.jupiter.api.TestInstance;

import java.io.IOException;
import java.io.OutputStream;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand All @@ -44,9 +43,7 @@ void testCreateFileRetry()
// In practice this may happen between 7 and 20 retries.
for (int i = 1; i <= 30; i++) {
TrinoOutputFile outputFile = getFileSystem().newOutputFile(getRootLocation().appendPath("testFile"));
try (OutputStream out = outputFile.createOrOverwrite()) {
out.write("test".getBytes(UTF_8));
}
outputFile.createOrOverwrite("test".getBytes(UTF_8));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.trino.memory.context.AggregatedMemoryContext;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.io.OutputStream;

import static java.util.Objects.requireNonNull;
Expand All @@ -38,14 +39,16 @@ public S3OutputFile(S3Client client, S3Context context, S3Location location)
}

@Override
public OutputStream create(AggregatedMemoryContext memoryContext)
public void createOrOverwrite(byte[] data)
throws IOException
{
// S3 doesn't support exclusive file creation. Fallback to createOrOverwrite().
return createOrOverwrite(memoryContext);
try (OutputStream out = create()) {
out.write(data);
}
}

@Override
public OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext)
public OutputStream create(AggregatedMemoryContext memoryContext)
{
return new S3OutputStream(memoryContext, client, context, location);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

import static com.google.common.collect.Iterables.getOnlyElement;
Expand Down Expand Up @@ -153,9 +152,7 @@ void testFileWithTrailingWhitespaceAgainstNativeClient()

// Verify writing
byte[] newContents = "bar bar baz new content".getBytes(UTF_8);
try (OutputStream outputStream = fileSystem.newOutputFile(fileEntry.location()).createOrOverwrite()) {
outputStream.write(newContents.clone());
}
fileSystem.newOutputFile(fileEntry.location()).createOrOverwrite(newContents);
assertThat(s3Client.getObjectAsBytes(request -> request.bucket(bucket()).key(key)).asByteArray())
.isEqualTo(newContents);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package io.trino.filesystem;

import io.airlift.slice.Slice;
import io.trino.memory.context.AggregatedMemoryContext;

import java.io.IOException;
Expand All @@ -26,52 +25,54 @@
public interface TrinoOutputFile
{
/**
* Create file exclusively, failing or overwriting if the file already exists. For file systems which do not
* support exclusive creation (e.g. S3), this will fall back to createOrOverwrite().
*
* @throws FileAlreadyExistsException If a file of that name already exists
* This method delegates to {@link #create(AggregatedMemoryContext)}.
*/
default OutputStream create()
        throws IOException
{
    // Callers that do not track memory get a fresh, standalone memory context
    AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext();
    return create(memoryContext);
}

default OutputStream createOrOverwrite()
throws IOException
{
return createOrOverwrite(newSimpleAggregatedMemoryContext());
}
/**
* Create file with the specified content, atomically if possible.
* The file will be replaced if it already exists.
electrum marked this conversation as resolved.
Show resolved Hide resolved
* If an error occurs while writing and the implementation does not
* support atomic writes, then a partial file may be written,
* or the original file may be deleted or left unchanged.
*/
void createOrOverwrite(byte[] data)
throws IOException;

/**
* Create file exclusively and atomically with specified contents.
* Create file exclusively and atomically with the specified content.
electrum marked this conversation as resolved.
Show resolved Hide resolved
* If an error occurs while writing, the file will not be created.
*
* @throws FileAlreadyExistsException if the file already exists
* @throws UnsupportedOperationException if the file system does not support this operation
*/
default void createExclusive(Slice content)
default void createExclusive(byte[] data)
throws IOException
{
createExclusive(content, newSimpleAggregatedMemoryContext());
throw new UnsupportedOperationException("createExclusive not supported by " + getClass());
}

/**
* Create file exclusively, failing or overwriting if the file already exists. For file systems which do not
* support exclusive creation (e.g. S3), this will fall back to createOrOverwrite().
* Open an output stream for creating a new file.
* This method is expected to be used with unique locations.
* <p>
* If the file already exists, the method may fail or the file may be overwritten,
* depending on the file system implementation.
* For example, S3 does not support creating files exclusively, and performing an
* existence check before creating the file is both expensive and unnecessary for
* locations that are expected to be unique.
* <p>
* The file may be created immediately, or it may be created atomically when the
* output stream is closed, depending on the file system implementation.
*
* @throws FileAlreadyExistsException If a file of that name already exists
* @throws FileAlreadyExistsException if the file already exists (optional)
*/
OutputStream create(AggregatedMemoryContext memoryContext)
throws IOException;

OutputStream createOrOverwrite(AggregatedMemoryContext memoryContext)
throws IOException;

/**
* Create file exclusively and atomically with specified contents.
*/
default void createExclusive(Slice content, AggregatedMemoryContext memoryContext)
throws IOException
{
throw new UnsupportedOperationException("createExclusive not supported by " + getClass());
}

Location location();
}
Loading