[SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController
huitseeker authored and dragos committed Jul 22, 2015
1 parent 8941cf9 commit d32ca36
Showing 5 changed files with 144 additions and 1 deletion.
InputDStream.scala
@@ -22,6 +22,8 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.streaming.{Time, Duration, StreamingContext}
import org.apache.spark.streaming.scheduler.RateController
import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
import org.apache.spark.util.Utils

/**
@@ -47,6 +49,21 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is a unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()

/**
* A rate estimator configured by the user to compute a dynamic ingestion bound for this stream.
* @see `RateEstimator`
*/
// Only the no-op estimator exists so far, so any value configured under
// "spark.streaming.RateEstimator" falls through to it.
protected[streaming] val rateEstimator = ssc.conf
  .getOption("spark.streaming.RateEstimator")
  .getOrElse("noop") match {
    case _ => new NoopRateEstimator()
  }

// Keep track of the latest rate for this stream, as computed by the rate estimator
protected[streaming] val rateController: RateController = new RateController(id, rateEstimator) {
override def publish(rate: Long): Unit = ()
}

/** A human-readable name of this InputDStream */
private[streaming] def name: String = {
// e.g. FlumePollingDStream -> "Flume polling stream"
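Note on the hunk above: the `case _` arm makes the "spark.streaming.RateEstimator" lookup inert for now, since every configured value falls through to `NoopRateEstimator`. A hypothetical sketch of how the dispatch could look once a concrete estimator exists (the "pid" key and the `PIDRateEstimator` class are assumptions, not part of this commit):

protected[streaming] val rateEstimator = ssc.conf
  .getOption("spark.streaming.RateEstimator")
  .getOrElse("noop") match {
    case "pid" => new PIDRateEstimator() // hypothetical estimator, not in this commit
    case _ => new NoopRateEstimator()
  }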
ReceiverInputDStream.scala
@@ -24,7 +24,8 @@ import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
import org.apache.spark.streaming.util.WriteAheadLogUtils

/**
@@ -40,6 +41,14 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {

/**
* Asynchronously maintains and sends new rate limits to the receiver through the receiver tracker.
*/
override val rateController: RateController = new RateController(id, rateEstimator) {
override def publish(rate: Long): Unit =
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}

/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
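In this override, `publish` forwards each new bound through the `ReceiverTracker` to the stream's receiver; that is the only receiver-specific piece. As a sketch of exercising the same contract without a receiver, a hypothetical test double could simply record what it is told (`RecordingRateController` is an assumed name; since `RateController` and `RateEstimator` are `private[streaming]`, it must live inside that package):

package org.apache.spark.streaming.scheduler

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.streaming.scheduler.rate.RateEstimator

// Hypothetical test helper: remembers every rate this controller publishes.
private[streaming] class RecordingRateController(streamId: Int, estimator: RateEstimator)
  extends RateController(streamId, estimator) {

  val published = new ArrayBuffer[Long]

  override def publish(rate: Long): Unit = published.synchronized {
    published += rate
  }
}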
JobScheduler.scala
@@ -66,6 +66,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
eventLoop.start()

// Attach each input stream's rate controller so its estimator receives batch-completion updates
ssc.graph.getInputStreams.map(_.rateController).foreach(ssc.addStreamingListener(_))
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
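The one-liner above registers every input stream's rate controller on the listener bus before the bus starts, so no batch-completion event is missed. It is behaviorally equivalent to this expanded form:

for (inputStream <- ssc.graph.getInputStreams) {
  ssc.addStreamingListener(inputStream.rateController)
}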
RateController.scala
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.ThreadUtils

/**
* :: DeveloperApi ::
* A StreamingListener that receives batch completion updates and maintains
* an estimate of the rate at which this stream should ingest elements,
* as computed by a `RateEstimator`.
*/
@DeveloperApi
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {

protected def publish(rate: Long): Unit

// Used to compute & publish the rate update asynchronously
@transient private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))

private val rateLimit: AtomicLong = new AtomicLong(-1L)

// Asynchronous computation of the rate update
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newSpeed = rateEstimator.compute(time, elems, workDelay, waitDelay)
newSpeed foreach { s =>
rateLimit.set(s.toLong)
publish(getLatestRate())
}
} (executionContext)

def getLatestRate(): Long = rateLimit.get()

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo

  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay <- batchCompleted.batchInfo.processingDelay
    waitDelay <- batchCompleted.batchInfo.schedulingDelay
    elems <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}

}
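The update path is: `onBatchCompleted` extracts the batch's timing and record count, `computeAndPublish` runs the estimator on the single daemon thread, and a successful estimate is stored in `rateLimit` and pushed through `publish`. A minimal sketch of driving a controller by hand, assuming a hypothetical constant estimator (both types are `private[streaming]`, so this would run inside that package):

// Hypothetical estimator that always proposes the same bound.
val fixed = new RateEstimator {
  def compute(time: Long, elements: Long,
      processingDelay: Long, schedulingDelay: Long): Option[Double] = Some(1000.0)
}

val controller = new RateController(0, fixed) {
  override def publish(rate: Long): Unit = println(s"stream 0 bound: $rate records/s")
}

controller.getLatestRate() // returns -1L until the first estimate is published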
RateEstimator.scala
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler.rate

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* A component that estimates the rate at which an InputDStream should ingest
* elements, based on updates at every batch completion.
*/
@DeveloperApi
private[streaming] trait RateEstimator extends Serializable {

/**
* Computes the number of elements the stream attached to this `RateEstimator`
* should ingest per second, given an update on the size and completion
* times of the latest batch.
*/
def compute(time: Long, elements: Long,
processingDelay: Long, schedulingDelay: Long): Option[Double]
}

/**
* The trivial rate estimator: it never proposes an update.
*/
private[streaming] class NoopRateEstimator extends RateEstimator {

def compute(time: Long, elements: Long,
processingDelay: Long, schedulingDelay: Long): Option[Double] = None
}
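For contrast with the no-op, a hedged sketch of a non-trivial estimator (purely illustrative, not part of this commit): bound ingestion at the throughput the pipeline demonstrated on the last batch. The delays reported by `BatchInfo` are in milliseconds, hence the conversion to records per second.

package org.apache.spark.streaming.scheduler.rate

// Hypothetical estimator: propose the processing throughput observed on the
// last batch, i.e. elements processed per second of processing time.
private[streaming] class LastBatchThroughputEstimator extends RateEstimator {

  def compute(time: Long, elements: Long,
      processingDelay: Long, schedulingDelay: Long): Option[Double] =
    if (processingDelay > 0 && elements > 0) {
      Some(elements.toDouble * 1000 / processingDelay) // delay in ms -> records/sec
    } else {
      None
    }
}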
