SPARK-1057 (alternative) Remove fastutil
(This is for discussion at this point -- I'm not suggesting this should be committed.)

This is what removing fastutil looks like. Much of it is straightforward, like using the `java.io` buffered stream classes and Guava for MurmurHash3.
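For a sense of scale, the two mechanical substitutions look like this (a minimal sketch; the file path and buffer size are illustrative, not values from this change):

```scala
import java.io.{BufferedOutputStream, FileOutputStream}
import com.google.common.hash.Hashing

// fastutil's FastBufferedOutputStream becomes the stock java.io class;
// an explicit buffer size carries over unchanged.
val out = new BufferedOutputStream(new FileOutputStream("/tmp/example"), 65536)

// Guava stands in for the rehash previously taken from
// it.unimi.dsi.fastutil.HashCommon.murmurHash3:
def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
```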

Uses of `FastByteArrayOutputStream` were a little trickier. In only one case, though, do I think the change to `java.io` actually entails an extra array copy.
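The wrinkle is that `FastByteArrayOutputStream` exposes its internal array, while `java.io.ByteArrayOutputStream.toByteArray` always returns a fresh copy. A sketch of the before/after pattern, with the fastutil calls shown as comments:

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val out = new ByteArrayOutputStream(4096)
// ... serialize data into out ...

// before: out.trim(); ByteBuffer.wrap(out.array)  // wrapped fastutil's internal array
// after: toByteArray copies into a correctly-sized new array
val buffer: ByteBuffer = ByteBuffer.wrap(out.toByteArray)
```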

The rest is using `OpenHashMap` and `OpenHashSet`. These uses are now written in terms of more Scala-like operations.
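Concretely, the fastutil `fastIterator()` loops become `foreach`, and the get-then-put counting idiom becomes `changeValue`. A minimal sketch (the `String` key type is just for illustration):

```scala
import org.apache.spark.util.collection.OpenHashMap

val counts = new OpenHashMap[String, Long]()

// changeValue inserts the default when the key is absent and applies the
// update function otherwise, replacing the old get-then-put pair:
counts.changeValue("a", 1L, _ + 1L)
counts.changeValue("a", 1L, _ + 1L) // "a" -> 2

// foreach replaces fastutil's entry-set fastIterator() loops:
counts.foreach { case (k, v) => println(s"$k -> $v") }
```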

`OpenHashMap` is where I made three non-trivial changes to make it work, and they need review (a usage sketch follows this list):

- It is no longer private
- The key type must have a `ClassTag`
- Unless a lot of other code changes, the key type can't enforce being a supertype of `Null`
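To make the review question concrete, here is a hypothetical caller under the new constraints; the helper itself is invented, but it mirrors the `countByValue` change below:

```scala
import scala.reflect.ClassTag
import org.apache.spark.util.collection.OpenHashMap

// The map is now public, but a generic key type must carry a ClassTag
// (presumably so the map can allocate its backing key array), and K no
// longer has to be a supertype of Null.
def countValues[T: ClassTag](iter: Iterator[T]): OpenHashMap[T, Long] = {
  val counts = new OpenHashMap[T, Long]()
  iter.foreach(t => counts.changeValue(t, 1L, _ + 1L))
  counts
}
```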

It all works and tests pass, and I think there is reason to believe it's OK from a speed perspective.

But what about those last changes?

Author: Sean Owen <sowen@cloudera.com>

Closes alteryx#266 from srowen/SPARK-1057-alternate and squashes the following commits:

2601129 [Sean Owen] Fix Map return type error not previously caught
ec65502 [Sean Owen] Updates from matei's review
00bc81e [Sean Owen] Remove use of fastutil and replace with use of java.io, spark.util and Guava classes
srowen authored and pwendell committed Apr 12, 2014
1 parent aa8bb11 commit 165e06a
Showing 19 changed files with 72 additions and 107 deletions.
4 changes: 0 additions & 4 deletions core/pom.xml
@@ -157,10 +157,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
<dependency>
<groupId>colt</groupId>
<artifactId>colt</artifactId>
@@ -18,11 +18,10 @@
package org.apache.spark.broadcast

import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import java.net.{URI, URL, URLConnection}
import java.io.{BufferedInputStream, BufferedOutputStream}
import java.net.{URL, URLConnection, URI}
import java.util.concurrent.TimeUnit

import it.unimi.dsi.fastutil.io.{FastBufferedInputStream, FastBufferedOutputStream}

import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
@@ -164,7 +163,7 @@ private[spark] object HttpBroadcast extends Logging {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
} else {
new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
new BufferedOutputStream(new FileOutputStream(file), bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
@@ -195,7 +194,7 @@
if (compress) {
compressionCodec.compressedInputStream(inputStream)
} else {
new FastBufferedInputStream(inputStream, bufferSize)
new BufferedInputStream(inputStream, bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
@@ -22,36 +22,33 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag

import cern.jet.stat.Probability
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}

import org.apache.spark.util.collection.OpenHashMap

/**
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
*/
private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] {

var outputsMerged = 0
var sums = new OLMap[T] // Sum of counts for each key
var sums = new OpenHashMap[T,Long]() // Sum of counts for each key

override def merge(outputId: Int, taskResult: OLMap[T]) {
override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
outputsMerged += 1
val iter = taskResult.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
taskResult.foreach { case (key, value) =>
sums.changeValue(key, value, _ + value)
}
}

override def currentResult(): Map[T, BoundedDouble] = {
if (outputsMerged == totalOutputs) {
val result = new JHashMap[T, BoundedDouble](sums.size)
val iter = sums.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
val sum = entry.getLongValue()
result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
sums.foreach { case (key, sum) =>
result(key) = new BoundedDouble(sum, 1.0, sum, sum)
}
result
} else if (outputsMerged == 0) {
@@ -60,16 +57,13 @@ private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
val p = outputsMerged.toDouble / totalOutputs
val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
val result = new JHashMap[T, BoundedDouble](sums.size)
val iter = sums.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
val sum = entry.getLongValue
sums.foreach { case (key, sum) =>
val mean = (sum + 1 - p) / p
val variance = (sum + 1) * (1 - p) / (p * p)
val stdev = math.sqrt(variance)
val low = mean - confFactor * stdev
val high = mean + confFactor * stdev
result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
result(key) = new BoundedDouble(mean, confidence, low, high)
}
result
}
34 changes: 16 additions & 18 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -20,12 +20,10 @@ package org.apache.spark.rdd
import java.util.Random

import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -43,6 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

/**
@@ -834,24 +833,24 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
val map = new OLMap[T]
while (iter.hasNext) {
val v = iter.next()
map.put(v, map.getLong(v) + 1L)
def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
}
def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
val iter = m2.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
m2.foreach { case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
}
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map
// Convert to a Scala mutable map
val mutableResult = scala.collection.mutable.Map[T,Long]()
myResult.foreach { case (k, v) => mutableResult.put(k, v) }
mutableResult
}

/**
@@ -866,11 +865,10 @@ abstract class RDD[T: ClassTag](
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
val map = new OLMap[T]
while (iter.hasNext) {
val v = iter.next()
map.put(v, map.getLong(v) + 1L)
val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) =>
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
map
}
@@ -17,11 +17,10 @@

package org.apache.spark.scheduler

import java.io.InputStream
import java.io.{BufferedInputStream, InputStream}

import scala.io.Source

import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.hadoop.fs.{Path, FileSystem}
import org.json4s.jackson.JsonMethods._

@@ -62,7 +61,7 @@ private[spark] class ReplayListenerBus(
var currentLine = "<not started>"
try {
fileStream = Some(fileSystem.open(path))
bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
bufferedStream = Some(new BufferedInputStream(fileStream.get))
compressStream = Some(wrapForCompression(bufferedStream.get))

// Parse each line as an event and post the event to all attached listeners
9 changes: 3 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -17,13 +17,11 @@

package org.apache.spark.scheduler

import java.io.{DataInputStream, DataOutputStream}
import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

import scala.collection.mutable.HashMap

import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import org.apache.spark.TaskContext
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
@@ -104,7 +102,7 @@ private[spark] object Task {
serializer: SerializerInstance)
: ByteBuffer = {

val out = new FastByteArrayOutputStream(4096)
val out = new ByteArrayOutputStream(4096)
val dataOut = new DataOutputStream(out)

// Write currentFiles
@@ -125,8 +123,7 @@
dataOut.flush()
val taskBytes = serializer.serialize(task).array()
out.write(taskBytes)
out.trim()
ByteBuffer.wrap(out.array)
ByteBuffer.wrap(out.toByteArray)
}

/**
@@ -17,11 +17,9 @@

package org.apache.spark.serializer

import java.io.{EOFException, InputStream, OutputStream}
import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer

import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import org.apache.spark.SparkEnv
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
@@ -73,10 +71,9 @@ trait SerializerInstance {

def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
// Default implementation uses serializeStream
val stream = new FastByteArrayOutputStream()
val stream = new ByteArrayOutputStream()
serializeStream(stream).writeAll(iterator)
val buffer = ByteBuffer.allocate(stream.position.toInt)
buffer.put(stream.array, 0, stream.position.toInt)
val buffer = ByteBuffer.wrap(stream.toByteArray)
buffer.flip()
buffer
}
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import java.io.{File, InputStream, OutputStream}
import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -26,7 +26,6 @@ import scala.concurrent.duration._
import scala.util.Random

import akka.actor.{ActorSystem, Cancellable, Props}
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import sun.nio.ch.DirectBuffer

import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException}
@@ -992,7 +991,7 @@ private[spark] class BlockManager(
outputStream: OutputStream,
values: Iterator[Any],
serializer: Serializer = defaultSerializer) {
val byteStream = new FastBufferedOutputStream(outputStream)
val byteStream = new BufferedOutputStream(outputStream)
val ser = serializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
@@ -1002,10 +1001,9 @@
blockId: BlockId,
values: Iterator[Any],
serializer: Serializer = defaultSerializer): ByteBuffer = {
val byteStream = new FastByteArrayOutputStream(4096)
val byteStream = new ByteArrayOutputStream(4096)
dataSerializeStream(blockId, byteStream, values, serializer)
byteStream.trim()
ByteBuffer.wrap(byteStream.array)
ByteBuffer.wrap(byteStream.toByteArray)
}

/**
@@ -17,11 +17,9 @@

package org.apache.spark.storage

import java.io.{FileOutputStream, File, OutputStream}
import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
import java.nio.channels.FileChannel

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}

@@ -119,7 +117,7 @@ private[spark] class DiskBlockObjectWriter(
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
lastValidPosition = initialPosition
bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,12 +17,11 @@

package org.apache.spark.util

import java.io._
import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

import org.apache.spark.{Logging, SparkConf}
@@ -100,7 +99,7 @@ private[spark] class FileLogger(
hadoopDataStream.get
}

val bstream = new FastBufferedOutputStream(dstream, outputBufferSize)
val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
new PrintWriter(cstream)
}
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.ArrayBuffer

import it.unimi.dsi.fastutil.ints.IntOpenHashSet

import org.apache.spark.Logging
import org.apache.spark.util.collection.OpenHashSet

/**
* Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
@@ -207,7 +206,7 @@ private[spark] object SizeEstimator extends Logging {
// Estimate the size of a large array by sampling elements without replacement.
var size = 0.0
val rand = new Random(42)
val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
for (i <- 0 until ARRAY_SAMPLE_SIZE) {
var index = 0
do {
@@ -19,6 +19,8 @@ package org.apache.spark.util.collection

import java.util.{Arrays, Comparator}

import com.google.common.hash.Hashing

import org.apache.spark.annotation.DeveloperApi

/**
@@ -199,11 +201,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)

/**
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
* We use the Murmur Hash 3 finalization step that's also used in fastutil.
*/
private def rehash(h: Int): Int = {
it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
}
private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

/** Double the table's size and re-hash everything */
protected def growTable() {
(Diff truncated; the remaining changed files are not shown.)
