Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: http flusher add Convert.Separator config field #616

Merged
merged 3 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/cn/data-pipeline/flusher/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

## 配置参数

| 参数 | 类型 | 是否必选 | 说明 |
| ---------------------------- | ------------------ | -------- |---------------------------------------------------------------------------------------------------------------------------------------------------|
| 参数 | 类型 | 是否必选 | 说明 |
|------------------------------|--------------------| -------- |---------------------------------------------------------------------------------------------------------------------------------------------------|
| Type | String | 是 | 插件类型,固定为`flusher_http` |
| RemoteURL | String | 是 | 要发送到的URL地址,示例:`http://localhost:8086/write` |
| Headers | Map<String,String> | 否 | 发送时附加的http请求header,如可添加 Authorization、Content-Type等信息 |
Expand All @@ -20,6 +20,7 @@
| Convert | Struct | 否 | ilogtail数据转换协议配置 |
| Convert.Protocol | String | 否 | ilogtail数据转换协议,可选值:`custom_single`,`influxdb`。默认值:`custom_single`<p>v2版本可选值:`raw`</p> |
| Convert.Encoding | String | 否 | ilogtail flusher数据转换编码,可选值:`json`, `custom`,默认值:`json` |
| Convert.Separator | String | 否 | ilogtail数据转换时,PipelineGroupEvents中多个Events之间拼接使用的分隔符。如`\n`。若不设置,则默认不拼接Events,即每个Event作为独立请求向后发送。 默认值为空。<p>当前仅在`Convert.Protocol: raw`有效。</p> |
| Convert.TagFieldsRename | Map<String,String> | 否 | 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map<String,String> | 否 | ilogtail日志协议字段重命名,当前可重命名的字段:`contents`,`tags`和`time` |
| Concurrency | Int | 否 | 向url发起请求的并发数,默认为`1` |
Expand Down
11 changes: 6 additions & 5 deletions docs/cn/developer-guide/log-protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ iLogtail的日志数据默认以sls自定义协议的形式与外部进行交互

目前,iLogtail日志数据支持的协议及相应的编码方式如下表所示,其中协议类型可分为自定义协议和标准协议:

| 协议类型 | 协议名称 | 支持的编码方式 |
| ------- | ------- | ------- |
| 标准协议 | [sls协议](./protocol-spec/sls.md) | json、protobuf |
| 自定义协议 | [单条协议](./protocol-spec/custom_single.md) | json |
| 标准协议 | [Influxdb协议](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/) | custom |
| 协议类型 | 协议名称 | 支持的编码方式 |
|-------|--------------------------------------------------------------------------------------------------|---------------|
| 标准协议 | [sls协议](./protocol-spec/sls.md) | json、protobuf |
| 自定义协议 | [单条协议](./protocol-spec/custom_single.md) | json |
| 标准协议 | [Influxdb协议](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/) | custom |
| 字节流协议 | [raw协议](./protocol-spec/raw.md) | custom |
30 changes: 30 additions & 0 deletions docs/cn/developer-guide/log-protocol/protocol-spec/raw.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# raw协议

raw协议在内存中对应的数据结构定义如下:

## ByteArray & GroupInfo

传输数据的原始字节流,是`PipelineEvent`的一种实现。

```go
type ByteArray []byte
```

传输数据的标签及元数据信息,简单的key/value对。

```go
type GroupInfo struct {
Metadata Metadata
Tags Tags
}
```

## PipelineGroupEvents

传输数据整体,由分组信息(Group)与其包含的事件列表(Events)组成。

