Skip to content

Commit

Permalink
Implement limit on max-concurrency. (#2032)
Browse files Browse the repository at this point in the history
Implement support to cap the total number of goroutines that can download same/different files concurrently.
  • Loading branch information
kislaykishore authored Jun 20, 2024
1 parent b3b62e0 commit 52b1e10
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 21 deletions.
6 changes: 5 additions & 1 deletion internal/cache/file/cache_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path"
"path/filepath"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"golang.org/x/sync/semaphore"
)

const CacheMaxSize = 100 * util.MiB
Expand Down Expand Up @@ -136,7 +138,7 @@ func (cht *cacheHandleTest) SetupTest() {
readLocalFileHandle, err := util.CreateFile(cht.fileSpec, os.O_RDONLY)
assert.Nil(cht.T(), err)

fileDownloadJob := downloader.NewJob(cht.object, cht.bucket, cht.cache, DefaultSequentialReadSizeMb, cht.fileSpec, func() {}, &config.FileCacheConfig{EnableCrcCheck: true, EnableParallelDownloads: false})
fileDownloadJob := downloader.NewJob(cht.object, cht.bucket, cht.cache, DefaultSequentialReadSizeMb, cht.fileSpec, func() {}, &config.FileCacheConfig{EnableCrcCheck: true, EnableParallelDownloads: false}, semaphore.NewWeighted(math.MaxInt64))

cht.cacheHandle = NewCacheHandle(readLocalFileHandle, fileDownloadJob, cht.cache, false, 0)
}
Expand Down Expand Up @@ -835,6 +837,7 @@ func (cht *cacheHandleTest) Test_Read_Sequential_Parallel_Download_True() {
cht.fileSpec,
func() {},
&config.FileCacheConfig{EnableCrcCheck: true, EnableParallelDownloads: true, DownloadParallelismPerFile: 2, ReadRequestSizeMB: 2},
semaphore.NewWeighted(math.MaxInt64),
)
cht.cacheHandle.fileDownloadJob = fileDownloadJob

Expand All @@ -861,6 +864,7 @@ func (cht *cacheHandleTest) Test_Read_Random_Parallel_Download_True() {
cht.fileSpec,
func() {},
&config.FileCacheConfig{EnableCrcCheck: true, EnableParallelDownloads: true, DownloadParallelismPerFile: 5, ReadRequestSizeMB: 2},
semaphore.NewWeighted(math.MaxInt64),
)
cht.cacheHandle.fileDownloadJob = fileDownloadJob

Expand Down
15 changes: 12 additions & 3 deletions internal/cache/file/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package downloader

import (
"math"
"os"

"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/data"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/config"
"github.com/googlecloudplatform/gcsfuse/v2/internal/locker"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"golang.org/x/sync/semaphore"
)

// JobManager is responsible for maintaining, getting and removing file download
Expand Down Expand Up @@ -55,19 +57,26 @@ type JobManager struct {
// concatenation of bucket name, "/", and object name. e.g. object path for an
// object named "a/b/foo.txt" in bucket named "test_bucket" would be
// "test_bucket/a/b/foo.txt"
jobs map[string]*Job
mu locker.Locker
jobs map[string]*Job
mu locker.Locker
maxParallelismSem *semaphore.Weighted
}

func NewJobManager(fileInfoCache *lru.Cache, filePerm os.FileMode, dirPerm os.FileMode,
cacheDir string, sequentialReadSizeMb int32, c *config.FileCacheConfig) (jm *JobManager) {
maxDownloadParallelism := int64(math.MaxInt64)
if c.MaxDownloadParallelism > 0 {
maxDownloadParallelism = int64(c.MaxDownloadParallelism)
}
jm = &JobManager{
fileInfoCache: fileInfoCache,
filePerm: filePerm,
dirPerm: dirPerm,
cacheDir: cacheDir,
sequentialReadSizeMb: sequentialReadSizeMb,
fileCacheConfig: c,
// Shared between jobs - Limits the overall concurrency of downloads.
maxParallelismSem: semaphore.NewWeighted(maxDownloadParallelism),
}
jm.mu = locker.New("JobManager", func() {})
jm.jobs = make(map[string]*Job)
Expand Down Expand Up @@ -105,7 +114,7 @@ func (jm *JobManager) CreateJobIfNotExists(object *gcs.MinObject, bucket gcs.Buc
removeJobCallback := func() {
jm.removeJob(object.Name, bucket.Name())
}
job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig)
job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig, jm.maxParallelismSem)
jm.jobs[objectPath] = job
return job
}
Expand Down
199 changes: 199 additions & 0 deletions internal/cache/file/downloader/jm_parallel_downloads_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package downloader

