From c968db452b24107095a3ca93cee48e135338716a Mon Sep 17 00:00:00 2001 From: y0sher Date: Wed, 28 Aug 2024 20:39:59 +0300 Subject: [PATCH 01/10] test: sign with the same validator in parallel on the same time. --- signer/sign_attestation_test.go | 51 +++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/signer/sign_attestation_test.go b/signer/sign_attestation_test.go index f381e228..1739fb71 100644 --- a/signer/sign_attestation_test.go +++ b/signer/sign_attestation_test.go @@ -73,6 +73,57 @@ func TestReferenceAttestation(t *testing.T) { require.EqualValues(t, sig, actualSig) } +// tested against a block and sig generated from https://github.com/prysmaticlabs/prysm/blob/master/shared/testutil/block.go#L357 +func TestLockSameValidatorInParallel(t *testing.T) { + sk := _byteArray("2c083f2c8fc923fa2bd32a70ab72b4b46247e8c1f347adc30b2f8036a355086c") + pk := _byteArray("a9cf360aa15fb1d1d30ee2b578dc5884823c19661886ae8b892775ccb3bd96b7d7345569a2aa0b14e4d015c54a6a0c54") + domain := _byteArray32("0100000081509579e35e84020ad8751eca180b44df470332d3ad17fc6fd52459") + + store := inmemStorage() + options := ð2keymanager.KeyVaultOptions{} + options.SetStorage(store) + options.SetWalletType(core.NDWallet) + vault, err := eth2keymanager.NewKeyVault(options) + require.NoError(t, err) + wallet, err := vault.Wallet() + require.NoError(t, err) + + k, err := core.NewHDKeyFromPrivateKey(sk, "") + require.NoError(t, err) + acc := wallets.NewValidatorAccount("1", k, nil, "", vault.Context) + require.NoError(t, err) + require.NoError(t, wallet.AddValidatorAccount(acc)) + + //// setup signer + signer := NewSimpleSigner(wallet, &prot.NoProtection{}, core.MainNetwork) + + attestationDataByts := _byteArray("000000000000000000000000000000003a43a4bf26fb5947e809c1f24f7dc6857c8ac007e535d48e6e4eca2122fd776b0000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000003a43a4bf26fb5947e809c1f24f7dc6857c8ac007e535d48e6e4eca2122fd776b") + + // decode attestation + attData := &phase0.AttestationData{} + require.NoError(t, attData.UnmarshalSSZ(attestationDataByts)) + + go func() { + _, _, err := signer.SignBeaconAttestation(attData, phase0.Domain{0}, pk) + require.NoError(t, err) + + }() + + ch := make(chan struct{}) + + go func() { + _, _, err := signer.SignBeaconAttestation(attData, domain, pk) + close(ch) + require.NoError(t, err) + }() + + select { + case <-ch: + case <-time.After(200 * time.Millisecond): + t.Fatal("timeout") + } +} + func TestAttestationSlashingSignatures(t *testing.T) { t.Run("valid attestation, sign using public key", func(t *testing.T) { seed, _ := hex.DecodeString("0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1fff") From 6d5527518d0b4e6369f0c6c12895231ae8b0be61 Mon Sep 17 00:00:00 2001 From: y0sher Date: Wed, 28 Aug 2024 20:51:59 +0300 Subject: [PATCH 02/10] fix: return lock instead of locking inside `lock` function to prevent deadlock. --- signer/sign_attestation.go | 5 +++-- signer/sign_block.go | 5 +++-- signer/sign_sync_committee.go | 15 +++++++++------ signer/validator_signer.go | 16 +++------------- 4 files changed, 18 insertions(+), 23 deletions(-) diff --git a/signer/sign_attestation.go b/signer/sign_attestation.go index 7b6ce165..d03603e2 100644 --- a/signer/sign_attestation.go +++ b/signer/sign_attestation.go @@ -19,9 +19,10 @@ func (signer *SimpleSigner) SignBeaconAttestation(attestation *phase0.Attestatio } // 2. lock for current account - signer.lock(account.ID(), "attestation") + val := signer.lock(account.ID(), "attestation") + val.Lock() defer func() { - signer.unlock(account.ID(), "attestation") + val.Unlock() }() // 3. far future check diff --git a/signer/sign_block.go b/signer/sign_block.go index 1817d188..616a9369 100644 --- a/signer/sign_block.go +++ b/signer/sign_block.go @@ -23,8 +23,9 @@ func (signer *SimpleSigner) SignBlock(block ssz.HashRoot, slot phase0.Slot, doma } // 2. lock for current account - signer.lock(account.ID(), "proposal") - defer signer.unlock(account.ID(), "proposal") + val := signer.lock(account.ID(), "proposal") + val.Lock() + defer val.Unlock() // 3. far future check if !IsValidFarFutureSlot(signer.network, slot) { diff --git a/signer/sign_sync_committee.go b/signer/sign_sync_committee.go index 7f2b0492..1d670045 100644 --- a/signer/sign_sync_committee.go +++ b/signer/sign_sync_committee.go @@ -21,8 +21,9 @@ func (signer *SimpleSigner) SignSyncCommittee(msgBlockRoot []byte, domain phase0 } // 2. lock for current account - signer.lock(account.ID(), "sync_committee") - defer signer.unlock(account.ID(), "sync_committee") + val := signer.lock(account.ID(), "sync_committee") + val.Lock() + defer val.Unlock() // 3. sign sszRoot := SSZBytes(msgBlockRoot) @@ -51,8 +52,9 @@ func (signer *SimpleSigner) SignSyncCommitteeSelectionData(data *altair.SyncAggr } // 2. lock for current account - signer.lock(account.ID(), "sync_committee_selection_data") - defer signer.unlock(account.ID(), "sync_committee_selection_data") + val := signer.lock(account.ID(), "sync_committee_selection_data") + val.Lock() + defer val.Unlock() // 3. sign if data == nil { @@ -83,8 +85,9 @@ func (signer *SimpleSigner) SignSyncCommitteeContributionAndProof(contribAndProo } // 2. lock for current account - signer.lock(account.ID(), "sync_committee_selection_and_proof") - defer signer.unlock(account.ID(), "sync_committee_selection_and_proof") + val := signer.lock(account.ID(), "sync_committee_selection_and_proof") + val.Lock() + defer val.Unlock() // 3. sign if contribAndProof == nil { diff --git a/signer/validator_signer.go b/signer/validator_signer.go index a4e12b9c..b5e0d7c5 100644 --- a/signer/validator_signer.go +++ b/signer/validator_signer.go @@ -51,26 +51,16 @@ func NewSimpleSigner(wallet core.Wallet, slashingProtector core.SlashingProtecto } // lock locks signer -func (signer *SimpleSigner) lock(accountID uuid.UUID, operation string) { +func (signer *SimpleSigner) lock(accountID uuid.UUID, operation string) *sync.RWMutex { signer.mapLock.Lock() defer signer.mapLock.Unlock() k := accountID.String() + "_" + operation if val, ok := signer.signLocks[k]; ok { - val.Lock() + return val } else { signer.signLocks[k] = &sync.RWMutex{} - signer.signLocks[k].Lock() - } -} - -func (signer *SimpleSigner) unlock(accountID uuid.UUID, operation string) { - signer.mapLock.RLock() - defer signer.mapLock.RUnlock() - - k := accountID.String() + "_" + operation - if val, ok := signer.signLocks[k]; ok { - val.Unlock() + return signer.signLocks[k] } } From a9949f2961f3a48734d1bd20fb5b4f2517c1ecf8 Mon Sep 17 00:00:00 2001 From: y0sher Date: Thu, 29 Aug 2024 11:42:46 +0300 Subject: [PATCH 03/10] make account context thread-safe --- wallets/account.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/wallets/account.go b/wallets/account.go index fe2e0974..cea23c7f 100644 --- a/wallets/account.go +++ b/wallets/account.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "encoding/json" "strings" + "sync" "github.com/google/uuid" "github.com/pkg/errors" @@ -21,6 +22,7 @@ type HDAccount struct { id uuid.UUID validationKey *core.HDKey withdrawalPubKey []byte + contextMtx *sync.RWMutex context *core.WalletContext } @@ -122,6 +124,7 @@ func NewValidatorAccount( validationKey: validationKey, withdrawalPubKey: withdrawalPubKey, basePath: basePath, + contextMtx: &sync.RWMutex{}, context: context, } } @@ -161,7 +164,7 @@ func (account *HDAccount) GetDepositData() (map[string]interface{}, error) { depositData, root, err := eth1deposit.DepositData( account.validationKey, account.withdrawalPubKey, - account.context.Storage.Network(), + account.GetContext().Storage.Network(), eth1deposit.MaxEffectiveBalanceInGwei, ) if err != nil { @@ -173,11 +176,20 @@ func (account *HDAccount) GetDepositData() (map[string]interface{}, error) { "signature": strings.TrimPrefix(depositData.Signature.String(), "0x"), "withdrawalCredentials": hex.EncodeToString(depositData.WithdrawalCredentials), "depositDataRoot": hex.EncodeToString(root[:]), - "depositContractAddress": account.context.Storage.Network().DepositContractAddress(), + "depositContractAddress": account.GetContext().Storage.Network().DepositContractAddress(), }, nil } // SetContext is the context setter func (account *HDAccount) SetContext(ctx *core.WalletContext) { + account.contextMtx.Lock() + defer account.contextMtx.Unlock() account.context = ctx } + +// SetContext is the context setter +func (account *HDAccount) GetContext() *core.WalletContext { + account.contextMtx.RLock() + defer account.contextMtx.RUnlock() + return account.context +} From b5848884a7a503cc6125cbd02ce2b2d097b2d124 Mon Sep 17 00:00:00 2001 From: y0sher Date: Thu, 29 Aug 2024 12:10:06 +0300 Subject: [PATCH 04/10] use value mutex to not initialize --- wallets/account.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/wallets/account.go b/wallets/account.go index cea23c7f..a7c12ed3 100644 --- a/wallets/account.go +++ b/wallets/account.go @@ -22,7 +22,7 @@ type HDAccount struct { id uuid.UUID validationKey *core.HDKey withdrawalPubKey []byte - contextMtx *sync.RWMutex + contextMtx sync.RWMutex context *core.WalletContext } @@ -124,7 +124,6 @@ func NewValidatorAccount( validationKey: validationKey, withdrawalPubKey: withdrawalPubKey, basePath: basePath, - contextMtx: &sync.RWMutex{}, context: context, } } From ce7241561e0ff472beb1021ba3c04acf6aa2a1f1 Mon Sep 17 00:00:00 2001 From: y0sher Date: Tue, 22 Oct 2024 15:50:52 +0300 Subject: [PATCH 05/10] fix golangci-lint issues --- cli/cmd/wallet/cmd/account/handler/handler_create.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cli/cmd/wallet/cmd/account/handler/handler_create.go b/cli/cmd/wallet/cmd/account/handler/handler_create.go index 3c7ea7e3..b7940e55 100644 --- a/cli/cmd/wallet/cmd/account/handler/handler_create.go +++ b/cli/cmd/wallet/cmd/account/handler/handler_create.go @@ -276,13 +276,13 @@ func ValidateHighestValues(accountFlagValues CreateAccountFlagValues) error { privateKeysCount := len(accountFlagValues.privateKeys) if len(accountFlagValues.highestSources) != privateKeysCount { - return errors.Errorf("highest sources " + errorExplain) + return errors.Errorf("highest sources: %v", errorExplain) } if len(accountFlagValues.highestTargets) != privateKeysCount { - return errors.Errorf("highest targets " + errorExplain) + return errors.Errorf("highest targets: %v", errorExplain) } if len(accountFlagValues.highestProposals) != privateKeysCount { - return errors.Errorf("highest proposals " + errorExplain) + return errors.Errorf("highest proposals: %v", errorExplain) } } else if accountFlagValues.accumulate { if len(accountFlagValues.highestSources) != (accountFlagValues.index + 1) { From 9d2f3ea88e3f804db4b5077ac5579255a84136da Mon Sep 17 00:00:00 2001 From: y0sher Date: Tue, 22 Oct 2024 15:56:02 +0300 Subject: [PATCH 06/10] fix test --- cli/cmd/wallet/cmd/account/handler/handler_create.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cli/cmd/wallet/cmd/account/handler/handler_create.go b/cli/cmd/wallet/cmd/account/handler/handler_create.go index b7940e55..2bd09602 100644 --- a/cli/cmd/wallet/cmd/account/handler/handler_create.go +++ b/cli/cmd/wallet/cmd/account/handler/handler_create.go @@ -276,13 +276,13 @@ func ValidateHighestValues(accountFlagValues CreateAccountFlagValues) error { privateKeysCount := len(accountFlagValues.privateKeys) if len(accountFlagValues.highestSources) != privateKeysCount { - return errors.Errorf("highest sources: %v", errorExplain) + return errors.Errorf("highest sources %v", errorExplain) } if len(accountFlagValues.highestTargets) != privateKeysCount { - return errors.Errorf("highest targets: %v", errorExplain) + return errors.Errorf("highest targets %v", errorExplain) } if len(accountFlagValues.highestProposals) != privateKeysCount { - return errors.Errorf("highest proposals: %v", errorExplain) + return errors.Errorf("highest proposals %v", errorExplain) } } else if accountFlagValues.accumulate { if len(accountFlagValues.highestSources) != (accountFlagValues.index + 1) { From 95aeb434b65f36eae5e740f890e6e8b06eb5c460 Mon Sep 17 00:00:00 2001 From: y0sher Date: Sun, 27 Oct 2024 16:31:44 +0200 Subject: [PATCH 07/10] defer without function wrapping --- signer/sign_attestation.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/signer/sign_attestation.go b/signer/sign_attestation.go index d03603e2..67c23001 100644 --- a/signer/sign_attestation.go +++ b/signer/sign_attestation.go @@ -21,9 +21,7 @@ func (signer *SimpleSigner) SignBeaconAttestation(attestation *phase0.Attestatio // 2. lock for current account val := signer.lock(account.ID(), "attestation") val.Lock() - defer func() { - val.Unlock() - }() + defer val.Unlock() // 3. far future check if !IsValidFarFutureEpoch(signer.network, attestation.Target.Epoch) { From 0678236cc0b502eb27855ae05e691fa86ad83520 Mon Sep 17 00:00:00 2001 From: y0sher Date: Sun, 27 Oct 2024 18:04:06 +0200 Subject: [PATCH 08/10] make test wait for both goroutines --- signer/sign_attestation_test.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/signer/sign_attestation_test.go b/signer/sign_attestation_test.go index 1739fb71..f780765f 100644 --- a/signer/sign_attestation_test.go +++ b/signer/sign_attestation_test.go @@ -103,25 +103,35 @@ func TestLockSameValidatorInParallel(t *testing.T) { attData := &phase0.AttestationData{} require.NoError(t, attData.UnmarshalSSZ(attestationDataByts)) + ch := make(chan struct{}) + go func() { _, _, err := signer.SignBeaconAttestation(attData, phase0.Domain{0}, pk) require.NoError(t, err) - + close(ch) }() - ch := make(chan struct{}) + ch2 := make(chan struct{}) go func() { _, _, err := signer.SignBeaconAttestation(attData, domain, pk) - close(ch) require.NoError(t, err) + close(ch2) + }() + select { + case <-ch2: + case <-time.After(200 * time.Millisecond): + t.Fatal("timeout") + } + select { case <-ch: case <-time.After(200 * time.Millisecond): t.Fatal("timeout") } + } func TestAttestationSlashingSignatures(t *testing.T) { From b8b6df7ca9bef8e588a453067e07614f6f2176e4 Mon Sep 17 00:00:00 2001 From: y0sher Date: Sun, 27 Oct 2024 18:04:53 +0200 Subject: [PATCH 09/10] typo --- wallets/account.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wallets/account.go b/wallets/account.go index a7c12ed3..37d6ba42 100644 --- a/wallets/account.go +++ b/wallets/account.go @@ -186,7 +186,7 @@ func (account *HDAccount) SetContext(ctx *core.WalletContext) { account.context = ctx } -// SetContext is the context setter +// GetContext is the context getter func (account *HDAccount) GetContext() *core.WalletContext { account.contextMtx.RLock() defer account.contextMtx.RUnlock() From 823f9479aea56eb99d07f5a77e59db0a15077979 Mon Sep 17 00:00:00 2001 From: moshe-blox Date: Sun, 27 Oct 2024 19:32:51 +0200 Subject: [PATCH 10/10] add many validator parallel test --- signer/sign_attestation_test.go | 112 ++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/signer/sign_attestation_test.go b/signer/sign_attestation_test.go index f780765f..df6657ca 100644 --- a/signer/sign_attestation_test.go +++ b/signer/sign_attestation_test.go @@ -3,6 +3,7 @@ package signer import ( "encoding/hex" "fmt" + "sync" "testing" "time" @@ -134,6 +135,117 @@ func TestLockSameValidatorInParallel(t *testing.T) { } +func TestManyValidatorsParallel(t *testing.T) { + type testValidator struct { + sk []byte + pk []byte + id string + } + + testValidators := []testValidator{ + { + sk: _byteArray("2c083f2c8fc923fa2bd32a70ab72b4b46247e8c1f347adc30b2f8036a355086c"), + pk: _byteArray("a9cf360aa15fb1d1d30ee2b578dc5884823c19661886ae8b892775ccb3bd96b7d7345569a2aa0b14e4d015c54a6a0c54"), + id: "1", + }, + { + sk: _byteArray("6327b1e58c41d60dd7c3c8b9634204255707c2d12e2513c345001d8926745eea"), + pk: _byteArray("954eb88ed1207f891dc3c28fa6cfdf8f53bf0ed3d838f3476c0900a61314d22d4f0a300da3cd010444dd5183e35a593c"), + id: "2", + }, + { + sk: _byteArray("5470813f7deef638dc531188ca89e36976d536f680e89849cd9077fd096e20bc"), + pk: _byteArray("a3862121db5914d7272b0b705e6e3c5336b79e316735661873566245207329c30f9a33d4fb5f5857fc6fd0a368186972"), + id: "3", + }, + } + + attestationDataByts := _byteArray("000000000000000000000000000000003a43a4bf26fb5947e809c1f24f7dc6857c8ac007e535d48e6e4eca2122fd776b0000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000003a43a4bf26fb5947e809c1f24f7dc6857c8ac007e535d48e6e4eca2122fd776b") + domain := _byteArray32("0100000081509579e35e84020ad8751eca180b44df470332d3ad17fc6fd52459") + + // setup KeyVault + store := inmemStorage() + options := ð2keymanager.KeyVaultOptions{} + options.SetStorage(store) + options.SetWalletType(core.NDWallet) + vault, err := eth2keymanager.NewKeyVault(options) + require.NoError(t, err) + wallet, err := vault.Wallet() + require.NoError(t, err) + + // create accounts + protector := prot.NewNormalProtection(store) + for i := range testValidators { + k, err := core.NewHDKeyFromPrivateKey(testValidators[i].sk, "") + require.NoError(t, err) + require.EqualValues(t, testValidators[i].pk, k.PublicKey().Serialize()) + + acc := wallets.NewValidatorAccount(testValidators[i].id, k, nil, "", vault.Context) + require.NoError(t, err) + require.EqualValues(t, testValidators[i].pk, acc.ValidatorPublicKey()) + require.NoError(t, wallet.AddValidatorAccount(acc)) + + // setup base attestation data + baseAttData := &phase0.AttestationData{} + require.NoError(t, baseAttData.UnmarshalSSZ(attestationDataByts)) + err = protector.UpdateHighestAttestation(acc.ValidatorPublicKey(), baseAttData) + require.NoError(t, err) + } + + // setup signer + signer := NewSimpleSigner(wallet, protector, core.PraterNetwork) + + // Sign attestation in parallel. + type validatorResult struct { + signs int + errs int + } + var validatorResults = map[string]*validatorResult{} + var mu sync.Mutex + for _, v := range testValidators { + validatorResults[string(v.pk)] = &validatorResult{} + } + + var wg sync.WaitGroup + const goroutinesPerValidator = 10 + for _, v := range testValidators { + v := v + for i := 0; i < goroutinesPerValidator; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + // decode attestation to be signed + attData := &phase0.AttestationData{} + require.NoError(t, attData.UnmarshalSSZ(attestationDataByts)) + attData.Slot += phase0.Slot(core.PraterNetwork.SlotsPerEpoch()) + attData.Source.Epoch++ + attData.Target.Epoch++ + + _, _, err := signer.SignBeaconAttestation(attData, domain, v.pk) + // require.EqualValues(t, sig, actualSig) + + mu.Lock() + defer mu.Unlock() + if err != nil { + validatorResults[string(v.pk)].errs++ + require.ErrorContains(t, err, "slashable attestation (HighestAttestationVote), not signing") + } else { + validatorResults[string(v.pk)].signs++ + } + }() + } + } + wg.Wait() + + for pk, v := range validatorResults { + t.Logf("pk: %x, signs: %d, errs: %d", []byte(pk), v.signs, v.errs) + + require.Equal(t, 1, v.signs) + require.Equal(t, goroutinesPerValidator-1, v.errs) + } +} + func TestAttestationSlashingSignatures(t *testing.T) { t.Run("valid attestation, sign using public key", func(t *testing.T) { seed, _ := hex.DecodeString("0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1fff")