Skip to content

Commit

Permalink
Status method return CanGC status #16
Browse files Browse the repository at this point in the history
  • Loading branch information
xiemalin committed Jul 21, 2021
1 parent 9ab5320 commit 955a044
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 21 deletions.
4 changes: 1 addition & 3 deletions filefanoutqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,11 @@ func (q *FileFanoutQueue) Status(fanoutID int64) *QueueFilesStatus {
return nil
}

queueFilesStatus := q.fileQueue.Status()
queueFilesStatus := q.fileQueue.status(qf.frontIndex, -1, -1)
if queueFilesStatus == nil {
return nil
}

queueFilesStatus.FrontIndex = qf.frontIndex

return queueFilesStatus
}

Expand Down
18 changes: 13 additions & 5 deletions filefanoutqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestFanoutQueueSubscribe(t *testing.T) {
fanoutIDCount2++
})

time.Sleep(time.Duration(2) * time.Second)
time.Sleep(time.Duration(3) * time.Second)

if fanoutIDCount1 != count {
t.Error("subscribe id=", fanoutID, " count should be ", count, " but actually is ", fanoutIDCount1)
Expand All @@ -293,7 +293,7 @@ func TestFanoutQueueSubscribe(t *testing.T) {
func TestFanoutQueue_Status(t *testing.T) {
Convey("Test empty queue status result", t, func() {
path := Tempfile()
defer clearFiles(path, "testqueue")
defer clearFiles(path, "fanoutqueue")
fanoutID := int64(100)

defer clearFrontIndexFiles(path, "fanoutqueue", fanoutID)
Expand All @@ -315,16 +315,16 @@ func TestFanoutQueue_Status(t *testing.T) {
So(qFileStatus.HeadDataPageIndex, ShouldEqual, 0)
So(qFileStatus.HeadDataItemOffset, ShouldEqual, 0)

So(qFileStatus.IndexFileList, ShouldBeEmpty)
So(qFileStatus.DataFileList, ShouldBeEmpty)
So(len(qFileStatus.IndexFileList), ShouldEqual, 1)
So(len(qFileStatus.DataFileList), ShouldEqual, 1)
So(qFileStatus.MetaFileInfo, ShouldNotBeNil)
So(qFileStatus.FrontFileInfo, ShouldNotBeNil)

})

Convey("Test non-empty queue status result", t, func() {
path := Tempfile()
defer clearFiles(path, "testqueue")
defer clearFiles(path, "fanoutqueue")
fanoutID := int64(100)

defer clearFrontIndexFiles(path, "fanoutqueue", fanoutID)
Expand Down Expand Up @@ -353,7 +353,15 @@ func TestFanoutQueue_Status(t *testing.T) {
So(qFileStatus.HeadDataItemOffset, ShouldEqual, dataLen)

So(len(qFileStatus.IndexFileList), ShouldEqual, 1)

fileInfo := qFileStatus.IndexFileList[0]
So(fileInfo.CanGC, ShouldBeFalse)
So(fileInfo.FileIndex, ShouldEqual, 0)

So(len(qFileStatus.DataFileList), ShouldEqual, 1)
fileInfo = qFileStatus.IndexFileList[0]
So(fileInfo.CanGC, ShouldBeFalse)
So(fileInfo.FileIndex, ShouldEqual, 0)
So(qFileStatus.MetaFileInfo, ShouldNotBeNil)
So(qFileStatus.FrontFileInfo, ShouldNotBeNil)

Expand Down
52 changes: 41 additions & 11 deletions filequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,30 +133,60 @@ type QueueFileInfo struct {
Path string
Size int64
FileIndex int64
CanGC bool
}

// Status get status info from current queue
func (q *FileQueue) Status() *QueueFilesStatus {
result := QueueFilesStatus{FrontIndex: q.frontIndex, HeadIndex: q.headIndex, TailIndex: q.tailIndex,

indexPageIndex := q.frontIndex >> uint(q.options.IndexItemsPerPage)

bb, err := q.getIndexItemArray(q.frontIndex)
var dataPageIndex int64
if err != nil {
dataPageIndex = -1
} else {
dataPageIndex = BytesToInt(bb[:8])
}

r := q.status(q.frontIndex, indexPageIndex, dataPageIndex)

// gc status
return r

}

func (q *FileQueue) status(frontIndex, currentIndexPageIndex, currentDataPageIndex int64) *QueueFilesStatus {
result := QueueFilesStatus{FrontIndex: frontIndex, HeadIndex: q.headIndex, TailIndex: q.tailIndex,
HeadDataPageIndex: q.headDataPageIndex, HeadDataItemOffset: q.headDataItemOffset}

result.IndexFileList = wrapFileInfos(q.indexFile)
result.DataFileList = wrapFileInfos(q.dataFile)
indexPageIndex := q.headIndex >> uint(q.options.IndexItemsPerPage)
result.IndexFileList = wrapFileInfos(q.indexFile, indexPageIndex, currentIndexPageIndex)

result.DataFileList = wrapFileInfos(q.dataFile, q.headDataPageIndex, currentDataPageIndex)
result.MetaFileInfo, _ = wrapFileInfo(0, q.metaFile)
result.FrontFileInfo, _ = wrapFileInfo(0, q.frontFile)
return &result

}

// wrapFileInfos wrap queue file info from DBFactory
func wrapFileInfos(factory *DBFactory) []*QueueFileInfo {
indexFileInfos := make([]*QueueFileInfo, len(factory.dbMap))
var i int64 = 0
for idx, db := range factory.dbMap {
info, err := wrapFileInfo(idx, db)
func wrapFileInfos(factory *DBFactory, maxFileNo, currentFileNo int64) []*QueueFileInfo {
indexFileInfos := make([]*QueueFileInfo, 0)
for i := maxFileNo; i >= 0; i-- {
filePath := factory.getFilePath(i)
_, err := os.Open(filePath)
if err != nil {
continue
}
db, err := factory.acquireDB(i)
if err == nil {
indexFileInfos[i] = info
i++
info, err := wrapFileInfo(i, db)
if i < currentFileNo {
info.CanGC = true
}
if err == nil {
indexFileInfos = append(indexFileInfos, info)
}
}
}
return indexFileInfos
Expand Down
12 changes: 10 additions & 2 deletions filequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ func TestFileQueue_Status(t *testing.T) {
So(qFileStatus.HeadDataPageIndex, ShouldEqual, 0)
So(qFileStatus.HeadDataItemOffset, ShouldEqual, 0)

So(qFileStatus.IndexFileList, ShouldBeEmpty)
So(qFileStatus.DataFileList, ShouldBeEmpty)
So(len(qFileStatus.IndexFileList), ShouldEqual, 1)
So(len(qFileStatus.DataFileList), ShouldEqual, 1)
So(qFileStatus.MetaFileInfo, ShouldNotBeNil)
So(qFileStatus.FrontFileInfo, ShouldNotBeNil)

Expand Down Expand Up @@ -408,7 +408,15 @@ func TestFileQueue_Status(t *testing.T) {
So(qFileStatus.HeadDataItemOffset, ShouldEqual, dataLen)

So(len(qFileStatus.IndexFileList), ShouldEqual, 1)

fileInfo := qFileStatus.IndexFileList[0]
So(fileInfo.CanGC, ShouldBeFalse)
So(fileInfo.FileIndex, ShouldEqual, 0)

So(len(qFileStatus.DataFileList), ShouldEqual, 1)
fileInfo = qFileStatus.IndexFileList[0]
So(fileInfo.CanGC, ShouldBeFalse)
So(fileInfo.FileIndex, ShouldEqual, 0)
So(qFileStatus.MetaFileInfo, ShouldNotBeNil)
So(qFileStatus.FrontFileInfo, ShouldNotBeNil)

Expand Down
1 change: 1 addition & 0 deletions mmapfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type DBFactory struct {
lockMap map[int64]*sync.Mutex

// DB mapping with file index no
dbMap map[int64]*DB

filePrefix string
Expand Down

0 comments on commit 955a044

Please sign in to comment.