Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[in-proc backport] [1ES] Migrate Diagnostic Events to Azure.Data.Tables (#10167) #10218

Merged
merged 2 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/WebJobs.Script.WebHost/Diagnostics/DiagnosticEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using Microsoft.Azure.Cosmos.Table;
using System.Runtime.Serialization;
using Azure;
using Azure.Data.Tables;
using Microsoft.Azure.WebJobs.Script.WebHost.Helpers;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics
{
public class DiagnosticEvent : TableEntity
public class DiagnosticEvent : ITableEntity
{
internal const string CurrentEventVersion = "2024-05-01";

Expand All @@ -23,6 +24,14 @@ public DiagnosticEvent(string hostId, DateTime timestamp)
EventVersion = CurrentEventVersion;
}

public string PartitionKey { get; set; }

public string RowKey { get; set; }

public DateTimeOffset? Timestamp { get; set; }

public ETag ETag { get; set; }

public string EventVersion { get; set; }

public int HitCount { get; set; }
Expand All @@ -35,7 +44,7 @@ public DiagnosticEvent(string hostId, DateTime timestamp)

public int Level { get; set; }

[IgnoreProperty]
[IgnoreDataMember]
public LogLevel LogLevel
{
get { return (LogLevel)Level; }
Expand All @@ -44,4 +53,4 @@ public LogLevel LogLevel

public string Details { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Table;
using Azure.Data.Tables;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Hosting;
using Microsoft.Azure.WebJobs.Logging;
Expand All @@ -33,8 +33,8 @@ public class DiagnosticEventTableStorageRepository : IDiagnosticEventRepository,
private readonly object _syncLock = new object();

private ConcurrentDictionary<string, DiagnosticEvent> _events = new ConcurrentDictionary<string, DiagnosticEvent>();
private CloudTableClient _tableClient;
private CloudTable _diagnosticEventsTable;
private TableServiceClient _tableClient;
private TableClient _diagnosticEventsTable;
private string _hostId;
private bool _disposed = false;
private bool _purged = false;
Expand All @@ -55,22 +55,21 @@ public DiagnosticEventTableStorageRepository(IConfiguration configuration, IHost
ILogger<DiagnosticEventTableStorageRepository> logger)
: this(configuration, hostIdProvider, environment, scriptHost, logger, LogFlushInterval) { }

internal CloudTableClient TableClient
internal TableServiceClient TableClient
{
get
{
if (!_environment.IsPlaceholderModeEnabled() && _tableClient == null)
{
string storageConnectionString = _configuration.GetWebJobsConnectionString(ConnectionStringNames.Storage);
if (!string.IsNullOrEmpty(storageConnectionString)
&& CloudStorageAccount.TryParse(storageConnectionString, out CloudStorageAccount account))

try
{
var tableClientConfig = new TableClientConfiguration();
_tableClient = new CloudTableClient(account.TableStorageUri, account.Credentials, tableClientConfig);
_tableClient = new TableServiceClient(storageConnectionString);
}
else
catch (Exception ex)
{
_logger.LogError("Azure Storage connection string is empty or invalid. Unable to write diagnostic events.");
_logger.LogError(ex, "Azure Storage connection string is empty or invalid. Unable to write diagnostic events.");
}
}

Expand All @@ -92,7 +91,7 @@ internal string HostId

internal ConcurrentDictionary<string, DiagnosticEvent> Events => _events;

internal CloudTable GetDiagnosticEventsTable(DateTime? now = null)
internal TableClient GetDiagnosticEventsTable(DateTime? now = null)
{
if (TableClient != null)
{
Expand All @@ -103,7 +102,7 @@ internal CloudTable GetDiagnosticEventsTable(DateTime? now = null)
if (_diagnosticEventsTable == null || currentTableName != _tableName)
{
_tableName = currentTableName;
_diagnosticEventsTable = TableClient.GetTableReference(_tableName);
_diagnosticEventsTable = TableClient.GetTableClient(_tableName);
}
}

Expand Down Expand Up @@ -134,31 +133,27 @@ await Utility.InvokeWithRetriesAsync(async () =>

foreach (var table in tables)
{
var tableRecords = await table.ExecuteQuerySegmentedAsync(new TableQuery<DiagnosticEvent>(), null);

// Skip tables that have 0 records
if (tableRecords.Results.Count == 0)
{
continue;
}

// Delete table if it doesn't have records with EventVersion
var eventVersionDoesNotExists = tableRecords.Results.Any(record => string.IsNullOrEmpty(record.EventVersion) == true);
if (eventVersionDoesNotExists)
{
_logger.LogDebug("Deleting table '{tableName}' as it contains records without an EventVersion.", table.Name);
await table.DeleteIfExistsAsync();
tableDeleted = true;
continue;
}
var tableQuery = table.QueryAsync<DiagnosticEvent>(cancellationToken: default);

// If the table does have EventVersion, query if it is an outdated version
var eventVersionOutdated = tableRecords.Results.Any(record => string.Compare(DiagnosticEvent.CurrentEventVersion, record.EventVersion, StringComparison.Ordinal) > 0);
if (eventVersionOutdated)
await foreach (var record in tableQuery)
{
_logger.LogDebug("Deleting table '{tableName}' as it contains records with an outdated EventVersion.", table.Name);
await table.DeleteIfExistsAsync();
tableDeleted = true;
// Delete table if it doesn't have records with EventVersion
if (string.IsNullOrEmpty(record.EventVersion) == true)
{
_logger.LogDebug("Deleting table '{tableName}' as it contains records without an EventVersion.", table.Name);
await table.DeleteAsync();
tableDeleted = true;
break;
}

// If the table does have EventVersion, query if it is an outdated version
if (string.Compare(DiagnosticEvent.CurrentEventVersion, record.EventVersion, StringComparison.Ordinal) > 0)
{
_logger.LogDebug("Deleting table '{tableName}' as it contains records with an outdated EventVersion.", table.Name);
await table.DeleteAsync();
tableDeleted = true;
break;
}
}
}

Expand All @@ -177,7 +172,7 @@ await Utility.InvokeWithRetriesAsync(async () =>
}
}

internal virtual async Task FlushLogs(CloudTable table = null)
internal virtual async Task FlushLogs(TableClient table = null)
{
if (_environment.IsPlaceholderModeEnabled())
{
Expand All @@ -200,7 +195,7 @@ internal virtual async Task FlushLogs(CloudTable table = null)
return;
}

bool tableCreated = await TableStorageHelpers.CreateIfNotExistsAsync(table, TableCreationMaxRetryCount);
bool tableCreated = await TableStorageHelpers.CreateIfNotExistsAsync(table, TableClient, TableCreationMaxRetryCount);
if (tableCreated)
{
_logger.LogDebug("Queueing background table purge.");
Expand All @@ -225,20 +220,20 @@ internal virtual async Task FlushLogs(CloudTable table = null)
}
}

internal async Task ExecuteBatchAsync(ConcurrentDictionary<string, DiagnosticEvent> events, CloudTable table)
internal async Task ExecuteBatchAsync(ConcurrentDictionary<string, DiagnosticEvent> events, TableClient table)
{
try
{
var batch = new TableBatchOperation();
var batch = new List<TableTransactionAction>();
foreach (string errorCode in events.Keys)
{
var diagnosticEvent = events[errorCode];
diagnosticEvent.Message = Sanitizer.Sanitize(diagnosticEvent.Message);
diagnosticEvent.Details = Sanitizer.Sanitize(diagnosticEvent.Details);
TableOperation insertOperation = TableOperation.Insert(diagnosticEvent);
batch.Add(insertOperation);
TableTransactionAction insertAction = new TableTransactionAction(TableTransactionActionType.Add, diagnosticEvent);
batch.Add(insertAction);
}
await table.ExecuteBatchAsync(batch);
await table.SubmitTransactionAsync(batch);
events.Clear();
}
catch (Exception ex)
Expand Down
51 changes: 32 additions & 19 deletions src/WebJobs.Script.WebHost/Helpers/TableStorageHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Table;
using Azure;
using Azure.Data.Tables;
using Azure.Data.Tables.Models;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.WebHost.Helpers
Expand All @@ -18,27 +20,27 @@ internal static string GetRowKey(DateTime now)
return string.Format("{0:D19}-{1}", DateTime.MaxValue.Ticks - now.Ticks, Guid.NewGuid());
}

internal static async Task<bool> CreateIfNotExistsAsync(CloudTable table, int tableCreationRetries, int retryDelayMS = 1000)
internal static async Task<bool> CreateIfNotExistsAsync(TableClient table, TableServiceClient tableClient, int tableCreationRetries, int retryDelayMS = 1000)
{
int attempt = 0;
do
{
try
{
if (!table.Exists())
if (!await TableExistAsync(table, tableClient))
{
return await table.CreateIfNotExistsAsync();
return (await table.CreateIfNotExistsAsync())?.Value is not null;
}
}
catch (StorageException e)
catch (RequestFailedException rfe)
{
// Can get conflicts with multiple instances attempting to create
// the same table.
// Also, if a table queued up for deletion, we can get a conflict on create,
// though these should only happen in tests not production, because we only ever
// delete OLD tables and we'll never be attempting to recreate a table we just
// deleted outside of tests.
if (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.Conflict &&
if ((rfe.Status != (int)HttpStatusCode.Conflict || rfe.ErrorCode == TableErrorCode.TableBeingDeleted) &&
attempt < tableCreationRetries)
{
// wait a bit and try again
Expand All @@ -53,7 +55,7 @@ internal static async Task<bool> CreateIfNotExistsAsync(CloudTable table, int ta
return false;
}

internal static void QueueBackgroundTablePurge(CloudTable currentTable, CloudTableClient tableClient, string tableNamePrefix, ILogger logger, int delaySeconds = 30)
internal static void QueueBackgroundTablePurge(TableClient currentTable, TableServiceClient tableClient, string tableNamePrefix, ILogger logger, int delaySeconds = 30)
{
var tIgnore = Task.Run(async () =>
{
Expand All @@ -74,38 +76,49 @@ internal static void QueueBackgroundTablePurge(CloudTable currentTable, CloudTab
});
}

internal static async Task DeleteOldTablesAsync(CloudTable currentTable, CloudTableClient tableClient, string tableNamePrefix, ILogger logger)
internal static async Task DeleteOldTablesAsync(TableClient currentTable, TableServiceClient tableClient, string tableNamePrefix, ILogger logger)
{
var tablesToDelete = await ListOldTablesAsync(currentTable, tableClient, tableNamePrefix);
logger.LogDebug($"Deleting {tablesToDelete.Count()} old tables.");
foreach (var table in tablesToDelete)
{
logger.LogDebug($"Deleting table '{table.Name}'");
await table.DeleteIfExistsAsync();
await tableClient.DeleteTableAsync(table.Name);
logger.LogDebug($"{table.Name} deleted.");
}
}

internal static async Task<IEnumerable<CloudTable>> ListOldTablesAsync(CloudTable currentTable, CloudTableClient tableClient, string tableNamePrefix)
internal static async Task<IEnumerable<TableClient>> ListOldTablesAsync(TableClient currentTable, TableServiceClient tableClient, string tableNamePrefix)
{
var tables = await ListTablesAsync(tableClient, tableNamePrefix);
return tables.Where(p => !string.Equals(currentTable.Name, p.Name, StringComparison.OrdinalIgnoreCase));
}

internal static async Task<IEnumerable<CloudTable>> ListTablesAsync(CloudTableClient tableClient, string tableNamePrefix)
internal static async Task<IEnumerable<TableClient>> ListTablesAsync(TableServiceClient tableClient, string tableNamePrefix)
{
List<CloudTable> tables = new List<CloudTable>();
TableContinuationToken continuationToken = null;
// Azure.Data.Tables doesn't have a direct way to list tables with a prefix so we need to do it manually
var givenValue = tableNamePrefix + "{";
AsyncPageable<TableItem> tablesQuery = tableClient.QueryAsync(p => p.Name.CompareTo(tableNamePrefix) >= 0 && p.Name.CompareTo(givenValue) <= 0);
var tables = new List<TableClient>();

do
await foreach (var table in tablesQuery)
{
var results = await tableClient.ListTablesSegmentedAsync(tableNamePrefix, continuationToken);
continuationToken = results.ContinuationToken;
tables.AddRange(results.Results);
tables.Add(tableClient.GetTableClient(table.Name));
}
while (continuationToken != null);

return tables;
}

internal static async Task<bool> TableExistAsync(TableClient table, TableServiceClient tableClient)
{
var query = tableClient.QueryAsync(p => p.Name == table.Name);

await foreach (var item in query)
{
return true;
}

return false;
}
}
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Data.Tables" Version="12.8.3" />
<PackageReference Include="Azure.Identity" Version="1.11.2" />
<PackageReference Include="Azure.Security.KeyVault.Secrets" Version="4.2.0" />
<PackageReference Include="Microsoft.ApplicationInsights" Version="2.22.0" />
Expand Down
4 changes: 4 additions & 0 deletions src/WebJobs.Script/runtimeassemblies-net6.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
"name": "Azure.Core",
"resolutionPolicy": "private"
},
{
"name": "Azure.Data.Tables",
"resolutionPolicy": "private"
},
{
"name": "Azure.Identity",
"resolutionPolicy": "private"
Expand Down
4 changes: 4 additions & 0 deletions src/WebJobs.Script/runtimeassemblies-net8.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
"name": "Azure.Core",
"resolutionPolicy": "private"
},
{
"name": "Azure.Data.Tables",
"resolutionPolicy": "private"
},
{
"name": "Azure.Identity",
"resolutionPolicy": "private"
Expand Down
Loading