Skip to content

Commit

Permalink
Merge pull request #1517 from SpineEventEngine/allow-detecting-entiti…
Browse files Browse the repository at this point in the history
…es-stop-matching-subscriptions

Allow to detect Entities which no longer match subscription criteria
  • Loading branch information
armiol authored Jun 6, 2023
2 parents d6822a7 + 0d4a6e5 commit 8b2daa8
Show file tree
Hide file tree
Showing 36 changed files with 1,511 additions and 223 deletions.
71 changes: 71 additions & 0 deletions client/src/main/java/io/spine/client/NoLongerMatchingConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2023, TeamDev. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and/or binary forms, with or without
* modification, must retain the above copyright notice and the following
* disclaimer.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.spine.client;

import com.google.protobuf.Any;
import io.spine.base.Identifier;
import io.spine.protobuf.AnyPacker;

import java.util.function.Consumer;

/**
* Consumer-delegate of entity IDs, which correspond to entities no longer matching
* the subscription criteria.
*
* <p>Unpacks the ID values from the passed {@code Any} instances according
* to the identifier type passed. It is a responsibility of callees to provide
* the relevant type of identifiers.
*
* @param <I>
* the type of entity identifiers
*/
final class NoLongerMatchingConsumer<I> implements Consumer<Any> {

private final Class<I> idClass;
private final Consumer<I> delegate;

/**
* Creates a new instance of {@code IdConsumer}.
*
* @param idClass
* the type of identifiers used to unpack the incoming {@code Any} instances
* @param delegate
* the consumer to delegate the observation to
*/
NoLongerMatchingConsumer(Class<I> idClass, Consumer<I> delegate) {
this.idClass = idClass;
this.delegate = delegate;
}

@Override
public void accept(Any packedId) {
var entityId = AnyPacker.unpack(packedId, EntityId.class);
var any = entityId.getId();
var unpacked = Identifier.unpack(any, idClass);
delegate.accept(unpacked);
}
}
74 changes: 74 additions & 0 deletions client/src/main/java/io/spine/client/NoLongerMatchingFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2023, TeamDev. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and/or binary forms, with or without
* modification, must retain the above copyright notice and the following
* disclaimer.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.spine.client;

import io.grpc.stub.StreamObserver;

import static io.spine.client.EntityStateUpdate.KindCase.NO_LONGER_MATCHING;

/**
* An observer taking {@code SubscriptionUpdate} and passing it to the specified
* {@code NoLongerMatchingConsumer} in case the {@code SubscriptionUpdate} tells
* that some entity stopped matching the subscription criteria.
*
* <p>If {@code SubscriptionUpdate} does not correspond to "no longer matching" scenario,
* does nothing.
*/
final class NoLongerMatchingFilter implements StreamObserver<SubscriptionUpdate> {

private final NoLongerMatchingConsumer<?> consumer;

NoLongerMatchingFilter(NoLongerMatchingConsumer<?> consumer) {
this.consumer = consumer;
}

@Override
public void onNext(SubscriptionUpdate value) {
if (value.hasEntityUpdates()) {
var updates = value.getEntityUpdates()
.getUpdateList();
var stream = updates.stream();
stream.filter(NoLongerMatchingFilter::isNoLongerMatching)
.map(EntityStateUpdate::getId)
.forEach(consumer);
}
}

private static boolean isNoLongerMatching(EntityStateUpdate update) {
return NO_LONGER_MATCHING == update.getKindCase();
}

@Override
public void onError(Throwable t) {
// Do nothing.
}

@Override
public void onCompleted() {
// Do nothing.
}
}
39 changes: 33 additions & 6 deletions client/src/main/java/io/spine/client/SubscribingRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.grpc.stub.StreamObserver;
import io.spine.base.MessageContext;

import java.util.Optional;
import java.util.function.Consumer;

