Skip to content

Commit

Permalink
add Peek method supporting pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
xiemalin committed Jul 22, 2021
1 parent 955a044 commit 2840500
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 0 deletions.
12 changes: 12 additions & 0 deletions filefanoutqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ func (q *FileFanoutQueue) PeekAll(fanoutID int64) ([][]byte, []int64, error) {
return q.fileQueue.peekAll(index, q.Size(fanoutID))
}

// PeekPagination to peek data from queue by paing feature.
func (q *FileFanoutQueue) PeekPagination(fanoutID int64, page, pagesize uint64) ([][]byte, []int64, error) {

qf, err := q.getQueueFront(fanoutID)
if err != nil {
return nil, nil, err
}
index := qf.frontIndex

return q.fileQueue.peekPagination(index, q.fileQueue.size(index), page, pagesize)
}

// Skip To skip deqeue target number of items
func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error {
if count <= 0 {
Expand Down
60 changes: 60 additions & 0 deletions filefanoutqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,63 @@ func TestFanoutQueue_Status(t *testing.T) {
})

}

func TestFanoutQueue_PeekPagination(t *testing.T) {
Convey("Test PeekPagination", t, func() {
path := Tempfile()
defer clearFiles(path, "fanoutqueue")
fanoutID := int64(100)

defer clearFrontIndexFiles(path, "fanoutqueue", fanoutID)

queue := FileFanoutQueue{}
err := queue.Open(path, "fanoutqueue", nil)
if err != nil {
t.Error(err)
}
defer queue.Close()

Convey("test PeekPagination on empty queue", func() {
data, indexs, err := queue.PeekPagination(fanoutID, 0, 0)
So(err, ShouldBeNil)
So(data, ShouldBeEmpty)
So(indexs, ShouldBeEmpty)

data, indexs, err = queue.PeekPagination(fanoutID, 1, 1)
So(err, ShouldBeNil)
So(data, ShouldBeEmpty)
So(indexs, ShouldBeEmpty)
})

Convey("test PeekPagination on items small than pagesize", func() {
for i := 0; i < 5; i++ { // add value
_, err := queue.Enqueue([]byte("hello matthew " + strconv.Itoa(i)))
So(err, ShouldBeNil)
}

data, indexs, err := queue.PeekPagination(fanoutID, 0, 0)
So(err, ShouldBeNil)
So(len(data), ShouldEqual, 5)
So(string(data[4]), ShouldEqual, "hello matthew 4")
So(len(indexs), ShouldEqual, 5)

data, indexs, err = queue.PeekPagination(fanoutID, 1, 10)
So(err, ShouldBeNil)
So(len(data), ShouldEqual, 5)
So(string(data[4]), ShouldEqual, "hello matthew 4")
So(len(indexs), ShouldEqual, 5)

data, indexs, err = queue.PeekPagination(fanoutID, 2, 10) // large paing
So(err, ShouldBeNil)
So(data, ShouldBeEmpty)
So(indexs, ShouldBeEmpty)

data, indexs, err = queue.PeekPagination(fanoutID, 2, 2)
So(err, ShouldBeNil)
So(len(data), ShouldEqual, 2)
So(string(data[1]), ShouldEqual, "hello matthew 3")
So(len(indexs), ShouldEqual, 2)
})

})
}
48 changes: 48 additions & 0 deletions filequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
fileSuffix = ".dat"

defaultFileMode = 0666

Default_Page_Size = 10
)

// DefaultOptions default options
Expand Down Expand Up @@ -435,6 +437,52 @@ func (q *FileQueue) PeekAll() ([][]byte, []int64, error) {
return q.peekAll(index, q.Size())
}

// PeekPagination to peek data from queue by paing feature.
func (q *FileQueue) PeekPagination(page, pagesize uint64) ([][]byte, []int64, error) {
return q.peekPagination(q.frontIndex, q.Size(), page, pagesize)
}

// peekPagination to peek data from queue by paing feature.
func (q *FileQueue) peekPagination(frontindex int64, size int64, page, pagesize uint64) ([][]byte, []int64, error) {
if page == 0 {
page = 1
}
if pagesize == 0 {
pagesize = Default_Page_Size
}

begin := (page - 1) * pagesize
end := begin + pagesize

if begin > uint64(size) { // no data return
return [][]byte{}, []int64{}, nil
}

if end > uint64(size) {
end = uint64(size)
pagesize = end - begin
}

// fix the offset
begin = begin + uint64(frontindex)
end = end + uint64(frontindex)

result := make([][]byte, pagesize)
indexs := make([]int64, pagesize)

var index int = 0
for i := begin; i < end; i++ {
bb, err := q.peek(int64(i))
if err != nil {
return nil, nil, err
}
result[index] = bb
indexs[index] = int64(i)
index++
}
return result, indexs, nil
}

// Skip the target n items to front index
func (q *FileQueue) Skip(count int64) error {
if q.IsEmpty() {
Expand Down
61 changes: 61 additions & 0 deletions filequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bigqueue

import (
"fmt"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -431,6 +432,66 @@ func TestFileQueue_Status(t *testing.T) {

}

// TestFileQueue_PeekPagination
func TestFileQueue_PeekPagination(t *testing.T) {
Convey("Test PeekPagination", t, func() {

path := Tempfile()
defer clearFiles(path, "testqueue")

var queue = new(FileQueue)

err := queue.Open(path, "testqueue", nil)
if err != nil {
t.Error(err)
}
defer queue.Close()

Convey("test PeekPagination on empty queue", func() {
data, indexs, err := queue.PeekPagination(0, 0)
So(err, ShouldBeNil)
So(data, ShouldBeEmpty)
So(indexs, ShouldBeEmpty)

data, indexs, err = queue.PeekPagination(1, 1)
So(err, ShouldBeNil)
So(data, ShouldBeEmpty)
So(indexs, ShouldBeEmpty)
})

Convey("test PeekPagination on items small than pagesize", func() {
for i := 0; i < 5; i++ { // add value
_, err := queue.Enqueue([]byte("hello matthew " + strconv.Itoa(i)))
So(err, ShouldBeNil)
}

data, indexs, err := queue.PeekPagination(0, 0)
So(err, ShouldBeNil)
So(len(data), ShouldEqual, 5)
So(string(data[4]), ShouldEqual, "hello matthew 4")
So(len(indexs), ShouldEqual, 5)

data, indexs, err = queue.PeekPagination(1, 10)
So(err, ShouldBeNil)
So(len(data), ShouldEqual, 5)
So(string(data[4]), ShouldEqual, "hello matthew 4")
So(len(indexs), ShouldEqual, 5)

data, indexs, err = queue.PeekPagination(2, 10) // large paing
So(err, ShouldBeNil)
So(data, ShouldBeEmpty)
So(indexs, ShouldBeEmpty)

data, indexs, err = queue.PeekPagination(2, 2)
So(err, ShouldBeNil)
So(len(data), ShouldEqual, 2)
So(string(data[1]), ShouldEqual, "hello matthew 3")
So(len(indexs), ShouldEqual, 2)
})

})
}

// tempfile returns a temporary file path.
func Tempfile() string {
return "./bin/temp"
Expand Down

0 comments on commit 2840500

Please sign in to comment.