Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect telemetry to measure upload speed for different backends. #2912

Merged
merged 1 commit into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -14,10 +15,11 @@ namespace GitHub.Runner.Common
[ServiceLocator(Default = typeof(JobServerQueue))]
public interface IJobServerQueue : IRunnerService, IThrottlingReporter
{
IList<JobTelemetry> JobTelemetries { get; }
TaskCompletionSource<int> JobRecordUpdated { get; }
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
Task ShutdownAsync();
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false);
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false, bool enableTelemetry = false);
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
Expand Down Expand Up @@ -69,13 +71,18 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private Task[] _allDequeueTasks;
private readonly TaskCompletionSource<int> _jobCompletionSource = new();
private readonly TaskCompletionSource<int> _jobRecordUpdated = new();
private readonly List<JobTelemetry> _jobTelemetries = new();
private bool _queueInProcess = false;
private bool _resultsServiceOnly = false;
private Stopwatch _resultsUploadTimer = new();
private Stopwatch _actionsUploadTimer = new();

public TaskCompletionSource<int> JobRecordUpdated => _jobRecordUpdated;

public event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;

public IList<JobTelemetry> JobTelemetries => _jobTelemetries;

// The web console dequeue starts by processing the queue every 250ms for the first 60*4 times (~60 seconds).
// After that, the dequeue happens every 500ms.
// This way, customers can still get instant live console output on job start,
Expand All @@ -87,6 +94,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private bool _firstConsoleOutputs = true;

private bool _resultsClientInitiated = false;
private bool _enableTelemetry = false;
private delegate Task ResultsFileUploadHandler(ResultsUploadFileInfo file);

public override void Initialize(IHostContext hostContext)
Expand All @@ -96,10 +104,11 @@ public override void Initialize(IHostContext hostContext)
_resultsServer = hostContext.GetService<IResultsServer>();
}

public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false)
public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false, bool enableTelemetry = false)
{
Trace.Entering();
_resultsServiceOnly = resultsServiceOnly;
_enableTelemetry = enableTelemetry;

var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));

Expand Down Expand Up @@ -211,6 +220,12 @@ public async Task ShutdownAsync()
await _resultsServer.DisposeAsync();

Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
if (_enableTelemetry)
{
var uploadTimeComparison = $"Actions upload time: {_actionsUploadTimer.ElapsedMilliseconds} ms, Result upload time: {_resultsUploadTimer.ElapsedMilliseconds} ms";
Trace.Info(uploadTimeComparison);
_jobTelemetries.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = uploadTimeComparison });
}
}

public void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber)
Expand Down Expand Up @@ -456,6 +471,10 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
{
try
{
if (_enableTelemetry)
{
_actionsUploadTimer.Start();
}
await UploadFile(file);
}
catch (Exception ex)
Expand All @@ -471,6 +490,13 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
// _fileUploadQueue.Enqueue(file);
//}
}
finally
{
if (_enableTelemetry)
{
_actionsUploadTimer.Stop();
}
}
}

Trace.Info("Try to upload {0} log files or attachments, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
Expand Down Expand Up @@ -517,6 +543,10 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
{
try
{
if (_enableTelemetry)
{
_resultsUploadTimer.Start();
}
if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase))
{
await UploadSummaryFile(file);
Expand Down Expand Up @@ -548,6 +578,13 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
SendResultsTelemetry(ex);
}
}
finally
{
if (_enableTelemetry)
{
_resultsUploadTimer.Stop();
}
}
}

Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
Expand Down
18 changes: 16 additions & 2 deletions src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
HostContext.UserAgents.Add(new ProductInfoHeaderValue("OrchestrationId", orchestrationId.Value));
}

var jobServerQueueTelemetry = false;
if (message.Variables.TryGetValue("DistributedTask.EnableJobServerQueueTelemetry", out VariableValue enableJobServerQueueTelemetry) &&
!string.IsNullOrEmpty(enableJobServerQueueTelemetry?.Value))
{
jobServerQueueTelemetry = StringUtil.ConvertToBoolean(enableJobServerQueueTelemetry.Value);
}

ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
if (MessageUtil.IsRunServiceJob(message.MessageType))
{
Expand All @@ -72,7 +79,7 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
launchServer.InitializeLaunchClient(new Uri(launchReceiverEndpoint), accessToken);
}
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
_jobServerQueue.Start(message, resultsServiceOnly: true);
_jobServerQueue.Start(message, resultsServiceOnly: true, enableTelemetry: jobServerQueueTelemetry);
}
else
{
Expand All @@ -94,7 +101,7 @@ public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, Cancellat
VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, delegatingHandlers);
await jobServer.ConnectAsync(jobConnection);

_jobServerQueue.Start(message);
_jobServerQueue.Start(message, enableTelemetry: jobServerQueueTelemetry);
server = jobServer;
}

Expand Down Expand Up @@ -405,6 +412,13 @@ private async Task<TaskResult> CompleteJobAsync(IJobServer jobServer, IExecution
result = TaskResultUtil.MergeTaskResults(result, TaskResult.Failed);
}

// include any job telemetry from the background upload process.
if (_jobServerQueue != null &&
_jobServerQueue.JobTelemetries.Count > 0)
{
jobContext.Global.JobTelemetry.AddRange(_jobServerQueue.JobTelemetries);
}

// Clean TEMP after the job server queue has finished processing, since a pending file upload might still be using the TEMP dir.
_tempDirectoryManager?.CleanupTempDirectory();

Expand Down
1 change: 1 addition & 0 deletions src/Sdk/DTWebApi/WebApi/JobTelemetryType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace GitHub.DistributedTask.WebApi
{
// Do NOT add new enum values, since doing so will break backward compatibility with GHES.
public enum JobTelemetryType
{
[EnumMember]
Expand Down