/**
Expand All @@ -47,13 +48,13 @@
* (e.g. {@link io.spine.core.Event}); if subscribed message type does not have a context,
* this parameter is likely to be the same as {@code M}
* @param <B>
* the type of this requests for return type covariance
* the type of this request, for returning type covariance
*/
public abstract class
SubscribingRequest<M extends Message,
C extends MessageContext,
W extends Message,
B extends SubscribingRequest<M, C, W, B>>
C extends MessageContext,
W extends Message,
B extends SubscribingRequest<M, C, W, B>>
extends FilteringRequest<M, Topic, TopicBuilder, B> {

SubscribingRequest(ClientRequest parent, Class<M> type) {
Expand Down Expand Up @@ -117,11 +118,28 @@ public B onConsumingError(ConsumerErrorHandler<M> handler) {
public Subscription post() {
var topic = builder().build();
var observer = createObserver();
return subscribe(topic, observer);
var subscription = chain().map(c -> subscribe(topic, observer, c))
.orElseGet(() -> subscribe(topic, observer));
return subscription;
}

private StreamObserver<W> createObserver() {
return consumers().build().toObserver();
return consumers().build()
.toObserver();
}

/**
* Returns an observer of raw {@code SubscriptionUpdate}s, which will be called
* in addition to notifying the {@linkplain #consumers() consumers}.
*
* <p>Descendants may choose to override this method in order to specify their
* observer chaining policy.
*
* @return {@code StreamObserver} to call in chain (wrapped into {@code Optional}),
* or {@code Optional.empty()} if no such chaining is configured
*/
protected Optional<StreamObserver<SubscriptionUpdate>> chain() {
return Optional.empty();
}

private Subscription subscribe(Topic topic, StreamObserver<W> observer) {
Expand All @@ -130,4 +148,13 @@ private Subscription subscribe(Topic topic, StreamObserver<W> observer) {
.subscribeTo(topic, observer);
return subscription;
}

private Subscription subscribe(Topic topic,
StreamObserver<W> observer,
StreamObserver<SubscriptionUpdate> chain) {
var subscription =
client().subscriptions()
.subscribeTo(topic, observer, chain);
return subscription;
}
}
52 changes: 50 additions & 2 deletions client/src/main/java/io/spine/client/SubscriptionObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.google.protobuf.Message;
import io.grpc.stub.StreamObserver;

import java.util.Optional;

import static io.spine.protobuf.AnyPacker.unpack;
import static io.spine.util.Exceptions.unsupported;

Expand All @@ -42,16 +44,61 @@
* and sent to the delegate observer one by one.
*
* @param <M>
* the type of the delegate observer messages, which could be unpacked entity state
* or {@code Event}
* the type of the delegate-observer's messages, which can either be
* an unpacked entity state or an {@code Event}
*/
final class SubscriptionObserver<M extends Message>
implements StreamObserver<SubscriptionUpdate> {

/**
* Delegate which would receive the unpacked domain-specific messages, such as
* entity states or {@code Event}s.
*/
private final StreamObserver<M> delegate;


/**
* Optional chained observer of raw {@code SubscriptionUpdate}s,
* which would receive its input within the same subscription.
*
* <p>Such an observer may be handy for descendants which need more details
* than just an "unpacked" domain message.
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType"
/* Could have been `@Nullable`,
but it is always used as `Optional`;
so having `Optional` field is an optimization. */)
private final Optional<StreamObserver<SubscriptionUpdate>> chain;

/**
* Creates a new instance of {@code SubscriptionObserver}.
*
* <p>Specifies no chained observer.
*
* @param targetObserver a delegate consuming the {@code Entity} state, or an {@code Event}
*/
SubscriptionObserver(StreamObserver<M> targetObserver) {
this.delegate = targetObserver;
this.chain = Optional.empty();
}

/**
* Creates a new instance of {@code SubscriptionObserver}.
*
* <p>Use this constructor in favour of
* {@link SubscriptionObserver#SubscriptionObserver(StreamObserver)
* SubscriptionObserver(StreamObserver)} to additionally set the observer chain.
*
* @param targetObserver
* a delegate consuming the {@code Entity} state, or an {@code Event}
* @param chain
* chained observer consuming {@code SubscriptionUpdate} obtained within
* the same subscription
*/
SubscriptionObserver(StreamObserver<M> targetObserver,
StreamObserver<SubscriptionUpdate> chain) {
this.delegate = targetObserver;
this.chain = Optional.of(chain);
}

@SuppressWarnings("unchecked") // Logically correct.
Expand Down Expand Up @@ -79,6 +126,7 @@ public void onNext(SubscriptionUpdate value) {
default:
throw unsupported("Unsupported update case `%s`.", updateCase);
}
chain.ifPresent(observer -> observer.onNext(value));
}

@Override
Expand Down
50 changes: 50 additions & 0 deletions client/src/main/java/io/spine/client/SubscriptionRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
package io.spine.client;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.stub.StreamObserver;
import io.spine.base.EntityState;
import io.spine.core.EmptyContext;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.spine.client.Filters.extractFilters;

/**
Expand All @@ -44,8 +48,16 @@
public final class SubscriptionRequest<S extends EntityState<?>>
extends SubscribingRequest<S, EmptyContext, S, SubscriptionRequest<S>> {

/**
* The builder of consumers of entity state.
*/
private final StateConsumers.Builder<S> consumers;

/**
* Optional consumer of entity IDs, which no longer match the subscription criteria.
*/
private @Nullable NoLongerMatchingConsumer<?> nlmConsumer;

SubscriptionRequest(ClientRequest parent, Class<S> type) {
super(parent, type);
this.consumers = StateConsumers.newBuilder();
Expand All @@ -69,6 +81,44 @@ public SubscriptionRequest<S> where(CompositeEntityStateFilter... filter) {
return self();
}

/**
* Adds a consumer observing the entities which previously matched the subscription criteria,
* but stopped to do so.
*
* <p>The consumer is fed with the ID of the entity in the use-cases which follow:
*
* <ul>
* <li>the value of entity fields is changed, so that the entity state does not pass
* the subscription filters;
* <li>entity is deleted;
* <li>entity is archived.</li>
* </ul>
*
* <p>It is a responsibility of callee to provide a correct type of entity identifiers.
*
* @param consumer
* the consumer to notify
* @param idType
* the type of entity identifiers
* @param <I>
* type of entity identifiers, for covariance
* @return this instance of {@code SubscriptionRequest}, for call chaining
*/
public <I> SubscriptionRequest<S> whenNoLongerMatching(Class<I> idType, Consumer<I> consumer) {
checkNotNull(idType);
checkNotNull(consumer);
nlmConsumer = new NoLongerMatchingConsumer<>(idType, consumer);
return self();
}

@Override
protected Optional<StreamObserver<SubscriptionUpdate>> chain() {
if (null == nlmConsumer) {
return Optional.empty();
}
return Optional.of(new NoLongerMatchingFilter(nlmConsumer));
}

@Override
StateConsumers.Builder<S> consumers() {
return consumers;
Expand Down
9 changes: 9 additions & 0 deletions client/src/main/java/io/spine/client/Subscriptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ <M extends Message> Subscription subscribeTo(Topic topic, StreamObserver<M> obse
return subscription;
}

<M extends Message> Subscription subscribeTo(Topic topic,
StreamObserver<M> observer,
StreamObserver<SubscriptionUpdate> chain) {
var subscription = blockingServiceStub.subscribe(topic);
service.activate(subscription, new SubscriptionObserver<>(observer, chain));
add(subscription);
return subscription;
}

/** Adds all the passed subscriptions. */
void addAll(Iterable<Subscription> newSubscriptions) {
newSubscriptions.forEach(this::add);
Expand Down
Loading

0 comments on commit 8b2daa8

Please sign in to comment.