refine meta backup and restore process of local storage #8

Merged
merged 14 commits into from Jul 16, 2021
Changes from 5 commits
31 changes: 26 additions & 5 deletions cmd/backup.go
@@ -18,12 +18,25 @@ func NewBackupCmd() *cobra.Command {

backupCmd.AddCommand(newFullBackupCmd())
backupCmd.PersistentFlags().StringVar(&backupConfig.Meta, "meta", "", "meta server")
backupCmd.PersistentFlags().StringArrayVar(&backupConfig.SpaceNames, "spaces", nil, "space names")
backupCmd.PersistentFlags().StringVar(&backupConfig.BackendUrl, "storage", "", "storage path")
backupCmd.PersistentFlags().StringVar(&backupConfig.User, "user", "", "user for meta/storage")
backupCmd.PersistentFlags().StringArrayVar(&backupConfig.SpaceNames, "spaces", nil,
`(EXPERIMENTAL) space names.
With this option, the user can specify which spaces to back up. This feature is still experimental.
`)
backupCmd.PersistentFlags().StringVar(&backupConfig.BackendUrl, "storage", "",
`backup target url, format: <SCHEME>://<PATH>.
<SCHEME>: a string indicating the backend type. options: local, hdfs.
Currently local and hdfs are supported; s3 and oss are still experimental.
examples:
for local - "local:///the/local/path/to/backup"
for hdfs - "hdfs://example_host:example_port/examplepath"
(EXPERIMENTAL) for oss - "oss://example/url/to/the/backup"
(EXPERIMENTAL) for s3 - "s3://example/url/to/the/backup"
`)
backupCmd.PersistentFlags().StringVar(&backupConfig.User, "user", "", "username used to log in to the hosts where the meta/storage services are located")
backupCmd.PersistentFlags().IntVar(&backupConfig.MaxSSHConnections, "connection", 5, "max ssh connection")
backupCmd.PersistentFlags().IntVar(&backupConfig.MaxConcurrent, "concurrent", 5, "max concurrent(for aliyun OSS)")
backupCmd.PersistentFlags().StringVar(&backupConfig.CommandArgs, "extra_args", "", "backup storage utils(oss/hdfs/s3) args for backup")
backupCmd.PersistentFlags().BoolVar(&backupConfig.Verbose, "verbose", false, "show detailed backup information")

backupCmd.MarkPersistentFlagRequired("meta")
backupCmd.MarkPersistentFlagRequired("storage")
@@ -54,13 +67,21 @@ func newFullBackupCmd() *cobra.Command {
return err
}
defer logger.Sync() // flushes buffer, if any
b := backup.NewBackupClient(backupConfig, logger.Logger)
var b *backup.Backup
b, err = backup.NewBackupClient(backupConfig, logger.Logger)
if err != nil {
return err
}

fmt.Println("start to backup cluster...")
err = b.BackupCluster()
if err != nil {
return err
}
fmt.Println("backup successed")
fmt.Println("backup successed.")
if backupConfig.Verbose {
b.ShowSummaries()
}
return nil
},
}
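The constructor change above (NewBackupClient now returns (*Backup, error) instead of a bare pointer) lets RunE surface setup failures through cobra rather than carrying a nil client into BackupCluster; the same pattern is applied to NewRestore in cmd/restore.go below. A minimal self-contained sketch of that pattern, with hypothetical command and type names rather than the PR's own:

    package main

    import (
        "errors"
        "fmt"
        "os"

        "github.com/spf13/cobra"
    )

    type client struct{ meta string }

    // newClient mirrors the constructor-returns-error style adopted in this PR.
    func newClient(meta string) (*client, error) {
        if meta == "" {
            return nil, errors.New("meta server address is required")
        }
        return &client{meta: meta}, nil
    }

    func main() {
        var meta string
        cmd := &cobra.Command{
            Use: "backup",
            RunE: func(cmd *cobra.Command, args []string) error {
                c, err := newClient(meta)
                if err != nil {
                    return err // cobra prints the error and the caller sees a failure
                }
                fmt.Println("client ready for meta server", c.meta)
                return nil
            },
        }
        cmd.PersistentFlags().StringVar(&meta, "meta", "", "meta server")
        cmd.MarkPersistentFlagRequired("meta")
        if err := cmd.Execute(); err != nil {
            os.Exit(1)
        }
    }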
8 changes: 6 additions & 2 deletions cmd/restore.go
@@ -21,7 +21,6 @@ func NewRestoreCMD() *cobra.Command {
restoreCmd.PersistentFlags().StringVar(&restoreConfig.BackendUrl, "storage", "", "storage path")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.User, "user", "", "user for meta and storage")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.BackupName, "name", "", "backup name")
restoreCmd.PersistentFlags().BoolVar(&restoreConfig.AllowStandaloneMeta, "allow_standalone_meta", false, "if the target cluster with standalone meta service is allowed(for testing purpose)")
restoreCmd.PersistentFlags().IntVar(&restoreConfig.MaxConcurrent, "concurrent", 5, "max concurrent(for aliyun OSS)")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.CommandArgs, "extra_args", "", "storage utils(oss/hdfs/s3) args for restore")

@@ -55,7 +54,12 @@ func newFullRestoreCmd() *cobra.Command {

defer logger.Sync() // flushes buffer, if any

r := restore.NewRestore(restoreConfig, logger.Logger)
var r *restore.Restore
r, err = restore.NewRestore(restoreConfig, logger.Logger)
if err != nil {
return err
}

err = r.RestoreCluster()
if err != nil {
return err
111 changes: 105 additions & 6 deletions pkg/backup/backup.go
@@ -1,6 +1,7 @@
package backup

import (
"encoding/json"
"errors"
"fmt"
_ "os"
@@ -16,6 +17,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/vesoft-inc/nebula-br/pkg/config"
ctx0 "github.com/vesoft-inc/nebula-br/pkg/context"
"github.com/vesoft-inc/nebula-br/pkg/metaclient"
"github.com/vesoft-inc/nebula-br/pkg/remote"
"github.com/vesoft-inc/nebula-br/pkg/storage"
@@ -44,21 +46,50 @@ func (e *BackupError) Error() string {
return e.msg + e.Err.Error()
}

type backupEntry struct {
SrcPath string
DestUrl string
}
type idPathMap map[string][]backupEntry
type Backup struct {
config config.BackupConfig
metaLeader string
backendStorage storage.ExternalStorage
log *zap.Logger
metaFileName string
storageMap map[string]idPathMap
metaMap map[string]idPathMap
storeCtx *ctx0.Context
}

func NewBackupClient(cf config.BackupConfig, log *zap.Logger) *Backup {
backend, err := storage.NewExternalStorage(cf.BackendUrl, log, cf.MaxConcurrent, cf.CommandArgs)
func NewBackupClient(cf config.BackupConfig, log *zap.Logger) (*Backup, error) {
local_addr, err := remote.GetAddresstoReachRemote(strings.Split(cf.Meta, ":")[0], cf.User, log)
if err != nil {
log.Error("get local address failed", zap.Error(err))
return nil, err
}
log.Info("local address", zap.String("address", local_addr))
var (
storeCtx ctx0.Context
backend storage.ExternalStorage
)
backend, err = storage.NewExternalStorage(cf.BackendUrl, log, cf.MaxConcurrent, cf.CommandArgs,
&storeCtx)
if err != nil {
log.Error("new external storage failed", zap.Error(err))
return nil
return nil, err
}
return &Backup{config: cf, backendStorage: backend, log: log}

b := &Backup{config: cf, log: log,
storageMap: make(map[string]idPathMap),
metaMap: make(map[string]idPathMap),
storeCtx: &storeCtx}

b.storeCtx.LocalAddr = local_addr
b.storeCtx.Reporter = b
b.backendStorage = backend

return b, nil
}

func (b *Backup) dropBackup(name []byte) (*meta.ExecResp, error) {
@@ -149,6 +180,9 @@ func (b *Backup) BackupCluster() error {
}

meta := resp.GetMeta()
b.log.Info("response backup meta",
zap.String("backup.meta", metaclient.BackupMetaToString(meta)))

err = b.uploadAll(meta)
if err != nil {
return err
@@ -157,12 +191,27 @@
return nil
}

func (b *Backup) execPreUploadMetaCommand(metaDir string) error {
cmdStr := []string{"mkdir", "-p", metaDir}
b.log.Info("exec pre upload meta command", zap.Strings("cmd", cmdStr))
cmd := exec.Command(cmdStr[0], cmdStr[1:]...)
// Run starts the command and waits for it to finish; no separate Wait is needed.
err := cmd.Run()
if err != nil {
return err
}
return nil
}

func (b *Backup) uploadMeta(g *errgroup.Group, files []string) {

b.log.Info("will upload meta", zap.Int("sst file count", len(files)))
cmd := b.backendStorage.BackupMetaCommand(files)
b.log.Info("start upload meta", zap.String("addr", b.metaLeader))
ipAddr := strings.Split(b.metaLeader, ":")
b.storeCtx.RemoteAddr = ipAddr[0]

b.log.Info("will upload meta", zap.Int("sst file count", len(files)))
cmd := b.backendStorage.BackupMetaCommand(files)

func(addr string, user string, cmd string, log *zap.Logger) {
g.Go(func() error {
client, err := remote.NewClient(addr, user, log)
@@ -193,6 +242,9 @@ func (b *Backup) uploadStorage(g *errgroup.Group, dirs map[string][]spaceInfo) error {
return err
}
i := 0

b.storeCtx.RemoteAddr = ipAddrs[0]

//We need to limit the number of ssh connections per storage node
for id2, cp := range idMap {
cmds := b.backendStorage.BackupStorageCommand(cp, k, id2)
@@ -254,6 +306,12 @@ func (b *Backup) uploadAll(meta *meta.BackupMeta) error {
return err
}

err = b.execPreUploadMetaCommand(b.backendStorage.BackupMetaDir())
if err != nil {
b.log.Error("exec pre uploadmeta command failed", zap.Error(err))
return err
}

var metaFiles []string
for _, f := range meta.GetMetaFiles() {
fileName := string(f[:])
@@ -271,6 +329,7 @@ func (b *Backup) uploadAll(meta *meta.BackupMeta) error {
}
}
}

err = b.uploadStorage(g, storageMap)
if err != nil {
return err
@@ -305,3 +364,43 @@

return nil
}

func (b *Backup) ShowSummaries() {
fmt.Printf("==== backup summeries ====\n")
fmt.Printf("localaddr : %s\n", b.storeCtx.LocalAddr)
fmt.Printf("backend.type : %s\n", b.backendStorage.Scheme())
fmt.Printf("backend.url : %s\n", b.backendStorage.URI())
fmt.Printf("tgt.meta.leader : %s\n", b.config.Meta)
if b.backendStorage.Scheme() == storage.SCHEME_LOCAL {
// if local, storage snapshots would be copied to a path on that host.
b.showUploadSummaries(&b.metaMap, "tgt.meta.map")
b.showUploadSummaries(&b.storageMap, "tgt.storage.map")
}
fmt.Printf("==========================\n")
}

func (b *Backup) showUploadSummaries(m *map[string]idPathMap, msg string) {
o, _ := json.MarshalIndent(m, "", " ")
fmt.Printf("--- %s ---\n", msg)
fmt.Printf("%s\n", string(o))
}

func (b *Backup) doRecordUploading(m *map[string]idPathMap, spaceId string, host string, paths []string, desturl string) {
if (*m)[host] == nil {
(*m)[host] = make(idPathMap)
}
bes := []backupEntry{}
for _, p := range paths {
bes = append(bes, backupEntry{SrcPath: p, DestUrl: desturl})
}
(*m)[host][spaceId] = append((*m)[host][spaceId], bes...)
}

func (b *Backup) StorageUploadingReport(spaceid string, host string, paths []string, desturl string) {
b.doRecordUploading(&b.storageMap, spaceid, host, paths, desturl)
}

func (b *Backup) MetaUploadingReport(host string, paths []string, desturl string) {
kDefaultSid := "0"
b.doRecordUploading(&b.metaMap, kDefaultSid, host, paths, desturl)
}
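The summary bookkeeping above nests two maps: host, then space id, then a list of backupEntry records, with meta uploads filed under the fixed space id "0". A small sketch of how the ShowSummaries JSON rendering comes out; the host, id, and paths here are made up for illustration, not taken from the PR:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type backupEntry struct {
        SrcPath string
        DestUrl string
    }

    type idPathMap map[string][]backupEntry

    func main() {
        // Hypothetical record: space 1 on one storage host, copied to a local backend path.
        m := map[string]idPathMap{
            "192.168.8.1": {
                "1": {{SrcPath: "/data/storage/nebula/1/checkpoints/BACKUP_X", DestUrl: "local:///backup/BACKUP_X/data"}},
            },
        }
        o, _ := json.MarshalIndent(m, "", " ")
        fmt.Printf("--- tgt.storage.map ---\n%s\n", string(o))
    }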
12 changes: 6 additions & 6 deletions pkg/config/config.go
@@ -16,15 +16,15 @@ type BackupConfig struct {
// Only for OSS for now
MaxConcurrent int
CommandArgs string
Verbose bool
}

type RestoreConfig struct {
Meta string
BackendUrl string
MaxSSHConnections int
User string
BackupName string
AllowStandaloneMeta bool
Meta string
BackendUrl string
MaxSSHConnections int
User string
BackupName string
// Only for OSS for now
MaxConcurrent int
CommandArgs string
21 changes: 21 additions & 0 deletions pkg/context/context.go
@@ -0,0 +1,21 @@
package context

import (
_ "github.com/vesoft-inc/nebula-go/v2/nebula"
)

type BackendUploadTracker interface {
StorageUploadingReport(spaceid string, host string, paths []string, desturl string)
MetaUploadingReport(host string, paths []string, desturl string)
}

// NB - not thread-safe
type Context struct {
LocalAddr string // the address of br client
RemoteAddr string // the address of nebula service
Reporter BackendUploadTracker
}

func NewContext(localaddr string, r BackendUploadTracker) *Context {
return &Context{LocalAddr: localaddr, Reporter: r}
}
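This Context is what ties the new pieces together: NewBackupClient fills in LocalAddr, uploadMeta and uploadStorage set RemoteAddr per target host, and the storage backend calls back through Reporter as files land. A minimal sketch of a Reporter implementation; the printTracker type is hypothetical and not part of the PR:

    package main

    import "fmt"

    // BackendUploadTracker matches the interface defined in pkg/context.
    type BackendUploadTracker interface {
        StorageUploadingReport(spaceid string, host string, paths []string, desturl string)
        MetaUploadingReport(host string, paths []string, desturl string)
    }

    type Context struct {
        LocalAddr  string // the address of the br client
        RemoteAddr string // the address of the nebula service
        Reporter   BackendUploadTracker
    }

    // printTracker is a stand-in reporter that just echoes each report.
    type printTracker struct{}

    func (p *printTracker) StorageUploadingReport(spaceid, host string, paths []string, desturl string) {
        fmt.Printf("space %s on %s: %v -> %s\n", spaceid, host, paths, desturl)
    }

    func (p *printTracker) MetaUploadingReport(host string, paths []string, desturl string) {
        fmt.Printf("meta on %s: %v -> %s\n", host, paths, desturl)
    }

    func main() {
        ctx := &Context{LocalAddr: "192.168.8.2", Reporter: &printTracker{}}
        ctx.RemoteAddr = "192.168.8.1" // set per host, as uploadMeta and uploadStorage do
        ctx.Reporter.MetaUploadingReport(ctx.RemoteAddr, []string{"/tmp/meta.sst"}, "local:///backup/meta")
    }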
18 changes: 18 additions & 0 deletions pkg/metaclient/util.go
@@ -1,11 +1,29 @@
package metaclient

import (
"encoding/json"
"strconv"

"github.com/vesoft-inc/nebula-go/v2/nebula"
"github.com/vesoft-inc/nebula-go/v2/nebula/meta"
)

func HostaddrToString(host *nebula.HostAddr) string {
return host.Host + ":" + strconv.Itoa(int(host.Port))
}

func BackupMetaToString(m *meta.BackupMeta) string {
mstr, err := json.Marshal(m)
if err != nil {
return ""
}
return string(mstr)
}

func ListClusterInfoRespToString(m *meta.ListClusterInfoResp) string {
mstr, err := json.Marshal(m)
if err != nil {
return ""
}
return string(mstr)
}
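These helpers exist so call sites can log thrift structs as a single JSON string field, as BackupCluster now does with the backup meta. A sketch of that call-site pattern; note that both helpers return "" when json.Marshal fails, so an empty field in the log means a marshal error rather than an empty struct:

    package example

    import (
        "go.uber.org/zap"

        "github.com/vesoft-inc/nebula-br/pkg/metaclient"
        "github.com/vesoft-inc/nebula-go/v2/nebula/meta"
    )

    // logBackupMeta mirrors the logging added in BackupCluster above.
    func logBackupMeta(log *zap.Logger, m *meta.BackupMeta) {
        log.Info("response backup meta",
            zap.String("backup.meta", metaclient.BackupMetaToString(m)))
    }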
19 changes: 15 additions & 4 deletions pkg/remote/ssh.go
@@ -41,12 +41,14 @@ func NewClient(addr string, user string, log *zap.Logger) (*Client, error) {
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
}

retry := 1
retry := 0
for retry < 3 {
client, err := ssh.Dial("tcp", net.JoinHostPort(addr, "22"), config)
var client *ssh.Client
client, err = ssh.Dial("tcp", net.JoinHostPort(addr, "22"), config)
if err != nil {
log.Error("unable to connect host, will retry", zap.Error(err), zap.String("host", addr), zap.String("user", user))
time.Sleep(time.Second * 3)
log.Error("unable to connect host, will retry", zap.Int("attemp", retry), zap.Error(err), zap.String("host", addr), zap.String("user", user))
time.Sleep(time.Second * 1)
retry += 1
continue
}
return &Client{client, addr, user, log}, nil
Expand All @@ -71,6 +73,15 @@ func NewClientPool(addr string, user string, log *zap.Logger, count int) ([]*Cli
return clients, nil
}

func GetAddresstoReachRemote(addr string, user string, log *zap.Logger) (string, error) {
if cli, err := NewClient(addr, user, log); err == nil {
log.Info("succeed to reach remote", zap.String("addr of local", cli.client.Conn.LocalAddr().String()))
return strings.Split(cli.client.Conn.LocalAddr().String(), ":")[0], nil
} else {
return "", err
}
}

func (c *Client) Close() {
c.client.Close()
}
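GetAddresstoReachRemote discovers which local IP can reach the meta host by opening a real SSH connection and reading that connection's local endpoint. The same route lookup can be done with a plain dial when no SSH credentials are at hand; a minimal sketch under that assumption (the target address is illustrative):

    package main

    import (
        "fmt"
        "net"
    )

    // localAddrToward asks the kernel which source address it would use to
    // reach target. A UDP "dial" sends no packets; it only resolves a route.
    func localAddrToward(target string) (string, error) {
        conn, err := net.Dial("udp", net.JoinHostPort(target, "22"))
        if err != nil {
            return "", err
        }
        defer conn.Close()
        // SplitHostPort is safer than strings.Split(addr, ":") because it also
        // handles IPv6 addresses, which contain colons themselves.
        host, _, err := net.SplitHostPort(conn.LocalAddr().String())
        return host, err
    }

    func main() {
        addr, err := localAddrToward("192.168.8.1")
        if err != nil {
            fmt.Println("route lookup failed:", err)
            return
        }
        fmt.Println("local address toward target:", addr)
    }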