From d32ca3697ab18ba8db9905c81d57559bf8472195 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?=
Date: Tue, 14 Jul 2015 13:11:01 +0200
Subject: [PATCH] [SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController

---
 .../streaming/dstream/InputDStream.scala           | 17 +++++
 .../dstream/ReceiverInputDStream.scala             | 11 ++-
 .../streaming/scheduler/JobScheduler.scala         |  2 +
 .../streaming/scheduler/RateController.scala       | 69 +++++++++++++++++++
 .../scheduler/rate/RateEstimator.scala             | 46 +++++++++++++
 5 files changed, 144 insertions(+), 1 deletion(-)
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index d58c99a8ff321..3ff360d1dbefa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -22,6 +22,8 @@ import scala.reflect.ClassTag
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.streaming.scheduler.RateController
+import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
 import org.apache.spark.util.Utils
 
 /**
@@ -47,6 +49,21 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   /** This is an unique identifier for the input stream. */
   val id = ssc.getNewInputStreamId()
 
+  /**
+   * A rate estimator configured by the user to compute a dynamic ingestion bound for this stream.
+   * @see `RateEstimator`
+   */
+  protected[streaming] val rateEstimator = ssc.conf
+    .getOption("spark.streaming.RateEstimator")
+    .getOrElse("noop") match {
+      case _ => new NoopRateEstimator()
+    }
+
+  // Keep track of the freshest rate for this stream using the rateEstimator
+  protected[streaming] val rateController: RateController = new RateController(id, rateEstimator) {
+    override def publish(rate: Long): Unit = ()
+  }
+
   /** A human-readable name of this InputDStream */
   private[streaming] def name: String = {
     // e.g. FlumePollingDStream -> "Flume polling stream"
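Note on the selection logic above: this patch maps every value of `spark.streaming.RateEstimator` to `NoopRateEstimator`, so the match is effectively a placeholder for future implementations. A minimal sketch of how a later estimator could be dispatched; the "pid" key and `PIDRateEstimator` are hypothetical names, not part of this patch, and the hypothetical case is commented out so the sketch compiles:

```scala
package org.apache.spark.streaming.dstream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.rate.{NoopRateEstimator, RateEstimator}

// Sketch only: the selection logic from InputDStream, pulled out for clarity.
// Only "noop" exists in this patch; anything else falls through to Noop.
private[streaming] object RateEstimatorSelector {
  def select(conf: SparkConf): RateEstimator =
    conf.getOption("spark.streaming.RateEstimator").getOrElse("noop") match {
      // case "pid" => new PIDRateEstimator()  // hypothetical future estimator
      case _ => new NoopRateEstimator()
    }
}
```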
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index a50f0efc030ce..39e3694763826 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -24,7 +24,8 @@ import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.StreamInputInfo
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 /**
@@ -40,6 +41,14 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
 abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
+  /**
+   * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+   */
+  override val rateController: RateController = new RateController(id, rateEstimator) {
+    override def publish(rate: Long): Unit =
+      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
+  }
+
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 4af9b6d3b56ab..d3f257429c952 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -66,6 +66,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
     eventLoop.start()
 
+    // Estimators receive updates from batch completion
+    ssc.graph.getInputStreams.map(_.rateController).foreach(ssc.addStreamingListener(_))
     listenerBus.start(ssc.sparkContext)
     receiverTracker = new ReceiverTracker(ssc)
     inputInfoTracker = new InputInfoTracker(ssc)
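With the wiring above, JobScheduler registers every input stream's controller on the listener bus at start, and `ReceiverInputDStream` routes published rates to its receiver through the tracker. A hedged sketch of how another `InputDStream` subclass could route rates elsewhere; `LoggingRateDStream` is an invented name, and `logInfo` is assumed available because `DStream` mixes in Spark's `Logging`:

```scala
package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.RateController

// Illustrative only: an input stream whose controller merely logs the
// newly estimated bound instead of pushing it to a receiver.
abstract class LoggingRateDStream[T: ClassTag](@transient ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  override val rateController: RateController = new RateController(id, rateEstimator) {
    // publish() is the single point where a concrete controller decides
    // what to do with a freshly estimated bound.
    override def publish(rate: Long): Unit =
      logInfo(s"Stream $streamUID would now be bounded to $rate elements/s")
  }
}
```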
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
new file mode 100644
index 0000000000000..82244498cc05c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * :: DeveloperApi ::
+ * A StreamingListener that receives batch completion updates, and maintains
+ * an estimate of the speed at which this stream should ingest messages,
+ * using estimates computed by a `RateEstimator`.
+ */
+@DeveloperApi
+private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
+  extends StreamingListener with Serializable {
+
+  protected def publish(rate: Long): Unit
+
+  // Used to compute & publish the rate update asynchronously
+  @transient private val executionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
+
+  private val rateLimit: AtomicLong = new AtomicLong(-1L)
+
+  // Asynchronous computation of the rate update
+  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
+    Future[Unit] {
+      val newSpeed = rateEstimator.compute(time, elems, workDelay, waitDelay)
+      newSpeed.foreach { s =>
+        rateLimit.set(s.toLong)
+        publish(getLatestRate())
+      }
+    } (executionContext)
+
+  def getLatestRate(): Long = rateLimit.get()
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+    val elements = batchCompleted.batchInfo.streamIdToInputInfo
+
+    for (
+      processingEnd <- batchCompleted.batchInfo.processingEndTime;
+      workDelay <- batchCompleted.batchInfo.processingDelay;
+      waitDelay <- batchCompleted.batchInfo.schedulingDelay;
+      elems <- elements.get(streamUID).map(_.numRecords)
+    ) computeAndPublish(processingEnd, elems, workDelay, waitDelay)
+  }
+
+}
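The class above leaves a single abstract member, `publish`. For intuition: after a batch of, say, 1000 records that finished at time `t` with 500 ms of processing and 200 ms of scheduling delay, the controller calls `rateEstimator.compute(t, 1000, 500, 200)` on its background thread and publishes any bound that comes back. A minimal concrete controller, useful in tests; `RecordingRateController` is an invented name, not part of this patch:

```scala
package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.scheduler.rate.RateEstimator

// Sketch only: a controller that records what would have been published,
// e.g. to assert on rate updates in a unit test.
private[streaming] class RecordingRateController(id: Int, estimator: RateEstimator)
  extends RateController(id, estimator) {

  @volatile var lastPublished: Long = -1L

  override def publish(rate: Long): Unit = {
    lastPublished = rate
  }
}
```

Registered via `ssc.addStreamingListener(...)` as JobScheduler does above, such a controller sees every batch completion; `getLatestRate()` then reflects the freshest estimate, updated only by the single daemon thread.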
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
new file mode 100644
index 0000000000000..1e1ccf135ad70
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler.rate
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * A component that estimates the rate at which an InputDStream should ingest
+ * elements, based on updates at every batch completion.
+ */
+@DeveloperApi
+private[streaming] trait RateEstimator extends Serializable {
+
+  /**
+   * Computes the number of elements the stream attached to this `RateEstimator`
+   * should ingest per second, given an update on the size and completion
+   * times of the latest batch.
+   */
+  def compute(time: Long, elements: Long,
+      processingDelay: Long, schedulingDelay: Long): Option[Double]
+}
+
+/**
+ * The trivial rate estimator never sends an update.
+ */
+private[streaming] class NoopRateEstimator extends RateEstimator {
+
+  def compute(time: Long, elements: Long,
+      processingDelay: Long, schedulingDelay: Long): Option[Double] = None
+}
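To make the `compute` contract concrete, here is a deliberately naive estimator, not part of this patch, that simply proposes the throughput observed in the last batch; it assumes the delays are in milliseconds, matching the batch info values the controller feeds in:

```scala
package org.apache.spark.streaming.scheduler.rate

// Illustrative only: proposes "records per second of processing time"
// for the last batch. A real estimator would smooth successive samples
// instead of reacting to each batch in isolation.
private[streaming] class NaiveRateEstimator extends RateEstimator {

  def compute(time: Long, elements: Long,
      processingDelay: Long, schedulingDelay: Long): Option[Double] =
    if (elements > 0 && processingDelay > 0) {
      // processingDelay is in milliseconds; scale to a per-second rate.
      Some(elements.toDouble * 1000 / processingDelay)
    } else {
      None  // no meaningful sample; keep the previous bound
    }
}
```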