Skip to content

Commit

Permalink
feat: concurrent checkTx (#141)
Browse files Browse the repository at this point in the history
* chore: bump up ostracon, iavl and tm-db

* feat: concurrent checkTx (#49)

* feat: implement new abci, `BeginRecheckTx()` and `EndRecheckTx()`

* test: fix tests

* refactor: decompose checkTx & runTx

* chore: protect app.checkState w/ RWMutex for simulate

* chore: remove unused var

* feat: account lock decorator

* chore: skip AccountLockDecorator if not checkTx

* chore: bump up tendermint

* chore: revise accountlock position

* chore: accountlock_test

* chore: revise accountlock covers `cache.Write()`

* chore: revise `sampleBytes` to `2`

* fix: test according to `sampleBytes`

* chore: revise `getUniqSortedAddressKey()` and add `getAddressKey()`

* chore: revise `how to sort` not to use `reflection`

* chore: bump up tendermint

* test: check `sorted` in `TestGetUniqSortedAddressKey()`

* chore: move `accountLock` from `anteTx()` to `checkTx()`
# Conflicts:
#	baseapp/abci.go
#	baseapp/baseapp.go
#	baseapp/baseapp_test.go
#	baseapp/helpers.go
#	go.mod
#	go.sum
#	x/bank/bench_test.go
#	x/mock/test_utils.go

* fix: make it buildable

* fix: tests

* fix: gasWanted & gasUsed are always `0` (#51)

* fix: gasWanted & gasUsed is always `0`

* chore: error log for general panic
# Conflicts:
#	baseapp/baseapp.go
  • Loading branch information
jinsan-line authored Apr 22, 2021
1 parent a70834a commit a25ca2f
Show file tree
Hide file tree
Showing 13 changed files with 394 additions and 151 deletions.
53 changes: 28 additions & 25 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,33 +213,38 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
defer telemetry.MeasureSince(time.Now(), "abci", "check_tx")

var mode runTxMode

switch {
case req.Type == abci.CheckTxType_New:
mode = runTxModeCheck

case req.Type == abci.CheckTxType_Recheck:
mode = runTxModeReCheck
tx, err := app.txDecoder(req.Tx)
if err != nil {
return sdkerrors.ResponseCheckTx(err, 0, 0, app.trace)
}

default:
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

gInfo, result, err := app.runTx(mode, req.Tx)
gInfo, err := app.checkTx(req.Tx, tx, req.Type == abci.CheckTxType_Recheck)
if err != nil {
return sdkerrors.ResponseCheckTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
}

return abci.ResponseCheckTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}
}

// BeginRecheckTx implements the ABCI interface and set the check state based on the given header
func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx {
// NOTE: This is safe because Ostracon holds a lock on the mempool for Rechecking.
app.setCheckState(req.Header)
return abci.ResponseBeginRecheckTx{Code: abci.CodeTypeOK}
}

// EndRecheckTx implements the ABCI interface.
func (app *BaseApp) EndRecheckTx(req abci.RequestEndRecheckTx) abci.ResponseEndRecheckTx {
return abci.ResponseEndRecheckTx{Code: abci.CodeTypeOK}
}

// DeliverTx implements the ABCI interface and executes a tx in DeliverTx mode.
// State only gets persisted if all messages are valid and get executed successfully.
// Otherwise, the ResponseDeliverTx will contain releveant error information.
Expand All @@ -258,7 +263,12 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx)
tx, err := app.txDecoder(req.Tx)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
}

gInfo, result, err := app.runTx(req.Tx, tx, false)
if err != nil {
resultStr = "failed"
return sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
Expand All @@ -275,11 +285,10 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx

// Commit implements the ABCI interface. It will commit all state that exists in
// the deliver state's multi-store and includes the resulting commit ID in the
// returned abci.ResponseCommit. Commit will set the check state based on the
// latest header and reset the deliver state. Also, if a non-zero halt height is
// defined in config, Commit will execute a deferred function call to check
// against that height and gracefully halt if it matches the latest committed
// height.
// returned abci.ResponseCommit. Commit will reset the deliver state.
// Also, if a non-zero halt height is defined in config, Commit will execute
// a deferred function call to check against that height and gracefully halt if
// it matches the latest committed height.
func (app *BaseApp) Commit() (res abci.ResponseCommit) {
defer telemetry.MeasureSince(time.Now(), "abci", "commit")

Expand All @@ -293,12 +302,6 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
commitID := app.cms.Commit()
app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))

// Reset the Check state to the latest committed.
//
// NOTE: This is safe because Tendermint holds a lock on the mempool for
// Commit. Use the header from this latest block.
app.setCheckState(header)

// empty/reset the deliver state
app.deliverState = nil

Expand Down
88 changes: 88 additions & 0 deletions baseapp/accountlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package baseapp

import (
"encoding/binary"
"sort"
"sync"

sdk "github.com/line/lbm-sdk/v2/types"
)

// NOTE should 1 <= sampleBytes <= 4. If modify it, you should revise `getAddressKey()` as well
const sampleBytes = 2

type AccountLock struct {
accMtx [1 << (sampleBytes * 8)]sync.Mutex
}

