-
Notifications
You must be signed in to change notification settings - Fork 958
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Broker Redirects for Session and Messages #3103
Merged
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
32282ea
Implement Broker Redirects for Session and Messages
luketomlinson d22e527
cleanup
luketomlinson 5022e2d
reenable session migration
luketomlinson bd6be32
lint + tests
luketomlinson 8954d10
encoding
luketomlinson a850dac
Add a simple BrokerMessageListener L0 test (just to get that ball rol…
pje 354dd93
reformat the new file
pje 875170f
add l0s
luketomlinson 39afd2f
fixes
luketomlinson a9670c3
tests don't hang
luketomlinson 8d865a7
tests still don't hang
luketomlinson f556fc3
fix test
luketomlinson c4372e1
cleanup
luketomlinson e5ef431
Implement DeleteSession
luketomlinson 63b6ae9
add l0
luketomlinson 238a54d
fix more tests
luketomlinson f3c1fbf
format
luketomlinson 2cc3396
tidy
luketomlinson fa6dbc4
Merge branch 'main' into luketomlinson/broker-redirects
luketomlinson File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,15 @@ public sealed class BrokerMessageListener : RunnerService, IMessageListener | |
private TimeSpan _getNextMessageRetryInterval; | ||
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; | ||
private CancellationTokenSource _getMessagesTokenSource; | ||
private VssCredentials _creds; | ||
private TaskAgentSession _session; | ||
private IBrokerServer _brokerServer; | ||
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new(); | ||
private bool _accessTokenRevoked = false; | ||
private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); | ||
private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); | ||
private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); | ||
|
||
|
||
public override void Initialize(IHostContext hostContext) | ||
{ | ||
|
@@ -36,13 +44,134 @@ public override void Initialize(IHostContext hostContext) | |
|
||
public async Task<Boolean> CreateSessionAsync(CancellationToken token) | ||
{ | ||
await RefreshBrokerConnection(); | ||
return await Task.FromResult(true); | ||
Trace.Entering(); | ||
|
||
// Settings | ||
var configManager = HostContext.GetService<IConfigurationManager>(); | ||
_settings = configManager.LoadSettings(); | ||
var serverUrl = _settings.ServerUrlV2; | ||
Trace.Info(_settings); | ||
|
||
if (string.IsNullOrEmpty(_settings.ServerUrlV2)) | ||
{ | ||
throw new InvalidOperationException("ServerUrlV2 is not set"); | ||
} | ||
|
||
// Create connection. | ||
Trace.Info("Loading Credentials"); | ||
var credMgr = HostContext.GetService<ICredentialManager>(); | ||
_creds = credMgr.LoadCredentials(); | ||
|
||
var agent = new TaskAgentReference | ||
{ | ||
Id = _settings.AgentId, | ||
Name = _settings.AgentName, | ||
Version = BuildConstants.RunnerPackage.Version, | ||
OSDescription = RuntimeInformation.OSDescription, | ||
}; | ||
string sessionName = $"{Environment.MachineName ?? "RUNNER"}"; | ||
var taskAgentSession = new TaskAgentSession(sessionName, agent); | ||
|
||
string errorMessage = string.Empty; | ||
bool encounteringError = false; | ||
|
||
while (true) | ||
{ | ||
token.ThrowIfCancellationRequested(); | ||
Trace.Info($"Attempt to create session."); | ||
try | ||
{ | ||
Trace.Info("Connecting to the Broker Server..."); | ||
await _brokerServer.ConnectAsync(new Uri(serverUrl), _creds); | ||
Trace.Info("VssConnection created"); | ||
|
||
_term.WriteLine(); | ||
_term.WriteSuccessMessage("Connected to GitHub"); | ||
_term.WriteLine(); | ||
|
||
_session = await _brokerServer.CreateSessionAsync(taskAgentSession, token); | ||
|
||
Trace.Info($"Session created."); | ||
if (encounteringError) | ||
{ | ||
_term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); | ||
_sessionCreationExceptionTracker.Clear(); | ||
encounteringError = false; | ||
} | ||
|
||
return true; | ||
} | ||
catch (OperationCanceledException) when (token.IsCancellationRequested) | ||
{ | ||
Trace.Info("Session creation has been cancelled."); | ||
throw; | ||
} | ||
catch (TaskAgentAccessTokenExpiredException) | ||
{ | ||
Trace.Info("Runner OAuth token has been revoked. Session creation failed."); | ||
_accessTokenRevoked = true; | ||
throw; | ||
} | ||
catch (Exception ex) | ||
{ | ||
Trace.Error("Catch exception during create session."); | ||
Trace.Error(ex); | ||
|
||
if (ex is VssOAuthTokenRequestException vssOAuthEx && _creds.Federated is VssOAuthCredential vssOAuthCred) | ||
{ | ||
// "invalid_client" means the runner registration has been deleted from the server. | ||
if (string.Equals(vssOAuthEx.Error, "invalid_client", StringComparison.OrdinalIgnoreCase)) | ||
{ | ||
_term.WriteError("Failed to create a session. The runner registration has been deleted from the server, please re-configure. Runner registrations are automatically deleted for runners that have not connected to the service recently."); | ||
return false; | ||
} | ||
|
||
// Check whether we get 401 because the runner registration already removed by the service. | ||
// If the runner registration get deleted, we can't exchange oauth token. | ||
Trace.Error("Test oauth app registration."); | ||
var oauthTokenProvider = new VssOAuthTokenProvider(vssOAuthCred, new Uri(serverUrl)); | ||
var authError = await oauthTokenProvider.ValidateCredentialAsync(token); | ||
if (string.Equals(authError, "invalid_client", StringComparison.OrdinalIgnoreCase)) | ||
{ | ||
_term.WriteError("Failed to create a session. The runner registration has been deleted from the server, please re-configure. Runner registrations are automatically deleted for runners that have not connected to the service recently."); | ||
return false; | ||
} | ||
} | ||
|
||
if (!IsSessionCreationExceptionRetriable(ex)) | ||
{ | ||
_term.WriteError($"Failed to create session. {ex.Message}"); | ||
return false; | ||
} | ||
|
||
if (!encounteringError) //print the message only on the first error | ||
{ | ||
_term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); | ||
encounteringError = true; | ||
} | ||
|
||
Trace.Info("Sleeping for {0} seconds before retrying.", _sessionCreationRetryInterval.TotalSeconds); | ||
await HostContext.Delay(_sessionCreationRetryInterval, token); | ||
} | ||
} | ||
} | ||
|
||
public async Task DeleteSessionAsync() | ||
{ | ||
await Task.CompletedTask; | ||
if (_session != null && _session.SessionId != Guid.Empty) | ||
{ | ||
if (!_accessTokenRevoked) | ||
{ | ||
using (var ts = new CancellationTokenSource(TimeSpan.FromSeconds(30))) | ||
{ | ||
await _brokerServer.DeleteSessionAsync(ts.Token); | ||
} | ||
} | ||
else | ||
{ | ||
Trace.Warning("Runner OAuth token has been revoked. Skip deleting session."); | ||
} | ||
} | ||
} | ||
|
||
public void OnJobStatus(object sender, JobStatusEventArgs e) | ||
|
@@ -73,12 +202,13 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token) | |
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); | ||
try | ||
{ | ||
message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, | ||
message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId, | ||
runnerStatus, | ||
BuildConstants.RunnerPackage.Version, | ||
VarUtil.OS, | ||
VarUtil.OSArchitecture, | ||
_settings.DisableUpdate); | ||
_settings.DisableUpdate, | ||
_getMessagesTokenSource.Token); | ||
|
||
if (message == null) | ||
{ | ||
|
@@ -196,12 +326,84 @@ ex is AccessDeniedException || | |
} | ||
} | ||
|
||
private bool IsSessionCreationExceptionRetriable(Exception ex) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also copied |
||
{ | ||
if (ex is TaskAgentNotFoundException) | ||
{ | ||
Trace.Info("The runner no longer exists on the server. Stopping the runner."); | ||
_term.WriteError("The runner no longer exists on the server. Please reconfigure the runner."); | ||
return false; | ||
} | ||
else if (ex is TaskAgentSessionConflictException) | ||
{ | ||
Trace.Info("The session for this runner already exists."); | ||
_term.WriteError("A session for this runner already exists."); | ||
if (_sessionCreationExceptionTracker.ContainsKey(nameof(TaskAgentSessionConflictException))) | ||
{ | ||
_sessionCreationExceptionTracker[nameof(TaskAgentSessionConflictException)]++; | ||
if (_sessionCreationExceptionTracker[nameof(TaskAgentSessionConflictException)] * _sessionCreationRetryInterval.TotalSeconds >= _sessionConflictRetryLimit.TotalSeconds) | ||
{ | ||
Trace.Info("The session conflict exception have reached retry limit."); | ||
_term.WriteError($"Stop retry on SessionConflictException after retried for {_sessionConflictRetryLimit.TotalSeconds} seconds."); | ||
return false; | ||
} | ||
} | ||
else | ||
{ | ||
_sessionCreationExceptionTracker[nameof(TaskAgentSessionConflictException)] = 1; | ||
} | ||
|
||
Trace.Info("The session conflict exception haven't reached retry limit."); | ||
return true; | ||
} | ||
else if (ex is VssOAuthTokenRequestException && ex.Message.Contains("Current server time is")) | ||
{ | ||
Trace.Info("Local clock might be skewed."); | ||
_term.WriteError("The local machine's clock may be out of sync with the server time by more than five minutes. Please sync your clock with your domain or internet time and try again."); | ||
if (_sessionCreationExceptionTracker.ContainsKey(nameof(VssOAuthTokenRequestException))) | ||
{ | ||
_sessionCreationExceptionTracker[nameof(VssOAuthTokenRequestException)]++; | ||
if (_sessionCreationExceptionTracker[nameof(VssOAuthTokenRequestException)] * _sessionCreationRetryInterval.TotalSeconds >= _clockSkewRetryLimit.TotalSeconds) | ||
{ | ||
Trace.Info("The OAuth token request exception have reached retry limit."); | ||
_term.WriteError($"Stopped retrying OAuth token request exception after {_clockSkewRetryLimit.TotalSeconds} seconds."); | ||
return false; | ||
} | ||
} | ||
else | ||
{ | ||
_sessionCreationExceptionTracker[nameof(VssOAuthTokenRequestException)] = 1; | ||
} | ||
|
||
Trace.Info("The OAuth token request exception haven't reached retry limit."); | ||
return true; | ||
} | ||
else if (ex is TaskAgentPoolNotFoundException || | ||
ex is AccessDeniedException || | ||
ex is VssUnauthorizedException) | ||
{ | ||
Trace.Info($"Non-retriable exception: {ex.Message}"); | ||
return false; | ||
} | ||
|
||
else if (ex is InvalidOperationException) | ||
{ | ||
Trace.Info($"Non-retriable exception: {ex.Message}"); | ||
return false; | ||
} | ||
else | ||
{ | ||
Trace.Info($"Retriable exception: {ex.Message}"); | ||
return true; | ||
} | ||
} | ||
|
||
private async Task RefreshBrokerConnection() | ||
{ | ||
var configManager = HostContext.GetService<IConfigurationManager>(); | ||
_settings = configManager.LoadSettings(); | ||
|
||
if (_settings.ServerUrlV2 == null) | ||
if (string.IsNullOrEmpty(_settings.ServerUrlV2)) | ||
{ | ||
throw new InvalidOperationException("ServerUrlV2 is not set"); | ||
} | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is all copied from MessageListener. Eventually, one of these will be deprecated, but keeping them in sync for now