Skip to content

Commit

Permalink
fix test and make code clean
Browse files Browse the repository at this point in the history
  • Loading branch information
lunny committed Dec 18, 2019
1 parent 0de12a2 commit 60ed63c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 73 deletions.
89 changes: 27 additions & 62 deletions modules/indexer/code/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,15 @@
package code

import (
"context"
"fmt"
"os"
"strconv"
"strings"
"sync"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"

Expand Down Expand Up @@ -59,7 +56,7 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
// updates and bleve version updates. If index needs to be created (or
// re-created), returns (nil, nil)
func openIndexer(path string, latestVersion int) (bleve.Index, error) {
_, err := os.Stat(setting.Indexer.IssuePath)
_, err := os.Stat(path)
if err != nil && os.IsNotExist(err) {
return nil, nil
} else if err != nil {
Expand Down Expand Up @@ -137,34 +134,6 @@ const (
repoIndexerLatestVersion = 4
)

type bleveIndexerHolder struct {
index bleve.Index
mutex sync.RWMutex
cond *sync.Cond
}

func newBleveIndexerHolder() *bleveIndexerHolder {
b := &bleveIndexerHolder{}
b.cond = sync.NewCond(b.mutex.RLocker())
return b
}

func (r *bleveIndexerHolder) set(index bleve.Index) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.index = index
r.cond.Broadcast()
}

func (r *bleveIndexerHolder) get() bleve.Index {
r.mutex.RLock()
defer r.mutex.RUnlock()
if r.index == nil {
r.cond.Wait()
}
return r.index
}

// createRepoIndexer create a repo indexer if one does not already exist
func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) {
docMapping := bleve.NewDocumentMapping()
Expand Down Expand Up @@ -222,51 +191,47 @@ var (

// BleveIndexer represents a bleve indexer implementation
type BleveIndexer struct {
indexDir string
indexerHolder *bleveIndexerHolder
indexDir string
indexer bleve.Index
}

// NewBleveIndexer creates a new bleve local indexer
func NewBleveIndexer(indexDir string) *BleveIndexer {
return &BleveIndexer{
indexDir: indexDir,
indexerHolder: newBleveIndexerHolder(),
func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
indexer := &BleveIndexer{
indexDir: indexDir,
}
created, err := indexer.init()
return indexer, created, err
}

// Init init the indexer
func (b *BleveIndexer) Init() (bool, error) {
indexer, err := openIndexer(b.indexDir, repoIndexerLatestVersion)
func (b *BleveIndexer) init() (bool, error) {
var err error
b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion)
if err != nil {
log.Fatal("openIndexer: %v", err)
return false, err
}
if indexer != nil {
b.indexerHolder.set(indexer)
b.closeAtTerminate()
if b.indexer != nil {
return false, nil
}

indexer, err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion)
b.indexer, err = createRepoIndexer(b.indexDir, repoIndexerLatestVersion)
if err != nil {
return false, err
}
b.indexerHolder.set(indexer)
b.closeAtTerminate()

return true, nil
}

func (b *BleveIndexer) closeAtTerminate() {
graceful.GetManager().RunAtTerminate(context.Background(), func() {
log.Debug("Closing repo indexer")
indexer := b.indexerHolder.get()
if indexer != nil {
err := indexer.Close()
if err != nil {
log.Error("Error whilst closing the repository indexer: %v", err)
}
func (b *BleveIndexer) Close() {
log.Debug("Closing repo indexer")
if b.indexer != nil {
err := b.indexer.Close()
if err != nil {
log.Error("Error whilst closing the repository indexer: %v", err)
}
log.Info("PID: %d Repository Indexer closed", os.Getpid())
})
}
log.Info("PID: %d Repository Indexer closed", os.Getpid())
}

// Index indexes the data
Expand All @@ -287,7 +252,7 @@ func (b *BleveIndexer) Index(repoID int64) error {
return nil
}

batch := rupture.NewFlushingBatch(b.indexerHolder.get(), maxBatchSize)
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
for _, update := range changes.Updates {
if err := addUpdate(update, repo, batch); err != nil {
return err
Expand All @@ -308,11 +273,11 @@ func (b *BleveIndexer) Index(repoID int64) error {
func (b *BleveIndexer) Delete(repoID int64) error {
query := numericEqualityQuery(repoID, "RepoID")
searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false)
result, err := b.indexerHolder.get().Search(searchRequest)
result, err := b.indexer.Search(searchRequest)
if err != nil {
return err
}
batch := rupture.NewFlushingBatch(b.indexerHolder.get(), maxBatchSize)
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
for _, hit := range result.Hits {
if err = batch.Delete(hit.ID); err != nil {
return err
Expand Down Expand Up @@ -348,7 +313,7 @@ func (b *BleveIndexer) Search(repoIDs []int64, keyword string, page, pageSize in
searchRequest.Fields = []string{"Content", "RepoID"}
searchRequest.IncludeLocations = true

result, err := b.indexerHolder.get().Search(searchRequest)
result, err := b.indexer.Search(searchRequest)
if err != nil {
return 0, nil, err
}
Expand Down
17 changes: 11 additions & 6 deletions modules/indexer/code/bleve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"

"github.com/stretchr/testify/assert"
)
Expand All @@ -22,13 +24,16 @@ func TestIndexAndSearch(t *testing.T) {
models.PrepareTestEnv(t)

dir := "./bleve.index"
defer os.RemoveAll(dir)
indexer := NewBleveIndexer(dir)
os.RemoveAll(dir)

_, err := indexer.Init()
assert.NoError(t, err)
setting.Indexer.RepoIndexerEnabled = true
idx, _, err := NewBleveIndexer(dir)
if err != nil {
idx.Close()
log.Fatal("indexer.Init: %v", err)
}

err = indexer.Index(1)
err = idx.Index(1)
assert.NoError(t, err)

var (
Expand All @@ -52,7 +57,7 @@ func TestIndexAndSearch(t *testing.T) {
)

for _, kw := range keywords {
total, res, err := indexer.Search(nil, kw.Keyword, 1, 10)
total, res, err := idx.Search(nil, kw.Keyword, 1, 10)
assert.NoError(t, err)
assert.EqualValues(t, len(kw.IDs), total)

Expand Down
10 changes: 6 additions & 4 deletions modules/indexer/code/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type SearchResult struct {

// Indexer defines an interface to indexer issues contents
type Indexer interface {
Init() (bool, error)
Index(repoID int64) error
Delete(repoID int64) error
Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error)
Close()
}

// Init initialize the repo indexer
Expand All @@ -43,13 +43,15 @@ func Init() {
go func() {
start := time.Now()
log.Info("Initializing Repository Indexer")
indexer = NewBleveIndexer(setting.Indexer.RepoPath)
created, err := indexer.Init()
var created bool
var err error
indexer, created, err = NewBleveIndexer(setting.Indexer.RepoPath)
if err != nil {
indexer.Close()
log.Fatal("indexer.Init: %v", err)
}

go processRepoIndexerOperationQueue()
go processRepoIndexerOperationQueue(indexer)

if created {
go populateRepoIndexer()
Expand Down
4 changes: 3 additions & 1 deletion modules/indexer/code/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ type repoIndexerOperation struct {

var repoIndexerOperationQueue chan repoIndexerOperation

func processRepoIndexerOperationQueue() {
func processRepoIndexerOperationQueue(indexer Indexer) {
defer indexer.Close()

repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
for {
select {
Expand Down

0 comments on commit 60ed63c

Please sign in to comment.