From dd0f10ab7095f8f6cb10c75fe1bf6fbeebedabf2 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sun, 19 Jan 2025 13:26:23 +0200 Subject: [PATCH] connector: implement chat list sync and backfill Fixes #1 --- pkg/connector/backfill.go | 192 ++++++++++++++++++++++++++++++++++ pkg/connector/chatinfo.go | 22 ++-- pkg/connector/chatsync.go | 157 +++++++++++++++++++++++++++ pkg/connector/client.go | 11 ++ pkg/connector/dbmeta.go | 6 +- pkg/connector/groupinfo.go | 13 ++- pkg/connector/login.go | 19 +++- pkg/msgconv/from-signal.go | 31 +++++- pkg/signalid/dbmeta.go | 4 + pkg/signalmeow/attachments.go | 4 + 10 files changed, 440 insertions(+), 19 deletions(-) create mode 100644 pkg/connector/backfill.go create mode 100644 pkg/connector/chatsync.go diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go new file mode 100644 index 00000000..c0416b16 --- /dev/null +++ b/pkg/connector/backfill.go @@ -0,0 +1,192 @@ +// mautrix-signal - A Matrix-Signal puppeting bridge. +// Copyright (C) 2025 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package connector + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/google/uuid" + "github.com/rs/zerolog" + "go.mau.fi/util/ptr" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/networkid" + + "go.mau.fi/mautrix-signal/pkg/msgconv" + "go.mau.fi/mautrix-signal/pkg/signalid" + "go.mau.fi/mautrix-signal/pkg/signalmeow/protobuf/backuppb" + "go.mau.fi/mautrix-signal/pkg/signalmeow/store" +) + +var _ bridgev2.BackfillingNetworkAPI = (*SignalClient)(nil) + +func tryCastUUID(b []byte) uuid.UUID { + if len(b) == 16 { + return uuid.UUID(b) + } + return uuid.Nil +} + +func (s *SignalClient) FetchMessages(ctx context.Context, params bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) { + if !s.IsLoggedIn() { + return nil, bridgev2.ErrNotLoggedIn + } + userID, groupID, err := signalid.ParsePortalID(params.Portal.ID) + if err != nil { + return nil, fmt.Errorf("failed to parse portal ID: %w", err) + } + var chat *store.BackupChat + if groupID != "" { + chat, err = s.Client.Store.BackupStore.GetBackupChatByGroupID(ctx, groupID) + } else { + chat, err = s.Client.Store.BackupStore.GetBackupChatByUserID(ctx, userID) + } + if err != nil { + return nil, fmt.Errorf("failed to get chat: %w", err) + } else if chat == nil { + zerolog.Ctx(ctx).Debug().Msg("Chat not found, returning nil response for backfill") + return nil, nil + } + var anchorTS time.Time + if params.AnchorMessage != nil { + anchorTS = params.AnchorMessage.Timestamp + } + minTS := anchorTS + items, err := s.Client.Store.BackupStore.GetBackupChatItems(ctx, chat.Id, anchorTS, params.Forward, params.Count) + if err != nil { + return nil, fmt.Errorf("failed to get chat items: %w", err) + } + if len(items) > 0 { + minTS = time.UnixMilli(int64(items[0].DateSent)) + } + // GetBackupChatItems returns in reverse chronological order, so flip the list + slices.Reverse(items) + var firstDirectionfulProcessed bool + var isRead bool + convertedMessages := make([]*bridgev2.BackfillMessage, 0, len(items)) + attMap := make(msgconv.AttachmentMap) + recipientMap := make(map[uint64]*backuppb.Recipient) + getRecipientACI := func(id uint64) (uuid.UUID, error) { + recipient, ok := recipientMap[id] + if !ok { + recipient, err = s.Client.Store.BackupStore.GetBackupRecipient(ctx, id) + if err != nil { + return uuid.Nil, fmt.Errorf("failed to get recipient %d: %w", id, err) + } else if len(recipient.GetContact().GetAci()) != 16 && recipient.GetSelf() == nil { + zerolog.Ctx(ctx).Warn(). + Uint64("recipient_id", id). + Type("recipient_type", recipient.GetDestination()). + Msg("ACI not found for recipient") + } + recipientMap[id] = recipient + } + + switch dest := recipient.Destination.(type) { + case *backuppb.Recipient_Self: + return s.Client.Store.ACI, nil + case *backuppb.Recipient_Contact: + if len(dest.Contact.GetAci()) == 16 { + return uuid.UUID(dest.Contact.GetAci()), nil + } + } + return uuid.Nil, nil + } + for _, item := range items { + var streamOrder int64 + switch dt := item.DirectionalDetails.(type) { + case *backuppb.ChatItem_Incoming: + streamOrder = int64(dt.Incoming.GetDateServerSent()) + if !firstDirectionfulProcessed { + firstDirectionfulProcessed = true + isRead = dt.Incoming.Read + } + case *backuppb.ChatItem_Outgoing: + // TODO stream order? + if !firstDirectionfulProcessed { + firstDirectionfulProcessed = true + isRead = true + } + } + if len(attMap) > 0 { + clear(attMap) + } + senderACI, err := getRecipientACI(item.AuthorId) + if err != nil { + return nil, err + } else if senderACI == uuid.Nil { + continue + } + dm, reactions := msgconv.BackupToDataMessage(item, attMap) + if dm == nil { + continue + } + cm := s.Main.MsgConv.ToMatrix(ctx, s.Client, params.Portal, s.Main.Bridge.Bot, dm, attMap) + convertedReactions := make([]*bridgev2.BackfillReaction, 0, len(reactions)) + for _, reaction := range reactions { + reactionSenderACI, err := getRecipientACI(reaction.AuthorId) + if err != nil { + return nil, err + } else if reactionSenderACI == uuid.Nil { + continue + } + convertedReactions = append(convertedReactions, &bridgev2.BackfillReaction{ + TargetPart: ptr.Ptr(networkid.PartID("")), + Timestamp: time.UnixMilli(int64(reaction.SentTimestamp)), + Sender: s.makeEventSender(reactionSenderACI), + Emoji: reaction.GetEmoji(), + }) + } + msgID := signalid.MakeMessageID(senderACI, item.DateSent) + convertedMessages = append(convertedMessages, &bridgev2.BackfillMessage{ + ConvertedMessage: cm, + Sender: s.makeEventSender(senderACI), + ID: msgID, + TxnID: networkid.TransactionID(msgID), + Timestamp: time.UnixMilli(int64(item.DateSent)), + StreamOrder: streamOrder, + Reactions: convertedReactions, + }) + } + return &bridgev2.FetchMessagesResponse{ + Messages: convertedMessages, + HasMore: len(items) >= params.Count, + Forward: params.Forward, + MarkRead: isRead, + ApproxTotalCount: chat.TotalMessages, + CompleteCallback: func() { + // When reaching the last backwards backfill batch, delete the chat from the backup store. + // If backwards backfilling isn't enabled, delete immediately after the first backfill request. + if (!params.Forward && len(items) < params.Count) || !s.Main.Bridge.Config.Backfill.Queue.Enabled { + err := s.Client.Store.BackupStore.DeleteBackupChat(ctx, chat.Id) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to delete chat from backup store") + } else { + zerolog.Ctx(ctx).Debug().Msg("Deleted chat from backup store as backfill seems finished") + } + } else { + err := s.Client.Store.BackupStore.DeleteBackupChatItems(ctx, chat.Id, minTS) + if err != nil { + zerolog.Ctx(ctx).Err(err).Time("min_ts", minTS).Msg("Failed to delete messages from backup store") + } else { + zerolog.Ctx(ctx).Debug().Time("min_ts", minTS).Msg("Deleted messages from backup store") + } + } + }, + }, nil +} diff --git a/pkg/connector/chatinfo.go b/pkg/connector/chatinfo.go index e3eef596..c73b5d5c 100644 --- a/pkg/connector/chatinfo.go +++ b/pkg/connector/chatinfo.go @@ -34,6 +34,7 @@ import ( "go.mau.fi/mautrix-signal/pkg/libsignalgo" "go.mau.fi/mautrix-signal/pkg/signalid" + "go.mau.fi/mautrix-signal/pkg/signalmeow/store" "go.mau.fi/mautrix-signal/pkg/signalmeow/types" ) @@ -62,14 +63,14 @@ func (s *SignalClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) return nil, err } if groupID != "" { - return s.getGroupInfo(ctx, groupID, 0) + return s.getGroupInfo(ctx, groupID, 0, nil) } else { aci, pni := userID.ToACIAndPNI() contact, err := s.Client.Store.RecipientStore.LoadAndUpdateRecipient(ctx, aci, pni, nil) if err != nil { return nil, err } - return s.makeCreateDMResponse(contact).PortalInfo, nil + return s.makeCreateDMResponse(ctx, contact, nil).PortalInfo, nil } } @@ -182,13 +183,13 @@ func (s *SignalClient) ResolveIdentifier(ctx context.Context, number string, cre UserID: signalid.MakeUserID(aci), UserInfo: s.contactToUserInfo(recipient), Ghost: ghost, - Chat: s.makeCreateDMResponse(recipient), + Chat: s.makeCreateDMResponse(ctx, recipient, nil), }, nil } else { return &bridgev2.ResolveIdentifierResponse{ UserID: signalid.MakeUserIDFromServiceID(libsignalgo.NewPNIServiceID(pni)), UserInfo: s.contactToUserInfo(recipient), - Chat: s.makeCreateDMResponse(recipient), + Chat: s.makeCreateDMResponse(ctx, recipient, nil), }, nil } } @@ -207,7 +208,7 @@ func (s *SignalClient) GetContactList(ctx context.Context) ([]*bridgev2.ResolveI for i, recipient := range recipients { recipientResp := &bridgev2.ResolveIdentifierResponse{ UserInfo: s.contactToUserInfo(recipient), - Chat: s.makeCreateDMResponse(recipient), + Chat: s.makeCreateDMResponse(ctx, recipient, nil), } if recipient.ACI != uuid.Nil { recipientResp.UserID = signalid.MakeUserID(recipient.ACI) @@ -224,7 +225,7 @@ func (s *SignalClient) GetContactList(ctx context.Context) ([]*bridgev2.ResolveI return resp, nil } -func (s *SignalClient) makeCreateDMResponse(recipient *types.Recipient) *bridgev2.CreateChatResponse { +func (s *SignalClient) makeCreateDMResponse(ctx context.Context, recipient *types.Recipient, backupChat *store.BackupChat) *bridgev2.CreateChatResponse { name := "" topic := PrivateChatTopic selfUser := s.makeEventSender(s.Client.Store.ACI) @@ -247,6 +248,13 @@ func (s *SignalClient) makeCreateDMResponse(recipient *types.Recipient) *bridgev name = s.Main.Config.FormatDisplayname(recipient) serviceID = libsignalgo.NewPNIServiceID(recipient.PNI) } else { + if backupChat == nil { + var err error + backupChat, err = s.Client.Store.BackupStore.GetBackupChatByUserID(ctx, libsignalgo.NewACIServiceID(recipient.ACI)) + if err != nil { + zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get backup chat for recipient") + } + } members.OtherUserID = signalid.MakeUserID(recipient.ACI) if recipient.ACI == s.Client.Store.ACI { name = NoteToSelfName @@ -275,6 +283,8 @@ func (s *SignalClient) makeCreateDMResponse(recipient *types.Recipient) *bridgev Topic: &topic, Members: members, Type: ptr.Ptr(database.RoomTypeDM), + + CanBackfill: backupChat != nil, }, } } diff --git a/pkg/connector/chatsync.go b/pkg/connector/chatsync.go new file mode 100644 index 00000000..5d982e72 --- /dev/null +++ b/pkg/connector/chatsync.go @@ -0,0 +1,157 @@ +// mautrix-signal - A Matrix-Signal puppeting bridge. +// Copyright (C) 2025 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package connector + +import ( + "context" + "encoding/base64" + "time" + + "github.com/google/uuid" + "github.com/rs/zerolog" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/simplevent" + + "go.mau.fi/mautrix-signal/pkg/libsignalgo" + "go.mau.fi/mautrix-signal/pkg/signalid" + "go.mau.fi/mautrix-signal/pkg/signalmeow/protobuf/backuppb" + "go.mau.fi/mautrix-signal/pkg/signalmeow/types" +) + +func (s *SignalClient) syncChats(ctx context.Context) { + if s.UserLogin.Metadata.(*signalid.UserLoginMetadata).ChatsSynced { + return + } + if s.Client.Store.EphemeralBackupKey != nil { + zerolog.Ctx(ctx).Info().Msg("Fetching transfer archive before syncing chats") + meta, err := s.Client.WaitForTransfer(ctx) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to request transfer archive") + return + } else if meta.Error != "" { + zerolog.Ctx(ctx).Error().Str("error_type", meta.Error).Msg("Transfer archive request was rejected") + s.UserLogin.Metadata.(*signalid.UserLoginMetadata).ChatsSynced = true + err = s.UserLogin.Save(ctx) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to save user login metadata after transfer archive request was rejected") + } + return + } + err = s.Client.FetchAndProcessTransfer(ctx, meta) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to fetch and process transfer archive") + return + } + zerolog.Ctx(ctx).Info().Msg("Transfer archive fetched and processed, syncing chats") + } + chats, err := s.Client.Store.BackupStore.GetBackupChats(ctx) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get chats from backup store") + return + } + zerolog.Ctx(ctx).Info().Int("chat_count", len(chats)).Msg("Fetched chats to sync from database") + for _, chat := range chats { + recipient, err := s.Client.Store.BackupStore.GetBackupRecipient(ctx, chat.RecipientId) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get recipient for chat") + continue + } + resyncEvt := &simplevent.ChatResync{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatResync, + LogContext: func(c zerolog.Context) zerolog.Context { + return c. + Int("message_count", chat.TotalMessages). + Uint64("backup_chat_id", chat.Id). + Uint64("backup_recipient_id", chat.RecipientId) + }, + CreatePortal: true, + }, + LatestMessageTS: time.UnixMilli(int64(chat.LatestMessageID)), + } + switch dest := recipient.Destination.(type) { + case *backuppb.Recipient_Contact: + aci := tryCastUUID(dest.Contact.GetAci()) + pni := tryCastUUID(dest.Contact.GetPni()) + if chat.TotalMessages == 0 { + zerolog.Ctx(ctx).Debug(). + Stringer("aci", aci). + Stringer("pni", pni). + Uint64("e164", dest.Contact.GetE164()). + Msg("Skipping direct chat with no messages and deleting data") + err = s.Client.Store.BackupStore.DeleteBackupChat(ctx, chat.Id) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to delete chat from backup store") + } + continue + } + processedRecipient, err := s.Client.Store.RecipientStore.LoadAndUpdateRecipient(ctx, aci, pni, nil) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get full recipient data") + continue + } + dmInfo := s.makeCreateDMResponse(ctx, processedRecipient, chat) + resyncEvt.PortalKey = dmInfo.PortalKey + resyncEvt.ChatInfo = dmInfo.PortalInfo + case *backuppb.Recipient_Self: + processedRecipient, err := s.Client.Store.RecipientStore.LoadAndUpdateRecipient(ctx, s.Client.Store.ACI, uuid.Nil, nil) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get full recipient data") + continue + } + dmInfo := s.makeCreateDMResponse(ctx, processedRecipient, chat) + resyncEvt.PortalKey = dmInfo.PortalKey + resyncEvt.ChatInfo = dmInfo.PortalInfo + case *backuppb.Recipient_Group: + if len(dest.Group.MasterKey) != libsignalgo.GroupMasterKeyLength { + continue + } + rawGroupID, err := libsignalgo.GroupMasterKey(dest.Group.MasterKey).GroupIdentifier() + if err != nil { + zerolog.Ctx(ctx).Err(err). + Uint64("recipient_id", recipient.Id). + Msg("Failed to get group identifier from master key") + continue + } + groupID := types.GroupIdentifier(base64.StdEncoding.EncodeToString(rawGroupID[:])) + groupInfo, err := s.getGroupInfo(ctx, groupID, dest.Group.GetSnapshot().GetVersion(), chat) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get full group info") + continue + } + resyncEvt.PortalKey = s.makePortalKey(string(groupID)) + resyncEvt.ChatInfo = groupInfo + default: + zerolog.Ctx(ctx).Debug(). + Type("destination_type", dest). + Uint64("backup_chat_id", chat.Id). + Uint64("backup_recipient_id", chat.RecipientId). + Msg("Ignoring and deleting chat with unsupported destination type") + err = s.Client.Store.BackupStore.DeleteBackupChat(ctx, chat.Id) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to delete chat from backup store") + } + continue + } + s.UserLogin.QueueRemoteEvent(resyncEvt) + } + s.UserLogin.Metadata.(*signalid.UserLoginMetadata).ChatsSynced = true + err = s.UserLogin.Save(ctx) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to save user login metadata after syncing chats") + } +} diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 2f2925c7..ece67530 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -206,6 +206,16 @@ func (s *SignalClient) bridgeStateLoop(statusChan <-chan signalmeow.SignalConnec } } +func (s *SignalClient) postLoginConnect(ctx context.Context) { + // TODO it would be more proper to only connect after syncing, + // but currently syncing will fetch group info online, so it has to be connected. + s.Connect(ctx) + s.syncChats(ctx) + if s.Client.Store.MasterKey != nil { + s.Client.SyncStorage(ctx) + } +} + func (s *SignalClient) Connect(ctx context.Context) { if s.Client == nil { s.UserLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateBadCredentials, Message: "You're not logged into Signal"}) @@ -283,6 +293,7 @@ func (s *SignalClient) tryConnect(ctx context.Context, retryCount int) { } } else { go s.bridgeStateLoop(ch) + go s.syncChats(ctx) } } diff --git a/pkg/connector/dbmeta.go b/pkg/connector/dbmeta.go index 4ee3b083..d8f644e1 100644 --- a/pkg/connector/dbmeta.go +++ b/pkg/connector/dbmeta.go @@ -33,7 +33,9 @@ func (s *SignalConnector) GetDBMetaTypes() database.MetaTypes { Message: func() any { return &signalid.MessageMetadata{} }, - Reaction: nil, - UserLogin: nil, + Reaction: nil, + UserLogin: func() any { + return &signalid.UserLoginMetadata{} + }, } } diff --git a/pkg/connector/groupinfo.go b/pkg/connector/groupinfo.go index e5c1fc5f..0cb3ce55 100644 --- a/pkg/connector/groupinfo.go +++ b/pkg/connector/groupinfo.go @@ -31,6 +31,7 @@ import ( "go.mau.fi/mautrix-signal/pkg/libsignalgo" "go.mau.fi/mautrix-signal/pkg/signalid" "go.mau.fi/mautrix-signal/pkg/signalmeow" + "go.mau.fi/mautrix-signal/pkg/signalmeow/store" "go.mau.fi/mautrix-signal/pkg/signalmeow/types" ) @@ -93,7 +94,7 @@ func inviteLinkToJoinRule(inviteLinkAccess signalmeow.AccessControl) event.JoinR } } -func (s *SignalClient) getGroupInfo(ctx context.Context, groupID types.GroupIdentifier, minRevision uint32) (*bridgev2.ChatInfo, error) { +func (s *SignalClient) getGroupInfo(ctx context.Context, groupID types.GroupIdentifier, minRevision uint32, backupChat *store.BackupChat) (*bridgev2.ChatInfo, error) { groupInfo, err := s.Client.RetrieveGroupByID(ctx, groupID, minRevision) if err != nil { return nil, err @@ -152,6 +153,13 @@ func (s *SignalClient) getGroupInfo(ctx context.Context, groupID types.GroupIden Membership: event.MembershipBan, } } + if backupChat == nil { + // TODO allow using backup chat for data too instead of asking server? + backupChat, err = s.Client.Store.BackupStore.GetBackupChatByGroupID(ctx, groupID) + if err != nil { + zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get backup chat for group") + } + } return &bridgev2.ChatInfo{ Name: &groupInfo.Title, Topic: &groupInfo.Description, @@ -164,6 +172,7 @@ func (s *SignalClient) getGroupInfo(ctx context.Context, groupID types.GroupIden Type: ptr.Ptr(database.RoomTypeDefault), JoinRule: &event.JoinRulesEventContent{JoinRule: joinRule}, ExtraUpdates: makeRevisionUpdater(groupInfo.Revision), + CanBackfill: backupChat != nil, }, nil } @@ -366,7 +375,7 @@ func (s *SignalClient) catchUpGroup(ctx context.Context, portal *bridgev2.Portal Logger() if fromRevision == 0 { log.Info().Msg("Syncing full group info") - info, err := s.getGroupInfo(ctx, types.GroupIdentifier(portal.ID), toRevision) + info, err := s.getGroupInfo(ctx, types.GroupIdentifier(portal.ID), toRevision, nil) if err != nil { log.Err(err).Msg("Failed to get group info") } else { diff --git a/pkg/connector/login.go b/pkg/connector/login.go index e2f2e98d..3b81b0aa 100644 --- a/pkg/connector/login.go +++ b/pkg/connector/login.go @@ -79,7 +79,9 @@ func (qr *QRLogin) Start(ctx context.Context) (*bridgev2.LoginStep, error) { provCtx, cancel := context.WithCancel(log.WithContext(context.Background())) qr.cancelChan = cancel // Don't use the start context here: the channel will outlive the start request. - qr.ProvChan = signalmeow.PerformProvisioning(provCtx, qr.Main.Store, qr.Main.Config.DeviceName, false) + qr.ProvChan = signalmeow.PerformProvisioning( + provCtx, qr.Main.Store, qr.Main.Config.DeviceName, qr.Main.Bridge.Config.Backfill.Enabled, + ) var resp signalmeow.ProvisioningResponse select { case resp = <-qr.ProvChan: @@ -165,6 +167,7 @@ func (qr *QRLogin) processingWait(ctx context.Context) (*bridgev2.LoginStep, err RemoteProfile: status.RemoteProfile{ Phone: qr.ProvData.Number, }, + Metadata: &signalid.UserLoginMetadata{}, }, &bridgev2.NewLoginParams{ DeleteOnConflict: true, }) @@ -172,10 +175,16 @@ func (qr *QRLogin) processingWait(ctx context.Context) (*bridgev2.LoginStep, err return nil, fmt.Errorf("failed to create user login: %w", err) } backgroundCtx := ul.Log.WithContext(context.Background()) - ul.Client.Connect(backgroundCtx) - if signalClient := ul.Client.(*SignalClient).Client; signalClient.Store.MasterKey != nil { - zerolog.Ctx(ctx).Info().Msg("Received master key in login, syncing storage immediately") - go signalClient.SyncStorage(backgroundCtx) + signalClient := ul.Client.(*SignalClient).Client + if signalClient.Store.EphemeralBackupKey != nil { + zerolog.Ctx(ctx).Info().Msg("Received ephemeral backup key in login, syncing chats before connecting") + go ul.Client.(*SignalClient).postLoginConnect(backgroundCtx) + } else { + ul.Client.Connect(backgroundCtx) + if signalClient.Store.MasterKey != nil { + zerolog.Ctx(ctx).Info().Msg("Received master key in login, syncing storage immediately") + go signalClient.SyncStorage(backgroundCtx) + } } return &bridgev2.LoginStep{ Type: bridgev2.LoginStepTypeComplete, diff --git a/pkg/msgconv/from-signal.go b/pkg/msgconv/from-signal.go index 714b7235..7ee29b71 100644 --- a/pkg/msgconv/from-signal.go +++ b/pkg/msgconv/from-signal.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/base64" + "errors" "fmt" "net/http" "strings" @@ -41,6 +42,11 @@ import ( signalpb "go.mau.fi/mautrix-signal/pkg/signalmeow/protobuf" ) +var ( + ErrAttachmentNotInBackup = errors.New("attachment not found in backup") + ErrBackupNotSupported = errors.New("downloading attachments from server-side backup is not yet supported") +) + func calculateLength(dm *signalpb.DataMessage) int { if dm.GetFlags()&uint32(signalpb.DataMessage_EXPIRATION_TIMER_UPDATE) != 0 { return 1 @@ -384,6 +390,23 @@ func (mc *MessageConverter) convertContactToMatrix(ctx context.Context, contact func (mc *MessageConverter) convertAttachmentToMatrix(ctx context.Context, index int, att *signalpb.AttachmentPointer, attMap AttachmentMap) *bridgev2.ConvertedMessagePart { part, err := mc.reuploadAttachment(ctx, att, attMap) if err != nil { + if (errors.Is(err, signalmeow.ErrAttachmentNotFound) || errors.Is(err, ErrAttachmentNotInBackup)) && attMap != nil { + return &bridgev2.ConvertedMessagePart{ + Type: event.EventMessage, + Content: &event.MessageEventContent{ + MsgType: event.MsgNotice, + Body: fmt.Sprintf("Attachment no longer available %s", att.GetFileName()), + }, + } + } else if errors.Is(err, ErrBackupNotSupported) { + return &bridgev2.ConvertedMessagePart{ + Type: event.EventMessage, + Content: &event.MessageEventContent{ + MsgType: event.MsgNotice, + Body: "Downloading attachments from backup is not yet supported", + }, + } + } zerolog.Ctx(ctx).Err(err).Int("attachment_index", index).Msg("Failed to handle attachment") return &bridgev2.ConvertedMessagePart{ Type: event.EventMessage, @@ -434,7 +457,7 @@ func (mc *MessageConverter) convertStickerToMatrix(ctx context.Context, sticker func (mc *MessageConverter) downloadSignalLongText(ctx context.Context, att *signalpb.AttachmentPointer, attMap AttachmentMap) (*string, error) { data, err := mc.downloadAttachment(ctx, att, attMap) if err != nil { - return nil, fmt.Errorf("failed to download attachment: %w", err) + return nil, err } longBody := string(data) return &longBody, nil @@ -449,10 +472,10 @@ func (mc *MessageConverter) downloadAttachment(ctx context.Context, att *signalp if !ok { return nil, fmt.Errorf("no attachment identifier and attachment not found in map") } else if target == nil { - return nil, fmt.Errorf("attachment not available in backup") + return nil, ErrAttachmentNotInBackup } else { // TODO add support for downloading attachments from backup - return nil, fmt.Errorf("downloading attachments from backup is not yet supported") + return nil, ErrBackupNotSupported } } return signalmeow.DownloadAttachment(ctx, att) @@ -461,7 +484,7 @@ func (mc *MessageConverter) downloadAttachment(ctx context.Context, att *signalp func (mc *MessageConverter) reuploadAttachment(ctx context.Context, att *signalpb.AttachmentPointer, attMap AttachmentMap) (*bridgev2.ConvertedMessagePart, error) { data, err := mc.downloadAttachment(ctx, att, attMap) if err != nil { - return nil, fmt.Errorf("failed to download attachment: %w", err) + return nil, err } mimeType := att.GetContentType() if mimeType == "" { diff --git a/pkg/signalid/dbmeta.go b/pkg/signalid/dbmeta.go index f3811af0..76ee2178 100644 --- a/pkg/signalid/dbmeta.go +++ b/pkg/signalid/dbmeta.go @@ -29,6 +29,10 @@ type MessageMetadata struct { ContainsAttachments bool `json:"contains_attachments,omitempty"` } +type UserLoginMetadata struct { + ChatsSynced bool `json:"chats_synced,omitempty"` +} + type GhostMetadata struct { ProfileFetchedAt jsontime.UnixMilli `json:"profile_fetched_at"` } diff --git a/pkg/signalmeow/attachments.go b/pkg/signalmeow/attachments.go index 4d5d6d6e..d733b40c 100644 --- a/pkg/signalmeow/attachments.go +++ b/pkg/signalmeow/attachments.go @@ -57,6 +57,7 @@ func getAttachmentPath(id uint64, key string) string { // ErrInvalidMACForAttachment signals that the downloaded attachment has an invalid MAC. var ErrInvalidMACForAttachment = errors.New("invalid MAC for attachment") var ErrInvalidDigestForAttachment = errors.New("invalid digest for attachment") +var ErrAttachmentNotFound = errors.New("attachment not found on server") func DownloadAttachment(ctx context.Context, a *signalpb.AttachmentPointer) ([]byte, error) { path := getAttachmentPath(a.GetCdnId(), a.GetCdnKey()) @@ -77,6 +78,9 @@ func DownloadAttachment(ctx context.Context, a *signalpb.AttachmentPointer) ([]b } else if len(body) < 1024 { zerolog.Ctx(ctx).Debug().Bytes("response_data", body).Msg("Failed download response data") } + if resp.StatusCode == http.StatusNotFound { + return nil, ErrAttachmentNotFound + } return nil, fmt.Errorf("unexpected status code %d", resp.StatusCode) }