Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pre_init messages #1131

Merged
merged 4 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion interchaintest/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (r *Relayer) GetClients(ctx context.Context, _ ibc.RelayerExecReporter, cha
if strings.TrimSpace(client) == "" {
continue
}
var clientOutput *ibc.ClientOutput
clientOutput := &ibc.ClientOutput{}
if err := json.Unmarshal([]byte(client), clientOutput); err != nil {
return nil, fmt.Errorf("failed to parse client %q: %w", client, err)
}
Expand Down
24 changes: 20 additions & 4 deletions relayer/chains/mock/message_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

type msgHandlerParams struct {
mcp *MockChainProcessor
height int64
packetInfo *chantypes.Packet
ibcMessagesCache processor.IBCMessagesCache
}
Expand All @@ -31,9 +32,14 @@ func handleMsgTransfer(p msgHandlerParams) {
CounterpartyPortID: p.packetInfo.DestinationPort,
}
p.ibcMessagesCache.PacketFlow.Retain(channelKey, chantypes.EventTypeSendPacket, provider.PacketInfo{
Height: uint64(p.height),
Sequence: p.packetInfo.Sequence,
Data: p.packetInfo.Data,
TimeoutHeight: p.packetInfo.TimeoutHeight,
SourcePort: p.packetInfo.SourcePort,
SourceChannel: p.packetInfo.SourceChannel,
DestPort: p.packetInfo.DestinationPort,
DestChannel: p.packetInfo.DestinationChannel,
})
p.mcp.log.Debug("observed MsgTransfer",
zap.String("chain_id", p.mcp.chainID),
Expand All @@ -53,8 +59,13 @@ func handleMsgRecvPacket(p msgHandlerParams) {
CounterpartyPortID: p.packetInfo.SourcePort,
}
p.ibcMessagesCache.PacketFlow.Retain(channelKey, chantypes.EventTypeRecvPacket, provider.PacketInfo{
Sequence: p.packetInfo.Sequence,
Data: p.packetInfo.Data,
Height: uint64(p.height),
Sequence: p.packetInfo.Sequence,
Data: p.packetInfo.Data,
SourcePort: p.packetInfo.SourcePort,
SourceChannel: p.packetInfo.SourceChannel,
DestPort: p.packetInfo.DestinationPort,
DestChannel: p.packetInfo.DestinationChannel,
})
p.mcp.log.Debug("observed MsgRecvPacket",
zap.String("chain_id", p.mcp.chainID),
Expand All @@ -74,8 +85,13 @@ func handleMsgAcknowledgement(p msgHandlerParams) {
CounterpartyPortID: p.packetInfo.DestinationPort,
}
p.ibcMessagesCache.PacketFlow.Retain(channelKey, chantypes.EventTypeAcknowledgePacket, provider.PacketInfo{
Sequence: p.packetInfo.Sequence,
Data: p.packetInfo.Data,
Height: uint64(p.height),
Sequence: p.packetInfo.Sequence,
Data: p.packetInfo.Data,
SourcePort: p.packetInfo.SourcePort,
SourceChannel: p.packetInfo.SourceChannel,
DestPort: p.packetInfo.DestinationPort,
DestChannel: p.packetInfo.DestinationChannel,
})
p.mcp.log.Debug("observed MsgAcknowledgement",
zap.String("chain_id", p.mcp.chainID),
Expand Down
5 changes: 5 additions & 0 deletions relayer/chains/mock/mock_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (mcp *MockChainProcessor) queryCycle(ctx context.Context, persistence *quer
for _, m := range messages {
if handler, ok := messageHandlers[m.EventType]; ok {
handler(msgHandlerParams{
height: i,
mcp: mcp,
packetInfo: m.PacketInfo,
ibcMessagesCache: ibcMessagesCache,
Expand All @@ -175,6 +176,10 @@ func (mcp *MockChainProcessor) queryCycle(ctx context.Context, persistence *quer
for _, pp := range mcp.pathProcessors {
mcp.log.Info("sending messages to path processor", zap.String("chain_id", mcp.chainID))
pp.HandleNewData(mcp.chainID, processor.ChainProcessorCacheData{
LatestBlock: provider.LatestBlock{
Height: uint64(i),
Time: time.Now(),
},
IBCMessagesCache: ibcMessagesCache,
InSync: mcp.inSync,
ChannelStateCache: channelStateCache,
Expand Down
5 changes: 4 additions & 1 deletion relayer/processor/message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ func (mp *messageProcessor) assembleMessage(
mp.trackMessage(msg.tracker(assembled), i)
wg.Done()
if err != nil {
dst.log.Error(fmt.Sprintf("Error assembling %s message", msg.msgType()), zap.Object("msg", msg))
dst.log.Error(fmt.Sprintf("Error assembling %s message", msg.msgType()),
zap.Object("msg", msg),
zap.Error(err),
)
return
}
dst.log.Debug(fmt.Sprintf("Assembled %s message", msg.msgType()), zap.Object("msg", msg))
Expand Down
29 changes: 24 additions & 5 deletions relayer/processor/path_end_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,9 @@ func (pathEnd *pathEndRuntime) removePacketRetention(
toDelete := make(map[string][]uint64)
toDeleteCounterparty := make(map[string][]uint64)
switch eventType {
case chantypes.EventTypeSendPacket:
toDelete[eventType] = []uint64{sequence}
toDelete[preInitKey] = []uint64{sequence}
case chantypes.EventTypeRecvPacket:
toDelete[eventType] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeSendPacket] = []uint64{sequence}
Expand All @@ -480,7 +483,7 @@ func (pathEnd *pathEndRuntime) removePacketRetention(
// It will also determine if the message needs to be given up on entirely and remove retention if so.
func (pathEnd *pathEndRuntime) shouldSendConnectionMessage(message connectionIBCMessage, counterparty *pathEndRuntime) bool {
eventType := message.eventType
k := connectionInfoConnectionKey(message.info).Counterparty()
k := ConnectionInfoConnectionKey(message.info).Counterparty()
if message.info.Height >= counterparty.latestBlock.Height {
pathEnd.log.Debug("Waiting to relay connection message until counterparty height has incremented",
zap.Inline(k),
Expand Down Expand Up @@ -520,15 +523,20 @@ func (pathEnd *pathEndRuntime) shouldSendConnectionMessage(message connectionIBC
toDeleteCounterparty := make(map[string][]ConnectionKey)
counterpartyKey := k.Counterparty()
switch eventType {
case conntypes.EventTypeConnectionOpenInit:
toDelete[preInitKey] = []ConnectionKey{k.PreInitKey()}
case conntypes.EventTypeConnectionOpenTry:
toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.MsgInitKey()}
toDeleteCounterparty[preInitKey] = []ConnectionKey{counterpartyKey.PreInitKey()}
case conntypes.EventTypeConnectionOpenAck:
toDeleteCounterparty[conntypes.EventTypeConnectionOpenTry] = []ConnectionKey{counterpartyKey}
toDelete[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{k.MsgInitKey()}
toDelete[preInitKey] = []ConnectionKey{k.PreInitKey()}
case conntypes.EventTypeConnectionOpenConfirm:
toDeleteCounterparty[conntypes.EventTypeConnectionOpenAck] = []ConnectionKey{counterpartyKey}
toDelete[conntypes.EventTypeConnectionOpenTry] = []ConnectionKey{k}
toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.MsgInitKey()}
toDeleteCounterparty[preInitKey] = []ConnectionKey{counterpartyKey.PreInitKey()}
}
// delete in progress send for this specific message
pathEnd.connProcessing.deleteMessages(map[string][]ConnectionKey{eventType: {k}})
Expand All @@ -543,11 +551,11 @@ func (pathEnd *pathEndRuntime) shouldSendConnectionMessage(message connectionIBC
return true
}

// shouldSendConnectionMessage determines if the channel handshake message should be sent now.
// shouldSendChannelMessage determines if the channel handshake message should be sent now.
// It will also determine if the message needs to be given up on entirely and remove retention if so.
func (pathEnd *pathEndRuntime) shouldSendChannelMessage(message channelIBCMessage, counterparty *pathEndRuntime) bool {
eventType := message.eventType
channelKey := channelInfoChannelKey(message.info).Counterparty()
channelKey := ChannelInfoChannelKey(message.info).Counterparty()
if message.info.Height >= counterparty.latestBlock.Height {
pathEnd.log.Debug("Waiting to relay channel message until counterparty height has incremented",
zap.Inline(channelKey),
Expand Down Expand Up @@ -591,15 +599,20 @@ func (pathEnd *pathEndRuntime) shouldSendChannelMessage(message channelIBCMessag

counterpartyKey := channelKey.Counterparty()
switch eventType {
case chantypes.EventTypeChannelOpenInit:
toDelete[preInitKey] = []ChannelKey{channelKey.MsgInitKey()}
case chantypes.EventTypeChannelOpenTry:
toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.MsgInitKey()}
toDeleteCounterparty[preInitKey] = []ChannelKey{counterpartyKey.MsgInitKey()}
case chantypes.EventTypeChannelOpenAck:
toDeleteCounterparty[chantypes.EventTypeChannelOpenTry] = []ChannelKey{counterpartyKey}
toDelete[chantypes.EventTypeChannelOpenInit] = []ChannelKey{channelKey.MsgInitKey()}
toDelete[preInitKey] = []ChannelKey{channelKey.MsgInitKey()}
case chantypes.EventTypeChannelOpenConfirm:
toDeleteCounterparty[chantypes.EventTypeChannelOpenAck] = []ChannelKey{counterpartyKey}
toDelete[chantypes.EventTypeChannelOpenTry] = []ChannelKey{channelKey}
toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.MsgInitKey()}
toDeleteCounterparty[preInitKey] = []ChannelKey{counterpartyKey.MsgInitKey()}
case chantypes.EventTypeChannelCloseConfirm:
toDeleteCounterparty[chantypes.EventTypeChannelCloseInit] = []ChannelKey{counterpartyKey}
toDelete[chantypes.EventTypeChannelCloseConfirm] = []ChannelKey{channelKey}
Expand Down Expand Up @@ -722,7 +735,10 @@ func (pathEnd *pathEndRuntime) trackProcessingMessage(tracker messageToTrack) ui
}
case channelMessageToTrack:
eventType := t.msg.eventType
channelKey := channelInfoChannelKey(t.msg.info).Counterparty()
channelKey := ChannelInfoChannelKey(t.msg.info)
if eventType != chantypes.EventTypeChannelOpenInit {
channelKey = channelKey.Counterparty()
}
msgProcessCache, ok := pathEnd.channelProcessing[eventType]
if !ok {
msgProcessCache = make(channelKeySendCache)
Expand All @@ -740,7 +756,10 @@ func (pathEnd *pathEndRuntime) trackProcessingMessage(tracker messageToTrack) ui
}
case connectionMessageToTrack:
eventType := t.msg.eventType
connectionKey := connectionInfoConnectionKey(t.msg.info).Counterparty()
connectionKey := ConnectionInfoConnectionKey(t.msg.info)
if eventType != conntypes.EventTypeConnectionOpenInit {
connectionKey = connectionKey.Counterparty()
}
msgProcessCache, ok := pathEnd.connProcessing[eventType]
if !ok {
msgProcessCache = make(connectionKeySendCache)
Expand Down
Loading