Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCP Pub/Sub transporter #1089

Merged
merged 19 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ ClientBin/
*.publishsettings
node_modules/
bower_components/
.vscode/
.editorconfig
omnisharp.json

# RIA/Silverlight projects
Generated_Code/
Expand Down
25 changes: 24 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,35 @@ services:
- POSTGRES_DATABASE=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres

gcp-pubsub:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
ports:
- "8085:8085"
command:
[
"gcloud",
"--quiet",
"beta",
"emulators",
"pubsub",
"start",
"--host-port",
"0.0.0.0:8085",
"--project",
"wolverine",
"--verbosity",
"debug",
"--log-http",
"--user-output-enabled",
]

rabbitmq:
image: "rabbitmq:management"
ports:
- "5672:5672"
- "15672:15672"

sqlserver:
image: "mcr.microsoft.com/azure-sql-edge"
ports:
Expand All @@ -27,14 +50,14 @@ services:
- "ACCEPT_EULA=Y"
- "SA_PASSWORD=P@55w0rd"
- "MSSQL_PID=Developer"

pulsar:
image: "apachepulsar/pulsar:latest"
ports:
- "6650:6650"
- "8080:8080"
command: bin/pulsar standalone


localstack:
image: localstack/localstack:stable
ports:
Expand Down
50 changes: 50 additions & 0 deletions docs/guide/messaging/transports/gcp-pubsub/conventional-routing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Conventional Message Routing

You can have Wolverine automatically determine message routing to GCP Pub/Sub
based on conventions as shown in the code snippet below. By default, this approach assumes that
each outgoing message type should be sent to topic named with the [message type name](/guide/messages.html#message-type-name-or-alias) for that
message type.

Likewise, Wolverine sets up a listener for a topic named similarly for each message type known
to be handled by the application.

<!-- snippet: sample_conventional_routing_for_pubsub -->
<a id='snippet-sample_conventional_routing_for_pubsub'></a>
```cs
var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UsePubsub("your-project-id")
.UseConventionalRouting(convention =>
{

// Optionally override the default queue naming scheme
convention.TopicNameForSender(t => t.Namespace)

// Optionally override the default queue naming scheme
.QueueNameForListener(t => t.Namespace)

// Fine tune the conventionally discovered listeners
.ConfigureListeners((listener, builder) =>
{
var messageType = builder.MessageType;
var runtime = builder.Runtime; // Access to basically everything

// customize the new queue
listener.CircuitBreaker(queue => { });

// other options...
})

// Fine tune the conventionally discovered sending endpoints
.ConfigureSending((subscriber, builder) =>
{
// Similarly, use the message type and/or wolverine runtime
// to customize the message sending
});

});
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L135-L168' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_routing_for_pubsub' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
66 changes: 66 additions & 0 deletions docs/guide/messaging/transports/gcp-pubsub/deadlettering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Dead Lettering

By default, Wolverine dead lettering is disabled for GCP Pub/Sub transport and Wolverine uses any persistent envelope storage for dead lettering. You can opt in to Wolverine dead lettering through GCP Pub/Sub globally as shown below.

<!-- snippet: sample_enable_wolverine_dead_lettering_for_pubsub -->
<a id='snippet-sample_enable_wolverine_dead_lettering_for_pubsub'></a>
```cs
var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UsePubsub("your-project-id")

// Enable dead lettering for all Wolverine endpoints
.EnableDeadLettering(

// Optionally configure how the GCP Pub/Sub dead letter itself
// is created by Wolverine
options =>
{
options.Topic.MessageRetentionDuration =
Duration.FromTimeSpan(TimeSpan.FromDays(14));

options.Subscription.MessageRetentionDuration =
Duration.FromTimeSpan(TimeSpan.FromDays(14));
}

);
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L177-L197' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_enable_wolverine_dead_lettering_for_pubsub' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

When enabled, Wolverine will try to move dead letter messages in GCP Pub/Sub to a single, global topic named "wlvrn.dead-letter".

That can be overridden on a single endpoint at a time (or by conventions too of course) like:

<!-- snippet: sample_configuring_wolverine_dead_lettering_for_pubsub -->
<a id='snippet-sample_configuring_wolverine_dead_lettering_for_pubsub'></a>
```cs
var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UsePubsub("your-project-id")
.EnableDeadLettering();

// No dead letter queueing
opts.ListenToPubsubTopic("incoming")
.DisableDeadLettering();

// Use a different dead letter queue
opts.ListenToPubsubTopic("important")
.ConfigureDeadLettering(
"important_errors",

// Optionally configure how the dead letter itself
// is built by Wolverine
e =>
{
e.TelemetryEnabled = true;
}

);
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L206-L229' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_wolverine_dead_lettering_for_pubsub' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
80 changes: 80 additions & 0 deletions docs/guide/messaging/transports/gcp-pubsub/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Using Google Cloud Platform (GCP) Pub/Sub

::: tip
Wolverine.AzureServiceBus is able to support inline, buffered, or durable endpoints.
:::

Wolverine supports [GCP Pub/Sub](https://cloud.google.com/pubsub) as a messaging transport through the WolverineFx.Pubsub package.

## Connecting to the Broker

After referencing the Nuget package, the next step to using GCP Pub/Sub within your Wolverine application is to connect to the service broker using the `UsePubsub()` extension method.

If you are running on Google Cloud or with Application Default Credentials (ADC), you just need to supply [your GCP project id](https://support.google.com/googleapi/answer/7014113):

<!-- snippet: sample_basic_setup_to_pubsub -->
<a id='snippet-sample_basic_setup_to_pubsub'></a>
```cs
var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UsePubsub("your-project-id")

// Let Wolverine create missing topics and subscriptions as necessary
.AutoProvision()

// Optionally purge all subscriptions on application startup.
// Warning though, this is potentially slow
.AutoPurgeOnStartup();

}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L17-L29' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_basic_setup_to_pubsub' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

