Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce data sent by shared worker #4066

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ export class MotionDetailOriginalChangeRecommendationsComponent implements OnIni
await firstValueFrom(
this.autoupdateCommunications
.listen()
.pipe(filter(data => data && data.description?.includes(MOTION_DETAIL_SUBSCRIPTION)))
.pipe(
filter(
data =>
data && Object.values(data.streamIdDescriptions)?.includes(MOTION_DETAIL_SUBSCRIPTION)
)
)
);
this.dataLoaded = true;
this.update();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,10 @@ export class AutoupdateCommunicationService {

private handleReceiveData(data: AutoupdateReceiveData, dataSubscription: Subscriber<any>): void {
dataSubscription.next(data.content);
if (data.content?.description) {
this.subscriptionsWithData.add(data.content.description.replace(SUBSCRIPTION_SUFFIX, ``));
if (data.content?.streamIdDescriptions) {
for (const id of Object.keys(data.content.streamIdDescriptions)) {
this.subscriptionsWithData.add(data.content.streamIdDescriptions[id].replace(SUBSCRIPTION_SUFFIX, ``));
}
}
if (this.tryReconnectOpen) {
this.matSnackBar.dismiss();
Expand Down
90 changes: 42 additions & 48 deletions client/src/app/site/services/autoupdate/autoupdate.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ interface AutoupdateSubscriptionMap {

interface AutoupdateIncomingMessage {
autoupdateData: AutoupdateModelData;
autoupdateDataId: Id;
id: Id;
description?: string;
idDescriptionMap: { [id: Id]: string };
}

class AutoupdateEndpoint extends EndpointConfiguration {
Expand Down Expand Up @@ -74,8 +72,6 @@ export class AutoupdateService {
private _mutex = new Mutex();
private _currentQueryParams: QueryParams | null = null;
private _resolveDataReceived: ((value: ModelData) => void)[] = [];
private _lastHandeledDataId: Id;
private _lastHandeledDataRequestId: Id;

public constructor(
private httpEndpointService: HttpStreamEndpointService,
Expand All @@ -95,9 +91,7 @@ export class AutoupdateService {
this.communication.listen().subscribe(data => {
this.handleAutoupdate({
autoupdateData: data.data,
autoupdateDataId: data.dataId,
id: data.streamId,
description: data.description
idDescriptionMap: data.streamIdDescriptions
});
});
this.communication.listenShouldReconnect().subscribe(() => {
Expand Down Expand Up @@ -254,29 +248,18 @@ export class AutoupdateService {
};
}

private async handleAutoupdate({
autoupdateData,
autoupdateDataId,
id,
description
}: AutoupdateIncomingMessage): Promise<void> {
if (!this._activeRequestObjects || !this._activeRequestObjects[id]) {
private async handleAutoupdate({ autoupdateData, idDescriptionMap }: AutoupdateIncomingMessage): Promise<void> {
const requestIds = Object.keys(idDescriptionMap).map(id => +id);
if (!this._activeRequestObjects || !requestIds.some(id => this._activeRequestObjects[id])) {
return;
}

const modelData = autoupdateFormatToModelData(autoupdateData);
console.debug(`[autoupdate] from stream:`, description, id, [modelData, autoupdateData]);
if (this._lastHandeledDataId === autoupdateDataId) {
const unlock = await this._mutex.lock();
if (this._resolveDataReceived[id]) {
await this._activeRequestObjects[this._lastHandeledDataRequestId]?.modelSubscription?.receivedData;
this._resolveDataReceived[id](modelData);
delete this._resolveDataReceived[id];
}
return unlock();
}
this._lastHandeledDataId = autoupdateDataId;
this._lastHandeledDataRequestId = id;
console.debug(
`[autoupdate] from streams:`,
requestIds.map(id => `${id} - ${idDescriptionMap[id]}`).join(`, `),
[modelData, autoupdateData]
);

const fullListUpdateCollections: {
[collection: string]: Ids;
Expand All @@ -285,26 +268,35 @@ export class AutoupdateService {
[collection: string]: { ids: Ids; parentCollection: Collection; parentField: string; parentId: Id };
} = {};

const { modelRequest } = this._activeRequestObjects[id];
if (modelRequest) {
for (const key of Object.keys(autoupdateData)) {
const data = key.split(`/`);
const collectionRelation = `${data[COLLECTION_INDEX]}/${data[FIELD_INDEX]}`;
if (modelRequest.getFullListUpdateCollectionRelations().includes(collectionRelation)) {
fullListUpdateCollections[modelRequest.getForeignCollectionByRelation(collectionRelation)] =
autoupdateData[key];
} else if (modelRequest.getExclusiveListUpdateCollectionRelations().includes(collectionRelation)) {
exclusiveListUpdateCollections[modelRequest.getForeignCollectionByRelation(collectionRelation)] = {
ids: autoupdateData[key],
parentCollection: data[COLLECTION_INDEX],
parentField: data[FIELD_INDEX],
parentId: +data[ID_INDEX]
};
for (const id of requestIds) {
const { modelRequest } = this._activeRequestObjects[id];
if (modelRequest) {
for (const key of Object.keys(autoupdateData)) {
const data = key.split(`/`);
const collectionRelation = `${data[COLLECTION_INDEX]}/${data[FIELD_INDEX]}`;
if (modelRequest.getFullListUpdateCollectionRelations().includes(collectionRelation)) {
fullListUpdateCollections[modelRequest.getForeignCollectionByRelation(collectionRelation)] =
autoupdateData[key];
} else if (modelRequest.getExclusiveListUpdateCollectionRelations().includes(collectionRelation)) {
exclusiveListUpdateCollections[
modelRequest.getForeignCollectionByRelation(collectionRelation)
] = {
ids: autoupdateData[key],
parentCollection: data[COLLECTION_INDEX],
parentField: data[FIELD_INDEX],
parentId: +data[ID_INDEX]
};
}
}
}
}

await this.prepareCollectionUpdates(modelData, fullListUpdateCollections, exclusiveListUpdateCollections, id);
await this.prepareCollectionUpdates(
modelData,
fullListUpdateCollections,
exclusiveListUpdateCollections,
requestIds
);
}

private async prepareCollectionUpdates(
Expand All @@ -315,7 +307,7 @@ export class AutoupdateService {
exclusiveListUpdateCollections: {
[collection: string]: { ids: Ids; parentCollection: Collection; parentField: string; parentId: Id };
},
requestId: number
requestIds: number[]
): Promise<void> {
const unlock = await this._mutex.lock();

Expand All @@ -327,11 +319,13 @@ export class AutoupdateService {
deletedModels: {}
})
.then(deletedModels => {
this.communication.cleanupCollections(requestId, deletedModels);
for (const requestId of requestIds) {
this.communication.cleanupCollections(requestId, deletedModels);

if (this._resolveDataReceived[requestId]) {
this._resolveDataReceived[requestId](modelData);
delete this._resolveDataReceived[requestId];
if (this._resolveDataReceived[requestId]) {
this._resolveDataReceived[requestId](modelData);
delete this._resolveDataReceived[requestId];
}
}
});

Expand Down
29 changes: 25 additions & 4 deletions client/src/app/worker/autoupdate/autoupdate-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as fzstd from 'fzstd';
import { HttpStream } from '../http/http-stream';
import { ErrorDescription, ErrorType } from '../http/stream-utils';
import { AutoupdateSubscription } from './autoupdate-subscription';
import { AutoupdateSetEndpointParams } from './interfaces-autoupdate';
import { AutoupdateReceiveData, AutoupdateSetEndpointParams } from './interfaces-autoupdate';

export class AutoupdateStream extends HttpStream {
private activeSubscriptions: AutoupdateSubscription[] = null;
Expand Down Expand Up @@ -179,10 +179,31 @@ export class AutoupdateStream extends HttpStream {
}

private sendToSubscriptions(data: any): void {
const portMap = new Map<MessagePort, Set<AutoupdateSubscription>>();
for (const subscription of this.subscriptions) {
// TODO: It might be possible to only send data to
// the subscriptions that actually need it
subscription.updateData(data);
for (const port of subscription.ports) {
if (portMap.has(port)) {
portMap.get(port).add(subscription);
} else {
portMap.set(port, new Set([subscription]));
}
}
}

for (const port of portMap.keys()) {
const streamIdDescriptions = {};
for (const sub of portMap.get(port).values()) {
streamIdDescriptions[sub.id] = sub.description;
}

port.postMessage({
sender: `autoupdate`,
action: `receive-data`,
content: {
streamIdDescriptions,
data: data
}
} as AutoupdateReceiveData);
}
}

Expand Down
19 changes: 6 additions & 13 deletions client/src/app/worker/autoupdate/autoupdate-subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,13 @@ export class AutoupdateSubscription {
* @param data The data to be processed
*/
public updateData(data: any): void {
const dataId = Date.now();
for (const port of this.ports) {
port.postMessage({
sender: `autoupdate`,
action: `receive-data`,
content: {
streamId: this.id,
dataId,
data: data,
description: this.description
streamIdDescriptions: { [this.id]: this.description },
data: data
}
} as AutoupdateReceiveData);
}
Expand All @@ -77,9 +74,8 @@ export class AutoupdateSubscription {
sender: `autoupdate`,
action: `receive-error`,
content: {
streamId: this.id,
data: data,
description: this.description
streamIdDescriptions: { [this.id]: this.description },
data: data
}
} as AutoupdateReceiveError);
}
Expand Down Expand Up @@ -124,16 +120,13 @@ export class AutoupdateSubscription {
* @param port The MessagePort the data should be send to
*/
public resendTo(port: MessagePort): void {
const dataId = Date.now();
if (this.stream && this.stream.currentData !== null) {
port.postMessage({
sender: `autoupdate`,
action: `receive-data`,
content: {
streamId: this.id,
dataId,
data: this.stream.currentData,
description: this.description
streamIdDescriptions: { [this.id]: this.description },
data: this.stream.currentData
}
} as AutoupdateReceiveData);
}
Expand Down
4 changes: 1 addition & 3 deletions client/src/app/worker/autoupdate/interfaces-autoupdate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@ export interface AutoupdateSetStreamId extends AutoupdateWorkerResponse {
}

export interface AutoupdateReceiveDataContent {
streamId: Id;
dataId: Id;
streamIdDescriptions: { [id: Id]: string };
data: any;
description: string;
}

export interface AutoupdateReceiveData extends AutoupdateWorkerResponse {
Expand Down