Skip to content

Commit

Permalink
feat: initial backend implementation of multihost synchronization (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge authored Jan 8, 2025
1 parent b3402a1 commit a4b4de5
Show file tree
Hide file tree
Showing 101 changed files with 7,943 additions and 2,168 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
test*
backrest-*
dist
__debug_bin
Expand Down
56 changes: 52 additions & 4 deletions cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/gen/go/v1/v1connect"
"github.com/garethgeorge/backrest/internal/api"
syncapi "github.com/garethgeorge/backrest/internal/api/syncapi"
"github.com/garethgeorge/backrest/internal/auth"
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/env"
Expand Down Expand Up @@ -67,6 +68,7 @@ func main() {
if err != nil {
zap.S().Fatalf("error loading config: %v", err)
}
configMgr := &config.ConfigManager{Store: configStore}

var wg sync.WaitGroup

Expand All @@ -86,6 +88,7 @@ func main() {
zap.S().Fatalf("error creating oplog: %v", err)
}
migrateBboltOplog(opstore)
migratePopulateGuids(opstore, cfg)

// Create rotating log storage
logStore, err := logstore.NewLogStore(filepath.Join(env.DataDir(), "tasklogs"))
Expand All @@ -112,6 +115,7 @@ func main() {
}()

// Create orchestrator and start task loop.
// TODO: update the orchestrator to accept a configMgr and auto-refresh the config w/o explicit ApplyConfig call.
orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, log, logStore)
if err != nil {
zap.S().Fatalf("error creating orchestrator: %v", err)
Expand All @@ -124,18 +128,29 @@ func main() {
}()

// Create and serve the HTTP gateway
remoteConfigStore := syncapi.NewJSONDirRemoteConfigStore(filepath.Join(env.DataDir(), "sync", "remote_configs"))
syncMgr := syncapi.NewSyncManager(configMgr, remoteConfigStore, log, orchestrator)
wg.Add(1)
go func() {
syncMgr.RunSync(ctx)
wg.Done()
}()

syncHandler := syncapi.NewBackrestSyncHandler(syncMgr)

apiBackrestHandler := api.NewBackrestHandler(
configStore,
configMgr,
remoteConfigStore,
orchestrator,
log,
logStore,
)

authenticator := auth.NewAuthenticator(getSecret(), configStore)
authenticator := auth.NewAuthenticator(getSecret(), configMgr)
apiAuthenticationHandler := api.NewAuthenticationHandler(authenticator)

mux := http.NewServeMux()
mux.Handle(v1connect.NewAuthenticationHandler(apiAuthenticationHandler))
mux.Handle(v1connect.NewBackrestSyncServiceHandler(syncHandler))
backrestHandlerPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler)
mux.Handle(backrestHandlerPath, auth.RequireAuthentication(backrestHandler, authenticator))
mux.Handle("/", webui.Handler())
Expand Down Expand Up @@ -217,10 +232,15 @@ func installLoggers() {
c := zap.NewDevelopmentEncoderConfig()
c.EncodeLevel = zapcore.CapitalColorLevelEncoder
c.EncodeTime = zapcore.ISO8601TimeEncoder

debugLevel := zapcore.InfoLevel
if version == "unknown" { // dev build
debugLevel = zapcore.DebugLevel
}
pretty := zapcore.NewCore(
zapcore.NewConsoleEncoder(c),
zapcore.AddSync(colorable.NewColorableStdout()),
zapcore.InfoLevel,
debugLevel,
)

// JSON logging to log directory
Expand Down Expand Up @@ -294,3 +314,31 @@ func migrateBboltOplog(logstore oplog.OpStore) {
}
zap.S().Infof("migrated %d operations from old bbolt oplog to sqlite", count)
}

func migratePopulateGuids(logstore oplog.OpStore, cfg *v1.Config) {
zap.S().Info("migrating oplog to populate GUIDs")

repoToGUID := make(map[string]string)
for _, repo := range cfg.Repos {
if repo.Guid != "" {
repoToGUID[repo.Id] = repo.Guid
}
}

migratedOpCount := 0
if err := logstore.Transform(oplog.Query{}.SetRepoGUID(""), func(op *v1.Operation) (*v1.Operation, error) {
if op.RepoGuid != "" {
return nil, nil
}
if guid, ok := repoToGUID[op.RepoId]; ok {
op.RepoGuid = guid
migratedOpCount++
return op, nil
}
return nil, nil
}); err != nil {
zap.S().Fatalf("error populating repo GUIDs for existing operations: %v", err)
} else if migratedOpCount > 0 {
zap.S().Infof("populated repo GUIDs for %d existing operations", migratedOpCount)
}
}
77 changes: 0 additions & 77 deletions cmd/devtools/oplogexport/main.go

This file was deleted.

70 changes: 0 additions & 70 deletions cmd/devtools/oplogimport/main.go

This file was deleted.

Empty file.
Binary file not shown.
Loading

0 comments on commit a4b4de5

Please sign in to comment.