Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Starting migration of QueueNodeHearbeat #1742

Merged
merged 8 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ApiService/ApiService/ApiService.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<PackageReference Include="Microsoft.Identity.Client" Version="4.42.0" />
<PackageReference Include="Microsoft.Identity.Web.TokenCache" Version="1.23.0" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="6.16.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
Expand Down
32 changes: 23 additions & 9 deletions src/ApiService/ApiService/QueueNodeHearbeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using Azure.Data.Tables;
using System.Threading.Tasks;
using Azure;
using ApiService.onefuzzlib.orm;
chkeita marked this conversation as resolved.
Show resolved Hide resolved
using ApiService;

namespace Microsoft.OneFuzz.Service;

Expand All @@ -16,31 +18,43 @@ enum HeartbeatType
TaskAlive,
}

record NodeHeartbeatEntry(string NodeId, Dictionary<string, HeartbeatType>[] data);


public class QueueNodeHearbeat
{

private readonly ILogger _logger;
private readonly IStorageProvider _storageProvider;

public QueueNodeHearbeat(ILoggerFactory loggerFactory)
public QueueNodeHearbeat(ILoggerFactory loggerFactory, IStorageProvider storageProvider)
{
_logger = loggerFactory.CreateLogger<QueueNodeHearbeat>();
_storageProvider = storageProvider;
}

[Function("QueueNodeHearbeat")]
public void Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg)
public async Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg)
{
var hb = JsonSerializer.Deserialize<NodeHeartbeatEntry>(msg);



_logger.LogInformation($"heartbeat: {msg}");
}
}
var node = await Node.GetByMachineId(_storageProvider, hb.NodeId);

if (node == null) {
_logger.LogWarning($"invalid node id: {hb.NodeId}");
return;
}

var newNode = node with { Heartbeat = DateTimeOffset.UtcNow };

await _storageProvider.Replace(newNode);

//send_event(
// EventNodeHeartbeat(
// machine_id = node.machine_id,
// scaleset_id = node.scaleset_id,
// pool_name = node.pool_name,
// )
//)

_logger.LogInformation($"heartbeat: {msg}");
}
}
124 changes: 58 additions & 66 deletions src/ApiService/ApiService/model.cs
Original file line number Diff line number Diff line change
@@ -1,87 +1,79 @@
using Azure.Data.Tables;
using Microsoft.OneFuzz.Service;
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;

namespace ApiService;


record NodeCommandStopIfFree { }
/// Convention for databse entoties:
/// All entities are represented by immuable records
/// Only properties that also apears as parameter initializers are mapped to the database
/// The name of the property will be tranlated to snake case and used as the column name
/// It is possible to rename the column name by using the [property:JsonPropertyName("column_name")] attribute
/// the "partion key" and "row key" are identified by the [PartitionKey] and [RowKey] attributes
/// Guids are mapped to string in the db
stishkin marked this conversation as resolved.
Show resolved Hide resolved

record StopNodeCommand{}

record StopTaskNodeCommand{
Guid TaskId;
}
record NodeHeartbeatEntry(Guid NodeId, Dictionary<string, HeartbeatType>[] data);

record NodeCommandAddSshKey{
string PublicKey;
}
public record NodeCommandStopIfFree();

record NodeCommand
{
StopNodeCommand? Stop;
StopTaskNodeCommand? StopTask;
NodeCommandAddSshKey? AddSshKey;
NodeCommandStopIfFree? StopIfFree;
}
public record StopNodeCommand();

enum NodeTaskState
{
init,
setting_up,
running,
}
public record StopTaskNodeCommand(Guid TaskId);

