Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache Active Validator Indices, Count, and Balances #2737

Merged
merged 38 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
130023e
benchmark process epoch
terencechain May 31, 2019
7950dcf
revert prof.out
terencechain May 31, 2019
3cb25ed
Add some optimizations
prestonvanloon May 31, 2019
9a2b1ab
beware where we use ActiveValidatorIndices...
terencechain Jun 1, 2019
7813441
revert extra file
terencechain Jun 1, 2019
f36b777
gaz
terencechain Jun 1, 2019
63a769f
quick commit to get feedback
terencechain Jun 1, 2019
079ffc5
revert extra file
terencechain Jun 1, 2019
0383d4d
started fixing tests
terencechain Jun 2, 2019
7e8ed6f
fixed broken TestProcessCrosslink_NoUpdate
terencechain Jun 2, 2019
804c0a1
Merge branch 'spec-v0.6' of https://github.com/prysmaticlabs/prysm in…
terencechain Jun 2, 2019
0192b9b
gaz
terencechain Jun 2, 2019
4522f77
cache randao seed
terencechain Jun 2, 2019
2e8bc92
fixed all the tests
terencechain Jun 2, 2019
0190159
fmt and lint
terencechain Jun 2, 2019
8a1aa71
Merge branch 'spec-v0.6' of https://github.com/prysmaticlabs/prysm in…
terencechain Jun 2, 2019
2a65c55
spacing
terencechain Jun 2, 2019
a7b979f
Added todo
terencechain Jun 2, 2019
9f88933
lint
terencechain Jun 2, 2019
382077f
revert binary file
terencechain Jun 2, 2019
904f251
started regression test
terencechain Jun 3, 2019
ff0daf8
basic tests done
terencechain Jun 3, 2019
f8e0cfc
using a fifo for active indices cache
terencechain Jun 3, 2019
3902def
using a fifo for active count cache
terencechain Jun 3, 2019
bb29a22
using a fifo for total balance cache
terencechain Jun 3, 2019
dcbd1bc
using a fifo for active balance cache
terencechain Jun 3, 2019
49f0b77
using a fifo for start shard cache
terencechain Jun 3, 2019
3962f37
using a fifo for seed cache
terencechain Jun 3, 2019
473f83e
gaz
terencechain Jun 3, 2019
a2b0b9e
clean up
terencechain Jun 3, 2019
ac37806
fixing tests
terencechain Jun 3, 2019
351cd6c
fixed all the core tests
terencechain Jun 3, 2019
e3172b8
fixed all the tests!!!
terencechain Jun 3, 2019
4c1306d
lint
terencechain Jun 3, 2019
d844766
comment
terencechain Jun 3, 2019
de858e4
rm'ed commented code
terencechain Jun 3, 2019
b714635
cache size to 1000 should be good enough
terencechain Jun 3, 2019
30231b7
better metrics namings
terencechain Jun 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions beacon-chain/attestation/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,14 @@ func (a *Service) handleAttestation(ctx context.Context, msg proto.Message) erro
// This sets the pool limit, once the old pool is cleared out. It does by using the number of active
// validators per slot as an estimate. The active indices here are not used in the actual processing
// of attestations.
activeIndices := helpers.ActiveValidatorIndices(state, helpers.CurrentEpoch(state))
attPerSlot := len(activeIndices) / int(params.BeaconConfig().SlotsPerEpoch)
count, err := helpers.ActiveValidatorCount(state, helpers.CurrentEpoch(state))
if err != nil {
return err
}
attPerSlot := count / params.BeaconConfig().SlotsPerEpoch
// we only set the limit at 70% of the calculated amount to be safe so that relevant attestations
// arent carried over to the next batch.
a.poolLimit = attPerSlot * 7 / 10
a.poolLimit = int(attPerSlot) * 7 / 10
if a.poolLimit == 0 {
a.poolLimit++
}
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/blockchain/block_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ func (c *ChainService) AdvanceState(
if err != nil {
return beaconState, &BlockFailedProcessingErr{err}
}
// Prune the block cache on every new finalized epoch.
// Prune the block cache and helper caches on every new finalized epoch.
if newState.FinalizedEpoch > finalizedEpoch {
helpers.ClearAllCaches()
c.beaconDB.ClearBlockCache()
}
log.WithField(
Expand Down
75 changes: 75 additions & 0 deletions beacon-chain/blockchain/block_processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/prysmaticlabs/prysm/beacon-chain/attestation"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
v "github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
Expand Down Expand Up @@ -935,3 +936,77 @@ func TestSaveValidatorIdx_IdxNotInState(t *testing.T) {
t.Error("Did not get wanted validator from activation queue")
}
}

func TestNewFinalizedBlock_CanClearCaches(t *testing.T) {
Copy link
Member Author

@terencechain terencechain Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests to verify a new finalized block clears cache

db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
e := params.BeaconConfig().FarFutureEpoch
a := params.BeaconConfig().MaxDepositAmount

// Set up state and block to process epoch to get a new finalized block.
blockRoots := make([][]byte, params.BeaconConfig().SlotsPerEpoch*3+1)
for i := 0; i < len(blockRoots); i++ {
blockRoots[i] = []byte{byte(i)}
}
randaoMixes := make([][]byte, params.BeaconConfig().LatestRandaoMixesLength)
for i := 0; i < len(randaoMixes); i++ {
randaoMixes[i] = params.BeaconConfig().ZeroHash[:]
}
crosslinks := make([]*pb.Crosslink, params.BeaconConfig().ShardCount)
for i := uint64(0); i < params.BeaconConfig().ShardCount; i++ {
crosslinks[i] = &pb.Crosslink{
Epoch: params.BeaconConfig().SlotsPerEpoch,
}
}
s := &pb.BeaconState{
Slot: params.BeaconConfig().SlotsPerEpoch*3 - 1,
PreviousJustifiedEpoch: 0,
PreviousJustifiedRoot: params.BeaconConfig().ZeroHash[:],
CurrentJustifiedEpoch: 1,
CurrentJustifiedRoot: params.BeaconConfig().ZeroHash[:],
JustificationBitfield: 3,
ValidatorRegistry: []*pb.Validator{{ExitEpoch: e}, {ExitEpoch: e}, {ExitEpoch: e}, {ExitEpoch: e}},
Balances: []uint64{a, a, a, a}, // validator total balance should be 128000000000
LatestBlockRoots: blockRoots,
LatestStateRoots: make([][]byte, params.BeaconConfig().SlotsPerHistoricalRoot),
LatestBlockHeader: &pb.BeaconBlockHeader{},
LatestRandaoMixes: randaoMixes,
LatestActiveIndexRoots: make([][]byte, params.BeaconConfig().LatestActiveIndexRootsLength),
CurrentCrosslinks: crosslinks,
LatestSlashedBalances: make([]uint64, params.BeaconConfig().LatestSlashedExitLength),
}

b := &pb.BeaconBlock{
Slot: s.Slot + 1,
//StateRoot: stateRoot[:],
//ParentRoot: parentHash[:],
Body: &pb.BeaconBlockBody{
Eth1Data: &pb.Eth1Data{
DepositRoot: []byte("a"),
BlockRoot: []byte("b"),
},
RandaoReveal: []byte{},
Attestations: nil,
},
}

chainService := setupBeaconChain(t, db, nil)

// Set up cache to make sure they are cleared after a new finalized block.
if _, err := helpers.ActiveValidatorIndices(s, helpers.CurrentEpoch(s)); err != nil {
t.Fatal(err)
}
if len(helpers.ActiveIndicesKeys()) == 0 {
t.Error("Cache is empty")
}

// Advance state get a a new finalized block
if _, err := chainService.AdvanceState(context.Background(), s, b); err != nil {
t.Fatal(err)
}

// Verify the cache is cleared
if len(helpers.ActiveIndicesKeys()) != 0 {
t.Errorf("Finalized epoch did not clear the cache, got %d", len(helpers.ActiveIndicesKeys()))
}
}
6 changes: 5 additions & 1 deletion beacon-chain/blockchain/fork_choice.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,11 @@ func (c *ChainService) isDescendant(currentHead *pb.BeaconBlock, newHead *pb.Bea
// each attestation target consists of validator index and its attestation target (i.e. the block
// which the validator attested to)
func (c *ChainService) AttestationTargets(state *pb.BeaconState) (map[uint64]*pb.AttestationTarget, error) {
indices := helpers.ActiveValidatorIndices(state, helpers.CurrentEpoch(state))
indices, err := helpers.ActiveValidatorIndices(state, helpers.CurrentEpoch(state))
if err != nil {
return nil, err
}

attestationTargets := make(map[uint64]*pb.AttestationTarget)
for i, index := range indices {
target, err := c.attsService.LatestAttestationTarget(state, index)
Expand Down
10 changes: 10 additions & 0 deletions beacon-chain/blockchain/fork_choice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var _ = ForkChoice(&ChainService{})
var endpoint = "ws://127.0.0.1"

func TestApplyForkChoice_SetsCanonicalHead(t *testing.T) {
helpers.ClearAllCaches()

deposits, _ := setupInitialDeposits(t, 5)
beaconState, err := state.GenesisBeaconState(deposits, 0, nil)
if err != nil {
Expand Down Expand Up @@ -220,6 +222,8 @@ func TestVoteCount_IncreaseCountCorrectly(t *testing.T) {
}

func TestAttestationTargets_RetrieveWorks(t *testing.T) {
helpers.ClearAllCaches()

beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
ctx := context.Background()
Expand Down Expand Up @@ -1238,6 +1242,8 @@ func setupBeaconChainBenchmark(b *testing.B, beaconDB *db.BeaconDB) *ChainServic
}

func TestUpdateFFGCheckPts_NewJustifiedSlot(t *testing.T) {
helpers.ClearAllCaches()

genesisSlot := uint64(0)
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
Expand Down Expand Up @@ -1316,6 +1322,8 @@ func TestUpdateFFGCheckPts_NewJustifiedSlot(t *testing.T) {
}

func TestUpdateFFGCheckPts_NewFinalizedSlot(t *testing.T) {
helpers.ClearAllCaches()

genesisSlot := uint64(0)
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
Expand Down Expand Up @@ -1400,6 +1408,8 @@ func TestUpdateFFGCheckPts_NewFinalizedSlot(t *testing.T) {
}

func TestUpdateFFGCheckPts_NewJustifiedSkipSlot(t *testing.T) {
helpers.ClearAllCaches()

genesisSlot := uint64(0)
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
Expand Down
13 changes: 13 additions & 0 deletions beacon-chain/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"active_balance.go",
"active_count.go",
"active_indices.go",
"attestation_data.go",
"block.go",
"common.go",
"seed.go",
"shuffled_indices.go",
"start_shard.go",
"total_balance.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/cache",
visibility = ["//beacon-chain:__subpackages__"],
Expand All @@ -24,15 +30,22 @@ go_test(
name = "go_default_test",
size = "small",
srcs = [
"active_balance_test.go",
"active_count_test.go",
"active_indices_test.go",
"attestation_data_test.go",
"block_test.go",
"seed_test.go",
"shuffled_indices_test.go",
"start_shard_test.go",
"total_balance_test.go",
],
embed = [":go_default_library"],
race = "on",
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"//shared/params:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
],
)
98 changes: 98 additions & 0 deletions beacon-chain/cache/active_balance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package cache

import (
"errors"
"strconv"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/shared/params"
"k8s.io/client-go/tools/cache"
)

var (
// ErrNotActiveBalanceInfo will be returned when a cache object is not a pointer to
// a ActiveBalanceByEpoch struct.
ErrNotActiveBalanceInfo = errors.New("object is not a active balance obj")

// maxActiveBalanceListSize defines the max number of active balance can cache.
maxActiveBalanceListSize = int(params.BeaconConfig().LatestRandaoMixesLength)

// Metrics.
activeBalanceCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
Name: "active_balance_cache_miss",
Help: "The number of active balance requests that aren't present in the cache.",
})
activeBalanceCacheHit = promauto.NewCounter(prometheus.CounterOpts{
Name: "active_balance_cache_hit",
Help: "The number of active balance requests that are present in the cache.",
})
)

// ActiveBalanceByEpoch defines the active validator balance per epoch.
type ActiveBalanceByEpoch struct {
Epoch uint64
ActiveBalance uint64
}

// ActiveBalanceCache is a struct with 1 queue for looking up active balance by epoch.
type ActiveBalanceCache struct {
activeBalanceCache *cache.FIFO
lock sync.RWMutex
}

// activeBalanceKeyFn takes the epoch as the key for the active balance of a given epoch.
func activeBalanceKeyFn(obj interface{}) (string, error) {
tInfo, ok := obj.(*ActiveBalanceByEpoch)
if !ok {
return "", ErrNotActiveBalanceInfo
}

return strconv.Itoa(int(tInfo.Epoch)), nil
}

// NewActiveBalanceCache creates a new active balance cache for storing/accessing active validator balance.
func NewActiveBalanceCache() *ActiveBalanceCache {
return &ActiveBalanceCache{
activeBalanceCache: cache.NewFIFO(activeBalanceKeyFn),
}
}

// ActiveBalanceInEpoch fetches ActiveBalanceByEpoch by epoch. Returns true with a
// reference to the ActiveBalanceInEpoch info, if exists. Otherwise returns false, nil.
func (c *ActiveBalanceCache) ActiveBalanceInEpoch(epoch uint64) (uint64, error) {
c.lock.RLock()
defer c.lock.RUnlock()
obj, exists, err := c.activeBalanceCache.GetByKey(strconv.Itoa(int(epoch)))
if err != nil {
return params.BeaconConfig().FarFutureEpoch, err
}

if exists {
activeBalanceCacheHit.Inc()
} else {
activeBalanceCacheMiss.Inc()
return params.BeaconConfig().FarFutureEpoch, nil
}

tInfo, ok := obj.(*ActiveBalanceByEpoch)
if !ok {
return params.BeaconConfig().FarFutureEpoch, ErrNotActiveBalanceInfo
}

return tInfo.ActiveBalance, nil
}

// AddActiveBalance adds ActiveBalanceByEpoch object to the cache. This method also trims the least
// recently added ActiveBalanceByEpoch object if the cache size has ready the max cache size limit.
func (c *ActiveBalanceCache) AddActiveBalance(activeBalance *ActiveBalanceByEpoch) error {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.activeBalanceCache.AddIfNotPresent(activeBalance); err != nil {
return err
}

trim(c.activeBalanceCache, maxActiveBalanceListSize)
return nil
}
83 changes: 83 additions & 0 deletions beacon-chain/cache/active_balance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cache

import (
"reflect"
"strconv"
"testing"

"github.com/prysmaticlabs/prysm/shared/params"
)

func TestActiveBalanceKeyFn_OK(t *testing.T) {
tInfo := &ActiveBalanceByEpoch{
Epoch: 45,
ActiveBalance: 7456,
}

key, err := activeBalanceKeyFn(tInfo)
if err != nil {
t.Fatal(err)
}
if key != strconv.Itoa(int(tInfo.Epoch)) {
t.Errorf("Incorrect hash key: %s, expected %s", key, strconv.Itoa(int(tInfo.Epoch)))
}
}

func TestActiveBalanceKeyFn_InvalidObj(t *testing.T) {
_, err := activeBalanceKeyFn("bad")
if err != ErrNotActiveBalanceInfo {
t.Errorf("Expected error %v, got %v", ErrNotActiveBalanceInfo, err)
}
}

func TestActiveBalanceCache_ActiveBalanceByEpoch(t *testing.T) {
cache := NewActiveBalanceCache()

tInfo := &ActiveBalanceByEpoch{
Epoch: 16511,
ActiveBalance: 4456547,
}
activeBalance, err := cache.ActiveBalanceInEpoch(tInfo.Epoch)
if err != nil {
t.Fatal(err)
}
if activeBalance != params.BeaconConfig().FarFutureEpoch {
t.Error("Expected active balance not to exist in empty cache")
}

if err := cache.AddActiveBalance(tInfo); err != nil {
t.Fatal(err)
}
activeBalance, err = cache.ActiveBalanceInEpoch(tInfo.Epoch)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(activeBalance, tInfo.ActiveBalance) {
t.Errorf(
"Expected fetched active balance to be %v, got %v",
tInfo.ActiveBalance,
activeBalance,
)
}
}

func TestActiveBalance_MaxSize(t *testing.T) {
cache := NewActiveBalanceCache()

for i := uint64(0); i < params.BeaconConfig().LatestRandaoMixesLength+100; i++ {
tInfo := &ActiveBalanceByEpoch{
Epoch: i,
}
if err := cache.AddActiveBalance(tInfo); err != nil {
t.Fatal(err)
}
}

if len(cache.activeBalanceCache.ListKeys()) != maxActiveBalanceListSize {
t.Errorf(
"Expected hash cache key size to be %d, got %d",
maxActiveBalanceListSize,
len(cache.activeBalanceCache.ListKeys()),
)
}
}
Loading