Commit

Merge remote-tracking branch 'upstream/master'
soumitrak committed Nov 11, 2014
2 parents 31399a4 + 6e03de3 commit d0ce2cd
Showing 29 changed files with 334 additions and 122 deletions.

@@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
private val blockHandler = new ExternalShuffleBlockHandler()
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
new TransportContext(transportConf, handler)
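Taken together, this hunk means an ExternalShuffleBlockHandler can no longer be built without a TransportConf. A minimal sketch of the resulting wiring, consolidating the before/after lines above; the package imports are inferred from the class names and are assumptions, not something shown in this diff:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.network.TransportContext
    import org.apache.spark.network.netty.SparkTransportConf
    import org.apache.spark.network.sasl.SaslRpcHandler
    import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler

    val sparkConf = new SparkConf()
    val securityManager = new SecurityManager(sparkConf)

    val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
    val blockHandler = new ExternalShuffleBlockHandler(transportConf) // conf is now required
    val handler =
      if (securityManager.isAuthenticationEnabled()) new SaslRpcHandler(blockHandler, securityManager)
      else blockHandler
    val server = new TransportContext(transportConf, handler).createServer()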

@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.storage._
@@ -68,6 +69,8 @@ private[spark]
class FileShuffleBlockManager(conf: SparkConf)
extends ShuffleBlockManager with Logging {

private val transportConf = SparkTransportConf.fromSparkConf(conf)

private lazy val blockManager = SparkEnv.get.blockManager

// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -182,13 +185,14 @@ class FileShuffleBlockManager(conf: SparkConf)
val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
if (segmentOpt.isDefined) {
val segment = segmentOpt.get
return new FileSegmentManagedBuffer(segment.file, segment.offset, segment.length)
return new FileSegmentManagedBuffer(
transportConf, segment.file, segment.offset, segment.length)
}
}
throw new IllegalStateException("Failed to find shuffle block: " + blockId)
} else {
val file = blockManager.diskBlockManager.getFile(blockId)
new FileSegmentManagedBuffer(file, 0, file.length)
new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
}
}

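FileShuffleBlockManager now threads the same TransportConf into every FileSegmentManagedBuffer it hands out. A minimal sketch of the new constructor form, assuming a hypothetical shuffle file path and only the classes visible in the hunk above:

    import java.io.File

    import org.apache.spark.SparkConf
    import org.apache.spark.network.buffer.FileSegmentManagedBuffer
    import org.apache.spark.network.netty.SparkTransportConf

    val transportConf = SparkTransportConf.fromSparkConf(new SparkConf())
    val dataFile = new File("/tmp/shuffle_0_0_0.data")   // hypothetical shuffle output file
    // The whole file as one segment; offset and length are explicit, and the conf decides
    // later whether nioByteBuffer() copies the bytes or memory-maps them.
    val buffer = new FileSegmentManagedBuffer(transportConf, dataFile, 0, dataFile.length())
    val bytes = buffer.nioByteBuffer()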

@@ -22,8 +22,9 @@ import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

import org.apache.spark.SparkEnv
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._

/**
@@ -38,10 +39,12 @@ import org.apache.spark.storage._
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
class IndexShuffleBlockManager extends ShuffleBlockManager {
class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {

private lazy val blockManager = SparkEnv.get.blockManager

private val transportConf = SparkTransportConf.fromSparkConf(conf)

/**
* Mapping to a single shuffleBlockId with reduce ID 0.
* */
@@ -109,6 +112,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
val offset = in.readLong()
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf,
getDataFile(blockId.shuffleId, blockId.mapId),
offset,
nextOffset - offset)
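The read path above relies on the index file storing one long per boundary: the entry for reducer i is its start offset, and the following long is where the next reducer begins, so the segment length is nextOffset - offset. The sketch below only illustrates that layout; the 8 * reduceId skip is an assumption inferred from the format comment, not code from this commit:

    import java.io.{DataInputStream, File, FileInputStream}

    import com.google.common.io.ByteStreams

    // Illustrative helper, not part of the commit: locate a reducer's byte range
    // inside a shuffle data file using its companion index file.
    def segmentBounds(indexFile: File, reduceId: Int): (Long, Long) = {
      val in = new DataInputStream(new FileInputStream(indexFile))
      try {
        ByteStreams.skipFully(in, reduceId * 8L) // assumed layout: 8 bytes per offset entry
        val offset = in.readLong()
        val nextOffset = in.readLong()
        (offset, nextOffset)                     // segment length = nextOffset - offset
      } finally {
        in.close()
      }
    }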

@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {

private val indexShuffleBlockManager = new IndexShuffleBlockManager()
private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

/**

@@ -39,7 +39,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {

override def beforeAll() {
val transportConf = SparkTransportConf.fromSparkConf(conf)
rpcHandler = new ExternalShuffleBlockHandler()
rpcHandler = new ExternalShuffleBlockHandler(transportConf)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()

2 changes: 1 addition & 1 deletion docs/running-on-yarn.md
@@ -4,7 +4,7 @@ title: Running Spark on YARN
---

Support for running on [YARN (Hadoop
NextGen)](http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html)
NextGen)](http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html)
was added to Spark in version 0.6.0, and improved in subsequent releases.

# Preparations

29 changes: 29 additions & 0 deletions external/flume-sink/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN

@@ -159,6 +159,7 @@ class SparkSinkSuite extends FunSuite {
channelContext.put("transactionCapacity", 1000.toString)
channelContext.put("keep-alive", 0.toString)
channelContext.putAll(overrides)
channel.setName(scala.util.Random.nextString(10))
channel.configure(channelContext)

val sink = new SparkSink()

@@ -26,8 +26,6 @@ import java.util.concurrent.Executors
import kafka.consumer._
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
@@ -97,12 +95,6 @@ class KafkaReceiver[
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)

// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
@@ -139,26 +131,4 @@ class KafkaReceiver[
}
}
}

// It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
// is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
// from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
// 'smallest'/'largest':
// scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
// scalastyle:on
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
val dir = "/consumers/" + groupId
logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
try {
zk.deleteRecursive(dir)
} catch {
case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
} finally {
zk.close()
}
}
}

@@ -17,19 +17,18 @@

package org.apache.spark.streaming.kafka

import scala.reflect.ClassTag
import scala.collection.JavaConversions._

import java.lang.{Integer => JInt}
import java.util.{Map => JMap}

import scala.reflect.ClassTag
import scala.collection.JavaConversions._

import kafka.serializer.{Decoder, StringDecoder}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object KafkaUtils {
/**

@@ -31,24 +31,19 @@

import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportConf;

/**
* A {@link ManagedBuffer} backed by a segment in a file.
*/
public final class FileSegmentManagedBuffer extends ManagedBuffer {

/**
* Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
* Avoid unless there's a good reason not to.
*/
// TODO: Make this configurable
private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;

private final TransportConf conf;
private final File file;
private final long offset;
private final long length;

public FileSegmentManagedBuffer(File file, long offset, long length) {
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
this.conf = conf;
this.file = file;
this.offset = offset;
this.length = length;
@@ -65,7 +60,7 @@ public ByteBuffer nioByteBuffer() throws IOException {
try {
channel = new RandomAccessFile(file, "r").getChannel();
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < MIN_MEMORY_MAP_BYTES) {
if (length < conf.memoryMapBytes()) {
ByteBuffer buf = ByteBuffer.allocate((int) length);
channel.position(offset);
while (buf.remaining() != 0) {
@@ -134,8 +129,12 @@ public ManagedBuffer release() {

@Override
public Object convertToNetty() throws IOException {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
if (conf.lazyFileDescriptor()) {
return new LazyFileRegion(file, offset, length);
} else {
FileChannel fileChannel = new FileInputStream(file).getChannel();
return new DefaultFileRegion(fileChannel, offset, length);
}
}

public File getFile() { return file; }
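The two TransportConf calls introduced above, memoryMapBytes() and lazyFileDescriptor(), replace the hard-coded 2 MB threshold and the eagerly opened FileChannel. A hedged sketch of tuning them from a SparkConf; the configuration keys below are assumptions about how TransportConf resolves these accessors and should be verified against TransportConf itself:

    import org.apache.spark.SparkConf
    import org.apache.spark.network.netty.SparkTransportConf

    val sparkConf = new SparkConf()
      // Assumed keys, not shown in this diff; the defaults are believed to be a
      // 2 MB memory-map threshold and lazy file descriptors enabled.
      .set("spark.storage.memoryMapThreshold", (2 * 1024 * 1024).toString)
      .set("spark.shuffle.io.lazyFD", "true")

    val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
    // Segments shorter than this are copied by nioByteBuffer() instead of memory-mapped.
    transportConf.memoryMapBytes()
    // When true, convertToNetty() returns a LazyFileRegion instead of a DefaultFileRegion.
    transportConf.lazyFileDescriptor()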

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.buffer;

import java.io.FileInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import com.google.common.base.Objects;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

import org.apache.spark.network.util.JavaUtils;

/**
* A FileRegion implementation that only creates the file descriptor when the region is being
* transferred. This cannot be used with Epoll because there is no native support for it.
*
* This is mostly copied from DefaultFileRegion implementation in Netty. In the future, we
* should push this into Netty so the native Epoll transport can support this feature.
*/
public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {

private final File file;
private final long position;
private final long count;

private FileChannel channel;

private long numBytesTransferred = 0L;

/**
* @param file file to transfer.
* @param position start position for the transfer.
* @param count number of bytes to transfer starting from position.
*/
public LazyFileRegion(File file, long position, long count) {
this.file = file;
this.position = position;
this.count = count;
}

@Override
protected void deallocate() {
JavaUtils.closeQuietly(channel);
}

@Override
public long position() {
return position;
}

@Override
public long transfered() {
return numBytesTransferred;
}

@Override
public long count() {
return count;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
if (channel == null) {
channel = new FileInputStream(file).getChannel();
}

long count = this.count - position;
if (count < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
}

if (count == 0) {
return 0L;
}

long written = channel.transferTo(this.position + position, count, target);
if (written > 0) {
numBytesTransferred += written;
}
return written;
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("file", file)
.add("position", position)
.add("count", count)
.toString();
}
}
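A small usage sketch of the new class, assuming a hypothetical local file: the FileChannel is opened only on the first transferTo() call, so idle regions hold no file descriptor, and release() closes the channel through deallocate(). As the class comment notes, this path cannot be used with the native epoll transport.

    import java.io.{File, FileOutputStream}
    import java.nio.channels.Channels

    import org.apache.spark.network.buffer.LazyFileRegion

    val source = new File("/tmp/shuffle_0_0_0.data")             // hypothetical file
    val region = new LazyFileRegion(source, 0, source.length())  // no file descriptor opened yet

    val sink = Channels.newChannel(new FileOutputStream("/tmp/shuffle_copy.data"))
    var written = 0L
    while (written < region.count()) {
      written += region.transferTo(sink, written)                // channel opens on first call
    }
    region.release()                                             // deallocate() closes the channel
    sink.close()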