Skip to content

Commit

Permalink
feat: handle for packet timeout (#86)
Browse files Browse the repository at this point in the history
* fix: helper method issue on no json data

* fix: archway query refractor

* chore: add helper

* feat: add timeoutpacket and requestTimeoutpacket method

* feat: add packet timeout events on icon

* feat: adding icon block for each height not only btp blocks

* feat: proof on queryPacketCommitment

* feat: add icon requesttimeout and packet timeout method

* feat: add Msgtimeoutpacket and request type

* fix: handler shouldsendpacket check condition

* feat:handle processor for timeout packet

* fix: packetReceipt in icon

* fix: handle assemble message for request timeout

* fix: validate packet method

* fix: archway validate method

* fix: handle for request timeout proof

* fix: seperate updateclient and other messages in archway (#87)

* fix: seperate updateclient and other messages in archway

* fix: increase retrySend after 2 block

* fix: change request timeout type and proof function

* fix: packet timeout

* chore: name from common constant
  • Loading branch information
viveksharmapoudel authored and izyak committed Sep 7, 2023
1 parent 863acbb commit d0c5d34
Show file tree
Hide file tree
Showing 29 changed files with 291 additions and 118 deletions.
5 changes: 3 additions & 2 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types"
"github.com/cosmos/relayer/v2/relayer"
"github.com/cosmos/relayer/v2/relayer/common"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/cosmos/relayer/v2/relayer/provider"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -203,10 +204,10 @@ func createClientCmd(a *appState) *cobra.Command {
}

if iconStartHeight != 0 {
if src.ChainProvider.Type() == "icon" {
if src.ChainProvider.Type() == common.IconModule {
srch = iconStartHeight
}
if dst.ChainProvider.Type() == "icon" {
if dst.ChainProvider.Type() == common.IconModule {
dsth = iconStartHeight
}
}
Expand Down
1 change: 0 additions & 1 deletion relayer/chains/archway/event_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func ibcMessagesFromEvents(
var evt sdk.StringEvent
if base64Encoded {
evt = parseBase64Event(log, event)
// fmt.Printf("event %v \n", evt)
} else {
evt = sdk.StringifyEvent(event)

Expand Down
8 changes: 6 additions & 2 deletions relayer/chains/archway/helper_debug_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func jsonDumpDataFile(filename string, bufs interface{}) {
os.Exit(1)
}

fmt.Printf("Successfully created or appended JSON in %s", filename)
fmt.Printf("Successfully created or appended JSON in %s \n", filename)
}

func readExistingData(filename string, opPointer interface{}) error {
Expand All @@ -42,6 +42,10 @@ func readExistingData(filename string, opPointer interface{}) error {
return fmt.Errorf("Error reading JSON from file: %v", err)
}

if jsonData == nil {
return nil
}

// Unmarshal JSON data into a slice of structs
err = json.Unmarshal(jsonData, opPointer)
if err != nil {
Expand All @@ -66,7 +70,7 @@ func SaveMsgToFile(filename string, msgs []provider.RelayerMessage) {
var d []DataFormat
err := readExistingData(filename, &d)
if err != nil {
fmt.Println("error savetoFile ")
fmt.Println("error savingtoFile ", err)
return
}

Expand Down
3 changes: 1 addition & 2 deletions relayer/chains/archway/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/cosmos/gogoproto/proto"
"github.com/cosmos/relayer/v2/relayer/chains/archway/types"
"github.com/cosmos/relayer/v2/relayer/provider"
"go.uber.org/zap"
)

type WasmContractMessage struct {
Expand Down Expand Up @@ -37,7 +36,7 @@ func (ap *ArchwayProvider) NewWasmContractMessage(method string, m codec.ProtoMa
if err != nil {
return nil, err
}
ap.log.Debug("Archway Constructed message ", zap.String("MethodName", method), zap.Any("Message", types.NewHexBytes(protoMsg)))
// ap.log.Debug("Archway Constructed message ", zap.String("MethodName", method), zap.Any("Message", types.NewHexBytes(protoMsg)))

msgParam, err := types.GenerateTxnParams(method, types.NewHexBytes(protoMsg))

Expand Down
4 changes: 4 additions & 0 deletions relayer/chains/archway/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ func (ac *ArchwayProvider) Codec() Codec {
return ac.Cdc
}

func (ap *ArchwayProvider) ClientContext() client.Context {
return ap.ClientCtx
}

func (ap *ArchwayProvider) updateNextAccountSequence(seq uint64) {
if seq > ap.nextAccountSeq {
ap.nextAccountSeq = seq
Expand Down
49 changes: 27 additions & 22 deletions relayer/chains/archway/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func (ap *ArchwayProvider) QueryClientStateResponse(ctx context.Context, height
return nil, err
}

storageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), common.GetClientStateCommitmentKey(srcClientId))
proof, err := ap.QueryArchwayProof(ctx, common.MustHexStrToBytes(storageKey), height)
storageKey := getStorageKeyFromPath(common.GetClientStateCommitmentKey(srcClientId))
proof, err := ap.QueryArchwayProof(ctx, storageKey, height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -465,8 +465,8 @@ func (ap *ArchwayProvider) QueryConnection(ctx context.Context, height int64, co
return nil, err
}

storageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), common.GetConnectionCommitmentKey(connectionid))
connProof, err := ap.QueryArchwayProof(ctx, common.MustHexStrToBytes(storageKey), height)
storageKey := getStorageKeyFromPath(common.GetConnectionCommitmentKey(connectionid))
connProof, err := ap.QueryArchwayProof(ctx, storageKey, height)

return conntypes.NewQueryConnectionResponse(conn, connProof, clienttypes.NewHeight(0, uint64(height))), nil
}
Expand Down Expand Up @@ -569,13 +569,15 @@ func (ap *ArchwayProvider) GenerateConnHandshakeProof(ctx context.Context, heigh
return nil, nil, nil, nil, clienttypes.Height{}, err
}

connStorageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), common.GetConnectionCommitmentKey(connId))
proofConnBytes, err := ap.QueryArchwayProof(ctx,
common.MustHexStrToBytes(connStorageKey),
height)
connStorageKey := getStorageKeyFromPath(common.GetConnectionCommitmentKey(connId))
proofConnBytes, err := ap.QueryArchwayProof(ctx, connStorageKey, height)
if err != nil {
return nil, nil, nil, nil, nil, err

}

consStorageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), common.GetConsensusStateCommitmentKey(clientId, big.NewInt(0), big.NewInt(height)))
proofConsensusBytes, err := ap.QueryArchwayProof(ctx, common.MustHexStrToBytes(consStorageKey), height)
consStorageKey := getStorageKeyFromPath(common.GetConsensusStateCommitmentKey(clientId, big.NewInt(0), big.NewInt(height)))
proofConsensusBytes, err := ap.QueryArchwayProof(ctx, consStorageKey, height)
if err != nil {
return nil, nil, nil, nil, nil, err
}
Expand All @@ -594,7 +596,6 @@ func (ap *ArchwayProvider) QueryChannel(ctx context.Context, height int64, chann
return nil, err
}

fmt.Printf("the channelState is %x \n ", channelState)
if channelState == nil {
return nil, err
}
Expand All @@ -604,8 +605,8 @@ func (ap *ArchwayProvider) QueryChannel(ctx context.Context, height int64, chann
return nil, err
}

storageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), common.GetChannelCommitmentKey(portid, channelid))
proof, err := ap.QueryArchwayProof(ctx, common.MustHexStrToBytes(storageKey), height)
storageKey := getStorageKeyFromPath(common.GetChannelCommitmentKey(portid, channelid))
proof, err := ap.QueryArchwayProof(ctx, storageKey, height)

