-
Notifications
You must be signed in to change notification settings - Fork 0
/
BlobStorageGraphBuilder.cs
115 lines (103 loc) · 4.89 KB
/
BlobStorageGraphBuilder.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
using System;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Arcane.Framework.Services.Base;
using Arcane.Framework.Sources.BlobStorage;
using Arcane.Stream.BlobStorage.Exceptions;
using Arcane.Stream.BlobStorage.Extensions;
using Arcane.Stream.BlobStorage.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Snd.Sdk.Storage.Base;
using Snd.Sdk.Storage.Models.Base;
using Snd.Sdk.Storage.Models.BlobPath;
using Snd.Sdk.Tasks;
namespace Arcane.Stream.BlobStorage.GraphBuilder;
/// <summary>
/// Builds the stream graph for <see cref="BlobStorageStreamContext"/>: blob names emitted by a
/// <see cref="BlobStorageSource"/> are read from the source path, saved under the target path and then
/// passed to the sink returned by <c>context.GetSink</c>, which receives <see cref="RemoveSource"/> as its callback.
/// </summary>
public class BlobStorageGraphBuilder : IStreamGraphBuilder<BlobStorageStreamContext>
{
    private readonly IBlobStorageListService sourceBlobListStorageService;
    private readonly IBlobStorageWriter targetBlobStorageService;
    private readonly IBlobStorageWriter sourceBlobStorageWriter;
    private readonly IBlobStorageReader sourceBlobStorageReader;
    private readonly ILogger<BlobStorageGraphBuilder> logger;

    /// <summary>
    /// Creates the graph builder from its storage and logging dependencies.
    /// </summary>
    /// <param name="sourceBlobStorageService">List service handed to <see cref="BlobStorageSource"/> when the graph is built.</param>
    /// <param name="sourceBlobStorageReader">Reader used to fetch blob content from the source path.</param>
    /// <param name="sourceBlobStorageWriter">Writer resolved with the <c>StorageType.SOURCE</c> key; used to remove blobs from the source path.</param>
    /// <param name="targetBlobStorageService">Writer resolved with the <c>StorageType.TARGET</c> key; used to save blobs under the target path.</param>
    /// <param name="logger">Logger for per-blob read, save and remove messages.</param>
    public BlobStorageGraphBuilder(
        IBlobStorageListService sourceBlobStorageService,
        IBlobStorageReader sourceBlobStorageReader,
        [FromKeyedServices(StorageType.SOURCE)] IBlobStorageWriter sourceBlobStorageWriter,
        [FromKeyedServices(StorageType.TARGET)] IBlobStorageWriter targetBlobStorageService,
        ILogger<BlobStorageGraphBuilder> logger)
    {
        this.logger = logger;
        this.sourceBlobListStorageService = sourceBlobStorageService;
        this.sourceBlobStorageReader = sourceBlobStorageReader;
        this.sourceBlobStorageWriter = sourceBlobStorageWriter;
        this.targetBlobStorageService = targetBlobStorageService;
    }

    /// <summary>
    /// Assembles the runnable graph: source, throttle, read stage, write stage, kill switch, sink.
    /// </summary>
    /// <param name="context">Stream context supplying the source/target paths, change capture interval,
    /// throttle settings and read/write parallelism.</param>
    /// <returns>A runnable graph materializing the kill switch together with the sink's task.</returns>
    /// <exception cref="ConfigurationException">
    /// Thrown when the source or the target path is rejected by <c>AmazonS3StoragePath.IsAmazonS3Path</c>.
    /// </exception>
    public IRunnableGraph<(UniqueKillSwitch, Task)> BuildGraph(BlobStorageStreamContext context)
    {
        // Both paths are validated before either one is parsed; the source is checked first.
        EnsureAmazonS3Path(context.SourcePath, "Source path is invalid, only Amazon S3 paths are supported");
        EnsureAmazonS3Path(context.TargetPath, "Target path is invalid, only Amazon S3 paths are supported");

        var sourceRoot = new AmazonS3StoragePath(context.SourcePath);
        var targetRoot = new AmazonS3StoragePath(context.TargetPath);

        var blobSource = BlobStorageSource.Create(
            context.SourcePath,
            this.sourceBlobListStorageService,
            context.ChangeCaptureInterval);
        var supervision = ActorAttributes.CreateSupervisionStrategy(DecideOnFailure);

        return Source.FromGraph(blobSource)
            .Throttle(
                context.ElementsPerSecond,
                TimeSpan.FromSeconds(1),
                context.RequestThrottleBurst,
                ThrottleMode.Shaping)
            .SelectAsync(context.ReadParallelism, blobName => this.GetBlobContentAsync(sourceRoot, blobName))
            .SelectAsync(context.WriteParallelism, readResult => this.SaveBlobContentAsync(targetRoot, readResult))
            .ViaMaterialized(KillSwitches.Single<(IStoragePath, string)>(), Keep.Right)
            .ToMaterialized(context.GetSink(this.RemoveSource), Keep.Both)
            .WithAttributes(supervision);
    }

