Merge pull request #8 from linkensphere201/master
refine meta backup and restore process of local storage
linkensphere201 authored Jul 16, 2021
2 parents 23fe601 + 8a168ec commit 1f2be49
Showing 24 changed files with 789 additions and 128 deletions.
10 changes: 9 additions & 1 deletion Makefile
@@ -4,8 +4,16 @@ PKG := ./pkg/...
BUILDTARGET := bin/br
SUFFIX := $(GOEXE)

GITSHA := $(shell git describe --no-match --always --dirty)
GITREF := $(shell git rev-parse --abbrev-ref HEAD)

REPO := github.com/vesoft-inc/nebula-br

LDFLAGS += -X $(REPO)/pkg/version.GitSha=$(GITSHA)
LDFLAGS += -X $(REPO)/pkg/version.GitRef=$(GITREF)

build:
$(GO) build -o $(BUILDTARGET) main.go
$(GO) build -ldflags '$(LDFLAGS)' -o $(BUILDTARGET) main.go

test:
$(GO) test -v $(PKG) -short
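
The two -X flags above only take effect if pkg/version exports matching package-level string variables. That file is not part of the diff shown here, so the following is a minimal sketch of what it presumably contains, with the non-git fields inferred from the cmd/version.go change further down; the concrete values are placeholders, not the project's real version numbers.

// pkg/version/version.go -- illustrative sketch only, not part of this commit.
package version

// GitSha and GitRef are injected at build time by the Makefile via
// -X <repo>/pkg/version.GitSha=... and -X <repo>/pkg/version.GitRef=...
// The -X linker flag only works on package-level string variables.
var (
    GitSha string
    GitRef string
)

// Static version identifiers referenced by cmd/version.go; the values below are placeholders.
var (
    VerName  = "nebula-br"
    VerMajor = 0
    VerMinor = 1
    VerPatch = 0
)
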
31 changes: 26 additions & 5 deletions cmd/backup.go
@@ -18,12 +18,25 @@ func NewBackupCmd() *cobra.Command {

backupCmd.AddCommand(newFullBackupCmd())
backupCmd.PersistentFlags().StringVar(&backupConfig.Meta, "meta", "", "meta server")
backupCmd.PersistentFlags().StringArrayVar(&backupConfig.SpaceNames, "spaces", nil, "space names")
backupCmd.PersistentFlags().StringVar(&backupConfig.BackendUrl, "storage", "", "storage path")
backupCmd.PersistentFlags().StringVar(&backupConfig.User, "user", "", "user for meta/storage")
backupCmd.PersistentFlags().StringArrayVar(&backupConfig.SpaceNames, "spaces", nil,
`(EXPERIMENTAL) space names.
With this option, users can specify which spaces to back up. This feature is still experimental.
`)
backupCmd.PersistentFlags().StringVar(&backupConfig.BackendUrl, "storage", "",
`backup target URL, format: <SCHEME>://<PATH>.
<SCHEME>: a string indicating the backend type. options: local, hdfs.
Currently hdfs and local are supported; s3 and oss are still experimental.
examples:
for local - "local:///the/local/path/to/backup"
for hdfs - "hdfs://example_host:example_port/examplepath"
(EXPERIMENTAL) for oss - "oss://example/url/to/the/backup"
(EXPERIMENTAL) for s3 - "s3://example/url/to/the/backup"
`)
backupCmd.PersistentFlags().StringVar(&backupConfig.User, "user", "", "username used to log in to the hosts where the meta/storage services are located")
backupCmd.PersistentFlags().IntVar(&backupConfig.MaxSSHConnections, "connection", 5, "max ssh connection")
backupCmd.PersistentFlags().IntVar(&backupConfig.MaxConcurrent, "concurrent", 5, "max concurrent(for aliyun OSS)")
backupCmd.PersistentFlags().StringVar(&backupConfig.CommandArgs, "extra_args", "", "backup storage utils(oss/hdfs/s3) args for backup")
backupCmd.PersistentFlags().BoolVar(&backupConfig.Verbose, "verbose", false, "show detailed backup information")

backupCmd.MarkPersistentFlagRequired("meta")
backupCmd.MarkPersistentFlagRequired("storage")
@@ -54,13 +67,21 @@ func newFullBackupCmd() *cobra.Command {
return err
}
defer logger.Sync() // flushes buffer, if any
b := backup.NewBackupClient(backupConfig, logger.Logger)
var b *backup.Backup
b, err = backup.NewBackupClient(backupConfig, logger.Logger)
if err != nil {
return err
}

fmt.Println("start to backup cluster...")
err = b.BackupCluster()
if err != nil {
return err
}
fmt.Println("backup successed")
fmt.Println("backup succeeded.")
if backupConfig.Verbose {
b.ShowSummaries()
}
return nil
},
}
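
The new --storage help text defines the URL contract, while the actual backend dispatch happens inside pkg/storage.NewExternalStorage, whose implementation is not among the files shown here. The snippet below is only a hedged illustration of scheme-based selection; pickBackend is a made-up helper, not the real pkg/storage API.

// Illustration only: a hypothetical scheme dispatcher, not the real pkg/storage code.
package main

import (
    "fmt"
    "net/url"
)

// pickBackend (hypothetical) maps a backup target URL to a backend type by its scheme.
func pickBackend(backendUrl string) (string, error) {
    u, err := url.Parse(backendUrl)
    if err != nil {
        return "", err
    }
    switch u.Scheme {
    case "local", "hdfs":
        return u.Scheme, nil // supported backends
    case "oss", "s3":
        return u.Scheme, nil // still experimental, per the flag help above
    default:
        return "", fmt.Errorf("unsupported storage scheme %q", u.Scheme)
    }
}

func main() {
    scheme, err := pickBackend("local:///the/local/path/to/backup")
    fmt.Println(scheme, err) // prints: local <nil>
}
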
2 changes: 1 addition & 1 deletion cmd/cleanup.go
@@ -10,7 +10,7 @@ import (
func NewCleanupCmd() *cobra.Command {
cleanupCmd := &cobra.Command{
Use: "cleanup",
Short: "Clean up temporary files in backup",
Short: "[EXPERIMENTAL] Clean up temporary files in backup",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
logger, _ := log.NewLogger(config.LogPath)
8 changes: 6 additions & 2 deletions cmd/restore.go
@@ -21,7 +21,6 @@ func NewRestoreCMD() *cobra.Command {
restoreCmd.PersistentFlags().StringVar(&restoreConfig.BackendUrl, "storage", "", "storage path")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.User, "user", "", "user for meta and storage")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.BackupName, "name", "", "backup name")
restoreCmd.PersistentFlags().BoolVar(&restoreConfig.AllowStandaloneMeta, "allow_standalone_meta", false, "if the target cluster with standalone meta service is allowed(for testing purpose)")
restoreCmd.PersistentFlags().IntVar(&restoreConfig.MaxConcurrent, "concurrent", 5, "max concurrent(for aliyun OSS)")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.CommandArgs, "extra_args", "", "storage utils(oss/hdfs/s3) args for restore")

@@ -55,7 +54,12 @@ func newFullRestoreCmd() *cobra.Command {

defer logger.Sync() // flushes buffer, if any

r := restore.NewRestore(restoreConfig, logger.Logger)
var r *restore.Restore
r, err = restore.NewRestore(restoreConfig, logger.Logger)
if err != nil {
return err
}

err = r.RestoreCluster()
if err != nil {
return err
13 changes: 9 additions & 4 deletions cmd/version.go
@@ -4,16 +4,21 @@ import (
"fmt"

"github.com/spf13/cobra"
"github.com/vesoft-inc/nebula-br/pkg/version"
)

var version string = "2.0"

func NewVersionCmd() *cobra.Command {
versionCmd := &cobra.Command{
Use: "version",
Short: "print the version of nebula br",
Short: "print the version of the nebula br tool",
RunE: func(cmd *cobra.Command, args []string) error {
fmt.Println(version)
vstring := fmt.Sprintf(
`%s,V-%d.%d.%d
GitSha: %s
GitRef: %s
please run the "help" subcommand for more information.`, version.VerName, version.VerMajor, version.VerMinor, version.VerPatch, version.GitSha, version.GitRef)

fmt.Println(vstring)
return nil
},
}
113 changes: 107 additions & 6 deletions pkg/backup/backup.go
@@ -1,6 +1,7 @@
package backup

import (
"encoding/json"
"errors"
"fmt"
_ "os"
@@ -16,6 +17,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/vesoft-inc/nebula-br/pkg/config"
backupCtx "github.com/vesoft-inc/nebula-br/pkg/context"
"github.com/vesoft-inc/nebula-br/pkg/metaclient"
"github.com/vesoft-inc/nebula-br/pkg/remote"
"github.com/vesoft-inc/nebula-br/pkg/storage"
@@ -44,21 +46,50 @@ func (e *BackupError) Error() string {
return e.msg + e.Err.Error()
}

type backupEntry struct {
SrcPath string
DestUrl string
}
type idPathMap map[string][]backupEntry
type Backup struct {
config config.BackupConfig
metaLeader string
backendStorage storage.ExternalStorage
log *zap.Logger
metaFileName string
storageMap map[string]idPathMap
metaMap map[string]idPathMap
storeCtx *backupCtx.Context
}

func NewBackupClient(cf config.BackupConfig, log *zap.Logger) *Backup {
backend, err := storage.NewExternalStorage(cf.BackendUrl, log, cf.MaxConcurrent, cf.CommandArgs)
func NewBackupClient(cf config.BackupConfig, log *zap.Logger) (*Backup, error) {
local_addr, err := remote.GetAddresstoReachRemote(strings.Split(cf.Meta, ":")[0], cf.User, log)
if err != nil {
log.Error("get local address failed", zap.Error(err))
return nil, err
}
log.Info("local address", zap.String("address", local_addr))
var (
storeCtx backupCtx.Context
backend storage.ExternalStorage
)
backend, err = storage.NewExternalStorage(cf.BackendUrl, log, cf.MaxConcurrent, cf.CommandArgs,
&storeCtx)
if err != nil {
log.Error("new external storage failed", zap.Error(err))
return nil
return nil, err
}
return &Backup{config: cf, backendStorage: backend, log: log}

b := &Backup{config: cf, log: log,
storageMap: make(map[string]idPathMap),
metaMap: make(map[string]idPathMap),
storeCtx: &storeCtx}

b.storeCtx.LocalAddr = local_addr
b.storeCtx.Reporter = b
b.backendStorage = backend

return b, nil
}

func (b *Backup) dropBackup(name []byte) (*meta.ExecResp, error) {
@@ -149,6 +180,9 @@ func (b *Backup) BackupCluster() error {
}

meta := resp.GetMeta()
b.log.Info("response backup meta",
zap.String("backup.meta", metaclient.BackupMetaToString(meta)))

err = b.uploadAll(meta)
if err != nil {
return err
@@ -157,12 +191,27 @@
return nil
}

func (b *Backup) execPreUploadMetaCommand(metaDir string) error {
cmdStr := []string{"mkdir", "-p", metaDir}
b.log.Info("exec pre upload meta command", zap.Strings("cmd", cmdStr))
cmd := exec.Command(cmdStr[0], cmdStr[1:]...)
err := cmd.Run() // Run starts mkdir and waits for it to finish; no separate Wait call is needed
if err != nil {
return err
}
return nil
}

func (b *Backup) uploadMeta(g *errgroup.Group, files []string) {

b.log.Info("will upload meta", zap.Int("sst file count", len(files)))
cmd := b.backendStorage.BackupMetaCommand(files)
b.log.Info("start upload meta", zap.String("addr", b.metaLeader))
ipAddr := strings.Split(b.metaLeader, ":")
b.storeCtx.RemoteAddr = ipAddr[0]

b.log.Info("will upload meta", zap.Int("sst file count", len(files)))
cmd := b.backendStorage.BackupMetaCommand(files)

func(addr string, user string, cmd string, log *zap.Logger) {
g.Go(func() error {
client, err := remote.NewClient(addr, user, log)
@@ -193,6 +242,9 @@ func (b *Backup) uploadStorage(g *errgroup.Group, dirs map[string][]spaceInfo) e
return err
}
i := 0

b.storeCtx.RemoteAddr = ipAddrs[0]

//We need to limit the number of ssh connections per storage node
for id2, cp := range idMap {
cmds := b.backendStorage.BackupStorageCommand(cp, k, id2)
@@ -254,6 +306,14 @@ func (b *Backup) uploadAll(meta *meta.BackupMeta) error {
return err
}

if b.backendStorage.Scheme() == storage.SCHEME_LOCAL { // NB: only the local backend needs this
err = b.execPreUploadMetaCommand(b.backendStorage.BackupMetaDir())
if err != nil {
b.log.Error("exec pre upload meta command failed", zap.Error(err))
return err
}
}

var metaFiles []string
for _, f := range meta.GetMetaFiles() {
fileName := string(f[:])
@@ -271,6 +331,7 @@
}
}
}

err = b.uploadStorage(g, storageMap)
if err != nil {
return err
@@ -305,3 +366,43 @@

return nil
}

func (b *Backup) ShowSummaries() {
fmt.Printf("==== backup summaries ====\n")
fmt.Printf("localaddr : %s\n", b.storeCtx.LocalAddr)
fmt.Printf("backend.type : %s\n", b.backendStorage.Scheme())
fmt.Printf("backend.url : %s\n", b.backendStorage.URI())
fmt.Printf("tgt.meta.leader : %s\n", b.config.Meta)
if b.backendStorage.Scheme() == storage.SCHEME_LOCAL {
// for the local backend, storage snapshots are copied to a path on that host.
b.showUploadSummaries(&b.metaMap, "tgt.meta.map")
b.showUploadSummaries(&b.storageMap, "tgt.storage.map")
}
fmt.Printf("==========================\n")
}

func (b *Backup) showUploadSummaries(m *map[string]idPathMap, msg string) {
o, _ := json.MarshalIndent(m, "", " ")
fmt.Printf("--- %s ---\n", msg)
fmt.Printf("%s\n", string(o))
}

func (b *Backup) doRecordUploading(m *map[string]idPathMap, spaceId string, host string, paths []string, desturl string) {
if (*m)[host] == nil {
(*m)[host] = make(idPathMap)
}
bes := []backupEntry{}
for _, p := range paths {
bes = append(bes, backupEntry{SrcPath: p, DestUrl: desturl})
}
(*m)[host][spaceId] = append((*m)[host][spaceId], bes[:]...)
}

func (b *Backup) StorageUploadingReport(spaceid string, host string, paths []string, desturl string) {
b.doRecordUploading(&b.storageMap, spaceid, host, paths, desturl)
}

func (b *Backup) MetaUploadingReport(host string, paths []string, desturl string) {
kDefaultSid := "0"
b.doRecordUploading(&b.metaMap, kDefaultSid, host, paths, desturl)
}
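
With --verbose, the summary above pretty-prints the accumulated host -> space id -> entries maps as JSON. Below is a self-contained sketch of that output shape; the types are copied from the unexported declarations above, and the host and paths are invented for illustration.

// Illustration only: reproduces the shape printed by showUploadSummaries.
package main

import (
    "encoding/json"
    "fmt"
)

// Local copies of the unexported types declared in pkg/backup/backup.go above.
type backupEntry struct {
    SrcPath string
    DestUrl string
}

type idPathMap map[string][]backupEntry

func main() {
    // One storage host, one space, one uploaded checkpoint (all values invented).
    m := map[string]idPathMap{
        "192.168.8.6": {
            "1": {{
                SrcPath: "/data/storage/nebula/1/checkpoints/BACKUP_2021_07_16",
                DestUrl: "local:///backup/BACKUP_2021_07_16/storage",
            }},
        },
    }
    o, _ := json.MarshalIndent(m, "", " ")
    fmt.Printf("--- tgt.storage.map ---\n%s\n", string(o))
}
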
12 changes: 6 additions & 6 deletions pkg/config/config.go
@@ -16,15 +16,15 @@ type BackupConfig struct {
// Only for OSS for now
MaxConcurrent int
CommandArgs string
Verbose bool
}

type RestoreConfig struct {
Meta string
BackendUrl string
MaxSSHConnections int
User string
BackupName string
AllowStandaloneMeta bool
Meta string
BackendUrl string
MaxSSHConnections int
User string
BackupName string
// Only for OSS for now
MaxConcurrent int
CommandArgs string
21 changes: 21 additions & 0 deletions pkg/context/context.go
@@ -0,0 +1,21 @@
package context

import (
_ "github.com/vesoft-inc/nebula-go/v2/nebula"
)

type BackendUploadTracker interface {
StorageUploadingReport(spaceid string, host string, paths []string, desturl string)
MetaUploadingReport(host string, paths []string, desturl string)
}

// NB - not thread-safe
type Context struct {
LocalAddr string // the address of br client
RemoteAddr string // the address of nebula service
Reporter BackendUploadTracker
}

func NewContext(localaddr string, r BackendUploadTracker) *Context {
return &Context{LocalAddr: localaddr, Reporter: r}
}
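
Context carries the br client address, the address of the nebula host currently being handled, and a callback through which storage backends report what they uploaded; *Backup satisfies BackendUploadTracker via the StorageUploadingReport/MetaUploadingReport methods added in pkg/backup/backup.go above. Below is a hedged sketch of that flow: printingTracker, the addresses, the file names, and the final call are all illustrative, and the import path assumes the upstream github.com module path.

// Illustration only: shows the reporting flow, not the real storage backend code.
package main

import (
    "fmt"

    backupCtx "github.com/vesoft-inc/nebula-br/pkg/context"
)

// printingTracker stands in for *backup.Backup, which implements the same interface.
type printingTracker struct{}

func (printingTracker) StorageUploadingReport(spaceid string, host string, paths []string, desturl string) {
    fmt.Printf("storage upload: space=%s host=%s %v -> %s\n", spaceid, host, paths, desturl)
}

func (printingTracker) MetaUploadingReport(host string, paths []string, desturl string) {
    fmt.Printf("meta upload: host=%s %v -> %s\n", host, paths, desturl)
}

func main() {
    ctx := backupCtx.NewContext("192.168.8.2", printingTracker{})
    ctx.RemoteAddr = "192.168.8.6" // the nebula service host currently being handled

    // A backend would call back roughly like this after finishing an upload.
    ctx.Reporter.MetaUploadingReport(ctx.RemoteAddr,
        []string{"/data/meta/__edges__.sst"}, "local:///backup/BACKUP_2021_07_16/meta")
}
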
18 changes: 18 additions & 0 deletions pkg/metaclient/util.go
@@ -1,11 +1,29 @@
package metaclient

import (
"encoding/json"
"strconv"

"github.com/vesoft-inc/nebula-go/v2/nebula"
"github.com/vesoft-inc/nebula-go/v2/nebula/meta"
)

func HostaddrToString(host *nebula.HostAddr) string {
return host.Host + ":" + strconv.Itoa(int(host.Port))
}

func BackupMetaToString(m *meta.BackupMeta) string {
mstr, err := json.Marshal(m)
if err != nil {
return ""
}
return string(mstr)
}

func ListClusterInfoRespToString(m *meta.ListClusterInfoResp) string {
mstr, err := json.Marshal(m)
if err != nil {
return ""
}
return string(mstr)
}
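
Both new *ToString helpers exist for logging: BackupMetaToString is used in BackupCluster above to dump the meta response, and ListClusterInfoRespToString presumably serves the restore path in the same way. Below is a tiny usage sketch for the address helper; the host and port are made up, and the import paths assume the upstream github.com module paths.

// Illustration only.
package main

import (
    "fmt"

    "github.com/vesoft-inc/nebula-br/pkg/metaclient"
    "github.com/vesoft-inc/nebula-go/v2/nebula"
)

func main() {
    addr := &nebula.HostAddr{Host: "192.168.8.6", Port: 9559}
    fmt.Println(metaclient.HostaddrToString(addr)) // prints: 192.168.8.6:9559
}
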