-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathjob_manager.go
225 lines (194 loc) · 6.06 KB
/
job_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package main
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"
"gopkg.in/yaml.v2"
)
// A Job is an interface to a single instance of a gatherer
type Job interface {
StartJob()
}
// JobConfig is an interface to provide a generic configuration for a Job
type JobConfig interface {
GetJobName() string
}
// JobManager holds various things needed to run and manage all jobs
type JobManager struct {
ctx context.Context
wg *sync.WaitGroup
storage *Storage
httpclient *http.Client
jobs []interface{}
globalTags map[string]string
serviceConfig ServiceConfig
}
// NewJobManager creates a populated JobManager object with configured http.Client
func NewJobManager(ctx context.Context, wg *sync.WaitGroup, s *Storage, serviceConfig ServiceConfig) (*JobManager, error) {
var err error
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// We have to disable keep-alives to keep our server connection time
// measurements accurate
DisableKeepAlives: true,
}
var requestTimeout time.Duration
if serviceConfig.General.RequestTimeout == "" {
requestTimeout = 15 * time.Second
} else {
requestTimeout, err = time.ParseDuration(serviceConfig.General.RequestTimeout)
if err != nil {
log.Fatalln("could not parse request timeout duration in config:", err)
}
}
if serviceConfig.General.UserAgent == "" {
serviceConfig.General.UserAgent = "crabby/1.0"
}
client := &http.Client{
Transport: tr,
Timeout: requestTimeout,
}
defer tr.CloseIdleConnections()
return &JobManager{
ctx: ctx,
wg: wg,
storage: s,
httpclient: client,
globalTags: serviceConfig.General.Tags,
serviceConfig: serviceConfig,
}, nil
}
// Run attempts to build and launch all configured jobs
func (jm *JobManager) Run() error {
err := jm.BuildJobs()
if err != nil {
return fmt.Errorf("Unable to build jobs for JobManager: %v", err)
}
jm.StartJobs()
return nil
}
// BuildJobs assembles properly-configured jobs for the JobManager
func (jm *JobManager) BuildJobs() error {
log.Println("Building jobs...")
// To support multiple styles of job configurations within our YAML configuration,
// we unmarshal the `jobs` YAML struct to a MetaJobsConfig struct in config.go, which
// contains members for every kind of configuration that might be found in the `jobs`
// struct in our config file. To turn those into the proper gather-specific job
// configs, we use a little trick: we remarshall the struct that config.go
// parsed back into YAML... and then we unmarshall it again into the type of job
// config that we actually want.
for _, j := range jm.serviceConfig.Jobs {
switch j.Type {
case "simple":
jc := new(SimpleJobConfig)
remarshalled, _ := yaml.Marshal(j)
err := yaml.Unmarshal(remarshalled, jc)
if err != nil {
return fmt.Errorf("unable to marshall simple job %v: %v", j.Step.Name, err)
}
jm.jobs = append(jm.jobs, jm.newJob(jc))
case "selenium":
jc := new(SeleniumJobConfig)
remarshalled, _ := yaml.Marshal(j)
err := yaml.Unmarshal(remarshalled, jc)
if err != nil {
return fmt.Errorf("unable to marshall simple job %v: %v", j.Step.Name, err)
}
jm.jobs = append(jm.jobs, jm.newJob(jc))
case "api":
jc := new(APIJobConfig)
remarshalled, _ := yaml.Marshal(j)
err := yaml.Unmarshal(remarshalled, jc)
if err != nil {
return fmt.Errorf("unable to marshall api job %v: %v", j.Steps[0].Name, err)
}
jm.jobs = append(jm.jobs, jm.newJob(jc))
default:
return fmt.Errorf("job type was not specified for job %v. Add a 'type: <jobtype>' to this job's configuration", j.Step.Name)
}
}
return nil
}
// StartJobs starts all active jobs
func (jm *JobManager) StartJobs() {
log.Println("Starting jobs...")
for _, j := range jm.jobs {
switch j.(type) {
case *SimpleJob:
go j.(*SimpleJob).StartJob()
case *SeleniumJob:
go j.(*SeleniumJob).StartJob()
case *APIJob:
go j.(*APIJob).StartJob()
}
}
}
// newJob creates a new Job of the appropriate type for the chosen gatherer
func (jm *JobManager) newJob(jobconfig JobConfig) Job {
log.Println("Creating job", jobconfig.GetJobName())
switch c := jobconfig.(type) {
case *SimpleJobConfig:
jobconfig.(*SimpleJobConfig).Tags = mergeTags(jobconfig.(*SimpleJobConfig).Tags, jm.serviceConfig.General.Tags)
return &SimpleJob{
config: *c,
wg: jm.wg,
ctx: jm.ctx,
storage: jm.storage,
client: jm.httpclient,
}
case *SeleniumJobConfig:
c.seleniumServer = jm.serviceConfig.Selenium.URL
jobconfig.(*SeleniumJobConfig).Tags = mergeTags(jobconfig.(*SeleniumJobConfig).Tags, jm.serviceConfig.General.Tags)
return &SeleniumJob{
config: *c,
wg: jm.wg,
ctx: jm.ctx,
storage: jm.storage,
}
case *APIJobConfig:
jobconfig.(*APIJobConfig).Tags = mergeTags(jobconfig.(*APIJobConfig).Tags, jm.serviceConfig.General.Tags)
return &APIJob{
config: *c,
wg: jm.wg,
ctx: jm.ctx,
storage: jm.storage,
client: jm.httpclient,
}
default:
return &NoOpJob{}
}
}
func mergeTags(jobTags map[string]string, globalTags map[string]string) map[string]string {
mergedTags := make(map[string]string)
// If we don't have any global tags or job tags, just return an empty map
if len(jobTags) == 0 && len(globalTags) == 0 {
return mergedTags
}
for k, v := range jobTags {
mergedTags[k] = v
}
for k, v := range globalTags {
// Add the global tags to the merged tags, but only if they weren't overriden by a job tag
_, present := mergedTags[k]
if !present {
mergedTags[k] = v
}
}
return mergedTags
}
// NoOpJobConfig defines a job configuration that does nothing. Used to detect invalid job types.
type NoOpJobConfig struct {
}
// GetJobName does nothing. Used to detect invalid job types
func (c *NoOpJobConfig) GetJobName() string {
return ""
}
// NoOpJob defines a job that does nothing. Used to detect invalid job types.
type NoOpJob struct{}
// StartJob does nothing. Used to detect invalid job types.
func (n *NoOpJob) StartJob() {}