Skip to content

Commit

Permalink
fix race in ledger migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
SaveTheRbtz committed Apr 12, 2022
1 parent ecf3f14 commit c42159f
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions cmd/util/ledger/migrations/storage_used_update_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migrations

import (
"bufio"
"context"
"fmt"
"os"
"path"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/utils"
"github.com/onflow/flow-go/model/flow"
"golang.org/x/sync/errgroup"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -72,26 +74,27 @@ func (m *StorageUsedUpdateMigration) Migrate(payload []ledger.Payload) ([]ledger

storageUsed := make(map[string]uint64)
storageUsedChan := make(chan accountPayloadSize, workerCount)
payloadChan := make(chan indexedPayload)
payloadChan := make(chan indexedPayload, workerCount)
storageUsedPayloadChan := make(chan accountStorageUsedPayload, workerCount)
storageUsedPayload := make(map[string]int)

inputWG := &sync.WaitGroup{}
inputEG, ctx := errgroup.WithContext(context.Background())
outputWG := &sync.WaitGroup{}

outputWG.Add(1)
go func() {
defer outputWG.Done()
for payloadSize := range storageUsedChan {
if _, ok := storageUsed[payloadSize.Address]; !ok {
storageUsed[payloadSize.Address] = 0
}
storageUsed[payloadSize.Address] = storageUsed[payloadSize.Address] + payloadSize.StorageUsed
}
outputWG.Done()
}()

outputWG.Add(1)
go func() {
defer outputWG.Done()
for su := range storageUsedPayloadChan {
if _, ok := storageUsedPayload[su.Address]; ok {
m.Log.Error().
Expand All @@ -100,17 +103,15 @@ func (m *StorageUsedUpdateMigration) Migrate(payload []ledger.Payload) ([]ledger
}
storageUsedPayload[su.Address] = su.Index
}
outputWG.Done()
}()

for i := 0; i < workerCount; i++ {
inputWG.Add(1)
go func() {
inputEG.Go(func() error {
for p := range payloadChan {
var id flow.RegisterID
id, err = KeyToRegisterID(p.Payload.Key)
id, err := KeyToRegisterID(p.Payload.Key)
if err != nil {
log.Error().Err(err).Msg("error converting key to register ID")
return err
}
if len([]byte(id.Owner)) != flow.AddressLength {
// not an address
Expand All @@ -127,19 +128,21 @@ func (m *StorageUsedUpdateMigration) Migrate(payload []ledger.Payload) ([]ledger
StorageUsed: uint64(registerSize(id, p.Payload)),
}
}
inputWG.Done()
}()
return nil
})
}

Loop:
for i, p := range payload {
payloadChan <- indexedPayload{
Index: i,
Payload: p,
select {
case <-ctx.Done():
break Loop
case payloadChan <- indexedPayload{Index: i, Payload: p}:
}
}

close(payloadChan)
inputWG.Wait()
err = inputEG.Wait()
close(storageUsedChan)
close(storageUsedPayloadChan)
outputWG.Wait()
Expand Down

0 comments on commit c42159f

Please sign in to comment.