diff --git a/lib/session/call-session.js b/lib/session/call-session.js
index 1d939a2d..80c83d0b 100644
--- a/lib/session/call-session.js
+++ b/lib/session/call-session.js
@@ -9,7 +9,8 @@ const {
   KillReason,
   RecordState,
   AllowedSipRecVerbs,
-  AllowedConfirmSessionVerbs
+  AllowedConfirmSessionVerbs,
+  TtsStreamingEvents
 } = require('../utils/constants');
 const moment = require('moment');
 const assert = require('assert');
@@ -21,6 +22,7 @@ const listTaskNames = require('../utils/summarize-tasks');
 const HttpRequestor = require('../utils/http-requestor');
 const WsRequestor = require('../utils/ws-requestor');
 const ActionHookDelayProcessor = require('../utils/action-hook-delay');
+const TtsStreamingBuffer = require('../utils/tts-streaming-buffer');
 const {parseUri} = require('drachtio-srf');
 const {
   JAMBONES_INJECT_CONTENT,
@@ -399,34 +401,27 @@ class CallSession extends Emitter {
   get isTransferredCall() {
     return this.application.transferredCall === true;
   }
-
-  /**
-   * returns true if this session is a ConfirmCallSession
-   */
   get isAdultingCallSession() {
     return this.constructor.name === 'AdultingCallSession';
   }
-
-  /**
-   * returns true if this session is a ConfirmCallSession
-   */
   get isConfirmCallSession() {
     return this.constructor.name === 'ConfirmCallSession';
   }
-
-  /**
-   * returns true if this session is a SipRecCallSession
-   */
   get isSipRecCallSession() {
     return this.constructor.name === 'SipRecCallSession';
   }
-
-  /**
-   * returns true if this session is a SmsCallSession
-   */
   get isSmsCallSession() {
     return this.constructor.name === 'SmsCallSession';
   }
+  get isRestCallSession() {
+    return this.constructor.name === 'RestCallSession';
+  }
+  get isInboundCallSession() { // named like its sibling getters (was: InboundCallSession)
+    return this.constructor.name === 'InboundCallSession';
+  }
+  get isNormalCallSession() { // "normal" = an inbound or REST-originated call session
+    return this.constructor.name === 'InboundCallSession' || this.constructor.name === 'RestCallSession';
+  }
 
   get webhook_secret() {
     return this.accountInfo?.account?.webhook_secret;
@@ -440,6 +435,10 @@ class
CallSession extends Emitter { return this.backgroundTaskManager.isTaskRunning('bargeIn'); } + get isTtsStreamEnabled() { + return this.backgroundTaskManager.isTaskRunning('ttsStream'); + } + get isListenEnabled() { return this.backgroundTaskManager.isTaskRunning('listen'); } @@ -502,6 +501,10 @@ class CallSession extends Emitter { this._sipRequestWithinDialogHook = url; } + get isTtsStreamOpen() { + return this.currentTask?.isStreamingTts || + this.backgroundTaskManager.getTask('ttsStream')?.isStreamingTts; + } // Bot Delay (actionHook delayed) get actionHookDelayEnabled() { return this._actionHookDelayEnabled; @@ -576,6 +579,27 @@ class CallSession extends Emitter { } } + getTsStreamingVendor() { + let v; + if (this.currentTask?.isStreamingTts) { + const {vendor} = this.currentTask.getTtsVendorData(this); + v = vendor; + } + else if (this.backgroundTaskManager.getTask('ttsStream')?.isStreamingTts) { + const {vendor} = this.backgroundTaskManager.getTask('ttsStream').getTtsVendorData(this); + v = vendor; + } + + //TMP!!! 
+ return 'cartesia'; + } + + get appIsUsingWebsockets() { + return this.requestor instanceof WsRequestor; + } + + /* end of getters and setters */ + async clearOrRestoreActionHookDelayProcessor() { if (this._actionHookDelayProcessor) { await this._actionHookDelayProcessor.stop(); @@ -793,6 +817,32 @@ class CallSession extends Emitter { } } + async enableBackgroundTtsStream(say) { + try { + if (this.isTtsStreamEnabled) { + this.logger.debug('CallSession:enableBackgroundTtsStream - ttsStream currently enabled, ignoring request'); + } else if (this.appIsUsingWebsockets && this.isNormalCallSession) { + await this.backgroundTaskManager.newTask('ttsStream', say); + this.logger.debug('CallSession:enableBackgroundTtsStream - ttsStream enabled'); + } else { + this.logger.debug( + 'CallSession:enableBackgroundTtsStream - ignoring request as call does not have required conditions'); + } + } catch (err) { + this.logger.info({err, say}, 'CallSession:enableBackgroundTtsStream - Error creating background tts stream task'); + } + } + + async disableTtsStream() { + if (this.isTtsStreamEnabled) { + this.backgroundTaskManager.stop('ttsStream'); + this.logger.debug('CallSession:disableTtsStream - ttsStream disabled'); + } + } + async clearTtsStream() { + this.ttsStreamingBuffer?.clear(); + } + async enableBotMode(gather, autoEnable) { try { let task; @@ -1052,6 +1102,17 @@ class CallSession extends Emitter { this.inbandDtmfEnabled = voipCarrier?.dtmf_type === 'tones'; } + if (this.isNormalCallSession) { + this.ttsStreamingBuffer = new TtsStreamingBuffer(this); + this.ttsStreamingBuffer.on(TtsStreamingEvents.Empty, this._onTtsStreamingEmpty.bind(this)); + this.ttsStreamingBuffer.on(TtsStreamingEvents.Pause, this._onTtsStreamingPause.bind(this)); + this.ttsStreamingBuffer.on(TtsStreamingEvents.Resume, this._onTtsStreamingResume.bind(this)); + this.ttsStreamingBuffer.on(TtsStreamingEvents.ConnectFailure, this._onTtsStreamingConnectFailure.bind(this)); + } + else { + 
this.logger.info(`CallSession:exec - not a normal call session: ${this.constructor.name}`);
+    }
+
     while (this.tasks.length && !this.callGone) {
       const taskNum = ++this.taskIdx;
       const stackNum = this.stackIdx;
@@ -1635,6 +1696,39 @@ Duration=${duration} `
       .catch((err) => this.logger.error(err, 'CallSession:_lccLlmUpdate'));
   }
 
+  async _lccTtsTokens(opts) { // handles 'tts:tokens': buffers tokens and always reports a tts:tokens-result
+    const {id, tokens} = opts || {}; // tolerate a command that arrives without a data payload
+
+    if (id === undefined) {
+      this.logger.info({opts}, 'CallSession:_lccTtsTokens - invalid command since id is missing');
+      return;
+    }
+    else if (tokens === undefined) {
+      this.logger.info({opts}, 'CallSession:_lccTtsTokens - invalid command since tokens is missing');
+      return this.requestor.request('tts:tokens-result', '/tokens-result', {
+        id,
+        status: 'failed',
+        reason: 'missing tokens'
+      }).catch((err) => this.logger.debug({err}, 'CallSession:_lccTtsTokens - Error sending'));
+    }
+    let res = {status: 'failed', reason: 'unable to buffer tokens'}; // reported if buffering throws or no buffer
+    try {
+      res = (await this.ttsStreamingBuffer?.bufferTokens(tokens)) ?? res;
+      this.logger.info({id, res}, 'CallSession:_lccTtsTokens - tts:tokens-result');
+    } catch (err) {
+      this.logger.info(err, 'CallSession:_lccTtsTokens');
+    }
+    this.requestor.request('tts:tokens-result', '/tokens-result', {id, ...res})
+      .catch((err) => this.logger.debug({err}, 'CallSession:_lccTtsTokens - Error sending'));
+  }
+
+  _lccTtsFlush(opts) {
+    this.ttsStreamingBuffer?.flush(opts);
+  }
+
+  _lccTtsClear(opts) {
+    this.ttsStreamingBuffer?.clear(opts);
+  }
 
   /**
    * perform call hangup by jambonz
@@ -2016,6 +2110,18 @@ Duration=${duration} `
         this._lccLlmUpdate(data, call_sid);
         break;
 
+      case 'tts:tokens':
+        this._lccTtsTokens(data);
+        break;
+
+      case 'tts:flush':
+        this._lccTtsFlush(data);
+        break;
+
+      case 'tts:clear':
+        this._lccTtsClear(data);
+        break;
+
       default:
         this.logger.info(`CallSession:_onCommand - invalid command ${command}`);
     }
@@ -2211,6 +2317,8 @@ Duration=${duration} `
     // close all background tasks
     this.backgroundTaskManager.stopAll();
     this.clearOrRestoreActionHookDelayProcessor().catch((err) => {});
+
+    
this.ttsStreamingBuffer?.stop();
   }
 
   /**
@@ -2745,6 +2853,37 @@ Duration=${duration} `
       this.verbHookSpan = null;
     }
   }
+
+  _onTtsStreamingEmpty() {
+    const task = this.currentTask;
+    if (task && TaskName.Say === task.name) {
+      task.notifyTtsStreamIsEmpty();
+    }
+  }
+
+  _onTtsStreamingPause() {
+    this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_paused'})
+      .catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingPause - Error sending'));
+  }
+
+  _onTtsStreamingResume() { // same hook path as the pause notification (leading slash was missing)
+    this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_resumed'})
+      .catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingResume - Error sending'));
+  }
+
+  async _onTtsStreamingConnectFailure({vendor} = {}) { // TtsStreamingBuffer emits the payload as {vendor}
+    const {writeAlerts, AlertType} = this.srf.locals;
+    try {
+      await writeAlerts({
+        alert_type: AlertType.TTS_STREAMING_CONNECTION_FAILURE,
+        account_sid: this.accountSid,
+        vendor
+      });
+    } catch (error) {
+      this.logger.error({error}, 'Error writing TTS_STREAMING_CONNECTION_FAILURE alert');
+    }
+    this.logger.info({vendor}, 'CallSession:_onTtsStreamingConnectFailure - tts streaming connect failure');
+  }
 }
 
 module.exports = CallSession;
diff --git a/lib/tasks/config.js b/lib/tasks/config.js
index 8f163062..72ec3c4b 100644
--- a/lib/tasks/config.js
+++ b/lib/tasks/config.js
@@ -16,7 +16,8 @@ class TaskConfig extends Task {
       'fillerNoise',
       'actionHookDelayAction',
       'boostAudioSignal',
-      'vad'
+      'vad',
+      'ttsStream'
     ].forEach((k) => this[k] = this.data[k] || {});
 
     if ('notifyEvents' in this.data) {
@@ -45,6 +46,12 @@ class TaskConfig extends Task {
       };
       delete this.transcribeOpts.enable;
     }
+    if (this.ttsStream.enable) {
+      this.sayOpts = {
+        verb: 'say',
+        stream: true
+      };
+    }
 
     if (this.data.reset) {
       if (typeof this.data.reset === 'string') this.data.reset = [this.data.reset];
@@ -75,6 +82,7 @@ class TaskConfig extends Task {
   get hasVad() { return Object.keys(this.vad).length; }
   get hasFillerNoise() { return
Object.keys(this.fillerNoise).length; }
   get hasReferHook() { return Object.keys(this.data).includes('referHook'); }
+  get hasTtsStream() { return Object.keys(this.ttsStream).length; }
 
   get summary() {
     const phrase = [];
@@ -106,6 +114,9 @@ class TaskConfig extends Task {
     if (this.onHoldMusic) phrase.push(`onHoldMusic: ${this.onHoldMusic}`);
     if ('boostAudioSignal' in this.data) phrase.push(`setGain ${this.data.boostAudioSignal}`);
     if (this.hasReferHook) phrase.push('set referHook');
+    if (this.hasTtsStream) {
+      phrase.push(`${this.ttsStream.enable ? 'enable' : 'disable'} ttsStream`);
+    }
     return `${this.name}{${phrase.join(',')}}`;
   }
 
@@ -305,6 +316,22 @@ class TaskConfig extends Task {
     if (this.hasReferHook) {
       cs.referHook = this.data.referHook;
     }
+
+    if (this.ttsStream.enable && this.sayOpts) {
+      this.sayOpts.synthesizer = this.hasSynthesizer ? this.synthesizer : {
+        vendor: cs.speechSynthesisVendor,
+        language: cs.speechSynthesisLanguage,
+        voice: cs.speechSynthesisVoice,
+        ...(cs.speechSynthesisLabel && {
+          label: cs.speechSynthesisLabel
+        })
+      };
+      this.logger.info({opts: this.sayOpts}, 'Config: enabling ttsStream');
+      cs.enableBackgroundTtsStream(this.sayOpts);
+    } else if (this.hasTtsStream && !this.ttsStream.enable) { // only when this verb carries a ttsStream property
+      this.logger.info('Config: disabling ttsStream');
+      cs.disableTtsStream();
+    }
   }
 
   async kill(cs) {
diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js
index 7153ac1f..e89c9a74 100644
--- a/lib/tasks/gather.js
+++ b/lib/tasks/gather.js
@@ -723,6 +723,7 @@ class TaskGather extends SttTask {
           this._fillerNoiseOn = false; // in a race, if we just started audio it may sneak through here
           this.ep.api('uuid_break', this.ep.uuid)
             .catch((err) => this.logger.info(err, 'Error killing audio'));
+          cs.clearTtsStream();
         }
         return;
       }
@@ -1170,7 +1171,6 @@ class TaskGather extends SttTask {
     } catch (err) { /*already logged error*/ }
 
     // Gather got response from hook, cancel actionHookDelay processing
-    this.logger.debug('TaskGather:_resolve - checking ahd');
     if
(this.cs.actionHookDelayProcessor) { if (returnedVerbs) { this.logger.debug('TaskGather:_resolve - got response from action hook, cancelling actionHookDelay'); diff --git a/lib/tasks/say.js b/lib/tasks/say.js index f1d3ad3b..591af887 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -1,3 +1,4 @@ +const assert = require('assert'); const TtsTask = require('./tts-task'); const {TaskName, TaskPreconditions} = require('../utils/constants'); const pollySSMLSplit = require('polly-ssml-split'); @@ -35,24 +36,40 @@ class TaskSay extends TtsTask { super(logger, opts, parentTask); this.preconditions = TaskPreconditions.Endpoint; - this.text = (Array.isArray(this.data.text) ? this.data.text : [this.data.text]) - .map((t) => breakLengthyTextIfNeeded(this.logger, t)) - .flat(); + assert.ok((typeof this.data.text === 'string' || Array.isArray(this.data.text)) || this.data.stream === true, + 'Say: either text or stream:true is required'); - this.loop = this.data.loop || 1; - this.isHandledByPrimaryProvider = true; + + if (this.data.stream === true) { + this._isStreamingTts = true; + this.closeOnStreamEmpty = this.data.closeOnStreamEmpty !== false; + } + else { + this._isStreamingTts = false; + this.text = (Array.isArray(this.data.text) ? this.data.text : [this.data.text]) + .map((t) => breakLengthyTextIfNeeded(this.logger, t)) + .flat(); + + this.loop = this.data.loop || 1; + this.isHandledByPrimaryProvider = true; + } } get name() { return TaskName.Say; } get summary() { - for (let i = 0; i < this.text.length; i++) { - if (this.text[i].startsWith('silence_stream')) continue; - return `${this.name}{text=${this.text[i].slice(0, 15)}${this.text[i].length > 15 ? '...' : ''}}`; + if (this.isStreamingTts) return `${this.name} streaming`; + else { + for (let i = 0; i < this.text.length; i++) { + if (this.text[i].startsWith('silence_stream')) continue; + return `${this.name}{text=${this.text[i].slice(0, 15)}${this.text[i].length > 15 ? '...' 
: ''}}`; + } + return `${this.name}{${this.text[0]}}`; } - return `${this.name}{${this.text[0]}}`; } + get isStreamingTts() { return this._isStreamingTts; } + _validateURL(urlString) { try { new URL(urlString); @@ -63,14 +80,19 @@ class TaskSay extends TtsTask { } async exec(cs, obj) { + if (this.isStreamingTts && !cs.appIsUsingWebsockets) { + throw new Error('Say: streaming say verb requires applications to use the websocket API'); + } + try { - await this.handling(cs, obj); + if (this.isStreamingTts) await this.handlingStreaming(cs, obj); + else await this.handling(cs, obj); this.emit('playDone'); } catch (error) { if (error instanceof SpeechCredentialError) { // if say failed due to speech credentials, alarm is writtern and error notification is sent // finished this say to move to next task. - this.logger.info('Say failed due to SpeechCredentialError, finished!'); + this.logger.info({error}, 'Say failed due to SpeechCredentialError, finished!'); this.emit('playDone'); return; } @@ -78,6 +100,32 @@ class TaskSay extends TtsTask { } } + async handlingStreaming(cs, {ep}) { + const {vendor, language, voice, label} = this.getTtsVendorData(cs); + const credentials = cs.getSpeechCredentials(vendor, 'tts', label); + if (!credentials) { + throw new SpeechCredentialError( + `No text-to-speech service credentials for ${vendor} with labels: ${label} have been configured`); + } + + try { + //TMP!! 
+ await this.setTtsStreamingChannelVars(/*vendor*/ 'cartesia', language, voice, credentials, ep); + cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'}) + .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); + } catch (err) { + this.logger.info({err}, 'TaskSay:handlingStreaming - Error setting channel vars'); + cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'}) + .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); + + //TODO: send tts:streaming-event with error? + this.notifyTaskDone(); + } + + await this.awaitTaskDone(); + this.logger.info('TaskSay:handlingStreaming - done'); + } + async handling(cs, {ep}) { const {srf, accountSid:account_sid, callSid:target_sid} = cs; const {writeAlerts, AlertType} = srf.locals; @@ -96,7 +144,7 @@ class TaskSay extends TtsTask { let voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ? this.synthesizer.voice : cs.speechSynthesisVoice; - let label = this.taskInlcudeSynthesizer ? this.synthesizer.label : cs.speechSynthesisLabel; + let label = this.taskIncludeSynthesizer ? this.synthesizer.label : cs.speechSynthesisLabel; const fallbackVendor = this.synthesizer.fallbackVendor && this.synthesizer.fallbackVendor !== 'default' ? this.synthesizer.fallbackVendor : @@ -107,7 +155,7 @@ class TaskSay extends TtsTask { const fallbackVoice = this.synthesizer.fallbackVoice && this.synthesizer.fallbackVoice !== 'default' ? this.synthesizer.fallbackVoice : cs.fallbackSpeechSynthesisVoice; - const fallbackLabel = this.taskInlcudeSynthesizer ? + const fallbackLabel = this.taskIncludeSynthesizer ? 
this.synthesizer.fallbackLabel : cs.fallbackSpeechSynthesisLabel; if (cs.hasFallbackTts) { @@ -253,6 +301,7 @@ class TaskSay extends TtsTask { this._playResolve = null; } } + this.notifyTaskDone(); } _addStreamingTtsAttributes(span, evt) { @@ -273,6 +322,13 @@ class TaskSay extends TtsTask { delete attrs['cache_filename']; //no value in adding this to the span span.setAttributes(attrs); } + + notifyTtsStreamIsEmpty() { + if (this.isStreamingTts && this.closeOnStreamEmpty) { + this.logger.info('TaskSay:notifyTtsStreamIsEmpty - stream is empty, killing task'); + this.notifyTaskDone(); + } + } } const spanMapping = { diff --git a/lib/tasks/tts-task.js b/lib/tasks/tts-task.js index 65fe96a7..f6d49b0d 100644 --- a/lib/tasks/tts-task.js +++ b/lib/tasks/tts-task.js @@ -13,11 +13,11 @@ class TtsTask extends Task { this.earlyMedia = this.data.earlyMedia === true || (parentTask && parentTask.earlyMedia); /** - * Task use taskInlcudeSynthesizer to identify - * if taskInlcudeSynthesizer === true, use label from verb.synthesizer, even it's empty - * if taskInlcudeSynthesizer === false, use label from application.synthesizer + * Task use taskIncludeSynthesizer to identify + * if taskIncludeSynthesizer === true, use label from verb.synthesizer, even it's empty + * if taskIncludeSynthesizer === false, use label from application.synthesizer */ - this.taskInlcudeSynthesizer = !!this.data.synthesizer; + this.taskIncludeSynthesizer = !!this.data.synthesizer; this.synthesizer = this.data.synthesizer || {}; this.disableTtsCache = this.data.disableTtsCache; this.options = this.synthesizer.options || {}; @@ -44,6 +44,53 @@ class TtsTask extends Task { } } + getTtsVendorData(cs) { + const vendor = this.synthesizer.vendor && this.synthesizer.vendor !== 'default' ? + this.synthesizer.vendor : + cs.speechSynthesisVendor; + const language = this.synthesizer.language && this.synthesizer.language !== 'default' ? 
+      this.synthesizer.language :
+      cs.speechSynthesisLanguage ;
+    const voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ?
+      this.synthesizer.voice :
+      cs.speechSynthesisVoice;
+    const label = this.taskIncludeSynthesizer ? this.synthesizer.label : cs.speechSynthesisLabel;
+    return {vendor, language, voice, label};
+  }
+
+  async setTtsStreamingChannelVars(vendor, language, voice, credentials, ep) {
+    const {api_key, cartesia_model_id, cartesia_voice_id} = credentials;
+    let obj;
+
+    switch (vendor) {
+      case 'deepgram':
+        obj = {
+          DEEPGRAM_API_KEY: api_key,
+          DEEPGRAM_TTS_STREAMING_MODEL: voice
+        };
+        break;
+      case 'cartesia':
+        // SECURITY: the api key, model and voice must come from the supplied speech
+        // credentials; they must never be hard-coded in source.
+        if (!api_key) {
+          throw new Error('cartesia tts streaming requires an api_key in the speech credentials');
+        }
+
+        obj = {
+          CARTESIA_API_KEY: api_key,
+          CARTESIA_TTS_STREAMING_MODEL_ID: cartesia_model_id,
+          CARTESIA_TTS_STREAMING_VOICE_ID: cartesia_voice_id,
+          CARTESIA_TTS_STREAMING_LANGUAGE: language || 'en'
+        };
+        break;
+      default:
+        throw new Error(`vendor ${vendor} is not supported for tts streaming yet`);
+    }
+    this.logger.info({vendor, vars: Object.keys(obj)}, 'setTtsStreamingChannelVars'); // never log secret values
+
+    await ep.set(obj);
+  }
+
   async _synthesizeWithSpecificVendor(cs, ep, {vendor, language, voice, label, preCache = false}) {
     const {srf, accountSid:account_sid} = cs;
     const {writeAlerts, AlertType, stats} = srf.locals;
diff --git a/lib/utils/background-task-manager.js b/lib/utils/background-task-manager.js
index a83eade1..60fde746 100644
--- a/lib/utils/background-task-manager.js
+++ b/lib/utils/background-task-manager.js
@@ -46,6 +46,9 @@ class BackgroundTaskManager extends Emitter {
       case 'transcribe':
         task = await this._initTranscribe(opts);
         break;
+      case 'ttsStream':
+        task = await this._initTtsStream(opts);
+        break;
       default:
         break;
     }
@@ -173,6 +176,25 @@ class BackgroundTaskManager extends Emitter {
     return task;
   }
 
+  // Initiate Tts Stream
+  async _initTtsStream(opts) {
+    let task;
+    try {
+ const t = normalizeJambones(this.logger, [opts]); + task = makeTask(this.logger, t[0]); + const resources = await this.cs._evaluatePreconditions(task); + const {span, ctx} = this.rootSpan.startChildSpan(`background-ttsStream:${task.summary}`); + task.span = span; + task.ctx = ctx; + task.exec(this.cs, resources) + .then(this._taskCompleted.bind(this, 'ttsStream', task)) + .catch(this._taskError.bind(this, 'ttsStream', task)); + } catch (err) { + this.logger.info(err, 'BackgroundTaskManager:_initTtsStream - Error creating ttsStream task'); + } + return task; + } + _taskCompleted(type, task) { this.logger.debug({type, task}, `BackgroundTaskManager:_taskCompleted: task completed, sticky: ${task.sticky}`); task.removeAllListeners(); diff --git a/lib/utils/constants.json b/lib/utils/constants.json index c3904d91..8a0087b8 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -203,6 +203,8 @@ "verb:status", "llm:event", "llm:tool-call", + "tts:tokens-result", + "tts:streaming-event", "jambonz:error" ], "RecordState": { @@ -226,6 +228,33 @@ "PartialMedia": "partial-media", "FullMedia": "full-media" }, + "DeepgramTtsStreamingEvents": { + "Empty": "deepgram_tts_streaming::empty", + "ConnectFailure": "deepgram_tts_streaming::connect_failed", + "Connect": "deepgram_tts_streaming::connect" + }, + "CartesiaTtsStreamingEvents": { + "Empty": "cartesia_tts_streaming::empty", + "ConnectFailure": "cartesia_tts_streaming::connect_failed", + "Connect": "cartesia_tts_streaming::connect" + }, + "ElevenlabsTtsStreamingEvents": { + "Empty": "elevenlabs_tts_streaming::empty", + "ConnectFailure": "elevenlabs_tts_streaming::connect_failed", + "Connect": "elevenlabs_tts_streaming::connect" + }, + "TtsStreamingEvents": { + "Empty": "tts_streaming::empty", + "Pause": "tts_streaming::pause", + "Resume": "tts_streaming::resume", + "ConnectFailure": "tts_streaming::connect_failed" + }, + "TtsStreamingConnectionStatus": { + "NotConnected": "not_connected", + "Connected": 
"connected", + "Connecting": "connecting", + "Failed": "failed" + }, "MAX_SIMRINGS": 10, "BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)", "FS_UUID_SET_NAME": "fsUUIDs", diff --git a/lib/utils/transcription-utils.js b/lib/utils/transcription-utils.js index d19e8f41..61226201 100644 --- a/lib/utils/transcription-utils.js +++ b/lib/utils/transcription-utils.js @@ -185,7 +185,10 @@ const selectDefaultGoogleModel = (task, language, version) => { (useV2 ? 'long' : 'latest_long'); }; const consolidateTranscripts = (bufferedTranscripts, channel, language, vendor) => { - if (bufferedTranscripts.length === 1) return bufferedTranscripts[0]; + if (bufferedTranscripts.length === 1) { + bufferedTranscripts[0].is_final = true; + return bufferedTranscripts[0]; + } let totalConfidence = 0; const finalTranscript = bufferedTranscripts.reduce((acc, evt) => { totalConfidence += evt.alternatives[0].confidence; diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js new file mode 100644 index 00000000..5a3b297a --- /dev/null +++ b/lib/utils/tts-streaming-buffer.js @@ -0,0 +1,311 @@ +const Emitter = require('events'); +const assert = require('assert'); +const { + TtsStreamingEvents, + TtsStreamingConnectionStatus +} = require('../utils/constants'); +const FEED_INTERVAL = 2000; +const MAX_CHUNK_SIZE = 1800; +const HIGH_WATER_BUFFER_SIZE = 5000; +const LOW_WATER_BUFFER_SIZE = 1000; + + +class TtsStreamingBuffer extends Emitter { + constructor(cs) { + super(); + this.cs = cs; + this.logger = cs.logger; + + this.tokens = ''; + this.eventHandlers = []; + this._isFull = false; + this._connectionStatus = TtsStreamingConnectionStatus.NotConnected; + this._flushPending = false; + } + + get isEmpty() { + return this.tokens.length === 0; + } + + get isFull() { + return this._isFull; + } + + get size() { + return this.tokens.length; + } + + get ep() { + return this.cs?.ep; + } + + async start() { + assert.ok( + this._connectionStatus 
=== TtsStreamingConnectionStatus.NotConnected, + 'TtsStreamingBuffer:start already started, or has failed'); + + this.vendor = this.cs.getTsStreamingVendor(); + if (!this.vendor) { + this.logger.info('TtsStreamingBuffer:start No TTS streaming vendor configured'); + throw new Error('No TTS streaming vendor configured'); + } + + this.logger.info(`TtsStreamingBuffer:start Connecting to TTS streaming with vendor ${this.vendor}`); + + this._connectionStatus = TtsStreamingConnectionStatus.Connecting; + try { + if (this.eventHandlers.length === 0) this._initHandlers(this.ep); + await this._api(this.ep, [this.ep.uuid, 'connect']); + } catch (err) { + this.logger.info({err}, 'TtsStreamingBuffer:start Error connecting to TTS streaming'); + this._connectionStatus = TtsStreamingConnectionStatus.Failed; + } + } + + stop() { + clearTimeout(this.timer); + this.removeCustomEventListeners(); + if (this.ep) { + this._api(this.ep, [this.ep.uuid, 'close']) + .catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:kill Error closing TTS streaming')); + } + this.timer = null; + this.tokens = ''; + this._connectionStatus = TtsStreamingConnectionStatus.NotConnected; + } + + /** + * Add tokens to the buffer and start feeding them to the endpoint if necessary. + */ + async bufferTokens(tokens) { + + if (this._connectionStatus === TtsStreamingConnectionStatus.Failed) { + this.logger.info('TtsStreamingBuffer:bufferTokens TTS streaming connection failed, rejecting request'); + return {status: 'failed', reason: `connection to ${this.vendor} failed`}; + } + + const starting = this.tokens === ''; + const displayedTokens = tokens.length <= 40 ? 
tokens : tokens.substring(0, 40); + const totalLength = tokens.length; + + /* if we crossed the high water mark, reject the request */ + if (this.tokens.length + totalLength > HIGH_WATER_BUFFER_SIZE) { + this.logger.info( + `TtsStreamingBuffer:bufferTokensTTS buffer is full, rejecting request to buffer ${totalLength} tokens`); + + if (!this._isFull) { + this._isFull = true; + this.emit(TtsStreamingEvents.Pause); + } + return {status: 'failed', reason: 'full'}; + } + + this.logger.debug( + `TtsStreamingBuffer:bufferTokens "${displayedTokens}" (length: ${totalLength}), starting? ${starting}` + ); + this.tokens += (tokens || ''); + + const leftoverTokens = await this._feedTokens(); + + /* do we need to start a timer to periodically feed tokens to the endpoint? */ + if (starting && leftoverTokens > 0) { + assert(!this.timer); + this.timer = setInterval(async() => { + const remaining = await this._feedTokens(); + if (remaining === 0) { + clearInterval(this.timer); + this.timer = null; + } + }, FEED_INTERVAL); + } + + return {status: 'ok'}; + } + + flush() { + this.logger.debug('TtsStreamingBuffer:flush'); + if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) { + this.logger.debug('TtsStreamingBuffer:flush TTS stream is not quite ready - wait for connect'); + this._flushPending = true; + return; + } + else if (this._connectionStatus === TtsStreamingConnectionStatus.Connected) { + this._api(this.ep, [this.ep.uuid, 'flush']) + .catch((err) => this.logger.info({err}, + `TtsStreamingBuffer:flush Error flushing TTS streaming: ${JSON.stringify(err)}`)); + } + } + + clear() { + this.logger.debug('TtsStreamingBuffer:clear'); + + if (this._connectionStatus !== TtsStreamingConnectionStatus.Connected) return; + clearTimeout(this.timer); + this._api(this.ep, [this.ep.uuid, 'clear']) + .catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:clear Error clearing TTS streaming')); + this.tokens = ''; + this.timer = null; + this._isFull = false; + } + + /** + * 
Send the next chunk of tokens to the endpoint (max 2000 chars) + * Return the number of tokens left in the buffer. + */ + async _feedTokens() { + this.logger.debug('_feedTokens'); + + try { + if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) { + this.logger.debug('TTS stream is not open or no tokens to send'); + return this.tokens?.length || 0; + } + + if (this._connectionStatus === TtsStreamingConnectionStatus.NotConnected || + this._connectionStatus === TtsStreamingConnectionStatus.Failed) { + await this.start(); + return this.tokens.length; + } + + if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) { + this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not ready, waiting for connect'); + return this.tokens.length; + } + + // Helper function to find a sentence boundary + const findSentenceBoundary = (text, limit) => { + const sentenceEndRegex = /[.!?](?=\s|$)/g; + let lastSentenceBoundary = -1; + let match; + + while ((match = sentenceEndRegex.exec(text)) && match.index < limit) { + // Ensure it's not a decimal point (e.g., "3.14") + if (match.index === 0 || !/\d$/.test(text[match.index - 1])) { + lastSentenceBoundary = match.index + 1; // Include the punctuation + } + } + return lastSentenceBoundary; + }; + + // Helper function to find a word boundary + const findWordBoundary = (text, limit) => { + const wordBoundaryRegex = /\s+/g; + let lastWordBoundary = -1; + let match; + + while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) { + lastWordBoundary = match.index; + } + return lastWordBoundary; + }; + + // Try to find the best chunk to send + const limit = Math.min(MAX_CHUNK_SIZE, this.tokens.length); + let chunkEnd = findSentenceBoundary(this.tokens, limit); + + if (chunkEnd === -1) { + // If no sentence boundary, try word boundary + + //TMP!! 
lets try forcing full sentences
+        this.logger.debug('TtsStreamingBuffer:_feedTokens: no sentence boundary found, waiting for full sentence');
+        return this.tokens.length;
+        //chunkEnd = findWordBoundary(this.tokens, limit);
+      }
+
+      if (chunkEnd === -1) {
+        // If no boundaries at all, just take the max allowed
+        chunkEnd = limit;
+      }
+
+      const chunk = this.tokens.slice(0, chunkEnd);
+      this.tokens = this.tokens.slice(chunkEnd); // Remove sent chunk and trim whitespace
+
+      /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */
+      const modifiedChunk = chunk.replace(/\n\n/g, '\n \n');
+      if (modifiedChunk.length > 0) {
+        try {
+          await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]);
+        } catch (err) {
+          this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk');
+        }
+
+        this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`);
+
+        if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) {
+          this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full');
+          this._isFull = false;
+          this.emit(TtsStreamingEvents.Resume);
+        }
+      }
+    } catch (err) {
+      this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk');
+      this.tokens = '';
+    }
+
+    if (0 === this.tokens.length && this.timer) {
+      clearInterval(this.timer); // the feed timer is created with setInterval
+      this.timer = null;
+    }
+
+    return this.tokens.length;
+  }
+
+  async _api(ep, args) { // runs uuid_<vendor>_tts_streaming; throws unless the reply starts with +OK
+    const apiCmd = `uuid_${this.vendor}_tts_streaming`;
+    const res = await ep.api(apiCmd, `^^|${args.join('|')}`);
+    if (!res.body?.startsWith('+OK')) {
+      throw new Error(`Error calling ${apiCmd} ${args[1]}: ${res.body}`); // Error takes a message string first
+    }
+  }
+
+  _onConnectFailure(vendor) {
+    this.logger.info(`streaming tts connection failed to ${vendor}`);
+    this._connectionStatus = TtsStreamingConnectionStatus.Failed;
+    this.tokens = '';
+    this.emit(TtsStreamingEvents.ConnectFailure, {vendor});
+  }
+
+  async _onConnect(vendor) {
+    
this.logger.info(`streaming tts connection made to ${vendor}`);
+    this._connectionStatus = TtsStreamingConnectionStatus.Connected;
+    if (this.tokens.length > 0) {
+      await this._feedTokens();
+    }
+    if (this._flushPending) {
+      this.flush();
+      this._flushPending = false;
+    }
+  }
+
+  _onTtsEmpty(vendor) {
+    this.emit(TtsStreamingEvents.Empty, {vendor});
+  }
+
+  addCustomEventListener(ep, event, handler) {
+    this.eventHandlers.push({ep, event, handler});
+    ep.addCustomEventListener(event, handler);
+  }
+
+  removeCustomEventListeners() { // splice(0) empties the registry so start() re-installs handlers after stop()
+    this.eventHandlers.splice(0).forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
+  }
+
+  _initHandlers(ep) {
+    [
+      // DH: add other vendors here as modules are added
+      'deepgram',
+      'cartesia',
+      'elevenlabs'
+    ].forEach((vendor) => {
+      const eventClassName = `${vendor.charAt(0).toUpperCase() + vendor.slice(1)}TtsStreamingEvents`;
+      const eventClass = require('../utils/constants')[eventClassName];
+      if (!eventClass) throw new Error(`Event class for vendor ${vendor} not found`);
+
+      this.addCustomEventListener(ep, eventClass.Connect, this._onConnect.bind(this, vendor));
+      this.addCustomEventListener(ep, eventClass.ConnectFailure, this._onConnectFailure.bind(this, vendor));
+      this.addCustomEventListener(ep, eventClass.Empty, this._onTtsEmpty.bind(this, vendor));
+    });
+  }
+}
+
+module.exports = TtsStreamingBuffer;
diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js
index c010d434..6b97a9a2 100644
--- a/lib/utils/ws-requestor.js
+++ b/lib/utils/ws-requestor.js
@@ -12,6 +12,20 @@ const {
   JAMBONES_WS_MAX_PAYLOAD,
   HTTP_USER_AGENT_HEADER
 } = require('../config');
+const MTYPE_WANTS_ACK = [ // NOTE: used negated by request(): types listed here are sent WITHOUT awaiting an ack
+  'call:status',
+  'verb:status',
+  'jambonz:error',
+  'llm:event',
+  'llm:tool-call',
+  'tts:streaming-event',
+  'tts:tokens-result',
+];
+const MTYPE_NO_DATA = [
+  'llm:tool-output',
+  'tts:flush',
+  'tts:clear'
+];
 
 class WsRequestor extends BaseRequestor {
   constructor(logger, account_sid, hook, secret) {
@@ -44,7 +58,7 @@ class
WsRequestor extends BaseRequestor { async request(type, hook, params, httpHeaders = {}) { assert(HookMsgTypes.includes(type)); const url = hook.url || hook; - const wantsAck = !['call:status', 'verb:status', 'jambonz:error', 'llm:event', 'llm:tool-call'].includes(type); + const wantsAck = !MTYPE_WANTS_ACK.includes(type); if (this.maliciousClient) { this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); @@ -408,7 +422,7 @@ class WsRequestor extends BaseRequestor { case 'command': assert.ok(command, 'command property not supplied'); - assert.ok(data || command === 'llm:tool-output', 'data property not supplied'); + assert.ok(data || MTYPE_NO_DATA.includes(command), 'data property not supplied'); this._recvCommand(msgid, command, call_sid, queueCommand, tool_call_id, data); break; diff --git a/package-lock.json b/package-lock.json index 1b561e71..873bd9ac 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,8 +17,8 @@ "@jambonz/realtimedb-helpers": "^0.8.8", "@jambonz/speech-utils": "^0.1.22", "@jambonz/stats-collector": "^0.1.10", - "@jambonz/time-series": "^0.2.12", - "@jambonz/verb-specifications": "^0.0.86", + "@jambonz/time-series": "^0.2.13", + "@jambonz/verb-specifications": "^0.0.89", "@opentelemetry/api": "^1.8.0", "@opentelemetry/exporter-jaeger": "^1.23.0", "@opentelemetry/exporter-trace-otlp-http": "^0.50.0", @@ -1572,19 +1572,18 @@ } }, "node_modules/@jambonz/time-series": { - "version": "0.2.12", - "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.12.tgz", - "integrity": "sha512-EYw5f1QasblWrP2K/NabpJYkQm8XOCP1fem8luO8c7Jm8YTBwI+Ge3zB7rpU8ruoVdbrTot/pcihhTqzq4IYqA==", - "license": "MIT", + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.13.tgz", + "integrity": "sha512-Kj+l+YUnI27zZA4qoPRzjN7L82W7GuMXYq9ttDjXQ0ZBIdOLAzJjB6R3jJ3b+mvoNEQ6qG5MUtfoc6CpTFH5lw==", "dependencies": { "debug": "^4.3.1", "influx": "^5.9.3" } }, 
"node_modules/@jambonz/verb-specifications": { - "version": "0.0.86", - "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.86.tgz", - "integrity": "sha512-nTMMeFJtkSIVD3icQajOKv+zFBiAaUNuky2htVgYKipE2AIq/H/SsdurYdBij3KQq1o58/FuUuXcqCwjnZnoUg==", + "version": "0.0.89", + "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.89.tgz", + "integrity": "sha512-zMLZYp3DtOHImHAR4HDDEdPujomJ6pqvK14PFZ2/RKZZMUoMbs2XxYIpKLPvir1StYwT+WHauttnn/YIYAnxvw==", "dependencies": { "debug": "^4.3.4", "pino": "^8.8.0" @@ -3557,10 +3556,11 @@ "integrity": "sha512-3lqz5YjWTYnW6dlDa5TLaTCcShfar1e40rmcJVwCBJC6mWlFuj0eCHIElmG1g5kyuJ/GD+8Wn4FFCcz4gJPfaQ==" }, "node_modules/cross-spawn": { - "version": "7.0.3", - "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", - "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==", + "version": "7.0.6", + "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", + "integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==", "dev": true, + "license": "MIT", "dependencies": { "path-key": "^3.1.0", "shebang-command": "^2.0.0", @@ -10744,18 +10744,18 @@ } }, "@jambonz/time-series": { - "version": "0.2.12", - "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.12.tgz", - "integrity": "sha512-EYw5f1QasblWrP2K/NabpJYkQm8XOCP1fem8luO8c7Jm8YTBwI+Ge3zB7rpU8ruoVdbrTot/pcihhTqzq4IYqA==", + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.13.tgz", + "integrity": "sha512-Kj+l+YUnI27zZA4qoPRzjN7L82W7GuMXYq9ttDjXQ0ZBIdOLAzJjB6R3jJ3b+mvoNEQ6qG5MUtfoc6CpTFH5lw==", "requires": { "debug": "^4.3.1", "influx": "^5.9.3" } }, "@jambonz/verb-specifications": { - "version": "0.0.86", - "resolved": 
"https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.86.tgz", - "integrity": "sha512-nTMMeFJtkSIVD3icQajOKv+zFBiAaUNuky2htVgYKipE2AIq/H/SsdurYdBij3KQq1o58/FuUuXcqCwjnZnoUg==", + "version": "0.0.89", + "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.89.tgz", + "integrity": "sha512-zMLZYp3DtOHImHAR4HDDEdPujomJ6pqvK14PFZ2/RKZZMUoMbs2XxYIpKLPvir1StYwT+WHauttnn/YIYAnxvw==", "requires": { "debug": "^4.3.4", "pino": "^8.8.0" @@ -12242,9 +12242,9 @@ "integrity": "sha512-3lqz5YjWTYnW6dlDa5TLaTCcShfar1e40rmcJVwCBJC6mWlFuj0eCHIElmG1g5kyuJ/GD+8Wn4FFCcz4gJPfaQ==" }, "cross-spawn": { - "version": "7.0.3", - "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", - "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==", + "version": "7.0.6", + "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", + "integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==", "dev": true, "requires": { "path-key": "^3.1.0", diff --git a/package.json b/package.json index e619b269..85950aa1 100644 --- a/package.json +++ b/package.json @@ -33,8 +33,8 @@ "@jambonz/realtimedb-helpers": "^0.8.8", "@jambonz/speech-utils": "^0.1.22", "@jambonz/stats-collector": "^0.1.10", - "@jambonz/time-series": "^0.2.12", - "@jambonz/verb-specifications": "^0.0.86", + "@jambonz/time-series": "^0.2.13", + "@jambonz/verb-specifications": "^0.0.89", "@opentelemetry/api": "^1.8.0", "@opentelemetry/exporter-jaeger": "^1.23.0", "@opentelemetry/exporter-trace-otlp-http": "^0.50.0",