This repository has been archived by the owner on Nov 24, 2021. It is now read-only.

Kafka Integration #99

Open
afoerster opened this issue Feb 11, 2019 · 3 comments

afoerster commented Feb 11, 2019

This story includes all work associated with getting Kafka integration working. It can be broken out as needed into smaller pieces.

Add kafka arguments to LogCollectorCliParser


Here is a rough cut:

class LogCollectorCliParser(args: Seq[String]) extends ScallopConf(args) {
  lazy val port = opt[Int]("port", required = false, descr = "Listening port")
  lazy val zkHosts = opt[String]("zk-hosts", required = true, descr = "Zookeeper hosts")
  lazy val topic = opt[String]("topic", required = false, descr = "Kafka Topic")
  lazy val mode = opt[String]("consume-mode", required = false, descr = "'http' or 'kafka'", default = Some("http"))

  verify()
}

Verify a mode is chosen and that it is valid. To keep backward compatibility, default to 'http' when no mode is given.

Verify that 'port' is provided in http mode and that 'topic' is provided in kafka mode.
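The two validation rules above can be sketched as a small, self-contained function; the names here (ModeValidation, Http, Kafka) are illustrative, not the actual project code:

```scala
// Illustrative sketch of the mode/port/topic validation described above.
object ModeValidation {
  sealed trait Mode
  case object Http extends Mode
  case object Kafka extends Mode

  /** Returns the parsed mode, or an error message if validation fails. */
  def validate(mode: Option[String], port: Option[Int], topic: Option[String]): Either[String, Mode] =
    mode.getOrElse("http") match { // default to 'http' for backward compatibility
      case "http"  => if (port.isDefined) Right(Http) else Left("'port' is required in http mode")
      case "kafka" => if (topic.isDefined) Right(Kafka) else Left("'topic' is required in kafka mode")
      case other   => Left(s"invalid consume-mode: '$other'")
    }
}
```

In the real parser this logic would live in Scallop validation hooks rather than a standalone function, but the rules are the same.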

Expose Kafka

Branch on the new 'consume-mode' in LogCollector.scala to start listening to the Kafka topic.
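A minimal sketch of that branch, with stubbed start functions standing in for the real HTTP server and Kafka consumer (all names here are assumptions, not the actual LogCollector.scala API):

```scala
// Illustrative only: startHttp/startKafka are stand-ins for the real startup code.
object LogCollectorDispatch {
  def startHttp(port: Int): String      = s"listening on http port $port"
  def startKafka(topic: String): String = s"consuming kafka topic $topic"

  // Dispatch on the consume-mode chosen by the CLI parser.
  def start(mode: String, port: Int, topic: String): String = mode match {
    case "http"  => startHttp(port)
    case "kafka" => startKafka(topic)
    case other   => throw new IllegalArgumentException(s"unknown consume-mode: $other")
  }
}
```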

Integrate new arguments into control.sh

Add environment variables in control.sh for the new arguments.
Create a script in the bin directory to run the kafka consume mode by calling control.sh. This will make it easy to test changes to the scripts and arguments.
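One way the control.sh wiring could look, as a sketch; the variable names (CONSUME_MODE, KAFKA_TOPIC, LOG_COLLECTOR_ARGS) are assumptions, not the script's actual names:

```shell
# Hypothetical control.sh additions; variable names are illustrative.
CONSUME_MODE="${CONSUME_MODE:-http}"   # default preserves current http behaviour
KAFKA_TOPIC="${KAFKA_TOPIC:-}"         # only needed in kafka mode

# Pass --topic only when a topic is configured.
TOPIC_ARG=""
if [ -n "$KAFKA_TOPIC" ]; then
  TOPIC_ARG="--topic $KAFKA_TOPIC"
fi
LOG_COLLECTOR_ARGS="--consume-mode $CONSUME_MODE $TOPIC_ARG"
```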

Create a test producer

Create a test producer that puts events onto a topic, which the Kafka consumer then reads. The test producer will make it easy to exercise the Kafka consumer outside of unit tests without a full production deployment.
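A rough sketch of such a producer, assuming the kafka-clients library is on the classpath; the topic name, broker address, and payload format are placeholders:

```scala
// Sketch of a throwaway test producer; assumes org.apache.kafka:kafka-clients
// is available. Topic name and broker address are illustrative.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestEventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      (1 to 10).foreach { i =>
        // Send a simple JSON-ish payload for the consumer to read back.
        producer.send(new ProducerRecord[String, String]("pulse-test", s"""{"event": $i}"""))
      }
      producer.flush()
    } finally producer.close()
  }
}
```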

Integrate new arguments into service.sdl

service.sdl is the configuration file for the CSD; see https://github.com/cloudera/cm_ext/wiki/Service-Descriptor-Language-Reference

There should be at least two new parameters, for consume-mode and topic.
Consume mode should default to http.
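A rough sketch of what the new service.sdl parameters might look like, following the Service Descriptor Language reference linked above; the parameter names and labels are illustrative assumptions:

```json
{
  "parameters": [
    {
      "name": "consume_mode",
      "label": "Consume Mode",
      "description": "'http' or 'kafka'",
      "type": "string",
      "default": "http"
    },
    {
      "name": "kafka_topic",
      "label": "Kafka Topic",
      "description": "Topic to consume when consume_mode is 'kafka'",
      "type": "string",
      "default": ""
    }
  ]
}
```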

Deploy the CSD to Valhalla and test

Test all changes with the CSD and new parcel deployed on a test cluster.

I have scripts for this that are not yet committed, hopefully by the time this is reached they will be.

Document changes

Add a page to the docs dir describing usage and limitations, and register it in mkdocs.yml.

afoerster commented

Adding kafka properties:

  • Add a Kafka properties file argument, --kafka-properties
  • The properties argument should be mutually dependent with the kafka consume-mode argument
  • The properties file should be read into a java.util.Properties object (https://docs.oracle.com/javase/7/docs/api/java/util/Properties.html)
  • The Kafka client API may already accept a java.util.Properties directly; in that case you can initialize it straight from the file path. Otherwise, loop through the java.util.Properties entries and add them to the Kafka configuration.
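The loading step described above could look like this minimal sketch (the helper name KafkaPropertiesLoader is illustrative):

```scala
import java.io.FileInputStream
import java.util.Properties

// Reads the file passed via --kafka-properties into a java.util.Properties.
// KafkaProducer/KafkaConsumer constructors accept java.util.Properties,
// so the loaded object can usually be passed straight through.
object KafkaPropertiesLoader {
  def load(path: String): Properties = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    props
  }
}
```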

safwanislam added a commit that referenced this issue May 1, 2019
safwanislam commented

In PulseKafkaConsumer.scala, add proper error handling for when an application isn't supplied; in that case the message should simply be discarded.

afoerster pushed a commit that referenced this issue May 8, 2019

safwanislam commented Aug 8, 2019

Instructions I found from earlier that I wanted to save here:

service.sdl

- name log-collector-kafka (upper)
- argument for kafka
- make sure kafka arguments/port argument have defaults if they are used as env variables
`make` in cloudera-integration/csd
