Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(loki-canary): Add support to push logs directly to Loki. #7063

Merged
merged 16 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Loki

##### Enhancements
* [7063](https://github.com/grafana/loki/pull/7063) **kavirajk**: Add additional `push` mode to Loki canary that can directly push logs to given Loki URL.
* [7069](https://github.com/grafana/loki/pull/7069) **periklis**: Add support for custom internal server listener for readiness probes.
* [7023](https://github.com/grafana/loki/pull/7023) **liguozhong**: logql engine support exec `vector(0)` grammar.
* [6983](https://github.com/grafana/loki/pull/6983) **slim-bean**: `__timestamp__` and `__line__` are now available in the logql `label_format` query stage.
Expand Down
42 changes: 38 additions & 4 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"io"
"os"
"strconv"
"sync"
Expand All @@ -13,6 +14,7 @@ import (
"net/http"
"os/signal"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/config"
"github.com/prometheus/common/version"
Expand All @@ -39,13 +41,15 @@ func main() {
sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
push := flag.Bool("push", false, "Push the logs directly to given Loki address")
useTLS := flag.Bool("tls", false, "Does the loki connection use TLS?")
certFile := flag.String("cert-file", "", "Client PEM encoded X.509 certificate for optional use with TLS connection to Loki")
keyFile := flag.String("key-file", "", "Client PEM encoded X.509 key for optional use with TLS connection to Loki")
caFile := flag.String("ca-file", "", "Client certificate authority for optional use with TLS connection to Loki")
user := flag.String("user", "", "Loki username.")
pass := flag.String("pass", "", "Loki password.")
pass := flag.String("pass", "", "Loki password. This credential should have both read and write permissions to Loki endpoints")
tenantID := flag.String("tenant-id", "", "Tenant ID to be set in X-Scope-OrgID header.")
writeTimeout := flag.Duration("write-timeout", 10*time.Second, "How long to wait write response from Loki")
queryTimeout := flag.Duration("query-timeout", 10*time.Second, "How long to wait for a query response from Loki")

interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries")
Expand Down Expand Up @@ -112,15 +116,45 @@ func main() {
sentChan := make(chan time.Time)
receivedChan := make(chan time.Time)

logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "caller", log.Caller(3))

c := &canary{}
startCanary := func() {
c.stop()

c.lock.Lock()
defer c.lock.Unlock()

var err error
c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size)
var (
err error
w io.Writer
)

w = os.Stdout

if *push {
push, err := writer.NewPush(
*addr,
*tenantID,
*writeTimeout,
config.DefaultHTTPClientConfig,
*lName, *lVal,
*sName, *sValue,
tlsConfig,
*caFile,
*user, *pass,
log.NewLogfmtLogger(os.Stdout),
)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create writer for Loki, check config: %s", err)
kavirajk marked this conversation as resolved.
Show resolved Hide resolved
os.Exit(1)
}

w = push
}

c.writer = writer.NewWriter(w, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size, logger)
c.reader, err = reader.NewReader(os.Stderr, receivedChan, *useTLS, tlsConfig, *caFile, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create reader for Loki querier, check config: %s", err)
Expand Down Expand Up @@ -148,7 +182,7 @@ func main() {
}()

terminate := make(chan os.Signal, 1)
signal.Notify(terminate, syscall.SIGTERM, os.Interrupt)
signal.Notify(terminate, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)
kavirajk marked this conversation as resolved.
Show resolved Hide resolved

for range terminate {
_, _ = fmt.Fprintf(os.Stderr, "shutting down\n")
Expand Down
4 changes: 3 additions & 1 deletion docs/sources/operations/loki-canary.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,15 @@ All options:
-out-of-order-percentage int
Percentage (0-100) of log entries that should be sent out of order
-pass string
Loki password
Loki password. This credential should have both read and write permissions to Loki endpoints
-port int
Port which Loki Canary should expose metrics (default 3500)
-pruneinterval duration
Frequency to check sent versus received logs, and also the frequency at which queries
for missing logs will be dispatched to Loki, and the frequency spot check queries are run
(default 1m0s)
-push
Push the logs directly to given Loki address
-query-timeout duration
How long to wait for a query response from Loki (default 10s)
-size int
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/upgrading/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ The output is incredibly verbose as it shows the entire internal config struct u

### Loki

### Loki Canary Permission
We introduced a new `push` mode for [Loki canary](https://grafana.com/docs/loki/latest/operations/loki-canary/) that pushes the logs generated by Loki canary directly to the given Loki URL (previously it only wrote them to a local file, and you needed an agent such as Promtail to scrape that file and push it to Loki).
If you run Loki behind a proxy that applies different authorization policies for reading from and writing to Loki, the auth credentials passed to Loki canary now need to have both `READ` and `WRITE` permissions.

### Engine query timeout is deprecated

Previously, we had two configurations to define a query timeout: `engine.timeout` and `querier.query-timeout`.
Expand Down
208 changes: 208 additions & 0 deletions pkg/canary/writer/push.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package writer

import (
"bufio"
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/build"
)

const (
// defaultContentType is the Content-Type header value used for push
// requests; the request body is a marshalled protobuf message.
defaultContentType = "application/x-protobuf"
// defaultMaxReponseBufferLen caps how many bytes of a non-2xx response body
// are read when building the returned error.
defaultMaxReponseBufferLen = 1024

// pushEndpoint is the URL path of the Loki push API that is joined with the
// configured Loki address.
pushEndpoint = "/loki/api/v1/push"
)

var (
// defaultUserAgent is the User-Agent header value used for push requests;
// it embeds the version reported by the build package.
defaultUserAgent = fmt.Sprintf("canary-push/%s", build.GetVersion().Version)
)

// Push is a io.Writer, that writes given long entries by pushing
kavirajk marked this conversation as resolved.
Show resolved Hide resolved
// directly to the given loki server URL. Each `Push` instance handles for a single tenant.
// No batching of log lines happens when sending to Loki.
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
type Push struct {
lokiURL string
tenantID string
httpClient *http.Client
userAgent string
contentType string
logger log.Logger

// auth
username, password string

// Will add these label to the logs pushed to loki
labelName, labelValue, streamName, streamValue string
}

// NewPush builds a Push that sends logs straight to the Loki instance at
// lokiAddr.
//
// The HTTP client is created from cfg with HTTP/2 disabled and uses timeout as
// its overall request timeout. When tlsCfg is non-nil the client's transport
// is replaced by a TLS round tripper built from tlsCfg and caFile, and the
// push URL uses the "https" scheme; otherwise "http" is used. The label and
// stream name/value pairs are attached to every pushed entry, tenantID and
// username/password are kept for the request headers, and logger is kept for
// reporting problems while pushing.
//
// It returns an error if the HTTP client or the TLS round tripper cannot be
// created.
func NewPush(
	lokiAddr, tenantID string,
	timeout time.Duration,
	cfg config.HTTPClientConfig,
	labelName, labelValue string,
	streamName, streamValue string,
	tlsCfg *tls.Config,
	caFile string,
	username, password string,
	logger log.Logger,
) (*Push, error) {
	httpClient, err := config.NewClientFromConfig(cfg, "canary-push", config.WithHTTP2Disabled())
	if err != nil {
		return nil, err
	}
	httpClient.Timeout = timeout

	// Plain HTTP unless a TLS configuration was supplied.
	target := url.URL{
		Scheme: "http",
		Host:   lokiAddr,
		Path:   pushEndpoint,
	}

	if tlsCfg != nil {
		// setup tls transport
		newTransport := func(c *tls.Config) (http.RoundTripper, error) {
			return &http.Transport{TLSClientConfig: c}, nil
		}
		tlsRT, err := config.NewTLSRoundTripper(tlsCfg, caFile, newTransport)
		if err != nil {
			return nil, fmt.Errorf("failed to create TLS config for transport: %w", err)
		}
		httpClient.Transport = tlsRT
		target.Scheme = "https"
	}

	p := &Push{
		lokiURL:     target.String(),
		tenantID:    tenantID,
		httpClient:  httpClient,
		userAgent:   defaultUserAgent,
		contentType: defaultContentType,
		logger:      logger,
		labelName:   labelName,
		labelValue:  labelValue,
		streamName:  streamName,
		streamValue: streamValue,
		username:    username,
		password:    password,
	}
	return p, nil
}

// Write implements io.Writer. It pushes payload to Loki as a single request
// and returns len(payload) on success, or 0 and the error when the push fails.
func (p *Push) Write(payload []byte) (int, error) {
	ctx := context.Background()

	// http.Client treats a zero Timeout as "no timeout", but
	// context.WithTimeout with a non-positive duration returns a context that
	// is already expired, which would make every push fail. Only bound the
	// request when a positive timeout is configured.
	if timeout := p.httpClient.Timeout; timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}

	if err := p.send(ctx, payload); err != nil {
		return 0, err
	}
	return len(payload), nil
}

// parsePayload turns one entry written by the canary writer into a push
// request containing a single stream with a single entry.
//
// The payload is scanned with the `LogEntry` format into a timestamp string
// and a log line; the timestamp must be a base-10 integer and is interpreted
// as Unix nanoseconds. The stream carries the configured label and stream
// name/value pairs, and the entry's line is the complete payload as received.
// An error is returned when the payload does not scan or the timestamp does
// not parse.
func (p *Push) parsePayload(payload []byte) (*logproto.PushRequest, error) {
	raw := string(payload)

	// The scanned log line is only needed to validate the payload's shape.
	var timestamp, scannedLine string
	_, scanErr := fmt.Sscanf(raw, LogEntry, &timestamp, &scannedLine)
	if scanErr != nil {
		return nil, fmt.Errorf("failed to parse payload written sent by writer: %w", scanErr)
	}

	nanos, parseErr := strconv.ParseInt(timestamp, 10, 64)
	if parseErr != nil {
		return nil, fmt.Errorf("failed to parse unix nano timestamp: %w", parseErr)
	}

	streamLabels := model.LabelSet{}
	streamLabels[model.LabelName(p.labelName)] = model.LabelValue(p.labelValue)
	streamLabels[model.LabelName(p.streamName)] = model.LabelValue(p.streamValue)

	entry := logproto.Entry{
		Timestamp: time.Unix(0, nanos),
		Line:      raw,
	}
	stream := logproto.Stream{
		Labels:  streamLabels.String(),
		Entries: []logproto.Entry{entry},
		Hash:    uint64(streamLabels.Fingerprint()),
	}

	return &logproto.PushRequest{Streams: []logproto.Stream{stream}}, nil
}

// send does the heavy lifting of sending the generated logs into the Loki server.
// It won't batch.
func (p *Push) send(ctx context.Context, payload []byte) error {
preq, err := p.parsePayload(payload)
if err != nil {
return err
}

payload, err = proto.Marshal(preq)
if err != nil {
return fmt.Errorf("failed to marshal payload to json: %w", err)
}

payload = snappy.Encode(nil, payload)

req, err := http.NewRequest("POST", p.lokiURL, bytes.NewReader(payload))
if err != nil {
return fmt.Errorf("failed to create push request: %w", err)
}
req = req.WithContext(ctx)
req.Header.Set("Content-Type", p.contentType)
req.Header.Set("User-Agent", p.userAgent)

// set org-id
if p.tenantID != "" {
req.Header.Set("X-Scope-OrgID", p.tenantID)
}

// basic auth if provided
if p.username != "" {
req.SetBasicAuth(p.username, p.password)
}

resp, err := p.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to push payload: %w", err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
level.Error(p.logger).Log("msg", "failed to close response body", "error", err)
}
}()

if resp.StatusCode/100 != 2 {
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
return fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
}

return nil
}
Loading