Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡ fetch org repositories in parallel #4970

Merged
merged 4 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions internal/workerpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Mondoo, Inc.
// SPDX-License-Identifier: BUSL-1.1

package workerpool

import (
"github.com/cockroachdb/errors"
)

type Task[R any] func() (result R, err error)

type Pool[R any] struct {
queue chan Task[R]
results chan R
errors chan error
workerCount int
requestsSent int
requestsRead int

err error
}

func New[R any](count int) *Pool[R] {
return &Pool[R]{
queue: make(chan Task[R]),
results: make(chan R),
errors: make(chan error),
workerCount: count,
}
}

func (p *Pool[R]) Start() {
for i := 0; i < p.workerCount; i++ {
w := worker[R]{id: i, queue: p.queue, results: p.results, errors: p.errors}
w.Start()
}

p.errorCollector()
}

func (p *Pool[R]) errorCollector() {
go func() {
for e := range p.errors {
p.err = errors.Join(p.err, e)
}
}()
}

func (p *Pool[R]) GetError() error {
return p.err
}

func (p *Pool[R]) Submit(t Task[R]) {
p.queue <- t
p.requestsSent++
}

func (p *Pool[R]) GetResult() R {
defer func() {
p.requestsRead++
}()
return <-p.results
}

func (p *Pool[R]) HasPendingRequests() bool {
return p.requestsSent-p.requestsRead > 0
}

func (p *Pool[R]) Close() {
close(p.queue)
}
145 changes: 145 additions & 0 deletions internal/workerpool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) Mondoo, Inc.
// SPDX-License-Identifier: BUSL-1.1

package workerpool_test

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.mondoo.com/cnquery/v11/internal/workerpool"
)

func TestPoolSubmitAndRetrieveResult(t *testing.T) {
pool := workerpool.New[int](2)
pool.Start()
defer pool.Close()

task := func() (int, error) {
return 42, nil
}

// no requests
assert.False(t, pool.HasPendingRequests())

// submit a request
pool.Submit(task)

// should have pending requests
assert.True(t, pool.HasPendingRequests())

// assert results comes back
result := pool.GetResult()
assert.Equal(t, 42, result)

// no more requests pending
assert.False(t, pool.HasPendingRequests())

// no errors
assert.Nil(t, pool.GetError())
}

func TestPoolHandleErrors(t *testing.T) {
pool := workerpool.New[int](5)
pool.Start()
defer pool.Close()

// submit a task that will return an error
task := func() (int, error) {
return 0, errors.New("task error")
}
pool.Submit(task)

// Wait for error collector to process
time.Sleep(100 * time.Millisecond)

err := pool.GetError()
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "task error")
}
}

func TestPoolMultipleTasksWithErrors(t *testing.T) {
type test struct {
data int
}
pool := workerpool.New[*test](5)
pool.Start()
defer pool.Close()

tasks := []workerpool.Task[*test]{
func() (*test, error) { return &test{1}, nil },
func() (*test, error) { return &test{2}, nil },
func() (*test, error) {
return nil, errors.New("task error")
},
func() (*test, error) { return &test{3}, nil },
}

for _, task := range tasks {
pool.Submit(task)
}

var results []*test
for range tasks {
results = append(results, pool.GetResult())
}

assert.ElementsMatch(t, []*test{nil, &test{1}, &test{2}, &test{3}}, results)
assert.False(t, pool.HasPendingRequests())

}

func TestPoolHandlesNilTasks(t *testing.T) {
pool := workerpool.New[int](2)
pool.Start()
defer pool.Close()

var nilTask workerpool.Task[int]
pool.Submit(nilTask)

// Wait for worker to process the nil task
time.Sleep(100 * time.Millisecond)

err := pool.GetError()
assert.NoError(t, err)
}

func TestPoolHasPendingRequests(t *testing.T) {
pool := workerpool.New[int](2)
pool.Start()
defer pool.Close()

task := func() (int, error) {
time.Sleep(50 * time.Millisecond)
return 10, nil
}

pool.Submit(task)
assert.True(t, pool.HasPendingRequests())

result := pool.GetResult()
assert.Equal(t, 10, result)
assert.False(t, pool.HasPendingRequests())
}

func TestPoolClosesGracefully(t *testing.T) {
pool := workerpool.New[int](1)
pool.Start()

task := func() (int, error) {
time.Sleep(100 * time.Millisecond)
return 42, nil
}

pool.Submit(task)

pool.Close()

// Ensure no panic occurs and channels are closed
assert.PanicsWithError(t, "send on closed channel", func() {
pool.Submit(task)
})
}
28 changes: 28 additions & 0 deletions internal/workerpool/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Mondoo, Inc.
// SPDX-License-Identifier: BUSL-1.1

package workerpool

type worker[R any] struct {
id int
queue <-chan Task[R]
results chan<- R
errors chan<- error
}

func (w *worker[R]) Start() {
go func() {
for task := range w.queue {
if task == nil {
continue
}

data, err := task()
if err != nil {
w.errors <- err
}

w.results <- data
}
}()
}
4 changes: 2 additions & 2 deletions providers-sdk/v1/inventory/inventory.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions providers-sdk/v1/plugin/plugin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 12 additions & 34 deletions providers-sdk/v1/plugin/plugin_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions providers-sdk/v1/resources/resources.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading