Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor issue indexer #5363

Merged
merged 17 commits into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions models/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,21 @@ func (issue *Issue) LoadPullRequest() error {
}

func (issue *Issue) loadComments(e Engine) (err error) {
return issue.loadCommentsByType(e, CommentTypeUnknown)
}

// LoadDiscussComments loads discuss comments
func (issue *Issue) LoadDiscussComments() error {
return issue.loadCommentsByType(x, CommentTypeComment)
}

func (issue *Issue) loadCommentsByType(e Engine, tp CommentType) (err error) {
if issue.Comments != nil {
return nil
}
issue.Comments, err = findComments(e, FindCommentsOptions{
IssueID: issue.ID,
Type: CommentTypeUnknown,
Type: tp,
})
return err
}
Expand Down Expand Up @@ -681,7 +690,6 @@ func updateIssueCols(e Engine, issue *Issue, cols ...string) error {
if _, err := e.ID(issue.ID).Cols(cols...).Update(issue); err != nil {
return err
}
UpdateIssueIndexerCols(issue.ID, cols...)
return nil
}

Expand Down Expand Up @@ -1217,6 +1225,12 @@ func getIssuesByIDs(e Engine, issueIDs []int64) ([]*Issue, error) {
return issues, e.In("id", issueIDs).Find(&issues)
}

func getIssueIDsByRepoID(e Engine, repoID int64) ([]int64, error) {
var ids = make([]int64, 0, 10)
err := e.Table("issue").Where("repo_id = ?", repoID).Find(&ids)
return ids, err
}

