Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to edc v0.5.1 #94

Merged
merged 27 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
34f4ca7
Update to EDC v0.5.1
carlos-schmidt Feb 24, 2024
9e00bcd
Code cleanup
carlos-schmidt Feb 24, 2024
4bea014
Code cleanup
carlos-schmidt Feb 24, 2024
e41dc4e
Update docs
carlos-schmidt Feb 24, 2024
9308207
Merge main into update branch
carlos-schmidt Feb 24, 2024
419504d
Code cleanup
carlos-schmidt Feb 24, 2024
fbf0bdb
Code cleanup
carlos-schmidt Feb 25, 2024
ffe9908
Write new tests for PolicyService
carlos-schmidt Feb 25, 2024
55f9648
Add policy service config test
carlos-schmidt Mar 5, 2024
b8cafa1
Code cleanup
carlos-schmidt Mar 5, 2024
78b0523
Give controller correct config
carlos-schmidt Mar 5, 2024
ab74344
Cleanup, fix smaller bugs
carlos-schmidt Mar 5, 2024
93909e3
Temporary fix of multiple AuthRequestFilters
carlos-schmidt Mar 5, 2024
a06946d
Add unified auth request filter
carlos-schmidt Mar 16, 2024
16df190
Remove extension specific authreq-filter
carlos-schmidt Mar 16, 2024
3cbe135
Merge branch 'FraunhoferIOSB:main' into main
carlos-schmidt Mar 16, 2024
8c553c2
Merge main into branch
carlos-schmidt Mar 16, 2024
8efce1e
Clean up code
carlos-schmidt Mar 16, 2024
1bf45c7
Fix repeated negotiation
carlos-schmidt Mar 19, 2024
1438cdc
Fix repeated negotiation timeout while waiting
carlos-schmidt Mar 19, 2024
f5837bc
Fix wrong config value read
carlos-schmidt Mar 19, 2024
701ba0e
Fix wrong config passed
carlos-schmidt Mar 19, 2024
7b481db
Cleanup code, sanitize http responses
carlos-schmidt Mar 19, 2024
033bc5c
Update readme: client negotiation
carlos-schmidt Mar 19, 2024
4956c5f
Fix PolicyServiceConfigTest
carlos-schmidt Mar 19, 2024
50649ab
Add license header
carlos-schmidt Mar 19, 2024
55ceb63
Update postman collection
carlos-schmidt Mar 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ jobs:
java-version: '17'
distribution: 'temurin'
- name: Build with Gradle
uses: gradle/gradle-build-action@585b565652cefbba63202a7f927df0ff99f34001
uses: gradle/gradle-build-action@0706ab3a3c20483a3f37c3d9de1b0d95297e3743
with:
arguments: clean build
132 changes: 66 additions & 66 deletions README.md

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

## Current development version

Compatibility: **Eclipse Dataspace Connector v0.4.1**
Compatibility: **Eclipse Dataspace Connector v0.5.1**

**New Features**

- counterPartyId now needed when using client extension


**Bugfixes**

## V1.0.0-alpha5
Expand Down
119 changes: 80 additions & 39 deletions client/src/main/java/de/fraunhofer/iosb/client/ClientEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,13 @@
*/
package de.fraunhofer.iosb.client;

import static java.lang.String.format;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;

import java.net.URL;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.policy.spi.PolicyDefinition;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.spi.types.domain.offer.ContractOffer;

import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.fraunhofer.iosb.client.datatransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;
import de.fraunhofer.iosb.client.util.Pair;
import jakarta.json.*;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
Expand All @@ -43,6 +32,24 @@
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.catalog.spi.Dataset;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.policy.spi.PolicyDefinition;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.spi.types.domain.offer.ContractOffer;

import java.io.StringReader;
import java.net.URL;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;

