Skip to content

Commit

Permalink
Allow runner to check service connection in background.
Browse files Browse the repository at this point in the history
  • Loading branch information
TingluoHuang committed Nov 7, 2024
1 parent 59ec9b4 commit 53a41fb
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 11 deletions.
10 changes: 10 additions & 0 deletions src/Runner.Sdk/Util/UrlUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,15 @@ public static string GetGitHubRequestId(HttpResponseHeaders headers)
}
return string.Empty;
}

public static string GetVssRequestId(HttpResponseHeaders headers)
{
if (headers != null &&
headers.TryGetValues("x-vss-e2eid", out var headerValues))
{
return headerValues.FirstOrDefault();
}
return string.Empty;
}
}
}
170 changes: 160 additions & 10 deletions src/Runner.Worker/JobExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using GitHub.Runner.Common.Util;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Newtonsoft.Json;
using Pipelines = GitHub.DistributedTask.Pipelines;

namespace GitHub.Runner.Worker
Expand All @@ -42,11 +43,13 @@ public interface IJobExtension : IRunnerService
public sealed class JobExtension : RunnerService, IJobExtension
{
private readonly HashSet<string> _existingProcesses = new(StringComparer.OrdinalIgnoreCase);
private readonly List<Task<string>> _connectivityCheckTasks = new();
private readonly List<Task<CheckResult>> _connectivityCheckTasks = new();
private bool _processCleanup;
private string _processLookupId = $"github_{Guid.NewGuid()}";
private CancellationTokenSource _diskSpaceCheckToken = new();
private Task _diskSpaceCheckTask = null;
private CancellationTokenSource _serviceConnectivityCheckToken = new();
private Task _serviceConnectivityCheckTask = null;

// Download all required actions.
// Make sure all condition inputs are valid.
Expand Down Expand Up @@ -454,11 +457,14 @@ public async Task<List<IStep>> InitializeJob(IExecutionContext jobContext, Pipel
{
foreach (var checkUrl in checkUrls)
{
_connectivityCheckTasks.Add(CheckConnectivity(checkUrl));
_connectivityCheckTasks.Add(CheckConnectivity(checkUrl, accessToken: string.Empty, timeoutInSeconds: 5, token: CancellationToken.None));
}
}
}

Trace.Info($"Start checking service connectivity in background.");
_serviceConnectivityCheckTask = CheckServiceConnectivityAsync(context, _serviceConnectivityCheckToken.Token);

return steps;
}
catch (OperationCanceledException ex) when (jobContext.CancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -692,7 +698,7 @@ public async Task FinalizeJob(IExecutionContext jobContext, Pipelines.AgentJobRe
{
var result = await check;
Trace.Info($"Connectivity check result: {result}");
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = result });
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"{result.EndpointUrl}: {result.StatusCode}" });
}
}
catch (Exception ex)
Expand All @@ -702,6 +708,22 @@ public async Task FinalizeJob(IExecutionContext jobContext, Pipelines.AgentJobRe
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"Fail to check server connectivity. {ex.Message}" });
}
}

// Collect service connectivity check result
if (_serviceConnectivityCheckTask != null)
{
_serviceConnectivityCheckToken.Cancel();
try
{
await _serviceConnectivityCheckTask;
}
catch (Exception ex)
{
Trace.Error($"Fail to check service connectivity.");
Trace.Error(ex);
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"Fail to check service connectivity. {ex.Message}" });
}
}
}
catch (Exception ex)
{
Expand All @@ -717,33 +739,58 @@ public async Task FinalizeJob(IExecutionContext jobContext, Pipelines.AgentJobRe
}
}

