Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support group instance id #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions examples/consumer-v1.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
const { Kafka, logLevel } = require('../index')
const PrettyConsoleLogger = require('./prettyConsoleLogger')

// Example consumer demonstrating static group membership via `groupInstanceId`.
// Usage: node consumer-v1.js <groupInstanceId>
const kafka = new Kafka({
  logLevel: logLevel.INFO,
  logCreator: PrettyConsoleLogger,
  brokers: ['localhost:29092'],
  clientId: 'example-consumer',
})

// The group instance id comes from the first CLI argument; it identifies this
// consumer as the same static group member across restarts.
const [groupInstanceId] = process.argv.slice(2)

const topic = 'topic-test'
const consumer = kafka.consumer({ groupId: 'test-group', groupInstanceId })

let msgNumber = 0
const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      msgNumber++
      kafka.logger().info('Message processed', {
        topic,
        partition,
        offset: message.offset,
        timestamp: message.timestamp,
        // Header values are Buffers; render each one as a string for logging.
        headers: Object.keys(message.headers).reduce(
          (headers, key) => ({
            ...headers,
            [key]: message.headers[key].toString(),
          }),
          {}
        ),
        // Key and value can be null (e.g. tombstone records) — guard before
        // decoding so a single such message does not crash the handler.
        key: message.key == null ? null : message.key.toString(),
        value: message.value == null ? null : message.value.toString(),
        msgNumber,
      })
    },
  })
}

run().catch(e => kafka.logger().error(`[example/consumer] ${e.message}`, { stack: e.stack }))

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

// forEach (not map) — these loops are purely for side effects.
// Disconnect with `leaveGroup: false` so the static member slot is retained
// and no rebalance is triggered while this process restarts.
errorTypes.forEach(type => {
  process.on(type, async e => {
    try {
      kafka.logger().info(`process.on ${type}`)
      kafka.logger().error(e.message, { stack: e.stack })
      await consumer.disconnect({ leaveGroup: false })
      process.exit(0)
    } catch (_) {
      process.exit(1)
    }
  })
})

signalTraps.forEach(type => {
  process.once(type, async () => {
    console.log('')
    kafka.logger().info('[example/consumer] disconnecting')
    await consumer.disconnect({ leaveGroup: false })
    process.exit(0)
  })
})
79 changes: 79 additions & 0 deletions examples/producer-v1.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
const { Kafka, CompressionTypes, logLevel } = require('../index')
const PrettyConsoleLogger = require('./prettyConsoleLogger')

// Example producer that sends a small batch of messages on a fixed interval.
const kafka = new Kafka({
  logLevel: logLevel.INFO,
  logCreator: PrettyConsoleLogger,
  brokers: ['localhost:29092'],
  clientId: 'example-producer',
})

const topic = 'topic-test'
const producer = kafka.producer()

// Random integer in [0, 1000], used to vary keys/values between sends.
const getRandomNumber = () => Math.round(Math.random() * 1000)
const createMessage = num => ({
  key: `key-${num}`,
  value: `value-${num}-${new Date().toISOString()}`,
  headers: {
    'correlation-id': `${num}-${Date.now()}`,
  },
})

let msgNumber = 0
let requestNumber = 0
// Sends one batch of messages. Errors are logged rather than rethrown so the
// interval keeps running after a transient failure.
const sendMessage = () => {
  const messages = Array(1)
    .fill()
    .map(_ => createMessage(getRandomNumber()))

  const requestId = requestNumber++
  msgNumber += messages.length
  kafka.logger().info(`Sending ${messages.length} messages #${requestId}...`)
  return producer
    .send({
      topic,
      compression: CompressionTypes.GZIP,
      messages,
    })
    .then(response => {
      kafka.logger().info(`Messages sent #${requestId}`, {
        response,
        msgNumber,
      })
    })
    .catch(e => kafka.logger().error(`[example/producer] ${e.message}`, { stack: e.stack }))
}

let intervalId
const run = async () => {
  await producer.connect()
  intervalId = setInterval(sendMessage, 7000)
}

run().catch(e => kafka.logger().error(`[example/producer] ${e.message}`, { stack: e.stack }))

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