import (
"context"
"crypto/rand"
"math"
"os"
"path"
"testing"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/data"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
"github.com/googlecloudplatform/gcsfuse/v2/internal/config"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/stretchr/testify/assert"
)

func createObjectInBucket(t *testing.T, objPath string, objSize int64, bucket gcs.Bucket) []byte {
t.Helper()
objectContent := make([]byte, objSize)
_, err := rand.Read(objectContent)
if err != nil {
t.Fatalf("Error while generating random object content: %v", err)
}
_, err = storageutil.CreateObject(context.Background(), bucket, objPath, objectContent)
if err != nil {
t.Fatalf("Error while creating object in fakestorage: %v", err)
}
return objectContent
}

func configureFakeStorage(t *testing.T) storage.StorageHandle {
t.Helper()
fakeStorage := storage.NewFakeStorage()
t.Cleanup(func() { fakeStorage.ShutDown() })
return fakeStorage.CreateStorageHandle()
}

func configureCache(t *testing.T, maxSize int64) (*lru.Cache, string) {
t.Helper()
cache := lru.NewCache(uint64(maxSize))
cacheDir, err := os.MkdirTemp("", "gcsfuse_test")
if err != nil {
t.Fatalf("Error while creating the cache directory: %v", err)
}
t.Cleanup(func() { _ = os.RemoveAll(cacheDir) })
return cache, cacheDir
}

func createObjectInStoreAndInitCache(t *testing.T, cache *lru.Cache, bucket gcs.Bucket, objectName string, objectSize int64) (gcs.MinObject, []byte) {
t.Helper()
content := createObjectInBucket(t, objectName, objectSize, bucket)
minObj := getMinObject(objectName, bucket)
fileInfoKey := data.FileInfoKey{
BucketName: storage.TestBucketName,
ObjectName: objectName,
}
fileInfo := data.FileInfo{
Key: fileInfoKey,
ObjectGeneration: minObj.Generation,
FileSize: minObj.Size,
Offset: 0,
}
fileInfoKeyName, err := fileInfoKey.Key()
if err != nil {
t.Fatalf("Error occurred while retrieving fileInfoKey: %v", err)
}
_, err = cache.Insert(fileInfoKeyName, fileInfo)
if err != nil {
t.Fatalf("Error occurred while inserting fileinfo into cache: %v", err)
}
return minObj, content
}

func TestParallelDownloads(t *testing.T) {
tbl := []struct {
name string
objectSize int64
readReqSize int
downloadParallelismPerFile int
maxDownloadParallelism int
downloadOffset int64
expectedOffset int64
subscribedOffset int64
}{
{
name: "download in chunks of concurrency * readReqSize",
objectSize: 15 * util.MiB,
readReqSize: 4,
downloadParallelismPerFile: math.MaxInt,
maxDownloadParallelism: 3,
subscribedOffset: 7,
downloadOffset: 10,
expectedOffset: 12 * util.MiB,
},
{
name: "download only upto the object size",
objectSize: 10 * util.MiB,
readReqSize: 4,
downloadParallelismPerFile: math.MaxInt,
maxDownloadParallelism: 3,
subscribedOffset: 7,
downloadOffset: 10,
expectedOffset: 10 * util.MiB,
},
}
for _, tc := range tbl {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
cache, cacheDir := configureCache(t, 2*tc.objectSize)
storageHandle := configureFakeStorage(t)
bucket := storageHandle.BucketHandle(storage.TestBucketName, "")
minObj, content := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", tc.objectSize)
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, &config.FileCacheConfig{EnableParallelDownloads: true,
DownloadParallelismPerFile: tc.downloadParallelismPerFile, ReadRequestSizeMB: tc.readReqSize, EnableCrcCheck: true, MaxDownloadParallelism: tc.maxDownloadParallelism})
job := jm.CreateJobIfNotExists(&minObj, bucket)
subscriberC := job.subscribe(tc.subscribedOffset)

_, err := job.Download(context.Background(), 10, false)

timeout := time.After(1 * time.Second)
for {
select {
case jobStatus := <-subscriberC:
if assert.Nil(t, err) {
assert.Equal(t, tc.expectedOffset, jobStatus.Offset)
verifyFileTillOffset(t,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm}, tc.expectedOffset,
content)
}
return
case <-timeout:
assert.Fail(t, "Test timed out")
return
}
}
})
}
}

