-
Notifications
You must be signed in to change notification settings - Fork 0
/
dependency.go
160 lines (140 loc) · 3.72 KB
/
dependency.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package otfranz
import (
"fmt"
"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/contract"
"github.com/DoNewsCode/core/di"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/twmb/franz-go/pkg/kgo"
)
/*
Providers returns the dependency set of this package: the constructor built by
provideKafkaFactory, the exported default configuration from provideConfig,
and a di.Bind entry from Factory to Maker.

Depends On:

	opentracing.Tracer (optional)
	contract.ConfigUnmarshaler
	log.Logger
	contract.Dispatcher (optional)

Provide:

	Factory
	Maker
	*kgo.Client
	[]config.ExportedConfig (group "config")
*/
func Providers(optionFunc ...ProvidersOptionFunc) di.Deps {
	// Start from a no-op interceptor; any option func may replace it.
	opts := providersOption{
		interceptor: func(string, *Config) {},
	}
	for i := range optionFunc {
		optionFunc[i](&opts)
	}

	deps := di.Deps{
		provideKafkaFactory(&opts),
		provideConfig,
		di.Bind(new(Factory), new(Maker)),
	}
	return deps
}
// Maker models a Factory. Providers registers a di.Bind entry from Factory to
// this interface.
type Maker interface {
// Make returns the *kgo.Client for the given name, or an error.
// NOTE(review): name presumably selects the "kafka.<name>" configuration
// section, as the constructor in provideFactory does — confirm against
// Factory.Make.
Make(name string) (*kgo.Client, error)
}
// factoryIn is the injection parameter consumed by provideFactory and by the
// constructor that provideKafkaFactory returns.
type factoryIn struct {
di.In
// Tracer is optional. NOTE(review): it is not referenced by provideFactory or
// provideKafkaFactory in this file — confirm whether it is still needed.
Tracer opentracing.Tracer `optional:"true"`
// Conf is used to unmarshal the "kafka.<name>" section and "log.level".
Conf contract.ConfigUnmarshaler
// Logger is wrapped by FranzLogAdapter for the client and receives the
// warning logged when the default client cannot be made.
Logger log.Logger
// Dispatcher is optional. When it is present and the reloadable option is
// set, the factory subscribes to reload events from it.
Dispatcher contract.Dispatcher `optional:"true"`
}
// factoryOut is the result of provideKafkaFactory.
type factoryOut struct {
di.Out
// Factory is the Factory returned by provideFactory.
Factory Factory
// Client is the value returned by Factory.Make("default"). A failure of that
// call is only logged, so Client holds whatever Make returned alongside the
// error (presumably nil — verify against Factory.Make).
Client *kgo.Client
}
// Module implements di.Modular by returning the factoryOut value itself.
func (out factoryOut) Module() interface{} {
	return out
}
// provideKafkaFactory returns the constructor that creates the Factory. The
// returned constructor is a valid dependency option for package core.
//
// Note: when working with package core's DI container, use provideKafkaFactory
// over provideFactory: it additionally provides the "default" *kgo.Client and,
// when the reloadable option is set and a Dispatcher is available, subscribes
// the factory to reload events.
//
// A failure to make the "default" client is logged as a warning and does not
// fail the constructor; the returned error is always nil.
func provideKafkaFactory(option *providersOption) func(p factoryIn) (factoryOut, func(), error) {
	// Guarantee a callable interceptor even if the caller left it unset.
	if option.interceptor == nil {
		option.interceptor = func(string, *Config) {}
	}

	constructor := func(p factoryIn) (factoryOut, func(), error) {
		factory, cleanup := provideFactory(p, option.interceptor)

		// Best effort: a missing or invalid default configuration only warns.
		client, makeErr := factory.Make("default")
		if makeErr != nil {
			level.Warn(p.Logger).Log("err", makeErr)
		}

		if option.reloadable && p.Dispatcher != nil {
			factory.SubscribeReloadEventFrom(p.Dispatcher)
		}

		out := factoryOut{Factory: factory, Client: client}
		return out, cleanup, nil
	}
	return constructor
}
// provideFactory creates the Factory together with its cleanup function.
//
// For every requested name the factory unmarshals the "kafka.<name>"
// configuration into a Config, attaches a logger adapter when p.Logger is set
// (using "log.level", or "debug" if that key cannot be read), passes the
// Config to interceptor, and builds a *kgo.Client from it. The returned func()
// is the underlying factory's Close method.
func provideFactory(p factoryIn, interceptor Interceptor) (Factory, func()) {
	build := func(name string) (di.Pair, error) {
		var conf Config
		if err := p.Conf.Unmarshal(fmt.Sprintf("kafka.%s", name), &conf); err != nil {
			return di.Pair{}, fmt.Errorf("kafka configuration %s not valid: %w", name, err)
		}

		if p.Logger != nil {
			var logLevel string
			// An unreadable log level is not fatal; fall back to "debug".
			if err := p.Conf.Unmarshal("log.level", &logLevel); err != nil {
				logLevel = "debug"
			}
			conf.Logger = FranzLogAdapter(logLevel, p.Logger)
		}

		// Let the caller adjust the configuration before it becomes options.
		interceptor(name, &conf)

		client, err := kgo.NewClient(fromConfig(conf)...)
		if err != nil {
			return di.Pair{}, err
		}
		return di.Pair{Conn: client, Closer: client.Close}, nil
	}

	factory := di.NewFactory(build)
	return Factory{factory}, factory.Close
}
// configOut is the result of provideConfig.
type configOut struct {
di.Out
// Config carries the exported configuration entries; the struct tag places
// them in the "config" group with the flatten modifier.
Config []config.ExportedConfig `group:"config,flatten"`
}
// provideConfig exports the default configuration of this package: a single
// entry owned by "otfranz" whose data holds an example Config under
// kafka.default.
func provideConfig() configOut {
	defaults := Config{
		SeedBrokers:         []string{"127.0.0.1:9092"},
		DefaultProduceTopic: "example",
		Topics:              []string{"example"},
		Group:               "example",
	}

	exported := config.ExportedConfig{
		Owner: "otfranz",
		Data: map[string]interface{}{
			"kafka": map[string]interface{}{
				"default": defaults,
			},
		},
		Comment: "The configuration of kafka with franz-go",
	}

	return configOut{Config: []config.ExportedConfig{exported}}
}