-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
job-info: support new update-lookup and update-watch service #5467
job-info: support new update-lookup and update-watch service #5467
Conversation
39aaf83
to
1a8e4c4
Compare
As I think about this more, is the caching critically important? It would definitely save some unnecessary lookups. Lemme see how hard it'll be to add right now. Note: I think such support would be in a follow up PR, lets look at this PR as "first round implementation". |
1b75bba
to
e01ce52
Compare
although a few parts were a little tricky, wasn't horrible to add the "multiple watchers under 1 eventlog-watch" support. The commits could be squashed in at a later time, but for ease, I left the separate commits in so the review (can maybe) be easier. |
src/modules/job-info/update.c
Outdated
* to: | ||
* - execution.expiration | ||
*/ | ||
if (!streq (key, "execution.expiration")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the other PR, the actual key is just expiration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting, I now see the resource-update
context is formatted differently in RFC21 than jobspec-update
.
Did we have a specific reason for that? It seems possibly unwise long term for both contexts to be formatted differently? On the other hand I can understand that R
should have far more limited update possibilities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we have a specific reason for that? It seems possibly unwise long term for both contexts to be formatted differently?
I think it was done this way because jobspec and R are actually quite different - R is not a simple hierarchical document like jobspec. E.g., when grow/shrink or similar are added, it will be more useful to have the added or deleted R and not a wholesale replacement of execution.R_lite
.
Do users want to see the whole history starting with the original R, or just current R + updates? If the latter, would it make sense to have this RPC check the streaming bit, and if not set, return the current R and done? Then maybe #5464 and other users like sched hello in #5472 wouldn't need to replay the eventlog on their own? (I haven't looked at this - just basing this comment on your summary in #5472 - so sorry if it's off base) Possibly the method name would need adjustment then. |
The assumption in this PR is the latter. It returns R in its current form (R + updates currently in eventlog), and then streams further updates to R that occur later. I left whole history as a future option / flag, as it wasn't clear there was a use case at the moment (other than "would like to see it just to see it").
That isn't a terrible idea, makes sense. I guess the (original) theory was that whoever called this service would also want to know if R ever changed in the future. But I'm guessing there are a few cases where that's not needed? And #5464 could totally use this service. I initially just mirrored the "jobspec" equivalent code in #5464 because the idea was not put too much burden on the broker service for parsing the eventlog a bunch. If we wanted to go down the path where we put the burden on the |
e01ce52
to
2e3c1da
Compare
re-pushed, updating "execution.expiration" to "expiration", as I had misread the RFC |
2e3c1da
to
80e2003
Compare
Hmmm, got this in one builder (gcc-12,content-s3,distcheck) with one of my new tests. I have no idea how an EAGAIN could even happen. Will have to think about this. Edit: noticed a minor && chain issue below. Dunno if some raciness with the background processes.
|
Stepping back a bit, there are two use cases here:
|
bbca20b
to
45f7e68
Compare
per offline discussion, re-worked this PR.
because of all these changes, I had to squash my previous changes, so unfortunately there's just one giant commit adding |
45f7e68
to
e45ce00
Compare
hmmm several builders are consistently failing with
under the |
My guess is the docker manifest for that repo is corrupted. Same issue in flux-sched. |
e45ce00
to
287b501
Compare
287b501
to
9885340
Compare
Problem: In t2231-job-info-eventlog-watch.t, there is a missing pipe between an echo of a json payload and the 'rpc' helper tool, so a payload was not actually being sent. The test was just echoing the name of the command. Add a pipe between the echo and 'rpc' helper tool to initiate the correct payload and test.
Problem: A test in t2231-job-info-eventlog-watch.t does not set the streaming flag in an invalid input test. This test happens to work because the code's "is the input valid" check is done before the "is the streaming flag set" test. But for full correctness, the streaming flag should be set. Solution: Use the 'rpc_stream' tool in the test, which sets the streaming flag.
Problem: In several headers in job-info, the comment indicating the end of a ifdef block had the wrong macro name. Correct the macro name in the comments.
Problem: A local function in job-info was not declared static. Make it static.
Problem: It would be convenient if the several ctx destroy functions preserved errno so it doesn't have to be done with a "save_errno" pattern in cleanup paths. Preserve errno in ctx destroy functions. Cleanup code in a few locations where possible.
Problem: In guest_watch.c there is a utility function called guest_msg_pack() that allows us to create a message using specific credentials from another message. It's useful when messages are sent within the module and you want the original user's request credentials to be copied into those new messages. This function could be useful in future work. Solution: Move the function into a new internal util library and rename it cred_msg_pack().
Codecov Report
@@ Coverage Diff @@
## master #5467 +/- ##
==========================================
- Coverage 83.51% 83.46% -0.05%
==========================================
Files 485 487 +2
Lines 81543 81918 +375
==========================================
+ Hits 68099 68375 +276
- Misses 13444 13543 +99
|
Problem: In watch and guest_watch, there are some common eventlog parsing helper code that could be useful in future job-info additions. Solution: Move the helper function eventlog_parse_next() into util and add a new helper function eventlog_parse_entry_chunk(). Update callers in watch.c and guest_watch.c appropriately.
Problem: In the future, several services will need to know a job's resources and know the updates that would apply to them. This would currently require users to read R, read the eventlog, and then apply `resource-update` events to R. Some other users would also need to know when there are changes to R, necessitating watching the eventlog for future resource-update changes. It would be nice if a service did this as there will be multiple users. Solution: Support a new job-info.update-lookup service and job-info.update-watch streaming service. It currently supports only the key "R", but can be extended to other keys in the future. The job-info.update-lookup service will read R and the eventlog for a job. it then apples all resource-update changes to R and returns the result. job-info.update-watch service will do the same as the above, but if the job is not completed, it will continue to watch the eventlog for future resource-update events. On each new resource-update event, a new R will be streamed back to the caller. This continues until the job ends or the caller cancels the stream. Fixes flux-framework#5451
Problem: There is no coverage for the new job-info.update-lookup and job-info.update-watch services. Add tests in the new t2233-job-info-update.t file and add helper tools update_lookup and update_watch_stream.
9885340
to
a4fc87b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been basing work to handle R updates in the job-exec service and job shell on this PR, and this seems to be working. My inclination is to merge this PR now and make any needed changes and improvements if necessary as we need them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found a few things but actually, if this is working, maybe it's fine to merge as is and open an issue to return and clean up the two bigger things I called out
- use eventlog class to parse eventlogs
- use
flux_msglist_disconnect()
andflux_msglist_cancel()
instead of duplicating that functionality.
Since it may take a bit of time to audit that code and make sure those changes don't break anything, merge now+fix later may be more expedient.
if (!(payload = json_vpack_ex (NULL, 0, fmt, ap))) | ||
goto error; | ||
if (!(payloadstr = json_dumps (payload, JSON_COMPACT))) { | ||
errno = ENOMEM; | ||
goto error; | ||
} | ||
if (flux_msg_set_string (newmsg, payloadstr) < 0) | ||
goto error; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is just moved in this PR not created, but maybe for later:
The json_vpack_ex()
+ json_dumps()
+ flux_msg_set_string()
could be replaced with one call to flux_msg_vpack()
.
@@ -708,19 +708,6 @@ static int main_namespace_lookup (struct guest_watch_ctx *gw) | |||
return rv; | |||
} | |||
|
|||
static bool eventlog_parse_next (const char **pp, const char **tok, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Three comments on this:
- use a different function prefix for local helpers since
eventlog_
is already used by the external eventlog class - would it be better to use
eventlog_decode()
from that class to parse input into a JSON array and then iterate over that? - is a "chunk" the same as an "eventlog entry"? Just call it that if so.
@@ -170,6 +170,8 @@ job_info_la_SOURCES = \ | |||
job-info/watch.c \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commit mesage for job-info: support new update-lookup/watch service
: capitalize "it" in the 4th paragraph of the body.
uc->type = type; | ||
uc->id = id; | ||
if (!(uc->key = strdup (key))) { | ||
errno = ENOMEM; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
strdup already sets ENOMEM on failure
while (eventlog_parse_next (&input, &tok, &toklen)) { | ||
json_t *entry = NULL; | ||
const char *name; | ||
json_t *context = NULL; | ||
if (eventlog_parse_entry_chunk (uc->ctx->h, | ||
tok, | ||
toklen, | ||
&entry, | ||
&name, | ||
&context) < 0) { | ||
errmsg = "error parsing eventlog"; | ||
goto error; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, as noted earlier, it looks like this could be replaced with (with appropriate error handling)
eventlog = eventlog_decode (eventlog_str);
json_array_foreach (eventlog, index, entry) {
eventlog_entry_parse (entry, NULL, &name, &context);
...
}
if (flux_respond_error (uc->ctx->h, cmpmsg, ENODATA, NULL) < 0) | ||
flux_log_error (uc->ctx->h, | ||
"%s: flux_respond_error", | ||
__FUNCTION__); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Response should only be sent on cancel, not disconnect.
|
||
uc = zlist_first (ctx->update_watchers); | ||
while (uc) { | ||
update_watch_cancel (uc, msg, cancel); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it would be better to replace update_watch_cancel()
with
if (cancel)
flux_msglist_cancel (ctx->h, uc->msglist, msg);
else
flux_msglist_disconnect (ctx->h, uc->msglist, msg);
I've just set MWP on behalf of @chu11 |
per discussion in #5451
Problem: In the future, several services will need to know a job's resources and know the updates that would apply to them.
This would currently require users to read R, watch the eventlog, and then apply
resource-update
events as they happen. It would be nice if a service did this as there will be multiple users.Solution: Support a new job-info.update-watch streaming service. It currently supports only the key "R", but can be extended to other keys in the future.
The service will read R and the eventlog for a job and apply all resource-update changes as needed. This initial "R" will sent back to the caller. If the job has completed, the RPC streaming service ends. If not, the eventlog will be watched for future resource-update events. On each new resource-update event, a new R will be streamed back to the caller. This continues until the job ends or the caller cancels the stream.
Notes:
It might be wise to add some type of "caching" of sorts as multiple callers may want to watch the same job's R value. I didn't do that at the moment. I figured that could be an optimization for the future.
per discussion in Idea: job-info: add streaming RPC to "watch" R #5451 future options could include a "uniq" option as a "full history" option.