Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix making too many create directory call #2828

Merged
merged 17 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 100 additions & 19 deletions ste/folderCreationTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,78 @@ import (
"github.com/Azure/azure-storage-azcopy/v10/common"
)

// TrieNode is a single node of a Trie. Each node represents the prefix formed
// by the runes on the path from the root down to it.
type TrieNode struct {
	// children maps the next rune of a key to the node for the extended prefix.
	children map[rune]*TrieNode
	// value points at the value stored for the key that ends at this node.
	// It is nil when no key ends here.
	value *uint32
	// isEnd reports whether a key terminates at this node, as opposed to the
	// node existing only as an interior step towards longer keys.
	isEnd bool
}

// Trie is a prefix tree keyed by the runes of a string, storing a uint32
// value per inserted key.
type Trie struct {
	// root is the node that every Insert, Get and Delete starts walking from.
	// NewTrie allocates it.
	root *TrieNode
}

// NewTrie returns an empty Trie whose root node is allocated and has an
// initialised (empty) children map.
func NewTrie() *Trie {
	rootNode := new(TrieNode)
	rootNode.children = map[rune]*TrieNode{}

	trie := new(Trie)
	trie.root = rootNode
	return trie
}

func (t *Trie) Insert(key string, value uint32) {
node := t.root
for _, char := range key {
if _, exists := node.children[char]; !exists {
node.children[char] = &TrieNode{children: make(map[rune]*TrieNode)}
}
node = node.children[char]
ashruti-msft marked this conversation as resolved.
Show resolved Hide resolved
}
node.value = &value
node.isEnd = true
}

// Get looks key up by walking one node per rune from the root.
//
// It returns the stored value pointer and true when a key ends exactly at the
// node reached; it returns nil and false when the path is missing or when the
// node is only an interior prefix of longer keys. The returned pointer is the
// one held by the trie node, not a copy.
func (t *Trie) Get(key string) (*uint32, bool) {
	cur := t.root
	for _, r := range key {
		next, found := cur.children[r]
		if !found {
			return nil, false
		}
		cur = next
	}

	if !cur.isEnd {
		return nil, false
	}
	return cur.value, true
}

// Delete removes key from the trie by delegating to deleteHelper, starting at
// the root with depth 0.
//
// The result is deleteHelper's result for the root node. As deleteHelper is
// written, that is true only when the key was present and the root has no
// children left afterwards, and false in every other case, including a
// successful delete that leaves other keys behind.
// NOTE(review): confirm whether callers expect "key was deleted" instead.
func (t *Trie) Delete(key string) bool {
	const startDepth = 0
	rootPrunable := t.deleteHelper(t.root, key, startDepth)
	return rootPrunable
}

// deleteHelper removes the part of key starting at byte offset depth from the
// subtree rooted at node, pruning nodes that become empty on the way back up.
//
// depth is a byte offset into key, not a rune count. The rune at that offset
// is decoded the same way Insert and Get decode it (by ranging over the
// string), so keys containing multi-byte characters follow the same path they
// were inserted on. Indexing key byte-by-byte would look up children that were
// never created for such keys and silently fail to delete them.
//
// It returns true when node itself no longer holds a key and has no children,
// which tells the caller to remove node from its parent. It returns false when
// node is nil, when the key is not present, or when node must be kept.
func (t *Trie) deleteHelper(node *TrieNode, key string, depth int) bool {
	if node == nil {
		return false
	}

	// The whole key has been consumed: node is where the key would end.
	if depth >= len(key) {
		if !node.isEnd {
			return false // Key does not exist
		}
		node.isEnd = false
		node.value = nil

		// If the node has no children, it can be deleted
		return len(node.children) == 0
	}

	// Decode the rune at depth and its byte width. The first range iteration
	// yields the rune; the index of the second iteration is the width. When
	// only one rune remains, the width is everything that is left.
	var char rune
	width := len(key) - depth
	for i, r := range key[depth:] {
		if i > 0 {
			width = i
			break
		}
		char = r
	}

	if t.deleteHelper(node.children[char], key, depth+width) {
		delete(node.children, char)
		// Keep this node if another key ends here or passes through it.
		return !node.isEnd && len(node.children) == 0
	}

	return false
}

