Skip to content

Commit

Permalink
ensure token obtain from blocking acquire is the correct lease; chang…
Browse files Browse the repository at this point in the history
…e expiry to milliseconds instead of seconds
  • Loading branch information
Nicolas Pepin-Perreault committed Sep 9, 2016
1 parent 8e09e44 commit ae2d4f2
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions lib/restruct/hls/lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ module Hls
# @see #acquire
# @see #release
# @see #locked
# @attr_reader [::String, nil] token The current token or nil
# @attr_reader [Fixnum] expiry The expiry of the underlying redis structures in seconds
# @attr_reader [Fixnum, nil] timeout The timeout to wait when attempting to acquire the lock, in seconds
# @attr_reader [::String, nil] token the current token or nil
# @attr_reader [Fixnum] expiry expiry of the underlying redis structures in milliseconds
# @attr_reader [Fixnum, nil] timeout the timeout to wait when attempting to acquire the lock, in seconds
class Lock < Restruct::Types::Base
include Restruct::Utils::Scriptable, Restruct::Utils::Coercion

# The default expiry on the underlying redis keys, in seconds
DEFAULT_EXPIRY = 10
# The default expiry on the underlying redis keys, in milliseconds
DEFAULT_EXPIRY = 1000

# The default timeout when blocking, in seconds; a nil value means it is non-blocking
DEFAULT_TIMEOUT = nil

attr_reader :token, :expiry, :timeout

# @param [Integer] expiry In seconds; to prevent infinite locking, each mutex is released after a certain expiry time
# @param [Integer] timeout In seconds; if > 0, will block for this amount of time when trying to obtain the lock
# @param [Integer] expiry in milliseconds; to prevent infinite locking, each mutex is released after a certain expiry time
# @param [Integer] timeout in seconds; if > 0, will block for this amount of time when trying to obtain the lock
def initialize(expiry: DEFAULT_EXPIRY, timeout: DEFAULT_TIMEOUT, **options)
super(**options)

Expand Down Expand Up @@ -65,8 +65,6 @@ def acquire

unless token.nil?
@token = token
# since it was popped from a list, need to update the expiry time of the lease, it could be old
@lease.expire(@expiry) if blocking?
acquired = true
end

Expand All @@ -91,23 +89,30 @@ def non_blocking_acquire(token = nil)

def blocking_acquire
timeout = @timeout == Float::INFINITY ? 0 : @timeout
return @tokens.pop(timeout: timeout)
token = @tokens.pop(timeout: timeout)

# Attempt to reacquire in a non blocking way to:
# 1) assert we do own the lock (edge case)
# 2) touch the lock expiry
token = non_blocking_acquire(token) unless token.nil?

return token
end
private :blocking_acquire

# The acquire script attempts to set the lease (keys[1]) to the given token (argv[1]), only
# if it wasn't already set. It then compares to check if the value of the lease is that of the token,
# and if so refreshes the expiry (argv[2]) time of the lease.
# @param [Array<(::String)>] keys The lease key specifying who owns the mutex at the moment
# @param [Array<(::String, Fixnum)>] argv The current token; the expiry time in seconds
# @param [Array<(::String, Fixnum)>] argv The current token; the expiry time in milliseconds
# @return [::String] Returns the token if acquired, nil otherwise.
defscript :acquire_script, <<~LUA
local token = ARGV[1]
local expiry = tonumber(ARGV[2])
redis.call('set', KEYS[1], token, 'NX')
if redis.call('get', KEYS[1]) == token then
redis.call('expire', KEYS[1], expiry)
redis.call('pexpire', KEYS[1], expiry)
return token
end
Expand All @@ -126,9 +131,9 @@ def blocking_acquire
local expiry = tonumber(ARGV[3])
if redis.call('get', KEYS[1]) == currentToken then
redis.call('set', KEYS[1], nextToken, 'EX', expiry)
redis.call('set', KEYS[1], nextToken, 'PX', expiry)
redis.call('lpush', KEYS[2], nextToken)
redis.call('expire', KEYS[2], expiry)
redis.call('pexpire', KEYS[2], expiry)
return true
end
Expand Down

0 comments on commit ae2d4f2

Please sign in to comment.