Skip to content

Commit

Permalink
job-info: support new update-watch service
Browse files Browse the repository at this point in the history
Problem: In the future, several services will need to know a
job's resources and know the updates that would apply to them.
This would currently require users to read R, watch the eventlog,
and then apply `resource-update` events as they happen.  It would
be nice if a service did this as there will be multiple users.

Solution: Support a new job-info.update-watch streaming service.  It currently
supports only the key "R", but can be extended to other keys in the future.

The service will read R and the eventlog for a job and apply all resource-update
changes as needed.  This initial "R" will be sent back to the caller.  If the
job has completed, the RPC streaming service ends.  If not, the eventlog
will be watched for future resource-update events.  On each new
resource-update event, a new R will be streamed back to the caller.  This
continues until the job ends or the caller cancels the stream.

Fixes flux-framework#5451
  • Loading branch information
chu11 committed Sep 22, 2023
1 parent 58ce117 commit b08a237
Show file tree
Hide file tree
Showing 5 changed files with 558 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/modules/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ job_info_la_SOURCES = \
job-info/watch.c \
job-info/guest_watch.h \
job-info/guest_watch.c \
job-info/update.h \
job-info/update.c \
job-info/util.h \
job-info/util.c
job_info_la_CPPFLAGS = \
Expand Down
24 changes: 22 additions & 2 deletions src/modules/job-info/job-info.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
#include "lookup.h"
#include "watch.h"
#include "guest_watch.h"
#include "update.h"

/* Message handler registered for the "job-info.disconnect" topic.
 *
 * Forwards the disconnect message to the cancel routine of each watcher
 * kind in turn: regular watchers, guest watchers, and update watchers.
 *
 * h   - broker handle (not used here)
 * mh  - message handler (not used here)
 * msg - the disconnect request, passed through to each cancel routine
 * arg - the module's struct info_ctx
 *
 * NOTE(review): the trailing `false` presumably means "match on sender
 * only, not a specific request" -- confirm against the *_cancel
 * implementations.
 */
static void disconnect_cb (flux_t *h,
                           flux_msg_handler_t *mh,
                           const flux_msg_t *msg,
                           void *arg)
{
    struct info_ctx *info = arg;

    watchers_cancel (info, msg, false);
    guest_watchers_cancel (info, msg, false);
    update_watchers_cancel (info, msg, false);
}

static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
Expand All @@ -36,10 +38,12 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
int lookups = zlist_size (ctx->lookups);
int watchers = zlist_size (ctx->watchers);
int guest_watchers = zlist_size (ctx->guest_watchers);
if (flux_respond_pack (h, msg, "{s:i s:i s:i}",
int update_watchers = zlist_size (ctx->update_watchers);
if (flux_respond_pack (h, msg, "{s:i s:i s:i s:i}",
"lookups", lookups,
"watchers", watchers,
"guest_watchers", guest_watchers) < 0) {
"guest_watchers", guest_watchers,
"update_watchers", update_watchers) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
goto error;
}
Expand All @@ -66,6 +70,16 @@ static const struct flux_msg_handler_spec htab[] = {
.cb = watch_cancel_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.update-watch",
.cb = update_watch_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.update-watch-cancel",
.cb = update_watch_cancel_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.disconnect",
.cb = disconnect_cb,
Expand Down Expand Up @@ -96,6 +110,10 @@ static void info_ctx_destroy (struct info_ctx *ctx)
guest_watch_cleanup (ctx);
zlist_destroy (&ctx->guest_watchers);
}
if (ctx->update_watchers) {
update_watch_cleanup (ctx);
zlist_destroy (&ctx->update_watchers);
}
free (ctx);
errno = saved_errno;
}
Expand All @@ -118,6 +136,8 @@ static struct info_ctx *info_ctx_create (flux_t *h)
goto error;
if (!(ctx->guest_watchers = zlist_new ()))
goto error;
if (!(ctx->update_watchers = zlist_new ()))
goto error;
return ctx;
error:
info_ctx_destroy (ctx);
Expand Down
1 change: 1 addition & 0 deletions src/modules/job-info/job-info.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct info_ctx {
zlist_t *lookups;
zlist_t *watchers;
zlist_t *guest_watchers;
zlist_t *update_watchers;
};

#endif /* _FLUX_JOB_INFO_H */
Expand Down
Loading

0 comments on commit b08a237

Please sign in to comment.