diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index f54f8b753de..877a6098389 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -404,6 +404,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { MaxMemoryBytes: opts.MaxMemoryBytes, QueueSize: opts.QueueSize, ExecutorDependencies: dependencyList, + FluxLogEnabled: opts.FluxLogEnabled, }, m.log.With(zap.String("service", "storage-reads"))) if err != nil { m.log.Error("Failed to create query controller", zap.Error(err)) diff --git a/query/control/controller.go b/query/control/controller.go index 682c3a206ac..93a12de1e63 100644 --- a/query/control/controller.go +++ b/query/control/controller.go @@ -67,6 +67,8 @@ type Controller struct { log *zap.Logger dependencies []flux.Dependency + + fluxLogEnabled bool } type Config struct { @@ -115,6 +117,9 @@ type Config struct { MetricLabelKeys []string ExecutorDependencies []flux.Dependency + + // FluxLogEnabled logs any in-progress queries that get cancelled due to the server being shut down. + FluxLogEnabled bool } // complete will fill in the defaults, validate the configuration, and @@ -205,16 +210,17 @@ func New(config Config, logger *zap.Logger) (*Controller, error) { queryQueue = nil } ctrl := &Controller{ - config: c, - queries: make(map[QueryID]*Query), - queryQueue: queryQueue, - done: make(chan struct{}), - abort: make(chan struct{}), - memory: mm, - log: logger, - metrics: newControllerMetrics(metricLabelKeys), - labelKeys: metricLabelKeys, - dependencies: c.ExecutorDependencies, + config: c, + queries: make(map[QueryID]*Query), + queryQueue: queryQueue, + done: make(chan struct{}), + abort: make(chan struct{}), + memory: mm, + log: logger, + metrics: newControllerMetrics(metricLabelKeys), + labelKeys: metricLabelKeys, + dependencies: c.ExecutorDependencies, + fluxLogEnabled: config.FluxLogEnabled, } if c.ConcurrencyQuota != 0 { quota := int(c.ConcurrencyQuota) @@ -257,7 +263,7 @@ func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query, // query submits a query for execution returning immediately. // Done must be called on any returned Query objects. func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) { - q, err := c.createQuery(ctx, compiler.CompilerType()) + q, err := c.createQuery(ctx, compiler) if err != nil { return nil, handleFluxError(err) } @@ -277,7 +283,7 @@ func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Qu return q, nil } -func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Query, error) { +func (c *Controller) createQuery(ctx context.Context, compiler flux.Compiler) (*Query, error) { c.queriesMu.RLock() if c.shutdown { c.queriesMu.RUnlock() @@ -300,7 +306,7 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu labelValues[i] = str compileLabelValues[i] = str } - compileLabelValues[len(compileLabelValues)-1] = string(ct) + compileLabelValues[len(compileLabelValues)-1] = string(compiler.CompilerType()) cctx, cancel := context.WithCancel(ctx) parentSpan, parentCtx := tracing.StartSpanFromContextWithPromMetrics( @@ -320,6 +326,7 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu parentSpan: parentSpan, cancel: cancel, doneCh: make(chan struct{}), + compiler: compiler, } // Lock the queries mutex for the rest of this method. @@ -546,6 +553,17 @@ func (c *Controller) Shutdown(ctx context.Context) error { // Cancel all of the currently active queries. c.queriesMu.RLock() for _, q := range c.queries { + if c.fluxLogEnabled { + var fluxScript string + fc, ok := q.compiler.(lang.FluxCompiler) + if !ok { + fluxScript = "unknown" + } else { + fluxScript = fc.Query + } + c.log.Info("Cancelling Flux query because of server shutdown", zap.String("query", fluxScript)) + } + q.Cancel() } c.queriesMu.RUnlock() @@ -608,9 +626,10 @@ type Query struct { done sync.Once doneCh chan struct{} - program flux.Program - exec flux.Query - results chan flux.Result + program flux.Program + exec flux.Query + results chan flux.Result + compiler flux.Compiler memoryManager *queryMemoryManager alloc *memory.Allocator