Skip to content

Commit

Permalink
fix #797 and refactor global filter (#798)
Browse files Browse the repository at this point in the history
* fix #797 and refactor global filter

* add check for both nil
  • Loading branch information
localvar authored Sep 21, 2022
1 parent 0aa6668 commit e7d68e6
Showing 1 changed file with 74 additions and 103 deletions.
177 changes: 74 additions & 103 deletions pkg/object/globalfilter/globalfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,75 +49,45 @@ type (

// Spec describes the GlobalFilter.
Spec struct {
BeforePipeline pipeline.Spec `json:"beforePipeline" jsonschema:"omitempty"`
AfterPipeline pipeline.Spec `json:"afterPipeline" jsonschema:"omitempty"`
BeforePipeline *pipeline.Spec `json:"beforePipeline" jsonschema:"omitempty"`
AfterPipeline *pipeline.Spec `json:"afterPipeline" jsonschema:"omitempty"`
}

// pipelineSpec defines pipeline spec to create an pipeline entity.
pipelineSpec struct {
Kind string `json:"kind" jsonschema:"omitempty"`
Name string `json:"name" jsonschema:"omitempty"`
pipeline.Spec `json:",inline"`
Kind string `json:"kind" jsonschema:"omitempty"`
Name string `json:"name" jsonschema:"omitempty"`
*pipeline.Spec `json:",inline"`
}
)

func init() {
supervisor.Register(&GlobalFilter{})
}

