Skip to content

Commit

Permalink
Merge pull request #42 from lxzan/testing
Browse files Browse the repository at this point in the history
v1.6.7
  • Loading branch information
lxzan authored Aug 19, 2023
2 parents 6c89b70 + b6ceb2b commit 4c33e36
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
26 changes: 22 additions & 4 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type (
q []asyncJob // 任务队列
maxConcurrency int32 // 最大并发
curConcurrency int32 // 当前并发
offset int // 偏移量
}

asyncJob func()
Expand All @@ -25,6 +26,24 @@ func newWorkerQueue(maxConcurrency int32) *workerQueue {
return c
}

func (c *workerQueue) pop() asyncJob {
var n = len(c.q) - c.offset
if n == 0 {
return nil
}
job := c.q[c.offset]
c.q[c.offset] = nil
c.offset++
if n == 1 {
c.offset = 0
c.q = c.q[:0]
if cap(c.q) > 256 {
c.q = nil
}
}
return job
}

// 获取一个任务
func (c *workerQueue) getJob(delta int32) asyncJob {
c.mu.Lock()
Expand All @@ -34,13 +53,12 @@ func (c *workerQueue) getJob(delta int32) asyncJob {
if c.curConcurrency >= c.maxConcurrency {
return nil
}
if len(c.q) == 0 {
var job = c.pop()
if job == nil {
return nil
}
var result = c.q[0]
c.q = c.q[1:]
c.curConcurrency++
return result
return job
}

// 循环执行任务
Expand Down
18 changes: 18 additions & 0 deletions task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,24 @@ func TestTaskQueue(t *testing.T) {
for i := int64(1); i <= 1000; i++ {
var tmp = i
w.Push(func() {
time.Sleep(time.Millisecond)
atomic.AddInt64(&sum, tmp)
wg.Done()
})
}
wg.Wait()
as.Equal(sum, int64(500500))
})

t.Run("", func(t *testing.T) {
sum := int64(0)
w := newWorkerQueue(1)
var wg = &sync.WaitGroup{}
wg.Add(1000)
for i := int64(1); i <= 1000; i++ {
var tmp = i
w.Push(func() {
time.Sleep(time.Millisecond)
atomic.AddInt64(&sum, tmp)
wg.Done()
})
Expand Down

0 comments on commit 4c33e36

Please sign in to comment.