Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamodb: updated get all query and added configurable limit #230

Merged
merged 5 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions argus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ store:
# maxRetires is the maximum times the application will retry the request to the db.
# (Optional) default: 3
maxRetries: 3

# getAllLimit is the maximum number of items to get at a time.
# (Optional) defaults to no limit
getAllLimit: 50

# accessKey is the AWS accessKey to access dynamodb.
accessKey: "accessKey"
Expand Down
4 changes: 4 additions & 0 deletions deploy/packaging/argus_spruce.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ store:
# (Optional) default: 3
maxRetries: (( grab $AWS_RETRIES || "3" ))

# getAllLimit is the maximum number of items to get at a time.
# (Optional) defaults to no limit
getAllLimit: (( grab $AWS_GET_ALL_LIMIT || "50" ))

# accessKey is the AWS accessKey to access dynamodb.
accessKey: (( grab $AWS_ACCESS_KEY_ID || "accessKey" ))

Expand Down
9 changes: 9 additions & 0 deletions store/db/metric/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (

// DynamoDB-specific metrics.
DynamodbConsumedCapacityCounter = "dynamodb_consumed_capacity_total"
DynamodbGetAllGauge = "dynamodb_get_all_results"
)

// Metric label keys.
Expand Down Expand Up @@ -90,6 +91,13 @@ func ProvideMetrics() fx.Option {
QueryTypeLabelKey,
DynamoCapacityOpLabelKey,
),

touchstone.Gauge(
prometheus.GaugeOpts{
Name: DynamodbGetAllGauge,
Help: "Amount of records returned for a GetAll dynamodb request.",
},
),
)
}

Expand All @@ -98,4 +106,5 @@ type Measures struct {
Queries *prometheus.CounterVec `name:"db_queries_total"`
QueryDurationSeconds prometheus.ObserverVec `name:"db_query_duration_seconds"`
DynamodbConsumedCapacity *prometheus.CounterVec `name:"dynamodb_consumed_capacity_total"`
DynamodbGetAllGauge prometheus.Gauge `name:"dynamodb_get_all_results"`
}
6 changes: 5 additions & 1 deletion store/dynamodb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type Config struct {
// (Optional) Defaults to 3.
MaxRetries int

// GetAllLimit is the maximum number of items to get at a time.
// (Optional) defaults to no limit
GetAllLimit int

// AccessKey is the AWS AccessKey credential.
AccessKey string `validate:"required"`

Expand Down Expand Up @@ -111,7 +115,7 @@ func NewDynamoDB(config Config, measures metric.Measures) (store.S, error) {
SecretAccessKey: config.SecretKey,
}))

svc, err := newService(awsConfig, "", config.Table)
svc, err := newService(awsConfig, "", config.Table, int64(config.GetAllLimit), &measures)
if err != nil {
return nil, err
}
Expand Down
44 changes: 38 additions & 6 deletions store/dynamodb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package dynamodb

import (
"errors"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -26,6 +28,11 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/argus/store"
"github.com/xmidt-org/argus/store/db/metric"
)

var (
errNilMeasures = errors.New("measures cannot be nil")
)

// client captures the methods of interest from the dynamoDB API. This
Expand Down Expand Up @@ -55,7 +62,12 @@ type executor struct {
// tableName is the name of the dynamodb table
tableName string

// getAllLimit is the maximum number of records to return for a GetAll
getAllLimit int64

now func() time.Time

measures *metric.Measures
}

type storableItem struct {
Expand Down Expand Up @@ -184,7 +196,8 @@ func (d *executor) Delete(key model.Key) (store.OwnableItem, *dynamodb.ConsumedC
//TODO: For data >= 1MB, we'll need to handle pagination
func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamodb.ConsumedCapacity, error) {
result := map[string]store.OwnableItem{}
queryResult, err := d.c.Query(&dynamodb.QueryInput{
now := strconv.Itoa(int(d.now().Unix()))
input := &dynamodb.QueryInput{
TableName: aws.String(d.tableName),
KeyConditions: map[string]*dynamodb.Condition{
"bucket": {
Expand All @@ -195,9 +208,21 @@ func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamod
},
},
},
"expires": {
ComparisonOperator: aws.String("GT"),
AttributeValueList: []*dynamodb.AttributeValue{
{
N: &now,
},
},
},
},
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
})
}
if d.getAllLimit > 0 {
input.Limit = &d.getAllLimit
}
queryResult, err := d.c.Query(input)

