-
Notifications
You must be signed in to change notification settings - Fork 16
/
time.go
142 lines (122 loc) · 3.25 KB
/
time.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package kronos
import (
"context"
"errors"
"time"
"github.com/rubrikinc/kronos/kronoshttp"
"github.com/rubrikinc/kronos/kronosstats"
"github.com/rubrikinc/kronos/kronosutil/log"
"github.com/rubrikinc/kronos/server"
)
var kronosServer *server.Server
// Initialize initializes the kronos server.
// After Initialization, Now() in this package returns kronos time.
// If not initialized, Now() in this package returns system time
func Initialize(ctx context.Context, config server.Config) error {
// Stop previous server
if kronosServer != nil {
kronosServer.Stop()
}
var err error
kronosServer, err = server.NewKronosServer(ctx, config)
if err != nil {
return err
}
go func() {
if err := kronosServer.RunServer(ctx); err != nil {
log.Fatal(ctx, err)
}
}()
log.Info(ctx, "Kronos server initialized")
return nil
}
// Stop stops the kronos server
func Stop() {
if kronosServer != nil {
kronosServer.Stop()
log.Info(context.TODO(), "Kronos server stopped")
}
}
// IsActive returns whether kronos is running.
func IsActive() bool {
return kronosServer != nil
}
// Now returns Kronos time if Kronos is initialized, otherwise returns
// system time
func Now() int64 {
if kronosServer == nil {
log.Fatalf(context.TODO(), "Kronos server is not initialized")
}
// timePollInterval is the time to wait before internally retrying
// this function.
// This function blocks if not initialized or if KronosTime is stale
const timePollInterval = 100 * time.Millisecond
ctx := context.TODO()
for {
kt, err := kronosServer.KronosTimeNow(ctx)
if err != nil {
log.Errorf(
ctx,
"Failed to get KronosTime, err: %v. Sleeping for %s before retrying.",
err, timePollInterval,
)
time.Sleep(timePollInterval)
continue
}
return kt.Time
}
}
// Uptime returns Kronos uptime. This function can block if kronos uptime
// is invalid.
func Uptime() int64 {
if kronosServer == nil {
log.Fatalf(context.TODO(), "Kronos server is not initialized")
}
// timePollInterval is the time to wait before internally retrying
// this function.
// This function blocks if not initialized or if KronosTime is stale
const timePollInterval = 500 * time.Millisecond
ctx := context.TODO()
for {
ut, err := kronosServer.KronosUptimeNow(ctx)
if err != nil {
time.Sleep(timePollInterval)
continue
}
return ut.Uptime
}
}
// NodeID returns the NodeID of the kronos server in the kronos raft cluster.
// NodeID returns an empty string if kronosServer is not initialized
func NodeID(ctx context.Context) string {
if kronosServer == nil {
return ""
}
id, err := kronosServer.ID()
if err != nil {
log.Fatalf(ctx, "Failed to get kronosServer.ID, err: %v", err)
}
return id
}
// RemoveNode removes the given node from the kronos raft cluster
func RemoveNode(ctx context.Context, nodeID string) error {
if len(nodeID) == 0 {
return errors.New("node id is empty")
}
log.Infof(ctx, "Removing kronos node %s", nodeID)
client, err := kronosServer.NewClusterClient()
if err != nil {
return err
}
defer client.Close()
return client.RemoveNode(ctx, &kronoshttp.RemoveNodeRequest{
NodeID: nodeID,
})
}
// Metrics returns KronosMetrics
func Metrics() *kronosstats.KronosMetrics {
if kronosServer == nil {
return nil
}
return kronosServer.Metrics
}