Skip to content

Commit

Permalink
Implement workers_share_bucket
Browse files Browse the repository at this point in the history
Resolves #5
  • Loading branch information
mulbc committed Apr 9, 2020
1 parent d0c77f1 commit ba0840d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
23 changes: 12 additions & 11 deletions common/configFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,18 @@ type TestCaseConfiguration struct {
NumberLast uint64
NumberDistribution string `yaml:"number_distribution"`
} `yaml:"buckets"`
BucketPrefix string `yaml:"bucket_prefix"`
ObjectPrefix string `yaml:"object_prefix"`
Runtime time.Duration `yaml:"stop_with_runtime"`
OpsDeadline uint64 `yaml:"stop_with_ops"`
Workers int `yaml:"workers"`
ParallelClients int `yaml:"parallel_clients"`
CleanAfter bool `yaml:"clean_after"`
ReadWeight int `yaml:"read_weight"`
WriteWeight int `yaml:"write_weight"`
ListWeight int `yaml:"list_weight"`
DeleteWeight int `yaml:"delete_weight"`
BucketPrefix string `yaml:"bucket_prefix"`
ObjectPrefix string `yaml:"object_prefix"`
Runtime time.Duration `yaml:"stop_with_runtime"`
OpsDeadline uint64 `yaml:"stop_with_ops"`
Workers int `yaml:"workers"`
WorkerShareBuckets bool `yaml:"workers_share_buckets"`
ParallelClients int `yaml:"parallel_clients"`
CleanAfter bool `yaml:"clean_after"`
ReadWeight int `yaml:"read_weight"`
WriteWeight int `yaml:"write_weight"`
ListWeight int `yaml:"list_weight"`
DeleteWeight int `yaml:"delete_weight"`
}

// Testconf contains all the information necessary to set up a distributed test
Expand Down
18 changes: 11 additions & 7 deletions worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func connectToServer(serverAddress string) error {
log.Info("Got config from server - starting preparations now")

InitS3(*config.S3Config)
fillWorkqueue(config.Test, Workqueue, config.WorkerID)
fillWorkqueue(config.Test, Workqueue, config.WorkerID, config.Test.WorkerShareBuckets)

for _, work := range *Workqueue.Queue {
work.Prepare()
Expand Down Expand Up @@ -196,9 +196,13 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueu

bucketCount := common.EvaluateDistribution(testConfig.Buckets.NumberMin, testConfig.Buckets.NumberMax, &testConfig.Buckets.NumberLast, 1, testConfig.Buckets.NumberDistribution)
for bucket := uint64(0); bucket < bucketCount; bucket++ {
err := createBucket(housekeepingSvc, fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket))
bucketName := fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket)
if shareBucketName {
bucketName = fmt.Sprintf("%s%d", testConfig.BucketPrefix, bucket)
}
err := createBucket(housekeepingSvc, bucketName)
if err != nil {
log.WithError(err).WithField("bucket", fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket)).Error("Error when creating bucket")
log.WithError(err).WithField("bucket", bucketName).Error("Error when creating bucket")
}
objectCount := common.EvaluateDistribution(testConfig.Objects.NumberMin, testConfig.Objects.NumberMax, &testConfig.Objects.NumberLast, 1, testConfig.Objects.NumberDistribution)
for object := uint64(0); object < objectCount; object++ {
Expand All @@ -209,31 +213,31 @@ func fillWorkqueue(testConfig *common.TestCaseConfiguration, Workqueue *Workqueu
case "read":
IncreaseOperationValue(nextOp, 1/float64(testConfig.ReadWeight), Workqueue)
new := ReadOperation{
Bucket: fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket),
Bucket: bucketName,
ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object),
ObjectSize: objectSize,
}
*Workqueue.Queue = append(*Workqueue.Queue, new)
case "write":
IncreaseOperationValue(nextOp, 1/float64(testConfig.WriteWeight), Workqueue)
new := WriteOperation{
Bucket: fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket),
Bucket: bucketName,
ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object),
ObjectSize: objectSize,
}
*Workqueue.Queue = append(*Workqueue.Queue, new)
case "list":
IncreaseOperationValue(nextOp, 1/float64(testConfig.ListWeight), Workqueue)
new := ListOperation{
Bucket: fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket),
Bucket: bucketName,
ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object),
ObjectSize: objectSize,
}
*Workqueue.Queue = append(*Workqueue.Queue, new)
case "delete":
IncreaseOperationValue(nextOp, 1/float64(testConfig.DeleteWeight), Workqueue)
new := DeleteOperation{
Bucket: fmt.Sprintf("%s%s%d", workerID, testConfig.BucketPrefix, bucket),
Bucket: bucketName,
ObjectName: fmt.Sprintf("%s%s%d", workerID, testConfig.ObjectPrefix, object),
ObjectSize: objectSize,
}
Expand Down

0 comments on commit ba0840d

Please sign in to comment.