Skip to content

Commit

Permalink
Add PeekChan() (#14)
Browse files Browse the repository at this point in the history
* Add PeekChan() based on nsqio@2cb4338

* Add peekchan test
  • Loading branch information
kev1n80 authored Sep 22, 2023
1 parent bb13b3d commit 9ac83ef
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
13 changes: 13 additions & 0 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (l LogLevel) String() string {
type Interface interface {
Put([]byte) error
ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel
Close() error
Delete() error
Depth() int64
Expand Down Expand Up @@ -102,6 +103,9 @@ type diskQueue struct {
// exposed via ReadChan()
readChan chan []byte

// exposed via PeekChan()
peekChan chan []byte

// internal channels
depthChan chan int64
writeChan chan []byte
Expand Down Expand Up @@ -148,6 +152,7 @@ func NewWithDiskSpace(name string, dataPath string,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
peekChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
Expand Down Expand Up @@ -196,6 +201,10 @@ func (d *diskQueue) start() error {
return nil
}

func (d *diskQueue) PeekChan() <-chan []byte {
return d.peekChan
}

// Depth returns the depth of the queue
func (d *diskQueue) Depth() int64 {
depth, ok := <-d.depthChan
Expand Down Expand Up @@ -988,6 +997,7 @@ func (d *diskQueue) ioLoop() {
var err error
var count int64
var r chan []byte
var p chan []byte

syncTicker := time.NewTicker(d.syncTimeout)

Expand Down Expand Up @@ -1016,13 +1026,16 @@ func (d *diskQueue) ioLoop() {
}
}
r = d.readChan
p = d.peekChan
} else {
r = nil
p = nil
}

select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case p <- dataRead:
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
Expand Down
77 changes: 77 additions & 0 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,3 +1260,80 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
<-dq.ReadChan()
}
}

func TestDiskQueuePeek(t *testing.T) {
l := NewTestLogger(t)
dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
msg := bytes.Repeat([]byte{0}, 10)
ml := int64(len(msg))
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
defer dq.Close()
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())

t.Run("roll", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := dq.Put(msg)
Nil(t, err)
Equal(t, int64(i+1), dq.Depth())
}

for i := 10; i > 0; i-- {
Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(i-1), dq.Depth())
}

Nil(t, dq.Empty())
})

t.Run("peek-read", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := dq.Put(msg)
Nil(t, err)
Equal(t, int64(i+1), dq.Depth())
}

for i := 10; i > 0; i-- {
Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i), dq.Depth())

Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(i-1), dq.Depth())
}

Nil(t, dq.Empty())
})

t.Run("read-peek", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := dq.Put(msg)
Nil(t, err)
Equal(t, int64(i+1), dq.Depth())
}

for i := 10; i > 1; i-- {
Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(i-1), dq.Depth())

Equal(t, msg, <-dq.PeekChan())
Equal(t, int64(i-1), dq.Depth())
}

Nil(t, dq.Empty())
})

}

0 comments on commit 9ac83ef

Please sign in to comment.