trying to use the connector abstractly in client
ofirelarat committed Sep 8, 2023
1 parent a7a71b8 commit 9e6c99f
Showing 8 changed files with 204 additions and 158 deletions.
8 changes: 6 additions & 2 deletions client/src/components/connectors-table.tsx
@@ -1,4 +1,4 @@
import { Connector } from '@/models/connector';
import { Connector, KafkaConnector, RedisConnector } from '@/models/connector';
import { Accordion } from '@mantine/core';

interface TableProps {
@@ -19,7 +19,11 @@ export default function PipelinesTable({ connector }: TableProps) {
<Accordion.Item value="customization" key={connector.name}>
<Accordion.Control>Connector</Accordion.Control>
<Accordion.Panel>
{connector.name} ({connector.type}), {connector.brokers} | {connector.group_id}
{connector.name} ({connector.type}),
{connector.type == 'kafka' ?
`${(connector as KafkaConnector).brokers} | ${(connector as KafkaConnector).group_id}`
: `${(connector as RedisConnector).host} | ${(connector as RedisConnector).port}`
}
</Accordion.Panel>
</Accordion.Item>
</Accordion>
14 changes: 10 additions & 4 deletions client/src/models/connector.ts
@@ -1,8 +1,14 @@
export interface Connector {
name: string;
type: 'kafka' | 'redis';
brokers?: string;
group_id?: string;
host?: string;
port?: number
}

export interface KafkaConnector extends Connector {
brokers: string;
group_id: string;
}

export interface RedisConnector extends Connector {
host: string;
port: number
}
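
Note: both subtypes inherit type: 'kafka' | 'redis' from Connector, so a connector.type check does not narrow a KafkaConnector | RedisConnector union on its own; that is why connectors-table.tsx above still needs the as KafkaConnector / as RedisConnector casts. A possible refinement, not part of this commit, is to pin the discriminant in each subtype so TypeScript narrows automatically (connectorSummary below is a hypothetical helper for illustration):

    // Hypothetical refinement, not in this commit: pin the discriminant per subtype.
    export interface KafkaConnector extends Connector {
      type: 'kafka';
      brokers: string;
      group_id: string;
    }

    export interface RedisConnector extends Connector {
      type: 'redis';
      host: string;
      port: number;
    }

    // With the discriminant pinned, an equality check narrows the union,
    // so no casts are needed:
    export function connectorSummary(connector: KafkaConnector | RedisConnector): string {
      return connector.type === 'kafka'
        ? `${connector.brokers} | ${connector.group_id}`
        : `${connector.host} | ${connector.port}`;
    }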
4 changes: 2 additions & 2 deletions client/src/models/pipeline-list.ts
@@ -1,8 +1,8 @@
import { Connector } from './connector';
import { KafkaConnector, RedisConnector } from './connector';
import { Pipeline } from './pipeline';

export interface PipelineList {
name: string;
pipelines: Pipeline[];
connector: Connector;
connector: RedisConnector | KafkaConnector;
}
39 changes: 14 additions & 25 deletions client/src/pages/api/create-topics.ts
@@ -1,7 +1,15 @@
import { Kafka } from 'kafkajs';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
import { Connector } from '@/models/connector';
import { Connector, KafkaConnector } from '@/models/connector';
import { createTopics } from '@/tools/kafka';

const kafkaHandler = async (connector: KafkaConnector, pipelines: Pipeline[]) => {
await createTopics(connector, pipelines);
}

const redisHandler = () => {

}

export default async function handler(
req: NextApiRequest,
@@ -12,31 +20,12 @@ export default async function handler(
const connector = req.body.kafkaConnector as Connector;

console.log(connector);

// Create the client with the broker list
const kafka = new Kafka({
clientId: 'test-client',
brokers: [...connector.brokers.split(',')],
});

const admin = kafka.admin();
const topics = Array.from(new Set(pipelines.map((p) => p.input.topic)));
try {
console.log('start to deleting topics');
await admin.deleteTopics({ topics: topics });
} catch {
console.log('failed to delete topics');
if (connector.type === 'kafka') {
await kafkaHandler(connector as KafkaConnector, pipelines);
} else if (connector.type === 'redis') {
redisHandler();
}

try {
console.log('start to creating topics');
await admin.createTopics({
topics: topics.map((x) => ({ topic: x })),
waitForLeaders: false,
});
} catch {
console.log('failed to create topics');
}

res.status(200).json({ result: 'succeed' });
} else {
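
Note: the new @/tools/kafka module is one of the eight changed files but is not rendered on this page, so its contents are an assumption. Judging from the inline code removed above, createTopics plausibly looks like the sketch below; redisHandler, meanwhile, is still an empty stub, so redis requests currently no-op.

    // Sketch of the extracted helper, assumed to mirror the inline code
    // removed from this route; @/tools/kafka itself is not shown in this view.
    import { Kafka } from 'kafkajs';
    import { Pipeline } from '@/models/pipeline';
    import { KafkaConnector } from '@/models/connector';

    export const createTopics = async (connector: KafkaConnector, pipelines: Pipeline[]) => {
      // Create the client with the broker list
      const kafka = new Kafka({
        clientId: 'test-client',
        brokers: connector.brokers.split(','),
      });

      const admin = kafka.admin();
      await admin.connect(); // explicit connect/disconnect added in this sketch; the removed inline code skipped them
      const topics = Array.from(new Set(pipelines.map((p) => p.input.topic)));

      try {
        // Drop existing topics first so each run starts clean
        await admin.deleteTopics({ topics });
      } catch {
        console.log('failed to delete topics');
      }

      try {
        await admin.createTopics({
          topics: topics.map((topic) => ({ topic })),
          waitForLeaders: false,
        });
      } catch {
        console.log('failed to create topics');
      }

      await admin.disconnect();
    };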
57 changes: 9 additions & 48 deletions client/src/pages/api/pipeline-metadata.ts
@@ -1,48 +1,10 @@
import { Consumer, Kafka } from 'kafkajs';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
import { Connector } from '@/models/connector';
import { Connector, KafkaConnector } from '@/models/connector';
import { getTopicMetadata } from '@/tools/kafka';

let consumer: Consumer | null = null;

const getPipelineMetadata = async (pipeline: Pipeline, connector: Connector) => {
const kafka = new Kafka({
clientId: 'test-client',
brokers: [...connector.brokers.split(',')],
});

console.log('start get metadata');

const admin = kafka.admin();
const topicOffsets = await admin.fetchTopicOffsets(pipeline.input.topic);
let latestMsgTime = undefined;
const promise = new Promise(async (resolve, reject) => {
if (!consumer) {
consumer = kafka.consumer({ groupId: 'reader' });
await consumer.connect();
await consumer.subscribe({
topics: [pipeline.output.topic],
fromBeginning: true,
});

consumer
.run({
eachBatchAutoResolve: false,
autoCommit: false,
eachMessage: async ({ topic,partition, message}) => {
console.log(message)
resolve({msg: message, topicOffsets: topicOffsets});
}
})
.catch((err) => reject(err));

consumer.seek({topic: pipeline.input.topic,partition: topicOffsets[0].partition, offset: topicOffsets[0].offset})
} else {
resolve(undefined);
}
});

return await promise;
const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline) => {
return getTopicMetadata(connector, pipeline);
};

export default async function handler(
@@ -53,14 +15,13 @@ export default async function handler(
const pipeline = req.body.pipeline as Pipeline;
const connector = req.body.kafkaConnector as Connector;

const metadata = await getPipelineMetadata(pipeline, connector);

if (consumer) {
console.log('retrived messages');
await consumer.disconnect();
consumer = null;
let metadata;
if (connector.type === 'kafka') {
metadata = await kafkaHandler(connector as KafkaConnector, pipeline);
}

console.log('retrived messages');

res.status(200).json({ pipelineMetadata: metadata });
} else {
res.status(405);
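
Note: getTopicMetadata lives in the unrendered @/tools/kafka module, so the sketch below is an assumption; it keeps only the admin/offsets part of the removed inline code, which additionally read the latest message through a consumer. Also worth noting: for a redis connector, metadata stays undefined and the route responds with { pipelineMetadata: undefined }.

    // Minimal sketch of getTopicMetadata, assumed from the code removed above;
    // the consumer-based "latest message" logic is omitted here.
    import { Kafka } from 'kafkajs';
    import { Pipeline } from '@/models/pipeline';
    import { KafkaConnector } from '@/models/connector';

    export const getTopicMetadata = async (connector: KafkaConnector, pipeline: Pipeline) => {
      const kafka = new Kafka({
        clientId: 'test-client',
        brokers: connector.brokers.split(','),
      });

      const admin = kafka.admin();
      await admin.connect();
      // One entry per partition: { partition, offset, high, low }
      const topicOffsets = await admin.fetchTopicOffsets(pipeline.input.topic);
      await admin.disconnect();

      return { topicOffsets };
    };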
24 changes: 9 additions & 15 deletions client/src/pages/api/test.ts
@@ -1,7 +1,11 @@
import { Kafka } from 'kafkajs';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
import { Connector } from '@/models/connector';
import { Connector, KafkaConnector } from '@/models/connector';
import { produceTest } from '@/tools/kafka';

const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline, message: string) => {
await produceTest(connector, pipeline, message);
}

export default async function handler(
req: NextApiRequest,
@@ -13,19 +17,9 @@
const message = req.body.message;
console.log(connector);

// Create the client with the broker list
const kafka = new Kafka({
clientId: 'test-client',
brokers: [...connector.brokers.split(',')],
});

const producer = kafka.producer();

await producer.connect();
await producer.send({
topic: pipeline.input.topic,
messages: [{ key: 'test-key', value: message }],
});
if (connector.type === 'kafka') {
await kafkaHandler(connector as KafkaConnector, pipeline, message);
}

res.status(200).json({ result: 'succeed' });
} else {
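
Note: produceTest is also part of the unrendered @/tools/kafka module; assuming it mirrors the producer code removed above, it plausibly looks like:

    // Sketch of produceTest, assumed to mirror the inline producer code
    // removed from this route.
    import { Kafka } from 'kafkajs';
    import { Pipeline } from '@/models/pipeline';
    import { KafkaConnector } from '@/models/connector';

    export const produceTest = async (
      connector: KafkaConnector,
      pipeline: Pipeline,
      message: string
    ) => {
      // Create the client with the broker list
      const kafka = new Kafka({
        clientId: 'test-client',
        brokers: connector.brokers.split(','),
      });

      const producer = kafka.producer();
      await producer.connect();
      await producer.send({
        topic: pipeline.input.topic,
        messages: [{ key: 'test-key', value: message }],
      });
      // disconnect added in this sketch; the removed inline code left the producer connected
      await producer.disconnect();
    };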
71 changes: 9 additions & 62 deletions client/src/pages/api/transformed-messages.ts
@@ -1,62 +1,10 @@
import { Consumer, Kafka } from 'kafkajs';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Pipeline } from '@/models/pipeline';
import { Connector } from '@/models/connector';
import { Connector, KafkaConnector } from '@/models/connector';
import { getMessages } from '@/tools/kafka';

let consumer: Consumer | null = null;

const getMessages = async (pipeline: Pipeline, connector: Connector) => {
const kafka = new Kafka({
clientId: 'test-client',
brokers: [...connector.brokers.split(',')],
});

console.log('start get messages');
const promise = new Promise(async (resolve, reject) => {
if (!consumer) {
consumer = kafka.consumer({ groupId: 'reader' });
await consumer.connect();
await consumer.subscribe({
topics: [pipeline.output.topic],
fromBeginning: true,
});

consumer
.run({
eachBatchAutoResolve: false,
autoCommit: false,
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
const messages: {
timestamp: string;
value: string | undefined;
}[] = [];
for (let message of batch.messages) {
console.log({
value: message.value
? message.value.toString()
: 'null',
partition: batch.partition,
offset: message.offset,
timestamp: message.timestamp,
});
messages.push({
timestamp: message.timestamp,
value: message.value
? message.value.toString()
: 'null',
});
}

resolve(messages);
},
})
.catch((err) => reject(err));
} else {
resolve(undefined);
}
});

return await promise;
const kafkaHandler = async (connector: KafkaConnector, pipeline: Pipeline) => {
return getMessages(connector, pipeline)
};

export default async function handler(
@@ -67,14 +15,13 @@ export default async function handler(
const pipeline = req.body.pipeline as Pipeline;
const connector = req.body.kafkaConnector as Connector;

const messages = await getMessages(pipeline, connector);

if (consumer) {
console.log('retrived messages');
await consumer.disconnect();
consumer = null;
let messages;
if (connector.type === 'kafka') {
messages = await kafkaHandler(connector as KafkaConnector, pipeline);
}

console.log('retrived messages');

res.status(200).json({ transformedMessages: messages });
} else {
res.status(405);
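
Note: getMessages is the last of the @/tools/kafka helpers referenced here; the sketch below follows the consumer code removed above, with one deliberate change: the consumer is scoped to the call instead of the module-level let consumer singleton, which also removes the disconnect bookkeeping the old handler needed.

    // Sketch of getMessages, assumed from the consumer code removed above.
    import { Kafka } from 'kafkajs';
    import { Pipeline } from '@/models/pipeline';
    import { KafkaConnector } from '@/models/connector';

    type TransformedMessage = { timestamp: string; value: string | undefined };

    export const getMessages = async (connector: KafkaConnector, pipeline: Pipeline) => {
      const kafka = new Kafka({
        clientId: 'test-client',
        brokers: connector.brokers.split(','),
      });

      const consumer = kafka.consumer({ groupId: 'reader' });
      await consumer.connect();
      await consumer.subscribe({ topics: [pipeline.output.topic], fromBeginning: true });

      // Resolve with the first batch; autoCommit stays off so reads are repeatable.
      const messages = await new Promise<TransformedMessage[]>((resolve, reject) => {
        consumer
          .run({
            eachBatchAutoResolve: false,
            autoCommit: false,
            eachBatch: async ({ batch }) => {
              resolve(
                batch.messages.map((message) => ({
                  timestamp: message.timestamp,
                  value: message.value ? message.value.toString() : 'null',
                }))
              );
            },
          })
          .catch(reject);
      });

      await consumer.disconnect();
      return messages;
    };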
(The eighth changed file, presumably the new @/tools/kafka module, did not load in this view.)