return chantypes.NewQueryChannelResponse(channelS, proof, clienttypes.NewHeight(0, uint64(height))), nil
}
Expand Down Expand Up @@ -716,8 +717,8 @@ func (ap *ArchwayProvider) QueryPacketCommitment(ctx context.Context, height int
if err != nil {
return nil, err
}
storageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), common.GetPacketCommitmentKey(portid, channelid, big.NewInt(int64(seq))))
proof, err := ap.QueryArchwayProof(ctx, common.MustHexStrToBytes(storageKey), height)
storageKey := getStorageKeyFromPath(common.GetPacketCommitmentKey(portid, channelid, big.NewInt(int64(seq))))
proof, err := ap.QueryArchwayProof(ctx, storageKey, height)

if err != nil {
return nil, err
Expand All @@ -739,8 +740,8 @@ func (ap *ArchwayProvider) QueryPacketAcknowledgement(ctx context.Context, heigh
if err != nil {
return nil, err
}
storageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), common.GetPacketAcknowledgementCommitmentKey(portid, channelid, big.NewInt(int64(seq))))
proof, err := ap.QueryArchwayProof(ctx, common.MustHexStrToBytes(storageKey), height)
storageKey := getStorageKeyFromPath(common.GetPacketAcknowledgementCommitmentKey(portid, channelid, big.NewInt(int64(seq))))
proof, err := ap.QueryArchwayProof(ctx, storageKey, height)

