Skip to content

Commit

Permalink
fix: instable peer caching and handling
Browse files Browse the repository at this point in the history
Previously all the peers saving functions were goroutined
due to which the peer handling system was highly unstable
and hence this change has been implemented to goroutine
only the part which saves the fetched peers to persistent
database and all other goroutine calls have been converted
to syncronous function calls.
  • Loading branch information
celestix committed Apr 19, 2024
1 parent 67ccdcd commit b55e34a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 29 deletions.
8 changes: 4 additions & 4 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,16 @@ func (dp *NativeDispatcher) Handle(ctx context.Context, updates tg.UpdatesClass)
chats := u.MapChats()
e.Chats = chats.ChatToMap()
e.Channels = chats.ChannelToMap()
go saveUsersPeers(u.Users, dp.pStorage)
go saveChatsPeers(u.Chats, dp.pStorage)
saveUsersPeers(u.Users, dp.pStorage)
saveChatsPeers(u.Chats, dp.pStorage)
case *tg.UpdatesCombined:
upds = u.Updates
e.Users = u.MapUsers().NotEmptyToMap()
chats := u.MapChats()
e.Chats = chats.ChatToMap()
e.Channels = chats.ChannelToMap()
go saveUsersPeers(u.Users, dp.pStorage)
go saveChatsPeers(u.Chats, dp.pStorage)
saveUsersPeers(u.Users, dp.pStorage)
saveChatsPeers(u.Chats, dp.pStorage)
case *tg.UpdateShort:
upds = []tg.UpdateClass{u.Update}
e.short()
Expand Down
2 changes: 1 addition & 1 deletion ext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func (ctx *Context) extractContactResolvedPeer(p *tg.ContactsResolvedPeer, err e
if err != nil {
return &types.EmptyUC{}, err
}
go functions.SavePeersFromClassArray(ctx.PeerStorage, p.Chats, p.Users)
functions.SavePeersFromClassArray(ctx.PeerStorage, p.Chats, p.Users)
switch p.Peer.(type) {
case *tg.PeerChannel:
if p.Chats == nil || len(p.Chats) == 0 {
Expand Down
6 changes: 3 additions & 3 deletions ext/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func GetNewUpdate(ctx context.Context, client *tg.Client, p *storage.PeerStorage
for _, vu := range value.Chats {
switch chat := vu.(type) {
case *tg.Chat:
go p.AddPeer(chat.ID, storage.DefaultAccessHash, storage.TypeChat, storage.DefaultUsername)
p.AddPeer(chat.ID, storage.DefaultAccessHash, storage.TypeChat, storage.DefaultUsername)
e.Chats[chat.ID] = chat
case *tg.Channel:
go p.AddPeer(chat.ID, chat.AccessHash, storage.TypeChannel, chat.Username)
p.AddPeer(chat.ID, chat.AccessHash, storage.TypeChannel, chat.Username)
e.Channels[chat.ID] = chat
}
}
Expand All @@ -65,7 +65,7 @@ func GetNewUpdate(ctx context.Context, client *tg.Client, p *storage.PeerStorage
if !ok {
continue
}
go p.AddPeer(user.ID, user.AccessHash, storage.TypeUser, user.Username)
p.AddPeer(user.ID, user.AccessHash, storage.TypeUser, user.Username)
e.Users[user.ID] = user
}
}
Expand Down
12 changes: 6 additions & 6 deletions functions/messageHelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func GetChannelMessages(context context.Context, client *tg.Client, p *storage.P
}
switch m := messages.(type) {
case *tg.MessagesMessages:
go SavePeersFromClassArray(p, m.Chats, m.Users)
SavePeersFromClassArray(p, m.Chats, m.Users)
return m.Messages, nil
case *tg.MessagesMessagesSlice:
go SavePeersFromClassArray(p, m.Chats, m.Users)
SavePeersFromClassArray(p, m.Chats, m.Users)
return m.Messages, nil
case *tg.MessagesChannelMessages:
go SavePeersFromClassArray(p, m.Chats, m.Users)
SavePeersFromClassArray(p, m.Chats, m.Users)
return m.Messages, nil
default:
return nil, nil
Expand All @@ -54,13 +54,13 @@ func GetChatMessages(context context.Context, client *tg.Client, p *storage.Peer
}
switch m := messages.(type) {
case *tg.MessagesMessages:
go SavePeersFromClassArray(p, m.Chats, m.Users)
SavePeersFromClassArray(p, m.Chats, m.Users)
return m.Messages, nil
case *tg.MessagesMessagesSlice:
go SavePeersFromClassArray(p, m.Chats, m.Users)
SavePeersFromClassArray(p, m.Chats, m.Users)
return m.Messages, nil
case *tg.MessagesChannelMessages:
go SavePeersFromClassArray(p, m.Chats, m.Users)
SavePeersFromClassArray(p, m.Chats, m.Users)
return m.Messages, nil
default:
return nil, nil
Expand Down
4 changes: 2 additions & 2 deletions functions/updatesHelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func GetUpdateClassFromUpdatesClass(updates tg.UpdatesClass, p *storage.PeerStor
func getUpdateFromUpdates(updates tg.UpdatesClass, p *storage.PeerStorage) ([]tg.UpdateClass, []tg.ChatClass, []tg.UserClass) {
switch u := updates.(type) {
case *tg.Updates:
go SavePeersFromClassArray(p, u.Chats, u.Users)
SavePeersFromClassArray(p, u.Chats, u.Users)
return u.Updates, u.Chats, u.Users
case *tg.UpdatesCombined:
go SavePeersFromClassArray(p, u.Chats, u.Users)
SavePeersFromClassArray(p, u.Chats, u.Users)
return u.Updates, u.Chats, u.Users
case *tg.UpdateShort:
return []tg.UpdateClass{u.Update}, tg.ChatClassArray{}, tg.UserClassArray{}
Expand Down
26 changes: 13 additions & 13 deletions storage/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,34 @@ const (

func (p *PeerStorage) AddPeer(iD, accessHash int64, peerType EntityType, userName string) {
peer := &Peer{ID: iD, AccessHash: accessHash, Type: peerType.GetInt(), Username: userName}
p.peerCache.Set(iD, peer)
if p.inMemory {
p.peerCache.Set(iD, peer)
} else {
go p.peerCache.Set(iD, peer)
tx := p.SqlSession.Begin()
tx.Save(peer)
p.peerLock.Lock()
defer p.peerLock.Unlock()
tx.Commit()
return
}
go p.addPeerToDb(peer)
}

func (p *PeerStorage) addPeerToDb(peer *Peer) {
tx := p.SqlSession.Begin()
tx.Save(peer)
p.peerLock.Lock()
defer p.peerLock.Unlock()
tx.Commit()
}

// GetPeerById finds the provided id in the peer storage and return it if found.
func (p *PeerStorage) GetPeerById(iD int64) *Peer {
peer, ok := p.peerCache.Get(iD)
if p.inMemory {
// peer := PeerMemoryMap[iD]
peer, ok := p.peerCache.Get(iD)
if !ok {
return &Peer{}
}
return peer
} else {
peer, ok := p.peerCache.Get(iD)
if !ok {
return p.cachePeers(iD)
}
return peer
}
return peer
}

// GetPeerByUsername finds the provided username in the peer storage and return it if found.
Expand Down

0 comments on commit b55e34a

Please sign in to comment.