Skip to content

Commit

Permalink
add more destination parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Feb 16, 2024
1 parent c291662 commit 81a0777
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 33 deletions.
31 changes: 19 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,22 @@ The destination connector sends data from upstream systems to RabbitMQ via Condu

### Configuration Parameters

| Name | Description | Required | Default Value |
|------------------------|---------------------------------------------------------------------|----------|---------------|
| `url` | The RabbitMQ server's URL. | Yes | |
| `queueName` | The name of the RabbitMQ queue where messages will be published to. | Yes | |
| `clientCert` | Path to the client certificate for TLS. | No | |
| `clientKey` | Path to the client's key for TLS. | No | |
| `caCert` | Path to the CA (Certificate Authority) certificate for TLS. | No | |
| `contentType` | The MIME content type of the messages written to RabbitMQ. | No | `text/plain` |
| `exchangeName` | The name of the exchange to publish to. | No | |
| `exchangeType` | The type of the exchange to publish to. | No | `direct` |
| `routingKey` | The routing key to use when publishing to an exchange. | No | |

| Name | Description | Required | Default Value |
|-------------------------|---------------------------------------------------------------------|----------|---------------|
| `url` | The RabbitMQ server's URL. | Yes | |
| `queueName` | The name of the RabbitMQ queue where messages will be published to. | Yes | |
| `clientCert` | Path to the client certificate for TLS. | No | |
| `clientKey` | Path to the client's key for TLS. | No | |
| `caCert` | Path to the CA (Certificate Authority) certificate for TLS. | No | |
| `contentType` | The MIME content type of the messages written to RabbitMQ. | No | `text/plain` |
| `exchange.name` | The name of the exchange to publish to. | No | |
| `exchange.type` | The type of the exchange to publish to. | No | `direct` |
| `exchange.durable` | Specifies whether the exchange is durable. | No | `true` |
| `exchange.autoDelete` | If the exchange will auto-delete. | No | `false` |
| `exchange.internal` | If the exchange is internal. | No | `false` |
| `exchange.noWait` | If the exchange is declared without waiting for server reply. | No | `false` |
| `queue.durable` | Specifies whether the queue is durable. | No | `true` |
| `queue.autoDelete` | If the queue will auto-delete. | No | `false` |
| `queue.exclusive` | If the queue is exclusive. | No | `false` |
| `queue.noWait` | If the queue is declared without waiting for server reply. | No | `false` |
| `routingKey` | The routing key to use when publishing to an exchange. | No | |
33 changes: 28 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,40 @@ type SourceConfig struct {
Config
}

type QueueConfig struct {
// Durable indicates if the queue will survive broker restarts.
Durable bool `json:"durable" default:"true"`
// AutoDelete indicates if the queue will be deleted when there are no more consumers.
AutoDelete bool `json:"autoDelete" default:"false"`
// Exclusive indicates if the queue can be accessed by other connections.
Exclusive bool `json:"exclusive" default:"false"`
// NoWait indicates if the queue should be declared without waiting for server confirmation.
NoWait bool `json:"noWait" default:"false"`
}

type ExchangeConfig struct {
// Name is the name of the exchange.
Name string `json:"name"`
// Type is the type of the exchange (e.g., direct, fanout, topic, headers).
Type string `json:"type"`
// Durable indicates if the exchange will survive broker restarts.
Durable bool `json:"durable" default:"true"`
// AutoDelete indicates if the exchange will be deleted when the last queue is unbound from it.
AutoDelete bool `json:"autoDelete" default:"false"`
// Internal indicates if the exchange is used for internal purposes and cannot be directly published to by a client.
Internal bool `json:"internal" default:"false"`
// NoWait indicates if the exchange should be declared without waiting for server confirmation.
NoWait bool `json:"noWait" default:"false"`
}

type DestinationConfig struct {
Config

// ContentType is the MIME content type of the messages written to rabbitmq
ContentType string `json:"contentType" default:"text/plain"`

// ExchangeName is the name of the exchange to publish to
ExchangeName string `json:"exchangeName" default:""`

// ExchangeType is the type of the exchange to publish to
ExchangeType string `json:"exchangeType" default:"direct"`
Queue QueueConfig `json:"queue"`
Exchange ExchangeConfig `json:"exchange"`

// RoutingKey is the routing key to use when publishing to an exchange
RoutingKey string `json:"routingKey" default:""`
Expand Down
28 changes: 21 additions & 7 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) (err
return fmt.Errorf("invalid config: %w", err)
}

d.exchange = d.config.ExchangeName
d.exchange = d.config.Exchange.Name
d.routingKey = d.config.RoutingKey
if d.exchange == "" {
d.routingKey = d.config.QueueName
Expand Down Expand Up @@ -83,24 +83,38 @@ func (d *Destination) Open(ctx context.Context) (err error) {
}
sdk.Logger(ctx).Debug().Msgf("opened channel")

_, err = d.ch.QueueDeclare(d.config.QueueName, false, false, false, false, nil)
_, err = d.ch.QueueDeclare(
d.config.QueueName,
d.config.Queue.Durable,
d.config.Queue.AutoDelete,
d.config.Queue.Exclusive,
d.config.Queue.NoWait,
nil)
if err != nil {
return fmt.Errorf("failed to declare queue: %w", err)
}
sdk.Logger(ctx).Debug().Msgf("declared queue %s", d.config.QueueName)

if d.config.ExchangeName != "" {
err = d.ch.ExchangeDeclare(d.config.ExchangeName, d.config.ExchangeType, false, false, false, false, nil)
if d.config.Exchange.Name != "" {
err = d.ch.ExchangeDeclare(
d.config.Exchange.Name,
d.config.Exchange.Type,
d.config.Exchange.Durable,
d.config.Exchange.AutoDelete,
d.config.Exchange.Internal,
d.config.Exchange.NoWait,
nil,
)
if err != nil {
return fmt.Errorf("failed to declare exchange: %w", err)
}
sdk.Logger(ctx).Debug().Msgf("declared exchange %s", d.config.ExchangeName)
sdk.Logger(ctx).Debug().Msgf("declared exchange %s", d.config.Exchange.Name)

err = d.ch.QueueBind(d.config.QueueName, d.config.RoutingKey, d.config.ExchangeName, false, nil)
err = d.ch.QueueBind(d.config.QueueName, d.config.RoutingKey, d.config.Exchange.Name, false, nil)
if err != nil {
return fmt.Errorf("failed to bind queue to exchange: %w", err)
}
sdk.Logger(ctx).Debug().Msgf("bound queue %s to exchange %s with routing key %s", d.config.QueueName, d.config.ExchangeName, d.config.RoutingKey)
sdk.Logger(ctx).Debug().Msgf("bound queue %s to exchange %s with routing key %s", d.config.QueueName, d.config.Exchange.Name, d.config.RoutingKey)
}

return nil
Expand Down
7 changes: 3 additions & 4 deletions destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ func testExchange(is *is.I, queueName, exchangeName, exchangeType, routingKey st

dest := NewDestination()
destCfg := cfgToMap(DestinationConfig{
Config: sharedCfg,
ExchangeName: exchangeName,
ExchangeType: exchangeType,
RoutingKey: routingKey,
Config: sharedCfg,
Exchange: ExchangeConfig{Name: exchangeName, Type: exchangeType},
RoutingKey: routingKey,
})
err := dest.Configure(ctx, destCfg)
is.NoErr(err)
Expand Down
58 changes: 53 additions & 5 deletions paramgen_dest.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 81a0777

Please sign in to comment.