Skip to content

Commit

Permalink
feat(loki-canary): Add support to push logs directly to Loki. (#7063)
Browse files Browse the repository at this point in the history
Add additional `push` mode to loki-canary, which pushes the logs
directly to given Loki URL as it generates logs.

The real function of Loki Canary is to act like a tenant and help us
know whether the Loki installation is working as perceived by a real
tenant.

The main rationale for this additional push mode is to make the canary more
standalone, without needing `promtail` (or `grafana-agent`) to scrape
its logs and send them to Loki. With this change, Loki canary can test
Loki behavior without needing any other dependencies.

**NOTES**:
1. If you run Loki behind a proxy that has different authorization
policies for READ and WRITE access to Loki, there is an important change that canary
operators need to be aware of: the user credentials passed via
`-user` and `-pass` to access Loki endpoints now need to have both `read`
and `write` permissions (previously the canary only queried the logs, whereas
promtail was the one pushing the logs, so READ permissions alone were
sufficient).

2. There will be follow-up PR(s) to clean up, particularly the `reader` and
`comparator` components, in terms of logging with a proper logger. The rationale
is to keep the changes per PR small to make them easy to review.

3. These changes were tested in one of the internal Loki dev cells.

4. **This PR is a no-op if this new `push` flag is disabled (it's disabled
by default)**

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
Co-authored-by: Danny Kopping <danny.kopping@grafana.com>
  • Loading branch information
kavirajk and Danny Kopping authored Sep 9, 2022
1 parent 3d4788f commit 2c9fa05
Show file tree
Hide file tree
Showing 7 changed files with 499 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Loki

##### Enhancements
* [7063](https://github.com/grafana/loki/pull/7063) **kavirajk**: Add additional `push` mode to Loki canary that can directly push logs to given Loki URL.
* [7069](https://github.com/grafana/loki/pull/7069) **periklis**: Add support for custom internal server listener for readiness probes.
* [7023](https://github.com/grafana/loki/pull/7023) **liguozhong**: logql engine support exec `vector(0)` grammar.
* [6983](https://github.com/grafana/loki/pull/6983) **slim-bean**: `__timestamp__` and `__line__` are now available in the logql `label_format` query stage.
Expand Down
56 changes: 53 additions & 3 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"io"
"os"
"strconv"
"sync"
Expand All @@ -13,6 +14,8 @@ import (
"net/http"
"os/signal"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/config"
"github.com/prometheus/common/version"
Expand All @@ -23,6 +26,12 @@ import (
_ "github.com/grafana/loki/pkg/util/build"
)

// Defaults for the retry/backoff flags (-write-min-backoff,
// -write-max-backoff, -write-max-retries) used when pushing logs to Loki.
const (
	// defaultMaxRetries is the default value of the -write-max-retries flag.
	defaultMaxRetries = 10

	// defaultMinBackoff is the default value of the -write-min-backoff flag.
	defaultMinBackoff = 500 * time.Millisecond
	// defaultMaxBackoff is the default value of the -write-max-backoff flag.
	defaultMaxBackoff = 5 * time.Minute
)

type canary struct {
lock sync.Mutex

Expand All @@ -39,13 +48,18 @@ func main() {
sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
push := flag.Bool("push", false, "Push the logs directly to given Loki address")
useTLS := flag.Bool("tls", false, "Does the loki connection use TLS?")
certFile := flag.String("cert-file", "", "Client PEM encoded X.509 certificate for optional use with TLS connection to Loki")
keyFile := flag.String("key-file", "", "Client PEM encoded X.509 key for optional use with TLS connection to Loki")
caFile := flag.String("ca-file", "", "Client certificate authority for optional use with TLS connection to Loki")
user := flag.String("user", "", "Loki username.")
pass := flag.String("pass", "", "Loki password.")
pass := flag.String("pass", "", "Loki password. This credential should have both read and write permissions to Loki endpoints")
tenantID := flag.String("tenant-id", "", "Tenant ID to be set in X-Scope-OrgID header.")
writeTimeout := flag.Duration("write-timeout", 10*time.Second, "How long to wait write response from Loki")
writeMinBackoff := flag.Duration("write-min-backoff", defaultMinBackoff, "Initial backoff time before first retry ")
writeMaxBackoff := flag.Duration("write-max-backoff", defaultMaxBackoff, "Maximum backoff time between retries ")
writeMaxRetries := flag.Int("write-max-retries", defaultMaxRetries, "Maximum number of retries when push a log entry ")
queryTimeout := flag.Duration("query-timeout", 10*time.Second, "How long to wait for a query response from Loki")

interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries")
Expand Down Expand Up @@ -112,15 +126,51 @@ func main() {
sentChan := make(chan time.Time)
receivedChan := make(chan time.Time)

logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "caller", log.Caller(3))

c := &canary{}
startCanary := func() {
c.stop()

c.lock.Lock()
defer c.lock.Unlock()

var err error
c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size)
var (
err error
w io.Writer
)

w = os.Stdout

if *push {
backoffCfg := backoff.Config{
MinBackoff: *writeMinBackoff,
MaxBackoff: *writeMaxBackoff,
MaxRetries: *writeMaxRetries,
}
push, err := writer.NewPush(
*addr,
*tenantID,
*writeTimeout,
config.DefaultHTTPClientConfig,
*lName, *lVal,
*sName, *sValue,
tlsConfig,
*caFile,
*user, *pass,
&backoffCfg,
log.NewLogfmtLogger(os.Stdout),
)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create writer for Loki, check config: %s", err)
os.Exit(1)
}

w = push
}

c.writer = writer.NewWriter(w, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size, logger)
c.reader, err = reader.NewReader(os.Stderr, receivedChan, *useTLS, tlsConfig, *caFile, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create reader for Loki querier, check config: %s", err)
Expand Down
10 changes: 9 additions & 1 deletion docs/sources/operations/loki-canary.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,15 @@ All options:
-out-of-order-percentage int
Percentage (0-100) of log entries that should be sent out of order
-pass string
Loki password
Loki password. This credential should have both read and write permissions to Loki endpoints
-port int
Port which Loki Canary should expose metrics (default 3500)
-pruneinterval duration
Frequency to check sent versus received logs, and also the frequency at which queries
for missing logs will be dispatched to Loki, and the frequency spot check queries are run
(default 1m0s)
-push
Push the logs directly to given Loki address
-query-timeout duration
How long to wait for a query response from Loki (default 10s)
-size int
Expand Down Expand Up @@ -365,4 +367,10 @@ All options:
Print this build's version information
-wait duration
Duration to wait for log entries before reporting them as lost (default 1m0s)
-write-max-backoff duration
Maximum backoff time between retries (default 5m0s)
-write-max-retries int
Maximum number of retries when push a log entry (default 10)
-write-min-backoff duration
Initial backoff time before first retry (default 500ms)
```
4 changes: 4 additions & 0 deletions docs/sources/upgrading/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ The output is incredibly verbose as it shows the entire internal config struct u

### Loki

### Loki Canary Permission
We introduced a new `push` mode to [Loki canary](https://grafana.com/docs/loki/latest/operations/loki-canary/) that can push logs generated by Loki canary directly to a given Loki URL (previously it only wrote to a local file, and you needed an agent like promtail to scrape and push them to Loki).
So if you run Loki behind a proxy with different authorization policies for reading from and writing to Loki, the auth credentials passed to Loki canary now need to have both `READ` and `WRITE` permissions.

### Engine query timeout is deprecated

Previously, we had two configurations to define a query timeout: `engine.timeout` and `querier.query-timeout`.
Expand Down
236 changes: 236 additions & 0 deletions pkg/canary/writer/push.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package writer

import (
"bufio"
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/build"
)

const (
	// pushEndpoint is the URL path that push requests are POSTed to.
	pushEndpoint = "/loki/api/v1/push"

	// defaultContentType is sent as the Content-Type header of every push request.
	defaultContentType = "application/x-protobuf"

	// defaultMaxReponseBufferLen caps how many bytes of a non-2xx response
	// body are read when building the error message.
	defaultMaxReponseBufferLen = 1024
)

// defaultUserAgent is sent as the User-Agent header of every push request;
// it embeds the version reported by the build package.
var defaultUserAgent = fmt.Sprintf("canary-push/%s", build.GetVersion().Version)

// Push is an io.Writer that delivers each log entry written to it by pushing
// it directly to a Loki server. A Push instance serves exactly one tenant and
// performs no batching: every Write results in its own push request.
type Push struct {
	// lokiURL is the full push URL (scheme, host and push endpoint path).
	lokiURL string
	// tenantID, when non-empty, is sent in the X-Scope-OrgID header.
	tenantID string
	// httpClient performs the push requests; its Timeout also bounds each Write.
	httpClient *http.Client
	// userAgent and contentType are sent as request headers.
	userAgent   string
	contentType string
	logger      log.Logger

	// Basic-auth credentials; applied only when username is non-empty.
	username string
	password string

	// Label pairs attached to every stream pushed to Loki.
	labelName   string
	labelValue  string
	streamName  string
	streamValue string

	// backoff configures retrying of failed pushes.
	backoff *backoff.Config
}

// NewPush builds a Push that writes logs directly to the Loki instance at
// lokiAddr.
//
// The HTTP client is created from cfg with HTTP/2 disabled and uses timeout as
// its overall request timeout. When tlsCfg is non-nil, the client's transport
// is replaced by a TLS round tripper built from tlsCfg and caFile, and the
// push URL uses the https scheme; otherwise plain http is used.
//
// It returns an error if the HTTP client or the TLS transport cannot be
// created.
func NewPush(
	lokiAddr, tenantID string,
	timeout time.Duration,
	cfg config.HTTPClientConfig,
	labelName, labelValue string,
	streamName, streamValue string,
	tlsCfg *tls.Config,
	caFile string,
	username, password string,
	backoffCfg *backoff.Config,
	logger log.Logger,
) (*Push, error) {
	httpClient, err := config.NewClientFromConfig(cfg, "canary-push", config.WithHTTP2Disabled())
	if err != nil {
		return nil, err
	}
	httpClient.Timeout = timeout

	scheme := "http"
	if tlsCfg != nil {
		// Factory invoked by the TLS round tripper to build the underlying transport.
		newTransport := func(c *tls.Config) (http.RoundTripper, error) {
			return &http.Transport{TLSClientConfig: c}, nil
		}

		tlsRT, err := config.NewTLSRoundTripper(tlsCfg, caFile, newTransport)
		if err != nil {
			return nil, fmt.Errorf("failed to create TLS config for transport: %w", err)
		}

		httpClient.Transport = tlsRT
		scheme = "https"
	}

	endpoint := url.URL{
		Scheme: scheme,
		Host:   lokiAddr,
		Path:   pushEndpoint,
	}

	p := &Push{
		lokiURL:     endpoint.String(),
		tenantID:    tenantID,
		httpClient:  httpClient,
		userAgent:   defaultUserAgent,
		contentType: defaultContentType,
		logger:      logger,
		username:    username,
		password:    password,
		labelName:   labelName,
		labelValue:  labelValue,
		streamName:  streamName,
		streamValue: streamValue,
		backoff:     backoffCfg,
	}
	return p, nil
}

// Write implements io.Writer by pushing payload to Loki as a single entry.
// The whole operation is bounded by the HTTP client's timeout. On success it
// reports len(payload) bytes written; on failure it reports 0 and the error.
func (p *Push) Write(payload []byte) (int, error) {
	deadline := p.httpClient.Timeout

	sendCtx, done := context.WithTimeout(context.Background(), deadline)
	defer done()

	sendErr := p.send(sendCtx, payload)
	if sendErr == nil {
		return len(payload), nil
	}
	return 0, sendErr
}

// parsePayload converts a raw entry produced by the canary writer into a
// push request holding one stream with one entry.
//
// The payload is expected to follow the LogEntry format; its first field is
// parsed as a Unix timestamp in nanoseconds and becomes the entry timestamp.
// The entry line is the complete, unmodified payload. The stream is labelled
// with the configured label and stream name/value pairs.
func (p *Push) parsePayload(payload []byte) (*logproto.PushRequest, error) {
	var rawTS, rest string

	_, scanErr := fmt.Sscanf(string(payload), LogEntry, &rawTS, &rest)
	if scanErr != nil {
		return nil, fmt.Errorf("failed to parse payload written sent by writer: %w", scanErr)
	}

	nanos, convErr := strconv.ParseInt(rawTS, 10, 64)
	if convErr != nil {
		return nil, fmt.Errorf("failed to parse unix nano timestamp: %w", convErr)
	}

	streamLabels := model.LabelSet{
		model.LabelName(p.labelName):  model.LabelValue(p.labelValue),
		model.LabelName(p.streamName): model.LabelValue(p.streamValue),
	}

	entry := logproto.Entry{
		Timestamp: time.Unix(0, nanos),
		Line:      string(payload),
	}

	stream := logproto.Stream{
		Labels:  streamLabels.String(),
		Entries: []logproto.Entry{entry},
		Hash:    uint64(streamLabels.Fingerprint()),
	}

	return &logproto.PushRequest{Streams: []logproto.Stream{stream}}, nil
}

// send does the heavy lifting of sending a generated log entry to the Loki
// server. It won't batch.
//
// The payload is parsed into a push request, marshalled to protobuf,
// snappy-compressed and POSTed to p.lokiURL. Responses with status 429 or 5xx
// are retried with backoff (per p.backoff) until the retries are exhausted or
// ctx is done; any other status ends the loop immediately.
//
// It returns nil on a 2xx response. Otherwise it returns the parse/marshal
// error, the transport error (transport errors are not retried), or an error
// describing the last non-2xx response.
func (p *Push) send(ctx context.Context, payload []byte) error {
	preq, err := p.parsePayload(payload)
	if err != nil {
		return err
	}

	body, err := proto.Marshal(preq)
	if err != nil {
		return fmt.Errorf("failed to marshal payload to protobuf: %w", err)
	}
	body = snappy.Encode(nil, body)

	retry := backoff.New(ctx, *p.backoff)

	// send log with retry
	for {
		// A request whose body has already been sent cannot be re-sent as is:
		// its reader is consumed. Build a fresh request for every attempt.
		req, err := p.newPushRequest(ctx, body)
		if err != nil {
			return err
		}

		resp, err := p.httpClient.Do(req)
		if err != nil {
			return fmt.Errorf("failed to push payload: %w", err)
		}
		status := resp.StatusCode

		var pushErr error
		if status/100 != 2 {
			// Include (at most) the first line of the response body in the error.
			scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen))
			line := ""
			if scanner.Scan() {
				line = scanner.Text()
			}
			pushErr = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line)
		}

		if err := resp.Body.Close(); err != nil {
			level.Error(p.logger).Log("msg", "failed to close response body", "error", err)
		}

		// Success, or a status that retrying will not fix.
		if status > 0 && status != 429 && status/100 != 5 {
			return pushErr
		}

		if !retry.Ongoing() {
			return pushErr
		}

		level.Info(p.logger).Log("msg", "retrying as server returned non successful error", "status", status, "error", pushErr)

		// Sleep before the next attempt; this also advances the retry counter,
		// without which the loop would spin until ctx expires.
		retry.Wait()
		if ctx.Err() != nil {
			// The deadline passed while waiting: report the last server error
			// instead of issuing a request that is bound to fail.
			return pushErr
		}
	}
}

// newPushRequest builds a POST request to the push URL carrying body, with the
// content-type, user-agent, tenant and basic-auth headers applied.
func (p *Push) newPushRequest(ctx context.Context, body []byte) (*http.Request, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.lokiURL, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("failed to create push request: %w", err)
	}
	req.Header.Set("Content-Type", p.contentType)
	req.Header.Set("User-Agent", p.userAgent)

	// set org-id
	if p.tenantID != "" {
		req.Header.Set("X-Scope-OrgID", p.tenantID)
	}

	// basic auth if provided
	if p.username != "" {
		req.SetBasicAuth(p.username, p.password)
	}

	return req, nil
}
Loading

0 comments on commit 2c9fa05

Please sign in to comment.