-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Source #24
Kafka Source #24
Conversation
Merged build finished. Test FAILed. |
Test FAILed. |
} | ||
} | ||
|
||
test("basic receiving") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe try this test case:
(1 to 50).foreach { i =>
test(s"basic receiving #$i") {
val topic = s"topic$i"
testUtils.createTopic(topic)
val kafkaSource = KafkaSource(Set(topic), kafkaParams)
val mapped =
kafkaSource
.toDS()
.map(kv => new String(kv._2).toInt + 1)
testStream(mapped)(
AddKafkaData(kafkaSource, topic, 1, 2, 3),
CheckAnswer(2, 3, 4),
StopStream,
DropBatches(1),
StartStream,
CheckAnswer(2, 3, 4),
StopStream,
AddKafkaData(kafkaSource, topic, 4, 5, 6),
StartStream,
CheckAnswer(2, 3, 4, 5, 6, 7),
AddKafkaData(kafkaSource, topic, 7),
CheckAnswer(2, 3, 4, 5, 6, 7, 8))
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, I am going to add more tests! That's why WIP. Going to add multi-topic tests, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course, I just tried that specific test and it uncovered several concurrency issues.
Merged build finished. Test FAILed. |
Test FAILed. |
Merged build finished. Test PASSed. |
Test PASSed. |
case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s) | ||
case _ => // there are both 1s and -1s | ||
throw new IllegalArgumentException( | ||
s"Invalid comparison between non-linear histories: $this <=> $other") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@marmbrus This is a bug here. Basically, [0, 0] < [0, 1]
is correct, but the earlier code would give exception as signs = [0, -1]
I want to add the randomized testing as well, but I dont want this to get blocked by that. Please start reviewing this. |
Merged build finished. Test FAILed. |
Test FAILed. |
Merged build finished. Test FAILed. |
Test FAILed. |
@marmbrus Ping for review. Also Jenkins does not seem to be testing this. What do I do to force it? |
Jenkins, test this please. |
Merged build finished. Test PASSed. |
Test PASSed. |
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder | ||
import org.apache.spark.sql.execution.streaming.{Batch, Offset, Source, StreamingRelation} | ||
import org.apache.spark.sql.types.StructType | ||
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spark/Kafka ordering are inconsistent with the file above.
object CompositeOffset { | ||
/** | ||
* Returns a [[CompositeOffset]] with a variable sequence of offsets. | ||
* `nulls` in the sequence are converted to `None`s. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrap the trailing */
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Merged build finished. Test PASSed. |
Test PASSed. |
case _ => | ||
throw new IllegalArgumentException(s"Cannot compare $this <=> $other") | ||
} | ||
|
||
override def ==(other: Offset): Boolean = Try(compareTo(other) == 0).getOrElse(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be equals
as well. Note that means we need to override hashcode too.
Merged build finished. Test PASSed. |
Test PASSed. |
Merged build finished. Test PASSed. |
Test PASSed. |
Merged build finished. Test PASSed. |
Test PASSed. |
@@ -33,5 +35,17 @@ trait Offset extends Serializable { | |||
def <(other: Offset): Boolean = compareTo(other) < 0 | |||
def <=(other: Offset): Boolean = compareTo(other) <= 0 | |||
def >=(other: Offset): Boolean = compareTo(other) >= 0 | |||
def ==(other: Offset): Boolean = compareTo(other) == 0 | |||
|
|||
override def equals(other: Any): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure that its valid to override equals
without also overriding hashCode
Merged build finished. Test PASSed. |
Test PASSed. |
Merged build finished. Test PASSed. |
Test PASSed. |
No description provided.