Skip to content

Commit

Permalink
updating pub/sub samples (#928)
Browse files Browse the repository at this point in the history
* updating pub/sub samples

* another update
  • Loading branch information
jabubake authored Nov 22, 2017
1 parent 04ffdf0 commit 059bb0e
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp)
String topicId = System.getenv("PUBSUB_TOPIC");
// create a publisher on the topic
if (publisher == null) {
publisher = Publisher.defaultBuilder(
TopicName.create(ServiceOptions.getDefaultProjectId(), topicId))
publisher = Publisher.newBuilder(
TopicName.of(ServiceOptions.getDefaultProjectId(), topicId))
.build();
}
// construct a pubsub message from the payload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.example.pubsub;

// [START pubsub_quickstart_create_subscription]
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.PushConfig;
Expand All @@ -43,15 +44,19 @@ public static void main(String... args) throws Exception {
// Your subscription ID eg. "my-sub"
String subscriptionId = args[1];

TopicName topicName = TopicName.create(projectId, topicId);
TopicName topicName = TopicName.of(projectId, topicId);

// Create a new subscription
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// create a pull subscription with default acknowledgement deadline (= 10 seconds)
Subscription subscription =
subscriptionAdminClient.createSubscription(
subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
} catch (ApiException e) {
// example : code = ALREADY_EXISTS(409) implies subscription already exists
System.out.print(e.getStatusCode().getCode());
System.out.print(e.isRetryable());
}

System.out.printf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

// [START pubsub_quickstart_create_topic]
// Imports the Google Cloud client library
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.TopicName;
Expand All @@ -39,9 +40,13 @@ public static void main(String... args) throws Exception {
String topicId = args[0];

// Create a new topic
TopicName topic = TopicName.create(projectId, topicId);
TopicName topic = TopicName.of(projectId, topicId);
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
topicAdminClient.createTopic(topic);
} catch (ApiException e) {
// example : code = ALREADY_EXISTS(409) implies topic already exists
System.out.print(e.getStatusCode().getCode());
System.out.print(e.isRetryable());
}

System.out.printf("Topic %s:%s created.\n", topic.getProject(), topic.getTopic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,67 @@
// [START pubsub_quickstart_publisher]

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.ArrayList;
import java.util.List;

public class PublisherExample {

static final int MESSAGE_COUNT = 5;

// use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

//schedule a message to be published, messages are automatically batched
private static ApiFuture<String> publishMessage(Publisher publisher, String message)
throws Exception {
// convert message to bytes
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
return publisher.publish(pubsubMessage);
}

/** Publish messages to a topic. */
/** Publish messages to a topic.
* @param args topic name, number of messages
*/
public static void main(String... args) throws Exception {
// topic id, eg. "my-topic"
String topicId = args[0];
TopicName topicName = TopicName.create(PROJECT_ID, topicId);
int messageCount = Integer.parseInt(args[1]);
TopicName topicName = TopicName.of(PROJECT_ID, topicId);
Publisher publisher = null;
List<ApiFuture<String>> apiFutures = new ArrayList<>();
try {
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.defaultBuilder(topicName).build();
for (int i = 0; i < MESSAGE_COUNT; i++) {
publisher = Publisher.newBuilder(topicName).build();

for (int i = 0; i < messageCount; i++) {
String message = "message-" + i;
ApiFuture<String> messageId = publishMessage(publisher, message);
apiFutures.add(messageId);

// convert message to bytes
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
.setData(data)
.build();

//schedule a message to be published, messages are automatically batched
ApiFuture<String> future = publisher.publish(pubsubMessage);

// add an asynchronous callback to handle success / failure
ApiFutures.addCallback(future, new ApiFutureCallback<String>() {

@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// details on the API exception
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + message);
}

@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic)
System.out.println(messageId);
}
});
}
} finally {
// Once published, returns server-assigned message ids (unique within the topic)
List<String> messageIds = ApiFutures.allAsList(apiFutures).get();
for (String messageId : messageIds) {
System.out.println(messageId);
}
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
public static void main(String... args) throws Exception {
// set subscriber id, eg. my-sub
String subscriptionId = args[0];
SubscriptionName subscriptionName = SubscriptionName.create(PROJECT_ID, subscriptionId);
SubscriptionName subscriptionName = SubscriptionName.of(PROJECT_ID, subscriptionId);
Subscriber subscriber = null;
try {
// create a subscriber bound to the asynchronous message receiver
subscriber =
Subscriber.defaultBuilder(subscriptionName, new MessageReceiverExample()).build();
Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
subscriber.startAsync().awaitRunning();
// Continue to listen to messages
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class QuickStartIT {
private String projectId = ServiceOptions.getDefaultProjectId();
private String topicId = formatForTest("my-topic");
private String subscriptionId = formatForTest("my-sub");
private int messageCount = 5;

class SubscriberRunnable implements Runnable {

Expand Down Expand Up @@ -104,9 +105,9 @@ public void testQuickstart() throws Exception {

bout.reset();
// publish messages
PublisherExample.main(topicId);
PublisherExample.main(topicId, String.valueOf(messageCount));
String[] messageIds = bout.toString().split("\n");
assertThat(messageIds).hasLength(PublisherExample.MESSAGE_COUNT);
assertThat(messageIds).hasLength(messageCount);

bout.reset();
// receive messages
Expand All @@ -132,7 +133,7 @@ private String formatForTest(String name) {

private void deleteTestTopic() throws Exception {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
topicAdminClient.deleteTopic(TopicName.create(projectId, topicId));
topicAdminClient.deleteTopic(TopicName.of(projectId, topicId));
} catch (IOException e) {
System.err.println("Error deleting topic " + e.getMessage());
}
Expand All @@ -141,7 +142,7 @@ private void deleteTestTopic() throws Exception {
private void deleteTestSubscription() throws Exception {
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
subscriptionAdminClient.deleteSubscription(
SubscriptionName.create(projectId, subscriptionId));
SubscriptionName.of(projectId, subscriptionId));
} catch (IOException e) {
System.err.println("Error deleting subscription " + e.getMessage());
}
Expand Down

0 comments on commit 059bb0e

Please sign in to comment.