Skip to content

Commit

Permalink
feat(block-scheduler): job tracking & offset commits (#15338)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Dec 11, 2024
1 parent 6316204 commit f2bff77
Show file tree
Hide file tree
Showing 15 changed files with 767 additions and 195 deletions.
16 changes: 8 additions & 8 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,13 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
logger := log.With(
i.logger,
"worker_id", workerID,
"partition", job.Partition,
"job_min_offset", job.Offsets.Min,
"job_max_offset", job.Offsets.Max,
"partition", job.Partition(),
"job_min_offset", job.Offsets().Min,
"job_max_offset", job.Offsets().Max,
)

i.jobsMtx.Lock()
i.inflightJobs[job.ID] = job
i.inflightJobs[job.ID()] = job
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

Expand Down Expand Up @@ -284,7 +284,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
}

i.jobsMtx.Lock()
delete(i.inflightJobs, job.ID)
delete(i.inflightJobs, job.ID())
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

Expand Down Expand Up @@ -315,15 +315,15 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
"load records",
1,
func(ctx context.Context) error {
lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh)
lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh)
return err
},
func(ctx context.Context) error {
level.Debug(logger).Log(
"msg", "finished loading records",
"ctx_error", ctx.Err(),
"last_offset", lastOffset,
"total_records", lastOffset-job.Offsets.Min,
"total_records", lastOffset-job.Offsets().Min,
)
close(inputCh)
return nil
Expand Down Expand Up @@ -488,7 +488,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
}
}

if lastOffset <= job.Offsets.Min {
if lastOffset <= job.Offsets().Min {
return lastOffset, nil
}

Expand Down
144 changes: 127 additions & 17 deletions pkg/blockbuilder/scheduler/prioritiy_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestPriorityQueue(t *testing.T) {
t.Run("operations", func(t *testing.T) {
t.Run("basic operations", func(t *testing.T) {
tests := []struct {
name string
input []int
Expand All @@ -33,16 +33,14 @@ func TestPriorityQueue(t *testing.T) {
input: []int{3, 1, 2},
wantPops: []int{1, 2, 3},
},
{
name: "duplicate elements",
input: []int{2, 1, 2, 1},
wantPops: []int{1, 1, 2, 2},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
pq := NewPriorityQueue[int, int](
func(a, b int) bool { return a < b },
func(v int) int { return v },
)
require.Equal(t, 0, pq.Len())

// Push all elements
Expand All @@ -69,15 +67,73 @@ func TestPriorityQueue(t *testing.T) {
}
})

t.Run("key operations", func(t *testing.T) {
type Job struct {
ID string
Priority int
}

pq := NewPriorityQueue[string, Job](
func(a, b Job) bool { return a.Priority < b.Priority },
func(j Job) string { return j.ID },
)

// Test Push with duplicate key
job1 := Job{ID: "job1", Priority: 1}
job1Updated := Job{ID: "job1", Priority: 3}
job2 := Job{ID: "job2", Priority: 2}

pq.Push(job1)
require.Equal(t, 1, pq.Len())

// Push with same key should update
pq.Push(job1Updated)
require.Equal(t, 1, pq.Len())

// Verify updated priority
v, ok := pq.Lookup("job1")
require.True(t, ok)
require.Equal(t, job1Updated, v)

// Test Remove
pq.Push(job2)
v, ok = pq.Remove("job1")
require.True(t, ok)
require.Equal(t, job1Updated, v)
require.Equal(t, 1, pq.Len())

// Test UpdatePriority
newJob2 := Job{ID: "job2", Priority: 4}
ok = pq.UpdatePriority("job2", newJob2)
require.True(t, ok)

v, ok = pq.Lookup("job2")
require.True(t, ok)
require.Equal(t, newJob2, v)

// Test non-existent key operations
v, ok = pq.Lookup("nonexistent")
require.False(t, ok)
require.Zero(t, v)

v, ok = pq.Remove("nonexistent")
require.False(t, ok)
require.Zero(t, v)

ok = pq.UpdatePriority("nonexistent", Job{})
require.False(t, ok)
})

t.Run("custom type", func(t *testing.T) {
type Job struct {
ID string
Priority int
}

pq := NewPriorityQueue[Job](func(a, b Job) bool {
return a.Priority < b.Priority
})
pq := NewPriorityQueue[string, Job](
func(a, b Job) bool { return a.Priority < b.Priority },
func(j Job) string { return j.ID },
)

jobs := []Job{
{ID: "high", Priority: 3},
Expand All @@ -102,25 +158,28 @@ func TestPriorityQueue(t *testing.T) {
})

t.Run("mixed operations", func(t *testing.T) {
pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
pq := NewPriorityQueue[int, int](
func(a, b int) bool { return a < b },
func(v int) int { return v },
)

// Push some elements
pq.Push(3)
pq.Push(1)
require.Equal(t, 2, pq.Len())
pq.Push(4)

// Pop lowest
// Pop an element
v, ok := pq.Pop()
require.True(t, ok)
require.Equal(t, 1, v)

// Push more elements
pq.Push(2)
pq.Push(4)
pq.Push(5)

// Verify remaining elements come out in order
want := []int{2, 3, 4}
got := make([]int, 0, 3)
// Pop remaining elements and verify order
want := []int{2, 3, 4, 5}
got := make([]int, 0, len(want))
for range want {
v, ok := pq.Pop()
require.True(t, ok)
Expand Down Expand Up @@ -191,3 +250,54 @@ func TestCircularBuffer(t *testing.T) {
})
}
}

// TestCircularBufferLookup checks CircularBuffer.Lookup for hits and misses
// on empty, partially filled, and wrapped-around buffers.
func TestCircularBufferLookup(t *testing.T) {
	// probe is a single Lookup call: the value searched for and whether the
	// buffer is expected to report it as present.
	type probe struct {
		target int
		found  bool
	}

	cases := []struct {
		name     string
		capacity int
		pushUpTo int // the values 1..pushUpTo are pushed in order; 0 pushes nothing
		probes   []probe
	}{
		{
			name:     "empty buffer",
			capacity: 5,
			pushUpTo: 0,
			probes:   []probe{{target: 1, found: false}},
		},
		{
			name:     "single element",
			capacity: 5,
			pushUpTo: 1,
			probes:   []probe{{target: 1, found: true}},
		},
		{
			name:     "multiple elements",
			capacity: 5,
			pushUpTo: 3,
			probes:   []probe{{target: 2, found: true}},
		},
		{
			// Five pushes into a buffer of size three force a wrap-around:
			// a recent value (4) must still be found, while the first value
			// pushed (1) is expected to have been evicted.
			name:     "wrapped buffer",
			capacity: 3,
			pushUpTo: 5,
			probes: []probe{
				{target: 4, found: true},
				{target: 1, found: false},
			},
		},
		{
			name:     "no match",
			capacity: 5,
			pushUpTo: 3,
			probes:   []probe{{target: 99, found: false}},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			cb := NewCircularBuffer[int](tc.capacity)
			for n := 1; n <= tc.pushUpTo; n++ {
				cb.Push(n)
			}

			for _, p := range tc.probes {
				got, ok := cb.Lookup(func(candidate int) bool { return candidate == p.target })
				if !p.found {
					// On a miss only the ok flag is asserted; the returned
					// value is deliberately left unchecked.
					require.False(t, ok)
					continue
				}
				require.True(t, ok)
				require.Equal(t, p.target, got)
			}
		})
	}
}
Loading

0 comments on commit f2bff77

Please sign in to comment.