Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamodb: updated get all query and added configurable limit #230

Merged
merged 5 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.9.0]
- Added query condition for GetAll in dynamodb that requires a secondary index in the database. [#230](https://github.com/xmidt-org/argus/pull/230)
- Added a configurable GetAll limit and metric for dynamodb to minimize performance issues from getting too many records. [#230](https://github.com/xmidt-org/argus/pull/230)

## [v0.8.0]
- Removed setLogger func dependency in chrysom basic client. [#228](https://github.com/xmidt-org/argus/pull/228)
- Fixed chrysom basic client fallback to a non-context logger. [#228](https://github.com/xmidt-org/argus/pull/228)
Expand Down Expand Up @@ -192,7 +196,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [v0.1.0] Tue May 07 2020 Jack Murdock - 0.1.0
- initial creation

[Unreleased]: https://github.com/xmidt-org/argus/compare/v0.8.0...HEAD
[Unreleased]: https://github.com/xmidt-org/argus/compare/v0.9.0...HEAD
[v0.9.0]: https://github.com/xmidt-org/argus/compare/v0.8.0...v0.9.0
[v0.8.0]: https://github.com/xmidt-org/argus/compare/v0.7.0...v0.8.0
[v0.7.0]: https://github.com/xmidt-org/argus/compare/v0.6.0...v0.7.0
[v0.6.0]: https://github.com/xmidt-org/argus/compare/v0.5.2...v0.6.0
Expand Down
4 changes: 4 additions & 0 deletions argus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ store:
# maxRetires is the maximum times the application will retry the request to the db.
# (Optional) default: 3
maxRetries: 3

# getAllLimit is the maximum number of items to get at a time.
# (Optional) defaults to no limit
getAllLimit: 50

# accessKey is the AWS accessKey to access dynamodb.
accessKey: "accessKey"
Expand Down
4 changes: 4 additions & 0 deletions deploy/packaging/argus_spruce.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ store:
# (Optional) default: 3
maxRetries: (( grab $AWS_RETRIES || "3" ))

# getAllLimit is the maximum number of items to get at a time.
# (Optional) defaults to no limit
getAllLimit: (( grab $AWS_GET_ALL_LIMIT || "50" ))

# accessKey is the AWS accessKey to access dynamodb.
accessKey: (( grab $AWS_ACCESS_KEY_ID || "accessKey" ))

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
emperror.dev/emperror v0.33.0
github.com/aws/aws-sdk-go v1.31.6
github.com/aws/aws-sdk-go v1.43.35
github.com/go-kit/kit v0.10.0
github.com/go-playground/validator/v10 v10.3.0
github.com/gocql/gocql v0.0.0-20200505093417-effcbd8bcf0e
Expand Down
48 changes: 15 additions & 33 deletions go.sum

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions store/db/metric/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (

// DynamoDB-specific metrics.
DynamodbConsumedCapacityCounter = "dynamodb_consumed_capacity_total"
DynamodbGetAllGauge = "dynamodb_get_all_results"
)

// Metric label keys.
Expand Down Expand Up @@ -90,6 +91,13 @@ func ProvideMetrics() fx.Option {
QueryTypeLabelKey,
DynamoCapacityOpLabelKey,
),

touchstone.Gauge(
prometheus.GaugeOpts{
Name: DynamodbGetAllGauge,
Help: "Amount of records returned for a GetAll dynamodb request.",
},
),
)
}

Expand All @@ -98,4 +106,5 @@ type Measures struct {
Queries *prometheus.CounterVec `name:"db_queries_total"`
QueryDurationSeconds prometheus.ObserverVec `name:"db_query_duration_seconds"`
DynamodbConsumedCapacity *prometheus.CounterVec `name:"dynamodb_consumed_capacity_total"`
DynamodbGetAllGauge prometheus.Gauge `name:"dynamodb_get_all_results"`
}
6 changes: 5 additions & 1 deletion store/dynamodb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type Config struct {
// (Optional) Defaults to 3.
MaxRetries int

// GetAllLimit is the maximum number of items to get at a time.
// (Optional) defaults to no limit
GetAllLimit int

// AccessKey is the AWS AccessKey credential.
AccessKey string `validate:"required"`

Expand Down Expand Up @@ -111,7 +115,7 @@ func NewDynamoDB(config Config, measures metric.Measures) (store.S, error) {
SecretAccessKey: config.SecretKey,
}))

svc, err := newService(awsConfig, "", config.Table)
svc, err := newService(awsConfig, "", config.Table, int64(config.GetAllLimit), &measures)
if err != nil {
return nil, err
}
Expand Down
45 changes: 39 additions & 6 deletions store/dynamodb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package dynamodb

import (
"errors"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -26,6 +28,11 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/argus/store"
"github.com/xmidt-org/argus/store/db/metric"
)

var (
errNilMeasures = errors.New("measures cannot be nil")
)

// client captures the methods of interest from the dynamoDB API. This
Expand Down Expand Up @@ -55,7 +62,12 @@ type executor struct {
// tableName is the name of the dynamodb table
tableName string

// getAllLimit is the maximum number of records to return for a GetAll
getAllLimit int64

now func() time.Time

measures *metric.Measures
}

type storableItem struct {
Expand Down Expand Up @@ -184,8 +196,10 @@ func (d *executor) Delete(key model.Key) (store.OwnableItem, *dynamodb.ConsumedC
//TODO: For data >= 1MB, we'll need to handle pagination
func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamodb.ConsumedCapacity, error) {
result := map[string]store.OwnableItem{}
queryResult, err := d.c.Query(&dynamodb.QueryInput{
now := strconv.Itoa(int(d.now().Unix()))
input := &dynamodb.QueryInput{
TableName: aws.String(d.tableName),
IndexName: aws.String("Expires-index"),
KeyConditions: map[string]*dynamodb.Condition{
"bucket": {
ComparisonOperator: aws.String("EQ"),
Expand All @@ -195,9 +209,21 @@ func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamod
},
},
},
"expires": {
ComparisonOperator: aws.String("GT"),
AttributeValueList: []*dynamodb.AttributeValue{
{
N: &now,
},
},
},
},
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
})
}
if d.getAllLimit > 0 {
input.Limit = &d.getAllLimit
}
queryResult, err := d.c.Query(input)

var consumedCapacity *dynamodb.ConsumedCapacity
if queryResult != nil {
Expand All @@ -206,6 +232,7 @@ func (d *executor) GetAll(bucket string) (map[string]store.OwnableItem, *dynamod
if err != nil {
return map[string]store.OwnableItem{}, consumedCapacity, err
}
d.measures.DynamodbGetAllGauge.Set(float64(len(queryResult.Items)))

for _, i := range queryResult.Items {
item := new(storableItem)
Expand Down Expand Up @@ -236,7 +263,11 @@ func itemNotFound(item *storableItem) bool {
return item.Key.Bucket == "" || item.Key.ID == ""
}

func newService(config aws.Config, awsProfile string, tableName string) (service, error) {
func newService(config aws.Config, awsProfile string, tableName string, getAllLimit int64, measures *metric.Measures) (service, error) {
if measures == nil {
return nil, errNilMeasures
}

sess, err := session.NewSessionWithOptions(session.Options{
Config: config,
Profile: awsProfile,
Expand All @@ -248,8 +279,10 @@ func newService(config aws.Config, awsProfile string, tableName string) (service
}

return &executor{
c: dynamodb.New(sess),
tableName: tableName,
now: time.Now,
c: dynamodb.New(sess),
tableName: tableName,
getAllLimit: getAllLimit,
now: time.Now,
measures: measures,
}, nil
}
62 changes: 40 additions & 22 deletions store/dynamodb/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/argus/store"
"github.com/xmidt-org/argus/store/db/metric"
"github.com/xmidt-org/touchstone/touchtest"
)

var (
Expand Down Expand Up @@ -149,14 +153,45 @@ func TestGetAll(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.Description, func(t *testing.T) {
assert := assert.New(t)
m := new(mockClient)
client := new(mockClient)
testAssert := touchtest.New(t)
expectedRegistry := prometheus.NewPedanticRegistry()
expectedMeasures := &metric.Measures{
DynamodbGetAllGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "testGetAllGauge",
Help: "testGetAllGauge",
},
),
}
expectedRegistry.MustRegister(expectedMeasures.DynamodbGetAllGauge)
if tc.QueryOutput != nil {
expectedMeasures.DynamodbGetAllGauge.Set(float64(len(tc.QueryOutput.Items)))
}
actualRegistry := prometheus.NewPedanticRegistry()
m := &metric.Measures{
DynamodbGetAllGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "testGetAllGauge",
Help: "testGetAllGauge",
},
),
}
actualRegistry.MustRegister(m.DynamodbGetAllGauge)

svc := executor{
c: m,
tableName: "testTable",
now: nowFunc,
c: client,
tableName: "testTable",
getAllLimit: 10,
now: nowFunc,
measures: m,
}
m.On("Query", getQueryInput()).Return(tc.QueryOutput, tc.QueryErr)
client.On("Query", mock.Anything).Return(tc.QueryOutput, tc.QueryErr)
items, cc, err := svc.GetAll("testBucket")
testAssert.Expect(expectedRegistry)
assert.True(testAssert.GatherAndCompare(actualRegistry,
"testGetAllGauge"))

assert.Equal(tc.ExpectedItems, items)
assert.Equal(tc.ExpectedConsumedCapacity, cc)
assert.Equal(tc.ExpectedErr, err)
Expand Down Expand Up @@ -318,23 +353,6 @@ func getPutItemInput(key model.Key, item store.OwnableItem) *dynamodb.PutItemInp
}
}

func getQueryInput() *dynamodb.QueryInput {
return &dynamodb.QueryInput{
TableName: aws.String("testTable"),
KeyConditions: map[string]*dynamodb.Condition{
"bucket": {
ComparisonOperator: aws.String("EQ"),
AttributeValueList: []*dynamodb.AttributeValue{
{
S: aws.String("testBucket"),
},
},
},
},
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
}
}

func getFilteredQueryOutput(now time.Time, consumedCapacity *dynamodb.ConsumedCapacity) *dynamodb.QueryOutput {
pastExpiration := strconv.Itoa(int(now.Unix() - int64(time.Hour.Seconds())))
futureExpiration := strconv.Itoa(int(now.Add(time.Hour).Unix()))
Expand Down