diff --git a/LICENSE b/LICENSE index 0a42d389e4c3c..9b364a4d00079 100644 --- a/LICENSE +++ b/LICENSE @@ -771,6 +771,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. +======================================================================== +For TestTimSort (core/src/test/java/org/apache/spark/util/collection/TestTimSort.java): +======================================================================== +Copyright (C) 2015 Stijn de Gouw + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. ======================================================================== For LimitedInputStream diff --git a/core/src/main/java/org/apache/spark/util/collection/TimSort.java b/core/src/main/java/org/apache/spark/util/collection/TimSort.java index 409e1a41c5d49..a90cc0e761f62 100644 --- a/core/src/main/java/org/apache/spark/util/collection/TimSort.java +++ b/core/src/main/java/org/apache/spark/util/collection/TimSort.java @@ -425,15 +425,14 @@ private void pushRun(int runBase, int runLen) { private void mergeCollapse() { while (stackSize > 1) { int n = stackSize - 2; - if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) { + if ( (n >= 1 && runLen[n-1] <= runLen[n] + runLen[n+1]) + || (n >= 2 && runLen[n-2] <= runLen[n] + runLen[n-1])) { if (runLen[n - 1] < runLen[n + 1]) n--; - mergeAt(n); - } else if (runLen[n] <= runLen[n + 1]) { - mergeAt(n); - } else { + } else if (runLen[n] > runLen[n + 1]) { break; // Invariant is established } + mergeAt(n); } } diff --git a/core/src/test/java/org/apache/spark/util/collection/TestTimSort.java b/core/src/test/java/org/apache/spark/util/collection/TestTimSort.java new file mode 100644 index 0000000000000..45772b6d3c20d --- /dev/null +++ b/core/src/test/java/org/apache/spark/util/collection/TestTimSort.java @@ -0,0 +1,134 @@ +/** + * Copyright 2015 Stijn de Gouw + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.spark.util.collection; + +import java.util.*; + +/** + * This codes generates a int array which fails the standard TimSort. + * + * The blog that reported the bug + * http://www.envisage-project.eu/timsort-specification-and-verification/ + * + * This codes was originally wrote by Stijn de Gouw, modified by Evan Yu to adapt to + * our test suite. + * + * https://github.com/abstools/java-timsort-bug + * https://github.com/abstools/java-timsort-bug/blob/master/LICENSE + */ +public class TestTimSort { + + private static final int MIN_MERGE = 32; + + /** + * Returns an array of integers that demonstrate the bug in TimSort + */ + public static int[] getTimSortBugTestSet(int length) { + int minRun = minRunLength(length); + List runs = runsJDKWorstCase(minRun, length); + return createArray(runs, length); + } + + private static int minRunLength(int n) { + int r = 0; // Becomes 1 if any 1 bits are shifted off + while (n >= MIN_MERGE) { + r |= (n & 1); + n >>= 1; + } + return n + r; + } + + private static int[] createArray(List runs, int length) { + int[] a = new int[length]; + Arrays.fill(a, 0); + int endRun = -1; + for (long len : runs) { + a[endRun += len] = 1; + } + a[length - 1] = 0; + return a; + } + + /** + * Fills runs with a sequence of run lengths of the form
+ * Y_n x_{n,1} x_{n,2} ... x_{n,l_n}
+ * Y_{n-1} x_{n-1,1} x_{n-1,2} ... x_{n-1,l_{n-1}}
+ * ...
+ * Y_1 x_{1,1} x_{1,2} ... x_{1,l_1}
+ * The Y_i's are chosen to satisfy the invariant throughout execution, + * but the x_{i,j}'s are merged (by TimSort.mergeCollapse) + * into an X_i that violates the invariant. + * + * @param length The sum of all run lengths that will be added to runs. + */ + private static List runsJDKWorstCase(int minRun, int length) { + List runs = new ArrayList(); + + long runningTotal = 0, Y = minRun + 4, X = minRun; + + while (runningTotal + Y + X <= length) { + runningTotal += X + Y; + generateJDKWrongElem(runs, minRun, X); + runs.add(0, Y); + // X_{i+1} = Y_i + x_{i,1} + 1, since runs.get(1) = x_{i,1} + X = Y + runs.get(1) + 1; + // Y_{i+1} = X_{i+1} + Y_i + 1 + Y += X + 1; + } + + if (runningTotal + X <= length) { + runningTotal += X; + generateJDKWrongElem(runs, minRun, X); + } + + runs.add(length - runningTotal); + return runs; + } + + /** + * Adds a sequence x_1, ..., x_n of run lengths to runs such that:
+ * 1. X = x_1 + ... + x_n
+ * 2. x_j >= minRun for all j
+ * 3. x_1 + ... + x_{j-2} < x_j < x_1 + ... + x_{j-1} for all j
+ * These conditions guarantee that TimSort merges all x_j's one by one + * (resulting in X) using only merges on the second-to-last element. + * + * @param X The sum of the sequence that should be added to runs. + */ + private static void generateJDKWrongElem(List runs, int minRun, long X) { + for (long newTotal; X >= 2 * minRun + 1; X = newTotal) { + //Default strategy + newTotal = X / 2 + 1; + //Specialized strategies + if (3 * minRun + 3 <= X && X <= 4 * minRun + 1) { + // add x_1=MIN+1, x_2=MIN, x_3=X-newTotal to runs + newTotal = 2 * minRun + 1; + } else if (5 * minRun + 5 <= X && X <= 6 * minRun + 5) { + // add x_1=MIN+1, x_2=MIN, x_3=MIN+2, x_4=X-newTotal to runs + newTotal = 3 * minRun + 3; + } else if (8 * minRun + 9 <= X && X <= 10 * minRun + 9) { + // add x_1=MIN+1, x_2=MIN, x_3=MIN+2, x_4=2MIN+2, x_5=X-newTotal to runs + newTotal = 5 * minRun + 5; + } else if (13 * minRun + 15 <= X && X <= 16 * minRun + 17) { + // add x_1=MIN+1, x_2=MIN, x_3=MIN+2, x_4=2MIN+2, x_5=3MIN+4, x_6=X-newTotal to runs + newTotal = 8 * minRun + 9; + } + runs.add(0, X - newTotal); + } + runs.add(0, X); + } +} diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 0cb1ed7397655..e0d6cc16bde05 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -65,6 +65,13 @@ class SorterSuite extends FunSuite { } } + // http://www.envisage-project.eu/timsort-specification-and-verification/ + test("SPARK-5984 TimSort bug") { + val data = TestTimSort.getTimSortBugTestSet(67108864) + new Sorter(new IntArraySortDataFormat).sort(data, 0, data.length, Ordering.Int) + (0 to data.length - 2).foreach(i => assert(data(i) <= data(i + 1))) + } + /** Runs an experiment several times. */ def runExperiment(name: String, skip: Boolean = false)(f: => Unit, prepare: () => Unit): Unit = { if (skip) {