Skip to content

Commit

Permalink
Hash table references instead of nested tables. (#572)
Browse files Browse the repository at this point in the history
  • Loading branch information
mansoor-sajjad authored Oct 24, 2024
1 parent c7a72b6 commit 0dbc285
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 226 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<entur.google.pubsub.emulator.download.skip>false</entur.google.pubsub.emulator.download.skip>
<camel.version>4.4.4</camel.version>
<entur.helpers.version>2.34</entur.helpers.version>
<netex-validator-java.version>4.0.1</netex-validator-java.version>
<netex-validator-java.version>4.0.2</netex-validator-java.version>
<commons-io.version>2.11.0</commons-io.version>
<zt-zip.version>1.17</zt-zip.version>
<redisson.version>3.37.0</redisson.version>
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/no/entur/antu/config/NetexDataConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import no.entur.antu.netexdata.DefaultNetexDataRepository;
import no.entur.antu.netexdata.NetexDataResource;
import org.entur.netex.validation.validator.jaxb.NetexDataRepository;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -29,6 +30,7 @@ NetexDataResource netexDataResource() {
@Profile("!test")
NetexDataRepository netexDataRepository(
NetexDataResource netexDataResource,
RedissonClient redissonClient,
@Qualifier(
SCHEDULED_STOP_POINT_AND_QUAY_ID_CACHE
) Map<String, Map<String, String>> scheduledStopPointAndQuayIdCache,
Expand All @@ -45,6 +47,7 @@ NetexDataRepository netexDataRepository(
) {
return new DefaultNetexDataRepository(
netexDataResource,
redissonClient,
scheduledStopPointAndQuayIdCache,
serviceLinksAndFromToScheduledStopPointIdCache,
lineInfoCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,14 @@ public NetexValidatorsRunner timetableDataValidatorsRunner(
);

List<DatasetValidator> netexTimetableDatasetValidators = List.of(
duplicateLineNameValidator
// stopPointsInVehicleJourneyValidator
duplicateLineNameValidator,
stopPointsInVehicleJourneyValidator
);

List<NetexDataCollector> commonDataCollectors = List.of(
lineInfoCollector,
// serviceJourneyStopsCollector,
serviceJourneyInterchangeInfoCollector
serviceJourneyInterchangeInfoCollector,
serviceJourneyStopsCollector
);

return NetexValidatorsRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import no.entur.antu.exception.AntuException;
import org.entur.netex.validation.validator.jaxb.*;
import org.entur.netex.validation.validator.model.*;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -21,6 +24,7 @@ public class DefaultNetexDataRepository implements NetexDataRepository {
);

private final NetexDataResource netexDataResource;
private final RedissonClient redissonClient;
private final Map<String, Map<String, String>> scheduledStopPointAndQuayIdCache;
private final Map<String, Map<String, String>> serviceLinksAndFromToScheduledStopPointIdCache;
private final Map<String, List<String>> lineInfoCache;
Expand All @@ -29,13 +33,15 @@ public class DefaultNetexDataRepository implements NetexDataRepository {

public DefaultNetexDataRepository(
NetexDataResource netexDataResource,
RedissonClient redissonClient,
Map<String, Map<String, String>> scheduledStopPointAndQuayIdCache,
Map<String, Map<String, String>> serviceLinksAndFromToScheduledStopPointIdCache,
Map<String, List<String>> lineInfoCache,
Map<String, Map<String, List<String>>> serviceJourneyStopsCache,
Map<String, List<String>> serviceJourneyInterchangeInfoCache
) {
this.netexDataResource = netexDataResource;
this.redissonClient = redissonClient;
this.scheduledStopPointAndQuayIdCache = scheduledStopPointAndQuayIdCache;
this.serviceLinksAndFromToScheduledStopPointIdCache =
serviceLinksAndFromToScheduledStopPointIdCache;
Expand Down Expand Up @@ -100,56 +106,36 @@ public List<SimpleLine> lineNames(String validationReportId) {
return lineInfoForReportId.stream().map(SimpleLine::fromString).toList();
}

@Override
public List<ServiceJourneyStop> serviceJourneyStops(
String validationReportId,
ServiceJourneyId serviceJourneyId
public Map<ServiceJourneyId, List<ServiceJourneyStop>> serviceJourneyStops(
String validationReportId
) {
Map<String, List<String>> serviceJourneyStopsForReport =
serviceJourneyStopsCache.get(validationReportId);
if (serviceJourneyStopsForReport == null) {
throw new AntuException(
"ServiceJourneyStops cache not found for validation report with id: " +
validationReportId
return serviceJourneyStopsCache
.keySet()
.stream()
.filter(k -> k.startsWith(validationReportId))
.map(serviceJourneyStopsCache::get)
.flatMap(m -> m.entrySet().stream())
.collect(
Collectors.toMap(
k -> ServiceJourneyId.ofValidId(k.getKey()),
v ->
v.getValue().stream().map(ServiceJourneyStop::fromString).toList(),
(p, n) -> n
)
);
}
return Optional
.ofNullable(serviceJourneyStopsForReport.get(serviceJourneyId.id()))
.map(serviceJourneyStops ->
serviceJourneyStops
.stream()
.map(ServiceJourneyStop::fromString)
.filter(ServiceJourneyStop::isValid)
.toList()
)
.orElse(List.of());
}

@Override
public boolean hasServiceJourneyInterchangeInfos(String validationReportId) {
List<String> serviceJourneyInterchangeInfos =
serviceJourneyInterchangeInfoCache.get(validationReportId);
return (
serviceJourneyInterchangeInfos != null &&
!serviceJourneyInterchangeInfos.isEmpty()
);
}

@Override
public List<ServiceJourneyInterchangeInfo> serviceJourneyInterchangeInfos(
String validationReportId
) {
List<String> serviceJourneyInterchangeInfosForReport =
serviceJourneyInterchangeInfoCache.get(validationReportId);
if (serviceJourneyInterchangeInfosForReport == null) {
throw new AntuException(
"ServiceJourneyInterchangeInfoCache not found for validation report with id: " +
validationReportId
);
}

return serviceJourneyInterchangeInfosForReport
return Optional
.ofNullable(serviceJourneyInterchangeInfoCache)
.map(Map::entrySet)
.stream()
.flatMap(Set::stream)
.filter(entry -> entry.getKey().startsWith(validationReportId))
.flatMap(entry -> entry.getValue().stream())
.map(ServiceJourneyInterchangeInfo::fromString)
.toList();
}
Expand All @@ -176,7 +162,7 @@ public void fillNetexDataCache(
.getFromToScheduledStopPointIdPerServiceLinkId()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));

serviceLinksAndFromToScheduledStopPointIdCache.merge(
validationReportId,
Expand All @@ -199,7 +185,12 @@ public void cleanUp(String validationReportId) {
scheduledStopPointAndQuayIdCache.remove(validationReportId);
serviceLinksAndFromToScheduledStopPointIdCache.remove(validationReportId);
lineInfoCache.remove(validationReportId);
serviceJourneyStopsCache.remove(validationReportId);
serviceJourneyInterchangeInfoCache.remove(validationReportId);
redissonClient.getKeys().deleteByPattern(validationReportId + '*');
serviceJourneyStopsCache
.keySet()
.removeIf(k -> k.startsWith(validationReportId));
serviceJourneyInterchangeInfoCache
.keySet()
.removeIf(k -> k.startsWith(validationReportId));
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package no.entur.antu.netexdata.collectors;

import java.util.ArrayList;
import static no.entur.antu.config.cache.CacheConfig.SERVICE_JOURNEY_INTERCHANGE_INFO_CACHE;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import no.entur.antu.validation.AntuNetexData;
import org.entur.netex.validation.validator.jaxb.JAXBValidationContext;
import org.entur.netex.validation.validator.jaxb.NetexDataCollector;
import org.entur.netex.validation.validator.model.ServiceJourneyInterchangeInfo;
import org.redisson.api.RList;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

Expand Down Expand Up @@ -35,20 +38,18 @@ protected void collectDataFromLineFile(
validationContext.getStopPlaceRepository()
);

antuNetexData
.serviceJourneyInterchanges()
.map(serviceJourneyInterchange ->
ServiceJourneyInterchangeInfo.of(
validationContext.getFileName(),
serviceJourneyInterchange
)
)
.forEach(serviceJourneyInterchangeInfo ->
addData(
antuNetexData.validationReportId(),
serviceJourneyInterchangeInfo
addData(
validationContext.getFileName(),
antuNetexData.validationReportId(),
antuNetexData
.serviceJourneyInterchanges()
.map(serviceJourneyInterchange ->
ServiceJourneyInterchangeInfo.of(
validationContext.getFileName(),
serviceJourneyInterchange
)
)
);
);
}

@Override
Expand All @@ -59,20 +60,31 @@ protected void collectDataFromCommonFile(
}

private void addData(
String fileName,
String validationReportId,
ServiceJourneyInterchangeInfo serviceJourneyInterchangeInfo
Stream<ServiceJourneyInterchangeInfo> serviceJourneyInterchangeInfos
) {
RLock lock = redissonClient.getLock(validationReportId);
try {
lock.lock();

serviceJourneyInterchangeInfoCache.merge(
validationReportId,
new ArrayList<>(List.of(serviceJourneyInterchangeInfo.toString())),
(existingList, newList) -> {
existingList.addAll(newList);
return existingList;
}
String keyName =
validationReportId +
"_" +
SERVICE_JOURNEY_INTERCHANGE_INFO_CACHE +
"_" +
fileName;

RList<String> serviceJourneyInterchangeInfosListCache =
redissonClient.getList(keyName);
serviceJourneyInterchangeInfosListCache.addAll(
serviceJourneyInterchangeInfos
.map(ServiceJourneyInterchangeInfo::toString)
.toList()
);
serviceJourneyInterchangeInfoCache.put(
keyName,
serviceJourneyInterchangeInfosListCache
);
} finally {
if (lock.isHeldByCurrentThread()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package no.entur.antu.netexdata.collectors;

import static no.entur.antu.config.cache.CacheConfig.SERVICE_JOURNEY_STOPS_CACHE;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -9,6 +11,7 @@
import org.entur.netex.validation.validator.model.ScheduledStopPointId;
import org.entur.netex.validation.validator.model.ServiceJourneyStop;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -44,7 +47,6 @@ protected void collectDataFromLineFile(

Map<String, List<String>> serviceJourneyStops = antuNetexData
.validServiceJourneys()
// TODO: unique service journeys ids
.map(serviceJourney -> {
Map<String, ScheduledStopPointId> scheduledStopPointIdMap =
AntuNetexData.scheduledStopPointIdByStopPointId(
Expand All @@ -71,6 +73,7 @@ protected void collectDataFromLineFile(

addServiceJourneyStops(
validationContext.getValidationReportId(),
validationContext.getFileName(),
serviceJourneyStops
);
}
Expand All @@ -84,20 +87,21 @@ protected void collectDataFromCommonFile(

private void addServiceJourneyStops(
String validationReportId,
String filename,
Map<String, List<String>> serviceJourneyStops
) {
RLock lock = redissonClient.getLock(validationReportId);
try {
lock.lock();

serviceJourneyStopsCache.merge(
validationReportId,
serviceJourneyStops,
(existingMap, newMap) -> {
existingMap.putAll(newMap);
return existingMap;
}
String keyName =
validationReportId + "_" + SERVICE_JOURNEY_STOPS_CACHE + "_" + filename;

RMap<String, List<String>> serviceJourneyStopsMap = redissonClient.getMap(
keyName
);
serviceJourneyStopsMap.putAll(serviceJourneyStops);
serviceJourneyStopsCache.put(keyName, serviceJourneyStopsMap);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
Expand Down
26 changes: 0 additions & 26 deletions src/main/java/no/entur/antu/validation/AntuNetexData.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,32 +283,6 @@ public ServiceJourney serviceJourney(
.orElse(null);
}

public ServiceJourneyStop serviceJourneyStopAtScheduleStopPoint(
VehicleJourneyRefStructure vehicleJourneyRefStructure,
ScheduledStopPointId scheduledStopPointId
) {
return serviceJourneyStops(vehicleJourneyRefStructure)
.stream()
.filter(serviceJourneyStop ->
serviceJourneyStop.scheduledStopPointId().equals(scheduledStopPointId)
)
.findFirst()
.orElse(null);
}

public List<ServiceJourneyStop> serviceJourneyStops(
VehicleJourneyRefStructure vehicleJourneyRefStructure
) {
return Optional
.ofNullable(
netexDataRepository.serviceJourneyStops(
validationReportId(),
ServiceJourneyId.ofValidId(vehicleJourneyRefStructure)
)
)
.orElse(List.of());
}

/**
* Returns the Stream of all ServiceJourneyInterchanges in all the TimeTableFrames.
*/
Expand Down
Loading

0 comments on commit 0dbc285

Please sign in to comment.