diff --git a/cmd/blobstream/query/cmd.go b/cmd/blobstream/query/cmd.go index cbce2ebc..bf17638c 100644 --- a/cmd/blobstream/query/cmd.go +++ b/cmd/blobstream/query/cmd.go @@ -52,10 +52,28 @@ func Command() *cobra.Command { } func Signers() *cobra.Command { + signersCmd := &cobra.Command{ + Use: "signers", + Aliases: []string{"s"}, + Short: "Query orchestrators information that are running blobstream", + SilenceUsage: true, + } + + signersCmd.AddCommand( + SignersNonce(), + SignersRange(), + ) + + signersCmd.SetHelpCommand(&cobra.Command{}) + + return signersCmd +} + +func SignersNonce() *cobra.Command { command := &cobra.Command{ - Use: "signers ", + Use: "nonce ", Args: cobra.ExactArgs(1), - Short: "Queries the Blobstream for attestations signers", + Short: "Queries the Blobstream for attestations signers by nonce", Long: "Queries the Blobstream for attestations signers. The nonce is the attestation nonce that the command" + " will query signatures for. It should be either a specific nonce starting from 2 and on." + " Or, use 'latest' as argument to check the latest attestation nonce", @@ -76,7 +94,10 @@ func Signers() *cobra.Command { ctx, cancel := context.WithCancel(cmd.Context()) defer cancel() - stopFuncs := make([]func() error, 0, 1) + tmQuerier, appQuerier, p2pQuerier, stopFuncs, err := startResources(ctx, config, logger) + if err != nil { + return err + } defer func() { for _, f := range stopFuncs { err := f() @@ -86,70 +107,246 @@ func Signers() *cobra.Command { } }() - // create tm querier and app querier - tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier( - logger, - config.coreRPC, - config.coreGRPC, - config.grpcInsecure, - ) - stopFuncs = append(stopFuncs, stops...) + nonce, err := parseNonce(ctx, appQuerier, args[0]) if err != nil { return err } - - // creating the host - h, err := libp2p.New() - if err != nil { - return err + if nonce == 1 { + return fmt.Errorf("nonce 1 doesn't need to be signed. signatures start from nonce 2") } - addrInfo, err := peer.AddrInfoFromString(config.targetNode) + + qOutput, err := getSignatures(ctx, logger, appQuerier, tmQuerier, p2pQuerier, nonce) if err != nil { return err } - for i := 0; i < 5; i++ { - logger.Debug("connecting to target node...") - err := h.Connect(ctx, *addrInfo) + if config.outputFile == "" { + printConfirms(logger, *qOutput) + } else { + err := writeConfirmsToJSONFile(logger, *qOutput, config.outputFile) if err != nil { - logger.Error("couldn't connect to target node", "err", err.Error()) + return err } - if err == nil { - logger.Debug("connected to target node") - break - } - time.Sleep(5 * time.Second) } + return nil + }, + } + return addFlags(command) +} - // creating the data store - dataStore := dssync.MutexWrap(ds.NewMapDatastore()) +func startResources(ctx context.Context, config Config, logger tmlog.Logger) (*rpc.TmQuerier, *rpc.AppQuerier, *p2p.Querier, []func() error, error) { + stopFuncs := make([]func() error, 0, 1) - // creating the dht - dht, err := p2p.NewBlobstreamDHT(cmd.Context(), h, dataStore, []peer.AddrInfo{}, logger) + // create tm querier and app querier + tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier( + logger, + config.coreRPC, + config.coreGRPC, + config.grpcInsecure, + ) + stopFuncs = append(stopFuncs, stops...) + if err != nil { + return nil, nil, nil, stopFuncs, err + } + + // creating the host + h, err := libp2p.New() + if err != nil { + return nil, nil, nil, stopFuncs, err + } + addrInfo, err := peer.AddrInfoFromString(config.targetNode) + if err != nil { + return nil, nil, nil, stopFuncs, err + } + for i := 0; i < 5; i++ { + logger.Debug("connecting to target node...") + err := h.Connect(ctx, *addrInfo) + if err != nil { + logger.Error("couldn't connect to target node", "err", err.Error()) + } + if err == nil { + logger.Debug("connected to target node") + break + } + time.Sleep(5 * time.Second) + } + + // creating the data store + dataStore := dssync.MutexWrap(ds.NewMapDatastore()) + + // creating the dht + dht, err := p2p.NewBlobstreamDHT(ctx, h, dataStore, []peer.AddrInfo{}, logger) + if err != nil { + return nil, nil, nil, stopFuncs, err + } + + // creating the p2p querier + p2pQuerier := p2p.NewQuerier(dht, logger) + return tmQuerier, appQuerier, p2pQuerier, stopFuncs, nil +} + +func SignersRange() *cobra.Command { + command := &cobra.Command{ + Use: "range ", + Args: cobra.ExactArgs(2), + Short: "Queries the Blobstream for the orchestrators that signed a certain range of nonces", + Long: "Queries the Blobstream for attestations signers. The range of the nonces can start from two" + + " and can use the keyword `latest` to denominate the latest nonce. The range is end exclusive.", + RunE: func(cmd *cobra.Command, args []string) error { + // creating the logger + logger := tmlog.NewTMLogger(os.Stdout) + fileConfig, err := tryToGetExistingConfig(cmd, logger) + if err != nil { + return err + } + config, err := parseFlags(cmd, &fileConfig) if err != nil { return err } - // creating the p2p querier - p2pQuerier := p2p.NewQuerier(dht, logger) + logger.Debug("initializing queriers") - nonce, err := parseNonce(ctx, appQuerier, args[0]) + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + tmQuerier, appQuerier, p2pQuerier, stopFuncs, err := startResources(ctx, config, logger) if err != nil { return err } - if nonce == 1 { - return fmt.Errorf("nonce 1 doesn't need to be signed. signatures start from nonce 2") - } + defer func() { + for _, f := range stopFuncs { + err := f() + if err != nil { + logger.Error(err.Error()) + } + } + }() - err = getSignaturesAndPrintThem(ctx, logger, appQuerier, tmQuerier, p2pQuerier, nonce, config.outputFile) + if args[0] == "latest" { + return fmt.Errorf("start nonce can't be the `latest` nonce") + } + startNonce, err := parseNonce(ctx, appQuerier, args[0]) + if err != nil { + return err + } + endNonce, err := parseNonce(ctx, appQuerier, args[1]) + if err != nil { + return err + } + err = validateRange(startNonce, endNonce) if err != nil { return err } + + qOutputs := make([]*queryOutput, endNonce-startNonce) + for nonce := startNonce; nonce < endNonce; nonce++ { + qOutputs[nonce-startNonce], err = getSignatures(ctx, logger, appQuerier, tmQuerier, p2pQuerier, nonce) + if err != nil { + return err + } + } + + signersMap := make(map[validatorInfo]uint64) + for _, qOutput := range qOutputs { + for _, sig := range qOutput.Signatures { + if sig.Signed { + // increment the number of the signatures + signersMap[validatorInfo{ + EvmAddress: sig.EvmAddress, + Moniker: sig.Moniker, + ValopAddress: sig.ValopAddress, + }]++ + } else { + // keep the same number of signatures the same but still have the value in the map + signersMap[validatorInfo{ + EvmAddress: sig.EvmAddress, + Moniker: sig.Moniker, + ValopAddress: sig.ValopAddress, + }] = signersMap[validatorInfo{ + EvmAddress: sig.EvmAddress, + Moniker: sig.Moniker, + ValopAddress: sig.ValopAddress, + }] + } + } + } + + if config.outputFile == "" { + printSignersForRange(logger, signersMap) + } else { + err := writeSignersForRangeToJSONFile(logger, signersMap, config.outputFile) + if err != nil { + return err + } + } + return nil }, } return addFlags(command) } +func writeSignersForRangeToJSONFile(logger tmlog.Logger, signersMap map[validatorInfo]uint64, outputFile string) error { + logger.Info("writing number of signatures to json file", "path", outputFile) + + file, err := os.OpenFile(outputFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) + if err != nil { + return err + } + defer func(file *os.File) { + err := file.Close() + if err != nil { + logger.Error("failed to close file", "err", err.Error()) + } + }(file) + + encoder := json.NewEncoder(file) + type encodedSignersStruct struct { + EvmAddress string `json:"evmAddress"` + Moniker string `json:"moniker"` + ValopAddress string `json:"valopAddress"` + NumberOfSignatures uint64 `json:"number_of_signatures"` + } + encodedSigners := make([]encodedSignersStruct, 0) + for key, val := range signersMap { + encodedSigners = append(encodedSigners, encodedSignersStruct{ + EvmAddress: key.EvmAddress, + Moniker: key.Moniker, + ValopAddress: key.ValopAddress, + NumberOfSignatures: val, + }) + } + err = encoder.Encode(encodedSigners) + if err != nil { + return err + } + + logger.Info("output written to file successfully", "path", outputFile) + return nil +} + +func printSignersForRange(logger tmlog.Logger, signersMap map[validatorInfo]uint64) { + for signer, numberOfSignatures := range signersMap { + logger.Info(signer.Moniker, "evm_address", signer.EvmAddress, "valop_address", signer.ValopAddress, "number_of_signatures", numberOfSignatures) + } + logger.Info("done") +} + +func validateRange(start uint64, end uint64) error { + if start == 0 { + return fmt.Errorf("start cannot be 0. attestations start at 1") + } + if end == 0 { + return fmt.Errorf("end cannot be 0. attestations start at 1") + } + if start == end { + return fmt.Errorf("start cannot be equal to end") + } + if start > end { + return fmt.Errorf("start cannot be higher than end") + } + return nil +} + type signature struct { EvmAddress string `json:"evmAddress"` Moniker string `json:"moniker"` @@ -172,59 +369,51 @@ type queryOutput struct { CanRelay bool `json:"can_relay"` } -func getSignaturesAndPrintThem( +func getSignatures( ctx context.Context, logger tmlog.Logger, appQuerier *rpc.AppQuerier, tmQuerier *rpc.TmQuerier, p2pQuerier *p2p.Querier, nonce uint64, - outputFile string, -) error { +) (*queryOutput, error) { logger.Info("getting signatures for nonce", "nonce", nonce) lastValset, err := appQuerier.QueryLastValsetBeforeNonce(ctx, nonce) if err != nil { - return err + return nil, err } validatorSet, err := appQuerier.QueryStakingValidatorSet(ctx) if err != nil { - return err + return nil, err } validatorsInfo, err := toValidatorsInfo(ctx, appQuerier, validatorSet) if err != nil { - return err + return nil, err } att, err := appQuerier.QueryAttestationByNonce(ctx, nonce) if err != nil { - return err + return nil, err } if att == nil { - return celestiatypes.ErrAttestationNotFound + return nil, celestiatypes.ErrAttestationNotFound } switch castedAtt := att.(type) { case *celestiatypes.Valset: signBytes, err := castedAtt.SignBytes() if err != nil { - return err + return nil, err } confirms, err := p2pQuerier.QueryValsetConfirms(ctx, nonce, *lastValset, signBytes.Hex()) if err != nil { - return err + return nil, err } qOutput := toQueryOutput(toValsetConfirmsMap(confirms), validatorsInfo, nonce, *lastValset) - if outputFile == "" { - printConfirms(logger, qOutput) - } else { - err := writeConfirmsToJSONFile(logger, qOutput, outputFile) - if err != nil { - return err - } - } + return &qOutput, nil case *celestiatypes.DataCommitment: commitment, err := tmQuerier.QueryCommitment( ctx, @@ -232,26 +421,18 @@ func getSignaturesAndPrintThem( castedAtt.EndBlock, ) if err != nil { - return err + return nil, err } dataRootHash := types.DataCommitmentTupleRootSignBytes(big.NewInt(int64(castedAtt.Nonce)), commitment) confirms, err := p2pQuerier.QueryDataCommitmentConfirms(ctx, *lastValset, nonce, dataRootHash.Hex()) if err != nil { - return err + return nil, err } qOutput := toQueryOutput(toDataCommitmentConfirmsMap(confirms), validatorsInfo, nonce, *lastValset) - if outputFile == "" { - printConfirms(logger, qOutput) - } else { - err := writeConfirmsToJSONFile(logger, qOutput, outputFile) - if err != nil { - return err - } - } + return &qOutput, nil default: - return errors.Wrap(types.ErrUnknownAttestationType, strconv.FormatUint(nonce, 10)) + return nil, errors.Wrap(types.ErrUnknownAttestationType, strconv.FormatUint(nonce, 10)) } - return nil } func toValidatorsInfo(ctx context.Context, appQuerier *rpc.AppQuerier, validatorSet []stakingtypes.Validator) (map[string]validatorInfo, error) {