Skip to content

Commit

Permalink
Salesforce Destination Using Pubsub (#146)
Browse files Browse the repository at this point in the history
* Draft salesforce destination

nit

* Cleanup code

* Nit

* lint

* refactor tests

* Refactor Pubsub Client

* Cleanup

* Update refactor pubusb + add utisl

* Change fmt.errors to errors, modify validate on dest

* Update headers, cleanup some fields, lint

* Update errors and add retry on topics

* lint cleanup

* publish records one by one instead of batches

* add retry on schema prepare

* refactor write and publish

* Update license date, config, field checks

* Check fields and dates

* Update configs

* Update source_test.go
  • Loading branch information
anna-cross authored Nov 1, 2024
1 parent 6a9c0ad commit 9431187
Show file tree
Hide file tree
Showing 19 changed files with 878 additions and 407 deletions.
8 changes: 4 additions & 4 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ packages:
interfaces:
PubSubClient:
config:
dir: source
dir: pubsub
inpackage: false
outpkg: source
outpkg: pubsub
PubSub_SubscribeClient:
config:
dir: source
dir: pubsub
inpackage: false
outpkg: source
outpkg: pubsub
github.com/conduitio-labs/conduit-connector-salesforce/source:
interfaces:
authenticator: null
Expand Down
8 changes: 5 additions & 3 deletions cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ package main

import (
sf "github.com/conduitio-labs/conduit-connector-salesforce"
sfSourcePubsub "github.com/conduitio-labs/conduit-connector-salesforce/source"
"github.com/conduitio-labs/conduit-connector-salesforce/destination"
"github.com/conduitio-labs/conduit-connector-salesforce/source"

sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
sdk.Serve(sdk.Connector{
NewSpecification: sf.Specification,
NewSource: sfSourcePubsub.NewSource,
NewDestination: nil,
NewSource: source.NewSource,
NewDestination: destination.NewDestination,
})
}
44 changes: 5 additions & 39 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@ package config

import (
"context"
"errors"
"fmt"
"net"
"net/url"
"slices"
"time"

sdk "github.com/conduitio/conduit-connector-sdk"
)

//go:generate paramgen -output=paramgen_config.go Config
Expand All @@ -45,46 +40,17 @@ type Config struct {

// Number of retries allowed per read before the connector errors out
RetryCount uint `json:"retryCount" default:"10"`

// Deprecated: use `topicNames` instead.
TopicName string `json:"topicName"`

// TopicNames are the TopicNames the source connector will subscribe to
TopicNames []string `json:"topicNames"`

// PollingPeriod is the client event polling interval
PollingPeriod time.Duration `json:"pollingPeriod" default:"100ms"`

// Replay preset for the position the connector is fetching events from, can be latest or default to earliest.
ReplayPreset string `json:"replayPreset" default:"earliest" validate:"inclusion=latest|earliest"`
}

func (c Config) Validate(ctx context.Context) (Config, error) {
var errs []error

if c.TopicName != "" {
sdk.Logger(ctx).Warn().
Msg(`"topicName" is deprecated, use "topicNames" instead.`)

c.TopicNames = slices.Compact(append(c.TopicNames, c.TopicName))
}

if len(c.TopicNames) == 0 {
errs = append(errs, fmt.Errorf("'topicNames' empty, need at least one topic"))
}

if c.PollingPeriod == 0 {
errs = append(errs, fmt.Errorf("polling period cannot be zero %d", c.PollingPeriod))
}

if len(errs) != 0 {
return c, errors.Join(errs...)
}

func (c Config) Validate(_ context.Context) (Config, error) {
if _, err := url.Parse(c.OAuthEndpoint); err != nil {
return c, fmt.Errorf("failed to parse oauth endpoint url: %w", err)
}

if c.PubsubAddress == "" {
return c, fmt.Errorf("invalid pubsub address %q", c.PubsubAddress)
}

if _, _, err := net.SplitHostPort(c.PubsubAddress); err != nil {
return c, fmt.Errorf("failed to parse pubsub address: %w", err)
}
Expand Down
30 changes: 0 additions & 30 deletions config/paramgen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions destination/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package destination

import (
config "github.com/conduitio-labs/conduit-connector-salesforce/config"
"github.com/conduitio/conduit-commons/opencdc"
)

type TopicFn func(opencdc.Record) (string, error)

//go:generate paramgen -output=paramgen_config.go Config
type Config struct {
config.Config

// Topic is Salesforce event or topic to write record
TopicName string `json:"topicName" validate:"required"`
}
105 changes: 105 additions & 0 deletions destination/destination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package destination

import (
"context"

pubsub "github.com/conduitio-labs/conduit-connector-salesforce/pubsub"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/go-errors/errors"
)

var _ client = (*pubsub.Client)(nil)

type client interface {
Stop(context.Context)
Close(context.Context) error
Write(context.Context, opencdc.Record) error
Initialize(context.Context, []string) error
}

type Destination struct {
sdk.UnimplementedDestination
client client
config Config
}

func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() config.Parameters {
return d.config.Parameters()
}

func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
if err := sdk.Util.ParseConfig(
ctx,
cfg,
&d.config,
NewDestination().Parameters(),
); err != nil {
return errors.Errorf("failed to parse config: %w", err)
}

return nil
}

func (d *Destination) Open(ctx context.Context) error {
client, err := pubsub.NewGRPCClient(ctx, d.config.Config, "publish")
if err != nil {
return errors.Errorf("could not create GRPCClient: %w", err)
}

if err := client.Initialize(ctx, []string{d.config.TopicName}); err != nil {
return errors.Errorf("could not initialize pubsub client: %w", err)
}

d.client = client

sdk.Logger(ctx).Debug().
Str("at", "destination.open").
Str("topic", d.config.TopicName).
Msgf("Grpc Client has been set. Will begin read for topic: %s", d.config.TopicName)

return nil
}

func (d *Destination) Write(ctx context.Context, rr []opencdc.Record) (int, error) {
for i, r := range rr {
if err := d.client.Write(ctx, r); err != nil {
return i, errors.Errorf("failed to write records: %w", err)
}
}

return len(rr), nil
}

func (d *Destination) Teardown(ctx context.Context) error {
if d.client == nil {
return nil
}

d.client.Stop(ctx)

if err := d.client.Close(ctx); err != nil {
return errors.Errorf("error when closing subscriber conn: %w", err)
}

return nil
}
73 changes: 73 additions & 0 deletions destination/paramgen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.23.2
require (
github.com/conduitio/conduit-commons v0.4.0
github.com/conduitio/conduit-connector-sdk v0.11.0
github.com/go-errors/errors v1.5.1
github.com/golangci/golangci-lint v1.61.0
github.com/linkedin/goavro/v2 v2.13.0
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -101,7 +102,7 @@ require (
github.com/golangci/revgrep v0.5.3 // indirect
github.com/golangci/unconvert v0.0.0-20240309020433-c5143eacb3ed // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/gordonklaus/ineffassign v0.1.0 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ github.com/ghostiam/protogetter v0.3.6 h1:R7qEWaSgFCsy20yYHNIJsU9ZOb8TziSRRxuAOT
github.com/ghostiam/protogetter v0.3.6/go.mod h1:7lpeDnEJ1ZjL/YtyoN99ljO4z0pd3H0d18/t2dPBxHw=
github.com/go-critic/go-critic v0.11.4 h1:O7kGOCx0NDIni4czrkRIXTnit0mkyKOCePh3My6OyEU=
github.com/go-critic/go-critic v0.11.4/go.mod h1:2QAdo4iuLik5S9YG0rT4wcZ8QxwHYkrr6/2MWAiv/vc=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI=
Expand Down
Loading

0 comments on commit 9431187

Please sign in to comment.