Skip to content

Commit

Permalink
Merge pull request #428 from GIScience/bug/#427_make_iterateByContrib…
Browse files Browse the repository at this point in the history
…ute_more_streamable_replace_filter_with_takewhile

Bug/#427 make iterate by contribute more streamable, replace filter with takewhile
  • Loading branch information
tyrasd authored Sep 16, 2021
2 parents 439c94b + 17c0155 commit c03ab5e
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 149 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ Changelog
### bugfixes

* fix a bug which causes queries using the geometry filters `length:` and `area:` to fail when executed on an ignite cluster ([#426])
* fix a bug which cause ComputeJobs to keep processing for a while despite they are already canceled ([#428])

[#419]: https://github.com/GIScience/oshdb/pull/419
[#426]: https://github.com/GIScience/oshdb/pull/426
[#428]: https://github.com/GIScience/oshdb/pull/428


## 0.7.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class Kernels implements Serializable {
interface CellProcessor<S> extends SerializableBiFunction<GridOSHEntity, CellIterator, S> {}

interface CancelableProcessStatus {
default <T> boolean isActive(T ignored) {
return isActive();
}

boolean isActive();
}

Expand Down Expand Up @@ -57,7 +61,7 @@ static <R, S> CellProcessor<S> getOSMContributionCellReducer(
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
cellIterator.iterateByContribution(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.forEach(contribution -> {
OSMContribution osmContribution = new OSMContributionImpl(contribution);
accInternal.set(accumulator.apply(accInternal.get(), mapper.apply(osmContribution)));
Expand Down Expand Up @@ -87,7 +91,7 @@ static <R, S> CellProcessor<S> getOSMContributionGroupingCellReducer(
// iterate over the history of all OSM objects in the current cell
List<OSMContribution> contributions = new ArrayList<>();
cellIterator.iterateByContribution(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.forEach(contribution -> {
OSMContribution thisContribution = new OSMContributionImpl(contribution);
if (contributions.size() > 0
Expand Down Expand Up @@ -131,7 +135,7 @@ static <R, S> CellProcessor<S> getOSMEntitySnapshotCellReducer(
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
cellIterator.iterateByTimestamps(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.forEach(data -> {
OSMEntitySnapshot snapshot = new OSMEntitySnapshotImpl(data);
// immediately fold the result
Expand Down Expand Up @@ -162,7 +166,7 @@ static <R, S> CellProcessor<S> getOSMEntitySnapshotGroupingCellReducer(
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
List<OSMEntitySnapshot> osmEntitySnapshots = new ArrayList<>();
cellIterator.iterateByTimestamps(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.forEach(data -> {
OSMEntitySnapshot thisSnapshot = new OSMEntitySnapshotImpl(data);
if (osmEntitySnapshots.size() > 0
Expand Down Expand Up @@ -203,7 +207,7 @@ static <S> CellProcessor<Stream<S>> getOSMContributionCellStreamer(
return (oshEntityCell, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
return cellIterator.iterateByContribution(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.map(OSMContributionImpl::new)
.map(mapper);
};
Expand All @@ -226,7 +230,7 @@ static <S> CellProcessor<Stream<S>> getOSMContributionGroupingCellStreamer(
List<OSMContribution> contributions = new ArrayList<>();
List<S> result = new LinkedList<>();
cellIterator.iterateByContribution(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.map(OSMContributionImpl::new)
.forEach(contribution -> {
if (contributions.size() > 0 && contribution.getEntityAfter().getId()
Expand Down Expand Up @@ -260,7 +264,7 @@ static <S> CellProcessor<Stream<S>> getOSMEntitySnapshotCellStreamer(
return (oshEntityCell, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
return cellIterator.iterateByTimestamps(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.map(OSMEntitySnapshotImpl::new)
.map(mapper);
};
Expand All @@ -283,7 +287,7 @@ static <S> CellProcessor<Stream<S>> getOSMEntitySnapshotGroupingCellStreamer(
List<OSMEntitySnapshot> snapshots = new ArrayList<>();
List<S> result = new LinkedList<>();
cellIterator.iterateByTimestamps(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.map(OSMEntitySnapshotImpl::new)
.forEach(contribution -> {
if (snapshots.size() > 0 && contribution.getEntity().getId()
Expand Down
Loading

0 comments on commit c03ab5e

Please sign in to comment.