Skip to content

Commit

Permalink
Google Pub/Sub: Configurable host (for ordering) (#2879)
Browse files Browse the repository at this point in the history
  • Loading branch information
LukBed authored Jul 1, 2022
1 parent 2767744 commit d27748d
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,20 @@ private[pubsub] trait PubSubApi {
}
}.withDefaultRetry

private def pool[T: FromResponseUnmarshaller, Ctx](parallelism: Int)(
private def pool[T: FromResponseUnmarshaller, Ctx](parallelism: Int, host: Option[String])(
implicit system: ActorSystem
): FlowWithContext[HttpRequest, Ctx, Try[T], Ctx, Future[HostConnectionPool]] =
GoogleHttp().cachedHostConnectionPoolWithContext[T, Ctx](
PubSubGoogleApisHost,
host.getOrElse(PubSubGoogleApisHost),
PubSubGoogleApisPort,
https = !isEmulated,
authenticate = !isEmulated,
parallelism = parallelism
)

def publish[T](topic: String, parallelism: Int): FlowWithContext[PublishRequest, T, PublishResponse, T, NotUsed] =
def publish[T](topic: String,
parallelism: Int,
host: Option[String]): FlowWithContext[PublishRequest, T, PublishResponse, T, NotUsed] =
FlowWithContext.fromTuples {
Flow
.fromMaterializer { (mat, attr) =>
Expand All @@ -247,13 +249,16 @@ private[pubsub] trait PubSubApi {
HttpRequest(POST, url, entity = entity)
}(ExecutionContexts.parasitic)
}
.via(pool[PublishResponse, T](parallelism))
.via(pool[PublishResponse, T](parallelism, host))
.map(_.get)
.asFlow
}
.mapMaterializedValue(_ => NotUsed)
}

def publish[T](topic: String, parallelism: Int): FlowWithContext[PublishRequest, T, PublishResponse, T, NotUsed] =
publish(topic, parallelism, None)

