Skip to content

Commit

Permalink
feat: update defaults for batch settings also, and update which resul…
Browse files Browse the repository at this point in the history
…t codes will cause a retry (#877)

* fix(defaults): update batch setting defaults to match other language libraries

* fix(retries): update codes we'll retry on, and add a special check for server shutdown

* fix: clear up a few member names, and fix some issues from unit tests

* chore: run lint fixer
  • Loading branch information
feywind authored Feb 3, 2020
1 parent c91c92f commit 32ae411
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 34 deletions.
35 changes: 25 additions & 10 deletions src/default-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,32 @@
// These options will be used library-wide. They're specified here so that
// they can be changed easily in the future.
export const defaultOptions = {
// The maximum number of messages that may be queued for sending.
maxOutstandingMessages: 1000,
subscription: {
// The maximum number of messages that may be queued for receiving,
// with the default lease manager.
maxOutstandingMessages: 1000,

// The maximum amount of message data that may be queued for sending,
// in bytes.
maxOutstandingBytes: 100 * 1024 * 1024,
// The maximum amount of message data that may be queued for receiving,
// in bytes, with the default lease manager.
maxOutstandingBytes: 100 * 1024 * 1024,

// The maximum number of minutes that a message's lease will ever
// be extended.
maxExtensionMinutes: 60,
// The maximum number of minutes that a message's lease will ever
// be extended.
maxExtensionMinutes: 60,

// The maximum number of streams/threads that will ever be opened.
maxStreams: 5,
// The maximum number of subscription streams/threads that will ever
// be opened.
maxStreams: 5,
},

publish: {
// The maximum number of messages we'll batch up for publish().
maxOutstandingMessages: 100,

// The maximum size of the total batched up messages for publish().
maxOutstandingBytes: 1 * 1024 * 1024,

// The maximum time we'll wait to send batched messages, in milliseconds.
maxDelayMillis: 10,
},
};
6 changes: 3 additions & 3 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ export class LeaseManager extends EventEmitter {
setOptions(options: FlowControlOptions): void {
const defaults: FlowControlOptions = {
allowExcessMessages: true,
maxBytes: defaultOptions.maxOutstandingBytes,
maxExtension: defaultOptions.maxExtensionMinutes,
maxMessages: defaultOptions.maxOutstandingMessages,
maxBytes: defaultOptions.subscription.maxOutstandingBytes,
maxExtension: defaultOptions.subscription.maxExtensionMinutes,
maxMessages: defaultOptions.subscription.maxOutstandingMessages,
};

this._options = Object.assign(defaults, options);
Expand Down
2 changes: 1 addition & 1 deletion src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const PULL_TIMEOUT = require('./v1/subscriber_client_config.json').interfaces[
*/
const DEFAULT_OPTIONS: MessageStreamOptions = {
highWaterMark: 0,
maxStreams: defaultOptions.maxStreams,
maxStreams: defaultOptions.subscription.maxStreams,
timeout: 300000,
};

Expand Down
7 changes: 4 additions & 3 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {Queue, OrderedQueue} from './message-queues';
import {Topic} from '../topic';
import {RequestCallback} from '../pubsub';
import {google} from '../../proto/pubsub';
import {defaultOptions} from '../default-options';

export type PubsubMessage = google.pubsub.v1.IPubsubMessage;

Expand Down Expand Up @@ -168,9 +169,9 @@ export class Publisher {
setOptions(options = {} as PublishOptions): void {
const defaults = {
batching: {
maxBytes: Math.pow(1024, 2) * 5,
maxMessages: 1000,
maxMilliseconds: 100,
maxBytes: defaultOptions.publish.maxOutstandingBytes,
maxMessages: defaultOptions.publish.maxOutstandingMessages,
maxMilliseconds: defaultOptions.publish.maxDelayMillis,
},
messageOrdering: false,
gaxOpts: {
Expand Down
12 changes: 8 additions & 4 deletions src/pull-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@ import {StatusObject, status} from '@grpc/grpc-js';
* retryable status codes
*/
export const RETRY_CODES: status[] = [
status.OK,
status.CANCELLED,
status.UNKNOWN,
status.DEADLINE_EXCEEDED,
status.RESOURCE_EXHAUSTED,
status.ABORTED,
status.INTERNAL,
status.UNAVAILABLE,
status.DATA_LOSS,
];

/**
Expand Down Expand Up @@ -73,6 +69,14 @@ export class PullRetry {
this.failures += 1;
}

if (
err.code === status.UNAVAILABLE &&
err.details &&
err.details.match(/Server shutdownNow invoked/)
) {
return true;
}

return RETRY_CODES.includes(err.code);
}
}
6 changes: 4 additions & 2 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,16 @@ export class Subscriber extends EventEmitter {
// 1 message at a time.
if (options.flowControl) {
const {
maxMessages = defaultOptions.maxOutstandingMessages,
maxMessages = defaultOptions.subscription.maxOutstandingMessages,
} = options.flowControl;

if (!options.streamingOptions) {
options.streamingOptions = {} as MessageStreamOptions;
}

const {maxStreams = defaultOptions.maxStreams} = options.streamingOptions;
const {
maxStreams = defaultOptions.subscription.maxStreams,
} = options.streamingOptions;
options.streamingOptions.maxStreams = Math.min(maxStreams, maxMessages);
}
}
Expand Down
8 changes: 6 additions & 2 deletions test/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,17 @@ describe('LeaseManager', () => {
assert.strictEqual(leaseManager.isFull(), false);

leaseManager.remove(littleMessage);
bigMessage.length = defaultOptions.maxOutstandingBytes * 2;
bigMessage.length = defaultOptions.subscription.maxOutstandingBytes * 2;
leaseManager.add(bigMessage as Message);
assert.strictEqual(leaseManager.isFull(), true);
});

it('should cap maxMessages', () => {
for (let i = 0; i < defaultOptions.maxOutstandingMessages; i++) {
for (
let i = 0;
i < defaultOptions.subscription.maxOutstandingMessages;
i++
) {
assert.strictEqual(leaseManager.isFull(), false);
leaseManager.add(new FakeMessage() as Message);
}
Expand Down
7 changes: 5 additions & 2 deletions test/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ describe('MessageStream', () => {
});

it('should default maxStreams', () => {
assert.strictEqual(client.streams.length, defaultOptions.maxStreams);
assert.strictEqual(
client.streams.length,
defaultOptions.subscription.maxStreams
);
});

it('should pull pullTimeouts default from config file', () => {
Expand Down Expand Up @@ -252,7 +255,7 @@ describe('MessageStream', () => {
setImmediate(() => {
assert.strictEqual(
client.streams.length,
defaultOptions.maxStreams
defaultOptions.subscription.maxStreams
);
client.streams.forEach(stream => {
assert.strictEqual(
Expand Down
8 changes: 5 additions & 3 deletions test/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import * as p from '../../src/publisher';
import * as q from '../../src/publisher/message-queues';
import {PublishError} from '../../src/publisher/publish-error';

import {defaultOptions} from '../../src/default-options';

let promisified = false;
const fakePromisify = Object.assign({}, pfy, {
promisifyAll: (ctor: Function, options: pfy.PromisifyAllOptions) => {
Expand Down Expand Up @@ -262,9 +264,9 @@ describe('Publisher', () => {

assert.deepStrictEqual(publisher.settings, {
batching: {
maxBytes: Math.pow(1024, 2) * 5,
maxMessages: 1000,
maxMilliseconds: 100,
maxBytes: defaultOptions.publish.maxOutstandingBytes,
maxMessages: defaultOptions.publish.maxOutstandingMessages,
maxMilliseconds: defaultOptions.publish.maxDelayMillis,
},
messageOrdering: false,
gaxOpts: {
Expand Down
10 changes: 6 additions & 4 deletions test/pull-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,21 @@ describe('PullRetry', () => {
describe('retry', () => {
it('should return true for retryable errors', () => {
[
status.OK,
status.CANCELLED,
status.UNKNOWN,
status.DEADLINE_EXCEEDED,
status.RESOURCE_EXHAUSTED,
status.ABORTED,
status.INTERNAL,
status.UNAVAILABLE,
status.DATA_LOSS,
].forEach((code: status) => {
const shouldRetry = retrier.retry({code} as StatusObject);
assert.strictEqual(shouldRetry, true);
});

const serverShutdown = retrier.retry({
code: status.UNAVAILABLE,
details: 'Server shutdownNow invoked',
} as StatusObject);
assert.strictEqual(serverShutdown, true);
});

it('should return false for non-retryable errors', () => {
Expand Down

0 comments on commit 32ae411

Please sign in to comment.