Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡ (openai) Stream chat completion to avoid serverless timeout #526

Merged
merged 1 commit into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions apps/builder/public/templates/basic-chat-gpt.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@
"task": "Create chat completion",
"model": "gpt-3.5-turbo",
"messages": [
{
"id": "fxg16pnlnwuhfpz1r51xslbd",
"role": "system",
"content": "You are ChatGPT, a large language model trained by OpenAI."
},
{
"id": "vexqydoltfc5fkdrcednlvjz",
"role": "Messages sequence ✨",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ test.describe.parallel('Google sheets integration', () => {
.press('Enter')
await expect(
page.locator('typebot-standard').locator('text=Your name is:')
).toHaveText(`Your name is: Georges2 Smith2`)
).toHaveText(`Your name is: Georges2 Last name`)
})
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export const createCredentials = authenticatedProcedure
if (!workspace)
throw new TRPCError({ code: 'NOT_FOUND', message: 'Workspace not found' })

const { encryptedData, iv } = encrypt(credentials.data)
const { encryptedData, iv } = await encrypt(credentials.data)
const createdCredentials = await prisma.credentials.create({
data: {
...credentials,
Expand Down
2 changes: 1 addition & 1 deletion apps/builder/src/features/editor/editor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ test('Rename and icon change should work', async ({ page }) => {
])

await page.goto(`/typebots/${typebotId}/edit`)

await page.click('[data-testid="editable-icon"]')
await page.getByRole('button', { name: 'Emoji' }).click()
await expect(page.locator('text="My awesome typebot"')).toBeVisible()
await page.fill('input[placeholder="Search..."]', 'love')
await page.click('text="😍"')
Expand Down
5 changes: 1 addition & 4 deletions apps/builder/src/features/settings/settings.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ test.describe.parallel('Settings page', () => {
await page.click('text="Typebot.io branding"')
await expect(page.locator('a:has-text("Made with Typebot")')).toBeHidden()

await page.click('text="Remember session"')
await expect(
page.locator('input[type="checkbox"] >> nth=-3')
).toHaveAttribute('checked', '')
await page.click('text="Remember user"')

await expect(page.getByPlaceholder('Type your answer...')).toHaveValue(
'Baptiste'
Expand Down
5 changes: 3 additions & 2 deletions apps/builder/src/features/workspace/workspaces.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ test('can update workspace info', async ({ page }) => {
await page.click('text=Settings & Members')
await page.click('text="Settings"')
await page.click('[data-testid="editable-icon"]')
await page.getByRole('button', { name: 'Emoji' }).click()
await page.fill('input[placeholder="Search..."]', 'building')
await page.click('text="🏦"')
await page.waitForTimeout(500)
Expand All @@ -92,13 +93,13 @@ test('can manage members', async ({ page }) => {
page.getByRole('heading', { name: 'Members (1/5)' })
).toBeVisible()
await expect(page.locator('text="user@email.com"').nth(1)).toBeVisible()
await expect(page.locator('button >> text="Invite"')).toBeEnabled()
await expect(page.locator('button >> text="Invite"')).toBeDisabled()
await page.fill(
'input[placeholder="colleague@company.com"]',
'guest@email.com'
)
await page.click('button >> text="Invite"')
await expect(page.locator('button >> text="Invite"')).toBeEnabled()
await expect(page.locator('button >> text="Invite"')).toBeVisible()
await expect(
page.locator('input[placeholder="colleague@company.com"]')
).toHaveAttribute('value', '')
Expand Down
6 changes: 3 additions & 3 deletions apps/builder/src/lib/googleSheets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ export const getAuthenticatedGoogleClient = async (
where: { id: credentialsId, workspace: { members: { some: { userId } } } },
})) as CredentialsFromDb | undefined
if (!credentials) return
const data = decrypt(
const data = (await decrypt(
credentials.data,
credentials.iv
) as GoogleSheetsCredentials['data']
)) as GoogleSheetsCredentials['data']

oauth2Client.setCredentials(data)
oauth2Client.on('tokens', updateTokens(credentials.id, data))
Expand All @@ -47,7 +47,7 @@ const updateTokens =
expiry_date: credentials.expiry_date,
access_token: credentials.access_token,
}
const { encryptedData, iv } = encrypt(newCredentials)
const { encryptedData, iv } = await encrypt(newCredentials)
await prisma.credentials.update({
where: { id: credentialsId },
data: { data: encryptedData, iv },
Expand Down
2 changes: 1 addition & 1 deletion apps/builder/src/pages/api/credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const handler = async (req: NextApiRequest, res: NextApiResponse) => {
const data = (
typeof req.body === 'string' ? JSON.parse(req.body) : req.body
) as Credentials
const { encryptedData, iv } = encrypt(data.data)
const { encryptedData, iv } = await encrypt(data.data)
const workspace = await prisma.workspace.findFirst({
where: { id: workspaceId, members: { some: { userId: user.id } } },
select: { id: true },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const handler = async (req: NextApiRequest, res: NextApiResponse) => {
return res
.status(400)
.send({ message: "User didn't accepted required scopes" })
const { encryptedData, iv } = encrypt(tokens)
const { encryptedData, iv } = await encrypt(tokens)
const credentials = {
name: email,
type: 'google sheets',
Expand Down
2 changes: 2 additions & 0 deletions apps/viewer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
},
"dependencies": {
"@dqbd/tiktoken": "^1.0.7",
"@planetscale/database": "^1.7.0",
"@sentry/nextjs": "7.50.0",
"@trpc/server": "10.23.0",
"@typebot.io/js": "workspace:*",
Expand All @@ -22,6 +23,7 @@
"aws-sdk": "2.1369.0",
"bot-engine": "workspace:*",
"cors": "2.8.5",
"eventsource-parser": "^1.0.0",
"google-spreadsheet": "3.3.0",
"got": "12.6.0",
"libphonenumber-js": "1.10.28",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ const getStripeInfo = async (
where: { id: credentialsId },
})
if (!credentials) return
return decrypt(credentials.data, credentials.iv) as StripeCredentials['data']
return (await decrypt(
credentials.data,
credentials.iv
)) as StripeCredentials['data']
}

// https://stripe.com/docs/currencies#zero-decimal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,32 @@ import {
ChatReply,
SessionState,
Variable,
VariableWithUnknowValue,
VariableWithValue,
} from '@typebot.io/schemas'
import {
ChatCompletionOpenAIOptions,
OpenAICredentials,
modelLimit,
} from '@typebot.io/schemas/features/blocks/integrations/openai'
import { OpenAIApi, Configuration, ChatCompletionRequestMessage } from 'openai'
import { isDefined, byId, isNotEmpty, isEmpty } from '@typebot.io/lib'
import { decrypt } from '@typebot.io/lib/api/encryption'
import type {
ChatCompletionRequestMessage,
CreateChatCompletionRequest,
CreateChatCompletionResponse,
} from 'openai'
import { byId, isNotEmpty, isEmpty } from '@typebot.io/lib'
import { decrypt, isCredentialsV2 } from '@typebot.io/lib/api/encryption'
import { saveErrorLog } from '@/features/logs/saveErrorLog'
import { updateVariables } from '@/features/variables/updateVariables'
import { parseVariables } from '@/features/variables/parseVariables'
import { saveSuccessLog } from '@/features/logs/saveSuccessLog'
import { parseVariableNumber } from '@/features/variables/parseVariableNumber'
import { encoding_for_model } from '@dqbd/tiktoken'
import got from 'got'
import { resumeChatCompletion } from './resumeChatCompletion'
import { isPlaneteScale } from '@/helpers/api/isPlanetScale'
import { isVercel } from '@/helpers/api/isVercel'

const minTokenCompletion = 200
const createChatEndpoint = 'https://api.openai.com/v1/chat/completions'

export const createChatCompletionOpenAI = async (
state: SessionState,
Expand Down Expand Up @@ -52,13 +59,10 @@ export const createChatCompletionOpenAI = async (
console.error('Could not find credentials in database')
return { outgoingEdgeId, logs: [noCredentialsError] }
}
const { apiKey } = decrypt(
const { apiKey } = (await decrypt(
credentials.data,
credentials.iv
) as OpenAICredentials['data']
const configuration = new Configuration({
apiKey,
})
)) as OpenAICredentials['data']
const { variablesTransformedToList, messages } = parseMessages(
newSessionState.typebot.variables,
options.model
Expand All @@ -71,52 +75,39 @@ export const createChatCompletionOpenAI = async (
)

try {
const openai = new OpenAIApi(configuration)
const response = await openai.createChatCompletion({
model: options.model,
messages,
temperature,
})
const messageContent = response.data.choices.at(0)?.message?.content
const totalTokens = response.data.usage?.total_tokens
if (
isPlaneteScale() &&
isVercel() &&
isCredentialsV2(credentials) &&
newSessionState.isStreamEnabled
)
return {
clientSideActions: [{ streamOpenAiChatCompletion: { messages } }],
outgoingEdgeId,
newSessionState,
}
const response = await got
.post(createChatEndpoint, {
headers: {
Authorization: `Bearer ${apiKey}`,
},
json: {
model: options.model,
messages,
temperature,
} satisfies CreateChatCompletionRequest,
})
.json<CreateChatCompletionResponse>()
const messageContent = response.choices.at(0)?.message?.content
const totalTokens = response.usage?.total_tokens
if (isEmpty(messageContent)) {
console.error('OpenAI block returned empty message', response)
return { outgoingEdgeId, newSessionState }
}
const newVariables = options.responseMapping.reduce<
VariableWithUnknowValue[]
>((newVariables, mapping) => {
const existingVariable = newSessionState.typebot.variables.find(
byId(mapping.variableId)
)
if (!existingVariable) return newVariables
if (mapping.valueToExtract === 'Message content') {
newVariables.push({
...existingVariable,
value: Array.isArray(existingVariable.value)
? existingVariable.value.concat(messageContent)
: messageContent,
})
}
if (mapping.valueToExtract === 'Total tokens' && isDefined(totalTokens)) {
newVariables.push({
...existingVariable,
value: totalTokens,
})
}
return newVariables
}, [])
if (newVariables.length > 0)
newSessionState = await updateVariables(newSessionState)(newVariables)
state.result &&
(await saveSuccessLog({
resultId: state.result.id,
message: 'OpenAI block successfully executed',
}))
return {
return resumeChatCompletion(newSessionState, {
options,
outgoingEdgeId,
newSessionState,
}
})(messageContent, totalTokens)
} catch (err) {
const log: NonNullable<ChatReply['logs']>[number] = {
status: 'error',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { parseVariableNumber } from '@/features/variables/parseVariableNumber'
import { Connection } from '@planetscale/database'
import { decrypt } from '@typebot.io/lib/api/encryption'
import {
ChatCompletionOpenAIOptions,
OpenAICredentials,
} from '@typebot.io/schemas/features/blocks/integrations/openai'
import { SessionState } from '@typebot.io/schemas/features/chat'
import {
ParsedEvent,
ReconnectInterval,
createParser,
} from 'eventsource-parser'
import type {
ChatCompletionRequestMessage,
CreateChatCompletionRequest,
} from 'openai'

export const getChatCompletionStream =
(conn: Connection) =>
async (
state: SessionState,
options: ChatCompletionOpenAIOptions,
messages: ChatCompletionRequestMessage[]
) => {
if (!options.credentialsId) return
const credentials = (
await conn.execute('select data, iv from Credentials where id=?', [
options.credentialsId,
])
).rows.at(0) as { data: string; iv: string } | undefined
if (!credentials) {
console.error('Could not find credentials in database')
return
}
const { apiKey } = (await decrypt(
credentials.data,
credentials.iv
)) as OpenAICredentials['data']

const temperature = parseVariableNumber(state.typebot.variables)(
options.advancedSettings?.temperature
)

const encoder = new TextEncoder()
const decoder = new TextDecoder()

let counter = 0

const res = await fetch('https://api.openai.com/v1/chat/completions', {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${apiKey}`,
},
method: 'POST',
body: JSON.stringify({
messages,
model: options.model,
temperature,
stream: true,
} satisfies CreateChatCompletionRequest),
})

const stream = new ReadableStream({
async start(controller) {
function onParse(event: ParsedEvent | ReconnectInterval) {
if (event.type === 'event') {
const data = event.data
if (data === '[DONE]') {
controller.close()
return
}
try {
const json = JSON.parse(data) as {
choices: { delta: { content: string } }[]
}
const text = json.choices.at(0)?.delta.content
if (counter < 2 && (text?.match(/\n/) || []).length) {
return
}
const queue = encoder.encode(text)
controller.enqueue(queue)
counter++
} catch (e) {
controller.error(e)
}
}
}

// stream response (SSE) from OpenAI may be fragmented into multiple chunks
// this ensures we properly read chunks & invoke an event for each SSE event stream
const parser = createParser(onParse)

// https://web.dev/streams/#asynchronous-iteration
// eslint-disable-next-line @typescript-eslint/no-explicit-any
for await (const chunk of res.body as any) {
parser.feed(decoder.decode(chunk))
}
},
})

return stream
}
Loading