From 6f66f372036fbd9afbade59c10aa6ae2be9242ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Andr=C3=A9=20Martins?= Date: Wed, 12 Jul 2017 17:05:48 -0400 Subject: [PATCH 1/5] Spring Integration channel adapter code samples Aid on getting started with the Spring Integration GCP Pub/Sub channel adapters. --- pom.xml | 1 + spring/integration/pubsub/pom.xml | 130 +++++++++++++++ .../spring/pubsub/PubsubApplication.java | 153 ++++++++++++++++++ .../src/main/resources/static/index.html | 36 +++++ .../src/main/resources/static/js/pubsub.js | 11 ++ .../integration/pubsub/src/main/wro/main.less | 0 .../pubsub/src/main/wro/wro.properties | 4 + .../integration/pubsub/src/main/wro/wro.xml | 9 ++ 8 files changed, 344 insertions(+) create mode 100644 spring/integration/pubsub/pom.xml create mode 100644 spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java create mode 100644 spring/integration/pubsub/src/main/resources/static/index.html create mode 100644 spring/integration/pubsub/src/main/resources/static/js/pubsub.js create mode 100644 spring/integration/pubsub/src/main/wro/main.less create mode 100644 spring/integration/pubsub/src/main/wro/wro.properties create mode 100644 spring/integration/pubsub/src/main/wro/wro.xml diff --git a/pom.xml b/pom.xml index 73b0c5ba6dc..0feaec8eb60 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ pubsub/cloud-client spanner/cloud-client speech/cloud-client + spring/integration/pubsub storage/cloud-client storage/json-api storage/storage-transfer diff --git a/spring/integration/pubsub/pom.xml b/spring/integration/pubsub/pom.xml new file mode 100644 index 00000000000..c63ef0f78cb --- /dev/null +++ b/spring/integration/pubsub/pom.xml @@ -0,0 +1,130 @@ + + + + doc-samples + com.google.cloud + 1.0.0 + ../../.. + + 4.0.0 + jar + + spring-integration-pubsub-samples + + + 1.0.0.BUILD-SNAPSHOT + 1.5.2.RELEASE + + + + + org.springframework.cloud + spring-cloud-gcp-core + ${spring-cloud-gcp.version} + + + org.springframework.cloud + spring-cloud-gcp-pubsub + ${spring-cloud-gcp.version} + + + org.springframework.cloud + spring-cloud-gcp-starter-pubsub + ${spring-cloud-gcp.version} + + + org.springframework.cloud + spring-integration-gcp + ${spring-cloud-gcp.version} + + + org.springframework.boot + spring-boot-starter-web + ${spring-boot.version} + + + + + + + + + src/main/resources + + + ${project.build.directory}/generated-resources + + + + + org.springframework.boot + spring-boot-maven-plugin + 1.5.2.RELEASE + + + maven-resources-plugin + 3.0.1 + + + + copy-resources + validate + + copy-resources + + + ${basedir}/target/wro + + + src/main/wro + true + + + + + + + + ro.isdc.wro4j + wro4j-maven-plugin + 1.7.6 + + + generate-resources + + run + + + + + ro.isdc.wro.maven.plugin.manager.factory.ConfigurableWroManagerFactory + ${project.build.directory}/generated-resources/static/css + ${project.build.directory}/generated-resources/static/js + ${project.build.directory}/wro/wro.xml + src/main/wro/wro.properties + ${basedir}/src/main/wro + + + + org.webjars + jquery + 2.1.1 + + + org.webjars + angularjs + 1.3.8 + + + org.webjars + bootstrap + 3.2.0 + + + + + + diff --git a/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java b/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java new file mode 100644 index 00000000000..e8e47242849 --- /dev/null +++ b/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java @@ -0,0 +1,153 @@ +package com.example.spring.pubsub; + +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.protobuf.ByteString; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.gcp.pubsub.PubsubAdmin; +import org.springframework.cloud.gcp.pubsub.core.PubsubTemplate; +import org.springframework.cloud.gcp.pubsub.support.GcpHeaders; +import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.gcp.AckMode; +import org.springframework.integration.gcp.inbound.PubsubInboundChannelAdapter; +import org.springframework.integration.gcp.outbound.PubsubMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.view.RedirectView; + +@SpringBootApplication +@IntegrationComponentScan +@RestController +@ComponentScan(basePackages = {"org.springframework.cloud.gcp"}) +public class PubsubApplication { + + private static final Log LOGGER = LogFactory.getLog(PubsubApplication.class); + + @Autowired + private PubsubOutboundGateway messagingGateway; + + @Autowired + private PubsubAdmin admin; + + public static void main(String[] args) throws IOException { + SpringApplication.run(PubsubApplication.class, args); + } + + @GetMapping("/listTopics") + public List listTopics() { + return admin.listTopics().stream() + .map(Topic::getNameAsTopicName) + .map(TopicName::getTopic) + .collect(Collectors.toList()); + } + + @GetMapping("/listSubscriptions") + public List listSubscriptions() { + return admin.listSubscriptions().stream() + .map(Subscription::getNameAsSubscriptionName) + .map(SubscriptionName::getSubscription) + .collect(Collectors.toList()); + } + + @PostMapping("/postMessage") + public RedirectView addMessage(@RequestParam("message") String message) { + messagingGateway.sendToPubsub(message); + return new RedirectView("/"); + } + + @PostMapping("/newTopic") + public RedirectView newTopic(@RequestParam("name") String topicName) { + admin.createTopic(topicName); + return new RedirectView("/"); + } + + @PostMapping("/newSubscription") + public RedirectView newSubscription(@RequestParam("name") String subscriptionName, + @RequestParam("topic") String topicName) { + admin.createSubscription(subscriptionName, topicName); + return new RedirectView("/"); + } + + @Bean + public MessageChannel pubsubInputChannel() { + return new PublishSubscribeChannel(); + } + + @Bean + public PubsubInboundChannelAdapter messageChannelAdapter( + @Qualifier("pubsubInputChannel") MessageChannel inputChannel, + SubscriberFactory subscriberFactory) { + PubsubInboundChannelAdapter adapter = + new PubsubInboundChannelAdapter(subscriberFactory, "messages"); + adapter.setOutputChannel(inputChannel); + adapter.setAckMode(AckMode.MANUAL); + + return adapter; + } + + @Bean + @ServiceActivator(inputChannel = "pubsubInputChannel") + public MessageHandler receiveMessage() { + return message -> { + LOGGER.info("Message arrived! Payload: " + + ((ByteString) message.getPayload()).toStringUtf8()); + AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get( + GcpHeaders.ACKNOWLEDGEMENT); + consumer.ack(); + }; + } + + @Bean + @ServiceActivator(inputChannel = "pubsubInputChannel") + public MessageHandler receiveMessageInParallel() { + return message -> { + LOGGER.info("Message also arrived here! Payload: " + + ((ByteString) message.getPayload()).toStringUtf8()); + AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get( + GcpHeaders.ACKNOWLEDGEMENT); + consumer.ack(); + }; + } + + @Bean + @ServiceActivator(inputChannel = "pubsubOutputChannel") + public MessageHandler messageSender(PubsubTemplate pubsubTemplate) throws IOException { + PubsubMessageHandler outboundAdapter = new PubsubMessageHandler(pubsubTemplate); + outboundAdapter.setTopic("test"); + return outboundAdapter; + } + + @Bean + public MessageChannel pubsubOutputChannel() { + return new PublishSubscribeChannel(); + } + + @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel") + public interface PubsubOutboundGateway { + + void sendToPubsub(String text); + } +} diff --git a/spring/integration/pubsub/src/main/resources/static/index.html b/spring/integration/pubsub/src/main/resources/static/index.html new file mode 100644 index 00000000000..6e2593841e9 --- /dev/null +++ b/spring/integration/pubsub/src/main/resources/static/index.html @@ -0,0 +1,36 @@ + + + + + Spring Integration GCP sample + + + + + +
+
+ Post message: +
+
+ New topic: +
+
+ New subscription: + for topic +
+
+
+

Topics

+
+ {{ topic }}
+
+
+
+

Subscriptions

+
+ {{ subscription }}
+
+
+ + diff --git a/spring/integration/pubsub/src/main/resources/static/js/pubsub.js b/spring/integration/pubsub/src/main/resources/static/js/pubsub.js new file mode 100644 index 00000000000..79e979d2cb2 --- /dev/null +++ b/spring/integration/pubsub/src/main/resources/static/js/pubsub.js @@ -0,0 +1,11 @@ +angular.module('pubsub', []) + .controller('listTopics', function($scope, $http) { + $http.get('/listTopics').success(function(data) { + $scope.topics = data; + }); + }) + .controller('listSubscriptions', function($scope, $http) { + $http.get('/listSubscriptions').success(function(data) { + $scope.subscriptions = data; + }) + }); diff --git a/spring/integration/pubsub/src/main/wro/main.less b/spring/integration/pubsub/src/main/wro/main.less new file mode 100644 index 00000000000..e69de29bb2d diff --git a/spring/integration/pubsub/src/main/wro/wro.properties b/spring/integration/pubsub/src/main/wro/wro.properties new file mode 100644 index 00000000000..1502f7dc98b --- /dev/null +++ b/spring/integration/pubsub/src/main/wro/wro.properties @@ -0,0 +1,4 @@ +#List of preProcessors +preProcessors=lessCssImport +#List of postProcessors +postProcessors=less4j,jsMin diff --git a/spring/integration/pubsub/src/main/wro/wro.xml b/spring/integration/pubsub/src/main/wro/wro.xml new file mode 100644 index 00000000000..40873c376fc --- /dev/null +++ b/spring/integration/pubsub/src/main/wro/wro.xml @@ -0,0 +1,9 @@ + + + webjar:bootstrap/3.2.0/less/bootstrap.less + file:${project.basedir}/src/main/wro/main.less + webjar:jquery/2.1.1/jquery.min.js + webjar:bootstrap/3.2.0/bootstrap.js + webjar:angularjs/1.3.8/angular.min.js + + From f13fa3c829449ac3c9bbef6d3fb8e7ab923410b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Andr=C3=A9=20Martins?= Date: Thu, 13 Jul 2017 11:30:50 -0400 Subject: [PATCH 2/5] Remove unnecessary dependencies --- spring/integration/pubsub/pom.xml | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/spring/integration/pubsub/pom.xml b/spring/integration/pubsub/pom.xml index c63ef0f78cb..03a8efcc106 100644 --- a/spring/integration/pubsub/pom.xml +++ b/spring/integration/pubsub/pom.xml @@ -19,16 +19,6 @@ - - org.springframework.cloud - spring-cloud-gcp-core - ${spring-cloud-gcp.version} - - - org.springframework.cloud - spring-cloud-gcp-pubsub - ${spring-cloud-gcp.version} - org.springframework.cloud spring-cloud-gcp-starter-pubsub From 431123dfc70b9701cd3dbb52119ac7c6372bb715 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Andr=C3=A9=20Martins?= Date: Thu, 13 Jul 2017 14:07:25 -0400 Subject: [PATCH 3/5] More code review fixes --- .../spring/pubsub/PubsubApplication.java | 29 +++++++------------ .../src/main/resources/application.properties | 5 ++++ 2 files changed, 15 insertions(+), 19 deletions(-) create mode 100644 spring/integration/pubsub/src/main/resources/application.properties diff --git a/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java b/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java index e8e47242849..ad1f62c3f21 100644 --- a/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java +++ b/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java @@ -1,18 +1,18 @@ package com.example.spring.pubsub; -import com.google.pubsub.v1.Subscription; -import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.Topic; -import com.google.pubsub.v1.TopicName; import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.protobuf.ByteString; - -import java.util.List; -import java.util.stream.Collectors; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; @@ -22,8 +22,6 @@ import org.springframework.cloud.gcp.pubsub.support.GcpHeaders; import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.PublishSubscribeChannel; @@ -39,9 +37,7 @@ import org.springframework.web.servlet.view.RedirectView; @SpringBootApplication -@IntegrationComponentScan @RestController -@ComponentScan(basePackages = {"org.springframework.cloud.gcp"}) public class PubsubApplication { private static final Log LOGGER = LogFactory.getLog(PubsubApplication.class); @@ -110,7 +106,7 @@ public PubsubInboundChannelAdapter messageChannelAdapter( @Bean @ServiceActivator(inputChannel = "pubsubInputChannel") - public MessageHandler receiveMessage() { + public MessageHandler messageReceiver1() { return message -> { LOGGER.info("Message arrived! Payload: " + ((ByteString) message.getPayload()).toStringUtf8()); @@ -122,7 +118,7 @@ public MessageHandler receiveMessage() { @Bean @ServiceActivator(inputChannel = "pubsubInputChannel") - public MessageHandler receiveMessageInParallel() { + public MessageHandler messageReceiver2() { return message -> { LOGGER.info("Message also arrived here! Payload: " + ((ByteString) message.getPayload()).toStringUtf8()); @@ -134,17 +130,12 @@ public MessageHandler receiveMessageInParallel() { @Bean @ServiceActivator(inputChannel = "pubsubOutputChannel") - public MessageHandler messageSender(PubsubTemplate pubsubTemplate) throws IOException { + public MessageHandler messageSender(PubsubTemplate pubsubTemplate) { PubsubMessageHandler outboundAdapter = new PubsubMessageHandler(pubsubTemplate); outboundAdapter.setTopic("test"); return outboundAdapter; } - @Bean - public MessageChannel pubsubOutputChannel() { - return new PublishSubscribeChannel(); - } - @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel") public interface PubsubOutboundGateway { diff --git a/spring/integration/pubsub/src/main/resources/application.properties b/spring/integration/pubsub/src/main/resources/application.properties new file mode 100644 index 00000000000..db7041b911c --- /dev/null +++ b/spring/integration/pubsub/src/main/resources/application.properties @@ -0,0 +1,5 @@ +#spring.cloud.gcp.projectId=[YOUR_PROJECT_ID] +#spring.cloud.gcp.credentialsLocation=file:[LOCAL_PATH_TO_CREDENTIALS] +# +#spring.cloud.gcp.pubsub.subscriber.executorThreads=[SUBSCRIBER_THREADS] +#spring.cloud.gcp.pubsub.publisher.executorThreads=[PUBLISHER_THREADS] From a06cd45bffa2b23915a1622f19723add18799684 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Andr=C3=A9=20Martins?= Date: Thu, 13 Jul 2017 15:32:10 -0400 Subject: [PATCH 4/5] Code review fixes --- pom.xml | 2 +- .../src/main/resources/static/js/pubsub.js | 11 -- spring/{integration => }/pubsub/pom.xml | 2 +- .../spring/pubsub/PubsubApplication.java | 122 +++++++++--------- .../spring/pubsub/WebAppController.java | 108 ++++++++++++++++ .../src/main/resources/application.properties | 0 .../src/main/resources/static/index.html | 15 +++ .../src/main/resources/static/js/pubsub.js | 27 ++++ .../pubsub/src/main/wro/main.less | 0 .../pubsub/src/main/wro/wro.properties | 0 .../pubsub/src/main/wro/wro.xml | 0 11 files changed, 211 insertions(+), 76 deletions(-) delete mode 100644 spring/integration/pubsub/src/main/resources/static/js/pubsub.js rename spring/{integration => }/pubsub/pom.xml (98%) rename spring/{integration => }/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java (55%) create mode 100644 spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java rename spring/{integration => }/pubsub/src/main/resources/application.properties (100%) rename spring/{integration => }/pubsub/src/main/resources/static/index.html (62%) create mode 100644 spring/pubsub/src/main/resources/static/js/pubsub.js rename spring/{integration => }/pubsub/src/main/wro/main.less (100%) rename spring/{integration => }/pubsub/src/main/wro/wro.properties (100%) rename spring/{integration => }/pubsub/src/main/wro/wro.xml (100%) diff --git a/pom.xml b/pom.xml index 0feaec8eb60..6960ff68d60 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ pubsub/cloud-client spanner/cloud-client speech/cloud-client - spring/integration/pubsub + spring/pubsub storage/cloud-client storage/json-api storage/storage-transfer diff --git a/spring/integration/pubsub/src/main/resources/static/js/pubsub.js b/spring/integration/pubsub/src/main/resources/static/js/pubsub.js deleted file mode 100644 index 79e979d2cb2..00000000000 --- a/spring/integration/pubsub/src/main/resources/static/js/pubsub.js +++ /dev/null @@ -1,11 +0,0 @@ -angular.module('pubsub', []) - .controller('listTopics', function($scope, $http) { - $http.get('/listTopics').success(function(data) { - $scope.topics = data; - }); - }) - .controller('listSubscriptions', function($scope, $http) { - $http.get('/listSubscriptions').success(function(data) { - $scope.subscriptions = data; - }) - }); diff --git a/spring/integration/pubsub/pom.xml b/spring/pubsub/pom.xml similarity index 98% rename from spring/integration/pubsub/pom.xml rename to spring/pubsub/pom.xml index 03a8efcc106..075b9142b19 100644 --- a/spring/integration/pubsub/pom.xml +++ b/spring/pubsub/pom.xml @@ -6,7 +6,7 @@ doc-samples com.google.cloud 1.0.0 - ../../.. + ../.. 4.0.0 jar diff --git a/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java b/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java similarity index 55% rename from spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java rename to spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java index ad1f62c3f21..48c40a0587a 100644 --- a/spring/integration/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java +++ b/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java @@ -1,23 +1,29 @@ -package com.example.spring.pubsub; +/* + * Copyright 2017 original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import java.io.IOException; -import java.util.List; -import java.util.stream.Collectors; +package com.example.spring.pubsub; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.protobuf.ByteString; -import com.google.pubsub.v1.Subscription; -import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.Topic; -import com.google.pubsub.v1.TopicName; +import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.gcp.pubsub.PubsubAdmin; import org.springframework.cloud.gcp.pubsub.core.PubsubTemplate; import org.springframework.cloud.gcp.pubsub.support.GcpHeaders; import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory; @@ -30,68 +36,39 @@ import org.springframework.integration.gcp.outbound.PubsubMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.servlet.view.RedirectView; @SpringBootApplication -@RestController public class PubsubApplication { private static final Log LOGGER = LogFactory.getLog(PubsubApplication.class); - @Autowired - private PubsubOutboundGateway messagingGateway; - - @Autowired - private PubsubAdmin admin; - public static void main(String[] args) throws IOException { SpringApplication.run(PubsubApplication.class, args); } - - @GetMapping("/listTopics") - public List listTopics() { - return admin.listTopics().stream() - .map(Topic::getNameAsTopicName) - .map(TopicName::getTopic) - .collect(Collectors.toList()); - } - - @GetMapping("/listSubscriptions") - public List listSubscriptions() { - return admin.listSubscriptions().stream() - .map(Subscription::getNameAsSubscriptionName) - .map(SubscriptionName::getSubscription) - .collect(Collectors.toList()); - } - - @PostMapping("/postMessage") - public RedirectView addMessage(@RequestParam("message") String message) { - messagingGateway.sendToPubsub(message); - return new RedirectView("/"); - } - - @PostMapping("/newTopic") - public RedirectView newTopic(@RequestParam("name") String topicName) { - admin.createTopic(topicName); - return new RedirectView("/"); - } - - @PostMapping("/newSubscription") - public RedirectView newSubscription(@RequestParam("name") String subscriptionName, - @RequestParam("topic") String topicName) { - admin.createSubscription(subscriptionName, topicName); - return new RedirectView("/"); - } - + /** + * Spring channel for incoming messages from Google Cloud Pub/Sub. + * + *

We use a {@link PublishSubscribeChannel} which broadcasts messages to every subscriber. In + * this case, every service activator. + */ @Bean public MessageChannel pubsubInputChannel() { return new PublishSubscribeChannel(); } + /** + * Inbound channel adapter that gets activated whenever a new message arrives at a Google Cloud + * Pub/Sub subscription. + * + *

Messages get posted to the specified input channel, which activates the service activators + * below. + * + * @param inputChannel Spring channel that receives messages and triggers attached service + * activators + * @param subscriberFactory creates the subscriber that listens to messages from Google Cloud + * Pub/Sub + * @return the inbound channel adapter for a Google Cloud Pub/Sub subscription + */ @Bean public PubsubInboundChannelAdapter messageChannelAdapter( @Qualifier("pubsubInputChannel") MessageChannel inputChannel, @@ -104,30 +81,46 @@ public PubsubInboundChannelAdapter messageChannelAdapter( return adapter; } + /** + * Message handler that gets triggered whenever a new message arrives at the attached Spring + * channel. + * + *

Just logs the received message. Message acknowledgement mode set to manual above, so the + * consumer that allows us to (n)ack is extracted from the message headers and used to ack. + */ @Bean @ServiceActivator(inputChannel = "pubsubInputChannel") public MessageHandler messageReceiver1() { return message -> { LOGGER.info("Message arrived! Payload: " + ((ByteString) message.getPayload()).toStringUtf8()); - AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get( - GcpHeaders.ACKNOWLEDGEMENT); + AckReplyConsumer consumer = + (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT); consumer.ack(); }; } + /** + * Second message handler that also gets messages from the same subscription as above. + */ @Bean @ServiceActivator(inputChannel = "pubsubInputChannel") public MessageHandler messageReceiver2() { return message -> { LOGGER.info("Message also arrived here! Payload: " + ((ByteString) message.getPayload()).toStringUtf8()); - AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get( - GcpHeaders.ACKNOWLEDGEMENT); + AckReplyConsumer consumer = + (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT); consumer.ack(); }; } + /** + * The outbound channel adapter to write messages from a Spring channel to a Google Cloud Pub/Sub + * topic. + * + * @param pubsubTemplate Spring abstraction to send messages to Google Cloud Pub/Sub topics + */ @Bean @ServiceActivator(inputChannel = "pubsubOutputChannel") public MessageHandler messageSender(PubsubTemplate pubsubTemplate) { @@ -136,6 +129,9 @@ public MessageHandler messageSender(PubsubTemplate pubsubTemplate) { return outboundAdapter; } + /** + * A Spring mechanism to write messages to a channel. + */ @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel") public interface PubsubOutboundGateway { diff --git a/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java b/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java new file mode 100644 index 00000000000..9c1283c40e6 --- /dev/null +++ b/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java @@ -0,0 +1,108 @@ +/* + * Copyright 2017 original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spring.pubsub; + +import com.example.spring.pubsub.PubsubApplication.PubsubOutboundGateway; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.util.List; +import java.util.stream.Collectors; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.gcp.pubsub.PubsubAdmin; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.view.RedirectView; + +@RestController +public class WebAppController { + + @Autowired + private PubsubOutboundGateway messagingGateway; + + @Autowired private PubsubAdmin admin; + + /** + * Lists every topic in the project. + * + * @return a list of the names of every topic in the project + */ + @GetMapping("/listTopics") + public List listTopics() { + return admin + .listTopics() + .stream() + .map(Topic::getNameAsTopicName) + .map(TopicName::getTopic) + .collect(Collectors.toList()); + } + + /** + * Lists every subscription in the project. + * + * @return a list of the names of every subscription in the project + */ + @GetMapping("/listSubscriptions") + public List listSubscriptions() { + return admin + .listSubscriptions() + .stream() + .map(Subscription::getNameAsSubscriptionName) + .map(SubscriptionName::getSubscription) + .collect(Collectors.toList()); + } + + /** + * Posts a message to a Google Cloud Pub/Sub topic, through Spring's messaging gateway, and + * redirects the user to the home page. + * + * @param message the message posted to the Pub/Sub topic + */ + @PostMapping("/postMessage") + public RedirectView addMessage(@RequestParam("message") String message) { + messagingGateway.sendToPubsub(message); + return new RedirectView("/"); + } + + /** + * Creates a new topic on Google Cloud Pub/Sub, through Spring's Pub/Sub admin class, and + * redirects the user to the home page. + * + * @param topicName the name of the new topic + */ + @PostMapping("/newTopic") + public RedirectView newTopic(@RequestParam("name") String topicName) { + admin.createTopic(topicName); + return new RedirectView("/"); + } + + /** + * Creates a new subscription on Google Cloud Pub/Sub, through Spring's Pub/Sub admin class, and + * redirects the user to the home page. + * + * @param topicName the name of the new subscription + */ + @PostMapping("/newSubscription") + public RedirectView newSubscription( + @RequestParam("name") String subscriptionName, @RequestParam("topic") String topicName) { + admin.createSubscription(subscriptionName, topicName); + return new RedirectView("/"); + } +} diff --git a/spring/integration/pubsub/src/main/resources/application.properties b/spring/pubsub/src/main/resources/application.properties similarity index 100% rename from spring/integration/pubsub/src/main/resources/application.properties rename to spring/pubsub/src/main/resources/application.properties diff --git a/spring/integration/pubsub/src/main/resources/static/index.html b/spring/pubsub/src/main/resources/static/index.html similarity index 62% rename from spring/integration/pubsub/src/main/resources/static/index.html rename to spring/pubsub/src/main/resources/static/index.html index 6e2593841e9..d5a1ad8675a 100644 --- a/spring/integration/pubsub/src/main/resources/static/index.html +++ b/spring/pubsub/src/main/resources/static/index.html @@ -1,4 +1,19 @@ + diff --git a/spring/pubsub/src/main/resources/static/js/pubsub.js b/spring/pubsub/src/main/resources/static/js/pubsub.js new file mode 100644 index 00000000000..288f07cb991 --- /dev/null +++ b/spring/pubsub/src/main/resources/static/js/pubsub.js @@ -0,0 +1,27 @@ +/* + * Copyright 2017 original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +angular.module('pubsub', []) + .controller('listTopics', function($scope, $http) { + $http.get('/listTopics').success(function(data) { + $scope.topics = data; + }); + }) + .controller('listSubscriptions', function($scope, $http) { + $http.get('/listSubscriptions').success(function(data) { + $scope.subscriptions = data; + }) + }); diff --git a/spring/integration/pubsub/src/main/wro/main.less b/spring/pubsub/src/main/wro/main.less similarity index 100% rename from spring/integration/pubsub/src/main/wro/main.less rename to spring/pubsub/src/main/wro/main.less diff --git a/spring/integration/pubsub/src/main/wro/wro.properties b/spring/pubsub/src/main/wro/wro.properties similarity index 100% rename from spring/integration/pubsub/src/main/wro/wro.properties rename to spring/pubsub/src/main/wro/wro.properties diff --git a/spring/integration/pubsub/src/main/wro/wro.xml b/spring/pubsub/src/main/wro/wro.xml similarity index 100% rename from spring/integration/pubsub/src/main/wro/wro.xml rename to spring/pubsub/src/main/wro/wro.xml From 43ef2e80c6cbdc7646da28931570856b45b9c7dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Andr=C3=A9=20Martins?= Date: Fri, 14 Jul 2017 11:01:51 -0400 Subject: [PATCH 5/5] Adds README and other smaller fixes --- spring/pubsub/README.md | 144 ++++++++++++++++++ .../spring/pubsub/PubsubApplication.java | 5 + .../spring/pubsub/WebAppController.java | 3 +- .../src/main/resources/application.properties | 1 + 4 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 spring/pubsub/README.md diff --git a/spring/pubsub/README.md b/spring/pubsub/README.md new file mode 100644 index 00000000000..ea3b076e5e9 --- /dev/null +++ b/spring/pubsub/README.md @@ -0,0 +1,144 @@ +# Getting started with Spring Integration channel adapters for Google Cloud Pub/Sub + +This is a sample application that uses Spring Integration and Spring Boot to read and write messages +to Google Cloud Pub/Sub. + +PubsubApplication is a typical Spring Boot application. We declare all the necessary beans for the +application to work in the `PubsubApplication` class. The most important ones are the inbound and +outbound channel adapters. + +## Channel adapters + +On Spring Integration, channel adapters are adapters that send or receive messages from external +systems, convert them to/from an internal Spring message representation and read/write them to a +Spring channel, which can then have other components attached to it, such as service activators. + +### Inbound channel adapter + +PubsubInboundChannelAdapter is Spring Cloud GCP Pub/Sub inbound channel adapter class. It's declared +in the user app as follows: + +``` +@Bean +public PubsubInboundChannelAdapter messageChannelAdapter( + @Qualifier("pubsubInputChannel") MessageChannel inputChannel, + SubscriberFactory subscriberFactory) { + PubsubInboundChannelAdapter adapter = + new PubsubInboundChannelAdapter(subscriberFactory, "messages"); + adapter.setOutputChannel(inputChannel); + adapter.setAckMode(AckMode.MANUAL); + + return adapter; +} +``` + +In the example, we instantiate the `PubsubInboundChannelAdapter` object with a SubscriberFactory and +a Google Cloud Pub/Sub subscription name, from where the adapter listens to messages, and then set +its output channel and ack mode. + +In apps which use the Spring Cloud GCP Pubsub Boot starter, a SubscriberFactory is automatically +provided. The subscription name (e.g., `"messages"`) is the name of a Google Cloud Pub/Sub +subscription that must already exist when the channel adapter is created. + +The input channel is a channel in which messages get into Spring from an external system. +In this example, we use a PublishSubscribeChannel, which broadcasts incoming messages to all its +subscribers, including service activators. + +``` +@Bean +public MessageChannel pubsubInputChannel() { + return new PublishSubscribeChannel(); +} +``` + +Setting the acknowledgement mode on the inbound channel adapter is optional. It is set to automatic +by default. If set to manual, messages must be explicitly acknowledged through the +`AckReplyConsumer` object from the Spring message header `GcpHeader.ACKNOWLEDGEMENT`. + +``` +AckReplyConsumer consumer = + (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT); +consumer.ack(); +``` + +A service activator is typically attached to a channel in order to process incoming messages. Here +is an example of a service activator that logs and acknowledges the received message. + +``` +@Bean +@ServiceActivator(inputChannel = "pubsubInputChannel") +public MessageHandler messageReceiver1() { + return message -> { + LOGGER.info("Message arrived! Payload: " + + ((ByteString) message.getPayload()).toStringUtf8()); + AckReplyConsumer consumer = + (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT); + consumer.ack(); + }; +} +``` + +### Outbound channel adapter + +PubSubMessageHandler is Spring Cloud GCP's Pub/Sub outbound channel adapter. It converts Spring +messages in a channel to an external representation and sends them to a Google Cloud Pub/Sub topic. + +``` +@Bean +@ServiceActivator(inputChannel = "pubsubOutputChannel") +public MessageHandler messageSender(PubsubTemplate pubsubTemplate) { + PubsubMessageHandler outboundAdapter = new PubsubMessageHandler(pubsubTemplate); + outboundAdapter.setTopic("test"); + return outboundAdapter; +} +``` + +`PubsubTemplate` is Spring Cloud GCP's abstraction to send messages to Google Cloud Pub/Sub. It +contains the logic to create a Google Cloud Pub/Sub `Publisher`, convert Spring messages to Google +Cloud Pub/Sub `PubsubMessage` and publish them to a topic. + +`PubsubMessageHandler` requires a `PubsubTemplate` to be instantiated. The Spring Cloud GCP Boot +Pubsub starter provides a pre-configured `PubsubTemplate`, ready to use. `PubsubMessageHandler` +also requires the name of a Google Cloud Pub/Sub topic, which must exist before any messages are +sent. + +We use a messaging gateway to write to a Spring channel. + +``` +@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel") +public interface PubsubOutboundGateway { + + void sendToPubsub(String text); +} +``` + +Spring auto-generates the output channel, as well as the gateway code and injects it to the local +variable in `WebAppController`. + +``` +@Autowired +private PubsubOutboundGateway messagingGateway; +``` + +## Administration + +The Spring Cloud GCP Pubsub package provides a Google Cloud Pub/Sub administration utility, +`PubsubAdmin`, to simplify the creation, listing and deletion of Google Cloud Pub/Sub topics and +subscriptions. The Spring Cloud GCP Pubsub starter provides a pre-configured `PubsubAdmin`, based on +an application's properties. + +``` +@Autowired +private PubsubAdmin admin; +``` + +## Sample application + +This sample application uses Spring Boot and Spring Web to declare a REST controller. The front-end +uses client-side scripting with Angular. + +It is exemplified how to: +* Send messages to a Google Cloud Pub/Sub topic through an outbound channel adapter; +* Receive and process messages from a Google Cloud Pub/Sub subscription through an inbound channel +adapter; +* Create new Google Cloud Pub/Sub topics and subscriptions through the Pub/Sub admin utility. diff --git a/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java b/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java index 48c40a0587a..047bd9490f8 100644 --- a/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java +++ b/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java @@ -45,6 +45,9 @@ public class PubsubApplication { public static void main(String[] args) throws IOException { SpringApplication.run(PubsubApplication.class, args); } + + // Inbound channel adapter. + /** * Spring channel for incoming messages from Google Cloud Pub/Sub. * @@ -115,6 +118,8 @@ public MessageHandler messageReceiver2() { }; } + // Outbound channel adapter + /** * The outbound channel adapter to write messages from a Spring channel to a Google Cloud Pub/Sub * topic. diff --git a/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java b/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java index 9c1283c40e6..bcd4f534423 100644 --- a/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java +++ b/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java @@ -37,7 +37,8 @@ public class WebAppController { @Autowired private PubsubOutboundGateway messagingGateway; - @Autowired private PubsubAdmin admin; + @Autowired + private PubsubAdmin admin; /** * Lists every topic in the project. diff --git a/spring/pubsub/src/main/resources/application.properties b/spring/pubsub/src/main/resources/application.properties index db7041b911c..1080c9b04bc 100644 --- a/spring/pubsub/src/main/resources/application.properties +++ b/spring/pubsub/src/main/resources/application.properties @@ -2,4 +2,5 @@ #spring.cloud.gcp.credentialsLocation=file:[LOCAL_PATH_TO_CREDENTIALS] # #spring.cloud.gcp.pubsub.subscriber.executorThreads=[SUBSCRIBER_THREADS] +#spring.cloud.gcp.pubsub.subscriber.ackDeadline=[ACK_DEADLINE_SECONDS] #spring.cloud.gcp.pubsub.publisher.executorThreads=[PUBLISHER_THREADS]