diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index 6e7779172..2b49b81f6 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -314,6 +314,8 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu newLatestQueriedBlock := persistence.latestQueriedBlock + chainID := ccp.chainProvider.ChainId() + for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ { var eg errgroup.Group var blockRes *ctypes.ResultBlockResults @@ -359,7 +361,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu // tx was not successful continue } - messages := ccp.ibcMessagesFromTransaction(tx, heightUint64) + messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64) for _, m := range messages { ccp.handleMessage(m, ibcMessagesCache) @@ -382,8 +384,6 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu return nil } - chainID := ccp.chainProvider.ChainId() - for _, pp := range ccp.pathProcessors { clientID := pp.RelevantClientID(chainID) clientState, err := ccp.clientState(ctx, clientID) diff --git a/relayer/chains/cosmos/event_parser.go b/relayer/chains/cosmos/event_parser.go index ee36636ce..80650f102 100644 --- a/relayer/chains/cosmos/event_parser.go +++ b/relayer/chains/cosmos/event_parser.go @@ -9,6 +9,7 @@ import ( clienttypes "github.com/cosmos/ibc-go/v5/modules/core/02-client/types" conntypes "github.com/cosmos/ibc-go/v5/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v5/modules/core/04-channel/types" + "github.com/cosmos/relayer/v2/relayer/processor" "github.com/cosmos/relayer/v2/relayer/provider" abci "github.com/tendermint/tendermint/abci/types" "go.uber.org/zap" @@ -30,94 +31,126 @@ func (ccp *CosmosChainProcessor) ibcMessagesFromBlockEvents( beginBlockEvents, endBlockEvents []abci.Event, height uint64, ) (res []ibcMessage) { - beginBlockStringified := sdk.StringifyEvents(beginBlockEvents) - for _, event := range beginBlockStringified { - // don't use accumulator on begin and end block events, can be multiple IBC messages. - msg := parseIBCMessageFromEvent(ccp.log, event, height, new(ibcMessage)) - if msg == nil { - // not an ibc event - continue - } - res = append(res, *msg) - } - - endBlockStringified := sdk.StringifyEvents(endBlockEvents) - for _, event := range endBlockStringified { - // don't use accumulator on begin and end block events, can be multiple IBC messages. - msg := parseIBCMessageFromEvent(ccp.log, event, height, new(ibcMessage)) - if msg == nil { - // not an ibc event - continue - } - res = append(res, *msg) - } + chainID := ccp.chainProvider.ChainId() + res = append(res, ibcMessagesFromEvents(ccp.log, beginBlockEvents, chainID, height)...) + res = append(res, ibcMessagesFromEvents(ccp.log, endBlockEvents, chainID, height)...) return res } -// ibcMessagesFromTransaction parses all events within a transaction to find IBC messages -func (ccp *CosmosChainProcessor) ibcMessagesFromTransaction(tx *abci.ResponseDeliverTx, height uint64) []ibcMessage { - parsedLogs, err := sdk.ParseABCILogs(tx.Log) - if err != nil { - ccp.log.Info("Failed to parse abci logs", zap.Error(err)) - return nil - } - return parseABCILogs(ccp.log, parsedLogs, height) +type packetKey struct { + sequence uint64 + channel processor.ChannelKey } -func parseABCILogs(log *zap.Logger, logs sdk.ABCIMessageLogs, height uint64) (messages []ibcMessage) { - for _, msgLog := range logs { - msg := new(ibcMessage) - for _, event := range msgLog.Events { - msg = parseIBCMessageFromEvent(log, event, height, msg) - } - - if msg.info == nil { - // Not an IBC message, don't need to log here - continue +// ibcMessagesFromTransaction parses all events within a transaction to find IBC messages +func ibcMessagesFromEvents( + log *zap.Logger, + events []abci.Event, + chainID string, + height uint64, +) (messages []ibcMessage) { + recvPacketMsgs := make(map[packetKey]*packetInfo) + for _, event := range events { + evt := sdk.StringifyEvent(event) + switch event.Type { + case chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck: + pi := &packetInfo{Height: height} + pi.parseAttrs(log, evt.Attributes) + ck, err := processor.PacketInfoChannelKey(event.Type, provider.PacketInfo(*pi)) + if err == nil { + pk := packetKey{ + sequence: pi.Sequence, + channel: ck, + } + _, ok := recvPacketMsgs[pk] + if !ok { + recvPacketMsgs[pk] = pi + } else { + recvPacketMsgs[pk].parseAttrs(log, evt.Attributes) + } + } + default: + m := parseIBCMessageFromEvent(log, evt, chainID, height) + if m == nil || m.info == nil { + // Not an IBC message, don't need to log here + continue + } + messages = append(messages, *m) } + } - messages = append(messages, *msg) + for _, recvPacketMsg := range recvPacketMsgs { + messages = append(messages, ibcMessage{ + eventType: chantypes.EventTypeRecvPacket, + info: recvPacketMsg, + }) } return messages } -func parseIBCMessageFromEvent(log *zap.Logger, event sdk.StringEvent, height uint64, msg *ibcMessage) *ibcMessage { +func parseIBCMessageFromEvent( + log *zap.Logger, + event sdk.StringEvent, + chainID string, + height uint64, +) *ibcMessage { switch event.Type { - case clienttypes.EventTypeCreateClient, clienttypes.EventTypeUpdateClient, - clienttypes.EventTypeUpgradeClient, clienttypes.EventTypeSubmitMisbehaviour, - clienttypes.EventTypeUpdateClientProposal: - ci := new(clientInfo) - ci.parseAttrs(log, event.Attributes) - msg.eventType = event.Type - msg.info = ci - case chantypes.EventTypeSendPacket, chantypes.EventTypeRecvPacket, + case chantypes.EventTypeSendPacket, chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, - chantypes.EventTypeTimeoutPacketOnClose, chantypes.EventTypeWriteAck: - var pi *packetInfo - if msg.info == nil { - pi = &packetInfo{Height: height} - } else { - pi = msg.info.(*packetInfo) - } + chantypes.EventTypeTimeoutPacketOnClose: + pi := &packetInfo{Height: height} pi.parseAttrs(log, event.Attributes) - msg.info = pi - if event.Type != chantypes.EventTypeWriteAck { - msg.eventType = event.Type + return &ibcMessage{ + eventType: event.Type, + info: pi, } - case conntypes.EventTypeConnectionOpenInit, conntypes.EventTypeConnectionOpenTry, - conntypes.EventTypeConnectionOpenAck, conntypes.EventTypeConnectionOpenConfirm: - ci := &connectionInfo{Height: height} - ci.parseAttrs(log, event.Attributes) - msg.eventType = event.Type - msg.info = ci case chantypes.EventTypeChannelOpenInit, chantypes.EventTypeChannelOpenTry, chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm, chantypes.EventTypeChannelCloseInit, chantypes.EventTypeChannelCloseConfirm: ci := &channelInfo{Height: height} ci.parseAttrs(log, event.Attributes) + return &ibcMessage{ + eventType: event.Type, + info: ci, + } + case conntypes.EventTypeConnectionOpenInit, conntypes.EventTypeConnectionOpenTry, + conntypes.EventTypeConnectionOpenAck, conntypes.EventTypeConnectionOpenConfirm: + ci := &connectionInfo{Height: height} + ci.parseAttrs(log, event.Attributes) + return &ibcMessage{ + eventType: event.Type, + info: ci, + } + case clienttypes.EventTypeCreateClient, clienttypes.EventTypeUpdateClient, + clienttypes.EventTypeUpgradeClient, clienttypes.EventTypeSubmitMisbehaviour, + clienttypes.EventTypeUpdateClientProposal: + ci := new(clientInfo) + ci.parseAttrs(log, event.Attributes) + return &ibcMessage{ + eventType: event.Type, + info: ci, + } + } + return nil +} + +func (msg *ibcMessage) parseIBCPacketReceiveMessageFromEvent( + log *zap.Logger, + event sdk.StringEvent, + chainID string, + height uint64, +) *ibcMessage { + var pi *packetInfo + if msg.info == nil { + pi = &packetInfo{Height: height} + msg.info = pi + } else { + pi = msg.info.(*packetInfo) + } + pi.parseAttrs(log, event.Attributes) + if event.Type != chantypes.EventTypeWriteAck { msg.eventType = event.Type - msg.info = ci } return msg } @@ -324,6 +357,10 @@ func (res *channelInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) { } } +// parseChannelAttribute parses channel attributes from an event. +// If the attribute has already been parsed into the channelInfo, +// it will not overwrite, and return true to inform the caller that +// the attribute already exists. func (res *channelInfo) parseChannelAttribute(attr sdk.Attribute) { switch attr.Key { case chantypes.AttributeKeyPortID: @@ -336,6 +373,8 @@ func (res *channelInfo) parseChannelAttribute(attr sdk.Attribute) { res.CounterpartyChannelID = attr.Value case chantypes.AttributeKeyConnectionID: res.ConnID = attr.Value + case chantypes.AttributeVersion: + res.Version = attr.Value } } diff --git a/relayer/chains/cosmos/event_parser_test.go b/relayer/chains/cosmos/event_parser_test.go index 7e943b87e..1cd823a4a 100644 --- a/relayer/chains/cosmos/event_parser_test.go +++ b/relayer/chains/cosmos/event_parser_test.go @@ -11,6 +11,7 @@ import ( "github.com/cosmos/relayer/v2/relayer/provider" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" "go.uber.org/zap" ) @@ -216,79 +217,90 @@ func TestParseEventLogs(t *testing.T) { testPacketDstChannel = "channel-1" testPacketDstPort = "port-1" ) - abciLogs := sdk.ABCIMessageLogs{ + events := []abci.Event{ + + { + Type: clienttypes.EventTypeUpdateClient, + Attributes: []abci.EventAttribute{ + { + Key: []byte(clienttypes.AttributeKeyClientID), + Value: []byte(testClientID1), + }, + { + Key: []byte(clienttypes.AttributeKeyConsensusHeight), + Value: []byte(testClientConsensusHeight), + }, + }, + }, { - MsgIndex: 0, - Events: sdk.StringEvents{ + Type: chantypes.EventTypeRecvPacket, + Attributes: []abci.EventAttribute{ + { + Key: []byte(chantypes.AttributeKeySequence), + Value: []byte(testPacketSequence), + }, + { + Key: []byte(chantypes.AttributeKeyDataHex), + Value: []byte(testPacketDataHex), + }, + { + Key: []byte(chantypes.AttributeKeyTimeoutHeight), + Value: []byte(testPacketTimeoutHeight), + }, + { + Key: []byte(chantypes.AttributeKeyTimeoutTimestamp), + Value: []byte(testPacketTimeoutTimestamp), + }, + { + Key: []byte(chantypes.AttributeKeySrcChannel), + Value: []byte(testPacketSrcChannel), + }, + { + Key: []byte(chantypes.AttributeKeySrcPort), + Value: []byte(testPacketSrcPort), + }, + { + Key: []byte(chantypes.AttributeKeyDstChannel), + Value: []byte(testPacketDstChannel), + }, { - Type: clienttypes.EventTypeUpdateClient, - Attributes: []sdk.Attribute{ - { - Key: clienttypes.AttributeKeyClientID, - Value: testClientID1, - }, - { - Key: clienttypes.AttributeKeyConsensusHeight, - Value: testClientConsensusHeight, - }, - }, + Key: []byte(chantypes.AttributeKeyDstPort), + Value: []byte(testPacketDstPort), }, }, }, { - MsgIndex: 1, - Events: sdk.StringEvents{ + Type: chantypes.EventTypeWriteAck, + Attributes: []abci.EventAttribute{ + { + Key: []byte(chantypes.AttributeKeySequence), + Value: []byte(testPacketSequence), + }, + { + Key: []byte(chantypes.AttributeKeyAckHex), + Value: []byte(testPacketAckHex), + }, + { + Key: []byte(chantypes.AttributeKeySrcChannel), + Value: []byte(testPacketSrcChannel), + }, + { + Key: []byte(chantypes.AttributeKeySrcPort), + Value: []byte(testPacketSrcPort), + }, { - Type: chantypes.EventTypeRecvPacket, - Attributes: []sdk.Attribute{ - { - Key: chantypes.AttributeKeySequence, - Value: testPacketSequence, - }, - { - Key: chantypes.AttributeKeyDataHex, - Value: testPacketDataHex, - }, - { - Key: chantypes.AttributeKeyTimeoutHeight, - Value: testPacketTimeoutHeight, - }, - { - Key: chantypes.AttributeKeyTimeoutTimestamp, - Value: testPacketTimeoutTimestamp, - }, - { - Key: chantypes.AttributeKeySrcChannel, - Value: testPacketSrcChannel, - }, - { - Key: chantypes.AttributeKeySrcPort, - Value: testPacketSrcPort, - }, - { - Key: chantypes.AttributeKeyDstChannel, - Value: testPacketDstChannel, - }, - { - Key: chantypes.AttributeKeyDstPort, - Value: testPacketDstPort, - }, - }, + Key: []byte(chantypes.AttributeKeyDstChannel), + Value: []byte(testPacketDstChannel), }, { - Type: chantypes.EventTypeWriteAck, - Attributes: []sdk.Attribute{ - { - Key: chantypes.AttributeKeyAckHex, - Value: testPacketAckHex, - }, - }, + Key: []byte(chantypes.AttributeKeyDstPort), + Value: []byte(testPacketDstPort), }, }, }, } - ibcMessages := parseABCILogs(zap.NewNop(), abciLogs, 0) + ibcMessages := ibcMessagesFromEvents(zap.NewNop(), events, "", 0) require.Len(t, ibcMessages, 2) diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index e237eacb9..3dd1d2f96 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -52,13 +52,9 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, return nil, err } var ibcMsgs []ibcMessage + chainID := cc.ChainId() for _, tx := range res.Txs { - parsedLogs, err := sdk.ParseABCILogs(tx.TxResult.Log) - if err != nil { - continue - } - - ibcMsgs = append(ibcMsgs, parseABCILogs(log, parsedLogs, 0)...) + ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0)...) } return ibcMsgs, nil diff --git a/relayer/processor/types.go b/relayer/processor/types.go index ab755036d..92341f294 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -424,7 +424,7 @@ func (c IBCHeaderCache) Prune(keep int) { // PacketInfoChannelKey returns the applicable ChannelKey for the chain based on the eventType. func PacketInfoChannelKey(eventType string, info provider.PacketInfo) (ChannelKey, error) { switch eventType { - case chantypes.EventTypeRecvPacket: + case chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck: return packetInfoChannelKey(info).Counterparty(), nil case chantypes.EventTypeSendPacket, chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket, chantypes.EventTypeTimeoutPacketOnClose: return packetInfoChannelKey(info), nil