Skip to content

Commit

Permalink
dynamodb: updated get all query and added configurable limit (#230)
Browse files Browse the repository at this point in the history
* dynamodb: updated get all query and added configurable limit

* updated to latest version of aws-sdk

* current progress

* condition on expires

* update changelog
  • Loading branch information
kristinapathak authored Apr 12, 2022
1 parent 9ba1411 commit 2a17eda
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 64 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.9.0]
- Added query condition for GetAll in dynamodb that requires a secondary index in the database. [#230](https://github.com/xmidt-org/argus/pull/230)
- Added a configurable GetAll limit and metric for dynamodb to minimize performance issues from getting too many records. [#230](https://github.com/xmidt-org/argus/pull/230)

## [v0.8.0]
- Removed setLogger func dependency in chrysom basic client. [#228](https://github.com/xmidt-org/argus/pull/228)
- Fixed chrysom basic client fallback to a non-context logger. [#228](https://github.com/xmidt-org/argus/pull/228)
Expand Down Expand Up @@ -192,7 +196,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [v0.1.0] Tue May 07 2020 Jack Murdock - 0.1.0
- initial creation

[Unreleased]: https://github.com/xmidt-org/argus/compare/v0.8.0...HEAD
[Unreleased]: https://github.com/xmidt-org/argus/compare/v0.9.0...HEAD
[v0.9.0]: https://github.com/xmidt-org/argus/compare/v0.8.0...v0.9.0
[v0.8.0]: https://github.com/xmidt-org/argus/compare/v0.7.0...v0.8.0
[v0.7.0]: https://github.com/xmidt-org/argus/compare/v0.6.0...v0.7.0
[v0.6.0]: https://github.com/xmidt-org/argus/compare/v0.5.2...v0.6.0
Expand Down
4 changes: 4 additions & 0 deletions argus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ store:
# maxRetires is the maximum times the application will retry the request to the db.
# (Optional) default: 3
maxRetries: 3

# getAllLimit is the maximum number of items to get at a time.
# (Optional) defaults to no limit
getAllLimit: 50

# accessKey is the AWS accessKey to access dynamodb.
accessKey: "accessKey"
Expand Down
4 changes: 4 additions & 0 deletions deploy/packaging/argus_spruce.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ store:
# (Optional) default: 3
maxRetries: (( grab $AWS_RETRIES || "3" ))

# getAllLimit is the maximum number of items to get at a time.
# (Optional) defaults to no limit
getAllLimit: (( grab $AWS_GET_ALL_LIMIT || "50" ))

# accessKey is the AWS accessKey to access dynamodb.
accessKey: (( grab $AWS_ACCESS_KEY_ID || "accessKey" ))

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
emperror.dev/emperror v0.33.0
github.com/aws/aws-sdk-go v1.31.6
github.com/aws/aws-sdk-go v1.43.35
github.com/go-kit/kit v0.10.0
github.com/go-playground/validator/v10 v10.3.0
github.com/gocql/gocql v0.0.0-20200505093417-effcbd8bcf0e
Expand Down
48 changes: 15 additions & 33 deletions go.sum

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions store/db/metric/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (

// DynamoDB-specific metrics.
DynamodbConsumedCapacityCounter = "dynamodb_consumed_capacity_total"
DynamodbGetAllGauge = "dynamodb_get_all_results"
)

// Metric label keys.
Expand Down Expand Up @@ -90,6 +91,13 @@ func ProvideMetrics() fx.Option {
QueryTypeLabelKey,
DynamoCapacityOpLabelKey,
),

touchstone.Gauge(
prometheus.GaugeOpts{
Name: DynamodbGetAllGauge,
Help: "Amount of records returned for a GetAll dynamodb request.",
},
),
)
}

Expand All @@ -98,4 +106,5 @@ type Measures struct {
Queries *prometheus.CounterVec `name:"db_queries_total"`
QueryDurationSeconds prometheus.ObserverVec `name:"db_query_duration_seconds"`
DynamodbConsumedCapacity *prometheus.CounterVec `name:"dynamodb_consumed_capacity_total"`
DynamodbGetAllGauge prometheus.Gauge `name:"dynamodb_get_all_results"`
}
6 changes: 5 additions & 1 deletion store/dynamodb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type Config struct {
// (Optional) Defaults to 3.
MaxRetries int

// GetAllLimit is the maximum number of items to get at a time.
// (Optional) defaults to no limit
GetAllLimit int

// AccessKey is the AWS AccessKey credential.
AccessKey string `validate:"required"`

Expand Down Expand Up @@ -111,7 +115,7 @@ func NewDynamoDB(config Config, measures metric.Measures) (store.S, error) {
SecretAccessKey: config.SecretKey,
}))

svc, err := newService(awsConfig, "", config.Table)
svc, err := newService(awsConfig, "", config.Table, int64(config.GetAllLimit), &measures)
if err != nil {
return nil, err
}
Expand Down
45 changes: 39 additions & 6 deletions store/dynamodb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package dynamodb

import (
"errors"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -26,6 +28,11 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/argus/store"
"github.com/xmidt-org/argus/store/db/metric"
)

var (
errNilMeasures = errors.New("measures cannot be nil")
)

// client captures the methods of interest from the dynamoDB API. This
Expand Down Expand Up @@ -55,7 +62,12 @@ type executor struct {
// tableName is the name of the dynamodb table
tableName string

// getAllLimit is the maximum number of records to return for a GetAll
getAllLimit int64

now func() time.Time

measures *metric.Measures
}

type storableItem struct {
Expand Down Expand Up @@ -184,8 +196,10 @@ func (d *executor) Delete(key model.Key) (store.OwnableItem, *dynamodb.ConsumedC
//TODO: For data >= 1MB, we'll need to handle pagination
func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamodb.ConsumedCapacity, error) {
result := map[string]store.OwnableItem{}
queryResult, err := d.c.Query(&dynamodb.QueryInput{
now := strconv.Itoa(int(d.now().Unix()))
input := &dynamodb.QueryInput{
TableName: aws.String(d.tableName),
IndexName: aws.String("Expires-index"),
KeyConditions: map[string]*dynamodb.Condition{
"bucket": {
ComparisonOperator: aws.String("EQ"),
Expand All @@ -195,9 +209,21 @@ func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamod
},
},
},
"expires": {
ComparisonOperator: aws.String("GT"),
AttributeValueList: []*dynamodb.AttributeValue{
{
N: &now,
},
},
},
},
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
})
}
if d.getAllLimit > 0 {
input.Limit = &d.getAllLimit
}
queryResult, err := d.c.Query(input)

var consumedCapacity *dynamodb.ConsumedCapacity
if queryResult != nil {
Expand All @@ -206,6 +232,7 @@ func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamod
if err != nil {
return map[string]store.OwnableItem{}, consumedCapacity, err
}
d.measures.DynamodbGetAllGauge.Set(float64(len(queryResult.Items)))

for _, i := range queryResult.Items {
item := new(storableItem)
Expand Down Expand Up @@ -236,7 +263,11 @@ func itemNotFound(item *storableItem) bool {
return item.Key.Bucket == "" || item.Key.ID == ""
}

func newService(config aws.Config, awsProfile string, tableName string) (service, error) {
func newService(config aws.Config, awsProfile string, tableName string, getAllLimit int64, measures *metric.Measures) (service, error) {
if measures == nil {
return nil, errNilMeasures
}

sess, err := session.NewSessionWithOptions(session.Options{
Config: config,
Profile: awsProfile,
Expand All @@ -248,8 +279,10 @@ func newService(config aws.Config, awsProfile string, tableName string) (service
}

return &executor{
c: dynamodb.New(sess),
tableName: tableName,
now: time.Now,
c: dynamodb.New(sess),
tableName: tableName,
getAllLimit: getAllLimit,
now: time.Now,
measures: measures,
}, nil
}
62 changes: 40 additions & 22 deletions store/dynamodb/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/argus/store"
"github.com/xmidt-org/argus/store/db/metric"
"github.com/xmidt-org/touchstone/touchtest"
)

var (
Expand Down Expand Up @@ -149,14 +153,45 @@ func TestGetAll(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.Description, func(t *testing.T) {
assert := assert.New(t)
m := new(mockClient)
client := new(mockClient)
testAssert := touchtest.New(t)
expectedRegistry := prometheus.NewPedanticRegistry()
expectedMeasures := &metric.Measures{
DynamodbGetAllGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "testGetAllGauge",
Help: "testGetAllGauge",
},
),
}
expectedRegistry.MustRegister(expectedMeasures.DynamodbGetAllGauge)
if tc.QueryOutput != nil {
expectedMeasures.DynamodbGetAllGauge.Set(float64(len(tc.QueryOutput.Items)))
}
actualRegistry := prometheus.NewPedanticRegistry()
m := &metric.Measures{
DynamodbGetAllGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "testGetAllGauge",
Help: "testGetAllGauge",
},
),
}
actualRegistry.MustRegister(m.DynamodbGetAllGauge)

svc := executor{
c: m,
tableName: "testTable",
now: nowFunc,
c: client,
tableName: "testTable",
getAllLimit: 10,
now: nowFunc,
measures: m,
}
m.On("Query", getQueryInput()).Return(tc.QueryOutput, tc.QueryErr)
client.On("Query", mock.Anything).Return(tc.QueryOutput, tc.QueryErr)
items, cc, err := svc.GetAll("testBucket")
testAssert.Expect(expectedRegistry)
assert.True(testAssert.GatherAndCompare(actualRegistry,
"testGetAllGauge"))

assert.Equal(tc.ExpectedItems, items)
assert.Equal(tc.ExpectedConsumedCapacity, cc)
assert.Equal(tc.ExpectedErr, err)
Expand Down Expand Up @@ -318,23 +353,6 @@ func getPutItemInput(key model.Key, item store.OwnableItem) *dynamodb.PutItemInp
}
}

func getQueryInput() *dynamodb.QueryInput {
return &dynamodb.QueryInput{
TableName: aws.String("testTable"),
KeyConditions: map[string]*dynamodb.Condition{
"bucket": {
ComparisonOperator: aws.String("EQ"),
AttributeValueList: []*dynamodb.AttributeValue{
{
S: aws.String("testBucket"),
},
},
},
},
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
}
}

func getFilteredQueryOutput(now time.Time, consumedCapacity *dynamodb.ConsumedCapacity) *dynamodb.QueryOutput {
pastExpiration := strconv.Itoa(int(now.Unix() - int64(time.Hour.Seconds())))
futureExpiration := strconv.Itoa(int(now.Add(time.Hour).Unix()))
Expand Down

0 comments on commit 2a17eda

Please sign in to comment.