Add operation to read pre-existing objects
This has been a popular demand...
To not break config files, I have opted to use bucket_prefix to determine the bucket name...
There will be people who do not like this ;)

Resolves #7
mulbc committed Apr 9, 2020
1 parent b294965 commit 93bdfd4
Showing 7 changed files with 116 additions and 14 deletions.
34 changes: 34 additions & 0 deletions Readme.md
@@ -72,6 +72,40 @@ docker pull quay.io/mulbc/goroom-worker
In the `k8s` folder you will find example files to deploy Gosbench on Openshift and Kubernetes.
Be sure to modify the ConfigMaps in `gosbench.yaml` to use your S3 endpoint credentials.

### Reading pre-existing files from buckets

Due to popular demand, reading pre-existing files has been added. You activate this special mode by setting `existing_read_weight` to a value greater than 0.

There are some important things to consider, though ;)

Just like with other operations, the `bucket_prefix` value is evaluated to determine the names of the buckets that will be searched for pre-existing objects.

**Example:** This is an excerpt from your config:

```yaml
objects:
  size_min: 5
  size_max: 100
  part_size: 0
  # distribution: constant, random, sequential
  size_distribution: random
  unit: KB
  number_min: 10
  number_max: 100
  # distribution: constant, random, sequential
  number_distribution: constant
buckets:
  number_min: 2
  number_max: 10
  # distribution: constant, random, sequential
  number_distribution: constant
bucket_prefix: myBucket-
```

Note: Due to the constant distribution, we will only consider the `_min` values.

This will cause each worker to search for pre-existing files in the buckets `myBucket-0` and `myBucket-1` and read 10 objects from these buckets. If there are fewer than 10 objects in any of these buckets, some objects will be read multiple times. The object size given in your config is ignored when reading pre-existing files.
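
For illustration, a test stanza that exercises only this mode might look like the following sketch (the weights are arbitrary example values; the keys are the same ones used in `examples/example_config.yaml`):

```yaml
tests:
  - existing_read_weight: 5 # values > 0 activate reads of pre-existing objects
    read_weight: 0
    write_weight: 0
    delete_weight: 0
    list_weight: 0
    # bucket_prefix is mandatory whenever existing_read_weight is set
    bucket_prefix: myBucket-
```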

## Contributing

* Be aware that this repo uses pre-commit hooks - install them via `pre-commit install`
6 changes: 5 additions & 1 deletion common/configFile.go
@@ -65,6 +65,7 @@ type TestCaseConfiguration struct {
    ParallelClients    int  `yaml:"parallel_clients"`
    CleanAfter         bool `yaml:"clean_after"`
    ReadWeight         int  `yaml:"read_weight"`
    ExistingReadWeight int  `yaml:"existing_read_weight"`
    WriteWeight        int  `yaml:"write_weight"`
    ListWeight         int  `yaml:"list_weight"`
    DeleteWeight       int  `yaml:"delete_weight"`
@@ -108,9 +109,12 @@ func checkTestCase(testcase *TestCaseConfiguration) error {
    if testcase.Runtime == 0 && testcase.OpsDeadline == 0 {
        return fmt.Errorf("Either stop_with_runtime or stop_with_ops needs to be set")
    }
    if testcase.ReadWeight == 0 && testcase.WriteWeight == 0 && testcase.ListWeight == 0 && testcase.DeleteWeight == 0 {
    if testcase.ReadWeight == 0 && testcase.WriteWeight == 0 && testcase.ListWeight == 0 && testcase.DeleteWeight == 0 && testcase.ExistingReadWeight == 0 {
        return fmt.Errorf("At least one weight needs to be set - Read / Write / List / Delete / Existing Read")
    }
    if testcase.ExistingReadWeight != 0 && testcase.BucketPrefix == "" {
        return fmt.Errorf("When using existing_read_weight, setting the bucket_prefix is mandatory")
    }
    if testcase.Buckets.NumberMin == 0 {
        return fmt.Errorf("Please set minimum number of Buckets")
    }
29 changes: 29 additions & 0 deletions common/configFile_test.go
@@ -208,6 +208,35 @@ func Test_checkTestCase(t *testing.T) {
            NumberDistribution: "constant",
            Unit:               "XB",
        }}}, true},
    {"Existing object read without bucket prefix", args{&TestCaseConfiguration{Runtime: time.Second, OpsDeadline: 10, ExistingReadWeight: 1,
        Buckets: struct {
            NumberMin          uint64 `yaml:"number_min"`
            NumberMax          uint64 `yaml:"number_max"`
            NumberLast         uint64
            NumberDistribution string `yaml:"number_distribution"`
        }{
            NumberMin:          1,
            NumberDistribution: "constant",
        },
        Objects: struct {
            SizeMin            uint64 `yaml:"size_min"`
            SizeMax            uint64 `yaml:"size_max"`
            PartSize           uint64 `yaml:"part_size"`
            SizeLast           uint64
            SizeDistribution   string `yaml:"size_distribution"`
            NumberMin          uint64 `yaml:"number_min"`
            NumberMax          uint64 `yaml:"number_max"`
            NumberLast         uint64
            NumberDistribution string `yaml:"number_distribution"`
            Unit               string `yaml:"unit"`
        }{
            SizeMin:            1,
            SizeMax:            2,
            NumberMin:          3,
            SizeDistribution:   "constant",
            NumberDistribution: "constant",
            Unit:               "XB",
        }}}, true},
    {"All good", args{&TestCaseConfiguration{Runtime: time.Second, OpsDeadline: 10, ReadWeight: 1,
        Buckets: struct {
            NumberMin uint64 `yaml:"number_min"`
1 change: 1 addition & 0 deletions examples/example_config.yaml
@@ -23,6 +23,7 @@ grafana_config:

tests:
  - read_weight: 20
    existing_read_weight: 0
    write_weight: 80
    delete_weight: 0
    list_weight: 0
33 changes: 29 additions & 4 deletions worker/main.go
@@ -10,6 +10,7 @@ import (
"os"
"time"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/mulbc/gosbench/common"
log "github.com/sirupsen/logrus"
)
@@ -179,11 +180,14 @@ func workUntilOps(Workqueue *Workqueue, workChannel chan WorkItem, maxOps uint64
    }
}

func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string) {
func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string, shareBucketName bool) {

    if testConfig.ReadWeight > 0 {
        Workqueue.OperationValues = append(Workqueue.OperationValues, KV{Key: "read"})
    }
    if testConfig.ExistingReadWeight > 0 {
        Workqueue.OperationValues = append(Workqueue.OperationValues, KV{Key: "existing_read"})
    }
    if testConfig.WriteWeight > 0 {
        Workqueue.OperationValues = append(Workqueue.OperationValues, KV{Key: "write"})
    }
@@ -204,6 +208,16 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string, shareBucketName bool) {
        if err != nil {
            log.WithError(err).WithField("bucket", bucketName).Error("Error when creating bucket")
        }
        var PreExistingObjects *s3.ListObjectsOutput
        var PreExistingObjectCount uint64
        if testConfig.ExistingReadWeight > 0 {
            PreExistingObjects, err = listObjects(housekeepingSvc, "", bucketName)
            if err != nil {
                log.WithError(err).Fatalf("Problems when listing contents of bucket %s", bucketName)
            }
            PreExistingObjectCount = uint64(len(PreExistingObjects.Contents))
            log.Debugf("Found %d objects in bucket %s", PreExistingObjectCount, bucketName)
        }
        objectCount := common.EvaluateDistribution(testConfig.Objects.NumberMin, testConfig.Objects.NumberMax, &testConfig.Objects.NumberLast, 1, testConfig.Objects.NumberDistribution)
        for object := uint64(0); object < objectCount; object++ {
            objectSize := common.EvaluateDistribution(testConfig.Objects.SizeMin, testConfig.Objects.SizeMax, &testConfig.Objects.SizeLast, 1, testConfig.Objects.SizeDistribution)
@@ -213,9 +227,20 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string, shareBucketName bool) {
case "read":
IncreaseOperationValue(nextOp, 1/float64(testConfig.ReadWeight), Workqueue)
new := ReadOperation{
Bucket: bucketName,
ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object),
ObjectSize: objectSize,
Bucket: bucketName,
ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object),
ObjectSize: objectSize,
WorksOnPreexistingObject: false,
}
*Workqueue.Queue = append(*Workqueue.Queue, new)
case "existing_read":
IncreaseOperationValue(nextOp, 1/float64(testConfig.ExistingReadWeight), Workqueue)
new := ReadOperation{
// TODO: Get bucket and object that already exist
Bucket: bucketName,
ObjectName: *PreExistingObjects.Contents[object%PreExistingObjectCount].Key,
ObjectSize: uint64(*PreExistingObjects.Contents[object%PreExistingObjectCount].Size),
WorksOnPreexistingObject: true,
}
*Workqueue.Queue = append(*Workqueue.Queue, new)
case "write":
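The `object%PreExistingObjectCount` indexing above is what produces the wrap-around behaviour described in the Readme; a self-contained sketch with made-up counts:

```go
package main

import "fmt"

func main() {
    // Hypothetical numbers: the config asks for 10 reads, but the bucket
    // only holds 3 pre-existing objects (PreExistingObjectCount == 3).
    preExistingObjectCount := uint64(3)
    objectCount := uint64(10)

    for object := uint64(0); object < objectCount; object++ {
        // Same selection as fillWorkqueue: indices wrap 0,1,2,0,1,2,...
        // so some objects are read multiple times.
        fmt.Println(object % preExistingObjectCount)
    }
}
```

Note that the wrap-around assumes at least one pre-existing object in the bucket; with an empty bucket, the modulo would panic at runtime.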
6 changes: 3 additions & 3 deletions worker/s3.go
@@ -145,8 +145,8 @@ func getObjectProperties(service *s3.S3, objectName string, bucket string) {

    log.Debugf("Object Properties:\n%+v", result)
}
func listObjects(service *s3.S3, prefix string, bucket string) error {
    _, err := service.ListObjects(&s3.ListObjectsInput{
func listObjects(service *s3.S3, prefix string, bucket string) (*s3.ListObjectsOutput, error) {
    result, err := service.ListObjects(&s3.ListObjectsInput{
        Bucket: &bucket,
        Prefix: &prefix,
    })
@@ -157,7 +157,7 @@ func listObjects(service *s3.S3, prefix string, bucket string) (*s3.ListObjectsOutput, error) {
            log.WithError(aerr).Errorf("Could not find prefix %s in bucket %s when querying properties", prefix, bucket)
        }
    }
    return err
    return result, err
}

func getObject(service *s3.S3, objectName string, bucket string) error {
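Since `listObjects` now returns the listing alongside the error, callers can build small helpers on top of it; a hypothetical sketch (the helper name is not part of the codebase):

```go
// countPreExistingObjects is a hypothetical convenience wrapper around the
// new listObjects signature: it returns how many objects already live in
// the given bucket.
func countPreExistingObjects(svc *s3.S3, bucket string) (uint64, error) {
    result, err := listObjects(svc, "", bucket)
    if err != nil {
        return 0, err
    }
    return uint64(len(result.Contents)), nil
}
```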
21 changes: 15 additions & 6 deletions worker/workItems.go
Expand Up @@ -21,9 +21,10 @@ type WorkItem interface {

// ReadOperation stands for a read operation
type ReadOperation struct {
    Bucket     string
    ObjectName string
    ObjectSize uint64
    Bucket                   string
    ObjectName               string
    ObjectSize               uint64
    WorksOnPreexistingObject bool
}

// WriteOperation stands for a write operation
@@ -96,7 +97,10 @@ func IncreaseOperationValue(operation string, value float64, Queue *Workqueue) error {

// Prepare prepares the execution of the ReadOperation
func (op ReadOperation) Prepare() error {
    log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).Debug("Preparing ReadOperation")
    log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).WithField("Preexisting?", op.WorksOnPreexistingObject).Debug("Preparing ReadOperation")
    if op.WorksOnPreexistingObject {
        return nil
    }
    return putObject(housekeepingSvc, op.ObjectName, bytes.NewReader(generateRandomBytes(op.ObjectSize)), op.Bucket)
}

@@ -125,7 +129,7 @@ func (op Stopper) Prepare() error {

// Do executes the actual work of the ReadOperation
func (op ReadOperation) Do() error {
log.Debug("Doing ReadOperation")
log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).WithField("Preexisting?", op.WorksOnPreexistingObject).Debug("Doing ReadOperation")
return getObject(svc, op.ObjectName, op.Bucket)
}

@@ -138,7 +142,8 @@ func (op WriteOperation) Do() error {
// Do executes the actual work of the ListOperation
func (op ListOperation) Do() error {
log.Debug("Doing ListOperation")
return listObjects(svc, op.ObjectName, op.Bucket)
_, err := listObjects(svc, op.ObjectName, op.Bucket)
return err
}

// Do executes the actual work of the DeleteOperation
@@ -154,6 +159,10 @@ func (op Stopper) Do() error {

// Clean removes the objects and buckets left from the previous ReadOperation
func (op ReadOperation) Clean() error {
    if op.WorksOnPreexistingObject {
        return nil
    }
    log.WithField("bucket", op.Bucket).WithField("object", op.ObjectName).WithField("Preexisting?", op.WorksOnPreexistingObject).Debug("Cleaning up ReadOperation")
    return deleteObject(housekeepingSvc, op.ObjectName, op.Bucket)
}
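
Putting the three methods together, a pre-existing read now moves through the work item lifecycle without ever writing or deleting anything; a sketch with hypothetical bucket and object names:

```go
// Hypothetical work item for a pre-existing object: Prepare() skips the
// upload, Do() performs the actual GET, and Clean() leaves the object alone.
op := ReadOperation{
    Bucket:                   "myBucket-0",      // derived from bucket_prefix
    ObjectName:               "someExistingKey", // discovered via listObjects
    ObjectSize:               4096,              // taken from the listing, not the config
    WorksOnPreexistingObject: true,
}
if err := op.Prepare(); err != nil { // no-op for pre-existing objects
    log.WithError(err).Fatal("Prepare failed")
}
if err := op.Do(); err != nil { // getObject(svc, op.ObjectName, op.Bucket)
    log.WithError(err).Fatal("Read failed")
}
if err := op.Clean(); err != nil { // no deletion for pre-existing objects
    log.WithError(err).Fatal("Clean failed")
}
```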

