feat: prometheus decoder optimize (#866)
* feat: pooling prometheus buf & request

* fix: copybuffer bug fix

* fix: reset timeseries

* fix: zero copy prompb decode

* fix: temp remove __hostname__

* feat: not recycle uncompressed request body

* feat: simplify the snappy decode

* feat: make prompb fields const

* feat: add options for prometheus decoder unsafe mode

* feat: restore the __hostname__ tag

* fix: lint & license

* fix: decoder options pass through

* doc: note that exemplar not supported

* doc: ref link for the prompb message definition
snakorse authored May 26, 2023
1 parent 6207897 commit 4a8776d
Showing 11 changed files with 272 additions and 37 deletions.
37 changes: 19 additions & 18 deletions docs/cn/data-pipeline/input/service-http-server.md
@@ -10,25 +10,26 @@

## Configuration parameters

| Parameter | Type | Required | Description |
|--------------------|-------------------|----------|-------------|
| Type | String | | Plugin type. Always `service_http_server`. |
| Format | String | | <p>Data format.</p><p>Supported formats: `sls`, `prometheus`, `influxdb`, `otlp_logv1`, `otlp_metricv1`, `pyroscope`, `statsd`</p><p>Formats supported in v2: `raw`, `prometheus`, `otlp_logv1`, `otlp_metricv1`, `otlp_tracev1`</p><p>Note: the `raw` format passes data through as the raw request byte stream.</p> |
| Address | String | | <p>Listen address.</p> |
| Path | String | | <p>Receiving endpoint. For example, when Format is `otlp_logv1`, the default endpoint is `/v1/logs`.</p> |
| ReadTimeoutSec | String | | <p>Read timeout.</p><p>Default: `10s`.</p> |
| ShutdownTimeoutSec | String | | <p>Shutdown timeout.</p><p>Default: `5s`.</p> |
| MaxBodySize | String | | <p>Maximum request body size.</p><p>Default: `64k`.</p> |
| UnlinkUnixSock | String | | <p>When the listen address is a unix socket, whether to forcibly release it before startup.</p><p>Default: `true`.</p> |
| FieldsExtend | Boolean | | <p>Whether to support data types other than integer (such as String).</p><p>Currently only effective for the influxdb Format, which carries extra types such as String and Bool.</p> |
| QueryParams | []String | | Request parameters to parse into Group.Metadata.<p>Parsed results are stored in Metadata as key-value pairs. Default: `[]`, meaning nothing is parsed.</p><p>v2 only.</p> |
| QueryParamPrefix | String | | Key prefix added when parsing request parameters, for example `_query_param_`.<p>The prefix is concatenated directly in front of each QueryParam with no separator. Default: empty, meaning no prefix is added.</p><p>v2 only.</p> |
| HeaderParams | []String | | Header parameters to parse into Group.Metadata.<p>Parsed results are stored in Metadata as key-value pairs. Default: `[]`, meaning nothing is parsed.</p><p>v2 only.</p> |
| HeaderParamPrefix | String | | Key prefix added when parsing header parameters, for example `_header_param_`.<p>The prefix is concatenated directly in front of each HeaderParam with no separator. Default: empty, meaning no prefix is added.</p><p>v2 only.</p> |
| DisableUncompress | Boolean | | Disables decompression of request data. Default: `false`.<p>Currently only effective for the Raw Format.</p><p>v2 only.</p> |
| Tags | map[String]String | | Tags attached to output data by default.<p>v1 only.</p> |
| DumpData | Boolean | | [For development] Stores received requests in local files. Default: `false`. |
| DumpDataKeepFiles | Int | | [For development] Number of dump files to keep. Files roll over hourly; the default is 5, meaning 5 hours of dumps are kept. |
| AllowUnsafeMode | Boolean | | Whether to allow unsafe-mode decoding. When enabled, the decoder may use Go `unsafe` techniques to speed up decoding. Currently only effective when Format=prometheus (note: Exemplar and Histogram are not yet supported). |

## Example

1 change: 1 addition & 0 deletions go.mod
@@ -270,6 +270,7 @@ require (
 	github.com/VictoriaMetrics/metricsql v0.45.0 // indirect
 	github.com/hashicorp/errwrap v1.1.0 // indirect
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
+	github.com/richardartoul/molecule v1.0.0 // indirect
 	github.com/valyala/fastjson v1.6.3 // indirect
 	github.com/valyala/fastrand v1.1.0 // indirect
 	github.com/valyala/fasttemplate v1.2.2 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -1394,6 +1394,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn
 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
+github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U=
+github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
1 change: 1 addition & 0 deletions licenses/LICENSE_OF_ILOGTAIL_DEPENDENCIES.md
@@ -186,6 +186,7 @@ When distributed in a binary form, iLogtail may contain portions of the followin
 - [github.com/mailru/easyjson](https://pkg.go.dev/github.com/mailru/easyjson?tab=licenses)
 - [github.com/mitchellh/mapstructure](https://pkg.go.dev/github.com/mitchellh/mapstructure?tab=licenses)
 - [github.com/paulbellamy/ratecounter](https://pkg.go.dev/github.com/paulbellamy/ratecounter?tab=licenses)
+- [github.com/richardartoul/molecule](https://pkg.go.dev/github.com/richardartoul/molecule?tab=licenses)
 - [github.com/satori/go.uuid](https://pkg.go.dev/github.com/satori/go.uuid?tab=licenses)
 - [github.com/shopspring/decimal](https://pkg.go.dev/github.com/shopspring/decimal?tab=licenses)
 - [github.com/siddontang/go](https://pkg.go.dev/github.com/siddontang/go?tab=licenses)
1 change: 1 addition & 0 deletions pkg/go.mod
@@ -22,6 +22,7 @@ require (
 	github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9
 	github.com/pyroscope-io/jfr-parser v0.6.0
 	github.com/pyroscope-io/pyroscope v0.0.0-00010101000000-000000000000
+	github.com/richardartoul/molecule v1.0.0
 	github.com/smartystreets/goconvey v1.7.2
 	github.com/stretchr/testify v1.8.2
 	go.opentelemetry.io/collector/pdata v0.66.0
2 changes: 2 additions & 0 deletions pkg/go.sum
@@ -1149,6 +1149,8 @@ github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9/go.mod h1:
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
+github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U=
+github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
38 changes: 33 additions & 5 deletions pkg/protocol/decoder/common/comon.go
@@ -15,13 +15,15 @@
 package common

 import (
+	"bytes"
 	"compress/gzip"
 	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"net/http"
 	"strconv"
+	"sync"

 	"github.com/golang/snappy"
 	"github.com/pierrec/lz4"
@@ -40,6 +42,23 @@ const (
 	ProtocolPyroscope = "pyroscope"
 )

+var bufPool = sync.Pool{
+	New: func() interface{} {
+		buf := bytes.NewBuffer(make([]byte, 0, 32*1024))
+		return buf
+	},
+}
+
+func GetPooledBuf() *bytes.Buffer {
+	buf := bufPool.Get().(*bytes.Buffer)
+	return buf
+}
+
+func PutPooledBuf(buf *bytes.Buffer) {
+	buf.Reset()
+	bufPool.Put(buf)
+}
+
 func CollectBody(res http.ResponseWriter, req *http.Request, maxBodySize int64) ([]byte, int, error) {
 	body := req.Body

@@ -54,16 +73,25 @@ func CollectBody(res http.ResponseWriter, req *http.Request, maxBodySize int64)
 	}

 	body = http.MaxBytesReader(res, body, maxBodySize)
-	bytes, err := ioutil.ReadAll(body)
-	if err != nil {
-		return nil, http.StatusRequestEntityTooLarge, err
-	}

 	if req.Header.Get("Content-Encoding") == "snappy" {
-		bytes, err = snappy.Decode(nil, bytes)
+		// for snappy encoding, use pooled buf to read compressed request body
+		buf := GetPooledBuf()
+		defer PutPooledBuf(buf)
+		_, err := io.Copy(buf, body) // nolint
 		if err != nil {
 			return nil, http.StatusBadRequest, err
 		}
+		data, err := snappy.Decode(nil, buf.Bytes())
+		if err != nil {
+			return nil, http.StatusBadRequest, err
+		}
+		return data, http.StatusOK, nil
 	}
+
+	bytes, err := ioutil.ReadAll(body)
+	if err != nil {
+		return nil, http.StatusRequestEntityTooLarge, err
+	}

 	if req.Header.Get("x-log-compresstype") == "lz4" {
3 changes: 2 additions & 1 deletion pkg/protocol/decoder/decoder.go
@@ -33,6 +33,7 @@ import (
 type Option struct {
 	FieldsExtend      bool
 	DisableUncompress bool
+	AllowUnsafeMode   bool
 }

 // GetDecoder return a new decoder for specific format
@@ -45,7 +46,7 @@ func GetDecoderWithOptions(format string, option Option) (extensions.Decoder, er
 	case common.ProtocolSLS:
 		return &sls.Decoder{}, nil
 	case common.ProtocolPrometheus:
-		return &prometheus.Decoder{}, nil
+		return &prometheus.Decoder{AllowUnsafeMode: option.AllowUnsafeMode}, nil
 	case common.ProtocolInflux, common.ProtocolInfluxdb:
 		return &influxdb.Decoder{FieldsExtend: option.FieldsExtend}, nil
 	case common.ProtocolStatsd:
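
The `AllowUnsafeMode` option is described as letting the decoder use Go `unsafe` techniques to speed up decoding. The prometheus decoder itself is not shown on this page, so the following is only a sketch of one common technique of that kind, a zero-copy `[]byte` to `string` conversion, and not the commit's implementation. `bytesToStringUnsafe` is a hypothetical name.

```go
package main

import (
	"fmt"
	"unsafe"
)

// bytesToStringUnsafe (hypothetical helper) reinterprets b as a string
// without copying. The result is only valid while b is neither modified
// nor reused, which is why such a mode is typically opt-in.
func bytesToStringUnsafe(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	raw := []byte("cpu_usage")
	fast := bytesToStringUnsafe(raw) // shares memory with raw
	safe := string(raw)              // independent copy

	raw[0] = 'g' // simulate the request buffer being reused
	fmt.Println(fast, safe)
}
```

The zero-copy string observes the later write to `raw`, while the copied string does not. A decoder built this way has to guarantee that the request body outlives every label or metric name derived from it.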