Skip to content

Commit

Permalink
Problem: no command to patch the duplicated tx situation (#522)
Browse files Browse the repository at this point in the history
* Problem: no command to patch the duplicated tx situation

Closes: #521
Solution:
- add reindex-duplicated-tx command

* reuse tmDB from fix-unlucky-tx

* Update cmd/cronosd/cmd/reindex-duplicated-tx.go

* Revert "Update cmd/cronosd/cmd/reindex-duplicated-tx.go"

This reverts commit e605d3c.

* fix lint

* Update cmd/cronosd/cmd/reindex-duplicated-tx.go
  • Loading branch information
yihuang authored Jun 6, 2022
1 parent 5de290b commit 5c66cd8
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

- [cronos#489](https://github.com/crypto-org-chain/cronos/pull/489) Enable jemalloc memory allocator, and update rocksdb src to `v6.29.5`.
- [#513](https://github.com/crypto-org-chain/cronos/pull/513) Add `fix-unlucky-tx` command to patch txs post v0.7.0 upgrade.
- [cronos#522](https://github.com/crypto-org-chain/cronos/pull/522) Add `reindex-duplicated-tx` command to handle the tendermint tx duplicated issue.

*May 3, 2022*

Expand Down
186 changes: 186 additions & 0 deletions cmd/cronosd/cmd/reindex-duplicated-tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package cmd

import (
"bufio"
"context"
"fmt"
"os"
"runtime"
"strconv"
"sync"

"github.com/spf13/cobra"

"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/server"
abci "github.com/tendermint/tendermint/abci/types"
)

const (
FlagPrintTxs = "print-txs"
FlagBlocksFile = "blocks-file"
FlagStartBlock = "start-block"
FlagEndBlock = "end-block"
FlagConcurrency = "concurrency"
)

// ReindexDuplicatedTx update the tx execution result of false-failed tx in tendermint db
func ReindexDuplicatedTx() *cobra.Command {
cmd := &cobra.Command{
Use: "reindex-duplicated-tx",
Short: "Reindex tx that suffer from tendermint duplicated tx issue, it can handle both v0.6.x and v0.7.x blocks.",
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := server.GetServerContextFromCmd(cmd)

chainID, err := cmd.Flags().GetString(flags.FlagChainID)
if err != nil {
return err
}
printTxs, err := cmd.Flags().GetBool(FlagPrintTxs)
if err != nil {
return err
}

tmDB, err := openTMDB(ctx.Config, chainID)
if err != nil {
return err
}

// iterate and patch a single block
processBlock := func(height int64) error {
blockResult, err := tmDB.stateStore.LoadABCIResponses(height)
if err != nil {
return err
}
block := tmDB.blockStore.LoadBlock(height)
if block == nil {
return fmt.Errorf("block not found: %d", height)
}

for txIndex, txResult := range blockResult.DeliverTxs {
tx := block.Txs[txIndex]
txHash := tx.Hash()
indexed, err := tmDB.txIndexer.Get(txHash)
if err != nil {
return err
}
if txResult.Code == 0 && txResult.Code != indexed.Result.Code {
if printTxs {
fmt.Println(height, txIndex)
continue
}
// a success tx but indexed wrong, reindex the tx
result := &abci.TxResult{
Height: height,
Index: uint32(txIndex),
Tx: tx,
Result: *txResult,
}

if err := tmDB.txIndexer.Index(result); err != nil {
return err
}
}
}

return nil
}

concurrency, err := cmd.Flags().GetInt(FlagConcurrency)
if err != nil {
return err
}

blockChan := make(chan int64, concurrency)
var wg sync.WaitGroup
ctCtx, cancel := context.WithCancel(context.Background())
defer cancel()

for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctCtx.Done():
return
case blockNum, ok := <-blockChan:
if !ok {
return
}

if err := processBlock(blockNum); err != nil {
fmt.Fprintf(os.Stderr, "process block failed: %d, %+v", blockNum, err)
cancel()
return
}
}
}
}(&wg)
}

blocksFile, err := cmd.Flags().GetString(FlagBlocksFile)
if err != nil {
return err
}
findBlock := func() error {
if len(blocksFile) > 0 {
// read block numbers from file, one number per line
file, err := os.Open(blocksFile)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
blockNumber, err := strconv.ParseInt(scanner.Text(), 10, 64)
if err != nil {
return err
}
blockChan <- blockNumber
}
} else {
startHeight, err := cmd.Flags().GetInt(FlagStartBlock)
if err != nil {
return err
}
endHeight, err := cmd.Flags().GetInt(FlagEndBlock)
if err != nil {
return err
}
if startHeight < 1 {
return fmt.Errorf("invalid start-block: %d", startHeight)
}
if endHeight < startHeight {
return fmt.Errorf("invalid end-block %d, smaller than start-block", endHeight)
}

for height := startHeight; height <= endHeight; height++ {
blockChan <- int64(height)
}
}
return nil
}

go func() {
if err := findBlock(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
close(blockChan)
}()

wg.Wait()

return ctCtx.Err()
},
}
cmd.Flags().String(flags.FlagChainID, "cronosmainnet_25-1", "network chain ID, only useful for psql tx indexer backend")
cmd.Flags().Bool(FlagPrintTxs, false, "Print the block number and tx indexes of the duplicated txs without patch")
cmd.Flags().String(FlagBlocksFile, "", "Read block numbers from a file instead of iterating all the blocks")
cmd.Flags().Int(FlagStartBlock, 1, "The start of the block range to iterate, inclusive")
cmd.Flags().Int(FlagEndBlock, -1, "The end of the block range to iterate, inclusive")
cmd.Flags().Int(FlagConcurrency, runtime.NumCPU(), "Define how many workers run in concurrency")

return cmd
}
1 change: 1 addition & 0 deletions cmd/cronosd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) {
txCommand(),
ethermintclient.KeyCommands(app.DefaultNodeHome),
FixUnluckyTxCmd(),
ReindexDuplicatedTx(),
)

// add rosetta
Expand Down

0 comments on commit 5c66cd8

Please sign in to comment.