-
Notifications
You must be signed in to change notification settings - Fork 0
/
RestApiSourceExtensions.cs
56 lines (51 loc) · 1.99 KB
/
RestApiSourceExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;
using Arcane.Framework.Contracts;
using Arcane.Framework.Services.Base;
using Arcane.Framework.Sinks.Json;
using Arcane.Framework.Sources.RestApi;
using Parquet.Data;
using Snd.Sdk.Helpers;
using Snd.Sdk.Metrics.Base;
using Snd.Sdk.Storage.Base;
namespace Arcane.Stream.RestApi.Extensions;
/// <summary>
/// Extension methods that wire a <see cref="RestApiSource"/> into a runnable Akka.NET stream
/// ending in a multiline JSON sink.
/// </summary>
public static class RestApiSourceExtensions
{
    /// <summary>
    /// Builds (but does not run) the stream graph for a REST API source:
    /// source → time/size batching → row metrics → logging → kill switch → multiline JSON sink.
    /// </summary>
    /// <param name="source">The REST API source stage that emits <see cref="JsonElement"/> rows.</param>
    /// <param name="metricsService">Receives a <c>DeclaredMetrics.ROWS_INCOMING</c> increment for every emitted batch.</param>
    /// <param name="blobStorageWriter">Writer handed to the JSON sink.</param>
    /// <param name="sinkLocation">Base location; the stream id is appended as <c>{sinkLocation}/{StreamId}</c>.</param>
    /// <param name="context">Stream context supplying the stream id, stream kind (used as the log name) and backfill flag.</param>
    /// <param name="rowsPerGroup">Maximum number of rows per batch.</param>
    /// <param name="groupingInterval">Maximum time to wait while collecting a batch.</param>
    /// <returns>
    /// A runnable graph whose materialized value is the kill switch paired with the sink's
    /// materialized <see cref="Task"/> (presumably the sink's completion — confirm against MultilineJsonSink).
    /// </returns>
    public static IRunnableGraph<(UniqueKillSwitch, Task)> BuildGraph(this RestApiSource source,
        MetricsService metricsService,
        IBlobStorageWriter blobStorageWriter,
        string sinkLocation,
        IStreamContext context,
        int rowsPerGroup,
        TimeSpan groupingInterval)
    {
        // Metric dimensions are derived once, up front, and reused for every batch.
        var metricTags = source.GetDefaultTags().GetAsDictionary(context, context.StreamId);
        var sink = context.MultilineJsonSinkFromContext(source.GetParquetSchema(), blobStorageWriter, sinkLocation);

        // Materializes a batch into a list and reports its size before passing it downstream.
        List<JsonElement> CountIncoming(IEnumerable<JsonElement> batch)
        {
            var materialized = batch.ToList();
            metricsService.Increment(DeclaredMetrics.ROWS_INCOMING, metricTags, materialized.Count);
            return materialized;
        }

        // A batch is emitted when either rowsPerGroup rows arrive or groupingInterval elapses.
        var batched = Source.FromGraph(source).GroupedWithin(rowsPerGroup, groupingInterval);

        return batched
            .Select(CountIncoming)
            .Log(context.StreamKind)
            // Keep.Right keeps only the kill switch; Keep.Both then pairs it with the sink's value.
            .ViaMaterialized(KillSwitches.Single<List<JsonElement>>(), Keep.Right)
            .ToMaterialized(sink, Keep.Both);
    }

    /// <summary>
    /// Creates a <see cref="MultilineJsonSink"/> for the given stream context, writing under
    /// <c>{sinkLocation}/{StreamId}</c>.
    /// </summary>
    /// <param name="streamContext">Provides the stream id and the backfill flag.</param>
    /// <param name="schema">Schema forwarded to the sink.</param>
    /// <param name="blobStorageWriter">Writer forwarded to the sink.</param>
    /// <param name="sinkLocation">Base location the stream id is appended to.</param>
    /// <returns>
    /// The sink. While backfilling it is created with <c>"backfill"</c> (otherwise <c>"data"</c>;
    /// presumably a sub-location name — confirm against MultilineJsonSink.Create) and with
    /// <c>dropCompletionToken</c> set to true.
    /// </returns>
    public static MultilineJsonSink MultilineJsonSinkFromContext(this IStreamContext streamContext, Schema schema, IBlobStorageWriter blobStorageWriter, string sinkLocation) =>
        MultilineJsonSink.Create(
            blobStorageWriter,
            $"{sinkLocation}/{streamContext.StreamId}",
            schema,
            streamContext.IsBackfilling ? "backfill" : "data",
            dropCompletionToken: streamContext.IsBackfilling);
}