Skip to content

Commit

Permalink
Added Disk Mapping feature (fix #162)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexAkulov committed Apr 26, 2021
1 parent 5b1f9bf commit 99a2144
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 359 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type ClickHouseConfig struct {
Password string `yaml:"password" envconfig:"CLICKHOUSE_PASSWORD"`
Host string `yaml:"host" envconfig:"CLICKHOUSE_HOST"`
Port uint `yaml:"port" envconfig:"CLICKHOUSE_PORT"`
DiskMapping map[string]string `yaml:"disk_mapping" envconfig:"CLICKHOUSE_DISKS"`
DiskMapping map[string]string `yaml:"disk_mapping" envconfig:"CLICKHOUSE_DISK_MAPPING"`
SkipTables []string `yaml:"skip_tables" envconfig:"CLICKHOUSE_SKIP_TABLES"`
Timeout string `yaml:"timeout" envconfig:"CLICKHOUSE_TIMEOUT"`
FreezeByPart bool `yaml:"freeze_by_part" envconfig:"CLICKHOUSE_FREEZE_BY_PART"`
Expand Down
8 changes: 4 additions & 4 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func Download(cfg *config.Config, backupName string, tablePattern string, schema
if !schemaOnly {
for _, t := range tableMetadataForDownload {
for disk := range t.Parts {
if _, err := getPathByDiskName(cfg.ClickHouse.DiskMapping, diskMap, disk); err != nil {
return err
if _, ok := diskMap[disk]; !ok {
return fmt.Errorf("table '%s.%s' require disk '%s' that not found in clickhouse, you can add nonexistent disks to disk_mapping config", t.Database, t.Table, disk)
}
}
}
Expand All @@ -160,7 +160,7 @@ func Download(cfg *config.Config, backupName string, tablePattern string, schema
log := log.WithField("table", fmt.Sprintf("%s.%s", tableMetadata.Database, tableMetadata.Table))
if remoteBackup.DataFormat != "directory" {
for disk := range tableMetadata.Files {
diskPath, _ := getPathByDiskName(cfg.ClickHouse.DiskMapping, diskMap, disk)
diskPath := diskMap[disk]
tableLocalDir := path.Join(diskPath, "backup", backupName, "shadow", uuid, disk)
for _, archiveFile := range tableMetadata.Files[disk] {
tableRemoteFile := path.Join(backupName, "shadow", clickhouse.TablePathEncode(tableMetadata.Database), clickhouse.TablePathEncode(tableMetadata.Table), archiveFile)
Expand All @@ -173,7 +173,7 @@ func Download(cfg *config.Config, backupName string, tablePattern string, schema
}
for disk := range tableMetadata.Parts {
tableRemotePath := path.Join(backupName, "shadow", uuid, disk)
diskPath, _ := getPathByDiskName(cfg.ClickHouse.DiskMapping, diskMap, disk)
diskPath := diskMap[disk]
tableLocalDir := path.Join(diskPath, "backup", backupName, "shadow", uuid, disk)
if err := bd.DownloadPath(0, tableRemotePath, tableLocalDir); err != nil {
return err
Expand Down
176 changes: 88 additions & 88 deletions pkg/backup/flashback.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,93 +60,93 @@ func CopyPartHashes(cfg config.Config, tablePattern string, backupName string) e
}

// Flashback - restore tables matched by tablePattern from backupName by
// restoring only the modified parts.
func Flashback(cfg *config.Config, backupName string, tablePattern string) error {
	// The schema-only / data-only split sketched in an earlier (now removed)
	// draft was never implemented; FlashBackData performs the whole
	// partition-level diff restore, so delegate to it directly.
	return FlashBackData(cfg, backupName, tablePattern)
}
// func Flashback(cfg *config.Config, backupName string, tablePattern string) error {
// /*if schemaOnly || (schemaOnly == dataOnly) {
// err := restoreSchema(config, backupName, tablePattern)
// if err != nil {
// return err
// }
// }
// if dataOnly || (schemaOnly == dataOnly) {
// err := RestoreData(config, backupName, tablePattern)
// if err != nil {
// return err
// }
// }*/

// err := FlashBackData(cfg, backupName, tablePattern)
// if err != nil {
// return err
// }
// return nil
// }

// FlashBackData - restore data for tables matched by tablePattern from backupName
// by attaching only the partitions that differ between the backup and the live
// ClickHouse tables. Tables present in the backup but missing in ClickHouse get
// their schema restored first, then the whole routine is re-run.
func FlashBackData(cfg *config.Config, backupName string, tablePattern string) error {
	if backupName == "" {
		PrintLocalBackups(cfg, "all")
		return fmt.Errorf("select backup for restore")
	}

	ch := &clickhouse.ClickHouse{
		Config: &cfg.ClickHouse,
	}
	if err := ch.Connect(); err != nil {
		return fmt.Errorf("can't connect to clickhouse: %v", err)
	}
	defer ch.Close()

	allBackupTables, err := ch.GetBackupTablesLegacy(backupName)
	if err != nil {
		return err
	}

	restoreTables := parseTablePatternForRestoreData(allBackupTables, tablePattern)

	liveTables, err := ch.GetTables()
	if err != nil {
		return err
	}
	if len(restoreTables) == 0 {
		return fmt.Errorf("backup doesn't have tables to restore")
	}

	missingTables := []string{}

	for _, restoreTable := range restoreTables {
		found := false
		for _, liveTable := range liveTables {
			if (restoreTable.Database == liveTable.Database) && (restoreTable.Table == liveTable.Name) {
				found = true
				break
			}
		}
		if !found {
			missingTables = append(missingTables, fmt.Sprintf("%s.%s", restoreTable.Database, restoreTable.Table))

			for _, newtable := range missingTables {
				if err := RestoreSchema(cfg, backupName, newtable, true); err != nil {
					return err
				}
			}

			// Re-run with the schema now in place. BUG FIX: the original
			// discarded the recursive call's error and returned nil.
			return FlashBackData(cfg, backupName, tablePattern)
		}
	}

	// BUG FIX: the delta-computation error was previously discarded
	// ("diffInfos, _ := ..."), silently restoring nothing on failure.
	diffInfos, err := ch.ComputePartitionsDelta(restoreTables, liveTables)
	if err != nil {
		return err
	}
	for _, tableDiff := range diffInfos {
		if err := ch.CopyDataDiff(tableDiff); err != nil {
			return fmt.Errorf("can't restore '%s.%s': %v", tableDiff.BTable.Database, tableDiff.BTable.Table, err)
		}

		if err := ch.ApplyPartitionsChanges(tableDiff); err != nil {
			return fmt.Errorf("can't attach partitions for table '%s.%s': %v", tableDiff.BTable.Database, tableDiff.BTable.Table, err)
		}
	}
	return nil
}
// func FlashBackData(cfg *config.Config, backupName string, tablePattern string) error {
// if backupName == "" {
// PrintLocalBackups(cfg, "all")
// return fmt.Errorf("select backup for restore")
// }

// ch := &clickhouse.ClickHouse{
// Config: &cfg.ClickHouse,
// }
// if err := ch.Connect(); err != nil {
// return fmt.Errorf("can't connect to clickhouse: %v", err)
// }
// defer ch.Close()

// allBackupTables, err := ch.GetBackupTablesLegacy(backupName)
// if err != nil {
// return err
// }

// restoreTables := parseTablePatternForRestoreData(allBackupTables, tablePattern)

// liveTables, err := ch.GetTables()

// if err != nil {
// return err
// }
// if len(restoreTables) == 0 {
// return fmt.Errorf("backup doesn't have tables to restore")
// }

// missingTables := []string{}

// for _, restoreTable := range restoreTables {
// found := false
// for _, liveTable := range liveTables {
// if (restoreTable.Database == liveTable.Database) && (restoreTable.Table == liveTable.Name) {
// found = true
// break
// }
// }
// if !found {
// missingTables = append(missingTables, fmt.Sprintf("%s.%s", restoreTable.Database, restoreTable.Table))

// for _, newtable := range missingTables {
// //log.Printf("newtable=%s", newtable)
// if err := RestoreSchema(cfg, backupName, newtable, true); err != nil {
// return err
// }
// }

// FlashBackData(cfg, backupName, tablePattern)
// return nil
// }
// }

// diffInfos, _ := ch.ComputePartitionsDelta(restoreTables, liveTables)
// for _, tableDiff := range diffInfos {

// if err := ch.CopyDataDiff(tableDiff); err != nil {
// return fmt.Errorf("can't restore '%s.%s': %v", tableDiff.BTable.Database, tableDiff.BTable.Table, err)
// }

// if err := ch.ApplyPartitionsChanges(tableDiff); err != nil {
// return fmt.Errorf("can't attach partitions for table '%s.%s': %v", tableDiff.BTable.Database, tableDiff.BTable.Table, err)
// }
// }
// return nil
// }
12 changes: 11 additions & 1 deletion pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,17 @@ func RestoreData(cfg *config.Config, backupName string, tablePattern string) err
if err != nil {
return err
}
// TODO: check that every disk referenced by the backup exists in ClickHouse
diskMap := map[string]string{}
for _, disk := range disks {
diskMap[disk.Name] = disk.Path
}
for _, t := range tablesForRestore {
for disk := range t.Parts {
if _, ok := diskMap[disk]; !ok {
return fmt.Errorf("table '%s.%s' require disk '%s' that not found in clickhouse, you can add nonexistent disks to disk_mapping config", t.Database, t.Table, disk)
}
}
}
dstTablesMap := map[metadata.TableTitle]clickhouse.Table{}
for i := range chTables {
dstTablesMap[metadata.TableTitle{
Expand Down
11 changes: 9 additions & 2 deletions pkg/backup/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,18 @@ func Upload(cfg *config.Config, backupName string, tablePattern string, diffFrom
if err := bd.Connect(); err != nil {
return fmt.Errorf("can't connect to %s: %v", bd.Kind(), err)
}

if _, err := getLocalBackup(cfg, backupName); err != nil {
return fmt.Errorf("can't upload: %v", err)
}
remoteBackups, err := bd.BackupList()
if err != nil {
return err
}
for i := range remoteBackups {
if backupName == remoteBackups[i].BackupName {
return fmt.Errorf("'%s' already exists on remote", backupName)
}
}
defaulDataPath, err := ch.GetDefaultPath()
if err != nil {
return ErrUnknownClickhouseDataPath
Expand All @@ -65,7 +73,6 @@ func Upload(cfg *config.Config, backupName string, tablePattern string, diffFrom
for _, disk := range disks {
diskMap[disk.Name] = disk.Path
}
// TODO: check whether the backup already exists on the remote storage
metadataPath := path.Join(defaulDataPath, "backup", backupName, "metadata")
if _, err := os.Stat(metadataPath); err != nil {
return err
Expand Down
11 changes: 0 additions & 11 deletions pkg/backup/utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package backup

import (
"fmt"
"io"
"os"
"path"
Expand Down Expand Up @@ -51,13 +50,3 @@ func copyFile(srcFile string, dstFile string) error {
_, err = io.Copy(dst, src)
return err
}

// getPathByDiskName resolves the filesystem path for a named disk.
// The user-supplied disk_mapping config takes precedence over the set of
// disks reported by ClickHouse itself; an error is returned when the disk
// is known to neither source.
func getPathByDiskName(diskMapConfig map[string]string, chDiskMap map[string]string, diskName string) (string, error) {
	// Probe the sources in priority order: explicit config first, then
	// what ClickHouse reports.
	for _, source := range []map[string]string{diskMapConfig, chDiskMap} {
		if diskPath, found := source[diskName]; found {
			return diskPath, nil
		}
	}
	return "", fmt.Errorf("disk '%s' not found in clickhouse, you can add nonexistent disks to disk_mapping config", diskName)
}
34 changes: 28 additions & 6 deletions pkg/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,40 @@ func (ch *ClickHouse) Connect() error {

// GetDisks - return data from system.disks table, with the user-supplied
// disk_mapping config overlaid: mapped names override the path reported by
// ClickHouse, and names ClickHouse does not know are appended as "local" disks.
func (ch *ClickHouse) GetDisks() ([]Disk, error) {
	// Serve the cached result when one is present.
	// NOTE(review): ch.disks is read here but never populated in this view —
	// confirm it is assigned elsewhere, otherwise the cache is dead code.
	if ch.disks != nil {
		return ch.disks, nil
	}
	version, err := ch.GetVersion()
	if err != nil {
		return nil, err
	}
	var disks []Disk
	// system.disks only exists on newer servers; older ones expose the single
	// data path via system settings.
	// BUG FIX: the rendered block interleaved a stale "return
	// ch.getDataPathFromSystemSettings()" here and an unreachable "return
	// ch.getDataPathFromSystemDisks()" before the final return (leftover
	// removed diff lines); both are dropped to restore valid Go.
	if version < 19015000 {
		disks, err = ch.getDataPathFromSystemSettings()
	} else {
		disks, err = ch.getDataPathFromSystemDisks()
	}
	if err != nil {
		return nil, err
	}
	if len(ch.Config.DiskMapping) == 0 {
		return disks, nil
	}
	// Work on a copy of the mapping so entries can be consumed as matched.
	dm := map[string]string{}
	for k, v := range ch.Config.DiskMapping {
		dm[k] = v
	}
	// Override paths of disks ClickHouse already knows about.
	for i := range disks {
		if p, ok := dm[disks[i].Name]; ok {
			disks[i].Path = p
			delete(dm, disks[i].Name)
		}
	}
	// Whatever remains names disks unknown to ClickHouse; expose them as
	// local disks so backups referencing them can still be restored.
	for k, v := range dm {
		disks = append(disks, Disk{
			Name: k,
			Path: v,
			Type: "local",
		})
	}
	return disks, nil
}

func (ch *ClickHouse) GetDefaultPath() (string, error) {
Expand Down Expand Up @@ -531,7 +554,6 @@ func (ch *ClickHouse) GetPartitions(database, table string) (map[string][]metada
parts[i] = metadata.Part{
Partition: partitions[i].Partition,
Name: partitions[i].Name,
Path: partitions[i].Path, // TODO: ???
HashOfAllFiles: partitions[i].HashOfAllFiles,
HashOfUncompressedFiles: partitions[i].HashOfUncompressedFiles,
UncompressedHashOfCompressedFiles: partitions[i].UncompressedHashOfCompressedFiles,
Expand Down
Loading

0 comments on commit 99a2144

Please sign in to comment.