Skip to content

Commit

Permalink
chunks: decode varints directly from byte buffer; stop panicing on so…
Browse files Browse the repository at this point in the history
…me corrupt inputs (grafana#7264)

**What this PR does / why we need it**:

This is much faster, since we don't pay for byte-at-a-time reads from
the buffer.

For best performance with gzip and lz4, we still want a bufio.Reader. We
don't need the pool of buffered readers now.

Other compression formats have their own internal buffers so don't need
extra buffering.

**Checklist**
- [x] Reviewed the `CONTRIBUTING.md` guide
- NA Documentation added
- NA Tests updated
- [x] `CHANGELOG.md` updated
- NA Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

(using changes from grafana#7246, ignoring
lines that didn't change much)

```
name                           old time/op      new time/op      delta
Write/ordered-lz4-256k-4           24.1ms ± 7%      22.1ms ± 2%    -8.52%  (p=0.008 n=5+5)
Write/unordered-lz4-4              54.6ms ±29%      49.1ms ± 1%   -10.04%  (p=0.016 n=5+4)
Read/none_66_kB-4                  4.16ms ± 6%      2.54ms ± 2%   -38.87%  (p=0.008 n=5+5)
Read/gzip_66_kB-4                   177ms ± 4%       161ms ± 4%    -9.09%  (p=0.008 n=5+5)
Read/lz4-64k_66_kB-4               59.5ms ± 2%      49.5ms ± 3%   -16.80%  (p=0.008 n=5+5)
Read/lz4-256k_66_kB-4              62.9ms ± 0%      51.2ms ± 1%   -18.58%  (p=0.016 n=4+5)
Read/lz4-1M_66_kB-4                62.9ms ± 2%      51.5ms ± 1%   -18.03%  (p=0.008 n=5+5)
Read/lz4_66_kB-4                   63.7ms ± 4%      51.2ms ± 1%   -19.68%  (p=0.008 n=5+5)
Read/snappy_66_kB-4                51.1ms ± 2%      41.9ms ± 1%   -17.95%  (p=0.016 n=5+4)
Read/flate_66_kB-4                  168ms ± 4%       150ms ± 2%   -10.49%  (p=0.008 n=5+5)
Read/zstd_66_kB-4                   334ms ±16%       272ms ± 6%   -18.52%  (p=0.008 n=5+5)
Read/none_262_kB-4                 3.93ms ± 1%      2.48ms ± 2%   -36.91%  (p=0.008 n=5+5)
Read/gzip_262_kB-4                  159ms ± 5%       143ms ± 2%   -10.12%  (p=0.008 n=5+5)
Read/lz4-64k_262_kB-4              53.5ms ± 1%      43.2ms ± 3%   -19.35%  (p=0.008 n=5+5)
Read/lz4-256k_262_kB-4             55.8ms ± 1%      45.3ms ± 2%   -18.75%  (p=0.008 n=5+5)
Read/lz4-1M_262_kB-4               57.5ms ± 2%      45.5ms ± 2%   -20.91%  (p=0.008 n=5+5)
Read/lz4_262_kB-4                  56.8ms ± 2%      45.3ms ± 2%   -20.26%  (p=0.008 n=5+5)
Read/snappy_262_kB-4               46.0ms ± 2%      37.5ms ± 1%   -18.57%  (p=0.008 n=5+5)
Read/flate_262_kB-4                 153ms ± 3%       134ms ± 2%   -12.33%  (p=0.008 n=5+5)
Read/none_524_kB-4                 3.85ms ± 2%      2.46ms ± 2%   -36.04%  (p=0.008 n=5+5)
Read/gzip_524_kB-4                  128ms ± 2%       119ms ± 2%    -7.64%  (p=0.008 n=5+5)
Read/lz4-64k_524_kB-4              44.6ms ± 3%      34.7ms ± 1%   -22.19%  (p=0.008 n=5+5)
Read/lz4-256k_524_kB-4             45.4ms ± 1%      36.7ms ± 5%   -19.07%  (p=0.008 n=5+5)
Read/lz4-1M_524_kB-4               47.3ms ± 1%      38.6ms ± 2%   -18.36%  (p=0.008 n=5+5)
Read/lz4_524_kB-4                  47.4ms ± 1%      38.0ms ± 1%   -19.70%  (p=0.008 n=5+5)
Read/snappy_524_kB-4               37.3ms ± 1%      31.1ms ± 3%   -16.47%  (p=0.008 n=5+5)
Read/flate_524_kB-4                 122ms ± 1%       111ms ± 3%    -9.04%  (p=0.008 n=5+5)
Read/sample_none_66_kB-4           7.81ms ± 1%      6.62ms ± 6%   -15.24%  (p=0.008 n=5+5)
Read/sample_gzip_66_kB-4            237ms ± 2%       219ms ± 1%    -7.57%  (p=0.008 n=5+5)
Read/sample_lz4-64k_66_kB-4         105ms ± 2%        92ms ± 1%   -12.03%  (p=0.008 n=5+5)
Read/sample_lz4-256k_66_kB-4        109ms ± 1%        98ms ± 2%    -9.65%  (p=0.008 n=5+5)
Read/sample_lz4-1M_66_kB-4          113ms ± 8%        98ms ± 2%   -13.66%  (p=0.008 n=5+5)
Read/sample_lz4_66_kB-4             109ms ± 1%        99ms ± 2%    -9.29%  (p=0.008 n=5+5)
Read/sample_snappy_66_kB-4         86.6ms ± 1%      77.2ms ± 6%   -10.92%  (p=0.008 n=5+5)
Read/sample_flate_66_kB-4           236ms ± 8%       214ms ± 3%    -9.11%  (p=0.008 n=5+5)
Read/sample_none_262_kB-4          7.70ms ± 2%      6.34ms ± 2%   -17.67%  (p=0.008 n=5+5)
Read/sample_lz4-64k_262_kB-4       94.7ms ± 1%      84.4ms ± 2%   -10.90%  (p=0.008 n=5+5)
Read/sample_lz4-256k_262_kB-4      98.4ms ± 1%      87.7ms ± 2%   -10.93%  (p=0.008 n=5+5)
Read/sample_lz4-1M_262_kB-4         101ms ± 2%        90ms ± 2%   -11.19%  (p=0.008 n=5+5)
Read/sample_lz4_262_kB-4            100ms ± 2%        90ms ± 3%   -10.78%  (p=0.008 n=5+5)
Read/sample_snappy_262_kB-4        77.9ms ± 2%      68.7ms ± 2%   -11.84%  (p=0.008 n=5+5)
Read/sample_flate_262_kB-4          211ms ± 2%       190ms ± 2%    -9.86%  (p=0.008 n=5+5)
Read/sample_none_524_kB-4          7.82ms ± 2%      6.43ms ± 2%   -17.80%  (p=0.008 n=5+5)
Read/sample_gzip_524_kB-4           176ms ± 1%       164ms ± 2%    -6.80%  (p=0.008 n=5+5)
Read/sample_lz4-64k_524_kB-4       77.7ms ± 1%      69.8ms ± 4%   -10.10%  (p=0.008 n=5+5)
Read/sample_lz4-256k_524_kB-4      81.7ms ± 2%      72.1ms ± 3%   -11.79%  (p=0.008 n=5+5)
Read/sample_lz4-1M_524_kB-4        83.8ms ± 1%      74.4ms ± 1%   -11.21%  (p=0.008 n=5+5)
Read/sample_lz4_524_kB-4           83.2ms ± 0%      76.1ms ± 8%    -8.53%  (p=0.016 n=4+5)
Read/sample_snappy_524_kB-4        63.6ms ± 2%      56.5ms ± 2%   -11.23%  (p=0.008 n=5+5)
Read/sample_flate_524_kB-4          172ms ± 2%       157ms ± 1%    -8.88%  (p=0.008 n=5+5)

name                           old speed        new speed        delta
Write/ordered-lz4-256k-4          687MB/s ± 6%     749MB/s ± 2%    +9.09%  (p=0.008 n=5+5)
Write/unordered-lz4-4             313MB/s ±24%     341MB/s ± 1%    +9.23%  (p=0.016 n=5+4)
Read/none_66_kB-4                1.82GB/s ± 6%    2.97GB/s ± 2%   +63.42%  (p=0.008 n=5+5)
Read/gzip_66_kB-4                 669MB/s ± 4%     736MB/s ± 4%   +10.00%  (p=0.008 n=5+5)
Read/lz4-64k_66_kB-4             1.42GB/s ± 2%    1.71GB/s ± 3%   +20.21%  (p=0.008 n=5+5)
Read/lz4-256k_66_kB-4            1.43GB/s ± 0%    1.76GB/s ± 1%   +22.83%  (p=0.016 n=4+5)
Read/lz4-1M_66_kB-4              1.43GB/s ± 2%    1.74GB/s ± 1%   +21.99%  (p=0.008 n=5+5)
Read/lz4_66_kB-4                 1.41GB/s ± 4%    1.76GB/s ± 1%   +24.44%  (p=0.008 n=5+5)
Read/snappy_66_kB-4              1.29GB/s ± 2%    1.57GB/s ± 1%   +21.86%  (p=0.016 n=5+4)
Read/flate_66_kB-4                710MB/s ± 4%     793MB/s ± 2%   +11.70%  (p=0.008 n=5+5)
Read/zstd_66_kB-4                 425MB/s ±14%     517MB/s ± 6%   +21.74%  (p=0.008 n=5+5)
Read/none_262_kB-4               1.93GB/s ± 1%    3.06GB/s ± 2%   +58.54%  (p=0.008 n=5+5)
Read/gzip_262_kB-4                701MB/s ± 5%     779MB/s ± 2%   +11.20%  (p=0.008 n=5+5)
Read/lz4-64k_262_kB-4            1.47GB/s ± 1%    1.82GB/s ± 3%   +24.03%  (p=0.008 n=5+5)
Read/lz4-256k_262_kB-4           1.48GB/s ± 1%    1.82GB/s ± 2%   +23.09%  (p=0.008 n=5+5)
Read/lz4-1M_262_kB-4             1.46GB/s ± 2%    1.84GB/s ± 2%   +26.44%  (p=0.008 n=5+5)
Read/lz4_262_kB-4                1.48GB/s ± 2%    1.85GB/s ± 2%   +25.42%  (p=0.008 n=5+5)
Read/snappy_262_kB-4             1.31GB/s ± 2%    1.61GB/s ± 1%   +22.80%  (p=0.008 n=5+5)
Read/flate_262_kB-4               729MB/s ± 3%     831MB/s ± 2%   +14.05%  (p=0.008 n=5+5)
Read/none_524_kB-4               1.97GB/s ± 2%    3.08GB/s ± 2%   +56.33%  (p=0.008 n=5+5)
Read/gzip_524_kB-4                714MB/s ± 2%     773MB/s ± 2%    +8.27%  (p=0.008 n=5+5)
Read/lz4-64k_524_kB-4            1.47GB/s ± 3%    1.88GB/s ± 1%   +28.48%  (p=0.008 n=5+5)
Read/lz4-256k_524_kB-4           1.50GB/s ± 1%    1.86GB/s ± 5%   +23.63%  (p=0.008 n=5+5)
Read/lz4-1M_524_kB-4             1.49GB/s ± 1%    1.83GB/s ± 2%   +22.51%  (p=0.008 n=5+5)
Read/lz4_524_kB-4                1.49GB/s ± 1%    1.86GB/s ± 1%   +24.53%  (p=0.008 n=5+5)
Read/snappy_524_kB-4             1.34GB/s ± 1%    1.60GB/s ± 3%   +19.75%  (p=0.008 n=5+5)
Read/flate_524_kB-4               750MB/s ± 1%     824MB/s ± 3%    +9.97%  (p=0.008 n=5+5)
Read/sample_none_66_kB-4          967MB/s ± 1%    1142MB/s ± 6%   +18.13%  (p=0.008 n=5+5)
Read/sample_gzip_66_kB-4          501MB/s ± 2%     542MB/s ± 1%    +8.19%  (p=0.008 n=5+5)
Read/sample_lz4-64k_66_kB-4       811MB/s ± 2%     922MB/s ± 1%   +13.66%  (p=0.008 n=5+5)
Read/sample_lz4-256k_66_kB-4      825MB/s ± 1%     913MB/s ± 2%   +10.68%  (p=0.008 n=5+5)
Read/sample_lz4-1M_66_kB-4        795MB/s ± 7%     920MB/s ± 1%   +15.67%  (p=0.008 n=5+5)
Read/sample_lz4_66_kB-4           824MB/s ± 1%     909MB/s ± 2%   +10.25%  (p=0.008 n=5+5)
Read/sample_snappy_66_kB-4        759MB/s ± 1%     852MB/s ± 6%   +12.37%  (p=0.008 n=5+5)
Read/sample_flate_66_kB-4         506MB/s ± 7%     556MB/s ± 4%    +9.88%  (p=0.008 n=5+5)
Read/sample_none_262_kB-4         983MB/s ± 2%    1194MB/s ± 2%   +21.47%  (p=0.008 n=5+5)
Read/sample_lz4-64k_262_kB-4      830MB/s ± 1%     932MB/s ± 2%   +12.24%  (p=0.008 n=5+5)
Read/sample_lz4-256k_262_kB-4     839MB/s ± 1%     942MB/s ± 2%   +12.28%  (p=0.008 n=5+5)
Read/sample_lz4-1M_262_kB-4       829MB/s ± 2%     934MB/s ± 2%   +12.60%  (p=0.008 n=5+5)
Read/sample_lz4_262_kB-4          836MB/s ± 2%     937MB/s ± 3%   +12.11%  (p=0.008 n=5+5)
Read/sample_snappy_262_kB-4       774MB/s ± 2%     878MB/s ± 2%   +13.44%  (p=0.008 n=5+5)
Read/sample_flate_262_kB-4        529MB/s ± 2%     587MB/s ± 2%   +10.94%  (p=0.008 n=5+5)
Read/sample_none_524_kB-4         970MB/s ± 2%    1180MB/s ± 2%   +21.64%  (p=0.008 n=5+5)
Read/sample_gzip_524_kB-4         521MB/s ± 1%     559MB/s ± 2%    +7.31%  (p=0.008 n=5+5)
Read/sample_lz4-64k_524_kB-4      842MB/s ± 1%     937MB/s ± 4%   +11.30%  (p=0.008 n=5+5)
Read/sample_lz4-256k_524_kB-4     834MB/s ± 2%     945MB/s ± 3%   +13.37%  (p=0.008 n=5+5)
Read/sample_lz4-1M_524_kB-4       842MB/s ± 1%     949MB/s ± 1%   +12.63%  (p=0.008 n=5+5)
Read/sample_lz4_524_kB-4          849MB/s ± 0%     929MB/s ± 7%    +9.53%  (p=0.016 n=4+5)
Read/sample_snappy_524_kB-4       783MB/s ± 2%     882MB/s ± 2%   +12.65%  (p=0.008 n=5+5)
Read/sample_flate_524_kB-4        534MB/s ± 2%     585MB/s ± 1%    +9.73%  (p=0.008 n=5+5)

name                           old alloc/op     new alloc/op     delta
Write/unordered-lz4-4              18.8MB ± 4%      17.2MB ± 7%    -8.28%  (p=0.008 n=5+5)
Read/none_66_kB-4                  41.9kB ± 0%      43.7kB ± 0%    +4.41%  (p=0.008 n=5+5)
Read/gzip_66_kB-4                   713kB ± 0%       741kB ± 0%    +3.85%  (p=0.016 n=5+4)
Read/lz4-64k_66_kB-4                730kB ± 1%       748kB ± 0%    +2.53%  (p=0.016 n=5+4)
Read/lz4-256k_66_kB-4               770kB ± 0%       806kB ± 3%    +4.70%  (p=0.016 n=4+5)
Read/lz4-1M_66_kB-4                 770kB ± 0%       812kB ± 4%    +5.43%  (p=0.016 n=4+5)
Read/lz4_66_kB-4                    770kB ± 0%       792kB ± 0%    +2.83%  (p=0.029 n=4+4)
Read/snappy_66_kB-4                 361kB ± 0%       376kB ± 0%    +4.02%  (p=0.008 n=5+5)
Read/flate_66_kB-4                  715kB ± 0%       743kB ± 0%    +3.88%  (p=0.016 n=5+4)
Read/none_262_kB-4                 11.2kB ± 0%      11.7kB ± 0%    +4.12%  (p=0.016 n=5+4)
Read/gzip_262_kB-4                  198kB ± 0%       203kB ± 0%    +2.93%  (p=0.016 n=4+5)
Read/lz4-64k_262_kB-4               169kB ± 0%       174kB ± 0%    +2.75%  (p=0.008 n=5+5)
Read/lz4-256k_262_kB-4              177kB ± 0%       182kB ± 0%    +2.79%  (p=0.008 n=5+5)
Read/lz4-1M_262_kB-4                181kB ± 0%       186kB ± 0%    +2.77%  (p=0.008 n=5+5)
Read/lz4_262_kB-4                   181kB ± 0%       186kB ± 0%    +2.73%  (p=0.016 n=5+4)
Read/snappy_262_kB-4               87.8kB ± 0%      90.1kB ± 0%    +2.58%  (p=0.008 n=5+5)
Read/flate_262_kB-4                 197kB ± 0%       203kB ± 0%    +2.63%  (p=0.016 n=5+4)
Read/zstd_262_kB-4                  553MB ± 0%       552MB ± 0%    -0.15%  (p=0.032 n=5+5)
Read/none_524_kB-4                 5.94kB ± 0%      6.16kB ± 0%    +3.76%  (p=0.016 n=5+4)
Read/gzip_524_kB-4                 96.2kB ± 0%      98.3kB ± 0%    +2.16%  (p=0.016 n=4+5)
Read/lz4-64k_524_kB-4              70.9kB ± 0%      72.8kB ± 0%    +2.71%  (p=0.016 n=4+5)
Read/lz4-256k_524_kB-4             73.8kB ± 0%      75.8kB ± 0%    +2.68%  (p=0.008 n=5+5)
Read/lz4-1M_524_kB-4               76.5kB ± 0%      78.7kB ± 0%    +2.77%  (p=0.016 n=4+5)
Read/lz4_524_kB-4                  76.5kB ± 0%      78.6kB ± 0%    +2.74%  (p=0.016 n=5+4)
Read/snappy_524_kB-4               39.1kB ± 0%      39.5kB ± 0%    +1.19%  (p=0.008 n=5+5)
Read/flate_524_kB-4                95.4kB ± 0%      97.3kB ± 0%    +2.06%  (p=0.008 n=5+5)
Read/sample_none_66_kB-4           39.9kB ± 0%      41.7kB ± 0%    +4.62%  (p=0.008 n=5+5)
Read/sample_gzip_66_kB-4            686kB ± 0%       715kB ± 0%    +4.23%  (p=0.016 n=4+5)
Read/sample_lz4-64k_66_kB-4         707kB ± 0%       728kB ± 0%    +2.88%  (p=0.016 n=5+4)
Read/sample_lz4-256k_66_kB-4        749kB ± 0%       783kB ± 4%    +4.65%  (p=0.008 n=5+5)
Read/sample_lz4_66_kB-4             748kB ± 0%       770kB ± 0%    +2.91%  (p=0.016 n=5+4)
Read/sample_snappy_66_kB-4          350kB ± 0%       364kB ± 0%    +4.04%  (p=0.029 n=4+4)
Read/sample_flate_66_kB-4           688kB ± 0%       716kB ± 0%    +4.08%  (p=0.029 n=4+4)
Read/sample_none_262_kB-4          10.6kB ± 0%      11.0kB ± 0%    +4.23%  (p=0.016 n=5+4)
Read/sample_gzip_262_kB-4           194kB ± 0%       199kB ± 0%    +2.48%  (p=0.016 n=4+5)
Read/sample_lz4-64k_262_kB-4        165kB ± 0%       169kB ± 0%    +2.86%  (p=0.016 n=4+5)
Read/sample_lz4-256k_262_kB-4       172kB ± 0%       177kB ± 0%    +2.92%  (p=0.016 n=4+5)
Read/sample_lz4-1M_262_kB-4         176kB ± 0%       181kB ± 0%    +2.91%  (p=0.029 n=4+4)
Read/sample_lz4_262_kB-4            176kB ± 0%       181kB ± 0%    +2.81%  (p=0.016 n=5+4)
Read/sample_snappy_262_kB-4        88.1kB ± 0%      90.8kB ± 0%    +3.11%  (p=0.029 n=4+4)
Read/sample_flate_262_kB-4          194kB ± 0%       198kB ± 0%    +2.13%  (p=0.029 n=4+4)
Read/sample_none_524_kB-4          5.56kB ± 0%      5.76kB ± 0%    +3.65%  (p=0.016 n=5+4)
Read/sample_gzip_524_kB-4          95.4kB ± 0%      96.9kB ± 0%    +1.56%  (p=0.008 n=5+5)
Read/sample_lz4-64k_524_kB-4       69.0kB ± 0%      71.0kB ± 0%    +2.87%  (p=0.008 n=5+5)
Read/sample_lz4-256k_524_kB-4      71.8kB ± 0%      73.8kB ± 0%    +2.79%  (p=0.016 n=4+5)
Read/sample_lz4-1M_524_kB-4        74.5kB ± 0%      76.6kB ± 0%    +2.75%  (p=0.008 n=5+5)
Read/sample_lz4_524_kB-4           74.5kB ± 0%     188.4kB ±89%  +152.98%  (p=0.008 n=5+5)
Read/sample_snappy_524_kB-4        40.7kB ± 1%      41.3kB ± 1%    +1.58%  (p=0.008 n=5+5)
Read/sample_flate_524_kB-4         95.2kB ± 0%      96.2kB ± 0%    +0.95%  (p=0.008 n=5+5)

name                           old allocs/op    new allocs/op    delta
Read/gzip_66_kB-4                   13.2k ± 0%       13.2k ± 0%    -0.04%  (p=0.000 n=5+4)
Read/zstd_66_kB-4                   18.0k ± 1%       17.8k ± 1%    -1.06%  (p=0.032 n=5+5)
Read/gzip_262_kB-4                  4.61k ± 0%       4.61k ± 0%    -0.11%  (p=0.029 n=4+4)
Read/flate_262_kB-4                 4.61k ± 0%       4.60k ± 0%    -0.11%  (p=0.008 n=5+5)
Read/zstd_262_kB-4                  7.10k ± 2%       6.88k ± 3%    -3.01%  (p=0.008 n=5+5)
Read/gzip_524_kB-4                  2.62k ± 0%       2.62k ± 0%    -0.06%  (p=0.000 n=4+5)
Read/zstd_524_kB-4                  6.13k ± 2%       5.94k ± 2%    -3.03%  (p=0.024 n=5+5)
Read/sample_flate_66_kB-4           13.2k ± 0%       13.2k ± 0%    -0.01%  (p=0.029 n=4+4)
Read/sample_flate_262_kB-4          4.62k ± 0%       4.61k ± 0%    -0.22%  (p=0.029 n=4+4)
Read/sample_zstd_262_kB-4           7.12k ± 1%       6.97k ± 1%    -2.11%  (p=0.008 n=5+5)
Read/sample_gzip_524_kB-4           2.63k ± 0%       2.63k ± 0%    -0.11%  (p=0.008 n=5+5)
Read/sample_flate_524_kB-4          2.63k ± 0%       2.62k ± 0%    -0.15%  (p=0.008 n=5+5)
Read/sample_zstd_524_kB-4           6.11k ± 2%       5.89k ± 1%    -3.68%  (p=0.016 n=5+4)
```

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
2 people authored and changhyuni committed Nov 8, 2022
1 parent 94d6849 commit 11bb5db
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully
* [5997](https://github.com/grafana/loki/pull/5997) **simonswine**: Querier: parallize label queries to both stores.
* [5406](https://github.com/grafana/loki/pull/5406) **ctovena**: Revise the configuration parameters that configure the usage report to grafana.com.
* [7264](https://github.com/grafana/loki/pull/7264) **bboreham**: Chunks: decode varints directly from byte buffer, for speed.
* [7263](https://github.com/grafana/loki/pull/7263) **bboreham**: Dependencies: klauspost/compress package to v1.15.11; improves performance.

##### Fixes
Expand Down
80 changes: 47 additions & 33 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package chunkenc

import (
"bufio"
"bytes"
"context"
"encoding/binary"
Expand Down Expand Up @@ -1114,12 +1113,14 @@ type bufferedIterator struct {
origBytes []byte
stats *stats.Context

bufReader *bufio.Reader
reader io.Reader
pool ReaderPool
reader io.Reader
pool ReaderPool

err error

readBuf [20]byte // Enough bytes to store two varints.
readBufValid int // How many bytes are left in readBuf from previous read.

buf []byte // The buffer for a single entry.
currLine []byte // the current line, this is the same as the buffer but sliced the the line size.
currTs int64
Expand All @@ -1134,7 +1135,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *buffer
stats: stats,
origBytes: b,
reader: nil, // will be initialized later
bufReader: nil, // will be initialized later
pool: pool,
}
}
Expand All @@ -1146,8 +1146,12 @@ func (si *bufferedIterator) Next() bool {

if !si.closed && si.reader == nil {
// initialize reader now, hopefully reusing one of the previous readers
si.reader = si.pool.GetReader(bytes.NewBuffer(si.origBytes))
si.bufReader = BufReaderPool.Get(si.reader)
var err error
si.reader, err = si.pool.GetReader(bytes.NewBuffer(si.origBytes))
if err != nil {
si.err = err
return false
}
}

ts, line, ok := si.moveNext()
Expand All @@ -1166,22 +1170,30 @@ func (si *bufferedIterator) Next() bool {

// moveNext moves the buffer to the next entry
func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
ts, err := binary.ReadVarint(si.bufReader)
if err != nil {
if err != io.EOF {
si.err = err
}
return 0, nil, false
}

l, err := binary.ReadUvarint(si.bufReader)
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, false
var ts int64
var tWidth, lWidth, lineSize, lastAttempt int
for lWidth == 0 { // Read until both varints have enough bytes.
n, err := si.reader.Read(si.readBuf[si.readBufValid:])
si.readBufValid += n
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, false
}
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
return 0, nil, false
}
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
si.err = fmt.Errorf("invalid data in chunk")
return 0, nil, false
}
}
var l uint64
ts, tWidth = binary.Varint(si.readBuf[:si.readBufValid])
l, lWidth = binary.Uvarint(si.readBuf[tWidth:si.readBufValid])
lineSize = int(l)
lastAttempt = si.readBufValid
}
lineSize := int(l)

if lineSize >= maxLineLength {
si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength)
Expand All @@ -1199,19 +1211,25 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
return 0, nil, false
}
}
si.buf = si.buf[:lineSize]
// Take however many bytes are left in the read buffer.
n := copy(si.buf, si.readBuf[tWidth+lWidth:si.readBufValid])
// Shift down what is still left in the fixed-size read buffer, if any.
si.readBufValid = copy(si.readBuf[:], si.readBuf[tWidth+lWidth+n:si.readBufValid])

// Then process reading the line.
n, err := si.bufReader.Read(si.buf[:lineSize])
if err != nil && err != io.EOF {
si.err = err
return 0, nil, false
}
for n < lineSize {
r, err := si.bufReader.Read(si.buf[n:lineSize])
if err != nil && err != io.EOF {
r, err := si.reader.Read(si.buf[n:lineSize])
n += r
if err != nil {
// We might get EOF after reading enough bytes to fill the buffer, which is OK.
// EOF and zero bytes read when the buffer isn't full is an error.
if err == io.EOF && r != 0 {
continue
}
si.err = err
return 0, nil, false
}
n += r
}
return ts, si.buf[:lineSize], true
}
Expand All @@ -1231,10 +1249,6 @@ func (si *bufferedIterator) close() {
si.pool.PutReader(si.reader)
si.reader = nil
}
if si.bufReader != nil {
BufReaderPool.Put(si.bufReader)
si.bufReader = nil
}

if si.buf != nil {
BytesBufferPool.Put(si.buf)
Expand Down
32 changes: 32 additions & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,38 @@ func TestBlock(t *testing.T) {
}
}

func TestCorruptChunk(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()

chk := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
cases := []struct {
data []byte
}{
// Data that should not decode as lines from a chunk in any encoding.
{data: []byte{0}},
{data: []byte{1}},
{data: []byte("asdfasdfasdfqwyteqwtyeq")},
}

ctx, start, end := context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64)
for i, c := range cases {
chk.blocks = []block{{b: c.data}}
it, err := chk.Iterator(ctx, start, end, logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err, "case %d", i)

idx := 0
for it.Next() {
idx++
}
require.Error(t, it.Error(), "case %d", i)
require.NoError(t, it.Close())
}
})
}
}

