Skip to content

Commit

Permalink
Delegate pub/sub replies to the right callback
Browse files Browse the repository at this point in the history
  • Loading branch information
pietern committed Dec 31, 2010
1 parent 3ac8ef9 commit a0ebc54
Showing 1 changed file with 48 additions and 2 deletions.
50 changes: 48 additions & 2 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,53 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
}

static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
redisContext *c = &(ac->c);
dict *callbacks;
dictEntry *de;
int pvariant;
char *stype;
sds sname;

/* Custom reply functions are not supported for pub/sub. This will fail
* very hard when they are used... */
if (reply->type == REDIS_REPLY_ARRAY) {
assert(reply->elements >= 2);
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;

if (pvariant)
callbacks = ac->sub.patterns;
else
callbacks = ac->sub.channels;

/* Locate the right callback */
assert(reply->element[1]->type == REDIS_REPLY_STRING);
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
de = dictFind(callbacks,sname);
if (de != NULL) {
memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));

/* If this is an unsubscribe message, remove it. */
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
dictDelete(callbacks,sname);

/* If this was the last unsubscribe message, revert to
* non-subscribe mode. */
assert(reply->element[2+pvariant]->type == REDIS_REPLY_INTEGER);
if (reply->element[2+pvariant]->integer == 0)
c->flags &= ~REDIS_SUBSCRIBED;
}
}
sdsfree(sname);
} else {
/* Shift callback for invalid commands. */
__redisShiftCallback(&ac->sub.invalid,dstcb);
}
return REDIS_OK;
}

void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb;
Expand All @@ -298,8 +345,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/* No more regular callbacks, the context *must* be subscribed. */
assert(c->flags & REDIS_SUBSCRIBED);

/* TODO: find the right callback for pub/sub message. */
__redisGetSubscribeCallback(ac,reply,&cb);
}

if (cb.fn != NULL) {
Expand Down

0 comments on commit a0ebc54

Please sign in to comment.