Skip to content

Commit

Permalink
Collect telemetry to measure upload speed for different backend. (#2912)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingluoHuang authored Oct 6, 2023
1 parent e25c754 commit f672567
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
41 changes: 39 additions & 2 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -14,10 +15,11 @@ namespace GitHub.Runner.Common
[ServiceLocator(Default = typeof(JobServerQueue))]
public interface IJobServerQueue : IRunnerService, IThrottlingReporter
{
IList<JobTelemetry> JobTelemetries { get; }
TaskCompletionSource<int> JobRecordUpdated { get; }
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
Task ShutdownAsync();
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false);
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false, bool enableTelemetry = false);
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
Expand Down Expand Up @@ -69,13 +71,18 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private Task[] _allDequeueTasks;
private readonly TaskCompletionSource<int> _jobCompletionSource = new();
private readonly TaskCompletionSource<int> _jobRecordUpdated = new();
private readonly List<JobTelemetry> _jobTelemetries = new();
private bool _queueInProcess = false;
private bool _resultsServiceOnly = false;
private Stopwatch _resultsUploadTimer = new();
private Stopwatch _actionsUploadTimer = new();

public TaskCompletionSource<int> JobRecordUpdated => _jobRecordUpdated;

public event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;

public IList<JobTelemetry> JobTelemetries => _jobTelemetries;

// The web console dequeue starts by processing the queue every 250ms for the first 60*4 times (~60 seconds).
// After that, the dequeue happens every 500ms.
// In this way, customers can still get instant live console output on job start,
Expand All @@ -87,6 +94,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private bool _firstConsoleOutputs = true;

private bool _resultsClientInitiated = false;
private bool _enableTelemetry = false;
private delegate Task ResultsFileUploadHandler(ResultsUploadFileInfo file);

public override void Initialize(IHostContext hostContext)
Expand All @@ -96,10 +104,11 @@ public override void Initialize(IHostContext hostContext)
_resultsServer = hostContext.GetService<IResultsServer>();
}

public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false)
public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false, bool enableTelemetry = false)
{
Trace.Entering();
_resultsServiceOnly = resultsServiceOnly;
_enableTelemetry = enableTelemetry;

var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));

Expand Down Expand Up @@ -211,6 +220,12 @@ public async Task ShutdownAsync()
await _resultsServer.DisposeAsync();

Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
if (_enableTelemetry)
{
var uploadTimeComparison = $"Actions upload time: {_actionsUploadTimer.ElapsedMilliseconds} ms, Result upload time: {_resultsUploadTimer.ElapsedMilliseconds} ms";
Trace.Info(uploadTimeComparison);
_jobTelemetries.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = uploadTimeComparison });
}
}

public void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber)
Expand Down Expand Up @@ -456,6 +471,10 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
{
try
{
if (_enableTelemetry)
{
_actionsUploadTimer.Start();
}
await UploadFile(file);
}
catch (Exception ex)
Expand All @@ -471,6 +490,13 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
// _fileUploadQueue.Enqueue(file);
//}
}
finally
{
if (_enableTelemetry)
{
_actionsUploadTimer.Stop();
}
}
}

Trace.Info("Try to upload {0} log files or attachments, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
Expand Down Expand Up @@ -517,6 +543,10 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
{
try
{
if (_enableTelemetry)
{
_resultsUploadTimer.Start();
}
if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase))
{
await UploadSummaryFile(file);
Expand Down Expand Up @@ -548,6 +578,13 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
SendResultsTelemetry(ex);
}
}
finally
{
if (_enableTelemetry)
{
_resultsUploadTimer.Stop();
}
}
}

Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
Expand Down
18 changes: 16 additions & 2 deletions src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
HostContext.UserAgents.Add(new ProductInfoHeaderValue("OrchestrationId", orchestrationId.Value));
}

var jobServerQueueTelemetry = false;
if (message.Variables.TryGetValue("DistributedTask.EnableJobServerQueueTelemetry", out VariableValue enableJobServerQueueTelemetry) &&
!string.IsNullOrEmpty(enableJobServerQueueTelemetry?.Value))
{
jobServerQueueTelemetry = StringUtil.ConvertToBoolean(enableJobServerQueueTelemetry.Value);
}

ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
if (MessageUtil.IsRunServiceJob(message.MessageType))
{
Expand All @@ -72,7 +79,7 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
launchServer.InitializeLaunchClient(new Uri(launchReceiverEndpoint), accessToken);
}
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
_jobServerQueue.Start(message, resultsServiceOnly: true);
_jobServerQueue.Start(message, resultsServiceOnly: true, enableTelemetry: jobServerQueueTelemetry);
}
else
{
Expand All @@ -94,7 +101,7 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, delegatingHandlers);
await jobServer.ConnectAsync(jobConnection);

_jobServerQueue.Start(message);
_jobServerQueue.Start(message, enableTelemetry: jobServerQueueTelemetry);
server = jobServer;
}

Expand Down Expand Up @@ -405,6 +412,13 @@ private async Task<TaskResult> CompleteJobAsync(IJobServer jobServer, IExecution
result = TaskResultUtil.MergeTaskResults(result, TaskResult.Failed);
}

// include any job telemetry from the background upload process.
if (_jobServerQueue != null &&
_jobServerQueue.JobTelemetries.Count > 0)
{
jobContext.Global.JobTelemetry.AddRange(_jobServerQueue.JobTelemetries);
}

// Clean TEMP after the job server queue has finished processing, since a pending file upload might still be using the TEMP dir.
_tempDirectoryManager?.CleanupTempDirectory();

Expand Down
1 change: 1 addition & 0 deletions src/Sdk/DTWebApi/WebApi/JobTelemetryType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace GitHub.DistributedTask.WebApi
{
// Do NOT add new enum values, since doing so will break backward compatibility with GHES.
public enum JobTelemetryType
{
[EnumMember]
Expand Down

0 comments on commit f672567

Please sign in to comment.