Skip to content

Commit

Permalink
fix: update grpc.max_metadata_size to 4MiB for exactly-once, and shif…
Browse files Browse the repository at this point in the history
…t ack/modack errors to 'debug' stream channel (#1505)

* fix: update grpc.max_metadata_size to 4MiB for exactly-once

* fix: change ack and modAck errors to be optional debug warnings

* tests: update testing to match earlier changes

* fix: check against undefined, not falsey
  • Loading branch information
feywind authored Apr 6, 2022
1 parent 18b7e5d commit abd10cc
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 8 deletions.
5 changes: 4 additions & 1 deletion src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ export abstract class MessageQueue {
try {
await this._sendBatch(batch);
} catch (e) {
this._subscriber.emit('error', e);
// These queues are used for ack and modAck messages, which should
// never surface an error to the user level. However, we'll emit
// them onto this debug channel in case debug info is needed.
this._subscriber.emit('debug', e);
}

this.numInFlightRequests -= batchSize;
Expand Down
12 changes: 11 additions & 1 deletion src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,17 @@ export class PubSub {
private schemaClient?: SchemaServiceClient;

constructor(options?: ClientConfig) {
options = options || {};
options = Object.assign({}, options || {});

// Needed for potentially large responses that may come from using exactly-once delivery.
// This will get passed down to grpc client objects.
const maxMetadataSize = 'grpc.max_metadata_size';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const optionsAny = options as any;
if (optionsAny[maxMetadataSize] === undefined) {
optionsAny[maxMetadataSize] = 4 * 1024 * 1024; // 4 MiB
}

// Determine what scopes are needed.
// It is the union of the scopes on both clients.
const clientClasses = [v1.SubscriberClient, v1.PublisherClient];
Expand Down
1 change: 1 addition & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ export class Subscriber extends EventEmitter {

this._stream
.on('error', err => this.emit('error', err))
.on('debug', err => this.emit('debug', err))
.on('data', (data: PullResponse) => this._onData(data))
.once('close', () => this.close());

Expand Down
8 changes: 8 additions & 0 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export type DetachSubscriptionResponse = EmptyResponse;
listener: (error: StatusError) => void
): this;
on(event: 'close', listener: () => void): this;
on(event: 'debug', listener: (error: StatusError) => void); this;
// Only used internally.
on(event: 'newListener', listener: Function): this;
Expand Down Expand Up @@ -154,6 +155,9 @@ export type DetachSubscriptionResponse = EmptyResponse;
* Upon receipt of an error:
* on(event: 'error', listener: (error: Error) => void): this;
*
* Upon receipt of a (non-fatal) debug warning:
* on(event: 'debug', listener: (error: Error) => void): this;
*
* Upon the closing of the subscriber:
* on(event: 'close', listener: Function): this;
*
Expand Down Expand Up @@ -220,6 +224,9 @@ export type DetachSubscriptionResponse = EmptyResponse;
* // Register an error handler.
* subscription.on('error', (err) => {});
*
* // Register a debug handler, to catch non-fatal errors.
* subscription.on('debug', (err) => { console.error(err); });
*
* // Register a close handler in case the subscriber closes unexpectedly
* subscription.on('close', () => {});
*
Expand Down Expand Up @@ -318,6 +325,7 @@ export class Subscription extends EventEmitter {
this._subscriber = new Subscriber(this, options);
this._subscriber
.on('error', err => this.emit('error', err))
.on('debug', err => this.emit('debug', err))
.on('message', message => this.emit('message', message))
.on('close', () => this.emit('close'));

Expand Down
12 changes: 6 additions & 6 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ describe('MessageQueues', () => {
assert.deepStrictEqual(batch, expectedBatch);
});

it('should emit any errors', done => {
it('should emit any errors as debug events', done => {
const fakeError = new Error('err');

sandbox.stub(messageQueue.batches, 'push').throws(fakeError);

subscriber.on('error', err => {
subscriber.on('debug', err => {
assert.strictEqual(err, fakeError);
done();
});
Expand Down Expand Up @@ -362,7 +362,7 @@ describe('MessageQueues', () => {
assert.strictEqual(callOptions, fakeCallOptions);
});

it('should throw a BatchError if unable to ack', done => {
it('should throw a BatchError on "debug" if unable to ack', done => {
const messages = [
new FakeMessage(),
new FakeMessage(),
Expand All @@ -380,7 +380,7 @@ describe('MessageQueues', () => {

sandbox.stub(subscriber.client, 'acknowledge').rejects(fakeError);

subscriber.on('error', (err: BatchError) => {
subscriber.on('debug', (err: BatchError) => {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
Expand Down Expand Up @@ -487,7 +487,7 @@ describe('MessageQueues', () => {
assert.strictEqual(callOptions, fakeCallOptions);
});

it('should throw a BatchError if unable to modAck', done => {
it('should throw a BatchError on "debug" if unable to modAck', done => {
const messages = [
new FakeMessage(),
new FakeMessage(),
Expand All @@ -505,7 +505,7 @@ describe('MessageQueues', () => {

sandbox.stub(subscriber.client, 'modifyAckDeadline').rejects(fakeError);

subscriber.on('error', (err: BatchError) => {
subscriber.on('debug', (err: BatchError) => {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
Expand Down
16 changes: 16 additions & 0 deletions test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,12 @@ describe('PubSub', () => {
});

describe('instantiation', () => {
const maxMetadataSizeKey = 'grpc.max_metadata_size';
const DEFAULT_OPTIONS = {
libName: 'gccl',
libVersion: PKG.version,
scopes: [],
[maxMetadataSizeKey]: 4 * 1024 * 1024,
};

it('should extend the correct methods', () => {
Expand All @@ -220,6 +222,20 @@ describe('PubSub', () => {
assert(new PubSub() instanceof PubSub);
});

it('should augment the gRPC options for metadata size', () => {
let pubsub = new PubSub();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let optionsAny: any = pubsub.options;
assert.strictEqual(optionsAny[maxMetadataSizeKey], 4 * 1024 * 1024);

optionsAny = {
[maxMetadataSizeKey]: 1 * 1024 * 1024,
};
pubsub = new PubSub(optionsAny);
optionsAny = pubsub.options;
assert.strictEqual(optionsAny[maxMetadataSizeKey], 1 * 1024 * 1024);
});

it('should combine all required scopes', () => {
v1ClientOverrides.SubscriberClient = {};
v1ClientOverrides.SubscriberClient.scopes = ['a', 'b', 'c'];
Expand Down
19 changes: 19 additions & 0 deletions test/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,25 @@ describe('Subscription', () => {
});
});

describe('debug', () => {
const error = new Error('err') as ServiceError;

beforeEach(() => {
subscription.request = (config, callback) => {
callback(error);
};
});

it('should return the debug events to the callback', done => {
subscription.on('debug', err => {
assert.strictEqual(err, error);
done();
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(subscription as any)._subscriber.emit('debug', error);
});
});

describe('delete', () => {
beforeEach(() => {
sandbox.stub(subscription, 'removeAllListeners').yields(util.noop);
Expand Down

0 comments on commit abd10cc

Please sign in to comment.