
[#167064659] Optimistic lock on update user metadata #577

Open

wants to merge 7 commits into master from 167064659-set-metadata-optimistic-lock

Conversation

BurnedMarshal
Contributor

No description provided.

@digitalcitizenship

digitalcitizenship commented Jan 23, 2020

Warnings
⚠️

Please include a description of your PR changes.

Affected stories

  • 🐞 #167064659: The API call for updating the metadata associated with a user does not correctly support concurrent access

New dependencies added: async-sema.

async-sema

Author: Olli Vanhoja

Description: Semaphore using `async` and `await`

Homepage: https://github.com/zeit/async-sema

Created: over 2 years ago
Last Updated: about 1 month ago
License: MIT
Maintainers: 2
Releases: 18
Keywords: semaphore, async and await
README

async-sema

This is a semaphore implementation for use with async and await. The
implementation follows the traditional definition of a semaphore rather than the
definition of an asynchronous semaphore seen in some js community examples.
Whereas the latter generally allows every defined task to proceed
immediately and synchronizes at the end, async-sema allows only a selected
number of tasks to proceed at once while the rest will remain waiting.

Async-sema manages the semaphore count as a list of tokens instead of a single
variable containing the number of available resources. This enables an
interesting application of managing the actual resources with the semaphore
object itself. To make it practical the constructor for Sema includes an option
for providing an init function for the semaphore tokens. Use of a custom token
initializer is demonstrated in examples/pooling.js.

Usage

Firstly, add the package to your project's dependencies:

npm install --save async-sema

or

yarn add async-sema

Then start using it as shown in the following example. Check more
use case examples here.

Example

const { Sema } = require('async-sema');
const s = new Sema(
  4, // Allow 4 concurrent async calls
  {
    capacity: 100 // Prealloc space for 100 tokens
  }
);

async function fetchData(x) {
  await s.acquire()
  try {
    console.log(s.nrWaiting() + ' calls to fetch are waiting')
    // ... do some async stuff with x
  } finally {
    s.release();
  }
}

const data = await Promise.all(array.map(fetchData));

The package also offers a simple rate limiter utilizing the semaphore
implementation.

const { RateLimit } = require('async-sema');

async function f() {
  const lim = RateLimit(5); // rps

  for (let i = 0; i < n; i++) {
    await lim();
    // ... do something async
  }
}

API

Sema

Constructor(nr, { initFn, pauseFn, resumeFn, capacity })

Creates a semaphore object. The first argument is mandatory and the second
argument is optional.

  • nr The maximum number of callers allowed to acquire the semaphore
    concurrently.
  • initFn Function that is used to initialize the tokens used to manage
    the semaphore. The default is () => '1'.
  • pauseFn An optional function that is called to opportunistically request
    pausing the incoming stream of data, instead of piling up waiting
    promises and possibly running out of memory.
    See examples/pausing.js.
  • resumeFn An optional function that is called when there is room again
    to accept new waiters on the semaphore. This function must be declared
    if a pauseFn is declared.
  • capacity Sets the size of the preallocated waiting list inside the
    semaphore. This is typically used by high performance applications where
    the developer can make a rough estimate of the number of concurrent users of a semaphore.

async drain()

Drains the semaphore and returns all the initialized tokens in an array.
Draining is an ideal way to ensure there are no pending async tasks, for
example before a process terminates.
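
For illustration only (not part of the README), a minimal shutdown sketch assuming a semaphore of size 4:

import { Sema } from "async-sema";

const sema = new Sema(4);

// drain() resolves only when every slot has been handed back, so awaiting it
// guarantees that no task still holds (or waits on) the semaphore.
async function shutdown(): Promise<void> {
  await sema.drain();
  // safe to close connections / exit the process here
}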

nrWaiting()

Returns the number of callers waiting on the semaphore, i.e. the number of
pending promises.

tryAcquire()

Attempt to acquire a token from the semaphore, if one is available immediately.
Otherwise, return undefined.

async acquire()

Acquire a token from the semaphore, thus decrementing the number of available
execution slots. If initFn is not used then the return value of the function
can be discarded.

release(token)

Release the semaphore, thus incrementing the number of free execution slots. If
initFn is used then the token returned by acquire() should be given as
an argument when calling this function.
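
As a small sketch (not taken from the README), this is how a token produced by initFn travels from acquire() back to release():

import { Sema } from "async-sema";

// Each of the two tokens is created by initFn; here it is a plain object,
// but it could be a real resource such as a pooled connection.
const sema = new Sema(2, { initFn: () => ({ id: Math.random() }) });

async function withToken(): Promise<void> {
  const token = await sema.acquire(); // the token built by initFn
  try {
    // ... use the resource carried by the token
  } finally {
    sema.release(token); // hand the very same token back
  }
}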

RateLimit(rps, { timeUnit, uniformDistribution })

Creates a rate limiter function that blocks with a promise whenever the rate
limit is hit and resolves the promise once the call rate is within the limit
set by rps. The second argument is optional.

The timeUnit is an optional argument setting the width of the rate limiting
window in milliseconds. The default timeUnit is 1000 ms, therefore making
the rps argument act as requests per second limit.

The uniformDistribution argument enforces a discrete uniform distribution over
time, instead of the default behaviour that allows hitting the function rps
times and then pausing for timeUnit milliseconds. Setting the uniformDistribution
option is mainly useful in a situation where the flow of rate limit function
calls is continuous and occurring faster than timeUnit (e.g. reading a
file); without it, the maximum number of calls would resolve
immediately (thus exhausting the limit immediately) and the next bunch of
calls would need to wait for timeUnit milliseconds. However, if the flow is
sparse then this option may make the code run slower with no advantages.
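
As an illustrative configuration (the numbers are made up, not from the README), limiting to 100 calls per minute with uniform spacing would look like:

import { RateLimit } from "async-sema";

// 100 calls per 60 000 ms window, spread uniformly (roughly one call every 600 ms)
const lim = RateLimit(100, { timeUnit: 60000, uniformDistribution: true });

async function poll(urls: ReadonlyArray<string>): Promise<void> {
  for (const url of urls) {
    await lim(); // resolves only when the next call fits the configured rate
    // ... perform the rate-limited work for url
  }
}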

Contributing

  1. Fork this repository to your own GitHub account and then clone it to your local device
  2. Move into the directory of the clone: cd async-sema
  3. Link it to the global module directory of Node.js: npm link

Inside the project where you want to test your clone of the package, you can now use npm link async-sema to link the clone into the project's local dependencies.

Author

Olli Vanhoja (@OVanhoja) - ▲ZEIT

Generated by 🚫 dangerJS

*/
public async set(
  user: User,
  payload: UserMetadata
): Promise<Either<Error, boolean>> {
  // In order to work properly, optimistic lock needs to be initialized on different
Contributor

where did you find this info ?

Contributor Author

I've tested locally with the following test and the output was unexpected:

it.only("Test Optimistic Lock", async () => {
    const REDIS_CLIENT = createSimpleRedisClient("redis://localhost");
    const ums = new RedisUserMetadataStorage(REDIS_CLIENT);
    const meta = {
      metadata,
      version: 8
    };
    const metaR = await Promise.all([
      ums.set(aValidUser, meta),
      ums.set(aValidUser, meta),
      ums.set(aValidUser, meta)
    ]);
    console.log(metaR); // [ right(true), right(true), right(true) ]
});

So I've read the example (second code snippet) here, whose description mentions the keyword another client, and with the modification (const duplicatedRedisClient = this.redisClient.duplicate();) the result was as expected: [ left(Error: Concurrent write operation), right(true), left(Error: Concurrent write operation) ]

Contributor

kudos, that's a good test

Contributor

@gunzip Jan 24, 2020

the test is good, but as far as I understand (and the test seems to confirm it) the problem of concurrent updates happens only when you have multiple redis clients trying to update the same resource (i.e. different instances of the backend nodejs process / pod). so while it's right to use multiple clients inside the test, it is not right to duplicate the client inside the backend code (you just have to call watch before multi). this holds even if it is the same client that tries to update the same resource (watch + multi "locks" the resource anyway, you don't have to duplicate the client).

Contributor Author

@gunzip I think the watch + multi scenario doesn't work on the same redis client instance, so if we don't duplicate the client when we start an optimistic lock on a key and a race condition happens on the same process/pod (it can occur), then the optimistic lock doesn't work.

Contributor

@gunzip Jan 24, 2020

Let's try to recap: set() fails in case the value (in redis) that matches the key ${userMetadataPrefix}${user.fiscal_code} is changed after the call to watch() and before the call to multi() [1]. This means that another call to set() starts while the first call is still executing. This is the test code:

it.only("Test Optimistic Lock", async () => {
    const REDIS_CLIENT = createSimpleRedisClient("redis://localhost");
    const ums = new RedisUserMetadataStorage(REDIS_CLIENT);
    const meta = {
      metadata,
      version: 8
    };
    const metaR = await Promise.all([
      ums.set(aValidUser, meta),
      ums.set(aValidUser, meta),
      ums.set(aValidUser, meta)
    ]);
    console.log(metaR); // [ right(true), right(true), right(true) ]
});

Sorry if I'm nitpicking here, but I wonder: how can you tell from this code that the three calls to set() are interleaved in such a way that [1] is satisfied? I mean, the test may return [ right(true), right(true), right(true) ] not because it cannot catch the race condition but because the race condition does not happen at all (or am I missing something?)

Contributor Author

Ok, I understand the point. To be sure of that I'll add a timeout before the multi phase, so we will be certain that all three calls happen simultaneously.

Contributor Author

Here's the updated code:

public async set(
    user: User,
    payload: UserMetadata
  ): Promise<Either<Error, boolean>> {
    // In order to work properly, optimistic lock needs to be initialized on different
    // redis client instances
    // const duplicatedRedisClient = this.redisClient.duplicate();
    return await new Promise<Either<Error, boolean>>(resolve => {
      this.redisClient.watch(
        `${userMetadataPrefix}${user.fiscal_code}`,
        err => {
          if (err) {
            return resolve(left(err));
          }
          this.redisClient.get(
            `${userMetadataPrefix}${user.fiscal_code}`,
            (err1, response) => {
              if (err1 || response === null) {
                resolve(left(err1 || metadataNotFoundError));
              } else {
                setTimeout(() => {
                  this.redisClient
                    .multi()
                    .set(
                      `${userMetadataPrefix}${user.fiscal_code}`,
                      JSON.stringify(payload)
                    )
                    .exec((err2, results) => {
                      if (err2) {
                        return resolve(left(err2));
                      }
                      if (results === null) {
                        return resolve(left(concurrentWriteRejectionError));
                      }
                      resolve(this.singleStringReply(err, results[0]));
                    });
                }, 1000);
              }
            }
          );
        }
      );
    });
  }

As before, the result was [ right(true), right(true), right(true) ] 😄

Contributor

@gunzip Jan 24, 2020

-DELETED-

Ok let's proceed and merge this PR (I'm still not convinced that there could be issues with a single client but I'm doing some tests myself : )

Contributor

I've found a comment that confirms what @BurnedMarshal says: redis/node-redis#1320 (comment)

We indeed need one client for every set() call, which is sub-optimal since it adds some overhead (as stated in the reply).
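
To make the per-call duplication concrete, here is a rough sketch of the approach (names such as setWithOptimisticLock are illustrative, not the PR's code), assuming node_redis's duplicate() and the null-results convention for an aborted transaction:

import { Either, left, right } from "fp-ts/lib/Either";
import { RedisClient } from "redis";

const concurrentWriteRejectionError = new Error("Concurrent write operation");

// Sketch: every call gets its own duplicated connection, so WATCH state is not
// shared between concurrent writers living in the same process.
function setWithOptimisticLock(
  baseClient: RedisClient,
  key: string,
  value: string
): Promise<Either<Error, boolean>> {
  const dup = baseClient.duplicate();
  return new Promise(resolve => {
    dup.watch(key, watchErr => {
      if (watchErr) {
        dup.quit();
        return resolve(left(watchErr));
      }
      dup
        .multi()
        .set(key, value)
        .exec((execErr, results) => {
          dup.quit(); // always release the extra connection
          if (execErr) {
            return resolve(left(execErr));
          }
          // exec() reports null results when the WATCHed key changed meanwhile
          if (results === null) {
            return resolve(left(concurrentWriteRejectionError));
          }
          resolve(right(true));
        });
    });
  });
}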

*/
public async set(
  user: User,
  payload: UserMetadata
): Promise<Either<Error, boolean>> {
  // In order to work properly, optimistic lock needs to be initialized on different
  // redis client instances
Contributor

lgtm, please add a link to redis/node-redis#1320 (comment)

Contributor Author

Fixed in 809bc57

@codecov-io

codecov-io commented Jan 24, 2020

Codecov Report

Merging #577 into master will increase coverage by 0.29%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #577      +/-   ##
==========================================
+ Coverage   82.99%   83.29%   +0.29%     
==========================================
  Files          46       46              
  Lines        1347     1371      +24     
  Branches      236      236              
==========================================
+ Hits         1118     1142      +24     
  Misses        219      219              
  Partials       10       10
Impacted Files Coverage Δ
src/services/redisUserMetadataStorage.ts 100% <100%> (ø) ⬆️

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 71dc3e3...7679f2d. Read the comment docs.

@gunzip
Contributor

gunzip commented Jan 28, 2020

lgtm so far but it has conflicts with master. a question: why do we need a mutex to delete the fiscal code from the set here?

  private async resetOperation(fiscalCode: FiscalCode): Promise<void> {
    await this.mutex.acquire();
    this.activeClients.delete(fiscalCode);
    this.mutex.release();
  }

@BurnedMarshal force-pushed the 167064659-set-metadata-optimistic-lock branch from 37fcb12 to 6df6a4f on January 29, 2020 07:58
@BurnedMarshal
Contributor Author

lgtm so far but it has conflicts with master. a question: why do we need a mutex to delete the fiscal code from the set here?

  private async resetOperation(fiscalCode: FiscalCode): Promise<void> {
    await this.mutex.acquire();
    this.activeClients.delete(fiscalCode);
    this.mutex.release();
  }

@gunzip you are right. This mutex is useless given the nature of the shared variable (on/off values).
Fixed in 6df6a4f

@gunzip
Contributor

gunzip commented Jan 29, 2020

@cloudify this lgtm. @BurnedMarshal added a mutex since duplicating the redis client for every update is an expensive operation (it opens a new socket every time). now the client waits on the mutex before checking whether there's another client doing the same operation. if it's ok for you we can proceed and merge this one.
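
Roughly, the guard described here could look like the following sketch (markActive is an illustrative helper, not the PR's exact code), with async-sema's Sema(1) playing the role of the mutex:

import { Sema } from "async-sema";

const mutex = new Sema(1);
const activeClients = new Set<string>();

// The mutex serialises the check-and-insert on activeClients, so two concurrent
// set() calls for the same fiscal code cannot both pass the check.
async function markActive(fiscalCode: string): Promise<boolean> {
  await mutex.acquire();
  try {
    if (activeClients.has(fiscalCode)) {
      return false; // another update for this user is already in flight
    }
    activeClients.add(fiscalCode);
    return true;
  } finally {
    mutex.release();
  }
}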

return left(getUserMetadataResult.value);
// In order to work properly, optimistic lock needs to be initialized on different
// redis client instances @see https://github.com/NodeRedis/node_redis/issues/1320#issuecomment-373200541
await this.mutex.acquire();
Contributor

I suggest extracting this logic into a kind of client pool class

Contributor

a couple of suggestions on async-sema:

  1. review the defaults, are they good for us?
  2. use tryAcquire in place of acquire to avoid accumulating "concurrent threads"; if the mutex can't be acquired, fail with an HTTP 429 (see the sketch below)
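
A minimal sketch of suggestion 2 (the Express handler and its names are assumptions, not code from this PR): tryAcquire() returns undefined when no slot is free, so the request can be rejected with a 429 instead of queueing:

import { Sema } from "async-sema";
import * as express from "express";

const mutex = new Sema(1);

async function updateMetadataHandler(req: express.Request, res: express.Response): Promise<void> {
  if (mutex.tryAcquire() === undefined) {
    // the mutex is busy: another update is running, answer 429 Too Many Requests
    res.status(429).json({ title: "Too many requests" });
    return;
  }
  try {
    // ... perform the optimistic-lock update here
    res.status(200).json({ value: true });
  } finally {
    mutex.release();
  }
}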

Contributor

I suggest extracting this logic into a kind of client pool class

can you please elaborate a little bit more? do you mean creating (at startup) a pool of N redis clients to reuse and changing the mutex(1) to mutex(N)?

do you think we can merge this PR as-is and implement the pooling in another PR? (after the beta release date)
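
If we went the pooling route, one possible shape (purely a sketch with assumed names, not code from this PR) is a Sema(N) whose initFn creates the redis clients, so acquire() lends a pooled client out and release() puts it back:

import { Sema } from "async-sema";
import * as redis from "redis";

// Fixed-size redis client pool built on async-sema's token mechanism.
class RedisClientPool {
  private readonly sema: Sema;

  constructor(url: string, size: number) {
    // initFn runs once per slot, so the pool holds `size` long-lived clients
    this.sema = new Sema(size, { initFn: () => redis.createClient(url) });
  }

  public async withClient<T>(
    fn: (client: redis.RedisClient) => Promise<T>
  ): Promise<T> {
    const client = (await this.sema.acquire()) as redis.RedisClient;
    try {
      return await fn(client);
    } finally {
      this.sema.release(client); // return the same client to the pool
    }
  }

  // A dispose step: drain the semaphore to collect every pooled client and close it.
  public async dispose(): Promise<void> {
    const clients = (await this.sema.drain()) as ReadonlyArray<redis.RedisClient>;
    clients.forEach(c => c.quit());
  }
}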

)
.chain(fromEither)
.run();
hasActiveClient
Contributor

this should call a kind of dispose method in the aforementioned client pool
