Skip to content

Commit

Permalink
feat: add button to forget individual snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Feb 15, 2024
1 parent dc7a3a5 commit 276b1d2
Show file tree
Hide file tree
Showing 15 changed files with 587 additions and 250 deletions.
482 changes: 282 additions & 200 deletions gen/go/v1/service.pb.go

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions gen/go/v1/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions gen/go/v1/v1connect/service.connect.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 32 additions & 14 deletions internal/api/backresthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,22 +291,40 @@ func (s *BackrestHandler) Backup(ctx context.Context, req *connect.Request[types
return connect.NewResponse(&emptypb.Empty{}), err
}

func (s *BackrestHandler) Forget(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) {
plan, err := s.orchestrator.GetPlan(req.Msg.Value)
if err != nil {
return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.Value, err)
func (s *BackrestHandler) Forget(ctx context.Context, req *connect.Request[v1.ForgetRequest]) (*connect.Response[emptypb.Empty], error) {
at := time.Now()
var err error
if req.Msg.SnapshotId != "" && req.Msg.PlanId != "" && req.Msg.RepoId != "" {
wait := make(chan struct{})
s.orchestrator.ScheduleTask(
orchestrator.NewOneoffForgetSnapshotTask(s.orchestrator, req.Msg.RepoId, req.Msg.PlanId, req.Msg.SnapshotId, at),
orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) {
err = e
close(wait)
})
<-wait
} else if req.Msg.RepoId != "" && req.Msg.PlanId != "" {
plan, err := s.orchestrator.GetPlan(req.Msg.PlanId)
if err != nil {
return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.PlanId, err)
}

wait := make(chan struct{})
s.orchestrator.ScheduleTask(
orchestrator.NewOneoffForgetTask(s.orchestrator, plan, "", at),
orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) {
err = e
close(wait)
})
<-wait
} else {
return nil, errors.New("must specify repoId and planId and (optionally) snapshotId")
}

at := time.Now()
var wg sync.WaitGroup
wg.Add(1)
s.orchestrator.ScheduleTask(orchestrator.NewOneoffForgetTask(s.orchestrator, plan, "", at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) {
err = e
wg.Done()
})
s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, plan.Repo, at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots)
wg.Wait()
return connect.NewResponse(&emptypb.Empty{}), err
if err != nil {
return nil, err
}
return connect.NewResponse(&emptypb.Empty{}), nil
}

