Skip to content

Commit

Permalink
organize package
Browse files Browse the repository at this point in the history
BREAKING: loki label 'source' => 'logpush_source'
  • Loading branch information
maddsua committed Jan 25, 2025
1 parent 3c8879a commit 57b6748
Show file tree
Hide file tree
Showing 9 changed files with 525 additions and 469 deletions.
144 changes: 29 additions & 115 deletions service/forwarder.go → service/forwarder/loki/loki.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
package main
package loki

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/google/uuid"
"github.com/maddsua/logpush/service/dbops"
"github.com/maddsua/logpush/service/ingester/streams"
)

type LokiConnection struct {
type Loki struct {
url string
UseStructMeta bool
Retries int
}

func ParseLokiUrl(params string) (*LokiConnection, error) {
func ParseLokiUrl(params string) (*Loki, error) {

if params = strings.TrimSpace(params); params == "" {
return nil, nil
Expand All @@ -48,13 +46,13 @@ func ParseLokiUrl(params string) (*LokiConnection, error) {
Host: parsed.Host,
}

return &LokiConnection{
return &Loki{
url: lokiUrl.String(),
UseStructMeta: strings.ToLower(os.Getenv("LOKI_STRUCTURED_METADATA")) != "false",
}, nil
}

func (this *LokiConnection) Ready() error {
func (this *Loki) Ready() error {

req, err := http.NewRequest("GET", this.url+"/ready", nil)
if err != nil {
Expand All @@ -76,44 +74,6 @@ func (this *LokiConnection) Ready() error {
return fmt.Errorf("http status code %d", resp.StatusCode)
}

type Level string

func (lvl Level) String() string {
switch lvl {
case "log", "warn", "error", "debug", "info":
return string(lvl)
default:
return "error"
}
}

type UnixMilli int64

func (um UnixMilli) String(sequence int) string {

var ts time.Time
if um > 0 {
ts = time.UnixMilli(int64(um))
} else {
ts = time.Now()
}

return strconv.FormatInt(ts.UnixNano()+int64(sequence), 10)
}

func (um UnixMilli) Time(sequence int) time.Time {

var ts time.Time
if um > 0 {
ts = time.UnixMilli(int64(um))
} else {
ts = time.Now()
}

// todo: ensure correct result
return ts.Add(time.Duration(sequence))
}

type LokiHttpBatch struct {
Streams []LokiStream `json:"streams"`
}
Expand All @@ -123,7 +83,14 @@ type LokiStream struct {
Values [][]any `json:"values"`
}

func (this *LokiConnection) PushStreams(streams []LokiStream) error {
func (this *Loki) retryNumber() int {
if this.Retries > 0 {
return this.Retries
}
return 3
}

func (this *Loki) PushStreams(streams []LokiStream) error {

payload, err := lokiSerializeStreams(streams)
if err != nil {
Expand Down Expand Up @@ -165,15 +132,15 @@ func (this *LokiConnection) PushStreams(streams []LokiStream) error {
return fmt.Errorf("failed to push log streams: %s", string(responseBody))
}

func (this *LokiConnection) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAddr string, payload WebStream) {
func (this *Loki) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAddr string, payload *streams.WebStream) {

var streams []LokiStream
if this.UseStructMeta {
if next := payload.ToStructuredLokiStream(streamSource, txID); len(next.Values) > 0 {
if next := webStreamToStructured(payload, streamSource, txID); len(next.Values) > 0 {
streams = []LokiStream{next}
}
} else {
streams = payload.ToLokiStreams(streamSource, txID)
streams = webStreamToLabeled(payload, streamSource, txID)
}

if len(streams) == 0 {
Expand All @@ -183,22 +150,22 @@ func (this *LokiConnection) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID
return
}

for i := 0; i < webStreamPushRetryAttempts; i++ {
for i := 0; i < this.retryNumber(); i++ {

if err := this.PushStreams(streams); err != nil {
slog.Error("LOKI FORWARDER: failed to push entries",
slog.String("err", err.Error()),
slog.Int("attempt", i+1),
slog.Int("of", webStreamPushRetryAttempts),
slog.String("stream_id", streamSource.ID.String()),
slog.String("remote_addr", remoteAddr))
continue
err := this.PushStreams(streams)
if err == nil {
break
}

break
slog.Error("LOKI FORWARDER: failed to push entries",
slog.String("err", err.Error()),
slog.Int("attempt", i+1),
slog.Int("of", this.retryNumber()),
slog.String("stream_id", streamSource.ID.String()),
slog.String("remote_addr", remoteAddr))
}

slog.Debug("LOKI FORWARDER: Wrote entries",
slog.Debug("LOKI FORWARDER: Wrote streams",
slog.Int("count", len(streams)),
slog.String("stream_id", streamSource.ID.String()),
slog.String("remote_addr", remoteAddr))
Expand All @@ -215,56 +182,3 @@ func lokiSerializeStreams(streams []LokiStream) (*bytes.Buffer, error) {

return bytes.NewBuffer(data), nil
}

type Timescale struct {
DB *sql.DB
}

func (this *Timescale) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAddr string, payload WebStream) {

rows := payload.ToTimescaleRows(streamSource.ID, txID)
if len(rows) == 0 {
slog.Warn("LOKI FORWARDER: Empty log batch",
slog.String("stream_id", streamSource.ID.String()),
slog.String("remote_addr", remoteAddr))
return
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

tx, err := this.DB.BeginTx(ctx, nil)
if err != nil {
slog.Error("TIMESCALE FORWARDER: Failed to begin DB TX",
slog.String("err", err.Error()),
slog.String("stream_id", streamSource.ID.String()),
slog.String("remote_addr", remoteAddr))
return
}

txq := dbops.New(tx)

for _, row := range rows {
if err := txq.InsertStreamEntry(ctx, row); err != nil {
slog.Error("TIMESCALE FORWARDER: Failed to insert row",
slog.String("err", err.Error()),
slog.String("stream_id", streamSource.ID.String()),
slog.String("remote_addr", remoteAddr))
}
}

defer tx.Rollback()

if err := tx.Commit(); err != nil {
slog.Error("TIMESCALE FORWARDER: Failed to commit DB TX",
slog.String("err", err.Error()),
slog.String("stream_id", streamSource.ID.String()),
slog.String("remote_addr", remoteAddr))
return
}

slog.Debug("TIMESCALE FORWARDER: Wrote entries",
slog.Int("count", len(rows)),
slog.String("stream_id", streamSource.ID.String()),
slog.String("remote_addr", remoteAddr))
}
103 changes: 103 additions & 0 deletions service/forwarder/loki/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package loki

import (
"maps"
"strings"

"github.com/google/uuid"
"github.com/maddsua/logpush/service/dbops"
"github.com/maddsua/logpush/service/ingester/streams"
"github.com/maddsua/logpush/service/logdata"
)

func webStreamToLabeled(stream *streams.WebStream, streamSource *dbops.Stream, txID uuid.UUID) []LokiStream {

baseLabels := map[string]string{
"logpush_source": "web",
"service_name": streamSource.Name,
"logpush_tx": txID.String(),
}

logdata.MergeStreamLabels(streamSource, baseLabels)
logdata.CopyMetaFields(baseLabels, stream.Meta)

var result []LokiStream

for idx, entry := range stream.Entries {

if entry.Message = strings.TrimSpace(entry.Message); entry.Message == "" {
continue
}

labels := map[string]string{}
maps.Copy(labels, baseLabels)
logdata.CopyMetaFields(labels, entry.Meta)
labels["detected_level"] = entry.Level.String()

result = append(result, LokiStream{
Stream: labels,
Values: [][]any{
{
entry.Date.String(idx),
entry.Message,
},
},
})
}

return result
}

func webStreamToStructured(stream *streams.WebStream, streamSource *dbops.Stream, txID uuid.UUID) LokiStream {

labels := map[string]string{
"logpush_source": "web",
"service_name": streamSource.Name,
"logpush_tx": txID.String(),
}

logdata.MergeStreamLabels(streamSource, labels)

metaFields := map[string]string{}

for key, val := range stream.Meta {

if _, has := labels[key]; has {
continue
}

switch key {
case "env", "environment":
labels["env"] = val
default:
metaFields[key] = val
}
}

var streamValues [][]any
for idx, entry := range stream.Entries {

if entry.Message = strings.TrimSpace(entry.Message); entry.Message == "" {
continue
}

meta := map[string]string{}
maps.Copy(meta, metaFields)
meta["detected_level"] = entry.Level.String()

if entry.Meta != nil {
maps.Copy(meta, entry.Meta)
}

streamValues = append(streamValues, []any{
entry.Date.String(idx),
entry.Message,
meta,
})
}

return LokiStream{
Stream: labels,
Values: streamValues,
}
}
56 changes: 56 additions & 0 deletions service/forwarder/timescale/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package timescale

import (
"database/sql"
"encoding/json"
"maps"
"strings"

"github.com/google/uuid"
"github.com/maddsua/logpush/service/dbops"
"github.com/maddsua/logpush/service/ingester/streams"
"github.com/maddsua/logpush/service/logdata"
)

func webStreamToRows(batch *streams.WebStream, streamID uuid.UUID, txID uuid.UUID) []dbops.InsertStreamEntryParams {

var result []dbops.InsertStreamEntryParams

var mergeMeta = func(entry streams.WebLogEntry) map[string]string {

metadata := map[string]string{}

maps.Copy(metadata, batch.Meta)
logdata.CopyMetaFields(metadata, entry.Meta)

if len(metadata) == 0 {
return nil
}

return metadata
}

for idx, entry := range batch.Entries {

if entry.Message = strings.TrimSpace(entry.Message); entry.Message == "" {
continue
}

var metadata sql.Null[[]byte]
if meta := mergeMeta(entry); meta != nil {
metadata.V, _ = json.Marshal(meta)
metadata.Valid = true
}

result = append(result, dbops.InsertStreamEntryParams{
CreatedAt: entry.Date.Time(idx),
StreamID: streamID,
TxID: uuid.NullUUID{UUID: txID, Valid: true},
Level: entry.Level.String(),
Message: entry.Message,
Metadata: metadata,
})
}

return result
}
Loading

0 comments on commit 57b6748

Please sign in to comment.