
[SPARK-3294][SQL] Eliminates boxing costs from in-memory columnar storage #2327

Closed
wants to merge 13 commits

Conversation

liancheng
Contributor

This is a major refactoring of the in-memory columnar storage implementation, aiming to eliminate boxing costs from the critical paths (building and accessing column buffers) as much as possible. The basic idea is to refactor all major interfaces into a row-based form and use them together with SpecificMutableRow. The difficult part is adapting all the compression schemes, especially RunLengthEncoding and DictionaryEncoding, to this design. Since in-memory compression is disabled by default for now, and this PR should be strictly better than before whether or not in-memory compression is enabled, I may finish that part in another PR.
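The core trick can be sketched as follows. This is a minimal illustration with hypothetical names, not Spark's actual SpecificMutableRow code: each column slot is a type-specific mutable holder with a primitive field, so hot-path reads and writes never allocate a boxed wrapper.

```scala
// Minimal sketch of the idea behind SpecificMutableRow (hypothetical names,
// not Spark's actual implementation): each column slot is a type-specific
// holder with a primitive field, so updates and reads never box.
abstract class MutableValue {
  var isNull: Boolean = true
  def boxed: Any
}

final class MutableInt extends MutableValue {
  var value: Int = 0                                     // primitive field
  def update(v: Int): Unit = { value = v; isNull = false }
  def boxed: Any = if (isNull) null else value
}

final class MutableDouble extends MutableValue {
  var value: Double = 0.0
  def update(v: Double): Unit = { value = v; isNull = false }
  def boxed: Any = if (isNull) null else value
}

// A row built from such holders can expose primitive accessors:
final class SpecificRow(values: Array[MutableValue]) {
  def setInt(ordinal: Int, v: Int): Unit =
    values(ordinal).asInstanceOf[MutableInt].update(v)   // no Integer allocated
  def getInt(ordinal: Int): Int =
    values(ordinal).asInstanceOf[MutableInt].value       // no unboxing
}

val row = new SpecificRow(Array(new MutableInt, new MutableDouble))
row.setInt(0, 42)
println(row.getInt(0))   // prints 42
```

The per-type holders trade a cast on each access for the complete absence of `java.lang.Integer`/`java.lang.Double` allocations in the build and scan loops.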

UPDATE: This PR also takes the chance to optimize HiveTableScan by

  1. leveraging SpecificMutableRow to avoid boxing cost, and
  2. building specific Writable unwrapper functions ahead of time to avoid per-row pattern matching and branching costs.
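The second point can be sketched like this (a simplified stand-in with hypothetical Writable types, not Hadoop's actual classes): the type dispatch happens once per column when the unwrapper is constructed, and each row then pays only a plain function call.

```scala
// Simplified sketch of building unwrappers ahead of time (hypothetical
// Writable stand-ins, not Hadoop's classes). The pattern match runs once
// per column instead of once per row.
sealed trait Writable
final case class IntWritable(get: Int) extends Writable
final case class DoubleWritable(get: Double) extends Writable

// Chosen once, before the scan starts:
def unwrapperFor(sample: Writable): Writable => Any = sample match {
  case _: IntWritable    => w => w.asInstanceOf[IntWritable].get
  case _: DoubleWritable => w => w.asInstanceOf[DoubleWritable].get
}

val column = Seq(IntWritable(1), IntWritable(2), IntWritable(3))
val unwrap = unwrapperFor(column.head)

// Per-row hot loop: a straight function call, no matching or branching.
val unwrapped = column.map(unwrap)
println(unwrapped)   // List(1, 2, 3)
```

Hoisting the match out of the loop matters because the scan touches every value of every row, so even a cheap per-value branch shows up at 10 million rows.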

TODO

  • Benchmark
  • Eliminate boxing costs in RunLengthEncoding (left to future PRs)
  • Eliminate boxing costs in DictionaryEncoding (seems not easy to do without specializing DictionaryEncoding for every supported column type) (left to future PRs)

Micro benchmark

The benchmark uses a 10 million line CSV table consisting of bytes, shorts, integers, longs, floats and doubles. It measures the time to build the in-memory version of this table and the time to scan the whole in-memory table.

Benchmark code can be found here. Script used to generate the input table can be found here.

Speedup:

  • Hive table scanning + column buffer building: 18.74%

The original benchmark uses 1K as the in-memory batch size; when increased to 10K, building can be 28.32% faster.

  • In-memory table scanning: 7.95%

Before:

Run      Building (ms)  Scanning (ms)
1        16472          525
2        16168          530
3        16386          529
4        16184          538
5        16209          521
Average  16283.8        528.6

After:

Run      Building (ms)  Scanning (ms)
1        13124          458
2        13260          529
3        12981          463
4        13214          483
5        13583          500
Average  13232.4        486.6
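The quoted percentages follow directly from the table averages. As a sanity check (assuming the times are milliseconds, taken from the five runs above):

```scala
// Recomputing the quoted speedups from the five benchmark runs above.
val buildingBefore = Seq(16472.0, 16168.0, 16386.0, 16184.0, 16209.0)
val buildingAfter  = Seq(13124.0, 13260.0, 12981.0, 13214.0, 13583.0)
val scanningBefore = Seq(525.0, 530.0, 529.0, 538.0, 521.0)
val scanningAfter  = Seq(458.0, 529.0, 463.0, 483.0, 500.0)

def avg(xs: Seq[Double]): Double = xs.sum / xs.size

// Speedup as percentage reduction of the average time.
def speedupPercent(before: Seq[Double], after: Seq[Double]): Double =
  (avg(before) - avg(after)) / avg(before) * 100

println(f"building: ${speedupPercent(buildingBefore, buildingAfter)}%.2f%%")  // building: 18.74%
println(f"scanning: ${speedupPercent(scanningBefore, scanningAfter)}%.2f%%")  // scanning: 7.95%
```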

    // removed:
    override def update(ordinal: Int, value: Any): Unit = values(ordinal).update(value)
    // added:
    override def update(ordinal: Int, value: Any) {
      if (value == null) setNullAt(ordinal) else values(ordinal).update(value)
    }
Contributor Author
This change is submitted separately in #2325 as this PR may take longer time to finish.

@SparkQA

SparkQA commented Sep 9, 2014

QA tests have started for PR 2327 at commit 269bd78.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 9, 2014

QA tests have finished for PR 2327 at commit 269bd78.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder extends compression.Encoder[IntegerType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type])
    • class Encoder extends compression.Encoder[LongType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type])

@aarondav
Contributor

aarondav commented Sep 9, 2014

Out of curiosity, does this also eliminate boxing for nested data types?

@liancheng
Contributor Author

No, unlike Parquet, currently our in-memory columnar format doesn't support complex nested objects well. They are just serialized by Kryo and stored as opaque byte arrays.

@marmbrus
Contributor

@aarondav to expand on that, as soon as there is any nesting all of our clever tricks for eliminating allocations go out the window. We can probably improve this in future releases.

@SparkQA

SparkQA commented Sep 10, 2014

QA tests have started for PR 2327 at commit 97bbc4e.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 10, 2014

QA tests have finished for PR 2327 at commit 97bbc4e.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor Author

ok to test

@SparkQA

SparkQA commented Sep 10, 2014

QA tests have started for PR 2327 at commit 489f97b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 10, 2014

QA tests have finished for PR 2327 at commit 489f97b.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder extends compression.Encoder[IntegerType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type])
    • class Encoder extends compression.Encoder[LongType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type])

@liancheng
Contributor Author

test this please

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have started for PR 2327 at commit e5d2cf2.

  • This patch merges cleanly.


@SparkQA

SparkQA commented Sep 11, 2014

QA tests have finished for PR 2327 at commit e5d2cf2.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder extends compression.Encoder[IntegerType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type])
    • class Encoder extends compression.Encoder[LongType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type])

@SparkQA

SparkQA commented Sep 11, 2014

Tests timed out after a configured wait of 120m.

@liancheng
Contributor Author

@marmbrus Please help review this one. HiveTableScan is also optimized BTW.

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have started for PR 2327 at commit e5d2cf2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 11, 2014

QA tests have finished for PR 2327 at commit e5d2cf2.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder extends compression.Encoder[IntegerType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type])
    • class Encoder extends compression.Encoder[LongType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type])

@marmbrus
Contributor

I still need to look this over, but do you want to remove WIP from the title?

@@ -51,10 +51,12 @@ private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](
def hasNext = buffer.hasRemaining

def extractTo(row: MutableRow, ordinal: Int) {
Contributor

This style is going to go away in 2.12 or 2.13, I think. Should be `: Unit =`.

@marmbrus
Contributor

Nice speedups. I think they might be even more pronounced when there are multiple threads fighting for the GC.

Minor comments only. Will merge after they are addressed.

@liancheng liancheng changed the title [SPARK-3294][SQL] WIP: eliminates boxing costs from in-memory columnar storage [SPARK-3294][SQL] Eliminates boxing costs from in-memory columnar storage Sep 11, 2014
@SparkQA

SparkQA commented Sep 13, 2014

QA tests have started for PR 2327 at commit 4419fe4.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 13, 2014

QA tests have finished for PR 2327 at commit 4419fe4.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T]
    • class Encoder extends compression.Encoder[IntegerType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type])
    • class Encoder extends compression.Encoder[LongType.type]
    • class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type])

@marmbrus
Contributor

Thanks! I've merged this to master.

@asfgit asfgit closed this in 7404924 Sep 13, 2014
@liancheng liancheng deleted the prevent-boxing/unboxing branch September 24, 2014 00:04