private implicit val publishResponseUnmarshaller: FromResponseUnmarshaller[PublishResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
response.status match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,37 @@

package akka.stream.alpakka.googlecloud.pubsub.javadsl

import java.util.concurrent.CompletionStage

import akka.actor.Cancellable
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.{GooglePubSub => GPubSub}
import akka.stream.alpakka.googlecloud.pubsub.{AcknowledgeRequest, PubSubConfig, PublishRequest, ReceivedMessage}
import akka.stream.javadsl.{Flow, FlowWithContext, Sink, Source}
import akka.{Done, NotUsed}

import scala.jdk.CollectionConverters._
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

/**
* Java DSL for Google Pub/Sub
*/
object GooglePubSub {

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
* @param overrideHost publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publish(topic: String,
config: PubSubConfig,
overrideHost: String,
parallelism: Int): Flow[PublishRequest, java.util.List[String], NotUsed] =
GPubSub
.publish(topic, config, overrideHost, parallelism)
.map(response => response.asJava)
.asJava

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
*/
Expand All @@ -32,6 +46,22 @@ object GooglePubSub {
.map(response => response.asJava)
.asJava

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
* @param overrideHost publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publishWithContext[C](topic: String,
config: PubSubConfig,
overrideHost: String,
parallelism: Int): FlowWithContext[PublishRequest, C, java.util.List[String], C, NotUsed] =
GPubSub
.publishWithContext[C](topic, config, overrideHost, parallelism)
.map(response => response.asJava)
.asJava

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,77 @@ object GooglePubSub extends GooglePubSub {
protected[pubsub] trait GooglePubSub {
private[pubsub] def httpApi: PubSubApi

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
* @param overrideHost publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publish(topic: String,
config: PubSubConfig,
overrideHost: String,
parallelism: Int): Flow[PublishRequest, immutable.Seq[String], NotUsed] =
internalPublish(topic, config, Some(overrideHost), parallelism)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
* @param overrideHost publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publish(topic: String,
config: PubSubConfig,
overrideHost: String): Flow[PublishRequest, immutable.Seq[String], NotUsed] =
internalPublish(topic, config, Some(overrideHost), parallelism = 1)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
*/
def publish(topic: String,
config: PubSubConfig,
parallelism: Int = 1): Flow[PublishRequest, immutable.Seq[String], NotUsed] =
internalPublish(topic, config, None, parallelism)

private def internalPublish(topic: String,
config: PubSubConfig,
overrideHost: Option[String],
parallelism: Int): Flow[PublishRequest, immutable.Seq[String], NotUsed] =
Flow[PublishRequest]
.map((_, ()))
.via(
publishWithContext[Unit](topic, config, parallelism).asFlow
internalPublishWithContext[Unit](topic, config, overrideHost, parallelism).asFlow
)
.map(_._1)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
* @param overrideHost publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publishWithContext[C](
topic: String,
config: PubSubConfig,
overrideHost: String,
parallelism: Int
): FlowWithContext[PublishRequest, C, immutable.Seq[String], C, NotUsed] =
internalPublishWithContext(topic, config, Some(overrideHost), parallelism)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
* @param overrideHost publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publishWithContext[C](
topic: String,
config: PubSubConfig,
overrideHost: String
): FlowWithContext[PublishRequest, C, immutable.Seq[String], C, NotUsed] =
internalPublishWithContext(topic, config, Some(overrideHost), parallelism = 1)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
Expand All @@ -48,10 +106,20 @@ protected[pubsub] trait GooglePubSub {
topic: String,
config: PubSubConfig,
parallelism: Int = 1
): FlowWithContext[PublishRequest, C, immutable.Seq[String], C, NotUsed] =
internalPublishWithContext(topic, config, None, parallelism)

private def internalPublishWithContext[C](
topic: String,
config: PubSubConfig,
overrideHost: Option[String],
parallelism: Int
): FlowWithContext[PublishRequest, C, immutable.Seq[String], C, NotUsed] =
// some wrapping back and forth as FlowWithContext doesn't offer `setup`
// https://github.com/akka/akka/issues/27883
FlowWithContext.fromTuples(flow(config)(httpApi.publish[C](topic, parallelism).asFlow)).map(_.messageIds)
FlowWithContext
.fromTuples(flow(config)(httpApi.publish[C](topic, parallelism, overrideHost).asFlow))
.map(_.messageIds)

/**
* Creates a source pulling messages from a subscription.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ private static void example() throws NoSuchAlgorithmException, InvalidKeySpecExc
// #publish-fast

// #publish with ordering key
// to provide ordering messages must be sent to the same regional endpoint
Flow<PublishRequest, List<String>, NotUsed> publishToRegionalEndpointFlow =
GooglePubSub.publish(topic, config, "europe-west1-pubsub.googleapis.com", 1);

PublishMessage publishMessageWithOrderingKey =
PublishMessage.create(
new String(Base64.getEncoder().encode("Hello Google!".getBytes())),
Expand All @@ -85,7 +89,7 @@ private static void example() throws NoSuchAlgorithmException, InvalidKeySpecExc
PublishRequest.create(Lists.newArrayList(publishMessage));

CompletionStage<List<List<String>>> publishedMessageWithOrderingKeyIds =
source.via(publishFlow).runWith(Sink.seq(), system);
source.via(publishToRegionalEndpointFlow).runWith(Sink.seq(), system);
// #publish with ordering key

// #subscribe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,45 @@ class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with ScalaFutures
result.futureValue._2 shouldBe (())
}

it should "publish to overridden host" in {
val httpApiWithHostToOverride = new PubSubApi {
val isEmulated = false
val PubSubGoogleApisHost = "invalid-host" //this host must be override to complete the test
val PubSubGoogleApisPort = wiremockServer.httpsPort()
}

val publishMessage =
PublishMessage(
data = new String(Base64.getEncoder.encode("Hello Google!".getBytes))
)

val publishRequest = PublishRequest(Seq(publishMessage))

val expectedPublishRequest =
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ=="}]}"""
val publishResponse = """{"messageIds":["1"]}"""

mock.register(
WireMock
.post(
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false")
)
.withRequestBody(WireMock.equalToJson(expectedPublishRequest))
.withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(
aResponse()
.withStatus(200)
.withBody(publishResponse)
.withHeader("Content-Type", "application/json")
)
)
val flow = httpApiWithHostToOverride.publish[Unit]("topic1", 1, Some("localhost"))
val result =
Source.single((publishRequest, ())).via(flow).toMat(Sink.head)(Keep.right).run()
result.futureValue._1.messageIds shouldBe Seq("1")
result.futureValue._2 shouldBe (())
}

it should "publish without Authorization header to emulator" in {

val publishMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ class ExampleUsage {
//#publish-fast

//#publish with ordering key
val messageWithOrderingKey =
val publishToRegionalEndpointFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config, "europe-west1-pubsub.googleapis.com")
val messageWithOrderingKey: PublishMessage =
PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)), None, Some("my-ordering-key"))
val publishedMessageWithOrderingKeyIds: Future[Seq[Seq[String]]] = Source
.single(PublishRequest(Seq(messageWithOrderingKey)))
.via(publishFlow)
.via(publishToRegionalEndpointFlow)
.runWith(Sink.seq)
//#publish with ordering key

Expand Down

0 comments on commit d27748d

Please sign in to comment.