diff --git a/ClassTranscribeDatabase/CommonUtils.cs b/ClassTranscribeDatabase/CommonUtils.cs index 7c0578b..1e178e2 100644 --- a/ClassTranscribeDatabase/CommonUtils.cs +++ b/ClassTranscribeDatabase/CommonUtils.cs @@ -167,7 +167,7 @@ public static string GetMediaName(Media media) return name; } public static string ToCourseOfferingSubDirectory(CTDbContext ctx, Entity entity) { -#nullable enable + #nullable enable try { String? path = GetRelatedCourseOfferingFilePath(ctx, entity); @@ -185,11 +185,11 @@ public static string ToCourseOfferingSubDirectory(CTDbContext ctx, Entity entity // we could still get here if something in the model has been deleted. } #nullable enable - public static string GetRelatedCourseOfferingFilePath(CTDbContext ctx, Entity entity) + private static string? GetRelatedCourseOfferingFilePath(CTDbContext ctx, Entity entity) { return GetRelatedCourseOfferingFilePathAsync(ctx, entity).GetAwaiter().GetResult(); } - public static async Task GetRelatedCourseOfferingFilePathAsync(CTDbContext ctx, Entity entity) + private static async Task GetRelatedCourseOfferingFilePathAsync(CTDbContext ctx, Entity entity) { // the only thing that we can trust exists on the given the entity Id // Drop recursion... this may reduce the number of SQL calls diff --git a/ClassTranscribeDatabase/Seed.cs b/ClassTranscribeDatabase/Seed.cs index 5b96cdb..24129b0 100644 --- a/ClassTranscribeDatabase/Seed.cs +++ b/ClassTranscribeDatabase/Seed.cs @@ -51,7 +51,7 @@ public void Seed() string ignore = "57P03"; // "57P03: the database system is starting up"; if (! ex.Message.Contains(ignore) ){ - throw ex; + throw; // throw implicitly to preserve stack trace } } _logger.LogError($"Attempt {attempt} of {maxAttempts}. Cannot connect to Database"); diff --git a/ClassTranscribeDatabase/Services/RabbitMQConnection.cs b/ClassTranscribeDatabase/Services/RabbitMQConnection.cs index ac83fe5..3c5a361 100644 --- a/ClassTranscribeDatabase/Services/RabbitMQConnection.cs +++ b/ClassTranscribeDatabase/Services/RabbitMQConnection.cs @@ -227,7 +227,8 @@ public void PurgeQueue(String queueName) catch (Exception e) { Logger.LogError(e, "Error purging queue {0}", queueName); - throw e; + throw ; // throw implicitly to preserve stack trace + // see https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2200 } } diff --git a/ClassTranscribeDatabase/Services/Slack.cs b/ClassTranscribeDatabase/Services/Slack.cs index 94e6bee..406cf7d 100644 --- a/ClassTranscribeDatabase/Services/Slack.cs +++ b/ClassTranscribeDatabase/Services/Slack.cs @@ -16,7 +16,9 @@ public class SlackLogger private readonly Encoding _encoding = new UTF8Encoding(); private readonly ILogger _logger; AppSettings _appSettings; - + private static string truncate(string s, int maxLength=120) { + return s == null ? "" : s.Length < maxLength -3 ? s : s.Substring(0, maxLength-3) + "..."; + } public SlackLogger(IOptions appSettings, ILogger logger) { @@ -25,7 +27,7 @@ public SlackLogger(IOptions appSettings, ILogger logge // ignore string url = _appSettings.SLACK_WEBHOOK_URL?.Trim()??""; - if (url.Length > 0 && !url.Contains("")) + if (url.Length != 0 && url != "" && url != "changeme") { _uri = new Uri(url); } @@ -97,7 +99,7 @@ public async Task PostMessageAsync(Payload payload) catch(Exception ex) { - Console.WriteLine($"EXCEPTION SENDING SLACK MESSAGE TO '{ _uri.OriginalString }' : {ex.GetType().ToString()} : {ex.Message}"); + Console.WriteLine($"EXCEPTION SENDING SLACK MESSAGE TO '{ _uri.OriginalString }' : {ex.GetType().ToString()} : {truncate(ex.Message)}"); } } } diff --git a/PythonRpcServer/kaltura.py b/PythonRpcServer/kaltura.py index 3560fdf..41528b9 100644 --- a/PythonRpcServer/kaltura.py +++ b/PythonRpcServer/kaltura.py @@ -62,7 +62,18 @@ class KalturaProvider(MediaProvider): def __init__(self): self.client, self.ks = self.getClient( KALTURA_PARTNER_ID, KALTURA_TOKEN_ID, KATLURA_APP_TOKEN) - + def sanitize(self, s): + result = str(s) + if KALTURA_TOKEN_ID: + result = result.replace(KALTURA_TOKEN_ID,"*TOKEN*") + if KATLURA_APP_TOKEN: + result = result.replace(KATLURA_APP_TOKEN,"*APP*") + return result + + def truncate(self, s,maxLength=120): + if len(s) > maxLength-3: + return s[0:maxLength-3] + '...' + return s # Returns the Kaltura SDK client. Only used internally by constructor def getClient(self, partnerId, tokenId, appToken): config = KalturaConfiguration(partnerId) @@ -262,7 +273,7 @@ def getPlaylistItems(self, request): raise InvalidPlaylistInfoException( "Error during Channel/Playlist processing " + str(e)) end_time = perf_counter() - print(f"getPlaylistItems({request}) returning '{result}'. Processing ({end_time-start_time:.2f}) seconds.") + print(f"getPlaylistItems({request}) returning '{self.truncate(self.sanitize(result))}'. Processing ({end_time-start_time:.2f}) seconds.") return result # Main entry point - overrides stub in MediaProvider super class @@ -275,7 +286,7 @@ def getMedia(self, request): result = self.downloadLecture(videoUrl) end_time = perf_counter() - print(f"getMedia({request}) returning '{result}'. Processing ({end_time-start_time:.2f}) seconds.") + print(f"getMedia({request}) returning '{self.truncate(self.sanitize(result))}'. Processing ({end_time-start_time:.2f}) seconds.") return result except Exception as e: diff --git a/PythonRpcServer/utils.py b/PythonRpcServer/utils.py index 987a473..c9aee93 100644 --- a/PythonRpcServer/utils.py +++ b/PythonRpcServer/utils.py @@ -49,11 +49,12 @@ def getRandomString(n): # See random.seed - in this file -def getTmpFile(): +def getTmpFile(subdir="pythonrpc"): + os.mkdir(os.path.join(DATA_DIRECTORY, subdir), exist_ok=True) while True: # A key space of 34^12 should be sufficient for us... filenameSize = 12 - candidate = os.path.join(DATA_DIRECTORY, getRandomString(filenameSize)) + candidate = os.path.join(DATA_DIRECTORY, subdir, "tmp_"+getRandomString(filenameSize)) if not os.path.exists(candidate): return candidate # We wil never print this, and if we do, no-one will read it. diff --git a/TaskEngine/Tasks/DownloadMediaTask.cs b/TaskEngine/Tasks/DownloadMediaTask.cs index c25b352..9e6c37a 100644 --- a/TaskEngine/Tasks/DownloadMediaTask.cs +++ b/TaskEngine/Tasks/DownloadMediaTask.cs @@ -1,14 +1,17 @@ -using ClassTranscribeDatabase; -using ClassTranscribeDatabase.Models; -using ClassTranscribeDatabase.Services; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging; -using System; +using System; +using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; using System.Linq.Expressions; using System.Threading.Tasks; + +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +using ClassTranscribeDatabase; +using ClassTranscribeDatabase.Models; +using ClassTranscribeDatabase.Services; using static ClassTranscribeDatabase.CommonUtils; // #pragma warning disable CA2007 @@ -43,18 +46,15 @@ public DownloadMediaTask(RabbitMQConnection rabbitMQ, RpcClient rpcClient, protected override async Task OnConsume(string mediaId, TaskParameters taskParameters, ClientActiveTasks cleanup) { - RegisterTask(cleanup,mediaId); // may throw AlreadyInProgress exception + RegisterTask(cleanup, mediaId); // may throw AlreadyInProgress exception - Media media; - string subdir; - using (var _context = CTDbContext.CreateDbContext()) + (Media media, string subdir) = await prepMediaForDownload(mediaId); + if (media == null) { - media = await _context.Medias.Where(m => m.Id == mediaId) - .Include(m => m.Playlist).FirstAsync(); - GetLogger().LogInformation($"Downloading media id=({media.Id}), UniqueMediaIdentifier={media.UniqueMediaIdentifier}"); - subdir = ToCourseOfferingSubDirectory(_context, media.Playlist); // e.g. "/data/2203-abcd" + return; } - Video video = new Video(); + + Video? video = null; switch (media.SourceType) { case SourceType.Echo360: video = await DownloadEchoVideo(subdir, media); break; @@ -70,103 +70,179 @@ protected override async Task OnConsume(string mediaId, TaskParameters taskParam throw new Exception($"DownloadMediaTask failed for mediaId ({media.Id})"); } + var processNewVideo = await updateMediaWithVideo(mediaId, video); + if (processNewVideo) + { + _sceneDetectionTask.Publish(video.Id); + //_processVideoTask.Publish(video.Id); //TODO - re- add this code + } + } + + /* Print some useful log messages, check if media, playlist and courseoffing exist and determine the subdir for new files for this courseoffering + Optionally updates the media options from the playlist if they have not been create yet + */ + async Task<(Media, string)> prepMediaForDownload(string mediaId) + { + if (string.IsNullOrEmpty(mediaId)) + { + GetLogger().LogInformation($"Download Media : mediaId is null or empty - skipping download"); + return (null, null); + } + using (var _context = CTDbContext.CreateDbContext()) + { + var media = await _context.Medias.Include(m => m.Playlist).ThenInclude(p => p.Offering).Where(m => m.Id == mediaId).FirstOrDefaultAsync(); + var offeringName = media?.Playlist?.Offering?.CourseName ?? "no-offering"; + var playlistName = media?.Playlist?.Name ?? "no-playlist"; + GetLogger().LogInformation($"Download for media id=({mediaId}), (#{media?.Index}) of {offeringName}/ ({media?.Playlist?.Id}:{playlistName }). UniqueMediaIdentifier={media?.UniqueMediaIdentifier}"); + + if (mediaId == null || media.Playlist == null || media.Playlist.Offering == null) + { + GetLogger().LogInformation($"Media ({mediaId}): Media or Playlist or CourseOffering is null - perhaps it was deleted. Skipping download"); + return (null, null); + } + + // Clone media options (e.g. switch video streams) if needed + if (string.IsNullOrEmpty(media.Options) && !string.IsNullOrEmpty(media.Playlist.Options)) + { + GetLogger().LogInformation($"Media ({media.Id}): Setting options based on playlist options ({media.Playlist.Options})"); + media.Options = media.Playlist.Options; + _context.SaveChanges(); + } + var subdir = ToCourseOfferingSubDirectory(_context, media.Playlist.Offering); // e.g. "/data/2203-abcd" + return (media, subdir); + } + } + protected async Task updateMediaWithVideo(string mediaId, Video newVideo) + { + // Sanity check + if(mediaId == null || newVideo == null) + { + GetLogger().LogInformation($"Media ({mediaId}): mediaId or newVideo is null!"); + return false; + } + // We get the media again because downloading is very slow and perhaps the database has changed + using (var _context = CTDbContext.CreateDbContext()) { - var latestMedia = await _context.Medias - .Include(m=>m.Video).ThenInclude(v=>v.Video2) - .Include(m=>m.Video).ThenInclude(v=>v.Video1) - .FirstOrDefaultAsync(m => m.Id==media.Id); // Find does not support Include - if(latestMedia == null) { // should never happen... - GetLogger().LogInformation($"Media ({media.Id}): latestMedia == null !?"); - return; + var media = await _context.Medias + .Include(m => m.Video).ThenInclude(v => v.Video2) + .Include(m => m.Video).ThenInclude(v => v.Video1) + .FirstOrDefaultAsync(m => m.Id == mediaId); // Find does not support Include + if (media == null) + { // should never happen... but if it does, clean up our newly downloaded video files + GetLogger().LogInformation($"Media ({mediaId}): media == null !? (deleting newly downloaded items)"); + await newVideo.DeleteVideoAsync(_context); + return false; } - GetLogger().LogInformation($"Media ({media.Id}): latestMedia.Video == null is {latestMedia.Video == null}"); + GetLogger().LogInformation($"Media ({mediaId}): media.Video == null is {media.Video == null}"); // Don't add video if there are already videos for the given media. - if (latestMedia.Video == null) + if(newVideo.Id != null) { + GetLogger().LogError($"Media ({mediaId}): Huh? newVideo should not have an Id yet - that's my job!"); + } + if (media.Video != null) + { + GetLogger().LogInformation($"Media ({mediaId}): Surprise - media already has video set (race condition?)- no further processing required.Discarding new files"); + await newVideo.DeleteVideoAsync(_context); + return false; + } + // Time to find out what we have in the database + // Important idea: the newVideo and its filerecords are not yet part of the database. + var matchingFiles = await _context.FileRecords.Where(f => f.Hash == newVideo.Video1.Hash).ToListAsync(); + var matchedFile = matchingFiles.FirstOrDefault(); // Expect 0 or 1 + + var existingPrimaryVideos = matchedFile!= null ? await _context.Videos.Where(v => v.Video1Id == matchedFile.Id).ToListAsync() : null; + var existingPrimaryVideo = existingPrimaryVideos?.FirstOrDefault(); // If non null we expect 0 or 1 + + GetLogger().LogInformation($"Media ({mediaId}): {matchingFiles.Count} FileRecord hash match found"); + GetLogger().LogInformation($"Media ({mediaId}): {existingPrimaryVideos?.Count ?? 0} existing Videos found"); + + // cherrypick case (see comment below) + if (existingPrimaryVideo != null) { - // Check if Video already exists, if yes link it with this media item. - var file = _context.FileRecords.Where(f => f.Hash == video.Video1.Hash).ToList(); - if (!file.Any()) + GetLogger().LogInformation($"Media ({mediaId}): FileRecord and existing Video! deleting newly downloaded video"); + + media.VideoId = existingPrimaryVideo.Id; + + // We now take any useful supplementary files from the newly downladed video and add them to the existing video + // Then delete the new video (which has now been cherrypicked for all of its valuable stuff + if (newVideo.Video2 != null && existingPrimaryVideo.Video2 == null) { - GetLogger().LogInformation($"Media ({media.Id}): FileRecord with matching hash NOT found"); - // Create new video Record - await _context.Videos.AddAsync(video); - await _context.SaveChangesAsync(); - latestMedia.VideoId = video.Id; - await _context.SaveChangesAsync(); - GetLogger().LogInformation($"Downloaded (new) video.Id={video.Id}" ); - _sceneDetectionTask.Publish(video.Id); - //_processVideoTask.Publish(video.Id); //TODO - re- add this code + var v2 = newVideo.Video2; + GetLogger().LogInformation($"Media ({mediaId}): Adding video2 ({v2.Id}) to video ({existingPrimaryVideo.Id})"); + await _context.FileRecords.AddAsync(v2); + await _context.SaveChangesAsync(); // now v3 has an Id, so we can use below + existingPrimaryVideo.Video2Id = v2.Id; + newVideo.Video2 = null; // stop DeleteVideo beiow from deleting the file of video2 that we just added to existingPrimaryVideos } - else + if (newVideo.ASLVideo != null && existingPrimaryVideo.ASLVideo == null) { - GetLogger().LogInformation($"Media ({media.Id}): FileRecord with matching hash found"); - var existingVideos = await _context.Videos.Where(v => v.Video1Id == file.First().Id).ToListAsync(); - // If file exists but video doesn't. - if (!existingVideos.Any()) - { - GetLogger().LogInformation($"Media ({media.Id}): FileRecord but no Video; deleting FileRecord. Creating Video entity"); + var v3 = newVideo.ASLVideo; + GetLogger().LogInformation($"Media ({mediaId}): Adding ASL ({v3.Id}) to video ({existingPrimaryVideo.Id})"); + await _context.FileRecords.AddAsync(v3); + await _context.SaveChangesAsync(); // now v3 has an Id, so we can use below + existingPrimaryVideo.ASLVideoId = v3.Id; + newVideo.ASLVideo = null; // stop DeleteVideo beiow from deleting the file of ASL that we just added to existingPrimaryVideo + } + await _context.SaveChangesAsync(); + GetLogger().LogInformation($"Media ({media.Id}): Existing Video found. (Deleting New) video.Id=({newVideo.Id})"); + + // Deleting downloaded video as it is a duplicate. Don't start scene detection + await newVideo.DeleteVideoAsync(_context); + await _context.SaveChangesAsync(); + return false; // no need to start scene detection etc + } - // Delete existing file Record - await file.First().DeleteFileRecordAsync(_context); + await _context.Videos.AddAsync(newVideo); + await _context.SaveChangesAsync(); // now video has an Id (finally!), so we can use it for this media - // Create new video Record - await _context.Videos.AddAsync(video); - await _context.SaveChangesAsync(); // now video has an Id + media.VideoId = newVideo.Id; + await _context.SaveChangesAsync(); + GetLogger().LogInformation($"Media ({media.Id}): Assigned (new) video.Id={newVideo.Id} - done (no hash-matching FileRecords found)"); - latestMedia.VideoId = video.Id; - await _context.SaveChangesAsync(); - GetLogger().LogInformation($"Media ({media.Id}):Downloaded (file existed) new video.Id={video.Id}"); - //_transcriptionTask.Publish(video.Id); - _sceneDetectionTask.Publish(video.Id); - //_processVideoTask.Publish(video.Id); //TODO - re- add this code - } - // If video and file both exist. - else - { - GetLogger().LogInformation($"Media ({media.Id}): FileRecord and existing Video found; deleting newly downloaded video"); - - var existingVideo = await _context.Videos.Where(v => v.Video1Id == file.First().Id).FirstAsync(); - latestMedia.VideoId = existingVideo.Id; - if( video.Video2 != null && existingVideo.Video2 == null) { - var v2 = video.Video2; - await _context.FileRecords.AddAsync(v2); - await _context.SaveChangesAsync(); // now v2 has an Id - // Special case; - // add video2 to existing video - GetLogger().LogInformation($"Adding video2 ({v2.Id}) to video ({existingVideo.Id})"); - - existingVideo.Video2Id = v2.Id; - video.Video2= null; // otherwise DeleteVideo will delete the file - } - await _context.SaveChangesAsync(); - GetLogger().LogInformation($"Media ({media.Id}): Existing Video found. (Deleting New) video.Id=" + video.Id); - - // Deleting downloaded video as it's duplicate. Don't start scene detection - await video.DeleteVideoAsync(_context); - } + // clean up orphaned FileRecords + string maybeUnwantedId = matchedFile?.Id ?? ""; + if(! string.IsNullOrEmpty(maybeUnwantedId)) + { + + var isOrphanedFileRecord = ! await _context.Videos.Where(v => v.Video1Id == maybeUnwantedId || v.Video2Id == maybeUnwantedId || v.ASLVideoId == maybeUnwantedId).AnyAsync(); + GetLogger().LogInformation($"Media ({media.Id}): fileRecordId= ({maybeUnwantedId}) isOrphanedFileRecord={isOrphanedFileRecord}"); + // Delete existing file Record - no videos care about it + if (isOrphanedFileRecord) + { + GetLogger().LogInformation($"Media ({media.Id}): Deleting unnecessary FileRecord ${maybeUnwantedId} - no video entries need it"); + // Is this a problem? An empty image/audio file filerecord could match an empty video (same hash) - which we then delete here + // Future Todo: limit deletes to just FileRecords created by this video process + // Future Todo II: It would be even better to occasionally run a task that finds all File orphans of all database fields and deletes them (or moves them to a "tobedeleted" folder) + + // await matchedFile.DeleteFileRecordAsync(_context); } } + return true; } + } + public async Task