Skip to content

Commit

Permalink
[WIP] refactoring restoreDataRegular to allow restore data via ATTACH…
Browse files Browse the repository at this point in the history
… TABLE, fix #529
  • Loading branch information
Slach committed May 16, 2023
1 parent 17f9f5f commit 94116ec
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 162 deletions.
8 changes: 6 additions & 2 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# v2.3.0
IMPROVEMENTS
- allow backup and properly restore table with system.mutations is_done=0 status. fix [529](https://github.com/AlexAkulov/clickhouse-backup/issues/529)
- add `CLICKHOUSE_BACKUP_MUTATIONS` and `CLICKHOUSE_RESTORE_AS_ATTACH` config options to allow backing up and properly restoring tables with system.mutations is_done=0 status. fix [529](https://github.com/AlexAkulov/clickhouse-backup/issues/529)
- add test coverage reports for unit, testflows and integration tests, fix [644](https://github.com/AlexAkulov/clickhouse-backup/issues/644)

BUG FIXES
- apply SETTINGS check_table_dependencies=0 to DROP DATABASE statement, when pass `--ignore-dependencies` together with `--rm` in `restore` command, fix [651](https://github.com/AlexAkulov/clickhouse-backup/issues/651)
- apply `SETTINGS check_table_dependencies=0` to `DROP DATABASE` statement, when pass `--ignore-dependencies` together with `--rm` in `restore` command, fix [651](https://github.com/AlexAkulov/clickhouse-backup/issues/651)

# v2.2.6
BUG FIXES
- fix panic for resume upload after restart API server for boolean parameters, fix [653](https://github.com/AlexAkulov/clickhouse-backup/issues/653)

# v2.2.5
BUG FIXES
Expand Down
8 changes: 5 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ ARG CLICKHOUSE_VERSION=latest
ARG CLICKHOUSE_IMAGE=clickhouse/clickhouse-server
FROM ${CLICKHOUSE_IMAGE}:${CLICKHOUSE_VERSION} AS builder-base

USER root
RUN rm -fv /etc/apt/sources.list.d/clickhouse.list && \
apt-get install -y gnupg && apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 52B59B1571A79DBC054901C0F6BC817356A3D45E && \
( apt-get update || true ) && \
apt-get install -y gnupg ca-certificates && apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 52B59B1571A79DBC054901C0F6BC817356A3D45E && \
DISTRIB_CODENAME=$(cat /etc/lsb-release | grep DISTRIB_CODENAME | cut -d "=" -f 2) && \
echo ${DISTRIB_CODENAME} && \
echo "deb https://ppa.launchpadcontent.net/longsleep/golang-backports/ubuntu ${DISTRIB_CODENAME} main" > /etc/apt/sources.list.d/golang.list && \
echo "deb-src https://ppa.launchpadcontent.net/longsleep/golang-backports/ubuntu ${DISTRIB_CODENAME} main" >> /etc/apt/sources.list.d/golang.list && \
apt-get update && \
apt-get install -y golang-1.20 make git && \
( apt-get update || true ) && \
apt-get install -y --no-install-recommends golang-1.20 make git gcc && \
mkdir -p /root/go/

RUN ln -nsfv /usr/lib/go-1.20/bin/go /usr/bin/go
Expand Down
1 change: 1 addition & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ clickhouse:
check_replicas_before_attach: true # CLICKHOUSE_CHECK_REPLICAS_BEFORE_ATTACH, helps avoiding concurrent ATTACH PART execution when restoring ReplicatedMergeTree tables
use_embedded_backup_restore: false # CLICKHOUSE_USE_EMBEDDED_BACKUP_RESTORE, use BACKUP / RESTORE SQL statements instead of regular SQL queries to use features of modern ClickHouse server versions
backup_mutations: true # CLICKHOUSE_BACKUP_MUTATIONS, allow backup mutations from system.mutations WHERE is_done AND apply it during restore
restore_as_attach: true # CLICKHOUSE_RESTORE_AS_ATTACH, allow restoring tables which have an inconsistent data parts structure and mutations in progress
azblob:
endpoint_suffix: "core.windows.net" # AZBLOB_ENDPOINT_SUFFIX
account_name: "" # AZBLOB_ACCOUNT_NAME
Expand Down
187 changes: 116 additions & 71 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,58 +537,80 @@ func (b *Backuper) restoreDataEmbedded(backupName string, tablesForRestore ListO

func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, tablePattern string, tablesForRestore ListOfTables, diskMap map[string]string, disks []clickhouse.Disk, log *apexLog.Entry) error {
if len(b.cfg.General.RestoreDatabaseMapping) > 0 {
for sourceDb, targetDb := range b.cfg.General.RestoreDatabaseMapping {
if tablePattern != "" {
sourceDbRE := regexp.MustCompile(fmt.Sprintf("(^%s.*)|(,%s.*)", sourceDb, sourceDb))
if sourceDbRE.MatchString(tablePattern) {
matches := sourceDbRE.FindAllStringSubmatch(tablePattern, -1)
substitution := targetDb + ".*"
if strings.HasPrefix(matches[0][1], ",") {
substitution = "," + substitution
}
tablePattern = sourceDbRE.ReplaceAllString(tablePattern, substitution)
} else {
tablePattern += "," + targetDb + ".*"
}
} else {
tablePattern += targetDb + ".*"
}
}
tablePattern = b.changeTablePatternFromRestoreDatabaseMapping(tablePattern)
}
chTables, err := b.ch.GetTables(ctx, tablePattern)
if err != nil {
return err
}
for _, t := range tablesForRestore {
for disk := range t.Parts {
if _, diskExists := diskMap[disk]; !diskExists {
log.Warnf("table '%s.%s' require disk '%s' that not found in clickhouse table system.disks, you can add nonexistent disks to `disk_mapping` in `clickhouse` config section, data will restored to %s", t.Database, t.Table, disk, diskMap["default"])
found := false
for _, d := range disks {
if d.Name == disk {
found = true
break
}
}
if !found {
newDisk := clickhouse.Disk{
Name: disk,
Path: diskMap["default"],
Type: "local",
}
disks = append(disks, newDisk)
}
disks = b.adjustDisksFromTablesWithSystemDisks(tablesForRestore, diskMap, log, disks)
dstTablesMap := b.prepareDstTablesMap(chTables)

missingTables := b.checkMissingTables(tablesForRestore, chTables)
if len(missingTables) > 0 {
return fmt.Errorf("%s is not created. Restore schema first or create missing tables manually", strings.Join(missingTables, ", "))
}

for i, table := range tablesForRestore {
// need mapped database path and original table.Database for HardlinkBackupPartsToStorage
dstDatabase := table.Database
if len(b.cfg.General.RestoreDatabaseMapping) > 0 {
if targetDB, isMapped := b.cfg.General.RestoreDatabaseMapping[table.Database]; isMapped {
dstDatabase = targetDB
tablesForRestore[i].Database = targetDB
}
}
log := log.WithField("table", fmt.Sprintf("%s.%s", dstDatabase, table.Table))
dstTable, ok := dstTablesMap[metadata.TableTitle{
Database: dstDatabase,
Table: table.Table}]
if !ok {
return fmt.Errorf("can't find '%s.%s' in current system.tables", dstDatabase, table.Table)
}
// https://github.com/AlexAkulov/clickhouse-backup/issues/529
if b.cfg.ClickHouse.RestoreAsAttach {
if err = b.restoreDataRegularByAttach(ctx, backupName, table, disks, dstTable, log, tablesForRestore, i); err != nil {
return err
}
} else {
if err = b.restoreDataRegularByParts(backupName, table, disks, dstTable, log, tablesForRestore, i); err != nil {
return err
}
}
// https://github.com/AlexAkulov/clickhouse-backup/issues/529
for _, mutation := range table.Mutations {
if err := b.ch.ApplyMutation(ctx, tablesForRestore[i], mutation); err != nil {
log.Warnf("can't apply mutation %s for table `%s`.`%s` : %v", mutation.Command, tablesForRestore[i].Database, tablesForRestore[i].Table, err)
}
}
log.Info("done")
}
dstTablesMap := map[metadata.TableTitle]clickhouse.Table{}
for i, chTable := range chTables {
dstTablesMap[metadata.TableTitle{
Database: chTables[i].Database,
Table: chTables[i].Name,
}] = chTable
return nil
}

// restoreDataRegularByAttach restores one table by hardlinking its backup parts
// straight into the table's storage data paths and then issuing ATTACH TABLE
// (the `false` argument presumably selects the storage rather than the detached
// target — TODO confirm against HardlinkBackupPartsToStorage).
// tablesForRestore[i] carries the (possibly database-remapped) destination name.
func (b *Backuper) restoreDataRegularByAttach(ctx context.Context, backupName string, table metadata.TableMetadata, disks []clickhouse.Disk, dstTable clickhouse.Table, log *apexLog.Entry, tablesForRestore ListOfTables, i int) error {
	err := filesystemhelper.HardlinkBackupPartsToStorage(backupName, table, disks, dstTable.DataPaths, b.ch, false)
	if err != nil {
		return fmt.Errorf("can't copy data to storage '%s.%s': %v", table.Database, table.Table, err)
	}
	log.Debugf("data to 'storage' copied")
	err = b.ch.AttachTable(ctx, tablesForRestore[i])
	if err != nil {
		return fmt.Errorf("can't attach table '%s.%s': %v", tablesForRestore[i].Database, tablesForRestore[i].Table, err)
	}
	return nil
}

// restoreDataRegularByParts restores one table by hardlinking its backup parts
// into the table's detached/ directory (the `true` argument presumably selects
// the detached target — TODO confirm against HardlinkBackupPartsToStorage) and
// then attaching the data parts one by one.
// tablesForRestore[i] carries the (possibly database-remapped) destination name.
func (b *Backuper) restoreDataRegularByParts(backupName string, table metadata.TableMetadata, disks []clickhouse.Disk, dstTable clickhouse.Table, log *apexLog.Entry, tablesForRestore ListOfTables, i int) error {
	if err := filesystemhelper.HardlinkBackupPartsToStorage(backupName, table, disks, dstTable.DataPaths, b.ch, true); err != nil {
		// fix typo in original error message: "datached" -> "detached"
		return fmt.Errorf("can't copy data to detached '%s.%s': %v", table.Database, table.Table, err)
	}
	log.Debugf("data to 'detached' copied")
	if err := b.ch.AttachDataParts(tablesForRestore[i], disks); err != nil {
		return fmt.Errorf("can't attach data parts for table '%s.%s': %v", tablesForRestore[i].Database, tablesForRestore[i].Table, err)
	}
	return nil
}

func (b *Backuper) checkMissingTables(tablesForRestore ListOfTables, chTables []clickhouse.Table) []string {
var missingTables []string
for _, table := range tablesForRestore {
dstDatabase := table.Database
Expand All @@ -608,42 +630,65 @@ func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, ta
missingTables = append(missingTables, fmt.Sprintf("'%s.%s'", dstDatabase, table.Table))
}
}
if len(missingTables) > 0 {
return fmt.Errorf("%s is not created. Restore schema first or create missing tables manually", strings.Join(missingTables, ", "))
return missingTables
}

// prepareDstTablesMap indexes the tables currently present in ClickHouse by
// (database, name) so restore can look up each destination table in O(1).
func (b *Backuper) prepareDstTablesMap(chTables []clickhouse.Table) map[metadata.TableTitle]clickhouse.Table {
	// Pre-size the map; also use the range value consistently instead of
	// mixing it with index access as the original did.
	dstTablesMap := make(map[metadata.TableTitle]clickhouse.Table, len(chTables))
	for _, chTable := range chTables {
		dstTablesMap[metadata.TableTitle{
			Database: chTable.Database,
			Table:    chTable.Name,
		}] = chTable
	}
	return dstTablesMap
}

for i, table := range tablesForRestore {
// need mapped database path and original table.Database for CopyDataToDetached
dstDatabase := table.Database
if len(b.cfg.General.RestoreDatabaseMapping) > 0 {
if targetDB, isMapped := b.cfg.General.RestoreDatabaseMapping[table.Database]; isMapped {
dstDatabase = targetDB
tablesForRestore[i].Database = targetDB
func (b *Backuper) adjustDisksFromTablesWithSystemDisks(tablesForRestore ListOfTables, diskMap map[string]string, log *apexLog.Entry, disks []clickhouse.Disk) []clickhouse.Disk {
for _, t := range tablesForRestore {
for disk := range t.Parts {
if _, diskExists := diskMap[disk]; !diskExists {
log.Warnf("table '%s.%s' require disk '%s' that not found in clickhouse table system.disks, you can add nonexistent disks to `disk_mapping` in `clickhouse` config section, data will restored to %s", t.Database, t.Table, disk, diskMap["default"])
found := false
for _, d := range disks {
if d.Name == disk {
found = true
break
}
}
if !found {
newDisk := clickhouse.Disk{
Name: disk,
Path: diskMap["default"],
Type: "local",
}
disks = append(disks, newDisk)
}
}
}
log := log.WithField("table", fmt.Sprintf("%s.%s", dstDatabase, table.Table))
dstTable, ok := dstTablesMap[metadata.TableTitle{
Database: dstDatabase,
Table: table.Table}]
if !ok {
return fmt.Errorf("can't find '%s.%s' in current system.tables", dstDatabase, table.Table)
}
if err := filesystemhelper.CopyDataToDetached(backupName, table, disks, dstTable.DataPaths, b.ch); err != nil {
return fmt.Errorf("can't restore '%s.%s': %v", table.Database, table.Table, err)
}
log.Debugf("copied data to 'detached'")
if err := b.ch.AttachPartitions(tablesForRestore[i], disks); err != nil {
return fmt.Errorf("can't attach partitions for table '%s.%s': %v", tablesForRestore[i].Database, tablesForRestore[i].Table, err)
}
// https://github.com/AlexAkulov/clickhouse-backup/issues/529
for _, mutation := range table.Mutations {
if err := b.ch.ApplyMutation(ctx, tablesForRestore[i], mutation); err != nil {
log.Warnf("can't apply mutation %s for table `%s`.`%s` : %v", mutation.Command, tablesForRestore[i].Database, tablesForRestore[i].Table, err)
}
return disks
}

// changeTablePatternFromRestoreDatabaseMapping rewrites tablePattern so that
// every source database from restore_database_mapping is replaced by its target
// database. If the pattern does not mention a source database, the target's
// "<db>.*" is appended instead; an empty pattern becomes "<db>.*".
// NOTE(review): sourceDb is interpolated into a regexp unescaped — database
// names containing regexp metacharacters would misbehave; consider
// regexp.QuoteMeta. Map iteration order is random, but each sourceDb is
// independent so the result does not depend on order.
func (b *Backuper) changeTablePatternFromRestoreDatabaseMapping(tablePattern string) string {
	for sourceDb, targetDb := range b.cfg.General.RestoreDatabaseMapping {
		if tablePattern != "" {
			sourceDbRE := regexp.MustCompile(fmt.Sprintf("(^%s.*)|(,%s.*)", sourceDb, sourceDb))
			if sourceDbRE.MatchString(tablePattern) {
				matches := sourceDbRE.FindAllStringSubmatch(tablePattern, -1)
				substitution := targetDb + ".*"
				// Preserve the leading comma when the match was not at the start.
				if strings.HasPrefix(matches[0][1], ",") {
					substitution = "," + substitution
				}
				tablePattern = sourceDbRE.ReplaceAllString(tablePattern, substitution)
			} else {
				tablePattern += "," + targetDb + ".*"
			}
		} else {
			tablePattern += targetDb + ".*"
		}
	}
	return tablePattern
}

func (b *Backuper) restoreEmbedded(backupName string, restoreOnlySchema bool, tablesForRestore ListOfTables, partitions []string) error {
Expand Down
Loading

0 comments on commit 94116ec

Please sign in to comment.