diff --git a/bom/pom.xml b/bom/pom.xml index 44688c90..38548b2b 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -86,6 +86,11 @@ stork-service-discovery-consul ${project.version} + + io.smallrye.stork + stork-service-discovery-dns + ${project.version} + io.smallrye.stork stork-service-discovery-eureka diff --git a/core/revapi.json b/core/revapi.json index 0c68dd13..b673bc81 100644 --- a/core/revapi.json +++ b/core/revapi.json @@ -31,7 +31,15 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method java.time.Duration io.smallrye.stork.utils.DurationUtils::parseDuration(java.lang.String)", + "new": "method java.time.Duration io.smallrye.stork.utils.DurationUtils::parseDuration(java.lang.String, java.lang.String)", + "justification": "Parameter name added to the `parseDuration` method. The old version assumed the parsing is done for refresh-period which is not always true" + } + ] } }, { "extension" : "revapi.reporter.json", diff --git a/core/src/main/java/io/smallrye/stork/impl/CachingServiceDiscovery.java b/core/src/main/java/io/smallrye/stork/impl/CachingServiceDiscovery.java index 28ea18a1..fcbad429 100644 --- a/core/src/main/java/io/smallrye/stork/impl/CachingServiceDiscovery.java +++ b/core/src/main/java/io/smallrye/stork/impl/CachingServiceDiscovery.java @@ -22,8 +22,10 @@ public abstract class CachingServiceDiscovery implements ServiceDiscovery { private static final Logger log = LoggerFactory.getLogger(CachingServiceDiscovery.class); + public static final String REFRESH_PERIOD = "refresh-period"; public final Duration refreshPeriod; + public static final String DEFAULT_REFRESH_INTERVAL = "5M"; private volatile List lastResults; @@ -33,9 +35,9 @@ public abstract class CachingServiceDiscovery implements ServiceDiscovery { public CachingServiceDiscovery(String refreshPeriod) { try { // TODO: document it - this.refreshPeriod = DurationUtils.parseDuration(refreshPeriod); + this.refreshPeriod = DurationUtils.parseDuration(refreshPeriod, REFRESH_PERIOD); } catch (DateTimeParseException e) { - throw new IllegalArgumentException("refresh-period for service discovery should be a number, got: " + + throw new IllegalArgumentException(REFRESH_PERIOD + " for service discovery should be a number, got: " + refreshPeriod, e); } diff --git a/core/src/main/java/io/smallrye/stork/utils/DurationUtils.java b/core/src/main/java/io/smallrye/stork/utils/DurationUtils.java index 5b11d2a9..8f7b2a5b 100644 --- a/core/src/main/java/io/smallrye/stork/utils/DurationUtils.java +++ b/core/src/main/java/io/smallrye/stork/utils/DurationUtils.java @@ -20,11 +20,12 @@ private DurationUtils() { * Otherwise, tries to convert the value assuming that it is in the accepted ISO-8601 duration format. * * @param duration duration as String + * @param parameter the parameter for which we parse the value to duration * @return {@link Duration} */ - public static Duration parseDuration(String duration) { + public static Duration parseDuration(String duration, String parameter) { if (duration.startsWith("-")) { - throw new IllegalArgumentException("Negative refresh-period specified for service discovery: " + duration); + throw new IllegalArgumentException("Negative " + parameter + " specified for service discovery: " + duration); } if (DIGITS.asPredicate().test(duration)) { return Duration.ofSeconds(Long.parseLong(duration)); diff --git a/core/src/main/java/io/smallrye/stork/utils/StorkAddressUtils.java b/core/src/main/java/io/smallrye/stork/utils/StorkAddressUtils.java index f655bd38..5d8d8c79 100644 --- a/core/src/main/java/io/smallrye/stork/utils/StorkAddressUtils.java +++ b/core/src/main/java/io/smallrye/stork/utils/StorkAddressUtils.java @@ -8,25 +8,25 @@ public final class StorkAddressUtils { /** - * Creates a new {@link HostAndPort} instance. + * Creates a new {@link HostAndPort} instance from an address. * - * @param serviceAddress the address - * @param defaultPort the default port - * @param serviceName the service name + * @param address the address, either {@code host:port} or just {@code host} + * @param defaultPort the default port, used when the address doesn't provide the port + * @param configPlace the location of the address in the configuration, for logging purposes * @return the new HostAndPort */ - public static HostAndPort parseToHostAndPort(String serviceAddress, + public static HostAndPort parseToHostAndPort(String address, int defaultPort, - String serviceName) { - if (serviceAddress == null || serviceAddress.isBlank()) { - throw new IllegalArgumentException("Blank or null address: '" + serviceAddress + "'"); + String configPlace) { + if (address == null || address.isBlank()) { + throw new IllegalArgumentException("Blank or null address: '" + address + "'"); } - if (serviceAddress.charAt(0) == '[') { - return parseIpV6AddressWithSquareBracket(serviceAddress, defaultPort, serviceName); - } else if (countColons(serviceAddress) > 1) { - return new HostAndPort(serviceAddress, defaultPort); + if (address.charAt(0) == '[') { + return parseIpV6AddressWithSquareBracket(address, defaultPort, configPlace); + } else if (countColons(address) > 1) { + return new HostAndPort(address, defaultPort); } else { - return parseNonIpv6Adress(serviceAddress, defaultPort, serviceName); + return parseNonIpv6Adress(address, defaultPort, configPlace); } } @@ -67,14 +67,14 @@ private static HostAndPort parseIpV6AddressWithSquareBracket(String serviceAddre if (!done) { throw new IllegalArgumentException( format("IPv6 Address with a square bracket '[' does not have a matching closing square bracket ']' " + - "in address '%s' for service: '%s'", serviceAddress, serviceName)); + "in address '%s' for: '%s'", serviceAddress, serviceName)); } if (++i == serviceAddress.length()) { return new HostAndPort(host.toString(), defaultPort); } else if (serviceAddress.charAt(i) != ':') { throw new IllegalArgumentException( - format("Unexpected character '%c' at character %d in address '%s' for service: '%s'", + format("Unexpected character '%c' at character %d in address '%s' for: '%s'", serviceAddress.charAt(i), i, serviceAddress, serviceName)); } else { int port = 0; @@ -86,7 +86,7 @@ private static HostAndPort parseIpV6AddressWithSquareBracket(String serviceAddre } else { throw new IllegalArgumentException( format("Unexpected character '%c' while parsing port number in " + - "address '%s' for service '%s', at character %d, expected a digit", c, serviceName, + "address '%s' for '%s', at character %d, expected a digit", c, serviceName, serviceAddress, i)); } } diff --git a/core/src/test/java/io/smallrye/stork/utils/DurationUtilsTest.java b/core/src/test/java/io/smallrye/stork/utils/DurationUtilsTest.java index bb9312c0..b9b8f18e 100644 --- a/core/src/test/java/io/smallrye/stork/utils/DurationUtilsTest.java +++ b/core/src/test/java/io/smallrye/stork/utils/DurationUtilsTest.java @@ -12,14 +12,14 @@ public class DurationUtilsTest { @Test public void testOnlyNumberValueProvided() { Duration expectedDuration = Duration.ofSeconds(3); - Duration actualDuration = DurationUtils.parseDuration("3"); + Duration actualDuration = DurationUtils.parseDuration("3", "refresh-period"); assertEquals(expectedDuration, actualDuration); } @Test public void testNumberWithUnitValueProvided() { Duration expectedDuration = Duration.ofMinutes(3); - Duration actualDuration = DurationUtils.parseDuration("3M"); + Duration actualDuration = DurationUtils.parseDuration("3M", "refresh-period"); assertEquals(expectedDuration, actualDuration); } @@ -27,7 +27,7 @@ public void testNumberWithUnitValueProvided() { public void testValueStartingWithNumberAndInCorrectFormatProvided() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> { - DurationUtils.parseDuration("-5"); + DurationUtils.parseDuration("-5", "refresh-period"); }).withMessage("Negative refresh-period specified for service discovery: -5"); } diff --git a/coverage/pom.xml b/coverage/pom.xml index 6deb333d..d4f6ee8f 100644 --- a/coverage/pom.xml +++ b/coverage/pom.xml @@ -54,6 +54,11 @@ stork-service-discovery-consul + + io.smallrye.stork + stork-service-discovery-dns + + io.smallrye.stork stork-service-discovery-kubernetes diff --git a/docs/diagrams/srv_sequence.puml b/docs/diagrams/srv_sequence.puml new file mode 100644 index 00000000..492e456b --- /dev/null +++ b/docs/diagrams/srv_sequence.puml @@ -0,0 +1,31 @@ +@startuml +skinparam participant { + BackgroundColor AliceBlue + ArrowColor DarkGrey + BorderColor DarkGrey +} + +skinparam roundcorner 20 + +skinparam sequence { + ArrowColor DarkGrey + ActorBorderColor DarkGrey + LifeLineBorderColor DarkGrey + LifeLineBackgroundColor #A9DCDF + +} +skinparam sequenceMessageAlign center + + +participant Application +participant Stork +participant "DNS Server" + +Application -> Stork : get service instances +Stork -> "DNS Server" : get SRV records for hostname +"DNS Server" -> Stork : list of SRV records +Stork -> "DNS Server" : get A/AAAA records for each SRV record target +"DNS Server" -> Stork : list of A/AAAA +Stork -> Application : list of ServiceInstances + +@enduml \ No newline at end of file diff --git a/docs/service-discovery/dns.md b/docs/service-discovery/dns.md new file mode 100644 index 00000000..dbbc0a7e --- /dev/null +++ b/docs/service-discovery/dns.md @@ -0,0 +1,56 @@ +# DNS Service Discovery + +DNS is a name resolution protocol used to determine IP addresses for hostnames. +That makes it a natural fit for service discovery. +Consul and AWS Cloud Map provide DNS resolutions for service discovery. + +This page explains how Stork can use DNS to handle the service discovery. + +## DNS records + +DNS supports a [variety of record types](https://en.wikipedia.org/wiki/List_of_DNS_record_types). Stork can resolve hostnames to addresses based on [_SRV_](https://en.wikipedia.org/wiki/SRV_record), A and AAAA records. +All these types of records may return multiple addresses for a single hostname. + +While _A_ and _AAAA_ records are quite similar, they just carry an IP (_v4_ for _A_ and _v6_ for _AAAA_), the _SRV_ records are different. +They contain a _weight_, a _target_ and a _port_ for a service instance. +The _target_ returned in an _SRV_ record needs to be resolved further by an _A_ or an _AAAA_ record. + +In short, it works as follows: +![SRV resolution](../target/srv_sequence.png) + +## Dependency + +To use the DNS service discovery, you need to add the Stork DNS Service Discovery provider dependency to your project: + +```xml + + io.smallrye.stork + stork-service-discovery-dns + {{version.current}} + +``` + +## Configuration + +Next, set the service discovery `type` to `dns`. +Additionally, you would usually specify the DNS server, or servers, to use for the discovery. +All in all, your configuration could look as follows: + +=== "stork standalone" + ```properties + stork.my-service.service-discovery.type=dns + # optional dns servers: + stork.my-service.service-discovery.dns-servers=my-dns-server:8221,my-dns-server2 + ``` + +=== "stork in quarkus" + ```properties + quarkus.stork.my-service.service-discovery.type=dns + + # optional dns servers: + quarkus.stork.my-service.service-discovery.dns-servers=my-dns-server:8221,my-dns-server2 + ``` + +All the available parameters are as follows: + +--8<-- "service-discovery/dns/target/classes/META-INF/stork-docs/dns-sd-attributes.txt" \ No newline at end of file diff --git a/load-balancer/least-response-time/src/main/java/io/smallrye/stork/loadbalancer/leastresponsetime/LeastResponseTimeLoadBalancer.java b/load-balancer/least-response-time/src/main/java/io/smallrye/stork/loadbalancer/leastresponsetime/LeastResponseTimeLoadBalancer.java index 9fd40bac..012e35ea 100644 --- a/load-balancer/least-response-time/src/main/java/io/smallrye/stork/loadbalancer/leastresponsetime/LeastResponseTimeLoadBalancer.java +++ b/load-balancer/least-response-time/src/main/java/io/smallrye/stork/loadbalancer/leastresponsetime/LeastResponseTimeLoadBalancer.java @@ -1,5 +1,7 @@ package io.smallrye.stork.loadbalancer.leastresponsetime; +import static io.smallrye.stork.impl.CachingServiceDiscovery.REFRESH_PERIOD; + import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collection; @@ -35,7 +37,7 @@ public class LeastResponseTimeLoadBalancer implements LoadBalancer { * @param config the configuration, must not be {@code null} */ public LeastResponseTimeLoadBalancer(LeastResponseTimeConfiguration config) { - long errorPenalty = DurationUtils.parseDuration(config.getErrorPenalty()).toNanos(); + long errorPenalty = DurationUtils.parseDuration(config.getErrorPenalty(), REFRESH_PERIOD).toNanos(); double decliningFactor = Double.parseDouble(config.getDecliningFactor()); powersOfDecliningFactor = new FastPower(decliningFactor); callStatistics = new CallStatistics(errorPenalty, powersOfDecliningFactor); diff --git a/load-balancer/sticky/src/main/java/io/smallrye/stork/loadbalancer/random/StickyLoadBalancerProvider.java b/load-balancer/sticky/src/main/java/io/smallrye/stork/loadbalancer/random/StickyLoadBalancerProvider.java index 320be9a9..ea8ba7d8 100644 --- a/load-balancer/sticky/src/main/java/io/smallrye/stork/loadbalancer/random/StickyLoadBalancerProvider.java +++ b/load-balancer/sticky/src/main/java/io/smallrye/stork/loadbalancer/random/StickyLoadBalancerProvider.java @@ -22,6 +22,6 @@ public class StickyLoadBalancerProvider @Override public LoadBalancer createLoadBalancer(StickyConfiguration config, ServiceDiscovery serviceDiscovery) { - return new StickyLoadBalancer(DurationUtils.parseDuration(config.getFailureBackoffTime())); + return new StickyLoadBalancer(DurationUtils.parseDuration(config.getFailureBackoffTime(), FAILURE_BACKOFF_TIME)); } } diff --git a/mkdocs.yml b/mkdocs.yml index 69730fc7..89bcd003 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -10,6 +10,7 @@ nav: - Using Stork with Quarkus: './quarkus.md' - Service Discovery: - Consul: 'service-discovery/consul.md' + - DNS: 'service-discovery/dns.md' - Kubernetes: 'service-discovery/kubernetes.md' - Eureka: 'service-discovery/eureka.md' - Composite: 'service-discovery/composite.md' diff --git a/pom.xml b/pom.xml index 3117746d..82ffbfcd 100644 --- a/pom.xml +++ b/pom.xml @@ -42,6 +42,8 @@ 4.2.2 2.22.0 + + 1.17.3 5.12.2 1.7.36 @@ -311,6 +313,7 @@ microprofile service-discovery/static-list service-discovery/consul + service-discovery/dns service-discovery/kubernetes service-discovery/eureka service-discovery/composite diff --git a/service-discovery/consul/pom.xml b/service-discovery/consul/pom.xml index 139c2e6a..236a6917 100644 --- a/service-discovery/consul/pom.xml +++ b/service-discovery/consul/pom.xml @@ -26,7 +26,7 @@ stork-service-discovery-consul - SmallRye Stork Service Discovery : Consul List + SmallRye Stork Service Discovery : Consul false diff --git a/service-discovery/consul/src/main/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscoveryProvider.java b/service-discovery/consul/src/main/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscoveryProvider.java index 7218f111..b8c8a160 100644 --- a/service-discovery/consul/src/main/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscoveryProvider.java +++ b/service-discovery/consul/src/main/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscoveryProvider.java @@ -4,6 +4,7 @@ import io.smallrye.stork.api.config.ServiceConfig; import io.smallrye.stork.api.config.ServiceDiscoveryAttribute; import io.smallrye.stork.api.config.ServiceDiscoveryType; +import io.smallrye.stork.impl.CachingServiceDiscovery; import io.smallrye.stork.spi.ServiceDiscoveryProvider; import io.smallrye.stork.spi.StorkInfrastructure; import io.vertx.core.Vertx; @@ -15,7 +16,7 @@ @ServiceDiscoveryAttribute(name = "consul-port", description = "The Consul port.", defaultValue = "8500") @ServiceDiscoveryAttribute(name = "use-health-checks", description = "Whether to use health check.", defaultValue = "true") @ServiceDiscoveryAttribute(name = "application", description = "The application name; if not defined Stork service name will be used.") -@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", defaultValue = "5M") +@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL) @ServiceDiscoveryAttribute(name = "secure", description = "whether the connection with the service should be encrypted with TLS.") @ServiceDiscoveryType("consul") public class ConsulServiceDiscoveryProvider implements ServiceDiscoveryProvider { diff --git a/service-discovery/dns/pom.xml b/service-discovery/dns/pom.xml new file mode 100644 index 00000000..ba9151ad --- /dev/null +++ b/service-discovery/dns/pom.xml @@ -0,0 +1,89 @@ + + + + 4.0.0 + + + io.smallrye.stork + stork-parent + 1.1.3-SNAPSHOT + ../../pom.xml + + + stork-service-discovery-dns + + SmallRye Stork Service Discovery : DNS + + + false + + + + + io.smallrye.stork + stork-core + + + io.smallrye.stork + stork-configuration-generator + + + io.vertx + vertx-core + + + io.vertx + vertx-consul-client + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + io.smallrye.stork + stork-test-utils + test + + + org.junit.jupiter + junit-jupiter + test + + + io.smallrye.config + smallrye-config + test + + + org.assertj + assertj-core + test + + + org.awaitility + awaitility + test + + + diff --git a/service-discovery/dns/revapi.json b/service-discovery/dns/revapi.json new file mode 100644 index 00000000..d3c8ce08 --- /dev/null +++ b/service-discovery/dns/revapi.json @@ -0,0 +1,46 @@ +[ { + "extension" : "revapi.java", + "id" : "java", + "configuration" : { + "missing-classes" : { + "behavior" : "report", + "ignoreMissingAnnotations" : false + } + } +}, { + "extension" : "revapi.filter", + "configuration" : { + "elements" : { + "include" : [ { + "matcher" : "java-package", + "match" : "/io\\.smallrye\\.stork\\.servicediscovery\\.dns(\\..*)?/" + } ] + } + } +}, { + "extension" : "revapi.differences", + "id" : "breaking-changes", + "configuration" : { + "criticality" : "highlight", + "minSeverity" : "POTENTIALLY_BREAKING", + "minCriticality" : "documented", + "differences" : [ ] + } +}, { + "extension" : "revapi.reporter.json", + "configuration" : { + "minSeverity" : "POTENTIALLY_BREAKING", + "minCriticality" : "documented", + "output" : "target/compatibility.json", + "indent" : true, + "append" : false, + "keepEmptyFile" : true + } +}, { + "extension" : "revapi.reporter.text", + "configuration" : { + "minSeverity" : "POTENTIALLY_BREAKING", + "minCriticality" : "documented", + "output" : "out" + } +} ] \ No newline at end of file diff --git a/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsMetadataKey.java b/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsMetadataKey.java new file mode 100644 index 00000000..be955a45 --- /dev/null +++ b/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsMetadataKey.java @@ -0,0 +1,36 @@ +package io.smallrye.stork.servicediscovery.dns; + +import io.smallrye.stork.api.MetadataKey; + +/** + * The DNS metadata keys. + */ +public enum DnsMetadataKey implements MetadataKey { + + /** + * DNS hostname. + */ + DNS_NAME("dns-name"), + + /** + * In case of SRV queries, weight for the target. Otherwise, 1. + */ + DNS_WEIGHT("dns-weight"); + + private final String name; + + /** + * Creates a new DnsMetadataKey + * + * @param name the name + */ + DnsMetadataKey(String name) { + this.name = name; + } + + @Override + public String getName() { + return name; + } + +} diff --git a/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsRecordType.java b/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsRecordType.java new file mode 100644 index 00000000..5ad55e69 --- /dev/null +++ b/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsRecordType.java @@ -0,0 +1,7 @@ +package io.smallrye.stork.servicediscovery.dns; + +public enum DnsRecordType { + SRV, + A, + AAAA +} diff --git a/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscovery.java b/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscovery.java new file mode 100644 index 00000000..b1c58110 --- /dev/null +++ b/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscovery.java @@ -0,0 +1,278 @@ +package io.smallrye.stork.servicediscovery.dns; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.MultiEmitter; +import io.smallrye.mutiny.tuples.Tuple2; +import io.smallrye.stork.api.Metadata; +import io.smallrye.stork.api.ServiceInstance; +import io.smallrye.stork.impl.CachingServiceDiscovery; +import io.smallrye.stork.impl.DefaultServiceInstance; +import io.smallrye.stork.utils.DurationUtils; +import io.smallrye.stork.utils.HostAndPort; +import io.smallrye.stork.utils.ServiceInstanceIds; +import io.smallrye.stork.utils.ServiceInstanceUtils; +import io.smallrye.stork.utils.StorkAddressUtils; +import io.vertx.core.Vertx; +import io.vertx.core.dns.DnsClient; +import io.vertx.core.dns.DnsClientOptions; +import io.vertx.core.dns.SrvRecord; + +/** + * A service discovery implementation retrieving services from DNS. + */ +public class DnsServiceDiscovery extends CachingServiceDiscovery { + + private static final Logger log = LoggerFactory.getLogger(DnsServiceDiscovery.class); + + private final String serviceName; + private final String hostname; + private final DnsRecordType recordType; + private final boolean secure; + private final Integer port; + private final boolean failOnError; + private final long dnsTimeoutMs; + private final boolean recursionDesired; + + // we'll use one resolver to resolve DNS server addresses and create another resolver backed up by them + final Map dnsClients = new HashMap<>(); + + DnsServiceDiscovery(String serviceName, DnsConfiguration config, Vertx vertx) { + super(config.getRefreshPeriod()); + this.serviceName = serviceName; + this.secure = isSecure(config); + this.recordType = recordType(config.getRecordType()); + this.failOnError = Boolean.parseBoolean(config.getFailOnError()); + this.recursionDesired = Boolean.parseBoolean(config.getRecursionDesired()); + this.dnsTimeoutMs = DurationUtils.parseDuration(config.getDnsTimeout(), "DNS timeout") + .toMillis(); + + String dnsServersString = config.getDnsServers(); + + if (dnsServersString != null && !dnsServersString.isBlank() + && !"none".equalsIgnoreCase(dnsServersString)) { + for (String dnsServer : dnsServersString.split(",")) { + HostAndPort hostAndPort = StorkAddressUtils.parseToHostAndPort(dnsServer, 53, + "DNS server address for service " + serviceName); + dnsClients.put(dnsServer, createClient(vertx, hostAndPort)); + } + } + if (dnsClients.isEmpty()) { + dnsClients.put("Default DNS", createClient(vertx, null)); + } + this.hostname = config.getHostname() == null ? serviceName : config.getHostname(); + + try { + this.port = config.getPort() == null ? null : Integer.parseInt(config.getPort()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid port for service " + serviceName, e); + } + if (this.port == null && recordType != DnsRecordType.SRV) { + throw new IllegalArgumentException( + "DNS service discovery for record types different than SRV require service instance port to be specified"); + } + } + + private DnsRecordType recordType(String recordType) { + String recordTypeString = recordType.toUpperCase(Locale.ROOT); + try { + return DnsRecordType.valueOf(recordTypeString); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid DNS record type '" + recordType + "' for service " + serviceName + + ". The available types are " + Arrays.toString(DnsRecordType.values()), e); + } + } + + private DnsClient createClient(Vertx vertx, HostAndPort hostAndPort) { + DnsClientOptions options = new DnsClientOptions() + .setQueryTimeout(dnsTimeoutMs) + .setRecursionDesired(recursionDesired); + if (hostAndPort != null) { + options.setHost(hostAndPort.host).setPort(hostAndPort.port); + } + return vertx.createDnsClient(options); + } + + @Override + public Uni> fetchNewServiceInstances(List previousInstances) { + AtomicInteger queriesLeft = new AtomicInteger(dnsClients.size()); + AtomicBoolean successRecorded = new AtomicBoolean(); + if (recordType == DnsRecordType.SRV) { + return resolveSRV(previousInstances); + } else { + return resolveAorAAA(previousInstances, queriesLeft, successRecorded); + } + } + + private Uni> resolveAorAAA(List previousInstances, AtomicInteger queriesLeft, + AtomicBoolean successRecorded) { + Multi serviceInstances = Multi.createFrom().emitter( + em -> { + for (Map.Entry dnsClient : dnsClients.entrySet()) { + DnsClient client = dnsClient.getValue(); + + switch (recordType) { + case A: + client.resolveA(hostname) + .onFailure(error -> handleResolutionFailure(error, queriesLeft, em, dnsClient.getKey())) + .onSuccess(lst -> handleStringResolution(lst, em, queriesLeft, previousInstances, + successRecorded)); + break; + case AAAA: + client.resolveAAAA(hostname) + .onFailure(error -> handleResolutionFailure(error, queriesLeft, em, dnsClient.getKey())) + .onSuccess(lst -> handleStringResolution(lst, em, queriesLeft, previousInstances, + successRecorded)); + break; + default: + em.fail(new IllegalStateException("Unsupported DNS record type " + recordType)); + break; + } + } + }); + return collectResults(successRecorded, serviceInstances); + } + + private Uni> collectResults(AtomicBoolean successRecorded, Multi serviceInstances) { + return serviceInstances.collect().asList() + // we handle two kinds of results here: + // 1. (at least one success && !failOnError) || all successful; incl all returning empty + // 2. no success on either of queries + .onItem().transformToUni( + result -> { + if (successRecorded.get()) { + return Uni.createFrom().item(result); + } else { + return Uni.createFrom().failure( + new RuntimeException("No DNS server was able to resolve '" + hostname + '\'')); + } + }); + } + + private Uni> resolveSRV(List previousInstances) { + AtomicInteger queriesLeft = new AtomicInteger(dnsClients.size()); + AtomicBoolean successRecorded = new AtomicBoolean(); + // it may be okay to have a failure when resolving the SRV + Multi> records = Multi.createFrom().emitter( + em -> { + for (Map.Entry clientEntry : dnsClients.entrySet()) { + DnsClient client = clientEntry.getValue(); + + client.resolveSRV(hostname) + .onFailure(error -> handleResolutionFailure(error, queriesLeft, em, clientEntry.getKey())) + .onSuccess(lst -> { + successRecorded.set(true); + for (SrvRecord record : lst) { + em.emit(Tuple2.of(client, record)); + } + if (queriesLeft.decrementAndGet() == 0) { + em.complete(); + } + }); + } + }); + // but it's not okay when SRV target is resolved to ip + Multi instances = records.onItem().transformToUni( + record -> { + String target = record.getItem2().target(); + DnsClient client = record.getItem1(); + // TODO : an option to specify that one of these queries could be skipped + Uni> aInstances = Uni.createFrom().emitter(em -> client.resolveA(target, addresses -> { + if (addresses.failed()) { + log.warn("Failed to lookup the address retrieved from DNS: " + target, addresses.cause()); + em.complete(Collections.emptyList()); + } else { + em.complete(addresses.result()); + } + })); + Uni> aaaaInstances = Uni.createFrom().emitter(em -> client.resolveAAAA(target, addresses -> { + if (addresses.failed()) { + log.warn("Failed to lookup the address retrieved from DNS: " + target, addresses.cause()); + em.complete(Collections.emptyList()); + } else { + em.complete(addresses.result()); + } + })); + return Uni.combine().all().unis(aInstances, aaaaInstances) + .combinedWith((strings, strings2) -> { + List result = new ArrayList<>(strings); + result.addAll(strings2); + if (result.isEmpty()) { + log.warn("Failed to resolve ip address for target from SRV request: " + target); + } + return result; + }).onItem().transform( + addresses -> addresses.stream() + .map(address -> toStorkServiceInstance(address, record.getItem2().port(), + record.getItem2().weight(), previousInstances)) + .collect(Collectors.toList())); + }).concatenate() + .onItem().transformToMulti(l -> Multi.createFrom().iterable(l)) + .concatenate(); + + return collectResults(successRecorded, instances); + } + + private void handleStringResolution(List lst, MultiEmitter em, AtomicInteger queriesLeft, + List previousInstances, AtomicBoolean successRecorded) { + for (String target : lst) { + em.emit(toStorkServiceInstance(target, port, 1, previousInstances)); + } + successRecorded.set(true); + if (queriesLeft.decrementAndGet() == 0) { + em.complete(); + } + } + + private void handleResolutionFailure(Throwable error, AtomicInteger queriesLeft, MultiEmitter em, String dnsServer) { + String message = "Failure resolving name " + hostname + " with " + dnsServer; + log.warn(message, error); + if (failOnError) { + em.fail(new RuntimeException(message, error)); + } + if (queriesLeft.decrementAndGet() == 0) { + em.complete(); + } + } + + private ServiceInstance toStorkServiceInstance(String target, int port, int weight, + List previousInstances) { + if (this.port != null) { + port = this.port; + } + Metadata dnsMetadata = createDnsMetadata(hostname, weight); + + ServiceInstance matching = ServiceInstanceUtils.findMatching(previousInstances, target, port); + if (matching == null) { + return new DefaultServiceInstance(ServiceInstanceIds.next(), + target, port, secure, Collections.emptyMap(), dnsMetadata); + } else { + return matching; + } + } + + private Metadata createDnsMetadata(String hostname, int weight) { + Metadata dnsMetadata = Metadata.of(DnsMetadataKey.class); + dnsMetadata = dnsMetadata.with(DnsMetadataKey.DNS_NAME, hostname); + dnsMetadata = dnsMetadata.with(DnsMetadataKey.DNS_WEIGHT, weight); + return dnsMetadata; + } + + private boolean isSecure(DnsConfiguration config) { + return config.getSecure() != null && Boolean.parseBoolean(config.getSecure()); + } +} diff --git a/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscoveryProvider.java b/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscoveryProvider.java new file mode 100644 index 00000000..73c13f75 --- /dev/null +++ b/service-discovery/dns/src/main/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscoveryProvider.java @@ -0,0 +1,36 @@ +package io.smallrye.stork.servicediscovery.dns; + +import io.smallrye.stork.api.ServiceDiscovery; +import io.smallrye.stork.api.config.ServiceConfig; +import io.smallrye.stork.api.config.ServiceDiscoveryAttribute; +import io.smallrye.stork.api.config.ServiceDiscoveryType; +import io.smallrye.stork.impl.CachingServiceDiscovery; +import io.smallrye.stork.spi.ServiceDiscoveryProvider; +import io.smallrye.stork.spi.StorkInfrastructure; +import io.vertx.core.Vertx; + +/** + * DNS-based service discovery implementation + */ +@ServiceDiscoveryAttribute(name = "dns-servers", description = "Comma separated list of dns servers. " + + "Servers can either be in the `server:port` or just `server` form. Use `none` to use the system resolver.", defaultValue = "none") +@ServiceDiscoveryAttribute(name = "hostname", description = "The hostname to look up; if not defined Stork service name will be used.") +@ServiceDiscoveryAttribute(name = "record-type", description = "Type of the DNS record. A, AAAA and SRV records are supported", defaultValue = "SRV") +@ServiceDiscoveryAttribute(name = "port", description = "Port of the service instances. " + + "Required if the record type is other than SRV.") +@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL) +@ServiceDiscoveryAttribute(name = "secure", description = "Whether the connection with the service should be encrypted with TLS.") +@ServiceDiscoveryAttribute(name = "recursion-desired", description = "Whether DNS recursion is desired", defaultValue = "true") +@ServiceDiscoveryAttribute(name = "dns-timeout", description = "Timeout for DNS queries", defaultValue = "5s") +@ServiceDiscoveryAttribute(name = "fail-on-error", description = "Whether an error in retrieving service instances " + + "from one of the DNS servers should cause a failure of the discovery attempt.", defaultValue = "false") +@ServiceDiscoveryType("dns") +public class DnsServiceDiscoveryProvider implements ServiceDiscoveryProvider { + + @Override + public ServiceDiscovery createServiceDiscovery(DnsConfiguration config, String serviceName, + ServiceConfig serviceConfig, StorkInfrastructure storkInfrastructure) { + return new DnsServiceDiscovery(serviceName, config, storkInfrastructure.get(Vertx.class, Vertx::vertx)); + + } +} diff --git a/service-discovery/dns/src/test/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscoveryTest.java b/service-discovery/dns/src/test/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscoveryTest.java new file mode 100644 index 00000000..a10a63b2 --- /dev/null +++ b/service-discovery/dns/src/test/java/io/smallrye/stork/servicediscovery/dns/DnsServiceDiscoveryTest.java @@ -0,0 +1,398 @@ +package io.smallrye.stork.servicediscovery.dns; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.HostConfig; +import com.github.dockerjava.api.model.Ports; + +import io.smallrye.stork.Stork; +import io.smallrye.stork.api.Service; +import io.smallrye.stork.api.ServiceDefinition; +import io.smallrye.stork.api.ServiceInstance; +import io.smallrye.stork.test.StorkTestUtils; +import io.smallrye.stork.test.TestConfigProvider; +import io.smallrye.stork.utils.HostAndPort; +import io.smallrye.stork.utils.StorkAddressUtils; +import io.vertx.core.Vertx; +import io.vertx.ext.consul.ConsulClient; +import io.vertx.ext.consul.ConsulClientOptions; +import io.vertx.ext.consul.ServiceOptions; + +@Testcontainers +@DisabledOnOs(OS.WINDOWS) +public class DnsServiceDiscoveryTest { + private static final Logger log = LoggerFactory.getLogger(DnsServiceDiscoveryTest.class); + + public static final ExposedPort TCP_8500 = ExposedPort.tcp(8500); + public static final ExposedPort UDP_8600 = ExposedPort.udp(8600); + + private final Set registeredConsulServices = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + @Container + public GenericContainer consul = new GenericContainer<>(DockerImageName.parse("consul:1.9")) + .withCreateContainerCmdModifier(cmd -> { + HostConfig hostConfig = cmd.getHostConfig(); + Ports portBindings = hostConfig.getPortBindings(); + cmd.withExposedPorts(TCP_8500, UDP_8600); + portBindings.bind(UDP_8600, Ports.Binding.empty()); + portBindings.bind(TCP_8500, Ports.Binding.empty()); + hostConfig.withPortBindings(portBindings); + cmd.withHostConfig(hostConfig); + }); + + Stork stork; + int consulPort; + int dnsPort; + ConsulClient client; + long consulId; + + @BeforeEach + void setUp() { + TestConfigProvider.clear(); + Map portBindings = consul.getContainerInfo().getNetworkSettings().getPorts() + .getBindings(); + consulPort = Integer.parseInt(portBindings.get(TCP_8500)[0].getHostPortSpec()); + dnsPort = Integer.parseInt(portBindings.get(UDP_8600)[0].getHostPortSpec()); + client = ConsulClient.create(Vertx.vertx(), + new ConsulClientOptions().setHost("localhost").setPort(consulPort)); + stork = StorkTestUtils.getNewStorkInstance(); + + // without waiting we're sometimes in a state where container is not ready yet for second test + await().atMost(10, TimeUnit.SECONDS).until(this::clientCanTalkToConsul); + } + + @AfterEach + void cleanUp() throws InterruptedException { + deregisterServiceInstances(); + } + + @Test + void shouldGetInstancesForMavenOrg() { + String serviceName = "maven"; + + DnsConfiguration config = new DnsConfiguration().withHostname("maven.org").withRecordType("A") + .withPort("8392"); + stork.defineIfAbsent(serviceName, ServiceDefinition.of(config)); + + List serviceInstances = getServiceInstances(serviceName, 20); + assertThat(serviceInstances).isNotEmpty(); + for (ServiceInstance serviceInstance : serviceInstances) { + assertThat(serviceInstance.getPort()).isEqualTo(8392); + } + + } + + @Test + void shouldGetServiceInstanceIdsFromDns() throws InterruptedException { + //Given a service `my-service` registered in consul (available via DNS) and a refresh-period of 5 minutes + String serviceName = "my-service"; + + DnsConfiguration config = new DnsConfiguration().withDnsServers(getDnsIp() + ":" + dnsPort) + .withHostname("my-service.service.dc1.consul").withRefreshPeriod("5M") + .withPort("8111"); + stork.defineIfAbsent(serviceName, ServiceDefinition.of(config)); + + registerService(serviceName, "127.0.0.5:8406"); + + List instances = getServiceInstances(serviceName, 20); + assertThat(instances).isNotEmpty(); + assertThat(instances.get(0).getHost()).isEqualTo("127.0.0.5"); + } + + @Test + void shouldFailWithoutPortForA() throws InterruptedException { + //Given a service `my-service-2` registered in consul and a refresh-period of 5 minutes + String serviceName = "my-service-3"; + + DnsConfiguration config = new DnsConfiguration().withDnsServers(getDnsIp() + ":" + dnsPort) + .withHostname(serviceName + ".service.dc1.consul") + .withRecordType("A"); + assertThatThrownBy(() -> stork.defineIfAbsent(serviceName, ServiceDefinition.of(config))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldFailWithoutPortForAAAA() throws InterruptedException { + //Given a service `my-service-2` registered in consul and a refresh-period of 5 minutes + String serviceName = "my-service-3"; + + DnsConfiguration config = new DnsConfiguration().withDnsServers(getDnsIp() + ":" + dnsPort) + .withHostname(serviceName + ".service.dc1.consul") + .withRecordType("AAAA"); + assertThatThrownBy(() -> stork.defineIfAbsent(serviceName, ServiceDefinition.of(config))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldFetchA() throws InterruptedException { + //Given a service `my-service-2` registered in consul and a refresh-period of 5 minutes + String serviceName = "my-service-3"; + + DnsConfiguration config = new DnsConfiguration().withDnsServers(getDnsIp() + ":" + dnsPort) + .withHostname(serviceName + ".service.dc1.consul") + .withRecordType("A") + .withPort("8111"); + stork.defineIfAbsent(serviceName, ServiceDefinition.of(config)); + // 8333 port won't be included in A record, so the result should have 8111 + registerService(serviceName, "127.0.0.5:8333", "127.0.0.6"); + + List serviceInstances = getServiceInstances(serviceName, 5); + + assertThat(serviceInstances).hasSize(2); + + ServiceInstance five = serviceInstances.stream().filter(i -> i.getHost().endsWith("5")).findFirst().get(); + ServiceInstance six = serviceInstances.stream().filter(i -> i.getHost().endsWith("6")).findFirst().get(); + + assertThat(five.getPort()).isEqualTo(8111); + assertThat(six.getPort()).isEqualTo(8111); + } + + @Test + void shouldFetchSRVWithAValues() throws InterruptedException { + //Given a service `my-service-2` registered in consul and a refresh-period of 5 minutes + String serviceName = "my-service-x"; + + DnsConfiguration config = new DnsConfiguration().withDnsServers(getDnsIp() + ":" + dnsPort) + .withHostname(serviceName + ".service.dc1.consul"); + stork.defineIfAbsent(serviceName, ServiceDefinition.of(config)); + // 8333 port won't be included in A record, so the result should have 8111 + int port1 = 8333; + String ip1 = "127.1.1.23"; + int port2 = 8334; + String ip2 = "127.1.1.24"; + registerService(serviceName, "[" + ip1 + "]:" + port1, "[" + ip2 + "]:" + port2); + + List serviceInstances = getServiceInstances(serviceName, 5); + assertThat(serviceInstances).hasSize(2); + + ServiceInstance first = serviceInstances.stream().filter(i -> i.getHost().equals(ip1)).findFirst().get(); + ServiceInstance second = serviceInstances.stream().filter(i -> i.getHost().equals(ip2)).findFirst().get(); + + assertThat(first.getPort()).isEqualTo(port1); + assertThat(second.getPort()).isEqualTo(port2); + assertThat(first.getMetadata().getMetadata().get(DnsMetadataKey.DNS_WEIGHT)).isNotNull(); + assertThat(second.getPort()).isEqualTo(port2); + } + + @Test + void shouldFetchSRVWithAAAAValues() throws InterruptedException { + //Given a service `my-service-2` registered in consul and a refresh-period of 5 minutes + String serviceName = "my-service-x"; + + DnsConfiguration config = new DnsConfiguration().withDnsServers(getDnsIp() + ":" + dnsPort) + .withHostname(serviceName + ".service.dc1.consul"); + stork.defineIfAbsent(serviceName, ServiceDefinition.of(config)); + // 8333 port won't be included in A record, so the result should have 8111 + int port1 = 8333; + String ip1 = "2001:db8:85a3:0:0:8a2e:370:7334"; + int port2 = 8334; + String ip2 = "2001:db8:85a3:0:0:8a2e:370:7335"; + registerService(serviceName, "[" + ip1 + "]:" + port1, "[" + ip2 + "]:" + port2); + + Service service = stork.getService(serviceName); + AtomicReference> instances = new AtomicReference<>(); + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Consul", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + List serviceInstances = instances.get(); + assertThat(serviceInstances).hasSize(2); + + ServiceInstance first = serviceInstances.stream().filter(i -> i.getHost().equals(ip1)).findFirst().get(); + ServiceInstance second = serviceInstances.stream().filter(i -> i.getHost().equals(ip2)).findFirst().get(); + + assertThat(first.getPort()).isEqualTo(port1); + assertThat(second.getPort()).isEqualTo(port2); + } + + @Test + void shouldFetchAAAA() throws InterruptedException { + //Given a service `my-service-2` registered in consul and a refresh-period of 5 minutes + String serviceName = "my-service-3"; + + DnsConfiguration config = new DnsConfiguration().withDnsServers(getDnsIp() + ":" + dnsPort) + .withHostname(serviceName + ".service.dc1.consul") + .withRecordType("AAAA") + .withPort("8111"); + stork.defineIfAbsent(serviceName, ServiceDefinition.of(config)); + // 8333 port won't be included in A record, so the result should have 8111 + int port1 = 8333; + String ip1 = "2001:db8:85a3:0:0:8a2e:370:7334"; + String ip2 = "2001:db8:85a3:0:0:8a2e:370:7335"; + registerService(serviceName, "[" + ip1 + "]:" + port1, ip2); + + List serviceInstances = getServiceInstances(serviceName, 5); + + assertThat(serviceInstances).hasSize(2); + + ServiceInstance first = serviceInstances.stream().filter(i -> i.getHost().equals(ip1)).findFirst().get(); + ServiceInstance second = serviceInstances.stream().filter(i -> i.getHost().equals(ip2)).findFirst().get(); + + assertThat(first.getPort()).isEqualTo(8111); + assertThat(second.getPort()).isEqualTo(8111); + } + + @Test + void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException { + //Given a service `my-service-2` registered in consul and a refresh-period of 5 minutes + String serviceName = "my-service-2"; + + DnsConfiguration config = new DnsConfiguration().withDnsServers(getDnsIp() + ":" + dnsPort) + .withHostname("my-service-2.service.dc1.consul").withRefreshPeriod("5s") + .withRecordType("SRV"); + stork.defineIfAbsent(serviceName, ServiceDefinition.of(config)); + registerService(serviceName, "127.0.0.5:8406"); + + List serviceInstances = getServiceInstances(serviceName, 20); + + assertThat(serviceInstances).isNotEmpty(); + ServiceInstance firstInstance = serviceInstances.get(0); + assertThat(firstInstance.getHost()).isEqualTo("127.0.0.5"); + long firstInstanceId = firstInstance.getId(); + + deregisterServiceInstances(); + //the service settings change in consul + registerService(serviceName, "127.0.0.6:8407", "127.0.0.5:8406"); + + Service service = stork.getService(serviceName); + // let's wait until the new services are populated to Stork (from DNS) + await().atMost(Duration.ofSeconds(7)) + .until(() -> service.getServiceDiscovery().getServiceInstances().await().indefinitely().size() == 2); + + AtomicReference> instances = new AtomicReference<>(); + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Consul", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + serviceInstances = instances.get(); + assertThat(serviceInstances.stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder( + "127.0.0.5", "127.0.0.6"); + long firstInstanceIdAfterRefetch = serviceInstances.stream() + .filter(instance -> instance.getHost().equals("127.0.0.5")) + .findFirst() + .orElseThrow() + .getId(); + assertThat(firstInstanceId).isEqualTo(firstInstanceIdAfterRefetch); + + ServiceInstance five = serviceInstances.stream().filter(i -> i.getHost().endsWith("5")).findFirst().get(); + ServiceInstance six = serviceInstances.stream().filter(i -> i.getHost().endsWith("6")).findFirst().get(); + + assertThat(five.getPort()).isEqualTo(8406); + assertThat(six.getPort()).isEqualTo(8407); + } + + private String getDnsIp() { + @SuppressWarnings("deprecation") + String dnsIp = consul.getContainerIpAddress(); + + if ("localhost".equals(dnsIp)) { + dnsIp = "127.0.0.1"; + } + return dnsIp; + } + + private List getServiceInstances(String serviceName, int seconds) { + AtomicReference> instances = new AtomicReference<>(); + + Service service = stork.getService(serviceName); + // call stork service discovery and gather service instances in the cache + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Consul", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(seconds)) + .until(() -> instances.get() != null); + return instances.get(); + } + + private void registerService(String application, + String... addresses) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(addresses.length); + Set consulServiceIds = new HashSet<>(); + for (String addressString : addresses) { + HostAndPort address = StorkAddressUtils.parseToHostAndPort(addressString, 0, ""); + String consulServiceId = "" + (consulId++); + client.registerService( + new ServiceOptions().setId(consulServiceId).setName(application) + .setAddress(address.host).setPort(address.port)) + .onComplete(result -> { + if (result.failed()) { + fail("Failed to register service in Consul " + address, result.cause()); + } else { + consulServiceIds.add(consulServiceId); + latch.countDown(); + } + }); + } + if (!latch.await(10, TimeUnit.SECONDS)) { + fail("Failed to register service in consul in time"); + } + registeredConsulServices.addAll(consulServiceIds); + } + + private void deregisterServiceInstances() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(registeredConsulServices.size()); + for (String id : registeredConsulServices) { + + log.info("unregistering service {}", id); + client.deregisterService(id, res -> { + if (res.succeeded()) { + log.info("unregistered service {}", id); + latch.countDown(); + } else { + fail("Failed to deregister service in consul", res.cause()); + } + }); + } + if (!latch.await(10, TimeUnit.SECONDS)) { + fail("Failed to deregister service in consul in time"); + } + registeredConsulServices.clear(); + } + + private boolean clientCanTalkToConsul() { + var done = new CompletableFuture<>(); + client.agentInfo().onFailure(done::completeExceptionally) + .onSuccess(done::complete); + try { + done.get(); + return true; + } catch (Exception any) { + return false; + } + } +} diff --git a/service-discovery/eureka/src/main/java/io/smallrye/stork/servicediscovery/eureka/EurekaServiceDiscoveryProvider.java b/service-discovery/eureka/src/main/java/io/smallrye/stork/servicediscovery/eureka/EurekaServiceDiscoveryProvider.java index 08653dfa..c940ca47 100644 --- a/service-discovery/eureka/src/main/java/io/smallrye/stork/servicediscovery/eureka/EurekaServiceDiscoveryProvider.java +++ b/service-discovery/eureka/src/main/java/io/smallrye/stork/servicediscovery/eureka/EurekaServiceDiscoveryProvider.java @@ -4,6 +4,7 @@ import io.smallrye.stork.api.config.ServiceConfig; import io.smallrye.stork.api.config.ServiceDiscoveryAttribute; import io.smallrye.stork.api.config.ServiceDiscoveryType; +import io.smallrye.stork.impl.CachingServiceDiscovery; import io.smallrye.stork.spi.ServiceDiscoveryProvider; import io.smallrye.stork.spi.StorkInfrastructure; @@ -18,7 +19,7 @@ @ServiceDiscoveryAttribute(name = "eureka-trust-all", description = "Enable/Disable the TLS certificate verification", defaultValue = "false") @ServiceDiscoveryAttribute(name = "eureka-tls", description = "Use TLS to connect to the Eureka server", defaultValue = "false") @ServiceDiscoveryAttribute(name = "instance", description = "The Eureka application instance Id") -@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", defaultValue = "5M") +@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL) @ServiceDiscoveryAttribute(name = "secure", description = "Whether is should select the secured endpoint of the retrieved services.", defaultValue = "false") public class EurekaServiceDiscoveryProvider implements ServiceDiscoveryProvider { diff --git a/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryProvider.java b/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryProvider.java index b40c197b..bf737e6a 100644 --- a/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryProvider.java +++ b/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryProvider.java @@ -4,6 +4,7 @@ import io.smallrye.stork.api.config.ServiceConfig; import io.smallrye.stork.api.config.ServiceDiscoveryAttribute; import io.smallrye.stork.api.config.ServiceDiscoveryType; +import io.smallrye.stork.impl.CachingServiceDiscovery; import io.smallrye.stork.spi.ServiceDiscoveryProvider; import io.smallrye.stork.spi.StorkInfrastructure; import io.vertx.core.Vertx; @@ -15,7 +16,7 @@ @ServiceDiscoveryAttribute(name = "k8s-host", description = "The Kubernetes API host.") @ServiceDiscoveryAttribute(name = "k8s-namespace", description = "The namespace of the service. Use all to discover all namespaces.") @ServiceDiscoveryAttribute(name = "application", description = "The Kubernetes application Id; if not defined Stork service name will be used.") -@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", defaultValue = "5M") +@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL) @ServiceDiscoveryAttribute(name = "secure", description = "Whether the connection with the service should be encrypted with TLS.") public class KubernetesServiceDiscoveryProvider implements ServiceDiscoveryProvider { diff --git a/service-discovery/static-list/src/main/java/io/smallrye/stork/servicediscovery/staticlist/StaticListServiceDiscoveryProvider.java b/service-discovery/static-list/src/main/java/io/smallrye/stork/servicediscovery/staticlist/StaticListServiceDiscoveryProvider.java index 9320edda..074d847e 100644 --- a/service-discovery/static-list/src/main/java/io/smallrye/stork/servicediscovery/staticlist/StaticListServiceDiscoveryProvider.java +++ b/service-discovery/static-list/src/main/java/io/smallrye/stork/servicediscovery/staticlist/StaticListServiceDiscoveryProvider.java @@ -38,7 +38,7 @@ public ServiceDiscovery createServiceDiscovery(StaticConfiguration config, Strin URL url = null; address = address.trim(); try { - HostAndPort hostAndPort = StorkAddressUtils.parseToHostAndPort(address, 80, serviceName); + HostAndPort hostAndPort = StorkAddressUtils.parseToHostAndPort(address, 80, "service '" + serviceName + "'"); addressList .add(new DefaultServiceInstance(ServiceInstanceIds.next(), hostAndPort.host, hostAndPort.port, isSecure(config.getSecure(), hostAndPort.port)));