diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index bdc42480e443e..1753ed2330706 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -769,10 +769,10 @@ PATH = ;; Global limit of repositories per user, applied at creation time. -1 means no limit ;MAX_CREATION_LIMIT = -1 ;; -;; Mirror sync queue length, increase if mirror syncing starts hanging +;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead) ;MIRROR_QUEUE_LENGTH = 1000 ;; -;; Patch test queue length, increase if pull request patch testing starts hanging +;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead) ;PULL_REQUEST_QUEUE_LENGTH = 1000 ;; ;; Preferred Licenses to place at the top of the List diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 251f6bd51a9df..6421457332e31 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`. - `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create. - `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user, `-1` means no limit. -- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it +- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`. as large as possible. Use caution when editing this value. - `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch - testing starts hanging. + testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`. - `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at the top of the list. Name must match file name in options/license or custom/options/license. - `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the @@ -382,6 +382,8 @@ relation to port exhaustion. ## Queue (`queue` and `queue.*`) +Configuration at `[queue]` will set defaults for all queues with overrides for individual queues at `[queue.*]`. + - `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy` - `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.) - `LENGTH`: **20**: Maximal queue size before channel queues block @@ -400,6 +402,22 @@ relation to port exhaustion. - `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long. - `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost. +Gitea creates the following non-unique queues: + +- `code_indexer` +- `issue_indexer` +- `notification-service` +- `task` +- `mail` +- `push_update` + +And the following unique queues: + +- `repo_stats_update` +- `repo-archive` +- `mirror` +- `pr_patch_checker` + ## Admin (`admin`) - `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled @@ -588,7 +606,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type command or full path). - `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments. - `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail -- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. +- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]` ## Cache (`cache`) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 5bec67c4d355c..3c8896cba644e 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -9,6 +9,7 @@ import ( "fmt" "sync" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" ) @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration type ChannelUniqueQueue struct { *WorkerPool lock sync.Mutex - table map[Data]bool + table map[string]bool shutdownCtx context.Context shutdownCtxCancel context.CancelFunc terminateCtx context.Context @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, + table: map[string]bool{}, shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -65,9 +66,14 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { - queue.lock.Lock() - delete(queue.table, datum) - queue.lock.Unlock() + bs, err := json.Marshal(datum) + if err != nil { + log.Error("unable to marshal data: %v", datum) + } else { + queue.lock.Lock() + delete(queue.table, string(bs)) + queue.lock.Unlock() + } handle(datum) } }, config.WorkerPoolConfiguration) @@ -94,6 +100,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } + + bs, err := json.Marshal(data) + if err != nil { + return err + } q.lock.Lock() locked := true defer func() { @@ -101,16 +112,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { q.lock.Unlock() } }() - if _, ok := q.table[data]; ok { + if _, ok := q.table[string(bs)]; ok { return ErrAlreadyInQueue } // FIXME: We probably need to implement some sort of limit here // If the downstream queue blocks this table will grow without limit - q.table[data] = true + q.table[string(bs)] = true if fn != nil { err := fn() if err != nil { - delete(q.table, data) + delete(q.table, string(bs)) return err } } @@ -122,9 +133,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { // Has checks if the data is in the queue func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + q.lock.Lock() defer q.lock.Unlock() - _, has := q.table[data] + _, has := q.table[string(bs)] return has, nil } diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 76b7dc1faf70c..2e54fece415a1 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -158,4 +158,16 @@ func NewQueueService() { if _, ok := sectionMap["LENGTH"]; !ok { _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength)) } + + // Handle the old mirror queue configuration + // Please note this will be a unique queue + section = Cfg.Section("queue.mirror") + sectionMap = map[string]bool{} + for _, key := range section.Keys() { + sectionMap[key.Name()] = true + } + if _, ok := sectionMap["LENGTH"]; !ok { + _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.MirrorQueueLength)) + } + } diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 7a3e37d993597..3373b7f0a148a 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -7,18 +7,43 @@ package mirror import ( "context" "fmt" - "strconv" - "strings" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/sync" ) -// mirrorQueue holds an UniqueQueue object of the mirror -var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength) +var mirrorQueue queue.UniqueQueue + +// RequestType type of mirror request +type RequestType int + +const ( + // PullRequestType for pull mirrors + PullRequestType RequestType = iota + // PushRequestType for push mirrors + PushRequestType +) + +// Request for the mirror queue +type Request struct { + Type RequestType + RepoID int64 +} + +// doMirror causes this request to mirror itself +func doMirror(ctx context.Context, req *Request) { + switch req.Type { + case PushRequestType: + _ = SyncPushMirror(ctx, req.RepoID) + case PullRequestType: + _ = SyncPullMirror(ctx, req.RepoID) + default: + log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID) + } +} // Update checks and updates mirror repositories. func Update(ctx context.Context) error { @@ -29,19 +54,25 @@ func Update(ctx context.Context) error { log.Trace("Doing: Update") handler := func(idx int, bean interface{}) error { - var item string + var item Request if m, ok := bean.(*models.Mirror); ok { if m.Repo == nil { log.Error("Disconnected mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("pull %d", m.RepoID) + item = Request{ + Type: PullRequestType, + RepoID: m.RepoID, + } } else if m, ok := bean.(*models.PushMirror); ok { if m.Repo == nil { log.Error("Disconnected push-mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("push %d", m.ID) + item = Request{ + Type: PushRequestType, + RepoID: m.RepoID, + } } else { log.Error("Unknown bean: %v", bean) return nil @@ -51,8 +82,7 @@ func Update(ctx context.Context) error { case <-ctx.Done(): return fmt.Errorf("Aborted") default: - mirrorQueue.Add(item) - return nil + return mirrorQueue.Push(&item) } } @@ -68,26 +98,10 @@ func Update(ctx context.Context) error { return nil } -// syncMirrors checks and syncs mirrors. -// FIXME: graceful: this should be a persistable queue -func syncMirrors(ctx context.Context) { - // Start listening on new sync requests. - for { - select { - case <-ctx.Done(): - mirrorQueue.Close() - return - case item := <-mirrorQueue.Queue(): - id, _ := strconv.ParseInt(item[5:], 10, 64) - if strings.HasPrefix(item, "pull") { - _ = SyncPullMirror(ctx, id) - } else if strings.HasPrefix(item, "push") { - _ = SyncPushMirror(ctx, id) - } else { - log.Error("Unknown item in queue: %v", item) - } - mirrorQueue.Remove(item) - } +func queueHandle(data ...queue.Data) { + for _, datum := range data { + req := datum.(*Request) + doMirror(graceful.GetManager().ShutdownContext(), req) } } @@ -96,7 +110,9 @@ func InitSyncMirrors() { if !setting.Mirror.Enabled { return } - go graceful.GetManager().RunWithShutdownContext(syncMirrors) + mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(Request)) + + go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run) } // StartToMirror adds repoID to mirror queue @@ -104,7 +120,15 @@ func StartToMirror(repoID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID)) + go func() { + err := mirrorQueue.Push(&Request{ + Type: PushRequestType, + RepoID: repoID, + }) + if err != nil { + log.Error("Unable to push push mirror request to the queue for repo[%d]: Error: %v", repoID, err) + } + }() } // AddPushMirrorToQueue adds the push mirror to the queue @@ -112,5 +136,14 @@ func AddPushMirrorToQueue(mirrorID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID)) + go func() { + + err := mirrorQueue.Push(&Request{ + Type: PullRequestType, + RepoID: mirrorID, + }) + if err != nil { + log.Error("Unable to push pull mirror request to the queue for repo[%d]: Error: %v", mirrorID, err) + } + }() }