-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
284 lines (259 loc) · 6.72 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
// Copyright 2016 Giulio Iotti. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kuradns
import (
"encoding/json"
"errors"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/dullgiulio/kuradns/cfg"
)
// reqtype identifies the kind of operation a request asks the server to perform.
type reqtype int
const (
// reqtypeAdd requests that a source be added.
reqtypeAdd reqtype = iota
// reqtypeDel requests that a source be removed (deleted).
reqtypeDel
// reqtypeUp requests that an existing source be updated.
reqtypeUp
)
var (
// errQueueFull is returned when too many requests are already queued for processing.
errQueueFull = errors.New("queue full")
// errUnknownReqType is reported for a request whose type is not one of the known reqtype values.
errUnknownReqType = errors.New("unknown request type")
)
// response represents the value returned from a server operation. It is an
// error value, delivered over the resp channel of a request.
type response error
// request is a unit of work for the server: the source to work on, the type
// of operation to perform, and a channel on which the outcome is reported.
type request struct {
// resp reports the outcome: fail sends the error on it, while done (and a
// rejected send) closes it without sending a value.
resp chan response
// src is the source the operation applies to.
src *source
// rtype selects the operation to perform.
rtype reqtype
}
// makeRequest builds a request of type t that operates on src. The request
// gets its own unbuffered response channel.
func makeRequest(src *source, t reqtype) request {
	var r request
	r.src = src
	r.rtype = t
	r.resp = make(chan response)
	return r
}
// done marks a request as successfully processed by closing its response
// channel without sending a value on it.
func (r request) done() {
	ch := r.resp
	close(ch)
}
// fail marks a request as processed with an error by sending err on the
// response channel. The send blocks until the value is received.
func (r request) fail(err error) {
	var outcome response = err
	r.resp <- outcome
}
// send tries to queue the request on ch without blocking. If ch cannot
// accept the request right away, the response channel is closed and
// errQueueFull is returned; otherwise send returns nil.
func (r request) send(ch chan<- request) error {
	select {
	case ch <- r:
		// Queued: the outcome will be reported on r.resp.
	default:
		// No room in the queue: release the response channel and report it.
		close(r.resp)
		return errQueueFull
	}
	return nil
}
// String returns a short description of the request: the operation name
// ("add", "rem", "update", or "unk" for an unrecognised type) followed by
// the quoted name of the source.
func (r request) String() string {
	var op string
	switch r.rtype {
	case reqtypeUp:
		op = "update"
	case reqtypeDel:
		op = "rem"
	case reqtypeAdd:
		op = "add"
	default:
		op = "unk"
	}
	return fmt.Sprintf("%s '%s'", op, r.src.name)
}
// server is the single instance that manages all accesses to the repository and
// coordinates access to it, between both the DNS and HTTP interfaces.
type server struct {
// verbose enables informational log messages in run.
verbose bool
// fname is the file the sources are restored from and persisted to; while
// it is empty, persistSources does nothing.
fname string
// srcs holds the configured sources, indexed by source name.
srcs sources
// repo is the current repository; it is read via cloneRepo and replaced
// as a whole via setRepo.
repo repository
// zone is passed to the repository when a source is added or updated and
// was used to build soa.
zone host
// self is the host name of the local host; it was used to build soa.
self host
// soa is built by NewServer from zone and self.
soa *soa
// ttl is passed to the repository when a source is added or updated.
ttl time.Duration
// respPool is not used in this part of the file.
// NOTE(review): presumably a pool of response objects used by the DNS side — confirm.
respPool sync.Pool
// mux guards repo and fname.
mux sync.RWMutex
// requests is the queue of pending requests consumed by run.
requests chan request
}
// NewServer allocates a server instance and starts its request loop in a
// new goroutine.
//
// fname is the file from which the session is restored and to which it is
// subsequently persisted; when it is empty nothing is restored. zone and
// self (the domain name of the local host) are stored on the server and used
// to build its SOA. verbose controls the logging level and ttl is the
// duration to apply to all DNS records served.
func NewServer(fname, zone, self string, verbose bool, ttl time.Duration) *server {
	zoneHost, selfHost := host(zone), host(self)

	srv := new(server)
	srv.fname = fname
	srv.zone = zoneHost
	srv.self = selfHost
	srv.ttl = ttl
	srv.verbose = verbose
	srv.requests = make(chan request, 10) // TODO: buffering is a param
	srv.soa = newSoa(zoneHost, selfHost)
	srv.repo = makeRepository()
	srv.srcs = makeSources()

	// The loop must be running before sources are restored.
	go srv.run()
	if fname == "" {
		return srv
	}
	srv.restoreSources()
	return srv
}
// jsonSource is one entry of the persisted list of sources, as it is
// encoded to and decoded from the JSON session file.
type jsonSource struct {
// Name is the name of the source.
Name string
// Conf holds all configuration options as key/value pairs; restoreSources
// reads the source type from its "source.type" key.
Conf map[string]string
}
// restoreSources reads the JSON file of persisted sources (s.fname) and
// restarts every source found in it. If starting a source fails, the error
// is logged and that source is ignored.
//
// A missing file is not an error; any other failure to open the file is
// fatal. If the file cannot be decoded, the error is logged and nothing is
// restored; persistence stays enabled in that case, so a later successful
// change rewrites the file.
func (s *server) restoreSources() {
	f, err := os.Open(s.fname)
	if err != nil {
		if !os.IsNotExist(err) {
			log.Fatalf("cannot restore sources: %s", err)
		}
		return
	}
	defer f.Close()

	// Decode before touching s.fname, so that a malformed file cannot leave
	// persistence disabled for the lifetime of the server.
	var jsrcs []jsonSource
	if err := json.NewDecoder(f).Decode(&jsrcs); err != nil {
		log.Printf("cannot restore sources, error decoding JSON: %s", err)
		return
	}

	// Unset s.fname so that, while the sources are added again, they are not
	// persisted in an intermediate state.
	s.mux.Lock()
	fname := s.fname
	s.fname = ""
	s.mux.Unlock()
	// Re-enable persistence on every exit path.
	defer func() {
		s.mux.Lock()
		s.fname = fname
		s.mux.Unlock()
	}()

	for _, v := range jsrcs {
		stype := v.Conf["source.type"]
		name := v.Name
		if err := s.handleSourceAdd(name, stype, cfg.FromMap(v.Conf)); err != nil {
			log.Printf("cannot restore source %s: %s", name, err)
		}
	}
}
// persistSources writes to s.fname a JSON array with the sources currently
// configured and their configuration. It does nothing while s.fname is
// empty. Failures to create, encode or flush the file are logged and are not
// reported to the caller.
func (s *server) persistSources() {
	s.mux.Lock()
	defer s.mux.Unlock()
	if s.fname == "" {
		return
	}
	f, err := os.Create(s.fname)
	if err != nil {
		log.Printf("cannot persist sources: %s", err)
		return
	}
	defer f.Close()

	jsrcs := make([]jsonSource, 0, len(s.srcs))
	for _, v := range s.srcs {
		jsrcs = append(jsrcs, jsonSource{
			Name: v.name,
			Conf: v.conf.Map(),
		})
	}
	if err := json.NewEncoder(f).Encode(&jsrcs); err != nil {
		log.Printf("cannot persist sources: %s", err)
		return
	}
	// Capture the error returned by Sync; it was previously compared against
	// the stale err instead of being assigned.
	if err := f.Sync(); err != nil {
		log.Printf("cannot flush file: %s", err)
	}
}
// cloneRepo returns a copy of the current repository, made by the
// repository's clone method while holding the read lock.
func (s *server) cloneRepo() repository {
	s.mux.RLock()
	defer s.mux.RUnlock()

	snapshot := s.repo.clone()
	return snapshot
}
// setRepo replaces the repository used by the server with repo while
// holding the write lock.
func (s *server) setRepo(repo repository) {
	s.mux.Lock()
	defer s.mux.Unlock()

	s.repo = repo
}
// run serves the requests queued on the requests channel, one at a time.
// A rejected request is answered with an error, which is also logged, and
// nothing is persisted for it. An accepted request is applied to a copy of
// the repository that then replaces the current one; afterwards the sources
// are persisted and the request is marked done. Informational messages are
// logged only if the server is configured as verbose. run does not return
// while the requests channel is open.
func (s *server) run() {
	for req := range s.requests {
		// failure and failureLog are set by a case that rejects the request;
		// the rejection itself is carried out once, after the switch.
		var (
			failure    error
			failureLog string
		)

		switch req.rtype {
		case reqtypeAdd:
			name := req.src.name
			if s.srcs.has(name) {
				failure = fmt.Errorf("%s: source already exists", req.String())
				failureLog = fmt.Sprintf("[error] sources: not added existing source %s", name)
				break
			}
			next := s.cloneRepo()
			next.updateSource(req.src, s.zone, s.ttl)
			s.setRepo(next)
			s.srcs[name] = req.src
			if s.verbose {
				log.Printf("[info] sources: added source %s", name)
			}

		case reqtypeDel:
			name := req.src.name
			if !s.srcs.has(name) {
				failure = fmt.Errorf("%s: source not found", req.String())
				failureLog = fmt.Sprintf("[error] sources: not removed non-existing source %s", name)
				break
			}
			next := s.cloneRepo()
			next.deleteSource(req.src)
			s.setRepo(next)
			delete(s.srcs, name)
			if s.verbose {
				log.Printf("[info] sources: deleted source %s", name)
			}

		case reqtypeUp:
			name := req.src.name
			if !s.srcs.has(name) {
				failure = fmt.Errorf("%s: source not found", req.String())
				failureLog = fmt.Sprintf("[error] sources: not updated non-existing source %s", name)
				break
			}
			// Work on the registered source, not on the one in the request.
			cur := s.srcs[name]
			next := s.cloneRepo()
			next.deleteSource(cur)
			if err := cur.initGenerator(); err != nil {
				// The modified copy is dropped; the current repository stays.
				cur.err = err
				failure = err
				failureLog = fmt.Sprintf("[error] sources: %s", err)
				break
			}
			next.updateSource(cur, s.zone, s.ttl)
			s.setRepo(next)
			if s.verbose {
				log.Printf("[info] sources: updated source %s", cur.name)
			}

		default:
			failure = errUnknownReqType
			failureLog = fmt.Sprintf("[error] unknown request type %d", req.rtype)
		}

		if failure != nil {
			// Answer the requester first, then log.
			req.fail(failure)
			log.Print(failureLog)
			continue
		}
		s.persistSources()
		req.done()
	}
}