// GetIssuesByIDs return issues with the given IDs.
func GetIssuesByIDs(issueIDs []int64) ([]*Issue, error) {
return getIssuesByIDs(x, issueIDs)
Expand Down
2 changes: 2 additions & 0 deletions models/issue_comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,7 @@ func UpdateComment(doer *User, c *Comment, oldContent string) error {
if err := c.LoadIssue(); err != nil {
return err
}

if err := c.Issue.LoadAttributes(); err != nil {
return err
}
Expand Down Expand Up @@ -1093,6 +1094,7 @@ func DeleteComment(doer *User, comment *Comment) error {
if err := comment.LoadIssue(); err != nil {
return err
}

if err := comment.Issue.LoadAttributes(); err != nil {
return err
}
Expand Down
166 changes: 91 additions & 75 deletions models/issue_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,60 @@ package models
import (
Copy link
Contributor

@zeripath zeripath Feb 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably out of scope for this PR, but I wonder if we really want this functionality in models/ rather than in modules/indexer/. (Obviously out of scope: I think we probably need to restructure models to make it less of a grab-bag of everything and try to move struct into perhaps a core-models and peripheral/plugin system. )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to do that, but it will change more files. I prefer to do that on another refactor PRs.

"fmt"

"code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/indexer/issues"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
)

// issueIndexerUpdateQueue queue of issue ids to be updated
var issueIndexerUpdateQueue chan int64
var (
// issueIndexerUpdateQueue queue of issue ids to be updated
issueIndexerUpdateQueue issues.Queue
issueIndexer issues.Indexer
)

// InitIssueIndexer initialize issue indexer
func InitIssueIndexer() {
indexer.InitIssueIndexer(populateIssueIndexer)
issueIndexerUpdateQueue = make(chan int64, setting.Indexer.UpdateQueueLength)
go processIssueIndexerUpdateQueue()
func InitIssueIndexer() error {
var populate bool
switch setting.Indexer.IssueType {
case "bleve":
issueIndexer = issues.NewBleveIndexer(setting.Indexer.IssuePath)
exist, err := issueIndexer.Init()
if err != nil {
return err
}
populate = !exist
default:
return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
}

var err error
switch setting.Indexer.IssueIndexerQueueType {
case setting.LevelQueueType:
issueIndexerUpdateQueue, err = issues.NewLevelQueue(
issueIndexer,
setting.Indexer.IssueIndexerQueueDir,
setting.Indexer.IssueIndexerQueueBatchNumber)
if err != nil {
return err
}
case setting.ChannelQueueType:
issueIndexerUpdateQueue = issues.NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber)
default:
return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
}

go issueIndexerUpdateQueue.Run()

if populate {
go populateIssueIndexer()
}

return nil
}

// populateIssueIndexer populate the issue indexer with issue data
func populateIssueIndexer() error {
batch := indexer.IssueIndexerBatch()
func populateIssueIndexer() {
for page := 1; ; page++ {
repos, _, err := SearchRepositoryByName(&SearchRepoOptions{
Page: page,
Expand All @@ -35,98 +70,79 @@ func populateIssueIndexer() error {
Collaborate: util.OptionalBoolFalse,
})
if err != nil {
return fmt.Errorf("Repositories: %v", err)
log.Error(4, "SearchRepositoryByName: %v", err)
continue
}
if len(repos) == 0 {
return batch.Flush()
return
}

for _, repo := range repos {
issues, err := Issues(&IssuesOptions{
is, err := Issues(&IssuesOptions{
RepoIDs: []int64{repo.ID},
IsClosed: util.OptionalBoolNone,
IsPull: util.OptionalBoolNone,
})
if err != nil {
return err
log.Error(4, "Issues: %v", err)
continue
}
if err = IssueList(issues).LoadComments(); err != nil {
return err
if err = IssueList(is).LoadDiscussComments(); err != nil {
log.Error(4, "LoadComments: %v", err)
continue
}
for _, issue := range issues {
if err := issue.update().AddToFlushingBatch(batch); err != nil {
return err
}
for _, issue := range is {
UpdateIssueIndexer(issue)
}
}
}
}

func processIssueIndexerUpdateQueue() {
batch := indexer.IssueIndexerBatch()
for {
var issueID int64
select {
case issueID = <-issueIndexerUpdateQueue:
default:
// flush whatever updates we currently have, since we
// might have to wait a while
if err := batch.Flush(); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
issueID = <-issueIndexerUpdateQueue
}
issue, err := GetIssueByID(issueID)
if err != nil {
log.Error(4, "GetIssueByID: %v", err)
} else if err = issue.update().AddToFlushingBatch(batch); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
}
}

func (issue *Issue) update() indexer.IssueIndexerUpdate {
comments := make([]string, 0, 5)
// UpdateIssueIndexer add/update an issue to the issue indexer
func UpdateIssueIndexer(issue *Issue) {
var comments []string
for _, comment := range issue.Comments {
if comment.Type == CommentTypeComment {
comments = append(comments, comment.Content)
}
}
return indexer.IssueIndexerUpdate{
IssueID: issue.ID,
Data: &indexer.IssueIndexerData{
RepoID: issue.RepoID,
Title: issue.Title,
Content: issue.Content,
Comments: comments,
},
}
issueIndexerUpdateQueue.Push(&issues.IndexerData{
ID: issue.ID,
RepoID: issue.RepoID,
Title: issue.Title,
Content: issue.Content,
Comments: comments,
})
}

// updateNeededCols whether a change to the specified columns requires updating
// the issue indexer
func updateNeededCols(cols []string) bool {
for _, col := range cols {
switch col {
case "name", "content":
return true
}
// DeleteRepoIssueIndexer deletes repo's all issues indexes
func DeleteRepoIssueIndexer(repo *Repository) {
var ids []int64
ids, err := getIssueIDsByRepoID(x, repo.ID)
if err != nil {
log.Error(4, "getIssueIDsByRepoID failed: %v", err)
return
}

if len(ids) <= 0 {
return
}
return false
}

// UpdateIssueIndexerCols update an issue in the issue indexer, given changes
// to the specified columns
func UpdateIssueIndexerCols(issueID int64, cols ...string) {
updateNeededCols(cols)
issueIndexerUpdateQueue.Push(&issues.IndexerData{
IDs: ids,
IsDelete: true,
})
}

// UpdateIssueIndexer add/update an issue to the issue indexer
func UpdateIssueIndexer(issueID int64) {
select {
case issueIndexerUpdateQueue <- issueID:
default:
go func() {
issueIndexerUpdateQueue <- issueID
}()
// SearchIssuesByKeyword search issue ids by keywords and repo id
func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
var issueIDs []int64
res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
if err != nil {
return nil, err
}
for _, r := range res.Hits {
issueIDs = append(issueIDs, r.ID)
}
return issueIDs, nil
}
16 changes: 13 additions & 3 deletions models/issue_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

package models

import "fmt"
import (
"fmt"

"github.com/go-xorm/builder"
)

// IssueList defines a list of issues
type IssueList []*Issue
Expand Down Expand Up @@ -338,7 +342,7 @@ func (issues IssueList) loadAttachments(e Engine) (err error) {
return nil
}

func (issues IssueList) loadComments(e Engine) (err error) {
func (issues IssueList) loadComments(e Engine, cond builder.Cond) (err error) {
if len(issues) == 0 {
return nil
}
Expand All @@ -354,6 +358,7 @@ func (issues IssueList) loadComments(e Engine) (err error) {
rows, err := e.Table("comment").
Join("INNER", "issue", "issue.id = comment.issue_id").
In("issue.id", issuesIDs[:limit]).
Where(cond).
Rows(new(Comment))
if err != nil {
return err
Expand Down Expand Up @@ -479,5 +484,10 @@ func (issues IssueList) LoadAttachments() error {

// LoadComments loads comments
func (issues IssueList) LoadComments() error {
return issues.loadComments(x)
return issues.loadComments(x, builder.NewCond())
}

// LoadDiscussComments loads discuss comments
func (issues IssueList) LoadDiscussComments() error {
return issues.loadComments(x, builder.Eq{"comment.type": CommentTypeComment})
}
14 changes: 0 additions & 14 deletions models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"strings"

"code.gitea.io/gitea/modules/log"
Expand Down Expand Up @@ -158,19 +157,6 @@ func LoadConfigs() {
DbCfg.SSLMode = sec.Key("SSL_MODE").MustString("disable")
DbCfg.Path = sec.Key("PATH").MustString("data/gitea.db")
DbCfg.Timeout = sec.Key("SQLITE_TIMEOUT").MustInt(500)

sec = setting.Cfg.Section("indexer")
setting.Indexer.IssuePath = sec.Key("ISSUE_INDEXER_PATH").MustString(path.Join(setting.AppDataPath, "indexers/issues.bleve"))
if !filepath.IsAbs(setting.Indexer.IssuePath) {
setting.Indexer.IssuePath = path.Join(setting.AppWorkPath, setting.Indexer.IssuePath)
}
setting.Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
setting.Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(setting.AppDataPath, "indexers/repos.bleve"))
if !filepath.IsAbs(setting.Indexer.RepoPath) {
setting.Indexer.RepoPath = path.Join(setting.AppWorkPath, setting.Indexer.RepoPath)
}
setting.Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20)
setting.Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024)
}

// parsePostgreSQLHostPort parses given input in various forms defined in
Expand Down
4 changes: 4 additions & 0 deletions models/unit_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func MainTest(m *testing.M, pathToGiteaRoot string) {
fatalTestError("Error creating test engine: %v\n", err)
}

if err = InitIssueIndexer(); err != nil {
fatalTestError("Error InitIssueIndexer: %v\n", err)
}

setting.AppURL = "https://try.gitea.io/"
setting.RunUser = "runuser"
setting.SSH.Port = 3000
Expand Down
Loading