
Gcplog targetmanager #3083

Merged · 22 commits · Jan 19, 2021
18 changes: 18 additions & 0 deletions cmd/promtail/promtail-local-pubsub-config.yaml
@@ -0,0 +1,18 @@
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://localhost:3100/loki/api/v1/push

scrape_configs:
  - job_name: pubsub-test
    gcplog:
      project_id: "grafanalabs-dev"
      subscription: "dev-logs-pull"
      use_incoming_timestamp: false # default; rewrites the timestamp.
      labels:
        job: pubsub-gcp
64 changes: 64 additions & 0 deletions docs/sources/clients/promtail/gcplog-cloud.md
@@ -0,0 +1,64 @@
---
title: Cloud setup for gcplog TargetManager
---
This document explains how to set up Google Cloud Platform to forward logs from cloud resources in a particular GCP project into a Google Pub/Sub topic, so that they are available for Promtail to consume.

This document assumes that the reader has `gcloud` installed and has the required permissions (as mentioned in the [Roles and Permission](#roles-and-permission) section).

# Roles and Permission
The user should have the following roles to complete the setup:
- "roles/pubsub.editor"
- "roles/logging.configWriter"

# Setup Pubsub Topic
A Google Pub/Sub topic acts as the queue that persists log messages, which `promtail` then reads.

```bash
$ gcloud pubsub topics create $TOPIC_ID
```

e.g.:
```bash
$ gcloud pubsub topics create cloud-logs
```

# Setup Log Router
We create a log sink to forward cloud logs into the Pub/Sub topic created above:

```bash
$ gcloud beta logging sinks create $SINK_NAME $SINK_LOCATION $OPTIONAL_FLAGS
```

e.g.:
```bash
$ gcloud beta logging sinks create cloud-logs pubsub.googleapis.com/projects/my-project/topics/cloud-logs \
--log-filter='resource.type=("gcs_bucket")' \
--description="Cloud logs"
```

The command above also passes the `log-filter` option, which controls what type of logs reach the destination Pub/Sub topic.
For more information on `log-filter`, refer to this [document](https://cloud.google.com/logging/docs/export/configure_export_v2#creating_sink).

# Create Pubsub subscription for Loki
We create a subscription for the Pub/Sub topic created above; `promtail` uses this subscription to consume log messages:

```bash
$ gcloud pubsub subscriptions create cloud-logs --topic=$TOPIC_ID \
--ack-deadline=$ACK_DEADLINE \
--message-retention-duration=$RETENTION_DURATION
```

e.g.:
```bash
$ gcloud pubsub subscriptions create cloud-logs --topic=projects/my-project/topics/cloud-logs \
--ack-deadline=10 \
--message-retention-duration=7d
```

For more fine-grained options, refer to `gcloud pubsub subscriptions --help`.

# ServiceAccount for Promtail
We need a service account with the following permissions:
- `pubsub.subscriber`

This enables `promtail` to read log entries from the Pub/Sub subscription created above.
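As a sketch, the service account and its subscriber binding can be created with `gcloud`; the account name and project below are placeholders:

```bash
$ gcloud iam service-accounts create promtail-gcplog \
--display-name="Promtail gcplog reader"

$ gcloud pubsub subscriptions add-iam-policy-binding cloud-logs \
--member="serviceAccount:promtail-gcplog@my-project.iam.gserviceaccount.com" \
--role="roles/pubsub.subscriber"
```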
22 changes: 22 additions & 0 deletions docs/sources/clients/promtail/scraping.md
@@ -154,6 +154,28 @@ Keep in mind that labels prefixed with `__` will be dropped, so relabeling is re
target_label: syslog_identifier
```

## Gcplog scraping
Promtail supports scraping cloud resource logs (e.g. GCS bucket logs, load balancer logs, Kubernetes cluster logs) from GCP.
Configuration goes in the `gcplog` section of `scrape_config`:

```yaml
- job_name: gcplog
  gcplog:
    project_id: "my-gcp-project"
    subscription: "my-pubsub-subscription"
    use_incoming_timestamp: false # default; rewrites timestamps.
    labels:
      job: "gcplog"
```
Here `project_id` and `subscription` are the only required fields.

- `project_id` is the GCP project ID.
- `subscription` is the GCP Pub/Sub subscription from which Promtail consumes log entries.

Before using the `gcplog` target, GCP should be [configured](../gcplog-cloud) with a Pub/Sub subscription from which to receive logs.

It also supports `relabeling` and `pipeline` stages, just like other targets.
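For instance, here is a sketch of a `pipeline_stages` setup that promotes the LogEntry `severity` field to a label (the extracted field name is illustrative; it depends on the log entries you forward):

```yaml
- job_name: gcplog
  gcplog:
    project_id: "my-gcp-project"
    subscription: "my-pubsub-subscription"
    labels:
      job: "gcplog"
  pipeline_stages:
    # The log line is the raw LogEntry JSON, so the json stage can
    # extract fields from it.
    - json:
        expressions:
          severity: severity
    - labels:
        severity:
```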

## Syslog Receiver

Promtail supports receiving [IETF Syslog (RFC5424)](https://tools.ietf.org/html/rfc5424)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/grafana/loki
go 1.15

require (
cloud.google.com/go/pubsub v1.3.1
github.com/NYTimes/gziphandler v1.1.1
github.com/aws/aws-lambda-go v1.17.0
github.com/blang/semver v3.5.1+incompatible // indirect
@@ -61,6 +62,7 @@ require (
go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
google.golang.org/api v0.35.0
google.golang.org/grpc v1.33.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
1 change: 1 addition & 0 deletions go.sum
@@ -39,6 +39,7 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0 h1:Lpy6hKgdcl7a3WGSfJIFmxmcdjSpP6OmBEfcOv1Y680=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1 h1:ukjixP1wl0LpnZ6LWtZJ0mX5tBmjp1f8Sqer8Z2OMUU=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.3.0/go.mod h1:9IAwXhoyBJ7z9LcAwkj0/7NnPzYaPeZxxVp3zm+5IqA=
68 changes: 6 additions & 62 deletions pkg/distributor/http.go
@@ -4,41 +4,23 @@ import (
"math"
"net/http"

"github.com/dustin/go-humanize"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/unmarshal"
unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy"
lokiutil "github.com/grafana/loki/pkg/util"
)

var (
contentType = http.CanonicalHeaderKey("Content-Type")

bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
}, []string{"tenant"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})
)
var contentType = http.CanonicalHeaderKey("Content-Type")

const applicationJSON = "application/json"

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

req, err := ParseRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -60,62 +42,24 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
}

func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
userID, _ := user.ExtractOrgID(r.Context())
logger := util.WithContext(r.Context(), util.Logger)
body := lokiutil.NewSizeReader(r.Body)
contentType := r.Header.Get(contentType)
var req logproto.PushRequest

defer func() {
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
)

for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
}
}

// incrementing tenant metrics if we have a tenant.
if totalEntries != 0 && userID != "" {
bytesIngested.WithLabelValues(userID).Add(float64(entriesSize))
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}

level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"contentType", contentType,
"bodySize", humanize.Bytes(uint64(body.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
)
}()

switch contentType {
switch r.Header.Get(contentType) {
case applicationJSON:
var err error

if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(body, &req)
err = unmarshal.DecodePushRequest(r.Body, &req)
} else {
err = unmarshal_legacy.DecodePushRequest(body, &req)
err = unmarshal_legacy.DecodePushRequest(r.Body, &req)
}

if err != nil {
return nil, err
}

default:
if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
return nil, err
}
}
18 changes: 18 additions & 0 deletions pkg/promtail/scrapeconfig/scrapeconfig.go
@@ -31,6 +31,7 @@ type Config struct {
JobName string `yaml:"job_name,omitempty"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"`
GcplogConfig *GcplogTargetConfig `yaml:"gcplog,omitempty"`
SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"`
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
@@ -164,6 +165,23 @@ type SyslogTargetConfig struct {
UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`
}

// GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic.
type GcplogTargetConfig struct {
// ProjectID is the Cloud project id
ProjectID string `yaml:"project_id"`

// Subscription is the subscription name we use to pull logs from a pubsub topic.
Subscription string `yaml:"subscription"`

// Labels are the additional labels to be added to each log entry when pushing it to the Loki server.
Labels model.LabelSet `yaml:"labels"`

// UseIncomingTimestamp controls whether to keep the timestamp of the incoming log entry
// or to replace it with the current timestamp at processing time.
// The default value (`false`) means it is replaced with the current timestamp at processing time.
UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`
}

// PushTargetConfig describes a scrape config that listens for Loki push messages.
type PushTargetConfig struct {
// Server is the weaveworks server config for listening connections
11 changes: 10 additions & 1 deletion pkg/promtail/targets/file/filetarget.go
@@ -65,7 +65,16 @@ type FileTarget struct {
}

// NewFileTarget creates a new FileTarget.
func NewFileTarget(metrics *Metrics, logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string, labels model.LabelSet, discoveredLabels model.LabelSet, targetConfig *Config) (*FileTarget, error) {
func NewFileTarget(
metrics *Metrics,
logger log.Logger,
handler api.EntryHandler,
positions positions.Positions,
path string,
labels model.LabelSet,
discoveredLabels model.LabelSet,
targetConfig *Config,
) (*FileTarget, error) {

watcher, err := fsnotify.NewWatcher()
if err != nil {
90 changes: 90 additions & 0 deletions pkg/promtail/targets/gcplog/formatter.go
@@ -0,0 +1,90 @@
package gcplog

import (
"fmt"
"strings"
"time"

"cloud.google.com/go/pubsub"
json "github.com/json-iterator/go"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
)

// GCPLogEntry is a log entry as written to the pubsub topic,
// according to the following spec:
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
// nolint: golint
type GCPLogEntry struct {
LogName string `json:"logName"`
Resource struct {
Type string `json:"type"`
Labels map[string]string `json:"labels"`
} `json:"resource"`
Timestamp string `json:"timestamp"`

// ReceiveTimestamp is the time the log entry was received by Logging.
// It matters here because `Timestamp` is optional in a GCP log entry.
ReceiveTimestamp string `json:"receiveTimestamp"`

TextPayload string `json:"textPayload"`

// NOTE(kavi): There are other fields in GCPLogEntry, but we only need the
// above fields for now; we will be sending the entire entry to Loki anyway.
}

func format(m *pubsub.Message, other model.LabelSet, useIncomingTimestamp bool) (api.Entry, error) {
var ge GCPLogEntry

if err := json.Unmarshal(m.Data, &ge); err != nil {
return api.Entry{}, err
}

labels := model.LabelSet{
"logName": model.LabelValue(ge.LogName),
"resourceType": model.LabelValue(ge.Resource.Type),
}
for k, v := range ge.Resource.Labels {
if !model.LabelName(k).IsValid() || !model.LabelValue(v).IsValid() {
continue
}
labels[model.LabelName(k)] = model.LabelValue(v)
}

// add labels from config as well.
labels = labels.Merge(other)

ts := time.Now()
line := string(m.Data)

if useIncomingTimestamp {
tt := ge.Timestamp
if tt == "" {
tt = ge.ReceiveTimestamp
}
var err error
ts, err = time.Parse(time.RFC3339, tt)
if err != nil {
return api.Entry{}, fmt.Errorf("invalid timestamp format: %w", err)
}

if ts.IsZero() {
return api.Entry{}, fmt.Errorf("no timestamp found in the log entry")
}
}

// Send only `ge.TextPayload` as the log line if it's present.
if strings.TrimSpace(ge.TextPayload) != "" {
line = ge.TextPayload
}

return api.Entry{
Labels: labels,
Entry: logproto.Entry{
Timestamp: ts,
Line: line,
},
}, nil
}