[SPARK-29779][CORE] Compact old event log files and cleanup #26416

Closed · wants to merge 29 commits

Conversation

@HeartSaVioR (Contributor) commented Nov 6, 2019

What changes were proposed in this pull request?

This patch proposes to compact old event log files when rolling event logs are enabled, and to clean up the original files after compaction.

Here the "compaction" really mean is filtering out listener events for finished/removed things - like jobs and SQL executions which take most of space for event log file. To achieve this, compactor does two phases reading: 1) tracking the live jobs and SQL executions (and more to add) 2) filtering events via leveraging the information about live things and rewriting to the "compacted" file.

This approach retains compatibility with the existing event log file format while making it possible to reduce the overall size of event logs. There's a downside as well: executor metrics for tasks would become inaccurate, since the compactor filters out task events whose jobs have finished, but I don't consider that a blocker.

Compaction is triggered whenever FsHistoryProvider considers the app log a reload target: by memorizing the last index at which compaction was attempted, we only run compaction when a new event log file is available, so compaction is effectively triggered once per newly added event log file.

Also, the first of the two phases is done incrementally (in other words, the event filter builder is constructed and updated incrementally, per log file); so even when compaction is triggered by new log files, only the new files are read to update the event filter builder. The second phase still requires reading all event log files on each compaction, though.
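To illustrate, here's a rough, self-contained sketch of the two-phase flow (every name here is an illustrative stand-in, not this patch's actual API):

// A minimal sketch of the two-phase compaction flow described above.
// All names are illustrative stand-ins, not this patch's actual API.
object CompactionSketch {
  type Event = String   // stand-in for a serialized SparkListenerEvent line

  trait EventFilterBuilder {
    def onEvent(e: Event): Unit                    // phase 1: track live entities
    def createFilter(): Event => Option[Boolean]   // phase 2: the resulting predicate
  }

  def compact(files: Seq[Seq[Event]], builders: Seq[EventFilterBuilder]): Seq[Event] = {
    // Phase 1 (incremental, per log file): replay events so the builders can
    // track which jobs / SQL executions are still live.
    for (file <- files; event <- file) builders.foreach(_.onEvent(event))
    val filters = builders.map(_.createFilter())
    // Phase 2: re-read everything and keep a line unless every opinionated
    // filter votes Some(false); survivors form the "compacted" file.
    files.flatten.filter { event =>
      val votes = filters.flatMap(f => f(event))
      !(votes.nonEmpty && votes.forall(_ == false))
    }
  }
}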

Why are the changes needed?

One of the major goals of SPARK-28594 is to prevent event logs from growing too large, and SPARK-29779 achieves that goal. We had another approach previously, but it required the models in both the KVStore and the live entities to guarantee compatibility, which they're not designed to do.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added UTs. Also manually tested with my event log file.


override def filterExecutorMetricsUpdate(
    event: SparkListenerExecutorMetricsUpdate): Option[Boolean] = {
  // FIXME: need to track live executors?
}

HeartSaVioR (author): If we only want to track live executors, we need to address this kind of FIXME. If we also want to track dead executors, the FIXME can be removed.

override def filterUnpersistRDD(event: SparkListenerUnpersistRDD): Option[Boolean] = {
  // FIXME: need to track rdd ids?
}

HeartSaVioR (author, Nov 6, 2019): I may need to rethink this, as I'm not fully sure whether it's safe to keep the event when the RDD is not live, or vice versa.

trait EventFilter {
def filterStageCompleted(event: SparkListenerStageCompleted): Option[Boolean] = None
HeartSaVioR (author): So the result can take three kinds of values: Some(true), Some(false), and None.

Some(true) means "yes, the event should be retained", Some(false) means "no, the event can be dropped", and None means "I'm not aware of this event; I don't care."

The event will be filtered "out" only when all filters, excluding those returning None, return Some(false).
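To make the semantics concrete, a tiny sketch of that combination rule (illustrative only, not the exact code in this patch):

// Some(true) = retain, Some(false) = droppable, None = no opinion.
object FilterVoteSketch {
  def shouldDrop(verdicts: Seq[Option[Boolean]]): Boolean = {
    val opinions = verdicts.flatten   // filters answering None abstain
    opinions.nonEmpty && opinions.forall(_ == false)
  }
}

// FilterVoteSketch.shouldDrop(Seq(Some(false), None, Some(false))) == true   (dropped)
// FilterVoteSketch.shouldDrop(Seq(Some(false), Some(true)))        == false  (retained)
// FilterVoteSketch.shouldDrop(Seq(None, None))                     == false  (unknown events are kept)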


private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN)

// FIXME: javadoc - caller should provide event log files (either compacted or original)
HeartSaVioR (author): Left a FIXME for addressing the scaladoc.

gaborgsomogyi (Contributor): Just curious - why "either compacted"?

HeartSaVioR (author): No, that's a point the doc misses. Let me address the doc entirely. I'm working on the UTs now, so the doc will come after that.

class FilteredEventLogFileRewriter(
HeartSaVioR (author): At first I implemented this as an event listener, then realized the serialized JSON would no longer be the same - bigger size, and some information could even change (e.g. the Spark version). So I changed the implementation to write the original string as-is when it passes the filters.

s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " +
"the overall size of event log files.")
.intConf
.checkValue(_ > 0, "Max event log files to retain should be higher than 0.")
HeartSaVioR (author): So we always keep at least one last "original" event log - this is necessary since the app may still be running. Actually, compacting everything doesn't make sense for a completed app either - it would blow everything away.
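For illustration, how an end user might combine these configs (a sketch: spark.eventLog.rolling.enabled and spark.eventLog.rolling.maxFileSize already exist via SPARK-28869; the values here are arbitrary examples):

import org.apache.spark.SparkConf

object RollingLogConfSketch {
  // Enable rolling event logs, cap each file's size, and keep at most the 10
  // most recent files uncompacted (values are arbitrary examples).
  val conf = new SparkConf()
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.rolling.enabled", "true")
    .set("spark.eventLog.rolling.maxFileSize", "128m")
    .set("spark.eventLog.rolling.maxFilesToRetain", "10")
}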

import org.apache.spark.sql.execution.ui._
import org.apache.spark.sql.streaming.StreamingQueryListener

class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
HeartSaVioR (author): This class tracks the live executions and their relevant jobs - so even when a job is finished, its events will not be filtered out while the relevant execution is still live.

// first pass
val bus = new ReplayListenerBus()

val builders = ServiceLoader.load(classOf[EventFilterBuilder],
HeartSaVioR (author, Nov 6, 2019): We apply ServiceLoader here because the compactor lives in spark-core while we also need a filter implementation in spark-sql.
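For context, a minimal sketch of the ServiceLoader pattern in play here (EventFilterBuilder is a stand-in trait; the provider-file location is the standard JDK convention):

import java.util.ServiceLoader
import scala.collection.JavaConverters._

object FilterLoaderSketch {
  trait EventFilterBuilder   // stand-in for the actual trait

  // Each module (spark-core, spark-sql, ...) ships an implementation plus a
  // provider file under META-INF/services/ named after the fully qualified
  // trait, listing its implementation class; ServiceLoader discovers them.
  def loadBuilders(): Seq[EventFilterBuilder] =
    ServiceLoader.load(classOf[EventFilterBuilder],
      Thread.currentThread().getContextClassLoader).asScala.toSeq
}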

@HeartSaVioR (author):

cc @vanzin @squito @gaborgsomogyi @Ngone51
This patch is still WIP, but the remaining items are minor, and I'd like to share it early to reach a consensus on whether the idea is feasible and makes sense. Please take a look. Thanks!

@SparkQA

SparkQA commented Nov 7, 2019

Test build #113344 has finished for PR 26416 at commit b0e4a7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder
  • class BasicEventFilter(trackListener: BasicEventFilterBuilder) extends EventFilter with Logging
  • trait EventFilterBuilder extends SparkListenerInterface
  • trait EventFilter
  • class EventLogFileCompactor(
  • class FilteredEventLogFileRewriter(
  • class CompactedEventLogFileWriter(
  • class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder
  • class SQLLiveEntitiesEventFilter(trackListener: SQLEventFilterBuilder)

@gaborgsomogyi (Contributor) commented Nov 7, 2019

I've just taken a high-level look at it, read the design doc, and have an initial question: the design doc mentions a latest snapshot - how does the compact file relate to it?

@HeartSaVioR (author):

So the patch implements an "alternative" to the second part of the design, since we found some major issues with snapshotting (all models participating in a snapshot would have to guarantee compatibility, which heavily restricts modifications to (SQL)AppStatusListener).

We've discussed finding an answer in this doc, but nothing seems to be easy:
https://docs.google.com/document/d/1LbPflpEyrh9jDunwmDnrTjtMybAXcGimy36AWcUQsq8/edit

And I found a new idea with a minor (I hope) downside - this patch is the implementation of that idea, which is described in this PR's description.

If you're referring to the design doc, please consider that this patch just replaces the second part (compaction via snapshotting and cleanup of event log files).

@gaborgsomogyi (Contributor):

Somehow I had missed this doc until now; started to process it...

Comment on lines 105 to 150
val files = if (lastCompactedFileIdx > -1) {
eventLogFiles.drop(lastCompactedFileIdx)
} else {
eventLogFiles
}
Ngone51 (Member): Just using eventLogFiles.drop(lastCompactedFileIdx) should be OK, as drop handles a negative number correctly.
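For reference, Scala's drop treats a non-positive argument as zero:

Seq("e1", "e2", "e3").drop(-1)   // Seq(e1, e2, e3) - unchanged
Seq("e1", "e2", "e3").drop(1)    // Seq(e2, e3)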

@Ngone51 (Member) left a comment:

I haven't gone through all the details yet; I've just found some minor issues.

And IIUC, with this approach: assume we have a stage with plenty of tasks (e.g. 100000), and EventLoggingListener writes them into multiple rolled event files. Then, in FsHistoryProvider, when RollingEventLogFilesFileReader reads those rolled files (assume the stage is still running), we may compact all of them (assume most/all are eligible) into a single compacted event file, because the filters couldn't drop those events - mostly task-related events, e.g. SparkListenerTaskStartEvent, SparkListenerTaskEndEvent. This would still result in a huge compacted event file. Is that right?

Note: I'm not saying above whether this approach is good or not; I just want to validate my understanding of it.

Comment on lines 236 to 227
override def writeLine(line: String, flushLogger: Boolean): Unit = {
super.writeLine(line, flushLogger)
}
Ngone51 (Member): Why does this need to be overridden?

HeartSaVioR (author): Ah, that seems to have been left over after refactoring. Will remove.

HeartSaVioR (author): My bad - in the base class it's defined as protected, which is correct, since we don't want to allow callers of the event writer to write an arbitrary line instead of JSON (though we know it's just a line). This class is a variant of the writer which should write a line "as it is" instead of rebuilding the JSON and writing that - hence I decided not to touch the base class and only expose the method in this class.

gaborgsomogyi (Contributor): It's not breaking the Liskov rule, so I have no strong opinion, but I think it would be better to make it public instead of overriding (fewer lines and fewer questions in other coders' heads).

HeartSaVioR (author): So CompactedEventLogFileWriter is just an exceptional case for not repeating the code in EventLogFileWriter. I didn't want CompactedEventLogFileWriter to extend EventLogFileWriter, but lots of code would then be redundant, so I had to make one exception.

By intention, EventLogFileWriter should still hide the method. If we're not comfortable with this, I'll change CompactedEventLogFileWriter to not extend EventLogFileWriter while still trying to share the code.

HeartSaVioR (author): I'm changing my mind to just treat every line as JSON, which should be mostly correct - I'll call writeEvent instead.

val indices = eventLogFiles.map { file => getIndex(file.getPath.getName) }.sorted

val filesToRead = if (lastCompactedFileIdx > -1) {
eventLogFiles.drop(lastCompactedFileIdx)
Ngone51 (Member): Same as above - just using eventLogFiles.drop(lastCompactedFileIdx) should be OK.

@@ -164,6 +164,7 @@ abstract class EventLogFileWriter(
object EventLogFileWriter {
// Suffix applied to the names of files still being written by applications.
val IN_PROGRESS = ".inprogress"
val COMPACTED = ".compact"
Ngone51 (Member): nit: I'd prefer .compacted.

HeartSaVioR (author): I followed the naming of an existing extension - the extension of the state store's snapshotted file is ".snapshot". Let's hear more voices on this.

@HeartSaVioR (author):

> And IIUC, with this approach ... we may compact all of them into a single compacted event file ... This would still result in a huge compacted event file. Is that right?

So the target workload matters - we can't make everyone happy. The major target of this feature is streaming queries. Given that goal, in most cases only one job will be live at a time (a SQL execution may produce a couple of jobs in the log, but it's still just one batch), and the job won't be super complicated. Latency is a first-class concern for streaming workloads - a couple of seconds per batch, even 10+ seconds, may be acceptable, but a streaming query where a batch takes minutes wouldn't be welcome; it gets less and less valuable as latency grows. IMHO, assuming even 10000s of tasks per batch in a streaming query is more of a hypothetical.

@SparkQA

SparkQA commented Nov 10, 2019

Test build #113522 has finished for PR 26416 at commit 053a91c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 10, 2019

Test build #113524 has finished for PR 26416 at commit cff732c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 11, 2019

Test build #113556 has finished for PR 26416 at commit 404e747.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member) commented Nov 11, 2019

So, IIUC, this isn't appropriate for continuous processing in Structured Streaming?

@gaborgsomogyi (Contributor) left a comment:

First round.

  • As I see it, several public classes are added - is it possible to hide them?
  • Do I see it correctly that the compact file can grow infinitely? How probable is it that it grows heavily?

private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN =
ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain")
.doc("The maximum number of event log files which will be retained as non-compacted. " +
"By default, all event log files will be retained. Please set the configuration " +
gaborgsomogyi (Contributor):

> By default, all event log files will be retained.

Is it actually all, or just high enough not to be reached? If all, then I don't think it's necessary to call fileCompactor.compact, right?

HeartSaVioR (author): That is to reduce the number of added configurations, so yes, it effectively disables the option by default. Even when the option is disabled, I'd prefer the caller to consistently call fileCompactor.compact and let fileCompactor decide everything, for separation of concerns.

It seems fileCompactor.compact doesn't have an early return for this case - I'll add one.

gaborgsomogyi (Contributor): Yeah, I think fileCompactor can decide it too; that's fine. What I wanted to reach was cutting unnecessary steps.



// will be skipped
def compact(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = {
if (eventLogFiles.isEmpty) {
return Seq.empty[FileStatus]
gaborgsomogyi (Contributor): Why not return eventLogFiles?

HeartSaVioR (author): No big deal. Will change.

@@ -218,12 +219,24 @@ class RollingEventLogFilesFileReader(

private lazy val eventLogFiles: Seq[FileStatus] = {
val eventLogFiles = files.filter(isEventLogFile).sortBy { status =>
getIndex(status.getPath.getName)
val filePath = status.getPath
var idx = getIndex(status.getPath.getName) + 0.0d
gaborgsomogyi (Contributor, Nov 11, 2019): filePath can be used here, right?
BTW, why is the + 0.0d needed?

HeartSaVioR (author):

> filePath can be used here, right?

Yes, nice catch!

> BTW, why is the + 0.0d needed?

We need to make idx a double: we want to sort the files by index, but if both the original event log and a compact log exist for the same index, we add 0.1 to the compact log's key, then take the latest compact log and start from there.
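Roughly like this (a self-contained sketch; the "events_<index>"-style file names are illustrative, not the reader's actual naming logic):

object SortKeySketch {
  // A compact log sorts just after the original log of the same index, so the
  // reader can pick the latest compact file and resume from there.
  def sortKey(fileName: String): Double = {
    val isCompact = fileName.endsWith(".compact")
    val index = fileName.stripSuffix(".compact").split("_").last.toDouble
    if (isCompact) index + 0.1 else index
  }

  // Seq("events_2", "events_1.compact", "events_1").sortBy(sortKey)
  //   == Seq("events_1", "events_1.compact", "events_2")
}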

gaborgsomogyi (Contributor): Maybe .toDouble then, to spare an addition?

HeartSaVioR (author): That also works and is more intuitive. Thanks! Will change.

private def cleanupCompactedFiles(files: Seq[FileStatus]): Unit = {
files.foreach { file =>
try {
fs.delete(file.getPath, true)
gaborgsomogyi (Contributor): Why not check the returned boolean?

HeartSaVioR (author): Will do, and I'll leave a log message.
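Something like this, perhaps (a rough sketch; the log message wording is illustrative, and Logging is Spark's internal logging trait):

import org.apache.hadoop.fs.{FileStatus, FileSystem}
import org.apache.spark.internal.Logging

object DeleteCheckSketch extends Logging {
  // Hadoop's FileSystem.delete returns false when nothing was deleted.
  def cleanupCompactedFiles(fs: FileSystem, files: Seq[FileStatus]): Unit = {
    files.foreach { file =>
      val deleted = try {
        fs.delete(file.getPath, true)
      } catch {
        case _: Exception => false
      }
      if (!deleted) logWarning(s"Failed to remove ${file.getPath} after compaction.")
    }
  }
}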

hadoopConf: Configuration)
extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {

override val logPath: String = originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED
gaborgsomogyi (Contributor): Maybe we can call such vals COMPACTED_SUFFIX? When I see stripSuffix I know what it is, but when the value stands alone like this, a more meaningful name may be better.

HeartSaVioR (author): I tend to use longer names for clarity, but I feel the community prefers more concise names. The same applies to IN_PROGRESS. Let's hear more voices on this.

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._

class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
gaborgsomogyi (Contributor, Nov 11, 2019): I think this is a weird construct. BasicEventFilterBuilder should build a filter which is a SparkListener. The same applies to the other builder.

HeartSaVioR (author): Agreed. The Builder was introduced late, which is how it ended up like this; it would be better to swap the two.

HeartSaVioR (author): So, no luck - SparkListener is not a trait but an abstract class.

gaborgsomogyi (Contributor): I've invested time to understand your point. The following worked for me:

...
private[spark] class BasicEventFilterBuilder extends EventFilterBuilder
...
private[spark] trait EventFilter extends SparkListenerInterface
...
private[spark] class BasicEventFilter extends SparkListener with EventFilter with Logging
...

Testing would be a bit harder, though...

HeartSaVioR (author): BasicEventFilterBuilder should extend SparkListener; it's optional for BasicEventFilter to extend SparkListener(Interface), but it is intended for BasicEventFilterBuilder to extend SparkListener. BasicEventFilterBuilder and BasicEventFilter serve different purposes.

gaborgsomogyi (Contributor, Nov 14, 2019):

> BasicEventFilterBuilder should extend SparkListener

Why? I don't see your point yet. Does what I suggested not work?

If we consider responsibilities:

  • SparkListener is input
  • EventFilterBuilder builds EventFilter
  • EventFilter filters input to output

Adding input functionality to EventFilterBuilder is a side effect.

gaborgsomogyi (Contributor): Asking it differently: why can't EventFilter build itself up from SparkListener events, and why can't that list of EventFilters be passed to FilteredEventLogFileRewriter?

HeartSaVioR (author):

> why EventFilter can't build up itself from SparkListener events

It's possible, but then the filter does too many things and the UTs become harder to implement. Please look at how I craft the UTs for the filters - they pass in the information which is expected to come from the filter builder. If we unify the two classes, it's not intuitive how to inject that test information.

HeartSaVioR (author): Back to the original question: the SparkListener interface defines "what to do" when an event comes in, whereas the EventFilter interface defines the "predicate" for whether an event should be filtered out when it comes in.

...
private[spark] class BasicEventFilterBuilder extends EventFilterBuilder
...
private[spark] trait EventFilter extends SparkListenerInterface
...
private[spark] class BasicEventFilter extends SparkListener with EventFilter with Logging
...

So private[spark] trait EventFilter extends SparkListenerInterface is not the same as the current one and doesn't match my intention. Hope this clarifies it.

gaborgsomogyi (Contributor):

> It's possible but then the filter does too many things

I can relate to this.

Now I see your point, and I have the feeling we're both right.
The Builder is needed to create type-specific constructs.
EventFilter is needed as a predicate.
Maybe we can create an EventSource (or whatever name makes more sense):

private[spark] class BasicEventFilterBuilder extends EventFilterBuilder
private[spark] trait EventSource extends SparkListener
private[spark] trait EventFilter {
  val eventSource: EventSource
}
private[spark] class BasicEventSource extends EventSource
private[spark] class BasicEventFilter extends EventFilter with Logging


private def applyFilter(filter: EventFilter, event: SparkListenerEvent): Option[Boolean] = {
event match {
case event: SparkListenerStageSubmitted => filter.filterStageSubmitted(event)
gaborgsomogyi (Contributor): I've only slightly touched the filtering part (and have no formed view on it yet), but I'm always a bit afraid when I see a switch-case over event types. It's extremely easy to be off by one in the list, and it's always questionable, when a new event type comes along, whether another programmer will have any idea that this part must be modified for a proper implementation. Will come back to this when I've fully processed it...

HeartSaVioR (author): Yeah, you're right. This class actually extended SparkListener to guarantee the same coverage, but I realized I shouldn't rewrite the JSON - I should keep the original JSON - so this is the result.

So the listener should be aware of the origin line along with the event instance, in order to write the origin line once it passes the filters. Maybe we can have one; I'm not sure it's pretty, but it may address the inconsistency between this pattern matching and SparkListener. Let me try to address this.

HeartSaVioR (author): Btw, even if we miss adding a filter for a newer event type, as long as we make sure those lines are not dropped, that should be OK.

HeartSaVioR (author): Please consider my last comment here; as long as this provides that safety, I'd just keep it as it is. Injecting SparkListener into this logic complicates things.

gaborgsomogyi (Contributor):

> we can make sure these lines are not dropped, then that should be OK.

+1 on this. I've just reached the heart of the filtering and double-checked that if None is returned (which is the default), the event is kept in the file. So far I haven't found a test which enforces this (just point it out if I've missed it). If it doesn't exist, maybe we can create a YouShouldNeverUseThisEvent extends SparkListenerEvent in a test and verify the filter keeps it.

HeartSaVioR (author): Please check out the UT in FilteredEventLogFileRewriterSuite; it covers the None & None case.


import org.apache.spark.scheduler._

trait EventFilterBuilder extends SparkListenerInterface {
gaborgsomogyi (Contributor): Do we want to expose this to users? Shouldn't it be private[spark]?

HeartSaVioR (author): No. I had already changed their scope in my not-yet-pushed commits - thanks for the suggestion, though!

@HeartSaVioR (author) commented Nov 11, 2019

> So, IIUC, this isn't appropriate for continuous processing in Structured Streaming?

Yes, but I don't think it's a blocker. Continuous processing is still experimental and hasn't been improved for a while now. That doesn't sound like a major downside.

@HeartSaVioR (author):

> Do I see it correctly that the compact file can grow infinitely? How probable is it that it grows heavily?

If the query is a streaming one, not much. Once you get rid of the job-related events as well as the SQL-execution-related events, the remaining events are minor compared to them.

@SparkQA

SparkQA commented Nov 13, 2019

Test build #113696 has finished for PR 26416 at commit 316bc17.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor):

> If the query is a streaming one, not much. Once you get rid of the job-related events as well as the SQL-execution-related events, the remaining events are minor compared to them.

Thanks for the info; doing some research here to get a better understanding...

@HeartSaVioR (author):

OK, I just merged the master branch into the backup branch and reset the head. In my personal experience, merging the master branch makes things harder when squashing, so I've been rebasing, but I agree it's simpler and easier to review since it retains the previous information. I'll try merging first and fall back to rebasing.

@SparkQA

SparkQA commented Dec 17, 2019

Test build #115414 has finished for PR 26416 at commit 23ce9d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FilterStatistic(
  • class EventLogFileCompactor(
  • class FilteredEventLogFileRewriter(
  • class CompactedEventLogFileWriter(

@SparkQA

SparkQA commented Dec 17, 2019

Test build #115417 has finished for PR 26416 at commit 872ffbc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 23, 2019

Test build #115661 has finished for PR 26416 at commit 761833b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class EventFilterBuildersLoader(fs: FileSystem)
  • class LowerIndexLoadRequested(_msg: String) extends Exception(_msg)

@HeartSaVioR (author):

The test failure is below:

org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.(It is not a test it is a sbt.testing.SuiteSelector)
 Error Details
org.apache.hive.service.ServiceException: Failed to Start HiveServer2
 Stack Trace
sbt.ForkMain$ForkError: org.apache.hive.service.ServiceException: Failed to Start HiveServer2
	at org.apache.hive.service.CompositeService.start(CompositeService.java:80)
	at org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:107)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.start(HiveThriftServer2.scala:158)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.startWithContext(HiveThriftServer2.scala:61)
	at org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.startThriftServer(ThriftServerWithSparkContextSuite.scala:86)
	at org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.$anonfun$beforeAll$4(ThriftServerWithSparkContextSuite.scala:42)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.$anonfun$beforeAll$3(ThriftServerWithSparkContextSuite.scala:42)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.$anonfun$beforeAll$2(ThriftServerWithSparkContextSuite.scala:40)
	at org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.$anonfun$beforeAll$2$adapted(ThriftServerWithSparkContextSuite.scala:39)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
	at org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.beforeAll(ThriftServerWithSparkContextSuite.scala:39)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:317)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:510)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: sbt.ForkMain$ForkError: org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
	at org.apache.hive.service.cli.CLIService.start(CLIService.java:155)
	at org.apache.hive.service.CompositeService.start(CompositeService.java:70)
	... 30 more
Caused by: sbt.ForkMain$ForkError: java.lang.RuntimeException: java.lang.RuntimeException: class org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl not org.apache.hadoop.hive.metastore.MetaStoreFilterHook
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2229)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.loadFilterHooks(HiveMetaStoreClient.java:252)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:147)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:129)
	at org.apache.hive.service.cli.CLIService.start(CLIService.java:152)
	... 31 more
Caused by: sbt.ForkMain$ForkError: java.lang.RuntimeException: class org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl not org.apache.hadoop.hive.metastore.MetaStoreFilterHook
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2223)
	... 35 more

Not relevant.

@HeartSaVioR (author):

retest this, please

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115664 has finished for PR 26416 at commit 761833b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class EventFilterBuildersLoader(fs: FileSystem)
  • class LowerIndexLoadRequested(_msg: String) extends Exception(_msg)

@HeartSaVioR (author):

 org.apache.spark.deploy.master.MasterSuite.SPARK-27510: Master should avoid dead loop while launching executor failed in Worker
 Error Details
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 656 times over 10.002408616 seconds. Last failure message: Map() did not contain key "app-20191223154506-0000".
 Stack Trace
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 656 times over 10.002408616 seconds. Last failure message: Map() did not contain key "app-20191223154506-0000".
	at org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:432)
	at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:439)
	at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:391)
	at org.apache.spark.deploy.master.MasterSuite.eventually(MasterSuite.scala:111)
	at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:337)
	at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:336)
	at org.apache.spark.deploy.master.MasterSuite.eventually(MasterSuite.scala:111)
	at org.apache.spark.deploy.master.MasterSuite.$anonfun$new$40(MasterSuite.scala:681)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
	at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
	at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
	at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
	at org.apache.spark.deploy.master.MasterSuite.org$scalatest$BeforeAndAfter$$super$runTest(MasterSuite.scala:111)
	at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:203)
	at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:192)
	at org.apache.spark.deploy.master.MasterSuite.runTest(MasterSuite.scala:111)
	at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
	at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite.run(Suite.scala:1124)
	at org.scalatest.Suite.run$(Suite.scala:1106)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
	at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.deploy.master.MasterSuite.org$scalatest$BeforeAndAfter$$super$run(MasterSuite.scala:111)
	at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:258)
	at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:256)
	at org.apache.spark.deploy.master.MasterSuite.run(MasterSuite.scala:111)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:317)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:510)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: Map() did not contain key "app-20191223154506-0000"
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:503)
	at org.apache.spark.deploy.master.MasterSuite.$anonfun$new$41(MasterSuite.scala:685)
	at org.scalatest.concurrent.Eventually.makeAValiantAttempt$1(Eventually.scala:395)
	at org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:409)
	... 58 more

Looks like it happens rarely, from what I see in the history, but I don't think it's relevant to this patch. If this test fails again on this patch, I'll take a look.

@HeartSaVioR (author):

retest this, please

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115683 has finished for PR 26416 at commit 761833b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class EventFilterBuildersLoader(fs: FileSystem)
  • class LowerIndexLoadRequested(_msg: String) extends Exception(_msg)

@HeartSaVioR (author):

retest this, please

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115697 has finished for PR 26416 at commit 761833b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class EventFilterBuildersLoader(fs: FileSystem)
  • class LowerIndexLoadRequested(_msg: String) extends Exception(_msg)

@HeartSaVioR (author):

retest this, please

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115713 has finished for PR 26416 at commit 761833b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class EventFilterBuildersLoader(fs: FileSystem)
  • class LowerIndexLoadRequested(_msg: String) extends Exception(_msg)

@HeartSaVioR (author):

FYI, #27004 (SPARK-30348) has been submitted to fix test build #115664.

@SparkQA

SparkQA commented Dec 26, 2019

Test build #115781 has finished for PR 26416 at commit e1a6e42.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (author):

The test failure for #115781 was org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite (It is not a test, it is a sbt.testing.SuiteSelector) - not relevant.

@HeartSaVioR (author):

retest this, please

@SparkQA

SparkQA commented Dec 26, 2019

Test build #115794 has finished for PR 26416 at commit e1a6e42.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (author):

retest this, please

@SparkQA

SparkQA commented Dec 26, 2019

Test build #115800 has finished for PR 26416 at commit e1a6e42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (author):

Kind reminder.

@vanzin (Contributor) left a comment:

I really tried to review again, but it's hard to review a 2500-line PR when there are very significant changes between each review.

I'd suggest trying to come up with smaller incremental PRs - e.g. the compaction / filtering implementation without hooking it up to the history provider, or even to SQL for that matter; add those in separate PRs.

It's more work for you, but it will allow those who are just trying to review the code to help more effectively.


override def createFilter(): EventFilter = new BasicEventFilter(this)

def statistic(): FilterStatistic = {
vanzin (Contributor): statistics (also in the type name)

@@ -159,6 +159,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

// Visible for testing.
private[history] val logToCompactor = new mutable.HashMap[String, EventLogFileCompactor]
vanzin (Contributor): I started trying to track down how this works, and it's all over the place. The fact that the code around this is all new since the previous patch also makes it really hard to do incremental reviews. This is a huge PR, and when significant parts of it change with every update, reviews get really slow.

Have you tried to break this down into more incremental PRs?

For example, I hardly ever get into the FsHistoryProvider part before I start questioning why the previous code behaves the way it does. It kind of feels like all the filtering and compaction code could be added before it's hooked up to the provider.

This particular code that caches the compactor is scattered all over the place. Here you cache the instances, but there's a bunch of code and exception handling in the compactor itself to deal with the fact that its internal state may not be reusable.

That, to me, screams bad abstraction, and suggests you should think differently about how this caching is done and about the API between the provider and the compactor.

Another thing I noticed: you only seem to remove things from this cache when logs are cleaned up, but not, e.g., when an application finishes (at which point the code can do the final compaction).

@HeartSaVioR (author) commented Jan 3, 2020

There's a dilemma that makes it hard for contributors to try breaking things down first: there are cases where a PR cannot be split into multiple PRs without accepting that some parts won't change any behavior on their own and will require the other parts to make actual changes. (For me, SPARK-29111 is a good example - the patch had been valid before we changed the strategy (and would still be valid if we wanted incremental replay), but it failed to get reviewers to commit it, since SPARK-29111 by itself doesn't do anything.)

Reviewers require the full picture of the feature to make sure we don't end up merging only a part of it that would be a no-op. Sometimes the full picture (a plan) is enough, but in other cases reviewers want the PRs to implement the feature entirely; some reviewers may wonder, while reviewing an earlier part, how it would integrate with current Spark, which would only be done in a later part.

To make this realistic, both the contributor and the reviewers need to commit to driving the feature forward; otherwise someone ends up putting in unnecessary effort. That's what SPIP/shepherding is for, but it doesn't always seem to guarantee commitment.

That's just my 2 cents from a contributor's perspective. This PR is a good candidate to break down, as we've already spent a huge amount of time on review, which convinces me we won't drop the feature entirely in a later review phase. I'll try to break the PR down and ping reviewers. Thanks for the suggestion.

@HeartSaVioR (author) commented Jan 3, 2020

I've broken this patch down into three parts: #27085 / #27086 / #27087.
The latest changes (caching state) were excluded - I'll file a new JIRA issue and submit a patch once we merge all three parts and resolve SPARK-29779.
Closing this. Please continue reviewing on #27085. Thanks!

@HeartSaVioR closed this on Jan 3, 2020
@vanzin (Contributor) commented Jan 3, 2020

Yes, reviewers still need the big picture, but smaller reviews are easier to do and keep the code focused. They also help you design proper interfaces.

There are different ways to give context to others - e.g. #27053, which keeps the full body of work in a separate PR for reference. In the past I've kept separate, incremental PRs on my own clone and linked them from my Spark PRs, but that's a lot more work.
