Skip to content

Commit

Permalink
job-info: support new update-lookup/watch service
Browse files Browse the repository at this point in the history
Problem: In the future, several services will need to know a job's
resources and know the updates that would apply to them.  This would
currently require users to read R, read the eventlog, and then apply
`resource-update` events to R.  Some other users would also need to
know when there are changes to R, necessitating watching the eventlog
for future resource-update changes.

It would be nice if a service did this as there will be multiple users.

Solution: Support a new job-info.update-lookup service and
job-info.update-watch streaming service.  It currently supports only
the key "R", but can be extended to other keys in the future.

The job-info.update-lookup service will read R and the eventlog for
a job.  it then apples all resource-update changes to R and returns
the result.

job-info.update-watch service will do the same as the above, but if
the job is not completed, it will continue to watch the eventlog for
future resource-update events.  On each new resource-update event,
a new R will be streamed back to the caller.  This continues until
the job ends or the caller cancels the stream.

Fixes #5451
  • Loading branch information
chu11 authored and grondo committed Oct 24, 2023
1 parent 9c37fea commit 531ada0
Show file tree
Hide file tree
Showing 5 changed files with 807 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/modules/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ job_info_la_SOURCES = \
job-info/watch.c \
job-info/guest_watch.h \
job-info/guest_watch.c \
job-info/update.h \
job-info/update.c \
job-info/util.h \
job-info/util.c
job_info_la_CPPFLAGS = \
Expand All @@ -178,6 +180,7 @@ job_info_la_CPPFLAGS = \
job_info_la_LIBADD = \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la \
$(top_builddir)/src/common/libjob/libjob.la \
$(JANSSON_LIBS) \
$(HWLOC_LIBS)
job_info_la_LDFLAGS = $(fluxmod_ldflags) -module
Expand Down
41 changes: 39 additions & 2 deletions src/modules/job-info/job-info.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
#include "lookup.h"
#include "watch.h"
#include "guest_watch.h"
#include "update.h"

static void disconnect_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct info_ctx *ctx = arg;
watchers_cancel (ctx, msg, false);
guest_watchers_cancel (ctx, msg, false);
update_watchers_cancel (ctx, msg, false);
}

static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
Expand All @@ -36,10 +38,14 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
int lookups = zlist_size (ctx->lookups);
int watchers = zlist_size (ctx->watchers);
int guest_watchers = zlist_size (ctx->guest_watchers);
if (flux_respond_pack (h, msg, "{s:i s:i s:i}",
int update_lookups = zlist_size (ctx->update_lookups);
int update_watchers = update_watch_count (ctx);
if (flux_respond_pack (h, msg, "{s:i s:i s:i s:i s:i}",
"lookups", lookups,
"watchers", watchers,
"guest_watchers", guest_watchers) < 0) {
"guest_watchers", guest_watchers,
"update_lookups", update_lookups,
"update_watchers", update_watchers) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
goto error;
}
Expand All @@ -66,6 +72,21 @@ static const struct flux_msg_handler_spec htab[] = {
.cb = watch_cancel_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.update-lookup",
.cb = update_lookup_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.update-watch",
.cb = update_watch_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.update-watch-cancel",
.cb = update_watch_cancel_cb,
.rolemask = FLUX_ROLE_USER
},
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-info.disconnect",
.cb = disconnect_cb,
Expand Down Expand Up @@ -96,6 +117,14 @@ static void info_ctx_destroy (struct info_ctx *ctx)
guest_watch_cleanup (ctx);
zlist_destroy (&ctx->guest_watchers);
}
if (ctx->update_lookups)
zlist_destroy (&ctx->update_lookups);
if (ctx->update_watchers) {
update_watch_cleanup (ctx);
zlist_destroy (&ctx->update_watchers);
}
if (ctx->index_uw)
zhashx_destroy (&ctx->index_uw);
free (ctx);
errno = saved_errno;
}
Expand All @@ -118,6 +147,14 @@ static struct info_ctx *info_ctx_create (flux_t *h)
goto error;
if (!(ctx->guest_watchers = zlist_new ()))
goto error;
if (!(ctx->update_lookups = zlist_new ()))
goto error;
if (!(ctx->update_watchers = zlist_new ()))
goto error;
/* no destructor for index_uw, destruction handled on
* update_watchers list */
if (!(ctx->index_uw = zhashx_new ()))
goto error;
return ctx;
error:
info_ctx_destroy (ctx);
Expand Down
4 changes: 4 additions & 0 deletions src/modules/job-info/job-info.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <flux/core.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libjob/job_hash.h"
#include "src/common/libutil/lru_cache.h"

#define OWNER_LRU_MAXSIZE 1000
Expand All @@ -25,6 +26,9 @@ struct info_ctx {
zlist_t *lookups;
zlist_t *watchers;
zlist_t *guest_watchers;
zlist_t *update_lookups;
zlist_t *update_watchers;
zhashx_t *index_uw; /* update_watchers lookup */
};

#endif /* _FLUX_JOB_INFO_H */
Expand Down
Loading

0 comments on commit 531ada0

Please sign in to comment.