Skip to content

Commit

Permalink
start calling run service for job completion (actions#2412)
Browse files Browse the repository at this point in the history
* start calling run service for job completion

* cleanup

* nit: lines

Co-authored-by: Tingluo Huang <tingluohuang@github.com>

* clean up

* give sanity back to thboop

Co-authored-by: Thomas Boop <52323235+thboop@users.noreply.github.com>

* add clean up back

* clean up

* clean up more

* oops

* copied from existing, but :thumb:

Co-authored-by: Thomas Boop <52323235+thboop@users.noreply.github.com>

---------

Co-authored-by: Tingluo Huang <tingluohuang@github.com>
Co-authored-by: Thomas Boop <52323235+thboop@users.noreply.github.com>
  • Loading branch information
3 people authored and nikola-jokic committed May 12, 2023
1 parent 380d752 commit 3aabb2f
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 80 deletions.
9 changes: 8 additions & 1 deletion src/Runner.Common/RunServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Sdk.WebApi.WebApi.RawClient;

namespace GitHub.Runner.Common
Expand All @@ -16,6 +15,8 @@ public interface IRunServer : IRunnerService
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);

Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken token);

Task CompleteJobAsync(Guid planId, Guid jobId, CancellationToken token);
}

public sealed class RunServer : RunnerService, IRunServer
Expand Down Expand Up @@ -55,5 +56,11 @@ public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationTo
return jobMessage;
}

public Task CompleteJobAsync(Guid planId, Guid jobId, CancellationToken cancellationToken)
{
CheckConnection();
return RetryRequest(
async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, cancellationToken), cancellationToken);
}
}
}
15 changes: 14 additions & 1 deletion src/Runner.Common/RunnerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ protected async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCre
throw new InvalidOperationException(nameof(EstablishVssConnection));
}

protected async Task RetryRequest(Func<Task> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
)
{
async Task<Unit> wrappedFunc()
{
await func();
return Unit.Value;
}
await RetryRequest<Unit>(wrappedFunc, cancellationToken, maxRetryAttemptsCount);
}

protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
Expand All @@ -85,7 +98,7 @@ protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount)
{
Trace.Error("Catch exception during get full job message");
Trace.Error("Catch exception during request");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
Expand Down
8 changes: 8 additions & 0 deletions src/Runner.Common/Unit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Represents absence of value.
namespace GitHub.Runner.Common
{
public readonly struct Unit
{
public static readonly Unit Value = default;
}
}
2 changes: 1 addition & 1 deletion src/Runner.Sdk/Util/VssUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static RawConnection CreateRawConnection(
// settings are applied to an HttpRequestMessage.
settings.AcceptLanguages.Remove(CultureInfo.InvariantCulture);

RawConnection connection = new(serverUri, new RawHttpMessageHandler(credentials.ToOAuthCredentials(), settings), additionalDelegatingHandler);
RawConnection connection = new(serverUri, new RawHttpMessageHandler(credentials.Federated, settings), additionalDelegatingHandler);
return connection;
}

Expand Down
108 changes: 90 additions & 18 deletions src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines.ContextData;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common;
using GitHub.Runner.Common.Util;
Expand Down Expand Up @@ -40,21 +39,34 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
Trace.Info("Job ID {0}", message.JobId);

DateTime jobStartTimeUtc = DateTime.UtcNow;
IRunnerService server = null;

ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase))
{
var runServer = HostContext.GetService<IRunServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
await runServer.ConnectAsync(systemConnection.Url, jobServerCredential);
server = runServer;
}
else
{
// Setup the job server and job server queue.
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;

Trace.Info($"Creating job server with URL: {jobServerUrl}");
// jobServerQueue is the throttling reporter.
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) });
await jobServer.ConnectAsync(jobConnection);

_jobServerQueue.Start(message);
server = jobServer;
}


// Setup the job server and job server queue.
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;

Trace.Info($"Creating job server with URL: {jobServerUrl}");
// jobServerQueue is the throttling reporter.
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) });
await jobServer.ConnectAsync(jobConnection);

_jobServerQueue.Start(message);
HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}");