// forEach (not map) — these loops are purely for side effects.
errorTypes.forEach(type => {
  process.on(type, async e => {
    try {
      kafka.logger().info(`process.on ${type}`)
      kafka.logger().error(e.message, { stack: e.stack })
      await producer.disconnect()
      process.exit(0)
    } catch (_) {
      process.exit(1)
    }
  })
})

signalTraps.forEach(type => {
  process.once(type, async () => {
    console.log('')
    kafka.logger().info('[example/producer] disconnecting')
    clearInterval(intervalId)
    await producer.disconnect()
    // Exit explicitly after cleanup, matching the consumer example and the
    // error-type handlers above.
    process.exit(0)
  })
})
30 changes: 24 additions & 6 deletions src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,11 @@ module.exports = class Broker {
 * @param {string} request.memberId The member id assigned by the group coordinator
 * @param {string} [request.groupInstanceId] The group instance id for static membership
* @returns {Promise}
*/
async heartbeat({ groupId, groupGenerationId, memberId }) {
async heartbeat({ groupId, groupGenerationId, memberId, groupInstanceId }) {
const heartbeat = this.lookupRequest(apiKeys.Heartbeat, requests.Heartbeat)
return await this[PRIVATE.SEND_REQUEST](heartbeat({ groupId, groupGenerationId, memberId }))
return await this[PRIVATE.SEND_REQUEST](
heartbeat({ groupId, groupGenerationId, memberId, groupInstanceId })
)
}

/**
Expand All @@ -365,6 +367,7 @@ module.exports = class Broker {
* @param {number} request.rebalanceTimeout The maximum time that the coordinator will wait for each member
* to rejoin when rebalancing the group
* @param {string} [request.memberId=""] The assigned consumer id or an empty string for a new consumer
 * @param {string} [request.groupInstanceId=""] The assigned group instance id or an empty string for a new consumer
* @param {string} [request.protocolType="consumer"] Unique name for class of protocols implemented by group
* @param {Array} request.groupProtocols List of protocols that the member supports (assignment strategy)
* [{ name: 'AssignerName', metadata: '{"version": 1, "topics": []}' }]
Expand All @@ -375,6 +378,7 @@ module.exports = class Broker {
sessionTimeout,
rebalanceTimeout,
memberId = '',
groupInstanceId,
protocolType = 'consumer',
groupProtocols,
}) {
Expand All @@ -386,6 +390,7 @@ module.exports = class Broker {
sessionTimeout,
rebalanceTimeout,
memberId: assignedMemberId,
groupInstanceId,
protocolType,
groupProtocols,
})
Expand All @@ -407,11 +412,13 @@ module.exports = class Broker {
* @param {object} request
* @param {string} request.groupId
* @param {string} request.memberId
 * @param {string} [request.groupInstanceId] The group instance id for static membership
* @returns {Promise}
*/
async leaveGroup({ groupId, memberId }) {
async leaveGroup({ groupId, memberId, groupInstanceId }) {
const leaveGroup = this.lookupRequest(apiKeys.LeaveGroup, requests.LeaveGroup)
return await this[PRIVATE.SEND_REQUEST](leaveGroup({ groupId, memberId }))
this.logger.debug(`Leave group`, { groupId, memberId, groupInstanceId })
return await this[PRIVATE.SEND_REQUEST](leaveGroup({ groupId, memberId, groupInstanceId }))
}

/**
Expand All @@ -420,16 +427,18 @@ module.exports = class Broker {
* @param {string} request.groupId
* @param {number} request.generationId
* @param {string} request.memberId
 * @param {string} [request.groupInstanceId] The group instance id for static membership
* @param {object} request.groupAssignment
* @returns {Promise}
*/
async syncGroup({ groupId, generationId, memberId, groupAssignment }) {
async syncGroup({ groupId, generationId, memberId, groupInstanceId, groupAssignment }) {
const syncGroup = this.lookupRequest(apiKeys.SyncGroup, requests.SyncGroup)
return await this[PRIVATE.SEND_REQUEST](
syncGroup({
groupId,
generationId,
memberId,
groupInstanceId,
groupAssignment,
})
)
Expand Down Expand Up @@ -476,6 +485,7 @@ module.exports = class Broker {
* @param {string} request.groupId
* @param {number} request.groupGenerationId
* @param {string} request.memberId
 * @param {string} [request.groupInstanceId] The group instance id for static membership
* @param {number} [request.retentionTime=-1] -1 signals to the broker that its default configuration
* should be used.
* @param {object} request.topics Topics to commit offsets, e.g:
Expand All @@ -489,13 +499,21 @@ module.exports = class Broker {
* ]
* @returns {Promise}
*/
async offsetCommit({ groupId, groupGenerationId, memberId, retentionTime, topics }) {
async offsetCommit({
groupId,
groupGenerationId,
memberId,
retentionTime,
groupInstanceId,
topics,
}) {
const offsetCommit = this.lookupRequest(apiKeys.OffsetCommit, requests.OffsetCommit)
return await this[PRIVATE.SEND_REQUEST](
offsetCommit({
groupId,
groupGenerationId,
memberId,
groupInstanceId,
retentionTime,
topics,
})
Expand Down
19 changes: 14 additions & 5 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const Batch = require('./batch')
const SeekOffsets = require('./seekOffsets')
const SubscriptionState = require('./subscriptionState')
const {
events: { GROUP_JOIN, HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS },
events: { GROUP_JOIN, GROUP_LEAVE, HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS },
} = require('./instrumentationEvents')
const { MemberAssignment } = require('./assignerProtocol')
const {
Expand Down Expand Up @@ -66,6 +66,7 @@ module.exports = class ConsumerGroup {
cluster,
groupId,
topics,
groupInstanceId,
topicConfigurations,
logger,
instrumentationEmitter,
Expand All @@ -86,6 +87,7 @@ module.exports = class ConsumerGroup {
/** @type {import("../../types").Cluster} */
this.cluster = cluster
this.groupId = groupId
this.groupInstanceId = groupInstanceId
this.topics = topics
this.topicsSubscribed = topics
this.topicConfigurations = topicConfigurations
Expand Down Expand Up @@ -130,13 +132,14 @@ module.exports = class ConsumerGroup {
this.lastRequest = Date.now()

this[PRIVATE.SHARED_HEARTBEAT] = sharedPromiseTo(async ({ interval }) => {
const { groupId, generationId, memberId } = this
const { groupId, generationId, memberId, groupInstanceId } = this
const now = Date.now()

if (memberId && now >= this.lastRequest + interval) {
const payload = {
groupId,
memberId,
groupInstanceId,
groupGenerationId: generationId,
}

Expand All @@ -162,7 +165,7 @@ module.exports = class ConsumerGroup {
}

async [PRIVATE.JOIN]() {
const { groupId, sessionTimeout, rebalanceTimeout } = this
const { groupId, sessionTimeout, rebalanceTimeout, groupInstanceId } = this

this.coordinator = await this.cluster.findGroupCoordinator({ groupId })

Expand All @@ -171,6 +174,7 @@ module.exports = class ConsumerGroup {
sessionTimeout,
rebalanceTimeout,
memberId: this.memberId || '',
groupInstanceId,
groupProtocols: this.assigners.map(assigner =>
assigner.protocol({
topics: this.topicsSubscribed,
Expand All @@ -186,9 +190,10 @@ module.exports = class ConsumerGroup {
}

async leave() {
const { groupId, memberId } = this
const { groupId, memberId, groupInstanceId } = this
if (memberId) {
await this.coordinator.leaveGroup({ groupId, memberId })
await this.coordinator.leaveGroup({ groupId, memberId, groupInstanceId })
this.instrumentationEmitter.emit(GROUP_LEAVE, { groupId, memberId, groupInstanceId })
this.memberId = null
}
}
Expand All @@ -199,6 +204,7 @@ module.exports = class ConsumerGroup {
groupId,
generationId,
memberId,
groupInstanceId,
members,
groupProtocol,
topics,
Expand Down Expand Up @@ -234,6 +240,7 @@ module.exports = class ConsumerGroup {
groupId,
generationId,
memberId,
groupInstanceId,
groupAssignment: assignment,
})

Expand Down Expand Up @@ -325,6 +332,7 @@ module.exports = class ConsumerGroup {
groupId,
generationId,
memberId,
groupInstanceId,
})
}

Expand All @@ -344,6 +352,7 @@ module.exports = class ConsumerGroup {
groupId: this.groupId,
memberId: this.memberId,
leaderId: this.leaderId,
groupInstanceId: this.groupInstanceId,
isLeader: this.isLeader(),
memberAssignment,
groupProtocol: this.groupProtocol,
Expand Down
Loading