diff --git a/filefanoutqueue.go b/filefanoutqueue.go index e7ad67d..77ffc3a 100644 --- a/filefanoutqueue.go +++ b/filefanoutqueue.go @@ -180,6 +180,18 @@ func (q *FileFanoutQueue) PeekAll(fanoutID int64) ([][]byte, []int64, error) { return q.fileQueue.peekAll(index, q.Size(fanoutID)) } +// PeekPagination to peek data from queue by paing feature. +func (q *FileFanoutQueue) PeekPagination(fanoutID int64, page, pagesize uint64) ([][]byte, []int64, error) { + + qf, err := q.getQueueFront(fanoutID) + if err != nil { + return nil, nil, err + } + index := qf.frontIndex + + return q.fileQueue.peekPagination(index, q.fileQueue.size(index), page, pagesize) +} + // Skip To skip deqeue target number of items func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error { if count <= 0 { diff --git a/filefanoutqueue_test.go b/filefanoutqueue_test.go index 5878c7c..72beebd 100644 --- a/filefanoutqueue_test.go +++ b/filefanoutqueue_test.go @@ -368,3 +368,63 @@ func TestFanoutQueue_Status(t *testing.T) { }) } + +func TestFanoutQueue_PeekPagination(t *testing.T) { + Convey("Test PeekPagination", t, func() { + path := Tempfile() + defer clearFiles(path, "fanoutqueue") + fanoutID := int64(100) + + defer clearFrontIndexFiles(path, "fanoutqueue", fanoutID) + + queue := FileFanoutQueue{} + err := queue.Open(path, "fanoutqueue", nil) + if err != nil { + t.Error(err) + } + defer queue.Close() + + Convey("test PeekPagination on empty queue", func() { + data, indexs, err := queue.PeekPagination(fanoutID, 0, 0) + So(err, ShouldBeNil) + So(data, ShouldBeEmpty) + So(indexs, ShouldBeEmpty) + + data, indexs, err = queue.PeekPagination(fanoutID, 1, 1) + So(err, ShouldBeNil) + So(data, ShouldBeEmpty) + So(indexs, ShouldBeEmpty) + }) + + Convey("test PeekPagination on items small than pagesize", func() { + for i := 0; i < 5; i++ { // add value + _, err := queue.Enqueue([]byte("hello matthew " + strconv.Itoa(i))) + So(err, ShouldBeNil) + } + + data, indexs, err := queue.PeekPagination(fanoutID, 0, 0) + So(err, ShouldBeNil) + So(len(data), ShouldEqual, 5) + So(string(data[4]), ShouldEqual, "hello matthew 4") + So(len(indexs), ShouldEqual, 5) + + data, indexs, err = queue.PeekPagination(fanoutID, 1, 10) + So(err, ShouldBeNil) + So(len(data), ShouldEqual, 5) + So(string(data[4]), ShouldEqual, "hello matthew 4") + So(len(indexs), ShouldEqual, 5) + + data, indexs, err = queue.PeekPagination(fanoutID, 2, 10) // large paing + So(err, ShouldBeNil) + So(data, ShouldBeEmpty) + So(indexs, ShouldBeEmpty) + + data, indexs, err = queue.PeekPagination(fanoutID, 2, 2) + So(err, ShouldBeNil) + So(len(data), ShouldEqual, 2) + So(string(data[1]), ShouldEqual, "hello matthew 3") + So(len(indexs), ShouldEqual, 2) + }) + + }) +} diff --git a/filequeue.go b/filequeue.go index 8ddd825..10bd423 100644 --- a/filequeue.go +++ b/filequeue.go @@ -40,6 +40,8 @@ const ( fileSuffix = ".dat" defaultFileMode = 0666 + + Default_Page_Size = 10 ) // DefaultOptions default options @@ -435,6 +437,52 @@ func (q *FileQueue) PeekAll() ([][]byte, []int64, error) { return q.peekAll(index, q.Size()) } +// PeekPagination to peek data from queue by paing feature. +func (q *FileQueue) PeekPagination(page, pagesize uint64) ([][]byte, []int64, error) { + return q.peekPagination(q.frontIndex, q.Size(), page, pagesize) +} + +// peekPagination to peek data from queue by paing feature. +func (q *FileQueue) peekPagination(frontindex int64, size int64, page, pagesize uint64) ([][]byte, []int64, error) { + if page == 0 { + page = 1 + } + if pagesize == 0 { + pagesize = Default_Page_Size + } + + begin := (page - 1) * pagesize + end := begin + pagesize + + if begin > uint64(size) { // no data return + return [][]byte{}, []int64{}, nil + } + + if end > uint64(size) { + end = uint64(size) + pagesize = end - begin + } + + // fix the offset + begin = begin + uint64(frontindex) + end = end + uint64(frontindex) + + result := make([][]byte, pagesize) + indexs := make([]int64, pagesize) + + var index int = 0 + for i := begin; i < end; i++ { + bb, err := q.peek(int64(i)) + if err != nil { + return nil, nil, err + } + result[index] = bb + indexs[index] = int64(i) + index++ + } + return result, indexs, nil +} + // Skip the target n items to front index func (q *FileQueue) Skip(count int64) error { if q.IsEmpty() { diff --git a/filequeue_test.go b/filequeue_test.go index 2219c8a..bff1ccd 100644 --- a/filequeue_test.go +++ b/filequeue_test.go @@ -2,6 +2,7 @@ package bigqueue import ( "fmt" + "strconv" "strings" "testing" "time" @@ -431,6 +432,66 @@ func TestFileQueue_Status(t *testing.T) { } +// TestFileQueue_PeekPagination +func TestFileQueue_PeekPagination(t *testing.T) { + Convey("Test PeekPagination", t, func() { + + path := Tempfile() + defer clearFiles(path, "testqueue") + + var queue = new(FileQueue) + + err := queue.Open(path, "testqueue", nil) + if err != nil { + t.Error(err) + } + defer queue.Close() + + Convey("test PeekPagination on empty queue", func() { + data, indexs, err := queue.PeekPagination(0, 0) + So(err, ShouldBeNil) + So(data, ShouldBeEmpty) + So(indexs, ShouldBeEmpty) + + data, indexs, err = queue.PeekPagination(1, 1) + So(err, ShouldBeNil) + So(data, ShouldBeEmpty) + So(indexs, ShouldBeEmpty) + }) + + Convey("test PeekPagination on items small than pagesize", func() { + for i := 0; i < 5; i++ { // add value + _, err := queue.Enqueue([]byte("hello matthew " + strconv.Itoa(i))) + So(err, ShouldBeNil) + } + + data, indexs, err := queue.PeekPagination(0, 0) + So(err, ShouldBeNil) + So(len(data), ShouldEqual, 5) + So(string(data[4]), ShouldEqual, "hello matthew 4") + So(len(indexs), ShouldEqual, 5) + + data, indexs, err = queue.PeekPagination(1, 10) + So(err, ShouldBeNil) + So(len(data), ShouldEqual, 5) + So(string(data[4]), ShouldEqual, "hello matthew 4") + So(len(indexs), ShouldEqual, 5) + + data, indexs, err = queue.PeekPagination(2, 10) // large paing + So(err, ShouldBeNil) + So(data, ShouldBeEmpty) + So(indexs, ShouldBeEmpty) + + data, indexs, err = queue.PeekPagination(2, 2) + So(err, ShouldBeNil) + So(len(data), ShouldEqual, 2) + So(string(data[1]), ShouldEqual, "hello matthew 3") + So(len(indexs), ShouldEqual, 2) + }) + + }) +} + // tempfile returns a temporary file path. func Tempfile() string { return "./bin/temp"