diff --git a/database/test/tarantool/pinger_test.go b/database/test/tarantool/pinger_test.go index 3fd8ebc..71c9bb5 100644 --- a/database/test/tarantool/pinger_test.go +++ b/database/test/tarantool/pinger_test.go @@ -24,8 +24,6 @@ func TestConnectFailover(t *testing.T) { arConnTimeout := 200 * time.Millisecond - pingInterval := 500 * time.Millisecond - // включаем микросекунды в std логере log.SetFlags(log.LstdFlags | log.Lmicroseconds) @@ -63,8 +61,7 @@ func TestConnectFailover(t *testing.T) { "arcfg/Timeout": arConnTimeout, }) - pinger := activerecord.NewPinger(activerecord.WithPingInterval(pingInterval)) - defer pinger.StopWatch() + pinger := NewBasicConnectionChecker() logger := activerecord.NewLogger() logger.SetLogLevel(activerecord.ErrorLoggerLevel) @@ -75,11 +72,15 @@ func TestConnectFailover(t *testing.T) { activerecord.WithConnectionPinger(pinger), ) - _, err = activerecord.AddClusterChecker(ctx, cfgName, octopus.ClusterConfigParams) + _, err = activerecord.AddClusterChecker(ctx, cfgName, activerecord.ClusterConfigParameters{ + Globs: octopus.DefaultConnectionParams, + OptionCreator: octopus.DefaultOptionCreator, + OptionChecker: octopus.CheckShardInstance, + }) require.NoError(t, err) // проверяем типы и состав узлов кластера после загрузки конфигурации - instances := pinger.ObservedInstances(cfgName) + instances := pinger.ObservedInstances() // все инстансы из конфигурации (включая несуществующие fakehost) require.Len(t, instances, 5) @@ -98,7 +99,7 @@ func TestConnectFailover(t *testing.T) { eg := &errgroup.Group{} // асинхронно запускаем серию параллельных запросов в узлы кластера (отлавливаем тормоза и гонки) - for g := 0; g < 80; g++ { + for g := 0; g < 8; g++ { g := g eg.Go(func() error { for i := 0; i < 5000; i++ { @@ -144,11 +145,10 @@ func TestConnectFailover(t *testing.T) { } // останавливаем мастер ноду - require.NoError(t, master.Stop(ctx)) - // подождем пока пингер актуализирует кластер после остановки ноды - time.Sleep(pingInterval + 10*time.Millisecond) + require.NoError(t, master.Terminate(ctx)) + require.NoError(t, pinger.check(ctx)) - instances = pinger.ObservedInstances(cfgName) + instances = pinger.ObservedInstances() // проверяем что остановленный мастер пропал из доступных узлов masters := availableInstances(instances, activerecord.ModeMaster) require.Len(t, masters, 0) @@ -159,16 +159,17 @@ func TestConnectFailover(t *testing.T) { // останавливаем одну реплику (но в конфигурации активрекорд она по прежнему присутствует) require.NoError(t, replica3.Stop(ctx)) - // подождем пока пингер актуализирует кластер после остановки ноды - time.Sleep(pingInterval + 10*time.Millisecond) + require.NoError(t, pinger.check(ctx)) - instances = pinger.ObservedInstances(cfgName) + instances = pinger.ObservedInstances() replicas = availableInstances(instances, activerecord.ModeReplica) // осталась одна доступная реплика require.Len(t, replicas, 1) require.Equal(t, rHost1, replicas[0].Config.Addr) - require.NoError(t, master.Start(ctx)) + // поднимает контейнер с мастером БД + master, err = tarantool.RunContainer(ctx, tarantool.WithTarantool15("tarantool/tarantool:1.5", time.Second)) + require.NoError(t, err) masterHost, err := master.ServerHostPort(ctx) require.NoError(t, err) @@ -179,13 +180,12 @@ func TestConnectFailover(t *testing.T) { "arcfg/Timeout": arConnTimeout, }) - fmt.Println("update arconfig") + log.Println("update arconfig") - // подождем пока пингер актуализирует кластер после остановки ноды - time.Sleep(pingInterval + 10*time.Millisecond) + require.NoError(t, pinger.check(ctx)) // обновленная конфигурация состоит из 2 узлов - instances = pinger.ObservedInstances(cfgName) + instances = pinger.ObservedInstances() require.Len(t, instances, 2) masters = availableInstances(instances, activerecord.ModeMaster) @@ -225,6 +225,68 @@ func availableInstances(instances []activerecord.ShardInstance, modeType activer return ret } +type TestConnectionChecker struct { + checkParams activerecord.ClusterConfigParameters + cfgName string + instances []activerecord.ShardInstance +} + +func (c *TestConnectionChecker) AddClusterChecker(ctx context.Context, cfgName string, params activerecord.ClusterConfigParameters) (*activerecord.Cluster, error) { + c.checkParams = params + c.cfgName = cfgName + + return nil, c.check(ctx) +} + +func NewBasicConnectionChecker() *TestConnectionChecker { + return &TestConnectionChecker{} +} + +func (c *TestConnectionChecker) check(ctx context.Context) error { + clusterConfig, err := activerecord.ConfigCacher().Get(ctx, c.cfgName, c.checkParams.Globs, c.checkParams.OptionCreator) + if err != nil { + return fmt.Errorf("can't load cluster info: %w", err) + } + + for i := 0; i < clusterConfig.Shards(); i++ { + curInstances := clusterConfig.ShardInstances(i) + + var actualShard activerecord.Shard + + for _, si := range curInstances { + opts, connErr := c.checkParams.OptionChecker(ctx, si) + + if opts != nil { + si.Config.Mode = opts.InstanceMode() + } + + instance := activerecord.ShardInstance{ + ParamsID: si.ParamsID, + Config: si.Config, + Options: si.Options, + Offline: connErr != nil, + } + + switch instance.Config.Mode { + case activerecord.ModeMaster: + actualShard.Masters = append(actualShard.Masters, instance) + case activerecord.ModeReplica: + actualShard.Replicas = append(actualShard.Replicas, instance) + } + } + + instances := actualShard.Instances() + c.instances = instances + clusterConfig.SetShardInstances(i, instances) + } + + return nil +} + +func (c *TestConnectionChecker) ObservedInstances() []activerecord.ShardInstance { + return c.instances +} + type TestConfig struct { cfg sync.Map created time.Time diff --git a/go.mod b/go.mod index 1df62a3..d445260 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/ebirukov/tnt-containers v1.0.2 - github.com/mailru/activerecord v1.11.0-b3 + github.com/mailru/activerecord v1.11.0 github.com/stretchr/testify v1.8.4 golang.org/x/sync v0.3.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 84ea89b..00ef07d 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/mailru/activerecord v1.11.0-b3 h1:mJsp9cGhm+nhKrU4UDs7++kW/uZ1FZDAoJBODCqvEuo= -github.com/mailru/activerecord v1.11.0-b3/go.mod h1:Dg3GWaG8VauQujFrWsjlXX6I093sm3xq3945/aur+OM= +github.com/mailru/activerecord v1.11.0 h1:OwxTHyMmrNuSe/7syqYcfKkkEoBapBvrylWZPuL3d/g= +github.com/mailru/activerecord v1.11.0/go.mod h1:Dg3GWaG8VauQujFrWsjlXX6I093sm3xq3945/aur+OM= github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45 h1:x3Zw96Gt6HbEPUWsTbQYj/nfaNv5lWHy6CeEkl8gwqw= github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45/go.mod h1:guLmlFj8yjd0hoz+QWxRU4Gn+VOb2nOQZ4EqRmMHarw= github.com/moby/patternmatcher v0.5.0 h1:YCZgJOeULcxLw1Q+sVR636pmS7sPEn1Qo2iAN6M7DBo=