Skip to content

Commit

Permalink
Create topic only on the first VU upon initialization
Browse files Browse the repository at this point in the history
Delete topics upon teardown
  • Loading branch information
mostafa committed May 4, 2022
1 parent 261726a commit 36bf95f
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 48 deletions.
15 changes: 14 additions & 1 deletion scripts/test_avro.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ const valueSchema = JSON.stringify({
],
});

createTopic(bootstrapServers[0], kafkaTopic);
if (__VU == 1) {
createTopic(bootstrapServers[0], kafkaTopic);
}

export default function () {
for (let index = 0; index < 100; index++) {
Expand Down Expand Up @@ -106,6 +108,17 @@ export default function () {
}

export function teardown(data) {
if (__VU == 1) {
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
producer.close();
consumer.close();
}
17 changes: 15 additions & 2 deletions scripts/test_avro_no_key.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ const valueSchema = JSON.stringify({
],
});

createTopic(bootstrapServers[0], kafkaTopic);
if (__VU == 1) {
createTopic(bootstrapServers[0], kafkaTopic);
}

export default function () {
for (let index = 0; index < 100; index++) {
Expand Down Expand Up @@ -87,11 +89,22 @@ export default function () {
});

for (let index = 0; index < rx_messages.length; index++) {
console.debug('Received Message: ' + JSON.stringify(rx_messages[index]));
console.debug("Received Message: " + JSON.stringify(rx_messages[index]));
}
}

export function teardown(data) {
if (__VU == 1) {
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
producer.close();
consumer.close();
}
26 changes: 16 additions & 10 deletions scripts/test_avro_with_schema_registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ const valueSchema = `{
var configuration = JSON.stringify({
consumer: {
keyDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
valueDeserializer:
"io.confluent.kafka.serializers.KafkaAvroDeserializer",
valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
},
producer: {
keySerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
Expand All @@ -71,7 +70,9 @@ var configuration = JSON.stringify({
},
});

createTopic(bootstrapServers[0], topic);
if (__VU == 1) {
createTopic(bootstrapServers[0], kafkaTopic);
}

export default function () {
for (let index = 0; index < 100; index++) {
Expand All @@ -98,19 +99,24 @@ export default function () {
});
}

let messages = consumeWithConfiguration(
consumer,
20,
configuration,
keySchema,
valueSchema
);
let messages = consumeWithConfiguration(consumer, 20, configuration, keySchema, valueSchema);
check(messages, {
"20 message returned": (msgs) => msgs.length == 20,
});
}

export function teardown(data) {
if (__VU == 1) {
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
producer.close();
consumer.close();
}
36 changes: 18 additions & 18 deletions scripts/test_avro_with_schema_registry_no_key.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ const valueSchema = `{
var configuration = JSON.stringify({
consumer: {
keyDeserializer: "",
valueDeserializer:
"io.confluent.kafka.serializers.KafkaAvroDeserializer",
valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
},
producer: {
keySerializer: "",
Expand All @@ -49,7 +48,9 @@ var configuration = JSON.stringify({
},
});

createTopic(bootstrapServers[0], topic);
if (__VU == 1) {
createTopic(bootstrapServers[0], kafkaTopic);
}

export default function () {
for (let index = 0; index < 100; index++) {
Expand All @@ -61,35 +62,34 @@ export default function () {
}),
},
];
let error = produceWithConfiguration(
producer,
messages,
configuration,
null,
valueSchema
);
let error = produceWithConfiguration(producer, messages, configuration, null, valueSchema);
check(error, {
"is sent": (err) => err == undefined,
});
}

let rx_messages = consumeWithConfiguration(
consumer,
20,
configuration,
null,
valueSchema
);
let rx_messages = consumeWithConfiguration(consumer, 20, configuration, null, valueSchema);
check(rx_messages, {
"20 message returned": (msgs) => msgs.length == 20,
});

for (let index = 0; index < rx_messages.length; index++) {
console.debug('Received Message: ' + JSON.stringify(rx_messages[index]));
console.debug("Received Message: " + JSON.stringify(rx_messages[index]));
}
}

export function teardown(data) {
if (__VU == 1) {
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
producer.close();
consumer.close();
}
17 changes: 15 additions & 2 deletions scripts/test_bytes.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ const kafkaTopic = "xk6_kafka_byte_array_topic";
const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);

createTopic(bootstrapServers[0], kafkaTopic);
if (__VU == 1) {
createTopic(bootstrapServers[0], kafkaTopic);
}

var configuration = JSON.stringify({
producer: {
Expand All @@ -33,7 +35,7 @@ var configuration = JSON.stringify({
},
});

const payload = "byte array payload"
const payload = "byte array payload";

export default function () {
for (let index = 0; index < 100; index++) {
Expand Down Expand Up @@ -64,6 +66,17 @@ export default function () {
}

export function teardown(data) {
if (__VU == 1) {
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
producer.close();
consumer.close();
}
11 changes: 11 additions & 0 deletions scripts/test_json.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ export default function () {
}

export function teardown(data) {
if (__VU == 1) {
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
producer.close();
consumer.close();
}
24 changes: 16 additions & 8 deletions scripts/test_json_with_snappy_compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ const consumer = reader(bootstrapServers, kafkaTopic);

const replicationFactor = 1;
const partitions = 1;
// Create the topic or do nothing if the topic exists.
createTopic(
bootstrapServers[0],
kafkaTopic,
partitions,
replicationFactor,
compression
);

if (__VU == 1) {
// Create the topic or do nothing if the topic exists.
createTopic(bootstrapServers[0], kafkaTopic, partitions, replicationFactor, compression);
}

export default function () {
for (let index = 0; index < 100; index++) {
Expand Down Expand Up @@ -80,6 +77,17 @@ export default function () {
}

export function teardown(data) {
if (__VU == 1) {
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
producer.close();
consumer.close();
}
10 changes: 9 additions & 1 deletion scripts/test_sasl_auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,15 @@ export default function () {

export function teardown(data) {
if (__VU == 1) {
deleteTopic(bootstrapServers[0], kafkaTopic, auth);
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
producer.close();
consumer.close();
Expand Down
23 changes: 17 additions & 6 deletions scripts/test_topics.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@ list topics on all Kafka partitions and creates a topic.
*/

import {
createTopic,
listTopics
} from 'k6/x/kafka'; // import kafka extension
import { createTopic, listTopics } from "k6/x/kafka"; // import kafka extension

const address = "localhost:9092";
const kafkaTopic = "xk6_kafka_test_topic";

const results = listTopics(address)
const results = listTopics(address);
const error = createTopic(address, kafkaTopic);

export default function () {
results.forEach(topic => console.log(topic));
results.forEach((topic) => console.log(topic));

if (error === undefined) {
// If no error returns, it means that the topic
Expand All @@ -27,3 +24,17 @@ export default function () {
console.log("Error while creating topic: ", error);
}
}

export function teardown() {
if (__VU == 1) {
// Delete the topic
const error = deleteTopic(address, kafkaTopic);
if (error === undefined) {
// If no error returns, it means that the topic
// is successfully deleted
console.log("Topic deleted successfully");
} else {
console.log("Error while deleting topic: ", error);
}
}
}

0 comments on commit 36bf95f

Please sign in to comment.