func TestReadFormatV1(t *testing.T) {
t.Parallel()

Expand Down
105 changes: 50 additions & 55 deletions pkg/chunkenc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type WriterPool interface {

// ReaderPool similar to WriterPool but for reading chunks.
type ReaderPool interface {
GetReader(io.Reader) io.Reader
GetReader(io.Reader) (io.Reader, error)
PutReader(io.Reader)
}

Expand All @@ -44,13 +44,6 @@ var (
// Noop is the no compression pool
Noop NoopPool

// BufReaderPool is bufio.Reader pool
BufReaderPool = &BufioReaderPool{
pool: sync.Pool{
New: func() interface{} { return bufio.NewReader(nil) },
},
}

// BytesBufferPool is a bytes buffer used for lines decompressed.
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
Expand Down Expand Up @@ -117,21 +110,32 @@ type GzipPool struct {
level int
}

// Gzip needs buffering to read efficiently.
// We need to be able to see the underlying gzip.Reader to Reset it.
type gzipBufferedReader struct {
*bufio.Reader
gzipReader *gzip.Reader
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *GzipPool) GetReader(src io.Reader) io.Reader {
func (pool *GzipPool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*gzip.Reader)
err := reader.Reset(src)
reader := r.(*gzipBufferedReader)
err := reader.gzipReader.Reset(src)
if err != nil {
panic(err)
return nil, err
}
return reader
reader.Reader.Reset(reader.gzipReader)
return reader, nil
}
reader, err := gzip.NewReader(src)
gzipReader, err := gzip.NewReader(src)
if err != nil {
panic(err)
return nil, err
}
return reader
return &gzipBufferedReader{
gzipReader: gzipReader,
Reader: bufio.NewReaderSize(gzipReader, 4*1024),
}, nil
}

// PutReader places back in the pool a CompressionReader
Expand Down Expand Up @@ -171,16 +175,16 @@ type FlatePool struct {
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *FlatePool) GetReader(src io.Reader) io.Reader {
func (pool *FlatePool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(flate.Resetter)
err := reader.Reset(src, nil)
if err != nil {
panic(err)
}
return reader.(io.Reader)
return reader.(io.Reader), nil
}
return flate.NewReader(src)
return flate.NewReader(src), nil
}

// PutReader places back in the pool a CompressionReader
Expand Down Expand Up @@ -219,21 +223,21 @@ type ZstdPool struct {
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *ZstdPool) GetReader(src io.Reader) io.Reader {
func (pool *ZstdPool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*zstd.Decoder)
err := reader.Reset(src)
if err != nil {
panic(err)
return nil, err
}
return reader
return reader, nil
}
reader, err := zstd.NewReader(src)
if err != nil {
panic(err)
return nil, err
}
runtime.SetFinalizer(reader, (*zstd.Decoder).Close)
return reader
return reader, nil
}

// PutReader places back in the pool a CompressionReader
Expand Down Expand Up @@ -267,16 +271,27 @@ type LZ4Pool struct {
bufferSize uint32 // available values: 1<<16 (64k), 1<<18 (256k), 1<<20 (1M), 1<<22 (4M). Defaults to 4MB, if not set.
}

// We need to be able to see the underlying lz4.Reader to Reset it.
type lz4BufferedReader struct {
*bufio.Reader
lz4Reader *lz4.Reader
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *LZ4Pool) GetReader(src io.Reader) io.Reader {
var r *lz4.Reader
func (pool *LZ4Pool) GetReader(src io.Reader) (io.Reader, error) {
var r *lz4BufferedReader
if pooled := pool.readers.Get(); pooled != nil {
r = pooled.(*lz4.Reader)
r.Reset(src)
r = pooled.(*lz4BufferedReader)
r.lz4Reader.Reset(src)
r.Reader.Reset(r.lz4Reader)
} else {
r = lz4.NewReader(src)
lz4Reader := lz4.NewReader(src)
r = &lz4BufferedReader{
lz4Reader: lz4Reader,
Reader: bufio.NewReaderSize(lz4Reader, 4*1024),
}
}
return r
return r, nil
}

// PutReader places back in the pool a CompressionReader
Expand Down Expand Up @@ -315,13 +330,13 @@ type SnappyPool struct {
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *SnappyPool) GetReader(src io.Reader) io.Reader {
func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*snappy.Reader)
reader.Reset(src)
return reader
return reader, nil
}
return snappy.NewReader(src)
return snappy.NewReader(src), nil
}

