diff --git a/config.go b/config.go index 2ea48ec..7733c75 100644 --- a/config.go +++ b/config.go @@ -14,9 +14,14 @@ package rabbitmq -type Config struct { - URL string `json:"url" validate:"required"` +import ( + "fmt" + + sdk "github.com/conduitio/conduit-connector-sdk" +) +type Config struct { + URL string `json:"url" validate:"required"` QueueName string `json:"queueName" validate:"required"` } @@ -26,4 +31,20 @@ type SourceConfig struct { type DestinationConfig struct { Config + + ContentType string `json:"contentType" validate:"required"` +} + +func newDestinationConfig(cfg map[string]string) (DestinationConfig, error) { + var destCfg DestinationConfig + err := sdk.Util.ParseConfig(cfg, cfg) + if err != nil { + return destCfg, fmt.Errorf("invalid config: %w", err) + } + + if destCfg.ContentType == "" { + destCfg.ContentType = "text/plain" + } + + return destCfg, nil } diff --git a/destination.go b/destination.go index 6e2403b..d373d7e 100644 --- a/destination.go +++ b/destination.go @@ -42,14 +42,11 @@ func (d *Destination) Parameters() map[string]sdk.Parameter { return d.config.Parameters() } -func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error { +func (d *Destination) Configure(ctx context.Context, cfg map[string]string) (err error) { sdk.Logger(ctx).Info().Msg("Configuring Destination...") - err := sdk.Util.ParseConfig(cfg, &d.config) - if err != nil { - return fmt.Errorf("invalid config: %w", err) - } - return nil + d.config, err = newDestinationConfig(cfg) + return err } func (d *Destination) Open(ctx context.Context) (err error) { @@ -77,7 +74,7 @@ func (d *Destination) Open(ctx context.Context) (err error) { func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) { for _, record := range records { msg := amqp091.Publishing{ - ContentType: "text/plain", + ContentType: d.config.ContentType, Body: record.Payload.After.Bytes(), }