Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SKIPME Full outer join #12

Merged
merged 2 commits into from
May 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
}

/**
* Perform a full outer join of `this` and `other`. Output will have
* each row from both RDDs or `None` where missing, i.e. one of
* (k, (Some(v), Some(w)), (k, (Some(v), None)) or (k, (None, Some(w))
* depending on the presence of (k, v) and/or (k, w) in `this` and `other`
* Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
Expand Down Expand Up @@ -402,6 +415,24 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
* Perform a full outer join of `this` and `other`. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
}

/**
* Perform a full outer join of `this` and `other`. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))})
}

/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
Expand Down
36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,26 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
}

/**
* Perform a full outer join of `this` and `other`. Output will have
* each row from both RDDs or `None` where missing, i.e. one of
* (k, (Some(v), Some(w)), (k, (Some(v), None)) or (k, (None, Some(w))
* depending on the presence of (k, v) and/or (k, w) in `this` and `other`
* Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (vs.isEmpty && !ws.isEmpty) {
ws.iterator.map(w => (None, Some(w)))
} else if (ws.isEmpty && !vs.isEmpty) {
vs.iterator.map(v => (Some(v), None))
} else {
for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
}
}
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
Expand Down Expand Up @@ -422,6 +442,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
rightOuterJoin(other, new HashPartitioner(numPartitions))
}

/**
* Perform a full outer join of `this` and `other`. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
fullOuterJoin(other, defaultPartitioner(self, other))
}

/**
* Perform a full outer join of `this` and `other`. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
fullOuterJoin(other, new HashPartitioner(numPartitions))
}

/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
Expand Down
3 changes: 3 additions & 0 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)

assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)

assert(grouped2.map(_ => 1).partitioner === None)
Expand All @@ -127,6 +129,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}

test("fullOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.fullOuterJoin(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (Some(1), Some('x'))),
(1, (Some(2), Some('x'))),
(2, (Some(1), Some('y'))),
(2, (Some(1), Some('z'))),
(3, (Some(1), None)),
(4, (None, Some('w')))
))
}

test("join with no matches") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.join(emptyKv).collect().size === 0)
assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
assert(rdd.cogroup(emptyKv).collect().size === 2)
assert(rdd.union(emptyKv).collect().size === 2)
}
Expand Down
16 changes: 16 additions & 0 deletions python/pyspark/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ def dispatch(seq):
return _do_python_join(rdd, other, numPartitions, dispatch)


def python_full_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
if n == 1:
vbuf.append(v)
elif n == 2:
wbuf.append(v)
if not vbuf:
vbuf.append(None)
if not wbuf:
wbuf.append(None)
return [(v, w) for v in vbuf for w in wbuf]
return _do_python_join(rdd, other, numPartitions, dispatch)


def python_cogroup(rdd, other, numPartitions):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
Expand Down
17 changes: 17 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,23 @@ def rightOuterJoin(self, other, numPartitions=None):
"""
return python_right_outer_join(self, other, numPartitions)

def fullOuterJoin(self, other, numPartitions=None):
"""
Perform a full outer join of C{self} and C{other}.

Output will have each row from both RDDs or None where missing, i.e.
one of (k, (v, w)), (k, (v, None)), or (k, (None, w)) depending on
the presence of (k, v) and/or (k, w) in C{self} and C{other}

Hash-partitions the resulting RDD into the given number of partitions.

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("c", 3)])
>>> sorted(y.fullOuterJoin(x).collect())
[('a', (2, 1)), ('b', (None, 4)), ('c', (3, None))]
"""
return python_full_outer_join(self, other, numPartitions)

# TODO: add option to control map-side combining
def partitionBy(self, numPartitions, partitionFunc=hash):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,48 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def fullOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.fullOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
def fullOuterJoin[W](
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (Optional[V], Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
def fullOuterJoin[W](
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (Optional[V], Optional[W])] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))}
}

/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,42 @@ extends Serializable {
)
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
fullOuterJoin[W](other, defaultPartitioner())
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
): DStream[(K, (Option[V], Option[W]))] = {
fullOuterJoin[W](other, defaultPartitioner(numPartitions))
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Option[V], Option[W]))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
)
}

/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
* is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,21 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(inputData1, inputData2, operation, outputData, true)
}

test("fullOuterJoin") {
val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
val outputData = Seq(
Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
Seq( ("", (Some(1), Some("x"))), ("b", (None, Some("x"))), ("a", (Some(1), None)) ),
Seq( ("", (Some(1), None)) ),
Seq( ("", (None, Some("x"))) )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}

test("updateStateByKey") {
val inputData =
Seq(
Expand Down