Add batching for traces
Batching helps achieve better ingest performance, especially when
traces are sent one by one (which is the case for the Jaeger collector).

Also adds async support for traces, meaning the client does not
need to wait for the DB write. This increases ingest performance with a
small risk of data loss. Adds a new CLI flag, `tracing.async-acks`.
niksajakovljevic committed Aug 10, 2022
1 parent 9c1653a commit a7546c1
Showing 19 changed files with 569 additions and 176 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -17,7 +17,7 @@ We use the following categories for changes:

### Added
- Implement Jaeger gRPC remote storage writer interface [#1543]

- Batching for traces with async support (new CLI flag) [#1554]
- Helm chart now ships a JSON Schema for imposing a structure of the values.yaml file [#1551]

### Changed
9 changes: 8 additions & 1 deletion docs/configuration.md
@@ -48,7 +48,14 @@ The following subsections cover all CLI flags which promscale supports. You can
| config | string | config.yml | YAML configuration file path for Promscale. |
| enable-feature | string | "" | Enable one or more experimental promscale features (as a comma-separated list). Current experimental features are `promql-at-modifier`, `promql-negative-offset` and `promql-per-step-stats`. For more information, please consult the following resources: [promql-at-modifier](https://prometheus.io/docs/prometheus/latest/feature_flags/#modifier-in-promql), [promql-negative-offset](https://prometheus.io/docs/prometheus/latest/feature_flags/#negative-offset-in-promql), [promql-per-step-stats](https://prometheus.io/docs/prometheus/latest/feature_flags/#per-step-stats). |
| thanos.store-api.server-address | string | "" (disabled) | Address to listen on for Thanos Store API endpoints. |
| tracing.otlp.server-address | string | ":9202" | Address to listen on for OpenTelemetry OTLP GRPC server. |
| tracing.async-acks | boolean | false | Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss. |
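Conceptually, `tracing.async-acks` only changes whether the ingest path blocks on a per-request acknowledgement. A minimal Go sketch of that idea follows (illustrative only: the request shape mirrors the one added in trace_batcher.go below, but `ingest` and its wiring are assumptions, not Promscale's actual code):

package sketch

import "go.opentelemetry.io/collector/pdata/ptrace"

// insertTracesReq mirrors the request used by the trace batcher in this commit:
// a trace payload plus a per-request error channel for the write result.
type insertTracesReq struct {
	payload ptrace.Traces
	err     chan error
}

// ingest hands the payload to the batcher via its input channel and then either
// waits for the database write or acknowledges immediately.
func ingest(in chan<- insertTracesReq, payload ptrace.Traces, asyncAcks bool) error {
	req := insertTracesReq{payload: payload, err: make(chan error, 1)}
	in <- req
	if asyncAcks {
		// tracing.async-acks=true: acknowledge right away; the write happens later,
		// so a crash before the batch is flushed can lose this payload.
		return nil
	}
	// Default: block until the batch writer reports the result of the insert.
	return <-req.err
}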

### Auth flags

3 changes: 2 additions & 1 deletion pkg/pgclient/client.go
@@ -199,7 +199,8 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
c := ingestor.Cfg{
NumCopiers: numCopiers,
IgnoreCompressedChunks: cfg.IgnoreCompressedChunks,
AsyncAcks: cfg.AsyncAcks,
AsyncAcksMetrics: cfg.AsyncAcksMetrics,
AsyncAcksTraces: cfg.AsyncAcksTraces,
InvertedLabelsCacheSize: cfg.CacheConfig.InvertedLabelsCacheSize,
}

8 changes: 5 additions & 3 deletions pkg/pgclient/config.go
@@ -31,7 +31,8 @@ type Config struct {
SslMode string
DbConnectionTimeout time.Duration
IgnoreCompressedChunks bool
AsyncAcks bool
AsyncAcksMetrics bool
AsyncAcksTraces bool
WriteConnections int
WriterPoolSize int
WriterSynchronousCommit bool
@@ -79,7 +80,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+
"Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+
"However, setting this to true will save your resources that may be required during decompression. ")
fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics to database. "+
fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics/traces to database. "+
"By default, this will be set based on the number of CPUs available to the DB Promscale is connected to.")
fs.IntVar(&cfg.WriterPoolSize, "db.connections.writer-pool.size", defaultPoolSize, "Maximum size of the writer pool of database connections. This defaults to 50% of max_connections "+
"allowed by the database.")
@@ -92,7 +93,8 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
"Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
fs.BoolVar(&cfg.EnableStatementsCache, "db.statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. "+
"Disable if using PgBouncer")
fs.BoolVar(&cfg.AsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.BoolVar(&cfg.AsyncAcksMetrics, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
fs.BoolVar(&cfg.AsyncAcksTraces, "tracing.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss.")
return cfg
}

2 changes: 1 addition & 1 deletion pkg/pgmodel/ingestor/dispatcher.go
@@ -93,7 +93,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
scache: scache,
exemplarKeyPosCache: eCache,
completeMetricCreation: make(chan struct{}, 1),
asyncAcks: cfg.AsyncAcks,
asyncAcks: cfg.AsyncAcksMetrics,
copierReadRequestCh: copierReadRequestCh,
// set to run at half our deletion interval
seriesEpochRefresh: time.NewTicker(30 * time.Minute),
8 changes: 5 additions & 3 deletions pkg/pgmodel/ingestor/ingestor.go
@@ -24,7 +24,8 @@ import (
)

type Cfg struct {
AsyncAcks bool
AsyncAcksMetrics bool
AsyncAcksTraces bool
NumCopiers int
DisableEpochSync bool
IgnoreCompressedChunks bool
@@ -49,7 +50,7 @@ func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.
return &DBIngestor{
sCache: sCache,
dispatcher: dispatcher,
tWriter: trace.NewWriter(conn),
tWriter: trace.NewDispatcher(trace.NewWriter(conn), cfg.NumCopiers, cfg.AsyncAcksTraces),
closed: atomic.NewBool(false),
}, nil
}
@@ -58,7 +59,7 @@ func NewPgxIngestor(conn pgxconn.PgxConn, cache cache.MetricCache, sCache cache.
// with an empty config, a new default size metrics cache and a non-ha-aware data parser
func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error) {
if cfg == nil {
cfg = &Cfg{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize}
cfg = &Cfg{InvertedLabelsCacheSize: cache.DefaultConfig.InvertedLabelsCacheSize, NumCopiers: 2}
}
cacheConfig := cache.DefaultConfig
c := cache.NewMetricCache(cacheConfig)
@@ -282,6 +283,7 @@ func (ingestor *DBIngestor) Close() {
if ingestor.closed.Load() {
return
}
ingestor.tWriter.Close()
ingestor.closed.Store(true)
ingestor.dispatcher.Close()
}
214 changes: 214 additions & 0 deletions pkg/pgmodel/ingestor/trace/trace_batcher.go
@@ -0,0 +1,214 @@
package trace

import (
"context"
"runtime"
"sync"
"time"

"github.com/timescale/promscale/pkg/log"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
defaultReqBufferSize = 10000 // tweaking this does not result in a significant perf impact
defaultBatchSize = 5000 // a lower bound: we might produce bigger batches sometimes
defaultBatchTimeout = 200 * time.Millisecond // arbitrarily picked; we don't want the client to wait too long
defaultMaxBufferedBatches = 200 // arbitrarily picked; we want to avoid waiting on batches
)

var defaultBatchWorkers = runtime.NumCPU() / 2 // we only take half so the other half can be used for writers

// Batch accumulates individual InsertTracesReq payloads so they can be written together.
type Batch struct {
traces ptrace.Traces
spanCount int
reqStatus []chan error
}

func NewBatch() *Batch {
return &Batch{
traces: ptrace.NewTraces(),
spanCount: 0,
reqStatus: make([]chan error, 0, defaultBatchSize),
}
}

func (tb *Batch) add(in InsertTracesReq) {
inSpans := in.payload.SpanCount()
if inSpans == 0 {
return
}
in.payload.ResourceSpans().MoveAndAppendTo(tb.traces.ResourceSpans())
tb.spanCount += inSpans
tb.reqStatus = append(tb.reqStatus, in.err)
}

func (tb *Batch) isFull() bool {
return tb.spanCount >= defaultBatchSize
}

func (tb *Batch) isEmpty() bool {
return tb.spanCount == 0
}
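
Batch.add relies on pdata's MoveAndAppendTo to fold each incoming payload into the batch's single ptrace.Traces value. A small standalone snippet (not part of this commit) showing that behavior:

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/ptrace"
)

// makeTraces builds a Traces payload with n spans (helper for this example only).
func makeTraces(n int) ptrace.Traces {
	td := ptrace.NewTraces()
	spans := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
	for i := 0; i < n; i++ {
		spans.AppendEmpty().SetName(fmt.Sprintf("span-%d", i))
	}
	return td
}

func main() {
	batch := ptrace.NewTraces()
	for _, in := range []ptrace.Traces{makeTraces(2), makeTraces(3)} {
		// MoveAndAppendTo drains the source and appends its ResourceSpans to the batch.
		in.ResourceSpans().MoveAndAppendTo(batch.ResourceSpans())
	}
	fmt.Println(batch.SpanCount()) // prints 5
}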

// Batcher batches trace requests and sends the batches to the batch writer.
// This is done to achieve better ingest performance, especially because the
// Jaeger collector sends traces one by one.
type Batcher struct {
in chan InsertTracesReq
stop chan struct{}
batchers int
batchWriter *batchWriter
once sync.Once
bufferedBatches chan Batch
wg sync.WaitGroup
timeout time.Duration
}

type WriteTraces func(context.Context, ptrace.Traces) error

func NewBatcher(batchers, writers int, writer Writer) *Batcher {
return newBatcherWithTimeout(batchers, writers, writer, defaultBatchTimeout)
}

func newBatcherWithTimeout(batchers, writers int, writer Writer, timeout time.Duration) *Batcher {
if batchers == 0 || writers == 0 {
panic("number of batchers and writeres must be greater then zero")
}
bufferedBatches := make(chan Batch, defaultMaxBufferedBatches)
return &Batcher{
batchWriter: newBatchWriter(writers, writer, bufferedBatches),
in: make(chan InsertTracesReq, defaultReqBufferSize),
stop: make(chan struct{}, 1),
batchers: batchers,
bufferedBatches: bufferedBatches,
timeout: timeout,
}
}

func (b *Batcher) Run() {
for i := 0; i < b.batchers; i++ {
b.wg.Add(1)
go func() {
defer b.wg.Done()
b.batch()
}()
}
b.batchWriter.run()
}

func (b *Batcher) batch() {
ticker := time.NewTicker(b.timeout)
batch := NewBatch()
flushBatch := func(batch *Batch) *Batch {
batchCp := *batch
b.bufferedBatches <- batchCp
return NewBatch()
}
for {
select {
case <-b.stop:
ticker.Stop()
if !batch.isEmpty() {
flushBatch(batch)
}
return
case item := <-b.in:
if batch.isFull() {
batch = flushBatch(batch)
ticker.Reset(b.timeout)
}
batch.add(item)
case <-ticker.C:
if !batch.isEmpty() {
batch = flushBatch(batch)
}
}
}
}

func (b *Batcher) Stop() {
b.once.Do(func() {
b.stop <- struct{}{}
close(b.stop)
b.wg.Wait()
close(b.bufferedBatches)
b.batchWriter.stop()
})
}

// batchWriter writes batches using writer
type batchWriter struct {
batches chan Batch
writers int
writer Writer
stopCh chan struct{}
once sync.Once
wg sync.WaitGroup
}

func newBatchWriter(writers int, writer Writer, batches chan Batch) *batchWriter {
return &batchWriter{
writer: writer,
writers: writers,
batches: batches,
stopCh: make(chan struct{}, 1),
}
}

func (bw *batchWriter) run() {
for i := 0; i < bw.writers; i++ {
bw.wg.Add(1)
go func() {
defer bw.wg.Done()
for {
select {
case b, ok := <-bw.batches:
if !ok {
return
}
bw.flush(b)
case <-bw.stopCh:
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
bw.drainBuffer(timeoutCtx)
return
}
}
}()
}
}

func (bw *batchWriter) flush(b Batch) {
if b.spanCount != 0 {
err := bw.writer.InsertTraces(context.Background(), b.traces)
for _, req := range b.reqStatus {
req <- err
}
}

}

func (bw *batchWriter) drainBuffer(ctx context.Context) {
for {
select {
case b, ok := <-bw.batches:
if !ok {
return
}
bw.flush(b)
case <-ctx.Done():
log.Warn("msg", "Forced batchWriter shutdown due to timeout. Some batches migth not be written.")
return
}
}
}

func (bw *batchWriter) stop() {
bw.once.Do(func() {
bw.stopCh <- struct{}{}
close(bw.stopCh)
bw.wg.Wait()
})
}
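
For orientation, here is a caller-side sketch of the Batcher's synchronous path, written as if it lived in package trace (InsertTracesReq's fields and the input channel are unexported). The stub writer assumes the Writer interface requires InsertTraces and Close, as noopWriter in the test file below suggests; this is an illustration, not the production dispatcher.

package trace

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/pdata/ptrace"
)

// printWriter is a stand-in Writer used only for this sketch.
type printWriter struct{}

func (printWriter) InsertTraces(_ context.Context, traces ptrace.Traces) error {
	fmt.Println("writing batch with", traces.SpanCount(), "spans")
	return nil
}

func (printWriter) Close() {}

// exampleUsage submits a non-empty payload and waits on its error channel for
// the batch writer's result, which is how the synchronous path behaves.
func exampleUsage(payload ptrace.Traces) error {
	b := NewBatcher(2, 2, printWriter{}) // 2 batchers and 2 writers, arbitrary for the sketch
	b.Run()
	defer b.Stop()

	req := InsertTracesReq{payload: payload, err: make(chan error, 1)}
	b.in <- req
	// With tracing.async-acks enabled a caller would skip this wait; note that
	// add() drops empty payloads, so this only unblocks for payloads with spans.
	return <-req.err
}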
58 changes: 58 additions & 0 deletions pkg/pgmodel/ingestor/trace/trace_batcher_test.go
@@ -0,0 +1,58 @@
package trace

import (
"context"
"testing"
"time"

"github.com/timescale/promscale/pkg/tests/testdata"

"go.opentelemetry.io/collector/pdata/ptrace"
)

type noopWriter struct {
callBack func(t ptrace.Traces)
}

func (nw *noopWriter) InsertTraces(ctx context.Context, traces ptrace.Traces) error {
nw.callBack(traces)
return nil
}

func (nw *noopWriter) Close() {}

func TestTraceBatcherBatching(t *testing.T) {
traces := testdata.GenerateTestTraces(4)
batchChecker := func(traces ptrace.Traces) {
if traces.SpanCount() <= 250 { // each generated trace has 250 spans, so a batch must combine more than one trace
t.Errorf("wrong batch size. got: %v", traces.SpanCount())
}
}
batcher := newBatcherWithTimeout(1, 1, &noopWriter{callBack: batchChecker}, time.Hour) // a batch timeout long enough that it never fires during the test
batcher.Run()
for _, t := range traces {
batcher.in <- InsertTracesReq{payload: t, err: make(chan error, 1)}
}
batcher.Stop()
}

func TestTraceBatcherTimeout(t *testing.T) {
traces := testdata.GenerateTestTraces(4)
flushCounter := 0
batchChecker := func(traces ptrace.Traces) {
flushCounter++
if traces.SpanCount() != 250 { // each trace has 250 spans, so flushes must be triggered by the timeout, not by batch size
t.Errorf("wrong batch size. got: %v", traces.SpanCount())
}
}
batcher := NewBatcher(1, 1, &noopWriter{callBack: batchChecker})
batcher.Run()
for _, t := range traces {
batcher.in <- InsertTracesReq{payload: t, err: make(chan error, 1)}
time.Sleep(350 * time.Millisecond) // to make sure batch timeout is reached
}
batcher.Stop()
if flushCounter != 4 {
t.Errorf("wrong number of batch flushes. got: %v", flushCounter)
}
}