Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flattening of events #979

Merged
merged 4 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu

newLatestQueriedBlock := persistence.latestQueriedBlock

chainID := ccp.chainProvider.ChainId()

for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
Expand Down Expand Up @@ -359,7 +361,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
// tx was not successful
continue
}
messages := ccp.ibcMessagesFromTransaction(tx, heightUint64)
messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64)

for _, m := range messages {
ccp.handleMessage(m, ibcMessagesCache)
Expand All @@ -382,8 +384,6 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
return nil
}

chainID := ccp.chainProvider.ChainId()

for _, pp := range ccp.pathProcessors {
clientID := pp.RelevantClientID(chainID)
clientState, err := ccp.clientState(ctx, clientID)
Expand Down
171 changes: 105 additions & 66 deletions relayer/chains/cosmos/event_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
clienttypes "github.com/cosmos/ibc-go/v5/modules/core/02-client/types"
conntypes "github.com/cosmos/ibc-go/v5/modules/core/03-connection/types"
chantypes "github.com/cosmos/ibc-go/v5/modules/core/04-channel/types"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/cosmos/relayer/v2/relayer/provider"
abci "github.com/tendermint/tendermint/abci/types"
"go.uber.org/zap"
Expand All @@ -30,94 +31,126 @@ func (ccp *CosmosChainProcessor) ibcMessagesFromBlockEvents(
beginBlockEvents, endBlockEvents []abci.Event,
height uint64,
) (res []ibcMessage) {
beginBlockStringified := sdk.StringifyEvents(beginBlockEvents)
for _, event := range beginBlockStringified {
// don't use accumulator on begin and end block events, can be multiple IBC messages.
msg := parseIBCMessageFromEvent(ccp.log, event, height, new(ibcMessage))
if msg == nil {
// not an ibc event
continue
}
res = append(res, *msg)
}

endBlockStringified := sdk.StringifyEvents(endBlockEvents)
for _, event := range endBlockStringified {
// don't use accumulator on begin and end block events, can be multiple IBC messages.
msg := parseIBCMessageFromEvent(ccp.log, event, height, new(ibcMessage))
if msg == nil {
// not an ibc event
continue
}
res = append(res, *msg)
}
chainID := ccp.chainProvider.ChainId()
res = append(res, ibcMessagesFromEvents(ccp.log, beginBlockEvents, chainID, height)...)
res = append(res, ibcMessagesFromEvents(ccp.log, endBlockEvents, chainID, height)...)
return res
}

// ibcMessagesFromTransaction parses all events within a transaction to find IBC messages
func (ccp *CosmosChainProcessor) ibcMessagesFromTransaction(tx *abci.ResponseDeliverTx, height uint64) []ibcMessage {
parsedLogs, err := sdk.ParseABCILogs(tx.Log)
if err != nil {
ccp.log.Info("Failed to parse abci logs", zap.Error(err))
return nil
}
return parseABCILogs(ccp.log, parsedLogs, height)
type packetKey struct {
sequence uint64
channel processor.ChannelKey
}

func parseABCILogs(log *zap.Logger, logs sdk.ABCIMessageLogs, height uint64) (messages []ibcMessage) {
for _, msgLog := range logs {
msg := new(ibcMessage)
for _, event := range msgLog.Events {
msg = parseIBCMessageFromEvent(log, event, height, msg)
}

if msg.info == nil {
// Not an IBC message, don't need to log here
continue
// ibcMessagesFromTransaction parses all events within a transaction to find IBC messages
func ibcMessagesFromEvents(
log *zap.Logger,
events []abci.Event,
chainID string,
height uint64,
) (messages []ibcMessage) {
recvPacketMsgs := make(map[packetKey]*packetInfo)
for _, event := range events {
evt := sdk.StringifyEvent(event)
switch event.Type {
case chantypes.EventTypeRecvPacket, chantypes.EventTypeWriteAck:
pi := &packetInfo{Height: height}
pi.parseAttrs(log, evt.Attributes)
ck, err := processor.PacketInfoChannelKey(event.Type, provider.PacketInfo(*pi))
if err == nil {
pk := packetKey{
sequence: pi.Sequence,
channel: ck,
}
_, ok := recvPacketMsgs[pk]
if !ok {
recvPacketMsgs[pk] = pi
} else {
recvPacketMsgs[pk].parseAttrs(log, evt.Attributes)
}
}
default:
m := parseIBCMessageFromEvent(log, evt, chainID, height)
if m == nil || m.info == nil {
// Not an IBC message, don't need to log here
continue
}
messages = append(messages, *m)
}
}

messages = append(messages, *msg)
for _, recvPacketMsg := range recvPacketMsgs {
messages = append(messages, ibcMessage{
eventType: chantypes.EventTypeRecvPacket,
info: recvPacketMsg,
})
}

return messages
}

func parseIBCMessageFromEvent(log *zap.Logger, event sdk.StringEvent, height uint64, msg *ibcMessage) *ibcMessage {
func parseIBCMessageFromEvent(
log *zap.Logger,
event sdk.StringEvent,
chainID string,
height uint64,
) *ibcMessage {
switch event.Type {
case clienttypes.EventTypeCreateClient, clienttypes.EventTypeUpdateClient,
clienttypes.EventTypeUpgradeClient, clienttypes.EventTypeSubmitMisbehaviour,
clienttypes.EventTypeUpdateClientProposal:
ci := new(clientInfo)
ci.parseAttrs(log, event.Attributes)
msg.eventType = event.Type
msg.info = ci
case chantypes.EventTypeSendPacket, chantypes.EventTypeRecvPacket,
case chantypes.EventTypeSendPacket,
chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket,
chantypes.EventTypeTimeoutPacketOnClose, chantypes.EventTypeWriteAck:
var pi *packetInfo
if msg.info == nil {
pi = &packetInfo{Height: height}
} else {
pi = msg.info.(*packetInfo)
}
chantypes.EventTypeTimeoutPacketOnClose:
pi := &packetInfo{Height: height}
pi.parseAttrs(log, event.Attributes)
msg.info = pi
if event.Type != chantypes.EventTypeWriteAck {
msg.eventType = event.Type
return &ibcMessage{
eventType: event.Type,
info: pi,
}
case conntypes.EventTypeConnectionOpenInit, conntypes.EventTypeConnectionOpenTry,
conntypes.EventTypeConnectionOpenAck, conntypes.EventTypeConnectionOpenConfirm:
ci := &connectionInfo{Height: height}
ci.parseAttrs(log, event.Attributes)
msg.eventType = event.Type
msg.info = ci
case chantypes.EventTypeChannelOpenInit, chantypes.EventTypeChannelOpenTry,
chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm,
chantypes.EventTypeChannelCloseInit, chantypes.EventTypeChannelCloseConfirm:
ci := &channelInfo{Height: height}
ci.parseAttrs(log, event.Attributes)
return &ibcMessage{
eventType: event.Type,
info: ci,
}
case conntypes.EventTypeConnectionOpenInit, conntypes.EventTypeConnectionOpenTry,
conntypes.EventTypeConnectionOpenAck, conntypes.EventTypeConnectionOpenConfirm:
ci := &connectionInfo{Height: height}
ci.parseAttrs(log, event.Attributes)
return &ibcMessage{
eventType: event.Type,
info: ci,
}
case clienttypes.EventTypeCreateClient, clienttypes.EventTypeUpdateClient,
clienttypes.EventTypeUpgradeClient, clienttypes.EventTypeSubmitMisbehaviour,
clienttypes.EventTypeUpdateClientProposal:
ci := new(clientInfo)
ci.parseAttrs(log, event.Attributes)
return &ibcMessage{
eventType: event.Type,
info: ci,
}
}
return nil
}

func (msg *ibcMessage) parseIBCPacketReceiveMessageFromEvent(
log *zap.Logger,
event sdk.StringEvent,
chainID string,
height uint64,
) *ibcMessage {
var pi *packetInfo
if msg.info == nil {
pi = &packetInfo{Height: height}
msg.info = pi
} else {
pi = msg.info.(*packetInfo)
}
pi.parseAttrs(log, event.Attributes)
if event.Type != chantypes.EventTypeWriteAck {
msg.eventType = event.Type
msg.info = ci
}
return msg
}
Expand Down Expand Up @@ -324,6 +357,10 @@ func (res *channelInfo) parseAttrs(log *zap.Logger, attrs []sdk.Attribute) {
}
}

// parseChannelAttribute parses channel attributes from an event.
// If the attribute has already been parsed into the channelInfo,
// it will not overwrite, and return true to inform the caller that
// the attribute already exists.
func (res *channelInfo) parseChannelAttribute(attr sdk.Attribute) {
switch attr.Key {
case chantypes.AttributeKeyPortID:
Expand All @@ -336,6 +373,8 @@ func (res *channelInfo) parseChannelAttribute(attr sdk.Attribute) {
res.CounterpartyChannelID = attr.Value
case chantypes.AttributeKeyConnectionID:
res.ConnID = attr.Value
case chantypes.AttributeVersion:
res.Version = attr.Value
}
}

Expand Down
Loading