Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add basic consensus check to support data proxy tally #344

Merged
merged 9 commits into from
Aug 28, 2024
67 changes: 43 additions & 24 deletions x/tally/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (k Keeper) ProcessTallies(ctx sdk.Context) error {
// Loop through the list to apply filter, execute tally, and post
// execution result.
sudoMsgs := make([]types.Sudo, len(tallyList))
vmResults := make([]tallyvm.VmResult, len(tallyList))
tallyResults := make([]TallyResult, len(tallyList))
for i, req := range tallyList {
// Construct barebone sudo message to be posted to the contract
// here and populate its results fields after FilterAndTally.
Expand All @@ -86,30 +86,30 @@ func (k Keeper) ProcessTallies(ctx sdk.Context) error {
},
}

vmRes, consensus, err := k.FilterAndTally(ctx, req)
result, err := k.FilterAndTally(ctx, req)
if err != nil {
// Return with exit code 255 to signify that the tally VM
// was not executed due to the error specified in the result
// field.
sudoMsg.ExitCode = 0xff
sudoMsg.Result.ExitCode = 0xff
sudoMsg.Result.Result = []byte(err.Error())
sudoMsg.Result.Consensus = consensus
sudoMsg.Result.Consensus = result.consensus
} else {
sudoMsg.ExitCode = byte(vmRes.ExitInfo.ExitCode)
sudoMsg.Result.ExitCode = byte(vmRes.ExitInfo.ExitCode)
sudoMsg.Result.Result = vmRes.Result
sudoMsg.Result.Consensus = consensus
sudoMsg.ExitCode = byte(result.exitInfo.ExitCode)
sudoMsg.Result.ExitCode = byte(result.exitInfo.ExitCode)
sudoMsg.Result.Result = result.result
sudoMsg.Result.Consensus = result.consensus
}
k.Logger(ctx).Info(
"completed tally execution",
"request_id", req.ID,
"execution_result", vmRes,
"result", result,
"sudo_message", sudoMsg,
)

sudoMsgs[i] = sudoMsg
vmResults[i] = vmRes
tallyResults[i] = result
}

msg, err := json.Marshal(struct {
Expand Down Expand Up @@ -143,31 +143,43 @@ func (k Keeper) ProcessTallies(ctx sdk.Context) error {
types.EventTypeTallyCompletion,
sdk.NewAttribute(types.AttributeDataRequestID, sudoMsgs[i].ID),
sdk.NewAttribute(types.AttributeTypeConsensus, strconv.FormatBool(sudoMsgs[i].Result.Consensus)),
sdk.NewAttribute(types.AttributeTallyVMStdOut, strings.Join(vmResults[i].Stdout, "\n")),
sdk.NewAttribute(types.AttributeTallyVMStdErr, strings.Join(vmResults[i].Stderr, "\n")),
sdk.NewAttribute(types.AttributeTallyVMStdOut, strings.Join(tallyResults[i].stdout, "\n")),
sdk.NewAttribute(types.AttributeTallyVMStdErr, strings.Join(tallyResults[i].stderr, "\n")),
sdk.NewAttribute(types.AttributeTallyExitCode, fmt.Sprintf("%02x", sudoMsgs[i].ExitCode)),
sdk.NewAttribute(types.AttributeProxyPubKeys, strings.Join(tallyResults[i].proxyPubKeys, "\n")),
),
)
}
return nil
}

type TallyResult struct {
consensus bool
stdout []string
stderr []string
result []byte
exitInfo tallyvm.ExitInfo
proxyPubKeys []string // data proxy pubkeys in basic consensus
}

