Skip to content

Commit

Permalink
Tried to add posting list cache for single values
Browse files Browse the repository at this point in the history
  • Loading branch information
Harshil Goel committed Oct 12, 2023
1 parent c1bd839 commit 7ba5300
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
34 changes: 31 additions & 3 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ type LocalCache struct {

// plists are posting lists in memory. They can be discarded to reclaim space.
plists map[string]*List

postings map[string]*pb.PostingList
}

// NewLocalCache returns a new LocalCache instance.
Expand All @@ -120,13 +122,17 @@ func NewLocalCache(startTs uint64) *LocalCache {
deltas: make(map[string][]byte),
plists: make(map[string]*List),
maxVersions: make(map[string]uint64),
postings: make(map[string]*pb.PostingList),
}
}

// NoCache returns a new LocalCache instance, which won't cache anything. Useful to pass startTs
// around.
func NoCache(startTs uint64) *LocalCache {
return &LocalCache{startTs: startTs}
return &LocalCache{
startTs: startTs,
postings: make(map[string]*pb.PostingList),
}
}

func (lc *LocalCache) getNoStore(key string) *List {
Expand All @@ -138,6 +144,20 @@ func (lc *LocalCache) getNoStore(key string) *List {
return nil
}

// SetIfAbsent adds the list for the specified key to the cache. If a list for the same
// key already exists, the cache will not be modified and the existing list
// will be returned instead. This behavior is meant to prevent the goroutines
// using the cache from ending up with an orphaned version of a list.
func (lc *LocalCache) SetPostingIfAbsent(key string, updated *pb.PostingList) *pb.PostingList {
lc.Lock()
defer lc.Unlock()
if pl, ok := lc.postings[key]; ok {
return pl
}
lc.postings[key] = updated
return updated
}

// SetIfAbsent adds the list for the specified key to the cache. If a list for the same
// key already exists, the cache will not be modified and the existing list
// will be returned instead. This behavior is meant to prevent the goroutines
Expand Down Expand Up @@ -200,8 +220,10 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
remaining_keys := make([][]byte, 0)
lc.RLock()
for i, key := range keys {
pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
if pl, ok := lc.postings[string(key)]; ok && pl != nil {
results[i] = pl
} else if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
pl := &pb.PostingList{}
err := pl.Unmarshal(delta)
if err != nil {
results[i] = pl
Expand All @@ -214,6 +236,10 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e

txn := pstore.NewTransactionAt(lc.startTs, false)
items, err := txn.GetBatch(remaining_keys)
if err != nil {
fmt.Println(err, keys)
return nil, err
}
idx := 0

for i := 0; i < len(results); i++ {
Expand Down Expand Up @@ -245,6 +271,7 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
}
pl.Postings = pl.Postings[:idx]
results[i] = pl
lc.SetPostingIfAbsent(string(keys[i]), pl)
}

return results, err
Expand Down Expand Up @@ -311,6 +338,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
}
}
pl.Postings = pl.Postings[:idx]
lc.SetPostingIfAbsent(string(key), pl)
return pl, nil
}

Expand Down
30 changes: 29 additions & 1 deletion worker/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/hex"
"sort"
"strings"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
Expand Down Expand Up @@ -511,7 +512,34 @@ func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error
cctx, cancel := context.WithCancel(ctx)
defer cancel()

r := sortWithIndex(cctx, ts)
resCh := make(chan *sortresult, 2)
go func() {
select {
case <-time.After(3 * time.Millisecond):
// Wait between ctx chan and time chan.
case <-ctx.Done():
resCh <- &sortresult{err: ctx.Err()}
return
}
r := sortWithoutIndex(cctx, ts)
resCh <- r
}()

go func() {
sr := sortWithIndex(cctx, ts)
resCh <- sr
}()

r := <-resCh
if r.err == nil {
cancel()
// wait for other goroutine to get cancelled
<-resCh
} else {
span.Annotatef(nil, "processSort error: %v", r.err)
r = <-resCh
}

if r.err != nil {
return nil, r.err
}
Expand Down

0 comments on commit 7ba5300

Please sign in to comment.