diff --git a/pubsub/README.md b/pubsub/README.md index abe3140dbd..7f52f194bf 100644 --- a/pubsub/README.md +++ b/pubsub/README.md @@ -39,18 +39,22 @@ View the [documentation][topics_0_docs] or the [source code][topics_0_code]. __Usage:__ `node topics.js --help` ``` +topics.js + Commands: - list Lists all topics in the current project. - create Creates a new topic. - delete Deletes a topic. - publish Publishes a message to a topic. - publish-ordered Publishes an ordered message to a topic. - get-policy Gets the IAM policy for a topic. - set-policy Sets the IAM policy for a topic. - test-permissions Tests the permissions for a topic. + topics.js list Lists all topics in the current project. + topics.js create Creates a new topic. + topics.js delete Deletes a topic. + topics.js publish Publishes a message to a topic. + topics.js publish-batch Publishes messages to a topic using custom batching settings. + topics.js publish-ordered Publishes an ordered message to a topic. + topics.js get-policy Gets the IAM policy for a topic. + topics.js set-policy Sets the IAM policy for a topic. + topics.js test-permissions Tests the permissions for a topic. Options: - --help Show help [boolean] + --version Show version number [boolean] + --help Show help [boolean] Examples: node topics.js list @@ -58,6 +62,8 @@ Examples: node topics.js delete my-topic node topics.js publish my-topic "Hello, world!" node topics.js publish my-topic '{"data":"Hello, world!"}' + node topics.js publish-ordered my-topic "Hello, world!" + node topics.js publish-batch my-topic "Hello, world!" -w 1000 node topics.js get-policy greetings node topics.js set-policy greetings node topics.js test-permissions greetings @@ -75,27 +81,39 @@ View the [documentation][subscriptions_1_docs] or the [source code][subscription __Usage:__ `node subscriptions.js --help` ``` +subscriptions.js + Commands: - list [topicName] Lists all subscriptions in the current project, optionally filtering by a - topic. - create Creates a new subscription. - create-push Creates a new push subscription. - delete Deletes a subscription. - get Gets the metadata for a subscription. - listen Listens to messages for a subscription. - get-policy Gets the IAM policy for a subscription. - set-policy Sets the IAM policy for a subscription. - test-permissions Tests the permissions for a subscription. + subscriptions.js list [topicName] Lists all subscriptions in the current project, + optionally filtering by a topic. + subscriptions.js create Creates a new subscription. + subscriptions.js create-flow Creates a new subscription with flow-control limits, + which don't persist between subscriptions. + subscriptions.js create-push Creates a new push subscription. + subscriptions.js modify-config Modifies the configuration of an existing push + subscription. + subscriptions.js delete Deletes a subscription. + subscriptions.js get Gets the metadata for a subscription. + subscriptions.js listen-messages Listens to messages for a subscription. + subscriptions.js listen-errors Listens to messages and errors for a subscription. + subscriptions.js get-policy Gets the IAM policy for a subscription. + subscriptions.js set-policy Sets the IAM policy for a subscription. + subscriptions.js test-permissions Tests the permissions for a subscription. Options: - --help Show help [boolean] + --version Show version number [boolean] + --help Show help [boolean] Examples: node subscriptions.js list node subscriptions.js list my-topic node subscriptions.js create my-topic worker-1 + node subscriptions.js create-flow my-topic worker-1 -m 5 node subscriptions.js create-push my-topic worker-1 + node subscriptions.js modify-config my-topic worker-1 node subscriptions.js get worker-1 + node subscriptions.js listen-messages my-subscription + node subscriptions.js listen-errors my-subscription node subscriptions.js delete worker-1 node subscriptions.js pull worker-1 node subscriptions.js get-policy worker-1 diff --git a/pubsub/package.json b/pubsub/package.json index 011cf3f3c8..b6a9dbe47d 100644 --- a/pubsub/package.json +++ b/pubsub/package.json @@ -17,14 +17,14 @@ "test": "samples test run --cmd ava -- -T 30s --verbose system-test/*.test.js" }, "dependencies": { - "@google-cloud/pubsub": "0.14.0", - "yargs": "8.0.2" + "@google-cloud/pubsub": "0.14.5", + "yargs": "10.0.3" }, "devDependencies": { - "@google-cloud/nodejs-repo-tools": "1.4.17", + "@google-cloud/nodejs-repo-tools": "2.0.11", "ava": "0.22.0", "proxyquire": "1.8.0", - "sinon": "3.2.1" + "sinon": "4.0.1" }, "cloud-repo-tools": { "requiresKeyFile": true, diff --git a/pubsub/subscriptions.js b/pubsub/subscriptions.js index b9d8b93438..15ad1f224c 100644 --- a/pubsub/subscriptions.js +++ b/pubsub/subscriptions.js @@ -84,6 +84,32 @@ function createSubscription (topicName, subscriptionName) { } // [END pubsub_create_subscription] +// [START pubsub_subscriber_flow_settings] +function createFlowControlledSubscription (topicName, subscriptionName, maxInProgress, maxBytes) { + // Instantiates a client + const pubsub = PubSub(); + + // References an existing topic, e.g. "my-topic" + const topic = pubsub.topic(topicName); + + // Creates a new subscription, e.g. "my-new-subscription" + // Note that flow control configurations are not persistent + return topic.createSubscription(subscriptionName, { + flowControl: { + maxBytes: maxBytes, + maxMessages: maxInProgress + } + }) + .then((results) => { + const subscription = results[0]; + + console.log(`Subscription ${subscription.name} created with a maximum of ${maxInProgress} unprocessed messages.`); + + return subscription; + }); +} +// [END pubsub_subscriber_flow_settings] + // [START pubsub_create_push_subscription] function createPushSubscription (topicName, subscriptionName) { // Instantiates a client @@ -112,6 +138,28 @@ function createPushSubscription (topicName, subscriptionName) { } // [END pubsub_create_push_subscription] +// [START pubsub_modify_push_config] +function modifyPushConfig (topicName, subscriptionName, pushEndpoint) { + // Instantiates a client + const pubsub = PubSub(); + + // References an existing topic and subscription, e.g. "my-topic" > "my-subscription" + const topic = pubsub.topic(topicName); + const subscription = topic.subscription(subscriptionName); + + const options = { + // Set to an HTTPS endpoint of your choice. If necessary, register + // (authorize) the domain on which the server is hosted. + pushEndpoint: `https://${pubsub.projectId}.appspot.com/push` + }; + + return subscription.modifyPushConfig(options) + .then((results) => { + console.log(`Modified push config for subscription ${subscription.name}.`); + }); +} +// [END pubsub_modify_push_config] + // [START pubsub_delete_subscription] function deleteSubscription (subscriptionName) { // Instantiates a client @@ -246,6 +294,42 @@ function listenForOrderedMessages (subscriptionName, timeout) { } // [END pubsub_listen_ordered_messages] +// [START pubsub_listen_errors] +function listenForErrors (subscriptionName, timeout) { + // Instantiates a client + const pubsub = PubSub(); + + // References an existing subscription, e.g. "my-subscription" + const subscription = pubsub.subscription(subscriptionName); + + // Create an event handler to handle messages + const messageHandler = function (message) { + // Do something with the message + console.log(`Message: ${message}`); + + // "Ack" (acknowledge receipt of) the message + message.ack(); + }; + + // Create an event handler to handle errors + const errorHandler = function (error) { + // Do something with the error + console.error(`ERROR: ${error}`); + }; + + // Listen for new messages/errors until timeout is hit + return new Promise((resolve) => { + subscription.on(`message`, messageHandler); + subscription.on(`error`, errorHandler); + setTimeout(() => { + subscription.removeListener(`message`, messageHandler); + subscription.removeListener(`error`, errorHandler); + resolve(); + }, timeout * 1000); + }); +} +// [END pubsub_listen_errors] + // [START pubsub_get_subscription_policy] function getSubscriptionPolicy (subscriptionName) { // Instantiates a client @@ -349,12 +433,35 @@ const cli = require(`yargs`) {}, (opts) => createSubscription(opts.topicName, opts.subscriptionName) ) + .command( + `create-flow `, + `Creates a new subscription with flow-control limits, which don't persist between subscriptions.`, + { + maxInProgress: { + alias: 'm', + type: 'number', + default: 0 + }, + maxBytes: { + alias: 'b', + type: 'number', + default: 0 + } + }, + (opts) => createFlowControlledSubscription(opts.topicName, opts.subscriptionName, opts.maxInProgress, opts.maxBytes) + ) .command( `create-push `, `Creates a new push subscription.`, {}, (opts) => createPushSubscription(opts.topicName, opts.subscriptionName) ) + .command( + `modify-config `, + `Modifies the configuration of an existing push subscription.`, + {}, + (opts) => modifyPushConfig(opts.topicName, opts.subscriptionName) + ) .command( `delete `, `Deletes a subscription.`, @@ -368,7 +475,7 @@ const cli = require(`yargs`) (opts) => getSubscription(opts.subscriptionName) ) .command( - `listen `, + `listen-messages `, `Listens to messages for a subscription.`, { timeout: { @@ -379,6 +486,18 @@ const cli = require(`yargs`) }, (opts) => listenForMessages(opts.subscriptionName, opts.timeout) ) + .command( + `listen-errors `, + `Listens to messages and errors for a subscription.`, + { + timeout: { + alias: 't', + type: 'number', + default: 10 + } + }, + (opts) => listenForErrors(opts.subscriptionName, opts.timeout) + ) .command( `get-policy `, `Gets the IAM policy for a subscription.`, @@ -400,8 +519,12 @@ const cli = require(`yargs`) .example(`node $0 list`) .example(`node $0 list my-topic`) .example(`node $0 create my-topic worker-1`) + .example(`node $0 create-flow my-topic worker-1 -m 5`) .example(`node $0 create-push my-topic worker-1`) + .example(`node $0 modify-config my-topic worker-1`) .example(`node $0 get worker-1`) + .example(`node $0 listen-messages my-subscription`) + .example(`node $0 listen-errors my-subscription`) .example(`node $0 delete worker-1`) .example(`node $0 pull worker-1`) .example(`node $0 get-policy worker-1`) diff --git a/pubsub/system-test/subscriptions.test.js b/pubsub/system-test/subscriptions.test.js index bcd563badc..7b8f2325cd 100644 --- a/pubsub/system-test/subscriptions.test.js +++ b/pubsub/system-test/subscriptions.test.js @@ -27,10 +27,12 @@ const topicNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`; const subscriptionNameOne = `nodejs-docs-samples-test-sub-${uuid.v4()}`; const subscriptionNameTwo = `nodejs-docs-samples-test-sub-${uuid.v4()}`; const subscriptionNameThree = `nodejs-docs-samples-test-sub-${uuid.v4()}`; +const subscriptionNameFour = `nodejs-docs-samples-test-sub-${uuid.v4()}`; const projectId = process.env.GCLOUD_PROJECT; const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`; const fullSubscriptionNameOne = `projects/${projectId}/subscriptions/${subscriptionNameOne}`; const fullSubscriptionNameTwo = `projects/${projectId}/subscriptions/${subscriptionNameTwo}`; +const fullSubscriptionNameFour = `projects/${projectId}/subscriptions/${subscriptionNameFour}`; const cmd = `node subscriptions.js`; test.before(tools.checkCredentials); @@ -81,6 +83,12 @@ test.serial(`should create a push subscription`, async (t) => { }).start(); }); +test.serial(`should modify the config of an existing push subscription`, async (t) => { + t.plan(1); + const output = await tools.runAsync(`${cmd} modify-config ${topicNameTwo} ${subscriptionNameTwo}`, cwd); + t.is(output, `Modified push config for subscription ${fullSubscriptionNameTwo}.`); +}); + test.serial(`should get metadata for a subscription`, async (t) => { const output = await tools.runAsync(`${cmd} get ${subscriptionNameOne}`, cwd); const expected = `Subscription: ${fullSubscriptionNameOne}` + @@ -111,9 +119,8 @@ test.serial(`should list subscriptions for a topic`, async (t) => { }); test.serial(`should listen for messages`, async (t) => { - const expected = `Hello, world!`; - const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(expected)); - const output = await tools.runAsync(`${cmd} listen ${subscriptionNameOne}`, cwd); + const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(`Hello, world!`)); + const output = await tools.runAsync(`${cmd} listen-messages ${subscriptionNameOne}`, cwd); t.true(output.includes(`Received message ${messageIds[0]}:`)); }); @@ -148,6 +155,11 @@ test.serial(`should listen for ordered messages`, async (t) => { }); }); +test.serial(`should listen for error messages`, async (t) => { + const output = await tools.runAsyncWithIO(`${cmd} listen-errors nonexistent-subscription -t 3`, cwd); + t.true(output.stderr.includes(`Resource not found`)); +}); + test.serial(`should set the IAM policy for a subscription`, async (t) => { await tools.runAsync(`${cmd} set-policy ${subscriptionNameOne}`, cwd); const results = await pubsub.subscription(subscriptionNameOne).iam.getPolicy(); @@ -185,3 +197,13 @@ test.serial(`should delete a subscription`, async (t) => { assert(subscriptions.every((s) => s.name !== fullSubscriptionNameOne)); }).start(); }); + +test.serial(`should create a subscription with flow control`, async (t) => { + t.plan(1); + const output = await tools.runAsync(`${cmd} create-flow ${topicNameTwo} ${subscriptionNameFour} -m 5 -b 1024`, cwd); + t.is(output, `Subscription ${fullSubscriptionNameFour} created with a maximum of 5 unprocessed messages.`); + await tools.tryTest(async (assert) => { + const [subscriptions] = await pubsub.topic(topicNameTwo).getSubscriptions(); + assert(subscriptions.some((s) => s.name === fullSubscriptionNameFour)); + }).start(); +}); diff --git a/pubsub/system-test/topics.test.js b/pubsub/system-test/topics.test.js index 03537df661..2e0844c708 100644 --- a/pubsub/system-test/topics.test.js +++ b/pubsub/system-test/topics.test.js @@ -26,6 +26,7 @@ const topicNameOne = `nodejs-docs-samples-test-${uuid.v4()}`; const topicNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`; const subscriptionNameOne = `nodejs-docs-samples-test-${uuid.v4()}`; const subscriptionNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`; +const subscriptionNameThree = `nodejs-docs-samples-test-${uuid.v4()}`; const projectId = process.env.GCLOUD_PROJECT; const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`; const expectedMessage = { data: `Hello, world!` }; @@ -48,6 +49,9 @@ test.after.always(async () => { try { await pubsub.subscription(subscriptionNameTwo).delete(); } catch (err) {} // ignore error + try { + await pubsub.subscription(subscriptionNameThree).delete(); + } catch (err) {} // ignore error try { await pubsub.topic(topicNameTwo).delete(); } catch (err) {} // ignore error @@ -131,6 +135,18 @@ test.serial(`should publish ordered messages`, async (t) => { await topics.publishOrderedMessage(topicNameTwo, expectedMessage.data); }); +test.serial(`should publish with specific batch settings`, async (t) => { + t.plan(2); + const expectedWait = 1000; + const [subscription] = await pubsub.topic(topicNameOne).createSubscription(subscriptionNameThree); + const startTime = Date.now(); + await tools.runAsync(`${cmd} publish-batch ${topicNameOne} "${expectedMessage.data}" -w ${expectedWait}`, cwd); + const receivedMessage = await _pullOneMessage(subscription); + const publishTime = Date.parse(receivedMessage.publishTime); + t.is(receivedMessage.data.toString(), expectedMessage.data); + t.true(publishTime - startTime > expectedWait); +}); + test.serial(`should set the IAM policy for a topic`, async (t) => { await tools.runAsync(`${cmd} set-policy ${topicNameOne}`, cwd); const results = await pubsub.topic(topicNameOne).iam.getPolicy(); diff --git a/pubsub/topics.js b/pubsub/topics.js index 0d7f06c275..9d4363f7de 100644 --- a/pubsub/topics.js +++ b/pubsub/topics.js @@ -26,7 +26,7 @@ const PubSub = require(`@google-cloud/pubsub`); // [START pubsub_list_topics] -function listTopics () { +function listAllTopics () { // Instantiates a client const pubsub = PubSub(); @@ -100,6 +100,35 @@ function publishMessage (topicName, data) { } // [END pubsub_publish_message] +// [START pubsub_publisher_batched_settings] +function publishBatchedMessages (topicName, data, maxMessages, maxWaitTime) { + // Instantiates a client + const pubsub = PubSub(); + + // References an existing topic, e.g. "my-topic" + const topic = pubsub.topic(topicName); + + // Create a publisher for the topic (with additional batching configuration) + const publisher = topic.publisher({ + batching: { + maxMessages: maxMessages, + maxMilliseconds: maxWaitTime + } + }); + + // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + return publisher.publish(dataBuffer) + .then((results) => { + const messageId = results[0]; + + console.log(`Message ${messageId} published.`); + + return messageId; + }); +} +// [END pubsub_publisher_batched_settings] + let publishCounterValue = 1; function getPublishCounterValue () { @@ -233,7 +262,7 @@ const cli = require(`yargs`) `list`, `Lists all topics in the current project.`, {}, - listTopics + listAllTopics ) .command( `create `, @@ -254,6 +283,25 @@ const cli = require(`yargs`) (opts) => { publishMessage(opts.topicName, opts.message); } + ) + .command( + `publish-batch `, + `Publishes messages to a topic using custom batching settings.`, + { + maxWaitTime: { + alias: 'w', + type: 'number', + default: 10 + }, + maxMessages: { + alias: 'm', + type: 'number', + default: 10 + } + }, + (opts) => { + publishBatchedMessages(opts.topicName, opts.message, opts.maxMessages, opts.maxWaitTime); + } ) .command( `publish-ordered `, @@ -291,6 +339,8 @@ const cli = require(`yargs`) .example(`node $0 delete my-topic`) .example(`node $0 publish my-topic "Hello, world!"`) .example(`node $0 publish my-topic '{"data":"Hello, world!"}'`) + .example(`node $0 publish-ordered my-topic "Hello, world!"`) + .example(`node $0 publish-batch my-topic "Hello, world!" -w 1000`) .example(`node $0 get-policy greetings`) .example(`node $0 set-policy greetings`) .example(`node $0 test-permissions greetings`)