Config File
The configuration properties are currently read from a file called config.yml, which is written in YAML. You can find more information about YAML in its official specification.
The following code is an example of a basic config file written in YAML:
rest_uri: http://127.0.0.1:8888
state_file: /home/rbcep/state.json
sources:
  - name: kafka
    class: net.redborder.cep.sources.kafka.KafkaSource
    properties:
      zk_connect: zknode:2181
sinks:
  - name: syslog
    class: net.redborder.cep.sinks.syslog.SyslogSink
  - name: kafka
    class: net.redborder.cep.sinks.kafka.KafkaSink
    properties:
      kafka_brokers: rbfgawnoirli.redborder.cluster:9092
parsers:
  json: net.redborder.cep.sources.parsers.JsonParser
streams:
  streamName:
    source: kafka
    parser: json
    attributes:
      attributeOne: long
      attributeTwo: string
      attributeThree: int
  streamName2:
    source: kafka
    parser: json
    attributes:
      attributeOne: float
      attributeTwo: double
      attributeThree: bool
The following properties are available:
Property name | Description |
---|---|
rest_uri | The URI on which the REST API listens |
state_file | Path to the file used as the state file (more about that below) |
streams | A hash of every stream that can be used as an input, together with its attributes |
The state file is a snapshot of the list of rules that are currently active on the system. Every time you add or remove a rule, the state file is updated. When the system starts, the state file is read and the rules present in it are automatically loaded and started.
To use a stream from a source as an input to your rules, you must first declare it as a stream in the config file. Each stream has three important properties: the source, the parser and the attributes.
The source indicates where the events are received from, for example Kafka, HTTP or TCP connections.
Current sources:
Source | Class |
---|---|
Kafka | net.redborder.cep.sources.kafka.KafkaSource |
You can implement your own source if you wish. To do so, you only have to extend the Source abstract class. It has four methods:
/**
 * Called when the instance is built. Use it to initialize any variables
 * or instances that you need.
 */
public abstract void prepare();

/**
 * Registers the streams that are configured to use this source.
 */
public abstract void addStreams(String... streamName);

/**
 * Starts all the streams configured on this source.
 */
public abstract void start();

/**
 * Called when the CEP is shutting down. Closes the source.
 */
public abstract void shutdown();
You can find an example implementation of the Source class in KafkaSource. You must also declare the source in the sources section of the config file, like this:
sources:
  - name: kafka
    class: net.redborder.cep.sources.kafka.KafkaSource
    properties:
      zk_connect: zknode:2181
- name is the source name.
- class is the Java class that is instantiated when this source is used.
- properties is a hash that is passed to the source and used to configure it.
Sinks represent where the output goes. The sinks available so far are Console, Kafka and Syslog. You can implement your own sink if you wish. To do so, you only have to extend the Sink abstract class.
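As a rough illustration of the four lifecycle methods listed above, here is a minimal sketch of a custom source. The `Source` class below is only a stand-in with the four documented methods; the real abstract class may have a constructor and helpers that are not shown here. `LoggingSource`, its fields and the call order used in `main` are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for the project's Source abstract class, reduced to the four
// documented methods. The real class may declare more than this.
abstract class Source {
    public abstract void prepare();
    public abstract void addStreams(String... streamName);
    public abstract void start();
    public abstract void shutdown();
}

// Hypothetical source that only records which lifecycle calls it received.
class LoggingSource extends Source {
    final List<String> streams = new ArrayList<>();
    final List<String> log = new ArrayList<>();
    private boolean running = false;

    @Override
    public void prepare() {
        // A real source would create its clients or connections here.
        log.add("prepare");
    }

    @Override
    public void addStreams(String... streamName) {
        // Remember every stream that is configured to use this source.
        streams.addAll(Arrays.asList(streamName));
        log.add("addStreams " + String.join(",", streamName));
    }

    @Override
    public void start() {
        // A real source would begin consuming events for each stream.
        running = true;
        for (String stream : streams) {
            log.add("start " + stream);
        }
    }

    @Override
    public void shutdown() {
        // A real source would close its connections here.
        running = false;
        log.add("shutdown");
    }

    boolean isRunning() {
        return running;
    }
}

class SourceDemo {
    public static void main(String[] args) {
        // Assumed call order: prepare, addStreams, start, shutdown.
        LoggingSource source = new LoggingSource();
        source.prepare();
        source.addStreams("clicks", "views");
        source.start();
        source.shutdown();
        source.log.forEach(System.out::println);
    }
}
```

A class like this would then be referenced from the config file through its fully qualified name in the class field of a source entry.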
Available sinks are:
Name | Class |
---|---|
Console | net.redborder.cep.sinks.console.ConsoleSink |
Kafka | net.redborder.cep.sinks.kafka.KafkaSink |
Syslog | net.redborder.cep.sinks.syslog.SyslogSink |
Sinks can have properties as well. So far the only sink that requires a property is the KafkaSink:
Property name | Description |
---|---|
kafka_brokers | Comma-separated list of Kafka broker addresses, each specified as host:port |
The parser tells the system what kind of events are going to arrive from that topic. Currently the only parser implemented is the JSON parser, but more parsers will be implemented in the future, such as an XML parser or a binary parser.
Current parsers:
Parser | Class |
---|---|
Json | net.redborder.cep.sources.parsers.JsonParser |
You can implement your own parser if you wish. To do so, you only have to implement the Parser interface. It has a single method, which receives a string (the raw event from Kafka) and returns a map whose keys are strings and whose values are objects. You must also define a key/value property in the parsers section of the config file:
parsers:
  json: net.redborder.cep.sources.parsers.JsonParser
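To make the parser contract more concrete, the following sketch implements a parser for a made-up `key=value;key=value` event format. The `Parser` interface below is a stand-in: the method name `parse` is an assumption, since the text above only states that the method takes a string and returns a map of strings to objects. `KeyValueParser` and its event format are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for the project's Parser interface. The method name "parse" is
// assumed; only the argument and return types come from the description.
interface Parser {
    Map<String, Object> parse(String event);
}

// Hypothetical parser for events such as "clicks=34;from=mywebsite.com".
class KeyValueParser implements Parser {
    @Override
    public Map<String, Object> parse(String event) {
        Map<String, Object> result = new LinkedHashMap<>();
        if (event == null || event.trim().isEmpty()) {
            return result;
        }
        for (String pair : event.split(";")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip fragments that have no key
            }
            String key = pair.substring(0, eq).trim();
            if (key.isEmpty()) {
                continue;
            }
            result.put(key, toValue(pair.substring(eq + 1).trim()));
        }
        return result;
    }

    // Whole numbers become Long; everything else stays a String.
    private static Object toValue(String raw) {
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            return raw;
        }
    }
}

class ParserDemo {
    public static void main(String[] args) {
        Parser parser = new KeyValueParser();
        System.out.println(parser.parse("clicks=34;from=mywebsite.com"));
    }
}
```

Such a parser would be registered in the parsers section under a name of your choice, with its fully qualified class name as the value, and then referenced by that name from a stream's parser property.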
Siddhi needs to know which attributes are present on each of your streams in order to run your execution plans. The attributes property lets you specify just that: the attributes (or fields) of your events and their types.
The allowed attribute types are the following:
- string
- str
- integer
- int
- long
- float
- double
- bool
- object
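One way to catch typos in an attributes section before starting the CEP is to check each declared type against the list above. The sketch below does this with a plain lookup table. It assumes that `str` is an alias of `string` and `int` an alias of `integer`, and that type names are matched exactly; neither assumption is confirmed by this page. The class `AttributeTypes` and the Java classes chosen for each type are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that maps the documented type names to Java classes.
class AttributeTypes {
    private static final Map<String, Class<?>> TYPES = new HashMap<>();

    static {
        TYPES.put("string", String.class);
        TYPES.put("str", String.class);      // assumed alias of "string"
        TYPES.put("integer", Integer.class);
        TYPES.put("int", Integer.class);     // assumed alias of "integer"
        TYPES.put("long", Long.class);
        TYPES.put("float", Float.class);
        TYPES.put("double", Double.class);
        TYPES.put("bool", Boolean.class);
        TYPES.put("object", Object.class);
    }

    // Returns the Java class for a declared type name, or throws if the
    // name is not one of the allowed attribute types.
    static Class<?> javaTypeFor(String name) {
        Class<?> type = TYPES.get(name);
        if (type == null) {
            throw new IllegalArgumentException("Unknown attribute type: " + name);
        }
        return type;
    }
}

class AttributeTypesDemo {
    public static void main(String[] args) {
        System.out.println(AttributeTypes.javaTypeFor("str").getSimpleName());
        System.out.println(AttributeTypes.javaTypeFor("bool").getSimpleName());
    }
}
```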
As an example, suppose you want to add a Kafka stream called "clicks" as an input, and this topic receives events like this one:
{ "clicks": 34, "from": "mywebsite.com" }
You will need to declare it in the config file as follows:
sources:
  - name: kafka
    class: net.redborder.cep.sources.kafka.KafkaSource
    properties:
      zk_connect: zknode:2181
parsers:
  json: net.redborder.cep.sources.parsers.JsonParser
streams:
  clicks:
    source: kafka
    parser: json
    attributes:
      clicks: integer
      from: string
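The relationship between the declared attributes and an incoming event can be sketched as follows. This is not the project's code: `toRow` is a hypothetical helper that orders the values of an already parsed event by the declared attributes and narrows numbers declared as `integer` to `Integer`. It assumes the parser produced a map like the one described in the parsers section.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

class ClicksEventDemo {
    // Orders the values of a parsed event by the declared attributes and
    // narrows numbers declared as "integer" to Integer.
    static Object[] toRow(Map<String, String> attributes, Map<String, Object> event) {
        Object[] row = new Object[attributes.size()];
        int i = 0;
        for (Map.Entry<String, String> attr : attributes.entrySet()) {
            Object value = event.get(attr.getKey());
            if ("integer".equals(attr.getValue()) && value instanceof Number) {
                value = ((Number) value).intValue();
            }
            row[i++] = value; // stays null when the event lacks the attribute
        }
        return row;
    }

    public static void main(String[] args) {
        // Attributes as declared for the "clicks" stream in the config file.
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("clicks", "integer");
        attributes.put("from", "string");

        // The example event after parsing: { "clicks": 34, "from": "mywebsite.com" }
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("clicks", 34L);
        event.put("from", "mywebsite.com");

        System.out.println(Arrays.toString(toRow(attributes, event)));
        // prints [34, mywebsite.com]
    }
}
```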