From 07b09502b9554386afa7bd4c5487f9b8da3a59bb Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Wed, 8 May 2024 00:24:46 -0700 Subject: [PATCH] fix: improve concurrency handling in RunCommand --- internal/api/backresthandler.go | 46 +++++++++++++++++------ internal/orchestrator/tasks/taskforget.go | 2 +- webui/src/views/App.tsx | 4 +- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/internal/api/backresthandler.go b/internal/api/backresthandler.go index 5d4dffbd..d52302ba 100644 --- a/internal/api/backresthandler.go +++ b/internal/api/backresthandler.go @@ -420,34 +420,56 @@ func (s *BackrestHandler) RunCommand(ctx context.Context, req *connect.Request[v ctx, cancel := context.WithCancel(ctx) errChan := make(chan error, 1) - var outputBuf []byte - + outputs := make(chan []byte, 100) go func() { if err := repo.RunCommand(ctx, req.Msg.Command, func(output []byte) { - outputBuf = append(outputBuf, output...) + outputs <- output }); err != nil { errChan <- err } cancel() }() + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + bufSize := 32 * 1024 + buf := make([]byte, 0, bufSize) + + flush := func() error { + if len(buf) > 0 { + if err := resp.Send(&types.BytesValue{Value: buf}); err != nil { + return fmt.Errorf("failed to write output: %w", err) + } + buf = buf[:0] + } + return nil + } + for { select { case err := <-errChan: - if err := resp.Send(&types.BytesValue{Value: outputBuf}); err != nil { - return fmt.Errorf("failed to write output: %w", err) + if err := flush(); err != nil { + return err } return err case <-ctx.Done(): - if err := resp.Send(&types.BytesValue{Value: outputBuf}); err != nil { - return fmt.Errorf("failed to write output: %w", err) + return flush() + case output := <-outputs: + if len(output)+len(buf) > bufSize { + flush() } - return nil - case <-time.After(100 * time.Millisecond): - if err := resp.Send(&types.BytesValue{Value: outputBuf}); err != nil { - return fmt.Errorf("failed to write output: %w", err) + if len(output) > bufSize { + if err := resp.Send(&types.BytesValue{Value: output}); err != nil { + return fmt.Errorf("failed to write output: %w", err) + } + continue + } + buf = append(buf, output...) + case <-ticker.C: + if len(buf) > 0 { + flush() } - outputBuf = outputBuf[:0] // clear the buffer and continue } } } diff --git a/internal/orchestrator/tasks/taskforget.go b/internal/orchestrator/tasks/taskforget.go index 20787fa1..3e1c4bde 100644 --- a/internal/orchestrator/tasks/taskforget.go +++ b/internal/orchestrator/tasks/taskforget.go @@ -129,7 +129,7 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) func useLegacyCompatMode(oplog *oplog.OpLog, planID string) (bool, error) { instanceIDs := make(map[string]struct{}) if err := oplog.ForEachByPlan(planID, indexutil.CollectAll(), func(op *v1.Operation) error { - if snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok { + if snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok && !snapshotOp.OperationIndexSnapshot.GetForgot() { tags := snapshotOp.OperationIndexSnapshot.GetSnapshot().GetTags() instanceIDs[repo.InstanceIDFromTags(tags)] = struct{}{} } diff --git a/webui/src/views/App.tsx b/webui/src/views/App.tsx index e921a191..2ec1a59b 100644 --- a/webui/src/views/App.tsx +++ b/webui/src/views/App.tsx @@ -71,7 +71,7 @@ export const App: React.FC = () => { ) { alertApi.error( "Failed to fetch initial config, typically this means the UI could not connect to the backend", - 0, + 0 ); return; } @@ -80,7 +80,7 @@ export const App: React.FC = () => { alertApi.error(err.message, 0); alertApi.error( "Failed to fetch initial config, typically this means the UI could not connect to the backend", - 0, + 0 ); }); }, []);