Skip to content

Commit

Permalink
add redis task queue support and improve docs
Browse files Browse the repository at this point in the history
  • Loading branch information
lunny committed Mar 1, 2019
1 parent 3de99cb commit b7f4ee9
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,4 @@ prime/
*.snap
*.snap-build
*_source.tar.bz2
.DS_Store
5 changes: 5 additions & 0 deletions custom/conf/app.ini.sample
Original file line number Diff line number Diff line change
Expand Up @@ -716,3 +716,8 @@ IS_INPUT_FILE = false
ENABLED = false
; If you want to add authorization, specify a token here
TOKEN =

[task]
QUEUE_TYPE = redis
QUEUE_LENGTH = 1000
QUEUE_CONN_STR = "addrs=127.0.0.1:6379 db=0"
7 changes: 7 additions & 0 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,15 @@ Two special environment variables are passed to the render command:
- `GITEA_PREFIX_SRC`, which contains the current URL prefix in the `src` path tree. To be used as prefix for links.
- `GITEA_PREFIX_RAW`, which contains the current URL prefix in the `raw` path tree. To be used as prefix for image paths.

## Task (`task`)

- `QUEUE_TYPE`: **channel**: Task queue type, could be `channel` or `redis`.
- `QUEUE_LENGTH`: **1000**: Task queue length, available only when `QUEUE_TYPE` is `channel`.
- `QUEUE_CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Task queue connction string, available only when `QUEUE_TYPE` is `redis`. If there is a password of redis, use `addrs=127.0.0.1:6379 password=123 db=0`.

## Other (`other`)

- `SHOW_FOOTER_BRANDING`: **false**: Show Gitea branding in the footer.
- `SHOW_FOOTER_VERSION`: **true**: Show Gitea version information in the footer.
- `SHOW_FOOTER_TEMPLATE_LOAD_TIME`: **true**: Show time of template execution in the footer.

4 changes: 4 additions & 0 deletions docs/content/doc/advanced/config-cheat-sheet.zh-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,11 @@ IS_INPUT_FILE = false
- RENDER_COMMAND: 工具的命令行命令及参数。
- IS_INPUT_FILE: 输入方式是最后一个参数为文件路径还是从标准输入读取。

## Task (`task`)

- `QUEUE_TYPE`: **channel**: 任务队列类型,可以为 `channel``redis`
- `QUEUE_LENGTH`: **1000**: 任务队列长度,当 `QUEUE_TYPE``channel` 时有效。
- `QUEUE_CONN_STR`: **addrs=127.0.0.1:6379 db=0**: 任务队列连接字符串,当 `QUEUE_TYPE``redis` 时有效。如果redis有密码,则可以 `addrs=127.0.0.1:6379 password=123 db=0`

## Other (`other`)

Expand Down
1 change: 1 addition & 0 deletions modules/setting/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
const (
LevelQueueType = "levelqueue"
ChannelQueueType = "channel"
RedisQueueType = "redis"
)

var (
Expand Down
11 changes: 7 additions & 4 deletions modules/setting/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ package setting
var (
// Task settings
Task = struct {
QueueType string
QueueLength int
QueueType string
QueueLength int
QueueConnStr string
}{
QueueType: ChannelQueueType,
QueueLength: 1000,
QueueType: ChannelQueueType,
QueueLength: 1000,
QueueConnStr: "addrs=127.0.0.1:6379 db=0",
}
)

func newTaskService() {
sec := Cfg.Section("task")
Task.QueueType = sec.Key("QUEUE_TYPE").MustString(ChannelQueueType)
Task.QueueLength = sec.Key("QUEUE_LENGTH").MustInt(1000)
Task.QueueConnStr = sec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0")
}
119 changes: 119 additions & 0 deletions modules/task/queue_redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package task

import (
"encoding/json"
"errors"
"strconv"
"strings"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"

"github.com/go-redis/redis"
)

var (
_ Queue = &RedisQueue{}
)

type redisClient interface {
RPush(key string, args ...interface{}) *redis.IntCmd
LPop(key string) *redis.StringCmd
Ping() *redis.StatusCmd
}

// RedisQueue redis queue
type RedisQueue struct {
client redisClient
queueName string
}

func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
fields := strings.Fields(connStr)
for _, f := range fields {
items := strings.SplitN(f, "=", 2)
if len(items) < 2 {
continue
}
switch strings.ToLower(items[0]) {
case "addrs":
addrs = items[1]
case "password":
password = items[1]
case "db":
dbIdx, err = strconv.Atoi(items[1])
if err != nil {
return
}
}
}
return
}

// NewRedisQueue creates single redis or cluster redis queue
func NewRedisQueue(addrs string, password string, dbIdx int) (*RedisQueue, error) {
dbs := strings.Split(addrs, ",")
var queue = RedisQueue{
queueName: "task_queue",
}
if len(dbs) == 0 {
return nil, errors.New("no redis host found")
} else if len(dbs) == 1 {
queue.client = redis.NewClient(&redis.Options{
Addr: strings.TrimSpace(dbs[0]), // use default Addr
Password: password, // no password set
DB: dbIdx, // use default DB
})
} else {
// cluster will ignore db
queue.client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: dbs,
Password: password,
})
}
if err := queue.client.Ping().Err(); err != nil {
return nil, err
}
return &queue, nil
}

func (r *RedisQueue) Run() error {
for {
bs, err := r.client.LPop(r.queueName).Bytes()
if err != nil {
if err != redis.Nil {
log.Error(4, "LPop failed: %v", err)
}
time.Sleep(time.Millisecond * 100)
continue
}

var task models.Task
err = json.Unmarshal(bs, &task)
if err != nil {
log.Error(4, "Unmarshal task failed: %s", err.Error())
} else {
err = Run(&task)
if err != nil {
log.Error(4, "Run task failed: %s", err.Error())
}
}

time.Sleep(time.Millisecond * 100)
}
return nil
}

// Push implements Queue
func (r *RedisQueue) Push(task *models.Task) error {
bs, err := json.Marshal(task)
if err != nil {
return err
}
return r.client.RPush(r.queueName, bs).Err()
}
11 changes: 10 additions & 1 deletion modules/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,17 @@ func Init() error {
switch setting.Task.QueueType {
case setting.ChannelQueueType:
taskQueue = NewChannelQueue(setting.Task.QueueLength)
case setting.RedisQueueType:
addrs, pass, idx, err := parseConnStr(setting.Task.QueueConnStr)
if err != nil {
return err
}
taskQueue, err = NewRedisQueue(addrs, pass, idx)
if err != nil {
return err
}
default:
return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
return fmt.Errorf("Unsupported task queue type: %v", setting.Task.QueueType)
}

go taskQueue.Run()
Expand Down
Binary file modified public/img/loading.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 2 additions & 3 deletions public/js/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,14 @@ function updateIssuesMeta(url, action, issueIds, elementId) {
},
success: resolve
})
}
})
}

function initRepoStatusChecker() {
console.log("initRepoStatusChecker")
var migrating = $("#repo_migrating");
if (migrating) {
var repo_name = migrating.attr('repo');
if (typeof repo_nane === 'undefined') {
if (typeof repo_name === 'undefined') {
return
}
$.ajax({
Expand Down
2 changes: 1 addition & 1 deletion templates/repo/migrating.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
{{template "base/alert" .}}
<div class="home">
<div class="ui stackable middle very relaxed page grid">
<div id="reop_migrating" class="sixteen wide center aligned centered column" repo="{{.Repo.Repository.FullName}}">
<div id="repo_migrating" class="sixteen wide center aligned centered column" repo="{{.Repo.Repository.FullName}}">
<div>
<img src="{{AppSubUrl}}/img/loading.png"/>
</div>
Expand Down

0 comments on commit b7f4ee9

Please sign in to comment.