Skip to content

Commit

Permalink
Add findStaleMessages() API
Browse files Browse the repository at this point in the history
Attempts to return the items present in the local queue shard
but not in the hashmap, if any.

(Ideally, we would not require this function, however, in some
configurations, especially with multi-region write traffic sharing
the same queue, we may find ourselves with stale items in the queue
shards)
  • Loading branch information
smukil committed Jan 23, 2020
1 parent de3787f commit 5b25b69
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ public interface DynoQueue extends Closeable {
public void processUnacks();
public void atomicProcessUnacks();

/**
*
* Attempts to return the items present in the local queue shard but not in the hashmap, if any.
* (Ideally, we would not require this function, however, in some configurations, especially with multi-region write
* traffic sharing the same queue, we may find ourselves with stale items in the queue shards)
*
* @return List of stale messages IDs.
*/
public List<Message> findStaleMessages();

/*
* <=== Begin unsafe* functions. ===>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,70 @@ public void processUnacks() {

}

@Override
public List<Message> findStaleMessages() {
return execute("findStaleMessages", localQueueShard, () -> {

List<Message> stale_msgs = new ArrayList<>();

int batchSize = 1_000;

double now = Long.valueOf(clock.millis()).doubleValue();
long num_stale = 0;

Set<String> elems = nonQuorumConn.zrangeByScore(localQueueShard, 0, now, 0, batchSize);

if (elems.size() == 0) {
return stale_msgs;
}

String findStaleMsgsScript = "local hkey=KEYS[1]\n" +
"local queue_shard=ARGV[1]\n" +
"local unack_shard=ARGV[2]\n" +
"local num_msgs=ARGV[3]\n" +
"\n" +
"local stale_msgs={}\n" +
"local num_stale_idx = 1\n" +
"for i=0,num_msgs-1 do\n" +
" local msg_id=ARGV[4+i]\n" +
"\n" +
" local exists_hash = redis.call('hget', hkey, msg_id)\n" +
" local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" +
" local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" +
"\n" +
" if (exists_hash and exists_queue) then\n" +
" elseif (not (exists_unack)) then\n" +
" stale_msgs[num_stale_idx] = msg_id\n" +
" num_stale_idx = num_stale_idx + 1\n" +
" end\n" +
"end\n" +
"\n" +
"return stale_msgs\n";

String unackKey = getUnackKey(queueName, shardName);
ImmutableList.Builder builder = ImmutableList.builder();
builder.add(localQueueShard);
builder.add(unackKey);
builder.add(Integer.toString(elems.size()));
for (String msg : elems) {
builder.add(msg);
}

ArrayList<String> stale_msg_ids = (ArrayList) ((DynoJedisClient)quorumConn).eval(findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build());
num_stale = stale_msg_ids.size();
if (num_stale > 0) {
logger.info("findStaleMsgs(): Found " + num_stale + " messages present in queue but not in hashmap");
}

for (String m : stale_msg_ids) {
Message msg = new Message();
msg.setId(m);
stale_msgs.add(msg);
}
return stale_msgs;
});
}

@Override
public void atomicProcessUnacks() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ public void atomicProcessUnacks() {
throw new UnsupportedOperationException();
}

@Override
public List<Message> findStaleMessages() { throw new UnsupportedOperationException(); }

@Override
public boolean atomicRemove(String messageId) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,9 @@ public void atomicProcessUnacks() {
throw new UnsupportedOperationException();
}

@Override
public List<Message> findStaleMessages() { throw new UnsupportedOperationException(); }

@Override
public boolean atomicRemove(String messageId) {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit 5b25b69

Please sign in to comment.