return &chantypes.QueryPacketAcknowledgementResponse{
Acknowledgement: pktAcknowledgement.Data.Bytes(),
Expand All @@ -750,20 +751,24 @@ func (ap *ArchwayProvider) QueryPacketAcknowledgement(ctx context.Context, heigh
}

func (ap *ArchwayProvider) QueryPacketReceipt(ctx context.Context, height int64, channelid, portid string, seq uint64) (recRes *chantypes.QueryPacketReceiptResponse, err error) {
pktReceiptParams, err := types.NewPacketReceipt(portid, channelid, seq).Bytes()

// getting proof from commitment map in contract
storageKey := getStorageKeyFromPath(common.GetPacketReceiptCommitmentKey(portid, channelid, big.NewInt(int64(seq))))
proof, err := ap.QueryArchwayProof(ctx, storageKey, height)
if err != nil {
return nil, err
}
pktReceipt, err := ap.QueryIBCHandlerContract(ctx, pktReceiptParams)

pktReceiptParams, err := types.NewPacketReceipt(portid, channelid, seq).Bytes()
if err != nil {
return nil, err
}

storageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), common.GetPacketReceiptCommitmentKey(portid, channelid, big.NewInt(int64(seq))))
proof, err := ap.QueryArchwayProof(ctx, common.MustHexStrToBytes(storageKey), height)
if err != nil {
pktReceipt, err := ap.QueryIBCHandlerContract(ctx, pktReceiptParams)
if err != nil && !strings.Contains(err.Error(), "PacketCommitmentNotFound") {
return nil, err
}

return &chantypes.QueryPacketReceiptResponse{
Received: pktReceipt != nil, // TODO: Bytes to boolean
Proof: proof,
Expand Down
80 changes: 44 additions & 36 deletions relayer/chains/archway/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (ap *ArchwayProvider) MsgSubmitMisbehaviour(clientID string, misbehaviour i
}

func (ap *ArchwayProvider) ValidatePacket(msgTransfer provider.PacketInfo, latest provider.LatestBlock) error {

if msgTransfer.Sequence == 0 {
return errors.New("refusing to relay packet with sequence: 0")
}
Expand All @@ -221,19 +222,21 @@ func (ap *ArchwayProvider) ValidatePacket(msgTransfer provider.PacketInfo, lates
}

// This should not be possible, as it violates IBC spec
if msgTransfer.TimeoutHeight.IsZero() && msgTransfer.TimeoutTimestamp == 0 {
if msgTransfer.TimeoutHeight.IsZero() {
return errors.New("refusing to relay packet without a timeout (height or timestamp must be set)")
}

revision := clienttypes.ParseChainID(ap.PCfg.ChainID)
latestClientTypesHeight := clienttypes.NewHeight(revision, latest.Height)
revisionNumber := 0
latestClientTypesHeight := clienttypes.NewHeight(uint64(revisionNumber), latest.Height)
if !msgTransfer.TimeoutHeight.IsZero() && latestClientTypesHeight.GTE(msgTransfer.TimeoutHeight) {
fmt.Println("packet timeout failed finally ", msgTransfer.TimeoutHeight)

return provider.NewTimeoutHeightError(latest.Height, msgTransfer.TimeoutHeight.RevisionHeight)
}
latestTimestamp := uint64(latest.Time.UnixNano())
if msgTransfer.TimeoutTimestamp > 0 && latestTimestamp > msgTransfer.TimeoutTimestamp {
return provider.NewTimeoutTimestampError(latestTimestamp, msgTransfer.TimeoutTimestamp)
}
// latestTimestamp := uint64(latest.Time.UnixNano())
// if msgTransfer.TimeoutTimestamp > 0 && latestTimestamp > msgTransfer.TimeoutTimestamp {
// return provider.NewTimeoutTimestampError(latestTimestamp, msgTransfer.TimeoutTimestamp)
// }

return nil
}
Expand Down Expand Up @@ -264,7 +267,8 @@ func (ap *ArchwayProvider) PacketAcknowledgement(ctx context.Context, msgRecvPac
}

func (ap *ArchwayProvider) PacketReceipt(ctx context.Context, msgTransfer provider.PacketInfo, height uint64) (provider.PacketProof, error) {
packetReceiptResponse, err := ap.QueryPacketCommitment(ctx, int64(height), msgTransfer.SourceChannel, msgTransfer.SourcePort, msgTransfer.Sequence)

packetReceiptResponse, err := ap.QueryPacketReceipt(ctx, int64(height), msgTransfer.SourceChannel, msgTransfer.SourcePort, msgTransfer.Sequence)

if err != nil {
return provider.PacketProof{}, nil
Expand Down Expand Up @@ -297,7 +301,8 @@ func (ap *ArchwayProvider) MsgRecvPacket(msgTransfer provider.PacketInfo, proof
}

params := &chantypes.MsgRecvPacket{
Packet: msgTransfer.Packet(),
Packet: msgTransfer.Packet(),
// Packet: chantypes.Packet{}, //TODO: just to check packet timeout
ProofCommitment: proof.Proof,
ProofHeight: proof.ProofHeight,
Signer: signer,
Expand Down Expand Up @@ -339,6 +344,10 @@ func (ap *ArchwayProvider) MsgTimeout(msgTransfer provider.PacketInfo, proof pro
return ap.NewWasmContractMessage(MethodTimeoutPacket, params)
}

func (ap *ArchwayProvider) MsgTimeoutRequest(msgTransfer provider.PacketInfo, proof provider.PacketProof) (provider.RelayerMessage, error) {
return nil, fmt.Errorf("MsgTimeoutRequest Not implemented for Archway module")
}

func (ap *ArchwayProvider) MsgTimeoutOnClose(msgTransfer provider.PacketInfo, proofUnreceived provider.PacketProof) (provider.RelayerMessage, error) {
return nil, nil
}
Expand Down Expand Up @@ -710,10 +719,6 @@ func (ap *ArchwayProvider) SendMessages(ctx context.Context, msgs []provider.Rel
return rlyResp, true, callbackErr
}

func (ap *ArchwayProvider) ClientContext() client.Context {
return ap.ClientCtx
}

func (ap *ArchwayProvider) SendMessagesToMempool(
ctx context.Context,
msgs []provider.RelayerMessage,
Expand All @@ -731,37 +736,34 @@ func (ap *ArchwayProvider) SendMessagesToMempool(
return err
}

var sdkMsgs []sdk.Msg
for _, msg := range msgs {

if msg == nil {
ap.log.Debug("One of the message is nil")
ap.log.Debug("One of the message of archway")
continue
}

archwayMsg, ok := msg.(*WasmContractMessage)
if !ok {
return fmt.Errorf("Invalid ArchwayMsg")
return fmt.Errorf("Archway Message is not valid %s", archwayMsg.Type())
}

sdkMsgs = append(sdkMsgs, archwayMsg.Msg)
}

if err != nil {

ap.log.Debug("error when dumping message")

}
txBytes, sequence, err := ap.buildMessages(cliCtx, factory, archwayMsg.Msg)
if err != nil {
return err
}
ap.updateNextAccountSequence(sequence + 1)

txBytes, sequence, err := ap.buildMessages(cliCtx, factory, sdkMsgs...)
if err != nil {
return err
if msg.Type() == MethodUpdateClient {
err := ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, true)
if err != nil {
return fmt.Errorf("Archway: failed during updateClient ")
}
continue
}
ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, false)
}

// updating the next sequence number
ap.updateNextAccountSequence(sequence + 1)

return ap.BroadcastTx(cliCtx, txBytes, msgs, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback)
return nil

}

Expand Down Expand Up @@ -838,8 +840,6 @@ func (ap *ArchwayProvider) LogSuccessTx(res *sdk.TxResponse, msgs []provider.Rel
fields...,
)

// uncomment for saving msg
SaveMsgToFile(ArchwayDebugMessagePath, msgs)
}

// getFeePayer returns the bech32 address of the fee payer of a transaction.
Expand Down Expand Up @@ -967,6 +967,7 @@ func (ap *ArchwayProvider) BroadcastTx(
asyncCtx context.Context, // context for async wait for block inclusion after successful tx broadcast
asyncTimeout time.Duration, // timeout for waiting for block inclusion
asyncCallback func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion
shouldWait bool,
) error {
res, err := clientCtx.BroadcastTx(txBytes)
// log submitted txn
Expand Down Expand Up @@ -1005,6 +1006,10 @@ func (ap *ArchwayProvider) BroadcastTx(
zap.String("txHash", res.TxHash),
)

if shouldWait {
ap.waitForTx(asyncCtx, hexTx, msgs, asyncTimeout, asyncCallback)
return nil
}
go ap.waitForTx(asyncCtx, hexTx, msgs, asyncTimeout, asyncCallback)
return nil
}
Expand Down Expand Up @@ -1032,7 +1037,7 @@ func (ap *ArchwayProvider) waitForTx(
waitTimeout time.Duration,
callback func(*provider.RelayerTxResponse, error),
) {
res, err := ap.waitForBlockInclusion(ctx, txHash, waitTimeout)
res, err := ap.waitForTxResult(ctx, txHash, waitTimeout)
if err != nil {
ap.log.Error("Failed to wait for block inclusion", zap.Error(err))
if callback != nil {
Expand All @@ -1041,6 +1046,9 @@ func (ap *ArchwayProvider) waitForTx(
return
}

//uncomment for saving msg
SaveMsgToFile(ArchwayDebugMessagePath, msgs)

rlyResp := &provider.RelayerTxResponse{
Height: res.Height,
TxHash: res.TxHash,
Expand Down Expand Up @@ -1073,7 +1081,7 @@ func (ap *ArchwayProvider) waitForTx(
ap.LogSuccessTx(res, msgs)
}

func (ap *ArchwayProvider) waitForBlockInclusion(
func (ap *ArchwayProvider) waitForTxResult(
ctx context.Context,
txHash []byte,
waitTimeout time.Duration,
Expand Down
Loading

0 comments on commit d0c5d34

Please sign in to comment.