diff --git a/c.go b/c.go index 5d1e50ce..cf13d37b 100644 --- a/c.go +++ b/c.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/log" "github.com/knadh/koanf/providers/confmap" "github.com/knadh/koanf/providers/file" + "github.com/spf13/cobra" "go.uber.org/dig" ) @@ -32,7 +33,7 @@ type C struct { Env contract.Env contract.ConfigAccessor logging.LevelLogger - contract.Container + *container.Container contract.Dispatcher di *dig.Container } @@ -224,7 +225,7 @@ func (c *C) AddModule(modules ...interface{}) { } } -// Provide adds a dependencies provider to the core. Note the dependency provider +// Provide adds dependencies provider to the core. Note the dependency provider // must be a function in the form of: // // func(foo Foo) Bar @@ -373,7 +374,18 @@ func (c *C) Serve(ctx context.Context) error { }) } -// AddModuleFunc add the module after Invoking its' constructor. Clean up +// Shutdown iterates through every CloserProvider registered in the container, +// and calls them in the reversed order of registration. +func (c *C) Shutdown() { + modules := c.Modules() + for i := range modules { + if closer, ok := modules[len(modules)-i-1].(CloserProvider); ok { + closer.ProvideCloser() + } + } +} + +// AddModuleFunc add the module after Invoking its constructor. Clean up // functions and errors are handled automatically. func (c *C) AddModuleFunc(constructor interface{}) { c.provide(constructor) @@ -404,6 +416,17 @@ func (c *C) AddModuleFunc(constructor interface{}) { } } +// ApplyRootCommand iterates through every CommandProvider registered in the container, +// and introduce the root *cobra.Command to everyone. +func (c *C) ApplyRootCommand(command *cobra.Command) { + modules := c.Modules() + for i := range modules { + if p, ok := modules[i].(CommandProvider); ok { + p.ProvideCommand(command) + } + } +} + // Invoke runs the given function after instantiating its dependencies. Any // arguments that the function has are treated as its dependencies. The // dependencies are instantiated in an unspecified order along with any diff --git a/c_test.go b/c_test.go index 8dd47cf4..c27f7297 100644 --- a/c_test.go +++ b/c_test.go @@ -200,3 +200,19 @@ func TestC_cleanup(t *testing.T) { assert.True(t, dependencyCleanupCalled) assert.True(t, moduleCleanupCalled) } + +type closer func() + +func (f closer) ProvideCloser() { + f() +} + +func TestContainer_Shutdown(t *testing.T) { + seq := 0 + container := New() + container.AddModule(closer(func() { assert.Equal(t, 2, seq); seq = 1 })) + container.AddModule(closer(func() { assert.Equal(t, 3, seq); seq = 2 })) + container.AddModule(closer(func() { assert.Equal(t, 0, seq); seq = 3 })) + container.Shutdown() + assert.Equal(t, 1, seq) +} diff --git a/container/container.go b/container/container.go index 7dbb47df..81ac5cf7 100644 --- a/container/container.go +++ b/container/container.go @@ -5,87 +5,13 @@ package container import ( "github.com/DoNewsCode/core/contract" - "github.com/gorilla/mux" - "github.com/oklog/run" - "github.com/robfig/cron/v3" - "github.com/spf13/cobra" - "google.golang.org/grpc" ) var _ contract.Container = (*Container)(nil) -// CronProvider provides cron jobs. -type CronProvider interface { - ProvideCron(crontab *cron.Cron) -} - -// CommandProvider provides cobra.Command. -type CommandProvider interface { - ProvideCommand(command *cobra.Command) -} - -// HTTPProvider provides http services. -type HTTPProvider interface { - ProvideHTTP(router *mux.Router) -} - -// GRPCProvider provides gRPC services. -type GRPCProvider interface { - ProvideGRPC(server *grpc.Server) -} - -// CloserProvider provides a shutdown function that will be called when service exits. -type CloserProvider interface { - ProvideCloser() -} - -// RunProvider provides a runnable actor. Use it to register any server-like -// actions. For example, kafka consumer can be started here. -type RunProvider interface { - ProvideRunGroup(group *run.Group) -} - // Container holds all modules registered. type Container struct { - httpProviders []func(router *mux.Router) - grpcProviders []func(server *grpc.Server) - closerProviders []func() - runProviders []func(g *run.Group) - modules []interface{} - cronProviders []func(crontab *cron.Cron) - commandProviders []func(command *cobra.Command) -} - -// ApplyRouter iterates through every HTTPProvider registered in the container, -// and introduce the router to everyone. -func (c *Container) ApplyRouter(router *mux.Router) { - for _, p := range c.httpProviders { - p(router) - } -} - -// ApplyGRPCServer iterates through every GRPCProvider registered in the container, -// and introduce a *grpc.Server to everyone. -func (c *Container) ApplyGRPCServer(server *grpc.Server) { - for _, p := range c.grpcProviders { - p(server) - } -} - -// Shutdown iterates through every CloserProvider registered in the container, -// and calls them in the reversed order of registration. -func (c *Container) Shutdown() { - for i := len(c.closerProviders) - 1; i >= 0; i-- { - c.closerProviders[i]() - } -} - -// ApplyRunGroup iterates through every RunProvider registered in the container, -// and introduce the *run.Group to everyone. -func (c *Container) ApplyRunGroup(g *run.Group) { - for _, p := range c.runProviders { - p(g) - } + modules []interface{} } // Modules returns all modules in the container. This method is used to scan for @@ -107,40 +33,6 @@ func (c *Container) Modules() []interface{} { return c.modules } -// ApplyCron iterates through every CronProvider registered in the container, -// and introduce the *cron.Cron to everyone. -func (c *Container) ApplyCron(crontab *cron.Cron) { - for _, p := range c.cronProviders { - p(crontab) - } -} - -// ApplyRootCommand iterates through every CommandProvider registered in the container, -// and introduce the root *cobra.Command to everyone. -func (c *Container) ApplyRootCommand(command *cobra.Command) { - for _, p := range c.commandProviders { - p(command) - } -} - func (c *Container) AddModule(module interface{}) { - if p, ok := module.(HTTPProvider); ok { - c.httpProviders = append(c.httpProviders, p.ProvideHTTP) - } - if p, ok := module.(GRPCProvider); ok { - c.grpcProviders = append(c.grpcProviders, p.ProvideGRPC) - } - if p, ok := module.(CronProvider); ok { - c.cronProviders = append(c.cronProviders, p.ProvideCron) - } - if p, ok := module.(RunProvider); ok { - c.runProviders = append(c.runProviders, p.ProvideRunGroup) - } - if p, ok := module.(CommandProvider); ok { - c.commandProviders = append(c.commandProviders, p.ProvideCommand) - } - if p, ok := module.(CloserProvider); ok { - c.closerProviders = append(c.closerProviders, p.ProvideCloser) - } c.modules = append(c.modules, module) } diff --git a/container/container_test.go b/container/container_test.go index 51ec767d..7ea597b2 100644 --- a/container/container_test.go +++ b/container/container_test.go @@ -3,36 +3,9 @@ package container import ( "testing" - "github.com/gorilla/mux" - "github.com/oklog/run" - "github.com/robfig/cron/v3" - "github.com/spf13/cobra" "github.com/stretchr/testify/assert" - "google.golang.org/grpc" ) -type mock struct{} - -func (m mock) ProvideRunGroup(group *run.Group) { - panic("implement me") -} - -func (m mock) ProvideGRPC(server *grpc.Server) { - panic("implement me") -} - -func (m mock) ProvideHTTP(router *mux.Router) { - panic("implement me") -} - -func (m mock) ProvideCron(crontab *cron.Cron) { - panic("implement me") -} - -func (m mock) ProvideCommand(command *cobra.Command) { - panic("implement me") -} - func TestContainer_AddModule(t *testing.T) { cases := []struct { name string @@ -43,19 +16,7 @@ func TestContainer_AddModule(t *testing.T) { "any", "foo", func(t *testing.T, container Container) { - assert.Contains(t, container.modules, "foo") - }, - }, - { - "mock", - mock{}, - func(t *testing.T, container Container) { - assert.Len(t, container.runProviders, 1) - assert.Len(t, container.httpProviders, 1) - assert.Len(t, container.grpcProviders, 1) - assert.Len(t, container.cronProviders, 1) - assert.Len(t, container.commandProviders, 1) - assert.Len(t, container.closerProviders, 0) + assert.Contains(t, container.Modules(), "foo") }, }, } @@ -70,19 +31,3 @@ func TestContainer_AddModule(t *testing.T) { }) } } - -type closer func() - -func (f closer) ProvideCloser() { - f() -} - -func TestContainer_Shutdown(t *testing.T) { - seq := 0 - container := Container{} - container.AddModule(closer(func() { assert.Equal(t, 2, seq); seq = 1 })) - container.AddModule(closer(func() { assert.Equal(t, 3, seq); seq = 2 })) - container.AddModule(closer(func() { assert.Equal(t, 0, seq); seq = 3 })) - container.Shutdown() - assert.Equal(t, 1, seq) -} diff --git a/contract/container.go b/contract/container.go index 220b085a..7c893fc9 100644 --- a/contract/container.go +++ b/contract/container.go @@ -1,21 +1,6 @@ package contract -import ( - "github.com/gorilla/mux" - "github.com/oklog/run" - "github.com/robfig/cron/v3" - "github.com/spf13/cobra" - "google.golang.org/grpc" -) - // Container holds modules. type Container interface { - ApplyRouter(router *mux.Router) - ApplyGRPCServer(server *grpc.Server) - ApplyCron(crontab *cron.Cron) - ApplyRunGroup(g *run.Group) - ApplyRootCommand(command *cobra.Command) - Shutdown() Modules() []interface{} - AddModule(module interface{}) } diff --git a/cron/config.go b/cron/config.go new file mode 100644 index 00000000..690df18c --- /dev/null +++ b/cron/config.go @@ -0,0 +1,19 @@ +package cron + +import ( + "time" + + "github.com/robfig/cron/v3" +) + +// Config is the configuration for the cron package. +type Config struct { + // Parser is the parser to parse cron expressions. + Parser cron.ScheduleParser + // Location is the timezone to use in parsing cron expressions. + Location *time.Location + // GlobalOptions are the job options that are applied to all jobs. + GlobalOptions []JobOption + // EnableSeconds is whether to enable seconds in the cron expression. + EnableSeconds bool +} diff --git a/cron/cron.go b/cron/cron.go new file mode 100644 index 00000000..8d8249f1 --- /dev/null +++ b/cron/cron.go @@ -0,0 +1,230 @@ +// Package cron is a partial rewrite based on the github.com/robfig/cron/v3 +// package. The API in this package enforces context passing and error +// propagation, and consequently enables better logging, metrics and tracing +// support. +package cron + +import ( + "container/heap" + "context" + "fmt" + "sync" + "time" + + "github.com/robfig/cron/v3" +) + +// Cron schedules jobs to be run on the specified schedule. +type Cron struct { + parser cron.ScheduleParser + lock *sync.Cond + jobDescriptors jobDescriptors + globalMiddleware []JobOption + location *time.Location + nextID int + quitWaiter sync.WaitGroup +} + +// New returns a new Cron instance. +func New(config Config) *Cron { + c := &Cron{ + parser: config.Parser, + lock: sync.NewCond(&sync.Mutex{}), + jobDescriptors: jobDescriptors{}, + globalMiddleware: config.GlobalOptions, + location: config.Location, + nextID: 1, + } + if config.Parser == nil { + if config.EnableSeconds { + c.parser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + } else { + c.parser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + } + } + if config.Location == nil { + c.location = time.Local + } + return c +} + +// Add adds a new job to the cron scheduler.A list of middleware can be supplied. +// Note the error returned by the runner will be discarded. It is the user's +// responsibility to handle the error via middleware. +func (c *Cron) Add(spec string, runner func(ctx context.Context) error, middleware ...JobOption) (JobID, error) { + schedule, err := c.parser.Parse(spec) + if err != nil { + return 0, err + } + + jobDescriptor := JobDescriptor{ + RawSpec: spec, + Run: runner, + Schedule: schedule, + next: schedule.Next(c.now()), + } + + middleware = append(c.globalMiddleware, middleware...) + + for i := len(middleware) - 1; i >= 0; i-- { + middleware[i](&jobDescriptor) + } + + c.lock.L.Lock() + defer c.lock.L.Unlock() + defer c.lock.Broadcast() + + jobDescriptor.ID = JobID(c.nextID) + c.nextID++ + + if jobDescriptor.Name == "" { + jobDescriptor.Name = fmt.Sprintf("job-%d", jobDescriptor.ID) + } + + heap.Push(&c.jobDescriptors, &jobDescriptor) + return jobDescriptor.ID, nil +} + +// Remove removes a job from the cron scheduler. +func (c *Cron) Remove(id JobID) { + c.lock.L.Lock() + defer c.lock.L.Unlock() + + for i, descriptor := range c.jobDescriptors { + if descriptor.ID == id { + heap.Remove(&c.jobDescriptors, i) + } + } +} + +// Descriptors returns a list of all job descriptors. +func (c *Cron) Descriptors() []JobDescriptor { + var descriptors []JobDescriptor + + c.lock.L.Lock() + defer c.lock.L.Unlock() + + for _, descriptor := range c.jobDescriptors { + descriptors = append(descriptors, *descriptor) + } + return descriptors +} + +// Run starts the cron scheduler. It is a blocking call. +func (c *Cron) Run(ctx context.Context) error { + defer c.quitWaiter.Wait() + + c.lock.L.Lock() + now := c.now() + for _, descriptor := range c.jobDescriptors { + descriptor.next = descriptor.Schedule.Next(now) + } + heap.Init(&c.jobDescriptors) + c.lock.L.Unlock() + + var once sync.Once + for { + c.lock.L.Lock() + // Determine the next entry to run. + for c.jobDescriptors.Len() == 0 || c.jobDescriptors[0].next.IsZero() { + c.broadcastAtDeadlineOnce(ctx, &once) + select { + case <-ctx.Done(): + c.lock.L.Unlock() + return ctx.Err() + default: + c.lock.Wait() + } + } + gap := c.jobDescriptors[0].next.Sub(now) + c.lock.L.Unlock() + + timer := time.NewTimer(gap) + + select { + case now = <-timer.C: + c.lock.L.Lock() + for { + if c.jobDescriptors[0].next.After(now) || c.jobDescriptors[0].next.IsZero() { + break + } + descriptor := heap.Pop(&c.jobDescriptors).(*JobDescriptor) + + descriptor.prev = descriptor.next + descriptor.next = descriptor.Schedule.Next(now) + heap.Push(&c.jobDescriptors, descriptor) + + var innerCtx context.Context + innerCtx = context.WithValue(ctx, prevContextKey, descriptor.prev) + innerCtx = context.WithValue(innerCtx, nextContextKey, descriptor.next) + + c.quitWaiter.Add(1) + go func() { + defer c.quitWaiter.Done() + descriptor.Run(innerCtx) + }() + } + c.lock.L.Unlock() + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + } + } +} + +func (c *Cron) now() time.Time { + return time.Now().In(c.location) +} + +func (c *Cron) broadcastAtDeadlineOnce(ctx context.Context, once *sync.Once) { + once.Do(func() { + go func() { + <-ctx.Done() + c.lock.Broadcast() + }() + }) +} + +type jobDescriptors []*JobDescriptor + +func (j *jobDescriptors) Len() int { + return len(*j) +} + +func (j *jobDescriptors) Less(i, k int) bool { + return (*j)[i].next.Before((*j)[k].next) +} + +func (j *jobDescriptors) Swap(i, k int) { + (*j)[i], (*j)[k] = (*j)[k], (*j)[i] +} + +func (j *jobDescriptors) Push(x interface{}) { + *j = append(*j, x.(*JobDescriptor)) +} + +func (j *jobDescriptors) Pop() (v interface{}) { + *j, v = (*j)[:j.Len()-1], (*j)[j.Len()-1] + return v +} + +// JobID is the identifier of jobs. +type JobID int + +// JobDescriptor contains the information about jobs. +type JobDescriptor struct { + // ID is the identifier of job + ID JobID + // Name is an optional field typically added by WithName. It can be useful in logging and metrics. + Name string + // RawSpec contains the string format of cron schedule format. + RawSpec string + // Schedule is the parsed version of RawSpec. It can be overridden by WithSchedule. + Schedule cron.Schedule + // Run is the actual work to be done. + Run func(ctx context.Context) error + // next is the next time the job should run. + next time.Time + // prev is the last time the job ran. + prev time.Time +} diff --git a/cron/cron_test.go b/cron/cron_test.go new file mode 100644 index 00000000..7aef2ce8 --- /dev/null +++ b/cron/cron_test.go @@ -0,0 +1,94 @@ +package cron + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type fakeOnceScheduler struct { + returned int32 + runAfter time.Duration +} + +func (s *fakeOnceScheduler) Next(t time.Time) time.Time { + if old := atomic.SwapInt32(&s.returned, 1); old == 0 { + return t.Add(s.runAfter) + } + return time.Time{} +} + +func TestCron_heapsort(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Millisecond) + defer cancel() + + c := New(Config{EnableSeconds: true}) + var i int32 + + c.Add("3 * * * * *", func(ctx context.Context) error { + assert.Equal(t, int32(2), atomic.SwapInt32(&i, 3)) + return nil + }, WithSchedule(&fakeOnceScheduler{runAfter: 3 * time.Millisecond})) + c.Add("1 * * * * *", func(ctx context.Context) error { + assert.Equal(t, int32(0), atomic.SwapInt32(&i, 1)) + return nil + }, WithSchedule(&fakeOnceScheduler{runAfter: 1 * time.Millisecond})) + c.Add("2 * * * * *", func(ctx context.Context) error { + assert.Equal(t, int32(0), atomic.SwapInt32(&i, 2)) + return nil + }, WithSchedule(&fakeOnceScheduler{runAfter: 2 * time.Millisecond})) + c.Run(ctx) +} + +func TestCron_no_job(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + c := New(Config{}) + c.Run(ctx) +} + +func TestCron_late_job(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + c := New(Config{EnableSeconds: true}) + + go c.Run(ctx) + time.Sleep(time.Millisecond) + ch := make(chan struct{}) + c.Add("* * * * * *", func(ctx context.Context) error { + ch <- struct{}{} + return nil + }) + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatal("timeout") + } +} + +func TestCron_remove_job(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond) + defer cancel() + + c := New(Config{}) + go c.Run(ctx) + + ch := make(chan struct{}) + id, _ := c.Add("* * * * * *", func(ctx context.Context) error { + ch <- struct{}{} + return nil + }, WithSchedule(&fakeOnceScheduler{runAfter: time.Millisecond})) + c.Remove(id) + select { + case <-ch: + t.Fatal("should not be called") + case <-time.After(2 * time.Millisecond): + } +} diff --git a/cron/example_test.go b/cron/example_test.go new file mode 100644 index 00000000..257b4ad2 --- /dev/null +++ b/cron/example_test.go @@ -0,0 +1,45 @@ +package cron_test + +import ( + "context" + "fmt" + "time" + + "github.com/DoNewsCode/core" + "github.com/DoNewsCode/core/cron" + "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/observability" +) + +type CronModule struct { + metrics *cron.CronJobMetrics +} + +func NewCronModule(metrics *cron.CronJobMetrics) *CronModule { + return &CronModule{metrics: metrics} +} + +func (module *CronModule) ProvideCron(crontab *cron.Cron) { + crontab.Add("* * * * * *", func(ctx context.Context) error { + fmt.Println("I am a cron") + return nil + }, cron.WithMetrics(module.metrics), cron.WithName("foo")) +} + +func Example() { + c := core.Default(core.WithInline("log.level", "none")) + c.Provide(observability.Providers()) + c.Provide( + di.Deps{func() *cron.Cron { + return cron.New(cron.Config{EnableSeconds: true}) + }}, + ) + + c.AddModuleFunc(NewCronModule) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + c.Serve(ctx) + // Output: + // I am a cron +} diff --git a/cron/helper.go b/cron/helper.go new file mode 100644 index 00000000..9875351a --- /dev/null +++ b/cron/helper.go @@ -0,0 +1,33 @@ +package cron + +import ( + "context" + "time" +) + +var ( + prevContextKey = struct{}{} + nextContextKey = struct{}{} +) + +// GetCurrentSchedule returns the current schedule for the given context. +func GetCurrentSchedule(ctx context.Context) time.Time { + if ctx == nil { + return time.Time{} + } + if t, ok := ctx.Value(prevContextKey).(time.Time); ok { + return t + } + return time.Time{} +} + +// GetNextSchedule returns the next schedule for the given context. +func GetNextSchedule(ctx context.Context) time.Time { + if ctx == nil { + return time.Time{} + } + if t, ok := ctx.Value(nextContextKey).(time.Time); ok { + return t + } + return time.Time{} +} diff --git a/cron/helper_test.go b/cron/helper_test.go new file mode 100644 index 00000000..f7d7f2a6 --- /dev/null +++ b/cron/helper_test.go @@ -0,0 +1,15 @@ +package cron + +import ( + "context" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestGetCurrentSchedule(t *testing.T) { + assert.True(t, GetCurrentSchedule(context.Background()).IsZero()) +} + +func TestGetNextSchedule(t *testing.T) { + assert.True(t, GetNextSchedule(context.Background()).IsZero()) +} diff --git a/cronopts/metrics.go b/cron/metrics.go similarity index 64% rename from cronopts/metrics.go rename to cron/metrics.go index c395f190..0bf1831d 100644 --- a/cronopts/metrics.go +++ b/cron/metrics.go @@ -1,10 +1,9 @@ -package cronopts +package cron import ( "time" "github.com/go-kit/kit/metrics" - "github.com/robfig/cron/v3" ) // CronJobMetrics collects metrics for cron jobs. @@ -12,9 +11,10 @@ type CronJobMetrics struct { cronJobDurationSeconds metrics.Histogram cronJobFailCount metrics.Counter - // labels that has been set - module string - job string + // labels that have been set + module string + job string + schedule string } // NewCronJobMetrics constructs a new *CronJobMetrics, setting default labels to "unknown". @@ -24,6 +24,7 @@ func NewCronJobMetrics(histogram metrics.Histogram, counter metrics.Counter) *Cr cronJobFailCount: counter, module: "unknown", job: "unknown", + schedule: "unknown", } } @@ -34,6 +35,7 @@ func (c *CronJobMetrics) Module(module string) *CronJobMetrics { cronJobFailCount: c.cronJobFailCount, module: module, job: c.job, + schedule: c.schedule, } } @@ -44,29 +46,27 @@ func (c *CronJobMetrics) Job(job string) *CronJobMetrics { cronJobFailCount: c.cronJobFailCount, module: c.module, job: job, + schedule: c.schedule, + } +} + +// Schedule specifies the schedule label for CronJobMetrics. +func (c *CronJobMetrics) Schedule(schedule string) *CronJobMetrics { + return &CronJobMetrics{ + cronJobDurationSeconds: c.cronJobDurationSeconds, + cronJobFailCount: c.cronJobFailCount, + module: c.module, + job: c.job, + schedule: schedule, } } // Fail marks the job as failed. func (c *CronJobMetrics) Fail() { - c.cronJobFailCount.With("module", c.module, "job", c.job).Add(1) + c.cronJobFailCount.With("module", c.module, "job", c.job, "schedule", c.schedule).Add(1) } // Observe records the duration of the job. func (c *CronJobMetrics) Observe(duration time.Duration) { - c.cronJobDurationSeconds.With("module", c.module, "job", c.job).Observe(duration.Seconds()) -} - -// Measure wraps the given job and records the duration and success. -func (c *CronJobMetrics) Measure(job cron.Job) cron.Job { - return cron.FuncJob(func() { - start := time.Now() - defer c.cronJobDurationSeconds.With("module", c.module, "job", c.job).Observe(time.Since(start).Seconds()) - job.Run() - }) -} - -// Measure returns a job wrapper that wraps the given job and records the duration and success. -func Measure(c *CronJobMetrics) cron.JobWrapper { - return c.Measure + c.cronJobDurationSeconds.With("module", c.module, "job", c.job, "schedule", c.schedule).Observe(duration.Seconds()) } diff --git a/cron/options.go b/cron/options.go new file mode 100644 index 00000000..8f11be3a --- /dev/null +++ b/cron/options.go @@ -0,0 +1,156 @@ +package cron + +import ( + "context" + "errors" + "fmt" + "runtime/debug" + "time" + + "github.com/DoNewsCode/core/logging" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/robfig/cron/v3" +) + +// JobOption is a middleware for cron jobs. +type JobOption func(descriptors *JobDescriptor) + +// WithName sets the name of the job. +func WithName(name string) JobOption { + return func(descriptor *JobDescriptor) { + descriptor.Name = name + } +} + +// WithSchedule sets the cron schedule of the job. +func WithSchedule(schedule cron.Schedule) JobOption { + return func(descriptor *JobDescriptor) { + descriptor.Schedule = schedule + } +} + +// WithMetrics returns a new JobDescriptor that will report metrics. +func WithMetrics(metrics *CronJobMetrics) JobOption { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + start := time.Now() + m := metrics.Job(descriptor.Name).Schedule(descriptor.RawSpec) + defer m.Observe(time.Since(start)) + err := innerRun(ctx) + if err != nil { + m.Fail() + return err + } + return nil + } + } +} + +// WithLogging returns a new Universal job that will log. +func WithLogging(logger log.Logger) JobOption { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + due := GetCurrentSchedule(ctx) + delayed := time.Since(due) + l := logging.WithContext(logger, ctx) + if delayed > time.Second { + l = log.With(l, "delayed", delayed) + } + l = log.With(l, "job", descriptor.Name, "schedule", descriptor.RawSpec) + l.Log("msg", logging.Sprintf("job %s started", descriptor.Name)) + err := innerRun(ctx) + if err != nil { + l.Log("msg", logging.Sprintf("job %s finished with error: %s", descriptor.Name, err)) + return err + } + l.Log("msg", logging.Sprintf("job %s completed", descriptor.Name)) + return nil + } + } +} + +// WithTracing returns a new Universal job that will trace. +func WithTracing(tracer opentracing.Tracer) JobOption { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, fmt.Sprintf("Job: %s", descriptor.Name)) + defer span.Finish() + span.SetTag("schedule", descriptor.RawSpec) + err := innerRun(ctx) + if err != nil { + ext.LogError(span, err) + return err + } + return nil + } + } +} + +// SkipIfOverlap returns a new JobDescriptor that will skip the job if it overlaps with another job. +func SkipIfOverlap() JobOption { + ch := make(chan struct{}, 1) + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + select { + case ch <- struct{}{}: + defer func() { + <-ch + }() + return innerRun(ctx) + default: + return errors.New("skipped due to overlap") + } + } + } +} + +// DelayIfOverlap returns a new JobDescriptor that will delay the job if it overlaps with another job. +func DelayIfOverlap() JobOption { + ch := make(chan struct{}, 1) + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + ch <- struct{}{} + defer func() { + <-ch + }() + return innerRun(ctx) + } + } +} + +// TimeoutIfOverlap returns a new JobDescriptor that will cancel the job's context if the next schedule is due. +func TimeoutIfOverlap() JobOption { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + if !GetNextSchedule(ctx).IsZero() { + ctx, cancel := context.WithDeadline(ctx, GetNextSchedule(ctx)) + defer cancel() + return innerRun(ctx) + } + return innerRun(ctx) + } + } +} + +// Recover returns a new JobDescriptor that will recover from panics. +func Recover(logger log.Logger) JobOption { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + logging.WithContext(logger, ctx).Log("msg", "job panicked", "err", r, "stack", debug.Stack()) + } + }() + return innerRun(ctx) + } + } +} diff --git a/cron/options_test.go b/cron/options_test.go new file mode 100644 index 00000000..cf7fad39 --- /dev/null +++ b/cron/options_test.go @@ -0,0 +1,241 @@ +package cron + +import ( + "bytes" + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/DoNewsCode/core/internal/stub" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/robfig/cron/v3" +) + +type mockScheduler func(now time.Time) time.Time // mockScheduler is a function that returns the next time to run + +func (m mockScheduler) Next(t time.Time) time.Time { + return m(t) +} + +type mockParser struct{} + +func (m mockParser) Parse(spec string) (cron.Schedule, error) { + return mockScheduler(func(now time.Time) time.Time { + return now.Add(time.Millisecond) + }), nil +} + +func TestJobOption(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + logger := log.NewSyncLogger(log.NewLogfmtLogger(&buf)) + hist := stub.Histogram{} + count := stub.Counter{} + metric := NewCronJobMetrics(&hist, &count) + tracer := mocktracer.MockTracer{} + entryCount := 0 + concurrentCount := 0 + var concurrentAccess bool + + for _, ca := range []struct { + name string + stacks []JobOption + job func(context.Context) error + asserts func(t *testing.T) + }{ + { + "name and logging", + []JobOption{ + WithName("test"), + WithLogging(logger), + }, + func(ctx context.Context) error { + return nil + }, + func(t *testing.T) { + t.Log(buf.String()) + if buf.String() == "" { + t.Error("Expected logging output") + } + if strings.Contains(buf.String(), "test") == false { + t.Error("Expected test to be in the log output") + } + buf = bytes.Buffer{} + }, + }, + { + "error and logging", + []JobOption{ + WithLogging(logger), + }, + func(ctx context.Context) error { + return errors.New("test") + }, + func(t *testing.T) { + t.Log(buf.String()) + if buf.String() == "" { + t.Error("Expected logging output") + } + if strings.Contains(buf.String(), "error") == false { + t.Error("Expected error to be in the log output") + } + buf = bytes.Buffer{} + }, + }, + { + "metrics", + []JobOption{ + WithMetrics(metric), + }, + func(ctx context.Context) error { + return nil + }, + func(t *testing.T) { + if hist.ObservedValue == 0 { + t.Error("Expected histogram to be observed") + } + if count.CounterValue > 0 { + t.Error("Expected fail counter to be zero") + } + hist = stub.Histogram{} + count = stub.Counter{} + }, + }, + { + "error and metrics", + []JobOption{ + WithMetrics(metric), + }, + func(ctx context.Context) error { + return errors.New("test") + }, + func(t *testing.T) { + if hist.ObservedValue == 0 { + t.Error("Expected histogram to be observed") + } + if count.CounterValue < 1 { + t.Error("Expected fail counter to be one") + } + hist = stub.Histogram{} + count = stub.Counter{} + }, + }, + { + "tracing", + []JobOption{ + WithTracing(&tracer), + }, + func(ctx context.Context) error { + return nil + }, + func(t *testing.T) { + if len(tracer.FinishedSpans()) < 1 { + t.Error("Expected one span to be finished") + } + tracer = mocktracer.MockTracer{} + }, + }, + { + "error tracing", + []JobOption{ + WithTracing(&tracer), + }, + func(ctx context.Context) error { + return errors.New("test") + }, + func(t *testing.T) { + if len(tracer.FinishedSpans()) < 1 { + t.Error("Expected one span to be finished") + } + if tracer.FinishedSpans()[0].Tags()["error"] != true { + t.Error("Expected error tag to be true") + t.Log(tracer.FinishedSpans()[0].Tags()) + } + tracer = mocktracer.MockTracer{} + }, + }, + { + "panic", + []JobOption{ + Recover(logger), + }, + func(ctx context.Context) error { + panic("to be recovered") + }, + func(t *testing.T) { + if strings.Contains(buf.String(), "to be recovered") == false { + t.Error("Expected panic to be in the log output") + } + buf = bytes.Buffer{} + }, + }, + { + "skip if overlap", + []JobOption{ + SkipIfOverlap(), + }, + func(ctx context.Context) error { + entryCount++ + time.Sleep(5 * time.Millisecond) + return nil + }, + func(t *testing.T) { + if entryCount > 1 { + t.Errorf("expect entry once, got %d", entryCount) + } + entryCount = 0 + }, + }, + { + "delay if overlap", + []JobOption{ + DelayIfOverlap(), + }, + func(ctx context.Context) error { + entryCount++ + concurrentCount++ + if concurrentCount > 1 { + concurrentAccess = true + } + time.Sleep(3 * time.Millisecond) + concurrentCount-- + return nil + }, + func(t *testing.T) { + if entryCount < 3 { + t.Errorf("expect entry at least 3 times, got %d", entryCount) + } + if concurrentAccess { + t.Errorf("conncurrent access not allowed") + } + entryCount = 0 + concurrentCount = 0 + }, + }, + { + "timeout if overlap", + []JobOption{ + TimeoutIfOverlap(), + }, + func(ctx context.Context) error { + <-ctx.Done() + return nil + }, + func(t *testing.T) { + }, + }, + } { + ca := ca + t.Run(ca.name, func(t *testing.T) { + c := New(Config{Parser: mockParser{}}) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) + defer cancel() + c.Add("@every 1ms", ca.job, ca.stacks...) + c.Run(ctx) + ca.asserts(t) + }) + } +} diff --git a/cronopts/example_metrics_test.go b/cronopts/example_metrics_test.go deleted file mode 100644 index 72c73003..00000000 --- a/cronopts/example_metrics_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package cronopts_test - -import ( - "context" - "fmt" - "github.com/DoNewsCode/core" - "github.com/DoNewsCode/core/cronopts" - "github.com/DoNewsCode/core/observability" - "github.com/robfig/cron/v3" - "math/rand" - "time" -) - -type CronModule struct { - metrics *cronopts.CronJobMetrics -} - -func NewCronModule(metrics *cronopts.CronJobMetrics) CronModule { - return CronModule{metrics: metrics.Module("test_module")} -} - -func (c CronModule) ProvideCron(crontab *cron.Cron) { - // Create a new cron job, and measure its execution durations. - crontab.AddJob("* * * * *", c.metrics.Job("test_job").Measure(cron.FuncJob(func() { - fmt.Println("running") - // For 50% chance, the job may fail. Report it to metrics collector. - if rand.Float64() > 0.5 { - c.metrics.Fail() - } - }))) -} - -func Example_cronJobMetrics() { - c := core.Default() - c.Provide(observability.Providers()) - c.AddModuleFunc(NewCronModule) - - ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond) - defer cancel() - - c.Serve(ctx) -} diff --git a/cronopts/jobs/example_test.go b/cronopts/jobs/example_test.go deleted file mode 100644 index ce1b9dd6..00000000 --- a/cronopts/jobs/example_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package jobs_test - -import ( - "context" - "time" - - "github.com/DoNewsCode/core" - "github.com/DoNewsCode/core/cronopts" - "github.com/DoNewsCode/core/cronopts/jobs" - "github.com/DoNewsCode/core/observability" - "github.com/go-kit/log" - "github.com/opentracing/opentracing-go" - "github.com/robfig/cron/v3" -) - -type CronJobModule struct { - mJob jobs.Universal -} - -func (c CronJobModule) ProvideCron(crontab *cron.Cron) { - crontab.AddJob("@every 1s", c.mJob) -} - -// NewCronJobModule creates a new module that provides a cron job with metrics, logging and tracing. -func NewCronJobModule(tracer opentracing.Tracer, metrics *cronopts.CronJobMetrics, logger log.Logger) CronJobModule { - return CronJobModule{ - mJob: jobs.New("cronjob", func(ctx context.Context) error { - return nil - }, jobs.WithMetrics(metrics), jobs.WithTracing(tracer), jobs.WithLogs(logger)), - } -} - -func Example() { - ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) - defer cancel() - - c := core.Default(core.WithInline("log.level", "none")) - c.Provide(observability.Providers()) - c.AddModuleFunc(NewCronJobModule) - c.Serve(ctx) - - // Output: -} diff --git a/cronopts/jobs/universal.go b/cronopts/jobs/universal.go deleted file mode 100644 index 971a4634..00000000 --- a/cronopts/jobs/universal.go +++ /dev/null @@ -1,110 +0,0 @@ -// Package jobs contains a universal job type that implements cron.Job interface. -// It is designed to be used with cron.New() and cron.AddJob() methods. Compared -// to anonymous jobs, this job type supports go idioms like context and error, -// and offers hook point for common observability concerns such as metrics, -// logging and tracing. -package jobs - -import ( - "context" - "fmt" - "time" - - "github.com/DoNewsCode/core/cronopts" - "github.com/DoNewsCode/core/dag" - "github.com/DoNewsCode/core/logging" - "github.com/go-kit/log" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" -) - -// Universal is a generic job that can be used to run any task. It implements the -// cron.Job interface and supports context parameter and error propagation. The -// Name parameter allows common observability concerns such as logging, tracing -// and metrics to take advantage. -type Universal struct { - Name string - Do func(ctx context.Context) error -} - -// New returns a new Universal job. -func New(name string, do func(ctx context.Context) error, wrapper ...func(universal Universal) Universal) Universal { - base := Universal{ - Name: name, - Do: do, - } - for _, w := range wrapper { - base = w(base) - } - return base -} - -// NewFromDAG returns a new Universal job from a DAG. -func NewFromDAG(name string, dag *dag.DAG, wrapper ...func(universal Universal) Universal) Universal { - return New(name, dag.Run, wrapper...) -} - -// Run implements the cron.Job interface. -func (s Universal) Run() { - _ = s.Do(context.Background()) -} - -// WithMetrics returns a new Universal job that will report metrics. -func WithMetrics(metrics *cronopts.CronJobMetrics) func(universal Universal) Universal { - return func(universal Universal) Universal { - return Universal{ - Name: universal.Name, - Do: func(ctx context.Context) error { - start := time.Now() - metrics = metrics.Job(universal.Name) - defer metrics.Observe(time.Since(start)) - err := universal.Do(ctx) - if err != nil { - metrics.Fail() - return err - } - return nil - }, - } - } -} - -// WithLogs returns a new Universal job that will log. -func WithLogs(logger log.Logger) func(universal Universal) Universal { - return func(universal Universal) Universal { - return Universal{ - Name: universal.Name, - Do: func(ctx context.Context) error { - logger = logging.WithContext(logger, ctx) - logger = log.With(logger, "job", universal.Name) - logger.Log("msg", logging.Sprintf("job %s started", universal.Name)) - err := universal.Do(ctx) - if err != nil { - logger.Log("msg", logging.Sprintf("job %s finished with error %s", universal.Name, err)) - return err - } - logger.Log("msg", logging.Sprintf("job %s completed", universal.Name)) - return nil - }, - } - } -} - -// WithTracing returns a new Universal job that will trace. -func WithTracing(tracer opentracing.Tracer) func(universal Universal) Universal { - return func(universal Universal) Universal { - return Universal{ - Name: universal.Name, - Do: func(ctx context.Context) error { - span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, fmt.Sprintf("Job: %s", universal.Name)) - defer span.Finish() - err := universal.Do(ctx) - if err != nil { - ext.Error.Set(span, true) - return err - } - return nil - }, - } - } -} diff --git a/cronopts/jobs/universal_test.go b/cronopts/jobs/universal_test.go deleted file mode 100644 index b3f5d3c4..00000000 --- a/cronopts/jobs/universal_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package jobs - -import ( - "context" - "errors" - "testing" - - "github.com/DoNewsCode/core/cronopts" - "github.com/DoNewsCode/core/dag" - "github.com/DoNewsCode/core/internal/stub" - "github.com/go-kit/log" - "github.com/opentracing/opentracing-go" - "github.com/stretchr/testify/assert" -) - -func TestUniversal_error_propagation(t *testing.T) { - d := dag.New() - d.AddVertex(func(ctx context.Context) error { return errors.New("oops") }) - j := NewFromDAG( - "should return error", - d, - WithMetrics(cronopts.NewCronJobMetrics(&stub.Histogram{}, &stub.Counter{})), - WithTracing(opentracing.NoopTracer{}), - WithLogs(log.NewNopLogger()), - ) - assert.Error(t, j.Do(context.Background())) -} diff --git a/cronopts/metrics_test.go b/cronopts/metrics_test.go deleted file mode 100644 index 188d2400..00000000 --- a/cronopts/metrics_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package cronopts - -import ( - "testing" - "time" - - "github.com/DoNewsCode/core/internal/stub" - "github.com/robfig/cron/v3" - "github.com/stretchr/testify/assert" -) - -func TestMeasure(t *testing.T) { - histogram := &stub.Histogram{} - counter := &stub.Counter{} - metrics := NewCronJobMetrics(histogram, counter) - metrics = metrics.Module("x").Job("y") - Measure(metrics)(cron.FuncJob(func() { - time.Sleep(time.Millisecond) - })).Run() - assert.ElementsMatch(t, histogram.LabelValues, []string{"module", "x", "job", "y"}) - assert.True(t, histogram.ObservedValue > 0) - metrics.Fail() - assert.ElementsMatch(t, counter.LabelValues, []string{"module", "x", "job", "y"}) - assert.True(t, counter.CounterValue == 1) -} diff --git a/deprecated_cronopts/deprecation_test.go b/deprecated_cronopts/deprecation_test.go new file mode 100644 index 00000000..9bdd1a80 --- /dev/null +++ b/deprecated_cronopts/deprecation_test.go @@ -0,0 +1,38 @@ +package cronopts_test + +import ( + "context" + "fmt" + "github.com/DoNewsCode/core" + "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/observability" + deprecatedcron "github.com/robfig/cron/v3" + "testing" + "time" +) + +type CronModule struct{} + +func (module *CronModule) ProvideCron(crontab *deprecatedcron.Cron) { + crontab.AddFunc("* * * * * *", func() { + fmt.Println("Cron job ran") + }) +} + +func Test_deprecation(t *testing.T) { + c := core.Default(core.WithInline("log.level", "none")) + c.Provide(observability.Providers()) + c.Provide( + di.Deps{func() *deprecatedcron.Cron { + return deprecatedcron.New(deprecatedcron.WithSeconds()) + }}, + ) + + c.AddModule(CronModule{}) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + c.Serve(ctx) + // Output: + // Cron job ran +} diff --git a/cronopts/log.go b/deprecated_cronopts/log.go similarity index 85% rename from cronopts/log.go rename to deprecated_cronopts/log.go index e0c7ffe9..4e2683e0 100644 --- a/cronopts/log.go +++ b/deprecated_cronopts/log.go @@ -1,4 +1,4 @@ -// Package cronopts contains the options for cron. +// Package cronopts contains the options for cron. This package is deprecated. Use package cron instead. package cronopts import ( diff --git a/cronopts/log_test.go b/deprecated_cronopts/log_test.go similarity index 100% rename from cronopts/log_test.go rename to deprecated_cronopts/log_test.go diff --git a/go.mod b/go.mod index 6b7bc40a..93d5a14f 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,8 @@ require ( gorm.io/gorm v1.21.10 ) +require github.com/felixge/httpsnoop v1.0.1 + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -58,7 +60,6 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/felixge/httpsnoop v1.0.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-sql-driver/mysql v1.5.0 // indirect github.com/go-stack/stack v1.8.0 // indirect diff --git a/internal/stub/metrics.go b/internal/stub/metrics.go index 9e4ba209..5729bb4c 100644 --- a/internal/stub/metrics.go +++ b/internal/stub/metrics.go @@ -1,6 +1,10 @@ package stub -import "github.com/go-kit/kit/metrics" +import ( + "sync" + + "github.com/go-kit/kit/metrics" +) // LabelValues contains the set of labels and their corresponding values. type LabelValues []string @@ -17,56 +21,73 @@ func (l LabelValues) Label(name string) string { // Histogram is a stub implementation of the go-kit metrics.Histogram interface. type Histogram struct { + sync.Mutex LabelValues LabelValues ObservedValue float64 } // With returns a new Histogram with the given label values. func (h *Histogram) With(labelValues ...string) metrics.Histogram { + h.Mutex.Lock() + defer h.Mutex.Unlock() h.LabelValues = labelValues return h } // Observe records the given value. func (h *Histogram) Observe(value float64) { + h.Mutex.Lock() + defer h.Mutex.Unlock() h.ObservedValue = value } // Gauge is a stub implementation of the go-kit metrics.Gauge interface. type Gauge struct { + sync.Mutex LabelValues []string GaugeValue float64 } // With returns a new Gauge with the given label values. func (g *Gauge) With(labelValues ...string) metrics.Gauge { + g.Mutex.Lock() + defer g.Mutex.Unlock() g.LabelValues = labelValues return g } // Set sets the gauge value. func (g *Gauge) Set(value float64) { + g.Mutex.Lock() + defer g.Mutex.Unlock() g.GaugeValue = value } // Add adds the given value to the gauge. func (g *Gauge) Add(delta float64) { + g.Mutex.Lock() + defer g.Mutex.Unlock() g.GaugeValue = g.GaugeValue + delta } // Counter is a stub implementation of the go-kit metrics.Counter interface. type Counter struct { + sync.Mutex LabelValues []string CounterValue float64 } // With returns a new Counter with the given label values. func (c *Counter) With(labelValues ...string) metrics.Counter { + c.Mutex.Lock() + defer c.Mutex.Unlock() c.LabelValues = labelValues return c } // Add adds the given value to the counter. func (c *Counter) Add(delta float64) { + c.Mutex.Lock() + defer c.Mutex.Unlock() c.CounterValue = c.CounterValue + delta } diff --git a/module_contract.go b/module_contract.go new file mode 100644 index 00000000..357e9bb1 --- /dev/null +++ b/module_contract.go @@ -0,0 +1,47 @@ +package core + +import ( + cron2 "github.com/DoNewsCode/core/cron" + "github.com/gorilla/mux" + "github.com/oklog/run" + "github.com/robfig/cron/v3" + "github.com/spf13/cobra" + "google.golang.org/grpc" +) + +// DeprecatedCronProvider provides cron jobs. +// Deprecated: CronProvider is deprecated. Use CronProvider instead +type DeprecatedCronProvider interface { + ProvideCron(crontab *cron.Cron) +} + +// CronProvider provides cron jobs. +type CronProvider interface { + ProvideCron(cron *cron2.Cron) +} + +// CommandProvider provides cobra.Command. +type CommandProvider interface { + ProvideCommand(command *cobra.Command) +} + +// HTTPProvider provides http services. +type HTTPProvider interface { + ProvideHTTP(router *mux.Router) +} + +// GRPCProvider provides gRPC services. +type GRPCProvider interface { + ProvideGRPC(server *grpc.Server) +} + +// CloserProvider provides a shutdown function that will be called when service exits. +type CloserProvider interface { + ProvideCloser() +} + +// RunProvider provides a runnable actor. Use it to register any server-like +// actions. For example, kafka consumer can be started here. +type RunProvider interface { + ProvideRunGroup(group *run.Group) +} diff --git a/observability/metrics.go b/observability/metrics.go index 8577fb7e..6649b483 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -1,7 +1,7 @@ package observability import ( - "github.com/DoNewsCode/core/cronopts" + "github.com/DoNewsCode/core/cron" "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/otgorm" "github.com/DoNewsCode/core/otkafka" @@ -54,23 +54,23 @@ func ProvideGRPCRequestDurationSeconds(in MetricsIn) *srvgrpc.RequestDurationSec return srvgrpc.NewRequestDurationSeconds(prometheus.NewHistogram(grpc)) } -// ProvideCronJobMetrics returns a *cronopts.CronJobMetrics that is designed to +// ProvideCronJobMetrics returns a *cron.CronJobMetrics that is designed to // measure cron job metrics. The returned metrics can be used like this: -// metrics := cronopts.NewCronJobMetrics(...) +// metrics := cron.NewCronJobMetrics(...) // job := cron.NewChain( // cron.Recover(logger), -// cronopts.Measure(metrics), +// cron.Measure(metrics), // ).Then(job) -func ProvideCronJobMetrics(in MetricsIn) *cronopts.CronJobMetrics { +func ProvideCronJobMetrics(in MetricsIn) *cron.CronJobMetrics { histogram := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ Name: "cronjob_duration_seconds", Help: "Total time spent running cron jobs.", - }, []string{"module", "job"}) + }, []string{"module", "job", "schedule"}) counter := stdprometheus.NewCounterVec(stdprometheus.CounterOpts{ Name: "cronjob_failures_total", Help: "Total number of cron jobs that failed.", - }, []string{"module", "job"}) + }, []string{"module", "job", "schedule"}) if in.Registerer == nil { in.Registerer = stdprometheus.DefaultRegisterer @@ -79,7 +79,7 @@ func ProvideCronJobMetrics(in MetricsIn) *cronopts.CronJobMetrics { in.Registerer.MustRegister(histogram) in.Registerer.MustRegister(counter) - return cronopts.NewCronJobMetrics(prometheus.NewHistogram(histogram), prometheus.NewCounter(counter)) + return cron.NewCronJobMetrics(prometheus.NewHistogram(histogram), prometheus.NewCounter(counter)) } // ProvideGORMMetrics returns a *otgorm.Gauges that measures the connection info diff --git a/observability/observability_test.go b/observability/observability_test.go index acccfd8d..bff7aa93 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -1,13 +1,13 @@ package observability import ( - "github.com/DoNewsCode/core/cronopts" "os" "strings" "testing" "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/config" + "github.com/DoNewsCode/core/cron" "github.com/DoNewsCode/core/otgorm" "github.com/DoNewsCode/core/otkafka" "github.com/DoNewsCode/core/otredis" @@ -87,7 +87,7 @@ func TestProvideCronjobMetrics(t *testing.T) { c := core.New() c.ProvideEssentials() c.Provide(Providers()) - c.Invoke(func(metrics *cronopts.CronJobMetrics) { + c.Invoke(func(metrics *cron.CronJobMetrics) { metrics.Fail() }) } diff --git a/otkafka/integration_test.go b/otkafka/integration_test.go index 98256cad..8e5a1e51 100644 --- a/otkafka/integration_test.go +++ b/otkafka/integration_test.go @@ -333,7 +333,11 @@ func TestModule_hotReload(t *testing.T) { c.AddModuleFunc(config.New) var group run.Group - c.ApplyRunGroup(&group) + for _, m := range c.Modules() { + if p, ok := m.(core.RunProvider); ok { + p.ProvideRunGroup(&group) + } + } group.Add(func() error { <-ctx.Done() return ctx.Err() diff --git a/serve.go b/serve.go index c737b497..570c0063 100644 --- a/serve.go +++ b/serve.go @@ -9,9 +9,9 @@ import ( "os/signal" "syscall" - "github.com/DoNewsCode/core/container" "github.com/DoNewsCode/core/contract" - "github.com/DoNewsCode/core/cronopts" + cron "github.com/DoNewsCode/core/cron" + "github.com/DoNewsCode/core/deprecated_cronopts" "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/logging" "github.com/go-kit/log" @@ -19,7 +19,7 @@ import ( "github.com/gorilla/mux" "github.com/oklog/run" "github.com/pkg/errors" - "github.com/robfig/cron/v3" + deprecatedcron "github.com/robfig/cron/v3" "github.com/spf13/cobra" "google.golang.org/grpc" ) @@ -27,13 +27,14 @@ import ( type serveIn struct { di.In - Dispatcher contract.Dispatcher - Config contract.ConfigAccessor - Logger log.Logger - Container contract.Container - HTTPServer *http.Server `optional:"true"` - GRPCServer *grpc.Server `optional:"true"` - Cron *cron.Cron `optional:"true"` + Dispatcher contract.Dispatcher + Config contract.ConfigAccessor + Logger log.Logger + Container contract.Container + HTTPServer *http.Server `optional:"true"` + GRPCServer *grpc.Server `optional:"true"` + DeprecatedCron *deprecatedcron.Cron `optional:"true"` + Cron *cron.Cron `optional:"true"` } func NewServeModule(in serveIn) serveModule { @@ -42,7 +43,7 @@ func NewServeModule(in serveIn) serveModule { } } -var _ container.CommandProvider = (*serveModule)(nil) +var _ CommandProvider = (*serveModule)(nil) type serveModule struct { in serveIn @@ -63,7 +64,7 @@ func (s serveIn) httpServe(ctx context.Context, logger logging.LevelLogger) (fun s.HTTPServer = &http.Server{} } router := mux.NewRouter() - s.Container.ApplyRouter(router) + applyRouter(s.Container, router) router.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { tpl, _ := route.GetPathTemplate() @@ -104,7 +105,7 @@ func (s serveIn) grpcServe(ctx context.Context, logger logging.LevelLogger) (fun if s.GRPCServer == nil { s.GRPCServer = grpc.NewServer() } - s.Container.ApplyGRPCServer(s.GRPCServer) + applyGRPCServer(s.Container, s.GRPCServer) for module, info := range s.GRPCServer.GetServiceInfo() { for _, method := range info.Methods { @@ -141,18 +142,43 @@ func (s serveIn) cronServe(ctx context.Context, logger logging.LevelLogger) (fun return nil, nil, nil } if s.Cron == nil { + s.Cron = cron.New(cron.Config{GlobalOptions: []cron.JobOption{cron.WithLogging(log.With(s.Logger, "tag", "cron"))}}) + } + applyCron(s.Container, s.Cron) + if len(s.Cron.Descriptors()) > 0 { + ctx, cancel := context.WithCancel(ctx) + return func() error { + logger.Infof("cron runner started") + return s.Cron.Run(ctx) + }, func(err error) { + cancel() + }, nil + } + + return nil, nil, nil +} + +func (s serveIn) cronServeDeprecated(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) { + if s.Config.Bool("cron.disable") { + return nil, nil, nil + } + if s.DeprecatedCron == nil { logger := log.With(s.Logger, "tag", "cron") - s.Cron = cron.New(cron.WithLogger(cronopts.CronLogAdapter{Logging: logger})) + s.DeprecatedCron = deprecatedcron.New(deprecatedcron.WithLogger(cronopts.CronLogAdapter{Logging: logger})) + } + applyDeprecatedCron(s.Container, s.DeprecatedCron) + if len(s.DeprecatedCron.Entries()) > 0 { + return func() error { + logger.Infof("cron runner started") + logger.Warn("Directly using github.com/robfig/cron/v3 is deprecated. Please migrate to github.com/DoNewsCode/core/cron") + s.DeprecatedCron.Run() + return nil + }, func(err error) { + <-s.DeprecatedCron.Stop().Done() + }, nil } - s.Container.ApplyCron(s.Cron) - return func() error { - logger.Infof("cron runner started") - s.Cron.Run() - return nil - }, func(err error) { - <-s.Cron.Stop().Done() - }, nil + return nil, nil, nil } func (s serveIn) signalWatch(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) { @@ -193,6 +219,7 @@ func newServeCmd(s serveIn) *cobra.Command { s.httpServe, s.grpcServe, s.cronServe, + s.cronServeDeprecated, s.signalWatch, } @@ -208,7 +235,7 @@ func newServeCmd(s serveIn) *cobra.Command { } // Additional run groups - s.Container.ApplyRunGroup(&g) + applyRunGroup(s.Container, &g) if err := g.Run(); err != nil { return err @@ -220,3 +247,48 @@ func newServeCmd(s serveIn) *cobra.Command { } return serveCmd } + +func applyRouter(ctn contract.Container, router *mux.Router) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(HTTPProvider); ok { + p.ProvideHTTP(router) + } + } +} + +func applyGRPCServer(ctn contract.Container, server *grpc.Server) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(GRPCProvider); ok { + p.ProvideGRPC(server) + } + } +} + +func applyRunGroup(ctn contract.Container, group *run.Group) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(RunProvider); ok { + p.ProvideRunGroup(group) + } + } +} + +func applyCron(ctn contract.Container, cron *cron.Cron) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(CronProvider); ok { + p.ProvideCron(cron) + } + } +} + +func applyDeprecatedCron(ctn contract.Container, crontab *deprecatedcron.Cron) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(DeprecatedCronProvider); ok { + p.ProvideCron(crontab) + } + } +} diff --git a/serve_test.go b/serve_test.go index 87b0e26d..17ee9952 100644 --- a/serve_test.go +++ b/serve_test.go @@ -6,12 +6,17 @@ import ( "errors" "os" "runtime" + "sync/atomic" "testing" "time" + "github.com/DoNewsCode/core/cron" + "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/logging" + "github.com/DoNewsCode/core/observability" "github.com/go-kit/log" "github.com/oklog/run" + deprecatedcron "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" ) @@ -56,3 +61,47 @@ func TestServeIn_signalWatch(t *testing.T) { assert.Contains(t, buf.String(), "context canceled") }) } + +type OldCronModule struct { + CanRun uint32 +} + +func (module *OldCronModule) ProvideCron(crontab *deprecatedcron.Cron) { + crontab.AddFunc("* * * * * *", func() { + atomic.StoreUint32(&module.CanRun, 1) + }) +} + +type NewCronModule struct { + CanRun uint32 +} + +func (module *NewCronModule) ProvideCron(crontab *cron.Cron) { + crontab.Add("* * * * * *", func(ctx context.Context) error { + atomic.StoreUint32(&module.CanRun, 1) + return nil + }) +} + +func TestServeIn_cron_deprecation(t *testing.T) { + c := Default(WithInline("grpc.disable", true), WithInline("http.disable", true)) + c.Provide(observability.Providers()) + c.Provide( + di.Deps{func() *deprecatedcron.Cron { + return deprecatedcron.New(deprecatedcron.WithSeconds()) + }, func() *cron.Cron { + return cron.New(cron.Config{EnableSeconds: true}) + }}, + ) + + mOld := OldCronModule{} + mNew := NewCronModule{} + c.AddModule(&mOld) + c.AddModule(&mNew) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + c.Serve(ctx) + assert.True(t, mOld.CanRun == 1) + assert.True(t, mNew.CanRun == 1) +} diff --git a/srvhttp/example_test.go b/srvhttp/example_test.go index cb7f0995..999c5bbd 100644 --- a/srvhttp/example_test.go +++ b/srvhttp/example_test.go @@ -23,7 +23,12 @@ func Example_modules() { c.AddModule(srvhttp.DebugModule{}) router := mux.NewRouter() - c.ApplyRouter(router) + modules := c.Modules() + for i := range modules { + if m, ok := modules[i].(core.HTTPProvider); ok { + m.ProvideHTTP(router) + } + } http.ListenAndServe(":8080", router) }