Skip to content

Commit

Permalink
Add finalizer support to async Redis client; add libevent example tha…
Browse files Browse the repository at this point in the history
…t tests it and also demonstrates pubsub
  • Loading branch information
tudor committed Feb 19, 2021
1 parent 53a8144 commit dc9e2fa
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 13 deletions.
74 changes: 62 additions & 12 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,21 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe
c->flags |= REDIS_IN_CALLBACK;
cb->fn(ac,reply,cb->privdata);
c->flags &= ~REDIS_IN_CALLBACK;
/* Always run finalizer when calling with NULL reply. */
if (reply == NULL && cb->finalizer != NULL) {
c->flags |= REDIS_IN_FINALIZER;
cb->finalizer(ac,cb->privdata);
c->flags &= ~REDIS_IN_FINALIZER;
}
}
}

static void __redisRunFinalizer(redisAsyncContext *ac, redisCallback *cb) {
redisContext *c = &(ac->c);
if (cb->fn != NULL && cb->finalizer != NULL) {
c->flags |= REDIS_IN_FINALIZER;
cb->finalizer(ac,cb->privdata);
c->flags &= ~REDIS_IN_FINALIZER;
}
}

Expand Down Expand Up @@ -362,7 +377,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
void redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
c->flags |= REDIS_FREEING;
if (!(c->flags & REDIS_IN_CALLBACK))
if (!(c->flags & (REDIS_IN_CALLBACK | REDIS_IN_FINALIZER)))
__redisAsyncFree(ac);
}

Expand Down Expand Up @@ -406,11 +421,11 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {

/** unset the auto-free flag here, because disconnect undoes this */
c->flags &= ~REDIS_NO_AUTO_FREE;
if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
if (!(c->flags & (REDIS_IN_CALLBACK | REDIS_IN_FINALIZER)) && ac->replies.head == NULL)
__redisAsyncDisconnect(ac);
}

static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb, int* call_finalizer) {
redisContext *c = &(ac->c);
dict *callbacks;
redisCallback *cb;
Expand All @@ -421,6 +436,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,

/* Custom reply functions are not supported for pub/sub. This will fail
* very hard when they are used... */
*call_finalizer = 0;
if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
assert(reply->elements >= 2);
assert(reply->element[0]->type == REDIS_REPLY_STRING);
Expand Down Expand Up @@ -451,8 +467,10 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,

/* If this is an unsubscribe message, remove it. */
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
if (cb->pending_subs == 0)
if (cb->pending_subs == 0) {
dictDelete(callbacks,sname);
*call_finalizer = 1;
}

/* If this was the last unsubscribe message, revert to
* non-subscribe mode. */
Expand All @@ -469,6 +487,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
} else {
/* Shift callback for invalid commands. */
__redisShiftCallback(&ac->sub.invalid,dstcb);
*call_finalizer = 1;
}
return REDIS_OK;
oom:
Expand Down Expand Up @@ -502,11 +521,12 @@ static int redisIsSubscribeReply(redisReply *reply) {

void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL};
redisCallback cb = {NULL, NULL, NULL, 0, NULL};
void *reply = NULL;
int status;

while((status = redisGetReply(c,&reply)) == REDIS_OK) {
int call_finalizer = 0;
if (reply == NULL) {
/* When the connection is being disconnected and there are
* no more replies, this is the cue to really disconnect. */
Expand Down Expand Up @@ -564,11 +584,16 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
/* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
if(c->flags & REDIS_SUBSCRIBED)
__redisGetSubscribeCallback(ac,reply,&cb);
__redisGetSubscribeCallback(ac,reply,&cb,&call_finalizer);
} else {
call_finalizer = !(c->flags & REDIS_MONITORING);
}

if (cb.fn != NULL) {
__redisRunCallback(ac,&cb,reply);
if (call_finalizer) {
__redisRunFinalizer(ac,&cb);
}
c->reader->fn->freeObject(reply);

/* Proceed with free'ing when redisAsyncFree() was called. */
Expand Down Expand Up @@ -734,7 +759,7 @@ static const char *nextArgument(const char *start, const char **str, size_t *len
/* Helper function for the redisAsyncCommand* family of functions. Writes a
* formatted command to the output buffer and registers the provided callback
* function with the context. */
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len) {
redisContext *c = &(ac->c);
redisCallback cb;
struct dict *cbdict;
Expand All @@ -747,11 +772,13 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
sds sname;
int ret;

/* Don't accept new commands when the connection is about to be closed. */
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
/* Don't accept new commands when the connection is about to be closed. Also, don't accept new
* commands when running a finalizer. */
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING | REDIS_IN_FINALIZER)) return REDIS_ERR;

/* Setup callback */
cb.fn = fn;
cb.finalizer = finalizer;
cb.privdata = privdata;
cb.pending_subs = 1;

Expand Down Expand Up @@ -782,6 +809,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
if (de != NULL) {
existcb = dictGetEntryVal(de);
cb.pending_subs = existcb->pending_subs + 1;
__redisRunFinalizer(ac,existcb);
}

ret = dictReplace(cbdict,sname,&cb);
Expand Down Expand Up @@ -826,6 +854,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
}

int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
return redisvAsyncCommandWithFinalizer(ac, fn, NULL, privdata, format, ap);
}

int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, va_list ap) {
char *cmd;
int len;
int status;
Expand All @@ -835,7 +867,7 @@ int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdat
if (len < 0)
return REDIS_ERR;

status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len);
hi_free(cmd);
return status;
}
Expand All @@ -849,20 +881,38 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata
return status;
}

int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, ...) {
va_list ap;
int status;
va_start(ap,format);
status = redisvAsyncCommandWithFinalizer(ac,fn,finalizer,privdata,format,ap);
va_end(ap);
return status;
}

int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
return redisAsyncCommandArgvWithFinalizer(ac, fn, NULL, privdata, argc, argv, argvlen);
}

int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, int argc, const char **argv, const size_t *argvlen) {
sds cmd;
int len;
int status;
len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
if (len < 0)
return REDIS_ERR;
status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len);
sdsfree(cmd);
return status;
}

int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
int status = __redisAsyncCommand(ac,fn,NULL,privdata,cmd,len);
return status;
}

int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len) {
int status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len);
return status;
}

Expand Down
7 changes: 7 additions & 0 deletions async.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ struct dict; /* dictionary header is included in async.c */

/* Reply callback prototype and container */
typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
typedef void (redisFinalizerCallback)(struct redisAsyncContext*, void*);
typedef struct redisCallback {
struct redisCallback *next; /* simple singly linked list */
redisCallbackFn *fn;
redisFinalizerCallback *finalizer;
int pending_subs;
void *privdata;
} redisCallback;
Expand Down Expand Up @@ -140,6 +142,11 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len);

int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, va_list ap);
int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, ...);
int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, int argc, const char **argv, const size_t *argvlen);
int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len);

#ifdef __cplusplus
}
#endif
Expand Down
4 changes: 3 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ ENDIF()

FIND_PATH(LIBEVENT event.h)
if (LIBEVENT)
ADD_EXECUTABLE(example-libevent example-libevent)
ADD_EXECUTABLE(example-libevent example-libevent.c)
TARGET_LINK_LIBRARIES(example-libevent hiredis event)
ADD_EXECUTABLE(example-libevent-pubsub example-libevent-pubsub.c)
TARGET_LINK_LIBRARIES(example-libevent-pubsub hiredis event)
ENDIF()

FIND_PATH(LIBUV uv.h)
Expand Down
Loading

0 comments on commit dc9e2fa

Please sign in to comment.