From 03e0a4290984d8219913cc92dbd34e4655c00826 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Thu, 5 Dec 2024 17:13:36 -0500 Subject: [PATCH] wip --- lib/session/call-session.js | 70 +++++++++++++++++++++---------- lib/tasks/say.js | 21 +++++++++- lib/utils/constants.json | 9 ++++ lib/utils/tts-streaming-buffer.js | 68 ++++++++++++++++++++++++------ lib/utils/ws-requestor.js | 2 +- package-lock.json | 54 ++++++++++++------------ package.json | 4 +- 7 files changed, 163 insertions(+), 65 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 1d75aa45..d2249eb6 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -9,7 +9,8 @@ const { KillReason, RecordState, AllowedSipRecVerbs, - AllowedConfirmSessionVerbs + AllowedConfirmSessionVerbs, + TtsStreamingEvents } = require('../utils/constants'); const moment = require('moment'); const assert = require('assert'); @@ -112,8 +113,6 @@ class CallSession extends Emitter { this._pool = srf.locals.dbHelpers.pool; - this.ttsStreamingBuffer = new TtsStreamingBuffer(this); - const handover = (newRequestor) => { this.logger.info(`handover to new base url ${newRequestor.url}`); this.requestor.removeAllListeners(); @@ -402,34 +401,27 @@ class CallSession extends Emitter { get isTransferredCall() { return this.application.transferredCall === true; } - - /** - * returns true if this session is a ConfirmCallSession - */ get isAdultingCallSession() { return this.constructor.name === 'AdultingCallSession'; } - - /** - * returns true if this session is a ConfirmCallSession - */ get isConfirmCallSession() { return this.constructor.name === 'ConfirmCallSession'; } - - /** - * returns true if this session is a SipRecCallSession - */ get isSipRecCallSession() { return this.constructor.name === 'SipRecCallSession'; } - - /** - * returns true if this session is a SmsCallSession - */ get isSmsCallSession() { return this.constructor.name === 'SmsCallSession'; } + get isRestCallSession() { + return this.constructor.name === 'RestCallSession'; + } + get InboundCallSession() { + return this.constructor.name === 'InboundCallSession'; + } + get isNormalCallSession() { + return this.constructor.name === 'InboundCallSession' || this.constructor.name === 'RestCallSession'; + } get webhook_secret() { return this.accountInfo?.account?.webhook_secret; @@ -505,6 +497,10 @@ class CallSession extends Emitter { this._sipRequestWithinDialogHook = url; } + get isTtsStreamOpen() { + //TODO: also need to check for a background streaming say + return this.currentTask?.isStreamingTts; + } // Bot Delay (actionHook delayed) get actionHookDelayEnabled() { return this._actionHookDelayEnabled; @@ -1061,6 +1057,15 @@ class CallSession extends Emitter { this.inbandDtmfEnabled = voipCarrier?.dtmf_type === 'tones'; } + if (this.isNormalCallSession) { + this.ttsStreamingBuffer = new TtsStreamingBuffer(this); + this.ttsStreamingBuffer.on(TtsStreamingEvents.Empty, this._onTtsStreamingEmpty.bind(this)); + this.ttsStreamingBuffer.on(TtsStreamingEvents.ConnectFailure, this._onTtsStreamingConnectFailure.bind(this)); + } + else { + this.logger.info(`CallSession:exec - not a normal call session: ${this.constructor.name}`); + } + while (this.tasks.length && !this.callGone) { const taskNum = ++this.taskIdx; const stackNum = this.stackIdx; @@ -1643,11 +1648,11 @@ Duration=${duration} ` } _lccTtsTokens(opts) { - this.ttsStreamingBuffer.bufferTokens(opts); + this.ttsStreamingBuffer?.bufferTokens(opts); } _lccTtsFlush(opts) { - this.ttsStreamingBuffer.flush(opts); + this.ttsStreamingBuffer?.flush(opts); } /** @@ -2234,7 +2239,7 @@ Duration=${duration} ` this.backgroundTaskManager.stopAll(); this.clearOrRestoreActionHookDelayProcessor().catch((err) => {}); - this.ttsStreamingBuffer.kill(); + this.ttsStreamingBuffer?.kill(); } /** @@ -2769,6 +2774,27 @@ Duration=${duration} ` this.verbHookSpan = null; } } + + _onTtsStreamingEmpty() { + const task = this.currentTask; + if (task && TaskName.Say === task.name) { + task.notifyTtsStreamIsEmpty(); + } + } + + async _onTtsStreamingConnectFailure(vendor) { + const {writeAlerts, AlertType} = this.srf.locals; + try { + await writeAlerts({ + alert_type: AlertType.TTS_STREAMING_CONNECTION_FAILURE, + account_sid: this.accountSid, + vendor + }); + } catch (error) { + this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert'); + } + this.logger.info({vendor}, 'CallSession:_onTtsStreamingConnectFailure - tts streaming connect failure'); + } } module.exports = CallSession; diff --git a/lib/tasks/say.js b/lib/tasks/say.js index 7b3b9aed..a6073bcf 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -101,8 +101,19 @@ class TaskSay extends TtsTask { } async handlingStreaming(cs, {ep}) { - //TODO: set channel variables for tts streaming vendor + const {vendor, voice, label} = this.getTtsVendorData(cs); + const credentials = cs.getSpeechCredentials(vendor, 'tts', label); + if (!credentials) { + throw new SpeechCredentialError( + `No text-to-speech service credentials for ${vendor} with labels: ${label} have been configured`); + } + const {api_key} = credentials; + // TODO: set channel variables for tts streaming vendor + await ep.set({ + DEEPGRAM_API_KEY: api_key, + DEEPGRAM_TTS_STREAMING_MODEL: voice + }); await this.awaitTaskDone(); this.logger.info('TaskSay:handlingStreaming - done'); @@ -304,8 +315,16 @@ class TaskSay extends TtsTask { delete attrs['cache_filename']; //no value in adding this to the span span.setAttributes(attrs); } + + notifyTtsStreamIsEmpty() { + if (this.isStreamingTts && this.closeOnStreamEmpty) { + this.logger.info('TaskSay:notifyTtsStreamIsEmpty - stream is empty, killing task'); + this.notifyTaskDone(); + } + } } + const spanMapping = { // IMPORTANT!!! JAMBONZ WEBAPP WILL SHOW TEXT PERFECTLY IF THE SPAN NAME IS SMALLER OR EQUAL 25 CHARACTERS. // EX: whisper.ratelim_reqs has length 20 <= 25 which is perfect diff --git a/lib/utils/constants.json b/lib/utils/constants.json index 090a7285..3b28110a 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -226,6 +226,15 @@ "PartialMedia": "partial-media", "FullMedia": "full-media" }, + "DeepgramTtsStreamingEvents": { + "Empty": "deepgram_tts_streaming::empty", + "ConnectFailure": "dedeepgram_tts_streaming::connect_failed", + "Connect": "deepgram_tts_streaming::connect" + }, + "TtsStreamingEvents": { + "Empty": "tts_streaming::empty", + "ConnectFailure": "tts_streaming::connect_failed" + }, "MAX_SIMRINGS": 10, "BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)", "FS_UUID_SET_NAME": "fsUUIDs" diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js index 7d3c19b3..ebd400fb 100644 --- a/lib/utils/tts-streaming-buffer.js +++ b/lib/utils/tts-streaming-buffer.js @@ -1,7 +1,8 @@ const Emitter = require('events'); const assert = require('assert'); +const {DeepgramTtsStreamingEvents, TtsStreamingEvents} = require('../utils/constants'); const FEED_INTERVAL = 2000; -const MAX_CHUNK_SIZE = 2000; +const MAX_CHUNK_SIZE = 1800; class TtsStreamingBuffer extends Emitter { constructor(cs) { @@ -9,7 +10,8 @@ class TtsStreamingBuffer extends Emitter { this.cs = cs; this.logger = cs.logger; - this.tokens = null; + this.tokens = ''; + this.eventHandlers = []; } get isEmpty() { @@ -28,9 +30,19 @@ class TtsStreamingBuffer extends Emitter { * Add tokens to the buffer and start feeding them to the endpoint if necessary. */ bufferTokens(tokens) { - const starting = this.tokens === null; + const starting = this.tokens === ''; + const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40); + const totalLength = tokens.length; + + this.logger.debug( + `Buffering tokens: "${displayedTokens}" (total length: ${totalLength}), starting? ${starting}` + ); this.tokens += (tokens || ''); - const leftoverTokens = this.feedTokens(); + const leftoverTokens = this._feedTokens(); + + if (this.eventHandlers.length === 0) { + this._initHandlers(this.ep); + } /* do we need to start a timer to periodically feed tokens to the endpoint? */ if (starting && leftoverTokens > 0) { @@ -46,10 +58,11 @@ class TtsStreamingBuffer extends Emitter { } flush() { + this.logger.info('Flushing TTS streaming buffer'); clearTimeout(this.timer); this._api(this.ep, [this.ep.uuid, 'clear']) .catch((err) => this.logger.info({err}, 'Error flushing TTS streaming')); - this.tokens = null; + this.tokens = ''; this.timer = null; } @@ -58,7 +71,9 @@ class TtsStreamingBuffer extends Emitter { * Return the number of tokens left in the buffer. */ _feedTokens() { + this.logger.debug('_feedTokens'); if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) { + this.logger.debug('TTS stream is not open or no tokens to send'); return this.tokens?.length || 0; } @@ -106,15 +121,15 @@ class TtsStreamingBuffer extends Emitter { const chunk = this.tokens.slice(0, chunkEnd); this.tokens = this.tokens.slice(chunkEnd).trim(); // Remove sent chunk and trim whitespace - // Send the chunk to the endpoint - // TODO: abstract this into an endpoint method - this._api(this.ep, [this.ep.uuid, 'send', chunk]) - .then(() => this.logger.debug(`Sent chunk: ${chunk}`)) + /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */ + const modifiedChunk = chunk.replace(/\n\n/g, '\n \n'); + this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]) + .then(() => this.logger.debug(`Sent tokens: ${chunk.substring(0, 40)}`)) .catch((err) => { this.logger.info({err}, 'Error sending TTS chunk'); }); - + this.logger.debug(`_feedTokens: sent ${chunk.length}, Remaining: ${this.tokens.length}`); return this.tokens.length; } @@ -127,13 +142,40 @@ class TtsStreamingBuffer extends Emitter { kill() { clearTimeout(this.timer); - this._api(this.ep, [this.ep.uuid, 'close']) - .catch((err) => this.logger.info({err}, 'Error closing TTS streaming')); + this.removeCustomEventListeners(); + if (this.ep) { + this._api(this.ep, [this.ep.uuid, 'close']) + .catch((err) => this.logger.info({err}, 'Error closing TTS streaming')); + } this.timer = null; - this.tokens = null; + this.tokens = ''; this.cs = null; } + _onConnectFailure(vendor) { + this.emit(TtsStreamingEvents.ConnectFailure, {vendor}); + } + + _onTtsEmpty(vendor) { + this.emit(TtsStreamingEvents.Empty, {vendor}); + } + + _initHandlers(ep) { + this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.ConnectFailure, + this._onConnectFailure.bind(this, 'deepgram')); + this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.Empty, + this._onTtsEmpty.bind(this, 'deepgram')); + } + + addCustomEventListener(ep, event, handler) { + this.eventHandlers.push({ep, event, handler}); + ep.addCustomEventListener(event, handler); + } + + removeCustomEventListeners() { + this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler)); + } + } module.exports = TtsStreamingBuffer; diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js index c010d434..b002b794 100644 --- a/lib/utils/ws-requestor.js +++ b/lib/utils/ws-requestor.js @@ -408,7 +408,7 @@ class WsRequestor extends BaseRequestor { case 'command': assert.ok(command, 'command property not supplied'); - assert.ok(data || command === 'llm:tool-output', 'data property not supplied'); + assert.ok(data || ['llm:tool-output', 'tts:flush'].includes(command), 'data property not supplied'); this._recvCommand(msgid, command, call_sid, queueCommand, tool_call_id, data); break; diff --git a/package-lock.json b/package-lock.json index 3f12c9bd..b7bbfe22 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,8 +17,8 @@ "@jambonz/realtimedb-helpers": "^0.8.8", "@jambonz/speech-utils": "^0.1.22", "@jambonz/stats-collector": "^0.1.10", - "@jambonz/time-series": "^0.2.9", - "@jambonz/verb-specifications": "^0.0.88", + "@jambonz/time-series": "^0.2.13", + "@jambonz/verb-specifications": "^0.0.89", "@opentelemetry/api": "^1.8.0", "@opentelemetry/exporter-jaeger": "^1.23.0", "@opentelemetry/exporter-trace-otlp-http": "^0.50.0", @@ -1572,18 +1572,18 @@ } }, "node_modules/@jambonz/time-series": { - "version": "0.2.9", - "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.9.tgz", - "integrity": "sha512-+/Oal0mjjV4iQ8q0ymtvVJP+GqgGpYUb2bc/FD/xDxOzKtl340l9yoM3oczREJg5IvEkpExz6NogJzUiCSeVnQ==", + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.13.tgz", + "integrity": "sha512-Kj+l+YUnI27zZA4qoPRzjN7L82W7GuMXYq9ttDjXQ0ZBIdOLAzJjB6R3jJ3b+mvoNEQ6qG5MUtfoc6CpTFH5lw==", "dependencies": { "debug": "^4.3.1", "influx": "^5.9.3" } }, "node_modules/@jambonz/verb-specifications": { - "version": "0.0.88", - "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.88.tgz", - "integrity": "sha512-gbJn/sT1DKJspB6EjnEpMWQBi983YYUbE9+9Ycfim6FUeMwD/BDoSNYkQNAxMDX8+s6NqQCMZYGOmb5qY5ZwzQ==", + "version": "0.0.89", + "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.89.tgz", + "integrity": "sha512-zMLZYp3DtOHImHAR4HDDEdPujomJ6pqvK14PFZ2/RKZZMUoMbs2XxYIpKLPvir1StYwT+WHauttnn/YIYAnxvw==", "dependencies": { "debug": "^4.3.4", "pino": "^8.8.0" @@ -3556,10 +3556,11 @@ "integrity": "sha512-3lqz5YjWTYnW6dlDa5TLaTCcShfar1e40rmcJVwCBJC6mWlFuj0eCHIElmG1g5kyuJ/GD+8Wn4FFCcz4gJPfaQ==" }, "node_modules/cross-spawn": { - "version": "7.0.3", - "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", - "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==", + "version": "7.0.6", + "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", + "integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==", "dev": true, + "license": "MIT", "dependencies": { "path-key": "^3.1.0", "shebang-command": "^2.0.0", @@ -3877,9 +3878,10 @@ } }, "node_modules/drachtio-fsmrf": { - "version": "3.0.45", - "resolved": "https://registry.npmjs.org/drachtio-fsmrf/-/drachtio-fsmrf-3.0.45.tgz", - "integrity": "sha512-YsrevTwGvg9v0OQXB4gZedlmzegB83fyd3NX3X2x7NXsKa5jO6TZCvZVHtBf/jR/ELE3B0aVLpxHw2YviRMIuQ==", + "version": "3.0.46", + "resolved": "https://registry.npmjs.org/drachtio-fsmrf/-/drachtio-fsmrf-3.0.46.tgz", + "integrity": "sha512-ScgyOnsOL45feuKKquT2Gij/jDRdjVha2TnQ6/Me2/M/C+29c9W7cdYlt3UZB3GyodazBukwIy5RrZf1iuXBGw==", + "license": "MIT", "dependencies": { "camel-case": "^4.1.2", "debug": "^2.6.9", @@ -10742,18 +10744,18 @@ } }, "@jambonz/time-series": { - "version": "0.2.9", - "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.9.tgz", - "integrity": "sha512-+/Oal0mjjV4iQ8q0ymtvVJP+GqgGpYUb2bc/FD/xDxOzKtl340l9yoM3oczREJg5IvEkpExz6NogJzUiCSeVnQ==", + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/@jambonz/time-series/-/time-series-0.2.13.tgz", + "integrity": "sha512-Kj+l+YUnI27zZA4qoPRzjN7L82W7GuMXYq9ttDjXQ0ZBIdOLAzJjB6R3jJ3b+mvoNEQ6qG5MUtfoc6CpTFH5lw==", "requires": { "debug": "^4.3.1", "influx": "^5.9.3" } }, "@jambonz/verb-specifications": { - "version": "0.0.88", - "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.88.tgz", - "integrity": "sha512-gbJn/sT1DKJspB6EjnEpMWQBi983YYUbE9+9Ycfim6FUeMwD/BDoSNYkQNAxMDX8+s6NqQCMZYGOmb5qY5ZwzQ==", + "version": "0.0.89", + "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.89.tgz", + "integrity": "sha512-zMLZYp3DtOHImHAR4HDDEdPujomJ6pqvK14PFZ2/RKZZMUoMbs2XxYIpKLPvir1StYwT+WHauttnn/YIYAnxvw==", "requires": { "debug": "^4.3.4", "pino": "^8.8.0" @@ -12240,9 +12242,9 @@ "integrity": "sha512-3lqz5YjWTYnW6dlDa5TLaTCcShfar1e40rmcJVwCBJC6mWlFuj0eCHIElmG1g5kyuJ/GD+8Wn4FFCcz4gJPfaQ==" }, "cross-spawn": { - "version": "7.0.3", - "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", - "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==", + "version": "7.0.6", + "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", + "integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==", "dev": true, "requires": { "path-key": "^3.1.0", @@ -12464,9 +12466,9 @@ } }, "drachtio-fsmrf": { - "version": "3.0.45", - "resolved": "https://registry.npmjs.org/drachtio-fsmrf/-/drachtio-fsmrf-3.0.45.tgz", - "integrity": "sha512-YsrevTwGvg9v0OQXB4gZedlmzegB83fyd3NX3X2x7NXsKa5jO6TZCvZVHtBf/jR/ELE3B0aVLpxHw2YviRMIuQ==", + "version": "3.0.46", + "resolved": "https://registry.npmjs.org/drachtio-fsmrf/-/drachtio-fsmrf-3.0.46.tgz", + "integrity": "sha512-ScgyOnsOL45feuKKquT2Gij/jDRdjVha2TnQ6/Me2/M/C+29c9W7cdYlt3UZB3GyodazBukwIy5RrZf1iuXBGw==", "requires": { "camel-case": "^4.1.2", "debug": "^2.6.9", diff --git a/package.json b/package.json index 9ba6180b..e2d92253 100644 --- a/package.json +++ b/package.json @@ -33,8 +33,8 @@ "@jambonz/realtimedb-helpers": "^0.8.8", "@jambonz/speech-utils": "^0.1.22", "@jambonz/stats-collector": "^0.1.10", - "@jambonz/time-series": "^0.2.9", - "@jambonz/verb-specifications": "^0.0.88", + "@jambonz/time-series": "^0.2.13", + "@jambonz/verb-specifications": "^0.0.89", "@opentelemetry/api": "^1.8.0", "@opentelemetry/exporter-jaeger": "^1.23.0", "@opentelemetry/exporter-trace-otlp-http": "^0.50.0",