Skip to content

Commit

Permalink
use ants pool (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS authored Oct 14, 2022
1 parent ee1f465 commit 7025077
Show file tree
Hide file tree
Showing 33 changed files with 309 additions and 1,374 deletions.
36 changes: 24 additions & 12 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -56,51 +57,56 @@ func serve(c *cli.Context) error {
utils.WritePid(config.PidFile)
defer os.Remove(config.PidFile)

if err := utils.NewPool(config.MaxConcurrency); err != nil {
log.Error(err)
return err
}
defer utils.Pool.Release()

ctx, cancel := context.WithCancel(c.Context)
defer cancel()

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1)

errChan := make(chan error, 2)
defer close(errChan)

wg := &sync.WaitGroup{}
wg.Add(2)

workloadManager, err := workload.NewManager(ctx, config)
workloadsManager, err := workload.NewManager(ctx, config)
if err != nil {
return err
}
go func() {
_ = utils.Pool.Submit(func() {
defer wg.Done()
if err := workloadManager.Run(ctx); err != nil {
if err := workloadsManager.Run(ctx); err != nil {
log.Errorf("[agent] workload manager err: %v, exiting", err)
errChan <- err
}
}()
})

nodeManager, err := node.NewManager(ctx, config)
if err != nil {
return err
}
go func() {
_ = utils.Pool.Submit(func() {
defer wg.Done()
if err := nodeManager.Run(ctx); err != nil {
log.Errorf("[agent] node manager err: %v, exiting", err)
errChan <- err
}
}()
})

apiHandler := api.NewHandler(config, workloadManager)
go apiHandler.Serve()
apiHandler := api.NewHandler(config, workloadsManager)
_ = utils.Pool.Submit(apiHandler.Serve)

go func() {
_ = utils.Pool.Submit(func() {
select {
case <-ctx.Done():
log.Info("[agent] Agent exiting")
case <-errChan:
log.Info("[agent] got err, exiting")
log.Error("[agent] Got error, exiting")
cancel()
case sig := <-signalChan:
log.Infof("[agent] Agent caught system signal %v", sig)
Expand All @@ -111,7 +117,7 @@ func serve(c *cli.Context) error {
}
cancel()
}
}()
})

wg.Wait()
return nil
Expand Down Expand Up @@ -242,6 +248,12 @@ func main() {
Value: false,
Usage: "will only check containers belong to this node if set",
},
&cli.IntFlag{
Name: "max-concurrency",
Value: runtime.NumCPU() * 100,
Usage: "max concurrency for goroutine pool",
EnvVars: []string{"ERU_MAX_CONCURRENCY"},
},
},
Action: serve,
}
Expand Down
25 changes: 17 additions & 8 deletions agent.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@
# This option is not required as the default value is "/tmp/agent.pid".
pid: /tmp/agent.pid

# store defines the type of core service.
# This option is not required as the default value is "grpc".
store: grpc

# runtime defines the type of runtime.
# This option is not required as the default value is "docker".
runtime: docker

# core defines the address of eru-core component.
# This option is not required as the default value is "127.0.0.1:5001".
core:
Expand All @@ -24,6 +16,23 @@ core:
# The default value of this option is 60.
heartbeat_interval: 120

# max_concurrency defines the goroutine pool max size
# need set a big number
# The default value is 100X number of the CPU.
max_concurrency: 1000

# check_only_mine defines check agent itself or not
# The default value is false
check_only_mine: false

# store defines the type of core service.
# This option is not required as the default value is "grpc".
store: grpc

# runtime defines the type of runtime.
# This option is not required as the default value is "docker".
runtime: docker

# auth defines the authentication values for eru-core.
#
# auth.username and auth.password are the username and password required by eru-core.
Expand Down
25 changes: 15 additions & 10 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package api
import (
"encoding/json"
"net/http"
"runtime/pprof" //nolint
"runtime/pprof" //nolint:nolintlint
"time"

// enable profile
_ "net/http/pprof" //nolint

Expand All @@ -21,8 +23,8 @@ type JSON map[string]interface{}

// Handler define handler
type Handler struct {
config *types.Config
workloadManager *workload.Manager
config *types.Config
workloadsManager *workload.Manager
}

// URL /version/
Expand Down Expand Up @@ -59,15 +61,15 @@ func (h *Handler) log(w http.ResponseWriter, req *http.Request) {
return
}
defer conn.Close()
h.workloadManager.PullLog(req.Context(), app, buf)
h.workloadsManager.PullLog(req.Context(), app, buf)
}
}

