diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index f189ba9756..b26d382070 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -210,3 +210,21 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml)) return nil } + +// UpsertLogParsingProcessors updates the agent with log parsing processors +func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error { + if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { + return fmt.Errorf("agent updater is busy") + } + defer atomic.StoreUint32(&m.lock, 0) + + // send the changes to opamp. + configHash, err := opamp.UpsertLogsParsingProcessor(context.Background(), config, names, m.OnConfigUpdate) + if err != nil { + zap.S().Errorf("failed to call agent config update for log parsing processor:", err) + return err + } + + m.updateDeployStatus(ctx, ElementTypeLogPipelines, version, string(DeployInitiated), "Deployment started", configHash, string(rawPipelineData)) + return nil +} diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go new file mode 100644 index 0000000000..36f4a1473b --- /dev/null +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -0,0 +1,223 @@ +package opamp + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "strings" + "sync" + + "github.com/knadh/koanf/parsers/yaml" + "github.com/open-telemetry/opamp-go/protobufs" + model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" + "go.signoz.io/signoz/pkg/query-service/constants" + "go.uber.org/zap" +) + +var lockLogsPipelineSpec sync.RWMutex + +func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []string, callback func(string, string, error)) (string, error) { + confHash := "" + if opAmpServer == nil { + return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment") + } + + agents := opAmpServer.agents.GetAllAgents() + if len(agents) == 0 { + return confHash, fmt.Errorf("no agents available at the moment") + } + + for _, agent := range agents { + config := agent.EffectiveConfig + c, err := yaml.Parser().Unmarshal([]byte(config)) + if err != nil { + return confHash, err + } + + buildLogParsingProcessors(c, parsingProcessors) + + p, err := getOtelPipelinFromConfig(c) + if err != nil { + return confHash, err + } + if p.Pipelines.Logs == nil { + return confHash, fmt.Errorf("logs pipeline doesn't exist") + } + + // build the new processor list + updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) + p.Pipelines.Logs.Processors = updatedProcessorList + + // add the new processor to the data ( no checks required as the keys will exists) + c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs + + updatedConf, err := yaml.Parser().Marshal(c) + if err != nil { + return confHash, err + } + + // zap.S().Infof("sending new config", string(updatedConf)) + hash := sha256.New() + _, err = hash.Write(updatedConf) + if err != nil { + return confHash, err + } + agent.EffectiveConfig = string(updatedConf) + err = agent.Upsert() + if err != nil { + return confHash, err + } + + agent.SendToAgent(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "collector.yaml": { + Body: updatedConf, + ContentType: "application/x-yaml", + }, + }, + }, + ConfigHash: hash.Sum(nil), + }, + }) + + if confHash == "" { + confHash = string(hash.Sum(nil)) + model.ListenToConfigUpdate(agent.ID, confHash, callback) + } + } + + return confHash, nil +} + +// check if the processors already exist +// if yes then update the processor. +// if something doesn't exists then remove it. +func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { + agentProcessors := agentConf["processors"].(map[string]interface{}) + exists := map[string]struct{}{} + for key, params := range parsingProcessors { + agentProcessors[key] = params + exists[key] = struct{}{} + } + // remove the old unwanted processors + for k := range agentProcessors { + if _, ok := exists[k]; !ok && strings.HasPrefix(k, constants.LogsPPLPfx) { + delete(agentProcessors, k) + } + } + agentConf["processors"] = agentProcessors + return nil +} + +type otelPipeline struct { + Pipelines struct { + Logs *struct { + Exporters []string `json:"exporters" yaml:"exporters"` + Processors []string `json:"processors" yaml:"processors"` + Receivers []string `json:"receivers" yaml:"receivers"` + } `json:"logs" yaml:"logs"` + } `json:"pipelines" yaml:"pipelines"` +} + +func getOtelPipelinFromConfig(config map[string]interface{}) (*otelPipeline, error) { + if _, ok := config["service"]; !ok { + return nil, fmt.Errorf("service not found in OTEL config") + } + b, err := json.Marshal(config["service"]) + if err != nil { + return nil, err + } + p := otelPipeline{} + if err := json.Unmarshal(b, &p); err != nil { + return nil, err + } + return &p, nil +} + +func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) { + lockLogsPipelineSpec.Lock() + defer lockLogsPipelineSpec.Unlock() + + exists := map[string]struct{}{} + for _, v := range logsParserPipeline { + exists[v] = struct{}{} + } + + // removed the old processors which are not used + var pipeline []string + for _, v := range current { + k := v + if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) { + pipeline = append(pipeline, v) + } + } + + // create a reverse map of existing config processors and their position + existing := map[string]int{} + for i, p := range pipeline { + name := p + existing[name] = i + } + + // create mapping from our logsParserPipeline to position in existing processors (from current config) + // this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3 + specVsExistingMap := map[int]int{} + existingVsSpec := map[int]int{} + + // go through plan and map its elements to current positions in effective config + for i, m := range logsParserPipeline { + if loc, ok := existing[m]; ok { + specVsExistingMap[i] = loc + existingVsSpec[loc] = i + } + } + + lastMatched := 0 + newPipeline := []string{} + + for i := 0; i < len(logsParserPipeline); i++ { + m := logsParserPipeline[i] + if loc, ok := specVsExistingMap[i]; ok { + for j := lastMatched; j < loc; j++ { + if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) { + delete(specVsExistingMap, existingVsSpec[j]) + } else { + newPipeline = append(newPipeline, pipeline[j]) + } + } + newPipeline = append(newPipeline, pipeline[loc]) + lastMatched = loc + 1 + } else { + newPipeline = append(newPipeline, m) + } + + } + if lastMatched < len(pipeline) { + newPipeline = append(newPipeline, pipeline[lastMatched:]...) + } + + if checkDuplicateString(newPipeline) { + // duplicates are most likely because the processor sequence in effective config conflicts + // with the planned sequence as per planned pipeline + return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline) + } + + return newPipeline, nil +} + +func checkDuplicateString(pipeline []string) bool { + exists := make(map[string]bool, len(pipeline)) + zap.S().Debugf("checking duplicate processors in the pipeline:", pipeline) + for _, processor := range pipeline { + name := processor + if _, ok := exists[name]; ok { + return true + } + + exists[name] = true + } + return false +} diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go new file mode 100644 index 0000000000..eef08870dd --- /dev/null +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -0,0 +1,206 @@ +package opamp + +import ( + "fmt" + "testing" + + . "github.com/smartystreets/goconvey/convey" + "go.signoz.io/signoz/pkg/query-service/constants" +) + +var buildProcessorTestData = []struct { + Name string + agentConf map[string]interface{} + pipelineProcessor map[string]interface{} + outputConf map[string]interface{} +}{ + { + Name: "Add", + agentConf: map[string]interface{}{ + "processors": map[string]interface{}{ + "batch": struct{}{}, + }, + }, + pipelineProcessor: map[string]interface{}{ + constants.LogsPPLPfx + "b": struct{}{}, + }, + outputConf: map[string]interface{}{ + "processors": map[string]interface{}{ + constants.LogsPPLPfx + "b": struct{}{}, + "batch": struct{}{}, + }, + }, + }, + { + Name: "Remove", + agentConf: map[string]interface{}{ + "processors": map[string]interface{}{ + constants.LogsPPLPfx + "b": struct{}{}, + "batch": struct{}{}, + }, + }, + pipelineProcessor: map[string]interface{}{}, + outputConf: map[string]interface{}{ + "processors": map[string]interface{}{ + "batch": struct{}{}, + }, + }, + }, + { + Name: "remove and upsert 1", + agentConf: map[string]interface{}{ + "processors": map[string]interface{}{ + constants.LogsPPLPfx + "a": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, + "batch": struct{}{}, + }, + }, + pipelineProcessor: map[string]interface{}{ + constants.LogsPPLPfx + "b": struct{}{}, + }, + outputConf: map[string]interface{}{ + "processors": map[string]interface{}{ + constants.LogsPPLPfx + "b": struct{}{}, + "batch": struct{}{}, + }, + }, + }, + { + Name: "remove and upsert 2", + agentConf: map[string]interface{}{ + "processors": map[string]interface{}{ + "memorylimiter": struct{}{}, + constants.LogsPPLPfx + "a": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, + "batch": struct{}{}, + }, + }, + pipelineProcessor: map[string]interface{}{ + constants.LogsPPLPfx + "b": struct{}{}, + }, + outputConf: map[string]interface{}{ + "processors": map[string]interface{}{ + "memorylimiter": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, + "batch": struct{}{}, + }, + }, + }, +} + +func TestBuildLogParsingProcessors(t *testing.T) { + for _, test := range buildProcessorTestData { + Convey(test.Name, t, func() { + err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor) + So(err, ShouldBeNil) + So(test.agentConf, ShouldResemble, test.outputConf) + }) + } + +} + +var BuildLogsPipelineTestData = []struct { + Name string + currentPipeline []string + logsPipeline []string + expectedPipeline []string +}{ + { + Name: "Add new pipelines", + currentPipeline: []string{"processor1", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"}, + }, + { + Name: "Add new pipeline and respect custom processors", + currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"}, + }, + { + Name: "Add new pipeline and respect custom processors", + currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"}, + }, + { + Name: "Add new pipeline and respect custom processors in the beginning and middle", + currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"}, + }, + { + Name: "Remove old pipeline add add new", + currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"}, + }, + { + Name: "Remove old pipeline from middle", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a"}, + expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"}, + }, + { + Name: "Remove old pipeline from middle and add new pipeline", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"}, + }, + { + Name: "Remove multiple old pipelines from middle and add multiple new ones", + currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"}, + expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"}, + }, + + // working + { + Name: "rearrange pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"}, + }, + { + Name: "rearrange pipelines with new processor", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"}, + // expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"}, + }, + { + Name: "delete processor", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{}, + expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"}, + }, + { + Name: "last to first", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"}, + logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, + }, + { + Name: "multiple rearrange pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"}, + }, + { + Name: "multiple rearrange with new pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"}, + logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"}, + expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"}, + }, +} + +func TestBuildLogsPipeline(t *testing.T) { + for _, test := range BuildLogsPipelineTestData { + Convey(test.Name, t, func() { + v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline) + So(err, ShouldBeNil) + fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline) + So(v, ShouldResemble, test.expectedPipeline) + }) + } +} diff --git a/pkg/query-service/app/opamp/opamp_server.go b/pkg/query-service/app/opamp/opamp_server.go index 237b07f121..cee50ba90c 100644 --- a/pkg/query-service/app/opamp/opamp_server.go +++ b/pkg/query-service/app/opamp/opamp_server.go @@ -2,15 +2,10 @@ package opamp import ( "context" - "crypto/sha256" - "strings" - - "github.com/knadh/koanf/parsers/yaml" "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server" "github.com/open-telemetry/opamp-go/server/types" - "go.opentelemetry.io/collector/confmap" model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.uber.org/zap" @@ -112,93 +107,3 @@ func Ready() bool { func Subscribe(agentId string, hash string, f model.OnChangeCallback) { model.ListenToConfigUpdate(agentId, hash, f) } - -func UpsertProcessor(ctx context.Context, processors map[string]interface{}, names []interface{}) error { - x := map[string]interface{}{ - "processors": processors, - } - - newConf := confmap.NewFromStringMap(x) - - agents := opAmpServer.agents.GetAllAgents() - for _, agent := range agents { - config := agent.EffectiveConfig - c, err := yaml.Parser().Unmarshal([]byte(config)) - if err != nil { - return err - } - agentConf := confmap.NewFromStringMap(c) - - err = agentConf.Merge(newConf) - if err != nil { - return err - } - - service := agentConf.Get("service") - - logs := service.(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] - processors := logs.(map[string]interface{})["processors"].([]interface{}) - userProcessors := []interface{}{} - // remove old ones - for _, v := range processors { - if !strings.HasPrefix(v.(string), "logstransform/pipeline_") { - userProcessors = append(userProcessors, v) - } - } - // all user processors are pushed after pipelines - processors = append(names, userProcessors...) - - service.(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = processors - - s := map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "logs": map[string]interface{}{ - "processors": processors, - }, - }, - }, - } - - serviceC := confmap.NewFromStringMap(s) - - err = agentConf.Merge(serviceC) - if err != nil { - return err - } - - // ------ complete adding processor - configR, err := yaml.Parser().Marshal(agentConf.ToStringMap()) - if err != nil { - return err - } - - zap.S().Infof("sending new config", string(configR)) - hash := sha256.New() - _, err = hash.Write(configR) - if err != nil { - return err - } - agent.EffectiveConfig = string(configR) - err = agent.Upsert() - if err != nil { - return err - } - - agent.SendToAgent(&protobufs.ServerToAgent{ - RemoteConfig: &protobufs.AgentRemoteConfig{ - Config: &protobufs.AgentConfigMap{ - ConfigMap: map[string]*protobufs.AgentConfigFile{ - "collector.yaml": { - Body: configR, - ContentType: "application/x-yaml", - }, - }, - }, - ConfigHash: hash.Sum(nil), - }, - }) - } - - return nil -} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index a18a69ac9a..191e7c6e5f 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -220,3 +220,6 @@ const ( NumberTagMapCol = "numberTagMap" BoolTagMapCol = "boolTagMap" ) + +// logsPPLPfx is a short constant for logsPipelinePrefix +const LogsPPLPfx = "logstransform/pipeline_" diff --git a/pkg/query-service/model/logparsingpipeline.go b/pkg/query-service/model/logparsingpipeline.go new file mode 100644 index 0000000000..3eec51bdc3 --- /dev/null +++ b/pkg/query-service/model/logparsingpipeline.go @@ -0,0 +1,95 @@ +package model + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" +) + +// Pipeline is stored and also deployed finally to collector config +type Pipeline struct { + Id string `json:"id,omitempty" db:"id"` + OrderId int `json:"orderId" db:"order_id"` + Name string `json:"name,omitempty" db:"name"` + Alias string `json:"alias" db:"alias"` + Description *string `json:"description" db:"description"` + Enabled bool `json:"enabled" db:"enabled"` + Filter string `json:"filter" db:"filter"` + + // configuration for pipeline + RawConfig string `db:"config_json" json:"-"` + + Config []PipelineOperator `json:"config"` + + // Updater not required as any change will result in new version + Creator +} + +type Creator struct { + CreatedBy string `json:"createdBy" db:"created_by"` + CreatedAt time.Time `json:"createdAt" db:"created_at"` +} + +type Processor struct { + Operators []PipelineOperator `json:"operators" yaml:"operators"` +} + +type PipelineOperator struct { + Type string `json:"type" yaml:"type"` + ID string `json:"id,omitempty" yaml:"id,omitempty"` + Output string `json:"output,omitempty" yaml:"output,omitempty"` + OnError string `json:"on_error,omitempty" yaml:"on_error,omitempty"` + + // don't need the following in the final config + OrderId int `json:"orderId" yaml:"-"` + Enabled bool `json:"enabled" yaml:"-"` + Name string `json:"name,omitempty" yaml:"-"` + + // optional keys depending on the type + ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` + Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` + Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` + ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"` + TraceParser *TraceParser `json:"trace_parser,omitempty" yaml:"trace_parser,omitempty"` + Field string `json:"field,omitempty" yaml:"field,omitempty"` + Value string `json:"value,omitempty" yaml:"value,omitempty"` + From string `json:"from,omitempty" yaml:"from,omitempty"` + To string `json:"to,omitempty" yaml:"to,omitempty"` + Expr string `json:"expr,omitempty" yaml:"expr,omitempty"` + Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"` + Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"` + Default string `json:"default,omitempty" yaml:"default,omitempty"` +} + +type TimestampParser struct { + Layout string `json:"layout" yaml:"layout"` + LayoutType string `json:"layout_type" yaml:"layout_type"` + ParseFrom string `json:"parse_from" yaml:"parse_from"` +} + +type TraceParser struct { + TraceId *ParseFrom `json:"trace_id,omitempty" yaml:"trace_id,omitempty"` + SpanId *ParseFrom `json:"span_id,omitempty" yaml:"span_id,omitempty"` + TraceFlags *ParseFrom `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"` +} + +type ParseFrom struct { + ParseFrom string `json:"parse_from" yaml:"parse_from"` +} + +type Route struct { + Output string `json:"output" yaml:"output"` + Expr string `json:"expr" yaml:"expr"` +} + +func (i *Pipeline) ParseRawConfig() error { + c := []PipelineOperator{} + err := json.Unmarshal([]byte(i.RawConfig), &c) + if err != nil { + return errors.Wrap(err, "failed to parse ingestion rule config") + } + i.Config = c + return nil +}