From 317ffb47a7aeeae4af669977314b0dd36fb2d939 Mon Sep 17 00:00:00 2001 From: "shuju.jiang" <1803248734@qq.com> Date: Mon, 8 Apr 2024 19:09:03 +0800 Subject: [PATCH] [ISSUE #4047] Add OpenAI configuration and adjust DTO --- .../source/config/ChatGPTSourceConfig.java | 5 + .../config/ChatGPTSourceConnectorConfig.java | 4 +- .../chatgpt/source/config/OpenaiConfig.java | 25 +++ .../source/config/OpenaiProxyConfig.java | 12 ++ .../connector/ChatGPTSourceConnector.java | 178 +++++++++++------- .../chatgpt/source/dto/ChatGPTRequestDTO.java | 26 +++ .../src/main/resources/prompt | 21 +++ .../src/main/resources/source-config.yml | 47 +++-- .../connector/ChatGPTSourceConnectorTest.java | 1 - .../src/test/resources/source-config.yml | 22 ++- 10 files changed, 251 insertions(+), 90 deletions(-) create mode 100644 eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java create mode 100644 eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java create mode 100644 eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java create mode 100644 eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java index fac00904e4..9596866910 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java @@ -27,4 +27,9 @@ public class ChatGPTSourceConfig extends SourceConfig { public ChatGPTSourceConnectorConfig connectorConfig; + + public OpenaiProxyConfig openaiProxyConfig; + + public OpenaiConfig openaiConfig; + } diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java index 032adc45c5..f958bc2acf 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java @@ -30,8 +30,6 @@ public class ChatGPTSourceConnectorConfig { private int idleTimeout; - private String openaiToken; - - private String openaiModel; + private boolean proxyEnable; } diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java new file mode 100644 index 0000000000..1281b65714 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java @@ -0,0 +1,25 @@ +package org.apache.eventmesh.connector.chatgpt.source.config; + + +import java.util.List; +import java.util.Map; + +import lombok.Data; + +@Data +public class OpenaiConfig { + + private String token; + private String model; + private long timeout; + private Double temperature; + private Integer maxTokens; + private Boolean logprob; + private Double topLogprobs; + private Map logitBias; + private Double frequencyPenalty; + private Double presencePenalty; + private String user; + private List stop; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java new file mode 100644 index 0000000000..9bfffc886d --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java @@ -0,0 +1,12 @@ +package org.apache.eventmesh.connector.chatgpt.source.config; + +import lombok.Data; + +@Data +public class OpenaiProxyConfig { + + private String host; + + private int port; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java index f3ea6be3e7..c28644065b 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java @@ -18,9 +18,20 @@ package org.apache.eventmesh.connector.chatgpt.source.connector; +import okhttp3.OkHttpClient; +import retrofit2.Retrofit; + +import static com.theokanning.openai.service.OpenAiService.defaultObjectMapper; +import static com.theokanning.openai.service.OpenAiService.defaultRetrofit; + import org.apache.eventmesh.common.ThreadPoolFactory; import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.utils.AssertUtils; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig; +import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiProxyConfig; import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig; +import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO; import org.apache.eventmesh.openconnect.api.config.Config; import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; @@ -28,6 +39,8 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.util.CloudEventUtil; +import java.net.InetSocketAddress; +import java.net.Proxy; import java.net.URI; import java.time.Duration; import java.time.ZonedDateTime; @@ -50,13 +63,14 @@ import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.BodyHandler; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.theokanning.openai.client.OpenAiApi; import com.theokanning.openai.completion.chat.ChatCompletionRequest; +import com.theokanning.openai.completion.chat.ChatCompletionRequest.ChatCompletionRequestBuilder; import com.theokanning.openai.completion.chat.ChatMessage; import com.theokanning.openai.completion.chat.ChatMessageRole; import com.theokanning.openai.service.OpenAiService; -import lombok.Data; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -68,22 +82,13 @@ public class ChatGPTSourceConnector implements Source { private BlockingQueue queue; private HttpServer server; private OpenAiService openAiService; - private ExecutorService consumeExecutorService; - - @Data - private static class ChatGPTBody { - - String prompt; - - String source; - - String subject; - - @JsonProperty("datacontenttype") - String dataContentType; + private final ExecutorService chatgptSourceExecutorService = + ThreadPoolFactory.createThreadPoolExecutor( + Runtime.getRuntime().availableProcessors() * 2, + Runtime.getRuntime().availableProcessors() * 2, + "ChatGPTSourceThread"); - String type; - } + private String chatCompletionRequestTemplateStr; @Override public Class configClass() { @@ -103,11 +108,68 @@ public void init(ConnectorContext connectorContext) { doInit(); } - private CloudEvent genGptConnectRecord(ChatGPTBody event) { + private void initOpenAi() { + OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig(); + AssertUtils.isTrue(openaiConfig.getTimeout() > 0, "openaiTimeout must be >= 0"); + boolean proxyEnable = sourceConfig.connectorConfig.isProxyEnable(); + if (proxyEnable) { + OpenaiProxyConfig chatgptProxyConfig = sourceConfig.openaiProxyConfig; + if (chatgptProxyConfig.getHost() == null) { + throw new IllegalStateException("chatgpt proxy config 'host' cannot be null"); + } + ObjectMapper mapper = defaultObjectMapper(); + Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(chatgptProxyConfig.getHost(), chatgptProxyConfig.getPort())); + OkHttpClient client = OpenAiService + .defaultClient(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout())) + .newBuilder() + .proxy(proxy) + .build(); + Retrofit retrofit = defaultRetrofit(client, mapper); + OpenAiApi api = retrofit.create(OpenAiApi.class); + this.openAiService = new OpenAiService(api); + } else { + this.openAiService = + new OpenAiService(openaiConfig.getToken(), Duration.ofSeconds(openaiConfig.getTimeout())); + } + ChatCompletionRequestBuilder builder = ChatCompletionRequest + .builder() + .model(openaiConfig.getModel()); + AssertUtils.notNull(openaiConfig.getModel(), "model cannot be null"); + builder = builder.model(openaiConfig.getModel()); + if (openaiConfig.getUser() != null) { + builder = builder.user(openaiConfig.getUser()); + } + if (openaiConfig.getPresencePenalty() != null) { + builder = builder.presencePenalty(openaiConfig.getPresencePenalty()); + } + if (openaiConfig.getFrequencyPenalty() != null) { + builder = builder.frequencyPenalty(openaiConfig.getFrequencyPenalty()); + } + if (openaiConfig.getMaxTokens() != null) { + builder = builder.maxTokens(openaiConfig.getMaxTokens()); + } + if (openaiConfig.getTemperature() != null) { + builder = builder.temperature(openaiConfig.getTemperature()); + } + if (openaiConfig.getLogitBias() != null && !openaiConfig.getLogitBias().isEmpty()) { + builder = builder.logitBias(openaiConfig.getLogitBias()); + } + if (openaiConfig.getStop() != null && !openaiConfig.getStop().isEmpty()) { + builder = builder.stop(openaiConfig.getStop()); + } + this.chatCompletionRequestTemplateStr = JsonUtils.toJSONString(builder.build()); + } + + public ChatCompletionRequest newChatCompletionRequest(List chatMessages) { + ChatCompletionRequest request = JsonUtils.parseObject(chatCompletionRequestTemplateStr, ChatCompletionRequest.class); + request.setMessages(chatMessages); + return request; + } + + private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) { List chatMessages = new ArrayList<>(); chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getPrompt())); - ChatCompletionRequest req = - ChatCompletionRequest.builder().messages(chatMessages).model(sourceConfig.connectorConfig.getOpenaiModel()).build(); + ChatCompletionRequest req = newChatCompletionRequest(chatMessages); StringBuilder gptData = new StringBuilder(); try { @@ -117,64 +179,40 @@ private CloudEvent genGptConnectRecord(ChatGPTBody event) { log.error("Failed to generate GPT connection record: {}", e.getMessage()); } - return CloudEventBuilder.v1() - .withId(UUID.randomUUID().toString()) - .withSource(URI.create(event.getSource())) - .withType(event.getType()) - .withTime(ZonedDateTime.now().toOffsetDateTime()) - .withData(gptData.toString().getBytes()) - .withSubject(event.getSubject()) - .withDataContentType(event.getDataContentType()) - .build(); + return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create(event.getSource())).withType(event.getType()) + .withTime(ZonedDateTime.now().toOffsetDateTime()).withData(gptData.toString().getBytes()).withSubject(event.getSubject()) + .withDataContentType(event.getDataContentType()).build(); } - /** - * use proxy: - * ObjectMapper mapper = defaultObjectMapper(); - * Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 7890)); - * OkHttpClient client = - * OpenAiService.defaultClient(sourceConfig.connectorConfig.getOpenaiToken(), Duration.ofSeconds(60L)).newBuilder().proxy(proxy).build(); - * Retrofit retrofit = defaultRetrofit(client, mapper); - * OpenAiApi api = retrofit.create(OpenAiApi.class); - * this.openAiService = new OpenAiService(api); - */ @SuppressWarnings("checkstyle:WhitespaceAround") private void doInit() { - - this.openAiService = new OpenAiService(sourceConfig.connectorConfig.getOpenaiToken(), Duration.ofSeconds(60L)); + initOpenAi(); this.queue = new LinkedBlockingQueue<>(1024); - this.consumeExecutorService = ThreadPoolFactory.createThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 2, - Runtime.getRuntime().availableProcessors() * 2, - "ChatGPTSourceThread"); final Vertx vertx = Vertx.vertx(); final Router router = Router.router(vertx); - router.route() - .path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST) - .handler(BodyHandler.create()) - .handler(ctx -> { - try { - RequestBody body = ctx.body(); - ChatGPTBody bodyObject = body.asPojo(ChatGPTBody.class); - if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getPrompt() == null) { - throw new IllegalStateException("Attributes 'subject', 'datacontenttype', and 'prompt' cannot be null"); - } - consumeExecutorService.execute(() -> { - try { - CloudEvent cloudEvent = genGptConnectRecord(bodyObject); - queue.add(cloudEvent); - log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent."); - ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end(); - } catch (Exception e) { - log.error("[ChatGPTSourceConnector]Error processing request: {}", e.getMessage(), e); - ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end(); - } - }); - } catch (Exception e) { - log.error("[ChatGPTSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), e); - ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end(); + router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx -> { + try { + RequestBody body = ctx.body(); + ChatGPTRequestDTO bodyObject = body.asPojo(ChatGPTRequestDTO.class); + if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getPrompt() == null) { + throw new IllegalStateException("Attributes 'subject', 'datacontenttype', and 'prompt' cannot be null"); } - }); + chatgptSourceExecutorService.execute(() -> { + try { + CloudEvent cloudEvent = genGptConnectRecord(bodyObject); + queue.add(cloudEvent); + log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent."); + ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end(); + } catch (Exception e) { + log.error("[ChatGPTSourceConnector] Error processing request: {}", e.getMessage(), e); + ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end(); + } + }); + } catch (Exception e) { + log.error("[ChatGPTSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), e); + ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end(); + } + }); this.server = vertx.createHttpServer(new HttpServerOptions().setPort(this.sourceConfig.connectorConfig.getPort()) .setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router); } diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java new file mode 100644 index 0000000000..d99ebf1837 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java @@ -0,0 +1,26 @@ +package org.apache.eventmesh.connector.chatgpt.source.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ChatGPTRequestDTO { + + private String source; + + private String subject; + + @JsonProperty("datacontenttype") + private String dataContentType; + + private String type; + + private String prompt; + +} diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt new file mode 100644 index 0000000000..69a8852fb6 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt @@ -0,0 +1,21 @@ +You are an AI assistant named CloudEventsConverter. Your task is to convert input text provided by the user into a CloudEvents-formatted JSON object, avoid escape characters . + +For the following text, extract the following information: + +Create a CloudEvents-formatted JSON object with the following fields: +- specversion: Set to "1.0" (the current CloudEvents specification version) +- type: Set to \\\ {type} \\\ +- source: Set to \\\ {source} \\\ +- id: Set to \\\ {id} \\\ (Generate a unique identifier for the event (e.g., "A234-1234-1234")) +- time: Set to \\\ {time} \\\ (the current timestamp in ISO 8601 format. e.g:"2023-03-25T12:34:56.789Z") +- datacontenttype: Set to \\\ {datacontenttype} \\\ +- data: Set to the input text provided by the user +\\\ +{fields} +\\\ + +text: \\\ {text} \\\ + +If any of the fields marked as \\\ {} \\\ are null or empty, use a default value. + +Return the CloudEvents-formatted JSON object to the user,The format of the data field matches the datacontenttype,Just need to return the JSON object, nothing else needs to be returned。 diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml index 0241f66ec6..23c70bd32b 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml @@ -16,18 +16,37 @@ # pubSubConfig: - meshAddress: 127.0.0.1:10000 - subject: TopicTest - idc: FT - env: PRD - group: chatgptSource - appId: 5032 - userName: chatgptSourceUser - passWord: chatgptPassWord + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: chatgptSource + appId: 5032 + userName: chatgptSourceUser + passWord: chatgptPassWord connectorConfig: - connectorName: chatgptSource - path: /chatgpt - port: 3756 - idleTimeout: 999 - openaiToken: - openaiModel: gpt-3.5-turbo + connectorName: chatgptSource + path: /chatgpt + port: 3756 + idleTimeout: 999 + proxyEnable: true + +# https://platform.openai.com/docs/api-reference/chat/create +openaiConfig: + token: + model: gpt-3.5-turbo + timeout: 60 + temperature: 1 + maxTokens: + frequencyPenalty: 0 + presencePenalty: 0 + user: eventMesh + stop: [] + logitBias: { + + } + +openaiProxyConfig: + host: 127.0.0.1 + port: 7890 + diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java index 40a71bcc4d..656e870357 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java @@ -71,7 +71,6 @@ void testPoll() throws Exception { List res = connector.poll(); Assertions.assertEquals(batchSize, res.size()); - // test invalid requests HttpPost invalidPost = new HttpPost(uri); TestEvent event = new TestEvent(); diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml index 0241f66ec6..a3c0187515 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml @@ -29,5 +29,23 @@ connectorConfig: path: /chatgpt port: 3756 idleTimeout: 999 - openaiToken: - openaiModel: gpt-3.5-turbo + proxyEnable: true + +# https://platform.openai.com/docs/api-reference/chat/create +openaiConfig: + token: + model: gpt-3.5-turbo + timeout: 60 + temperature: 1 + maxTokens: + frequencyPenalty: 0 + presencePenalty: 0 + user: eventMesh + stop: [] + logitBias: { + + } + +openaiProxyConfig: + host: 127.0.0.1 + port: 7890