KafkaJS package (#638)
Kafka optional package based on kafkajs.

This is much like the built-in Kafka support, but also has a produce step.

---------

Signed-off-by: chuck-dbos <134347445+chuck-dbos@users.noreply.github.com>
Co-authored-by: Qian Li <qian.li@dbos.dev>
chuck-dbos and qianl15 authored Oct 9, 2024
1 parent 5987ed0 commit e05e906
Showing 17 changed files with 8,551 additions and 2,365 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish_npm.yml
@@ -23,6 +23,7 @@ jobs:
   packages/dbos-compiler,
   packages/dbos-openapi,
   packages/dbos-sqs,
+  packages/dbos-kafkajs,
   ]
steps:
- name: Checkout
1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -72,5 +72,6 @@ jobs:
   PGPASSWORD: dbos
   DB_PASSWORD: dbos
   DB_USER: postgres
+  KAFKA_BROKER: localhost:9092
   NPM_AUTH_TOKEN: ${{secrets.GITHUB_TOKEN}}
   SILENCE_LOGS: true
3,358 changes: 1,001 additions & 2,357 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/communicator-email-ses/README.md
@@ -1,4 +1,4 @@
-# DBOS AWS Simple Email Service (SES) Communicator
+# DBOS AWS Simple Email Service (SES) Library

This is a [DBOS](https://docs.dbos.dev/) [communicator](https://docs.dbos.dev/tutorials/communicator-tutorial) for sending email using the [Amazon Web Services Simple Email Service](https://aws.amazon.com/ses/).

2 changes: 1 addition & 1 deletion packages/communicator-email-ses/package.json
@@ -1,7 +1,7 @@
 {
   "name": "@dbos-inc/communicator-email-ses",
   "version": "0.0.0-placeholder",
-  "description": "Communicator library - email in AWS with SES",
+  "description": "DBOS library - email in AWS with SES",
   "license": "MIT",
   "repository": {
     "type": "git",
2 changes: 1 addition & 1 deletion packages/communicator-random/README.md
@@ -1,4 +1,4 @@
-# DBOS Random Communicator
+# DBOS Random Step

This is a [DBOS](https://docs.dbos.dev/) [step](https://docs.dbos.dev/tutorials/communicator-tutorial) for generating random numbers.

2 changes: 1 addition & 1 deletion packages/communicator-random/package.json
@@ -1,7 +1,7 @@
 {
   "name": "@dbos-inc/communicator-random",
   "version": "0.0.0-placeholder",
-  "description": "Communicator library",
+  "description": "DBOS library for wrapping random number generation in steps",
   "license": "MIT",
   "repository": {
     "type": "git",
62 changes: 62 additions & 0 deletions packages/dbos-kafkajs/README.md
@@ -0,0 +1,62 @@
# DBOS Kafka Library (KafkaJS Version)

Publish/subscribe message queues are a common building block for distributed systems. Message queues allow processing to occur at a different place or time, potentially across multiple client programming environments. Due to its performance, flexibility, and simple, scalable design, [Kafka](https://www.confluent.io/cloud-kafka) is a popular choice for publish/subscribe.

This package includes a [DBOS](https://docs.dbos.dev/) [step](https://docs.dbos.dev/tutorials/communicator-tutorial) for sending Kafka messages, as well as an event receiver that processes incoming messages exactly once, deduplicating by topic, partition, and offset.

This package is based on [KafkaJS](https://kafka.js.org/). We are working on support for other Kafka client libraries; please reach out to [us](https://www.dbos.dev/) if you are interested in a different one.

## Configuring a DBOS Application with Kafka
Ensure that the DBOS Kafka package is installed into the application:
```
npm install --save @dbos-inc/dbos-kafkajs
```

## Sending Messages

### Imports
First, ensure that the package classes are imported:
```typescript
import {
  KafkaConfig,
  logLevel,
  KafkaProduceStep,
  Partitioners,
} from "@dbos-inc/dbos-kafkajs";
import { configureInstance } from "@dbos-inc/dbos-sdk"; // used below to create the configured step instance
```

### Selecting a Configuration
`KafkaProduceStep` is a configured class, meaning that its configuration (or a config file key name) must be provided when a class instance is created, for example:
```typescript
const kafkaConfig: KafkaConfig = {
  clientId: 'dbos-kafka-test',
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
  requestTimeout: 100, // FOR TESTING
  retry: { // FOR TESTING
    retries: 5
  },
  logLevel: logLevel.NOTHING, // FOR TESTING
};

const defTopic = 'dbos-test-topic'; // example topic to produce to
const kafkaCfg = configureInstance(KafkaProduceStep, 'defKafka', kafkaConfig, defTopic, {
  createPartitioner: Partitioners.DefaultPartitioner
});
```
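Because `KafkaProduceStep` is a `ConfiguredInstance`, its `initialize()` method connects the underlying KafkaJS producer once at startup, and that connection is reused by every workflow that invokes the instance. The instance name (`'defKafka'` above) identifies this configuration.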

### Sending
Within a [DBOS Transact Workflow](https://docs.dbos.dev/tutorials/workflow-tutorial), invoke the `KafkaProduceStep` function from the workflow context:
```typescript
const sendRes = await wfCtx.invoke(kafkaCfg).sendMessage({value: ourMessage});
```
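A batch can be produced in a single step with the companion `sendMessages` function, which accepts an array of messages for the configured topic.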

## Receiving Messages
A tutorial for receiving and processing Kafka messages can be found [here](https://docs.dbos.dev/tutorials/kafka-integration).
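While the tutorial covers the details, a minimal consumer sketch using the `@Kafka` and `@KafkaConsume` decorators defined in this package may be helpful (the class, method, and topic names are illustrative):
```typescript
import { Workflow, WorkflowContext } from "@dbos-inc/dbos-sdk";
import { Kafka, KafkaConsume, KafkaConfig, KafkaMessage } from "@dbos-inc/dbos-kafkajs";

const kafkaConfig: KafkaConfig = {
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
};

@Kafka(kafkaConfig)
class KafkaListener {
  // A consumer method must be a workflow or transaction taking exactly these arguments.
  // Its workflow ID is derived from the topic, partition, and offset, so each message
  // is processed exactly once even if Kafka redelivers it.
  @KafkaConsume('dbos-test-topic')
  @Workflow()
  static async processMessage(ctx: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctx.logger.info(`Received '${message.value?.toString()}' from ${topic}:${partition}`);
  }
}
```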

## Simple Testing
The `kafkajs.test.ts` file included in the source repository demonstrates sending and processing Kafka messages. Before running, set the following environment variable:
- `KAFKA_BROKER`: Broker URL
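
As a rough sketch of what such a test can look like (assuming the `createTestingRuntime`/`TestingRuntime` testing API from `@dbos-inc/dbos-sdk`; the instance and topic names are illustrative, and the config file path refers to the `kafkajs-test-dbos-config.yaml` included in this package):
```typescript
import { createTestingRuntime, TestingRuntime, configureInstance } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, KafkaProduceStep, Partitioners } from "@dbos-inc/dbos-kafkajs";

const kafkaConfig: KafkaConfig = {
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
};

// Illustrative: a producer step instance configured against the test broker.
const kafkaCfg = configureInstance(KafkaProduceStep, 'testKafka', kafkaConfig, 'dbos-test-topic', {
  createPartitioner: Partitioners.DefaultPartitioner,
});

describe('kafka-produce-step', () => {
  let testRuntime: TestingRuntime;

  beforeAll(async () => {
    // Point the runtime at this package's test config file.
    testRuntime = await createTestingRuntime(undefined, 'kafkajs-test-dbos-config.yaml');
  });

  afterAll(async () => {
    await testRuntime.destroy();
  });

  test('sends a message', async () => {
    const res = await testRuntime.invoke(kafkaCfg).sendMessage({ value: 'test message' });
    expect(res).toBeDefined();
  });
});
```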

## Next Steps
- For a detailed DBOS Transact tutorial, check out our [programming quickstart](https://docs.dbos.dev/getting-started/quickstart-programming).
- To learn how to deploy your application to DBOS Cloud, visit our [cloud quickstart](https://docs.dbos.dev/getting-started/quickstart-cloud/).
- To learn more about DBOS, take a look at [our documentation](https://docs.dbos.dev/) or our [source code](https://github.com/dbos-inc/dbos-transact).
234 changes: 234 additions & 0 deletions packages/dbos-kafkajs/index.ts
@@ -0,0 +1,234 @@
import {
  Kafka as KafkaJS,
  Consumer,
  ConsumerConfig,
  Producer,
  ProducerConfig,
  KafkaConfig,
  KafkaMessage,
  Message,
  KafkaJSProtocolError,
  logLevel,
  Partitioners,
} from "kafkajs";
import { Step, StepContext, ConfiguredInstance, DBOSContext, DBOSEventReceiver, InitContext } from "@dbos-inc/dbos-sdk";
import { associateClassWithEventReceiver, associateMethodWithEventReceiver } from "@dbos-inc/dbos-sdk";
import { TransactionFunction } from "@dbos-inc/dbos-sdk";
import { WorkflowFunction } from "@dbos-inc/dbos-sdk";
import { Error as DBOSError } from "@dbos-inc/dbos-sdk";
import { DBOSExecutorContext } from "@dbos-inc/dbos-sdk";

export {
  KafkaConfig,
  ConsumerConfig,
  ProducerConfig,
  KafkaMessage,
  KafkaJSProtocolError as KafkaError,
  logLevel,
  Partitioners,
};

type KafkaArgs = [string, number, KafkaMessage];

const sleepms = (ms: number) => new Promise((r) => setTimeout(r, ms));

////////////////////////
/* Kafka Management   */
////////////////////////

export class DBOSKafka implements DBOSEventReceiver {
  readonly consumers: Consumer[] = [];
  kafka?: KafkaJS = undefined;

  executor?: DBOSExecutorContext = undefined;

  constructor() { }

  async initialize(dbosExecI: DBOSExecutorContext) {
    this.executor = dbosExecI;
    const regops = this.executor.getRegistrationsFor(this);
    for (const registeredOperation of regops) {
      const ro = registeredOperation.methodConfig as KafkaRegistrationInfo;
      if (ro.kafkaTopics) {
        const defaults = registeredOperation.classConfig as KafkaDefaults;
        const method = registeredOperation.methodReg;
        const cname = method.className;
        const mname = method.name;
        if (!method.txnConfig && !method.workflowConfig) {
          throw new DBOSError.DBOSError(`Error registering method ${cname}.${mname}: A Kafka decorator can only be assigned to a transaction or workflow!`);
        }
        if (!defaults.kafkaConfig) {
          throw new DBOSError.DBOSError(`Error registering method ${cname}.${mname}: Kafka configuration not found. Does class ${cname} have an @Kafka decorator?`);
        }
        const topics: Array<string | RegExp> = [];
        if (Array.isArray(ro.kafkaTopics)) {
          topics.push(...ro.kafkaTopics);
        } else if (ro.kafkaTopics) {
          topics.push(ro.kafkaTopics);
        }
        if (!this.kafka) {
          this.kafka = new KafkaJS(defaults.kafkaConfig);
        }
        const consumerConfig = ro.consumerConfig ?? { groupId: `${this.safeGroupName(topics)}` };
        const consumer = this.kafka.consumer(consumerConfig);
        await consumer.connect();
        // A temporary workaround for https://github.com/tulios/kafkajs/pull/1558 until it gets fixed.
        // If topic autocreation is on and you try to subscribe to a nonexistent topic, KafkaJS should retry until the topic is created.
        // However, it has a bug where it won't. Thus, we retry instead.
        const maxRetries = defaults.kafkaConfig.retry ? defaults.kafkaConfig.retry.retries ?? 5 : 5;
        let retryTime = defaults.kafkaConfig.retry ? defaults.kafkaConfig.retry.maxRetryTime ?? 300 : 300;
        const multiplier = defaults.kafkaConfig.retry ? defaults.kafkaConfig.retry.multiplier ?? 2 : 2;
        for (let i = 0; i < maxRetries; i++) {
          try {
            await consumer.subscribe({ topics: topics, fromBeginning: true });
            break;
          } catch (error) {
            const e = error as KafkaJSProtocolError;
            if (e.code === 3 && i + 1 < maxRetries) { // UNKNOWN_TOPIC_OR_PARTITION
              await sleepms(retryTime);
              retryTime *= multiplier;
              continue;
            } else {
              throw e;
            }
          }
        }
        await consumer.run({
          eachMessage: async ({ topic, partition, message }) => {
            // This combination uniquely identifies a message for a given Kafka cluster
            const workflowUUID = `kafka-unique-id-${topic}-${partition}-${message.offset}`;
            const wfParams = { workflowUUID: workflowUUID, configuredInstance: null };
            // All operations annotated with Kafka decorators must take in these three arguments
            const args: KafkaArgs = [topic, partition, message];
            // We can only guarantee exactly-once-per-message execution of transactions and workflows.
            if (method.txnConfig) {
              // Execute the transaction
              await this.executor!.transaction(method.registeredFunction as TransactionFunction<unknown[], unknown>, wfParams, ...args);
            } else if (method.workflowConfig) {
              // Safely start the workflow
              await this.executor!.workflow(method.registeredFunction as unknown as WorkflowFunction<unknown[], unknown>, wfParams, ...args);
            }
          },
        });
        this.consumers.push(consumer);
      }
    }
  }

  async destroy() {
    for (const consumer of this.consumers) {
      await consumer.disconnect();
    }
  }

  safeGroupName(topics: Array<string | RegExp>) {
    const safeGroupIdPart = topics
      .map(r => r.toString())
      .map(r => r.replaceAll(/[^a-zA-Z0-9\\-]/g, ''))
      .join('-');
    return `dbos-kafka-group-${safeGroupIdPart}`.slice(0, 255);
  }

  logRegisteredEndpoints() {
    if (!this.executor) return;
    const logger = this.executor.logger;
    logger.info("Kafka endpoints supported:");
    const regops = this.executor.getRegistrationsFor(this);
    regops.forEach((registeredOperation) => {
      const ro = registeredOperation.methodConfig as KafkaRegistrationInfo;
      if (ro.kafkaTopics) {
        const cname = registeredOperation.methodReg.className;
        const mname = registeredOperation.methodReg.name;
        if (Array.isArray(ro.kafkaTopics)) {
          ro.kafkaTopics.forEach(kafkaTopic => {
            logger.info(` ${kafkaTopic} -> ${cname}.${mname}`);
          });
        } else {
          logger.info(` ${ro.kafkaTopics} -> ${cname}.${mname}`);
        }
      }
    });
  }
}

/////////////////////////////
/* Kafka Method Decorators */
/////////////////////////////

let kafkaInst: DBOSKafka | undefined = undefined;

export interface KafkaRegistrationInfo {
  kafkaTopics?: string | RegExp | Array<string | RegExp>;
  consumerConfig?: ConsumerConfig;
}

export function KafkaConsume(topics: string | RegExp | Array<string | RegExp>, consumerConfig?: ConsumerConfig) {
  function kafkadec<This, Ctx extends DBOSContext, Return>(
    target: object,
    propertyKey: string,
    inDescriptor: TypedPropertyDescriptor<(this: This, ctx: Ctx, ...args: KafkaArgs) => Promise<Return>>
  ) {
    if (!kafkaInst) kafkaInst = new DBOSKafka();
    const { descriptor, receiverInfo } = associateMethodWithEventReceiver(kafkaInst, target, propertyKey, inDescriptor);

    const kafkaRegistration = receiverInfo as KafkaRegistrationInfo;
    kafkaRegistration.kafkaTopics = topics;
    kafkaRegistration.consumerConfig = consumerConfig;

    return descriptor;
  }
  return kafkadec;
}

/////////////////////////////
/* Kafka Class Decorators  */
/////////////////////////////

export interface KafkaDefaults {
  kafkaConfig?: KafkaConfig;
}

export function Kafka(kafkaConfig: KafkaConfig) {
  function clsdec<T extends { new(...args: unknown[]): object }>(ctor: T) {
    if (!kafkaInst) kafkaInst = new DBOSKafka();
    const kafkaInfo = associateClassWithEventReceiver(kafkaInst, ctor) as KafkaDefaults;
    kafkaInfo.kafkaConfig = kafkaConfig;
  }
  return clsdec;
}

//////////////////////////////
/* Producer Step            */
//////////////////////////////

export class KafkaProduceStep extends ConfiguredInstance {
  producer: Producer | undefined = undefined;
  topic: string = "";

  constructor(name: string, readonly cfg: KafkaConfig, topic: string, readonly pcfg?: ProducerConfig) {
    super(name);
    this.topic = topic;
  }

  async initialize(_ctx: InitContext): Promise<void> {
    const kafka = new KafkaJS(this.cfg);
    this.producer = kafka.producer(this.pcfg);
    await this.producer.connect();
    return Promise.resolve();
  }

  @Step()
  async sendMessage(_ctx: StepContext, msg: Message) {
    return await this.producer?.send({ topic: this.topic, messages: [msg] });
  }

  @Step()
  async sendMessages(_ctx: StepContext, msg: Message[]) {
    return await this.producer?.send({ topic: this.topic, messages: msg });
  }

  async disconnect() {
    await this.producer?.disconnect();
  }
}
8 changes: 8 additions & 0 deletions packages/dbos-kafkajs/jest.config.js
@@ -0,0 +1,8 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
  testRegex: '((\\.|/)(test|spec))\\.ts?$',
  moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'],
  modulePaths: ["./"],
};
17 changes: 17 additions & 0 deletions packages/dbos-kafkajs/kafkajs-test-dbos-config.yaml
@@ -0,0 +1,17 @@
# To enable auto-completion and validation for this file in VSCode, install the RedHat YAML extension
# https://marketplace.visualstudio.com/items?itemName=redhat.vscode-yaml

# yaml-language-server: $schema=https://raw.githubusercontent.com/dbos-inc/dbos-transact/main/dbos-config.schema.json

database:
  hostname: 'localhost'
  port: 5432
  username: 'postgres'
  password: ${PGPASSWORD}
  app_db_name: 'postgres'
  connectionTimeoutMillis: 3000
  app_db_client: 'knex'
  migrate:
    - npx knex migrate:latest
  rollback:
    - npx knex migrate:rollback
