feat: logs parsing pipeline support in opamp #2456

Merged: 10 commits, Mar 22, 2023
18 changes: 18 additions & 0 deletions pkg/query-service/agentConf/manager.go
@@ -210,3 +210,21 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Config
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
return nil
}

// UpsertLogParsingProcessor updates the agent config with the given log parsing processors
func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []interface{}) error {
if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
return fmt.Errorf("agent updater is busy")
}
defer atomic.StoreUint32(&m.lock, 0)

// send the changes to opamp.
configHash, err := opamp.UpsertLogsParsingProcessor(context.Background(), config, names, m.OnConfigUpdate)
if err != nil {
zap.S().Errorf("failed to call agent config update for log parsing processor:", err)
return err
}

m.updateDeployStatus(ctx, ElementTypeLogPipelines, version, string(DeployInitiated), "Deployment started", configHash, string(rawPipelineData))
return nil
}
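
For reference, the sketch below (not part of this PR) shows how a caller such as the log pipelines API layer might hand the rendered processor configs and their ordered names to this manager function. The package name, helper name, and processor body are illustrative assumptions.

package logparsingpipeline

import (
	"context"

	"go.signoz.io/signoz/pkg/query-service/agentConf"
	"go.signoz.io/signoz/pkg/query-service/constants"
)

// deployLogPipelines is a hypothetical helper: it passes the rendered logstransform
// processor configs (keyed by name) and their intended order to the agent config manager.
func deployLogPipelines(ctx context.Context, version int, rawPipelineData []byte) error {
	processors := map[string]interface{}{
		// body of a logstransform processor; the operators shown are illustrative
		constants.LogsPPLPfx + "_demo": map[string]interface{}{
			"operators": []interface{}{
				map[string]interface{}{"type": "json_parser"},
			},
		},
	}
	// names carries the desired ordering of the log parsing processors
	names := []interface{}{constants.LogsPPLPfx + "_demo"}

	return agentConf.UpsertLogParsingProcessor(ctx, version, rawPipelineData, processors, names)
}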
180 changes: 180 additions & 0 deletions pkg/query-service/app/opamp/logspipeline.go
@@ -0,0 +1,180 @@
package opamp

import (
"context"
"crypto/sha256"
"fmt"
"strings"
"sync"

"github.com/knadh/koanf/parsers/yaml"
"github.com/open-telemetry/opamp-go/protobufs"
model "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.uber.org/zap"
)

var lockLogsPipelineSpec sync.RWMutex

func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []interface{}, callback func(string, string, error)) (string, error) {
confHash := ""
if opAmpServer == nil {
return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment")
}

agents := opAmpServer.agents.GetAllAgents()
if len(agents) == 0 {
return confHash, fmt.Errorf("no agents available at the moment")
}

for _, agent := range agents {
config := agent.EffectiveConfig
c, err := yaml.Parser().Unmarshal([]byte(config))
if err != nil {
return confHash, err
}

buildLogParsingProcessors(c, parsingProcessors)

// get the processor list
logs := c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"]
processors := logs.(map[string]interface{})["processors"].([]interface{})

// build the new processor list
updatedProcessorList, _ := buildLogsProcessors(processors, parsingProcessorsNames)

// add the new processor to the data
c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = updatedProcessorList

updatedConf, err := yaml.Parser().Marshal(c)
if err != nil {
return confHash, err
}

// zap.S().Infof("sending new config", string(updatedConf))
hash := sha256.New()
_, err = hash.Write(updatedConf)
if err != nil {
return confHash, err
}
agent.EffectiveConfig = string(updatedConf)
err = agent.Upsert()
if err != nil {
return confHash, err
}

agent.SendToAgent(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"collector.yaml": {
Body: updatedConf,
ContentType: "application/x-yaml",
},
},
},
ConfigHash: hash.Sum(nil),
},
})

if confHash == "" {
confHash = string(hash.Sum(nil))
model.ListenToConfigUpdate(agent.ID, confHash, callback)
}
}

return confHash, nil
}

// buildLogParsingProcessors upserts the given log parsing processors into the agent config:
// processors present in parsingProcessors are added or updated, and processors carrying the
// logs pipeline prefix that are no longer present are removed.
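// e.g. existing processors {"batch", LogsPPLPfx + "_old"} upserted with {LogsPPLPfx + "_new"} yield {"batch", LogsPPLPfx + "_new"}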
func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error {
agentProcessors := agentConf["processors"].(map[string]interface{})
exists := map[string]struct{}{}
for key, params := range parsingProcessors {
agentProcessors[key] = params
exists[key] = struct{}{}
}
// remove the old unwanted processors
for k := range agentProcessors {
if _, ok := exists[k]; !ok && strings.HasPrefix(k, constants.LogsPPLPfx) {
delete(agentProcessors, k)
}
}
agentConf["processors"] = agentProcessors
return nil
}

func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}) ([]interface{}, error) {
lockLogsPipelineSpec.Lock()
defer lockLogsPipelineSpec.Unlock()

exists := map[string]struct{}{}
for _, v := range logsParserPipeline {
exists[v.(string)] = struct{}{}
}

// remove the old processors that are no longer used
var pipeline []interface{}
for _, v := range current {
k := v.(string)
if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) {
pipeline = append(pipeline, v)
}
}

// create a reverse map of existing config processors and their position
existing := map[string]int{}
for i, p := range current {
name := p.(string)
existing[name] = i
}

// create a mapping from positions in our logsParserPipeline to positions in the existing processors (from the current config).
// For example, if "batch" holds position 3 in the current effective config and position 2 in our plan, the map will contain {2: 3}.
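// Concretely: current = ["memory_limiter", LogsPPLPfx + "_a", "batch"] and plan = [LogsPPLPfx + "_a", LogsPPLPfx + "_b"]
// gives specVsExistingMap = {0: 1}; the unmatched LogsPPLPfx + "_b" is then inserted right after the last match,
// producing ["memory_limiter", LogsPPLPfx + "_a", LogsPPLPfx + "_b", "batch"].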
specVsExistingMap := map[int]int{}

// go through plan and map its elements to current positions in effective config
for i, m := range logsParserPipeline {
if loc, ok := existing[m.(string)]; ok {
specVsExistingMap[i] = loc
}
}

lastMatched := 0

// go through the plan again in increasing order
for i := 0; i < len(logsParserPipeline); i++ {
m := logsParserPipeline[i]

if loc, ok := specVsExistingMap[i]; ok {
lastMatched = loc + 1
} else {
if lastMatched <= 0 {
zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m)
pipeline = append([]interface{}{m}, pipeline[lastMatched:]...)
lastMatched++
} else {
zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m)

prior := make([]interface{}, len(pipeline[:lastMatched]))
next := make([]interface{}, len(pipeline[lastMatched:]))

copy(prior, pipeline[:lastMatched])
copy(next, pipeline[lastMatched:])

pipeline = append(prior, m)
pipeline = append(pipeline, next...)
}
}
}

if checkDuplicates(pipeline) {
// duplicates most likely mean that the processor sequence in the effective config conflicts
// with the sequence planned by the log parsing pipeline
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
}

return pipeline, nil
}
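
To make the rewrite concrete, here is a minimal sketch (not part of this PR, and not the author's code) of how the two helpers compose on a toy effective config. It assumes it sits in this package, since it calls the unexported helpers, and it elides real processor bodies.

func exampleRewrite() {
	conf := map[string]interface{}{
		"processors": map[string]interface{}{
			"memory_limiter": map[string]interface{}{},
			"batch":          map[string]interface{}{},
		},
		"service": map[string]interface{}{
			"pipelines": map[string]interface{}{
				"logs": map[string]interface{}{
					"processors": []interface{}{"memory_limiter", "batch"},
				},
			},
		},
	}

	// the new log parsing processor and its position in the planned pipeline
	parsing := map[string]interface{}{
		constants.LogsPPLPfx + "_demo": map[string]interface{}{},
	}
	names := []interface{}{constants.LogsPPLPfx + "_demo"}

	// upsert the processor definition, then merge it into the logs pipeline order
	buildLogParsingProcessors(conf, parsing)
	logs := conf["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})
	merged, _ := buildLogsProcessors(logs["processors"].([]interface{}), names)
	logs["processors"] = merged

	// merged is now [LogsPPLPfx + "_demo", "memory_limiter", "batch"]: the new
	// processor has no anchor in the current config, so it is prepended.
	fmt.Println(merged)
}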
161 changes: 161 additions & 0 deletions pkg/query-service/app/opamp/logspipeline_test.go
@@ -0,0 +1,161 @@
package opamp

import (
"fmt"
"testing"

. "github.com/smartystreets/goconvey/convey"
"go.signoz.io/signoz/pkg/query-service/constants"
)

var buildProcessorTestData = []struct {
Name string
agentConf map[string]interface{}
pipelineProcessor map[string]interface{}
outputConf map[string]interface{}
}{
{
Name: "Add",
agentConf: map[string]interface{}{
"processors": map[string]interface{}{
"batch": struct{}{},
},
},
pipelineProcessor: map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{},
},
outputConf: map[string]interface{}{
"processors": map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{},
"batch": struct{}{},
},
},
},
{
Name: "Remove",
agentConf: map[string]interface{}{
"processors": map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{},
"batch": struct{}{},
},
},
pipelineProcessor: map[string]interface{}{},
outputConf: map[string]interface{}{
"processors": map[string]interface{}{
"batch": struct{}{},
},
},
},
{
Name: "remove and upsert 1",
agentConf: map[string]interface{}{
"processors": map[string]interface{}{
constants.LogsPPLPfx + "_a": struct{}{},
constants.LogsPPLPfx + "_b": struct{}{},
"batch": struct{}{},
},
},
pipelineProcessor: map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{},
},
outputConf: map[string]interface{}{
"processors": map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{},
"batch": struct{}{},
},
},
},
{
Name: "remove and upsert 2",
agentConf: map[string]interface{}{
"processors": map[string]interface{}{
"memory_limiter": struct{}{},
constants.LogsPPLPfx + "_a": struct{}{},
constants.LogsPPLPfx + "_b": struct{}{},
"batch": struct{}{},
},
},
pipelineProcessor: map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{},
},
outputConf: map[string]interface{}{
"processors": map[string]interface{}{
"memory_limiter": struct{}{},
constants.LogsPPLPfx + "_b": struct{}{},
"batch": struct{}{},
},
},
},
}

func TestBuildLogParsingProcessors(t *testing.T) {
for _, test := range buildProcessorTestData {
Convey(test.Name, t, func() {
err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor)
So(err, ShouldBeNil)
So(test.agentConf, ShouldResemble, test.outputConf)
})
}

}

var BuildLogsPipelineTestData = []struct {
Name string
currentPipeline []interface{}
logsPipeline []interface{}
expectedPipeline []interface{}
}{
{
Name: "Add new pipelines",
currentPipeline: []interface{}{"processor1", "processor2"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"},
},
{
Name: "Add new pipeline and respect custom processors",
currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", "processor2"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"},
expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "processor2"},
},
{
Name: "Add new pipeline and respect custom processors in the beginning and middle",
currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"},
expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"},
},
{
Name: "Remove old pipeline and add new",
currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"},
expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", "processor2"},
},
{
Name: "Remove old pipeline from middle",
currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"},
expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "batch"},
},
{
Name: "Remove old pipeline from middle and add new pipeline",
currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "processor3", "batch"},
},
{
Name: "Remove multiple old pipelines from middle and add multiple new ones",
currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_c", "processor4", constants.LogsPPLPfx + "_d", "processor5", "batch"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1"},
expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", "processor2", "processor3", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1", "processor4", "processor5", "batch"},
},
}

func TestBuildLogsPipeline(t *testing.T) {
for _, test := range BuildLogsPipelineTestData {
Convey(test.Name, t, func() {
v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline)
So(err, ShouldBeNil)
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
So(v, ShouldResemble, test.expectedPipeline)
})
}
}