Skip to content

Commit

Permalink
chore(inputs): allow stream processing of inputs (#37)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Pilar <tomas.pilar@ibm.com>
  • Loading branch information
pilartomas authored Mar 20, 2024
1 parent 1b110e5 commit 16ec0ca
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 88 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ If so, check out the [NodeJS SDK](https://github.com/IBM/ibm-generative-ai-node-

## Key features

- ⚡️ Performant - processes 1k of short inputs in about 4 minutes
- ⚡️ Performant - processes 1k of short inputs in under a minute
- ☀️ Fault-tolerant - retry strategies and overflood protection
- 🏖️ Worry-free parallel processing - just pass all the data, we take care of the parallel processing
- 🚦 Handles concurrency limiting - even if you have multiple parallel jobs running
- ⏩ Requests are always returned in the respective order
- 📄 Work with files as your input or output
- ⌨️ Support stdin and stdout interaction

![-----------------------------------------------------](./assets/img/rainbow.png)
Expand Down
72 changes: 39 additions & 33 deletions src/commands/text/generation/create.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { stdin } from "node:process";
import { Readable } from "node:stream";

import { readJSONStream } from "../../../utils/streams.js";
import { createInputStream } from "../../../utils/streams.js";
import { groupOptions } from "../../../utils/yargs.js";
import { parseInput } from "../../../utils/parsers.js";
import { clientMiddleware } from "../../../middleware/client.js";

import { generationConfig, generationMiddleware } from "./index.js";

const REQUEST_LIMIT = 1000; // We want to have reasonable memory footprint while maintaining full performance

export const createCommandDefinition = [
["create [inputs..]"],
"Generate text based on input(s)",
Expand All @@ -23,49 +25,53 @@ export const createCommandDefinition = [
groupOptions({
"allow-errors": {
type: "boolean",
description: "Continue if generation fails for an input",
description: "Continue even if generation fails for an input",
default: false,
},
})
),
async (args) => {
const inlineInputs = args.inputs;
const inputs =
const inputStream =
inlineInputs.length > 0
? inlineInputs
: (await readJSONStream(stdin)).map(parseInput);
? Readable.from(inlineInputs)
: createInputStream(stdin);

const { model, parameters, allowErrors } = args;
const promises = inputs.map((input) =>
args.client.text.generation.create(
{
model_id: model,
parameters,
input,
},
{
signal: args.timeout,
}
)
);

if (allowErrors) {
for (const { status, value, reason } of await Promise.allSettled(
promises
)) {
switch (status) {
case "fulfilled":
args.print(value);
break;
case "rejected":
args.print(reason);
break;
}
}
} else {
for (const output of await Promise.all(promises)) {
const requests = [];
const consume = async (request) => {
try {
const output = await request;
args.print(output);
} catch (err) {
if (allowErrors) {
args.print(err);
} else {
throw err;
}
}
};
// Produce requests
for await (const input of inputStream) {
// If limit has been reached, consume the oldest request first
if (requests.length >= REQUEST_LIMIT) await consume(requests.shift());
requests.push(
args.client.text.generation.create(
{
model_id: model,
parameters,
input,
},
{
signal: args.timeout,
}
)
);
}
// Consume remaining requests
for (const request of requests) {
await consume(request);
}
},
];
75 changes: 55 additions & 20 deletions src/commands/text/tokenization/create.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { stdin } from "node:process";
import { Readable, compose } from "node:stream";

import { clientMiddleware } from "../../../middleware/client.js";
import { parseInput } from "../../../utils/parsers.js";
import { readJSONStream } from "../../../utils/streams.js";
import {
createBatchTransform,
createInputStream,
} from "../../../utils/streams.js";
import { groupOptions } from "../../../utils/yargs.js";

export const createCommandDefinition = [
Expand Down Expand Up @@ -43,30 +46,62 @@ export const createCommandDefinition = [
},
"Configuration:"
)
)
.options(
groupOptions(
{
"batch-size": {
type: "number",
requiresArg: true,
describe: "Batch size for inputs",
default: 100,
},
"allow-errors": {
type: "boolean",
description:
"Continue even if tokenization fails for a batch of inputs",
default: false,
},
},
"Options:"
)
),
async (args) => {
const inlineInputs = args.inputs;
const inputs =
const inputStream =
inlineInputs.length > 0
? inlineInputs
: (await readJSONStream(stdin)).map(parseInput);
? Readable.from(inlineInputs)
: createInputStream(stdin);
const { model, tokens, inputText, batchSize, allowErrors } = args;

const { model, tokens, inputText } = args;
const output = await args.client.text.tokenization.create(
{
model_id: model,
input: inputs,
parameters: {
return_options: {
input_text: inputText,
tokens,
for await (const inputs of compose(
inputStream,
createBatchTransform({ batchSize })
)) {
try {
const output = await args.client.text.tokenization.create(
{
model_id: model,
input: inputs,
parameters: {
return_options: {
input_text: inputText,
tokens,
},
},
},
},
},
{
signal: args.timeout,
{
signal: args.timeout,
}
);
args.print(output);
} catch (err) {
if (allowErrors) {
args.print(err);
} else {
throw err;
}
}
);
args.print(output);
}
},
];
11 changes: 2 additions & 9 deletions src/errors.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
import {
BaseError,
InvalidInputError as InvalidInputSDKError,
} from "@ibm-generative-ai/node-sdk";

export class InvalidInputError extends Error {}
import { BaseError, InvalidInputError } from "@ibm-generative-ai/node-sdk";

export const isUsageError = (err) =>
!(err instanceof BaseError) ||
err instanceof InvalidInputSDKError ||
err instanceof InvalidInputError;
!(err instanceof BaseError) || err instanceof InvalidInputError;
54 changes: 30 additions & 24 deletions src/utils/streams.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
import { pipeline } from "stream/promises";
import { compose } from "node:stream";

import Parser from "stream-json/Parser.js";
import StreamValues from "stream-json/streamers/StreamValues.js";
import Parser from "stream-json/jsonl/Parser.js";

import { InvalidInputError } from "../errors.js";
import { parseInput } from "./parsers.js";

export const readJSONStream = async (stream) => {
const values = [];
try {
await pipeline(
stream,
new Parser({ jsonStreaming: true }),
new StreamValues(),
async function (source) {
for await (const value of source) {
values.push(value.value);
}
export const createInputStream = (stream) =>
compose(
stream,
new Parser({
checkErrors: true,
}),
async function* (source) {
for await (const value of source) {
yield parseInput(value.value);
}
);
} catch (err) {
throw new InvalidInputError(
"Failed to parse JSON stream, please check your input",
{ cause: err }
);
}
return values;
};
}
);

/**
 * Creates an async-generator transform that groups items from a source
 * iterable into arrays ("batches") of at most `batchSize` elements.
 * Suitable for use with `node:stream` composition (see the tokenization
 * command, which composes it after the input stream).
 *
 * @param {{ batchSize: number }} options - `batchSize` must be a positive integer.
 * @returns {(source: Iterable<any> | AsyncIterable<any>) => AsyncGenerator<any[]>}
 *   Transform function yielding full batches, then one final partial batch.
 * @throws {Error} If `batchSize` is not a positive integer.
 */
export function createBatchTransform({ batchSize }) {
  // Reject NaN and fractional sizes too: the bare `batchSize < 1` check is
  // false for NaN, which would silently accumulate everything into a single
  // unbounded batch, and fractional sizes would round up to odd batch counts.
  if (!Number.isInteger(batchSize) || batchSize < 1)
    throw new Error("Batch size must be a positive integer");

  return async function* (source) {
    let batch = [];
    for await (const item of source) {
      // Flush the batch before adding the next item so every yielded batch
      // holds exactly `batchSize` items (except possibly the last one).
      if (batch.length >= batchSize) {
        yield batch;
        batch = [];
      }
      batch.push(item);
    }
    // Emit the trailing partial batch, if any; an empty source yields nothing.
    if (batch.length > 0) yield batch;
  };
}

0 comments on commit 16ec0ca

Please sign in to comment.