Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Audio Transcript heuristic for dynamic thread allocation on client #2023

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions iped-api/src/main/java/iped/data/IItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,9 @@ public interface IItem extends IItemReader {
@Override
String toString();

void setReEnqueueItem(boolean val);
boolean isReEnqueueItem();
void setFallBackTask(boolean val);
boolean isFallBackTask();

}
15 changes: 15 additions & 0 deletions iped-app/resources/config/conf/AudioTranscriptConfig.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ batchSize = 1
# IP:PORT of the service/central node used by the RemoteWav2Vec2TranscriptTask implementation.
# remoteServiceAddress = 127.0.0.1:11111

#Performs a heuristic for dynamic thread allocation and spaced requeue. Helps improve performance of slow transcription servers.
#clientDynamicThreadRequeueHeuristics = true

#If active, the client will also help the server with the transcription task, only if the client has no other tasks to do. The heuristic must be turned on
#clientTranscriptHelp = true

#Defines the implementation class for client help, must be a local implementation ( not remote transcript task )
#clientTranscriptHelpImplementationClass = iped.engine.task.transcript.Wav2Vec2TranscriptTask

#Advanced Parameter. Defines which part of the queue the items will be sent to. 4 = 1/4 size. Values ​​greater than or equal to 1
#clientSplitQueueRatio = 4

#Advanced Parameter. Sets the delta time in milliseconds when consecutive items are requested to be requeued, provides better spacing.
#clientRequeueDeltaTime = 5000

#########################################
# MicrosoftTranscriptTask options
#########################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig {
private static final String SKIP_KNOWN_FILES = "skipKnownFiles";
private static final String PRECISION = "precision";
private static final String BATCH_SIZE = "batchSize";
private static final String REQUEUE_HEURISTICS = "clientDynamicThreadRequeueHeuristics";
private static final String CLIENTE_HELP = "clientTranscriptHelp";
private static final String IMPL_CLASS_KEY_CLIENT = "clientTranscriptHelpImplementationClass";
private static final String REQUEUE_RATIO = "clientSplitQueueRatio";
private static final String REQUEUE_DELTA_TIME = "clientRequeueDeltaTime";

