Skip to content

Commit

Permalink
Fix history recording bug
Browse files Browse the repository at this point in the history
Reword msg feed handling
  • Loading branch information
y-a-t-s committed Sep 23, 2024
1 parent 6e21cd3 commit 1a2e9d5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 73 deletions.
34 changes: 9 additions & 25 deletions chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"strings"
"sync"

"y-a-t-s/sockchat/config"

Expand Down Expand Up @@ -58,26 +57,16 @@ func (c *Chat) Stop() {
c.Cfg = c.sock.cfg
}

func newHistChan() chan *Message {
return make(chan *Message, HIST_LEN)
}

func (c *Chat) recordHistory(feed <-chan *Message) chan chan *Message {
out := make(chan chan *Message, 2)

var (
prevID uint32 = 0 // ID of previously processed msg.
out := make(chan chan *Message, 4)

// Length tracker.
// Once we reach the max, the oldest msgs get popped off the feed.
hl = 0
hist = newHistChan()
)
var prevID uint32 = 0 // ID of previously processed msg.

hist := newFeedChan()
editHist := func(msg *Message) {
close(hist)

hc, nc := newHistChan(), newHistChan()
hc, nc := newFeedChan(), newFeedChan()
for hm := range hist {
if msg.MessageID == hm.MessageID {
hm.Release()
Expand All @@ -104,14 +93,14 @@ func (c *Chat) recordHistory(feed <-chan *Message) chan chan *Message {
case msg.IsEdited() && msg.MessageID <= prevID:
editHist(msg)
default:
switch {
case hl >= HIST_LEN:
select {
case hist <- msg:
default:
hm := <-hist
hm.Release()
hist <- msg
default:
hl++
}
prevID = msg.MessageID
}
}
}()
Expand All @@ -123,17 +112,13 @@ func (c *Chat) router(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup

histFeed := make(chan *Message, HIST_LEN)
defer close(histFeed)
hist := c.recordHistory(histFeed)

if c.Cfg.Logger {
logFeed := c.Feeder.NewFeed()
wg.Add(1)
logFeed := c.Feeder.Feed()
context.AfterFunc(ctx, func() {
defer wg.Done()
logFeed.Close()
})

Expand Down Expand Up @@ -185,7 +170,6 @@ func (c *Chat) router(ctx context.Context) {
select {
case <-ctx.Done():
cancel()
wg.Wait()
return
case hf := <-hist:
c.History <- hf
Expand Down
74 changes: 29 additions & 45 deletions chat/feeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package chat

import (
"context"
"sync"
)

func newFeedChan() chan *Message {
return make(chan *Message, HIST_LEN)
}

type feed struct {
Feed chan Message
closed chan struct{}
}

func newFeed() *feed {
return &feed{
func newFeed() feed {
return feed{
Feed: make(chan Message, HIST_LEN),
closed: make(chan struct{}, 1),
}
Expand Down Expand Up @@ -39,50 +42,42 @@ func (mf *feed) Close() {
}

type Feeder struct {
feeds []*feed
in chan<- *Message
in chan<- *Message

mx sync.RWMutex
once sync.Once
Feed func() feed
}

func NewFeeder(ctx context.Context) *Feeder {
in := make(chan *Message, 512)
fdr := &Feeder{
feeds: make([]*feed, 0, 4),
in: in,
}
in := newFeedChan()

dropFeed := func(i int) {
fdr.mx.Lock()
defer fdr.mx.Unlock()
feeds := make([]feed, 0, 4)
newFeeds := make(chan feed)

fl := len(fdr.feeds) - 1
if i > fl {
return
}

fdr.feeds[i].Close()
if i < fl {
fdr.feeds[i] = fdr.feeds[fl]
}
fdr.feeds = fdr.feeds[:fl-1]
fdr := &Feeder{
in: in,
Feed: func() feed {
mf := newFeed()
newFeeds <- mf
return mf
},
}

castMsg := func(msg *Message) {
if msg == nil {
return
}

fdr.mx.RLock()
defer fdr.mx.RUnlock()

for i, mf := range fdr.feeds {
for i := range feeds {
select {
case <-mf.closed:
go dropFeed(i)
case <-feeds[i].closed:
// See if the closed feed can be replaced now.
select {
case feeds[i] = <-newFeeds:
feeds[i].send(msg)
default:
}
default:
mf.send(msg)
feeds[i].send(msg)
}
}
}
Expand All @@ -92,6 +87,8 @@ func NewFeeder(ctx context.Context) *Feeder {
select {
case <-ctx.Done():
return
case mf := <-newFeeds:
feeds = append(feeds, mf)
case msg := <-in:
castMsg(msg)
}
Expand All @@ -101,19 +98,6 @@ func NewFeeder(ctx context.Context) *Feeder {
return fdr
}

func (fdr *Feeder) NewFeed() *feed {
mf := newFeed()

go func() {
fdr.mx.Lock()
defer fdr.mx.Unlock()

fdr.feeds = append(fdr.feeds, mf)
}()

return mf
}

func (fdr *Feeder) Send(msg *Message) {
fdr.in <- msg
}
7 changes: 4 additions & 3 deletions services/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (ui *TUI) incomingHandler(ctx context.Context) {
}

// Chat msg feed from socket.
feed := ui.Chat.Feeder.NewFeed()
feed := ui.Chat.Feeder.Feed()
defer feed.Close()

for {
Expand Down Expand Up @@ -211,9 +211,10 @@ func (ui *TUI) incomingHandler(ctx context.Context) {
}

ui.Console.Clear()
ui.Console.ScrollToEnd()
bb.WriteTo(ui.Console)
bb.Reset()
go highlight()
highlight()
case msg := <-feed.Feed:
if msg.IsEdited() && msg.MessageID <= prevID {
continue
Expand All @@ -222,7 +223,7 @@ func (ui *TUI) incomingHandler(ctx context.Context) {
io.WriteString(ui.Console, msgStr(&msg))
if msg.IsMention {
mentionIDs = append(mentionIDs, fmt.Sprint(msg.MessageID))
go highlight()
highlight()
}

prevID = msg.MessageID
Expand Down

0 comments on commit 1a2e9d5

Please sign in to comment.