Filter profiling tool based on start time. (#5918)
* Add components for filtering profiling tool based on app start time

* Add unit tests for start time based filtering

Signed-off-by: Partho Sarthi <psarthi@nvidia.com>

* Update docs and fix stylechecks

Signed-off-by: Partho Sarthi <psarthi@nvidia.com>

* Update readme for profiling tool

Signed-off-by: Partho Sarthi <psarthi@nvidia.com>

Co-authored-by: Partho Sarthi <psarthi@nvidia.com>
parthosa authored Jun 29, 2022
1 parent 5ce8a6e commit d986f64
Showing 6 changed files with 299 additions and 28 deletions.
18 changes: 13 additions & 5 deletions docs/spark-profiling-tool.md
@@ -528,11 +528,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
applications). Default is false.
--csv Output each table to a CSV file as well
creating the summary text file.
-f, --filter-criteria <arg> Filter newest or oldest N eventlogs for
processing.eg: 100-newest-filesystem (for
processing newest 100 event logs). eg:
100-oldest-filesystem (for processing oldest
100 event logs)
-f, --filter-criteria <arg> Filter newest or oldest N eventlogs based on
application start timestamp for processing.
Filesystem based filtering happens before
application based filtering (see start-app-time).
eg: 100-newest-filesystem (for processing newest
100 event logs). eg: 100-oldest-filesystem (for
processing oldest 100 event logs).
-g, --generate-dot Generate query visualizations in DOT format.
Default is false
--generate-timeline Write an SVG graph out for the full
@@ -553,6 +555,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
-p, --print-plans Print the SQL plans to a file named
'planDescriptions.log'.
Default is false.
-s, --start-app-time <arg> Filter event logs whose application start
occurred within the past specified time
period. Valid time periods are
min(minute),h(hours),d(days),w(weeks),m(months).
If a period is not specified it defaults to
days.
-t, --timeout <arg> Maximum time in seconds to wait for the event
logs to be processed. Default is 24 hours
(86400 seconds) and must be greater than 3
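For illustration, a minimal sketch of how the two filtering options documented above might be driven from Scala code; the event log path is a placeholder and not part of this commit:

    // Hypothetical args: filesystem filtering (-f) is applied first, then the new
    // start-time filter (-s / --start-app-time) prunes by application start timestamp.
    val appArgs = new ProfileArgs(Seq(
      "--filter-criteria", "100-newest-filesystem",
      "--start-app-time", "7d",            // keep apps that started within the last 7 days
      "/path/to/eventlog"))                // placeholder path
    val period = appArgs.startAppTime.getOrElse("")   // "7d"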
ProfileArgs.scala
@@ -17,6 +17,8 @@ package com.nvidia.spark.rapids.tool.profiling

import org.rogach.scallop.{ScallopConf, ScallopOption}

import org.apache.spark.sql.rapids.tool.AppFilterImpl

class ProfileArgs(arguments: Seq[String]) extends ScallopConf(arguments) {

banner("""
@@ -79,6 +81,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
"Default is 24 hours (86400 seconds) and must be greater than 3 seconds. If it " +
"times out, it will report what it was able to process up until the timeout.",
default = Some(86400))
val startAppTime: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application start occurred within the past specified " +
"time period. Valid time periods are min(minute),h(hours),d(days),w(weeks)," +
"m(months). If a period is not specified it defaults to days.")

validate(filterCriteria) {
case crit if (crit.endsWith("-newest-filesystem") ||
@@ -92,5 +99,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
case _ => Left("Error, timeout must be greater than 3 seconds.")
}

validate(startAppTime) {
case time if (AppFilterImpl.parseAppTimePeriod(time) > 0L) => Right(Unit)
case _ => Left("Time period specified, must be greater than 0 and valid periods " +
"are min(minute),h(hours),d(days),w(weeks),m(months).")
}

verify()
}
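AppFilterImpl.parseAppTimePeriod itself is not part of this diff. As a rough, illustrative sketch of the documented behaviour (suffixes min/h/d/w/m, default unit days, invalid input rejected by the validation above), a stand-alone parser could look like the following; this is an assumption, not the repository's implementation:

    import java.util.concurrent.TimeUnit

    // Illustrative only: convert a period string such as "7d" or "30min" into an
    // epoch-millisecond cutoff (now minus the period). Returns 0 on bad input,
    // matching the "must be greater than 0" validation above.
    def parsePeriodToStartMs(period: String): Long = {
      val pattern = "(\\d+)(min|h|d|w|m)?".r
      period.trim match {
        case pattern(num, unit) =>
          val n = num.toLong
          val ms = Option(unit).getOrElse("d") match {
            case "min" => TimeUnit.MINUTES.toMillis(n)
            case "h"   => TimeUnit.HOURS.toMillis(n)
            case "d"   => TimeUnit.DAYS.toMillis(n)
            case "w"   => TimeUnit.DAYS.toMillis(n * 7)
            case "m"   => TimeUnit.DAYS.toMillis(n * 30)   // months approximated as 30 days
          }
          System.currentTimeMillis() - ms
        case _ => 0L
      }
    }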
ProfileMain.scala
@@ -20,6 +20,7 @@ import com.nvidia.spark.rapids.tool.EventLogPathProcessor
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.AppFilterImpl

/**
* A profiling tool to parse Spark Event Log
@@ -29,7 +30,7 @@ object ProfileMain extends Logging {
* Entry point from spark-submit running this as the driver.
*/
def main(args: Array[String]) {
val exitCode = mainInternal(new ProfileArgs(args))
val (exitCode, _) = mainInternal(new ProfileArgs(args))
if (exitCode != 0) {
System.exit(exitCode)
}
@@ -38,23 +39,40 @@ object ProfileMain extends Logging {
/**
* Entry point for tests
*/
def mainInternal(appArgs: ProfileArgs): Int = {
def mainInternal(appArgs: ProfileArgs): (Int, Int) = {

// Parsing args
val eventlogPaths = appArgs.eventlog()
val filterN = appArgs.filterCriteria
val matchEventLogs = appArgs.matchEventLogs
val hadoopConf = new Configuration()
val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
val timeout = appArgs.timeout.toOption
val nThreads = appArgs.numThreads.getOrElse(
Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)

// Get the event logs required to process
val (eventLogFsFiltered, _) = EventLogPathProcessor.processAllPaths(filterN.toOption,
matchEventLogs.toOption, eventlogPaths, hadoopConf)
if (eventLogFsFiltered.isEmpty) {

val filteredLogs = if (argsContainsAppFilters(appArgs)) {
val appFilter = new AppFilterImpl(numOutputRows, hadoopConf, timeout, nThreads)
appFilter.filterEventLogs(eventLogFsFiltered, appArgs)
} else {
eventLogFsFiltered
}

if (filteredLogs.isEmpty) {
logWarning("No event logs to process after checking paths, exiting!")
return 0
return (0, filteredLogs.size)
}

val profiler = new Profiler(hadoopConf, appArgs)
profiler.profile(eventLogFsFiltered)
0
(0, filteredLogs.size)
}

def argsContainsAppFilters(appArgs: ProfileArgs): Boolean = {
appArgs.startAppTime.isSupplied
}
}
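mainInternal now also reports how many event logs survived filtering, which the new unit tests can assert on. A hedged sketch of such a test-style call, with a placeholder event log directory:

    // Filesystem-level filtering happens inside processAllPaths; the start-time
    // filter then prunes the survivors before profiling.
    val appArgs = new ProfileArgs(Seq("--start-app-time", "2d", "/path/to/eventlog-dir"))
    val (exitCode, filteredCount) = ProfileMain.mainInternal(appArgs)
    assert(exitCode == 0)
    // filteredCount counts event logs whose application started within the last 2 days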
AppFilterImpl.scala
@@ -24,8 +24,10 @@ import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.profiling.ProfileArgs
import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
import org.apache.hadoop.conf.Configuration
import org.rogach.scallop.{ScallopConf, ScallopOption}

import org.apache.spark.internal.Logging

@@ -44,7 +46,7 @@ class AppFilterImpl(
private val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("qualAppFilter" + "-%d").build()
logInfo(s"Threadpool size is $nThreads")
private val qualFilterthreadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
private val appFilterthreadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
.asInstanceOf[ThreadPoolExecutor]

private class FilterThread(path: EventLogInfo) extends Runnable {
@@ -53,26 +55,49 @@

def filterEventLogs(
allPaths: Seq[EventLogInfo],
appArgs: QualificationArgs): Seq[EventLogInfo] = {
appArgs: ScallopConf): Seq[EventLogInfo] = {
allPaths.foreach { path =>
try {
qualFilterthreadPool.submit(new FilterThread(path))
appFilterthreadPool.submit(new FilterThread(path))
} catch {
case e: Exception =>
logError(s"Unexpected exception submitting log ${path.eventLog.toString}, skipping!", e)
}
}
// wait for the threads to finish processing the files
qualFilterthreadPool.shutdown()
if (!qualFilterthreadPool.awaitTermination(waitTimeInSec, TimeUnit.SECONDS)) {
appFilterthreadPool.shutdown()
if (!appFilterthreadPool.awaitTermination(waitTimeInSec, TimeUnit.SECONDS)) {
logError(s"Processing log files took longer then $waitTimeInSec seconds," +
" stopping processing any more event logs")
qualFilterthreadPool.shutdownNow()
" stopping processing any more event logs")
appFilterthreadPool.shutdownNow()
}

// This will be required to do the actual filtering
val apps = appsForFiltering.asScala
val apps: Seq[AppFilterReturnParameters] = appsForFiltering.asScala.toSeq

appArgs match {
case profileArgs:ProfileArgs =>
filterEventLogsInternal(apps, profileArgs)
case qualificationArgs: QualificationArgs =>
filterEventLogsInternal(apps, qualificationArgs)
}
}

private def filterEventLogsInternal(
apps: Seq[AppFilterReturnParameters],
appArgs: ProfileArgs): Seq[EventLogInfo] = {
val appTimeFiltered = if (appArgs.startAppTime.isSupplied) {
val msTimeToFilter = AppFilterImpl.parseAppTimePeriodArgs(appArgs.startAppTime)
apps.filter(_.appInfo.appStartInfo.exists(_.startTime >= msTimeToFilter))
} else {
apps
}
appTimeFiltered.map(_.eventlog)
}

private def filterEventLogsInternal(
apps: Seq[AppFilterReturnParameters],
appArgs: QualificationArgs): Seq[EventLogInfo] = {
val filterAppName = appArgs.applicationName.getOrElse("")
val filterCriteria = appArgs.filterCriteria.getOrElse("")
val userName = appArgs.userName.getOrElse("")
@@ -138,7 +163,7 @@
}

val appTimeFiltered = if (appArgs.startAppTime.isSupplied) {
val msTimeToFilter = AppFilterImpl.parseAppTimePeriodArgs(appArgs)
val msTimeToFilter = AppFilterImpl.parseAppTimePeriodArgs(appArgs.startAppTime)
val logicFiltered = if (appArgs.any()) {
apps
} else {
@@ -257,9 +282,9 @@

object AppFilterImpl {

def parseAppTimePeriodArgs(appArgs: QualificationArgs): Long = {
if (appArgs.startAppTime.isSupplied) {
val appStartStr = appArgs.startAppTime.getOrElse("")
def parseAppTimePeriodArgs(startAppTime: ScallopOption[String]): Long = {
if (startAppTime.isSupplied) {
val appStartStr = startAppTime.getOrElse("")
parseAppTimePeriod(appStartStr)
} else {
0L
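The profiling-side start-time filter above keeps an application only when a start event was parsed and its timestamp is at or after the computed cutoff. A self-contained sketch of that predicate on stand-in data (the case classes are illustrative stand-ins, not the tool's real AppFilterReturnParameters):

    // Illustrative stand-ins for the tool's internal types.
    case class StartInfo(startTime: Long)
    case class AppInfo(appStartInfo: Option[StartInfo])

    val cutoffMs = System.currentTimeMillis() - 24L * 60 * 60 * 1000  // e.g. a "1d" cutoff
    val apps = Seq(
      AppInfo(Some(StartInfo(System.currentTimeMillis() - 1000L))),   // recent -> kept
      AppInfo(Some(StartInfo(cutoffMs - 1L))),                        // too old -> dropped
      AppInfo(None))                                                  // no start event -> dropped

    // Same shape as the filter above: exists() is false for None, so applications
    // without a recorded start event are excluded as well.
    val kept = apps.filter(_.appStartInfo.exists(_.startTime >= cutoffMs))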