// FilterAndTally applies filter and executes tally. It returns the
// tally VM result, consensus boolean, and error if applicable.
func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request) (tallyvm.VmResult, bool, error) {
// tally VM result, consensus boolean, consensus data proxy public keys,
// and error if applicable.
func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request) (TallyResult, error) {
var result TallyResult
filter, err := base64.StdEncoding.DecodeString(req.ConsensusFilter)
if err != nil {
return tallyvm.VmResult{}, false, errorsmod.Wrap(err, "failed to decode consensus filter")
return result, errorsmod.Wrap(err, "failed to decode consensus filter")
}
// Convert base64-encoded payback address to hex encoding that
// the tally VM expects.
decodedBytes, err := base64.StdEncoding.DecodeString(req.PaybackAddress)
if err != nil {
return tallyvm.VmResult{}, false, errorsmod.Wrap(err, "failed to decode payback address")
return result, errorsmod.Wrap(err, "failed to decode payback address")
}
paybackAddrHex := hex.EncodeToString(decodedBytes)

// Sort reveals.
// Sort reveals and proxy public keys.
keys := make([]string, len(req.Reveals))
i := 0
for k := range req.Reveals {
Expand All @@ -178,38 +190,40 @@ func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request) (tallyvm.VmRe
reveals := make([]types.RevealBody, len(req.Reveals))
for i, k := range keys {
reveals[i] = req.Reveals[k]
sort.Strings(reveals[i].ProxyPubKeys)
}

outliers, consensus, err := ApplyFilter(filter, reveals)
var outliers []int
outliers, result.consensus, result.proxyPubKeys, err = ApplyFilter(filter, reveals)
if err != nil {
return tallyvm.VmResult{}, false, errorsmod.Wrap(err, "error while applying filter")
return result, errorsmod.Wrap(err, "error while applying filter")
}

tallyWasm, err := k.wasmStorageKeeper.GetDataRequestWasm(ctx, req.TallyBinaryID)
if err != nil {
return tallyvm.VmResult{}, false, err
return result, err
}
tallyInputs, err := base64.StdEncoding.DecodeString(req.TallyInputs)
if err != nil {
return tallyvm.VmResult{}, false, errorsmod.Wrap(err, "failed to decode tally inputs")
return result, errorsmod.Wrap(err, "failed to decode tally inputs")
}

args, err := tallyVMArg(tallyInputs, reveals, outliers)
if err != nil {
return tallyvm.VmResult{}, false, errorsmod.Wrap(err, "failed to construct tally VM arguments")
return result, errorsmod.Wrap(err, "failed to construct tally VM arguments")
}

k.Logger(ctx).Info(
"executing tally VM",
"request_id", req.ID,
"tally_wasm_hash", req.TallyBinaryID,
"consensus", consensus,
"consensus", result.consensus,
"arguments", args,
)

vmRes := tallyvm.ExecuteTallyVm(tallyWasm.Bytecode, args, map[string]string{
"VM_MODE": "tally",
"CONSENSUS": fmt.Sprintf("%v", consensus),
"CONSENSUS": fmt.Sprintf("%v", result.consensus),
"DR_ID": req.ID,
"DR_INPUT": req.DrInputs,
"BINARY_ID": req.DrBinaryID,
Expand All @@ -220,7 +234,12 @@ func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request) (tallyvm.VmRe
"DR_PAYBACK_ADDRESS": paybackAddrHex,
"BLOCK_HEIGHT": fmt.Sprintf("%d", ctx.BlockHeight()),
})
return vmRes, consensus, nil
result.stdout = vmRes.Stdout
result.stderr = vmRes.Stderr
result.result = vmRes.Result
result.exitInfo = vmRes.ExitInfo

return result, nil
}

func tallyVMArg(inputArgs []byte, reveals []types.RevealBody, outliers []int) ([]string, error) {
Expand Down
39 changes: 30 additions & 9 deletions x/tally/keeper/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"errors"
"fmt"

"github.com/sedaprotocol/seda-chain/x/tally/types"
)
Expand All @@ -15,10 +16,30 @@ const (
// ApplyFilter processes filter of the type specified in the first
// byte of consensus filter. It returns an outlier list, which is
// a boolean list where true at index i means that the reveal at
// index i is an outlier, consensus boolean, and error.
func ApplyFilter(input []byte, reveals []types.RevealBody) ([]int, bool, error) {
// index i is an outlier, consensus boolean, consensus data proxy
// public keys, and error. It assumes that the reveals and their
// proxy public keys are sorted.
func ApplyFilter(input []byte, reveals []types.RevealBody) ([]int, bool, []string, error) {
if len(input) == 0 {
return make([]int, len(reveals)), false, types.ErrInvalidFilterType
return make([]int, len(reveals)), false, nil, types.ErrInvalidFilterType
}

// Determine basic consensus on tuple of (exit_code, proxy_pub_keys)
var maxFreq int
var proxyPubKeys []string
freq := make(map[string]int, len(reveals))
for _, reveal := range reveals {
success := reveal.ExitCode == 0
tuple := fmt.Sprintf("%v%v", success, reveal.ProxyPubKeys)
freq[tuple]++

if freq[tuple] > maxFreq {
proxyPubKeys = reveal.ProxyPubKeys
maxFreq = freq[tuple]
}
}
if maxFreq*3 < len(reveals)*2 {
return make([]int, len(reveals)), false, nil, types.ErrNoBasicConsensus
}

var filter types.Filter
Expand All @@ -31,21 +52,21 @@ func ApplyFilter(input []byte, reveals []types.RevealBody) ([]int, bool, error)
case filterTypeStdDev:
filter, err = types.NewFilterStdDev(input)
default:
return make([]int, len(reveals)), false, types.ErrInvalidFilterType
return make([]int, len(reveals)), false, proxyPubKeys, types.ErrInvalidFilterType
}
if err != nil {
return make([]int, len(reveals)), false, err
return make([]int, len(reveals)), false, proxyPubKeys, err
}

outliers, err := filter.ApplyFilter(reveals)
switch {
case err == nil:
return outliers, true, nil
return outliers, true, proxyPubKeys, nil
case errors.Is(err, types.ErrNoConsensus):
return outliers, false, nil
return outliers, false, proxyPubKeys, nil
case errors.Is(err, types.ErrCorruptReveals):
return make([]int, len(reveals)), false, err
return make([]int, len(reveals)), false, proxyPubKeys, err
default:
return make([]int, len(reveals)), false, err
return make([]int, len(reveals)), false, proxyPubKeys, err
}
}
2 changes: 1 addition & 1 deletion x/tally/keeper/filter_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func FuzzStdDevFilter(f *testing.F) {
filter, err := hex.DecodeString(filterHex)
require.NoError(t, err)

outliers, _, err := keeper.ApplyFilter(filter, reveals)
outliers, _, _, err := keeper.ApplyFilter(filter, reveals)
require.Equal(t, expOutliers, outliers)
require.ErrorIs(t, err, nil)
})
Expand Down
Loading
Loading