    /// <summary>
    /// Throws a <see cref="ConfigurationException"/> carrying <paramref name="message"/> when
    /// <paramref name="path"/> is not recognised as an Amazon S3 path.
    /// </summary>
    private static void EnsureAmazonS3Path(string path, string message)
    {
        if (AmazonS3StoragePath.IsAmazonS3Path(path))
        {
            return;
        }

        throw new ConfigurationException(message);
    }

    /// <summary>
    /// Reads the content of a single blob located under <paramref name="rootPath"/>.
    /// </summary>
    /// <param name="rootPath">Root path the blob is read from.</param>
    /// <param name="blobPath">Blob name relative to <paramref name="rootPath"/>.</param>
    /// <returns>A task yielding the root path, the blob name and the blob content.</returns>
    /// <exception cref="ProcessingException">Raised through the returned task when the reader yields null content.</exception>
    private Task<(IStoragePath, string, BinaryData)> GetBlobContentAsync(IStoragePath rootPath, string blobPath)
    {
        var fullBlobPath = rootPath.Join(blobPath).ToHdfsPath();
        this.logger.LogInformation("Reading blob content from {BlobPath}", fullBlobPath);

        var pendingContent = this.sourceBlobStorageReader
            .GetBlobContentAsync(rootPath.ToHdfsPath(), blobPath, data => data);

        // Null content is turned into a ProcessingException, which DecideOnFailure maps to Directive.Resume.
        return pendingContent.Map(content => content == null
            ? throw new ProcessingException(rootPath, blobPath)
            : (rootPath, blobPath, content));
    }

    /// <summary>
    /// Saves the content carried by <paramref name="writeRequest"/> under <paramref name="targetPath"/>,
    /// overwriting an existing blob with the same name.
    /// </summary>
    /// <param name="targetPath">Root path the blob is written to.</param>
    /// <param name="writeRequest">Source root path, blob name and blob content produced by the read stage.</param>
    /// <returns>A task yielding the source root path and the blob name once the write has completed.</returns>
    private Task<(IStoragePath, string)> SaveBlobContentAsync(IStoragePath targetPath, (IStoragePath, string, BinaryData) writeRequest)
    {
        var (originRoot, blobName, content) = writeRequest;

        var destination = targetPath.Join(blobName).ToHdfsPath();
        this.logger.LogInformation("Saving blob content to {BlobPath}", destination);

        var pendingWrite = this.targetBlobStorageService
            .SaveBytesAsBlob(content, targetPath.ToHdfsPath(), blobName, overwrite: true);

        // The write result is discarded; downstream only needs to know which source blob was handled.
        return pendingWrite.Map(_ => (originRoot, blobName));
    }

    /// <summary>
    /// Removes a blob from the source storage; passed to <c>context.GetSink</c> as the sink callback.
    /// </summary>
    /// <param name="deleteRequest">Source root path and blob name of the blob to remove.</param>
    /// <exception cref="SinkException">Thrown when the writer reports that the blob was not removed.</exception>
    private async Task RemoveSource((IStoragePath, string) deleteRequest)
    {
        var (root, blobName) = deleteRequest;
        this.logger.LogInformation("Removing blob content from {BlobPath}", root.Join(blobName).ToHdfsPath());

        var removed = await this.sourceBlobStorageWriter.RemoveBlob(root.ToHdfsPath(), blobName);
        if (!removed)
        {
            throw new SinkException($"Failed to remove blob {blobName} from {root}");
        }
    }

    /// <summary>
    /// Supervision decider for the graph: <see cref="ProcessingException"/> maps to
    /// <see cref="Directive.Resume"/>, every other failure maps to <see cref="Directive.Stop"/>.
    /// </summary>
    private static Directive DecideOnFailure(Exception ex) =>
        ex is ProcessingException ? Directive.Resume : Directive.Stop;
}