From 81a07778c19b7a4a6c2543cd7f2263310624be94 Mon Sep 17 00:00:00 2001 From: Guillem Date: Thu, 15 Feb 2024 21:05:02 -0300 Subject: [PATCH] add more destination parameters --- README.md | 31 ++++++++++++++---------- config.go | 33 ++++++++++++++++++++++---- destination.go | 28 ++++++++++++++++------ destination_test.go | 7 +++--- paramgen_dest.go | 58 +++++++++++++++++++++++++++++++++++++++++---- 5 files changed, 124 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index af9de00..4d5c590 100644 --- a/README.md +++ b/README.md @@ -28,15 +28,22 @@ The destination connector sends data from upstream systems to RabbitMQ via Condu ### Configuration Parameters -| Name | Description | Required | Default Value | -|------------------------|---------------------------------------------------------------------|----------|---------------| -| `url` | The RabbitMQ server's URL. | Yes | | -| `queueName` | The name of the RabbitMQ queue where messages will be published to. | Yes | | -| `clientCert` | Path to the client certificate for TLS. | No | | -| `clientKey` | Path to the client's key for TLS. | No | | -| `caCert` | Path to the CA (Certificate Authority) certificate for TLS. | No | | -| `contentType` | The MIME content type of the messages written to RabbitMQ. | No | `text/plain` | -| `exchangeName` | The name of the exchange to publish to. | No | | -| `exchangeType` | The type of the exchange to publish to. | No | `direct` | -| `routingKey` | The routing key to use when publishing to an exchange. | No | | - +| Name | Description | Required | Default Value | +|-------------------------|---------------------------------------------------------------------|----------|---------------| +| `url` | The RabbitMQ server's URL. | Yes | | +| `queueName` | The name of the RabbitMQ queue where messages will be published to. | Yes | | +| `clientCert` | Path to the client certificate for TLS. | No | | +| `clientKey` | Path to the client's key for TLS. | No | | +| `caCert` | Path to the CA (Certificate Authority) certificate for TLS. | No | | +| `contentType` | The MIME content type of the messages written to RabbitMQ. | No | `text/plain` | +| `exchange.name` | The name of the exchange to publish to. | No | | +| `exchange.type` | The type of the exchange to publish to. | No | `direct` | +| `exchange.durable` | Specifies whether the exchange is durable. | No | `true` | +| `exchange.autoDelete` | If the exchange will auto-delete. | No | `false` | +| `exchange.internal` | If the exchange is internal. | No | `false` | +| `exchange.noWait` | If the exchange is declared without waiting for server reply. | No | `false` | +| `queue.durable` | Specifies whether the queue is durable. | No | `true` | +| `queue.autoDelete` | If the queue will auto-delete. | No | `false` | +| `queue.exclusive` | If the queue is exclusive. | No | `false` | +| `queue.noWait` | If the queue is declared without waiting for server reply. | No | `false` | +| `routingKey` | The routing key to use when publishing to an exchange. | No | | diff --git a/config.go b/config.go index 5335ade..1530c08 100644 --- a/config.go +++ b/config.go @@ -44,17 +44,40 @@ type SourceConfig struct { Config } +type QueueConfig struct { + // Durable indicates if the queue will survive broker restarts. + Durable bool `json:"durable" default:"true"` + // AutoDelete indicates if the queue will be deleted when there are no more consumers. + AutoDelete bool `json:"autoDelete" default:"false"` + // Exclusive indicates if the queue can be accessed by other connections. + Exclusive bool `json:"exclusive" default:"false"` + // NoWait indicates if the queue should be declared without waiting for server confirmation. + NoWait bool `json:"noWait" default:"false"` +} + +type ExchangeConfig struct { + // Name is the name of the exchange. + Name string `json:"name"` + // Type is the type of the exchange (e.g., direct, fanout, topic, headers). + Type string `json:"type"` + // Durable indicates if the exchange will survive broker restarts. + Durable bool `json:"durable" default:"true"` + // AutoDelete indicates if the exchange will be deleted when the last queue is unbound from it. + AutoDelete bool `json:"autoDelete" default:"false"` + // Internal indicates if the exchange is used for internal purposes and cannot be directly published to by a client. + Internal bool `json:"internal" default:"false"` + // NoWait indicates if the exchange should be declared without waiting for server confirmation. + NoWait bool `json:"noWait" default:"false"` +} + type DestinationConfig struct { Config // ContentType is the MIME content type of the messages written to rabbitmq ContentType string `json:"contentType" default:"text/plain"` - // ExchangeName is the name of the exchange to publish to - ExchangeName string `json:"exchangeName" default:""` - - // ExchangeType is the type of the exchange to publish to - ExchangeType string `json:"exchangeType" default:"direct"` + Queue QueueConfig `json:"queue"` + Exchange ExchangeConfig `json:"exchange"` // RoutingKey is the routing key to use when publishing to an exchange RoutingKey string `json:"routingKey" default:""` diff --git a/destination.go b/destination.go index 190e3b8..206d299 100644 --- a/destination.go +++ b/destination.go @@ -50,7 +50,7 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) (err return fmt.Errorf("invalid config: %w", err) } - d.exchange = d.config.ExchangeName + d.exchange = d.config.Exchange.Name d.routingKey = d.config.RoutingKey if d.exchange == "" { d.routingKey = d.config.QueueName @@ -83,24 +83,38 @@ func (d *Destination) Open(ctx context.Context) (err error) { } sdk.Logger(ctx).Debug().Msgf("opened channel") - _, err = d.ch.QueueDeclare(d.config.QueueName, false, false, false, false, nil) + _, err = d.ch.QueueDeclare( + d.config.QueueName, + d.config.Queue.Durable, + d.config.Queue.AutoDelete, + d.config.Queue.Exclusive, + d.config.Queue.NoWait, + nil) if err != nil { return fmt.Errorf("failed to declare queue: %w", err) } sdk.Logger(ctx).Debug().Msgf("declared queue %s", d.config.QueueName) - if d.config.ExchangeName != "" { - err = d.ch.ExchangeDeclare(d.config.ExchangeName, d.config.ExchangeType, false, false, false, false, nil) + if d.config.Exchange.Name != "" { + err = d.ch.ExchangeDeclare( + d.config.Exchange.Name, + d.config.Exchange.Type, + d.config.Exchange.Durable, + d.config.Exchange.AutoDelete, + d.config.Exchange.Internal, + d.config.Exchange.NoWait, + nil, + ) if err != nil { return fmt.Errorf("failed to declare exchange: %w", err) } - sdk.Logger(ctx).Debug().Msgf("declared exchange %s", d.config.ExchangeName) + sdk.Logger(ctx).Debug().Msgf("declared exchange %s", d.config.Exchange.Name) - err = d.ch.QueueBind(d.config.QueueName, d.config.RoutingKey, d.config.ExchangeName, false, nil) + err = d.ch.QueueBind(d.config.QueueName, d.config.RoutingKey, d.config.Exchange.Name, false, nil) if err != nil { return fmt.Errorf("failed to bind queue to exchange: %w", err) } - sdk.Logger(ctx).Debug().Msgf("bound queue %s to exchange %s with routing key %s", d.config.QueueName, d.config.ExchangeName, d.config.RoutingKey) + sdk.Logger(ctx).Debug().Msgf("bound queue %s to exchange %s with routing key %s", d.config.QueueName, d.config.Exchange.Name, d.config.RoutingKey) } return nil diff --git a/destination_test.go b/destination_test.go index 51efe98..fb2dce3 100644 --- a/destination_test.go +++ b/destination_test.go @@ -51,10 +51,9 @@ func testExchange(is *is.I, queueName, exchangeName, exchangeType, routingKey st dest := NewDestination() destCfg := cfgToMap(DestinationConfig{ - Config: sharedCfg, - ExchangeName: exchangeName, - ExchangeType: exchangeType, - RoutingKey: routingKey, + Config: sharedCfg, + Exchange: ExchangeConfig{Name: exchangeName, Type: exchangeType}, + RoutingKey: routingKey, }) err := dest.Configure(ctx, destCfg) is.NoErr(err) diff --git a/paramgen_dest.go b/paramgen_dest.go index 6c98b25..722a210 100644 --- a/paramgen_dest.go +++ b/paramgen_dest.go @@ -33,18 +33,66 @@ func (DestinationConfig) Parameters() map[string]sdk.Parameter { Type: sdk.ParameterTypeString, Validations: []sdk.Validation{}, }, - "exchangeName": { + "exchange.autoDelete": { + Default: "false", + Description: "autoDelete indicates if the exchange will be deleted when the last queue is unbound from it.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, + "exchange.durable": { + Default: "true", + Description: "durable indicates if the exchange will survive broker restarts.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, + "exchange.internal": { + Default: "false", + Description: "internal indicates if the exchange is used for internal purposes and cannot be directly published to by a client.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, + "exchange.name": { Default: "", - Description: "exchangeName is the name of the exchange to publish to", + Description: "name is the name of the exchange.", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{}, }, - "exchangeType": { - Default: "direct", - Description: "exchangeType is the type of the exchange to publish to", + "exchange.noWait": { + Default: "false", + Description: "noWait indicates if the exchange should be declared without waiting for server confirmation.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, + "exchange.type": { + Default: "", + Description: "type is the type of the exchange (e.g., direct, fanout, topic, headers).", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{}, }, + "queue.autoDelete": { + Default: "false", + Description: "autoDelete indicates if the queue will be deleted when there are no more consumers.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, + "queue.durable": { + Default: "true", + Description: "durable indicates if the queue will survive broker restarts.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, + "queue.exclusive": { + Default: "false", + Description: "exclusive indicates if the queue can be accessed by other connections.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, + "queue.noWait": { + Default: "false", + Description: "noWait indicates if the queue should be declared without waiting for server confirmation.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, "queueName": { Default: "", Description: "queueName is the name of the queue to consume from / publish to",