Skip to content

Commit

Permalink
[SPARK-11270][STREAMING] Add improved equality testing for TopicAndPa…
Browse files Browse the repository at this point in the history
…rtition from the Kafka Streaming API

jerryshao tdas

I know this is kind of minor, and I know you all are busy, but this brings this class in line with the `OffsetRange` class, and makes tests a little more concise.

Instead of doing something like:
```
assert topic_and_partition_instance._topic == "foo"
assert topic_and_partition_instance._partition == 0
```

You can do something like:
```
assert topic_and_partition_instance == TopicAndPartition("foo", 0)
```

Before:
```
>>> from pyspark.streaming.kafka import TopicAndPartition
>>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0)
False
```

After:
```
>>> from pyspark.streaming.kafka import TopicAndPartition
>>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0)
True
```

I couldn't find any tests - am I missing something?

Author: Nick Evans <me@nicolasevans.org>

Closes apache#9236 from manygrams/topic_and_partition_equality.

(cherry picked from commit 8f888ee)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
  • Loading branch information
ontarionick authored and tdas committed Oct 27, 2015
1 parent 8a6e63c commit abb0ca7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
10 changes: 10 additions & 0 deletions python/pyspark/streaming/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,16 @@ def __init__(self, topic, partition):
def _jTopicAndPartition(self, helper):
return helper.createTopicAndPartition(self._topic, self._partition)

def __eq__(self, other):
if isinstance(other, self.__class__):
return (self._topic == other._topic
and self._partition == other._partition)
else:
return False

def __ne__(self, other):
return not self.__eq__(other)


class Broker(object):
"""
Expand Down
10 changes: 10 additions & 0 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,16 @@ def transformWithOffsetRanges(rdd):

self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])

def test_topic_and_partition_equality(self):
topic_and_partition_a = TopicAndPartition("foo", 0)
topic_and_partition_b = TopicAndPartition("foo", 0)
topic_and_partition_c = TopicAndPartition("bar", 0)
topic_and_partition_d = TopicAndPartition("foo", 1)

self.assertEqual(topic_and_partition_a, topic_and_partition_b)
self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)


class FlumeStreamTests(PySparkStreamingTestCase):
timeout = 20 # seconds
Expand Down

0 comments on commit abb0ca7

Please sign in to comment.