Skip to content

Commit

Permalink
Add destination ContentType parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Feb 2, 2024
1 parent 9c1307d commit 0806af5
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
25 changes: 23 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

package rabbitmq

type Config struct {
URL string `json:"url" validate:"required"`
import (
"fmt"

sdk "github.com/conduitio/conduit-connector-sdk"
)

type Config struct {
URL string `json:"url" validate:"required"`
QueueName string `json:"queueName" validate:"required"`
}

Expand All @@ -26,4 +31,20 @@ type SourceConfig struct {

type DestinationConfig struct {
Config

ContentType string `json:"contentType" validate:"required"`
}

func newDestinationConfig(cfg map[string]string) (DestinationConfig, error) {
var destCfg DestinationConfig
err := sdk.Util.ParseConfig(cfg, cfg)
if err != nil {
return destCfg, fmt.Errorf("invalid config: %w", err)
}

if destCfg.ContentType == "" {
destCfg.ContentType = "text/plain"
}

return destCfg, nil
}
11 changes: 4 additions & 7 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,11 @@ func (d *Destination) Parameters() map[string]sdk.Parameter {
return d.config.Parameters()
}

func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error {
func (d *Destination) Configure(ctx context.Context, cfg map[string]string) (err error) {
sdk.Logger(ctx).Info().Msg("Configuring Destination...")
err := sdk.Util.ParseConfig(cfg, &d.config)
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}

return nil
d.config, err = newDestinationConfig(cfg)
return err
}

func (d *Destination) Open(ctx context.Context) (err error) {
Expand Down Expand Up @@ -77,7 +74,7 @@ func (d *Destination) Open(ctx context.Context) (err error) {
func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
for _, record := range records {
msg := amqp091.Publishing{
ContentType: "text/plain",
ContentType: d.config.ContentType,
Body: record.Payload.After.Bytes(),
}

Expand Down

0 comments on commit 0806af5

Please sign in to comment.