Skip to content

Commit

Permalink
Update usage.go
Browse files Browse the repository at this point in the history
  • Loading branch information
bbernays committed Jul 16, 2024
1 parent 4aa4422 commit 9ea3d5b
Showing 1 changed file with 52 additions and 36 deletions.
88 changes: 52 additions & 36 deletions premium/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type BatchUpdater struct {
apiClient *cqapi.ClientWithResponses
tokenClient TokenClient

awsMarketPlaceClient *marketplacemetering.Client
awsMarketplaceClient *marketplacemetering.Client

// Plugin details
teamName cqapi.TeamName
Expand Down Expand Up @@ -213,7 +213,7 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e
return nil, fmt.Errorf("failed to load AWS config: %w", err)
}

u.awsMarketPlaceClient = marketplacemetering.NewFromConfig(cfg)
u.awsMarketplaceClient = marketplacemetering.NewFromConfig(cfg)
u.teamName = "AWS_MARKETPLACE"
// This needs to be larger than normal, because we can only send a single usage record per second (from each compute node)
u.batchLimit = 1000000000
Expand Down Expand Up @@ -261,16 +261,12 @@ func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, e
}

func isAWSMarketplace() bool {
return os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true" || os.Getenv("CQ_AWS_MARKETPLACE_AMI") == "true"
return os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true"
}

func awsMarketplaceProductCode() string {
if os.Getenv("CQ_AWS_MARKETPLACE_CONTAINER") == "true" {
return "PLACEHOLDER"
}
if os.Getenv("CQ_AWS_MARKETPLACE_AMI") == "true" {
// TODO: This is a placeholder product code, we need to get the real one from AWS Marketplace
return "PLACEHOLDER"
return "2a8bdkarwqrp0tmo4errl65s7"
}
return ""
}
Expand Down Expand Up @@ -342,7 +338,7 @@ func (u *BatchUpdater) TeamName() string {
}

func (u *BatchUpdater) HasQuota(ctx context.Context) (bool, error) {
if u.awsMarketPlaceClient != nil {
if u.awsMarketplaceClient != nil {
return true, nil
}
u.logger.Debug().Str("url", u.url).Str("team", u.teamName).Str("pluginTeam", u.pluginMeta.Team).Str("pluginKind", string(u.pluginMeta.Kind)).Str("pluginName", u.pluginMeta.Name).Msg("checking quota")
Expand Down Expand Up @@ -453,39 +449,59 @@ func (u *BatchUpdater) backgroundUpdater() {
<-started
}

func (u *BatchUpdater) reportUsageToAWSMarketplace(ctx context.Context, rows uint32, tables []cqapi.UsageIncreaseTablesInner) error {
// Timestamp + UsageDimension + UsageQuantity are required fields and must be unique
// since Timestamp only maintains a granularity of seconds, we need to ensure our batch size is large enough

usage := make([]types.UsageAllocation, len(tables))
for i, table := range tables {
usage[i] = types.UsageAllocation{
AllocatedUsageQuantity: aws.Int32(int32(table.Rows)),
Tags: []types.Tag{
{
Key: aws.String("plugin_name"),
Value: aws.String(u.pluginMeta.Name),
},
{
Key: aws.String("plugin_team"),
Value: aws.String(u.pluginMeta.Team),
},
{
Key: aws.String("plugin_kind"),
Value: aws.String(string(u.pluginMeta.Kind)),
},
{
Key: aws.String("table_name"),
Value: aws.String(table.Name),
},
},
}
}

_, err := u.awsMarketplaceClient.MeterUsage(ctx, &marketplacemetering.MeterUsageInput{
// Product code is a unique identifier for a product in AWS Marketplace
// Each product is given a unique product code when it is listed in AWS Marketplace
// in the future we can have multiple product codes for container or AMI based listings
ProductCode: aws.String(awsMarketplaceProductCode()),
Timestamp: aws.Time(time.Now()),
UsageDimension: aws.String("rows"),
UsageAllocations: usage,
UsageQuantity: aws.Int32(int32(rows)),
})
if err != nil {
return fmt.Errorf("failed to update usage with : %w", err)
}
return nil
}

func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, rows uint32, tables []cqapi.UsageIncreaseTablesInner) error {
for retry := 0; retry < u.maxRetries; retry++ {
u.logger.Debug().Str("url", u.url).Int("try", retry).Int("max_retries", u.maxRetries).Uint32("rows", rows).Msg("updating usage")
queryStartTime := time.Now()

// If the AWS Marketplace client is set, use it to track usage
if u.awsMarketPlaceClient != nil {
// Timestamp + UsageDimension + UsageQuantity are required fields and must be unique
// since Timestamp only maintains a granularity of seconds, we need to ensure our batch size is large enough
_, err := u.awsMarketPlaceClient.MeterUsage(ctx, &marketplacemetering.MeterUsageInput{
// Product code is a unique identifier for a product in AWS Marketplace
// Each product is given a unique product code when it is listed in AWS Marketplace
// in the future we can have multiple product codes for container or AMI based listings
ProductCode: aws.String(awsMarketplaceProductCode()),
Timestamp: aws.Time(time.Now()),
UsageDimension: aws.String("rows"),
UsageAllocations: []types.UsageAllocation{
{
AllocatedUsageQuantity: aws.Int32(int32(rows)),
Tags: []types.Tag{
{
Key: aws.String("plugin"),
Value: aws.String(fmt.Sprintf("%s/%s/%s", u.pluginMeta.Team, u.pluginMeta.Kind, u.pluginMeta.Name)),
},
},
},
},
UsageQuantity: aws.Int32(int32(rows)),
})
if err != nil {
return fmt.Errorf("failed to update usage with : %w", err)
}
return nil
if u.awsMarketplaceClient != nil {
return u.reportUsageToAWSMarketplace(ctx, rows, tables)
}
payload := cqapi.IncreaseTeamPluginUsageJSONRequestBody{
RequestId: uuid.New(),
Expand Down

0 comments on commit 9ea3d5b

Please sign in to comment.