Skip to content

Commit

Permalink
feat: setup list pages generator
Browse files Browse the repository at this point in the history
  • Loading branch information
DaniAkash committed Feb 27, 2024
1 parent e21d453 commit b2aae46
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 35 deletions.
68 changes: 67 additions & 1 deletion src/client/auth/stub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import { V2Client } from "clarifai-nodejs-grpc/proto/clarifai/api/service_grpc_p
import { ClarifaiAuthHelper } from "./helper";
import { StatusCode } from "clarifai-nodejs-grpc/proto/clarifai/api/status/status_code_pb";
import { V2Stub } from "./register";
import { grpc } from "clarifai-nodejs-grpc";
import * as grpc from "@grpc/grpc-js";
import * as jspb from "google-protobuf";
import { Status } from "clarifai-nodejs-grpc/proto/clarifai/api/status/status_pb";

const throttleStatusCodes = new Set([
StatusCode.CONN_THROTTLED,
Expand Down Expand Up @@ -42,6 +44,7 @@ type CallbackResponseType<T> = T extends (
export class AuthorizedStub {
private authHelper: ClarifaiAuthHelper;
private stub: V2Client;
public client: V2Client;
private metadata: [string, string][];

constructor(authHelper?: ClarifaiAuthHelper) {
Expand All @@ -52,6 +55,7 @@ export class AuthorizedStub {
}

this.stub = this.authHelper.getStub();
this.client = this.stub;
this.metadata = this.authHelper.metadata;
}

Expand Down Expand Up @@ -88,6 +92,29 @@ export class AuthorizedStub {
});
});
}

async makeCallPromise<
TRequest extends jspb.Message,
TResponseObject extends { status?: Status.AsObject },
TResponse extends {
toObject: (arg?: boolean) => TResponseObject;
},
>(
endpoint: (
request: TRequest,
metadata: grpc.Metadata,
options: Partial<grpc.CallOptions>,
) => Promise<TResponse>,
requestData: TRequest,
): Promise<TResponse> {
const metadata = new grpc.Metadata();
const authMetadata = this.metadata;
authMetadata.forEach((meta) => {
metadata.set(meta?.[0], meta?.[1]);
});

return await endpoint(requestData, metadata, {});
}
}

export class RetryStub extends AuthorizedStub {
Expand Down Expand Up @@ -134,6 +161,45 @@ export class RetryStub extends AuthorizedStub {
}
throw new Error("Max retry attempts reached");
}

async makeCallPromise<
TRequest extends jspb.Message,
TResponseObject extends { status?: Status.AsObject },
TResponse extends {
toObject: (arg?: boolean) => TResponseObject;
},
>(
endpoint: (
request: TRequest,
metadata: grpc.Metadata,
options: Partial<grpc.CallOptions>,
) => Promise<TResponse>,
requestData: TRequest,
): Promise<TResponse> {
for (let attempt = 1; attempt <= this.maxAttempts; attempt++) {
try {
const response = await super.makeCallPromise(endpoint, requestData);
return response;
} catch (err) {
const errorCode = (err as ServiceError).code;
if (
retryCodesGrpc.has(errorCode) ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(err as any).status?.code in throttleStatusCodes
) {
console.log(`Attempt ${attempt} failed, retrying...`);
if (attempt < this.maxAttempts) {
await new Promise((resolve) =>
setTimeout(resolve, this.backoffTime * 1000),
);
continue;
}
}
throw err;
}
}
throw new Error("Max retry attempts reached");
}
}

