Skip to content

Commit

Permalink
feat: option to log flux queries cancelled because of server shutdown (
Browse files Browse the repository at this point in the history
…#23032)

Co-authored-by: DStrand1 <dstrandboge@influxdata.com>
  • Loading branch information
williamhbaker and DStrand1 authored Jan 11, 2022
1 parent a812d8b commit b02c89e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
1 change: 1 addition & 0 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
MaxMemoryBytes: opts.MaxMemoryBytes,
QueueSize: opts.QueueSize,
ExecutorDependencies: dependencyList,
FluxLogEnabled: opts.FluxLogEnabled,
}, m.log.With(zap.String("service", "storage-reads")))
if err != nil {
m.log.Error("Failed to create query controller", zap.Error(err))
Expand Down
51 changes: 35 additions & 16 deletions query/control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type Controller struct {
log *zap.Logger

dependencies []flux.Dependency

fluxLogEnabled bool
}

type Config struct {
Expand Down Expand Up @@ -115,6 +117,9 @@ type Config struct {
MetricLabelKeys []string

ExecutorDependencies []flux.Dependency

// FluxLogEnabled logs any in-progress queries that get cancelled due to the server being shut down.
FluxLogEnabled bool
}

// complete will fill in the defaults, validate the configuration, and
Expand Down Expand Up @@ -205,16 +210,17 @@ func New(config Config, logger *zap.Logger) (*Controller, error) {
queryQueue = nil
}
ctrl := &Controller{
config: c,
queries: make(map[QueryID]*Query),
queryQueue: queryQueue,
done: make(chan struct{}),
abort: make(chan struct{}),
memory: mm,
log: logger,
metrics: newControllerMetrics(metricLabelKeys),
labelKeys: metricLabelKeys,
dependencies: c.ExecutorDependencies,
config: c,
queries: make(map[QueryID]*Query),
queryQueue: queryQueue,
done: make(chan struct{}),
abort: make(chan struct{}),
memory: mm,
log: logger,
metrics: newControllerMetrics(metricLabelKeys),
labelKeys: metricLabelKeys,
dependencies: c.ExecutorDependencies,
fluxLogEnabled: config.FluxLogEnabled,
}
if c.ConcurrencyQuota != 0 {
quota := int(c.ConcurrencyQuota)
Expand Down Expand Up @@ -257,7 +263,7 @@ func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query,
// query submits a query for execution returning immediately.
// Done must be called on any returned Query objects.
func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
q, err := c.createQuery(ctx, compiler.CompilerType())
q, err := c.createQuery(ctx, compiler)
if err != nil {
return nil, handleFluxError(err)
}
Expand All @@ -277,7 +283,7 @@ func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Qu
return q, nil
}

func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Query, error) {
func (c *Controller) createQuery(ctx context.Context, compiler flux.Compiler) (*Query, error) {
c.queriesMu.RLock()
if c.shutdown {
c.queriesMu.RUnlock()
Expand All @@ -300,7 +306,7 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu
labelValues[i] = str
compileLabelValues[i] = str
}
compileLabelValues[len(compileLabelValues)-1] = string(ct)
compileLabelValues[len(compileLabelValues)-1] = string(compiler.CompilerType())

cctx, cancel := context.WithCancel(ctx)
parentSpan, parentCtx := tracing.StartSpanFromContextWithPromMetrics(
Expand All @@ -320,6 +326,7 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu
parentSpan: parentSpan,
cancel: cancel,
doneCh: make(chan struct{}),
compiler: compiler,
}

// Lock the queries mutex for the rest of this method.
Expand Down Expand Up @@ -546,6 +553,17 @@ func (c *Controller) Shutdown(ctx context.Context) error {
// Cancel all of the currently active queries.
c.queriesMu.RLock()
for _, q := range c.queries {
if c.fluxLogEnabled {
var fluxScript string
fc, ok := q.compiler.(lang.FluxCompiler)
if !ok {
fluxScript = "unknown"
} else {
fluxScript = fc.Query
}
c.log.Info("Cancelling Flux query because of server shutdown", zap.String("query", fluxScript))
}

q.Cancel()
}
c.queriesMu.RUnlock()
Expand Down Expand Up @@ -608,9 +626,10 @@ type Query struct {
done sync.Once
doneCh chan struct{}

program flux.Program
exec flux.Query
results chan flux.Result
program flux.Program
exec flux.Query
results chan flux.Result
compiler flux.Compiler

memoryManager *queryMemoryManager
alloc *memory.Allocator
Expand Down

0 comments on commit b02c89e

Please sign in to comment.