Skip to content

Commit

Permalink
fix(transcription): Sends transcription over xmpp in separate thread. (
Browse files Browse the repository at this point in the history
…#570)

* fix(transcription): Sends transcription over xmpp in separate thread.

* squash: Xmpp packet queue per conference instance.

* squash: One thread pool and xmpp and xmppSend queues per instance.
  • Loading branch information
damencho authored Oct 31, 2024
1 parent 6b25725 commit 8456fe8
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 151 deletions.
106 changes: 104 additions & 2 deletions src/main/java/org/jitsi/jigasi/JvbConference.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

import static net.java.sip.communicator.service.protocol.event.LocalUserChatRoomPresenceChangeEvent.*;
import static org.jivesoftware.smack.packet.StanzaError.Condition.*;
Expand Down Expand Up @@ -185,11 +186,13 @@ public class JvbConference
*/
private String meetingId;

private static ExecutorService threadPool = Util.createNewThreadPool("xmpp-executor-pool");

/**
* A queue used to offload xmpp execution in a new thread to avoid blocking xmpp threads,
* by executing the tasks in new thread
*/
public static final PacketQueue<Runnable> xmppInvokeQueue = new PacketQueue<>(
public final PacketQueue<Runnable> xmppInvokeQueue = new PacketQueue<>(
Integer.MAX_VALUE,
false,
"xmpp-invoke-queue",
Expand All @@ -208,7 +211,32 @@ public class JvbConference
return false;
}
},
Util.createNewThreadPool("xmpp-executor-pool")
threadPool
);

/**
* A queue used for sending xmpp messages.
*/
public final PacketQueue<Runnable> xmppSendQueue = new PacketQueue<>(
Integer.MAX_VALUE,
false,
"xmpp-send-queue",
r -> {
// do process and try
try
{
r.run();

return true;
}
catch (Throwable e)
{
logger.error("Error processing xmpp queue item", e);

return false;
}
},
threadPool
);

/**
Expand Down Expand Up @@ -2299,6 +2327,80 @@ private void processVisitorsJson(String json)
}
}

/**
* Send a message to the muc room
*
* @param messageString the message to send
*/
public void sendMessageToRoom(String messageString)
{
xmppSendQueue.add(() -> sendMessageToRoomInternal(messageString));
}

public void sendMessageToRoomInternal(String messageString)
{
if (isInTheRoom())
{
logger.error(this.callContext + " Cannot send message as chatRoom is null");
return;
}

try
{
this.mucRoom.sendMessage(this.mucRoom.createMessage(messageString));
if (logger.isTraceEnabled())
{
logger.trace(this.callContext + " Sending message: \"" + messageString + "\"");
}
}
catch (OperationFailedException e)
{
logger.warn(this.callContext + " Failed to send message " + messageString, e);
}
}

/**
* Send a json-message to the muc room
*
* @param jsonMessage the json message to send
*/
public void sendJsonMessage(JSONObject jsonMessage)
{
xmppSendQueue.add(() -> sendJsonMessageInternal(jsonMessage));
}

private void sendJsonMessageInternal(JSONObject jsonMessage)
{
if (this.mucRoom == null)
{
logger.error(this.callContext + " Cannot send message as chatRoom is null");
return;
}

if (!isInTheRoom())
{
if (logger.isDebugEnabled())
{
logger.debug(this.callContext + " Skip sending message to room which we left!");
}
return;
}

String messageString = jsonMessage.toString();
try
{
((ChatRoomJabberImpl)this.mucRoom).sendJsonMessage(messageString);
if (logger.isTraceEnabled())
{
logger.trace(this.callContext + " Sending json message: \"" + messageString + "\"");
}
}
catch (OperationFailedException e)
{
logger.warn(this.callContext + " Failed to send json message " + messageString, e);
}
}

/**
* Threads handles the timeout for stopping the conference.
* For waiting for conference call invite sent by the focus or for waiting
Expand Down
49 changes: 11 additions & 38 deletions src/main/java/org/jitsi/jigasi/TranscriptionGatewaySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,6 @@ public class TranscriptionGatewaySession
*/
private TranscriptHandler handler;

