nice-grpc

A Node.js gRPC library that is nice to you. Built on top of grpc-js.

Features

  • Written in TypeScript for TypeScript.
  • Modern API that uses Promises and Async Iterables for streaming.
  • Easy cancellation propagation with AbortSignal.
  • Client and server middleware support via concise API that uses Async Generators.

Prerequisites

Supports Node.js 14+. A global AbortController is required. It is available in Node.js since 15.0.0, while Node.js 14.17+ requires the --experimental-abortcontroller flag. A polyfill is available for older Node.js versions.
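
A startup check along the following lines can surface a missing AbortController early instead of failing on the first call. This is a minimal sketch: both function names are hypothetical and not part of nice-grpc.

```typescript
// Hypothetical helpers, not part of nice-grpc.

// Returns true when the runtime exposes a global AbortController.
function hasGlobalAbortController(): boolean {
  return 'AbortController' in globalThis;
}

// Throws a descriptive error when the global is missing.
function assertAbortControllerAvailable(): void {
  if (!hasGlobalAbortController()) {
    throw new Error(
      'Global AbortController is missing: upgrade Node.js, pass ' +
        '--experimental-abortcontroller, or install a polyfill',
    );
  }
}
```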

Installation

npm install nice-grpc

Usage

Compiling Protobuf files

The recommended way is to use ts-proto.

Using ts-proto

Install necessary tools:

npm install protobufjs long
npm install --save-dev grpc-tools ts-proto

Use ts-proto version 1.112.0 or newer.

Given a Protobuf file ./proto/example.proto, generate TypeScript code into directory ./compiled_proto:

./node_modules/.bin/grpc_tools_node_protoc \
  --plugin=protoc-gen-ts_proto=./node_modules/.bin/protoc-gen-ts_proto \
  --ts_proto_out=./compiled_proto \
  --ts_proto_opt=outputServices=nice-grpc,outputServices=generic-definitions,useExactTypes=false \
  --proto_path=./proto \
  ./proto/example.proto

You can omit the --plugin flag if you invoke this command via npm script.

Using google-protobuf

Install necessary tools:

npm install google-protobuf
npm install --save-dev grpc-tools grpc_tools_node_protoc_ts @types/google-protobuf

Given a Protobuf file ./proto/example.proto, generate JS code and TypeScript definitions into directory ./compiled_proto:

./node_modules/.bin/grpc_tools_node_protoc \
  --plugin=protoc-gen-grpc=./node_modules/.bin/grpc_tools_node_protoc_plugin \
  --plugin=protoc-gen-ts=./node_modules/.bin/protoc-gen-ts \
  --js_out=import_style=commonjs,binary:./compiled_proto \
  --ts_out=grpc_js:./compiled_proto \
  --grpc_out=grpc_js:./compiled_proto \
  --proto_path=./proto \
  ./proto/example.proto

Server

Consider the following Protobuf definition:

syntax = "proto3";

package nice_grpc.example;

service ExampleService {
  rpc ExampleUnaryMethod(ExampleRequest) returns (ExampleResponse) {};
}

message ExampleRequest {
  // ...
}
message ExampleResponse {
  // ...
}

After compiling the Protobuf file, we can write the service implementation:

When compiling Protobufs using ts-proto:

import {
  ExampleServiceImplementation,
  ExampleRequest,
  ExampleResponse,
  DeepPartial,
} from './compiled_proto/example';

const exampleServiceImpl: ExampleServiceImplementation = {
  async exampleUnaryMethod(
    request: ExampleRequest,
  ): Promise<DeepPartial<ExampleResponse>> {
    // ... method logic

    return response;
  },
};

Alternatively, you can use classes:

class ExampleServiceImpl implements ExampleServiceImplementation {
  async exampleUnaryMethod(
    request: ExampleRequest,
  ): Promise<DeepPartial<ExampleResponse>> {
    // ... method logic

    return response;
  }
}

With ts-proto, response is automatically wrapped with fromPartial.

When compiling Protobufs using google-protobuf:

import {ServiceImplementation} from 'nice-grpc';
import {ExampleRequest, ExampleResponse} from './compiled_proto/example_pb';
import {IExampleService} from './compiled_proto/example_grpc_pb';

const exampleServiceImpl: ServiceImplementation<IExampleService> = {
  async exampleUnaryMethod(request: ExampleRequest): Promise<ExampleResponse> {
    // ... method logic

    return response;
  },
};

Further examples use ts-proto.

Now we can create and start a server that exposes our service:

import {createServer} from 'nice-grpc';
import {ExampleServiceDefinition} from './compiled_proto/example';

const server = createServer();

server.add(ExampleServiceDefinition, exampleServiceImpl);

await server.listen('0.0.0.0:8080');

When the server needs to stop, shut it down gracefully:

await server.shutdown();

Call context

Each service implementation method receives a CallContext as its second argument. It has the following type:

type CallContext = {
  /**
   * Request metadata from client.
   */
  metadata: Metadata;
  /**
   * Client address.
   */
  peer: string;
  /**
   * Response header. Sent with the first response, or when `sendHeader` is
   * called.
   */
  header: Metadata;
  /**
   * Manually send response header.
   */
  sendHeader(): void;
  /**
   * Response trailer. Sent when server method returns or throws.
   */
  trailer: Metadata;
  /**
   * Signal that is aborted once the call gets cancelled.
   */
  signal: AbortSignal;
};

Call context may be augmented by Middleware.

Errors

To report an error to a client, use ServerError.

Any thrown error other than ServerError results in the client receiving an error with status code UNKNOWN. Use server middleware for custom handling of uncaught errors.

See gRPC docs for the correct usage of status codes.

import {ServerError, Status} from 'nice-grpc';

const exampleServiceImpl: ExampleServiceImplementation = {
  async exampleUnaryMethod(
    request: ExampleRequest,
  ): Promise<DeepPartial<ExampleResponse>> {
    // ... method logic

    throw new ServerError(Status.NOT_FOUND, 'Requested data does not exist');
  },
};

Metadata

A server receives client metadata along with the request, and can send response metadata in the header and trailer.

const exampleServiceImpl: ExampleServiceImplementation = {
  async exampleUnaryMethod(
    request: ExampleRequest,
    context: CallContext,
  ): Promise<DeepPartial<ExampleResponse>> {
    // read client metadata
    const someValue = context.metadata.get('some-key');

    // add metadata to header
    context.header.set('some-key', 'some-value');

    // ... method logic

    // add metadata to trailer
    context.trailer.set('some-key', 'some-value');

    return response;
  },
};

Cancelling calls

A server receives an AbortSignal that is aborted once the call is cancelled by the client. You can use it to cancel any inner requests.

import fetch from 'node-fetch';

const exampleServiceImpl: ExampleServiceImplementation = {
  async exampleUnaryMethod(
    request: ExampleRequest,
    context: CallContext,
  ): Promise<DeepPartial<ExampleResponse>> {
    const response = await fetch('http://example.com', {
      signal: context.signal,
    });

    // ...
  },
};
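
For an operation of your own to react to context.signal, it has to listen for the abort event. The sketch below shows one way to write an abortable delay by hand. Both abortableDelay and createAbortError are hypothetical helpers written for illustration, not the implementation used by any library mentioned here.

```typescript
// Hypothetical helper: builds an error named 'AbortError'.
function createAbortError(): Error {
  const error = new Error('The operation has been aborted');
  error.name = 'AbortError';
  return error;
}

// Hypothetical helper: resolves after `ms` milliseconds, or rejects as soon
// as `signal` is aborted, whichever happens first.
function abortableDelay(signal: AbortSignal, ms: number): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // The call may already be cancelled before we start waiting.
    if (signal.aborted) {
      reject(createAbortError());
      return;
    }

    const onAbort = () => {
      clearTimeout(timer); // do not keep the timer alive after cancellation
      reject(createAbortError());
    };

    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);

    signal.addEventListener('abort', onAbort, {once: true});
  });
}
```

Inside a service method, such a helper would be called as await abortableDelay(context.signal, 1000).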

Server streaming

Consider the following Protobuf definition:

service ExampleService {
  rpc ExampleStreamingMethod(ExampleRequest)
    returns (stream ExampleResponse) {};
}

The service implementation defines this method as an Async Generator:

import {delay} from 'abort-controller-x';

const exampleServiceImpl: ExampleServiceImplementation = {
  async *exampleStreamingMethod(
    request: ExampleRequest,
    context: CallContext,
  ): AsyncIterable<DeepPartial<ExampleResponse>> {
    for (let i = 0; i < 10; i++) {
      await delay(context.signal, 1000);

      yield response;
    }
  },
};

Example: IxJS

import {range} from 'ix/asynciterable';
import {withAbort, map} from 'ix/asynciterable/operators';

const exampleServiceImpl: ExampleServiceImplementation = {
  async *exampleStreamingMethod(
    request: ExampleRequest,
    context: CallContext,
  ): AsyncIterable<DeepPartial<ExampleResponse>> {
    yield* range(0, 10).pipe(
      withAbort(context.signal),
      map(() => response),
    );
  },
};

Example: Observables

import {Observable} from 'rxjs';
import {from} from 'ix/asynciterable';
import {withAbort} from 'ix/asynciterable/operators';

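// Placeholder: obtain the observable from your own code.
declare const observable: Observable<DeepPartial<ExampleResponse>>;

const exampleServiceImpl: ExampleServiceImplementation = {
  async *exampleStreamingMethod(
    request: ExampleRequest,
    context: CallContext,
  ): AsyncIterable<DeepPartial<ExampleResponse>> {
    // Convert the observable to an Async Iterable and stop on cancellation.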
    yield* from(observable).pipe(withAbort(context.signal));
  },
};

Client streaming

Given a client streaming method:

service ExampleService {
  rpc ExampleClientStreamingMethod(stream ExampleRequest)
    returns (ExampleResponse) {};
}

The service implementation method receives the request as an Async Iterable:

const exampleServiceImpl: ExampleServiceImplementation = {
  async exampleClientStreamingMethod(
    request: AsyncIterable<ExampleRequest>,
  ): Promise<DeepPartial<ExampleResponse>> {
    for await (const item of request) {
      // ...
    }

    return response;
  },
};
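
To make the consumption pattern concrete, here is a self-contained sketch that aggregates a request stream into a single response. The message shapes (value, sum, count) are hypothetical stand-ins for generated types, and the CallContext argument is left out for brevity.

```typescript
// Hypothetical message shapes; real ones come from the compiled Protobuf file.
type ExampleRequest = {value: number};
type ExampleResponse = {sum: number; count: number};

// Same shape as a client streaming implementation method, minus CallContext.
async function exampleClientStreamingMethod(
  request: AsyncIterable<ExampleRequest>,
): Promise<ExampleResponse> {
  let sum = 0;
  let count = 0;

  // Each iteration waits for the next message sent by the client.
  for await (const item of request) {
    sum += item.value;
    count += 1;
  }

  // The loop ends once the client finishes sending.
  return {sum, count};
}

// Stand-in for the stream of requests a client would send.
async function* exampleRequests(): AsyncIterable<ExampleRequest> {
  yield {value: 1};
  yield {value: 2};
  yield {value: 3};
}
```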

Middleware

Server middleware intercepts incoming calls, allowing you to:

  • Execute any logic before and after implementation methods
  • Look into request, request metadata and response
  • Interrupt a call before it reaches implementation by throwing a ServerError
  • Catch implementation errors and return friendly ServerErrors to a client
  • Augment call context
  • Modify response header and trailer metadata

Server middleware is defined as an Async Generator. The most basic no-op middleware looks like this:

import {ServerMiddlewareCall, CallContext} from 'nice-grpc';

async function* middleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response>,
  context: CallContext,
) {
  return yield* call.next(call.request, context);
}

For unary and client streaming methods, the call.next generator yields no items and returns a single response; for server streaming and bidirectional streaming methods, it yields each response and returns void. By doing return yield* we cover both cases. To handle these cases separately, we can write a middleware as follows:

async function* middleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response>,
  context: CallContext,
) {
  if (!call.responseStream) {
    const response = yield* call.next(call.request, context);

    return response;
  } else {
    for await (const response of call.next(call.request, context)) {
      yield response;
    }

    return;
  }
}
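
The behaviour of return yield* described above can be checked with plain async generators, independent of nice-grpc. In this sketch, unaryNext and streamingNext are hypothetical stand-ins for the two shapes of call.next, and drain is a helper written only for the demonstration.

```typescript
// Stand-in for call.next() of a unary method: yields nothing, returns a value.
async function* unaryNext(): AsyncGenerator<never, string, undefined> {
  return 'single response';
}

// Stand-in for call.next() of a streaming method: yields items, returns void.
async function* streamingNext(): AsyncGenerator<string, void, undefined> {
  yield 'first';
  yield 'second';
}

// Pass-through wrapper in the style of the no-op middleware: `yield*` forwards
// every yielded item, and `return` forwards the final return value.
async function* passThrough<Y, R>(
  next: AsyncGenerator<Y, R, undefined>,
): AsyncGenerator<Y, R, undefined> {
  return yield* next;
}

// Drains a generator, collecting both the yielded items and the return value.
async function drain<Y, R>(
  gen: AsyncGenerator<Y, R, undefined>,
): Promise<{yielded: Y[]; returned: R}> {
  const yielded: Y[] = [];
  while (true) {
    const result = await gen.next();
    if (result.done) {
      return {yielded, returned: result.value};
    }
    yielded.push(result.value);
  }
}
```

Draining passThrough(unaryNext()) gives no yielded items and the returned response, while draining passThrough(streamingNext()) gives both yielded items and an undefined return value.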

To attach middleware to a server, use the server.use method. Note that server.use returns a new server instance.

const server = createServer().use(middleware1).use(middleware2);

Middleware that is attached first is invoked first.

You can also attach middleware per-service:

const server = createServer().use(middlewareA);

server.with(middlewareB).add(Service1, service1Impl);
server.with(middlewareC).add(Service2, service2Impl);

In the above example, Service1 gets middlewareA and middlewareB, and Service2 gets middlewareA and middlewareC.
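
The invocation order can be pictured with a simplified model in which each middleware wraps the next one. This is only a sketch under simplifying assumptions: it uses plain async functions instead of Async Generators, and the Handler, Middleware, compose and named names are hypothetical, not nice-grpc APIs.

```typescript
// Simplified model: a handler maps a request to a response.
type Handler = (request: string) => Promise<string>;
type Middleware = (request: string, next: Handler) => Promise<string>;

// Composes middleware so that the first one in the list is the outermost,
// meaning it runs first on the way in and last on the way out.
function compose(middlewares: Middleware[], handler: Handler): Handler {
  return middlewares.reduceRight<Handler>(
    (next, middleware) => request => middleware(request, next),
    handler,
  );
}

const callLog: string[] = [];

// Creates a middleware that records when it runs.
const named =
  (name: string): Middleware =>
  async (request, next) => {
    callLog.push(`${name} before`);
    const response = await next(request);
    callLog.push(`${name} after`);
    return response;
  };

const composed = compose([named('A'), named('B')], async request => {
  callLog.push('implementation');
  return request;
});
```

Calling composed('x') records A before, B before, implementation, B after, A after, which mirrors the rule that middleware attached first is invoked first.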

Example: Logging

Log all calls:

import {
  CallContext,
  ServerError,
  ServerMiddlewareCall,
  Status,
} from 'nice-grpc';
import {isAbortError} from 'abort-controller-x';

async function* loggingMiddleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response>,
  context: CallContext,
) {
  const {path} = call.method;

  console.log('Server call', path, 'start');

  try {
    const result = yield* call.next(call.request, context);

    console.log('Server call', path, 'end: OK');

    return result;
  } catch (error) {
    if (error instanceof ServerError) {
      console.log(
        'Server call',
        path,
        `end: ${Status[error.code]}: ${error.details}`,
      );
    } else if (isAbortError(error)) {
      console.log('Server call', path, 'cancel');
    } else {
      console.log('Server call', path, `error: ${error?.stack}`);
    }

    throw error;
  }
}

Example: Error handling

Catch unknown errors and wrap them into ServerErrors with friendly messages:

import {
  CallContext,
  ServerError,
  ServerMiddlewareCall,
  Status,
} from 'nice-grpc';
import {isAbortError} from 'abort-controller-x';

async function* errorHandlingMiddleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response>,
  context: CallContext,
) {
  try {
    return yield* call.next(call.request, context);
  } catch (error: unknown) {
    if (error instanceof ServerError || isAbortError(error)) {
      throw error;
    }

    let details = 'Unknown server error occurred';

    if (process.env.NODE_ENV === 'development') {
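      // `error` is `unknown` here, so narrow it before reading `stack`
      details += `: ${error instanceof Error ? error.stack : String(error)}`;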
    }

    throw new ServerError(Status.UNKNOWN, details);
  }
}

Example: Authentication

Validate JSON Web Token (JWT) from request metadata and put its claims to CallContext:

import {
  CallContext,
  ServerError,
  ServerMiddlewareCall,
  Status,
} from 'nice-grpc';
import createRemoteJWKSet from 'jose/jwks/remote';
import jwtVerify, {JWTPayload} from 'jose/jwt/verify';
import {JOSEError} from 'jose/util/errors';

const jwks = createRemoteJWKSet(
  new URL('https://example.com/.well-known/jwks.json'),
);

type AuthCallContextExt = {
  auth: JWTPayload;
};

async function* authMiddleware<Request, Response>(
  call: ServerMiddlewareCall<Request, Response, AuthCallContextExt>,
  context: CallContext,
) {
  const authorization = context.metadata.get('Authorization');

  if (authorization == null) {
    throw new ServerError(
      Status.UNAUTHENTICATED,
      'Missing Authorization metadata',
    );
  }

  const parts = authorization.toString().split(' ');

  if (parts.length !== 2 || parts[0] !== 'Bearer') {
    throw new ServerError(
      Status.UNAUTHENTICATED,
      'Invalid Authorization metadata format. Expected "Bearer <token>"',
    );
  }

  const token = parts[1];

  const {payload} = await jwtVerify(token, jwks).catch(error => {
    if (error instanceof JOSEError) {
      throw new ServerError(Status.UNAUTHENTICATED, error.message);
    } else {
      throw error;
    }
  });

  return yield* call.next(call.request, {
    ...context,
    auth: payload,
  });
}

Service implementation can then access JWT claims via call context:

const exampleServiceImpl: ExampleServiceImplementation<AuthCallContextExt> = {
  async exampleUnaryMethod(
    request: ExampleRequest,
    context: CallContext & AuthCallContextExt,
  ): Promise<DeepPartial<ExampleResponse>> {
    const userId = context.auth.sub;

    // ...
  },
};

Client

Consider the following Protobuf definition:

syntax = "proto3";

package nice_grpc.example;

service ExampleService {
  rpc ExampleUnaryMethod(ExampleRequest) returns (ExampleResponse) {};
}

message ExampleRequest {
  // ...
}
message ExampleResponse {
  // ...
}

After compiling the Protobuf file, we can create the client:

When compiling Protobufs using ts-proto:

import {createChannel, createClient} from 'nice-grpc';
import {
  ExampleServiceClient,
  ExampleServiceDefinition,
} from './compiled_proto/example';

const channel = createChannel('localhost:8080');

const client: ExampleServiceClient = createClient(
  ExampleServiceDefinition,
  channel,
);

When compiling Protobufs using google-protobuf:

import {createChannel, createClient, Client} from 'nice-grpc';
import {
  ExampleService,
  IExampleService,
} from './compiled_proto/example_grpc_pb';

const channel = createChannel('localhost:8080');

const client: Client<IExampleService> = createClient(ExampleService, channel);

Further examples use ts-proto.

Call the method:

const response = await client.exampleUnaryMethod(request);

With ts-proto, request is automatically wrapped with fromPartial.

Once we're done with the client, close the channel:

channel.close();

Call options

Each client method accepts CallOptions as an optional second argument. It has the following type:

type CallOptions = {
  /**
   * Request metadata.
   */
  metadata?: Metadata;
  /**
   * Signal that cancels the call once aborted.
   */
  signal?: AbortSignal;
  /**
   * Called when header is received.
   */
  onHeader?(header: Metadata): void;
  /**
   * Called when trailer is received.
   */
  onTrailer?(trailer: Metadata): void;
};

Call options may be augmented by Middleware.

When creating a client, you may specify default call options per method, or for all methods. Defaults are rarely useful for the built-in options, but can be useful for options added by middleware.

const client = createClient(ExampleServiceDefinition, channel, {
  '*': {
    // applies for all methods
  },
  exampleUnaryMethod: {
    // applies for single method
  },
});

Channels

By default, a channel uses an insecure connection. The following are equivalent:

import {createChannel, ChannelCredentials} from 'nice-grpc';

createChannel('example.com:8080');
createChannel('http://example.com:8080');
createChannel('example.com:8080', ChannelCredentials.createInsecure());

To connect over TLS, use one of the following:

createChannel('https://example.com:8080');
createChannel('example.com:8080', ChannelCredentials.createSsl());

If the port is omitted, it defaults to 80 for insecure connections, and 443 for secure connections.
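
The defaulting rule can be illustrated with a small standalone function. This is only a sketch of the rule as described above, not the library's parsing code: describeAddress is a hypothetical name, and it ignores explicitly passed credentials and IPv6 literals.

```typescript
// Hypothetical illustration of the port defaulting rule; not nice-grpc code.
function describeAddress(address: string): {secure: boolean; port: number} {
  // Split off an optional http:// or https:// scheme.
  const schemeMatch = /^(https?):\/\/(.*)$/.exec(address);
  const secure = schemeMatch !== null && schemeMatch[1] === 'https';
  const hostAndPort = schemeMatch !== null ? schemeMatch[2] : address;

  // Use the explicit port when present, otherwise fall back to 80 or 443.
  const portMatch = /:(\d+)$/.exec(hostAndPort);
  const port = portMatch !== null ? Number(portMatch[1]) : secure ? 443 : 80;

  return {secure, port};
}
```

For example, describeAddress('example.com') yields port 80 and describeAddress('https://example.com') yields port 443, while an explicit port such as in 'example.com:8080' is kept as is.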

Metadata

A client can send request metadata and receive the response header and trailer:

import {Metadata} from 'nice-grpc';

const response = await client.exampleUnaryMethod(request, {
  metadata: Metadata({key: 'value'}),
  onHeader(header: Metadata) {
    // ...
  },
  onTrailer(trailer: Metadata) {
    // ...
  },
});

Errors

Client calls may throw gRPC errors represented as ClientError, which contains the status code and description.

import {ClientError, Status} from 'nice-grpc';
import {ExampleResponse} from './compiled_proto/example';

let response: ExampleResponse | null;

try {
  response = await client.exampleUnaryMethod(request);
} catch (error: unknown) {
  if (error instanceof ClientError && error.code === Status.NOT_FOUND) {
    response = null;
  } else {
    throw error;
  }
}

Cancelling calls

A client call can be cancelled using AbortSignal.

import {isAbortError} from 'abort-controller-x';

const abortController = new AbortController();

client
  .exampleUnaryMethod(request, {
    signal: abortController.signal,
  })
  .catch(error => {
    if (isAbortError(error)) {
      // aborted
    } else {
      throw error;
    }
  });

abortController.abort();
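
Because cancellation is driven by a plain AbortSignal, a client-side timeout can be built from an AbortController and a timer. The helper below is a minimal sketch. timeoutSignal is a hypothetical name and not part of nice-grpc.

```typescript
// Hypothetical helper: returns a signal that aborts after `ms` milliseconds,
// plus a `clear` function to cancel the timer once the call has finished.
function timeoutSignal(ms: number): {signal: AbortSignal; clear(): void} {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);

  return {
    signal: controller.signal,
    clear: () => clearTimeout(timer),
  };
}

// Intended usage with a client method (not executed here):
//
//   const {signal, clear} = timeoutSignal(5000);
//   try {
//     const response = await client.exampleUnaryMethod(request, {signal});
//   } finally {
//     clear();
//   }
```

Clearing the timer in a finally block keeps a finished call from holding the process open until the timeout fires.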

Server streaming

Consider the following Protobuf definition:

service ExampleService {
  rpc ExampleStreamingMethod(ExampleRequest)
    returns (stream ExampleResponse) {};
}

The client method returns an Async Iterable:

for await (const response of client.exampleStreamingMethod(request)) {
  // ...
}

Client streaming

Given a client streaming method:

service ExampleService {
  rpc ExampleClientStreamingMethod(stream ExampleRequest)
    returns (ExampleResponse) {};
}

The client method expects an Async Iterable as its first argument:

import {ExampleRequest, DeepPartial} from './compiled_proto/example';

async function* createRequest(): AsyncIterable<DeepPartial<ExampleRequest>> {
  for (let i = 0; i < 10; i++) {
    yield request;
  }
}

const response = await client.exampleClientStreamingMethod(createRequest());

Middleware

Client middleware intercepts outgoing calls, allowing you to:

  • Execute any logic before and after reaching server
  • Modify request metadata
  • Look into request, response and response metadata
  • Send call multiple times for retries or hedging
  • Augment call options type to have own configuration

Client middleware is defined as an Async Generator and is very similar to Server middleware. Key differences:

  • Middleware invocation order is reversed: middleware that is attached first is invoked last.
  • There's no such thing as CallContext for client middleware; instead, CallOptions are passed through the chain and can be accessed or altered by a middleware.

The most basic no-op middleware looks like this:

import {ClientMiddlewareCall, CallOptions} from 'nice-grpc';

async function* middleware<Request, Response>(
  call: ClientMiddlewareCall<Request, Response>,
  options: CallOptions,
) {
  return yield* call.next(call.request, options);
}

For unary and client streaming methods, the call.next generator yields no items and returns a single response; for server streaming and bidirectional streaming methods, it yields each response and returns void. By doing return yield* we cover both cases. To handle these cases separately, we can write a middleware as follows:

async function* middleware<Request, Response>(
  call: ClientMiddlewareCall<Request, Response>,
  options: CallOptions,
) {
  if (!call.responseStream) {
    const response = yield* call.next(call.request, options);

    return response;
  } else {
    for await (const response of call.next(call.request, options)) {
      yield response;
    }

    return;
  }
}

To create a client with middleware, use a client factory:

import {createClientFactory} from 'nice-grpc';

const client = createClientFactory()
  .use(middleware1)
  .use(middleware2)
  .create(ExampleServiceDefinition, channel);

Middleware that is attached first is invoked last.

You can reuse a single factory to create multiple clients:

const clientFactory = createClientFactory().use(middleware);

const client1 = clientFactory.create(Service1, channel1);
const client2 = clientFactory.create(Service2, channel2);

You can also attach middleware per-client:

const clientFactory = createClientFactory().use(middlewareA);

const client1 = clientFactory.use(middlewareB).create(Service1, channel1);
const client2 = clientFactory.use(middlewareC).create(Service2, channel2);

In the above example, Service1 client gets middlewareA and middlewareB, and Service2 client gets middlewareA and middlewareC.

Example: Logging

Log all calls:

import {
  ClientMiddlewareCall,
  CallOptions,
  ClientError,
  Status,
} from 'nice-grpc';
import {isAbortError} from 'abort-controller-x';

async function* loggingMiddleware<Request, Response>(
  call: ClientMiddlewareCall<Request, Response>,
  options: CallOptions,
) {
  const {path} = call.method;

  console.log('Client call', path, 'start');

  try {
    const result = yield* call.next(call.request, options);

    console.log('Client call', path, 'end: OK');

    return result;
  } catch (error) {
    if (error instanceof ClientError) {
      console.log(
        'Client call',
        path,
        `end: ${Status[error.code]}: ${error.details}`,
      );
    } else if (isAbortError(error)) {
      console.log('Client call', path, 'cancel');
    } else {
      console.log('Client call', path, `error: ${error?.stack}`);
    }

    throw error;
  }
}