Skip to content

Commit

Permalink
adds dataplane self-registration extension
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelmag110 committed Aug 21, 2024
1 parent 33e6360 commit 5c58554
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 2 deletions.
2 changes: 1 addition & 1 deletion edc-dataplane/edc-dataplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
runtimeOnly(project(":edc-extensions:dataplane:dataplane-proxy:edc-dataplane-proxy-consumer-api"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-core"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-api"))
runtimeOnly(project(":edc-extensions:dataplane:dataplane-self-registration"))

runtimeOnly(libs.edc.jsonld) // needed by the DataPlaneSignalingApi
runtimeOnly(libs.edc.core.did) // for the DID Public Key Resolver
Expand All @@ -43,7 +44,6 @@ dependencies {
runtimeOnly(libs.edc.controlplane.apiclient)

runtimeOnly(libs.edc.data.plane.selector.client)
runtimeOnly(libs.edc.data.plane.self.registration)
runtimeOnly(libs.edc.dpf.api.control)
runtimeOnly(libs.edc.dpf.api.signaling)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/


plugins {
`java-library`
}

dependencies {
implementation(libs.edc.spi.web)
implementation(libs.edc.spi.dataplane.selector)

testImplementation(libs.edc.junit)
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.dataplane.registration;

import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.health.HealthCheckResult;
import org.eclipse.edc.spi.system.health.HealthCheckService;
import org.eclipse.edc.spi.system.health.LivenessProvider;
import org.eclipse.edc.spi.system.health.ReadinessProvider;
import org.eclipse.edc.spi.system.health.StartupStatusProvider;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;
import org.jetbrains.annotations.NotNull;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static org.eclipse.tractusx.edc.dataplane.registration.DataplaneSelfRegistrationExtension.NAME;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH;

@Extension(NAME)
public class DataplaneSelfRegistrationExtension implements ServiceExtension {

private static final boolean DEFAULT_SELF_UNREGISTRATION = false;
public static final String NAME = "Dataplane Self Registration";
@Setting(value = "Enable data-plane un-registration at shutdown (not suggested for clustered environments)", type = "boolean", defaultValue = DEFAULT_SELF_UNREGISTRATION + "")
static final String SELF_UNREGISTRATION = "edc.data.plane.self.unregistration";
private final AtomicBoolean isRegistered = new AtomicBoolean(false);
private final AtomicReference<String> registrationError = new AtomicReference<>("Data plane self registration not complete");
@Inject
private DataPlaneSelectorService dataPlaneSelectorService;
@Inject
private ControlApiUrl controlApiUrl;
@Inject
private PipelineService pipelineService;
@Inject
private PublicEndpointGeneratorService publicEndpointGeneratorService;
@Inject
private HealthCheckService healthCheckService;

private ServiceExtensionContext context;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
this.context = context;
}

@Override
public void start() {
var transferTypes = Stream.concat(
toTransferTypes(PULL, publicEndpointGeneratorService.supportedDestinationTypes()),
toTransferTypes(PUSH, pipelineService.supportedSinkTypes())
);

var instance = DataPlaneInstance.Builder.newInstance()
.id(context.getRuntimeId())
.url(controlApiUrl.get().toString() + "/v1/dataflows")
.allowedSourceTypes(pipelineService.supportedSourceTypes())
.allowedDestTypes(pipelineService.supportedSinkTypes())
.allowedTransferType(transferTypes.collect(toSet()))
.build();

var monitor = context.getMonitor().withPrefix("DataPlaneHealthCheck");
var check = new DataPlaneHealthCheck();
healthCheckService.addReadinessProvider(check);
healthCheckService.addLivenessProvider(check);
healthCheckService.addStartupStatusProvider(check);

monitor.debug("Initiate data plane registration.");
dataPlaneSelectorService.addInstance(instance)
.onSuccess(it -> {
monitor.info("data plane registered to control plane");
isRegistered.set(true);
})
.onFailure(f -> registrationError.set(f.getFailureDetail()))
.orElseThrow(f -> new EdcException("Cannot register data plane to the control plane: " + f.getFailureDetail()));
}

@Override
public void shutdown() {
if (context.getConfig().getBoolean(SELF_UNREGISTRATION, DEFAULT_SELF_UNREGISTRATION)) {
dataPlaneSelectorService.unregister(context.getRuntimeId())
.onSuccess(it -> context.getMonitor().info("data plane successfully unregistered"))
.onFailure(failure -> context.getMonitor().severe("error during data plane de-registration. %s: %s"
.formatted(failure.getReason(), failure.getFailureDetail())));
}
}

private @NotNull Stream<String> toTransferTypes(FlowType pull, Set<String> types) {
return types.stream().map(it -> "%s-%s".formatted(it, pull));
}

private class DataPlaneHealthCheck implements LivenessProvider, ReadinessProvider, StartupStatusProvider {

@Override
public HealthCheckResult get() {
return HealthCheckResult.Builder.newInstance()
.component(NAME)
.success(isRegistered.get(), registrationError.get())
.build();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
#
# Contributors:
# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
#
#

org.eclipse.tractusx.edc.dataplane.registration.DataplaneSelfRegistrationExtension

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.dataplane.registration;

import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.health.HealthCheckService;
import org.eclipse.edc.web.spi.configuration.context.ControlApiUrl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(DependencyInjectionExtension.class)
class DataplaneSelfRegistrationExtensionTest {

private final DataPlaneSelectorService dataPlaneSelectorService = mock();
private final ControlApiUrl controlApiUrl = mock();
private final PipelineService pipelineService = mock();
private final PublicEndpointGeneratorService publicEndpointGeneratorService = mock();
private final HealthCheckService healthCheckService = mock();

@BeforeEach
void setUp(ServiceExtensionContext context) {
context.registerService(DataPlaneSelectorService.class, dataPlaneSelectorService);
context.registerService(ControlApiUrl.class, controlApiUrl);
context.registerService(PipelineService.class, pipelineService);
context.registerService(PublicEndpointGeneratorService.class, publicEndpointGeneratorService);
var monitor = mock(Monitor.class);
when(monitor.withPrefix(anyString())).thenReturn(mock());
context.registerService(Monitor.class, monitor);
context.registerService(HealthCheckService.class, healthCheckService);
}

@Test
void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) throws MalformedURLException {
when(context.getRuntimeId()).thenReturn("runtimeId");
when(controlApiUrl.get()).thenReturn(URI.create("http://control/api/url"));
when(pipelineService.supportedSinkTypes()).thenReturn(Set.of("sinkType", "anotherSinkType"));
when(pipelineService.supportedSourceTypes()).thenReturn(Set.of("sourceType", "anotherSourceType"));
when(publicEndpointGeneratorService.supportedDestinationTypes()).thenReturn(Set.of("pullDestType", "anotherPullDestType"));
when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.success());

extension.initialize(context);
extension.start();

var captor = ArgumentCaptor.forClass(DataPlaneInstance.class);
verify(dataPlaneSelectorService).addInstance(captor.capture());
var dataPlaneInstance = captor.getValue();
assertThat(dataPlaneInstance.getId()).isEqualTo("runtimeId");
assertThat(dataPlaneInstance.getUrl()).isEqualTo(new URL("http://control/api/url/v1/dataflows"));
assertThat(dataPlaneInstance.getAllowedSourceTypes()).containsExactlyInAnyOrder("sourceType", "anotherSourceType");
assertThat(dataPlaneInstance.getAllowedDestTypes()).containsExactlyInAnyOrder("sinkType", "anotherSinkType");
assertThat(dataPlaneInstance.getAllowedTransferTypes())
.containsExactlyInAnyOrder("pullDestType-PULL", "anotherPullDestType-PULL", "sinkType-PUSH", "anotherSinkType-PUSH");

verify(healthCheckService).addStartupStatusProvider(any());
verify(healthCheckService).addLivenessProvider(any());
verify(healthCheckService).addReadinessProvider(any());
}

@Test
void shouldNotStart_whenRegistrationFails(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) {
when(controlApiUrl.get()).thenReturn(URI.create("http://control/api/url"));
when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.conflict("cannot register"));

extension.initialize(context);

assertThatThrownBy(extension::start).isInstanceOf(EdcException.class);
}

@Test
void shouldUnregisterInstanceAtShutdown(DataplaneSelfRegistrationExtension extension, ServiceExtensionContext context) {
when(context.getRuntimeId()).thenReturn("runtimeId");
when(dataPlaneSelectorService.unregister(any())).thenReturn(ServiceResult.success());
extension.initialize(context);

extension.shutdown();

verify(dataPlaneSelectorService).unregister("runtimeId");
}
}
1 change: 0 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ edc-dpf-api-public-v2 = { module = "org.eclipse.edc:data-plane-public-api-v2", v
edc-dpf-api-signaling = { module = "org.eclipse.edc:data-plane-signaling-api", version.ref = "edc" }
edc-data-plane-selector-control-api = { module = "org.eclipse.edc:data-plane-selector-control-api", version.ref = "edc" }
edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" }
edc-data-plane-self-registration = { module = "org.eclipse.edc:data-plane-self-registration", version.ref = "edc" }

# micrometer and other infra stuff
edc-micrometer-core = { module = "org.eclipse.edc:micrometer-core", version.ref = "edc" }
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ include(":edc-extensions:data-flow-properties-provider")
// extensions - data plane
include(":edc-extensions:dataplane:dataplane-proxy:edc-dataplane-proxy-consumer-api")
include(":edc-extensions:dataplane:dataplane-selector-configuration")
include(":edc-extensions:dataplane:dataplane-self-registration")
include(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-core")
include(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-api")

Expand Down

0 comments on commit 5c58554

Please sign in to comment.