Skip to content

Commit

Permalink
Added 'WatchKube' discovery provider
Browse files Browse the repository at this point in the history
  • Loading branch information
VIMPELCOM_MAIN\NKuksov authored and raman-m committed Oct 16, 2024
1 parent 0379c77 commit 59e55fc
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 16 deletions.
29 changes: 25 additions & 4 deletions src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@ namespace Ocelot.Provider.Kubernetes
{
public class EndPointClientV1 : KubeResourceClient, IEndPointClient
{
private readonly HttpRequest _collection;
private readonly HttpRequest _byName;
private readonly HttpRequest _watchByName;

public EndPointClientV1(IKubeApiClient client) : base(client)
{
_collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
_byName = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
_watchByName = KubeRequest.Create("api/v1/watch/namespaces/{Namespace}/endpoints/{ServiceName}");
}

public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default)
public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName))
{
throw new ArgumentNullException(nameof(serviceName));
}

var request = _collection
var request = _byName
.WithTemplateParameters(new
{
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
Expand All @@ -34,5 +37,23 @@ public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace
? await response.ReadContentAsAsync<EndpointsV1>()
: null;
}

public IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName))
{
throw new ArgumentNullException(nameof(serviceName));
}

return ObserveEvents<EndpointsV1>(
_watchByName.WithTemplateParameters(new
{
ServiceName = serviceName,
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
}),
"watch v1/Endpoints '" + serviceName + "' in namespace " +
(kubeNamespace ?? KubeClient.DefaultNamespace));
}
}
}
2 changes: 2 additions & 0 deletions src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ namespace Ocelot.Provider.Kubernetes.Interfaces;
public interface IEndPointClient : IKubeResourceClient
{
Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);

IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/Ocelot.Provider.Kubernetes/Kube.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public virtual async Task<List<Service>> GetAsync()
}

private Task<EndpointsV1> GetEndpoint() => _kubeApi
.ResourceClient(client => new EndPointClientV1(client))
.EndpointsV1()
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);

private bool CheckErroneousState(EndpointsV1 endpoint)
Expand Down
9 changes: 9 additions & 0 deletions src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Ocelot.Provider.Kubernetes.Interfaces;

namespace Ocelot.Provider.Kubernetes;

public static class KubeApiClientExtensions
{
public static IEndPointClient EndpointsV1(this IKubeApiClient client)
=> client.ResourceClient(x => new EndPointClientV1(client));
}
29 changes: 18 additions & 11 deletions src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Ocelot.Configuration;
using Ocelot.Logging;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;

namespace Ocelot.Provider.Kubernetes
{
public static class KubernetesProviderFactory
{
/// <summary>
/// String constant used for provider type definition.
{
/// <summary>
/// String constant used for provider type definition.
/// </summary>
public const string PollKube = nameof(Kubernetes.PollKube);

public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;

public const string WatchKube = nameof(Kubernetes.WatchKube);

public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;

private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamRoute route)
{
Expand All @@ -27,11 +29,16 @@ private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provide
Scheme = route.DownstreamScheme,
};

if (WatchKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase))
{
return new WatchKube(configuration, factory, kubeClient, serviceBuilder);
}

var defaultK8sProvider = new Kube(configuration, factory, kubeClient, serviceBuilder);

return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
: defaultK8sProvider;

return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
: defaultK8sProvider;
}
}
}
78 changes: 78 additions & 0 deletions src/Ocelot.Provider.Kubernetes/WatchKube.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using KubeClient.Models;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;
using Ocelot.Values;

namespace Ocelot.Provider.Kubernetes;

// Dispose() won't be called because provider wasn't resolved from DI
public class WatchKube : IServiceDiscoveryProvider, IDisposable
{
private readonly KubeRegistryConfiguration _configuration;
private readonly IOcelotLogger _logger;
private readonly IKubeApiClient _kubeApi;
private readonly IKubeServiceBuilder _serviceBuilder;

private List<Service> _services = null;
private readonly IDisposable _subscription;

public WatchKube(
KubeRegistryConfiguration configuration,
IOcelotLoggerFactory factory,
IKubeApiClient kubeApi,
IKubeServiceBuilder serviceBuilder)
{
_configuration = configuration;
_logger = factory.CreateLogger<Kube>();
_kubeApi = kubeApi;
_serviceBuilder = serviceBuilder;

_subscription = CreateSubscription();
}

public virtual async Task<List<Service>> GetAsync()
{
// need to wait for first result fetching somehow
if (_services is null)
{
await Task.Delay(1000);
}

if (_services is not { Count: > 0 })
{
_logger.LogWarning(() => GetMessage("Subscription to service endpoints gave no results!"));
}

return _services;
}

private IDisposable CreateSubscription() =>
_kubeApi
.EndpointsV1()
.Watch(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace)
.Subscribe(
onNext: endpointEvent =>
{
_services = endpointEvent.EventType switch
{
ResourceEventType.Deleted or ResourceEventType.Error => new(),
_ when (endpointEvent.Resource?.Subsets?.Count ?? 0) == 0 => new(),
_ => _serviceBuilder.BuildServices(_configuration, endpointEvent.Resource).ToList(),
};
},
onError: ex =>
{
// recreate subscription in case of exceptions?
_logger.LogError(() => GetMessage("Endpoints subscription error occured"), ex);
},
onCompleted: () =>
{
// called only when subscription is cancelled
_logger.LogWarning(() => GetMessage("Subscription to service endpoints completed"));
});

private string GetMessage(string message)
=> $"{nameof(WatchKube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}";

public void Dispose() => _subscription.Dispose();
}

0 comments on commit 59e55fc

Please sign in to comment.