Skip to content

Commit

Permalink
Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5
Browse files (browse the repository at this point in the history)
  • Loading branch information
markhamstra committed Oct 27, 2015
2 parents ca12e62 + abb0ca7 commit 432ae9e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
10 changes: 10 additions & 0 deletions python/pyspark/streaming/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,16 @@ def __init__(self, topic, partition):
def _jTopicAndPartition(self, helper):
return helper.createTopicAndPartition(self._topic, self._partition)

def __eq__(self, other):
    """Two TopicAndPartition objects are equal iff both their topic and
    partition match; comparison against any other type is False."""
    if isinstance(other, self.__class__):
        return (self._topic == other._topic
                and self._partition == other._partition)
    else:
        return False

def __hash__(self):
    # Defining __eq__ without __hash__ breaks the hash/eq invariant:
    # equal instances would keep the default identity hash (Python 2) or
    # become unhashable (Python 3), so they could not be used reliably as
    # dict keys / set members. Hash on the same fields __eq__ compares.
    return hash((self._topic, self._partition))

def __ne__(self, other):
    """Inverse of __eq__ (Python 2 does not derive __ne__ automatically)."""
    is_equal = self.__eq__(other)
    return not is_equal


class Broker(object):
"""
Expand Down
10 changes: 10 additions & 0 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,16 @@ def transformWithOffsetRanges(rdd):

self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])

def test_topic_and_partition_equality(self):
    """TopicAndPartition equality is by (topic, partition) value, not identity."""
    base = TopicAndPartition("foo", 0)
    same_values = TopicAndPartition("foo", 0)
    different_topic = TopicAndPartition("bar", 0)
    different_partition = TopicAndPartition("foo", 1)

    self.assertEqual(base, same_values)
    self.assertNotEqual(base, different_topic)
    self.assertNotEqual(base, different_partition)


class FlumeStreamTests(PySparkStreamingTestCase):
timeout = 20 # seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.spark.util.SerializableConfiguration
* This allows users to give the data source alias as the format type over the fully qualified
* class name.
*
* A new instance of this class with be instantiated each time a DDL call is made.
* A new instance of this class will be instantiated each time a DDL call is made.
*
* @since 1.5.0
*/
Expand Down Expand Up @@ -74,7 +74,7 @@ trait DataSourceRegister {
* less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
* data source 'org.apache.spark.sql.json.DefaultSource'
*
* A new instance of this class with be instantiated each time a DDL call is made.
* A new instance of this class will be instantiated each time a DDL call is made.
*
* @since 1.3.0
*/
Expand All @@ -100,7 +100,7 @@ trait RelationProvider {
* less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
* data source 'org.apache.spark.sql.json.DefaultSource'
*
* A new instance of this class with be instantiated each time a DDL call is made.
* A new instance of this class will be instantiated each time a DDL call is made.
*
* The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
* users need to provide a schema when using a [[SchemaRelationProvider]].
Expand Down Expand Up @@ -135,7 +135,7 @@ trait SchemaRelationProvider {
* less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
* data source 'org.apache.spark.sql.json.DefaultSource'
*
* A new instance of this class with be instantiated each time a DDL call is made.
* A new instance of this class will be instantiated each time a DDL call is made.
*
* The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is
* that users need to provide a schema and a (possibly empty) list of partition columns when
Expand Down Expand Up @@ -195,7 +195,7 @@ trait CreatableRelationProvider {
* implementation should inherit from one of the descendant `Scan` classes, which define various
* abstract methods for execution.
*
* BaseRelations must also define a equality function that only returns true when the two
* BaseRelations must also define an equality function that only returns true when the two
* instances will return the same data. This equality function is used when determining when
* it is safe to substitute cached results for a given relation.
*
Expand All @@ -208,7 +208,7 @@ abstract class BaseRelation {

/**
* Returns an estimated size of this relation in bytes. This information is used by the planner
* to decided when it is safe to broadcast a relation and can be overridden by sources that
* to decide when it is safe to broadcast a relation and can be overridden by sources that
* know the size ahead of time. By default, the system will assume that tables are too
* large to broadcast. This method will be called multiple times during query planning
* and thus should not perform expensive operations for each invocation.
Expand Down Expand Up @@ -383,7 +383,7 @@ abstract class OutputWriter {

/**
* ::Experimental::
* A [[BaseRelation]] that provides much of the common code required for formats that store their
* A [[BaseRelation]] that provides much of the common code required for relations that store their
* data to an HDFS compatible filesystem.
*
* For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
Expand Down

0 comments on commit 432ae9e

Please sign in to comment.