
Account for memory allocated by trino s3 staging output stream #14407

Conversation

homar
Member

@homar homar commented Sep 30, 2022

Description

Account for memory allocated by TrinoS3FileSystems
Fixes: #14023

Non-technical explanation

Some queries that previously could kill the cluster are now gracefully terminated instead.

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Core
* Account for memory allocated by TrinoS3FileSystems. ({issue}`14023`)

@cla-bot cla-bot bot added the cla-signed label Sep 30, 2022
@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch 2 times, most recently from da5abd6 to d9ca5e1 Compare October 1, 2022 06:58
@findepi findepi requested a review from sopel39 October 1, 2022 17:00
@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch 3 times, most recently from c19095e to 6f3e3cd Compare October 2, 2022 16:43
@homar homar changed the title Homar/account for memory allocated by trino s3 staging output stream Account for memory allocated by trino s3 staging output stream Oct 2, 2022
@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch from 6f3e3cd to 4e4368d Compare October 3, 2022 08:37
@homar homar marked this pull request as ready for review October 3, 2022 13:01
@homar homar requested a review from findepi October 3, 2022 13:01
implements MemoryAware
{
private final FSDataOutputStream delegate;
private final Supplier<Long> getOutputStreamRetainedSizeInBytesSupplier;
Member

getOutputStreamRetainedSizeInBytesSupplier -> retainedSizeInBytesSupplier

}

@Override
public String location()
{
return path;
}

private static class MemoryAwareOutputStreamWrapper
Member

you can just extend io.trino.hdfs.TrinoFileSystemCache.OutputStreamWrapper

Member

@findepi @electrum why do we need OutputStreamWrapper anyway?

Seems like io.trino.hdfs.TrinoFileSystemCache.OutputStreamWrapper#getWrappedStream

        @Override
        public OutputStream getWrappedStream()
        {
            return ((FSDataOutputStream) super.getWrappedStream()).getWrappedStream();
        }

is a fragile assumption.

Member Author

@homar homar Oct 3, 2022

you can just extend io.trino.hdfs.TrinoFileSystemCache.OutputStreamWrapper

It is private and I didn't want to change that, but sure, I can do it.

Member

It is private and I didn't want to change that, but sure, I can do it.

Oh, but I mean you can make OutputStreamWrapper do what MemoryAwareOutputStreamWrapper does (provide MemoryAware interface for wrapper stream)

Member Author

@homar homar Oct 4, 2022

oh, right
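The shape agreed on above — a wrapper that exposes the retained size of the stream it delegates to through a capability interface — can be sketched roughly as follows. All names here are simplified stand-ins for illustration, not the actual Trino classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.OutputStream;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the MemoryAware capability interface.
interface MemoryAware
{
    long getRetainedSizeInBytes();
}

// Wraps any OutputStream and reports retained memory via a supplier,
// so callers can query memory usage without unwrapping the stream.
class MemoryAwareOutputStreamWrapper
        extends FilterOutputStream
        implements MemoryAware
{
    private final LongSupplier retainedSizeInBytesSupplier;

    MemoryAwareOutputStreamWrapper(OutputStream delegate, LongSupplier retainedSizeInBytesSupplier)
    {
        super(delegate);
        this.retainedSizeInBytesSupplier = retainedSizeInBytesSupplier;
    }

    @Override
    public long getRetainedSizeInBytes()
    {
        return retainedSizeInBytesSupplier.getAsLong();
    }
}
```

Making the wrapper itself implement the interface is what avoids the fragile `getWrappedStream()` unwrapping questioned above.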

@@ -40,6 +42,8 @@
private final String outputPath;
private final TrinoFileSystem fileSystem;

private Supplier<Long> getOutputStreamRetainedSizeInBytesSupplier;
Member

getOutputStreamRetainedSizeInBytesSupplier -> outputStreamRetainedSizeInBytesSupplier

@Override
public long getRetainedSizeInBytes()
{
// TODO find a better way to estimate this
Member

If transferManager does not hold on to memory between TrinoS3StagingOutputStream calls, then it should be 0.

Member Author

I was not sure about this, but you are probably right.

@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch from 4e4368d to 32e2c21 Compare October 3, 2022 22:47
@@ -401,7 +401,7 @@ public RemoteIterator<LocatedFileStatus> listFiles(Path path, boolean recursive)
}
}

private static class OutputStreamWrapper
public static class OutputStreamWrapper
Member

keep it private


public class OutputStreamOrcDataSink
implements OrcDataSink
{
private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(OutputStreamOrcDataSink.class).instanceSize());

private final OutputStreamSliceOutput output;
private final Supplier<Long> getOutputStreamRetainedSizeInBytesSupplier;
Member

getOutputStreamRetainedSizeInBytesSupplier -> outputStreamRetainedSizeInBytesSupplier

@sopel39
Member

sopel39 commented Oct 4, 2022

I think clearer approach is to use io.trino.filesystem.TrinoOutputFile in writers and modify io.trino.filesystem.TrinoOutputFile#create:

    OutputStream create(AggregatedMemoryContext context);

    OutputStream createOrOverwrite(AggregatedMemoryContext context)

then in io.trino.filesystem.hdfs.HdfsOutputFile#create(boolean) you can just check if FileSystem is TrinoS3FileSystem (see io.trino.plugin.hive.util.HiveWriteUtils#isS3FileSystem) and use dedicated create method (to be created) in TrinoS3FileSystem.

This way there are fewer intermediate layers and less OutputStream unwrapping. It should be easy to test (just check that TrinoOutputFile accounts for memory on S3) and it won't be as fragile.

@homar
Member Author

homar commented Oct 4, 2022

in io.trino.filesystem.hdfs.HdfsOutputFile#create(boolean) you can just check if FileSystem is TrinoS3FileSystem

I can't, because TrinoS3FileSystem belongs to the trino-hive module, which depends on trino-filesystem, where HdfsOutputFile lives. So HdfsOutputFile can't see TrinoS3FileSystem, and there is no way to push the AggregatedMemoryContext down.

@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch 3 times, most recently from e3c9d0a to 247bcee Compare October 5, 2022 09:20
@sopel39
Member

sopel39 commented Oct 5, 2022

I can't, because TrinoS3FileSystem belongs to the trino-hive module, which depends on trino-filesystem, where HdfsOutputFile lives. So HdfsOutputFile can't see TrinoS3FileSystem, and there is no way to push the AggregatedMemoryContext down.

You can add MemoryAwareFileSystem interface and make TrinoS3FileSystem extend it.
The point is that such approach would be testable (e.g. you can add a test that io.trino.filesystem.hdfs.HdfsOutputFile with S3 path is actually memory aware)
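The suggested approach could look roughly like the sketch below. The interface name follows the suggestion above; everything else is a simplified stand-in (a `LongConsumer` stands in for AggregatedMemoryContext, and the file-system classes are illustrative, not the real Trino ones):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.LongConsumer;

// Lives alongside HdfsOutputFile, so it can be referenced without a
// compile-time dependency on the concrete S3 file system class.
interface MemoryAwareFileSystem
{
    OutputStream create(String path, LongConsumer memoryContext)
            throws IOException;
}

// Stand-in for an S3-style file system that reports buffer memory.
class S3LikeFileSystem
        implements MemoryAwareFileSystem
{
    @Override
    public OutputStream create(String path, LongConsumer memoryContext)
    {
        return new ByteArrayOutputStream()
        {
            @Override
            public synchronized void write(byte[] b, int off, int len)
            {
                super.write(b, off, len);
                memoryContext.accept(buf.length); // report retained buffer size
            }
        };
    }
}

// Stand-in for HdfsOutputFile: dispatches on the capability interface,
// not on the concrete class, so the module boundary is respected.
class HdfsOutputFileLike
{
    private final Object rawFileSystem;

    HdfsOutputFileLike(Object rawFileSystem)
    {
        this.rawFileSystem = rawFileSystem;
    }

    OutputStream create(String path, LongConsumer memoryContext)
            throws IOException
    {
        if (rawFileSystem instanceof MemoryAwareFileSystem memoryAware) {
            return memoryAware.create(path, memoryContext);
        }
        return new ByteArrayOutputStream(); // no memory accounting available
    }
}
```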

@homar
Member Author

homar commented Oct 5, 2022

I can't, because TrinoS3FileSystem belongs to the trino-hive module, which depends on trino-filesystem, where HdfsOutputFile lives. So HdfsOutputFile can't see TrinoS3FileSystem, and there is no way to push the AggregatedMemoryContext down.

You can add MemoryAwareFileSystem interface and make TrinoS3FileSystem extend it. The point is that such approach would be testable (e.g. you can add a test that io.trino.filesystem.hdfs.HdfsOutputFile with S3 path is actually memory aware)

That is not the only problem. Those create methods from TrinoOutputFile are used not only by writers but also in other places, like io.trino.filesystem.fileio.ForwardingOutputFile. In some of those places, such as the one mentioned or S3TransactionLogSynchronizer, it is not really feasible to obtain an instance of AggregatedMemoryContext to pass down. I know I can create one in place using newRootAggregatedMemoryContext, but then nobody will ever read it on some execution paths, which would look bad imho. Or am I missing something, like only adding the 2 methods you mentioned, so the TrinoOutputFile interface would look like:

 OutputStream create(AggregatedMemoryContext context);

 OutputStream createOrOverwrite(AggregatedMemoryContext context)

 OutputStream create();

 OutputStream createOrOverwrite();

and simply creating instances of AggregatedMemoryContext inside the methods that don't accept one, even though their values will never be read?
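One way to keep both variants without scattering throwaway contexts at call sites is to make the no-argument overloads default methods, so only the context-taking variant is abstract. This is a hypothetical sketch of that idea, with a minimal stand-in for AggregatedMemoryContext, not the actual Trino interface:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Minimal stand-in for io.trino.memory.context.AggregatedMemoryContext.
interface AggregatedMemoryContext
{
    void setBytes(long bytes);
}

interface TrinoOutputFileLike
{
    OutputStream create(AggregatedMemoryContext memoryContext)
            throws IOException;

    // Callers that cannot track memory (e.g. transaction-log writers)
    // get a throwaway context whose value is never read.
    default OutputStream create()
            throws IOException
    {
        return create(bytes -> {});
    }
}
```

The throwaway context still "looks bad" in the sense raised above, but it is created in exactly one place instead of at every call site.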

@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch 4 times, most recently from 571ff0d to 81ae13f Compare October 6, 2022 22:16
@homar
Member Author

homar commented Oct 7, 2022

@sopel39 please take another look.
There are some things I am not sure about - like what I did in

    public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)

in TrinoS3FileSystem

@homar homar requested a review from sopel39 October 10, 2022 15:06
return environment.doAs(context.getIdentity(), () -> fileSystem.create(file, overwrite));
}

public static FileSystem getRawFileSystem(FileSystem fileSystem)
Member

private

import java.io.IOException;
import java.io.OutputStream;

public interface TrinoOutputFile
{
OutputStream create()
OutputStream create(AggregatedMemoryContext aggregatedMemoryContext)
Member

AggregatedMemoryContext aggregatedMemoryContext -> AggregatedMemoryContext memoryContext

throws IOException;

OutputStream createOrOverwrite()
OutputStream createOrOverwrite(AggregatedMemoryContext aggregatedMemoryContext)
Member

AggregatedMemoryContext aggregatedMemoryContext -> AggregatedMemoryContext memoryContext

@@ -40,27 +43,39 @@ public HdfsOutputFile(String path, HdfsEnvironment environment, HdfsContext cont
}

@Override
public OutputStream create()
public OutputStream create(AggregatedMemoryContext aggregatedMemoryContext)
Member

AggregatedMemoryContext aggregatedMemoryContext -> AggregatedMemoryContext memoryContext


public class OutputStreamOrcDataSink
implements OrcDataSink
{
private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(OutputStreamOrcDataSink.class).instanceSize());

private final OutputStreamSliceOutput output;
private final AggregatedMemoryContext aggregatedMemoryContext;
Member

aggregatedMemoryContext -> outputMemoryContext


public OutputStreamOrcDataSink(OutputStream outputStream)
public OutputStreamOrcDataSink(OutputStream outputStream, AggregatedMemoryContext aggregatedMemoryContext)
Member

make this constructor private.

Add factory methods:

OutputStreamOrcDataSink OutputStreamOrcDataSink#create(TrinoOutputFile outputFile) {
  AggregatedMemoryContext context = newSimpleAggregatedMemoryContext();
  return new OutputStreamOrcDataSink(outputFile.create(context), context);
}

/**
 * Use only when output stream memory consumption tracking is not possible.
 */
OutputStreamOrcDataSink OutputStreamOrcDataSink#create(OutputStream stream) {
  return new OutputStreamOrcDataSink(stream, newSimpleAggregatedMemoryContext());
}

this.memoryContext = requireNonNull(aggregatedMemoryContext, "aggregatedMemoryContext is null")
.newLocalMemoryContext(TrinoS3StreamingOutputStream.class.getSimpleName());
this.buffer = new byte[0];
this.memoryContext.setBytes(0);
Member

not needed

}
else {
this.buffer = new byte[0];
memoryContext.setBytes(buffer.length);
Member

nit: buffer.length -> 0

@@ -170,7 +174,7 @@ private IcebergFileWriter createParquetWriter(
.collect(toImmutableList());

try {
OutputStream outputStream = fileSystem.newOutputFile(outputPath).create();
Member

make it TrinoOutputFile outputFile = fileSystem.newOutputFile(outputPath)

@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch 2 times, most recently from 6ec2b56 to aef4280 Compare November 23, 2022 08:36
this.memoryContext = requireNonNull(memoryContext, "memoryContext is null");
}

public static OutputStreamOrcDataSink create(TrinoOutputFile outputFile, AggregatedMemoryContext memoryContext)
Member

static constructor above OutputStreamOrcDataSink constructor

@@ -1491,7 +1503,8 @@ public TrinoS3StagingOutputStream(
File tempFile,
Consumer<PutObjectRequest> requestCustomizer,
long multiPartUploadMinFileSize,
long multiPartUploadMinPartSize)
long multiPartUploadMinPartSize,
AggregatedMemoryContext memoryContext)
Member

remove? you don't account for mem in TrinoS3StagingOutputStream

@@ -1634,6 +1652,9 @@ public TrinoS3StreamingOutputStream(
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.buffer = new byte[0];
this.initialBufferSize = 64;
this.memoryContext = requireNonNull(memoryContext, "memoryContext is null")
.newLocalMemoryContext(TrinoS3StreamingOutputStream.class.getSimpleName());
this.buffer = new byte[0];
Member

buffer is already created above

public void testThatTrinoS3FileSystemReportsConsumedMemory()
throws IOException
{
AggregatedMemoryContext memoryContext = mock(AggregatedMemoryContext.class);
Member

You don't really have to mock these. Use real objects (e.g. io.trino.memory.context.AggregatedMemoryContext#newRootAggregatedMemoryContext with a reservation handler to track the max allocation).
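The reservation-handler approach suggested here can be sketched as below. Both classes are simplified stand-ins invented for illustration, not the real Trino reservation handler or S3 output stream:

```java
import java.io.ByteArrayOutputStream;

// Stand-in for a memory reservation handler that tracks the peak
// reservation alongside the current one.
class TrackingReservationHandler
{
    private long reserved;
    private long maxReserved;

    void reserve(long delta)
    {
        reserved += delta;
        maxReserved = Math.max(maxReserved, reserved);
    }

    long getReserved() { return reserved; }
    long getMaxReserved() { return maxReserved; }
}

// Stand-in for a memory-aware output stream: reports its buffer size
// on write and releases the reservation on close.
class MemoryTrackingOutputStream
        extends ByteArrayOutputStream
{
    private final TrackingReservationHandler handler;
    private long lastReported;

    MemoryTrackingOutputStream(TrackingReservationHandler handler)
    {
        this.handler = handler;
    }

    @Override
    public synchronized void write(byte[] b, int off, int len)
    {
        super.write(b, off, len);
        handler.reserve(buf.length - lastReported); // report buffer growth
        lastReported = buf.length;
    }

    @Override
    public void close()
    {
        // ByteArrayOutputStream.close() is a no-op; just release the reservation.
        handler.reserve(-lastReported);
        lastReported = 0;
    }
}
```

A test can then assert on real values: max reserved is positive while writing, and reserved drops back to 0 after close, with no mocks involved.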

@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch 2 times, most recently from 7d0718e to 5dc2be3 Compare November 27, 2022 09:39
@homar homar requested a review from sopel39 November 27, 2022 11:34
@@ -54,7 +56,8 @@ public IcebergParquetFileWriter(
CompressionCodecName compressionCodecName,
String trinoVersion,
String outputPath,
TrinoFileSystem fileSystem)
TrinoFileSystem fileSystem,
AggregatedMemoryContext memoryContext)
Member

Similar to OutputStreamOrcDataSink: create the AggregatedMemoryContext locally in ParquetFileWriter.
You can then simplify the calls to createParquetFileWriter.

@@ -75,10 +76,11 @@ public ParquetFileWriter(
String trinoVersion,
boolean useBatchColumnReadersForVerification,
Optional<DateTimeZone> parquetTimeZone,
Optional<Supplier<ParquetDataSource>> validationInputFactory)
Optional<Supplier<ParquetDataSource>> validationInputFactory,
AggregatedMemoryContext memoryContext)
Member

@sopel39 sopel39 Nov 28, 2022

create memory context locally and override getMemoryUsage method

OutputStream outputStream = fs.create(new Path("s3n://test-bucket/test1"), memoryContext);
outputStream.write(new byte[] {1, 2, 3, 4, 5, 6}, 0, 6);
}
assertThat(memoryReservationHandler.getReserved()).isGreaterThanOrEqualTo(64);
Member

Why is reserved not 0? Memory leak?

Member Author

Not a memory leak. In the test I simply don't close outputStream, so the memory context is not closed and thus not set to 0.
I did it on purpose, to be able to catch the value. But I can do it better.

fs.initialize(rootPath.toUri(), newEmptyConfiguration());
fs.setS3Client(s3);
OutputStream outputStream = fs.create(new Path("s3n://test-bucket/test1"), memoryContext);
outputStream.write(new byte[] {1, 2, 3, 4, 5, 6}, 0, 6);
Member

close outputStream too

Member Author

Heh, I didn't notice this before answering the comment above ;) It is done now.

@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch from 5dc2be3 to 650cc11 Compare November 29, 2022 22:49
Member

@sopel39 sopel39 left a comment

lgtm % comments

@@ -448,6 +448,13 @@
<scope>test</scope>
</dependency>

<dependency>
Member

remove this dependency

@@ -1691,6 +1706,8 @@ public void close()

try {
flushBuffer(true);
memoryContext.setBytes(0);
memoryContext.close();
Member

just close is sufficient (no need for setBytes)

outputStream.close();
}
assertThat(memoryReservationHandler.getReserved()).isEqualTo(0);
assertThat(memoryReservationHandler.getSetValues()).contains(64L);
Member

I would just store the max and make sure that assertThat(memoryReservationHandler.getMaxReserved()).isGreaterThan(0);

Checking that 64 is part of reserved values is fragile (depends on implementation)

@homar homar force-pushed the homar/account_for_memory_allocated_by_TrinoS3StagingOutputStream branch from 650cc11 to f884858 Compare December 1, 2022 22:14
@homar homar requested a review from sopel39 December 2, 2022 09:28
@sopel39 sopel39 merged commit 21d723f into trinodb:master Dec 2, 2022
@sopel39 sopel39 mentioned this pull request Dec 2, 2022
@github-actions github-actions bot added this to the 404 milestone Dec 2, 2022
Development

Successfully merging this pull request may close these issues.

Account for TrinoS3StreamingOutputStream#buffer in writer retained memory
2 participants