Skip to content

Commit

Permalink
Use series lookup as source of truth for if indexed or not and if sho…
Browse files Browse the repository at this point in the history
…uld phase out
  • Loading branch information
robskillington committed Feb 7, 2021
1 parent de3ce9a commit 834efa6
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 331 deletions.
23 changes: 23 additions & 0 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,12 @@ func (i *nsIndex) BlockForBlockStart(blockStart time.Time) (index.Block, error)
func (i *nsIndex) WriteBatch(
batch *index.WriteBatch,
) error {
// Filter anything with a pending index out before acquiring lock.
batch.MarkUnmarkedIfAlreadyIndexedSuccessAndFinalize()
if !batch.PendingAny() {
return nil
}

i.state.RLock()
if !i.isOpenWithRLock() {
i.state.RUnlock()
Expand Down Expand Up @@ -648,6 +654,23 @@ func (i *nsIndex) WriteBatch(
func (i *nsIndex) WritePending(
pending []writes.PendingIndexInsert,
) error {
// Filter anything with a pending index out before acquiring lock.
for j := 0; j < len(pending); j++ {
t := xtime.ToUnixNano(pending[j].Entry.Timestamp.Truncate(i.blockSize))
if !pending[j].Entry.OnIndexSeries.IfAlreadyIndexedMarkIndexSuccessAndFinalize(t) {
continue
}
// Remove this elem by moving tail here and shrinking by one.
n := len(pending)
pending[j] = pending[n-1]
pending = pending[:n-1]
// Reprocess element.
j--
}
if len(pending) == 0 {
return nil
}

i.state.RLock()
if !i.isOpenWithRLock() {
i.state.RUnlock()
Expand Down
Loading

0 comments on commit 834efa6

Please sign in to comment.