Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up temporary files created by forceToDiskExecution #1621

Merged

Conversation

ajohnson-stripe-zz
Copy link
Contributor

This adds functionality for WriteExecution to keep track of temporary files created and then delete them after execution has finished. Currently this is only used by forceToDiskExecution, which previously would leave around temporary files every time it was used.

This fixes #1615.

Copy link

@oscar-stripe oscar-stripe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! Thanks for attacking this problem.

val newFn = { (conf: Config, mode: Mode) =>
(fn(conf, mode), otherFn(conf, mode))
}
WriteExecution(head, h :: t ::: tail, newFn)
WriteExecution(head, h :: t ::: tail, newFn, tempFiles)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't you need to merge the sets from the left and right? Seems like we are losing this.tempFilesToCleanup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good catch, I do need that here.

Set(tempFile)
}

Execution.write(writeFn, readFn, filesToDeleteFn)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you test a .zip to be sure we don't lose files there?

try {
val path = new Path(file)
if (fs.exists(path)) {
fs.delete(path, true)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment what the boolean means at this call site?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

case hdfsMode: HadoopMode => FileSystem.get(hdfsMode.jobConf)
}

filesToCleanup foreach { file: String =>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we prefer the . style and use filesToCleanup.foreach {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@ajohnson-stripe-zz ajohnson-stripe-zz force-pushed the forceToDiskExecution-cleanup branch 2 times, most recently from f8a6cf2 to 53de7ff Compare November 14, 2016 21:53
@ajohnson-stripe-zz
Copy link
Contributor Author

Thanks for the feedback @oscar-stripe. I incorporated in the changes you suggested.

I also noticed that the REPL expects that the temporary outputs from forceToDiskExecution stick around in things like the snapshot method on ShellTypedPipe. As such I moved the actual deletion to happen in a shutdown hook on JVM exit.

@ajohnson-stripe-zz
Copy link
Contributor Author

Hmm, I'm not sure I see why the CI build is failing. It's complaining about failing when compiling maple: .[error] (maple/compile:compileIncremental) javac returned nonzero exit code, but I only see warnings from the compiler, no errors. I can build that fine locally too.

I do see this in the log, maybe related?

.tar: Unexpected EOF in archive
tar: rmtlseek not stopped at a record boundary
tar: Error is not recoverable: exiting now

}
} catch {
// If we fail in deleting a temp file, log the error but don't fail the run
case e: Exception => LOG.info(s"Unable to delete temp file $file")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we pass e to the log statement? that way we get the message and stacktrace? Also should this be a warn?

@@ -284,6 +288,34 @@ object Execution {
}

/**
* This is a Thread used as a shutdown hook to clean up temporary files created by some Execution
*/
case class TempFileCleanup(filesToCleanup: mutable.Set[String], mode: Mode) extends Thread {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we're just iterating over the filesToCleanup in this case class, maybe we can make this a more generic interface rather than mutable.Set? Iterable / Seq / Set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// The "true" parameter here indicates that we should recursively delete everything under the given path
fs.delete(path, true)
}
} catch {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as this is part of a shutdown hook, should we add a catch all clause?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean catch { case e: Throwable => so we catch anything? Is that what you mean?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oscar-stripe yeah. If we for whatever reason throw an caught exception in this code that will result in the shutdown code not being run completely as I understand it.

@@ -284,6 +288,34 @@ object Execution {
}

/**
* This is a Thread used as a shutdown hook to clean up temporary files created by some Execution
*/
case class TempFileCleanup(filesToCleanup: mutable.Set[String], mode: Mode) extends Thread {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe also call out that if the job is aborted this will not cleanup stuff (if folks haven't read / know about shutdown hook semantics)

@@ -284,6 +288,34 @@ object Execution {
}

/**
* This is a Thread used as a shutdown hook to clean up temporary files created by some Execution
*/
case class TempFileCleanup(filesToCleanup: mutable.Set[String], mode: Mode) extends Thread {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to taking an immutable Iterable here. Can we make the class private?

// The "true" parameter here indicates that we should recursively delete everything under the given path
fs.delete(path, true)
}
} catch {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean catch { case e: Throwable => so we catch anything? Is that what you mean?

@ajohnson-stripe-zz ajohnson-stripe-zz force-pushed the forceToDiskExecution-cleanup branch from 53de7ff to b34d3d4 Compare November 15, 2016 12:56
@ajohnson-stripe-zz
Copy link
Contributor Author

I've incorporated the next round of feedback. I'm still looking into it, but I'm not able to reproduce the CI build failure.

@johnynek
Copy link
Collaborator

I restarted. Let's see if it is a CI issue.

On Tue, Nov 15, 2016 at 8:17 AM Andrew Johnson notifications@github.com
wrote:

I've incorporated the next round of feedback. I'm still looking into it,
but I'm not able to reproduce the CI build failure.


You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
#1621 (comment),
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEJdvQAZ-YG1YeGlmmZu_ynvCm-F0h0ks5q-dsLgaJpZM4Kxsnr
.

@piyushnarang
Copy link
Collaborator

@ajohnson-stripe changes look good to me. 👍

@@ -313,7 +347,7 @@ object Execution {
new EvalCache {
override protected[EvalCache] val messageQueue: LinkedBlockingQueue[EvalCache.FlowDefAction] = self.messageQueue
override def start(): Unit = sys.error("Invalid to start child EvalCache")
override def finished(): Unit = sys.error("Invalid to finish child EvalCache")
override def finished(mode: Mode): Unit = sys.error("Invalid to finish child EvalCache")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to override the addFilesToCleanup to forward back to the parent list? I think any files added to the child will not be deleted, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that does need to be added

@@ -305,15 +338,17 @@ object Execution {
type Counters = Map[Long, ExecutionCounters]
private[this] val cache = new FutureCache[(Config, Execution[Any]), (Any, Counters)]
private[this] val toWriteCache = new FutureCache[(Config, ToWrite), Counters]
protected[EvalCache] val filesToCleanup = mutable.Set[String]()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, I think we should keep this private and override the add method. The reason is we forgot one more thing: this is not thread-safe, but we have so many threads in play, we should make this thread-safe.

I think we are modifying this value from several threads.

So, if we just forward the method, we can handle the locking a bit more cleanly inside that method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll switch that up.

}
} catch {
// If we fail in deleting a temp file, log the error but don't fail the run
case e: Throwable => LOG.warn(s"Unable to delete temp file $file", e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style-wise I think case e => LOG.warn(s"Unable to delete temp file $file", e) is the same (without the :Throwable part)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the same, but leaving out the : Throwable generates a compiler warning requesting that you be explicit. I see a mix of both in the codebase now, but I'm happy to go with whatever's the preferred style.

*
* If the job is aborted the shutdown hook may not run and the temporary files will not get cleaned up
*/
private[scalding] case class TempFileCleanup(filesToCleanup: Iterable[String], mode: Mode) extends Thread {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this extend Runnable instead? Does it have to extend Thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addShutdownHook will only take a Thread unfortunately

@johnynek
Copy link
Collaborator

👍 looks good to me.

@johnynek
Copy link
Collaborator

For throwable we should be explicit and avoid the warning in my view.
On Tue, Nov 15, 2016 at 12:05 Andrew Johnson notifications@github.com
wrote:

@ajohnson-stripe commented on this pull request.

In scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
#1621:

  • override def run(): Unit = {
  •  val fs = mode match {
    
  •    case localMode: CascadingLocal => FileSystem.getLocal(new Configuration)
    
  •    case hdfsMode: HadoopMode =>  FileSystem.get(hdfsMode.jobConf)
    
  •  }
    
  •  filesToCleanup.foreach { file: String =>
    
  •    try {
    
  •      val path = new Path(file)
    
  •      if (fs.exists(path)) {
    
  •        // The "true" parameter here indicates that we should recursively delete everything under the given path
    
  •        fs.delete(path, true)
    
  •      }
    
  •    } catch {
    
  •      // If we fail in deleting a temp file, log the error but don't fail the run
    
  •      case e: Throwable => LOG.warn(s"Unable to delete temp file $file", e)
    

It is the same, but leaving out the : Throwable generates a compiler
warning requesting that you be explicit. I see a mix of both in the
codebase now, but I'm happy to go with whatever's the preferred style.


You are receiving this because you commented.

Reply to this email directly, view it on GitHub
#1621, or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEJdpRzfsyQs4txuM5bz7H4UcdXfz_9ks5q-hCHgaJpZM4Kxsnr
.

@johnynek johnynek merged commit d389c63 into twitter:develop Nov 17, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

forceToDiskExecution does not clean up data after the execution has completed
6 participants