introduce backend for filtering ingest (highlight#6713)
## Summary

Introduces a sampling rate and ingestion exclusion filters for all four
products.
Rate-based ingestion uses a stable ingest-item key to deterministically decide
whether a resource should be ingested, so that even if we replay the ingest
queue, the decisions remain the same for the same client-generated keys.
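
A minimal sketch of the idea, assuming a hash-based approach (the helper name and hashing scheme here are illustrative, not necessarily what this commit implements):

```go
package sampling

import "hash/fnv"

// isIngestedBySample hashes a stable, client-generated key onto [0, 1)
// and compares the result against the sampling rate. Because the verdict
// depends only on the key and the rate, replaying the ingest queue
// reproduces the same decision for the same item.
func isIngestedBySample(key string, rate float64) bool {
	h := fnv.New64a()
	_, _ = h.Write([]byte(key))
	return float64(h.Sum64())/float64(^uint64(0)) < rate
}
```

Any uniform hash works here; the property that matters is that the decision is a pure function of the key.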
Filter-based ingestion mimics the existing search logic but applies the
filters directly as each item is ingested, avoiding an insert into ClickHouse
just to run a filter query. This means the filtering logic is duplicated from
what the feeds themselves use, but it is shared across all four products.
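
For example, the shared helpers introduced in this commit (`queryparser.Parse` and `clickhouse.ErrorMatchesQuery`, shown in the diff below) can be applied to the in-memory item; the function and its filter-settings parameter are a sketch of the wiring, not the commit's exact call site:

```go
package worker

import (
	"github.com/highlight-run/highlight/backend/clickhouse"
	model "github.com/highlight-run/highlight/backend/public-graph/graph/model"
	"github.com/highlight-run/highlight/backend/queryparser"
)

// shouldIngestError is a sketch: exclusionQuery would come from per-project
// filter settings (not yet exposed in the UI). No ClickHouse round-trip is
// needed; the filter is evaluated directly against the in-memory item.
func shouldIngestError(errorObject *model.BackendErrorObjectInput, exclusionQuery string) bool {
	filters := queryparser.Parse(exclusionQuery)
	return !clickhouse.ErrorMatchesQuery(errorObject, &filters)
}
```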

### Sessions
* Sampling rate uses the session secure ID as the key.
* Exclusion filters are applied when a session is initialized or when the
datasync queue has an entry (i.e., new fields are appended); see the sketch
below.
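
Roughly, with `SessionMatchesQuery` as a hypothetical session-side analogue of the `ErrorMatchesQuery`/`LogMatchesQuery` helpers added in this commit:

```go
package worker

import (
	"github.com/highlight-run/highlight/backend/clickhouse"
	"github.com/highlight-run/highlight/backend/model"
	"github.com/highlight-run/highlight/backend/queryparser"
)

// sessionExcluded is a sketch; SessionMatchesQuery is a hypothetical name,
// and the exclusion query would come from per-project settings. It would
// run at session init and again on each datasync entry, since newly
// appended fields can change the verdict.
func sessionExcluded(session *model.Session, exclusionQuery string) bool {
	filters := queryparser.Parse(exclusionQuery)
	return clickhouse.SessionMatchesQuery(session, &filters)
}
```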
### Errors
* Sampling rate uses the request ID, trace ID, or span ID as the key (see
the key-selection sketch below).
* Exclusion filters are applied when errors are ingested via the push payload
or during OTel backend processing.
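
A sketch of that fallback order (the pointer-string parameters assume the optional fields of the public GraphQL model):

```go
package sampling

// errorSamplingKey picks the first available stable identifier, in the
// order described above. The bool reports whether any key was found.
func errorSamplingKey(requestID, traceID, spanID *string) (string, bool) {
	for _, id := range []*string{requestID, traceID, spanID} {
		if id != nil && *id != "" {
			return *id, true
		}
	}
	return "", false // no stable key available for this error
}
```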
### Logs
* Sampling rate uses the log UUID.
* Exclusion filters are applied during OTel backend processing.
### Traces
* Sampling rate uses the trace UUID. Because the trace UUID is shared by all
child spans of a root trace, this is effectively 'head' sampling: once a root
trace is determined to be included, all of its child spans are included as
well (see the sketch below).
* Exclusion filters are applied during OTel backend processing.
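
Reusing the deterministic helper sketched earlier, the head-sampling property falls out for free:

```go
package sampling

// spanIngested: every span carries its root's trace UUID, so hashing
// that shared key yields the same verdict for the root and all of its
// children; include the root and you include the whole trace.
func spanIngested(traceUUID string, rate float64) bool {
	return isIngestedBySample(traceUUID, rate)
}
```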

Relates to highlight#6749

## How did you test this change?

New unit tests, plus validating locally that sampling works correctly.

![image](https://github.com/highlight/highlight/assets/1351531/52bc2474-d5ac-44b8-8833-efbc3d5365d3)

## Are there any deployment considerations?

No; all new functionality is disabled by default, with no way to enable it
from the UI yet.

## Does this work require review from our design team?

No
Vadman97 authored Oct 7, 2023
1 parent 48e3949 commit 17b3958
Showing 41 changed files with 2,898 additions and 244 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/backend.yml
@@ -128,3 +128,5 @@ jobs:
go-version-file: 'backend/go.mod'
- name: Run tests
run: cd backend && go test -p 1 ./... -v
- name: Run fuzz tests
run: cd backend && make fuzz
27 changes: 21 additions & 6 deletions backend/Makefile
@@ -11,17 +11,32 @@ public-gen:
private-gen:
(cd ./private-graph; go run github.com/99designs/gqlgen)
private-test:
(cd ./private-graph/graph; ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
(cd ./private-graph/graph; CLICKHOUSE_ADDRESS=localhost:9000 CLICKHOUSE_DATABASE=default CLICKHOUSE_TEST_DATABASE=test CLICKHOUSE_USERNAME=default ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
public-test:
(cd ./public-graph/graph; ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
(cd ./public-graph/graph; CLICKHOUSE_ADDRESS=localhost:9000 CLICKHOUSE_DATABASE=default CLICKHOUSE_TEST_DATABASE=test CLICKHOUSE_USERNAME=default ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
model-test:
(cd ./model; ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
(cd ./model; CLICKHOUSE_ADDRESS=localhost:9000 CLICKHOUSE_DATABASE=default CLICKHOUSE_TEST_DATABASE=test CLICKHOUSE_USERNAME=default ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
payload-test:
(cd ./payload; ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
(cd ./payload; CLICKHOUSE_ADDRESS=localhost:9000 CLICKHOUSE_DATABASE=default CLICKHOUSE_TEST_DATABASE=test CLICKHOUSE_USERNAME=default ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
worker-test:
(cd ./worker; ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
(cd ./worker; CLICKHOUSE_ADDRESS=localhost:9000 CLICKHOUSE_DATABASE=default CLICKHOUSE_TEST_DATABASE=test CLICKHOUSE_USERNAME=default ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test ./... -v)
test-and-coverage:
(ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test -p 1 -covermode=atomic -coverprofile=coverage.out ./... -v)
(CLICKHOUSE_ADDRESS=localhost:9000 CLICKHOUSE_DATABASE=default CLICKHOUSE_TEST_DATABASE=test CLICKHOUSE_USERNAME=default ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test -p 1 -covermode=atomic -coverprofile=coverage.out ./... -v)
test:
(CLICKHOUSE_ADDRESS=localhost:9000 CLICKHOUSE_DATABASE=default CLICKHOUSE_TEST_DATABASE=test CLICKHOUSE_USERNAME=default ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 PSQL_USER=postgres PSQL_PASSWORD=postgres go test -p 1 --race ./... -v)
fuzz:
for file in $$(grep -r --include='**_test.go' --files-with-matches 'func Fuzz' .); do \
funcs=$$(grep -oP 'func \K(Fuzz\w*)' $$file); \
for func in $${funcs}; do \
echo "Fuzzing $$func in $$file"; \
parentDir=$$(dirname $$file); \
CLICKHOUSE_ADDRESS='localhost:9000' CLICKHOUSE_DATABASE=default \
CLICKHOUSE_TEST_DATABASE=test CLICKHOUSE_USERNAME=default \
ENVIRONMENT=test LOG_LEVEL=debug PSQL_HOST=localhost PSQL_PORT=5432 \
PSQL_USER=postgres PSQL_PASSWORD=postgres \
go test $$parentDir -run=$$func -fuzz=$$func -fuzztime=5s || exit 1; \
done \
done
report-stripe-usage:
(go build; doppler run -- ./backend -runtime=worker -worker-handler=report-stripe-usage)
start-metric-monitor-watch:
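
For context, the `fuzz` target above discovers `Fuzz*` functions in `*_test.go` files (such as `FuzzReadLogs` later in this diff) and runs each for five seconds. A minimal example of the shape it looks for; the fuzz function here is illustrative, though `queryparser.Parse` is real:

```go
package clickhouse

import (
	"testing"

	"github.com/highlight-run/highlight/backend/queryparser"
)

// FuzzParseQuery is an illustrative fuzz test of the kind `make fuzz`
// picks up: parsing arbitrary query strings must never panic.
func FuzzParseQuery(f *testing.F) {
	f.Add("hello world os.type:linux service_name:all")
	f.Fuzz(func(t *testing.T, query string) {
		_ = queryparser.Parse(query)
	})
}
```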
28 changes: 28 additions & 0 deletions backend/clickhouse/errors.go
@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
model2 "github.com/highlight-run/highlight/backend/public-graph/graph/model"
"github.com/highlight-run/highlight/backend/queryparser"
"strings"
"time"

@@ -545,3 +547,29 @@ func (client *Client) QueryErrorHistogram(ctx context.Context, projectId int, qu

return bucketTimes, totals, nil
}

var errorObjectsTableConfig = tableConfig[modelInputs.ReservedErrorObjectKey]{
tableName: ErrorObjectsTable,
keysToColumns: map[modelInputs.ReservedErrorObjectKey]string{
modelInputs.ReservedErrorObjectKeySessionSecureID: "SessionSecureID",
modelInputs.ReservedErrorObjectKeyRequestID: "RequestID",
modelInputs.ReservedErrorObjectKeyTraceID: "TraceID",
modelInputs.ReservedErrorObjectKeySpanID: "SpanID",
modelInputs.ReservedErrorObjectKeyLogCursor: "LogCursor",
modelInputs.ReservedErrorObjectKeyEvent: "Event",
modelInputs.ReservedErrorObjectKeyType: "Type",
modelInputs.ReservedErrorObjectKeyURL: "URL",
modelInputs.ReservedErrorObjectKeySource: "Source",
modelInputs.ReservedErrorObjectKeyStackTrace: "StackTrace",
modelInputs.ReservedErrorObjectKeyTimestamp: "Timestamp",
modelInputs.ReservedErrorObjectKeyPayload: "Payload",
modelInputs.ReservedErrorObjectKeyServiceName: "Service.Name",
modelInputs.ReservedErrorObjectKeyServiceVersion: "Service.Version",
},
bodyColumn: "Event",
reservedKeys: modelInputs.AllReservedErrorObjectKey,
}

func ErrorMatchesQuery(errorObject *model2.BackendErrorObjectInput, filters *queryparser.Filters) bool {
return matchesQuery(errorObject, errorObjectsTableConfig, filters)
}
31 changes: 31 additions & 0 deletions backend/clickhouse/errors_test.go
@@ -0,0 +1,31 @@
package clickhouse

import (
modelInputs "github.com/highlight-run/highlight/backend/public-graph/graph/model"
"github.com/highlight-run/highlight/backend/queryparser"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func Test_ErrorMatchesQuery(t *testing.T) {
errorObject := modelInputs.BackendErrorObjectInput{}
filters := queryparser.Parse("oops something bad type:console.error os.type:linux resource_name:worker.* service_name:all")
matches := ErrorMatchesQuery(&errorObject, &filters)
assert.False(t, matches)

errorObject = modelInputs.BackendErrorObjectInput{
Event: "good evening, this is an error where oops something bad happens",
Type: "console.error",
URL: "https://example.com",
Source: "backend",
StackTrace: "yoo",
Timestamp: time.Now(),
Service: &modelInputs.ServiceInput{
Name: "all",
Version: "asdf",
},
}
matches = ErrorMatchesQuery(&errorObject, &filters)
assert.True(t, matches)
}
7 changes: 7 additions & 0 deletions backend/clickhouse/logs.go
@@ -3,6 +3,7 @@ package clickhouse
import (
"context"
"fmt"
"github.com/highlight-run/highlight/backend/queryparser"
"math"
"time"

@@ -36,6 +37,7 @@ var logsTableConfig = tableConfig[modelInputs.ReservedLogKey]{
tableName: LogsTable,
keysToColumns: logKeysToColumns,
reservedKeys: modelInputs.AllReservedLogKey,
bodyColumn: "Body",
attributesColumn: "LogAttributes",
selectColumns: []string{
"Timestamp",
@@ -56,6 +58,7 @@ var logsSamplingTableConfig = tableConfig[modelInputs.ReservedLogKey]{
tableName: fmt.Sprintf("%s SAMPLE %d", LogsSamplingTable, SamplingRows),
keysToColumns: logKeysToColumns,
reservedKeys: modelInputs.AllReservedLogKey,
bodyColumn: "Body",
attributesColumn: "LogAttributes",
}

@@ -414,3 +417,7 @@ func (client *Client) LogsKeys(ctx context.Context, projectID int, startDate tim
func (client *Client) LogsKeyValues(ctx context.Context, projectID int, keyName string, startDate time.Time, endDate time.Time) ([]string, error) {
return KeyValuesAggregated(ctx, client, LogKeyValuesTable, projectID, keyName, startDate, endDate)
}

func LogMatchesQuery(logRow *LogRow, filters *queryparser.Filters) bool {
return matchesQuery(logRow, logsTableConfig, filters)
}
147 changes: 147 additions & 0 deletions backend/clickhouse/logs_test.go
@@ -3,6 +3,9 @@ package clickhouse
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/highlight-run/highlight/backend/queryparser"
"github.com/samber/lo"
"os"
"reflect"
"sort"
@@ -1228,3 +1231,147 @@ func FuzzReadLogs(f *testing.F) {
assert.NoError(t, err)
})
}

func Test_LogMatchesQuery(t *testing.T) {
logRow := LogRow{}
filters := queryparser.Parse("hello world os.type:linux resource_name:worker.* service_name:all")
matches := LogMatchesQuery(&logRow, &filters)
assert.False(t, matches)

logRow = LogRow{
Body: "this, is.a ; hello; world \nbe\xe2\x80\x83me",
ServiceName: "all",
LogAttributes: map[string]string{
"os.type": "linux",
"resource_name": "worker.kafka.process",
},
}
filters = queryparser.Parse("hello world be me os.type:linux resource_name:worker.* service_name:all")
matches = LogMatchesQuery(&logRow, &filters)
assert.True(t, matches)

filters = queryparser.Parse("not this one os.type:linux resource_name:worker.* service_name:all")
matches = LogMatchesQuery(&logRow, &filters)
assert.False(t, matches)
}

func Test_LogMatchesQuery_Body(t *testing.T) {
for _, body := range []string{
"hello world a test",
"hello, world this is a test",
"this, is.a ; hello; 123 there6 world:message,.:; \nbe\xe2\x80\x83me",
"0!0! 0*000000 000000000",
"0! 000\\\"0000000000000",
"(*",
"*!",
"*\\x80",
} {
logRow := LogRow{Body: body}
filters := queryparser.Parse(body)
// : represents a special case since our query parser treats it as an attribute. don't search on attrs
filters.Attributes = map[string][]string{}
matches := LogMatchesQuery(&logRow, &filters)
assert.True(t, matches, "failed on body %s", body)

filters = queryparser.Parse("no")
matches = LogMatchesQuery(&logRow, &filters)
assert.False(t, matches, "failed on body %s", body)
}
}

func Test_LogMatchesQuery_ClickHouse(t *testing.T) {
ctx := context.Background()
client, teardown := setupTest(t)
defer teardown(t)

now := time.Now()
oneSecondAgo := now.Add(-time.Second * 1)
var rows []*LogRow
for i := 1; i <= LogsLimit; i++ {
row := NewLogRow(oneSecondAgo, 1,
WithBody(ctx, "this is a hello world message"),
WithSeverityText(modelInputs.LogLevelInfo.String()),
WithServiceName("all"),
WithTraceID(uuid.New().String()),
WithLogAttributes(map[string]string{
"service": "foo",
"os.type": "linux",
"resource_name": "worker.kafka.process"}))
if i < 10 {
row.ServiceName = "dev"
}
rows = append(rows, row)
}
assert.NoError(t, client.BatchWriteLogRows(ctx, rows))

query := "hello world os.type:linux resource_name:worker.* service_name:dev"
result, err := client.ReadLogs(ctx, 1, modelInputs.QueryInput{
DateRange: makeDateWithinRange(now),
Query: query,
}, Pagination{})
assert.NoError(t, err)

var filtered []*LogRow
filters := queryparser.Parse(query)
for _, logRow := range rows {
if LogMatchesQuery(logRow, &filters) {
filtered = append(filtered, logRow)
}
}

assert.Equal(t, len(filtered), len(result.Edges))
for _, logRow := range filtered {
_, found := lo.Find(result.Edges, func(edge *modelInputs.LogEdge) bool {
if edge.Node.TraceID == nil {
return false
}
return *edge.Node.TraceID == logRow.TraceId
})
assert.True(t, found)
}
}

func Test_LogMatchesQuery_ClickHouse_Body(t *testing.T) {
for _, body := range []string{
"hello world",
"this, is.a ; hello; \nbe me",
"hello world a test",
"hello, world this is a test",
"foo*bar",
"0! 000\\\"0000000000000",
"(*",
"*!",
"*\\x80",
} {
ctx := context.Background()
client, teardown := setupTest(t)
defer teardown(t)

now := time.Now()
oneSecondAgo := now.Add(-time.Second * 1)
logRow := NewLogRow(oneSecondAgo, 1, WithBody(ctx, body))
assert.NoError(t, client.BatchWriteLogRows(ctx, []*LogRow{logRow}))

result, err := client.ReadLogs(ctx, 1, modelInputs.QueryInput{
DateRange: makeDateWithinRange(now),
Query: body,
}, Pagination{})
assert.NoError(t, err)

var filtered []*LogRow
filters := queryparser.Parse(body)
if LogMatchesQuery(logRow, &filters) {
filtered = append(filtered, logRow)
}

assert.Equal(t, 1, len(result.Edges))
assert.Equal(t, len(filtered), len(result.Edges))
_, found := lo.Find(result.Edges, func(edge *modelInputs.LogEdge) bool {
if edge.Node.TraceID == nil {
return false
}
return *edge.Node.TraceID == logRow.TraceId
})
assert.True(t, found)
}
}
