Skip to content

Commit

Permalink
Merge pull request #119 from Drawaes/FixingLeadership
Browse files Browse the repository at this point in the history
Add usings to HttpResponses
  • Loading branch information
Drawaes authored Jun 17, 2019
2 parents c37f571 + b43ad94 commit 8e8f0d9
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 140 deletions.
1 change: 1 addition & 0 deletions CondenserDotNet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Release Notes", "Release No
releasenotes\4.1.1.props = releasenotes\4.1.1.props
releasenotes\4.1.10.props = releasenotes\4.1.10.props
releasenotes\4.1.11.props = releasenotes\4.1.11.props
releasenotes\4.1.13.props = releasenotes\4.1.13.props
releasenotes\4.1.3.props = releasenotes\4.1.3.props
releasenotes\4.1.4.props = releasenotes\4.1.4.props
releasenotes\4.1.9.props = releasenotes\4.1.9.props
Expand Down
7 changes: 7 additions & 0 deletions releasenotes/4.1.13.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<Project>
<PropertyGroup>
<PackageReleaseNotes>
* Fix potential leaks by ensuring HttpResponses are disposed correctly
</PackageReleaseNotes>
</PropertyGroup>
</Project>
52 changes: 27 additions & 25 deletions src/CondenserDotNet.Client/Leadership/LeaderWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,35 @@ private async Task TryForElection()
for (var i = 0; i < 10; i++)
{
CondenserEventSource.Log.LeadershipSessionGetStatus(_keyToWatch);
leaderResult = await _serviceManager.Client.GetAsync($"{KeyPath}{_keyToWatch}?index={_consulIndex}");
if (!leaderResult.IsSuccessStatusCode)
using (leaderResult = await _serviceManager.Client.GetAsync($"{KeyPath}{_keyToWatch}?index={_consulIndex}"))
{
_currentLeaderEvent.Reset();
_electedLeaderEvent.Reset();
//error so return to create session
return;
if (!leaderResult.IsSuccessStatusCode)
{
_currentLeaderEvent.Reset();
_electedLeaderEvent.Reset();
//error so return to create session
return;
}
var kv = JsonConvert.DeserializeObject<KeyValue[]>(await leaderResult.Content.ReadAsStringAsync());
if (string.IsNullOrWhiteSpace(kv[0].Session))
{
_currentLeaderEvent.Reset();
_electedLeaderEvent.Reset();
break;
}
var infoService = JsonConvert.DeserializeObject<InformationService>(kv[0].ValueFromBase64());
_currentLeaderEvent.Set(infoService);
_callback?.Invoke(infoService);
if (Guid.Parse(kv[0].Session) == _sessionId)
{
_electedLeaderEvent.Set(true);
}
else
{
_electedLeaderEvent.Reset();
}
_consulIndex = leaderResult.GetConsulIndex();
}
var kv = JsonConvert.DeserializeObject<KeyValue[]>(await leaderResult.Content.ReadAsStringAsync());
if (string.IsNullOrWhiteSpace(kv[0].Session))
{
_currentLeaderEvent.Reset();
_electedLeaderEvent.Reset();
break;
}
var infoService = JsonConvert.DeserializeObject<InformationService>(kv[0].ValueFromBase64());
_currentLeaderEvent.Set(infoService);
_callback?.Invoke(infoService);
if (Guid.Parse(kv[0].Session) == _sessionId)
{
_electedLeaderEvent.Set(true);
}
else
{
_electedLeaderEvent.Reset();
}
_consulIndex = leaderResult.GetConsulIndex();
}
}
}
Expand Down
50 changes: 26 additions & 24 deletions src/CondenserDotNet.Client/Leadership/LeaderWatcherNew.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,35 +104,37 @@ private async Task WaitForLeadershipChange()
{
while(true)
{
var leaderResult = await _serviceManager.Client.GetAsync($"{KeyPath}{_keyToWatch}?index={_consulIndex}");
if(!leaderResult.IsSuccessStatusCode)
using (var leaderResult = await _serviceManager.Client.GetAsync($"{KeyPath}{_keyToWatch}?index={_consulIndex}"))
{
//Lock deleted
if(leaderResult.StatusCode == System.Net.HttpStatusCode.NotFound)
if (!leaderResult.IsSuccessStatusCode)
{
_electedLeaderEvent.Reset();
//Lock deleted
if (leaderResult.StatusCode == System.Net.HttpStatusCode.NotFound)
{
_electedLeaderEvent.Reset();
_currentInfoService.Reset();
return;
}
await Task.Delay(500);
continue;
}
var kv = JsonConvert.DeserializeObject<KeyValue[]>(await leaderResult.Content.ReadAsStringAsync());
if (string.IsNullOrWhiteSpace(kv[0].Session))
{
//no one has leadership
_currentInfoService.Reset();
_electedLeaderEvent.Reset();
return;
}
var infoService = JsonConvert.DeserializeObject<InformationService>(kv[0].ValueFromBase64());
_currentInfoService.Set(infoService);
_consulIndex = leaderResult.GetConsulIndex();
_callback?.Invoke(infoService);
if (await _sessionIdTask != new Guid(kv[0].Session))
{
_electedLeaderEvent.Reset();
return;
}
await Task.Delay(500);
continue;
}
var kv = JsonConvert.DeserializeObject<KeyValue[]>(await leaderResult.Content.ReadAsStringAsync());
if(string.IsNullOrWhiteSpace(kv[0].Session))
{
//no one has leadership
_currentInfoService.Reset();
_electedLeaderEvent.Reset();
return;
}
var infoService = JsonConvert.DeserializeObject<InformationService>(kv[0].ValueFromBase64());
_currentInfoService.Set(infoService);
_consulIndex = leaderResult.GetConsulIndex();
_callback?.Invoke(infoService);
if(await _sessionIdTask != new Guid(kv[0].Session))
{
_electedLeaderEvent.Reset();
return;
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions src/CondenserDotNet.Client/Services/ServiceRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ public async Task<IEnumerable<string>> GetAvailableServicesAsync()

public async Task<Dictionary<string, string[]>> GetAvailableServicesWithTagsAsync()
{
var result = await _client.GetAsync(HttpUtils.ServiceCatalogUrl);
if (!result.IsSuccessStatusCode)
using (var result = await _client.GetAsync(HttpUtils.ServiceCatalogUrl))
{
return null;
if (!result.IsSuccessStatusCode)
{
return null;
}
var content = await result.Content.ReadAsStringAsync();
var serviceList = JsonConvert.DeserializeObject<Dictionary<string, string[]>>(content);
return serviceList;
}
var content = await result.Content.ReadAsStringAsync();
var serviceList = JsonConvert.DeserializeObject<Dictionary<string, string[]>>(content);
return serviceList;
}

public Task<InformationService> GetServiceInstanceAsync(string serviceName)
Expand Down
38 changes: 20 additions & 18 deletions src/CondenserDotNet.Client/Services/ServiceWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,31 @@ private async Task WatcherLoop(HttpClient client)
var consulIndex = "0";
while (!_cancelationToken.Token.IsCancellationRequested)
{
var result = await client.GetAsync(_url + consulIndex, _cancelationToken.Token);
if (!result.IsSuccessStatusCode)
using (var result = await client.GetAsync(_url + consulIndex, _cancelationToken.Token))
{
if (_state == WatcherState.UsingLiveValues)
if (!result.IsSuccessStatusCode)
{
_state = WatcherState.UsingCachedValues;
if (_state == WatcherState.UsingLiveValues)
{
_state = WatcherState.UsingCachedValues;
}
await Task.Delay(1000);
continue;
}
await Task.Delay(1000);
continue;
}
var newConsulIndex = result.GetConsulIndex();
if (newConsulIndex == consulIndex)
{
var newConsulIndex = result.GetConsulIndex();
if (newConsulIndex == consulIndex)
{
continue;
}
consulIndex = newConsulIndex;
var content = await result.Content.ReadAsStringAsync();
var instance = JsonConvert.DeserializeObject<List<InformationServiceSet>>(content);
Volatile.Write(ref _instances, instance);
_listCallback?.Invoke(instance);
_state = WatcherState.UsingLiveValues;
_completionSource.TrySetResult(true);
continue;
}
consulIndex = newConsulIndex;
var content = await result.Content.ReadAsStringAsync();
var instance = JsonConvert.DeserializeObject<List<InformationServiceSet>>(content);
Volatile.Write(ref _instances, instance);
_listCallback?.Invoke(instance);
_state = WatcherState.UsingLiveValues;
_completionSource.TrySetResult(true);
continue;
}
}
catch (Exception ex)
Expand Down
66 changes: 36 additions & 30 deletions src/CondenserDotNet.Configuration/Consul/ConsulConfigSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,17 @@ public async Task<KeyOperationResult> GetKeysAsync(string keyPath)
try
{
CondenserEventSource.Log.ConfigurationGetKeysRecursive(keyPath);
var response = await _httpClient.GetAsync($"{ConsulKeyPath}{keyPath}?recurse");
if (!response.IsSuccessStatusCode)
using (var response = await _httpClient.GetAsync($"{ConsulKeyPath}{keyPath}?recurse"))
{
_logger?.LogWarning("We didn't get a succesful response from consul code was {code}", response.StatusCode);
return new KeyOperationResult() { Success = false, Dictionary = null };
if (!response.IsSuccessStatusCode)
{
_logger?.LogWarning("We didn't get a succesful response from consul code was {code}", response.StatusCode);
return new KeyOperationResult() { Success = false, Dictionary = null };
}

var dictionary = await BuildDictionaryAsync(keyPath, response);
return new KeyOperationResult() { Success = true, Dictionary = dictionary };
}

var dictionary = await BuildDictionaryAsync(keyPath, response);
return new KeyOperationResult() { Success = true, Dictionary = dictionary };
}
catch (Exception ex)
{
Expand All @@ -86,17 +88,19 @@ public async Task<KeyOperationResult> GetKeysAsync(string keyPath)
try
{
CondenserEventSource.Log.ConfigurationGetKey(keyPath);
var response = await _httpClient.GetAsync($"{ConsulKeyPath}{keyPath}");
if (!response.IsSuccessStatusCode)
using (var response = await _httpClient.GetAsync($"{ConsulKeyPath}{keyPath}"))
{
_logger?.LogWarning("We didn't get a successful response from consul code was {code}", response.StatusCode);
return (false, null);
if (!response.IsSuccessStatusCode)
{
_logger?.LogWarning("We didn't get a successful response from consul code was {code}", response.StatusCode);
return (false, null);
}

var content = await response.Content.ReadAsStringAsync();
var keys = JsonConvert.DeserializeObject<KeyValue[]>(content);
if (keys.Length != 1) return (false, null);
return (true, keys[0].Value);
}

var content = await response.Content.ReadAsStringAsync();
var keys = JsonConvert.DeserializeObject<KeyValue[]>(content);
if (keys.Length != 1) return (false, null);
return (true, keys[0].Value);
}
catch (Exception exception)
{
Expand All @@ -113,23 +117,25 @@ public async Task<KeyOperationResult> TryWatchKeysAsync(string keyPath, object s
try
{
CondenserEventSource.Log.ConfigurationWatchKey(keyPath);
var response = await _httpClient.GetAsync(url + consulState.ConsulIndex, _disposed.Token);
var newConsulIndex = response.GetConsulIndex();

if (!response.IsSuccessStatusCode)
{
consulState.ConsulIndex = newConsulIndex;
return default;
}

if (newConsulIndex == consulState.ConsulIndex)
using (var response = await _httpClient.GetAsync(url + consulState.ConsulIndex, _disposed.Token))
{
var newConsulIndex = response.GetConsulIndex();

if (!response.IsSuccessStatusCode)
{
consulState.ConsulIndex = newConsulIndex;
return default;
}

if (newConsulIndex == consulState.ConsulIndex)
{
consulState.ConsulIndex = newConsulIndex;
return default;
}
consulState.ConsulIndex = newConsulIndex;
return default;
var dictionary = await BuildDictionaryAsync(keyPath, response);
return new KeyOperationResult() { Success = true, Dictionary = dictionary };
}
consulState.ConsulIndex = newConsulIndex;
var dictionary = await BuildDictionaryAsync(keyPath, response);
return new KeyOperationResult() { Success = true, Dictionary = dictionary };
}
catch (Exception ex)
{
Expand Down
28 changes: 9 additions & 19 deletions src/CondenserDotNet.Core/HttpUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,11 @@ public static HttpClient CreateClient(IConsulAclProvider aclProvider, string age
var port = agentPort ?? DefaultPort;

var uri = new UriBuilder("http", host, port);
HttpClient client;
#if NET452
System.Net.ServicePointManager.DefaultConnectionLimit = 50;
client = new HttpClient()
var client = new HttpClient(new HttpClientHandler() { MaxConnectionsPerServer = 50 })
{
BaseAddress = uri.Uri,
Timeout = DefaultTimeout
};
#else
client = new HttpClient(new HttpClientHandler() { MaxConnectionsPerServer = 50 })
{
BaseAddress = uri.Uri,
Timeout = DefaultTimeout
};
#endif
var token = aclProvider?.GetAclToken();
if(!string.IsNullOrEmpty(token))
{
Expand All @@ -62,17 +52,17 @@ public static HttpClient CreateClient(IConsulAclProvider aclProvider, string age
return client;
}

public static Task<T> GetObject<T>(this HttpContent content) =>
content.ReadAsStringAsync().ContinueWith(sTask =>
public static async Task<T> GetObject<T>(this HttpContent content)
{
return JsonConvert.DeserializeObject<T>(sTask.Result);
});
var result = await content.ReadAsStringAsync();
return JsonConvert.DeserializeObject<T>(result);
}

public static Task<T> GetAsync<T>(this HttpClient client, string uri) =>
client.GetStringAsync(uri).ContinueWith(resultTask =>
public static async Task<T> GetAsync<T>(this HttpClient client, string uri)
{
return JsonConvert.DeserializeObject<T>(resultTask.Result);
});
var result = await client.GetStringAsync(uri);
return JsonConvert.DeserializeObject<T>(result);
}

public static StringContent GetStringContent(string stringForContent)
{
Expand Down
Loading

0 comments on commit 8e8f0d9

Please sign in to comment.