Skip to content

Commit

Permalink
(#102) ContentProxy: finally, make it compile
Browse files Browse the repository at this point in the history
  • Loading branch information
ForNeVeR committed Aug 20, 2022
1 parent a762d65 commit 52e8715
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 62 deletions.
58 changes: 29 additions & 29 deletions Emulsion.ContentProxy/FileCache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ type DownloadRequest = {
Uri: Uri
CacheKey: string
Size: uint64
ReplyChannel: AsyncReplyChannel<Stream option>
}

// TODO: Total cache limit
// TODO: Threading
type FileCache(logger: ILogger,
settings: FileCacheSettings,
sha256: SHA256) =
Expand All @@ -32,7 +30,6 @@ type FileCache(logger: ILogger,

let getFromCache(cacheKey: string) = async {
let path = getFilePath cacheKey
do! Async.SwitchToThreadPool()
return
if File.Exists path then
Some(new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Delete))
Expand All @@ -53,46 +50,49 @@ type FileCache(logger: ILogger,
}

let downloadIntoCacheAndGet uri cacheKey: Async<Stream> = async {
let! ct = Async.CancellationToken
let! stream = download uri
let path = getFilePath cacheKey
logger.Information("Saving {Uri} to path {Path}…", uri, path)

do! Async.SwitchToThreadPool()
use cachedFile = new FileStream(path, FileMode.Open, FileAccess.Write, FileShare.None)
do! Async.AwaitTask(stream.CopyToAsync cachedFile)

match! getFromCache cacheKey with
| Some
do! Async.AwaitTask(stream.CopyToAsync(cachedFile, ct))

let! file = getFromCache cacheKey
return upcast Option.get file
}

let cancellation = new CancellationTokenSource()
let processRequest request = async {
logger.Information("Cache lookup for content {Uri} (cache key {CacheKey})", uri, cacheKey)
match! getFromCache cacheKey with
let processRequest request: Async<Stream option> = async {
logger.Information("Cache lookup for content {Uri} (cache key {CacheKey})", request.Uri, request.CacheKey)
match! getFromCache request.CacheKey with
| Some content ->
logger.Information("Cache hit for content {Uri} (cache key {CacheKey})", uri, cacheKey)
logger.Information("Cache hit for content {Uri} (cache key {CacheKey})", request.Uri, request.CacheKey)
return Some content
| None ->
logger.Information("No cache hit for content {Uri} (cache key {CacheKey}), will download", uri, cacheKey)
let! shouldCache = ensureFreeCache size
logger.Information("No cache hit for content {Uri} (cache key {CacheKey}), will download", request.Uri, request.CacheKey)
let! shouldCache = ensureFreeCache request.Size
if shouldCache then
logger.Information("Resource {Uri} (cache key {CacheKey}, {Size} bytes) will fit into cache, caching", uri, cacheKey, size)
let! result = downloadIntoCacheAndGet uri cacheKey
logger.Information("Resource {Uri} (cache key {CacheKey}, {Size} bytes) downloaded", uri, cacheKey, size)
logger.Information("Resource {Uri} (cache key {CacheKey}, {Size} bytes) will fit into cache, caching", request.Uri, request.CacheKey, request.Size)
let! result = downloadIntoCacheAndGet request.Uri request.CacheKey
logger.Information("Resource {Uri} (cache key {CacheKey}, {Size} bytes) downloaded", request.Uri, request.CacheKey, request.Size)
return Some result
else
logger.Information("Resource {Uri} (cache key {CacheKey}) won't fit into cache, directly downloading", uri, cacheKey)
let! result = download uri
logger.Information("Resource {Uri} (cache key {CacheKey}) won't fit into cache, directly downloading", request.Uri, request.CacheKey)
let! result = download request.Uri
return Some result
}
let rec processLoop (processor: MailboxProcessor<_>) = async {

let rec processLoop(processor: MailboxProcessor<_ * AsyncReplyChannel<_>>) = async {
while true do
let! request, replyChannel = processor.Receive()
try
let! message = processor.Re()
do! processRequest message processor
let! result = processRequest request
replyChannel.Reply result
with
| ex -> logger.Error(ex, "Exception while processing the file download queue")
| ex ->
logger.Error(ex, "Exception while processing the file download queue")
replyChannel.Reply None
}
let processor = MailboxProcessor.Start(processLoop, cancellation.Token)

Expand All @@ -101,9 +101,9 @@ type FileCache(logger: ILogger,
cancellation.Dispose()
(processor :> IDisposable).Dispose()

member _.DownloadLink(uri: Uri, cacheKey: string, size: uint64): Async<Stream option> = processor.PostAndReply(fun chan -> {
Uri = uri
CacheKey = cacheKey
Size = size
ReplyChannel = chan
})
member _.Download(uri: Uri, cacheKey: string, size: uint64): Async<Stream option> =
processor.PostAndAsyncReply(fun chan -> ({
Uri = uri
CacheKey = cacheKey
Size = size
}, chan))
27 changes: 16 additions & 11 deletions Emulsion.Telegram/Client.fs
Original file line number Diff line number Diff line change
@@ -1,39 +1,44 @@
namespace Emulsion.Telegram

open System
open System.IO
open System.Threading

open Emulsion.ContentProxy
open Emulsion.Database
open Emulsion.Messaging.MessageSystem
open Emulsion.Settings

type FileInfo = {
TemporaryLink: Uri
Size: uint64
}

type ITelegramClient =
abstract GetTemporaryFileLink: fileId: string -> Async<Stream option>
abstract GetFileInfo: fileId: string -> Async<FileInfo option>

type Client(ctx: ServiceContext,
cancellationToken: CancellationToken,
telegramSettings: TelegramSettings,
databaseSettings: DatabaseSettings option,
hostingSettings: HostingSettings option,
fileCache: FileCache option) =
hostingSettings: HostingSettings option) =
inherit MessageSystemBase(ctx, cancellationToken)

let botConfig = { Funogram.Telegram.Bot.Config.defaultConfig with Token = telegramSettings.Token }

interface ITelegramClient with
member this.GetTemporaryFileLink(fileId) = async {
member this.GetFileInfo(fileId) = async {
let logger = ctx.Logger
logger.Information("Querying file information for file {FileId}", fileId)
let! file = Funogram.sendGetFile botConfig fileId
match file.FilePath with
| None ->
match file.FilePath, file.FileSize with
| None, None ->
logger.Warning("File {FileId} was not found on server", fileId)
return None
| Some fp ->
let uri = Uri $"https://api.telegram.org/file/bot{telegramSettings.Token}/{fp}"
return! fileCache.DownloadLink uri
| Some fp, Some sz ->
return Some {
TemporaryLink = Uri $"https://api.telegram.org/file/bot{telegramSettings.Token}/{fp}"
Size = Checked.uint64 sz
}
| x, y -> return failwith $"Unknown data received from Telegram server: {x}, {y}"
}

override _.RunUntilError receiver = async {
Expand Down
1 change: 1 addition & 0 deletions Emulsion.Tests/SettingsTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ let private testConfiguration = {
}
Database = None
Hosting = None
FileCache = None
}

let private mockConfiguration groupIdLiteral extendedJson =
Expand Down
9 changes: 4 additions & 5 deletions Emulsion.Tests/TestUtils/TelegramClientMock.fs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
namespace Emulsion.Tests.TestUtils

open System
open System.Collections.Generic

open Emulsion.Telegram

type TelegramClientMock() =
let responses = Dictionary<string, Uri>()
let responses = Dictionary<string, FileInfo option>()

interface ITelegramClient with
member this.GetTemporaryFileLink fileId = async.Return responses[fileId]
member this.GetFileInfo fileId = async.Return responses[fileId]

member _.SetResponse(fileId: string, uri: Uri): unit =
responses[fileId] <- uri
member _.SetResponse(fileId: string, fileInfo: FileInfo option): unit =
responses[fileId] <- fileInfo
9 changes: 7 additions & 2 deletions Emulsion.Tests/Web/ContentControllerTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ open Emulsion.ContentProxy
open Emulsion.Database
open Emulsion.Database.Entities
open Emulsion.Settings
open Emulsion.Telegram
open Emulsion.Tests.TestUtils
open Emulsion.Tests.TestUtils.Logging
open Emulsion.Web
Expand All @@ -35,7 +36,7 @@ type ContentControllerTests(output: ITestOutputHelper) =
use loggerFactory = new SerilogLoggerFactory(logger)
let logger = loggerFactory.CreateLogger<ContentController>()
use context = new EmulsionDbContext(databaseSettings.ContextOptions)
let controller = ContentController(logger, hostingSettings, telegramClient, context)
let controller = ContentController(logger, hostingSettings, telegramClient, None, context)
return! testAction controller
})
})
Expand Down Expand Up @@ -66,7 +67,11 @@ type ContentControllerTests(output: ITestOutputHelper) =
let fileId = "foobar"

let testLink = Uri "https://example.com/myFile"
telegramClient.SetResponse(fileId, testLink)
let testFileInfo = {
TemporaryLink = testLink
Size = 1UL
}
telegramClient.SetResponse(fileId, Some testFileInfo)

performTestWithPreparation (fun databaseOptions -> async {
use context = new EmulsionDbContext(databaseOptions.ContextOptions)
Expand Down
42 changes: 27 additions & 15 deletions Emulsion.Web/ContentController.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@

open System.Threading.Tasks

open Emulsion.Database.Entities
open Emulsion.Telegram
open Microsoft.AspNetCore.Mvc
open Microsoft.Extensions.Logging

open Emulsion.ContentProxy
open Emulsion.Database
open Emulsion.Database.Entities
open Emulsion.Settings
open Emulsion.Telegram

[<ApiController>]
[<Route("content")>]
type ContentController(logger: ILogger<ContentController>,
configuration: HostingSettings,
telegram: ITelegramClient,
fileCache: FileCache option,
context: EmulsionDbContext) =
inherit ControllerBase()

Expand All @@ -27,21 +28,32 @@ type ContentController(logger: ILogger<ContentController>,
logger.LogWarning(ex, "Error during hashId deserializing")
None

let produceRedirect contentId: Async<IActionResult option> = async {
let! content = ContentStorage.getById context contentId
match content with
| Some content ->
let! url = telegram.GetTemporaryFileLink content.FileId
return Some <| RedirectResult(url.ToString())
| None -> return None
}

[<HttpGet("{hashId}")>]
member this.Get(hashId: string): Task<IActionResult> = task {
match decodeHashId hashId with
| None -> return this.BadRequest()
| None ->
logger.LogWarning $"Cannot decode hash id: \"{hashId}\"."
return this.BadRequest()
| Some contentId ->
match! produceRedirect contentId with
| None -> return this.NotFound() :> IActionResult
| Some redirect -> return redirect
match! ContentStorage.getById context contentId with
| None ->
logger.LogWarning $"Content \"{contentId}\" not found in content storage."
return this.NotFound() :> IActionResult
| Some content ->
match fileCache with
| None ->
let link = $"https://t.me/{content.ChatUserName}/{string content.MessageId}"
return RedirectResult link
| Some cache ->
match! telegram.GetFileInfo content.FileId with
| None ->
logger.LogWarning $"File \"{content.FileId}\" could not be found on Telegram server."
return this.NotFound() :> IActionResult
| Some fileInfo ->
match! cache.Download(fileInfo.TemporaryLink, content.FileId, fileInfo.Size) with
| None ->
logger.LogWarning $"Link \"{fileInfo}\" could not be downloaded."
return this.NotFound() :> IActionResult
| Some stream ->
return FileStreamResult(stream, "application/octet-stream")
}

0 comments on commit 52e8715

Please sign in to comment.