// NewHandler new api http handler
func NewHandler(config *types.Config, workloadManager *workload.Manager) *Handler {
func NewHandler(config *types.Config, workloadsManager *workload.Manager) *Handler {
return &Handler{
config: config,
workloadManager: workloadManager,
config: config,
workloadsManager: workloadsManager,
}
}

Expand Down Expand Up @@ -98,8 +100,11 @@ func (h *Handler) Serve() {
http.Handle("/metrics", promhttp.Handler())
log.Infof("[apiServe] http api started %s", h.config.API.Addr)

err := http.ListenAndServe(h.config.API.Addr, nil) //nolint
if err != nil {
log.Panicf("http api failed %s", err)
server := &http.Server{
Addr: h.config.API.Addr,
ReadHeaderTimeout: 3 * time.Second,
}
if err := server.ListenAndServe(); err != nil {
log.Errorf("http api failed %s", err)
}
}
30 changes: 23 additions & 7 deletions common/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,26 @@ package common
import "errors"

// ErrNotImplemented .
var ErrNotImplemented = errors.New("not implemented")

// ErrConnecting means writer is in connecting status, waiting to be connected
var ErrConnecting = errors.New("connecting")

// ErrInvalidScheme .
var ErrInvalidScheme = errors.New("invalid scheme")
var (
ErrNotImplemented = errors.New("not implemented")
// ErrConnecting means writer is in connecting status, waiting to be connected
ErrConnecting = errors.New("connecting")
// ErrInvalidScheme .
ErrInvalidScheme = errors.New("invalid scheme")
// ErrGetRuntimeFailed .
ErrGetRuntimeFailed = errors.New("failed to get runtime client")
// ErrInvalidRuntimeType .
ErrInvalidRuntimeType = errors.New("unknown runtime type")
// ErrGetStoreFailed .
ErrGetStoreFailed = errors.New("failed to get store client")
// ErrInvalidStoreType .
ErrInvalidStoreType = errors.New("unknown store type")
// ErrWorkloadUnhealthy .
ErrWorkloadUnhealthy = errors.New("not healthy")
// ErrClosedSteam .
ErrClosedSteam = errors.New("closed")
// ErrSyscallFailed .
ErrSyscallFailed = errors.New("syscall fail, Not a syscall.Stat_t")
// ErrDevNotFound .
ErrDevNotFound = errors.New("device not found")
)
106 changes: 9 additions & 97 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,153 +4,65 @@ go 1.19

require (
github.com/CMGS/statsd v0.0.0-20160223095033-48c421b3c1ab
github.com/StackExchange/wmi v0.0.0-20180412205111-cdffdb33acae // indirect
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40
github.com/containerd/containerd v1.6.8 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/docker/docker v20.10.18+incompatible
github.com/docker/go-units v0.5.0
github.com/go-ole/go-ole v0.0.0-20180213002836-a1ec82a652eb // indirect
github.com/jinzhu/configor v1.2.1
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/panjf2000/ants/v2 v2.5.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1 // indirect
github.com/projecteru2/core v0.0.0-20221007011422-c20649c6c5a8
github.com/projecteru2/core v0.0.0-20221011033901-ff9e2848a9e1
github.com/projecteru2/libyavirt v0.0.0-20220621042712-95cdc6363b1c
github.com/prometheus/client_golang v1.13.0
github.com/shirou/gopsutil v3.20.11+incompatible
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/testify v1.8.0
github.com/urfave/cli/v2 v2.17.1
go.uber.org/automaxprocs v1.5.1
golang.org/x/sys v0.0.0-20221006211917-84dc82d7e875
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/BurntSushi/toml v1.2.0 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/Microsoft/hcsshim v0.9.4 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20220930113650-c6815a8c17ad // indirect
github.com/acomagu/bufpipe v1.0.3 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/StackExchange/wmi v0.0.0-20180412205111-cdffdb33acae // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cloudflare/circl v1.2.0 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.4.0 // indirect
github.com/cornelk/hashmap v1.0.8 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/getsentry/sentry-go v0.14.0 // indirect
github.com/go-git/gcfg v1.5.0 // indirect
github.com/go-git/go-billy/v5 v5.3.1 // indirect
github.com/go-git/go-git/v5 v5.4.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v0.0.0-20180213002836-a1ec82a652eb // indirect
github.com/go-ping/ping v0.0.0-20210407214646-e4e642a95741 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v1.0.0 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/sys/mount v0.3.3 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/muroq/redislock v0.0.0-20210327061935-5425e33e6f9f // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/runc v1.1.4 // indirect
github.com/panjf2000/ants/v2 v2.5.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sanity-io/litter v1.5.5 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
github.com/xanzy/ssh-agent v0.3.2 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/v2 v2.305.5 // indirect
go.etcd.io/etcd/client/v3 v3.5.5 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.5 // indirect
go.etcd.io/etcd/raft/v3 v3.5.5 // indirect
go.etcd.io/etcd/server/v3 v3.5.5 // indirect
go.etcd.io/etcd/tests/v3 v3.5.5 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/contrib v1.10.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.1 // indirect
go.opentelemetry.io/otel v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 // indirect
go.opentelemetry.io/otel/internal/metric v0.27.0 // indirect
go.opentelemetry.io/otel/metric v0.32.1 // indirect
go.opentelemetry.io/otel/sdk v1.10.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.28.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.32.1 // indirect
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b // indirect
golang.org/x/exp v0.0.0-20221006183845-316c7553db56 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20221004154528-8021a29435af // indirect
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 // indirect
golang.org/x/sys v0.0.0-20221006211917-84dc82d7e875 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91 // indirect
google.golang.org/grpc v1.50.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/tools v0.3.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
gotest.tools/v3 v3.0.3 // indirect
)
Loading

0 comments on commit 7025077

Please sign in to comment.