Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: passing dependency for asyncdestinationmanager #5080

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"go.uber.org/mock/gomock"

"github.com/rudderlabs/rudder-go-kit/stats"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

Expand All @@ -21,6 +23,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
mocks_oauth "github.com/rudderlabs/rudder-server/mocks/services/oauth"
Expand Down Expand Up @@ -68,7 +71,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&bingads_sdk.GetBulkUploadUrlResponse{
UploadUrl: "http://localhost/upload1",
RequestId: misc.FastUUID().URN(),
Expand Down Expand Up @@ -135,7 +138,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, fmt.Errorf("Error in getting bulk upload url"))
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, fmt.Errorf("Error in getting bulk upload url"))

Expand Down Expand Up @@ -188,7 +191,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
ClientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &ClientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &ClientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, fmt.Errorf("unable to get bulk upload url, check your credentials"))

bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, fmt.Errorf("unable to get bulk upload url, check your credentials"))
Expand Down Expand Up @@ -242,7 +245,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&bingads_sdk.GetBulkUploadUrlResponse{
UploadUrl: "http://localhost/upload1",
RequestId: misc.FastUUID().URN(),
Expand Down Expand Up @@ -307,7 +310,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&bingads_sdk.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand All @@ -330,7 +333,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(nil, fmt.Errorf("failed to get bulk upload status:"))
pollInput := common.AsyncPoll{
Expand All @@ -349,7 +352,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&bingads_sdk.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand Down Expand Up @@ -378,7 +381,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&bingads_sdk.GetBulkUploadStatusResponse{
PercentComplete: int64(0),
Expand All @@ -405,7 +408,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&bingads_sdk.GetBulkUploadStatusResponse{
PercentComplete: int64(0),
Expand All @@ -432,7 +435,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId456").Return(&bingads_sdk.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand Down Expand Up @@ -478,7 +481,7 @@ var _ = Describe("Bing ads Audience", func() {
client := ts.Client()
modifiedURL := ts.URL // Use the test server URL
clientI := Client{client: client, URL: modifiedURL}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

UploadStatsInput := common.GetUploadStatsInput{
FailedJobURLs: modifiedURL,
Expand Down Expand Up @@ -528,7 +531,7 @@ var _ = Describe("Bing ads Audience", func() {
client := ts.Client()
modifiedURL := ts.URL // Use the test server URL
clientI := Client{client: client, URL: modifiedURL}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

UploadStatsInput := common.GetUploadStatsInput{
FailedJobURLs: modifiedURL,
Expand Down Expand Up @@ -582,7 +585,7 @@ var _ = Describe("Bing ads Audience", func() {
},
})

bingAdsUploader, err := newManagerInternal(&destination, oauthService, nil)
bingAdsUploader, err := newManagerInternal(logger.NOP, stats.NOP, &destination, oauthService, nil)
Expect(err).To(BeNil())
Expect(bingAdsUploader).ToNot(BeNil())
})
Expand All @@ -592,7 +595,7 @@ var _ = Describe("Bing ads Audience", func() {
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader("BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&bingads_sdk.GetBulkUploadUrlResponse{
UploadUrl: "http://localhost/upload",
RequestId: misc.FastUUID().URN(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ import (
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
router_utils "github.com/rudderlabs/rudder-server/router/utils"
"github.com/rudderlabs/rudder-server/utils/misc"
)

func NewBingAdsBulkUploader(destName string, service bingads.BulkServiceI, client *Client) *BingAdsBulkUploader {
func NewBingAdsBulkUploader(logger logger.Logger, statsFactory stats.Stats, destName string, service bingads.BulkServiceI, client *Client) *BingAdsBulkUploader {
return &BingAdsBulkUploader{
destName: destName,
service: service,
logger: logger.NewLogger().Child("batchRouter").Child("AsyncDestinationManager").Child("BingAds").Child("BingAdsBulkUploader"),
logger: logger.Child("BingAds").Child("BingAdsBulkUploader"),
statsFactory: statsFactory,
client: *client,
fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destName, 100*bytesize.MB),
eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destName, 4000000),
Expand Down Expand Up @@ -56,7 +58,7 @@ func (b *BingAdsBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr
DestinationID: destination.ID,
}
}
uploadRetryableStat := stats.Default.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{
uploadRetryableStat := b.statsFactory.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{
"module": "batch_router",
"destType": b.destName,
})
Expand All @@ -74,7 +76,7 @@ func (b *BingAdsBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr
continue
}

uploadTimeStat := stats.Default.NewTaggedStat("async_upload_time", stats.TimerType, map[string]string{
uploadTimeStat := b.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, map[string]string{
"module": "batch_router",
"destType": b.destName,
})
Expand Down Expand Up @@ -220,13 +222,13 @@ func (b *BingAdsBulkUploader) getUploadStatsOfSingleImport(filePath string) (com
},
}

eventsAbortedStat := stats.Default.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{
eventsAbortedStat := b.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{
"module": "batch_router",
"destType": b.destName,
})
eventsAbortedStat.Count(len(eventStatsResponse.Metadata.FailedKeys))

eventsSuccessStat := stats.Default.NewTaggedStat("success_job_count", stats.CountType, map[string]string{
eventsSuccessStat := b.statsFactory.NewTaggedStat("success_job_count", stats.CountType, map[string]string{
"module": "batch_router",
"destType": b.destName,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/bing-ads/common"
"github.com/rudderlabs/rudder-server/services/oauth"
oauthv2 "github.com/rudderlabs/rudder-server/services/oauth/v2"
)

func newManagerInternal(destination *backendconfig.DestinationT, oauthClient oauth.Authorizer, oauthClientV2 oauthv2.Authorizer) (*BingAdsBulkUploader, error) {
func newManagerInternal(logger logger.Logger, statsFactory stats.Stats, destination *backendconfig.DestinationT, oauthClient oauth.Authorizer, oauthClientV2 oauthv2.Authorizer) (*BingAdsBulkUploader, error) {
destConfig := DestinationConfig{}
jsonConfig, err := stdjson.Marshal(destination.Config)
if err != nil {
Expand Down Expand Up @@ -49,16 +50,22 @@
session := bingads.NewSession(sessionConfig)

clientNew := Client{}
bingUploader := NewBingAdsBulkUploader(destination.DestinationDefinition.Name, bingads.NewBulkService(session), &clientNew)
bingUploader := NewBingAdsBulkUploader(logger, statsFactory, destination.DestinationDefinition.Name, bingads.NewBulkService(session), &clientNew)
return bingUploader, nil
}

func NewManager(destination *backendconfig.DestinationT, backendConfig backendconfig.BackendConfig) (*BingAdsBulkUploader, error) {
func NewManager(
conf *config.Config,
logger logger.Logger,
statsFactory stats.Stats,
destination *backendconfig.DestinationT,
backendConfig backendconfig.BackendConfig,
) (*BingAdsBulkUploader, error) {

Check warning on line 63 in router/batchrouter/asyncdestinationmanager/bing-ads/audience/manager.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/bing-ads/audience/manager.go#L63

Added line #L63 was not covered by tests
oauthClient := oauth.NewOAuthErrorHandler(backendConfig)
oauthClientV2 := oauthv2.NewOAuthHandler(backendConfig,
oauthv2.WithLogger(logger.NewLogger().Child("BatchRouter")),
oauthv2.WithCPConnectorTimeout(config.GetDuration("HttpClient.oauth.timeout", 30, time.Second)),
oauthv2.WithStats(stats.Default),
oauthv2.WithLogger(logger),
oauthv2.WithCPConnectorTimeout(conf.GetDuration("HttpClient.oauth.timeout", 30, time.Second)),
oauthv2.WithStats(statsFactory),

Check warning on line 68 in router/batchrouter/asyncdestinationmanager/bing-ads/audience/manager.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/bing-ads/audience/manager.go#L66-L68

Added lines #L66 - L68 were not covered by tests
)
return newManagerInternal(destination, oauthClient, oauthClientV2)
return newManagerInternal(logger, statsFactory, destination, oauthClient, oauthClientV2)

Check warning on line 70 in router/batchrouter/asyncdestinationmanager/bing-ads/audience/manager.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/bing-ads/audience/manager.go#L70

Added line #L70 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"net/http"
"os"

bingads "github.com/rudderlabs/bing-ads-go-sdk/bingads"
"github.com/rudderlabs/bing-ads-go-sdk/bingads"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type Client struct {
Expand All @@ -18,6 +19,7 @@ type BingAdsBulkUploader struct {
destName string
service bingads.BulkServiceI
logger logger.Logger
statsFactory stats.Stats
client Client
fileSizeLimit int64
eventsLimit int64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/utils/misc"
)

Expand Down Expand Up @@ -161,7 +162,7 @@ func (b *BingAdsBulkUploader) createZipFile(filePath, audienceId string) ([]*Act
return nil, err
}

payloadSizeStat := stats.Default.NewTaggedStat("payload_size", stats.HistogramType,
payloadSizeStat := b.statsFactory.NewTaggedStat("payload_size", stats.HistogramType,
map[string]string{
"module": "batch_router",
"destType": b.destName,
Expand Down
Loading
Loading