diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 943ddb8..551f8b6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,11 @@ jobs: name: Test on go ${{ matrix.go_version }} strategy: matrix: - go_version: ["1.19", "1.20", "1.21"] + go_version: [ + "1.19", + "1.20", + "1.21", + ] runs-on: ubuntu-latest steps: @@ -26,4 +30,7 @@ jobs: go-version: ${{ matrix.go_version }} - name: Test - run: go test -v ./... + run: go test -v $(go list ./... | grep -v github.com/libi/dcron/e2e) + + - name: Test E2E + run: go test -v ./e2e diff --git a/dcron.go b/dcron.go index 9bd95ca..cd839ef 100644 --- a/dcron.go +++ b/dcron.go @@ -36,8 +36,7 @@ type Dcron struct { nodePool INodePool running int32 - logger dlog.Logger - logInfo bool + logger dlog.Logger nodeUpdateDuration time.Duration hashReplicas int @@ -98,14 +97,14 @@ func (d *Dcron) GetLogger() dlog.Logger { // AddJob add a job func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error) { - return d.addJob(jobName, cronStr, nil, job) + return d.addJob(jobName, cronStr, job) } // AddFunc add a cron func func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error) { - return d.addJob(jobName, cronStr, cmd, nil) + return d.addJob(jobName, cronStr, cron.FuncJob(cmd)) } -func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error) { +func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) { d.logger.Infof("addJob '%s' : %s", jobName, cronStr) d.jobsRWMut.Lock() @@ -116,7 +115,6 @@ func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error) innerJob := JobWarpper{ Name: jobName, CronStr: cronStr, - Func: cmd, Job: job, Dcron: d, } diff --git a/dlog/logger.go b/dlog/logger.go index a70f12e..cefb334 100644 --- a/dlog/logger.go +++ b/dlog/logger.go @@ -57,18 +57,22 @@ func NewPrintfLoggerFromLogfLogger(logger LogfLogger) PrintfLogger { func NewLoggerForTest(t *testing.T) Logger { return &StdLogger{ - Log: NewPrintfLoggerFromLogfLogger(t), + Log: NewPrintfLoggerFromLogfLogger(t), + LogVerbose: true, } } +// 这个方法会打印出所有的WARN level以上的LOG func WarnPrintfLogger(l PrintfLogger) Logger { return &StdLogger{Log: l, LogVerbose: false} } +// 这个方法会打印出所有的INFO level的LOG func VerbosePrintfLogger(l PrintfLogger) Logger { return &StdLogger{Log: l, LogVerbose: true} } +// 默认的Logger构造函数,会打印出所有WARN level以上的LOG func DefaultPrintfLogger(l PrintfLogger) Logger { return WarnPrintfLogger(l) } diff --git a/driver/driver.go b/driver/driver.go index 3a70a04..4706b81 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -3,7 +3,7 @@ package driver import ( "context" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -18,7 +18,13 @@ type DriverV2 interface { NodeID() string // get nodes GetNodes(ctx context.Context) (nodes []string, err error) + + // register node to remote server (like etcd/redis), + // will create a goroutine to keep the connection. + // And then continue for other work. Start(ctx context.Context) (err error) + + // stop the goroutine of keep connection. Stop(ctx context.Context) (err error) withOption(opt Option) (err error) diff --git a/driver/etcddriver.go b/driver/etcddriver.go index cc7dd27..7d17de3 100644 --- a/driver/etcddriver.go +++ b/driver/etcddriver.go @@ -21,11 +21,13 @@ type EtcdDriver struct { nodeID string serviceName string - cli *clientv3.Client + cli *clientv3.Client + nodes *sync.Map + logger dlog.Logger + lease int64 - nodes *sync.Map leaseID clientv3.LeaseID - logger dlog.Logger + leaseCh <-chan *clientv3.LeaseKeepAliveResponse ctx context.Context cancel context.CancelFunc @@ -91,9 +93,11 @@ func (e *EtcdDriver) watcher(serviceName string) { for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { - case mvccpb.PUT: //修改或者新增 + case mvccpb.PUT: + // 修改或者新增 e.setServiceList(string(ev.Kv.Key), string(ev.Kv.Value)) - case mvccpb.DELETE: //删除 + case mvccpb.DELETE: + // 删除 e.delServiceList(string(ev.Kv.Key)) } } @@ -138,39 +142,30 @@ func (e *EtcdDriver) revoke(ctx context.Context) { } } -func (e *EtcdDriver) heartBeat(ctx context.Context) { -label: - leaseCh, err := e.keepAlive(ctx, e.nodeID) +func (e *EtcdDriver) startHeartBeat(ctx context.Context) { + var err error + e.leaseCh, err = e.keepAlive(ctx, e.nodeID) if err != nil { e.logger.Errorf("keep alive error, %v", err) return } +} + +func (e *EtcdDriver) keepHeartBeat() { for { select { case <-e.ctx.Done(): { - e.logger.Infof("driver stopped") + e.logger.Warnf("driver stopped") return } - case _, ok := <-leaseCh: + case _, ok := <-e.leaseCh: { - // if lease timeout, goto top of - // this function to keepalive if !ok { - goto label + e.logger.Warnf("lease channel stop, driver stopped") + return } } - case <-time.After(etcdBusinessTimeout): - { - e.logger.Errorf("ectd cli keepalive timeout") - return - } - case <-time.After(time.Duration(e.lease/2) * (time.Second)): - { - // if near to nodes time, - // renew the lease - goto label - } } } } @@ -191,12 +186,13 @@ func (e *EtcdDriver) GetNodes(ctx context.Context) (nodes []string, err error) { func (e *EtcdDriver) Start(ctx context.Context) (err error) { // renew a global ctx when start every time e.ctx, e.cancel = context.WithCancel(context.TODO()) - go e.heartBeat(ctx) + e.startHeartBeat(ctx) err = e.watchService(ctx, e.serviceName) if err != nil { return } - return nil + go e.keepHeartBeat() + return } func (e *EtcdDriver) Stop(ctx context.Context) (err error) { diff --git a/driver/redisdriver.go b/driver/redisdriver.go index 079103d..b7d193a 100644 --- a/driver/redisdriver.go +++ b/driver/redisdriver.go @@ -9,7 +9,7 @@ import ( "time" "github.com/libi/dcron/dlog" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" ) const ( diff --git a/dcron_test.go b/e2e/dcron_test.go similarity index 65% rename from dcron_test.go rename to e2e/dcron_test.go index ffd91a3..8afb64b 100644 --- a/dcron_test.go +++ b/e2e/dcron_test.go @@ -1,10 +1,10 @@ -package dcron_test +package e2e_test import ( - "fmt" "log" "os" "sync" + "sync/atomic" "testing" "time" @@ -20,77 +20,111 @@ const ( DefaultRedisAddr = "127.0.0.1:6379" ) -type TestJob1 struct { +type TestJobWithWG struct { Name string + WG *sync.WaitGroup + Test *testing.T + Cnt *atomic.Int32 } -func (t TestJob1) Run() { - fmt.Println("执行 testjob ", t.Name, time.Now().Format("15:04:05")) +func (job *TestJobWithWG) Run() { + job.Test.Logf("jobName=[%s], time=%s, job rest count=%d", + job.Name, + time.Now().Format("15:04:05"), + job.Cnt.Load(), + ) + if job.Cnt.Load() == 0 { + return + } else { + job.Cnt.Store(job.Cnt.Add(-1)) + if job.Cnt.Load() == 0 { + job.WG.Done() + } + } } -var testData = make(map[string]struct{}) - func TestMultiNodes(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(3) + testJobWGs := make([]*sync.WaitGroup, 0) + testJobWGs = append(testJobWGs, &sync.WaitGroup{}) + testJobWGs = append(testJobWGs, &sync.WaitGroup{}) + testJobWGs = append(testJobWGs, &sync.WaitGroup{}) + testJobWGs[0].Add(1) + testJobWGs[1].Add(1) + + testJobs := make([]*TestJobWithWG, 0) + testJobs = append( + testJobs, + &TestJobWithWG{ + Name: "s1_test1", + WG: testJobWGs[0], + Test: t, + Cnt: &atomic.Int32{}, + }, + &TestJobWithWG{ + Name: "s1_test2", + WG: testJobWGs[1], + Test: t, + Cnt: &atomic.Int32{}, + }, + &TestJobWithWG{ + Name: "s1_test3", + WG: testJobWGs[2], + Test: t, + Cnt: &atomic.Int32{}, + }) + testJobs[0].Cnt.Store(5) + testJobs[1].Cnt.Store(5) + + nodeCancel := make([](chan int), 3) + nodeCancel[0] = make(chan int, 1) + nodeCancel[1] = make(chan int, 1) + nodeCancel[2] = make(chan int, 1) - go runNode(t, wg) // 间隔1秒启动测试节点刷新逻辑 - time.Sleep(time.Second) - go runNode(t, wg) - time.Sleep(time.Second) - go runNode(t, wg) + go runNode(t, wg, testJobs, nodeCancel[0]) + <-time.After(time.Second) + go runNode(t, wg, testJobs, nodeCancel[1]) + <-time.After(time.Second) + go runNode(t, wg, testJobs, nodeCancel[2]) + + testJobWGs[0].Wait() + testJobWGs[1].Wait() + + close(nodeCancel[0]) + close(nodeCancel[1]) + close(nodeCancel[2]) wg.Wait() } -func runNode(t *testing.T, wg *sync.WaitGroup) { +func runNode(t *testing.T, wg *sync.WaitGroup, testJobs []*TestJobWithWG, cancel chan int) { redisCli := redis.NewClient(&redis.Options{ Addr: DefaultRedisAddr, }) drv := driver.NewRedisDriver(redisCli) - dcron := dcron.NewDcronWithOption("server1", drv, dcron.WithLogger(&dlog.StdLogger{ - Log: log.New(os.Stdout, "", log.LstdFlags), - })) - //添加多个任务 启动多个节点时 任务会均匀分配给各个节点 - - err := dcron.AddFunc("s1 test1", "* * * * *", func() { - // 同时启动3个节点 但是一个 job 同一时间只会执行一次 通过 map 判重 - key := "s1 test1 : " + time.Now().Format("15:04") - if _, ok := testData[key]; ok { - t.Error("job have running in other node") - } - testData[key] = struct{}{} - }) - if err != nil { - t.Error("add func error") - } - err = dcron.AddFunc("s1 test2", "* * * * *", func() { - t.Log("执行 service1 test2 任务", time.Now().Format("15:04:05")) - }) - if err != nil { - t.Error("add func error") - } + dcron := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.WithLogger( + dlog.DefaultPrintfLogger( + log.New(os.Stdout, "", log.LstdFlags)))) + // 添加多个任务 启动多个节点时 任务会均匀分配给各个节点 - testJob := TestJob1{"addtestjob"} - err = dcron.AddJob("addtestjob1", "* * * * *", testJob) - if err != nil { - t.Error("add func error") + var err error + for _, job := range testJobs { + if err = dcron.AddJob(job.Name, "* * * * *", job); err != nil { + t.Error("add job error") + } } - err = dcron.AddFunc("s1 test3", "* * * * *", func() { - t.Log("执行 service1 test3 任务", time.Now().Format("15:04:05")) - }) - if err != nil { - t.Error("add func error") - } dcron.Start() - //移除测试 - dcron.Remove("s1 test3") - <-time.After(120 * time.Second) - wg.Done() + dcron.Remove(testJobs[2].Name) + <-cancel dcron.Stop() + wg.Done() } func Test_SecondsJob(t *testing.T) { @@ -117,9 +151,9 @@ func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t * drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, dcron.CronOptionSeconds(), - dcron.WithLogger(&dlog.StdLogger{ - Log: log.New(os.Stdout, "["+id+"]", log.LstdFlags), - }), + dcron.WithLogger(dlog.DefaultPrintfLogger( + log.New(os.Stdout, "["+id+"]", log.LstdFlags), + )), dcron.CronOptionChain(cron.Recover( cron.DefaultLogger, )), @@ -148,15 +182,13 @@ func runSecondNodeWithLogger(id string, wg *sync.WaitGroup, runningTime time.Dur Addr: DefaultRedisAddr, }) drv := driver.NewRedisDriver(redisCli) - dcr := dcron.NewDcronWithOption(t.Name(), drv, - // must use `WithPrintLogInfo` before `WithLogger` - // because we need to set up `cron` log level, it depends - // on ths value of this configuration. - dcron.WithPrintLogInfo(), + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, dcron.CronOptionSeconds(), - dcron.WithLogger(&dlog.StdLogger{ - Log: log.New(os.Stdout, "["+id+"]", log.LstdFlags), - }), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.New(os.Stdout, "["+id+"]", log.LstdFlags), + )), dcron.CronOptionChain(cron.Recover( cron.DefaultLogger, )), @@ -211,9 +243,9 @@ func Test_WithClusterStableNodes(t *testing.T) { drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, dcron.CronOptionSeconds(), - dcron.WithLogger(&dlog.StdLogger{ - Log: log.New(os.Stdout, "["+id+"]", log.LstdFlags), - }), + dcron.WithLogger(dlog.DefaultPrintfLogger( + log.New(os.Stdout, "["+id+"]", log.LstdFlags)), + ), dcron.WithClusterStable(timeWindow), dcron.WithNodeUpdateDuration(timeWindow), ) diff --git a/driver/etcddriver_test.go b/e2e/etcddriver_test.go similarity index 87% rename from driver/etcddriver_test.go rename to e2e/etcddriver_test.go index e0f8225..12e6a5c 100644 --- a/driver/etcddriver_test.go +++ b/e2e/etcddriver_test.go @@ -1,4 +1,4 @@ -package driver_test +package e2e_test import ( "context" @@ -35,7 +35,7 @@ func TestEtcdDriver_GetNodes(t *testing.T) { require.Nil(t, err) drvs = append(drvs, drv) } - <-time.After(5 * time.Second) + <-time.After(15 * time.Second) for _, v := range drvs { nodes, err := v.GetNodes(context.Background()) require.Nil(t, err) @@ -64,11 +64,9 @@ func TestEtcdDriver_Stop(t *testing.T) { DialTimeout: 3 * time.Second, }) drv2.Init(t.Name(), driver.NewTimeoutOption(5*time.Second), driver.NewLoggerOption(dlog.NewLoggerForTest(t))) - err = drv2.Start(context.Background()) - require.Nil(t, err) + require.Nil(t, drv2.Start(context.Background())) + require.Nil(t, drv1.Start(context.Background())) - err = drv1.Start(context.Background()) - require.Nil(t, err) <-time.After(3 * time.Second) nodes, err = drv1.GetNodes(context.Background()) require.Nil(t, err) @@ -80,17 +78,21 @@ func TestEtcdDriver_Stop(t *testing.T) { drv1.Stop(context.Background()) - <-time.After(5 * time.Second) + <-time.After(15 * time.Second) nodes, err = drv2.GetNodes(context.Background()) require.Nil(t, err) require.Len(t, nodes, 1) err = drv1.Start(context.Background()) require.Nil(t, err) - <-time.After(5 * time.Second) + <-time.After(15 * time.Second) nodes, err = drv2.GetNodes(context.Background()) require.Nil(t, err) require.Len(t, nodes, 2) + nodes, err = drv1.GetNodes(context.Background()) + require.Nil(t, err) + require.Len(t, nodes, 2) drv2.Stop(context.Background()) + drv1.Stop(context.Background()) } diff --git a/driver/redisdriver_test.go b/e2e/redisdriver_test.go similarity index 87% rename from driver/redisdriver_test.go rename to e2e/redisdriver_test.go index 4cf380f..f94c9c8 100644 --- a/driver/redisdriver_test.go +++ b/e2e/redisdriver_test.go @@ -1,4 +1,4 @@ -package driver_test +package e2e_test import ( "context" @@ -9,12 +9,12 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/libi/dcron/dlog" "github.com/libi/dcron/driver" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" ) func testFuncNewRedisDriver(addr string) driver.DriverV2 { - log.Println("redis=", addr) + log.Printf("redis=%s", addr) redisCli := redis.NewClient(&redis.Options{ Addr: addr, }) @@ -60,11 +60,9 @@ func TestRedisDriver_Stop(t *testing.T) { drv2.Init(t.Name(), driver.NewTimeoutOption(5*time.Second), driver.NewLoggerOption(dlog.NewLoggerForTest(t))) - err = drv2.Start(context.Background()) - require.Nil(t, err) - err = drv1.Start(context.Background()) - require.Nil(t, err) + require.Nil(t, drv2.Start(context.Background())) + require.Nil(t, drv1.Start(context.Background())) nodes, err = drv1.GetNodes(context.Background()) require.Nil(t, err) @@ -87,6 +85,10 @@ func TestRedisDriver_Stop(t *testing.T) { nodes, err = drv2.GetNodes(context.Background()) require.Nil(t, err) require.Len(t, nodes, 2) + nodes, err = drv1.GetNodes(context.Background()) + require.Nil(t, err) + require.Len(t, nodes, 2) drv2.Stop(context.Background()) + drv1.Stop(context.Background()) } diff --git a/driver/rediszsetdriver_test.go b/e2e/rediszsetdriver_test.go similarity index 88% rename from driver/rediszsetdriver_test.go rename to e2e/rediszsetdriver_test.go index 4302093..1c024b1 100644 --- a/driver/rediszsetdriver_test.go +++ b/e2e/rediszsetdriver_test.go @@ -1,4 +1,4 @@ -package driver_test +package e2e_test import ( "context" @@ -8,7 +8,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/libi/dcron/dlog" "github.com/libi/dcron/driver" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" ) @@ -58,11 +58,9 @@ func TestRedisZSetDriver_Stop(t *testing.T) { drv2.Init(t.Name(), driver.NewTimeoutOption(5*time.Second), driver.NewLoggerOption(dlog.NewLoggerForTest(t))) - err = drv2.Start(context.Background()) - require.Nil(t, err) - err = drv1.Start(context.Background()) - require.Nil(t, err) + require.Nil(t, drv2.Start(context.Background())) + require.Nil(t, drv1.Start(context.Background())) nodes, err = drv1.GetNodes(context.Background()) require.Nil(t, err) @@ -85,6 +83,10 @@ func TestRedisZSetDriver_Stop(t *testing.T) { nodes, err = drv2.GetNodes(context.Background()) require.Nil(t, err) require.Len(t, nodes, 2) + nodes, err = drv1.GetNodes(context.Background()) + require.Nil(t, err) + require.Len(t, nodes, 2) drv2.Stop(context.Background()) + drv1.Stop(context.Background()) } diff --git a/examples/example/example.go b/examples/example/example.go index 6764026..7fae1ef 100644 --- a/examples/example/example.go +++ b/examples/example/example.go @@ -64,9 +64,12 @@ func main() { }) driver := driver.NewRedisDriver(redisCli) logger := &dlog.StdLogger{ - Log: log.New(os.Stdout, "["+*subId+"]", log.LstdFlags), + Log: log.New(os.Stdout, "["+*subId+"]", log.LstdFlags), + LogVerbose: true, } - dcron := dcron.NewDcronWithOption(*serverName, driver, + dcron := dcron.NewDcronWithOption( + *serverName, + driver, dcron.WithLogger(logger), dcron.WithHashReplicas(10), dcron.WithNodeUpdateDuration(time.Second*10), diff --git a/inodepool_test.go b/inodepool_test.go index 7f0c6ba..ce701e3 100644 --- a/inodepool_test.go +++ b/inodepool_test.go @@ -47,6 +47,12 @@ func (ts *TestINodePoolSuite) setUpEtcd() { ts.etcdsvr = integration.NewLazyCluster() } +func (ts *TestINodePoolSuite) stopAllNodePools(nodePools []dcron.INodePool) { + for _, nodePool := range nodePools { + nodePool.Stop(context.Background()) + } +} + func (ts *TestINodePoolSuite) declareRedisDrivers(clients *[]*redis.Client, drivers *[]driver.DriverV2, numberOfNodes int) { for i := 0; i < numberOfNodes; i++ { *clients = append(*clients, redis.NewClient(&redis.Options{ @@ -116,6 +122,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesRedis() { nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil)) } ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration) + ts.stopAllNodePools(nodePools) } func (ts *TestINodePoolSuite) TestMultiNodesEtcd() { @@ -134,6 +141,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesEtcd() { nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil)) } ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration) + ts.stopAllNodePools(nodePools) } func (ts *TestINodePoolSuite) TestMultiNodesRedisZSet() { @@ -152,6 +160,7 @@ func (ts *TestINodePoolSuite) TestMultiNodesRedisZSet() { nodePools = append(nodePools, dcron.NewNodePool(ServiceName, drivers[i], updateDuration, ts.defaultHashReplicas, nil)) } ts.runCheckJobAvailable(numberOfNodes, ServiceName, &nodePools, updateDuration) + ts.stopAllNodePools(nodePools) } func TestTestINodePoolSuite(t *testing.T) { diff --git a/job_warpper.go b/job_warpper.go index fccc513..fd78e78 100644 --- a/job_warpper.go +++ b/job_warpper.go @@ -23,7 +23,6 @@ type JobWarpper struct { Dcron *Dcron Name string CronStr string - Func func() Job Job } @@ -36,9 +35,6 @@ func (job JobWarpper) Run() { } func (job JobWarpper) Execute() { - if job.Func != nil { - job.Func() - } if job.Job != nil { job.Job.Run() } diff --git a/option.go b/option.go index 58bfa24..af47004 100644 --- a/option.go +++ b/option.go @@ -20,13 +20,6 @@ func WithLogger(logger dlog.Logger) Option { } } -// PrintLogInfo set log info level -func WithPrintLogInfo() Option { - return func(dcron *Dcron) { - dcron.logInfo = true - } -} - // WithNodeUpdateDuration set node update duration func WithNodeUpdateDuration(d time.Duration) Option { return func(dcron *Dcron) {