Skip to content

Commit

Permalink
Merge pull request #94 from carlos-schmidt/edc-v0.5.1
Browse files Browse the repository at this point in the history
Update to edc v0.5.1
  • Loading branch information
carlos-schmidt authored Mar 19, 2024
2 parents aab1736 + 55ceb63 commit e4f581f
Show file tree
Hide file tree
Showing 56 changed files with 865 additions and 600 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ jobs:
java-version: '17'
distribution: 'temurin'
- name: Build with Gradle
uses: gradle/gradle-build-action@585b565652cefbba63202a7f927df0ff99f34001
uses: gradle/gradle-build-action@0706ab3a3c20483a3f37c3d9de1b0d95297e3743
with:
arguments: clean build
132 changes: 66 additions & 66 deletions README.md

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

## Current development version

Compatibility: **Eclipse Dataspace Connector v0.4.1**
Compatibility: **Eclipse Dataspace Connector v0.5.1**

**New Features**

- counterPartyId now needed when using client extension


**Bugfixes**

## V1.0.0-alpha5
Expand Down
119 changes: 80 additions & 39 deletions client/src/main/java/de/fraunhofer/iosb/client/ClientEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,13 @@
*/
package de.fraunhofer.iosb.client;

import static java.lang.String.format;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;

import java.net.URL;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.policy.spi.PolicyDefinition;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.spi.types.domain.offer.ContractOffer;

import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.fraunhofer.iosb.client.datatransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;
import de.fraunhofer.iosb.client.util.Pair;
import jakarta.json.*;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
Expand All @@ -43,6 +32,24 @@
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.catalog.spi.Dataset;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.policy.spi.PolicyDefinition;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.spi.types.domain.offer.ContractOffer;

import java.io.StringReader;
import java.net.URL;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;

