Skip to content

Commit

Permalink
Changes done for v2.3.4.
Browse files Browse the repository at this point in the history
  • Loading branch information
nysenthil committed May 12, 2022
1 parent b5a07b3 commit b8fe2d0
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 38 deletions.
5 changes: 5 additions & 0 deletions com.ibm.streamsx.sttgateway/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changes

## v2.3.4
* May/11/2022
* Added code and logic necessary to handle certain error situations where the SpeechProcessor will send much more than the required two transcriptionCompleted signals.
* Added code to make the call-started and call-completed files' open mode as append/update instead of write/update. This will allow us to get more details about the same VgwSessionId arriving as two different back-to-back calls.

## v2.3.3
* May/10/2022
* Added code and logic necessary to scale the VgwDataRouter to process more voice calls with reduced use of CPU cycles.
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.sttgateway/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

**Note:** This toolkit requires c++11 support.
</description>
<version>2.3.3</version>
<version>2.3.4</version>
<requiredProductVersion>4.2.1.6</requiredProductVersion>
</identity>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/*
==============================================
First created on: Nov/27/2020
Last modified on: May/10/2022
Last modified on: May/11/2022

IMPORTANT NOTE
--------------
Expand Down Expand Up @@ -1015,13 +1015,15 @@ public composite VgwDataRouterToWatsonSTT {

rstring socsFileName = dataDirectory() + "/" +
BSD.vgwSessionId + "-call-started.txt";
uint64 fileHandle = fopen (socsFileName, "w+", err);
uint64 fileHandle = fopen (socsFileName, "a+", err);

if(err == 0) {
fwriteString ("VGW call session id " + BSD.vgwSessionId +
" started at " + BSD.callStartDateTime + ".\n",
fileHandle, err);
fwriteString("VGW voice channel number=" +
fwriteString("Call sequence number=" +
(rstring)BSD.callSequenceNumber +
", VGW voice channel number=" +
(rstring)BSD.vgwVoiceChannelNumber +
", Speech processor id=" +
(rstring)$idOfThisSpeechProcessor +
Expand All @@ -1032,7 +1034,9 @@ public composite VgwDataRouterToWatsonSTT {
fclose(fileHandle, err);
}

appTrc(Trace.error, "A new voice call has started. vgwSessionId=" + BSD.vgwSessionId);
appTrc(Trace.error, "A new voice call has started. vgwSessionId=" +
BSD.vgwSessionId + ", callSequenceNumber=" +
(rstring)BSD.callSequenceNumber);
} else if (has(_vgwSessionIdToUdpChannelMap, key1) == false ||
has(_vgwSessionIdToUdpChannelMap, key2) == false) {
// This means, one of the two voice channels in a
Expand All @@ -1047,7 +1051,9 @@ public composite VgwDataRouterToWatsonSTT {
uint64 fileHandle = fopen (socsFileName, "a+", err);

if(err == 0) {
fwriteString("VGW voice channel number=" +
fwriteString("Call sequence number=" +
(rstring)BSD.callSequenceNumber +
", VGW voice channel number=" +
(rstring)BSD.vgwVoiceChannelNumber +
", Speech processor id=" +
(rstring)$idOfThisSpeechProcessor +
Expand All @@ -1066,11 +1072,14 @@ public composite VgwDataRouterToWatsonSTT {
} // End of if (has(_vgwSessionIdToUdpChannelMap, _key)

appTrc(Trace.debug, "vgwSessionId=" + BSD.vgwSessionId +
", callSequenceNumber=" + (rstring)BSD.callSequenceNumber +
", isCustomerSpeechData=" + (rstring)BSD.isCustomerSpeechData +
", vgwVoiceChannelNumber=" + (rstring)BSD.vgwVoiceChannelNumber +
", speechDataFragmentCnt=" + (rstring)BSD.speechDataFragmentCnt +
", totalSpeechDataBytesReceived=" +
(rstring)BSD.totalSpeechDataBytesReceived +
", speechProcessorId=" +
(rstring)$idOfThisSpeechProcessor +
", speechEngineId=" + (rstring)BSD.speechEngineId +
", speechResultProcessorId=" + (rstring)BSD.speechResultProcessorId);
// Set the call start date time values to the tuple attributes.
Expand All @@ -1086,6 +1095,7 @@ public composite VgwDataRouterToWatsonSTT {
appTrc(Trace.error, "Received an EOCS at the speech processor id " +
(rstring)$idOfThisSpeechProcessor +
". vgwSessionId=" + BSD.vgwSessionId +
", callSequenceNumber=" + (rstring)BSD.callSequenceNumber +
", voiceChannelNumber=" + (rstring)BSD.vgwVoiceChannelNumber);
//
// Process the end of voice call signal.
Expand Down Expand Up @@ -1246,15 +1256,18 @@ public composite VgwDataRouterToWatsonSTT {
mutable int32 err = 0ul;
rstring eocsFileName = dataDirectory() + "/" +
BSD.vgwSessionId + "-call-completed.txt";
uint64 fileHandle = fopen (eocsFileName, "w+", err);
uint64 fileHandle = fopen (eocsFileName, "a+", err);

if(err == 0) {
fwriteString ("VGW call session id " + BSD.vgwSessionId +
" ended at " + ctime(getTimestamp()) + ".", fileHandle, err);
fwriteString ("VGW call session id " + BSD.vgwSessionId +
", Call sequence number=" + (rstring)BSD.callSequenceNumber +
" ended at " + ctime(getTimestamp()) + ".\n", fileHandle, err);
fclose(fileHandle, err);
}

appTrc(Trace.error, "An ongoing voice call has completed. vgwSessionId=" + BSD.vgwSessionId);
appTrc(Trace.error, "An ongoing voice call has completed. vgwSessionId=" +
BSD.vgwSessionId + ", callSequenceNumber=" +
(rstring)BSD.callSequenceNumber);
}
}
} // End of if(BSD.endOfCallSignal == false)
Expand Down Expand Up @@ -1702,9 +1715,60 @@ public composite STTResultProcessor(input MyTranscriptionResult, BinarySpeechDat
mutable rstring _ciscoGuid = "";
mutable rstring _callStartDateTime = "";
mutable int64 _callStartTimeInEpochSeconds = 0l;
mutable boolean _sttErrorObserved = false;
}

onTuple MTR: {
// WatsonSTT operator invocation done above is susceptible for
// encountering errors while attempting to transcribe a
// speech payload due to any critical errors happening in
// the backend STT infrastructure. Such errors can include
// "Unable to transcode the Mulaw speech data" (OR)
// "Session timed out with the STT backend" and so on.
// When that situation occurs, WatsonSTT operator will
// set a descriptive error message in the sttErrorMessage tuple
// attribute that will arrive here. After encountering such
// critical error, WatsonSTT operator will also send an
// immediate next tuple by setting the transcriptionCompleted
// tuple attribute to true.
// We have to take care of such error conditions here by
// tracking two such consecutive tuples (a tuple with its
// sttErrorMessage attribute set to a non-empty string and an
// immediate next tuple with its transcriptionCompleted attribute
// set to true) and then by not sending that intermittent
// transcription completed signal to the downstream application logic.
//
if (MTR.sttErrorMessage != "") {
// This tuple indicates that a critical error occurred in
// the upstream WatsonSTT operator. Let us record this
// condition in our state variable and look out for the
// immediate next tuple that will arrive here with its
// transcriptionCompleted attribute set to true.
_sttErrorObserved = true;

// We will let this tuple carrying the error message propagate
// to the downstream application logic so that it can get written
// to the full transcription result file.
}

// Look out for the immediate next tuple that arrives here after
// we observed any STT backend critical error.
if (_sttErrorObserved == true && MTR.transcriptionCompleted == true) {
// This tuple is the one that comes right after a tuple that
// carried an STT critical error message. We don't have to propagate
// this tuple to downstream logic as this is not the real end of a
// voice call as it is only indicating an intermittent error condition.
// Reset the STT error observed state variable.
_sttErrorObserved = false;
return;
}

// If there is no STT error message present in the received tuple,
// we will safely reset this particular state variable.
if (MTR.sttErrorMessage == "") {
_sttErrorObserved = false;
}

// In some cases, it was observed during testing that the
// call start time details arrive here with empty and 0 values.
// To compensate for that, we are going to keep a local copy
Expand Down
4 changes: 2 additions & 2 deletions samples/VgwDataRouterToWatsonSTT/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
<info:identity>
<info:name>VgwDataRouterToWatsonSTT</info:name>
<info:description>Example that showcases STT on Cloud and STT on CP4D</info:description>
<info:version>1.0.6</info:version>
<info:version>1.0.7</info:version>
<info:requiredProductVersion>4.2.1.6</info:requiredProductVersion>
</info:identity>
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.sttgateway</common:name>
<common:version>[2.3.3,7.0.0]</common:version>
<common:version>[2.3.4,7.0.0]</common:version>
</info:toolkit>
<info:toolkit>
<common:name>com.ibm.streamsx.json</common:name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/*
==============================================
First created on: Nov/27/2020
Last modified on: May/10/2022
Last modified on: May/11/2022

IMPORTANT NOTE
--------------
Expand Down Expand Up @@ -932,13 +932,15 @@ public composite VgwDataRouterToWatsonSTT {

rstring socsFileName = dataDirectory() + "/" +
BSD.vgwSessionId + "-call-started.txt";
uint64 fileHandle = fopen (socsFileName, "w+", err);
uint64 fileHandle = fopen (socsFileName, "a+", err);

if(err == 0) {
fwriteString ("VGW call session id " + BSD.vgwSessionId +
" started at " + BSD.callStartDateTime + ".\n",
fileHandle, err);
fwriteString("VGW voice channel number=" +
fwriteString("Call sequence number=" +
(rstring)BSD.callSequenceNumber +
", VGW voice channel number=" +
(rstring)BSD.vgwVoiceChannelNumber +
", Speech processor id=" +
(rstring)$idOfThisSpeechProcessor +
Expand All @@ -949,7 +951,9 @@ public composite VgwDataRouterToWatsonSTT {
fclose(fileHandle, err);
}

appTrc(Trace.error, "A new voice call has started. vgwSessionId=" + BSD.vgwSessionId);
appTrc(Trace.error, "A new voice call has started. vgwSessionId=" +
BSD.vgwSessionId + ", callSequenceNumber=" +
(rstring)BSD.callSequenceNumber);
} else if (has(_vgwSessionIdToUdpChannelMap, key1) == false ||
has(_vgwSessionIdToUdpChannelMap, key2) == false) {
// This means, one of the two voice channels in a
Expand All @@ -964,7 +968,9 @@ public composite VgwDataRouterToWatsonSTT {
uint64 fileHandle = fopen (socsFileName, "a+", err);

if(err == 0) {
fwriteString("VGW voice channel number=" +
fwriteString("Call sequence number=" +
(rstring)BSD.callSequenceNumber +
", VGW voice channel number=" +
(rstring)BSD.vgwVoiceChannelNumber +
", Speech processor id=" +
(rstring)$idOfThisSpeechProcessor +
Expand All @@ -983,11 +989,14 @@ public composite VgwDataRouterToWatsonSTT {
} // End of if (has(_vgwSessionIdToUdpChannelMap, _key)

appTrc(Trace.debug, "vgwSessionId=" + BSD.vgwSessionId +
", callSequenceNumber=" + (rstring)BSD.callSequenceNumber +
", isCustomerSpeechData=" + (rstring)BSD.isCustomerSpeechData +
", vgwVoiceChannelNumber=" + (rstring)BSD.vgwVoiceChannelNumber +
", speechDataFragmentCnt=" + (rstring)BSD.speechDataFragmentCnt +
", totalSpeechDataBytesReceived=" +
(rstring)BSD.totalSpeechDataBytesReceived +
", speechProcessorId=" +
(rstring)$idOfThisSpeechProcessor +
", speechEngineId=" + (rstring)BSD.speechEngineId +
", speechResultProcessorId=" + (rstring)BSD.speechResultProcessorId);
// Set the call start date time values to the tuple attributes.
Expand All @@ -1003,6 +1012,7 @@ public composite VgwDataRouterToWatsonSTT {
appTrc(Trace.error, "Received an EOCS at the speech processor id " +
(rstring)$idOfThisSpeechProcessor +
". vgwSessionId=" + BSD.vgwSessionId +
", callSequenceNumber=" + (rstring)BSD.callSequenceNumber +
", voiceChannelNumber=" + (rstring)BSD.vgwVoiceChannelNumber);
//
// Process the end of voice call signal.
Expand Down Expand Up @@ -1158,15 +1168,18 @@ public composite VgwDataRouterToWatsonSTT {
mutable int32 err = 0ul;
rstring eocsFileName = dataDirectory() + "/" +
BSD.vgwSessionId + "-call-completed.txt";
uint64 fileHandle = fopen (eocsFileName, "w+", err);
uint64 fileHandle = fopen (eocsFileName, "a+", err);

if(err == 0) {
fwriteString ("VGW call session id " + BSD.vgwSessionId +
" ended at " + ctime(getTimestamp()) + ".", fileHandle, err);
fwriteString ("VGW call session id " + BSD.vgwSessionId +
", Call sequence number=" + (rstring)BSD.callSequenceNumber +
" ended at " + ctime(getTimestamp()) + ".\n", fileHandle, err);
fclose(fileHandle, err);
}

appTrc(Trace.error, "An ongoing voice call has completed. vgwSessionId=" + BSD.vgwSessionId);
appTrc(Trace.error, "An ongoing voice call has completed. vgwSessionId=" +
BSD.vgwSessionId + ", callSequenceNumber=" +
(rstring)BSD.callSequenceNumber);
}
}
} // End of if(BSD.endOfCallSignal == false)
Expand Down Expand Up @@ -1525,9 +1538,60 @@ public composite STTResultProcessor(input MyTranscriptionResult) {
mutable rstring _ciscoGuid = "";
mutable rstring _callStartDateTime = "";
mutable int64 _callStartTimeInEpochSeconds = 0l;
mutable boolean _sttErrorObserved = false;
}

onTuple MTR: {
// WatsonSTT operator invocation done above is susceptible for
// encountering errors while attempting to transcribe a
// speech payload due to any critical errors happening in
// the backend STT infrastructure. Such errors can include
// "Unable to transcode the Mulaw speech data" (OR)
// "Session timed out with the STT backend" and so on.
// When that situation occurs, WatsonSTT operator will
// set a descriptive error message in the sttErrorMessage tuple
// attribute that will arrive here. After encountering such
// critical error, WatsonSTT operator will also send an
// immediate next tuple by setting the transcriptionCompleted
// tuple attribute to true.
// We have to take care of such error conditions here by
// tracking two such consecutive tuples (a tuple with its
// sttErrorMessage attribute set to a non-empty string and an
// immediate next tuple with its transcriptionCompleted attribute
// set to true) and then by not sending that intermittent
// transcription completed signal to the downstream application logic.
//
if (MTR.sttErrorMessage != "") {
// This tuple indicates that a critical error occurred in
// the upstream WatsonSTT operator. Let us record this
// condition in our state variable and look out for the
// immediate next tuple that will arrive here with its
// transcriptionCompleted attribute set to true.
_sttErrorObserved = true;

// We will let this tuple carrying the error message propagate
// to the downstream application logic so that it can get written
// to the full transcription result file.
}

// Look out for the immediate next tuple that arrives here after
// we observed any STT backend critical error.
if (_sttErrorObserved == true && MTR.transcriptionCompleted == true) {
// This tuple is the one that comes right after a tuple that
// carried an STT critical error message. We don't have to propagate
// this tuple to downstream logic as this is not the real end of a
// voice call as it is only indicating an intermittent error condition.
// Reset the STT error observed state variable.
_sttErrorObserved = false;
return;
}

// If there is no STT error message present in the received tuple,
// we will safely reset this particular state variable.
if (MTR.sttErrorMessage == "") {
_sttErrorObserved = false;
}

// In some cases, it was observed during testing that the
// call start time details arrive here with empty and 0 values.
// To compensate for that, we are going to keep a local copy
Expand Down
4 changes: 2 additions & 2 deletions samples/VgwDataRouterToWatsonSTTMini/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
<info:identity>
<info:name>VgwDataRouterToWatsonSTTMini</info:name>
<info:description>Example that showcases STT on Cloud and STT on CP4D</info:description>
<info:version>1.0.6</info:version>
<info:version>1.0.7</info:version>
<info:requiredProductVersion>4.2.1.6</info:requiredProductVersion>
</info:identity>
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.sttgateway</common:name>
<common:version>[2.3.3,7.0.0]</common:version>
<common:version>[2.3.4,7.0.0]</common:version>
</info:toolkit>
<info:toolkit>
<common:name>com.ibm.streamsx.websocket</common:name>
Expand Down
Loading

0 comments on commit b8fe2d0

Please sign in to comment.