From 0ddec3c793def2fd4dfb5ff4f1d1de690f66875e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 28 Sep 2020 19:38:04 -0400 Subject: [PATCH 1/3] wal first pass --- .../2020-09-Write-Ahead-Log.md | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 docs/sources/design-documents/2020-09-Write-Ahead-Log.md diff --git a/docs/sources/design-documents/2020-09-Write-Ahead-Log.md b/docs/sources/design-documents/2020-09-Write-Ahead-Log.md new file mode 100644 index 000000000000..a915d2e477a1 --- /dev/null +++ b/docs/sources/design-documents/2020-09-Write-Ahead-Log.md @@ -0,0 +1,84 @@ +## Impetus + +Loki already takes numerous steps to ensure the persistence of log data, most notably the use of a configurable replication factor (redundancy) in the ingesters. However, this still leaves much to be desired in persistence guarantees, especially for single binary deployments. This proposal outlines a write ahead log (WAL) in order to compliment existing measures by allowing storage/replay of incoming writes via local disk on the ingesters. + +## Strategy + +We suggest a two pass WAL implementation which includes an intial recording of accepted writes (`segments`) and a subsequent checkpointing (`checkpoints`) which coalesces the first pass into more efficient representations to speed up replaying. + +### Segments + +Segments are the first pass and most basic WAL. They store individual records of incoming writes that have been accepted and can be used to reconstruct the in memory state of an ingester without any external input. They are sequentially named on disk and are automatically created when a target size is hit as follows: + +``` +data +└── wal + ├── 000000 + ├── 000001 + └── 000002 +``` + +### Truncation + +In order to prevent unbounded growth and remove flushed operations from the WAL, it is regularly truncated and all but the last segment (which is currently active) are deleted at a configurable interval (`ingester.checkpoint-duration`) or after a combined max segment size is reached (`ingester.checkpoint-threshold`), whichever comes first. This is where checkpoints come into the picture. + +### Checkpoints + +Before truncating the WAL, we advance the WAL segments by one in order to ensure we don't delete the currently writing segment. The directory will look like: + +``` +data +└── wal + ├── 000000 + ├── 000001 + ├── 000002 <- likely not full, no matter + └── 000003 <- newly written, empty +``` + +In order to be adaptive in the face of dynamic throughputs, the ingester will calculate the expected time until the next checkpointing operation is performed. We suggest a simplistic approach first: the duration before the current checkpointing operation was requested, which will be a maximum of `ingester.checkpoint-duration` or less if the `ingester.checkpoint-threshold` was triggered. We'll call this our `checkpoint_duration`. Then, to give some wiggle room in case we've overestimated how much time we have until the next checkpoint, we'll reduce this (say to 90%). This `checkpoint_duration` will be used to amortize writes to disk to avoid IOPs burst. + +Then, each in memory stream is iterated every across an interval, calculated by `checkpoint_duration / in_memory_streams` and written to the checkpoint. After the checkpoint completes, it is moved from it's temp directory to the `ingester.wal-dir`, taking the name of the last segment before it started (`checkpoint.000003`) and then all applicable segments (`00000`, `00001`, `00002`) and any previous checkpoint are deleted. + +#### Queueing Checkpoint operations + +Since checkpoints are created at dynamic intervals, it's possible that one operation will start at the same time another is running. In this case, the existing checkpoint operation should disregard it's internal ticker and flush it's series as fast as is possible. Afterwords, the next checkpoint operation can begin. This will likely create a localized spike in IOPS before the amortization of the following checkpoint operation takes over and is another important reason to run the WAL on an isolated disk in order to mitigate noisy neighbor problems. + +### WAL Record Types + +#### Streams + +A `Stream` record type is written when an ingester receives a push for a series it doesn't yet have in memory. + +#### Logs + +A `Logs` record type is written when an ingester receives a push, containing the fingerprint of the series it refers to and a list of `(timestamp, log_line)` tuples, _after_ a `Stream` record type is written, if applicable. + +### Restoration + +Replaying a WAL is done by loading any available checkpoints into memory and then replaying any operations from segments on top. It's likely some of these operations will fail because they're already included in the checkpoint (due to delay introduced in our amortizations), but this is ok -- we won't _lose_ any data, only try to write some data twice, which we'll ignore. + +### Deployment + +Introduction of the WAL requires that ingesters have persistent disks which are reconnected across restarts (this is a good fit for StatefulSets in Kubernetes). Additionally, it's recommended that the WAL uses an independent disk such that it's isolated from being affected by or causing noisy neighbor problems, especially during any IOPS spike(s). + +### Questions +- Can we use underlying prometheus wal pkg when possible for consistency & to mitigate undifferentiated heavy lifting. Interfaces handle page alignment & use []byte. +- Make ingester series data impl proto.Message in order to generate byte conversions for disk loading? +- how to handle `ingester.retain-period` which is helpful b/c queries cache index lookups for 5min + - could we instead only cache index lookups for periods older than `querier.query-ingesters-within`? + - otherwise can be done +- Will the prometheus wal library handle arbitrarily long records? + +### Alternatives + +#### Use the Cortex WAL + +Attractive in a specious way, the Cortex WAL seems unsuited for use in Loki due to revolving around _time_. Creating segments & checkpoints based on elapsed time suits Prometheus data where throughput is uniform and calculable via the `scrape_interval`, but unpredictable stream throughtput in Loki could result in problematic segment/checkpoint sizing. Thus a more dynamic approach is needed. + +Additionally, it would be a less efficient approach which wouldn't take advantage of the block compressions that already occurs in the ingesters. + +#### Don't build checkpoints from memory, instead write new WAL elements + +Instead of building checkpoints from memory, this would build the same efficiencies into two distinct WAL Record types: `Blocks` and `FlushedChunks`. The former is a record type which willl contain an entire compressed block after it's cut and the latter will contain an entire chunk + the sequence of blocks it holds when it's flushed. This may offer good enough amortization of writes because block cuts are assumed to be evenly distributed & chunk flushes have the same property and use jitter for synchronization. + +This would allow building checkpoints without relying on an ingester's internal state, but would likely require multiple WALs, partitioned by record type in order to be able to iterate all `FlushedChunks` -> `Blocks` -> `Series` -> `Samples` such that we could no-op the later (lesser priority) types that are superseded by the former types. The benefits do not seem worth the cost here, especially considering the simpler suggested alternative and the extensibility costs if we need to add new record types if/when the ingester changes internally. From fc7d028b55f6be3c752f15072f1de8f4079c43db Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 30 Sep 2020 11:52:16 -0400 Subject: [PATCH 2/3] design doc wal --- .../2020-09-Write-Ahead-Log.md | 74 +++++++++++++------ 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/docs/sources/design-documents/2020-09-Write-Ahead-Log.md b/docs/sources/design-documents/2020-09-Write-Ahead-Log.md index a915d2e477a1..faeb1977b3b7 100644 --- a/docs/sources/design-documents/2020-09-Write-Ahead-Log.md +++ b/docs/sources/design-documents/2020-09-Write-Ahead-Log.md @@ -1,14 +1,19 @@ +## Metadata + +Author: Owen Diehl - [owen-d](https://github.com/owen-d) ([Grafana Labs](https://grafana.com/)) +Date: 30/09/2020 + ## Impetus -Loki already takes numerous steps to ensure the persistence of log data, most notably the use of a configurable replication factor (redundancy) in the ingesters. However, this still leaves much to be desired in persistence guarantees, especially for single binary deployments. This proposal outlines a write ahead log (WAL) in order to compliment existing measures by allowing storage/replay of incoming writes via local disk on the ingesters. +Loki already takes numerous steps to ensure the persistence of log data, most notably the use of a configurable replication factor (redundancy) in the ingesters. However, this still leaves much to be desired in persistence guarantees, especially for single binary deployments. This proposal outlines a write ahead log (WAL) in order to complement existing measures by allowing storage/replay of incoming writes via local disk on the ingester components. ## Strategy -We suggest a two pass WAL implementation which includes an intial recording of accepted writes (`segments`) and a subsequent checkpointing (`checkpoints`) which coalesces the first pass into more efficient representations to speed up replaying. +We suggest a two pass WAL implementation which includes an initial recording of accepted writes (`segments`) and a subsequent checkpointing (`checkpoints`) which coalesces the first pass into more efficient representations to speed up replaying. ### Segments -Segments are the first pass and most basic WAL. They store individual records of incoming writes that have been accepted and can be used to reconstruct the in memory state of an ingester without any external input. They are sequentially named on disk and are automatically created when a target size is hit as follows: +Segments are the first pass and most basic WAL. They store individual records of incoming writes that have been accepted and can be used to reconstruct the in memory state of an ingester without any external input. Each segment is some multiple of 32kB and upon filling one segment, a new segment is created. Initially Loki will try 256kb segment sizes, readjusting as necessary. They are sequentially named on disk and are automatically created when a target size is hit as follows: ``` data @@ -20,7 +25,7 @@ data ### Truncation -In order to prevent unbounded growth and remove flushed operations from the WAL, it is regularly truncated and all but the last segment (which is currently active) are deleted at a configurable interval (`ingester.checkpoint-duration`) or after a combined max segment size is reached (`ingester.checkpoint-threshold`), whichever comes first. This is where checkpoints come into the picture. +In order to prevent unbounded growth and remove operations which have been flushed to storage from the WAL, it is regularly truncated and all but the last segment (which is currently active) are deleted at a configurable interval (`ingester.checkpoint-duration`). This is where checkpoints come into the picture. ### Checkpoints @@ -35,50 +40,77 @@ data └── 000003 <- newly written, empty ``` -In order to be adaptive in the face of dynamic throughputs, the ingester will calculate the expected time until the next checkpointing operation is performed. We suggest a simplistic approach first: the duration before the current checkpointing operation was requested, which will be a maximum of `ingester.checkpoint-duration` or less if the `ingester.checkpoint-threshold` was triggered. We'll call this our `checkpoint_duration`. Then, to give some wiggle room in case we've overestimated how much time we have until the next checkpoint, we'll reduce this (say to 90%). This `checkpoint_duration` will be used to amortize writes to disk to avoid IOPs burst. +Each in memory stream is iterated across an interval, calculated by `checkpoint_duration / in_memory_streams` and written to the checkpoint. After the checkpoint completes, it is moved from its temp directory to the `ingester.wal-dir`, taking the name of the last segment before it started (`checkpoint.000002`) and then all applicable segments (`00000`, `00001`, `00002`) and any previous checkpoint are deleted. -Then, each in memory stream is iterated every across an interval, calculated by `checkpoint_duration / in_memory_streams` and written to the checkpoint. After the checkpoint completes, it is moved from it's temp directory to the `ingester.wal-dir`, taking the name of the last segment before it started (`checkpoint.000003`) and then all applicable segments (`00000`, `00001`, `00002`) and any previous checkpoint are deleted. +Afterwards, it will look like: + +``` +data +└── wal + ├── checkpoint.000002 <- completed checkpoint + └── 000003 <- currently active wal segment +``` #### Queueing Checkpoint operations -Since checkpoints are created at dynamic intervals, it's possible that one operation will start at the same time another is running. In this case, the existing checkpoint operation should disregard it's internal ticker and flush it's series as fast as is possible. Afterwords, the next checkpoint operation can begin. This will likely create a localized spike in IOPS before the amortization of the following checkpoint operation takes over and is another important reason to run the WAL on an isolated disk in order to mitigate noisy neighbor problems. +It’s possible that one checkpoint operation will start at the same time another is running. In this case, the existing checkpoint operation should disregard its internal ticker and flush its series as fast as possible. Afterwards, the next checkpoint operation can begin. This will likely create a localized spike in IOPS before the amortization of the following checkpoint operation takes over and is another important reason to run the WAL on an isolated disk in order to mitigate noisy neighbor problems. After we've written/moved the current checkpoint, we reap the old one. ### WAL Record Types #### Streams -A `Stream` record type is written when an ingester receives a push for a series it doesn't yet have in memory. +A `Stream` record type is written when an ingester receives a push for a series it doesn't yet have in memory. At a high level, this will contain +```golang +type SeriesRecord struct { + UserID string + Labels labels.Labels + Fingerprint uint64 // label fingerprint +} +``` #### Logs A `Logs` record type is written when an ingester receives a push, containing the fingerprint of the series it refers to and a list of `(timestamp, log_line)` tuples, _after_ a `Stream` record type is written, if applicable. +```golang +type LogsRecord struct { + UserID string + Fingperprint uint64 // label fingerprint for the series these logs refer to + Entries []logproto.Entry +} +``` ### Restoration -Replaying a WAL is done by loading any available checkpoints into memory and then replaying any operations from segments on top. It's likely some of these operations will fail because they're already included in the checkpoint (due to delay introduced in our amortizations), but this is ok -- we won't _lose_ any data, only try to write some data twice, which we'll ignore. +Replaying a WAL is done by loading any available checkpoints into memory and then replaying any operations from successively named segments on top (`checkpoint.000003` -> `000004` -> `000005`, etc). It's likely some of these operations will fail because they're already included in the checkpoint (due to delay introduced in our amortizations), but this is ok -- we won't _lose_ any data, only try to write some data twice, which will be ignored. ### Deployment Introduction of the WAL requires that ingesters have persistent disks which are reconnected across restarts (this is a good fit for StatefulSets in Kubernetes). Additionally, it's recommended that the WAL uses an independent disk such that it's isolated from being affected by or causing noisy neighbor problems, especially during any IOPS spike(s). -### Questions -- Can we use underlying prometheus wal pkg when possible for consistency & to mitigate undifferentiated heavy lifting. Interfaces handle page alignment & use []byte. -- Make ingester series data impl proto.Message in order to generate byte conversions for disk loading? -- how to handle `ingester.retain-period` which is helpful b/c queries cache index lookups for 5min - - could we instead only cache index lookups for periods older than `querier.query-ingesters-within`? - - otherwise can be done -- Will the prometheus wal library handle arbitrarily long records? +### Implementation goals + +- Use underlying prometheus wal pkg when possible for consistency & to mitigate undifferentiated heavy lifting. Interfaces handle page alignment & use []byte. + - Ensure this package handles arbitrarily long records (log lines in Loki’s case). +- Ensure our in memory representations can be efficiently moved to/from `[]byte` in order to generate conversions for fast/efficient loading from checkpoints. +- Ensure chunks which have already been flushed to storage are kept around for `ingester.retain-period`, even after a WAL replay. ### Alternatives #### Use the Cortex WAL -Attractive in a specious way, the Cortex WAL seems unsuited for use in Loki due to revolving around _time_. Creating segments & checkpoints based on elapsed time suits Prometheus data where throughput is uniform and calculable via the `scrape_interval`, but unpredictable stream throughtput in Loki could result in problematic segment/checkpoint sizing. Thus a more dynamic approach is needed. - -Additionally, it would be a less efficient approach which wouldn't take advantage of the block compressions that already occurs in the ingesters. +Since we're not checkpointing from the WAL records but instead doing a memory dump, this isn't bottlenecked by throughput but rather memory size. Therefore we can start with checkpointing by duration rather than accounting for throughput as well. This makes the proposed solution nearly identical to the Cortex WAL approach. The one caveat is that wal segments will accrue between checkpoint operations and may constitute a large amount of data (log throughput varies). We may eventually consider other routes to handle this if duration based checkpointing proves insufficient. #### Don't build checkpoints from memory, instead write new WAL elements -Instead of building checkpoints from memory, this would build the same efficiencies into two distinct WAL Record types: `Blocks` and `FlushedChunks`. The former is a record type which willl contain an entire compressed block after it's cut and the latter will contain an entire chunk + the sequence of blocks it holds when it's flushed. This may offer good enough amortization of writes because block cuts are assumed to be evenly distributed & chunk flushes have the same property and use jitter for synchronization. +Instead of building checkpoints from memory, this would build the same efficiencies into two distinct WAL Record types: `Blocks` and `FlushedChunks`. The former is a record type which will contain an entire compressed block after it's cut and the latter will contain an entire chunk + the sequence of blocks it holds when it's flushed. This may offer good enough amortization of writes because block cuts are assumed to be evenly distributed & chunk flushes have the same property and use jitter for synchronization. + +This could be used to drop WAL records which have already elapsed the `ingester.retain-period`, allowing for faster WAL replays and more efficient loading. +```golang +type FlushMarker struct { + Fingerprint uint64 // labels + FlushedAt uint64 // timestamp when it was flushed, can be used with `ingester.retain-period` to either keep or discard records on replay + LastEntry logproto.Entry // last entry included in the flushed chunk +} +``` -This would allow building checkpoints without relying on an ingester's internal state, but would likely require multiple WALs, partitioned by record type in order to be able to iterate all `FlushedChunks` -> `Blocks` -> `Series` -> `Samples` such that we could no-op the later (lesser priority) types that are superseded by the former types. The benefits do not seem worth the cost here, especially considering the simpler suggested alternative and the extensibility costs if we need to add new record types if/when the ingester changes internally. +It would also allow building checkpoints without relying on an ingester's internal state, but would likely require multiple WALs, partitioned by record type in order to be able to iterate all `FlushedChunks` -> `Blocks` -> `Series` -> `Samples` such that we could no-op the later (lesser priority) types that are superseded by the former types. The benefits do not seem worth the cost here, especially considering the simpler suggested alternative and the extensibility costs if we need to add new record types if/when the ingester changes internally. From 03c283c47f08bf0b51c7b65200139424df0a8906 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 30 Sep 2020 11:55:46 -0400 Subject: [PATCH 3/3] formatting/consistency --- docs/sources/design-documents/2020-09-Write-Ahead-Log.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/sources/design-documents/2020-09-Write-Ahead-Log.md b/docs/sources/design-documents/2020-09-Write-Ahead-Log.md index faeb1977b3b7..283a200dc093 100644 --- a/docs/sources/design-documents/2020-09-Write-Ahead-Log.md +++ b/docs/sources/design-documents/2020-09-Write-Ahead-Log.md @@ -1,6 +1,7 @@ ## Metadata Author: Owen Diehl - [owen-d](https://github.com/owen-d) ([Grafana Labs](https://grafana.com/)) + Date: 30/09/2020 ## Impetus @@ -106,7 +107,7 @@ Instead of building checkpoints from memory, this would build the same efficienc This could be used to drop WAL records which have already elapsed the `ingester.retain-period`, allowing for faster WAL replays and more efficient loading. ```golang -type FlushMarker struct { +type FlushRecord struct { Fingerprint uint64 // labels FlushedAt uint64 // timestamp when it was flushed, can be used with `ingester.retain-period` to either keep or discard records on replay LastEntry logproto.Entry // last entry included in the flushed chunk