Skip to content

Commit

Permalink
Fixes: no data exchange possible (#95)
Browse files Browse the repository at this point in the history
* Give controller correct config

* Cleanup, fix smaller bugs

* Temporary fix of multiple AuthRequestFilters

* Add unified auth request filter
Inside public-api-management extension

* Remove extension specific authreq-filter
Also add unified one in dependencies
  • Loading branch information
carlos-schmidt authored Mar 16, 2024
1 parent 14802ce commit aab1736
Show file tree
Hide file tree
Showing 25 changed files with 791 additions and 353 deletions.
5 changes: 4 additions & 1 deletion client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ java {
}

dependencies {

// Centralized auth request filter
implementation(project(":public-api-management"))

// See this project's README.MD for explanations
implementation("$group:contract-core:$edcVersion")
implementation("$group:dsp-catalog-http-dispatcher:$edcVersion")
implementation("$group:management-api:$edcVersion")
implementation("$group:runtime-metamodel:$edcVersion")
implementation("$group:data-plane-http-spi:$edcVersion") // HttpDataAddress

implementation("jakarta.ws.rs:jakarta.ws.rs-api:${rsApi}")

testImplementation("$group:junit:$edcVersion")
Expand Down
45 changes: 21 additions & 24 deletions client/src/main/java/de/fraunhofer/iosb/client/ClientEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
/**
* Automated contract negotiation
*/
@Consumes({ MediaType.APPLICATION_JSON, MediaType.WILDCARD })
@Produces({ MediaType.APPLICATION_JSON })
@Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces({MediaType.APPLICATION_JSON})
@Path(ClientEndpoint.AUTOMATED_PATH)
public class ClientEndpoint {
/*
Expand All @@ -70,15 +70,15 @@ public class ClientEndpoint {
/**
* Initialize a client endpoint.
*
* @param policyService Finds out policy for a given asset id and provider
* EDC url.
* @param negotiator Send contract offer, negotiation status watch.
* @param transferInitiator Initiate transfer requests.
* @param monitor Logging functionality
* @param negotiationController Send contract offer, negotiation status watch.
* @param policyController Provides API for accepted policy management and provider dataset retrieval.
* @param transferController Initiate transfer requests.
*/
public ClientEndpoint(Monitor monitor,
NegotiationController negotiationController,
PolicyController policyController,
DataTransferController transferController) {
NegotiationController negotiationController,
PolicyController policyController,
DataTransferController transferController) {
this.monitor = monitor;

this.policyController = policyController;
Expand All @@ -91,13 +91,10 @@ public ClientEndpoint(Monitor monitor,
* of the services' policyDefinitionStore instance containing user added
* policyDefinitions. If more than one policyDefinitions are provided by the
* provider connector, an AmbiguousOrNullException will be thrown.
*
*
* @param providerUrl Provider of the asset.
* @param assetId Asset ID of the asset whose contract should be fetched.
* @return One policyDefinition offered by the provider for the given assetId.
* @throws InterruptedException Thread for agreementId was waiting, sleeping, or
* otherwise occupied, and was
* interrupted.
*/
@GET
@Path(DATASET_PATH)
Expand All @@ -123,18 +120,18 @@ public Response getDataset(@QueryParam("providerUrl") URL providerUrl, @QueryPar
* Negotiate a contract agreement using the given contract offer if no agreement
* exists for this constellation.
*
* @param providerUrl Provider EDCs URL (DSP endpoint)
* @param providerId Provider EDCs ID
* @param assetId ID of the asset to be retrieved
* @param providerUrl Provider EDCs URL (DSP endpoint)
* @param providerId Provider EDCs ID
* @param assetId ID of the asset to be retrieved
* @param dataDestinationUrl URL of destination data sink.
* @return Asset data
*/
@POST
@Path(NEGOTIATE_PATH)
public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("providerId") String providerId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
@QueryParam("providerId") String providerId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
monitor.debug(format("[Client] Received a %s POST request", NEGOTIATE_PATH));
Objects.requireNonNull(providerUrl, "Provider URL must not be null");
Objects.requireNonNull(providerId, "Provider ID must not be null");
Expand Down Expand Up @@ -205,17 +202,17 @@ public Response negotiateContract(ContractRequest contractRequest) {
/**
* Submits a data transfer request to the providerUrl.
*
* @param providerUrl The data provider's url
* @param agreementId The basis of the data transfer.
* @param assetId The asset of which the data should be transferred
* @param providerUrl The data provider's url
* @param agreementId The basis of the data transfer.
* @param assetId The asset of which the data should be transferred
* @param dataDestinationUrl URL of destination data sink.
* @return On success, the data of the desired asset. Else, returns an error message.
*/
@GET
@Path(TRANSFER_PATH)
public Response getData(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("agreementId") String agreementId, @QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
@QueryParam("agreementId") String agreementId, @QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
monitor.debug(format("[Client] Received a %s GET request", TRANSFER_PATH));
Objects.requireNonNull(providerUrl, "providerUrl must not be null");
Objects.requireNonNull(agreementId, "agreementId must not be null");
Expand Down
66 changes: 33 additions & 33 deletions client/src/main/java/de/fraunhofer/iosb/client/ClientExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package de.fraunhofer.iosb.client;

import org.eclipse.edc.api.auth.spi.AuthenticationService;
import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;
import org.eclipse.edc.connector.contract.spi.negotiation.ConsumerContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationObservable;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
Expand All @@ -27,45 +30,42 @@
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.WebService;

import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;

public class ClientExtension implements ServiceExtension {

@Inject
private AuthenticationService authenticationService;
@Inject
private CatalogService catalogService;
@Inject
private ConsumerContractNegotiationManager consumerNegotiationManager;
@Inject
private ContractNegotiationObservable contractNegotiationObservable;
@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
private TransferProcessManager transferProcessManager;
@Inject
private TypeTransformerRegistry transformer;
@Inject
private WebService webService;
// Non-public unified authentication request filter management service
@Inject
private PublicApiManagementService publicApiManagementService;

@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
var config = context.getConfig("edc.client");
@Inject
private CatalogService catalogService;
@Inject
private ConsumerContractNegotiationManager consumerNegotiationManager;
@Inject
private ContractNegotiationObservable contractNegotiationObservable;
@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
private TransferProcessManager transferProcessManager;
@Inject
private TypeTransformerRegistry transformer;
@Inject
private WebService webService;

var policyController = new PolicyController(monitor, catalogService, transformer, config);
@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
var config = context.getConfig("edc.client");

var negotiationController = new NegotiationController(consumerNegotiationManager,
contractNegotiationObservable, contractNegotiationStore, config);
var policyController = new PolicyController(monitor, catalogService, transformer, config);

var dataTransferController = new DataTransferController(monitor, config, webService,
authenticationService, transferProcessManager);
var negotiationController = new NegotiationController(consumerNegotiationManager,
contractNegotiationObservable, contractNegotiationStore, config);

webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));
var dataTransferController = new DataTransferController(monitor, context.getConfig(), webService,
publicApiManagementService, transferProcessManager, context.getConnectorId());

}
webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
*/
package de.fraunhofer.iosb.client.authentication;

import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.api.model.Endpoint;
import de.fraunhofer.iosb.api.model.HttpMethod;
import de.fraunhofer.iosb.client.ClientEndpoint;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferEndpoint;
import jakarta.ws.rs.container.ContainerRequestContext;
import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.spi.monitor.Monitor;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -32,49 +36,24 @@
* Custom AuthenticationRequestFilter filtering requests that go directly to an
* AAS service (managed by this extension) or the extension's configuration.
*/
public class CustomAuthenticationRequestFilter extends AuthenticationRequestFilter {
public class DataTransferEndpointManager {

private final Monitor monitor;
private final Map<String, String> tempKeys;
private final PublicApiManagementService publicApiManagementService;

public CustomAuthenticationRequestFilter(Monitor monitor, AuthenticationService authenticationService) {
super(authenticationService);
this.monitor = monitor;
tempKeys = new ConcurrentHashMap<>();
public DataTransferEndpointManager(PublicApiManagementService publicApiManagementService) {
this.publicApiManagementService = publicApiManagementService;
}

/**
* Add key,value pair for a request. This key will only be available for one
* request.
*
* @param key The key name
* @param value The actual key
*/
public void addTemporaryApiKey(String key, String value) {
tempKeys.put(key, value);
}
/**
* On automated data transfer: If the request is valid, the key,value pair used
* for this request will no longer be valid.
* @param agreementId Agreement to build the endpoint path suffix
* @param key The key name
* @param value The value
*/
@Override
public void filter(ContainerRequestContext requestContext) {
Objects.requireNonNull(requestContext);
var requestPath = requestContext.getUriInfo().getPath();

for (String key : tempKeys.keySet()) {
if (requestContext.getHeaders().containsKey(key)
&& requestContext.getHeaderString(key).equals(tempKeys.get(key))
&& requestPath.startsWith(
format("%s/%s", ClientEndpoint.AUTOMATED_PATH, DataTransferEndpoint.RECEIVE_DATA_PATH))) {
monitor.debug(
format("[Client] Data Transfer request with custom api key %s", key));
tempKeys.remove(key);
return;
}
}

super.filter(requestContext);
public void addTemporaryEndpoint(String agreementId, String key, String value) {
var endpointSuffix = ClientEndpoint.AUTOMATED_PATH + "/receiveData/" + agreementId;
publicApiManagementService.addTemporaryEndpoint(new Endpoint(endpointSuffix, HttpMethod.POST, Map.of(key, List.of(value))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.edc.api.auth.spi.AuthenticationService;
import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.authentication.DataTransferEndpointManager;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.web.spi.WebService;

import de.fraunhofer.iosb.client.authentication.CustomAuthenticationRequestFilter;

public class DataTransferController {

static final String DATA_TRANSFER_API_KEY = "data-transfer-api-key";
Expand All @@ -46,27 +45,26 @@ public class DataTransferController {
private final DataTransferObservable dataTransferObservable;
private final TransferInitiator transferInitiator;

private final CustomAuthenticationRequestFilter dataEndpointAuthenticationRequestFilter;
private final DataTransferEndpointManager dataTransferEndpointManager;

/**
* Class constructor
*
* @param monitor Logging.
* @param config Read config value transfer timeout and
* own URI
* @param webService Register data transfer endpoint.
* @param authenticationService Creating and passing through custom api
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
* @param monitor Logging.
* @param config Read config value transfer timeout and
* own URI
* @param webService Register data transfer endpoint.
* @param publicApiManagementService Creating and passing through custom api
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
* @param connectorId Connector ID for the provider to learn
*/
public DataTransferController(Monitor monitor, Config config, WebService webService,
AuthenticationService authenticationService, TransferProcessManager transferProcessManager) {
this.config = config;
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager);
this.dataEndpointAuthenticationRequestFilter = new CustomAuthenticationRequestFilter(monitor,
authenticationService);

PublicApiManagementService publicApiManagementService, TransferProcessManager transferProcessManager, String connectorId) {
this.config = config.getConfig("edc.client");
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager, connectorId);
this.dataTransferEndpointManager = new DataTransferEndpointManager(publicApiManagementService);
this.dataTransferObservable = new DataTransferObservable(monitor);
var dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
webService.registerResource(dataTransferEndpoint);
Expand All @@ -76,26 +74,24 @@ public DataTransferController(Monitor monitor, Config config, WebService webServ
* Initiates the transfer process defined by the arguments. The data of the
* transfer will be sent to {@link DataTransferEndpoint#RECEIVE_DATA_PATH}.
*
* @param providerUrl The provider from whom the data is to be fetched.
* @param agreementId Non-null ContractAgreement of the negotiation process.
* @param assetId The asset to be fetched.
* @param dataSinkAddress HTTPDataAddress the result of the transfer should be
* sent to. (If null, send to extension and print in log)
*
* @return A completable future whose result will be the data or an error
* message.
* @param providerUrl The provider from whom the data is to be fetched.
* @param agreementId Non-null ContractAgreement of the negotiation process.
* @param assetId The asset to be fetched.
* @param dataDestinationUrl HTTPDataAddress the result of the transfer should be
* sent to. (If null, send to extension and print in log)
* @return A completable future whose result will be the data or an error message.
* @throws InterruptedException If the data transfer was interrupted
* @throws ExecutionException If the data transfer process failed
*/
public String initiateTransferProcess(URL providerUrl, String agreementId, String assetId,
URL dataDestinationUrl) throws InterruptedException, ExecutionException {
URL dataDestinationUrl) throws InterruptedException, ExecutionException {
// Prepare for incoming data
var dataFuture = new CompletableFuture<String>();
dataTransferObservable.register(dataFuture, agreementId);

if (Objects.isNull(dataDestinationUrl)) {
var apiKey = UUID.randomUUID().toString();
dataEndpointAuthenticationRequestFilter.addTemporaryApiKey(DATA_TRANSFER_API_KEY, apiKey);
dataTransferEndpointManager.addTemporaryEndpoint(agreementId, DATA_TRANSFER_API_KEY, apiKey);

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, apiKey);
return waitForData(dataFuture, agreementId);
Expand All @@ -112,7 +108,7 @@ public String initiateTransferProcess(URL providerUrl, String agreementId, Strin

private String waitForData(CompletableFuture<String> dataFuture, String agreementId)
throws InterruptedException, ExecutionException {
var waitForTransferTimeout = config.getInteger("getWaitForTransferTimeout",
var waitForTransferTimeout = config.getInteger("waitForTransferTimeout",
WAIT_FOR_TRANSFER_TIMEOUT_DEFAULT);
try {
// Fetch TransferTimeout everytime to adapt to runtime config changes
Expand Down
Loading

0 comments on commit aab1736

Please sign in to comment.