Skip to content

Commit

Permalink
Add comments, add split the E2E test cases and other test cases. (#80)
Browse files Browse the repository at this point in the history
* add example app to readme

* add NodeID function into dcron

* add cron lib

* fix warning

* logger removal : phares1

* update

* update

* update

* update

* update

* update

* update

* update test workflow

* revert 1.22

* update etcd driver

* update

* fix get nodes

* update

* update for fix TAT

* Revert "update etcd driver"

This reverts commit a21ebf7.

* Revert "deps: updated go-redis to v9 (#79)"

This reverts commit 0b85b24.

* update

* update

* refact etcd

* Revert "refact etcd"

This reverts commit 049bed1.

* Revert "update"

This reverts commit 9c71fd6.

* update

* refactor etcddriver

* fix error

* update

* Revert "update"

This reverts commit 6cfcfe6.

* Revert "fix error"

This reverts commit 99b2d82.

* Revert "refactor etcddriver"

This reverts commit a576ac3.

* update

* update

* remove comments, and fix

* add comments
  • Loading branch information
dxyinme authored Jan 1, 2024
1 parent 8d05300 commit af5e8d6
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 134 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ jobs:
name: Test on go ${{ matrix.go_version }}
strategy:
matrix:
go_version: ["1.19", "1.20", "1.21"]
go_version: [
"1.19",
"1.20",
"1.21",
]
runs-on: ubuntu-latest

steps:
Expand All @@ -26,4 +30,7 @@ jobs:
go-version: ${{ matrix.go_version }}

- name: Test
run: go test -v ./...
run: go test -v $(go list ./... | grep -v github.com/libi/dcron/e2e)

- name: Test E2E
run: go test -v ./e2e
10 changes: 4 additions & 6 deletions dcron.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ type Dcron struct {
nodePool INodePool
running int32

logger dlog.Logger
logInfo bool
logger dlog.Logger

nodeUpdateDuration time.Duration
hashReplicas int
Expand Down Expand Up @@ -98,14 +97,14 @@ func (d *Dcron) GetLogger() dlog.Logger {

// AddJob add a job
func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error) {
return d.addJob(jobName, cronStr, nil, job)
return d.addJob(jobName, cronStr, job)
}

// AddFunc add a cron func
func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error) {
return d.addJob(jobName, cronStr, cmd, nil)
return d.addJob(jobName, cronStr, cron.FuncJob(cmd))
}
func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error) {
func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) {
d.logger.Infof("addJob '%s' : %s", jobName, cronStr)

d.jobsRWMut.Lock()
Expand All @@ -116,7 +115,6 @@ func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error)
innerJob := JobWarpper{
Name: jobName,
CronStr: cronStr,
Func: cmd,
Job: job,
Dcron: d,
}
Expand Down
6 changes: 5 additions & 1 deletion dlog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,22 @@ func NewPrintfLoggerFromLogfLogger(logger LogfLogger) PrintfLogger {

func NewLoggerForTest(t *testing.T) Logger {
return &StdLogger{
Log: NewPrintfLoggerFromLogfLogger(t),
Log: NewPrintfLoggerFromLogfLogger(t),
LogVerbose: true,
}
}

// 这个方法会打印出所有的WARN level以上的LOG
func WarnPrintfLogger(l PrintfLogger) Logger {
return &StdLogger{Log: l, LogVerbose: false}
}

// 这个方法会打印出所有的INFO level的LOG
func VerbosePrintfLogger(l PrintfLogger) Logger {
return &StdLogger{Log: l, LogVerbose: true}
}

// 默认的Logger构造函数,会打印出所有WARN level以上的LOG
func DefaultPrintfLogger(l PrintfLogger) Logger {
return WarnPrintfLogger(l)
}
8 changes: 7 additions & 1 deletion driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package driver
import (
"context"

"github.com/redis/go-redis/v9"
redis "github.com/redis/go-redis/v9"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -18,7 +18,13 @@ type DriverV2 interface {
NodeID() string
// get nodes
GetNodes(ctx context.Context) (nodes []string, err error)

// register node to remote server (like etcd/redis),
// will create a goroutine to keep the connection.
// And then continue for other work.
Start(ctx context.Context) (err error)

// stop the goroutine of keep connection.
Stop(ctx context.Context) (err error)

withOption(opt Option) (err error)
Expand Down
48 changes: 22 additions & 26 deletions driver/etcddriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ type EtcdDriver struct {
nodeID string
serviceName string

cli *clientv3.Client
cli *clientv3.Client
nodes *sync.Map
logger dlog.Logger

lease int64
nodes *sync.Map
leaseID clientv3.LeaseID
logger dlog.Logger
leaseCh <-chan *clientv3.LeaseKeepAliveResponse

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -91,9 +93,11 @@ func (e *EtcdDriver) watcher(serviceName string) {
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //修改或者新增
case mvccpb.PUT:
// 修改或者新增
e.setServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //删除
case mvccpb.DELETE:
// 删除
e.delServiceList(string(ev.Kv.Key))
}
}
Expand Down Expand Up @@ -138,39 +142,30 @@ func (e *EtcdDriver) revoke(ctx context.Context) {
}
}

func (e *EtcdDriver) heartBeat(ctx context.Context) {
label:
leaseCh, err := e.keepAlive(ctx, e.nodeID)
func (e *EtcdDriver) startHeartBeat(ctx context.Context) {
var err error
e.leaseCh, err = e.keepAlive(ctx, e.nodeID)
if err != nil {
e.logger.Errorf("keep alive error, %v", err)
return
}
}

func (e *EtcdDriver) keepHeartBeat() {
for {
select {
case <-e.ctx.Done():
{
e.logger.Infof("driver stopped")
e.logger.Warnf("driver stopped")
return
}
case _, ok := <-leaseCh:
case _, ok := <-e.leaseCh:
{
// if lease timeout, goto top of
// this function to keepalive
if !ok {
goto label
e.logger.Warnf("lease channel stop, driver stopped")
return
}
}
case <-time.After(etcdBusinessTimeout):
{
e.logger.Errorf("ectd cli keepalive timeout")
return
}
case <-time.After(time.Duration(e.lease/2) * (time.Second)):
{
// if near to nodes time,
// renew the lease
goto label
}
}
}
}
Expand All @@ -191,12 +186,13 @@ func (e *EtcdDriver) GetNodes(ctx context.Context) (nodes []string, err error) {
func (e *EtcdDriver) Start(ctx context.Context) (err error) {
// renew a global ctx when start every time
e.ctx, e.cancel = context.WithCancel(context.TODO())
go e.heartBeat(ctx)
e.startHeartBeat(ctx)
err = e.watchService(ctx, e.serviceName)
if err != nil {
return
}
return nil
go e.keepHeartBeat()
return
}

func (e *EtcdDriver) Stop(ctx context.Context) (err error) {
Expand Down
2 changes: 1 addition & 1 deletion driver/redisdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/libi/dcron/dlog"
"github.com/redis/go-redis/v9"
redis "github.com/redis/go-redis/v9"
)

const (
Expand Down
Loading

0 comments on commit af5e8d6

Please sign in to comment.