From 8b230b777de4b4f4a5495584a5ae32e66c480552 Mon Sep 17 00:00:00 2001 From: Maciej Kudas Date: Thu, 7 Jul 2022 14:07:43 +0200 Subject: [PATCH] test(acceptance): add acceptance tests (#10) --- cmd/sf/acceptance_test.go | 87 +++++++++++ cmd/sf/main.go | 6 +- cmd/sf/oauth_client_mock_test.go | 34 +++++ cmd/sf/streaming_client_mock_test.go | 95 ++++++++++++ go.mod | 11 +- go.sum | 45 ++++-- internal/cometd/client.go | 26 ++-- internal/cometd/responses/connect.go | 28 ++-- internal/salesforce/oauth/client.go | 15 +- internal/salesforce/oauth/client_test.go | 20 +-- source/source.go | 180 ++++++++++++++--------- spec.go | 1 + 12 files changed, 419 insertions(+), 129 deletions(-) create mode 100644 cmd/sf/acceptance_test.go create mode 100644 cmd/sf/oauth_client_mock_test.go create mode 100644 cmd/sf/streaming_client_mock_test.go diff --git a/cmd/sf/acceptance_test.go b/cmd/sf/acceptance_test.go new file mode 100644 index 0000000..e7455b4 --- /dev/null +++ b/cmd/sf/acceptance_test.go @@ -0,0 +1,87 @@ +// Copyright © 2022 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "testing" + + sdk "github.com/conduitio/conduit-connector-sdk" + sf "github.com/miquido/conduit-connector-salesforce" + "github.com/miquido/conduit-connector-salesforce/internal/cometd" + "github.com/miquido/conduit-connector-salesforce/internal/salesforce/oauth" + sfSource "github.com/miquido/conduit-connector-salesforce/source" +) + +type CustomConfigurableAcceptanceTestDriver struct { + sdk.ConfigurableAcceptanceTestDriver + + streamingClient *streamingClientMock +} + +func (d *CustomConfigurableAcceptanceTestDriver) WriteToSource(_ *testing.T, records []sdk.Record) (results []sdk.Record) { + d.streamingClient.SetResults(records) + + // No destination connector, return wanted records + for _, record := range records { + record.Key = nil + + results = append(results, record) + } + + return +} + +func TestAcceptance(t *testing.T) { + sourceConfig := map[string]string{ + sfSource.ConfigKeyEnvironment: oauth.EnvironmentSandbox, + sfSource.ConfigKeyClientID: "client-id", + sfSource.ConfigKeyClientSecret: "client-secret", + sfSource.ConfigKeyUsername: "username", + sfSource.ConfigKeyPassword: "password", + sfSource.ConfigKeySecurityToken: "security-token", + sfSource.ConfigKeyPushTopicsNames: "MyTopic1,MyTopic2", + } + + sfSource.OAuthClientFactory = func(_ oauth.Environment, _, _, _, _, _ string) oauth.Client { + return &oAuthClientMock{} + } + + streamingClient := &streamingClientMock{} + sfSource.StreamingClientFactory = func(_, _ string) (cometd.Client, error) { + return streamingClient, nil + } + + sdk.AcceptanceTest(t, &CustomConfigurableAcceptanceTestDriver{ + ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{ + Config: sdk.ConfigurableAcceptanceTestDriverConfig{ + Connector: sdk.Connector{ + NewSpecification: sf.Specification, + NewSource: sfSource.NewSource, + NewDestination: nil, + }, + + SourceConfig: sourceConfig, + GenerateDataType: sdk.GenerateStructuredData, + + Skip: []string{ + "TestAcceptance/TestSource_Open_ResumeAtPositionCDC", + "TestAcceptance/TestSource_Open_ResumeAtPositionSnapshot", + }, + }, + }, + + streamingClient: streamingClient, + }) +} diff --git a/cmd/sf/main.go b/cmd/sf/main.go index cec1202..645f24d 100644 --- a/cmd/sf/main.go +++ b/cmd/sf/main.go @@ -21,5 +21,9 @@ import ( ) func main() { - sdk.Serve(sf.Specification, sfSource.NewSource, nil) + sdk.Serve(sdk.Connector{ + NewSpecification: sf.Specification, + NewSource: sfSource.NewSource, + NewDestination: nil, + }) } diff --git a/cmd/sf/oauth_client_mock_test.go b/cmd/sf/oauth_client_mock_test.go new file mode 100644 index 0000000..fb07504 --- /dev/null +++ b/cmd/sf/oauth_client_mock_test.go @@ -0,0 +1,34 @@ +// Copyright © 2022 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + + "github.com/miquido/conduit-connector-salesforce/internal/salesforce/oauth/response" +) + +type oAuthClientMock struct { +} + +func (c *oAuthClientMock) Authenticate(_ context.Context) (response.TokenResponse, error) { + return response.TokenResponse{ + AccessToken: "access-token", + InstanceURL: "hxxp://instance.url", + ID: "1", + IssuedAt: "2", + Signature: "3", + }, nil +} diff --git a/cmd/sf/streaming_client_mock_test.go b/cmd/sf/streaming_client_mock_test.go new file mode 100644 index 0000000..264cb2b --- /dev/null +++ b/cmd/sf/streaming_client_mock_test.go @@ -0,0 +1,95 @@ +// Copyright © 2022 Meroxa, Inc. and Miquido +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "sync" + + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/miquido/conduit-connector-salesforce/internal/cometd/responses" +) + +type streamingClientMock struct { + results []sdk.Record + lastIndex int + mutex sync.Mutex +} + +func (s *streamingClientMock) SetResults(results []sdk.Record) { + s.mutex.Lock() + s.results = results + s.mutex.Unlock() +} + +func (s *streamingClientMock) Handshake(_ context.Context) (responses.SuccessfulHandshakeResponse, error) { + // Make Handshake always successful + return responses.SuccessfulHandshakeResponse{ + Successful: true, + }, nil +} + +func (s *streamingClientMock) Connect(_ context.Context) (responses.ConnectResponse, error) { + s.mutex.Lock() + + response := responses.ConnectResponse{ + Successful: true, + Events: make([]responses.ConnectResponseEvent, 0, len(s.results)), + } + + for _, record := range s.results { + response.Events = append(response.Events, responses.ConnectResponseEvent{ + Data: responses.ConnectResponseEventData{ + Event: responses.ConnectResponseEventDataMetadata{ + CreatedDate: record.CreatedAt, + ReplayID: s.lastIndex, + }, + SObject: record.Payload.(sdk.StructuredData), + }, + Channel: "MyTopic1", + }) + + s.lastIndex++ + } + + s.results = nil + + s.mutex.Unlock() + + // Make Connect always successful + return response, nil +} + +func (s *streamingClientMock) SubscribeToPushTopic(_ context.Context, pushTopic string) (responses.SubscribeResponse, error) { + // Make SubscribeToPushTopic always successful + return responses.SubscribeResponse{ + Successful: true, + Subscription: []string{pushTopic}, + }, nil +} + +func (s *streamingClientMock) UnsubscribeToPushTopic(ctx context.Context, pushTopic string) (responses.UnsubscribeResponse, error) { + // Make UnsubscribeToPushTopic always successful + return responses.UnsubscribeResponse{ + Successful: true, + }, nil +} + +func (s *streamingClientMock) Disconnect(ctx context.Context) (responses.DisconnectResponse, error) { + // Make Disconnect always successful + return responses.DisconnectResponse{ + Successful: true, + }, nil +} diff --git a/go.mod b/go.mod index 587bd06..32450f5 100644 --- a/go.mod +++ b/go.mod @@ -3,34 +3,37 @@ module github.com/miquido/conduit-connector-salesforce go 1.18 require ( - github.com/conduitio/conduit-connector-sdk v0.2.0 + github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220608071937-511c321558fc github.com/jaswdr/faker v1.12.0 + github.com/sigmavirus24/gobayeux v1.0.0 + github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.0 golang.org/x/net v0.0.0-20220531201128-c960675eff93 + gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) require ( github.com/conduitio/conduit-connector-protocol v0.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/hashicorp/go-hclog v1.2.0 // indirect github.com/hashicorp/go-plugin v1.4.4 // indirect github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect github.com/jpillora/backoff v1.0.0 // indirect + github.com/matryer/is v1.4.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mitchellh/go-testing-interface v1.14.1 // indirect github.com/oklog/run v1.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rs/zerolog v1.26.1 // indirect + github.com/rs/zerolog v1.27.0 // indirect go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1 // indirect + go.uber.org/goleak v1.1.12 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20220601144221-27df5f98adab // indirect google.golang.org/grpc v1.47.0 // indirect google.golang.org/protobuf v1.28.0 // indirect - gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4afa9ff..52164d6 100644 --- a/go.sum +++ b/go.sum @@ -14,9 +14,9 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/conduitio/conduit-connector-protocol v0.2.0 h1:gwYXVKEMgTtU67ephQ5WwTGIDbT/eTLA9Mdr9Bnbqxc= github.com/conduitio/conduit-connector-protocol v0.2.0/go.mod h1:udCU2AkLcYQoLjAO06tHVL2iFJPw+DamK+wllnj50hk= -github.com/conduitio/conduit-connector-sdk v0.2.0 h1:yReJT3SOAGqJIlk59WC5FPgpv0Gg+NG4NFj6FJ89XnM= -github.com/conduitio/conduit-connector-sdk v0.2.0/go.mod h1:zZ/YJqhIZyXdVmFJS55zqkukpBmB+ohbX2kDduoj8Z0= -github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220608071937-511c321558fc h1:OIOPxzthWycx2JdMfW6r7AMTXmQHvLHBTH285ZkVb+8= +github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220608071937-511c321558fc/go.mod h1:iz8Hbw5NjAHAAEL6lOKKg6+2EYSmYwZtcDQv1iqP8RM= +github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -35,7 +35,6 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= -github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= @@ -71,7 +70,14 @@ github.com/jaswdr/faker v1.12.0/go.mod h1:x7ZlyB1AZqwqKZgyQlnqEG8FDptmHlncA5u2zY github.com/jhump/protoreflect v1.10.2-0.20211108190630-d551e22cd340 h1:Vdzuzjwa0C0Vd7+eBTXaEKqarx2S0TG1u5TTugjHLkk= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= +github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= @@ -91,29 +97,37 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= -github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= +github.com/rs/zerolog v1.27.0 h1:1T7qCieN22GVc8S4Q2yuexzBb1EqjbgjSH9RohbMjKs= +github.com/rs/zerolog v1.27.0/go.mod h1:7frBqO0oezxmnO7GF86FY++uy8I0Tk/If5ni1G9Qc0U= +github.com/sigmavirus24/gobayeux v1.0.0 h1:nAMp5XczwtFKl2Ga6Ydh3YOzhRm5CfnbFsSlbhbl8SM= +github.com/sigmavirus24/gobayeux v1.0.0/go.mod h1:wpYl2kmQiEOz0GpKUnttDLL7w9ZVVnZaMxiBiKwWBIQ= +github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1 h1:EHYFlC8XppCJX8C3TS06BC3xA6ctiowDlySWErdOaXU= go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.1/go.mod h1:iYPhlwHzhRoPYviJbA604qT6wYuQghfrebmXUXLKjk8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -122,11 +136,10 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220531201128-c960675eff93 h1:MYimHLfoXEpOhqd/zgoA/uoXzHB86AEky4LAx5ij9xA= golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -140,7 +153,9 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -148,10 +163,8 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -159,7 +172,6 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -168,8 +180,8 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= +golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -207,8 +219,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/cometd/client.go b/internal/cometd/client.go index f1d830e..ee9466f 100644 --- a/internal/cometd/client.go +++ b/internal/cometd/client.go @@ -28,7 +28,15 @@ import ( "golang.org/x/net/publicsuffix" ) -func NewClient(baseURL, accessToken string) (*Client, error) { +type Client interface { + Handshake(ctx context.Context) (responses.SuccessfulHandshakeResponse, error) + Connect(ctx context.Context) (responses.ConnectResponse, error) + SubscribeToPushTopic(ctx context.Context, pushTopic string) (responses.SubscribeResponse, error) + UnsubscribeToPushTopic(ctx context.Context, pushTopic string) (responses.UnsubscribeResponse, error) + Disconnect(ctx context.Context) (responses.DisconnectResponse, error) +} + +func NewDefaultClient(baseURL, accessToken string) (Client, error) { jar, err := cookiejar.New(&cookiejar.Options{ PublicSuffixList: publicsuffix.List, }) @@ -36,14 +44,14 @@ func NewClient(baseURL, accessToken string) (*Client, error) { return nil, err } - return &Client{ + return &DefaultClient{ baseURL: baseURL, accessToken: accessToken, longPollClient: &http.Client{Jar: jar}, }, nil } -type Client struct { +type DefaultClient struct { baseURL string accessToken string clientID string @@ -52,7 +60,7 @@ type Client struct { // Handshake performs a handshake request. // See: https://docs.cometd.org/current7/reference/#_bayeux_meta_handshake -func (s *Client) Handshake(ctx context.Context) (responses.SuccessfulHandshakeResponse, error) { +func (s *DefaultClient) Handshake(ctx context.Context) (responses.SuccessfulHandshakeResponse, error) { // Prepare and send request responseData, err := s.httpPost(ctx, requests.HandshakeRequest{}) if err != nil { @@ -80,7 +88,7 @@ func (s *Client) Handshake(ctx context.Context) (responses.SuccessfulHandshakeRe // Connect performs a connect request. // See: https://docs.cometd.org/current7/reference/#_bayeux_meta_connect -func (s *Client) Connect(ctx context.Context) (responses.ConnectResponse, error) { +func (s *DefaultClient) Connect(ctx context.Context) (responses.ConnectResponse, error) { // Prepare and send request responseData, err := s.httpPost(ctx, requests.ConnectRequest{ ClientID: s.clientID, @@ -142,7 +150,7 @@ func (s *Client) Connect(ctx context.Context) (responses.ConnectResponse, error) // SubscribeToPushTopic performs a subscribe to topic request. // See: https://docs.cometd.org/current7/reference/#_bayeux_meta_subscribe -func (s *Client) SubscribeToPushTopic(ctx context.Context, pushTopic string) (responses.SubscribeResponse, error) { +func (s *DefaultClient) SubscribeToPushTopic(ctx context.Context, pushTopic string) (responses.SubscribeResponse, error) { // Prepare and send request responseData, err := s.httpPost(ctx, requests.SubscribePushTopicRequest{ ClientID: s.clientID, @@ -165,7 +173,7 @@ func (s *Client) SubscribeToPushTopic(ctx context.Context, pushTopic string) (re // UnsubscribeToPushTopic performs a unsubscribe from topic request. // See: https://docs.cometd.org/current7/reference/#_bayeux_meta_unsubscribe -func (s *Client) UnsubscribeToPushTopic(ctx context.Context, pushTopic string) (responses.UnsubscribeResponse, error) { +func (s *DefaultClient) UnsubscribeToPushTopic(ctx context.Context, pushTopic string) (responses.UnsubscribeResponse, error) { // Prepare and send request responseData, err := s.httpPost(ctx, requests.UnsubscribePushTopicRequest{ ClientID: s.clientID, @@ -188,7 +196,7 @@ func (s *Client) UnsubscribeToPushTopic(ctx context.Context, pushTopic string) ( // Disconnect performs a disconnect request. // See: https://docs.cometd.org/current7/reference/#_bayeux_meta_disconnect -func (s *Client) Disconnect(ctx context.Context) (responses.DisconnectResponse, error) { +func (s *DefaultClient) Disconnect(ctx context.Context) (responses.DisconnectResponse, error) { // Prepare and send request responseData, err := s.httpPost(ctx, requests.DisconnectRequest{ ClientID: s.clientID, @@ -209,7 +217,7 @@ func (s *Client) Disconnect(ctx context.Context) (responses.DisconnectResponse, } // httpPost sends a POST request to the CometD server -func (s *Client) httpPost(ctx context.Context, payload requests.Request) ([]byte, error) { +func (s *DefaultClient) httpPost(ctx context.Context, payload requests.Request) ([]byte, error) { // Prepare the payload requestData, err := payload.MarshalJSON() if err != nil { diff --git a/internal/cometd/responses/connect.go b/internal/cometd/responses/connect.go index 225d5e2..e4549e8 100644 --- a/internal/cometd/responses/connect.go +++ b/internal/cometd/responses/connect.go @@ -55,16 +55,20 @@ const ( // ConnectResponseEvent represents single piece of data returned in connect response type ConnectResponseEvent struct { - Data struct { - Event struct { - CreatedDate time.Time `json:"createdDate"` - ReplayID int `json:"replayId"` - - // Type denotes creation, update, delete, or undelete of a record - // See: https://developer.salesforce.com/docs/atlas.en-us.236.0.api_streaming.meta/api_streaming/terms.htm - Type ConnectResponseEventType `json:"type"` - } `json:"event"` - SObject map[string]interface{} `json:"sobject"` - } `json:"data"` - Channel string `json:"channel"` + Data ConnectResponseEventData `json:"data"` + Channel string `json:"channel"` +} + +type ConnectResponseEventData struct { + Event ConnectResponseEventDataMetadata `json:"event"` + SObject map[string]interface{} `json:"sobject"` +} + +type ConnectResponseEventDataMetadata struct { + CreatedDate time.Time `json:"createdDate"` + ReplayID int `json:"replayId"` + + // Type denotes creation, update, delete, or undelete of a record + // See: https://developer.salesforce.com/docs/atlas.en-us.236.0.api_streaming.meta/api_streaming/terms.htm + Type ConnectResponseEventType `json:"type"` } diff --git a/internal/salesforce/oauth/client.go b/internal/salesforce/oauth/client.go index 0bda18e..73bdacf 100644 --- a/internal/salesforce/oauth/client.go +++ b/internal/salesforce/oauth/client.go @@ -36,15 +36,20 @@ const ( testLoginURI = "https://test.salesforce.com/services/oauth2/token" ) -func NewClient( +//go:generate moq -out client_moq_test.go . Client +type Client interface { + Authenticate(ctx context.Context) (response.TokenResponse, error) +} + +func NewDefaultClient( environment Environment, clientID string, clientSecret string, username string, password string, securityToken string, -) *Client { - return &Client{ +) Client { + return &DefaultClient{ httpClient: http.DefaultClient, environment: environment, clientID: clientID, @@ -55,7 +60,7 @@ func NewClient( } } -type Client struct { +type DefaultClient struct { httpClient httpClient environment Environment clientID string @@ -71,7 +76,7 @@ type httpClient interface { } // Authenticate attempts to authenticate the client with given credentials -func (a *Client) Authenticate(ctx context.Context) (response.TokenResponse, error) { +func (a *DefaultClient) Authenticate(ctx context.Context) (response.TokenResponse, error) { // Prepare request payload payload := url.Values{ "grant_type": {grantType}, diff --git a/internal/salesforce/oauth/client_test.go b/internal/salesforce/oauth/client_test.go index 1fbc966..79234a6 100644 --- a/internal/salesforce/oauth/client_test.go +++ b/internal/salesforce/oauth/client_test.go @@ -44,16 +44,16 @@ func TestNewClient(t *testing.T) { securityToken = fakerInstance.RandomStringWithLength(32) ) - client := NewClient( + client := NewDefaultClient( environment, clientID, clientSecret, username, password, securityToken, - ) + ).(*DefaultClient) - require.IsType(t, &Client{}, client) + require.IsType(t, &DefaultClient{}, client) require.Same(t, http.DefaultClient, client.httpClient) require.Equal(t, environment, client.environment) require.Equal(t, clientID, client.clientID) @@ -99,7 +99,7 @@ func TestClient_Authenticate(t *testing.T) { }, } - client := Client{ + client := DefaultClient{ httpClient: &hcMock, environment: environment, clientID: clientID, @@ -152,7 +152,7 @@ func TestClient_Authenticate(t *testing.T) { }, } - client := Client{ + client := DefaultClient{ httpClient: &hcMock, environment: environment, clientID: clientID, @@ -202,7 +202,7 @@ func TestClient_Authenticate(t *testing.T) { }, } - client := Client{ + client := DefaultClient{ httpClient: &hcMock, environment: environment, clientID: clientID, @@ -253,7 +253,7 @@ func TestClient_Authenticate(t *testing.T) { }, } - client := Client{ + client := DefaultClient{ httpClient: &hcMock, environment: environment, clientID: clientID, @@ -310,7 +310,7 @@ func TestClient_Authenticate(t *testing.T) { }, } - client := Client{ + client := DefaultClient{ httpClient: &hcMock, environment: environment, clientID: clientID, @@ -370,7 +370,7 @@ func TestClient_Authenticate(t *testing.T) { }, } - client := Client{ + client := DefaultClient{ httpClient: &hcMock, environment: environment, clientID: clientID, @@ -431,7 +431,7 @@ func TestClient_Authenticate(t *testing.T) { }, } - client := Client{ + client := DefaultClient{ httpClient: &hcMock, environment: environment, clientID: clientID, diff --git a/source/source.go b/source/source.go index 82d3a71..ec55b11 100644 --- a/source/source.go +++ b/source/source.go @@ -16,6 +16,7 @@ package source import ( "context" + "errors" "fmt" "strconv" "time" @@ -25,25 +26,31 @@ import ( "github.com/miquido/conduit-connector-salesforce/internal/cometd" "github.com/miquido/conduit-connector-salesforce/internal/cometd/responses" "github.com/miquido/conduit-connector-salesforce/internal/salesforce/oauth" + "gopkg.in/tomb.v2" ) const sfCometDVersion = "54.0" +var OAuthClientFactory = oauth.NewDefaultClient +var StreamingClientFactory = cometd.NewDefaultClient + +var ErrConnectorIsStopped = errors.New("connector is stopped") + type Source struct { sdk.UnimplementedSource config Config - streamingClient *cometd.Client + streamingClient cometd.Client subscriptions map[string]bool events chan responses.ConnectResponseEvent - errors chan error + tomb *tomb.Tomb } func NewSource() sdk.Source { return &Source{ subscriptions: make(map[string]bool), events: make(chan responses.ConnectResponseEvent), - errors: make(chan error), + tomb: nil, } } @@ -58,7 +65,7 @@ func (s *Source) Configure(_ context.Context, cfgRaw map[string]string) (err err func (s *Source) Open(ctx context.Context, _ sdk.Position) error { // Authenticate - oAuthClient := oauth.NewClient( + oAuthClient := OAuthClientFactory( s.config.Environment, s.config.ClientID, s.config.ClientSecret, @@ -73,7 +80,7 @@ func (s *Source) Open(ctx context.Context, _ sdk.Position) error { } // Streaming API client - s.streamingClient, err = cometd.NewClient( + s.streamingClient, err = StreamingClientFactory( fmt.Sprintf("%s/cometd/%s", token.InstanceURL, sfCometDVersion), token.AccessToken, ) @@ -103,14 +110,17 @@ func (s *Source) Open(ctx context.Context, _ sdk.Position) error { } // Start events worker - go func(ctx context.Context) { - s.eventsWorker(ctx) - }(ctx) + s.tomb = &tomb.Tomb{} + s.tomb.Go(s.eventsWorker) return nil } func (s *Source) Read(ctx context.Context) (sdk.Record, error) { + if s.tomb == nil { + return sdk.Record{}, ErrConnectorIsStopped + } + select { case event, ok := <-s.events: if !ok { @@ -158,7 +168,12 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) { }, }, nil - case err := <-s.errors: + case <-s.tomb.Dead(): + err := s.tomb.Err() + if err == nil { + err = ErrConnectorIsStopped + } + return sdk.Record{}, err case <-ctx.Done(): @@ -173,96 +188,117 @@ func (s *Source) Ack(ctx context.Context, position sdk.Position) error { } func (s *Source) Teardown(ctx context.Context) error { - // Unsubscribe - for _, pushTopicName := range s.config.PushTopicsNames { - unsubscribeResponse, err := s.streamingClient.UnsubscribeToPushTopic(ctx, pushTopicName) - if err != nil { - sdk.Logger(ctx).Warn().Msgf("unsubscribe error: failed to unsubscribe %q topic: %s", pushTopicName, err) - } else if !unsubscribeResponse.Successful { - sdk.Logger(ctx).Warn().Msgf("unsubscribe error: failed to unsubscribe %q topic: %s", pushTopicName, unsubscribeResponse.Error) + var err error + + if s.tomb != nil { + s.tomb.Kill(ErrConnectorIsStopped) + + err = s.tomb.Wait() + + // Worker was properly closed + if errors.Is(err, ErrConnectorIsStopped) { + err = nil } } - // Disconnect - disconnectResponse, err := s.streamingClient.Disconnect(ctx) - if err != nil { - return fmt.Errorf("connector close error: disconnect error: %w", err) - } - if !disconnectResponse.Successful { - return fmt.Errorf("connector close error: disconnect error: %s", disconnectResponse.Error) + if s.streamingClient != nil { + // Unsubscribe + for _, pushTopicName := range s.config.PushTopicsNames { + unsubscribeResponse, err := s.streamingClient.UnsubscribeToPushTopic(ctx, pushTopicName) + if err != nil { + sdk.Logger(ctx).Warn().Msgf("unsubscribe error: failed to unsubscribe %q topic: %s", pushTopicName, err) + } else if !unsubscribeResponse.Successful { + sdk.Logger(ctx).Warn().Msgf("unsubscribe error: failed to unsubscribe %q topic: %s", pushTopicName, unsubscribeResponse.Error) + } + } + + // Disconnect + disconnectResponse, err := s.streamingClient.Disconnect(ctx) + if err != nil { + return fmt.Errorf("connector close error: disconnect error: %w", err) + } + if !disconnectResponse.Successful { + return fmt.Errorf("connector close error: disconnect error: %s", disconnectResponse.Error) + } + + // Close the streaming client + s.streamingClient = nil } + // Remove registered subscriptions and free the memory s.subscriptions = nil - s.streamingClient = nil - return nil + return err } // eventsWorker continuously queries for data updates from Salesforce -func (s *Source) eventsWorker(ctx context.Context) { +func (s *Source) eventsWorker() error { defer close(s.events) for { - // Receive event - connectResponse, err := s.streamingClient.Connect(ctx) - if err != nil { - s.errors <- fmt.Errorf("failed to receive event: %w", err) - - return - } + select { + case <-s.tomb.Dying(): + return s.tomb.Err() - // If not successful, check how to retry - if !connectResponse.Successful { - if nil == connectResponse.Advice { - s.errors <- fmt.Errorf("failed to receive event and no reconnection strategy provided by the server: %s", connectResponse.Error) + default: + ctx := s.tomb.Context(context.Background()) - return + // Receive event + connectResponse, err := s.streamingClient.Connect(ctx) + if err != nil { + return fmt.Errorf("failed to receive event: %w", err) } - switch connectResponse.Advice.Reconnect { - case responses.AdviceReconnectRetry: - // Check if request can be retried - if connectResponse.Advice.Interval < 0 { - s.errors <- fmt.Errorf("server disallowed for reconnect, stopping") - - return + // If not successful, check how to retry + if !connectResponse.Successful { + if nil == connectResponse.Advice { + return fmt.Errorf("failed to receive event and no reconnection strategy provided by the server: %s", connectResponse.Error) } - // Wait and retry - time.Sleep(time.Millisecond * time.Duration(connectResponse.Advice.Interval)) - - continue - - case responses.AdviceReconnectHandshake: - // Handshake and retry - if _, err := s.streamingClient.Handshake(ctx); err != nil { - s.errors <- fmt.Errorf("reconnect handshake error: %w", err) + switch connectResponse.Advice.Reconnect { + case responses.AdviceReconnectRetry: + // Check if request can be retried + if connectResponse.Advice.Interval < 0 { + return fmt.Errorf("server disallowed for reconnect, stopping") + } - return - } + // Wait and retry + time.Sleep(time.Millisecond * time.Duration(connectResponse.Advice.Interval)) - continue + continue - case responses.AdviceReconnectNone: - // Cannot retry - s.errors <- fmt.Errorf("server disallowed for reconnect, stopping") + case responses.AdviceReconnectHandshake: + // Handshake and retry + if _, err := s.streamingClient.Handshake(ctx); err != nil { + return fmt.Errorf("reconnect handshake error: %w", err) + } - return + continue - default: - // Unexpected, cannot retry - s.errors <- fmt.Errorf("unsupported reconnect advice: %s", connectResponse.Advice.Reconnect) + case responses.AdviceReconnectNone: + // Cannot retry + return fmt.Errorf("server disallowed for reconnect, stopping") - return + default: + // Unexpected, cannot retry + return fmt.Errorf("unsupported reconnect advice: %s", connectResponse.Advice.Reconnect) + } } - } - // If successful, send event - for _, event := range connectResponse.Events { - if _, exists := s.subscriptions[event.Channel]; exists { - s.events <- event - } else { - sdk.Logger(ctx).Debug().Msgf("Received event for unsupported channel: %s", event.Channel) + // If successful, send event + for _, event := range connectResponse.Events { + if _, exists := s.subscriptions[event.Channel]; exists { + // Send out the record if possible + select { + case s.events <- event: + // sdk.Record was sent successfully + + case <-s.tomb.Dying(): + return s.tomb.Err() + } + } else { + sdk.Logger(ctx).Debug().Msgf("Received event for unsupported channel: %s", event.Channel) + } } } } diff --git a/spec.go b/spec.go index 86dc1c5..f4ffed9 100644 --- a/spec.go +++ b/spec.go @@ -23,6 +23,7 @@ func Specification() sdk.Specification { return sdk.Specification{ Name: "salesforce", Summary: "A Salesforce source plugin for Conduit.", + Description: "The Conduit plugin supporting Salesforce source.", Version: "v0.1.0", Author: "Miquido", DestinationParams: map[string]sdk.Parameter{