/**
* Automated contract negotiation
Expand All @@ -56,7 +63,7 @@ public class ClientEndpoint {
*/
public static final String AUTOMATED_PATH = "automated";
private static final String ACCEPTED_POLICIES_PATH = "acceptedPolicies";
private static final String DATASET_PATH = "dataset";
private static final String OFFER_PATH = "offer";
private static final String NEGOTIATE_CONTRACT_PATH = "negotiateContract";
private static final String NEGOTIATE_PATH = "negotiate";
private static final String TRANSFER_PATH = "transfer";
Expand All @@ -67,12 +74,14 @@ public class ClientEndpoint {
private final PolicyController policyController;
private final DataTransferController transferController;

private final ObjectMapper objectMapper;

/**
* Initialize a client endpoint.
*
* @param monitor Logging functionality
* @param policyController Finds out policy for a given asset id and provider EDC url.
* @param negotiationController Send contract offer, negotiation status watch.
* @param policyController Provides API for accepted policy management and provider dataset retrieval.
* @param transferController Initiate transfer requests.
*/
public ClientEndpoint(Monitor monitor,
Expand All @@ -84,64 +93,90 @@ public ClientEndpoint(Monitor monitor,
this.policyController = policyController;
this.negotiationController = negotiationController;
this.transferController = transferController;
this.objectMapper = new ObjectMapper();
}

/**
* Return dataset for assetId that match any policyDefinitions' policy
* Return dataset for assetId that match any policyDefinition's policy
* of the services' policyDefinitionStore instance containing user added
* policyDefinitions. If more than one policyDefinitions are provided by the
* provider connector, an AmbiguousOrNullException will be thrown.
*
* @param providerUrl Provider of the asset.
* @param assetId Asset ID of the asset whose contract should be fetched.
* @return One policyDefinition offered by the provider for the given assetId.
* @return A dataset offered by the provider for the given assetId.
*/
@GET
@Path(DATASET_PATH)
public Response getDataset(@QueryParam("providerUrl") URL providerUrl, @QueryParam("assetId") String assetId) {
monitor.debug(format("[Client] Received a %s GET request", DATASET_PATH));
@Path(OFFER_PATH)
public Response getOffer(@QueryParam("providerUrl") URL providerUrl, @QueryParam("assetId") String assetId, @QueryParam("providerId") String counterPartyId) {
monitor.debug(format("[Client] Received a %s GET request", OFFER_PATH));

if (Objects.isNull(providerUrl)) {
return Response.status(Response.Status.BAD_REQUEST).entity("Provider URL must not be null").build();
}

try {
var dataset = policyController.getDataset(providerUrl, assetId);
return Response.ok(dataset).build();
var dataset = policyController.getDataset(counterPartyId, providerUrl, assetId);

var parsedResponse = buildResponseFrom(dataset);
return Response.ok(parsedResponse).build();

} catch (InterruptedException interruptedException) {
monitor.severe(format("[Client] Getting dataset failed for provider %s and asset %s", providerUrl,
monitor.severe(format("[Client] Getting offer failed for provider %s and asset %s", providerUrl,
assetId), interruptedException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(interruptedException.getMessage())
.build();

} catch (JsonProcessingException policyWriteException) {
monitor.severe(format("[Client] Parsing policy failed for provider %s and asset %s", providerUrl,
assetId), policyWriteException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(policyWriteException.getMessage())
.build();

}
}

private String buildResponseFrom(Dataset dataset) throws JsonProcessingException {
var offer = dataset.getOffers().entrySet().stream().findFirst().orElseThrow();

// Build negotiation request body for the user
var policyString = objectMapper.writeValueAsString(offer.getValue());
var policyJson = Json.createReader(new StringReader(policyString)).read();

return Json.createObjectBuilder()
.add("id", offer.getKey())
.add("policy", policyJson)
.add("assetId", offer.getValue().getTarget())
.build()
.toString();
}

/**
* Negotiate a contract agreement using the given contract offer if no agreement
* exists for this constellation.
*
* @param providerUrl Provider EDCs URL (DSP endpoint)
* @param providerId Provider EDCs ID
* @param counterPartyUrl Provider EDCs URL (DSP endpoint)
* @param counterPartyId Provider EDCs ID
* @param assetId ID of the asset to be retrieved
* @param dataDestinationUrl URL of destination data sink.
* @return Asset data
*/
@POST
@Path(NEGOTIATE_PATH)
public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("providerId") String providerId,
public Response negotiateContract(@QueryParam("providerUrl") URL counterPartyUrl,
@QueryParam("providerId") String counterPartyId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
monitor.debug(format("[Client] Received a %s POST request", NEGOTIATE_PATH));
Objects.requireNonNull(providerUrl, "Provider URL must not be null");
Objects.requireNonNull(providerId, "Provider ID must not be null");
Objects.requireNonNull(counterPartyUrl, "Provider URL must not be null");
Objects.requireNonNull(counterPartyId, "Provider ID must not be null");
Objects.requireNonNull(assetId, "Asset ID must not be null");

Pair<String, Policy> idPolicyPair; // id means contractOfferId
try {
idPolicyPair = policyController.getAcceptablePolicyForAssetId(providerUrl, assetId);
idPolicyPair = policyController.getAcceptablePolicyForAssetId(counterPartyId, counterPartyUrl, assetId);
} catch (InterruptedException negotiationException) {
monitor.severe(format("[Client] Getting policies failed for provider %s and asset %s", providerUrl,
monitor.severe(format("[Client] Getting policies failed for provider %s and asset %s", counterPartyUrl,
assetId), negotiationException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(negotiationException.getMessage())
.build();
Expand All @@ -155,22 +190,22 @@ public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,

var contractRequest = ContractRequest.Builder.newInstance()
.contractOffer(offer)
.counterPartyAddress(providerUrl.toString())
.providerId(providerId)
.counterPartyAddress(counterPartyUrl.toString())
.providerId(counterPartyId)
.protocol(DATASPACE_PROTOCOL_HTTP)
.build();
ContractAgreement agreement;

try {
agreement = negotiationController.negotiateContract(contractRequest);
} catch (InterruptedException | ExecutionException negotiationException) {
monitor.severe(format("[Client] Negotiation failed for provider %s and contractOffer %s", providerUrl,
monitor.severe(format("[Client] Negotiation failed for provider %s and contractOffer %s", counterPartyUrl,
offer.getId()), negotiationException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(negotiationException.getMessage())
.build();
}

return getData(providerUrl, agreement.getId(), assetId, dataDestinationUrl);
return getData(counterPartyUrl, agreement.getId(), assetId, dataDestinationUrl);
}

/**
Expand All @@ -187,7 +222,12 @@ public Response negotiateContract(ContractRequest contractRequest) {
Objects.requireNonNull(contractRequest, "ContractRequest must not be null");
try {
var agreement = negotiationController.negotiateContract(contractRequest);
return Response.ok(agreement).build();
// Sanitize response (only ID is relevant here)
var agreementResponse = Json.createObjectBuilder()
.add("agreement-id", agreement.getId())
.build()
.toString();
return Response.ok(agreementResponse).build();
} catch (InterruptedException | ExecutionException negotiationException) {
monitor.severe(
format("[Client] Negotiation failed for provider %s and contractRequest %s",
Expand All @@ -211,7 +251,8 @@ public Response negotiateContract(ContractRequest contractRequest) {
@GET
@Path(TRANSFER_PATH)
public Response getData(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("agreementId") String agreementId, @QueryParam("assetId") String assetId,
@QueryParam("agreementId") String agreementId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
monitor.debug(format("[Client] Received a %s GET request", TRANSFER_PATH));
Objects.requireNonNull(providerUrl, "providerUrl must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package de.fraunhofer.iosb.client;

import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import de.fraunhofer.iosb.client.datatransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;
import org.eclipse.edc.connector.contract.spi.negotiation.ConsumerContractNegotiationManager;
Expand Down Expand Up @@ -61,8 +61,9 @@ public void initialize(ServiceExtensionContext context) {
var negotiationController = new NegotiationController(consumerNegotiationManager,
contractNegotiationObservable, contractNegotiationStore, config);

// This controller needs base config to read EDC's hostname + specific ports
var dataTransferController = new DataTransferController(monitor, context.getConfig(), webService,
publicApiManagementService, transferProcessManager, context.getConnectorId());
publicApiManagementService, transferProcessManager);

webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,9 @@
import de.fraunhofer.iosb.api.model.Endpoint;
import de.fraunhofer.iosb.api.model.HttpMethod;
import de.fraunhofer.iosb.client.ClientEndpoint;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferEndpoint;
import jakarta.ws.rs.container.ContainerRequestContext;
import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.spi.monitor.Monitor;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static java.lang.String.format;

/**
* Custom AuthenticationRequestFilter filtering requests that go directly to an
Expand All @@ -56,4 +47,4 @@ public void addTemporaryEndpoint(String agreementId, String key, String value) {
var endpointSuffix = ClientEndpoint.AUTOMATED_PATH + "/receiveData/" + agreementId;
publicApiManagementService.addTemporaryEndpoint(new Endpoint(endpointSuffix, HttpMethod.POST, Map.of(key, List.of(value))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.client.dataTransfer;
package de.fraunhofer.iosb.client.datatransfer;

import static java.lang.String.format;
import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.authentication.DataTransferEndpointManager;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.web.spi.WebService;

import java.net.URL;
import java.util.Objects;
Expand All @@ -25,14 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.authentication.DataTransferEndpointManager;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.web.spi.WebService;
import static java.lang.String.format;

public class DataTransferController {

Expand All @@ -58,12 +58,11 @@ public class DataTransferController {
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
* @param connectorId Connector ID for the provider to learn
*/
public DataTransferController(Monitor monitor, Config config, WebService webService,
PublicApiManagementService publicApiManagementService, TransferProcessManager transferProcessManager, String connectorId) {
PublicApiManagementService publicApiManagementService, TransferProcessManager transferProcessManager) {
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager);
this.config = config.getConfig("edc.client");
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager, connectorId);
this.dataTransferEndpointManager = new DataTransferEndpointManager(publicApiManagementService);
this.dataTransferObservable = new DataTransferObservable(monitor);
var dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
Expand All @@ -86,8 +85,7 @@ public DataTransferController(Monitor monitor, Config config, WebService webServ
public String initiateTransferProcess(URL providerUrl, String agreementId, String assetId,
URL dataDestinationUrl) throws InterruptedException, ExecutionException {
// Prepare for incoming data
var dataFuture = new CompletableFuture<String>();
dataTransferObservable.register(dataFuture, agreementId);
var dataFuture = dataTransferObservable.register(agreementId);

if (Objects.isNull(dataDestinationUrl)) {
var apiKey = UUID.randomUUID().toString();
Expand All @@ -96,11 +94,13 @@ public String initiateTransferProcess(URL providerUrl, String agreementId, Strin
this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, apiKey);
return waitForData(dataFuture, agreementId);
} else {
// Send data to custom target url
var dataSinkAddress = HttpDataAddress.Builder.newInstance()
.baseUrl(dataDestinationUrl.toString())
.build();

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, dataSinkAddress);
// Don't have to wait for data
return null;
}

Expand Down
Loading

0 comments on commit e4f581f

Please sign in to comment.