From 2e5e321d755601e6ec98aa490a1b01d895cdf183 Mon Sep 17 00:00:00 2001
From: ziggie
Date: Wed, 3 Apr 2024 23:23:30 +0100
Subject: [PATCH] query: Add unit test for the batch time-out.

---
 query/workmanager_test.go | 158 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 158 insertions(+)

diff --git a/query/workmanager_test.go b/query/workmanager_test.go
index 13e81c76d..c96bcd01c 100644
--- a/query/workmanager_test.go
+++ b/query/workmanager_test.go
@@ -3,6 +3,7 @@ package query
 import (
 	"fmt"
 	"sort"
+	"sync"
 	"testing"
 	"time"
 
@@ -479,3 +480,160 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
 		}
 	}
 }
+
+// queryJobWithWorkerIndex couples a job request with the index of the worker
+// it was handed to, so its result can be signaled back on the right channel.
+type queryJobWithWorkerIndex struct {
+	worker int
+	job    *queryJob
+}
+
+// mergeWorkChannels merges the job channels of all the workers into a single
+// one for better control of the concurrency during testing.
+func mergeWorkChannels(workers []*mockWorker) <-chan queryJobWithWorkerIndex {
+	var wg sync.WaitGroup
+	merged := make(chan queryJobWithWorkerIndex)
+
+	// Copy data from each worker channel to the merged channel.
+	readFromWorker := func(input <-chan *queryJob, worker int) {
+		defer wg.Done()
+		for {
+			value, ok := <-input
+			if !ok {
+				// The channel is closed; exit the loop.
+				return
+			}
+			merged <- queryJobWithWorkerIndex{
+				worker: worker,
+				job:    value,
+			}
+		}
+	}
+
+	// Start a goroutine for each worker channel.
+	wg.Add(len(workers))
+	for i, work := range workers {
+		go readFromWorker(work.nextJob, i)
+	}
+
+	// Wait for all copying to be done, then close the merged channel.
+	go func() {
+		wg.Wait()
+		close(merged)
+	}()
+
+	return merged
+}
+
+// TestWorkManagerTimeOutBatch tests that as soon as a batch times out, all
+// the ongoing queries already registered with workers, as well as the ones
+// still queued up, are canceled.
+func TestWorkManagerTimeOutBatch(t *testing.T) {
+	const numQueries = 100
+	const numWorkers = 10
+
+	// Start the workDispatcher goroutine.
+	wm, workers := startWorkManager(t, numWorkers)
+
+	// mergeChan is the channel which sequentially receives all the
+	// queryJobs that are sent to the registered workers.
+	mergeChan := mergeWorkChannels(workers)
+
+	// activeQueries are the queries currently registered with the workers.
+	var activeQueries []queryJobWithWorkerIndex
+
+	// Schedule a batch of queries.
+	var queries []*Request
+	for i := 0; i < numQueries; i++ {
+		q := &Request{}
+		queries = append(queries, q)
+	}
+
+	// Send the batch query (consisting of numQueries requests), and
+	// include a channel to cancel the batch.
+	//
+	// NOTE: We will time out the batch to simulate a slow peer connection
+	// and make sure we cancel all ongoing queries including the ones which
+	// are still queued up.
+	cancelChan := make(chan struct{})
+	errChan := wm.Query(queries, Cancel(cancelChan), Timeout(1*time.Second))
+
+	// Send a query to every active worker.
+	for i := 0; i < numWorkers; i++ {
+		select {
+		case jobQuery := <-mergeChan:
+			activeQueries = append(activeQueries, jobQuery)
+		case <-errChan:
+			t.Fatalf("did not expect an error on errChan")
+		case <-time.After(5 * time.Second):
+			t.Fatalf("next job not received")
+		}
+	}
+
+	// Wait before sending the result for one query so that the timeout
+	// of the batch is exceeded.
+	time.Sleep(2 * time.Second)
+
+	// We need to signal a result for one of the active workers so that
+	// the batch timeout is triggered.
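+	//
+	// NOTE: The batch timeout is only checked when the dispatcher
+	// processes a job result, so the expired batch is not detected
+	// until at least one result is delivered.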
+	workerIndex := activeQueries[0].worker
+	workers[workerIndex].results <- &jobResult{
+		job: activeQueries[0].job,
+		err: nil,
+	}
+
+	// We expect the cancelChan to be closed for this batch.
+	select {
+	case <-cancelChan:
+	case <-time.After(time.Second):
+		t.Fatalf("expected the cancelChan to close")
+	}
+
+	// As soon as the batch times out, an error is sent via the errChan.
+	select {
+	case err := <-errChan:
+		require.ErrorIs(t, err, ErrQueryTimeout)
+	case <-time.After(time.Second):
+		t.Fatalf("expected the errChan to signal")
+	}
+
+	// The cancelChan is closed as soon as the batch times out, so all
+	// the ongoing queries are canceled as well.
+	for i := 1; i < numWorkers; i++ {
+		active := activeQueries[i]
+		select {
+		case <-active.job.cancelChan:
+			workers[active.worker].results <- &jobResult{
+				job: active.job,
+				err: nil,
+			}
+		case <-time.After(time.Second):
+			t.Fatalf("expected the cancelChan to close")
+		}
+	}
+
+	// Also make sure that all the queued queries for this batch are
+	// canceled as well.
+	for i := numWorkers; i < numQueries; i++ {
+		select {
+		case res := <-mergeChan:
+			job := res.job
+			workerIndex := res.worker
+			select {
+			case <-job.cancelChan:
+				workers[workerIndex].results <- &jobResult{
+					job: job,
+					err: nil,
+				}
+			case <-time.After(time.Second):
+				t.Fatalf("expected the cancelChan to close")
+			}
+		case <-time.After(time.Second):
+			t.Fatalf("next job not received")
+		}
+	}
+}
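
The mergeWorkChannels helper added by this patch is an instance of the classic
fan-in pattern: one forwarding goroutine per input channel, plus a WaitGroup
that closes the merged channel once every input is drained. Below is a
minimal, self-contained sketch of the same technique; the names (merge, a, b)
are illustrative only and are not part of the patch.

package main

import (
	"fmt"
	"sync"
)

// merge fans in multiple input channels into a single output channel and
// closes the output once every input channel has been drained.
func merge(inputs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}

	// Close the merged channel only after every forwarding goroutine
	// has observed its input channel close.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	a, b := make(chan int), make(chan int)
	go func() { a <- 1; close(a) }()
	go func() { b <- 2; close(b) }()

	// Prints 1 and 2 in nondeterministic order, then exits once the
	// merged channel is closed.
	for v := range merge(a, b) {
		fmt.Println(v)
	}
}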