Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/tts streaming #994

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 141 additions & 17 deletions lib/session/call-session.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const {
KillReason,
RecordState,
AllowedSipRecVerbs,
AllowedConfirmSessionVerbs
AllowedConfirmSessionVerbs,
TtsStreamingEvents
} = require('../utils/constants');
const moment = require('moment');
const assert = require('assert');
Expand All @@ -21,6 +22,7 @@ const listTaskNames = require('../utils/summarize-tasks');
const HttpRequestor = require('../utils/http-requestor');
const WsRequestor = require('../utils/ws-requestor');
const ActionHookDelayProcessor = require('../utils/action-hook-delay');
const TtsStreamingBuffer = require('../utils/tts-streaming-buffer');
const {parseUri} = require('drachtio-srf');
const {
JAMBONES_INJECT_CONTENT,
Expand Down Expand Up @@ -399,34 +401,27 @@ class CallSession extends Emitter {
get isTransferredCall() {
return this.application.transferredCall === true;
}

/**
* returns true if this session is a ConfirmCallSession
*/
get isAdultingCallSession() {
return this.constructor.name === 'AdultingCallSession';
}

/**
* returns true if this session is a ConfirmCallSession
*/
get isConfirmCallSession() {
return this.constructor.name === 'ConfirmCallSession';
}

/**
* returns true if this session is a SipRecCallSession
*/
get isSipRecCallSession() {
return this.constructor.name === 'SipRecCallSession';
}

/**
* returns true if this session is a SmsCallSession
*/
get isSmsCallSession() {
return this.constructor.name === 'SmsCallSession';
}
get isRestCallSession() {
return this.constructor.name === 'RestCallSession';
}
get InboundCallSession() {
return this.constructor.name === 'InboundCallSession';
}
get isNormalCallSession() {
return this.constructor.name === 'InboundCallSession' || this.constructor.name === 'RestCallSession';
}

get webhook_secret() {
return this.accountInfo?.account?.webhook_secret;
Expand All @@ -440,6 +435,10 @@ class CallSession extends Emitter {
return this.backgroundTaskManager.isTaskRunning('bargeIn');
}

get isTtsStreamEnabled() {
return this.backgroundTaskManager.isTaskRunning('ttsStream');
}

get isListenEnabled() {
return this.backgroundTaskManager.isTaskRunning('listen');
}
Expand Down Expand Up @@ -502,6 +501,10 @@ class CallSession extends Emitter {
this._sipRequestWithinDialogHook = url;
}

get isTtsStreamOpen() {
return this.currentTask?.isStreamingTts ||
this.backgroundTaskManager.getTask('ttsStream')?.isStreamingTts;
}
// Bot Delay (actionHook delayed)
get actionHookDelayEnabled() {
return this._actionHookDelayEnabled;
Expand Down Expand Up @@ -576,6 +579,12 @@ class CallSession extends Emitter {
}
}

get appIsUsingWebsockets() {
return this.requestor instanceof WsRequestor;
}

/* end of getters and setters */

async clearOrRestoreActionHookDelayProcessor() {
if (this._actionHookDelayProcessor) {
await this._actionHookDelayProcessor.stop();
Expand Down Expand Up @@ -793,6 +802,32 @@ class CallSession extends Emitter {
}
}

async enableBackgroundTtsStream(say) {
try {
if (this.isTtsStreamEnabled) {
this.logger.debug('CallSession:enableBackgroundTtsStream - ttsStream currently enabled, ignoring request');
} else if (this.appIsUsingWebsockets && this.isNormalCallSession) {
await this.backgroundTaskManager.newTask('ttsStream', say);
this.logger.debug('CallSession:enableBackgroundTtsStream - ttsStream enabled');
} else {
this.logger.debug(
'CallSession:enableBackgroundTtsStream - ignoring request as call does not have required conditions');
}
} catch (err) {
this.logger.info({err, say}, 'CallSession:enableBackgroundTtsStream - Error creating background tts stream task');
}
}

async disableTtsStream() {
if (this.isTtsStreamEnabled) {
this.backgroundTaskManager.stop('ttsStream');
this.logger.debug('CallSession:disableTtsStream - ttsStream disabled');
}
}
async clearTtsStream() {
this.ttsStreamingBuffer?.clear();
}

async enableBotMode(gather, autoEnable) {
try {
let task;
Expand Down Expand Up @@ -1052,6 +1087,17 @@ class CallSession extends Emitter {
this.inbandDtmfEnabled = voipCarrier?.dtmf_type === 'tones';
}

if (this.isNormalCallSession) {
this.ttsStreamingBuffer = new TtsStreamingBuffer(this);
this.ttsStreamingBuffer.on(TtsStreamingEvents.Empty, this._onTtsStreamingEmpty.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.Pause, this._onTtsStreamingPause.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.Resume, this._onTtsStreamingResume.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.ConnectFailure, this._onTtsStreamingConnectFailure.bind(this));
}
else {
this.logger.info(`CallSession:exec - not a normal call session: ${this.constructor.name}`);
}

while (this.tasks.length && !this.callGone) {
const taskNum = ++this.taskIdx;
const stackNum = this.stackIdx;
Expand Down Expand Up @@ -1635,6 +1681,39 @@ Duration=${duration} `
.catch((err) => this.logger.error(err, 'CallSession:_lccLlmUpdate'));
}

async _lccTtsTokens(opts) {
const {id, tokens} = opts;

if (id === undefined) {
this.logger.info({opts}, 'CallSession:_lccTtsTokens - invalid command since id is missing');
return;
}
else if (tokens === undefined) {
this.logger.info({opts}, 'CallSession:_lccTtsTokens - invalid command since id is missing');
return this.requestor.request('tts:tokens-result', '/tokens-result', {
id,
status: 'failed',
reason: 'missing tokens'
}).catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
}
let res;
try {
res = await this.ttsStreamingBuffer?.bufferTokens(tokens);
this.logger.info({id, res}, 'CallSession:_lccTtsTokens - tts:tokens-result');
} catch (err) {
this.logger.info(err, 'CallSession:_lccTtsTokens');
}
this.requestor.request('tts:tokens-result', '/tokens-result', {id, ...res})
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
}

_lccTtsFlush(opts) {
this.ttsStreamingBuffer?.flush(opts);
}

_lccTtsClear(opts) {
this.ttsStreamingBuffer?.clear(opts);
}

/**
* perform call hangup by jambonz
Expand Down Expand Up @@ -2016,6 +2095,18 @@ Duration=${duration} `
this._lccLlmUpdate(data, call_sid);
break;

case 'tts:tokens':
this._lccTtsTokens(data);
break;

case 'tts:flush':
this._lccTtsFlush(data);
break;

case 'tts:clear':
this._lccTtsClear(data);
break;

default:
this.logger.info(`CallSession:_onCommand - invalid command ${command}`);
}
Expand Down Expand Up @@ -2211,6 +2302,8 @@ Duration=${duration} `
// close all background tasks
this.backgroundTaskManager.stopAll();
this.clearOrRestoreActionHookDelayProcessor().catch((err) => {});

this.ttsStreamingBuffer?.stop();
}

/**
Expand Down Expand Up @@ -2745,6 +2838,37 @@ Duration=${duration} `
this.verbHookSpan = null;
}
}

_onTtsStreamingEmpty() {
const task = this.currentTask;
if (task && TaskName.Say === task.name) {
task.notifyTtsStreamIsEmpty();
}
}

_onTtsStreamingPause() {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_paused'})
.catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingPause - Error sending'));
}

_onTtsStreamingResume() {
this.requestor?.request('tts:streaming-event', 'streaming-event', {event_type: 'stream_resumed'})
.catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingResume - Error sending'));
}

async _onTtsStreamingConnectFailure(vendor) {
const {writeAlerts, AlertType} = this.srf.locals;
try {
await writeAlerts({
alert_type: AlertType.TTS_STREAMING_CONNECTION_FAILURE,
account_sid: this.accountSid,
vendor
});
} catch (error) {
this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert');
}
this.logger.info({vendor}, 'CallSession:_onTtsStreamingConnectFailure - tts streaming connect failure');
}
}

module.exports = CallSession;
29 changes: 28 additions & 1 deletion lib/tasks/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class TaskConfig extends Task {
'fillerNoise',
'actionHookDelayAction',
'boostAudioSignal',
'vad'
'vad',
'ttsStream'
].forEach((k) => this[k] = this.data[k] || {});

if ('notifyEvents' in this.data) {
Expand Down Expand Up @@ -45,6 +46,12 @@ class TaskConfig extends Task {
};
delete this.transcribeOpts.enable;
}
if (this.ttsStream.enable) {
this.sayOpts = {
verb: 'say',
stream: true
};
}

if (this.data.reset) {
if (typeof this.data.reset === 'string') this.data.reset = [this.data.reset];
Expand Down Expand Up @@ -75,6 +82,7 @@ class TaskConfig extends Task {
get hasVad() { return Object.keys(this.vad).length; }
get hasFillerNoise() { return Object.keys(this.fillerNoise).length; }
get hasReferHook() { return Object.keys(this.data).includes('referHook'); }
get hasTtsStream() { return Object.keys(this.ttsStream).length; }

get summary() {
const phrase = [];
Expand Down Expand Up @@ -106,6 +114,9 @@ class TaskConfig extends Task {
if (this.onHoldMusic) phrase.push(`onHoldMusic: ${this.onHoldMusic}`);
if ('boostAudioSignal' in this.data) phrase.push(`setGain ${this.data.boostAudioSignal}`);
if (this.hasReferHook) phrase.push('set referHook');
if (this.hasTtsStream) {
phrase.push(`${this.ttsStream.enable ? 'enable' : 'disable'} ttsStream`);
}
return `${this.name}{${phrase.join(',')}}`;
}

Expand Down Expand Up @@ -305,6 +316,22 @@ class TaskConfig extends Task {
if (this.hasReferHook) {
cs.referHook = this.data.referHook;
}

if (this.ttsStream.enable && this.sayOpts) {
this.sayOpts.synthesizer = this.hasSynthesizer ? this.synthesizer : {
vendor: cs.speechSynthesisVendor,
language: cs.speechSynthesisLanguage,
voice: cs.speechSynthesisVoice,
...(cs.speechSynthesisLabel && {
label: cs.speechSynthesisLabel
})
};
this.logger.info({opts: this.gatherOpts}, 'Config: enabling ttsStream');
cs.enableBackgroundTtsStream(this.sayOpts);
} else if (!this.ttsStream.enable) {
this.logger.info('Config: disabling ttsStream');
cs.disableTtsStream();
}
}

async kill(cs) {
Expand Down
1 change: 1 addition & 0 deletions lib/tasks/gather.js
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ class TaskGather extends SttTask {
this._fillerNoiseOn = false; // in a race, if we just started audio it may sneak through here
this.ep.api('uuid_break', this.ep.uuid)
.catch((err) => this.logger.info(err, 'Error killing audio'));
cs.clearTtsStream();
}
return;
}
Expand Down
Loading
Loading