Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with stalled requests if shutdown #3075

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public synchronized void complete(final TaskResponse resp) {

// TTL = Time to live, the maximum time a key will be in the cache before it is
// evicted, regardless of activity.
@Value("${terarium.taskrunner.response-cache-ttl-seconds:86400}") // 24 hours
@Value("${terarium.taskrunner.response-cache-ttl-seconds:43200}") // 12 hours
private long CACHE_TTL_SECONDS;

// Max idle = The maximum time a key can be idle in the cache before it is
Expand All @@ -143,7 +143,7 @@ public synchronized void complete(final TaskResponse resp) {
// Always use a lease time for distributed locks to prevent application wide
// deadlocks. If for whatever reason the lock has not been released within a
// N seconds, it will automatically free itself.
@Value("${terarium.taskrunner.redis-lock-lease-seconds:30}") // 30 seconds
@Value("${terarium.taskrunner.redis-lock-lease-seconds:10}") // 10 seconds
private long REDIS_LOCK_LEASE_SECONDS;

private final RabbitTemplate rabbitTemplate;
Expand Down Expand Up @@ -463,15 +463,15 @@ public TaskFuture runTaskAsync(final TaskRequest r) throws JsonProcessingExcepti
// future already exists on this instance
return futures.get(existingId);
}
// otherwise dispatch it again
log.info("No viable cached response found for task id: {} for SHA: {}, creating new task", existingId,
hash);
}

// no cached request, create the response cache entry
final TaskResponse queuedResponse = req.createResponse(TaskStatus.QUEUED);
responseCache.put(req.getId(), queuedResponse, CACHE_TTL_SECONDS, TimeUnit.SECONDS, CACHE_MAX_IDLE_SECONDS,
TimeUnit.SECONDS);
// otherwise dispatch it again, and overwrite the id
log.info(
"No viable cached response found for task id: {} for SHA: {}, creating new task with id {}", existingId,
hash, req.getId());

taskIdCache.put(hash, req.getId(), CACHE_TTL_SECONDS, TimeUnit.SECONDS, CACHE_MAX_IDLE_SECONDS,
TimeUnit.SECONDS);
}

// now send request
final String requestQueue = String.format("%s-%s", TASK_RUNNER_REQUEST_QUEUE, req.getType().toString());
Expand All @@ -495,6 +495,13 @@ public TaskFuture runTaskAsync(final TaskRequest r) throws JsonProcessingExcepti
final String jsonStr = objectMapper.writeValueAsString(req);
rabbitTemplate.convertAndSend(requestQueue, jsonStr);

// put the response in redis after it is queued in the case the id is reserved
// but the server is shutdown before it dispatches the request, which would
// cause servers to wait on requests that were never sent.
final TaskResponse queuedResponse = req.createResponse(TaskStatus.QUEUED);
responseCache.put(req.getId(), queuedResponse, CACHE_TTL_SECONDS, TimeUnit.SECONDS, CACHE_MAX_IDLE_SECONDS,
TimeUnit.SECONDS);

// create and return the future
final CompletableTaskFuture future = new CompletableTaskFuture(req.getId(), queuedResponse);
futures.put(req.getId(), future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,39 @@ public void testItDoesNotCacheFailure() throws Exception {
final TaskFuture future1 = taskService.runTaskAsync(req);
Assertions.assertEquals(TaskStatus.FAILED, future1.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getStatus());

// next request should not pull the successful response from cache
// next request should not pull the failed response from cache
final TaskFuture future2 = taskService.runTaskAsync(req);
Assertions.assertEquals(TaskStatus.FAILED, future2.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getStatus());
Assertions.assertNotEquals(future1.getId(), future2.getId());
}

// Verifies the response-cache policy around non-success results:
// a CANCELLED response must NOT be served from the task-id cache on a
// subsequent identical request, but a later SUCCESS response for the same
// request hash IS cached and reused by yet another identical request.
// NOTE(review): @Test is commented out, so this test is disabled — confirm
// whether that is intentional (e.g. it needs a live taskrunner/Redis) before
// relying on it for regression coverage.
// @Test
@WithUserDetails(MockUser.URSULA)
public void testItDoesNotCacheFailureButCacheSuccessAfter() throws Exception {
final int TIMEOUT_SECONDS = 20;

// Fixed request payload: identical bytes produce the same request hash,
// which is what the task-id cache keys on.
final byte[] input = "{\"input\":\"This is my input string\"}".getBytes();

final TaskRequest req = new TaskRequest();
req.setType(TaskType.GOLLM);
// /echo.py — presumably a script that echoes its input back; TODO confirm.
req.setScript("/echo.py");
req.setInput(input);

// Dispatch and immediately cancel so the first run terminates CANCELLED.
final TaskFuture future1 = taskService.runTaskAsync(req);
taskService.cancelTask(future1.getId());
Assertions.assertEquals(TaskStatus.CANCELLED, future1.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getStatus());

// next request should not pull the cancelled response from cache
final TaskFuture future2 = taskService.runTaskAsync(req);
Assertions.assertEquals(TaskStatus.SUCCESS, future2.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getStatus());
// A new task id proves the cancelled entry was not reused.
Assertions.assertNotEquals(future1.getId(), future2.getId());

// next request should pull the successful response from cache
final TaskFuture future3 = taskService.runTaskAsync(req);
Assertions.assertEquals(TaskStatus.SUCCESS, future3.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getStatus());
// Same task id proves the successful response WAS served from cache.
Assertions.assertEquals(future2.getId(), future3.getId());
}

// @Test
@WithUserDetails(MockUser.URSULA)
public void testItCanCacheWithConcurrency() throws Exception {
Expand Down
Loading