func TestMultipleConcurrentDownloads(t *testing.T) {
t.Parallel()
storageHandle := configureFakeStorage(t)
cache, cacheDir := configureCache(t, 30*util.MiB)
bucket := storageHandle.BucketHandle(storage.TestBucketName, "")
minObj1, content1 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", 10*util.MiB)
minObj2, content2 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/bar.txt", 5*util.MiB)
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, &config.FileCacheConfig{EnableParallelDownloads: true,
DownloadParallelismPerFile: math.MaxInt, ReadRequestSizeMB: 2, EnableCrcCheck: true, MaxDownloadParallelism: 2})
job1 := jm.CreateJobIfNotExists(&minObj1, bucket)
job2 := jm.CreateJobIfNotExists(&minObj2, bucket)
s1 := job1.subscribe(10 * util.MiB)
s2 := job2.subscribe(5 * util.MiB)
ctx := context.Background()

_, err1 := job1.Download(ctx, 10*util.MiB, false)
_, err2 := job2.Download(ctx, 5*util.MiB, false)

notif1, notif2 := false, false
timeout := time.After(1 * time.Second)
for {
select {
case <-s1:
notif1 = true
case <-s2:
notif2 = true
case <-timeout:
assert.Fail(t, "Test timed out")
return
}
if assert.Nil(t, err1) && assert.Nil(t, err2) && notif1 && notif2 {
verifyFileTillOffset(t,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/foo.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm},
10*util.MiB, content1)
verifyFileTillOffset(t,
data.FileSpec{Path: util.GetDownloadPath(path.Join(cacheDir, storage.TestBucketName), "path/in/gcs/bar.txt"), FilePerm: util.DefaultFilePerm, DirPerm: util.DefaultDirPerm},
5*util.MiB, content2)
return
}
}
}
6 changes: 6 additions & 0 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/util"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
)

type jobStatusName string
Expand Down Expand Up @@ -87,6 +88,9 @@ type Job struct {
removeJobCallback func()

mu locker.Locker
// This semaphore is shared across all jobs spawned by the job manager and is
// used to limit the download concurrency.
maxParallelismSem *semaphore.Weighted
}

// JobStatus represents the status of job.
Expand All @@ -111,6 +115,7 @@ func NewJob(
fileSpec data.FileSpec,
removeJobCallback func(),
fileCacheConfig *config.FileCacheConfig,
maxParallelismSem *semaphore.Weighted,
) (job *Job) {
job = &Job{
object: object,
Expand All @@ -120,6 +125,7 @@ func NewJob(
fileSpec: fileSpec,
removeJobCallback: removeJobCallback,
fileCacheConfig: fileCacheConfig,
maxParallelismSem: maxParallelismSem,
}
job.mu = locker.New("Job-"+fileSpec.Path, job.checkInvariants)
job.init()
Expand Down
21 changes: 4 additions & 17 deletions internal/cache/file/downloader/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"math"
"os"
"path"
"reflect"
Expand All @@ -31,10 +32,10 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
testutil "github.com/googlecloudplatform/gcsfuse/v2/internal/util"
. "github.com/jacobsa/ogletest"
"golang.org/x/sync/semaphore"
)

////////////////////////////////////////////////////////////////////////
Expand All @@ -45,34 +46,20 @@ const CacheMaxSize = 50
const DefaultObjectName = "foo"
const DefaultSequentialReadSizeMb = 100

func (dt *downloaderTest) getMinObject(objectName string) gcs.MinObject {
ctx := context.Background()
minObject, _, err := dt.bucket.StatObject(ctx, &gcs.StatObjectRequest{Name: objectName,
ForceFetchFromGcs: true})
if err != nil {
panic(fmt.Errorf("error whlie stating object: %w", err))
}

if minObject != nil {
return *minObject
}
return gcs.MinObject{}
}

func (dt *downloaderTest) initJobTest(objectName string, objectContent []byte, sequentialReadSize int32, lruCacheSize uint64, removeCallback func()) {
ctx := context.Background()
objects := map[string][]byte{objectName: objectContent}
err := storageutil.CreateObjects(ctx, dt.bucket, objects)
AssertEq(nil, err)
dt.object = dt.getMinObject(objectName)
dt.object = getMinObject(objectName, dt.bucket)
dt.fileSpec = data.FileSpec{
Path: dt.fileCachePath(dt.bucket.Name(), dt.object.Name),
FilePerm: util.DefaultFilePerm,
DirPerm: util.DefaultDirPerm,
}
dt.cache = lru.NewCache(lruCacheSize)

dt.job = NewJob(&dt.object, dt.bucket, dt.cache, sequentialReadSize, dt.fileSpec, removeCallback, dt.defaultFileCacheConfig)
dt.job = NewJob(&dt.object, dt.bucket, dt.cache, sequentialReadSize, dt.fileSpec, removeCallback, dt.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64))
fileInfoKey := data.FileInfoKey{
BucketName: storage.TestBucketName,
ObjectName: objectName,
Expand Down
Loading

0 comments on commit 52b1e10

Please sign in to comment.