/**
* The ChatRoom of the conference which is going to be transcribed.
* We will post messages to the ChatRoom to update users of progress
* of transcription
*/
private ChatRoom chatRoom = null;

/**
* The transcriber managing transcriptions of audio
*/
Expand Down Expand Up @@ -213,14 +206,13 @@ Exception onConferenceCallStarted(Call jvbConferenceCall)
// We can now safely set the Call connecting to the muc room
// and the ChatRoom of the muc room
this.jvbCall = jvbConferenceCall;
this.chatRoom = super.jvbConference.getJvbRoom();

// If the transcription service is not correctly configured, there is no
// point in continuing this session, so end it immediately
if (!service.isConfiguredProperly())
{
logger.warn("TranscriptionService is not properly configured");
sendMessageToRoom("Transcriber is not properly " +
super.jvbConference.sendMessageToRoom("Transcriber is not properly " +
"configured. Contact the service administrators and let them " +
"know! I will now leave.");
jvbConference.stop();
Expand Down Expand Up @@ -260,7 +252,7 @@ Exception onConferenceCallStarted(Call jvbConferenceCall)

if (welcomeMessage.length() > 0)
{
sendMessageToRoom(welcomeMessage.toString());
super.jvbConference.sendMessageToRoom(welcomeMessage.toString());
}

try
Expand Down Expand Up @@ -637,6 +629,13 @@ private List<ConferenceMember> getCurrentConferenceMembers()
*/
private List<ChatRoomMember> getCurrentChatRoomMembers()
{
if (super.jvbConference == null)
{
return null;
}

ChatRoom chatRoom = super.jvbConference.getJvbRoom();

return chatRoom == null ? null : chatRoom.getMembers();
}

Expand Down Expand Up @@ -710,40 +709,14 @@ private String getParticipantIdentifier(ConferenceMember conferenceMember)
return getConferenceMemberResourceID(conferenceMember);
}


/**
* Send a message to the muc room
*
* @param messageString the message to send
*/
private void sendMessageToRoom(String messageString)
{
if (chatRoom == null)
{
logger.error("Cannot send message as chatRoom is null");
return;
}

Message message = chatRoom.createMessage(messageString);
try
{
chatRoom.sendMessage(message);
logger.debug("Sending message: \"" + messageString + "\"");
}
catch (OperationFailedException e)
{
logger.warn("Failed to send message " + messageString, e);
}
}

/**
* Send a {@link TranscriptionResult} to the {@link ChatRoom}
*
* @param result the {@link TranscriptionResult} to send
*/
private void sendTranscriptionResultToRoom(TranscriptionResult result)
{
handler.publishTranscriptionResult(this.chatRoom, result);
handler.publishTranscriptionResult(super.jvbConference, result);
}

/**
Expand All @@ -753,7 +726,7 @@ private void sendTranscriptionResultToRoom(TranscriptionResult result)
*/
private void sendTranslationResultToRoom(TranslationResult result)
{
handler.publishTranslationResult(this.chatRoom, result);
handler.publishTranslationResult(super.jvbConference, result);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/jitsi/jigasi/lobby/Lobby.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected void leaveRoom()
@Override
public void invitationReceived(ChatRoomInvitationReceivedEvent evt)
{
JvbConference.xmppInvokeQueue.add(() -> invitationReceivedInternal(evt));
this.jvbConference.xmppInvokeQueue.add(() -> invitationReceivedInternal(evt));
}

private void invitationReceivedInternal(ChatRoomInvitationReceivedEvent chatRoomInvitationReceivedEvent)
Expand Down Expand Up @@ -255,7 +255,7 @@ private void notifyAccessGranted()
@Override
public void localUserPresenceChanged(LocalUserChatRoomPresenceChangeEvent evt)
{
JvbConference.xmppInvokeQueue.add(() -> localUserPresenceChangedInternal(evt));
this.jvbConference.xmppInvokeQueue.add(() -> localUserPresenceChangedInternal(evt));
}

private void localUserPresenceChangedInternal(LocalUserChatRoomPresenceChangeEvent evt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
*/
package org.jitsi.jigasi.transcription;

import net.java.sip.communicator.impl.protocol.jabber.*;
import net.java.sip.communicator.service.protocol.*;
import net.java.sip.communicator.service.protocol.Message;
import org.jitsi.jigasi.*;
import org.jitsi.service.libjitsi.*;
import org.jitsi.service.neomedia.*;
Expand Down Expand Up @@ -156,16 +153,6 @@ public abstract class AbstractTranscriptPublisher<T>
private static final Logger logger
= Logger.getLogger(AbstractTranscriptPublisher.class);

/**
* Aspect for successful upload of transcript
*/
private static final String DD_ASPECT_SUCCESS = "upload_success";

/**
* Aspect for failed upload of transcript
*/
private static final String DD_ASPECT_FAIL = "upload_fail";

/**
* Get a string which contains a time stamp and a random UUID, with an
* optional pre- and suffix attached.
Expand All @@ -187,76 +174,6 @@ protected static String generateHardToGuessTimeString(String prefix,
UUID.randomUUID(), suffix);
}

/**
* Send a message to the muc room
*
* @param chatRoom the chatroom to send the message to
* @param message the message to send
*/
protected void sendMessage(ChatRoom chatRoom, T message)
{
if (chatRoom == null)
{
logger.error("Cannot send message as chatRoom is null");
return;
}

String messageString = message.toString();
Message chatRoomMessage = chatRoom.createMessage(messageString);
try
{
chatRoom.sendMessage(chatRoomMessage);
if (logger.isTraceEnabled())
logger.trace("Sending message: \"" + messageString + "\"");
}
catch (OperationFailedException e)
{
logger.warn("Failed to send message " + messageString, e);
}
}

/**
* Send a json-message to the muc room
*
* @param chatRoom the chatroom to send the message to
* @param jsonMessage the json message to send
*/
protected void sendJsonMessage(ChatRoom chatRoom, T jsonMessage)
{
if (chatRoom == null)
{
logger.error("Cannot send message as chatRoom is null");
return;
}

if (!(chatRoom instanceof ChatRoomJabberImpl))
{
logger.error("Cannot send message as chatRoom is not an" +
"instance of ChatRoomJabberImpl");
return;
}

if (!chatRoom.isJoined())
{
if (logger.isDebugEnabled())
{
logger.debug("Skip sending message to room which we left!");
}
return;
}

String messageString = jsonMessage.toString();
try
{
((ChatRoomJabberImpl)chatRoom).sendJsonMessage(messageString);
if (logger.isTraceEnabled())
logger.trace("Sending json message: \"" + messageString + "\"");
}
catch (OperationFailedException e)
{
logger.warn("Failed to send json message " + messageString, e);
}
}
/**
* Save a transcript given as a String to subdirectory of getLogDirPath()
* with the given directory name and the given file name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.jitsi.jigasi.transcription;

import net.java.sip.communicator.service.protocol.*;
import org.jitsi.jigasi.*;
import org.json.simple.*;

import java.time.*;
Expand Down Expand Up @@ -225,19 +226,15 @@ public JSONFormatter getFormatter()
}

@Override
public void publish(ChatRoom room, TranscriptionResult result)
public void publish(JvbConference jvbConference, TranscriptionResult result)
{
JSONObject eventObject = createTranscriptionJSONObject(result);

super.sendJsonMessage(room, eventObject);
jvbConference.sendJsonMessage(createTranscriptionJSONObject(result));
}

@Override
public void publish(ChatRoom room, TranslationResult result)
public void publish(JvbConference jvbConference, TranslationResult result)
{
JSONObject eventObject = createTranslationJSONObject(result);

super.sendJsonMessage(room, eventObject);
jvbConference.sendJsonMessage(createTranslationJSONObject(result));
}

/**
Expand Down
Loading

0 comments on commit 8456fe8

Please sign in to comment.