private async Task<string> CheckConnectivity(string endpointUrl)
private async Task<CheckResult> CheckConnectivity(string endpointUrl, string accessToken, int timeoutInSeconds, CancellationToken token)
{
Trace.Info($"Check server connectivity for {endpointUrl}.");
string result = string.Empty;
using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
CheckResult result = new CheckResult() { EndpointUrl = endpointUrl };
var stopwatch = Stopwatch.StartNew();
using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutInSeconds)))
using (var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutTokenSource.Token))
{
try
{
using (var httpClientHandler = HostContext.CreateHttpClientHandler())
using (var httpClient = new HttpClient(httpClientHandler))
{
httpClient.DefaultRequestHeaders.UserAgent.AddRange(HostContext.UserAgents);
var response = await httpClient.GetAsync(endpointUrl, timeoutTokenSource.Token);
result = $"{endpointUrl}: {response.StatusCode}";
if (!string.IsNullOrEmpty(accessToken))
{
httpClient.DefaultRequestHeaders.Add("Authorization", $"Bearer {accessToken}");
}

var response = await httpClient.GetAsync(endpointUrl, linkedTokenSource.Token);
result.StatusCode = $"{response.StatusCode}";

var githubRequestId = UrlUtil.GetGitHubRequestId(response.Headers);
var vssRequestId = UrlUtil.GetVssRequestId(response.Headers);
if (!string.IsNullOrEmpty(githubRequestId))
{
result.RequestId = githubRequestId;
}
else if (!string.IsNullOrEmpty(vssRequestId))
{
result.RequestId = vssRequestId;
}
}
}
catch (Exception ex) when (ex is OperationCanceledException && token.IsCancellationRequested)
{
Trace.Error($"Request canceled during connectivity check: {ex}");
result.StatusCode = "canceled";
}
catch (Exception ex) when (ex is OperationCanceledException && timeoutTokenSource.IsCancellationRequested)
{
Trace.Error($"Request timeout during connectivity check: {ex}");
result = $"{endpointUrl}: timeout";
result.StatusCode = "timeout";
}
catch (Exception ex)
{
Trace.Error($"Catch exception during connectivity check: {ex}");
result = $"{endpointUrl}: {ex.Message}";
result.StatusCode = $"{ex.Message}";
}
}
stopwatch.Stop();
result.DurationInMs = (int)stopwatch.ElapsedMilliseconds;

return result;
}
Expand Down Expand Up @@ -781,6 +828,91 @@ private async Task CheckDiskSpaceAsync(IExecutionContext context, CancellationTo
}
}

private async Task CheckServiceConnectivityAsync(IExecutionContext context, CancellationToken token)
{
var foo = new ServiceConnectivityCheckInput() { IntervalInSecond = 30, RequestTimeoutInSecond = 30 };
foo.Endpoints.Add("pipelines", "https://pipelinesghubeus20.actions.githubusercontent.com/_apis/connectiondata");
foo.Endpoints.Add("broker", "https://broker.actions.githubusercontent.com/health");
foo.Endpoints.Add("results", "https://results-receiver.actions.githubusercontent.com/health");
var fooJson = StringUtil.ConvertToJson(foo);
context.Global.Variables.Set(WellKnownDistributedTaskVariables.RunnerServiceConnectivityTest, fooJson);

var connectionTest = context.Global.Variables.Get(WellKnownDistributedTaskVariables.RunnerServiceConnectivityTest);
if (string.IsNullOrEmpty(connectionTest))
{
return;
}

ServiceConnectivityCheckInput checkConnectivityInfo;
try
{
checkConnectivityInfo = StringUtil.ConvertFromJson<ServiceConnectivityCheckInput>(connectionTest);
}
catch (Exception ex)
{
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = $"Fail to parse JSON. {ex.Message}" });
return;
}

if (checkConnectivityInfo == null)
{
return;
}

// make sure interval is at least 10 seconds
checkConnectivityInfo.IntervalInSecond = Math.Max(10, checkConnectivityInfo.IntervalInSecond);

var systemConnection = context.Global.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
var accessToken = systemConnection.Authorization.Parameters[EndpointAuthorizationParameters.AccessToken];

