Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add publisher side flow control #1359

Merged
merged 24 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c5cd342
feat: add publisher flow control library support
feywind Jul 7, 2021
e34d868
chore: exorcise promisifyAll() from the rest of the library
feywind Jul 22, 2021
2e4f725
feat: finish up working publisher side flow control in the library it…
feywind Jul 22, 2021
fe37eb2
feat: another pass at the flow control support in the library
feywind Jul 27, 2021
a6c8e90
chore: merge remote-tracking branch 'remotes/origin/master' into pub-…
feywind Jul 27, 2021
cb263ee
chore: water the linters
feywind Jul 27, 2021
e249ec6
tests: add a test for the new sample
feywind Jul 30, 2021
759e640
Merge branch 'master' into pub-flow-control
feywind Aug 10, 2021
49ffb8b
chore: merge remote-tracking branch 'remotes/origin/main' into pub-fl…
feywind Aug 30, 2021
86fdd53
fix: updates from code reviews and merges from other PRs
feywind Aug 31, 2021
1fcfa20
fix: shift from deferredCatch() to PublishWhenReadyOptions
feywind Aug 31, 2021
ef7bd57
tests: update unit tests
feywind Aug 31, 2021
c96bee4
build: pin typescript to 4.3.x for generated samples (4.4 breaks catc…
feywind Sep 1, 2021
b6e3afe
samples: add sample for publisher flow control, and a test for the sa…
feywind Sep 1, 2021
3ce3f63
🦉 Updates from OwlBot
gcf-owl-bot[bot] Sep 1, 2021
d39365e
docs: revert "🦉 Updates from OwlBot" because it's not in `main` yet
feywind Sep 1, 2021
be00d5e
fix: renamed a few flow control things in response to API feedback
feywind Sep 2, 2021
567c54b
tests: remove all messages from the test subscription after testing p…
feywind Sep 2, 2021
bfd297d
feat: redo publisher flow control API, once more with feeling
feywind Sep 15, 2021
c17aedb
lint: fix a nit
feywind Sep 15, 2021
f5832fa
samples: update publisher flow control sample, js flavour
feywind Sep 15, 2021
95da483
chore: merge remote-tracking branch 'remotes/origin/main' into pub-fl…
feywind Sep 15, 2021
700974c
chore: updates from CI results and self-review
feywind Sep 15, 2021
08a0784
chore: fixes from comments on the PR
feywind Sep 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions samples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"gts": "^3.1.0",
"mocha": "^8.0.0",
"rimraf": "^3.0.2",
"typescript": "~4.3.0",
"uuid": "^8.0.0"
}
}
87 changes: 87 additions & 0 deletions samples/publishWithFlowControl.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// This is a generated sample. Please see typescript/README.md for more info.

'use strict';

// sample-metadata:
// title: Publish with flow control
// description: Publishes to a topic using publisher-side flow control.
// usage: node publishWithFlowControl.js <topic-name>

// [START pubsub_publisher_flow_control]
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicName) {
// Create publisher options
const options = {
flowControlOptions: {
maxOutstandingMessages: 50,
maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
},
};

// Get a publisher.
const topic = pubSubClient.topic(topicName, options);

// For flow controlled publishing, we'll use a publisher flow controller
// instead of `topic.publish()`.
const flow = topic.flowControlled();

// Publish messages in a fast loop.
const testMessage = {data: Buffer.from('test!')};
for (let i = 0; i < 1000; i++) {
// You can also just `await` on `publish()` unconditionally, but if
// you want to avoid pausing to the event loop on each iteration,
// you can manually check the return value before doing so.
const wait = flow.publish(testMessage);
if (wait) {
await wait;
}
}

// Wait on any pending publish requests. Note that you can call `all()`
// earlier if you like, and it will return a Promise for all messages
// that have been sent to `flowController.publish()` so far.
const messageIds = await flow.all();
console.log(`Published ${messageIds.length} with flow control settings.`);
}
// [END pubsub_publisher_flow_control]

