From 411a4fb6f00fd46f1fbdb0b8e3a971d016a6e0f8 Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Sun, 12 May 2024 00:26:28 -0700 Subject: [PATCH] fix: concurrency issues in run command handler --- internal/api/backresthandler.go | 7 +++++-- internal/config/validate.go | 2 +- internal/oplog/oplog.go | 8 +++++--- internal/orchestrator/repo/repo.go | 2 +- webui/src/state/oplog.ts | 4 ++-- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/internal/api/backresthandler.go b/internal/api/backresthandler.go index 0041d357..247ece5f 100644 --- a/internal/api/backresthandler.go +++ b/internal/api/backresthandler.go @@ -1,6 +1,7 @@ package api import ( + "bytes" "context" "encoding/hex" "errors" @@ -419,14 +420,16 @@ func (s *BackrestHandler) RunCommand(ctx context.Context, req *connect.Request[v ctx, cancel := context.WithCancel(ctx) - errChan := make(chan error, 1) outputs := make(chan []byte, 100) + errChan := make(chan error, 1) go func() { + start := time.Now() if err := repo.RunCommand(ctx, req.Msg.Command, func(output []byte) { - outputs <- output + outputs <- bytes.Clone(output) }); err != nil { errChan <- err } + outputs <- []byte("took " + time.Since(start).String()) cancel() }() diff --git a/internal/config/validate.go b/internal/config/validate.go index eeb1432b..02f309ae 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -108,7 +108,7 @@ func validatePlan(plan *v1.Plan, repos map[string]*v1.Repo) error { if plan.Retention != nil && plan.Retention.Policy == nil { err = multierror.Append(err, errors.New("retention policy must be nil or must specify a policy")) - } else if policyTimeBucketed, ok := plan.Retention.Policy.(*v1.RetentionPolicy_PolicyTimeBucketed); ok { + } else if policyTimeBucketed, ok := plan.Retention.GetPolicy().(*v1.RetentionPolicy_PolicyTimeBucketed); ok { if proto.Equal(policyTimeBucketed.PolicyTimeBucketed, &v1.RetentionPolicy_TimeBucketedCounts{}) { err = multierror.Append(err, errors.New("time bucketed policy must specify a non-empty bucket")) } diff --git a/internal/oplog/oplog.go b/internal/oplog/oplog.go index c224c15d..bf8f9e03 100644 --- a/internal/oplog/oplog.go +++ b/internal/oplog/oplog.go @@ -88,14 +88,16 @@ func NewOpLog(databasePath string) (*OpLog, error) { // Scan checks the log for incomplete operations. Should only be called at startup. func (o *OpLog) Scan(onIncomplete func(op *v1.Operation)) error { zap.L().Debug("scanning oplog for incomplete operations") + t := time.Now() err := o.db.Update(func(tx *bolt.Tx) error { sysBucket := tx.Bucket(SystemBucket) opLogBucket := tx.Bucket(OpLogBucket) c := opLogBucket.Cursor() + var k, v []byte if lastValidated := sysBucket.Get([]byte("last_validated")); lastValidated != nil { - c.Seek(lastValidated) + k, v = c.Seek(lastValidated) } - for k, v := c.Prev(); k != nil; k, v = c.Next() { + for ; k != nil; k, v = c.Next() { op := &v1.Operation{} if err := proto.Unmarshal(v, op); err != nil { zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err)) @@ -124,7 +126,7 @@ func (o *OpLog) Scan(onIncomplete func(op *v1.Operation)) error { if err != nil { return fmt.Errorf("scanning log: %v", err) } - zap.L().Debug("scan complete") + zap.L().Debug("scan complete", zap.Duration("duration", time.Since(t))) return nil } diff --git a/internal/orchestrator/repo/repo.go b/internal/orchestrator/repo/repo.go index 51e04026..c071b5aa 100644 --- a/internal/orchestrator/repo/repo.go +++ b/internal/orchestrator/repo/repo.go @@ -379,7 +379,7 @@ func chunkBy[T any](items []T, chunkSize int) (chunks [][]T) { } type callbackWriter struct { - callback func([]byte) + callback func([]byte) // note: callback must not retain the byte slice } func (w *callbackWriter) Write(p []byte) (n int, err error) { diff --git a/webui/src/state/oplog.ts b/webui/src/state/oplog.ts index c72a228d..acff8b36 100644 --- a/webui/src/state/oplog.ts +++ b/webui/src/state/oplog.ts @@ -388,7 +388,7 @@ export const colorForStatus = (status: OperationStatus) => { case OperationStatus.STATUS_SUCCESS: return "green"; case OperationStatus.STATUS_USER_CANCELLED: - return "orange"; + return "yellow"; default: return "grey"; } @@ -432,7 +432,7 @@ export const detailsForOperation = ( break; case OperationStatus.STATUS_USER_CANCELLED: state = "cancelled"; - color = "orange"; + color = "yellow"; break; default: state = "";