Skip to content

Commit

Permalink
add namespace aware restore functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaji-kharse committed Sep 1, 2023
1 parent 4c9448a commit 979facf
Show file tree
Hide file tree
Showing 14 changed files with 825 additions and 377 deletions.
51 changes: 38 additions & 13 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os/exec"
"strings"
Expand All @@ -43,6 +44,7 @@ type Cluster interface {
AlphasLogs() ([]string, error)
AssignUids(gc *dgo.Dgraph, num uint64) error
GetVersion() string
GetEncKeyPath() (string, error)
}

type GrpcClient struct {
Expand Down Expand Up @@ -374,8 +376,8 @@ func (hc *HTTPClient) WaitForTask(taskId string) error {
}

// Restore performs restore on Dgraph cluster from the given path to backup
func (hc *HTTPClient) Restore(c Cluster, backupPath string,
backupId string, incrFrom, backupNum int, encKey string) error {
func (hc *HTTPClient) Restore(c Cluster, backupPath string, backupId string,
incrFrom, backupNum int, oneNamespaceOnly bool, fromNamespace uint64) error {

// incremental restore was introduced in commit 8b3712e93ed2435bea52d957f7b69976c6cfc55b
incrRestoreSupported, err := IsHigherVersion(c.GetVersion(), "8b3712e93ed2435bea52d957f7b69976c6cfc55b")
Expand All @@ -386,24 +388,41 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
return errors.New("incremental restore is not supported by the cluster")
}

var varPart, queryPart string
isNamespaceAwareRestorSupported, err := IsHigherVersion(c.GetVersion(), "ba727644664138d3ddba96a21348612b2bec2d88")
if err != nil {
return errors.Wrapf(err, "error checking namespace aware restore support")
}
encKey, err := c.GetEncKeyPath()
if err != nil {
return errors.Wrapf(err, "error getting encryption key path")
}

var varPart, queryPart strings.Builder
if incrRestoreSupported {
varPart = "$incrFrom: Int, "
queryPart = " incrementalFrom: $incrFrom,"
varPart.WriteString("$incrFrom: Int, ")
queryPart.WriteString(" incrementalFrom: $incrFrom,")
}
if isNamespaceAwareRestorSupported {
varPart.WriteString("$oneNamespaceOnly: Boolean, $fromNamespace: UInt64 ")
queryPart.WriteString(" oneNamespaceOnly: $oneNamespaceOnly, fromNamespace: $fromNamespace,")
}
query := fmt.Sprintf(`mutation restore($location: String!, $backupId: String,
%v$backupNum: Int, $encKey: String) {
restore(input: {location: $location, backupId: $backupId,%v
backupNum: $backupNum, encryptionKeyFile: $encKey}) {
%v$backupNum: Int, $encKey: String, ) {
restore(input: {location: $location, backupId: $backupId,%v backupNum: $backupNum,
encryptionKeyFile: $encKey, }) {
code
message
}
}`, varPart, queryPart)
vars := map[string]interface{}{"location": backupPath, "backupId": backupId,
"backupNum": backupNum, "encKey": encKey}
}`, varPart.String(), queryPart.String())
vars := map[string]interface{}{"location": backupPath, "backupId": backupId, "backupNum": backupNum,
"encKey": encKey, "oneNamespaceOnly": oneNamespaceOnly, "fromNamespace": fromNamespace}
if incrRestoreSupported {
vars["incrFrom"] = incrFrom
}
if isNamespaceAwareRestorSupported {
vars["oneNamespaceOnly"] = oneNamespaceOnly
vars["fromNamespace"] = fromNamespace
}

params := GraphQLParams{
Query: query,
Expand Down Expand Up @@ -578,13 +597,19 @@ func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
if err != nil {
return nil, errors.Wrap(err, "error getting zero state http response")
}
defer func() {
if err := response.Body.Close(); err != nil {
log.Printf("[WARNING] error closing body: %v", err)
}
}()

body, err := io.ReadAll(response.Body)
if err != nil {
return nil, errors.New("error reading zero state response body")
return nil, errors.Wrapf(err, "error reading zero state response body")
}
var stateResponse LicenseResponse
if err := json.Unmarshal(body, &stateResponse); err != nil {
return nil, errors.New("error unmarshaling zero state response")
return nil, errors.Wrapf(err, "error unmarshaling zero state response")
}

return &stateResponse, nil
Expand Down
4 changes: 4 additions & 0 deletions dgraphtest/compose_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ func (c *ComposeCluster) AssignUids(client *dgo.Dgraph, num uint64) error {
func (c *ComposeCluster) GetVersion() string {
return localVersion
}

func (c *ComposeCluster) GetEncKeyPath() (string, error) {
return "", errNotImplemented
}
6 changes: 3 additions & 3 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type ClusterConfig struct {
aclAlg jwt.SigningMethod
encryption bool
version string
volumes map[string]string
Volumes map[string]string
refillInterval time.Duration
uidLease int
portOffset int // exposed port offset for grpc/http port for both alpha/zero
Expand All @@ -131,7 +131,7 @@ func NewClusterConfig() ClusterConfig {
replicas: 1,
verbosity: 2,
version: localVersion,
volumes: map[string]string{DefaultBackupDir: defaultBackupVol, DefaultExportDir: defaultExportVol},
Volumes: map[string]string{DefaultBackupDir: defaultBackupVol, DefaultExportDir: defaultExportVol},
refillInterval: 20 * time.Second,
uidLease: 50,
portOffset: -1,
Expand Down Expand Up @@ -192,7 +192,7 @@ func (cc ClusterConfig) WithVersion(version string) ClusterConfig {
// WithAlphaVolume allows creating a shared volumes across alphas with
// name volname and mount directory specified as dir inside the container
func (cc ClusterConfig) WithAlphaVolume(volname, dir string) ClusterConfig {
cc.volumes[dir] = volname
cc.Volumes[dir] = volname
return cc
}

Expand Down
2 changes: 1 addition & 1 deletion dgraphtest/dgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (a *alpha) mounts(c *LocalCluster) ([]mount.Mount, error) {
})
}

for dir, vol := range c.conf.volumes {
for dir, vol := range c.conf.Volumes {
mounts = append(mounts, mount.Mount{
Type: mount.TypeVolume,
Source: vol,
Expand Down
21 changes: 14 additions & 7 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (c *LocalCluster) init() error {
return errors.Wrap(err, "error while making binariesPath")
}

for _, vol := range c.conf.volumes {
for _, vol := range c.conf.Volumes {
if err := c.createVolume(vol); err != nil {
return err
}
Expand Down Expand Up @@ -299,7 +299,7 @@ func (c *LocalCluster) Cleanup(verbose bool) {

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
for _, vol := range c.conf.volumes {
for _, vol := range c.conf.Volumes {
if err := c.dcli.VolumeRemove(ctx, vol, true); err != nil {
log.Printf("[WARNING] error removing volume [%v]: %v", vol, err)
}
Expand Down Expand Up @@ -606,10 +606,6 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return err
}

var encPath string
if c.conf.encryption {
encPath = encKeyMountPath
}
hc, err = c.HTTPClient()
if err != nil {
return errors.Wrapf(err, "error creating HTTP client after upgrade")
Expand All @@ -619,7 +615,7 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return errors.Wrapf(err, "error during login after upgrade")
}
}
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1, encPath); err != nil {
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1, false, 0); err != nil {
return errors.Wrap(err, "error doing restore during upgrade")
}
if err := WaitForRestore(c); err != nil {
Expand Down Expand Up @@ -850,6 +846,17 @@ func (c *LocalCluster) GetVersion() string {
return c.conf.version
}

// GetEncKeyPath returns the path to the encryption key file when encryption is enabled.
// It returns an empty string otherwise. The path to the encryption file is valid only
// inside the alpha container.
func (c *LocalCluster) GetEncKeyPath() (string, error) {
if c.conf.encryption {
return encKeyMountPath, nil
}

return "", nil
}

func (c *LocalCluster) printAllLogs() error {
log.Printf("[INFO] all logs for cluster with prefix [%v] are below!", c.conf.prefix)
var finalErr error
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ require (
golang.org/x/text v0.12.0
golang.org/x/tools v0.9.3
google.golang.org/grpc v1.56.2
google.golang.org/protobuf v1.31.0
gopkg.in/square/go-jose.v2 v2.3.1
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -143,6 +142,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc/examples v0.0.0-20230821201920-d51b3f41716d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.22.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
13 changes: 13 additions & 0 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,19 @@ const adminTypes = `
Set to true to allow backing up to S3 or Minio bucket that requires no credentials.
"""
anonymous: Boolean
"""
oneNamespaceOnly should be set if only one namespace is to be restored from
the backup. The namespace value should be set into the fromNamespace parameter.
The given namespace is always restored into namespace 0 in the cluster.
"""
oneNamespaceOnly: Boolean
"""
fromNamespace is the namespace that will be restored into the namespace 0 of the cluster.
If oneNamespaceOnly is set, only the fromNamespace will be restored into the new cluster.
"""
fromNamespace: UInt64
}
type RestorePayload {
Expand Down
6 changes: 5 additions & 1 deletion graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type restoreInput struct {
VaultPath string
VaultField string
VaultFormat string
OneNamespaceOnly bool
FromNamespace uint64
}

func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
Expand Down Expand Up @@ -75,6 +77,8 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
VaultPath: input.VaultPath,
VaultField: input.VaultField,
VaultFormat: input.VaultFormat,
OneNamespaceOnly: input.OneNamespaceOnly,
FromNamespace: input.FromNamespace,
}

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -116,10 +120,10 @@ func getRestoreInput(m schema.Mutation) (*restoreInput, error) {
if err := json.Unmarshal(inputByts, &input); err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}

if input.BackupNum < 0 {
err := errors.Errorf("backupNum value should be equal or greater than zero")
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}

return &input, nil
}
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ message RestoreRequest {
uint64 backup_num = 16;
uint64 incremental_from = 17;
bool is_partial = 18;
bool one_namespace_only = 19;
uint64 from_namespace = 20;
}

message Proposal {
Expand Down
Loading

0 comments on commit 979facf

Please sign in to comment.