Skip to content

Commit

Permalink
Transcription enabled storage cadence updates (#836)
Browse files Browse the repository at this point in the history
* Make cadence configurable in transcription enabled storage project

* Update EnvironmentVariablesExtensions

* Update ArmTemplate
  • Loading branch information
Henry van der Vegte authored Oct 16, 2020
1 parent 66d7131 commit 98d518a
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// <copyright file="EnvironmentVariablesExtensions.cs" company="Microsoft Corporation">
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
// </copyright>

namespace Connector.Extensions
{
using System;

public static class EnvironmentVariablesExtensions
{
public static int ClampInt(this int value, int min, int max)
{
return Math.Max(min, Math.Min(value, max));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace FetchTranscriptionFunction
{
using System;
using Connector.Extensions;

public static class FetchTranscriptionEnvironmentVariables
{
Expand Down Expand Up @@ -35,10 +36,16 @@ public static class FetchTranscriptionEnvironmentVariables

public static readonly string JsonResultOutputContainer = Environment.GetEnvironmentVariable(nameof(JsonResultOutputContainer), EnvironmentVariableTarget.Process);

public static readonly int RetryLimit = int.TryParse(Environment.GetEnvironmentVariable(nameof(RetryLimit), EnvironmentVariableTarget.Process), out RetryLimit) ? RetryLimit.ClampInt(1, MaxRetryLimit) : DefaultRetryLimit;

public static readonly string StartTranscriptionServiceBusConnectionString = Environment.GetEnvironmentVariable(nameof(StartTranscriptionServiceBusConnectionString), EnvironmentVariableTarget.Process);

public static readonly string TextAnalyticsKey = Environment.GetEnvironmentVariable(nameof(TextAnalyticsKey), EnvironmentVariableTarget.Process);

public static readonly string TextAnalyticsRegion = Environment.GetEnvironmentVariable(nameof(TextAnalyticsRegion), EnvironmentVariableTarget.Process);

private const int MaxRetryLimit = 16;

private const int DefaultRetryLimit = 4;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public TextAnalytics(string locale, string subscriptionKey, string region, ILogg

public async Task<IEnumerable<string>> AddSentimentToTranscriptAsync(SpeechTranscript speechTranscript)
{
Log.LogInformation($"Starting sentiment analysis.");

if (speechTranscript == null)
{
throw new ArgumentNullException(nameof(speechTranscript));
Expand All @@ -75,8 +73,6 @@ public async Task<IEnumerable<string>> AddSentimentToTranscriptAsync(SpeechTrans

public async Task<IEnumerable<string>> RedactEntitiesAsync(SpeechTranscript speechTranscript)
{
Log.LogInformation($"Starting entity masking.");

if (speechTranscript == null)
{
throw new ArgumentNullException(nameof(speechTranscript));
Expand Down Expand Up @@ -208,14 +204,12 @@ private List<TextAnalyticsRequestsChunk> CreateRequestChunks(SpeechTranscript sp
textAnalyticsDocumentList.Add(textAnalyticsDocument);
}

Log.LogInformation($"Total text analytics documents: {textAnalyticsDocumentList.Count}");

for (int i = 0; i < textAnalyticsDocumentList.Count; i += documentRequestLimit)
{
textAnalyticChunks.Add(new TextAnalyticsRequestsChunk(textAnalyticsDocumentList.GetRange(i, Math.Min(documentRequestLimit, textAnalyticsDocumentList.Count - i))));
}

Log.LogInformation($"Total chunks: {textAnalyticChunks.Count}");
Log.LogInformation($"Received {textAnalyticChunks.Count} text analytics chunks from {textAnalyticsDocumentList.Count} documents.");
return textAnalyticChunks;
}

Expand Down Expand Up @@ -350,7 +344,6 @@ private async Task<IEnumerable<string>> AddSentimentToSpeechTranscriptAsync(List
}
}

Log.LogInformation($"Added sentiment segments.");
return sentimentErrors;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ namespace FetchTranscriptionFunction

public static class TranscriptionProcessor
{
private const int MaxRetryCount = 6;

private static StorageConnector StorageConnectorInstance = new StorageConnector(FetchTranscriptionEnvironmentVariables.AzureWebJobsStorage);

private static QueueClient StartQueueClientInstance = new QueueClient(new ServiceBusConnectionStringBuilder(FetchTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString));
Expand Down Expand Up @@ -68,28 +66,32 @@ public static async Task ProcessTranscriptionJobAsync(TranscriptionStartedMessag
}
catch (WebException e)
{
if (BatchClient.IsThrottledOrTimeoutStatusCode(((HttpWebResponse)e.Response).StatusCode))
if (e.Response != null && BatchClient.IsThrottledOrTimeoutStatusCode(((HttpWebResponse)e.Response).StatusCode))
{
log.LogInformation("Timeout or throttled, retrying message.");
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchQueueClientInstance, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
return;
}
else
{
var errorMessage = $"Fetch Transcription in job with name {jobName} failed with WebException {e} and message {e.Message}.";
log.LogError($"{errorMessage}");
await RetryOrFailJobAsync(serviceBusMessage, errorMessage, jobName, transcriptionLocation, subscriptionKey, log).ConfigureAwait(false);
}

var errorMessage = $"Fetch Transcription in job with name {jobName} failed with WebException {e} and message {e.Message}.";
log.LogError($"{errorMessage}");
await RetryOrFailJobAsync(serviceBusMessage, errorMessage, jobName, transcriptionLocation, subscriptionKey, log).ConfigureAwait(false);
throw;
}
catch (TimeoutException e)
{
log.LogInformation($"Timeout - re-enqueueing fetch transcription message. Exception message: {e.Message}");
await ServiceBusUtilities.SendServiceBusMessageAsync(FetchQueueClientInstance, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false);
return;
throw;
}
catch (Exception e)
{
var errorMessage = $"Fetch Transcription in job with name {jobName} failed with Exception {e} and message {e.Message}.";
log.LogError($"{errorMessage}");
await RetryOrFailJobAsync(serviceBusMessage, errorMessage, jobName, transcriptionLocation, subscriptionKey, log).ConfigureAwait(false);
throw;
}
}

Expand Down Expand Up @@ -126,7 +128,7 @@ private static async Task ProcessFailedTranscriptionAsync(string transcriptionLo
{
var fileName = StorageConnector.GetFileNameFromUri(new Uri(audio.FileUrl));

if (retryAudioFile && audio.RetryCount < MaxRetryCount)
if (retryAudioFile && audio.RetryCount < FetchTranscriptionEnvironmentVariables.RetryLimit)
{
log.LogInformation($"Retrying transcription with name {fileName} - retry count: {audio.RetryCount}");
var sbMessage = new ServiceBusMessage
Expand Down Expand Up @@ -337,7 +339,7 @@ private static async Task RetryOrFailJobAsync(TranscriptionStartedMessage messag
message.FailedExecutionCounter += 1;
var messageDelayTime = GetMessageDelayTime(message.PollingCounter);

if (message.FailedExecutionCounter > MaxRetryCount)
if (message.FailedExecutionCounter > FetchTranscriptionEnvironmentVariables.RetryLimit)
{
await WriteFailedJobLogToStorageAsync(message, error, jobName, log).ConfigureAwait(false);
await BatchClient.DeleteTranscriptionAsync(transcriptionLocation, subscriptionKey, log).ConfigureAwait(false);
Expand Down
13 changes: 10 additions & 3 deletions samples/batch/transcription-enabled-storage/Setup/ArmTemplate.json
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@
"authRuleCT": "[resourceId('Microsoft.ServiceBus/namespaces/queues/authorizationRules',concat('ServiceBus-', resourceGroup().name), 'start_transcription_queue','StartTranscription')]",
"appServicePlanName": "AcceleratorAppServicePlan",
"createHtmlResultFile": true,
"timerBasedExecution": true
"timerBasedExecution": true,
"messagesPerFunctionExecution": 500,
"filesPerTranscriptionJob": 32,
"retryLimit": 4
},
"resources": [
{
Expand Down Expand Up @@ -472,7 +475,7 @@
"deadLetteringOnMessageExpiration": false,
"enableBatchedOperations": false,
"duplicateDetectionHistoryTimeWindow": "PT10M",
"maxDeliveryCount": 10,
"maxDeliveryCount": 1,
"status": "Active",
"autoDeleteOnIdle": "P10675199DT2H48M5.4775807S",
"enablePartitioning": false,
Expand All @@ -496,7 +499,7 @@
"deadLetteringOnMessageExpiration": false,
"enableBatchedOperations": false,
"duplicateDetectionHistoryTimeWindow": "PT10M",
"maxDeliveryCount": 10,
"maxDeliveryCount": 1,
"status": "Active",
"autoDeleteOnIdle": "P10675199DT2H48M5.4775807S",
"enablePartitioning": false,
Expand Down Expand Up @@ -723,11 +726,14 @@
"ErrorFilesOutputContainer": "[variables('errorFilesOutputContainer')]",
"ErrorReportOutputContainer": "[variables('errorReportOutputContainer')]",
"FetchTranscriptionServiceBusConnectionString": "[listKeys(variables('authRuleFT'),'2015-08-01').primaryConnectionString]",
"FilesPerTranscriptionJob": "[variables('filesPerTranscriptionJob')]",
"FUNCTIONS_EXTENSION_VERSION": "~3",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"Locale": "[parameters('Locale')]",
"MessagesPerFunctionExecution": "[variables('messagesPerFunctionExecution')]",
"ProfanityFilterMode": "[parameters('ProfanityFilterMode')]",
"PunctuationMode": "[parameters('PunctuationMode')]",
"RetryLimit": "[variables('retryLimit')]",
"SecondaryCustomModelId": "[parameters('SecondaryCustomModelId')]",
"SecondaryLocale": "[parameters('SecondaryLocale')]",
"StartTranscriptionServiceBusConnectionString": "[listKeys(variables('authRuleCT'),'2015-08-01').primaryConnectionString]",
Expand Down Expand Up @@ -782,6 +788,7 @@
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"HtmlResultOutputContainer": "[variables('htmlResultOutputContainer')]",
"JsonResultOutputContainer": "[variables('jsonResultOutputContainer')]",
"RetryLimit": "[variables('retryLimit')]",
"StartTranscriptionServiceBusConnectionString": "[listKeys(variables('authRuleCT'),'2015-08-01').primaryConnectionString]",
"TextAnalyticsKey": "[parameters('TextAnalyticsKey')]",
"TextAnalyticsRegion": "[parameters('TextAnalyticsRegion')]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ namespace StartTranscriptionByTimer

public static class StartTranscriptionByTimer
{
private const int MessagesPerExecutionThreshold = 5000;

private const double MessageReceiveTimeoutInSeconds = 60;

private static MessageReceiver MessageReceiverInstance = new MessageReceiver(new ServiceBusConnectionStringBuilder(StartTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString), prefetchCount: MessagesPerExecutionThreshold);
private static MessageReceiver MessageReceiverInstance = new MessageReceiver(new ServiceBusConnectionStringBuilder(StartTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString), prefetchCount: StartTranscriptionEnvironmentVariables.MessagesPerFunctionExecution);

[FunctionName("StartTranscriptionByTimer")]
public static async Task Run([TimerTrigger("0 */2 * * * *")] TimerInfo myTimer, ILogger log)
Expand Down Expand Up @@ -48,7 +46,7 @@ public static async Task Run([TimerTrigger("0 */2 * * * *")] TimerInfo myTimer,

log.LogInformation("Pulling messages from queue...");

var messages = await MessageReceiverInstance.ReceiveAsync(MessagesPerExecutionThreshold, TimeSpan.FromSeconds(MessageReceiveTimeoutInSeconds)).ConfigureAwait(false);
var messages = await MessageReceiverInstance.ReceiveAsync(StartTranscriptionEnvironmentVariables.MessagesPerFunctionExecution, TimeSpan.FromSeconds(MessageReceiveTimeoutInSeconds)).ConfigureAwait(false);

if (messages == null || !messages.Any())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@
namespace StartTranscriptionByTimer
{
using System;
using Connector.Extensions;

public static class StartTranscriptionEnvironmentVariables
{
public static readonly bool AddDiarization = bool.TryParse(Environment.GetEnvironmentVariable(nameof(AddDiarization), EnvironmentVariableTarget.Process), out AddDiarization) && AddDiarization;

public static readonly bool AddWordLevelTimestamps = bool.TryParse(Environment.GetEnvironmentVariable(nameof(AddWordLevelTimestamps), EnvironmentVariableTarget.Process), out AddWordLevelTimestamps) && AddWordLevelTimestamps;

public static readonly int MessagesPerFunctionExecution = int.TryParse(Environment.GetEnvironmentVariable(nameof(MessagesPerFunctionExecution), EnvironmentVariableTarget.Process), out MessagesPerFunctionExecution) ? MessagesPerFunctionExecution.ClampInt(1, MaxMessagesPerFunctionExecution) : DefaultMessagesPerFunctionExecution;

public static readonly int FilesPerTranscriptionJob = int.TryParse(Environment.GetEnvironmentVariable(nameof(FilesPerTranscriptionJob), EnvironmentVariableTarget.Process), out FilesPerTranscriptionJob) ? FilesPerTranscriptionJob.ClampInt(1, MaxFilesPerTranscriptionJob) : DefaultFilesPerTranscriptionJob;

public static readonly int RetryLimit = int.TryParse(Environment.GetEnvironmentVariable(nameof(RetryLimit), EnvironmentVariableTarget.Process), out RetryLimit) ? RetryLimit.ClampInt(1, MaxRetryLimit) : DefaultRetryLimit;

public static readonly string AudioInputContainer = Environment.GetEnvironmentVariable(nameof(AudioInputContainer), EnvironmentVariableTarget.Process);

public static readonly string AzureServiceBus = Environment.GetEnvironmentVariable(nameof(AzureServiceBus), EnvironmentVariableTarget.Process);
Expand Down Expand Up @@ -42,5 +49,17 @@ public static class StartTranscriptionEnvironmentVariables
public static readonly string SecondaryLocale = Environment.GetEnvironmentVariable(nameof(SecondaryLocale), EnvironmentVariableTarget.Process);

public static readonly string StartTranscriptionServiceBusConnectionString = Environment.GetEnvironmentVariable(nameof(StartTranscriptionServiceBusConnectionString), EnvironmentVariableTarget.Process);

private const int MaxMessagesPerFunctionExecution = 5000;

private const int DefaultMessagesPerFunctionExecution = 500;

private const int MaxFilesPerTranscriptionJob = 1000;

private const int DefaultFilesPerTranscriptionJob = 16;

private const int MaxRetryLimit = 16;

private const int DefaultRetryLimit = 4;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ namespace StartTranscriptionByTimer

public class StartTranscriptionHelper
{
private const int MaxFilesPerTranscriptionJob = 1000;

private const int MaxMessageRetryCount = 6;

private static StorageConnector StorageConnectorInstance = new StorageConnector(StartTranscriptionEnvironmentVariables.AzureWebJobsStorage);

private static QueueClient StartQueueClientInstance = new QueueClient(new ServiceBusConnectionStringBuilder(StartTranscriptionEnvironmentVariables.StartTranscriptionServiceBusConnectionString));
Expand All @@ -40,6 +36,8 @@ public class StartTranscriptionHelper

private readonly string AudioInputContainerName = StartTranscriptionEnvironmentVariables.AudioInputContainer;

private readonly int FilesPerTranscriptionJob = StartTranscriptionEnvironmentVariables.FilesPerTranscriptionJob;

private readonly string HostName = $"https://{StartTranscriptionEnvironmentVariables.AzureSpeechServicesRegion}.api.cognitive.microsoft.com/";

private ILogger Logger;
Expand Down Expand Up @@ -67,9 +65,9 @@ public async Task StartTranscriptionsAsync(IEnumerable<Message> messages, Messag
var chunkedMessages = new List<List<Message>>();
var messageCount = messages.Count();

for (int i = 0; i < messageCount; i += MaxFilesPerTranscriptionJob)
for (int i = 0; i < messageCount; i += FilesPerTranscriptionJob)
{
var chunk = messages.Skip(i).Take(Math.Min(MaxFilesPerTranscriptionJob, messageCount - i)).ToList();
var chunk = messages.Skip(i).Take(Math.Min(FilesPerTranscriptionJob, messageCount - i)).ToList();
chunkedMessages.Add(chunk);
}

Expand Down Expand Up @@ -231,7 +229,6 @@ private async Task StartBatchTranscriptionJobAsync(IEnumerable<Message> messages
var errorMessage = $"Throttled or timeout while creating post. Error Message: {e.Message}";
Logger.LogError(errorMessage);
await RetryOrFailMessagesAsync(messages, errorMessage).ConfigureAwait(false);
return;
}
else
{
Expand All @@ -245,22 +242,23 @@ private async Task StartBatchTranscriptionJobAsync(IEnumerable<Message> messages
}

await WriteFailedJobLogToStorageAsync(serviceBusMessages, errorMessage, jobName).ConfigureAwait(false);
return;
}

throw;
}
catch (TimeoutException e)
{
var errorMessage = $"Timeout while creating post, re-enqueueing transcription start. Message: {e.Message}";
Logger.LogError(errorMessage);
await RetryOrFailMessagesAsync(messages, errorMessage).ConfigureAwait(false);
return;
throw;
}
catch (Exception e)
{
var errorMessage = $"Start Transcription in job with name {jobName} failed with exception {e} and message {e.Message}";
Logger.LogError(errorMessage);
await WriteFailedJobLogToStorageAsync(serviceBusMessages, errorMessage, jobName).ConfigureAwait(false);
return;
throw;
}

Logger.LogInformation($"Fetch transcription queue successfully informed about job at: {jobName}");
Expand All @@ -272,7 +270,7 @@ private async Task RetryOrFailMessagesAsync(IEnumerable<Message> messages, strin
{
var sbMessage = JsonConvert.DeserializeObject<ServiceBusMessage>(Encoding.UTF8.GetString(message.Body));

if (sbMessage.RetryCount >= MaxMessageRetryCount)
if (sbMessage.RetryCount >= StartTranscriptionEnvironmentVariables.RetryLimit)
{
var fileName = StorageConnector.GetFileNameFromUri(sbMessage.Data.Url);
var errorFileName = fileName + ".txt";
Expand Down

0 comments on commit 98d518a

Please sign in to comment.