Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
davehorton committed Dec 5, 2024
1 parent a167610 commit 03e0a42
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 65 deletions.
70 changes: 48 additions & 22 deletions lib/session/call-session.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const {
KillReason,
RecordState,
AllowedSipRecVerbs,
AllowedConfirmSessionVerbs
AllowedConfirmSessionVerbs,
TtsStreamingEvents
} = require('../utils/constants');
const moment = require('moment');
const assert = require('assert');
Expand Down Expand Up @@ -112,8 +113,6 @@ class CallSession extends Emitter {

this._pool = srf.locals.dbHelpers.pool;

this.ttsStreamingBuffer = new TtsStreamingBuffer(this);

const handover = (newRequestor) => {
this.logger.info(`handover to new base url ${newRequestor.url}`);
this.requestor.removeAllListeners();
Expand Down Expand Up @@ -402,34 +401,27 @@ class CallSession extends Emitter {
get isTransferredCall() {
return this.application.transferredCall === true;
}

/**
* returns true if this session is a ConfirmCallSession
*/
get isAdultingCallSession() {
return this.constructor.name === 'AdultingCallSession';
}

/**
* returns true if this session is a ConfirmCallSession
*/
get isConfirmCallSession() {
return this.constructor.name === 'ConfirmCallSession';
}

/**
* returns true if this session is a SipRecCallSession
*/
get isSipRecCallSession() {
return this.constructor.name === 'SipRecCallSession';
}

/**
* returns true if this session is a SmsCallSession
*/
get isSmsCallSession() {
return this.constructor.name === 'SmsCallSession';
}
get isRestCallSession() {
return this.constructor.name === 'RestCallSession';
}
get InboundCallSession() {
return this.constructor.name === 'InboundCallSession';
}
get isNormalCallSession() {
return this.constructor.name === 'InboundCallSession' || this.constructor.name === 'RestCallSession';
}

get webhook_secret() {
return this.accountInfo?.account?.webhook_secret;
Expand Down Expand Up @@ -505,6 +497,10 @@ class CallSession extends Emitter {
this._sipRequestWithinDialogHook = url;
}

get isTtsStreamOpen() {
//TODO: also need to check for a background streaming say
return this.currentTask?.isStreamingTts;
}
// Bot Delay (actionHook delayed)
get actionHookDelayEnabled() {
return this._actionHookDelayEnabled;
Expand Down Expand Up @@ -1061,6 +1057,15 @@ class CallSession extends Emitter {
this.inbandDtmfEnabled = voipCarrier?.dtmf_type === 'tones';
}

if (this.isNormalCallSession) {
this.ttsStreamingBuffer = new TtsStreamingBuffer(this);
this.ttsStreamingBuffer.on(TtsStreamingEvents.Empty, this._onTtsStreamingEmpty.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.ConnectFailure, this._onTtsStreamingConnectFailure.bind(this));
}
else {
this.logger.info(`CallSession:exec - not a normal call session: ${this.constructor.name}`);
}

while (this.tasks.length && !this.callGone) {
const taskNum = ++this.taskIdx;
const stackNum = this.stackIdx;
Expand Down Expand Up @@ -1643,11 +1648,11 @@ Duration=${duration} `
}

_lccTtsTokens(opts) {
this.ttsStreamingBuffer.bufferTokens(opts);
this.ttsStreamingBuffer?.bufferTokens(opts);
}

_lccTtsFlush(opts) {
this.ttsStreamingBuffer.flush(opts);
this.ttsStreamingBuffer?.flush(opts);
}

/**
Expand Down Expand Up @@ -2234,7 +2239,7 @@ Duration=${duration} `
this.backgroundTaskManager.stopAll();
this.clearOrRestoreActionHookDelayProcessor().catch((err) => {});

this.ttsStreamingBuffer.kill();
this.ttsStreamingBuffer?.kill();
}

/**
Expand Down Expand Up @@ -2769,6 +2774,27 @@ Duration=${duration} `
this.verbHookSpan = null;
}
}

_onTtsStreamingEmpty() {
const task = this.currentTask;
if (task && TaskName.Say === task.name) {
task.notifyTtsStreamIsEmpty();
}
}

async _onTtsStreamingConnectFailure(vendor) {
const {writeAlerts, AlertType} = this.srf.locals;
try {
await writeAlerts({
alert_type: AlertType.TTS_STREAMING_CONNECTION_FAILURE,
account_sid: this.accountSid,
vendor
});
} catch (error) {
this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert');
}
this.logger.info({vendor}, 'CallSession:_onTtsStreamingConnectFailure - tts streaming connect failure');
}
}

module.exports = CallSession;
21 changes: 20 additions & 1 deletion lib/tasks/say.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,19 @@ class TaskSay extends TtsTask {
}

async handlingStreaming(cs, {ep}) {
//TODO: set channel variables for tts streaming vendor
const {vendor, voice, label} = this.getTtsVendorData(cs);
const credentials = cs.getSpeechCredentials(vendor, 'tts', label);
if (!credentials) {
throw new SpeechCredentialError(
`No text-to-speech service credentials for ${vendor} with labels: ${label} have been configured`);
}
const {api_key} = credentials;

// TODO: set channel variables for tts streaming vendor
await ep.set({
DEEPGRAM_API_KEY: api_key,
DEEPGRAM_TTS_STREAMING_MODEL: voice
});

await this.awaitTaskDone();
this.logger.info('TaskSay:handlingStreaming - done');
Expand Down Expand Up @@ -304,8 +315,16 @@ class TaskSay extends TtsTask {
delete attrs['cache_filename']; //no value in adding this to the span
span.setAttributes(attrs);
}

notifyTtsStreamIsEmpty() {
if (this.isStreamingTts && this.closeOnStreamEmpty) {
this.logger.info('TaskSay:notifyTtsStreamIsEmpty - stream is empty, killing task');
this.notifyTaskDone();
}
}
}


const spanMapping = {
// IMPORTANT!!! JAMBONZ WEBAPP WILL SHOW TEXT PERFECTLY IF THE SPAN NAME IS SMALLER OR EQUAL 25 CHARACTERS.
// EX: whisper.ratelim_reqs has length 20 <= 25 which is perfect
Expand Down
9 changes: 9 additions & 0 deletions lib/utils/constants.json
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,15 @@
"PartialMedia": "partial-media",
"FullMedia": "full-media"
},
"DeepgramTtsStreamingEvents": {
"Empty": "deepgram_tts_streaming::empty",
"ConnectFailure": "dedeepgram_tts_streaming::connect_failed",
"Connect": "deepgram_tts_streaming::connect"
},
"TtsStreamingEvents": {
"Empty": "tts_streaming::empty",
"ConnectFailure": "tts_streaming::connect_failed"
},
"MAX_SIMRINGS": 10,
"BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)",
"FS_UUID_SET_NAME": "fsUUIDs"
Expand Down
68 changes: 55 additions & 13 deletions lib/utils/tts-streaming-buffer.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
const Emitter = require('events');
const assert = require('assert');
const {DeepgramTtsStreamingEvents, TtsStreamingEvents} = require('../utils/constants');
const FEED_INTERVAL = 2000;
const MAX_CHUNK_SIZE = 2000;
const MAX_CHUNK_SIZE = 1800;

class TtsStreamingBuffer extends Emitter {
constructor(cs) {
super();
this.cs = cs;
this.logger = cs.logger;

this.tokens = null;
this.tokens = '';
this.eventHandlers = [];
}

get isEmpty() {
Expand All @@ -28,9 +30,19 @@ class TtsStreamingBuffer extends Emitter {
* Add tokens to the buffer and start feeding them to the endpoint if necessary.
*/
bufferTokens(tokens) {
const starting = this.tokens === null;
const starting = this.tokens === '';
const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40);
const totalLength = tokens.length;

this.logger.debug(
`Buffering tokens: "${displayedTokens}" (total length: ${totalLength}), starting? ${starting}`
);
this.tokens += (tokens || '');
const leftoverTokens = this.feedTokens();
const leftoverTokens = this._feedTokens();

if (this.eventHandlers.length === 0) {
this._initHandlers(this.ep);
}

/* do we need to start a timer to periodically feed tokens to the endpoint? */
if (starting && leftoverTokens > 0) {
Expand All @@ -46,10 +58,11 @@ class TtsStreamingBuffer extends Emitter {
}

flush() {
this.logger.info('Flushing TTS streaming buffer');
clearTimeout(this.timer);
this._api(this.ep, [this.ep.uuid, 'clear'])
.catch((err) => this.logger.info({err}, 'Error flushing TTS streaming'));
this.tokens = null;
this.tokens = '';
this.timer = null;
}

Expand All @@ -58,7 +71,9 @@ class TtsStreamingBuffer extends Emitter {
* Return the number of tokens left in the buffer.
*/
_feedTokens() {
this.logger.debug('_feedTokens');
if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) {
this.logger.debug('TTS stream is not open or no tokens to send');
return this.tokens?.length || 0;
}

Expand Down Expand Up @@ -106,15 +121,15 @@ class TtsStreamingBuffer extends Emitter {
const chunk = this.tokens.slice(0, chunkEnd);
this.tokens = this.tokens.slice(chunkEnd).trim(); // Remove sent chunk and trim whitespace

// Send the chunk to the endpoint
// TODO: abstract this into an endpoint method
this._api(this.ep, [this.ep.uuid, 'send', chunk])
.then(() => this.logger.debug(`Sent chunk: ${chunk}`))
/* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */
const modifiedChunk = chunk.replace(/\n\n/g, '\n \n');
this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk])
.then(() => this.logger.debug(`Sent tokens: ${chunk.substring(0, 40)}`))
.catch((err) => {
this.logger.info({err}, 'Error sending TTS chunk');
});


this.logger.debug(`_feedTokens: sent ${chunk.length}, Remaining: ${this.tokens.length}`);
return this.tokens.length;
}

Expand All @@ -127,13 +142,40 @@ class TtsStreamingBuffer extends Emitter {

kill() {
clearTimeout(this.timer);
this._api(this.ep, [this.ep.uuid, 'close'])
.catch((err) => this.logger.info({err}, 'Error closing TTS streaming'));
this.removeCustomEventListeners();
if (this.ep) {
this._api(this.ep, [this.ep.uuid, 'close'])
.catch((err) => this.logger.info({err}, 'Error closing TTS streaming'));
}
this.timer = null;
this.tokens = null;
this.tokens = '';
this.cs = null;
}

_onConnectFailure(vendor) {
this.emit(TtsStreamingEvents.ConnectFailure, {vendor});
}

_onTtsEmpty(vendor) {
this.emit(TtsStreamingEvents.Empty, {vendor});
}

_initHandlers(ep) {
this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.ConnectFailure,
this._onConnectFailure.bind(this, 'deepgram'));
this.addCustomEventListener(ep, DeepgramTtsStreamingEvents.Empty,
this._onTtsEmpty.bind(this, 'deepgram'));
}

addCustomEventListener(ep, event, handler) {
this.eventHandlers.push({ep, event, handler});
ep.addCustomEventListener(event, handler);
}

removeCustomEventListeners() {
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
}

}

module.exports = TtsStreamingBuffer;
2 changes: 1 addition & 1 deletion lib/utils/ws-requestor.js
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class WsRequestor extends BaseRequestor {

case 'command':
assert.ok(command, 'command property not supplied');
assert.ok(data || command === 'llm:tool-output', 'data property not supplied');
assert.ok(data || ['llm:tool-output', 'tts:flush'].includes(command), 'data property not supplied');
this._recvCommand(msgid, command, call_sid, queueCommand, tool_call_id, data);
break;

Expand Down
Loading

0 comments on commit 03e0a42

Please sign in to comment.