```go
type PipelineGroupEvents struct {
Group *GroupInfo
Events []PipelineEvent
}
```
1 change: 1 addition & 0 deletions helper/converter_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package helper
// ConvertConfig bundles the data-conversion settings that a flusher passes to
// the protocol converter (pkg/protocol/converter).
type ConvertConfig struct {
	TagFieldsRename      map[string]string // Rename one or more JSON fields found under tags.
	ProtocolFieldsRename map[string]string // Rename protocol-level fields; only contents, tags and time may be renamed.
	Separator            string            // Separator inserted between events when joining a PipelineGroupEvents into one payload; empty means no joining (one request per event). Currently only effective for the raw protocol.
	Protocol             string            // Convert protocol, e.g. custom_single, influxdb, raw.
	Encoding             string            // Convert encoding, e.g. json, custom.
}
10 changes: 10 additions & 0 deletions pkg/protocol/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,20 @@ var supportedEncodingMap = map[string]map[string]bool{
// Converter translates iLogtail pipeline data into a target protocol and
// encoding before it is flushed downstream.
type Converter struct {
	Protocol             string            // target protocol name, e.g. custom_single, influxdb, raw
	Encoding             string            // target encoding, e.g. json, custom
	Separator            string            // separator used to join events of one PipelineGroupEvents; empty disables joining (currently raw protocol only)
	TagKeyRenameMap      map[string]string // renames applied to tag keys
	ProtocolKeyRenameMap map[string]string // renames applied to protocol-level keys (contents, tags, time)
}

// NewConverterWithSep constructs a Converter exactly as NewConverter does and
// additionally sets the separator used to join multiple events of one
// PipelineGroupEvents into a single payload. An empty sep disables joining.
func NewConverterWithSep(protocol, encoding, sep string, tagKeyRenameMap, protocolKeyRenameMap map[string]string) (*Converter, error) {
	c, err := NewConverter(protocol, encoding, tagKeyRenameMap, protocolKeyRenameMap)
	if err != nil {
		return nil, err
	}
	c.Separator = sep
	return c, nil
}

func NewConverter(protocol, encoding string, tagKeyRenameMap, protocolKeyRenameMap map[string]string) (*Converter, error) {
enc, ok := supportedEncodingMap[protocol]
if !ok {
Expand Down
39 changes: 32 additions & 7 deletions pkg/protocol/converter/converter_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,48 @@ func (c *Converter) ConvertToRawStream(groupEvents *models.PipelineGroupEvents,
return nil, nil, nil
}

byteStream := *GetPooledByteBuf()
var targetValues map[string]string
if len(targetFields) > 0 {
urnotsally marked this conversation as resolved.
Show resolved Hide resolved
targetValues = findTargetFieldsInGroup(targetFields, groupEvents.Group)
}

if len(c.Separator) == 0 {
return getByteStream(groupEvents, targetValues)
}

return getByteStreamWithSep(groupEvents, targetValues, c.Separator)
}

// getByteStreamWithSep joins every ByteArray event of groupEvents into one
// pooled byte buffer, inserting sep between consecutive events, and returns it
// as a single-element stream paired with the group's resolved target values.
// It fails if any event is not a ByteArray.
func getByteStreamWithSep(groupEvents *models.PipelineGroupEvents, targetValues map[string]string, sep string) (stream [][]byte, values []map[string]string, err error) {
	joinedStream := *GetPooledByteBuf()
	for idx, event := range groupEvents.Events {
		eventType := event.GetType()
		if eventType != models.EventTypeByteArray {
			return nil, nil, fmt.Errorf("unsupported event type %v", eventType)
		}
		if idx != 0 {
			// Separator only between events, never before the first one.
			joinedStream = append(joinedStream, sep...)
		}
		joinedStream = append(joinedStream, event.(models.ByteArray)...)
	}
	// One joined payload -> one request downstream; the single targetValues
	// entry keeps stream and values index-aligned.
	return [][]byte{joinedStream}, []map[string]string{targetValues}, nil
}

var targetValues map[string]string
if len(targetFields) > 0 {
targetValues = findTargetFieldsInGroup(targetFields, groupEvents.Group)
func getByteStream(groupEvents *models.PipelineGroupEvents, targetValues map[string]string) (stream [][]byte, values []map[string]string, err error) {
byteGroup := make([][]byte, 0, len(groupEvents.Events))
valueGroup := make([]map[string]string, 0, len(groupEvents.Events))
for _, event := range groupEvents.Events {
eventType := event.GetType()
if eventType != models.EventTypeByteArray {
return nil, nil, fmt.Errorf("unsupported event type %v", eventType)
}

byteStream := *GetPooledByteBuf()
byteStream = append(byteStream, event.(models.ByteArray)...)
byteGroup = append(byteGroup, byteStream)
valueGroup = append(valueGroup, targetValues)
}
return [][]byte{byteStream}, []map[string]string{targetValues}, nil
return byteGroup, valueGroup, nil
}

func findTargetFieldsInGroup(targetFields []string, group *models.GroupInfo) map[string]string {
Expand Down
83 changes: 80 additions & 3 deletions pkg/protocol/converter/converter_raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func TestConverter_ConvertToRawStream(t *testing.T) {
type fields struct {
Protocol string
Encoding string
Separator string
TagKeyRenameMap map[string]string
ProtocolKeyRenameMap map[string]string
}
Expand All @@ -21,8 +22,9 @@ func TestConverter_ConvertToRawStream(t *testing.T) {
targetFields []string
}
mockValidFields := fields{
Protocol: ProtocolRaw,
Encoding: EncodingCustom,
Protocol: ProtocolRaw,
Encoding: EncodingCustom,
Separator: "\n",
}
mockInvalidFields := fields{
Protocol: ProtocolRaw,
Expand Down Expand Up @@ -57,7 +59,7 @@ func TestConverter_ConvertToRawStream(t *testing.T) {
Events: []models.PipelineEvent{models.ByteArray(mockByteEvent), models.ByteArray(mockByteEvent)}},
targetFields: []string{"metadata.db"},
},
wantStream: [][]byte{append(append(mockByteEvent, '\n'), mockByteEvent...)},
wantStream: [][]byte{append(append(mockByteEvent, "\n"...), mockByteEvent...)},
wantValues: []map[string]string{{"metadata.db": "test"}},
wantErr: assert.NoError,
}, {
Expand All @@ -83,6 +85,81 @@ func TestConverter_ConvertToRawStream(t *testing.T) {
c := &Converter{
Protocol: tt.fields.Protocol,
Encoding: tt.fields.Encoding,
Separator: tt.fields.Separator,
TagKeyRenameMap: tt.fields.TagKeyRenameMap,
ProtocolKeyRenameMap: tt.fields.ProtocolKeyRenameMap,
}
gotStream, gotValues, err := c.ConvertToRawStream(tt.args.groupEvents, tt.args.targetFields)
if !tt.wantErr(t, err, fmt.Sprintf("ConvertToRawStream(%v, %v)", tt.args.groupEvents, tt.args.targetFields)) {
return
}
assert.Equalf(t, tt.wantStream, gotStream, "ConvertToRawStream(%v, %v)", tt.args.groupEvents, tt.args.targetFields)
assert.Equalf(t, tt.wantValues, gotValues, "ConvertToRawStream(%v, %v)", tt.args.groupEvents, tt.args.targetFields)
})
}
}

func TestConverter_ConvertToRawStreamSeparator(t *testing.T) {
type fields struct {
Protocol string
Encoding string
Separator string
TagKeyRenameMap map[string]string
ProtocolKeyRenameMap map[string]string
}
type args struct {
groupEvents *models.PipelineGroupEvents
targetFields []string
}
mockFieldsWithSep := fields{
Protocol: ProtocolRaw,
Encoding: EncodingCustom,
Separator: "\r\n",
}
mockFieldsWithoutSep := fields{
Protocol: ProtocolRaw,
Encoding: EncodingJSON,
}
mockGroup := models.NewGroup(models.NewMetadataWithMap(map[string]string{"db": "test"}), nil)
mockByteEvent := []byte("cpu.load.short,host=server01,region=cn value=0.6")
tests := []struct {
name string
fields fields
args args
wantStream [][]byte
wantValues []map[string]string
wantErr assert.ErrorAssertionFunc
}{
{
name: "join with sep",
fields: mockFieldsWithSep,
args: args{
groupEvents: &models.PipelineGroupEvents{Group: mockGroup,
Events: []models.PipelineEvent{models.ByteArray(mockByteEvent), models.ByteArray(mockByteEvent)}},
targetFields: []string{"metadata.db"},
},
wantStream: [][]byte{append(append(mockByteEvent, "\r\n"...), mockByteEvent...)},
wantValues: []map[string]string{{"metadata.db": "test"}},
wantErr: assert.NoError,
}, {
name: "not join",
fields: mockFieldsWithoutSep,
args: args{
groupEvents: &models.PipelineGroupEvents{Group: mockGroup,
Events: []models.PipelineEvent{models.ByteArray(mockByteEvent), models.ByteArray(mockByteEvent)}},
targetFields: []string{"metadata.db"},
},
wantStream: [][]byte{mockByteEvent, mockByteEvent},
wantValues: []map[string]string{{"metadata.db": "test"}, {"metadata.db": "test"}},
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Converter{
Protocol: tt.fields.Protocol,
Encoding: tt.fields.Encoding,
Separator: tt.fields.Separator,
TagKeyRenameMap: tt.fields.TagKeyRenameMap,
ProtocolKeyRenameMap: tt.fields.ProtocolKeyRenameMap,
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/flusher/http/flusher_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (f *FlusherHTTP) Stop() error {
}

// getConverter builds the protocol converter for this flusher, forwarding the
// configured Convert.Separator so that multiple events of one
// PipelineGroupEvents can be joined into a single request body.
func (f *FlusherHTTP) getConverter() (*converter.Converter, error) {
	return converter.NewConverterWithSep(f.Convert.Protocol, f.Convert.Encoding, f.Convert.Separator, nil, nil)
}

func (f *FlusherHTTP) addTask(log interface{}) {
Expand Down
89 changes: 86 additions & 3 deletions plugins/flusher/http/flusher_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestHttpFlusherFlush(t *testing.T) {
}

func TestHttpFlusherExport(t *testing.T) {
Convey("Given a http flusher with protocol: Raw, encoding: custom, query: contains variable '%{metadata.db}'", t, func() {
Convey("Given a http flusher with Convert.Protocol: Raw, Convert.Encoding: Custom, Query: '%{metadata.db}'", t, func() {
var actualRequests []string
httpmock.Activate()
defer httpmock.DeactivateAndReset()
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestHttpFlusherExport(t *testing.T) {
})
})

Convey("Export multiple byte events in one GroupEvents with Metadata {db: mydb}", func() {
Convey("Export multiple byte events in one GroupEvents with Metadata {db: mydb}, and ", func() {
groupEvents := models.PipelineGroupEvents{
Group: models.NewGroup(mockMetadata, nil),
Events: []models.PipelineEvent{models.ByteArray(mockMetric1),
Expand All @@ -358,7 +358,89 @@ func TestHttpFlusherExport(t *testing.T) {
So(err, ShouldBeNil)
flusher.Stop()

Convey("events in the same groupEvents should be send in one request", func() {
Convey("events in the same groupEvents should be send in individual request, when Convert.Separator is not set", func() {
reqCount := httpmock.GetTotalCallCount()
So(reqCount, ShouldEqual, 2)
})

Convey("request body should be valid", func() {
So(actualRequests, ShouldResemble, []string{
mockMetric1, mockMetric2,
})
})
})
})

Convey("Given a http flusher with Convert.Protocol: Raw, Convert.Encoding: Custom, Convert.Separator: '\n' ,Query: '%{metadata.db}'", t, func() {
var actualRequests []string
httpmock.Activate()
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("POST", "http://test.com/write?db=mydb", func(req *http.Request) (*http.Response, error) {
body, _ := ioutil.ReadAll(req.Body)
actualRequests = append(actualRequests, string(body))
return httpmock.NewStringResponse(200, "ok"), nil
})

flusher := &FlusherHTTP{
RemoteURL: "http://test.com/write",
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolRaw,
Encoding: converter.EncodingCustom,
Separator: "\n",
},
Timeout: defaultTimeout,
Concurrency: 1,
Query: map[string]string{
"db": "%{metadata.db}",
},
}

err := flusher.Init(mock.NewEmptyContext("p", "l", "c"))
So(err, ShouldBeNil)

mockMetric1 := "cpu.load.short,host=server01,region=cn value=0.6 1672321328000000000"
mockMetric2 := "cpu.load.short,host=server01,region=cn value=0.2 1672321358000000000"
mockMetadata := models.NewMetadataWithKeyValues("db", "mydb")

Convey("Export a single byte events each GroupEvents with Metadata {db: mydb}", func() {
groupEventsArray := []*models.PipelineGroupEvents{
{
Group: models.NewGroup(mockMetadata, nil),
Events: []models.PipelineEvent{models.ByteArray(mockMetric1)},
},
{
Group: models.NewGroup(mockMetadata, nil),
Events: []models.PipelineEvent{models.ByteArray(mockMetric2)},
},
}
httpmock.ZeroCallCounters()
err := flusher.Export(groupEventsArray, nil)
So(err, ShouldBeNil)
flusher.Stop()

Convey("each GroupEvents should send in a single request", func() {
So(httpmock.GetTotalCallCount(), ShouldEqual, 2)
})
Convey("request body should by valid", func() {
So(actualRequests, ShouldResemble, []string{
mockMetric1, mockMetric2,
})
})
})

Convey("Export multiple byte events in one GroupEvents with Metadata {db: mydb}, and ", func() {
groupEvents := models.PipelineGroupEvents{
Group: models.NewGroup(mockMetadata, nil),
Events: []models.PipelineEvent{models.ByteArray(mockMetric1),
models.ByteArray(mockMetric2)},
}
httpmock.ZeroCallCounters()
err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil)
So(err, ShouldBeNil)
flusher.Stop()

Convey("events in the same groupEvents should be send in one request, when Convert.Separator is set", func() {
reqCount := httpmock.GetTotalCallCount()
So(reqCount, ShouldEqual, 1)
})
Expand All @@ -370,6 +452,7 @@ func TestHttpFlusherExport(t *testing.T) {
})
})
})

}

func TestHttpFlusherExportUnsupportedEventType(t *testing.T) {
Expand Down