Skip to content

Commit

Permalink
1.0.8: EdgeAgent: Get status of logs upload request (#1392)
Browse files Browse the repository at this point in the history
* Cherry pick 4e8e005

* Update TwinConfigSourceModule.cs
  • Loading branch information
varunpuranik authored Jun 28, 2019
1 parent 49d8655 commit e7876eb
Show file tree
Hide file tree
Showing 12 changed files with 560 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public struct AgentEventIds
public const int KubernetesOperator = EventIdStart + 2400;
public const int LogsUploadRequestHandler = EventIdStart + 2500;
public const int ModuleClientProvider = EventIdStart + 2600;
public const int TaskStatusRequestHandler = EventIdStart + 2800;
const int EventIdStart = 100000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

public class LogsUploadRequestHandler : RequestHandlerBase<LogsUploadRequest, object>
public class LogsUploadRequestHandler : RequestHandlerBase<LogsUploadRequest, TaskStatusResponse>
{
static readonly Version ExpectedSchemaVersion = new Version("1.0");

Expand All @@ -29,7 +29,7 @@ public LogsUploadRequestHandler(ILogsUploader logsUploader, ILogsProvider logsPr

public override string RequestName => "UploadLogs";

protected override async Task<Option<object>> HandleRequestInternal(Option<LogsUploadRequest> payloadOption, CancellationToken cancellationToken)
protected override async Task<Option<TaskStatusResponse>> HandleRequestInternal(Option<LogsUploadRequest> payloadOption, CancellationToken cancellationToken)
{
LogsUploadRequest payload = payloadOption.Expect(() => new ArgumentException("Request payload not found"));
if (ExpectedSchemaVersion.CompareMajorVersion(payload.SchemaVersion, "logs upload request schema") != 0)
Expand All @@ -47,8 +47,22 @@ protected override async Task<Option<object>> HandleRequestInternal(Option<LogsU
false);
IList<(string id, ModuleLogOptions logOptions)> logOptionsList = await requestToOptionsMapper.MapToLogOptions(payload.Items, cancellationToken);
IEnumerable<Task> uploadLogsTasks = logOptionsList.Select(l => this.UploadLogs(payload.SasUrl, l.id, l.logOptions, cancellationToken));
await Task.WhenAll(uploadLogsTasks);
return Option.None<object>();
(string correlationId, BackgroundTaskStatus status) = BackgroundTask.Run(
() =>
{
try
{
return Task.WhenAll(uploadLogsTasks);
}
catch (Exception e)
{
Events.ErrorUploadingLogs(e);
throw;
}
},
"upload logs",
cancellationToken);
return Option.Some(TaskStatusResponse.Create(correlationId, status));
}

async Task UploadLogs(string sasUrl, string id, ModuleLogOptions moduleLogOptions, CancellationToken token)
Expand All @@ -63,6 +77,8 @@ async Task UploadLogs(string sasUrl, string id, ModuleLogOptions moduleLogOption
Func<ArraySegment<byte>, Task> uploaderCallback = await this.logsUploader.GetUploaderCallback(sasUrl, id, moduleLogOptions.ContentEncoding, moduleLogOptions.ContentType);
await this.logsProvider.GetLogsStream(id, moduleLogOptions, uploaderCallback, token);
}

Events.UploadLogsFinished(id);
}

static class Events
Expand All @@ -73,7 +89,9 @@ static class Events
enum EventIds
{
MismatchedMinorVersions = IdStart,
ProcessingRequest
ProcessingRequest,
UploadLogsFinished,
ErrorUploadingLogs
}

public static void MismatchedMinorVersions(string payloadSchemaVersion, Version expectedSchemaVersion)
Expand All @@ -85,6 +103,16 @@ public static void ProcessingRequest(LogsUploadRequest payload)
{
Log.LogInformation((int)EventIds.ProcessingRequest, $"Processing request to upload logs for {payload.ToJson()}");
}

public static void UploadLogsFinished(string id)
{
Log.LogInformation((int)EventIds.UploadLogsFinished, $"Finished uploading logs for module {id}");
}

public static void ErrorUploadingLogs(Exception ex)
{
Log.LogInformation((int)EventIds.ErrorUploadingLogs, ex, "Error uploading logs");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
{
using Microsoft.Azure.Devices.Edge.Util;

public class TaskStatusRequest
{
public TaskStatusRequest(string schemaVersion, string correlationId)
{
this.SchemaVersion = Preconditions.CheckNonWhiteSpace(schemaVersion, nameof(schemaVersion));
this.CorrelationId = Preconditions.CheckNonWhiteSpace(correlationId, nameof(correlationId));
}

public string SchemaVersion { get; }

public string CorrelationId { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Storage;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

public class TaskStatusRequestHandler : RequestHandlerBase<TaskStatusRequest, TaskStatusResponse>
{
static readonly Version ExpectedSchemaVersion = new Version("1.0");

public override string RequestName => "GetTaskStatus";

protected override Task<Option<TaskStatusResponse>> HandleRequestInternal(Option<TaskStatusRequest> payloadOption, CancellationToken cancellationToken)
{
TaskStatusRequest payload = payloadOption.Expect(() => new ArgumentException("Request payload not found"));
if (ExpectedSchemaVersion.CompareMajorVersion(payload.SchemaVersion, "logs upload request schema") != 0)
{
Events.MismatchedMinorVersions(payload.SchemaVersion, ExpectedSchemaVersion);
}

BackgroundTaskStatus backgroundTaskStatus = BackgroundTask.GetStatus(payload.CorrelationId);
Events.ProcessingRequest(payload, backgroundTaskStatus);
return Task.FromResult(Option.Some(TaskStatusResponse.Create(payload.CorrelationId, backgroundTaskStatus)));
}

static class Events
{
const int IdStart = AgentEventIds.TaskStatusRequestHandler;
static readonly ILogger Log = Logger.Factory.CreateLogger<TaskStatusRequestHandler>();

enum EventIds
{
MismatchedMinorVersions = IdStart,
ProcessingRequest
}

public static void MismatchedMinorVersions(string payloadSchemaVersion, Version expectedSchemaVersion)
{
Log.LogWarning((int)EventIds.MismatchedMinorVersions, $"Logs upload request schema version {payloadSchemaVersion} does not match expected schema version {expectedSchemaVersion}. Some settings may not be supported.");
}

public static void ProcessingRequest(TaskStatusRequest payload, BackgroundTaskStatus backgroundTaskStatus)
{
Log.LogInformation((int)EventIds.ProcessingRequest, $"Handling status request for task {payload.CorrelationId} - {backgroundTaskStatus.ToJson()}");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
{
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;

public class TaskStatusResponse
{
public static TaskStatusResponse Create(string correlationId, BackgroundTaskStatus backgroundTaskStatus)
{
Preconditions.CheckNotNull(backgroundTaskStatus, nameof(backgroundTaskStatus));
string message = string.Empty;
if (backgroundTaskStatus.Status == BackgroundTaskRunStatus.Failed)
{
message = backgroundTaskStatus.Exception.Match(
e => $"Task {backgroundTaskStatus.Operation} failed because of error {e.Message}",
() => $"Task {backgroundTaskStatus.Operation} failed with no error");
}

return new TaskStatusResponse(correlationId, backgroundTaskStatus.Status, message);
}

public TaskStatusResponse(string correlationId, BackgroundTaskRunStatus status, string message)
{
this.CorrelationId = Preconditions.CheckNonWhiteSpace(correlationId, nameof(correlationId));
this.Status = status;
this.Message = Preconditions.CheckNotNull(message, nameof(message));
}

[JsonProperty("status")]
[JsonConverter(typeof(StringEnumConverter))]
public BackgroundTaskRunStatus Status { get; }

[JsonProperty("message")]
public string Message { get; }

[JsonProperty("correlationId")]
public string CorrelationId { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ protected override void Load(ContainerBuilder builder)
var requestHandlers = new List<IRequestHandler>
{
new PingRequestHandler(),
new LogsUploadRequestHandler(logsUploader, logsProvider, runtimeInfoProvider)
new LogsUploadRequestHandler(logsUploader, logsProvider, runtimeInfoProvider),
new TaskStatusRequestHandler()
};
return new RequestManager(requestHandlers, this.requestTimeout) as IRequestManager;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,14 @@ public async Task TestLogsUploadRequest(string payload, string id, string sasUrl
Option<string> response = await logsUploadRequestHandler.HandleRequest(Option.Maybe(payload), CancellationToken.None);

// Assert
Assert.False(response.HasValue);
Assert.True(response.HasValue);
var taskStatusResponse = response.OrDefault().FromJson<TaskStatusResponse>();
Assert.NotNull(taskStatusResponse);
Assert.NotEmpty(taskStatusResponse.CorrelationId);
Assert.Equal(string.Empty, taskStatusResponse.Message);

await WaitForBackgroundTaskCompletion(taskStatusResponse.CorrelationId).TimeoutAfter(TimeSpan.FromSeconds(5));

logsProvider.VerifyAll();
logsUploader.VerifyAll();
Mock.Get(runtimeInfoProvider).VerifyAll();
Expand Down Expand Up @@ -151,12 +158,33 @@ public async Task TestLogsUploadAllTaskRequest()
Option<string> response = await logsUploadRequestHandler.HandleRequest(Option.Maybe(payload), CancellationToken.None);

// Assert
Assert.False(response.HasValue);
Assert.True(response.HasValue);
var taskStatusResponse = response.OrDefault().FromJson<TaskStatusResponse>();
Assert.NotNull(taskStatusResponse);
Assert.NotEmpty(taskStatusResponse.CorrelationId);
Assert.Equal(string.Empty, taskStatusResponse.Message);

await WaitForBackgroundTaskCompletion(taskStatusResponse.CorrelationId).TimeoutAfter(TimeSpan.FromSeconds(5));

logsProvider.VerifyAll();
logsUploader.VerifyAll();
runtimeInfoProvider.VerifyAll();
}

static async Task WaitForBackgroundTaskCompletion(string correlationId)
{
while (true)
{
BackgroundTaskStatus status = BackgroundTask.GetStatus(correlationId);
if (status.Status != BackgroundTaskRunStatus.Running)
{
break;
}

await Task.Delay(TimeSpan.FromSeconds(1));
}
}

[Theory]
[InlineData(@"{""schemaVersion"":""2.0"",""sasUrl"":""dummyUrl"",""items"":{""id"":""edgeAgent""},""encoding"":""gzip""}", typeof(InvalidSchemaVersionException))]
[InlineData(@"{""schemaVersion"":""0.0"",""sasUrl"":""dummyUrl"",""items"":{""id"":""edgeAgent""},""encoding"":""gzip""}", typeof(InvalidSchemaVersionException))]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Test.Requests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Agent.Core.Logs;
using Microsoft.Azure.Devices.Edge.Agent.Core.Requests;
using Microsoft.Azure.Devices.Edge.Storage;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Test.Common;
using Moq;
using Xunit;

[Unit]
public class TaskStatusRequestHandlerTest
{
[Fact]
public async Task SmokeTest()
{
async Task TestTask()
{
await Task.Delay(TimeSpan.FromSeconds(3));
}

(string correlationId, BackgroundTaskStatus backgroundTaskStatus) = BackgroundTask.Run(TestTask, "test", CancellationToken.None);

string payload = @"{
""schemaVersion"": ""1.0"",
""correlationId"": ""<correlationId>""
}".Replace("<correlationId>", correlationId);
var taskStatusRequestHandler = new TaskStatusRequestHandler();
Option<string> response = await taskStatusRequestHandler.HandleRequest(Option.Some(payload), CancellationToken.None);

Assert.True(response.HasValue);
TaskStatusResponse taskStatusResponse = response.OrDefault().FromJson<TaskStatusResponse>();
Assert.NotNull(taskStatusResponse);
Assert.Equal(taskStatusResponse.CorrelationId, correlationId);
}

[Fact]
public async Task InvalidInputsTest()
{
var taskStatusRequestHandler = new TaskStatusRequestHandler();
await Assert.ThrowsAsync<ArgumentException>(() => taskStatusRequestHandler.HandleRequest(Option.None<string>(), CancellationToken.None));

string payload1 = @"{
""schemaVersion"": ""2.0"",
""correlationId"": ""1234""
}";
await Assert.ThrowsAsync<InvalidSchemaVersionException>(() => taskStatusRequestHandler.HandleRequest(Option.Some(payload1), CancellationToken.None));

string payload2 = @"{
""schemaVersion"": ""1.0"",
""correlationId"": """"
}";
await Assert.ThrowsAsync<ArgumentException>(() => taskStatusRequestHandler.HandleRequest(Option.Some(payload2), CancellationToken.None));
}
}
}
71 changes: 71 additions & 0 deletions edge-util/src/Microsoft.Azure.Devices.Edge.Util/BackgroundTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Util
{
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundTask
{
static readonly ConcurrentDictionary<string, BackgroundTaskStatus> TaskStatuses = new ConcurrentDictionary<string, BackgroundTaskStatus>();

public static (string correlationId, BackgroundTaskStatus backgroundTaskStatus) Run(Func<Task> task, string operation, CancellationToken cancellationToken)
{
BackgroundTaskStatus backgroundTaskStatus = new BackgroundTaskStatus(BackgroundTaskRunStatus.Running, operation);
string correlationId = AddNewTask(backgroundTaskStatus);
Task.Run(
() => task().ContinueWith(
t =>
{
BackgroundTaskStatus GetNewStatus()
{
switch (t.Status)
{
case TaskStatus.Faulted:
Exception exception = t.Exception is AggregateException aggregateException
? aggregateException.InnerException
: t.Exception;
return new BackgroundTaskStatus(BackgroundTaskRunStatus.Failed, operation, Option.Some(exception));

case TaskStatus.Canceled:
return new BackgroundTaskStatus(BackgroundTaskRunStatus.Cancelled, operation);

case TaskStatus.RanToCompletion:
return new BackgroundTaskStatus(BackgroundTaskRunStatus.Completed, operation);

default:
return new BackgroundTaskStatus(BackgroundTaskRunStatus.Unknown, operation);
}
}

BackgroundTaskStatus newStatus = GetNewStatus();
if (!TaskStatuses.TryUpdate(correlationId, newStatus, backgroundTaskStatus))
{
// This should never happen.
BackgroundTaskStatus currentTask = GetStatus(correlationId);
throw new InvalidOperationException($"Failed to update background task status to - {newStatus}. Current task = {currentTask}");
}
}, cancellationToken),
cancellationToken);
return (correlationId, backgroundTaskStatus);
}

static string AddNewTask(BackgroundTaskStatus backgroundTaskStatus)
{
while (true)
{
var correlationId = Guid.NewGuid().ToString();
if (TaskStatuses.TryAdd(correlationId, backgroundTaskStatus))
{
return correlationId;
}
}
}

public static BackgroundTaskStatus GetStatus(string correlationId) =>
TaskStatuses.TryGetValue(correlationId, out BackgroundTaskStatus status)
? status
: new BackgroundTaskStatus(BackgroundTaskRunStatus.Unknown, string.Empty);
}
}
Loading

0 comments on commit e7876eb

Please sign in to comment.