Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test improve memory usage #704

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.core.annotations.Nullable;
import org.msgpack.core.buffer.ArrayBufferInput;
import org.msgpack.core.buffer.InputStreamBufferInput;
import org.msgpack.core.buffer.MessageBufferInput;
Expand All @@ -47,8 +48,8 @@
public class MessagePackParser
extends ParserMinimalBase
{
private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder =
new ThreadLocal<Tuple<Object, MessageUnpacker>>();
private static final ThreadLocal<Triple<Object, MessageUnpacker, MessageBufferInput>> reuseObjectHolder =
new ThreadLocal<>();
private final MessageUnpacker messageUnpacker;

private static final BigInteger LONG_MIN = BigInteger.valueOf((long) Long.MIN_VALUE);
Expand Down Expand Up @@ -130,7 +131,7 @@ public MessagePackParser(
boolean reuseResourceInParser)
throws IOException
{
this(ctxt, features, new InputStreamBufferInput(in), objectCodec, in, reuseResourceInParser);
this(ctxt, features, null, in, objectCodec, in, reuseResourceInParser);
}

public MessagePackParser(IOContext ctxt, int features, ObjectCodec objectCodec, byte[] bytes)
Expand All @@ -147,12 +148,53 @@ public MessagePackParser(
boolean reuseResourceInParser)
throws IOException
{
this(ctxt, features, new ArrayBufferInput(bytes), objectCodec, bytes, reuseResourceInParser);
this(ctxt, features, bytes, null, objectCodec, bytes, reuseResourceInParser);
}

private MessageBufferInput createMessageBufferInput(
// Either of `bytes` or `in` is available
@Nullable byte[] bytes,
@Nullable InputStream in)
{
if (bytes != null) {
return new ArrayBufferInput(bytes);
}
else if (in != null) {
return new InputStreamBufferInput(in);
}
else {
throw new IllegalArgumentException("The both `bytes` and `in` are null");
}
}

private MessageBufferInput resetOrRecreateMessageBufferInput(
MessageBufferInput messageBufferInput,
// Either of `bytes` or `in` is available
@Nullable byte[] bytes,
@Nullable InputStream in)
throws IOException
{
// TODO: Revisit here
messageBufferInput.close();
if (messageBufferInput instanceof ArrayBufferInput && bytes != null) {
((ArrayBufferInput) messageBufferInput).reset(bytes);
}
else if (messageBufferInput instanceof InputStreamBufferInput && in != null) {
((InputStreamBufferInput) messageBufferInput).reset(in);
}
else {
// The existing MessageBufferInput type doesn't match the new source type.
// Recreate MessageBufferInput instance.
return createMessageBufferInput(bytes, in);
}
return messageBufferInput;
}

private MessagePackParser(IOContext ctxt,
int features,
MessageBufferInput input,
// Either of `bytes` or `in` is available
@Nullable byte[] bytes,
@Nullable InputStream in,
ObjectCodec objectCodec,
Object src,
boolean reuseResourceInParser)
Expand All @@ -167,29 +209,38 @@ private MessagePackParser(IOContext ctxt,
parsingContext = JsonReadContext.createRootContext(dups);
this.reuseResourceInParser = reuseResourceInParser;
if (!reuseResourceInParser) {
this.messageUnpacker = MessagePack.newDefaultUnpacker(input);
this.messageUnpacker = MessagePack.newDefaultUnpacker(createMessageBufferInput(bytes, in));
return;
}
else {
this.messageUnpacker = null;
}

MessageUnpacker messageUnpacker;
Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
if (messageUnpackerTuple == null) {
messageUnpacker = MessagePack.newDefaultUnpacker(input);
MessageBufferInput messageBufferInput;
Triple<Object, MessageUnpacker, MessageBufferInput> messageUnpackerResource = reuseObjectHolder.get();
if (messageUnpackerResource == null) {
messageBufferInput = createMessageBufferInput(bytes, in);
messageUnpacker = MessagePack.newDefaultUnpacker(messageBufferInput);
}
else {
// Considering to reuse InputStream with JsonParser.Feature.AUTO_CLOSE_SOURCE,
// MessagePackParser needs to use the MessageUnpacker that has the same InputStream
// since it has buffer which has loaded the InputStream data ahead.
// However, it needs to call MessageUnpacker#reset when the source is different from the previous one.
if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTuple.first() != src) {
messageUnpackerTuple.second().reset(input);
if (isEnabled(Feature.AUTO_CLOSE_SOURCE) || messageUnpackerResource.first() != src) {
messageBufferInput = messageUnpackerResource.third();
messageUnpacker = messageUnpackerResource.second();

messageBufferInput = resetOrRecreateMessageBufferInput(messageBufferInput, bytes, in);
messageUnpacker.reset(messageBufferInput);
}
else {
messageBufferInput = messageUnpackerResource.third();
messageUnpacker = messageUnpackerResource.second();
}
messageUnpacker = messageUnpackerTuple.second();
}
messageUnpackerHolder.set(new Tuple<Object, MessageUnpacker>(src, messageUnpacker));
reuseObjectHolder.set(new Triple<>(src, messageUnpacker, messageBufferInput));
}

public void setExtensionTypeCustomDeserializers(ExtensionTypeCustomDeserializers extTypeCustomDesers)
Expand Down Expand Up @@ -690,10 +741,10 @@ private MessageUnpacker getMessageUnpacker()
return this.messageUnpacker;
}

Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
if (messageUnpackerTuple == null) {
Triple<Object, MessageUnpacker, MessageBufferInput> reuseObject = reuseObjectHolder.get();
if (reuseObject == null) {
throw new IllegalStateException("messageUnpacker is null");
}
return messageUnpackerTuple.second();
return reuseObject.second();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//
// MessagePack for Java
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.msgpack.jackson.dataformat;

public class Triple<F, S, T>
{
private final F first;
private final S second;
private final T third;

public Triple(F first, S second, T third)
{
this.first = first;
this.second = second;
this.third = third;
}

public F first()
{
return first;
}

public S second()
{
return second;
}

public T third()
{
return third;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
//
package org.msgpack.jackson.dataformat;

/**
* Created by komamitsu on 5/28/15.
*/
public class Tuple<F, S>
{
private final F first;
Expand Down