diff --git a/action/protocol/staking/builder.go b/action/protocol/staking/builder.go index 668df22f1c..ccbf5c86df 100644 --- a/action/protocol/staking/builder.go +++ b/action/protocol/staking/builder.go @@ -8,5 +8,7 @@ type ( BuilderConfig struct { Staking genesis.Staking PersistCandsMapBlock uint64 + CandsMapPatchDir string + CreateStakingPatch bool } ) diff --git a/action/protocol/staking/patchstore.go b/action/protocol/staking/patchstore.go new file mode 100644 index 0000000000..b59322f504 --- /dev/null +++ b/action/protocol/staking/patchstore.go @@ -0,0 +1,117 @@ +// Copyright (c) 2020 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package staking + +import ( + "encoding/csv" + "encoding/hex" + "fmt" + "os" + "path/filepath" + + "github.com/pkg/errors" +) + +const ( + _name = "name" + _operator = "operator" +) + +// PatchStore is the patch store of staking protocol +type PatchStore struct { + dir string +} + +// NewPatchStore creates a new staking patch store +func NewPatchStore(dir string) *PatchStore { + return &PatchStore{dir: dir} +} + +func (store *PatchStore) pathOf(height uint64) string { + return filepath.Join(store.dir, fmt.Sprintf("%d.patch", height)) +} + +func (store *PatchStore) read(reader *csv.Reader) (CandidateList, error) { + record, err := reader.Read() + if err != nil { + return nil, err + } + if len(record) != 1 { + return nil, errors.Errorf("invalid record %+v", record) + } + data, err := hex.DecodeString(record[0]) + if err != nil { + return nil, err + } + var list CandidateList + if err := list.Deserialize(data); err != nil { + return nil, err + } + return list, nil +} + +// Read reads CandidateList by name and CandidateList by operator of given height +func (store *PatchStore) Read(height uint64) (CandidateList, CandidateList, CandidateList, error) { + file, err := os.Open(store.pathOf(height)) + if err != nil { + return nil, nil, nil, err + } + reader := csv.NewReader(file) + reader.FieldsPerRecord = -1 + listByName, err := store.read(reader) + if err != nil { + return nil, nil, nil, err + } + listByOperator, err := store.read(reader) + if err != nil { + return nil, nil, nil, err + } + listByOwner, err := store.read(reader) + if err != nil { + return nil, nil, nil, err + } + return listByName, listByOperator, listByOwner, nil +} + +// Write writes CandidateList by name and CandidateList by operator into store +func (store *PatchStore) Write( + height uint64, + listByName, listByOperator, listByOwner CandidateList, +) (err error) { + if len(listByName) == 0 || len(listByOperator) == 0 || len(listByOwner) == 0 { + return errors.Wrap(ErrNilParameters, "empty candidate list") + } + bytesByName, err := listByName.Serialize() + if err != nil { + return errors.Wrap(err, "failed to serialize candidate list by name") + } + bytesByOperator, err := listByOperator.Serialize() + if err != nil { + return errors.Wrap(err, "failed to serialize candidate list by operator") + } + bytesByOwner, err := listByOwner.Serialize() + if err != nil { + return errors.Wrap(err, "failed to serialize candidate list by owner") + } + records := [][]string{ + {hex.EncodeToString(bytesByName)}, + {hex.EncodeToString(bytesByOperator)}, + {hex.EncodeToString(bytesByOwner)}, + } + file, err := os.Create(store.pathOf(height)) + if err != nil { + return err + } + defer func() { + fileCloseErr := file.Close() + if fileCloseErr != nil && err == nil { + err = fileCloseErr + } + }() + + return csv.NewWriter(file).WriteAll(records) +} diff --git a/action/protocol/staking/patchstore_test.go b/action/protocol/staking/patchstore_test.go new file mode 100644 index 0000000000..4ae1ed3c80 --- /dev/null +++ b/action/protocol/staking/patchstore_test.go @@ -0,0 +1,128 @@ +// Copyright (c) 2019 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package staking + +import ( + "math/big" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/iotexproject/iotex-core/pkg/unit" + "github.com/iotexproject/iotex-core/test/identityset" +) + +func TestInvalidDirectory(t *testing.T) { + require := require.New(t) + dir := filepath.Join(t.TempDir(), "invalid") + _, err := os.Create(dir) + require.NoError(err) + _, _, _, err = NewPatchStore(dir).Read(0) + require.ErrorContains(err, "not a directory") +} + +func TestInvalidDirectory2(t *testing.T) { + require := require.New(t) + dir := t.TempDir() + require.NoError(os.Remove(dir)) + _, err := os.Stat(dir) + require.ErrorIs(err, os.ErrNotExist) + _, _, _, err = NewPatchStore(dir).Read(0) + require.ErrorContains(err, "no such file or directory") +} + +func TestCorruptedData(t *testing.T) { + // TODO: add test for corrupted data +} + +func TestWriteAndRead(t *testing.T) { + require := require.New(t) + dir := t.TempDir() + patch := NewPatchStore(dir) + listByName := CandidateList{ + &Candidate{ + Owner: identityset.Address(1), + Operator: identityset.Address(7), + Reward: identityset.Address(1), + Name: "name0", + Votes: big.NewInt(1), + SelfStakeBucketIdx: 2, + SelfStake: unit.ConvertIotxToRau(1200000), + }, + &Candidate{ + Owner: identityset.Address(3), + Operator: identityset.Address(5), + Reward: identityset.Address(9), + Name: "name1", + Votes: big.NewInt(2), + SelfStakeBucketIdx: 3, + SelfStake: unit.ConvertIotxToRau(200000), + }, + } + listByOperator := CandidateList{ + &Candidate{ + Owner: identityset.Address(8), + Operator: identityset.Address(6), + Reward: identityset.Address(2), + Name: "operator", + Votes: big.NewInt(3), + SelfStakeBucketIdx: 5, + SelfStake: unit.ConvertIotxToRau(1200000), + }, + &Candidate{ + Owner: identityset.Address(3), + Operator: identityset.Address(4), + Reward: identityset.Address(5), + Name: "operator1", + Votes: big.NewInt(4), + SelfStakeBucketIdx: 6, + SelfStake: unit.ConvertIotxToRau(1800000), + }, + } + listByOwner := CandidateList{ + &Candidate{ + Owner: identityset.Address(2), + Operator: identityset.Address(8), + Reward: identityset.Address(6), + Name: "owner", + Votes: big.NewInt(6), + SelfStakeBucketIdx: 9, + SelfStake: unit.ConvertIotxToRau(1100000), + }, + &Candidate{ + Owner: identityset.Address(3), + Operator: identityset.Address(9), + Reward: identityset.Address(7), + Name: "owner1", + Votes: big.NewInt(7), + SelfStakeBucketIdx: 10, + SelfStake: unit.ConvertIotxToRau(2400000), + }, + &Candidate{ + Owner: identityset.Address(7), + Operator: identityset.Address(3), + Reward: identityset.Address(1), + Name: "owner2", + Votes: big.NewInt(8), + SelfStakeBucketIdx: 5, + SelfStake: unit.ConvertIotxToRau(1200000), + }, + } + require.ErrorIs(patch.Write(2, nil, nil, nil), ErrNilParameters) + require.ErrorIs(patch.Write(2, nil, listByOperator, nil), ErrNilParameters) + require.ErrorIs(patch.Write(2, nil, nil, listByOwner), ErrNilParameters) + require.NoError(patch.Write(2, listByName, listByOperator, listByOwner)) + listByNameCopy, listByOperatorCopy, listByOwnerCopy, err := patch.Read(1) + require.ErrorIs(err, os.ErrNotExist) + listByNameCopy, listByOperatorCopy, listByOwnerCopy, err = patch.Read(2) + require.NoError(err) + require.Equal(listByName, listByNameCopy) + require.Equal(listByOperator, listByOperatorCopy) + require.Equal(listByOwner, listByOwnerCopy) +} diff --git a/action/protocol/staking/protocol.go b/action/protocol/staking/protocol.go index 85bf47796d..8e048e5497 100644 --- a/action/protocol/staking/protocol.go +++ b/action/protocol/staking/protocol.go @@ -80,6 +80,7 @@ type ( config Configuration candBucketsIndexer *CandidatesBucketsIndexer voteReviser *VoteReviser + patch *PatchStore } // Configuration is the staking protocol configuration. @@ -90,6 +91,8 @@ type ( MinStakeAmount *big.Int BootstrapCandidates []genesis.BootstrapCandidate PersistCandsMapBlock uint64 + CandsMapPatchDir string + CreateStakingPatch bool } // DepositGas deposits gas to some pool @@ -150,6 +153,8 @@ func NewProtocol(depositGas DepositGas, cfg *BuilderConfig, candBucketsIndexer * MinStakeAmount: minStakeAmount, BootstrapCandidates: cfg.Staking.BootstrapCandidates, PersistCandsMapBlock: cfg.PersistCandsMapBlock, + CandsMapPatchDir: cfg.CandsMapPatchDir, + CreateStakingPatch: cfg.CreateStakingPatch, }, depositGas: depositGas, candBucketsIndexer: candBucketsIndexer, @@ -179,7 +184,10 @@ func (p *Protocol) Start(ctx context.Context, sr protocol.StateReader) (interfac if p.needToReadCandsMap(height) { name, operator, owners, err := readCandCenterStateFromStateDB(sr, height) if err != nil { - return nil, errors.Wrap(err, "failed to read name/operator map") + // stateDB does not have name/operator map yet + if name, operator, owners, err = p.readCandCenterStateFromPatch(height); err != nil { + return nil, errors.Wrap(err, "failed to read name/operator map") + } } if err = c.candCenter.base.loadNameOperatorMapOwnerList(name, operator, owners); err != nil { return nil, errors.Wrap(err, "failed to load name/operator map to cand center") @@ -339,7 +347,16 @@ func (p *Protocol) PreCommit(ctx context.Context, sm protocol.StateManager) erro if len(name) == 0 || len(op) == 0 || len(owners) == 0 { return ErrNilParameters } - return errors.Wrap(p.writeCandCenterStateToStateDB(sm, name, op, owners), "failed to write name/operator map to stateDB") + if err := p.writeCandCenterStateToStateDB(sm, name, op, owners); err != nil { + return errors.Wrap(err, "failed to write name/operator map to stateDB") + } + // write nameMap/operatorMap and ownerList to patch file + if p.config.CreateStakingPatch { + if err := p.writeNameOperatorMapToPatch(height, name, op, owners); err != nil { + return errors.Wrap(err, "failed to write staking patch file") + } + } + return nil } // Commit commits the last change @@ -627,3 +644,17 @@ func (p *Protocol) writeCandCenterStateToStateDB(sm protocol.StateManager, name, _, err := sm.PutState(owners, protocol.NamespaceOption(CandsMapNS), protocol.KeyOption(_ownerKey)) return err } + +func (p *Protocol) writeNameOperatorMapToPatch(height uint64, name, op, owners CandidateList) error { + if p.patch == nil { + p.patch = NewPatchStore(p.config.CandsMapPatchDir) + } + return p.patch.Write(height, name, op, owners) +} + +func (p *Protocol) readCandCenterStateFromPatch(height uint64) (CandidateList, CandidateList, CandidateList, error) { + if p.patch == nil { + p.patch = NewPatchStore(p.config.CandsMapPatchDir) + } + return p.patch.Read(height) +} diff --git a/blockchain/config.go b/blockchain/config.go index 8fe59e45e5..a6e77892e1 100644 --- a/blockchain/config.go +++ b/blockchain/config.go @@ -28,6 +28,7 @@ type ( ChainDBPath string `yaml:"chainDBPath"` TrieDBPatchFile string `yaml:"trieDBPatchFile"` TrieDBPath string `yaml:"trieDBPath"` + CandsMapPatchDir string `yaml:"candsMapPatchDir"` IndexDBPath string `yaml:"indexDBPath"` BloomfilterIndexDBPath string `yaml:"bloomfilterIndexDBPath"` CandidateIndexDBPath string `yaml:"candidateIndexDBPath"` @@ -69,6 +70,8 @@ type ( StreamingBlockBufferSize uint64 `yaml:"streamingBlockBufferSize"` // PersistCandsMapBlock is the block to persist candidates map PersistCandsMapBlock uint64 `yaml:"persistCandsMapBlock"` + // CreateStakingPatch indicates to create patch file or not + CreateStakingPatch bool `yaml:"createStakingPatch"` } ) @@ -78,6 +81,7 @@ var ( ChainDBPath: "/var/data/chain.db", TrieDBPatchFile: "/var/data/trie.db.patch", TrieDBPath: "/var/data/trie.db", + CandsMapPatchDir: "/var/data", IndexDBPath: "/var/data/index.db", BloomfilterIndexDBPath: "/var/data/bloomfilter.index.db", CandidateIndexDBPath: "/var/data/candidate.index.db", diff --git a/chainservice/builder.go b/chainservice/builder.go index 95b0b96121..80544c3aee 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -451,6 +451,8 @@ func (builder *Builder) registerStakingProtocol() error { &staking.BuilderConfig{ Staking: builder.cfg.Genesis.Staking, PersistCandsMapBlock: builder.cfg.Chain.PersistCandsMapBlock, + CandsMapPatchDir: builder.cfg.Chain.CandsMapPatchDir, + CreateStakingPatch: builder.cfg.Chain.CreateStakingPatch, }, builder.cs.candBucketsIndexer, builder.cfg.Genesis.GreenlandBlockHeight,