If you'd like to connect to a GCP Pub/Sub emulator running on your development box,
you set emulator detection throught this helper:

<!-- snippet: sample_connect_to_pubsub_emulator -->
<a id='snippet-sample_connect_to_pubsub_emulator'></a>
```cs
var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UsePubsub("your-project-id")

// Tries to use GCP Pub/Sub emulator, as it defaults
// to EmulatorDetection.EmulatorOrProduction. But you can
// supply your own, like EmulatorDetection.EmulatorOnly
.UseEmulatorDetection();

}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L38-L48' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_connect_to_pubsub_emulator' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Request/Reply

[Request/reply](https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html) mechanics (`IMessageBus.InvokeAsync<T>()`) are possible with the GCP Pub/Sub transport *if* Wolverine has the ability to auto-provision a specific response topic and subscription for each node. That topic and subscription would be named like `wlvrn.response.[application node id]` if you happen to notice that in your GCP Pub/Sub.

### Enable System Endpoints

If your application has permissions to create topics and subscriptions in GCP Pub/Sub, you can enable system endpoints and opt in to the request/reply mechanics mentioned above.

<!-- snippet: sample_enable_system_endpoints_in_pubsub -->
<a id='snippet-sample_enable_system_endpoints_in_pubsub'></a>
```cs
var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.AutoProvision().AutoPurgeOnStartup()
.EnableSystemEndpoints();

opts.ListenToAzureServiceBusQueue("send_and_receive");

opts.PublishAllMessages().ToAzureServiceBusQueue("send_and_receive");
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L57-L63' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_enable_system_endpoints_in_pubsub' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
78 changes: 78 additions & 0 deletions docs/guide/messaging/transports/gcp-pubsub/interoperability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Interoperability

Hey, it's a complicated world and Wolverine is a relative newcomer, so it's somewhat likely you'll find yourself needing to make a Wolverine application talk via GCP Pub/Sub to
a non-Wolverine application. Not to worry (too much), Wolverine has you covered with the ability to customize Wolverine to GCP Pub/Sub mapping.

You can create interoperability with non-Wolverine applications by writing a custom `IPubsubEnvelopeMapper`
as shown in the following sample:

<!-- snippet: sample_custom_pubsub_mapper -->
<a id='snippet-sample_custom_pubsub_mapper'></a>
```cs
public class CustomPubsubMapper : EnvelopeMapper<ReceivedMessage, PubsubMessage>, IPubsubEnvelopeMapper
{
public CustomPubsubMapper(PubsubEndpoint endpoint) : base(endpoint) { }

public void MapIncomingToEnvelope(PubsubEnvelope envelope, ReceivedMessage incoming)
{
envelope.AckId = incoming.AckId;

// You will have to help Wolverine out by either telling Wolverine
// what the message type is, or by reading the actual message object,
// or by telling Wolverine separately what the default message type
// is for a listening endpoint
envelope.MessageType = typeof(Message1).ToMessageTypeName();

}

public void MapOutgoingToMessage(OutgoingMessageBatch outgoing, PubsubMessage message)
{
message.Data = ByteString.CopyFrom(outgoing.Data);
}

protected override void writeOutgoingHeader(PubsubMessage outgoing, string key, string value)
{
outgoing.Attributes[key] = value;
}

protected override void writeIncomingHeaders(ReceivedMessage incoming, Envelope envelope)
{
if (incoming.Message.Attributes is null) return;

foreach (var pair in incoming.Message.Attributes) envelope.Headers[pair.Key] = pair.Value?.ToString();
}

protected override bool tryReadIncomingHeader(ReceivedMessage incoming, string key, out string? value)
{
if (incoming.Message.Attributes.TryGetValue(key, out var header))
{
value = header.ToString();

return true;
}

value = null;

return false;
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L253-L299' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom_pubsub_mapper' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

To apply that mapper to specific endpoints, use this syntax on any type of GCP Pub/Sub endpoint:

<!-- snippet: sample_configuring_custom_envelope_mapper_for_pubsub -->
<a id='snippet-sample_configuring_custom_envelope_mapper_for_pubsub'></a>
```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UsePubsub("your-project-id")
.UseConventionalRouting()
.ConfigureListeners(l => l.InteropWith(e => new CustomPubsubMapper(e)))
.ConfigureSenders(s => s.InteropWith(e => new CustomPubsubMapper(e)));
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L238-L245' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_custom_envelope_mapper_for_pubsub' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
39 changes: 39 additions & 0 deletions docs/guide/messaging/transports/gcp-pubsub/listening.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Listening

Setting up Wolverine listeners and GCP Pub/Sub subscriptions for GCP Pub/Sub topics is shown below:

<!-- snippet: sample_listen_to_pubsub_topic -->
<a id='snippet-sample_listen_to_pubsub_topic'></a>
```cs
var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UsePubsub("your-project-id");

opts.ListenToPubsubTopic("incoming1");

opts.ListenToPubsubTopic("incoming2")

// You can optimize the throughput by running multiple listeners
// in parallel
.ListenerCount(5)

.ConfigurePubsubSubscription(options =>
{

// Optionally configure the subscription itself
options.DeadLetterPolicy = new() {
DeadLetterTopic = "errors",
MaxDeliveryAttempts = 5
};
options.AckDeadlineSeconds = 60;
options.RetryPolicy = new() {
MinimumBackoff = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
MaximumBackoff = Duration.FromTimeSpan(TimeSpan.FromSeconds(10))
};

});
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/GCP/Wolverine.Pubsub.Tests/DocumentationSamples.cs#L72-L100' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_listen_to_pubsub_topic' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
Loading
Loading