Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(restore): add support for namespace aware restore #8968

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os/exec"
"strings"
Expand All @@ -43,6 +44,7 @@ type Cluster interface {
AlphasLogs() ([]string, error)
AssignUids(gc *dgo.Dgraph, num uint64) error
GetVersion() string
GetEncKeyPath() (string, error)
}

type GrpcClient struct {
Expand Down Expand Up @@ -375,7 +377,7 @@ func (hc *HTTPClient) WaitForTask(taskId string) error {

// Restore performs restore on Dgraph cluster from the given path to backup
func (hc *HTTPClient) Restore(c Cluster, backupPath string,
shivaji-kharse marked this conversation as resolved.
Show resolved Hide resolved
backupId string, incrFrom, backupNum int, encKey string) error {
backupId string, incrFrom, backupNum int) error {

// incremental restore was introduced in commit 8b3712e93ed2435bea52d957f7b69976c6cfc55b
incrRestoreSupported, err := IsHigherVersion(c.GetVersion(), "8b3712e93ed2435bea52d957f7b69976c6cfc55b")
Expand All @@ -386,6 +388,11 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
return errors.New("incremental restore is not supported by the cluster")
}

encKey, err := c.GetEncKeyPath()
if err != nil {
return errors.Wrapf(err, "error getting encryption key path")
}

var varPart, queryPart string
if incrRestoreSupported {
varPart = "$incrFrom: Int, "
Expand Down Expand Up @@ -429,6 +436,51 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
return nil
}

// RestoreTenant restore specific namespace
func (hc *HTTPClient) RestoreTenant(c Cluster, backupPath string, backupId string,
incrFrom, backupNum int, fromNamespace uint64) error {

encKey, err := c.GetEncKeyPath()
if err != nil {
return errors.Wrapf(err, "error getting encryption key path")
}

query := `mutation restoreTenant( $location: String!, $backupId: String,
$incrFrom: Int, $backupNum: Int, $encKey: String,$fromNamespace: Int! ) {
restoreTenant(input: {restoreInput: { location: $location, backupId: $backupId,
incrementalFrom: $incrFrom, backupNum: $backupNum,
encryptionKeyFile: $encKey },fromNamespace:$fromNamespace}) {
code
message
}
}`
vars := map[string]interface{}{"location": backupPath, "backupId": backupId, "backupNum": backupNum,
"encKey": encKey, "fromNamespace": fromNamespace, "incrFrom": incrFrom}

params := GraphQLParams{
Query: query,
Variables: vars,
}
resp, err := hc.RunGraphqlQuery(params, true)
if err != nil {
return err
}

var restoreResp struct {
RestoreTenant struct {
Code string
Message string
}
}
if err := json.Unmarshal(resp, &restoreResp); err != nil {
return errors.Wrap(err, "error unmarshalling restore response")
}
if restoreResp.RestoreTenant.Code != "Success" {
return fmt.Errorf("restoreTenant failed, response: %+v", restoreResp.RestoreTenant)
}
return nil
}

// WaitForRestore waits for restore to complete on all alphas
func WaitForRestore(c Cluster) error {
loop:
Expand Down Expand Up @@ -578,13 +630,19 @@ func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
if err != nil {
return nil, errors.Wrap(err, "error getting zero state http response")
}
defer func() {
if err := response.Body.Close(); err != nil {
log.Printf("[WARNING] error closing body: %v", err)
}
}()

body, err := io.ReadAll(response.Body)
if err != nil {
return nil, errors.New("error reading zero state response body")
return nil, errors.Wrapf(err, "error reading zero state response body")
}
var stateResponse LicenseResponse
if err := json.Unmarshal(body, &stateResponse); err != nil {
return nil, errors.New("error unmarshaling zero state response")
return nil, errors.Wrapf(err, "error unmarshaling zero state response")
}

return &stateResponse, nil
Expand Down
4 changes: 4 additions & 0 deletions dgraphtest/compose_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ func (c *ComposeCluster) AssignUids(client *dgo.Dgraph, num uint64) error {
func (c *ComposeCluster) GetVersion() string {
return localVersion
}

func (c *ComposeCluster) GetEncKeyPath() (string, error) {
return "", errNotImplemented
}
4 changes: 4 additions & 0 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,7 @@ func (cc ClusterConfig) WithCustomPlugins() ClusterConfig {
cc.customPlugins = true
return cc
}

func (cc ClusterConfig) GetClusterVolume(volume string) string {
return cc.volumes[volume]
}
17 changes: 12 additions & 5 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,10 +606,6 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return err
}

var encPath string
if c.conf.encryption {
encPath = encKeyMountPath
}
hc, err = c.HTTPClient()
if err != nil {
return errors.Wrapf(err, "error creating HTTP client after upgrade")
Expand All @@ -619,7 +615,7 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return errors.Wrapf(err, "error during login after upgrade")
}
}
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1, encPath); err != nil {
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1); err != nil {
return errors.Wrap(err, "error doing restore during upgrade")
}
if err := WaitForRestore(c); err != nil {
Expand Down Expand Up @@ -850,6 +846,17 @@ func (c *LocalCluster) GetVersion() string {
return c.conf.version
}

// GetEncKeyPath returns the path to the encryption key file when encryption is enabled.
// It returns an empty string otherwise. The path to the encryption file is valid only
// inside the alpha container.
func (c *LocalCluster) GetEncKeyPath() (string, error) {
if c.conf.encryption {
return encKeyMountPath, nil
}

return "", nil
}

func (c *LocalCluster) printAllLogs() error {
log.Printf("[INFO] all logs for cluster with prefix [%v] are below!", c.conf.prefix)
var finalErr error
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ require (
golang.org/x/text v0.12.0
golang.org/x/tools v0.9.3
google.golang.org/grpc v1.56.2
google.golang.org/protobuf v1.31.0
gopkg.in/square/go-jose.v2 v2.3.1
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -143,6 +142,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc/examples v0.0.0-20230821201920-d51b3f41716d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.22.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ func newAdminResolverFactory() resolve.ResolverFactory {
"moveTablet": resolveMoveTablet,
"assign": resolveAssign,
"enterpriseLicense": resolveEnterpriseLicense,
"restoreTenant": resolveTenantRestore,
}

rf := resolverFactoryWithErrorMsg(errResolverNotFound).
Expand Down
18 changes: 18 additions & 0 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ const adminTypes = `
taskId: String
}

input RestoreTenantInput {
"""
restoreInput contains fields that are required for the restore operation,
i.e., location, backupId, and backupNum
"""
restoreInput: RestoreInput
shivaji-kharse marked this conversation as resolved.
Show resolved Hide resolved
shivaji-kharse marked this conversation as resolved.
Show resolved Hide resolved

"""
fromNamespace is the namespace of the tenant that needs to be restored into namespace 0 of the new cluster.
"""
fromNamespace: Int!
}

input RestoreInput {

"""
Expand Down Expand Up @@ -478,6 +491,11 @@ const adminMutations = `
"""
restore(input: RestoreInput!) : RestorePayload

"""
Restore given tenant into namespace 0 of the cluster
"""
restoreTenant(input: RestoreTenantInput!) : RestorePayload

"""
Login to Dgraph. Successful login results in a JWT that can be used in future requests.
If login is not successful an error is returned.
Expand Down
104 changes: 84 additions & 20 deletions graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,41 @@ type restoreInput struct {
VaultFormat string
}

type restoreTenantInput struct {
RestoreInput restoreInput
FromNamespace uint64
}

func resolveTenantRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
input, err := getRestoreTenantInput(m)
if err != nil {
return resolve.EmptyResult(m, err), false
}
glog.Infof("Got restore request: %+v", input)

req := pb.RestoreRequest{
Location: input.RestoreInput.Location,
BackupId: input.RestoreInput.BackupId,
BackupNum: uint64(input.RestoreInput.BackupNum),
IncrementalFrom: uint64(input.RestoreInput.IncrementalFrom),
IsPartial: input.RestoreInput.IsPartial,
EncryptionKeyFile: input.RestoreInput.EncryptionKeyFile,
AccessKey: input.RestoreInput.AccessKey,
SecretKey: input.RestoreInput.SecretKey,
SessionToken: input.RestoreInput.SessionToken,
Anonymous: input.RestoreInput.Anonymous,
VaultAddr: input.RestoreInput.VaultAddr,
VaultRoleidFile: input.RestoreInput.VaultRoleIDFile,
VaultSecretidFile: input.RestoreInput.VaultSecretIDFile,
VaultPath: input.RestoreInput.VaultPath,
VaultField: input.RestoreInput.VaultField,
VaultFormat: input.RestoreInput.VaultFormat,
FromNamespace: input.FromNamespace,
IsNamespaceAwareRestore: true,
}
return restore(ctx, m, req)
}

func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
input, err := getRestoreInput(m)
if err != nil {
Expand All @@ -59,27 +94,31 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
input.Location, input.BackupId, input.BackupNum, input.IncrementalFrom, input.IsPartial)

req := pb.RestoreRequest{
Location: input.Location,
BackupId: input.BackupId,
BackupNum: uint64(input.BackupNum),
IncrementalFrom: uint64(input.IncrementalFrom),
IsPartial: input.IsPartial,
EncryptionKeyFile: input.EncryptionKeyFile,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
VaultAddr: input.VaultAddr,
VaultRoleidFile: input.VaultRoleIDFile,
VaultSecretidFile: input.VaultSecretIDFile,
VaultPath: input.VaultPath,
VaultField: input.VaultField,
VaultFormat: input.VaultFormat,
Location: input.Location,
BackupId: input.BackupId,
BackupNum: uint64(input.BackupNum),
IncrementalFrom: uint64(input.IncrementalFrom),
IsPartial: input.IsPartial,
EncryptionKeyFile: input.EncryptionKeyFile,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
VaultAddr: input.VaultAddr,
VaultRoleidFile: input.VaultRoleIDFile,
VaultSecretidFile: input.VaultSecretIDFile,
VaultPath: input.VaultPath,
VaultField: input.VaultField,
VaultFormat: input.VaultFormat,
IsNamespaceAwareRestore: false,
}

return restore(ctx, m, req)
}

func restore(ctx context.Context, m schema.Mutation, req pb.RestoreRequest) (*resolve.Resolved, bool) {
wg := &sync.WaitGroup{}
err = worker.ProcessRestoreRequest(context.Background(), &req, wg)
if err != nil {
if err := worker.ProcessRestoreRequest(context.Background(), &req, wg); err != nil {
glog.Warningf("error processing restore request: %+v, err: %v", req, err)
return resolve.DataResult(
m,
Expand Down Expand Up @@ -116,10 +155,35 @@ func getRestoreInput(m schema.Mutation) (*restoreInput, error) {
if err := json.Unmarshal(inputByts, &input); err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}
if err := verifyRestoreInput(input); err != nil {
return nil, err
}

if input.BackupNum < 0 {
err := errors.Errorf("backupNum value should be equal or greater than zero")
return &input, nil
}

func getRestoreTenantInput(m schema.Mutation) (*restoreTenantInput, error) {
inputArg := m.ArgValue(schema.InputArgName)
inputByts, err := json.Marshal(inputArg)
if err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}

var input restoreTenantInput
if err := json.Unmarshal(inputByts, &input); err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}
if err := verifyRestoreInput(input.RestoreInput); err != nil {
return nil, err
}

return &input, nil
}

func verifyRestoreInput(input restoreInput) error {
shivaji-kharse marked this conversation as resolved.
Show resolved Hide resolved
if input.BackupNum < 0 {
err := errors.Errorf("backupNum value should be equal or greater than zero")
return schema.GQLWrapf(err, "couldn't get input argument")
}
return nil
}
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ message RestoreRequest {
uint64 backup_num = 16;
uint64 incremental_from = 17;
bool is_partial = 18;
uint64 fromNamespace = 19;
bool isNamespaceAwareRestore = 20;
}

message Proposal {
Expand Down
Loading