feature: Add ingester handler for shutdown and forget tokens (#6179)
* Update grafana/dskit to 07166f9

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add /ingester/shutdown handler

This handler replaces the deprecated /ingester/flush_shutdown handler
and can be used to gracefully shut down a Loki instance and delete the
file that persists the tokens of the ingester ring.

In production environments you usually want to persist ring tokens so
that during a restart of an ingester instance, or during rollout, the
tokens from that instance are not re-distributed to other instances, but
instead kept so that the same streams end up on the same instance once
it is up and running again. For that, the tokens are written to a file
that can be specified via the `-ingester.tokens-file-path` argument.

In certain cases, however, you want to forget the tokens and
re-distribute them when shutting down an ingester instance. This was
already possible by calling `/ingester/flush_shutdown`, deleting the
tokens file and terminating the process. The new handler
`/ingester/shutdown` combines these manual steps into a
single handler.
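
As a rough illustration of how a client might call the new handler, the sketch below builds the request URL and sends the `POST`. It is not part of this commit: `shutdownURL`, `requestShutdown`, and the stand-in test server are hypothetical names used only for this example, and the query parameters are the ones documented for `/ingester/shutdown` in this change.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strconv"
)

// shutdownURL builds the request URL for the /ingester/shutdown endpoint.
// baseURL is an assumed ingester address such as "http://localhost:3100".
func shutdownURL(baseURL string, flush, deleteRingTokens, terminate bool) string {
	q := url.Values{}
	q.Set("flush", strconv.FormatBool(flush))
	q.Set("delete_ring_tokens", strconv.FormatBool(deleteRingTokens))
	q.Set("terminate", strconv.FormatBool(terminate))
	// Encode sorts the parameters by key.
	return baseURL + "/ingester/shutdown?" + q.Encode()
}

// requestShutdown sends the POST request and returns the HTTP status code.
func requestShutdown(client *http.Client, target string) (int, error) {
	resp, err := client.Post(target, "", nil)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	// Stand-in server so the sketch runs without a Loki instance.
	// It only imitates the success response of the real handler.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.URL.Path != "/ingester/shutdown" {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	}))
	defer srv.Close()

	// Flush chunks, forget the ring tokens, and terminate the process.
	target := shutdownURL(srv.URL, true, true, true)
	status, err := requestShutdown(srv.Client(), target)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println("status:", status)
}
```

Against a real ingester, a `204 No Content` response indicates the shutdown was accepted; the handler returns `503` if the ingester is already stopping.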

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add changelog entry

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum committed Jun 14, 2022
1 parent 50533cb commit c12a1f4
Showing 19 changed files with 505 additions and 392 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

* [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields
* [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target
+* [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
* [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail
26 changes: 24 additions & 2 deletions docs/sources/api/_index.md
@@ -48,7 +48,8 @@ These endpoints are exposed by the distributor:
These endpoints are exposed by the ingester:

 - [`POST /flush`](#post-flush)
-- [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown)
+- **Deprecated** [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown)
+- [`POST /ingester/shutdown`](#post-ingestershutdown)

The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably.

@@ -807,13 +808,34 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester.

## `POST /ingester/flush_shutdown`

+**Deprecated**: Please use `/ingester/shutdown?flush=true` instead.
+
`/ingester/flush_shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in memory chunks it holds.
This is helpful for scaling down WAL-enabled ingesters where we want to ensure old WAL directories are not orphaned,
but instead flushed to our chunk backend.

In microservices mode, the `/ingester/flush_shutdown` endpoint is exposed by the ingester.

-### `GET /distributor/ring`
+## `POST /ingester/shutdown`
+
+`/ingester/shutdown` is similar to the [`/ingester/flush_shutdown`](#post-ingesterflush_shutdown)
+endpoint, but accepts three URL query parameters: `flush`, `delete_ring_tokens`, and `terminate`.
+
+**URL query parameters:**
+
+* `flush=<bool>`:
+Flag to control whether to flush any in-memory chunks the ingester holds. Defaults to `true`.
+* `delete_ring_tokens=<bool>`:
+Flag to control whether to delete the file that contains the ingester ring tokens of the instance if `-ingester.tokens-file-path` is specified. Defaults to `false`.
+* `terminate=<bool>`:
+Flag to control whether to terminate the Loki process after service shutdown. Defaults to `true`.
+
+This handler, in contrast to the `/ingester/flush_shutdown` handler, terminates the Loki process by default.
+This behaviour can be changed by setting the `terminate` query parameter to `false`.
+
+In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester.
+
+## `GET /distributor/ring`

Displays a web page with the distributor hash ring status, including the state, health, and last heartbeat time of each distributor.

2 changes: 1 addition & 1 deletion go.mod
@@ -49,7 +49,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
-github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca
+github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
4 changes: 2 additions & 2 deletions go.sum
@@ -1039,8 +1039,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM=
-github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca h1:0qHzm6VS0bCsSWKHuyfpt+pdpyScdZbzY/IFIyKSYOk=
-github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca/go.mod h1:q51XdMLLHNZJSG6KOGujC20ed2OoLFdx0hBmOEVfRs0=
+github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 h1:mZluMeUp1vLHKb1nSrMnA0mfupSpBeUkZqDDpfHabrQ=
+github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
65 changes: 62 additions & 3 deletions pkg/ingester/ingester.go
@@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/log/level"
+"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
@@ -172,8 +173,10 @@ type Interface interface {
logproto.QuerierServer
CheckReady(ctx context.Context) error
FlushHandler(w http.ResponseWriter, _ *http.Request)
-ShutdownHandler(w http.ResponseWriter, r *http.Request)
GetOrCreateInstance(instanceID string) (*instance, error)
+// deprecated
+LegacyShutdownHandler(w http.ResponseWriter, r *http.Request)
+ShutdownHandler(w http.ResponseWriter, r *http.Request)
}

// Ingester builds chunks for incoming log streams.
@@ -209,6 +212,10 @@ type Ingester struct {
// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch
+// Flag for whether stopping the ingester service should also terminate the
+// loki process.
+// This is set when calling the shutdown handler.
+terminateOnShutdown bool

// Only used by WAL & flusher to coordinate backpressure during replay.
replayController *replayController
@@ -245,6 +252,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
tailersQuit: make(chan struct{}),
metrics: metrics,
flushOnShutdownSwitch: &OnceSwitch{},
+terminateOnShutdown: false,
}
i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})

@@ -506,6 +514,12 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()

+// In case the flag to terminate on shutdown is set we need to mark the
+// ingester service as "failed", so Loki will shut down entirely.
+// The module manager logs the failure `modules.ErrStopProcess` in a special way.
+if i.terminateOnShutdown && errs.Err() == nil {
+return modules.ErrStopProcess
+}
return errs.Err()
}

@@ -526,10 +540,16 @@ func (i *Ingester) loop() {
}
}

-// ShutdownHandler triggers the following set of operations in order:
+// LegacyShutdownHandler triggers the following set of operations in order:
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
-func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
+// Note: This handler does not trigger a termination of the Loki process,
+// despite its name. Instead, the ingester service is stopped, so an external
+// source can trigger a safe termination through a signal to the process.
+// The handler is deprecated and usage is discouraged. Use ShutdownHandler
+// instead.
+func (i *Ingester) LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) {
+level.Warn(util_log.Logger).Log("msg", "The handler /ingester/flush_shutdown is deprecated and usage is discouraged. Please use /ingester/shutdown?flush=true instead.")
originalState := i.lifecycler.FlushOnShutdown()
// We want to flush the chunks if transfer fails irrespective of original flag.
i.lifecycler.SetFlushOnShutdown(true)
@@ -538,6 +558,45 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

+// ShutdownHandler handles a graceful shutdown of the ingester service and
+// termination of the Loki process.
+func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
+// Don't allow calling the shutdown handler multiple times
+if i.State() != services.Running {
+w.WriteHeader(http.StatusServiceUnavailable)
+_, _ = w.Write([]byte("Ingester is stopping or already stopped."))
+return
+}
+params := r.URL.Query()
+doFlush := util.FlagFromValues(params, "flush", true)
+doDeleteRingTokens := util.FlagFromValues(params, "delete_ring_tokens", false)
+doTerminate := util.FlagFromValues(params, "terminate", true)
+err := i.handleShutdown(doTerminate, doFlush, doDeleteRingTokens)
+
+// Stopping the module will return the modules.ErrStopProcess error. This is
+// needed so the Loki process is shut down completely.
+if err == nil || err == modules.ErrStopProcess {
+w.WriteHeader(http.StatusNoContent)
+} else {
+w.WriteHeader(http.StatusInternalServerError)
+_, _ = w.Write([]byte(err.Error()))
+}
+}
+
+// handleShutdown triggers the following operations:
+// * Change the state of ring to stop accepting writes.
+// * optional: Flush all the chunks.
+// * optional: Delete ring tokens file
+// * Unregister from KV store
+// * optional: Terminate process (handled by service manager in loki.go)
+func (i *Ingester) handleShutdown(terminate, flush, del bool) error {
+i.lifecycler.SetFlushOnShutdown(flush)
+i.lifecycler.SetClearTokensOnShutdown(del)
+i.lifecycler.SetUnregisterOnShutdown(true)
+i.terminateOnShutdown = terminate
+return services.StopAndAwaitTerminated(context.Background(), i)
+}
+
// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := tenant.TenantID(ctx)
12 changes: 9 additions & 3 deletions pkg/loki/modules.go
@@ -342,9 +342,15 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
)
-t.Server.HTTP.Path("/flush").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)))
-t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)))
-
+t.Server.HTTP.Methods("GET", "POST").Path("/flush").Handler(
+httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)),
+)
+t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(
+httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.LegacyShutdownHandler)),
+)
+t.Server.HTTP.Methods("POST").Path("/ingester/shutdown").Handler(
+httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)),
+)
return t.Ingester, nil
}

12 changes: 12 additions & 0 deletions pkg/util/http.go
@@ -9,6 +9,7 @@ import (
"html/template"
"io"
"net/http"
+"net/url"
"strings"

"github.com/go-kit/log"
@@ -285,3 +286,14 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi
}
return nil
}
+
+func FlagFromValues(values url.Values, key string, d bool) bool {
+switch strings.ToLower(values.Get(key)) {
+case "t", "true", "1":
+return true
+case "f", "false", "0":
+return false
+default:
+return d
+}
+}