This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Add batching for traces
Batching helps achieve better ingest performance, especially if
traces are sent one by one (which is the case for the Jaeger collector).

A batch is flushed either on timeout or when it is full.

Adds async acknowledgement support for traces, meaning that the client
doesn't need to wait for the DB write. This increases ingest performance
with a small risk of data loss. A new CLI flag, `tracing.async-acks`, is added.

Flags to control batch size and timing: `tracing.max-batch-size` and `tracing.batch-timeout`.
Flag to control batch workers: `tracing.batch-workers`.
niksajakovljevic committed Aug 29, 2022
1 parent 5a3a365 commit 85556ae
Showing 28 changed files with 808 additions and 195 deletions.
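
The flush rule from the commit message (a batch is flushed on timeout or when full) can be illustrated with a minimal sketch. This is not the code from this commit: `batcher`, `newBatcher`, `flushSizes` and `demoTimeout` are hypothetical names, and the sketch assumes the timer simply restarts after every flush.

```go
package main

import (
	"fmt"
	"time"
)

// demoTimeout is long enough that only "batch full" and Close trigger flushes in main.
const demoTimeout = time.Hour

// batcher is a hypothetical helper, not Promscale's type: it collects items
// and hands them to a flush callback when the batch is full or a timeout fires.
type batcher struct {
	in   chan string
	done chan struct{}
}

func newBatcher(maxSize int, timeout time.Duration, flush func([]string)) *batcher {
	b := &batcher{in: make(chan string), done: make(chan struct{})}
	go b.run(maxSize, timeout, flush)
	return b
}

func (b *batcher) run(maxSize int, timeout time.Duration, flush func([]string)) {
	defer close(b.done)
	batch := make([]string, 0, maxSize)
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// emit flushes a non-empty batch and starts a fresh one.
	emit := func() {
		if len(batch) > 0 {
			flush(batch)
			batch = make([]string, 0, maxSize)
		}
	}
	for {
		select {
		case item, ok := <-b.in:
			if !ok {
				emit() // input closed: flush whatever is left
				return
			}
			batch = append(batch, item)
			if len(batch) >= maxSize {
				emit() // flush because the batch is full
				if !timer.Stop() {
					select { // drain a timer that has already fired
					case <-timer.C:
					default:
					}
				}
				timer.Reset(timeout)
			}
		case <-timer.C:
			emit() // flush because the timeout expired
			timer.Reset(timeout)
		}
	}
}

// Add queues one item for batching.
func (b *batcher) Add(item string) { b.in <- item }

// Close flushes the remainder and waits for the worker goroutine to exit.
func (b *batcher) Close() { close(b.in); <-b.done }

// flushSizes feeds n items through a batcher and returns the size of each flush.
func flushSizes(n, maxSize int, timeout time.Duration) []int {
	var sizes []int
	b := newBatcher(maxSize, timeout, func(batch []string) { sizes = append(sizes, len(batch)) })
	for i := 0; i < n; i++ {
		b.Add(fmt.Sprintf("span-%d", i))
	}
	b.Close() // sizes is only read after the worker has exited
	return sizes
}

func main() {
	fmt.Println(flushSizes(7, 3, demoTimeout)) // prints [3 3 1]
}
```

With seven items and a maximum batch size of three, two flushes happen because the batch is full and the last one happens on `Close`. A shorter timeout would also flush partial batches while the producer is idle.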
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ We use the following categories for changes:

### Added
- Implement Jaeger gRPC remote storage writer interface [#1543]
- Batching for traces to improve ingest performance along with CLI flags for better control [#1554]
- Helm chart now ships a JSON Schema for imposing a structure of the values.yaml file [#1551]

### Changed
23 changes: 16 additions & 7 deletions docs/configuration.md
@@ -42,13 +42,22 @@ The following subsections cover all CLI flags which promscale supports. You can

### General flags

| Flag | Type | Default | Description |
|---------------------------------|:------------------------------:|:-------------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| cache.memory-target | unsigned-integer or percentage | 80% | Target for max amount of memory to use. Specified in bytes or as a percentage of system memory (e.g. 80%). |
| config | string | config.yml | YAML configuration file path for Promscale. |
| enable-feature | string | "" | Enable one or more experimental promscale features (as a comma-separated list). Current experimental features are `promql-at-modifier`, `promql-negative-offset` and `promql-per-step-stats`. For more information, please consult the following resources: [promql-at-modifier](https://prometheus.io/docs/prometheus/latest/feature_flags/#modifier-in-promql), [promql-negative-offset](https://prometheus.io/docs/prometheus/latest/feature_flags/#negative-offset-in-promql), [promql-per-step-stats](https://prometheus.io/docs/prometheus/latest/feature_flags/#per-step-stats). |
| thanos.store-api.server-address | string | "" (disabled) | Address to listen on for Thanos Store API endpoints. |
| tracing.otlp.server-address | string | ":9202" | Address to listen on for OpenTelemetry OTLP GRPC server. |
| Flag | Type | Default | Description |
|---------------------------------------------------------------------------------------------------|:------------------------------:|:-------------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| cache.memory-target | unsigned-integer or percentage | 80% | Target for max amount of memory to use. Specified in bytes or as a percentage of system memory (e.g. 80%). |
| config | string | config.yml | YAML configuration file path for Promscale. |
| enable-feature | string | "" | Enable one or more experimental promscale features (as a comma-separated list). Current experimental features are `promql-at-modifier`, `promql-negative-offset` and `promql-per-step-stats`. For more information, please consult the following resources: [promql-at-modifier](https://prometheus.io/docs/prometheus/latest/feature_flags/#modifier-in-promql), [promql-negative-offset](https://prometheus.io/docs/prometheus/latest/feature_flags/#negative-offset-in-promql), [promql-per-step-stats](https://prometheus.io/docs/prometheus/latest/feature_flags/#per-step-stats). |
| thanos.store-api.server-address | string | "" (disabled) | Address to listen on for Thanos Store API endpoints. |
| tracing.otlp.server-address | string | ":9202" | Address to listen on for OpenTelemetry OTLP GRPC server. |
| tracing.async-acks | boolean | false | Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss. |
| tracing.max-batch-size | integer | 5000 | Maximum size of trace batch that is written to DB. |
| tracing.batch-timeout | duration | 250ms | Timeout after new trace batch is created. |
| tracing.batch-workers | integer | num of available cpus | Number of workers responsible for creating trace batches. Defaults to number of CPUs. |
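
The trade-off behind `tracing.async-acks` can be sketched in a few lines of Go. This shows only the general idea (acknowledge before the write finishes), not Promscale's implementation. `ingest`, `errWrite` and the `onErr` callback are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// errWrite simulates a failed database write (hypothetical, for the demo only).
var errWrite = errors.New("write failed")

// ingest hands data to write. With asyncAck=false the caller waits for the
// write and sees its error. With asyncAck=true the caller is acknowledged
// immediately (nil), and a later write failure can only be reported via onErr.
func ingest(data string, asyncAck bool, write func(string) error, onErr func(error)) error {
	if !asyncAck {
		return write(data)
	}
	go func() {
		if err := write(data); err != nil {
			onErr(err) // the client was already acknowledged; this data is lost
		}
	}()
	return nil
}

func main() {
	failing := func(string) error { return errWrite }

	// Synchronous ack: the failure reaches the caller, which can retry.
	fmt.Println("sync ack:", ingest("span", false, failing, nil))

	// Asynchronous ack: the caller sees success; the failure surfaces later.
	logged := make(chan error, 1)
	fmt.Println("async ack:", ingest("span", true, failing, func(err error) { logged <- err }))
	fmt.Println("reported later:", <-logged)
}
```

The asynchronous path returns faster because it never blocks on the database, which is where both the throughput gain and the small chance of data loss come from.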

### Auth flags

11 changes: 11 additions & 0 deletions pkg/jaeger/store/store.go
@@ -44,6 +44,10 @@ func (p *Store) SpanWriter() spanstore.Writer {
	return p
}

func (p *Store) StreamingSpanWriter() spanstore.Writer {
	return p
}

func (p *Store) WriteSpan(ctx context.Context, span *model.Span) error {
	batches := []*model.Batch{
		{
@@ -57,6 +61,13 @@ func (p *Store) WriteSpan(ctx context.Context, span *model.Span) error {
	return p.inserter.IngestTraces(ctx, traces)
}

// Close performs graceful shutdown of SpanWriter on Jaeger collector shutdown.
// In our case there is nothing to do. This no-op implementation avoids a gRPC
// error message when the Jaeger collector shuts down.
func (p *Store) Close() error {
	return nil
}

func (p *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
	code := "5xx"
	start := time.Now()
6 changes: 5 additions & 1 deletion pkg/pgclient/client.go
@@ -199,8 +199,12 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
	c := ingestor.Cfg{
		NumCopiers:              numCopiers,
		IgnoreCompressedChunks:  cfg.IgnoreCompressedChunks,
		AsyncAcks:               cfg.AsyncAcks,
		MetricsAsyncAcks:        cfg.MetricsAsyncAcks,
		TracesAsyncAcks:         cfg.TracesAsyncAcks,
		InvertedLabelsCacheSize: cfg.CacheConfig.InvertedLabelsCacheSize,
		TracesBatchTimeout:      cfg.TracesBatchTimeout,
		TracesMaxBatchSize:      cfg.TracesMaxBatchSize,
		TracesBatchWorkers:      cfg.TracesBatchWorkers,
	}

	var writerConn pgxconn.PgxConn
15 changes: 12 additions & 3 deletions pkg/pgclient/config.go
@@ -16,6 +16,7 @@ import (
	"github.com/jackc/pgx/v4"
	"github.com/timescale/promscale/pkg/limits"
	"github.com/timescale/promscale/pkg/pgmodel/cache"
	"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
	"github.com/timescale/promscale/pkg/version"
)

@@ -31,7 +32,8 @@ type Config struct {
	SslMode                 string
	DbConnectionTimeout     time.Duration
	IgnoreCompressedChunks  bool
	AsyncAcks               bool
	MetricsAsyncAcks        bool
	TracesAsyncAcks         bool
	WriteConnections        int
	WriterPoolSize          int
	WriterSynchronousCommit bool
@@ -40,6 +42,9 @@ type Config struct {
	UsesHA                bool
	DbUri                 string
	EnableStatementsCache bool
	TracesBatchTimeout    time.Duration
	TracesMaxBatchSize    int
	TracesBatchWorkers    int
}

const (
@@ -79,7 +84,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
	fs.BoolVar(&cfg.IgnoreCompressedChunks, "metrics.ignore-samples-written-to-compressed-chunks", false, "Ignore/drop samples that are being written to compressed chunks. "+
		"Setting this to false allows Promscale to ingest older data by decompressing chunks that were earlier compressed. "+
		"However, setting this to true will save your resources that may be required during decompression. ")
	fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics to database. "+
	fs.IntVar(&cfg.WriteConnections, "db.connections.num-writers", 0, "Number of database connections for writing metrics/traces to database. "+
		"By default, this will be set based on the number of CPUs available to the DB Promscale is connected to.")
	fs.IntVar(&cfg.WriterPoolSize, "db.connections.writer-pool.size", defaultPoolSize, "Maximum size of the writer pool of database connections. This defaults to 50% of max_connections "+
		"allowed by the database.")
@@ -92,7 +97,11 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
		"Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
	fs.BoolVar(&cfg.EnableStatementsCache, "db.statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. "+
		"Disable if using PgBouncer")
	fs.BoolVar(&cfg.AsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
	fs.BoolVar(&cfg.MetricsAsyncAcks, "metrics.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of metric data in the database. This increases throughput at the cost of a small chance of data loss.")
	fs.BoolVar(&cfg.TracesAsyncAcks, "tracing.async-acks", false, "Acknowledge asynchronous inserts. If this is true, the inserter will not wait after insertion of traces data in the database. This increases throughput at the cost of a small chance of data loss.")
	fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB")
	fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created")
	fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.")
	return cfg
}
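
For readers who want to try the new tracing flags in isolation, here is a minimal standalone sketch using only the standard `flag` package. `tracingConfig` and `parseTracingFlags` are hypothetical names, not part of Promscale. The defaults (5000, 250ms, number of CPUs) are taken from the documentation table in this commit, and using `runtime.NumCPU()` for the worker default is an assumption about how `trace.DefaultBatchWorkers` is derived.

```go
package main

import (
	"flag"
	"fmt"
	"runtime"
	"time"
)

// tracingConfig is a hypothetical stand-in for the tracing-related fields of
// pgclient.Config; it is not a Promscale type.
type tracingConfig struct {
	AsyncAcks    bool
	MaxBatchSize int
	BatchTimeout time.Duration
	BatchWorkers int
}

// parseTracingFlags registers the four tracing flags on a private FlagSet and
// parses args. Defaults follow the documented values, not the real constants.
func parseTracingFlags(args []string) (tracingConfig, error) {
	var cfg tracingConfig
	fs := flag.NewFlagSet("tracing-demo", flag.ContinueOnError)
	fs.BoolVar(&cfg.AsyncAcks, "tracing.async-acks", false, "Acknowledge asynchronous inserts.")
	fs.IntVar(&cfg.MaxBatchSize, "tracing.max-batch-size", 5000, "Maximum size of trace batch that is written to DB.")
	fs.DurationVar(&cfg.BatchTimeout, "tracing.batch-timeout", 250*time.Millisecond, "Timeout after new trace batch is created.")
	fs.IntVar(&cfg.BatchWorkers, "tracing.batch-workers", runtime.NumCPU(), "Number of workers responsible for creating trace batches.")
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parseTracingFlags([]string{
		"-tracing.async-acks",
		"-tracing.max-batch-size=1000",
		"-tracing.batch-timeout=100ms",
	})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Flags that are not passed keep their defaults, so in this run `BatchWorkers` stays at the CPU count of the machine.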
