From a0052ddaafc64757802278686adbda03e6835b89 Mon Sep 17 00:00:00 2001 From: sabevzenko Date: Wed, 6 Mar 2024 15:52:01 +0300 Subject: [PATCH] client pool client pool 6718ea378b80e1066aa30589885e3e728910c1c1 --- .mapping.json | 4 +- components/guns/grpc/core.go | 77 ++++++++--- components/guns/grpc/scenario/core.go | 4 - components/guns/grpc/shared_deps.go | 12 ++ core/clientpool/pool.go | 33 +++++ core/core.go | 2 + core/engine/engine.go | 11 +- core/engine/instance.go | 14 +- core/engine/instance_test.go | 16 +-- core/warmup/interface.go | 1 - go.mod | 3 +- go.sum | 8 +- tests/acceptance/common.go | 18 --- tests/acceptance/config_model.go | 1 + ...m_reflection_grpc_test.go => grpc_test.go} | 122 +++++++++++++++--- tests/acceptance/http_test.go | 4 +- tests/acceptance/testdata/grpc/base.yaml | 1 + tests/grpc_scenario/main_test.go | 21 +-- 18 files changed, 245 insertions(+), 107 deletions(-) create mode 100644 components/guns/grpc/shared_deps.go create mode 100644 core/clientpool/pool.go rename tests/acceptance/{custom_reflection_grpc_test.go => grpc_test.go} (54%) diff --git a/.mapping.json b/.mapping.json index 78b840c0a..9ccf896dd 100644 --- a/.mapping.json +++ b/.mapping.json @@ -39,6 +39,7 @@ "components/guns/grpc/scenario/templater.go":"load/projects/pandora/components/guns/grpc/scenario/templater.go", "components/guns/grpc/scenario/templater_text.go":"load/projects/pandora/components/guns/grpc/scenario/templater_text.go", "components/guns/grpc/scenario/templater_text_test.go":"load/projects/pandora/components/guns/grpc/scenario/templater_text_test.go", + "components/guns/grpc/shared_deps.go":"load/projects/pandora/components/guns/grpc/shared_deps.go", "components/guns/http/base.go":"load/projects/pandora/components/guns/http/base.go", "components/guns/http/base_test.go":"load/projects/pandora/components/guns/http/base_test.go", "components/guns/http/client.go":"load/projects/pandora/components/guns/http/client.go", @@ -164,6 +165,7 @@ "core/aggregator/reporter.go":"load/projects/pandora/core/aggregator/reporter.go", "core/aggregator/reporter_test.go":"load/projects/pandora/core/aggregator/reporter_test.go", "core/aggregator/test.go":"load/projects/pandora/core/aggregator/test.go", + "core/clientpool/pool.go":"load/projects/pandora/core/clientpool/pool.go", "core/config/config.go":"load/projects/pandora/core/config/config.go", "core/config/config_test.go":"load/projects/pandora/core/config/config_test.go", "core/config/doc.go":"load/projects/pandora/core/config/doc.go", @@ -356,7 +358,7 @@ "script/coverage.sh":"load/projects/pandora/script/coverage.sh", "tests/acceptance/common.go":"load/projects/pandora/tests/acceptance/common.go", "tests/acceptance/config_model.go":"load/projects/pandora/tests/acceptance/config_model.go", - "tests/acceptance/custom_reflection_grpc_test.go":"load/projects/pandora/tests/acceptance/custom_reflection_grpc_test.go", + "tests/acceptance/grpc_test.go":"load/projects/pandora/tests/acceptance/grpc_test.go", "tests/acceptance/http_test.go":"load/projects/pandora/tests/acceptance/http_test.go", "tests/acceptance/testdata/grpc/base.yaml":"load/projects/pandora/tests/acceptance/testdata/grpc/base.yaml", "tests/acceptance/testdata/grpc/grpc.payload":"load/projects/pandora/tests/acceptance/testdata/grpc/grpc.payload", diff --git a/components/guns/grpc/core.go b/components/guns/grpc/core.go index 198531e23..6914cc231 100644 --- a/components/guns/grpc/core.go +++ b/components/guns/grpc/core.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "fmt" "strconv" "strings" @@ -17,6 +18,7 @@ import ( ammo "github.com/yandex/pandora/components/providers/grpc" "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator/netsample" + "github.com/yandex/pandora/core/clientpool" "github.com/yandex/pandora/core/warmup" "github.com/yandex/pandora/lib/answlog" "go.uber.org/zap" @@ -47,6 +49,7 @@ type GunConfig struct { TLS bool `config:"tls"` DialOptions GrpcDialOptions `config:"dial_options"` AnswLog AnswLogConfig `config:"answlog"` + PoolSize int `config:"pool-size"` } type AnswLogConfig struct { @@ -57,7 +60,6 @@ type AnswLogConfig struct { type Gun struct { DebugLog bool - Client *grpc.ClientConn Conf GunConfig Aggr core.Aggregator core.GunDeps @@ -79,9 +81,27 @@ func DefaultGunConfig() GunConfig { } } -func (g *Gun) WarmUp(opts *warmup.Options) (interface{}, error) { - target := replacePort(g.Conf.Target, g.Conf.ReflectPort) - conn, err := MakeGRPCConnect(target, g.Conf.TLS, g.Conf.DialOptions) +func (g *Gun) WarmUp(opts *warmup.Options) (any, error) { + return g.createSharedDeps(opts) +} + +func (g *Gun) createSharedDeps(opts *warmup.Options) (*SharedDeps, error) { + services, err := g.prepareMethodList(opts) + if err != nil { + return nil, err + } + clientPool, err := g.prepareClientPool() + if err != nil { + return nil, err + } + return &SharedDeps{ + services: services, + clientPool: clientPool, + }, nil +} + +func (g *Gun) prepareMethodList(opts *warmup.Options) (map[string]desc.MethodDescriptor, error) { + conn, err := g.makeReflectionConnect() if err != nil { return nil, fmt.Errorf("failed to connect to target: %w", err) } @@ -113,13 +133,22 @@ func (g *Gun) WarmUp(opts *warmup.Options) (interface{}, error) { return services, nil } -func (g *Gun) AcceptWarmUpResult(i interface{}) error { - services, ok := i.(map[string]desc.MethodDescriptor) - if !ok { - return fmt.Errorf("grpc WarmUp result should be services: map[string]desc.MethodDescriptor") +func (g *Gun) prepareClientPool() (*clientpool.Pool[grpcdynamic.Stub], error) { + if g.Conf.PoolSize <= 0 { + return nil, nil } - g.Services = services - return nil + clientPool, err := clientpool.New[grpcdynamic.Stub](g.Conf.PoolSize) + if err != nil { + return nil, fmt.Errorf("create clientpool err: %w", err) + } + for i := 0; i < g.Conf.PoolSize; i++ { + conn, err := g.makeConnect() + if err != nil { + return nil, fmt.Errorf("makeGRPCConnect fail %w", err) + } + clientPool.Add(grpcdynamic.NewStub(conn)) + } + return clientPool, nil } func NewGun(conf GunConfig) *Gun { @@ -128,14 +157,23 @@ func NewGun(conf GunConfig) *Gun { } func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error { - conn, err := MakeGRPCConnect(g.Conf.Target, g.Conf.TLS, g.Conf.DialOptions) - if err != nil { - return fmt.Errorf("makeGRPCConnect fail %w", err) + sharedDeps, ok := deps.Shared.(*SharedDeps) + if !ok { + return errors.New("grpc WarmUp result should be struct: *SharedDeps") + } + g.Services = sharedDeps.services + if sharedDeps.clientPool != nil { + g.Stub = sharedDeps.clientPool.Next() + } else { + conn, err := g.makeConnect() + if err != nil { + return fmt.Errorf("makeGRPCConnect fail %w", err) + } + g.Stub = grpcdynamic.NewStub(conn) } - g.Client = conn + g.Aggr = aggr g.GunDeps = deps - g.Stub = grpcdynamic.NewStub(conn) if ent := deps.Log.Check(zap.DebugLevel, "Gun bind"); ent != nil { deps.Log.Warn("Deprecation Warning: log level: debug doesn't produce request/response logs anymore. Please use AnswLog option instead:\nanswlog:\n enabled: true\n filter: all|warning|error\n path: answ.log") @@ -217,6 +255,15 @@ func (g *Gun) AnswLogging(logger *zap.Logger, method *desc.MethodDescriptor, req logger.Debug("Response:", zap.Stringer("resp", response), zap.Error(grpcErr)) } +func (g *Gun) makeConnect() (conn *grpc.ClientConn, err error) { + return MakeGRPCConnect(g.Conf.Target, g.Conf.TLS, g.Conf.DialOptions) +} + +func (g *Gun) makeReflectionConnect() (conn *grpc.ClientConn, err error) { + target := replacePort(g.Conf.Target, g.Conf.ReflectPort) + return MakeGRPCConnect(target, g.Conf.TLS, g.Conf.DialOptions) +} + func MakeGRPCConnect(target string, isTLS bool, dialOptions GrpcDialOptions) (conn *grpc.ClientConn, err error) { opts := []grpc.DialOption{} if isTLS { diff --git a/components/guns/grpc/scenario/core.go b/components/guns/grpc/scenario/core.go index 7b5e2c95b..68e078ac0 100644 --- a/components/guns/grpc/scenario/core.go +++ b/components/guns/grpc/scenario/core.go @@ -86,10 +86,6 @@ func (g *Gun) WarmUp(opts *warmup.Options) (interface{}, error) { return g.gun.WarmUp(opts) } -func (g *Gun) AcceptWarmUpResult(i interface{}) error { - return g.gun.AcceptWarmUpResult(i) -} - func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error { return g.gun.Bind(aggr, deps) } diff --git a/components/guns/grpc/shared_deps.go b/components/guns/grpc/shared_deps.go new file mode 100644 index 000000000..441b8b6ee --- /dev/null +++ b/components/guns/grpc/shared_deps.go @@ -0,0 +1,12 @@ +package grpc + +import ( + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/dynamic/grpcdynamic" + "github.com/yandex/pandora/core/clientpool" +) + +type SharedDeps struct { + services map[string]desc.MethodDescriptor + clientPool *clientpool.Pool[grpcdynamic.Stub] +} diff --git a/core/clientpool/pool.go b/core/clientpool/pool.go new file mode 100644 index 000000000..e078e9019 --- /dev/null +++ b/core/clientpool/pool.go @@ -0,0 +1,33 @@ +package clientpool + +import ( + "errors" + "sync/atomic" +) + +func New[T any](size int) (*Pool[T], error) { + if size <= 0 { + return nil, errors.New("pool size must be greater than zero") + } + return &Pool[T]{ + pool: make([]T, 0, size), + }, nil +} + +type Pool[T any] struct { + pool []T + i atomic.Uint64 +} + +func (p *Pool[T]) Add(conn T) { + p.pool = append(p.pool, conn) +} + +func (p *Pool[T]) Next() T { + if len(p.pool) == 0 { + var zero T + return zero + } + i := p.i.Add(1) + return p.pool[int(i)%len(p.pool)] +} diff --git a/core/core.go b/core/core.go index 36191f7d1..2006d68f6 100644 --- a/core/core.go +++ b/core/core.go @@ -103,6 +103,8 @@ type GunDeps struct { InstanceID int PoolID string + Shared any + // TODO(skipor): https://github.com/yandex/pandora/issues/71 // Pass parallelism value. InstanceId MUST be -1 if parallelism > 1. } diff --git a/core/engine/engine.go b/core/engine/engine.go index a2d3f21e0..6d5476d86 100644 --- a/core/engine/engine.go +++ b/core/engine/engine.go @@ -108,7 +108,7 @@ func (e *Engine) Wait() { func newPool(log *zap.Logger, m Metrics, onWaitDone func(), conf InstancePoolConfig) *instancePool { log = log.With(zap.String("pool", conf.ID)) - return &instancePool{log, m, onWaitDone, conf, nil} + return &instancePool{log: log, metrics: m, onWaitDone: onWaitDone, InstancePoolConfig: conf} } type instancePool struct { @@ -116,7 +116,7 @@ type instancePool struct { metrics Metrics onWaitDone func() InstancePoolConfig - gunWarmUpResult interface{} + sharedGunDeps any } // Run start instance pool. Run blocks until fail happen, or all instances finish. @@ -169,10 +169,7 @@ func (p *instancePool) warmUpGun(ctx context.Context) error { return fmt.Errorf("can't initiate a gun: %w", err) } if gunWithWarmUp, ok := gun.(warmup.WarmedUp); ok { - p.gunWarmUpResult, err = gunWithWarmUp.WarmUp(&warmup.Options{ - Log: p.log, - Ctx: ctx, - }) + p.sharedGunDeps, err = gunWithWarmUp.WarmUp(&warmup.Options{Log: p.log, Ctx: ctx}) if err != nil { return fmt.Errorf("gun warm up failed: %w", err) } @@ -362,7 +359,7 @@ func (p *instancePool) startInstances( instanceSharedDeps: instanceSharedDeps{ provider: p.Provider, metrics: p.metrics, - gunWarmUpResult: p.gunWarmUpResult, + gunDeps: p.sharedGunDeps, aggregator: p.Aggregator, discardOverflow: p.DiscardOverflow, }, diff --git a/core/engine/instance.go b/core/engine/instance.go index 9f153a022..965f64512 100644 --- a/core/engine/instance.go +++ b/core/engine/instance.go @@ -2,14 +2,12 @@ package engine import ( "context" - "fmt" "io" "github.com/pkg/errors" "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator/netsample" "github.com/yandex/pandora/core/coreutil" - "github.com/yandex/pandora/core/warmup" "github.com/yandex/pandora/lib/tag" "go.uber.org/zap" ) @@ -24,7 +22,7 @@ type instance struct { func newInstance(ctx context.Context, log *zap.Logger, poolID string, id int, deps instanceDeps) (*instance, error) { log = log.With(zap.Int("instance", id)) - gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: poolID, InstanceID: id} + gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: poolID, InstanceID: id, Shared: deps.gunDeps} sched, err := deps.newSchedule() if err != nil { return nil, err @@ -33,16 +31,12 @@ func newInstance(ctx context.Context, log *zap.Logger, poolID string, id int, de if err != nil { return nil, err } - if warmedUp, ok := gun.(warmup.WarmedUp); ok { - if err := warmedUp.AcceptWarmUpResult(deps.gunWarmUpResult); err != nil { - return nil, fmt.Errorf("gun failed to accept warmup result: %w", err) - } - } + err = gun.Bind(deps.aggregator, gunDeps) if err != nil { return nil, err } - inst := &instance{log, id, gun, sched, deps.instanceSharedDeps} + inst := &instance{log: log, id: id, gun: gun, schedule: sched, instanceSharedDeps: deps.instanceSharedDeps} return inst, nil } @@ -55,7 +49,7 @@ type instanceDeps struct { type instanceSharedDeps struct { provider core.Provider metrics Metrics - gunWarmUpResult interface{} + gunDeps any aggregator core.Aggregator discardOverflow bool } diff --git a/core/engine/instance_test.go b/core/engine/instance_test.go index 9f6adf4c3..fa02cadd0 100644 --- a/core/engine/instance_test.go +++ b/core/engine/instance_test.go @@ -47,15 +47,13 @@ func Test_Instance(t *testing.T) { var justBeforeEach = func() { deps := instanceDeps{ - - newSchedule, - newGun, - instanceSharedDeps{ - provider, - metrics, - nil, - aggregator, - false, + newSchedule: newSchedule, + newGun: newGun, + instanceSharedDeps: instanceSharedDeps{ + provider: provider, + metrics: metrics, + aggregator: aggregator, + discardOverflow: false, }, } ins, insCreateErr = newInstance(ctx, newNopLogger(), "pool_0", 0, deps) diff --git a/core/warmup/interface.go b/core/warmup/interface.go index 6438074bf..e794d8021 100644 --- a/core/warmup/interface.go +++ b/core/warmup/interface.go @@ -2,5 +2,4 @@ package warmup type WarmedUp interface { WarmUp(*Options) (interface{}, error) - AcceptWarmUpResult(interface{}) error } diff --git a/go.mod b/go.mod index d3ff7c3b8..7651c7fc8 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 - golang.org/x/net v0.19.0 + golang.org/x/net v0.20.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 google.golang.org/grpc v1.60.1 google.golang.org/protobuf v1.32.0 @@ -64,6 +64,7 @@ require ( github.com/zclconf/go-cty v1.13.2 // indirect go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect diff --git a/go.sum b/go.sum index 76a3287ab..e7a36317d 100644 --- a/go.sum +++ b/go.sum @@ -339,8 +339,8 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= -golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -360,8 +360,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/tests/acceptance/common.go b/tests/acceptance/common.go index da5b39671..ca774bf49 100644 --- a/tests/acceptance/common.go +++ b/tests/acceptance/common.go @@ -14,9 +14,6 @@ import ( "github.com/yandex/pandora/core/config" "github.com/yandex/pandora/core/engine" "github.com/yandex/pandora/lib/monitoring" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" "gopkg.in/yaml.v2" ) @@ -50,21 +47,6 @@ func unmarshalConfigFile(t *testing.T, filename string, serverAddr string) map[s return mapCfg } -func newNullLogger() *zap.Logger { - c, _ := observer.New(zap.InfoLevel) - return zap.New(c) -} - -func newLogger() *zap.Logger { - zapConf := zap.NewDevelopmentConfig() - zapConf.Level.SetLevel(zapcore.DebugLevel) - log, err := zapConf.Build(zap.AddCaller()) - if err != nil { - zap.L().Fatal("Logger build failed", zap.Error(err)) - } - return log -} - func newEngineMetrics(prefix string) engine.Metrics { return engine.Metrics{ Request: monitoring.NewCounter(prefix + "_Requests"), diff --git a/tests/acceptance/config_model.go b/tests/acceptance/config_model.go index ec469e869..eca7c9781 100644 --- a/tests/acceptance/config_model.go +++ b/tests/acceptance/config_model.go @@ -15,6 +15,7 @@ type PandoraConfigGRPCGun struct { Target string `yaml:"target"` TLS bool `yaml:"tls"` ReflectPort *int64 `yaml:"reflect_port,omitempty"` + PoolSize int `yaml:"pool-size,omitempty"` } type PandoraConfigAmmo struct { Type string `yaml:"type"` diff --git a/tests/acceptance/custom_reflection_grpc_test.go b/tests/acceptance/grpc_test.go similarity index 54% rename from tests/acceptance/custom_reflection_grpc_test.go rename to tests/acceptance/grpc_test.go index 8fd2ead9e..cec303f80 100644 --- a/tests/acceptance/custom_reflection_grpc_test.go +++ b/tests/acceptance/grpc_test.go @@ -10,12 +10,16 @@ import ( "github.com/spf13/afero" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/yandex/pandora/cli" grpcimport "github.com/yandex/pandora/components/grpc/import" phttpimport "github.com/yandex/pandora/components/phttp/import" "github.com/yandex/pandora/core/engine" coreimport "github.com/yandex/pandora/core/import" "github.com/yandex/pandora/examples/grpc/server" "github.com/yandex/pandora/lib/pointer" + "github.com/yandex/pandora/lib/testutil" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "gopkg.in/yaml.v2" @@ -28,7 +32,7 @@ func TestCheckGRPCReflectServer(t *testing.T) { phttpimport.Import(fs) grpcimport.Import(fs) }) - pandoraLogger := newNullLogger() + pandoraLogger := testutil.NewNullLogger() pandoraMetrics := newEngineMetrics("reflect") baseFile, err := os.ReadFile("testdata/grpc/base.yaml") require.NoError(t, err) @@ -53,15 +57,7 @@ func TestCheckGRPCReflectServer(t *testing.T) { grpcServer.Stop() }() - cfg := PandoraConfigGRPC{} - err = yaml.Unmarshal(baseFile, &cfg) - require.NoError(t, err) - b, err := yaml.Marshal(cfg) - require.NoError(t, err) - mapCfg := map[string]any{} - err = yaml.Unmarshal(b, &mapCfg) - require.NoError(t, err) - conf := decodeConfig(t, mapCfg) + conf := parseFileContentToCliConfig(t, baseFile, nil) require.Equal(t, 1, len(conf.Engine.Pools)) aggr := &aggregator{} @@ -105,16 +101,9 @@ func TestCheckGRPCReflectServer(t *testing.T) { reflectionGrpcServer.Stop() }() - cfg := PandoraConfigGRPC{} - err = yaml.Unmarshal(baseFile, &cfg) - cfg.Pools[0].Gun.ReflectPort = pointer.ToInt64(18889) - require.NoError(t, err) - b, err := yaml.Marshal(cfg) - require.NoError(t, err) - mapCfg := map[string]any{} - err = yaml.Unmarshal(b, &mapCfg) - require.NoError(t, err) - conf := decodeConfig(t, mapCfg) + conf := parseFileContentToCliConfig(t, baseFile, func(c *PandoraConfigGRPC) { + c.Pools[0].Gun.ReflectPort = pointer.ToInt64(18889) + }) require.Equal(t, 1, len(conf.Engine.Pools)) aggr := &aggregator{} @@ -128,3 +117,96 @@ func TestCheckGRPCReflectServer(t *testing.T) { require.Equal(t, int64(8), st.Hello) }) } + +func TestGrpcGunSuite(t *testing.T) { + suite.Run(t, new(GrpcGunSuite)) +} + +type GrpcGunSuite struct { + suite.Suite + fs afero.Fs + log *zap.Logger + metrics engine.Metrics +} + +func (s *GrpcGunSuite) SetupSuite() { + s.fs = afero.NewOsFs() + testOnce.Do(func() { + coreimport.Import(s.fs) + phttpimport.Import(s.fs) + grpcimport.Import(s.fs) + }) + + s.log = testutil.NewNullLogger() + s.metrics = newEngineMetrics("grpc_suite") +} + +func (s *GrpcGunSuite) Test_Run() { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + baseFile, err := os.ReadFile("testdata/grpc/base.yaml") + s.Require().NoError(err) + + tests := []struct { + name string + overwrite func(c *PandoraConfigGRPC) + wantCnt int64 + }{ + { + name: "default testdata/grpc/base.yaml", + wantCnt: 8, + }, + { + name: "add pool-size testdata/grpc/base.yaml", + overwrite: func(c *PandoraConfigGRPC) { + c.Pools[0].Gun.PoolSize = 2 + }, + wantCnt: 8, + }, + } + for _, tt := range tests { + s.Run(tt.name, func() { + + grpcServer := grpc.NewServer() + srv := server.NewServer(logger, time.Now().UnixNano()) + server.RegisterTargetServiceServer(grpcServer, srv) + reflection.Register(grpcServer) + l, err := net.Listen("tcp", ":18888") + s.Require().NoError(err) + go func() { + err = grpcServer.Serve(l) + s.Require().NoError(err) + }() + defer func() { + grpcServer.Stop() + }() + + conf := parseFileContentToCliConfig(s.T(), baseFile, tt.overwrite) + + aggr := &aggregator{} + conf.Engine.Pools[0].Aggregator = aggr + pandora := engine.New(s.log, s.metrics, conf.Engine) + + err = pandora.Run(context.Background()) + s.Require().NoError(err) + stats, err := srv.Stats(context.Background(), nil) + s.Require().NoError(err) + s.Require().Equal(tt.wantCnt, stats.Hello) + }) + } +} + +func parseFileContentToCliConfig(t *testing.T, baseFile []byte, overwrite func(c *PandoraConfigGRPC)) *cli.CliConfig { + cfg := PandoraConfigGRPC{} + err := yaml.Unmarshal(baseFile, &cfg) + require.NoError(t, err) + if overwrite != nil { + overwrite(&cfg) + } + b, err := yaml.Marshal(cfg) + require.NoError(t, err) + mapCfg := map[string]any{} + err = yaml.Unmarshal(b, &mapCfg) + require.NoError(t, err) + + return decodeConfig(t, mapCfg) +} diff --git a/tests/acceptance/http_test.go b/tests/acceptance/http_test.go index e646299bf..829a48a7c 100644 --- a/tests/acceptance/http_test.go +++ b/tests/acceptance/http_test.go @@ -13,6 +13,7 @@ import ( phttpimport "github.com/yandex/pandora/components/phttp/import" "github.com/yandex/pandora/core/engine" coreimport "github.com/yandex/pandora/core/import" + "github.com/yandex/pandora/lib/testutil" "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/net/http2" @@ -39,8 +40,7 @@ func (s *PandoraSuite) SetupSuite() { grpc.Import(s.fs) }) - s.log = newNullLogger() - // s.log = newLogger() + s.log = testutil.NewNullLogger() s.metrics = newEngineMetrics("http_suite") } diff --git a/tests/acceptance/testdata/grpc/base.yaml b/tests/acceptance/testdata/grpc/base.yaml index 4668a24e7..2d08b462a 100644 --- a/tests/acceptance/testdata/grpc/base.yaml +++ b/tests/acceptance/testdata/grpc/base.yaml @@ -4,6 +4,7 @@ pools: type: grpc target: localhost:18888 tls: false + pool-size: 0 ammo: type: grpc/json file: testdata/grpc/grpc.payload diff --git a/tests/grpc_scenario/main_test.go b/tests/grpc_scenario/main_test.go index 0f34605db..fd84c7bd4 100644 --- a/tests/grpc_scenario/main_test.go +++ b/tests/grpc_scenario/main_test.go @@ -88,13 +88,10 @@ func (s *GunSuite) Test_Scenario() { } g := grpcscenario.NewGun(cfg) - res, err := g.WarmUp(&warmup.Options{Log: log, Ctx: ctx}) + sharedDeps, err := g.WarmUp(&warmup.Options{Log: log, Ctx: ctx}) s.NoError(err) - err = g.AcceptWarmUpResult(res) - s.NoError(err) - - gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: "test", InstanceID: 1} + gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: "test", InstanceID: 1, Shared: sharedDeps} aggr := &Aggregator{} err = g.Bind(aggr, gunDeps) s.NoError(err) @@ -151,13 +148,10 @@ func (s *GunSuite) Test_FullScenario() { } g := grpcscenario.NewGun(gunConfig) - res, err := g.WarmUp(&warmup.Options{Log: log, Ctx: ctx}) - s.NoError(err) - - err = g.AcceptWarmUpResult(res) + sharedDeps, err := g.WarmUp(&warmup.Options{Log: log, Ctx: ctx}) s.NoError(err) - gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: "pool_id", InstanceID: 1} + gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: "pool_id", InstanceID: 1, Shared: sharedDeps} aggr := &Aggregator{} err = g.Bind(aggr, gunDeps) s.NoError(err) @@ -201,13 +195,10 @@ func (s *GunSuite) Test_ErrorScenario() { } g := grpcscenario.NewGun(cfg) - res, err := g.WarmUp(&warmup.Options{Log: log, Ctx: ctx}) - s.NoError(err) - - err = g.AcceptWarmUpResult(res) + sharedDeps, err := g.WarmUp(&warmup.Options{Log: log, Ctx: ctx}) s.NoError(err) - gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: "test", InstanceID: 1} + gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: "test", InstanceID: 1, Shared: sharedDeps} aggr := &Aggregator{} err = g.Bind(aggr, gunDeps) s.NoError(err)