Skip to content

Commit

Permalink
Read faster from pipes. Add some checks in gollm controller. (#2656)
Browse files Browse the repository at this point in the history
  • Loading branch information
kbirk authored Feb 5, 2024
1 parent 8c39a70 commit 53619d5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ public ResponseEntity<TaskResponse> createModelCardTask(
return ResponseEntity.notFound().build();
}

// make sure there is text in the document
if (document.get().getText() == null || document.get().getText().isEmpty()) {
log.warn("Document {} has no text to send", documentId);
return ResponseEntity.badRequest().build();
}

// check for input length
if (document.get().getText().length() > 600000) {
log.warn("Document {} text too long for GoLLM model card task", documentId);
return ResponseEntity.badRequest().build();
}

ModelCardInput input = new ModelCardInput();
input.setResearchPaper(document.get().getText());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class Task {

private int PROCESS_KILL_TIMEOUT_SECONDS = 10;

private int BYTES_PER_READ = 1024 * 1024;

public Task(TaskRequest req) throws IOException, InterruptedException {
mapper = new ObjectMapper();

Expand Down Expand Up @@ -166,7 +168,7 @@ public byte[] readOutputWithTimeout(int timeoutMinutes)
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(outputPipeName))) {
log.debug("Reading from output pipe: {} for task: {}", outputPipeName, req.getId());
ByteArrayOutputStream bos = new ByteArrayOutputStream();
byte[] buffer = new byte[1024]; // buffer size
byte[] buffer = new byte[BYTES_PER_READ]; // buffer size
int bytesRead;
while ((bytesRead = bis.read(buffer)) != -1) {
bos.write(buffer, 0, bytesRead);
Expand Down
16 changes: 10 additions & 6 deletions packages/taskrunner/src/test/resources/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
import sys
import concurrent.futures


# Number of bytes requested from the input pipe per read() call.
READ_CHUNK_SIZE = 1024 * 1024


def read_input(input_pipe: str):
    """Read the whole of *input_pipe* and return its contents parsed as JSON.

    The file is opened in binary mode and read in READ_CHUNK_SIZE-byte
    chunks until read() returns an empty bytes object (end of file).

    Args:
        input_pipe: Path of the file to read from.

    Returns:
        The Python object produced by decoding the bytes as UTF-8 and
        parsing the result with ``json.loads``.

    Raises:
        OSError: If the path cannot be opened or read.
        UnicodeDecodeError: If the payload is not valid UTF-8.
        json.JSONDecodeError: If the decoded text is not valid JSON.
    """
    # Collect chunks in a list and join once at the end; repeated
    # ``bytes +=`` would re-copy the accumulated payload on every read.
    chunks = []
    with open(input_pipe, "rb") as f:
        while True:
            chunk = f.read(READ_CHUNK_SIZE)
            if chunk == b"":
                break
            chunks.append(chunk)
    return json.loads(b"".join(chunks).decode("utf-8"))

def read_input_with_timeout(input_pipe: str, timeout_seconds: int):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
Expand Down

0 comments on commit 53619d5

Please sign in to comment.