Skip to content

Commit

Permalink
Merge branch 'main' into patch-3
Browse files Browse the repository at this point in the history
  • Loading branch information
JStickler authored Dec 6, 2024
2 parents 5c482d7 + 78d3c44 commit 44fd44a
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 53 deletions.
50 changes: 24 additions & 26 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem st
}

type QuerierConfig struct {
// MinTableOffset is derived from the compactor's MinTableOffset
MinTableOffset int
BuildInterval time.Duration
BuildTableOffset int
}

// BloomQuerier is a store-level abstraction on top of Client
Expand Down Expand Up @@ -119,30 +119,28 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
preFilterSeries := len(grouped)

// Do not attempt to filter chunks for which there are no blooms
if bq.cfg.MinTableOffset > 0 {
minAge := truncateDay(model.Now()).Add(-1 * config.ObjectStorageIndexRequiredPeriod * time.Duration(bq.cfg.MinTableOffset-1))
if through.After(minAge) {
level.Debug(logger).Log(
"msg", "skip too recent chunks",
"tenant", tenant,
"from", from.Time(),
"through", through.Time(),
"responses", 0,
"preFilterChunks", preFilterChunks,
"postFilterChunks", preFilterChunks,
"filteredChunks", 0,
"preFilterSeries", preFilterSeries,
"postFilterSeries", preFilterSeries,
"filteredSeries", 0,
)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
bq.metrics.chunksFiltered.Add(0)
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
bq.metrics.seriesFiltered.Add(0)

return chunkRefs, false, nil
}
minAge := model.Now().Add(-1 * (config.ObjectStorageIndexRequiredPeriod*time.Duration(bq.cfg.BuildTableOffset) + 2*bq.cfg.BuildInterval))
if through.After(minAge) {
level.Info(logger).Log(
"msg", "skip too recent chunks",
"tenant", tenant,
"from", from.Time(),
"through", through.Time(),
"responses", 0,
"preFilterChunks", preFilterChunks,
"postFilterChunks", preFilterChunks,
"filteredChunks", 0,
"preFilterSeries", preFilterSeries,
"postFilterSeries", preFilterSeries,
"filteredSeries", 0,
)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
bq.metrics.chunksFiltered.Add(0)
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
bq.metrics.seriesFiltered.Add(0)

return chunkRefs, false, nil
}

var skippedGrps [][]*logproto.GroupedChunkRefs
Expand Down
77 changes: 53 additions & 24 deletions pkg/logql/log/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,42 +440,71 @@ func contains(line, substr []byte, caseInsensitive bool) bool {
return containsLower(line, substr)
}

// containsLower verifies if substr is a substring of line, with case insensitive comparison.
// substr is expected to be in lowercase.
func containsLower(line, substr []byte) bool {
if len(substr) == 0 {
return true
}
if len(substr) > len(line) {
return false
}
j := 0
for len(line) > 0 {
// ascii fast case
if c := line[0]; c < utf8.RuneSelf && substr[j] < utf8.RuneSelf {
if c == substr[j] || c+'a'-'A' == substr[j] || c == substr[j]+'a'-'A' {
j++
if j == len(substr) {
return true

// Fast path - try to find first byte of substr
firstByte := substr[0]
maxIndex := len(line) - len(substr)

i := 0
for i <= maxIndex {
// Find potential first byte match
c := line[i]
if c != firstByte && c+'a'-'A' != firstByte && c != firstByte+'a'-'A' {
i++
continue
}

// Found potential match, check rest of substr
matched := true
linePos := i
substrPos := 0

for linePos < len(line) && substrPos < len(substr) {
c := line[linePos]
s := substr[substrPos]

// Fast ASCII comparison
if c < utf8.RuneSelf && s < utf8.RuneSelf {
if c != s && c+'a'-'A' != s && c != s+'a'-'A' {
matched = false
break
}
line = line[1:]
linePos++
substrPos++
continue
}
line = line[1:]
j = 0
continue
}
// unicode slow case
lr, lwid := utf8.DecodeRune(line)
mr, mwid := utf8.DecodeRune(substr[j:])
if lr == mr || mr == unicode.To(unicode.LowerCase, lr) {
j += mwid
if j == len(substr) {
return true

// Slower Unicode path only when needed
lr, lineSize := utf8.DecodeRune(line[linePos:])
mr, substrSize := utf8.DecodeRune(substr[substrPos:])

if lr == utf8.RuneError || mr == utf8.RuneError {
matched = false
break
}
line = line[lwid:]
continue

if unicode.ToLower(lr) != mr {
matched = false
break
}

linePos += lineSize
substrPos += substrSize
}
line = line[lwid:]
j = 0

if matched && substrPos == len(substr) {
return true
}
i++
}
return false
}
Expand Down
99 changes: 98 additions & 1 deletion pkg/logql/log/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func Test_SimplifiedRegex(t *testing.T) {
// tests all lines with both filter, they should have the same result.
for _, line := range fixtures {
l := []byte(line)
require.Equal(t, d.Filter(l), f.Filter(l), "regexp %s failed line: %s", test.re, line)
require.Equal(t, d.Filter(l), f.Filter(l), "regexp %s failed line: %s re:%v simplified:%v", test.re, line, d.Filter(l), f.Filter(l))
}
})
}
Expand Down Expand Up @@ -219,3 +219,100 @@ func benchmarkRegex(b *testing.B, re, line string, match bool) {
func Test_rune(t *testing.T) {
require.True(t, newContainsFilter([]byte("foo"), true).Filter([]byte("foo")))
}

func BenchmarkContainsLower(b *testing.B) {
cases := []struct {
name string
line string
substr string
expected bool
}{
{
name: "short_line_no_match",
line: "this is a short log line",
substr: "missing",
expected: false,
},
{
name: "short_line_with_match",
line: "this is a short log line",
substr: "SHORT",
expected: true,
},
{
name: "long_line_no_match",
line: "2023-06-14T12:34:56.789Z INFO [service_name] This is a much longer log line with timestamps, levels and other information that typically appears in production logs. RequestID=123456 UserID=789 Action=GetUser Duration=123ms Status=200",
substr: "nonexistent",
expected: false,
},
{
name: "long_line_match_start",
line: "2023-06-14T12:34:56.789Z INFO [service_name] This is a much longer log line with timestamps, levels and other information that typically appears in production logs. RequestID=123456 UserID=789 Action=GetUser Duration=123ms Status=200",
substr: "2023",
expected: true,
},
{
name: "long_line_match_middle",
line: "2023-06-14T12:34:56.789Z INFO [service_name] This is a much longer log line with timestamps, levels and other information that typically appears in production logs. RequestID=123456 UserID=789 Action=GetUser Duration=123ms Status=200",
substr: "LEVELS",
expected: true,
},
{
name: "long_line_match_end",
line: "2023-06-14T12:34:56.789Z INFO [service_name] This is a much longer log line with timestamps, levels and other information that typically appears in production logs. RequestID=123456 UserID=789 Action=GetUser Duration=123ms Status=200",
substr: "status",
expected: true,
},
{
name: "short_unicode_line_no_match",
line: "🌟 Unicode line with emojis 🎉 and special chars ñ é ß",
substr: "missing",
expected: false,
},
{
name: "short_unicode_line_with_match",
line: "🌟 Unicode line with emojis 🎉 and special chars ñ é ß",
substr: "EMOJIS",
expected: true,
},
{
name: "long_unicode_line_no_match",
line: "2023-06-14T12:34:56.789Z 🚀 [микросервис] Длинное сообщение с Unicode символами 统一码 が大好き! エラー分析: システムは正常に動作しています。RequestID=123456 状態=良好 Résultat=Succès ß=γ 🎯 τέλος",
substr: "nonexistent",
expected: false,
},
{
name: "long_unicode_line_match_start",
line: "2023-06-14T12:34:56.789Z 🚀[МИКРОСервис] Длинное сообщение с Unicode символами 统一码 が大好き! エラー分析: システムは正常に動作しています。RequestID=123456 状態=良好 Résultat=Succès ß=γ 🎯 τέλος",
substr: "микросервис",
expected: true,
},
{
name: "long_unicode_line_match_middle",
line: "2023-06-14T12:34:56.789Z 🚀 [микросервис] Длинное сообщение с Unicode символами 统一码 が大好き! エラー分析: システムは正常に動作しています。RequestID=123456 状態=良好 Résultat=Succès ß=γ 🎯 τέλος",
substr: "UNICODE",
expected: true,
},
{
name: "long_unicode_line_match_end",
line: "2023-06-14T12:34:56.789Z 🚀 [микросервис] Длинное сообщение с Unicode символами 统一码 が大好き! エラー分析: システムは正常に動作しています。RequestID=123456 状態=良好 Résultat=Succès ß=γ 🎯 τέλος",
substr: "τέλος",
expected: true,
},
}

var m bool
for _, c := range cases {
b.Run(c.name, func(b *testing.B) {
line := []byte(c.line)
substr := []byte(c.substr)
for i := 0; i < b.N; i++ {
m = containsLower(line, substr)
}
if m != c.expected {
b.Fatalf("expected %v but got %v", c.expected, m)
}
})
}
res = m // Avoid compiler optimization
}
3 changes: 2 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,8 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
resolver := bloomgateway.NewBlockResolver(t.BloomStore, logger)
querierCfg := bloomgateway.QuerierConfig{
MinTableOffset: t.Cfg.BloomBuild.Planner.MinTableOffset,
BuildTableOffset: t.Cfg.BloomBuild.Planner.MinTableOffset,
BuildInterval: t.Cfg.BloomBuild.Planner.PlanningInterval,
}
bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, querierCfg, t.Overrides, resolver, prometheus.DefaultRegisterer, logger)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func ErrorTypeFromHTTPStatus(status int) string {
}

func IsError(status int) bool {
return status/200 != 0
return status < 200 || status >= 300
}

func IsServerError(status int) bool {
Expand Down
56 changes: 56 additions & 0 deletions pkg/util/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,59 @@ func TestErrorTypeFromHTTPStatus(t *testing.T) {
})
}
}

func TestIsError(t *testing.T) {
tests := []struct {
name string
status int
expectedResult bool
}{
{
name: "200 OK",
status: 200,
expectedResult: false,
},
{
name: "201 Created",
status: 201,
expectedResult: false,
},
{
name: "400 Bad Request",
status: 400,
expectedResult: true,
},
{
name: "404 Not Found",
status: 404,
expectedResult: true,
},
{
name: "429 Too Many Requests",
status: 429,
expectedResult: true,
},
{
name: "500 Internal Server Error",
status: 500,
expectedResult: true,
},
{
name: "503 Service Unavailable",
status: 503,
expectedResult: true,
},
{
name: "600 Unknown",
status: 600,
expectedResult: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := util.IsError(tt.status)
assert.Equal(t, tt.expectedResult, result)
})
}
}

0 comments on commit 44fd44a

Please sign in to comment.