-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgaia.go
197 lines (172 loc) · 4.09 KB
/
gaia.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package gaia
import (
"context"
"errors"
"os"
"os/signal"
"sync"
"github.com/apus-run/sea-kit/log"
"golang.org/x/sync/errgroup"
"github.com/apus-run/gaia/registry"
"github.com/apus-run/gaia/transport"
)
// AppInfo is the read-only description of an application. It is the value
// type that NewContext stores in a context and FromContext retrieves.
type AppInfo interface {
	// ID returns the application instance id.
	ID() string
	// Name returns the service name.
	Name() string
	// Version returns the application version.
	Version() string
	// Metadata returns the service metadata.
	Metadata() map[string]string
	// Endpoint returns the service endpoints.
	Endpoint() []string
}
// Gaia is the application lifecycle manager created by New. It carries the
// applied options, a cancellable application context, and the registry
// service instance built by Run.
type Gaia struct {
	// opts holds the options applied in New.
	opts *options
	// ctx is the application context derived from opts.ctx in New.
	ctx context.Context
	// cancel cancels ctx; Stop calls it to shut the application down.
	cancel func()
	// mu is held by Run and Stop while they access instance.
	mu sync.Mutex
	// instance is the registry record built by Run; it is nil before that.
	instance *registry.ServiceInstance
}
// New creates an application lifecycle manager from the given options.
//
// When the options carry a logger it is installed through log.SetLogger. The
// returned Gaia owns a cancellable context derived from the options' context.
func New(opts ...Option) *Gaia {
	cfg := Apply(opts...)
	if cfg.logger != nil {
		log.SetLogger(cfg.logger)
	}

	app := &Gaia{opts: cfg}
	app.ctx, app.cancel = context.WithCancel(cfg.ctx)
	return app
}
// ID reports the application instance id taken from the options.
func (a *Gaia) ID() string {
	return a.opts.id
}
// Name reports the service name taken from the options.
func (a *Gaia) Name() string {
	return a.opts.name
}
// Version reports the application version taken from the options.
func (a *Gaia) Version() string {
	return a.opts.version
}
// Metadata reports the service metadata map taken from the options. The map
// is returned as stored, not copied.
func (a *Gaia) Metadata() map[string]string {
	return a.opts.metadata
}
// Endpoint returns the endpoints of the service instance built by Run, or nil
// when no instance has been built yet.
//
// The instance pointer is read while holding a.mu, matching Run (which
// assigns it under the mutex) and Stop (which reads it under the mutex), so
// Endpoint can be called while Run is executing.
func (a *Gaia) Endpoint() []string {
	a.mu.Lock()
	instance := a.instance
	a.mu.Unlock()

	if instance == nil {
		return nil
	}
	return instance.Endpoints
}
// Run starts the application and blocks until it has shut down.
//
// In order, it builds the service instance and stores it on the app, runs the
// beforeStart hooks, starts every server in its own goroutine (each paired
// with a goroutine that calls that server's Stop once the run context ends),
// registers the instance when a registry is configured, runs the afterStart
// hooks, and finally waits for one of the configured signals or for the run
// context to end.
//
// A failing hook or a failing registration makes Run return that error
// immediately; servers that were already started are not stopped on that
// path. Otherwise Run returns the first error reported by the server and
// signal goroutines, with context.Canceled reported as nil.
func (a *Gaia) Run() error {
	// build service instance
	instance, err := a.buildInstance()
	if err != nil {
		return err
	}
	a.mu.Lock()
	a.instance = instance
	a.mu.Unlock()

	eg, ctx := errgroup.WithContext(NewContext(a.ctx, a))
	wg := sync.WaitGroup{}

	for _, fn := range a.opts.beforeStart {
		if err = fn(ctx); err != nil {
			return err
		}
	}

	for _, srv := range a.opts.servers {
		srv := srv // per-iteration copy captured by the closures below
		eg.Go(func() error {
			<-ctx.Done() // wait for stop signal
			// The run context is already done here, so the stop context is
			// derived from opts.ctx and bounded by stopTimeout instead.
			stopCtx, cancel := context.WithTimeout(NewContext(a.opts.ctx, a), a.opts.stopTimeout)
			defer cancel()
			return srv.Stop(stopCtx)
		})
		wg.Add(1)
		eg.Go(func() error {
			// Done is called before Start (not deferred) so that wg.Wait
			// below only waits for the goroutine to be running, not for the
			// server to finish.
			wg.Done()
			return srv.Start(ctx)
		})
	}
	wg.Wait()

	// register service
	if a.opts.registry != nil {
		c, cancel := context.WithTimeout(ctx, a.opts.registryTimeout)
		defer cancel()
		if err = a.opts.registry.Register(c, instance); err != nil {
			return err
		}
	}

	for _, fn := range a.opts.afterStart {
		if err = fn(ctx); err != nil {
			return err
		}
	}

	// watch signal
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, a.opts.sigs...)
	// Unregister the channel when Run returns so it does not stay subscribed
	// to the process's signals after the application has stopped.
	defer signal.Stop(quit)
	eg.Go(func() error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-quit:
			return a.Stop()
		}
	})

	if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		return err
	}
	return nil
}
// Stop gracefully stops the application.
//
// It runs every beforeStop hook, deregisters the service instance when both a
// registry and an instance exist, and then cancels the application context.
// The context is cancelled even when deregistration fails, so a registry
// failure cannot leave the application running after Stop was requested.
//
// The returned error is the deregistration error if there is one; otherwise
// it is the first error returned by a beforeStop hook. All hooks are run
// regardless of earlier hook failures.
func (a *Gaia) Stop() (err error) {
	ctx := NewContext(a.ctx, a)
	for _, fn := range a.opts.beforeStop {
		// Keep the first failure instead of letting a later successful hook
		// overwrite it with nil.
		if hookErr := fn(ctx); hookErr != nil && err == nil {
			err = hookErr
		}
	}

	// deregister instance
	a.mu.Lock()
	instance := a.instance
	a.mu.Unlock()
	var deregErr error
	if a.opts.registry != nil && instance != nil {
		dctx, cancel := context.WithTimeout(NewContext(a.ctx, a), a.opts.registryTimeout)
		defer cancel()
		deregErr = a.opts.registry.Deregister(dctx, instance)
	}

	// cancel app
	if a.cancel != nil {
		a.cancel()
	}

	if deregErr != nil {
		return deregErr
	}
	return err
}
func (a *Gaia) buildInstance() (*registry.ServiceInstance, error) {
endpoints := make([]string, 0, len(a.opts.endpoints))
for _, e := range a.opts.endpoints {
endpoints = append(endpoints, e.String())
}
if len(endpoints) == 0 {
for _, srv := range a.opts.servers {
if r, ok := srv.(transport.Endpointer); ok {
e, err := r.Endpoint()
if err != nil {
return nil, err
}
endpoints = append(endpoints, e.String())
}
}
}
return ®istry.ServiceInstance{
ID: a.opts.id,
Name: a.opts.name,
Version: a.opts.version,
Metadata: a.opts.metadata,
Endpoints: endpoints,
}, nil
}
// appKey is the unexported context key type under which NewContext stores an
// AppInfo and FromContext looks it up.
type appKey struct{}
// NewContext derives a context from ctx that carries the application info s.
// The value can be read back with FromContext.
func NewContext(ctx context.Context, s AppInfo) context.Context {
	var key appKey
	return context.WithValue(ctx, key, s)
}
// FromContext extracts the AppInfo stored in ctx by NewContext. The boolean
// result is false when ctx holds no value under the app key or the stored
// value does not implement AppInfo.
func FromContext(ctx context.Context) (s AppInfo, ok bool) {
	info, found := ctx.Value(appKey{}).(AppInfo)
	return info, found
}