var consumedCapacity *dynamodb.ConsumedCapacity
if queryResult != nil {
Expand All @@ -206,6 +231,7 @@ func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamod
if err != nil {
return map[string]store.OwnableItem{}, consumedCapacity, err
}
d.measures.DynamodbGetAllGauge.Set(float64(len(queryResult.Items)))

for _, i := range queryResult.Items {
item := new(storableItem)
Expand Down Expand Up @@ -236,7 +262,11 @@ func itemNotFound(item *storableItem) bool {
return item.Key.Bucket == "" || item.Key.ID == ""
}

func newService(config aws.Config, awsProfile string, tableName string) (service, error) {
func newService(config aws.Config, awsProfile string, tableName string, getAllLimit int64, measures *metric.Measures) (service, error) {
if measures == nil {
return nil, errNilMeasures
}

sess, err := session.NewSessionWithOptions(session.Options{
Config: config,
Profile: awsProfile,
Expand All @@ -248,8 +278,10 @@ func newService(config aws.Config, awsProfile string, tableName string) (service
}

return &executor{
c: dynamodb.New(sess),
tableName: tableName,
now: time.Now,
c: dynamodb.New(sess),
tableName: tableName,
getAllLimit: getAllLimit,
now: time.Now,
measures: measures,
}, nil
}
62 changes: 40 additions & 22 deletions store/dynamodb/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/argus/store"
"github.com/xmidt-org/argus/store/db/metric"
"github.com/xmidt-org/touchstone/touchtest"
)

var (
Expand Down Expand Up @@ -149,14 +153,45 @@ func TestGetAll(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.Description, func(t *testing.T) {
assert := assert.New(t)
m := new(mockClient)
client := new(mockClient)
testAssert := touchtest.New(t)
expectedRegistry := prometheus.NewPedanticRegistry()
expectedMeasures := &metric.Measures{
DynamodbGetAllGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "testGetAllGauge",
Help: "testGetAllGauge",
},
),
}
expectedRegistry.MustRegister(expectedMeasures.DynamodbGetAllGauge)
if tc.QueryOutput != nil {
expectedMeasures.DynamodbGetAllGauge.Set(float64(len(tc.QueryOutput.Items)))
}
actualRegistry := prometheus.NewPedanticRegistry()
m := &metric.Measures{
DynamodbGetAllGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "testGetAllGauge",
Help: "testGetAllGauge",
},
),
}
actualRegistry.MustRegister(m.DynamodbGetAllGauge)

svc := executor{
c: m,
tableName: "testTable",
now: nowFunc,
c: client,
tableName: "testTable",
getAllLimit: 10,
now: nowFunc,
measures: m,
}
m.On("Query", getQueryInput()).Return(tc.QueryOutput, tc.QueryErr)
client.On("Query", mock.Anything).Return(tc.QueryOutput, tc.QueryErr)
items, cc, err := svc.GetAll("testBucket")
testAssert.Expect(expectedRegistry)
assert.True(testAssert.GatherAndCompare(actualRegistry,
"testGetAllGauge"))

assert.Equal(tc.ExpectedItems, items)
assert.Equal(tc.ExpectedConsumedCapacity, cc)
assert.Equal(tc.ExpectedErr, err)
Expand Down Expand Up @@ -318,23 +353,6 @@ func getPutItemInput(key model.Key, item store.OwnableItem) *dynamodb.PutItemInp
}
}

func getQueryInput() *dynamodb.QueryInput {
return &dynamodb.QueryInput{
TableName: aws.String("testTable"),
KeyConditions: map[string]*dynamodb.Condition{
"bucket": {
ComparisonOperator: aws.String("EQ"),
AttributeValueList: []*dynamodb.AttributeValue{
{
S: aws.String("testBucket"),
},
},
},
},
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
}
}

func getFilteredQueryOutput(now time.Time, consumedCapacity *dynamodb.ConsumedCapacity) *dynamodb.QueryOutput {
pastExpiration := strconv.Itoa(int(now.Unix() - int64(time.Hour.Seconds())))
futureExpiration := strconv.Itoa(int(now.Add(time.Hour).Unix()))
Expand Down