Cluster Linking mirror topic priorities demo #265
base: master
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# Applying Different Throughput Priorities to Mirror Topics with a Custom Script | ||
|
||
This demo includes a custom Node.js script that uses the Cluster Linking lag REST API to detect bandwidth issues and then pauses mirror topics, also via the REST API, to prioritize traffic from certain topics over others. | ||
|
||
# Video Demo | ||
|
||
Watch a video demo and in-depth explanation here: https://drive.google.com/file/d/1hyy5Wazg8chzRkrfw88NBvpFDuUZ8rjV/view?usp=sharing | ||
|
||
# Setup | ||
|
||
This demo assumes the following are in place: | ||
|
||
## Source Cluster and Destination Cluster | ||
|
||
You're running a source cluster and a destination cluster where the network bandwidth can be limited between the two of them. These clusters can be any Confluent clusters that support Cluster Linking. | ||
|
||
In the video demo, the source cluster was a single broker CP 7.5 cluster running on the demoer's laptop. It was launched using these simple commands: | ||
|
||
* `zookeeper-server-start simple-zookeeper.properties` | ||
* `kafka-server-start simple-server.properties` | ||
|
||
In the video demo, the destination cluster was a Dedicated Confluent Cloud cluster with Internet networking. One can be provisioned at confluent.cloud. | ||
|
||
## Cluster Link | ||
|
||
There should be a cluster link between the two clusters. It can be a regular cluster link, a bidirectional cluster link, or a source-initiated cluster link. It can have a mirror topic prefix--as the one in the demo did--but it doesn't have to. | ||
|
||
In the demo, the cluster link was a source-initiated link. It was created by: | ||
|
||
1. Creating a cluster link called `laptop_outpost_1` (name of your choosing) in the Confluent Cloud Console at confluent.cloud, and selecting "Confluent Platform Cluster" + "source-initiated link" as the source cluster. The command `./kafka-cluster cluster-id --bootstrap-server localhost:9092` was used to get the source cluster's cluster ID. If creating via the CLI--for example, on a Confluent Platform cluster--a sample `dest-link.config` is provided which can be passed in to the `kafka-cluster-links --create` or `confluent kafka link create` command. | ||
|
||
1. Creating the source-side of the link with the command `./kafka-cluster-links --create --link laptop_outpost_1 --bootstrap-server localhost:9092 --config-file source-link.config --cluster-id <dest-cluster-id>` | ||
|
||
## Topics & Mirror Topics | ||
|
||
In the demo, topics were created on the source cluster by hand using this command: `./kafka-topics --create --bootstrap-server localhost:9092 --topic critical1 --partitions 1` (repeat for the other topic names, like `critical2`, `lowX` and `mediumX`). The topics can have multiple partitions if you'd like; a single partition topic was used in the demo since the demo was only running on one broker. | ||
|
||
In the demo, mirror topics were created by hand in the destination cluster. This can be done in the Confluent Cloud Console, or via REST API, Terraform, or CLI. For example, use the command `./kafka-mirrors --create --source-topic <source-topic-name> --mirror-topic <mirror-topic-name> --link laptop_outpost_1 --bootstrap-server <dest-bootstrap-server>` | ||
|
||
The mirror topics' names--including the prefixes, if given--need to be hardcoded into the `apply-priorities.js` script. Alternatively, you can source the topic priority list from a database or GUI. | ||
|
||
# Running the Script | ||
|
||
The `apply-priorities.js` script requires: | ||
|
||
* Node.js installed on the machine it will run on | ||
* Network connectivity from that machine to the destination cluster (connectivity to the source cluster is not required) | ||
* The hardcoded variables in the script--REST endpoint hostname, cluster ID, link name, mirror topic names, and base-64 encoded `API Key:API Secret`--replaced with values you have access to | ||
|
||
There are two ways to run the prioritizing script: | ||
|
||
* `node apply-priorities.js` will do one run of the priorities | ||
* `./run-apply-priorities.sh` will run the script repeatedly, with a 10-second pause between runs, until canceled | ||
Review comment on lines +52 to +53: suggest a rename of the second script that makes it clear it runs continuously
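The README asks for a base-64 encoded `API Key:API Secret` but does not show how to produce one. A minimal sketch in Node.js, assuming placeholder credentials; `encodeBasicAuth` is a hypothetical helper name and is not part of this PR:

```javascript
// Hypothetical helper (not in the PR): builds the value expected by `basicAuth`
// in apply-priorities.js from an API key and secret.
function encodeBasicAuth(apiKey, apiSecret) {
  // HTTP Basic auth uses the base64 encoding of "key:secret"
  return Buffer.from(apiKey + ':' + apiSecret, 'utf8').toString('base64');
}

// Placeholder credentials for illustration only; substitute your own.
console.log(encodeBasicAuth('key', 'secret')); // prints "a2V5OnNlY3JldA=="
```

Reading the key and secret from environment variables, rather than committing the encoded value to the script, would keep credentials out of version control.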
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
/*** Lag Settings ***/ | ||
|
||
// priority list of topic names | ||
const priorities = [ | ||
['outpost.critical1', 'outpost.critical2'], | ||
['outpost.medium1', 'outpost.medium2'], | ||
['outpost.low1', 'outpost.low2'] | ||
]; | ||
|
||
// if this lag is reached, then the throttles will kick in (expressed in terms of # of messages) | ||
const lagThreshold = 10; | ||
|
||
|
||
/*** Cluster Settings for Destination cluster ***/ | ||
const hostname = "REPLACE"; | ||
const clusterId = "REPLACE"; | ||
const basicAuth = "REPLACE"; // base64 encoded "apiKey:apiSecret" | ||
const linkName = "laptop_outpost_1"; | ||
|
||
const http = require("https"); | ||
|
||
|
||
function checkForLag() | ||
{ | ||
// API used: https://docs.confluent.io/cloud/current/api.html#tag/Cluster-Linking-(v3)/operation/listKafkaMirrorTopics | ||
const options = { | ||
"method": "GET", | ||
"hostname": hostname, | ||
"port": 443, | ||
"path": "/kafka/v3/clusters/" + clusterId + "/links/" + linkName + "/mirrors", | ||
"headers": { | ||
"Authorization": "Basic " + basicAuth | ||
} | ||
}; | ||
|
||
const req = http.request(options, function (res) { | ||
const chunks = []; | ||
|
||
res.on("data", function (chunk) { | ||
chunks.push(chunk); | ||
}); | ||
|
||
res.on("end", function () { | ||
const body = Buffer.concat(chunks); | ||
const topicsArray = JSON.parse(body).data; | ||
|
||
// pre-processing the list of mirror topics to get the info that we care about (name, lag, status) | ||
const mirrorTopics = {}; | ||
const mirrorTopicsNamesArray = []; | ||
console.log(topicsArray.length + ' mirror topics found'); | ||
topicsArray.forEach((topic) => { | ||
const name = topic.mirror_topic_name; | ||
const maxLag = topic.mirror_lags.reduce((max, lagObj) => Math.max(max, lagObj.lag), 0); // we only care about the maximum lag among the partitions | ||
mirrorTopics[name] = { | ||
maxLag, | ||
mirrorStatus: topic.mirror_status, | ||
topicData: topic | ||
}; | ||
mirrorTopicsNamesArray.push(name); | ||
}); | ||
|
||
let shouldThrottle = false; | ||
let resumeTopics = []; // these topics are "safe" and we will call the "Resume" command on them to ensure they are not throttled | ||
for (let i = 0; i < priorities.length; i ++) | ||
{ | ||
const priorityTopics = priorities[i]; | ||
console.log('\nPriority Level ' + i +': ' + priorityTopics.join(', ')); | ||
// add this level of priority to the "safe" topics that we will let run | ||
resumeTopics = resumeTopics.concat(priorityTopics); | ||
|
||
priorityTopics.forEach((topicName) => { | ||
if (shouldThrottle) | ||
{ | ||
return; // we're already going to throttle so don't do any more logic | ||
} | ||
|
||
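const topic = mirrorTopics[topicName]; | ||
if (!topic) | ||
{ | ||
console.log('Priority topic not found on this link, skipping: ' + topicName); | ||
return; // avoid a TypeError when a hardcoded name has no matching mirror topic | ||
} | ||
if (topic.maxLag > lagThreshold) | ||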
{ | ||
console.log('Found lagging topic: ' + topicName); | ||
shouldThrottle = true; // lag threshold has been reached; start to throttle lower priorities | ||
} | ||
if (topic.mirrorStatus === 'PAUSED') | ||
{ | ||
console.log('Found paused topic: ' + topicName); | ||
shouldThrottle = true; // unpause the priority levels one at a time, in order of priority | ||
Review comment: the name |
||
} | ||
}); | ||
|
||
if (shouldThrottle) | ||
{ | ||
break; // break the loop at this priority level if we will throttle | ||
} | ||
} | ||
|
||
console.log('\n'); | ||
if (shouldThrottle) | ||
{ | ||
console.log('Important topics are lagging, starting to throttle topics.'); | ||
console.log('These topics have priority and will be allowed to mirror: ' + resumeTopics.join (', ')); | ||
// dispatch "resume" command on this list of topic names to ensure they are not paused | ||
alterMirrorTopics('resume', resumeTopics); | ||
|
||
// find all mirror topics that don't have priority | ||
const topicsToPause = mirrorTopicsNamesArray.map((topicName) => { | ||
if (resumeTopics.indexOf(topicName) >= 0) | ||
{ | ||
return null; | ||
} | ||
return topicName; | ||
}).filter((e) => e !== null); | ||
console.log('\nThese mirror topics are low priority and will be paused: ' + topicsToPause.join(', ')); | ||
alterMirrorTopics('pause', topicsToPause); | ||
} | ||
else | ||
{ | ||
console.log('No critical lag, will resume all topics'); | ||
// un-pause (resume) all topics to make sure nothing is throttled | ||
|
||
// dispatch "resume" command on all mirror topics on this link | ||
alterMirrorTopics('resume', mirrorTopicsNamesArray); | ||
} | ||
Review comment on lines +115 to +122: Would this ever not be a no-op? Could it be deleted? (since we fall into the block above if a topic is paused). If it's a no-op, maybe leave the block but change the console log to |
||
|
||
console.log('\n\n'); | ||
}); | ||
Review comment on lines +47 to +125: Looks like these should all be tabbed. Suggest using a JS editor to auto-indent |
||
}); | ||
|
||
req.end(); | ||
|
||
} | ||
|
||
|
||
function alterMirrorTopics(pauseOrResume, mirror_topic_names) | ||
{ | ||
const options = { | ||
"method": "POST", | ||
"hostname": hostname, | ||
"port": 443, | ||
"path": "/kafka/v3/clusters/" + clusterId + "/links/" + linkName + "/mirrors:" + pauseOrResume, | ||
"headers": { | ||
"Authorization": "Basic " + basicAuth, | ||
"Content-Type": "application/json" // the request body below is JSON | ||
} | ||
}; | ||
|
||
const req = http.request(options, function (res) { | ||
const chunks = []; | ||
|
||
res.on("data", function (chunk) { | ||
chunks.push(chunk); | ||
}); | ||
|
||
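res.on("end", function () { | ||
const body = Buffer.concat(chunks); | ||
// surface failed requests instead of silently ignoring them | ||
if (res.statusCode < 200 || res.statusCode >= 300) | ||
{ | ||
console.error('Failed to ' + pauseOrResume + ' mirror topics (HTTP ' + res.statusCode + '): ' + body.toString()); | ||
} | ||
}); | ||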
}); | ||
|
||
req.write(JSON.stringify({ | ||
mirror_topic_names | ||
})); | ||
|
||
req.end(); | ||
|
||
} | ||
|
||
|
||
checkForLag(); |
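The decision logic inside `checkForLag()` is interleaved with the HTTP handling, which makes it hard to test without a live cluster. As a rough sketch only, the same walk over priority levels could be isolated as a pure function. `planThrottle` and the sample data are hypothetical and not part of this PR; skipping names that are missing from the link is an assumption.

```javascript
// Hypothetical refactoring sketch (not in the PR): the throttle decision from
// checkForLag(), isolated as a pure function so it can be tested offline.
function planThrottle(priorities, mirrorTopics, lagThreshold) {
  let shouldThrottle = false;
  let resumeTopics = [];

  for (const level of priorities) {
    // every level visited so far is allowed to keep mirroring
    resumeTopics = resumeTopics.concat(level);

    // a level triggers throttling if any of its topics is lagging or paused
    shouldThrottle = level.some((name) => {
      const topic = mirrorTopics[name];
      if (!topic) return false; // assumption: ignore names that are not on the link
      return topic.maxLag > lagThreshold || topic.mirrorStatus === 'PAUSED';
    });
    if (shouldThrottle) break;
  }

  const allNames = Object.keys(mirrorTopics);
  if (!shouldThrottle) {
    // nothing is lagging or paused: resume everything, pause nothing
    return { shouldThrottle, resumeTopics: allNames, pauseTopics: [] };
  }
  // pause every mirror topic below the level that triggered throttling
  const pauseTopics = allNames.filter((name) => !resumeTopics.includes(name));
  return { shouldThrottle, resumeTopics, pauseTopics };
}

// Illustrative data only: a medium-priority topic is lagging.
const samplePriorities = [['outpost.critical1'], ['outpost.medium1'], ['outpost.low1']];
const sampleMirrors = {
  'outpost.critical1': { maxLag: 0, mirrorStatus: 'ACTIVE' },
  'outpost.medium1': { maxLag: 50, mirrorStatus: 'ACTIVE' },
  'outpost.low1': { maxLag: 3, mirrorStatus: 'ACTIVE' },
};
console.log(planThrottle(samplePriorities, sampleMirrors, 10));
```

With this split, the HTTP callbacks would only need to build the `mirrorTopics` map and pass the resulting lists to `alterMirrorTopics`.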
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
link.mode=DESTINATION | ||
connection.mode=INBOUND | ||
link.prefix=outpost. | ||
# link.prefix is optional |
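With a prefix like the one configured above, a source topic named `critical1` is mirrored as `outpost.critical1`, and that prefixed name is what the `priorities` list in `apply-priorities.js` has to contain. A small sketch of deriving the list from source topic names instead of typing the prefix by hand; `withLinkPrefix` is a hypothetical helper and is not part of this PR:

```javascript
// Hypothetical helper (not in the PR): derives mirror topic names from source
// topic names, assuming the link applies a fixed prefix such as "outpost.".
function withLinkPrefix(prefix, sourcePriorities) {
  return sourcePriorities.map((level) => level.map((topic) => prefix + topic));
}

// Source topic names from the demo, grouped by priority level.
const sourcePriorities = [
  ['critical1', 'critical2'],
  ['medium1', 'medium2'],
  ['low1', 'low2'],
];
const prefixedPriorities = withLinkPrefix('outpost.', sourcePriorities);
console.log(prefixedPriorities[0]); // [ 'outpost.critical1', 'outpost.critical2' ]
```

If the link has no prefix, passing an empty string leaves the names unchanged.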
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
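#!/usr/bin/env bash | ||
|
||
# Re-run the prioritization every 10 seconds until canceled; a loop avoids the unbounded recursion of a self-calling function. | ||
while true; do | ||
  node apply-priorities.js | ||
  sleep 10 | ||
done |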
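The wrapper script above re-runs the Node script every 10 seconds. One alternative, sketched here under assumptions, is to do the repetition inside Node so that a single process runs until canceled. `runRepeatedly` is a hypothetical name, and the sketch assumes the task returns a promise; `checkForLag()` as written in this PR does not, so it would need to be adapted first.

```javascript
// Hypothetical in-process scheduler (not in the PR); `task` stands in for a
// promise-returning version of checkForLag().
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function runRepeatedly(task, intervalMs, maxRuns = Infinity) {
  let runs = 0;
  while (runs < maxRuns) {
    try {
      await task(); // wait for one pass to finish before scheduling the next
    } catch (err) {
      // keep looping so a single failed pass does not stop the throttling
      console.error('Run failed, will retry after the interval:', err);
    }
    runs += 1;
    if (runs < maxRuns) await sleep(intervalMs);
  }
  return runs;
}

// Example (commented out so this sketch does not loop forever when loaded):
// runRepeatedly(checkForLagAsync, 10000);
```

Awaiting each pass before sleeping means two passes never overlap, even if the REST API is slow to respond.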
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
listeners=PLAINTEXT://:9092 | ||
|
||
log.dirs=/tmp/kafka-logs-1 | ||
|
||
zookeeper.connect=localhost:2181 | ||
|
||
offsets.topic.replication.factor=1 | ||
|
||
confluent.license.topic.replication.factor=1 | ||
|
||
confluent.reporters.telemetry.auto.enable=false | ||
|
||
confluent.cluster.link.enable=true | ||
|
||
password.encoder.secret=encoder-secret |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
dataDir=/tmp/zookeeper-clusterlinking | ||
|
||
clientPort=2181 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
link.mode=SOURCE | ||
connection.mode=OUTBOUND | ||
bootstrap.servers=<DEST_BOOTSTRAP> | ||
|
||
security.protocol=SASL_SSL | ||
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<DEST_APIKEY>' password='<DEST_APIPASSWORD>'; | ||
sasl.mechanism=PLAIN | ||
|
||
# if the source cluster has security enabled, put its authentication details here, prefixed by "local.". SCRAM-SHA-512 is shown as an example | ||
# local.sasl.mechanism=SCRAM-SHA-512 | ||
# local.security.protocol=SASL_PLAINTEXT | ||
# local.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="app" password="password"; |
Review comment: suggest something more descriptive: will run until canceled, periodically checking if any mirroring should be throttled