sql,kv,storage: push column batch generation into kvserver #82323
Queries has https://cockroachlabs.atlassian.net/browse/CRDB-14837 to track this. During 22.2 planning, we kept this off the roadmap but left it next in priority as resources free up. FYI @mgartner
In case you weren't aware, we do actually use apache arrow already in the vectorized engine, to send the columnar data around via DistSQL. The unfortunate bit is that we do not currently use the arrow serialized data as-is in the vectorized engine for most types - it requires deserialization, which sort of defeats the point. See
This seems somewhat dramatic. At least some of the point here is to project out columns to make the returned batch smaller. I hope that our arrow to column vector conversion is relatively high throughput.
I think it's high throughput enough, and there are some benchmarks. I just meant I've always kind of regretted not finding a way to unify the in-memory representation and the arrow one for that zero-copy goodness!
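To illustrate the zero-copy point from the last two comments, here is a toy, self-contained sketch (not CockroachDB's actual `colserde` code; all names are illustrative) of the copy that deserialization implies:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// serialize writes an int64 column little-endian, the way a wire format
// such as Arrow would lay out a fixed-width data buffer.
func serialize(col []int64) []byte {
	var buf bytes.Buffer
	for _, v := range col {
		_ = binary.Write(&buf, binary.LittleEndian, v)
	}
	return buf.Bytes()
}

// deserializeCopy models today's path: the wire bytes are decoded into a
// freshly allocated in-memory vector, costing an allocation plus a copy.
func deserializeCopy(b []byte) []int64 {
	out := make([]int64, len(b)/8)
	for i := range out {
		out[i] = int64(binary.LittleEndian.Uint64(b[i*8:]))
	}
	return out
}

func main() {
	wire := serialize([]int64{1, 2, 3})
	fmt.Println(deserializeCopy(wire)) // [1 2 3]
	// A zero-copy design would instead alias `wire` directly as the
	// vector's data buffer, which is what unifying the in-memory and
	// Arrow representations would buy.
}
```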
How sad would people feel if we don't implement the "local fast-path" with the KV projection pushdown? I mean that even if the KV Scan request is evaluated locally and we create the columnar batches in the same process, we would still serialize them into the Apache Arrow format. The main argument for keeping the local fast path is that we can eliminate this serialization / deserialization step, and it seems nice to do it if we can. However, there are several reasons for not doing it.
Update: we probably will implement this local fast-path after all in order to eliminate (or at least substantially reduce) the perf hit when comparing against single-tenant deployments.
92758: sql: correct the usage of UDF terms "arguments" and "parameters" r=mgartner a=mgartner

#### sql: correct the usage of UDF terms "arguments" and "parameters"

A _parameter_ is a variable in a function definition. An _argument_ is an expression passed to a function when the function is invoked. UDF-related code was not using this nomenclature correctly, creating confusion. This commit fixes up our usage of these terms within SQL optimization and execution. I know this is a pedantic change, but it is motivated by my struggles in writing clear code and comments with the previous overloading of the term _argument_.

Epic: CRDB-20370
Release note: None

#### sql: correct the usage of terms "arguments" and "parameters" in descriptors

Release note: None

92854: kvserver,logstore: move term and entry loading to logstore r=tbg a=pavelkalinnikov

This PR moves entry and term fetching to the `logstore` package. Part of #91979.

Release note: None

92883: kvserver: allow exceeding MaxSpanRequestKeys limit in some cases r=yuzefovich a=yuzefovich

This commit removes an assertion that `MaxSpanRequestKeys` is not exceeded. In particular, this limit can be exceeded when SQL asks for full SQL rows (by specifying non-zero `WholeRowsOfSize`) and doesn't allow for empty responses (by specifying `AllowEmpty` as `false`). Note that this code path hasn't been hit so far in production because the only user of `WholeRowsOfSize` (the streamer) currently doesn't set `MaxSpanRequestKeys`. (A toy model of this limit interaction follows this comment.)

Informs: #82323.

Release note: None

Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
Co-authored-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
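To make the `MaxSpanRequestKeys` / `WholeRowsOfSize` interaction concrete, here is a toy model (illustrative only, not the actual kvserver code) of why whole-row semantics can force the server past the key limit: with multiple column families a SQL row spans several KV pairs, so if the limit lands mid-row and empty responses are disallowed, the scan must overshoot to the row boundary.

```go
package main

import "fmt"

// keysReturned scans KVs row by row (kvsPerRow column families per SQL
// row) under maxKeys, but never cuts a row in half and never returns
// zero keys - the toy analogue of WholeRowsOfSize with AllowEmpty=false.
func keysReturned(totalRows, kvsPerRow, maxKeys int) int {
	returned := 0
	for r := 0; r < totalRows; r++ {
		if returned+kvsPerRow > maxKeys && returned > 0 {
			return returned // stop at a row boundary once the limit is hit
		}
		returned += kvsPerRow // whole rows only: may overshoot maxKeys
	}
	return returned
}

func main() {
	// 3 column families per row. With maxKeys=4 the first whole row fits
	// (3 keys) and the second would end at 6, so we return 3 keys.
	fmt.Println(keysReturned(10, 3, 4)) // 3
	// With maxKeys=2 even the first whole row exceeds the limit, but an
	// empty response is not allowed, so 3 keys are returned anyway.
	fmt.Println(keysReturned(10, 3, 2)) // 3
}
```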
93134: storage: refactor the iteration for scans r=yuzefovich a=yuzefovich

**storage: refactor the iteration for scans**

This commit refactors the iteration that we have for handling scans and reverse scans in order to split the "get" part from the "advance key" part. This is needed for the KV projection pushdown work in order to avoid copying each key-value pair: unlike with the existing `pebbleResults` struct where we append all KVs into the `repr` buffer (and, thus, copy each KV anyway), with the projection pushdown work we want to perform the decoding directly on the unsafe KV that the iterator is pointing at. With the previous "eager" advancement of the iterator, that KV pair would get invalidated before we could decode it (to keep only the needed parts of the KV). Now the decoder will be able to access the unsafe KV (and copy out the necessary parts) before it is invalidated. An additional boolean is introduced to indicate whether a new KV was added into the result set or not (also needed for the pushdown work) but is currently unused.

Previously, the logic was:
- seek to the first key
- in the loop:
  - get one KV and advance to the next key (both actions as a single operation).

Now, the logic becomes:
- seek to the first key
- in the loop:
  - get one KV
  - if the iteration can continue, advance to the next key.

This is implemented by introducing a simple state machine for the "advance key" functions. (A minimal sketch of this loop split follows this comment.) The benchmark results show minor changes, mostly positive, some negative: https://gist.github.com/yuzefovich/4f976075e8dc9c5da9171c281787432d

Informs: #82323
Epic: CRDB-14837
Release note: None

**storage: eliminate a copy of the key in some cases with reverse scans**

This commit changes the contract of the `prevKey` method to require that a copy is passed in. Previously, the method would make a copy of the key on its own before using the iterator, but in one of the two places where `prevKey` is used it would be redundant since the caller (`seekVersion`) already made a copy.

Epic: None
Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
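A minimal, runnable sketch of the loop split described above (the names here are illustrative stand-ins, not the actual `storage` package identifiers):

```go
package main

import "fmt"

// scanner stands in for the MVCC scanner: it points at one KV at a time.
type scanner struct {
	kvs []string
	pos int
}

func (s *scanner) seekFirst() bool { s.pos = 0; return s.pos < len(s.kvs) }
func (s *scanner) getOne() string  { return s.kvs[s.pos] } // current (unsafe) KV
func (s *scanner) advance() bool   { s.pos++; return s.pos < len(s.kvs) }

func main() {
	s := &scanner{kvs: []string{"a@1", "b@2", "c@3"}}
	// The old fused get-and-advance would invalidate the unsafe KV before
	// a decoder could trim it. The new shape gets the KV first, lets the
	// decoder copy out only the needed parts while the iterator still
	// points at it, and only then advances.
	for ok := s.seekFirst(); ok; ok = s.advance() {
		kv := s.getOne()
		fmt.Println("decode while still valid:", kv)
	}
}
```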
94348: sql,storage: some preliminary changes for KV projection pushdown r=yuzefovich a=yuzefovich

This PR contains a couple of commits that are mostly mechanical changes in preparation for the KV pushdown work. Some microbenchmarks of this PR are [here](https://gist.github.com/yuzefovich/24d3238bc638cc1121fd345c68ca3d0b), and they show effectively no change in the scan speed.

Epic: CRDB-14837
Informs: #82323
Informs: #87610

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
94438: sql,storage: add support for COL_BATCH_RESPONSE scan format r=yuzefovich a=yuzefovich

This commit introduces a new `COL_BATCH_RESPONSE` scan format for Scans and ReverseScans which results in only the needed columns being returned from the KV server. In other words, this commit introduces the ability to perform the KV projection pushdown.

The main idea of this feature is to use the injected decoding logic from SQL in order to process each KV and keep only the needed parts (i.e. necessary SQL columns). Those needed parts are then propagated back to the KV client as coldata.Batch'es (serialized in the Apache Arrow format).

Here is the outline of all components involved:

```
┌────────────────────────────────────────────────┐
│                      SQL                       │
│________________________________________________│
│          colfetcher.ColBatchDirectScan         │
│                       │                        │
│                       ▼                        │
│                row.txnKVFetcher                │
│   (behind the row.KVBatchFetcher interface)    │
└────────────────────────────────────────────────┘
                        │
                        ▼
┌────────────────────────────────────────────────┐
│                   KV Client                    │
└────────────────────────────────────────────────┘
                        │
                        ▼
┌────────────────────────────────────────────────┐
│                   KV Server                    │
│________________________________________________│
│           colfetcher.cFetcherWrapper           │
│ (behind the storage.CFetcherWrapper interface) │
│                       │                        │
│                       ▼                        │
│              colfetcher.cFetcher               │
│                       │                        │
│                       ▼                        │
│        storage.mvccScanFetchAdapter ───────────┐
│    (behind the storage.NextKVer interface)     │
│                       │                        │
│                       ▼                        │
│          storage.pebbleMVCCScanner             │
│  (which put's KVs into storage.singleResults) <┘
└────────────────────────────────────────────────┘
```

On the KV client side, `row.txnKVFetcher` issues Scans and ReverseScans with the `COL_BATCH_RESPONSE` format and returns the response (which contains the columnar data) to the `colfetcher.ColBatchDirectScan`.

On the KV server side, we create a `storage.CFetcherWrapper` that asks the `colfetcher.cFetcher` for the next `coldata.Batch`. The `cFetcher`, in turn, fetches the next KV, decodes it, and keeps only values for the needed SQL columns, discarding the rest of the KV. The KV is emitted by the `mvccScanFetchAdapter` which - via the `singleResults` struct - exposes access to the current KV that the `pebbleMVCCScanner` is pointing at.

Note that there is an additional "implicit synchronization" between components that is not shown on this diagram. In particular, `storage.singleResults.maybeTrimPartialLastRow` must be in sync with the `colfetcher.cFetcher`, which is achieved by
- the `cFetcher` exposing access to the first key of the last incomplete SQL row via the `FirstKeyOfRowGetter`,
- the `singleResults` using that key as the resume key for the response,
- and the `cFetcher` removing that last partial SQL row when `NextKV()` returns `partialRow=true`.

This "upstream" link (although breaking the layering a bit) allows us to avoid a performance penalty for handling the case with multiple column families. (This case is handled by the `storage.pebbleResults` via tracking offsets into the `pebbleResults.repr`.)

This code structure deserves some elaboration. First, there is a mismatch between the "push" mode in which the `pebbleMVCCScanner` operates and the "pull" mode that the `NextKVer` exposes. The adaptation between the two modes is achieved via the `mvccScanFetcherAdapter` grabbing (when the control returns to it) the current unstable KV pair from the `singleResults` struct, which serves as a one-KV-pair buffer that the `pebbleMVCCScanner` `put`s into. Second, in order to be able to use the unstable KV pair without performing a copy, the `pebbleMVCCScanner` stops at the current KV pair and returns the control flow (which is exactly what `pebbleMVCCScanner.getOne` does) back to the `mvccScanFetcherAdapter`, with the adapter advancing the scanner only when the next KV pair is needed. (A toy sketch of this adaptation follows this comment.)

There are multiple scenarios which are currently not supported:
- SQL cannot issue Get requests (likely will support in 23.1)
- the `TraceKV` option is not supported (likely will support in 23.1)
- user-defined types other than enums are not supported (will _not_ support in 23.1)
- non-default key locking strength as well as the SKIP LOCKED wait policy are not supported (will _not_ support in 23.1).

The usage of this feature is currently disabled by default, but I intend to enable it by default for multi-tenant setups. The rationale is that currently there is a large performance hit when enabling it for single-tenant deployments, whereas it offers a significant speedup in the multi-tenant world. The microbenchmarks [show](https://gist.github.com/yuzefovich/669c295a8a4fdffa6490532284c5a719) the expected improvement in multi-tenant setups when the tenant runs in a separate process whenever we don't need to decode all of the columns from the table. The TPCH numbers, though, don't show the expected speedup:

```
Q1:  before: 11.47s  after:  8.84s  -22.89%
Q2:  before:  0.41s  after:  0.29s  -27.71%
Q3:  before:  7.89s  after:  9.68s   22.63%
Q4:  before:  4.48s  after:  4.52s    0.86%
Q5:  before: 10.39s  after: 10.35s   -0.29%
Q6:  before: 33.57s  after: 33.41s   -0.48%
Q7:  before: 23.82s  after: 23.81s   -0.02%
Q8:  before:  3.78s  after:  3.76s   -0.68%
Q9:  before: 28.15s  after: 28.03s   -0.42%
Q10: before:  5.00s  after:  4.98s   -0.42%
Q11: before:  2.44s  after:  2.44s    0.22%
Q12: before: 34.78s  after: 34.65s   -0.37%
Q13: before:  3.20s  after:  2.94s   -8.28%
Q14: before:  3.13s  after:  3.21s    2.43%
Q15: before: 16.80s  after: 16.73s   -0.38%
Q16: before:  1.60s  after:  1.65s    2.96%
Q17: before:  0.85s  after:  0.96s   13.04%
Q18: before: 16.39s  after: 15.47s   -5.61%
Q19: before: 13.76s  after: 13.01s   -5.45%
Q20: before: 55.33s  after: 55.12s   -0.38%
Q21: before: 24.31s  after: 24.31s   -0.00%
Q22: before:  1.28s  after:  1.41s   10.26%
```

At the moment, the `coldata.Batch` that is included into the response is always serialized into the Arrow format, but I intend to introduce the local fastpath to avoid that serialization. That work will be done in a follow-up and should be able to reduce the perf hit for single-tenant deployments.

A quick note on the TODOs sprinkled in this commit:
- `TODO(yuzefovich)` means that this will be left for 23.2 or later.
- `TODO(yuzefovich, 23.1)` means that it should be addressed in 23.1.

A quick note on testing: this commit randomizes whether the new infrastructure is used in almost all test builds. Introducing some unit testing (say, in the `storage` package) seems rather annoying since we must create keys that are valid SQL keys (i.e. have a TableID / Index ID prefix) and need to come up with the corresponding `fetchpb.IndexFetchSpec`. Not having unit tests in `storage` seems ok to me given that the "meat" of the work there is still done by the `pebbleMVCCScanner` which is exercised using the regular Scans. End-to-end testing is well covered by all of our existing tests, which now randomly exercise the new infrastructure. I did run the CI multiple times with the new feature enabled by default with no failures, so I hope that it shouldn't become flaky.

Addresses: #82323.
Informs: #87610.
Epic: CRDB-14837
Release note: None
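To make the push-to-pull adaptation concrete, here is a minimal, runnable sketch under heavy simplification - the type and function names (`singleSlot`, `pushScanner`, `nextKV`) are illustrative stand-ins, not the actual `storage` / `colfetcher` identifiers:

```go
package main

import "fmt"

// singleSlot is a one-KV buffer, standing in for storage.singleResults.
type singleSlot struct {
	kv string
	ok bool
}

func (s *singleSlot) put(kv string) { s.kv, s.ok = kv, true }

// pushScanner models pebbleMVCCScanner.getOne: it advances one step,
// puts the current KV into the slot, and returns control to the caller.
type pushScanner struct {
	kvs  []string
	next int
	out  *singleSlot
}

func (p *pushScanner) getOne() bool {
	if p.next >= len(p.kvs) {
		return false
	}
	p.out.put(p.kvs[p.next])
	p.next++
	return true
}

// nextKV is the pull side, standing in for the NextKVer interface: the
// caller asks for KVs one at a time, and the adapter resumes the push
// scanner only when its one-KV buffer has been consumed.
func nextKV(p *pushScanner) (string, bool) {
	if !p.getOne() {
		return "", false
	}
	kv := p.out.kv
	p.out.ok = false // slot is free again; the KV is "unstable" past here
	return kv, true
}

func main() {
	slot := &singleSlot{}
	sc := &pushScanner{kvs: []string{"/t/1/1", "/t/1/2"}, out: slot}
	for kv, ok := nextKV(sc); ok; kv, ok = nextKV(sc) {
		fmt.Println("decoded:", kv)
	}
}
```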
95701: gossip: Track latency by nodeID rather than addr r=kvoli,erikgrinaker a=andrewbaptist

Previously the latency to remote nodes was tracked by address rather than by the node's ID. This could result in a few problems. First, the remote address could be reused across nodes, which could result in incorrect information. Additionally, places that used this information (such as the allocator) needed to unnecessarily map the node ID to an address just to do a lookup. Finally, in preparation for dialback on heartbeat (#84289), the use of the OriginAddr field in the PingRequest will change to be the actual address that a node should use to dial back. Currently this field is not set correctly.

Epic: none
Release note: None

95796: ui: add CPU Time chart to statement details r=maryliag a=maryliag

This commit adds a new chart for CPU time on the Statement Details page.

Part of #87213

<img width="1508" alt="Screen Shot 2023-01-24 at 6 01 07 PM" src="https://user-images.githubusercontent.com/1017486/214440274-c48d3bb6-ecbe-47a2-861a-0a8407d219c4.png">

Release note (ui change): Add CPU Time chart to Statement Details page.

95832: cdc: remove 'nonsensitive' tag from changefeed description in telemetry logs r=jayshrivastava a=jayshrivastava

Previously, the description field in changefeed telemetry logs was marked as `nonsensitive`. This is incorrect because the description field may contain an SQL statement which is not safe to report. This change removes the `nonsensitive` tag so the field is redacted by default.

Fixes: #95823
Epic: none
Release note: none

95838: logictest: remove smallEngineBlocks randomization r=yuzefovich a=yuzefovich

This metamorphic randomization has caused some flakiness (due to a subset of tests taking a very long time), so it is now removed. This feature should be tested in a more targeted fashion.

Fixes: #95799.
Fixes: #95829.
Release note: None

95840: opt: replace make with dev in test instructions r=mgartner a=mgartner

Epic: None
Release note: None

95842: roachtest: fix parameters passed to require.NoError r=yuzefovich,srosenberg,herkolategan a=renatolabs

When context is passed to an assertion, the parameters *must* be a string format, followed by arguments (as you would in a call to `fmt.Sprintf`). The previous code would panic trying to cast an int to a string.

Informs #95416

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Andrew Baptist <baptist@cockroachlabs.com>
Co-authored-by: maryliag <marylia@cockroachlabs.com>
Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
Co-authored-by: Renato Costa <renato@cockroachlabs.com>
95033: storage,colfetcher: implement local fast-path for COL_BATCH_RESPONSE r=yuzefovich a=yuzefovich

This commit implements the local fast-path for the COL_BATCH_RESPONSE scan format. The idea is that if a Scan request is evaluated locally (i.e. on the same node for single-tenant deployments or within the shared process for multi-tenant deployments), then we can avoid the redundant serialization of the columnar batches in the Apache Arrow format and just pass the batches as a slice of pointers through the protobuf. Additionally, this also allows us to avoid a copy of the data from `ScanResponse.BatchResponse` into the columnar batch.

To achieve this, the ScanResponses and the ReverseScanResponses now contain a new custom `ColBatches` message which only includes `[]coldata.Batch` that is not marshalled as part of the protobuf serialization. (A toy sketch of this response shape and its memory accounting follows this comment.)

Now that a single multi-range request can result in locally- and remotely-executed single-range requests, we need to be careful when combining them. In particular, in order to preserve the ordering between single-range requests we now always deserialize the remotely-executed ones (since this "combining" happens on the KV client side and won't be sent over the wire again) while "merging" them accordingly. This required the introduction of an injected helper for the deserialization from the Apache Arrow format into the `kvpb` package. This deserialization also requires access to the `fetchpb.IndexFetchSpec` proto that is stored in the BatchRequest; thus, the signature of the `combine` method has been adjusted to include a reference to the BatchRequest.

An additional quirk of this commit is that the `cFetcher` cannot reuse the same batch when it is used by the `cFetcherWrapper` when skipping the serialization. (If it did reuse batches, then the slice of batches would contain multiple references to the same batch, so only the last reference would be correct - all previous ones would have been reset.) To handle that, the `colmem.SetAccountingHelper` has been adjusted to keep the same heuristic when it comes to the sizing of the batch while always allocating a new one, even if under other circumstances it would have reused the old batch.

It's also worth noting the story about memory accounting of these local batches. The `SetAccountingHelper` used by the `cFetcher` always tracks the memory usage of only the last batch, so we need to account for all other batches ourselves. We go around this by providing the `cFetcher` with a "detached" memory account (i.e. an account that is not connected to the memory accounting system) that is used by the `cFetcher` to limit the batch size based on the footprint, and by modifying the `cFetcherWrapper` to perform the accounting against the proper memory account. This commit also clarifies the contract of `CFetcherWrapper.NextBatch`: it is the wrapper's responsibility to perform memory accounting of all batches, regardless of the return format, against the provided memory account.

This only covers part of the story on the KV server side. On the KV client side, the memory accounting is done in the `txnKVFetcher`. When the batches are serialized, they are included in the `ScanResponse.BatchResponse` field and, thus, are included in `BatchResponse.Size` which we use for accounting. For the non-serialized batches, this commit implements a custom `Size()` method so that the true footprint of all `coldata.Batch`es is included in `BatchResponse.Size`. As a result, all local batches (including the ones that were deserialized when combining responses to locally- and remotely-executed requests) are tracked by the `txnKVFetcher` until a new `BatchRequest` is issued, so the ColBatchDirectScan doesn't need to perform the accounting. (Note that we perform the accounting for `ScanResponse.BatchResponse` slices in a similar manner - we don't shrink the memory account when a single response becomes garbage (due to likely under-accounting in other places).)

A special note on type schemas with enums: since enums require type hydration that is not easily available on the KV server side and we treat them simply as bytes values, the presence of enums forces us to serialize the batches even for locally-executed requests. This seems like a minor limitation in comparison to not supporting enums at all.

Another note on the datum-backed vectors: since the `cFetcherWrapper` also doesn't have access to a valid `eval.Context`, the datum-backed vectors produced by the wrapper are "incomplete". Previously, since we always serialized the batches, this wasn't an issue. However, now if we get a non-serialized batch from a locally-executed request, we must update all datum-backed vectors with the proper eval context. This is done by the `ColBatchDirectScan`.

The microbenchmarks of this change when the direct columnar scans are always enabled are [here](https://gist.github.com/yuzefovich/a9b28669f35ff658b2e89ed7b1d43e38). Note that there are three distinct operation modes in that gist:
- `Cockroach` and `MultinodeCockroach` - single-tenant deployments
- `SharedProcessTenant` - this is how we imagine that dedicated clusters will run once the Unified Architecture is achieved
- `SepProcessTenant` - this is how we run Serverless.

For the first two, this commit results mostly in a minor improvement in latency and sometimes a noticeable reduction in allocations, as expected. The `SepProcessTenant` config - which cannot take advantage of the local fast-path - sees a minor slowdown in latency and no changes in allocations, as expected (I'm attributing this to the increased overhead of the direct columnar scans and the increased size of `ScanResponse` objects). However, these are micro-benchmarks, and they don't show the full picture. In particular, they don't process enough data and often select all columns in the table for this feature to show its benefits. I'm more excited about the results on the TPC-H queries.

Here is the impact of this commit on a 3-node cluster running in single-tenant mode (averaged over 10 runs):
```
Q1:  before:  4.46s  after:  4.23s   -5.15%
Q2:  before:  3.18s  after:  3.30s    3.45%
Q3:  before:  2.43s  after:  2.11s  -13.20%
Q4:  before:  1.83s  after:  1.84s    0.44%
Q5:  before:  2.65s  after:  2.48s   -6.34%
Q6:  before:  7.59s  after:  7.46s   -1.65%
Q7:  before:  5.56s  after:  5.72s    2.71%
Q8:  before:  1.14s  after:  1.11s   -2.29%
Q9:  before:  5.77s  after:  5.31s   -7.86%
Q10: before:  1.98s  after:  1.94s   -1.92%
Q11: before:  0.73s  after:  0.69s   -5.52%
Q12: before:  7.18s  after:  6.91s   -3.79%
Q13: before:  1.24s  after:  1.24s    0.16%
Q14: before:  0.70s  after:  0.66s   -5.32%
Q15: before:  3.99s  after:  3.64s   -8.89%
Q16: before:  0.95s  after:  0.94s   -1.16%
Q17: before:  0.27s  after:  0.26s   -5.49%
Q18: before:  2.67s  after:  2.15s  -19.39%
Q19: before:  4.03s  after:  2.96s  -26.46%
Q20: before: 12.91s  after: 11.49s  -10.98%
Q21: before:  7.14s  after:  6.99s   -2.13%
Q22: before:  0.60s  after:  0.57s   -5.48%
```

Furthermore, here is the comparison of the direct columnar scans disabled vs enabled:
```
Q1:  before:  4.36s  after:  4.23s   -2.91%
Q2:  before:  3.57s  after:  3.30s   -7.63%
Q3:  before:  2.31s  after:  2.11s   -8.61%
Q4:  before:  1.88s  after:  1.84s   -2.07%
Q5:  before:  2.55s  after:  2.48s   -2.70%
Q6:  before:  7.94s  after:  7.46s   -6.04%
Q7:  before:  5.87s  after:  5.72s   -2.61%
Q8:  before:  1.12s  after:  1.11s   -1.07%
Q9:  before:  5.79s  after:  5.31s   -8.27%
Q10: before:  1.97s  after:  1.94s   -1.47%
Q11: before:  0.69s  after:  0.69s   -0.29%
Q12: before:  6.99s  after:  6.91s   -1.16%
Q13: before:  1.24s  after:  1.24s   -0.48%
Q14: before:  0.68s  after:  0.66s   -3.37%
Q15: before:  3.72s  after:  3.64s   -2.23%
Q16: before:  0.96s  after:  0.94s   -1.88%
Q17: before:  0.28s  after:  0.26s   -6.18%
Q18: before:  2.47s  after:  2.15s  -12.87%
Q19: before:  3.20s  after:  2.96s   -7.35%
Q20: before: 11.71s  after: 11.49s   -1.88%
Q21: before:  7.00s  after:  6.99s   -0.06%
Q22: before:  0.58s  after:  0.57s   -2.07%
```

In other words, on TPC-H queries it is now already beneficial to enable the direct columnar scans in the single-tenant world (and I think there are more minor optimizations ahead). For reference, [here](https://gist.github.com/yuzefovich/0afce5c0692713cf28712f076bab415b) is the comparison of direct columnar scans disabled vs enabled on this commit. It also shows that we're not that far off from reaching performance parity in micro-benchmarks.

Addresses: #82323.
Informs: #87610.
Epic: CRDB-14837
Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
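As a rough illustration of the response shape and the custom `Size()` idea described above - all types below are hypothetical stand-ins (`batch` for `coldata.Batch`, `scanResponse` for the real kvpb message), not the actual implementation:

```go
package main

import "fmt"

// batch stands in for coldata.Batch; footprint is its true memory size.
type batch struct{ data []int64 }

func (b *batch) footprint() int { return 8 * len(b.data) }

// scanResponse sketches the shape described above: BatchResponse holds
// Arrow-serialized bytes for remotely-executed requests, while ColBatches
// carries raw pointers for the local fast path and is skipped by
// protobuf serialization.
type scanResponse struct {
	BatchResponse []byte
	ColBatches    []*batch // not marshalled; local fast path only
}

// size mirrors the custom Size() idea: local batches must contribute
// their true footprint so client-side memory accounting stays honest.
func (r *scanResponse) size() int {
	n := len(r.BatchResponse)
	for _, b := range r.ColBatches {
		n += b.footprint()
	}
	return n
}

func main() {
	local := &scanResponse{ColBatches: []*batch{{data: make([]int64, 128)}}}
	remote := &scanResponse{BatchResponse: make([]byte, 1024)}
	// Both paths report a comparable footprint to the accounting system.
	fmt.Println(local.size(), remote.size()) // 1024 1024
}
```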
23.1 must-haves:
- `Get` requests
- `TraceKV` flag of `cFetcher`
- `KV Bytes Read` statistic?

23.1 nice-to-haves:
- `estimatedRowCount` as the hint for `cFetcherWrapper` (sql, kv: propagate and utilize estimated row count hint for KV projection pushdown work #94850)

Later:
Is your feature request related to a problem? Please describe.
One known bottleneck for CockroachDB performance is so-called "scan speed". In practice, this is the speed to scan data off of disk, encode it into the scan response, decode it, then re-encode it into a columnar format. The columnar format is now used extensively in execution.

The above summary is misleading in a dedicated cluster: often the query execution happens in the same process as the kvserver, so the encoding and decoding steps can be skipped. In multi-tenant deployments, however, the data must be transmitted over the network back to the SQL server. This can be particularly costly when the data is being served from a separate availability zone ([1], #71887).

The above proposal has the potential to improve the speed by 1) not decoding columns we don't need and 2) creating much smaller responses.
Any eventual movement towards columnarization at the storage layer will need to have a corresponding read API. This issue posits that we should build the columnar read API first to gain experience.
Describe the solution you'd like
We should make an Apache Arrow batch response format which does column projection based on the `IndexFetchSpec`.
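For illustration only, here is a self-contained sketch of what such a request/response shape could look like - every field and type name below is hypothetical, not the actual `roachpb`/`kvpb` protos:

```go
package main

import "fmt"

// indexFetchSpec sketches the small proto mentioned below: enough
// information for the server to decode rows and keep only some columns.
type indexFetchSpec struct {
	tableID    int
	indexID    int
	neededCols []string
}

// scanRequest and scanResponse are illustrative shapes, not real protos.
type scanRequest struct {
	span      [2]string
	format    string // raw KV pairs today; a columnar format as proposed
	fetchSpec *indexFetchSpec
}

type scanResponse struct {
	// For the columnar format: one Arrow-encoded buffer per projected
	// column, instead of the full re-encoded KV pairs.
	columns map[string][]byte
}

func evaluate(req scanRequest) scanResponse {
	resp := scanResponse{columns: map[string][]byte{}}
	for _, c := range req.fetchSpec.neededCols {
		resp.columns[c] = []byte{ /* Arrow-encoded column data */ }
	}
	return resp
}

func main() {
	req := scanRequest{
		span:      [2]string{"/Table/52/1", "/Table/52/2"},
		format:    "COL_BATCH_RESPONSE",
		fetchSpec: &indexFetchSpec{tableID: 52, indexID: 1, neededCols: []string{"a", "b"}},
	}
	fmt.Println(len(evaluate(req).columns), "columns returned") // 2
}
```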
Additional context

Relates very closely to, if not just adds exposition to, #71887.
@jordanlewis made a prototype here: #52863. At the time it showed a ~5% win in TPCH performance.
@RaduBerinde put in a ton of work to clean up how we specify the data to be fetched. Now there exists a small protobuf which could conceivably be transmitted with the scan request and used to describe how to decode the data.
[1] We're probably going to do #72593 to attack the cross-AZ network cost problem.
Jira issue: CRDB-16284
Epic: CRDB-14837