Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update RabbitMQ Config options #471

Merged
merged 5 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ClassTranscribeDatabase/Globals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public class AppSettings

public string RABBITMQ_PORT { get; set; } = "5672";

public string RABBITMQ_AUTOMATIC_RECOVERY = "true";
public string RABBITMQ_HEARTBEAT_SECONDS = "60";


// RABBITMQ_PREFETCH_COUNT has been replaced with these CONCURRENT LIMITS-
//no longer used g RABBITMQ_PREFETCH_COUNT { get; set; } // No longer used; can be deleted in next cleanup
public string MAX_CONCURRENT_TRANSCRIPTIONS { get; set; }
Expand Down
19 changes: 12 additions & 7 deletions ClassTranscribeDatabase/Seed.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
using ClassTranscribeDatabase.Models;
using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using ClassTranscribeDatabase.Models;

namespace ClassTranscribeDatabase
{
Expand Down Expand Up @@ -58,7 +60,10 @@ public void Seed()
if (attempt < maxAttempts)
{
_logger.LogInformation($"Sleeping for {retrySeconds} seconds");
Thread.Sleep(1000 * retrySeconds);
// Thread.Sleep(1000 * retrySeconds);

Task.Delay(TimeSpan.FromSeconds(retrySeconds)).Wait();

}
else
{
Expand Down
9 changes: 6 additions & 3 deletions ClassTranscribeDatabase/Services/RabbitMQConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,18 @@ private void CreateSharedConnection()
ConnectionRefCount++;
return;
}
Logger.LogInformation("Creating RabbitMQ connection");
var recovery = Convert.ToBoolean(Globals.appSettings.RABBITMQ_AUTOMATIC_RECOVERY);
var heartbeat = Convert.ToUInt32(Globals.appSettings.RABBITMQ_HEARTBEAT_SECONDS);
Logger.LogInformation($"Creating RabbitMQ connection recovery:{recovery} heartbeat:{heartbeat}");
var factory = new ConnectionFactory()
{

HostName = Globals.appSettings.RABBITMQ_SERVER_NAME.Length > 0 ? Globals.appSettings.RABBITMQ_SERVER_NAME : Globals.appSettings.RabbitMQServer,
UserName = Globals.appSettings.ADMIN_USER_ID,
Password = Globals.appSettings.ADMIN_PASSWORD,
Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT) // 5672

Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT), // 5672
AutomaticRecoveryEnabled = recovery,
RequestedHeartbeat = TimeSpan.FromSeconds(heartbeat)
};
// A developer may still want to checkout old code which uses the old env branch
// so just complain loudly for now
Expand Down
165 changes: 91 additions & 74 deletions TaskEngine/Program.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using TaskEngine.Tasks;
using System.Threading;

using Newtonsoft.Json.Linq;

using static ClassTranscribeDatabase.CommonUtils;
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;

using TaskEngine.Tasks;

