From 21b2c857bbfa6bddd4417e40fb76ed98ddbaa845 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Mon, 2 Dec 2024 15:52:03 -0500 Subject: [PATCH 1/9] wip --- lib/session/call-session.js | 6 +++++ lib/tasks/say.js | 53 ++++++++++++++++++++++++++++--------- lib/tasks/tts-task.js | 22 ++++++++++++--- package-lock.json | 29 ++++++++++---------- package.json | 4 +-- 5 files changed, 80 insertions(+), 34 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 1d939a2d..b2dab2e0 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -576,6 +576,12 @@ class CallSession extends Emitter { } } + get appIsUsingWebsockets() { + return this.requestor instanceof WsRequestor; + } + + /* end of getters and setters */ + async clearOrRestoreActionHookDelayProcessor() { if (this._actionHookDelayProcessor) { await this._actionHookDelayProcessor.stop(); diff --git a/lib/tasks/say.js b/lib/tasks/say.js index f1d3ad3b..0c4a9628 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -1,3 +1,4 @@ +const assert = require('assert'); const TtsTask = require('./tts-task'); const {TaskName, TaskPreconditions} = require('../utils/constants'); const pollySSMLSplit = require('polly-ssml-split'); @@ -35,24 +36,40 @@ class TaskSay extends TtsTask { super(logger, opts, parentTask); this.preconditions = TaskPreconditions.Endpoint; - this.text = (Array.isArray(this.data.text) ? this.data.text : [this.data.text]) - .map((t) => breakLengthyTextIfNeeded(this.logger, t)) - .flat(); + assert.ok((typeof this.data.text === 'string' || Array.isArray(this.data.text)) || this.data.stream === true, + 'Say: either text or stream:true is required'); - this.loop = this.data.loop || 1; - this.isHandledByPrimaryProvider = true; + + if (this.data.stream === true) { + this._isStreaming = true; + this.closeOnStreamEmpty = this.data.closeOnStreamEmpty !== false; + } + else { + this._isStreaming = false; + this.text = (Array.isArray(this.data.text) ? this.data.text : [this.data.text]) + .map((t) => breakLengthyTextIfNeeded(this.logger, t)) + .flat(); + + this.loop = this.data.loop || 1; + this.isHandledByPrimaryProvider = true; + } } get name() { return TaskName.Say; } get summary() { - for (let i = 0; i < this.text.length; i++) { - if (this.text[i].startsWith('silence_stream')) continue; - return `${this.name}{text=${this.text[i].slice(0, 15)}${this.text[i].length > 15 ? '...' : ''}}`; + if (this.isStreaming) return `${this.name} streaming`; + else { + for (let i = 0; i < this.text.length; i++) { + if (this.text[i].startsWith('silence_stream')) continue; + return `${this.name}{text=${this.text[i].slice(0, 15)}${this.text[i].length > 15 ? '...' : ''}}`; + } + return `${this.name}{${this.text[0]}}`; } - return `${this.name}{${this.text[0]}}`; } + get isStreaming() { return this._isStreaming; } + _validateURL(urlString) { try { new URL(urlString); @@ -63,14 +80,19 @@ class TaskSay extends TtsTask { } async exec(cs, obj) { + if (this.isStreaming && !cs.appIsUsingWebsockets) { + throw new Error('Say: streaming say verb requires applications to use the websocket API'); + } + try { - await this.handling(cs, obj); + if (this.isStreaming) await this.handlingStreaming(cs, obj); + else await this.handling(cs, obj); this.emit('playDone'); } catch (error) { if (error instanceof SpeechCredentialError) { // if say failed due to speech credentials, alarm is writtern and error notification is sent // finished this say to move to next task. - this.logger.info('Say failed due to SpeechCredentialError, finished!'); + this.logger.info({error}, 'Say failed due to SpeechCredentialError, finished!'); this.emit('playDone'); return; } @@ -78,6 +100,11 @@ class TaskSay extends TtsTask { } } + async handlingStreaming(cs, {ep}) { + const {vendor, language, voice, label} = this.getTtsVendorData(cs); + + } + async handling(cs, {ep}) { const {srf, accountSid:account_sid, callSid:target_sid} = cs; const {writeAlerts, AlertType} = srf.locals; @@ -96,7 +123,7 @@ class TaskSay extends TtsTask { let voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ? this.synthesizer.voice : cs.speechSynthesisVoice; - let label = this.taskInlcudeSynthesizer ? this.synthesizer.label : cs.speechSynthesisLabel; + let label = this.taskIncludeSynthesizer ? this.synthesizer.label : cs.speechSynthesisLabel; const fallbackVendor = this.synthesizer.fallbackVendor && this.synthesizer.fallbackVendor !== 'default' ? this.synthesizer.fallbackVendor : @@ -107,7 +134,7 @@ class TaskSay extends TtsTask { const fallbackVoice = this.synthesizer.fallbackVoice && this.synthesizer.fallbackVoice !== 'default' ? this.synthesizer.fallbackVoice : cs.fallbackSpeechSynthesisVoice; - const fallbackLabel = this.taskInlcudeSynthesizer ? + const fallbackLabel = this.taskIncludeSynthesizer ? this.synthesizer.fallbackLabel : cs.fallbackSpeechSynthesisLabel; if (cs.hasFallbackTts) { diff --git a/lib/tasks/tts-task.js b/lib/tasks/tts-task.js index 65fe96a7..f2285fef 100644 --- a/lib/tasks/tts-task.js +++ b/lib/tasks/tts-task.js @@ -13,11 +13,11 @@ class TtsTask extends Task { this.earlyMedia = this.data.earlyMedia === true || (parentTask && parentTask.earlyMedia); /** - * Task use taskInlcudeSynthesizer to identify - * if taskInlcudeSynthesizer === true, use label from verb.synthesizer, even it's empty - * if taskInlcudeSynthesizer === false, use label from application.synthesizer + * Task use taskIncludeSynthesizer to identify + * if taskIncludeSynthesizer === true, use label from verb.synthesizer, even it's empty + * if taskIncludeSynthesizer === false, use label from application.synthesizer */ - this.taskInlcudeSynthesizer = !!this.data.synthesizer; + this.taskIncludeSynthesizer = !!this.data.synthesizer; this.synthesizer = this.data.synthesizer || {}; this.disableTtsCache = this.data.disableTtsCache; this.options = this.synthesizer.options || {}; @@ -44,6 +44,20 @@ class TtsTask extends Task { } } + getTtsVendorData(cs) { + const vendor = this.synthesizer.vendor && this.synthesizer.vendor !== 'default' ? + this.synthesizer.vendor : + cs.speechSynthesisVendor; + const language = this.synthesizer.language && this.synthesizer.language !== 'default' ? + this.synthesizer.language : + cs.speechSynthesisLanguage ; + const voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ? + this.synthesizer.voice : + cs.speechSynthesisVoice; + const label = this.taskIncludeSynthesizer ? this.synthesizer.label : cs.speechSynthesisLabel; + return {vendor, language, voice, label}; + } + async _synthesizeWithSpecificVendor(cs, ep, {vendor, language, voice, label, preCache = false}) { const {srf, accountSid:account_sid} = cs; const {writeAlerts, AlertType, stats} = srf.locals; diff --git a/package-lock.json b/package-lock.json index 1b561e71..f1ace642 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,8 +17,8 @@ "@jambonz/realtimedb-helpers": "^0.8.8", "@jambonz/speech-utils": "^0.1.22", "@jambonz/stats-collector": "^0.1.10", - "@jambonz/time-series": "^0.2.12", - "@jambonz/verb-specifications": "^0.0.86", + "@jambonz/time-series": "^0.2.13", + "@jambonz/verb-specifications": "^0.0.88", "@opentelemetry/api": "^1.8.0", "@opentelemetry/exporter-jaeger": "^1.23.0", "@opentelemetry/exporter-trace-otlp-http": "^0.50.0", @@ -1572,19 +1572,18 @@ } }, "node_modules/@jambonz/time-series": { - "version": "0.2.12", - "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.12.tgz", - "integrity": "sha512-EYw5f1QasblWrP2K/NabpJYkQm8XOCP1fem8luO8c7Jm8YTBwI+Ge3zB7rpU8ruoVdbrTot/pcihhTqzq4IYqA==", - "license": "MIT", + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.13.tgz", + "integrity": "sha512-Kj+l+YUnI27zZA4qoPRzjN7L82W7GuMXYq9ttDjXQ0ZBIdOLAzJjB6R3jJ3b+mvoNEQ6qG5MUtfoc6CpTFH5lw==", "dependencies": { "debug": "^4.3.1", "influx": "^5.9.3" } }, "node_modules/@jambonz/verb-specifications": { - "version": "0.0.86", - "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.86.tgz", - "integrity": "sha512-nTMMeFJtkSIVD3icQajOKv+zFBiAaUNuky2htVgYKipE2AIq/H/SsdurYdBij3KQq1o58/FuUuXcqCwjnZnoUg==", + "version": "0.0.88", + "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.88.tgz", + "integrity": "sha512-gbJn/sT1DKJspB6EjnEpMWQBi983YYUbE9+9Ycfim6FUeMwD/BDoSNYkQNAxMDX8+s6NqQCMZYGOmb5qY5ZwzQ==", "dependencies": { "debug": "^4.3.4", "pino": "^8.8.0" @@ -10744,18 +10743,18 @@ } }, "@jambonz/time-series": { - "version": "0.2.12", - "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.12.tgz", - "integrity": "sha512-EYw5f1QasblWrP2K/NabpJYkQm8XOCP1fem8luO8c7Jm8YTBwI+Ge3zB7rpU8ruoVdbrTot/pcihhTqzq4IYqA==", + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.13.tgz", + "integrity": "sha512-Kj+l+YUnI27zZA4qoPRzjN7L82W7GuMXYq9ttDjXQ0ZBIdOLAzJjB6R3jJ3b+mvoNEQ6qG5MUtfoc6CpTFH5lw==", "requires": { "debug": "^4.3.1", "influx": "^5.9.3" } }, "@jambonz/verb-specifications": { - "version": "0.0.86", - "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.86.tgz", - "integrity": "sha512-nTMMeFJtkSIVD3icQajOKv+zFBiAaUNuky2htVgYKipE2AIq/H/SsdurYdBij3KQq1o58/FuUuXcqCwjnZnoUg==", + "version": "0.0.88", + "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.88.tgz", + "integrity": "sha512-gbJn/sT1DKJspB6EjnEpMWQBi983YYUbE9+9Ycfim6FUeMwD/BDoSNYkQNAxMDX8+s6NqQCMZYGOmb5qY5ZwzQ==", "requires": { "debug": "^4.3.4", "pino": "^8.8.0" diff --git a/package.json b/package.json index e619b269..7fa08cfb 100644 --- a/package.json +++ b/package.json @@ -33,8 +33,8 @@ "@jambonz/realtimedb-helpers": "^0.8.8", "@jambonz/speech-utils": "^0.1.22", "@jambonz/stats-collector": "^0.1.10", - "@jambonz/time-series": "^0.2.12", - "@jambonz/verb-specifications": "^0.0.86", + "@jambonz/time-series": "^0.2.13", + "@jambonz/verb-specifications": "^0.0.88", "@opentelemetry/api": "^1.8.0", "@opentelemetry/exporter-jaeger": "^1.23.0", "@opentelemetry/exporter-trace-otlp-http": "^0.50.0", From c278188afa860d17894f4b450b003320acdef96a Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Wed, 4 Dec 2024 09:16:24 -0500 Subject: [PATCH 2/9] add TtsStreamingBuffer class to abstract handling of streaming tokens --- lib/session/call-session.js | 20 +++++ lib/tasks/say.js | 20 +++-- lib/tasks/tts-task.js | 2 +- lib/utils/tts-streaming-buffer.js | 139 ++++++++++++++++++++++++++++++ 4 files changed, 172 insertions(+), 9 deletions(-) create mode 100644 lib/utils/tts-streaming-buffer.js diff --git a/lib/session/call-session.js b/lib/session/call-session.js index b2dab2e0..5286ec9b 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -21,6 +21,7 @@ const listTaskNames = require('../utils/summarize-tasks'); const HttpRequestor = require('../utils/http-requestor'); const WsRequestor = require('../utils/ws-requestor'); const ActionHookDelayProcessor = require('../utils/action-hook-delay'); +const TtsStreamingBuffer = require('../utils/tts-streaming-buffer'); const {parseUri} = require('drachtio-srf'); const { JAMBONES_INJECT_CONTENT, @@ -111,6 +112,8 @@ class CallSession extends Emitter { this._pool = srf.locals.dbHelpers.pool; + this.ttsStreamingBuffer = new TtsStreamingBuffer(this); + const handover = (newRequestor) => { this.logger.info(`handover to new base url ${newRequestor.url}`); this.requestor.removeAllListeners(); @@ -1641,6 +1644,13 @@ Duration=${duration} ` .catch((err) => this.logger.error(err, 'CallSession:_lccLlmUpdate')); } + _lccTtsTokens(opts) { + this.ttsStreamingBuffer.bufferTokens(opts); + } + + _lccTtsFlush(opts) { + this.ttsStreamingBuffer.flush(opts); + } /** * perform call hangup by jambonz @@ -2022,6 +2032,14 @@ Duration=${duration} ` this._lccLlmUpdate(data, call_sid); break; + case 'tts:tokens': + this._lccTtsTokens(data); + break; + + case 'tts:flush': + this._lccTtsFlush(data); + break; + default: this.logger.info(`CallSession:_onCommand - invalid command ${command}`); } @@ -2217,6 +2235,8 @@ Duration=${duration} ` // close all background tasks this.backgroundTaskManager.stopAll(); this.clearOrRestoreActionHookDelayProcessor().catch((err) => {}); + + this.ttsStreamingBuffer.kill(); } /** diff --git a/lib/tasks/say.js b/lib/tasks/say.js index 0c4a9628..7b3b9aed 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -41,11 +41,11 @@ class TaskSay extends TtsTask { if (this.data.stream === true) { - this._isStreaming = true; + this._isStreamingTts = true; this.closeOnStreamEmpty = this.data.closeOnStreamEmpty !== false; } else { - this._isStreaming = false; + this._isStreamingTts = false; this.text = (Array.isArray(this.data.text) ? this.data.text : [this.data.text]) .map((t) => breakLengthyTextIfNeeded(this.logger, t)) .flat(); @@ -58,7 +58,7 @@ class TaskSay extends TtsTask { get name() { return TaskName.Say; } get summary() { - if (this.isStreaming) return `${this.name} streaming`; + if (this.isStreamingTts) return `${this.name} streaming`; else { for (let i = 0; i < this.text.length; i++) { if (this.text[i].startsWith('silence_stream')) continue; @@ -68,7 +68,7 @@ class TaskSay extends TtsTask { } } - get isStreaming() { return this._isStreaming; } + get isStreamingTts() { return this._isStreamingTts; } _validateURL(urlString) { try { @@ -80,12 +80,12 @@ class TaskSay extends TtsTask { } async exec(cs, obj) { - if (this.isStreaming && !cs.appIsUsingWebsockets) { + if (this.isStreamingTts && !cs.appIsUsingWebsockets) { throw new Error('Say: streaming say verb requires applications to use the websocket API'); } try { - if (this.isStreaming) await this.handlingStreaming(cs, obj); + if (this.isStreamingTts) await this.handlingStreaming(cs, obj); else await this.handling(cs, obj); this.emit('playDone'); } catch (error) { @@ -101,8 +101,11 @@ class TaskSay extends TtsTask { } async handlingStreaming(cs, {ep}) { - const {vendor, language, voice, label} = this.getTtsVendorData(cs); - + //TODO: set channel variables for tts streaming vendor + + + await this.awaitTaskDone(); + this.logger.info('TaskSay:handlingStreaming - done'); } async handling(cs, {ep}) { @@ -280,6 +283,7 @@ class TaskSay extends TtsTask { this._playResolve = null; } } + this.notifyTaskDone(); } _addStreamingTtsAttributes(span, evt) { diff --git a/lib/tasks/tts-task.js b/lib/tasks/tts-task.js index f2285fef..30fae4e6 100644 --- a/lib/tasks/tts-task.js +++ b/lib/tasks/tts-task.js @@ -45,7 +45,7 @@ class TtsTask extends Task { } getTtsVendorData(cs) { - const vendor = this.synthesizer.vendor && this.synthesizer.vendor !== 'default' ? + const vendor = this.synthesizer.vendor && this.synthesizer.vendor !== 'default' ? this.synthesizer.vendor : cs.speechSynthesisVendor; const language = this.synthesizer.language && this.synthesizer.language !== 'default' ? diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js new file mode 100644 index 00000000..7d3c19b3 --- /dev/null +++ b/lib/utils/tts-streaming-buffer.js @@ -0,0 +1,139 @@ +const Emitter = require('events'); +const assert = require('assert'); +const FEED_INTERVAL = 2000; +const MAX_CHUNK_SIZE = 2000; + +class TtsStreamingBuffer extends Emitter { + constructor(cs) { + super(); + this.cs = cs; + this.logger = cs.logger; + + this.tokens = null; + } + + get isEmpty() { + return this.tokens.length === 0; + } + + get size() { + return this.tokens.length; + } + + get ep() { + return this.cs?.ep; + } + + /** + * Add tokens to the buffer and start feeding them to the endpoint if necessary. + */ + bufferTokens(tokens) { + const starting = this.tokens === null; + this.tokens += (tokens || ''); + const leftoverTokens = this.feedTokens(); + + /* do we need to start a timer to periodically feed tokens to the endpoint? */ + if (starting && leftoverTokens > 0) { + assert(!this.timer); + this.timer = setInterval(() => { + const remaining = this._feedTokens(); + if (remaining === 0) { + clearInterval(this.timer); + this.timer = null; + } + }, FEED_INTERVAL); + } + } + + flush() { + clearTimeout(this.timer); + this._api(this.ep, [this.ep.uuid, 'clear']) + .catch((err) => this.logger.info({err}, 'Error flushing TTS streaming')); + this.tokens = null; + this.timer = null; + } + + /** + * Send the next chunk of tokens to the endpoint (max 2000 chars) + * Return the number of tokens left in the buffer. + */ + _feedTokens() { + if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) { + return this.tokens?.length || 0; + } + + // Helper function to find a sentence boundary + const findSentenceBoundary = (text, limit) => { + const sentenceEndRegex = /[.!?](?=\s|$)/g; + let lastSentenceBoundary = -1; + let match; + + while ((match = sentenceEndRegex.exec(text)) && match.index < limit) { + // Ensure it's not a decimal point (e.g., "3.14") + if (match.index === 0 || !/\d$/.test(text[match.index - 1])) { + lastSentenceBoundary = match.index + 1; // Include the punctuation + } + } + return lastSentenceBoundary; + }; + + // Helper function to find a word boundary + const findWordBoundary = (text, limit) => { + const wordBoundaryRegex = /\s+/g; + let lastWordBoundary = -1; + let match; + + while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) { + lastWordBoundary = match.index; + } + return lastWordBoundary; + }; + + // Try to find the best chunk to send + const limit = Math.min(MAX_CHUNK_SIZE, this.tokens.length); + let chunkEnd = findSentenceBoundary(this.tokens, limit); + + if (chunkEnd === -1) { + // If no sentence boundary, try word boundary + chunkEnd = findWordBoundary(this.tokens, limit); + } + + if (chunkEnd === -1) { + // If no boundaries at all, just take the max allowed + chunkEnd = limit; + } + + const chunk = this.tokens.slice(0, chunkEnd); + this.tokens = this.tokens.slice(chunkEnd).trim(); // Remove sent chunk and trim whitespace + + // Send the chunk to the endpoint + // TODO: abstract this into an endpoint method + this._api(this.ep, [this.ep.uuid, 'send', chunk]) + .then(() => this.logger.debug(`Sent chunk: ${chunk}`)) + .catch((err) => { + this.logger.info({err}, 'Error sending TTS chunk'); + }); + + + return this.tokens.length; + } + + async _api(ep, args) { + const res = await ep.api('uuid_deepgram_tts_streaming', `^^|${args.join('|')}`); + if (!res.body?.startsWith('+OK')) { + throw new Error({args}, `Error calling uuid_deepgram_tts_streaming: ${res.body}`); + } + } + + kill() { + clearTimeout(this.timer); + this._api(this.ep, [this.ep.uuid, 'close']) + .catch((err) => this.logger.info({err}, 'Error closing TTS streaming')); + this.timer = null; + this.tokens = null; + this.cs = null; + } + +} + +module.exports = TtsStreamingBuffer; From c9c0ccb5759e8e190ab4a9b9bedcffaee750b5c1 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Thu, 5 Dec 2024 17:13:36 -0500 Subject: [PATCH 3/9] wip --- lib/session/call-session.js | 70 +++++++++++++++++++++---------- lib/tasks/say.js | 21 +++++++++- lib/utils/constants.json | 9 ++++ lib/utils/tts-streaming-buffer.js | 68 ++++++++++++++++++++++++------ lib/utils/ws-requestor.js | 2 +- package-lock.json | 27 ++++++------ package.json | 2 +- 7 files changed, 148 insertions(+), 51 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 5286ec9b..305ba2d7 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -9,7 +9,8 @@ const { KillReason, RecordState, AllowedSipRecVerbs, - AllowedConfirmSessionVerbs + AllowedConfirmSessionVerbs, + TtsStreamingEvents } = require('../utils/constants'); const moment = require('moment'); const assert = require('assert'); @@ -112,8 +113,6 @@ class CallSession extends Emitter { this._pool = srf.locals.dbHelpers.pool; - this.ttsStreamingBuffer = new TtsStreamingBuffer(this); - const handover = (newRequestor) => { this.logger.info(`handover to new base url ${newRequestor.url}`); this.requestor.removeAllListeners(); @@ -402,34 +401,27 @@ class CallSession extends Emitter { get isTransferredCall() { return this.application.transferredCall === true; } - - /** - * returns true if this session is a ConfirmCallSession - */ get isAdultingCallSession() { return this.constructor.name === 'AdultingCallSession'; } - - /** - * returns true if this session is a ConfirmCallSession - */ get isConfirmCallSession() { return this.constructor.name === 'ConfirmCallSession'; } - - /** - * returns true if this session is a SipRecCallSession - */ get isSipRecCallSession() { return this.constructor.name === 'SipRecCallSession'; } - - /** - * returns true if this session is a SmsCallSession - */ get isSmsCallSession() { return this.constructor.name === 'SmsCallSession'; } + get isRestCallSession() { + return this.constructor.name === 'RestCallSession'; + } + get InboundCallSession() { + return this.constructor.name === 'InboundCallSession'; + } + get isNormalCallSession() { + return this.constructor.name === 'InboundCallSession' || this.constructor.name === 'RestCallSession'; + } get webhook_secret() { return this.accountInfo?.account?.webhook_secret; @@ -505,6 +497,10 @@ class CallSession extends Emitter { this._sipRequestWithinDialogHook = url; } + get isTtsStreamOpen() { + //TODO: also need to check for a background streaming say + return this.currentTask?.isStreamingTts; + } // Bot Delay (actionHook delayed) get actionHookDelayEnabled() { return this._actionHookDelayEnabled; @@ -1061,6 +1057,15 @@ class CallSession extends Emitter { this.inbandDtmfEnabled = voipCarrier?.dtmf_type === 'tones'; } + if (this.isNormalCallSession) { + this.ttsStreamingBuffer = new TtsStreamingBuffer(this); + this.ttsStreamingBuffer.on(TtsStreamingEvents.Empty, this._onTtsStreamingEmpty.bind(this)); + this.ttsStreamingBuffer.on(TtsStreamingEvents.ConnectFailure, this._onTtsStreamingConnectFailure.bind(this)); + } + else { + this.logger.info(`CallSession:exec - not a normal call session: ${this.constructor.name}`); + } + while (this.tasks.length && !this.callGone) { const taskNum = ++this.taskIdx; const stackNum = this.stackIdx; @@ -1645,11 +1650,11 @@ Duration=${duration} ` } _lccTtsTokens(opts) { - this.ttsStreamingBuffer.bufferTokens(opts); + this.ttsStreamingBuffer?.bufferTokens(opts); } _lccTtsFlush(opts) { - this.ttsStreamingBuffer.flush(opts); + this.ttsStreamingBuffer?.flush(opts); } /** @@ -2236,7 +2241,7 @@ Duration=${duration} ` this.backgroundTaskManager.stopAll(); this.clearOrRestoreActionHookDelayProcessor().catch((err) => {}); - this.ttsStreamingBuffer.kill(); + this.ttsStreamingBuffer?.kill(); } /** @@ -2771,6 +2776,27 @@ Duration=${duration} ` this.verbHookSpan = null; } } + + _onTtsStreamingEmpty() { + const task = this.currentTask; + if (task && TaskName.Say === task.name) { + task.notifyTtsStreamIsEmpty(); + } + } + + async _onTtsStreamingConnectFailure(vendor) { + const {writeAlerts, AlertType} = this.srf.locals; + try { + await writeAlerts({ + alert_type: AlertType.TTS_STREAMING_CONNECTION_FAILURE, + account_sid: this.accountSid, + vendor + }); + } catch (error) { + this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert'); + } + this.logger.info({vendor}, 'CallSession:_onTtsStreamingConnectFailure - tts streaming connect failure'); + } } module.exports = CallSession; diff --git a/lib/tasks/say.js b/lib/tasks/say.js index 7b3b9aed..a6073bcf 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -101,8 +101,19 @@ class TaskSay extends TtsTask { } async handlingStreaming(cs, {ep}) { - //TODO: set channel variables for tts streaming vendor + const {vendor, voice, label} = this.getTtsVendorData(cs); + const credentials = cs.getSpeechCredentials(vendor, 'tts', label); + if (!credentials) { + throw new SpeechCredentialError( + `No text-to-speech service credentials for ${vendor} with labels: ${label} have been configured`); + } + const {api_key} = credentials; + // TODO: set channel variables for tts streaming vendor + await ep.set({ + DEEPGRAM_API_KEY: api_key, + DEEPGRAM_TTS_STREAMING_MODEL: voice + }); await this.awaitTaskDone(); this.logger.info('TaskSay:handlingStreaming - done'); @@ -304,8 +315,16 @@ class TaskSay extends TtsTask { delete attrs['cache_filename']; //no value in adding this to the span span.setAttributes(attrs); } + + notifyTtsStreamIsEmpty() { + if (this.isStreamingTts && this.closeOnStreamEmpty) { + this.logger.info('TaskSay:notifyTtsStreamIsEmpty - stream is empty, killing task'); + this.notifyTaskDone(); + } + } } + const spanMapping = { // IMPORTANT!!! JAMBONZ WEBAPP WILL SHOW TEXT PERFECTLY IF THE SPAN NAME IS SMALLER OR EQUAL 25 CHARACTERS. // EX: whisper.ratelim_reqs has length 20 <= 25 which is perfect diff --git a/lib/utils/constants.json b/lib/utils/constants.json index c3904d91..e4d01f3e 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -226,6 +226,15 @@ "PartialMedia": "partial-media", "FullMedia": "full-media" }, + "DeepgramTtsStreamingEvents": { + "Empty": "deepgram_tts_streaming::empty", + "ConnectFailure": "dedeepgram_tts_streaming::connect_failed", + "Connect": "deepgram_tts_streaming::connect" + }, + "TtsStreamingEvents": { + "Empty": "tts_streaming::empty", + "ConnectFailure": "tts_streaming::connect_failed" + }, "MAX_SIMRINGS": 10, "BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)", "FS_UUID_SET_NAME": "fsUUIDs", diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js index 7d3c19b3..ebd400fb 100644 --- a/lib/utils/tts-streaming-buffer.js +++ b/lib/utils/tts-streaming-buffer.js @@ -1,7 +1,8 @@ const Emitter = require('events'); const assert = require('assert'); +const {DeepgramTtsStreamingEvents, TtsStreamingEvents} = require('../utils/constants'); const FEED_INTERVAL = 2000; -const MAX_CHUNK_SIZE = 2000; +const MAX_CHUNK_SIZE = 1800; class TtsStreamingBuffer extends Emitter { constructor(cs) { @@ -9,7 +10,8 @@ class TtsStreamingBuffer extends Emitter { this.cs = cs; this.logger = cs.logger; - this.tokens = null; + this.tokens = ''; + this.eventHandlers = []; } get isEmpty() { @@ -28,9 +30,19 @@ class TtsStreamingBuffer extends Emitter { * Add tokens to the buffer and start feeding them to the endpoint if necessary. */ bufferTokens(tokens) { - const starting = this.tokens === null; + const starting = this.tokens === ''; + const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40); + const totalLength = tokens.length; + + this.logger.debug( + `Buffering tokens: "${displayedTokens}" (total length: ${totalLength}), starting? ${starting}` + ); this.tokens += (tokens || ''); - const leftoverTokens = this.feedTokens(); + const leftoverTokens = this._feedTokens(); + + if (this.eventHandlers.length === 0) { + this._initHandlers(this.ep); + } /* do we need to start a timer to periodically feed tokens to the endpoint? */ if (starting && leftoverTokens > 0) { @@ -46,10 +58,11 @@ class TtsStreamingBuffer extends Emitter { } flush() { + this.logger.info('Flushing TTS streaming buffer'); clearTimeout(this.timer); this._api(this.ep, [this.ep.uuid, 'clear']) .catch((err) => this.logger.info({err}, 'Error flushing TTS streaming')); - this.tokens = null; + this.tokens = ''; this.timer = null; } @@ -58,7 +71,9 @@ class TtsStreamingBuffer extends Emitter { * Return the number of tokens left in the buffer. */ _feedTokens() { + this.logger.debug('_feedTokens'); if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) { + this.logger.debug('TTS stream is not open or no tokens to send'); return this.tokens?.length || 0; } @@ -106,15 +121,15 @@ class TtsStreamingBuffer extends Emitter { const chunk = this.tokens.slice(0, chunkEnd); this.tokens = this.tokens.slice(chunkEnd).trim(); // Remove sent chunk and trim whitespace - // Send the chunk to the endpoint - // TODO: abstract this into an endpoint method - this._api(this.ep, [this.ep.uuid, 'send', chunk]) - .then(() => this.logger.debug(`Sent chunk: ${chunk}`)) + /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */ + const modifiedChunk = chunk.replace(/\n\n/g, '\n \n'); + this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]) + .then(() => this.logger.debug(`Sent tokens: ${chunk.substring(0, 40)}`)) .catch((err) => { this.logger.info({err}, 'Error sending TTS chunk'); }); - + this.logger.debug(`_feedTokens: sent ${chunk.length}, Remaining: ${this.tokens.length}`); return this.tokens.length; } @@ -127,13 +142,40 @@ class TtsStreamingBuffer extends Emitter { kill() { clearTimeout(this.timer); - this._api(this.ep, [this.ep.uuid, 'close']) - .catch((err) => this.logger.info({err}, 'Error closing TTS streaming')); + this.removeCustomEventListeners(); + if (this.ep) { + this._api(this.ep, [this.ep.uuid, 'close']) + .catch((err) => this.logger.info({err}, 'Error closing TTS streaming')); + } this.timer = null; - this.tokens = null; + this.tokens = ''; this.cs = null; } + _onConnectFailure(vendor) { + this.emit(TtsStreamingEvents.ConnectFailure, {vendor}); + } + + _onTtsEmpty(vendor) { + this.emit(TtsStreamingEvents.Empty, {vendor}); + } + + _initHandlers(ep) { + this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.ConnectFailure, + this._onConnectFailure.bind(this, 'deepgram')); + this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.Empty, + this._onTtsEmpty.bind(this, 'deepgram')); + } + + addCustomEventListener(ep, event, handler) { + this.eventHandlers.push({ep, event, handler}); + ep.addCustomEventListener(event, handler); + } + + removeCustomEventListeners() { + this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler)); + } + } module.exports = TtsStreamingBuffer; diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js index c010d434..b002b794 100644 --- a/lib/utils/ws-requestor.js +++ b/lib/utils/ws-requestor.js @@ -408,7 +408,7 @@ class WsRequestor extends BaseRequestor { case 'command': assert.ok(command, 'command property not supplied'); - assert.ok(data || command === 'llm:tool-output', 'data property not supplied'); + assert.ok(data || ['llm:tool-output', 'tts:flush'].includes(command), 'data property not supplied'); this._recvCommand(msgid, command, call_sid, queueCommand, tool_call_id, data); break; diff --git a/package-lock.json b/package-lock.json index f1ace642..873bd9ac 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,7 +18,7 @@ "@jambonz/speech-utils": "^0.1.22", "@jambonz/stats-collector": "^0.1.10", "@jambonz/time-series": "^0.2.13", - "@jambonz/verb-specifications": "^0.0.88", + "@jambonz/verb-specifications": "^0.0.89", "@opentelemetry/api": "^1.8.0", "@opentelemetry/exporter-jaeger": "^1.23.0", "@opentelemetry/exporter-trace-otlp-http": "^0.50.0", @@ -1581,9 +1581,9 @@ } }, "node_modules/@jambonz/verb-specifications": { - "version": "0.0.88", - "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.88.tgz", - "integrity": "sha512-gbJn/sT1DKJspB6EjnEpMWQBi983YYUbE9+9Ycfim6FUeMwD/BDoSNYkQNAxMDX8+s6NqQCMZYGOmb5qY5ZwzQ==", + "version": "0.0.89", + "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.89.tgz", + "integrity": "sha512-zMLZYp3DtOHImHAR4HDDEdPujomJ6pqvK14PFZ2/RKZZMUoMbs2XxYIpKLPvir1StYwT+WHauttnn/YIYAnxvw==", "dependencies": { "debug": "^4.3.4", "pino": "^8.8.0" @@ -3556,10 +3556,11 @@ "integrity": "sha512-3lqz5YjWTYnW6dlDa5TLaTCcShfar1e40rmcJVwCBJC6mWlFuj0eCHIElmG1g5kyuJ/GD+8Wn4FFCcz4gJPfaQ==" }, "node_modules/cross-spawn": { - "version": "7.0.3", - "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", - "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==", + "version": "7.0.6", + "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", + "integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==", "dev": true, + "license": "MIT", "dependencies": { "path-key": "^3.1.0", "shebang-command": "^2.0.0", @@ -10752,9 +10753,9 @@ } }, "@jambonz/verb-specifications": { - "version": "0.0.88", - "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.88.tgz", - "integrity": "sha512-gbJn/sT1DKJspB6EjnEpMWQBi983YYUbE9+9Ycfim6FUeMwD/BDoSNYkQNAxMDX8+s6NqQCMZYGOmb5qY5ZwzQ==", + "version": "0.0.89", + "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.89.tgz", + "integrity": "sha512-zMLZYp3DtOHImHAR4HDDEdPujomJ6pqvK14PFZ2/RKZZMUoMbs2XxYIpKLPvir1StYwT+WHauttnn/YIYAnxvw==", "requires": { "debug": "^4.3.4", "pino": "^8.8.0" @@ -12241,9 +12242,9 @@ "integrity": "sha512-3lqz5YjWTYnW6dlDa5TLaTCcShfar1e40rmcJVwCBJC6mWlFuj0eCHIElmG1g5kyuJ/GD+8Wn4FFCcz4gJPfaQ==" }, "cross-spawn": { - "version": "7.0.3", - "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", - "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==", + "version": "7.0.6", + "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", + "integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==", "dev": true, "requires": { "path-key": "^3.1.0", diff --git a/package.json b/package.json index 7fa08cfb..85950aa1 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,7 @@ "@jambonz/speech-utils": "^0.1.22", "@jambonz/stats-collector": "^0.1.10", "@jambonz/time-series": "^0.2.13", - "@jambonz/verb-specifications": "^0.0.88", + "@jambonz/verb-specifications": "^0.0.89", "@opentelemetry/api": "^1.8.0", "@opentelemetry/exporter-jaeger": "^1.23.0", "@opentelemetry/exporter-trace-otlp-http": "^0.50.0", From 3cf56ca162cbec1cae8b41800c4d55e5ae6aeaaa Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Fri, 6 Dec 2024 15:31:40 -0500 Subject: [PATCH 4/9] add throttling support --- lib/session/call-session.js | 31 +++++++++++++++++++++- lib/tasks/say.js | 6 +++++ lib/utils/constants.json | 4 +++ lib/utils/tts-streaming-buffer.js | 44 ++++++++++++++++++++++++++----- lib/utils/ws-requestor.js | 10 ++++++- 5 files changed, 86 insertions(+), 9 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 305ba2d7..de4fd9a4 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -1060,6 +1060,8 @@ class CallSession extends Emitter { if (this.isNormalCallSession) { this.ttsStreamingBuffer = new TtsStreamingBuffer(this); this.ttsStreamingBuffer.on(TtsStreamingEvents.Empty, this._onTtsStreamingEmpty.bind(this)); + this.ttsStreamingBuffer.on(TtsStreamingEvents.Pause, this._onTtsStreamingPause.bind(this)); + this.ttsStreamingBuffer.on(TtsStreamingEvents.Resume, this._onTtsStreamingResume.bind(this)); this.ttsStreamingBuffer.on(TtsStreamingEvents.ConnectFailure, this._onTtsStreamingConnectFailure.bind(this)); } else { @@ -1650,7 +1652,24 @@ Duration=${duration} ` } _lccTtsTokens(opts) { - this.ttsStreamingBuffer?.bufferTokens(opts); + const {id, tokens} = opts; + + if (id === undefined) { + this.logger.info({opts}, 'CallSession:_lccTtsTokens - invalid command since id is missing'); + return; + } + else if (tokens === undefined) { + this.logger.info({opts}, 'CallSession:_lccTtsTokens - invalid command since id is missing'); + return this.requestor.request('tts:tokens-result', '/tokens-result', { + id, + status: 'failed', + reason: 'missing tokens' + }).catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending')); + } + const res = this.ttsStreamingBuffer?.bufferTokens(tokens); + this.logger.info({id, res}, 'CallSession:_lccTtsTokens - tts:tokens-result'); + this.requestor.request('tts:tokens-result', '/tokens-result', {id, ...res}) + .catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending')); } _lccTtsFlush(opts) { @@ -2784,6 +2803,16 @@ Duration=${duration} ` } } + _onTtsStreamingPause() { + this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_paused'}) + .catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingPause - Error sending')); + } + + _onTtsStreamingResume() { + this.requestor?.request('tts:streaming-event', 'streaming-event', {event_type: 'stream_resumed'}) + .catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingResume - Error sending')); + } + async _onTtsStreamingConnectFailure(vendor) { const {writeAlerts, AlertType} = this.srf.locals; try { diff --git a/lib/tasks/say.js b/lib/tasks/say.js index a6073bcf..379fb753 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -115,7 +115,13 @@ class TaskSay extends TtsTask { DEEPGRAM_TTS_STREAMING_MODEL: voice }); + cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'}) + .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); + await this.awaitTaskDone(); + cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'}) + .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); + this.logger.info('TaskSay:handlingStreaming - done'); } diff --git a/lib/utils/constants.json b/lib/utils/constants.json index e4d01f3e..d3aa60e1 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -203,6 +203,8 @@ "verb:status", "llm:event", "llm:tool-call", + "tts:tokens-result", + "tts:streaming-event", "jambonz:error" ], "RecordState": { @@ -233,6 +235,8 @@ }, "TtsStreamingEvents": { "Empty": "tts_streaming::empty", + "Pause": "tts_streaming::pause", + "Resume": "tts_streaming::resume", "ConnectFailure": "tts_streaming::connect_failed" }, "MAX_SIMRINGS": 10, diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js index ebd400fb..4b25950c 100644 --- a/lib/utils/tts-streaming-buffer.js +++ b/lib/utils/tts-streaming-buffer.js @@ -3,6 +3,9 @@ const assert = require('assert'); const {DeepgramTtsStreamingEvents, TtsStreamingEvents} = require('../utils/constants'); const FEED_INTERVAL = 2000; const MAX_CHUNK_SIZE = 1800; +const HIGH_WATER_BUFFER_SIZE = 5000; +const LOW_WATER_BUFFER_SIZE = 1000; + class TtsStreamingBuffer extends Emitter { constructor(cs) { @@ -12,12 +15,17 @@ class TtsStreamingBuffer extends Emitter { this.tokens = ''; this.eventHandlers = []; + this._isFull = false; } get isEmpty() { return this.tokens.length === 0; } + get isFull() { + return this._isFull; + } + get size() { return this.tokens.length; } @@ -34,8 +42,20 @@ class TtsStreamingBuffer extends Emitter { const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40); const totalLength = tokens.length; + /* if we crossed the high water mark, reject the request */ + if (this.tokens.length + totalLength > HIGH_WATER_BUFFER_SIZE) { + this.logger.info( + `TtsStreamingBuffer:bufferTokensTTS buffer is full, rejecting request to buffer ${totalLength} tokens`); + + if (!this._isFull) { + this._isFull = true; + this.emit(TtsStreamingEvents.Pause); + } + return {status: 'failed', reason: 'full'}; + } + this.logger.debug( - `Buffering tokens: "${displayedTokens}" (total length: ${totalLength}), starting? ${starting}` + `TtsStreamingBuffer:bufferTokens "${displayedTokens}" (length: ${totalLength}), starting? ${starting}` ); this.tokens += (tokens || ''); const leftoverTokens = this._feedTokens(); @@ -55,15 +75,18 @@ class TtsStreamingBuffer extends Emitter { } }, FEED_INTERVAL); } + + return {status: 'ok'}; } flush() { - this.logger.info('Flushing TTS streaming buffer'); + this.logger.debug('TtsStreamingBuffer:flush'); clearTimeout(this.timer); this._api(this.ep, [this.ep.uuid, 'clear']) - .catch((err) => this.logger.info({err}, 'Error flushing TTS streaming')); + .catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:flush Error flushing TTS streaming')); this.tokens = ''; this.timer = null; + this._isFull = false; } /** @@ -124,12 +147,19 @@ class TtsStreamingBuffer extends Emitter { /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */ const modifiedChunk = chunk.replace(/\n\n/g, '\n \n'); this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]) - .then(() => this.logger.debug(`Sent tokens: ${chunk.substring(0, 40)}`)) + .then(() => this.logger.debug(`TtsStreamingBuffer:_feedTokens tokens: ${chunk.substring(0, 40)}`)) .catch((err) => { - this.logger.info({err}, 'Error sending TTS chunk'); + this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk'); }); - this.logger.debug(`_feedTokens: sent ${chunk.length}, Remaining: ${this.tokens.length}`); + this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, Remaining: ${this.tokens.length}`); + + if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) { + this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full'); + this._isFull = false; + this.emit(TtsStreamingEvents.Resume); + } + return this.tokens.length; } @@ -145,7 +175,7 @@ class TtsStreamingBuffer extends Emitter { this.removeCustomEventListeners(); if (this.ep) { this._api(this.ep, [this.ep.uuid, 'close']) - .catch((err) => this.logger.info({err}, 'Error closing TTS streaming')); + .catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:kill Error closing TTS streaming')); } this.timer = null; this.tokens = ''; diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js index b002b794..3952de40 100644 --- a/lib/utils/ws-requestor.js +++ b/lib/utils/ws-requestor.js @@ -44,7 +44,15 @@ class WsRequestor extends BaseRequestor { async request(type, hook, params, httpHeaders = {}) { assert(HookMsgTypes.includes(type)); const url = hook.url || hook; - const wantsAck = !['call:status', 'verb:status', 'jambonz:error', 'llm:event', 'llm:tool-call'].includes(type); + const wantsAck = ![ + 'call:status', + 'verb:status', + 'jambonz:error', + 'llm:event', + 'llm:tool-call', + 'tts:streaming-event', + 'tts:tokens-result', + ].includes(type); if (this.maliciousClient) { this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); From b9fa2c1a0952ad4d4f0dd93410f38df4f35a4311 Mon Sep 17 00:00:00 2001 From: Hoan Luu Huu <110280845+xquanluu@users.noreply.github.com> Date: Mon, 9 Dec 2024 20:18:51 +0700 Subject: [PATCH 5/9] support background ttsStream (#995) * wip * add TtsStreamingBuffer class to abstract handling of streaming tokens * wip * support background ttsStream * wip --------- Co-authored-by: Dave Horton --- lib/session/call-session.js | 31 ++++++++++++++++++++++++++-- lib/tasks/config.js | 29 +++++++++++++++++++++++++- lib/tasks/say.js | 1 - lib/utils/background-task-manager.js | 22 ++++++++++++++++++++ lib/utils/constants.json | 9 ++++++++ 5 files changed, 88 insertions(+), 4 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index de4fd9a4..22a3dcab 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -435,6 +435,10 @@ class CallSession extends Emitter { return this.backgroundTaskManager.isTaskRunning('bargeIn'); } + get isTtsStreamEnabled() { + return this.backgroundTaskManager.isTaskRunning('ttsStream'); + } + get isListenEnabled() { return this.backgroundTaskManager.isTaskRunning('listen'); } @@ -498,8 +502,8 @@ class CallSession extends Emitter { } get isTtsStreamOpen() { - //TODO: also need to check for a background streaming say - return this.currentTask?.isStreamingTts; + return this.currentTask?.isStreamingTts || + this.backgroundTaskManager.getTask('ttsStream')?.isStreamingTts; } // Bot Delay (actionHook delayed) get actionHookDelayEnabled() { @@ -798,6 +802,29 @@ class CallSession extends Emitter { } } + async enableBackgroundTtsStream(say) { + try { + if (this.isTtsStreamEnabled) { + this.logger.debug('CallSession:enableBackgroundTtsStream - ttsStream currently enabled, ignoring request'); + } else if (this.appIsUsingWebsockets && this.isNormalCallSession) { + await this.backgroundTaskManager.newTask('ttsStream', say); + this.logger.debug('CallSession:enableBackgroundTtsStream - ttsStream enabled'); + } else { + this.logger.debug( + 'CallSession:enableBackgroundTtsStream - ignoring request as call does not have required conditions'); + } + } catch (err) { + this.logger.info({err, say}, 'CallSession:enableBackgroundTtsStream - Error creating background tts stream task'); + } + } + + async disableTtsStream() { + if (this.isTtsStreamEnabled) { + this.backgroundTaskManager.stop('ttsStream'); + this.logger.debug('CallSession:disableTtsStream - ttsStream disabled'); + } + } + async enableBotMode(gather, autoEnable) { try { let task; diff --git a/lib/tasks/config.js b/lib/tasks/config.js index 8f163062..72ec3c4b 100644 --- a/lib/tasks/config.js +++ b/lib/tasks/config.js @@ -16,7 +16,8 @@ class TaskConfig extends Task { 'fillerNoise', 'actionHookDelayAction', 'boostAudioSignal', - 'vad' + 'vad', + 'ttsStream' ].forEach((k) => this[k] = this.data[k] || {}); if ('notifyEvents' in this.data) { @@ -45,6 +46,12 @@ class TaskConfig extends Task { }; delete this.transcribeOpts.enable; } + if (this.ttsStream.enable) { + this.sayOpts = { + verb: 'say', + stream: true + }; + } if (this.data.reset) { if (typeof this.data.reset === 'string') this.data.reset = [this.data.reset]; @@ -75,6 +82,7 @@ class TaskConfig extends Task { get hasVad() { return Object.keys(this.vad).length; } get hasFillerNoise() { return Object.keys(this.fillerNoise).length; } get hasReferHook() { return Object.keys(this.data).includes('referHook'); } + get hasTtsStream() { return Object.keys(this.ttsStream).length; } get summary() { const phrase = []; @@ -106,6 +114,9 @@ class TaskConfig extends Task { if (this.onHoldMusic) phrase.push(`onHoldMusic: ${this.onHoldMusic}`); if ('boostAudioSignal' in this.data) phrase.push(`setGain ${this.data.boostAudioSignal}`); if (this.hasReferHook) phrase.push('set referHook'); + if (this.hasTtsStream) { + phrase.push(`${this.ttsStream.enable ? 'enable' : 'disable'} ttsStream`); + } return `${this.name}{${phrase.join(',')}}`; } @@ -305,6 +316,22 @@ class TaskConfig extends Task { if (this.hasReferHook) { cs.referHook = this.data.referHook; } + + if (this.ttsStream.enable && this.sayOpts) { + this.sayOpts.synthesizer = this.hasSynthesizer ? this.synthesizer : { + vendor: cs.speechSynthesisVendor, + language: cs.speechSynthesisLanguage, + voice: cs.speechSynthesisVoice, + ...(cs.speechSynthesisLabel && { + label: cs.speechSynthesisLabel + }) + }; + this.logger.info({opts: this.gatherOpts}, 'Config: enabling ttsStream'); + cs.enableBackgroundTtsStream(this.sayOpts); + } else if (!this.ttsStream.enable) { + this.logger.info('Config: disabling ttsStream'); + cs.disableTtsStream(); + } } async kill(cs) { diff --git a/lib/tasks/say.js b/lib/tasks/say.js index 379fb753..ead92d89 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -330,7 +330,6 @@ class TaskSay extends TtsTask { } } - const spanMapping = { // IMPORTANT!!! JAMBONZ WEBAPP WILL SHOW TEXT PERFECTLY IF THE SPAN NAME IS SMALLER OR EQUAL 25 CHARACTERS. // EX: whisper.ratelim_reqs has length 20 <= 25 which is perfect diff --git a/lib/utils/background-task-manager.js b/lib/utils/background-task-manager.js index a83eade1..60fde746 100644 --- a/lib/utils/background-task-manager.js +++ b/lib/utils/background-task-manager.js @@ -46,6 +46,9 @@ class BackgroundTaskManager extends Emitter { case 'transcribe': task = await this._initTranscribe(opts); break; + case 'ttsStream': + task = await this._initTtsStream(opts); + break; default: break; } @@ -173,6 +176,25 @@ class BackgroundTaskManager extends Emitter { return task; } + // Initiate Tts Stream + async _initTtsStream(opts) { + let task; + try { + const t = normalizeJambones(this.logger, [opts]); + task = makeTask(this.logger, t[0]); + const resources = await this.cs._evaluatePreconditions(task); + const {span, ctx} = this.rootSpan.startChildSpan(`background-ttsStream:${task.summary}`); + task.span = span; + task.ctx = ctx; + task.exec(this.cs, resources) + .then(this._taskCompleted.bind(this, 'ttsStream', task)) + .catch(this._taskError.bind(this, 'ttsStream', task)); + } catch (err) { + this.logger.info(err, 'BackgroundTaskManager:_initTtsStream - Error creating ttsStream task'); + } + return task; + } + _taskCompleted(type, task) { this.logger.debug({type, task}, `BackgroundTaskManager:_taskCompleted: task completed, sticky: ${task.sticky}`); task.removeAllListeners(); diff --git a/lib/utils/constants.json b/lib/utils/constants.json index d3aa60e1..4d0045df 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -239,6 +239,15 @@ "Resume": "tts_streaming::resume", "ConnectFailure": "tts_streaming::connect_failed" }, + "DeepgramTtsStreamingEvents": { + "Empty": "deepgram_tts_streaming::empty", + "ConnectFailure": "dedeepgram_tts_streaming::connect_failed", + "Connect": "deepgram_tts_streaming::connect" + }, + "TtsStreamingEvents": { + "Empty": "tts_streaming::empty", + "ConnectFailure": "tts_streaming::connect_failed" + }, "MAX_SIMRINGS": 10, "BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)", "FS_UUID_SET_NAME": "fsUUIDs", From f74a2536a412017ee58ab3d7645d378bae894719 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Tue, 10 Dec 2024 12:58:01 -0500 Subject: [PATCH 6/9] fix #998 incorrectly sending final transcript with is_final=false --- lib/utils/transcription-utils.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/utils/transcription-utils.js b/lib/utils/transcription-utils.js index d19e8f41..61226201 100644 --- a/lib/utils/transcription-utils.js +++ b/lib/utils/transcription-utils.js @@ -185,7 +185,10 @@ const selectDefaultGoogleModel = (task, language, version) => { (useV2 ? 'long' : 'latest_long'); }; const consolidateTranscripts = (bufferedTranscripts, channel, language, vendor) => { - if (bufferedTranscripts.length === 1) return bufferedTranscripts[0]; + if (bufferedTranscripts.length === 1) { + bufferedTranscripts[0].is_final = true; + return bufferedTranscripts[0]; + } let totalConfidence = 0; const finalTranscript = bufferedTranscripts.reduce((acc, evt) => { totalConfidence += evt.alternatives[0].confidence; From dea3e433f189d7b2ca0f108589f78aa350b12da8 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Tue, 10 Dec 2024 14:45:35 -0500 Subject: [PATCH 7/9] wip --- lib/session/call-session.js | 24 +++++- lib/tasks/gather.js | 1 + lib/utils/constants.json | 13 ++- lib/utils/tts-streaming-buffer.js | 127 +++++++++++++++++++++++------- lib/utils/ws-requestor.js | 26 +++--- 5 files changed, 139 insertions(+), 52 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 22a3dcab..22ee096d 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -824,6 +824,9 @@ class CallSession extends Emitter { this.logger.debug('CallSession:disableTtsStream - ttsStream disabled'); } } + async clearTtsStream() { + this.ttsStreamingBuffer?.clear(); + } async enableBotMode(gather, autoEnable) { try { @@ -1678,7 +1681,7 @@ Duration=${duration} ` .catch((err) => this.logger.error(err, 'CallSession:_lccLlmUpdate')); } - _lccTtsTokens(opts) { + async _lccTtsTokens(opts) { const {id, tokens} = opts; if (id === undefined) { @@ -1693,8 +1696,13 @@ Duration=${duration} ` reason: 'missing tokens' }).catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending')); } - const res = this.ttsStreamingBuffer?.bufferTokens(tokens); - this.logger.info({id, res}, 'CallSession:_lccTtsTokens - tts:tokens-result'); + let res; + try { + res = await this.ttsStreamingBuffer?.bufferTokens(tokens); + this.logger.info({id, res}, 'CallSession:_lccTtsTokens - tts:tokens-result'); + } catch (err) { + this.logger.info(err, 'CallSession:_lccTtsTokens'); + } this.requestor.request('tts:tokens-result', '/tokens-result', {id, ...res}) .catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending')); } @@ -1703,6 +1711,10 @@ Duration=${duration} ` this.ttsStreamingBuffer?.flush(opts); } + _lccTtsClear(opts) { + this.ttsStreamingBuffer?.clear(opts); + } + /** * perform call hangup by jambonz */ @@ -2091,6 +2103,10 @@ Duration=${duration} ` this._lccTtsFlush(data); break; + case 'tts:clear': + this._lccTtsClear(data); + break; + default: this.logger.info(`CallSession:_onCommand - invalid command ${command}`); } @@ -2287,7 +2303,7 @@ Duration=${duration} ` this.backgroundTaskManager.stopAll(); this.clearOrRestoreActionHookDelayProcessor().catch((err) => {}); - this.ttsStreamingBuffer?.kill(); + this.ttsStreamingBuffer?.stop(); } /** diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js index 7153ac1f..c2758c63 100644 --- a/lib/tasks/gather.js +++ b/lib/tasks/gather.js @@ -723,6 +723,7 @@ class TaskGather extends SttTask { this._fillerNoiseOn = false; // in a race, if we just started audio it may sneak through here this.ep.api('uuid_break', this.ep.uuid) .catch((err) => this.logger.info(err, 'Error killing audio')); + cs.clearTtsStream(); } return; } diff --git a/lib/utils/constants.json b/lib/utils/constants.json index 4d0045df..fff7332f 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -239,14 +239,11 @@ "Resume": "tts_streaming::resume", "ConnectFailure": "tts_streaming::connect_failed" }, - "DeepgramTtsStreamingEvents": { - "Empty": "deepgram_tts_streaming::empty", - "ConnectFailure": "dedeepgram_tts_streaming::connect_failed", - "Connect": "deepgram_tts_streaming::connect" - }, - "TtsStreamingEvents": { - "Empty": "tts_streaming::empty", - "ConnectFailure": "tts_streaming::connect_failed" + "TtsStreamingConnectionStatus": { + "NotConnected": "not_connected", + "Connected": "connected", + "Connecting": "connecting", + "Failed": "failed" }, "MAX_SIMRINGS": 10, "BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)", diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js index 4b25950c..504a4efc 100644 --- a/lib/utils/tts-streaming-buffer.js +++ b/lib/utils/tts-streaming-buffer.js @@ -1,6 +1,10 @@ const Emitter = require('events'); const assert = require('assert'); -const {DeepgramTtsStreamingEvents, TtsStreamingEvents} = require('../utils/constants'); +const { + DeepgramTtsStreamingEvents, + TtsStreamingEvents, + TtsStreamingConnectionStatus +} = require('../utils/constants'); const FEED_INTERVAL = 2000; const MAX_CHUNK_SIZE = 1800; const HIGH_WATER_BUFFER_SIZE = 5000; @@ -16,6 +20,8 @@ class TtsStreamingBuffer extends Emitter { this.tokens = ''; this.eventHandlers = []; this._isFull = false; + this._connectionStatus = TtsStreamingConnectionStatus.NotConnected; + this._flushPending = false; } get isEmpty() { @@ -34,10 +40,41 @@ class TtsStreamingBuffer extends Emitter { return this.cs?.ep; } + async start() { + assert.ok( + this._connectionStatus === TtsStreamingConnectionStatus.NotConnected || + this._connectionStatus === TtsStreamingConnectionStatus.Failed, + 'TtsStreamingBuffer:start already started'); + + if (this.eventHandlers.length === 0) { + this._initHandlers(this.ep); + } + + this._connectionStatus = TtsStreamingConnectionStatus.Connecting; + try { + await this._api(this.ep, [this.ep.uuid, 'connect']); + } catch (err) { + this.logger.info({err}, 'TtsStreamingBuffer:start Error connecting to TTS streaming'); + this._connectionStatus = TtsStreamingConnectionStatus.Failed; + } + } + + stop() { + clearTimeout(this.timer); + this.removeCustomEventListeners(); + if (this.ep) { + this._api(this.ep, [this.ep.uuid, 'close']) + .catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:kill Error closing TTS streaming')); + } + this.timer = null; + this.tokens = ''; + this._connectionStatus = TtsStreamingConnectionStatus.NotConnected; + } + /** * Add tokens to the buffer and start feeding them to the endpoint if necessary. */ - bufferTokens(tokens) { + async bufferTokens(tokens) { const starting = this.tokens === ''; const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40); const totalLength = tokens.length; @@ -58,17 +95,13 @@ class TtsStreamingBuffer extends Emitter { `TtsStreamingBuffer:bufferTokens "${displayedTokens}" (length: ${totalLength}), starting? ${starting}` ); this.tokens += (tokens || ''); - const leftoverTokens = this._feedTokens(); - - if (this.eventHandlers.length === 0) { - this._initHandlers(this.ep); - } + const leftoverTokens = await this._feedTokens(); /* do we need to start a timer to periodically feed tokens to the endpoint? */ if (starting && leftoverTokens > 0) { assert(!this.timer); - this.timer = setInterval(() => { - const remaining = this._feedTokens(); + this.timer = setInterval(async() => { + const remaining = await this._feedTokens(); if (remaining === 0) { clearInterval(this.timer); this.timer = null; @@ -81,9 +114,25 @@ class TtsStreamingBuffer extends Emitter { flush() { this.logger.debug('TtsStreamingBuffer:flush'); + if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) { + this.logger.debug('TtsStreamingBuffer:flush TTS stream is not quite ready - wait for connect'); + this._flushPending = true; + return; + } + else if (this._connectionStatus === TtsStreamingConnectionStatus.Connected) { + this._api(this.ep, [this.ep.uuid, 'flush']) + .catch((err) => this.logger.info({err}, + `TtsStreamingBuffer:flush Error flushing TTS streaming: ${JSON.stringify(err)}`)); + } + } + + clear() { + this.logger.debug('TtsStreamingBuffer:clear'); + + if (this._connectionStatus !== TtsStreamingConnectionStatus.Connected) return; clearTimeout(this.timer); this._api(this.ep, [this.ep.uuid, 'clear']) - .catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:flush Error flushing TTS streaming')); + .catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:clear Error clearing TTS streaming')); this.tokens = ''; this.timer = null; this._isFull = false; @@ -93,13 +142,24 @@ class TtsStreamingBuffer extends Emitter { * Send the next chunk of tokens to the endpoint (max 2000 chars) * Return the number of tokens left in the buffer. */ - _feedTokens() { + async _feedTokens() { this.logger.debug('_feedTokens'); if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) { this.logger.debug('TTS stream is not open or no tokens to send'); return this.tokens?.length || 0; } + if (this._connectionStatus === TtsStreamingConnectionStatus.NotConnected || + this._connectionStatus === TtsStreamingConnectionStatus.Failed) { + await this.start(); + return this.tokens.length; + } + + if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) { + this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not ready, waiting for connect'); + return this.tokens.length; + } + // Helper function to find a sentence boundary const findSentenceBoundary = (text, limit) => { const sentenceEndRegex = /[.!?](?=\s|$)/g; @@ -142,17 +202,17 @@ class TtsStreamingBuffer extends Emitter { } const chunk = this.tokens.slice(0, chunkEnd); - this.tokens = this.tokens.slice(chunkEnd).trim(); // Remove sent chunk and trim whitespace + this.tokens = this.tokens.slice(chunkEnd); // Remove sent chunk and trim whitespace /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */ const modifiedChunk = chunk.replace(/\n\n/g, '\n \n'); - this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]) - .then(() => this.logger.debug(`TtsStreamingBuffer:_feedTokens tokens: ${chunk.substring(0, 40)}`)) - .catch((err) => { - this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk'); - }); + try { + await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]); + } catch (err) { + this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk'); + } - this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, Remaining: ${this.tokens.length}`); + this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`); if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) { this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full'); @@ -160,6 +220,11 @@ class TtsStreamingBuffer extends Emitter { this.emit(TtsStreamingEvents.Resume); } + if (0 === this.tokens.length && this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + return this.tokens.length; } @@ -170,27 +235,29 @@ class TtsStreamingBuffer extends Emitter { } } - kill() { - clearTimeout(this.timer); - this.removeCustomEventListeners(); - if (this.ep) { - this._api(this.ep, [this.ep.uuid, 'close']) - .catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:kill Error closing TTS streaming')); - } - this.timer = null; - this.tokens = ''; - this.cs = null; - } - _onConnectFailure(vendor) { this.emit(TtsStreamingEvents.ConnectFailure, {vendor}); } + async _onConnect(vendor) { + this.logger.debug('ws api connected'); + this._connectionStatus = TtsStreamingConnectionStatus.Connected; + if (this.tokens.length > 0) { + await this._feedTokens(); + } + if (this._flushPending) { + this.flush(); + this._flushPending = false; + } + } + _onTtsEmpty(vendor) { this.emit(TtsStreamingEvents.Empty, {vendor}); } _initHandlers(ep) { + this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.Connect, + this._onConnect.bind(this, 'deepgram')); this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.ConnectFailure, this._onConnectFailure.bind(this, 'deepgram')); this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.Empty, diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js index 3952de40..6b97a9a2 100644 --- a/lib/utils/ws-requestor.js +++ b/lib/utils/ws-requestor.js @@ -12,6 +12,20 @@ const { JAMBONES_WS_MAX_PAYLOAD, HTTP_USER_AGENT_HEADER } = require('../config'); +const MTYPE_WANTS_ACK = [ + 'call:status', + 'verb:status', + 'jambonz:error', + 'llm:event', + 'llm:tool-call', + 'tts:streaming-event', + 'tts:tokens-result', +]; +const MTYPE_NO_DATA = [ + 'llm:tool-output', + 'tts:flush', + 'tts:clear' +]; class WsRequestor extends BaseRequestor { constructor(logger, account_sid, hook, secret) { @@ -44,15 +58,7 @@ class WsRequestor extends BaseRequestor { async request(type, hook, params, httpHeaders = {}) { assert(HookMsgTypes.includes(type)); const url = hook.url || hook; - const wantsAck = ![ - 'call:status', - 'verb:status', - 'jambonz:error', - 'llm:event', - 'llm:tool-call', - 'tts:streaming-event', - 'tts:tokens-result', - ].includes(type); + const wantsAck = !MTYPE_WANTS_ACK.includes(type); if (this.maliciousClient) { this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); @@ -416,7 +422,7 @@ class WsRequestor extends BaseRequestor { case 'command': assert.ok(command, 'command property not supplied'); - assert.ok(data || ['llm:tool-output', 'tts:flush'].includes(command), 'data property not supplied'); + assert.ok(data || MTYPE_NO_DATA.includes(command), 'data property not supplied'); this._recvCommand(msgid, command, call_sid, queueCommand, tool_call_id, data); break; From 63ff94637792c2f6bb68a447f9ab177a27a8ec06 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Fri, 13 Dec 2024 15:20:33 -0500 Subject: [PATCH 8/9] dont send if we have nothing to send --- lib/utils/tts-streaming-buffer.js | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js index 504a4efc..5502e116 100644 --- a/lib/utils/tts-streaming-buffer.js +++ b/lib/utils/tts-streaming-buffer.js @@ -206,18 +206,20 @@ class TtsStreamingBuffer extends Emitter { /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */ const modifiedChunk = chunk.replace(/\n\n/g, '\n \n'); - try { - await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]); - } catch (err) { - this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk'); - } + if (modifiedChunk.length > 0) { + try { + await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]); + } catch (err) { + this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk'); + } - this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`); + this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`); - if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) { - this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full'); - this._isFull = false; - this.emit(TtsStreamingEvents.Resume); + if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) { + this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full'); + this._isFull = false; + this.emit(TtsStreamingEvents.Resume); + } } if (0 === this.tokens.length && this.timer) { From 4f2ccce9df949715db81adfacc0c9c7bc77d356f Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Fri, 13 Dec 2024 21:36:36 -0500 Subject: [PATCH 9/9] initial testing with cartesia --- lib/session/call-session.js | 15 +++ lib/tasks/gather.js | 1 - lib/tasks/say.js | 25 ++-- lib/tasks/tts-task.js | 33 ++++++ lib/utils/constants.json | 12 +- lib/utils/tts-streaming-buffer.js | 191 +++++++++++++++++------------- 6 files changed, 183 insertions(+), 94 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 22ee096d..80c83d0b 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -579,6 +579,21 @@ class CallSession extends Emitter { } } + getTsStreamingVendor() { + let v; + if (this.currentTask?.isStreamingTts) { + const {vendor} = this.currentTask.getTtsVendorData(this); + v = vendor; + } + else if (this.backgroundTaskManager.getTask('ttsStream')?.isStreamingTts) { + const {vendor} = this.backgroundTaskManager.getTask('ttsStream').getTtsVendorData(this); + v = vendor; + } + + //TMP!!! + return 'cartesia'; + } + get appIsUsingWebsockets() { return this.requestor instanceof WsRequestor; } diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js index c2758c63..e89c9a74 100644 --- a/lib/tasks/gather.js +++ b/lib/tasks/gather.js @@ -1171,7 +1171,6 @@ class TaskGather extends SttTask { } catch (err) { /*already logged error*/ } // Gather got response from hook, cancel actionHookDelay processing - this.logger.debug('TaskGather:_resolve - checking ahd'); if (this.cs.actionHookDelayProcessor) { if (returnedVerbs) { this.logger.debug('TaskGather:_resolve - got response from action hook, cancelling actionHookDelay'); diff --git a/lib/tasks/say.js b/lib/tasks/say.js index ead92d89..591af887 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -101,27 +101,28 @@ class TaskSay extends TtsTask { } async handlingStreaming(cs, {ep}) { - const {vendor, voice, label} = this.getTtsVendorData(cs); + const {vendor, language, voice, label} = this.getTtsVendorData(cs); const credentials = cs.getSpeechCredentials(vendor, 'tts', label); if (!credentials) { throw new SpeechCredentialError( `No text-to-speech service credentials for ${vendor} with labels: ${label} have been configured`); } - const {api_key} = credentials; - // TODO: set channel variables for tts streaming vendor - await ep.set({ - DEEPGRAM_API_KEY: api_key, - DEEPGRAM_TTS_STREAMING_MODEL: voice - }); + try { + //TMP!! + await this.setTtsStreamingChannelVars(/*vendor*/ 'cartesia', language, voice, credentials, ep); + cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'}) + .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); + } catch (err) { + this.logger.info({err}, 'TaskSay:handlingStreaming - Error setting channel vars'); + cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'}) + .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); - cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'}) - .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); + //TODO: send tts:streaming-event with error? + this.notifyTaskDone(); + } await this.awaitTaskDone(); - cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'}) - .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); - this.logger.info('TaskSay:handlingStreaming - done'); } diff --git a/lib/tasks/tts-task.js b/lib/tasks/tts-task.js index 30fae4e6..f6d49b0d 100644 --- a/lib/tasks/tts-task.js +++ b/lib/tasks/tts-task.js @@ -58,6 +58,39 @@ class TtsTask extends Task { return {vendor, language, voice, label}; } + async setTtsStreamingChannelVars(vendor, language, voice, credentials, ep) { + let {api_key, cartesia_model_id, cartesia_voice_id} = credentials; + let obj; + + switch (vendor) { + case 'deepgram': + obj = { + DEEPGRAM_API_KEY: api_key, + DEEPGRAM_TTS_STREAMING_MODEL: voice + }; + break; + case 'cartesia': + //TMP!! + cartesia_model_id = 'sonic'; + cartesia_voice_id = 'f785af04-229c-4a7c-b71b-f3194c7f08bb'; + api_key = 'sk_car_gYBQoqC19S8gK2lLHhnTT'; + //TMP!! + + obj = { + CARTESIA_API_KEY: api_key, + CARTESIA_TTS_STREAMING_MODEL_ID: cartesia_model_id, + CARTESIA_TTS_STREAMING_VOICE_ID: cartesia_voice_id, + CARTESIA_TTS_STREAMING_LANGUAGE: language || 'en' + }; + break; + default: + throw new Error(`vendor ${vendor} is not supported for tts streaming yet`); + } + this.logger.info({vendor, credentials, obj}, 'setTtsStreamingChannelVars'); + + await ep.set(obj); + } + async _synthesizeWithSpecificVendor(cs, ep, {vendor, language, voice, label, preCache = false}) { const {srf, accountSid:account_sid} = cs; const {writeAlerts, AlertType, stats} = srf.locals; diff --git a/lib/utils/constants.json b/lib/utils/constants.json index fff7332f..8a0087b8 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -230,9 +230,19 @@ }, "DeepgramTtsStreamingEvents": { "Empty": "deepgram_tts_streaming::empty", - "ConnectFailure": "dedeepgram_tts_streaming::connect_failed", + "ConnectFailure": "deepgram_tts_streaming::connect_failed", "Connect": "deepgram_tts_streaming::connect" }, + "CartesiaTtsStreamingEvents": { + "Empty": "cartesia_tts_streaming::empty", + "ConnectFailure": "cartesia_tts_streaming::connect_failed", + "Connect": "cartesia_tts_streaming::connect" + }, + "ElevenlabsTtsStreamingEvents": { + "Empty": "elevenlabs_tts_streaming::empty", + "ConnectFailure": "elevenlabs_tts_streaming::connect_failed", + "Connect": "elevenlabs_tts_streaming::connect" + }, "TtsStreamingEvents": { "Empty": "tts_streaming::empty", "Pause": "tts_streaming::pause", diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js index 5502e116..5a3b297a 100644 --- a/lib/utils/tts-streaming-buffer.js +++ b/lib/utils/tts-streaming-buffer.js @@ -1,7 +1,6 @@ const Emitter = require('events'); const assert = require('assert'); const { - DeepgramTtsStreamingEvents, TtsStreamingEvents, TtsStreamingConnectionStatus } = require('../utils/constants'); @@ -42,16 +41,20 @@ class TtsStreamingBuffer extends Emitter { async start() { assert.ok( - this._connectionStatus === TtsStreamingConnectionStatus.NotConnected || - this._connectionStatus === TtsStreamingConnectionStatus.Failed, - 'TtsStreamingBuffer:start already started'); + this._connectionStatus === TtsStreamingConnectionStatus.NotConnected, + 'TtsStreamingBuffer:start already started, or has failed'); - if (this.eventHandlers.length === 0) { - this._initHandlers(this.ep); + this.vendor = this.cs.getTsStreamingVendor(); + if (!this.vendor) { + this.logger.info('TtsStreamingBuffer:start No TTS streaming vendor configured'); + throw new Error('No TTS streaming vendor configured'); } + this.logger.info(`TtsStreamingBuffer:start Connecting to TTS streaming with vendor ${this.vendor}`); + this._connectionStatus = TtsStreamingConnectionStatus.Connecting; try { + if (this.eventHandlers.length === 0) this._initHandlers(this.ep); await this._api(this.ep, [this.ep.uuid, 'connect']); } catch (err) { this.logger.info({err}, 'TtsStreamingBuffer:start Error connecting to TTS streaming'); @@ -75,6 +78,12 @@ class TtsStreamingBuffer extends Emitter { * Add tokens to the buffer and start feeding them to the endpoint if necessary. */ async bufferTokens(tokens) { + + if (this._connectionStatus === TtsStreamingConnectionStatus.Failed) { + this.logger.info('TtsStreamingBuffer:bufferTokens TTS streaming connection failed, rejecting request'); + return {status: 'failed', reason: `connection to ${this.vendor} failed`}; + } + const starting = this.tokens === ''; const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40); const totalLength = tokens.length; @@ -95,7 +104,8 @@ class TtsStreamingBuffer extends Emitter { `TtsStreamingBuffer:bufferTokens "${displayedTokens}" (length: ${totalLength}), starting? ${starting}` ); this.tokens += (tokens || ''); - const leftoverTokens = await this._feedTokens(); + + const leftoverTokens = await this._feedTokens(); /* do we need to start a timer to periodically feed tokens to the endpoint? */ if (starting && leftoverTokens > 0) { @@ -144,82 +154,92 @@ class TtsStreamingBuffer extends Emitter { */ async _feedTokens() { this.logger.debug('_feedTokens'); - if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) { - this.logger.debug('TTS stream is not open or no tokens to send'); - return this.tokens?.length || 0; - } - - if (this._connectionStatus === TtsStreamingConnectionStatus.NotConnected || - this._connectionStatus === TtsStreamingConnectionStatus.Failed) { - await this.start(); - return this.tokens.length; - } - if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) { - this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not ready, waiting for connect'); - return this.tokens.length; - } + try { + if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) { + this.logger.debug('TTS stream is not open or no tokens to send'); + return this.tokens?.length || 0; + } - // Helper function to find a sentence boundary - const findSentenceBoundary = (text, limit) => { - const sentenceEndRegex = /[.!?](?=\s|$)/g; - let lastSentenceBoundary = -1; - let match; + if (this._connectionStatus === TtsStreamingConnectionStatus.NotConnected || + this._connectionStatus === TtsStreamingConnectionStatus.Failed) { + await this.start(); + return this.tokens.length; + } - while ((match = sentenceEndRegex.exec(text)) && match.index < limit) { - // Ensure it's not a decimal point (e.g., "3.14") - if (match.index === 0 || !/\d$/.test(text[match.index - 1])) { - lastSentenceBoundary = match.index + 1; // Include the punctuation - } + if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) { + this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not ready, waiting for connect'); + return this.tokens.length; } - return lastSentenceBoundary; - }; - // Helper function to find a word boundary - const findWordBoundary = (text, limit) => { - const wordBoundaryRegex = /\s+/g; - let lastWordBoundary = -1; - let match; + // Helper function to find a sentence boundary + const findSentenceBoundary = (text, limit) => { + const sentenceEndRegex = /[.!?](?=\s|$)/g; + let lastSentenceBoundary = -1; + let match; + + while ((match = sentenceEndRegex.exec(text)) && match.index < limit) { + // Ensure it's not a decimal point (e.g., "3.14") + if (match.index === 0 || !/\d$/.test(text[match.index - 1])) { + lastSentenceBoundary = match.index + 1; // Include the punctuation + } + } + return lastSentenceBoundary; + }; - while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) { - lastWordBoundary = match.index; - } - return lastWordBoundary; - }; + // Helper function to find a word boundary + const findWordBoundary = (text, limit) => { + const wordBoundaryRegex = /\s+/g; + let lastWordBoundary = -1; + let match; - // Try to find the best chunk to send - const limit = Math.min(MAX_CHUNK_SIZE, this.tokens.length); - let chunkEnd = findSentenceBoundary(this.tokens, limit); + while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) { + lastWordBoundary = match.index; + } + return lastWordBoundary; + }; - if (chunkEnd === -1) { - // If no sentence boundary, try word boundary - chunkEnd = findWordBoundary(this.tokens, limit); - } + // Try to find the best chunk to send + const limit = Math.min(MAX_CHUNK_SIZE, this.tokens.length); + let chunkEnd = findSentenceBoundary(this.tokens, limit); - if (chunkEnd === -1) { - // If no boundaries at all, just take the max allowed - chunkEnd = limit; - } + if (chunkEnd === -1) { + // If no sentence boundary, try word boundary - const chunk = this.tokens.slice(0, chunkEnd); - this.tokens = this.tokens.slice(chunkEnd); // Remove sent chunk and trim whitespace + //TMP!! lets try forcing full sentences + this.logger.debug('TtsStreamingBuffer:_feedTokens: no sentence boundary found, waiting for full sentence'); + return this.tokens.length; + //chunkEnd = findWordBoundary(this.tokens, limit); + } - /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */ - const modifiedChunk = chunk.replace(/\n\n/g, '\n \n'); - if (modifiedChunk.length > 0) { - try { - await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]); - } catch (err) { - this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk'); + if (chunkEnd === -1) { + // If no boundaries at all, just take the max allowed + chunkEnd = limit; } - this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`); + const chunk = this.tokens.slice(0, chunkEnd); + this.tokens = this.tokens.slice(chunkEnd); // Remove sent chunk and trim whitespace - if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) { - this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full'); - this._isFull = false; - this.emit(TtsStreamingEvents.Resume); + /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */ + const modifiedChunk = chunk.replace(/\n\n/g, '\n \n'); + if (modifiedChunk.length > 0) { + try { + await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]); + } catch (err) { + this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk'); + } + + this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`); + + if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) { + this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full'); + this._isFull = false; + this.emit(TtsStreamingEvents.Resume); + } } + } catch (err) { + this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk'); + this.tokens = ''; } if (0 === this.tokens.length && this.timer) { @@ -231,18 +251,22 @@ class TtsStreamingBuffer extends Emitter { } async _api(ep, args) { - const res = await ep.api('uuid_deepgram_tts_streaming', `^^|${args.join('|')}`); + const apiCmd = `uuid_${this.vendor}_tts_streaming`; + const res = await ep.api(apiCmd, `^^|${args.join('|')}`); if (!res.body?.startsWith('+OK')) { - throw new Error({args}, `Error calling uuid_deepgram_tts_streaming: ${res.body}`); + throw new Error({args}, `Error calling ${apiCmd}: ${res.body}`); } } _onConnectFailure(vendor) { + this.logger.info(`streaming tts connection failed to ${vendor}`); + this._connectionStatus = TtsStreamingConnectionStatus.Failed; + this.tokens = ''; this.emit(TtsStreamingEvents.ConnectFailure, {vendor}); } async _onConnect(vendor) { - this.logger.debug('ws api connected'); + this.logger.info(`streaming tts connection made to ${vendor}`); this._connectionStatus = TtsStreamingConnectionStatus.Connected; if (this.tokens.length > 0) { await this._feedTokens(); @@ -257,15 +281,6 @@ class TtsStreamingBuffer extends Emitter { this.emit(TtsStreamingEvents.Empty, {vendor}); } - _initHandlers(ep) { - this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.Connect, - this._onConnect.bind(this, 'deepgram')); - this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.ConnectFailure, - this._onConnectFailure.bind(this, 'deepgram')); - this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.Empty, - this._onTtsEmpty.bind(this, 'deepgram')); - } - addCustomEventListener(ep, event, handler) { this.eventHandlers.push({ep, event, handler}); ep.addCustomEventListener(event, handler); @@ -275,6 +290,22 @@ class TtsStreamingBuffer extends Emitter { this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler)); } + _initHandlers(ep) { + [ + // DH: add other vendors here as modules are added + 'deepgram', + 'cartesia', + 'elevenlabs' + ].forEach((vendor) => { + const eventClassName = `${vendor.charAt(0).toUpperCase() + vendor.slice(1)}TtsStreamingEvents`; + const eventClass = require('../utils/constants')[eventClassName]; + if (!eventClass) throw new Error(`Event class for vendor ${vendor} not found`); + + this.addCustomEventListener(ep, eventClass.Connect, this._onConnect.bind(this, vendor)); + this.addCustomEventListener(ep, eventClass.ConnectFailure, this._onConnectFailure.bind(this, vendor)); + this.addCustomEventListener(ep, eventClass.Empty, this._onTtsEmpty.bind(this, vendor)); + }); + } } module.exports = TtsStreamingBuffer;