Skip to content

Commit

Permalink
abci: add preprocess block (#110)
Browse files Browse the repository at this point in the history
* version docs

* stash commits

* fix url

* remove master from sidebar

* fix build errors

* move to consensus connection

* add process

* add proof to block

* add block metadata in beginBlcok

* fix testing

* regenerate mocks

* add in needed proto changes

* regenerate mocks

* fix makeblock calls

* Apply suggestions from code review

* fix rpc tests

* fix linting and ci errors

* fix consensus tests

* add preprocess to e2e

* add preprocess to counter app

* replace meta_data with messages

* fix linting

* Update state/execution.go

Co-authored-by: Ismail Khoffi <Ismail.Khoffi@gmail.com>

* fix comment

* fix e2e tests

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Ismail Khoffi <Ismail.Khoffi@gmail.com>
  • Loading branch information
3 people authored and evan-forbes committed Sep 21, 2021
1 parent e4d922b commit fee0e1d
Show file tree
Hide file tree
Showing 20 changed files with 1,093 additions and 227 deletions.
2 changes: 2 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Client interface {
OfferSnapshotAsync(context.Context, types.RequestOfferSnapshot) (*ReqRes, error)
LoadSnapshotChunkAsync(context.Context, types.RequestLoadSnapshotChunk) (*ReqRes, error)
ApplySnapshotChunkAsync(context.Context, types.RequestApplySnapshotChunk) (*ReqRes, error)
PreprocessTxsAsync(context.Context, types.RequestPreprocessTxs) (*ReqRes, error)

// Synchronous requests
FlushSync(context.Context) error
Expand All @@ -62,6 +63,7 @@ type Client interface {
OfferSnapshotSync(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunkSync(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunkSync(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
PreprocessTxsSync(context.Context, types.RequestPreprocessTxs) (*types.ResponsePreprocessTxs, error)
}

//----------------------------------------
Expand Down
21 changes: 21 additions & 0 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,15 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(
)
}

func (cli *grpcClient) PreprocessTxsAsync(ctx context.Context, params types.RequestPreprocessTxs) (*ReqRes, error) {
req := types.ToRequestPreprocessTxs(params)
res, err := cli.client.PreprocessTxs(context.Background(), req.GetPreprocessTxs(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(ctx, req, &types.Response{Value: &types.Response_PreprocessTxs{PreprocessTxs: res}})
}

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(ctx context.Context, req *types.Request, res *types.Response) (*ReqRes, error) {
Expand Down Expand Up @@ -504,3 +513,15 @@ func (cli *grpcClient) ApplySnapshotChunkSync(
}
return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
}

func (cli *grpcClient) PreprocessTxsSync(
ctx context.Context,
params types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {

reqres, err := cli.PreprocessTxsAsync(ctx, params)
if err != nil {
return nil, err
}
return reqres.Response.GetPreprocessTxs(), cli.Error()
}
22 changes: 22 additions & 0 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ func (app *localClient) ApplySnapshotChunkAsync(
), nil
}

func (app *localClient) PreprocessTxsAsync(ctx context.Context, req types.RequestPreprocessTxs) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PreprocessTxs(req)
return app.callback(
types.ToRequestPreprocessTxs(req),
types.ToResponsePreprocessTx(res),
), nil
}

//-------------------------------------------------------

func (app *localClient) FlushSync(ctx context.Context) error {
Expand Down Expand Up @@ -346,6 +357,17 @@ func (app *localClient) ApplySnapshotChunkSync(
return &res, nil
}

func (app *localClient) PreprocessTxsSync(
ctx context.Context,
req types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PreprocessTxs(req)
return &res, nil
}

//-------------------------------------------------------

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
Expand Down
41 changes: 39 additions & 2 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ func (cli *socketClient) ApplySnapshotChunkAsync(
return cli.queueRequestAsync(ctx, types.ToRequestApplySnapshotChunk(req))
}

func (cli *socketClient) PreprocessTxsAsync(ctx context.Context, req types.RequestPreprocessTxs) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestPreprocessTxs(req))
}

//----------------------------------------

func (cli *socketClient) FlushSync(ctx context.Context) error {
Expand Down Expand Up @@ -465,6 +469,17 @@ func (cli *socketClient) ApplySnapshotChunkSync(
return reqres.Response.GetApplySnapshotChunk(), nil
}

func (cli *socketClient) PreprocessTxsSync(
ctx context.Context,
req types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestPreprocessTxs(req))
if err != nil {
return nil, err
}
return reqres.Response.GetPreprocessTxs(), nil
}

//----------------------------------------

// queueRequest enqueues req onto the queue. If the queue is full, it ether
Expand Down Expand Up @@ -591,6 +606,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
_, ok = res.Value.(*types.Response_ListSnapshots)
case *types.Request_OfferSnapshot:
_, ok = res.Value.(*types.Response_OfferSnapshot)
case *types.Request_PreprocessTxs:
_, ok = res.Value.(*types.Response_PreprocessTxs)
}
return ok
}
Expand Down
5 changes: 5 additions & 0 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,8 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo

return resQuery
}

func (app *Application) PreprocessTxs(
req types.RequestPreprocessTxs) types.ResponsePreprocessTxs {
return types.ResponsePreprocessTxs{Txs: req.Txs}
}
5 changes: 5 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk(
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}

func (app *PersistentKVStoreApplication) PreprocessTxs(
req types.RequestPreprocessTxs) types.ResponsePreprocessTxs {
return types.ResponsePreprocessTxs{Txs: req.Txs}
}

//---------------------------------------------
// update validators

Expand Down
3 changes: 3 additions & 0 deletions abci/server/socket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
case *types.Request_ApplySnapshotChunk:
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
responses <- types.ToResponseApplySnapshotChunk(res)
case *types.Request_PreprocessTxs:
res := s.app.PreprocessTxs(*r.PreprocessTxs)
responses <- types.ToResponsePreprocessTx(res)
default:
responses <- types.ToResponseException("Unknown request")
}
Expand Down
22 changes: 17 additions & 5 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
// to be driven by a blockchain-based replication engine via the ABCI.
// All methods take a RequestXxx argument and return a ResponseXxx argument,
// except CheckTx/DeliverTx, which take `tx []byte`, and `Commit`, which takes nothing.
// nolint:lll // ignore for interface
type Application interface {
// Info/Query Connection
Info(RequestInfo) ResponseInfo // Return application info
Expand All @@ -17,11 +18,12 @@ type Application interface {
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool

// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
Commit() ResponseCommit // Commit the state and return the application Merkle root hash
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
Commit() ResponseCommit // Commit the state and return the application Merkle root hash
PreprocessTxs(RequestPreprocessTxs) ResponsePreprocessTxs // State machine preprocessing of txs

// State Sync Connection
ListSnapshots(RequestListSnapshots) ResponseListSnapshots // List available snapshots
Expand Down Expand Up @@ -90,6 +92,10 @@ func (BaseApplication) ApplySnapshotChunk(req RequestApplySnapshotChunk) Respons
return ResponseApplySnapshotChunk{}
}

func (BaseApplication) PreprocessTxs(req RequestPreprocessTxs) ResponsePreprocessTxs {
return ResponsePreprocessTxs{}
}

//-------------------------------------------------------

// GRPCApplication is a GRPC wrapper for Application
Expand Down Expand Up @@ -172,3 +178,9 @@ func (app *GRPCApplication) ApplySnapshotChunk(
res := app.app.ApplySnapshotChunk(*req)
return &res, nil
}

func (app *GRPCApplication) PreprocessTxs(
ctx context.Context, req *RequestPreprocessTxs) (*ResponsePreprocessTxs, error) {
res := app.app.PreprocessTxs(*req)
return &res, nil
}
12 changes: 12 additions & 0 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ func ToRequestApplySnapshotChunk(req RequestApplySnapshotChunk) *Request {
}
}

func ToRequestPreprocessTxs(res RequestPreprocessTxs) *Request {
return &Request{
Value: &Request_PreprocessTxs{&res},
}
}

//----------------------------------------

func ToResponseException(errStr string) *Response {
Expand Down Expand Up @@ -200,3 +206,9 @@ func ToResponseApplySnapshotChunk(res ResponseApplySnapshotChunk) *Response {
Value: &Response_ApplySnapshotChunk{&res},
}
}

func ToResponsePreprocessTx(res ResponsePreprocessTxs) *Response {
return &Response{
Value: &Response_PreprocessTxs{&res},
}
}
Loading

0 comments on commit fee0e1d

Please sign in to comment.