This repository contains an example flow, subtasks, and a sample Cortex-Kafka Gateway implementation for connecting Cortex to a Kafka broker.
This provides the connectivity between Cortex and a Kafka broker. The Cortex-Kafka Gateway performs the following activities:
- Subscribes to one or more Kafka topics available on the Kafka broker, and when a message is received starts an instance of a flow in Cortex
- Listens on a port for an HTTP JSON message and posts it to the appropriate Kafka topic
This implementation is provided as a set of Node.js source files and is for demonstration purposes only; it is not recommended for production use as is. Unzip the implementation, then build it using the command `docker build .` (with any additional parameters you may require). It can then be run in a Docker container; remember to map the Docker host ports to the container ports appropriately.
The file at config/config.json defines the configuration for the gateway and is split into three sections:
This section defines how the Cortex-Kafka Gateway interacts with the Kafka broker. It has the following fields:
- clientId: The name of the instance of the Cortex-Kafka Gateway
- brokers: An array of objects representing Kafka broker addresses; each object has fields host and port
- security: a structure containing a single element, SSL, which takes the value true or false
- consumer: a structure containing two fields:
  - groupId: the consumer group Id
  - topics: a list of objects; each object has fields topic (the name of the topic) and (optionally) fromBeginning
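As an illustration of the fields listed above, the following sketch shows one possible shape for this section and two small helpers that read it. Only the field names come from the description; every value, the helper names `toBrokerAddresses` and `toSubscriptions`, and the default of `false` for `fromBeginning` are assumptions, and the gateway's own code may read the configuration differently.

```javascript
// Hypothetical values; only the field names come from the description above.
const kafkaSection = {
  clientId: 'cortex-kafka-gateway-1',
  brokers: [
    { host: 'localhost', port: 9092 },
    { host: 'localhost', port: 9093 },
  ],
  security: { SSL: false },
  consumer: {
    groupId: 'cortex-gateway-group',
    topics: [
      { topic: 'orders', fromBeginning: true },
      { topic: 'alerts' }, // fromBeginning is optional
    ],
  },
};

// Convert broker objects to the "host:port" strings that many Kafka clients expect.
function toBrokerAddresses(brokers) {
  return brokers.map(({ host, port }) => `${host}:${port}`);
}

// List the subscriptions, defaulting the optional flag to false (an assumption).
function toSubscriptions(topics) {
  return topics.map(({ topic, fromBeginning = false }) => ({ topic, fromBeginning }));
}

console.log(toBrokerAddresses(kafkaSection.brokers));
console.log(toSubscriptions(kafkaSection.consumer.topics));
```

Keeping brokers as separate host and port fields makes the file easy to edit by hand, at the cost of a small conversion step like the one sketched here.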
This section defines how the Cortex-Kafka Gateway interacts with the Cortex Intelligent Automation platform when a Kafka message is received. It has the following fields:
- api: a structure defining the Cortex Flow API configuration to use to initiate a flow. It has fields server, https, port, async, contentType and verb
- authorization: a structure containing the authorisation to use on the Cortex Flow API. It has fields type, username and password
- flow: the name of the Cortex flow to execute; this must have a global structure variable named ($)Message
- initiator: the owner of the Cortex flow execution
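The fields above could be combined into an HTTP request along the following lines. This is a minimal sketch, not the gateway's actual code: all values are placeholders, the helper names `buildBaseUrl` and `buildHeaders` are hypothetical, and it assumes that an authorization type of `Basic` means HTTP Basic authentication. The path of the Cortex Flow API is deliberately left out because it is not given here.

```javascript
// Hypothetical values; only the field names come from the description above.
const cortexSection = {
  api: {
    server: 'cortex.example.com',
    https: true,
    port: 8443,
    async: true,
    contentType: 'application/json',
    verb: 'POST',
  },
  authorization: { type: 'Basic', username: 'gateway-user', password: 'change-me' },
  flow: 'Kafka-Example-Flow',
  initiator: 'gateway-user',
};

// Build the scheme://server:port base URL from the api fields.
function buildBaseUrl({ server, https, port }) {
  return `${https ? 'https' : 'http'}://${server}:${port}`;
}

// Build request headers, assuming type "Basic" means HTTP Basic authentication.
function buildHeaders({ api, authorization }) {
  const headers = { 'Content-Type': api.contentType };
  if (authorization.type === 'Basic') {
    const credentials = `${authorization.username}:${authorization.password}`;
    headers.Authorization = `Basic ${Buffer.from(credentials).toString('base64')}`;
  }
  return headers;
}

console.log(buildBaseUrl(cortexSection.api));
console.log(Object.keys(buildHeaders(cortexSection)));
```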
This section defines the HTTP REST API interface that Cortex will use to post messages to a Kafka topic. It has the following fields:
- port: the local port number on which the Cortex-Kafka Gateway will listen
- path: the initial path used
This subtask will send a message to a Kafka topic (using the Cortex-Kafka Gateway).
Name | Type | M/O | Description |
---|---|---|---|
i_Message | Text | O | The message to send. Note that one of i_Message, i_Message-Structure and i_Message-List must be provided |
i_Message-Structure | Structure | O | The message structure to send; this can be created using the subtask CTX-Kafka-Create-Message-Structure. Note that one of i_Message, i_Message-Structure and i_Message-List must be provided |
i_Message-List | List | O | A list of message structures to send; each structure can be created using the subtask CTX-Kafka-Create-Message-Structure. Note that one of i_Message, i_Message-Structure and i_Message-List must be provided |
i_Gateway-base-url | Text | M | The URL (including the port) for the Cortex-Kafka Gateway |
i_Topic | Text | M | The Kafka topic to which the message is posted |
i_Timeout | Integer | O | The time to await a response in ms |
i_acks | Integer | O | Controls the number of required acknowledgements: -1 = all in-sync replicas must acknowledge; 0 = no acknowledgements; 1 = only waits for the leader to acknowledge |
i_compression | Text | O | The name of the Kafka compression codec to use |
This subtask returns a structure, o_result, with fields CODE and REASON.
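The input rules in the table can be expressed as two small checks. The sketch below is illustrative only: the helper names `describeAcks` and `hasMessageInput` are hypothetical, and it reads "one of ... must be provided" as "at least one", which is an assumption about the subtask's behaviour.

```javascript
// Meaning of each i_acks value, as listed in the parameter table.
const ACKS = new Map([
  [-1, 'all in-sync replicas must acknowledge'],
  [0, 'no acknowledgements'],
  [1, 'only the leader must acknowledge'],
]);

// Return the meaning of an acks value, rejecting anything outside -1, 0 and 1.
function describeAcks(acks) {
  if (!ACKS.has(acks)) {
    throw new RangeError(`Unsupported acks value: ${acks}`);
  }
  return ACKS.get(acks);
}

// True when at least one of the three message inputs is present.
function hasMessageInput(inputs) {
  return ['i_Message', 'i_Message-Structure', 'i_Message-List'].some(
    (name) => inputs[name] !== undefined && inputs[name] !== null
  );
}

console.log(describeAcks(1));
console.log(hasMessageInput({ i_Topic: 'orders', i_Message: 'hello' }));
```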
This subtask creates a message structure which can be used in the i_Message-Structure or i_Message-List input parameters of the CTX-Kafka-Send-Message subtask.
Name | Type | M/O | Description |
---|---|---|---|
i_Message | Text | M | The message to send |
i_key | Text | O | The key for the message |
i_partition | Text | O | The partition for the message |
This subtask returns a structure, o_message-structure.
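To make the idea of a message structure concrete, here is a rough JavaScript analogue of what such a structure might hold. The field names `message`, `key` and `partition` and the function `createMessageStructure` are hypothetical; the actual layout of o_message-structure is not described here.

```javascript
// Hypothetical field names: the real layout of o_message-structure is not documented here.
function createMessageStructure(message, key, partition) {
  if (typeof message !== 'string' || message.length === 0) {
    throw new TypeError('message is mandatory and must be non-empty text');
  }
  const structure = { message };
  // The key and partition are optional, so only include them when supplied.
  if (key !== undefined) structure.key = key;
  if (partition !== undefined) structure.partition = partition;
  return structure;
}

// One structure on its own, and a list of structures as a batch.
const single = createMessageStructure('order created', 'order-42');
const list = [single, createMessageStructure('order shipped', 'order-42', '0')];

console.log(JSON.stringify(list));
```

A single structure corresponds to the i_Message-Structure input of CTX-Kafka-Send-Message, and a list of them to i_Message-List.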
This flow is an example flow which can be called by the Cortex-Kafka Gateway when a message is received on a subscribed topic. The flow simply logs the message and then posts a message to a different Kafka topic using the CTX-Kafka-Send-Message subtask.
This flow uses the [CTX_Configuration-Store](https://github.com/CortexIntelligentAutomation/CTX-Configuration-Store) area Kafka-gateway, which has the following parameters:
- URL: The URL (including port) on which the Cortex-Kafka Gateway is listening
- Default-topic: The Kafka topic to which the response message is posted
Download the Studio Package file and import it into your Cortex environment. Don't forget to apply rights using the Studio Authorization module.
This library depends upon the following libraries:
👍 Enjoy! 😉