record NodeTasks
{
Guid MachineId;
Guid TaskId;
NodeTaskState State = NodeTaskState.init;
public record NodeCommandAddSshKey(string PublicKey);


public record NodeCommand
(
StopNodeCommand? Stop,
StopTaskNodeCommand? StopTask,
NodeCommandAddSshKey? AddSshKey,
NodeCommandStopIfFree? StopIfFree
);

public enum NodeTaskState
{
Init,
SettingUp,
Running,
}

enum NodeState
public record NodeTasks
(
Guid MachineId,
Guid TaskId,
NodeTaskState State = NodeTaskState.Init
);

public enum NodeState
{
init,
Init,
free,
setting_up,
rebooting,
ready,
busy,
done,
shutdown,
halt,
SettingUp,
Rebooting,
Ready,
Busy,
Done,
Shutdown,
Halt,
}


record Node : ITableEntity
{
[DataMember(Name = "initialized_at")]
public DateTimeOffset? InitializedAt;
[DataMember(Name = "pool_name")]
public string PoolName;
[DataMember(Name = "pool_id")]
public Guid? PoolId;
[DataMember(Name = "machine_id")]
public Guid MachineId;
[DataMember(Name = "state")]
public NodeState State;
[DataMember(Name = "scaleset_id")]
public Guid? ScalesetId;
[DataMember(Name = "heartbeat")]
public DateTimeOffset Heartbeat;
[DataMember(Name = "version")]
public Version Version;
[DataMember(Name = "reimage_requested")]
public bool ReimageRequested;
[DataMember(Name = "delete_requested")]
public bool DeleteRequested;
[DataMember(Name = "debug_keep_node")]
public bool DebugKeepNode;

public string PartitionKey { get => PoolName; set => PoolName = value; }
public string RowKey { get => MachineId.ToString(); set => MachineId = Guid.Parse(value); }
public Azure.ETag ETag { get; set; }
DateTimeOffset? ITableEntity.Timestamp { get; set; }

}
public partial record Node
(
DateTimeOffset? InitializedAt,
[PartitionKey] string PoolName,
Guid? PoolId,
[RowKey] Guid MachineId,
NodeState State,
Guid? ScalesetId,
DateTimeOffset Heartbeat,
Version Version,
bool ReimageRequested,
bool DeleteRequested,
bool DebugKeepNode
): EntityBase();
20 changes: 20 additions & 0 deletions src/ApiService/ApiService/onefuzzlib/Nodes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using ApiService.onefuzzlib.orm;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ApiService
{
public partial record Node
{
public async static Task<Node?> GetByMachineId(IStorageProvider storageProvider, Guid machineId) {
var tableClient = await storageProvider.GetTableClient("Node");

var data = storageProvider.QueryAsync<Node>(filter: $"RowKey eq {machineId}");

return await data.FirstOrDefaultAsync();
}
}
}
39 changes: 16 additions & 23 deletions src/ApiService/ApiService/onefuzzlib/orm/Orm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@
using ApiService.onefuzzlib.orm;
using System.Text.Json.Serialization;
using System.Collections.Concurrent;
using Azure;

namespace Microsoft.OneFuzz.Service;

public abstract record EntityBase
{
public ETag? ETag { get; set; }
public DateTimeOffset? TimeStamp { get; set; }
}

public class RowKeyAttribute : Attribute { }
public class PartitionKeyAttribute : Attribute { }
Expand Down Expand Up @@ -109,7 +115,7 @@ private EntityInfo GetEntityInfo<T>()
});
}

public TableEntity ToTableEntity<T>(T typedEntity)
public TableEntity ToTableEntity<T>(T typedEntity) where T: EntityBase
{
if (typedEntity == null)
{
Expand Down Expand Up @@ -163,11 +169,15 @@ public TableEntity ToTableEntity<T>(T typedEntity)

}

if (typedEntity.ETag.HasValue) {
tableEntity.ETag = typedEntity.ETag.Value;
}

return tableEntity;
}


public T ToRecord<T>(TableEntity entity)
public T ToRecord<T>(TableEntity entity) where T: EntityBase
{
var entityInfo = GetEntityInfo<T>();
var parameters =
Expand Down Expand Up @@ -235,30 +245,13 @@ public T ToRecord<T>(TableEntity entity)
}
).ToArray();

return (T)entityInfo.constructor.Invoke(parameters);
}

var entityRecord = (T)entityInfo.constructor.Invoke(parameters);
entityRecord.ETag = entity.ETag;
entityRecord.TimeStamp = entity.Timestamp;

public interface IStorageProvider
{
Task<TableServiceClient> GetStorageClient(string table, string accounId);
return entityRecord;
}

public class StorageProvider : IStorageProvider
{
public async Task<TableServiceClient> GetStorageClient(string table, string accounId)
{
var (name, key) = GetStorageAccountNameAndKey(accounId);
var tableClient = new TableServiceClient(new Uri(accounId), new TableSharedKeyCredential(name, key));
await tableClient.CreateTableIfNotExistsAsync(table);
return tableClient;
}

private (string, string) GetStorageAccountNameAndKey(string accounId)
{
throw new NotImplementedException();
}
}
}


Expand Down
63 changes: 63 additions & 0 deletions src/ApiService/ApiService/onefuzzlib/orm/StorageProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using Azure.Data.Tables;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ApiService;
using Microsoft.OneFuzz.Service;

namespace ApiService.onefuzzlib.orm
{

public interface IStorageProvider
{
Task<TableClient> GetTableClient(string table);
IAsyncEnumerable<T> QueryAsync<T>(string filter) where T : EntityBase;
Task<bool> Replace<T>(T entity) where T : EntityBase;

}

public class StorageProvider : IStorageProvider
{
private readonly string _accountId;
private readonly EntityConverter _entityConverter;

public StorageProvider(string accountId) {
_accountId = accountId;
_entityConverter = new EntityConverter();
}

public async Task<TableClient> GetTableClient(string table)
{
var (name, key) = GetStorageAccountNameAndKey(_accountId);
var tableClient = new TableServiceClient(new Uri(_accountId), new TableSharedKeyCredential(name, key));
await tableClient.CreateTableIfNotExistsAsync(table);
return tableClient.GetTableClient(table);
}

private (string, string) GetStorageAccountNameAndKey(string accounId)
{
throw new NotImplementedException();
}

public async IAsyncEnumerable<T> QueryAsync<T>(string filter) where T : EntityBase
{
var tableClient = await GetTableClient(typeof(T).Name);

await foreach (var x in tableClient.QueryAsync<TableEntity>(filter).Select(x => _entityConverter.ToRecord<T>(x))) {
yield return x;
}
}

public async Task<bool> Replace<T>(T entity) where T : EntityBase
{
var tableClient = await GetTableClient(typeof(T).Name);
var tableEntity = _entityConverter.ToTableEntity(entity);
var response = await tableClient.UpsertEntityAsync(tableEntity);
return !response.IsError;

}
}

}
Loading