From 9b359d45e3b4d242ec0e8b2f74a7af163f960f4c Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sun, 29 Jan 2023 12:49:57 -0700 Subject: [PATCH] Add client ICQ impl --- .../stride/interchainquery/v1/messages.proto | 3 +- relayer/chains/cosmos/event_parser.go | 65 ++++++++++++++ relayer/chains/cosmos/message_handlers.go | 23 +++++ relayer/chains/cosmos/stride/codec.go | 24 +++++ relayer/chains/cosmos/stride/messages.go | 51 +++++++++++ relayer/chains/cosmos/stride/messages.pb.go | 57 ++++++------ relayer/chains/cosmos/tx.go | 39 +++++++++ relayer/processor/path_end_runtime.go | 75 +++++++++++++++- relayer/processor/path_processor.go | 3 + relayer/processor/path_processor_internal.go | 87 ++++++++++++++++++- relayer/processor/types.go | 48 ++++++++++ relayer/processor/types_internal.go | 32 +++++-- relayer/provider/provider.go | 33 +++++++ scripts/protocgen.sh | 2 +- 14 files changed, 501 insertions(+), 41 deletions(-) create mode 100644 relayer/chains/cosmos/stride/codec.go create mode 100644 relayer/chains/cosmos/stride/messages.go diff --git a/proto/stride/interchainquery/v1/messages.proto b/proto/stride/interchainquery/v1/messages.proto index c02f66e7b..8c974d9a6 100644 --- a/proto/stride/interchainquery/v1/messages.proto +++ b/proto/stride/interchainquery/v1/messages.proto @@ -6,7 +6,8 @@ import "gogoproto/gogo.proto"; import "cosmos_proto/cosmos.proto"; import "tendermint/crypto/proof.proto"; -option go_package = "github.com/Stride-Labs/stride/v5/x/interchainquery/types"; +// NOTE: copied into this repository from "github.com/Stride-Labs/stride/v5/x/interchainquery/types" +option go_package = "github.com/cosmos/relayer/relayer/chains/cosmos/stride"; // MsgSubmitQueryResponse represents a message type to fulfil a query request. message MsgSubmitQueryResponse { diff --git a/relayer/chains/cosmos/event_parser.go b/relayer/chains/cosmos/event_parser.go index ddc321dc9..c2f7cbe57 100644 --- a/relayer/chains/cosmos/event_parser.go +++ b/relayer/chains/cosmos/event_parser.go @@ -2,6 +2,7 @@ package cosmos import ( "encoding/hex" + "fmt" "strconv" "strings" @@ -103,6 +104,17 @@ func parseIBCMessageFromEvent( eventType: event.Type, info: ci, } + + case processor.ClientICQTypeRequest, processor.ClientICQTypeResponse: + ci := &clientICQInfo{ + Height: height, + Source: chainID, + } + ci.parseAttrs(log, event.Attributes) + return &ibcMessage{ + eventType: event.Type, + info: ci, + } } return nil } @@ -379,3 +391,56 @@ func (res *connectionInfo) parseConnectionAttribute(attr sdk.Attribute) { res.CounterpartyClientID = attr.Value } } + +type clientICQInfo struct { + Source string + Connection string + Chain string + QueryID provider.ClientICQQueryID + Type string + Request []byte + Height uint64 +} + +func (res *clientICQInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("connection_id", res.Connection) + enc.AddString("chain_id", res.Chain) + enc.AddString("query_id", string(res.QueryID)) + enc.AddString("type", res.Type) + enc.AddString("request", hex.EncodeToString(res.Request)) + enc.AddUint64("height", res.Height) + + return nil +} + +func (res *clientICQInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { + for _, attr := range attrs { + if err := res.parseAttribute(attr); err != nil { + panic(fmt.Errorf("failed to parse attributes from client ICQ message: %w", err)) + } + } +} + +func (res *clientICQInfo) parseAttribute(attr sdk.Attribute) (err error) { + switch attr.Key { + case "connection_id": + res.Connection = attr.Value + case "chain_id": + res.Chain = attr.Value + case "query_id": + res.QueryID = provider.ClientICQQueryID(attr.Value) + case "type": + res.Type = attr.Value + case "request": + res.Request, err = hex.DecodeString(attr.Value) + if err != nil { + return err + } + case "height": + res.Height, err = strconv.ParseUint(attr.Value, 10, 64) + if err != nil { + return err + } + } + return nil +} diff --git a/relayer/chains/cosmos/message_handlers.go b/relayer/chains/cosmos/message_handlers.go index b28da07b2..92ee332bf 100644 --- a/relayer/chains/cosmos/message_handlers.go +++ b/relayer/chains/cosmos/message_handlers.go @@ -2,6 +2,7 @@ package cosmos import ( "context" + "encoding/hex" conntypes "github.com/cosmos/ibc-go/v5/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v5/modules/core/04-channel/types" @@ -21,6 +22,8 @@ func (ccp *CosmosChainProcessor) handleMessage(ctx context.Context, m ibcMessage ccp.handleConnectionMessage(m.eventType, provider.ConnectionInfo(*t), c) case *clientInfo: ccp.handleClientMessage(ctx, m.eventType, *t) + case *clientICQInfo: + ccp.handleClientICQMessage(m.eventType, provider.ClientICQInfo(*t), c) } } @@ -133,6 +136,15 @@ func (ccp *CosmosChainProcessor) handleClientMessage(ctx context.Context, eventT ccp.logObservedIBCMessage(eventType, zap.String("client_id", ci.clientID)) } +func (ccp *CosmosChainProcessor) handleClientICQMessage( + eventType string, + ci provider.ClientICQInfo, + c processor.IBCMessagesCache, +) { + c.ClientICQ.Retain(processor.ClientICQType(eventType), ci) + ccp.logClientICQMessage(eventType, ci) +} + func (ccp *CosmosChainProcessor) logObservedIBCMessage(m string, fields ...zap.Field) { ccp.log.With(zap.String("event_type", m)).Debug("Observed IBC message", fields...) } @@ -178,3 +190,14 @@ func (ccp *CosmosChainProcessor) logConnectionMessage(message string, ci provide zap.String("counterparty_connection_id", ci.CounterpartyConnID), ) } + +func (ccp *CosmosChainProcessor) logClientICQMessage(icqType string, ci provider.ClientICQInfo) { + ccp.logObservedIBCMessage(icqType, + zap.String("type", ci.Type), + zap.String("query_id", string(ci.QueryID)), + zap.String("request", hex.EncodeToString(ci.Request)), + zap.String("chain_id", ci.Chain), + zap.String("connection_id", ci.Connection), + zap.Uint64("height", ci.Height), + ) +} diff --git a/relayer/chains/cosmos/stride/codec.go b/relayer/chains/cosmos/stride/codec.go new file mode 100644 index 000000000..f1d869225 --- /dev/null +++ b/relayer/chains/cosmos/stride/codec.go @@ -0,0 +1,24 @@ +package stride + +import ( + "github.com/cosmos/cosmos-sdk/codec" + cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" +) + +// Originally sourced from https://github.com/Stride-Labs/stride/blob/v5.1.1/x/interchainquery/types/codec.go +// Needed for cosmos sdk Msg implementation in messages.go. + +var ( + amino = codec.NewLegacyAmino() + ModuleCdc = codec.NewAminoCodec(amino) +) + +func RegisterLegacyAminoCodec(cdc *codec.LegacyAmino) { + cdc.RegisterConcrete(&MsgSubmitQueryResponse{}, "/stride.interchainquery.MsgSubmitQueryResponse", nil) +} + +func init() { + RegisterLegacyAminoCodec(amino) + cryptocodec.RegisterCrypto(amino) + amino.Seal() +} diff --git a/relayer/chains/cosmos/stride/messages.go b/relayer/chains/cosmos/stride/messages.go new file mode 100644 index 000000000..29bdf5ed7 --- /dev/null +++ b/relayer/chains/cosmos/stride/messages.go @@ -0,0 +1,51 @@ +package stride + +import ( + sdk "github.com/cosmos/cosmos-sdk/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" +) + +// Originally sourced from https://github.com/Stride-Labs/stride/blob/v5.1.1/x/interchainquery/types/msgs.go +// Needed for cosmos sdk Msg implementation. + +// interchainquery message types +const ( + TypeMsgSubmitQueryResponse = "submitqueryresponse" + + // RouterKey is the message route for icq + RouterKey = "interchainquery" +) + +var _ sdk.Msg = &MsgSubmitQueryResponse{} + +// Route Implements Msg. +func (msg MsgSubmitQueryResponse) Route() string { return RouterKey } + +// Type Implements Msg. +func (msg MsgSubmitQueryResponse) Type() string { return TypeMsgSubmitQueryResponse } + +// ValidateBasic Implements Msg. +func (msg MsgSubmitQueryResponse) ValidateBasic() error { + // check from address + _, err := sdk.AccAddressFromBech32(msg.FromAddress) + if err != nil { + return sdkerrors.Wrapf(sdkerrors.ErrInvalidAddress, "invalid fromAddress in ICQ response (%s)", err) + } + // check chain_id is not empty + if msg.ChainId == "" { + return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "chain_id cannot be empty in ICQ response") + } + + return nil +} + +// GetSignBytes Implements Msg. +func (msg MsgSubmitQueryResponse) GetSignBytes() []byte { + return sdk.MustSortJSON(ModuleCdc.MustMarshalJSON(&msg)) +} + +// GetSigners Implements Msg. +func (msg MsgSubmitQueryResponse) GetSigners() []sdk.AccAddress { + fromAddress, _ := sdk.AccAddressFromBech32(msg.FromAddress) + return []sdk.AccAddress{fromAddress} +} diff --git a/relayer/chains/cosmos/stride/messages.pb.go b/relayer/chains/cosmos/stride/messages.pb.go index c94323b22..b22c6434f 100644 --- a/relayer/chains/cosmos/stride/messages.pb.go +++ b/relayer/chains/cosmos/stride/messages.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: stride/interchainquery/v1/messages.proto -package types +package stride import ( fmt "fmt" @@ -77,34 +77,33 @@ func init() { } var fileDescriptor_25adad4f8ed32400 = []byte{ - // 425 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x92, 0xbd, 0x6e, 0xd4, 0x40, - 0x10, 0xc7, 0xbd, 0x09, 0x5c, 0x12, 0xe7, 0x10, 0x60, 0x4e, 0xc8, 0x09, 0xc2, 0x3e, 0xb9, 0x32, - 0x45, 0x76, 0x15, 0x10, 0x12, 0x0a, 0x15, 0xd7, 0x45, 0xe2, 0x73, 0xaf, 0xa3, 0x39, 0xf9, 0x63, - 0x63, 0xaf, 0x14, 0x7b, 0x97, 0x9d, 0xf5, 0x09, 0xbf, 0x01, 0x25, 0x25, 0xe5, 0x3d, 0x04, 0x0f, - 0x41, 0x19, 0x51, 0x51, 0x9d, 0xd0, 0x5d, 0x03, 0xed, 0x3d, 0x01, 0xf2, 0xae, 0x03, 0x28, 0xe9, - 0xc6, 0xf3, 0xff, 0x8d, 0xff, 0x33, 0x3b, 0xe3, 0xc6, 0xa0, 0x15, 0xcf, 0x19, 0xe1, 0xb5, 0x66, - 0x2a, 0x2b, 0x13, 0x5e, 0x7f, 0x68, 0x98, 0x6a, 0xc9, 0xfc, 0x98, 0x54, 0x0c, 0x20, 0x29, 0x18, - 0x60, 0xa9, 0x84, 0x16, 0xde, 0x81, 0x25, 0xf1, 0x15, 0x12, 0xcf, 0x8f, 0x0f, 0x47, 0x85, 0x28, - 0x84, 0xa1, 0x48, 0x17, 0xd9, 0x82, 0xc3, 0x83, 0x4c, 0x40, 0x25, 0x60, 0x66, 0x05, 0xfb, 0xd1, - 0x4b, 0x0f, 0x35, 0xab, 0x73, 0xa6, 0x2a, 0x5e, 0x6b, 0x92, 0xa9, 0x56, 0x6a, 0x41, 0xa4, 0x12, - 0xe2, 0xcc, 0xca, 0xd1, 0xef, 0x2d, 0xf7, 0xfe, 0x2b, 0x28, 0xa6, 0x4d, 0x5a, 0x71, 0xfd, 0xae, - 0x73, 0xa1, 0x0c, 0xa4, 0xa8, 0x81, 0x79, 0xd8, 0xdd, 0x35, 0xde, 0x33, 0x9e, 0xfb, 0x68, 0x8c, - 0xe2, 0xbd, 0xc9, 0xbd, 0xcd, 0x32, 0xbc, 0xdd, 0x26, 0xd5, 0xf9, 0x49, 0x74, 0xa9, 0x44, 0x74, - 0xc7, 0x84, 0xa7, 0x79, 0xc7, 0x9b, 0x36, 0x3b, 0x7e, 0xeb, 0x2a, 0x7f, 0xa9, 0x44, 0x74, 0xc7, - 0x84, 0xa7, 0xb9, 0xf7, 0xc8, 0x1d, 0x28, 0x06, 0xcd, 0xb9, 0xf6, 0xb7, 0xc7, 0x28, 0x1e, 0x4e, - 0xee, 0x6e, 0x96, 0xe1, 0x2d, 0x4b, 0xdb, 0x7c, 0x44, 0x7b, 0xc0, 0x7b, 0xed, 0xee, 0x99, 0xa6, - 0x67, 0x42, 0x82, 0x7f, 0x63, 0x8c, 0xe2, 0xfd, 0xc7, 0x0f, 0xf0, 0xbf, 0xc1, 0xb0, 0x1d, 0x0c, - 0xbf, 0xed, 0x98, 0x37, 0x12, 0x26, 0xa3, 0xcd, 0x32, 0xbc, 0x63, 0x7f, 0xf5, 0xb7, 0x2e, 0xa2, - 0xbb, 0xb2, 0xd7, 0x3b, 0xeb, 0x92, 0xf1, 0xa2, 0xd4, 0xfe, 0xcd, 0x31, 0x8a, 0xb7, 0xff, 0xb7, - 0xb6, 0xf9, 0x88, 0xf6, 0x80, 0xf7, 0xdc, 0x1d, 0x9e, 0x29, 0x51, 0xcd, 0x92, 0x3c, 0x57, 0x0c, - 0xc0, 0x1f, 0x98, 0xc9, 0xfc, 0xef, 0x5f, 0x8f, 0x46, 0xfd, 0x3b, 0xbf, 0xb0, 0xca, 0x54, 0x2b, - 0x5e, 0x17, 0x74, 0xbf, 0xa3, 0xfb, 0xd4, 0xc9, 0xf0, 0xd3, 0x22, 0x74, 0xbe, 0x2c, 0x42, 0xf4, - 0x6b, 0x11, 0x3a, 0x13, 0xfa, 0x6d, 0x15, 0xa0, 0x8b, 0x55, 0x80, 0x7e, 0xae, 0x02, 0xf4, 0x79, - 0x1d, 0x38, 0x17, 0xeb, 0xc0, 0xf9, 0xb1, 0x0e, 0x9c, 0xf7, 0xcf, 0x0a, 0xae, 0xcb, 0x26, 0xc5, - 0x99, 0xa8, 0xc8, 0xd4, 0xec, 0xfe, 0xe8, 0x65, 0x92, 0x02, 0xe9, 0x2f, 0x66, 0xfe, 0x94, 0x7c, - 0xbc, 0x76, 0x36, 0xba, 0x95, 0x0c, 0xd2, 0x81, 0x59, 0xe3, 0x93, 0x3f, 0x01, 0x00, 0x00, 0xff, - 0xff, 0x2b, 0xa5, 0x75, 0x71, 0x5d, 0x02, 0x00, 0x00, + // 415 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x92, 0x3f, 0x0f, 0x93, 0x40, + 0x18, 0x87, 0xb9, 0x56, 0xfb, 0x87, 0xd6, 0xa8, 0xd8, 0x18, 0x5a, 0x23, 0x10, 0x26, 0x1c, 0x3c, + 0x52, 0x4d, 0x1c, 0xea, 0x24, 0x5b, 0x07, 0xb5, 0xd2, 0xcd, 0x85, 0x50, 0xb8, 0xc2, 0x25, 0x85, + 0xc3, 0xbb, 0xa3, 0x09, 0xdf, 0xc0, 0xd1, 0xd1, 0xb1, 0x1f, 0xc2, 0x0f, 0xe1, 0xd8, 0x38, 0x39, + 0x35, 0xa6, 0x5d, 0x74, 0xed, 0x27, 0x30, 0xdc, 0xd1, 0x6a, 0x3a, 0xf1, 0xf2, 0x3e, 0xcf, 0xf1, + 0xbb, 0xf7, 0x0e, 0xd5, 0x61, 0x9c, 0xe2, 0x18, 0xb9, 0x38, 0xe7, 0x88, 0x46, 0x69, 0x88, 0xf3, + 0x4f, 0x25, 0xa2, 0x95, 0xbb, 0x9d, 0xba, 0x19, 0x62, 0x2c, 0x4c, 0x10, 0x83, 0x05, 0x25, 0x9c, + 0x68, 0x63, 0x69, 0xc2, 0x1b, 0x13, 0x6e, 0xa7, 0x93, 0x51, 0x42, 0x12, 0x22, 0x2c, 0xb7, 0xae, + 0xe4, 0x82, 0xc9, 0x38, 0x22, 0x2c, 0x23, 0x2c, 0x90, 0x40, 0xbe, 0x34, 0xe8, 0x29, 0x47, 0x79, + 0x8c, 0x68, 0x86, 0x73, 0xee, 0x46, 0xb4, 0x2a, 0x38, 0x71, 0x0b, 0x4a, 0xc8, 0x5a, 0x62, 0xfb, + 0x4f, 0x4b, 0x7d, 0xfc, 0x96, 0x25, 0xcb, 0x72, 0x95, 0x61, 0xfe, 0xa1, 0x4e, 0xf1, 0x11, 0x2b, + 0x48, 0xce, 0x90, 0x06, 0xd5, 0x9e, 0xc8, 0x0e, 0x70, 0xac, 0x03, 0x0b, 0x38, 0x7d, 0xef, 0xd1, + 0xf9, 0x60, 0xde, 0xaf, 0xc2, 0x6c, 0x33, 0xb3, 0x2f, 0xc4, 0xf6, 0xbb, 0xa2, 0x9c, 0xc7, 0xb5, + 0x2f, 0xb6, 0x59, 0xfb, 0xad, 0x5b, 0xff, 0x42, 0x6c, 0xbf, 0x2b, 0xca, 0x79, 0xac, 0x3d, 0x53, + 0x3b, 0x14, 0xb1, 0x72, 0xc3, 0xf5, 0xb6, 0x05, 0x9c, 0xa1, 0xf7, 0xf0, 0x7c, 0x30, 0xef, 0x49, + 0x5b, 0xf6, 0x6d, 0xbf, 0x11, 0xb4, 0x77, 0x6a, 0x5f, 0x6c, 0x3a, 0x20, 0x05, 0xd3, 0xef, 0x58, + 0xc0, 0x19, 0xbc, 0x78, 0x02, 0xff, 0x0d, 0x06, 0xe5, 0x60, 0x70, 0x51, 0x3b, 0xef, 0x0b, 0xe6, + 0x8d, 0xce, 0x07, 0xf3, 0x81, 0xfc, 0xd4, 0x75, 0x9d, 0xed, 0xf7, 0x8a, 0x86, 0xd7, 0xd1, 0x29, + 0xc2, 0x49, 0xca, 0xf5, 0xbb, 0x16, 0x70, 0xda, 0xff, 0x47, 0xcb, 0xbe, 0xed, 0x37, 0x82, 0xf6, + 0x5a, 0x1d, 0xae, 0x29, 0xc9, 0x82, 0x30, 0x8e, 0x29, 0x62, 0x4c, 0xef, 0x88, 0xc9, 0xf4, 0x1f, + 0xdf, 0x9e, 0x8f, 0x9a, 0x73, 0x7e, 0x23, 0xc9, 0x92, 0x53, 0x9c, 0x27, 0xfe, 0xa0, 0xb6, 0x9b, + 0xd6, 0x6c, 0xf8, 0x79, 0x67, 0x2a, 0x5f, 0x77, 0x26, 0xf8, 0xbd, 0x33, 0x15, 0x6f, 0xf1, 0xfd, + 0x68, 0x80, 0xfd, 0xd1, 0x00, 0xbf, 0x8e, 0x06, 0xf8, 0x72, 0x32, 0x94, 0xfd, 0xc9, 0x50, 0x7e, + 0x9e, 0x0c, 0xe5, 0xe3, 0xab, 0x04, 0xf3, 0xb4, 0x5c, 0xc1, 0x88, 0x64, 0xcd, 0xed, 0xb9, 0x14, + 0x6d, 0xc2, 0x0a, 0xd1, 0xeb, 0x53, 0x9c, 0x32, 0xbb, 0x50, 0xf9, 0x83, 0xac, 0x3a, 0xe2, 0x12, + 0x5f, 0xfe, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x5e, 0x6b, 0x0e, 0x66, 0x5b, 0x02, 0x00, 0x00, } func (m *MsgSubmitQueryResponse) Marshal() (dAtA []byte, err error) { diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index 3f71669cc..f0edda37a 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -24,7 +24,9 @@ import ( host "github.com/cosmos/ibc-go/v5/modules/core/24-host" ibcexported "github.com/cosmos/ibc-go/v5/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v5/modules/light-clients/07-tendermint/types" + strideicqtypes "github.com/cosmos/relayer/v2/relayer/chains/cosmos/stride" "github.com/cosmos/relayer/v2/relayer/provider" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/light" tmtypes "github.com/tendermint/tendermint/types" "go.uber.org/zap" @@ -891,6 +893,43 @@ func (cc *CosmosProvider) MsgUpdateClientHeader(latestHeader provider.IBCHeader, }, nil } +func (cc *CosmosProvider) QueryICQWithProof(ctx context.Context, path string, request []byte, height uint64) (provider.ICQProof, error) { + slashSplit := strings.Split(path, "/") + req := abci.RequestQuery{ + Path: path, + Height: int64(height), + Data: request, + Prove: slashSplit[len(slashSplit)-1] == "key", + } + + res, err := cc.QueryABCI(ctx, req) + if err != nil { + return provider.ICQProof{}, fmt.Errorf("failed to execute interchain query: %w", err) + } + return provider.ICQProof{ + Result: res.Value, + ProofOps: res.ProofOps, + Height: res.Height, + }, nil +} + +func (cc *CosmosProvider) MsgSubmitQueryResponse(chainID string, queryID provider.ClientICQQueryID, proof provider.ICQProof) (provider.RelayerMessage, error) { + signer, err := cc.Address() + if err != nil { + return nil, err + } + msg := &strideicqtypes.MsgSubmitQueryResponse{ + ChainId: chainID, + QueryId: string(queryID), + Result: proof.Result, + ProofOps: proof.ProofOps, + Height: proof.Height, + FromAddress: signer, + } + + return NewCosmosMessage(msg), nil +} + // RelayPacketFromSequence relays a packet with a given seq on src and returns recvPacket msgs, timeoutPacketmsgs and error func (cc *CosmosProvider) RelayPacketFromSequence( ctx context.Context, diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 83da731fe..3e1770518 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -35,9 +35,10 @@ type pathEndRuntime struct { // Cache in progress sends for the different IBC message types // to avoid retrying a message immediately after it is sent. - packetProcessing packetProcessingCache - connProcessing connectionProcessingCache - channelProcessing channelProcessingCache + packetProcessing packetProcessingCache + connProcessing connectionProcessingCache + channelProcessing channelProcessingCache + clientICQProcessing clientICQProcessingCache // Message subscriber callbacks connSubscribers map[string][]func(provider.ConnectionInfo) @@ -64,6 +65,7 @@ func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetr packetProcessing: make(packetProcessingCache), connProcessing: make(connectionProcessingCache), channelProcessing: make(channelProcessingCache), + clientICQProcessing: make(clientICQProcessingCache), connSubscribers: make(map[string][]func(provider.ConnectionInfo)), metrics: metrics, } @@ -94,6 +96,7 @@ func (pathEnd *pathEndRuntime) mergeMessageCache(messageCache IBCMessagesCache, packetMessages := make(ChannelPacketMessagesCache) connectionHandshakeMessages := make(ConnectionMessagesCache) channelHandshakeMessages := make(ChannelMessagesCache) + clientICQMessages := make(ClientICQMessagesCache) for ch, pmc := range messageCache.PacketFlow { if pathEnd.info.ShouldRelayChannel(ChainChannelKey{ChainID: pathEnd.info.ChainID, CounterpartyChainID: counterpartyChainID, ChannelKey: ch}) { @@ -149,6 +152,17 @@ func (pathEnd *pathEndRuntime) mergeMessageCache(messageCache IBCMessagesCache, channelHandshakeMessages[eventType] = newCmc } pathEnd.messageCache.ChannelHandshake.Merge(channelHandshakeMessages) + + for icqType, cm := range messageCache.ClientICQ { + newCache := make(ClientICQMessageCache) + for queryID, m := range cm { + if m.Chain == counterpartyChainID { + newCache[queryID] = m + } + } + clientICQMessages[icqType] = newCache + } + pathEnd.messageCache.ClientICQ.Merge(clientICQMessages) } func (pathEnd *pathEndRuntime) handleCallbacks(c IBCMessagesCache) { @@ -572,6 +586,45 @@ func (pathEnd *pathEndRuntime) shouldSendChannelMessage(message channelIBCMessag return true } +// shouldSendClientICQMessage determines if the client ICQ message should be sent now. +// It will also determine if the message needs to be given up on entirely and remove retention if so. +func (pathEnd *pathEndRuntime) shouldSendClientICQMessage(message provider.ClientICQInfo) bool { + queryID := message.QueryID + inProgress, ok := pathEnd.clientICQProcessing[queryID] + if !ok { + // in progress cache does not exist for this query ID, so can send. + return true + } + blocksSinceLastProcessed := pathEnd.latestBlock.Height - inProgress.lastProcessedHeight + if inProgress.assembled { + if blocksSinceLastProcessed < blocksToRetrySendAfter { + // this message was sent less than blocksToRetrySendAfter ago, do not attempt to send again yet. + return false + } + } else { + if blocksSinceLastProcessed < blocksToRetryAssemblyAfter { + // this message was sent less than blocksToRetryAssemblyAfter ago, do not attempt assembly again yet. + return false + } + } + if inProgress.retryCount >= maxMessageSendRetries { + pathEnd.log.Error("Giving up on sending client ICQ message after max retries", + zap.String("query_id", string(queryID)), + ) + + // giving up on this query + // remove all retention of this client interchain query flow in pathEnd.messagesCache.ClientICQ + pathEnd.messageCache.ClientICQ.DeleteMessages(queryID) + + // delete in progress query for this specific ID + delete(pathEnd.clientICQProcessing, queryID) + + return false + } + + return true +} + func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTrack) { eventType := t.msg.eventType sequence := t.msg.info.Sequence @@ -652,3 +705,19 @@ func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToT assembled: t.assembled, } } + +func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessageToTrack) { + retryCount := uint64(0) + + queryID := t.msg.info.QueryID + + if inProgress, ok := pathEnd.clientICQProcessing[queryID]; ok { + retryCount = inProgress.retryCount + 1 + } + + pathEnd.clientICQProcessing[queryID] = processingMessage{ + lastProcessedHeight: pathEnd.latestBlock.Height, + retryCount: retryCount, + assembled: t.assembled, + } +} diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index 1b76b6bd1..c21af22a0 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -24,6 +24,9 @@ const ( // to be relayed. packetProofQueryTimeout = 5 * time.Second + // Amount of time to wait for interchain queries. + interchainQueryTimeout = 60 * time.Second + // If message assembly fails from either proof query failure on the source // or assembling the message for the destination, how many blocks should pass // before retrying. diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index cdd18d357..761bb5bb5 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -395,6 +395,33 @@ ChannelHandshakeLoop: return res } +func (pp *PathProcessor) getUnrelayedClientICQMessages(pathEnd *pathEndRuntime, queryMessages, responseMessages ClientICQMessageCache) (res []clientICQMessage) { +ClientICQLoop: + for queryID, queryMsg := range queryMessages { + for resQueryID := range responseMessages { + if queryID == resQueryID { + // done with this query, remove all retention. + pathEnd.messageCache.ClientICQ.DeleteMessages(queryID) + delete(pathEnd.clientICQProcessing, queryID) + continue ClientICQLoop + } + } + // query ID not found in response messages, check if should send queryMsg and send + if pathEnd.shouldSendClientICQMessage(queryMsg) { + res = append(res, clientICQMessage{ + info: queryMsg, + }) + } + } + + // now iterate through completion message and remove any leftover messages. + for queryID := range responseMessages { + pathEnd.messageCache.ClientICQ.DeleteMessages(queryID) + delete(pathEnd.clientICQProcessing, queryID) + } + return res +} + // assembleMsgUpdateClient uses the ChainProvider from both pathEnds to assemble the client update header // from the source and then assemble the update client message in the correct format for the destination. func (pp *PathProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst *pathEndRuntime) (provider.RelayerMessage, error) { @@ -660,16 +687,29 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, messageLifec pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd1ChanCloseMessages...) pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd2ChanCloseMessages...) + pathEnd1ClientICQMessages := pp.getUnrelayedClientICQMessages( + pp.pathEnd1, + pp.pathEnd1.messageCache.ClientICQ[ClientICQTypeRequest], + pp.pathEnd1.messageCache.ClientICQ[ClientICQTypeResponse], + ) + pathEnd2ClientICQMessages := pp.getUnrelayedClientICQMessages( + pp.pathEnd2, + pp.pathEnd2.messageCache.ClientICQ[ClientICQTypeRequest], + pp.pathEnd2.messageCache.ClientICQ[ClientICQTypeResponse], + ) + pathEnd1Messages := pathEndMessages{ connectionMessages: pathEnd1ConnectionMessages, channelMessages: pathEnd1ChannelMessages, packetMessages: pathEnd1PacketMessages, + clientICQMessages: pathEnd1ClientICQMessages, } pathEnd2Messages := pathEndMessages{ connectionMessages: pathEnd2ConnectionMessages, channelMessages: pathEnd2ChannelMessages, packetMessages: pathEnd2PacketMessages, + clientICQMessages: pathEnd2ClientICQMessages, } pp.appendInitialMessageIfNecessary(messageLifecycle, &pathEnd1Messages, &pathEnd2Messages) @@ -739,6 +779,18 @@ func (pp *PathProcessor) assembleMessage( zap.String("port_id", m.info.PortID), ) } + case clientICQMessage: + message, err = pp.assembleClientICQMessage(ctx, m, src, dst) + om.clientICQMsgs[i] = clientICQMessageToTrack{ + msg: m, + assembled: err == nil, + } + if err == nil { + dst.log.Debug("Will send ICQ message", + zap.String("type", m.info.Type), + zap.String("query_id", string(m.info.QueryID)), + ) + } } if err != nil { pp.log.Error("Error assembling channel message", zap.Error(err)) @@ -753,7 +805,7 @@ func (pp *PathProcessor) assembleAndSendMessages( messages pathEndMessages, ) error { var needsClientUpdate bool - if len(messages.packetMessages) == 0 && len(messages.connectionMessages) == 0 && len(messages.channelMessages) == 0 { + if len(messages.packetMessages) == 0 && len(messages.connectionMessages) == 0 && len(messages.channelMessages) == 0 && len(messages.clientICQMessages) == 0 { var consensusHeightTime time.Time if dst.clientState.ConsensusTime.IsZero() { h, err := src.chainProvider.QueryIBCHeader(ctx, int64(dst.clientState.ConsensusHeight.RevisionHeight)) @@ -783,7 +835,7 @@ func (pp *PathProcessor) assembleAndSendMessages( msgs: make( []provider.RelayerMessage, 0, - len(messages.packetMessages)+len(messages.connectionMessages)+len(messages.channelMessages), + len(messages.packetMessages)+len(messages.connectionMessages)+len(messages.channelMessages)+len(messages.clientICQMessages), ), } msgUpdateClient, err := pp.assembleMsgUpdateClient(ctx, src, dst) @@ -816,6 +868,17 @@ func (pp *PathProcessor) assembleAndSendMessages( wg.Wait() } + if len(om.msgs) == 1 { + om.clientICQMsgs = make([]clientICQMessageToTrack, len(messages.clientICQMessages)) + // only assemble and send ICQ messages if there are no conn or chan handshake messages + for i, msg := range messages.clientICQMessages { + wg.Add(1) + go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + } + + wg.Wait() + } + if len(om.msgs) == 1 { om.pktMsgs = make([]packetMessageToTrack, len(messages.packetMessages)) // only assemble and send packet messages if there are no handshake messages @@ -840,6 +903,10 @@ func (pp *PathProcessor) assembleAndSendMessages( dst.trackProcessingChannelMessage(m) } + for _, m := range om.clientICQMsgs { + dst.trackProcessingClientICQMessage(m) + } + for _, m := range om.pktMsgs { dst.trackProcessingPacketMessage(m) } @@ -1019,6 +1086,22 @@ func (pp *PathProcessor) assembleChannelMessage( return assembleMessage(msg.info, proof) } +func (pp *PathProcessor) assembleClientICQMessage( + ctx context.Context, + msg clientICQMessage, + src, dst *pathEndRuntime, +) (provider.RelayerMessage, error) { + ctx, cancel := context.WithTimeout(ctx, interchainQueryTimeout) + defer cancel() + + proof, err := src.chainProvider.QueryICQWithProof(ctx, msg.info.Type, msg.info.Request, src.latestBlock.Height-1) + if err != nil { + return nil, fmt.Errorf("error during interchain query: %w", err) + } + + return dst.chainProvider.MsgSubmitQueryResponse(msg.info.Chain, msg.info.QueryID, proof) +} + func (pp *PathProcessor) channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes pathEndChannelHandshakeResponse) ([]channelIBCMessage, []channelIBCMessage) { pathEnd1ChannelSrcLen := len(pathEnd1ChannelHandshakeRes.SrcMessages) pathEnd1ChannelDstLen := len(pathEnd1ChannelHandshakeRes.DstMessages) diff --git a/relayer/processor/types.go b/relayer/processor/types.go index c411a101c..e4594dde7 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -75,6 +75,7 @@ type IBCMessagesCache struct { PacketFlow ChannelPacketMessagesCache ConnectionHandshake ConnectionMessagesCache ChannelHandshake ChannelMessagesCache + ClientICQ ClientICQMessagesCache } // Clone makes a deep copy of an IBCMessagesCache. @@ -83,10 +84,12 @@ func (c IBCMessagesCache) Clone() IBCMessagesCache { PacketFlow: make(ChannelPacketMessagesCache, len(c.PacketFlow)), ConnectionHandshake: make(ConnectionMessagesCache, len(c.ConnectionHandshake)), ChannelHandshake: make(ChannelMessagesCache, len(c.ChannelHandshake)), + ClientICQ: make(ClientICQMessagesCache, len(c.ClientICQ)), } x.PacketFlow.Merge(c.PacketFlow) x.ConnectionHandshake.Merge(c.ConnectionHandshake) x.ChannelHandshake.Merge(c.ChannelHandshake) + x.ClientICQ.Merge(c.ClientICQ) return x } @@ -96,6 +99,7 @@ func NewIBCMessagesCache() IBCMessagesCache { PacketFlow: make(ChannelPacketMessagesCache), ConnectionHandshake: make(ConnectionMessagesCache), ChannelHandshake: make(ChannelMessagesCache), + ClientICQ: make(ClientICQMessagesCache), } } @@ -120,6 +124,15 @@ type ConnectionMessagesCache map[string]ConnectionMessageCache // ConnectionMessageCache is used for caching connection handshake IBC messages for a given IBC connection. type ConnectionMessageCache map[ConnectionKey]provider.ConnectionInfo +// ClientICQType string wrapper for query/response type. +type ClientICQType string + +// ClientICQMessagesCache is used for caching a ClientICQMessageCache for a given type (query/response). +type ClientICQMessagesCache map[ClientICQType]ClientICQMessageCache + +// ClientICQMessageCache is used for caching a client ICQ message for a given query ID. +type ClientICQMessageCache map[provider.ClientICQQueryID]provider.ClientICQInfo + // ChannelKey is the key used for identifying channels between ChainProcessor and PathProcessor. type ChannelKey struct { ChannelID string @@ -399,6 +412,41 @@ func (c ChannelMessageCache) Merge(other ChannelMessageCache) { } } +// Retain creates cache path if it doesn't exist, then caches message. +func (c ClientICQMessagesCache) Retain(icqType ClientICQType, ci provider.ClientICQInfo) { + queryID := ci.QueryID + if _, ok := c[icqType]; !ok { + c[icqType] = make(ClientICQMessageCache) + } + c[icqType][queryID] = ci +} + +// Merge merges another ClientICQMessagesCache into this one. +func (c ClientICQMessagesCache) Merge(other ClientICQMessagesCache) { + for k, v := range other { + _, ok := c[k] + if !ok { + c[k] = v + } else { + c[k].Merge(v) + } + } +} + +// Merge merges another ClientICQMessageCache into this one. +func (c ClientICQMessageCache) Merge(other ClientICQMessageCache) { + for k, v := range other { + c[k] = v + } +} + +// DeleteMessages deletes cached messages for the provided query ID. +func (c ClientICQMessagesCache) DeleteMessages(queryID provider.ClientICQQueryID) { + for _, cm := range c { + delete(cm, queryID) + } +} + // IBCHeaderCache holds a mapping of IBCHeaders for their block height. type IBCHeaderCache map[uint64]provider.IBCHeader diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index f9a56c8f1..a389cc467 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -16,6 +16,7 @@ type pathEndMessages struct { connectionMessages []connectionIBCMessage channelMessages []channelIBCMessage packetMessages []packetIBCMessage + clientICQMessages []clientICQMessage } type ibcMessage interface { @@ -55,6 +56,19 @@ type connectionIBCMessage struct { func (connectionIBCMessage) ibcMessageIndicator() {} +const ( + ClientICQTypeRequest = "query_request" + ClientICQTypeResponse = "query_response" +) + +// clientICQMessage holds a client ICQ message info, +// useful for sending messages around internal to the PathProcessor. +type clientICQMessage struct { + info provider.ClientICQInfo +} + +func (clientICQMessage) ibcMessageIndicator() {} + // processingMessage tracks the state of a IBC message currently being processed. type processingMessage struct { assembled bool @@ -102,6 +116,8 @@ func (c connectionProcessingCache) deleteMessages(toDelete ...map[string][]Conne } } +type clientICQProcessingCache map[provider.ClientICQQueryID]processingMessage + // contains MsgRecvPacket from counterparty // entire packet flow type pathEndPacketFlowMessages struct { @@ -191,11 +207,12 @@ func channelInfoChannelKey(c provider.ChannelInfo) ChannelKey { // outgoingMessages is a slice of relayer messages that can be // appended to concurrently. type outgoingMessages struct { - mu sync.Mutex - msgs []provider.RelayerMessage - pktMsgs []packetMessageToTrack - connMsgs []connectionMessageToTrack - chanMsgs []channelMessageToTrack + mu sync.Mutex + msgs []provider.RelayerMessage + pktMsgs []packetMessageToTrack + connMsgs []connectionMessageToTrack + chanMsgs []channelMessageToTrack + clientICQMsgs []clientICQMessageToTrack } // MarshalLogObject satisfies the zapcore.ObjectMarshaler interface @@ -254,6 +271,11 @@ type channelMessageToTrack struct { assembled bool } +type clientICQMessageToTrack struct { + msg clientICQMessage + assembled bool +} + // orderFromString parses a string into a channel order byte. func orderFromString(order string) chantypes.Order { switch strings.ToUpper(order) { diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 8a1a500d1..a8fa6efdf 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -13,6 +13,7 @@ import ( commitmenttypes "github.com/cosmos/ibc-go/v5/modules/core/23-commitment/types" ibcexported "github.com/cosmos/ibc-go/v5/modules/core/exported" "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/proto/tendermint/crypto" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -124,6 +125,21 @@ type ChannelInfo struct { Version string } +// ClientICQQueryID string wrapper for query ID. +type ClientICQQueryID string + +// ClientICQInfo contains relevant properties from client ICQ messages. +// Client ICQ implementation does not use IBC connections and channels. +type ClientICQInfo struct { + Source string + Connection string + Chain string + QueryID ClientICQQueryID + Type string + Request []byte + Height uint64 +} + // PacketProof includes all of the proof parameters needed for packet flows. type PacketProof struct { Proof []byte @@ -146,6 +162,12 @@ type ChannelProof struct { Version string } +type ICQProof struct { + Result []byte + ProofOps *crypto.ProofOps // TODO swap out tendermint type for third party chains. + Height int64 +} + // loggableEvents is an unexported wrapper type for a slice of RelayerEvent, // to satisfy the zapcore.ArrayMarshaler interface. type loggableEvents []RelayerEvent @@ -327,6 +349,17 @@ type ChainProvider interface { // [End] Client IBC message assembly + // [Begin] Client ICQ message assembly + + // QueryICQWithProof performs an ABCI query and includes the required proofOps. + QueryICQWithProof(ctx context.Context, msgType string, request []byte, height uint64) (ICQProof, error) + + // MsgSubmitQueryResponse takes the counterparty chain ID, the ICQ query ID, and the query result proof, + // then assembles a MsgSubmitQueryResponse message formatted for sending to this chain. + MsgSubmitQueryResponse(chainID string, queryID ClientICQQueryID, proof ICQProof) (RelayerMessage, error) + + // [End] Client ICQ message assembly + // Query heavy relay methods. Only used for flushing old packets. RelayPacketFromSequence(ctx context.Context, src ChainProvider, srch, dsth, seq uint64, srcChanID, srcPortID string, order chantypes.Order) (RelayerMessage, RelayerMessage, error) diff --git a/scripts/protocgen.sh b/scripts/protocgen.sh index 312bc7089..b2c7fe0ba 100755 --- a/scripts/protocgen.sh +++ b/scripts/protocgen.sh @@ -27,5 +27,5 @@ done # move proto files to the right places # # Note: Proto files are suffixed with the current binary version. -cp -r github.com/Stride-Labs/stride/v5/x/interchainquery/types/* ./relayer/chains/cosmos/stride/ +cp -r github.com/cosmos/relayer/* ./ rm -rf github.com \ No newline at end of file