-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathconfig.go
126 lines (117 loc) · 3.89 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"flag"
"log"
"os"
"strings"
"github.com/andviro/grayproxy/pkg/disk"
"github.com/andviro/grayproxy/pkg/dummy"
"github.com/andviro/grayproxy/pkg/http"
"github.com/andviro/grayproxy/pkg/loki"
"github.com/andviro/grayproxy/pkg/tcp"
"github.com/andviro/grayproxy/pkg/tls"
"github.com/andviro/grayproxy/pkg/udp"
"github.com/andviro/grayproxy/pkg/ws"
"github.com/pkg/errors"
)
const (
maxChunkSize = 8192
assembleTimeout = 1000
stopTimeout = 2000
decompressSizeLimit = 1048576
diskFileSize = 104857600
)
type urlList []string
func (ul *urlList) Set(val string) error {
*ul = append(*ul, val)
return nil
}
func (ul *urlList) String() string {
return strings.Join(*ul, ",")
}
func (app *app) newListener(addr string) listener {
switch {
case strings.HasPrefix(addr, "udp://"):
return &udp.Listener{
Address: strings.TrimPrefix(addr, "udp://"),
MaxChunkSize: maxChunkSize,
MaxMessageSize: -1,
DecompressSizeLimit: decompressSizeLimit,
AssembleTimeout: assembleTimeout,
}
case strings.HasPrefix(addr, "http://"):
l := new(http.Listener)
l.Address = strings.TrimPrefix(addr, "http://")
l.StopTimeout = stopTimeout
return l
}
return &tcp.Listener{Address: strings.TrimPrefix(addr, "tcp://")}
}
func (app *app) configure() error {
fs := flag.NewFlagSet("grayproxy", flag.ExitOnError)
fs.Var(&app.inputURLs, "in", "input address in form schema://address:port (may be specified multiple times). Default: udp://:12201")
fs.Var(&app.outputURLs, "out", "output address in form schema://address:port (may be specified multiple times)")
fs.BoolVar(&app.verbose, "v", false, "echo received logs on console")
fs.IntVar(&app.sendTimeout, "sendTimeout", 1000, "maximum TCP or HTTP output timeout (ms)")
fs.StringVar(&app.dataDir, "dataDir", "", "buffer directory (defaults to no buffering)")
if err := fs.Parse(os.Args[1:]); err != nil {
return errors.Wrap(err, "parsing command-line")
}
if len(app.inputURLs) == 0 {
app.inputURLs = urlList{"udp://:12201"}
}
app.ins = make([]listener, len(app.inputURLs))
for i, v := range app.inputURLs {
app.ins[i] = app.newListener(v)
log.Printf("Added input %d at %s", i, v)
}
if len(app.outputURLs) == 0 {
log.Print("WARNING: no outputs configured")
}
app.outs = make([]sender, 0, len(app.outputURLs))
app.sendErrors = make([]error, len(app.outputURLs))
for i, v := range app.outputURLs {
log.Printf("adding output %d: %s", i, v)
switch {
case strings.HasPrefix(v, "http://") || strings.HasPrefix(v, "https://"):
if strings.HasSuffix(v, "/api/prom/push") {
ls, err := loki.New(v)
if err != nil {
log.Println("could not create loki output: ", err.Error())
continue
}
app.outs = append(app.outs, ls)
break
}
app.outs = append(app.outs, &http.Sender{Address: v, SendTimeout: app.sendTimeout})
case strings.HasPrefix(v, "ws://"):
wss := &ws.Sender{Address: v}
if err := wss.Start(); err != nil {
log.Println("Invalid websocket URL: ", err.Error())
break
}
app.outs = append(app.outs, wss)
case strings.HasPrefix(v, "udp://"):
app.outs = append(app.outs, &udp.Sender{Address: strings.TrimPrefix(v, "udp://"), SendTimeout: app.sendTimeout})
case strings.HasPrefix(v, "tls://"):
app.outs = append(app.outs, &tls.Sender{Address: strings.TrimPrefix(v, "tls://"), SendTimeout: app.sendTimeout})
default:
app.outs = append(app.outs, &tcp.Sender{Address: strings.TrimPrefix(v, "tcp://"), SendTimeout: app.sendTimeout})
}
}
if app.dataDir == "" {
app.q = dummy.New()
log.Println("Buffering is not configured, unsent messages will be lost")
return nil
}
stat, err := os.Stat(app.dataDir)
if err != nil {
return errors.Wrap(err, "checking buffer directory")
}
if !stat.IsDir() {
return errors.Errorf("%q is not a directory", app.dataDir)
}
q, err := disk.New(app.dataDir, diskFileSize)
app.q = q
return err
}