Skip to content

Commit

Permalink
feat: implement garbage collection of old operations
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Dec 22, 2023
1 parent 20dd78c commit 46456a8
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 66 deletions.
5 changes: 1 addition & 4 deletions internal/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,8 @@ func (o *OpLog) nextOperationId(b *bolt.Bucket, unixTimeMs int64) (int64, error)
func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
b := tx.Bucket(OpLogBucket)
if op.Id == 0 {
if op.UnixTimeStartMs == 0 {
return fmt.Errorf("operation must have a start time")
}
var err error
op.Id, err = o.nextOperationId(b, op.UnixTimeStartMs)
op.Id, err = o.nextOperationId(b, time.Now().UnixMilli())
if err != nil {
return fmt.Errorf("create next operation ID: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
zap.L().Info("Applied config to orchestrator, task queue reset. Rescheduling planned tasks now.")

// Requeue tasks that are affected by the config change.
o.ScheduleTask(&CollectGarbageTask{
orchestrator: o,
}, TaskPriorityDefault)
for _, plan := range cfg.Plans {
t, err := NewScheduledBackupTask(o, plan)
if err != nil {
Expand Down
118 changes: 118 additions & 0 deletions internal/orchestrator/taskcollectgarbage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package orchestrator

import (
"context"
"fmt"
"time"

v1 "github.com/garethgeorge/restora/gen/go/v1"
"go.uber.org/zap"
)

const (
gcStartupDelay = 5 * time.Second
gcInterval = 24 * time.Hour
// keep operations that are eligible for gc for 30 days OR up to a limit of 100 for any one plan.
// an operation is eligible for gc if:
// - it has no snapshot associated with it
// - it has a forgotten snapshot associated with it
gcHistoryAge = 30 * 24 * time.Hour
gcHistoryMaxCount = 100
)

type CollectGarbageTask struct {
orchestrator *Orchestrator // owning orchestrator
firstRun bool
}

var _ Task = &CollectGarbageTask{}

func (t *CollectGarbageTask) Name() string {
return "collect garbage"
}

func (t *CollectGarbageTask) Next(now time.Time) *time.Time {
if !t.firstRun {
t.firstRun = true
runAt := now.Add(gcStartupDelay)
return &runAt
}

runAt := now.Add(gcInterval)
return &runAt
}

func (t *CollectGarbageTask) Run(ctx context.Context) error {
oplog := t.orchestrator.OpLog

// pass 1: identify forgotten snapshots.
snapshotIsForgotten := make(map[string]bool)
if err := oplog.ForAll(func(op *v1.Operation) error {
if snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
if snapshotOp.OperationIndexSnapshot.Forgot {
snapshotIsForgotten[snapshotOp.OperationIndexSnapshot.Snapshot.Id] = true
}
}
return nil
}); err != nil {
return fmt.Errorf("identifying forgotten snapshots: %w", err)
}

// pass 2: identify operations that are gc eligible
// - any operation that has no snapshot associated with it
// - any operation that has a forgotten snapshot associated with it
operationsByPlan := make(map[string][]gcOpInfo)
if err := oplog.ForAll(func(op *v1.Operation) error {
if op.SnapshotId == "" || snapshotIsForgotten[op.SnapshotId] {
operationsByPlan[op.PlanId] = append(operationsByPlan[op.PlanId], gcOpInfo{
id: op.Id,
timestamp: op.UnixTimeStartMs,
})
}
return nil
}); err != nil {
return fmt.Errorf("identifying gc eligible operations: %w", err)
}

var gcOps []int64
curTime := curTimeMillis()
for _, opInfos := range operationsByPlan {
if len(opInfos) >= gcHistoryMaxCount {
for _, opInfo := range opInfos[:len(opInfos)-gcHistoryMaxCount] {
gcOps = append(gcOps, opInfo.id)
}
opInfos = opInfos[len(opInfos)-gcHistoryMaxCount:]
}

// check if each operation timestamp is old.
for _, opInfo := range opInfos {
if curTime-opInfo.timestamp > gcHistoryAge.Milliseconds() {
gcOps = append(gcOps, opInfo.id)
}
}
}

// pass 3: remove gc eligible operations
if err := oplog.Delete(gcOps...); err != nil {
return fmt.Errorf("removing gc eligible operations: %w", err)
}

zap.L().Info("collecting garbage",
zap.Int("forgotten_snapshots", len(snapshotIsForgotten)),
zap.Any("operations_removed", len(gcOps)))

return nil
}

func (t *CollectGarbageTask) Cancel(withStatus v1.OperationStatus) error {
return nil
}

func (t *CollectGarbageTask) OperationId() int64 {
return 0
}

type gcOpInfo struct {
id int64 // operation ID
timestamp int64 // unix time milliseconds
}
4 changes: 0 additions & 4 deletions internal/orchestrator/taskforget.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ func (t *ForgetTask) Run(ctx context.Context) error {
continue
}
}
// Soft delete the operation (can be recovered if necessary, todo: implement recovery).
if e := t.orch.OpLog.Delete(op.Id); err != nil {
err = multierror.Append(err, fmt.Errorf("delete operation %v: %w", op.Id, e))
}
}

if len(forgot) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion proto/v1/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ message OperationList {
}

message Operation {
// required, primary ID of the operation.
// required, primary ID of the operation. ID is sequential based on creation time of the operation.
int64 id = 1;
// required, repo id if associated with a repo
string repo_id = 2;
Expand Down
4 changes: 2 additions & 2 deletions webui/src/components/ActivityBar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const ActivityBar = () => {
}
});

return <span>{details.map(details => {
return <>{details.displayName} in progress for plan {details.op.planId} to {details.op.repoId} for {formatDuration(details.details.duration)}</>
return <span>{details.map((details, idx) => {
return <span key={idx}>{details.displayName} in progress for plan {details.op.planId} to {details.op.repoId} for {formatDuration(details.details.duration)}</span>
})}</span>
}
17 changes: 10 additions & 7 deletions webui/src/components/OperationList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ export const OperationList = ({
};
subscribeToOperations(lis);

backupCollector.subscribe(() => {
backupCollector.subscribe(_.debounce(() => {
let backups = backupCollector.getAll();
backups.sort((a, b) => {
return b.startTimeMs - a.startTimeMs;
});
setBackups(backups);
});
}, 50));

getOperations(req)
.then((ops) => {
Expand All @@ -106,7 +106,10 @@ export const OperationList = ({
};
}, [JSON.stringify(req)]);
} else {
backups = useBackups || [];
backups = [...(useBackups || [])];
backups.sort((a, b) => {
return b.startTimeMs - a.startTimeMs;
});
}

if (backups.length === 0) {
Expand Down Expand Up @@ -277,14 +280,14 @@ export const OperationRow = ({
children: <>
Removed snapshots:
<pre>{forgetOp.forget?.map((f) => (
<>
<div key={f.id}>
{"removed snapshot " + normalizeSnapshotId(f.id!) + " taken at " + formatTime(f.unixTimeMs!)} <br />
</>
</div>
))}</pre>
Policy:
<ul>
{policyDesc.map((desc) => (
<li>{desc}</li>
{policyDesc.map((desc, idx) => (
<li key={idx}>{desc}</li>
))}
</ul>
</>,
Expand Down
26 changes: 23 additions & 3 deletions webui/src/components/OperationTree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ export const OperationTree = ({
};
subscribeToOperations(lis);

backupCollector.subscribe(() => {
backupCollector.subscribe(_.debounce(() => {
let backups = backupCollector.getAll();
backups.sort((a, b) => {
return b.startTimeMs - a.startTimeMs;
});
setBackups(backups);
});
}, 50));

getOperations(req)
.then((ops) => {
Expand All @@ -79,7 +79,7 @@ export const OperationTree = ({
}, [JSON.stringify(req)]);

const treeData = useMemo(() => {
return buildTreeYear(backups);
return buildTreePlan(backups);
}, [backups]);

if (backups.length === 0) {
Expand Down Expand Up @@ -199,6 +199,26 @@ export const OperationTree = ({
);
};

const buildTreePlan = (operations: BackupInfo[]): OpTreeNode[] => {
const grouped = _.groupBy(operations, (op) => {
return op.planId;
});

const entries: OpTreeNode[] = _.map(grouped, (value, key) => {
return {
key: "p" + key,
title: "" + key,
children: buildTreeYear(value),
};
});
entries.sort(sortByKey);

if (entries.length === 1) {
return entries[0].children!;
}
return entries;
};

const buildTreeYear = (operations: BackupInfo[]): OpTreeNode[] => {
const grouped = _.groupBy(operations, (op) => {
return localISOTime(op.displayTime).substring(0, 4);
Expand Down
16 changes: 10 additions & 6 deletions webui/src/components/SnapshotBrowser.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import React, { useEffect, useMemo, useState } from "react";
import { Button, Dropdown, Form, Input, Modal, Space, Tree } from "antd";
import { Button, Dropdown, Form, Input, Modal, Space, Spin, Tree } from "antd";
import type { DataNode, EventDataNode } from "antd/es/tree";
import {
ListSnapshotFilesResponse,
Expand Down Expand Up @@ -109,7 +109,7 @@ export const SnapshotBrowser = ({
},
{ pathPrefix: "/api" }
);

setTreeData((treeData) => {
let toUpdate: DataNode | null = null;
for (const node of treeData) {
Expand All @@ -118,14 +118,14 @@ export const SnapshotBrowser = ({
break;
}
}

if (!toUpdate) {
return treeData;
}

const toUpdateCopy = { ...toUpdate };
toUpdateCopy.children = respToNodes(resp);

return treeData.map((node) => {
const didUpdate = replaceKeyInTree(node, key as string, toUpdateCopy);
if (didUpdate) {
Expand All @@ -136,6 +136,10 @@ export const SnapshotBrowser = ({
});
};

if (treeData.length === 0) {
return <Spin />;
}

return (
<SnapshotBrowserContext.Provider
value={{ snapshotId, repoId, planId, showModal }}
Expand Down Expand Up @@ -327,4 +331,4 @@ const RestoreModal = ({
);
};

const restoreFlow = (repoId: string, snapshotId: string, path: string) => {};
const restoreFlow = (repoId: string, snapshotId: string, path: string) => { };
Loading

0 comments on commit 46456a8

Please sign in to comment.