Skip to content

Commit

Permalink
Close Zstd Dictionary after execution to avoid any memory leak. (#9403)…
Browse files Browse the repository at this point in the history
… (#9424)

* Close Dictionary after every execution to avoid any memory leak



* Close Dictionary after every execution to avoid any memory leak



* Add changelog



---------


(cherry picked from commit 5cc7313)

Signed-off-by: Mohit Godwani <mgodwan@amazon.com>
Signed-off-by: Andrew Ross <andrross@amazon.com>
Co-authored-by: Mohit Godwani <81609427+mgodwan@users.noreply.github.com>
  • Loading branch information
andrross and mgodwan authored Aug 18, 2023
1 parent 98946b9 commit c7fcd71
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed
- Fix flaky ResourceAwareTasksTests.testBasicTaskResourceTracking test ([#8993](https://github.com/opensearch-project/OpenSearch/pull/8993))
- Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ private void compress(byte[] bytes, int offset, int length, DataOutput out) thro

// dictionary compression first
doCompress(bytes, offset, dictLength, cctx, out);
cctx.loadDict(new ZstdDictCompress(bytes, offset, dictLength, compressionLevel));
try (ZstdDictCompress dictCompress = new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)) {
cctx.loadDict(dictCompress);

for (int start = offset + dictLength; start < end; start += blockLength) {
int l = Math.min(blockLength, end - start);
doCompress(bytes, start, l, cctx, out);
for (int start = offset + dictLength; start < end; start += blockLength) {
int l = Math.min(blockLength, end - start);
doCompress(bytes, start, l, cctx, out);
}
}
}
}
Expand Down Expand Up @@ -170,32 +172,33 @@ public void decompress(DataInput in, int originalLength, int offset, int length,

// decompress dictionary first
doDecompress(in, dctx, bytes, dictLength);

dctx.loadDict(new ZstdDictDecompress(bytes.bytes, 0, dictLength));

int offsetInBlock = dictLength;
int offsetInBytesRef = offset;

// Skip unneeded blocks
while (offsetInBlock + blockLength < offset) {
final int compressedLength = in.readVInt();
in.skipBytes(compressedLength);
offsetInBlock += blockLength;
offsetInBytesRef -= blockLength;
try (ZstdDictDecompress dictDecompress = new ZstdDictDecompress(bytes.bytes, 0, dictLength)) {
dctx.loadDict(dictDecompress);

int offsetInBlock = dictLength;
int offsetInBytesRef = offset;

// Skip unneeded blocks
while (offsetInBlock + blockLength < offset) {
final int compressedLength = in.readVInt();
in.skipBytes(compressedLength);
offsetInBlock += blockLength;
offsetInBytesRef -= blockLength;
}

// Read blocks that intersect with the interval we need
while (offsetInBlock < offset + length) {
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
int l = Math.min(blockLength, originalLength - offsetInBlock);
doDecompress(in, dctx, bytes, l);
offsetInBlock += blockLength;
}

bytes.offset = offsetInBytesRef;
bytes.length = length;

assert bytes.isValid() : "decompression output is corrupted";
}

// Read blocks that intersect with the interval we need
while (offsetInBlock < offset + length) {
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
int l = Math.min(blockLength, originalLength - offsetInBlock);
doDecompress(in, dctx, bytes, l);
offsetInBlock += blockLength;
}

bytes.offset = offsetInBytesRef;
bytes.length = length;

assert bytes.isValid() : "decompression output is corrupted";
}
}

Expand Down

0 comments on commit c7fcd71

Please sign in to comment.