Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Google Pub/Sub: Configurable host (for ordering) #2879

Merged
merged 2 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,20 @@ private[pubsub] trait PubSubApi {
}
}.withDefaultRetry

private def pool[T: FromResponseUnmarshaller, Ctx](parallelism: Int)(
private def pool[T: FromResponseUnmarshaller, Ctx](parallelism: Int, host: Option[String])(
implicit system: ActorSystem
): FlowWithContext[HttpRequest, Ctx, Try[T], Ctx, Future[HostConnectionPool]] =
GoogleHttp().cachedHostConnectionPoolWithContext[T, Ctx](
PubSubGoogleApisHost,
host.getOrElse(PubSubGoogleApisHost),
PubSubGoogleApisPort,
https = !isEmulated,
authenticate = !isEmulated,
parallelism = parallelism
)

def publish[T](topic: String, parallelism: Int): FlowWithContext[PublishRequest, T, PublishResponse, T, NotUsed] =
def publish[T](topic: String,
parallelism: Int,
host: Option[String]): FlowWithContext[PublishRequest, T, PublishResponse, T, NotUsed] =
FlowWithContext.fromTuples {
Flow
.fromMaterializer { (mat, attr) =>
Expand All @@ -247,13 +249,16 @@ private[pubsub] trait PubSubApi {
HttpRequest(POST, url, entity = entity)
}(ExecutionContexts.parasitic)
}
.via(pool[PublishResponse, T](parallelism))
.via(pool[PublishResponse, T](parallelism, host))
.map(_.get)
.asFlow
}
.mapMaterializedValue(_ => NotUsed)
}

def publish[T](topic: String, parallelism: Int): FlowWithContext[PublishRequest, T, PublishResponse, T, NotUsed] =
publish(topic, parallelism, None)

