Skip to content

Commit

Permalink
Send error if remote echo takes unexpectedly long
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed May 24, 2024
1 parent 074e239 commit dfcc23e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
9 changes: 5 additions & 4 deletions messagetracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ var (
errMediaDecryptFailed = errors.New("failed to decrypt media")
errMediaConvertFailed = errors.New("failed to convert media")
errMediaReuploadFailed = errors.New("failed to upload media to google")
errEchoTimeout = errors.New("remote echo timeout")

errIncorrectUser = errors.New("incorrect user")
errNotLoggedIn = errors.New("not logged in")

errMessageTakingLong = errors.New("bridging the message is taking longer than usual")
)

type OutgoingStatusError gmproto.MessageStatusType
Expand Down Expand Up @@ -98,6 +97,8 @@ func errorToStatusReason(err error) (reason event.MessageStatusReason, status ev
return event.MessageStatusUnsupported, event.MessageStatusFail, true, true, err.Error()
case errors.Is(err, context.DeadlineExceeded):
return event.MessageStatusTooOld, event.MessageStatusRetriable, false, true, "handling the message took too long and was cancelled"
case errors.Is(err, errEchoTimeout):
return event.MessageStatusTooOld, event.MessageStatusRetriable, false, true, "phone has not confirmed message delivery"
case errors.Is(err, errTargetNotFound):
return event.MessageStatusGenericError, event.MessageStatusFail, true, false, ""
case errors.As(err, &ose):
Expand All @@ -116,8 +117,8 @@ func (portal *Portal) sendErrorMessage(ctx context.Context, evt *event.Event, er
certainty = "was not"
}
msg := fmt.Sprintf("\u26a0 Your %s %s bridged: %v", msgType, certainty, err)
if errors.Is(err, errMessageTakingLong) {
msg = fmt.Sprintf("\u26a0 Bridging your %s is taking longer than usual", msgType)
if errors.Is(err, errEchoTimeout) {
msg = fmt.Sprintf("\u26a0 Your phone has not echoed the message, it may have been lost")
}
content := &event.MessageEventContent{
MsgType: event.MsgNotice,
Expand Down
30 changes: 25 additions & 5 deletions portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/bridge/status"
"maunium.net/go/mautrix/crypto/attachment"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
Expand Down Expand Up @@ -252,8 +253,9 @@ type PortalMatrixMessage struct {

type outgoingMessage struct {
*event.Event
Saved bool
Checkpointed bool
Acked bool
Errored bool
Timeouted bool
}

type Portal struct {
Expand Down Expand Up @@ -376,12 +378,14 @@ func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
}

func (portal *Portal) handleMessageLoop() {
outgoingTicker := time.NewTicker(1 * time.Minute)
for {
portal.handleOneMessageLoopItem()
outgoingTicker.Stop()
portal.handleOneMessageLoopItem(outgoingTicker.C)
}
}

func (portal *Portal) handleOneMessageLoopItem() {
func (portal *Portal) handleOneMessageLoopItem(timeout <-chan time.Time) {
defer func() {
if err := recover(); err != nil {
logEvt := portal.zlog.WithLevel(zerolog.FatalLevel).
Expand All @@ -400,7 +404,18 @@ func (portal *Portal) handleOneMessageLoopItem() {
portal.handleMessageLoopItem(msg)
case msg := <-portal.matrixMessages:
portal.handleMatrixMessageLoopItem(msg)
case <-timeout:
}
portal.outgoingMessagesLock.Lock()
for _, out := range portal.outgoingMessages {
if !out.Timeouted && out.Acked && !out.Errored && time.Since(time.UnixMilli(out.Timestamp)) > 1*time.Minute {
go portal.sendStatusEvent(context.TODO(), out.ID, "", errEchoTimeout, nil)
go portal.sendErrorMessage(context.TODO(), out.Event, errEchoTimeout, "message", false, "")
go portal.bridge.SendMessageCheckpoint(out.Event, status.MsgStepRemote, errEchoTimeout, status.MsgStatusTimeout, 0)
out.Timeouted = true
}
}
portal.outgoingMessagesLock.Unlock()
}

func (portal *Portal) isOutgoingMessage(msg *gmproto.Message) *database.Message {
Expand Down Expand Up @@ -2045,8 +2060,9 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event, timing
}

txnID := util.GenerateTmpID()
outgoingMsg := &outgoingMessage{Event: evt}
portal.outgoingMessagesLock.Lock()
portal.outgoingMessages[txnID] = &outgoingMessage{Event: evt}
portal.outgoingMessages[txnID] = outgoingMsg
portal.outgoingMessagesLock.Unlock()
if evt.Type == event.EventSticker {
content.MsgType = event.MsgImage
Expand All @@ -2067,10 +2083,14 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event, timing
resp, err := sender.Client.SendMessage(req)
timings.send = time.Since(start)
if err != nil {
outgoingMsg.Errored = true
go ms.sendMessageMetrics(ctx, sender, evt, err, "Error sending", true)
} else if resp.Status != gmproto.SendMessageResponse_SUCCESS {
outgoingMsg.Errored = true
outgoingMsg.Acked = true
go ms.sendMessageMetrics(ctx, sender, evt, fmt.Errorf("response status %d", resp.Status), "Error sending", true)
} else {
outgoingMsg.Acked = true
go ms.sendMessageMetrics(ctx, sender, evt, nil, "", true)
}
}
Expand Down

0 comments on commit dfcc23e

Please sign in to comment.