From 44c15011848edbfcd3eb8ca813c7200cfe1c2083 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 23 May 2024 15:14:00 -0500 Subject: [PATCH 01/22] initial --- go.mod | 5 +++++ go.sum | 10 ++++++++++ premium/usage.go | 31 ++++++++++++++++++++++++++++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index dc97a9341c..8b3cde7371 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.21.5 require ( github.com/apache/arrow/go/v15 v15.0.0-20240114144300-7e703aae55c1 + github.com/aws/aws-sdk-go-v2 v1.26.1 + github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.4 github.com/bradleyjkemp/cupaloy/v2 v2.8.0 github.com/cloudquery/cloudquery-api-go v1.9.1 github.com/cloudquery/plugin-pb-go v1.19.8 @@ -40,6 +42,9 @@ require ( github.com/andybalholm/brotli v1.1.0 // indirect github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect + github.com/aws/smithy-go v1.20.2 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect diff --git a/go.sum b/go.sum index 515861a765..67a4156124 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,16 @@ github.com/apache/arrow/go/v15 v15.0.0-20240114144300-7e703aae55c1 h1:T1NToVz08r github.com/apache/arrow/go/v15 v15.0.0-20240114144300-7e703aae55c1/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= +github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.4 h1:zKfueuQerw3RCopW6KbTkoTuD3W/t9e898UYqyssopw= +github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.4/go.mod h1:Q01yJLephuOzv6IYzcknrpVAriOqB66+qtGnpqgw9UE= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= diff --git a/premium/usage.go b/premium/usage.go index 3a7b6e4169..a3006733ca 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -10,6 +10,9 @@ import ( "sync/atomic" "time" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/marketplacemetering" + "github.com/aws/aws-sdk-go-v2/service/marketplacemetering/types" cqapi "github.com/cloudquery/cloudquery-api-go" "github.com/cloudquery/cloudquery-api-go/auth" "github.com/cloudquery/cloudquery-api-go/config" @@ -131,6 +134,8 @@ type BatchUpdater struct { apiClient *cqapi.ClientWithResponses tokenClient TokenClient + awsMarketPlaceClient *marketplacemetering.Client + // Plugin details teamName cqapi.TeamName pluginMeta plugin.Meta @@ -177,6 +182,9 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e TeamNameValue: u.teamName, }, nil } + if os.Getenv("CQ_AWS_MARKETPLACE") == "true" { + u.awsMarketPlaceClient = marketplacemetering.New(marketplacemetering.Options{}) + } if u.tokenClient == nil { u.tokenClient = auth.NewTokenClient() @@ -325,7 +333,28 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numbe for retry := 0; retry < u.maxRetries; retry++ { u.logger.Debug().Str("url", u.url).Int("try", retry).Int("max_retries", u.maxRetries).Uint32("rows", numberToUpdate).Msg("updating usage") queryStartTime := time.Now() - + if u.awsMarketPlaceClient != nil { + _, err := u.awsMarketPlaceClient.MeterUsage(ctx, &marketplacemetering.MeterUsageInput{ + ProductCode: aws.String("cloudquery"), + Timestamp: aws.Time(time.Now()), + UsageDimension: aws.String("rows"), + UsageAllocations: []types.UsageAllocation{ + { + AllocatedUsageQuantity: aws.Int32(int32(numberToUpdate)), + Tags: []types.Tag{ + { + Key: aws.String("plugin"), + Value: aws.String(fmt.Sprintf("%s/%s/%s", u.pluginMeta.Team, u.pluginMeta.Kind, u.pluginMeta.Name)), + }, + }, + }, + }, + UsageQuantity: aws.Int32(int32(numberToUpdate)), + }) + if err != nil { + return fmt.Errorf("failed to update usage with : %w", err) + } + } resp, err := u.apiClient.IncreaseTeamPluginUsageWithResponse(ctx, u.teamName, cqapi.IncreaseTeamPluginUsageJSONRequestBody{ RequestId: uuid.New(), PluginTeam: u.pluginMeta.Team, From 38ba207bc1681b1bb4c515deb77f870481b9508e Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 7 Jun 2024 08:19:34 -0500 Subject: [PATCH 02/22] Update go.mod --- go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.mod b/go.mod index 32520695e7..0cc17ea69f 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/cloudquery/plugin-sdk/v4 go 1.21.5 +replace github.com/cloudquery/plugin-sdk/v4 => ../../../../plugin-sdk + require ( github.com/apache/arrow/go/v16 v16.1.0 github.com/aws/aws-sdk-go-v2 v1.27.1 From 284759f332292d5fbde08bc680a47ad08c3f4379 Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 7 Jun 2024 08:19:36 -0500 Subject: [PATCH 03/22] Update go.sum --- go.sum | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/go.sum b/go.sum index 280f21b343..4d2a3b556f 100644 --- a/go.sum +++ b/go.sum @@ -23,14 +23,14 @@ github.com/apache/arrow/go/v16 v16.1.0 h1:dwgfOya6s03CzH9JrjCBx6bkVb4yPD4ma3haj9 github.com/apache/arrow/go/v16 v16.1.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= -github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= -github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= -github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.4 h1:zKfueuQerw3RCopW6KbTkoTuD3W/t9e898UYqyssopw= -github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.4/go.mod h1:Q01yJLephuOzv6IYzcknrpVAriOqB66+qtGnpqgw9UE= +github.com/aws/aws-sdk-go-v2 v1.27.1 h1:xypCL2owhog46iFxBKKpBcw+bPTX/RJzwNj8uSilENw= +github.com/aws/aws-sdk-go-v2 v1.27.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.8 h1:RnLB7p6aaFMRfyQkD6ckxR7myCC9SABIqSz4czYUUbU= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.8/go.mod h1:XH7dQJd+56wEbP1I4e4Duo+QhSMxNArE8VP7NuUOTeM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.8 h1:jzApk2f58L9yW9q1GEab3BMMFWUkkiZhyrRUtbwUbKU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.8/go.mod h1:WqO+FftfO3tGePUtQxPXM6iODVfqMwsVMgTbG/ZXIdQ= +github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.9 h1:1Y6NAaK/9Wjxb4VAlUOMUQuTrOKNXfLrhaSzNdLKkwo= +github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.9/go.mod h1:KpgKQ+ZF5kssRykR4yxANtddzYFatCqlv8yHddRHne4= github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= From effb581e1008e6acd6bd2373a4ba69edb215663e Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 7 Jun 2024 08:19:38 -0500 Subject: [PATCH 04/22] Update usage.go --- premium/usage.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 9492387378..4f3eaf2b8c 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -429,12 +429,12 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, rows queryStartTime := time.Now() if u.awsMarketPlaceClient != nil { _, err := u.awsMarketPlaceClient.MeterUsage(ctx, &marketplacemetering.MeterUsageInput{ - ProductCode: aws.String("cloudquery"), + ProductCode: aws.String("3r9d4ty0j8bloz3r1p4o0r9q3"), Timestamp: aws.Time(time.Now()), UsageDimension: aws.String("rows"), UsageAllocations: []types.UsageAllocation{ { - AllocatedUsageQuantity: aws.Int32(int32(numberToUpdate)), + AllocatedUsageQuantity: aws.Int32(int32(rows)), Tags: []types.Tag{ { Key: aws.String("plugin"), @@ -443,14 +443,13 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, rows }, }, }, - UsageQuantity: aws.Int32(int32(numberToUpdate)), + UsageQuantity: aws.Int32(int32(rows)), }) if err != nil { return fmt.Errorf("failed to update usage with : %w", err) } + return nil } - resp, err := u.apiClient.IncreaseTeamPluginUsageWithResponse(ctx, u.teamName, cqapi.IncreaseTeamPluginUsageJSONRequestBody{ - payload := cqapi.IncreaseTeamPluginUsageJSONRequestBody{ RequestId: uuid.New(), PluginTeam: u.pluginMeta.Team, From f18f71ff6cf8b3e839c7e6f82a79d0b5627cb78e Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 21 Jun 2024 14:49:44 -0500 Subject: [PATCH 05/22] mod tidy --- go.mod | 15 ++++++++++++--- go.sum | 30 ++++++++++++++++++++++++------ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index bccdad9715..145197002d 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,8 @@ replace github.com/cloudquery/plugin-sdk/v4 => ../../../../plugin-sdk require ( github.com/apache/arrow/go/v16 v16.1.0 - github.com/aws/aws-sdk-go-v2 v1.27.1 + github.com/aws/aws-sdk-go-v2 v1.30.0 + github.com/aws/aws-sdk-go-v2/config v1.27.21 github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.9 github.com/bradleyjkemp/cupaloy/v2 v2.8.0 github.com/cloudquery/cloudquery-api-go v1.11.3 @@ -43,8 +44,16 @@ require ( github.com/andybalholm/brotli v1.1.0 // indirect github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.8 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.8 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.21 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.8 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.14 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.21.1 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.25.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.29.1 // indirect github.com/aws/smithy-go v1.20.2 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect diff --git a/go.sum b/go.sum index f78db71139..6bbc034c2a 100644 --- a/go.sum +++ b/go.sum @@ -23,14 +23,32 @@ github.com/apache/arrow/go/v16 v16.1.0 h1:dwgfOya6s03CzH9JrjCBx6bkVb4yPD4ma3haj9 github.com/apache/arrow/go/v16 v16.1.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= -github.com/aws/aws-sdk-go-v2 v1.27.1 h1:xypCL2owhog46iFxBKKpBcw+bPTX/RJzwNj8uSilENw= -github.com/aws/aws-sdk-go-v2 v1.27.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.8 h1:RnLB7p6aaFMRfyQkD6ckxR7myCC9SABIqSz4czYUUbU= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.8/go.mod h1:XH7dQJd+56wEbP1I4e4Duo+QhSMxNArE8VP7NuUOTeM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.8 h1:jzApk2f58L9yW9q1GEab3BMMFWUkkiZhyrRUtbwUbKU= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.8/go.mod h1:WqO+FftfO3tGePUtQxPXM6iODVfqMwsVMgTbG/ZXIdQ= +github.com/aws/aws-sdk-go-v2 v1.30.0 h1:6qAwtzlfcTtcL8NHtbDQAqgM5s6NDipQTkPxyH/6kAA= +github.com/aws/aws-sdk-go-v2 v1.30.0/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.21 h1:yPX3pjGCe2hJsetlmGNB4Mngu7UPmvWPzzWCv1+boeM= +github.com/aws/aws-sdk-go-v2/config v1.27.21/go.mod h1:4XtlEU6DzNai8RMbjSF5MgGZtYvrhBP/aKZcRtZAVdM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.21 h1:pjAqgzfgFhTv5grc7xPHtXCAaMapzmwA7aU+c/SZQGw= +github.com/aws/aws-sdk-go-v2/credentials v1.17.21/go.mod h1:nhK6PtBlfHTUDVmBLr1dg+WHCOCK+1Fu/WQyVHPsgNQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.8 h1:FR+oWPFb/8qMVYMWN98bUZAGqPvLHiyqg1wqQGfUAXY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.8/go.mod h1:EgSKcHiuuakEIxJcKGzVNWh5srVAQ3jKaSrBGRYvM48= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.12 h1:SJ04WXGTwnHlWIODtC5kJzKbeuHt+OUNOgKg7nfnUGw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.12/go.mod h1:FkpvXhA92gb3GE9LD6Og0pHHycTxW7xGpnEh5E7Opwo= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.12 h1:hb5KgeYfObi5MHkSSZMEudnIvX30iB+E21evI4r6BnQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.12/go.mod h1:CroKe/eWJdyfy9Vx4rljP5wTUjNJfb+fPz1uMYUhEGM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.14 h1:zSDPny/pVnkqABXYRicYuPf9z2bTqfH13HT3v6UheIk= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.14/go.mod h1:3TTcI5JSzda1nw/pkVC9dhgLre0SNBFj2lYS4GctXKI= github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.9 h1:1Y6NAaK/9Wjxb4VAlUOMUQuTrOKNXfLrhaSzNdLKkwo= github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.9/go.mod h1:KpgKQ+ZF5kssRykR4yxANtddzYFatCqlv8yHddRHne4= +github.com/aws/aws-sdk-go-v2/service/sso v1.21.1 h1:sd0BsnAvLH8gsp2e3cbaIr+9D7T1xugueQ7V/zUAsS4= +github.com/aws/aws-sdk-go-v2/service/sso v1.21.1/go.mod h1:lcQG/MmxydijbeTOp04hIuJwXGWPZGI3bwdFDGRTv14= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.25.1 h1:1uEFNNskK/I1KoZ9Q8wJxMz5V9jyBlsiaNrM7vA3YUQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.25.1/go.mod h1:z0P8K+cBIsFXUr5rzo/psUeJ20XjPN0+Nn8067Nd+E4= +github.com/aws/aws-sdk-go-v2/service/sts v1.29.1 h1:myX5CxqXE0QMZNja6FA1/FSE3Vu1rVmeUmpJMMzeZg0= +github.com/aws/aws-sdk-go-v2/service/sts v1.29.1/go.mod h1:N2mQiucsO0VwK9CYuS4/c2n6Smeh1v47Rz3dWCPFLdE= github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= From 5732cf9e52f1f877d1f1446ecdf105815c1bfde2 Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 21 Jun 2024 14:50:06 -0500 Subject: [PATCH 06/22] Update usage.go --- premium/usage.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/premium/usage.go b/premium/usage.go index 4f3eaf2b8c..60c778d391 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -12,6 +12,7 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" + awsConfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/marketplacemetering" "github.com/aws/aws-sdk-go-v2/service/marketplacemetering/types" cqapi "github.com/cloudquery/cloudquery-api-go" @@ -206,7 +207,15 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e }, nil } if os.Getenv("CQ_AWS_MARKETPLACE") == "true" { - u.awsMarketPlaceClient = marketplacemetering.New(marketplacemetering.Options{}) + cfg, err := awsConfig.LoadDefaultConfig(context.TODO()) + if err != nil { + return nil, fmt.Errorf("failed to load AWS config: %w", err) + } + + u.awsMarketPlaceClient = marketplacemetering.NewFromConfig(cfg) + u.teamName = "AWS_MARKETPLACE" + u.backgroundUpdater() + return u, nil } if u.tokenClient == nil { @@ -315,6 +324,9 @@ func (u *BatchUpdater) TeamName() string { } func (u *BatchUpdater) HasQuota(ctx context.Context) (bool, error) { + if u.awsMarketPlaceClient != nil { + return true, nil + } u.logger.Debug().Str("url", u.url).Str("team", u.teamName).Str("pluginTeam", u.pluginMeta.Team).Str("pluginKind", string(u.pluginMeta.Kind)).Str("pluginName", u.pluginMeta.Name).Msg("checking quota") usage, err := u.apiClient.GetTeamPluginUsageWithResponse(ctx, u.teamName, u.pluginMeta.Team, u.pluginMeta.Kind, u.pluginMeta.Name) if err != nil { From e1bfb0395322cb13cad3d3ea0e27c7aeae93fdba Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 24 Jun 2024 15:41:38 -0500 Subject: [PATCH 07/22] Update usage.go --- premium/usage.go | 1 + 1 file changed, 1 insertion(+) diff --git a/premium/usage.go b/premium/usage.go index 60c778d391..77bc032d14 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -214,6 +214,7 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e u.awsMarketPlaceClient = marketplacemetering.NewFromConfig(cfg) u.teamName = "AWS_MARKETPLACE" + u.batchLimit = 1000000000 u.backgroundUpdater() return u, nil } From 102aa070d54510fa147905f71e4e17c23836e717 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 24 Jun 2024 15:46:42 -0500 Subject: [PATCH 08/22] go mod --- examples/simple_plugin/go.mod | 14 ++++++++++++++ examples/simple_plugin/go.sum | 28 ++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index cdd5242cd0..c68cb55ec0 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -18,6 +18,20 @@ require ( github.com/andybalholm/brotli v1.1.0 // indirect github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.30.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.27.21 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.21 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.8 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.14 // indirect + github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.9 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.21.1 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.25.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.29.1 // indirect + github.com/aws/smithy-go v1.20.2 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect diff --git a/examples/simple_plugin/go.sum b/examples/simple_plugin/go.sum index cba571b3d2..301f535af6 100644 --- a/examples/simple_plugin/go.sum +++ b/examples/simple_plugin/go.sum @@ -23,6 +23,34 @@ github.com/apache/arrow/go/v16 v16.1.0 h1:dwgfOya6s03CzH9JrjCBx6bkVb4yPD4ma3haj9 github.com/apache/arrow/go/v16 v16.1.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/aws/aws-sdk-go-v2 v1.30.0 h1:6qAwtzlfcTtcL8NHtbDQAqgM5s6NDipQTkPxyH/6kAA= +github.com/aws/aws-sdk-go-v2 v1.30.0/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.21 h1:yPX3pjGCe2hJsetlmGNB4Mngu7UPmvWPzzWCv1+boeM= +github.com/aws/aws-sdk-go-v2/config v1.27.21/go.mod h1:4XtlEU6DzNai8RMbjSF5MgGZtYvrhBP/aKZcRtZAVdM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.21 h1:pjAqgzfgFhTv5grc7xPHtXCAaMapzmwA7aU+c/SZQGw= +github.com/aws/aws-sdk-go-v2/credentials v1.17.21/go.mod h1:nhK6PtBlfHTUDVmBLr1dg+WHCOCK+1Fu/WQyVHPsgNQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.8 h1:FR+oWPFb/8qMVYMWN98bUZAGqPvLHiyqg1wqQGfUAXY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.8/go.mod h1:EgSKcHiuuakEIxJcKGzVNWh5srVAQ3jKaSrBGRYvM48= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.12 h1:SJ04WXGTwnHlWIODtC5kJzKbeuHt+OUNOgKg7nfnUGw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.12/go.mod h1:FkpvXhA92gb3GE9LD6Og0pHHycTxW7xGpnEh5E7Opwo= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.12 h1:hb5KgeYfObi5MHkSSZMEudnIvX30iB+E21evI4r6BnQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.12/go.mod h1:CroKe/eWJdyfy9Vx4rljP5wTUjNJfb+fPz1uMYUhEGM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.14 h1:zSDPny/pVnkqABXYRicYuPf9z2bTqfH13HT3v6UheIk= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.14/go.mod h1:3TTcI5JSzda1nw/pkVC9dhgLre0SNBFj2lYS4GctXKI= +github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.9 h1:1Y6NAaK/9Wjxb4VAlUOMUQuTrOKNXfLrhaSzNdLKkwo= +github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.21.9/go.mod h1:KpgKQ+ZF5kssRykR4yxANtddzYFatCqlv8yHddRHne4= +github.com/aws/aws-sdk-go-v2/service/sso v1.21.1 h1:sd0BsnAvLH8gsp2e3cbaIr+9D7T1xugueQ7V/zUAsS4= +github.com/aws/aws-sdk-go-v2/service/sso v1.21.1/go.mod h1:lcQG/MmxydijbeTOp04hIuJwXGWPZGI3bwdFDGRTv14= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.25.1 h1:1uEFNNskK/I1KoZ9Q8wJxMz5V9jyBlsiaNrM7vA3YUQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.25.1/go.mod h1:z0P8K+cBIsFXUr5rzo/psUeJ20XjPN0+Nn8067Nd+E4= +github.com/aws/aws-sdk-go-v2/service/sts v1.29.1 h1:myX5CxqXE0QMZNja6FA1/FSE3Vu1rVmeUmpJMMzeZg0= +github.com/aws/aws-sdk-go-v2/service/sts v1.29.1/go.mod h1:N2mQiucsO0VwK9CYuS4/c2n6Smeh1v47Rz3dWCPFLdE= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= From 310240c9bcb60bfa4dd296648a45a3e3df9c8631 Mon Sep 17 00:00:00 2001 From: bbernays Date: Mon, 24 Jun 2024 15:57:16 -0500 Subject: [PATCH 09/22] Update usage.go --- premium/usage.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/premium/usage.go b/premium/usage.go index 77bc032d14..c2b30b6ac3 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -206,6 +206,7 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e TeamNameValue: u.teamName, }, nil } + // If user wants to use the AWS Marketplace for billing, don't even try to communicate with CQ API if os.Getenv("CQ_AWS_MARKETPLACE") == "true" { cfg, err := awsConfig.LoadDefaultConfig(context.TODO()) if err != nil { @@ -214,6 +215,7 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e u.awsMarketPlaceClient = marketplacemetering.NewFromConfig(cfg) u.teamName = "AWS_MARKETPLACE" + // This needs to be larger than normal, because we can only send a single usage record per second (from each compute node) u.batchLimit = 1000000000 u.backgroundUpdater() return u, nil @@ -440,8 +442,15 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, rows for retry := 0; retry < u.maxRetries; retry++ { u.logger.Debug().Str("url", u.url).Int("try", retry).Int("max_retries", u.maxRetries).Uint32("rows", rows).Msg("updating usage") queryStartTime := time.Now() + + // If the AWS Marketplace client is set, use it to track usage if u.awsMarketPlaceClient != nil { + // Timestamp + UsageDimension + UsageQuantity are required fields and must be unique + // since Timestamp only maintains a granularity of seconds, we need to ensure our batch size is large enough _, err := u.awsMarketPlaceClient.MeterUsage(ctx, &marketplacemetering.MeterUsageInput{ + // Product code is a unique identifier for a product in AWS Marketplace + // Each product is given a unique product code when it is listed in AWS Marketplace + // in the future we can have multiple product codes for container or AMI based listings ProductCode: aws.String("3r9d4ty0j8bloz3r1p4o0r9q3"), Timestamp: aws.Time(time.Now()), UsageDimension: aws.String("rows"), From 4aa4422184fb6d03d2e40da61324da3361d8090e Mon Sep 17 00:00:00 2001 From: bbernays Date: Wed, 3 Jul 2024 14:51:47 -0500 Subject: [PATCH 10/22] Update usage.go --- premium/usage.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index c2b30b6ac3..2a4fccf248 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -207,7 +207,7 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e }, nil } // If user wants to use the AWS Marketplace for billing, don't even try to communicate with CQ API - if os.Getenv("CQ_AWS_MARKETPLACE") == "true" { + if isAWSMarketplace() { cfg, err := awsConfig.LoadDefaultConfig(context.TODO()) if err != nil { return nil, fmt.Errorf("failed to load AWS config: %w", err) @@ -260,6 +260,21 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e return u, nil } +func isAWSMarketplace() bool { + return os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true" || os.Getenv("CQ_AWS_MARKETPLACE_AMI") == "true" +} + +func awsMarketplaceProductCode() string { + if os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true" { + return "PLACEHOLDER" + } + if os.Getenv("CQ_AWS_MARKETPLACE_AMI") == "true" { + // TODO: This is a placeholder product code, we need to get the real one from AWS Marketplace + return "PLACEHOLDER" + } + return "" +} + func (u *BatchUpdater) Increase(rows uint32) error { if u.usageIncreaseMethod == UsageIncreaseMethodBreakdown { return fmt.Errorf("mixing usage increase methods is not allowed, use IncreaseForTable instead") @@ -451,7 +466,7 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, rows // Product code is a unique identifier for a product in AWS Marketplace // Each product is given a unique product code when it is listed in AWS Marketplace // in the future we can have multiple product codes for container or AMI based listings - ProductCode: aws.String("3r9d4ty0j8bloz3r1p4o0r9q3"), + ProductCode: aws.String(awsMarketplaceProductCode()), Timestamp: aws.Time(time.Now()), UsageDimension: aws.String("rows"), UsageAllocations: []types.UsageAllocation{ From 9ea3d5bc75dd4b31c4ce9b8ff6e3a2b104a5e6c9 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 Jul 2024 09:10:26 -0500 Subject: [PATCH 11/22] Update usage.go --- premium/usage.go | 88 ++++++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 36 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 2a4fccf248..1febf3f96f 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -151,7 +151,7 @@ type BatchUpdater struct { apiClient *cqapi.ClientWithResponses tokenClient TokenClient - awsMarketPlaceClient *marketplacemetering.Client + awsMarketplaceClient *marketplacemetering.Client // Plugin details teamName cqapi.TeamName @@ -213,7 +213,7 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e return nil, fmt.Errorf("failed to load AWS config: %w", err) } - u.awsMarketPlaceClient = marketplacemetering.NewFromConfig(cfg) + u.awsMarketplaceClient = marketplacemetering.NewFromConfig(cfg) u.teamName = "AWS_MARKETPLACE" // This needs to be larger than normal, because we can only send a single usage record per second (from each compute node) u.batchLimit = 1000000000 @@ -261,16 +261,12 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e } func isAWSMarketplace() bool { - return os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true" || os.Getenv("CQ_AWS_MARKETPLACE_AMI") == "true" + return os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true" } func awsMarketplaceProductCode() string { if os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true" { - return "PLACEHOLDER" - } - if os.Getenv("CQ_AWS_MARKETPLACE_AMI") == "true" { - // TODO: This is a placeholder product code, we need to get the real one from AWS Marketplace - return "PLACEHOLDER" + return "2a8bdkarwqrp0tmo4errl65s7" } return "" } @@ -342,7 +338,7 @@ func (u *BatchUpdater) TeamName() string { } func (u *BatchUpdater) HasQuota(ctx context.Context) (bool, error) { - if u.awsMarketPlaceClient != nil { + if u.awsMarketplaceClient != nil { return true, nil } u.logger.Debug().Str("url", u.url).Str("team", u.teamName).Str("pluginTeam", u.pluginMeta.Team).Str("pluginKind", string(u.pluginMeta.Kind)).Str("pluginName", u.pluginMeta.Name).Msg("checking quota") @@ -453,39 +449,59 @@ func (u *BatchUpdater) backgroundUpdater() { <-started } +func (u *BatchUpdater) reportUsageToAWSMarketplace(ctx context.Context, rows uint32, tables []cqapi.UsageIncreaseTablesInner) error { + // Timestamp + UsageDimension + UsageQuantity are required fields and must be unique + // since Timestamp only maintains a granularity of seconds, we need to ensure our batch size is large enough + + usage := make([]types.UsageAllocation, len(tables)) + for i, table := range tables { + usage[i] = types.UsageAllocation{ + AllocatedUsageQuantity: aws.Int32(int32(table.Rows)), + Tags: []types.Tag{ + { + Key: aws.String("plugin_name"), + Value: aws.String(u.pluginMeta.Name), + }, + { + Key: aws.String("plugin_team"), + Value: aws.String(u.pluginMeta.Team), + }, + { + Key: aws.String("plugin_kind"), + Value: aws.String(string(u.pluginMeta.Kind)), + }, + { + Key: aws.String("table_name"), + Value: aws.String(table.Name), + }, + }, + } + } + + _, err := u.awsMarketplaceClient.MeterUsage(ctx, &marketplacemetering.MeterUsageInput{ + // Product code is a unique identifier for a product in AWS Marketplace + // Each product is given a unique product code when it is listed in AWS Marketplace + // in the future we can have multiple product codes for container or AMI based listings + ProductCode: aws.String(awsMarketplaceProductCode()), + Timestamp: aws.Time(time.Now()), + UsageDimension: aws.String("rows"), + UsageAllocations: usage, + UsageQuantity: aws.Int32(int32(rows)), + }) + if err != nil { + return fmt.Errorf("failed to update usage with : %w", err) + } + return nil +} + func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, rows uint32, tables []cqapi.UsageIncreaseTablesInner) error { for retry := 0; retry < u.maxRetries; retry++ { u.logger.Debug().Str("url", u.url).Int("try", retry).Int("max_retries", u.maxRetries).Uint32("rows", rows).Msg("updating usage") queryStartTime := time.Now() // If the AWS Marketplace client is set, use it to track usage - if u.awsMarketPlaceClient != nil { - // Timestamp + UsageDimension + UsageQuantity are required fields and must be unique - // since Timestamp only maintains a granularity of seconds, we need to ensure our batch size is large enough - _, err := u.awsMarketPlaceClient.MeterUsage(ctx, &marketplacemetering.MeterUsageInput{ - // Product code is a unique identifier for a product in AWS Marketplace - // Each product is given a unique product code when it is listed in AWS Marketplace - // in the future we can have multiple product codes for container or AMI based listings - ProductCode: aws.String(awsMarketplaceProductCode()), - Timestamp: aws.Time(time.Now()), - UsageDimension: aws.String("rows"), - UsageAllocations: []types.UsageAllocation{ - { - AllocatedUsageQuantity: aws.Int32(int32(rows)), - Tags: []types.Tag{ - { - Key: aws.String("plugin"), - Value: aws.String(fmt.Sprintf("%s/%s/%s", u.pluginMeta.Team, u.pluginMeta.Kind, u.pluginMeta.Name)), - }, - }, - }, - }, - UsageQuantity: aws.Int32(int32(rows)), - }) - if err != nil { - return fmt.Errorf("failed to update usage with : %w", err) - } - return nil + if u.awsMarketplaceClient != nil { + return u.reportUsageToAWSMarketplace(ctx, rows, tables) } payload := cqapi.IncreaseTeamPluginUsageJSONRequestBody{ RequestId: uuid.New(), From 566bbc0d05f514a8cbec96313c6783835e209da1 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 Jul 2024 09:22:36 -0500 Subject: [PATCH 12/22] Update usage.go --- premium/usage.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 1febf3f96f..b09dfdff96 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -208,17 +208,8 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e } // If user wants to use the AWS Marketplace for billing, don't even try to communicate with CQ API if isAWSMarketplace() { - cfg, err := awsConfig.LoadDefaultConfig(context.TODO()) - if err != nil { - return nil, fmt.Errorf("failed to load AWS config: %w", err) - } - - u.awsMarketplaceClient = marketplacemetering.NewFromConfig(cfg) - u.teamName = "AWS_MARKETPLACE" - // This needs to be larger than normal, because we can only send a single usage record per second (from each compute node) - u.batchLimit = 1000000000 - u.backgroundUpdater() - return u, nil + err := u.setupAWSMarketplace() + return u, err } if u.tokenClient == nil { @@ -260,6 +251,20 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e return u, nil } +func (u *BatchUpdater) setupAWSMarketplace() error { + cfg, err := awsConfig.LoadDefaultConfig(context.TODO()) + if err != nil { + return fmt.Errorf("failed to load AWS config: %w", err) + } + + u.awsMarketplaceClient = marketplacemetering.NewFromConfig(cfg) + u.teamName = "AWS_MARKETPLACE" + // This needs to be larger than normal, because we can only send a single usage record per second (from each compute node) + u.batchLimit = 1000000000 + u.backgroundUpdater() + return nil +} + func isAWSMarketplace() bool { return os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true" } From d5ab9375d52f0c2488b7211c118ee95718cfee80 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 1 Aug 2024 23:11:03 -0500 Subject: [PATCH 13/22] Update usage.go --- premium/usage.go | 126 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 94 insertions(+), 32 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index b09dfdff96..2765997979 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -45,6 +45,11 @@ const ( MaximumUpdateIntervalHeader = "x-cq-maximum-update-interval" ) +//go:generate mockgen -package=mocks -destination=../premium/mocks/marketplacemetering.go -source=usage.go AWSMarketplaceClientInterface +type AWSMarketplaceClientInterface interface { + MeterUsage(ctx context.Context, params *marketplacemetering.MeterUsageInput, optFns ...func(*marketplacemetering.Options)) (*marketplacemetering.MeterUsageOutput, error) +} + type TokenClient interface { GetToken() (auth.Token, error) GetTokenType() auth.TokenType @@ -133,6 +138,13 @@ func WithAPIClient(apiClient *cqapi.ClientWithResponses) UsageClientOptions { } } +// WithAWSMarketplaceClient sets the AWS Marketplace client to use - defaults to marketplacemetering.NewFromConfig() +func WithAWSMarketplaceClient(awsMarketplaceClient AWSMarketplaceClientInterface) UsageClientOptions { + return func(updater *BatchUpdater) { + updater.awsMarketplaceClient = awsMarketplaceClient + } +} + // withTokenClient sets the token client to use - defaults to auth.NewTokenClient(). Used in tests to mock the token client func withTokenClient(tokenClient TokenClient) UsageClientOptions { return func(updater *BatchUpdater) { @@ -151,7 +163,7 @@ type BatchUpdater struct { apiClient *cqapi.ClientWithResponses tokenClient TokenClient - awsMarketplaceClient *marketplacemetering.Client + awsMarketplaceClient AWSMarketplaceClientInterface // Plugin details teamName cqapi.TeamName @@ -256,11 +268,15 @@ func (u *BatchUpdater) setupAWSMarketplace() error { if err != nil { return fmt.Errorf("failed to load AWS config: %w", err) } - - u.awsMarketplaceClient = marketplacemetering.NewFromConfig(cfg) + // This allows us to be able to inject a mock client for testing + if u.awsMarketplaceClient == nil { + u.awsMarketplaceClient = marketplacemetering.NewFromConfig(cfg) + } u.teamName = "AWS_MARKETPLACE" // This needs to be larger than normal, because we can only send a single usage record per second (from each compute node) u.batchLimit = 1000000000 + + u.minTimeBetweenFlushes = 1 * time.Minute u.backgroundUpdater() return nil } @@ -381,9 +397,28 @@ func (u *BatchUpdater) getTableUsage() (usage []cqapi.UsageIncreaseTablesInner, return usage, u.rows } +func (u *BatchUpdater) subtractTableUsageForAWSMarketplace(total uint32) { + for table := range u.tables { + tableTotal := u.tables[table] + if tableTotal < 1 { + continue + } + if tableTotal > total { + u.tables[table] -= total + total = 0 + } else { + u.tables[table] = 0 + total -= tableTotal + } + } +} func (u *BatchUpdater) subtractTableUsage(usage []cqapi.UsageIncreaseTablesInner, total uint32) { u.Lock() defer u.Unlock() + if u.awsMarketplaceClient != nil { + u.subtractTableUsageForAWSMarketplace(total) + return + } for _, table := range usage { u.tables[table.Name] -= uint32(table.Rows) @@ -412,6 +447,12 @@ func (u *BatchUpdater) backgroundUpdater() { // Not enough rows to update continue } + // If we are using AWS Marketplace, we need to round down to the nearest 1000 + // Only on the last update, will we round up to the nearest 1000 + // This will allow us to not over charge the customer by rounding on each batch + if u.awsMarketplaceClient != nil { + totals = roundDown(totals, 1000) + } if err := u.updateUsageWithRetryAndBackoff(ctx, totals, tables); err != nil { log.Warn().Err(err).Msg("failed to update usage") @@ -440,11 +481,19 @@ func (u *BatchUpdater) backgroundUpdater() { case <-u.done: tables, totals := u.getTableUsage() if totals != 0 { + // To allow us to round up the total in the last batch we need to save the original total + // to use in the last subtractTableUsage + originalTotals := totals + + // If we are using AWS Marketplace, we need to round up to the nearest 1000 + if u.awsMarketplaceClient != nil { + totals = roundUp(totals, 1000) + } if err := u.updateUsageWithRetryAndBackoff(ctx, totals, tables); err != nil { u.closeError <- err return } - u.subtractTableUsage(tables, totals) + u.subtractTableUsage(tables, originalTotals) } u.closeError <- nil return @@ -454,35 +503,28 @@ func (u *BatchUpdater) backgroundUpdater() { <-started } -func (u *BatchUpdater) reportUsageToAWSMarketplace(ctx context.Context, rows uint32, tables []cqapi.UsageIncreaseTablesInner) error { +func (u *BatchUpdater) reportUsageToAWSMarketplace(ctx context.Context, rows uint32) error { + // AWS marketplace requires usage to be reported as groups of 1000 + rows /= 1000 + usage := []types.UsageAllocation{{ + AllocatedUsageQuantity: aws.Int32(int32(rows)), + Tags: []types.Tag{ + { + Key: aws.String("plugin_name"), + Value: aws.String(u.pluginMeta.Name), + }, + { + Key: aws.String("plugin_team"), + Value: aws.String(u.pluginMeta.Team), + }, + { + Key: aws.String("plugin_kind"), + Value: aws.String(string(u.pluginMeta.Kind)), + }, + }, + }} // Timestamp + UsageDimension + UsageQuantity are required fields and must be unique // since Timestamp only maintains a granularity of seconds, we need to ensure our batch size is large enough - - usage := make([]types.UsageAllocation, len(tables)) - for i, table := range tables { - usage[i] = types.UsageAllocation{ - AllocatedUsageQuantity: aws.Int32(int32(table.Rows)), - Tags: []types.Tag{ - { - Key: aws.String("plugin_name"), - Value: aws.String(u.pluginMeta.Name), - }, - { - Key: aws.String("plugin_team"), - Value: aws.String(u.pluginMeta.Team), - }, - { - Key: aws.String("plugin_kind"), - Value: aws.String(string(u.pluginMeta.Kind)), - }, - { - Key: aws.String("table_name"), - Value: aws.String(table.Name), - }, - }, - } - } - _, err := u.awsMarketplaceClient.MeterUsage(ctx, &marketplacemetering.MeterUsageInput{ // Product code is a unique identifier for a product in AWS Marketplace // Each product is given a unique product code when it is listed in AWS Marketplace @@ -506,7 +548,7 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, rows // If the AWS Marketplace client is set, use it to track usage if u.awsMarketplaceClient != nil { - return u.reportUsageToAWSMarketplace(ctx, rows, tables) + return u.reportUsageToAWSMarketplace(ctx, rows) } payload := cqapi.IncreaseTeamPluginUsageJSONRequestBody{ RequestId: uuid.New(), @@ -663,3 +705,23 @@ func (NoOpUsageClient) IncreaseForTable(_ string, _ uint32) error { func (NoOpUsageClient) Close() error { return nil } +func roundDown(x, unit uint32) uint32 { + a := (x / unit) * unit + b := a + unit + if a < b { + return a + } + return b +} + +func roundUp(x, unit uint32) uint32 { + a := (x / unit) * unit + b := a + unit + + if (x - a) < (b - x) { + return a + } + return b +} + + From 55382b87889b4b5d4dbe18415f7b8cf172cf5eab Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 1 Aug 2024 23:11:05 -0500 Subject: [PATCH 14/22] Update usage_test.go --- premium/usage_test.go | 81 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/premium/usage_test.go b/premium/usage_test.go index b8e09a6fc9..d7408c4b09 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -12,10 +12,16 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/marketplacemetering" + "github.com/aws/aws-sdk-go-v2/service/marketplacemetering/types" cqapi "github.com/cloudquery/cloudquery-api-go" "github.com/cloudquery/cloudquery-api-go/auth" "github.com/cloudquery/cloudquery-api-go/config" + "github.com/cloudquery/plugin-sdk/v4/faker" "github.com/cloudquery/plugin-sdk/v4/plugin" + "github.com/cloudquery/plugin-sdk/v4/premium/mocks" + "github.com/golang/mock/gomock" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -344,6 +350,54 @@ func TestUsageService_IncreaseForTable_CorrectByTable(t *testing.T) { } } +func TestUsageService_AWSMarketplaceDone(t *testing.T) { + var err error + ctrl := gomock.NewController(t) + + m := mocks.NewMockAWSMarketplaceClientInterface(ctrl) + + out := marketplacemetering.MeterUsageOutput{} + in := meteringInput{ + MeterUsageInput: marketplacemetering.MeterUsageInput{ + ProductCode: aws.String("2a8bdkarwqrp0tmo4errl65s7"), + UsageDimension: aws.String("rows"), + UsageQuantity: aws.Int32(20), + UsageAllocations: []types.UsageAllocation{{ + AllocatedUsageQuantity: aws.Int32(int32(20)), + Tags: []types.Tag{ + { + Key: aws.String("plugin_name"), + Value: aws.String("vault"), + }, + { + Key: aws.String("plugin_team"), + Value: aws.String("plugin-team"), + }, + { + Key: aws.String("plugin_kind"), + Value: aws.String("source"), + }, + }, + }}, + }, + } + assert.NoError(t, faker.FakeObject(&out)) + m.EXPECT().MeterUsage(gomock.Any(), in).Return(&out, nil) + t.Setenv("CQ_AWS_MARKETPLACE_CONTAINER", "true") + usageClient := newClient(t, nil, WithBatchLimit(50), WithAWSMarketplaceClient(m)) + + // This will generate 19,998 rows + // We expect that there will be 20 rows reported to AWS Marketplace + rows := 9999 + for i := 0; i < rows; i++ { + err = usageClient.IncreaseForTable("table", 2) + require.NoError(t, err) + } + + err = usageClient.Close() + require.NoError(t, err) +} + func TestUsageService_Increase_ErrorOnMixingMethods(t *testing.T) { s := createTestServer(t) defer s.server.Close() @@ -826,3 +880,30 @@ func Test_UsageClientInit_UnknownTokenType(t *testing.T) { }) } } + +type meteringInput struct { + marketplacemetering.MeterUsageInput +} + +func (mi meteringInput) Matches(x any) bool { + testInput, ok := x.(*marketplacemetering.MeterUsageInput) + if !ok { + return false + } + + if aws.ToString(testInput.ProductCode) != aws.ToString(mi.ProductCode) { + return false + } + if aws.ToString(testInput.UsageDimension) != aws.ToString(mi.UsageDimension) { + return false + } + if aws.ToInt32(testInput.UsageQuantity) != aws.ToInt32(mi.UsageQuantity) { + return false + } + + return true +} + +func (mi meteringInput) String() string { + return fmt.Sprintf("{ProductCode:%s UsageDimension:%s UsageQuantity:%d}", aws.ToString(mi.ProductCode), aws.ToString(mi.UsageDimension), aws.ToInt32(mi.UsageQuantity)) +} From a8a453ee3bb9610025f58869ec292fea6f745e7c Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 1 Aug 2024 23:11:07 -0500 Subject: [PATCH 15/22] Create marketplacemetering.go --- premium/mocks/marketplacemetering.go | 255 +++++++++++++++++++++++++++ 1 file changed, 255 insertions(+) create mode 100644 premium/mocks/marketplacemetering.go diff --git a/premium/mocks/marketplacemetering.go b/premium/mocks/marketplacemetering.go new file mode 100644 index 0000000000..911d6d576d --- /dev/null +++ b/premium/mocks/marketplacemetering.go @@ -0,0 +1,255 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: usage.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + marketplacemetering "github.com/aws/aws-sdk-go-v2/service/marketplacemetering" + auth "github.com/cloudquery/cloudquery-api-go/auth" + gomock "github.com/golang/mock/gomock" +) + +// MockAWSMarketplaceClientInterface is a mock of AWSMarketplaceClientInterface interface. +type MockAWSMarketplaceClientInterface struct { + ctrl *gomock.Controller + recorder *MockAWSMarketplaceClientInterfaceMockRecorder +} + +// MockAWSMarketplaceClientInterfaceMockRecorder is the mock recorder for MockAWSMarketplaceClientInterface. +type MockAWSMarketplaceClientInterfaceMockRecorder struct { + mock *MockAWSMarketplaceClientInterface +} + +// NewMockAWSMarketplaceClientInterface creates a new mock instance. +func NewMockAWSMarketplaceClientInterface(ctrl *gomock.Controller) *MockAWSMarketplaceClientInterface { + mock := &MockAWSMarketplaceClientInterface{ctrl: ctrl} + mock.recorder = &MockAWSMarketplaceClientInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAWSMarketplaceClientInterface) EXPECT() *MockAWSMarketplaceClientInterfaceMockRecorder { + return m.recorder +} + +// MeterUsage mocks base method. +func (m *MockAWSMarketplaceClientInterface) MeterUsage(ctx context.Context, params *marketplacemetering.MeterUsageInput, optFns ...func(*marketplacemetering.Options)) (*marketplacemetering.MeterUsageOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MeterUsage", varargs...) + ret0, _ := ret[0].(*marketplacemetering.MeterUsageOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MeterUsage indicates an expected call of MeterUsage. +func (mr *MockAWSMarketplaceClientInterfaceMockRecorder) MeterUsage(ctx, params interface{}, optFns ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MeterUsage", reflect.TypeOf((*MockAWSMarketplaceClientInterface)(nil).MeterUsage), varargs...) +} + +// MockTokenClient is a mock of TokenClient interface. +type MockTokenClient struct { + ctrl *gomock.Controller + recorder *MockTokenClientMockRecorder +} + +// MockTokenClientMockRecorder is the mock recorder for MockTokenClient. +type MockTokenClientMockRecorder struct { + mock *MockTokenClient +} + +// NewMockTokenClient creates a new mock instance. +func NewMockTokenClient(ctrl *gomock.Controller) *MockTokenClient { + mock := &MockTokenClient{ctrl: ctrl} + mock.recorder = &MockTokenClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTokenClient) EXPECT() *MockTokenClientMockRecorder { + return m.recorder +} + +// GetToken mocks base method. +func (m *MockTokenClient) GetToken() (auth.Token, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetToken") + ret0, _ := ret[0].(auth.Token) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetToken indicates an expected call of GetToken. +func (mr *MockTokenClientMockRecorder) GetToken() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetToken", reflect.TypeOf((*MockTokenClient)(nil).GetToken)) +} + +// GetTokenType mocks base method. +func (m *MockTokenClient) GetTokenType() auth.TokenType { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTokenType") + ret0, _ := ret[0].(auth.TokenType) + return ret0 +} + +// GetTokenType indicates an expected call of GetTokenType. +func (mr *MockTokenClientMockRecorder) GetTokenType() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTokenType", reflect.TypeOf((*MockTokenClient)(nil).GetTokenType)) +} + +// MockQuotaMonitor is a mock of QuotaMonitor interface. +type MockQuotaMonitor struct { + ctrl *gomock.Controller + recorder *MockQuotaMonitorMockRecorder +} + +// MockQuotaMonitorMockRecorder is the mock recorder for MockQuotaMonitor. +type MockQuotaMonitorMockRecorder struct { + mock *MockQuotaMonitor +} + +// NewMockQuotaMonitor creates a new mock instance. +func NewMockQuotaMonitor(ctrl *gomock.Controller) *MockQuotaMonitor { + mock := &MockQuotaMonitor{ctrl: ctrl} + mock.recorder = &MockQuotaMonitorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQuotaMonitor) EXPECT() *MockQuotaMonitorMockRecorder { + return m.recorder +} + +// HasQuota mocks base method. +func (m *MockQuotaMonitor) HasQuota(arg0 context.Context) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasQuota", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HasQuota indicates an expected call of HasQuota. +func (mr *MockQuotaMonitorMockRecorder) HasQuota(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasQuota", reflect.TypeOf((*MockQuotaMonitor)(nil).HasQuota), arg0) +} + +// TeamName mocks base method. +func (m *MockQuotaMonitor) TeamName() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TeamName") + ret0, _ := ret[0].(string) + return ret0 +} + +// TeamName indicates an expected call of TeamName. +func (mr *MockQuotaMonitorMockRecorder) TeamName() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TeamName", reflect.TypeOf((*MockQuotaMonitor)(nil).TeamName)) +} + +// MockUsageClient is a mock of UsageClient interface. +type MockUsageClient struct { + ctrl *gomock.Controller + recorder *MockUsageClientMockRecorder +} + +// MockUsageClientMockRecorder is the mock recorder for MockUsageClient. +type MockUsageClientMockRecorder struct { + mock *MockUsageClient +} + +// NewMockUsageClient creates a new mock instance. +func NewMockUsageClient(ctrl *gomock.Controller) *MockUsageClient { + mock := &MockUsageClient{ctrl: ctrl} + mock.recorder = &MockUsageClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockUsageClient) EXPECT() *MockUsageClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockUsageClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockUsageClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockUsageClient)(nil).Close)) +} + +// HasQuota mocks base method. +func (m *MockUsageClient) HasQuota(arg0 context.Context) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasQuota", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HasQuota indicates an expected call of HasQuota. +func (mr *MockUsageClientMockRecorder) HasQuota(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasQuota", reflect.TypeOf((*MockUsageClient)(nil).HasQuota), arg0) +} + +// Increase mocks base method. +func (m *MockUsageClient) Increase(arg0 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Increase", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Increase indicates an expected call of Increase. +func (mr *MockUsageClientMockRecorder) Increase(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Increase", reflect.TypeOf((*MockUsageClient)(nil).Increase), arg0) +} + +// IncreaseForTable mocks base method. +func (m *MockUsageClient) IncreaseForTable(arg0 string, arg1 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IncreaseForTable", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// IncreaseForTable indicates an expected call of IncreaseForTable. +func (mr *MockUsageClientMockRecorder) IncreaseForTable(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncreaseForTable", reflect.TypeOf((*MockUsageClient)(nil).IncreaseForTable), arg0, arg1) +} + +// TeamName mocks base method. +func (m *MockUsageClient) TeamName() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TeamName") + ret0, _ := ret[0].(string) + return ret0 +} + +// TeamName indicates an expected call of TeamName. +func (mr *MockUsageClientMockRecorder) TeamName() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TeamName", reflect.TypeOf((*MockUsageClient)(nil).TeamName)) +} From 1cd7d2a5c9102d3cc9582aa4cbb2ecda553de694 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 1 Aug 2024 23:11:09 -0500 Subject: [PATCH 16/22] Update go.mod --- go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/go.mod b/go.mod index d390d562dd..c06faa7fd9 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/cloudquery/plugin-pb-go v1.20.8 github.com/cloudquery/plugin-sdk/v2 v2.7.0 github.com/goccy/go-json v0.10.3 + github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 From 2727ba1c7a5c5ea14108897b7b7a742572592aba Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 1 Aug 2024 23:11:12 -0500 Subject: [PATCH 17/22] Update go.sum --- go.sum | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/go.sum b/go.sum index 5a107afb5b..612d9a2bba 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -141,6 +143,7 @@ github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= @@ -169,14 +172,27 @@ go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+ go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -184,10 +200,19 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.15.0 h1:2lYxjRbTYyxkJxlhC+LvJIx3SsANPdRybu1tGj9/OrQ= From b0e32fc5f5aa52180d1fc27b9d493d95dc2bda43 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 1 Aug 2024 23:12:34 -0500 Subject: [PATCH 18/22] Update go.mod --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index c06faa7fd9..e0079aae5c 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,6 @@ module github.com/cloudquery/plugin-sdk/v4 go 1.21.11 -replace github.com/cloudquery/plugin-sdk/v4 => ../../../../plugin-sdk require ( github.com/apache/arrow/go/v17 v17.0.0 From 079ed11d754a4b7351f351ad8f4c13fb150cbc4f Mon Sep 17 00:00:00 2001 From: Ben Bernays Date: Fri, 2 Aug 2024 08:14:05 -0500 Subject: [PATCH 19/22] Apply suggestions from code review Co-authored-by: Herman Schaaf --- premium/usage.go | 19 ++++++----------- premium/usage_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 2765997979..4cb294ac9e 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -403,7 +403,7 @@ func (u *BatchUpdater) subtractTableUsageForAWSMarketplace(total uint32) { if tableTotal < 1 { continue } - if tableTotal > total { + if tableTotal >= total { u.tables[table] -= total total = 0 } else { @@ -705,23 +705,16 @@ func (NoOpUsageClient) IncreaseForTable(_ string, _ uint32) error { func (NoOpUsageClient) Close() error { return nil } + func roundDown(x, unit uint32) uint32 { - a := (x / unit) * unit - b := a + unit - if a < b { - return a - } - return b + return x - (x % unit) } func roundUp(x, unit uint32) uint32 { - a := (x / unit) * unit - b := a + unit - - if (x - a) < (b - x) { - return a + if x%unit == 0 { + return x } - return b + return x + (unit - x%unit) } diff --git a/premium/usage_test.go b/premium/usage_test.go index d7408c4b09..fb5f82a041 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -907,3 +907,50 @@ func (mi meteringInput) Matches(x any) bool { func (mi meteringInput) String() string { return fmt.Sprintf("{ProductCode:%s UsageDimension:%s UsageQuantity:%d}", aws.ToString(mi.ProductCode), aws.ToString(mi.UsageDimension), aws.ToInt32(mi.UsageQuantity)) } + + +func TestRoundDown(t *testing.T) { + cases := []struct { + x uint32 + unit uint32 + want uint32 + }{ + {1000, 1000, 1000}, + {2000, 1000, 2000}, + {2001, 1000, 2000}, + {2999, 1000, 2000}, + {0, 1000, 0}, + {123, 1000, 0}, + {2500, 1000, 2000}, + } + + for _, c := range cases { + got := roundDown(c.x, c.unit) + if got != c.want { + t.Errorf("roundDown(%d, %d) == %d, want %d", c.x, c.unit, got, c.want) + } + } +} + +func TestRoundUp(t *testing.T) { + cases := []struct { + x uint32 + unit uint32 + want uint32 + }{ + {1000, 1000, 1000}, + {2000, 1000, 2000}, + {2001, 1000, 3000}, + {2999, 1000, 3000}, + {0, 1000, 0}, + {123, 1000, 1000}, + {2500, 1000, 3000}, + } + + for _, c := range cases { + got := roundUp(c.x, c.unit) + if got != c.want { + t.Errorf("roundUp(%d, %d) == %d, want %d", c.x, c.unit, got, c.want) + } + } +} From c8be2ae5b9f9f29b4a1c7ed60d3703cdab705355 Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 2 Aug 2024 08:20:29 -0500 Subject: [PATCH 20/22] Update usage.go --- premium/usage.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 4cb294ac9e..0de446d2e0 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -405,11 +405,11 @@ func (u *BatchUpdater) subtractTableUsageForAWSMarketplace(total uint32) { } if tableTotal >= total { u.tables[table] -= total - total = 0 - } else { - u.tables[table] = 0 - total -= tableTotal + // we can return early because we have subtracted enough rows + return } + u.tables[table] = 0 + total -= tableTotal } } func (u *BatchUpdater) subtractTableUsage(usage []cqapi.UsageIncreaseTablesInner, total uint32) { @@ -471,7 +471,12 @@ func (u *BatchUpdater) backgroundUpdater() { if totals == 0 { continue } - + // If we are using AWS Marketplace, we need to round down to the nearest 1000 + // Only on the last update, will we round up to the nearest 1000 + // This will allow us to not over charge the customer by rounding on each batch + if u.awsMarketplaceClient != nil { + totals = roundDown(totals, 1000) + } if err := u.updateUsageWithRetryAndBackoff(ctx, totals, tables); err != nil { log.Warn().Err(err).Msg("failed to update usage") continue @@ -716,5 +721,3 @@ func roundUp(x, unit uint32) uint32 { } return x + (unit - x%unit) } - - From 15281cc526a2d8825664a77f5c50f6f807601c19 Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 2 Aug 2024 08:20:32 -0500 Subject: [PATCH 21/22] Update usage_test.go --- premium/usage_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/premium/usage_test.go b/premium/usage_test.go index fb5f82a041..cf66046074 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -908,7 +908,6 @@ func (mi meteringInput) String() string { return fmt.Sprintf("{ProductCode:%s UsageDimension:%s UsageQuantity:%d}", aws.ToString(mi.ProductCode), aws.ToString(mi.UsageDimension), aws.ToInt32(mi.UsageQuantity)) } - func TestRoundDown(t *testing.T) { cases := []struct { x uint32 From 5730106a2fcd7f216a7ecb928a6b2ca0f35c9cf0 Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 2 Aug 2024 08:29:26 -0500 Subject: [PATCH 22/22] Update usage_test.go --- premium/usage_test.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/premium/usage_test.go b/premium/usage_test.go index cf66046074..78cb7a576a 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -924,10 +924,7 @@ func TestRoundDown(t *testing.T) { } for _, c := range cases { - got := roundDown(c.x, c.unit) - if got != c.want { - t.Errorf("roundDown(%d, %d) == %d, want %d", c.x, c.unit, got, c.want) - } + assert.Equal(t, c.want, roundDown(c.x, c.unit)) } } @@ -947,9 +944,6 @@ func TestRoundUp(t *testing.T) { } for _, c := range cases { - got := roundUp(c.x, c.unit) - if got != c.want { - t.Errorf("roundUp(%d, %d) == %d, want %d", c.x, c.unit, got, c.want) - } + assert.Equal(t, c.want, roundUp(c.x, c.unit)) } }