Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beta jobs #610

Merged
merged 29 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

Unreleased changes are available as `avenga/couper:edge` container.

* **Added**
* [`beta_job`](https://docs.couper.io/configuration/block/job) block to describe one or more job `definitions` for simple recurring http tasks ([#610](https://github.com/avenga/couper/pull/610))

* **Changed**
* Use nested `jwt_signing_profile` block in [`oauth2` block](https://docs.couper.io/configuration/block/oauth2) for `grant_type` `"urn:ietf:params:oauth:grant-type:jwt-bearer"` in absence of `assertion` attribute ([#619](https://github.com/avenga/couper/pull/619))

Expand Down
2 changes: 1 addition & 1 deletion command/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestNewRun(t *testing.T) {
subT.Errorf("expected OK, got: %d", res.StatusCode)
}

uid := hook.LastEntry().Data["uid"].(string)
uid, _ := hook.LastEntry().Data["uid"].(string)
xidLen := len(xid.New().String())
if result.RequestIDFormat == "uuid4" {
if len(uid) <= xidLen {
Expand Down
20 changes: 20 additions & 0 deletions config/configload/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path"
"path/filepath"
"strings"
"time"

"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"
Expand Down Expand Up @@ -424,6 +425,25 @@ func LoadConfig(body *hclsyntax.Body) (*config.Couper, error) {
helper.config.Servers = append(helper.config.Servers, serverConfig)
}

for _, job := range helper.config.Definitions.Job {
_, err = time.ParseDuration(job.Interval)
if err != nil {
return nil, err
}

endpointConf := &config.Endpoint{
Remain: job.Remain,
Requests: job.Requests,
}

err = refineEndpoints(helper, config.Endpoints{endpointConf}, false)
if err != nil {
return nil, err
}

job.Endpoint = endpointConf
}

if len(helper.config.Servers) == 0 {
return nil, fmt.Errorf("configuration error: missing 'server' block")
}
Expand Down
1 change: 1 addition & 0 deletions config/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package config
type Definitions struct {
Backend []*Backend `hcl:"backend,block"`
BasicAuth []*BasicAuth `hcl:"basic_auth,block"`
Job []*Job `hcl:"beta_job,block"`
JWT []*JWT `hcl:"jwt,block"`
JWTSigningProfile []*JWTSigningProfile `hcl:"jwt_signing_profile,block"`
SAML []*SAML `hcl:"saml,block"`
Expand Down
1 change: 1 addition & 0 deletions config/generate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func main() {
&config.Health{},
&config.JWTSigningProfile{},
&config.JWT{},
&config.Job{},
&config.OAuth2AC{},
&config.OAuth2ReqAuth{},
&config.OIDC{},
Expand Down
44 changes: 44 additions & 0 deletions config/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package config

import (
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"

"github.com/avenga/couper/config/meta"
)

var (
_ Inline = &Job{}
)

// Job represents the <Job> object.
type Job struct {
Interval string `hcl:"interval" docs:"Execution interval" type:"duration"`
Name string `hcl:"name,label"`
Remain hcl.Body `hcl:",remain"`
Requests Requests `hcl:"request,block"`

// Internally used
Endpoint *Endpoint
}

// Inline implements the <Inline> interface.
func (j Job) Inline() interface{} {
type Inline struct {
meta.LogFieldsAttribute
}

return &Inline{}
}

// Schema implements the <Inline> interface.
func (j Job) Schema(inline bool) *hcl.BodySchema {
if !inline {
schema, _ := gohcl.ImpliedBodySchema(j)
return schema
}

schema, _ := gohcl.ImpliedBodySchema(j.Inline())

return meta.MergeSchemas(schema, meta.LogFieldsAttributeSchema)
}
22 changes: 22 additions & 0 deletions config/runtime/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/avenga/couper/config/reader"
"github.com/avenga/couper/config/request"
"github.com/avenga/couper/config/runtime/server"
"github.com/avenga/couper/definitions"
"github.com/avenga/couper/errors"
"github.com/avenga/couper/eval"
"github.com/avenga/couper/handler"
Expand Down Expand Up @@ -111,6 +112,27 @@ func NewServerConfiguration(conf *config.Couper, log *logrus.Entry, memStore *ca
return nil, err
}
}

jobs := make(definitions.Jobs, 0)
for _, job := range conf.Definitions.Job {
serverOptions := &server.Options{
ServerErrTpl: errors.DefaultJSON,
}

endpointOptions, err := NewEndpointOptions(confCtx, job.Endpoint, nil, serverOptions, log, conf, memStore)
if err != nil {
return nil, err
}

epHandler := handler.NewEndpoint(endpointOptions, log, nil)

j, err := definitions.NewJob(job, epHandler, conf.Settings)
if err != nil {
return nil, err
}
jobs = append(jobs, j)
}
jobs.Run(conf.Context, log)
}

for _, srvConf := range conf.Servers {
Expand Down
133 changes: 133 additions & 0 deletions definitions/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package definitions

import (
"context"
"fmt"
"net/http"
"time"

"github.com/hashicorp/hcl/v2"
"github.com/sirupsen/logrus"

"github.com/avenga/couper/config"
"github.com/avenga/couper/config/request"
"github.com/avenga/couper/eval"
"github.com/avenga/couper/handler/middleware"
"github.com/avenga/couper/logging"
"github.com/avenga/couper/server/writer"
"github.com/avenga/couper/utils"
)

type Job struct {
conf *config.Job
handler http.Handler
interval time.Duration
settings *config.Settings
}

type Jobs []*Job

func (j Jobs) Run(ctx context.Context, log *logrus.Entry) {
if len(j) == 0 {
return
}

logEntry := log.WithContext(ctx)
logEntry.Data["type"] = "job"

for _, job := range j {
go job.Run(ctx, logEntry)
}
}

func NewJob(j *config.Job, h http.Handler, settings *config.Settings) (*Job, error) {
interval, err := time.ParseDuration(j.Interval)
if err != nil {
return nil, err
}

if interval == 0 {
return nil, fmt.Errorf("job: %s: interval must be a positive number", j.Name)
}

return &Job{
conf: j,
handler: h,
interval: interval,
settings: settings,
}, nil
}

func (j *Job) Run(ctx context.Context, logEntry *logrus.Entry) {
req, _ := http.NewRequest(http.MethodGet, "", nil)
req.Header.Set("User-Agent", "Couper / "+utils.VersionName+" job-"+j.conf.Name)

uidFn := middleware.NewUIDFunc(j.settings.RequestIDBackendHeader)

t := time.NewTicker(time.Millisecond * 50)
defer t.Stop()

firstRun := true

clh := middleware.NewCustomLogsHandler([]hcl.Body{j.conf.Remain}, j.handler, j.conf.Name)

for {
select {
case <-ctx.Done():
logEntry.WithFields(logrus.Fields{
"name": j.conf.Name,
}).Errorf("stopping: %v", ctx.Err())
return
case <-t.C:
uid := uidFn()

outReq := req.Clone(context.WithValue(ctx, request.UID, uid))

evalCtx := eval.ContextFromRequest(outReq).WithClientRequest(outReq) // setup syncMap, upstream custom logs
delete(evalCtx.HCLContext().Variables, eval.ClientRequest) // this is the noop req from above, not helpful

outCtx := context.WithValue(evalCtx, request.LogEntry, logEntry)
outCtx = context.WithValue(outCtx, request.LogCustomAccess, []hcl.Body{j.conf.Remain}) // local custom logs
outReq = outReq.WithContext(outCtx)

n := time.Now()
w := writer.NewResponseWriter(&noopResponseWriter{}, "")
clh.ServeHTTP(w, outReq)
logEntry.
WithFields(logrus.Fields{
"name": j.conf.Name,
"timings": logging.Fields{
"total": logging.RoundMS(time.Since(n)),
"interval": logging.RoundMS(j.interval),
},
"uid": uid,
}).WithContext(outCtx).
WithTime(n).
Info()

if firstRun {
t.Reset(j.interval)
firstRun = false
}
}
}
}

var _ http.ResponseWriter = &noopResponseWriter{}

type noopResponseWriter struct {
header http.Header
}

func (n noopResponseWriter) Header() http.Header {
if n.header == nil {
n.header = make(http.Header)
}
return n.header
}

func (n noopResponseWriter) Write(bytes []byte) (int, error) {
return len(bytes), nil
}

func (n noopResponseWriter) WriteHeader(_ int) {}
Loading