Filter profiling tool based on start time. #5918

Merged

ProfileArgs.scala
@@ -15,6 +15,8 @@
*/
package com.nvidia.spark.rapids.tool.profiling

import org.apache.spark.sql.rapids.tool.AppFilterImpl

import org.rogach.scallop.{ScallopConf, ScallopOption}

class ProfileArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
@@ -79,6 +81,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
"Default is 24 hours (86400 seconds) and must be greater than 3 seconds. If it " +
"times out, it will report what it was able to process up until the timeout.",
default = Some(86400))
val startAppTime: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application start occurred within the past specified " +
"time period. Valid time periods are min(minute),h(hours),d(days),w(weeks)," +
"m(months). If a period is not specified it defaults to days.")

validate(filterCriteria) {
case crit if (crit.endsWith("-newest-filesystem") ||
@@ -92,5 +99,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
case _ => Left("Error, timeout must be greater than 3 seconds.")
}

validate(startAppTime) {
case time if (AppFilterImpl.parseAppTimePeriod(time) > 0L) => Right(Unit)
case _ => Left("Time period specified, must be greater than 0 and valid periods " +
"are min(minute),h(hours),d(days),w(weeks),m(months).")
}

verify()
}
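
For reference, a minimal sketch of exercising the new `startAppTime` option, assuming Scallop's default camelCase-to-hyphen naming exposes it as `--start-app-time` and that event log paths are still passed as trailing arguments; the path and the `sketchArgs` name are placeholders rather than anything in this PR.

    // Hedged sketch: "10d" keeps applications whose start falls in the last 10 days.
    val sketchArgs = new ProfileArgs(Seq(
      "--start-app-time", "10d",
      "/path/to/eventlog"))            // placeholder event log path
    assert(sketchArgs.startAppTime.toOption == Some("10d"))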

ProfileMain.scala
@@ -20,6 +20,7 @@ import com.nvidia.spark.rapids.tool.EventLogPathProcessor
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.AppFilterImpl

/**
* A profiling tool to parse Spark Event Log
@@ -29,7 +30,7 @@ object ProfileMain extends Logging {
* Entry point from spark-submit running this as the driver.
*/
def main(args: Array[String]) {
val exitCode = mainInternal(new ProfileArgs(args))
val (exitCode, _) = mainInternal(new ProfileArgs(args))
if (exitCode != 0) {
System.exit(exitCode)
}
@@ -38,23 +39,40 @@ object ProfileMain extends Logging {
/**
* Entry point for tests
*/
def mainInternal(appArgs: ProfileArgs): Int = {
def mainInternal(appArgs: ProfileArgs): (Int, Int) = {

// Parsing args
val eventlogPaths = appArgs.eventlog()
val filterN = appArgs.filterCriteria
val matchEventLogs = appArgs.matchEventLogs
val hadoopConf = new Configuration()
val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
val timeout = appArgs.timeout.toOption
val nThreads = appArgs.numThreads.getOrElse(
Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)

// Get the event logs required to process
val (eventLogFsFiltered, _) = EventLogPathProcessor.processAllPaths(filterN.toOption,
matchEventLogs.toOption, eventlogPaths, hadoopConf)
if (eventLogFsFiltered.isEmpty) {

val filteredLogs = if (argsContainsAppFilters(appArgs)) {
val appFilter = new AppFilterImpl(numOutputRows, hadoopConf, timeout, nThreads)
appFilter.filterEventLogs(eventLogFsFiltered, appArgs)
} else {
eventLogFsFiltered
}

if (filteredLogs.isEmpty) {
logWarning("No event logs to process after checking paths, exiting!")
return 0
return (0, filteredLogs.size)
}

val profiler = new Profiler(hadoopConf, appArgs)
profiler.profile(eventLogFsFiltered)
0
(0, filteredLogs.size)
}

def argsContainsAppFilters(appArgs: ProfileArgs): Boolean = {
appArgs.startAppTime.isSupplied
}
}
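
Since `mainInternal` now returns a pair instead of a bare exit code, a test can assert on how many event logs survive the start-time filter. A hedged test-style sketch under the same assumptions as above (flag spelling, placeholder path):

    // Hedged sketch: the second element of the pair is filteredLogs.size,
    // i.e. how many event logs remain after the start-time filter is applied.
    val appArgs = new ProfileArgs(Seq(
      "--start-app-time", "3d",
      "/path/to/eventlogs"))           // placeholder path
    val (exitCode, filteredCount) = ProfileMain.mainInternal(appArgs)
    assert(exitCode == 0)
    assert(filteredCount >= 0)         // 0 when no application started within the window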

AppFilterImpl.scala
@@ -24,11 +24,14 @@ import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.profiling.ProfileArgs
import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging

import org.rogach.scallop.{ScallopConf, ScallopOption}

class AppFilterImpl(
numRows: Int,
hadoopConf: Configuration,
@@ -44,7 +47,7 @@
private val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("qualAppFilter" + "-%d").build()
logInfo(s"Threadpool size is $nThreads")
private val qualFilterthreadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
private val appFilterthreadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
.asInstanceOf[ThreadPoolExecutor]

private class FilterThread(path: EventLogInfo) extends Runnable {
@@ -53,26 +56,50 @@

def filterEventLogs(
allPaths: Seq[EventLogInfo],
appArgs: QualificationArgs): Seq[EventLogInfo] = {
appArgs: ScallopConf): Seq[EventLogInfo] = {
allPaths.foreach { path =>
try {
qualFilterthreadPool.submit(new FilterThread(path))
appFilterthreadPool.submit(new FilterThread(path))
} catch {
case e: Exception =>
logError(s"Unexpected exception submitting log ${path.eventLog.toString}, skipping!", e)
}
}
// wait for the threads to finish processing the files
qualFilterthreadPool.shutdown()
if (!qualFilterthreadPool.awaitTermination(waitTimeInSec, TimeUnit.SECONDS)) {
appFilterthreadPool.shutdown()
if (!appFilterthreadPool.awaitTermination(waitTimeInSec, TimeUnit.SECONDS)) {
logError(s"Processing log files took longer then $waitTimeInSec seconds," +
" stopping processing any more event logs")
qualFilterthreadPool.shutdownNow()
" stopping processing any more event logs")
appFilterthreadPool.shutdownNow()
}

// This will be required to do the actual filtering
val apps = appsForFiltering.asScala
val apps: Seq[AppFilterReturnParameters] = appsForFiltering.asScala.toSeq

appArgs match {
case profileArgs: ProfileArgs =>
filterEventLogsInternal(apps, profileArgs)
case qualificationArgs: QualificationArgs =>
filterEventLogsInternal(apps, qualificationArgs)
}
}

private def filterEventLogsInternal(
apps: Seq[AppFilterReturnParameters],
appArgs: ProfileArgs): Seq[EventLogInfo] = {
val appTimeFiltered = if (appArgs.startAppTime.isSupplied) {
val msTimeToFilter = AppFilterImpl.parseAppTimePeriodArgs(appArgs.startAppTime)
apps.filter(_.appInfo.appStartInfo.exists(_.startTime >= msTimeToFilter))
} else {
apps
}

appTimeFiltered.map(_.eventlog)
}

private def filterEventLogsInternal(
apps: Seq[AppFilterReturnParameters],
appArgs: QualificationArgs): Seq[EventLogInfo] = {
val filterAppName = appArgs.applicationName.getOrElse("")
val filterCriteria = appArgs.filterCriteria.getOrElse("")
val userName = appArgs.userName.getOrElse("")
@@ -138,7 +165,7 @@
}

val appTimeFiltered = if (appArgs.startAppTime.isSupplied) {
val msTimeToFilter = AppFilterImpl.parseAppTimePeriodArgs(appArgs)
val msTimeToFilter = AppFilterImpl.parseAppTimePeriodArgs(appArgs.startAppTime)
val logicFiltered = if (appArgs.any()) {
apps
} else {
Expand Down Expand Up @@ -257,9 +284,9 @@ class AppFilterImpl(

object AppFilterImpl {

def parseAppTimePeriodArgs(appArgs: QualificationArgs): Long = {
if (appArgs.startAppTime.isSupplied) {
val appStartStr = appArgs.startAppTime.getOrElse("")
def parseAppTimePeriodArgs(startAppTime: ScallopOption[String]): Long = {
if (startAppTime.isSupplied) {
val appStartStr = startAppTime.getOrElse("")
parseAppTimePeriod(appStartStr)
} else {
0L
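
`AppFilterImpl.parseAppTimePeriod` itself is not shown in this diff, but the validation (`> 0L` for a valid string) and the filter (`startTime >= msTimeToFilter`) imply it turns a string such as "10d" into an epoch-millisecond cutoff. A hedged sketch with those semantics, named `parseAppTimePeriodSketch` to make clear it is not the PR's implementation:

    // Hedged sketch only: returns "now minus the period" in epoch milliseconds,
    // defaulting the unit to days and returning 0L for strings it cannot parse,
    // which is the behaviour the validate() block and the filter above rely on.
    def parseAppTimePeriodSketch(appStartStr: String): Long = {
      val periodRegex = raw"(\d+)(min|h|d|w|m)?".r
      appStartStr match {
        case periodRegex(amount, unit) =>
          val dayMs = 24L * 60 * 60 * 1000
          val unitMs = Option(unit).getOrElse("d") match {
            case "min" => 60L * 1000
            case "h"   => 60L * 60 * 1000
            case "d"   => dayMs
            case "w"   => 7 * dayMs
            case "m"   => 30 * dayMs   // month approximated as 30 days (assumption)
          }
          System.currentTimeMillis() - amount.toLong * unitMs
        case _ => 0L
      }
    }

Returning an absolute cutoff keeps the per-application check to a single `>=` comparison against `appStartInfo.startTime`, which is what both the qualification and profiling filters above rely on.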