diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index e2b1549b905c0..3043423edd12c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1844,19 +1844,17 @@ public void process(final Object key, final Object value) {} assertEquals(0, punctuatedWallClockTime.size()); mockTime.sleep(100L); - for (long i = 0L; i < 10L; i++) { - clientSupplier.consumer.addRecord(new ConsumerRecord<>( - topic1, - 1, - i, - i * 100L, - TimestampType.CREATE_TIME, - ConsumerRecord.NULL_CHECKSUM, - ("K" + i).getBytes().length, - ("V" + i).getBytes().length, - ("K" + i).getBytes(), - ("V" + i).getBytes())); - } + clientSupplier.consumer.addRecord(new ConsumerRecord<>( + topic1, + 1, + 100L, + 100L, + TimestampType.CREATE_TIME, + ConsumerRecord.NULL_CHECKSUM, + "K".getBytes().length, + "V".getBytes().length, + "K".getBytes(), + "V".getBytes())); thread.runOnce(); @@ -1936,19 +1934,19 @@ public void process(final Object key, final Object value) { clientSupplier.consumer.addRecord(new ConsumerRecord<>( topic1, 1, - 0L, - 100L, - TimestampType.CREATE_TIME, - ConsumerRecord.NULL_CHECKSUM, - "K".getBytes().length, - "V".getBytes().length, - "K".getBytes(), - "V".getBytes())); + 110L, + 110L, + TimestampType.CREATE_TIME, + ConsumerRecord.NULL_CHECKSUM, + "K".getBytes().length, + "V".getBytes().length, + "K".getBytes(), + "V".getBytes())); thread.runOnce(); assertEquals(2, peekedContextTime.size()); - assertEquals(0L, peekedContextTime.get(1).longValue()); + assertEquals(110L, peekedContextTime.get(1).longValue()); } @Test