Skip to content

Commit

Permalink
⚡ (openai) Stream chat completion to avoid serverless timeout
Browse files Browse the repository at this point in the history
Closes #520
  • Loading branch information
baptisteArno committed May 25, 2023
1 parent 6bb6a2b commit 636f40f
Show file tree
Hide file tree
Showing 39 changed files with 555 additions and 121 deletions.
5 changes: 0 additions & 5 deletions apps/builder/public/templates/basic-chat-gpt.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@
"task": "Create chat completion",
"model": "gpt-3.5-turbo",
"messages": [
{
"id": "fxg16pnlnwuhfpz1r51xslbd",
"role": "system",
"content": "You are ChatGPT, a large language model trained by OpenAI."
},
{
"id": "vexqydoltfc5fkdrcednlvjz",
"role": "Messages sequence ✨",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ test.describe.parallel('Google sheets integration', () => {
.press('Enter')
await expect(
page.locator('typebot-standard').locator('text=Your name is:')
).toHaveText(`Your name is: Georges2 Smith2`)
).toHaveText(`Your name is: Georges2 Last name`)
})
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export const createCredentials = authenticatedProcedure
if (!workspace)
throw new TRPCError({ code: 'NOT_FOUND', message: 'Workspace not found' })

const { encryptedData, iv } = encrypt(credentials.data)
const { encryptedData, iv } = await encrypt(credentials.data)
const createdCredentials = await prisma.credentials.create({
data: {
...credentials,
Expand Down
2 changes: 1 addition & 1 deletion apps/builder/src/features/editor/editor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ test('Rename and icon change should work', async ({ page }) => {
])

await page.goto(`/typebots/${typebotId}/edit`)

await page.click('[data-testid="editable-icon"]')
await page.getByRole('button', { name: 'Emoji' }).click()
await expect(page.locator('text="My awesome typebot"')).toBeVisible()
await page.fill('input[placeholder="Search..."]', 'love')
await page.click('text="😍"')
Expand Down
5 changes: 1 addition & 4 deletions apps/builder/src/features/settings/settings.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ test.describe.parallel('Settings page', () => {
await page.click('text="Typebot.io branding"')
await expect(page.locator('a:has-text("Made with Typebot")')).toBeHidden()

await page.click('text="Remember session"')
await expect(
page.locator('input[type="checkbox"] >> nth=-3')
).toHaveAttribute('checked', '')
await page.click('text="Remember user"')

await expect(page.getByPlaceholder('Type your answer...')).toHaveValue(
'Baptiste'
Expand Down
5 changes: 3 additions & 2 deletions apps/builder/src/features/workspace/workspaces.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ test('can update workspace info', async ({ page }) => {
await page.click('text=Settings & Members')
await page.click('text="Settings"')
await page.click('[data-testid="editable-icon"]')
await page.getByRole('button', { name: 'Emoji' }).click()
await page.fill('input[placeholder="Search..."]', 'building')
await page.click('text="🏦"')
await page.waitForTimeout(500)
Expand All @@ -92,13 +93,13 @@ test('can manage members', async ({ page }) => {
page.getByRole('heading', { name: 'Members (1/5)' })
).toBeVisible()
await expect(page.locator('text="user@email.com"').nth(1)).toBeVisible()
await expect(page.locator('button >> text="Invite"')).toBeEnabled()
await expect(page.locator('button >> text="Invite"')).toBeDisabled()
await page.fill(
'input[placeholder="colleague@company.com"]',
'guest@email.com'
)
await page.click('button >> text="Invite"')
await expect(page.locator('button >> text="Invite"')).toBeEnabled()
await expect(page.locator('button >> text="Invite"')).toBeVisible()
await expect(
page.locator('input[placeholder="colleague@company.com"]')
).toHaveAttribute('value', '')
Expand Down
6 changes: 3 additions & 3 deletions apps/builder/src/lib/googleSheets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ export const getAuthenticatedGoogleClient = async (
where: { id: credentialsId, workspace: { members: { some: { userId } } } },
})) as CredentialsFromDb | undefined
if (!credentials) return
const data = decrypt(
const data = (await decrypt(
credentials.data,
credentials.iv
) as GoogleSheetsCredentials['data']
)) as GoogleSheetsCredentials['data']

oauth2Client.setCredentials(data)
oauth2Client.on('tokens', updateTokens(credentials.id, data))
Expand All @@ -47,7 +47,7 @@ const updateTokens =
expiry_date: credentials.expiry_date,
access_token: credentials.access_token,
}
const { encryptedData, iv } = encrypt(newCredentials)
const { encryptedData, iv } = await encrypt(newCredentials)
await prisma.credentials.update({
where: { id: credentialsId },
data: { data: encryptedData, iv },
Expand Down
2 changes: 1 addition & 1 deletion apps/builder/src/pages/api/credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const handler = async (req: NextApiRequest, res: NextApiResponse) => {
const data = (
typeof req.body === 'string' ? JSON.parse(req.body) : req.body
) as Credentials
const { encryptedData, iv } = encrypt(data.data)
const { encryptedData, iv } = await encrypt(data.data)
const workspace = await prisma.workspace.findFirst({
where: { id: workspaceId, members: { some: { userId: user.id } } },
select: { id: true },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const handler = async (req: NextApiRequest, res: NextApiResponse) => {
return res
.status(400)
.send({ message: "User didn't accepted required scopes" })
const { encryptedData, iv } = encrypt(tokens)
const { encryptedData, iv } = await encrypt(tokens)
const credentials = {
name: email,
type: 'google sheets',
Expand Down
2 changes: 2 additions & 0 deletions apps/viewer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
},
"dependencies": {
"@dqbd/tiktoken": "^1.0.7",
"@planetscale/database": "^1.7.0",
"@sentry/nextjs": "7.50.0",
"@trpc/server": "10.23.0",
"@typebot.io/js": "workspace:*",
Expand All @@ -22,6 +23,7 @@
"aws-sdk": "2.1369.0",
"bot-engine": "workspace:*",
"cors": "2.8.5",
"eventsource-parser": "^1.0.0",
"google-spreadsheet": "3.3.0",
"got": "12.6.0",
"libphonenumber-js": "1.10.28",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ const getStripeInfo = async (
where: { id: credentialsId },
})
if (!credentials) return
return decrypt(credentials.data, credentials.iv) as StripeCredentials['data']
return (await decrypt(
credentials.data,
credentials.iv
)) as StripeCredentials['data']
}

// https://stripe.com/docs/currencies#zero-decimal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,32 @@ import {
ChatReply,
SessionState,
Variable,
VariableWithUnknowValue,
VariableWithValue,
} from '@typebot.io/schemas'
import {
ChatCompletionOpenAIOptions,
OpenAICredentials,
modelLimit,
} from '@typebot.io/schemas/features/blocks/integrations/openai'
import { OpenAIApi, Configuration, ChatCompletionRequestMessage } from 'openai'
import { isDefined, byId, isNotEmpty, isEmpty } from '@typebot.io/lib'
import { decrypt } from '@typebot.io/lib/api/encryption'
import type {
ChatCompletionRequestMessage,
CreateChatCompletionRequest,
CreateChatCompletionResponse,
} from 'openai'
import { byId, isNotEmpty, isEmpty } from '@typebot.io/lib'
import { decrypt, isCredentialsV2 } from '@typebot.io/lib/api/encryption'
import { saveErrorLog } from '@/features/logs/saveErrorLog'
import { updateVariables } from '@/features/variables/updateVariables'
import { parseVariables } from '@/features/variables/parseVariables'
import { saveSuccessLog } from '@/features/logs/saveSuccessLog'
import { parseVariableNumber } from '@/features/variables/parseVariableNumber'
import { encoding_for_model } from '@dqbd/tiktoken'
import got from 'got'
import { resumeChatCompletion } from './resumeChatCompletion'
import { isPlaneteScale } from '@/helpers/api/isPlanetScale'
import { isVercel } from '@/helpers/api/isVercel'

const minTokenCompletion = 200
const createChatEndpoint = 'https://api.openai.com/v1/chat/completions'

export const createChatCompletionOpenAI = async (
state: SessionState,
Expand Down Expand Up @@ -52,13 +59,10 @@ export const createChatCompletionOpenAI = async (
console.error('Could not find credentials in database')
return { outgoingEdgeId, logs: [noCredentialsError] }
}
const { apiKey } = decrypt(
const { apiKey } = (await decrypt(
credentials.data,
credentials.iv
) as OpenAICredentials['data']
const configuration = new Configuration({
apiKey,
})
)) as OpenAICredentials['data']
const { variablesTransformedToList, messages } = parseMessages(
newSessionState.typebot.variables,
options.model
Expand All @@ -71,52 +75,39 @@ export const createChatCompletionOpenAI = async (
)

try {
const openai = new OpenAIApi(configuration)
const response = await openai.createChatCompletion({
model: options.model,
messages,
temperature,
})
const messageContent = response.data.choices.at(0)?.message?.content
const totalTokens = response.data.usage?.total_tokens
if (
isPlaneteScale() &&
isVercel() &&
isCredentialsV2(credentials) &&
newSessionState.isStreamEnabled
)
return {
clientSideActions: [{ streamOpenAiChatCompletion: { messages } }],
outgoingEdgeId,
newSessionState,
}
const response = await got
.post(createChatEndpoint, {
headers: {
Authorization: `Bearer ${apiKey}`,
},
json: {
model: options.model,
messages,
temperature,
} satisfies CreateChatCompletionRequest,
})
.json<CreateChatCompletionResponse>()
const messageContent = response.choices.at(0)?.message?.content
const totalTokens = response.usage?.total_tokens
if (isEmpty(messageContent)) {
console.error('OpenAI block returned empty message', response)
return { outgoingEdgeId, newSessionState }
}
const newVariables = options.responseMapping.reduce<
VariableWithUnknowValue[]
>((newVariables, mapping) => {
const existingVariable = newSessionState.typebot.variables.find(
byId(mapping.variableId)
)
if (!existingVariable) return newVariables
if (mapping.valueToExtract === 'Message content') {
newVariables.push({
...existingVariable,
value: Array.isArray(existingVariable.value)
? existingVariable.value.concat(messageContent)
: messageContent,
})
}
if (mapping.valueToExtract === 'Total tokens' && isDefined(totalTokens)) {
newVariables.push({
...existingVariable,
value: totalTokens,
})
}
return newVariables
}, [])
if (newVariables.length > 0)
newSessionState = await updateVariables(newSessionState)(newVariables)
state.result &&
(await saveSuccessLog({
resultId: state.result.id,
message: 'OpenAI block successfully executed',
}))
return {
return resumeChatCompletion(newSessionState, {
options,
outgoingEdgeId,
newSessionState,
}
})(messageContent, totalTokens)
} catch (err) {
const log: NonNullable<ChatReply['logs']>[number] = {
status: 'error',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { parseVariableNumber } from '@/features/variables/parseVariableNumber'
import { Connection } from '@planetscale/database'
import { decrypt } from '@typebot.io/lib/api/encryption'
import {
ChatCompletionOpenAIOptions,
OpenAICredentials,
} from '@typebot.io/schemas/features/blocks/integrations/openai'
import { SessionState } from '@typebot.io/schemas/features/chat'
import {
ParsedEvent,
ReconnectInterval,
createParser,
} from 'eventsource-parser'
import type {
ChatCompletionRequestMessage,
CreateChatCompletionRequest,
} from 'openai'

export const getChatCompletionStream =
(conn: Connection) =>
async (
state: SessionState,
options: ChatCompletionOpenAIOptions,
messages: ChatCompletionRequestMessage[]
) => {
if (!options.credentialsId) return
const credentials = (
await conn.execute('select data, iv from Credentials where id=?', [
options.credentialsId,
])
).rows.at(0) as { data: string; iv: string } | undefined
if (!credentials) {
console.error('Could not find credentials in database')
return
}
const { apiKey } = (await decrypt(
credentials.data,
credentials.iv
)) as OpenAICredentials['data']

const temperature = parseVariableNumber(state.typebot.variables)(
options.advancedSettings?.temperature
)

const encoder = new TextEncoder()
const decoder = new TextDecoder()

let counter = 0

const res = await fetch('https://api.openai.com/v1/chat/completions', {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${apiKey}`,
},
method: 'POST',
body: JSON.stringify({
messages,
model: options.model,
temperature,
stream: true,
} satisfies CreateChatCompletionRequest),
})

const stream = new ReadableStream({
async start(controller) {
function onParse(event: ParsedEvent | ReconnectInterval) {
if (event.type === 'event') {
const data = event.data
if (data === '[DONE]') {
controller.close()
return
}
try {
const json = JSON.parse(data) as {
choices: { delta: { content: string } }[]
}
const text = json.choices.at(0)?.delta.content
if (counter < 2 && (text?.match(/\n/) || []).length) {
return
}
const queue = encoder.encode(text)
controller.enqueue(queue)
counter++
} catch (e) {
controller.error(e)
}
}
}

// stream response (SSE) from OpenAI may be fragmented into multiple chunks
// this ensures we properly read chunks & invoke an event for each SSE event stream
const parser = createParser(onParse)

// https://web.dev/streams/#asynchronous-iteration
// eslint-disable-next-line @typescript-eslint/no-explicit-any
for await (const chunk of res.body as any) {
parser.feed(decoder.decode(chunk))
}
},
})

return stream
}
Loading

0 comments on commit 636f40f

Please sign in to comment.