Skip to content

Commit

Permalink
Changes done for v2.2.7.
Browse files Browse the repository at this point in the history
  • Loading branch information
nysenthil committed Feb 3, 2021
1 parent 360e721 commit 0b1cf50
Show file tree
Hide file tree
Showing 14 changed files with 449 additions and 45 deletions.
6 changes: 6 additions & 0 deletions com.ibm.streamsx.sttgateway/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## v2.2.7
* Feb/02/2021
* Modified the IBMVoiceGatewaySource operator to handle the missing VGW start session message and/or missing speech data packet.
* Added an optional job submission time parameter numberOfEocsNeededForVoiceCallCompletion for users to address the condition mentioned in the previous bullet.
* Added log messages to notify when the condition mentioned in the previous bullet occurs.

## v2.2.6
* Jan/16/2021
* Made the End Of Call Signal (EOCS) sending by the IBMVoiceGatewaySource operator to be more reliable and consistent.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
==============================================
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2019, 2020
# Copyright IBM Corp. 2019, 2021
==============================================
*/

/*
============================================================
First created on: Sep/20/2019
Last modified on: Jan/15/2021
Last modified on: Feb/02/2021
Please refer to the sttgateway-tech-brief.txt file in the
top-level directory of this toolkit to read about
Expand Down Expand Up @@ -684,15 +684,8 @@ Tree query(Tree& pt, typename Tree::path_type path) {
template <typename EndpointType>
void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
typename EndpointType::message_ptr msg) {
bool vgwSessionLoggingDone = false;

if (vgwSessionLoggingNeeded == true) {
SPLAPPTRC(L_ERROR, "on_message called with hdl: " << hdl.lock().get()
<< " with a message size of: " << msg->get_payload().size() << " bytes.", "on_message");
vgwSessionLoggingDone = true;
}

if (vgwSessionLoggingDone == false) {
SPLAPPTRC(L_INFO, "on_message called with hdl: " << hdl.lock().get()
<< " with a message size of: " << msg->get_payload().size() << " bytes.", "on_message");
}
Expand Down Expand Up @@ -1295,6 +1288,15 @@ void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
// have started arriving for this particular VGW session id.
con_metadata.vgwVoiceChannelNumber = 2;
}

if (vgwSessionLoggingNeeded == true) {
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
"-->Channel " << boost::to_string(udpChannelNumber) <<
"-->X2 Received the very first speech data packet. vgwSessionId=" <<
con_metadata.vgwSessionId << ", vgwVoiceChannelNumber=" <<
con_metadata.vgwVoiceChannelNumber <<
", vgwIsCaller=" << con_metadata.vgwIsCaller, "on_message");
}
} // End of if (con_metadata.speechPacketsReceivedCnt == 1)

// Update it in the client connections map.
Expand Down Expand Up @@ -1365,7 +1367,7 @@ void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
submit(oTuple, 0);

if (vgwSessionLoggingNeeded == true) {
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
SPLAPPTRC(L_INFO, "Operator " << operatorPhysicalName <<
"-->Channel " << boost::to_string(udpChannelNumber) <<
"-->X2 Received speech data from the vgwSessionId " <<
con_metadata.vgwSessionId <<
Expand Down Expand Up @@ -1484,7 +1486,15 @@ void MY_OPERATOR::on_close(websocketpp::connection_hdl hdl) {
bool vgwSessionIdFoundInMap =
vgw_session_id_map.find(con_metadata.vgwSessionId) != vgw_session_id_map.end();

if (vgwSessionIdFoundInMap == true) {
// For the correct call clean-up operation, this operator must have received
// call start session messages for both the voice channels in a call and it
// must have also received speech data bytes from both the voice channels
// (either silence fillers or real speech data). In that case,
// vgwVoiceChannelNumber must correctly be set to either 1 or 2.
// If not, that is going to cause trouble for the downstream operator logic
// in properly releasing the speech engines assigned for a given call.
// Please see the log message that will get written in the else block below.
if (vgwSessionIdFoundInMap == true && con_metadata.vgwVoiceChannelNumber > 0) {
// Send the "End of Voice Call" signal now for this
// vgwSessionId_vgwVoiceChannelNumber combo.
OPort1Type oTuple;
Expand Down Expand Up @@ -1521,6 +1531,25 @@ void MY_OPERATOR::on_close(websocketpp::connection_hdl hdl) {
nOutputTuplesSentMetric->setValueNoLock(nOutputTuplesSent);
}
} // End of if (vgw_session_id_map[con_metadata.vgwSessionId] <= 0)
} else {
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
"-->Channel " << boost::to_string(udpChannelNumber) <<
"-->Possible critical error: Received on_close for connection handle " <<
hdl.lock().get() <<
" with its VGW session id not found in the vgw_session_id_map. " <<
"Reason for this could be either the VGW never sent a start session " <<
"message for one of the voice channels or it sent a start session message "
"followed by no binary speech data for that voice channel. This will " <<
"cause less than the required number of EOCS tuples to be sent to the " <<
"downstream operator. Unless the downstream operator is configured to " <<
"handle the reduced number of EOCS tuples for a given VGW session id, " <<
"it may eventually end up with all the speech processors and the " <<
"speech engines to be in unreleased i.e. unavailable state for handling " <<
"any new voice calls. vgwVoiceChannelNumber must be either 1 or 2. " <<
"If it is not, then it indicates the situation described above. " <<
"vgwSessionId=" << con_metadata.vgwSessionId <<
", vgwVoiceChannelNumber=" <<
con_metadata.vgwVoiceChannelNumber << ".", "on_close");
} // End of if (vgwSessionIdFoundInMap == true)

// This entire if block is a carry-over from the "stop" message processing section in the
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.sttgateway/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

**Note:** This toolkit requires c++11 support.
</description>
<version>2.2.6</version>
<version>2.2.7</version>
<requiredProductVersion>4.2.1.6</requiredProductVersion>
</identity>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
==============================================
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2018, 2020
# Copyright IBM Corp. 2018, 2021
==============================================
*/

