Skip to content

Commit

Permalink
log invalid token requests
Browse files Browse the repository at this point in the history
  • Loading branch information
maddsua committed Jan 20, 2025
1 parent b5c2226 commit 8362587
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 21 deletions.
4 changes: 2 additions & 2 deletions service/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (this *LokiConnection) PushStreams(streams []LokiStream) error {
return fmt.Errorf("failed to push log streams: %s", string(responseBody))
}

func (this *LokiConnection) IngestWeb(streamSource dbops.Stream, remoteAddr string, payload WebStream) {
func (this *LokiConnection) IngestWeb(streamSource *dbops.Stream, remoteAddr string, payload WebStream) {

stream := payload.ToLokiStream(streamSource)
if len(stream.Values) == 0 {
Expand Down Expand Up @@ -186,7 +186,7 @@ type Timescale struct {
DB *sql.DB
}

func (this *Timescale) IngestWeb(streamSource dbops.Stream, remoteAddr string, payload WebStream) {
func (this *Timescale) IngestWeb(streamSource *dbops.Stream, remoteAddr string, payload WebStream) {

rows := payload.ToTimescaleRows(streamSource.ID)
if len(rows) == 0 {
Expand Down
42 changes: 23 additions & 19 deletions service/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,41 @@ func (this *LogIngester) ServeHTTP(writer http.ResponseWriter, req *http.Request

func (this *LogIngester) handleRequest(req *http.Request) error {

serviceID, err := uuid.Parse(req.PathValue("id"))
streamID, err := uuid.Parse(req.PathValue("id"))
if err != nil {
return errors.New("service id required")
}

var logStream dbops.Stream
var getLogStream = func() (*dbops.Stream, error) {

if cached := this.StreamCache.Get(serviceID); cached != nil {

if cached.Entry == nil {
return errors.New("service not found")
if cached := this.StreamCache.Get(streamID); cached != nil {
return cached.Entry, nil
}

logStream = *cached.Entry

} else {

if logStream, err = this.DB.GetStream(req.Context(), serviceID); err != nil {
entry, err := this.DB.GetStream(req.Context(), streamID)
if err != nil {

if err == sql.ErrNoRows {
this.StreamCache.Set(serviceID, nil)
return errors.New("service not found")
this.StreamCache.Set(streamID, nil)
return nil, nil
}

slog.Error("Failed to query log stream",
slog.String("err", err.Error()))

return errors.New("unable to query requested service stream")
return nil, err
}

this.StreamCache.Set(serviceID, &logStream)
this.StreamCache.Set(streamID, &entry)
return &entry, nil
}

logStream, err := getLogStream()
if err != nil {
slog.Error("WEB STREAM: Failed to query log stream",
slog.String("err", err.Error()))
return errors.New("unable to query requested service stream")
} else if logStream == nil {
slog.Warn("WEB STREAM: Log stream not found",
slog.String("id", streamID.String()))
return errors.New("service not found")
}

contentType := req.Header.Get("content-type")
Expand Down Expand Up @@ -117,7 +121,7 @@ type WebLogEntry struct {
Meta map[string]string `json:"meta"`
}

func (batch *WebStream) ToLokiStream(streamSource dbops.Stream) LokiStream {
func (batch *WebStream) ToLokiStream(streamSource *dbops.Stream) LokiStream {

labels := map[string]string{
"source": "web",
Expand Down

0 comments on commit 8362587

Please sign in to comment.