Skip to content

Commit

Permalink
use ObjectPool instead of ThreadLocal during encode/decode (#822)
Browse files Browse the repository at this point in the history
* 优化threadlocal

* use simple objectpool replace Cache of Threadlocal

* 缩小pool初始化大小

* 调整一下pool大小和包位置

* java encode

* java encode2

* test 优化

* retest

* retest2
  • Loading branch information
zt9788 authored Oct 7, 2023
1 parent 0b88e1a commit 45b5d8f
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,46 @@ public class JavaValueEncoder extends AbstractValueEncoder {

public static final JavaValueEncoder INSTANCE = new JavaValueEncoder(true);

private static final int INIT_BUF_SIZE = 256;
private static final int INIT_BUF_SIZE = 2048;

public JavaValueEncoder(boolean useIdentityNumber) {
super(useIdentityNumber);
}

private static ThreadLocal<WeakReference<ByteArrayOutputStream>> threadLocal =
ThreadLocal.withInitial(() -> new WeakReference<>(new ByteArrayOutputStream(INIT_BUF_SIZE)));
static ObjectPool<ByteArrayOutputStream> bosPool = new ObjectPool<>(16, new ObjectPool.ObjectFactory<ByteArrayOutputStream>() {
@Override
public ByteArrayOutputStream create() {
return new ByteArrayOutputStream(INIT_BUF_SIZE);
}

@Override
public void reset(ByteArrayOutputStream obj) {
obj.reset();
}
});

@Override
public byte[] apply(Object value) {
ByteArrayOutputStream bos = null;
try {
WeakReference<ByteArrayOutputStream> ref = threadLocal.get();
ByteArrayOutputStream bos = ref.get();
if (bos == null) {
bos = new ByteArrayOutputStream(INIT_BUF_SIZE);
threadLocal.set(new WeakReference<>(bos));
}

try {
if (useIdentityNumber) {
bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 24) & 0xFF);
bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 16) & 0xFF);
bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 8) & 0xFF);
bos.write(SerialPolicy.IDENTITY_NUMBER_JAVA & 0xFF);
}


ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(value);
oos.flush();
return bos.toByteArray();
} finally {
bos.reset();
bos = bosPool.borrowObject();
if (useIdentityNumber) {
bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 24) & 0xFF);
bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 16) & 0xFF);
bos.write((SerialPolicy.IDENTITY_NUMBER_JAVA >> 8) & 0xFF);
bos.write(SerialPolicy.IDENTITY_NUMBER_JAVA & 0xFF);
}
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(value);
oos.flush();
return bos.toByteArray();
} catch (IOException e) {
StringBuilder sb = new StringBuilder("Java Encode error. ");
sb.append("msg=").append(e.getMessage());
throw new CacheEncodeException(sb.toString(), e);
}finally {
if(bos != null)
bosPool.returnObject(bos);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@ public Object doApply(byte[] buffer) {
in = new ByteArrayInputStream(buffer);
}
Input input = new Input(in);
Kryo kryo = (Kryo) Kryo5ValueEncoder.kryoThreadLocal.get()[0];
ClassLoader classLoader = Kryo5ValueDecoder.class.getClassLoader();
Thread t = Thread.currentThread();
if (t != null) {
ClassLoader ctxClassLoader = t.getContextClassLoader();
if (ctxClassLoader != null) {
classLoader = ctxClassLoader;
Kryo5ValueEncoder.Kryo5Cache kryoCache = null;
try {
kryoCache = Kryo5ValueEncoder.kryoCacheObjectPool.borrowObject();
Kryo kryo = kryoCache.getKryo();
ClassLoader classLoader = Kryo5ValueDecoder.class.getClassLoader();
Thread t = Thread.currentThread();
if (t != null) {
ClassLoader ctxClassLoader = t.getContextClassLoader();
if (ctxClassLoader != null) {
classLoader = ctxClassLoader;
}
}
kryo.setClassLoader(classLoader);
return kryo.readClassAndObject(input);
}finally {
if(kryoCache != null){
Kryo5ValueEncoder.kryoCacheObjectPool.returnObject(kryoCache);
}
}
kryo.setClassLoader(classLoader);
return kryo.readClassAndObject(input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import com.esotericsoftware.kryo.kryo5.io.Output;
import com.esotericsoftware.kryo.kryo5.serializers.CompatibleFieldSerializer;

import java.lang.ref.WeakReference;

/**
* Created on 2016/10/4.
*
Expand All @@ -16,53 +14,63 @@ public class Kryo5ValueEncoder extends AbstractValueEncoder {

public static final Kryo5ValueEncoder INSTANCE = new Kryo5ValueEncoder(true);

private static int INIT_BUFFER_SIZE = 256;

static ThreadLocal<Object[]> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.setDefaultSerializer(CompatibleFieldSerializer.class);
kryo.setRegistrationRequired(false);
private static final int INIT_BUFFER_SIZE = 2048;

Output output = new Output(INIT_BUFFER_SIZE, -1);
//Default size = 32K
static ObjectPool<Kryo5Cache> kryoCacheObjectPool = new ObjectPool<>(16, new ObjectPool.ObjectFactory<Kryo5Cache>() {
@Override
public Kryo5Cache create() {
return new Kryo5Cache();
}

WeakReference<Output> ref = new WeakReference<>(output);
return new Object[]{kryo, ref};
@Override
public void reset(Kryo5Cache obj) {
obj.getKryo().reset();
obj.getOutput().reset();
}
});

public static class Kryo5Cache {
final Output output;
final Kryo kryo;
public Kryo5Cache(){
kryo = new Kryo();
kryo.setDefaultSerializer(CompatibleFieldSerializer.class);
kryo.setRegistrationRequired(false);
output = new Output(INIT_BUFFER_SIZE, -1);
}

public Output getOutput(){
return output;
}

public Kryo getKryo(){
return kryo;
}

}

public Kryo5ValueEncoder(boolean useIdentityNumber) {
super(useIdentityNumber);
}

@Override
public byte[] apply(Object value) {
Kryo5Cache kryoCache = null;
try {
Object[] kryoAndBuffer = kryoThreadLocal.get();
Kryo kryo = (Kryo) kryoAndBuffer[0];
WeakReference<Output> ref = (WeakReference<Output>) kryoAndBuffer[1];
Output output = ref.get();
if (output == null) {
output = new Output(INIT_BUFFER_SIZE, -1);
}

try {
if (useIdentityNumber) {
writeInt(output, SerialPolicy.IDENTITY_NUMBER_KRYO5);
}
kryo.reset();
kryo.writeClassAndObject(output, value);
return output.toBytes();
} finally {
//reuse buffer if possible
output.reset();
if (ref.get() == null) {
ref = new WeakReference<>(output);
kryoAndBuffer[1] = ref;
}
kryoCache = kryoCacheObjectPool.borrowObject();
if (useIdentityNumber) {
writeInt(kryoCache.getOutput(), SerialPolicy.IDENTITY_NUMBER_KRYO5);
}
kryoCache.getKryo().writeClassAndObject(kryoCache.getOutput(), value);
return kryoCache.getOutput().toBytes();
} catch (Exception e) {
StringBuilder sb = new StringBuilder("Kryo Encode error. ");
sb.append("msg=").append(e.getMessage());
throw new CacheEncodeException(sb.toString(), e);
}finally {
if(kryoCache != null)
kryoCacheObjectPool.returnObject(kryoCache);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@ public Object doApply(byte[] buffer) {
in = new ByteArrayInputStream(buffer);
}
Input input = new Input(in);
Kryo kryo = (Kryo) KryoValueEncoder.kryoThreadLocal.get()[0];
ClassLoader classLoader = KryoValueDecoder.class.getClassLoader();
Thread t = Thread.currentThread();
if (t != null) {
ClassLoader ctxClassLoader = t.getContextClassLoader();
if (ctxClassLoader != null) {
classLoader = ctxClassLoader;
KryoValueEncoder.KryoCache kryoCache = null;
try {
kryoCache = KryoValueEncoder.kryoCacheObjectPool.borrowObject();
Kryo kryo = kryoCache.getKryo();
ClassLoader classLoader = KryoValueDecoder.class.getClassLoader();
Thread t = Thread.currentThread();
if (t != null) {
ClassLoader ctxClassLoader = t.getContextClassLoader();
if (ctxClassLoader != null) {
classLoader = ctxClassLoader;
}
}
kryo.setClassLoader(classLoader);
return kryo.readClassAndObject(input);
}finally {
if(kryoCache != null){
KryoValueEncoder.kryoCacheObjectPool.returnObject(kryoCache);
}
}
kryo.setClassLoader(classLoader);
return kryo.readClassAndObject(input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;

import java.lang.ref.WeakReference;
import java.util.Arrays;

/**
* Created on 2016/10/4.
Expand All @@ -16,53 +16,64 @@ public class KryoValueEncoder extends AbstractValueEncoder {

public static final KryoValueEncoder INSTANCE = new KryoValueEncoder(true);

private static int INIT_BUFFER_SIZE = 256;
private static final int INIT_BUFFER_SIZE = 2048;

static ThreadLocal<Object[]> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.setDefaultSerializer(CompatibleFieldSerializer.class);
// kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
// kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));

byte[] buffer = new byte[INIT_BUFFER_SIZE];
//Default size = 32K
static ObjectPool<KryoCache> kryoCacheObjectPool = new ObjectPool<>(16, new ObjectPool.ObjectFactory<KryoCache>() {
@Override
public KryoCache create() {
return new KryoCache();
}

WeakReference<byte[]> ref = new WeakReference<>(buffer);
return new Object[]{kryo, ref};
@Override
public void reset(KryoCache obj) {
obj.getKryo().reset();
obj.getOutput().clear();
}
});

public static class KryoCache {
final Output output;
final Kryo kryo;
public KryoCache(){
kryo = new Kryo();
kryo.setDefaultSerializer(CompatibleFieldSerializer.class);
byte[] buffer = new byte[INIT_BUFFER_SIZE];
output = new Output(buffer, -1);
}

public Output getOutput(){
return output;
}
public Kryo getKryo(){
return kryo;
}
}

public KryoValueEncoder(boolean useIdentityNumber) {
super(useIdentityNumber);
}

@Override
public byte[] apply(Object value) {
KryoCache kryoCache = null;
try {
Object[] kryoAndBuffer = kryoThreadLocal.get();
Kryo kryo = (Kryo) kryoAndBuffer[0];
WeakReference<byte[]> ref = (WeakReference<byte[]>) kryoAndBuffer[1];
byte[] buffer = ref.get();
if (buffer == null) {
buffer = new byte[INIT_BUFFER_SIZE];
}
Output output = new Output(buffer, -1);

try {
if (useIdentityNumber) {
writeInt(output, SerialPolicy.IDENTITY_NUMBER_KRYO4);
}
kryo.writeClassAndObject(output, value);
return output.toBytes();
} finally {
//reuse buffer if possible
if (ref.get() == null || buffer != output.getBuffer()) {
ref = new WeakReference<>(output.getBuffer());
kryoAndBuffer[1] = ref;
}
kryoCache = kryoCacheObjectPool.borrowObject();
// Output output = new Output(kryoCache.getBuffer(), -1);
// output.clear();
Output output = kryoCache.getOutput();
if (useIdentityNumber) {
writeInt(output, SerialPolicy.IDENTITY_NUMBER_KRYO4);
}
kryoCache.getKryo().writeClassAndObject(output, value);
return output.toBytes();
} catch (Exception e) {
StringBuilder sb = new StringBuilder("Kryo Encode error. ");
sb.append("msg=").append(e.getMessage());
throw new CacheEncodeException(sb.toString(), e);
} finally {
if(kryoCache != null)
kryoCacheObjectPool.returnObject(kryoCache);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.alicp.jetcache.support;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;

/**
* @Description
* @author: zhangtong
* @create: 2023/10/6 3:27 PM
*/
public class ObjectPool<T> {
private final ArrayBlockingQueue<T> queue;
private final int size;
private final ObjectFactory<T> factory;
private static final Logger logger = LoggerFactory.getLogger(ObjectPool.class);

public ObjectPool(int size, ObjectFactory<T> factory) {
this.size = size;
this.factory = factory;
queue = new ArrayBlockingQueue<>(size);
for (int i = 0; i < size; i++) {
queue.add(factory.create());
}
logger.debug("Init the object pool with size {}", size);
}

public T borrowObject() {
T t = queue.poll();
if(t == null) {
logger.debug("The pool is not enough, create a new object");
return factory.create();
}
return t;
}

public void returnObject(T obj) {
if (obj == null) {
return;
}
factory.reset(obj);
queue.offer(obj);
}

public interface ObjectFactory<T> {
T create();
void reset(T obj);
}
}
Loading

0 comments on commit 45b5d8f

Please sign in to comment.