/*
==============================================
First created on: Nov/24/2020
Last modified on: Nov/27/2020
Last modified on: Feb/02/2021

A) What does this example application do?
--------------------------------------
Expand Down Expand Up @@ -331,6 +331,20 @@ public composite VgwDataRouter {
// Is IBM Voice Gateway message exchange logging needed for debugging?
expression<boolean> $vgwSessionLoggingNeeded :
(boolean)getSubmissionTimeValue("vgwSessionLoggingNeeded", "false");
// Under some cicumstances, if the IBMVoiceGatewaySource operator sends
// only one EOCS (End Of Call Sigmal) tuple instead of two as required for
// the two voice channels, that may eventually cause the application logic
// below not be able to release the speech processor jobs properly at the end of
// a voice call for a given VGW session id. We have seen it in certain
// customer environments. To avoid that condition, such customers can
// configure this application to treat the very first EOCS tuple as
// sufficient to treat a voice call as a "completed call". In that case,
// it will simply ignore if and when a second EOCS tuple arrives.
// This feature can be activated to compensate for the situation described
// above if it happens in some customer environments.
// (Senthil added this on Feb/01/2021).
expression<int32> $numberOfEocsNeededForVoiceCallCompletion :
(int32)getSubmissionTimeValue("numberOfEocsNeededForVoiceCallCompletion", "2");
//
// IBM Watson STT related submission time values are defined below.
expression<int32> $totalNumberOfSpeechProcessorJobs :(int32)
Expand Down Expand Up @@ -674,7 +688,8 @@ public composite VgwDataRouter {
boolean key1Exists = has(_vgwSessionVgwVoiceChannelNumberCompletedMap, key1);
boolean key2Exists = has(_vgwSessionVgwVoiceChannelNumberCompletedMap, key2);

if (key1Exists == true && key2Exists == true) {
if ($numberOfEocsNeededForVoiceCallCompletion == 2 &&
(key1Exists == true && key2Exists == true)) {
// Since the voice call for this VGW session id has ended completely,
// we can also release the speech processor id assigned for this call so that
// it can be repurposed for handling any new future calls.
Expand All @@ -690,16 +705,54 @@ public composite VgwDataRouter {
removeM(_vgwSessionIdToSpeechProcessorMap, EOCS.vgwSessionId);
removeM(_vgwSessionVgwVoiceChannelNumberCompletedMap, key1);
removeM(_vgwSessionVgwVoiceChannelNumberCompletedMap, key2);
appTrc(Trace.error, "A call with vgwSessionId=" + EOCS.vgwSessionId +
appTrc(Trace.error, "i) A call with vgwSessionId=" + EOCS.vgwSessionId +
" ended and its speech processor id " + (rstring)speechProcessorId +
" got released.");
} else if ($numberOfEocsNeededForVoiceCallCompletion == 1 &&
(key1Exists == true || key2Exists == true)) {
// If the user configured this application to handle
// a single EOCS as sufficient to consider a voice call
// completed for a given VGW session id, we will use this
// block of code. Please refer to the constant i.e. expression
// declaration section above to read the commentary about this idea.
//
// Since the voice call for this VGW session id has ended completely,
// we can also release the speech processor id assigned for this call so that
// it can be repurposed for handling any new future calls.
// We can go ahead and release the speech processor id by adding it back to
// the speech processor status list.
// Let us decrement the given speech processor's current call handling count.
// It is a zero based indexed array. Hence, we have to subtract by 1 to get the
// current index in that array.
_speechProcessorStatusList[speechProcessorId-1] =
_speechProcessorStatusList[speechProcessorId-1] - 1;

// We can now do the clean-up in our state variables.
removeM(_vgwSessionIdToSpeechProcessorMap, EOCS.vgwSessionId);

if(key1Exists == true) {
removeM(_vgwSessionVgwVoiceChannelNumberCompletedMap, key1);
}

if(key2Exists == true) {
removeM(_vgwSessionVgwVoiceChannelNumberCompletedMap, key2);
}

appTrc(Trace.error, "ii) A call with vgwSessionId=" + EOCS.vgwSessionId +
" ended and its speech processor id " + (rstring)speechProcessorId +
" got released.");
}
} else {
appTrc(Trace.error,
"_YYYYY No speech processor id is available at this time for the " +
"vgwSessionId_vgwVoiceChannelNumber: " + _key +
" We are not going to process the currently received EOCS " +
" of this speaker in this voice call. This is serious error.");
// Flag an error only when the user configured for two
// EOCS tuples to be received for considering a voice call
// as complted.
if ($numberOfEocsNeededForVoiceCallCompletion == 2) {
appTrc(Trace.error,
"_YYYYY No speech processor id is available at this time for the " +
"vgwSessionId_vgwVoiceChannelNumber: " + _key +
" We are not going to process the currently received EOCS " +
" of this speaker in this voice call. This is a serious error.");
}
}
} // End of onTuple EOCS.

Expand Down
2 changes: 1 addition & 1 deletion samples/VgwDataRouter/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<info:identity>
<info:name>VgwDataRouter</info:name>
<info:description>Example that shows how to route VGW speech data to different Speech processor jobs</info:description>
<info:version>1.0.0</info:version>
<info:version>1.0.1</info:version>
<info:requiredProductVersion>4.2.1.6</info:requiredProductVersion>
</info:identity>
<info:dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
==============================================
# Licensed Materials - Property of IBM
# Copyright IBM Corp. 2018, 2020
# Copyright IBM Corp. 2018, 2021
==============================================
*/

