From 98787d254b8bec15b9d6c895ab4c0592bf4d8080 Mon Sep 17 00:00:00 2001 From: Trock Date: Mon, 10 May 2021 17:22:15 +0800 Subject: [PATCH 1/8] feat: cluster address in env --- config/env.go | 16 ++++++++ config/remote/remote_test.go | 7 +--- leader/election_test.go | 5 --- leader/leaderetcd/etcd_test.go | 7 +--- leader/leaderredis/redis_test.go | 7 +--- otes/provider.go | 7 +--- otes/provider_test.go | 16 +++----- otes/tracing_test.go | 7 ++-- otetcd/dependency.go | 9 +---- otetcd/dependency_test.go | 9 +---- otetcd/tracing_test.go | 6 +-- otkafka/dependency_test.go | 10 ++--- otkafka/example_test.go | 17 +++++--- otkafka/reader_config.go | 48 +++++++++++------------ otkafka/reader_config_test.go | 4 +- otkafka/setup_test.go | 67 +++++++++++++++----------------- otkafka/transport_test.go | 2 +- otkafka/writer_config.go | 28 ++++++------- otkafka/writer_test.go | 12 +++--- otredis/module_test.go | 10 ++--- otredis/provider.go | 10 ++--- otredis/provider_test.go | 3 +- queue/dependency_test.go | 8 ++-- queue/dispatcher_test.go | 13 ++----- queue/redis_driver.go | 8 +--- 25 files changed, 151 insertions(+), 185 deletions(-) diff --git a/config/env.go b/config/env.go index 7b88e11a..5756c870 100644 --- a/config/env.go +++ b/config/env.go @@ -1,6 +1,7 @@ package config import ( + "os" "strings" "github.com/DoNewsCode/core/contract" @@ -80,3 +81,18 @@ func NewEnvFromConf(conf contract.ConfigAccessor) Env { envStr := conf.String("env") return NewEnv(envStr) } + +func getDefaultAddrsFromEnv(env, defaultVal string) []string { + if v := os.Getenv(env); v != "" { + return strings.Split(v, ",") + } + return []string{defaultVal} +} + +// Default multiple addresses from env +var ( + ENV_DEFAULT_ELASTICSEARCH_ADDRS = getDefaultAddrsFromEnv("ELASTICSEARCH_ADDR", "http://127.0.0.1:9200") + ENV_DEFAULT_ETCD_ADDRS = getDefaultAddrsFromEnv("ETCD_ADDR", "127.0.0.1:2379") + ENV_DEFAULT_KAFKA_ADDRS = getDefaultAddrsFromEnv("KAFKA_ADDR", "127.0.0.1:9092") + ENV_DEFAULT_REDIS_ADDRS = getDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") +) diff --git a/config/remote/remote_test.go b/config/remote/remote_test.go index a51a422c..737257e7 100644 --- a/config/remote/remote_test.go +++ b/config/remote/remote_test.go @@ -3,21 +3,18 @@ package remote import ( "context" "fmt" - "os" "sync" "testing" "time" + "github.com/DoNewsCode/core/config" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3" ) func TestRemote(t *testing.T) { - if os.Getenv("ETCD_ADDR") == "" { - t.Skip("Set env ETCD_ADDR to run remote tests") - } cfg := &clientv3.Config{ - Endpoints: []string{os.Getenv("ETCD_ADDR")}, + Endpoints: config.ENV_DEFAULT_ETCD_ADDRS, DialTimeout: 2 * time.Second, } diff --git a/leader/election_test.go b/leader/election_test.go index 69686e82..e8831aae 100644 --- a/leader/election_test.go +++ b/leader/election_test.go @@ -2,7 +2,6 @@ package leader import ( "context" - "fmt" "os" "testing" "time" @@ -16,10 +15,6 @@ import ( ) func TestMain(m *testing.M) { - if os.Getenv("ETCD_ADDR") == "" { - fmt.Println("Set env ETCD_ADDR to run leader tests") - os.Exit(0) - } os.Exit(m.Run()) } diff --git a/leader/leaderetcd/etcd_test.go b/leader/leaderetcd/etcd_test.go index 9f3d21fe..11319fdf 100644 --- a/leader/leaderetcd/etcd_test.go +++ b/leader/leaderetcd/etcd_test.go @@ -2,19 +2,16 @@ package leaderetcd import ( "context" - "os" "testing" + "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/key" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3" ) func TestNewEtcdDriver(t *testing.T) { - if os.Getenv("ETCD_ADDR") == "" { - t.Skip("Set env ETCD_ADDR to run leaderetcd tests") - } - client, _ := clientv3.New(clientv3.Config{Endpoints: []string{os.Getenv("ETCD_ADDR")}}) + client, _ := clientv3.New(clientv3.Config{Endpoints: config.ENV_DEFAULT_ETCD_ADDRS}) e1 := NewEtcdDriver(client, key.New("test")) e2 := NewEtcdDriver(client, key.New("test")) diff --git a/leader/leaderredis/redis_test.go b/leader/leaderredis/redis_test.go index fafdbaa4..d2b6ab9d 100644 --- a/leader/leaderredis/redis_test.go +++ b/leader/leaderredis/redis_test.go @@ -4,10 +4,10 @@ package leaderredis import ( "context" - "os" "testing" "time" + "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/events" "github.com/DoNewsCode/core/key" "github.com/DoNewsCode/core/leader" @@ -16,10 +16,7 @@ import ( ) func TestCampaign(t *testing.T) { - if os.Getenv("REDIS_ADDR") == "" { - t.Skip("Set env REDIS_ADDR to run leaderredis tests") - } - client := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: []string{os.Getenv("REDIS_ADDR")}}) + client := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.ENV_DEFAULT_REDIS_ADDRS}) driver := RedisDriver{ client: client, keyer: key.New(), diff --git a/otes/provider.go b/otes/provider.go index 68053a16..1fc4ec9a 100644 --- a/otes/provider.go +++ b/otes/provider.go @@ -3,7 +3,6 @@ package otes import ( "fmt" "net/http" - "os" "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" @@ -95,11 +94,7 @@ func provideEsFactory(p in) (out, func()) { if name != "default" { return di.Pair{}, fmt.Errorf("elastic configuration %s not valid", name) } - defaultURL := "http://localhost:9200" - if os.Getenv("ELASTICSEARCH_ADDR") != "" { - defaultURL = os.Getenv("ELASTICSEARCH_ADDR") - } - conf.URL = []string{defaultURL} + conf.URL = config.ENV_DEFAULT_ELASTICSEARCH_ADDRS } if p.Interceptor != nil { p.Interceptor(name, &conf) diff --git a/otes/provider_test.go b/otes/provider_test.go index 6f6bbf26..ebf207cc 100644 --- a/otes/provider_test.go +++ b/otes/provider_test.go @@ -1,28 +1,24 @@ package otes import ( - "fmt" + "os" + "testing" + "github.com/DoNewsCode/core/config" "github.com/go-kit/kit/log" "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" - "os" - "testing" ) func TestMain(m *testing.M) { - if os.Getenv("ELASTICSEARCH_ADDR") == "" { - fmt.Println("Set env ELASTICSEARCH_ADDR to run otes tests") - os.Exit(0) - } os.Exit(m.Run()) } func TestNewEsFactory(t *testing.T) { esFactory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: []string{os.Getenv("ELASTICSEARCH_ADDR")}}, - "alternative": {URL: []string{os.Getenv("ELASTICSEARCH_ADDR")}}, + "default": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, + "alternative": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, }}, Logger: log.NewNopLogger(), Tracer: nil, @@ -41,7 +37,7 @@ func TestNewEsFactoryWithOptions(t *testing.T) { var called bool esFactory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: []string{os.Getenv("ELASTICSEARCH_ADDR")}}, + "default": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, }}, Logger: log.NewNopLogger(), Options: []elastic.ClientOptionFunc{ diff --git a/otes/tracing_test.go b/otes/tracing_test.go index 1949abf2..42c1deba 100644 --- a/otes/tracing_test.go +++ b/otes/tracing_test.go @@ -3,7 +3,6 @@ package otes import ( "context" "net/http" - "os" "testing" "github.com/DoNewsCode/core/config" @@ -18,8 +17,8 @@ func TestTracing(t *testing.T) { opentracing.SetGlobalTracer(tracer) factory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: []string{"http://localhost:9200"}}, - "alternative": {URL: []string{"http://localhost:9200"}}, + "default": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, + "alternative": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, }}, Logger: log.NewNopLogger(), Tracer: tracer, @@ -31,7 +30,7 @@ func TestTracing(t *testing.T) { span, ctx := opentracing.StartSpanFromContextWithTracer(context.Background(), tracer, "es.query") defer span.Finish() - res, code, err := client.Ping(os.Getenv("ELASTICSEARCH_ADDR")).Do(ctx) + res, code, err := client.Ping(config.ENV_DEFAULT_ELASTICSEARCH_ADDRS[0]).Do(ctx) assert.NoError(t, err) assert.Equal(t, http.StatusOK, code) assert.NotNil(t, res) diff --git a/otetcd/dependency.go b/otetcd/dependency.go index 48775708..79c69951 100644 --- a/otetcd/dependency.go +++ b/otetcd/dependency.go @@ -2,7 +2,6 @@ package otetcd import ( "fmt" - "os" "time" "github.com/DoNewsCode/core/config" @@ -95,13 +94,7 @@ func provideFactory(p factoryIn) (FactoryOut, func()) { if name != "default" { return di.Pair{}, fmt.Errorf("etcd configuration %s not valid", name) } - - defaultEndpoint := "127.0.0.1:2379" - - if os.Getenv("ETCD_ADDR") != "" { - defaultEndpoint = os.Getenv("ETCD_ADDR") - } - conf = Option{Endpoints: []string{defaultEndpoint}} + conf = Option{Endpoints: config.ENV_DEFAULT_ETCD_ADDRS} } co := clientv3.Config{ Endpoints: conf.Endpoints, diff --git a/otetcd/dependency_test.go b/otetcd/dependency_test.go index f32e09db..bd199e6c 100644 --- a/otetcd/dependency_test.go +++ b/otetcd/dependency_test.go @@ -1,7 +1,6 @@ package otetcd import ( - "fmt" "os" "testing" @@ -15,10 +14,6 @@ import ( ) func TestMain(m *testing.M) { - if os.Getenv("ETCD_ADDR") == "" { - fmt.Println("Set env ETCD_ADDR to run otetcd tests") - os.Exit(0) - } os.Exit(m.Run()) } @@ -47,10 +42,10 @@ func TestProvideFactory(t *testing.T) { out, cleanup := provideFactory(factoryIn{ Conf: config.MapAdapter{"etcd": map[string]Option{ "default": { - Endpoints: []string{"localhost:2379"}, + Endpoints: config.ENV_DEFAULT_ETCD_ADDRS, }, "alternative": { - Endpoints: []string{"localhost:2379"}, + Endpoints: config.ENV_DEFAULT_ETCD_ADDRS, }, }}, Logger: log.NewNopLogger(), diff --git a/otetcd/tracing_test.go b/otetcd/tracing_test.go index 5424c1ae..dde5dcc1 100644 --- a/otetcd/tracing_test.go +++ b/otetcd/tracing_test.go @@ -2,14 +2,14 @@ package otetcd import ( "context" + "testing" + "github.com/DoNewsCode/core/config" "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3" - "os" - "testing" ) func TestTracing(t *testing.T) { @@ -19,7 +19,7 @@ func TestTracing(t *testing.T) { Logger: log.NewNopLogger(), Conf: config.MapAdapter{"etcd": map[string]Option{ "default": { - Endpoints: []string{os.Getenv("ETCD_ADDR")}, + Endpoints: config.ENV_DEFAULT_ETCD_ADDRS, }, }}, Interceptor: func(name string, options *clientv3.Config) { diff --git a/otkafka/dependency_test.go b/otkafka/dependency_test.go index 1c49e5fa..96d9615b 100644 --- a/otkafka/dependency_test.go +++ b/otkafka/dependency_test.go @@ -19,7 +19,7 @@ func TestProvideReaderFactory(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.reader": map[string]ReaderConfig{ "default": { - Brokers: []string{"127.0.0.1:9092"}, + Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, Topic: "Test", }, "alternative": { @@ -43,11 +43,11 @@ func TestProvideWriterFactory(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: []string{"127.0.0.1:9092"}, + Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, Topic: "Test", }, "alternative": { - Brokers: []string{"127.0.0.1:9092"}, + Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, Topic: "Test", }, }}, @@ -67,11 +67,11 @@ func TestProvideKafka(t *testing.T) { Logger: log.NewNopLogger(), Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: []string{"127.0.0.1:9092"}, + Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, Topic: "Test", }, "alternative": { - Brokers: []string{"127.0.0.1:9092"}, + Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, Topic: "Test", }, }}, diff --git a/otkafka/example_test.go b/otkafka/example_test.go index 053b71d9..3d8bd5ba 100644 --- a/otkafka/example_test.go +++ b/otkafka/example_test.go @@ -3,9 +3,10 @@ package otkafka_test import ( "context" "fmt" - "os" + "strings" "github.com/DoNewsCode/core" + "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/otkafka" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/rawbytes" @@ -13,24 +14,30 @@ import ( ) func Example_reader() { - var config = ` + brokers := make([]string, len(config.ENV_DEFAULT_KAFKA_ADDRS)) + for i, addr := range config.ENV_DEFAULT_KAFKA_ADDRS { + brokers[i] = fmt.Sprintf(` - %s`, addr) + } + brokersStr := strings.Join(brokers, ` +`) + var conf = ` log: level: none kafka: reader: default: brokers: - - ` + os.Getenv("KAFKA_ADDR") + ` +` + brokersStr + ` topic: example writer: default: brokers: - - ` + os.Getenv("KAFKA_ADDR") + ` +` + brokersStr + ` topic: example ` - c := core.Default(core.WithConfigStack(rawbytes.Provider([]byte(config)), yaml.Parser())) + c := core.Default(core.WithConfigStack(rawbytes.Provider([]byte(conf)), yaml.Parser())) c.Provide(otkafka.Providers()) c.Invoke(func(writer *kafka.Writer) { err := writer.WriteMessages(context.Background(), kafka.Message{Value: []byte(`hello`)}) diff --git a/otkafka/reader_config.go b/otkafka/reader_config.go index b1d99ee1..662477c0 100644 --- a/otkafka/reader_config.go +++ b/otkafka/reader_config.go @@ -1,9 +1,9 @@ package otkafka import ( - "os" "time" + "github.com/DoNewsCode/core/config" "github.com/segmentio/kafka-go" ) @@ -131,30 +131,30 @@ type ReaderConfig struct { // during kafka.Reader's creation type ReaderInterceptor func(name string, reader *kafka.ReaderConfig) -func fromReaderConfig(config ReaderConfig) kafka.ReaderConfig { - if len(config.Brokers) == 0 { - config.Brokers = []string{os.Getenv("KAFKA_ADDR")} +func fromReaderConfig(conf ReaderConfig) kafka.ReaderConfig { + if len(conf.Brokers) == 0 { + conf.Brokers = config.ENV_DEFAULT_KAFKA_ADDRS } return kafka.ReaderConfig{ - Brokers: config.Brokers, - GroupID: config.GroupID, - Topic: config.Topic, - Partition: config.MaxAttempts, - MinBytes: config.MinBytes, - MaxBytes: config.MaxBytes, - MaxWait: config.MaxWait, - ReadLagInterval: config.ReadLagInterval, - HeartbeatInterval: config.HeartbeatInterval, - CommitInterval: config.CommitInterval, - PartitionWatchInterval: config.PartitionWatchInterval, - WatchPartitionChanges: config.WatchPartitionChanges, - SessionTimeout: config.SessionTimeout, - RebalanceTimeout: config.RebalanceTimeout, - JoinGroupBackoff: config.JoinGroupBackoff, - RetentionTime: config.RetentionTime, - StartOffset: config.StartOffset, - ReadBackoffMin: config.ReadBackoffMin, - ReadBackoffMax: config.ReadBackoffMax, - MaxAttempts: config.MaxAttempts, + Brokers: conf.Brokers, + GroupID: conf.GroupID, + Topic: conf.Topic, + Partition: conf.MaxAttempts, + MinBytes: conf.MinBytes, + MaxBytes: conf.MaxBytes, + MaxWait: conf.MaxWait, + ReadLagInterval: conf.ReadLagInterval, + HeartbeatInterval: conf.HeartbeatInterval, + CommitInterval: conf.CommitInterval, + PartitionWatchInterval: conf.PartitionWatchInterval, + WatchPartitionChanges: conf.WatchPartitionChanges, + SessionTimeout: conf.SessionTimeout, + RebalanceTimeout: conf.RebalanceTimeout, + JoinGroupBackoff: conf.JoinGroupBackoff, + RetentionTime: conf.RetentionTime, + StartOffset: conf.StartOffset, + ReadBackoffMin: conf.ReadBackoffMin, + ReadBackoffMax: conf.ReadBackoffMax, + MaxAttempts: conf.MaxAttempts, } } diff --git a/otkafka/reader_config_test.go b/otkafka/reader_config_test.go index 21103550..3351e4a1 100644 --- a/otkafka/reader_config_test.go +++ b/otkafka/reader_config_test.go @@ -1,7 +1,7 @@ package otkafka import ( - "os" + "github.com/DoNewsCode/core/config" "testing" "github.com/stretchr/testify/assert" @@ -9,5 +9,5 @@ import ( func Test_fromReaderConfig(t *testing.T) { reader := fromReaderConfig(ReaderConfig{}) - assert.Equal(t, os.Getenv("KAFKA_ADDR"), reader.Brokers[0]) + assert.Equal(t, config.ENV_DEFAULT_KAFKA_ADDRS, reader.Brokers) } diff --git a/otkafka/setup_test.go b/otkafka/setup_test.go index 7a796a9b..c4c92a2f 100644 --- a/otkafka/setup_test.go +++ b/otkafka/setup_test.go @@ -1,20 +1,16 @@ package otkafka import ( - "fmt" - "github.com/segmentio/kafka-go" "net" "os" "strconv" "testing" + + "github.com/DoNewsCode/core/config" + "github.com/segmentio/kafka-go" ) func TestMain(m *testing.M) { - if os.Getenv("KAFKA_ADDR") == "" { - fmt.Println("Set env KAFKA_ADDR to run otkafka tests") - os.Exit(0) - } - setupTopic() os.Exit(m.Run()) @@ -23,35 +19,34 @@ func TestMain(m *testing.M) { func setupTopic() { var topics = []string{"trace", "test", "example"} + conn, err := kafka.Dial("tcp", config.ENV_DEFAULT_KAFKA_ADDRS[0]) + if err != nil { + panic(err.Error()) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + panic(err.Error()) + } + var controllerConn *kafka.Conn + controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + panic(err.Error()) + } + defer controllerConn.Close() + + topicConfigs := make([]kafka.TopicConfig, 0) for _, topic := range topics { - conn, err := kafka.Dial("tcp", os.Getenv("KAFKA_ADDR")) - if err != nil { - panic(err.Error()) - } - defer conn.Close() - - controller, err := conn.Controller() - if err != nil { - panic(err.Error()) - } - var controllerConn *kafka.Conn - controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) - if err != nil { - panic(err.Error()) - } - defer controllerConn.Close() - - topicConfigs := []kafka.TopicConfig{ - kafka.TopicConfig{ - Topic: topic, - NumPartitions: 1, - ReplicationFactor: 1, - }, - } - - err = controllerConn.CreateTopics(topicConfigs...) - if err != nil { - panic(err.Error()) - } + topicConfigs = append(topicConfigs, kafka.TopicConfig{ + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }) + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + panic(err.Error()) } } diff --git a/otkafka/transport_test.go b/otkafka/transport_test.go index e3833f09..22dba1b5 100644 --- a/otkafka/transport_test.go +++ b/otkafka/transport_test.go @@ -21,7 +21,7 @@ func TestTransport_RoundTrip(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: []string{"127.0.0.1:9092"}, + Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, Topic: "Test", }, }}, diff --git a/otkafka/writer_config.go b/otkafka/writer_config.go index d7623a08..689b098e 100644 --- a/otkafka/writer_config.go +++ b/otkafka/writer_config.go @@ -1,9 +1,9 @@ package otkafka import ( - "os" "time" + "github.com/DoNewsCode/core/config" "github.com/segmentio/kafka-go" ) @@ -83,20 +83,20 @@ type WriterConfig struct { Async bool `json:"async" yaml:"async"` } -func fromWriterConfig(config WriterConfig) kafka.Writer { - if len(config.Brokers) == 0 { - config.Brokers = []string{os.Getenv("KAFKA_ADDR")} +func fromWriterConfig(conf WriterConfig) kafka.Writer { + if len(conf.Brokers) == 0 { + conf.Brokers = config.ENV_DEFAULT_KAFKA_ADDRS } return kafka.Writer{ - Addr: kafka.TCP(config.Brokers...), - Topic: config.Topic, - MaxAttempts: config.MaxAttempts, - BatchSize: config.BatchSize, - BatchBytes: int64(config.BatchBytes), - BatchTimeout: config.BatchTimeout, - ReadTimeout: config.ReadTimeout, - WriteTimeout: config.WriteTimeout, - RequiredAcks: kafka.RequiredAcks(config.RequiredAcks), - Async: config.Async, + Addr: kafka.TCP(conf.Brokers...), + Topic: conf.Topic, + MaxAttempts: conf.MaxAttempts, + BatchSize: conf.BatchSize, + BatchBytes: int64(conf.BatchBytes), + BatchTimeout: conf.BatchTimeout, + ReadTimeout: conf.ReadTimeout, + WriteTimeout: conf.WriteTimeout, + RequiredAcks: kafka.RequiredAcks(conf.RequiredAcks), + Async: conf.Async, } } diff --git a/otkafka/writer_test.go b/otkafka/writer_test.go index 0da040ba..7b87bec5 100644 --- a/otkafka/writer_test.go +++ b/otkafka/writer_test.go @@ -2,20 +2,22 @@ package otkafka import ( "context" + "strings" + "testing" + + "github.com/DoNewsCode/core/config" "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" - "os" - "testing" ) func TestWriter(t *testing.T) { { ctx := context.Background() kw := kafka.Writer{ - Addr: kafka.TCP(os.Getenv("KAFKA_ADDR")), + Addr: kafka.TCP(config.ENV_DEFAULT_KAFKA_ADDRS...), Topic: "trace", } tracer := mocktracer.New() @@ -30,7 +32,7 @@ func TestWriter(t *testing.T) { { ctx := context.Background() - kr := kafka.NewReader(kafka.ReaderConfig{Brokers: []string{os.Getenv("KAFKA_ADDR")}, Topic: "trace", GroupID: "test", MinBytes: 1, MaxBytes: 1}) + kr := kafka.NewReader(kafka.ReaderConfig{Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, Topic: "trace", GroupID: "test", MinBytes: 1, MaxBytes: 1}) tracer := mocktracer.New() msg, err := kr.ReadMessage(ctx) assert.NoError(t, err) @@ -45,5 +47,5 @@ func TestWriter(t *testing.T) { func Test_fromWriterConfig(t *testing.T) { writer := fromWriterConfig(WriterConfig{}) - assert.Equal(t, os.Getenv("KAFKA_ADDR"), writer.Addr.String()) + assert.Equal(t, strings.Join(config.ENV_DEFAULT_KAFKA_ADDRS, ","), writer.Addr.String()) } diff --git a/otredis/module_test.go b/otredis/module_test.go index 6abb0c17..a8b9ffa7 100644 --- a/otredis/module_test.go +++ b/otredis/module_test.go @@ -2,23 +2,19 @@ package otredis import ( "context" - "fmt" - "github.com/go-redis/redis/v8" "os" "testing" "time" "github.com/DoNewsCode/core" + "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/di" mock_metrics "github.com/DoNewsCode/core/otredis/mocks" + "github.com/go-redis/redis/v8" "github.com/golang/mock/gomock" ) func TestMain(m *testing.M) { - if os.Getenv("REDIS_ADDR") == "" { - fmt.Println("Set env REDIS_ADDR to run otredis tests") - os.Exit(0) - } os.Exit(m.Run()) } @@ -38,7 +34,7 @@ func TestModule_ProvideRunGroup(t *testing.T) { m.EXPECT().Set(gomock.Any()).MinTimes(1) c := core.New( - core.WithInline("redis.default.addrs", []string{os.Getenv("REDIS_ADDR")}), + core.WithInline("redis.default.addrs", config.ENV_DEFAULT_REDIS_ADDRS), core.WithInline("redisMetrics.interval", "1ms"), core.WithInline("log.level", "none"), ) diff --git a/otredis/provider.go b/otredis/provider.go index c18aab64..a822ccf4 100644 --- a/otredis/provider.go +++ b/otredis/provider.go @@ -2,7 +2,6 @@ package otredis import ( "fmt" - "os" "time" "github.com/DoNewsCode/core/config" @@ -98,12 +97,9 @@ func provideRedisFactory(p in) (out, func()) { if name != "default" { return di.Pair{}, fmt.Errorf("redis configuration %s not valid", name) } - addr := "localhost:6379" - if os.Getenv("REDIS_ADDR") != "" { - addr = os.Getenv("REDIS_ADDR") - } + base = RedisUniversalOptions{ - Addrs: []string{addr}, + Addrs: config.ENV_DEFAULT_REDIS_ADDRS, } } full = redis.UniversalOptions{ @@ -192,7 +188,7 @@ func provideConfig() configOut { Data: map[string]interface{}{ "redis": map[string]RedisUniversalOptions{ "default": { - Addrs: []string{os.Getenv("REDIS_ADDR")}, + Addrs: config.ENV_DEFAULT_REDIS_ADDRS, }, }, "redisMetrics": metricsConf{ diff --git a/otredis/provider_test.go b/otredis/provider_test.go index b098fd77..3a04d10e 100644 --- a/otredis/provider_test.go +++ b/otredis/provider_test.go @@ -1,7 +1,6 @@ package otredis import ( - "os" "testing" "github.com/DoNewsCode/core/config" @@ -42,5 +41,5 @@ func TestProvideConfigs(t *testing.T) { k.Load(rawbytes.Provider(bytes), yaml.Parser()) k.Unmarshal("redis.default", &r) assert.Equal(t, 0, r.DB) - assert.Equal(t, []string{os.Getenv("REDIS_ADDR")}, r.Addrs) + assert.Equal(t, config.ENV_DEFAULT_REDIS_ADDRS, r.Addrs) } diff --git a/queue/dependency_test.go b/queue/dependency_test.go index cb023b1c..3c1ef723 100644 --- a/queue/dependency_test.go +++ b/queue/dependency_test.go @@ -2,21 +2,21 @@ package queue import ( "context" + "testing" + "time" + "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/events" "github.com/go-kit/kit/log" "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" - "os" - "testing" - "time" ) type maker struct{} func (m maker) Make(name string) (redis.UniversalClient, error) { - return redis.NewUniversalClient(&redis.UniversalOptions{Addrs: []string{os.Getenv("REDIS_ADDR")}}), nil + return redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.ENV_DEFAULT_REDIS_ADDRS}), nil } func TestProvideDispatcher(t *testing.T) { diff --git a/queue/dispatcher_test.go b/queue/dispatcher_test.go index d293e60a..bc9ef403 100644 --- a/queue/dispatcher_test.go +++ b/queue/dispatcher_test.go @@ -3,18 +3,17 @@ package queue import ( "context" "errors" - "fmt" "math/rand" "os" + "testing" "time" + "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/events" "github.com/DoNewsCode/core/logging" "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" - - "testing" ) type MockListener func(ctx context.Context, event contract.Event) error @@ -53,16 +52,12 @@ type MockEvent struct { } func TestMain(m *testing.M) { - if os.Getenv("REDIS_ADDR") == "" { - fmt.Println("Set env REDIS_ADDR to run queue tests") - os.Exit(0) - } os.Exit(m.Run()) } func setUp() *QueueableDispatcher { s := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: []string{os.Getenv("REDIS_ADDR")}, + Addrs: config.ENV_DEFAULT_REDIS_ADDRS, }) driver := RedisDriver{ Logger: logging.NewLogger("logfmt"), @@ -89,7 +84,7 @@ func tearDown() { Waiting: "waiting", Timeout: "timeout", } - redisClient := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: []string{os.Getenv("REDIS_ADDR")}}) + redisClient := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.ENV_DEFAULT_REDIS_ADDRS}) redisClient.Del(context.Background(), channel.Delayed) redisClient.Del(context.Background(), channel.Failed) redisClient.Del(context.Background(), channel.Reserved) diff --git a/queue/redis_driver.go b/queue/redis_driver.go index 185343b6..a78ffe89 100644 --- a/queue/redis_driver.go +++ b/queue/redis_driver.go @@ -4,10 +4,10 @@ import ( "context" "fmt" "math/rand" - "os" "sync" "time" + "github.com/DoNewsCode/core/config" "github.com/go-kit/kit/log" "github.com/go-redis/redis/v8" "github.com/pkg/errors" @@ -250,12 +250,8 @@ func (r *RedisDriver) populateDefaults() { return } if r.RedisClient == nil { - addr := "localhost:6379" - if os.Getenv("REDIS_ADDR") != "" { - addr = os.Getenv("REDIS_ADDR") - } r.RedisClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: []string{addr}, + Addrs: config.ENV_DEFAULT_REDIS_ADDRS, }) } if r.Packer == nil { From 45c5503671eb2aabd27b3766c53c9ac624c21bcc Mon Sep 17 00:00:00 2001 From: Trock Date: Mon, 10 May 2021 17:27:34 +0800 Subject: [PATCH 2/8] fix: rename LL_CAPS to CamelCase --- config/env.go | 8 ++++---- config/remote/remote_test.go | 2 +- leader/leaderetcd/etcd_test.go | 2 +- leader/leaderredis/redis_test.go | 2 +- otes/provider.go | 2 +- otes/provider_test.go | 6 +++--- otes/tracing_test.go | 6 +++--- otetcd/dependency.go | 2 +- otetcd/dependency_test.go | 4 ++-- otetcd/tracing_test.go | 2 +- otkafka/dependency_test.go | 10 +++++----- otkafka/example_test.go | 4 ++-- otkafka/reader_config.go | 2 +- otkafka/reader_config_test.go | 2 +- otkafka/setup_test.go | 2 +- otkafka/transport_test.go | 2 +- otkafka/writer_config.go | 2 +- otkafka/writer_test.go | 6 +++--- otredis/module_test.go | 2 +- otredis/provider.go | 4 ++-- otredis/provider_test.go | 2 +- queue/dependency_test.go | 2 +- queue/dispatcher_test.go | 4 ++-- queue/redis_driver.go | 2 +- 24 files changed, 41 insertions(+), 41 deletions(-) diff --git a/config/env.go b/config/env.go index 5756c870..abff56fe 100644 --- a/config/env.go +++ b/config/env.go @@ -91,8 +91,8 @@ func getDefaultAddrsFromEnv(env, defaultVal string) []string { // Default multiple addresses from env var ( - ENV_DEFAULT_ELASTICSEARCH_ADDRS = getDefaultAddrsFromEnv("ELASTICSEARCH_ADDR", "http://127.0.0.1:9200") - ENV_DEFAULT_ETCD_ADDRS = getDefaultAddrsFromEnv("ETCD_ADDR", "127.0.0.1:2379") - ENV_DEFAULT_KAFKA_ADDRS = getDefaultAddrsFromEnv("KAFKA_ADDR", "127.0.0.1:9092") - ENV_DEFAULT_REDIS_ADDRS = getDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") + EnvDefaultElasticsearchAddrs = getDefaultAddrsFromEnv("ELASTICSEARCH_ADDR", "http://127.0.0.1:9200") + EnvDefaultEtcdAddrs = getDefaultAddrsFromEnv("ETCD_ADDR", "127.0.0.1:2379") + EnvDefaultKafkaAddrs = getDefaultAddrsFromEnv("KAFKA_ADDR", "127.0.0.1:9092") + EnvDefaultRedisAddrs = getDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") ) diff --git a/config/remote/remote_test.go b/config/remote/remote_test.go index 737257e7..fafbc50e 100644 --- a/config/remote/remote_test.go +++ b/config/remote/remote_test.go @@ -14,7 +14,7 @@ import ( func TestRemote(t *testing.T) { cfg := &clientv3.Config{ - Endpoints: config.ENV_DEFAULT_ETCD_ADDRS, + Endpoints: config.EnvDefaultEtcdAddrs, DialTimeout: 2 * time.Second, } diff --git a/leader/leaderetcd/etcd_test.go b/leader/leaderetcd/etcd_test.go index 11319fdf..436fa64e 100644 --- a/leader/leaderetcd/etcd_test.go +++ b/leader/leaderetcd/etcd_test.go @@ -11,7 +11,7 @@ import ( ) func TestNewEtcdDriver(t *testing.T) { - client, _ := clientv3.New(clientv3.Config{Endpoints: config.ENV_DEFAULT_ETCD_ADDRS}) + client, _ := clientv3.New(clientv3.Config{Endpoints: config.EnvDefaultEtcdAddrs}) e1 := NewEtcdDriver(client, key.New("test")) e2 := NewEtcdDriver(client, key.New("test")) diff --git a/leader/leaderredis/redis_test.go b/leader/leaderredis/redis_test.go index d2b6ab9d..410358db 100644 --- a/leader/leaderredis/redis_test.go +++ b/leader/leaderredis/redis_test.go @@ -16,7 +16,7 @@ import ( ) func TestCampaign(t *testing.T) { - client := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.ENV_DEFAULT_REDIS_ADDRS}) + client := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.EnvDefaultRedisAddrs}) driver := RedisDriver{ client: client, keyer: key.New(), diff --git a/otes/provider.go b/otes/provider.go index 1fc4ec9a..dab74455 100644 --- a/otes/provider.go +++ b/otes/provider.go @@ -94,7 +94,7 @@ func provideEsFactory(p in) (out, func()) { if name != "default" { return di.Pair{}, fmt.Errorf("elastic configuration %s not valid", name) } - conf.URL = config.ENV_DEFAULT_ELASTICSEARCH_ADDRS + conf.URL = config.EnvDefaultElasticsearchAddrs } if p.Interceptor != nil { p.Interceptor(name, &conf) diff --git a/otes/provider_test.go b/otes/provider_test.go index ebf207cc..ad0f9130 100644 --- a/otes/provider_test.go +++ b/otes/provider_test.go @@ -17,8 +17,8 @@ func TestMain(m *testing.M) { func TestNewEsFactory(t *testing.T) { esFactory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, - "alternative": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, + "default": {URL: config.EnvDefaultElasticsearchAddrs}, + "alternative": {URL: config.EnvDefaultElasticsearchAddrs}, }}, Logger: log.NewNopLogger(), Tracer: nil, @@ -37,7 +37,7 @@ func TestNewEsFactoryWithOptions(t *testing.T) { var called bool esFactory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, + "default": {URL: config.EnvDefaultElasticsearchAddrs}, }}, Logger: log.NewNopLogger(), Options: []elastic.ClientOptionFunc{ diff --git a/otes/tracing_test.go b/otes/tracing_test.go index 42c1deba..c18e9e0e 100644 --- a/otes/tracing_test.go +++ b/otes/tracing_test.go @@ -17,8 +17,8 @@ func TestTracing(t *testing.T) { opentracing.SetGlobalTracer(tracer) factory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, - "alternative": {URL: config.ENV_DEFAULT_ELASTICSEARCH_ADDRS}, + "default": {URL: config.EnvDefaultElasticsearchAddrs}, + "alternative": {URL: config.EnvDefaultElasticsearchAddrs}, }}, Logger: log.NewNopLogger(), Tracer: tracer, @@ -30,7 +30,7 @@ func TestTracing(t *testing.T) { span, ctx := opentracing.StartSpanFromContextWithTracer(context.Background(), tracer, "es.query") defer span.Finish() - res, code, err := client.Ping(config.ENV_DEFAULT_ELASTICSEARCH_ADDRS[0]).Do(ctx) + res, code, err := client.Ping(config.EnvDefaultElasticsearchAddrs[0]).Do(ctx) assert.NoError(t, err) assert.Equal(t, http.StatusOK, code) assert.NotNil(t, res) diff --git a/otetcd/dependency.go b/otetcd/dependency.go index 79c69951..98eb4860 100644 --- a/otetcd/dependency.go +++ b/otetcd/dependency.go @@ -94,7 +94,7 @@ func provideFactory(p factoryIn) (FactoryOut, func()) { if name != "default" { return di.Pair{}, fmt.Errorf("etcd configuration %s not valid", name) } - conf = Option{Endpoints: config.ENV_DEFAULT_ETCD_ADDRS} + conf = Option{Endpoints: config.EnvDefaultEtcdAddrs} } co := clientv3.Config{ Endpoints: conf.Endpoints, diff --git a/otetcd/dependency_test.go b/otetcd/dependency_test.go index bd199e6c..4be8438c 100644 --- a/otetcd/dependency_test.go +++ b/otetcd/dependency_test.go @@ -42,10 +42,10 @@ func TestProvideFactory(t *testing.T) { out, cleanup := provideFactory(factoryIn{ Conf: config.MapAdapter{"etcd": map[string]Option{ "default": { - Endpoints: config.ENV_DEFAULT_ETCD_ADDRS, + Endpoints: config.EnvDefaultEtcdAddrs, }, "alternative": { - Endpoints: config.ENV_DEFAULT_ETCD_ADDRS, + Endpoints: config.EnvDefaultEtcdAddrs, }, }}, Logger: log.NewNopLogger(), diff --git a/otetcd/tracing_test.go b/otetcd/tracing_test.go index dde5dcc1..de1614af 100644 --- a/otetcd/tracing_test.go +++ b/otetcd/tracing_test.go @@ -19,7 +19,7 @@ func TestTracing(t *testing.T) { Logger: log.NewNopLogger(), Conf: config.MapAdapter{"etcd": map[string]Option{ "default": { - Endpoints: config.ENV_DEFAULT_ETCD_ADDRS, + Endpoints: config.EnvDefaultEtcdAddrs, }, }}, Interceptor: func(name string, options *clientv3.Config) { diff --git a/otkafka/dependency_test.go b/otkafka/dependency_test.go index 96d9615b..f03c5acd 100644 --- a/otkafka/dependency_test.go +++ b/otkafka/dependency_test.go @@ -19,7 +19,7 @@ func TestProvideReaderFactory(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.reader": map[string]ReaderConfig{ "default": { - Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, + Brokers: config.EnvDefaultKafkaAddrs, Topic: "Test", }, "alternative": { @@ -43,11 +43,11 @@ func TestProvideWriterFactory(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, + Brokers: config.EnvDefaultKafkaAddrs, Topic: "Test", }, "alternative": { - Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, + Brokers: config.EnvDefaultKafkaAddrs, Topic: "Test", }, }}, @@ -67,11 +67,11 @@ func TestProvideKafka(t *testing.T) { Logger: log.NewNopLogger(), Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, + Brokers: config.EnvDefaultKafkaAddrs, Topic: "Test", }, "alternative": { - Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, + Brokers: config.EnvDefaultKafkaAddrs, Topic: "Test", }, }}, diff --git a/otkafka/example_test.go b/otkafka/example_test.go index 3d8bd5ba..ea3f89f5 100644 --- a/otkafka/example_test.go +++ b/otkafka/example_test.go @@ -14,8 +14,8 @@ import ( ) func Example_reader() { - brokers := make([]string, len(config.ENV_DEFAULT_KAFKA_ADDRS)) - for i, addr := range config.ENV_DEFAULT_KAFKA_ADDRS { + brokers := make([]string, len(config.EnvDefaultKafkaAddrs)) + for i, addr := range config.EnvDefaultKafkaAddrs { brokers[i] = fmt.Sprintf(` - %s`, addr) } brokersStr := strings.Join(brokers, ` diff --git a/otkafka/reader_config.go b/otkafka/reader_config.go index 662477c0..747b03b6 100644 --- a/otkafka/reader_config.go +++ b/otkafka/reader_config.go @@ -133,7 +133,7 @@ type ReaderInterceptor func(name string, reader *kafka.ReaderConfig) func fromReaderConfig(conf ReaderConfig) kafka.ReaderConfig { if len(conf.Brokers) == 0 { - conf.Brokers = config.ENV_DEFAULT_KAFKA_ADDRS + conf.Brokers = config.EnvDefaultKafkaAddrs } return kafka.ReaderConfig{ Brokers: conf.Brokers, diff --git a/otkafka/reader_config_test.go b/otkafka/reader_config_test.go index 3351e4a1..76843861 100644 --- a/otkafka/reader_config_test.go +++ b/otkafka/reader_config_test.go @@ -9,5 +9,5 @@ import ( func Test_fromReaderConfig(t *testing.T) { reader := fromReaderConfig(ReaderConfig{}) - assert.Equal(t, config.ENV_DEFAULT_KAFKA_ADDRS, reader.Brokers) + assert.Equal(t, config.EnvDefaultKafkaAddrs, reader.Brokers) } diff --git a/otkafka/setup_test.go b/otkafka/setup_test.go index c4c92a2f..cd6f9660 100644 --- a/otkafka/setup_test.go +++ b/otkafka/setup_test.go @@ -19,7 +19,7 @@ func TestMain(m *testing.M) { func setupTopic() { var topics = []string{"trace", "test", "example"} - conn, err := kafka.Dial("tcp", config.ENV_DEFAULT_KAFKA_ADDRS[0]) + conn, err := kafka.Dial("tcp", config.EnvDefaultKafkaAddrs[0]) if err != nil { panic(err.Error()) } diff --git a/otkafka/transport_test.go b/otkafka/transport_test.go index 22dba1b5..c7245de1 100644 --- a/otkafka/transport_test.go +++ b/otkafka/transport_test.go @@ -21,7 +21,7 @@ func TestTransport_RoundTrip(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, + Brokers: config.EnvDefaultKafkaAddrs, Topic: "Test", }, }}, diff --git a/otkafka/writer_config.go b/otkafka/writer_config.go index 689b098e..6c4c1e8a 100644 --- a/otkafka/writer_config.go +++ b/otkafka/writer_config.go @@ -85,7 +85,7 @@ type WriterConfig struct { func fromWriterConfig(conf WriterConfig) kafka.Writer { if len(conf.Brokers) == 0 { - conf.Brokers = config.ENV_DEFAULT_KAFKA_ADDRS + conf.Brokers = config.EnvDefaultKafkaAddrs } return kafka.Writer{ Addr: kafka.TCP(conf.Brokers...), diff --git a/otkafka/writer_test.go b/otkafka/writer_test.go index 7b87bec5..65abe8ed 100644 --- a/otkafka/writer_test.go +++ b/otkafka/writer_test.go @@ -17,7 +17,7 @@ func TestWriter(t *testing.T) { { ctx := context.Background() kw := kafka.Writer{ - Addr: kafka.TCP(config.ENV_DEFAULT_KAFKA_ADDRS...), + Addr: kafka.TCP(config.EnvDefaultKafkaAddrs...), Topic: "trace", } tracer := mocktracer.New() @@ -32,7 +32,7 @@ func TestWriter(t *testing.T) { { ctx := context.Background() - kr := kafka.NewReader(kafka.ReaderConfig{Brokers: config.ENV_DEFAULT_KAFKA_ADDRS, Topic: "trace", GroupID: "test", MinBytes: 1, MaxBytes: 1}) + kr := kafka.NewReader(kafka.ReaderConfig{Brokers: config.EnvDefaultKafkaAddrs, Topic: "trace", GroupID: "test", MinBytes: 1, MaxBytes: 1}) tracer := mocktracer.New() msg, err := kr.ReadMessage(ctx) assert.NoError(t, err) @@ -47,5 +47,5 @@ func TestWriter(t *testing.T) { func Test_fromWriterConfig(t *testing.T) { writer := fromWriterConfig(WriterConfig{}) - assert.Equal(t, strings.Join(config.ENV_DEFAULT_KAFKA_ADDRS, ","), writer.Addr.String()) + assert.Equal(t, strings.Join(config.EnvDefaultKafkaAddrs, ","), writer.Addr.String()) } diff --git a/otredis/module_test.go b/otredis/module_test.go index a8b9ffa7..cd50a550 100644 --- a/otredis/module_test.go +++ b/otredis/module_test.go @@ -34,7 +34,7 @@ func TestModule_ProvideRunGroup(t *testing.T) { m.EXPECT().Set(gomock.Any()).MinTimes(1) c := core.New( - core.WithInline("redis.default.addrs", config.ENV_DEFAULT_REDIS_ADDRS), + core.WithInline("redis.default.addrs", config.EnvDefaultRedisAddrs), core.WithInline("redisMetrics.interval", "1ms"), core.WithInline("log.level", "none"), ) diff --git a/otredis/provider.go b/otredis/provider.go index a822ccf4..88f8466e 100644 --- a/otredis/provider.go +++ b/otredis/provider.go @@ -99,7 +99,7 @@ func provideRedisFactory(p in) (out, func()) { } base = RedisUniversalOptions{ - Addrs: config.ENV_DEFAULT_REDIS_ADDRS, + Addrs: config.EnvDefaultRedisAddrs, } } full = redis.UniversalOptions{ @@ -188,7 +188,7 @@ func provideConfig() configOut { Data: map[string]interface{}{ "redis": map[string]RedisUniversalOptions{ "default": { - Addrs: config.ENV_DEFAULT_REDIS_ADDRS, + Addrs: config.EnvDefaultRedisAddrs, }, }, "redisMetrics": metricsConf{ diff --git a/otredis/provider_test.go b/otredis/provider_test.go index 3a04d10e..fbaa9e8f 100644 --- a/otredis/provider_test.go +++ b/otredis/provider_test.go @@ -41,5 +41,5 @@ func TestProvideConfigs(t *testing.T) { k.Load(rawbytes.Provider(bytes), yaml.Parser()) k.Unmarshal("redis.default", &r) assert.Equal(t, 0, r.DB) - assert.Equal(t, config.ENV_DEFAULT_REDIS_ADDRS, r.Addrs) + assert.Equal(t, config.EnvDefaultRedisAddrs, r.Addrs) } diff --git a/queue/dependency_test.go b/queue/dependency_test.go index 3c1ef723..ccdfe884 100644 --- a/queue/dependency_test.go +++ b/queue/dependency_test.go @@ -16,7 +16,7 @@ import ( type maker struct{} func (m maker) Make(name string) (redis.UniversalClient, error) { - return redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.ENV_DEFAULT_REDIS_ADDRS}), nil + return redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.EnvDefaultRedisAddrs}), nil } func TestProvideDispatcher(t *testing.T) { diff --git a/queue/dispatcher_test.go b/queue/dispatcher_test.go index bc9ef403..96b50f0f 100644 --- a/queue/dispatcher_test.go +++ b/queue/dispatcher_test.go @@ -57,7 +57,7 @@ func TestMain(m *testing.M) { func setUp() *QueueableDispatcher { s := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: config.ENV_DEFAULT_REDIS_ADDRS, + Addrs: config.EnvDefaultRedisAddrs, }) driver := RedisDriver{ Logger: logging.NewLogger("logfmt"), @@ -84,7 +84,7 @@ func tearDown() { Waiting: "waiting", Timeout: "timeout", } - redisClient := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.ENV_DEFAULT_REDIS_ADDRS}) + redisClient := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.EnvDefaultRedisAddrs}) redisClient.Del(context.Background(), channel.Delayed) redisClient.Del(context.Background(), channel.Failed) redisClient.Del(context.Background(), channel.Reserved) diff --git a/queue/redis_driver.go b/queue/redis_driver.go index a78ffe89..add29f7e 100644 --- a/queue/redis_driver.go +++ b/queue/redis_driver.go @@ -251,7 +251,7 @@ func (r *RedisDriver) populateDefaults() { } if r.RedisClient == nil { r.RedisClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: config.ENV_DEFAULT_REDIS_ADDRS, + Addrs: config.EnvDefaultRedisAddrs, }) } if r.Packer == nil { From 25b3083181c33b7eb8b684a34dd52131abf5de49 Mon Sep 17 00:00:00 2001 From: Trock Date: Wed, 12 May 2021 11:20:40 +0800 Subject: [PATCH 3/8] refactor: add internal and change env --- config/env.go | 16 ---------------- config/remote/remote_test.go | 15 +++++++++++++-- dtx/sagas/mysqlstore/mysql_store_test.go | 9 ++++++--- leader/election_test.go | 10 +++++++++- leader/leaderetcd/etcd_test.go | 16 ++++++++++++++-- leader/leaderredis/redis_test.go | 16 ++++++++++++++-- otes/provider.go | 7 +++++-- otes/provider_test.go | 12 +++++++++--- otes/tracing_test.go | 6 +++--- otetcd/dependency.go | 7 +++++-- otetcd/dependency_test.go | 10 ++++++++-- otetcd/tracing_test.go | 2 +- otgorm/dependency.go | 11 ++++------- otgorm/dependency_test.go | 2 +- otgorm/module_test.go | 2 +- otkafka/dependency.go | 15 +++++++++++---- otkafka/dependency_test.go | 12 ++++++------ otkafka/example_test.go | 18 ++++-------------- otkafka/reader_config.go | 3 +-- otkafka/reader_config_test.go | 3 +-- otkafka/setup_test.go | 8 ++++++-- otkafka/transport_test.go | 2 +- otkafka/writer_config.go | 3 +-- otkafka/writer_test.go | 7 +++---- otmongo/provider.go | 11 +++++------ otmongo/provider_test.go | 6 +++--- otredis/module_test.go | 9 +++++++-- otredis/provider.go | 7 +++++-- otredis/provider_test.go | 2 +- ots3/dependency.go | 22 +++++++++++++++------- ots3/example_test.go | 6 +++--- ots3/uploader_test.go | 16 ++++++++-------- queue/dependency_test.go | 2 +- queue/dispatcher_test.go | 11 ++++++++--- queue/redis_driver.go | 10 ++++++---- 35 files changed, 189 insertions(+), 125 deletions(-) diff --git a/config/env.go b/config/env.go index abff56fe..7b88e11a 100644 --- a/config/env.go +++ b/config/env.go @@ -1,7 +1,6 @@ package config import ( - "os" "strings" "github.com/DoNewsCode/core/contract" @@ -81,18 +80,3 @@ func NewEnvFromConf(conf contract.ConfigAccessor) Env { envStr := conf.String("env") return NewEnv(envStr) } - -func getDefaultAddrsFromEnv(env, defaultVal string) []string { - if v := os.Getenv(env); v != "" { - return strings.Split(v, ",") - } - return []string{defaultVal} -} - -// Default multiple addresses from env -var ( - EnvDefaultElasticsearchAddrs = getDefaultAddrsFromEnv("ELASTICSEARCH_ADDR", "http://127.0.0.1:9200") - EnvDefaultEtcdAddrs = getDefaultAddrsFromEnv("ETCD_ADDR", "127.0.0.1:2379") - EnvDefaultKafkaAddrs = getDefaultAddrsFromEnv("KAFKA_ADDR", "127.0.0.1:9092") - EnvDefaultRedisAddrs = getDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") -) diff --git a/config/remote/remote_test.go b/config/remote/remote_test.go index fafbc50e..2ca059d4 100644 --- a/config/remote/remote_test.go +++ b/config/remote/remote_test.go @@ -3,18 +3,29 @@ package remote import ( "context" "fmt" + "github.com/DoNewsCode/core/internal" + "os" "sync" "testing" "time" - "github.com/DoNewsCode/core/config" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3" ) +var envDefaultEtcdAddrs, envDefaultEtcdAddrsIsSet = internal.GetDefaultAddrsFromEnv("ETCD_ADDR", "127.0.0.1:2379") + +func TestMain(m *testing.M) { + if !envDefaultEtcdAddrsIsSet { + fmt.Println("Set env ETCD_ADDR to run remote tests") + os.Exit(0) + } + os.Exit(m.Run()) +} + func TestRemote(t *testing.T) { cfg := &clientv3.Config{ - Endpoints: config.EnvDefaultEtcdAddrs, + Endpoints: envDefaultEtcdAddrs, DialTimeout: 2 * time.Second, } diff --git a/dtx/sagas/mysqlstore/mysql_store_test.go b/dtx/sagas/mysqlstore/mysql_store_test.go index b95fa972..67ab870b 100644 --- a/dtx/sagas/mysqlstore/mysql_store_test.go +++ b/dtx/sagas/mysqlstore/mysql_store_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/DoNewsCode/core/internal" "os" "testing" "time" @@ -23,8 +24,10 @@ func (m module) ProvideMigration() []*otgorm.Migration { return Migrations("default") } +var envDefaultMysqlDsn, envDefaultMysqlDsnIsSet = internal.GetDefaultAddrFromEnv("MYSQL_DSN", "root@tcp(127.0.0.1:3306)/app?charset=utf8mb4&parseTime=True&loc=Local") + func TestMain(m *testing.M) { - if os.Getenv("MYSQL_DSN") == "" { + if !envDefaultMysqlDsnIsSet { fmt.Println("Set env MYSQL_DSN to run mysqlstore tests") os.Exit(0) } @@ -262,7 +265,7 @@ func TestMySQLStore(t *testing.T) { c := core.New( core.WithInline("log.level", "debug"), core.WithInline("gorm.default.database", "mysql"), - core.WithInline("gorm.default.dsn", "root@tcp(127.0.0.1:3306)/app?charset=utf8mb4&parseTime=True&loc=Local"), + core.WithInline("gorm.default.dsn", envDefaultMysqlDsn), ) c.ProvideEssentials() c.Provide(otgorm.Providers()) @@ -277,7 +280,7 @@ func TestStore_CleanUp(t *testing.T) { c := core.New( core.WithInline("log.level", "error"), core.WithInline("gorm.default.database", "mysql"), - core.WithInline("gorm.default.dsn", "root@tcp(127.0.0.1:3306)/app?charset=utf8mb4&parseTime=True&loc=Local"), + core.WithInline("gorm.default.dsn", envDefaultMysqlDsn), ) c.ProvideEssentials() c.Provide(otgorm.Providers()) diff --git a/leader/election_test.go b/leader/election_test.go index e8831aae..696f30aa 100644 --- a/leader/election_test.go +++ b/leader/election_test.go @@ -2,11 +2,13 @@ package leader import ( "context" + "fmt" "os" "testing" "time" "github.com/DoNewsCode/core/events" + "github.com/DoNewsCode/core/internal" "github.com/DoNewsCode/core/key" leaderetcd2 "github.com/DoNewsCode/core/leader/leaderetcd" "github.com/stretchr/testify/assert" @@ -14,7 +16,13 @@ import ( "go.uber.org/atomic" ) +var envDefaultRedisAddrs, envDefaultRedisAddrsIsSet = internal.GetDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") + func TestMain(m *testing.M) { + if !envDefaultRedisAddrsIsSet { + fmt.Println("Set env REDIS_ADDR to run leader tests") + os.Exit(0) + } os.Exit(m.Run()) } @@ -22,7 +30,7 @@ func TestElection(t *testing.T) { var dispatcher = &events.SyncDispatcher{} var e1, e2 Election - client, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}}) + client, err := clientv3.New(clientv3.Config{Endpoints: envDefaultRedisAddrs}) assert.NoError(t, err) e1 = Election{ dispatcher: dispatcher, diff --git a/leader/leaderetcd/etcd_test.go b/leader/leaderetcd/etcd_test.go index 436fa64e..1fc30054 100644 --- a/leader/leaderetcd/etcd_test.go +++ b/leader/leaderetcd/etcd_test.go @@ -2,16 +2,28 @@ package leaderetcd import ( "context" + "fmt" + "os" "testing" - "github.com/DoNewsCode/core/config" + "github.com/DoNewsCode/core/internal" "github.com/DoNewsCode/core/key" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3" ) +var envDefaultEtcdAddrs, envDefaultEtcdAddrsIsSet = internal.GetDefaultAddrsFromEnv("ETCD_ADDR", "127.0.0.1:2379") + +func TestMain(m *testing.M) { + if !envDefaultEtcdAddrsIsSet { + fmt.Println("Set env ETCD_ADDR to run leaderetcd tests") + os.Exit(0) + } + os.Exit(m.Run()) +} + func TestNewEtcdDriver(t *testing.T) { - client, _ := clientv3.New(clientv3.Config{Endpoints: config.EnvDefaultEtcdAddrs}) + client, _ := clientv3.New(clientv3.Config{Endpoints: envDefaultEtcdAddrs}) e1 := NewEtcdDriver(client, key.New("test")) e2 := NewEtcdDriver(client, key.New("test")) diff --git a/leader/leaderredis/redis_test.go b/leader/leaderredis/redis_test.go index 410358db..65d08e4d 100644 --- a/leader/leaderredis/redis_test.go +++ b/leader/leaderredis/redis_test.go @@ -4,19 +4,31 @@ package leaderredis import ( "context" + "fmt" + "os" "testing" "time" - "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/events" + "github.com/DoNewsCode/core/internal" "github.com/DoNewsCode/core/key" "github.com/DoNewsCode/core/leader" "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" ) +var envDefaultRedisAddrs, envDefaultRedisAddrsIsSet = internal.GetDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") + +func TestMain(m *testing.M) { + if !envDefaultRedisAddrsIsSet { + fmt.Println("Set env REDIS_ADDR to run leaderredis tests") + os.Exit(0) + } + os.Exit(m.Run()) +} + func TestCampaign(t *testing.T) { - client := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.EnvDefaultRedisAddrs}) + client := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: envDefaultRedisAddrs}) driver := RedisDriver{ client: client, keyer: key.New(), diff --git a/otes/provider.go b/otes/provider.go index dab74455..7c4a5e35 100644 --- a/otes/provider.go +++ b/otes/provider.go @@ -7,6 +7,7 @@ import ( "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/internal" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/olivere/elastic/v7" @@ -94,7 +95,7 @@ func provideEsFactory(p in) (out, func()) { if name != "default" { return di.Pair{}, fmt.Errorf("elastic configuration %s not valid", name) } - conf.URL = config.EnvDefaultElasticsearchAddrs + conf.URL = envDefaultElasticsearchAddrs } if p.Interceptor != nil { p.Interceptor(name, &conf) @@ -164,7 +165,7 @@ func provideConfig() configOut { Data: map[string]interface{}{ "es": map[string]Config{ "default": { - URL: []string{"http://localhost:9200"}, + URL: envDefaultElasticsearchAddrs, Shards: 1, }, }, @@ -174,3 +175,5 @@ func provideConfig() configOut { } return configOut{Config: configs} } + +var envDefaultElasticsearchAddrs, envDefaultElasticsearchAddrsIsSet = internal.GetDefaultAddrsFromEnv("ELASTICSEARCH_ADDR", "http://127.0.0.1:9200") diff --git a/otes/provider_test.go b/otes/provider_test.go index ad0f9130..922b6d8b 100644 --- a/otes/provider_test.go +++ b/otes/provider_test.go @@ -1,6 +1,7 @@ package otes import ( + "fmt" "os" "testing" @@ -11,14 +12,19 @@ import ( ) func TestMain(m *testing.M) { + if !envDefaultElasticsearchAddrsIsSet { + fmt.Println("Set env ELASTICSEARCH_ADDR to run otes tests") + os.Exit(0) + } + os.Exit(m.Run()) } func TestNewEsFactory(t *testing.T) { esFactory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: config.EnvDefaultElasticsearchAddrs}, - "alternative": {URL: config.EnvDefaultElasticsearchAddrs}, + "default": {URL: envDefaultElasticsearchAddrs}, + "alternative": {URL: envDefaultElasticsearchAddrs}, }}, Logger: log.NewNopLogger(), Tracer: nil, @@ -37,7 +43,7 @@ func TestNewEsFactoryWithOptions(t *testing.T) { var called bool esFactory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: config.EnvDefaultElasticsearchAddrs}, + "default": {URL: envDefaultElasticsearchAddrs}, }}, Logger: log.NewNopLogger(), Options: []elastic.ClientOptionFunc{ diff --git a/otes/tracing_test.go b/otes/tracing_test.go index c18e9e0e..b06387ef 100644 --- a/otes/tracing_test.go +++ b/otes/tracing_test.go @@ -17,8 +17,8 @@ func TestTracing(t *testing.T) { opentracing.SetGlobalTracer(tracer) factory, cleanup := provideEsFactory(in{ Conf: config.MapAdapter{"es": map[string]Config{ - "default": {URL: config.EnvDefaultElasticsearchAddrs}, - "alternative": {URL: config.EnvDefaultElasticsearchAddrs}, + "default": {URL: envDefaultElasticsearchAddrs}, + "alternative": {URL: envDefaultElasticsearchAddrs}, }}, Logger: log.NewNopLogger(), Tracer: tracer, @@ -30,7 +30,7 @@ func TestTracing(t *testing.T) { span, ctx := opentracing.StartSpanFromContextWithTracer(context.Background(), tracer, "es.query") defer span.Finish() - res, code, err := client.Ping(config.EnvDefaultElasticsearchAddrs[0]).Do(ctx) + res, code, err := client.Ping(envDefaultElasticsearchAddrs[0]).Do(ctx) assert.NoError(t, err) assert.Equal(t, http.StatusOK, code) assert.NotNil(t, res) diff --git a/otetcd/dependency.go b/otetcd/dependency.go index 98eb4860..5d550f88 100644 --- a/otetcd/dependency.go +++ b/otetcd/dependency.go @@ -7,6 +7,7 @@ import ( "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/internal" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/opentracing-contrib/go-grpc" @@ -94,7 +95,7 @@ func provideFactory(p factoryIn) (FactoryOut, func()) { if name != "default" { return di.Pair{}, fmt.Errorf("etcd configuration %s not valid", name) } - conf = Option{Endpoints: config.EnvDefaultEtcdAddrs} + conf = Option{Endpoints: envDefaultEtcdAddrs} } co := clientv3.Config{ Endpoints: conf.Endpoints, @@ -157,7 +158,7 @@ func provideConfig() configOut { map[string]interface{}{ "etcd": map[string]Option{ "default": { - Endpoints: []string{"127.0.0.1:2379"}, + Endpoints: envDefaultEtcdAddrs, AutoSyncInterval: config.Duration{}, DialTimeout: config.Duration{}, DialKeepAliveTime: config.Duration{}, @@ -184,3 +185,5 @@ func provideConfig() configOut { func duration(d config.Duration) time.Duration { return d.Duration } + +var envDefaultEtcdAddrs, envDefaultEtcdAddrsIsSet = internal.GetDefaultAddrsFromEnv("ETCD_ADDR", "127.0.0.1:2379") diff --git a/otetcd/dependency_test.go b/otetcd/dependency_test.go index 4be8438c..7aa43ef9 100644 --- a/otetcd/dependency_test.go +++ b/otetcd/dependency_test.go @@ -1,6 +1,7 @@ package otetcd import ( + "fmt" "os" "testing" @@ -14,6 +15,11 @@ import ( ) func TestMain(m *testing.M) { + if !envDefaultEtcdAddrsIsSet { + fmt.Println("Set env ETCD_ADDR to run otetcd tests") + os.Exit(0) + } + os.Exit(m.Run()) } @@ -42,10 +48,10 @@ func TestProvideFactory(t *testing.T) { out, cleanup := provideFactory(factoryIn{ Conf: config.MapAdapter{"etcd": map[string]Option{ "default": { - Endpoints: config.EnvDefaultEtcdAddrs, + Endpoints: envDefaultEtcdAddrs, }, "alternative": { - Endpoints: config.EnvDefaultEtcdAddrs, + Endpoints: envDefaultEtcdAddrs, }, }}, Logger: log.NewNopLogger(), diff --git a/otetcd/tracing_test.go b/otetcd/tracing_test.go index de1614af..da528124 100644 --- a/otetcd/tracing_test.go +++ b/otetcd/tracing_test.go @@ -19,7 +19,7 @@ func TestTracing(t *testing.T) { Logger: log.NewNopLogger(), Conf: config.MapAdapter{"etcd": map[string]Option{ "default": { - Endpoints: config.EnvDefaultEtcdAddrs, + Endpoints: envDefaultEtcdAddrs, }, }}, Interceptor: func(name string, options *clientv3.Config) { diff --git a/otgorm/dependency.go b/otgorm/dependency.go index 6b491c39..187d0d0e 100644 --- a/otgorm/dependency.go +++ b/otgorm/dependency.go @@ -4,12 +4,12 @@ import ( "errors" "fmt" "net" - "os" "time" "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/internal" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/opentracing/opentracing-go" @@ -259,11 +259,6 @@ type configOut struct { // ProvideConfig exports the default database configuration. func provideConfig() configOut { - var defaultDSN string - defaultDSN = "root@tcp(127.0.0.1:3306)/app?charset=utf8mb4&parseTime=True&loc=Local" - if os.Getenv("MYSQL_DSN") != "" { - defaultDSN = os.Getenv("MYSQL_DSN") - } exported := []config.ExportedConfig{ { Owner: "otgorm", @@ -271,7 +266,7 @@ func provideConfig() configOut { "gorm": map[string]databaseConf{ "default": { Database: "mysql", - Dsn: defaultDSN, + Dsn: envDefaultMysqlDsn, SkipDefaultTransaction: false, FullSaveAssociations: false, DryRun: false, @@ -297,3 +292,5 @@ func provideConfig() configOut { } return configOut{Config: exported} } + +var envDefaultMysqlDsn, envDefaultMysqlDsnIsSet = internal.GetDefaultAddrFromEnv("MYSQL_DSN", "root@tcp(127.0.0.1:3306)/app?charset=utf8mb4&parseTime=True&loc=Local") diff --git a/otgorm/dependency_test.go b/otgorm/dependency_test.go index 9fa46a30..61b6b009 100644 --- a/otgorm/dependency_test.go +++ b/otgorm/dependency_test.go @@ -21,7 +21,7 @@ func TestProvideDBFactory(t *testing.T) { }, "alternative": { Database: "mysql", - Dsn: "root@tcp(127.0.0.1:3306)/app?charset=utf8mb4&parseTime=True&loc=Local", + Dsn: envDefaultMysqlDsn, }, }}, Logger: log.NewNopLogger(), diff --git a/otgorm/module_test.go b/otgorm/module_test.go index f909ce8f..3a29f69b 100644 --- a/otgorm/module_test.go +++ b/otgorm/module_test.go @@ -52,7 +52,7 @@ func (m *Mock) ProvideMigration() []*Migration { } func TestMain(m *testing.M) { - if os.Getenv("MYSQL_DSN") == "" { + if !envDefaultMysqlDsnIsSet { fmt.Println("Set env MYSQL_DSN to run otgorm tests") os.Exit(0) } diff --git a/otkafka/dependency.go b/otkafka/dependency.go index 2cdc8382..4aa163b6 100644 --- a/otkafka/dependency.go +++ b/otkafka/dependency.go @@ -6,6 +6,7 @@ import ( "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/internal" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/opentracing/opentracing-go" @@ -172,11 +173,15 @@ func provideConfig() configOut { Owner: "kitkafka", Data: map[string]interface{}{ "kafka": map[string]interface{}{ - "reader": ReaderConfig{ - Brokers: []string{"127.0.0.1:9092"}, + "reader": map[string]interface{}{ + "default": ReaderConfig{ + Brokers: envDefaultKafkaAddrs, + }, }, - "writer": WriterConfig{ - Brokers: []string{"127.0.0.1:9092"}, + "writer": map[string]interface{}{ + "default": WriterConfig{ + Brokers: envDefaultKafkaAddrs, + }, }, }, }, @@ -185,3 +190,5 @@ func provideConfig() configOut { } return configOut{Config: configs} } + +var envDefaultKafkaAddrs, envDefaultKafkaAddrsIsSet = internal.GetDefaultAddrsFromEnv("KAFKA_ADDR", "127.0.0.1:9092") diff --git a/otkafka/dependency_test.go b/otkafka/dependency_test.go index f03c5acd..0bed41e3 100644 --- a/otkafka/dependency_test.go +++ b/otkafka/dependency_test.go @@ -19,11 +19,11 @@ func TestProvideReaderFactory(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.reader": map[string]ReaderConfig{ "default": { - Brokers: config.EnvDefaultKafkaAddrs, + Brokers: envDefaultKafkaAddrs, Topic: "Test", }, "alternative": { - Brokers: []string{"127.0.0.1:9093"}, + Brokers: envDefaultKafkaAddrs, Topic: "Test", }, }}, @@ -43,11 +43,11 @@ func TestProvideWriterFactory(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: config.EnvDefaultKafkaAddrs, + Brokers: envDefaultKafkaAddrs, Topic: "Test", }, "alternative": { - Brokers: config.EnvDefaultKafkaAddrs, + Brokers: envDefaultKafkaAddrs, Topic: "Test", }, }}, @@ -67,11 +67,11 @@ func TestProvideKafka(t *testing.T) { Logger: log.NewNopLogger(), Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: config.EnvDefaultKafkaAddrs, + Brokers: envDefaultKafkaAddrs, Topic: "Test", }, "alternative": { - Brokers: config.EnvDefaultKafkaAddrs, + Brokers: envDefaultKafkaAddrs, Topic: "Test", }, }}, diff --git a/otkafka/example_test.go b/otkafka/example_test.go index ea3f89f5..7fb1caa4 100644 --- a/otkafka/example_test.go +++ b/otkafka/example_test.go @@ -3,10 +3,8 @@ package otkafka_test import ( "context" "fmt" - "strings" "github.com/DoNewsCode/core" - "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/otkafka" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/rawbytes" @@ -14,12 +12,6 @@ import ( ) func Example_reader() { - brokers := make([]string, len(config.EnvDefaultKafkaAddrs)) - for i, addr := range config.EnvDefaultKafkaAddrs { - brokers[i] = fmt.Sprintf(` - %s`, addr) - } - brokersStr := strings.Join(brokers, ` -`) var conf = ` log: level: none @@ -27,15 +19,13 @@ kafka: reader: default: brokers: -` + brokersStr + ` - topic: - example + - 127.0.0.1:9200 + topic: example writer: default: brokers: -` + brokersStr + ` - topic: - example + - 127.0.0.1:9200 + topic: example ` c := core.Default(core.WithConfigStack(rawbytes.Provider([]byte(conf)), yaml.Parser())) c.Provide(otkafka.Providers()) diff --git a/otkafka/reader_config.go b/otkafka/reader_config.go index 747b03b6..9976087e 100644 --- a/otkafka/reader_config.go +++ b/otkafka/reader_config.go @@ -3,7 +3,6 @@ package otkafka import ( "time" - "github.com/DoNewsCode/core/config" "github.com/segmentio/kafka-go" ) @@ -133,7 +132,7 @@ type ReaderInterceptor func(name string, reader *kafka.ReaderConfig) func fromReaderConfig(conf ReaderConfig) kafka.ReaderConfig { if len(conf.Brokers) == 0 { - conf.Brokers = config.EnvDefaultKafkaAddrs + conf.Brokers = envDefaultKafkaAddrs } return kafka.ReaderConfig{ Brokers: conf.Brokers, diff --git a/otkafka/reader_config_test.go b/otkafka/reader_config_test.go index 76843861..2391009e 100644 --- a/otkafka/reader_config_test.go +++ b/otkafka/reader_config_test.go @@ -1,7 +1,6 @@ package otkafka import ( - "github.com/DoNewsCode/core/config" "testing" "github.com/stretchr/testify/assert" @@ -9,5 +8,5 @@ import ( func Test_fromReaderConfig(t *testing.T) { reader := fromReaderConfig(ReaderConfig{}) - assert.Equal(t, config.EnvDefaultKafkaAddrs, reader.Brokers) + assert.Equal(t, envDefaultKafkaAddrs, reader.Brokers) } diff --git a/otkafka/setup_test.go b/otkafka/setup_test.go index cd6f9660..e2f0330f 100644 --- a/otkafka/setup_test.go +++ b/otkafka/setup_test.go @@ -1,16 +1,20 @@ package otkafka import ( + "fmt" "net" "os" "strconv" "testing" - "github.com/DoNewsCode/core/config" "github.com/segmentio/kafka-go" ) func TestMain(m *testing.M) { + if !envDefaultKafkaAddrsIsSet { + fmt.Println("Set env KAFKA_ADDR to run otkafka tests") + os.Exit(0) + } setupTopic() os.Exit(m.Run()) @@ -19,7 +23,7 @@ func TestMain(m *testing.M) { func setupTopic() { var topics = []string{"trace", "test", "example"} - conn, err := kafka.Dial("tcp", config.EnvDefaultKafkaAddrs[0]) + conn, err := kafka.Dial("tcp", envDefaultKafkaAddrs[0]) if err != nil { panic(err.Error()) } diff --git a/otkafka/transport_test.go b/otkafka/transport_test.go index c7245de1..59ed2df4 100644 --- a/otkafka/transport_test.go +++ b/otkafka/transport_test.go @@ -21,7 +21,7 @@ func TestTransport_RoundTrip(t *testing.T) { In: di.In{}, Conf: config.MapAdapter{"kafka.writer": map[string]WriterConfig{ "default": { - Brokers: config.EnvDefaultKafkaAddrs, + Brokers: envDefaultKafkaAddrs, Topic: "Test", }, }}, diff --git a/otkafka/writer_config.go b/otkafka/writer_config.go index 6c4c1e8a..874be272 100644 --- a/otkafka/writer_config.go +++ b/otkafka/writer_config.go @@ -3,7 +3,6 @@ package otkafka import ( "time" - "github.com/DoNewsCode/core/config" "github.com/segmentio/kafka-go" ) @@ -85,7 +84,7 @@ type WriterConfig struct { func fromWriterConfig(conf WriterConfig) kafka.Writer { if len(conf.Brokers) == 0 { - conf.Brokers = config.EnvDefaultKafkaAddrs + conf.Brokers = envDefaultKafkaAddrs } return kafka.Writer{ Addr: kafka.TCP(conf.Brokers...), diff --git a/otkafka/writer_test.go b/otkafka/writer_test.go index 65abe8ed..6cd701d6 100644 --- a/otkafka/writer_test.go +++ b/otkafka/writer_test.go @@ -5,7 +5,6 @@ import ( "strings" "testing" - "github.com/DoNewsCode/core/config" "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" @@ -17,7 +16,7 @@ func TestWriter(t *testing.T) { { ctx := context.Background() kw := kafka.Writer{ - Addr: kafka.TCP(config.EnvDefaultKafkaAddrs...), + Addr: kafka.TCP(envDefaultKafkaAddrs...), Topic: "trace", } tracer := mocktracer.New() @@ -32,7 +31,7 @@ func TestWriter(t *testing.T) { { ctx := context.Background() - kr := kafka.NewReader(kafka.ReaderConfig{Brokers: config.EnvDefaultKafkaAddrs, Topic: "trace", GroupID: "test", MinBytes: 1, MaxBytes: 1}) + kr := kafka.NewReader(kafka.ReaderConfig{Brokers: envDefaultKafkaAddrs, Topic: "trace", GroupID: "test", MinBytes: 1, MaxBytes: 1}) tracer := mocktracer.New() msg, err := kr.ReadMessage(ctx) assert.NoError(t, err) @@ -47,5 +46,5 @@ func TestWriter(t *testing.T) { func Test_fromWriterConfig(t *testing.T) { writer := fromWriterConfig(WriterConfig{}) - assert.Equal(t, strings.Join(config.EnvDefaultKafkaAddrs, ","), writer.Addr.String()) + assert.Equal(t, strings.Join(envDefaultKafkaAddrs, ","), writer.Addr.String()) } diff --git a/otmongo/provider.go b/otmongo/provider.go index 7db561bd..84e8d182 100644 --- a/otmongo/provider.go +++ b/otmongo/provider.go @@ -3,11 +3,11 @@ package otmongo import ( "context" "fmt" - "os" "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/internal" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/opentracing/opentracing-go" @@ -97,10 +97,7 @@ func provideMongoFactory(p in) (out, func()) { if name != "default" { return di.Pair{}, fmt.Errorf("mongo configuration %s not valid", name) } - conf.Uri = "mongodb://127.0.0.1:27017" - if os.Getenv("MONGO_ADDR") != "" { - conf.Uri = os.Getenv("MONGO_ADDR") - } + conf.Uri = envDefaultMongoAddr } opts := options.Client() opts.ApplyURI(conf.Uri) @@ -148,7 +145,7 @@ func provideConfig() configOut { Uri string `json:"uri" yaml:"uri"` }{ "default": { - Uri: "", + Uri: envDefaultMongoAddr, }, }, }, @@ -157,3 +154,5 @@ func provideConfig() configOut { } return configOut{Config: configs} } + +var envDefaultMongoAddr, envDefaultMongoAddrIsSet = internal.GetDefaultAddrFromEnv("MONGO_ADDR", "mongodb://127.0.0.1:27017") diff --git a/otmongo/provider_test.go b/otmongo/provider_test.go index 670e00f0..fdb59b82 100644 --- a/otmongo/provider_test.go +++ b/otmongo/provider_test.go @@ -11,7 +11,7 @@ import ( ) func TestMain(m *testing.M) { - if os.Getenv("MONGO_ADDR") == "" { + if !envDefaultMongoAddrIsSet { fmt.Println("Set env MONGO_ADDR to run otmongo tests") os.Exit(0) } @@ -24,10 +24,10 @@ func TestNewMongoFactory(t *testing.T) { In: dig.In{}, Conf: config.MapAdapter{"mongo": map[string]struct{ Uri string }{ "default": { - Uri: "mongodb://127.0.0.1:27017", + Uri: envDefaultMongoAddr, }, "alternative": { - Uri: "mongodb://127.0.0.1:27017", + Uri: envDefaultMongoAddr, }, }}, Tracer: nil, diff --git a/otredis/module_test.go b/otredis/module_test.go index cd50a550..e510582d 100644 --- a/otredis/module_test.go +++ b/otredis/module_test.go @@ -2,12 +2,12 @@ package otredis import ( "context" + "fmt" "os" "testing" "time" "github.com/DoNewsCode/core" - "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/di" mock_metrics "github.com/DoNewsCode/core/otredis/mocks" "github.com/go-redis/redis/v8" @@ -15,6 +15,11 @@ import ( ) func TestMain(m *testing.M) { + if !envDefaultRedisAddrsIsSet { + fmt.Println("Set env REDIS_ADDR to run otredis tests") + os.Exit(0) + } + os.Exit(m.Run()) } @@ -34,7 +39,7 @@ func TestModule_ProvideRunGroup(t *testing.T) { m.EXPECT().Set(gomock.Any()).MinTimes(1) c := core.New( - core.WithInline("redis.default.addrs", config.EnvDefaultRedisAddrs), + core.WithInline("redis.default.addrs", envDefaultRedisAddrs), core.WithInline("redisMetrics.interval", "1ms"), core.WithInline("log.level", "none"), ) diff --git a/otredis/provider.go b/otredis/provider.go index 88f8466e..03a95c10 100644 --- a/otredis/provider.go +++ b/otredis/provider.go @@ -7,6 +7,7 @@ import ( "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/internal" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/go-redis/redis/v8" @@ -99,7 +100,7 @@ func provideRedisFactory(p in) (out, func()) { } base = RedisUniversalOptions{ - Addrs: config.EnvDefaultRedisAddrs, + Addrs: envDefaultRedisAddrs, } } full = redis.UniversalOptions{ @@ -188,7 +189,7 @@ func provideConfig() configOut { Data: map[string]interface{}{ "redis": map[string]RedisUniversalOptions{ "default": { - Addrs: config.EnvDefaultRedisAddrs, + Addrs: envDefaultRedisAddrs, }, }, "redisMetrics": metricsConf{ @@ -200,3 +201,5 @@ func provideConfig() configOut { } return configOut{Config: configs} } + +var envDefaultRedisAddrs, envDefaultRedisAddrsIsSet = internal.GetDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") diff --git a/otredis/provider_test.go b/otredis/provider_test.go index fbaa9e8f..b3e75875 100644 --- a/otredis/provider_test.go +++ b/otredis/provider_test.go @@ -41,5 +41,5 @@ func TestProvideConfigs(t *testing.T) { k.Load(rawbytes.Provider(bytes), yaml.Parser()) k.Unmarshal("redis.default", &r) assert.Equal(t, 0, r.DB) - assert.Equal(t, config.EnvDefaultRedisAddrs, r.Addrs) + assert.Equal(t, envDefaultRedisAddrs, r.Addrs) } diff --git a/ots3/dependency.go b/ots3/dependency.go index b707d1b1..4c31fb29 100644 --- a/ots3/dependency.go +++ b/ots3/dependency.go @@ -7,12 +7,12 @@ import ( "net/url" "github.com/DoNewsCode/core/config" + "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/internal" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/opentracing/opentracing-go" - - "github.com/DoNewsCode/core/contract" ) /* @@ -161,11 +161,11 @@ func provideConfig() configOut { Data: map[string]interface{}{ "s3": map[string]S3Config{ "default": { - AccessKey: "", - AccessSecret: "", - Endpoint: "", - Region: "", - Bucket: "", + AccessKey: envDefaultS3AccessKey, + AccessSecret: envDefaultS3AccessSecret, + Endpoint: envDefaultS3Endpoint, + Region: envDefaultS3Region, + Bucket: envDefaultS3Bucket, CdnUrl: "", }, }}, @@ -174,3 +174,11 @@ func provideConfig() configOut { } return configOut{Config: configs} } + +var ( + envDefaultS3Endpoint, envDefaultS3EndpointIsSet = internal.GetDefaultAddrFromEnv("S3_ENDPOINT", "http://127.0.0.1:9000") + envDefaultS3AccessKey, _ = internal.GetDefaultAddrFromEnv("S3_ACCESSKEY", "minioadmin") + envDefaultS3AccessSecret, _ = internal.GetDefaultAddrFromEnv("S3_ACCESSSECRET", "minioadmin") + envDefaultS3Region, _ = internal.GetDefaultAddrFromEnv("S3_REGION", "asia") + envDefaultS3Bucket, _ = internal.GetDefaultAddrFromEnv("S3_BUCKET", "mybucket") +) diff --git a/ots3/example_test.go b/ots3/example_test.go index a5006bf0..cb170d40 100644 --- a/ots3/example_test.go +++ b/ots3/example_test.go @@ -13,10 +13,10 @@ func Example() { panic(err) } defer file.Close() - uploader := NewManager(os.Getenv("S3_ACCESSKEY"), os.Getenv("S3_ACCESSSECRET"), os.Getenv("S3_ENDPOINT"), os.Getenv("S3_REGION"), os.Getenv("S3_BUCKET")) - _ = uploader.CreateBucket(context.Background(), os.Getenv("S3_BUCKET")) + uploader := NewManager(envDefaultS3AccessKey, envDefaultS3AccessSecret, envDefaultS3Endpoint, envDefaultS3Region, envDefaultS3Bucket) + _ = uploader.CreateBucket(context.Background(), envDefaultS3Bucket) url, _ := uploader.Upload(context.Background(), "foo", file) - url = strings.Replace(url, os.Getenv("S3_ENDPOINT"), "http://example.org", 1) + url = strings.Replace(url, envDefaultS3Endpoint, "http://example.org", 1) fmt.Println(url) // Output: diff --git a/ots3/uploader_test.go b/ots3/uploader_test.go index 3381eb96..fd986eb2 100644 --- a/ots3/uploader_test.go +++ b/ots3/uploader_test.go @@ -34,11 +34,11 @@ func TestNewManager(t *testing.T) { } func TestMain(m *testing.M) { - if os.Getenv("S3_ENDPOINT") == "" { + if !envDefaultS3EndpointIsSet { fmt.Println("Set env S3_ENDPOINT to run ots3 tests") os.Exit(0) } - manager := NewManager(os.Getenv("S3_ACCESSKEY"), os.Getenv("S3_ACCESSSECRET"), os.Getenv("S3_ENDPOINT"), os.Getenv("S3_REGION"), os.Getenv("S3_BUCKET")) + manager := NewManager(envDefaultS3AccessKey, envDefaultS3AccessSecret, envDefaultS3Endpoint, envDefaultS3Region, envDefaultS3Bucket) _ = manager.CreateBucket(context.Background(), "foo") os.Exit(m.Run()) } @@ -67,7 +67,7 @@ func TestManager_CreateBucket(t *testing.T) { func TestManager_UploadFromUrl(t *testing.T) { tracer := mocktracer.New() m := setupManagerWithTracer(tracer) - _ = m.CreateBucket(context.Background(), os.Getenv("S3_BUCKET")) + _ = m.CreateBucket(context.Background(), envDefaultS3Bucket) newURL, err := m.UploadFromUrl(context.Background(), "https://avatars.githubusercontent.com/u/43054062") assert.NoError(t, err) assert.NotEmpty(t, newURL) @@ -80,11 +80,11 @@ func setupManager() *Manager { func setupManagerWithTracer(tracer opentracing.Tracer) *Manager { m := NewManager( - os.Getenv("S3_ACCESSKEY"), - os.Getenv("S3_ACCESSSECRET"), - os.Getenv("S3_ENDPOINT"), - os.Getenv("S3_REGION"), - os.Getenv("S3_BUCKET"), + envDefaultS3AccessKey, + envDefaultS3AccessSecret, + envDefaultS3Endpoint, + envDefaultS3Region, + envDefaultS3Bucket, WithTracer(tracer), ) return m diff --git a/queue/dependency_test.go b/queue/dependency_test.go index ccdfe884..e17e58ab 100644 --- a/queue/dependency_test.go +++ b/queue/dependency_test.go @@ -16,7 +16,7 @@ import ( type maker struct{} func (m maker) Make(name string) (redis.UniversalClient, error) { - return redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.EnvDefaultRedisAddrs}), nil + return redis.NewUniversalClient(&redis.UniversalOptions{Addrs: envDefaultRedisAddrs}), nil } func TestProvideDispatcher(t *testing.T) { diff --git a/queue/dispatcher_test.go b/queue/dispatcher_test.go index 96b50f0f..22f3af73 100644 --- a/queue/dispatcher_test.go +++ b/queue/dispatcher_test.go @@ -3,12 +3,12 @@ package queue import ( "context" "errors" + "fmt" "math/rand" "os" "testing" "time" - "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/events" "github.com/DoNewsCode/core/logging" @@ -52,12 +52,17 @@ type MockEvent struct { } func TestMain(m *testing.M) { + if !envDefaultRedisAddrsIsSet { + fmt.Println("Set env REDIS_ADDR to run queue tests") + os.Exit(0) + } + os.Exit(m.Run()) } func setUp() *QueueableDispatcher { s := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: config.EnvDefaultRedisAddrs, + Addrs: envDefaultRedisAddrs, }) driver := RedisDriver{ Logger: logging.NewLogger("logfmt"), @@ -84,7 +89,7 @@ func tearDown() { Waiting: "waiting", Timeout: "timeout", } - redisClient := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: config.EnvDefaultRedisAddrs}) + redisClient := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: envDefaultRedisAddrs}) redisClient.Del(context.Background(), channel.Delayed) redisClient.Del(context.Background(), channel.Failed) redisClient.Del(context.Background(), channel.Reserved) diff --git a/queue/redis_driver.go b/queue/redis_driver.go index add29f7e..3be088e8 100644 --- a/queue/redis_driver.go +++ b/queue/redis_driver.go @@ -3,11 +3,11 @@ package queue import ( "context" "fmt" + "github.com/DoNewsCode/core/internal" "math/rand" "sync" "time" - "github.com/DoNewsCode/core/config" "github.com/go-kit/kit/log" "github.com/go-redis/redis/v8" "github.com/pkg/errors" @@ -15,9 +15,9 @@ import ( // The Packer interface describes how to save the message in wire format type Packer interface { - // Compress serializes the message to bytes + // Marshal Compress serializes the message to bytes Marshal(message interface{}) ([]byte, error) - // Decompress reverses the bytes to message + // Unmarshal Decompress reverses the bytes to message Unmarshal(data []byte, message interface{}) error } @@ -251,7 +251,7 @@ func (r *RedisDriver) populateDefaults() { } if r.RedisClient == nil { r.RedisClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: config.EnvDefaultRedisAddrs, + Addrs: envDefaultRedisAddrs, }) } if r.Packer == nil { @@ -288,3 +288,5 @@ func getRetryDuration(d time.Duration) time.Duration { } return d } + +var envDefaultRedisAddrs, envDefaultRedisAddrsIsSet = internal.GetDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") From 453e0ad60bfba30c59c84ad09388f045fb064944 Mon Sep 17 00:00:00 2001 From: Trock Date: Wed, 12 May 2021 11:21:15 +0800 Subject: [PATCH 4/8] refactor: add internal and change env --- internal/env.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 internal/env.go diff --git a/internal/env.go b/internal/env.go new file mode 100644 index 00000000..a4014713 --- /dev/null +++ b/internal/env.go @@ -0,0 +1,22 @@ +package internal + +import ( + "os" + "strings" +) + +// GetDefaultAddrsFromEnv return addrs +func GetDefaultAddrsFromEnv(env, defaultVal string) ([]string, bool) { + if v := os.Getenv(env); v != "" { + return strings.Split(v, ","), true + } + return []string{defaultVal}, false +} + +// GetDefaultAddrFromEnv return addr/dsn +func GetDefaultAddrFromEnv(env, defaultVal string) (string, bool) { + if v := os.Getenv(env); v != "" { + return v, true + } + return defaultVal, false +} From 13b3f58d947266af9afd5a7b5a265cbd3b7b4614 Mon Sep 17 00:00:00 2001 From: Trock Date: Wed, 12 May 2021 11:27:20 +0800 Subject: [PATCH 5/8] style: imports format --- config/remote/remote_test.go | 2 +- dtx/sagas/mysqlstore/mysql_store_test.go | 2 +- queue/module_test.go | 5 +++-- queue/redis_driver.go | 2 +- queue/redis_driver_test.go | 5 +++-- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/config/remote/remote_test.go b/config/remote/remote_test.go index 2ca059d4..8b749500 100644 --- a/config/remote/remote_test.go +++ b/config/remote/remote_test.go @@ -3,12 +3,12 @@ package remote import ( "context" "fmt" - "github.com/DoNewsCode/core/internal" "os" "sync" "testing" "time" + "github.com/DoNewsCode/core/internal" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3" ) diff --git a/dtx/sagas/mysqlstore/mysql_store_test.go b/dtx/sagas/mysqlstore/mysql_store_test.go index 67ab870b..4fce5fc7 100644 --- a/dtx/sagas/mysqlstore/mysql_store_test.go +++ b/dtx/sagas/mysqlstore/mysql_store_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/DoNewsCode/core/internal" "os" "testing" "time" @@ -12,6 +11,7 @@ import ( "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/dtx" "github.com/DoNewsCode/core/dtx/sagas" + "github.com/DoNewsCode/core/internal" "github.com/DoNewsCode/core/otgorm" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" diff --git a/queue/module_test.go b/queue/module_test.go index 53cafe1d..d1d347dd 100644 --- a/queue/module_test.go +++ b/queue/module_test.go @@ -2,12 +2,13 @@ package queue import ( "context" + "testing" + "time" + "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/events" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" - "testing" - "time" ) func TestModule_ProvideCommand(t *testing.T) { diff --git a/queue/redis_driver.go b/queue/redis_driver.go index 3be088e8..1238ff52 100644 --- a/queue/redis_driver.go +++ b/queue/redis_driver.go @@ -3,11 +3,11 @@ package queue import ( "context" "fmt" - "github.com/DoNewsCode/core/internal" "math/rand" "sync" "time" + "github.com/DoNewsCode/core/internal" "github.com/go-kit/kit/log" "github.com/go-redis/redis/v8" "github.com/pkg/errors" diff --git a/queue/redis_driver_test.go b/queue/redis_driver_test.go index 8473dd66..8eebab0e 100644 --- a/queue/redis_driver_test.go +++ b/queue/redis_driver_test.go @@ -2,11 +2,12 @@ package queue_test import ( "context" + "sync" + "testing" + "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/events" "github.com/DoNewsCode/core/queue" - "sync" - "testing" ) func setUpInProcessQueueBenchmark(wg *sync.WaitGroup) (*queue.QueueableDispatcher, func()) { From 6ab088c336c3d061bc0d02185d49e2b678f56290 Mon Sep 17 00:00:00 2001 From: Trock Date: Wed, 12 May 2021 11:33:58 +0800 Subject: [PATCH 6/8] docs: add todo --- otkafka/example_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/otkafka/example_test.go b/otkafka/example_test.go index 7fb1caa4..967177c2 100644 --- a/otkafka/example_test.go +++ b/otkafka/example_test.go @@ -12,6 +12,7 @@ import ( ) func Example_reader() { + // todo set brokers from env, escape var conf = ` log: level: none From 3e101316cbfbbf6df1e48933bc55868ee1f142d2 Mon Sep 17 00:00:00 2001 From: Trock Date: Wed, 12 May 2021 14:02:31 +0800 Subject: [PATCH 7/8] fix: election_test env error --- leader/election_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/leader/election_test.go b/leader/election_test.go index 696f30aa..1991d7ec 100644 --- a/leader/election_test.go +++ b/leader/election_test.go @@ -16,11 +16,11 @@ import ( "go.uber.org/atomic" ) -var envDefaultRedisAddrs, envDefaultRedisAddrsIsSet = internal.GetDefaultAddrsFromEnv("REDIS_ADDR", "127.0.0.1:6379") +var envDefaultEtcdAddrs, envDefaultEtcdAddrsIsSet = internal.GetDefaultAddrsFromEnv("ETCD_ADDR", "127.0.0.1:2379") func TestMain(m *testing.M) { - if !envDefaultRedisAddrsIsSet { - fmt.Println("Set env REDIS_ADDR to run leader tests") + if !envDefaultEtcdAddrsIsSet { + fmt.Println("Set env ETCD_ADDR to run leader tests") os.Exit(0) } os.Exit(m.Run()) @@ -30,7 +30,7 @@ func TestElection(t *testing.T) { var dispatcher = &events.SyncDispatcher{} var e1, e2 Election - client, err := clientv3.New(clientv3.Config{Endpoints: envDefaultRedisAddrs}) + client, err := clientv3.New(clientv3.Config{Endpoints: envDefaultEtcdAddrs}) assert.NoError(t, err) e1 = Election{ dispatcher: dispatcher, From 953540817b80f73d8e8ec6b6fe07505831068aa7 Mon Sep 17 00:00:00 2001 From: Trock Date: Wed, 12 May 2021 15:18:50 +0800 Subject: [PATCH 8/8] refactor: change conf type to map[string]interface{} --- otkafka/example_test.go | 43 +++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/otkafka/example_test.go b/otkafka/example_test.go index 967177c2..4ea1bb19 100644 --- a/otkafka/example_test.go +++ b/otkafka/example_test.go @@ -3,32 +3,37 @@ package otkafka_test import ( "context" "fmt" + "os" + "strings" "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/otkafka" - "github.com/knadh/koanf/parsers/yaml" - "github.com/knadh/koanf/providers/rawbytes" + "github.com/knadh/koanf/providers/confmap" "github.com/segmentio/kafka-go" ) func Example_reader() { - // todo set brokers from env, escape - var conf = ` -log: - level: none -kafka: - reader: - default: - brokers: - - 127.0.0.1:9200 - topic: example - writer: - default: - brokers: - - 127.0.0.1:9200 - topic: example -` - c := core.Default(core.WithConfigStack(rawbytes.Provider([]byte(conf)), yaml.Parser())) + brokers := strings.Split(os.Getenv("KAFKA_ADDR"), ",") + conf := map[string]interface{}{ + "log": map[string]interface{}{ + "level": "none", + }, + "kafka": map[string]interface{}{ + "reader": map[string]interface{}{ + "default": otkafka.ReaderConfig{ + Brokers: brokers, + Topic: "example", + }, + }, + "writer": map[string]interface{}{ + "default": otkafka.WriterConfig{ + Brokers: brokers, + Topic: "example", + }, + }, + }, + } + c := core.Default(core.WithConfigStack(confmap.Provider(conf, "."), nil)) c.Provide(otkafka.Providers()) c.Invoke(func(writer *kafka.Writer) { err := writer.WriteMessages(context.Background(), kafka.Message{Value: []byte(`hello`)})