diff --git a/.golangci.yml b/.golangci.yml index b82a41075..3f634b1f6 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -13,20 +13,16 @@ linters-settings: linters: disable-all: true enable: - - deadcode #Default linter - errcheck #Default linter - gosimple #Default linter - govet #Default linter - ineffassign #Default linter - staticcheck #Default linter - - structcheck #Default linter - typecheck #Default linter - unused #Default linter - - varcheck #Default linter - gofmt - gci - goimports run: timeout: 5m - go: '1.18' diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 400512aac..d01492e20 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,6 +14,6 @@ repos: - id: check-added-large-files - id: check-merge-conflict - repo: https://github.com/golangci/golangci-lint - rev: v1.46.2 + rev: v1.51.0 hooks: - id: golangci-lint diff --git a/Taskfile.yaml b/Taskfile.yaml index 70531a851..b4971446d 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -8,6 +8,7 @@ vars: TIMEOUT: "10m" RUN: ".*" TAGS: "-tags json1,netgo" + BENCH_TIME: "30s" BENCH_RESULTS_DIR: "/tmp/benchmarks" BENCH_RESULTS_FILE: "/tmp/benchmarks/ledger.txt" BENCH_CPU_PROFILE: "/tmp/benchmarks/ledger.cpu.prof" @@ -26,7 +27,9 @@ tasks: tests: cmds: - - go test {{.TAGS}} {{if eq .VERBOSE "true"}}-v{{end}} -coverpkg {{.PKG}} -coverprofile coverage.out -covermode atomic {{.PKG}} + - > + go test {{.TAGS}} {{if eq .VERBOSE "true"}}-v{{end}} -coverpkg {{.PKG}} + -coverprofile coverage.out -covermode atomic {{.PKG}} tests:local: cmds: @@ -73,22 +76,8 @@ tasks: cmds: - mkdir -p {{.BENCH_RESULTS_DIR}} - > - go test {{.TAGS}} ./pkg/storage/sqlstorage - -run=XXX -bench={{.RUN}} -benchmem -timeout 1h - -cpuprofile {{.BENCH_CPU_PROFILE}} -memprofile {{.BENCH_MEM_PROFILE}} - | tee {{.BENCH_RESULTS_FILE}} - - benchstat {{.BENCH_RESULTS_FILE}} - env: - NUMARY_STORAGE_DRIVER: "postgres" - NUMARY_STORAGE_POSTGRES_CONN_STRING: "postgresql://ledger:ledger@127.0.0.1/ledger" - - bench:ledger: - deps: [postgres] - cmds: - - mkdir -p {{.BENCH_RESULTS_DIR}} - - > - go test {{.TAGS}} ./pkg/ledger - -run=XXX -bench=BenchmarkLedger_{{.RUN}} -benchmem -benchtime=30s -timeout 1h + go test {{.TAGS}} {{if eq .VERBOSE "true"}}-v{{end}} {{.PKG}} + -run=XXX -bench={{.RUN}} -benchmem -benchtime={{.BENCH_TIME}} -timeout {{.TIMEOUT}} -cpuprofile {{.BENCH_CPU_PROFILE}} -memprofile {{.BENCH_MEM_PROFILE}} | tee {{.BENCH_RESULTS_FILE}} - benchstat {{.BENCH_RESULTS_FILE}} @@ -107,7 +96,6 @@ tasks: install: deps: - install:golangci-lint - - install:cov-report - install:perf install:golangci-lint: @@ -117,10 +105,6 @@ tasks: sh -s -- -b $(go env GOPATH)/bin latest - golangci-lint --version - install:cov-report: - cmds: - - go install github.com/go-phorce/cov-report/cmd/cov-report - install:perf: - go install golang.org/x/perf/cmd/...@latest diff --git a/go.sum b/go.sum old mode 100755 new mode 100644 diff --git a/pkg/api/controllers/script_controller.go b/pkg/api/controllers/script_controller.go index 2bdf1a1ad..687cc7c74 100644 --- a/pkg/api/controllers/script_controller.go +++ b/pkg/api/controllers/script_controller.go @@ -35,7 +35,7 @@ func (ctl *ScriptController) PostScript(c *gin.Context) { preview := ok && (strings.ToUpper(value) == "YES" || strings.ToUpper(value) == "TRUE" || value == "1") res := ScriptResponse{} - execRes, err := l.(*ledger.Ledger).Execute(c.Request.Context(), false, preview, script) + execRes, err := l.(*ledger.Ledger).ExecuteScript(c.Request.Context(), preview, script) if err != nil { var ( code = apierrors.ErrInternal @@ -62,9 +62,7 @@ func (ctl *ScriptController) PostScript(c *gin.Context) { res.Details = apierrors.EncodeLink(message) } } - if len(execRes) > 0 { - res.Transaction = &execRes[0] - } + res.Transaction = &execRes c.JSON(http.StatusOK, res) } diff --git a/pkg/api/controllers/transaction_controller.go b/pkg/api/controllers/transaction_controller.go index 4ba71189a..44e2d565a 100644 --- a/pkg/api/controllers/transaction_controller.go +++ b/pkg/api/controllers/transaction_controller.go @@ -256,9 +256,6 @@ func (ctl *TransactionController) PostTransaction(c *gin.Context) { return } - var res []core.ExpandedTransaction - var err error - if len(payload.Postings) > 0 && payload.Script.Plain != "" || len(payload.Postings) == 0 && payload.Script.Plain == "" { apierrors.ResponseError(c, ledger.NewValidationError( @@ -276,24 +273,29 @@ func (ctl *TransactionController) PostTransaction(c *gin.Context) { Reference: payload.Reference, Metadata: payload.Metadata, } - res, err = l.(*ledger.Ledger).Execute(c.Request.Context(), - true, preview, core.TxsToScriptsData(txData)...) - } else { - script := core.ScriptData{ - Script: payload.Script, - Timestamp: payload.Timestamp, - Reference: payload.Reference, - Metadata: payload.Metadata, + res, err := l.(*ledger.Ledger).ExecuteTxsData(c.Request.Context(), preview, txData) + if err != nil { + apierrors.ResponseError(c, err) + return } - res, err = l.(*ledger.Ledger).Execute(c.Request.Context(), - false, preview, script) + + respondWithData[[]core.ExpandedTransaction](c, http.StatusOK, res) + return } + + script := core.ScriptData{ + Script: payload.Script, + Timestamp: payload.Timestamp, + Reference: payload.Reference, + Metadata: payload.Metadata, + } + res, err := l.(*ledger.Ledger).ExecuteScript(c.Request.Context(), preview, script) if err != nil { apierrors.ResponseError(c, err) return } - respondWithData[[]core.ExpandedTransaction](c, http.StatusOK, res) + respondWithData[[]core.ExpandedTransaction](c, http.StatusOK, []core.ExpandedTransaction{res}) } func (ctl *TransactionController) GetTransaction(c *gin.Context) { @@ -389,8 +391,7 @@ func (ctl *TransactionController) PostTransactionsBatch(c *gin.Context) { } } - res, err := l.(*ledger.Ledger).Execute(c.Request.Context(), true, false, - core.TxsToScriptsData(txs.Transactions...)...) + res, err := l.(*ledger.Ledger).ExecuteTxsData(c.Request.Context(), false, txs.Transactions...) if err != nil { apierrors.ResponseError(c, err) return diff --git a/pkg/api/controllers/transaction_controller_test.go b/pkg/api/controllers/transaction_controller_test.go index 9e04a1841..ecbe9b9da 100644 --- a/pkg/api/controllers/transaction_controller_test.go +++ b/pkg/api/controllers/transaction_controller_test.go @@ -301,10 +301,9 @@ func TestPostTransactions(t *testing.T) { expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ ErrorCode: apierrors.ErrInsufficientFund, - ErrorMessage: "[INSUFFICIENT_FUND] account had insufficient funds", - Details: apierrors.EncodeLink("account had insufficient funds"), + ErrorMessage: "balance.insufficient.TOK", ErrorCodeDeprecated: apierrors.ErrInsufficientFund, - ErrorMessageDeprecated: "[INSUFFICIENT_FUND] account had insufficient funds", + ErrorMessageDeprecated: "balance.insufficient.TOK", }, }, { @@ -593,11 +592,95 @@ func TestPostTransactions(t *testing.T) { ErrorMessageDeprecated: "cannot pass a timestamp prior to the last transaction:", }, }, + { + name: "mapping with postings", + payload: []controllers.PostTransaction{ + { + Postings: core.Postings{ + { + Source: "negativebalances:bar", + Destination: "world", + Amount: core.NewMonetaryInt(1000), + Asset: "TOK", + }, + }, + Timestamp: timestamp3, + }, + }, + expectedStatusCode: http.StatusOK, + expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ + Data: &[]core.ExpandedTransaction{{ + Transaction: core.Transaction{ + TransactionData: core.TransactionData{ + Postings: core.Postings{ + { + Source: "negativebalances:bar", + Destination: "world", + Amount: core.NewMonetaryInt(1000), + Asset: "TOK", + }, + }, + Timestamp: timestamp3, + }, + }, + }}, + }, + }, + { + name: "short asset", + payload: []controllers.PostTransaction{ + { + Postings: core.Postings{ + { + Source: "world", + Destination: "bank", + Amount: core.NewMonetaryInt(1000), + Asset: "F/9", + }, + }, + Timestamp: timestamp3, + }, + }, + expectedStatusCode: http.StatusOK, + expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ + Data: &[]core.ExpandedTransaction{{ + Transaction: core.Transaction{ + TransactionData: core.TransactionData{ + Postings: core.Postings{ + { + Source: "world", + Destination: "bank", + Amount: core.NewMonetaryInt(1000), + Asset: "F/9", + }, + }, + Timestamp: timestamp3, + }, + }, + }}, + }, + }, } internal.RunTest(t, fx.Invoke(func(lc fx.Lifecycle, api *api.API) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { + internal.SaveMapping(t, api, core.Mapping{ + Contracts: []core.Contract{{ + Name: "negative balances", + Account: "negativebalances:*", + Expr: core.ExprOr{ + &core.ExprGte{ + Op1: core.VariableExpr{Name: "balance"}, + Op2: core.ConstantExpr{Value: 0}, + }, + &core.ExprLte{ + Op1: core.VariableExpr{Name: "balance"}, + Op2: core.ConstantExpr{Value: 0}, + }, + }, + }}, + }) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { for i := 0; i < len(tc.payload)-1; i++ { @@ -2003,10 +2086,9 @@ func TestPostTransactionsBatch(t *testing.T) { internal.Decode(t, rsp.Body, &err) require.EqualValues(t, sharedapi.ErrorResponse{ ErrorCode: apierrors.ErrInsufficientFund, - ErrorMessage: "[INSUFFICIENT_FUND] account had insufficient funds", - Details: apierrors.EncodeLink("account had insufficient funds"), + ErrorMessage: "balance.insufficient.COIN", ErrorCodeDeprecated: apierrors.ErrInsufficientFund, - ErrorMessageDeprecated: "[INSUFFICIENT_FUND] account had insufficient funds", + ErrorMessageDeprecated: "balance.insufficient.COIN", }, err) }) @@ -2053,10 +2135,9 @@ func TestPostTransactionsBatch(t *testing.T) { internal.Decode(t, rsp.Body, &err) require.EqualValues(t, sharedapi.ErrorResponse{ ErrorCode: apierrors.ErrInsufficientFund, - ErrorMessage: "[INSUFFICIENT_FUND] account had insufficient funds", - Details: apierrors.EncodeLink("account had insufficient funds"), + ErrorMessage: "balance.insufficient.GEM", ErrorCodeDeprecated: apierrors.ErrInsufficientFund, - ErrorMessageDeprecated: "[INSUFFICIENT_FUND] account had insufficient funds", + ErrorMessageDeprecated: "balance.insufficient.GEM", }, err) }) diff --git a/pkg/core/asset.go b/pkg/core/asset.go index 370c3c05d..e28f03aa1 100644 --- a/pkg/core/asset.go +++ b/pkg/core/asset.go @@ -4,7 +4,7 @@ import ( "regexp" ) -var assetRegexp = regexp.MustCompile(`^[A-Z][A-Z0-9]{1,16}(\/\d{1,6})?$`) +var assetRegexp = regexp.MustCompile(`^[A-Z][A-Z0-9]{0,16}(\/\d{1,6})?$`) func AssetIsValid(v string) bool { return assetRegexp.Match([]byte(v)) diff --git a/pkg/core/numscript.go b/pkg/core/numscript.go deleted file mode 100644 index 1a422d567..000000000 --- a/pkg/core/numscript.go +++ /dev/null @@ -1,120 +0,0 @@ -package core - -import ( - "encoding/json" - "fmt" - "sort" - "strings" -) - -type variable struct { - name string - jsonVal json.RawMessage -} - -func TxsToScriptsData(txsData ...TransactionData) []ScriptData { - res := []ScriptData{} - for _, txData := range txsData { - sb := strings.Builder{} - monetaryToVars := map[string]variable{} - accountsToVars := map[string]variable{} - i := 0 - j := 0 - for _, p := range txData.Postings { - if _, ok := accountsToVars[p.Source]; !ok { - if p.Source != WORLD { - accountsToVars[p.Source] = variable{ - name: fmt.Sprintf("va%d", i), - jsonVal: json.RawMessage(`"` + p.Source + `"`), - } - i++ - } - } - if _, ok := accountsToVars[p.Destination]; !ok { - if p.Destination != WORLD { - accountsToVars[p.Destination] = variable{ - name: fmt.Sprintf("va%d", i), - jsonVal: json.RawMessage(`"` + p.Destination + `"`), - } - i++ - } - } - mon := fmt.Sprintf("[%s %s]", p.Amount.String(), p.Asset) - if _, ok := monetaryToVars[mon]; !ok { - monetaryToVars[mon] = variable{ - name: fmt.Sprintf("vm%d", j), - jsonVal: json.RawMessage( - `{"asset":"` + p.Asset + `","amount":` + p.Amount.String() + `}`), - } - j++ - } - } - - sb.WriteString("vars {\n") - accVars := make([]string, 0) - for _, v := range accountsToVars { - accVars = append(accVars, v.name) - } - sort.Strings(accVars) - for _, v := range accVars { - sb.WriteString(fmt.Sprintf("\taccount $%s\n", v)) - } - monVars := make([]string, 0) - for _, v := range monetaryToVars { - monVars = append(monVars, v.name) - } - sort.Strings(monVars) - for _, v := range monVars { - sb.WriteString(fmt.Sprintf("\tmonetary $%s\n", v)) - } - sb.WriteString("}\n") - - for _, p := range txData.Postings { - m := fmt.Sprintf("[%s %s]", p.Amount.String(), p.Asset) - mon, ok := monetaryToVars[m] - if !ok { - panic(fmt.Sprintf("monetary %s not found", m)) - } - sb.WriteString(fmt.Sprintf("send $%s (\n", mon.name)) - if p.Source == WORLD { - sb.WriteString("\tsource = @world\n") - } else { - src, ok := accountsToVars[p.Source] - if !ok { - panic(fmt.Sprintf("source %s not found", p.Source)) - } - sb.WriteString(fmt.Sprintf("\tsource = $%s\n", src.name)) - } - if p.Destination == WORLD { - sb.WriteString("\tdestination = @world\n") - } else { - dest, ok := accountsToVars[p.Destination] - if !ok { - panic(fmt.Sprintf("destination %s not found", p.Destination)) - } - sb.WriteString(fmt.Sprintf("\tdestination = $%s\n", dest.name)) - } - sb.WriteString(")\n") - } - - vars := map[string]json.RawMessage{} - for _, v := range accountsToVars { - vars[v.name] = v.jsonVal - } - for _, v := range monetaryToVars { - vars[v.name] = v.jsonVal - } - - res = append(res, ScriptData{ - Script: Script{ - Plain: sb.String(), - Vars: vars, - }, - Timestamp: txData.Timestamp, - Reference: txData.Reference, - Metadata: txData.Metadata, - }) - } - - return res -} diff --git a/pkg/core/numscript_test.go b/pkg/core/numscript_test.go deleted file mode 100644 index f948b7798..000000000 --- a/pkg/core/numscript_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package core - -import ( - "encoding/json" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestTxsToScriptsData(t *testing.T) { - ts := time.Now() - tests := []struct { - name string - input []TransactionData - output []ScriptData - }{ - { - name: "empty", - input: []TransactionData{}, - output: []ScriptData{}, - }, - { - name: "nominal", - input: []TransactionData{ - { - Postings: Postings{ - { - Source: "world", - Destination: "alice", - Asset: "EUR/2", - Amount: NewMonetaryInt(100), - }, - }, - Reference: "ref", - Timestamp: ts, - Metadata: Metadata{"key": "val"}, - }, - }, - output: []ScriptData{ - { - Script: Script{ - Plain: "vars {\n\taccount $va0\n\tmonetary $vm0\n}\nsend $vm0 (\n\tsource = @world\n\tdestination = $va0\n)\n", - Vars: map[string]json.RawMessage{ - "va0": json.RawMessage(`"alice"`), - "vm0": json.RawMessage(`{"asset":"EUR/2","amount":100}`), - }, - }, - Reference: "ref", - Timestamp: ts, - Metadata: Metadata{"key": "val"}, - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.output, - TxsToScriptsData(tt.input...)) - }) - } -} diff --git a/pkg/ledger/benchmarks_test.go b/pkg/ledger/benchmarks_test.go new file mode 100644 index 000000000..33d31e756 --- /dev/null +++ b/pkg/ledger/benchmarks_test.go @@ -0,0 +1,279 @@ +package ledger_test + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/numary/ledger/pkg/core" + "github.com/numary/ledger/pkg/ledger" + "github.com/stretchr/testify/require" +) + +const nbPostings = 1000 + +func BenchmarkLedger_PostTransactions_Scripts_Single_FixedAccounts(b *testing.B) { + var execResScript core.ExpandedTransaction + + txData := core.TransactionData{} + for i := 0; i < nbPostings; i++ { + txData.Postings = append(txData.Postings, core.Posting{ + Source: "world", + Destination: "benchmarks:" + strconv.Itoa(i), + Asset: "COIN", + Amount: core.NewMonetaryInt(10), + }) + } + _, err := txData.Postings.Validate() + require.NoError(b, err) + + runOnLedger(func(l *ledger.Ledger) { + b.ResetTimer() + + res := core.ExpandedTransaction{} + + for n := 0; n < b.N; n++ { + b.StopTimer() + script := txToScriptData(txData) + b.StartTimer() + res, err = l.ExecuteScript(context.Background(), true, script) + require.NoError(b, err) + require.Len(b, res.Postings, nbPostings) + } + + execResScript = res + require.Len(b, execResScript.Postings, nbPostings) + }) +} + +func BenchmarkLedger_PostTransactions_Postings_Single_FixedAccounts(b *testing.B) { + var execRes []core.ExpandedTransaction + + runOnLedger(func(l *ledger.Ledger) { + txData := core.TransactionData{} + for i := 0; i < nbPostings; i++ { + txData.Postings = append(txData.Postings, core.Posting{ + Source: "world", + Destination: "benchmarks:" + strconv.Itoa(i), + Asset: "COIN", + Amount: core.NewMonetaryInt(10), + }) + } + + b.ResetTimer() + + res := []core.ExpandedTransaction{} + + for n := 0; n < b.N; n++ { + _, err := txData.Postings.Validate() + require.NoError(b, err) + res, err = l.ExecuteTxsData(context.Background(), true, txData) + require.NoError(b, err) + require.Len(b, res, 1) + require.Len(b, res[0].Postings, nbPostings) + } + + execRes = res + require.Len(b, execRes, 1) + require.Len(b, execRes[0].Postings, nbPostings) + }) +} + +func BenchmarkLedger_PostTransactions_Postings_Batch_FixedAccounts(b *testing.B) { + var execRes []core.ExpandedTransaction + + txsData := newTxsData(1) + + runOnLedger(func(l *ledger.Ledger) { + b.ResetTimer() + + res := []core.ExpandedTransaction{} + + for n := 0; n < b.N; n++ { + var err error + for _, txData := range txsData { + _, err := txData.Postings.Validate() + require.NoError(b, err) + } + res, err = l.ExecuteTxsData(context.Background(), true, txsData...) + require.NoError(b, err) + require.Len(b, res, 7) + require.Len(b, res[0].Postings, 1) + require.Len(b, res[1].Postings, 1) + require.Len(b, res[2].Postings, 2) + require.Len(b, res[3].Postings, 4) + require.Len(b, res[4].Postings, 4) + require.Len(b, res[5].Postings, 1) + require.Len(b, res[6].Postings, 1) + } + + execRes = res + require.Len(b, execRes, 7) + require.Len(b, execRes[0].Postings, 1) + require.Len(b, execRes[1].Postings, 1) + require.Len(b, execRes[2].Postings, 2) + require.Len(b, execRes[3].Postings, 4) + require.Len(b, execRes[4].Postings, 4) + require.Len(b, execRes[5].Postings, 1) + require.Len(b, execRes[6].Postings, 1) + }) +} + +func BenchmarkLedger_PostTransactions_Postings_Batch_VaryingAccounts(b *testing.B) { + var execRes []core.ExpandedTransaction + + runOnLedger(func(l *ledger.Ledger) { + b.ResetTimer() + + res := []core.ExpandedTransaction{} + + for n := 0; n < b.N; n++ { + b.StopTimer() + txsData := newTxsData(n) + b.StartTimer() + var err error + for _, txData := range txsData { + _, err := txData.Postings.Validate() + require.NoError(b, err) + } + res, err = l.ExecuteTxsData(context.Background(), true, txsData...) + require.NoError(b, err) + require.Len(b, res, 7) + require.Len(b, res[0].Postings, 1) + require.Len(b, res[1].Postings, 1) + require.Len(b, res[2].Postings, 2) + require.Len(b, res[3].Postings, 4) + require.Len(b, res[4].Postings, 4) + require.Len(b, res[5].Postings, 1) + require.Len(b, res[6].Postings, 1) + } + + execRes = res + require.Len(b, execRes, 7) + require.Len(b, execRes[0].Postings, 1) + require.Len(b, execRes[1].Postings, 1) + require.Len(b, execRes[2].Postings, 2) + require.Len(b, execRes[3].Postings, 4) + require.Len(b, execRes[4].Postings, 4) + require.Len(b, execRes[5].Postings, 1) + require.Len(b, execRes[6].Postings, 1) + }) +} + +func newTxsData(i int) []core.TransactionData { + return []core.TransactionData{ + { + Postings: core.Postings{ + { + Source: "world", + Destination: fmt.Sprintf("payins:%d", i), + Amount: core.NewMonetaryInt(10000), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("payins:%d", i), + Destination: fmt.Sprintf("users:%d:wallet", i), + Amount: core.NewMonetaryInt(10000), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: "world", + Destination: fmt.Sprintf("teller:%d", i), + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + { + Source: "world", + Destination: fmt.Sprintf("teller:%d", i), + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("users:%d:wallet", i), + Destination: fmt.Sprintf("trades:%d", i), + Amount: core.NewMonetaryInt(1500), + Asset: "EUR/2", + }, + { + Source: fmt.Sprintf("trades:%d", i), + Destination: fmt.Sprintf("fiat:holdings:%d", i), + Amount: core.NewMonetaryInt(1500), + Asset: "EUR/2", + }, + { + Source: fmt.Sprintf("teller:%d", i), + Destination: fmt.Sprintf("trades:%d", i), + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + { + Source: fmt.Sprintf("trades:%d", i), + Destination: fmt.Sprintf("users:%d:wallet", i), + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("users:%d:wallet", i), + Destination: fmt.Sprintf("trades:%d", i), + Amount: core.NewMonetaryInt(4230), + Asset: "EUR/2", + }, + { + Source: fmt.Sprintf("trades:%d", i), + Destination: fmt.Sprintf("fiat:holdings:%d", i), + Amount: core.NewMonetaryInt(4230), + Asset: "EUR/2", + }, + { + Source: fmt.Sprintf("teller:%d", i), + Destination: fmt.Sprintf("trades:%d", i), + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + { + Source: fmt.Sprintf("trades:%d", i), + Destination: fmt.Sprintf("users:%d:wallet", i), + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("users:%d:wallet", i), + Destination: fmt.Sprintf("users:%d:withdrawals", i), + Amount: core.NewMonetaryInt(2270), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("users:%d:withdrawals", i), + Destination: fmt.Sprintf("payouts:%d", i), + Amount: core.NewMonetaryInt(2270), + Asset: "EUR/2", + }, + }, + }, + } +} diff --git a/pkg/ledger/execute_script.go b/pkg/ledger/execute_script.go new file mode 100644 index 000000000..54de4e3f4 --- /dev/null +++ b/pkg/ledger/execute_script.go @@ -0,0 +1,330 @@ +package ledger + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "time" + + "github.com/DmitriyVTitov/size" + "github.com/dgraph-io/ristretto" + machine "github.com/formancehq/machine/core" + "github.com/formancehq/machine/script/compiler" + "github.com/formancehq/machine/vm" + "github.com/formancehq/machine/vm/program" + "github.com/numary/ledger/pkg/core" + "github.com/numary/ledger/pkg/opentelemetry" + "github.com/numary/ledger/pkg/storage" + "github.com/pkg/errors" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func (l *Ledger) ExecuteScript(ctx context.Context, preview bool, script core.ScriptData) (core.ExpandedTransaction, error) { + ctx, span := opentelemetry.Start(ctx, "ExecuteScript") + defer span.End() + + addOps := new(core.AdditionalOperations) + + lastTx, err := l.store.GetLastTransaction(ctx) + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, + "could not get last transaction") + } + + vAggr := NewVolumeAggregator(l) + txs := make([]core.ExpandedTransaction, 0) + var nextTxId uint64 + if lastTx != nil { + nextTxId = lastTx.ID + 1 + } + + accs := map[string]*core.AccountWithVolumes{} + // Until v1.5.0, dates was stored as string using rfc3339 format + // So round the date to the second to keep the same behaviour + if script.Timestamp.IsZero() { + script.Timestamp = time.Now().UTC().Truncate(time.Second) + } else { + script.Timestamp = script.Timestamp.UTC() + } + + past := false + if lastTx != nil && script.Timestamp.Before(lastTx.Timestamp) { + past = true + } + if past && !l.allowPastTimestamps { + return core.ExpandedTransaction{}, NewValidationError(fmt.Sprintf( + "cannot pass a timestamp prior to the last transaction: %s (passed) is %s before %s (last)", + script.Timestamp.Format(time.RFC3339Nano), + lastTx.Timestamp.Sub(script.Timestamp), + lastTx.Timestamp.Format(time.RFC3339Nano))) + } + + if script.Reference != "" { + txs, err := l.GetTransactions(ctx, *NewTransactionsQuery(). + WithReferenceFilter(script.Reference)) + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, + "get transactions with reference") + } + if len(txs.Data) > 0 { + return core.ExpandedTransaction{}, NewConflictError() + } + } + + if script.Plain == "" { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorNoScript, + "no script to execute") + } + + m, err := NewMachineFromScript(script.Plain, l.cache, span) + if err != nil { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + err.Error()) + } + + if err := m.SetVarsFromJSON(script.Vars); err != nil { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(err, "could not set variables").Error()) + } + + resourcesChan, err := m.ResolveResources() + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, + "could not resolve program resources") + } + for req := range resourcesChan { + if req.Error != nil { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(req.Error, "could not resolve program resources").Error()) + } + if _, ok := accs[req.Account]; !ok { + accs[req.Account], err = l.GetAccount(ctx, req.Account) + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, + fmt.Sprintf("could not get account %q", req.Account)) + } + } + if req.Key != "" { + entry, ok := accs[req.Account].Metadata[req.Key] + if !ok { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + fmt.Sprintf("missing key %v in metadata for account %v", req.Key, req.Account)) + } + data, err := json.Marshal(entry) + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, "marshaling metadata") + } + value, err := machine.NewValueFromTypedJSON(data) + if err != nil { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(err, fmt.Sprintf( + "invalid format for metadata at key %v for account %v", + req.Key, req.Account)).Error()) + } + req.Response <- *value + } else if req.Asset != "" { + amt := accs[req.Account].Balances[req.Asset].OrZero() + resp := machine.MonetaryInt(*amt) + req.Response <- &resp + } else { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(err, fmt.Sprintf("invalid ResourceRequest: %+v", req)).Error()) + } + } + + balanceCh, err := m.ResolveBalances() + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, + "could not resolve balances") + } + for req := range balanceCh { + if req.Error != nil { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(req.Error, "could not resolve program balances").Error()) + } + var amt *core.MonetaryInt + if _, ok := accs[req.Account]; !ok { + accs[req.Account], err = l.GetAccount(ctx, req.Account) + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, + fmt.Sprintf("could not get account %q", req.Account)) + } + } + amt = accs[req.Account].Balances[req.Asset].OrZero() + resp := machine.MonetaryInt(*amt) + req.Response <- &resp + } + + exitCode, err := m.Execute() + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, + "script execution failed") + } + + if exitCode != vm.EXIT_OK { + switch exitCode { + case vm.EXIT_FAIL: + return core.ExpandedTransaction{}, errors.New( + "script exited with error code EXIT_FAIL") + case vm.EXIT_FAIL_INVALID: + return core.ExpandedTransaction{}, errors.New( + "internal error: compiled script was invalid") + case vm.EXIT_FAIL_INSUFFICIENT_FUNDS: + // TODO: If the machine can provide the asset which is failing + // we should be able to use InsufficientFundError{} instead of error code + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorInsufficientFund, + "account had insufficient funds") + default: + return core.ExpandedTransaction{}, errors.New( + "script execution failed") + } + } + + if len(m.Postings) == 0 { + return core.ExpandedTransaction{}, + NewValidationError("transaction has no postings") + } + + txVolumeAggr := vAggr.NextTx() + postings := make([]core.Posting, len(m.Postings)) + for j, posting := range m.Postings { + amt := core.MonetaryInt(*posting.Amount) + if err := txVolumeAggr.Transfer(ctx, + posting.Source, posting.Destination, posting.Asset, &amt, accs); err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, "transferring volumes") + } + postings[j] = core.Posting{ + Source: posting.Source, + Destination: posting.Destination, + Amount: &amt, + Asset: posting.Asset, + } + } + + for account, volumes := range txVolumeAggr.PostCommitVolumes { + if _, ok := accs[account]; !ok { + accs[account], err = l.GetAccount(ctx, account) + if err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, fmt.Sprintf("get account '%s'", account)) + } + } + for asset, vol := range volumes { + accs[account].Volumes[asset] = vol + } + accs[account].Balances = accs[account].Volumes.Balances() + } + + metadata := m.GetTxMetaJSON() + for k, v := range metadata { + asMapAny := make(map[string]any) + if err := json.Unmarshal(v.([]byte), &asMapAny); err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, "unmarshaling transaction metadata") + } + metadata[k] = asMapAny + } + for k, v := range script.Metadata { + _, ok := metadata[k] + if ok { + return core.ExpandedTransaction{}, NewScriptError(ScriptErrorMetadataOverride, + "cannot override metadata from script") + } + metadata[k] = v + } + + for account, meta := range m.GetAccountsMetaJSON() { + meta := meta.(map[string][]byte) + for k, v := range meta { + asMapAny := make(map[string]any) + if err := json.Unmarshal(v, &asMapAny); err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, "unmarshaling account metadata") + } + if account[0] == '@' { + account = account[1:] + } + if addOps.SetAccountMeta == nil { + addOps.SetAccountMeta = core.AccountsMeta{} + } + if _, ok := addOps.SetAccountMeta[account]; !ok { + addOps.SetAccountMeta[account] = core.Metadata{} + } + addOps.SetAccountMeta[account][k] = asMapAny + } + } + + tx := core.ExpandedTransaction{ + Transaction: core.Transaction{ + TransactionData: core.TransactionData{ + Postings: postings, + Reference: script.Reference, + Metadata: core.Metadata(metadata), + Timestamp: script.Timestamp, + }, + ID: nextTxId, + }, + PreCommitVolumes: txVolumeAggr.PreCommitVolumes, + PostCommitVolumes: txVolumeAggr.PostCommitVolumes, + } + + if preview { + return tx, nil + } + + if err := l.store.Commit(ctx, tx); err != nil { + switch { + case storage.IsErrorCode(err, storage.ConstraintFailed): + return core.ExpandedTransaction{}, NewConflictError() + default: + return core.ExpandedTransaction{}, errors.Wrap(err, + "committing transactions") + } + } + + if addOps != nil && addOps.SetAccountMeta != nil { + for addr, m := range addOps.SetAccountMeta { + if err := l.store.UpdateAccountMetadata(ctx, + addr, m, time.Now().Round(time.Second).UTC()); err != nil { + return core.ExpandedTransaction{}, errors.Wrap(err, + "updating account metadata") + } + } + } + + l.monitor.CommittedTransactions(ctx, l.store.Name(), txs...) + if addOps != nil && addOps.SetAccountMeta != nil { + for addr, m := range addOps.SetAccountMeta { + l.monitor.SavedMetadata(ctx, + l.store.Name(), core.MetaTargetTypeAccount, addr, m) + } + } + + return tx, nil +} + +func NewMachineFromScript(script string, cache *ristretto.Cache, span trace.Span) (*vm.Machine, error) { + h := sha256.New() + if _, err := h.Write([]byte(script)); err != nil { + return nil, errors.Wrap(err, "hashing script") + } + curr := h.Sum(nil) + + if cachedProgram, found := cache.Get(curr); found { + span.SetAttributes(attribute.Bool("numscript-cache-hit", true)) + return vm.NewMachine(cachedProgram.(program.Program)), nil + } + + span.SetAttributes(attribute.Bool("numscript-cache-hit", false)) + prog, err := compiler.Compile(script) + if err != nil { + return nil, err + } + + progSizeBytes := size.Of(*prog) + if progSizeBytes == -1 { + return nil, fmt.Errorf("error while calculating the size in bytes of the program") + } + cache.Set(curr, *prog, int64(progSizeBytes)) + + return vm.NewMachine(*prog), nil +} diff --git a/pkg/ledger/executor_test.go b/pkg/ledger/execute_script_test.go similarity index 60% rename from pkg/ledger/executor_test.go rename to pkg/ledger/execute_script_test.go index 75cf8db8f..349cf4922 100644 --- a/pkg/ledger/executor_test.go +++ b/pkg/ledger/execute_script_test.go @@ -5,7 +5,9 @@ import ( "crypto/sha256" "encoding/json" "fmt" + "sort" "strconv" + "strings" "testing" "github.com/DmitriyVTitov/size" @@ -22,7 +24,7 @@ func TestNoScript(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { script := core.ScriptData{} - _, err := l.Execute(context.Background(), false, false, script) + _, err := l.ExecuteScript(context.Background(), false, script) assert.IsType(t, &ledger.ScriptError{}, err) assert.Equal(t, ledger.ScriptErrorNoScript, err.(*ledger.ScriptError).Code) }) @@ -34,7 +36,7 @@ func TestCompilationError(t *testing.T) { Script: core.Script{Plain: "willnotcompile"}, } - _, err := l.Execute(context.Background(), false, false, script) + _, err := l.ExecuteScript(context.Background(), false, script) assert.IsType(t, &ledger.ScriptError{}, err) assert.Equal(t, ledger.ScriptErrorCompilationFailed, err.(*ledger.ScriptError).Code) }) @@ -42,25 +44,98 @@ func TestCompilationError(t *testing.T) { func TestSend(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - - script := core.ScriptData{ - Script: core.Script{ - Plain: ` + t.Run("nominal", func(t *testing.T) { + script := core.ScriptData{ + Script: core.Script{ + Plain: ` send [USD/2 99] ( - source=@world - destination=@user:001 + source = @world + destination = @user:001 )`, - }, - } + }, + } + _, err := l.ExecuteScript(context.Background(), false, script) + require.NoError(t, err) - _, err := l.Execute(context.Background(), false, false, script) - require.NoError(t, err) + assertBalance(t, l, "user:001", + "USD/2", core.NewMonetaryInt(99)) + }) - assertBalance(t, l, "user:001", - "USD/2", core.NewMonetaryInt(99)) + t.Run("one send with zero amount should fail", func(t *testing.T) { + script := core.ScriptData{ + Script: core.Script{ + Plain: ` + send [USD/2 0] ( + source = @world + destination = @user:001 + )`, + }, + } + _, err := l.ExecuteScript(context.Background(), false, script) + require.Error(t, err) + require.True(t, ledger.IsValidationError(err)) + require.ErrorContains(t, err, "transaction has no postings") + }) + + t.Run("one send with monetary all should fail", func(t *testing.T) { + script := core.ScriptData{ + Script: core.Script{ + Plain: ` + send [USD/2 *] ( + source = @alice + destination = @user:001 + )`, + }, + } + _, err := l.ExecuteScript(context.Background(), false, script) + require.Error(t, err) + require.True(t, ledger.IsValidationError(err)) + require.ErrorContains(t, err, "transaction has no postings") + }) + + t.Run("one send with zero amount and another with positive amount should succeed", func(t *testing.T) { + script := core.ScriptData{ + Script: core.Script{ + Plain: ` + send [USD/2 0] ( + source = @world + destination = @user:001 + ) + send [USD/2 1] ( + source = @world + destination = @user:001 + )`, + }, + } + res, err := l.ExecuteScript(context.Background(), false, script) + require.NoError(t, err) + require.Equal(t, 1, len(res.Postings)) + + assertBalance(t, l, "user:001", + "USD/2", core.NewMonetaryInt(100)) + }) + + t.Run("one send with monetary all and another with positive amount should succeed", func(t *testing.T) { + script := core.ScriptData{ + Script: core.Script{ + Plain: ` + send [USD/2 *] ( + source = @alice + destination = @user:001 + ) + send [USD/2 1] ( + source = @world + destination = @user:001 + )`, + }, + } + res, err := l.ExecuteScript(context.Background(), false, script) + require.NoError(t, err) + require.Equal(t, 1, len(res.Postings)) + + assertBalance(t, l, "user:001", + "USD/2", core.NewMonetaryInt(101)) + }) }) } @@ -81,19 +156,13 @@ func TestNoVariables(t *testing.T) { }, } - _, err := l.Execute(context.Background(), false, false, script) + _, err := l.ExecuteScript(context.Background(), false, script) assert.Error(t, err) - - require.NoError(t, l.Close(context.Background())) }) } func TestVariables(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - script := core.ScriptData{ Script: core.Script{ Plain: ` @@ -111,7 +180,7 @@ func TestVariables(t *testing.T) { }, } - _, err := l.Execute(context.Background(), false, false, script) + _, err := l.ExecuteScript(context.Background(), false, script) require.NoError(t, err) assertBalance(t, l, "user:042", @@ -121,10 +190,6 @@ func TestVariables(t *testing.T) { func TestVariablesEmptyAccount(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - script := core.ScriptData{ Script: core.Script{ Plain: ` @@ -134,7 +199,7 @@ func TestVariablesEmptyAccount(t *testing.T) { )`, }, } - _, err := l.Execute(context.Background(), false, false, script) + _, err := l.ExecuteScript(context.Background(), false, script) require.NoError(t, err) script = core.ScriptData{ @@ -156,7 +221,7 @@ func TestVariablesEmptyAccount(t *testing.T) { }, }, } - _, err = l.Execute(context.Background(), false, false, script) + _, err = l.ExecuteScript(context.Background(), false, script) require.NoError(t, err) assertBalance(t, l, "alice", "EUR", core.NewMonetaryInt(1)) @@ -166,10 +231,6 @@ func TestVariablesEmptyAccount(t *testing.T) { func TestEnoughFunds(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - tx := core.TransactionData{ Postings: []core.Posting{ { @@ -181,8 +242,7 @@ func TestEnoughFunds(t *testing.T) { }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) script := core.ScriptData{ @@ -195,17 +255,13 @@ func TestEnoughFunds(t *testing.T) { }, } - _, err = l.Execute(context.Background(), false, false, script) + _, err = l.ExecuteScript(context.Background(), false, script) assert.NoError(t, err) }) } func TestNotEnoughFunds(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - tx := core.TransactionData{ Postings: []core.Posting{ { @@ -217,8 +273,7 @@ func TestNotEnoughFunds(t *testing.T) { }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) script := core.ScriptData{ @@ -231,17 +286,13 @@ func TestNotEnoughFunds(t *testing.T) { }, } - _, err = l.Execute(context.Background(), false, false, script) + _, err = l.ExecuteScript(context.Background(), false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, apierrors.ErrInsufficientFund)) }) } func TestMissingMetadata(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - plain := ` vars { account $sale @@ -261,17 +312,13 @@ func TestMissingMetadata(t *testing.T) { }, } - _, err := l.Execute(context.Background(), false, false, script) + _, err := l.ExecuteScript(context.Background(), false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, ledger.ScriptErrorCompilationFailed)) }) } func TestMetadata(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - tx := core.TransactionData{ Postings: []core.Posting{ { @@ -283,8 +330,7 @@ func TestMetadata(t *testing.T) { }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) err = l.SaveMeta(context.Background(), core.MetaTargetTypeAccount, @@ -333,7 +379,7 @@ func TestMetadata(t *testing.T) { }, } - _, err = l.Execute(context.Background(), false, false, script) + _, err = l.ExecuteScript(context.Background(), false, script) require.NoError(t, err) assertBalance(t, l, "sales:042", "COIN", core.NewMonetaryInt(0)) @@ -406,11 +452,7 @@ func TestSetTxMeta(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - - _, err := l.Execute(context.Background(), false, false, tc.script) + _, err := l.ExecuteScript(context.Background(), false, tc.script) if tc.expectedErrorCode != "" { require.Error(t, err) @@ -428,10 +470,6 @@ func TestSetTxMeta(t *testing.T) { func TestScriptSetReference(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - plain := ` send [USD/2 99] ( source=@world @@ -446,7 +484,7 @@ func TestScriptSetReference(t *testing.T) { Reference: "tx_ref", } - _, err := l.Execute(context.Background(), false, false, script) + _, err := l.ExecuteScript(context.Background(), false, script) require.NoError(t, err) last, err := l.GetLedgerStore().GetLastTransaction(context.Background()) @@ -458,11 +496,7 @@ func TestScriptSetReference(t *testing.T) { func TestScriptReferenceConflict(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - - _, err := l.Execute(context.Background(), false, false, + _, err := l.ExecuteScript(context.Background(), false, core.ScriptData{ Script: core.Script{ Plain: ` @@ -476,7 +510,7 @@ func TestScriptReferenceConflict(t *testing.T) { }) require.NoError(t, err) - _, err = l.Execute(context.Background(), false, false, + _, err = l.ExecuteScript(context.Background(), false, core.ScriptData{ Script: core.Script{ Plain: ` @@ -496,8 +530,8 @@ func TestScriptReferenceConflict(t *testing.T) { func TestSetAccountMeta(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { t.Run("valid", func(t *testing.T) { - res, err := l.Execute(context.Background(), - false, false, core.ScriptData{ + _, err := l.ExecuteScript(context.Background(), false, + core.ScriptData{ Script: core.Script{Plain: ` send [USD/2 99] ( source = @world @@ -511,7 +545,6 @@ func TestSetAccountMeta(t *testing.T) { `}, }) require.NoError(t, err) - require.Equal(t, 1, len(res)) acc, err := l.GetAccount(context.Background(), "alice") require.NoError(t, err) @@ -526,7 +559,7 @@ func TestSetAccountMeta(t *testing.T) { }) t.Run("invalid syntax", func(t *testing.T) { - _, err := l.Execute(context.Background(), false, false, + _, err := l.ExecuteScript(context.Background(), false, core.ScriptData{ Script: core.Script{Plain: ` send [USD/2 99] ( @@ -545,10 +578,6 @@ func TestSetAccountMeta(t *testing.T) { func TestMonetaryVariableBalance(t *testing.T) { t.Run("simple", func(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - tx := core.TransactionData{ Postings: []core.Posting{ { @@ -559,8 +588,7 @@ func TestMonetaryVariableBalance(t *testing.T) { }, }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) script := core.ScriptData{ @@ -576,8 +604,7 @@ func TestMonetaryVariableBalance(t *testing.T) { }, } - _, err = l.Execute(context.Background(), - false, false, script) + _, err = l.ExecuteScript(context.Background(), false, script) require.NoError(t, err) assertBalance(t, l, "world", "COIN", core.NewMonetaryInt(0)) assertBalance(t, l, "users:001", "COIN", core.NewMonetaryInt(0)) @@ -586,10 +613,6 @@ func TestMonetaryVariableBalance(t *testing.T) { t.Run("complex", func(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - tx := core.TransactionData{ Postings: []core.Posting{ { @@ -606,8 +629,7 @@ func TestMonetaryVariableBalance(t *testing.T) { }, }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) script := core.ScriptData{ @@ -629,8 +651,7 @@ func TestMonetaryVariableBalance(t *testing.T) { }, } - _, err = l.Execute(context.Background(), - false, false, script) + _, err = l.ExecuteScript(context.Background(), false, script) require.NoError(t, err) assertBalance(t, l, "B", "USD/2", core.NewMonetaryInt(40)) assertBalance(t, l, "D", "USD/2", core.NewMonetaryInt(60)) @@ -639,10 +660,6 @@ func TestMonetaryVariableBalance(t *testing.T) { t.Run("error insufficient funds", func(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - tx := core.TransactionData{ Postings: []core.Posting{ { @@ -653,8 +670,7 @@ func TestMonetaryVariableBalance(t *testing.T) { }, }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) script := core.ScriptData{ @@ -673,18 +689,13 @@ func TestMonetaryVariableBalance(t *testing.T) { )`, }, } - _, err = l.Execute(context.Background(), - false, false, script) + _, err = l.ExecuteScript(context.Background(), false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, apierrors.ErrInsufficientFund)) }) }) t.Run("error negative balance", func(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - tx := core.TransactionData{ Postings: []core.Posting{ { @@ -695,8 +706,7 @@ func TestMonetaryVariableBalance(t *testing.T) { }, }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) script := core.ScriptData{ @@ -712,7 +722,7 @@ func TestMonetaryVariableBalance(t *testing.T) { }, } - _, err = l.Execute(context.Background(), false, false, script) + _, err = l.ExecuteScript(context.Background(), false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, ledger.ScriptErrorCompilationFailed)) assert.ErrorContains(t, err, "must be non-negative") }) @@ -720,10 +730,6 @@ func TestMonetaryVariableBalance(t *testing.T) { t.Run("error variable type", func(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - defer func(l *ledger.Ledger, ctx context.Context) { - require.NoError(t, l.Close(ctx)) - }(l, context.Background()) - script := core.ScriptData{ Script: core.Script{ Plain: ` @@ -736,7 +742,7 @@ func TestMonetaryVariableBalance(t *testing.T) { )`, }, } - _, err := l.Execute(context.Background(), false, false, script) + _, err := l.ExecuteScript(context.Background(), false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, apierrors.ErrScriptCompilationFailed)) }) }) @@ -769,17 +775,16 @@ func TestNewMachineFromScript(t *testing.T) { } _, err := txData.Postings.Validate() require.NoError(t, err) - scripts := core.TxsToScriptsData(txData) - script := scripts[0].Plain + script := txToScriptData(txData) h := sha256.New() - _, err = h.Write([]byte(script)) + _, err = h.Write([]byte(script.Plain)) require.NoError(t, err) key := h.Sum(nil) keySizeBytes := size.Of(key) require.NotEqual(t, -1, keySizeBytes) - prog, err := compiler.Compile(script) + prog, err := compiler.Compile(script.Plain) require.NoError(t, err) progSizeBytes := size.Of(*prog) require.NotEqual(t, -1, progSizeBytes) @@ -789,7 +794,7 @@ func TestNewMachineFromScript(t *testing.T) { cache := ledger.NewCache(capacityBytes, 1, true) - m, err := ledger.NewMachineFromScript(script, cache, span) + m, err := ledger.NewMachineFromScript(script.Plain, cache, span) require.NoError(t, err) require.NotNil(t, m) cache.Wait() @@ -797,7 +802,7 @@ func TestNewMachineFromScript(t *testing.T) { require.Equal(t, uint64(1), cache.Metrics.Misses()) require.Equal(t, uint64(1), cache.Metrics.KeysAdded()) - m, err = ledger.NewMachineFromScript(script, cache, span) + m, err = ledger.NewMachineFromScript(script.Plain, cache, span) require.NoError(t, err) require.NotNil(t, m) cache.Wait() @@ -811,7 +816,7 @@ func TestNewMachineFromScript(t *testing.T) { cache := ledger.NewCache(capacityBytes, 1, true) - m, err := ledger.NewMachineFromScript(script, cache, span) + m, err := ledger.NewMachineFromScript(script.Plain, cache, span) require.NoError(t, err) require.NotNil(t, m) cache.Wait() @@ -819,7 +824,7 @@ func TestNewMachineFromScript(t *testing.T) { require.Equal(t, uint64(1), cache.Metrics.Misses()) require.Equal(t, uint64(0), cache.Metrics.KeysAdded()) - m, err = ledger.NewMachineFromScript(script, cache, span) + m, err = ledger.NewMachineFromScript(script.Plain, cache, span) require.NoError(t, err) require.NotNil(t, m) cache.Wait() @@ -829,234 +834,113 @@ func TestNewMachineFromScript(t *testing.T) { }) } -var execRes []core.ExpandedTransaction +type variable struct { + name string + jsonVal json.RawMessage +} -const nbPostings = 1000 +func txToScriptData(txData core.TransactionData) core.ScriptData { + if len(txData.Postings) == 0 { + return core.ScriptData{} + } -func BenchmarkLedger_PostTransactionsSingle(b *testing.B) { - runOnLedger(func(l *ledger.Ledger) { - txData := core.TransactionData{} - for i := 0; i < nbPostings; i++ { - txData.Postings = append(txData.Postings, core.Posting{ - Source: "world", - Destination: "benchmarks:" + strconv.Itoa(i), - Asset: "COIN", - Amount: core.NewMonetaryInt(10), - }) + sb := strings.Builder{} + monetaryToVars := map[string]variable{} + accountsToVars := map[string]variable{} + i := 0 + j := 0 + for _, p := range txData.Postings { + if _, ok := accountsToVars[p.Source]; !ok { + if p.Source != core.WORLD { + accountsToVars[p.Source] = variable{ + name: fmt.Sprintf("va%d", i), + jsonVal: json.RawMessage(`"` + p.Source + `"`), + } + i++ + } } - - b.ResetTimer() - - res := []core.ExpandedTransaction{} - - for n := 0; n < b.N; n++ { - _, err := txData.Postings.Validate() - require.NoError(b, err) - script := core.TxsToScriptsData(txData) - res, err = l.Execute(context.Background(), true, true, script...) - require.NoError(b, err) - require.Len(b, res, 1) - require.Len(b, res[0].Postings, nbPostings) + if _, ok := accountsToVars[p.Destination]; !ok { + if p.Destination != core.WORLD { + accountsToVars[p.Destination] = variable{ + name: fmt.Sprintf("va%d", i), + jsonVal: json.RawMessage(`"` + p.Destination + `"`), + } + i++ + } + } + mon := fmt.Sprintf("[%s %s]", p.Amount.String(), p.Asset) + if _, ok := monetaryToVars[mon]; !ok { + monetaryToVars[mon] = variable{ + name: fmt.Sprintf("vm%d", j), + jsonVal: json.RawMessage( + `{"asset":"` + p.Asset + `","amount":` + p.Amount.String() + `}`), + } + j++ } - - execRes = res - require.Len(b, execRes, 1) - require.Len(b, execRes[0].Postings, nbPostings) - }) -} - -func newTxsData(i int) []core.TransactionData { - return []core.TransactionData{ - { - Postings: core.Postings{ - { - Source: "world", - Destination: fmt.Sprintf("payins:%d", i), - Amount: core.NewMonetaryInt(10000), - Asset: "EUR/2", - }, - }, - }, - { - Postings: core.Postings{ - { - Source: fmt.Sprintf("payins:%d", i), - Destination: fmt.Sprintf("users:%d:wallet", i), - Amount: core.NewMonetaryInt(10000), - Asset: "EUR/2", - }, - }, - }, - { - Postings: core.Postings{ - { - Source: "world", - Destination: fmt.Sprintf("teller:%d", i), - Amount: core.NewMonetaryInt(350000), - Asset: "RBLX/6", - }, - { - Source: "world", - Destination: fmt.Sprintf("teller:%d", i), - Amount: core.NewMonetaryInt(1840000), - Asset: "SNAP/6", - }, - }, - }, - { - Postings: core.Postings{ - { - Source: fmt.Sprintf("users:%d:wallet", i), - Destination: fmt.Sprintf("trades:%d", i), - Amount: core.NewMonetaryInt(1500), - Asset: "EUR/2", - }, - { - Source: fmt.Sprintf("trades:%d", i), - Destination: fmt.Sprintf("fiat:holdings:%d", i), - Amount: core.NewMonetaryInt(1500), - Asset: "EUR/2", - }, - { - Source: fmt.Sprintf("teller:%d", i), - Destination: fmt.Sprintf("trades:%d", i), - Amount: core.NewMonetaryInt(350000), - Asset: "RBLX/6", - }, - { - Source: fmt.Sprintf("trades:%d", i), - Destination: fmt.Sprintf("users:%d:wallet", i), - Amount: core.NewMonetaryInt(350000), - Asset: "RBLX/6", - }, - }, - }, - { - Postings: core.Postings{ - { - Source: fmt.Sprintf("users:%d:wallet", i), - Destination: fmt.Sprintf("trades:%d", i), - Amount: core.NewMonetaryInt(4230), - Asset: "EUR/2", - }, - { - Source: fmt.Sprintf("trades:%d", i), - Destination: fmt.Sprintf("fiat:holdings:%d", i), - Amount: core.NewMonetaryInt(4230), - Asset: "EUR/2", - }, - { - Source: fmt.Sprintf("teller:%d", i), - Destination: fmt.Sprintf("trades:%d", i), - Amount: core.NewMonetaryInt(1840000), - Asset: "SNAP/6", - }, - { - Source: fmt.Sprintf("trades:%d", i), - Destination: fmt.Sprintf("users:%d:wallet", i), - Amount: core.NewMonetaryInt(1840000), - Asset: "SNAP/6", - }, - }, - }, - { - Postings: core.Postings{ - { - Source: fmt.Sprintf("users:%d:wallet", i), - Destination: fmt.Sprintf("users:%d:withdrawals", i), - Amount: core.NewMonetaryInt(2270), - Asset: "EUR/2", - }, - }, - }, - { - Postings: core.Postings{ - { - Source: fmt.Sprintf("users:%d:withdrawals", i), - Destination: fmt.Sprintf("payouts:%d", i), - Amount: core.NewMonetaryInt(2270), - Asset: "EUR/2", - }, - }, - }, } -} - -func BenchmarkLedger_PostTransactionsBatch(b *testing.B) { - runOnLedger(func(l *ledger.Ledger) { - txsData := newTxsData(1) - b.ResetTimer() - - res := []core.ExpandedTransaction{} + sb.WriteString("vars {\n") + accVars := make([]string, 0) + for _, v := range accountsToVars { + accVars = append(accVars, v.name) + } + sort.Strings(accVars) + for _, v := range accVars { + sb.WriteString(fmt.Sprintf("\taccount $%s\n", v)) + } + monVars := make([]string, 0) + for _, v := range monetaryToVars { + monVars = append(monVars, v.name) + } + sort.Strings(monVars) + for _, v := range monVars { + sb.WriteString(fmt.Sprintf("\tmonetary $%s\n", v)) + } + sb.WriteString("}\n") - for n := 0; n < b.N; n++ { - var err error - for _, txData := range txsData { - _, err := txData.Postings.Validate() - require.NoError(b, err) + for _, p := range txData.Postings { + m := fmt.Sprintf("[%s %s]", p.Amount.String(), p.Asset) + mon, ok := monetaryToVars[m] + if !ok { + panic(fmt.Sprintf("monetary %s not found", m)) + } + sb.WriteString(fmt.Sprintf("send $%s (\n", mon.name)) + if p.Source == core.WORLD { + sb.WriteString("\tsource = @world\n") + } else { + src, ok := accountsToVars[p.Source] + if !ok { + panic(fmt.Sprintf("source %s not found", p.Source)) } - script := core.TxsToScriptsData(txsData...) - res, err = l.Execute(context.Background(), true, true, script...) - require.NoError(b, err) - require.Len(b, res, 7) - require.Len(b, res[0].Postings, 1) - require.Len(b, res[1].Postings, 1) - require.Len(b, res[2].Postings, 2) - require.Len(b, res[3].Postings, 4) - require.Len(b, res[4].Postings, 4) - require.Len(b, res[5].Postings, 1) - require.Len(b, res[6].Postings, 1) + sb.WriteString(fmt.Sprintf("\tsource = $%s allowing unbounded overdraft\n", src.name)) } - - execRes = res - require.Len(b, execRes, 7) - require.Len(b, execRes[0].Postings, 1) - require.Len(b, execRes[1].Postings, 1) - require.Len(b, execRes[2].Postings, 2) - require.Len(b, execRes[3].Postings, 4) - require.Len(b, execRes[4].Postings, 4) - require.Len(b, execRes[5].Postings, 1) - require.Len(b, execRes[6].Postings, 1) - }) -} - -func BenchmarkLedger_PostTransactionsBatch2(b *testing.B) { - runOnLedger(func(l *ledger.Ledger) { - b.ResetTimer() - - res := []core.ExpandedTransaction{} - - for n := 0; n < b.N; n++ { - b.StopTimer() - txsData := newTxsData(n) - b.StartTimer() - var err error - for _, txData := range txsData { - _, err := txData.Postings.Validate() - require.NoError(b, err) + if p.Destination == core.WORLD { + sb.WriteString("\tdestination = @world\n") + } else { + dest, ok := accountsToVars[p.Destination] + if !ok { + panic(fmt.Sprintf("destination %s not found", p.Destination)) } - script := core.TxsToScriptsData(txsData...) - res, err = l.Execute(context.Background(), true, true, script...) - require.NoError(b, err) - require.Len(b, res, 7) - require.Len(b, res[0].Postings, 1) - require.Len(b, res[1].Postings, 1) - require.Len(b, res[2].Postings, 2) - require.Len(b, res[3].Postings, 4) - require.Len(b, res[4].Postings, 4) - require.Len(b, res[5].Postings, 1) - require.Len(b, res[6].Postings, 1) + sb.WriteString(fmt.Sprintf("\tdestination = $%s\n", dest.name)) } + sb.WriteString(")\n") + } - execRes = res - require.Len(b, execRes, 7) - require.Len(b, execRes[0].Postings, 1) - require.Len(b, execRes[1].Postings, 1) - require.Len(b, execRes[2].Postings, 2) - require.Len(b, execRes[3].Postings, 4) - require.Len(b, execRes[4].Postings, 4) - require.Len(b, execRes[5].Postings, 1) - require.Len(b, execRes[6].Postings, 1) - }) + vars := map[string]json.RawMessage{} + for _, v := range accountsToVars { + vars[v.name] = v.jsonVal + } + for _, v := range monetaryToVars { + vars[v.name] = v.jsonVal + } + + return core.ScriptData{ + Script: core.Script{ + Plain: sb.String(), + Vars: vars, + }, + Timestamp: txData.Timestamp, + Reference: txData.Reference, + Metadata: txData.Metadata, + } } diff --git a/pkg/ledger/execute_txsdata.go b/pkg/ledger/execute_txsdata.go new file mode 100644 index 000000000..0c20f5b20 --- /dev/null +++ b/pkg/ledger/execute_txsdata.go @@ -0,0 +1,168 @@ +package ledger + +import ( + "context" + "fmt" + "time" + + "github.com/numary/ledger/pkg/core" + "github.com/numary/ledger/pkg/opentelemetry" + "github.com/numary/ledger/pkg/storage" + "github.com/pkg/errors" +) + +func (l *Ledger) ExecuteTxsData(ctx context.Context, preview bool, txsData ...core.TransactionData) ([]core.ExpandedTransaction, error) { + ctx, span := opentelemetry.Start(ctx, "ExecuteTxsData") + defer span.End() + + if len(txsData) == 0 { + return []core.ExpandedTransaction{}, errors.New("no transaction data to execute") + } + + lastTx, err := l.store.GetLastTransaction(ctx) + if err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, + "could not get last transaction") + } + + vAggr := NewVolumeAggregator(l) + txs := make([]core.ExpandedTransaction, 0) + var nextTxId uint64 + var lastTxTimestamp time.Time + if lastTx != nil { + nextTxId = lastTx.ID + 1 + lastTxTimestamp = lastTx.Timestamp + } + + contracts := make([]core.Contract, 0) + mapping, err := l.store.LoadMapping(ctx) + if err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, + "loading mapping") + } + if mapping != nil { + contracts = append(contracts, mapping.Contracts...) + } + contracts = append(contracts, DefaultContracts...) + + usedReferences := make(map[string]struct{}) + accs := map[string]*core.AccountWithVolumes{} + for i, txData := range txsData { + if len(txData.Postings) == 0 { + return []core.ExpandedTransaction{}, NewValidationError( + fmt.Sprintf("executing transaction data %d: no postings", i)) + } + // Until v1.5.0, dates was stored as string using rfc3339 format + // So round the date to the second to keep the same behaviour + if txData.Timestamp.IsZero() { + txData.Timestamp = time.Now().UTC().Truncate(time.Second) + } else { + txData.Timestamp = txData.Timestamp.UTC() + } + + past := false + if lastTx != nil && txData.Timestamp.Before(lastTxTimestamp) { + past = true + } + if past && !l.allowPastTimestamps { + return []core.ExpandedTransaction{}, NewValidationError(fmt.Sprintf( + "cannot pass a timestamp prior to the last transaction: %s (passed) is %s before %s (last)", + txData.Timestamp.Format(time.RFC3339Nano), + lastTxTimestamp.Sub(txData.Timestamp), + lastTxTimestamp.Format(time.RFC3339Nano))) + } + lastTxTimestamp = txData.Timestamp + + if txData.Reference != "" { + if _, ok := usedReferences[txData.Reference]; ok { + return []core.ExpandedTransaction{}, NewConflictError() + } + usedReferences[txData.Reference] = struct{}{} + + txs, err := l.GetTransactions(ctx, *NewTransactionsQuery(). + WithReferenceFilter(txData.Reference)) + if err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, + "get transactions with reference") + } + if len(txs.Data) > 0 { + return []core.ExpandedTransaction{}, NewConflictError() + } + } + + txVolumeAggr := vAggr.NextTx() + for _, posting := range txData.Postings { + if err := txVolumeAggr.Transfer(ctx, + posting.Source, posting.Destination, posting.Asset, posting.Amount, accs); err != nil { + return []core.ExpandedTransaction{}, NewTransactionCommitError(i, err) + } + } + + for account, volumes := range txVolumeAggr.PostCommitVolumes { + if _, ok := accs[account]; !ok { + accs[account], err = l.GetAccount(ctx, account) + if err != nil { + return []core.ExpandedTransaction{}, NewTransactionCommitError(i, + errors.Wrap(err, fmt.Sprintf("get account '%s'", account))) + } + } + for asset, vol := range volumes { + accs[account].Volumes[asset] = vol + } + accs[account].Balances = accs[account].Volumes.Balances() + for asset, volume := range volumes { + if account == core.WORLD { + continue + } + + for _, contract := range contracts { + if contract.Match(account) { + if ok := contract.Expr.Eval(core.EvalContext{ + Variables: map[string]interface{}{ + "balance": volume.Balance(), + }, + Metadata: accs[account].Metadata, + Asset: asset, + }); !ok { + return []core.ExpandedTransaction{}, NewInsufficientFundError(asset) + } + break + } + } + } + } + + if txData.Metadata == nil { + txData.Metadata = core.Metadata{} + } + + tx := core.ExpandedTransaction{ + Transaction: core.Transaction{ + TransactionData: txData, + ID: nextTxId, + }, + PreCommitVolumes: txVolumeAggr.PreCommitVolumes, + PostCommitVolumes: txVolumeAggr.PostCommitVolumes, + } + lastTx = &tx + txs = append(txs, tx) + nextTxId++ + } + + if preview { + return txs, nil + } + + if err := l.store.Commit(ctx, txs...); err != nil { + switch { + case storage.IsErrorCode(err, storage.ConstraintFailed): + return []core.ExpandedTransaction{}, NewConflictError() + default: + return []core.ExpandedTransaction{}, errors.Wrap(err, + "committing transactions") + } + } + + l.monitor.CommittedTransactions(ctx, l.store.Name(), txs...) + return txs, nil +} diff --git a/pkg/ledger/process_test.go b/pkg/ledger/execute_txsdata_test.go similarity index 92% rename from pkg/ledger/process_test.go rename to pkg/ledger/execute_txsdata_test.go index 9a9e477f2..9057e4a44 100644 --- a/pkg/ledger/process_test.go +++ b/pkg/ledger/execute_txsdata_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestLedger_processTx(t *testing.T) { +func TestLedger_ExecuteTxsData(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { t.Run("multi assets", func(t *testing.T) { worldTotoUSD := core.NewMonetaryInt(43) @@ -135,8 +135,7 @@ func TestLedger_processTx(t *testing.T) { }, } - res, err := l.Execute(context.Background(), true, true, - core.TxsToScriptsData(txsData...)...) + res, err := l.ExecuteTxsData(context.Background(), true, txsData...) assert.NoError(t, err) assert.Equal(t, len(txsData), len(res)) @@ -186,8 +185,7 @@ func TestLedger_processTx(t *testing.T) { }, } - res, err := l.Execute(context.Background(), true, true, - core.TxsToScriptsData(txsData...)...) + res, err := l.ExecuteTxsData(context.Background(), true, txsData...) require.NoError(t, err) require.Equal(t, len(txsData), len(res)) @@ -308,10 +306,25 @@ func TestLedger_processTx(t *testing.T) { }) }) - t.Run("no script", func(t *testing.T) { - _, err := l.Execute(context.Background(), true, true, core.ScriptData{}) + t.Run("empty", func(t *testing.T) { + _, err := l.ExecuteTxsData(context.Background(), true, core.TransactionData{}) assert.Error(t, err) - assert.ErrorContains(t, err, "no script to execute") + assert.ErrorContains(t, err, "executing transaction data 0: no postings") + }) + + t.Run("amount zero", func(t *testing.T) { + res, err := l.ExecuteTxsData(context.Background(), true, core.TransactionData{ + Postings: core.Postings{ + { + Source: "world", + Destination: "alice", + Amount: core.NewMonetaryInt(0), + Asset: "USD", + }, + }, + }) + assert.NoError(t, err) + assert.Equal(t, 1, len(res)) }) }) @@ -328,8 +341,8 @@ func TestLedger_processTx(t *testing.T) { }, })) - _, err := l.Execute(context.Background(), true, true, - core.TxsToScriptsData(core.TransactionData{ + _, err := l.ExecuteTxsData(context.Background(), true, + core.TransactionData{ Postings: []core.Posting{{ Source: "world", Destination: "bank", @@ -337,7 +350,7 @@ func TestLedger_processTx(t *testing.T) { Asset: "USD", }}, Timestamp: now.UTC().Add(-time.Second), - })...) + }) assert.NoError(t, err) }) }, ledger.WithPastTimestamps) diff --git a/pkg/ledger/executor.go b/pkg/ledger/executor.go deleted file mode 100644 index 063ee0b1e..000000000 --- a/pkg/ledger/executor.go +++ /dev/null @@ -1,381 +0,0 @@ -package ledger - -import ( - "context" - "crypto/sha256" - "encoding/json" - "fmt" - "time" - - "github.com/DmitriyVTitov/size" - "github.com/dgraph-io/ristretto" - machine "github.com/formancehq/machine/core" - "github.com/formancehq/machine/script/compiler" - "github.com/formancehq/machine/vm" - "github.com/formancehq/machine/vm/program" - "github.com/numary/ledger/pkg/core" - "github.com/numary/ledger/pkg/opentelemetry" - "github.com/numary/ledger/pkg/storage" - "github.com/pkg/errors" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -func (l *Ledger) Execute(ctx context.Context, checkMapping, preview bool, scripts ...core.ScriptData) ([]core.ExpandedTransaction, error) { - ctx, span := opentelemetry.Start(ctx, "Execute") - defer span.End() - - if len(scripts) == 0 { - return []core.ExpandedTransaction{}, - NewScriptError(ScriptErrorNoScript, "no script to execute") - } - - addOps := new(core.AdditionalOperations) - - lastTx, err := l.store.GetLastTransaction(ctx) - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, - "could not get last transaction") - } - - vAggr := NewVolumeAggregator(l) - txs := make([]core.ExpandedTransaction, 0) - var nextTxId uint64 - var lastTxTimestamp time.Time - if lastTx != nil { - nextTxId = lastTx.ID + 1 - lastTxTimestamp = lastTx.Timestamp - } - contracts := make([]core.Contract, 0) - if checkMapping { - mapping, err := l.store.LoadMapping(ctx) - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, - "loading mapping") - } - if mapping != nil { - contracts = append(contracts, mapping.Contracts...) - } - contracts = append(contracts, DefaultContracts...) - } - - usedReferences := make(map[string]struct{}) - accs := map[string]*core.AccountWithVolumes{} - for i, script := range scripts { - // Until v1.5.0, dates was stored as string using rfc3339 format - // So round the date to the second to keep the same behaviour - if script.Timestamp.IsZero() { - script.Timestamp = time.Now().UTC().Truncate(time.Second) - } else { - script.Timestamp = script.Timestamp.UTC() - } - - past := false - if lastTx != nil && script.Timestamp.Before(lastTxTimestamp) { - past = true - } - if past && !l.allowPastTimestamps { - return []core.ExpandedTransaction{}, NewValidationError(fmt.Sprintf( - "cannot pass a timestamp prior to the last transaction: %s (passed) is %s before %s (last)", - script.Timestamp.Format(time.RFC3339Nano), - lastTxTimestamp.Sub(script.Timestamp), - lastTxTimestamp.Format(time.RFC3339Nano))) - } - lastTxTimestamp = script.Timestamp - - if script.Reference != "" { - if _, ok := usedReferences[script.Reference]; ok { - return []core.ExpandedTransaction{}, NewConflictError() - } - usedReferences[script.Reference] = struct{}{} - - txs, err := l.GetTransactions(ctx, *NewTransactionsQuery(). - WithReferenceFilter(script.Reference)) - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, "GetTransactions") - } - if len(txs.Data) > 0 { - return []core.ExpandedTransaction{}, NewConflictError() - } - } - - if script.Plain == "" { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorNoScript, - "no script to execute") - } - - m, err := NewMachineFromScript(script.Plain, l.cache, span) - if err != nil { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, - err.Error()) - } - - if err := m.SetVarsFromJSON(script.Vars); err != nil { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(err, "could not set variables").Error()) - } - - resourcesChan, err := m.ResolveResources() - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, - "could not resolve program resources") - } - for req := range resourcesChan { - if req.Error != nil { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(req.Error, "could not resolve program resources").Error()) - } - if _, ok := accs[req.Account]; !ok { - accs[req.Account], err = l.GetAccount(ctx, req.Account) - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, - fmt.Sprintf("could not get account %q", req.Account)) - } - } - if req.Key != "" { - entry, ok := accs[req.Account].Metadata[req.Key] - if !ok { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, - fmt.Sprintf("missing key %v in metadata for account %v", req.Key, req.Account)) - } - data, err := json.Marshal(entry) - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, "json.Marshal") - } - value, err := machine.NewValueFromTypedJSON(data) - if err != nil { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(err, fmt.Sprintf( - "invalid format for metadata at key %v for account %v", - req.Key, req.Account)).Error()) - } - req.Response <- *value - } else if req.Asset != "" { - amt := accs[req.Account].Balances[req.Asset].OrZero() - resp := machine.MonetaryInt(*amt) - req.Response <- &resp - } else { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(err, fmt.Sprintf("invalid ResourceRequest: %+v", req)).Error()) - } - } - - balanceCh, err := m.ResolveBalances() - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, - "could not resolve balances") - } - for req := range balanceCh { - if req.Error != nil { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(req.Error, "could not resolve program balances").Error()) - } - var amt *core.MonetaryInt - if _, ok := accs[req.Account]; !ok { - accs[req.Account], err = l.GetAccount(ctx, req.Account) - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, - fmt.Sprintf("could not get account %q", req.Account)) - } - } - amt = accs[req.Account].Balances[req.Asset].OrZero() - resp := machine.MonetaryInt(*amt) - req.Response <- &resp - } - - exitCode, err := m.Execute() - if err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, - "script execution failed") - } - - if exitCode != vm.EXIT_OK { - switch exitCode { - case vm.EXIT_FAIL: - return []core.ExpandedTransaction{}, errors.New( - "script exited with error code EXIT_FAIL") - case vm.EXIT_FAIL_INVALID: - return []core.ExpandedTransaction{}, errors.New( - "internal error: compiled script was invalid") - case vm.EXIT_FAIL_INSUFFICIENT_FUNDS: - // TODO: If the machine can provide the asset which is failing - // we should be able to use InsufficientFundError{} instead of error code - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorInsufficientFund, - "account had insufficient funds") - default: - return []core.ExpandedTransaction{}, errors.New( - "script execution failed") - } - } - - if len(m.Postings) == 0 { - return []core.ExpandedTransaction{}, - NewValidationError("transaction has no postings") - } - - txVolumeAggr := vAggr.NextTx() - postings := make([]core.Posting, len(m.Postings)) - for j, posting := range m.Postings { - amt := core.MonetaryInt(*posting.Amount) - if err := txVolumeAggr.Transfer(ctx, - posting.Source, posting.Destination, posting.Asset, &amt, accs); err != nil { - return []core.ExpandedTransaction{}, NewTransactionCommitError(i, err) - } - postings[j] = core.Posting{ - Source: posting.Source, - Destination: posting.Destination, - Amount: &amt, - Asset: posting.Asset, - } - } - - for account, volumes := range txVolumeAggr.PostCommitVolumes { - if _, ok := accs[account]; !ok { - accs[account], err = l.GetAccount(ctx, account) - if err != nil { - return []core.ExpandedTransaction{}, NewTransactionCommitError(i, - errors.Wrap(err, fmt.Sprintf("GetAccount '%s'", account))) - } - } - for asset, vol := range volumes { - accs[account].Volumes[asset] = vol - } - accs[account].Balances = accs[account].Volumes.Balances() - for asset, volume := range volumes { - if account == core.WORLD { - continue - } - - for _, contract := range contracts { - if contract.Match(account) { - if ok := contract.Expr.Eval(core.EvalContext{ - Variables: map[string]interface{}{ - "balance": volume.Balance(), - }, - Metadata: accs[account].Metadata, - Asset: asset, - }); !ok { - return []core.ExpandedTransaction{}, NewInsufficientFundError(asset) - } - break - } - } - } - } - - metadata := m.GetTxMetaJSON() - for k, v := range metadata { - asMapAny := make(map[string]any) - if err := json.Unmarshal(v.([]byte), &asMapAny); err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, "json.Unmarshal") - } - metadata[k] = asMapAny - } - for k, v := range script.Metadata { - _, ok := metadata[k] - if ok { - return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorMetadataOverride, - "cannot override metadata from script") - } - metadata[k] = v - } - - for account, meta := range m.GetAccountsMetaJSON() { - meta := meta.(map[string][]byte) - for k, v := range meta { - asMapAny := make(map[string]any) - if err := json.Unmarshal(v, &asMapAny); err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, "json.Unmarshal") - } - if account[0] == '@' { - account = account[1:] - } - if addOps.SetAccountMeta == nil { - addOps.SetAccountMeta = core.AccountsMeta{} - } - if _, ok := addOps.SetAccountMeta[account]; !ok { - addOps.SetAccountMeta[account] = core.Metadata{} - } - addOps.SetAccountMeta[account][k] = asMapAny - } - } - - tx := core.ExpandedTransaction{ - Transaction: core.Transaction{ - TransactionData: core.TransactionData{ - Postings: postings, - Reference: script.Reference, - Metadata: core.Metadata(metadata), - Timestamp: script.Timestamp, - }, - ID: nextTxId, - }, - PreCommitVolumes: txVolumeAggr.PreCommitVolumes, - PostCommitVolumes: txVolumeAggr.PostCommitVolumes, - } - lastTx = &tx - txs = append(txs, tx) - nextTxId++ - } - - if preview { - return txs, nil - } - - if err := l.store.Commit(ctx, txs...); err != nil { - switch { - case storage.IsErrorCode(err, storage.ConstraintFailed): - return []core.ExpandedTransaction{}, NewConflictError() - default: - return []core.ExpandedTransaction{}, errors.Wrap(err, - "committing transactions") - } - } - - if addOps != nil && addOps.SetAccountMeta != nil { - for addr, m := range addOps.SetAccountMeta { - if err := l.store.UpdateAccountMetadata(ctx, - addr, m, time.Now().Round(time.Second).UTC()); err != nil { - return []core.ExpandedTransaction{}, errors.Wrap(err, - "updating account metadata") - } - } - } - - l.monitor.CommittedTransactions(ctx, l.store.Name(), txs...) - if addOps != nil && addOps.SetAccountMeta != nil { - for addr, m := range addOps.SetAccountMeta { - l.monitor.SavedMetadata(ctx, - l.store.Name(), core.MetaTargetTypeAccount, addr, m) - } - } - - return txs, nil -} - -func NewMachineFromScript(script string, cache *ristretto.Cache, span trace.Span) (*vm.Machine, error) { - h := sha256.New() - if _, err := h.Write([]byte(script)); err != nil { - return nil, errors.Wrap(err, "hashing script") - } - curr := h.Sum(nil) - - if cachedProgram, found := cache.Get(curr); found { - span.SetAttributes(attribute.Bool("numscript-cache-hit", true)) - return vm.NewMachine(cachedProgram.(program.Program)), nil - } - - span.SetAttributes(attribute.Bool("numscript-cache-hit", false)) - prog, err := compiler.Compile(script) - if err != nil { - return nil, err - } - - progSizeBytes := size.Of(*prog) - if progSizeBytes == -1 { - return nil, fmt.Errorf("error while calculating the size in bytes of the program") - } - cache.Set(curr, *prog, int64(progSizeBytes)) - - return vm.NewMachine(*prog), nil -} diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index a4b80c37b..aad3381d4 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -119,8 +119,7 @@ func (l *Ledger) RevertTransaction(ctx context.Context, id uint64) (*core.Expand Reference: rt.Reference, Metadata: rt.Metadata, } - res, err := l.Execute(ctx, false, false, - core.TxsToScriptsData(txData)...) + res, err := l.ExecuteTxsData(ctx, false, txData) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf( "executing revert script for transaction %d", id)) diff --git a/pkg/ledger/ledger_test.go b/pkg/ledger/ledger_test.go index 6b955ae76..285a4c3f4 100644 --- a/pkg/ledger/ledger_test.go +++ b/pkg/ledger/ledger_test.go @@ -137,8 +137,7 @@ func TestTransaction(t *testing.T) { continue } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(batch...)...) + _, err := l.ExecuteTxsData(context.Background(), false, batch...) require.NoError(t, err) batch = []core.TransactionData{} @@ -150,7 +149,7 @@ func TestTransaction(t *testing.T) { expected := total.Neg() b := world.Balances["GEM"] assert.Equalf(t, expected, b, - "wrong GEM balance for account world, expected: %d got: %d", + "wrong GEM balance for account world, expected: %s got: %s", expected, b) require.NoError(t, l.Close(context.Background())) @@ -196,8 +195,7 @@ func TestTransactionBatchWithConflictingReference(t *testing.T) { }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(batch...)...) + _, err := l.ExecuteTxsData(context.Background(), false, batch...) assert.Error(t, err) assert.IsType(t, new(ledger.ConflictError), err) }) @@ -215,12 +213,10 @@ func TestTransactionBatchWithConflictingReference(t *testing.T) { }, Reference: "ref1", } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(txData)...) + _, err := l.ExecuteTxsData(context.Background(), false, txData) require.NoError(t, err) - _, err = l.Execute(context.Background(), - true, false, core.TxsToScriptsData(txData)...) + _, err = l.ExecuteTxsData(context.Background(), false, txData) assert.Error(t, err) assert.IsType(t, new(ledger.ConflictError), err) }) @@ -258,8 +254,7 @@ func TestTransactionBatchTimestamps(t *testing.T) { Timestamp: timestamp1, }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(batch...)...) + _, err := l.ExecuteTxsData(context.Background(), false, batch...) require.True(t, ledger.IsValidationError(err), err) require.ErrorContains(t, err, "cannot pass a timestamp prior to the last transaction") }) @@ -288,8 +283,7 @@ func TestTransactionBatchTimestamps(t *testing.T) { Timestamp: timestamp3, }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(batch...)...) + _, err := l.ExecuteTxsData(context.Background(), false, batch...) assert.NoError(t, err) }) t.Run("ascending order but before last inserted should fail", func(t *testing.T) { @@ -317,8 +311,7 @@ func TestTransactionBatchTimestamps(t *testing.T) { Timestamp: timestamp4, }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(batch...)...) + _, err := l.ExecuteTxsData(context.Background(), false, batch...) require.True(t, ledger.IsValidationError(err)) require.ErrorContains(t, err, "cannot pass a timestamp prior to the last transaction") }) @@ -370,8 +363,7 @@ func TestTransactionExpectedVolumes(t *testing.T) { }, } - res, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(txsData...)...) + res, err := l.ExecuteTxsData(context.Background(), false, txsData...) assert.NoError(t, err) postCommitVolumes := core.AggregatePostCommitVolumes(res...) @@ -421,12 +413,10 @@ func TestReference(t *testing.T) { }, } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) - _, err = l.Execute(context.Background(), - true, false, core.TxsToScriptsData(tx)...) + _, err = l.ExecuteTxsData(context.Background(), false, tx) assert.Error(t, err) }) } @@ -457,17 +447,16 @@ func TestAccountMetadata(t *testing.T) { { // We have to create at least one transaction to retrieve an account from GetAccounts store method - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(core.TransactionData{ - Postings: core.Postings{ - { - Source: "world", - Amount: core.NewMonetaryInt(100), - Asset: "USD", - Destination: "users:001", - }, + _, err := l.ExecuteTxsData(context.Background(), false, core.TransactionData{ + Postings: core.Postings{ + { + Source: "world", + Amount: core.NewMonetaryInt(100), + Asset: "USD", + Destination: "users:001", }, - })...) + }, + }) assert.NoError(t, err) acc, err := l.GetAccount(context.Background(), "users:001") @@ -484,8 +473,8 @@ func TestAccountMetadata(t *testing.T) { func TestTransactionMetadata(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - _, err := l.Execute(context.Background(), true, false, - core.TxsToScriptsData(core.TransactionData{ + _, err := l.ExecuteTxsData(context.Background(), false, + core.TransactionData{ Postings: []core.Posting{ { Source: "world", @@ -494,7 +483,7 @@ func TestTransactionMetadata(t *testing.T) { Asset: "COIN", }, }, - })...) + }) require.NoError(t, err) tx, err := l.GetLedgerStore().GetLastTransaction(context.Background()) @@ -523,8 +512,8 @@ func TestTransactionMetadata(t *testing.T) { func TestSaveTransactionMetadata(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - _, err := l.Execute(context.Background(), true, false, - core.TxsToScriptsData(core.TransactionData{ + _, err := l.ExecuteTxsData(context.Background(), false, + core.TransactionData{ Postings: []core.Posting{ { Source: "world", @@ -536,7 +525,7 @@ func TestSaveTransactionMetadata(t *testing.T) { Metadata: core.Metadata{ "a metadata": "a value", }, - })...) + }) require.NoError(t, err) tx, err := l.GetLedgerStore().GetLastTransaction(context.Background()) @@ -552,8 +541,8 @@ func TestSaveTransactionMetadata(t *testing.T) { func TestGetTransaction(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - _, err := l.Execute(context.Background(), true, false, - core.TxsToScriptsData(core.TransactionData{ + _, err := l.ExecuteTxsData(context.Background(), false, + core.TransactionData{ Reference: "bar", Postings: []core.Posting{ { @@ -563,7 +552,7 @@ func TestGetTransaction(t *testing.T) { Asset: "COIN", }, }, - })...) + }) require.NoError(t, err) last, err := l.GetLedgerStore().GetLastTransaction(context.Background()) @@ -589,8 +578,7 @@ func TestGetTransactions(t *testing.T) { }, } - _, err := l.Execute(context.Background(), true, false, - core.TxsToScriptsData(tx)...) + _, err := l.ExecuteTxsData(context.Background(), false, tx) require.NoError(t, err) res, err := l.GetTransactions(context.Background(), *ledger.NewTransactionsQuery()) @@ -604,8 +592,8 @@ func TestRevertTransaction(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { revertAmt := core.NewMonetaryInt(100) - res, err := l.Execute(context.Background(), true, false, - core.TxsToScriptsData(core.TransactionData{ + res, err := l.ExecuteTxsData(context.Background(), false, + core.TransactionData{ Reference: "foo", Postings: []core.Posting{ { @@ -615,7 +603,7 @@ func TestRevertTransaction(t *testing.T) { Asset: "COIN", }, }, - })...) + }) require.NoError(t, err) world, err := l.GetAccount(context.Background(), "world") @@ -659,20 +647,19 @@ func TestRevertTransaction(t *testing.T) { func TestVeryBigTransaction(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - amount, err := core.ParseMonetaryInt( "199999999999999999992919191919192929292939847477171818284637291884661818183647392936472918836161728274766266161728493736383838") require.NoError(t, err) - res, err := l.Execute(context.Background(), true, false, - core.TxsToScriptsData(core.TransactionData{ + res, err := l.ExecuteTxsData(context.Background(), false, + core.TransactionData{ Postings: []core.Posting{{ Source: "world", Destination: "bank", Asset: "ETH/18", Amount: amount, }}, - })...) + }) require.NoError(t, err) txFromDB, err := l.GetTransaction(context.Background(), res[0].ID) @@ -697,8 +684,7 @@ func BenchmarkTransaction1(b *testing.B) { }, }) - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(txs...)...) + _, err := l.ExecuteTxsData(context.Background(), false, txs...) require.NoError(b, err) } }) @@ -723,8 +709,7 @@ func BenchmarkTransaction_20_1k(b *testing.B) { }) } - _, err := l.Execute(context.Background(), - true, false, core.TxsToScriptsData(txs...)...) + _, err := l.ExecuteTxsData(context.Background(), false, txs...) require.NoError(b, err) } }