Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Zihan Li committed Oct 24, 2023
1 parent 1119f9c commit e9f2620
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
public class KafkaAuditCountVerifierTest {

public static final String SOURCE_TIER = "gobblin";
public static final String REFERENCE_TIERS = "producer";
public static final String REFERENCE_TIER = "producer";
public static final String REFERENCE_TIER_1 = "producer_reference";
public static final String REFERENCE_TIERS = REFERENCE_TIER + "," + REFERENCE_TIER_1;

public static final String TOTAL_COUNT_REF_TIER_0 = "producer_0";
public static final String TOTAL_COUNT_REF_TIER_1 = "producer_1";
Expand All @@ -50,7 +52,8 @@ public void testFetch() throws IOException {
// All complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 1000L,
REFERENCE_TIERS, 1000L
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L
));
// Default threshold
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
Expand All @@ -59,15 +62,17 @@ public void testFetch() throws IOException {
// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L
));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));

// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 1000L
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L
));
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
Expand All @@ -86,7 +91,8 @@ public void testTotalCountCompleteness() throws IOException {
// All complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 1000L,
REFERENCE_TIERS, 1000L,
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
Expand All @@ -97,7 +103,8 @@ public void testTotalCountCompleteness() throws IOException {
// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L,
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
Expand All @@ -107,7 +114,8 @@ public void testTotalCountCompleteness() throws IOException {
// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 1000L,
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
Expand Down Expand Up @@ -140,7 +148,8 @@ public void testEmptyAuditCount() throws IOException {
client.setTierCounts(
ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 0L,
REFERENCE_TIER, 0L,
REFERENCE_TIER_1, 0L,
TOTAL_COUNT_REF_TIER_0, 0L,
TOTAL_COUNT_REF_TIER_1, 0L
));
Expand All @@ -153,7 +162,8 @@ public void testEmptyAuditCount() throws IOException {
client.setTierCounts(
ImmutableMap.of(
SOURCE_TIER, 0L,
REFERENCE_TIERS, 0L,
REFERENCE_TIER, 0L,
REFERENCE_TIER_1, 0L,
TOTAL_COUNT_REF_TIER_0, 0L,
TOTAL_COUNT_REF_TIER_1, 0L
));
Expand All @@ -175,7 +185,8 @@ public void testOneCountFailed() throws IOException {
// Missing total count tier which will throw exception
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L
));

// Classic completeness is still returned, but total is missing
Expand All @@ -184,4 +195,42 @@ public void testOneCountFailed() throws IOException {
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.containsKey(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}

/**
 * Verifies completeness calculation when the configured reference tiers report
 * different counts: the verifier must compare the source count against the
 * larger reference count, so a low source count fails the classic check.
 *
 * @throws IOException if the underlying audit client fails
 */
@Test
public void testDifferentValueInReferenceTier() throws IOException {
  final String topic = "testTopic";
  State props = new State();
  props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
  props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
  props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, TOTAL_COUNT_REFERENCE_TIERS);
  props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
  props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
  TestAuditClient client = new TestAuditClient(props);
  KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);

  // Reference tiers disagree: 1000 vs 2000.
  client.setTierCounts(ImmutableMap.of(
      SOURCE_TIER, 999L,
      REFERENCE_TIER, 1000L,
      REFERENCE_TIER_1, 2000L
  ));

  // Classic completeness fails: 999/2000 is below the 0.99 threshold.
  Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
      .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));

  // Reference tiers disagree and one tier reports zero counts.
  client.setTierCounts(ImmutableMap.of(
      SOURCE_TIER, 999L,
      REFERENCE_TIER, 0L,
      REFERENCE_TIER_1, 2000L
  ));

  // Still fails: 999/2000 is below the 0.99 threshold.
  Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
      .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@
@Slf4j
public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 30;
public static final String DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY = GOBBLIN_KAFKA_PREFIX + "default.num.topic.partitions.per.container";
private static final int DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;

//A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
//A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
//single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be
// packed into a single workunit. For example, if the container capacity is set to 10, and each topic partition has a
// weight of 1, then 10 partitions of the topic will be packed into a single workunit. This configuration is topic-independent
Expand Down Expand Up @@ -294,7 +295,7 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t

private Double getDefaultWorkUnitSize() {
return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER;
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
}

/**
Expand Down

0 comments on commit e9f2620

Please sign in to comment.