From 3969283fb69d67788913105eeb63d02ce470bb45 Mon Sep 17 00:00:00 2001 From: yihuang Date: Wed, 21 Sep 2022 08:53:42 +0800 Subject: [PATCH] Problem: state streamers are not integrated (backport #702) Solution: - integration the basic file streamer * add integration test * changelog * fix build * fix lint * fix deliver tx event in cosmos-sdk * fix integration test * Update integration_tests/test_streamer.py Signed-off-by: yihuang * update ethermint and fix build * add a small cli utility into test_streamer.py * fix integration test * update sdk to upstream Signed-off-by: yihuang --- CHANGELOG.md | 4 ++ app/app.go | 32 +++++++++ client/flags.go | 3 + cmd/cronosd/cmd/root.go | 2 + integration_tests/configs/default.jsonnet | 2 +- integration_tests/network.py | 7 +- integration_tests/poetry.lock | 19 +++++- integration_tests/pyproject.toml | 1 + integration_tests/test_streamer.py | 80 +++++++++++++++++++++++ nix/testenv.nix | 6 ++ 10 files changed, 150 insertions(+), 6 deletions(-) create mode 100644 client/flags.go create mode 100644 integration_tests/test_streamer.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d3928cfab3..74332b2bdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ - [cronos#719](https://github.com/crypto-org-chain/cronos/pull/719) Fix `eth_call` for legacy blocks (backport #713). +### Improvements + +- [cronos#721](https://github.com/crypto-org-chain/cronos/pull/721) Integrate the file state streamer (backport #702). + *Sep 20, 2022* ## v0.9.0-beta3 diff --git a/app/app.go b/app/app.go index 5bebef054b..8c92ba5d51 100644 --- a/app/app.go +++ b/app/app.go @@ -5,6 +5,7 @@ import ( "net/http" "os" "path/filepath" + "sync" "github.com/crypto-org-chain/cronos/x/cronos/middleware" @@ -19,11 +20,13 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server/api" "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" + "github.com/cosmos/cosmos-sdk/store/streaming/file" storetypes "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/testutil/testdata" sdk "github.com/cosmos/cosmos-sdk/types" @@ -118,6 +121,7 @@ import ( gravitytypes "github.com/peggyjv/gravity-bridge/module/v2/x/gravity/types" // this line is used by starport scaffolding # stargate/app/moduleImport + cronosappclient "github.com/crypto-org-chain/cronos/client" "github.com/crypto-org-chain/cronos/x/cronos" cronosclient "github.com/crypto-org-chain/cronos/x/cronos/client" cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper" @@ -142,6 +146,8 @@ const ( // // NOTE: In the SDK, the default value is 255. AddrLen = 20 + + FileStreamerDirectory = "file_streamer" ) // this line is used by starport scaffolding # stargate/wasm/app/enabledProposals @@ -341,6 +347,32 @@ func New( tkeys := sdk.NewTransientStoreKeys(paramstypes.TStoreKey, evmtypes.TransientKey, feemarkettypes.TransientKey) memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey) + // configure state listening capabilities using AppOptions + // we are doing nothing with the returned streamingServices and waitGroup in this case + // Only support file streamer right now. + if cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) == "file" { + streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory) + if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil { + panic(err) + } + + // default to exposing all + exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys)) + for _, storeKey := range keys { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec) + if err != nil { + panic(err) + } + bApp.SetStreamingService(service) + + wg := new(sync.WaitGroup) + if err := service.Stream(wg); err != nil { + panic(err) + } + } + app := &App{ BaseApp: bApp, cdc: cdc, diff --git a/client/flags.go b/client/flags.go new file mode 100644 index 0000000000..96926045f6 --- /dev/null +++ b/client/flags.go @@ -0,0 +1,3 @@ +package client + +const FlagStreamers = "streamers" diff --git a/cmd/cronosd/cmd/root.go b/cmd/cronosd/cmd/root.go index 1066ae8bbe..0357883043 100644 --- a/cmd/cronosd/cmd/root.go +++ b/cmd/cronosd/cmd/root.go @@ -45,6 +45,7 @@ import ( ethermint "github.com/evmos/ethermint/types" "github.com/crypto-org-chain/cronos/app" + cronosclient "github.com/crypto-org-chain/cronos/client" // this line is used by starport scaffolding # stargate/root/import ) @@ -147,6 +148,7 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) { func addModuleInitFlags(startCmd *cobra.Command) { crisis.AddModuleInitFlags(startCmd) cronos.AddModuleInitFlags(startCmd) + startCmd.Flags().String(cronosclient.FlagStreamers, "", "Enable streamers, only file streamer is supported right now") // this line is used by starport scaffolding # stargate/root/initFlags } diff --git a/integration_tests/configs/default.jsonnet b/integration_tests/configs/default.jsonnet index da06e0e42d..12ca83f47d 100644 --- a/integration_tests/configs/default.jsonnet +++ b/integration_tests/configs/default.jsonnet @@ -2,7 +2,7 @@ dotenv: '../../scripts/.env', 'cronos_777-1': { cmd: 'cronosd', - 'start-flags': '--trace', + 'start-flags': '--trace --streamers file', config: { mempool: { version: 'v1', diff --git a/integration_tests/network.py b/integration_tests/network.py index 0bc5240c92..b9f69e1caa 100644 --- a/integration_tests/network.py +++ b/integration_tests/network.py @@ -55,9 +55,10 @@ def node_rpc(self, i): return "tcp://127.0.0.1:%d" % ports.rpc_port(self.base_port(i)) def cosmos_cli(self, i=0): - return CosmosCLI( - self.base_dir / f"node{i}", self.node_rpc(i), self.chain_binary - ) + return CosmosCLI(self.node_home(i), self.node_rpc(i), self.chain_binary) + + def node_home(self, i=0): + return self.base_dir / f"node{i}" def use_websocket(self, use=True): self._w3 = None diff --git a/integration_tests/poetry.lock b/integration_tests/poetry.lock index 27b1bb5426..9b0b08e93d 100644 --- a/integration_tests/poetry.lock +++ b/integration_tests/poetry.lock @@ -146,6 +146,21 @@ category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +[[package]] +name = "cprotobuf" +version = "0.1.10" +description = "pythonic and high performance protocol buffer implementation." +category = "main" +optional = false +python-versions = "*" +develop = false + +[package.source] +type = "git" +url = "https://github.com/yihuang/cprotobuf.git" +reference = "master" +resolved_reference = "711f2e2f5619ca4308caf9c971355ce62598c765" + [[package]] name = "cytoolz" version = "0.11.2" @@ -1079,7 +1094,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest- [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "4c42654857d14b05711431944ce50438ba9f430f8922600b9d3cfb8420fd4dc6" +content-hash = "0dd8419b0eba0ed6f49c4f485e1894f5e24127acf71beb7c8d31aa8bce6388e2" [metadata.files] aiohttp = [ @@ -1222,8 +1237,8 @@ click = [ ] colorama = [ {file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"}, - {file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"}, ] +cprotobuf = [] cytoolz = [ {file = "cytoolz-0.11.2.tar.gz", hash = "sha256:ea23663153806edddce7e4153d1d407d62357c05120a4e8485bddf1bd5ab22b4"}, ] diff --git a/integration_tests/pyproject.toml b/integration_tests/pyproject.toml index 6a53f000ae..f5b3e565b2 100644 --- a/integration_tests/pyproject.toml +++ b/integration_tests/pyproject.toml @@ -26,6 +26,7 @@ toml = "^0.10.2" pysha3 = "^1.0.2" jsonnet = "^0.18.0" eth-account = { git = "https://github.com/mmsqe/eth-account.git", branch = "v0.5.8-rc0" } +cprotobuf = { git = "https://github.com/yihuang/cprotobuf.git" } [tool.poetry.dev-dependencies] diff --git a/integration_tests/test_streamer.py b/integration_tests/test_streamer.py new file mode 100644 index 0000000000..249aa346f2 --- /dev/null +++ b/integration_tests/test_streamer.py @@ -0,0 +1,80 @@ +from cprotobuf import Field, ProtoEntity, decode_primitive +from hexbytes import HexBytes + +from .utils import ADDRS + + +class StoreKVPairs(ProtoEntity): + # the store key for the KVStore this pair originates from + store_key = Field("string", 1) + # true indicates a delete operation + delete = Field("bool", 2) + key = Field("bytes", 3) + value = Field("bytes", 4) + + +def decode_stream_file(data, body_cls=StoreKVPairs, header_cls=None, footer_cls=None): + """ + header, body*, footer + """ + header = footer = None + body = [] + offset = 0 + size, n = decode_primitive(data, "uint64") + offset += n + + # header + if header_cls is not None: + header = header_cls() + header.ParseFromString(data[offset : offset + size]) + offset += size + + while True: + size, n = decode_primitive(data[offset:], "uint64") + offset += n + if offset + size == len(data): + # footer + if footer_cls is not None: + footer = footer_cls() + footer.ParseFromString(data[offset : offset + size]) + offset += size + break + else: + # body + if body_cls is not None: + item = body_cls() + item.ParseFromString(data[offset : offset + size]) + body.append(item) + offset += size + return header, body, footer + + +def test_streamers(cronos): + """ + - check the streaming files are created + - try to parse the state change sets + """ + # inspect the first state change of the first tx in genesis + path = cronos.node_home(0) / "data/file_streamer/block-0-tx-0" + _, body, _ = decode_stream_file(open(path, "rb").read()) + # creation of the validator account + assert body[0].store_key == "acc" + # the order in gen_txs is undeterministic, could be either one. + assert body[0].key in ( + b"\x01" + HexBytes(ADDRS["validator"]), + b"\x01" + HexBytes(ADDRS["validator2"]), + ) + + +if __name__ == "__main__": + import binascii + import sys + + _, body, _ = decode_stream_file(open(sys.argv[1], "rb").read()) + for item in body: + print( + item.store_key, + item.delete, + binascii.hexlify(item.key).decode(), + binascii.hexlify(item.value).decode(), + ) diff --git a/nix/testenv.nix b/nix/testenv.nix index b099989cb1..c5b9ef023a 100644 --- a/nix/testenv.nix +++ b/nix/testenv.nix @@ -14,5 +14,11 @@ pkgs.poetry2nix.mkPoetryEnv { nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ self.poetry ]; } ); + + cprotobuf = super.cprotobuf.overridePythonAttrs ( + old: { + nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ self.cython ]; + } + ); }); }