function main(topicName = 'YOUR_TOPIC_NAME') {
publishWithFlowControl(topicName).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
16 changes: 16 additions & 0 deletions samples/system-test/topics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ describe('topics', () => {
assert.strictEqual(receivedMessage.data.toString(), expectedMessage.data);
});

it('should publish with flow control', async () => {
const [subscription] = await pubsub
.topic(topicNameThree)
.subscription(subscriptionNameOne)
.get({autoCreate: true});
const output = execSync(
`${commandFor('publishWithFlowControl')} ${topicNameThree}`
);
const receivedMessage = await _pullOneMessage(subscription);
assert.strictEqual(receivedMessage.data.toString(), 'test!');
assert.ok(output.indexOf('Published 1000 with flow control settings') >= 0);
feywind marked this conversation as resolved.
Show resolved Hide resolved

// Junk any remaining published messages.
await subscription.seek(new Date(Date.now()));
feywind marked this conversation as resolved.
Show resolved Hide resolved
});

it('should publish a JSON message', async () => {
const [subscription] = await pubsub
.topic(topicNameThree)
Expand Down
83 changes: 83 additions & 0 deletions samples/typescript/publishWithFlowControl.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Publish with flow control
// description: Publishes to a topic using publisher-side flow control.
// usage: node publishWithFlowControl.js <topic-name>

// [START pubsub_publisher_flow_control]
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';

// Imports the Google Cloud client library
import {PubSub, PublishOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicName: string) {
// Create publisher options
const options: PublishOptions = {
flowControlOptions: {
maxOutstandingMessages: 50,
maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
},
};

// Get a publisher.
const topic = pubSubClient.topic(topicName, options);

// For flow controlled publishing, we'll use a publisher flow controller
// instead of `topic.publish()`.
const flow = topic.flowControlled();

// Publish messages in a fast loop.
const testMessage = {data: Buffer.from('test!')};
for (let i = 0; i < 1000; i++) {
// You can also just `await` on `publish()` unconditionally, but if
// you want to avoid pausing to the event loop on each iteration,
// you can manually check the return value before doing so.
const wait = flow.publish(testMessage);
if (wait) {
await wait;
}
}

// Wait on any pending publish requests. Note that you can call `all()`
// earlier if you like, and it will return a Promise for all messages
// that have been sent to `flowController.publish()` so far.
const messageIds = await flow.all();
console.log(`Published ${messageIds.length} with flow control settings.`);
}
// [END pubsub_publisher_flow_control]

function main(topicName = 'YOUR_TOPIC_NAME') {
publishWithFlowControl(topicName).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
13 changes: 9 additions & 4 deletions src/iam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
* @module pubsub/iam
*/

import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import {CallOptions, IamProtos} from 'google-gax';

import {google} from '../protos/protos';

import {Omit, PubSub, RequestCallback, ResourceCallback} from './pubsub';
import {promisifySome} from './util';

export type Policy = {
etag?: string | Buffer;
Expand Down Expand Up @@ -393,7 +393,12 @@ export class IAM {

/*! Developer Documentation
*
* All async methods (except for streams) will return a Promise in the event
* that a callback is omitted.
* Existing async methods (except for streams) will return a Promise in the event
* that a callback is omitted. Future methods will not allow for a callback.
* (Use .then() on the returned Promise instead.)
*/
promisifyAll(IAM);
promisifySome(IAM, IAM.prototype, [
'getPolicy',
'setPolicy',
'testPermissions',
]);
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export {
export {Attributes, PublishCallback, PublishOptions} from './publisher';
export {BatchPublishOptions} from './publisher/message-batch';
export {PublishError} from './publisher/publish-error';
export {FlowControlOptions} from './publisher/flow-control';
export {
PageOptions,
GetSnapshotsCallback,
Expand Down
Loading