private implicit val publishResponseUnmarshaller: FromResponseUnmarshaller[PublishResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
response.status match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.alpakka.googlecloud.pubsub.javadsl

import java.util.concurrent.CompletionStage

import akka.actor.Cancellable
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.{GooglePubSub => GPubSub}
import akka.stream.alpakka.googlecloud.pubsub.{AcknowledgeRequest, PubSubConfig, PublishRequest, ReceivedMessage}
Expand All @@ -14,6 +13,7 @@ import akka.{Done, NotUsed}

import scala.jdk.CollectionConverters._
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.concurrent.Future

/**
Expand All @@ -23,15 +23,27 @@ object GooglePubSub {

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
* @param overrideHost if present publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publish(topic: String,
config: PubSubConfig,
overrideHost: java.util.Optional[String],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't String be enough? If you don't want to pass a host you'd use the other overload?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, changed

parallelism: Int): Flow[PublishRequest, java.util.List[String], NotUsed] =
GPubSub
.publish(topic, config, parallelism)
.publish(topic, config, overrideHost.asScala, parallelism)
.map(response => response.asJava)
.asJava

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
*/
def publish(topic: String,
config: PubSubConfig,
parallelism: Int): Flow[PublishRequest, java.util.List[String], NotUsed] =
publish(topic, config, java.util.Optional.empty(), parallelism)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,83 @@ protected[pubsub] trait GooglePubSub {

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
* @param overrideHost if present publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publish(topic: String,
config: PubSubConfig,
parallelism: Int = 1): Flow[PublishRequest, immutable.Seq[String], NotUsed] =
overrideHost: Option[String],
parallelism: Int): Flow[PublishRequest, immutable.Seq[String], NotUsed] =
Flow[PublishRequest]
.map((_, ()))
.via(
publishWithContext[Unit](topic, config, parallelism).asFlow
publishWithContext[Unit](topic, config, overrideHost, parallelism).asFlow
)
.map(_._1)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
* @param overrideHost if present publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publish(topic: String,
config: PubSubConfig,
overrideHost: Option[String]): Flow[PublishRequest, immutable.Seq[String], NotUsed] =
publish(topic, config, overrideHost, parallelism = 1)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids.
*/
def publish(topic: String,
config: PubSubConfig,
parallelism: Int = 1): Flow[PublishRequest, immutable.Seq[String], NotUsed] =
publish(topic, config, None, parallelism)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
* @param overrideHost if present publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publishWithContext[C](
topic: String,
config: PubSubConfig,
parallelism: Int = 1
overrideHost: Option[String],
parallelism: Int
): FlowWithContext[PublishRequest, C, immutable.Seq[String], C, NotUsed] =
// some wrapping back and forth as FlowWithContext doesn't offer `setup`
// https://github.com/akka/akka/issues/27883
FlowWithContext.fromTuples(flow(config)(httpApi.publish[C](topic, parallelism).asFlow)).map(_.messageIds)
FlowWithContext
.fromTuples(flow(config)(httpApi.publish[C](topic, parallelism, overrideHost).asFlow))
.map(_.messageIds)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
* @param overrideHost if present publish message will be sent to specific host,
* can be used to send message to specific regional endpoint,
* which can be important when ordering is enabled
*/
def publishWithContext[C](
topic: String,
config: PubSubConfig,
overrideHost: Option[String]
): FlowWithContext[PublishRequest, C, immutable.Seq[String], C, NotUsed] =
publishWithContext(topic, config, overrideHost, parallelism = 1)

/**
* Creates a flow to that publishes messages to a topic and emits the message ids and carries a context
* through.
*/
def publishWithContext[C](
topic: String,
config: PubSubConfig,
parallelism: Int = 1
): FlowWithContext[PublishRequest, C, immutable.Seq[String], C, NotUsed] =
publishWithContext(topic, config, None, parallelism)

/**
* Creates a source pulling messages from a subscription.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ private static void example() throws NoSuchAlgorithmException, InvalidKeySpecExc
// #publish-fast

// #publish with ordering key
// to provide ordering messages must be sent to the same regional endpoint
Flow<PublishRequest, List<String>, NotUsed> publishToRegionalEndpointFlow =
GooglePubSub.publish(topic, config, Optional.of("europe-west1-pubsub.googleapis.com"), 1);

PublishMessage publishMessageWithOrderingKey =
PublishMessage.create(
new String(Base64.getEncoder().encode("Hello Google!".getBytes())),
Expand All @@ -85,7 +89,7 @@ private static void example() throws NoSuchAlgorithmException, InvalidKeySpecExc
PublishRequest.create(Lists.newArrayList(publishMessage));

CompletionStage<List<List<String>>> publishedMessageWithOrderingKeyIds =
source.via(publishFlow).runWith(Sink.seq(), system);
source.via(publishToRegionalEndpointFlow).runWith(Sink.seq(), system);
// #publish with ordering key

// #subscribe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,45 @@ class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with ScalaFutures
result.futureValue._2 shouldBe (())
}

it should "publish to overridden host" in {
val httpApiWithHostToOverride = new PubSubApi {
val isEmulated = false
val PubSubGoogleApisHost = "invalid-host" //this host must be override to complete the test
val PubSubGoogleApisPort = wiremockServer.httpsPort()
}

val publishMessage =
PublishMessage(
data = new String(Base64.getEncoder.encode("Hello Google!".getBytes))
)

val publishRequest = PublishRequest(Seq(publishMessage))

val expectedPublishRequest =
"""{"messages":[{"data":"SGVsbG8gR29vZ2xlIQ=="}]}"""
val publishResponse = """{"messageIds":["1"]}"""

mock.register(
WireMock
.post(
urlEqualTo(s"/v1/projects/${TestCredentials.projectId}/topics/topic1:publish?prettyPrint=false")
)
.withRequestBody(WireMock.equalToJson(expectedPublishRequest))
.withHeader("Authorization", WireMock.equalTo("Bearer " + accessToken))
.willReturn(
aResponse()
.withStatus(200)
.withBody(publishResponse)
.withHeader("Content-Type", "application/json")
)
)
val flow = httpApiWithHostToOverride.publish[Unit]("topic1", 1, Some("localhost"))
val result =
Source.single((publishRequest, ())).via(flow).toMat(Sink.head)(Keep.right).run()
result.futureValue._1.messageIds shouldBe Seq("1")
result.futureValue._2 shouldBe (())
}

it should "publish without Authorization header to emulator" in {

val publishMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ class ExampleUsage {
//#publish-fast

//#publish with ordering key
val messageWithOrderingKey =
val publishToRegionalEndpointFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config, Some("europe-west1-pubsub.googleapis.com"))
val messageWithOrderingKey: PublishMessage =
PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)), None, Some("my-ordering-key"))
val publishedMessageWithOrderingKeyIds: Future[Seq[Seq[String]]] = Source
.single(PublishRequest(Seq(messageWithOrderingKey)))
.via(publishFlow)
.via(publishToRegionalEndpointFlow)
.runWith(Sink.seq)
//#publish with ordering key

Expand Down