Skip to content

Commit

Permalink
add aof restore
Browse files Browse the repository at this point in the history
  • Loading branch information
bug-superman committed Oct 15, 2023
1 parent 8d239ae commit 6235dfe
Show file tree
Hide file tree
Showing 16 changed files with 1,179 additions and 9 deletions.
10 changes: 10 additions & 0 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"RedisShake/internal/function"
"RedisShake/internal/log"
"RedisShake/internal/reader"
"RedisShake/internal/reader/aof_reader"
"RedisShake/internal/status"
"RedisShake/internal/utils"
"RedisShake/internal/writer"
Expand Down Expand Up @@ -60,6 +61,15 @@ func main() {
}
theReader = reader.NewRDBReader(opts)
log.Infof("create RdbReader: %v", opts.Filepath)
} else if v.IsSet("aof_reader") {
opts := new(aof_reader.AOFReaderOptions)
defaults.SetDefaults(opts)
err := v.UnmarshalKey("aof_reader", opts)
if err != nil {
log.Panicf("failed to read the AOFReader config entry. err: %v", err)
}
theReader = aof_reader.NewAOFReader(opts)
log.Infof("create AOFReader: %v", opts.Filepath)
} else {
log.Panicf("no reader config entry found")
}
Expand Down
19 changes: 19 additions & 0 deletions docs/src/en/reader/aof_reader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# aof_reader

## Introduction