/**
* Automated contract negotiation
Expand All @@ -56,7 +63,7 @@ public class ClientEndpoint {
*/
public static final String AUTOMATED_PATH = "automated";
private static final String ACCEPTED_POLICIES_PATH = "acceptedPolicies";
private static final String DATASET_PATH = "dataset";
private static final String OFFER_PATH = "offer";
private static final String NEGOTIATE_CONTRACT_PATH = "negotiateContract";
private static final String NEGOTIATE_PATH = "negotiate";
private static final String TRANSFER_PATH = "transfer";
Expand All @@ -67,12 +74,14 @@ public class ClientEndpoint {
private final PolicyController policyController;
private final DataTransferController transferController;

private final ObjectMapper objectMapper;

/**
* Initialize a client endpoint.
*
* @param monitor Logging functionality
* @param policyController Finds out policy for a given asset id and provider EDC url.
* @param negotiationController Send contract offer, negotiation status watch.
* @param policyController Provides API for accepted policy management and provider dataset retrieval.
* @param transferController Initiate transfer requests.
*/
public ClientEndpoint(Monitor monitor,
Expand All @@ -84,64 +93,90 @@ public ClientEndpoint(Monitor monitor,
this.policyController = policyController;
this.negotiationController = negotiationController;
this.transferController = transferController;
this.objectMapper = new ObjectMapper();
}

/**
* Return dataset for assetId that match any policyDefinitions' policy
* Return dataset for assetId that match any policyDefinition's policy
* of the services' policyDefinitionStore instance containing user added
* policyDefinitions. If more than one policyDefinitions are provided by the
* provider connector, an AmbiguousOrNullException will be thrown.
*
* @param providerUrl Provider of the asset.
* @param assetId Asset ID of the asset whose contract should be fetched.
* @return One policyDefinition offered by the provider for the given assetId.
* @return A dataset offered by the provider for the given assetId.
*/
@GET
@Path(DATASET_PATH)
public Response getDataset(@QueryParam("providerUrl") URL providerUrl, @QueryParam("assetId") String assetId) {
monitor.debug(format("[Client] Received a %s GET request", DATASET_PATH));
@Path(OFFER_PATH)
public Response getOffer(@QueryParam("providerUrl") URL providerUrl, @QueryParam("assetId") String assetId, @QueryParam("providerId") String counterPartyId) {
monitor.debug(format("[Client] Received a %s GET request", OFFER_PATH));

if (Objects.isNull(providerUrl)) {
return Response.status(Response.Status.BAD_REQUEST).entity("Provider URL must not be null").build();
}

try {
var dataset = policyController.getDataset(providerUrl, assetId);
return Response.ok(dataset).build();
var dataset = policyController.getDataset(counterPartyId, providerUrl, assetId);

var parsedResponse = buildResponseFrom(dataset);
return Response.ok(parsedResponse).build();

} catch (InterruptedException interruptedException) {
monitor.severe(format("[Client] Getting dataset failed for provider %s and asset %s", providerUrl,
monitor.severe(format("[Client] Getting offer failed for provider %s and asset %s", providerUrl,
assetId), interruptedException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(interruptedException.getMessage())
.build();

} catch (JsonProcessingException policyWriteException) {
monitor.severe(format("[Client] Parsing policy failed for provider %s and asset %s", providerUrl,
assetId), policyWriteException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(policyWriteException.getMessage())
.build();

}
}

private String buildResponseFrom(Dataset dataset) throws JsonProcessingException {
var offer = dataset.getOffers().entrySet().stream().findFirst().orElseThrow();

// Build negotiation request body for the user
var policyString = objectMapper.writeValueAsString(offer.getValue());
var policyJson = Json.createReader(new StringReader(policyString)).read();

return Json.createObjectBuilder()
.add("id", offer.getKey())
.add("policy", policyJson)
.add("assetId", offer.getValue().getTarget())
.build()
.toString();
}

/**
* Negotiate a contract agreement using the given contract offer if no agreement
* exists for this constellation.
*
* @param providerUrl Provider EDCs URL (DSP endpoint)
* @param providerId Provider EDCs ID
* @param counterPartyUrl Provider EDCs URL (DSP endpoint)
* @param counterPartyId Provider EDCs ID
* @param assetId ID of the asset to be retrieved
* @param dataDestinationUrl URL of destination data sink.
* @return Asset data
*/
@POST
@Path(NEGOTIATE_PATH)
public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("providerId") String providerId,
public Response negotiateContract(@QueryParam("providerUrl") URL counterPartyUrl,
@QueryParam("providerId") String counterPartyId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
monitor.debug(format("[Client] Received a %s POST request", NEGOTIATE_PATH));
Objects.requireNonNull(providerUrl, "Provider URL must not be null");
Objects.requireNonNull(providerId, "Provider ID must not be null");
Objects.requireNonNull(counterPartyUrl, "Provider URL must not be null");
Objects.requireNonNull(counterPartyId, "Provider ID must not be null");
Objects.requireNonNull(assetId, "Asset ID must not be null");

Pair<String, Policy> idPolicyPair; // id means contractOfferId
try {
idPolicyPair = policyController.getAcceptablePolicyForAssetId(providerUrl, assetId);
idPolicyPair = policyController.getAcceptablePolicyForAssetId(counterPartyId, counterPartyUrl, assetId);
} catch (InterruptedException negotiationException) {
monitor.severe(format("[Client] Getting policies failed for provider %s and asset %s", providerUrl,
monitor.severe(format("[Client] Getting policies failed for provider %s and asset %s", counterPartyUrl,
assetId), negotiationException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(negotiationException.getMessage())
.build();
Expand All @@ -155,22 +190,22 @@ public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,

var contractRequest = ContractRequest.Builder.newInstance()
.contractOffer(offer)
.counterPartyAddress(providerUrl.toString())
.providerId(providerId)
.counterPartyAddress(counterPartyUrl.toString())
.providerId(counterPartyId)
.protocol(DATASPACE_PROTOCOL_HTTP)
.build();
ContractAgreement agreement;

try {
agreement = negotiationController.negotiateContract(contractRequest);
} catch (InterruptedException | ExecutionException negotiationException) {
monitor.severe(format("[Client] Negotiation failed for provider %s and contractOffer %s", providerUrl,
monitor.severe(format("[Client] Negotiation failed for provider %s and contractOffer %s", counterPartyUrl,
offer.getId()), negotiationException);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(negotiationException.getMessage())
.build();
}

return getData(providerUrl, agreement.getId(), assetId, dataDestinationUrl);
return getData(counterPartyUrl, agreement.getId(), assetId, dataDestinationUrl);
}

/**
Expand All @@ -187,7 +222,12 @@ public Response negotiateContract(ContractRequest contractRequest) {
Objects.requireNonNull(contractRequest, "ContractRequest must not be null");
try {
var agreement = negotiationController.negotiateContract(contractRequest);
return Response.ok(agreement).build();
// Sanitize response (only ID is relevant here)
var agreementResponse = Json.createObjectBuilder()
.add("agreement-id", agreement.getId())
.build()
.toString();
return Response.ok(agreementResponse).build();
} catch (InterruptedException | ExecutionException negotiationException) {
monitor.severe(
format("[Client] Negotiation failed for provider %s and contractRequest %s",
Expand All @@ -211,7 +251,8 @@ public Response negotiateContract(ContractRequest contractRequest) {
@GET
@Path(TRANSFER_PATH)
public Response getData(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("agreementId") String agreementId, @QueryParam("assetId") String assetId,
@QueryParam("agreementId") String agreementId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
monitor.debug(format("[Client] Received a %s GET request", TRANSFER_PATH));
Objects.requireNonNull(providerUrl, "providerUrl must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package de.fraunhofer.iosb.client;

import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import de.fraunhofer.iosb.client.datatransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;
import org.eclipse.edc.connector.contract.spi.negotiation.ConsumerContractNegotiationManager;
Expand Down Expand Up @@ -61,8 +61,9 @@ public void initialize(ServiceExtensionContext context) {
var negotiationController = new NegotiationController(consumerNegotiationManager,
contractNegotiationObservable, contractNegotiationStore, config);

// This controller needs base config to read EDC's hostname + specific ports
var dataTransferController = new DataTransferController(monitor, context.getConfig(), webService,
publicApiManagementService, transferProcessManager, context.getConnectorId());
publicApiManagementService, transferProcessManager);

webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,9 @@
import de.fraunhofer.iosb.api.model.Endpoint;
import de.fraunhofer.iosb.api.model.HttpMethod;
import de.fraunhofer.iosb.client.ClientEndpoint;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferEndpoint;
import jakarta.ws.rs.container.ContainerRequestContext;
import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.spi.monitor.Monitor;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static java.lang.String.format;

/**
* Custom AuthenticationRequestFilter filtering requests that go directly to an
Expand All @@ -56,4 +47,4 @@ public void addTemporaryEndpoint(String agreementId, String key, String value) {
var endpointSuffix = ClientEndpoint.AUTOMATED_PATH + "/receiveData/" + agreementId;
publicApiManagementService.addTemporaryEndpoint(new Endpoint(endpointSuffix, HttpMethod.POST, Map.of(key, List.of(value))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.client.dataTransfer;
package de.fraunhofer.iosb.client.datatransfer;

import static java.lang.String.format;
import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.authentication.DataTransferEndpointManager;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.web.spi.WebService;

import java.net.URL;
import java.util.Objects;
Expand All @@ -25,14 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.authentication.DataTransferEndpointManager;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.web.spi.WebService;
import static java.lang.String.format;

public class DataTransferController {

Expand All @@ -58,12 +58,11 @@ public class DataTransferController {
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
* @param connectorId Connector ID for the provider to learn
*/
public DataTransferController(Monitor monitor, Config config, WebService webService,
PublicApiManagementService publicApiManagementService, TransferProcessManager transferProcessManager, String connectorId) {
PublicApiManagementService publicApiManagementService, TransferProcessManager transferProcessManager) {
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager);
this.config = config.getConfig("edc.client");
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager, connectorId);
this.dataTransferEndpointManager = new DataTransferEndpointManager(publicApiManagementService);
this.dataTransferObservable = new DataTransferObservable(monitor);
var dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
Expand All @@ -86,8 +85,7 @@ public DataTransferController(Monitor monitor, Config config, WebService webServ
public String initiateTransferProcess(URL providerUrl, String agreementId, String assetId,
URL dataDestinationUrl) throws InterruptedException, ExecutionException {
// Prepare for incoming data
var dataFuture = new CompletableFuture<String>();
dataTransferObservable.register(dataFuture, agreementId);
var dataFuture = dataTransferObservable.register(agreementId);

if (Objects.isNull(dataDestinationUrl)) {
var apiKey = UUID.randomUUID().toString();
Expand All @@ -96,11 +94,13 @@ public String initiateTransferProcess(URL providerUrl, String agreementId, Strin
this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, apiKey);
return waitForData(dataFuture, agreementId);
} else {
// Send data to custom target url
var dataSinkAddress = HttpDataAddress.Builder.newInstance()
.baseUrl(dataDestinationUrl.toString())
.build();

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, dataSinkAddress);
// Don't have to wait for data
return null;
}

Expand Down
Loading