Skip to content

Commit

Permalink
Merge pull request #193 from SpineEventEngine/1.x-bundle-keep-up-from…
Browse files Browse the repository at this point in the history
…-1.7.4

[1.x] Bulk keep-up and cancellation requests (port of #170)
  • Loading branch information
armiol authored Jan 5, 2023
2 parents 5deb3d4 + 4532cf3 commit f2087e5
Show file tree
Hide file tree
Showing 15 changed files with 413 additions and 78 deletions.
31 changes: 17 additions & 14 deletions client-js/main/client/firebase-subscription-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,23 @@ export class FirebaseSubscriptionService {
* @private
*/
_keepUpSubscriptions() {
this._subscriptions.forEach(subscription => {
const spineSubscription = subscription.internal();
if (subscription.closed) {
this._endpoint.cancelSubscription(spineSubscription).then(() => {
this._removeSubscription(subscription);
});
} else {
this._endpoint.keepUpSubscription(spineSubscription).then(response => {
const responseStatus = response.status;
const responseStatusProto = ObjectToProto.convert(responseStatus, Status.typeUrl());
if (responseStatusProto.getStatusCase() !== Status.StatusCase.OK) {
this._removeSubscription(subscription)
}
});
const cancelledSubscriptions = this._subscriptions.filter(s => s.closed);
if (cancelledSubscriptions.length > 0) {
const subscriptionMessages = cancelledSubscriptions.map(s => s.internal())
this._endpoint.cancelAll(subscriptionMessages);
cancelledSubscriptions.forEach(s => this._removeSubscription(s))
}
const subscriptions = this._subscriptions.map(value => value.internal());
if (subscriptions.length === 0) {
return;
}
this._endpoint.keepUpSubscriptions(subscriptions).then(response => {
for (let i = 0; i < response.response.length; i++) {
const r = response.response[i];
const status = ObjectToProto.convert(r.status, Status.typeUrl());
if (status.getStatusCase() !== Status.StatusCase.OK) {
this._removeSubscription(subscriptions[i])
}
}
});
}
Expand Down
148 changes: 119 additions & 29 deletions client-js/main/client/http-endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import {TypedMessage} from './typed-message';
import {ClientError, ConnectionError, ServerError, SpineError} from './errors';
import {Subscriptions} from '../proto/spine/web/keeping_up_pb';

/**
* @typedef {Object} SubscriptionRouting
Expand All @@ -36,8 +37,12 @@ import {ClientError, ConnectionError, ServerError, SpineError} from './errors';
* the name of the subscription creation endpoint; defaults to "/subscription/create"
* @property {string} keepUp
* the name of the subscription keep up endpoint; defaults to "/subscription/keep-up"
* @property {string} keepUpAll
* the name of the subscription bulk keep up endpoint; defaults to "/subscription/keep-up-all"
* @property {string} cancel
* the name of the subscription cancellation endpoint; defaults to "/subscription/cancel"
* @property {string} cancelAll
* the name of the subscription bulk cancellation endpoint; defaults to "/subscription/cancel-all"
*/

/**
Expand All @@ -54,7 +59,7 @@ import {ClientError, ConnectionError, ServerError, SpineError} from './errors';
class Endpoint {

/**
* Sends off a command to the endpoint.
* Sends a command to the endpoint.
*
* @param {!TypedMessage<Command>} command a Command to send to the Spine server
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -65,7 +70,7 @@ class Endpoint {
}

/**
* Sends off a query to the endpoint.
* Sends a query to the endpoint.
*
* @param {!spine.client.Query} query a Query to Spine server to retrieve some domain entities
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -77,7 +82,7 @@ class Endpoint {
}

/**
* Sends off a request to subscribe to a provided topic to an endpoint.
* Sends a request to subscribe to a provided topic to an endpoint.
*
* @param {!spine.client.Topic} topic a topic for which a subscription is created
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -89,21 +94,33 @@ class Endpoint {
}

/**
* Sends off a request to keep a subscription, stopping it from being closed by server.
* Sends a request to keep a subscription, stopping it from being closed by server.
*
* @param {!spine.client.Subscription} subscription a subscription that should be kept open
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
* @returns {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
*/
keepUpSubscription(subscription) {
keepUpSingleSubscription(subscription) {
const typedSubscription = TypedMessage.of(subscription);
return this._keepUp(typedSubscription);
}

/**
* Sends off a request to cancel an existing subscription.
* Sends a request to keep up several subscriptions, preventing them
* from being closed by the server.
*
* Cancelling subscription stops the server updating subscription with new values.
* @param {!Array<spine.client.Subscription>} subscriptions subscriptions that should be kept open
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
*/
keepUpSubscriptions(subscriptions) {
return this._keepUpAll(subscriptions);
}

/**
* Sends a request to cancel an existing subscription.
*
* Cancelling subscription stops the server from updating subscription with new values.
*
* @param {!spine.client.Subscription} subscription a subscription that should be kept open
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -114,6 +131,19 @@ class Endpoint {
return this._cancel(typedSubscription);
}

/**
* Sends a request to cancel all the given subscriptions.
*
* Cancelling subscriptions stops the server from updating subscription with new values.
*
* @param {!Array<spine.client.Subscription>>} subscriptions subscriptions that should
* be cancelled
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
*/
cancelAll(subscriptions) {
return this._cancelAll(subscriptions);
}

/**
* @param {!TypedMessage<Command>} command a Command to send to the Spine server
Expand Down Expand Up @@ -159,6 +189,17 @@ class Endpoint {
throw new Error('Not implemented in abstract base.');
}

/**
* @param {!Array<TypedMessage<spine.client.Subscription>>} subscriptions subscriptions to keep up
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
* @protected
* @abstract
*/
_keepUpAll(subscriptions) {
throw new Error('Not implemented in abstract base.');
}

/**
* @param {!TypedMessage<spine.client.Subscription>} subscription a subscription to be canceled
* @return {Promise<Object>} a promise of a successful server response, rejected if
Expand All @@ -169,10 +210,22 @@ class Endpoint {
_cancel(subscription) {
throw new Error('Not implemented in abstract base.');
}


/**
* @param {!Array<spine.client.Subscription>} subscriptions subscriptions to be canceled
* @return {Promise<Object>} a promise of a successful server response, rejected if
* an error occurs
* @protected
* @abstract
*/
_cancelAll(subscriptions) {
throw new Error('Not implemented in abstract base.');
}
}

/**
* Spine HTTP endpoint which is used to send off Commands and Queries using
* Spine HTTP endpoint which is used to send Commands and Queries using
* the provided HTTP client.
*/
export class HttpEndpoint extends Endpoint {
Expand All @@ -188,12 +241,12 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a command to the endpoint.
* Sends a command to the endpoint.
*
* @param {!TypedMessage<Command>} command a Command to send to the Spine server
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_executeCommand(command) {
Expand All @@ -202,12 +255,12 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a query to the endpoint.
* Sends a query to the endpoint.
*
* @param {!TypedMessage<Query>} query a Query to Spine server to retrieve some domain entities
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_performQuery(query) {
Expand All @@ -216,12 +269,12 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a request to create a subscription for a topic.
* Sends a request to create a subscription for a topic.
*
* @param {!TypedMessage<spine.client.Topic>} topic a topic to subscribe to
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_subscribeTo(topic) {
Expand All @@ -231,13 +284,13 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a request to keep alive a subscription.
* Sends a request to keep alive the given subscription.
*
* @param {!TypedMessage<spine.client.Subscription>} subscription a subscription that is prevented
* from being closed by server
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_keepUp(subscription) {
Expand All @@ -247,12 +300,31 @@ export class HttpEndpoint extends Endpoint {
}

/**
* Sends off a request to cancel a subscription.
* Sends a request to keep alive the given subscriptions.
*
* @param {!Array<spine.client.Subscription>} subscriptions subscriptions that are prevented
* from being closed by the server
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_keepUpAll(subscriptions) {
const path = (this._routing && this._routing.subscription && this._routing.subscription.keepUpAll)
|| '/subscription/keep-up-all';
const request = new Subscriptions()
request.setSubscriptionList(subscriptions);
const typed = TypedMessage.of(request);
return this._sendMessage(path, typed);
}

/**
* Sends a request to cancel the given subscription.
*
* @param {!TypedMessage<spine.client.Subscription>} subscription a subscription to be canceled
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_cancel(subscription) {
Expand All @@ -261,14 +333,32 @@ export class HttpEndpoint extends Endpoint {
return this._sendMessage(path, subscription);
}

/**
* Sends a request to cancel the given subscriptions.
*
* @param {!Array<spine.client.Subscription>} subscriptions subscriptions to be canceled
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @protected
*/
_cancelAll(subscriptions) {
const path = (this._routing && this._routing.subscription && this._routing.subscription.cancelAll)
|| '/subscription/cancel-all';
const request = new Subscriptions();
request.setSubscriptionList(subscriptions);
const typed = TypedMessage.of(request);
return this._sendMessage(path, typed);
}

/**
* Sends the given message to the given endpoint.
*
* @param {!string} endpoint an endpoint to send the message to
* @param {!TypedMessage} message a message to send, as a {@link TypedMessage}
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or a
* connection error occurs
* rejected if the client response is not `2xx`,
* or a connection error occurs
* @private
*/
_sendMessage(endpoint, message) {
Expand All @@ -286,8 +376,8 @@ export class HttpEndpoint extends Endpoint {
*
* @param {!Response} response an HTTP request response
* @return {Promise<Object|SpineError>} a promise of a successful server response JSON data,
* rejected if the client response is not 2xx or if JSON
* parsing fails
* rejected if the client response is not `2xx`,
* or if JSON parsing fails
* @private
*/
static _jsonOrError(response) {
Expand Down
2 changes: 1 addition & 1 deletion client-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "spine-web",
"version": "1.9.0-SNAPSHOT.7",
"version": "1.9.0-SNAPSHOT.8",
"license": "Apache-2.0",
"description": "A JS client for interacting with Spine applications.",
"homepage": "https://spine.io",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.spine.core.Response;
import io.spine.core.Status;
import io.spine.type.TypeUrl;
import io.spine.web.Responses;
import io.spine.web.Subscriptions;
import io.spine.web.firebase.FirebaseClient;
import io.spine.web.firebase.NodePath;
import io.spine.web.firebase.RequestNodePath;
Expand Down Expand Up @@ -107,6 +109,18 @@ public Response keepUp(Subscription subscription) {
return exists ? ok() : missing(subscription);
}

@Override
public Responses keepUpAll(Subscriptions subscriptions) {
checkNotNull(subscriptions);
return subscriptions.getSubscriptionList()
.stream()
.map(this::keepUp)
.collect(Responses::newBuilder,
Responses.Builder::addResponse,
(l, r) -> l.addAllResponse(r.getResponseList()))
.vBuild();
}

@Override
public Response cancel(Subscription subscription) {
checkNotNull(subscription);
Expand All @@ -120,6 +134,18 @@ public Response cancel(Subscription subscription) {
return localSubscription.isPresent() ? ok() : missing(subscription);
}

@Override
public Responses cancelAll(Subscriptions request) {
checkNotNull(request);
Responses.Builder result = Responses.newBuilder();
for (Subscription subscription : request.getSubscriptionList()) {
Response response = cancel(subscription);
result.addResponse(response);
}
return result.vBuild();
}


private static Response missing(Subscription subscription) {
String errorMessage =
format("Subscription `%s` is unknown or already canceled.",
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/js-tests/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "client-js-tests",
"version": "1.9.0-SNAPSHOT.7",
"version": "1.9.0-SNAPSHOT.8",
"license": "Apache-2.0",
"description": "Tests of a `spine-web` JS library against the Spine-based application.",
"scripts": {
Expand Down
Loading

0 comments on commit f2087e5

Please sign in to comment.