Skip to content

Commit

Permalink
feat: minor refactor with helper methods
Browse files Browse the repository at this point in the history
  • Loading branch information
stakach committed Apr 11, 2024
1 parent 8a0ac33 commit 4aeea9b
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 15 deletions.
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: redis_service_manager
version: 2.1.2
version: 3.0.0

dependencies:
ulid:
Expand Down
17 changes: 16 additions & 1 deletion spec/lookup_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ describe RedisServiceManager do
end

node1.register
sleep 0.1 # ensure node1 is master
node2.register
loop do
break if node1.cluster_size == 2
Expand All @@ -44,10 +45,24 @@ describe RedisServiceManager do

# Get the cluster state
lookup = Clustering::Discovery.new RedisServiceManager.new("spec", REDIS_URL)
hash = lookup.nodes
hash = lookup.rendezvous
hash.nodes.includes?("http://node1/node1").should be_true
hash.nodes.includes?("http://node2/node2").should be_true

lookup.find?("test").should eq URI.parse("http://node2/node2")
lookup.find("test").should eq URI.parse("http://node2/node2")

lookup["testing"].should eq URI.parse("http://node1/node1")
lookup["testing"]?.should eq URI.parse("http://node1/node1")

lookup_local = Clustering::Discovery.new node1
lookup_local.own_node?("testing").should be_true
lookup_local.own_node?("test").should be_false
lookup_local.nodes.should eq [
URI.parse("http://node1/node1"),
URI.parse("http://node2/node2"),
]

node2.unregister
node1.unregister
end
Expand Down
6 changes: 5 additions & 1 deletion src/redis_service_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ class RedisServiceManager < Clustering
{hash, new_list - delete}
end

def nodes : RendezvousHash
def rendezvous : RendezvousHash
if registered?
@rendezvous_hash
else
Expand All @@ -319,6 +319,10 @@ class RedisServiceManager < Clustering
RendezvousHash.new(nodes: keys.map { |key| hash[key] })
end
end

def finalize
unregister
end
end

require "./redis_service_manager/*"
5 changes: 4 additions & 1 deletion src/redis_service_manager/clustering.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ abstract class Clustering
getter rebalance_callbacks : Array((RendezvousHash, RebalanceComplete) ->)
getter cluster_stable_callbacks : Array(->)

# the service uri for this host
abstract def uri : String

# Called when the cluster has changed
def on_rebalance(&callback : (RendezvousHash, RebalanceComplete) ->)
rebalance_callbacks << callback
Expand All @@ -40,7 +43,7 @@ abstract class Clustering
abstract def watching? : Bool

# returns the list of known nodes
abstract def nodes : RendezvousHash
abstract def rendezvous : RendezvousHash
end

require "./clustering/*"
55 changes: 44 additions & 11 deletions src/redis_service_manager/clustering/discovery.cr
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
require "uri"
require "../clustering"

class Clustering::Discovery
def initialize(@cluster : Clustering, @ttl : Time::Span = 5.seconds)
@nodes = RendezvousHash.new
@rendezvous = RendezvousHash.new
@last_updated = Time.unix(0)
@timer = Time.monotonic - (@ttl + 1.second)
@mutex = Mutex.new
Expand All @@ -12,37 +13,69 @@ class Clustering::Discovery

getter last_updated : Time
getter rebalance_callbacks : Array(RendezvousHash ->)
getter uri : URI { URI.parse @cluster.uri }

def on_rebalance(&callback : RendezvousHash ->)
@rebalance_callbacks << callback
end

def nodes : RendezvousHash
if @cluster.watching? || !expired?
def rendezvous : RendezvousHash
if @cluster.watching?
@cluster.rendezvous
elsif !expired?
# this is up to date as we are listening to cluster.on_rebalance callback
# or the current list hasn't had its TTL expire
@nodes
@rendezvous
else
@mutex.synchronize do
if expired?
@nodes = @cluster.nodes
@rendezvous = @cluster.rendezvous
@last_updated = Time.utc
@timer = Time.monotonic
end
end
@nodes
@rendezvous
end
end

protected def update_node_list(nodes : RendezvousHash)
@nodes = nodes
# Consistent hash lookup
def find?(key : String) : URI?
rendezvous.find?(key).try { |node| URI.parse(node) }
end

# Consistent hash lookup
def find(key : String) : URI
URI.parse(rendezvous.find(key))
end

def [](key)
find(key)
end

def []?(key)
find?(key)
end

# Determine if key maps to current node
def own_node?(key : String) : Bool
# don't parse into URI
rendezvous.find?(key) == @cluster.uri
end

# Returns the list of node URIs from the `rendezvous-hash`
def nodes : Array(URI)
rendezvous.nodes.map { |node| URI.parse(node) }
end

protected def update_node_list(rendezvous : RendezvousHash)
@rendezvous = rendezvous
@last_updated = Time.utc
@timer = Time.monotonic
rebalance_callbacks.each { |callback| spawn { perform(callback, nodes) } }
rebalance_callbacks.each { |callback| spawn { perform(callback, rendezvous) } }
end

protected def perform(callback, nodes)
callback.call(nodes)
protected def perform(callback, rendezvous)
callback.call(rendezvous)
rescue error
Log.error(exception: error) { "rebalance callback failed" }
end
Expand Down

0 comments on commit 4aeea9b

Please sign in to comment.