diff --git a/CHANGELOG.md b/CHANGELOG.md index d33f1864a0cf..d8bd5616f1d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ #### Loki ##### Enhancements +* [7063](https://github.com/grafana/loki/pull/7063) **kavirajk**: Add additional `push` mode to Loki canary that can directly push logs to given Loki URL. * [7069](https://github.com/grafana/loki/pull/7069) **periklis**: Add support for custom internal server listener for readiness probes. * [7023](https://github.com/grafana/loki/pull/7023) **liguozhong**: logql engine support exec `vector(0)` grammar. * [6983](https://github.com/grafana/loki/pull/6983) **slim-bean**: `__timestamp__` and `__line__` are now available in the logql `label_format` query stage. diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 4c308ccc42d8..881412a25b8b 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "io" "os" "strconv" "sync" @@ -13,6 +14,8 @@ import ( "net/http" "os/signal" + "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/config" "github.com/prometheus/common/version" @@ -23,6 +26,12 @@ import ( _ "github.com/grafana/loki/pkg/util/build" ) +const ( + defaultMinBackoff = 500 * time.Millisecond + defaultMaxBackoff = 5 * time.Minute + defaultMaxRetries = 10 +) + type canary struct { lock sync.Mutex @@ -39,13 +48,18 @@ func main() { sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector") port := flag.Int("port", 3500, "Port which loki-canary should expose metrics") addr := flag.String("addr", "", "The Loki server URL:Port, e.g. 
loki:3100") + push := flag.Bool("push", false, "Push the logs directly to given Loki address") useTLS := flag.Bool("tls", false, "Does the loki connection use TLS?") certFile := flag.String("cert-file", "", "Client PEM encoded X.509 certificate for optional use with TLS connection to Loki") keyFile := flag.String("key-file", "", "Client PEM encoded X.509 key for optional use with TLS connection to Loki") caFile := flag.String("ca-file", "", "Client certificate authority for optional use with TLS connection to Loki") user := flag.String("user", "", "Loki username.") - pass := flag.String("pass", "", "Loki password.") + pass := flag.String("pass", "", "Loki password. This credential should have both read and write permissions to Loki endpoints") tenantID := flag.String("tenant-id", "", "Tenant ID to be set in X-Scope-OrgID header.") + writeTimeout := flag.Duration("write-timeout", 10*time.Second, "How long to wait write response from Loki") + writeMinBackoff := flag.Duration("write-min-backoff", defaultMinBackoff, "Initial backoff time before first retry ") + writeMaxBackoff := flag.Duration("write-max-backoff", defaultMaxBackoff, "Maximum backoff time between retries ") + writeMaxRetries := flag.Int("write-max-retries", defaultMaxRetries, "Maximum number of retries when push a log entry ") queryTimeout := flag.Duration("query-timeout", 10*time.Second, "How long to wait for a query response from Loki") interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries") @@ -112,6 +126,9 @@ func main() { sentChan := make(chan time.Time) receivedChan := make(chan time.Time) + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + logger = log.With(logger, "caller", log.Caller(3)) + c := &canary{} startCanary := func() { c.stop() @@ -119,8 +136,41 @@ func main() { c.lock.Lock() defer c.lock.Unlock() - var err error - c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size) + 
var ( + err error + w io.Writer + ) + + w = os.Stdout + + if *push { + backoffCfg := backoff.Config{ + MinBackoff: *writeMinBackoff, + MaxBackoff: *writeMaxBackoff, + MaxRetries: *writeMaxRetries, + } + push, err := writer.NewPush( + *addr, + *tenantID, + *writeTimeout, + config.DefaultHTTPClientConfig, + *lName, *lVal, + *sName, *sValue, + tlsConfig, + *caFile, + *user, *pass, + &backoffCfg, + log.NewLogfmtLogger(os.Stdout), + ) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Unable to create writer for Loki, check config: %s", err) + os.Exit(1) + } + + w = push + } + + c.writer = writer.NewWriter(w, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size, logger) c.reader, err = reader.NewReader(os.Stderr, receivedChan, *useTLS, tlsConfig, *caFile, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "Unable to create reader for Loki querier, check config: %s", err) diff --git a/docs/sources/operations/loki-canary.md b/docs/sources/operations/loki-canary.md index a27fb2317cfe..051800654828 100644 --- a/docs/sources/operations/loki-canary.md +++ b/docs/sources/operations/loki-canary.md @@ -328,13 +328,15 @@ All options: -out-of-order-percentage int Percentage (0-100) of log entries that should be sent out of order -pass string - Loki password + Loki password. 
This credential should have both read and write permissions to Loki endpoints -port int Port which Loki Canary should expose metrics (default 3500) -pruneinterval duration Frequency to check sent versus received logs, and also the frequency at which queries for missing logs will be dispatched to Loki, and the frequency spot check queries are run (default 1m0s) + -push + Push the logs directly to given Loki address -query-timeout duration How long to wait for a query response from Loki (default 10s) -size int @@ -365,4 +367,10 @@ All options: Print this build's version information -wait duration Duration to wait for log entries before reporting them as lost (default 1m0s) + -write-max-backoff duration + Maximum backoff time between retries (default 5m0s) + -write-max-retries int + Maximum number of retries when push a log entry (default 10) + -write-min-backoff duration + Initial backoff time before first retry (default 500ms) ``` diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md index 71f5df0538fb..45f5ef763ed7 100644 --- a/docs/sources/upgrading/_index.md +++ b/docs/sources/upgrading/_index.md @@ -33,6 +33,10 @@ The output is incredibly verbose as it shows the entire internal config struct u ### Loki +### Loki Canary Permission +We introduced a new `push` mode to [Loki canary](https://grafana.com/docs/loki/latest/operations/loki-canary/) that can push the logs generated by Loki canary directly to a given Loki URL (previously, it only wrote to a local file and you needed an agent such as Promtail to scrape that file and push it to Loki). +So if you run Loki behind a proxy with different authorization policies for reading from and writing to Loki, the auth credentials you pass to Loki canary now need to have both `READ` and `WRITE` permissions. + ### Engine query timeout is deprecated Previously, we had two configurations to define a query timeout: `engine.timeout` and `querier.query-timeout`. 
diff --git a/pkg/canary/writer/push.go b/pkg/canary/writer/push.go new file mode 100644 index 000000000000..73c40674436e --- /dev/null +++ b/pkg/canary/writer/push.go @@ -0,0 +1,236 @@ +package writer + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/grafana/dskit/backoff" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/util/build" +) + +const ( + defaultContentType = "application/x-protobuf" + defaultMaxReponseBufferLen = 1024 + + pushEndpoint = "/loki/api/v1/push" +) + +var ( + defaultUserAgent = fmt.Sprintf("canary-push/%s", build.GetVersion().Version) +) + +// Push is an io.Writer that writes the given log entries by pushing them +// directly to the given Loki server URL. Each `Push` instance handles a single tenant. +// No batching of log lines happens when sending to Loki. 
+type Push struct { + lokiURL string + tenantID string + httpClient *http.Client + userAgent string + contentType string + logger log.Logger + + // auth + username, password string + + // Will add these label to the logs pushed to loki + labelName, labelValue, streamName, streamValue string + + // push retry and backoff + backoff *backoff.Config +} + +// NewPush creates an instance of `Push` which writes logs directly to given `lokiAddr` +func NewPush( + lokiAddr, tenantID string, + timeout time.Duration, + cfg config.HTTPClientConfig, + labelName, labelValue string, + streamName, streamValue string, + tlsCfg *tls.Config, + caFile string, + username, password string, + backoffCfg *backoff.Config, + logger log.Logger, +) (*Push, error) { + + client, err := config.NewClientFromConfig(cfg, "canary-push", config.WithHTTP2Disabled()) + if err != nil { + return nil, err + } + + client.Timeout = timeout + scheme := "http" + + // setup tls transport + if tlsCfg != nil { + rt, err := config.NewTLSRoundTripper(tlsCfg, caFile, func(tls *tls.Config) (http.RoundTripper, error) { + return &http.Transport{TLSClientConfig: tls}, nil + }) + if err != nil { + return nil, fmt.Errorf("failed to create TLS config for transport: %w", err) + } + client.Transport = rt + scheme = "https" + } + + u := url.URL{ + Scheme: scheme, + Host: lokiAddr, + Path: pushEndpoint, + } + + return &Push{ + lokiURL: u.String(), + tenantID: tenantID, + httpClient: client, + userAgent: defaultUserAgent, + contentType: defaultContentType, + logger: logger, + labelName: labelName, + labelValue: labelValue, + streamName: streamName, + streamValue: streamValue, + username: username, + password: password, + backoff: backoffCfg, + }, nil +} + +// Write implements the io.Writer. 
+func (p *Push) Write(payload []byte) (int, error) { + ctx, cancel := context.WithTimeout(context.Background(), p.httpClient.Timeout) + defer cancel() + if err := p.send(ctx, payload); err != nil { + return 0, err + } + return len(payload), nil +} + +func (p *Push) parsePayload(payload []byte) (*logproto.PushRequest, error) { + // payload that is sent by the `writer` will be in format `LogEntry` + var ( + tsStr, logLine string + ) + if _, err := fmt.Sscanf(string(payload), LogEntry, &tsStr, &logLine); err != nil { + return nil, fmt.Errorf("failed to parse payload written sent by writer: %w", err) + } + + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse unix nano timestamp: %w", err) + } + + labels := model.LabelSet{ + model.LabelName(p.labelName): model.LabelValue(p.labelValue), + model.LabelName(p.streamName): model.LabelValue(p.streamValue), + } + + return &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: labels.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, ts), + Line: string(payload), + }, + }, + Hash: uint64(labels.Fingerprint()), + }, + }, + }, nil +} + +// send does the heavy lifting of sending the generated logs into the Loki server. +// It won't batch. 
+func (p *Push) send(ctx context.Context, payload []byte) error { + var ( + resp *http.Response + err error + ) + + preq, err := p.parsePayload(payload) + if err != nil { + return err + } + + payload, err = proto.Marshal(preq) + if err != nil { + return fmt.Errorf("failed to marshal payload to json: %w", err) + } + + payload = snappy.Encode(nil, payload) + + req, err := http.NewRequest("POST", p.lokiURL, bytes.NewReader(payload)) + if err != nil { + return fmt.Errorf("failed to create push request: %w", err) + } + req = req.WithContext(ctx) + req.Header.Set("Content-Type", p.contentType) + req.Header.Set("User-Agent", p.userAgent) + + // set org-id + if p.tenantID != "" { + req.Header.Set("X-Scope-OrgID", p.tenantID) + } + + // basic auth if provided + if p.username != "" { + req.SetBasicAuth(p.username, p.password) + } + + backoff := backoff.New(ctx, *p.backoff) + + // send log with retry + for { + resp, err = p.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to push payload: %w", err) + } + status := resp.StatusCode + + if status/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line) + + } + + if err := resp.Body.Close(); err != nil { + level.Error(p.logger).Log("msg", "failed to close response body", "error", err) + } + + if status > 0 && status != 429 && status/100 != 5 { + break + } + + if !backoff.Ongoing() { + break + } + + level.Info(p.logger).Log("msg", "retrying as server returned non successful error", "status", status, "error", err) + + } + + return err +} diff --git a/pkg/canary/writer/push_test.go b/pkg/canary/writer/push_test.go new file mode 100644 index 000000000000..ad98484982ea --- /dev/null +++ b/pkg/canary/writer/push_test.go @@ -0,0 +1,179 @@ +package writer + +import ( + "encoding/base64" + "fmt" + "math" + "net/http" + 
"net/http/httptest" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/util" +) + 
const (
	testTenant   = "test1"
	testUsername = "canary"
	testPassword = "secret"
)

// Test_Push pushes one entry per scenario to a mock Loki server and checks
// the headers, credentials, stream labels and log entry the server received.
func Test_Push(t *testing.T) {
	// create dummy loki server
	responses := make(chan response, 1) // buffered not to block the response handler
	backoffCfg := backoff.Config{
		MinBackoff: 300 * time.Millisecond,
		MaxBackoff: 5 * time.Minute,
		MaxRetries: 10,
	}

	// mock loki server
	mock := httptest.NewServer(createServerHandler(responses))
	require.NotNil(t, mock)
	defer mock.Close()

	// None of the scenarios uses TLS.
	cases := []struct {
		name                    string
		streamName, streamValue string
		username, password      string
	}{
		{
			name:       "without auth",
			streamName: "stream", streamValue: "stdout",
		},
		{
			name:       "with basic auth",
			streamName: "stream", streamValue: "stdout",
			username: testUsername, password: testPassword,
		},
		{
			name:       "with custom labels",
			streamName: "pod", streamValue: "abc",
			username: testUsername, password: testPassword,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			push, err := NewPush(
				mock.Listener.Addr().String(),
				testTenant,
				2*time.Second,
				config.DefaultHTTPClientConfig,
				"name", "loki-canary",
				tc.streamName, tc.streamValue,
				nil,
				"",
				tc.username, tc.password,
				&backoffCfg,
				log.NewNopLogger(),
			)
			require.NoError(t, err)

			ts, payload := testPayload()
			n, err := push.Write([]byte(payload))
			require.NoError(t, err)
			assert.Equal(t, len(payload), n)

			resp := <-responses
			assertResponse(t, resp, tc.username != "", labelSet("name", "loki-canary", tc.streamName, tc.streamValue), ts, payload)
		})
	}
}

// Test helpers

// assertResponse checks what the mock server captured for one push: tenant,
// basic-auth credentials (expected to be empty unless testAuth is set),
// content type, user agent, the single stream's labels and hash, and the
// single entry's line and timestamp.
func assertResponse(t *testing.T, resp response, testAuth bool, labels model.LabelSet, ts time.Time, payload string) {
	t.Helper()

	// assert metadata
	assert.Equal(t, testTenant, resp.tenantID)

	var expUser, expPass string

	if testAuth {
		expUser = testUsername
		expPass = testPassword
	}

	assert.Equal(t, expUser, resp.username)
	assert.Equal(t, expPass, resp.password)
	assert.Equal(t, defaultContentType, resp.contentType)
	assert.Equal(t, defaultUserAgent, resp.userAgent)

	// assert stream labels
	require.Len(t, resp.pushReq.Streams, 1)
	stream := resp.pushReq.Streams[0]
	assert.Equal(t, labels.String(), stream.Labels)
	assert.Equal(t, uint64(labels.Fingerprint()), stream.Hash)

	// assert log entry
	require.Len(t, stream.Entries, 1)
	assert.Equal(t, payload, stream.Entries[0].Line)
	assert.Equal(t, ts, stream.Entries[0].Timestamp)
}

// response is what the mock server captured from a single push request.
type response struct {
	tenantID           string
	pushReq            logproto.PushRequest
	contentType        string
	userAgent          string
	username, password string
}

// createServerHandler returns a handler that decodes a snappy-compressed
// protobuf push request, extracts the basic-auth credentials if present and
// publishes everything it captured on responses before replying 200.
// Undecodable requests are answered with 500 and malformed credentials with
// 400; nothing is published in those cases.
func createServerHandler(responses chan response) http.HandlerFunc {
	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		// Parse the request
		var pushReq logproto.PushRequest
		if err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil {
			rw.WriteHeader(http.StatusInternalServerError)
			return
		}

		var (
			username, password string
		)

		if basicAuth := req.Header.Get("Authorization"); basicAuth != "" {
			encoded := strings.TrimPrefix(basicAuth, "Basic ") // now we have just encoded `username:password`
			decoded, err := base64.StdEncoding.DecodeString(encoded)
			if err != nil {
				rw.WriteHeader(http.StatusInternalServerError)
				return
			}
			// Split on the first ':' only, so a password may itself contain
			// colons, and never index past a malformed credential.
			toks := strings.SplitN(string(decoded), ":", 2)
			if len(toks) != 2 {
				rw.WriteHeader(http.StatusBadRequest)
				return
			}
			username, password = toks[0], toks[1]
		}

		responses <- response{
			tenantID:    req.Header.Get("X-Scope-OrgID"),
			contentType: req.Header.Get("Content-Type"),
			userAgent:   req.Header.Get("User-Agent"),
			username:    username,
			password:    password,
			pushReq:     pushReq,
		}

		rw.WriteHeader(http.StatusOK)
	})
}

// labelSet builds a LabelSet from alternating name/value arguments.
// It panics when given an odd number of arguments.
func labelSet(keyVals ...string) model.LabelSet {
	if len(keyVals)%2 != 0 {
		panic("not matching key-value pairs")
	}

	lbs := make(model.LabelSet, len(keyVals)/2)
	for i := 0; i < len(keyVals); i += 2 {
		lbs[model.LabelName(keyVals[i])] = model.LabelValue(keyVals[i+1])
	}

	return lbs
}

// testPayload returns the current UTC time and a payload in the `LogEntry`
// format carrying that time as a unix-nano timestamp.
func testPayload() (time.Time, string) {
	ts := time.Now().UTC()
	payload := fmt.Sprintf(LogEntry, fmt.Sprint(ts.UnixNano()), "pppppp")

	return ts, payload
}
diff --git a/pkg/canary/writer/writer.go b/pkg/canary/writer/writer.go index 2919f7d7cc2d..cddceaf5971d 100644 --- a/pkg/canary/writer/writer.go +++ b/pkg/canary/writer/writer.go @@ -7,6 +7,9 @@ import ( "strconv" "strings" "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" ) const ( @@ -25,9 +28,17 @@ type Writer struct { pad string quit chan struct{} done chan struct{} + + logger log.Logger } -func NewWriter(writer io.Writer, sentChan chan time.Time, entryInterval, outOfOrderMin, outOfOrderMax time.Duration, outOfOrderPercentage, entrySize int) *Writer { +func NewWriter( + writer io.Writer, + sentChan chan time.Time, + entryInterval, outOfOrderMin, outOfOrderMax time.Duration, + outOfOrderPercentage, entrySize int, + logger log.Logger, +) *Writer { w := &Writer{ w: writer, @@ -40,6 +51,7 @@ func NewWriter(writer io.Writer, sentChan 
chan time.Time, entryInterval, outOfOr prevTsLen: 0, quit: make(chan struct{}), done: make(chan struct{}), + logger: logger, } go w.run() @@ -83,7 +95,10 @@ func (w *Writer) run() { w.prevTsLen = tsLen } - fmt.Fprintf(w.w, LogEntry, ts, w.pad) + _, err := fmt.Fprintf(w.w, LogEntry, ts, w.pad) + if err != nil { + level.Error(w.logger).Log("msg", "failed to write log entry", "error", err) + } w.sent <- t case <-w.quit: return