// PutReader places back in the pool a CompressionReader
Expand All @@ -347,8 +362,8 @@ func (pool *SnappyPool) PutWriter(writer io.WriteCloser) {
type NoopPool struct{}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *NoopPool) GetReader(src io.Reader) io.Reader {
return src
func (pool *NoopPool) GetReader(src io.Reader) (io.Reader, error) {
return src, nil
}

// PutReader places back in the pool a CompressionReader
Expand All @@ -367,23 +382,3 @@ func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser {

// PutWriter places back in the pool a CompressionWriter
func (pool *NoopPool) PutWriter(writer io.WriteCloser) {}

// BufioReaderPool is a bufio reader that uses sync.Pool.
type BufioReaderPool struct {
pool sync.Pool
}

// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool.
func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
buf := bufPool.pool.Get().(*bufio.Reader)
if buf == nil {
return bufio.NewReaderSize(r, 4*1024)
}
buf.Reset(r)
return buf
}

// Put puts the bufio.Reader back into the pool.
func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
bufPool.pool.Put(b)
}
3 changes: 2 additions & 1 deletion pkg/chunkenc/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func TestPool(t *testing.T) {
require.NoError(t, w.Close())

require.True(t, buf.Len() != 0, enc)
r := rpool.GetReader(bytes.NewBuffer(buf.Bytes()))
r, err := rpool.GetReader(bytes.NewBuffer(buf.Bytes()))
require.NoError(t, err)
defer rpool.PutReader(r)
n, err := r.Read(res)
if err != nil {
Expand Down
Loading

0 comments on commit 11bb5db

Please sign in to comment.