Skip to content

Commit

Permalink
基于时间规则的多行分割processor
Browse files Browse the repository at this point in the history
  • Loading branch information
luckywangpei committed May 14, 2023
1 parent 5f49320 commit 597f1a9
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 1 deletion.
4 changes: 3 additions & 1 deletion core/config_manager/ConfigYamlToJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const std::string PROCESSOR_DELIMITER_ACCELERATE = "processor_delimiter_accelera

const std::string PROCESSOR_SPLIT_LINE_LOG_USING_SEP = "processor_split_log_string";
const std::string PROCESSOR_SPLIT_LINE_LOG_USING_REG = "processor_split_log_regex";
const std::string PROCESSOR_SPLIT_MULTI_ROW_BY_TIME_RULE = "processor_split_multi_row_by_time_rule";

ConfigYamlToJson::ConfigYamlToJson() {
// file_log params
Expand Down Expand Up @@ -334,7 +335,8 @@ bool ConfigYamlToJson::CheckPluginConfig(const string configName, const YAML::No
} else {
if (0 == workMode.mLogSplitProcessorPluginType.size()
&& (0 == processorPluginType.compare(PROCESSOR_SPLIT_LINE_LOG_USING_SEP)
|| 0 == processorPluginType.compare(PROCESSOR_SPLIT_LINE_LOG_USING_REG))) {
|| 0 == processorPluginType.compare(PROCESSOR_SPLIT_LINE_LOG_USING_REG)
|| 0 == processorPluginType.compare(PROCESSOR_SPLIT_MULTI_ROW_BY_TIME_RULE))) {
workMode.mLogSplitProcessorPluginType = processorPluginType;
if (iter->second.mFirstPos != 0) {
LOG_ERROR(sLogger,
Expand Down
148 changes: 148 additions & 0 deletions plugins/processor/split/multirow/split_multi_row_by_time_rule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package multirow

import (
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
"strings"
"time"
)

type ProcessorSplitMultiRowByTimeRule struct {
TimeStartFlag string
TimeFormat string
TimeFormatLength int
SplitKey string
context pipeline.Context
}

const pluginName = "processor_split_multi_row_by_time_rule"

// Init called for init some system resources, like socket, mutex...
func (p *ProcessorSplitMultiRowByTimeRule) Init(context pipeline.Context) error {
p.context = context
p.TimeFormatLength = len(p.TimeFormat)
return nil
}

func (*ProcessorSplitMultiRowByTimeRule) Description() string {
return "split multi row by timerule"
}

func (p *ProcessorSplitMultiRowByTimeRule) ProcessLogs(logArray []*protocol.Log) []*protocol.Log {
destArray := make([]*protocol.Log, 0, len(logArray))

for _, log := range logArray {
newLog := &protocol.Log{}
var destCont *protocol.Log_Content
for _, cont := range log.Contents {
if destCont == nil && (len(p.SplitKey) != 0 && cont.Key == p.SplitKey) {
destCont = cont
}
}
if log.Time != uint32(0) {
newLog.Time = log.Time
} else {
newLog.Time = (uint32)(time.Now().Unix())
}
if destCont != nil {
destArray = p.SplitLog(destArray, newLog, destCont)
}
}

return destArray
}

func (p *ProcessorSplitMultiRowByTimeRule) SplitLog(destLogArray []*protocol.Log, destLog *protocol.Log, rowContent *protocol.Log_Content) []*protocol.Log {
valueStr := rowContent.GetValue()

lastCheckIndex := 0

isFirstLine := true
//lastLine := nil
var lastLine *protocol.Log = nil
var lastLineContent *string = nil
totalLen := len(valueStr)
for i := 0; i < totalLen; i++ {
if (valueStr[i] == '\n') || (i == (totalLen-1) && valueStr[i] != '\n') {
line := valueStr[lastCheckIndex : i+1]
timeString := p.getTimeString(line)
if timeString != nil {
if isFirstLine {
copyLog := protocol.CloneLog(destLog)
lastLineContent = &line
copyLog.Contents = append(copyLog.Contents, &protocol.Log_Content{
Key: rowContent.GetKey(), Value: *lastLineContent})
destLogArray = append(destLogArray, copyLog)
lastLine = copyLog
lastCheckIndex = i + 1
isFirstLine = false
} else {
copyLog := protocol.CloneLog(destLog)
copyLog.Contents = append(copyLog.Contents, &protocol.Log_Content{
Key: rowContent.GetKey(), Value: line})
destLogArray = append(destLogArray, copyLog)
lastLine = copyLog
lastLineContent = &line
lastCheckIndex = i + 1
}
} else {
if !isFirstLine {
temp := *lastLineContent
temp += line
*lastLineContent = temp
lastLine.GetContents()[0].Value = *lastLineContent
lastCheckIndex = i + 1
} else {
copyLog := protocol.CloneLog(destLog)
copyLog.Contents = append(copyLog.Contents, &protocol.Log_Content{
Key: rowContent.GetKey(), Value: line})
destLogArray = append(destLogArray, copyLog)
lastCheckIndex = i + 1
}
}
}
}

return destLogArray
}

func (p *ProcessorSplitMultiRowByTimeRule) getTimeString(line string) *string {
if len(line) < p.TimeFormatLength {
return nil
}

index := strings.Index(line, p.TimeStartFlag)
if index >= 0 {
if (index + 1) >= (p.TimeFormatLength + 1) {
return nil
}
timeString := line[index+1 : p.TimeFormatLength+1]
return &timeString
}

return nil
}

func init() {
pipeline.Processors[pluginName] = func() pipeline.Processor {
return &ProcessorSplitMultiRowByTimeRule{
TimeStartFlag: "timestamp=",
TimeFormat: "YYYY-MM-DD hh:mm:ss",
SplitKey: "content",
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package multirow

import (
"github.com/alibaba/ilogtail/plugins/test"
"testing"

"github.com/stretchr/testify/require"

"github.com/alibaba/ilogtail/pkg/protocol"
"github.com/alibaba/ilogtail/plugins/test/mock"

"encoding/json"
)

func newProcessor() (*ProcessorSplitMultiRowByTimeRule, error) {
ctx := mock.NewEmptyContext("p", "l", "c")
processor := &ProcessorSplitMultiRowByTimeRule{
SplitKey: "content",
TimeFormat: "YYYY-MM-DD hh:mm:ss",
TimeStartFlag: "[",
}
err := processor.Init(ctx)
return processor, err
}

func TestMultiRow(t *testing.T) {
processor, err := newProcessor()
require.NoError(t, err)

log := `[2023-05-14 14:21:09][INFO][.../cache.go:198] _undef||traceid=xxx||spanid=xxx||hintCode=0||hintContent=xxx||thread=ThreadPoolExecutor-0_0||java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at java.base/java.lang.Thread.run(Thread.java:834)
[2023-05-14 14:21:10][INFO][.../cache.go:198] _undef||traceid=xxx||spanid=xxx||hintCode=0||hintContent=xxx||thread=ThreadPoolExecutor-0_0||java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at java.base/java.lang.Thread.run(Thread.java:834)`

logPb := test.CreateLogs("content", log)
logArray := make([]*protocol.Log, 1)
logArray[0] = logPb

destLogs := processor.ProcessLogs(logArray)
marshal, _ := json.MarshalIndent(destLogs, "", " ")
println("processor解析后数据: " + string(marshal))

}

0 comments on commit 597f1a9

Please sign in to comment.