Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Add another truncation case #3075

Merged
merged 5 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/ApiService/ApiService/OneFuzzTypes/Events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,25 @@ public record EventNotificationFailed(
Error? Error
) : BaseEvent();

public record DownloadableEventMessage : EventMessage {
public record DownloadableEventMessage : EventMessage, ITruncatable<DownloadableEventMessage> {
public Uri SasUrl { get; init; }

public DownloadableEventMessage(Guid EventId, EventType EventType, BaseEvent Event, Guid InstanceId, string InstanceName, DateTime CreatedAt, Uri SasUrl)
: base(EventId, EventType, Event, InstanceId, InstanceName, CreatedAt) {
this.SasUrl = SasUrl;
}

public override DownloadableEventMessage Truncate(int maxLength) {
if (this.Event is ITruncatable<BaseEvent> truncatableEvent) {
return this with {
Event = truncatableEvent.Truncate(maxLength)
};
} else {
return this;
}
}
}

public record EventMessage(
Guid EventId,
EventType EventType,
Expand All @@ -366,7 +377,17 @@ public record EventMessage(
String InstanceName,
DateTime CreatedAt,
String Version = "1.0"
);
) : ITruncatable<EventMessage> {
public virtual EventMessage Truncate(int maxLength) {
if (this.Event is ITruncatable<BaseEvent> truncatableEvent) {
return this with {
Event = truncatableEvent.Truncate(maxLength)
};
} else {
return this;
}
}
}

public class BaseEventConverter : JsonConverter<BaseEvent> {
public override BaseEvent? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) {
Expand Down
19 changes: 16 additions & 3 deletions src/ApiService/ApiService/onefuzzlib/Events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ public record SignalREvent
(
string Target,
List<DownloadableEventMessage> arguments
);

) : ITruncatable<SignalREvent> {
public SignalREvent Truncate(int maxLength) {
return this with {
arguments = arguments.Select(x => x.Truncate(maxLength)).ToList()
};
}
}

public interface IEvents {
Async.Task SendEvent(BaseEvent anEvent);
Expand Down Expand Up @@ -47,8 +52,16 @@ public Events(ILogTracer log, IOnefuzzContext context) {
}

public virtual async Async.Task QueueSignalrEvent(DownloadableEventMessage message) {
var tags = new (string, string)[] {
("event_type", message.EventType.ToString()),
("event_id", message.EventId.ToString())
};
var ev = new SignalREvent("events", new List<DownloadableEventMessage>() { message });
await _queue.SendMessage("signalr-events", JsonSerializer.Serialize(ev, _options), StorageType.Config);
var queueResult = await _queue.QueueObject("signalr-events", ev, StorageType.Config, serializerOptions: _options);

if (!queueResult) {
_log.WithTags(tags).Error($"Fsailed to queue signalr event");
}
}

public async Async.Task SendEvent(BaseEvent anEvent) {
Expand Down
37 changes: 27 additions & 10 deletions src/ApiService/ApiService/onefuzzlib/Queue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace Microsoft.OneFuzz.Service;
public interface IQueue {
Async.Task SendMessage(string name, string message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null);
Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null);
Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null, JsonSerializerOptions? serializerOptions = null);
Task<Uri> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration = null);
ResourceIdentifier GetResourceId(string queueName, StorageType storageType);
Task<IList<T>> PeekQueue<T>(string name, StorageType storageType);
Expand Down Expand Up @@ -57,23 +57,40 @@ public async Task<QueueClient> GetQueueClient(string name, StorageType storageTy
public Task<QueueServiceClient> GetQueueClientService(StorageType storageType)
=> _storage.GetQueueServiceClientForAccount(_storage.GetPrimaryAccount(storageType));

public async Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) {
public async Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null, JsonSerializerOptions? serializerOptions = null) {
var queueClient = await GetQueueClient(name, storageType);
serializerOptions ??= EntityConverter.GetJsonSerializerOptions();
try {
var serialized = JsonSerializer.Serialize(obj, EntityConverter.GetJsonSerializerOptions());
var res = await queueClient.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout, timeToLive);
if (res.GetRawResponse().IsError) {
_log.Error($"Failed to send {serialized:Tag:Message} in {name:Tag:QueueName} due to {res.GetRawResponse().ReasonPhrase:Tag:Error}");
return false;
} else {
return true;
}
return await QueueObjectInternal(obj, queueClient, serializerOptions, visibilityTimeout, timeToLive);
} catch (Exception ex) {
_log.Exception(ex, $"Failed to queue message in {name:Tag:QueueName}");
if (IsMessageTooLargeException(ex) &&
obj is ITruncatable<T> truncatable) {
obj = truncatable.Truncate(1000);
try {
return await QueueObjectInternal(obj, queueClient, serializerOptions, visibilityTimeout, timeToLive);
} catch (Exception ex2) {
_log.Exception(ex2, $"Failed to queue message in {name:Tag:QueueName} after truncation");
}
}
return false;
}
}

private async Task<bool> QueueObjectInternal<T>(T obj, QueueClient queueClient, JsonSerializerOptions serializerOptions, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) {
var serialized = JsonSerializer.Serialize(obj, serializerOptions);
var res = await queueClient.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout, timeToLive);
if (res.GetRawResponse().IsError) {
_log.Error($"Failed to send {serialized:Tag:Message} in {queueClient.Name:Tag:QueueName} due to {res.GetRawResponse().ReasonPhrase:Tag:Error}");
return false;
} else {
return true;
}
}

private static bool IsMessageTooLargeException(Exception ex) =>
ex is RequestFailedException rfe && rfe.Message.Contains("The request body is too large");

public async Task<Uri> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration) {
var queueClient = await GetQueueClient(name, storageType);
var now = DateTimeOffset.UtcNow;
Expand Down
16 changes: 1 addition & 15 deletions src/ApiService/ApiService/onefuzzlib/WebhookOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Text.Json;
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Azure;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;

namespace Microsoft.OneFuzz.Service;
Expand Down Expand Up @@ -61,20 +60,7 @@ private async Async.Task AddEvent(Webhook webhook, DownloadableEventMessage even
}
}

try {
await _context.WebhookMessageLogOperations.QueueWebhook(message);
} catch (RequestFailedException ex) {
if (ex.Message.Contains("The request body is too large") && eventMessage.Event is ITruncatable<BaseEvent> truncatableEvent) {
_logTracer.WithTags(tags).Warning($"The WebhookMessageLog was too long for Azure Queue. Truncating event data and trying again.");
message = message with {
Event = truncatableEvent.Truncate(1000)
};
await _context.WebhookMessageLogOperations.QueueWebhook(message);
} else {
// Not handled
throw ex;
}
}
await _context.WebhookMessageLogOperations.QueueWebhook(message);
}

public async Async.Task<OneFuzzResultVoid> Send(WebhookMessageLog messageLog) {
Expand Down