func (s *BackrestHandler) Prune(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) {
Expand Down
8 changes: 8 additions & 0 deletions internal/orchestrator/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ func (r *RepoOrchestrator) Forget(ctx context.Context, plan *v1.Plan) ([]*v1.Res
return forgotten, nil
}

func (r *RepoOrchestrator) ForgetSnapshot(ctx context.Context, snapshotId string) error {
r.mu.Lock()
defer r.mu.Unlock()

r.l.Debug("Forget snapshot with ID", zap.String("snapshot", snapshotId))
return r.repo.ForgetSnapshot(ctx, snapshotId)
}

func (r *RepoOrchestrator) Prune(ctx context.Context, output io.Writer) error {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down
6 changes: 0 additions & 6 deletions internal/orchestrator/taskforget.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package orchestrator

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -11,7 +10,6 @@ import (
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

// ForgetTask tracks a forget operation.
Expand Down Expand Up @@ -60,10 +58,6 @@ func (t *ForgetTask) Next(now time.Time) *time.Time {
}

func (t *ForgetTask) Run(ctx context.Context) error {
if t.plan.Retention == nil || proto.Equal(t.plan.Retention, &v1.RetentionPolicy{}) {
return errors.New("plan does not have a retention policy")
}

if err := t.runWithOpAndContext(ctx, func(ctx context.Context, op *v1.Operation) error {
forgetOp := &v1.Operation_OperationForget{
OperationForget: &v1.OperationForget{},
Expand Down
94 changes: 94 additions & 0 deletions internal/orchestrator/taskforgetsnapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package orchestrator

import (
"context"
"fmt"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
)

// ForgetTask tracks a forget operation.
type ForgetSnapshotTask struct {
TaskWithOperation
repoId string
planId string
forgetSnapshot string
at *time.Time
}

var _ Task = &ForgetSnapshotTask{}

func NewOneoffForgetSnapshotTask(orchestrator *Orchestrator, repoId, planId, forgetSnapshot string, at time.Time) *ForgetSnapshotTask {
return &ForgetSnapshotTask{
TaskWithOperation: TaskWithOperation{
orch: orchestrator,
},
repoId: repoId,
planId: planId,
at: &at,
forgetSnapshot: forgetSnapshot,
}
}

func (t *ForgetSnapshotTask) Name() string {
return fmt.Sprintf("forget snapshot %q", t.forgetSnapshot)
}

func (t *ForgetSnapshotTask) Next(now time.Time) *time.Time {
ret := t.at
if ret != nil {
t.at = nil
if err := t.setOperation(&v1.Operation{
PlanId: t.planId,
RepoId: t.repoId,
UnixTimeStartMs: timeToUnixMillis(*ret),
Status: v1.OperationStatus_STATUS_PENDING,
Op: &v1.Operation_OperationForget{},
}); err != nil {
zap.S().Errorf("task %v failed to add operation to oplog: %v", t.Name(), err)
return nil
}
}
return ret
}

func (t *ForgetSnapshotTask) Run(ctx context.Context) error {
id := t.op.Id
if err := t.runWithOpAndContext(ctx, func(ctx context.Context, op *v1.Operation) error {
repo, err := t.orch.GetRepo(t.repoId)
if err != nil {
return fmt.Errorf("get repo %q: %w", t.repoId, err)
}

// Find snapshot to forget
var ops []*v1.Operation
t.orch.OpLog.ForEachBySnapshotId(t.forgetSnapshot, indexutil.CollectAll(), func(op *v1.Operation) error {
ops = append(ops, op)
return nil
})

for _, op := range ops {
if indexOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
err := repo.ForgetSnapshot(ctx, op.SnapshotId)
if err != nil {
return fmt.Errorf("forget %q: %w", op.SnapshotId, err)
}
indexOp.OperationIndexSnapshot.Forgot = true
if e := t.orch.OpLog.Update(op); err != nil {
err = multierror.Append(err, fmt.Errorf("mark index snapshot %v as forgotten: %w", op.Id, e))
continue
}
}
}

return err
}); err != nil {
return err
}

return t.orch.OpLog.Delete(id)
}
20 changes: 20 additions & 0 deletions pkg/restic/restic.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,26 @@ func (r *Repo) Forget(ctx context.Context, policy *RetentionPolicy, opts ...Gene
return &result[0], nil
}

func (r *Repo) ForgetSnapshot(ctx context.Context, snapshotId string, opts ...GenericOption) error {
opt := resolveOpts(opts)

args := []string{"forget", "--json", snapshotId}
args = append(args, r.extraArgs...)
args = append(args, opt.extraArgs...)
args = append(args, snapshotId)

cmd := exec.CommandContext(ctx, r.cmd, args...)
cmd.Env = append(cmd.Env, r.buildEnv()...)
cmd.Env = append(cmd.Env, opt.extraEnv...)

output, err := cmd.CombinedOutput()
if err != nil {
return newCmdError(cmd, string(output), err)
}

return nil
}

func (r *Repo) Prune(ctx context.Context, pruneOutput io.Writer, opts ...GenericOption) error {
opt := resolveOpts(opts)

Expand Down
43 changes: 41 additions & 2 deletions pkg/restic/restic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,10 @@ func TestResticForget(t *testing.T) {
ids = append(ids, output.SnapshotId)
}

// prune all snapshots
// forget snapshots
res, err := r.Forget(context.Background(), &RetentionPolicy{KeepLastN: 3})
if err != nil {
t.Fatalf("failed to prune snapshots: %v", err)
t.Fatalf("failed to forget snapshots: %v", err)
}

if len(res.Keep) != 3 {
Expand Down Expand Up @@ -286,6 +286,45 @@ func TestResticForget(t *testing.T) {
}
}

func TestForgetSnapshotId(t *testing.T) {
t.Parallel()

repo := t.TempDir()
r := NewRepo(helpers.ResticBinary(t), &v1.Repo{
Id: "test",
Uri: repo,
Password: "test",
}, WithFlags("--no-cache"))
if err := r.Init(context.Background()); err != nil {
t.Fatalf("failed to init repo: %v", err)
}

testData := helpers.CreateTestData(t)

ids := make([]string, 0)
for i := 0; i < 5; i++ {
output, err := r.Backup(context.Background(), nil, WithBackupPaths(testData))
if err != nil {
t.Fatalf("failed to backup and create new snapshot: %v", err)
}
ids = append(ids, output.SnapshotId)
}

// forget snapshot by ID
err := r.ForgetSnapshot(context.Background(), ids[0])
if err != nil {
t.Fatalf("failed to forget snapshots: %v", err)
}

snapshots, err := r.Snapshots(context.Background())
if err != nil {
t.Fatalf("failed to list snapshots: %v", err)
}
if len(snapshots) != 4 {
t.Errorf("wanted 4 snapshots, got: %d", len(snapshots))
}
}

func TestResticPrune(t *testing.T) {
t.Parallel()

Expand Down
Loading

0 comments on commit 276b1d2

Please sign in to comment.