/*
==============================================
First created on: Nov/24/2020
Last modified on: Nov/28/2020
Last modified on: Feb/02/2021

IMPORTANT NOTE
--------------
Expand Down Expand Up @@ -372,7 +372,21 @@ public composite VgwDataRouterToWatsonS2T {
(boolean)getSubmissionTimeValue("wsConnectionLoggingNeeded", "false");
// Is client message exchange logging needed for debugging?
expression<boolean> $wsClientSessionLoggingNeeded :
(boolean)getSubmissionTimeValue("wsClientSessionLoggingNeeded", "false");
(boolean)getSubmissionTimeValue("wsClientSessionLoggingNeeded", "false");
// Under some cicumstances, if the IBMVoiceGatewaySource operator sends
// only one EOCS (End Of Call Sigmal) tuple instead of two as required for
// the two voice channels, that may eventually cause the application logic
// below not be able to release the speech engines properly at the end of
// a voice call for a given VGW session id. We have seen it in certain
// customer environments. To avoid that condition, such customers can
// configure this application to treat the very first EOCS tuple as
// sufficient to treat a voice call as a "completed call". In that case,
// it will simply ignore if and when a second EOCS tuple arrives.
// This feature can be activated to compensate for the situation described
// above if it happens in some customer environments.
// (Senthil added this on Feb/01/2021).
expression<int32> $numberOfEocsNeededForVoiceCallCompletion :
(int32)getSubmissionTimeValue("numberOfEocsNeededForVoiceCallCompletion", "2");
//
// IBM Watson S2T related submission time values are defined below.
expression<int32> $numberOfS2TEngines :(int32)
Expand Down Expand Up @@ -858,6 +872,8 @@ public composite VgwDataRouterToWatsonS2T {
" started at " + ctime(getTimestamp()) + ".", fileHandle, err);
fclose(fileHandle, err);
}

appTrc(Trace.error, "A new voice call has started. vgwSessionId=" + BSD.vgwSessionId);
}

// Insert into the state map for future reference.
Expand Down Expand Up @@ -935,6 +951,53 @@ public composite VgwDataRouterToWatsonS2T {
insertM(_vgwSessionToCompletedUdpChannelMap, _key, _oTuple.speechEngineId);
}

// Senthil added this if block on Feb/01/2020.
if($numberOfEocsNeededForVoiceCallCompletion == 1) {
// If the user configured this application to handle
// only one EOCS to treat a voice call as completed, then we
// will try to clean-up the other voice channel if it exists.
mutable int32 otherVgwVoiceChannelNumber = 1;

if(EOCS.vgwVoiceChannelNumber == 1) {
otherVgwVoiceChannelNumber = 2;
}

// Get the sessionId + channelNumber combo string.
_key = EOCS.vgwSessionId + "_" + (rstring)otherVgwVoiceChannelNumber;

if (has(_vgwSessionIdToUdpChannelMap, _key) == true) {
// Let us send an empty blob to the WatsonS2T operator to indicate that
// this speaker of a given voice call is done.
_oTuple = (BinarySpeech_t){};
// Copy the three input tuple attributes that must
// match with that of the outgoing tuple.
assignFrom(_oTuple, EOCS);
// Override the following two attributes to reflect the other voice channel.
// Flip this attribute value.
if(_oTuple.isCustomerSpeechData == true) {
_oTuple.isCustomerSpeechData = false;
} else {
_oTuple.isCustomerSpeechData = true;
}

_oTuple.vgwVoiceChannelNumber = otherVgwVoiceChannelNumber;

// Assign the S2T engine id where this voice channel was
// getting processed until now.
_oTuple.speechEngineId = _vgwSessionIdToUdpChannelMap[_key];
// We have to send this tuple to the result processor as well for
// the call recording logic to work correctly.
_oTuple.speechResultProcessorId =
_vgwSessionToResultProcessorChannelMap[EOCS.vgwSessionId];
submit(_oTuple, BSDF);
// We are now done with this vgwSessionId_vgwVoiceChannelNumber combo.
removeM(_vgwSessionIdToUdpChannelMap, _key);
// Add the S2T engine id to this call completed map to be released later in the
// following if block only after receiving EOCS for both the voice channels of this call.
insertM(_vgwSessionToCompletedUdpChannelMap, _key, _oTuple.speechEngineId);
}
}

// Since this voice call is ending, let us release the S2T result processor
// instance that was allocated above for this voice call.
if (has(_vgwSessionToResultProcessorChannelMap,
Expand All @@ -951,8 +1014,22 @@ public composite VgwDataRouterToWatsonS2T {
// Remove the result processor id only if the EOCS signal
// was sent for both of the voice channels. That must first
// happen before we can release the result processor id.
if (has(_vgwSessionIdToUdpChannelMap, key1) == false &&
has(_vgwSessionIdToUdpChannelMap, key2) == false) {
//
// This if condition was changed by Senthil on
// Feb/01/2021 for the following reason.
// If the user configured this application to handle
// a single EOCS as sufficient to consider a voice call
// completed for a given VGW session id, we will use the
// second || i.e. OR condition. Please refer to the
// constant i.e. expression declaration section above to
// read the commentary about this idea.
//
if (($numberOfEocsNeededForVoiceCallCompletion == 2 &&
(has(_vgwSessionIdToUdpChannelMap, key1) == false &&
has(_vgwSessionIdToUdpChannelMap, key2) == false)) ||
($numberOfEocsNeededForVoiceCallCompletion == 1 &&
(has(_vgwSessionIdToUdpChannelMap, key1) == false ||
has(_vgwSessionIdToUdpChannelMap, key2) == false))) {
removeM(_vgwSessionToResultProcessorChannelMap, EOCS.vgwSessionId);

// Since the voice call for this VGW session id has ended completely,
Expand Down Expand Up @@ -985,6 +1062,8 @@ public composite VgwDataRouterToWatsonS2T {
" ended at " + ctime(getTimestamp()) + ".", fileHandle, err);
fclose(fileHandle, err);
}

appTrc(Trace.error, "An ongoing voice call has completed. vgwSessionId=" + EOCS.vgwSessionId);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion samples/VgwDataRouterToWatsonS2T/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<info:identity>
<info:name>VgwDataRouterToWatsonS2T</info:name>
<info:description>Example that showcases embedded S2T in IBM Streams</info:description>
<info:version>1.0.0</info:version>
<info:version>1.0.1</info:version>
<info:requiredProductVersion>4.2.1.6</info:requiredProductVersion>
</info:identity>
<info:dependencies>
Expand Down
Loading

0 comments on commit 0b1cf50

Please sign in to comment.