Skip to content

Commit

Permalink
[#167064659] Use functional programming into RedisUserMetadataStorage…
Browse files Browse the repository at this point in the history
….set
  • Loading branch information
BurnedMarshal committed Jan 29, 2020
1 parent 8326e46 commit cc102ae
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 68 deletions.
31 changes: 29 additions & 2 deletions src/services/__tests__/redisUserMetadataStorage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ mockRedisClient.set = mockSet;
mockRedisClient.get = mockGet;
mockRedisClient.watch = mockWatch;
mockRedisClient.multi = mockMulti;
mockRedisClient.duplicate = () => mockRedisClient;
const mockDuplicate = jest.fn().mockImplementation(() => mockRedisClient);
mockRedisClient.duplicate = mockDuplicate;

mockMulti.mockImplementation(() => {
return {
Expand Down Expand Up @@ -120,7 +121,7 @@ describe("RedisUserMetadataStorage#get", () => {
);
});

describe("RedisUserMetadataStorage#get", () => {
describe("RedisUserMetadataStorage#set", () => {
beforeEach(() => {
jest.clearAllMocks();
});
Expand Down Expand Up @@ -241,4 +242,30 @@ describe("RedisUserMetadataStorage#get", () => {
expect(mockSet).not.toBeCalled();
expect(response).toEqual(left(expectedWatchError));
});

it("should duplicate the redis client if a race condition happens on the same key", async () => {
mockWatch.mockImplementation((_, callback) => callback(null));
mockGet.mockImplementation((_, callback) => {
callback(undefined, JSON.stringify(aValidUserMetadata));
});
mockExec.mockImplementationOnce(callback => {
callback(undefined, ["OK"]);
});
mockExec.mockImplementationOnce(callback => {
callback(undefined, null);
});
const newMetadata: UserMetadata = {
metadata,
version: validNewVersion
};
const response = await Promise.all([
userMetadataStorage.set(aValidUser, newMetadata),
userMetadataStorage.set(aValidUser, newMetadata)
]);
expect(mockDuplicate).toBeCalledTimes(1);
expect(response).toEqual([
right(true),
left(concurrentWriteRejectionError)
]);
});
});
136 changes: 70 additions & 66 deletions src/services/redisUserMetadataStorage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as redis from "redis";

import { Either, isLeft, isRight, left, right } from "fp-ts/lib/Either";
import { Either, isLeft, left, right, toError } from "fp-ts/lib/Either";
import { ReadableReporter } from "italia-ts-commons/lib/reporters";
import { UserMetadata } from "../../generated/backend/UserMetadata";
import { User } from "../types/user";
Expand All @@ -9,6 +9,12 @@ import { IUserMetadataStorage } from "./IUserMetadataStorage";
import RedisStorageUtils from "./redisStorageUtils";

import { Sema } from "async-sema";
import {
fromEither,
fromPredicate,
taskify,
tryCatch
} from "fp-ts/lib/TaskEither";
import { FiscalCode } from "italia-ts-commons/lib/strings";

const userMetadataPrefix = "USERMETA-";
Expand All @@ -23,7 +29,7 @@ export const concurrentWriteRejectionError = new Error(
*/
export default class RedisUserMetadataStorage extends RedisStorageUtils
implements IUserMetadataStorage {
private setOperations: Record<string, true | undefined> = {};
private setOperations: Set<string> = new Set();
private mutex: Sema = new Sema(1);
constructor(private readonly redisClient: redis.RedisClient) {
super();
Expand All @@ -35,84 +41,83 @@ export default class RedisUserMetadataStorage extends RedisStorageUtils
* during write operations of user metadata
* @see https://github.com/NodeRedis/node_redis#optimistic-locks
*/
// tslint:disable-next-line: cognitive-complexity
public async set(
user: User,
payload: UserMetadata
): Promise<Either<Error, boolean>> {
// In order to work properly, optimistic lock needs to be initialized on different
// redis client instances @see https://github.com/NodeRedis/node_redis/issues/1320#issuecomment-373200541
await this.mutex.acquire();
const raceCondition = this.setOperations[user.fiscal_code];
const raceCondition = this.setOperations.has(user.fiscal_code);
// tslint:disable-next-line: no-let
let duplicatedOrOriginalRedisClient = this.redisClient;
if (raceCondition === undefined) {
// tslint:disable-next-line: no-object-mutation
this.setOperations[user.fiscal_code] = true;
if (raceCondition === false) {
this.setOperations.add(user.fiscal_code);
} else {
// A duplicate redis client must be created only if the main client is already
// in use into an optimistic lock update on the same key
duplicatedOrOriginalRedisClient = this.redisClient.duplicate();
}
this.mutex.release();
const userMetadataWatchResult = await new Promise<Either<Error, true>>(
resolve => {
duplicatedOrOriginalRedisClient.watch(
`${userMetadataPrefix}${user.fiscal_code}`,
err => {
if (err) {
return resolve(left(err));
}
resolve(right(true));
}
);
const userMetadataWatchResult = await taskify(
(key: string, callback: (err: Error | null, value: true) => void) => {
duplicatedOrOriginalRedisClient.watch(key, err => callback(err, true));
}
);
if (isLeft(userMetadataWatchResult)) {
raceCondition
? duplicatedOrOriginalRedisClient.end(true)
: await this.resetOperation(user.fiscal_code);
return userMetadataWatchResult;
}
const getUserMetadataResult = await this.loadUserMetadataByFiscalCode(
user.fiscal_code
);
if (
isRight(getUserMetadataResult) &&
getUserMetadataResult.value.version !== payload.version - 1
) {
raceCondition
? duplicatedOrOriginalRedisClient.end(true)
: await this.resetOperation(user.fiscal_code);
return left(invalidVersionNumberError);
}
if (
isLeft(getUserMetadataResult) &&
getUserMetadataResult.value !== metadataNotFoundError
) {
raceCondition
? duplicatedOrOriginalRedisClient.end(true)
: await this.resetOperation(user.fiscal_code);
return left(getUserMetadataResult.value);
}
return await new Promise<Either<Error, boolean>>(resolve => {
duplicatedOrOriginalRedisClient
.multi()
.set(
`${userMetadataPrefix}${user.fiscal_code}`,
JSON.stringify(payload)
)(`${userMetadataPrefix}${user.fiscal_code}`)
.chain(() =>
tryCatch(
() => this.loadUserMetadataByFiscalCode(user.fiscal_code),
toError
)
.exec(async (err, results) => {
raceCondition
? duplicatedOrOriginalRedisClient.end(true)
: await this.resetOperation(user.fiscal_code);
if (err) {
return resolve(left(err));
}
if (results === null) {
return resolve(left(concurrentWriteRejectionError));
)
.chain(_ => {
if (isLeft(_) && _.value === metadataNotFoundError) {
return fromEither(
right({
metadata: "",
version: 0
})
);
}
return fromEither(_);
})
.chain(
fromPredicate(
_ => _.version === payload.version - 1,
_ => invalidVersionNumberError
)
)
.chain(() =>
taskify(
(
key: string,
data: string,
callback: (
err: Error | null,
value?: Either<Error, boolean>
) => void
) => {
duplicatedOrOriginalRedisClient
.multi()
.set(key, data)
.exec((err, results) => {
if (err) {
return callback(err);
}
if (results === null) {
return callback(concurrentWriteRejectionError);
}
callback(null, this.singleStringReply(err, results[0]));
});
}
resolve(this.singleStringReply(err, results[0]));
});
});
)(`${userMetadataPrefix}${user.fiscal_code}`, JSON.stringify(payload))
)
.chain(fromEither)
.run();
raceCondition
? duplicatedOrOriginalRedisClient.end(true)
: await this.resetOperation(user.fiscal_code);
return userMetadataWatchResult;
}

/**
Expand Down Expand Up @@ -169,8 +174,7 @@ export default class RedisUserMetadataStorage extends RedisStorageUtils

private async resetOperation(fiscalCode: FiscalCode): Promise<void> {
await this.mutex.acquire();
// tslint:disable-next-line: no-object-mutation
this.setOperations[fiscalCode] = undefined;
this.setOperations.delete(fiscalCode);
this.mutex.release();
}
}

0 comments on commit cc102ae

Please sign in to comment.