Skip to content

Commit

Permalink
Refactor result watcher scheduling
Browse files Browse the repository at this point in the history
Add max runs to limit jobs so that they are not scheduled indefinitely
Refactor rescheduling into own job to delay rescheduling
  • Loading branch information
buehlefs committed Mar 30, 2022
1 parent 2db323b commit d5faa0b
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions result-watcher.bal
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import ballerina/task;
import ballerina/file;
import ballerina/io;
import ballerina/uuid;
import ballerina/time;
import qhana_backend.database;

configurable string storageLocation = "experimentData";
Expand Down Expand Up @@ -266,6 +267,27 @@ isolated class ResultProcessor {
}
}

# Task job that puts a `ResultWatcher` back onto its initial schedule.
#
# Running the reschedule from a separate job lets the caller defer it
# instead of performing it inline.
isolated class ResultWatcherRescheduler {

    *task:Job;

    // The watcher whose schedule is reset when this job executes.
    private final ResultWatcher watcher;

    # Creates a rescheduler bound to the given watcher.
    #
    # + watcher - the result watcher that will be rescheduled by `execute`
    isolated function init(ResultWatcher watcher) {
        self.watcher = watcher;
    }

    # Job entry point: logs the reschedule and calls `schedule` on the watcher
    # with a fixed list of initial values. A scheduling error is printed and
    # not propagated.
    public isolated function execute() {
        io:println(string `Reschedule watcher ${self.watcher.stepId} after new substep was found.`);
        // TODO: Probably needs to be changed in the future
        // NOTE(review): how these values are interpreted is decided by
        // ResultWatcher.schedule, which is not visible here.
        (decimal|int)[] restartIntervals = [2, 10, 5, 10, 10, 60, 30, 20, 60, 10, 600];
        error? scheduleResult = self.watcher.schedule(...restartIntervals);
        if scheduleResult is error {
            // Best effort: report the failure, there is no caller to return it to.
            io:println(scheduleResult);
        }
    }
}

public isolated class ResultWatcher {

*task:Job;
Expand Down Expand Up @@ -329,9 +351,11 @@ public isolated class ResultWatcher {
do {
// perform backoff
decimal? newInterval;
int? maxRuns = -1;
lock {
newInterval = self.scheduleIntervals.length() > 0 ? self.scheduleIntervals.pop() : ();
self.currentBackoffCounter = self.backoffCounters.length() > 0 ? self.backoffCounters.pop() : ();
maxRuns = self.currentBackoffCounter;
}
if newInterval == () {
io:println(string `Unschedule watcher ${self.stepId} after running out of watching attempts.`);
Expand All @@ -340,7 +364,7 @@ public isolated class ResultWatcher {
io:println(`finally finish executing job for step ${self.stepId}`);
return;
} else {
check self.reschedule(newInterval);
check self.reschedule(newInterval, (maxRuns == ()) ? -1 : maxRuns + 1);
}
} on fail error err {
lock {
Expand All @@ -352,7 +376,7 @@ public isolated class ResultWatcher {
}
}

private isolated function reschedule(decimal interval) returns error? {
private isolated function reschedule(decimal interval, int maxCount = -1) returns error? {
error? err = self.unschedule();

if (err != ()) {
Expand All @@ -365,7 +389,7 @@ public isolated class ResultWatcher {
}

lock {
self.jobId = check task:scheduleJobRecurByFrequency(self, interval);
self.jobId = check task:scheduleJobRecurByFrequency(self, interval, maxCount);
io:println(string `Scheduled watcher for step ${self.stepId} with interval ${interval}. JobId: ${self.jobId.toString()}`);
}
}
Expand Down Expand Up @@ -410,8 +434,10 @@ public isolated class ResultWatcher {

var startingIntervall = self.scheduleIntervals.pop(); // list always contains >1 entries at this point (see guard at top)
self.currentBackoffCounter = self.backoffCounters.length() > 0 ? self.backoffCounters.pop() : ();

int? maxRuns = self.currentBackoffCounter;

check self.reschedule(startingIntervall);
check self.reschedule(startingIntervall, (maxRuns == ()) ? -1 : maxRuns + 1);
}
}

Expand All @@ -437,10 +463,7 @@ public isolated class ResultWatcher {
ResultProcessor processor = new (result, self.experimentId, self.stepId, self.resultEndpoint);
boolean isChanged = check processor.processIntermediateResult();
if isChanged {
io:println(string `Reschedule watcher ${self.stepId} after new substep was found.`);
// TODO: Probably needs to be changed in the future
(decimal|int)[] initialIntervals = [2, 10, 5, 10, 10, 60, 30, 20, 60, 10, 600];
check self.schedule(...initialIntervals);
var _ = check task:scheduleOneTimeJob(new ResultWatcherRescheduler(self), time:utcToCivil(time:utcAddSeconds(time:utcNow(), 1)));
}
} on fail error e {
lock {
Expand Down

0 comments on commit d5faa0b

Please sign in to comment.