Skip to content

Commit

Permalink
MSET and MSETNX (#206)
Browse files Browse the repository at this point in the history
* MSET and MSETNX

* adding support for multi set compression

* review comments
  • Loading branch information
ipapapa authored Feb 17, 2018
1 parent 681578e commit 6f4ff28
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.netflix.dyno.connectionpool;
/**
*
* Interface to be used for multi key operations, i.e.
* taking as input an vararg like String...
*
*
* @author ipapapa
*
* @param <CL>
* @param <R>
*/
public interface MultiKeyCompressionOperation<CL, R> extends Operation<CL, R> {

String[] compressMultiKeyValue(ConnectionContext ctx, String... value);

String decompressValue(ConnectionContext ctx, String value);
}
116 changes: 84 additions & 32 deletions dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class DynoJedisClient implements JedisCommands, BinaryJedisCommands, Mult
private final String clusterName;
private final ConnectionPool<Jedis> connPool;
private final AtomicReference<DynoJedisPipelineMonitor> pipelineMonitor = new AtomicReference<DynoJedisPipelineMonitor>();
private final EnumSet<OpName> compressionOperations = EnumSet.of(OpName.APPEND);

protected final DynoOPMonitor opMonitor;

Expand Down Expand Up @@ -241,48 +240,61 @@ public String decompressValue(String value, ConnectionContext ctx) {
* <ul>
* <lh>String Operations</lh>
* <li>{@link #mget(String...) MGET}</li>
* <li>{@link #mset(String...) MSET}</li>
* <li>{@link #msetnx(String...) MSETNX}</li>
* </ul>
*
* @param <T>
* the parameterized type
*/
private abstract class CompressionValueMultiKeyOperation<T> extends MultiKeyOperation<T>
implements CompressionOperation<Jedis, T> {
implements MultiKeyCompressionOperation<Jedis, T> {

private CompressionValueMultiKeyOperation(List<String> keys, OpName o) {
super(keys, o);
}

/**
* Compresses the value based on the threshold defined by
* Accepts a set of keys and values and compresses the value based on the threshold defined by
* {@link ConnectionPoolConfiguration#getValueCompressionThreshold()}
*
* @param value
* @param ctx and keyValues
* @return
*/
@Override
public String compressValue(String value, ConnectionContext ctx) {
String result = value;
int thresholdBytes = connPool.getConfiguration().getValueCompressionThreshold();

try {
// prefer speed over accuracy here so rather than using
// getBytes() to get the actual size
// just estimate using 2 bytes per character
if ((2 * value.length()) > thresholdBytes) {
result = ZipUtils.compressStringToBase64String(value);
ctx.setMetadata("compression", true);
}
} catch (IOException e) {
Logger.warn(
"UNABLE to compress [" + value + "] for key [" + getStringKey() + "]; sending value uncompressed");
}

return result;
public String[] compressMultiKeyValue(ConnectionContext ctx, String... keyValues) {
List<String> items = Arrays.asList(keyValues);
List<String> newItems = new ArrayList<String>();

for (int i = 0 ; i < items.size() ; i++) {
/*
* String... keyValues is a List of keys and values.
* The value always comes second and this is the one
* we want to compress.
*/
if(i % 2 == 0 ) {
String value = items.get(i);

try {
if ((2 * value.length()) > connPool.getConfiguration().getValueCompressionThreshold()) {
newItems.add(i, ZipUtils.compressStringToBase64String(value));
ctx.setMetadata("compression", true);
}

} catch (IOException e) {
Logger.warn(
"UNABLE to compress [" + value + "] for key [" + getStringKey() + "]; sending value uncompressed");
}
}
else {
newItems.add(items.get(i));
}
}
return (String[]) newItems.toArray();
}

@Override
public String decompressValue(String value, ConnectionContext ctx) {
public String decompressValue(ConnectionContext ctx, String value) {
try {
if (ZipUtils.isCompressed(value)) {
ctx.setMetadata("decompression", true);
Expand Down Expand Up @@ -2573,10 +2585,6 @@ public Long execute(Jedis client, ConnectionContext state) {
});
}

@Override
public Long bitcount(String key, long start, long end) {
return d_bitcount(key, start, end).getResult();
}

@Override
public Long pfadd(String key, String... elements) {
Expand All @@ -2587,6 +2595,11 @@ public Long pfadd(String key, String... elements) {
public long pfcount(String key) {
throw new UnsupportedOperationException("not yet implemented");
}

@Override
public Long bitcount(String key, long start, long end) {
return d_bitcount(key, start, end).getResult();
}

public OperationResult<Long> d_bitcount(final String key, final Long start, final Long end) {

Expand Down Expand Up @@ -2701,22 +2714,61 @@ public List<String> execute(final Jedis client, final ConnectionContext state)
new CollectionUtils.Transform<String, String>() {
@Override
public String get(String s) {
return decompressValue(s, state);
return decompressValue(state, s);
}
}));
}
});
}
}

@Override
public Long msetnx(String... keysvalues) {
return d_msetnx(keysvalues).getResult();
}

public OperationResult<Long> d_msetnx(final String... keysvalues) {
if (CompressionStrategy.NONE == connPool.getConfiguration().getCompressionStrategy()) {

return connPool.executeWithFailover(new MultiKeyOperation<Long>(Arrays.asList(keysvalues), OpName.MSETNX) {
@Override
public Long execute(Jedis client, ConnectionContext state) {
return client.msetnx(keysvalues);
}
});
} else {
return connPool.executeWithFailover(new CompressionValueMultiKeyOperation<Long>(Arrays.asList(keysvalues), OpName.MSETNX) {
@Override
public Long execute(final Jedis client, final ConnectionContext state) {
return client.msetnx(compressMultiKeyValue(state,keysvalues));
}
});
}
}

@Override
public String mset(String... keysvalues) {
throw new UnsupportedOperationException("not yet implemented");
return d_mset(keysvalues).getResult();
}

@Override
public Long msetnx(String... keysvalues) {
throw new UnsupportedOperationException("not yet implemented");
public OperationResult<String> d_mset(final String... keysvalues) {
if (CompressionStrategy.NONE == connPool.getConfiguration().getCompressionStrategy()) {

return connPool.executeWithFailover(new MultiKeyOperation<String>(Arrays.asList(keysvalues), OpName.MSET) {
@Override
public String execute(Jedis client, ConnectionContext state) {

return client.mset(keysvalues);
}
});
} else {
return connPool.executeWithFailover(new CompressionValueMultiKeyOperation<String>(Arrays.asList(keysvalues), OpName.MSET) {
@Override
public String execute(final Jedis client, final ConnectionContext state) {
return client.mset(compressMultiKeyValue(state,keysvalues));
}
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum OpName {
INCR, INCRBY, INCRBYFLOAT,
KEYS, LINDEX,
LINSERT, LLEN, LPOP, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM,
MOVE, MGET,
MOVE, MGET, MSET, MSETNX,
PERSIST, PEXPIRE, PEXPIREAT, PSETEX, PTTL,
RENAME, RENAMENX, RESTORE, RPOP, RPOPLPUSH, RPUSH, RPUSHX,
SADD, SCAN, SCARD, SDIFF, SDIFFSTORE, SET, SETBIT, SETEX, SETNX, SETRANGE, SINTER, SINTERSTORE, SISMEMBER, SMEMBERS,
Expand Down

0 comments on commit 6f4ff28

Please sign in to comment.