var testResult = new ServiceConnectivityCheckResult();
while (!token.IsCancellationRequested)
{
foreach (var endpoint in checkConnectivityInfo.Endpoints)
{
if (string.IsNullOrEmpty(endpoint.Key) || string.IsNullOrEmpty(endpoint.Value))
{
continue;
}

if (!testResult.EndpointsResult.ContainsKey(endpoint.Key))
{
testResult.EndpointsResult[endpoint.Key] = new List<string>();
}

try
{
var result = await CheckConnectivity(endpoint.Value, accessToken: accessToken, timeoutInSeconds: checkConnectivityInfo.RequestTimeoutInSecond, token);
testResult.EndpointsResult[endpoint.Key].Add($"{result.StartTime:s}: {result.StatusCode} - {result.RequestId} - {result.DurationInMs}ms");
if (!testResult.HasFailure &&
result.StatusCode != "OK" &&
result.StatusCode == "canceled")
{
// track if any endpoint is not reachable
testResult.HasFailure = true;
}
}
catch (Exception ex)
{
testResult.EndpointsResult[endpoint.Key].Add($"{DateTime.UtcNow:s}: {ex.Message}");
}
}

try
{
await Task.Delay(TimeSpan.FromSeconds(checkConnectivityInfo.IntervalInSecond), token);
}
catch (TaskCanceledException)
{
// ignore
}
}

var telemetryData = StringUtil.ConvertToJson(testResult, Formatting.None);
Trace.Verbose($"Connectivity check result: {telemetryData}");
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = telemetryData });
}

private Dictionary<int, Process> SnapshotProcesses()
{
Dictionary<int, Process> snapshot = new();
Expand Down Expand Up @@ -812,5 +944,23 @@ private static void ValidateJobContainer(JobContainer container)
throw new ArgumentException("Jobs without a job container are forbidden on this runner, please add a 'container:' to your job or contact your self-hosted runner administrator.");
}
}

private class CheckResult
{
public CheckResult()
{
StartTime = DateTime.UtcNow;
}

public string EndpointUrl { get; set; }

public DateTime StartTime { get; set; }

public string StatusCode { get; set; }

public string RequestId { get; set; }

public int DurationInMs { get; set; }
}
}
}
3 changes: 2 additions & 1 deletion src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
if (message.Variables.TryGetValue(Constants.Variables.System.OrchestrationId, out VariableValue orchestrationId) &&
!string.IsNullOrEmpty(orchestrationId.Value))
{
HostContext.UserAgents.Add(new ProductInfoHeaderValue("OrchestrationId", orchestrationId.Value));
// make the orchestration id the first item in the user-agent header to avoid get truncated in server log.
HostContext.UserAgents.Insert(0, new ProductInfoHeaderValue("OrchestrationId", orchestrationId.Value));

// make sure orchestration id is in the user-agent header.
VssUtil.InitializeVssClientSettings(HostContext.UserAgents, HostContext.WebProxy);
Expand Down
42 changes: 42 additions & 0 deletions src/Sdk/DTWebApi/WebApi/ServiceConnectivityCheck.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Newtonsoft.Json;

namespace GitHub.DistributedTask.WebApi
{
[DataContract]
public class ServiceConnectivityCheckInput
{
[JsonConstructor]
public ServiceConnectivityCheckInput()
{
Endpoints = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
}

[DataMember(EmitDefaultValue = false)]
public Dictionary<string, string> Endpoints { get; set; }

[DataMember(EmitDefaultValue = false)]
public int IntervalInSecond { get; set; }

[DataMember(EmitDefaultValue = false)]
public int RequestTimeoutInSecond { get; set; }
}

[DataContract]
public class ServiceConnectivityCheckResult
{
[JsonConstructor]
public ServiceConnectivityCheckResult()
{
EndpointsResult = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
}

[DataMember(Order = 1, EmitDefaultValue = false)]
public bool HasFailure { get; set; }

[DataMember(Order = 2, EmitDefaultValue = false)]
public Dictionary<string, List<string>> EndpointsResult { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ public static class WellKnownDistributedTaskVariables
public static readonly String JobId = "system.jobId";
public static readonly String RunnerLowDiskspaceThreshold = "system.runner.lowdiskspacethreshold";
public static readonly String RunnerEnvironment = "system.runnerEnvironment";
public static readonly String RunnerServiceConnectivityTest = "system.runner.serviceconnectivitycheckinput";
}
}

0 comments on commit 53a41fb

Please sign in to comment.