// CreateAndUpdateBeforePipelineForSpec creates beforPipeline if the spec is nil, otherwise it updates by the spec.
func (gf *GlobalFilter) CreateAndUpdateBeforePipelineForSpec(spec *Spec, previousGeneration *pipeline.Pipeline) error {
beforePipeline := &pipelineSpec{
Kind: pipeline.Kind,
Name: "before",
Spec: spec.BeforePipeline,
}
pipeline, err := gf.CreateAndUpdatePipeline(beforePipeline, previousGeneration)
if err != nil {
return err
}
if pipeline == nil {
return fmt.Errorf("before pipeline is nil, spec: %v", beforePipeline)
}
gf.beforePipeline.Store(pipeline)
return nil
}
// Validate validates Spec.
func (s *Spec) Validate() (err error) {
bothNil := true

// CreateAndUpdateAfterPipelineForSpec creates afterPipeline if the spec is nil, otherwise it updates with the spec.
func (gf *GlobalFilter) CreateAndUpdateAfterPipelineForSpec(spec *Spec, previousGeneration *pipeline.Pipeline) error {
afterPipeline := &pipelineSpec{
Kind: pipeline.Kind,
Name: "after",
Spec: spec.AfterPipeline,
}
pipeline, err := gf.CreateAndUpdatePipeline(afterPipeline, previousGeneration)
if err != nil {
return err
}
if pipeline == nil {
return fmt.Errorf("after pipeline is nil, spec: %v", afterPipeline)
if s.BeforePipeline != nil {
bothNil = false
if err := s.BeforePipeline.Validate(); err != nil {
return fmt.Errorf("before pipeline is invalid: %v", err)
}
}
gf.afterPipeline.Store(pipeline)
return nil
}

// CreateAndUpdatePipeline creates and updates GlobalFilter's pipelines.
func (gf *GlobalFilter) CreateAndUpdatePipeline(spec *pipelineSpec, previousGeneration *pipeline.Pipeline) (*pipeline.Pipeline, error) {
// init jsonConfig
jsonConfig := codectool.MustMarshalJSON(spec)
specs, err := supervisor.NewSpec(string(jsonConfig))
if err != nil {
return nil, err
if s.AfterPipeline != nil {
bothNil = false
if err := s.AfterPipeline.Validate(); err != nil {
return fmt.Errorf("after pipeline is invalid: %v", err)
}
}

// init or update pipeline
pipeline := new(pipeline.Pipeline)
if previousGeneration != nil {
pipeline.Inherit(specs, previousGeneration, nil)
} else {
pipeline.Init(specs, nil)
if bothNil {
return fmt.Errorf("both beforePipeline and afterPipeline are nil")
}
return pipeline, nil

return nil
}

// Category returns the object category of itself.
Expand Down Expand Up @@ -155,68 +125,69 @@ func (gf *GlobalFilter) Inherit(superSpec *supervisor.Spec, previousGeneration s
gf.reload(previousGeneration.(*GlobalFilter))
}

// Handle `beforePipeline` and `afterPipeline` before and after the handler is executed.
func (gf *GlobalFilter) Handle(ctx *context.Context, handler context.Handler) {
p, ok := handler.(*pipeline.Pipeline)
if !ok {
panic("handler is not a pipeline")
}

var before, after *pipeline.Pipeline
if v := gf.beforePipeline.Load(); v != nil {
before, _ = v.(*pipeline.Pipeline)
}
if v := gf.afterPipeline.Load(); v != nil {
after, _ = v.(*pipeline.Pipeline)
}

p.HandleWithBeforeAfter(ctx, before, after)
}

// Close closes GlobalFilter itself.
func (gf *GlobalFilter) Close() {
}

// Validate validates Spec.
func (s *Spec) Validate() (err error) {
err = s.BeforePipeline.Validate()
if err != nil {
return fmt.Errorf("before pipeline is invalid: %v", err)
}
err = s.AfterPipeline.Validate()
if err != nil {
return fmt.Errorf("after pipeline is invalid: %v", err)
}

return nil
}

func (gf *GlobalFilter) reload(previousGeneration *GlobalFilter) {
var beforePreviousPipeline, afterPreviousPipeline *pipeline.Pipeline
// create and update beforePipeline entity
if len(gf.spec.BeforePipeline.Flow) != 0 {
// create and update beforePipeline
if gf.spec.BeforePipeline != nil {
var previous *pipeline.Pipeline
if previousGeneration != nil {
previous := previousGeneration.beforePipeline.Load()
if previous != nil {
beforePreviousPipeline = previous.(*pipeline.Pipeline)
}
previous, _ = previousGeneration.beforePipeline.Load().(*pipeline.Pipeline)
}
err := gf.CreateAndUpdateBeforePipelineForSpec(gf.spec, beforePreviousPipeline)
p, err := gf.createPipeline("before", gf.spec.BeforePipeline, previous)
if err != nil {
panic(fmt.Errorf("create before pipeline failed: %v", err))
}
gf.beforePipeline.Store(p)
}
// create and update afterPipeline entity
if len(gf.spec.AfterPipeline.Flow) != 0 {

// create and update afterPipeline
if gf.spec.AfterPipeline != nil {
var previous *pipeline.Pipeline
if previousGeneration != nil {
previous := previousGeneration.afterPipeline.Load()
if previous != nil {
afterPreviousPipeline = previous.(*pipeline.Pipeline)
}
previous, _ = previousGeneration.afterPipeline.Load().(*pipeline.Pipeline)
}
err := gf.CreateAndUpdateAfterPipelineForSpec(gf.spec, afterPreviousPipeline)
p, err := gf.createPipeline("after", gf.spec.AfterPipeline, previous)
if err != nil {
panic(fmt.Errorf("create after pipeline failed: %v", err))
}
gf.afterPipeline.Store(p)
}
}

func (gf *GlobalFilter) createPipeline(name string, spec *pipeline.Spec, previousGeneration *pipeline.Pipeline) (*pipeline.Pipeline, error) {
jsonSpec := codectool.MustMarshalJSON(&pipelineSpec{
Kind: pipeline.Kind,
Name: name,
Spec: spec,
})

fullSpec, err := supervisor.NewSpec(string(jsonSpec))
if err != nil {
return nil, err
}

// init or update pipeline
p := &pipeline.Pipeline{}
if previousGeneration != nil {
p.Inherit(fullSpec, previousGeneration, nil)
} else {
p.Init(fullSpec, nil)
}

return p, nil
}

// Handle `beforePipeline` and `afterPipeline` before and after the handler is executed.
func (gf *GlobalFilter) Handle(ctx *context.Context, handler context.Handler) {
p, ok := handler.(*pipeline.Pipeline)
if !ok {
panic("handler is not a pipeline")
}

before, _ := gf.beforePipeline.Load().(*pipeline.Pipeline)
after, _ := gf.afterPipeline.Load().(*pipeline.Pipeline)
p.HandleWithBeforeAfter(ctx, before, after)
}

// Close closes GlobalFilter itself.
func (gf *GlobalFilter) Close() {
}

0 comments on commit e7d68e6

Please sign in to comment.