From eede1cc00cea5021343501a3c0cd908bdc82a1e1 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Tue, 25 May 2021 16:03:59 +0200 Subject: [PATCH] KAFKA-12819: Add assert messages to MirrorMaker tests plus other quality of life improvements --- .../kafka/connect/mirror/CheckpointTest.java | 12 ++-- .../kafka/connect/mirror/HeartbeatTest.java | 9 ++- .../mirror/MirrorCheckpointConnectorTest.java | 30 +++++---- .../mirror/MirrorCheckpointTaskTest.java | 51 ++++++++++----- .../mirror/MirrorConnectorConfigTest.java | 64 ++++++++++++------- .../mirror/MirrorHeartBeatConnectorTest.java | 4 +- .../mirror/MirrorHeartbeatTaskTest.java | 8 ++- .../connect/mirror/MirrorMakerConfigTest.java | 20 +++--- .../mirror/MirrorSourceConnectorTest.java | 8 +-- .../connect/mirror/MirrorSourceTaskTest.java | 62 ++++++++++-------- .../connect/mirror/OffsetSyncStoreTest.java | 15 +++-- .../kafka/connect/mirror/OffsetSyncTest.java | 9 ++- 12 files changed, 180 insertions(+), 112 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java index fd5448c13f93a..f008f996772ce 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java @@ -32,9 +32,13 @@ public void testSerde() { byte[] value = checkpoint.recordValue(); ConsumerRecord record = new ConsumerRecord<>("any-topic", 7, 8, key, value); Checkpoint deserialized = Checkpoint.deserializeRecord(record); - assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId()); - assertEquals(checkpoint.topicPartition(), deserialized.topicPartition()); - assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset()); - assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset()); + assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId(), + "Failure on checkpoint consumerGroupId serde"); + assertEquals(checkpoint.topicPartition(), deserialized.topicPartition(), + "Failure on checkpoint topicPartition serde"); + assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset(), + "Failure on checkpoint upstreamOffset serde"); + assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset(), + "Failure on checkpoint downstreamOffset serde"); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java index fb473f7b0af0c..723b0dc2bfe56 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java @@ -31,8 +31,11 @@ public void testSerde() { byte[] value = heartbeat.recordValue(); ConsumerRecord record = new ConsumerRecord<>("any-topic", 6, 7, key, value); Heartbeat deserialized = Heartbeat.deserializeRecord(record); - assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias()); - assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias()); - assertEquals(heartbeat.timestamp(), deserialized.timestamp()); + assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias(), + "Failure on heartbeat sourceClusterAlias serde"); + assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias(), + "Failure on heartbeat targetClusterAlias serde"); + assertEquals(heartbeat.timestamp(), deserialized.timestamp(), + "Failure on heartbeat timestamp serde"); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java index 3c8453c77518a..1391e7615d3a0 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java @@ -38,6 +38,8 @@ public class MirrorCheckpointConnectorTest { + private static final String CONSUMER_GROUP = "consumer-group-1"; + @Test public void testMirrorCheckpointConnectorDisabled() { // disable the checkpoint emission @@ -45,13 +47,13 @@ public void testMirrorCheckpointConnectorDisabled() { makeProps("emit.checkpoints.enabled", "false")); List knownConsumerGroups = new ArrayList<>(); - knownConsumerGroups.add("consumer-group-1"); + knownConsumerGroups.add(CONSUMER_GROUP); // MirrorCheckpointConnector as minimum to run taskConfig() MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config); List> output = connector.taskConfigs(1); // expect no task will be created - assertEquals(0, output.size()); + assertEquals(0, output.size(), "MirrorCheckpointConnector not disabled"); } @Test @@ -61,14 +63,16 @@ public void testMirrorCheckpointConnectorEnabled() { makeProps("emit.checkpoints.enabled", "true")); List knownConsumerGroups = new ArrayList<>(); - knownConsumerGroups.add("consumer-group-1"); + knownConsumerGroups.add(CONSUMER_GROUP); // MirrorCheckpointConnector as minimum to run taskConfig() MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config); List> output = connector.taskConfigs(1); // expect 1 task will be created - assertEquals(1, output.size()); - assertEquals("consumer-group-1", output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS)); + assertEquals(1, output.size(), + "MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " has incorrect size"); + assertEquals(CONSUMER_GROUP, output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS), + "MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " failed"); } @Test @@ -77,7 +81,7 @@ public void testNoConsumerGroup() { MirrorCheckpointConnector connector = new MirrorCheckpointConnector(new ArrayList<>(), config); List> output = connector.taskConfigs(1); // expect no task will be created - assertEquals(0, output.size()); + assertEquals(0, output.size(), "ConsumerGroup shouldn't exist"); } @Test @@ -86,12 +90,12 @@ public void testReplicationDisabled() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("enabled", "false")); List knownConsumerGroups = new ArrayList<>(); - knownConsumerGroups.add("consumer-group-1"); + knownConsumerGroups.add(CONSUMER_GROUP); // MirrorCheckpointConnector as minimum to run taskConfig() MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config); List> output = connector.taskConfigs(1); // expect no task will be created - assertEquals(0, output.size()); + assertEquals(0, output.size(), "Replication isn't disabled"); } @Test @@ -100,13 +104,14 @@ public void testReplicationEnabled() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("enabled", "true")); List knownConsumerGroups = new ArrayList<>(); - knownConsumerGroups.add("consumer-group-1"); + knownConsumerGroups.add(CONSUMER_GROUP); // MirrorCheckpointConnector as minimum to run taskConfig() MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config); List> output = connector.taskConfigs(1); // expect 1 task will be created - assertEquals(1, output.size()); - assertEquals("consumer-group-1", output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS)); + assertEquals(1, output.size(), "Replication for consumer-group-1 has incorrect size"); + assertEquals(CONSUMER_GROUP, output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS), + "Replication for consumer-group-1 failed"); } @Test @@ -123,7 +128,8 @@ public void testFindConsumerGroups() throws Exception { List groupFound = connector.findConsumerGroups(); Set expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet()); - assertEquals(expectedGroups, new HashSet<>(groupFound)); + assertEquals(expectedGroups, new HashSet<>(groupFound), + "Expected groups are not the same as findConsumerGroups"); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java index abd314bb71c9e..7ef878ab2e8d3 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java @@ -36,11 +36,14 @@ public void testDownstreamTopicRenaming() { MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap()); assertEquals(new TopicPartition("source1.topic3", 4), - mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4))); + mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)), + "Renaming source1.topic3 failed"); assertEquals(new TopicPartition("topic3", 5), - mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5))); + mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)), + "Renaming target2.topic3 failed"); assertEquals(new TopicPartition("source1.source6.topic7", 8), - mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8))); + mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)), + "Renaming source1.source6.topic7 failed"); } @Test @@ -53,21 +56,33 @@ public void testCheckpoint() { Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2), new OffsetAndMetadata(10, null)); SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L); - assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition()); - assertEquals("group9", checkpoint1.consumerGroupId()); - assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition())); - assertEquals(10, checkpoint1.upstreamOffset()); - assertEquals(11, checkpoint1.downstreamOffset()); - assertEquals(123L, sourceRecord1.timestamp().longValue()); + assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition(), + "checkpoint group9 source1.topic1 failed"); + assertEquals("group9", checkpoint1.consumerGroupId(), + "checkpoint group9 consumerGroupId failed"); + assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()), + "checkpoint group9 sourcePartition failed"); + assertEquals(10, checkpoint1.upstreamOffset(), + "checkpoint group9 upstreamOffset failed"); + assertEquals(11, checkpoint1.downstreamOffset(), + "checkpoint group9 downstreamOffset failed"); + assertEquals(123L, sourceRecord1.timestamp().longValue(), + "checkpoint group9 timestamp failed"); Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6), new OffsetAndMetadata(12, null)); SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L); - assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition()); - assertEquals("group11", checkpoint2.consumerGroupId()); - assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition())); - assertEquals(12, checkpoint2.upstreamOffset()); - assertEquals(13, checkpoint2.downstreamOffset()); - assertEquals(234L, sourceRecord2.timestamp().longValue()); + assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition(), + "checkpoint group11 topic5 failed"); + assertEquals("group11", checkpoint2.consumerGroupId(), + "checkpoint group11 consumerGroupId failed"); + assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()), + "checkpoint group11 sourcePartition failed"); + assertEquals(12, checkpoint2.upstreamOffset(), + "checkpoint group11 upstreamOffset failed"); + assertEquals(13, checkpoint2.downstreamOffset(), + "checkpoint group11 downstreamOffset failed"); + assertEquals(234L, sourceRecord2.timestamp().longValue(), + "checkpoint group11 timestamp failed"); } @Test @@ -118,7 +133,9 @@ public void testSyncOffset() { Map> output = mirrorCheckpointTask.syncGroupOffset(); - assertEquals(101, output.get(consumer1).get(t1p0).offset()); - assertEquals(51, output.get(consumer2).get(t2p0).offset()); + assertEquals(101, output.get(consumer1).get(t1p0).offset(), + "Consumer 1 " + topic1 + " failed"); + assertEquals(51, output.get(consumer2).get(t2p0).offset(), + "Consumer 2 " + topic2 + " failed"); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java index f53aa68387276..7abe30def6ddc 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java @@ -41,7 +41,8 @@ public void testTaskConfigTopicPartitions() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps()); Map props = config.taskConfigForTopicPartitions(topicPartitions); MirrorTaskConfig taskConfig = new MirrorTaskConfig(props); - assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions)); + assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions), + "Setting topic property configuration failed"); } @Test @@ -50,29 +51,36 @@ public void testTaskConfigConsumerGroups() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps()); Map props = config.taskConfigForConsumerGroups(groups); MirrorTaskConfig taskConfig = new MirrorTaskConfig(props); - assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups)); + assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups), + "Setting consumer groups property configuration failed"); } @Test public void testTopicMatching() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "topic1")); - assertTrue(config.topicFilter().shouldReplicateTopic("topic1")); - assertFalse(config.topicFilter().shouldReplicateTopic("topic2")); + assertTrue(config.topicFilter().shouldReplicateTopic("topic1"), + "topic1 replication property configuration failed"); + assertFalse(config.topicFilter().shouldReplicateTopic("topic2"), + "topic2 replication property configuration failed"); } @Test public void testGroupMatching() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("groups", "group1")); - assertTrue(config.groupFilter().shouldReplicateGroup("group1")); - assertFalse(config.groupFilter().shouldReplicateGroup("group2")); + assertTrue(config.groupFilter().shouldReplicateGroup("group1"), + "topic1 group matching property configuration failed"); + assertFalse(config.groupFilter().shouldReplicateGroup("group2"), + "topic2 group matching property configuration failed"); } @Test public void testConfigPropertyMatching() { MirrorConnectorConfig config = new MirrorConnectorConfig( makeProps("config.properties.exclude", "prop2")); - assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1")); - assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2")); + assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"), + "config.properties.exclude incorrectly excluded prop1"); + assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"), + "config.properties.exclude incorrectly included prop2"); } @Test @@ -92,24 +100,26 @@ public void testConfigBackwardsCompatibility() { @Test public void testNoTopics() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "")); - assertFalse(config.topicFilter().shouldReplicateTopic("topic1")); - assertFalse(config.topicFilter().shouldReplicateTopic("topic2")); - assertFalse(config.topicFilter().shouldReplicateTopic("")); + assertFalse(config.topicFilter().shouldReplicateTopic("topic1"), "topic1 shouldn't exist"); + assertFalse(config.topicFilter().shouldReplicateTopic("topic2"), "topic2 shouldn't exist"); + assertFalse(config.topicFilter().shouldReplicateTopic(""), "Empty topic shouldn't exist"); } @Test public void testAllTopics() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ".*")); - assertTrue(config.topicFilter().shouldReplicateTopic("topic1")); - assertTrue(config.topicFilter().shouldReplicateTopic("topic2")); + assertTrue(config.topicFilter().shouldReplicateTopic("topic1"), + "topic1 created from wildcard should exist"); + assertTrue(config.topicFilter().shouldReplicateTopic("topic2"), + "topic2 created from wildcard should exist"); } @Test public void testListOfTopics() { MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "topic1, topic2")); - assertTrue(config.topicFilter().shouldReplicateTopic("topic1")); - assertTrue(config.topicFilter().shouldReplicateTopic("topic2")); - assertFalse(config.topicFilter().shouldReplicateTopic("topic3")); + assertTrue(config.topicFilter().shouldReplicateTopic("topic1"), "topic1 created from list should exist"); + assertTrue(config.topicFilter().shouldReplicateTopic("topic2"), "topic2 created from list should exist"); + assertFalse(config.topicFilter().shouldReplicateTopic("topic3"), "topic3 created from list should exist"); } @Test @@ -156,7 +166,8 @@ public void testSourceConsumerConfig() { connectorConsumerProps = config.sourceConsumerConfig(); expectedConsumerProps.put("auto.offset.reset", "latest"); expectedConsumerProps.remove("max.poll.interval.ms"); - assertEquals(expectedConsumerProps, connectorConsumerProps); + assertEquals(expectedConsumerProps, connectorConsumerProps, + MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + " source consumer config not matching"); } @Test @@ -172,7 +183,8 @@ public void testSourceConsumerConfigWithSourcePrefix() { expectedConsumerProps.put("enable.auto.commit", "false"); expectedConsumerProps.put("auto.offset.reset", "latest"); expectedConsumerProps.put("max.poll.interval.ms", "100"); - assertEquals(expectedConsumerProps, connectorConsumerProps); + assertEquals(expectedConsumerProps, connectorConsumerProps, + prefix + " source consumer config not matching"); } @Test @@ -184,7 +196,8 @@ public void testSourceProducerConfig() { Map connectorProducerProps = config.sourceProducerConfig(); Map expectedProducerProps = new HashMap<>(); expectedProducerProps.put("acks", "1"); - assertEquals(expectedProducerProps, connectorProducerProps); + assertEquals(expectedProducerProps, connectorProducerProps, + MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + " source product config not matching"); } @Test @@ -195,7 +208,8 @@ public void testSourceProducerConfigWithSourcePrefix() { Map connectorProducerProps = config.sourceProducerConfig(); Map expectedProducerProps = new HashMap<>(); expectedProducerProps.put("acks", "1"); - assertEquals(expectedProducerProps, connectorProducerProps); + assertEquals(expectedProducerProps, connectorProducerProps, + prefix + " source producer config not matching"); } @Test @@ -208,7 +222,8 @@ public void testSourceAdminConfig() { Map connectorAdminProps = config.sourceAdminConfig(); Map expectedAdminProps = new HashMap<>(); expectedAdminProps.put("connections.max.idle.ms", "10000"); - assertEquals(expectedAdminProps, connectorAdminProps); + assertEquals(expectedAdminProps, connectorAdminProps, + MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " source connector admin props not matching"); } @Test @@ -219,7 +234,7 @@ public void testSourceAdminConfigWithSourcePrefix() { Map connectorAdminProps = config.sourceAdminConfig(); Map expectedAdminProps = new HashMap<>(); expectedAdminProps.put("connections.max.idle.ms", "10000"); - assertEquals(expectedAdminProps, connectorAdminProps); + assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching"); } @Test @@ -232,7 +247,8 @@ public void testTargetAdminConfig() { Map connectorAdminProps = config.targetAdminConfig(); Map expectedAdminProps = new HashMap<>(); expectedAdminProps.put("connections.max.idle.ms", "10000"); - assertEquals(expectedAdminProps, connectorAdminProps); + assertEquals(expectedAdminProps, connectorAdminProps, + MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " target connector admin props not matching"); } @Test @@ -243,7 +259,7 @@ public void testTargetAdminConfigWithSourcePrefix() { Map connectorAdminProps = config.targetAdminConfig(); Map expectedAdminProps = new HashMap<>(); expectedAdminProps.put("connections.max.idle.ms", "10000"); - assertEquals(expectedAdminProps, connectorAdminProps); + assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching"); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java index b48c46977078e..ec0691983a63f 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java @@ -34,7 +34,7 @@ public void testMirrorHeartbeatConnectorDisabled() { MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector(config); List> output = connector.taskConfigs(1); // expect no task will be created - assertEquals(0, output.size()); + assertEquals(0, output.size(), "Expected task to not be created"); } @Test @@ -47,6 +47,6 @@ public void testReplicationDisabled() { MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector(config); List> output = connector.taskConfigs(1); // expect one task will be created, even the replication is disabled - assertEquals(1, output.size()); + assertEquals(1, output.size(), "Task should have been created even with replication disabled"); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java index d4f96e785b9f4..39fd6dff10e30 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java @@ -34,7 +34,9 @@ public void testPollCreatesRecords() throws InterruptedException { List records = heartbeatTask.poll(); assertEquals(1, records.size()); Map sourcePartition = records.iterator().next().sourcePartition(); - assertEquals(sourcePartition.get(Heartbeat.SOURCE_CLUSTER_ALIAS_KEY), "testSource"); - assertEquals(sourcePartition.get(Heartbeat.TARGET_CLUSTER_ALIAS_KEY), "testTarget"); + assertEquals(sourcePartition.get(Heartbeat.SOURCE_CLUSTER_ALIAS_KEY), "testSource", + "sourcePartition's " + Heartbeat.SOURCE_CLUSTER_ALIAS_KEY + " record was not created"); + assertEquals(sourcePartition.get(Heartbeat.TARGET_CLUSTER_ALIAS_KEY), "testTarget", + "sourcePartition's " + Heartbeat.TARGET_CLUSTER_ALIAS_KEY + " record was not created"); } -} \ No newline at end of file +} diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java index f5fe2c3ca32ec..4787ecd95ebda 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java @@ -137,13 +137,13 @@ public void testIncludesConnectorConfigProperties() { MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps); assertEquals(100, (int) connectorConfig.getInt("tasks.max"), "Connector properties like tasks.max should be passed through to underlying Connectors."); - assertEquals(Arrays.asList("topic-1"), connectorConfig.getList("topics"), + assertEquals(Collections.singletonList("topic-1"), connectorConfig.getList("topics"), "Topics include should be passed through to underlying Connectors."); - assertEquals(Arrays.asList("group-2"), connectorConfig.getList("groups"), + assertEquals(Collections.singletonList("group-2"), connectorConfig.getList("groups"), "Groups include should be passed through to underlying Connectors."); - assertEquals(Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"), + assertEquals(Collections.singletonList("property-3"), connectorConfig.getList("config.properties.exclude"), "Config properties exclude should be passed through to underlying Connectors."); - assertEquals(Arrays.asList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"), + assertEquals(Collections.singletonList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"), "Metrics reporters should be passed through to underlying Connectors."); assertEquals("DefaultTopicFilter", connectorConfig.getClass("topic.filter.class").getSimpleName(), "Filters should be passed through to underlying Connectors."); @@ -168,13 +168,13 @@ public void testConfigBackwardsCompatibility() { DefaultTopicFilter.TopicFilterConfig filterConfig = new DefaultTopicFilter.TopicFilterConfig(connectorProps); - assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"), + assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"), "Topics exclude should be backwards compatible."); - assertEquals(Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"), + assertEquals(Collections.singletonList("group-7"), connectorConfig.getList("groups.exclude"), "Groups exclude should be backwards compatible."); - assertEquals(Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"), + assertEquals(Collections.singletonList("property-3"), connectorConfig.getList("config.properties.exclude"), "Config properties exclude should be backwards compatible."); } @@ -193,10 +193,10 @@ public void testConfigBackwardsCompatibilitySourceTarget() { DefaultTopicFilter.TopicFilterConfig filterConfig = new DefaultTopicFilter.TopicFilterConfig(connectorProps); - assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"), + assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"), "Topics exclude should be backwards compatible."); - assertEquals(Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"), + assertEquals(Collections.singletonList("group-7"), connectorConfig.getList("groups.exclude"), "Groups exclude should be backwards compatible."); } @@ -213,7 +213,7 @@ public void testIncludesTopicFilterProperties() { new DefaultTopicFilter.TopicFilterConfig(connectorProps); assertEquals(Arrays.asList("topic1", "topic2"), filterConfig.getList("topics"), "source->target.topics should be passed through to TopicFilters."); - assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"), + assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"), "source->target.topics.exclude should be passed through to TopicFilters."); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index 42d7951cd60fc..68d149c755ff8 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -167,13 +167,13 @@ public void testMirrorSourceConnectorTaskConfig() { // t3 -> [t0p2, t0p5, t1p0, t2p1] Map t1 = output.get(0); - assertEquals("t0-0,t0-3,t0-6,t1-1", t1.get(TASK_TOPIC_PARTITIONS)); + assertEquals("t0-0,t0-3,t0-6,t1-1", t1.get(TASK_TOPIC_PARTITIONS), "Config for t1 is incorrect"); Map t2 = output.get(1); - assertEquals("t0-1,t0-4,t0-7,t2-0", t2.get(TASK_TOPIC_PARTITIONS)); + assertEquals("t0-1,t0-4,t0-7,t2-0", t2.get(TASK_TOPIC_PARTITIONS), "Config for t2 is incorrect"); Map t3 = output.get(2); - assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get(TASK_TOPIC_PARTITIONS)); + assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get(TASK_TOPIC_PARTITIONS), "Config for t3 is incorrect"); } @Test @@ -201,7 +201,7 @@ public void testRefreshTopicPartitions() throws Exception { Map expectedPartitionCounts = new HashMap<>(); expectedPartitionCounts.put("source.topic", 1L); Map configMap = MirrorSourceConnector.configToMap(topicConfig); - assertEquals(2, configMap.size()); + assertEquals(2, configMap.size(), "configMap has incorrect size"); Map expectedNewTopics = new HashMap<>(); expectedNewTopics.put("source.topic", new NewTopic("source.topic", 1, (short) 0).configs(configMap)); diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 9cf09f8c4cfd0..feb2f7fb6ba68 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -54,15 +54,22 @@ public void testSerde() { MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7", new DefaultReplicationPolicy(), 50); SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord); - assertEquals("cluster7.topic1", sourceRecord.topic()); - assertEquals(2, sourceRecord.kafkaPartition().intValue()); - assertEquals(new TopicPartition("topic1", 2), MirrorUtils.unwrapPartition(sourceRecord.sourcePartition())); - assertEquals(3L, MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue()); - assertEquals(4L, sourceRecord.timestamp().longValue()); - assertEquals(key, sourceRecord.key()); - assertEquals(value, sourceRecord.value()); - assertEquals(headers.lastHeader("header1").value(), sourceRecord.headers().lastWithName("header1").value()); - assertEquals(headers.lastHeader("header2").value(), sourceRecord.headers().lastWithName("header2").value()); + assertEquals("cluster7.topic1", sourceRecord.topic(), + "Failure on cluster7.topic1 consumerRecord serde"); + assertEquals(2, sourceRecord.kafkaPartition().intValue(), + "sourceRecord kafka partition is incorrect"); + assertEquals(new TopicPartition("topic1", 2), MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()), + "topic1 unwrapped from sourcePartition is incorrect"); + assertEquals(3L, MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue(), + "sourceRecord's sourceOffset is incorrect"); + assertEquals(4L, sourceRecord.timestamp().longValue(), + "sourceRecord's timestamp is incorrect"); + assertEquals(key, sourceRecord.key(), "sourceRecord's key is incorrect"); + assertEquals(value, sourceRecord.value(), "sourceRecord's value is incorrect"); + assertEquals(headers.lastHeader("header1").value(), sourceRecord.headers().lastWithName("header1").value(), + "sourceRecord's header1 is incorrect"); + assertEquals(headers.lastHeader("header2").value(), sourceRecord.headers().lastWithName("header2").value(), + "sourceRecord's header2 is incorrect"); } @Test @@ -86,16 +93,16 @@ public void testZeroOffsetSync() { MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0); // if max offset lag is zero, should always emit offset syncs - assertTrue(partitionState.update(0, 100)); - assertTrue(partitionState.update(2, 102)); - assertTrue(partitionState.update(3, 153)); - assertTrue(partitionState.update(4, 154)); - assertTrue(partitionState.update(5, 155)); - assertTrue(partitionState.update(6, 207)); - assertTrue(partitionState.update(2, 208)); - assertTrue(partitionState.update(3, 209)); - assertTrue(partitionState.update(4, 3)); - assertTrue(partitionState.update(5, 4)); + assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect"); + assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect"); + assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect"); + assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect"); + assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect"); + assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect"); + assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect"); + assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect"); + assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect"); + assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect"); } @Test @@ -134,13 +141,16 @@ public void testPoll() { for (int i = 0; i < sourceRecords.size(); i++) { SourceRecord sourceRecord = sourceRecords.get(i); ConsumerRecord consumerRecord = consumerRecordsList.get(i); - assertEquals(consumerRecord.key(), sourceRecord.key()); - assertEquals(consumerRecord.value(), sourceRecord.value()); + assertEquals(consumerRecord.key(), sourceRecord.key(), + "consumerRecord key does not equal sourceRecord key"); + assertEquals(consumerRecord.value(), sourceRecord.value(), + "consumerRecord value does not equal sourceRecord value"); // We expect that the topicname will be based on the replication policy currently used assertEquals(replicationPolicy.formatRemoteTopic(sourceClusterName, topicName), - sourceRecord.topic()); + sourceRecord.topic(), "topicName not the same as the current replicationPolicy"); // We expect that MirrorMaker will keep the same partition assignment - assertEquals(consumerRecord.partition(), sourceRecord.kafkaPartition().intValue()); + assertEquals(consumerRecord.partition(), sourceRecord.kafkaPartition().intValue(), + "partition assignment not the same as the current replicationPolicy"); // Check header values List
expectedHeaders = new ArrayList<>(); consumerRecord.headers().forEach(expectedHeaders::add); @@ -155,8 +165,10 @@ private void compareHeaders(List
expectedHeaders, List can't translate - assertEquals(-1, store.translateDownstream(tp, 5)); + assertEquals(-1, store.translateDownstream(tp, 5), + "Expected old offset to not translate"); // Downstream offsets reset store.sync(tp, 200, 10); - assertEquals(store.translateDownstream(tp, 200), 10); + assertEquals(store.translateDownstream(tp, 200), 10, + "Failure in resetting translation of downstream offset"); // Upstream offsets reset store.sync(tp, 20, 20); - assertEquals(store.translateDownstream(tp, 20), 20); + assertEquals(store.translateDownstream(tp, 20), 20, + "Failure in resetting translation of upstream offset"); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java index 33f3ab0d2be51..dc7efe291af83 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java @@ -32,8 +32,11 @@ public void testSerde() { byte[] value = offsetSync.recordValue(); ConsumerRecord record = new ConsumerRecord<>("any-topic", 6, 7, key, value); OffsetSync deserialized = OffsetSync.deserializeRecord(record); - assertEquals(offsetSync.topicPartition(), deserialized.topicPartition()); - assertEquals(offsetSync.upstreamOffset(), deserialized.upstreamOffset()); - assertEquals(offsetSync.downstreamOffset(), deserialized.downstreamOffset()); + assertEquals(offsetSync.topicPartition(), deserialized.topicPartition(), + "Failure on offset sync topic partition serde"); + assertEquals(offsetSync.upstreamOffset(), deserialized.upstreamOffset(), + "Failure on upstream offset serde"); + assertEquals(offsetSync.downstreamOffset(), deserialized.downstreamOffset(), + "Failure on downstream offset serde"); } }