Skip to content

Commit

Permalink
Add setToken API for OAuthBearer authentication flow (#1075)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewstanovsky authored Apr 18, 2024
1 parent a0648d3 commit f594d11
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 93 deletions.
80 changes: 80 additions & 0 deletions examples/oauthbearer-default-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
Producer, Consumer and HighLevelProducer:
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

var Kafka = require('../');

var token = "your_token";

var producer = new Kafka.Producer({
//'debug' : 'all',
'metadata.broker.list': 'localhost:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
}).setOauthBearerToken(token);

//start the producer
producer.connect();

//refresh the token
producer.setOauthBearerToken(token);
```

AdminClient:
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var Kafka = require('../');

var token = "your_token";

var admin = Kafka.AdminClient.create({
'metadata.broker.list': 'localhost:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
}, token);

//refresh the token
admin.refreshOauthBearerToken(token);
```

ConsumerStream:
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var Kafka = require('../');

var token = "your_token";

var stream = Kafka.KafkaConsumer.createReadStream({
'metadata.broker.list': 'localhost:9093',
'group.id': 'myGroup',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER'
}, {}, {
topics: 'test1',
initOauthBearerToken: token,
});

//refresh the token
stream.refreshOauthBearerToken(token.token);
```
8 changes: 7 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export interface ReadStreamOptions extends ReadableOptions {
autoClose?: boolean;
streamAsBatch?: boolean;
connectOptions?: any;
initOauthBearerToken?: string;
}

export interface WriteStreamOptions extends WritableOptions {
Expand All @@ -137,6 +138,7 @@ export interface ProducerStream extends Writable {
export interface ConsumerStream extends Readable {
consumer: KafkaConsumer;
connect(options: ConsumerGlobalConfig): void;
refreshOauthBearerToken(tokenStr: string): void;
close(cb?: () => void): void;
}

Expand Down Expand Up @@ -180,6 +182,8 @@ export abstract class Client<Events extends string> extends EventEmitter {

connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;

setOauthBearerToken(tokenStr: string): this;

getClient(): any;

connectedTime(): number;
Expand Down Expand Up @@ -330,6 +334,8 @@ export interface NewTopic {
}

export interface IAdminClient {
refreshOauthBearerToken(tokenStr: string): void;

createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

Expand All @@ -343,5 +349,5 @@ export interface IAdminClient {
}

export abstract class AdminClient {
static create(conf: GlobalConfig): IAdminClient;
static create(conf: GlobalConfig, initOauthBearerToken?: string): IAdminClient;
}
22 changes: 21 additions & 1 deletion lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ var shallowCopy = require('./util').shallowCopy;
* active handle with the brokers.
*
*/
function createAdminClient(conf) {
function createAdminClient(conf, initOauthBearerToken) {
var client = new AdminClient(conf);

if (initOauthBearerToken) {
client.refreshOauthBearerToken(initOauthBearerToken);
}

// Wrap the error so we throw if it failed with some context
LibrdKafkaError.wrap(client.connect(), true);

Expand Down Expand Up @@ -105,6 +109,22 @@ AdminClient.prototype.disconnect = function() {
this._isConnected = false;
};

/**
* Refresh OAuthBearer token, initially provided in factory method.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
*/
AdminClient.prototype.refreshOauthBearerToken = function (tokenStr) {
if (!tokenStr || typeof tokenStr !== 'string') {
throw new Error("OAuthBearer token is undefined/empty or not a string");
}

this._client.setToken(tokenStr);
};

/**
* Create a topic with a given config.
*
Expand Down
19 changes: 19 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,25 @@ Client.prototype.connect = function(metadataOptions, cb) {

};

/**
* Set initial token before any connection is established for oauthbearer authentication flow.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
* Call this method again to refresh the token.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
* @return {Client} - Returns itself.
*/
Client.prototype.setOauthBearerToken = function (tokenStr) {
if (!tokenStr || typeof tokenStr !== 'string') {
throw new Error("OAuthBearer token is undefined/empty or not a string");
}

this._client.setToken(tokenStr);
return this;
};

/**
* Get the native Kafka client.
*
Expand Down
16 changes: 16 additions & 0 deletions lib/kafka-consumer-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ function KafkaConsumerStream(consumer, options) {
self.push(null);
});

if (options.initOauthBearerToken) {
this.consumer.setOauthBearerToken(options.initOauthBearerToken);
}

// Call connect. Handles potentially being connected already
this.connect(this.connectOptions);

Expand All @@ -123,6 +127,18 @@ function KafkaConsumerStream(consumer, options) {

}

/**
* Refresh OAuthBearer token, initially provided in factory method.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
*/
KafkaConsumerStream.prototype.refreshOauthBearerToken = function (tokenStr) {
this.consumer.setOauthBearerToken(tokenStr);
};

/**
* Internal stream read method. This method reads message objects.
* @param {number} size - This parameter is ignored for our cases.
Expand Down
Loading

0 comments on commit f594d11

Please sign in to comment.