Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/#427 make iterate by contribute more streamable, replace filter with takewhile #428

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ Changelog
### bugfixes

* fix a bug which causes queries using the geometry filters `length:` and `area:` to fail when executed on an ignite cluster ([#426])
* fix a bug which cause ComputeJobs to keep processing for a while despite they are already canceled ([#428])

[#419]: https://github.com/GIScience/oshdb/pull/419
[#426]: https://github.com/GIScience/oshdb/pull/426
[#428]: https://github.com/GIScience/oshdb/pull/428


## 0.7.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class Kernels implements Serializable {
interface CellProcessor<S> extends SerializableBiFunction<GridOSHEntity, CellIterator, S> {}

interface CancelableProcessStatus {
default <T> boolean isActive(T ignored) {
return isActive();
}

boolean isActive();
}

Expand Down Expand Up @@ -57,7 +61,7 @@ static <R, S> CellProcessor<S> getOSMContributionCellReducer(
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
cellIterator.iterateByContribution(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.forEach(contribution -> {
OSMContribution osmContribution = new OSMContributionImpl(contribution);
accInternal.set(accumulator.apply(accInternal.get(), mapper.apply(osmContribution)));
Expand Down Expand Up @@ -87,7 +91,7 @@ static <R, S> CellProcessor<S> getOSMContributionGroupingCellReducer(
// iterate over the history of all OSM objects in the current cell
List<OSMContribution> contributions = new ArrayList<>();
cellIterator.iterateByContribution(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.forEach(contribution -> {
OSMContribution thisContribution = new OSMContributionImpl(contribution);
if (contributions.size() > 0
Expand Down Expand Up @@ -131,7 +135,7 @@ static <R, S> CellProcessor<S> getOSMEntitySnapshotCellReducer(
// iterate over the history of all OSM objects in the current cell
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
cellIterator.iterateByTimestamps(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.forEach(data -> {
OSMEntitySnapshot snapshot = new OSMEntitySnapshotImpl(data);
// immediately fold the result
Expand Down Expand Up @@ -162,7 +166,7 @@ static <R, S> CellProcessor<S> getOSMEntitySnapshotGroupingCellReducer(
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
List<OSMEntitySnapshot> osmEntitySnapshots = new ArrayList<>();
cellIterator.iterateByTimestamps(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.forEach(data -> {
OSMEntitySnapshot thisSnapshot = new OSMEntitySnapshotImpl(data);
if (osmEntitySnapshots.size() > 0
Expand Down Expand Up @@ -203,7 +207,7 @@ static <S> CellProcessor<Stream<S>> getOSMContributionCellStreamer(
return (oshEntityCell, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
return cellIterator.iterateByContribution(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.map(OSMContributionImpl::new)
.map(mapper);
};
Expand All @@ -226,7 +230,7 @@ static <S> CellProcessor<Stream<S>> getOSMContributionGroupingCellStreamer(
List<OSMContribution> contributions = new ArrayList<>();
List<S> result = new LinkedList<>();
cellIterator.iterateByContribution(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.map(OSMContributionImpl::new)
.forEach(contribution -> {
if (contributions.size() > 0 && contribution.getEntityAfter().getId()
Expand Down Expand Up @@ -260,7 +264,7 @@ static <S> CellProcessor<Stream<S>> getOSMEntitySnapshotCellStreamer(
return (oshEntityCell, cellIterator) -> {
// iterate over the history of all OSM objects in the current cell
return cellIterator.iterateByTimestamps(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.map(OSMEntitySnapshotImpl::new)
.map(mapper);
};
Expand All @@ -283,7 +287,7 @@ static <S> CellProcessor<Stream<S>> getOSMEntitySnapshotGroupingCellStreamer(
List<OSMEntitySnapshot> snapshots = new ArrayList<>();
List<S> result = new LinkedList<>();
cellIterator.iterateByTimestamps(oshEntityCell)
.filter(ignored -> process.isActive())
.takeWhile(process::isActive)
.map(OSMEntitySnapshotImpl::new)
.forEach(contribution -> {
if (snapshots.size() > 0 && contribution.getEntity().getId()
Expand Down
Loading