Skip to content

Commit

Permalink
A look at PolicyList.update (#454)
Browse files Browse the repository at this point in the history
This started out as just a way to find out why mjolnir was syncing with lists several times for each update to a policy list.

The main changes are

- Verbosity was irrelevant to the sync command but for some reason was an option.
  Unfortunately all this did was suppress whether to tell you when it had finished, meaning it wouldn't
  when verbose logging was disabled. Historically this was probably a parameter that got passed through
  to applyServerAcl/applyUserBans, which can be horribly verbose, but they access the config directly.

- Stop emitting `'PolicyList.update'` when there are no changes.
- Include a revision ID for the `'PolicyList.update'`method and event.
- Use the revision ID in the `ProtectedRoomsSet` so that we don't unnecessarily resynchronize all rooms when the `'PolicyList.update'` event is received. Though not when the `sync` command is used. Since this is supposed to `sync` in the case when there is a state reset or otherwise or the user has changed some room settings.
- insert an await lock around the `PolicyList.update` method to avoid a race condition where a call can be started and finished within the extent of an existing call (via another task, this can happen if the server is slow with handling one request). `PolicyList.udpate` now has a helper that is synchronous to be called directly after requesting the room state. The reason for this is to enforce that no one `await`s while updating the policy list's cache of rules. Which is important because it is one of the biggest methods that I tolerate and visually checking for `await` is impossible.
- The revision ID uses a ULID, but this is unnecessary and could have just been a "dumb counter".

closes #447
  • Loading branch information
Gnuxie authored Dec 8, 2022
1 parent 1d3da94 commit 433ff7e
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 69 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"parse-duration": "^1.0.2",
"pg": "^8.8.0",
"shell-quote": "^1.7.3",
"ulidx": "^0.3.0",
"yaml": "^2.1.1"
},
"engines": {
Expand Down
6 changes: 3 additions & 3 deletions src/Mjolnir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ export class Mjolnir {

if (this.config.verifyPermissionsOnStartup) {
await this.managementRoomOutput.logMessage(LogLevel.INFO, "Mjolnir@startup", "Checking permissions...");
await this.protectedRoomsTracker.verifyPermissions(this.config.verboseLogging);
await this.protectedRoomsTracker.verifyPermissions();
}

// Start the bot.
Expand All @@ -311,7 +311,7 @@ export class Mjolnir {
this.currentState = STATE_SYNCING;
if (this.config.syncOnStartup) {
await this.managementRoomOutput.logMessage(LogLevel.INFO, "Mjolnir@startup", "Syncing lists...");
await this.protectedRoomsTracker.syncLists(this.config.verboseLogging);
await this.protectedRoomsTracker.syncLists();
}

this.currentState = STATE_RUNNING;
Expand Down Expand Up @@ -426,7 +426,7 @@ export class Mjolnir {
}

if (withSync) {
await this.protectedRoomsTracker.syncLists(this.config.verboseLogging);
await this.protectedRoomsTracker.syncLists();
}
}

Expand Down
64 changes: 33 additions & 31 deletions src/ProtectedRoomsSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import ManagementRoomOutput from "./ManagementRoomOutput";
import { MatrixSendClient } from "./MatrixEmitter";
import AccessControlUnit, { Access } from "./models/AccessControlUnit";
import { RULE_ROOM, RULE_SERVER, RULE_USER } from "./models/ListRule";
import PolicyList, { ListRuleChange } from "./models/PolicyList";
import PolicyList, { ListRuleChange, Revision } from "./models/PolicyList";
import { RoomUpdateError } from "./models/RoomUpdateError";
import { ProtectionManager } from "./protections/ProtectionManager";
import { EventRedactionQueue, RedactUserInRoom } from "./queues/EventRedactionQueue";
Expand Down Expand Up @@ -92,7 +92,12 @@ export class ProtectedRoomsSet {
* Intended to be `this.syncWithUpdatedPolicyList` so we can add it in `this.watchList` and remove it in `this.unwatchList`.
* Otherwise we would risk being informed about lists we no longer watch.
*/
private readonly listUpdateListener: (list: PolicyList, changes: ListRuleChange[]) => void;
private readonly listUpdateListener: (list: PolicyList, changes: ListRuleChange[], revision: Revision) => void;

/**
* The revision of a each watched list that we have applied to protected rooms.
*/
private readonly listRevisions = new Map<PolicyList, /** The last revision we used to sync protected rooms. */ Revision>();

constructor(
private readonly client: MatrixSendClient,
Expand Down Expand Up @@ -210,15 +215,9 @@ export class ProtectedRoomsSet {
}

/**
* Sync all the rooms with all the watched lists, banning and applying any changed ACLS.
* @param verbose Whether to report any errors to the management room.
* Synchronize all the protected rooms with all of the policies described in the watched policy lists.
*/
public async syncLists(verbose = true) {
for (const list of this.policyLists) {
const changes = await list.updateList();
await this.printBanlistChanges(changes, list);
}

private async syncRoomsWithPolicies() {
let hadErrors = false;
const [aclErrors, banErrors] = await Promise.all([
this.applyServerAcls(this.policyLists, this.protectedRoomsByActivity()),
Expand All @@ -229,7 +228,7 @@ export class ProtectedRoomsSet {
hadErrors = hadErrors || await this.printActionResult(banErrors, "Errors updating member bans:");
hadErrors = hadErrors || await this.printActionResult(redactionErrors, "Error updating redactions:");

if (!hadErrors && verbose) {
if (!hadErrors) {
const html = `<font color="#00cc00">Done updating rooms - no errors</font>`;
const text = "Done updating rooms - no errors";
await this.client.sendMessage(this.managementRoomId, {
Expand All @@ -241,6 +240,22 @@ export class ProtectedRoomsSet {
}
}

/**
* Update each watched list and then synchronize all the protected rooms with all the policies described in the watched lists,
* banning and applying any changed ACLS via `syncRoomsWithPolicies`.
*/
public async syncLists() {
for (const list of this.policyLists) {
const { revision } = await list.updateList();
const previousRevision = this.listRevisions.get(list);
if (previousRevision === undefined || revision.supersedes(previousRevision)) {
this.listRevisions.set(list, revision);
// we rely on `this.listUpdateListener` to print the changes to the list.
}
}
await this.syncRoomsWithPolicies();
}

public addProtectedRoom(roomId: string): void {
if (this.protectedRooms.has(roomId)) {
// we need to protect ourselves form syncing all the lists unnecessarily
Expand All @@ -263,28 +278,15 @@ export class ProtectedRoomsSet {
* @param policyList The `PolicyList` which we will check for changes and apply them to all protected rooms.
* @returns When all of the protected rooms have been updated.
*/
private async syncWithUpdatedPolicyList(policyList: PolicyList, changes: ListRuleChange[]): Promise<void> {
let hadErrors = false;
const [aclErrors, banErrors] = await Promise.all([
this.applyServerAcls(this.policyLists, this.protectedRoomsByActivity()),
this.applyUserBans(this.protectedRoomsByActivity())
]);
const redactionErrors = await this.processRedactionQueue();
hadErrors = hadErrors || await this.printActionResult(aclErrors, "Errors updating server ACLs:");
hadErrors = hadErrors || await this.printActionResult(banErrors, "Errors updating member bans:");
hadErrors = hadErrors || await this.printActionResult(redactionErrors, "Error updating redactions:");

if (!hadErrors) {
const html = `<font color="#00cc00"><b>Done updating rooms - no errors</b></font>`;
const text = "Done updating rooms - no errors";
await this.client.sendMessage(this.managementRoomId, {
msgtype: "m.notice",
body: text,
format: "org.matrix.custom.html",
formatted_body: html,
});
private async syncWithUpdatedPolicyList(policyList: PolicyList, changes: ListRuleChange[], revision: Revision): Promise<void> {
// avoid resyncing the rooms if we have already done so for the latest revision of this list.
const previousRevision = this.listRevisions.get(policyList);
if (previousRevision === undefined || revision.supersedes(previousRevision)) {
this.listRevisions.set(policyList, revision);
await this.syncRoomsWithPolicies();
}
// This can fail if the change is very large and it is much less important than applying bans, so do it last.
// We always print changes because we make this listener responsible for doing it.
await this.printBanlistChanges(changes, policyList);
}

Expand Down
2 changes: 1 addition & 1 deletion src/commands/SyncCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ import { Mjolnir } from "../Mjolnir";

// !mjolnir sync
export async function execSyncCommand(roomId: string, event: any, mjolnir: Mjolnir) {
return mjolnir.protectedRoomsTracker.syncLists(mjolnir.config.verboseLogging);
return mjolnir.protectedRoomsTracker.syncLists();
}
2 changes: 1 addition & 1 deletion src/commands/UnbanBanCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export async function execUnbanCommand(roomId: string, event: any, mjolnir: Mjol

if (unbannedSomeone) {
await mjolnir.managementRoomOutput.logMessage(LogLevel.DEBUG, "UnbanBanCommand", `Syncing lists to ensure no users were accidentally unbanned`);
await mjolnir.protectedRoomsTracker.syncLists(mjolnir.config.verboseLogging);
await mjolnir.protectedRoomsTracker.syncLists();
}
};

Expand Down
77 changes: 70 additions & 7 deletions src/models/PolicyList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import { extractRequestError, LogService, RoomCreateOptions, UserID } from "matr
import { EventEmitter } from "events";
import { ALL_RULE_TYPES, EntityType, ListRule, Recommendation, ROOM_RULE_TYPES, RULE_ROOM, RULE_SERVER, RULE_USER, SERVER_RULE_TYPES, USER_RULE_TYPES } from "./ListRule";
import { MatrixSendClient } from "../MatrixEmitter";
import AwaitLock from "await-lock";
import { monotonicFactory } from "ulidx";

export const SHORTCODE_EVENT_TYPE = "org.matrix.mjolnir.shortcode";

Expand Down Expand Up @@ -54,8 +56,8 @@ export interface ListRuleChange {

declare interface PolicyList {
// PolicyList.update is emitted when the PolicyList has pulled new rules from Matrix and informs listeners of any changes.
on(event: 'PolicyList.update', listener: (list: PolicyList, changes: ListRuleChange[]) => void): this
emit(event: 'PolicyList.update', list: PolicyList, changes: ListRuleChange[]): boolean
on(event: 'PolicyList.update', listener: (list: PolicyList, changes: ListRuleChange[], revision: Revision) => void): this
emit(event: 'PolicyList.update', list: PolicyList, changes: ListRuleChange[], revision: Revision): boolean
}

/**
Expand Down Expand Up @@ -96,6 +98,18 @@ class PolicyList extends EventEmitter {
*/
private static readonly EVENT_RULE_ANNOTATION_KEY = 'org.matrix.mjolnir.annotation.rule';

/**
* An ID that represents the current version of the list state.
* Each time we use `updateList` we create a new revision to represent the change of state.
* Listeners can then use the revision to work out whether they have already applied
* the latest revision.
*/
private revisionId = new Revision();

/**
* A lock to protect `updateList` from a situation where one call to `getRoomState` can start and end before another.
*/
private readonly updateListLock = new AwaitLock();
/**
* Construct a PolicyList, does not synchronize with the room.
* @param roomId The id of the policy room, i.e. a room containing MSC2313 policies.
Expand Down Expand Up @@ -346,10 +360,24 @@ class PolicyList extends EventEmitter {
* and updating the model to reflect the room.
* @returns A description of any rules that were added, modified or removed from the list as a result of this update.
*/
public async updateList(): Promise<ListRuleChange[]> {
let changes: ListRuleChange[] = [];
public async updateList(): Promise<ReturnType<PolicyList["updateListWithState"]>> {
await this.updateListLock.acquireAsync();
try {
const state = await this.client.getRoomState(this.roomId);
return this.updateListWithState(state);
} finally {
this.updateListLock.release();
}
}

const state = await this.client.getRoomState(this.roomId);
/**
* Same as `updateList` but without async to make sure that no one uses await within the body.
* The reason no one should use await is to avoid a horrible race should `updateList` be called more than once.
* @param state Room state to update the list with, provided by `updateList`
* @returns Any changes that have been made to the PolicyList.
*/
private updateListWithState(state: any): { revision: Revision, changes: ListRuleChange[] } {
const changes: ListRuleChange[] = [];
for (const event of state) {
if (event['state_key'] === '' && event['type'] === SHORTCODE_EVENT_TYPE) {
this.shortcode = (event['content'] || {})['shortcode'] || null;
Expand Down Expand Up @@ -445,15 +473,18 @@ class PolicyList extends EventEmitter {
changes.push({ rule, changeType, event, sender: event.sender, ...previousState ? { previousState } : {} });
}
}
this.emit('PolicyList.update', this, changes);
if (changes.length > 0) {
this.revisionId = new Revision();
this.emit('PolicyList.update', this, changes, this.revisionId);
}
if (this.batchedEvents.keys.length !== 0) {
// The only reason why this isn't a TypeError is because we need to know about this when it happens, because it means
// we're probably doing something wrong, on the other hand, if someone messes with a server implementation and
// strange things happen where events appear in /sync sooner than they do in /state (which would be outrageous)
// we don't want Mjolnir to stop working properly. Though, I am not confident a burried warning is going to alert us.
LogService.warn("PolicyList", "The policy list is being informed about events that it cannot find in the room state, this is really bad and you should seek help.");
}
return changes;
return { revision: this.revisionId, changes };
}

/**
Expand Down Expand Up @@ -533,3 +564,35 @@ class UpdateBatcher {
this.checkBatch(eventId);
}
}

/**
* Represents a specific version of the state contained in `PolicyList`.
* These are unique and can be compared with `supersedes`.
* We use a ULID to work out whether a revision supersedes another.
*/
export class Revision {

/**
* Ensures that ULIDs are monotonic.
*/
private static makeULID = monotonicFactory();

/**
* Is only public for the comparison method,
* I feel like I'm missing something here and it is possible without
*/
public readonly ulid = Revision.makeULID();

constructor() {
// nothing to do.
}

/**
* Check whether this revision supersedes another revision.
* @param revision The revision we want to check this supersedes.
* @returns True if this Revision supersedes the other revision.
*/
public supersedes(revision: Revision): boolean {
return this.ulid > revision.ulid;
}
}
Loading

0 comments on commit 433ff7e

Please sign in to comment.