diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 8a814c64c0423..f7b59d6a2756d 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -254,6 +254,16 @@ def __init__(self, topic, partition):
     def _jTopicAndPartition(self, helper):
         return helper.createTopicAndPartition(self._topic, self._partition)
 
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return (self._topic == other._topic
+                    and self._partition == other._partition)
+        else:
+            return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
 
 class Broker(object):
     """
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index cfea95b0dec71..a8c7b51ffe0d4 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -887,6 +887,16 @@ def transformWithOffsetRanges(rdd):
 
         self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
 
+    def test_topic_and_partition_equality(self):
+        topic_and_partition_a = TopicAndPartition("foo", 0)
+        topic_and_partition_b = TopicAndPartition("foo", 0)
+        topic_and_partition_c = TopicAndPartition("bar", 0)
+        topic_and_partition_d = TopicAndPartition("foo", 1)
+
+        self.assertEqual(topic_and_partition_a, topic_and_partition_b)
+        self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
+        self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
+
 
 class FlumeStreamTests(PySparkStreamingTestCase):
     timeout = 20  # seconds
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 7b030b7d73bd5..0b1c760e71b46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -43,7 +43,7 @@ import org.apache.spark.util.SerializableConfiguration
  * This allows users to give the data source alias as the format type over the fully qualified
  * class name.
  *
- * A new instance of this class with be instantiated each time a DDL call is made.
+ * A new instance of this class will be instantiated each time a DDL call is made.
  *
  * @since 1.5.0
  */
@@ -74,7 +74,7 @@ trait DataSourceRegister {
  * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
  * data source 'org.apache.spark.sql.json.DefaultSource'
  *
- * A new instance of this class with be instantiated each time a DDL call is made.
+ * A new instance of this class will be instantiated each time a DDL call is made.
  *
  * @since 1.3.0
  */
@@ -100,7 +100,7 @@ trait RelationProvider {
  * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
  * data source 'org.apache.spark.sql.json.DefaultSource'
  *
- * A new instance of this class with be instantiated each time a DDL call is made.
+ * A new instance of this class will be instantiated each time a DDL call is made.
  *
  * The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
  * users need to provide a schema when using a [[SchemaRelationProvider]].
@@ -135,7 +135,7 @@ trait SchemaRelationProvider {
  * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
  * data source 'org.apache.spark.sql.json.DefaultSource'
  *
- * A new instance of this class with be instantiated each time a DDL call is made.
+ * A new instance of this class will be instantiated each time a DDL call is made.
  *
  * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is
  * that users need to provide a schema and a (possibly empty) list of partition columns when
@@ -195,7 +195,7 @@ trait CreatableRelationProvider {
  * implementation should inherit from one of the descendant `Scan` classes, which define various
  * abstract methods for execution.
  *
- * BaseRelations must also define a equality function that only returns true when the two
+ * BaseRelations must also define an equality function that only returns true when the two
  * instances will return the same data. This equality function is used when determining when
  * it is safe to substitute cached results for a given relation.
  *
@@ -208,7 +208,7 @@ abstract class BaseRelation {
 
   /**
    * Returns an estimated size of this relation in bytes. This information is used by the planner
-   * to decided when it is safe to broadcast a relation and can be overridden by sources that
+   * to decide when it is safe to broadcast a relation and can be overridden by sources that
    * know the size ahead of time. By default, the system will assume that tables are too
    * large to broadcast. This method will be called multiple times during query planning
    * and thus should not perform expensive operations for each invocation.
@@ -383,7 +383,7 @@ abstract class OutputWriter {
 
 /**
  * ::Experimental::
- * A [[BaseRelation]] that provides much of the common code required for formats that store their
+ * A [[BaseRelation]] that provides much of the common code required for relations that store their
  * data to an HDFS compatible filesystem.
  *
  * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
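For reviewers, here is a minimal sketch of the behavior the new `__eq__`/`__ne__` pair gives `TopicAndPartition`, mirroring the added `test_topic_and_partition_equality` case. The topic names and standalone-script framing are illustrative only, not part of the patch:

```python
# Illustrative sketch only -- not part of the patch. Assumes a PySpark
# build (1.x-era streaming Kafka API) with this change applied.
from pyspark.streaming.kafka import TopicAndPartition

a = TopicAndPartition("events", 0)
b = TopicAndPartition("events", 0)
c = TopicAndPartition("events", 1)

# Previously equality fell back to object identity, so even a == b was
# False despite both naming the same Kafka topic/partition pair.
assert a == b          # same topic, same partition
assert a != c          # same topic, different partition
assert a != "events"   # a non-TopicAndPartition value compares unequal

# Note: the patch defines __eq__/__ne__ but no __hash__; on Python 3 that
# makes instances unhashable, which would matter if they are used as dict
# keys (e.g. the fromOffsets argument to KafkaUtils.createDirectStream).
```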