IExecutionContext jobContext = null;
Expand Down Expand Up @@ -99,7 +111,7 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
{
Trace.Error(ex);
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed);
}

if (jobContext.Global.WriteDebug)
Expand Down Expand Up @@ -136,15 +148,15 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job is cancelled during initialize.");
Trace.Error($"Caught exception: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Canceled);
return await CompleteJobAsync(server, jobContext, message, TaskResult.Canceled);
}
catch (Exception ex)
{
// set the job to failed.
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job initialize failed.");
Trace.Error($"Caught exception from {nameof(jobExtension.InitializeJob)}: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed);
}

// trace out all steps
Expand Down Expand Up @@ -181,7 +193,7 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
// Log the error and fail the job.
Trace.Error($"Caught exception from job steps {nameof(StepsRunner)}: {ex}");
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed);
}
finally
{
Expand All @@ -192,7 +204,7 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
Trace.Info($"Job result after all job steps finish: {jobContext.Result ?? TaskResult.Succeeded}");

Trace.Info("Completing the job execution context.");
return await CompleteJobAsync(jobServer, jobContext, message);
return await CompleteJobAsync(server, jobContext, message);
}
finally
{
Expand All @@ -206,6 +218,66 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
}
}

private async Task<TaskResult> CompleteJobAsync(IRunnerService server, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
{
if (server is IRunServer runServer)
{
return await CompleteJobAsync(runServer, jobContext, message, taskResult);
}
else if (server is IJobServer jobServer)
{
return await CompleteJobAsync(jobServer, jobContext, message, taskResult);
}
else
{
throw new NotSupportedException();
}
}

private async Task<TaskResult> CompleteJobAsync(IRunServer runServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
{
jobContext.Debug($"Finishing: {message.JobDisplayName}");
TaskResult result = jobContext.Complete(taskResult);
if (jobContext.Global.Variables.TryGetValue("Node12ActionsWarnings", out var node12Warnings))
{
var actions = string.Join(", ", StringUtil.ConvertFromJson<HashSet<string>>(node12Warnings));
jobContext.Warning(string.Format(Constants.Runner.Node12DetectedAfterEndOfLife, actions));
}

// Make sure to clean temp after file upload since they may be pending fileupload still use the TEMP dir.
_tempDirectoryManager?.CleanupTempDirectory();

// Load any upgrade telemetry
LoadFromTelemetryFile(jobContext.Global.JobTelemetry);

// Make sure we don't submit secrets as telemetry
MaskTelemetrySecrets(jobContext.Global.JobTelemetry);

Trace.Info($"Raising job completed against run service");
var completeJobRetryLimit = 5;
var exceptions = new List<Exception>();
while (completeJobRetryLimit-- > 0)
{
try
{
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, default);
return result;
}
catch (Exception ex)
{
Trace.Error($"Catch exception while attempting to complete job {message.JobId}, job request {message.RequestId}.");
Trace.Error(ex);
exceptions.Add(ex);
}

// delay 5 seconds before next retry.
await Task.Delay(TimeSpan.FromSeconds(5));
}

// rethrow exceptions from all attempts.
throw new AggregateException(exceptions);
}

private async Task<TaskResult> CompleteJobAsync(IJobServer jobServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
{
jobContext.Debug($"Finishing: {message.JobDisplayName}");
Expand Down
20 changes: 0 additions & 20 deletions src/Sdk/Common/Common/Authentication/VssCredentialsExtension.cs

This file was deleted.

12 changes: 6 additions & 6 deletions src/Sdk/Common/Common/RawHttpMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ namespace GitHub.Services.Common
public class RawHttpMessageHandler: HttpMessageHandler
{
public RawHttpMessageHandler(
VssOAuthCredential credentials)
FederatedCredential credentials)
: this(credentials, new RawClientHttpRequestSettings())
{
}

public RawHttpMessageHandler(
VssOAuthCredential credentials,
FederatedCredential credentials,
RawClientHttpRequestSettings settings)
: this(credentials, settings, new HttpClientHandler())
{
}

public RawHttpMessageHandler(
VssOAuthCredential credentials,
FederatedCredential credentials,
RawClientHttpRequestSettings settings,
HttpMessageHandler innerHandler)
{
Expand Down Expand Up @@ -56,7 +56,7 @@ public RawHttpMessageHandler(
/// <summary>
/// Gets the credentials associated with this handler.
/// </summary>
public VssOAuthCredential Credentials
public FederatedCredential Credentials
{
get;
private set;
Expand Down Expand Up @@ -111,7 +111,7 @@ protected override async Task<HttpResponseMessage> SendAsync(
// Ensure that we attempt to use the most appropriate authentication mechanism by default.
if (m_tokenProvider == null)
{
m_tokenProvider = this.Credentials.GetTokenProvider(request.RequestUri);
m_tokenProvider = this.Credentials.CreateTokenProvider(request.RequestUri, null, null);
}
}

Expand Down Expand Up @@ -254,7 +254,7 @@ private static void ApplySettings(
private CredentialWrapper m_credentialWrapper;
private object m_thisLock;
private const Int32 m_maxAuthRetries = 3;
private VssOAuthTokenProvider m_tokenProvider;
private IssuedTokenProvider m_tokenProvider;

//.Net Core does not attempt NTLM schema on Linux, unless ICredentials is a CredentialCache instance
//This workaround may not be needed after this corefx fix is consumed: https://github.com/dotnet/corefx/pull/7923
Expand Down
5 changes: 3 additions & 2 deletions src/Sdk/DTPipelines/Pipelines/AgentJobRequestMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ public AgentJobRequestMessage(
IList<String> fileTable,
TemplateToken jobOutputs,
IList<TemplateToken> defaults,
ActionsEnvironmentReference actionsEnvironment)
ActionsEnvironmentReference actionsEnvironment,
String messageType = JobRequestMessageTypes.PipelineAgentJobRequest)
{
this.MessageType = JobRequestMessageTypes.PipelineAgentJobRequest;
this.MessageType = messageType;
this.Plan = plan;
this.JobId = jobId;
this.JobDisplayName = jobDisplayName;
Expand Down
26 changes: 25 additions & 1 deletion src/Sdk/DTWebApi/WebApi/RunServiceHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,38 @@ public RunServiceHttpClient(
StreamID = messageId
};

requestUri = new Uri(requestUri, "acquirejob");

var payloadJson = JsonUtility.ToString(payload);
var requestContent = new StringContent(payloadJson, System.Text.Encoding.UTF8, "application/json");
return SendAsync<Pipelines.AgentJobRequestMessage>(
httpMethod,
additionalHeaders: null,
requestUri: requestUri,
content: requestContent,
cancellationToken: cancellationToken);
}

public Task CompleteJobAsync(
Uri requestUri,
Guid planId,
Guid jobId,
CancellationToken cancellationToken = default)
{
HttpMethod httpMethod = new HttpMethod("POST");
var payload = new {
PlanId = planId,
JobId = jobId
};

requestUri = new Uri(requestUri, "completejob");

var payloadJson = JsonUtility.ToString(payload);
var requestContent = new StringContent(payloadJson, System.Text.Encoding.UTF8, "application/json");
return SendAsync(
httpMethod,
requestUri,
content: requestContent,
cancellationToken: cancellationToken);
}
}
}
11 changes: 11 additions & 0 deletions src/Sdk/WebApi/WebApi/RawHttpClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ protected async Task<HttpResponseMessage> SendAsync(
}
}

protected Task<T> SendAsync<T>(
HttpMethod method,
Uri requestUri,
HttpContent content = null,
IEnumerable<KeyValuePair<String, String>> queryParameters = null,
Object userState = null,
CancellationToken cancellationToken = default(CancellationToken))
{
return SendAsync<T>(method, null, requestUri, content, queryParameters, userState, cancellationToken);
}

protected async Task<T> SendAsync<T>(
HttpMethod method,
IEnumerable<KeyValuePair<String, String>> additionalHeaders,
Expand Down
Loading

0 comments on commit 3aabb2f

Please sign in to comment.