Skip to content

Commit

Permalink
fix(redis): redis集群回档bug修复 #9254
Browse files Browse the repository at this point in the history
  • Loading branch information
OMG-By authored and iSecloud committed Feb 12, 2025
1 parent db283c8 commit 5fc343f
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"

"strconv"
"sync"
Expand Down Expand Up @@ -119,6 +120,12 @@ func (task *RedisDataStructure) Run() (err error) {
return err
}

// 检查文件是否齐全
err = task.CheckFileList()
if err != nil {
return err
}

task.runtime.Logger.Info(task.params.RecoveryTimePoint)
// 构造任务初始化
recoverTasks := make([]*datastructure.TendisInsRecoverTask, 0, len(task.params.SourcePorts))
Expand Down Expand Up @@ -223,3 +230,29 @@ func (task *RedisDataStructure) CheckRecoverDir() (err error) {
task.runtime.Logger.Info("CheckRecoverDir:%s success", task.RecoverDir)
return nil
}

// CheckFileList verifies that every full-backup file and every incremental
// (binlog) backup file named in the task parameters can be stat'ed under
// task.RecoverDir. Checking everything up front avoids starting a recovery
// with a file missing, which could not simply be retried afterwards.
//
// Every problematic file is logged (not just the first one), so that a single
// run reports the complete set of files that still has to be pulled.
//
// It returns nil when all files are accessible, and a non-nil error when at
// least one file is missing or cannot be inspected.
func (task *RedisDataStructure) CheckFileList() error {
	allPresent := true

	// verify stats one backup file and records/logs a failure. label is the
	// human-readable file category used as the log prefix.
	verify := func(label, fileName string) {
		path := filepath.Join(task.RecoverDir, fileName)
		_, err := os.Stat(path)
		switch {
		case err == nil:
			return
		case os.IsNotExist(err):
			// Pass the values as arguments rather than as a pre-built format
			// string, so a '%' inside a file name cannot corrupt the log line.
			task.runtime.Logger.Info("%s:%s 不存在", label, path)
		default:
			// Any other Stat failure (permission denied, I/O error, ...) also
			// means the file cannot be used for recovery; previously such
			// errors were silently treated as "file is present".
			task.runtime.Logger.Info("%s:%s 检查失败:%v", label, path, err)
		}
		allPresent = false
	}

	for _, file := range task.params.FullFileList {
		verify("全备文件", file.FileName)
	}
	for _, file := range task.params.BinlogFileList {
		verify("增备文件", file.FileName)
	}

	if !allPresent {
		return fmt.Errorf("有文件未拉取!!!请检查后重试")
	}
	return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -1264,61 +1264,63 @@ func (task *TendisInsRecoverTask) GetRocksdbBackupMeta(rocksIdx int) (meta *Rock
// (其实 参数backupDir 和 full.SaveLocalDir full.SaveRemoteDir是同一个文件夹, 但是是不同视角)
// NOCC:golint/fnsize(设计如此)
func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string,
dstTendisPort int, dstTendisPasswd string) error {
dstTendisPort int, dstTendisPasswd string) {

redisAddr := fmt.Sprintf("%s:%s", dstTendisIP, strconv.Itoa(dstTendisPort))
msg := fmt.Sprintf("master:%s开始导入全备", redisAddr)
mylog.Logger.Info(msg)
//再次探测tendisplus连接性
redisCli, err := myredis.NewRedisClient(redisAddr, dstTendisPasswd, 0, consts.TendisTypeTendisSSDInsance)
if err != nil {
return err
full.Err = err
return
}
defer redisCli.Close()

var infoRet map[string]string
infoRet, full.Err = redisCli.Info("server")
if full.Err != nil {
return full.Err
return
}
masterVersion := infoRet["redis_version"]
mylog.Logger.Info("Get redis_version success redis_version:%s", masterVersion)

var ssdDataDir string
ssdDataDir, full.Err = redisCli.GetDir()
if full.Err != nil {
return full.Err
return
}
// "Get SsdDataDir success SsdDataDir:/data1/redis/15000/data"
mylog.Logger.Info("Get SsdDataDir success SsdDataDir:%s", ssdDataDir)

//1、shutdown
err = redisCli.Shutdown()
if err != nil {
return err
full.Err = err
return
}
mylog.Logger.Info("master(%s) shutdown success", redisAddr)

if full.LocalFullBackupDir == "" {
err = fmt.Errorf("全备文件夹不存在,请检查:%s", full.LocalFullBackupDir)
mylog.Logger.Error(err.Error())
return err
full.Err = fmt.Errorf("全备文件夹不存在,请检查:%s", full.LocalFullBackupDir)
mylog.Logger.Error(full.Err.Error())
return

}
mylog.Logger.Info("full.LocalFullBackupDir:%s", full.LocalFullBackupDir)

fullFilePath := fmt.Sprintf("%v/%v", full.SaveDir, full.LocalFullBackupDir)
if _, err := os.Stat(fullFilePath); os.IsNotExist(err) {
err = fmt.Errorf("全备文件夹不存在,请检查:%s", full.LocalFullBackupDir)
mylog.Logger.Error(err.Error())
return err
full.Err = fmt.Errorf("全备文件夹不存在,请检查:%s", full.LocalFullBackupDir)
mylog.Logger.Error(full.Err.Error())
return
}

DepsDir := "/usr/local/redis/bin/deps"
if _, err := os.Stat(DepsDir); os.IsNotExist(err) {
err = fmt.Errorf("%s:不存在,请检查:%s", DepsDir, DepsDir)
mylog.Logger.Error(err.Error())
return err
full.Err = fmt.Errorf("%s:不存在,请检查:%s", DepsDir, DepsDir)
mylog.Logger.Error(full.Err.Error())
return
}

nowtime := time.Now().Local().Format(consts.FilenameTimeLayout)
Expand All @@ -1339,11 +1341,11 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string,
} else {
full.Err = fmt.Errorf("unsupported tendis version:%s,exit.", masterVersion)
mylog.Logger.Error(full.Err.Error())
return full.Err
return
}
restoreTool := full.getRestoreTool(dstTendisIP, masterVersion, dstTendisPort)
if full.Err != nil {
return full.Err
return
}

restoreCmd := fmt.Sprintf(`
Expand All @@ -1358,46 +1360,56 @@ func (full *TendisFullBackPull) RecoverTredisFromRocksdb(dstTendisIP string,
mylog.Logger.Info("restore command result:" + ret)
if full.Err != nil {
mylog.Logger.Error(fmt.Sprintf("恢复全备失败,详情:%v", err))
return full.Err
return
}

if util.FileExists(rockdbDir) {
mylog.Logger.Info("restore ok, %s generated", rockdbDir)
} else {
full.Err = fmt.Errorf("restore command failed, %s not generated", rockdbDir)
mylog.Logger.Error(full.Err.Error())
return full.Err
return
}
util.LocalDirChownMysql(rockdbDir)

ret01 := strings.TrimSpace(ret)
if strings.Contains(ret01, "ERR:") == true {
mylog.Logger.Error(fmt.Sprintf("恢复全备失败,err:%v,cmd:%s", err, restoreCmd))
return full.Err
full.Err = fmt.Errorf("恢复全备失败,err:%v,cmd:%s", err, restoreCmd)
mylog.Logger.Error(full.Err.Error())
return
}

// 4、拉起节点
startScript := filepath.Join("/usr/local/redis/bin", "start-redis.sh")
_, full.Err = util.RunLocalCmd("su", []string{consts.MysqlAaccount, "-c", startScript + " " + strconv.Itoa(
dstTendisPort)}, "", nil, 30*time.Second)
if full.Err != nil {
return full.Err
return
}
mylog.Logger.Info(fmt.Sprintf("su %s -c \"%s\"", consts.MysqlAaccount,
startScript+" "+strconv.Itoa(dstTendisPort)))
time.Sleep(2 * time.Second)
time.Sleep(30 * time.Second)

//再次探测tendisplus连接性->拉起是否成功
redisCli, err = myredis.NewRedisClient(redisAddr, dstTendisPasswd, 0, consts.TendisTypeTendisplusInsance)
// 实例比较大时,可能拉起的比较慢,需要多次探测。 如果10分钟都还没拉起,那就认为失败了
for i := 1; i < 20; i++ {
redisCli, err = myredis.NewRedisClient(redisAddr, dstTendisPasswd, 0, consts.TendisTypeTendisplusInsance)
if err != nil {
mylog.Logger.Warn("第%d/20探测%d实例是否存活失败...sleep 30s后进行下一次探测", i, dstTendisPort)
time.Sleep(30 * time.Second)
}
}
if err != nil {
return err
full.Err = fmt.Errorf("%d全备恢复后,实例拉起失败", dstTendisPort)
return
}

defer redisCli.Close()

msg = fmt.Sprintf("%s:%d 恢复全备成功", dstTendisIP, dstTendisPort)
mylog.Logger.Info(msg)

return nil
return
}

func (full *TendisFullBackPull) getRestoreTool(dstTendisIP, masterVersion string, dstTendisPort int) (restoreTool string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ func (incr *TredisRocksDBIncrBack) GetTredisIncrbacks(binlogFileList []FileDetai

// 过滤节点维度的文件,这里比较重要,因为flow传下来的是这台机器涉及到的所有节点信息,
// 这里是针对单节点的,所以需要过滤出来,这个值返回给前置函数
if strings.Contains(back01.BackupFile, incr.FileName) && (back01.NodeIP == incr.SourceIP) {
// 不能用strings.contains来判断因为backupfile可能存在xxx-20250210130000.log.zst的情况,直接用port段匹配
if match01[1] == incr.FileName && (back01.NodeIP == incr.SourceIP) {
mylog.Logger.Info("back01.BackupFile:%s,incr.FileName:%s,back01.NodeIP:%s,incr.SourceIP:%s",
back01.BackupFile, incr.FileName, back01.NodeIP, incr.SourceIP)
backs = append(backs, back01)
Expand Down

0 comments on commit 5fc343f

Please sign in to comment.