namespace TaskEngine
{
Expand All @@ -29,14 +33,32 @@ class Program
public static ServiceProvider _serviceProvider;
public static ILogger<Program> _logger;
public static void Main()
{
Console.WriteLine("TaskEngine.Main starting up -GetConfigurations...");
try {
SetupServices(); // should never return
createTaskQueues();
runQueueAwakerForever();

} catch (Exception e) {
// Some paranoia here; we *should* have a logger and exception handler in place
// So this is only here to catch unexpected startup errors that otherwise might be silent
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\n");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}
}
public static void SetupServices()
{
var configuration = CTDbContext.GetConfigurations();

// This project relies on Dependency Injection to configure its various services,
// For more info, https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-3.1
// All the services used are configured using the service provider.
Console.WriteLine("SetupServices() - starting");

var serviceProvider = new ServiceCollection()
_serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.AddConsole();
Expand Down Expand Up @@ -80,10 +102,9 @@ public static void Main()
.AddSingleton<TempCode>()
.BuildServiceProvider();

_serviceProvider = serviceProvider;
_logger = serviceProvider.GetRequiredService<ILogger<Program>>();
_logger = _serviceProvider.GetRequiredService<ILogger<Program>>();

Globals.appSettings = serviceProvider.GetService<IOptions<AppSettings>>().Value;
Globals.appSettings = _serviceProvider.GetService<IOptions<AppSettings>>().Value;
TaskEngineGlobals.KeyProvider = new KeyProvider(Globals.appSettings);

AppDomain currentDomain = AppDomain.CurrentDomain;
Expand All @@ -92,112 +113,108 @@ public static void Main()
_logger.LogInformation("Seeding database");

// Seed the database, with some initial data.
Seeder seeder = serviceProvider.GetService<Seeder>();
Seeder seeder = _serviceProvider.GetService<Seeder>();
seeder.Seed();
}

_logger.LogInformation("Starting TaskEngine");

static void runQueueAwakerForever() {
_logger.LogInformation("runQueueAwakerForever - start");
QueueAwakerTask queueAwakerTask = _serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

// Thread.Sleep(initialPauseInterval);
Task.Delay(initialPauseInterval).Wait();
// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
try {
_logger.LogInformation("Periodic Check");
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
} catch (Exception e) {
_logger.LogError(e, "Error in Periodic Check");
}
// Thread.Sleep(timeInterval);
Task.Delay(timeInterval).Wait();
_logger.LogInformation("Pausing {0} minutes before next periodicCheck", periodicCheck);
};
}
static void createTaskQueues() {
_logger.LogInformation("createTaskQueues() -starting");
// Delete any pre-existing queues on rabbitMQ.
RabbitMQConnection rabbitMQ = serviceProvider.GetService<RabbitMQConnection>();
RabbitMQConnection rabbitMQ = _serviceProvider.GetService<RabbitMQConnection>();

// Active queues managed by C# (concurrency > 0) are now purged after the queue is created and before messages are processed

ushort concurrent_videotasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_VIDEO_TASKS, NO_CONCURRENCY);
ushort concurrent_synctasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_SYNC_TASKS, MIN_CONCURRENCY);
ushort concurrent_transcriptions = ToUInt16(Globals.appSettings.MAX_CONCURRENT_TRANSCRIPTIONS, MIN_CONCURRENCY);
ushort concurrent_describe_images = 1;
ushort concurrent_describe_videos = 1;

ushort concurrent_describe_images = NO_CONCURRENCY;
ushort concurrent_describe_videos = NO_CONCURRENCY;

// Create and start consuming from all queues. If concurrency >=1 the queues are purged


// Upstream Sync related
_logger.LogInformation($"Creating DownloadPlaylistInfoTask & DownloadMediaTask consumers. Concurrency={concurrent_synctasks} ");
serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);

// Transcription Related
_logger.LogInformation($"Creating TranscriptionTask consumers. Concurrency={concurrent_transcriptions} ");

serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);
_serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);

// no more! - serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);
// no more! - _serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);

// Video Processing Related
_logger.LogInformation($"Creating ProcessVideoTask consumer. Concurrency={concurrent_videotasks} ");
serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
_serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
// Descriptions
serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);


_serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
_serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);

// SceneDetection now handled by native Python
// See https://github.com/classtranscribe/pyapi
serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);
_serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);

// We dont want concurrency for these tasks
_logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers.");
serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);
_serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment _serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);

// Elastic Search index should be built after TranscriptionTask
serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);

// Outdated Elastic Search index would be removed
serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);

_logger.LogInformation("Done creating task consumers");
//nolonger used :
// nope serviceProvider.GetService<nope ConvertVideoToWavTask>().Consume(concurrent_videotasks);

bool hacktest = false;
if (hacktest)
{
TempCode tempCode = serviceProvider.GetService<TempCode>();
tempCode.Temp();
return;
}
_logger.LogInformation("All done!");

QueueAwakerTask queueAwakerTask = serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

Thread.Sleep(initialPauseInterval);

// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
Thread.Sleep(timeInterval);
};
_serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);
_logger.LogInformation("createTaskQueues() - Done creating task consumers");
}

// Catch all unhandled exceptions.
static void ExceptionHandler(object sender, UnhandledExceptionEventArgs args)
{
Exception e = (Exception)args.ExceptionObject;
_logger.LogError(e, "Unhandled Exception Caught");
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\nSender:{sender ?? "null"}");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}

private static ushort ToUInt16(String val, ushort defaultVal)
Expand Down
Loading