private List<String> languages = new ArrayList<>();
private List<String> mimesToProcess = new ArrayList<>();
Expand All @@ -53,6 +58,11 @@ public class AudioTranscriptConfig extends AbstractTaskPropertiesConfig {
private boolean skipKnownFiles = true;
private String precision = "int8";
private int batchSize = 1;
private boolean requeueHeuristic = false;
private boolean clientTranscriptHelp = false;
private String classNameFallBack = "";
private int requeueRatio = 4;
private long requeueDeltaTime = 5000;

public String getPrecision() {
return precision;
Expand Down Expand Up @@ -136,6 +146,26 @@ public String getGoogleModel() {
return googleModel;
}

public boolean getRequeueHeuristic() {
return requeueHeuristic;
}

public boolean getClientTranscriptHelp() {
return clientTranscriptHelp;
}

public String getClassNameFallBack() {
return classNameFallBack;
}

public int getRequeueRatio() {
return requeueRatio;
}

public long getRequeueDeltaTime() {
return requeueDeltaTime;
}

@Override
public void processProperties(UTF8Properties properties) {

Expand Down Expand Up @@ -200,6 +230,22 @@ public void processProperties(UTF8Properties properties) {
if (value != null) {
batchSize = Integer.parseInt(value.trim());
}
value = properties.getProperty(CLIENTE_HELP);
if (value != null) {
clientTranscriptHelp = Boolean.valueOf(value.trim());
}
value = properties.getProperty(IMPL_CLASS_KEY_CLIENT);
if (value != null) {
classNameFallBack = value.trim();
}
value = properties.getProperty(REQUEUE_RATIO);
if (value != null) {
requeueRatio = Integer.valueOf(value.trim());
}
value = properties.getProperty(REQUEUE_DELTA_TIME);
if (value != null) {
requeueDeltaTime = Long.valueOf(value.trim());
}
}

/**
Expand Down
44 changes: 44 additions & 0 deletions iped-engine/src/main/java/iped/engine/core/ProcessingQueues.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import iped.data.IItem;
import iped.engine.data.CaseData;
import iped.engine.util.Util;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ProcessingQueues {

Expand Down Expand Up @@ -61,6 +63,48 @@ public void addItemToQueue(IItem item, int queuePriority) throws InterruptedExce
addItemToQueue(item, queuePriority, false, false);
}

public void addItemToQueueSpaced(IItem item, int queuePriority, int numWorkers, AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) throws InterruptedException {
addItemToQueueSpaced(item, queuePriority, true, numWorkers, lastQueueIndex, lastQueueTime, queueSplit,queueDeltaTime);
}

private void addItemToQueueSpaced(IItem item, int queuePriority, boolean blockIfFull, int numWorkers, AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime)
throws InterruptedException {

Util.calctrackIDAndUpdateID(caseData, item);

LinkedList<IItem> queue = queues.get(queuePriority);
boolean sleep = false;
while (true) {
if (sleep) {
sleep = false;
Thread.sleep(1000);
}
synchronized (this) {
if (blockIfFull && queuePriority == 0 && queue.size() >= maxQueueSize) {
sleep = true;
continue;
} else {
queueSplit = (queueSplit <= 0)?4:queueSplit;
queueDeltaTime = (queueDeltaTime <= 0)?5000:queueDeltaTime;
int queueSplitInteger = (int)(queue.size()/queueSplit);
if (lastQueueIndex.get() == -1){
lastQueueIndex.set(queueSplitInteger);
lastQueueTime.set(System.currentTimeMillis());
}else{
if (lastQueueIndex.get() + numWorkers < queue.size() && ((System.currentTimeMillis() - lastQueueTime.get()) < queueDeltaTime)){
lastQueueIndex.addAndGet(numWorkers);
}else{
lastQueueIndex.set(queueSplitInteger);
}
lastQueueTime.set(System.currentTimeMillis());
}
queue.add(lastQueueIndex.get(),item);
break;
}
}
}
}

private void addItemToQueue(IItem item, int queuePriority, boolean addFirst, boolean blockIfFull)
throws InterruptedException {

Expand Down
21 changes: 21 additions & 0 deletions iped-engine/src/main/java/iped/engine/data/Item.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ public static void setStartID(int start) {

private static final int maxImageLength = 128 << 20;

private boolean reEnqueueItem = false;

private boolean fallBackTask = false;

/**
* Adiciona o item a uma categoria.
*
Expand Down Expand Up @@ -1316,4 +1320,21 @@ public Object getTempAttribute(String key) {
public void setTempAttribute(String key, Object value) {
tempAttributes.put(key, value);
}

public void setReEnqueueItem(boolean val){
this.reEnqueueItem = val;
}

public boolean isReEnqueueItem(){
return this.reEnqueueItem;
}

public void setFallBackTask(boolean val){
this.fallBackTask = val;
}

public boolean isFallBackTask(){
return this.fallBackTask;
}

}
16 changes: 16 additions & 0 deletions iped-engine/src/main/java/iped/engine/task/AbstractTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import iped.engine.data.CaseData;
import iped.engine.io.TimeoutException;
import iped.parsers.util.CorruptedCarvedException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Classe que representa uma tarefa de procesamento (assinatura, hash, carving,
Expand Down Expand Up @@ -248,6 +250,20 @@ protected void sendToNextTask(IItem evidence) throws Exception {
}
}

protected void reEnqueueItemSpaced(IItem item, int numWorkers, AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) throws InterruptedException {
reEnqueueItemSpaced(item, worker.manager.getProcessingQueues().getCurrentQueuePriority(), numWorkers, lastQueueIndex, lastQueueTime, queueSplit, queueDeltaTime);
throw new ItemReEnqueuedException();
}

private void reEnqueueItemSpaced(IItem item, int queue, int numWorkers, AtomicInteger lastQueueIndex, AtomicLong lastQueueTime, int queueSplit, long queueDeltaTime) throws InterruptedException {
item.dispose();
SkipCommitedTask.checkAgainLaterProcessedParents(item);
worker.manager.getProcessingQueues().addItemToQueueSpaced(item, queue, numWorkers, lastQueueIndex, lastQueueTime, queueSplit, queueDeltaTime);
if (!item.isQueueEnd()) {
worker.decItemsBeingProcessed();
}
}

protected void reEnqueueItem(IItem item) throws InterruptedException {
reEnqueueItem(item, worker.manager.getProcessingQueues().getCurrentQueuePriority());
throw new ItemReEnqueuedException();
Expand Down
Loading