diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go
index bdff36887039..b62f76ec302c 100644
--- a/pkg/workload/pgx_helpers.go
+++ b/pkg/workload/pgx_helpers.go
@@ -91,7 +91,7 @@ func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, err
 	// pool.
 	var g errgroup.Group
 	// Limit concurrent connection establishment. Allowing this to run
-	// at maximum parallism would trigger syn flood protection on the
+	// at maximum parallelism would trigger syn flood protection on the
 	// host, which combined with any packet loss could cause Acquire to
 	// return an error and fail the whole function. The value 100 is
 	// chosen because it is less than the default value for SOMAXCONN
@@ -102,8 +102,8 @@ func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, err
 		conns := warmupConns[i]
 		for j := range conns {
 			j := j
+			sem <- struct{}{}
 			g.Go(func() error {
-				sem <- struct{}{}
				var err error
 				conns[j], err = p.Acquire()
 				<-sem
diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go
index 146fe168337d..3c660c16bae3 100644
--- a/pkg/workload/tpcc/tpcc.go
+++ b/pkg/workload/tpcc/tpcc.go
@@ -555,6 +555,9 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad,
 	ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
 	ql.WorkerFns = make([]func(context.Context) error, w.workers)
 	var group errgroup.Group
+	// Limit the number of workers we initialize in parallel, to avoid running out
+	// of memory (#36897).
+	sem := make(chan struct{}, 100)
 	for workerIdx := range ql.WorkerFns {
 		workerIdx := workerIdx
 		warehouse := w.wPart.totalElems[workerIdx%len(w.wPart.totalElems)]
@@ -567,11 +570,13 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad,
 			dbs := partitionDBs[p]
 			db := dbs[warehouse%len(dbs)]

+			sem <- struct{}{}
 			group.Go(func() error {
 				worker, err := newWorker(context.TODO(), w, db, reg.GetHandle(), warehouse)
 				if err == nil {
 					ql.WorkerFns[workerIdx] = worker.run
 				}
+				<-sem
 				return err
 			})
 		}
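
Both hunks apply the same pattern: a buffered channel used as a counting
semaphore to bound how many goroutines run concurrently under an
errgroup.Group. The sketch below is illustrative only, not code from this
patch; the names maxParallel and doWork are invented for the example.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// doWork stands in for the expensive per-item setup (e.g. establishing a
// connection or initializing a worker).
func doWork(i int) error {
	fmt.Println("working on", i)
	return nil
}

func main() {
	const maxParallel = 100
	// Buffered channel as a counting semaphore: a send acquires one of
	// maxParallel slots, a receive releases it.
	sem := make(chan struct{}, maxParallel)

	var g errgroup.Group
	for i := 0; i < 1000; i++ {
		i := i
		// Acquire before g.Go: the spawning loop blocks here once
		// maxParallel goroutines are in flight.
		sem <- struct{}{}
		g.Go(func() error {
			// Release the slot when this goroutine finishes.
			defer func() { <-sem }()
			return doWork(i)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}

Note the design choice the pgx_helpers.go hunk makes explicit: acquiring the
slot before g.Go, rather than inside the goroutine, blocks the spawning loop
itself, so at most maxParallel goroutines (with their stacks and per-worker
allocations) exist at any moment. Acquiring inside the goroutine would still
throttle the work, but would let every goroutine start immediately.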