From e1b2bf945b30229385136dcc6b94791ac1188e87 Mon Sep 17 00:00:00 2001
From: Mostafa Moradian
Date: Fri, 22 Apr 2022 17:31:16 +0200
Subject: [PATCH] Revamp README

---
 README.md | 237 ++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 185 insertions(+), 52 deletions(-)

diff --git a/README.md b/README.md
index a382ec1..f5c8282 100644
--- a/README.md
+++ b/README.md
@@ -1,90 +1,219 @@
 # xk6-kafka
 
-This project is a k6 extension that can be used to load test Kafka, using a producer. Per each connection to Kafka, many messages can be sent. These messages are an array of objects containing a key and a value. There is also a consumer for testing purposes, i.e. to make sure you send the correct data to Kafka. The consumer is not meant to be used for testing Kafka under load. The extension supports producing and consuming messages in Avro format, given a schema for key and/or value.
+This k6 extension provides the ability to load test Kafka using a producer. You can send many messages with each connection to Kafka. These messages are an array of objects containing a key and a value. There is also a consumer for testing purposes, that is, to make sure you send the correct data to Kafka, but it is not meant to be used for testing Kafka under load. There is support for producing and consuming messages in many formats using various serializers and deserializers. The extension can fetch schemas from Schema Registry and also accepts hard-coded schemas. Compression is also supported.
 
-The real purpose of this extension is not only to test Apache Kafka, but also the system you've designed that uses Apache Kafka. So, you can test your consumers, and hence your system, by auto-generating messages and sending them to your system via Apache Kafka.
+The real purpose of this extension is to test Apache Kafka and the system you've designed around it. So, you can test your consumers, and hence your system, by auto-generating messages and sending them to your system via Apache Kafka.
 
-In order to build the source, you should have the latest version of Go installed. The latest version should match [k6](https://github.com/grafana/k6#build-from-source) and [xk6](https://github.com/grafana/xk6#requirements). I recommend you to have [gvm](https://github.com/moovweb/gvm) installed.
+To build the source, you should have the latest version of Go installed, and it should match the version required by [k6](https://github.com/grafana/k6#build-from-source) and [xk6](https://github.com/grafana/xk6#requirements). I recommend installing [gvm](https://github.com/moovweb/gvm) to manage your Go versions.
 
 If you want to learn more about the extension, read the [How to Load Test Your Kafka Producers and Consumers using k6](https://k6.io/blog/load-test-your-kafka-producers-and-consumers-using-k6/) article on the k6 blog.
 ## Supported Features
 
-* Produce/consume messages in JSON and Avro format (custom schema)
-* Authentication with SASL PLAIN and SCRAM
-* Create and list topics
-* Support for user-provided Avro key and value schemas
-* Support for loading Avro schemas from Schema Registry
-* Support to consume from all partitions with group ID
-* Support Kafka message compression: Gzip, Snappy, Lz4 & Zstd
+- Produce/consume messages in String, ByteArray, JSON, and Avro formats (custom schema)
+- Authentication with SASL PLAIN and SCRAM
+- Create and list topics
+- Support for user-provided Avro key and value schemas
+- Support for loading Avro schemas from Schema Registry
+- Support for byte arrays for binary data (from binary protocols)
+- Support for consuming from all partitions with a group ID
+- Support for Kafka message compression: Gzip, Snappy, Lz4 & Zstd
 
 ## Build
 
-To build a `k6` binary with this extension, first ensure you have the prerequisites:
+The k6 binary can be built on various platforms, and each platform has its own set of requirements. The following shows how to build a k6 binary with this extension on GNU/Linux distributions.
 
-- [gvm](https://github.com/moovweb/gvm)
-- [Git](https://git-scm.com/)
+### Prerequisites
 
-Then, install [xk6](https://github.com/grafana/xk6) and build your custom k6 binary with the Kafka extension:
+- [gvm](https://github.com/moovweb/gvm) for easier installation and management of Go versions on your machine
+- [Git](https://git-scm.com/) for cloning the project
+- [xk6](https://github.com/grafana/xk6) for building a k6 binary with extensions
 
-1. Install `xk6`:
-   ```shell
-   $ go install go.k6.io/xk6/cmd/xk6@latest
-   ```
+### Install and build
 
-2. Build the binary:
-   ```shell
-   $ xk6 build --with github.com/mostafa/xk6-kafka@latest
-   ```
+Feel free to skip the first two steps if you already have Go installed.
 
-Note: you can always use the latest version of k6 to build the extension, but the earliest version of k6 that supports extensions via xk6 is v0.32.0.
+1. Install gvm by following its [installation guide](https://github.com/moovweb/gvm#installing).
+2. Install the latest version of Go using gvm. You need Go 1.4 installed for bootstrapping into higher Go versions, as explained [here](https://github.com/moovweb/gvm#a-note-on-compiling-go-15).
+3. Install `xk6`:
 
-## Run & Test
+   ```shell
+   go install go.k6.io/xk6/cmd/xk6@latest
+   ```
 
-First, you need to have your Kafka development environment setup. I recommend you to use [Lenses.io fast-data-dev Docker image](https://github.com/lensesio/fast-data-dev), which is a complete Kafka setup for development that includes: Kafka, Zookeeper, Schema Registry, Kafka-Connect, Landoop Tools, 20+ connectors. It is fairly easy to setup, if you have Docker installed. Just make sure to monitor Docker logs to have a working setup, before attempting to test. Initial setup, leader election and test data ingestion takes time.
+4. Build the binary:
 
-### Development Environment
+   ```shell
+   xk6 build --with github.com/mostafa/xk6-kafka@latest
+   ```
 
-Run the Kafka environment and expose the container ports:
+**Note:** You can always use the latest version of k6 to build the extension, but the earliest version of k6 that supports extensions via xk6 is v0.32.0. xk6 is constantly evolving, so some APIs may not be backward compatible.
-```bash
-sudo docker run -d --rm --name lenseio -p 2181:2181 -p 3030:3030 \
-    -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 \
-    -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
-```
+## Examples
 
-After running the command, visit [localhost:3030](http://localhost:3030) to get into the fast-data-dev environment.
+There are lots of examples in the [scripts](./scripts/) directory that show how to use various features of the extension.
 
-You can run the command to see the container logs:
+## Run and Test
 
-```bash
-sudo docker logs -f -t lensesio
-```
+You can start testing your own environment right away, but since developing the script takes some time, it is better to develop and test it against a development environment first and then run it against your own environment.
+
+### Development environment
+
+I recommend the [fast-data-dev](https://github.com/lensesio/fast-data-dev) Docker image by Lenses.io, a Kafka setup for development that includes Kafka, Zookeeper, Schema Registry, Kafka-Connect, Landoop Tools, and 20+ connectors. It is relatively easy to set up if you have Docker installed. Just monitor the Docker logs to make sure you have a working setup before attempting to test, because the initial setup, leader election, and test data ingestion take time.
+
+1. Run the Kafka environment and expose the container ports:
+
+   ```bash
+   sudo docker run \
+       --detach --rm \
+       --name lensesio \
+       -p 2181:2181 \
+       -p 3030:3030 \
+       -p 8081-8083:8081-8083 \
+       -p 9581-9585:9581-9585 \
+       -p 9092:9092 \
+       -e ADV_HOST=127.0.0.1 \
+       lensesio/fast-data-dev
+   ```
+
+2. After running the command, visit [localhost:3030](http://localhost:3030) to get into the fast-data-dev environment.
+
+3. Run the following command to see the container logs:
+
+   ```bash
+   sudo docker logs -f -t lensesio
+   ```
 
 > If you have errors running the Kafka development environment, refer to the [fast-data-dev documentation](https://github.com/lensesio/fast-data-dev).
 
+### The xk6-kafka API
+
+All the exported functions are available by importing them from `k6/x/kafka`. They are subject to change in future versions when a new major version is released. These are the exported functions:
+
+<details>
+  <summary>The JavaScript API</summary>
+
+```javascript
+/**
+ * Create a new Writer object for writing messages to Kafka.
+ *
+ * @constructor
+ * @param {[string]} brokers An array of brokers.
+ * @param {string} topic The topic to write to.
+ * @param {string} auth The authentication credentials for SASL PLAIN/SCRAM.
+ * @param {string} compression The compression algorithm.
+ * @returns {object} A Writer object.
+ */
+function writer(brokers: [string], topic: string, auth: string, compression: string) => object {}
+
+/**
+ * Write a sequence of messages to Kafka.
+ *
+ * @function
+ * @param {object} writer The writer object created with the writer constructor.
+ * @param {[object]} messages An array of message objects containing an optional key and a value.
+ * @param {string} keySchema An optional Avro schema for the key.
+ * @param {string} valueSchema An optional Avro schema for the value.
+ * @returns {string} A string containing the error.
+ */
+function produce(writer: object, messages: [object], keySchema: string, valueSchema: string) => string {}
+
+/**
+ * Write a sequence of messages to Kafka with a specific serializer/deserializer.
+ *
+ * @function
+ * @param {object} writer The writer object created with the writer constructor.
+ * @param {[object]} messages An array of message objects containing an optional key and a value.
+ * @param {string} configurationJson Serializer, deserializer and schemaRegistry configuration.
+ * @param {string} keySchema An optional Avro schema for the key.
+ * @param {string} valueSchema An optional Avro schema for the value.
+ * @returns {string} A string containing the error.
+ */
+function produceWithConfiguration(writer: object, messages: [object], configurationJson: string, keySchema: string, valueSchema: string) => string {}
+
+/**
+ * Create a new Reader object for reading messages from Kafka.
+ *
+ * @constructor
+ * @param {[string]} brokers An array of brokers.
+ * @param {string} topic The topic to read from.
+ * @param {number} partition The partition.
+ * @param {string} groupID The group ID.
+ * @param {number} offset The offset to begin reading from.
+ * @param {string} auth Authentication credentials for SASL PLAIN/SCRAM.
+ * @returns {object} A Reader object.
+ */
+function reader(brokers: [string], topic: string, partition: number, groupID: string, offset: number, auth: string) => object {}
+
+/**
+ * Read a sequence of messages from Kafka.
+ *
+ * @function
+ * @param {object} reader The reader object created with the reader constructor.
+ * @param {number} limit How many messages to read in one go (the call blocks until they arrive). Defaults to 1.
+ * @param {string} keySchema An optional Avro schema for the key.
+ * @param {string} valueSchema An optional Avro schema for the value.
+ * @returns {[object]} An array of consumed messages (empty if consumption fails).
+ */
+function consume(reader: object, limit: number, keySchema: string, valueSchema: string) => [object] {}
+
+/**
+ * Read a sequence of messages from Kafka with a specific serializer/deserializer.
+ *
+ * @function
+ * @param {object} reader The reader object created with the reader constructor.
+ * @param {number} limit How many messages to read in one go (the call blocks until they arrive). Defaults to 1.
+ * @param {string} configurationJson Serializer, deserializer and schemaRegistry configuration.
+ * @param {string} keySchema An optional Avro schema for the key.
+ * @param {string} valueSchema An optional Avro schema for the value.
+ * @returns {[object]} An array of consumed messages (empty if consumption fails).
+ */
+function consumeWithConfiguration(reader: object, limit: number, configurationJson: string, keySchema: string, valueSchema: string) => [object] {}
+
+/**
+ * Create a topic in Kafka. It does nothing if the topic exists.
+ *
+ * @function
+ * @param {string} address The broker address.
+ * @param {string} topic The topic name.
+ * @param {number} partitions The number of partitions.
+ * @param {number} replicationFactor The replication factor in a clustered setup.
+ * @param {string} compression The compression algorithm.
+ * @returns {string} A string containing the error.
+ */
+function createTopic(address: string, topic: string, partitions: number, replicationFactor: number, compression: string) => string {}
+
+/**
+ * List all topics in Kafka.
+ *
+ * @function
+ * @param {string} address The broker address.
+ * @returns {[[string], string]} A nested list containing the list of topics and the error (if any).
+ */
+function listTopics(address: string) => [[string], string] {}
+```
+
+</details>
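+
+For quick orientation, here is a minimal end-to-end sketch of how these functions fit together. It is not one of the bundled examples; it assumes a broker on `localhost:9092`, an existing `xk6_kafka_json_topic` topic, and that the optional trailing arguments (such as `auth`, `compression`, `partition`, `groupID`, and `offset`) can be omitted. See the [scripts](./scripts/) directory for the complete, tested examples.
+
+```javascript
+import { check } from "k6";
+import { writer, produce, reader, consume } from "k6/x/kafka";
+
+const bootstrapServers = ["localhost:9092"];
+const topic = "xk6_kafka_json_topic";
+
+// Create the producer and consumer in the init context so every iteration reuses them.
+const producer = writer(bootstrapServers, topic);
+const consumer = reader(bootstrapServers, topic);
+
+export default function () {
+  // Each message is an object with a key and a value; both are plain strings here.
+  const messages = [
+    { key: "module-name", value: JSON.stringify({ module: "xk6-kafka", ts: Date.now() }) },
+  ];
+
+  // produce returns an error; undefined means the messages were sent successfully.
+  const error = produce(producer, messages);
+  check(error, { "messages are sent": (err) => err == undefined });
+
+  // Read one message back; consume blocks until the requested number of messages arrives.
+  const received = consume(consumer, 1);
+  check(received, { "one message is received": (msgs) => msgs.length == 1 });
+}
+
+export function teardown() {
+  producer.close();
+  consumer.close();
+}
+```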
 
-### k6 Test
+### k6 Test Script
 
-The following k6 test script is used to test this extension and Apache Kafka in turn. The script is available as `test_.js` with more code and commented sections. The scripts have 4 parts:
+The example scripts are available as `test_*.js` files, with more code and commented sections, in the [scripts](./scripts/) directory. The scripts usually have 4 parts:
 
-1. The __imports__ at the top shows the exposed functions that are imported from k6 and the extension, `check` from k6 and the `writer`, `produce`, `reader`, `consume` from the extension using the `k6/x/kafka` extension loading convention.
-2. The __Avro schema__ defines a key and a value schema that are used by both producer and consumer, according to the [Avro schema specification](https://avro.apache.org/docs/current/spec.html). These are defined in the `test_avro.js` script.
-3. The __Avro/JSON message producer__:
-   1. The `writer` function is used to open a connection to the bootstrap servers. The first argument is an array of strings that signifies the bootstrap server addresses and the second is the topic you want to write to. You can reuse this writer object to produce as many messages as you want. This object is created in init code and is reused in the exported default function.
-   2. The `produce` function is used to send a list of messages to Kafka. The first argument is the `producer` object, the second is the list of messages (with key and value). The third and the fourth arguments are the key schema and value schema in Avro format, if Avro format is used. If the schema are not passed to the function for either the key or the value, the values are treated as normal strings. Use an empty string, `""` if either of the schema is Avro and the other is going to be a string.
-      The produce function returns an `error` if it fails. The check is optional, but `error` being `undefined` means that `produce` function successfully sent the message.
+1. The __imports__ at the top show the exported functions from the Go extension and k6.
+2. The __Avro schema__ defines a key and a value schema that are used by both producer and consumer, according to the [Avro schema specification](https://avro.apache.org/docs/current/spec.html). These are defined in the [test_avro.js](./scripts/test_avro.js) script.
+3. The __message producer__:
+   1. The `writer` function is used to open a connection to the bootstrap servers. The first argument is an array of strings that signifies the bootstrap server addresses, and the second is the topic you want to write to. You can reuse this writer object to produce as many messages as you want. This object is created in the init code and is reused in the exported default function.
+   2. The `produce` function sends a list of messages to Kafka. The first argument is the `producer` object, and the second is the list of messages (with key and value). The third and fourth arguments are the key schema and value schema, if the Avro format is used. If the schemas are not passed to the function for either the key or the value, the values are treated as normal strings. Use an empty string, `""`, if one of the schemas is Avro and the other is a plain string. You can use the `produceWithConfiguration` function to pass separate serializer, deserializer, and schema registry settings, as shown in the [test_avro_with_schema_registry.js](./scripts/test_avro_with_schema_registry.js) script. The produce function returns an `error` if it fails.
The check is optional, but `error` being `undefined` means that the `produce` function successfully sent the message.
   3. The `producer.close()` function closes the `producer` object (in `tearDown`).
-4. The __Avro/JSON message consumer__:
-   1. The `reader` function is used to open a connection to the bootstrap servers. The first argument is an array of strings that signifies the bootstrap server addresses and the second is the topic you want to reader from. This object is created in init code and is reused in the exported default function.
-   2. The `consume` function is used to read a list of messages from Kafka. The first argument is the `consumer` object, the second is the number of messages to read in one go. The third and the fourth arguments are the key schema and value schema in Avro format, if Avro format is used. If the schema are not passed to the function for either the key or the value, the values are treated as normal strings. Use an empty string, `""` if either of the schema is Avro and the other is going to be a string.
-      The consume function returns an empty array if it fails. The check is optional, but it checks to see if the length of the message array is exactly 10.
+4. The __message consumer__:
+   1. The `reader` function is used to open a connection to the bootstrap servers. The first argument is an array of strings that signifies the bootstrap server addresses, and the second is the topic you want to read from. This object is created in the init code and is reused in the exported default function.
+   2. The `consume` function is used to read a list of messages from Kafka. The first argument is the `consumer` object, and the second is the number of messages to read in one go. The third and fourth arguments are the key schema and value schema, if the Avro format is used. If the schemas are not passed to the function for either the key or the value, the values are treated as normal strings. Use an empty string, `""`, if one of the schemas is Avro and the other is a plain string. You can use the `consumeWithConfiguration` function to pass separate serializer, deserializer, and schema registry settings, as shown in the [test_avro_with_schema_registry.js](./scripts/test_avro_with_schema_registry.js) script. The consume function returns an empty array if it fails. The check is optional; it verifies that the length of the message array is exactly 10.
   3. The `consumer.close()` function closes the `consumer` object (in `tearDown`).
 
 You can run k6 with the Kafka extension using the following command:
 
 ```bash
-$ ./k6 run --vus 50 --duration 60s scripts/test_json.js
+./k6 run --vus 50 --duration 60s scripts/test_json.js
 ```
 
 And here's the test result output:
 
@@ -151,10 +280,14 @@
 $ docker exec -it lensesio bash
 (inside container)$ kafka-topics --create --topic xk6_kafka_json_topic --bootstrap-server localhost:9092
 
-If you want to test SASL authentication, have a look at [this commmit message](https://github.com/mostafa/xk6-kafka/pull/3/commits/403fbc48d13683d836b8033eeeefa48bf2f25c6e), where I describe how to run a test environment.
+If you want to test SASL authentication, have a look at [this commit message](https://github.com/mostafa/xk6-kafka/pull/3/commits/403fbc48d13683d836b8033eeeefa48bf2f25c6e), where I describe how to run a test environment.
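+
+Instead of creating the topic with the Kafka CLI inside the container, you can also create it from the test script itself with the `createTopic` and `listTopics` functions listed in the API section above. The following is a small sketch under the same assumptions as the earlier example (a local broker and the `xk6_kafka_json_topic` topic); the `1, 1, ""` arguments stand for an assumed one partition, a replication factor of one, and no compression.
+
+```javascript
+import { createTopic, listTopics } from "k6/x/kafka";
+
+const address = "localhost:9092";
+const topic = "xk6_kafka_json_topic";
+
+export function setup() {
+  // createTopic does nothing if the topic already exists.
+  createTopic(address, topic, 1, 1, "");
+
+  // listTopics returns a nested list: the topic names and the error (if any).
+  const results = listTopics(address);
+  console.log(`Topics on ${address}: ${results[0]}`);
+}
+
+export default function () {
+  // The produce/consume logic from the example scripts goes here.
+}
+```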
+
+## Contributions, Issues and Feedback
+
+I'd be thrilled to receive contributions and feedback on this piece of software. You're always welcome to create an issue if you find one (or many), and I'll do my best to address them.
 
 ## Disclaimer
 
-This is a proof of concept, isn't supported by the k6 team, and may break in the future. USE AT YOUR OWN RISK!
+This project started as a proof of concept and is now used by some companies, but it still isn't supported by the k6 team; it is maintained by [me](https://github.com/mostafa) personally, and the APIs may change in the future. USE AT YOUR OWN RISK!
 
 This work is licensed under the [Apache License 2.0](https://github.com/mostafa/xk6-kafka/blob/master/LICENSE).