[#167064659] Optimistic lock on update user metadata #577

Open
wants to merge 7 commits into master
1 change: 1 addition & 0 deletions package.json
@@ -49,6 +49,7 @@
"@pagopa/io-spid-commons": "1.1.0",
"apicache": "^1.4.0",
"applicationinsights": "^1.4.2",
"async-sema": "^3.1.0",
"azure-sb": "^0.10.6",
"body-parser": "^1.18.3",
"date-fns": "^1.30.1",
92 changes: 86 additions & 6 deletions src/services/__tests__/redisUserMetadataStorage.test.ts
@@ -10,6 +10,7 @@ import { UserMetadata } from "../../../generated/backend/UserMetadata";
import { SessionToken, WalletToken } from "../../types/token";
import { User } from "../../types/user";
import RedisUserMetadataStorage, {
concurrentWriteRejectionError,
invalidVersionNumberError,
metadataNotFoundError
} from "../redisUserMetadataStorage";
@@ -40,9 +41,27 @@ const validNewVersion = 11;

const mockSet = jest.fn();
const mockGet = jest.fn();
const mockWatch = jest.fn();
const mockMulti = jest.fn();
const mockRedisClient = createMockRedis().createClient();
mockRedisClient.set = mockSet;
mockRedisClient.get = mockGet;
mockRedisClient.watch = mockWatch;
mockRedisClient.multi = mockMulti;
const mockDuplicate = jest.fn().mockImplementation(() => mockRedisClient);
mockRedisClient.duplicate = mockDuplicate;

mockMulti.mockImplementation(() => {
return {
set: mockSet
};
});
const mockExec = jest.fn();
mockSet.mockImplementation((_, __) => {
return {
exec: mockExec
};
});

const userMetadataStorage = new RedisUserMetadataStorage(mockRedisClient);
const redisClientError = new Error("REDIS CLIENT ERROR");
@@ -102,17 +121,18 @@ describe("RedisUserMetadataStorage#get", () => {
);
});

describe("RedisUserMetadataStorage#get", () => {
describe("RedisUserMetadataStorage#set", () => {
beforeEach(() => {
jest.clearAllMocks();
});

it("should update user metadata", async () => {
mockWatch.mockImplementation((_, callback) => callback(null));
mockGet.mockImplementation((_, callback) => {
callback(undefined, JSON.stringify(aValidUserMetadata));
});
mockSet.mockImplementation((_, __, callback) => {
callback(undefined, "OK");
mockExec.mockImplementation(callback => {
callback(undefined, ["OK"]);
});
const newMetadata: UserMetadata = {
metadata,
@@ -126,11 +146,12 @@
});

it("should set user metadata if don't exists", async () => {
mockWatch.mockImplementation((_, callback) => callback(null));
mockGet.mockImplementation((_, callback) => {
callback(undefined, null);
});
mockSet.mockImplementation((_, __, callback) => {
callback(undefined, "OK");
mockExec.mockImplementation(callback => {
callback(undefined, ["OK"]);
});
const newMetadata: UserMetadata = {
metadata,
@@ -172,10 +193,11 @@
});

it("should fail update user metadata if redis client error occours on set", async () => {
mockWatch.mockImplementation((_, callback) => callback(null));
mockGet.mockImplementation((_, callback) => {
callback(undefined, JSON.stringify(aValidUserMetadata));
});
mockSet.mockImplementation((_, __, callback) => {
mockExec.mockImplementation(callback => {
callback(redisClientError, undefined);
});
const newMetadata: UserMetadata = {
@@ -188,4 +210,62 @@
expect(mockSet.mock.calls[0][1]).toBe(JSON.stringify(newMetadata));
expect(response).toEqual(left(redisClientError));
});

it("should fail update user metadata if happens a concurrent write operation", async () => {
mockWatch.mockImplementation((_, callback) => callback(null));
mockGet.mockImplementation((_, callback) => {
callback(undefined, JSON.stringify(aValidUserMetadata));
});
mockExec.mockImplementation(callback => {
callback(undefined, null);
});
const newMetadata: UserMetadata = {
metadata,
version: validNewVersion
};
const response = await userMetadataStorage.set(aValidUser, newMetadata);
expect(mockGet.mock.calls[0][0]).toBe(`USERMETA-${aValidUser.fiscal_code}`);
expect(mockSet.mock.calls[0][0]).toBe(`USERMETA-${aValidUser.fiscal_code}`);
expect(mockSet.mock.calls[0][1]).toBe(JSON.stringify(newMetadata));
expect(response).toEqual(left(concurrentWriteRejectionError));
});

it("should fail update user metadata if optimistic lock can't be initialized", async () => {
const expectedWatchError = new Error("Error on redis watch method");
mockWatch.mockImplementation((_, callback) => callback(expectedWatchError));
const newMetadata: UserMetadata = {
metadata,
version: validNewVersion
};
const response = await userMetadataStorage.set(aValidUser, newMetadata);
expect(mockGet).not.toBeCalled();
expect(mockSet).not.toBeCalled();
expect(response).toEqual(left(expectedWatchError));
});

it("should duplicate the redis client if a race condition happens on the same key", async () => {
mockWatch.mockImplementation((_, callback) => callback(null));
mockGet.mockImplementation((_, callback) => {
callback(undefined, JSON.stringify(aValidUserMetadata));
});
mockExec.mockImplementationOnce(callback => {
callback(undefined, ["OK"]);
});
mockExec.mockImplementationOnce(callback => {
callback(undefined, null);
});
const newMetadata: UserMetadata = {
metadata,
version: validNewVersion
};
const response = await Promise.all([
userMetadataStorage.set(aValidUser, newMetadata),
userMetadataStorage.set(aValidUser, newMetadata)
]);
expect(mockDuplicate).toBeCalledTimes(1);
expect(response).toEqual([
right(true),
left(concurrentWriteRejectionError)
]);
});
});
116 changes: 89 additions & 27 deletions src/services/redisUserMetadataStorage.ts
@@ -1,60 +1,122 @@
import * as redis from "redis";

import { Either, isLeft, isRight, left, right } from "fp-ts/lib/Either";
import { Either, isLeft, left, right, toError } from "fp-ts/lib/Either";
import { ReadableReporter } from "italia-ts-commons/lib/reporters";
import { UserMetadata } from "../../generated/backend/UserMetadata";
import { User } from "../types/user";
import { log } from "../utils/logger";
import { IUserMetadataStorage } from "./IUserMetadataStorage";
import RedisStorageUtils from "./redisStorageUtils";

import { Sema } from "async-sema";
import {
fromEither,
fromPredicate,
taskify,
tryCatch
} from "fp-ts/lib/TaskEither";

const userMetadataPrefix = "USERMETA-";
export const metadataNotFoundError = new Error("User Metadata not found");
export const invalidVersionNumberError = new Error("Invalid version number");
export const concurrentWriteRejectionError = new Error(
"Concurrent write operation"
);

/**
* Service that manages user metadata stored into Redis database.
*/
export default class RedisUserMetadataStorage extends RedisStorageUtils
implements IUserMetadataStorage {
private activeClients: Set<string> = new Set();
private mutex: Sema = new Sema(1);
constructor(private readonly redisClient: redis.RedisClient) {
super();
}

/**
* {@inheritDoc}
*
* This method doesn't support atomic operations on concurrency scenario.
* Story https://www.pivotaltracker.com/story/show/167064659
* This method uses Optimistic Lock to prevent race condition
* during write operations of user metadata
* @see https://github.com/NodeRedis/node_redis#optimistic-locks
*/
public async set(
user: User,
payload: UserMetadata
): Promise<Either<Error, boolean>> {
const getUserMetadataResult = await this.loadUserMetadataByFiscalCode(
user.fiscal_code
);
if (
isRight(getUserMetadataResult) &&
getUserMetadataResult.value.version !== payload.version - 1
) {
return left(invalidVersionNumberError);
}
if (
isLeft(getUserMetadataResult) &&
getUserMetadataResult.value !== metadataNotFoundError
) {
return left(getUserMetadataResult.value);
// In order to work properly, optimistic lock needs to be initialized on different
Contributor (review comment on the line above):
Where did you find this info?

Contributor Author:
I've tested locally with the following test and the output was unexpected:

it.only("Test Optimistic Lock", async () => {
    const REDIS_CLIENT = createSimpleRedisClient("redis://localhost");
    const ums = new RedisUserMetadataStorage(REDIS_CLIENT);
    const meta = {
      metadata,
      version: 8
    };
    const metaR = await Promise.all([
      ums.set(aValidUser, meta),
      ums.set(aValidUser, meta),
      ums.set(aValidUser, meta)
    ]);
    console.log(metaR); // [ right(true), right(true), right(true) ]
});

So I've read the example (second code snippet) linked here, whose description mentions using another client, and with the modification (const duplicatedRedisClient = this.redisClient.duplicate();) the result was as expected: [ left(Error: Concurrent write operation), right(true), left(Error: Concurrent write operation) ]

Contributor:
Kudos, that's a good test.

gunzip (Contributor), Jan 24, 2020:
The test is good, but as far as I understand (and the test seems to confirm it), the problem of concurrent updates happens only when you have multiple redis clients trying to update the same resource (i.e. different instances of the backend nodejs process / pod). So while it's right to use multiple clients inside the test, it is not right to duplicate the client inside the backend code (you just have to call watch before multi). This holds even if it is the same client that tries to update the same resource (watch + multi "locks" the resource anyway, so you don't have to duplicate the client).
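(For reference, the node_redis optimistic-lock pattern under discussion looks roughly like the following. This is a minimal sketch using a single client; exec() is expected to call back with null replies when the watched key was modified between WATCH and EXEC.)

import * as redis from "redis";

// Minimal sketch of the WATCH + MULTI/EXEC pattern discussed above.
// A null "replies" value from exec() means the transaction was discarded
// because the watched key changed after WATCH.
function setWithOptimisticLock(
  client: redis.RedisClient,
  key: string,
  value: string,
  cb: (err: Error | null, committed?: boolean) => void
): void {
  client.watch(key, watchErr => {
    if (watchErr) {
      return cb(watchErr);
    }
    client
      .multi()
      .set(key, value)
      .exec((execErr, replies) => {
        if (execErr) {
          return cb(execErr);
        }
        // replies !== null means the SET was actually committed
        cb(null, replies !== null);
      });
  });
}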

Contributor Author:
@gunzip I think the watch + multi scenario doesn't work on the same redis client instance, so if we don't duplicate the client when we start an optimistic lock on a key and a race condition happens on the same process/pod (which can occur), then the optimistic lock doesn't work.

gunzip (Contributor), Jan 24, 2020:
Let's try to recap: set() fails in case the value (in redis) that matches the key ${userMetadataPrefix}${user.fiscal_code} is changed after the call to watch() and before the call to multi() [1]. This means that another call to set() starts while the first call is still executing. This is the test code:

it.only("Test Optimistic Lock", async () => {
    const REDIS_CLIENT = createSimpleRedisClient("redis://localhost");
    const ums = new RedisUserMetadataStorage(REDIS_CLIENT);
    const meta = {
      metadata,
      version: 8
    };
    const metaR = await Promise.all([
      ums.set(aValidUser, meta),
      ums.set(aValidUser, meta),
      ums.set(aValidUser, meta)
    ]);
    console.log(metaR); // [ right(true), right(true), right(true) ]
});

Sorry if I'm nitpicking here, but I wonder: how can you tell from this code whether the three calls to set() are interleaved in such a way that [1] is satisfied? I mean, the test may return [ right(true), right(true), right(true) ] not because it cannot catch the race condition but because the race condition does not happen at all (or am I missing something?).

Contributor Author:
Ok, I understand the point. To be sure of that I'll add a timeout before the multi phase, so we can be certain that all three calls happen simultaneously.

Contributor Author:
Here is the updated code:

public async set(
    user: User,
    payload: UserMetadata
  ): Promise<Either<Error, boolean>> {
    // In order to work properly, optimistic lock needs to be initialized on different
    // redis client instances
    // const duplicatedRedisClient = this.redisClient.duplicate();
    return await new Promise<Either<Error, boolean>>(resolve => {
      this.redisClient.watch(
        `${userMetadataPrefix}${user.fiscal_code}`,
        err => {
          if (err) {
            return resolve(left(err));
          }
          this.redisClient.get(
            `${userMetadataPrefix}${user.fiscal_code}`,
            (err1, response) => {
              if (err1 || response === null) {
                resolve(left(err1 || metadataNotFoundError));
              } else {
                setTimeout(() => {
                  this.redisClient
                    .multi()
                    .set(
                      `${userMetadataPrefix}${user.fiscal_code}`,
                      JSON.stringify(payload)
                    )
                    .exec((err2, results) => {
                      if (err2) {
                        return resolve(left(err2));
                      }
                      if (results === null) {
                        return resolve(left(concurrentWriteRejectionError));
                      }
                      resolve(this.singleStringReply(err, results[0]));
                    });
                }, 1000);
              }
            }
          );
        }
      );
    });
  }

As before, the result was [ right(true), right(true), right(true) ] 😄

gunzip (Contributor), Jan 24, 2020:
-DELETED-

Ok, let's proceed and merge this PR (I'm still not convinced that there could be issues with a single client, but I'm doing some tests myself :)

Contributor:
I've found a comment that confirms what @BurnedMarshal says: redis/node-redis#1320 (comment)

We need one client for every set() call indeed, which is sub-optimal since it adds some overhead (as stated in the reply).

// redis client instances @see https://github.com/NodeRedis/node_redis/issues/1320#issuecomment-373200541
await this.mutex.acquire();
Contributor (review comment on the line above):
I suggest extracting this logic into a kind of client pool class.

Contributor:
A couple of suggestions on async-sema:

  1. review the defaults: are they good for us?
  2. use tryAcquire in place of acquire to avoid accumulating "concurrent threads"; if the mutex can't be acquired, fail with an HTTP 429 (a sketch of this approach follows below)
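(A minimal sketch of what suggestion 2 could look like, assuming an Express-style handler; the handler name and response shape are illustrative and not part of this PR, and tryAcquire() is assumed to return undefined when no token is available.)

import { Sema } from "async-sema";
import * as express from "express";

const metadataWriteMutex = new Sema(1);

// Hypothetical handler: fail fast with HTTP 429 instead of queueing behind
// the mutex when another metadata update is already in flight.
export async function upsertUserMetadataHandler(
  req: express.Request,
  res: express.Response
): Promise<void> {
  const token = metadataWriteMutex.tryAcquire();
  if (token === undefined) {
    res.status(429).json({ title: "Too many concurrent updates" });
    return;
  }
  try {
    // ... run the optimistic-lock set() here ...
    res.status(200).json({ value: true });
  } finally {
    metadataWriteMutex.release();
  }
}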

Contributor:

> I suggest to extract this logic into a kind of client pool class

Can you please elaborate a little bit more? Do you mean creating (at startup) a pool of N redis clients to reuse and changing the mutex(1) to mutex(N)?

Do you think we can merge this PR as-is and implement the pooling in another PR (after the beta release date)?
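(A rough sketch of what such a pool might look like, purely illustrative and not part of this PR: N clients duplicated at startup and a Sema(N) gating access.)

import { Sema } from "async-sema";
import * as redis from "redis";

// Hypothetical pool of N redis clients created at startup, so set() can
// borrow a dedicated client instead of calling duplicate() on every call.
export class RedisClientPool {
  private readonly clients: redis.RedisClient[];
  private readonly sem: Sema;

  constructor(baseClient: redis.RedisClient, size: number) {
    // duplicate() opens a new connection with the same options
    this.clients = Array.from({ length: size }, () => baseClient.duplicate());
    this.sem = new Sema(size);
  }

  // Borrow a client; waits if all of them are in use.
  public async acquire(): Promise<redis.RedisClient> {
    await this.sem.acquire();
    // A free client is guaranteed once the semaphore slot is acquired.
    return this.clients.pop() as redis.RedisClient;
  }

  // The "dispose" step: return the client to the pool.
  public release(client: redis.RedisClient): void {
    this.clients.push(client);
    this.sem.release();
  }
}

set() would then acquire a client from the pool, run the watch/get/multi/exec sequence on it, and release it in a finally block.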

const hasActiveClient = this.activeClients.has(user.fiscal_code);
// tslint:disable-next-line: no-let
let duplicatedOrOriginalRedisClient = this.redisClient;
if (hasActiveClient === false) {
this.activeClients.add(user.fiscal_code);
} else {
// A duplicated redis client must be created only if the main client is already
// in use for another optimistic lock update on the same key to prevent performance drop
duplicatedOrOriginalRedisClient = this.redisClient.duplicate();
}
return await new Promise<Either<Error, boolean>>(resolve => {
// Set key to hold the string value. If key already holds a value, it is overwritten, regardless of its type.
// @see https://redis.io/commands/set
this.redisClient.set(
`${userMetadataPrefix}${user.fiscal_code}`,
JSON.stringify(payload),
(err, response) => resolve(this.singleStringReply(err, response))
);
});
this.mutex.release();
const errorOrIsUpdateSuccessful = await taskify(
(key: string, callback: (err: Error | null, value: true) => void) => {
duplicatedOrOriginalRedisClient.watch(key, err => callback(err, true));
}
)(`${userMetadataPrefix}${user.fiscal_code}`)
.chain(() =>
tryCatch(
() => this.loadUserMetadataByFiscalCode(user.fiscal_code),
toError
)
)
.chain(_ => {
if (isLeft(_) && _.value === metadataNotFoundError) {
return fromEither(
right({
metadata: "",
version: 0
})
);
}
return fromEither(_);
})
.chain(
fromPredicate(
_ => _.version === payload.version - 1,
_ => invalidVersionNumberError
)
)
.chain(() =>
taskify(
(
key: string,
data: string,
callback: (
err: Error | null,
value?: Either<Error, boolean>
) => void
) => {
duplicatedOrOriginalRedisClient
.multi()
.set(key, data)
.exec((err, results) => {
if (err) {
return callback(err);
}
if (results === null) {
return callback(concurrentWriteRejectionError);
}
callback(null, this.singleStringReply(err, results[0]));
});
}
)(`${userMetadataPrefix}${user.fiscal_code}`, JSON.stringify(payload))
)
.chain(fromEither)
.run();
hasActiveClient
Contributor (review comment on the line above):
This should call a kind of dispose method in the aforementioned client pool.

? duplicatedOrOriginalRedisClient.end(true)
: this.activeClients.delete(user.fiscal_code);
return errorOrIsUpdateSuccessful;
}

/**
5 changes: 5 additions & 0 deletions yarn.lock
@@ -691,6 +691,11 @@ async-listener@^0.6.0:
semver "^5.3.0"
shimmer "^1.1.0"

async-sema@^3.1.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/async-sema/-/async-sema-3.1.0.tgz#3a813beb261e4cc58b19213916a48e931e21d21e"
integrity sha512-+JpRq3r0zjpRLDruS6q/nC4V5tzsaiu07521677Mdi5i+AkaU/aNJH38rYHJVQ4zvz+SSkjgc8FUI7qIZrR+3g==

async-retry@1.2.3:
version "1.2.3"
resolved "https://registry.yarnpkg.com/async-retry/-/async-retry-1.2.3.tgz#a6521f338358d322b1a0012b79030c6f411d1ce0"