KafkaJS package #638

Merged: 16 commits, Oct 9, 2024
1 change: 1 addition & 0 deletions .github/workflows/publish_npm.yml
@@ -23,6 +23,7 @@ jobs:
packages/dbos-compiler,
packages/dbos-openapi,
packages/dbos-sqs,
packages/dbos-kafkajs,
]
steps:
- name: Checkout
1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -72,5 +72,6 @@ jobs:
PGPASSWORD: dbos
DB_PASSWORD: dbos
DB_USER: postgres
KAFKA_BROKER: localhost:9092
NPM_AUTH_TOKEN: ${{secrets.GITHUB_TOKEN}}
SILENCE_LOGS: true
3,358 changes: 1,001 additions & 2,357 deletions package-lock.json


2 changes: 1 addition & 1 deletion packages/communicator-email-ses/README.md
@@ -1,4 +1,4 @@
# DBOS AWS Simple Email Service (SES) Communicator
# DBOS AWS Simple Email Service (SES) Library

This is a [DBOS](https://docs.dbos.dev/) [communicator](https://docs.dbos.dev/tutorials/communicator-tutorial) for sending email using the [Amazon Web Services Simple Email Service](https://aws.amazon.com/ses/).

2 changes: 1 addition & 1 deletion packages/communicator-email-ses/package.json
@@ -1,7 +1,7 @@
{
"name": "@dbos-inc/communicator-email-ses",
"version": "0.0.0-placeholder",
"description": "Communicator library - email in AWS with SES",
"description": "DBOS library - email in AWS with SES",
"license": "MIT",
"repository": {
"type": "git",
2 changes: 1 addition & 1 deletion packages/communicator-random/README.md
@@ -1,4 +1,4 @@
# DBOS Random Communicator
# DBOS Random Step

This is a [DBOS](https://docs.dbos.dev/) [step](https://docs.dbos.dev/tutorials/communicator-tutorial) for generating random numbers.

2 changes: 1 addition & 1 deletion packages/communicator-random/package.json
@@ -1,7 +1,7 @@
{
"name": "@dbos-inc/communicator-random",
"version": "0.0.0-placeholder",
"description": "Communicator library",
"description": "DBOS library for wrapping random number generation in steps",
"license": "MIT",
"repository": {
"type": "git",
Expand Down
62 changes: 62 additions & 0 deletions packages/dbos-kafkajs/README.md
@@ -0,0 +1,62 @@
# DBOS Kafka Library (KafkaJS Version)

Publish/subscribe message queues are a common building block for distributed systems. Message queues allow processing to occur at a different place or time, perhaps in multiple client programming environments. Due to its performance, flexibility, and simple, scalable design, [Kafka](https://www.confluent.io/cloud-kafka) is a popular choice for publish/subscribe.

This package includes a [DBOS](https://docs.dbos.dev/) [step](https://docs.dbos.dev/tutorials/communicator-tutorial) for sending Kafka messages, as well as an event receiver for exactly-once processing of incoming messages (even when using standard queues).

This package is based on [KafkaJS](https://kafka.js.org/). We are working on other client libraries for Kafka; please reach out to [us](https://www.dbos.dev/) if you are interested in a different client library.

## Configuring a DBOS Application with Kafka
Ensure that the DBOS KafkaJS package is installed into the application:
```
npm install --save @dbos-inc/dbos-kafkajs
```

## Sending Messages

### Imports
First, ensure that the package classes are imported:
```typescript
import { configureInstance } from "@dbos-inc/dbos-sdk"; // used below to create the configured instance
import {
  KafkaConfig,
  logLevel,
  KafkaProduceStep,
  Partitioners,
} from "@dbos-inc/dbos-kafkajs";
```

### Selecting a Configuration
`KafkaProduceStep` is a configured class. This means that an instance name and a configuration (or config file key name) must be provided when a class instance is created, for example:
```typescript
const kafkaConfig: KafkaConfig = {
  clientId: 'dbos-kafka-test',
  brokers: [`${process.env['KAFKA_BROKER'] ?? 'localhost:9092'}`],
  requestTimeout: 100, // FOR TESTING
  retry: { // FOR TESTING
    retries: 5
  },
  logLevel: logLevel.NOTHING, // FOR TESTING
}

const defTopic = 'dbos-test-topic'; // example topic name
const kafkaCfg = configureInstance(KafkaProduceStep, 'defKafka', kafkaConfig, defTopic, {
  createPartitioner: Partitioners.DefaultPartitioner
});
```

### Sending
Within a [DBOS Transact Workflow](https://docs.dbos.dev/tutorials/workflow-tutorial), invoke the `KafkaProduceStep` instance's send methods from the workflow context:
```typescript
const sendRes = await wfCtx.invoke(kafkaCfg).sendMessage({value: ourMessage});
```
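
For orientation, here is a minimal sketch of a workflow wrapping this call; the class name and message argument are illustrative, and `kafkaCfg` is the configured `KafkaProduceStep` instance created above:
```typescript
import { Workflow, WorkflowContext } from "@dbos-inc/dbos-sdk";

class KafkaSendExample {
  @Workflow()
  static async sendOne(wfCtx: WorkflowContext, ourMessage: string) {
    // kafkaCfg is the configured KafkaProduceStep instance created above
    const sendRes = await wfCtx.invoke(kafkaCfg).sendMessage({ value: ourMessage });
    wfCtx.logger.info(`Kafka send result: ${JSON.stringify(sendRes)}`);
  }
}
```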

## Receiving Messages
A tutorial for receiving and processing Kafka messages can be found [here](https://docs.dbos.dev/tutorials/kafka-integration).
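
As a quick sketch of the decorator-based approach this package exports (the topic and class names are illustrative, and `kafkaConfig` is the configuration defined above), a workflow can be registered as a message handler with the `@Kafka` and `@KafkaConsume` decorators:
```typescript
import { Workflow, WorkflowContext } from "@dbos-inc/dbos-sdk";
import { Kafka, KafkaConsume, KafkaMessage } from "@dbos-inc/dbos-kafkajs";

@Kafka(kafkaConfig) // class-level broker configuration
class KafkaEventProcessor {
  // Handlers must be workflows or transactions and take exactly
  // (ctx, topic, partition, message) as their arguments
  @KafkaConsume('dbos-test-topic') // illustrative topic name
  @Workflow()
  static async processMessage(ctx: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctx.logger.info(`Received from ${topic}:${partition}: ${message.value?.toString()}`);
  }
}
```
Each message is dispatched with a workflow ID derived from its topic, partition, and offset, which is how the receiver in `index.ts` achieves exactly-once processing per Kafka cluster.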

## Simple Testing
The `kafkajs.test.ts` file included in the source repository demonstrates sending and processing Kafka messages. Before running, set the following environment variable:
- `KAFKA_BROKER`: Broker URL
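
With the variable set, the tests can be run in the usual way; the broker address below is an assumption for a local single-broker setup:
```
KAFKA_BROKER=localhost:9092 npm test
```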

## Next Steps
- For a detailed DBOS Transact tutorial, check out our [programming quickstart](https://docs.dbos.dev/getting-started/quickstart-programming).
- To learn how to deploy your application to DBOS Cloud, visit our [cloud quickstart](https://docs.dbos.dev/getting-started/quickstart-cloud/).
- To learn more about DBOS, take a look at [our documentation](https://docs.dbos.dev/) or our [source code](https://github.com/dbos-inc/dbos-transact).
234 changes: 234 additions & 0 deletions packages/dbos-kafkajs/index.ts
@@ -0,0 +1,234 @@
import {
Kafka as KafkaJS,
Consumer,
ConsumerConfig,
Producer,
ProducerConfig,
KafkaConfig,
KafkaMessage,
Message,
KafkaJSProtocolError,
logLevel,
Partitioners,
} from "kafkajs";
import { Step, StepContext, ConfiguredInstance, DBOSContext, DBOSEventReceiver, InitContext } from "@dbos-inc/dbos-sdk";
import { associateClassWithEventReceiver, associateMethodWithEventReceiver } from "@dbos-inc/dbos-sdk";
import { TransactionFunction } from "@dbos-inc/dbos-sdk";
import { WorkflowFunction } from "@dbos-inc/dbos-sdk";
import { Error as DBOSError } from "@dbos-inc/dbos-sdk";
import { DBOSExecutorContext } from "@dbos-inc/dbos-sdk";

export {
KafkaConfig,
ConsumerConfig,
ProducerConfig,
KafkaMessage,
KafkaJSProtocolError as KafkaError,
logLevel,
Partitioners,
};

type KafkaArgs = [string, number, KafkaMessage];

const sleepms = (ms: number) => new Promise((r) => setTimeout(r, ms));

////////////////////////
/* Kafka Management */
////////////////////////

export class DBOSKafka implements DBOSEventReceiver {
readonly consumers: Consumer[] = [];
kafka?: KafkaJS = undefined;

executor?: DBOSExecutorContext = undefined;

constructor() { }

async initialize(dbosExecI: DBOSExecutorContext) {
this.executor = dbosExecI;
const regops = this.executor.getRegistrationsFor(this);
for (const registeredOperation of regops) {
const ro = registeredOperation.methodConfig as KafkaRegistrationInfo;
if (ro.kafkaTopics) {
const defaults = registeredOperation.classConfig as KafkaDefaults;
const method = registeredOperation.methodReg;
const cname = method.className;
const mname = method.name;
if (!method.txnConfig && !method.workflowConfig) {
throw new DBOSError.DBOSError(`Error registering method ${cname}.${mname}: A Kafka decorator can only be assigned to a transaction or workflow!`)
}
if (!defaults.kafkaConfig) {
throw new DBOSError.DBOSError(`Error registering method ${cname}.${mname}: Kafka configuration not found. Does class ${cname} have an @Kafka decorator?`)
}
const topics: Array<string | RegExp> = [];
if (Array.isArray(ro.kafkaTopics)) {
topics.push(...ro.kafkaTopics)
} else if (ro.kafkaTopics) {
topics.push(ro.kafkaTopics)
}
if (!this.kafka) {
this.kafka = new KafkaJS(defaults.kafkaConfig);
}
const consumerConfig = ro.consumerConfig ?? { groupId: `${this.safeGroupName(topics)}` };
const consumer = this.kafka.consumer(consumerConfig);
await consumer.connect();
// A temporary workaround for https://github.com/tulios/kafkajs/pull/1558 until it gets fixed
// If topic autocreation is on and you try to subscribe to a nonexistent topic, KafkaJS should retry until the topic is created.
// However, it has a bug where it won't. Thus, we retry instead.
const maxRetries = defaults.kafkaConfig.retry ? defaults.kafkaConfig.retry.retries ?? 5 : 5;
let retryTime = defaults.kafkaConfig.retry ? defaults.kafkaConfig.retry.maxRetryTime ?? 300 : 300;
const multiplier = defaults.kafkaConfig.retry ? defaults.kafkaConfig.retry.multiplier ?? 2 : 2;
for (let i = 0; i < maxRetries; i++) {
try {
await consumer.subscribe({ topics: topics, fromBeginning: true });
break;
} catch (error) {
const e = error as KafkaJSProtocolError;
if (e.code === 3 && i + 1 < maxRetries) { // UNKNOWN_TOPIC_OR_PARTITION
await sleepms(retryTime);
retryTime *= multiplier;
continue;
} else {
throw e
}
}
}
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// This combination uniquely identifies a message for a given Kafka cluster
const workflowUUID = `kafka-unique-id-${topic}-${partition}-${message.offset}`
const wfParams = { workflowUUID: workflowUUID, configuredInstance: null };
// All operations annotated with Kafka decorators must take in these three arguments
const args: KafkaArgs = [topic, partition, message];
// We can only guarantee exactly-once-per-message execution of transactions and workflows.
if (method.txnConfig) {
// Execute the transaction
await this.executor!.transaction(method.registeredFunction as TransactionFunction<unknown[], unknown>, wfParams, ...args);
} else if (method.workflowConfig) {
// Safely start the workflow
await this.executor!.workflow(method.registeredFunction as unknown as WorkflowFunction<unknown[], unknown>, wfParams, ...args);
}
},
})
this.consumers.push(consumer);
}
}
}

async destroy() {
for (const consumer of this.consumers) {
await consumer.disconnect();
}
}

safeGroupName(topics: Array<string | RegExp>) {
const safeGroupIdPart = topics
.map(r => r.toString())
.map( r => r.replaceAll(/[^a-zA-Z0-9\\-]/g, ''))
.join('-');
return `dbos-kafka-group-${safeGroupIdPart}`.slice(0, 255);
}

logRegisteredEndpoints() {
if (!this.executor) return;
const logger = this.executor.logger;
logger.info("Kafka endpoints supported:");
const regops = this.executor.getRegistrationsFor(this);
regops.forEach((registeredOperation) => {
const ro = registeredOperation.methodConfig as KafkaRegistrationInfo;
if (ro.kafkaTopics) {
const cname = registeredOperation.methodReg.className;
const mname = registeredOperation.methodReg.name;
if (Array.isArray(ro.kafkaTopics)) {
ro.kafkaTopics.forEach( kafkaTopic => {
logger.info(` ${kafkaTopic} -> ${cname}.${mname}`);
});
} else {
logger.info(` ${ro.kafkaTopics} -> ${cname}.${mname}`);
}
}
});
}
}

/////////////////////////////
/* Kafka Method Decorators */
/////////////////////////////

let kafkaInst: DBOSKafka | undefined = undefined;

export interface KafkaRegistrationInfo {
kafkaTopics?: string | RegExp | Array<string | RegExp>;
consumerConfig?: ConsumerConfig;
}

export function KafkaConsume(topics: string | RegExp | Array<string | RegExp>, consumerConfig?: ConsumerConfig) {
function kafkadec<This, Ctx extends DBOSContext, Return>(
target: object,
propertyKey: string,
inDescriptor: TypedPropertyDescriptor<(this: This, ctx: Ctx, ...args: KafkaArgs) => Promise<Return>>
) {
if (!kafkaInst) kafkaInst = new DBOSKafka();
const {descriptor, receiverInfo} = associateMethodWithEventReceiver(kafkaInst, target, propertyKey, inDescriptor);

const kafkaRegistration = receiverInfo as KafkaRegistrationInfo;
kafkaRegistration.kafkaTopics = topics;
kafkaRegistration.consumerConfig = consumerConfig;

return descriptor;
}
return kafkadec;
}

/////////////////////////////
/* Kafka Class Decorators */
/////////////////////////////

export interface KafkaDefaults {
kafkaConfig?: KafkaConfig;
}

export function Kafka(kafkaConfig: KafkaConfig) {
function clsdec<T extends { new(...args: unknown[]): object }>(ctor: T) {
if (!kafkaInst) kafkaInst = new DBOSKafka();
const kafkaInfo = associateClassWithEventReceiver(kafkaInst, ctor) as KafkaDefaults;
kafkaInfo.kafkaConfig = kafkaConfig;
}
return clsdec;
}

//////////////////////////////
/* Producer Step */
//////////////////////////////
export class KafkaProduceStep extends ConfiguredInstance {
producer: Producer | undefined = undefined;
topic: string = "";

constructor(name: string, readonly cfg: KafkaConfig, topic: string, readonly pcfg?: ProducerConfig) {
super(name);
this.topic = topic;
}

async initialize(_ctx: InitContext): Promise<void> {
const kafka = new KafkaJS(this.cfg);
this.producer = kafka.producer(this.pcfg);
await this.producer.connect();
return Promise.resolve();
}

@Step()
async sendMessage(_ctx: StepContext, msg: Message) {
return await this.producer?.send({topic: this.topic, messages:[msg]});
}

@Step()
async sendMessages(_ctx: StepContext, msg: Message[]) {
return await this.producer?.send({topic: this.topic, messages:msg});
}

async disconnect() {
await this.producer?.disconnect();
}
}
8 changes: 8 additions & 0 deletions packages/dbos-kafkajs/jest.config.js
@@ -0,0 +1,8 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
testRegex: '((\\.|/)(test|spec))\\.ts?$',
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'],
modulePaths: ["./"],
};
17 changes: 17 additions & 0 deletions packages/dbos-kafkajs/kafkajs-test-dbos-config.yaml
@@ -0,0 +1,17 @@
# To enable auto-completion and validation for this file in VSCode, install the RedHat YAML extension
# https://marketplace.visualstudio.com/items?itemName=redhat.vscode-yaml

# yaml-language-server: $schema=https://raw.githubusercontent.com/dbos-inc/dbos-transact/main/dbos-config.schema.json

database:
hostname: 'localhost'
port: 5432
username: 'postgres'
password: ${PGPASSWORD}
app_db_name: 'postgres'
connectionTimeoutMillis: 3000
app_db_client: 'knex'
migrate:
- npx knex migrate:latest
rollback:
- npx knex migrate:rollback