Skip to content

Commit

Permalink
support gracefully shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 authored and suxb201 committed Dec 26, 2023
1 parent 2ff48ca commit f7d72a2
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 85 deletions.
17 changes: 16 additions & 1 deletion cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package main

import (
"os"
"os/signal"
"syscall"
"context"
_ "net/http/pprof"

"RedisShake/internal/config"
Expand Down Expand Up @@ -110,7 +114,10 @@ func main() {

log.Infof("start syncing...")

ch := theReader.StartRead()
ctx, cancel := context.WithCancel(context.Background())
ch := theReader.StartRead(ctx)
go waitShutdown(cancel)

for e := range ch {
// calc arguments
e.Parse()
Expand All @@ -132,3 +139,11 @@ func main() {
utils.ReleaseFileLock() // Release file lock
log.Infof("all done")
}

func waitShutdown(cancel context.CancelFunc) {
quitCh := make(chan os.Signal, 1)
signal.Notify(quitCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-quitCh
log.Infof("Got signal: %s to exit.", sig)
cancel()
}
16 changes: 8 additions & 8 deletions internal/aof/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aof

import (
"bufio"
"context"
"io"
"os"
"strconv"
Expand Down Expand Up @@ -51,7 +52,7 @@ func ReadCompleteLine(reader *bufio.Reader) ([]byte, error) {
return line, err
}

func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int {
func (ld *Loader) LoadSingleAppendOnlyFile(ctx context.Context, timestamp int64) int {
ret := OK
filePath := ld.filePath
fp, err := os.Open(filePath)
Expand Down Expand Up @@ -80,12 +81,14 @@ func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int {
}
reader := bufio.NewReader(fp)
for {

line, err := ReadCompleteLine(reader)
{
select {
case <-ctx.Done():
return ret
default:
line, err := ReadCompleteLine(reader)
if err != nil {
if err == io.EOF {
break
return ret
} else {
log.Infof("Unrecoverable error reading the append only File %v: %v", filePath, err)
ret = Failed
Expand Down Expand Up @@ -152,9 +155,6 @@ func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int {
e.Argv = append(e.Argv, value)
}
ld.ch <- e

}

}
return ret
}
8 changes: 8 additions & 0 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

type Redis struct {
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
protoReader *proto.Reader
Expand All @@ -33,6 +34,7 @@ func NewRedisClient(address string, username string, password string, Tls bool)
log.Panicf("dial failed. address=[%s], tls=[%v], err=[%v]", address, Tls, err)
}

r.conn = conn
r.reader = bufio.NewReader(conn)
r.writer = bufio.NewWriter(conn)
r.protoReader = proto.NewReader(r.reader)
Expand Down Expand Up @@ -129,6 +131,12 @@ func (r *Redis) SetBufioReader(rd *bufio.Reader) {
r.protoReader = proto.NewReader(r.reader)
}

func (r *Redis) Close() {
if err := r.conn.Close(); err != nil {
log.Infof("close redis conn err: %s\n", err.Error())
}
}

/* Commands */

func (r *Redis) Scan(cursor uint64) (newCursor uint64, keys []string) {
Expand Down
9 changes: 6 additions & 3 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rdb
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"io"
"os"
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewLoader(name string, updateFunc func(int64), filPath string, ch chan *ent

// ParseRDB parse rdb file
// return repl stream db id
func (ld *Loader) ParseRDB() int {
func (ld *Loader) ParseRDB(ctx context.Context) int {
var err error
ld.fp, err = os.OpenFile(ld.filPath, os.O_RDONLY, 0666)
if err != nil {
Expand Down Expand Up @@ -89,12 +90,12 @@ func (ld *Loader) ParseRDB() int {
log.Debugf("[%s] RDB version: %d", ld.name, version)

// read entries
ld.parseRDBEntry(rd)
ld.parseRDBEntry(ctx, rd)

return ld.replStreamDbId
}

func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
func (ld *Loader) parseRDBEntry(ctx context.Context, rd *bufio.Reader) {
// for stat
updateProcessSize := func() {
if ld.updateFunc == nil {
Expand Down Expand Up @@ -198,6 +199,8 @@ func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
select {
case <-tick:
updateProcessSize()
case <- ctx.Done():
return
default:
}
}
Expand Down
8 changes: 4 additions & 4 deletions internal/reader/aof_reader.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package reader

import (
"context"
"path/filepath"

"RedisShake/internal/aof"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"
Expand Down Expand Up @@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader {
return r
}

func (r *aofReader) StartRead() chan *entry.Entry {
func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {
//init entry
r.ch = make(chan *entry.Entry, 1024)

Expand All @@ -79,7 +79,7 @@ func (r *aofReader) StartRead() chan *entry.Entry {
if manifestInfo == nil { // load single aof file
log.Infof("start send single AOF path=[%s]", r.path)
aofLoader := aof.NewLoader(r.path, r.ch)
ret := aofLoader.LoadSingleAppendOnlyFile(r.stat.AOFTimestamp)
ret := aofLoader.LoadSingleAppendOnlyFile(ctx, r.stat.AOFTimestamp)
if ret == AOFOk || ret == AOFTruncated {
log.Infof("The AOF File was successfully loaded")
} else {
Expand All @@ -89,7 +89,7 @@ func (r *aofReader) StartRead() chan *entry.Entry {
close(r.ch)
} else {
aofLoader := NewAOFFileInfo(r.path, r.ch)
ret := aofLoader.LoadAppendOnlyFile(manifestInfo, r.stat.AOFTimestamp)
ret := aofLoader.LoadAppendOnlyFile(ctx, manifestInfo, r.stat.AOFTimestamp)
if ret == AOFOk || ret == AOFTruncated {
log.Infof("The AOF File was successfully loaded")
} else {
Expand Down
5 changes: 3 additions & 2 deletions internal/reader/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package reader
import (
"RedisShake/internal/entry"
"RedisShake/internal/status"
"context"
)

type Reader interface {
status.Statusable
StartRead() chan *entry.Entry
}
StartRead(ctx context.Context) chan *entry.Entry
}
15 changes: 8 additions & 7 deletions internal/reader/parsing_aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"container/list"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -555,7 +556,7 @@ func GetHistoryAndIncrAppendOnlyFilesNum(am *AOFManifest) int {
return num
}

func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int {
func (aofInfo *INFO) LoadAppendOnlyFile(ctx context.Context, am *AOFManifest, AOFTimeStamp int64) int {
if am == nil {
log.Panicf("AOFManifest is null")
}
Expand Down Expand Up @@ -593,7 +594,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int
aofInfo.UpdateLoadingFileName(AOFName)
BaseSize = aofInfo.GetAppendOnlyFileSize(AOFName, nil)
start = Ustime()
ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, 0) //Currently, RDB files cannot be restored at a point in time.
ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, 0) //Currently, RDB files cannot be restored at a point in time.
if ret == AOFOk || (ret == AOFTruncated) {
log.Infof("DB loaded from Base File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000)
}
Expand Down Expand Up @@ -627,7 +628,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int
aofInfo.UpdateLoadingFileName(AOFName)
AOFNum++
start = Ustime()
ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, AOFTimeStamp)
ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, AOFTimeStamp)
if ret == AOFOk || (ret == AOFTruncated) {
log.Infof("DB loaded from History File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000)
return ret
Expand Down Expand Up @@ -659,7 +660,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int
aofInfo.UpdateLoadingFileName(AOFName)
AOFNum++
start = Ustime()
ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, AOFTimeStamp)
ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, AOFTimeStamp)
if ret == AOFOk || (ret == AOFTruncated) {
log.Infof("DB loaded from incr File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000)
return ret
Expand Down Expand Up @@ -691,7 +692,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int

}

func (aofInfo *INFO) ParsingSingleAppendOnlyFile(FileName string, AOFTimeStamp int64) int {
func (aofInfo *INFO) ParsingSingleAppendOnlyFile(ctx context.Context, FileName string, AOFTimeStamp int64) int {
ret := AOFOk
AOFFilepath := path.Join(aofInfo.AOFDirName, FileName)
println(AOFFilepath)
Expand Down Expand Up @@ -725,12 +726,12 @@ func (aofInfo *INFO) ParsingSingleAppendOnlyFile(FileName string, AOFTimeStamp i
log.Infof("Reading RDB Base File on AOF loading...")
rdbOpt := RdbReaderOptions{Filepath: AOFFilepath}
ldRDB := NewRDBReader(&rdbOpt)
ldRDB.StartRead()
ldRDB.StartRead(ctx)
return AOFOk
}
// load single aof file
aofSingleReader := aof.NewLoader(MakePath(aofInfo.AOFDirName, FileName), aofInfo.ch)
ret = aofSingleReader.LoadSingleAppendOnlyFile(AOFTimeStamp)
ret = aofSingleReader.LoadSingleAppendOnlyFile(ctx, AOFTimeStamp)
return ret

}
6 changes: 3 additions & 3 deletions internal/reader/rdb_reader.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package reader

import (
"context"
"fmt"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/rdb"
"RedisShake/internal/utils"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -41,7 +41,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader {
return r
}

func (r *rdbReader) StartRead() chan *entry.Entry {
func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
log.Infof("[%s] start read", r.stat.Name)
r.ch = make(chan *entry.Entry, 1024)
updateFunc := func(offset int64) {
Expand All @@ -53,7 +53,7 @@ func (r *rdbReader) StartRead() chan *entry.Entry {
rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, r.stat.Filepath, r.ch)

go func() {
_ = rdbLoader.ParseRDB()
_ = rdbLoader.ParseRDB(ctx)
log.Infof("[%s] rdb file parse done", r.stat.Name)
close(r.ch)
}()
Expand Down
5 changes: 3 additions & 2 deletions internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reader

import (
"context"
"fmt"
"sync"

Expand All @@ -25,13 +26,13 @@ func NewScanClusterReader(opts *ScanReaderOptions) Reader {
return rd
}

func (rd *scanClusterReader) StartRead() chan *entry.Entry {
func (rd *scanClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
ch := make(chan *entry.Entry, 1024)
var wg sync.WaitGroup
for _, r := range rd.readers {
wg.Add(1)
go func(r Reader) {
for e := range r.StartRead() {
for e := range r.StartRead(ctx) {
ch <- e
}
wg.Done()
Expand Down
Loading

0 comments on commit f7d72a2

Please sign in to comment.