type FolderCreationTracker common.FolderCreationTracker

type JPPTCompatibleFolderCreationTracker interface {
Expand All @@ -23,7 +95,7 @@ func NewFolderCreationTracker(fpo common.FolderPropertyOption, plan *JobPartPlan
return &jpptFolderTracker{ // This prevents a dependency cycle. Reviewers: Are we OK with this? Can you think of a better way to do it?
plan: plan,
mu: &sync.Mutex{},
contents: make(map[string]uint32),
contents: NewTrie(),
unregisteredButCreated: make(map[string]struct{}),
}
case common.EFolderPropertiesOption.NoFolders():
Expand Down Expand Up @@ -55,19 +127,21 @@ func (f *nullFolderTracker) StopTracking(folder string) {
type jpptFolderTracker struct {
plan IJobPartPlanHeader
mu *sync.Mutex
contents map[string]uint32
contents *Trie
unregisteredButCreated map[string]struct{}
gapra-msft marked this conversation as resolved.
Show resolved Hide resolved
}

func (f *jpptFolderTracker) RegisterPropertiesTransfer(folder string, transferIndex uint32) {
f.mu.Lock()
defer f.mu.Unlock()

print("Registering folder: " + folder + "\n")

if folder == common.Dev_Null {
return // Never persist to dev-null
}

f.contents[folder] = transferIndex
f.contents.Insert(folder, transferIndex)

// We created it before it was enumerated-- Let's register that now.
if _, ok := f.unregisteredButCreated[folder]; ok {
Expand All @@ -85,9 +159,11 @@ func (f *jpptFolderTracker) CreateFolder(folder string, doCreation func() error)
return nil // Never persist to dev-null
}

if idx, ok := f.contents[folder]; ok &&
f.plan.Transfer(idx).TransferStatus() == (common.ETransferStatus.FolderCreated()) {
return nil
if idx, ok := f.contents.Get(folder); ok {
status := f.plan.Transfer(*idx).TransferStatus()
if status == (common.ETransferStatus.FolderCreated()) || status == (common.ETransferStatus.Success()) {
return nil
}
}

if _, ok := f.unregisteredButCreated[folder]; ok {
Expand All @@ -99,9 +175,9 @@ func (f *jpptFolderTracker) CreateFolder(folder string, doCreation func() error)
return err
}

if idx, ok := f.contents[folder]; ok {
if idx, ok := f.contents.Get(folder); ok {
// overwrite its transfer status
f.plan.Transfer(idx).SetTransferStatus(common.ETransferStatus.FolderCreated(), false)
f.plan.Transfer(*idx).SetTransferStatus(common.ETransferStatus.FolderCreated(), false)
} else {
// A folder hasn't been hit in traversal yet.
// Recording it in memory is OK, because we *cannot* resume a job that hasn't finished traversal.
Expand All @@ -127,8 +203,8 @@ func (f *jpptFolderTracker) ShouldSetProperties(folder string, overwrite common.
defer f.mu.Unlock()

var created bool
if idx, ok := f.contents[folder]; ok {
created = f.plan.Transfer(idx).TransferStatus() == common.ETransferStatus.FolderCreated()
if idx, ok := f.contents.Get(folder); ok {
created = f.plan.Transfer(*idx).TransferStatus() == common.ETransferStatus.FolderCreated()
ashruti-msft marked this conversation as resolved.
Show resolved Hide resolved
} else {
// This should not happen, ever.
// Folder property jobs register with the tracker before they start getting processed.
Expand Down Expand Up @@ -167,17 +243,22 @@ func (f *jpptFolderTracker) StopTracking(folder string) {
return // Not possible to track this
}

// Add a log mentioning we are in StopTracking and the folder is being deleted.
ashruti-msft marked this conversation as resolved.
Show resolved Hide resolved
fmt.Println("In StopTracking, deleting folder: " + folder)

// no-op, because tracking is now handled by jppt, anyway.
if _, ok := f.contents[folder]; ok {
delete(f.contents, folder)
} else {
currentContents := ""
if f.contents != nil {
if _, ok := f.contents.Get(folder); ok {
f.contents.Delete(folder)
} else {
currentContents := ""

for k, v := range f.contents {
currentContents += fmt.Sprintf("K: %s V: %d\n", k, v)
}
for k, v := range f.contents.root.children {
currentContents += fmt.Sprintf("K: %c V: %v\n", k, v.value)
}

// double should never be hit, but *just in case*.
panic(common.NewAzCopyLogSanitizer().SanitizeLogMessage("Folder " + folder + " shouldn't finish tracking until it's been recorded\nCurrent Contents:\n" + currentContents))
// double should never be hit, but *just in case*.
panic(common.NewAzCopyLogSanitizer().SanitizeLogMessage("Folder " + folder + " shouldn't finish tracking until it's been recorded\nCurrent Contents:\n" + currentContents))
}
}
}
51 changes: 25 additions & 26 deletions ste/folderCreationTracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,26 @@ import (
// This is mocked to test the folder creation tracker
type mockedJPPH struct {
folderName []string
index []int
index []int
status []*JobPartPlanTransfer

}

func (jpph *mockedJPPH) CommandString() string { panic("Not implemented") }
func (jpph *mockedJPPH) GetRelativeSrcDstStrings(uint32) (string, string) { panic("Not implemented") }
func (jpph *mockedJPPH) JobPartStatus() common.JobStatus { panic("Not implemented") }
func (jpph *mockedJPPH) JobStatus() common.JobStatus { panic("Not implemented") }
func (jpph *mockedJPPH) SetJobPartStatus(common.JobStatus) { panic("Not implemented") }
func (jpph *mockedJPPH) SetJobStatus(common.JobStatus) { panic("Not implemented") }
func (jpph *mockedJPPH) Transfer(idx uint32) *JobPartPlanTransfer {
func (jpph *mockedJPPH) CommandString() string { panic("Not implemented") }
func (jpph *mockedJPPH) GetRelativeSrcDstStrings(uint32) (string, string) { panic("Not implemented") }
func (jpph *mockedJPPH) JobPartStatus() common.JobStatus { panic("Not implemented") }
func (jpph *mockedJPPH) JobStatus() common.JobStatus { panic("Not implemented") }
func (jpph *mockedJPPH) SetJobPartStatus(common.JobStatus) { panic("Not implemented") }
func (jpph *mockedJPPH) SetJobStatus(common.JobStatus) { panic("Not implemented") }
func (jpph *mockedJPPH) Transfer(idx uint32) *JobPartPlanTransfer {
return jpph.status[idx]
}
func (jpph *mockedJPPH) TransferSrcDstRelatives(uint32) (string, string) { panic("Not implemented") }
func (jpph *mockedJPPH) TransferSrcDstStrings(uint32) (string, string, bool) { panic("Not implemented") }
func (jpph *mockedJPPH) TransferSrcPropertiesAndMetadata(uint32) (common.ResourceHTTPHeaders, common.Metadata, blob.BlobType, blob.AccessTier, bool, bool, bool, common.InvalidMetadataHandleOption, common.EntityType, string, string, common.BlobTags) {
func (jpph *mockedJPPH) TransferSrcDstRelatives(uint32) (string, string) { panic("Not implemented") }
func (jpph *mockedJPPH) TransferSrcDstStrings(uint32) (string, string, bool) {
panic("Not implemented")
}
func (jpph *mockedJPPH) TransferSrcPropertiesAndMetadata(uint32) (common.ResourceHTTPHeaders, common.Metadata, blob.BlobType, blob.AccessTier, bool, bool, bool, common.InvalidMetadataHandleOption, common.EntityType, string, string, common.BlobTags) {
panic("Not implemented")
}


// This test verifies that when we call dir create for a directory, it is created only once,
// even if multiple routines request it to be created.
Expand All @@ -60,22 +60,21 @@ func TestFolderCreationTracker_directoryCreate(t *testing.T) {

// create a plan with one registered and one unregistered folder
folderReg := "folderReg"
folderUnReg := "folderUnReg"

folderUnReg := "folderUnReg"

plan := &mockedJPPH{
folderName: []string{folderReg, folderUnReg},
index: []int{0, 1},
status: []*JobPartPlanTransfer {
&JobPartPlanTransfer{atomicTransferStatus: common.ETransferStatus.NotStarted(),},
&JobPartPlanTransfer{atomicTransferStatus: common.ETransferStatus.NotStarted(),},
index: []int{0, 1},
status: []*JobPartPlanTransfer{
&JobPartPlanTransfer{atomicTransferStatus: common.ETransferStatus.NotStarted()},
&JobPartPlanTransfer{atomicTransferStatus: common.ETransferStatus.NotStarted()},
},
}

fct := &jpptFolderTracker{
fct := &jpptFolderTracker{
plan: plan,
mu: &sync.Mutex{},
contents: make(map[string]uint32),
contents: NewTrie(),
unregisteredButCreated: make(map[string]struct{}),
}

Expand All @@ -85,13 +84,13 @@ func TestFolderCreationTracker_directoryCreate(t *testing.T) {
// Multiple calls to create folderReg should execute create only once.
numOfCreations := int32(0)
var wg sync.WaitGroup
doCreation := func() error{
doCreation := func() error {
atomic.AddInt32(&numOfCreations, 1)
plan.status[0].atomicTransferStatus = common.ETransferStatus.FolderCreated()
return nil
}

ch := make(chan bool)
ch := make(chan bool)
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
Expand All @@ -100,15 +99,15 @@ func TestFolderCreationTracker_directoryCreate(t *testing.T) {
wg.Done()
}()
}
close(ch) // this will cause all above go rotuines to start creating folder
close(ch) // this will cause all above go routines to start creating folder

wg.Wait()
a.Equal(int32(1), numOfCreations)

// similar test for unregistered folder
numOfCreations = 0
ch = make(chan bool)
doCreation = func() error{
doCreation = func() error {
atomic.AddInt32(&numOfCreations, 1)
plan.status[1].atomicTransferStatus = common.ETransferStatus.FolderCreated()
return nil
Expand All @@ -126,4 +125,4 @@ func TestFolderCreationTracker_directoryCreate(t *testing.T) {
wg.Wait()
a.Equal(int32(1), numOfCreations)

}
}
2 changes: 1 addition & 1 deletion ste/xfer-anyToRemote-folder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func anyToRemote_folder(jptm IJobPartTransferMgr, info *TransferInfo, pacer pace
} else {

t := jptm.GetFolderCreationTracker()
gapra-msft marked this conversation as resolved.
Show resolved Hide resolved
defer t.StopTracking(s.DirUrlToString()) // don't need it after this routine
//defer t.StopTracking(s.DirUrlToString()) // don't need it after this routine
gapra-msft marked this conversation as resolved.
Show resolved Hide resolved
shouldSetProps := t.ShouldSetProperties(s.DirUrlToString(), jptm.GetOverwriteOption(), jptm.GetOverwritePrompter())
if !shouldSetProps {
jptm.LogAtLevelForCurrentTransfer(common.LogWarning, "Folder already exists, so due to the --overwrite option, its properties won't be set")
Expand Down
2 changes: 1 addition & 1 deletion ste/xfer-remoteToLocal-folder.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func remoteToLocal_folder(jptm IJobPartTransferMgr, pacer pacer, df downloaderFa

// no chunks to schedule. Just run the folder handling operations
t := jptm.GetFolderCreationTracker()
defer t.StopTracking(info.Destination) // don't need it after this routine
//defer t.StopTracking(info.Destination) // don't need it after this routine

err = common.CreateDirectoryIfNotExist(info.Destination, t) // we may create it here, or possible there's already a file transfer for the folder that has created it, or maybe it already existed before this job
if err != nil {
Expand Down
Loading