AtomicKafka is a lightweight NPM Package developed to simplify the process of establishing bidirectional, real-time data streaming with Apache Kafka in your web-app.
Website | Library | Demo Apps | Featured on Medium
- Websocket connections between the client and the server that accept user-defined event strings and callbacks
- Broker initialization and connection to Apache Kafka
- Consumer and Producer classes are predefined to be as modular as possible
- Consumer functions accept user-defined callback functions to support lightweight stream processing
- React Hook that throttles the websocket event listener with a time interval to maintain client performance
- Supports multiple Kafka streams
AtomicKafka currently supports running Apache Kafka clusters either using a Docker image or by connecting to Confluent Cloud.
Docker:
- Download this .yml and run the following command in your terminal:
docker-compose up -d
Confluent Cloud:
- Follow the steps on Confluent Cloud to create a free account with Confluent cloud. Obtain the API_ACCESS_KEY, API_ACCESS_SECRET, and BOOTSTRAP_SERVER
Include the following lines in your .env depending on your Kafka environment. Set the PORT variable to the port where AtomicKafkaServer will be initialized in the next step.
- Docker .env config: (API_KEY and API_SECRET are intentionally left blank)
PORT=<USER_DEFINED> API_KEY= API_SECRET= KAFKA_BOOTSTRAP_SERVER=localhost:9092
- Confluent Cloud .env config: (PORT intentionally left blank)
PORT= API_KEY=<API_ACCESS_KEY> API_SECRET=<API_ACCESS_SECRET> KAFKA_BOOTSTRAP_SERVER=<BOOTSTRAP_SERVER>
$ npm install atomic-kafka
Initialize a server instance of your choice (HTTP, Node.js, etc). The example below contemplates a Node.js Express server.
ATTENTION: a Server instance must be created for every remote AtomicKafkaClient.
- Initialize and configure expressApp according to desired specifications.
- Require in AtomicKafkaServer.
- Define a server that listens on the user-defined PORT environment variable.
- Initialize an AtomicKafkaServer instance aks by passing in the server.
/* initialize and configure Node.js expressApp according to user specifications
then add the following: */
const AtomicKafkaServer = require('atomic-kafka/server');
const server = expressApp.listen(process.env.PORT, () => {
console.log(`Listening on port ${process.env.PORT}`);
})
const aks = new AtomicKafkaServer(server);
- Initialize a newConsumer on the aks instance and pass in the group_ID_string.
- Enable the built-in websocket by invoking socketConsume and passing in the group_ID_string, an event_string, and the topic_string.
/* AKS_Producer_Init */
aks.newProducer('topic');
aks.globalProduce('produceMessageEvent', 'topic');
- Initialize a newProducer on the aks instance and pass in the topic_string.
- Enable the built-in websocket by invoking globalProducer and passing in an event_string and the topic_string.
/* AKS_Consumer_Init */
aks.newConsumer('group_ID');
aks.socketConsume('group_ID', 'consumeMessageEvent', 'topic');
/* in your React.jsx Component */
import AtomicKafkaClient from 'atomic-kafka/client';
/* in your TypeScript React Component */
declare function require(name:string);
const AtomicKafkaClient = require('atomic-kafka/client').default;
- Initialize akc as an AtomicKafkaClient. Pass in AtomicKafkaServer instance host's URI_STRING
- Define a callback to process message payload through the React state management tool of your choice.
- Implement useInterval to consume from the kafka cluster on interval.
- Return the invocation of the consumer function on the akc instance. Pass in a user-defined websocket event_string, the previously defined callback, and the interval_delay in milliseconds.
function ConsumerComponent() {
const akc = new AtomicKafkaClient('ATOMIC_KAFKA_SERVER_URI_STRING');
const callback = (payload) => {
/* user-provided data stream processing function definition
that effects state change */
}
/* Throttles message consumption. Interval in milliseconds,
can be any number */
akc.useInterval(() => akc.consumer('consumeMessageEvent', callback), 4000);
}
- Initialize akc as an AtomicKafkaClient. Pass in AtomicKafkaServer instance host's URI_STRING
- Generate a payload formatted as an arbitrarily-nested JSON object. The example below defines a payload, but it can be generated at any point in the client according to the user's specification.
- Invoke the consumer function. Pass in the websocket event_string and the payload.
function ProducerComponent() {
const akc = new AtomicKafkaClient('ATOMIC_KAFKA_SERVER_URI_STRING');
const payload = {
/* Data to be sent to the cluster. Arbitrarily-nested JSON format.
Can be defined anywhere in the app. */
}
akc.producer('produceMessageEvent', payload);
}
We want this open-sourced project to continue to improve. If you would like to make a contribution to AtomicKafka, please fork this repo, add your awesome changes to a well-named feature branch of this repository, and make a pull request. We look forward to your input! And if you want to support AtomicKafka, please click on the ⭐ button for us!
Nikhil Massand
This product is released under the MIT License