
[SPARK-7214] Reserve space for unrolling even when MemoryStore nearly full #5784

Closed · wants to merge 5 commits

Conversation

charlesreiss
Contributor

If making the initial reservation of space for unrolling a block fails, attempt to drop blocks to make room before giving up.
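
Here's a minimal, self-contained sketch of the intended behavior. The names, sizes, and eviction order below are illustrative only; this is not the actual MemoryStore code from the patch.

```scala
import scala.collection.mutable

// Toy model of the proposed behavior (hypothetical names, not Spark's API):
// if the initial unroll reservation fails, drop cached blocks until the
// reservation fits, instead of giving up immediately.
object UnrollReservationSketch {
  val maxMemory = 100L
  val cached = mutable.LinkedHashMap[String, Long]()   // blockId -> size
  var reservedUnrollMemory = 0L

  def freeMemory: Long = maxMemory - cached.values.sum - reservedUnrollMemory

  def reserveUnrollMemoryDroppingBlocks(
      space: Long,
      droppedBlocks: mutable.Buffer[String]): Boolean = {
    // Evict cached blocks (oldest first, purely for illustration) until the
    // requested unroll space fits or there is nothing left to drop.
    while (freeMemory < space && cached.nonEmpty) {
      val (victim, _) = cached.head
      cached.remove(victim)
      droppedBlocks += victim
    }
    if (freeMemory >= space) { reservedUnrollMemory += space; true } else false
  }

  def main(args: Array[String]): Unit = {
    (1 to 20).foreach(i => cached(s"block$i") = 5L)     // fill the store with small blocks
    val dropped = mutable.Buffer[String]()
    // With the store full, the old code would fail this initial reservation;
    // the change tries to make room first.
    println(reserveUnrollMemoryDroppingBlocks(space = 10L, droppedBlocks = dropped))
    println(s"dropped: $dropped")
  }
}
```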

@SparkQA

SparkQA commented Apr 29, 2015

Test build #31305 has finished for PR 5784 at commit 6c57a97.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 29, 2015

Test build #31308 has finished for PR 5784 at commit 16797ea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public final class UnsafeFixedWidthAggregationMap
    • public static class MapEntry
    • public final class UnsafeRow implements MutableRow
    • class UnsafeRowConverter(fieldTypes: Array[DataType])
    • case class Hypot(
    • case class Pow(left: Expression, right: Expression) extends BinaryMathExpression(math.pow, "POWER")
    • abstract class MathematicalExpression(f: Double => Double, name: String)
    • case class Acos(child: Expression) extends MathematicalExpression(math.acos, "ACOS")
    • case class Asin(child: Expression) extends MathematicalExpression(math.asin, "ASIN")
    • case class Atan(child: Expression) extends MathematicalExpression(math.atan, "ATAN")
    • case class Cbrt(child: Expression) extends MathematicalExpression(math.cbrt, "CBRT")
    • case class Ceil(child: Expression) extends MathematicalExpression(math.ceil, "CEIL")
    • case class Cos(child: Expression) extends MathematicalExpression(math.cos, "COS")
    • case class Cosh(child: Expression) extends MathematicalExpression(math.cosh, "COSH")
    • case class Exp(child: Expression) extends MathematicalExpression(math.exp, "EXP")
    • case class Expm1(child: Expression) extends MathematicalExpression(math.expm1, "EXPM1")
    • case class Floor(child: Expression) extends MathematicalExpression(math.floor, "FLOOR")
    • case class Log(child: Expression) extends MathematicalExpression(math.log, "LOG")
    • case class Log10(child: Expression) extends MathematicalExpression(math.log10, "LOG10")
    • case class Log1p(child: Expression) extends MathematicalExpression(math.log1p, "LOG1P")
    • case class Rint(child: Expression) extends MathematicalExpression(math.rint, "ROUND")
    • case class Signum(child: Expression) extends MathematicalExpression(math.signum, "SIGNUM")
    • case class Sin(child: Expression) extends MathematicalExpression(math.sin, "SIN")
    • case class Sinh(child: Expression) extends MathematicalExpression(math.sinh, "SINH")
    • case class Tan(child: Expression) extends MathematicalExpression(math.tan, "TAN")
    • case class Tanh(child: Expression) extends MathematicalExpression(math.tanh, "TANH")
    • case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
    • case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
    • case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
    • public final class PlatformDependent
    • public class ByteArrayMethods
    • public final class LongArray
    • public final class BitSet
    • public final class BitSetMethods
    • public final class Murmur3_x86_32
    • public final class BytesToBytesMap
    • public final class Location
    • class Doubling implements HashMapGrowthStrategy
    • public class ExecutorMemoryManager
    • public class HeapMemoryAllocator implements MemoryAllocator
    • public class MemoryBlock extends MemoryLocation
    • public class MemoryLocation
    • public final class TaskMemoryManager
    • public class UnsafeMemoryAllocator implements MemoryAllocator
  • This patch adds the following new dependencies:
    • spark-unsafe_2.10-1.4.0-SNAPSHOT.jar

@shaneknapp
Contributor

@brennonyork -- something seems strange w/the output of pr_public_classes.sh...

i checked the jenkins output also, and it's reporting that the SHA1 is "SHA1: origin/pr/5784/merge", which doesn't seem right. thoughts?

@brennonyork

@shaneknapp well, to start, I couldn't agree more that something looks very fishy with pr_public_classes. That was a direct port from the previous code though which makes it even more interesting :/

To address where the SHA1 hash is getting pulled from: as I'm sure you know, it comes from this code here, which, I'll admit, is interesting in and of itself in that it could produce the SHA1 as either an actual hash or what we see above (in the case that the patch can merge without conflict into master).

That said, pr_public_classes only relies on ghprbActualCommit and not the SHA1, so unless that were somehow empty (and it isn't, according to Jenkins) I'm not immediately sure how this could be happening.

My only thought, and I'm hoping you could shed some light here, would be a possible race condition from some shared state on each Jenkins box such that the Bash calls (or environment variables) aren't atomic to the PR they're building. Thoughts on that? I'll continue to dig and see what I can find.

@shaneknapp
Contributor

so, believe it or not, the jenkins bash environment is actually atomic (all machines are identical hardware, same kernel, system updates, package versions, ssh bash environment, etc). you can see the bash environment for the worker this job executed on here (https://amplab.cs.berkeley.edu/jenkins/computer/amp-jenkins-worker-08/log), and it's identical to all of the other ones.

regarding sha1, derp. i totally knew that. :p

anyways, i'll poke around those shell scripts some more and see if there's anything i can tweak to better report/fail on errors. i'm also going to redir a lot of the git error output to /dev/null as it's been cluttering up the test logs. i know there's a PR to move this stuff to python, but for the time being i'll see if i can't make things better. once the PR is done i'll add you (@brennonyork) as a reviewer.

@shaneknapp
Contributor

jenkins, test this please

@brennonyork

thanks for that clarity @shaneknapp. Looks like I don't have access to see the Jenkins environment variables from the link you sent (unless it became stale before I clicked), but I'll look for the review note and provide what I can!

@SparkQA

SparkQA commented Apr 30, 2015

Test build #31441 has finished for PR 5784 at commit 16797ea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@squito
Contributor

squito commented Apr 30, 2015

I've been trying to come up with a scenario where this would be undesirable, and I've only come up with one: on the second put, you have less than the initial room available, AND the second block is bigger than your total memory. After this change, you'll end up with nothing in the cache, but before, you would still have the first block.

however, that is quite a stretch: you could already end up in this situation before this change, if you had more than the initial room available to start with but the second block was bigger than your total memory.

does that seem right? not that this is a show stopper, just trying to make sure I understand
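
To make that concrete with made-up numbers (not from the patch or its tests):

```scala
// Hypothetical sizes for the edge case above: the first block leaves less than
// the initial unroll room free, and the second block exceeds the entire store.
val maxMemory = 10L << 20                       // 10 MB MemoryStore
val blockA    = maxMemory - (512L << 10)        // 9.5 MB, cached first; ~0.5 MB free
val blockB    = 20L << 20                       // 20 MB, larger than the whole store
val initialUnrollRoom = 1L << 20                // initial unroll reservation (1 MB)

println(maxMemory - blockA < initialUnrollRoom) // true: B's initial reservation fails
println(blockB > maxMemory)                     // true: B can never be cached anyway
// Before the change: B gives up immediately and A stays cached.
// After the change:  A may be dropped to grant B's reservation, B still won't
//                    fit, and the store ends up empty.
```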

@@ -280,20 +283,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
      val currentSize = vector.estimateSize()
      if (currentSize >= memoryThreshold) {
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        // Hold the accounting lock, in case another thread concurrently puts a block that
        // takes up the unrolling space we just ensured here
Contributor

move this comment into the new method

@charlesreiss
Contributor Author

The idea that this could happen anyway, just with a slightly different amount of space, is right; often fragmentation will mean that there is more than unrollMemoryThreshold free in the MemoryStore even though it is essentially full.

Note that the total amount passed to ensureFreeSpace() is limited by maxUnrollMemory (taking into account already-reserved unrolling space), so, if there are no "huge" blocks, there's a limit to how much can be spuriously dropped.
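
Roughly, that bound looks like this (variable names and values here are made up for illustration; they are not the patch's exact fields):

```scala
// Illustrative only: the request handed to ensureFreeSpace() is capped by the
// remaining unroll budget, so the amount of cached data that unrolling can
// spuriously evict is bounded by maxUnrollMemory.
val maxUnrollMemory     = 20L << 20   // total budget for unroll reservations
val currentUnrollMemory = 12L << 20   // unroll space already reserved across threads
val requestedSpace      = 15L << 20   // what this unroll attempt would like

// Never ask ensureFreeSpace() to free more than the unroll budget still allows.
val spaceToEnsure = math.min(requestedSpace, maxUnrollMemory - currentUnrollMemory)
println(spaceToEnsure >> 20)          // 8 (MB)
```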

* necessary. If blocks are dropped, adds them to droppedBlocks. Returns whether the
* request was granted, and any blocks that were dropped trying to grant it.
*/
def reserveUnrollMemoryForThisThreadDroppingBlocks(
Contributor

make this private

@squito
Contributor

squito commented May 1, 2015

just some teeny style things, otherwise lgtm

@squito
Contributor

squito commented May 1, 2015

just curious, I assume this matters only when spark.storage.unrollMemoryThreshold is changed to something bigger?

@SparkQA

SparkQA commented May 1, 2015

Test build #31481 has finished for PR 5784 at commit fa212c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@charlesreiss
Contributor Author

No; one just needs to store enough small blocks that the free space drops below 1 MB (without unpersisting anything, and with the timing working out so that storing the last blocks isn't blocked by unroll space reservations).
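
For instance, with made-up sizes and the default threshold:

```scala
// Hypothetical numbers: fill a 100 MB store with blocks just under 1 MB each
// until less than the default spark.storage.unrollMemoryThreshold (1 MB) is free.
val maxMemory              = 100L << 20          // 100 MB MemoryStore
val smallBlock             = (1L << 20) - 1024   // just under 1 MB per cached block
val cachedBlocks           = 100
val initialUnrollThreshold = 1L << 20            // default unrollMemoryThreshold

val freeMemory = maxMemory - cachedBlocks * smallBlock
println(freeMemory)                              // 102400 bytes (~100 KB) left
println(freeMemory < initialUnrollThreshold)     // true: the next initial reservation fails
```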

@SparkQA

SparkQA commented May 1, 2015

Test build #31495 has finished for PR 5784 at commit 63afdfb.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@squito
Contributor

squito commented May 1, 2015

Jenkins, retest this please

@squito
Contributor

squito commented May 1, 2015

ah I see, I'm glad I asked! thanks for clarifying. I'm pretty sure that last set of test failures was spurious, so assuming the latest set of tests pass, I think this is good. Since the memory store is so central, though, I'd like to err on the side of caution. @aarondav @rxin do you want to take a quick look?

@SparkQA

SparkQA commented May 1, 2015

Test build #31560 has finished for PR 5784 at commit 63afdfb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented May 2, 2015

cc @andrewor14 who worked a lot on this

@charlesreiss
Contributor Author

@andrewor14 @rxin Any update on this?

@andrewor14
Contributor

Hi @woggle, thanks for the patch, but I won't have the bandwidth to look at this in detail until after the 1.4 release. I will have a look then.

@JoshRosen
Contributor

Hey @andrewor14, given our recent discussions / work on unrolling, do you think this PR is still necessary / relevant?

@andrewor14
Contributor

@woggle It turns out that my recent patch #9000 unintentionally fixed this issue :). In particular, reserving the initial memory now drops blocks (see this line). Can you verify?

@andrewor14
Contributor

I think this patch is largely out of date by now given the new memory management patches. I would recommend we close this issue.

@charlesreiss
Contributor Author

Agreed. I haven't had a chance to actually verify that your patch #9000 fixes the problem, but it looks like it should.
