diff --git a/config/config_test.go b/config/config_test.go index 216c49c2..0e9263e7 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -56,18 +56,17 @@ func TestKoanfAdapter_Watch(t *gotesting.T) { var ch = make(chan struct{}) go func() { ka.watcher.Watch(ctx, func() error { - assert.NoError(t, ka.Reload(), "reload should be successful") + assert.NoError(t, ka.Reload(), "reload should be successful") err := ka.Reload() fmt.Println(err) - ch <- struct{}{} + ch <- struct{}{} return nil }) }() time.Sleep(time.Second) ioutil.WriteFile(f.Name(), []byte("foo: bar"), 0644) ioutil.WriteFile(f.Name(), []byte("foo: bar"), 0644) - <-ch - + <-ch // The following test is flaky on CI. Unable to reproduce locally. /* diff --git a/config/time.go b/config/time.go index 4de92713..3b1ab699 100644 --- a/config/time.go +++ b/config/time.go @@ -14,6 +14,11 @@ type Duration struct { time.Duration } +// Valid simplify if statement +func (d Duration) Valid() bool { + return d.Duration > 0 +} + // MarshalYAML implements yaml.Marshaler func (d Duration) MarshalYAML() (interface{}, error) { return d.String(), nil diff --git a/config/time_test.go b/config/time_test.go index 20ad8b11..ddd09377 100644 --- a/config/time_test.go +++ b/config/time_test.go @@ -80,3 +80,23 @@ func TestDuration_MarshalJSON(t *testing.T) { }) } } + +func TestDuration_Valid(t *testing.T) { + tests := []struct { + name string + val time.Duration + want bool + }{ + {">0", 1, true}, + {"<0", -1, false}, + {"=0", 0, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := Duration{tt.val} + if got := d.Valid(); got != tt.want { + t.Errorf("Valid() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/dtx/sagas/config.go b/dtx/sagas/config.go new file mode 100644 index 00000000..8374e029 --- /dev/null +++ b/dtx/sagas/config.go @@ -0,0 +1,51 @@ +package sagas + +import ( + "github.com/DoNewsCode/core/config" + "github.com/DoNewsCode/core/di" + "time" +) + +type configOut struct { + di.Out + + Config []config.ExportedConfig `group:"config,flatten"` +} + +var defaultConfig = configuration{ + SagaTimeout: config.Duration{Duration: 600 * time.Second}, + RecoverInterval: config.Duration{Duration: 60 * time.Second}, +} + +func provideConfig() configOut { + return configOut{ + Config: []config.ExportedConfig{ + { + Owner: "sagas", + Data: map[string]interface{}{ + "sagas": defaultConfig, + }, + Comment: "The saga configuration.", + }, + }, + } +} + +type configuration struct { + SagaTimeout config.Duration `json:"sagaTimeout" yaml:"sagaTimeout"` + RecoverInterval config.Duration `json:"recoverInterval" yaml:"recoverInterval"` +} + +func (c configuration) getSagaTimeout() config.Duration { + if !c.SagaTimeout.Valid() { + return defaultConfig.SagaTimeout + } + return c.SagaTimeout +} + +func (c configuration) getRecoverInterval() config.Duration { + if !c.RecoverInterval.Valid() { + return defaultConfig.RecoverInterval + } + return c.RecoverInterval +} diff --git a/dtx/sagas/dependency.go b/dtx/sagas/dependency.go index 99b4544f..abdb8081 100644 --- a/dtx/sagas/dependency.go +++ b/dtx/sagas/dependency.go @@ -4,11 +4,11 @@ import ( "context" "time" - "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/oklog/run" ) @@ -55,18 +55,13 @@ func provide(in in) out { if in.Store == nil { in.Store = NewInProcessStore() } - recover := 60 * time.Second - timeout := 600 * time.Second - - var configuration configuration - in.Conf.Unmarshal("sagas", &configuration) - - if configuration.SagaTimeout.Duration != 0 { - timeout = configuration.SagaTimeout.Duration - } - if configuration.RecoverInterval.Duration != 0 { - recover = configuration.RecoverInterval.Duration + var conf configuration + err := in.Conf.Unmarshal("sagas", &conf) + if err != nil { + level.Warn(in.Logger).Log("err", err) } + timeout := conf.getSagaTimeout().Duration + recoverVal := conf.getRecoverInterval().Duration registry := NewRegistry( in.Store, @@ -79,7 +74,7 @@ func provide(in in) out { eps[in.Steps[i].Name] = registry.AddStep(in.Steps[i]) } - return out{Registry: registry, Interval: recoverInterval(recover), SagaEndpoints: eps} + return out{Registry: registry, Interval: recoverInterval(recoverVal), SagaEndpoints: eps} } // ProvideRunGroup implements the RunProvider. @@ -103,39 +98,3 @@ func (m out) ProvideRunGroup(group *run.Group) { } func (m out) ModuleSentinel() {} - -type configOut struct { - Config []config.ExportedConfig -} - -type configuration struct { - SagaTimeout config.Duration `json:"sagaTimeout" yaml:"sagaTimeout"` - RecoverInterval config.Duration `json:"recoverInterval" yaml:"recoverInterval"` - MySQL mysql `json:"mysql" yaml:"mysql"` -} - -type mysql struct { - Connection string `json:"connection" yaml:"connection"` - Retention config.Duration `json:"retention" yaml:"retention"` - CleanupInterval config.Duration `json:"cleanupInterval" yaml:"cleanupInterval"` -} - -func provideConfig() configOut { - return configOut{Config: []config.ExportedConfig{ - { - Owner: "sagas", - Data: map[string]interface{}{ - "sagas": configuration{ - SagaTimeout: config.Duration{Duration: 600 * time.Second}, - RecoverInterval: config.Duration{Duration: 60 * time.Second}, - MySQL: mysql{ - Connection: "default", - Retention: config.Duration{Duration: 168 * time.Hour}, - CleanupInterval: config.Duration{Duration: time.Hour}, - }, - }, - }, - Comment: "The saga configuration.", - }, - }} -} diff --git a/dtx/sagas/dependency_test.go b/dtx/sagas/dependency_test.go index a1a85bc8..45ab39e4 100644 --- a/dtx/sagas/dependency_test.go +++ b/dtx/sagas/dependency_test.go @@ -47,9 +47,9 @@ func TestNew(t *testing.T) { }) } -func TestExportedConfigs(t *testing.T) { +func Test_provideConfig(t *testing.T) { conf := provideConfig() - _, err := yaml.Marshal(conf) + _, err := yaml.Marshal(conf.Config) assert.NoError(t, err) } diff --git a/dtx/sagas/mysqlstore/config.go b/dtx/sagas/mysqlstore/config.go new file mode 100644 index 00000000..007adfa9 --- /dev/null +++ b/dtx/sagas/mysqlstore/config.go @@ -0,0 +1,60 @@ +package mysqlstore + +import ( + "github.com/DoNewsCode/core/config" + "github.com/DoNewsCode/core/di" + "time" +) + +type configOut struct { + di.Out + + Config []config.ExportedConfig `group:"config,flatten"` +} + +var defaultConfig = configuration{ + Connection: "default", + Retention: config.Duration{Duration: 168 * time.Hour}, + CleanupInterval: config.Duration{Duration: time.Hour}, +} + +func provideConfig() configOut { + return configOut{ + Config: []config.ExportedConfig{ + { + Owner: "sagas", + Data: map[string]interface{}{ + "sagas-mysql": defaultConfig, + }, + Comment: "The saga mysql store configuration.", + }, + }, + } +} + +type configuration struct { + Connection string `json:"connection" yaml:"connection"` + Retention config.Duration `json:"retention" yaml:"retention"` + CleanupInterval config.Duration `json:"cleanupInterval" yaml:"cleanupInterval"` +} + +func (c configuration) getConnection() string { + if c.Connection == "" { + return defaultConfig.Connection + } + return c.Connection +} + +func (c configuration) getRetention() config.Duration { + if !c.Retention.Valid() { + return defaultConfig.Retention + } + return c.Retention +} + +func (c configuration) getCleanupInterval() config.Duration { + if !c.CleanupInterval.Valid() { + return defaultConfig.CleanupInterval + } + return c.CleanupInterval +} diff --git a/dtx/sagas/mysqlstore/dependency.go b/dtx/sagas/mysqlstore/dependency.go index 8bf5a1b2..6ddaa2be 100644 --- a/dtx/sagas/mysqlstore/dependency.go +++ b/dtx/sagas/mysqlstore/dependency.go @@ -2,12 +2,12 @@ package mysqlstore import ( "context" - "time" - "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/dtx/sagas" "github.com/DoNewsCode/core/otgorm" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/oklog/run" ) @@ -21,33 +21,28 @@ Providers returns MySQLStore dependency. - sagas.Store */ func Providers() di.Deps { - return []interface{}{provide} + return []interface{}{provide, provideConfig} } func provide(in in) (out, error) { - conn := "default" - if in.Conf.String("sagas.mysql.connection") != "" { - conn = in.Conf.String("sagas.mysql.connection") + var conf configuration + err := in.Conf.Unmarshal("sagas-mysql", &conf) + if err != nil { + level.Warn(in.Logger).Log("err", err) } + + conn := conf.getConnection() + db, err := in.Maker.Make(conn) if err != nil { return out{}, err } var opts []Option - d, _ := time.ParseDuration(in.Conf.String("sagas.mysql.retention")) - if d > 0 { - opts = append( - opts, - WithRetention(d), - ) - } - d, _ = time.ParseDuration(in.Conf.String("sagas.mysql.cleanupInterval")) - if d > 0 { - opts = append( - opts, - WithCleanUpInterval(d), - ) - } + retention := conf.getRetention().Duration + cleanupInterval := conf.getCleanupInterval().Duration + + opts = append(opts, WithRetention(retention), WithCleanUpInterval(cleanupInterval)) + store := New(db, opts...) return out{ Conn: conn, @@ -59,8 +54,9 @@ func provide(in in) (out, error) { type in struct { di.In - Maker otgorm.Maker - Conf contract.ConfigAccessor + Logger log.Logger + Maker otgorm.Maker + Conf contract.ConfigAccessor } type out struct { diff --git a/dtx/sagas/mysqlstore/dependency_test.go b/dtx/sagas/mysqlstore/dependency_test.go index bda09eb5..ad62018d 100644 --- a/dtx/sagas/mysqlstore/dependency_test.go +++ b/dtx/sagas/mysqlstore/dependency_test.go @@ -6,14 +6,16 @@ import ( "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/dtx/sagas" "github.com/DoNewsCode/core/otgorm" + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" "gorm.io/gorm" ) func TestProviders(t *testing.T) { c := core.Default( - core.WithInline("sagas.mysql.connection", "default"), - core.WithInline("sagas.mysql.retention", "1h"), - core.WithInline("sagas.mysql.cleanupInterval", "1h"), + core.WithInline("sagas-mysql.connection", "default"), + core.WithInline("sagas-mysql.retention", "1h"), + core.WithInline("sagas-mysql.cleanupInterval", "1h"), ) c.Provide(otgorm.Providers()) c.Provide(sagas.Providers()) @@ -34,3 +36,9 @@ func TestProviders(t *testing.T) { }.Rollback("-1") }) } + +func Test_provideConfig(t *testing.T) { + conf := provideConfig() + _, err := yaml.Marshal(conf.Config) + assert.NoError(t, err) +} diff --git a/leader/dependency.go b/leader/dependency.go index 4082be78..3ece3fa9 100644 --- a/leader/dependency.go +++ b/leader/dependency.go @@ -99,7 +99,9 @@ func determineDriver(in *in) error { } type configOut struct { - Config []config.ExportedConfig + di.Out + + Config []config.ExportedConfig `group:"config,flatten"` } func provideConfig() configOut { diff --git a/leader/dependency_test.go b/leader/dependency_test.go index d9fa9b04..d81e6dcf 100644 --- a/leader/dependency_test.go +++ b/leader/dependency_test.go @@ -80,8 +80,8 @@ func TestDetermineDriver(t *testing.T) { assert.Error(t, err) } -func TestExportedConfigs(t *testing.T) { +func Test_provideConfig(t *testing.T) { conf := provideConfig() - _, err := yaml.Marshal(conf) + _, err := yaml.Marshal(conf.Config) assert.NoError(t, err) } diff --git a/observability/observability.go b/observability/observability.go index fe0e909d..e7e6eba5 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -24,7 +24,7 @@ Providers returns a set of providers available in package observability metrics.Histogram */ func Providers() di.Deps { - return di.Deps{provide, exportConfig} + return di.Deps{provide, provideConfig} } // in is the injection argument of provide. @@ -75,7 +75,7 @@ type configOut struct { Config []config.ExportedConfig `group:"config,flatten"` } -func exportConfig() configOut { +func provideConfig() configOut { var conf map[string]interface{} _ = yaml.Unmarshal([]byte(sample), &conf) diff --git a/observability/observability_test.go b/observability/observability_test.go index 2132a929..30f69eed 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -24,7 +24,7 @@ func TestProvide(t *testing.T) { cleanup() } -func TestExportedConfigs(t *testing.T) { - Conf := exportConfig() +func Test_provideConfig(t *testing.T) { + Conf := provideConfig() assert.NotEmpty(t, Conf.Config) } diff --git a/otetcd/dependency.go b/otetcd/dependency.go index cdc1a5ef..f12c1381 100644 --- a/otetcd/dependency.go +++ b/otetcd/dependency.go @@ -28,7 +28,7 @@ Providers returns a set of dependencies including the Maker, the default *client *clientv3.Client */ func Providers() []interface{} { - return []interface{}{provideFactory, provideDefaultClient, provideExportedConfigs} + return []interface{}{provideFactory, provideDefaultClient, provideConfig} } // EtcdConfigInterceptor is an injector type hint that allows user to do @@ -146,12 +146,12 @@ func provideDefaultClient(maker Maker) (*clientv3.Client, error) { type configOut struct { di.Out - ExportedConfig []config.ExportedConfig `group:"config,flatten"` + Config []config.ExportedConfig `group:"config,flatten"` } -func provideExportedConfigs() configOut { +func provideConfig() configOut { return configOut{ - ExportedConfig: []config.ExportedConfig{ + Config: []config.ExportedConfig{ { "otetcd", map[string]interface{}{ diff --git a/otetcd/dependency_test.go b/otetcd/dependency_test.go index 90460345..46f20201 100644 --- a/otetcd/dependency_test.go +++ b/otetcd/dependency_test.go @@ -55,8 +55,8 @@ func TestProvideFactory(t *testing.T) { cleanup() } -func TestExportedConfigs(t *testing.T) { - conf := provideExportedConfigs() - _, err := yaml.Marshal(conf.ExportedConfig) +func Test_provideConfig(t *testing.T) { + conf := provideConfig() + _, err := yaml.Marshal(conf.Config) assert.NoError(t, err) } diff --git a/queue/dependency.go b/queue/dependency.go index 905213e5..e7b5c463 100644 --- a/queue/dependency.go +++ b/queue/dependency.go @@ -221,7 +221,9 @@ func provideDispatcher(maker DispatcherMaker) (dispatcherOut, error) { } type configOut struct { - Config []config.ExportedConfig + di.Out + + Config []config.ExportedConfig `group:"config,flatten"` } func provideConfig() configOut {