Skip to content

Commit

Permalink
poll and watch k8s (#1989)
Browse files Browse the repository at this point in the history
* poll and watch k8s
  • Loading branch information
rogeralsing authored May 10, 2023
1 parent 782d21f commit be46393
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 17 deletions.
102 changes: 86 additions & 16 deletions src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ internal class KubernetesClusterMonitor : IActor
private string _address;
private string _clusterName;
private IKubernetes _kubernetes;
private DateTime _lastRestart;
private string _podName;
private bool _stopping;
private Watcher<V1Pod> _watcher;
Expand All @@ -46,7 +45,7 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
RegisterMember cmd => Register(cmd),
StartWatchingCluster cmd => StartWatchingCluster(cmd.ClusterName, context),
StartWatchingCluster _ => StartWatchingCluster(context),
DeregisterMember => StopWatchingCluster(),
Stopping => StopWatchingCluster(),
_ => Task.CompletedTask
Expand Down Expand Up @@ -80,20 +79,31 @@ private Task StopWatchingCluster()
return Task.CompletedTask;
}

private Task StartWatchingCluster(string clusterName, ISenderContext context)
private async Task StartWatchingCluster(IContext context)
{
var selector = $"{LabelCluster}={clusterName}";

Logger.Log(_config.DebugLogLevel, "[Cluster][KubernetesProvider] Starting to watch pods with {Selector}",
selector);
try
{
await Poll();
}
catch (Exception x)
{
Logger.LogError(x, "[Cluster][KubernetesProvider] Failed to poll the Kubernetes API");
}

_watcherTask = _kubernetes.ListNamespacedPodWithHttpMessagesAsync(
KubernetesExtensions.GetKubeNamespace(),
labelSelector: selector,
watch: true,
timeoutSeconds: _config.WatchTimeoutSeconds
);
if (!_config.DisableWatch)
{
await Watch();
}

await Task.Delay(1000);

context.Send(context.Self, new StartWatchingCluster(_clusterName));
}

private Task Watch()
{
var tcs = new TaskCompletionSource();
_watcherTask = GetListTask(_clusterName);
_watcher = _watcherTask.Watch<V1Pod, V1PodList>(Watch, Error, Closed);
_watching = true;

Expand Down Expand Up @@ -127,16 +137,71 @@ void Closed()

void Restart()
{
_lastRestart = DateTime.UtcNow;
_watching = false;

DisposeWatcher();
DisposeWatcherTask();

context.Send(context.Self!, new StartWatchingCluster(_clusterName));
tcs.SetResult();
}

return Task.CompletedTask;
return tcs.Task;
}

/// <summary>
/// Fetches the current pod list for this cluster once and reconciles
/// <c>_clusterPods</c> with it: listed pods that carry this cluster's label are
/// added or refreshed, pods that are no longer listed are removed, and the
/// topology is then recomputed via <c>UpdateTopology</c>.
/// </summary>
private async Task Poll()
{
    // Polling needs a plain list response. A watch request streams change events
    // instead of returning a pod list, so the list must be requested with watch: false.
    // The response wraps the HTTP request/response messages; dispose it because this
    // method is re-run continuously by the monitor loop.
    using var response = await GetListTask(_clusterName, watch: false);
    var pods = response.Body.Items;

    foreach (var eventPod in pods)
    {
        var podLabels = eventPod.Metadata.Labels;

        // A pod without labels (or without the cluster label) is not a cluster member.
        if (podLabels is null || !podLabels.TryGetValue(LabelCluster, out var podClusterName))
        {
            Logger.LogInformation(
                "[Cluster][KubernetesProvider] The pod {PodName} is not a Proto.Cluster node",
                eventPod.Metadata.Name
            );

            continue;
        }

        if (_clusterName != podClusterName)
        {
            // Log the cluster the pod actually belongs to, not our own cluster name.
            Logger.LogInformation(
                "[Cluster][KubernetesProvider] The pod {PodName} is from another cluster {Cluster}",
                eventPod.Metadata.Name, podClusterName
            );

            continue;
        }

        _clusterPods[eventPod.Uid()] = eventPod;
    }

    // Drop every tracked pod that is absent from the fresh listing.
    var listedUids = pods.Select(p => p.Uid()).ToHashSet();
    var staleUids = _clusterPods.Keys.Where(uid => !listedUids.Contains(uid)).ToList();

    foreach (var uid in staleUids)
    {
        _clusterPods.Remove(uid);
    }

    UpdateTopology();
}

/// <summary>
/// Starts a "list namespaced pods" request filtered to pods labelled with the given cluster name.
/// </summary>
/// <param name="clusterName">Value of the cluster label used to build the label selector.</param>
/// <param name="watch">
/// When <c>true</c> (the default, preserving existing callers) the request is issued as a
/// watch; when <c>false</c> a one-off list is requested, which is what polling needs.
/// </param>
/// <returns>The pending HTTP operation for the list or watch request.</returns>
private Task<HttpOperationResponse<V1PodList>> GetListTask(string clusterName, bool watch = true)
{
    var selector = $"{LabelCluster}={clusterName}";

    if (watch)
    {
        Logger.Log(_config.DebugLogLevel, "[Cluster][KubernetesProvider] Starting to watch pods with {Selector}",
            selector);
    }
    else
    {
        Logger.Log(_config.DebugLogLevel, "[Cluster][KubernetesProvider] Listing pods with {Selector}",
            selector);
    }

    return _kubernetes.ListNamespacedPodWithHttpMessagesAsync(
        KubernetesExtensions.GetKubeNamespace(),
        labelSelector: selector,
        watch: watch,
        timeoutSeconds: _config.WatchTimeoutSeconds
    );
}

private void RecreateKubernetesClient()
Expand Down Expand Up @@ -219,6 +284,11 @@ private void Watch(WatchEventType eventType, V1Pod eventPod)
_clusterPods[eventPod.Uid()] = eventPod;
}

UpdateTopology();
}

private void UpdateTopology()
{
var memberStatuses = _clusterPods.Values
.Select(x => x.GetMemberStatus())
.Where(x => x.IsRunning && (x.IsReady || x.Member.Id == _cluster.System.Id))
Expand Down
8 changes: 7 additions & 1 deletion src/Proto.Cluster.Kubernetes/KubernetesProviderConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@ namespace Proto.Cluster.Kubernetes;
public record KubernetesProviderConfig
{
public KubernetesProviderConfig(int watchTimeoutSeconds = 30, bool developerLogging = false,
Func<IKubernetes> clientFactory = null)
Func<IKubernetes> clientFactory = null, bool disableWatch = false)
{
WatchTimeoutSeconds = watchTimeoutSeconds;
DeveloperLogging = developerLogging;
ClientFactory = clientFactory ?? DefaultFactory;
DisableWatch = disableWatch;
}

/// <summary>
/// A timeout for the watch pods operation
/// </summary>
public int WatchTimeoutSeconds { get; }

/// <summary>
/// When <c>true</c>, disables the watch pods operation so that the provider relies
/// on HTTP request/response polling of the pod list instead.
/// Set from the <c>disableWatch</c> constructor argument, which defaults to <c>false</c>.
/// </summary>
public bool DisableWatch { get; set; }

/// <summary>
/// Enables more detailed logging
/// </summary>
Expand Down

0 comments on commit be46393

Please sign in to comment.