Skip to content

Commit

Permalink
feat(restore): add support for namespace aware restore (#8968)
Browse files Browse the repository at this point in the history
This commit adds support for namespace-aware restore. After this change,
we can restore exactly one namespace from a backup of multiple
namespaces into namespace 0 of a Dgraph cluster.
  • Loading branch information
shivaji-kharse authored Sep 14, 2023
1 parent 4c9448a commit e9f9b15
Show file tree
Hide file tree
Showing 14 changed files with 962 additions and 385 deletions.
64 changes: 61 additions & 3 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os/exec"
"strings"
Expand All @@ -43,6 +44,7 @@ type Cluster interface {
AlphasLogs() ([]string, error)
AssignUids(gc *dgo.Dgraph, num uint64) error
GetVersion() string
GetEncKeyPath() (string, error)
}

type GrpcClient struct {
Expand Down Expand Up @@ -375,7 +377,7 @@ func (hc *HTTPClient) WaitForTask(taskId string) error {

// Restore performs restore on Dgraph cluster from the given path to backup
func (hc *HTTPClient) Restore(c Cluster, backupPath string,
backupId string, incrFrom, backupNum int, encKey string) error {
backupId string, incrFrom, backupNum int) error {

// incremental restore was introduced in commit 8b3712e93ed2435bea52d957f7b69976c6cfc55b
incrRestoreSupported, err := IsHigherVersion(c.GetVersion(), "8b3712e93ed2435bea52d957f7b69976c6cfc55b")
Expand All @@ -386,6 +388,11 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
return errors.New("incremental restore is not supported by the cluster")
}

encKey, err := c.GetEncKeyPath()
if err != nil {
return errors.Wrapf(err, "error getting encryption key path")
}

var varPart, queryPart string
if incrRestoreSupported {
varPart = "$incrFrom: Int, "
Expand Down Expand Up @@ -429,6 +436,51 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
return nil
}

// RestoreTenant runs the restoreTenant admin mutation so that the single namespace
// fromNamespace is restored from the backup located at backupPath.
//
// The encryption key file path is obtained from c.GetEncKeyPath and sent along with
// backupId, incrFrom and backupNum. An error is returned when the key path cannot be
// obtained, when the GraphQL request fails, when the response cannot be decoded, or
// when the response code is anything other than "Success".
func (hc *HTTPClient) RestoreTenant(c Cluster, backupPath string, backupId string,
	incrFrom, backupNum int, fromNamespace uint64) error {

	keyPath, err := c.GetEncKeyPath()
	if err != nil {
		return errors.Wrapf(err, "error getting encryption key path")
	}

	const mutation = `mutation restoreTenant( $location: String!, $backupId: String,
$incrFrom: Int, $backupNum: Int, $encKey: String,$fromNamespace: Int! ) {
restoreTenant(input: {restoreInput: { location: $location, backupId: $backupId,
incrementalFrom: $incrFrom, backupNum: $backupNum,
encryptionKeyFile: $encKey },fromNamespace:$fromNamespace}) {
code
message
}
}`

	request := GraphQLParams{
		Query: mutation,
		Variables: map[string]interface{}{
			"location":      backupPath,
			"backupId":      backupId,
			"backupNum":     backupNum,
			"encKey":        keyPath,
			"fromNamespace": fromNamespace,
			"incrFrom":      incrFrom,
		},
	}
	body, err := hc.RunGraphqlQuery(request, true)
	if err != nil {
		return err
	}

	// Only the code and message of the mutation payload are needed to judge the outcome.
	var payload struct {
		RestoreTenant struct {
			Code    string
			Message string
		}
	}
	if err := json.Unmarshal(body, &payload); err != nil {
		return errors.Wrap(err, "error unmarshalling restore response")
	}
	if result := payload.RestoreTenant; result.Code != "Success" {
		return fmt.Errorf("restoreTenant failed, response: %+v", result)
	}
	return nil
}

// WaitForRestore waits for restore to complete on all alphas
func WaitForRestore(c Cluster) error {
loop:
Expand Down Expand Up @@ -578,13 +630,19 @@ func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
if err != nil {
return nil, errors.Wrap(err, "error getting zero state http response")
}
defer func() {
if err := response.Body.Close(); err != nil {
log.Printf("[WARNING] error closing body: %v", err)
}
}()

body, err := io.ReadAll(response.Body)
if err != nil {
return nil, errors.New("error reading zero state response body")
return nil, errors.Wrapf(err, "error reading zero state response body")
}
var stateResponse LicenseResponse
if err := json.Unmarshal(body, &stateResponse); err != nil {
return nil, errors.New("error unmarshaling zero state response")
return nil, errors.Wrapf(err, "error unmarshaling zero state response")
}

return &stateResponse, nil
Expand Down
4 changes: 4 additions & 0 deletions dgraphtest/compose_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ func (c *ComposeCluster) AssignUids(client *dgo.Dgraph, num uint64) error {
// GetVersion reports the version of a compose cluster, which is always localVersion.
func (c *ComposeCluster) GetVersion() string {
	return localVersion
}

// GetEncKeyPath is not supported for compose clusters: it always returns an empty
// path together with errNotImplemented.
func (c *ComposeCluster) GetEncKeyPath() (string, error) {
	return "", errNotImplemented
}
4 changes: 4 additions & 0 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,7 @@ func (cc ClusterConfig) WithCustomPlugins() ClusterConfig {
cc.customPlugins = true
return cc
}

// GetClusterVolume returns the entry stored in the config's volumes under the key
// volume.
// NOTE(review): presumably volumes is a map, so an unknown key yields "" — confirm.
func (cc ClusterConfig) GetClusterVolume(volume string) string {
	entry := cc.volumes[volume]
	return entry
}
17 changes: 12 additions & 5 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,10 +606,6 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return err
}

var encPath string
if c.conf.encryption {
encPath = encKeyMountPath
}
hc, err = c.HTTPClient()
if err != nil {
return errors.Wrapf(err, "error creating HTTP client after upgrade")
Expand All @@ -619,7 +615,7 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return errors.Wrapf(err, "error during login after upgrade")
}
}
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1, encPath); err != nil {
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1); err != nil {
return errors.Wrap(err, "error doing restore during upgrade")
}
if err := WaitForRestore(c); err != nil {
Expand Down Expand Up @@ -850,6 +846,17 @@ func (c *LocalCluster) GetVersion() string {
return c.conf.version
}

// GetEncKeyPath reports where the encryption key file is mounted. When the cluster
// is configured without encryption it returns an empty string. The returned path is
// only meaningful inside the alpha container. The error is always nil.
func (c *LocalCluster) GetEncKeyPath() (string, error) {
	if !c.conf.encryption {
		return "", nil
	}
	return encKeyMountPath, nil
}

func (c *LocalCluster) printAllLogs() error {
log.Printf("[INFO] all logs for cluster with prefix [%v] are below!", c.conf.prefix)
var finalErr error
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ require (
golang.org/x/text v0.12.0
golang.org/x/tools v0.9.3
google.golang.org/grpc v1.56.2
google.golang.org/protobuf v1.31.0
gopkg.in/square/go-jose.v2 v2.3.1
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -143,6 +142,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc/examples v0.0.0-20230821201920-d51b3f41716d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.22.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ func newAdminResolverFactory() resolve.ResolverFactory {
"moveTablet": resolveMoveTablet,
"assign": resolveAssign,
"enterpriseLicense": resolveEnterpriseLicense,
"restoreTenant": resolveTenantRestore,
}

rf := resolverFactoryWithErrorMsg(errResolverNotFound).
Expand Down
18 changes: 18 additions & 0 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ const adminTypes = `
taskId: String
}
input RestoreTenantInput {
"""
restoreInput contains fields that are required for the restore operation,
i.e., location, backupId, and backupNum
"""
restoreInput: RestoreInput
"""
fromNamespace is the namespace of the tenant that needs to be restored into namespace 0 of the new cluster.
"""
fromNamespace: Int!
}
input RestoreInput {
"""
Expand Down Expand Up @@ -478,6 +491,11 @@ const adminMutations = `
"""
restore(input: RestoreInput!) : RestorePayload
"""
Restore given tenant into namespace 0 of the cluster
"""
restoreTenant(input: RestoreTenantInput!) : RestorePayload
"""
Login to Dgraph. Successful login results in a JWT that can be used in future requests.
If login is not successful an error is returned.
Expand Down
104 changes: 84 additions & 20 deletions graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,41 @@ type restoreInput struct {
VaultFormat string
}

// restoreTenantInput is the decoded input argument of the restoreTenant mutation.
type restoreTenantInput struct {
	// RestoreInput carries the regular restore parameters.
	RestoreInput restoreInput
	// FromNamespace is the namespace of the tenant to be restored.
	FromNamespace uint64
}

// resolveTenantRestore resolves the restoreTenant admin mutation. It decodes and
// validates the mutation input, builds a namespace-aware pb.RestoreRequest from it
// and hands that request to restore.
//
// When the input cannot be decoded or fails validation, an empty result carrying the
// error is returned together with false.
func resolveTenantRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
	input, err := getRestoreTenantInput(m)
	if err != nil {
		return resolve.EmptyResult(m, err), false
	}

	// Never log the input wholesale: it carries AccessKey, SecretKey and SessionToken,
	// which must not end up in the server log. Log the non-secret fields only.
	glog.Infof("Got restore tenant request, location: %v, id: %v, num: %v, incrementalFrom: %v,"+
		" isPartial: %v, fromNamespace: %v",
		input.RestoreInput.Location, input.RestoreInput.BackupId, input.RestoreInput.BackupNum,
		input.RestoreInput.IncrementalFrom, input.RestoreInput.IsPartial, input.FromNamespace)

	req := pb.RestoreRequest{
		Location:                input.RestoreInput.Location,
		BackupId:                input.RestoreInput.BackupId,
		BackupNum:               uint64(input.RestoreInput.BackupNum),
		IncrementalFrom:         uint64(input.RestoreInput.IncrementalFrom),
		IsPartial:               input.RestoreInput.IsPartial,
		EncryptionKeyFile:       input.RestoreInput.EncryptionKeyFile,
		AccessKey:               input.RestoreInput.AccessKey,
		SecretKey:               input.RestoreInput.SecretKey,
		SessionToken:            input.RestoreInput.SessionToken,
		Anonymous:               input.RestoreInput.Anonymous,
		VaultAddr:               input.RestoreInput.VaultAddr,
		VaultRoleidFile:         input.RestoreInput.VaultRoleIDFile,
		VaultSecretidFile:       input.RestoreInput.VaultSecretIDFile,
		VaultPath:               input.RestoreInput.VaultPath,
		VaultField:              input.RestoreInput.VaultField,
		VaultFormat:             input.RestoreInput.VaultFormat,
		FromNamespace:           input.FromNamespace,
		IsNamespaceAwareRestore: true,
	}
	return restore(ctx, m, req)
}

func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
input, err := getRestoreInput(m)
if err != nil {
Expand All @@ -59,27 +94,31 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
input.Location, input.BackupId, input.BackupNum, input.IncrementalFrom, input.IsPartial)

req := pb.RestoreRequest{
Location: input.Location,
BackupId: input.BackupId,
BackupNum: uint64(input.BackupNum),
IncrementalFrom: uint64(input.IncrementalFrom),
IsPartial: input.IsPartial,
EncryptionKeyFile: input.EncryptionKeyFile,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
VaultAddr: input.VaultAddr,
VaultRoleidFile: input.VaultRoleIDFile,
VaultSecretidFile: input.VaultSecretIDFile,
VaultPath: input.VaultPath,
VaultField: input.VaultField,
VaultFormat: input.VaultFormat,
Location: input.Location,
BackupId: input.BackupId,
BackupNum: uint64(input.BackupNum),
IncrementalFrom: uint64(input.IncrementalFrom),
IsPartial: input.IsPartial,
EncryptionKeyFile: input.EncryptionKeyFile,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
VaultAddr: input.VaultAddr,
VaultRoleidFile: input.VaultRoleIDFile,
VaultSecretidFile: input.VaultSecretIDFile,
VaultPath: input.VaultPath,
VaultField: input.VaultField,
VaultFormat: input.VaultFormat,
IsNamespaceAwareRestore: false,
}

return restore(ctx, m, req)
}

func restore(ctx context.Context, m schema.Mutation, req pb.RestoreRequest) (*resolve.Resolved, bool) {
wg := &sync.WaitGroup{}
err = worker.ProcessRestoreRequest(context.Background(), &req, wg)
if err != nil {
if err := worker.ProcessRestoreRequest(context.Background(), &req, wg); err != nil {
glog.Warningf("error processing restore request: %+v, err: %v", req, err)
return resolve.DataResult(
m,
Expand Down Expand Up @@ -116,10 +155,35 @@ func getRestoreInput(m schema.Mutation) (*restoreInput, error) {
if err := json.Unmarshal(inputByts, &input); err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}
if err := verifyRestoreInput(input); err != nil {
return nil, err
}

if input.BackupNum < 0 {
err := errors.Errorf("backupNum value should be equal or greater than zero")
return &input, nil
}

// getRestoreTenantInput extracts the input argument of the mutation m by
// round-tripping it through JSON into a restoreTenantInput, then validates the
// embedded restore input with verifyRestoreInput.
//
// Marshalling and unmarshalling failures are wrapped with "couldn't get input
// argument"; a validation failure is returned as is.
func getRestoreTenantInput(m schema.Mutation) (*restoreTenantInput, error) {
	raw, err := json.Marshal(m.ArgValue(schema.InputArgName))
	if err != nil {
		return nil, schema.GQLWrapf(err, "couldn't get input argument")
	}

	parsed := new(restoreTenantInput)
	if err = json.Unmarshal(raw, parsed); err != nil {
		return nil, schema.GQLWrapf(err, "couldn't get input argument")
	}
	if err = verifyRestoreInput(parsed.RestoreInput); err != nil {
		return nil, err
	}
	return parsed, nil
}

// verifyRestoreInput validates a decoded restore input. It rejects a negative
// BackupNum with a GraphQL-wrapped error and accepts everything else.
func verifyRestoreInput(input restoreInput) error {
	if input.BackupNum >= 0 {
		return nil
	}
	cause := errors.Errorf("backupNum value should be equal or greater than zero")
	return schema.GQLWrapf(cause, "couldn't get input argument")
}
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ message RestoreRequest {
uint64 backup_num = 16;
uint64 incremental_from = 17;
bool is_partial = 18;
uint64 fromNamespace = 19;
bool isNamespaceAwareRestore = 20;
}

message Proposal {
Expand Down
Loading

0 comments on commit e9f9b15

Please sign in to comment.