diff --git a/core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala index 86d6396dfdc2b..227d73c77075c 100644 --- a/core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala @@ -41,12 +41,18 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan /** * Port number the service is listening on, available only after [[init]] is invoked. */ - override def port: Int = cm.id.port + override def port: Int = { + checkInit() + cm.id.port + } /** * Host name the service is listening on, available only after [[init]] is invoked. */ - override def hostName: String = cm.id.host + override def hostName: String = { + checkInit() + cm.id.host + } /** * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch @@ -76,6 +82,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan port: Int, blockIds: Seq[String], listener: BlockFetchingListener): Unit = { + checkInit() val cmId = new ConnectionManagerId(hostName, port) val blockMessageArray = new BlockMessageArray(blockIds.map { blockId => @@ -118,6 +125,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan blockId: String, blockData: ManagedBuffer, level: StorageLevel) { + checkInit() val msg = PutBlock(BlockId(blockId), blockData.byteBuffer(), level) val blockMessageArray = new BlockMessageArray(BlockMessage.fromPutBlock(msg)) val remoteCmId = new ConnectionManagerId(hostName, port) @@ -127,6 +135,10 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan Duration.Inf) } + private def checkInit(): Unit = if (cm == null) { + throw new IllegalStateException(getClass.getName + " has not been initialized") + } + private def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = { logDebug("Handling message " + msg) msg match {