Skip to content

Commit

Permalink
[Kopia wrappers - A] Refactoring Kopia package (#1546)
Browse files Browse the repository at this point in the history
* Add command logsafe

* Add utility functions

* Created sub-packages for repository and snapshot wrapper methods

* Rename OpenRepository, DeleteSnapshot and reportSnapshotStatus methods

* Convert constants and functions from local to global

* Fix constant in pkg/kopia/snapshot/stream.go

* Fix const description in pkg/kopia/utils.go

* Add Client keyword in constants and rename kankopia import alias in pkg/stream/stream.go

* Resolve cyclic import statements

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
shlokc9 and mergify[bot] authored Aug 2, 2022
1 parent 6e8b881 commit 9198399
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 62 deletions.
6 changes: 3 additions & 3 deletions pkg/kando/location_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/spf13/cobra"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/kopia"
"github.com/kanisterio/kanister/pkg/kopia/snapshot"
"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
)
Expand Down Expand Up @@ -52,7 +52,7 @@ func runLocationDelete(cmd *cobra.Command) error {
if snapJSON == "" {
return errors.New("kopia snapshot information is required to delete data using kopia")
}
kopiaSnap, err := kopia.UnmarshalKopiaSnapshot(snapJSON)
kopiaSnap, err := snapshot.UnmarshalKopiaSnapshot(snapJSON)
if err != nil {
return err
}
Expand All @@ -66,7 +66,7 @@ func runLocationDelete(cmd *cobra.Command) error {

// kopiaLocationDelete deletes the kopia snapshot with given backupID
func kopiaLocationDelete(ctx context.Context, backupID, path, password string) error {
return kopia.DeleteSnapshot(ctx, backupID, path, password)
return snapshot.Delete(ctx, backupID, path, password)
}

func locationDelete(ctx context.Context, p *param.Profile, path string) error {
Expand Down
10 changes: 6 additions & 4 deletions pkg/kando/location_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/kopia"
"github.com/kanisterio/kanister/pkg/kopia/repository"
"github.com/kanisterio/kanister/pkg/kopia/snapshot"
"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
)
Expand Down Expand Up @@ -62,7 +64,7 @@ func runLocationPull(cmd *cobra.Command, args []string) error {
if snapJSON == "" {
return errors.New("kopia snapshot information is required to pull data using kopia")
}
kopiaSnap, err := kopia.UnmarshalKopiaSnapshot(snapJSON)
kopiaSnap, err := snapshot.UnmarshalKopiaSnapshot(snapJSON)
if err != nil {
return err
}
Expand Down Expand Up @@ -93,17 +95,17 @@ func locationPull(ctx context.Context, p *param.Profile, path string, target io.
func kopiaLocationPull(ctx context.Context, backupID, path, targetPath, password string) error {
switch targetPath {
case usePipeParam:
return kopia.Read(ctx, os.Stdout, backupID, path, password)
return snapshot.Read(ctx, os.Stdout, backupID, path, password)
default:
return kopia.ReadFile(ctx, backupID, targetPath, password)
return snapshot.ReadFile(ctx, backupID, targetPath, password)
}
}

// connectToKopiaServer connects to the kopia server with given creds
func connectToKopiaServer(ctx context.Context, kp *param.Profile) error {
contentCacheSize := kopia.GetDataStoreGeneralContentCacheSize(kp.Credential.KopiaServerSecret.ConnectOptions)
metadataCacheSize := kopia.GetDataStoreGeneralMetadataCacheSize(kp.Credential.KopiaServerSecret.ConnectOptions)
return kopia.ConnectToAPIServer(
return repository.ConnectToAPIServer(
ctx,
kp.Credential.KopiaServerSecret.Cert,
kp.Credential.KopiaServerSecret.Password,
Expand Down
10 changes: 5 additions & 5 deletions pkg/kando/location_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/spf13/cobra"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/kopia"
"github.com/kanisterio/kanister/pkg/kopia/snapshot"
"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/output"
"github.com/kanisterio/kanister/pkg/param"
Expand Down Expand Up @@ -95,19 +95,19 @@ func locationPush(ctx context.Context, p *param.Profile, path string, source io.

// kopiaLocationPush pushes the data from the source using a kopia snapshot
func kopiaLocationPush(ctx context.Context, path, outputName, sourcePath, password string) error {
var snapInfo *kopia.SnapshotInfo
var snapInfo *snapshot.SnapshotInfo
var err error
switch sourcePath {
case usePipeParam:
snapInfo, err = kopia.Write(ctx, os.Stdin, path, password)
snapInfo, err = snapshot.Write(ctx, os.Stdin, path, password)
default:
snapInfo, err = kopia.WriteFile(ctx, path, sourcePath, password)
snapInfo, err = snapshot.WriteFile(ctx, path, sourcePath, password)
}
if err != nil {
return errors.Wrap(err, "Failed to push data using kopia")
}

snapInfoJSON, err := kopia.MarshalKopiaSnapshot(snapInfo)
snapInfoJSON, err := snapshot.MarshalKopiaSnapshot(snapInfo)
if err != nil {
return err
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/kopia/connect.go → pkg/kopia/repository/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kopia
package repository

import (
"context"
Expand All @@ -26,6 +26,7 @@ import (
"github.com/kopia/kopia/repo/content"
"github.com/pkg/errors"

"github.com/kanisterio/kanister/pkg/kopia"
"github.com/kanisterio/kanister/pkg/log"
"github.com/kanisterio/kanister/pkg/poll"
)
Expand All @@ -47,7 +48,7 @@ func ConnectToAPIServer(
metadataCacheMB int,
) error {
// Extra fingerprint from the TLS Certificate secret
fingerprint, err := ExtractFingerprintFromCertificate(tlsCert)
fingerprint, err := kopia.ExtractFingerprintFromCertificate(tlsCert)
if err != nil {
return errors.Wrap(err, "Failed to extract fingerprint from Kopia API Server Certificate Secret")
}
Expand All @@ -61,7 +62,7 @@ func ConnectToAPIServer(

opts := &repo.ConnectOptions{
CachingOptions: content.CachingOptions{
CacheDirectory: defaultCacheDirectory,
CacheDirectory: kopia.DefaultClientCacheDirectory,
MaxCacheSizeBytes: int64(contentCacheMB << 20),
MaxMetadataCacheSizeBytes: int64(metadataCacheMB << 20),
MaxListCacheDuration: content.DurationSeconds(defaultConnectMaxListCacheDuration.Seconds()),
Expand All @@ -79,7 +80,7 @@ func ConnectToAPIServer(
Max: 15 * time.Second,
}, func(c context.Context) (bool, error) {
// TODO(@pavan): Modify this to use custom config file path, if required
err := repo.ConnectAPIServer(ctx, defaultConfigFilePath, serverInfo, userPassphrase, opts)
err := repo.ConnectAPIServer(ctx, kopia.DefaultClientConfigFilePath, serverInfo, userPassphrase, opts)
switch {
case isGetRepoParametersError(err):
log.Debug().WithError(err).Print("Connecting to the Kopia API Server")
Expand All @@ -92,10 +93,10 @@ func ConnectToAPIServer(
return errors.Wrap(err, "Failed connecting to the Kopia API Server")
}

// OpenRepository connects to the kopia repository based on the config stored in the config file
// Open connects to the kopia repository based on the config stored in the config file
// NOTE: This assumes that `kopia repository connect` has been already run on the machine
// OR the above Connect function has been used to connect to the repository server
func OpenRepository(ctx context.Context, configFile, password, purpose string) (repo.RepositoryWriter, error) {
func Open(ctx context.Context, configFile, password, purpose string) (repo.RepositoryWriter, error) {
repoConfig := repositoryConfigFileName(configFile)
if _, err := os.Stat(repoConfig); os.IsNotExist(err) {
return nil, errors.New("Failed find kopia configuration file")
Expand Down
38 changes: 32 additions & 6 deletions pkg/kopia/snapshot.go → pkg/kopia/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kopia
package snapshot

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
Expand All @@ -27,6 +28,9 @@ import (
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"

"github.com/kanisterio/kanister/pkg/kopia"
"github.com/kanisterio/kanister/pkg/kopia/repository"
)

// SnapshotSource creates and uploads a kopia snapshot to the given repository
Expand Down Expand Up @@ -76,10 +80,10 @@ func SnapshotSource(
return "", 0, errors.Wrap(ferr, "Failed to flush kopia repository")
}

return reportSnapshotStatus(ctx, snapshotStartTime, manifest)
return reportStatus(ctx, snapshotStartTime, manifest)
}

func reportSnapshotStatus(ctx context.Context, snapshotStartTime time.Time, manifest *snapshot.Manifest) (string, int64, error) {
func reportStatus(ctx context.Context, snapshotStartTime time.Time, manifest *snapshot.Manifest) (string, int64, error) {
manifestID := manifest.ID
snapSize := manifest.Stats.TotalFileSize

Expand All @@ -99,9 +103,9 @@ func reportSnapshotStatus(ctx context.Context, snapshotStartTime time.Time, mani
return string(manifestID), snapSize, nil
}

// DeleteSnapshot deletes Kopia snapshot with given manifest ID
func DeleteSnapshot(ctx context.Context, backupID, path, password string) error {
rep, err := OpenRepository(ctx, defaultConfigFilePath, password, pullRepoPurpose)
// Delete deletes Kopia snapshot with given manifest ID
func Delete(ctx context.Context, backupID, path, password string) error {
rep, err := repository.Open(ctx, kopia.DefaultClientConfigFilePath, password, pullRepoPurpose)
if err != nil {
return errors.Wrap(err, "Failed to open kopia repository")
}
Expand Down Expand Up @@ -145,3 +149,25 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour

return result, nil
}

// MarshalKopiaSnapshot encodes kopia SnapshotInfo struct into a string
func MarshalKopiaSnapshot(snapInfo *SnapshotInfo) (string, error) {
if err := snapInfo.Validate(); err != nil {
return "", err
}
snap, err := json.Marshal(snapInfo)
if err != nil {
return "", errors.Wrap(err, "failed to marshal kopia snapshot information")
}

return string(snap), nil
}

// UnmarshalKopiaSnapshot decodes a kopia snapshot JSON string into SnapshotInfo struct
func UnmarshalKopiaSnapshot(snapInfoJSON string) (SnapshotInfo, error) {
snap := SnapshotInfo{}
if err := json.Unmarshal([]byte(snapInfoJSON), &snap); err != nil {
return snap, errors.Wrap(err, "failed to unmarshal kopia snapshot information")
}
return snap, snap.Validate()
}
15 changes: 9 additions & 6 deletions pkg/kopia/stream.go → pkg/kopia/snapshot/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kopia
package snapshot

import (
"context"
Expand All @@ -28,6 +28,9 @@ import (
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"

"github.com/kanisterio/kanister/pkg/kopia"
"github.com/kanisterio/kanister/pkg/kopia/repository"
)

const (
Expand Down Expand Up @@ -67,7 +70,7 @@ func (si *SnapshotInfo) Validate() error {
// A virtual directory tree rooted at filepath.Dir(path) is created with
// a kopia streaming file with filepath.Base(path) as name
func Write(ctx context.Context, source io.Reader, path, password string) (*SnapshotInfo, error) {
rep, err := OpenRepository(ctx, defaultConfigFilePath, password, pushRepoPurpose)
rep, err := repository.Open(ctx, kopia.DefaultClientConfigFilePath, password, pushRepoPurpose)
if err != nil {
return nil, errors.Wrap(err, "Failed to open kopia repository")
}
Expand Down Expand Up @@ -113,7 +116,7 @@ func Write(ctx context.Context, source io.Reader, path, password string) (*Snaps

// WriteFile creates a kopia snapshot from the given source file
func WriteFile(ctx context.Context, path, sourcePath, password string) (*SnapshotInfo, error) {
rep, err := OpenRepository(ctx, defaultConfigFilePath, password, pushRepoPurpose)
rep, err := repository.Open(ctx, kopia.DefaultClientConfigFilePath, password, pushRepoPurpose)
if err != nil {
return nil, errors.Wrap(err, "Failed to open kopia repository")
}
Expand Down Expand Up @@ -181,13 +184,13 @@ func resolveSymlink(path string) (string, error) {

// Read reads a kopia snapshot with the given ID and copies it to the given target
func Read(ctx context.Context, target io.Writer, backupID, path, password string) error {
rep, err := OpenRepository(ctx, defaultConfigFilePath, password, pullRepoPurpose)
rep, err := repository.Open(ctx, kopia.DefaultClientConfigFilePath, password, pullRepoPurpose)
if err != nil {
return errors.Wrap(err, "Failed to open kopia repository")
}

// Get the kopia object ID belonging to the streaming file
oid, err := getStreamingFileObjectIDFromSnapshot(ctx, rep, path, backupID)
oid, err := kopia.GetStreamingFileObjectIDFromSnapshot(ctx, rep, path, backupID)
if err != nil {
return err
}
Expand All @@ -207,7 +210,7 @@ func Read(ctx context.Context, target io.Writer, backupID, path, password string

// ReadFile restores a kopia snapshot with the given ID to the given target
func ReadFile(ctx context.Context, backupID, target, password string) error {
rep, err := OpenRepository(ctx, defaultConfigFilePath, password, pullRepoPurpose)
rep, err := repository.Open(ctx, kopia.DefaultClientConfigFilePath, password, pullRepoPurpose)
if err != nil {
return errors.Wrap(err, "Failed to open kopia repository")
}
Expand Down
34 changes: 6 additions & 28 deletions pkg/kopia/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ import (
)

const (
// defaultConfigFilePath is the file which contains kopia repo config
defaultConfigFilePath = "/tmp/kopia-repository.config"
// DefaultClientConfigFilePath is the file which contains kopia repo config
DefaultClientConfigFilePath = "/tmp/kopia-repository.config"

// defaultCacheDirectory is the directory where kopia content cache is created
defaultCacheDirectory = "/tmp/kopia-cache"
// DefaultClientCacheDirectory is the directory where kopia content cache is created
DefaultClientCacheDirectory = "/tmp/kopia-cache"

// defaultDataStoreGeneralContentCacheSizeMB is the default content cache size for general command workloads
defaultDataStoreGeneralContentCacheSizeMB = 0
Expand Down Expand Up @@ -140,8 +140,8 @@ func ExtractFingerprintFromCertificate(cert string) (string, error) {
return fingerprint, nil
}

// getStreamingFileObjectIDFromSnapshot returns the kopia object ID of the fs.StreamingFile object from the repository
func getStreamingFileObjectIDFromSnapshot(ctx context.Context, rep repo.Repository, path, backupID string) (object.ID, error) {
// GetStreamingFileObjectIDFromSnapshot returns the kopia object ID of the fs.StreamingFile object from the repository
func GetStreamingFileObjectIDFromSnapshot(ctx context.Context, rep repo.Repository, path, backupID string) (object.ID, error) {
// Example: if the path from the blueprint is `/mysql-backups/1/2/mysqldump.sql`, the given backupID
// belongs to the root entry `/mysql-backups/1/2` with `mysqldump.sql` as a nested entry.
// The goal here is to find the nested entry and extract the object ID
Expand Down Expand Up @@ -191,25 +191,3 @@ func GetDataStoreGeneralMetadataCacheSize(opt map[string]int) int {
}
return defaultDataStoreGeneralMetadataCacheSizeMB
}

// MarshalKopiaSnapshot encodes kopia SnapshotInfo struct into a string
func MarshalKopiaSnapshot(snapInfo *SnapshotInfo) (string, error) {
if err := snapInfo.Validate(); err != nil {
return "", err
}
snap, err := json.Marshal(snapInfo)
if err != nil {
return "", errors.Wrap(err, "failed to marshal kopia snapshot information")
}

return string(snap), nil
}

// UnmarshalKopiaSnapshot decodes a kopia snapshot JSON string into SnapshotInfo struct
func UnmarshalKopiaSnapshot(snapInfoJSON string) (SnapshotInfo, error) {
snap := SnapshotInfo{}
if err := json.Unmarshal([]byte(snapInfoJSON), &snap); err != nil {
return snap, errors.Wrap(err, "failed to unmarshal kopia snapshot information")
}
return snap, snap.Validate()
}
Loading

0 comments on commit 9198399

Please sign in to comment.