From ea6fc2bf3c2cf9d08c77f68f998596e683acb3e3 Mon Sep 17 00:00:00 2001 From: Reasno Date: Fri, 14 Jan 2022 10:45:20 +0800 Subject: [PATCH 01/19] feat(observability): capture transport status BREAKING CHANGE: most Observe() methods now take a time.Duration instead of float64. --- cronopts/metrics.go | 4 +-- internal/stub/metrics.go | 13 ++++++++- observability/metrics.go | 4 +-- srvgrpc/metrics.go | 38 +++++++++++++++++++++++--- srvgrpc/metrics_test.go | 20 +++++++++++--- srvhttp/metrics.go | 59 ++++++++++++++++++++++++++++------------ srvhttp/metrics_test.go | 9 +++--- 7 files changed, 113 insertions(+), 34 deletions(-) diff --git a/cronopts/metrics.go b/cronopts/metrics.go index 887683b1..c395f190 100644 --- a/cronopts/metrics.go +++ b/cronopts/metrics.go @@ -53,8 +53,8 @@ func (c *CronJobMetrics) Fail() { } // Observe records the duration of the job. -func (c *CronJobMetrics) Observe(value float64) { - c.cronJobDurationSeconds.With("module", c.module, "job", c.job).Observe(value) +func (c *CronJobMetrics) Observe(duration time.Duration) { + c.cronJobDurationSeconds.With("module", c.module, "job", c.job).Observe(duration.Seconds()) } // Measure wraps the given job and records the duration and success. diff --git a/internal/stub/metrics.go b/internal/stub/metrics.go index c3536e91..e4e3725b 100644 --- a/internal/stub/metrics.go +++ b/internal/stub/metrics.go @@ -2,9 +2,20 @@ package stub import "github.com/go-kit/kit/metrics" +type LabelValues []string + +func (l LabelValues) Label(name string) string { + for i := 0; i < len(l); i += 2 { + if l[i] == name { + return l[i+1] + } + } + return "" +} + // Histogram is a stub implementation of the go-kit metrics.Histogram interface. type Histogram struct { - LabelValues []string + LabelValues LabelValues ObservedValue float64 } diff --git a/observability/metrics.go b/observability/metrics.go index 325d9575..8577fb7e 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -27,7 +27,7 @@ func ProvideHTTPRequestDurationSeconds(in MetricsIn) *srvhttp.RequestDurationSec http := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ Name: "http_request_duration_seconds", Help: "Total time spent serving requests.", - }, []string{"module", "service", "route"}) + }, []string{"module", "service", "route", "status"}) if in.Registerer == nil { in.Registerer = stdprometheus.DefaultRegisterer @@ -44,7 +44,7 @@ func ProvideGRPCRequestDurationSeconds(in MetricsIn) *srvgrpc.RequestDurationSec grpc := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ Name: "grpc_request_duration_seconds", Help: "Total time spent serving requests.", - }, []string{"module", "service", "route"}) + }, []string{"module", "service", "route", "status"}) if in.Registerer == nil { in.Registerer = stdprometheus.DefaultRegisterer diff --git a/srvgrpc/metrics.go b/srvgrpc/metrics.go index f721e9c9..67e15033 100644 --- a/srvgrpc/metrics.go +++ b/srvgrpc/metrics.go @@ -2,11 +2,13 @@ package srvgrpc import ( "context" + "strconv" "time" "github.com/go-kit/kit/metrics" "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" + "google.golang.org/grpc/status" ) // MetricsModule exposes prometheus metrics. Here only provides a simple call, @@ -30,9 +32,13 @@ func Metrics(metrics *RequestDurationSeconds) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { start := time.Now() defer func() { - metrics.Route(info.FullMethod).Observe(time.Since(start).Seconds()) + if s, ok := status.FromError(err); ok { + metrics = metrics.Status(int(s.Code())) + } + metrics.Route(info.FullMethod).Observe(time.Since(start)) }() - return handler(ctx, req) + resp, err = handler(ctx, req) + return resp, err } } @@ -48,6 +54,7 @@ type RequestDurationSeconds struct { module string service string route string + status int } // NewRequestDurationSeconds returns a new RequestDurationSeconds instance. @@ -57,6 +64,7 @@ func NewRequestDurationSeconds(histogram metrics.Histogram) *RequestDurationSeco module: "unknown", service: "unknown", route: "unknown", + status: 0, } } @@ -77,6 +85,7 @@ func (r *RequestDurationSeconds) Service(service string) *RequestDurationSeconds module: r.module, service: service, route: r.route, + status: r.status, } } @@ -87,10 +96,31 @@ func (r *RequestDurationSeconds) Route(route string) *RequestDurationSeconds { module: r.module, service: r.service, route: route, + status: r.status, + } +} + +// Status specifies the status label for RequestDurationSeconds. +func (r *RequestDurationSeconds) Status(status int) *RequestDurationSeconds { + return &RequestDurationSeconds{ + histogram: r.histogram, + module: r.module, + service: r.service, + route: r.route, + status: status, } } // Observe records the time taken to process the request. -func (r RequestDurationSeconds) Observe(seconds float64) { - r.histogram.With("module", r.module, "service", r.service, "route", r.route).Observe(seconds) +func (r RequestDurationSeconds) Observe(duration time.Duration) { + r.histogram.With( + "module", + r.module, + "service", + r.service, + "route", + r.route, + "status", + strconv.Itoa(r.status), + ).Observe(duration.Seconds()) } diff --git a/srvgrpc/metrics_test.go b/srvgrpc/metrics_test.go index 974a1a79..aafc6167 100644 --- a/srvgrpc/metrics_test.go +++ b/srvgrpc/metrics_test.go @@ -2,10 +2,12 @@ package srvgrpc import ( "context" - "github.com/DoNewsCode/core/internal/stub" "testing" "time" + "github.com/DoNewsCode/core/internal/stub" + "google.golang.org/grpc/status" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) @@ -13,11 +15,11 @@ import ( func TestRequestDurationSeconds(t *testing.T) { histogram := &stub.Histogram{} rds := NewRequestDurationSeconds(histogram) - rds = rds.Module("m").Service("s").Route("r") - rds.Observe(5) + rds = rds.Module("m").Service("s").Status(1).Route("r") + rds.Observe(5 * time.Second) assert.Equal(t, 5.0, histogram.ObservedValue) - assert.ElementsMatch(t, []string{"module", "m", "service", "s", "route", "r"}, histogram.LabelValues) + assert.ElementsMatch(t, []string{"module", "m", "service", "s", "route", "r", "status", "1"}, histogram.LabelValues) f := grpc.UnaryHandler(func(ctx context.Context, req interface{}) (interface{}, error) { time.Sleep(time.Millisecond) @@ -26,3 +28,13 @@ func TestRequestDurationSeconds(t *testing.T) { _, _ = Metrics(rds)(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "/"}, f) assert.GreaterOrEqual(t, 1.0, histogram.ObservedValue) } + +func TestMetrics(t *testing.T) { + handler := grpc.UnaryHandler(func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, status.Error(2, "error") + }) + histogram := &stub.Histogram{} + rds := NewRequestDurationSeconds(histogram) + Metrics(rds)(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "/"}, handler) + assert.Equal(t, "2", histogram.LabelValues.Label("status")) +} diff --git a/srvhttp/metrics.go b/srvhttp/metrics.go index c9e84288..9b59065d 100644 --- a/srvhttp/metrics.go +++ b/srvhttp/metrics.go @@ -2,8 +2,10 @@ package srvhttp import ( "net/http" + "strconv" "time" + "github.com/felixge/httpsnoop" "github.com/go-kit/kit/metrics" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -22,21 +24,19 @@ func (m MetricsModule) ProvideHTTP(router *mux.Router) { func Metrics(metrics *RequestDurationSeconds) func(handler http.Handler) http.Handler { return func(handler http.Handler) http.Handler { return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - start := time.Now() - defer func() { - route := mux.CurrentRoute(request) - if route == nil { - metrics.Route("").Observe(time.Since(start).Seconds()) - return - } - path, err := route.GetPathTemplate() - if err != nil { - metrics.Route("").Observe(time.Since(start).Seconds()) - return - } - metrics.Route(path).Observe(time.Since(start).Seconds()) - }() - handler.ServeHTTP(writer, request) + collection := httpsnoop.CaptureMetrics(handler, writer, request) + metrics = metrics.Status(collection.Code) + route := mux.CurrentRoute(request) + if route == nil { + metrics.Route("").Observe(collection.Duration) + return + } + path, err := route.GetPathTemplate() + if err != nil { + metrics.Route("").Observe(collection.Duration) + return + } + metrics.Route(path).Observe(collection.Duration) }) } } @@ -53,6 +53,7 @@ type RequestDurationSeconds struct { module string service string route string + status int } // NewRequestDurationSeconds returns a new RequestDurationSeconds. The default @@ -63,6 +64,7 @@ func NewRequestDurationSeconds(histogram metrics.Histogram) *RequestDurationSeco module: "unknown", service: "unknown", route: "unknown", + status: 0, } } @@ -73,6 +75,7 @@ func (r *RequestDurationSeconds) Module(module string) *RequestDurationSeconds { module: module, service: r.service, route: r.route, + status: r.status, } } @@ -83,6 +86,7 @@ func (r *RequestDurationSeconds) Service(service string) *RequestDurationSeconds module: r.module, service: service, route: r.route, + status: r.status, } } @@ -93,10 +97,31 @@ func (r *RequestDurationSeconds) Route(route string) *RequestDurationSeconds { module: r.module, service: r.service, route: route, + status: r.status, + } +} + +// Status specifies the status label for RequestDurationSeconds. +func (r *RequestDurationSeconds) Status(status int) *RequestDurationSeconds { + return &RequestDurationSeconds{ + histogram: r.histogram, + module: r.module, + service: r.service, + route: r.route, + status: status, } } // Observe records the time taken to process the request. -func (r *RequestDurationSeconds) Observe(seconds float64) { - r.histogram.With("module", r.module, "service", r.service, "route", r.route).Observe(seconds) +func (r *RequestDurationSeconds) Observe(duration time.Duration) { + r.histogram.With( + "module", + r.module, + "service", + r.service, + "route", + r.route, + "status", + strconv.Itoa(r.status), + ).Observe(duration.Seconds()) } diff --git a/srvhttp/metrics_test.go b/srvhttp/metrics_test.go index 646ab306..d18273a3 100644 --- a/srvhttp/metrics_test.go +++ b/srvhttp/metrics_test.go @@ -1,23 +1,24 @@ package srvhttp import ( - "github.com/DoNewsCode/core/internal/stub" "net/http" "net/http/httptest" "testing" "time" + "github.com/DoNewsCode/core/internal/stub" + "github.com/stretchr/testify/assert" ) func TestRequestDurationSeconds(t *testing.T) { histogram := &stub.Histogram{} rds := NewRequestDurationSeconds(histogram) - rds = rds.Module("m").Service("s").Route("r") - rds.Observe(5) + rds = rds.Module("m").Service("s").Route("r").Status(8) + rds.Observe(5 * time.Second) assert.Equal(t, 5.0, histogram.ObservedValue) - assert.ElementsMatch(t, []string{"module", "m", "service", "s", "route", "r"}, histogram.LabelValues) + assert.ElementsMatch(t, []string{"module", "m", "service", "s", "route", "r", "status", "8"}, histogram.LabelValues) f := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { time.Sleep(time.Millisecond) From cc2eaa0a84997e70f04dd1090e714a6e3907eaac Mon Sep 17 00:00:00 2001 From: Reasno Date: Fri, 14 Jan 2022 20:18:10 +0800 Subject: [PATCH 02/19] wip: new cron package --- cron/cron.go | 191 +++++++++++++++++++++++++++++++++++++++++++++ cron/cron_test.go | 79 +++++++++++++++++++ cron/helper.go | 31 ++++++++ cron/middleware.go | 85 ++++++++++++++++++++ cron/options.go | 39 +++++++++ 5 files changed, 425 insertions(+) create mode 100644 cron/cron.go create mode 100644 cron/cron_test.go create mode 100644 cron/helper.go create mode 100644 cron/middleware.go create mode 100644 cron/options.go diff --git a/cron/cron.go b/cron/cron.go new file mode 100644 index 00000000..d6a73d88 --- /dev/null +++ b/cron/cron.go @@ -0,0 +1,191 @@ +package cron + +import ( + "container/heap" + "context" + "fmt" + "sync" + "time" + + "github.com/robfig/cron/v3" +) + +type Cron struct { + parser cron.ScheduleParser + lock *sync.Cond + jobDescriptors jobDescriptors + globalMiddleware []JobMiddleware + location *time.Location + nextID int + quitWaiter sync.WaitGroup + baseContext context.Context + baseContextCancel func() +} + +func New(options ...Option) *Cron { + c := &Cron{ + parser: cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor), + location: time.Local, + nextID: 1, + lock: sync.NewCond(&sync.Mutex{}), + baseContext: context.Background(), + } + for _, f := range options { + f(c) + } + return c +} + +func (c *Cron) Add(spec string, runner func(ctx context.Context) error, middleware ...JobMiddleware) (JobID, error) { + schedule, err := c.parser.Parse(spec) + if err != nil { + return 0, err + } + + jobDescriptor := JobDescriptor{ + RawSpec: spec, + Run: runner, + Schedule: schedule, + next: schedule.Next(c.now()), + } + + middleware = append(middleware, c.globalMiddleware...) + + for i := len(middleware) - 1; i > 0; i-- { + jobDescriptor = middleware[i](jobDescriptor) + } + + c.lock.L.Lock() + defer c.lock.L.Unlock() + defer c.lock.Broadcast() + + jobDescriptor.ID = JobID(c.nextID) + c.nextID++ + + if jobDescriptor.Name == "" { + jobDescriptor.Name = fmt.Sprintf("job-%d", jobDescriptor.ID) + } + + heap.Push(&c.jobDescriptors, &jobDescriptor) + return jobDescriptor.ID, nil +} + +func (c *Cron) Remove(id JobID) { + c.lock.L.Lock() + defer c.lock.L.Unlock() + + for i, descriptor := range c.jobDescriptors { + if descriptor.ID == id { + heap.Remove(&c.jobDescriptors, i) + } + } +} + +func (c *Cron) Run(ctx context.Context) error { + defer c.quitWaiter.Wait() + + c.lock.L.Lock() + now := c.now() + for _, descriptor := range c.jobDescriptors { + descriptor.next = descriptor.Schedule.Next(now) + } + heap.Init(&c.jobDescriptors) + c.lock.L.Unlock() + + var once sync.Once + for { + c.lock.L.Lock() + // Determine the next entry to run. + for c.jobDescriptors.Len() == 0 || c.jobDescriptors[0].next.IsZero() { + c.broadcastAtDeadlineOnce(ctx, &once) + select { + case <-ctx.Done(): + c.lock.L.Unlock() + return ctx.Err() + default: + c.lock.Wait() + } + } + gap := c.jobDescriptors[0].next.Sub(now) + c.lock.L.Unlock() + + timer := time.NewTimer(gap) + + select { + case now = <-timer.C: + c.lock.L.Lock() + for { + if c.jobDescriptors[0].next.After(now) || c.jobDescriptors[0].next.IsZero() { + break + } + descriptor := heap.Pop(&c.jobDescriptors) + + descriptor.(*JobDescriptor).prev = descriptor.(*JobDescriptor).next + descriptor.(*JobDescriptor).next = descriptor.(*JobDescriptor).Schedule.Next(now) + heap.Push(&c.jobDescriptors, descriptor) + + var innerCtx context.Context + innerCtx = context.WithValue(ctx, prevContextKey, descriptor.(*JobDescriptor).prev) + innerCtx = context.WithValue(innerCtx, nextContextKey, descriptor.(*JobDescriptor).next) + + c.quitWaiter.Add(1) + go func() { + defer c.quitWaiter.Done() + descriptor.(*JobDescriptor).Run(innerCtx) + }() + } + c.lock.L.Unlock() + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + } + } +} + +func (c *Cron) now() time.Time { + return time.Now().In(c.location) +} + +func (c *Cron) broadcastAtDeadlineOnce(ctx context.Context, once *sync.Once) { + once.Do(func() { + go func() { + <-ctx.Done() + c.lock.Broadcast() + }() + }) +} + +type jobDescriptors []*JobDescriptor + +func (j *jobDescriptors) Len() int { + return len(*j) +} + +func (j *jobDescriptors) Less(i, k int) bool { + return (*j)[i].next.Before((*j)[k].next) +} + +func (j *jobDescriptors) Swap(i, k int) { + (*j)[i], (*j)[k] = (*j)[k], (*j)[i] +} + +func (j *jobDescriptors) Push(x interface{}) { + *j = append(*j, x.(*JobDescriptor)) +} + +func (j *jobDescriptors) Pop() (v interface{}) { + *j, v = (*j)[:j.Len()-1], (*j)[j.Len()-1] + return v +} + +type JobID int + +type JobDescriptor struct { + ID JobID + Name string + RawSpec string + Schedule cron.Schedule + next time.Time + prev time.Time + Run func(ctx context.Context) error +} diff --git a/cron/cron_test.go b/cron/cron_test.go new file mode 100644 index 00000000..536c100a --- /dev/null +++ b/cron/cron_test.go @@ -0,0 +1,79 @@ +package cron + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCron_sort(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + defer cancel() + c := New(WithSeconds()) + var i int32 + c.Add("3 * * * * *", func(ctx context.Context) error { + assert.Equal(t, 2, atomic.SwapInt32(&i, 3)) + return nil + }) + c.Add("1 * * * * *", func(ctx context.Context) error { + assert.Equal(t, 0, atomic.SwapInt32(&i, 1)) + return nil + }) + c.Add("2 * * * * *", func(ctx context.Context) error { + assert.Equal(t, 0, atomic.SwapInt32(&i, 2)) + return nil + }) + c.Run(ctx) +} + +func TestCron_no_job(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + c := New() + c.Run(ctx) +} + +func TestCron_late_job(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + c := New(WithSeconds()) + go c.Run(ctx) + time.Sleep(time.Millisecond) + ch := make(chan struct{}) + c.Add("* * * * * *", func(ctx context.Context) error { + ch <- struct{}{} + return nil + }) + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatal("timeout") + } +} + +func TestCron_remove_job(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + c := New(WithSeconds()) + go c.Run(ctx) + + ch := make(chan struct{}) + id, _ := c.Add("* * * * * *", func(ctx context.Context) error { + ch <- struct{}{} + return nil + }) + c.Remove(id) + select { + case <-ch: + t.Fatal("should not be called") + case <-time.After(2 * time.Second): + } +} diff --git a/cron/helper.go b/cron/helper.go new file mode 100644 index 00000000..e7fdde9c --- /dev/null +++ b/cron/helper.go @@ -0,0 +1,31 @@ +package cron + +import ( + "context" + "time" +) + +var ( + prevContextKey = struct{}{} + nextContextKey = struct{}{} +) + +func GetCurrentSchedule(ctx context.Context) time.Time { + if ctx == nil { + return time.Time{} + } + if t, ok := ctx.Value(prevContextKey).(time.Time); ok { + return t + } + return time.Time{} +} + +func GetNextSchedule(ctx context.Context) time.Time { + if ctx == nil { + return time.Time{} + } + if t, ok := ctx.Value(nextContextKey).(time.Time); ok { + return t + } + return time.Time{} +} diff --git a/cron/middleware.go b/cron/middleware.go new file mode 100644 index 00000000..120aa435 --- /dev/null +++ b/cron/middleware.go @@ -0,0 +1,85 @@ +package cron + +import ( + "context" + "fmt" + "time" + + "github.com/DoNewsCode/core/cronopts" + "github.com/DoNewsCode/core/logging" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/robfig/cron/v3" +) + +type JobMiddleware func(descriptors JobDescriptor) JobDescriptor + +func SetName(name string) JobMiddleware { + return func(descriptors JobDescriptor) JobDescriptor { + descriptors.Name = name + return descriptors + } +} + +func ReplaceSchedule(schedule cron.Schedule) JobMiddleware { + return func(descriptors JobDescriptor) JobDescriptor { + descriptors.Schedule = schedule + return descriptors + } +} + +// WrapMetrics returns a new JobDescriptor that will report metrics. +func WrapMetrics(metrics *cronopts.CronJobMetrics) JobMiddleware { + return func(descriptor JobDescriptor) JobDescriptor { + descriptor.Run = func(ctx context.Context) error { + start := time.Now() + metrics = metrics.Job(descriptor.Name) + defer metrics.Observe(time.Since(start)) + err := descriptor.Run(ctx) + if err != nil { + metrics.Fail() + return err + } + return nil + } + return descriptor + } +} + +// WrapLogs returns a new Universal job that will log. +func WrapLogs(logger log.Logger) JobMiddleware { + return func(descriptor JobDescriptor) JobDescriptor { + descriptor.Run = func(ctx context.Context) error { + logger = logging.WithContext(logger, ctx) + logger = log.With(logger, "job", descriptor.Name, "schedule", descriptor.RawSpec) + logger.Log("msg", logging.Sprintf("job %s started", descriptor.Name)) + err := descriptor.Run(ctx) + if err != nil { + logger.Log("msg", logging.Sprintf("job %s finished with error: %s", descriptor.Name, err)) + return err + } + logger.Log("msg", logging.Sprintf("job %s completed", descriptor.Name)) + return nil + } + return descriptor + } +} + +// WrapTracing returns a new Universal job that will trace. +func WrapTracing(tracer opentracing.Tracer) JobMiddleware { + return func(descriptor JobDescriptor) JobDescriptor { + descriptor.Run = func(ctx context.Context) error { + span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, fmt.Sprintf("Job: %s", descriptor.Name)) + defer span.Finish() + span.SetTag("schedule", descriptor.RawSpec) + err := descriptor.Run(ctx) + if err != nil { + ext.Error.Set(span, true) + return err + } + return nil + } + return descriptor + } +} diff --git a/cron/options.go b/cron/options.go new file mode 100644 index 00000000..c90d3b84 --- /dev/null +++ b/cron/options.go @@ -0,0 +1,39 @@ +package cron + +import ( + "time" + + "github.com/robfig/cron/v3" +) + +// Option represents a modification to the default behavior of a Cron. +type Option func(*Cron) + +// WithLocation overrides the timezone of the cron instance. +func WithLocation(loc *time.Location) Option { + return func(c *Cron) { + c.location = loc + } +} + +// WithSeconds overrides the parser used for interpreting job schedules to +// include a seconds field as the first one. +func WithSeconds() Option { + return WithParser(cron.NewParser( + cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, + )) +} + +// WithParser overrides the parser used for interpreting job schedules. +func WithParser(p cron.ScheduleParser) Option { + return func(c *Cron) { + c.parser = p + } +} + +// WithGlobalMiddleware specifies Job wrappers to apply to all jobs added to this cron. +func WithGlobalMiddleware(middleware ...JobMiddleware) Option { + return func(c *Cron) { + c.globalMiddleware = middleware + } +} From 7f382ff10e262e9042af5044f0b27ab776817adf Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 15:57:34 +0800 Subject: [PATCH 03/19] refactor(cron): remove cronopts, add cron This PR replaces the cron implementation. BREAKING CHANGE: the new cron package github/DoNewsCode/core/cron is not compatible with github.com/robfig/cron/v3. See examples for how to migrate. --- container/container.go | 40 ++- contract/container.go | 2 - cron/config.go | 19 ++ cron/cron.go | 92 +++++-- cron/cron_test.go | 45 ++-- cron/example_test.go | 44 ++++ cron/helper.go | 2 + cron/helper_test.go | 15 ++ {cronopts => cron}/metrics.go | 43 ++- cron/middleware.go | 123 +++++++-- cron/middleware_test.go | 244 ++++++++++++++++++ cron/options.go | 39 --- cronopts/example_metrics_test.go | 42 --- cronopts/jobs/example_test.go | 43 --- cronopts/jobs/universal.go | 110 -------- cronopts/jobs/universal_test.go | 27 -- cronopts/metrics_test.go | 25 -- deprecated_cronopts/deprecation_test.go | 38 +++ {cronopts => deprecated_cronopts}/log.go | 2 +- {cronopts => deprecated_cronopts}/log_test.go | 0 go.mod | 3 +- observability/metrics.go | 12 +- observability/observability_test.go | 4 +- serve.go | 70 +++-- serve_test.go | 48 ++++ 25 files changed, 714 insertions(+), 418 deletions(-) create mode 100644 cron/config.go create mode 100644 cron/example_test.go create mode 100644 cron/helper_test.go rename {cronopts => cron}/metrics.go (65%) create mode 100644 cron/middleware_test.go delete mode 100644 cron/options.go delete mode 100644 cronopts/example_metrics_test.go delete mode 100644 cronopts/jobs/example_test.go delete mode 100644 cronopts/jobs/universal.go delete mode 100644 cronopts/jobs/universal_test.go delete mode 100644 cronopts/metrics_test.go create mode 100644 deprecated_cronopts/deprecation_test.go rename {cronopts => deprecated_cronopts}/log.go (84%) rename {cronopts => deprecated_cronopts}/log_test.go (100%) diff --git a/container/container.go b/container/container.go index 7dbb47df..74322993 100644 --- a/container/container.go +++ b/container/container.go @@ -5,18 +5,24 @@ package container import ( "github.com/DoNewsCode/core/contract" + "github.com/DoNewsCode/core/cron" "github.com/gorilla/mux" "github.com/oklog/run" - "github.com/robfig/cron/v3" + deprecatedcron "github.com/robfig/cron/v3" "github.com/spf13/cobra" "google.golang.org/grpc" ) var _ contract.Container = (*Container)(nil) -// CronProvider provides cron jobs. +// DeprecatedCronProvider provides cron jobs. +// Deprecated: CronProvider is deprecated. Use CronProvider instead +type DeprecatedCronProvider interface { + ProvideCron(crontab *deprecatedcron.Cron) +} + type CronProvider interface { - ProvideCron(crontab *cron.Cron) + ProvideCron(cron *cron.Cron) } // CommandProvider provides cobra.Command. @@ -47,13 +53,14 @@ type RunProvider interface { // Container holds all modules registered. type Container struct { - httpProviders []func(router *mux.Router) - grpcProviders []func(server *grpc.Server) - closerProviders []func() - runProviders []func(g *run.Group) - modules []interface{} - cronProviders []func(crontab *cron.Cron) - commandProviders []func(command *cobra.Command) + httpProviders []func(router *mux.Router) + grpcProviders []func(server *grpc.Server) + closerProviders []func() + runProviders []func(g *run.Group) + modules []interface{} + cronProviders []func(cron *cron.Cron) + deprecatedCronProviders []func(cron *deprecatedcron.Cron) + commandProviders []func(command *cobra.Command) } // ApplyRouter iterates through every HTTPProvider registered in the container, @@ -107,6 +114,16 @@ func (c *Container) Modules() []interface{} { return c.modules } +// ApplyDeprecatedCron iterates through every CronProvider registered in the container, +// and introduce the *deprecatedcron.Cron to everyone. +// +// Deprecated: migrate to ApplyCron. +func (c *Container) ApplyDeprecatedCron(crontab *deprecatedcron.Cron) { + for _, p := range c.deprecatedCronProviders { + p(crontab) + } +} + // ApplyCron iterates through every CronProvider registered in the container, // and introduce the *cron.Cron to everyone. func (c *Container) ApplyCron(crontab *cron.Cron) { @@ -130,6 +147,9 @@ func (c *Container) AddModule(module interface{}) { if p, ok := module.(GRPCProvider); ok { c.grpcProviders = append(c.grpcProviders, p.ProvideGRPC) } + if p, ok := module.(DeprecatedCronProvider); ok { + c.deprecatedCronProviders = append(c.deprecatedCronProviders, p.ProvideCron) + } if p, ok := module.(CronProvider); ok { c.cronProviders = append(c.cronProviders, p.ProvideCron) } diff --git a/contract/container.go b/contract/container.go index 220b085a..1a88533d 100644 --- a/contract/container.go +++ b/contract/container.go @@ -3,7 +3,6 @@ package contract import ( "github.com/gorilla/mux" "github.com/oklog/run" - "github.com/robfig/cron/v3" "github.com/spf13/cobra" "google.golang.org/grpc" ) @@ -12,7 +11,6 @@ import ( type Container interface { ApplyRouter(router *mux.Router) ApplyGRPCServer(server *grpc.Server) - ApplyCron(crontab *cron.Cron) ApplyRunGroup(g *run.Group) ApplyRootCommand(command *cobra.Command) Shutdown() diff --git a/cron/config.go b/cron/config.go new file mode 100644 index 00000000..2a612de2 --- /dev/null +++ b/cron/config.go @@ -0,0 +1,19 @@ +package cron + +import ( + "time" + + "github.com/robfig/cron/v3" +) + +// Config is the configuration for the cron package. +type Config struct { + // Parser is the parser to parse cron expressions. + Parser cron.ScheduleParser + // Location is the timezone to use in parsing cron expressions. + Location *time.Location + // GlobalOptions are the job options that are applied to all jobs. + GlobalOptions []JobOptions + // EnableSeconds is whether to enable seconds in the cron expression. + EnableSeconds bool +} diff --git a/cron/cron.go b/cron/cron.go index d6a73d88..32951025 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -1,3 +1,7 @@ +// Package cron is a partial rewrite based on the github.com/robfig/cron/v3 +// package. The API in this package enforces context passing and error +// propagation, and consequently enables better logging, metrics and tracing +// support. package cron import ( @@ -10,33 +14,44 @@ import ( "github.com/robfig/cron/v3" ) +// Cron schedules jobs to be run on the specified schedule. type Cron struct { - parser cron.ScheduleParser - lock *sync.Cond - jobDescriptors jobDescriptors - globalMiddleware []JobMiddleware - location *time.Location - nextID int - quitWaiter sync.WaitGroup - baseContext context.Context - baseContextCancel func() + parser cron.ScheduleParser + lock *sync.Cond + jobDescriptors jobDescriptors + globalMiddleware []JobOptions + location *time.Location + nextID int + quitWaiter sync.WaitGroup } -func New(options ...Option) *Cron { +// New returns a new Cron instance. +func New(config Config) *Cron { c := &Cron{ - parser: cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor), - location: time.Local, - nextID: 1, - lock: sync.NewCond(&sync.Mutex{}), - baseContext: context.Background(), + parser: config.Parser, + lock: sync.NewCond(&sync.Mutex{}), + jobDescriptors: jobDescriptors{}, + globalMiddleware: config.GlobalOptions, + location: config.Location, + nextID: 1, } - for _, f := range options { - f(c) + if config.Parser == nil { + if config.EnableSeconds { + c.parser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + } else { + c.parser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + } + } + if config.Location == nil { + c.location = time.Local } return c } -func (c *Cron) Add(spec string, runner func(ctx context.Context) error, middleware ...JobMiddleware) (JobID, error) { +// Add adds a new job to the cron scheduler.A list of middleware can be supplied. +// Note the error returned by the runner will be discarded. It is the user's +// responsibility to handle the error via middleware. +func (c *Cron) Add(spec string, runner func(ctx context.Context) error, middleware ...JobOptions) (JobID, error) { schedule, err := c.parser.Parse(spec) if err != nil { return 0, err @@ -49,10 +64,10 @@ func (c *Cron) Add(spec string, runner func(ctx context.Context) error, middlewa next: schedule.Next(c.now()), } - middleware = append(middleware, c.globalMiddleware...) + middleware = append(c.globalMiddleware, middleware...) - for i := len(middleware) - 1; i > 0; i-- { - jobDescriptor = middleware[i](jobDescriptor) + for i := len(middleware) - 1; i >= 0; i-- { + middleware[i](&jobDescriptor) } c.lock.L.Lock() @@ -81,6 +96,20 @@ func (c *Cron) Remove(id JobID) { } } +// Descriptors returns a list of all job descriptors. +func (c *Cron) Descriptors() []JobDescriptor { + var descriptors []JobDescriptor + + c.lock.L.Lock() + defer c.lock.L.Unlock() + + for _, descriptor := range c.jobDescriptors { + descriptors = append(descriptors, *descriptor) + } + return descriptors +} + +// Run starts the cron scheduler. It is a blocking call. func (c *Cron) Run(ctx context.Context) error { defer c.quitWaiter.Wait() @@ -178,14 +207,23 @@ func (j *jobDescriptors) Pop() (v interface{}) { return v } +// JobID is the identifier of jobs. type JobID int +// JobDescriptor contains the information about jobs. type JobDescriptor struct { - ID JobID - Name string - RawSpec string + // ID is the identifier of job + ID JobID + // Name is an optional field typically added by WithName. It can be useful in logging and metrics. + Name string + // RawSpec contains the string format of cron schedule format. + RawSpec string + // Schedule is the parsed version of RawSpec. It can be overridden by WithSchedule. Schedule cron.Schedule - next time.Time - prev time.Time - Run func(ctx context.Context) error + // Run is the actual work to be done. + Run func(ctx context.Context) error + // next is the next time the job should run. + next time.Time + // prev is the last time the job ran. + prev time.Time } diff --git a/cron/cron_test.go b/cron/cron_test.go index 536c100a..7aef2ce8 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -9,24 +9,38 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCron_sort(t *testing.T) { +type fakeOnceScheduler struct { + returned int32 + runAfter time.Duration +} + +func (s *fakeOnceScheduler) Next(t time.Time) time.Time { + if old := atomic.SwapInt32(&s.returned, 1); old == 0 { + return t.Add(s.runAfter) + } + return time.Time{} +} + +func TestCron_heapsort(t *testing.T) { t.Parallel() - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Millisecond) defer cancel() - c := New(WithSeconds()) + + c := New(Config{EnableSeconds: true}) var i int32 + c.Add("3 * * * * *", func(ctx context.Context) error { - assert.Equal(t, 2, atomic.SwapInt32(&i, 3)) + assert.Equal(t, int32(2), atomic.SwapInt32(&i, 3)) return nil - }) + }, WithSchedule(&fakeOnceScheduler{runAfter: 3 * time.Millisecond})) c.Add("1 * * * * *", func(ctx context.Context) error { - assert.Equal(t, 0, atomic.SwapInt32(&i, 1)) + assert.Equal(t, int32(0), atomic.SwapInt32(&i, 1)) return nil - }) + }, WithSchedule(&fakeOnceScheduler{runAfter: 1 * time.Millisecond})) c.Add("2 * * * * *", func(ctx context.Context) error { - assert.Equal(t, 0, atomic.SwapInt32(&i, 2)) + assert.Equal(t, int32(0), atomic.SwapInt32(&i, 2)) return nil - }) + }, WithSchedule(&fakeOnceScheduler{runAfter: 2 * time.Millisecond})) c.Run(ctx) } @@ -34,7 +48,7 @@ func TestCron_no_job(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - c := New() + c := New(Config{}) c.Run(ctx) } @@ -42,7 +56,8 @@ func TestCron_late_job(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - c := New(WithSeconds()) + c := New(Config{EnableSeconds: true}) + go c.Run(ctx) time.Sleep(time.Millisecond) ch := make(chan struct{}) @@ -59,21 +74,21 @@ func TestCron_late_job(t *testing.T) { func TestCron_remove_job(t *testing.T) { t.Parallel() - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond) defer cancel() - c := New(WithSeconds()) + c := New(Config{}) go c.Run(ctx) ch := make(chan struct{}) id, _ := c.Add("* * * * * *", func(ctx context.Context) error { ch <- struct{}{} return nil - }) + }, WithSchedule(&fakeOnceScheduler{runAfter: time.Millisecond})) c.Remove(id) select { case <-ch: t.Fatal("should not be called") - case <-time.After(2 * time.Second): + case <-time.After(2 * time.Millisecond): } } diff --git a/cron/example_test.go b/cron/example_test.go new file mode 100644 index 00000000..615395e4 --- /dev/null +++ b/cron/example_test.go @@ -0,0 +1,44 @@ +package cron_test + +import ( + "context" + "fmt" + "github.com/DoNewsCode/core" + "github.com/DoNewsCode/core/cron" + "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/observability" + "time" +) + +type CronModule struct { + metrics *cron.CronJobMetrics +} + +func NewCronModule(metrics *cron.CronJobMetrics) *CronModule { + return &CronModule{metrics: metrics} +} + +func (module *CronModule) ProvideCron(crontab *cron.Cron) { + crontab.Add("* * * * * *", func(ctx context.Context) error { + fmt.Println("I am a cron") + return nil + }, cron.WithMetrics(module.metrics), cron.WithName("foo")) +} + +func Example() { + c := core.Default(core.WithInline("log.level", "none")) + c.Provide(observability.Providers()) + c.Provide( + di.Deps{func() *cron.Cron { + return cron.New(cron.Config{EnableSeconds: true}) + }}, + ) + + c.AddModuleFunc(NewCronModule) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + c.Serve(ctx) + // Output: + // I am a cron +} diff --git a/cron/helper.go b/cron/helper.go index e7fdde9c..9875351a 100644 --- a/cron/helper.go +++ b/cron/helper.go @@ -10,6 +10,7 @@ var ( nextContextKey = struct{}{} ) +// GetCurrentSchedule returns the current schedule for the given context. func GetCurrentSchedule(ctx context.Context) time.Time { if ctx == nil { return time.Time{} @@ -20,6 +21,7 @@ func GetCurrentSchedule(ctx context.Context) time.Time { return time.Time{} } +// GetNextSchedule returns the next schedule for the given context. func GetNextSchedule(ctx context.Context) time.Time { if ctx == nil { return time.Time{} diff --git a/cron/helper_test.go b/cron/helper_test.go new file mode 100644 index 00000000..f7d7f2a6 --- /dev/null +++ b/cron/helper_test.go @@ -0,0 +1,15 @@ +package cron + +import ( + "context" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestGetCurrentSchedule(t *testing.T) { + assert.True(t, GetCurrentSchedule(context.Background()).IsZero()) +} + +func TestGetNextSchedule(t *testing.T) { + assert.True(t, GetNextSchedule(context.Background()).IsZero()) +} diff --git a/cronopts/metrics.go b/cron/metrics.go similarity index 65% rename from cronopts/metrics.go rename to cron/metrics.go index c395f190..794ff027 100644 --- a/cronopts/metrics.go +++ b/cron/metrics.go @@ -1,10 +1,8 @@ -package cronopts +package cron import ( - "time" - "github.com/go-kit/kit/metrics" - "github.com/robfig/cron/v3" + "time" ) // CronJobMetrics collects metrics for cron jobs. @@ -13,8 +11,9 @@ type CronJobMetrics struct { cronJobFailCount metrics.Counter // labels that has been set - module string - job string + module string + job string + schedule string } // NewCronJobMetrics constructs a new *CronJobMetrics, setting default labels to "unknown". @@ -24,6 +23,7 @@ func NewCronJobMetrics(histogram metrics.Histogram, counter metrics.Counter) *Cr cronJobFailCount: counter, module: "unknown", job: "unknown", + schedule: "unknown", } } @@ -34,6 +34,7 @@ func (c *CronJobMetrics) Module(module string) *CronJobMetrics { cronJobFailCount: c.cronJobFailCount, module: module, job: c.job, + schedule: c.schedule, } } @@ -44,29 +45,27 @@ func (c *CronJobMetrics) Job(job string) *CronJobMetrics { cronJobFailCount: c.cronJobFailCount, module: c.module, job: job, + schedule: c.schedule, + } +} + +// Schedule specifies the schedule label for CronJobMetrics. +func (c *CronJobMetrics) Schedule(schedule string) *CronJobMetrics { + return &CronJobMetrics{ + cronJobDurationSeconds: c.cronJobDurationSeconds, + cronJobFailCount: c.cronJobFailCount, + module: c.module, + job: c.job, + schedule: schedule, } } // Fail marks the job as failed. func (c *CronJobMetrics) Fail() { - c.cronJobFailCount.With("module", c.module, "job", c.job).Add(1) + c.cronJobFailCount.With("module", c.module, "job", c.job, "schedule", c.schedule).Add(1) } // Observe records the duration of the job. func (c *CronJobMetrics) Observe(duration time.Duration) { - c.cronJobDurationSeconds.With("module", c.module, "job", c.job).Observe(duration.Seconds()) -} - -// Measure wraps the given job and records the duration and success. -func (c *CronJobMetrics) Measure(job cron.Job) cron.Job { - return cron.FuncJob(func() { - start := time.Now() - defer c.cronJobDurationSeconds.With("module", c.module, "job", c.job).Observe(time.Since(start).Seconds()) - job.Run() - }) -} - -// Measure returns a job wrapper that wraps the given job and records the duration and success. -func Measure(c *CronJobMetrics) cron.JobWrapper { - return c.Measure + c.cronJobDurationSeconds.With("module", c.module, "job", c.job, "schedule", c.schedule).Observe(duration.Seconds()) } diff --git a/cron/middleware.go b/cron/middleware.go index 120aa435..4af03c78 100644 --- a/cron/middleware.go +++ b/cron/middleware.go @@ -2,10 +2,11 @@ package cron import ( "context" + "errors" "fmt" + "runtime/debug" "time" - "github.com/DoNewsCode/core/cronopts" "github.com/DoNewsCode/core/logging" "github.com/go-kit/log" "github.com/opentracing/opentracing-go" @@ -13,48 +14,55 @@ import ( "github.com/robfig/cron/v3" ) -type JobMiddleware func(descriptors JobDescriptor) JobDescriptor +// JobOptions is a middleware for cron jobs. +type JobOptions func(descriptors *JobDescriptor) -func SetName(name string) JobMiddleware { - return func(descriptors JobDescriptor) JobDescriptor { - descriptors.Name = name - return descriptors +// WithName sets the name of the job. +func WithName(name string) JobOptions { + return func(descriptor *JobDescriptor) { + descriptor.Name = name } } -func ReplaceSchedule(schedule cron.Schedule) JobMiddleware { - return func(descriptors JobDescriptor) JobDescriptor { - descriptors.Schedule = schedule - return descriptors +// WithSchedule sets the cron schedule of the job. +func WithSchedule(schedule cron.Schedule) JobOptions { + return func(descriptor *JobDescriptor) { + descriptor.Schedule = schedule } } -// WrapMetrics returns a new JobDescriptor that will report metrics. -func WrapMetrics(metrics *cronopts.CronJobMetrics) JobMiddleware { - return func(descriptor JobDescriptor) JobDescriptor { +// WithMetrics returns a new JobDescriptor that will report metrics. +func WithMetrics(metrics *CronJobMetrics) JobOptions { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { start := time.Now() - metrics = metrics.Job(descriptor.Name) + metrics = metrics.Job(descriptor.Name).Schedule(descriptor.RawSpec) defer metrics.Observe(time.Since(start)) - err := descriptor.Run(ctx) + err := innerRun(ctx) if err != nil { metrics.Fail() return err } return nil } - return descriptor } } -// WrapLogs returns a new Universal job that will log. -func WrapLogs(logger log.Logger) JobMiddleware { - return func(descriptor JobDescriptor) JobDescriptor { +// WithLogging returns a new Universal job that will log. +func WithLogging(logger log.Logger) JobOptions { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { + due := GetCurrentSchedule(ctx) + delayed := due.Sub(time.Now()) logger = logging.WithContext(logger, ctx) + if delayed > time.Second { + log.With(logger, "delayed", delayed) + } logger = log.With(logger, "job", descriptor.Name, "schedule", descriptor.RawSpec) logger.Log("msg", logging.Sprintf("job %s started", descriptor.Name)) - err := descriptor.Run(ctx) + err := innerRun(ctx) if err != nil { logger.Log("msg", logging.Sprintf("job %s finished with error: %s", descriptor.Name, err)) return err @@ -62,24 +70,87 @@ func WrapLogs(logger log.Logger) JobMiddleware { logger.Log("msg", logging.Sprintf("job %s completed", descriptor.Name)) return nil } - return descriptor } } -// WrapTracing returns a new Universal job that will trace. -func WrapTracing(tracer opentracing.Tracer) JobMiddleware { - return func(descriptor JobDescriptor) JobDescriptor { +// WithTracing returns a new Universal job that will trace. +func WithTracing(tracer opentracing.Tracer) JobOptions { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, fmt.Sprintf("Job: %s", descriptor.Name)) defer span.Finish() span.SetTag("schedule", descriptor.RawSpec) - err := descriptor.Run(ctx) + err := innerRun(ctx) if err != nil { ext.Error.Set(span, true) return err } return nil } - return descriptor + } +} + +// SkipIfOverlap returns a new JobDescriptor that will skip the job if it overlaps with another job. +func SkipIfOverlap() JobOptions { + ch := make(chan struct{}, 1) + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + select { + case ch <- struct{}{}: + defer func() { + <-ch + }() + return innerRun(ctx) + default: + return errors.New("skipped due to overlap") + } + } + } +} + +// DelayIfOverlap returns a new JobDescriptor that will delay the job if it overlaps with another job. +func DelayIfOverlap() JobOptions { + ch := make(chan struct{}, 1) + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + ch <- struct{}{} + defer func() { + <-ch + }() + return innerRun(ctx) + } + } +} + +// TimeoutIfOverlap returns a new JobDescriptor that will cancel the job's context if the next schedule is due. +func TimeoutIfOverlap() JobOptions { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + if !GetNextSchedule(ctx).IsZero() { + ctx, cancel := context.WithDeadline(ctx, GetNextSchedule(ctx)) + defer cancel() + return innerRun(ctx) + } + return innerRun(ctx) + } + } +} + +// Recover returns a new JobDescriptor that will recover from panics. +func Recover(logger log.Logger) JobOptions { + return func(descriptor *JobDescriptor) { + innerRun := descriptor.Run + descriptor.Run = func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + logging.WithContext(logger, ctx).Log("msg", "job panicked", "err", r, "stack", debug.Stack()) + } + }() + return innerRun(ctx) + } } } diff --git a/cron/middleware_test.go b/cron/middleware_test.go new file mode 100644 index 00000000..3f215019 --- /dev/null +++ b/cron/middleware_test.go @@ -0,0 +1,244 @@ +package cron + +import ( + "bytes" + "context" + "errors" + "github.com/DoNewsCode/core/internal/stub" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/robfig/cron/v3" + "strings" + "testing" + "time" +) + +type mockScheduler func(now time.Time) time.Time // mockScheduler is a function that returns the next time to run + +func (m mockScheduler) Next(t time.Time) time.Time { + return m(t) +} + +type mockParser struct{} + +func (m mockParser) Parse(spec string) (cron.Schedule, error) { + return mockScheduler(func(now time.Time) time.Time { + return now.Add(time.Millisecond) + }), nil +} + +func TestMiddleware_generic(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + logger := log.NewLogfmtLogger(&buf) + hist := stub.Histogram{} + count := stub.Counter{} + metric := NewCronJobMetrics(&hist, &count) + tracer := mocktracer.MockTracer{} + entryCount := 0 + concurrentCount := 0 + var concurrentAccess bool + + for _, ca := range []struct { + name string + stacks []JobOptions + job func(context.Context) error + asserts func(t *testing.T) + }{ + { + "name and logging", + []JobOptions{ + WithName("test"), + WithLogging(logger), + }, + func(ctx context.Context) error { + return nil + }, + func(t *testing.T) { + t.Log(buf.String()) + if buf.String() == "" { + t.Error("Expected logging output") + } + if strings.Contains(buf.String(), "test") == false { + t.Error("Expected test to be in the log output") + } + buf = bytes.Buffer{} + }, + }, + { + "error and logging", + []JobOptions{ + WithLogging(logger), + }, + func(ctx context.Context) error { + return errors.New("test") + }, + func(t *testing.T) { + t.Log(buf.String()) + if buf.String() == "" { + t.Error("Expected logging output") + } + if strings.Contains(buf.String(), "error") == false { + t.Error("Expected error to be in the log output") + } + buf = bytes.Buffer{} + }, + }, + { + "metrics", + []JobOptions{ + WithMetrics(metric), + }, + func(ctx context.Context) error { + return nil + }, + func(t *testing.T) { + if hist.ObservedValue == 0 { + t.Error("Expected histogram to be observed") + } + if count.CounterValue > 0 { + t.Error("Expected fail counter to be zero") + } + hist = stub.Histogram{} + count = stub.Counter{} + }, + }, + { + "error and metrics", + []JobOptions{ + WithMetrics(metric), + }, + func(ctx context.Context) error { + return errors.New("test") + }, + func(t *testing.T) { + if hist.ObservedValue == 0 { + t.Error("Expected histogram to be observed") + } + if count.CounterValue < 1 { + t.Error("Expected fail counter to be one") + } + hist = stub.Histogram{} + count = stub.Counter{} + }, + }, + { + "tracing", + []JobOptions{ + WithTracing(&tracer), + }, + func(ctx context.Context) error { + return nil + }, + func(t *testing.T) { + if len(tracer.FinishedSpans()) < 1 { + t.Error("Expected one span to be finished") + } + tracer = mocktracer.MockTracer{} + }, + }, + { + "error tracing", + []JobOptions{ + WithTracing(&tracer), + }, + func(ctx context.Context) error { + return errors.New("test") + }, + func(t *testing.T) { + if len(tracer.FinishedSpans()) < 1 { + t.Error("Expected one span to be finished") + } + if tracer.FinishedSpans()[0].Tags()["error"] != true { + t.Error("Expected error tag to be true") + t.Log(tracer.FinishedSpans()[0].Tags()) + } + tracer = mocktracer.MockTracer{} + }, + }, + { + "panic", + []JobOptions{ + Recover(logger), + }, + func(ctx context.Context) error { + panic("to be recovered") + }, + func(t *testing.T) { + if strings.Contains(buf.String(), "to be recovered") == false { + t.Error("Expected panic to be in the log output") + } + buf = bytes.Buffer{} + }, + }, + { + "skip if overlap", + []JobOptions{ + SkipIfOverlap(), + }, + func(ctx context.Context) error { + entryCount++ + time.Sleep(5 * time.Millisecond) + return nil + }, + func(t *testing.T) { + if entryCount > 1 { + t.Errorf("expect entry once, got %d", entryCount) + } + entryCount = 0 + }, + }, + { + "delay if overlap", + []JobOptions{ + DelayIfOverlap(), + }, + func(ctx context.Context) error { + entryCount++ + concurrentCount++ + if concurrentCount > 1 { + concurrentAccess = true + } + time.Sleep(3 * time.Millisecond) + concurrentCount-- + return nil + }, + func(t *testing.T) { + if entryCount < 4 { + t.Errorf("expect entry at least 4 times, got %d", entryCount) + } + if concurrentAccess { + t.Errorf("conncurrent access not allowed") + } + entryCount = 0 + concurrentCount = 0 + }, + }, + { + "timeout if overlap", + []JobOptions{ + TimeoutIfOverlap(), + }, + func(ctx context.Context) error { + entryCount++ + <-ctx.Done() + return nil + }, + func(t *testing.T) { + if entryCount < 3 { + t.Errorf("expect entry at least 4 times, got %d", entryCount) + } + }, + }, + } { + ca := ca + t.Run(ca.name, func(t *testing.T) { + c := New(Config{Parser: mockParser{}}) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) + defer cancel() + c.Add("@every 1ms", ca.job, ca.stacks...) + c.Run(ctx) + ca.asserts(t) + }) + } +} diff --git a/cron/options.go b/cron/options.go deleted file mode 100644 index c90d3b84..00000000 --- a/cron/options.go +++ /dev/null @@ -1,39 +0,0 @@ -package cron - -import ( - "time" - - "github.com/robfig/cron/v3" -) - -// Option represents a modification to the default behavior of a Cron. -type Option func(*Cron) - -// WithLocation overrides the timezone of the cron instance. -func WithLocation(loc *time.Location) Option { - return func(c *Cron) { - c.location = loc - } -} - -// WithSeconds overrides the parser used for interpreting job schedules to -// include a seconds field as the first one. -func WithSeconds() Option { - return WithParser(cron.NewParser( - cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, - )) -} - -// WithParser overrides the parser used for interpreting job schedules. -func WithParser(p cron.ScheduleParser) Option { - return func(c *Cron) { - c.parser = p - } -} - -// WithGlobalMiddleware specifies Job wrappers to apply to all jobs added to this cron. -func WithGlobalMiddleware(middleware ...JobMiddleware) Option { - return func(c *Cron) { - c.globalMiddleware = middleware - } -} diff --git a/cronopts/example_metrics_test.go b/cronopts/example_metrics_test.go deleted file mode 100644 index 72c73003..00000000 --- a/cronopts/example_metrics_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package cronopts_test - -import ( - "context" - "fmt" - "github.com/DoNewsCode/core" - "github.com/DoNewsCode/core/cronopts" - "github.com/DoNewsCode/core/observability" - "github.com/robfig/cron/v3" - "math/rand" - "time" -) - -type CronModule struct { - metrics *cronopts.CronJobMetrics -} - -func NewCronModule(metrics *cronopts.CronJobMetrics) CronModule { - return CronModule{metrics: metrics.Module("test_module")} -} - -func (c CronModule) ProvideCron(crontab *cron.Cron) { - // Create a new cron job, and measure its execution durations. - crontab.AddJob("* * * * *", c.metrics.Job("test_job").Measure(cron.FuncJob(func() { - fmt.Println("running") - // For 50% chance, the job may fail. Report it to metrics collector. - if rand.Float64() > 0.5 { - c.metrics.Fail() - } - }))) -} - -func Example_cronJobMetrics() { - c := core.Default() - c.Provide(observability.Providers()) - c.AddModuleFunc(NewCronModule) - - ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond) - defer cancel() - - c.Serve(ctx) -} diff --git a/cronopts/jobs/example_test.go b/cronopts/jobs/example_test.go deleted file mode 100644 index ce1b9dd6..00000000 --- a/cronopts/jobs/example_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package jobs_test - -import ( - "context" - "time" - - "github.com/DoNewsCode/core" - "github.com/DoNewsCode/core/cronopts" - "github.com/DoNewsCode/core/cronopts/jobs" - "github.com/DoNewsCode/core/observability" - "github.com/go-kit/log" - "github.com/opentracing/opentracing-go" - "github.com/robfig/cron/v3" -) - -type CronJobModule struct { - mJob jobs.Universal -} - -func (c CronJobModule) ProvideCron(crontab *cron.Cron) { - crontab.AddJob("@every 1s", c.mJob) -} - -// NewCronJobModule creates a new module that provides a cron job with metrics, logging and tracing. -func NewCronJobModule(tracer opentracing.Tracer, metrics *cronopts.CronJobMetrics, logger log.Logger) CronJobModule { - return CronJobModule{ - mJob: jobs.New("cronjob", func(ctx context.Context) error { - return nil - }, jobs.WithMetrics(metrics), jobs.WithTracing(tracer), jobs.WithLogs(logger)), - } -} - -func Example() { - ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) - defer cancel() - - c := core.Default(core.WithInline("log.level", "none")) - c.Provide(observability.Providers()) - c.AddModuleFunc(NewCronJobModule) - c.Serve(ctx) - - // Output: -} diff --git a/cronopts/jobs/universal.go b/cronopts/jobs/universal.go deleted file mode 100644 index 24130bd7..00000000 --- a/cronopts/jobs/universal.go +++ /dev/null @@ -1,110 +0,0 @@ -// Package jobs contains a universal job type that implements cron.Job interface. -// It is designed to be used with cron.New() and cron.AddJob() methods. Compared -// to anonymous jobs, this job type supports go idioms like context and error, -// and offers hook point for common observability concerns such as metrics, -// logging and tracing. -package jobs - -import ( - "context" - "fmt" - "time" - - "github.com/DoNewsCode/core/cronopts" - "github.com/DoNewsCode/core/dag" - "github.com/DoNewsCode/core/logging" - "github.com/go-kit/log" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" -) - -// Universal is a generic job that can be used to run any task. It implements the -// cron.Job interface and supports context parameter and error propagation. The -// Name parameter allows common observability concerns such as logging, tracing -// and metrics to take advantage. -type Universal struct { - Name string - Do func(ctx context.Context) error -} - -// New returns a new Universal job. -func New(name string, do func(ctx context.Context) error, wrapper ...func(universal Universal) Universal) Universal { - base := Universal{ - Name: name, - Do: do, - } - for _, w := range wrapper { - base = w(base) - } - return base -} - -// NewFromDAG returns a new Universal job from a DAG. -func NewFromDAG(name string, dag *dag.DAG, wrapper ...func(universal Universal) Universal) Universal { - return New(name, dag.Run, wrapper...) -} - -// Run implements the cron.Job interface. -func (s Universal) Run() { - _ = s.Do(context.Background()) -} - -// WithMetrics returns a new Universal job that will report metrics. -func WithMetrics(metrics *cronopts.CronJobMetrics) func(universal Universal) Universal { - return func(universal Universal) Universal { - return Universal{ - Name: universal.Name, - Do: func(ctx context.Context) error { - start := time.Now() - metrics = metrics.Job(universal.Name) - defer metrics.Observe(time.Since(start).Seconds()) - err := universal.Do(ctx) - if err != nil { - metrics.Fail() - return err - } - return nil - }, - } - } -} - -// WithLogs returns a new Universal job that will log. -func WithLogs(logger log.Logger) func(universal Universal) Universal { - return func(universal Universal) Universal { - return Universal{ - Name: universal.Name, - Do: func(ctx context.Context) error { - logger = logging.WithContext(logger, ctx) - logger = log.With(logger, "job", universal.Name) - logger.Log("msg", logging.Sprintf("job %s started", universal.Name)) - err := universal.Do(ctx) - if err != nil { - logger.Log("msg", logging.Sprintf("job %s finished with error %s", universal.Name, err)) - return err - } - logger.Log("msg", logging.Sprintf("job %s completed", universal.Name)) - return nil - }, - } - } -} - -// WithTracing returns a new Universal job that will trace. -func WithTracing(tracer opentracing.Tracer) func(universal Universal) Universal { - return func(universal Universal) Universal { - return Universal{ - Name: universal.Name, - Do: func(ctx context.Context) error { - span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, fmt.Sprintf("Job: %s", universal.Name)) - defer span.Finish() - err := universal.Do(ctx) - if err != nil { - ext.Error.Set(span, true) - return err - } - return nil - }, - } - } -} diff --git a/cronopts/jobs/universal_test.go b/cronopts/jobs/universal_test.go deleted file mode 100644 index b3f5d3c4..00000000 --- a/cronopts/jobs/universal_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package jobs - -import ( - "context" - "errors" - "testing" - - "github.com/DoNewsCode/core/cronopts" - "github.com/DoNewsCode/core/dag" - "github.com/DoNewsCode/core/internal/stub" - "github.com/go-kit/log" - "github.com/opentracing/opentracing-go" - "github.com/stretchr/testify/assert" -) - -func TestUniversal_error_propagation(t *testing.T) { - d := dag.New() - d.AddVertex(func(ctx context.Context) error { return errors.New("oops") }) - j := NewFromDAG( - "should return error", - d, - WithMetrics(cronopts.NewCronJobMetrics(&stub.Histogram{}, &stub.Counter{})), - WithTracing(opentracing.NoopTracer{}), - WithLogs(log.NewNopLogger()), - ) - assert.Error(t, j.Do(context.Background())) -} diff --git a/cronopts/metrics_test.go b/cronopts/metrics_test.go deleted file mode 100644 index 188d2400..00000000 --- a/cronopts/metrics_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package cronopts - -import ( - "testing" - "time" - - "github.com/DoNewsCode/core/internal/stub" - "github.com/robfig/cron/v3" - "github.com/stretchr/testify/assert" -) - -func TestMeasure(t *testing.T) { - histogram := &stub.Histogram{} - counter := &stub.Counter{} - metrics := NewCronJobMetrics(histogram, counter) - metrics = metrics.Module("x").Job("y") - Measure(metrics)(cron.FuncJob(func() { - time.Sleep(time.Millisecond) - })).Run() - assert.ElementsMatch(t, histogram.LabelValues, []string{"module", "x", "job", "y"}) - assert.True(t, histogram.ObservedValue > 0) - metrics.Fail() - assert.ElementsMatch(t, counter.LabelValues, []string{"module", "x", "job", "y"}) - assert.True(t, counter.CounterValue == 1) -} diff --git a/deprecated_cronopts/deprecation_test.go b/deprecated_cronopts/deprecation_test.go new file mode 100644 index 00000000..9bdd1a80 --- /dev/null +++ b/deprecated_cronopts/deprecation_test.go @@ -0,0 +1,38 @@ +package cronopts_test + +import ( + "context" + "fmt" + "github.com/DoNewsCode/core" + "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/observability" + deprecatedcron "github.com/robfig/cron/v3" + "testing" + "time" +) + +type CronModule struct{} + +func (module *CronModule) ProvideCron(crontab *deprecatedcron.Cron) { + crontab.AddFunc("* * * * * *", func() { + fmt.Println("Cron job ran") + }) +} + +func Test_deprecation(t *testing.T) { + c := core.Default(core.WithInline("log.level", "none")) + c.Provide(observability.Providers()) + c.Provide( + di.Deps{func() *deprecatedcron.Cron { + return deprecatedcron.New(deprecatedcron.WithSeconds()) + }}, + ) + + c.AddModule(CronModule{}) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + c.Serve(ctx) + // Output: + // Cron job ran +} diff --git a/cronopts/log.go b/deprecated_cronopts/log.go similarity index 84% rename from cronopts/log.go rename to deprecated_cronopts/log.go index e0c7ffe9..626f7446 100644 --- a/cronopts/log.go +++ b/deprecated_cronopts/log.go @@ -1,4 +1,4 @@ -// Package cronopts contains the options for cron. +// Package deprecated_cronopts contains the options for cron. This package is deprecated. Use package cron instead. package cronopts import ( diff --git a/cronopts/log_test.go b/deprecated_cronopts/log_test.go similarity index 100% rename from cronopts/log_test.go rename to deprecated_cronopts/log_test.go diff --git a/go.mod b/go.mod index 6b7bc40a..93d5a14f 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,8 @@ require ( gorm.io/gorm v1.21.10 ) +require github.com/felixge/httpsnoop v1.0.1 + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -58,7 +60,6 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/felixge/httpsnoop v1.0.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-sql-driver/mysql v1.5.0 // indirect github.com/go-stack/stack v1.8.0 // indirect diff --git a/observability/metrics.go b/observability/metrics.go index 8577fb7e..f1f1fb2b 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -1,7 +1,7 @@ package observability import ( - "github.com/DoNewsCode/core/cronopts" + "github.com/DoNewsCode/core/cron" "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/otgorm" "github.com/DoNewsCode/core/otkafka" @@ -54,14 +54,14 @@ func ProvideGRPCRequestDurationSeconds(in MetricsIn) *srvgrpc.RequestDurationSec return srvgrpc.NewRequestDurationSeconds(prometheus.NewHistogram(grpc)) } -// ProvideCronJobMetrics returns a *cronopts.CronJobMetrics that is designed to +// ProvideCronJobMetrics returns a *deprecated_cronopts.CronJobMetrics that is designed to // measure cron job metrics. The returned metrics can be used like this: -// metrics := cronopts.NewCronJobMetrics(...) +// metrics := deprecated_cronopts.NewCronJobMetrics(...) // job := cron.NewChain( // cron.Recover(logger), -// cronopts.Measure(metrics), +// deprecated_cronopts.Measure(metrics), // ).Then(job) -func ProvideCronJobMetrics(in MetricsIn) *cronopts.CronJobMetrics { +func ProvideCronJobMetrics(in MetricsIn) *cron.CronJobMetrics { histogram := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ Name: "cronjob_duration_seconds", Help: "Total time spent running cron jobs.", @@ -79,7 +79,7 @@ func ProvideCronJobMetrics(in MetricsIn) *cronopts.CronJobMetrics { in.Registerer.MustRegister(histogram) in.Registerer.MustRegister(counter) - return cronopts.NewCronJobMetrics(prometheus.NewHistogram(histogram), prometheus.NewCounter(counter)) + return cron.NewCronJobMetrics(prometheus.NewHistogram(histogram), prometheus.NewCounter(counter)) } // ProvideGORMMetrics returns a *otgorm.Gauges that measures the connection info diff --git a/observability/observability_test.go b/observability/observability_test.go index acccfd8d..bff7aa93 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -1,13 +1,13 @@ package observability import ( - "github.com/DoNewsCode/core/cronopts" "os" "strings" "testing" "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/config" + "github.com/DoNewsCode/core/cron" "github.com/DoNewsCode/core/otgorm" "github.com/DoNewsCode/core/otkafka" "github.com/DoNewsCode/core/otredis" @@ -87,7 +87,7 @@ func TestProvideCronjobMetrics(t *testing.T) { c := core.New() c.ProvideEssentials() c.Provide(Providers()) - c.Invoke(func(metrics *cronopts.CronJobMetrics) { + c.Invoke(func(metrics *cron.CronJobMetrics) { metrics.Fail() }) } diff --git a/serve.go b/serve.go index c737b497..7b1e0051 100644 --- a/serve.go +++ b/serve.go @@ -11,7 +11,8 @@ import ( "github.com/DoNewsCode/core/container" "github.com/DoNewsCode/core/contract" - "github.com/DoNewsCode/core/cronopts" + cron "github.com/DoNewsCode/core/cron" + "github.com/DoNewsCode/core/deprecated_cronopts" "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/logging" "github.com/go-kit/log" @@ -19,7 +20,7 @@ import ( "github.com/gorilla/mux" "github.com/oklog/run" "github.com/pkg/errors" - "github.com/robfig/cron/v3" + deprecatedcron "github.com/robfig/cron/v3" "github.com/spf13/cobra" "google.golang.org/grpc" ) @@ -27,13 +28,14 @@ import ( type serveIn struct { di.In - Dispatcher contract.Dispatcher - Config contract.ConfigAccessor - Logger log.Logger - Container contract.Container - HTTPServer *http.Server `optional:"true"` - GRPCServer *grpc.Server `optional:"true"` - Cron *cron.Cron `optional:"true"` + Dispatcher contract.Dispatcher + Config contract.ConfigAccessor + Logger log.Logger + Container contract.Container + HTTPServer *http.Server `optional:"true"` + GRPCServer *grpc.Server `optional:"true"` + DeprecatedCron *deprecatedcron.Cron `optional:"true"` + Cron *cron.Cron `optional:"true"` } func NewServeModule(in serveIn) serveModule { @@ -140,19 +142,46 @@ func (s serveIn) cronServe(ctx context.Context, logger logging.LevelLogger) (fun if s.Config.Bool("cron.disable") { return nil, nil, nil } - if s.Cron == nil { - logger := log.With(s.Logger, "tag", "cron") - s.Cron = cron.New(cron.WithLogger(cronopts.CronLogAdapter{Logging: logger})) + if mContainer, ok := s.Container.(interface{ ApplyCron(*cron.Cron) }); ok { + if s.Cron == nil { + s.Cron = cron.New(cron.Config{GlobalOptions: []cron.JobOptions{cron.WithLogging(log.With(s.Logger, "tag", "cron"))}}) + } + mContainer.ApplyCron(s.Cron) + if len(s.Cron.Descriptors()) > 0 { + ctx, cancel := context.WithCancel(ctx) + return func() error { + logger.Infof("cron runner started") + return s.Cron.Run(ctx) + }, func(err error) { + cancel() + }, nil + } } - s.Container.ApplyCron(s.Cron) + return nil, nil, nil +} - return func() error { - logger.Infof("cron runner started") - s.Cron.Run() - return nil - }, func(err error) { - <-s.Cron.Stop().Done() - }, nil +func (s serveIn) cronServeDeprecated(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) { + if s.Config.Bool("cron.disable") { + return nil, nil, nil + } + if mContainer, ok := s.Container.(interface{ ApplyDeprecatedCron(*deprecatedcron.Cron) }); ok { + if s.DeprecatedCron == nil { + logger := log.With(s.Logger, "tag", "cron") + s.DeprecatedCron = deprecatedcron.New(deprecatedcron.WithLogger(cronopts.CronLogAdapter{Logging: logger})) + } + mContainer.ApplyDeprecatedCron(s.DeprecatedCron) + if len(s.DeprecatedCron.Entries()) > 0 { + return func() error { + logger.Infof("cron runner started") + logger.Warn("Directly using github.com/robfig/cron/v3 is deprecated. Please migrate to github.com/DoNewsCode/core/cron") + s.DeprecatedCron.Run() + return nil + }, func(err error) { + <-s.DeprecatedCron.Stop().Done() + }, nil + } + } + return nil, nil, nil } func (s serveIn) signalWatch(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) { @@ -193,6 +222,7 @@ func newServeCmd(s serveIn) *cobra.Command { s.httpServe, s.grpcServe, s.cronServe, + s.cronServeDeprecated, s.signalWatch, } diff --git a/serve_test.go b/serve_test.go index 87b0e26d..854e0d33 100644 --- a/serve_test.go +++ b/serve_test.go @@ -4,6 +4,10 @@ import ( "bytes" "context" "errors" + "github.com/DoNewsCode/core/cron" + "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/observability" + deprecatedcron "github.com/robfig/cron/v3" "os" "runtime" "testing" @@ -56,3 +60,47 @@ func TestServeIn_signalWatch(t *testing.T) { assert.Contains(t, buf.String(), "context canceled") }) } + +type OldCronModule struct { + CanRun bool +} + +func (module *OldCronModule) ProvideCron(crontab *deprecatedcron.Cron) { + crontab.AddFunc("* * * * * *", func() { + module.CanRun = true + }) +} + +type NewCronModule struct { + CanRun bool +} + +func (module *NewCronModule) ProvideCron(crontab *cron.Cron) { + crontab.Add("* * * * * *", func(ctx context.Context) error { + module.CanRun = true + return nil + }) +} + +func TestServeIn_cron_deprecation(t *testing.T) { + c := Default(WithInline("grpc.disable", true), WithInline("http.disable", true)) + c.Provide(observability.Providers()) + c.Provide( + di.Deps{func() *deprecatedcron.Cron { + return deprecatedcron.New(deprecatedcron.WithSeconds()) + }, func() *cron.Cron { + return cron.New(cron.Config{EnableSeconds: true}) + }}, + ) + + mOld := OldCronModule{} + mNew := NewCronModule{} + c.AddModule(&mOld) + c.AddModule(&mNew) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + c.Serve(ctx) + assert.True(t, mOld.CanRun) + assert.True(t, mNew.CanRun) +} From feb119544152bc95626e03f4734e02f67bb75315 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 16:00:01 +0800 Subject: [PATCH 04/19] refactor(cron): deprecate cronopts, add cron This PR replaces the cron implementation. BREAKING CHANGE: the new cron package github/DoNewsCode/core/cron is not compatible with github.com/robfig/cron/v3. See examples for how to migrate. --- cron/cron.go | 1 + deprecated_cronopts/log.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cron/cron.go b/cron/cron.go index 32951025..d100c608 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -85,6 +85,7 @@ func (c *Cron) Add(spec string, runner func(ctx context.Context) error, middlewa return jobDescriptor.ID, nil } +// Remove removes a job from the cron scheduler. func (c *Cron) Remove(id JobID) { c.lock.L.Lock() defer c.lock.L.Unlock() diff --git a/deprecated_cronopts/log.go b/deprecated_cronopts/log.go index 626f7446..4e2683e0 100644 --- a/deprecated_cronopts/log.go +++ b/deprecated_cronopts/log.go @@ -1,4 +1,4 @@ -// Package deprecated_cronopts contains the options for cron. This package is deprecated. Use package cron instead. +// Package cronopts contains the options for cron. This package is deprecated. Use package cron instead. package cronopts import ( From 2a8b9b658fa37c5f6ddde7bcf31c025c57547bd3 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 16:02:19 +0800 Subject: [PATCH 05/19] fix: delayed time calculation --- cron/middleware.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cron/middleware.go b/cron/middleware.go index 4af03c78..29b8e6dc 100644 --- a/cron/middleware.go +++ b/cron/middleware.go @@ -55,7 +55,7 @@ func WithLogging(logger log.Logger) JobOptions { innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { due := GetCurrentSchedule(ctx) - delayed := due.Sub(time.Now()) + delayed := time.Now().Sub(due) logger = logging.WithContext(logger, ctx) if delayed > time.Second { log.With(logger, "delayed", delayed) From 708ae0d1530b4648236471b68a53b21cc9b079ed Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 16:04:22 +0800 Subject: [PATCH 06/19] refactor: change job middleware to job options --- cron/{middleware.go => options.go} | 0 cron/{middleware_test.go => options_test.go} | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename cron/{middleware.go => options.go} (100%) rename cron/{middleware_test.go => options_test.go} (99%) diff --git a/cron/middleware.go b/cron/options.go similarity index 100% rename from cron/middleware.go rename to cron/options.go diff --git a/cron/middleware_test.go b/cron/options_test.go similarity index 99% rename from cron/middleware_test.go rename to cron/options_test.go index 3f215019..937f4cfd 100644 --- a/cron/middleware_test.go +++ b/cron/options_test.go @@ -27,7 +27,7 @@ func (m mockParser) Parse(spec string) (cron.Schedule, error) { }), nil } -func TestMiddleware_generic(t *testing.T) { +func TestJobOption(t *testing.T) { t.Parallel() var buf bytes.Buffer logger := log.NewLogfmtLogger(&buf) From b8c91e750a78ca9951799ad978647a1f5a1975d8 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 16:05:55 +0800 Subject: [PATCH 07/19] fix: use time.Since --- cron/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cron/options.go b/cron/options.go index 29b8e6dc..08ed236a 100644 --- a/cron/options.go +++ b/cron/options.go @@ -55,7 +55,7 @@ func WithLogging(logger log.Logger) JobOptions { innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { due := GetCurrentSchedule(ctx) - delayed := time.Now().Sub(due) + delayed := time.Since(due) logger = logging.WithContext(logger, ctx) if delayed > time.Second { log.With(logger, "delayed", delayed) From 419d4f00cc79c297066172bbd269b0ecf5cd71bd Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 16:11:37 +0800 Subject: [PATCH 08/19] fix: inconsistent labels --- container/container_test.go | 2 +- observability/metrics.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/container/container_test.go b/container/container_test.go index 51ec767d..aeaf12fb 100644 --- a/container/container_test.go +++ b/container/container_test.go @@ -3,9 +3,9 @@ package container import ( "testing" + "github.com/DoNewsCode/core/cron" "github.com/gorilla/mux" "github.com/oklog/run" - "github.com/robfig/cron/v3" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" "google.golang.org/grpc" diff --git a/observability/metrics.go b/observability/metrics.go index f1f1fb2b..15242c68 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -65,12 +65,12 @@ func ProvideCronJobMetrics(in MetricsIn) *cron.CronJobMetrics { histogram := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ Name: "cronjob_duration_seconds", Help: "Total time spent running cron jobs.", - }, []string{"module", "job"}) + }, []string{"module", "job", "schedule"}) counter := stdprometheus.NewCounterVec(stdprometheus.CounterOpts{ Name: "cronjob_failures_total", Help: "Total number of cron jobs that failed.", - }, []string{"module", "job"}) + }, []string{"module", "job", "schedule"}) if in.Registerer == nil { in.Registerer = stdprometheus.DefaultRegisterer From 8ec990029742c8ab92fef5b0a7aa33fdccf58aba Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 16:19:10 +0800 Subject: [PATCH 09/19] fix: race --- cron/options_test.go | 2 +- serve_test.go | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cron/options_test.go b/cron/options_test.go index 937f4cfd..e872604d 100644 --- a/cron/options_test.go +++ b/cron/options_test.go @@ -30,7 +30,7 @@ func (m mockParser) Parse(spec string) (cron.Schedule, error) { func TestJobOption(t *testing.T) { t.Parallel() var buf bytes.Buffer - logger := log.NewLogfmtLogger(&buf) + logger := log.NewSyncLogger(log.NewLogfmtLogger(&buf)) hist := stub.Histogram{} count := stub.Counter{} metric := NewCronJobMetrics(&hist, &count) diff --git a/serve_test.go b/serve_test.go index 854e0d33..b03dfe43 100644 --- a/serve_test.go +++ b/serve_test.go @@ -10,6 +10,7 @@ import ( deprecatedcron "github.com/robfig/cron/v3" "os" "runtime" + "sync/atomic" "testing" "time" @@ -62,22 +63,22 @@ func TestServeIn_signalWatch(t *testing.T) { } type OldCronModule struct { - CanRun bool + CanRun uint32 } func (module *OldCronModule) ProvideCron(crontab *deprecatedcron.Cron) { crontab.AddFunc("* * * * * *", func() { - module.CanRun = true + atomic.StoreUint32(&module.CanRun, 1) }) } type NewCronModule struct { - CanRun bool + CanRun uint32 } func (module *NewCronModule) ProvideCron(crontab *cron.Cron) { crontab.Add("* * * * * *", func(ctx context.Context) error { - module.CanRun = true + atomic.StoreUint32(&module.CanRun, 1) return nil }) } @@ -101,6 +102,6 @@ func TestServeIn_cron_deprecation(t *testing.T) { defer cancel() c.Serve(ctx) - assert.True(t, mOld.CanRun) - assert.True(t, mNew.CanRun) + assert.True(t, mOld.CanRun == 1) + assert.True(t, mNew.CanRun == 1) } From 0a28287072245b46f6787c05fe84527641746bfb Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 16:29:10 +0800 Subject: [PATCH 10/19] fix: race --- cron/options.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cron/options.go b/cron/options.go index 08ed236a..508dbb9c 100644 --- a/cron/options.go +++ b/cron/options.go @@ -56,18 +56,18 @@ func WithLogging(logger log.Logger) JobOptions { descriptor.Run = func(ctx context.Context) error { due := GetCurrentSchedule(ctx) delayed := time.Since(due) - logger = logging.WithContext(logger, ctx) + l := logging.WithContext(logger, ctx) if delayed > time.Second { - log.With(logger, "delayed", delayed) + l = log.With(l, "delayed", delayed) } - logger = log.With(logger, "job", descriptor.Name, "schedule", descriptor.RawSpec) - logger.Log("msg", logging.Sprintf("job %s started", descriptor.Name)) + l = log.With(l, "job", descriptor.Name, "schedule", descriptor.RawSpec) + l.Log("msg", logging.Sprintf("job %s started", descriptor.Name)) err := innerRun(ctx) if err != nil { - logger.Log("msg", logging.Sprintf("job %s finished with error: %s", descriptor.Name, err)) + l.Log("msg", logging.Sprintf("job %s finished with error: %s", descriptor.Name, err)) return err } - logger.Log("msg", logging.Sprintf("job %s completed", descriptor.Name)) + l.Log("msg", logging.Sprintf("job %s completed", descriptor.Name)) return nil } } From eaa934062f7fd458fdc73369bfd067f217c2b419 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 16:59:38 +0800 Subject: [PATCH 11/19] fix: race --- cron/options.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cron/options.go b/cron/options.go index 508dbb9c..de2671b1 100644 --- a/cron/options.go +++ b/cron/options.go @@ -37,11 +37,11 @@ func WithMetrics(metrics *CronJobMetrics) JobOptions { innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { start := time.Now() - metrics = metrics.Job(descriptor.Name).Schedule(descriptor.RawSpec) - defer metrics.Observe(time.Since(start)) + m := metrics.Job(descriptor.Name).Schedule(descriptor.RawSpec) + defer m.Observe(time.Since(start)) err := innerRun(ctx) if err != nil { - metrics.Fail() + m.Fail() return err } return nil From 2596422dc1b36d7cb9fee395d3d7c5bee86be0d2 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 17:27:37 +0800 Subject: [PATCH 12/19] fix: race --- internal/stub/metrics.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/internal/stub/metrics.go b/internal/stub/metrics.go index 9e4ba209..93e4e048 100644 --- a/internal/stub/metrics.go +++ b/internal/stub/metrics.go @@ -1,6 +1,9 @@ package stub -import "github.com/go-kit/kit/metrics" +import ( + "github.com/go-kit/kit/metrics" + "sync" +) // LabelValues contains the set of labels and their corresponding values. type LabelValues []string @@ -17,56 +20,73 @@ func (l LabelValues) Label(name string) string { // Histogram is a stub implementation of the go-kit metrics.Histogram interface. type Histogram struct { + mu sync.Mutex LabelValues LabelValues ObservedValue float64 } // With returns a new Histogram with the given label values. func (h *Histogram) With(labelValues ...string) metrics.Histogram { + h.mu.Lock() + defer h.mu.Unlock() h.LabelValues = labelValues return h } // Observe records the given value. func (h *Histogram) Observe(value float64) { + h.mu.Lock() + defer h.mu.Unlock() h.ObservedValue = value } // Gauge is a stub implementation of the go-kit metrics.Gauge interface. type Gauge struct { + mu sync.Mutex LabelValues []string GaugeValue float64 } // With returns a new Gauge with the given label values. func (g *Gauge) With(labelValues ...string) metrics.Gauge { + g.mu.Lock() + defer g.mu.Unlock() g.LabelValues = labelValues return g } // Set sets the gauge value. func (g *Gauge) Set(value float64) { + g.mu.Lock() + defer g.mu.Unlock() g.GaugeValue = value } // Add adds the given value to the gauge. func (g *Gauge) Add(delta float64) { + g.mu.Lock() + defer g.mu.Unlock() g.GaugeValue = g.GaugeValue + delta } // Counter is a stub implementation of the go-kit metrics.Counter interface. type Counter struct { + mu sync.Mutex LabelValues []string CounterValue float64 } // With returns a new Counter with the given label values. func (c *Counter) With(labelValues ...string) metrics.Counter { + c.mu.Lock() + defer c.mu.Unlock() c.LabelValues = labelValues return c } // Add adds the given value to the counter. func (c *Counter) Add(delta float64) { + c.mu.Lock() + defer c.mu.Unlock() c.CounterValue = c.CounterValue + delta } From b5b4473f4815a59a964c3276016374e8079e5893 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 17:42:00 +0800 Subject: [PATCH 13/19] fix: race --- cron/options_test.go | 8 ++------ internal/stub/metrics.go | 34 +++++++++++++++++----------------- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/cron/options_test.go b/cron/options_test.go index e872604d..c5a97b87 100644 --- a/cron/options_test.go +++ b/cron/options_test.go @@ -204,8 +204,8 @@ func TestJobOption(t *testing.T) { return nil }, func(t *testing.T) { - if entryCount < 4 { - t.Errorf("expect entry at least 4 times, got %d", entryCount) + if entryCount < 3 { + t.Errorf("expect entry at least 3 times, got %d", entryCount) } if concurrentAccess { t.Errorf("conncurrent access not allowed") @@ -220,14 +220,10 @@ func TestJobOption(t *testing.T) { TimeoutIfOverlap(), }, func(ctx context.Context) error { - entryCount++ <-ctx.Done() return nil }, func(t *testing.T) { - if entryCount < 3 { - t.Errorf("expect entry at least 4 times, got %d", entryCount) - } }, }, } { diff --git a/internal/stub/metrics.go b/internal/stub/metrics.go index 93e4e048..4df81ee3 100644 --- a/internal/stub/metrics.go +++ b/internal/stub/metrics.go @@ -20,73 +20,73 @@ func (l LabelValues) Label(name string) string { // Histogram is a stub implementation of the go-kit metrics.Histogram interface. type Histogram struct { - mu sync.Mutex + sync.Mutex LabelValues LabelValues ObservedValue float64 } // With returns a new Histogram with the given label values. func (h *Histogram) With(labelValues ...string) metrics.Histogram { - h.mu.Lock() - defer h.mu.Unlock() + h.Mutex.Lock() + defer h.Mutex.Unlock() h.LabelValues = labelValues return h } // Observe records the given value. func (h *Histogram) Observe(value float64) { - h.mu.Lock() - defer h.mu.Unlock() + h.Mutex.Lock() + defer h.Mutex.Unlock() h.ObservedValue = value } // Gauge is a stub implementation of the go-kit metrics.Gauge interface. type Gauge struct { - mu sync.Mutex + sync.Mutex LabelValues []string GaugeValue float64 } // With returns a new Gauge with the given label values. func (g *Gauge) With(labelValues ...string) metrics.Gauge { - g.mu.Lock() - defer g.mu.Unlock() + g.Mutex.Lock() + defer g.Mutex.Unlock() g.LabelValues = labelValues return g } // Set sets the gauge value. func (g *Gauge) Set(value float64) { - g.mu.Lock() - defer g.mu.Unlock() + g.Mutex.Lock() + defer g.Mutex.Unlock() g.GaugeValue = value } // Add adds the given value to the gauge. func (g *Gauge) Add(delta float64) { - g.mu.Lock() - defer g.mu.Unlock() + g.Mutex.Lock() + defer g.Mutex.Unlock() g.GaugeValue = g.GaugeValue + delta } // Counter is a stub implementation of the go-kit metrics.Counter interface. type Counter struct { - mu sync.Mutex + sync.Mutex LabelValues []string CounterValue float64 } // With returns a new Counter with the given label values. func (c *Counter) With(labelValues ...string) metrics.Counter { - c.mu.Lock() - defer c.mu.Unlock() + c.Mutex.Lock() + defer c.Mutex.Unlock() c.LabelValues = labelValues return c } // Add adds the given value to the counter. func (c *Counter) Add(delta float64) { - c.mu.Lock() - defer c.mu.Unlock() + c.Mutex.Lock() + defer c.Mutex.Unlock() c.CounterValue = c.CounterValue + delta } From 6967b419a8c3474bb18dc8d36c7e2720f145d969 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 18:18:15 +0800 Subject: [PATCH 14/19] fix: rename JobOptions to JobOption --- cron/config.go | 2 +- cron/cron.go | 4 ++-- cron/options.go | 22 +++++++++++----------- cron/options_test.go | 22 +++++++++++----------- serve.go | 2 +- 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/cron/config.go b/cron/config.go index 2a612de2..690df18c 100644 --- a/cron/config.go +++ b/cron/config.go @@ -13,7 +13,7 @@ type Config struct { // Location is the timezone to use in parsing cron expressions. Location *time.Location // GlobalOptions are the job options that are applied to all jobs. - GlobalOptions []JobOptions + GlobalOptions []JobOption // EnableSeconds is whether to enable seconds in the cron expression. EnableSeconds bool } diff --git a/cron/cron.go b/cron/cron.go index d100c608..8b9ddef8 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -19,7 +19,7 @@ type Cron struct { parser cron.ScheduleParser lock *sync.Cond jobDescriptors jobDescriptors - globalMiddleware []JobOptions + globalMiddleware []JobOption location *time.Location nextID int quitWaiter sync.WaitGroup @@ -51,7 +51,7 @@ func New(config Config) *Cron { // Add adds a new job to the cron scheduler.A list of middleware can be supplied. // Note the error returned by the runner will be discarded. It is the user's // responsibility to handle the error via middleware. -func (c *Cron) Add(spec string, runner func(ctx context.Context) error, middleware ...JobOptions) (JobID, error) { +func (c *Cron) Add(spec string, runner func(ctx context.Context) error, middleware ...JobOption) (JobID, error) { schedule, err := c.parser.Parse(spec) if err != nil { return 0, err diff --git a/cron/options.go b/cron/options.go index de2671b1..cc46e72c 100644 --- a/cron/options.go +++ b/cron/options.go @@ -14,25 +14,25 @@ import ( "github.com/robfig/cron/v3" ) -// JobOptions is a middleware for cron jobs. -type JobOptions func(descriptors *JobDescriptor) +// JobOption is a middleware for cron jobs. +type JobOption func(descriptors *JobDescriptor) // WithName sets the name of the job. -func WithName(name string) JobOptions { +func WithName(name string) JobOption { return func(descriptor *JobDescriptor) { descriptor.Name = name } } // WithSchedule sets the cron schedule of the job. -func WithSchedule(schedule cron.Schedule) JobOptions { +func WithSchedule(schedule cron.Schedule) JobOption { return func(descriptor *JobDescriptor) { descriptor.Schedule = schedule } } // WithMetrics returns a new JobDescriptor that will report metrics. -func WithMetrics(metrics *CronJobMetrics) JobOptions { +func WithMetrics(metrics *CronJobMetrics) JobOption { return func(descriptor *JobDescriptor) { innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { @@ -50,7 +50,7 @@ func WithMetrics(metrics *CronJobMetrics) JobOptions { } // WithLogging returns a new Universal job that will log. -func WithLogging(logger log.Logger) JobOptions { +func WithLogging(logger log.Logger) JobOption { return func(descriptor *JobDescriptor) { innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { @@ -74,7 +74,7 @@ func WithLogging(logger log.Logger) JobOptions { } // WithTracing returns a new Universal job that will trace. -func WithTracing(tracer opentracing.Tracer) JobOptions { +func WithTracing(tracer opentracing.Tracer) JobOption { return func(descriptor *JobDescriptor) { innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { @@ -92,7 +92,7 @@ func WithTracing(tracer opentracing.Tracer) JobOptions { } // SkipIfOverlap returns a new JobDescriptor that will skip the job if it overlaps with another job. -func SkipIfOverlap() JobOptions { +func SkipIfOverlap() JobOption { ch := make(chan struct{}, 1) return func(descriptor *JobDescriptor) { innerRun := descriptor.Run @@ -111,7 +111,7 @@ func SkipIfOverlap() JobOptions { } // DelayIfOverlap returns a new JobDescriptor that will delay the job if it overlaps with another job. -func DelayIfOverlap() JobOptions { +func DelayIfOverlap() JobOption { ch := make(chan struct{}, 1) return func(descriptor *JobDescriptor) { innerRun := descriptor.Run @@ -126,7 +126,7 @@ func DelayIfOverlap() JobOptions { } // TimeoutIfOverlap returns a new JobDescriptor that will cancel the job's context if the next schedule is due. -func TimeoutIfOverlap() JobOptions { +func TimeoutIfOverlap() JobOption { return func(descriptor *JobDescriptor) { innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { @@ -141,7 +141,7 @@ func TimeoutIfOverlap() JobOptions { } // Recover returns a new JobDescriptor that will recover from panics. -func Recover(logger log.Logger) JobOptions { +func Recover(logger log.Logger) JobOption { return func(descriptor *JobDescriptor) { innerRun := descriptor.Run descriptor.Run = func(ctx context.Context) error { diff --git a/cron/options_test.go b/cron/options_test.go index c5a97b87..4c28667a 100644 --- a/cron/options_test.go +++ b/cron/options_test.go @@ -41,13 +41,13 @@ func TestJobOption(t *testing.T) { for _, ca := range []struct { name string - stacks []JobOptions + stacks []JobOption job func(context.Context) error asserts func(t *testing.T) }{ { "name and logging", - []JobOptions{ + []JobOption{ WithName("test"), WithLogging(logger), }, @@ -67,7 +67,7 @@ func TestJobOption(t *testing.T) { }, { "error and logging", - []JobOptions{ + []JobOption{ WithLogging(logger), }, func(ctx context.Context) error { @@ -86,7 +86,7 @@ func TestJobOption(t *testing.T) { }, { "metrics", - []JobOptions{ + []JobOption{ WithMetrics(metric), }, func(ctx context.Context) error { @@ -105,7 +105,7 @@ func TestJobOption(t *testing.T) { }, { "error and metrics", - []JobOptions{ + []JobOption{ WithMetrics(metric), }, func(ctx context.Context) error { @@ -124,7 +124,7 @@ func TestJobOption(t *testing.T) { }, { "tracing", - []JobOptions{ + []JobOption{ WithTracing(&tracer), }, func(ctx context.Context) error { @@ -139,7 +139,7 @@ func TestJobOption(t *testing.T) { }, { "error tracing", - []JobOptions{ + []JobOption{ WithTracing(&tracer), }, func(ctx context.Context) error { @@ -158,7 +158,7 @@ func TestJobOption(t *testing.T) { }, { "panic", - []JobOptions{ + []JobOption{ Recover(logger), }, func(ctx context.Context) error { @@ -173,7 +173,7 @@ func TestJobOption(t *testing.T) { }, { "skip if overlap", - []JobOptions{ + []JobOption{ SkipIfOverlap(), }, func(ctx context.Context) error { @@ -190,7 +190,7 @@ func TestJobOption(t *testing.T) { }, { "delay if overlap", - []JobOptions{ + []JobOption{ DelayIfOverlap(), }, func(ctx context.Context) error { @@ -216,7 +216,7 @@ func TestJobOption(t *testing.T) { }, { "timeout if overlap", - []JobOptions{ + []JobOption{ TimeoutIfOverlap(), }, func(ctx context.Context) error { diff --git a/serve.go b/serve.go index 7b1e0051..b33cb42f 100644 --- a/serve.go +++ b/serve.go @@ -144,7 +144,7 @@ func (s serveIn) cronServe(ctx context.Context, logger logging.LevelLogger) (fun } if mContainer, ok := s.Container.(interface{ ApplyCron(*cron.Cron) }); ok { if s.Cron == nil { - s.Cron = cron.New(cron.Config{GlobalOptions: []cron.JobOptions{cron.WithLogging(log.With(s.Logger, "tag", "cron"))}}) + s.Cron = cron.New(cron.Config{GlobalOptions: []cron.JobOption{cron.WithLogging(log.With(s.Logger, "tag", "cron"))}}) } mContainer.ApplyCron(s.Cron) if len(s.Cron.Descriptors()) > 0 { From 0c1770325773fa01c94cf19175df98eb4e933e25 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 18:59:58 +0800 Subject: [PATCH 15/19] refactor: Reduce the API interface of Container --- c.go | 4 ++- contract/container.go | 13 ---------- serve.go | 60 +++++++++++++++++++++---------------------- 3 files changed, 32 insertions(+), 45 deletions(-) diff --git a/c.go b/c.go index 5d1e50ce..5963bacc 100644 --- a/c.go +++ b/c.go @@ -32,7 +32,7 @@ type C struct { Env contract.Env contract.ConfigAccessor logging.LevelLogger - contract.Container + *container.Container contract.Dispatcher di *dig.Container } @@ -330,6 +330,7 @@ func (c *C) ProvideEssentials() { Env contract.Env AppName contract.AppName Container contract.Container + ConcreteContainer *container.Container ConfigUnmarshaler contract.ConfigUnmarshaler ConfigAccessor contract.ConfigAccessor ConfigRouter contract.ConfigRouter @@ -346,6 +347,7 @@ func (c *C) ProvideEssentials() { Env: c.Env, AppName: c.AppName, Container: c.Container, + ConcreteContainer: c.Container, ConfigUnmarshaler: c.ConfigAccessor, ConfigAccessor: c.ConfigAccessor, Logger: c.LevelLogger, diff --git a/contract/container.go b/contract/container.go index 1a88533d..7c893fc9 100644 --- a/contract/container.go +++ b/contract/container.go @@ -1,19 +1,6 @@ package contract -import ( - "github.com/gorilla/mux" - "github.com/oklog/run" - "github.com/spf13/cobra" - "google.golang.org/grpc" -) - // Container holds modules. type Container interface { - ApplyRouter(router *mux.Router) - ApplyGRPCServer(server *grpc.Server) - ApplyRunGroup(g *run.Group) - ApplyRootCommand(command *cobra.Command) - Shutdown() Modules() []interface{} - AddModule(module interface{}) } diff --git a/serve.go b/serve.go index b33cb42f..77781e92 100644 --- a/serve.go +++ b/serve.go @@ -31,7 +31,7 @@ type serveIn struct { Dispatcher contract.Dispatcher Config contract.ConfigAccessor Logger log.Logger - Container contract.Container + Container *container.Container HTTPServer *http.Server `optional:"true"` GRPCServer *grpc.Server `optional:"true"` DeprecatedCron *deprecatedcron.Cron `optional:"true"` @@ -142,21 +142,20 @@ func (s serveIn) cronServe(ctx context.Context, logger logging.LevelLogger) (fun if s.Config.Bool("cron.disable") { return nil, nil, nil } - if mContainer, ok := s.Container.(interface{ ApplyCron(*cron.Cron) }); ok { - if s.Cron == nil { - s.Cron = cron.New(cron.Config{GlobalOptions: []cron.JobOption{cron.WithLogging(log.With(s.Logger, "tag", "cron"))}}) - } - mContainer.ApplyCron(s.Cron) - if len(s.Cron.Descriptors()) > 0 { - ctx, cancel := context.WithCancel(ctx) - return func() error { - logger.Infof("cron runner started") - return s.Cron.Run(ctx) - }, func(err error) { - cancel() - }, nil - } + if s.Cron == nil { + s.Cron = cron.New(cron.Config{GlobalOptions: []cron.JobOption{cron.WithLogging(log.With(s.Logger, "tag", "cron"))}}) } + s.Container.ApplyCron(s.Cron) + if len(s.Cron.Descriptors()) > 0 { + ctx, cancel := context.WithCancel(ctx) + return func() error { + logger.Infof("cron runner started") + return s.Cron.Run(ctx) + }, func(err error) { + cancel() + }, nil + } + return nil, nil, nil } @@ -164,23 +163,22 @@ func (s serveIn) cronServeDeprecated(ctx context.Context, logger logging.LevelLo if s.Config.Bool("cron.disable") { return nil, nil, nil } - if mContainer, ok := s.Container.(interface{ ApplyDeprecatedCron(*deprecatedcron.Cron) }); ok { - if s.DeprecatedCron == nil { - logger := log.With(s.Logger, "tag", "cron") - s.DeprecatedCron = deprecatedcron.New(deprecatedcron.WithLogger(cronopts.CronLogAdapter{Logging: logger})) - } - mContainer.ApplyDeprecatedCron(s.DeprecatedCron) - if len(s.DeprecatedCron.Entries()) > 0 { - return func() error { - logger.Infof("cron runner started") - logger.Warn("Directly using github.com/robfig/cron/v3 is deprecated. Please migrate to github.com/DoNewsCode/core/cron") - s.DeprecatedCron.Run() - return nil - }, func(err error) { - <-s.DeprecatedCron.Stop().Done() - }, nil - } + if s.DeprecatedCron == nil { + logger := log.With(s.Logger, "tag", "cron") + s.DeprecatedCron = deprecatedcron.New(deprecatedcron.WithLogger(cronopts.CronLogAdapter{Logging: logger})) } + s.Container.ApplyDeprecatedCron(s.DeprecatedCron) + if len(s.DeprecatedCron.Entries()) > 0 { + return func() error { + logger.Infof("cron runner started") + logger.Warn("Directly using github.com/robfig/cron/v3 is deprecated. Please migrate to github.com/DoNewsCode/core/cron") + s.DeprecatedCron.Run() + return nil + }, func(err error) { + <-s.DeprecatedCron.Stop().Done() + }, nil + } + return nil, nil, nil } From 823b7d1b68111909a1856d773533b65d06675aa6 Mon Sep 17 00:00:00 2001 From: reasno Date: Sat, 15 Jan 2022 23:13:06 +0800 Subject: [PATCH 16/19] refactor: Reduce the API interface of Container --- c.go | 25 ++++++- c_test.go | 16 +++++ container/container.go | 130 +----------------------------------- container/container_test.go | 57 +--------------- module_contract.go | 46 +++++++++++++ serve.go | 60 ++++++++++++++--- srvhttp/example_test.go | 7 +- 7 files changed, 145 insertions(+), 196 deletions(-) create mode 100644 module_contract.go diff --git a/c.go b/c.go index 5963bacc..cc9c15d2 100644 --- a/c.go +++ b/c.go @@ -9,6 +9,7 @@ package core import ( "context" "fmt" + "github.com/spf13/cobra" "reflect" "github.com/DoNewsCode/core/codec/yaml" @@ -330,7 +331,6 @@ func (c *C) ProvideEssentials() { Env contract.Env AppName contract.AppName Container contract.Container - ConcreteContainer *container.Container ConfigUnmarshaler contract.ConfigUnmarshaler ConfigAccessor contract.ConfigAccessor ConfigRouter contract.ConfigRouter @@ -347,7 +347,6 @@ func (c *C) ProvideEssentials() { Env: c.Env, AppName: c.AppName, Container: c.Container, - ConcreteContainer: c.Container, ConfigUnmarshaler: c.ConfigAccessor, ConfigAccessor: c.ConfigAccessor, Logger: c.LevelLogger, @@ -375,6 +374,17 @@ func (c *C) Serve(ctx context.Context) error { }) } +// Shutdown iterates through every CloserProvider registered in the container, +// and calls them in the reversed order of registration. +func (c *C) Shutdown() { + modules := c.Modules() + for i := range modules { + if closer, ok := modules[len(modules)-i-1].(CloserProvider); ok { + closer.ProvideCloser() + } + } +} + // AddModuleFunc add the module after Invoking its' constructor. Clean up // functions and errors are handled automatically. func (c *C) AddModuleFunc(constructor interface{}) { @@ -406,6 +416,17 @@ func (c *C) AddModuleFunc(constructor interface{}) { } } +// ApplyRootCommand iterates through every CommandProvider registered in the container, +// and introduce the root *cobra.Command to everyone. +func (c *C) ApplyRootCommand(command *cobra.Command) { + modules := c.Modules() + for i := range modules { + if p, ok := modules[i].(CommandProvider); ok { + p.ProvideCommand(command) + } + } +} + // Invoke runs the given function after instantiating its dependencies. Any // arguments that the function has are treated as its dependencies. The // dependencies are instantiated in an unspecified order along with any diff --git a/c_test.go b/c_test.go index 8dd47cf4..c27f7297 100644 --- a/c_test.go +++ b/c_test.go @@ -200,3 +200,19 @@ func TestC_cleanup(t *testing.T) { assert.True(t, dependencyCleanupCalled) assert.True(t, moduleCleanupCalled) } + +type closer func() + +func (f closer) ProvideCloser() { + f() +} + +func TestContainer_Shutdown(t *testing.T) { + seq := 0 + container := New() + container.AddModule(closer(func() { assert.Equal(t, 2, seq); seq = 1 })) + container.AddModule(closer(func() { assert.Equal(t, 3, seq); seq = 2 })) + container.AddModule(closer(func() { assert.Equal(t, 0, seq); seq = 3 })) + container.Shutdown() + assert.Equal(t, 1, seq) +} diff --git a/container/container.go b/container/container.go index 74322993..81ac5cf7 100644 --- a/container/container.go +++ b/container/container.go @@ -5,94 +5,13 @@ package container import ( "github.com/DoNewsCode/core/contract" - "github.com/DoNewsCode/core/cron" - "github.com/gorilla/mux" - "github.com/oklog/run" - deprecatedcron "github.com/robfig/cron/v3" - "github.com/spf13/cobra" - "google.golang.org/grpc" ) var _ contract.Container = (*Container)(nil) -// DeprecatedCronProvider provides cron jobs. -// Deprecated: CronProvider is deprecated. Use CronProvider instead -type DeprecatedCronProvider interface { - ProvideCron(crontab *deprecatedcron.Cron) -} - -type CronProvider interface { - ProvideCron(cron *cron.Cron) -} - -// CommandProvider provides cobra.Command. -type CommandProvider interface { - ProvideCommand(command *cobra.Command) -} - -// HTTPProvider provides http services. -type HTTPProvider interface { - ProvideHTTP(router *mux.Router) -} - -// GRPCProvider provides gRPC services. -type GRPCProvider interface { - ProvideGRPC(server *grpc.Server) -} - -// CloserProvider provides a shutdown function that will be called when service exits. -type CloserProvider interface { - ProvideCloser() -} - -// RunProvider provides a runnable actor. Use it to register any server-like -// actions. For example, kafka consumer can be started here. -type RunProvider interface { - ProvideRunGroup(group *run.Group) -} - // Container holds all modules registered. type Container struct { - httpProviders []func(router *mux.Router) - grpcProviders []func(server *grpc.Server) - closerProviders []func() - runProviders []func(g *run.Group) - modules []interface{} - cronProviders []func(cron *cron.Cron) - deprecatedCronProviders []func(cron *deprecatedcron.Cron) - commandProviders []func(command *cobra.Command) -} - -// ApplyRouter iterates through every HTTPProvider registered in the container, -// and introduce the router to everyone. -func (c *Container) ApplyRouter(router *mux.Router) { - for _, p := range c.httpProviders { - p(router) - } -} - -// ApplyGRPCServer iterates through every GRPCProvider registered in the container, -// and introduce a *grpc.Server to everyone. -func (c *Container) ApplyGRPCServer(server *grpc.Server) { - for _, p := range c.grpcProviders { - p(server) - } -} - -// Shutdown iterates through every CloserProvider registered in the container, -// and calls them in the reversed order of registration. -func (c *Container) Shutdown() { - for i := len(c.closerProviders) - 1; i >= 0; i-- { - c.closerProviders[i]() - } -} - -// ApplyRunGroup iterates through every RunProvider registered in the container, -// and introduce the *run.Group to everyone. -func (c *Container) ApplyRunGroup(g *run.Group) { - for _, p := range c.runProviders { - p(g) - } + modules []interface{} } // Modules returns all modules in the container. This method is used to scan for @@ -114,53 +33,6 @@ func (c *Container) Modules() []interface{} { return c.modules } -// ApplyDeprecatedCron iterates through every CronProvider registered in the container, -// and introduce the *deprecatedcron.Cron to everyone. -// -// Deprecated: migrate to ApplyCron. -func (c *Container) ApplyDeprecatedCron(crontab *deprecatedcron.Cron) { - for _, p := range c.deprecatedCronProviders { - p(crontab) - } -} - -// ApplyCron iterates through every CronProvider registered in the container, -// and introduce the *cron.Cron to everyone. -func (c *Container) ApplyCron(crontab *cron.Cron) { - for _, p := range c.cronProviders { - p(crontab) - } -} - -// ApplyRootCommand iterates through every CommandProvider registered in the container, -// and introduce the root *cobra.Command to everyone. -func (c *Container) ApplyRootCommand(command *cobra.Command) { - for _, p := range c.commandProviders { - p(command) - } -} - func (c *Container) AddModule(module interface{}) { - if p, ok := module.(HTTPProvider); ok { - c.httpProviders = append(c.httpProviders, p.ProvideHTTP) - } - if p, ok := module.(GRPCProvider); ok { - c.grpcProviders = append(c.grpcProviders, p.ProvideGRPC) - } - if p, ok := module.(DeprecatedCronProvider); ok { - c.deprecatedCronProviders = append(c.deprecatedCronProviders, p.ProvideCron) - } - if p, ok := module.(CronProvider); ok { - c.cronProviders = append(c.cronProviders, p.ProvideCron) - } - if p, ok := module.(RunProvider); ok { - c.runProviders = append(c.runProviders, p.ProvideRunGroup) - } - if p, ok := module.(CommandProvider); ok { - c.commandProviders = append(c.commandProviders, p.ProvideCommand) - } - if p, ok := module.(CloserProvider); ok { - c.closerProviders = append(c.closerProviders, p.ProvideCloser) - } c.modules = append(c.modules, module) } diff --git a/container/container_test.go b/container/container_test.go index aeaf12fb..7ea597b2 100644 --- a/container/container_test.go +++ b/container/container_test.go @@ -3,36 +3,9 @@ package container import ( "testing" - "github.com/DoNewsCode/core/cron" - "github.com/gorilla/mux" - "github.com/oklog/run" - "github.com/spf13/cobra" "github.com/stretchr/testify/assert" - "google.golang.org/grpc" ) -type mock struct{} - -func (m mock) ProvideRunGroup(group *run.Group) { - panic("implement me") -} - -func (m mock) ProvideGRPC(server *grpc.Server) { - panic("implement me") -} - -func (m mock) ProvideHTTP(router *mux.Router) { - panic("implement me") -} - -func (m mock) ProvideCron(crontab *cron.Cron) { - panic("implement me") -} - -func (m mock) ProvideCommand(command *cobra.Command) { - panic("implement me") -} - func TestContainer_AddModule(t *testing.T) { cases := []struct { name string @@ -43,19 +16,7 @@ func TestContainer_AddModule(t *testing.T) { "any", "foo", func(t *testing.T, container Container) { - assert.Contains(t, container.modules, "foo") - }, - }, - { - "mock", - mock{}, - func(t *testing.T, container Container) { - assert.Len(t, container.runProviders, 1) - assert.Len(t, container.httpProviders, 1) - assert.Len(t, container.grpcProviders, 1) - assert.Len(t, container.cronProviders, 1) - assert.Len(t, container.commandProviders, 1) - assert.Len(t, container.closerProviders, 0) + assert.Contains(t, container.Modules(), "foo") }, }, } @@ -70,19 +31,3 @@ func TestContainer_AddModule(t *testing.T) { }) } } - -type closer func() - -func (f closer) ProvideCloser() { - f() -} - -func TestContainer_Shutdown(t *testing.T) { - seq := 0 - container := Container{} - container.AddModule(closer(func() { assert.Equal(t, 2, seq); seq = 1 })) - container.AddModule(closer(func() { assert.Equal(t, 3, seq); seq = 2 })) - container.AddModule(closer(func() { assert.Equal(t, 0, seq); seq = 3 })) - container.Shutdown() - assert.Equal(t, 1, seq) -} diff --git a/module_contract.go b/module_contract.go new file mode 100644 index 00000000..3c9c3d50 --- /dev/null +++ b/module_contract.go @@ -0,0 +1,46 @@ +package core + +import ( + cron2 "github.com/DoNewsCode/core/cron" + "github.com/gorilla/mux" + "github.com/oklog/run" + "github.com/robfig/cron/v3" + "github.com/spf13/cobra" + "google.golang.org/grpc" +) + +// DeprecatedCronProvider provides cron jobs. +// Deprecated: CronProvider is deprecated. Use CronProvider instead +type DeprecatedCronProvider interface { + ProvideCron(crontab *cron.Cron) +} + +type CronProvider interface { + ProvideCron(cron *cron2.Cron) +} + +// CommandProvider provides cobra.Command. +type CommandProvider interface { + ProvideCommand(command *cobra.Command) +} + +// HTTPProvider provides http services. +type HTTPProvider interface { + ProvideHTTP(router *mux.Router) +} + +// GRPCProvider provides gRPC services. +type GRPCProvider interface { + ProvideGRPC(server *grpc.Server) +} + +// CloserProvider provides a shutdown function that will be called when service exits. +type CloserProvider interface { + ProvideCloser() +} + +// RunProvider provides a runnable actor. Use it to register any server-like +// actions. For example, kafka consumer can be started here. +type RunProvider interface { + ProvideRunGroup(group *run.Group) +} diff --git a/serve.go b/serve.go index 77781e92..570c0063 100644 --- a/serve.go +++ b/serve.go @@ -9,7 +9,6 @@ import ( "os/signal" "syscall" - "github.com/DoNewsCode/core/container" "github.com/DoNewsCode/core/contract" cron "github.com/DoNewsCode/core/cron" "github.com/DoNewsCode/core/deprecated_cronopts" @@ -31,7 +30,7 @@ type serveIn struct { Dispatcher contract.Dispatcher Config contract.ConfigAccessor Logger log.Logger - Container *container.Container + Container contract.Container HTTPServer *http.Server `optional:"true"` GRPCServer *grpc.Server `optional:"true"` DeprecatedCron *deprecatedcron.Cron `optional:"true"` @@ -44,7 +43,7 @@ func NewServeModule(in serveIn) serveModule { } } -var _ container.CommandProvider = (*serveModule)(nil) +var _ CommandProvider = (*serveModule)(nil) type serveModule struct { in serveIn @@ -65,7 +64,7 @@ func (s serveIn) httpServe(ctx context.Context, logger logging.LevelLogger) (fun s.HTTPServer = &http.Server{} } router := mux.NewRouter() - s.Container.ApplyRouter(router) + applyRouter(s.Container, router) router.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { tpl, _ := route.GetPathTemplate() @@ -106,7 +105,7 @@ func (s serveIn) grpcServe(ctx context.Context, logger logging.LevelLogger) (fun if s.GRPCServer == nil { s.GRPCServer = grpc.NewServer() } - s.Container.ApplyGRPCServer(s.GRPCServer) + applyGRPCServer(s.Container, s.GRPCServer) for module, info := range s.GRPCServer.GetServiceInfo() { for _, method := range info.Methods { @@ -145,7 +144,7 @@ func (s serveIn) cronServe(ctx context.Context, logger logging.LevelLogger) (fun if s.Cron == nil { s.Cron = cron.New(cron.Config{GlobalOptions: []cron.JobOption{cron.WithLogging(log.With(s.Logger, "tag", "cron"))}}) } - s.Container.ApplyCron(s.Cron) + applyCron(s.Container, s.Cron) if len(s.Cron.Descriptors()) > 0 { ctx, cancel := context.WithCancel(ctx) return func() error { @@ -167,7 +166,7 @@ func (s serveIn) cronServeDeprecated(ctx context.Context, logger logging.LevelLo logger := log.With(s.Logger, "tag", "cron") s.DeprecatedCron = deprecatedcron.New(deprecatedcron.WithLogger(cronopts.CronLogAdapter{Logging: logger})) } - s.Container.ApplyDeprecatedCron(s.DeprecatedCron) + applyDeprecatedCron(s.Container, s.DeprecatedCron) if len(s.DeprecatedCron.Entries()) > 0 { return func() error { logger.Infof("cron runner started") @@ -236,7 +235,7 @@ func newServeCmd(s serveIn) *cobra.Command { } // Additional run groups - s.Container.ApplyRunGroup(&g) + applyRunGroup(s.Container, &g) if err := g.Run(); err != nil { return err @@ -248,3 +247,48 @@ func newServeCmd(s serveIn) *cobra.Command { } return serveCmd } + +func applyRouter(ctn contract.Container, router *mux.Router) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(HTTPProvider); ok { + p.ProvideHTTP(router) + } + } +} + +func applyGRPCServer(ctn contract.Container, server *grpc.Server) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(GRPCProvider); ok { + p.ProvideGRPC(server) + } + } +} + +func applyRunGroup(ctn contract.Container, group *run.Group) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(RunProvider); ok { + p.ProvideRunGroup(group) + } + } +} + +func applyCron(ctn contract.Container, cron *cron.Cron) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(CronProvider); ok { + p.ProvideCron(cron) + } + } +} + +func applyDeprecatedCron(ctn contract.Container, crontab *deprecatedcron.Cron) { + modules := ctn.Modules() + for i := range modules { + if p, ok := modules[i].(DeprecatedCronProvider); ok { + p.ProvideCron(crontab) + } + } +} diff --git a/srvhttp/example_test.go b/srvhttp/example_test.go index cb7f0995..999c5bbd 100644 --- a/srvhttp/example_test.go +++ b/srvhttp/example_test.go @@ -23,7 +23,12 @@ func Example_modules() { c.AddModule(srvhttp.DebugModule{}) router := mux.NewRouter() - c.ApplyRouter(router) + modules := c.Modules() + for i := range modules { + if m, ok := modules[i].(core.HTTPProvider); ok { + m.ProvideHTTP(router) + } + } http.ListenAndServe(":8080", router) } From 71da516efea92ebed78bfc6c82d9af94104efa36 Mon Sep 17 00:00:00 2001 From: reasno Date: Sun, 16 Jan 2022 08:56:50 +0800 Subject: [PATCH 17/19] refactor: Reduce the API interface of Container --- module_contract.go | 1 + 1 file changed, 1 insertion(+) diff --git a/module_contract.go b/module_contract.go index 3c9c3d50..357e9bb1 100644 --- a/module_contract.go +++ b/module_contract.go @@ -15,6 +15,7 @@ type DeprecatedCronProvider interface { ProvideCron(crontab *cron.Cron) } +// CronProvider provides cron jobs. type CronProvider interface { ProvideCron(cron *cron2.Cron) } From cef878ed47c3601e4f8bdc2ebdc960e2afc7184b Mon Sep 17 00:00:00 2001 From: reasno Date: Sun, 16 Jan 2022 09:00:04 +0800 Subject: [PATCH 18/19] refactor: Reduce the API interface of Container --- otkafka/integration_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/otkafka/integration_test.go b/otkafka/integration_test.go index 98256cad..8e5a1e51 100644 --- a/otkafka/integration_test.go +++ b/otkafka/integration_test.go @@ -333,7 +333,11 @@ func TestModule_hotReload(t *testing.T) { c.AddModuleFunc(config.New) var group run.Group - c.ApplyRunGroup(&group) + for _, m := range c.Modules() { + if p, ok := m.(core.RunProvider); ok { + p.ProvideRunGroup(&group) + } + } group.Add(func() error { <-ctx.Done() return ctx.Err() From 160ea54a17653f21c10d98d3980e77fa59f3e448 Mon Sep 17 00:00:00 2001 From: Trock Date: Mon, 17 Jan 2022 10:56:17 +0800 Subject: [PATCH 19/19] fix: minor adjustments of docs,imports --- c.go | 6 +++--- cron/cron.go | 12 ++++++------ cron/example_test.go | 3 ++- cron/metrics.go | 5 +++-- cron/options.go | 2 +- cron/options_test.go | 7 ++++--- internal/stub/metrics.go | 3 ++- observability/metrics.go | 6 +++--- serve_test.go | 8 ++++---- 9 files changed, 28 insertions(+), 24 deletions(-) diff --git a/c.go b/c.go index cc9c15d2..cf13d37b 100644 --- a/c.go +++ b/c.go @@ -9,7 +9,6 @@ package core import ( "context" "fmt" - "github.com/spf13/cobra" "reflect" "github.com/DoNewsCode/core/codec/yaml" @@ -22,6 +21,7 @@ import ( "github.com/go-kit/log" "github.com/knadh/koanf/providers/confmap" "github.com/knadh/koanf/providers/file" + "github.com/spf13/cobra" "go.uber.org/dig" ) @@ -225,7 +225,7 @@ func (c *C) AddModule(modules ...interface{}) { } } -// Provide adds a dependencies provider to the core. Note the dependency provider +// Provide adds dependencies provider to the core. Note the dependency provider // must be a function in the form of: // // func(foo Foo) Bar @@ -385,7 +385,7 @@ func (c *C) Shutdown() { } } -// AddModuleFunc add the module after Invoking its' constructor. Clean up +// AddModuleFunc add the module after Invoking its constructor. Clean up // functions and errors are handled automatically. func (c *C) AddModuleFunc(constructor interface{}) { c.provide(constructor) diff --git a/cron/cron.go b/cron/cron.go index 8b9ddef8..8d8249f1 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -148,20 +148,20 @@ func (c *Cron) Run(ctx context.Context) error { if c.jobDescriptors[0].next.After(now) || c.jobDescriptors[0].next.IsZero() { break } - descriptor := heap.Pop(&c.jobDescriptors) + descriptor := heap.Pop(&c.jobDescriptors).(*JobDescriptor) - descriptor.(*JobDescriptor).prev = descriptor.(*JobDescriptor).next - descriptor.(*JobDescriptor).next = descriptor.(*JobDescriptor).Schedule.Next(now) + descriptor.prev = descriptor.next + descriptor.next = descriptor.Schedule.Next(now) heap.Push(&c.jobDescriptors, descriptor) var innerCtx context.Context - innerCtx = context.WithValue(ctx, prevContextKey, descriptor.(*JobDescriptor).prev) - innerCtx = context.WithValue(innerCtx, nextContextKey, descriptor.(*JobDescriptor).next) + innerCtx = context.WithValue(ctx, prevContextKey, descriptor.prev) + innerCtx = context.WithValue(innerCtx, nextContextKey, descriptor.next) c.quitWaiter.Add(1) go func() { defer c.quitWaiter.Done() - descriptor.(*JobDescriptor).Run(innerCtx) + descriptor.Run(innerCtx) }() } c.lock.L.Unlock() diff --git a/cron/example_test.go b/cron/example_test.go index 615395e4..257b4ad2 100644 --- a/cron/example_test.go +++ b/cron/example_test.go @@ -3,11 +3,12 @@ package cron_test import ( "context" "fmt" + "time" + "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/cron" "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/observability" - "time" ) type CronModule struct { diff --git a/cron/metrics.go b/cron/metrics.go index 794ff027..0bf1831d 100644 --- a/cron/metrics.go +++ b/cron/metrics.go @@ -1,8 +1,9 @@ package cron import ( - "github.com/go-kit/kit/metrics" "time" + + "github.com/go-kit/kit/metrics" ) // CronJobMetrics collects metrics for cron jobs. @@ -10,7 +11,7 @@ type CronJobMetrics struct { cronJobDurationSeconds metrics.Histogram cronJobFailCount metrics.Counter - // labels that has been set + // labels that have been set module string job string schedule string diff --git a/cron/options.go b/cron/options.go index cc46e72c..8f11be3a 100644 --- a/cron/options.go +++ b/cron/options.go @@ -83,7 +83,7 @@ func WithTracing(tracer opentracing.Tracer) JobOption { span.SetTag("schedule", descriptor.RawSpec) err := innerRun(ctx) if err != nil { - ext.Error.Set(span, true) + ext.LogError(span, err) return err } return nil diff --git a/cron/options_test.go b/cron/options_test.go index 4c28667a..cf7fad39 100644 --- a/cron/options_test.go +++ b/cron/options_test.go @@ -4,13 +4,14 @@ import ( "bytes" "context" "errors" + "strings" + "testing" + "time" + "github.com/DoNewsCode/core/internal/stub" "github.com/go-kit/log" "github.com/opentracing/opentracing-go/mocktracer" "github.com/robfig/cron/v3" - "strings" - "testing" - "time" ) type mockScheduler func(now time.Time) time.Time // mockScheduler is a function that returns the next time to run diff --git a/internal/stub/metrics.go b/internal/stub/metrics.go index 4df81ee3..5729bb4c 100644 --- a/internal/stub/metrics.go +++ b/internal/stub/metrics.go @@ -1,8 +1,9 @@ package stub import ( - "github.com/go-kit/kit/metrics" "sync" + + "github.com/go-kit/kit/metrics" ) // LabelValues contains the set of labels and their corresponding values. diff --git a/observability/metrics.go b/observability/metrics.go index 15242c68..6649b483 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -54,12 +54,12 @@ func ProvideGRPCRequestDurationSeconds(in MetricsIn) *srvgrpc.RequestDurationSec return srvgrpc.NewRequestDurationSeconds(prometheus.NewHistogram(grpc)) } -// ProvideCronJobMetrics returns a *deprecated_cronopts.CronJobMetrics that is designed to +// ProvideCronJobMetrics returns a *cron.CronJobMetrics that is designed to // measure cron job metrics. The returned metrics can be used like this: -// metrics := deprecated_cronopts.NewCronJobMetrics(...) +// metrics := cron.NewCronJobMetrics(...) // job := cron.NewChain( // cron.Recover(logger), -// deprecated_cronopts.Measure(metrics), +// cron.Measure(metrics), // ).Then(job) func ProvideCronJobMetrics(in MetricsIn) *cron.CronJobMetrics { histogram := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ diff --git a/serve_test.go b/serve_test.go index b03dfe43..17ee9952 100644 --- a/serve_test.go +++ b/serve_test.go @@ -4,19 +4,19 @@ import ( "bytes" "context" "errors" - "github.com/DoNewsCode/core/cron" - "github.com/DoNewsCode/core/di" - "github.com/DoNewsCode/core/observability" - deprecatedcron "github.com/robfig/cron/v3" "os" "runtime" "sync/atomic" "testing" "time" + "github.com/DoNewsCode/core/cron" + "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/logging" + "github.com/DoNewsCode/core/observability" "github.com/go-kit/log" "github.com/oklog/run" + deprecatedcron "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" )