-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathProgram.cs
127 lines (117 loc) · 4.89 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Serilog;
using Serilog.Events;
//using Serilog.Formatting.Compact;
using Serilog.Formatting.Json;
using Microsoft.Extensions.DependencyInjection;
using Amazon.SQS;
using Amazon.SQS.Model;
namespace TodoApi
{
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IAmazonSQS _sqs;
private readonly IConfiguration _configuration;
public Worker(ILogger<Worker> logger, IAmazonSQS sqs, IConfiguration configuration)
{
_logger = logger;
_sqs = sqs;
_configuration = configuration;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
_logger.LogDebug("Long-polling SQS messages...");
try
{
var request = new ReceiveMessageRequest()
{
QueueUrl = _configuration.GetSection("queues")["main"],
WaitTimeSeconds = 20
};
var result = await _sqs.ReceiveMessageAsync(request, stoppingToken);
if (result.Messages.Any())
{
foreach (var message in result.Messages)
{
_logger.LogInformation("SQS message received: {msg}", message.Body);
await _sqs.DeleteMessageAsync(new DeleteMessageRequest() {
QueueUrl = request.QueueUrl,
ReceiptHandle = message.ReceiptHandle
}, stoppingToken);
}
}
}
catch (System.UriFormatException e)
{
throw e; //aborts app with ... "Level":"Fatal","MessageTemplate":"Application start-up failed","Exception":"System.UriFormatException: Invalid URI: ...
}
catch (System.Threading.Tasks.TaskCanceledException e)
{
_logger.LogInformation("SQS poller canceled: {exception}", e);
}
catch (Exception e) //FIXME this is ok during shutdown: System.Threading.Tasks.TaskCanceledException: The operation was canceled.\n ---> System.IO.IOException: Unable to read data from the transport connection: Operation canceled.\n ---> System.Net.Sockets.SocketException (125): Operation canceled
{
_logger.LogError("SQS polling- or message processing issue (re-trying in 10s): {exception}", e); //FIXME no blind retry in prod ;-)
await Task.Delay(10000, stoppingToken);
}
}
}
}
public class Program
{
public static void Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.Enrich.FromLogContext()
.MinimumLevel.Override("Microsoft.AspNetCore", LogEventLevel.Warning)
.WriteTo.Console(new JsonFormatter()) //.WriteTo.Console(new RenderedCompactJsonFormatter())
.CreateLogger();
try
{
Log.Information("Starting up");
CreateHostBuilder(args).Build().Run();
}
catch (Exception ex)
{
Log.Fatal(ex, "Application start-up failed");
}
finally
{
Log.CloseAndFlush();
}
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseSerilog()
.ConfigureServices(services =>
{
services.AddAWSService<IAmazonSQS>();
services.AddHostedService<Worker>();
/* services.AddCors(options =>
{
options.AddDefaultPolicy(builder =>
{
builder.WithOrigins("https://web.dev.zuehlke.p.iraten.ch") //FIXME hard-coded
.AllowAnyHeader()
.AllowAnyMethod()
.AllowCredentials();
});
}); */
})
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}