Skip to content

Commit

Permalink
Add more config, flags and cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
  • Loading branch information
kavirajk committed Sep 6, 2022
1 parent 3ca7390 commit 3ef4eb9
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 51 deletions.
45 changes: 22 additions & 23 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"crypto/tls"
"net/http"
"net/url"
"os/signal"

"github.com/go-kit/log"
Expand Down Expand Up @@ -42,6 +41,7 @@ func main() {
sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
push := flag.Bool("push", false, "Push the logs to given Loki address inaddition to writing to stdout")
useTLS := flag.Bool("tls", false, "Does the loki connection use TLS?")
certFile := flag.String("cert-file", "", "Client PEM encoded X.509 certificate for optional use with TLS connection to Loki")
keyFile := flag.String("key-file", "", "Client PEM encoded X.509 key for optional use with TLS connection to Loki")
Expand Down Expand Up @@ -128,28 +128,27 @@ func main() {
w io.Writer
)

scheme := "http"
if *useTLS {
scheme = "https"
}

httpListen, err := url.ParseRequestURI(fmt.Sprintf("%s://%s", scheme, *addr))
if err != nil {
fmt.Fprintf(os.Stderr, "given loki URL is invalid: %s", err)
}

w, err = writer.NewPush(
httpListen.String(),
*tenantID,
*writeTimeout,
config.DefaultHTTPClientConfig,
*lName, *lVal,
*sName, *sValue,
log.NewLogfmtLogger(os.Stdout),
)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create writer for Loki, check config: %s", err)
os.Exit(1)
w = os.Stdout

if *push {
push, err := writer.NewPush(
*addr,
*tenantID,
*writeTimeout,
config.DefaultHTTPClientConfig,
*lName, *lVal,
*sName, *sValue,
tlsConfig,
*caFile,
*user, *pass,
log.NewLogfmtLogger(os.Stdout),
)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create writer for Loki, check config: %s", err)
os.Exit(1)
}

w = io.MultiWriter(os.Stdout, push) // writes to both stdout and push to Loki directly.
}

c.writer = writer.NewWriter(w, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size)
Expand Down
63 changes: 39 additions & 24 deletions pkg/canary/writer/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/prometheus/common/model"
)

var p io.WriteCloser = &Push{}
var p io.Writer = &Push{}

const (
defaultContentType = "application/x-protobuf"
Expand All @@ -41,44 +41,57 @@ var (
// directly to the given loki server URL. Each `Push` instance handles for a single tenant.
// TODO(kavi): Add batching?
type Push struct {
lokiURL string
tenantID string
httpClient *http.Client
userAgent string
contentType string
logger log.Logger
useTLs bool
clientTLSConfig *tls.Config
caFile string
lokiURL string
tenantID string
httpClient *http.Client
userAgent string
contentType string
logger log.Logger

// auth
user, password string
username, password string

// Will add these label to the logs pushed to loki
labelName, labelValue, streamName, streamValue string
}

func NewPush(
lokiURL, tenantID string,
lokiAddr, tenantID string,
timeout time.Duration,
cfg config.HTTPClientConfig,
labelName, labelValue string,
streamName, streamValue string,
tlsCfg *tls.Config,
caFile string,
username, password string,
logger log.Logger,
) (*Push, error) {

u, err := url.ParseRequestURI(lokiURL)
if err != nil {
return nil, fmt.Errorf("given Loki URL(%q) is invalid: %w", lokiURL, err)
}

client, err := config.NewClientFromConfig(cfg, "canary-push", config.WithHTTP2Disabled())
if err != nil {
return nil, err
}

client.Timeout = timeout
u.Path = pushEndpoint
scheme := "http"

// setup tls transport
if tlsCfg != nil {
rt, err := config.NewTLSRoundTripper(tlsCfg, caFile, func(tls *tls.Config) (http.RoundTripper, error) {
return &http.Transport{TLSClientConfig: tls}, nil
})
if err != nil {
return nil, fmt.Errorf("failed to create TLS config for transport: %w", err)
}
client.Transport = rt
scheme = "https"
}

u := url.URL{
Scheme: scheme,
Host: lokiAddr,
Path: pushEndpoint,
}

return &Push{
lokiURL: u.String(),
Expand All @@ -91,6 +104,8 @@ func NewPush(
labelValue: labelValue,
streamName: streamName,
streamValue: streamValue,
username: username,
password: password,
}, nil
}

Expand All @@ -104,12 +119,6 @@ func (p *Push) Write(payload []byte) (int, error) {
return len(payload), nil
}

// Close makes sure the pending buffer is pushed to `loki` before
// returning. It's the responsibility of the client to call Close.
func (p *Push) Close() error {
return nil
}

func (p *Push) parsePayload(payload []byte) (*logproto.PushRequest, error) {
// payload that is sent by the `writer` will be in format `LogEntry`
var (
Expand Down Expand Up @@ -168,10 +177,16 @@ func (p *Push) send(ctx context.Context, payload []byte) error {
req.Header.Set("Content-Type", p.contentType)
req.Header.Set("User-Agent", p.userAgent)

// set org-id
if p.tenantID != "" {
req.Header.Set("X-Scope-OrgID", p.tenantID)
}

// basic auth if provided
if p.username != "" {
req.SetBasicAuth(p.username, p.password)
}

resp, err := p.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to push payload: %w", err)
Expand Down
16 changes: 14 additions & 2 deletions pkg/canary/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"strconv"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

const (
Expand All @@ -25,9 +28,17 @@ type Writer struct {
pad string
quit chan struct{}
done chan struct{}

logger log.Logger
}

func NewWriter(writer io.Writer, sentChan chan time.Time, entryInterval, outOfOrderMin, outOfOrderMax time.Duration, outOfOrderPercentage, entrySize int) *Writer {
func NewWriter(
writer io.Writer,
sentChan chan time.Time,
entryInterval, outOfOrderMin, outOfOrderMax time.Duration,
outOfOrderPercentage, entrySize int,
logger log.Logger,
) *Writer {

w := &Writer{
w: writer,
Expand All @@ -40,6 +51,7 @@ func NewWriter(writer io.Writer, sentChan chan time.Time, entryInterval, outOfOr
prevTsLen: 0,
quit: make(chan struct{}),
done: make(chan struct{}),
logger: logger,
}

go w.run()
Expand Down Expand Up @@ -85,7 +97,7 @@ func (w *Writer) run() {

_, err := fmt.Fprintf(w.w, LogEntry, ts, w.pad)
if err != nil {
panic(err)
level.Error(w.logger).Log("msg", "failed to write log entry", "error", err)
}
w.sent <- t
case <-w.quit:
Expand Down
2 changes: 0 additions & 2 deletions pkg/logcli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ func (c *DefaultClient) wsConnect(path, query string, quiet bool) (*websocket.Co
us = strings.Replace(us, "http", "ws", 1)
}

fmt.Println("Websocket URL", us)

if !quiet {
log.Println(us)
}
Expand Down

0 comments on commit 3ef4eb9

Please sign in to comment.