func (al *AccountLock) Lock(ctx sdk.Context, tx sdk.Tx) []uint32 {
if !ctx.IsCheckTx() || ctx.IsReCheckTx() {
return nil
}

signers := getSigners(tx)
accKeys := getUniqSortedAddressKey(signers)

for _, key := range accKeys {
al.accMtx[key].Lock()
}

return accKeys
}

func (al *AccountLock) Unlock(accKeys []uint32) {
// NOTE reverse order
for i, length := 0, len(accKeys); i < length; i++ {
key := accKeys[length-1-i]
al.accMtx[key].Unlock()
}
}

func getSigners(tx sdk.Tx) []sdk.AccAddress {
seen := map[string]bool{}
var signers []sdk.AccAddress
for _, msg := range tx.GetMsgs() {
for _, addr := range msg.GetSigners() {
if !seen[addr.String()] {
signers = append(signers, addr)
seen[addr.String()] = true
}
}
}
return signers
}

func getUniqSortedAddressKey(addrs []sdk.AccAddress) []uint32 {
accKeys := make([]uint32, 0, len(addrs))
for _, addr := range addrs {
accKeys = append(accKeys, getAddressKey(addr))
}

accKeys = uniq(accKeys)
sort.Sort(uint32Slice(accKeys))

return accKeys
}

func getAddressKey(addr sdk.AccAddress) uint32 {
return uint32(binary.BigEndian.Uint16(addr))
}

func uniq(u []uint32) []uint32 {
seen := map[uint32]bool{}
var ret []uint32
for _, v := range u {
if !seen[v] {
ret = append(ret, v)
seen[v] = true
}
}
return ret
}

// Uint32Slice attaches the methods of Interface to []uint32, sorting in increasing order.
type uint32Slice []uint32

func (p uint32Slice) Len() int { return len(p) }
func (p uint32Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
119 changes: 119 additions & 0 deletions baseapp/accountlock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package baseapp

import (
"reflect"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/require"

ostproto "github.com/line/ostracon/proto/ostracon/types"

"github.com/line/lbm-sdk/v2/crypto/keys/secp256k1"
"github.com/line/lbm-sdk/v2/testutil/testdata"
sdk "github.com/line/lbm-sdk/v2/types"
)

func TestAccountLock(t *testing.T) {
app := setupBaseApp(t)
ctx := app.NewContext(true, ostproto.Header{})

privs := newTestPrivKeys(3)
tx := newTestTx(privs)

accKeys := app.accountLock.Lock(ctx, tx)

for _, accKey := range accKeys {
require.True(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}

app.accountLock.Unlock(accKeys)

for _, accKey := range accKeys {
require.False(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}
}

func TestUnlockDoNothingWithNil(t *testing.T) {
app := setupBaseApp(t)
require.NotPanics(t, func() { app.accountLock.Unlock(nil) })
}

func TestGetSigner(t *testing.T) {
privs := newTestPrivKeys(3)
tx := newTestTx(privs)
signers := getSigners(tx)

require.Equal(t, getAddrs(privs), signers)
}

func TestGetUniqSortedAddressKey(t *testing.T) {
privs := newTestPrivKeys(3)

addrs := getAddrs(privs)
addrs = append(addrs, addrs[1], addrs[0])
require.Equal(t, 5, len(addrs))

accKeys := getUniqSortedAddressKey(addrs)

// length should be reduced because `duplicated` is removed
require.Less(t, len(accKeys), len(addrs))

// check uniqueness
for i, iv := range accKeys {
for j, jv := range accKeys {
if i != j {
require.True(t, iv != jv)
}
}
}

// should be sorted
require.True(t, sort.IsSorted(uint32Slice(accKeys)))
}

type AccountLockTestTx struct {
Msgs []sdk.Msg
}

var _ sdk.Tx = AccountLockTestTx{}

func (tx AccountLockTestTx) GetMsgs() []sdk.Msg {
return tx.Msgs
}

func (tx AccountLockTestTx) ValidateBasic() error {
return nil
}

func newTestPrivKeys(num int) []*secp256k1.PrivKey {
privs := make([]*secp256k1.PrivKey, 0, num)
for i := 0; i < num; i++ {
privs = append(privs, secp256k1.GenPrivKey())
}
return privs
}

func getAddrs(privs []*secp256k1.PrivKey) []sdk.AccAddress {
addrs := make([]sdk.AccAddress, 0, len(privs))
for _, priv := range privs {
addrs = append(addrs, sdk.AccAddress(priv.PubKey().Address()))
}
return addrs
}

func newTestTx(privs []*secp256k1.PrivKey) sdk.Tx {
addrs := getAddrs(privs)
msgs := make([]sdk.Msg, len(addrs))
for i, addr := range addrs {
msgs[i] = testdata.NewTestMsg(addr)
}
return AccountLockTestTx{Msgs: msgs}
}

// Hack (too slow)
func isMutexLock(mtx *sync.Mutex) bool {
state := reflect.ValueOf(mtx).Elem().FieldByName("state")
return state.Int() == 1
}
Loading

0 comments on commit a25ca2f

Please sign in to comment.