Can use ` aof_ Reader 'to read data from the AOF file and then write it to the target end.
It is commonly used to recover data from backup files and also supports data flash back.

## configuration

```toml
[aof_reader]
aoffilepath="/tmp/appendonly.aof.manifest" "or single-aof: /tmp/appendonly.aof ""
aoftimestamp="0"
```

*An absolute path should be passed in.

##The main process is as follows:
![aof_reader.jpg](/public/aof_reader.jpg)
Binary file added docs/src/public/aof_reader.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions docs/src/zh/guide/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ RedisShake 提供了不同的 Reader 用来对接不同的源端,配置详见
* [Sync Reader](../reader/sync_reader.md)
* [Scan Reader](../reader/scan_reader.md)
* [RDB Reader](../reader/rdb_reader.md)
* [AOF Reader](../reader/aof_reader.md)

## writer 配置

Expand Down
18 changes: 18 additions & 0 deletions docs/src/zh/reader/aof_reader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# aof_reader

## 介绍

可以使用 `aof_reader` 来从 AOF 文件中读取数据,然后写入目标端。常见于从备份文件中恢复数据,还支持数据闪回。

## 配置

```toml
[aof_reader]
aoffilepath="/tmp/appendonly.aof.manifest" "或者单aof文件 "/tmp/appendonly.aof""
aoftimestamp="0"
```

* 应传入绝对路径。

## 主要流程如下:
![aof_reader.jpg](/public/aof_reader.jpg)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/text v0.12.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
155 changes: 155 additions & 0 deletions internal/aof/aof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package aof

import (
"bufio"
"io"
"os"
"strconv"
"strings"

"RedisShake/internal/entry"
"RedisShake/internal/log"
)

const (
AOFNotExist = 1
AOFOpenErr = 3
AOFOK = 0
AOFEmpty = 2
AOFFailed = 4
AOFTruncated = 5
SizeMax = 128
)

type Loader struct {
filPath string
ch chan *entry.Entry
}

func NewLoader(filPath string, ch chan *entry.Entry) *Loader {
ld := new(Loader)
ld.ch = ch
ld.filPath = filPath
return ld
}

func ReadCompleteLine(reader *bufio.Reader) ([]byte, error) {
line, isPrefix, err := reader.ReadLine()
if err != nil {
return nil, err
}

for isPrefix {
var additional []byte
additional, isPrefix, err = reader.ReadLine()
if err != nil {
return nil, err
}
line = append(line, additional...)
}

return line, err
}

func (ld *Loader) LoadSingleAppendOnlyFile(AOFTimeStamp int64) int {
ret := AOFOK
AOFFilepath := ld.filPath
fp, err := os.Open(AOFFilepath)
if err != nil {
if os.IsNotExist(err) {
if _, err := os.Stat(AOFFilepath); err == nil || !os.IsNotExist(err) {
log.Infof("Fatal error: can't open the append log File %v for reading: %v", AOFFilepath, err.Error())
return AOFOpenErr
} else {
log.Infof("The append log File %v doesn't exist: %v", AOFFilepath, err.Error())
return AOFNotExist
}

}
defer fp.Close()

stat, _ := fp.Stat()
if stat.Size() == 0 {
return AOFEmpty
}
}
reader := bufio.NewReader(fp)
for {

line, err := ReadCompleteLine(reader)
{
if err != nil {
if err == io.EOF {
break
} else {
log.Infof("Unrecoverable error reading the append only File %v: %v", AOFFilepath, err)
ret = AOFFailed
return ret
}
} else {
_, errs := fp.Seek(0, io.SeekCurrent)
if errs != nil {
log.Infof("Unrecoverable error reading the append only File %v: %v", AOFFilepath, errs)
ret = AOFFailed
return ret
}
}

if line[0] == '#' {
if AOFTimeStamp != 0 && strings.HasPrefix(string(line), "#TS:") {
var ts int64
ts, err = strconv.ParseInt(strings.TrimPrefix(string(line[1:]), "#TS:"), 10, 64)
if err != nil {
log.Panicf("Invalid timestamp annotation")
}

if ts > AOFTimeStamp {
ret = AOFTruncated
log.Infof("AOFTruncated %s", line)
return ret
}
}
continue
}
if line[0] != '*' {
log.Panicf("Bad File format reading the append only File %v:make a backup of your AOF File, then use ./redis-check-AOF --fix <FileName.manifest>", AOFFilepath)
}
argc, _ := strconv.ParseInt(string(line[1:]), 10, 64)
if argc < 1 {
log.Panicf("Bad File format reading the append only File %v:make a backup of your AOF File, then use ./redis-check-AOF --fix <FileName.manifest>", AOFFilepath)
}
if argc > int64(SizeMax) {
log.Panicf("Bad File format reading the append only File %v:make a backup of your AOF File, then use ./redis-check-AOF --fix <FileName.manifest>", AOFFilepath)
}
e := entry.NewEntry()
var argv []string

for j := 0; j < int(argc); j++ {
line, err := ReadCompleteLine(reader)
if err != nil || line[0] != '$' {
log.Infof("Bad File format reading the append only File %v:make a backup of your AOF File, then use ./redis-check-AOF --fix <FileName.manifest>", AOFFilepath)
ret = AOFFailed
return ret
}
v64, _ := strconv.ParseInt(string(line[1:]), 10, 64)
argString := make([]byte, v64+2)
argString, err = ReadCompleteLine(reader)
if err != nil {
log.Infof("Unrecoverable error reading the append only File %v: %v", AOFFilepath, err)
ret = AOFFailed
return ret
}
argString = argString[:v64]
argv = append(argv, string(argString))
}

for _, value := range argv {
e.Argv = append(e.Argv, value)
}
ld.ch <- e

}

}
return ret
}
108 changes: 108 additions & 0 deletions internal/reader/aof_reader/aof_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package reader

import (
"RedisShake/internal/aof"
"RedisShake/internal/reader"
"path/filepath"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"

"github.com/dustin/go-humanize"
)

type AOFReaderOptions struct {
Filepath string `mapstructure:"filepath" default:""`
AOFTimestamp int64 `mapstructure:"timestamp" default:"0"`
FilterDangerousCommands string `mapstructure:"filterdangerousCommands" default:"no"`
}

type aofReader struct {
path string
ch chan *entry.Entry

stat struct {
AOFName string `json:"aof_name"`
AOFStatus string `json:"aof_status"`
AOFFilepath string `json:"aof_file_path"`
AOFFileSizeBytes int64 `json:"aof_file_size_bytes"`
AOFFileSizeHuman string `json:"aof_file_size_human"`
AOFFileSentBytes int64 `json:"aof_file_sent_bytes"`
AOFFileSentHuman string `json:"aof_file_sent_human"`
AOFPercent string `json:"aof_percent"`
AOFTimestamp int64 `json:"aof_time_stamp"`
FilterDangerousCommands string `json:"filter_dangerous_commands"`
}
}

func (r *aofReader) Status() interface{} {
return r.stat
}

func (r *aofReader) StatusString() string {
return r.stat.AOFStatus
}

func (r *aofReader) StatusConsistent() bool {
return r.stat.AOFFileSentBytes == r.stat.AOFFileSizeBytes
}

func NewAOFReader(opts *AOFReaderOptions) reader.Reader {
log.Infof("NewAOFReader: path=[%s]", opts.Filepath)
absolutePath, err := filepath.Abs(opts.Filepath)
if err != nil {
log.Panicf("NewAOFReader: filepath.Abs error: %s", err.Error())
}
log.Infof("NewAOFReader: absolute path=[%s]", absolutePath)
r := &aofReader{
path: absolutePath,
ch: make(chan *entry.Entry),
}
r.stat.AOFName = "aof_reader"
r.stat.AOFStatus = "init"
r.stat.AOFFilepath = absolutePath
r.stat.AOFFileSizeBytes = int64(utils.GetFileSize(absolutePath))
r.stat.AOFFileSizeHuman = humanize.Bytes(uint64(r.stat.AOFFileSizeBytes))
r.stat.AOFTimestamp = opts.AOFTimestamp
r.stat.FilterDangerousCommands = opts.FilterDangerousCommands
return r
}

func (r *aofReader) StartRead() chan *entry.Entry {
//init entry
r.ch = make(chan *entry.Entry, 1024)

// start read aof
go func() {
aofFileInfo := NewAOFFileInfo(r.path, r.ch)
// try load manifest file
aofFileInfo.AOFLoadManifestFromDisk()
manifestInfo := aofFileInfo.AOFManifest
if manifestInfo == nil { // load single aof file
log.Infof("start send single AOF path=[%s]", r.path)
aofLoader := aof.NewLoader(r.path, r.ch)
ret := aofLoader.LoadSingleAppendOnlyFile(r.stat.AOFTimestamp)
if ret == AOFOk || ret == AOFTruncated {
log.Infof("The AOF File was successfully loaded")
} else {
log.Infof("There was an error opening the AOF File.")
}
log.Infof("Send single AOF finished. path=[%s]", r.path)
close(r.ch)
} else {
aofLoader := NewAOFFileInfo(r.path, r.ch)
ret := aofLoader.LoadAppendOnlyFile(manifestInfo, r.ch, r.stat.AOFTimestamp, r.stat.FilterDangerousCommands)
if ret == AOFOk || ret == AOFTruncated {
log.Infof("The AOF File was successfully loaded")
} else {
log.Infof("There was an error opening the AOF File.")
}
log.Infof("Send multi-part AOF finished. path=[%s]", r.path)
close(r.ch)
}

}()

return r.ch
}
Loading

0 comments on commit 6235dfe

Please sign in to comment.