diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_data_structure.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_data_structure.go index 65aba09b69..cba0cbc9a7 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_data_structure.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_data_structure.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "strconv" "sync" @@ -119,6 +120,12 @@ func (task *RedisDataStructure) Run() (err error) { return err } + // 检查文件是否齐全 + err = task.CheckFileList() + if err != nil { + return err + } + task.runtime.Logger.Info(task.params.RecoveryTimePoint) // 构造任务初始化 recoverTasks := make([]*datastructure.TendisInsRecoverTask, 0, len(task.params.SourcePorts)) @@ -223,3 +230,29 @@ func (task *RedisDataStructure) CheckRecoverDir() (err error) { task.runtime.Logger.Info("CheckRecoverDir:%s success", task.RecoverDir) return nil } + +// CheckFileList 检查需要的全备和增备是否都拉取齐全,避免出现缺失一个无法重试的情况 +func (task *RedisDataStructure) CheckFileList() error { + var filePath, msg string + fileAllOk := true + for _, file := range task.params.FullFileList { + filePath = filepath.Join(task.RecoverDir, file.FileName) + if _, err := os.Stat(filePath); os.IsNotExist(err) { + fileAllOk = false + msg = fmt.Sprintf("全备文件:%s 不存在", filePath) + task.runtime.Logger.Info(msg) + } + } + for _, file := range task.params.BinlogFileList { + filePath = filepath.Join(task.RecoverDir, file.FileName) + if _, err := os.Stat(filePath); os.IsNotExist(err) { + fileAllOk = false + msg = fmt.Sprintf("增备文件:%s 不存在", filePath) + task.runtime.Logger.Info(msg) + } + } + if !fileAllOk { + return fmt.Errorf("有文件未拉取!!!请检查后重试") + } + return nil +} diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/datastructure/fullback.go b/dbm-services/redis/db-tools/dbactuator/pkg/datastructure/fullback.go index fb3c9e073e..901856a7a4 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/datastructure/fullback.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/datastructure/fullback.go @@ -1264,7 +1264,7 @@ func (task *TendisInsRecoverTask) GetRocksdbBackupMeta(rocksIdx int) (meta *Rock // (其实 参数backupDir 和 full.SaveLocalDir full.SaveRemoteDir是同一个文件夹, 但是是不同视角) // NOCC:golint/fnsize(设计如此) func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string, - dstTendisPort int, dstTendisPasswd string) error { + dstTendisPort int, dstTendisPasswd string) { redisAddr := fmt.Sprintf("%s:%s", dstTendisIP, strconv.Itoa(dstTendisPort)) msg := fmt.Sprintf("master:%s开始导入全备", redisAddr) @@ -1272,14 +1272,15 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string, //再次探测tendisplus连接性 redisCli, err := myredis.NewRedisClient(redisAddr, dstTendisPasswd, 0, consts.TendisTypeTendisSSDInsance) if err != nil { - return err + full.Err = err + return } defer redisCli.Close() var infoRet map[string]string infoRet, full.Err = redisCli.Info("server") if full.Err != nil { - return full.Err + return } masterVersion := infoRet["redis_version"] mylog.Logger.Info("Get redis_version success redis_version:%s", masterVersion) @@ -1287,7 +1288,7 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string, var ssdDataDir string ssdDataDir, full.Err = redisCli.GetDir() if full.Err != nil { - return full.Err + return } // "Get SsdDataDir success SsdDataDir:/data1/redis/15000/data" mylog.Logger.Info("Get SsdDataDir success SsdDataDir:%s", ssdDataDir) @@ -1295,30 +1296,31 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string, //1、shutdown err = redisCli.Shutdown() if err != nil { - return err + full.Err = err + return } mylog.Logger.Info("master(%s) shutdown success", redisAddr) if full.LocalFullBackupDir == "" { - err = fmt.Errorf("全备文件夹不存在,请检查:%s", full.LocalFullBackupDir) - mylog.Logger.Error(err.Error()) - return err + full.Err = fmt.Errorf("全备文件夹不存在,请检查:%s", full.LocalFullBackupDir) + mylog.Logger.Error(full.Err.Error()) + return } mylog.Logger.Info("full.LocalFullBackupDir:%s", full.LocalFullBackupDir) fullFilePath := fmt.Sprintf("%v/%v", full.SaveDir, full.LocalFullBackupDir) if _, err := os.Stat(fullFilePath); os.IsNotExist(err) { - err = fmt.Errorf("全备文件夹不存在,请检查:%s", full.LocalFullBackupDir) - mylog.Logger.Error(err.Error()) - return err + full.Err = fmt.Errorf("全备文件夹不存在,请检查:%s", full.LocalFullBackupDir) + mylog.Logger.Error(full.Err.Error()) + return } DepsDir := "/usr/local/redis/bin/deps" if _, err := os.Stat(DepsDir); os.IsNotExist(err) { - err = fmt.Errorf("%s:不存在,请检查:%s", DepsDir, DepsDir) - mylog.Logger.Error(err.Error()) - return err + full.Err = fmt.Errorf("%s:不存在,请检查:%s", DepsDir, DepsDir) + mylog.Logger.Error(full.Err.Error()) + return } nowtime := time.Now().Local().Format(consts.FilenameTimeLayout) @@ -1339,11 +1341,11 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string, } else { full.Err = fmt.Errorf("unsupported tendis version:%s,exit.", masterVersion) mylog.Logger.Error(full.Err.Error()) - return full.Err + return } restoreTool := full.getRestoreTool(dstTendisIP, masterVersion, dstTendisPort) if full.Err != nil { - return full.Err + return } restoreCmd := fmt.Sprintf(` @@ -1358,7 +1360,7 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string, mylog.Logger.Info("restore command result:" + ret) if full.Err != nil { mylog.Logger.Error(fmt.Sprintf("恢复全备失败,详情:%v", err)) - return full.Err + return } if util.FileExists(rockdbDir) { @@ -1366,14 +1368,15 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string, } else { full.Err = fmt.Errorf("restore command failed, %s not generated", rockdbDir) mylog.Logger.Error(full.Err.Error()) - return full.Err + return } util.LocalDirChownMysql(rockdbDir) ret01 := strings.TrimSpace(ret) if strings.Contains(ret01, "ERR:") == true { - mylog.Logger.Error(fmt.Sprintf("恢复全备失败,err:%v,cmd:%s", err, restoreCmd)) - return full.Err + full.Err = fmt.Errorf("恢复全备失败,err:%v,cmd:%s", err, restoreCmd) + mylog.Logger.Error(full.Err.Error()) + return } // 4、拉起节点 @@ -1381,23 +1384,32 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string, _, full.Err = util.RunLocalCmd("su", []string{consts.MysqlAaccount, "-c", startScript + " " + strconv.Itoa( dstTendisPort)}, "", nil, 30*time.Second) if full.Err != nil { - return full.Err + return } mylog.Logger.Info(fmt.Sprintf("su %s -c \"%s\"", consts.MysqlAaccount, startScript+" "+strconv.Itoa(dstTendisPort))) - time.Sleep(2 * time.Second) + time.Sleep(30 * time.Second) //再次探测tendisplus连接性->拉起是否成功 - redisCli, err = myredis.NewRedisClient(redisAddr, dstTendisPasswd, 0, consts.TendisTypeTendisplusInsance) + // 实例比较大时,可能拉起的比较慢,需要多次探测。 如果10分钟都还没拉起,那就认为失败了 + for i := 1; i < 20; i++ { + redisCli, err = myredis.NewRedisClient(redisAddr, dstTendisPasswd, 0, consts.TendisTypeTendisplusInsance) + if err != nil { + mylog.Logger.Warn("第%d/20探测%d实例是否存活失败...sleep 30s后进行下一次探测", i, dstTendisPort) + time.Sleep(30 * time.Second) + } + } if err != nil { - return err + full.Err = fmt.Errorf("%d全备恢复后,实例拉起失败", dstTendisPort) + return } + defer redisCli.Close() msg = fmt.Sprintf("%s:%d 恢复全备成功", dstTendisIP, dstTendisPort) mylog.Logger.Info(msg) - return nil + return } func (full *TendisFullBackPull) getRestoreTool(dstTendisIP, masterVersion string, dstTendisPort int) (restoreTool string) { diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/datastructure/tendisSSD_incrback.go b/dbm-services/redis/db-tools/dbactuator/pkg/datastructure/tendisSSD_incrback.go index 6bfddd7eec..e94b64e15a 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/datastructure/tendisSSD_incrback.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/datastructure/tendisSSD_incrback.go @@ -239,7 +239,8 @@ func (incr *TredisRocksDBIncrBack) GetTredisIncrbacks(binlogFileList []FileDetai // 过滤节点维度的文件,这里比较重要,因为flow传下来的是这台机器涉及到的所有节点信息, // 这里是针对单节点的,所以需要过滤出来,这个值返回给前置函数 - if strings.Contains(back01.BackupFile, incr.FileName) && (back01.NodeIP == incr.SourceIP) { + // 不能用strings.contains来判断因为backupfile可能存在xxx-20250210130000.log.zst的情况,直接用port段匹配 + if match01[1] == incr.FileName && (back01.NodeIP == incr.SourceIP) { mylog.Logger.Info("back01.BackupFile:%s,incr.FileName:%s,back01.NodeIP:%s,incr.SourceIP:%s", back01.BackupFile, incr.FileName, back01.NodeIP, incr.SourceIP) backs = append(backs, back01)