Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: pretty sagas config #116

Merged
merged 2 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,17 @@ func TestKoanfAdapter_Watch(t *gotesting.T) {
var ch = make(chan struct{})
go func() {
ka.watcher.Watch(ctx, func() error {
assert.NoError(t, ka.Reload(), "reload should be successful")
assert.NoError(t, ka.Reload(), "reload should be successful")
err := ka.Reload()
fmt.Println(err)
ch <- struct{}{}
ch <- struct{}{}
return nil
})
}()
time.Sleep(time.Second)
ioutil.WriteFile(f.Name(), []byte("foo: bar"), 0644)
ioutil.WriteFile(f.Name(), []byte("foo: bar"), 0644)
<-ch

<-ch

// The following test is flaky on CI. Unable to reproduce locally.
/*
Expand Down
5 changes: 5 additions & 0 deletions config/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ type Duration struct {
time.Duration
}

// Valid simplify if statement
func (d Duration) Valid() bool {
return d.Duration > 0
}

// MarshalYAML implements yaml.Marshaler
func (d Duration) MarshalYAML() (interface{}, error) {
return d.String(), nil
Expand Down
20 changes: 20 additions & 0 deletions config/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,23 @@ func TestDuration_MarshalJSON(t *testing.T) {
})
}
}

func TestDuration_Valid(t *testing.T) {
tests := []struct {
name string
val time.Duration
want bool
}{
{">0", 1, true},
{"<0", -1, false},
{"=0", 0, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := Duration{tt.val}
if got := d.Valid(); got != tt.want {
t.Errorf("Valid() = %v, want %v", got, tt.want)
}
})
}
}
51 changes: 51 additions & 0 deletions dtx/sagas/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package sagas

import (
"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/di"
"time"
)

type configOut struct {
di.Out

Config []config.ExportedConfig `group:"config,flatten"`
}

var defaultConfig = configuration{
SagaTimeout: config.Duration{Duration: 600 * time.Second},
RecoverInterval: config.Duration{Duration: 60 * time.Second},
}

func provideConfig() configOut {
return configOut{
Config: []config.ExportedConfig{
{
Owner: "sagas",
Data: map[string]interface{}{
"sagas": defaultConfig,
},
Comment: "The saga configuration.",
},
},
}
}

type configuration struct {
SagaTimeout config.Duration `json:"sagaTimeout" yaml:"sagaTimeout"`
RecoverInterval config.Duration `json:"recoverInterval" yaml:"recoverInterval"`
}

func (c configuration) getSagaTimeout() config.Duration {
if !c.SagaTimeout.Valid() {
return defaultConfig.SagaTimeout
}
return c.SagaTimeout
}

func (c configuration) getRecoverInterval() config.Duration {
if !c.RecoverInterval.Valid() {
return defaultConfig.RecoverInterval
}
return c.RecoverInterval
}
57 changes: 8 additions & 49 deletions dtx/sagas/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"time"

"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/contract"
"github.com/DoNewsCode/core/di"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
)

Expand Down Expand Up @@ -55,18 +55,13 @@ func provide(in in) out {
if in.Store == nil {
in.Store = NewInProcessStore()
}
recover := 60 * time.Second
timeout := 600 * time.Second

var configuration configuration
in.Conf.Unmarshal("sagas", &configuration)

if configuration.SagaTimeout.Duration != 0 {
timeout = configuration.SagaTimeout.Duration
}
if configuration.RecoverInterval.Duration != 0 {
recover = configuration.RecoverInterval.Duration
var conf configuration
err := in.Conf.Unmarshal("sagas", &conf)
if err != nil {
level.Warn(in.Logger).Log("err", err)
}
timeout := conf.getSagaTimeout().Duration
recoverVal := conf.getRecoverInterval().Duration

registry := NewRegistry(
in.Store,
Expand All @@ -79,7 +74,7 @@ func provide(in in) out {
eps[in.Steps[i].Name] = registry.AddStep(in.Steps[i])
}

return out{Registry: registry, Interval: recoverInterval(recover), SagaEndpoints: eps}
return out{Registry: registry, Interval: recoverInterval(recoverVal), SagaEndpoints: eps}
}

// ProvideRunGroup implements the RunProvider.
Expand All @@ -103,39 +98,3 @@ func (m out) ProvideRunGroup(group *run.Group) {
}

func (m out) ModuleSentinel() {}

type configOut struct {
Config []config.ExportedConfig
}

type configuration struct {
SagaTimeout config.Duration `json:"sagaTimeout" yaml:"sagaTimeout"`
RecoverInterval config.Duration `json:"recoverInterval" yaml:"recoverInterval"`
MySQL mysql `json:"mysql" yaml:"mysql"`
}

type mysql struct {
Connection string `json:"connection" yaml:"connection"`
Retention config.Duration `json:"retention" yaml:"retention"`
CleanupInterval config.Duration `json:"cleanupInterval" yaml:"cleanupInterval"`
}

func provideConfig() configOut {
return configOut{Config: []config.ExportedConfig{
{
Owner: "sagas",
Data: map[string]interface{}{
"sagas": configuration{
SagaTimeout: config.Duration{Duration: 600 * time.Second},
RecoverInterval: config.Duration{Duration: 60 * time.Second},
MySQL: mysql{
Connection: "default",
Retention: config.Duration{Duration: 168 * time.Hour},
CleanupInterval: config.Duration{Duration: time.Hour},
},
},
},
Comment: "The saga configuration.",
},
}}
}
4 changes: 2 additions & 2 deletions dtx/sagas/dependency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func TestNew(t *testing.T) {
})
}

func TestExportedConfigs(t *testing.T) {
func Test_provideConfig(t *testing.T) {
conf := provideConfig()
_, err := yaml.Marshal(conf)
_, err := yaml.Marshal(conf.Config)
assert.NoError(t, err)
}

Expand Down
60 changes: 60 additions & 0 deletions dtx/sagas/mysqlstore/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package mysqlstore

import (
"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/di"
"time"
)

type configOut struct {
di.Out

Config []config.ExportedConfig `group:"config,flatten"`
}

var defaultConfig = configuration{
Connection: "default",
Retention: config.Duration{Duration: 168 * time.Hour},
CleanupInterval: config.Duration{Duration: time.Hour},
}

func provideConfig() configOut {
return configOut{
Config: []config.ExportedConfig{
{
Owner: "sagas",
Data: map[string]interface{}{
"sagas-mysql": defaultConfig,
},
Comment: "The saga mysql store configuration.",
},
},
}
}

type configuration struct {
Connection string `json:"connection" yaml:"connection"`
Retention config.Duration `json:"retention" yaml:"retention"`
CleanupInterval config.Duration `json:"cleanupInterval" yaml:"cleanupInterval"`
}

func (c configuration) getConnection() string {
if c.Connection == "" {
return defaultConfig.Connection
}
return c.Connection
}

func (c configuration) getRetention() config.Duration {
if !c.Retention.Valid() {
return defaultConfig.Retention
}
return c.Retention
}

func (c configuration) getCleanupInterval() config.Duration {
if !c.CleanupInterval.Valid() {
return defaultConfig.CleanupInterval
}
return c.CleanupInterval
}
40 changes: 18 additions & 22 deletions dtx/sagas/mysqlstore/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package mysqlstore

import (
"context"
"time"

"github.com/DoNewsCode/core/contract"
"github.com/DoNewsCode/core/di"
"github.com/DoNewsCode/core/dtx/sagas"
"github.com/DoNewsCode/core/otgorm"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
)

Expand All @@ -21,33 +21,28 @@ Providers returns MySQLStore dependency.
- sagas.Store
*/
func Providers() di.Deps {
return []interface{}{provide}
return []interface{}{provide, provideConfig}
}

func provide(in in) (out, error) {
conn := "default"
if in.Conf.String("sagas.mysql.connection") != "" {
conn = in.Conf.String("sagas.mysql.connection")
var conf configuration
err := in.Conf.Unmarshal("sagas-mysql", &conf)
if err != nil {
level.Warn(in.Logger).Log("err", err)
}

conn := conf.getConnection()

db, err := in.Maker.Make(conn)
if err != nil {
return out{}, err
}
var opts []Option
d, _ := time.ParseDuration(in.Conf.String("sagas.mysql.retention"))
if d > 0 {
opts = append(
opts,
WithRetention(d),
)
}
d, _ = time.ParseDuration(in.Conf.String("sagas.mysql.cleanupInterval"))
if d > 0 {
opts = append(
opts,
WithCleanUpInterval(d),
)
}
retention := conf.getRetention().Duration
cleanupInterval := conf.getCleanupInterval().Duration

opts = append(opts, WithRetention(retention), WithCleanUpInterval(cleanupInterval))

store := New(db, opts...)
return out{
Conn: conn,
Expand All @@ -59,8 +54,9 @@ func provide(in in) (out, error) {
type in struct {
di.In

Maker otgorm.Maker
Conf contract.ConfigAccessor
Logger log.Logger
Maker otgorm.Maker
Conf contract.ConfigAccessor
}

type out struct {
Expand Down
14 changes: 11 additions & 3 deletions dtx/sagas/mysqlstore/dependency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (
"github.com/DoNewsCode/core"
"github.com/DoNewsCode/core/dtx/sagas"
"github.com/DoNewsCode/core/otgorm"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"
"gorm.io/gorm"
)

func TestProviders(t *testing.T) {
c := core.Default(
core.WithInline("sagas.mysql.connection", "default"),
core.WithInline("sagas.mysql.retention", "1h"),
core.WithInline("sagas.mysql.cleanupInterval", "1h"),
core.WithInline("sagas-mysql.connection", "default"),
core.WithInline("sagas-mysql.retention", "1h"),
core.WithInline("sagas-mysql.cleanupInterval", "1h"),
)
c.Provide(otgorm.Providers())
c.Provide(sagas.Providers())
Expand All @@ -34,3 +36,9 @@ func TestProviders(t *testing.T) {
}.Rollback("-1")
})
}

func Test_provideConfig(t *testing.T) {
conf := provideConfig()
_, err := yaml.Marshal(conf.Config)
assert.NoError(t, err)
}
4 changes: 3 additions & 1 deletion leader/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ func determineDriver(in *in) error {
}

type configOut struct {
Config []config.ExportedConfig
di.Out

Config []config.ExportedConfig `group:"config,flatten"`
}

func provideConfig() configOut {
Expand Down
4 changes: 2 additions & 2 deletions leader/dependency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func TestDetermineDriver(t *testing.T) {
assert.Error(t, err)
}

func TestExportedConfigs(t *testing.T) {
func Test_provideConfig(t *testing.T) {
conf := provideConfig()
_, err := yaml.Marshal(conf)
_, err := yaml.Marshal(conf.Config)
assert.NoError(t, err)
}
Loading