Merge pull request apache-spark-on-k8s#358 from palantir/rk/more-upstream

robert3005 authored Apr 20, 2018
2 parents 18acf6d + 7f5253f commit 5ccc040
Showing 271 changed files with 9,917 additions and 4,515 deletions.
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions;
 
 import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * Simulates Hive's hashing function from Hive v1.2.1
@@ -51,4 +52,8 @@ public static int hashUnsafeBytesBlock(MemoryBlock mb) {
   public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
     return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
   }
+
+  public static int hashUTF8String(UTF8String str) {
+    return hashUnsafeBytesBlock(str.getMemoryBlock());
+  }
 }
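
Note: a minimal usage sketch (not part of the commit) of the new HiveHasher helper. It assumes UTF8String's existing getBaseObject/getBaseOffset/numBytes accessors; hashing through the string's backing MemoryBlock should match the older base/offset/length path while skipping the MemoryBlock.allocateFromObject wrapper:

import org.apache.spark.sql.catalyst.expressions.HiveHasher
import org.apache.spark.unsafe.types.UTF8String

object HiveHasherSketch {
  def main(args: Array[String]): Unit = {
    val s = UTF8String.fromString("spark")
    // New path: hash the string's backing MemoryBlock directly.
    val viaBlock = HiveHasher.hashUTF8String(s)
    // Older path: wrap base object, offset and length by hand.
    val viaBytes = HiveHasher.hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes)
    assert(viaBlock == viaBytes)
  }
}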

@@ -20,6 +20,7 @@
 import com.google.common.primitives.Ints;
 
 import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
@@ -82,6 +83,10 @@ public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
     return fmix(h1, lengthInBytes);
   }
 
+  public static int hashUTF8String(UTF8String str, int seed) {
+    return hashUnsafeBytesBlock(str.getMemoryBlock(), seed);
+  }
+
   public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
     return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
   }
@@ -91,7 +96,7 @@ public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes,
   }
 
   public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
-    // This is compatible with original and another implementations.
+    // This is compatible with original and other implementations.
     // Use this method for new components after Spark 2.3.
     int lengthInBytes = Ints.checkedCast(base.size());
     assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
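
Note: unlike the HiveHasher overload, the Murmur3 variant threads an explicit seed through to the block hasher. A hedged sketch (not from the commit) of how the new overload would be called:

import org.apache.spark.unsafe.hash.Murmur3_x86_32
import org.apache.spark.unsafe.types.UTF8String

object Murmur3Sketch {
  def main(args: Array[String]): Unit = {
    val s = UTF8String.fromString("spark")
    // The new overload hashes the string's MemoryBlock with the given seed,
    // avoiding the MemoryBlock.allocateFromObject wrapper in hashUnsafeBytes.
    val h42 = Murmur3_x86_32.hashUTF8String(s, 42)
    val h43 = Murmur3_x86_32.hashUTF8String(s, 43)
    println(s"seed 42 -> $h42, seed 43 -> $h43") // different seeds, different hashes
  }
}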

@@ -52,8 +52,8 @@ private static void checkBasic(String str, int len) {
 
     assertTrue(s1.contains(s2));
     assertTrue(s2.contains(s1));
-    assertTrue(s1.startsWith(s1));
-    assertTrue(s1.endsWith(s1));
+    assertTrue(s1.startsWith(s2));
+    assertTrue(s1.endsWith(s2));
   }
 
   @Test
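
Note: the old assertions compared s1 with itself, which passes trivially; comparing against s2 actually exercises startsWith/endsWith across two differently-constructed strings. A rough sketch of the idea, assuming checkBasic builds s1 from a String and s2 from UTF-8 bytes:

import java.nio.charset.StandardCharsets
import org.apache.spark.unsafe.types.UTF8String

object StartsWithSketch {
  def main(args: Array[String]): Unit = {
    val s1 = UTF8String.fromString("hello")
    val s2 = UTF8String.fromBytes("hello".getBytes(StandardCharsets.UTF_8))
    // Same content, different backing storage: this tests the real comparison.
    assert(s1.startsWith(s2) && s1.endsWith(s2))
  }
}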

@@ -164,7 +164,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
   def padding(origin: String, pad: String, length: Int, isLPad: Boolean): String = {
     if (length <= 0) return ""
     if (length <= origin.length) {
-      if (length <= 0) "" else origin.substring(0, length)
+      origin.substring(0, length)
     } else {
       if (pad.length == 0) return origin
       val toPad = length - origin.length
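
Note: the removed branch was dead code, since the function already returns "" when length <= 0 two lines earlier, so the inner check could never fire. A standalone sketch (names are hypothetical) of the truncation semantics the reference helper encodes:

object PaddingSketch {
  // Truncation branch of the reference padding (sketch; the padding case is elided).
  def truncateForPad(origin: String, length: Int): String =
    if (length <= 0) ""                                            // handled once, up front
    else if (length <= origin.length) origin.substring(0, length)  // truncate
    else origin

  def main(args: Array[String]): Unit =
    println(truncateForPad("abcdef", 3)) // prints "abc"
}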

@@ -124,7 +124,7 @@ public UnsafeInMemorySorter(
       int initialSize,
       boolean canUseRadixSort) {
     this(consumer, memoryManager, recordComparator, prefixComparator,
-      consumer.allocateArray(initialSize * 2), canUseRadixSort);
+      consumer.allocateArray(initialSize * 2L), canUseRadixSort);
   }
 
   public UnsafeInMemorySorter(

@@ -84,7 +84,7 @@ public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int
 
   @Override
   public LongArray allocate(int length) {
-    assert (length * 2 <= buffer.size()) :
+    assert (length * 2L <= buffer.size()) :
       "the buffer is smaller than required: " + buffer.size() + " < " + (length * 2);
     return buffer;
   }
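
Note: both * 2L fixes above guard against the same bug: initialSize * 2 (and length * 2) is evaluated in 32-bit int arithmetic and can wrap around to a negative value before being widened to long. A self-contained illustration with made-up values:

object OverflowSketch {
  def main(args: Array[String]): Unit = {
    val initialSize = 1500000000  // fits in an Int
    println(initialSize * 2)      // -1294967296: Int multiply wraps around
    println(initialSize * 2L)     // 3000000000: widened to Long before multiplying
  }
}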
30 changes: 28 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils {
+private[deploy] object DependencyUtils extends Logging {
 
   def resolveMavenDependencies(
       packagesExclusions: String,
@@ -75,7 +76,7 @@ private[deploy] object DependencyUtils {
   def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
     if (jars != null) {
       for (jar <- jars.split(",")) {
-        SparkSubmit.addJarToClasspath(jar, loader)
+        addJarToClasspath(jar, loader)
       }
     }
   }
@@ -151,6 +152,31 @@
     }.mkString(",")
   }
 
+  def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
+    val uri = Utils.resolveURI(localJar)
+    uri.getScheme match {
+      case "file" | "local" =>
+        val file = new File(uri.getPath)
+        if (file.exists()) {
+          loader.addURL(file.toURI.toURL)
+        } else {
+          logWarning(s"Local jar $file does not exist, skipping.")
+        }
+      case _ =>
+        logWarning(s"Skip remote jar $uri.")
+    }
+  }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  def mergeFileLists(lists: String*): String = {
+    val merged = lists.filterNot(StringUtils.isBlank)
+      .flatMap(Utils.stringToSeq)
+    if (merged.nonEmpty) merged.mkString(",") else null
+  }
+
   private def splitOnFragment(path: String): (URI, Option[String]) = {
     val uri = Utils.resolveURI(path)
     val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
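
Note: a standalone sketch of mergeFileLists' behavior (a hypothetical re-implementation for illustration; the real method above delegates blank-filtering to commons-lang3's StringUtils.isBlank and splitting to Utils.stringToSeq):

object MergeFileListsSketch {
  def mergeFileLists(lists: String*): String = {
    val merged = lists
      .filter(s => s != null && s.trim.nonEmpty)              // drop null/blank lists
      .flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))   // split each CSV list
    if (merged.nonEmpty) merged.mkString(",") else null       // null signals "no files"
  }

  def main(args: Array[String]): Unit =
    println(mergeFileLists("a.jar,b.jar", null, " ", "c.jar")) // a.jar,b.jar,c.jar
}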