Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release/0.2.1' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
hhund committed Mar 30, 2021
2 parents b98d9f3 + d6ab23e commit c3828b4
Show file tree
Hide file tree
Showing 35 changed files with 568 additions and 116 deletions.
50 changes: 49 additions & 1 deletion codex-process-data-transfer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>de.netzwerk-universitaetsmedizin.codex</groupId>
<artifactId>codex-processes-ap1</artifactId>
<version>0.2.0</version>
<version>0.2.1</version>
</parent>

<properties>
Expand Down Expand Up @@ -62,6 +62,23 @@
<outputDirectory>../codex-processes-ap1-docker-test-setup/crr/bpe/app/process</outputDirectory>
</configuration>
</execution>
<execution>
<id>copy-hapi-fhir-client/crr</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-client</artifactId>
<version>${hapi.version}</version>
</artifactItem>
</artifactItems>
<outputDirectory>../codex-processes-ap1-docker-test-setup/crr/bpe/app/plugin</outputDirectory>
</configuration>
</execution>
<execution>
<id>copy-process-plugin-to-docker-test-setup/dic</id>
<phase>package</phase>
Expand All @@ -79,6 +96,23 @@
<outputDirectory>../codex-processes-ap1-docker-test-setup/dic/bpe/app/process</outputDirectory>
</configuration>
</execution>
<execution>
<id>copy-hapi-fhir-client/dic</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-client</artifactId>
<version>${hapi.version}</version>
</artifactItem>
</artifactItems>
<outputDirectory>../codex-processes-ap1-docker-test-setup/dic/bpe/app/plugin</outputDirectory>
</configuration>
</execution>
<execution>
<id>copy-process-plugin-to-docker-test-setup/gth</id>
<phase>package</phase>
Expand Down Expand Up @@ -110,13 +144,27 @@
</includes>
<followSymlinks>false</followSymlinks>
</fileset>
<fileset>
<directory>../codex-processes-ap1-docker-test-setup/crr/bpe/app/plugin</directory>
<includes>
<include>hapi-fhir-client-${hapi.version}.jar</include>
</includes>
<followSymlinks>false</followSymlinks>
</fileset>
<fileset>
<directory>../codex-processes-ap1-docker-test-setup/dic/bpe/app/process</directory>
<includes>
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
<followSymlinks>false</followSymlinks>
</fileset>
<fileset>
<directory>../codex-processes-ap1-docker-test-setup/dic/bpe/app/plugin</directory>
<includes>
<include>hapi-fhir-client-${hapi.version}.jar</include>
</includes>
<followSymlinks>false</followSymlinks>
</fileset>
<fileset>
<directory>../codex-processes-ap1-docker-test-setup/gth/bpe/app/process</directory>
<includes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