/**
Expand Down
40 changes: 21 additions & 19 deletions src/client/base.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { UserAppIDSet } from "clarifai-nodejs-grpc/proto/clarifai/api/resources_pb";
import { ClarifaiAuthHelper } from "./auth/helper";
import { getFromDictOrEnv } from "../utils/misc";
import { FirstParameterType, createStub } from "./auth/stub";
import { createStub } from "./auth/stub";
import { V2Stub } from "./auth/register";
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
import { V2Client } from "clarifai-nodejs-grpc/proto/clarifai/api/service_grpc_pb";
import { KWArgs } from "../utils/types";
import * as jspb from "google-protobuf";
import * as grpc from "@grpc/grpc-js";
import { Status } from "clarifai-nodejs-grpc/proto/clarifai/api/status/status_pb";

/**
* BaseClient is the base class for all the classes interacting with Clarifai endpoints.
Expand Down Expand Up @@ -37,18 +40,7 @@ export class BaseClient {
* @param {string} [kwargs.base='https://api.clarifai.com'] The base URL for the API endpoint. Defaults to 'https://api.clarifai.com'.
* @param {string} [kwargs.ui='https://clarifai.com'] The URL for the UI. Defaults to 'https://clarifai.com'.
*/
constructor(
kwargs:
| {
userId: string;
appId: string;
pat: string;
token?: string;
base?: string;
ui?: string;
}
| Record<string, never> = {},
) {
constructor(kwargs: KWArgs = {}) {
const pat = getFromDictOrEnv("pat", "CLARIFAI_PAT", kwargs);
kwargs.pat = pat;
this.authHelper =
Expand Down Expand Up @@ -77,11 +69,21 @@ export class BaseClient {
* @param argument The argument to pass to the gRPC method.
* @returns A Promise resolving to the result of the gRPC method call.
*/
protected async grpcRequest<MethodName extends keyof V2Client>(
method: MethodName,
argument: FirstParameterType<V2Client[MethodName]>,
) {
await this.STUB.makeCall(method, argument);
protected async grpcRequest<
TRequest extends jspb.Message,
TResponseObject extends { status?: Status.AsObject },
TResponse extends {
toObject: (arg?: boolean) => TResponseObject;
},
>(
endpoint: (
request: TRequest,
metadata: grpc.Metadata,
options: Partial<grpc.CallOptions>,
) => Promise<TResponse>,
requestData: TRequest,
): Promise<TResponse> {
return await this.STUB.makeCallPromise(endpoint, requestData);
}

/**
Expand Down
87 changes: 72 additions & 15 deletions src/client/lister.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,86 @@
import * as grpc from "@grpc/grpc-js";
import * as jspb from "google-protobuf";
import { KWArgs } from "../utils/types";
import { BaseClient } from "./base";
import { StatusCode } from "clarifai-nodejs-grpc/proto/clarifai/api/status/status_code_pb";
import { Status } from "clarifai-nodejs-grpc/proto/clarifai/api/status/status_pb";

export class Lister extends BaseClient {
defaultPageSize: number;

constructor(
kwargs:
| {
userId: string;
appId: string;
pat: string;
token?: string;
base?: string;
ui?: string;
}
| Record<string, never> = {},
pageSize: number = 16,
) {
constructor({
kwargs = {},
pageSize = 16,
}: {
kwargs?: KWArgs;
pageSize?: number;
}) {
super(kwargs);
this.defaultPageSize = pageSize;
}

/**
* TODO: Implement the actual pagination logic
*/
listPagesGenerator() {
throw new Error("Not implemented");
async *listPagesGenerator<
TRequest extends jspb.Message,
TResponseObject extends { status?: Status.AsObject },
TResponse extends {
toObject: (arg?: boolean) => TResponseObject;
},
>(
endpoint: (
request: TRequest,
metadata: grpc.Metadata,
options: Partial<grpc.CallOptions>,
) => Promise<TResponse>,
requestData: TRequest,
pageNo: number = 1,
perPage: number = this.defaultPageSize,
) {
// Initial setup
let page = pageNo;

while (true) {
// Prepare request data
// @ts-expect-error - TS doesn't know that the method exists
requestData["page"] = page;
if (perPage) {
// @ts-expect-error - TS doesn't know that the method exists
requestData["per_page"] = perPage;
}

// Perform gRPC request
const response = await this.grpcRequest(endpoint, requestData);

// Check response status
if (response.toObject().status?.code !== StatusCode.SUCCESS) {
throw new Error(`Listing failed with response ${response}`);
}

// Process and yield response items
if (Object.keys(response).length === 1) {
break;
} else {
yield response;
// const listingResource = Object.keys(response)[1];
// for (const item of response[listingResource]) {
// if (listingResource === "dataset_inputs") {
// yield this.processResponseKeys(
// item["input"],
// listingResource.slice(0, -1),
// );
// } else {
// yield this.processResponseKeys(item, listingResource.slice(0, -1));
// }
// }
}

// Exit loop if pagination is not to be continued
if (pageNo !== undefined || perPage !== undefined) {
break;
}
page += 1;
}
}
}
64 changes: 64 additions & 0 deletions src/client/user.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// import { StatusCode } from "clarifai-nodejs-grpc/proto/clarifai/api/status/status_code_pb";
import { Lister } from "./lister";
import { KWArgs } from "../utils/types";
import { ListAppsRequest } from "clarifai-nodejs-grpc/proto/clarifai/api/service_pb";
import { mapParamsToRequest, promisifyGrpcCall } from "../utils/misc";

// interface UserAppID {
// userId?: string;
// appId?: string;
// }

// interface AppInfo {
// // Define properties based on the Python code's usage
// }

// interface RunnerInfo {
// // Define properties based on the Python code's usage
// }

export class User extends Lister {
// private logger: Logger;

constructor(kwargs: KWArgs = {}) {
super({ kwargs });
// this.logger = getLogger("INFO", __filename);
}

// Convert generator functions to async functions returning Promises of arrays
async *listApps({
params = {},
pageNo,
perPage,
}: {
params?:
| Omit<ListAppsRequest.AsObject, "userAppId">
| Record<string, never>;
pageNo?: number;
perPage?: number;
}) {
const listApps = promisifyGrpcCall(this.STUB.client.listApps);
const request = new ListAppsRequest();
mapParamsToRequest(params, request);

yield this.listPagesGenerator(listApps, request, pageNo, perPage);
}

// async listRunners(
// filterBy: Record<string, any> = {},
// pageNo?: number,
// perPage?: number,
// ) {}

// async createApp(
// appId: string,
// baseWorkflow: string = "Empty",
// kwargs: Record<string, any> = {},
// ) {}

// TypeScript does not have a direct equivalent to Python's __getattr__, so this functionality may need to be implemented differently if required.

toString() {
// Implementation of a method to return user details as a string
}
}
43 changes: 43 additions & 0 deletions src/utils/misc.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { UserError } from "../errors";
import { GrpcWithCallback } from "./types";
import * as grpc from "@grpc/grpc-js";

/**
* Get a value from a dictionary or an environment variable.
Expand Down Expand Up @@ -27,3 +29,44 @@ function getFromEnv(key: string, envKey: string): string {
);
}
}

export function mapParamsToRequest<T>(
params: Record<string, unknown>,
request: T,
) {
Object.entries(params).forEach(([key, value]) => {
// Assuming direct mapping for simplicity
const methodName = `set${key.charAt(0).toUpperCase()}${key.slice(1)}`;
// @ts-expect-error - TS doesn't know that the method exists
if (typeof request[methodName] === "function") {
// @ts-expect-error - TS doesn't know that the method exists
request[methodName](value);
} else {
// Log or handle the absence of a setter method
console.warn(`Method ${methodName} does not exist on ListAppsRequest`);
}
});
}

export function promisifyGrpcCall<TRequest, TResponse>(
func: GrpcWithCallback<TRequest, TResponse>,
): (
request: TRequest,
metadata: grpc.Metadata,
options: Partial<grpc.CallOptions>,
) => Promise<TResponse> {
return (
request: TRequest,
metadata: grpc.Metadata,
options: Partial<grpc.CallOptions>,
): Promise<TResponse> => {
return new Promise((resolve, reject) => {
func(request, metadata, options, (error, response) => {
if (error) {
return reject(error);
}
resolve(response);
});
});
};
}
Loading

0 comments on commit b2aae46

Please sign in to comment.