Skip to content

Commit

Permalink
Fixed remaining usages to be consistent. Updated Java-side time conve…
Browse files Browse the repository at this point in the history
…rsion
  • Loading branch information
Ilya Ganelin committed Mar 30, 2015
1 parent 68f4e93 commit 70ac213
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 29 deletions.
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule

// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
private val networkTimeoutS = sc.conf.get("spark.network.timeout","120s")
private val networkTimeoutS = Utils.timeStringAsS(sc.conf.get("spark.network.timeout","120s"))
private val executorTimeoutMs = Utils.timeStringAsMs(
sc.conf.get("spark.storage.blockManagerSlaveTimeout", networkTimeoutS))
sc.conf.get("spark.storage.blockManagerSlaveTimeout", s"${networkTimeoutS}s"))

// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
private val networkTimeoutIntervalS = sc.conf.get("spark.network.timeoutInterval","60s")
private val networkTimeoutIntervalS =
Utils.timeStringAsS(sc.conf.get("spark.network.timeoutInterval","60s"))
private val checkTimeoutIntervalMs = Utils.timeStringAsMs(
sc.conf.get("spark.storage.blockManagerTimeoutIntervalMs", networkTimeoutIntervalS))
sc.conf.get("spark.storage.blockManagerTimeoutIntervalMs", s"${networkTimeoutIntervalS}s"))

private var timeoutCheckingTask: Cancellable = null
override def preStart(): Unit = {
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ private[spark] object Utils extends Logging {
* Convert a passed time string (e.g. 50s, 100ms, or 250us) to a microsecond count for
* internal use. If no suffix is provided a direct conversion is attempted.
*/
@throws(classOf[NumberFormatException])
private def timeStringToUs(str: String) : Long = {
try {
val lower = str.toLowerCase.trim()
Expand All @@ -1045,6 +1046,7 @@ private[spark] object Utils extends Logging {
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in us.
*/
@throws(classOf[NumberFormatException])
def timeStringAsUs(str: String): Long = {
timeStringToUs(str)
}
Expand All @@ -1053,6 +1055,7 @@ private[spark] object Utils extends Logging {
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in ms.
*/
@throws(classOf[NumberFormatException])
def timeStringAsMs(str : String) : Long = {
timeStringToUs(str)/1000
}
Expand All @@ -1061,6 +1064,7 @@ private[spark] object Utils extends Logging {
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in seconds.
*/
@throws(classOf[NumberFormatException])
def timeStringAsS(str : String) : Long = {
timeStringToUs(str)/1000/1000
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,35 +123,49 @@ private static boolean isSymlink(File file) throws IOException {
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use
* Convert a passed time string (e.g. 50s, 100ms, or 250us) to a microsecond count for
* internal use. If no suffix is provided a direct conversion is attempted.
*/
public static long timeStringToUs(String str) throws IllegalArgumentException {
private static long timeStringToUs(String str) throws NumberFormatException {
String lower = str.toLowerCase().trim();
if (lower.endsWith("ms")) {
return Long.parseLong(lower.substring(0, lower.length()-2)) * 1000;
} else if (lower.endsWith("us")) {
return Long.parseLong(lower.substring(0, lower.length()-2));
} else if (lower.endsWith("s")) {
return Long.parseLong(lower.substring(0, lower.length()-1)) * 1000 * 1000;
} else {// Invalid suffix, force correct formatting
throw new IllegalArgumentException("Time must be specified as seconds (s), " +
try {
if (lower.endsWith("ms")) {
return Long.parseLong(lower.substring(0, lower.length() - 2)) * 1000;
} else if (lower.endsWith("us")) {
return Long.parseLong(lower.substring(0, lower.length() - 2));
} else if (lower.endsWith("s")) {
return Long.parseLong(lower.substring(0, lower.length() - 1)) * 1000 * 1000;
} else {// Invalid suffix, force correct formatting
return Long.parseLong(lower);
}
} catch(NumberFormatException e) {
throw new NumberFormatException("Time must be specified as seconds (s), " +
"milliseconds (ms), or microseconds (us) e.g. 50s, 100ms, or 250us.");
}

}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in us.
*/
public static long timeStringAsUs(String str) throws NumberFormatException {
return timeStringToUs(str);
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use.
* Note: may round in some cases
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in ms.
*/
public static long timeStringToMs(String str) throws IllegalArgumentException {
public static long timeStringAsMs(String str) throws NumberFormatException {
return timeStringToUs(str)/1000;
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use.
* Note: may round in some cases
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in seconds.
*/
public static long timeStringToS(String str) throws IllegalArgumentException {
public static long timeStringAsS(String str) throws NumberFormatException {
return timeStringToUs(str)/1000/1000;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ public boolean preferDirectBufs() {

/** Connect timeout in milliseconds. Default 120 secs. */
public int connectionTimeoutMs() {
long defaultTimeout = JavaUtils.timeStringToMs(
conf.get("spark.shuffle.io.connectionTimeout",
conf.get("spark.network.timeout", "120s")));
return (int) defaultTimeout;
long defaultNetworkTimeoutS = JavaUtils.timeStringAsS(
conf.get("spark.network.timeout","120s"));
long defaultTimeoutS = JavaUtils.timeStringAsS(
conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s"));
return (int) defaultTimeoutS * 1000;
}

/** Number of concurrent connections between two nodes for fetching data. */
Expand Down Expand Up @@ -71,7 +72,7 @@ public int numConnectionsPerPeer() {

/** Timeout for a single round trip of SASL token exchange, in milliseconds. */
public int saslRTTimeoutMs() {
return (int) JavaUtils.timeStringToMs(conf.get("spark.shuffle.sasl.timeout", "30s"));
return (int) JavaUtils.timeStringAsS(conf.get("spark.shuffle.sasl.timeout", "30s")) * 1000;
}

/**
Expand All @@ -85,7 +86,7 @@ public int saslRTTimeoutMs() {
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTimeMs() {
return (int) JavaUtils.timeStringToMs(conf.get("spark.shuffle.io.retryWait", "5s"));
return (int) JavaUtils.timeStringAsS(conf.get("spark.shuffle.io.retryWait", "5s")) * 1000;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Utils, SystemClock}
import org.apache.spark.util.{SystemClock, Utils}

/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
Expand Down Expand Up @@ -80,7 +80,7 @@ private[streaming] class BlockGenerator(

private val clock = new SystemClock()
private val blockIntervalMs =
Utils.timeStringAsMs(conf.get("spark.streaming.blockIntervalMs", "200ms"))
Utils.timeStringAsMs(conf.get("spark.streaming.blockInterval", "200ms"))
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import akka.actor.{ActorRef, Props, Actor}
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Utils, Clock, ManualClock}
import org.apache.spark.util.{Clock, ManualClock, Utils}

/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
Expand Down

0 comments on commit 70ac213

Please sign in to comment.