public class DataTransferProcessPluginDefinition implements ProcessPluginDefinition
{
public static final String VERSION = "0.2.0";
public static final String VERSION = "0.2.1";

@Override
public String getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

public class FhirClientFactory
{
private static final String condition = "{\"resourceType\":\"Condition\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/chronic-lung-diseases\"]},\"clinicalStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-clinical\",\"code\":\"active\",\"display\":\"Active\"}]},\"verificationStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-ver-status\",\"code\":\"confirmed\",\"display\":\"Confirmed\"},{\"system\":\"http://snomed.info/sct\",\"code\":\"410605003\",\"display\":\"Confirmed present (qualifier value)\"}]},\"category\":[{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"418112009\",\"display\":\"Pulmonary medicine\"}]}],\"code\":{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"413839001\",\"display\":\"Chronic lung disease\"}]},\"recordedDate\":\"2020-11-10T15:50:41+01:00\"}";
private static final String condition = "{\"resourceType\":\"Condition\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/chronic-lung-diseases\"]},\"clinicalStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-clinical\",\"code\":\"active\",\"display\":\"Active\"}]},\"verificationStatus\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/condition-ver-status\",\"code\":\"confirmed\",\"display\":\"Confirmed\"},{\"system\":\"http://snomed.info/sct\",\"code\":\"410605003\",\"display\":\"Confirmed present (qualifier value)\"}]},\"category\":[{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"418112009\",\"display\":\"Pulmonary medicine\"}]}],\"code\":{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"413839001\",\"display\":\"Chronic lung disease\"}]},\"recordedDate\":\"2020-11-10T15:50:41.000+01:00\"}";
private static final String patient = "{\"resourceType\":\"Patient\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/Patient\"]},\"extension\":[{\"url\":\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/ethnic-group\",\"valueCoding\":{\"system\":\"http://snomed.info/sct\",\"code\":\"186019001\",\"display\":\"Other ethnic, mixed origin\"}},{\"url\":\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/age\",\"extension\":[{\"url\":\"dateTimeOfDocumentation\",\"valueDateTime\":\"2020-10-01\"},{\"url\":\"age\",\"valueAge\":{\"value\":67,\"unit\":\"years\",\"system\":\"http://unitsofmeasure.org\",\"code\":\"a\"}}]}],\"birthDate\":\"1953-09-30\"}";
private static final String observation = "{\"resourceType\":\"Observation\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/sars-cov-2-rt-pcr\"]},\"identifier\":[{\"type\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/v2-0203\",\"code\":\"OBI\"}]}}],\"status\":\"final\",\"category\":[{\"coding\":[{\"system\":\"http://loinc.org\",\"code\":\"26436-6\"},{\"system\":\"http://terminology.hl7.org/CodeSystem/observation-category\",\"code\":\"laboratory\"}]}],\"code\":{\"coding\":[{\"system\":\"http://loinc.org\",\"code\":\"94500-6\",\"display\":\"SARS-CoV-2 (COVID-19) RNA [Presence] in Respiratory specimen by NAA with probe detection\"}],\"text\":\"SARS-CoV-2-RNA (PCR)\"},\"effectiveDateTime\":\"2020-11-10T15:50:41+01:00\",\"valueCodeableConcept\":{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"260373001\",\"display\":\"Detected (qualifier value)\"}],\"text\":\"SARS-CoV-2-RNA positiv\"}}";
private static final String observation = "{\"resourceType\":\"Observation\",\"meta\":{\"profile\":[\"https://www.netzwerk-universitaetsmedizin.de/fhir/StructureDefinition/sars-cov-2-rt-pcr\"]},\"identifier\":[{\"type\":{\"coding\":[{\"system\":\"http://terminology.hl7.org/CodeSystem/v2-0203\",\"code\":\"OBI\"}]}}],\"status\":\"final\",\"category\":[{\"coding\":[{\"system\":\"http://loinc.org\",\"code\":\"26436-6\"},{\"system\":\"http://terminology.hl7.org/CodeSystem/observation-category\",\"code\":\"laboratory\"}]}],\"code\":{\"coding\":[{\"system\":\"http://loinc.org\",\"code\":\"94500-6\",\"display\":\"SARS-CoV-2 (COVID-19) RNA [Presence] in Respiratory specimen by NAA with probe detection\"}],\"text\":\"SARS-CoV-2-RNA (PCR)\"},\"effectiveDateTime\":\"2020-11-10T15:50:41.000+01:00\",\"valueCodeableConcept\":{\"coding\":[{\"system\":\"http://snomed.info/sct\",\"code\":\"260373001\",\"display\":\"Detected (qualifier value)\"}],\"text\":\"SARS-CoV-2-RNA positiv\"}}";

private static final Logger logger = LoggerFactory.getLogger(FhirClientFactory.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -20,8 +21,18 @@
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
import org.hl7.fhir.r4.model.Condition;
import org.hl7.fhir.r4.model.Consent;
import org.hl7.fhir.r4.model.DiagnosticReport;
import org.hl7.fhir.r4.model.DomainResource;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Immunization;
import org.hl7.fhir.r4.model.MedicationStatement;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Procedure;
import org.hl7.fhir.r4.model.Reference;
import org.hl7.fhir.r4.model.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -352,13 +363,17 @@ public PseudonymList getPseudonymsWithNewData(DateWithPrecision exportFrom, Date
.flatMap(this::getPatients);

return new PseudonymList(patients
.map(p -> p.getIdentifier().stream()
.filter(i -> i.hasSystem() && i.hasValue()
&& ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_DIC_PSEUDONYM.equals(i.getSystem()))
.map(i -> i.getValue()).findFirst().orElse(null))
.map(p -> getPseudonym(p, ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_DIC_PSEUDONYM).orElse(null))
.filter(p -> p != null).distinct().collect(Collectors.toList()));
}

private Optional<String> getPseudonym(Patient p, String namingSystem)
{
return p.getIdentifier().stream()
.filter(i -> i.hasSystem() && i.hasValue() && namingSystem.equals(i.getSystem())).map(i -> i.getValue())
.findFirst();
}

private Stream<Patient> getPatients(Bundle bundle)
{
Stream<Patient> patients = getPatientsFromBundle(bundle);
Expand Down Expand Up @@ -433,26 +448,19 @@ private Stream<DomainResource> getNewDataWithIdentifierReferenceSupport(String p
private Stream<DomainResource> getNewDataWithoutIdentifierReferenceSupport(String pseudonym,
DateWithPrecision exportFrom, Date exportTo)
{
Bundle patientBundle = (Bundle) clientFactory.getFhirStoreClient().search().forResource(Patient.class)
.where(Patient.IDENTIFIER.exactly()
.systemAndIdentifier(ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_DIC_PSEUDONYM, pseudonym))
.execute();

if (logger.isDebugEnabled())
logger.debug("Patient search-bundle result: {}",
fhirContext.newJsonParser().encodeResourceToString(patientBundle));

if (patientBundle.getTotal() != 1 || !(patientBundle.getEntryFirstRep().getResource() instanceof Patient))
Optional<Patient> localPatient = findPatientInLocalFhirStore(
ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_DIC_PSEUDONYM, pseudonym);
if (localPatient.isEmpty())
{
logger.warn(
"Error while retrieving patient for pseudonym {}, result bundle total not 1 or first entry not patient",
pseudonym);
throw new RuntimeException("Error while retrieving patient for pseudonym " + pseudonym);
}

Patient patient = (Patient) patientBundle.getEntryFirstRep().getResource();

Bundle searchBundle = getSearchBundleWithPatientId(patient.getIdElement().getIdPart(), exportFrom, exportTo);
Bundle searchBundle = getSearchBundleWithPatientId(localPatient.get().getIdElement().getIdPart(), exportFrom,
exportTo);

if (logger.isDebugEnabled())
logger.debug("Executing Search-Bundle: {}",
Expand All @@ -464,11 +472,26 @@ private Stream<DomainResource> getNewDataWithoutIdentifierReferenceSupport(Strin
if (logger.isDebugEnabled())
logger.debug("Search-Bundle result: {}", fhirContext.newJsonParser().encodeResourceToString(resultBundle));

return Stream.concat(Stream.of(patient),
return Stream.concat(Stream.of(localPatient.get()),
resultBundle.getEntry().stream().filter(e -> e.hasResource() && e.getResource() instanceof Bundle)
.map(e -> (Bundle) e.getResource()).flatMap(this::getDomainResources));
}

private Optional<Patient> findPatientInLocalFhirStore(String system, String pseudonym)
{
Bundle patientBundle = (Bundle) clientFactory.getFhirStoreClient().search().forResource(Patient.class)
.where(Patient.IDENTIFIER.exactly().systemAndIdentifier(system, pseudonym)).execute();

if (logger.isDebugEnabled())
logger.debug("Patient search-bundle result: {}",
fhirContext.newJsonParser().encodeResourceToString(patientBundle));

if (patientBundle.getTotal() != 1 || !(patientBundle.getEntryFirstRep().getResource() instanceof Patient))
return Optional.empty();
else
return Optional.of((Patient) patientBundle.getEntryFirstRep().getResource());
}

private Stream<DomainResource> getDomainResources(Bundle bundle)
{
Stream<DomainResource> domainResources = getDomainResourcesFromBundle(bundle);
Expand Down Expand Up @@ -501,7 +524,175 @@ private Stream<DomainResource> doGetDomainResources(String nextUrl, int subTotal
@Override
public void storeBundle(Bundle bundle)
{
if (logger.isDebugEnabled())
logger.debug("Bundle: {}", fhirContext.newJsonParser().encodeResourceToString(bundle));

if (clientFactory.supportsIdentifierReferenceSearch())
clientFactory.getFhirStoreClient().transaction().withBundle(bundle)
.withAdditionalHeader(Constants.HEADER_PREFER, "handling=strict").execute();
else
storeBundleWithoutLogicalReferencesSupport(bundle);
}

private void storeBundleWithoutLogicalReferencesSupport(Bundle bundle)
{
modifyBundle(bundle);

clientFactory.getFhirStoreClient().transaction().withBundle(bundle)
.withAdditionalHeader(Constants.HEADER_PREFER, "handling=strict").execute();
}

private void modifyBundle(Bundle bundle)
{
// bundle has patient
// - db has patient by pseudonym -> update references, modify conditions
// - db does not have patient -> remove patient condition
// bundle has no patient, select from first resource by ref
// - db has patient by pseudonym -> update references, modify conditions
// - error

Optional<Patient> bundlePatient = bundle.getEntry().stream()
.filter(e -> e.hasResource() && e.getResource() instanceof Patient).map(e -> (Patient) e.getResource())
.findFirst();

if (bundlePatient.isPresent())
{
Optional<String> pseudonym = getPseudonym(bundlePatient.get(),
ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_CRR_PSEUDONYM);

if (pseudonym.isEmpty())
throw new RuntimeException("Patient has no pseudonym");

Optional<Patient> localPatient = findPatientInLocalFhirStore(
ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_CRR_PSEUDONYM, pseudonym.get());
if (localPatient.isPresent())
{
String localPatientid = localPatient.get().getIdElement().getIdPart();
modifyBundleWithPatientId(bundle, pseudonym.get(), localPatientid);
}
else
{
String tempId = bundlePatient.get().getIdElement().getIdPart();
modifyBundleWithTempPatientId(bundle, pseudonym.get(), tempId);
}
}
else
{
Optional<String> pseudonym = bundle.getEntry().stream().filter(e -> e.hasResource())
.map(e -> e.getResource()).map(this::getSubject).filter(r -> r.hasIdentifier())
.map(r -> r.getIdentifier())
.filter(i -> ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_CRR_PSEUDONYM.equals(i.getSystem()))
.map(i -> i.getValue()).findFirst();

if (pseudonym.isEmpty())
throw new RuntimeException("Patient has no pseudonym");

Optional<Patient> localPatient = findPatientInLocalFhirStore(
ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_CRR_PSEUDONYM, pseudonym.get());
if (localPatient.isPresent())
{
String localPatientid = localPatient.get().getIdElement().getIdPart();
modifyBundleWithPatientId(bundle, pseudonym.get(), localPatientid);
}
else
{
logger.warn(
"Bundle does not contain Patient, and Patient with pseudonym {} not found in local fhir store",
pseudonym.get());
throw new RuntimeException(
"Bundle has no patient and local fhir store has no patient with pseudonym " + pseudonym.get());
}
}

if (logger.isDebugEnabled())
logger.debug("Modified bundle: {}", fhirContext.newJsonParser().encodeResourceToString(bundle));
}

private void modifyBundleWithTempPatientId(Bundle bundle, String pseudonym, String tempId)
{
bundle.getEntry().stream().filter(e -> e.hasResource() && !(e.getResource() instanceof Patient)).forEach(e ->
{
setSubject(e.getResource(), new Reference(tempId));
modifyConditionalUpdateUrl(e, pseudonym, "");
});
}

private void modifyBundleWithPatientId(Bundle bundle, String pseudonym, String patientId)
{
bundle.getEntry().stream().filter(e -> e.hasResource() && !(e.getResource() instanceof Patient)).forEach(e ->
{
setSubject(e.getResource(), new Reference(new IdType("Patient", patientId)));
modifyConditionalUpdateUrl(e, pseudonym, "&patient=Patient/" + patientId);
});
}

private void modifyConditionalUpdateUrl(BundleEntryComponent entry, String pseudonym, String replacement)
{
String url = entry.getRequest().getUrl();
String newUrl = url.replace(
"&patient:identifier=" + ConstantsDataTransfer.NAMING_SYSTEM_NUM_CODEX_CRR_PSEUDONYM + "|" + pseudonym,
replacement);
entry.getRequest().setUrl(newUrl);
}

private Resource setSubject(Resource resource, Reference patientRef)
{
if (resource instanceof Condition)
{
((Condition) resource).setSubject(patientRef);
return resource;
}
else if (resource instanceof Consent)
{
((Consent) resource).setPatient(patientRef);
return resource;
}
else if (resource instanceof DiagnosticReport)
{
((DiagnosticReport) resource).setSubject(patientRef);
return resource;
}
else if (resource instanceof Immunization)
{
((Immunization) resource).setPatient(patientRef);
return resource;
}
else if (resource instanceof MedicationStatement)
{
((MedicationStatement) resource).setSubject(patientRef);
return resource;
}
else if (resource instanceof Observation)
{
((Observation) resource).setSubject(patientRef);
return resource;
}
else if (resource instanceof Procedure)
{
((Procedure) resource).setSubject(patientRef);
return resource;
}
else
throw new RuntimeException("Resource of type " + resource.getResourceType().name() + " not supported");
}

private Reference getSubject(Resource resource)
{
if (resource instanceof Condition)
return ((Condition) resource).getSubject();
else if (resource instanceof Consent)
return ((Consent) resource).getPatient();
else if (resource instanceof DiagnosticReport)
return ((DiagnosticReport) resource).getSubject();
else if (resource instanceof Immunization)
return ((Immunization) resource).getPatient();
else if (resource instanceof MedicationStatement)
return ((MedicationStatement) resource).getSubject();
else if (resource instanceof Observation)
return ((Observation) resource).getSubject();
else if (resource instanceof Procedure)
return ((Procedure) resource).getSubject();
else
throw new RuntimeException("Resource of type " + resource.getResourceType().name() + " not supported");
}
}
Loading

0 comments on commit c3828b4

Please sign in to comment.